meerschaum.connectors.valkey
Import the `ValkeyConnector`.
19@make_connector 20class ValkeyConnector(InstanceConnector): 21 """ 22 Manage a Valkey instance. 23 24 Build a `ValkeyConnector` from connection attributes or a URI string. 25 """ 26 REQUIRED_ATTRIBUTES: List[str] = ['host'] 27 OPTIONAL_ATTRIBUTES: List[str] = [ 28 'port', 'username', 'password', 'db', 'socket_timeout', 29 ] 30 DEFAULT_ATTRIBUTES: Dict[str, Any] = { 31 'username': 'default', 32 'port': 6379, 33 'db': 0, 34 'socket_timeout': 300, 35 } 36 KEY_SEPARATOR: str = ':' 37 38 from ._pipes import ( 39 register_pipe, 40 get_pipe_id, 41 get_pipe_attributes, 42 edit_pipe, 43 pipe_exists, 44 drop_pipe, 45 delete_pipe, 46 get_pipe_data, 47 sync_pipe, 48 get_pipe_columns_types, 49 clear_pipe, 50 get_sync_time, 51 get_pipe_rowcount, 52 fetch_pipes_keys, 53 get_document_key, 54 get_table_quoted_doc_key, 55 ) 56 from ._fetch import ( 57 fetch, 58 ) 59 60 from ._users import ( 61 get_users_pipe, 62 get_user_key, 63 get_user_keys_vals, 64 register_user, 65 get_user_id, 66 edit_user, 67 get_user_attributes, 68 delete_user, 69 get_users, 70 get_user_password_hash, 71 get_user_type, 72 ) 73 from ._plugins import ( 74 get_plugins_pipe, 75 get_plugin_key, 76 get_plugin_keys_vals, 77 register_plugin, 78 get_plugin_id, 79 get_plugin_version, 80 get_plugin_user_id, 81 get_plugin_username, 82 get_plugin_attributes, 83 delete_plugin, 84 ) 85 86 @property 87 def client(self): 88 """ 89 Return the Valkey client. 
90 """ 91 if '_client' in self.__dict__: 92 return self.__dict__['_client'] 93 94 valkey = mrsm.attempt_import('valkey') 95 96 if 'uri' in self.__dict__: 97 self._client = valkey.Valkey.from_url(self.__dict__.get('uri')) 98 return self._client 99 100 optional_kwargs = { 101 key: self.__dict__.get(key) 102 for key in self.OPTIONAL_ATTRIBUTES 103 if key in self.__dict__ 104 } 105 connection_kwargs = { 106 'host': self.host, 107 **optional_kwargs 108 } 109 110 self._client = valkey.Valkey(**connection_kwargs) 111 return self._client 112 113 @property 114 def URI(self) -> str: 115 """ 116 Return the connection URI for this connector. 117 """ 118 import urllib.parse 119 120 if 'uri' in self.__dict__: 121 return self.__dict__.get('uri') 122 123 uri = "valkey://" 124 if 'username' in self.__dict__: 125 uri += urllib.parse.quote_plus(self.username) + ':' 126 127 if 'password' in self.__dict__: 128 uri += urllib.parse.quote_plus(self.password) + '@' 129 130 if 'host' in self.__dict__: 131 uri += self.host 132 133 if 'port' in self.__dict__: 134 uri += f':{self.port}' 135 136 if 'db' in self.__dict__: 137 uri += f"/{self.db}" 138 139 if 'socket_timeout' in self.__dict__: 140 uri += f"?timeout={self.socket_timeout}s" 141 142 return uri 143 144 def set(self, key: str, value: Any, **kwargs: Any) -> bool: 145 """ 146 Set the `key` to `value`. 147 """ 148 return self.client.set(key, value, **kwargs) 149 150 def get(self, key: str, decode: bool = True) -> Union[str, None]: 151 """ 152 Get the value for `key`. 153 """ 154 val = self.client.get(key) 155 if val is None: 156 return None 157 158 return val.decode('utf-8') if decode else val 159 160 def test_connection(self) -> bool: 161 """ 162 Return whether a connection may be established. 163 """ 164 return self.client.ping() 165 166 @classmethod 167 def quote_table(cls, table: str) -> str: 168 """ 169 Return a quoted key. 
170 """ 171 return shlex.quote(table) 172 173 @classmethod 174 def get_counter_key(cls, table: str) -> str: 175 """ 176 Return the counter key for a given table. 177 """ 178 table_name = cls.quote_table(table) 179 return f"{table_name}:counter" 180 181 def push_df( 182 self, 183 df: 'pd.DataFrame', 184 table: str, 185 datetime_column: Optional[str] = None, 186 debug: bool = False, 187 ) -> int: 188 """ 189 Append a pandas DataFrame to a table. 190 191 Parameters 192 ---------- 193 df: pd.DataFrame 194 The pandas DataFrame to append to the table. 195 196 table: str 197 The "table" name (root key). 198 199 datetime_column: Optional[str], default None 200 If provided, use this key as the datetime index. 201 202 Returns 203 ------- 204 The current index counter value (how many docs have been pushed). 205 """ 206 from meerschaum.utils.dataframe import to_json 207 docs_str = to_json(df) 208 docs = json.loads(docs_str) 209 return self.push_docs( 210 docs, 211 table, 212 datetime_column=datetime_column, 213 debug=debug, 214 ) 215 216 def push_docs( 217 self, 218 docs: List[Dict[str, Any]], 219 table: str, 220 datetime_column: Optional[str] = None, 221 debug: bool = False, 222 ) -> int: 223 """ 224 Append a list of documents to a table. 225 226 Parameters 227 ---------- 228 docs: List[Dict[str, Any]] 229 The docs to be pushed. 230 All keys and values will be coerced into strings. 231 232 table: str 233 The "table" name (root key). 234 235 datetime_column: Optional[str], default None 236 If set, create a sorted set with this datetime column as the index. 237 Otherwise push the docs to a list. 238 239 Returns 240 ------- 241 The current index counter value (how many docs have been pushed). 
242 """ 243 from meerschaum.utils.dtypes import json_serialize_value 244 table_name = self.quote_table(table) 245 datetime_column_key = self.get_datetime_column_key(table) 246 remote_datetime_column = self.get(datetime_column_key) 247 datetime_column = datetime_column or remote_datetime_column 248 dateutil_parser = mrsm.attempt_import('dateutil.parser') 249 250 old_len = ( 251 self.client.zcard(table_name) 252 if datetime_column 253 else self.client.scard(table_name) 254 ) 255 for doc in docs: 256 original_dt_val = ( 257 doc[datetime_column] 258 if datetime_column and datetime_column in doc 259 else 0 260 ) 261 dt_val = ( 262 dateutil_parser.parse(str(original_dt_val)) 263 if not isinstance(original_dt_val, int) 264 else int(original_dt_val) 265 ) if datetime_column else None 266 ts = ( 267 int(dt_val.replace(tzinfo=timezone.utc).timestamp()) 268 if isinstance(dt_val, datetime) 269 else int(dt_val) 270 ) if datetime_column else None 271 doc_str = json.dumps( 272 doc, 273 default=json_serialize_value, 274 separators=(',', ':'), 275 sort_keys=True, 276 ) 277 if datetime_column: 278 self.client.zadd(table_name, {doc_str: ts}) 279 else: 280 self.client.sadd(table_name, doc_str) 281 282 if datetime_column: 283 self.set(datetime_column_key, datetime_column) 284 new_len = ( 285 self.client.zcard(table_name) 286 if datetime_column 287 else self.client.scard(table_name) 288 ) 289 290 return new_len - old_len 291 292 def _push_hash_docs_to_list(self, docs: List[Dict[str, Any]], table: str) -> int: 293 table_name = self.quote_table(table) 294 next_ix = max(self.client.llen(table_name) or 0, 1) 295 for i, doc in enumerate(docs): 296 doc_key = f"{table_name}:{next_ix + i}" 297 self.client.hset( 298 doc_key, 299 mapping={ 300 str(k): str(v) 301 for k, v in doc.items() 302 }, 303 ) 304 self.client.rpush(table_name, doc_key) 305 306 return next_ix + len(docs) 307 308 def get_datetime_column_key(self, table: str) -> str: 309 """ 310 Return the key to store the datetime index for 
`table`. 311 """ 312 table_name = self.quote_table(table) 313 return f'{table_name}:datetime_column' 314 315 def read( 316 self, 317 table: str, 318 begin: Union[datetime, int, str, None] = None, 319 end: Union[datetime, int, str, None] = None, 320 params: Optional[Dict[str, Any]] = None, 321 datetime_column: Optional[str] = None, 322 select_columns: Optional[List[str]] = None, 323 omit_columns: Optional[List[str]] = None, 324 debug: bool = False 325 ) -> Union['pd.DataFrame', None]: 326 """ 327 Query the table and return the result dataframe. 328 329 Parameters 330 ---------- 331 table: str 332 The "table" name to be queried. 333 334 begin: Union[datetime, int, str, None], default None 335 If provided, only return rows greater than or equal to this datetime. 336 337 end: Union[datetime, int, str, None], default None 338 If provided, only return rows older than this datetime. 339 340 params: Optional[Dict[str, Any]] 341 Additional Meerschaum filter parameters. 342 343 datetime_column: Optional[str], default None 344 If provided, use this column for the datetime index. 345 Otherwise infer from the table metadata. 346 347 select_columns: Optional[List[str]], default None 348 If provided, only return these columns. 349 350 omit_columns: Optional[List[str]], default None 351 If provided, do not include these columns in the result. 352 353 Returns 354 ------- 355 A Pandas DataFrame of the result, or `None`. 
356 """ 357 from meerschaum.utils.dataframe import parse_df_datetimes, query_df 358 docs = self.read_docs( 359 table, 360 begin=begin, 361 end=end, 362 debug=debug, 363 ) 364 df = parse_df_datetimes(docs) 365 datetime_column_key = self.get_datetime_column_key(table) 366 datetime_column = datetime_column or self.get(datetime_column_key) 367 368 return query_df( 369 df, 370 begin=(begin if datetime_column is not None else None), 371 end=(end if datetime_column is not None else None), 372 params=params, 373 datetime_column=datetime_column, 374 select_columns=select_columns, 375 omit_columns=omit_columns, 376 inplace=True, 377 reset_index=True, 378 debug=debug, 379 ) 380 381 def read_docs( 382 self, 383 table: str, 384 begin: Union[datetime, int, str, None] = None, 385 end: Union[datetime, int, str, None] = None, 386 debug: bool = False, 387 ) -> List[Dict[str, str]]: 388 """ 389 Return a list of previously pushed docs. 390 391 Parameters 392 ---------- 393 table: str 394 The "table" name (root key) under which the docs were pushed. 395 396 begin: Union[datetime, int, str, None], default None 397 If provided and the table was created with a datetime index, only return documents 398 newer than this datetime. 399 If the table was not created with a datetime index and `begin` is an `int`, 400 return documents with a positional index greater than or equal to this value. 401 402 end: Union[datetime, int, str, None], default None 403 If provided and the table was created with a datetime index, only return documents 404 older than this datetime. 405 If the table was not created with a datetime index and `begin` is an `int`, 406 return documents with a positional index less than this value. 407 408 Returns 409 ------- 410 A list of dictionaries, where all keys and values are strings. 
411 """ 412 from meerschaum.utils.dtypes import coerce_timezone 413 table_name = self.quote_table(table) 414 datetime_column_key = self.get_datetime_column_key(table) 415 datetime_column = self.get(datetime_column_key) 416 417 if debug: 418 dprint(f"Reading documents from '{table}' with {begin=}, {end=}") 419 420 if not datetime_column: 421 return [ 422 json.loads(doc_bytes.decode('utf-8')) 423 for doc_bytes in self.client.smembers(table_name) 424 ] 425 426 dateutil_parser = mrsm.attempt_import('dateutil.parser') 427 428 if isinstance(begin, str): 429 begin = coerce_timezone(dateutil_parser.parse(begin)) 430 431 if isinstance(end, str): 432 end = coerce_timezone(dateutil_parser.parse(end)) 433 434 begin_ts = ( 435 ( 436 int(begin.replace(tzinfo=timezone.utc).timestamp()) 437 if isinstance(begin, datetime) 438 else int(begin) 439 ) 440 if begin is not None else '-inf' 441 ) 442 end_ts = ( 443 ( 444 int(end.replace(tzinfo=timezone.utc).timestamp()) 445 if isinstance(end, datetime) 446 else int(end) 447 ) 448 if end is not None else '+inf' 449 ) 450 451 if debug: 452 dprint(f"Reading documents with {begin_ts=}, {end_ts=}") 453 454 return [ 455 json.loads(doc_bytes.decode('utf-8')) 456 for doc_bytes in self.client.zrangebyscore( 457 table_name, 458 begin_ts, 459 end_ts, 460 withscores=False, 461 ) 462 ] 463 464 def _read_docs_from_list( 465 self, 466 table: str, 467 begin_ix: Optional[int] = 0, 468 end_ix: Optional[int] = -1, 469 debug: bool = False, 470 ): 471 """ 472 Read a list of documents from a "table". 473 474 Parameters 475 ---------- 476 table: str 477 The "table" (root key) from which to read docs. 478 479 begin_ix: Optional[int], default 0 480 If provided, only read documents from this starting index. 481 482 end_ix: Optional[int], default -1 483 If provided, only read documents up to (not including) this index. 484 485 Returns 486 ------- 487 A list of documents. 
488 """ 489 if begin_ix is None: 490 begin_ix = 0 491 492 if end_ix == 0: 493 return 494 495 if end_ix is None: 496 end_ix = -1 497 else: 498 end_ix -= 1 499 500 table_name = self.quote_table(table) 501 doc_keys = self.client.lrange(table_name, begin_ix, end_ix) 502 for doc_key in doc_keys: 503 yield { 504 key.decode('utf-8'): value.decode('utf-8') 505 for key, value in self.client.hgetall(doc_key).items() 506 } 507 508 def drop_table(self, table: str, debug: bool = False) -> None: 509 """ 510 Drop a "table" of documents. 511 512 Parameters 513 ---------- 514 table: str 515 The "table" name (root key) to be deleted. 516 """ 517 table_name = self.quote_table(table) 518 datetime_column_key = self.get_datetime_column_key(table) 519 self.client.delete(table_name) 520 self.client.delete(datetime_column_key) 521 522 @classmethod 523 def get_entity_key(cls, *keys: Any) -> str: 524 """ 525 Return a joined key to set an entity. 526 """ 527 if not keys: 528 raise ValueError("No keys to be joined.") 529 530 for key in keys: 531 if cls.KEY_SEPARATOR in str(key): 532 raise ValueError(f"Key cannot contain separator '{cls.KEY_SEPARATOR}'.") 533 534 return cls.KEY_SEPARATOR.join([str(key) for key in keys])
Manage a Valkey instance.
Build a `ValkeyConnector` from connection attributes or a URI string.
86 @property 87 def client(self): 88 """ 89 Return the Valkey client. 90 """ 91 if '_client' in self.__dict__: 92 return self.__dict__['_client'] 93 94 valkey = mrsm.attempt_import('valkey') 95 96 if 'uri' in self.__dict__: 97 self._client = valkey.Valkey.from_url(self.__dict__.get('uri')) 98 return self._client 99 100 optional_kwargs = { 101 key: self.__dict__.get(key) 102 for key in self.OPTIONAL_ATTRIBUTES 103 if key in self.__dict__ 104 } 105 connection_kwargs = { 106 'host': self.host, 107 **optional_kwargs 108 } 109 110 self._client = valkey.Valkey(**connection_kwargs) 111 return self._client
Return the Valkey client.
113 @property 114 def URI(self) -> str: 115 """ 116 Return the connection URI for this connector. 117 """ 118 import urllib.parse 119 120 if 'uri' in self.__dict__: 121 return self.__dict__.get('uri') 122 123 uri = "valkey://" 124 if 'username' in self.__dict__: 125 uri += urllib.parse.quote_plus(self.username) + ':' 126 127 if 'password' in self.__dict__: 128 uri += urllib.parse.quote_plus(self.password) + '@' 129 130 if 'host' in self.__dict__: 131 uri += self.host 132 133 if 'port' in self.__dict__: 134 uri += f':{self.port}' 135 136 if 'db' in self.__dict__: 137 uri += f"/{self.db}" 138 139 if 'socket_timeout' in self.__dict__: 140 uri += f"?timeout={self.socket_timeout}s" 141 142 return uri
Return the connection URI for this connector.
144 def set(self, key: str, value: Any, **kwargs: Any) -> bool: 145 """ 146 Set the `key` to `value`. 147 """ 148 return self.client.set(key, value, **kwargs)
Set the `key` to `value`.
150 def get(self, key: str, decode: bool = True) -> Union[str, None]: 151 """ 152 Get the value for `key`. 153 """ 154 val = self.client.get(key) 155 if val is None: 156 return None 157 158 return val.decode('utf-8') if decode else val
Get the value for `key`.
160 def test_connection(self) -> bool: 161 """ 162 Return whether a connection may be established. 163 """ 164 return self.client.ping()
Return whether a connection may be established.
166 @classmethod 167 def quote_table(cls, table: str) -> str: 168 """ 169 Return a quoted key. 170 """ 171 return shlex.quote(table)
Return a quoted key.
173 @classmethod 174 def get_counter_key(cls, table: str) -> str: 175 """ 176 Return the counter key for a given table. 177 """ 178 table_name = cls.quote_table(table) 179 return f"{table_name}:counter"
Return the counter key for a given table.
181 def push_df( 182 self, 183 df: 'pd.DataFrame', 184 table: str, 185 datetime_column: Optional[str] = None, 186 debug: bool = False, 187 ) -> int: 188 """ 189 Append a pandas DataFrame to a table. 190 191 Parameters 192 ---------- 193 df: pd.DataFrame 194 The pandas DataFrame to append to the table. 195 196 table: str 197 The "table" name (root key). 198 199 datetime_column: Optional[str], default None 200 If provided, use this key as the datetime index. 201 202 Returns 203 ------- 204 The current index counter value (how many docs have been pushed). 205 """ 206 from meerschaum.utils.dataframe import to_json 207 docs_str = to_json(df) 208 docs = json.loads(docs_str) 209 return self.push_docs( 210 docs, 211 table, 212 datetime_column=datetime_column, 213 debug=debug, 214 )
Append a pandas DataFrame to a table.
Parameters
- df (pd.DataFrame): The pandas DataFrame to append to the table.
- table (str): The "table" name (root key).
- datetime_column (Optional[str], default None): If provided, use this key as the datetime index.
Returns
- The current index counter value (how many docs have been pushed).
216 def push_docs( 217 self, 218 docs: List[Dict[str, Any]], 219 table: str, 220 datetime_column: Optional[str] = None, 221 debug: bool = False, 222 ) -> int: 223 """ 224 Append a list of documents to a table. 225 226 Parameters 227 ---------- 228 docs: List[Dict[str, Any]] 229 The docs to be pushed. 230 All keys and values will be coerced into strings. 231 232 table: str 233 The "table" name (root key). 234 235 datetime_column: Optional[str], default None 236 If set, create a sorted set with this datetime column as the index. 237 Otherwise push the docs to a list. 238 239 Returns 240 ------- 241 The current index counter value (how many docs have been pushed). 242 """ 243 from meerschaum.utils.dtypes import json_serialize_value 244 table_name = self.quote_table(table) 245 datetime_column_key = self.get_datetime_column_key(table) 246 remote_datetime_column = self.get(datetime_column_key) 247 datetime_column = datetime_column or remote_datetime_column 248 dateutil_parser = mrsm.attempt_import('dateutil.parser') 249 250 old_len = ( 251 self.client.zcard(table_name) 252 if datetime_column 253 else self.client.scard(table_name) 254 ) 255 for doc in docs: 256 original_dt_val = ( 257 doc[datetime_column] 258 if datetime_column and datetime_column in doc 259 else 0 260 ) 261 dt_val = ( 262 dateutil_parser.parse(str(original_dt_val)) 263 if not isinstance(original_dt_val, int) 264 else int(original_dt_val) 265 ) if datetime_column else None 266 ts = ( 267 int(dt_val.replace(tzinfo=timezone.utc).timestamp()) 268 if isinstance(dt_val, datetime) 269 else int(dt_val) 270 ) if datetime_column else None 271 doc_str = json.dumps( 272 doc, 273 default=json_serialize_value, 274 separators=(',', ':'), 275 sort_keys=True, 276 ) 277 if datetime_column: 278 self.client.zadd(table_name, {doc_str: ts}) 279 else: 280 self.client.sadd(table_name, doc_str) 281 282 if datetime_column: 283 self.set(datetime_column_key, datetime_column) 284 new_len = ( 285 self.client.zcard(table_name) 286 if 
datetime_column 287 else self.client.scard(table_name) 288 ) 289 290 return new_len - old_len
Append a list of documents to a table.
Parameters
- docs (List[Dict[str, Any]]): The docs to be pushed. All keys and values will be coerced into strings.
- table (str): The "table" name (root key).
- datetime_column (Optional[str], default None): If set, create a sorted set with this datetime column as the index. Otherwise push the docs to a list.
Returns
- The current index counter value (how many docs have been pushed).
308 def get_datetime_column_key(self, table: str) -> str: 309 """ 310 Return the key to store the datetime index for `table`. 311 """ 312 table_name = self.quote_table(table) 313 return f'{table_name}:datetime_column'
Return the key to store the datetime index for `table`.
315 def read( 316 self, 317 table: str, 318 begin: Union[datetime, int, str, None] = None, 319 end: Union[datetime, int, str, None] = None, 320 params: Optional[Dict[str, Any]] = None, 321 datetime_column: Optional[str] = None, 322 select_columns: Optional[List[str]] = None, 323 omit_columns: Optional[List[str]] = None, 324 debug: bool = False 325 ) -> Union['pd.DataFrame', None]: 326 """ 327 Query the table and return the result dataframe. 328 329 Parameters 330 ---------- 331 table: str 332 The "table" name to be queried. 333 334 begin: Union[datetime, int, str, None], default None 335 If provided, only return rows greater than or equal to this datetime. 336 337 end: Union[datetime, int, str, None], default None 338 If provided, only return rows older than this datetime. 339 340 params: Optional[Dict[str, Any]] 341 Additional Meerschaum filter parameters. 342 343 datetime_column: Optional[str], default None 344 If provided, use this column for the datetime index. 345 Otherwise infer from the table metadata. 346 347 select_columns: Optional[List[str]], default None 348 If provided, only return these columns. 349 350 omit_columns: Optional[List[str]], default None 351 If provided, do not include these columns in the result. 352 353 Returns 354 ------- 355 A Pandas DataFrame of the result, or `None`. 356 """ 357 from meerschaum.utils.dataframe import parse_df_datetimes, query_df 358 docs = self.read_docs( 359 table, 360 begin=begin, 361 end=end, 362 debug=debug, 363 ) 364 df = parse_df_datetimes(docs) 365 datetime_column_key = self.get_datetime_column_key(table) 366 datetime_column = datetime_column or self.get(datetime_column_key) 367 368 return query_df( 369 df, 370 begin=(begin if datetime_column is not None else None), 371 end=(end if datetime_column is not None else None), 372 params=params, 373 datetime_column=datetime_column, 374 select_columns=select_columns, 375 omit_columns=omit_columns, 376 inplace=True, 377 reset_index=True, 378 debug=debug, 379 )
Query the table and return the result dataframe.
Parameters
- table (str): The "table" name to be queried.
- begin (Union[datetime, int, str, None], default None): If provided, only return rows greater than or equal to this datetime.
- end (Union[datetime, int, str, None], default None): If provided, only return rows older than this datetime.
- params (Optional[Dict[str, Any]]): Additional Meerschaum filter parameters.
- datetime_column (Optional[str], default None): If provided, use this column for the datetime index. Otherwise infer from the table metadata.
- select_columns (Optional[List[str]], default None): If provided, only return these columns.
- omit_columns (Optional[List[str]], default None): If provided, do not include these columns in the result.
Returns
- A Pandas DataFrame of the result, or `None`.
381 def read_docs( 382 self, 383 table: str, 384 begin: Union[datetime, int, str, None] = None, 385 end: Union[datetime, int, str, None] = None, 386 debug: bool = False, 387 ) -> List[Dict[str, str]]: 388 """ 389 Return a list of previously pushed docs. 390 391 Parameters 392 ---------- 393 table: str 394 The "table" name (root key) under which the docs were pushed. 395 396 begin: Union[datetime, int, str, None], default None 397 If provided and the table was created with a datetime index, only return documents 398 newer than this datetime. 399 If the table was not created with a datetime index and `begin` is an `int`, 400 return documents with a positional index greater than or equal to this value. 401 402 end: Union[datetime, int, str, None], default None 403 If provided and the table was created with a datetime index, only return documents 404 older than this datetime. 405 If the table was not created with a datetime index and `begin` is an `int`, 406 return documents with a positional index less than this value. 407 408 Returns 409 ------- 410 A list of dictionaries, where all keys and values are strings. 
411 """ 412 from meerschaum.utils.dtypes import coerce_timezone 413 table_name = self.quote_table(table) 414 datetime_column_key = self.get_datetime_column_key(table) 415 datetime_column = self.get(datetime_column_key) 416 417 if debug: 418 dprint(f"Reading documents from '{table}' with {begin=}, {end=}") 419 420 if not datetime_column: 421 return [ 422 json.loads(doc_bytes.decode('utf-8')) 423 for doc_bytes in self.client.smembers(table_name) 424 ] 425 426 dateutil_parser = mrsm.attempt_import('dateutil.parser') 427 428 if isinstance(begin, str): 429 begin = coerce_timezone(dateutil_parser.parse(begin)) 430 431 if isinstance(end, str): 432 end = coerce_timezone(dateutil_parser.parse(end)) 433 434 begin_ts = ( 435 ( 436 int(begin.replace(tzinfo=timezone.utc).timestamp()) 437 if isinstance(begin, datetime) 438 else int(begin) 439 ) 440 if begin is not None else '-inf' 441 ) 442 end_ts = ( 443 ( 444 int(end.replace(tzinfo=timezone.utc).timestamp()) 445 if isinstance(end, datetime) 446 else int(end) 447 ) 448 if end is not None else '+inf' 449 ) 450 451 if debug: 452 dprint(f"Reading documents with {begin_ts=}, {end_ts=}") 453 454 return [ 455 json.loads(doc_bytes.decode('utf-8')) 456 for doc_bytes in self.client.zrangebyscore( 457 table_name, 458 begin_ts, 459 end_ts, 460 withscores=False, 461 ) 462 ]
Return a list of previously pushed docs.
Parameters
- table (str): The "table" name (root key) under which the docs were pushed.
- begin (Union[datetime, int, str, None], default None):
If provided and the table was created with a datetime index, only return documents
newer than this datetime.
If the table was not created with a datetime index and `begin` is an `int`,
return documents with a positional index greater than or equal to this value.
- end (Union[datetime, int, str, None], default None):
If provided and the table was created with a datetime index, only return documents
older than this datetime.
If the table was not created with a datetime index and `end` is an `int`,
return documents with a positional index less than this value.
Returns
- A list of dictionaries, where all keys and values are strings.
508 def drop_table(self, table: str, debug: bool = False) -> None: 509 """ 510 Drop a "table" of documents. 511 512 Parameters 513 ---------- 514 table: str 515 The "table" name (root key) to be deleted. 516 """ 517 table_name = self.quote_table(table) 518 datetime_column_key = self.get_datetime_column_key(table) 519 self.client.delete(table_name) 520 self.client.delete(datetime_column_key)
Drop a "table" of documents.
Parameters
- table (str): The "table" name (root key) to be deleted.
522 @classmethod 523 def get_entity_key(cls, *keys: Any) -> str: 524 """ 525 Return a joined key to set an entity. 526 """ 527 if not keys: 528 raise ValueError("No keys to be joined.") 529 530 for key in keys: 531 if cls.KEY_SEPARATOR in str(key): 532 raise ValueError(f"Key cannot contain separator '{cls.KEY_SEPARATOR}'.") 533 534 return cls.KEY_SEPARATOR.join([str(key) for key in keys])
Return a joined key to set an entity.
142def register_pipe( 143 self, 144 pipe: mrsm.Pipe, 145 debug: bool = False, 146 **kwargs: Any 147) -> SuccessTuple: 148 """ 149 Insert the pipe's attributes into the internal `pipes` table. 150 151 Parameters 152 ---------- 153 pipe: mrsm.Pipe 154 The pipe to be registered. 155 156 Returns 157 ------- 158 A `SuccessTuple` of the result. 159 """ 160 attributes = { 161 'connector_keys': str(pipe.connector_keys), 162 'metric_key': str(pipe.metric_key), 163 'location_key': str(pipe.location_key), 164 } 165 parameters_str = json.dumps( 166 pipe._attributes.get('parameters', {}), 167 separators=(',', ':'), 168 ) 169 170 pipe_key = get_pipe_key(pipe) 171 parameters_key = get_pipe_parameters_key(pipe) 172 173 try: 174 existing_pipe_id = self.get(pipe_key) 175 if existing_pipe_id is not None: 176 return False, f"{pipe} is already registered." 177 178 pipe_id = self.client.incr(PIPES_COUNTER) 179 _ = self.push_docs( 180 [{'pipe_id': pipe_id, **attributes}], 181 PIPES_TABLE, 182 datetime_column='pipe_id', 183 debug=debug, 184 ) 185 self.set(pipe_key, pipe_id) 186 self.set(parameters_key, parameters_str) 187 188 except Exception as e: 189 return False, f"Failed to register {pipe}:\n{e}" 190 191 return True, "Success"
Insert the pipe's attributes into the internal `pipes` table.
Parameters
- pipe (mrsm.Pipe): The pipe to be registered.
Returns
- A `SuccessTuple` of the result.
194def get_pipe_id( 195 self, 196 pipe: mrsm.Pipe, 197 debug: bool = False, 198 **kwargs: Any 199) -> Union[str, int, None]: 200 """ 201 Return the `_id` for the pipe if it exists. 202 203 Parameters 204 ---------- 205 pipe: mrsm.Pipe 206 The pipe whose `_id` to fetch. 207 208 Returns 209 ------- 210 The `_id` for the pipe's document or `None`. 211 """ 212 pipe_key = get_pipe_key(pipe) 213 try: 214 return int(self.get(pipe_key)) 215 except Exception: 216 pass 217 return None
Return the `_id` for the pipe if it exists.
Parameters
- pipe (mrsm.Pipe): The pipe whose `_id` to fetch.
Returns
- The `_id` for the pipe's document, or `None`.
220def get_pipe_attributes( 221 self, 222 pipe: mrsm.Pipe, 223 debug: bool = False, 224 **kwargs: Any 225) -> Dict[str, Any]: 226 """ 227 Return the pipe's document from the internal `pipes` collection. 228 229 Parameters 230 ---------- 231 pipe: mrsm.Pipe 232 The pipe whose attributes should be retrieved. 233 234 Returns 235 ------- 236 The document that matches the keys of the pipe. 237 """ 238 pipe_id = pipe.get_id(debug=debug) 239 if pipe_id is None: 240 return {} 241 242 parameters_key = get_pipe_parameters_key(pipe) 243 parameters_str = self.get(parameters_key) 244 245 parameters = json.loads(parameters_str) if parameters_str else {} 246 247 attributes = { 248 'connector_keys': pipe.connector_keys, 249 'metric_key': pipe.metric_key, 250 'location_key': pipe.location_key, 251 'parameters': parameters, 252 'pipe_id': pipe_id, 253 } 254 return attributes
Return the pipe's document from the internal `pipes` collection.
Parameters
- pipe (mrsm.Pipe): The pipe whose attributes should be retrieved.
Returns
- The document that matches the keys of the pipe.
257def edit_pipe( 258 self, 259 pipe: mrsm.Pipe, 260 debug: bool = False, 261 **kwargs: Any 262) -> mrsm.SuccessTuple: 263 """ 264 Edit the attributes of the pipe. 265 266 Parameters 267 ---------- 268 pipe: mrsm.Pipe 269 The pipe whose in-memory parameters must be persisted. 270 271 Returns 272 ------- 273 A `SuccessTuple` indicating success. 274 """ 275 pipe_id = pipe.get_id(debug=debug) 276 if pipe_id is None: 277 return False, f"{pipe} is not registered." 278 279 parameters_key = get_pipe_parameters_key(pipe) 280 parameters_str = json.dumps(pipe.get_parameters(apply_symlinks=False), separators=(',', ':')) 281 self.set(parameters_key, parameters_str) 282 return True, "Success"
Edit the attributes of the pipe.
Parameters
- pipe (mrsm.Pipe): The pipe whose in-memory parameters must be persisted.
Returns
- A `SuccessTuple` indicating success.
285def pipe_exists( 286 self, 287 pipe: mrsm.Pipe, 288 debug: bool = False, 289 **kwargs: Any 290) -> bool: 291 """ 292 Check whether a pipe's target table exists. 293 294 Parameters 295 ---------- 296 pipe: mrsm.Pipe 297 The pipe to check whether its table exists. 298 299 Returns 300 ------- 301 A `bool` indicating the table exists. 302 """ 303 table_name = self.quote_table(pipe.target) 304 return self.client.exists(table_name) != 0
Check whether a pipe's target table exists.
Parameters
- pipe (mrsm.Pipe): The pipe to check whether its table exists.
Returns
- A `bool` indicating the table exists.
307def drop_pipe( 308 self, 309 pipe: mrsm.Pipe, 310 debug: bool = False, 311 **kwargs: Any 312) -> mrsm.SuccessTuple: 313 """ 314 Drop a pipe's collection if it exists. 315 316 Parameters 317 ---------- 318 pipe: mrsm.Pipe 319 The pipe to be dropped. 320 321 Returns 322 ------- 323 A `SuccessTuple` indicating success. 324 """ 325 if not pipe.exists(debug=debug): 326 return True, f"{pipe} does not exist, so it was not dropped." 327 328 table_name = self.quote_table(pipe.target) 329 dt_col = pipe.columns.get('datetime', None) 330 331 try: 332 members = ( 333 self.client.zrange(table_name, 0, -1) 334 if dt_col 335 else self.client.smembers(table_name) 336 ) 337 338 keys_to_delete = [] 339 for member_bytes in members: 340 member_str = member_bytes.decode('utf-8') 341 member_doc = json.loads(member_str) 342 ix_str = member_doc.get('ix') 343 if not ix_str: 344 continue 345 346 ix_doc = string_to_dict(ix_str.replace(COLON, ':')) 347 doc_key = self.get_document_key(ix_doc, list(ix_doc.keys()), table_name) 348 keys_to_delete.append(doc_key) 349 350 if keys_to_delete: 351 batch_size = 1000 352 for i in range(0, len(keys_to_delete), batch_size): 353 batch = keys_to_delete[i:i+batch_size] 354 self.client.delete(*batch) 355 356 except Exception as e: 357 return False, f"Failed to delete documents for {pipe}:\n{e}" 358 359 try: 360 self.drop_table(pipe.target, debug=debug) 361 except Exception as e: 362 return False, f"Failed to drop {pipe}:\n{e}" 363 364 if 'valkey' not in pipe.parameters: 365 return True, "Success" 366 367 pipe._attributes['parameters']['valkey']['dtypes'] = {} 368 if not pipe.temporary: 369 edit_success, edit_msg = pipe.edit(debug=debug) 370 if not edit_success: 371 return edit_success, edit_msg 372 373 return True, "Success"
Drop a pipe's collection if it exists.

Parameters:
- pipe (mrsm.Pipe): The pipe to be dropped.

Returns:
- A `SuccessTuple` indicating success.
def delete_pipe(
    self,
    pipe: 'mrsm.Pipe',
    debug: bool = False,
    **kwargs: Any
) -> 'mrsm.SuccessTuple':
    """
    Delete a pipe's registration from the `pipes` collection.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be deleted.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    dropped, drop_msg = pipe.drop(debug=debug)
    if not dropped:
        return dropped, drop_msg

    pipe_id = self.get_pipe_id(pipe, debug=debug)
    if pipe_id is None:
        return False, f"{pipe} is not registered."

    # Remove both the registration key and the parameters key.
    for registration_key in (get_pipe_key(pipe), get_pipe_parameters_key(pipe)):
        self.client.delete(registration_key)

    # Remove the pipe's entry from the registry's sorted set as well.
    registration_df = self.read(PIPES_TABLE, params={'pipe_id': pipe_id})
    registration_docs = registration_df.to_dict(orient='records')
    if registration_docs:
        serialized_doc = json.dumps(
            registration_docs[0],
            default=json_serialize_value,
            separators=(',', ':'),
            sort_keys=True,
        )
        self.client.zrem(PIPES_TABLE, serialized_doc)
    return True, "Success"
Delete a pipe's registration from the `pipes` collection.

Parameters:
- pipe (mrsm.Pipe): The pipe to be deleted.

Returns:
- A `SuccessTuple` indicating success.
def get_pipe_data(
    self,
    pipe: mrsm.Pipe,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kwargs: Any
) -> Union['pd.DataFrame', None]:
    """
    Query a pipe's target table and return the DataFrame.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe with the target table from which to read.

    select_columns: Optional[List[str]], default None
        If provided, only select these given columns.
        Otherwise select all available columns (i.e. `SELECT *`).

    omit_columns: Optional[List[str]], default None
        If provided, remove these columns from the selection.

    begin: Union[datetime, int, None], default None
        The earliest `datetime` value to search from (inclusive).

    end: Union[datetime, int, None], default None
        The latest `datetime` value to search from (exclusive).

    params: Optional[Dict[str, str]], default None
        Additional filters to apply to the query.

    Returns
    -------
    The target table's data as a DataFrame.
    """
    if not pipe.exists(debug=debug):
        return None

    from meerschaum.utils.dataframe import query_df, parse_df_datetimes
    from meerschaum.utils.dtypes import are_dtypes_equal

    valkey_dtypes = pipe.parameters.get('valkey', {}).get('dtypes', {})
    dt_col = pipe.columns.get('datetime', None)
    table_name = self.quote_table(pipe.target)
    indices = [col for col in pipe.columns.values() if col]

    # Read the index docs in the requested range, decoding the escaped ':'
    # (COLON) back into real colons before parsing into dictionaries.
    ix_docs = [
        string_to_dict(doc.get('ix', '').replace(COLON, ':'))
        for doc in self.read_docs(
            pipe.target,
            begin=begin,
            end=end,
            debug=debug,
        )
    ]
    try:
        # Fetch each full document by its per-document key.
        docs_strings = [
            self.get(
                self.get_document_key(
                    doc,
                    indices,
                    table_name,
                )
            )
            for doc in ix_docs
        ]
    except Exception as e:
        warn(f"Failed to fetch documents for {pipe}:\n{e}")
        docs_strings = []

    docs = [
        json.loads(doc_str)
        for doc_str in docs_strings
        if doc_str
    ]
    # Columns not declared as datetimes must not be auto-parsed as such.
    ignore_dt_cols = [
        col
        for col, dtype in pipe.dtypes.items()
        if not are_dtypes_equal(str(dtype), 'datetime')
    ]

    df = parse_df_datetimes(
        docs,
        ignore_cols=ignore_dt_cols,
        chunksize=kwargs.get('chunksize', None),
        strip_timezone=(pipe.tzinfo is None),
        debug=debug,
    )
    # Best-effort cast back to the dtypes recorded at sync time.
    for col, typ in valkey_dtypes.items():
        try:
            df[col] = df[col].astype(typ)
        except Exception:
            pass

    df = pipe.enforce_dtypes(df, debug=debug)

    if len(df) == 0:
        return query_df(df, select_columns=select_columns, omit_columns=omit_columns)

    return query_df(
        df,
        select_columns=select_columns,
        omit_columns=omit_columns,
        params=params,
        begin=begin,
        end=end,
        datetime_column=dt_col,
        inplace=True,
        reset_index=True,
    )
Query a pipe's target table and return the DataFrame.

Parameters:
- pipe (mrsm.Pipe): The pipe with the target table from which to read.
- select_columns (Optional[List[str]], default None): If provided, only select these given columns; otherwise select all available columns (i.e. `SELECT *`).
- omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
- begin (Union[datetime, int, None], default None): The earliest `datetime` value to search from (inclusive).
- end (Union[datetime, int, None], default None): The latest `datetime` value to search from (exclusive).
- params (Optional[Dict[str, str]], default None): Additional filters to apply to the query.

Returns:
- The target table's data as a DataFrame.
def sync_pipe(
    self,
    pipe: mrsm.Pipe,
    df: 'pd.DataFrame' = None,
    check_existing: bool = True,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Upsert new documents into the pipe's collection.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose collection should receive the new documents.

    df: Union['pd.DataFrame', Iterator['pd.DataFrame']], default None
        The data to be synced.

    check_existing: bool, default True
        If `False`, do not check the documents against existing data and instead insert directly.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    from meerschaum.utils.dtypes import are_dtypes_equal
    dt_col = pipe.columns.get('datetime', None)
    indices = [col for col in pipe.columns.values() if col]
    table_name = self.quote_table(pipe.target)
    # Dask frames must be materialized before row-wise serialization.
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = df.compute()
    upsert = pipe.parameters.get('upsert', False)
    static = pipe.parameters.get('static', False)

    def _serialize_indices_docs(_docs):
        # Build the member docs stored in the set: the serialized index key
        # plus (when a datetime axis exists) the raw datetime value.
        return [
            {
                'ix': self.get_document_key(doc, indices),
                **(
                    {
                        dt_col: doc.get(dt_col, 0)
                    }
                    if dt_col
                    else {}
                )
            }
            for doc in _docs
        ]

    # Reconcile incoming dtypes against those cached under parameters['valkey'];
    # columns that fail to cast fall back to 'string'.
    valkey_dtypes = pipe.parameters.get('valkey', {}).get('dtypes', {})
    new_dtypes = {
        str(key): str(val)
        for key, val in df.dtypes.items()
        if str(key) not in valkey_dtypes
    }
    for col, typ in {c: v for c, v in valkey_dtypes.items()}.items():
        if col in df.columns:
            try:
                df[col] = df[col].astype(typ)
            except Exception:
                import traceback
                traceback.print_exc()
                valkey_dtypes[col] = 'string'
                new_dtypes[col] = 'string'
                df[col] = df[col].astype('string')

    if new_dtypes and (not static or not valkey_dtypes):
        valkey_dtypes.update(new_dtypes)
        update_success, update_msg = pipe.update_parameters(
            {'valkey': {'dtypes': valkey_dtypes}},
            debug=debug,
        )
        if not update_success:
            return False, update_msg

    # Split incoming rows into brand-new (unseen) and changed (update) rows;
    # with upsert enabled, everything is treated as an update.
    unseen_df, update_df, delta_df = (
        pipe.filter_existing(df, include_unchanged_columns=True, debug=debug)
        if check_existing and not upsert
        else (None, df, df)
    )
    num_insert = len(unseen_df) if unseen_df is not None else 0
    num_update = len(update_df) if update_df is not None else 0
    msg = (
        f"Inserted {num_insert}, updated {num_update} rows."
        if not upsert
        else f"Upserted {num_update} rows."
    )
    if len(delta_df) == 0:
        return True, msg

    # Write unseen rows: one key per document, then push the index members.
    unseen_docs = unseen_df.to_dict(orient='records') if unseen_df is not None else []
    unseen_indices_docs = _serialize_indices_docs(unseen_docs)
    unseen_ix_vals = {
        self.get_document_key(doc, indices, table_name): serialize_document(doc)
        for doc in unseen_docs
    }
    for key, val in unseen_ix_vals.items():
        try:
            self.set(key, val)
        except Exception as e:
            return False, f"Failed to set keys for {pipe}:\n{e}"

    try:
        self.push_docs(
            unseen_indices_docs,
            pipe.target,
            datetime_column=dt_col,
            debug=debug,
        )
    except Exception as e:
        return False, f"Failed to push docs to '{pipe.target}':\n{e}"

    # For update rows, split into docs with no existing key (treated as new)
    # and docs whose stored values must be merged with the incoming values.
    update_docs = update_df.to_dict(orient='records') if update_df is not None else []
    update_ix_docs = {
        self.get_document_key(doc, indices, table_name): doc
        for doc in update_docs
    }
    existing_docs_data = {
        key: self.get(key)
        for key in update_ix_docs
    } if pipe.exists(debug=debug) else {}
    existing_docs = {
        key: json.loads(data)
        for key, data in existing_docs_data.items()
        if data
    }
    new_update_docs = {
        key: doc
        for key, doc in update_ix_docs.items()
        if key not in existing_docs
    }
    new_ix_vals = {
        self.get_document_key(doc, indices, table_name): serialize_document(doc)
        for doc in new_update_docs.values()
    }
    for key, val in new_ix_vals.items():
        try:
            self.set(key, val)
        except Exception as e:
            return False, f"Failed to set keys for {pipe}:\n{e}"

    # Merge incoming values over existing docs (incoming wins per-column).
    old_update_docs = {
        key: {
            **existing_docs[key],
            **doc
        }
        for key, doc in update_ix_docs.items()
        if key in existing_docs
    }
    new_indices_docs = _serialize_indices_docs([doc for doc in new_update_docs.values()])
    try:
        if new_indices_docs:
            self.push_docs(
                new_indices_docs,
                pipe.target,
                datetime_column=dt_col,
                debug=debug,
            )
    except Exception as e:
        return False, f"Failed to upsert '{pipe.target}':\n{e}"

    for key, doc in old_update_docs.items():
        try:
            self.set(key, serialize_document(doc))
        except Exception as e:
            return False, f"Failed to set keys for {pipe}:\n{e}"

    return True, msg
Upsert new documents into the pipe's collection.

Parameters:
- pipe (mrsm.Pipe): The pipe whose collection should receive the new documents.
- df (Union['pd.DataFrame', Iterator['pd.DataFrame']], default None): The data to be synced.
- check_existing (bool, default True): If `False`, do not check the documents against existing data and instead insert directly.

Returns:
- A `SuccessTuple` indicating success.
def get_pipe_columns_types(
    self,
    pipe: 'mrsm.Pipe',
    debug: bool = False,
    **kwargs: Any
) -> Dict[str, str]:
    """
    Return the data types for the columns in the target table for data type enforcement.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table contains columns and data types.

    Returns
    -------
    A dictionary mapping columns to data types.
    """
    if not pipe.exists(debug=debug):
        return {}

    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
    cached_dtypes = pipe.parameters.get('valkey', {}).get('dtypes', {})
    return {
        column: get_db_type_from_pd_type(pd_type, flavor='postgresql')
        for column, pd_type in cached_dtypes.items()
    }
Return the data types for the columns in the target table for data type enforcement.
Parameters
- pipe (mrsm.Pipe): The pipe whose target table contains columns and data types.
Returns
- A dictionary mapping columns to data types.
def clear_pipe(
    self,
    pipe: 'mrsm.Pipe',
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
) -> 'mrsm.SuccessTuple':
    """
    Delete rows within `begin`, `end`, and `params`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose rows to clear.

    begin: Union[datetime, int, None], default None
        If provided, remove rows >= `begin`.

    end: Union[datetime, int, None], default None
        If provided, remove rows < `end`.

    params: Optional[Dict[str, Any]], default None
        If provided, only remove rows which match the `params` filter.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    dt_col = pipe.columns.get('datetime', None)

    rows_df = pipe.get_data(
        begin=begin,
        end=end,
        params=params,
        debug=debug,
    )
    if rows_df is None or len(rows_df) == 0:
        return True, "Deleted 0 rows."

    target_key = self.quote_table(pipe.target)
    index_cols = [col for col in pipe.columns.values() if col]
    rows = rows_df.to_dict(orient='records')

    # Sorted-set members (datetime axis) vs plain set members.
    remove_member = self.client.zrem if dt_col else self.client.srem
    for row in rows:
        member_key = self.get_document_key(row, index_cols)
        doc_key = self.get_document_key(row, index_cols, target_key)
        try:
            remove_member(target_key, member_key)
            self.client.delete(doc_key)
        except Exception as e:
            return False, f"Failed to delete documents:\n{e}"

    plural = 's' if len(rows) != 1 else ''
    return True, f"Deleted {len(rows)} row{plural}."
Delete rows within `begin`, `end`, and `params`.

Parameters:
- pipe (mrsm.Pipe): The pipe whose rows to clear.
- begin (Union[datetime, int, None], default None): If provided, remove rows >= `begin`.
- end (Union[datetime, int, None], default None): If provided, remove rows < `end`.
- params (Optional[Dict[str, Any]], default None): If provided, only remove rows which match the `params` filter.

Returns:
- A `SuccessTuple` indicating success.
def get_sync_time(
    self,
    pipe: 'mrsm.Pipe',
    newest: bool = True,
    **kwargs: Any
) -> Union[datetime, int, None]:
    """
    Return the newest (or oldest) timestamp in a pipe.
    """
    from meerschaum.utils.dtypes import are_dtypes_equal
    dt_col = pipe.columns.get('datetime', None)
    dt_typ = pipe.dtypes.get(dt_col, 'datetime')
    if not dt_col:
        return None

    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    table_name = self.quote_table(pipe.target)
    try:
        # Newest value sits at the tail of the sorted set, oldest at the head.
        range_fn = self.client.zrevrange if newest else self.client.zrange
        raw_members = range_fn(table_name, 0, 0, withscores=True)
        if not raw_members:
            return None
        member = raw_members[0][0]
        if isinstance(member, bytes):
            member = member.decode('utf-8')
    except Exception:
        import traceback
        traceback.print_exc()
        return None

    dt_val = json.loads(member).get(dt_col, None)
    if dt_val is None:
        return None

    try:
        if are_dtypes_equal(dt_typ, 'int'):
            return int(dt_val)
        return dateutil_parser.parse(str(dt_val))
    except Exception as e:
        warn(f"Failed to parse sync time for {pipe}:\n{e}")

    return None
Return the newest (or oldest) timestamp in a pipe.
def get_pipe_rowcount(
    self,
    pipe: 'mrsm.Pipe',
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kwargs: Any
) -> Union[int, None]:
    """
    Return the number of documents in the pipe's set.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose rows to count.

    begin: Union[datetime, int, None], default None
        If provided, only count rows >= `begin`.

    end: Union[datetime, int, None], default None
        If provided, only count rows < `end`.

    params: Optional[Dict[str, Any]], default None
        If provided, only count rows matching the `params` filter.

    Returns
    -------
    The number of rows, or `None` if the count fails.
    """
    dt_col = pipe.columns.get('datetime', None)
    table_name = self.quote_table(pipe.target)

    # Fix: propagate the debug flag (was `pipe.exists()` without `debug`,
    # inconsistent with the sibling methods).
    if not pipe.exists(debug=debug):
        return 0

    try:
        # With no filters, the set cardinality IS the rowcount.
        if begin is None and end is None and not params:
            return (
                self.client.zcard(table_name)
                if dt_col
                else self.client.scard(table_name)
            )
    except Exception as e:
        if debug:
            dprint(f"Failed to get rowcount for {pipe}:\n{e}")
        return None

    # Fall back to materializing the filtered data and counting it.
    df = pipe.get_data(begin=begin, end=end, params=params, debug=debug)
    if df is None:
        return 0

    return len(df)
Return the number of documents in the pipe's set.
def fetch_pipes_keys(
    self,
    connector_keys: Optional[List[str]] = None,
    metric_keys: Optional[List[str]] = None,
    location_keys: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False
) -> List[
    Tuple[str, str, Union[str, None], Dict[str, Any]]
]:
    """
    Return the keys for the registered pipes.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        If provided, only return pipes with these connector keys.

    metric_keys: Optional[List[str]], default None
        If provided, only return pipes with these metric keys.

    location_keys: Optional[List[str]], default None
        If provided, only return pipes with these location keys.

    tags: Optional[List[str]], default None
        If provided, only return pipes matching these tags
        (comma-joined tags within a single string must all match).

    params: Optional[Dict[str, Any]], default None
        Additional filters applied to the registration dataframe.

    Returns
    -------
    A list of `(connector_keys, metric_key, location_key, parameters)` tuples.
    """
    from meerschaum.utils.dataframe import query_df
    from meerschaum.utils.misc import separate_negation_values
    try:
        df = self.read(PIPES_TABLE, debug=debug)
    except Exception:
        return []

    if df is None or len(df) == 0:
        return []

    query = {}
    if connector_keys:
        query['connector_keys'] = [str(k) for k in connector_keys]
    if metric_keys:
        query['metric_key'] = [str(k) for k in metric_keys]
    if location_keys:
        query['location_key'] = [str(k) for k in location_keys]
    if params:
        query.update(params)

    df = query_df(df, query, inplace=True)

    keys = [
        (
            doc['connector_keys'],
            doc['metric_key'],
            doc['location_key'],
            doc.get('parameters', {})
        )
        for doc in df.to_dict(orient='records')
    ]
    if not tags:
        return keys

    tag_groups = [tag.split(',') for tag in tags]
    in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups]

    filtered_keys = []
    for ck, mk, lk, parameters in keys:
        pipe_tags = set(parameters.get('tags', []))

        include_pipe = True
        for in_tags, ex_tags in in_ex_tag_groups:
            all_in = all(tag in pipe_tags for tag in in_tags)
            any_ex = any(tag in pipe_tags for tag in ex_tags)

            if (not all_in) or any_ex:
                include_pipe = False
                # Fix: once excluded, stop checking the remaining tag groups
                # (was `continue`, which wasted iterations with the same result).
                break

        if include_pipe:
            filtered_keys.append((ck, mk, lk, parameters))

    return filtered_keys
Return the keys for the registered pipes.
@staticmethod
def get_document_key(
    doc: Dict[str, Any],
    indices: List[str],
    table_name: Optional[str] = None,
) -> str:
    """
    Return a serialized string for a document's indices only.

    Parameters
    ----------
    doc: Dict[str, Any]
        The document containing index values to be serialized.

    indices: List[str]
        The name of the indices to be serialized.

    table_name: Optional[str], default None
        If provided, prepend the table to the key.

    Returns
    -------
    A serialized string of the document's indices.
    """
    from meerschaum.utils.dtypes import coerce_timezone
    # Non-datetime values have ':' escaped as COLON; datetimes are stored
    # as UTC epoch-second integers.
    index_vals = {
        key: (
            str(val).replace(':', COLON)
            if not isinstance(val, datetime)
            else str(int(coerce_timezone(val).replace(tzinfo=timezone.utc).timestamp()))
        )
        for key, val in doc.items()
        if ((key in indices) if indices else True)
    }
    # Fix: removed a dead `('indices:' if True else '')` conditional — the
    # 'indices:' segment is unconditional whenever a table name is given.
    prefix = f"{table_name}:indices:" if table_name else ''
    indices_str = prefix + ','.join(
        sorted(
            f'{key}{COLON}{val}'
            for key, val in index_vals.items()
        )
    )
    return indices_str
Return a serialized string for a document's indices only.

Parameters:
- doc (Dict[str, Any]): The document containing index values to be serialized.
- indices (List[str]): The names of the indices to be serialized.
- table_name (Optional[str], default None): If provided, prepend the table to the key.

Returns:
- A serialized string of the document's indices.
@classmethod
def get_table_quoted_doc_key(
    cls,
    table_name: str,
    doc: Dict[str, Any],
    indices: List[str],
    datetime_column: Optional[str] = None,
) -> str:
    """
    Return the document string as stored in the underlying set.
    """
    payload = {
        cls.get_document_key(doc, indices, table_name): serialize_document(doc),
    }
    if datetime_column:
        payload[datetime_column] = doc.get(datetime_column, 0)
    return json.dumps(
        payload,
        sort_keys=True,
        separators=(',', ':'),
        default=json_serialize_value,
    )
Return the document string as stored in the underling set.
def fetch(
    self,
    pipe: 'mrsm.Pipe',
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kwargs: Any
) -> List[Dict[str, Any]]:
    """
    Return data from a source database.
    """
    source_key = pipe.parameters.get('valkey', {}).get('key', None)
    if not source_key:
        return []

    try:
        key_type = self.client.type(source_key).decode('utf-8')
    except Exception:
        warn(f"Could not determine the type for key '{source_key}'.")
        return []

    def _to_score(bound, unbounded):
        # Datetimes become UTC epoch seconds; ints pass through unchanged.
        if bound is None:
            return unbounded
        if isinstance(bound, datetime):
            return int(bound.replace(tzinfo=timezone.utc).timestamp())
        return int(bound)

    begin_ts = _to_score(begin, '-inf')
    end_ts = _to_score(end, '+inf')

    if debug:
        dprint(f"Reading documents with {begin_ts=}, {end_ts=}")

    if key_type == 'set':
        return [
            json.loads(member.decode('utf-8'))
            for member in self.client.smembers(source_key)
        ]

    if key_type == 'zset':
        return [
            json.loads(member.decode('utf-8'))
            for member in self.client.zrangebyscore(
                source_key,
                begin_ts,
                end_ts,
                withscores=False,
            )
        ]

    return [{source_key: self.get(source_key)}]
Return data from a source database.
def get_users_pipe(self):
    """
    Return the pipe which stores the registered users.
    """
    users_pipe = mrsm.Pipe(
        'mrsm', 'users',
        columns=['user_id'],
        temporary=True,
        target=USERS_TABLE,
        instance=self,
    )
    return users_pipe
Return the pipe which stores the registered users.
@classmethod
def get_user_key(cls, user_id_or_username: str, sub_key: str, by_username: bool = False) -> str:
    """
    Return the key to store metadata about a user.

    Parameters
    ----------
    user_id_or_username: str
        The user ID or username of the given user.
        If `by_username` is `True`, then provide the username.

    sub_key: str
        The key suffix, e.g. `'attributes'`.

    by_username: bool, default False
        If `True`, then treat `user_id_or_username` as a username.

    Returns
    -------
    A key to store information about a user.

    Examples
    --------
    >>> get_user_key('deadbeef', 'attributes')
    'mrsm_user:user_id:deadbeef:attributes'
    >>> get_user_key('foo', 'user_id', by_username=True)
    'mrsm_user:username:foo:user_id'
    """
    key_type = 'username' if by_username else 'user_id'
    return cls.get_entity_key(USER_PREFIX, key_type, user_id_or_username, sub_key)
Return the key to store metadata about a user.

Parameters:
- user_id_or_username (str): The user ID or username of the given user. If `by_username` is `True`, then provide the username.
- sub_key (str): The key suffix, e.g. `'attributes'`.
- by_username (bool, default False): If `True`, then treat `user_id_or_username` as a username.

Returns:
- A key to store information about a user.

Examples:
>>> get_user_key('deadbeef', 'attributes')
'mrsm_user:user_id:deadbeef:attributes'
>>> get_user_key('foo', 'user_id', by_username=True)
'mrsm_user:username:foo:user_id'
@classmethod
def get_user_keys_vals(
    cls,
    user: 'mrsm.core.User',
    mutable_only: bool = False,
) -> Dict[str, str]:
    """
    Return a dictionary containing keys and values to set for the user.

    Parameters
    ----------
    user: mrsm.core.User
        The user for which to generate the keys.

    mutable_only: bool, default False
        If `True`, only return keys which may be edited.

    Returns
    -------
    A dictionary mapping a user's keys to values.
    """
    serialized_attributes = json.dumps(user.attributes, separators=(',', ':'))
    editable_keys_vals = {
        cls.get_user_key(user.user_id, 'attributes'): serialized_attributes,
        cls.get_user_key(user.user_id, 'email'): user.email,
        cls.get_user_key(user.user_id, 'type'): user.type,
        cls.get_user_key(user.user_id, 'password_hash'): user.password_hash,
    }
    if mutable_only:
        return editable_keys_vals

    # The username <-> user_id mappings are written once at registration.
    fixed_keys_vals = {
        cls.get_user_key(user.user_id, 'username'): user.username,
        cls.get_user_key(user.username, 'user_id', by_username=True): user.user_id,
    }
    return {**fixed_keys_vals, **editable_keys_vals}
Return a dictionary containing keys and values to set for the user.

Parameters:
- user (mrsm.core.User): The user for which to generate the keys.
- mutable_only (bool, default False): If `True`, only return keys which may be edited.

Returns:
- A dictionary mapping a user's keys to values.
def register_user(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kwargs: Any
) -> 'SuccessTuple':
    """
    Register a new user.
    """
    from meerschaum.utils.misc import generate_password

    user.user_id = generate_password(12)
    users_pipe = self.get_users_pipe()
    keys_vals = self.get_user_keys_vals(user)

    success, msg = True, "Success"
    try:
        sync_success, sync_msg = users_pipe.sync(
            [
                {
                    'user_id': user.user_id,
                    'username': user.username,
                },
            ],
            check_existing=False,
            debug=debug,
        )
        if not sync_success:
            return sync_success, sync_msg

        for key, val in keys_vals.items():
            if val is not None:
                self.set(key, val)
    except Exception as e:
        import traceback
        traceback.print_exc()
        success = False
        msg = f"Failed to register '{user.username}':\n{e}"

    if not success:
        # Best-effort rollback of any keys already written.
        for key in keys_vals:
            try:
                self.client.delete(key)
            except Exception:
                pass

    return success, msg
Register a new user.
def get_user_id(self, user: 'mrsm.core.User', debug: bool = False) -> Union[str, None]:
    """
    Return the ID for a user, or `None`.
    """
    lookup_key = self.get_user_key(user.username, 'user_id', by_username=True)
    try:
        return self.get(lookup_key)
    except Exception:
        return None
Return the ID for a user, or `None`.
def edit_user(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kw: Any
) -> 'SuccessTuple':
    """
    Edit the attributes for an existing user.
    """
    keys_vals = self.get_user_keys_vals(user, mutable_only=True)

    # Snapshot the current values so a partial write can be rolled back.
    try:
        previous_vals = {key: self.get(key) for key in keys_vals}
    except Exception as e:
        return False, f"Failed to edit user:\n{e}"

    success, msg = True, "Success"
    try:
        for key, val in keys_vals.items():
            self.set(key, val)
    except Exception as e:
        success, msg = False, f"Failed to edit user:\n{e}"

    if not success:
        try:
            for key, old_val in previous_vals.items():
                self.set(key, old_val)
        except Exception:
            pass

    return success, msg
Edit the attributes for an existing user.
def get_user_attributes(
    self,
    user: 'mrsm.core.User',
    debug: bool = False
) -> Union[Dict[str, Any], None]:
    """
    Return the user's attributes.
    """
    # Resolve the user ID, falling back to a username lookup.
    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    attributes_key = self.get_user_key(user_id, 'attributes')
    try:
        return json.loads(self.get(attributes_key))
    except Exception:
        return None
Return the user's attributes.
def delete_user(
    self,
    user: 'mrsm.core.User',
    debug: bool = False
) -> 'SuccessTuple':
    """
    Delete a user's keys.
    """
    # Resolve the user ID, falling back to a username lookup.
    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    users_pipe = self.get_users_pipe()
    keys_vals = self.get_user_keys_vals(user)

    # Snapshot the current values so deletions can be rolled back on failure.
    try:
        previous_vals = {key: self.get(key) for key in keys_vals}
    except Exception as e:
        return False, f"Failed to delete user:\n{e}"

    clear_success, clear_msg = users_pipe.clear(params={'user_id': user_id})
    if not clear_success:
        return clear_success, clear_msg

    success, msg = True, "Success"
    try:
        for key in keys_vals:
            self.client.delete(key)
    except Exception as e:
        success, msg = False, f"Failed to delete user:\n{e}"

    if not success:
        try:
            for key, old_val in previous_vals.items():
                self.set(key, old_val)
        except Exception:
            pass

    return success, msg
Delete a user's keys.
def get_users(
    self,
    debug: bool = False,
    **kw: Any
) -> List[str]:
    """
    Get the registered usernames.

    Returns
    -------
    A list of usernames from the users pipe (empty if no data exists).
    """
    users_pipe = self.get_users_pipe()
    # Fix: propagate the debug flag (previously dropped on `get_data()`).
    df = users_pipe.get_data(debug=debug)
    if df is None:
        return []

    return list(df['username'])
Get the registered usernames.
def get_user_password_hash(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kw: Any
) -> Union[str, None]:
    """
    Return the password hash for a user, or `None` if it cannot be fetched.
    """
    # Resolve the user ID, falling back to a username lookup.
    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    user_id_password_hash_key = self.get_user_key(user_id, 'password_hash')
    try:
        return self.get(user_id_password_hash_key)
    except Exception:
        return None
Return the password hash for a user.
def get_user_type(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kw: Any
) -> Union[str, None]:
    """
    Return the user's type.
    """
    # Resolve the user ID, falling back to a username lookup.
    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    type_key = self.get_user_key(user_id, 'type')
    try:
        return self.get(type_key)
    except Exception:
        return None
Return the user's type.
def get_plugins_pipe(self) -> 'mrsm.Pipe':
    """
    Return the pipe to store the plugins.
    """
    plugins_pipe = mrsm.Pipe(
        'mrsm', 'plugins',
        columns=['plugin_name'],
        temporary=True,
        target=PLUGINS_TABLE,
        instance=self,
    )
    return plugins_pipe
Return the pipe to store the plugins.
@classmethod
def get_plugin_key(cls, plugin_name: str, sub_key: str) -> str:
    """
    Return the key for a plugin's attribute.
    """
    entity_args = (PLUGIN_PREFIX, plugin_name, sub_key)
    return cls.get_entity_key(*entity_args)
Return the key for a plugin's attribute.
@classmethod
def get_plugin_keys_vals(
    cls,
    plugin: 'mrsm.core.Plugin',
    mutable_only: bool = False,
) -> Dict[str, str]:
    """
    Return a dictionary containing keys and values to set for the plugin.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin for which to generate the keys.

    mutable_only: bool, default False
        If `True`, only return keys which may be edited.

    Returns
    -------
    A dictionary mapping a plugin's keys to values.
    """
    serialized_attributes = json.dumps(plugin.attributes, separators=(',', ':'))
    editable_keys_vals = {
        cls.get_plugin_key(plugin.name, 'attributes'): serialized_attributes,
        cls.get_plugin_key(plugin.name, 'version'): plugin.version,
    }
    if mutable_only:
        return editable_keys_vals

    # The owning user ID is written once at registration.
    fixed_keys_vals = {
        cls.get_plugin_key(plugin.name, 'user_id'): plugin.user_id,
    }
    return {**fixed_keys_vals, **editable_keys_vals}
Return a dictionary containing keys and values to set for the plugin.

Parameters:
- plugin (mrsm.core.Plugin): The plugin for which to generate the keys.
- mutable_only (bool, default False): If `True`, only return keys which may be edited.

Returns:
- A dictionary mapping a plugin's keys to values.
def register_plugin(
    self,
    plugin: 'mrsm.core.Plugin',
    force: bool = False,
    debug: bool = False,
    **kw: Any
) -> 'SuccessTuple':
    """
    Register a new plugin to the `mrsm_plugins` "table".

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin to be registered.

    force: bool, default False
        Unused in this body; accepted for interface compatibility.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    # Fix: removed an unused `generate_password` import left over from
    # `register_user()` — plugins are keyed by name, not a generated ID.
    plugins_pipe = self.get_plugins_pipe()
    keys_vals = self.get_plugin_keys_vals(plugin)

    try:
        sync_success, sync_msg = plugins_pipe.sync(
            [
                {
                    'plugin_name': plugin.name,
                    'user_id': plugin.user_id,
                },
            ],
            check_existing=False,
            debug=debug,
        )
        if not sync_success:
            return sync_success, sync_msg

        for key, val in keys_vals.items():
            if val is not None:
                self.set(key, val)

        success, msg = True, "Success"
    except Exception as e:
        success = False
        msg = f"Failed to register plugin '{plugin.name}':\n{e}"

    if not success:
        # Best-effort rollback of any keys already written.
        for key in keys_vals:
            try:
                self.client.delete(key)
            except Exception:
                pass

    return success, msg
Register a new plugin to the `mrsm_plugins` "table".
def get_plugin_id(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Union[str, None]:
    """
    Return a plugin's ID.
    """
    # A plugin counts as registered when it has an owning user ID;
    # its ID is simply its name.
    if self.get_plugin_user_id(plugin, debug=debug) is None:
        return None
    return plugin.name
Return a plugin's ID.
def get_plugin_version(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False,
) -> Union[str, None]:
    """
    Return a plugin's version string, or `None` if it cannot be read.
    """
    key = self.get_plugin_key(plugin.name, 'version')
    try:
        version = self.get(key)
    except Exception:
        version = None
    return version
Return a plugin's version.
def get_plugin_user_id(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Union[str, None]:
    """
    Return the user ID stored for a plugin, or `None` if it cannot be read.
    """
    key = self.get_plugin_key(plugin.name, 'user_id')
    try:
        user_id = self.get(key)
    except Exception:
        user_id = None
    return user_id
Return a plugin's user ID.
def get_plugin_username(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Union[str, None]:
    """
    Return the username of a plugin's owner.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin whose owner's username to look up.

    Returns
    -------
    The owner's username, or `None` if the plugin has no owner on record
    or the username cannot be read.
    """
    # NOTE(review): the return annotation was `Union[str]` (i.e. just `str`),
    # but the function returns `None` on two paths — corrected to
    # `Union[str, None]` to match siblings like `get_plugin_user_id`.
    user_id = self.get_plugin_user_id(plugin, debug=debug)
    if user_id is None:
        return None

    username_key = self.get_user_key(user_id, 'username')
    try:
        return self.get(username_key)
    except Exception:
        return None
Return the username of a plugin's owner.
def get_plugin_attributes(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Dict[str, Any]:
    """
    Return the attributes dictionary stored for a plugin
    (empty if missing or unreadable).
    """
    key = self.get_plugin_key(plugin.name, 'attributes')
    try:
        payload = self.get(key)
        # Missing or empty payloads (and bad JSON) all collapse to `{}`.
        return json.loads(payload) if payload else {}
    except Exception:
        return {}
Return the attributes of a plugin.
def delete_plugin(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False,
    **kw: Any
) -> 'SuccessTuple':
    """
    Delete a plugin from the plugins table.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin to be deleted.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    plugins_pipe = self.get_plugins_pipe()
    clear_success, clear_msg = plugins_pipe.clear(params={'plugin_name': plugin.name}, debug=debug)
    if not clear_success:
        return clear_success, clear_msg

    keys_vals = self.get_plugin_keys_vals(plugin)

    # Snapshot current values so a failed delete can be rolled back.
    try:
        old_keys_vals = {
            key: self.get(key)
            for key in keys_vals
        }
    except Exception as e:
        return False, f"Failed to delete plugin '{plugin.name}':\n{e}"

    success, msg = True, "Success"
    try:
        for key in keys_vals:
            self.client.delete(key)
    except Exception as e:
        success = False
        msg = f"Failed to delete plugin '{plugin.name}':\n{e}"

    if not success:
        # Best-effort restore of the snapshot. Skip keys which held no value:
        # the client raises on `None`, which previously aborted the restore
        # after the first missing key.
        try:
            for key, old_val in old_keys_vals.items():
                if old_val is not None:
                    self.set(key, old_val)
        except Exception:
            pass

    return success, msg
Delete a plugin from the plugins table.