meerschaum.connectors.valkey

Import the `ValkeyConnector`.
@make_connector
class ValkeyConnector(Connector):
    """
    Manage a Valkey instance.

    Build a `ValkeyConnector` from connection attributes or a URI string.
    """
    IS_INSTANCE: bool = True
    REQUIRED_ATTRIBUTES: List[str] = ['host']
    OPTIONAL_ATTRIBUTES: List[str] = [
        'port', 'username', 'password', 'db', 'socket_timeout',
    ]
    DEFAULT_ATTRIBUTES: Dict[str, Any] = {
        'username': 'default',
        'port': 6379,
        'db': 0,
        'socket_timeout': 300,
    }
    # Separator used when joining parts into a single Valkey key.
    KEY_SEPARATOR: str = ':'

    # Instance-connector methods live in sibling modules and are bound to the
    # class via these class-body imports.
    from ._pipes import (
        register_pipe,
        get_pipe_id,
        get_pipe_attributes,
        edit_pipe,
        pipe_exists,
        drop_pipe,
        delete_pipe,
        get_pipe_data,
        sync_pipe,
        get_pipe_columns_types,
        clear_pipe,
        get_sync_time,
        get_pipe_rowcount,
        fetch_pipes_keys,
        get_document_key,
        get_table_quoted_doc_key,
    )
    from ._fetch import (
        fetch,
    )

    from ._users import (
        get_users_pipe,
        get_user_key,
        get_user_keys_vals,
        register_user,
        get_user_id,
        edit_user,
        get_user_attributes,
        delete_user,
        get_users,
        get_user_password_hash,
        get_user_type,
    )
    from ._plugins import (
        get_plugins_pipe,
        get_plugin_key,
        get_plugin_keys_vals,
        register_plugin,
        get_plugin_id,
        get_plugin_version,
        get_plugin_user_id,
        get_plugin_username,
        get_plugin_attributes,
        get_plugins,
        delete_plugin,
    )

    @property
    def client(self):
        """
        Return the Valkey client.

        The client is created lazily on first access and cached in `_client`.
        A `uri` attribute, if present, takes precedence over the individual
        connection attributes.
        """
        if '_client' in self.__dict__:
            return self.__dict__['_client']

        valkey = mrsm.attempt_import('valkey')

        if 'uri' in self.__dict__:
            self._client = valkey.Valkey.from_url(self.__dict__.get('uri'))
            return self._client

        # Only pass optional attributes the user actually set.
        optional_kwargs = {
            key: self.__dict__.get(key)
            for key in self.OPTIONAL_ATTRIBUTES
            if key in self.__dict__
        }
        connection_kwargs = {
            'host': self.host,
            **optional_kwargs
        }

        self._client = valkey.Valkey(**connection_kwargs)
        return self._client

    @property
    def URI(self) -> str:
        """
        Return the connection URI for this connector.

        An explicit `uri` attribute is returned as-is; otherwise the URI is
        assembled from whichever connection attributes are present.
        """
        import urllib.parse

        if 'uri' in self.__dict__:
            return self.__dict__.get('uri')

        uri = "valkey://"
        if 'username' in self.__dict__:
            uri += urllib.parse.quote_plus(self.username) + ':'

        if 'password' in self.__dict__:
            uri += urllib.parse.quote_plus(self.password) + '@'

        if 'host' in self.__dict__:
            uri += self.host

        if 'port' in self.__dict__:
            uri += f':{self.port}'

        if 'db' in self.__dict__:
            uri += f"/{self.db}"

        if 'socket_timeout' in self.__dict__:
            uri += f"?timeout={self.socket_timeout}s"

        return uri

    def set(self, key: str, value: Any, **kwargs: Any) -> None:
        """
        Set the `key` to `value`.

        Extra keyword arguments are forwarded to the client's `set()`.
        """
        return self.client.set(key, value, **kwargs)

    def get(self, key: str) -> Union[str, None]:
        """
        Get the value for `key`, decoded as UTF-8, or `None` if unset.
        """
        val = self.client.get(key)
        if val is None:
            return None

        return val.decode('utf-8')

    def test_connection(self) -> bool:
        """
        Return whether a connection may be established (via `PING`).
        """
        return self.client.ping()

    @classmethod
    def quote_table(cls, table: str) -> str:
        """
        Return a quoted key.
        """
        return shlex.quote(table)

    @classmethod
    def get_counter_key(cls, table: str) -> str:
        """
        Return the counter key for a given table.
        """
        table_name = cls.quote_table(table)
        return f"{table_name}:counter"

    def push_df(
        self,
        df: 'pd.DataFrame',
        table: str,
        datetime_column: Optional[str] = None,
        debug: bool = False,
    ) -> int:
        """
        Append a pandas DataFrame to a table.

        Parameters
        ----------
        df: pd.DataFrame
            The pandas DataFrame to append to the table.

        table: str
            The "table" name (root key).

        datetime_column: Optional[str], default None
            If provided, use this key as the datetime index.

        Returns
        -------
        The number of new documents added (see `push_docs()`).
        """
        from meerschaum.utils.dataframe import to_json
        # Round-trip through JSON to normalize dtypes into plain values.
        docs_str = to_json(df)
        docs = json.loads(docs_str)
        return self.push_docs(
            docs,
            table,
            datetime_column=datetime_column,
            debug=debug,
        )

    def push_docs(
        self,
        docs: List[Dict[str, Any]],
        table: str,
        datetime_column: Optional[str] = None,
        debug: bool = False,
    ) -> int:
        """
        Append a list of documents to a table.

        Parameters
        ----------
        docs: List[Dict[str, Any]]
            The docs to be pushed.
            All keys and values will be coerced into strings.

        table: str
            The "table" name (root key).

        datetime_column: Optional[str], default None
            If set, create a sorted set with this datetime column as the index.
            Otherwise push the docs to a list.

        Returns
        -------
        The number of new documents added (the change in the table's
        cardinality; duplicate documents do not increase it).
        """
        from meerschaum.utils.dtypes import json_serialize_value
        table_name = self.quote_table(table)
        datetime_column_key = self.get_datetime_column_key(table)
        # A previously stored datetime column takes effect when none is given.
        remote_datetime_column = self.get(datetime_column_key)
        datetime_column = datetime_column or remote_datetime_column
        dateutil_parser = mrsm.attempt_import('dateutil.parser')

        old_len = (
            self.client.zcard(table_name)
            if datetime_column
            else self.client.scard(table_name)
        )
        for doc in docs:
            # Docs missing the datetime value are scored 0.
            original_dt_val = (
                doc[datetime_column]
                if datetime_column and datetime_column in doc
                else 0
            )
            dt_val = (
                dateutil_parser.parse(str(original_dt_val))
                if not isinstance(original_dt_val, int)
                else int(original_dt_val)
            ) if datetime_column else None
            # Naive datetimes are treated as UTC when computing the score.
            ts = (
                int(dt_val.replace(tzinfo=timezone.utc).timestamp())
                if isinstance(dt_val, datetime)
                else int(dt_val)
            ) if datetime_column else None
            # Canonical serialization (sorted keys, no whitespace) so equal
            # docs map to equal members.
            doc_str = json.dumps(
                doc,
                default=json_serialize_value,
                separators=(',', ':'),
                sort_keys=True,
            )
            if datetime_column:
                self.client.zadd(table_name, {doc_str: ts})
            else:
                self.client.sadd(table_name, doc_str)

        if datetime_column:
            self.set(datetime_column_key, datetime_column)
        new_len = (
            self.client.zcard(table_name)
            if datetime_column
            else self.client.scard(table_name)
        )

        return new_len - old_len

    def _push_hash_docs_to_list(self, docs: List[Dict[str, Any]], table: str) -> int:
        """
        Store each doc as a hash under `{table}:{ix}` and append the hash keys
        to the `table` list. Returns the next available positional index.
        """
        table_name = self.quote_table(table)
        # Indices start at 1; continue after any existing entries.
        next_ix = max(self.client.llen(table_name) or 0, 1)
        for i, doc in enumerate(docs):
            doc_key = f"{table_name}:{next_ix + i}"
            self.client.hset(
                doc_key,
                mapping={
                    str(k): str(v)
                    for k, v in doc.items()
                },
            )
            self.client.rpush(table_name, doc_key)

        return next_ix + len(docs)

    def get_datetime_column_key(self, table: str) -> str:
        """
        Return the key to store the datetime index for `table`.
        """
        table_name = self.quote_table(table)
        return f'{table_name}:datetime_column'

    def read(
        self,
        table: str,
        begin: Union[datetime, int, str, None] = None,
        end: Union[datetime, int, str, None] = None,
        params: Optional[Dict[str, Any]] = None,
        datetime_column: Optional[str] = None,
        select_columns: Optional[List[str]] = None,
        omit_columns: Optional[List[str]] = None,
        debug: bool = False
    ) -> Union['pd.DataFrame', None]:
        """
        Query the table and return the result dataframe.

        Parameters
        ----------
        table: str
            The "table" name to be queried.

        begin: Union[datetime, int, str, None], default None
            If provided, only return rows greater than or equal to this datetime.

        end: Union[datetime, int, str, None], default None
            If provided, only return rows older than this datetime.

        params: Optional[Dict[str, Any]]
            Additional Meerschaum filter parameters.

        datetime_column: Optional[str], default None
            If provided, use this column for the datetime index.
            Otherwise infer from the table metadata.

        select_columns: Optional[List[str]], default None
            If provided, only return these columns.

        omit_columns: Optional[List[str]], default None
            If provided, do not include these columns in the result.

        Returns
        -------
        A Pandas DataFrame of the result, or `None`.
        """
        from meerschaum.utils.dataframe import parse_df_datetimes, query_df
        docs = self.read_docs(
            table,
            begin=begin,
            end=end,
            debug=debug,
        )
        df = parse_df_datetimes(docs)
        datetime_column_key = self.get_datetime_column_key(table)
        datetime_column = datetime_column or self.get(datetime_column_key)

        # `begin`/`end` are re-applied client-side only when a datetime
        # column is known.
        return query_df(
            df,
            begin=(begin if datetime_column is not None else None),
            end=(end if datetime_column is not None else None),
            params=params,
            datetime_column=datetime_column,
            select_columns=select_columns,
            omit_columns=omit_columns,
            inplace=True,
            reset_index=True,
            debug=debug,
        )

    def read_docs(
        self,
        table: str,
        begin: Union[datetime, int, str, None] = None,
        end: Union[datetime, int, str, None] = None,
        debug: bool = False,
    ) -> List[Dict[str, str]]:
        """
        Return a list of previously pushed docs.

        Parameters
        ----------
        table: str
            The "table" name (root key) under which the docs were pushed.

        begin: Union[datetime, int, str, None], default None
            If provided and the table was created with a datetime index, only return documents
            newer than this datetime.
            If the table was not created with a datetime index and `begin` is an `int`,
            return documents with a positional index greater than or equal to this value.

        end: Union[datetime, int, str, None], default None
            If provided and the table was created with a datetime index, only return documents
            older than this datetime.
            If the table was not created with a datetime index and `end` is an `int`,
            return documents with a positional index less than this value.

        Returns
        -------
        A list of dictionaries, where all keys and values are strings.
        """
        from meerschaum.utils.dtypes import coerce_timezone
        table_name = self.quote_table(table)
        datetime_column_key = self.get_datetime_column_key(table)
        datetime_column = self.get(datetime_column_key)

        if debug:
            dprint(f"Reading documents from '{table}' with {begin=}, {end=}")

        # Without a datetime index, the table is a plain set: return all members.
        if not datetime_column:
            return [
                json.loads(doc_bytes.decode('utf-8'))
                for doc_bytes in self.client.smembers(table_name)
            ]

        dateutil_parser = mrsm.attempt_import('dateutil.parser')

        if isinstance(begin, str):
            begin = coerce_timezone(dateutil_parser.parse(begin))

        if isinstance(end, str):
            end = coerce_timezone(dateutil_parser.parse(end))

        # Naive datetimes are treated as UTC; open bounds use +/-inf scores.
        begin_ts = (
            (
                int(begin.replace(tzinfo=timezone.utc).timestamp())
                if isinstance(begin, datetime)
                else int(begin)
            )
            if begin is not None else '-inf'
        )
        end_ts = (
            (
                int(end.replace(tzinfo=timezone.utc).timestamp())
                if isinstance(end, datetime)
                else int(end)
            )
            if end is not None else '+inf'
        )

        if debug:
            dprint(f"Reading documents with {begin_ts=}, {end_ts=}")

        # NOTE(review): zrangebyscore bounds are inclusive, so `end` behaves
        # inclusively at one-second resolution — confirm against the docstring.
        return [
            json.loads(doc_bytes.decode('utf-8'))
            for doc_bytes in self.client.zrangebyscore(
                table_name,
                begin_ts,
                end_ts,
                withscores=False,
            )
        ]

    def _read_docs_from_list(
        self,
        table: str,
        begin_ix: Optional[int] = 0,
        end_ix: Optional[int] = -1,
        debug: bool = False,
    ):
        """
        Read a list of documents from a "table" (generator).

        Parameters
        ----------
        table: str
            The "table" (root key) from which to read docs.

        begin_ix: Optional[int], default 0
            If provided, only read documents from this starting index.

        end_ix: Optional[int], default -1
            If provided, only read documents up to (not including) this index.

        Returns
        -------
        A generator of documents (all keys and values decoded as UTF-8).
        """
        if begin_ix is None:
            begin_ix = 0

        # An exclusive end of 0 selects nothing: yield an empty generator.
        if end_ix == 0:
            return

        if end_ix is None:
            end_ix = -1
        else:
            # LRANGE's end is inclusive; convert from exclusive.
            end_ix -= 1

        table_name = self.quote_table(table)
        doc_keys = self.client.lrange(table_name, begin_ix, end_ix)
        for doc_key in doc_keys:
            yield {
                key.decode('utf-8'): value.decode('utf-8')
                for key, value in self.client.hgetall(doc_key).items()
            }

    def drop_table(self, table: str, debug: bool = False) -> None:
        """
        Drop a "table" of documents, including its datetime-column metadata key.

        Parameters
        ----------
        table: str
            The "table" name (root key) to be deleted.
        """
        table_name = self.quote_table(table)
        datetime_column_key = self.get_datetime_column_key(table)
        self.client.delete(table_name)
        self.client.delete(datetime_column_key)

    @classmethod
    def get_entity_key(cls, *keys: Any) -> str:
        """
        Return a joined key to set an entity.

        Raises
        ------
        ValueError
            If no keys are given or any key contains the separator.
        """
        if not keys:
            raise ValueError("No keys to be joined.")

        for key in keys:
            if cls.KEY_SEPARATOR in str(key):
                raise ValueError(f"Key cannot contain separator '{cls.KEY_SEPARATOR}'.")

        return cls.KEY_SEPARATOR.join([str(key) for key in keys])
Manage a Valkey instance.

Build a `ValkeyConnector` from connection attributes or a URI string.
@property
def client(self):
    """
    Return the Valkey client.
    """
    # Serve the cached client when one has already been built.
    try:
        return self.__dict__['_client']
    except KeyError:
        pass

    valkey = mrsm.attempt_import('valkey')

    # A URI attribute takes precedence over individual connection attributes.
    if 'uri' in self.__dict__:
        self._client = valkey.Valkey.from_url(self.__dict__.get('uri'))
        return self._client

    conn_kwargs = {'host': self.host}
    for attr in self.OPTIONAL_ATTRIBUTES:
        if attr in self.__dict__:
            conn_kwargs[attr] = self.__dict__.get(attr)

    self._client = valkey.Valkey(**conn_kwargs)
    return self._client
Return the Valkey client.
@property
def URI(self) -> str:
    """
    Return the connection URI for this connector.
    """
    import urllib.parse

    attrs = self.__dict__
    if 'uri' in attrs:
        return attrs.get('uri')

    fragments = ["valkey://"]
    if 'username' in attrs:
        fragments.append(urllib.parse.quote_plus(self.username) + ':')

    if 'password' in attrs:
        fragments.append(urllib.parse.quote_plus(self.password) + '@')

    if 'host' in attrs:
        fragments.append(self.host)

    if 'port' in attrs:
        fragments.append(f':{self.port}')

    if 'db' in attrs:
        fragments.append(f"/{self.db}")

    if 'socket_timeout' in attrs:
        fragments.append(f"?timeout={self.socket_timeout}s")

    return ''.join(fragments)
Return the connection URI for this connector.
def set(self, key: str, value: Any, **kwargs: Any) -> None:
    """
    Set the `key` to `value`, forwarding extra kwargs to the client.
    """
    client = self.client
    return client.set(key, value, **kwargs)
Set the `key` to `value`.
def get(self, key: str) -> Union[str, None]:
    """
    Get the value for `key`, or `None` when unset.
    """
    raw_value = self.client.get(key)
    return raw_value.decode('utf-8') if raw_value is not None else None
Get the value for `key`.
def test_connection(self) -> bool:
    """
    Return whether a connection may be established.
    """
    client = self.client
    return client.ping()
Return whether a connection may be established.
@classmethod
def quote_table(cls, table: str) -> str:
    """
    Return a quoted key.
    """
    quoted_key = shlex.quote(table)
    return quoted_key
Return a quoted key.
@classmethod
def get_counter_key(cls, table: str) -> str:
    """
    Return the counter key for a given table.
    """
    return cls.quote_table(table) + ':counter'
Return the counter key for a given table.
def push_df(
    self,
    df: 'pd.DataFrame',
    table: str,
    datetime_column: Optional[str] = None,
    debug: bool = False,
) -> int:
    """
    Append a pandas DataFrame to a table.

    Parameters
    ----------
    df: pd.DataFrame
        The pandas DataFrame to append to the table.

    table: str
        The "table" name (root key).

    datetime_column: Optional[str], default None
        If provided, use this key as the datetime index.

    Returns
    -------
    The current index counter value (how many docs have been pushed).
    """
    from meerschaum.utils.dataframe import to_json
    # Round-trip through JSON to normalize the frame into plain documents.
    serialized_docs = json.loads(to_json(df))
    return self.push_docs(
        serialized_docs,
        table,
        datetime_column=datetime_column,
        debug=debug,
    )
Append a pandas DataFrame to a table.
Parameters
- df (pd.DataFrame): The pandas DataFrame to append to the table.
- table (str): The "table" name (root key).
- datetime_column (Optional[str], default None): If provided, use this key as the datetime index.
Returns
- The current index counter value (how many docs have been pushed).
def push_docs(
    self,
    docs: List[Dict[str, Any]],
    table: str,
    datetime_column: Optional[str] = None,
    debug: bool = False,
) -> int:
    """
    Append a list of documents to a table.

    Parameters
    ----------
    docs: List[Dict[str, Any]]
        The docs to be pushed.
        All keys and values will be coerced into strings.

    table: str
        The "table" name (root key).

    datetime_column: Optional[str], default None
        If set, create a sorted set with this datetime column as the index.
        Otherwise push the docs to a list.

    Returns
    -------
    The number of new documents added (the change in the table's cardinality;
    duplicate documents do not increase it).
    """
    from meerschaum.utils.dtypes import json_serialize_value
    table_name = self.quote_table(table)
    datetime_column_key = self.get_datetime_column_key(table)
    # A previously stored datetime column takes effect when none is given.
    remote_datetime_column = self.get(datetime_column_key)
    datetime_column = datetime_column or remote_datetime_column
    dateutil_parser = mrsm.attempt_import('dateutil.parser')

    old_len = (
        self.client.zcard(table_name)
        if datetime_column
        else self.client.scard(table_name)
    )
    for doc in docs:
        # Docs missing the datetime value are scored 0.
        original_dt_val = (
            doc[datetime_column]
            if datetime_column and datetime_column in doc
            else 0
        )
        dt_val = (
            dateutil_parser.parse(str(original_dt_val))
            if not isinstance(original_dt_val, int)
            else int(original_dt_val)
        ) if datetime_column else None
        # Naive datetimes are treated as UTC when computing the score.
        ts = (
            int(dt_val.replace(tzinfo=timezone.utc).timestamp())
            if isinstance(dt_val, datetime)
            else int(dt_val)
        ) if datetime_column else None
        # Canonical serialization (sorted keys, no whitespace) so that equal
        # docs map to equal set/zset members.
        doc_str = json.dumps(
            doc,
            default=json_serialize_value,
            separators=(',', ':'),
            sort_keys=True,
        )
        if datetime_column:
            self.client.zadd(table_name, {doc_str: ts})
        else:
            self.client.sadd(table_name, doc_str)

    if datetime_column:
        self.set(datetime_column_key, datetime_column)
    new_len = (
        self.client.zcard(table_name)
        if datetime_column
        else self.client.scard(table_name)
    )

    return new_len - old_len
Append a list of documents to a table.
Parameters
- docs (List[Dict[str, Any]]): The docs to be pushed. All keys and values will be coerced into strings.
- table (str): The "table" name (root key).
- datetime_column (Optional[str], default None): If set, create a sorted set with this datetime column as the index. Otherwise push the docs to a list.
Returns
- The current index counter value (how many docs have been pushed).
def get_datetime_column_key(self, table: str) -> str:
    """
    Return the key to store the datetime index for `table`.
    """
    return self.quote_table(table) + ':datetime_column'
Return the key to store the datetime index for `table`.
def read(
    self,
    table: str,
    begin: Union[datetime, int, str, None] = None,
    end: Union[datetime, int, str, None] = None,
    params: Optional[Dict[str, Any]] = None,
    datetime_column: Optional[str] = None,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    debug: bool = False
) -> Union['pd.DataFrame', None]:
    """
    Query the table and return the result dataframe.

    Parameters
    ----------
    table: str
        The "table" name to be queried.

    begin: Union[datetime, int, str, None], default None
        If provided, only return rows greater than or equal to this datetime.

    end: Union[datetime, int, str, None], default None
        If provided, only return rows older than this datetime.

    params: Optional[Dict[str, Any]]
        Additional Meerschaum filter parameters.

    datetime_column: Optional[str], default None
        If provided, use this column for the datetime index.
        Otherwise infer from the table metadata.

    select_columns: Optional[List[str]], default None
        If provided, only return these columns.

    omit_columns: Optional[List[str]], default None
        If provided, do not include these columns in the result.

    Returns
    -------
    A Pandas DataFrame of the result, or `None`.
    """
    from meerschaum.utils.dataframe import parse_df_datetimes, query_df
    docs = self.read_docs(
        table,
        begin=begin,
        end=end,
        debug=debug,
    )
    df = parse_df_datetimes(docs)
    datetime_column_key = self.get_datetime_column_key(table)
    datetime_column = datetime_column or self.get(datetime_column_key)

    # Re-apply `begin`/`end` client-side only when a datetime column is known.
    return query_df(
        df,
        begin=(begin if datetime_column is not None else None),
        end=(end if datetime_column is not None else None),
        params=params,
        datetime_column=datetime_column,
        select_columns=select_columns,
        omit_columns=omit_columns,
        inplace=True,
        reset_index=True,
        debug=debug,
    )
Query the table and return the result dataframe.
Parameters
- table (str): The "table" name to be queried.
- begin (Union[datetime, int, str, None], default None): If provided, only return rows greater than or equal to this datetime.
- end (Union[datetime, int, str, None], default None): If provided, only return rows older than this datetime.
- params (Optional[Dict[str, Any]]): Additional Meerschaum filter parameters.
- datetime_column (Optional[str], default None): If provided, use this column for the datetime index. Otherwise infer from the table metadata.
- select_columns (Optional[List[str]], default None): If provided, only return these columns.
- omit_columns (Optional[List[str]], default None): If provided, do not include these columns in the result.
Returns
- A Pandas DataFrame of the result, or `None`.
def read_docs(
    self,
    table: str,
    begin: Union[datetime, int, str, None] = None,
    end: Union[datetime, int, str, None] = None,
    debug: bool = False,
) -> List[Dict[str, str]]:
    """
    Return a list of previously pushed docs.

    Parameters
    ----------
    table: str
        The "table" name (root key) under which the docs were pushed.

    begin: Union[datetime, int, str, None], default None
        If provided and the table was created with a datetime index, only return documents
        newer than this datetime.
        If the table was not created with a datetime index and `begin` is an `int`,
        return documents with a positional index greater than or equal to this value.

    end: Union[datetime, int, str, None], default None
        If provided and the table was created with a datetime index, only return documents
        older than this datetime.
        If the table was not created with a datetime index and `end` is an `int`,
        return documents with a positional index less than this value.

    Returns
    -------
    A list of dictionaries, where all keys and values are strings.
    """
    from meerschaum.utils.dtypes import coerce_timezone
    table_name = self.quote_table(table)
    datetime_column_key = self.get_datetime_column_key(table)
    datetime_column = self.get(datetime_column_key)

    if debug:
        dprint(f"Reading documents from '{table}' with {begin=}, {end=}")

    # Without a datetime index, the table is a plain set: return all members.
    if not datetime_column:
        return [
            json.loads(doc_bytes.decode('utf-8'))
            for doc_bytes in self.client.smembers(table_name)
        ]

    dateutil_parser = mrsm.attempt_import('dateutil.parser')

    if isinstance(begin, str):
        begin = coerce_timezone(dateutil_parser.parse(begin))

    if isinstance(end, str):
        end = coerce_timezone(dateutil_parser.parse(end))

    # Naive datetimes are treated as UTC; open bounds use +/-inf scores.
    begin_ts = (
        (
            int(begin.replace(tzinfo=timezone.utc).timestamp())
            if isinstance(begin, datetime)
            else int(begin)
        )
        if begin is not None else '-inf'
    )
    end_ts = (
        (
            int(end.replace(tzinfo=timezone.utc).timestamp())
            if isinstance(end, datetime)
            else int(end)
        )
        if end is not None else '+inf'
    )

    if debug:
        dprint(f"Reading documents with {begin_ts=}, {end_ts=}")

    # NOTE(review): zrangebyscore bounds are inclusive, so `end` behaves
    # inclusively at one-second resolution — confirm against the docstring.
    return [
        json.loads(doc_bytes.decode('utf-8'))
        for doc_bytes in self.client.zrangebyscore(
            table_name,
            begin_ts,
            end_ts,
            withscores=False,
        )
    ]
Return a list of previously pushed docs.
Parameters
- table (str): The "table" name (root key) under which the docs were pushed.
- begin (Union[datetime, int, str, None], default None): If provided and the table was created with a datetime index, only return documents newer than this datetime. If the table was not created with a datetime index and `begin` is an `int`, return documents with a positional index greater than or equal to this value.
- end (Union[datetime, int, str, None], default None): If provided and the table was created with a datetime index, only return documents older than this datetime. If the table was not created with a datetime index and `end` is an `int`, return documents with a positional index less than this value.
Returns
- A list of dictionaries, where all keys and values are strings.
def drop_table(self, table: str, debug: bool = False) -> None:
    """
    Drop a "table" of documents.

    Parameters
    ----------
    table: str
        The "table" name (root key) to be deleted.
    """
    # Remove the table itself, then its datetime-column metadata key.
    doomed_keys = (
        self.quote_table(table),
        self.get_datetime_column_key(table),
    )
    for doomed_key in doomed_keys:
        self.client.delete(doomed_key)
Drop a "table" of documents.
Parameters
- table (str): The "table" name (root key) to be deleted.
@classmethod
def get_entity_key(cls, *keys: Any) -> str:
    """
    Return a joined key to set an entity.

    Raises
    ------
    ValueError
        If no keys are given or any key contains the separator.
    """
    if not keys:
        raise ValueError("No keys to be joined.")

    parts = [str(key) for key in keys]
    for part in parts:
        if cls.KEY_SEPARATOR in part:
            raise ValueError(f"Key cannot contain separator '{cls.KEY_SEPARATOR}'.")

    return cls.KEY_SEPARATOR.join(parts)
Return a joined key to set an entity.
def register_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> SuccessTuple:
    """
    Insert the pipe's attributes into the internal `pipes` table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be registered.

    Returns
    -------
    A `SuccessTuple` of the result.
    """
    attributes = {
        'connector_keys': str(pipe.connector_keys),
        'metric_key': str(pipe.metric_key),
        'location_key': str(pipe.location_key),
    }
    parameters_str = json.dumps(
        pipe._attributes.get('parameters', {}),
        separators=(',', ':'),
    )

    pipe_key = get_pipe_key(pipe)
    parameters_key = get_pipe_parameters_key(pipe)

    try:
        # A stored pipe_id means this pipe was registered before.
        existing_pipe_id = self.get(pipe_key)
        if existing_pipe_id is not None:
            return False, f"{pipe} is already registered."

        # Allocate a new id from the shared counter, then persist the
        # registration doc, id lookup, and serialized parameters.
        pipe_id = self.client.incr(PIPES_COUNTER)
        _ = self.push_docs(
            [{'pipe_id': pipe_id, **attributes}],
            PIPES_TABLE,
            datetime_column='pipe_id',
            debug=debug,
        )
        self.set(pipe_key, pipe_id)
        self.set(parameters_key, parameters_str)

    except Exception as e:
        return False, f"Failed to register {pipe}:\n{e}"

    return True, "Success"
Insert the pipe's attributes into the internal `pipes` table.
Parameters
- pipe (mrsm.Pipe): The pipe to be registered.
Returns
- A `SuccessTuple` of the result.
def get_pipe_id(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> Union[str, int, None]:
    """
    Return the `_id` for the pipe if it exists.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose `_id` to fetch.

    Returns
    -------
    The `_id` for the pipe's document or `None`.
    """
    pipe_key = get_pipe_key(pipe)
    # `self.get()` returns None for unregistered pipes (TypeError on int())
    # and a stored value may be non-numeric (ValueError). Only those two
    # conditions mean "no id"; any other exception (e.g. a connection
    # failure) is a real error and should propagate instead of being
    # silently swallowed as it was before.
    try:
        return int(self.get(pipe_key))
    except (TypeError, ValueError):
        return None
Return the `_id` for the pipe if it exists.
Parameters
- pipe (mrsm.Pipe): The pipe whose `_id` to fetch.
Returns
- The `_id` for the pipe's document, or `None`.
def get_pipe_attributes(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> Dict[str, Any]:
    """
    Return the pipe's document from the internal `pipes` collection.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose attributes should be retrieved.

    Returns
    -------
    The document that matches the keys of the pipe.
    """
    pipe_id = pipe.get_id(debug=debug)
    if pipe_id is None:
        return {}

    # Parameters are stored as a serialized JSON string under their own key.
    stored_parameters = self.get(get_pipe_parameters_key(pipe))
    parameters = json.loads(stored_parameters) if stored_parameters else {}

    return {
        'connector_keys': pipe.connector_keys,
        'metric_key': pipe.metric_key,
        'location_key': pipe.location_key,
        'parameters': parameters,
        'pipe_id': pipe_id,
    }
Return the pipe's document from the internal pipes
collection.
Parameters
- pipe (mrsm.Pipe): The pipe whose attributes should be retrieved.
Returns
- The document that matches the keys of the pipe.
def edit_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Edit the attributes of the pipe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose in-memory parameters must be persisted.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    if pipe.get_id(debug=debug) is None:
        return False, f"{pipe} is not registered."

    # Persist the in-memory parameters as compact JSON.
    serialized_parameters = json.dumps(pipe.parameters, separators=(',', ':'))
    self.set(get_pipe_parameters_key(pipe), serialized_parameters)
    return True, "Success"
Edit the attributes of the pipe.
Parameters
- pipe (mrsm.Pipe): The pipe whose in-memory parameters must be persisted.
Returns
- A `SuccessTuple` indicating success.
def pipe_exists(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> bool:
    """
    Check whether a pipe's target table exists.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to check whether its table exists.

    Returns
    -------
    A `bool` indicating the table exists.
    """
    target_key = self.quote_table(pipe.target)
    num_keys = self.client.exists(target_key)
    return num_keys != 0
Check whether a pipe's target table exists.
Parameters
- pipe (mrsm.Pipe): The pipe to check whether its table exists.
Returns
- A `bool` indicating whether the table exists.
def drop_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Drop a pipe's collection if it exists.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be dropped.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    # Clear chunk-by-chunk first; abort on the first failed chunk.
    for chunk_begin, chunk_end in pipe.get_chunk_bounds(debug=debug):
        clear_chunk_success, clear_chunk_msg = pipe.clear(
            begin=chunk_begin,
            end=chunk_end,
            debug=debug,
        )
        if not clear_chunk_success:
            return clear_chunk_success, clear_chunk_msg
    try:
        self.drop_table(pipe.target, debug=debug)
    except Exception as e:
        return False, f"Failed to drop {pipe}:\n{e}"

    if 'valkey' not in pipe.parameters:
        return True, "Success"

    # Reset the cached dtypes and persist the change for registered pipes.
    pipe.parameters['valkey']['dtypes'] = {}
    if not pipe.temporary:
        edit_success, edit_msg = pipe.edit(debug=debug)
        if not edit_success:
            return edit_success, edit_msg

    return True, "Success"
Drop a pipe's collection if it exists.
Parameters
- pipe (mrsm.Pipe): The pipe to be dropped.
Returns
- A
SuccessTuple
indicating success.
def delete_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Delete a pipe's registration from the `pipes` collection.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be deleted.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    # Drop the pipe's data before removing its registration.
    drop_success, drop_message = pipe.drop(debug=debug)
    if not drop_success:
        return drop_success, drop_message

    pipe_id = self.get_pipe_id(pipe, debug=debug)
    if pipe_id is None:
        return False, f"{pipe} is not registered."

    pipe_key = get_pipe_key(pipe)
    parameters_key = get_pipe_parameters_key(pipe)
    self.client.delete(pipe_key)
    self.client.delete(parameters_key)
    # The registration document must be re-serialized exactly as stored
    # (sorted keys, compact separators) for `zrem` to match the member.
    df = self.read(PIPES_TABLE, params={'pipe_id': pipe_id})
    docs = df.to_dict(orient='records')
    if docs:
        doc = docs[0]
        doc_str = json.dumps(
            doc,
            default=json_serialize_value,
            separators=(',', ':'),
            sort_keys=True,
        )
        self.client.zrem(PIPES_TABLE, doc_str)
    return True, "Success"
Delete a pipe's registration from the pipes
collection.
Parameters
- pipe (mrsm.Pipe): The pipe to be deleted.
Returns
- A
SuccessTuple
indicating success.
def get_pipe_data(
    self,
    pipe: mrsm.Pipe,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kwargs: Any
) -> Union['pd.DataFrame', None]:
    """
    Query a pipe's target table and return the DataFrame.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe with the target table from which to read.

    select_columns: Optional[List[str]], default None
        If provided, only select these given columns.
        Otherwise select all available columns (i.e. `SELECT *`).

    omit_columns: Optional[List[str]], default None
        If provided, remove these columns from the selection.

    begin: Union[datetime, int, None], default None
        The earliest `datetime` value to search from (inclusive).

    end: Union[datetime, int, None], default None
        The latest `datetime` value to search from (exclusive).

    params: Optional[Dict[str, str]], default None
        Additional filters to apply to the query.

    Returns
    -------
    The target table's data as a DataFrame.
    """
    if not pipe.exists(debug=debug):
        return None

    from meerschaum.utils.dataframe import query_df, parse_df_datetimes
    from meerschaum.utils.dtypes import are_dtypes_equal

    valkey_dtypes = pipe.parameters.get('valkey', {}).get('dtypes', {})
    dt_col = pipe.columns.get('datetime', None)
    table_name = self.quote_table(pipe.target)
    indices = [col for col in pipe.columns.values() if col]
    # The index set stores escaped `ix` strings; un-escape the COLON token
    # and parse each back into a per-document index dictionary.
    ix_docs = [
        string_to_dict(doc.get('ix', '').replace(COLON, ':'))
        for doc in self.read_docs(
            pipe.target,
            begin=begin,
            end=end,
            debug=debug,
        )
    ]
    try:
        # Each index dict maps to the key holding the full document payload.
        docs_strings = [
            self.get(
                self.get_document_key(
                    doc,
                    indices,
                    table_name,
                )
            )
            for doc in ix_docs
        ]
    except Exception as e:
        warn(f"Failed to fetch documents for {pipe}:\n{e}")
        docs_strings = []

    docs = [
        json.loads(doc_str)
        for doc_str in docs_strings
        if doc_str
    ]
    # Only attempt datetime parsing on columns declared as datetimes.
    ignore_dt_cols = [
        col
        for col, dtype in pipe.dtypes.items()
        if not are_dtypes_equal(str(dtype), 'datetime')
    ]

    df = parse_df_datetimes(
        docs,
        ignore_cols=ignore_dt_cols,
        chunksize=kwargs.get('chunksize', None),
        strip_timezone=(pipe.tzinfo is None),
        debug=debug,
    )
    # Best-effort cast back to the dtypes recorded at sync time.
    for col, typ in valkey_dtypes.items():
        try:
            df[col] = df[col].astype(typ)
        except Exception:
            pass

    df = pipe.enforce_dtypes(df, debug=debug)

    if len(df) == 0:
        return query_df(df, select_columns=select_columns, omit_columns=omit_columns)

    return query_df(
        df,
        select_columns=select_columns,
        omit_columns=omit_columns,
        params=params,
        begin=begin,
        end=end,
        datetime_column=dt_col,
        inplace=True,
        reset_index=True,
    )
Query a pipe's target table and return the DataFrame.
Parameters
- pipe (mrsm.Pipe): The pipe with the target table from which to read.
- select_columns (Optional[List[str]], default None):
If provided, only select these given columns.
Otherwise select all available columns (i.e.
SELECT *
). - omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
- begin (Union[datetime, int, None], default None):
The earliest
datetime
value to search from (inclusive). - end (Union[datetime, int, None], default None):
The latest
datetime
value to search from (exclusive). - params (Optional[Dict[str, str]], default None): Additional filters to apply to the query.
Returns
- The target table's data as a DataFrame.
def sync_pipe(
    self,
    pipe: mrsm.Pipe,
    df: 'pd.DataFrame' = None,
    check_existing: bool = True,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Upsert new documents into the pipe's collection.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose collection should receive the new documents.

    df: Union['pd.DataFrame', Iterator['pd.DataFrame']], default None
        The data to be synced.

    check_existing: bool, default True
        If `False`, do not check the documents against existing data and instead insert directly.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    from meerschaum.utils.dtypes import are_dtypes_equal
    dt_col = pipe.columns.get('datetime', None)
    indices = [col for col in pipe.columns.values() if col]
    table_name = self.quote_table(pipe.target)
    # Materialize Dask frames before processing.
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = df.compute()
    upsert = pipe.parameters.get('upsert', False)
    static = pipe.parameters.get('static', False)

    def _serialize_indices_docs(_docs):
        # Build the membership documents for the index set: the escaped
        # index string plus the datetime value (when a datetime axis exists).
        return [
            {
                'ix': self.get_document_key(doc, indices),
                **(
                    {
                        dt_col: doc.get(dt_col, 0)
                    }
                    if dt_col
                    else {}
                )
            }
            for doc in _docs
        ]

    valkey_dtypes = pipe.parameters.get('valkey', {}).get('dtypes', {})
    # Record dtypes for columns not yet seen; datetimes are normalized to UTC.
    new_dtypes = {
        str(key): (
            str(val)
            if not are_dtypes_equal(str(val), 'datetime')
            else 'datetime64[ns, UTC]'
        )
        for key, val in df.dtypes.items()
        if str(key) not in valkey_dtypes
    }
    # Cast incoming columns to the recorded dtypes, falling back to string
    # on conflict (iterate over a copy because the dict is mutated).
    for col, typ in {c: v for c, v in valkey_dtypes.items()}.items():
        if col in df.columns:
            try:
                df[col] = df[col].astype(typ)
            except Exception:
                valkey_dtypes[col] = 'string'
                new_dtypes[col] = 'string'
                df[col] = df[col].astype('string')

    if new_dtypes and (not static or not valkey_dtypes):
        valkey_dtypes.update(new_dtypes)
        if 'valkey' not in pipe.parameters:
            pipe.parameters['valkey'] = {}
        pipe.parameters['valkey']['dtypes'] = valkey_dtypes
        if not pipe.temporary:
            edit_success, edit_msg = pipe.edit(debug=debug)
            if not edit_success:
                return edit_success, edit_msg

    # Split the incoming data into brand-new rows and updates to existing rows.
    unseen_df, update_df, delta_df = (
        pipe.filter_existing(df, include_unchanged_columns=True, debug=debug)
        if check_existing and not upsert
        else (None, df, df)
    )
    num_insert = len(unseen_df) if unseen_df is not None else 0
    num_update = len(update_df) if update_df is not None else 0
    msg = (
        f"Inserted {num_insert}, updated {num_update} rows."
        if not upsert
        else f"Upserted {num_update} rows."
    )
    if len(delta_df) == 0:
        return True, msg

    unseen_docs = unseen_df.to_dict(orient='records') if unseen_df is not None else []
    unseen_indices_docs = _serialize_indices_docs(unseen_docs)
    unseen_ix_vals = {
        self.get_document_key(doc, indices, table_name): serialize_document(doc)
        for doc in unseen_docs
    }
    for key, val in unseen_ix_vals.items():
        try:
            self.set(key, val)
        except Exception as e:
            return False, f"Failed to set keys for {pipe}:\n{e}"

    try:
        self.push_docs(
            unseen_indices_docs,
            pipe.target,
            datetime_column=dt_col,
            debug=debug,
        )
    except Exception as e:
        return False, f"Failed to push docs to '{pipe.target}':\n{e}"

    update_docs = update_df.to_dict(orient='records') if update_df is not None else []
    update_ix_docs = {
        self.get_document_key(doc, indices, table_name): doc
        for doc in update_docs
    }
    existing_docs_data = {
        key: self.get(key)
        for key in update_ix_docs
    } if pipe.exists(debug=debug) else {}
    existing_docs = {
        key: json.loads(data)
        for key, data in existing_docs_data.items()
        if data
    }
    # Updates whose keys are not yet present are treated as fresh inserts.
    new_update_docs = {
        key: doc
        for key, doc in update_ix_docs.items()
        if key not in existing_docs
    }
    new_ix_vals = {
        self.get_document_key(doc, indices, table_name): serialize_document(doc)
        for doc in new_update_docs.values()
    }
    for key, val in new_ix_vals.items():
        try:
            self.set(key, val)
        except Exception as e:
            return False, f"Failed to set keys for {pipe}:\n{e}"

    # Merge updates into the existing documents before rewriting them.
    old_update_docs = {
        key: {
            **existing_docs[key],
            **doc
        }
        for key, doc in update_ix_docs.items()
        if key in existing_docs
    }
    new_indices_docs = _serialize_indices_docs([doc for doc in new_update_docs.values()])
    try:
        if new_indices_docs:
            self.push_docs(
                new_indices_docs,
                pipe.target,
                datetime_column=dt_col,
                debug=debug,
            )
    except Exception as e:
        return False, f"Failed to upsert '{pipe.target}':\n{e}"

    for key, doc in old_update_docs.items():
        try:
            self.set(key, serialize_document(doc))
        except Exception as e:
            return False, f"Failed to set keys for {pipe}:\n{e}"

    return True, msg
Upsert new documents into the pipe's collection.
Parameters
- pipe (mrsm.Pipe): The pipe whose collection should receive the new documents.
- df (Union['pd.DataFrame', Iterator['pd.DataFrame']], default None): The data to be synced.
- check_existing (bool, default True):
If
False
, do not check the documents against existing data and instead insert directly.
Returns
- A
SuccessTuple
indicating success.
def get_pipe_columns_types(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> Dict[str, str]:
    """
    Return the data types for the columns in the target table for data type enforcement.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table contains columns and data types.

    Returns
    -------
    A dictionary mapping columns to data types.
    """
    if not pipe.exists(debug=debug):
        return {}

    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
    stored_dtypes = pipe.parameters.get('valkey', {}).get('dtypes', {})
    columns_types = {}
    for col, typ in stored_dtypes.items():
        columns_types[col] = get_db_type_from_pd_type(typ, flavor='postgresql')
    return columns_types
Return the data types for the columns in the target table for data type enforcement.
Parameters
- pipe (mrsm.Pipe): The pipe whose target table contains columns and data types.
Returns
- A dictionary mapping columns to data types.
def clear_pipe(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
) -> mrsm.SuccessTuple:
    """
    Delete rows within `begin`, `end`, and `params`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose rows to clear.

    begin: Union[datetime, int, None], default None
        If provided, remove rows >= `begin`.

    end: Union[datetime, int, None], default None
        If provided, remove rows < `end`.

    params: Optional[Dict[str, Any]], default None
        If provided, only remove rows which match the `params` filter.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    dt_col = pipe.columns.get('datetime', None)

    # Fetch the matching rows first so their index keys can be reconstructed.
    existing_df = pipe.get_data(
        begin=begin,
        end=end,
        params=params,
        debug=debug,
    )
    if existing_df is None or len(existing_df) == 0:
        return True, "Deleted 0 rows."

    docs = existing_df.to_dict(orient='records')
    table_name = self.quote_table(pipe.target)
    indices = [col for col in pipe.columns.values() if col]
    for doc in docs:
        set_doc_key = self.get_document_key(doc, indices)
        table_doc_key = self.get_document_key(doc, indices, table_name)
        try:
            # Pipes with a datetime axis store index members in a sorted set
            # (`zrem`); otherwise a plain set (`srem`) is used.
            if dt_col:
                self.client.zrem(table_name, set_doc_key)
            else:
                self.client.srem(table_name, set_doc_key)
            self.client.delete(table_doc_key)
        except Exception as e:
            return False, f"Failed to delete documents:\n{e}"
    msg = (
        f"Deleted {len(docs)} row"
        + ('s' if len(docs) != 1 else '')
        + '.'
    )
    return True, msg
Delete rows within begin
, end
, and params
.
Parameters
- pipe (mrsm.Pipe): The pipe whose rows to clear.
- begin (Union[datetime, int, None], default None):
If provided, remove rows >=
begin
. - end (Union[datetime, int, None], default None):
If provided, remove rows <
end
. - params (Optional[Dict[str, Any]], default None):
If provided, only remove rows which match the
params
filter.
Returns
- A
SuccessTuple
indicating success.
def get_sync_time(
    self,
    pipe: mrsm.Pipe,
    newest: bool = True,
    **kwargs: Any
) -> Union[datetime, int, None]:
    """
    Return the newest (or oldest) timestamp in a pipe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose sync time to fetch.

    newest: bool, default True
        If `True`, return the largest timestamp; otherwise return the smallest.

    Returns
    -------
    The sync time as a `datetime` (or `int` for integer datetime axes),
    or `None` if it cannot be determined.
    """
    from meerschaum.utils.dtypes import are_dtypes_equal
    dt_col = pipe.columns.get('datetime', None)
    dt_typ = pipe.dtypes.get(dt_col, 'datetime64[ns, UTC]')
    if not dt_col:
        return None

    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    table_name = self.quote_table(pipe.target)
    try:
        # The index set is sorted by datetime score, so the first element of
        # a reverse (or forward) range is the newest (or oldest) document.
        vals = (
            self.client.zrevrange(table_name, 0, 0)
            if newest
            else self.client.zrange(table_name, 0, 0)
        )
        if not vals:
            return None
        val = vals[0]
    except Exception:
        return None

    doc = json.loads(val)
    dt_val = doc.get(dt_col, None)
    if dt_val is None:
        return None

    try:
        return (
            int(dt_val)
            if are_dtypes_equal(dt_typ, 'int')
            else dateutil_parser.parse(str(dt_val))
        )
    except Exception as e:
        warn(f"Failed to parse sync time for {pipe}:\n{e}")

    return None
Return the newest (or oldest) timestamp in a pipe.
def get_pipe_rowcount(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kwargs: Any
) -> Union[int, None]:
    """
    Return the number of documents in the pipe's set.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose rows to count.

    begin: Union[datetime, int, None], default None
        If provided, only count rows within the bounds (see `Pipe.get_data()`).

    end: Union[datetime, int, None], default None
        If provided, only count rows within the bounds (see `Pipe.get_data()`).

    params: Optional[Dict[str, Any]], default None
        If provided, only count rows which match the `params` filter.

    Returns
    -------
    The rowcount as an `int`, or `None` if it could not be determined.
    """
    dt_col = pipe.columns.get('datetime', None)
    table_name = self.quote_table(pipe.target)

    # Propagate `debug` (the original called `pipe.exists()` without it,
    # unlike every sibling method in this module).
    if not pipe.exists(debug=debug):
        return 0

    try:
        # Unfiltered counts may be read directly from the set's cardinality.
        if begin is None and end is None and not params:
            return (
                self.client.zcard(table_name)
                if dt_col
                else self.client.scard(table_name)
            )
    except Exception as e:
        if debug:
            dprint(f"Failed to get rowcount for {pipe}:\n{e}")
        return None

    # Filtered counts require materializing the matching rows.
    df = pipe.get_data(begin=begin, end=end, params=params, debug=debug)
    if df is None:
        return 0

    return len(df)
Return the number of documents in the pipe's set.
def fetch_pipes_keys(
    self,
    connector_keys: Optional[List[str]] = None,
    metric_keys: Optional[List[str]] = None,
    location_keys: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False
) -> Optional[List[Tuple[str, str, Optional[str]]]]:
    """
    Return the keys for the registered pipes.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        If provided, only return pipes with these connector keys.

    metric_keys: Optional[List[str]], default None
        If provided, only return pipes with these metric keys.

    location_keys: Optional[List[str]], default None
        If provided, only return pipes with these location keys.

    tags: Optional[List[str]], default None
        If provided, only return pipes whose tags satisfy every tag group
        (comma-separated tags within an element form one group).

    params: Optional[Dict[str, Any]], default None
        Additional filters to apply to the registration documents.

    Returns
    -------
    A list of `(connector_keys, metric_key, location_key)` tuples.
    """
    from meerschaum.utils.dataframe import query_df
    from meerschaum.utils.misc import separate_negation_values
    try:
        df = self.read(PIPES_TABLE, debug=debug)
    except Exception:
        return []

    if df is None or len(df) == 0:
        return []

    query = {}
    if connector_keys:
        query['connector_keys'] = [str(k) for k in connector_keys]
    if metric_keys:
        query['metric_key'] = [str(k) for k in metric_keys]
    if location_keys:
        query['location_key'] = [str(k) for k in location_keys]
    if params:
        query.update(params)

    df = query_df(df, query, inplace=True)

    keys = [
        (
            doc['connector_keys'],
            doc['metric_key'],
            doc['location_key'],
        )
        for doc in df.to_dict(orient='records')
    ]
    if not tags:
        return keys

    tag_groups = [tag.split(',') for tag in tags]
    in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups]

    filtered_keys = []
    for ck, mk, lk in keys:
        pipe = mrsm.Pipe(ck, mk, lk, instance=self)
        pipe_tags = set(pipe.tags)

        include_pipe = True
        for in_tags, ex_tags in in_ex_tag_groups:
            all_in = all(tag in pipe_tags for tag in in_tags)
            any_ex = any(tag in pipe_tags for tag in ex_tags)

            if (not all_in) or any_ex:
                include_pipe = False
                # One failing group excludes the pipe; `include_pipe` can
                # never become `True` again, so skip the remaining groups
                # (the original used `continue` and kept iterating).
                break

        if include_pipe:
            filtered_keys.append((ck, mk, lk))

    return filtered_keys
Return the keys for the registered pipes.
@staticmethod
def get_document_key(
    doc: Dict[str, Any],
    indices: List[str],
    table_name: Optional[str] = None,
) -> str:
    """
    Return a serialized string for a document's indices only.

    Parameters
    ----------
    doc: Dict[str, Any]
        The document containing index values to be serialized.

    indices: List[str]
        The name of the indices to be serialized.

    table_name: Optional[str], default None
        If provided, prepend the table to the key.

    Returns
    -------
    A serialized string of the document's indices.
    """
    from meerschaum.utils.dtypes import coerce_timezone
    # Escape literal colons with the COLON token and collapse datetimes
    # to UTC epoch-second integers so keys are stable and sortable.
    index_vals = {
        key: (
            str(val).replace(':', COLON)
            if not isinstance(val, datetime)
            else str(int(coerce_timezone(val).replace(tzinfo=timezone.utc).timestamp()))
        )
        for key, val in doc.items()
        if ((key in indices) if indices else True)
    }
    # Simplified from a nested `('indices:' if True else '')` expression:
    # the prefix is a constant whenever `table_name` is provided.
    prefix = (table_name + ':indices:') if table_name else ''
    return prefix + ','.join(
        sorted(
            f'{key}{COLON}{val}'
            for key, val in index_vals.items()
        )
    )
Return a serialized string for a document's indices only.
Parameters
- doc (Dict[str, Any]): The document containing index values to be serialized.
- indices (List[str]): The name of the indices to be serialized.
- table_name (Optional[str], default None): If provided, prepend the table to the key.
Returns
- A serialized string of the document's indices.
@classmethod
def get_table_quoted_doc_key(
    cls,
    table_name: str,
    doc: Dict[str, Any],
    indices: List[str],
    datetime_column: Optional[str] = None,
) -> str:
    """
    Return the document string as stored in the underlying set.

    Parameters
    ----------
    table_name: str
        The quoted table name to prepend to the document key.

    doc: Dict[str, Any]
        The document to be serialized.

    indices: List[str]
        The names of the index columns.

    datetime_column: Optional[str], default None
        If provided, include the document's datetime value (defaulting to 0).

    Returns
    -------
    A compact JSON string (sorted keys) mapping the table-qualified document
    key to the serialized document.
    """
    return json.dumps(
        {
            cls.get_document_key(doc, indices, table_name): serialize_document(doc),
            **(
                {datetime_column: doc.get(datetime_column, 0)}
                if datetime_column
                else {}
            )
        },
        sort_keys=True,
        separators=(',', ':'),
        default=json_serialize_value,
    )
Return the document string as stored in the underlying set.
def fetch(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kwargs: Any
) -> List[Dict[str, Any]]:
    """
    Return data from a source database.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose `valkey:key` parameter names the source key to read.

    begin: Union[datetime, int, None], default None
        If provided, the lower score bound passed to `zrangebyscore`
        (only applies to sorted-set keys).

    end: Union[datetime, int, None], default None
        If provided, the upper score bound passed to `zrangebyscore`
        (only applies to sorted-set keys).

    Returns
    -------
    A list of document dictionaries.
    """
    source_key = pipe.parameters.get('valkey', {}).get('key', None)
    if not source_key:
        return []

    try:
        key_type = self.client.type(source_key).decode('utf-8')
    except Exception:
        warn(f"Could not determine the type for key '{source_key}'.")
        return []

    # Datetimes collapse to UTC epoch seconds; unbounded ranges use the
    # Valkey infinity sentinels.
    begin_ts = (
        (
            int(begin.replace(tzinfo=timezone.utc).timestamp())
            if isinstance(begin, datetime)
            else int(begin)
        )
        if begin is not None else '-inf'
    )
    end_ts = (
        (
            int(end.replace(tzinfo=timezone.utc).timestamp())
            if isinstance(end, datetime)
            else int(end)
        )
        if end is not None else '+inf'
    )

    if debug:
        dprint(f"Reading documents with {begin_ts=}, {end_ts=}")

    # Plain sets carry no scores, so the bounds do not apply here.
    if key_type == 'set':
        return [
            json.loads(doc_bytes.decode('utf-8'))
            for doc_bytes in self.client.smembers(source_key)
        ]

    if key_type == 'zset':
        return [
            json.loads(doc_bytes.decode('utf-8'))
            for doc_bytes in self.client.zrangebyscore(
                source_key,
                begin_ts,
                end_ts,
                withscores=False,
            )
        ]

    # Fall back to returning the raw value for scalar key types.
    return [{source_key: self.get(source_key)}]
Return data from a source database.
def get_users_pipe(self):
    """
    Return the temporary in-memory pipe which tracks the registered users.
    """
    return mrsm.Pipe(
        'mrsm', 'users',
        target=USERS_TABLE,
        columns=['user_id'],
        temporary=True,
        instance=self,
    )
Return the pipe which stores the registered users.
@classmethod
def get_user_key(cls, user_id_or_username: str, sub_key: str, by_username: bool = False) -> str:
    """
    Return the key to store metadata about a user.

    Parameters
    ----------
    user_id_or_username: str
        The user ID or username of the given user.
        If `by_username` is `True`, then provide the username.

    sub_key: str
        The key suffix, e.g. `'attributes'`.

    by_username: bool, default False
        If `True`, then treat `user_id_or_username` as a username.

    Returns
    -------
    A key to store information about a user.

    Examples
    --------
    >>> get_user_key('deadbeef', 'attributes')
    'mrsm_user:user_id:deadbeef:attributes'
    >>> get_user_key('foo', 'user_id', by_username=True)
    'mrsm_user:username:foo:user_id'
    """
    key_type = 'username' if by_username else 'user_id'
    return cls.get_entity_key(USER_PREFIX, key_type, user_id_or_username, sub_key)
Return the key to store metadata about a user.
Parameters
- user_id_or_username (str):
The user ID or username of the given user.
If
by_username
is True
, then provide the username. - sub_key (str):
The key suffix, e.g.
'attributes'
. - by_username (bool, default False):
If
True
, then treat user_id_or_username
as a username.
Returns
- A key to store information about a user.
Examples
>>> get_user_key('deadbeef', 'attributes')
'mrsm_user:user_id:deadbeef:attributes'
>>> get_user_key('foo', 'user_id', by_username=True)
'mrsm_user:username:foo:user_id'
@classmethod
def get_user_keys_vals(
    cls,
    user: 'mrsm.core.User',
    mutable_only: bool = False,
) -> Dict[str, str]:
    """
    Return a dictionary containing keys and values to set for the user.

    Parameters
    ----------
    user: mrsm.core.User
        The user for which to generate the keys.

    mutable_only: bool, default False
        If `True`, only return keys which may be edited.

    Returns
    -------
    A dictionary mapping a user's keys to values.
    """
    serialized_attributes = json.dumps(user.attributes, separators=(',', ':'))
    editable_keys_vals = {
        cls.get_user_key(user.user_id, 'attributes'): serialized_attributes,
        cls.get_user_key(user.user_id, 'email'): user.email,
        cls.get_user_key(user.user_id, 'type'): user.type,
        cls.get_user_key(user.user_id, 'password_hash'): user.password_hash,
    }
    if mutable_only:
        return editable_keys_vals

    frozen_keys_vals = {
        cls.get_user_key(user.user_id, 'username'): user.username,
        cls.get_user_key(user.username, 'user_id', by_username=True): user.user_id,
    }
    return {**frozen_keys_vals, **editable_keys_vals}
Return a dictionary containing keys and values to set for the user.
Parameters
- user (mrsm.core.User): The user for which to generate the keys.
- mutable_only (bool, default False):
If
True
, only return keys which may be edited.
Returns
- A dictionary mapping a user's keys to values.
def register_user(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kwargs: Any
) -> SuccessTuple:
    """
    Register a new user.

    Parameters
    ----------
    user: mrsm.core.User
        The user to be registered; a random 12-character user ID is assigned.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    from meerschaum.utils.misc import generate_password

    # User IDs are random strings, not sequential integers.
    user.user_id = generate_password(12)
    users_pipe = self.get_users_pipe()
    keys_vals = self.get_user_keys_vals(user)

    try:
        sync_success, sync_msg = users_pipe.sync(
            [
                {
                    'user_id': user.user_id,
                    'username': user.username,
                },
            ],
            check_existing=False,
            debug=debug,
        )
        if not sync_success:
            return sync_success, sync_msg

        for key, val in keys_vals.items():
            if val is not None:
                self.set(key, val)

        success, msg = True, "Success"
    except Exception as e:
        success = False
        import traceback
        traceback.print_exc()
        msg = f"Failed to register '{user.username}':\n{e}"

    if not success:
        # Best-effort rollback of any keys that were set.
        for key in keys_vals:
            try:
                self.client.delete(key)
            except Exception:
                pass

    return success, msg
Register a new user.
def get_user_id(self, user: 'mrsm.core.User', debug: bool = False) -> Union[str, None]:
    """
    Return the ID for a user, or `None` if the lookup fails.
    """
    lookup_key = self.get_user_key(user.username, 'user_id', by_username=True)
    try:
        return self.get(lookup_key)
    except Exception:
        return None
Return the ID for a user, or None
.
def edit_user(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Edit the attributes for an existing user.

    Parameters
    ----------
    user: mrsm.core.User
        The user whose mutable keys should be overwritten.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    keys_vals = self.get_user_keys_vals(user, mutable_only=True)
    # Snapshot the current values so a failed write can be rolled back.
    try:
        old_keys_vals = {
            key: self.get(key)
            for key in keys_vals
        }
    except Exception as e:
        return False, f"Failed to edit user:\n{e}"

    try:
        for key, val in keys_vals.items():
            self.set(key, val)
        success, msg = True, "Success"
    except Exception as e:
        success = False
        msg = f"Failed to edit user:\n{e}"

    if not success:
        # Best-effort restore of the previous values.
        try:
            for key, old_val in old_keys_vals.items():
                self.set(key, old_val)
        except Exception:
            pass

    return success, msg
Edit the attributes for an existing user.
def get_user_attributes(
    self,
    user: 'mrsm.core.User',
    debug: bool = False
) -> Union[Dict[str, Any], None]:
    """
    Return the user's attributes, or `None` if they cannot be read.
    """
    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    attributes_key = self.get_user_key(user_id, 'attributes')
    try:
        return json.loads(self.get(attributes_key))
    except Exception:
        return None
Return the user's attributes.
def delete_user(
    self,
    user: 'mrsm.core.User',
    debug: bool = False
) -> SuccessTuple:
    """
    Delete a user's keys.

    Parameters
    ----------
    user: mrsm.core.User
        The user to be deleted.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    users_pipe = self.get_users_pipe()
    keys_vals = self.get_user_keys_vals(user)
    # Snapshot the current values so a failed delete can be rolled back.
    try:
        old_keys_vals = {
            key: self.get(key)
            for key in keys_vals
        }
    except Exception as e:
        return False, f"Failed to delete user:\n{e}"

    clear_success, clear_msg = users_pipe.clear(params={'user_id': user_id})
    if not clear_success:
        return clear_success, clear_msg

    try:
        for key in keys_vals:
            self.client.delete(key)
        success, msg = True, "Success"
    except Exception as e:
        success = False
        msg = f"Failed to delete user:\n{e}"

    if not success:
        # Best-effort restore of the previous values.
        try:
            for key, old_val in old_keys_vals.items():
                self.set(key, old_val)
        except Exception:
            pass

    return success, msg
Delete a user's keys.
def get_users(
    self,
    debug: bool = False,
    **kw: Any
) -> List[str]:
    """
    Get the registered usernames.
    """
    df = self.get_users_pipe().get_data()
    return [] if df is None else list(df['username'])
Get the registered usernames.
def get_user_password_hash(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kw: Any
) -> Union[str, None]:
    """
    Return the password hash for a user, or `None` if it cannot be read.
    """
    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    user_id_password_hash_key = self.get_user_key(user_id, 'password_hash')
    try:
        return self.get(user_id_password_hash_key)
    except Exception:
        return None
Return the password hash for a user.
def get_user_type(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kw: Any
) -> Union[str, None]:
    """
    Return the user's type, or `None` if it cannot be read.
    """
    uid = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    type_key = self.get_user_key(uid, 'type')
    try:
        return self.get(type_key)
    except Exception:
        return None
Return the user's type.
def get_plugins_pipe(self) -> mrsm.Pipe:
    """
    Return the temporary in-memory pipe which tracks the registered plugins.
    """
    return mrsm.Pipe(
        'mrsm', 'plugins',
        target=PLUGINS_TABLE,
        columns=['plugin_name'],
        temporary=True,
        instance=self,
    )
Return the pipe to store the plugins.
@classmethod
def get_plugin_key(cls, plugin_name: str, sub_key: str) -> str:
    """
    Return the key under which a plugin's attribute (`sub_key`) is stored.
    """
    return cls.get_entity_key(PLUGIN_PREFIX, plugin_name, sub_key)
Return the key for a plugin's attribute.
@classmethod
def get_plugin_keys_vals(
    cls,
    plugin: 'mrsm.core.Plugin',
    mutable_only: bool = False,
) -> Dict[str, str]:
    """
    Return a dictionary containing keys and values to set for the plugin.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin for which to generate the keys.

    mutable_only: bool, default False
        If `True`, only return keys which may be edited.

    Returns
    -------
    A dictionary mapping a plugin's keys to values.
    """
    serialized_attributes = json.dumps(plugin.attributes, separators=(',', ':'))
    editable_keys_vals = {
        cls.get_plugin_key(plugin.name, 'attributes'): serialized_attributes,
        cls.get_plugin_key(plugin.name, 'version'): plugin.version,
    }
    if mutable_only:
        return editable_keys_vals

    frozen_keys_vals = {
        cls.get_plugin_key(plugin.name, 'user_id'): plugin.user_id,
    }
    return {**frozen_keys_vals, **editable_keys_vals}
Return a dictionary containing keys and values to set for the plugin.
Parameters
- plugin (mrsm.core.Plugin): The plugin for which to generate the keys.
- mutable_only (bool, default False):
If
True
, only return keys which may be edited.
Returns
- A dictionary mapping a plugin's keys to values.
def register_plugin(
    self,
    plugin: 'mrsm.core.Plugin',
    force: bool = False,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Register a new plugin to the `mrsm_plugins` "table".

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin to be registered.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    # NOTE: removed an unused `from meerschaum.utils.misc import generate_password`
    # (plugins are keyed by name; no ID is generated here).
    plugins_pipe = self.get_plugins_pipe()
    keys_vals = self.get_plugin_keys_vals(plugin)

    try:
        sync_success, sync_msg = plugins_pipe.sync(
            [
                {
                    'plugin_name': plugin.name,
                    'user_id': plugin.user_id,
                },
            ],
            check_existing=False,
            debug=debug,
        )
        if not sync_success:
            return sync_success, sync_msg

        for key, val in keys_vals.items():
            if val is not None:
                self.set(key, val)

        success, msg = True, "Success"
    except Exception as e:
        success = False
        msg = f"Failed to register plugin '{plugin.name}':\n{e}"

    if not success:
        # Best-effort rollback of any keys that were set.
        for key in keys_vals:
            try:
                self.client.delete(key)
            except Exception:
                pass

    return success, msg
Register a new plugin to the mrsm_plugins
"table".
def get_plugin_id(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Union[str, None]:
    """
    Return a plugin's ID (the plugin's name serves as its identifier).
    """
    return plugin.name
Return a plugin's ID.
def get_plugin_version(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False,
) -> Union[str, None]:
    """
    Return a plugin's version string, or `None` if it cannot be read.
    """
    version_key = self.get_plugin_key(plugin.name, 'version')
    try:
        version = self.get(version_key)
    except Exception:
        version = None
    return version
def get_plugin_user_id(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Union[str, None]:
    """
    Return the user ID of a plugin's owner, or `None` if it cannot be read.
    """
    user_id_key = self.get_plugin_key(plugin.name, 'user_id')
    try:
        user_id = self.get(user_id_key)
    except Exception:
        user_id = None
    return user_id
def get_plugin_username(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Union[str, None]:
    """
    Return the username of a plugin's owner, or `None` if unavailable.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin whose owner's username to look up.

    Returns
    -------
    The owner's username string, or `None` if the plugin has no owner
    or the lookup fails.
    """
    user_id = self.get_plugin_user_id(plugin, debug=debug)
    if user_id is None:
        return None

    username_key = self.get_user_key(user_id, 'username')
    try:
        return self.get(username_key)
    except Exception:
        return None
def get_plugin_attributes(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Dict[str, Any]:
    """
    Return the attributes dictionary of a plugin.

    An empty dictionary is returned when the key is missing, empty,
    or cannot be read or parsed.
    """
    attributes_key = self.get_plugin_key(plugin.name, 'attributes')
    try:
        attributes_str = self.get(attributes_key)
        return json.loads(attributes_str) if attributes_str else {}
    except Exception:
        return {}
def get_plugins(
    self,
    user_id: Optional[int] = None,
    search_term: Optional[str] = None,
    debug: bool = False,
    **kw: Any
) -> List[str]:
    """
    Return a list of plugin names.

    Parameters
    ----------
    user_id: Optional[int], default None
        If provided, only return plugins owned by this user.

    search_term: Optional[str], default None
        If provided, only return plugin names beginning with this prefix.

    Returns
    -------
    A list of matching plugin names.
    """
    plugins_pipe = self.get_plugins_pipe()
    params = {}
    # Explicit `None` check so a legitimate `user_id == 0` still filters.
    if user_id is not None:
        params['user_id'] = user_id

    df = plugins_pipe.get_data(['plugin_name'], params=params, debug=debug)
    docs = df.to_dict(orient='records')

    prefix = search_term or ''
    return [
        doc['plugin_name']
        for doc in docs
        if doc['plugin_name'].startswith(prefix)
    ]
def delete_plugin(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Delete a plugin from the plugins table.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin to delete.

    Returns
    -------
    A `SuccessTuple` of `(success, message)`.
    """
    plugins_pipe = self.get_plugins_pipe()
    clear_success, clear_msg = plugins_pipe.clear(params={'plugin_name': plugin.name}, debug=debug)
    if not clear_success:
        return clear_success, clear_msg

    keys_vals = self.get_plugin_keys_vals(plugin)
    try:
        # Snapshot current values so a failed delete may be rolled back.
        old_keys_vals = {
            key: self.get(key)
            for key in keys_vals
        }
    except Exception as e:
        return False, f"Failed to delete plugin '{plugin.name}':\n{e}"

    try:
        for key in keys_vals:
            self.client.delete(key)
        success, msg = True, "Success"
    except Exception as e:
        success = False
        msg = f"Failed to delete plugin '{plugin.name}':\n{e}"

    if not success:
        # Best-effort rollback. Skip keys which held no value:
        # setting `None` would itself raise in the client.
        try:
            for key, old_val in old_keys_vals.items():
                if old_val is not None:
                    self.set(key, old_val)
        except Exception:
            pass

    return success, msg