meerschaum.connectors.valkey
Import the ValkeyConnector
.
@make_connector
class ValkeyConnector(Connector):
    """
    Manage a Valkey instance.

    Build a `ValkeyConnector` from connection attributes or a URI string.

    "Tables" are emulated with Valkey keys: documents are stored as compact JSON strings
    in a sorted set (scored by a datetime column) or in a plain set (no datetime column).
    The name of the datetime column is stored under `<table>:datetime_column`.
    """
    IS_INSTANCE: bool = True
    REQUIRED_ATTRIBUTES: List[str] = ['host']
    OPTIONAL_ATTRIBUTES: List[str] = [
        'port', 'username', 'password', 'db', 'socket_timeout',
    ]
    DEFAULT_ATTRIBUTES: Dict[str, Any] = {
        'username': 'default',
        'port': 6379,
        'db': 0,
        'socket_timeout': 300,
    }
    KEY_SEPARATOR: str = ':'

    from ._pipes import (
        register_pipe,
        get_pipe_id,
        get_pipe_attributes,
        edit_pipe,
        pipe_exists,
        drop_pipe,
        delete_pipe,
        get_pipe_data,
        sync_pipe,
        get_pipe_columns_types,
        clear_pipe,
        get_sync_time,
        get_pipe_rowcount,
        fetch_pipes_keys,
    )
    from ._fetch import (
        fetch,
    )

    from ._users import (
        get_users_pipe,
        get_user_key,
        get_user_keys_vals,
        register_user,
        get_user_id,
        edit_user,
        get_user_attributes,
        delete_user,
        get_users,
        get_user_password_hash,
        get_user_type,
    )
    from ._plugins import (
        get_plugins_pipe,
        get_plugin_key,
        get_plugin_keys_vals,
        register_plugin,
        get_plugin_id,
        get_plugin_version,
        get_plugin_user_id,
        get_plugin_username,
        get_plugin_attributes,
        get_plugins,
        delete_plugin,
    )

    @property
    def client(self):
        """
        Return the Valkey client, building and caching it on first access.

        The client is built from the `uri` attribute when one is set,
        otherwise from `host` plus whichever `OPTIONAL_ATTRIBUTES` are set.
        """
        if '_client' in self.__dict__:
            return self.__dict__['_client']

        valkey = mrsm.attempt_import('valkey')

        if 'uri' in self.__dict__:
            self._client = valkey.Valkey.from_url(self.__dict__.get('uri'))
            return self._client

        optional_kwargs = {
            key: self.__dict__.get(key)
            for key in self.OPTIONAL_ATTRIBUTES
            if key in self.__dict__
        }
        connection_kwargs = {
            'host': self.host,
            **optional_kwargs
        }

        self._client = valkey.Valkey(**connection_kwargs)
        return self._client

    @property
    def URI(self) -> str:
        """
        Return the connection URI for this connector.

        An explicitly configured `uri` attribute is returned unchanged.
        Otherwise the URI is assembled from whichever attributes are set as
        `valkey://[username][:password]@host[:port][/db][?timeout=<socket_timeout>s]`,
        with the credentials URL-quoted.
        """
        import urllib.parse

        attrs = self.__dict__
        if 'uri' in attrs:
            return attrs.get('uri')

        username = attrs.get('username')
        password = attrs.get('password')

        userinfo = ''
        if username is not None:
            userinfo += urllib.parse.quote_plus(str(username))
        if password is not None:
            userinfo += ':' + urllib.parse.quote_plus(str(password))

        uri = "valkey://"
        if userinfo:
            # The '@' must follow any credentials (even a lone username),
            # otherwise the host would be parsed as part of the userinfo.
            uri += userinfo + '@'

        if 'host' in attrs:
            uri += self.host

        if 'port' in attrs:
            uri += f':{self.port}'

        if 'db' in attrs:
            uri += f"/{self.db}"

        if 'socket_timeout' in attrs:
            uri += f"?timeout={self.socket_timeout}s"

        return uri

    def set(self, key: str, value: Any, **kwargs: Any) -> None:
        """
        Set the `key` to `value`.

        Extra keyword arguments are forwarded to the client's `set()`,
        and the client's result is returned as-is.
        """
        return self.client.set(key, value, **kwargs)

    def get(self, key: str) -> Union[str, None]:
        """
        Get the value for `key` as a string, or `None` if the key is not set.
        """
        val = self.client.get(key)
        if val is None:
            return None

        # Clients configured to decode responses already return `str`.
        if isinstance(val, (bytes, bytearray)):
            return val.decode('utf-8')

        return val

    def test_connection(self) -> bool:
        """
        Return whether a connection may be established.

        This returns the result of the client's `ping()`;
        errors raised by the client propagate to the caller.
        """
        return self.client.ping()

    @classmethod
    def quote_table(cls, table: str) -> str:
        """
        Return a quoted key (via `shlex.quote`).
        """
        return shlex.quote(table)

    @classmethod
    def get_counter_key(cls, table: str) -> str:
        """
        Return the counter key for a given table (`<quoted table>:counter`).
        """
        table_name = cls.quote_table(table)
        return f"{table_name}:counter"

    @staticmethod
    def _to_utc_timestamp(dt_val: datetime) -> int:
        """
        Return the whole-second UTC timestamp for `dt_val`.

        Naive datetimes are assumed to already be in UTC;
        aware datetimes are converted to UTC first.
        """
        if dt_val.tzinfo is None:
            dt_val = dt_val.replace(tzinfo=timezone.utc)
        else:
            dt_val = dt_val.astimezone(timezone.utc)
        return int(dt_val.timestamp())

    def push_df(
        self,
        df: 'pd.DataFrame',
        table: str,
        datetime_column: Optional[str] = None,
        debug: bool = False,
    ) -> int:
        """
        Append a pandas DataFrame to a table.

        Parameters
        ----------
        df: pd.DataFrame
            The pandas DataFrame to append to the table.

        table: str
            The "table" name (root key).

        datetime_column: Optional[str], default None
            If provided, use this key as the datetime index.

        Returns
        -------
        The number of new documents added to the table (see `push_docs()`).
        """
        from meerschaum.utils.dataframe import to_json
        docs_str = to_json(df)
        docs = json.loads(docs_str)
        return self.push_docs(
            docs,
            table,
            datetime_column=datetime_column,
            debug=debug,
        )

    def push_docs(
        self,
        docs: List[Dict[str, Any]],
        table: str,
        datetime_column: Optional[str] = None,
        debug: bool = False,
    ) -> int:
        """
        Append a list of documents to a table.

        Parameters
        ----------
        docs: List[Dict[str, Any]]
            The docs to be pushed.
            Each doc is stored as a compact JSON string with sorted keys,
            so pushing an identical doc twice stores it once.

        table: str
            The "table" name (root key).

        datetime_column: Optional[str], default None
            If set (or previously recorded for this table), store the docs in a sorted set
            scored by this column's UTC timestamp in whole seconds.
            Integer values are used as the score directly;
            missing or null values are scored `0`.
            Otherwise store the docs in an unordered set.

        Returns
        -------
        The number of new documents added (the change in the table's cardinality).
        """
        from meerschaum.utils.dtypes import json_serialize_value
        table_name = self.quote_table(table)
        datetime_column_key = self.get_datetime_column_key(table)
        datetime_column = datetime_column or self.get(datetime_column_key)

        def _count() -> int:
            return (
                self.client.zcard(table_name)
                if datetime_column
                else self.client.scard(table_name)
            )

        def _serialize(doc: Dict[str, Any]) -> str:
            return json.dumps(
                doc,
                default=json_serialize_value,
                separators=(',', ':'),
                sort_keys=True,
            )

        old_len = _count()

        if datetime_column:
            dateutil_parser = mrsm.attempt_import('dateutil.parser')
            scores = {}
            for doc in docs:
                dt_val = doc.get(datetime_column)
                if dt_val is None:
                    ts = 0
                elif isinstance(dt_val, int):
                    ts = int(dt_val)
                else:
                    ts = self._to_utc_timestamp(dateutil_parser.parse(str(dt_val)))
                scores[_serialize(doc)] = ts

            ### A single `ZADD` replaces one round trip per document.
            if scores:
                self.client.zadd(table_name, scores)
            self.set(datetime_column_key, datetime_column)
        else:
            members = [_serialize(doc) for doc in docs]
            if members:
                self.client.sadd(table_name, *members)

        return _count() - old_len

    def _push_hash_docs_to_list(self, docs: List[Dict[str, Any]], table: str) -> int:
        """
        Store each doc as a hash under `<table>:<index>` and append the hash keys to a list.

        Returns the index which the next pushed document would receive.
        """
        table_name = self.quote_table(table)
        # Hash indices are 1-based, so continue after the entries already in the list
        # (reusing the list length would overwrite the most recent hash).
        next_ix = (self.client.llen(table_name) or 0) + 1
        for i, doc in enumerate(docs):
            doc_key = f"{table_name}:{next_ix + i}"
            self.client.hset(
                doc_key,
                mapping={
                    str(k): str(v)
                    for k, v in doc.items()
                },
            )
            self.client.rpush(table_name, doc_key)

        return next_ix + len(docs)

    def get_datetime_column_key(self, table: str) -> str:
        """
        Return the key to store the datetime index for `table`.
        """
        table_name = self.quote_table(table)
        return f'{table_name}:datetime_column'

    def read(
        self,
        table: str,
        begin: Union[datetime, int, str, None] = None,
        end: Union[datetime, int, str, None] = None,
        params: Optional[Dict[str, Any]] = None,
        datetime_column: Optional[str] = None,
        select_columns: Optional[List[str]] = None,
        omit_columns: Optional[List[str]] = None,
        debug: bool = False
    ) -> Union['pd.DataFrame', None]:
        """
        Query the table and return the result dataframe.

        Parameters
        ----------
        table: str
            The "table" name to be queried.

        begin: Union[datetime, int, str, None], default None
            If provided, only return rows greater than or equal to this datetime.

        end: Union[datetime, int, str, None], default None
            If provided, only return rows older than this datetime.

        params: Optional[Dict[str, Any]]
            Additional Meerschaum filter parameters.

        datetime_column: Optional[str], default None
            If provided, use this column for the datetime index.
            Otherwise infer from the table metadata.

        select_columns: Optional[List[str]], default None
            If provided, only return these columns.

        omit_columns: Optional[List[str]], default None
            If provided, do not include these columns in the result.

        Returns
        -------
        A Pandas DataFrame of the result, or `None`.
        """
        from meerschaum.utils.dataframe import parse_df_datetimes, query_df
        docs = self.read_docs(
            table,
            begin=begin,
            end=end,
            debug=debug,
        )
        df = parse_df_datetimes(docs)
        datetime_column_key = self.get_datetime_column_key(table)
        datetime_column = datetime_column or self.get(datetime_column_key)

        return query_df(
            df,
            begin=(begin if datetime_column is not None else None),
            end=(end if datetime_column is not None else None),
            params=params,
            datetime_column=datetime_column,
            select_columns=select_columns,
            omit_columns=omit_columns,
            inplace=True,
            reset_index=True,
            debug=debug,
        )

    def read_docs(
        self,
        table: str,
        begin: Union[datetime, int, str, None] = None,
        end: Union[datetime, int, str, None] = None,
        debug: bool = False,
    ) -> List[Dict[str, str]]:
        """
        Return a list of previously pushed docs.

        Parameters
        ----------
        table: str
            The "table" name (root key) under which the docs were pushed.

        begin: Union[datetime, int, str, None], default None
            If provided and the table has a datetime index, only return documents
            whose score (UTC timestamp in whole seconds) is at least this value.
            Ignored for tables without a datetime index.

        end: Union[datetime, int, str, None], default None
            If provided and the table has a datetime index, only return documents
            whose score is at most this value (the bound is inclusive at second resolution).
            Ignored for tables without a datetime index.

        Returns
        -------
        A list of the stored documents, decoded from JSON.
        """
        table_name = self.quote_table(table)
        datetime_column_key = self.get_datetime_column_key(table)
        datetime_column = self.get(datetime_column_key)

        if debug:
            dprint(f"Reading documents from '{table}' with {begin=}, {end=}")

        if not datetime_column:
            # `json.loads` accepts both raw bytes and already-decoded strings.
            return [
                json.loads(doc)
                for doc in self.client.smembers(table_name)
            ]

        def _to_score(bound: Any, unbounded: str) -> Union[int, str]:
            if bound is None:
                return unbounded
            if isinstance(bound, str):
                # Only string bounds need the parser and project helpers.
                from meerschaum.utils.dtypes import coerce_timezone
                dateutil_parser = mrsm.attempt_import('dateutil.parser')
                bound = coerce_timezone(dateutil_parser.parse(bound))
            if isinstance(bound, datetime):
                return self._to_utc_timestamp(bound)
            return int(bound)

        begin_ts = _to_score(begin, '-inf')
        end_ts = _to_score(end, '+inf')

        if debug:
            dprint(f"Reading documents with {begin_ts=}, {end_ts=}")

        return [
            json.loads(doc)
            for doc in self.client.zrangebyscore(
                table_name,
                begin_ts,
                end_ts,
                withscores=False,
            )
        ]

    def _read_docs_from_list(
        self,
        table: str,
        begin_ix: Optional[int] = 0,
        end_ix: Optional[int] = -1,
        debug: bool = False,
    ):
        """
        Read a list of documents from a "table".

        Parameters
        ----------
        table: str
            The "table" (root key) from which to read docs.

        begin_ix: Optional[int], default 0
            If provided, only read documents from this starting index.

        end_ix: Optional[int], default -1
            If provided, only read documents up to (not including) this index.
            Pass `None` to read through the final document.

        Returns
        -------
        A generator of documents (all keys and values are strings).
        """
        if begin_ix is None:
            begin_ix = 0

        if end_ix == 0:
            return

        if end_ix is None:
            end_ix = -1
        else:
            # `LRANGE` bounds are inclusive, so shift to make `end_ix` exclusive.
            end_ix -= 1

        table_name = self.quote_table(table)
        doc_keys = self.client.lrange(table_name, begin_ix, end_ix)
        for doc_key in doc_keys:
            yield {
                key.decode('utf-8'): value.decode('utf-8')
                for key, value in self.client.hgetall(doc_key).items()
            }

    def drop_table(self, table: str, debug: bool = False) -> None:
        """
        Drop a "table" of documents.

        Parameters
        ----------
        table: str
            The "table" name (root key) to be deleted.
            Its `datetime_column` metadata key is deleted as well.
        """
        table_name = self.quote_table(table)
        datetime_column_key = self.get_datetime_column_key(table)
        self.client.delete(table_name)
        self.client.delete(datetime_column_key)

    @classmethod
    def get_entity_key(cls, *keys: Any) -> str:
        """
        Return a joined key to set an entity.

        Raises a `ValueError` if no keys are given
        or if any key contains `KEY_SEPARATOR`.
        """
        if not keys:
            raise ValueError("No keys to be joined.")

        for key in keys:
            if cls.KEY_SEPARATOR in str(key):
                raise ValueError(f"Key cannot contain separator '{cls.KEY_SEPARATOR}'.")

        return cls.KEY_SEPARATOR.join([str(key) for key in keys])
Manage a Valkey instance.
Build a ValkeyConnector
from connection attributes or a URI string.
@property
def client(self):
    """
    Return the Valkey client, building and caching it on first access.

    A previously built client stored under `_client` is returned directly.
    Otherwise the client is created from the `uri` attribute when one is set,
    or from `host` plus whichever `OPTIONAL_ATTRIBUTES` are set on the connector.
    """
    attrs = self.__dict__
    try:
        return attrs['_client']
    except KeyError:
        pass

    valkey = mrsm.attempt_import('valkey')

    if 'uri' in attrs:
        self._client = valkey.Valkey.from_url(attrs.get('uri'))
    else:
        connection_kwargs = {'host': self.host}
        connection_kwargs.update(
            (key, attrs.get(key))
            for key in self.OPTIONAL_ATTRIBUTES
            if key in attrs
        )
        self._client = valkey.Valkey(**connection_kwargs)

    return self._client
Return the Valkey client.
@property
def URI(self) -> str:
    """
    Return the connection URI for this connector.

    An explicitly configured `uri` attribute is returned unchanged.
    Otherwise the URI is assembled from whichever attributes are set as
    `valkey://[username][:password]@host[:port][/db][?timeout=<socket_timeout>s]`,
    with the credentials URL-quoted.
    """
    import urllib.parse

    attrs = self.__dict__
    if 'uri' in attrs:
        return attrs.get('uri')

    username = attrs.get('username')
    password = attrs.get('password')

    userinfo = ''
    if username is not None:
        userinfo += urllib.parse.quote_plus(str(username))
    if password is not None:
        userinfo += ':' + urllib.parse.quote_plus(str(password))

    uri = "valkey://"
    if userinfo:
        # The '@' must follow any credentials (even a lone username),
        # otherwise the host would be parsed as part of the userinfo.
        uri += userinfo + '@'

    if 'host' in attrs:
        uri += self.host

    if 'port' in attrs:
        uri += f':{self.port}'

    if 'db' in attrs:
        uri += f"/{self.db}"

    if 'socket_timeout' in attrs:
        uri += f"?timeout={self.socket_timeout}s"

    return uri
Return the connection URI for this connector.
def set(self, key: str, value: Any, **kwargs: Any) -> None:
    """
    Store `value` under `key`.

    Any extra keyword arguments are forwarded unchanged to the client's `set()`,
    and whatever the client returns is passed back to the caller.
    """
    valkey_client = self.client
    result = valkey_client.set(key, value, **kwargs)
    return result
Set the key
to value
.
def get(self, key: str) -> Union[str, None]:
    """
    Get the value for `key`.

    Returns
    -------
    The stored value as a string, or `None` if the key is not set.
    """
    val = self.client.get(key)
    if val is None:
        return None

    # Raw replies are bytes; clients configured to decode responses
    # already return `str`, which has no `.decode()`.
    if isinstance(val, (bytes, bytearray)):
        return val.decode('utf-8')

    return val
Get the value for key
.
def test_connection(self) -> bool:
    """
    Return whether a connection may be established.

    The answer is the result of the client's `ping()`;
    any error raised by the client propagates to the caller.
    """
    ping_result = self.client.ping()
    return ping_result
Return whether a connection may be established.
@classmethod
def quote_table(cls, table: str) -> str:
    """
    Return `table` quoted with `shlex.quote` for use as a key.
    """
    quoted_key = shlex.quote(table)
    return quoted_key
Return a quoted key.
@classmethod
def get_counter_key(cls, table: str) -> str:
    """
    Return the counter key for `table`: the quoted table name followed by `:counter`.
    """
    return ':'.join((cls.quote_table(table), 'counter'))
Return the counter key for a given table.
def push_df(
    self,
    df: 'pd.DataFrame',
    table: str,
    datetime_column: Optional[str] = None,
    debug: bool = False,
) -> int:
    """
    Append a pandas DataFrame to a table.

    The DataFrame is serialized to JSON, parsed into a list of documents,
    and handed to `push_docs()`.

    Parameters
    ----------
    df: pd.DataFrame
        The pandas DataFrame to append to the table.

    table: str
        The "table" name (root key).

    datetime_column: Optional[str], default None
        If provided, use this key as the datetime index.

    Returns
    -------
    The value returned by `push_docs()` for the serialized documents.
    """
    from meerschaum.utils.dataframe import to_json

    docs = json.loads(to_json(df))
    return self.push_docs(docs, table, datetime_column=datetime_column, debug=debug)
Append a pandas DataFrame to a table.
Parameters
- df (pd.DataFrame): The pandas DataFrame to append to the table.
- table (str): The "table" name (root key).
- datetime_column (Optional[str], default None): If provided, use this key as the datetime index.
Returns
- The number of new documents added to the table.
def push_docs(
    self,
    docs: List[Dict[str, Any]],
    table: str,
    datetime_column: Optional[str] = None,
    debug: bool = False,
) -> int:
    """
    Append a list of documents to a table.

    Parameters
    ----------
    docs: List[Dict[str, Any]]
        The docs to be pushed.
        Each doc is stored as a compact JSON string with sorted keys,
        so pushing an identical doc twice stores it once.

    table: str
        The "table" name (root key).

    datetime_column: Optional[str], default None
        If set (or previously recorded for this table), store the docs in a sorted set
        scored by this column's UTC timestamp in whole seconds.
        Integer values are used as the score directly;
        missing or null values are scored `0`.
        Otherwise store the docs in an unordered set.

    Returns
    -------
    The number of new documents added (the change in the table's cardinality).
    """
    from meerschaum.utils.dtypes import json_serialize_value
    table_name = self.quote_table(table)
    datetime_column_key = self.get_datetime_column_key(table)
    datetime_column = datetime_column or self.get(datetime_column_key)

    def _count() -> int:
        return (
            self.client.zcard(table_name)
            if datetime_column
            else self.client.scard(table_name)
        )

    def _serialize(doc: Dict[str, Any]) -> str:
        return json.dumps(
            doc,
            default=json_serialize_value,
            separators=(',', ':'),
            sort_keys=True,
        )

    old_len = _count()

    if datetime_column:
        dateutil_parser = mrsm.attempt_import('dateutil.parser')
        scores = {}
        for doc in docs:
            raw_dt_val = doc.get(datetime_column)
            if raw_dt_val is None:
                ts = 0
            elif isinstance(raw_dt_val, int):
                ts = int(raw_dt_val)
            else:
                dt_val = dateutil_parser.parse(str(raw_dt_val))
                # Naive values are assumed to be UTC; aware values are converted
                # (replacing the tzinfo would shift the instant by the offset).
                dt_val = (
                    dt_val.replace(tzinfo=timezone.utc)
                    if dt_val.tzinfo is None
                    else dt_val.astimezone(timezone.utc)
                )
                ts = int(dt_val.timestamp())
            scores[_serialize(doc)] = ts

        # A single `ZADD` replaces one round trip per document.
        if scores:
            self.client.zadd(table_name, scores)
        self.set(datetime_column_key, datetime_column)
    else:
        members = [_serialize(doc) for doc in docs]
        if members:
            self.client.sadd(table_name, *members)

    return _count() - old_len
Append a list of documents to a table.
Parameters
- docs (List[Dict[str, Any]]): The docs to be pushed. All keys and values will be coerced into strings.
- table (str): The "table" name (root key).
- datetime_column (Optional[str], default None): If set, create a sorted set with this datetime column as the index. Otherwise push the docs to an unordered set.
Returns
- The number of new documents added to the table.
def get_datetime_column_key(self, table: str) -> str:
    """
    Return the key under which the name of `table`'s datetime index is stored.
    """
    return f'{self.quote_table(table)}:datetime_column'
Return the key to store the datetime index for table
.
def read(
    self,
    table: str,
    begin: Union[datetime, int, str, None] = None,
    end: Union[datetime, int, str, None] = None,
    params: Optional[Dict[str, Any]] = None,
    datetime_column: Optional[str] = None,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    debug: bool = False
) -> Union['pd.DataFrame', None]:
    """
    Query the table and return the result dataframe.

    The documents are fetched with `read_docs()`, parsed into a DataFrame,
    and then filtered with `query_df()`.

    Parameters
    ----------
    table: str
        The "table" name to be queried.

    begin: Union[datetime, int, str, None], default None
        If provided, only return rows greater than or equal to this datetime.

    end: Union[datetime, int, str, None], default None
        If provided, only return rows older than this datetime.

    params: Optional[Dict[str, Any]]
        Additional Meerschaum filter parameters.

    datetime_column: Optional[str], default None
        If provided, use this column for the datetime index.
        Otherwise use the column recorded for the table (if any).

    select_columns: Optional[List[str]], default None
        If provided, only return these columns.

    omit_columns: Optional[List[str]], default None
        If provided, do not include these columns in the result.

    Returns
    -------
    A Pandas DataFrame of the result, or `None`.
    """
    from meerschaum.utils.dataframe import parse_df_datetimes, query_df

    docs = self.read_docs(table, begin=begin, end=end, debug=debug)
    df = parse_df_datetimes(docs)

    if not datetime_column:
        datetime_column = self.get(self.get_datetime_column_key(table))

    # Without a datetime column, there is nothing for the bounds to apply to.
    has_datetime_column = datetime_column is not None
    query_kwargs = {
        'begin': begin if has_datetime_column else None,
        'end': end if has_datetime_column else None,
        'params': params,
        'datetime_column': datetime_column,
        'select_columns': select_columns,
        'omit_columns': omit_columns,
        'inplace': True,
        'reset_index': True,
        'debug': debug,
    }
    return query_df(df, **query_kwargs)
Query the table and return the result dataframe.
Parameters
- table (str): The "table" name to be queried.
- begin (Union[datetime, int, str, None], default None): If provided, only return rows greater than or equal to this datetime.
- end (Union[datetime, int, str, None], default None): If provided, only return rows older than this datetime.
- params (Optional[Dict[str, Any]]): Additional Meerschaum filter parameters.
- datetime_column (Optional[str], default None): If provided, use this column for the datetime index. Otherwise infer from the table metadata.
- select_columns (Optional[List[str]], default None): If provided, only return these columns.
- omit_columns (Optional[List[str]], default None): If provided, do not include these columns in the result.
Returns
- A Pandas DataFrame of the result, or
None
.
def read_docs(
    self,
    table: str,
    begin: Union[datetime, int, str, None] = None,
    end: Union[datetime, int, str, None] = None,
    debug: bool = False,
) -> List[Dict[str, str]]:
    """
    Return a list of previously pushed docs.

    Parameters
    ----------
    table: str
        The "table" name (root key) under which the docs were pushed.

    begin: Union[datetime, int, str, None], default None
        If provided and the table has a datetime index, only return documents
        whose score (UTC timestamp in whole seconds) is at least this value.
        Ignored for tables without a datetime index.

    end: Union[datetime, int, str, None], default None
        If provided and the table has a datetime index, only return documents
        whose score is at most this value (the bound is inclusive at second resolution).
        Ignored for tables without a datetime index.

    Returns
    -------
    A list of the stored documents, decoded from JSON.
    """
    table_name = self.quote_table(table)
    datetime_column_key = self.get_datetime_column_key(table)
    datetime_column = self.get(datetime_column_key)

    if debug:
        dprint(f"Reading documents from '{table}' with {begin=}, {end=}")

    if not datetime_column:
        # `json.loads` accepts both raw bytes and already-decoded strings.
        return [
            json.loads(doc)
            for doc in self.client.smembers(table_name)
        ]

    def _to_score(bound: Any, unbounded: str) -> Union[int, str]:
        if bound is None:
            return unbounded
        if isinstance(bound, str):
            # Only string bounds need the parser and project helpers.
            from meerschaum.utils.dtypes import coerce_timezone
            dateutil_parser = mrsm.attempt_import('dateutil.parser')
            bound = coerce_timezone(dateutil_parser.parse(bound))
        if isinstance(bound, datetime):
            # Naive values are assumed to be UTC; aware values are converted
            # (replacing the tzinfo would shift the instant by the offset).
            bound = (
                bound.replace(tzinfo=timezone.utc)
                if bound.tzinfo is None
                else bound.astimezone(timezone.utc)
            )
            return int(bound.timestamp())
        return int(bound)

    begin_ts = _to_score(begin, '-inf')
    end_ts = _to_score(end, '+inf')

    if debug:
        dprint(f"Reading documents with {begin_ts=}, {end_ts=}")

    return [
        json.loads(doc)
        for doc in self.client.zrangebyscore(
            table_name,
            begin_ts,
            end_ts,
            withscores=False,
        )
    ]
Return a list of previously pushed docs.
Parameters
- table (str): The "table" name (root key) under which the docs were pushed.
- begin (Union[datetime, int, str, None], default None):
If provided and the table was created with a datetime index, only return documents
newer than this datetime.
If the table was not created with a datetime index, `begin` is ignored and every stored document is returned.
- end (Union[datetime, int, str, None], default None):
If provided and the table was created with a datetime index, only return documents
older than this datetime.
If the table was not created with a datetime index, `end` is ignored and every stored document is returned.
Returns
- A list of dictionaries, where all keys and values are strings.
def drop_table(self, table: str, debug: bool = False) -> None:
    """
    Drop a "table" of documents.

    Both the table's root key and the key holding its datetime column name are deleted.

    Parameters
    ----------
    table: str
        The "table" name (root key) to be deleted.
    """
    keys_to_delete = (
        self.quote_table(table),
        self.get_datetime_column_key(table),
    )
    for key in keys_to_delete:
        self.client.delete(key)
Drop a "table" of documents.
Parameters
- table (str): The "table" name (root key) to be deleted.
@classmethod
def get_entity_key(cls, *keys: Any) -> str:
    """
    Join `keys` (coerced to strings) with `KEY_SEPARATOR` into a single entity key.

    Raises a `ValueError` when no keys are given
    or when any key contains the separator.
    """
    if not keys:
        raise ValueError("No keys to be joined.")

    separator = cls.KEY_SEPARATOR
    parts = [str(key) for key in keys]
    if any(separator in part for part in parts):
        raise ValueError(f"Key cannot contain separator '{separator}'.")

    return separator.join(parts)
Return a joined key to set an entity.
def register_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> SuccessTuple:
    """
    Insert the pipe's attributes into the internal `pipes` table.

    A new `pipe_id` is taken from the pipes counter, the pipe's keys are pushed
    as a document indexed by `pipe_id`, and the ID and the JSON-encoded parameters
    are stored under the pipe's own keys.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be registered.

    Returns
    -------
    A `SuccessTuple` of the result.
    """
    pipe_key = get_pipe_key(pipe)
    parameters_key = get_pipe_parameters_key(pipe)
    parameters_str = json.dumps(
        pipe._attributes.get('parameters', {}),
        separators=(',', ':'),
    )

    try:
        if self.get(pipe_key) is not None:
            return False, f"{pipe} is already registered."

        pipe_id = self.client.incr(PIPES_COUNTER)
        pipe_doc = {
            'pipe_id': pipe_id,
            'connector_keys': str(pipe.connector_keys),
            'metric_key': str(pipe.metric_key),
            'location_key': str(pipe.location_key),
        }
        self.push_docs(
            [pipe_doc],
            PIPES_TABLE,
            datetime_column='pipe_id',
            debug=debug,
        )
        self.set(pipe_key, pipe_id)
        self.set(parameters_key, parameters_str)
    except Exception as e:
        return False, f"Failed to register {pipe}:\n{e}"

    return True, "Success"
Insert the pipe's attributes into the internal pipes
table.
Parameters
- pipe (mrsm.Pipe): The pipe to be registered.
Returns
- A
SuccessTuple
of the result.
def get_pipe_id(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> Union[str, int, None]:
    """
    Return the `_id` for the pipe if it exists.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose `_id` to fetch.

    Returns
    -------
    The integer ID stored under the pipe's key,
    or `None` if it is missing or cannot be read as an integer.
    """
    pipe_key = get_pipe_key(pipe)
    try:
        pipe_id = int(self.get(pipe_key))
    except Exception:
        # Best-effort lookup: any failure is reported as "no ID".
        return None

    return pipe_id
Return the _id
for the pipe if it exists.
Parameters
- pipe (mrsm.Pipe):
The pipe whose
_id
to fetch.
Returns
- The `_id` for the pipe's document, or `None`.
def get_pipe_attributes(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> Dict[str, Any]:
    """
    Return the pipe's registration attributes.

    The keys come from the pipe itself; the parameters are loaded from the
    JSON string stored under the pipe's parameters key.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose attributes should be retrieved.

    Returns
    -------
    A dictionary with `connector_keys`, `metric_key`, `location_key`,
    `parameters`, and `pipe_id`, or an empty dictionary if the pipe has no ID.
    """
    pipe_id = pipe.get_id(debug=debug)
    if pipe_id is None:
        return {}

    parameters_str = self.get(get_pipe_parameters_key(pipe))
    if parameters_str:
        parameters = json.loads(parameters_str)
    else:
        parameters = {}

    return {
        'connector_keys': pipe.connector_keys,
        'metric_key': pipe.metric_key,
        'location_key': pipe.location_key,
        'parameters': parameters,
        'pipe_id': pipe_id,
    }
Return the pipe's document from the internal pipes
collection.
Parameters
- pipe (mrsm.Pipe): The pipe whose attributes should be retrieved.
Returns
- The document that matches the keys of the pipe.
def edit_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Persist the pipe's in-memory parameters.

    The parameters are written as compact JSON under the pipe's parameters key.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose in-memory parameters must be persisted.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    if pipe.get_id(debug=debug) is None:
        return False, f"{pipe} is not registered."

    self.set(
        get_pipe_parameters_key(pipe),
        json.dumps(pipe.parameters, separators=(',', ':')),
    )
    return True, "Success"
Edit the attributes of the pipe.
Parameters
- pipe (mrsm.Pipe): The pipe whose in-memory parameters must be persisted.
Returns
- A
SuccessTuple
indicating success.
def pipe_exists(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> bool:
    """
    Check whether a pipe's target table exists.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to check whether its table exists.

    Returns
    -------
    `True` if the client reports the quoted target key as existing, otherwise `False`.
    """
    target_key = self.quote_table(pipe.target)
    existing_count = self.client.exists(target_key)
    return existing_count != 0
Check whether a pipe's target table exists.
Parameters
- pipe (mrsm.Pipe): The pipe to check whether its table exists.
Returns
- A
bool
indicating the table exists.
def drop_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Drop a pipe's target table if it exists.

    The pipe is cleared chunk by chunk, its table is dropped, and any cached
    `dtypes` under the pipe's `valkey` parameters are reset
    (and persisted unless the pipe is temporary).

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be dropped.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    for chunk_begin, chunk_end in pipe.get_chunk_bounds(debug=debug):
        clear_result = pipe.clear(begin=chunk_begin, end=chunk_end, debug=debug)
        clear_chunk_success, clear_chunk_msg = clear_result
        if not clear_chunk_success:
            return clear_chunk_success, clear_chunk_msg

    try:
        self.drop_table(pipe.target, debug=debug)
    except Exception as e:
        return False, f"Failed to drop {pipe}:\n{e}"

    if 'valkey' in pipe.parameters:
        pipe.parameters['valkey']['dtypes'] = {}
        if not pipe.temporary:
            edit_success, edit_msg = pipe.edit(debug=debug)
            if not edit_success:
                return edit_success, edit_msg

    return True, "Success"
Drop a pipe's collection if it exists.
Parameters
- pipe (mrsm.Pipe): The pipe to be dropped.
Returns
- A
SuccessTuple
indicating success.
def delete_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Drop a pipe and remove its registration.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be deleted.

    Returns
    -------
    A `SuccessTuple` indicating success. Fails if the drop fails
    or if the pipe has no registered ID.
    """
    def _serialize_fallback(value):
        # Values exposing `tzinfo` are treated as datetimes; anything else is stringified.
        if hasattr(value, 'tzinfo'):
            return json_serialize_datetime(value)
        return str(value)

    dropped, drop_message = pipe.drop(debug=debug)
    if not dropped:
        return dropped, drop_message

    pipe_id = self.get_pipe_id(pipe, debug=debug)
    if pipe_id is None:
        return False, f"{pipe} is not registered."

    pipe_key = get_pipe_key(pipe)
    parameters_key = get_pipe_parameters_key(pipe)
    for registration_key in (pipe_key, parameters_key):
        self.client.delete(registration_key)

    registration_docs = self.read(
        PIPES_TABLE,
        params={'pipe_id': pipe_id},
    ).to_dict(orient='records')
    if not registration_docs:
        return True, "Success"

    # Re-serialize the first matching document and remove that member from the sorted set.
    serialized_doc = json.dumps(
        registration_docs[0],
        default=_serialize_fallback,
        separators=(',', ':'),
        sort_keys=True,
    )
    self.client.zrem(PIPES_TABLE, serialized_doc)
    return True, "Success"
Delete a pipe's registration from the pipes
collection.
Parameters
- pipe (mrsm.Pipe): The pipe to be deleted.
Returns
- A
SuccessTuple
indicating success.
def get_pipe_data(
    self,
    pipe: mrsm.Pipe,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kwargs: Any
) -> Union['pd.DataFrame', None]:
    """
    Query a pipe's target table and return the DataFrame.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe with the target table from which to read.

    select_columns: Optional[List[str]], default None
        If provided, only select these given columns.
        Otherwise select all available columns (i.e. `SELECT *`).

    omit_columns: Optional[List[str]], default None
        If provided, remove these columns from the selection.

    begin: Union[datetime, int, None], default None
        The earliest `datetime` value to search from (inclusive).

    end: Union[datetime, int, None], default None
        The latest `datetime` value to search from (exclusive).

    params: Optional[Dict[str, Any]], default None
        Additional filters to apply to the query.

    Returns
    -------
    The target table's data as a DataFrame,
    or `None` if the pipe does not exist.
    """
    if not pipe.exists(debug=debug):
        return None

    from meerschaum.utils.dataframe import query_df, parse_df_datetimes
    from meerschaum.utils.dtypes import are_dtypes_equal

    valkey_dtypes = pipe.parameters.get('valkey', {}).get('dtypes', {})
    dt_col = pipe.columns.get('datetime', None)
    table_name = self.quote_table(pipe.target)
    indices = [col for col in pipe.columns.values() if col]
    # Each document from `read_docs` carries its index string under 'ix'.
    # NOTE(review): COLON looks like an escape placeholder for ':' -- confirm where it is defined.
    ix_docs = [
        string_to_dict(doc.get('ix', '').replace(COLON, ':'))
        for doc in self.read_docs(
            pipe.target,
            begin=begin,
            end=end,
            debug=debug,
        )
    ]
    # Fetch the serialized document for each index entry.
    # A failure on any single key discards the whole batch (warn and continue with no rows).
    try:
        docs_strings = [
            self.get(get_document_key(
                doc, indices, table_name
            ))
            for doc in ix_docs
        ]
    except Exception as e:
        warn(f"Failed to fetch documents for {pipe}:\n{e}")
        docs_strings = []

    # Skip keys whose value is missing or empty.
    docs = [
        json.loads(doc_str)
        for doc_str in docs_strings
        if doc_str
    ]
    # Every column in `pipe.dtypes` that is not datetime-like is excluded from datetime parsing.
    ignore_dt_cols = [
        col
        for col, dtype in pipe.dtypes.items()
        if not are_dtypes_equal(str(dtype), 'datetime')
    ]

    df = parse_df_datetimes(
        docs,
        ignore_cols=ignore_dt_cols,
        chunksize=kwargs.get('chunksize', None),
        strip_timezone=(pipe.tzinfo is None),
        debug=debug,
    )
    # Best-effort cast to the dtypes recorded under parameters['valkey']['dtypes'];
    # columns which are absent or fail to cast are left untouched.
    for col, typ in valkey_dtypes.items():
        try:
            df[col] = df[col].astype(typ)
        except Exception:
            pass

    df = pipe.enforce_dtypes(df, debug=debug)

    # For an empty frame, only the column selection is applied (no row filtering).
    if len(df) == 0:
        return query_df(df, select_columns=select_columns, omit_columns=omit_columns)

    return query_df(
        df,
        select_columns=select_columns,
        omit_columns=omit_columns,
        params=params,
        begin=begin,
        end=end,
        datetime_column=dt_col,
        inplace=True,
        reset_index=True,
    )
Query a pipe's target table and return the DataFrame.
Parameters
- pipe (mrsm.Pipe): The pipe with the target table from which to read.
- select_columns (Optional[List[str]], default None):
If provided, only select these given columns.
Otherwise select all available columns (i.e.
SELECT *
). - omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
- begin (Union[datetime, int, None], default None):
The earliest
datetime
value to search from (inclusive). - end (Union[datetime, int, None], default None):
The latest
datetime
value to search from (exclusive). - params (Optional[Dict[str, str]], default None): Additional filters to apply to the query.
Returns
- The target table's data as a DataFrame.
def sync_pipe(
    self,
    pipe: mrsm.Pipe,
    df: 'pd.DataFrame' = None,
    check_existing: bool = True,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Upsert new documents into the pipe's target table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose table should receive the new documents.

    df: Union['pd.DataFrame', Iterator['pd.DataFrame']], default None
        The data to be synced. Dask DataFrames are computed before syncing.

    check_existing: bool, default True
        If `False`, do not check the documents against existing data and instead insert directly.

    Returns
    -------
    A `SuccessTuple` indicating success.
    Returns `(False, message)` when `df` is `None`.
    """
    # `df` defaults to `None`; fail explicitly rather than raising on attribute access below.
    if df is None:
        return False, f"DataFrame is None. Cannot sync {pipe}."

    from meerschaum.utils.dtypes import are_dtypes_equal
    dt_col = pipe.columns.get('datetime', None)
    indices = [col for col in pipe.columns.values() if col]
    table_name = self.quote_table(pipe.target)
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = df.compute()
    upsert = pipe.parameters.get('upsert', False)
    static = pipe.parameters.get('static', False)

    def _serialize_indices_docs(_docs):
        # Build the index entries: the document key plus the datetime value (if any).
        return [
            {
                'ix': get_document_key(doc, indices),
                **(
                    {
                        dt_col: doc.get(dt_col, 0)
                    }
                    if dt_col
                    else {}
                )
            }
            for doc in _docs
        ]

    valkey_dtypes = pipe.parameters.get('valkey', {}).get('dtypes', {})
    # Dtypes for columns not yet recorded; datetime-like dtypes are stored as UTC-aware.
    new_dtypes = {
        str(key): (
            str(val)
            if not are_dtypes_equal(str(val), 'datetime')
            else 'datetime64[ns, UTC]'
        )
        for key, val in df.dtypes.items()
        if str(key) not in valkey_dtypes
    }
    # Iterate over a snapshot because `valkey_dtypes` may be modified in the loop.
    for col, typ in list(valkey_dtypes.items()):
        if col in df.columns:
            try:
                df[col] = df[col].astype(typ)
            except Exception:
                # The recorded dtype no longer fits the data: fall back to strings.
                valkey_dtypes[col] = 'string'
                new_dtypes[col] = 'string'
                df[col] = df[col].astype('string')

    if new_dtypes and (not static or not valkey_dtypes):
        valkey_dtypes.update(new_dtypes)
        if 'valkey' not in pipe.parameters:
            pipe.parameters['valkey'] = {}
        pipe.parameters['valkey']['dtypes'] = valkey_dtypes
        if not pipe.temporary:
            edit_success, edit_msg = pipe.edit(debug=debug)
            if not edit_success:
                return edit_success, edit_msg

    unseen_df, update_df, delta_df = (
        pipe.filter_existing(df, include_unchanged_columns=True, debug=debug)
        if check_existing and not upsert
        else (None, df, df)
    )
    num_insert = len(unseen_df) if unseen_df is not None else 0
    num_update = len(update_df) if update_df is not None else 0
    msg = (
        f"Inserted {num_insert}, updated {num_update} rows."
        if not upsert
        else f"Upserted {num_update} rows."
    )
    if len(delta_df) == 0:
        return True, msg

    # Write the unseen documents, then register their index entries.
    unseen_docs = unseen_df.to_dict(orient='records') if unseen_df is not None else []
    unseen_indices_docs = _serialize_indices_docs(unseen_docs)
    unseen_ix_vals = {
        get_document_key(doc, indices, table_name): serialize_document(doc)
        for doc in unseen_docs
    }
    for key, val in unseen_ix_vals.items():
        try:
            self.set(key, val)
        except Exception as e:
            return False, f"Failed to set keys for {pipe}:\n{e}"

    try:
        self.push_docs(
            unseen_indices_docs,
            pipe.target,
            datetime_column=dt_col,
            debug=debug,
        )
    except Exception as e:
        return False, f"Failed to push docs to '{pipe.target}':\n{e}"

    # Split the update documents into those with and without an existing stored document.
    update_docs = update_df.to_dict(orient='records') if update_df is not None else []
    update_ix_docs = {
        get_document_key(doc, indices, table_name): doc
        for doc in update_docs
    }
    existing_docs_data = {
        key: self.get(key)
        for key in update_ix_docs
    } if pipe.exists(debug=debug) else {}
    existing_docs = {
        key: json.loads(data)
        for key, data in existing_docs_data.items()
        if data
    }
    new_update_docs = {
        key: doc
        for key, doc in update_ix_docs.items()
        if key not in existing_docs
    }
    new_ix_vals = {
        get_document_key(doc, indices, table_name): serialize_document(doc)
        for doc in new_update_docs.values()
    }
    for key, val in new_ix_vals.items():
        try:
            self.set(key, val)
        except Exception as e:
            return False, f"Failed to set keys for {pipe}:\n{e}"

    # Existing documents are merged: incoming values override the stored ones.
    old_update_docs = {
        key: {
            **existing_docs[key],
            **doc
        }
        for key, doc in update_ix_docs.items()
        if key in existing_docs
    }
    new_indices_docs = _serialize_indices_docs(list(new_update_docs.values()))
    try:
        if new_indices_docs:
            self.push_docs(
                new_indices_docs,
                pipe.target,
                datetime_column=dt_col,
                debug=debug,
            )
    except Exception as e:
        return False, f"Failed to upsert '{pipe.target}':\n{e}"

    for key, doc in old_update_docs.items():
        try:
            self.set(key, serialize_document(doc))
        except Exception as e:
            return False, f"Failed to set keys for {pipe}:\n{e}"

    return True, msg
Upsert new documents into the pipe's collection.
Parameters
- pipe (mrsm.Pipe): The pipe whose collection should receive the new documents.
- df (Union['pd.DataFrame', Iterator['pd.DataFrame']], default None): The data to be synced.
- check_existing (bool, default True):
If
False
, do not check the documents against existing data and instead insert directly.
Returns
- A
SuccessTuple
indicating success.
def get_pipe_columns_types(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> Dict[str, str]:
    """
    Map the pipe's recorded column dtypes to database types for dtype enforcement.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose recorded dtypes (`parameters['valkey']['dtypes']`) are translated.

    Returns
    -------
    A dictionary mapping columns to data types (PostgreSQL flavor),
    or an empty dictionary if the pipe does not exist.
    """
    if not pipe.exists(debug=debug):
        return {}

    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type

    recorded_dtypes = pipe.parameters.get('valkey', {}).get('dtypes', {})
    columns_types = {}
    for column, pandas_type in recorded_dtypes.items():
        columns_types[column] = get_db_type_from_pd_type(pandas_type, flavor='postgresql')
    return columns_types
Return the data types for the columns in the target table for data type enforcement.
Parameters
- pipe (mrsm.Pipe): The pipe whose target table contains columns and data types.
Returns
- A dictionary mapping columns to data types.
def clear_pipe(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
) -> mrsm.SuccessTuple:
    """
    Remove the rows which fall within `begin`, `end`, and `params`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose rows to clear.

    begin: Union[datetime, int, None], default None
        If provided, remove rows >= `begin`.

    end: Union[datetime, int, None], default None
        If provided, remove rows < `end`.

    params: Optional[Dict[str, Any]], default None
        If provided, only remove rows which match the `params` filter.

    Returns
    -------
    A `SuccessTuple` whose message reports the number of deleted rows.
    """
    dt_col = pipe.columns.get('datetime', None)

    matching_df = pipe.get_data(begin=begin, end=end, params=params, debug=debug)
    if matching_df is None or len(matching_df) == 0:
        return True, "Deleted 0 rows."

    rows = matching_df.to_dict(orient='records')
    table_name = self.quote_table(pipe.target)
    index_columns = [col for col in pipe.columns.values() if col]

    for row in rows:
        member_key = get_document_key(row, index_columns)
        document_key = get_document_key(row, index_columns, table_name)
        try:
            # Tables with a datetime column are sorted sets; the rest are plain sets.
            if dt_col:
                self.client.zrem(table_name, member_key)
            else:
                self.client.srem(table_name, member_key)
            self.client.delete(document_key)
        except Exception as e:
            return False, f"Failed to delete documents:\n{e}"

    num_deleted = len(rows)
    plural_suffix = '' if num_deleted == 1 else 's'
    return True, f"Deleted {num_deleted} row{plural_suffix}."
Delete rows within begin
, end
, and params
.
Parameters
- pipe (mrsm.Pipe): The pipe whose rows to clear.
- begin (Union[datetime, int, None], default None):
If provided, remove rows >=
begin
. - end (Union[datetime, int, None], default None):
If provided, remove rows <
end
. - params (Optional[Dict[str, Any]], default None):
If provided, only remove rows which match the
params
filter.
Returns
- A
SuccessTuple
indicating success.
def get_sync_time(
    self,
    pipe: mrsm.Pipe,
    newest: bool = True,
    **kwargs: Any
) -> Union[datetime, int, None]:
    """
    Return the newest (or oldest) datetime value stored for a pipe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose sorted set is inspected.

    newest: bool, default True
        If `True`, read the last member of the sorted set; otherwise the first.

    Returns
    -------
    An `int` when the datetime column's dtype is integer-like, otherwise a parsed `datetime`.
    `None` if the pipe has no datetime column, the set is empty or unreadable,
    or the value cannot be parsed.
    """
    from meerschaum.utils.dtypes import are_dtypes_equal

    dt_col = pipe.columns.get('datetime', None)
    dt_typ = pipe.dtypes.get(dt_col, 'datetime64[ns, UTC]')
    if not dt_col:
        return None

    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    table_name = self.quote_table(pipe.target)
    try:
        read_edge = self.client.zrevrange if newest else self.client.zrange
        members = read_edge(table_name, 0, 0)
        if not members:
            return None
        edge_member = members[0]
    except Exception:
        return None

    dt_val = json.loads(edge_member).get(dt_col, None)
    if dt_val is None:
        return None

    try:
        if are_dtypes_equal(dt_typ, 'int'):
            return int(dt_val)
        return dateutil_parser.parse(str(dt_val))
    except Exception as e:
        warn(f"Failed to parse sync time for {pipe}:\n{e}")

    return None
Return the newest (or oldest) timestamp in a pipe.
def get_pipe_rowcount(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kwargs: Any
) -> Union[int, None]:
    """
    Return the number of documents in the pipe's set.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose rows are counted.

    begin: Union[datetime, int, None], default None
        If provided, only count rows >= `begin`.

    end: Union[datetime, int, None], default None
        If provided, only count rows < `end`.

    params: Optional[Dict[str, Any]], default None
        If provided, only count rows which match the `params` filter.

    Returns
    -------
    The number of rows, `0` if the pipe does not exist,
    or `None` if the cardinality could not be read.
    """
    dt_col = pipe.columns.get('datetime', None)
    table_name = self.quote_table(pipe.target)

    if not pipe.exists(debug=debug):
        return 0

    if begin is None and end is None and params is None:
        # Unfiltered counts come straight from the set's cardinality.
        # Tables with a datetime column are sorted sets (ZCARD);
        # the rest are plain sets (SCARD), matching the SREM used when clearing.
        try:
            return (
                self.client.zcard(table_name)
                if dt_col
                else self.client.scard(table_name)
            )
        except Exception:
            return None

    df = pipe.get_data(begin=begin, end=end, params=params, debug=debug)
    if df is None:
        return 0

    return len(df)
Return the number of documents in the pipe's set.
def fetch_pipes_keys(
    self,
    connector_keys: Optional[List[str]] = None,
    metric_keys: Optional[List[str]] = None,
    location_keys: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False
) -> Optional[List[Tuple[str, str, Optional[str]]]]:
    """
    Return the keys for the registered pipes.

    Parameters
    ----------
    connector_keys, metric_keys, location_keys: Optional[List[str]], default None
        If provided, only return pipes whose corresponding key is in the list.

    tags: Optional[List[str]], default None
        Tag groups (comma-separated within a group). A pipe is kept only if, for every group,
        it carries all of the group's included tags and none of its negated tags.

    params: Optional[Dict[str, Any]], default None
        Additional filters merged into the query.

    Returns
    -------
    A list of `(connector_keys, metric_key, location_key)` tuples.
    An empty list is returned if the pipes table cannot be read or is empty.
    """
    from meerschaum.utils.dataframe import query_df
    from meerschaum.utils.misc import separate_negation_values

    try:
        pipes_df = self.read(PIPES_TABLE, debug=debug)
    except Exception:
        return []

    if pipes_df is None or len(pipes_df) == 0:
        return []

    query = {}
    for column, requested_keys in (
        ('connector_keys', connector_keys),
        ('metric_key', metric_keys),
        ('location_key', location_keys),
    ):
        if requested_keys:
            query[column] = [str(key) for key in requested_keys]
    if params:
        query.update(params)

    pipes_df = query_df(pipes_df, query, inplace=True)

    keys = []
    for doc in pipes_df.to_dict(orient='records'):
        keys.append((doc['connector_keys'], doc['metric_key'], doc['location_key']))
    if not tags:
        return keys

    in_ex_tag_groups = [
        separate_negation_values(tag.split(','))
        for tag in tags
    ]

    def _satisfies_tag_groups(pipe_tags):
        # Every group must have all of its included tags present and none of its excluded tags.
        return all(
            all(tag in pipe_tags for tag in in_tags)
            and not any(tag in pipe_tags for tag in ex_tags)
            for in_tags, ex_tags in in_ex_tag_groups
        )

    filtered_keys = []
    for ck, mk, lk in keys:
        pipe_tags = set(mrsm.Pipe(ck, mk, lk, instance=self).tags)
        if _satisfies_tag_groups(pipe_tags):
            filtered_keys.append((ck, mk, lk))

    return filtered_keys
Return the keys for the registered pipes.
def fetch(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kwargs: Any
) -> List[Dict[str, Any]]:
    """
    Return the documents stored under the pipe's source key.

    The source key is read from `pipe.parameters['valkey']['key']`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose parameters name the source key.

    begin: Union[datetime, int, None], default None
        Lower score bound for sorted sets. Datetimes are converted to Unix timestamps
        (naive datetimes are taken to be UTC). Defaults to `'-inf'`.

    end: Union[datetime, int, None], default None
        Upper score bound for sorted sets, converted like `begin`. Defaults to `'+inf'`.

    Returns
    -------
    A list of documents:
    - `set` keys: every member parsed as JSON (bounds are ignored).
    - `zset` keys: members with scores between the bounds, parsed as JSON.
    - any other type: a single document mapping the key to its value.
    An empty list is returned if no key is configured or its type cannot be determined.
    """
    def _to_score(bound, unbounded):
        # Convert a `begin` / `end` value into a sorted-set score.
        if bound is None:
            return unbounded
        if not isinstance(bound, datetime):
            return int(bound)
        # Only naive datetimes are labelled as UTC. An aware datetime already identifies
        # an instant, and `replace(tzinfo=...)` would shift it for non-UTC timezones.
        if bound.tzinfo is None:
            bound = bound.replace(tzinfo=timezone.utc)
        return int(bound.timestamp())

    source_key = pipe.parameters.get('valkey', {}).get('key', None)
    if not source_key:
        return []

    try:
        key_type = self.client.type(source_key).decode('utf-8')
    except Exception:
        warn(f"Could not determine the type for key '{source_key}'.")
        return []

    begin_ts = _to_score(begin, '-inf')
    end_ts = _to_score(end, '+inf')

    if debug:
        dprint(f"Reading documents with {begin_ts=}, {end_ts=}")

    if key_type == 'set':
        return [
            json.loads(doc_bytes.decode('utf-8'))
            for doc_bytes in self.client.smembers(source_key)
        ]

    if key_type == 'zset':
        return [
            json.loads(doc_bytes.decode('utf-8'))
            for doc_bytes in self.client.zrangebyscore(
                source_key,
                begin_ts,
                end_ts,
                withscores=False,
            )
        ]

    return [{source_key: self.get(source_key)}]
Return data from a source database.
def get_users_pipe(self):
    """
    Build the temporary pipe (indexed on `user_id`) which stores the registered users.
    """
    pipe_options = {
        'columns': ['user_id'],
        'temporary': True,
        'target': USERS_TABLE,
        'instance': self,
    }
    return mrsm.Pipe('mrsm', 'users', **pipe_options)
Return the pipe which stores the registered users.
@classmethod
def get_user_key(cls, user_id_or_username: str, sub_key: str, by_username: bool = False) -> str:
    """
    Build the key under which a piece of user metadata is stored.

    Parameters
    ----------
    user_id_or_username: str
        The user ID of the given user,
        or the username if `by_username` is `True`.

    sub_key: str
        The key suffix, e.g. `'attributes'`.

    by_username: bool, default False
        If `True`, then treat `user_id_or_username` as a username.

    Returns
    -------
    A key to store information about a user.

    Examples
    --------
    >>> get_user_key('deadbeef', 'attributes')
    'mrsm_user:user_id:deadbeef:attributes'
    >>> get_user_key('foo', 'user_id', by_username=True)
    'mrsm_user:username:foo:user_id'
    """
    if by_username:
        identifier_kind = 'username'
    else:
        identifier_kind = 'user_id'
    return cls.get_entity_key(USER_PREFIX, identifier_kind, user_id_or_username, sub_key)
Return the key to store metadata about a user.
Parameters
- user_id_or_username (str):
The user ID or username of the given user.
If
by_username
is True
, then provide the username. - sub_key (str):
The key suffix, e.g.
'attributes'
. - by_username (bool, default False):
If
True
, then treat user_id_or_username
as a username.
Returns
- A key to store information about a user.
Examples
>>> get_user_key('deadbeef', 'attributes')
'mrsm_user:user_id:deadbeef:attributes'
>>> get_user_key('foo', 'user_id', by_username=True)
'mrsm_user:username:foo:user_id'
@classmethod
def get_user_keys_vals(
    cls,
    user: 'mrsm.core.User',
    mutable_only: bool = False,
) -> Dict[str, str]:
    """
    Build the mapping of keys to values which describe a user.

    Parameters
    ----------
    user: mrsm.core.User
        The user for which to generate the keys.

    mutable_only: bool, default False
        If `True`, only return the editable keys
        (attributes, email, type, and password hash).

    Returns
    -------
    A dictionary mapping a user's keys to values.
    The immutable keys (the username and the username-to-ID lookup) come first.
    """
    def _by_id(sub_key):
        return cls.get_user_key(user.user_id, sub_key)

    attributes_json = json.dumps(user.attributes, separators=(',', ':'))
    editable = {
        _by_id('attributes'): attributes_json,
        _by_id('email'): user.email,
        _by_id('type'): user.type,
        _by_id('password_hash'): user.password_hash,
    }
    if mutable_only:
        return editable

    keys_vals = {
        _by_id('username'): user.username,
        cls.get_user_key(user.username, 'user_id', by_username=True): user.user_id,
    }
    keys_vals.update(editable)
    return keys_vals
Return a dictionary containing keys and values to set for the user.
Parameters
- user (mrsm.core.User): The user for which to generate the keys.
- mutable_only (bool, default False):
If
True
, only return keys which may be edited.
Returns
- A dictionary mapping a user's keys to values.
def register_user(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kwargs: Any
) -> SuccessTuple:
    """
    Register a new user.

    A fresh 12-character `user_id` is generated and assigned to `user`,
    the user is synced into the users pipe, and each non-`None` metadata value is stored.
    If anything raises, the traceback is printed and the user's keys are deleted (best effort).

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    from meerschaum.utils.misc import generate_password

    user.user_id = generate_password(12)
    users_pipe = self.get_users_pipe()
    keys_vals = self.get_user_keys_vals(user)
    user_doc = {
        'user_id': user.user_id,
        'username': user.username,
    }

    try:
        synced, sync_message = users_pipe.sync(
            [user_doc],
            check_existing=False,
            debug=debug,
        )
        if not synced:
            return synced, sync_message

        for key, val in keys_vals.items():
            if val is None:
                continue
            self.set(key, val)
    except Exception as e:
        import traceback
        traceback.print_exc()
        failure_message = f"Failed to register '{user.username}':\n{e}"

        # Best-effort cleanup of any keys written before the failure.
        for key in keys_vals:
            try:
                self.client.delete(key)
            except Exception:
                pass

        return False, failure_message

    return True, "Success"
Register a new user.
def get_user_id(self, user: 'mrsm.core.User', debug: bool = False) -> Union[str, None]:
    """
    Look up a user's ID by username.

    Returns
    -------
    The stored user ID, or `None` if it is missing or the lookup raises.
    """
    lookup_key = self.get_user_key(user.username, 'user_id', by_username=True)
    try:
        return self.get(lookup_key)
    except Exception:
        return None
Return the ID for a user, or None
.
def edit_user(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Edit the attributes for an existing user.

    Only the mutable keys are written, and `None` values are skipped
    (as in `register_user`) because they cannot be stored; the existing value is kept.
    If a write fails, the previous values are restored on a best-effort basis.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    keys_vals = self.get_user_keys_vals(user, mutable_only=True)
    try:
        old_keys_vals = {
            key: self.get(key)
            for key in keys_vals
        }
    except Exception as e:
        return False, f"Failed to edit user:\n{e}"

    try:
        for key, val in keys_vals.items():
            if val is not None:
                self.set(key, val)
    except Exception as e:
        msg = f"Failed to edit user:\n{e}"

        # Roll back key by key so that one failure does not abort the rest.
        # Keys which did not exist before the edit are removed again.
        for key, old_val in old_keys_vals.items():
            try:
                if old_val is None:
                    self.client.delete(key)
                else:
                    self.set(key, old_val)
            except Exception:
                pass

        return False, msg

    return True, "Success"
Edit the attributes for an existing user.
def get_user_attributes(
    self,
    user: 'mrsm.core.User',
    debug: bool = False
) -> Union[Dict[str, Any], None]:
    """
    Load a user's attributes dictionary.

    The user ID is taken from `user.user_id`, falling back to a lookup by username.

    Returns
    -------
    The parsed attributes, or `None` if they are missing or cannot be parsed.
    """
    user_id = user.user_id
    if user_id is None:
        user_id = self.get_user_id(user, debug=debug)

    attributes_key = self.get_user_key(user_id, 'attributes')
    try:
        attributes_json = self.get(attributes_key)
        return json.loads(attributes_json)
    except Exception:
        return None
Return the user's attributes.
def delete_user(
    self,
    user: 'mrsm.core.User',
    debug: bool = False
) -> SuccessTuple:
    """
    Remove a user's row from the users pipe and delete the user's keys.

    The current values are read first so that they may be written back
    (best effort) if deleting the keys fails.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    users_pipe = self.get_users_pipe()
    keys_vals = self.get_user_keys_vals(user)

    try:
        previous_values = {}
        for key in keys_vals:
            previous_values[key] = self.get(key)
    except Exception as e:
        return False, f"Failed to delete user:\n{e}"

    cleared, clear_message = users_pipe.clear(params={'user_id': user_id})
    if not cleared:
        return cleared, clear_message

    try:
        for key in keys_vals:
            self.client.delete(key)
    except Exception as e:
        failure_message = f"Failed to delete user:\n{e}"

        # Best-effort rollback: the first failing write abandons the rest.
        try:
            for key, previous in previous_values.items():
                self.set(key, previous)
        except Exception:
            pass

        return False, failure_message

    return True, "Success"
Delete a user's keys.
def get_users(
    self,
    debug: bool = False,
    **kw: Any
) -> List[str]:
    """
    List the usernames stored in the users pipe.

    Returns
    -------
    The values of the `username` column, or an empty list if the pipe returns no data.
    """
    users_df = self.get_users_pipe().get_data()
    return [] if users_df is None else list(users_df['username'])
Get the registered usernames.
def get_user_password_hash(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kw: Any
) -> Union[str, None]:
    """
    Return the stored password hash for a user.

    The user ID is taken from `user.user_id`, falling back to a lookup by username.

    Returns
    -------
    The password hash, or `None` if it is missing or the lookup raises.
    """
    user_id = user.user_id
    if user_id is None:
        user_id = self.get_user_id(user, debug=debug)

    password_hash_key = self.get_user_key(user_id, 'password_hash')
    try:
        return self.get(password_hash_key)
    except Exception:
        return None
Return the password hash for a user.
def get_user_type(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kw: Any
) -> Union[str, None]:
    """
    Return the stored type for a user.

    The user ID is taken from `user.user_id`, falling back to a lookup by username.

    Returns
    -------
    The user's type, or `None` if it is missing or the lookup raises.
    """
    user_id = user.user_id
    if user_id is None:
        user_id = self.get_user_id(user, debug=debug)

    type_key = self.get_user_key(user_id, 'type')
    try:
        return self.get(type_key)
    except Exception:
        return None
Return the user's type.
def get_plugins_pipe(self) -> mrsm.Pipe:
    """
    Build the temporary pipe (indexed on `plugin_name`) which stores the registered plugins.
    """
    pipe_options = {
        'columns': ['plugin_name'],
        'temporary': True,
        'target': PLUGINS_TABLE,
        'instance': self,
    }
    return mrsm.Pipe('mrsm', 'plugins', **pipe_options)
Return the pipe to store the plugins.
@classmethod
def get_plugin_key(cls, plugin_name: str, sub_key: str) -> str:
    """
    Build the key under which one of a plugin's attributes is stored.

    Parameters
    ----------
    plugin_name: str
        The name of the plugin.

    sub_key: str
        The key suffix, e.g. `'version'`.

    Returns
    -------
    The entity key composed from the plugin prefix, the plugin's name, and `sub_key`.
    """
    key_parts = (PLUGIN_PREFIX, plugin_name, sub_key)
    return cls.get_entity_key(*key_parts)
Return the key for a plugin's attribute.
@classmethod
def get_plugin_keys_vals(
    cls,
    plugin: 'mrsm.core.Plugin',
    mutable_only: bool = False,
) -> Dict[str, str]:
    """
    Build the mapping of keys to values which describe a plugin.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin for which to generate the keys.

    mutable_only: bool, default False
        If `True`, only return the editable keys (attributes and version).

    Returns
    -------
    A dictionary mapping a plugin's keys to values.
    The immutable `user_id` key comes first.
    """
    def _key(sub_key):
        return cls.get_plugin_key(plugin.name, sub_key)

    attributes_json = json.dumps(plugin.attributes, separators=(',', ':'))
    editable = {
        _key('attributes'): attributes_json,
        _key('version'): plugin.version,
    }
    if mutable_only:
        return editable

    keys_vals = {_key('user_id'): plugin.user_id}
    keys_vals.update(editable)
    return keys_vals
Return a dictionary containing keys and values to set for the plugin.
Parameters
- plugin (mrsm.core.Plugin): The plugin for which to generate the keys.
- mutable_only (bool, default False):
If
True
, only return keys which may be edited.
Returns
- A dictionary mapping a plugin's keys to values.
def register_plugin(
    self,
    plugin: 'mrsm.core.Plugin',
    force: bool = False,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Register a new plugin to the `mrsm_plugins` "table".

    The plugin is synced into the plugins pipe and each non-`None` metadata value is stored.
    If anything raises, the plugin's keys are deleted again (best effort).

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin to register.

    force: bool, default False
        Accepted for interface compatibility; not used by this implementation.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    plugins_pipe = self.get_plugins_pipe()
    keys_vals = self.get_plugin_keys_vals(plugin)

    try:
        sync_success, sync_msg = plugins_pipe.sync(
            [
                {
                    'plugin_name': plugin.name,
                    'user_id': plugin.user_id,
                },
            ],
            check_existing=False,
            debug=debug,
        )
        if not sync_success:
            return sync_success, sync_msg

        for key, val in keys_vals.items():
            # `None` cannot be stored as a value, so unset fields are skipped.
            if val is not None:
                self.set(key, val)

        success, msg = True, "Success"
    except Exception as e:
        success = False
        msg = f"Failed to register plugin '{plugin.name}':\n{e}"

    if not success:
        # Best-effort cleanup of any keys written before the failure.
        for key in keys_vals:
            try:
                self.client.delete(key)
            except Exception:
                pass

    return success, msg
Register a new plugin to the mrsm_plugins
"table".
def get_plugin_id(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Union[str, None]:
    """
    Return a plugin's ID, which for this connector is simply the plugin's name.
    """
    plugin_id = plugin.name
    return plugin_id
Return a plugin's ID.
def get_plugin_version(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False,
) -> Union[str, None]:
    """
    Return the stored version of a plugin.

    Returns
    -------
    The version string, or `None` if it is missing or the lookup raises.
    """
    lookup_key = self.get_plugin_key(plugin.name, 'version')
    try:
        stored_version = self.get(lookup_key)
    except Exception:
        return None
    return stored_version
Return a plugin's version.
def get_plugin_user_id(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Union[str, None]:
    """
    Return the ID of the user who owns a plugin.

    Returns
    -------
    The stored user ID, or `None` if it is missing or the lookup raises.
    """
    lookup_key = self.get_plugin_key(plugin.name, 'user_id')
    try:
        owner_id = self.get(lookup_key)
    except Exception:
        return None
    return owner_id
Return a plugin's user ID.
def get_plugin_username(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Union[str, None]:
    """
    Return the username of a plugin's owner.

    Returns
    -------
    The owner's username, or `None` if the plugin has no stored owner
    or the username lookup raises.
    """
    user_id = self.get_plugin_user_id(plugin, debug=debug)
    if user_id is None:
        return None

    username_key = self.get_user_key(user_id, 'username')
    try:
        return self.get(username_key)
    except Exception:
        return None
Return the username of a plugin's owner.
def get_plugin_attributes(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Dict[str, Any]:
    """
    Load a plugin's attributes dictionary.

    Returns
    -------
    The parsed attributes, or an empty dictionary if they are missing,
    empty, or cannot be read or parsed.
    """
    lookup_key = self.get_plugin_key(plugin.name, 'attributes')
    try:
        attributes_json = self.get(lookup_key)
        return json.loads(attributes_json) if attributes_json else {}
    except Exception:
        return {}
Return the attributes of a plugin.
def get_plugins(
    self,
    user_id: Optional[int] = None,
    search_term: Optional[str] = None,
    debug: bool = False,
    **kw: Any
) -> List[str]:
    """
    Return a list of plugin names.

    Parameters
    ----------
    user_id: Optional[int], default None
        If provided, only return plugins registered to this user.

    search_term: Optional[str], default None
        If provided, only return plugins whose names begin with this prefix.

    Returns
    -------
    The names of the matching plugins,
    or an empty list if the plugins pipe returns no data.
    """
    plugins_pipe = self.get_plugins_pipe()
    params = {}
    if user_id:
        params['user_id'] = user_id

    df = plugins_pipe.get_data(['plugin_name'], params=params, debug=debug)
    # `get_data` yields `None` when the plugins table does not exist yet.
    if df is None:
        return []

    name_prefix = search_term or ''
    return [
        doc['plugin_name']
        for doc in df.to_dict(orient='records')
        if doc['plugin_name'].startswith(name_prefix)
    ]
Return a list of plugin names.
def delete_plugin(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Remove a plugin's row from the plugins pipe and delete the plugin's keys.

    The current values are read before deletion so that they may be written back
    (best effort) if deleting the keys fails.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    cleared, clear_message = self.get_plugins_pipe().clear(
        params={'plugin_name': plugin.name},
        debug=debug,
    )
    if not cleared:
        return cleared, clear_message

    keys_vals = self.get_plugin_keys_vals(plugin)
    try:
        previous_values = {}
        for key in keys_vals:
            previous_values[key] = self.get(key)
    except Exception as e:
        return False, f"Failed to delete plugin '{plugin.name}':\n{e}"

    try:
        for key in keys_vals:
            self.client.delete(key)
    except Exception as e:
        failure_message = f"Failed to delete plugin '{plugin.name}':\n{e}"

        # Best-effort rollback: the first failing write abandons the rest.
        try:
            for key, previous in previous_values.items():
                self.set(key, previous)
        except Exception:
            pass

        return False, failure_message

    return True, "Success"
Delete a plugin from the plugins table.