meerschaum.connectors.valkey
Import the `ValkeyConnector`.
@make_connector
class ValkeyConnector(Connector):
    """
    Manage a Valkey instance.

    Build a `ValkeyConnector` from connection attributes or a URI string.
    """
    # Instance connectors may back pipes, users, and plugins.
    IS_INSTANCE: bool = True
    REQUIRED_ATTRIBUTES: List[str] = ['host']
    OPTIONAL_ATTRIBUTES: List[str] = [
        'port', 'username', 'password', 'db', 'socket_timeout',
    ]
    DEFAULT_ATTRIBUTES: Dict[str, Any] = {
        'username': 'default',
        'port': 6379,
        'db': 0,
        'socket_timeout': 300,
    }
    # Separator used when joining entity keys; see `get_entity_key()`.
    KEY_SEPARATOR: str = ':'

    # Pipe, fetch, user, and plugin methods are implemented in submodules
    # and bound onto the class via these imports.
    from ._pipes import (
        register_pipe,
        get_pipe_id,
        get_pipe_attributes,
        edit_pipe,
        pipe_exists,
        drop_pipe,
        delete_pipe,
        get_pipe_data,
        sync_pipe,
        get_pipe_columns_types,
        clear_pipe,
        get_sync_time,
        get_pipe_rowcount,
        fetch_pipes_keys,
    )
    from ._fetch import (
        fetch,
    )

    from ._users import (
        get_users_pipe,
        get_user_key,
        get_user_keys_vals,
        register_user,
        get_user_id,
        edit_user,
        get_user_attributes,
        delete_user,
        get_users,
        get_user_password_hash,
        get_user_type,
    )
    from ._plugins import (
        get_plugins_pipe,
        get_plugin_key,
        get_plugin_keys_vals,
        register_plugin,
        get_plugin_id,
        get_plugin_version,
        get_plugin_user_id,
        get_plugin_username,
        get_plugin_attributes,
        get_plugins,
        delete_plugin,
    )

    @property
    def client(self):
        """
        Return the Valkey client, building and caching it on first access.
        """
        if '_client' in self.__dict__:
            return self.__dict__['_client']

        valkey = mrsm.attempt_import('valkey')

        # Prefer an explicit URI over individual connection attributes.
        if 'uri' in self.__dict__:
            self._client = valkey.Valkey.from_url(self.__dict__.get('uri'))
            return self._client

        # Only pass optional attributes that were explicitly set.
        optional_kwargs = {
            key: self.__dict__.get(key)
            for key in self.OPTIONAL_ATTRIBUTES
            if key in self.__dict__
        }
        connection_kwargs = {
            'host': self.host,
            **optional_kwargs
        }

        self._client = valkey.Valkey(**connection_kwargs)
        return self._client

    @property
    def URI(self) -> str:
        """
        Return the connection URI for this connector.
        """
        import urllib.parse

        if 'uri' in self.__dict__:
            return self.__dict__.get('uri')

        uri = "valkey://"
        # NOTE(review): a username without a password yields "user:host"
        # with no '@' separator — confirm attributes always include both.
        if 'username' in self.__dict__:
            uri += urllib.parse.quote_plus(self.username) + ':'

        if 'password' in self.__dict__:
            uri += urllib.parse.quote_plus(self.password) + '@'

        if 'host' in self.__dict__:
            uri += self.host

        if 'port' in self.__dict__:
            uri += f':{self.port}'

        if 'db' in self.__dict__:
            uri += f"/{self.db}"

        if 'socket_timeout' in self.__dict__:
            uri += f"?timeout={self.socket_timeout}s"

        return uri

    def set(self, key: str, value: Any, **kwargs: Any) -> Any:
        """
        Set the `key` to `value`.

        Returns
        -------
        The client's response to the SET command.
        """
        return self.client.set(key, value, **kwargs)

    def get(self, key: str) -> Union[str, None]:
        """
        Get the value for `key`, or `None` if the key does not exist.
        """
        val = self.client.get(key)
        if val is None:
            return None

        # The client returns bytes; decode for the caller.
        return val.decode('utf-8')

    def test_connection(self) -> bool:
        """
        Return whether a connection may be established.
        """
        return self.client.ping()

    @classmethod
    def quote_table(cls, table: str) -> str:
        """
        Return a quoted key.
        """
        return shlex.quote(table)

    @classmethod
    def get_counter_key(cls, table: str) -> str:
        """
        Return the counter key for a given table.
        """
        table_name = cls.quote_table(table)
        return f"{table_name}:counter"

    def push_df(
        self,
        df: 'pd.DataFrame',
        table: str,
        datetime_column: Optional[str] = None,
        debug: bool = False,
    ) -> int:
        """
        Append a pandas DataFrame to a table.

        Parameters
        ----------
        df: pd.DataFrame
            The pandas DataFrame to append to the table.

        table: str
            The "table" name (root key).

        datetime_column: Optional[str], default None
            If provided, use this key as the datetime index.

        Returns
        -------
        The current index counter value (how many docs have been pushed).
        """
        from meerschaum.utils.dataframe import to_json
        # Round-trip through JSON to normalize dtypes into plain values.
        docs_str = to_json(df)
        docs = json.loads(docs_str)
        return self.push_docs(
            docs,
            table,
            datetime_column=datetime_column,
            debug=debug,
        )

    def push_docs(
        self,
        docs: List[Dict[str, Any]],
        table: str,
        datetime_column: Optional[str] = None,
        debug: bool = False,
    ) -> int:
        """
        Append a list of documents to a table.

        Parameters
        ----------
        docs: List[Dict[str, Any]]
            The docs to be pushed.
            All keys and values will be coerced into strings.

        table: str
            The "table" name (root key).

        datetime_column: Optional[str], default None
            If set, create a sorted set with this datetime column as the index.
            Otherwise push the docs to a list.

        Returns
        -------
        The current index counter value (how many docs have been pushed).
        """
        from meerschaum.utils.misc import json_serialize_datetime
        table_name = self.quote_table(table)
        datetime_column_key = self.get_datetime_column_key(table)
        # Fall back to the table's previously recorded datetime column.
        remote_datetime_column = self.get(datetime_column_key)
        datetime_column = datetime_column or remote_datetime_column
        dateutil_parser = mrsm.attempt_import('dateutil.parser')

        old_len = (
            self.client.zcard(table_name)
            if datetime_column
            else self.client.scard(table_name)
        )
        for doc in docs:
            # Docs missing the datetime column score 0 in the sorted set.
            original_dt_val = (
                doc[datetime_column]
                if datetime_column and datetime_column in doc
                else 0
            )
            dt_val = (
                dateutil_parser.parse(str(original_dt_val))
                if not isinstance(original_dt_val, int)
                else int(original_dt_val)
            ) if datetime_column else None
            # Naive datetimes are treated as UTC when computing the score.
            ts = (
                int(dt_val.replace(tzinfo=timezone.utc).timestamp())
                if isinstance(dt_val, datetime)
                else int(dt_val)
            ) if datetime_column else None
            # Canonical serialization (sorted keys, compact separators) so
            # identical docs map to identical members.
            doc_str = json.dumps(
                doc,
                default=(lambda x: json_serialize_datetime(x) if hasattr(x, 'tzinfo') else str(x)),
                separators=(',', ':'),
                sort_keys=True,
            )
            if datetime_column:
                self.client.zadd(table_name, {doc_str: ts})
            else:
                self.client.sadd(table_name, doc_str)

        if datetime_column:
            self.set(datetime_column_key, datetime_column)
        new_len = (
            self.client.zcard(table_name)
            if datetime_column
            else self.client.scard(table_name)
        )

        # Count of *newly added* members (duplicates are not re-added).
        return new_len - old_len

    def _push_hash_docs_to_list(self, docs: List[Dict[str, Any]], table: str) -> int:
        """
        Push docs as individual hashes and append their keys to a list.

        Returns the positional index following the pushed docs.
        """
        table_name = self.quote_table(table)
        # NOTE(review): starting index is max(len, 1), so the first doc key
        # is ':1' even for an empty list — confirm intended.
        next_ix = max(self.client.llen(table_name) or 0, 1)
        for i, doc in enumerate(docs):
            doc_key = f"{table_name}:{next_ix + i}"
            self.client.hset(
                doc_key,
                mapping={
                    str(k): str(v)
                    for k, v in doc.items()
                },
            )
            self.client.rpush(table_name, doc_key)

        return next_ix + len(docs)

    def get_datetime_column_key(self, table: str) -> str:
        """
        Return the key to store the datetime index for `table`.
        """
        table_name = self.quote_table(table)
        return f'{table_name}:datetime_column'

    def read(
        self,
        table: str,
        begin: Union[datetime, int, str, None] = None,
        end: Union[datetime, int, str, None] = None,
        params: Optional[Dict[str, Any]] = None,
        datetime_column: Optional[str] = None,
        select_columns: Optional[List[str]] = None,
        omit_columns: Optional[List[str]] = None,
        debug: bool = False
    ) -> Union['pd.DataFrame', None]:
        """
        Query the table and return the result dataframe.

        Parameters
        ----------
        table: str
            The "table" name to be queried.

        begin: Union[datetime, int, str, None], default None
            If provided, only return rows greater than or equal to this datetime.

        end: Union[datetime, int, str, None], default None
            If provided, only return rows older than this datetime.

        params: Optional[Dict[str, Any]]
            Additional Meerschaum filter parameters.

        datetime_column: Optional[str], default None
            If provided, use this column for the datetime index.
            Otherwise infer from the table metadata.

        select_columns: Optional[List[str]], default None
            If provided, only return these columns.

        omit_columns: Optional[List[str]], default None
            If provided, do not include these columns in the result.

        Returns
        -------
        A Pandas DataFrame of the result, or `None`.
        """
        from meerschaum.utils.dataframe import parse_df_datetimes, query_df
        docs = self.read_docs(
            table,
            begin=begin,
            end=end,
            debug=debug,
        )
        df = parse_df_datetimes(docs)
        datetime_column_key = self.get_datetime_column_key(table)
        datetime_column = datetime_column or self.get(datetime_column_key)

        # Re-apply begin/end in pandas: read_docs() filters by score only
        # when the table has a datetime index.
        return query_df(
            df,
            begin=(begin if datetime_column is not None else None),
            end=(end if datetime_column is not None else None),
            params=params,
            datetime_column=datetime_column,
            select_columns=select_columns,
            omit_columns=omit_columns,
            inplace=True,
            reset_index=True,
            debug=debug,
        )

    def read_docs(
        self,
        table: str,
        begin: Union[datetime, int, str, None] = None,
        end: Union[datetime, int, str, None] = None,
        debug: bool = False,
    ) -> List[Dict[str, str]]:
        """
        Return a list of previously pushed docs.

        Parameters
        ----------
        table: str
            The "table" name (root key) under which the docs were pushed.

        begin: Union[datetime, int, str, None], default None
            If provided and the table was created with a datetime index, only return documents
            newer than this datetime.
            If the table was not created with a datetime index and `begin` is an `int`,
            return documents with a positional index greater than or equal to this value.

        end: Union[datetime, int, str, None], default None
            If provided and the table was created with a datetime index, only return documents
            older than this datetime.
            If the table was not created with a datetime index and `end` is an `int`,
            return documents with a positional index less than this value.

        Returns
        -------
        A list of dictionaries, where all keys and values are strings.
        """
        from meerschaum.utils.dtypes import coerce_timezone
        table_name = self.quote_table(table)
        datetime_column_key = self.get_datetime_column_key(table)
        datetime_column = self.get(datetime_column_key)

        if debug:
            dprint(f"Reading documents from '{table}' with {begin=}, {end=}")

        # Without a datetime index the docs live in an (unordered) set;
        # begin/end cannot be applied here.
        if not datetime_column:
            return [
                json.loads(doc_bytes.decode('utf-8'))
                for doc_bytes in self.client.smembers(table_name)
            ]

        dateutil_parser = mrsm.attempt_import('dateutil.parser')

        if isinstance(begin, str):
            begin = coerce_timezone(dateutil_parser.parse(begin))

        if isinstance(end, str):
            end = coerce_timezone(dateutil_parser.parse(end))

        # Convert bounds to integer scores; open-ended bounds use
        # Valkey's '-inf' / '+inf' sentinels.
        begin_ts = (
            (
                int(begin.replace(tzinfo=timezone.utc).timestamp())
                if isinstance(begin, datetime)
                else int(begin)
            )
            if begin is not None else '-inf'
        )
        end_ts = (
            (
                int(end.replace(tzinfo=timezone.utc).timestamp())
                if isinstance(end, datetime)
                else int(end)
            )
            if end is not None else '+inf'
        )

        if debug:
            dprint(f"Reading documents with {begin_ts=}, {end_ts=}")

        return [
            json.loads(doc_bytes.decode('utf-8'))
            for doc_bytes in self.client.zrangebyscore(
                table_name,
                begin_ts,
                end_ts,
                withscores=False,
            )
        ]

    def _read_docs_from_list(
        self,
        table: str,
        begin_ix: Optional[int] = 0,
        end_ix: Optional[int] = -1,
        debug: bool = False,
    ):
        """
        Read a list of documents from a "table".

        Parameters
        ----------
        table: str
            The "table" (root key) from which to read docs.

        begin_ix: Optional[int], default 0
            If provided, only read documents from this starting index.

        end_ix: Optional[int], default -1
            If provided, only read documents up to (not including) this index.

        Returns
        -------
        A generator of documents.
        """
        if begin_ix is None:
            begin_ix = 0

        # An exclusive end of 0 selects nothing; yield no docs.
        if end_ix == 0:
            return

        # LRANGE is inclusive; convert the exclusive end to inclusive
        # (-1 means "through the end of the list").
        if end_ix is None:
            end_ix = -1
        else:
            end_ix -= 1

        table_name = self.quote_table(table)
        doc_keys = self.client.lrange(table_name, begin_ix, end_ix)
        for doc_key in doc_keys:
            yield {
                key.decode('utf-8'): value.decode('utf-8')
                for key, value in self.client.hgetall(doc_key).items()
            }

    def drop_table(self, table: str, debug: bool = False) -> None:
        """
        Drop a "table" of documents.

        Parameters
        ----------
        table: str
            The "table" name (root key) to be deleted.
        """
        table_name = self.quote_table(table)
        datetime_column_key = self.get_datetime_column_key(table)
        self.client.delete(table_name)
        self.client.delete(datetime_column_key)

    @classmethod
    def get_entity_key(cls, *keys: Any) -> str:
        """
        Return a joined key to set an entity.

        Raises
        ------
        ValueError
            If no keys are given or a key contains the separator.
        """
        if not keys:
            raise ValueError("No keys to be joined.")

        for key in keys:
            if cls.KEY_SEPARATOR in str(key):
                raise ValueError(f"Key cannot contain separator '{cls.KEY_SEPARATOR}'.")

        return cls.KEY_SEPARATOR.join([str(key) for key in keys])
Manage a Valkey instance.
Build a `ValkeyConnector` from connection attributes or a URI string.
86 @property 87 def client(self): 88 """ 89 Return the Valkey client. 90 """ 91 if '_client' in self.__dict__: 92 return self.__dict__['_client'] 93 94 valkey = mrsm.attempt_import('valkey') 95 96 if 'uri' in self.__dict__: 97 self._client = valkey.Valkey.from_url(self.__dict__.get('uri')) 98 return self._client 99 100 optional_kwargs = { 101 key: self.__dict__.get(key) 102 for key in self.OPTIONAL_ATTRIBUTES 103 if key in self.__dict__ 104 } 105 connection_kwargs = { 106 'host': self.host, 107 **optional_kwargs 108 } 109 110 self._client = valkey.Valkey(**connection_kwargs) 111 return self._client
Return the Valkey client.
113 @property 114 def URI(self) -> str: 115 """ 116 Return the connection URI for this connector. 117 """ 118 import urllib.parse 119 120 if 'uri' in self.__dict__: 121 return self.__dict__.get('uri') 122 123 uri = "valkey://" 124 if 'username' in self.__dict__: 125 uri += urllib.parse.quote_plus(self.username) + ':' 126 127 if 'password' in self.__dict__: 128 uri += urllib.parse.quote_plus(self.password) + '@' 129 130 if 'host' in self.__dict__: 131 uri += self.host 132 133 if 'port' in self.__dict__: 134 uri += f':{self.port}' 135 136 if 'db' in self.__dict__: 137 uri += f"/{self.db}" 138 139 if 'socket_timeout' in self.__dict__: 140 uri += f"?timeout={self.socket_timeout}s" 141 142 return uri
Return the connection URI for this connector.
144 def set(self, key: str, value: Any, **kwargs: Any) -> None: 145 """ 146 Set the `key` to `value`. 147 """ 148 return self.client.set(key, value, **kwargs)
Set the `key` to `value`.
150 def get(self, key: str) -> Union[str, None]: 151 """ 152 Get the value for `key`. 153 """ 154 val = self.client.get(key) 155 if val is None: 156 return None 157 158 return val.decode('utf-8')
Get the value for `key`.
160 def test_connection(self) -> bool: 161 """ 162 Return whether a connection may be established. 163 """ 164 return self.client.ping()
Return whether a connection may be established.
166 @classmethod 167 def quote_table(cls, table: str) -> str: 168 """ 169 Return a quoted key. 170 """ 171 return shlex.quote(table)
Return a quoted key.
173 @classmethod 174 def get_counter_key(cls, table: str) -> str: 175 """ 176 Return the counter key for a given table. 177 """ 178 table_name = cls.quote_table(table) 179 return f"{table_name}:counter"
Return the counter key for a given table.
181 def push_df( 182 self, 183 df: 'pd.DataFrame', 184 table: str, 185 datetime_column: Optional[str] = None, 186 debug: bool = False, 187 ) -> int: 188 """ 189 Append a pandas DataFrame to a table. 190 191 Parameters 192 ---------- 193 df: pd.DataFrame 194 The pandas DataFrame to append to the table. 195 196 table: str 197 The "table" name (root key). 198 199 datetime_column: Optional[str], default None 200 If provided, use this key as the datetime index. 201 202 Returns 203 ------- 204 The current index counter value (how many docs have been pushed). 205 """ 206 from meerschaum.utils.dataframe import to_json 207 docs_str = to_json(df) 208 docs = json.loads(docs_str) 209 return self.push_docs( 210 docs, 211 table, 212 datetime_column=datetime_column, 213 debug=debug, 214 )
Append a pandas DataFrame to a table.
Parameters
- df (pd.DataFrame): The pandas DataFrame to append to the table.
- table (str): The "table" name (root key).
- datetime_column (Optional[str], default None): If provided, use this key as the datetime index.
Returns
- The current index counter value (how many docs have been pushed).
def push_docs(
    self,
    docs: List[Dict[str, Any]],
    table: str,
    datetime_column: Optional[str] = None,
    debug: bool = False,
) -> int:
    """
    Append a list of documents to a table.

    Parameters
    ----------
    docs: List[Dict[str, Any]]
        The docs to be pushed.
        All keys and values will be coerced into strings.

    table: str
        The "table" name (root key).

    datetime_column: Optional[str], default None
        If set, create a sorted set with this datetime column as the index.
        Otherwise push the docs to a list.

    Returns
    -------
    The current index counter value (how many docs have been pushed).
    """
    from meerschaum.utils.misc import json_serialize_datetime
    table_name = self.quote_table(table)
    datetime_column_key = self.get_datetime_column_key(table)
    # Fall back to the table's previously recorded datetime column.
    remote_datetime_column = self.get(datetime_column_key)
    datetime_column = datetime_column or remote_datetime_column
    dateutil_parser = mrsm.attempt_import('dateutil.parser')

    old_len = (
        self.client.zcard(table_name)
        if datetime_column
        else self.client.scard(table_name)
    )
    for doc in docs:
        # Docs missing the datetime column score 0 in the sorted set.
        original_dt_val = (
            doc[datetime_column]
            if datetime_column and datetime_column in doc
            else 0
        )
        dt_val = (
            dateutil_parser.parse(str(original_dt_val))
            if not isinstance(original_dt_val, int)
            else int(original_dt_val)
        ) if datetime_column else None
        # Naive datetimes are treated as UTC when computing the score.
        ts = (
            int(dt_val.replace(tzinfo=timezone.utc).timestamp())
            if isinstance(dt_val, datetime)
            else int(dt_val)
        ) if datetime_column else None
        # Canonical serialization (sorted keys, compact separators) so
        # identical docs map to identical members.
        doc_str = json.dumps(
            doc,
            default=(lambda x: json_serialize_datetime(x) if hasattr(x, 'tzinfo') else str(x)),
            separators=(',', ':'),
            sort_keys=True,
        )
        if datetime_column:
            self.client.zadd(table_name, {doc_str: ts})
        else:
            self.client.sadd(table_name, doc_str)

    if datetime_column:
        self.set(datetime_column_key, datetime_column)
    new_len = (
        self.client.zcard(table_name)
        if datetime_column
        else self.client.scard(table_name)
    )

    # Count of *newly added* members (duplicates are not re-added).
    return new_len - old_len
Append a list of documents to a table.
Parameters
- docs (List[Dict[str, Any]]): The docs to be pushed. All keys and values will be coerced into strings.
- table (str): The "table" name (root key).
- datetime_column (Optional[str], default None): If set, create a sorted set with this datetime column as the index. Otherwise push the docs to a list.
Returns
- The current index counter value (how many docs have been pushed).
308 def get_datetime_column_key(self, table: str) -> str: 309 """ 310 Return the key to store the datetime index for `table`. 311 """ 312 table_name = self.quote_table(table) 313 return f'{table_name}:datetime_column'
Return the key to store the datetime index for `table`.
315 def read( 316 self, 317 table: str, 318 begin: Union[datetime, int, str, None] = None, 319 end: Union[datetime, int, str, None] = None, 320 params: Optional[Dict[str, Any]] = None, 321 datetime_column: Optional[str] = None, 322 select_columns: Optional[List[str]] = None, 323 omit_columns: Optional[List[str]] = None, 324 debug: bool = False 325 ) -> Union['pd.DataFrame', None]: 326 """ 327 Query the table and return the result dataframe. 328 329 Parameters 330 ---------- 331 table: str 332 The "table" name to be queried. 333 334 begin: Union[datetime, int, str, None], default None 335 If provided, only return rows greater than or equal to this datetime. 336 337 end: Union[datetime, int, str, None], default None 338 If provided, only return rows older than this datetime. 339 340 params: Optional[Dict[str, Any]] 341 Additional Meerschaum filter parameters. 342 343 datetime_column: Optional[str], default None 344 If provided, use this column for the datetime index. 345 Otherwise infer from the table metadata. 346 347 select_columns: Optional[List[str]], default None 348 If provided, only return these columns. 349 350 omit_columns: Optional[List[str]], default None 351 If provided, do not include these columns in the result. 352 353 Returns 354 ------- 355 A Pandas DataFrame of the result, or `None`. 356 """ 357 from meerschaum.utils.dataframe import parse_df_datetimes, query_df 358 docs = self.read_docs( 359 table, 360 begin=begin, 361 end=end, 362 debug=debug, 363 ) 364 df = parse_df_datetimes(docs) 365 datetime_column_key = self.get_datetime_column_key(table) 366 datetime_column = datetime_column or self.get(datetime_column_key) 367 368 return query_df( 369 df, 370 begin=(begin if datetime_column is not None else None), 371 end=(end if datetime_column is not None else None), 372 params=params, 373 datetime_column=datetime_column, 374 select_columns=select_columns, 375 omit_columns=omit_columns, 376 inplace=True, 377 reset_index=True, 378 debug=debug, 379 )
Query the table and return the result dataframe.
Parameters
- table (str): The "table" name to be queried.
- begin (Union[datetime, int, str, None], default None): If provided, only return rows greater than or equal to this datetime.
- end (Union[datetime, int, str, None], default None): If provided, only return rows older than this datetime.
- params (Optional[Dict[str, Any]]): Additional Meerschaum filter parameters.
- datetime_column (Optional[str], default None): If provided, use this column for the datetime index. Otherwise infer from the table metadata.
- select_columns (Optional[List[str]], default None): If provided, only return these columns.
- omit_columns (Optional[List[str]], default None): If provided, do not include these columns in the result.
Returns
- A Pandas DataFrame of the result, or `None`.
def read_docs(
    self,
    table: str,
    begin: Union[datetime, int, str, None] = None,
    end: Union[datetime, int, str, None] = None,
    debug: bool = False,
) -> List[Dict[str, str]]:
    """
    Return a list of previously pushed docs.

    Parameters
    ----------
    table: str
        The "table" name (root key) under which the docs were pushed.

    begin: Union[datetime, int, str, None], default None
        If provided and the table was created with a datetime index, only return documents
        newer than this datetime.
        If the table was not created with a datetime index and `begin` is an `int`,
        return documents with a positional index greater than or equal to this value.

    end: Union[datetime, int, str, None], default None
        If provided and the table was created with a datetime index, only return documents
        older than this datetime.
        If the table was not created with a datetime index and `end` is an `int`,
        return documents with a positional index less than this value.

    Returns
    -------
    A list of dictionaries, where all keys and values are strings.
    """
    from meerschaum.utils.dtypes import coerce_timezone
    table_name = self.quote_table(table)
    datetime_column_key = self.get_datetime_column_key(table)
    datetime_column = self.get(datetime_column_key)

    if debug:
        dprint(f"Reading documents from '{table}' with {begin=}, {end=}")

    # Without a datetime index the docs live in an (unordered) set;
    # begin/end cannot be applied here.
    if not datetime_column:
        return [
            json.loads(doc_bytes.decode('utf-8'))
            for doc_bytes in self.client.smembers(table_name)
        ]

    dateutil_parser = mrsm.attempt_import('dateutil.parser')

    if isinstance(begin, str):
        begin = coerce_timezone(dateutil_parser.parse(begin))

    if isinstance(end, str):
        end = coerce_timezone(dateutil_parser.parse(end))

    # Convert bounds to integer scores; open-ended bounds use the
    # '-inf' / '+inf' sentinels.  Naive datetimes are treated as UTC.
    begin_ts = (
        (
            int(begin.replace(tzinfo=timezone.utc).timestamp())
            if isinstance(begin, datetime)
            else int(begin)
        )
        if begin is not None else '-inf'
    )
    end_ts = (
        (
            int(end.replace(tzinfo=timezone.utc).timestamp())
            if isinstance(end, datetime)
            else int(end)
        )
        if end is not None else '+inf'
    )

    if debug:
        dprint(f"Reading documents with {begin_ts=}, {end_ts=}")

    return [
        json.loads(doc_bytes.decode('utf-8'))
        for doc_bytes in self.client.zrangebyscore(
            table_name,
            begin_ts,
            end_ts,
            withscores=False,
        )
    ]
Return a list of previously pushed docs.
Parameters
- table (str): The "table" name (root key) under which the docs were pushed.
- begin (Union[datetime, int, str, None], default None):
  If provided and the table was created with a datetime index, only return documents
  newer than this datetime.
  If the table was not created with a datetime index and `begin` is an `int`,
  return documents with a positional index greater than or equal to this value.
- end (Union[datetime, int, str, None], default None):
  If provided and the table was created with a datetime index, only return documents
  older than this datetime.
  If the table was not created with a datetime index and `end` is an `int`,
  return documents with a positional index less than this value.
Returns
- A list of dictionaries, where all keys and values are strings.
508 def drop_table(self, table: str, debug: bool = False) -> None: 509 """ 510 Drop a "table" of documents. 511 512 Parameters 513 ---------- 514 table: str 515 The "table" name (root key) to be deleted. 516 """ 517 table_name = self.quote_table(table) 518 datetime_column_key = self.get_datetime_column_key(table) 519 self.client.delete(table_name) 520 self.client.delete(datetime_column_key)
Drop a "table" of documents.
Parameters
- table (str): The "table" name (root key) to be deleted.
522 @classmethod 523 def get_entity_key(cls, *keys: Any) -> str: 524 """ 525 Return a joined key to set an entity. 526 """ 527 if not keys: 528 raise ValueError("No keys to be joined.") 529 530 for key in keys: 531 if cls.KEY_SEPARATOR in str(key): 532 raise ValueError(f"Key cannot contain separator '{cls.KEY_SEPARATOR}'.") 533 534 return cls.KEY_SEPARATOR.join([str(key) for key in keys])
Return a joined key to set an entity.
126def register_pipe( 127 self, 128 pipe: mrsm.Pipe, 129 debug: bool = False, 130 **kwargs: Any 131) -> SuccessTuple: 132 """ 133 Insert the pipe's attributes into the internal `pipes` table. 134 135 Parameters 136 ---------- 137 pipe: mrsm.Pipe 138 The pipe to be registered. 139 140 Returns 141 ------- 142 A `SuccessTuple` of the result. 143 """ 144 attributes = { 145 'connector_keys': str(pipe.connector_keys), 146 'metric_key': str(pipe.metric_key), 147 'location_key': str(pipe.location_key), 148 } 149 parameters_str = json.dumps( 150 pipe._attributes.get('parameters', {}), 151 separators=(',', ':'), 152 ) 153 154 pipe_key = get_pipe_key(pipe) 155 parameters_key = get_pipe_parameters_key(pipe) 156 157 try: 158 existing_pipe_id = self.get(pipe_key) 159 if existing_pipe_id is not None: 160 return False, f"{pipe} is already registered." 161 162 pipe_id = self.client.incr(PIPES_COUNTER) 163 _ = self.push_docs( 164 [{'pipe_id': pipe_id, **attributes}], 165 PIPES_TABLE, 166 datetime_column='pipe_id', 167 debug=debug, 168 ) 169 self.set(pipe_key, pipe_id) 170 self.set(parameters_key, parameters_str) 171 172 except Exception as e: 173 return False, f"Failed to register {pipe}:\n{e}" 174 175 return True, "Success"
Insert the pipe's attributes into the internal `pipes` table.
Parameters
- pipe (mrsm.Pipe): The pipe to be registered.
Returns
- A `SuccessTuple` of the result.
178def get_pipe_id( 179 self, 180 pipe: mrsm.Pipe, 181 debug: bool = False, 182 **kwargs: Any 183) -> Union[str, int, None]: 184 """ 185 Return the `_id` for the pipe if it exists. 186 187 Parameters 188 ---------- 189 pipe: mrsm.Pipe 190 The pipe whose `_id` to fetch. 191 192 Returns 193 ------- 194 The `_id` for the pipe's document or `None`. 195 """ 196 pipe_key = get_pipe_key(pipe) 197 try: 198 return int(self.get(pipe_key)) 199 except Exception: 200 pass 201 return None
Return the `_id` for the pipe if it exists.
Parameters
- pipe (mrsm.Pipe): The pipe whose `_id` to fetch.
Returns
- The `_id` for the pipe's document, or `None`.
204def get_pipe_attributes( 205 self, 206 pipe: mrsm.Pipe, 207 debug: bool = False, 208 **kwargs: Any 209) -> Dict[str, Any]: 210 """ 211 Return the pipe's document from the internal `pipes` collection. 212 213 Parameters 214 ---------- 215 pipe: mrsm.Pipe 216 The pipe whose attributes should be retrieved. 217 218 Returns 219 ------- 220 The document that matches the keys of the pipe. 221 """ 222 pipe_id = pipe.get_id(debug=debug) 223 if pipe_id is None: 224 return {} 225 226 parameters_key = get_pipe_parameters_key(pipe) 227 parameters_str = self.get(parameters_key) 228 229 parameters = json.loads(parameters_str) if parameters_str else {} 230 231 attributes = { 232 'connector_keys': pipe.connector_keys, 233 'metric_key': pipe.metric_key, 234 'location_key': pipe.location_key, 235 'parameters': parameters, 236 'pipe_id': pipe_id, 237 } 238 return attributes
Return the pipe's document from the internal `pipes` collection.
Parameters
- pipe (mrsm.Pipe): The pipe whose attributes should be retrieved.
Returns
- The document that matches the keys of the pipe.
241def edit_pipe( 242 self, 243 pipe: mrsm.Pipe, 244 debug: bool = False, 245 **kwargs: Any 246) -> mrsm.SuccessTuple: 247 """ 248 Edit the attributes of the pipe. 249 250 Parameters 251 ---------- 252 pipe: mrsm.Pipe 253 The pipe whose in-memory parameters must be persisted. 254 255 Returns 256 ------- 257 A `SuccessTuple` indicating success. 258 """ 259 pipe_id = pipe.get_id(debug=debug) 260 if pipe_id is None: 261 return False, f"{pipe} is not registered." 262 263 parameters_key = get_pipe_parameters_key(pipe) 264 parameters_str = json.dumps(pipe.parameters, separators=(',', ':')) 265 self.set(parameters_key, parameters_str) 266 return True, "Success"
Edit the attributes of the pipe.
Parameters
- pipe (mrsm.Pipe): The pipe whose in-memory parameters must be persisted.
Returns
- A `SuccessTuple` indicating success.
269def pipe_exists( 270 self, 271 pipe: mrsm.Pipe, 272 debug: bool = False, 273 **kwargs: Any 274) -> bool: 275 """ 276 Check whether a pipe's target table exists. 277 278 Parameters 279 ---------- 280 pipe: mrsm.Pipe 281 The pipe to check whether its table exists. 282 283 Returns 284 ------- 285 A `bool` indicating the table exists. 286 """ 287 table_name = self.quote_table(pipe.target) 288 return self.client.exists(table_name) != 0
Check whether a pipe's target table exists.
Parameters
- pipe (mrsm.Pipe): The pipe to check whether its table exists.
Returns
- A `bool` indicating the table exists.
291def drop_pipe( 292 self, 293 pipe: mrsm.Pipe, 294 debug: bool = False, 295 **kwargs: Any 296) -> mrsm.SuccessTuple: 297 """ 298 Drop a pipe's collection if it exists. 299 300 Parameters 301 ---------- 302 pipe: mrsm.Pipe 303 The pipe to be dropped. 304 305 Returns 306 ------- 307 A `SuccessTuple` indicating success. 308 """ 309 for chunk_begin, chunk_end in pipe.get_chunk_bounds(debug=debug): 310 clear_chunk_success, clear_chunk_msg = pipe.clear( 311 begin=chunk_begin, 312 end=chunk_end, 313 debug=debug, 314 ) 315 if not clear_chunk_success: 316 return clear_chunk_success, clear_chunk_msg 317 try: 318 self.drop_table(pipe.target, debug=debug) 319 except Exception as e: 320 return False, f"Failed to drop {pipe}:\n{e}" 321 322 if 'valkey' not in pipe.parameters: 323 return True, "Success" 324 325 pipe.parameters['valkey']['dtypes'] = {} 326 if not pipe.temporary: 327 edit_success, edit_msg = pipe.edit(debug=debug) 328 if not edit_success: 329 return edit_success, edit_msg 330 331 return True, "Success"
Drop a pipe's collection if it exists.
Parameters
- pipe (mrsm.Pipe): The pipe to be dropped.
Returns
- A `SuccessTuple` indicating success.
def delete_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Delete a pipe's registration from the `pipes` collection.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be deleted.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    # Drop the pipe's data before removing its registration.
    drop_success, drop_message = pipe.drop(debug=debug)
    if not drop_success:
        return drop_success, drop_message

    pipe_id = self.get_pipe_id(pipe, debug=debug)
    if pipe_id is None:
        return False, f"{pipe} is not registered."

    # Remove the pipe's registration key and its parameters key.
    pipe_key = get_pipe_key(pipe)
    parameters_key = get_pipe_parameters_key(pipe)
    self.client.delete(pipe_key)
    self.client.delete(parameters_key)
    # The registration document must be re-serialized exactly as it was
    # stored (compact separators, sorted keys) so `zrem` matches the member.
    df = self.read(PIPES_TABLE, params={'pipe_id': pipe_id})
    docs = df.to_dict(orient='records')
    if docs:
        doc = docs[0]
        doc_str = json.dumps(
            doc,
            default=(lambda x: json_serialize_datetime(x) if hasattr(x, 'tzinfo') else str(x)),
            separators=(',', ':'),
            sort_keys=True,
        )
        self.client.zrem(PIPES_TABLE, doc_str)
    return True, "Success"
Delete a pipe's registration from the `pipes` collection.
Parameters
- pipe (mrsm.Pipe): The pipe to be deleted.
Returns
- A `SuccessTuple` indicating success.
def get_pipe_data(
    self,
    pipe: mrsm.Pipe,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kwargs: Any
) -> Union['pd.DataFrame', None]:
    """
    Query a pipe's target table and return the DataFrame.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe with the target table from which to read.

    select_columns: Optional[List[str]], default None
        If provided, only select these given columns.
        Otherwise select all available columns (i.e. `SELECT *`).

    omit_columns: Optional[List[str]], default None
        If provided, remove these columns from the selection.

    begin: Union[datetime, int, None], default None
        The earliest `datetime` value to search from (inclusive).

    end: Union[datetime, int, None], default None
        The latest `datetime` value to search from (exclusive).

    params: Optional[Dict[str, str]], default None
        Additional filters to apply to the query.

    Returns
    -------
    The target table's data as a DataFrame.
    """
    if not pipe.exists(debug=debug):
        return None

    from meerschaum.utils.dataframe import query_df, parse_df_datetimes
    from meerschaum.utils.dtypes import are_dtypes_equal

    valkey_dtypes = pipe.parameters.get('valkey', {}).get('dtypes', {})
    dt_col = pipe.columns.get('datetime', None)
    table_name = self.quote_table(pipe.target)
    indices = [col for col in pipe.columns.values() if col]
    # Read the index documents within the interval; the 'ix' field holds the
    # serialized index values (with ':' escaped as COLON when stored).
    ix_docs = [
        string_to_dict(doc.get('ix', '').replace(COLON, ':'))
        for doc in self.read_docs(
            pipe.target,
            begin=begin,
            end=end,
            debug=debug,
        )
    ]
    # Fetch each full document by its reconstructed per-document key.
    try:
        docs_strings = [
            self.get(get_document_key(
                doc, indices, table_name
            ))
            for doc in ix_docs
        ]
    except Exception as e:
        warn(f"Failed to fetch documents for {pipe}:\n{e}")
        docs_strings = []

    docs = [
        json.loads(doc_str)
        for doc_str in docs_strings
        if doc_str
    ]
    # Only attempt datetime parsing on columns declared as datetimes.
    ignore_dt_cols = [
        col
        for col, dtype in pipe.dtypes.items()
        if not are_dtypes_equal(str(dtype), 'datetime')
    ]

    df = parse_df_datetimes(
        docs,
        ignore_cols=ignore_dt_cols,
        chunksize=kwargs.get('chunksize', None),
        strip_timezone=(pipe.tzinfo is None),
        debug=debug,
    )
    # Best-effort cast back to the dtypes cached under 'valkey:dtypes'.
    for col, typ in valkey_dtypes.items():
        try:
            df[col] = df[col].astype(typ)
        except Exception:
            pass

    df = pipe.enforce_dtypes(df, debug=debug)

    # An empty frame still goes through column selection for a stable shape.
    if len(df) == 0:
        return query_df(df, select_columns=select_columns, omit_columns=omit_columns)

    return query_df(
        df,
        select_columns=select_columns,
        omit_columns=omit_columns,
        params=params,
        begin=begin,
        end=end,
        datetime_column=dt_col,
        inplace=True,
        reset_index=True,
    )
Query a pipe's target table and return the DataFrame.
Parameters
- pipe (mrsm.Pipe): The pipe with the target table from which to read.
- select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. `SELECT *`).
- omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
- begin (Union[datetime, int, None], default None): The earliest `datetime` value to search from (inclusive).
- end (Union[datetime, int, None], default None): The latest `datetime` value to search from (exclusive).
- params (Optional[Dict[str, str]], default None): Additional filters to apply to the query.
Returns
- The target table's data as a DataFrame.
def sync_pipe(
    self,
    pipe: mrsm.Pipe,
    df: 'pd.DataFrame' = None,
    check_existing: bool = True,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Upsert new documents into the pipe's collection.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose collection should receive the new documents.

    df: Union['pd.DataFrame', Iterator['pd.DataFrame']], default None
        The data to be synced.

    check_existing: bool, default True
        If `False`, do not check the documents against existing data and instead insert directly.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    from meerschaum.utils.dtypes import are_dtypes_equal
    dt_col = pipe.columns.get('datetime', None)
    indices = [col for col in pipe.columns.values() if col]
    table_name = self.quote_table(pipe.target)
    # Dask frames are materialized up front; the rest of the method is pandas.
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = df.compute()
    upsert = pipe.parameters.get('upsert', False)
    static = pipe.parameters.get('static', False)

    def _serialize_indices_docs(_docs):
        # Build the lightweight index documents pushed to the table key:
        # the serialized index under 'ix', plus the datetime value (if any).
        return [
            {
                'ix': get_document_key(doc, indices),
                **(
                    {
                        dt_col: doc.get(dt_col, 0)
                    }
                    if dt_col
                    else {}
                )
            }
            for doc in _docs
        ]

    # Merge any newly seen dtypes into the cached 'valkey:dtypes' parameters;
    # datetimes are normalized to UTC-aware nanosecond precision.
    valkey_dtypes = pipe.parameters.get('valkey', {}).get('dtypes', {})
    new_dtypes = {
        str(key): (
            str(val)
            if not are_dtypes_equal(str(val), 'datetime')
            else 'datetime64[ns, UTC]'
        )
        for key, val in df.dtypes.items()
        if str(key) not in valkey_dtypes
    }
    # Iterate over a copy: the loop may mutate `valkey_dtypes`.
    for col, typ in {c: v for c, v in valkey_dtypes.items()}.items():
        if col in df.columns:
            try:
                df[col] = df[col].astype(typ)
            except Exception:
                # Fall back to strings when the cached dtype no longer fits.
                valkey_dtypes[col] = 'string'
                new_dtypes[col] = 'string'
                df[col] = df[col].astype('string')

    # Persist the updated dtype cache (static pipes keep their first schema).
    if new_dtypes and (not static or not valkey_dtypes):
        valkey_dtypes.update(new_dtypes)
        if 'valkey' not in pipe.parameters:
            pipe.parameters['valkey'] = {}
        pipe.parameters['valkey']['dtypes'] = valkey_dtypes
        if not pipe.temporary:
            edit_success, edit_msg = pipe.edit(debug=debug)
            if not edit_success:
                return edit_success, edit_msg

    # Split incoming rows into inserts and updates (skipped when upserting).
    unseen_df, update_df, delta_df = (
        pipe.filter_existing(df, include_unchanged_columns=True, debug=debug)
        if check_existing and not upsert
        else (None, df, df)
    )
    num_insert = len(unseen_df) if unseen_df is not None else 0
    num_update = len(update_df) if update_df is not None else 0
    msg = (
        f"Inserted {num_insert}, updated {num_update} rows."
        if not upsert
        else f"Upserted {num_update} rows."
    )
    if len(delta_df) == 0:
        return True, msg

    # Write each unseen document's value under its key, then push the
    # index documents to the table key.
    unseen_docs = unseen_df.to_dict(orient='records') if unseen_df is not None else []
    unseen_indices_docs = _serialize_indices_docs(unseen_docs)
    unseen_ix_vals = {
        get_document_key(doc, indices, table_name): serialize_document(doc)
        for doc in unseen_docs
    }
    for key, val in unseen_ix_vals.items():
        try:
            self.set(key, val)
        except Exception as e:
            return False, f"Failed to set keys for {pipe}:\n{e}"

    try:
        self.push_docs(
            unseen_indices_docs,
            pipe.target,
            datetime_column=dt_col,
            debug=debug,
        )
    except Exception as e:
        return False, f"Failed to push docs to '{pipe.target}':\n{e}"

    # Partition updates into documents that don't yet exist ("new") and
    # patches to existing documents ("old").
    update_docs = update_df.to_dict(orient='records') if update_df is not None else []
    update_ix_docs = {
        get_document_key(doc, indices, table_name): doc
        for doc in update_docs
    }
    existing_docs_data = {
        key: self.get(key)
        for key in update_ix_docs
    } if pipe.exists(debug=debug) else {}
    existing_docs = {
        key: json.loads(data)
        for key, data in existing_docs_data.items()
        if data
    }
    new_update_docs = {
        key: doc
        for key, doc in update_ix_docs.items()
        if key not in existing_docs
    }
    new_ix_vals = {
        get_document_key(doc, indices, table_name): serialize_document(doc)
        for doc in new_update_docs.values()
    }
    for key, val in new_ix_vals.items():
        try:
            self.set(key, val)
        except Exception as e:
            return False, f"Failed to set keys for {pipe}:\n{e}"

    # Merge each patch over the existing document before rewriting it.
    old_update_docs = {
        key: {
            **existing_docs[key],
            **doc
        }
        for key, doc in update_ix_docs.items()
        if key in existing_docs
    }
    new_indices_docs = _serialize_indices_docs([doc for doc in new_update_docs.values()])
    try:
        if new_indices_docs:
            self.push_docs(
                new_indices_docs,
                pipe.target,
                datetime_column=dt_col,
                debug=debug,
            )
    except Exception as e:
        return False, f"Failed to upsert '{pipe.target}':\n{e}"

    for key, doc in old_update_docs.items():
        try:
            self.set(key, serialize_document(doc))
        except Exception as e:
            return False, f"Failed to set keys for {pipe}:\n{e}"

    return True, msg
Upsert new documents into the pipe's collection.
Parameters
- pipe (mrsm.Pipe): The pipe whose collection should receive the new documents.
- df (Union['pd.DataFrame', Iterator['pd.DataFrame']], default None): The data to be synced.
- check_existing (bool, default True): If `False`, do not check the documents against existing data and instead insert directly.
Returns
- A `SuccessTuple` indicating success.
def get_pipe_columns_types(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> Dict[str, str]:
    """
    Map the target table's columns to database types for dtype enforcement.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table contains columns and data types.

    Returns
    -------
    A dictionary mapping columns to data types.
    """
    if not pipe.exists(debug=debug):
        return {}

    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
    cached_dtypes = pipe.parameters.get('valkey', {}).get('dtypes', {})
    return {
        column: get_db_type_from_pd_type(dtype, flavor='postgresql')
        for column, dtype in cached_dtypes.items()
    }
Return the data types for the columns in the target table for data type enforcement.
Parameters
- pipe (mrsm.Pipe): The pipe whose target table contains columns and data types.
Returns
- A dictionary mapping columns to data types.
def clear_pipe(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
) -> mrsm.SuccessTuple:
    """
    Delete rows within `begin`, `end`, and `params`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose rows to clear.

    begin: Union[datetime, int, None], default None
        If provided, remove rows >= `begin`.

    end: Union[datetime, int, None], default None
        If provided, remove rows < `end`.

    params: Optional[Dict[str, Any]], default None
        If provided, only remove rows which match the `params` filter.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    dt_col = pipe.columns.get('datetime', None)

    # Fetch the matching documents first so each one's keys can be rebuilt.
    existing_df = pipe.get_data(
        begin=begin,
        end=end,
        params=params,
        debug=debug,
    )
    if existing_df is None or len(existing_df) == 0:
        return True, "Deleted 0 rows."

    docs = existing_df.to_dict(orient='records')
    table_name = self.quote_table(pipe.target)
    indices = [col for col in pipe.columns.values() if col]
    for doc in docs:
        set_doc_key = get_document_key(doc, indices)
        table_doc_key = get_document_key(doc, indices, table_name)
        try:
            # Index docs live in a sorted set when a datetime column exists,
            # otherwise in a plain set; the full document has its own key.
            if dt_col:
                self.client.zrem(table_name, set_doc_key)
            else:
                self.client.srem(table_name, set_doc_key)
            self.client.delete(table_doc_key)
        except Exception as e:
            return False, f"Failed to delete documents:\n{e}"
    msg = (
        f"Deleted {len(docs)} row"
        + ('s' if len(docs) != 1 else '')
        + '.'
    )
    return True, msg
Delete rows within `begin`, `end`, and `params`.
Parameters
- pipe (mrsm.Pipe): The pipe whose rows to clear.
- begin (Union[datetime, int, None], default None): If provided, remove rows >= `begin`.
- end (Union[datetime, int, None], default None): If provided, remove rows < `end`.
- params (Optional[Dict[str, Any]], default None): If provided, only remove rows which match the `params` filter.
Returns
- A `SuccessTuple` indicating success.
def get_sync_time(
    self,
    pipe: mrsm.Pipe,
    newest: bool = True,
    **kwargs: Any
) -> Union[datetime, int, None]:
    """
    Return the newest (or oldest) timestamp in a pipe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose sync time to fetch.

    newest: bool, default True
        If `True`, return the largest timestamp; otherwise the smallest.

    Returns
    -------
    The sync time as a `datetime` or `int`, or `None` if unavailable.
    """
    from meerschaum.utils.dtypes import are_dtypes_equal
    dt_col = pipe.columns.get('datetime', None)
    # Guard before the dtype lookup (the original looked up `dtypes.get(None)`).
    if not dt_col:
        return None
    dt_typ = pipe.dtypes.get(dt_col, 'datetime64[ns, UTC]')

    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    table_name = self.quote_table(pipe.target)
    try:
        # Index documents are scored by timestamp, so the first element of
        # the reversed (or forward) range is the newest (or oldest) doc.
        vals = (
            self.client.zrevrange(table_name, 0, 0)
            if newest
            else self.client.zrange(table_name, 0, 0)
        )
        if not vals:
            return None
        val = vals[0]
    except Exception:
        return None

    doc = json.loads(val)
    dt_val = doc.get(dt_col, None)
    if dt_val is None:
        return None

    try:
        return (
            int(dt_val)
            if are_dtypes_equal(dt_typ, 'int')
            else dateutil_parser.parse(str(dt_val))
        )
    except Exception as e:
        warn(f"Failed to parse sync time for {pipe}:\n{e}")

    return None
Return the newest (or oldest) timestamp in a pipe.
def get_pipe_rowcount(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kwargs: Any
) -> Union[int, None]:
    """
    Return the number of documents in the pipe's set.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose rows to count.

    begin: Union[datetime, int, None], default None
        If provided, only count rows >= `begin`.

    end: Union[datetime, int, None], default None
        If provided, only count rows < `end`.

    params: Optional[Dict[str, Any]], default None
        If provided, only count rows matching the filter.

    Returns
    -------
    The number of rows, or `None` if the server-side count fails.
    """
    dt_col = pipe.columns.get('datetime', None)
    table_name = self.quote_table(pipe.target)

    # Pass `debug` through (was dropped), consistent with the sibling methods.
    if not pipe.exists(debug=debug):
        return 0

    try:
        if begin is None and end is None and params is None:
            # Fast path: unfiltered counts come straight from the server.
            return (
                self.client.zcard(table_name)
                if dt_col
                else self.client.llen(table_name)
            )
    except Exception:
        return None

    # Slow path: materialize the filtered data and count the rows.
    df = pipe.get_data(begin=begin, end=end, params=params, debug=debug)
    if df is None:
        return 0

    return len(df)
Return the number of documents in the pipe's set.
def fetch_pipes_keys(
    self,
    connector_keys: Optional[List[str]] = None,
    metric_keys: Optional[List[str]] = None,
    location_keys: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False
) -> Optional[List[Tuple[str, str, Optional[str]]]]:
    """
    Return the keys for the registered pipes.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        If provided, only return pipes with these connector keys.

    metric_keys: Optional[List[str]], default None
        If provided, only return pipes with these metric keys.

    location_keys: Optional[List[str]], default None
        If provided, only return pipes with these location keys.

    tags: Optional[List[str]], default None
        If provided, only return pipes matching these tags.

    params: Optional[Dict[str, Any]], default None
        Additional filters to apply to the registrations.

    Returns
    -------
    A list of `(connector_keys, metric_key, location_key)` tuples.
    """
    from meerschaum.utils.dataframe import query_df
    from meerschaum.utils.misc import separate_negation_values
    try:
        df = self.read(PIPES_TABLE, debug=debug)
    except Exception:
        return []

    if df is None or len(df) == 0:
        return []

    # Build the equality filter over the registration columns.
    query = {}
    if connector_keys:
        query['connector_keys'] = [str(k) for k in connector_keys]
    if metric_keys:
        query['metric_key'] = [str(k) for k in metric_keys]
    if location_keys:
        query['location_key'] = [str(k) for k in location_keys]
    if params:
        query.update(params)

    df = query_df(df, query, inplace=True)

    keys = [
        (
            doc['connector_keys'],
            doc['metric_key'],
            doc['location_key'],
        )
        for doc in df.to_dict(orient='records')
    ]
    if not tags:
        return keys

    # Each tag argument splits on commas into one group of include/exclude
    # tags (separate_negation_values presumably splits out negated tags —
    # confirm against meerschaum.utils.misc).
    tag_groups = [tag.split(',') for tag in tags]
    in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups]

    # Keep a pipe only if every group matches: all included tags present
    # and no excluded tag present.
    filtered_keys = []
    for ck, mk, lk in keys:
        pipe = mrsm.Pipe(ck, mk, lk, instance=self)
        pipe_tags = set(pipe.tags)

        include_pipe = True
        for in_tags, ex_tags in in_ex_tag_groups:
            all_in = all(tag in pipe_tags for tag in in_tags)
            any_ex = any(tag in pipe_tags for tag in ex_tags)

            if (not all_in) or any_ex:
                include_pipe = False
                continue

        if include_pipe:
            filtered_keys.append((ck, mk, lk))

    return filtered_keys
Return the keys for the registered pipes.
def fetch(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kwargs: Any
) -> List[Dict[str, Any]]:
    """
    Return data from a source database.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose `valkey:key` parameter names the source key.

    begin: Union[datetime, int, None], default None
        For sorted-set keys, only read documents with scores from `begin`.

    end: Union[datetime, int, None], default None
        For sorted-set keys, only read documents with scores up to `end`.

    Returns
    -------
    A list of document dictionaries read from the source key.
    """
    source_key = pipe.parameters.get('valkey', {}).get('key', None)
    if not source_key:
        return []

    try:
        key_type = self.client.type(source_key).decode('utf-8')
    except Exception:
        warn(f"Could not determine the type for key '{source_key}'.")
        return []

    # Datetime bounds become UTC epoch scores; unbounded sides use the
    # server's '-inf' / '+inf' sentinels.
    begin_ts = (
        (
            int(begin.replace(tzinfo=timezone.utc).timestamp())
            if isinstance(begin, datetime)
            else int(begin)
        )
        if begin is not None else '-inf'
    )
    end_ts = (
        (
            int(end.replace(tzinfo=timezone.utc).timestamp())
            if isinstance(end, datetime)
            else int(end)
        )
        if end is not None else '+inf'
    )

    if debug:
        dprint(f"Reading documents with {begin_ts=}, {end_ts=}")

    # Plain sets have no scores, so the interval is ignored and the whole
    # set is returned.
    if key_type == 'set':
        return [
            json.loads(doc_bytes.decode('utf-8'))
            for doc_bytes in self.client.smembers(source_key)
        ]

    if key_type == 'zset':
        return [
            json.loads(doc_bytes.decode('utf-8'))
            for doc_bytes in self.client.zrangebyscore(
                source_key,
                begin_ts,
                end_ts,
                withscores=False,
            )
        ]

    # Fallback for scalar keys: wrap the single value in one document.
    return [{source_key: self.get(source_key)}]
Return data from a source database.
def get_users_pipe(self):
    """
    Build the in-memory pipe which stores the registered users.
    """
    return mrsm.Pipe(
        'mrsm', 'users',
        target=USERS_TABLE,
        columns=['user_id'],
        temporary=True,
        instance=self,
    )
Return the pipe which stores the registered users.
@classmethod
def get_user_key(cls, user_id_or_username: str, sub_key: str, by_username: bool = False) -> str:
    """
    Build the storage key for one piece of a user's metadata.

    Parameters
    ----------
    user_id_or_username: str
        The user ID of the given user (or the username when `by_username` is `True`).

    sub_key: str
        The key suffix, e.g. `'attributes'`.

    by_username: bool, default False
        If `True`, then treat `user_id_or_username` as a username.

    Returns
    -------
    A key to store information about a user.

    Examples
    --------
    >>> get_user_key('deadbeef', 'attributes')
    'mrsm_user:user_id:deadbeef:attributes'
    >>> get_user_key('foo', 'user_id', by_username=True)
    'mrsm_user:username:foo:user_id'
    """
    key_type = 'username' if by_username else 'user_id'
    return cls.get_entity_key(USER_PREFIX, key_type, user_id_or_username, sub_key)
Return the key to store metadata about a user.
Parameters
- user_id_or_username (str): The user ID or username of the given user. If `by_username` is `True`, then provide the username.
- sub_key (str): The key suffix, e.g. `'attributes'`.
- by_username (bool, default False): If `True`, then treat `user_id_or_username` as a username.
Returns
- A key to store information about a user.
Examples
>>> get_user_key('deadbeef', 'attributes')
'mrsm_user:user_id:deadbeef:attributes'
>>> get_user_key('foo', 'user_id', by_username=True)
'mrsm_user:username:foo:user_id'
@classmethod
def get_user_keys_vals(
    cls,
    user: 'mrsm.core.User',
    mutable_only: bool = False,
) -> Dict[str, str]:
    """
    Build the key-value pairs to persist for a user.

    Parameters
    ----------
    user: mrsm.core.User
        The user for which to generate the keys.

    mutable_only: bool, default False
        If `True`, only return keys which may be edited.

    Returns
    -------
    A dictionary mapping a user's keys to values.
    """
    serialized_attributes = json.dumps(user.attributes, separators=(',', ':'))
    editable_keys_vals = {
        cls.get_user_key(user.user_id, 'attributes'): serialized_attributes,
        cls.get_user_key(user.user_id, 'email'): user.email,
        cls.get_user_key(user.user_id, 'type'): user.type,
        cls.get_user_key(user.user_id, 'password_hash'): user.password_hash,
    }
    if mutable_only:
        return editable_keys_vals

    # The username <-> user_id mapping is fixed at registration time.
    permanent_keys_vals = {
        cls.get_user_key(user.user_id, 'username'): user.username,
        cls.get_user_key(user.username, 'user_id', by_username=True): user.user_id,
    }
    return {**permanent_keys_vals, **editable_keys_vals}
Return a dictionary containing keys and values to set for the user.
Parameters
- user (mrsm.core.User): The user for which to generate the keys.
- mutable_only (bool, default False): If `True`, only return keys which may be edited.
Returns
- A dictionary mapping a user's keys to values.
def register_user(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kwargs: Any
) -> SuccessTuple:
    """
    Register a new user.

    Parameters
    ----------
    user: mrsm.core.User
        The user to be registered.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    from meerschaum.utils.misc import generate_password

    # User IDs are random strings rather than sequential integers.
    user.user_id = generate_password(12)
    users_pipe = self.get_users_pipe()
    keys_vals = self.get_user_keys_vals(user)

    try:
        # Register the (user_id, username) row, then write the per-user keys.
        sync_success, sync_msg = users_pipe.sync(
            [
                {
                    'user_id': user.user_id,
                    'username': user.username,
                },
            ],
            check_existing=False,
            debug=debug,
        )
        if not sync_success:
            return sync_success, sync_msg

        for key, val in keys_vals.items():
            if val is not None:
                self.set(key, val)

        success, msg = True, "Success"
    except Exception as e:
        success = False
        import traceback
        traceback.print_exc()
        msg = f"Failed to register '{user.username}':\n{e}"

    # Best-effort rollback of any keys written before the failure.
    if not success:
        for key in keys_vals:
            try:
                self.client.delete(key)
            except Exception:
                pass

    return success, msg
Register a new user.
def get_user_id(self, user: 'mrsm.core.User', debug: bool = False) -> Union[str, None]:
    """
    Look up the ID stored for a user's username, returning `None` on failure.
    """
    lookup_key = self.get_user_key(user.username, 'user_id', by_username=True)
    try:
        return self.get(lookup_key)
    except Exception:
        return None
Return the ID for a user, or `None`.
def edit_user(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Edit the attributes for an existing user.

    Parameters
    ----------
    user: mrsm.core.User
        The user whose mutable keys should be overwritten.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    keys_vals = self.get_user_keys_vals(user, mutable_only=True)
    # Snapshot the current values so a failed edit can be rolled back.
    try:
        old_keys_vals = {
            key: self.get(key)
            for key in keys_vals
        }
    except Exception as e:
        return False, f"Failed to edit user:\n{e}"

    try:
        for key, val in keys_vals.items():
            self.set(key, val)
        success, msg = True, "Success"
    except Exception as e:
        success = False
        msg = f"Failed to edit user:\n{e}"

    # Best-effort restore of the snapshot taken above.
    if not success:
        try:
            for key, old_val in old_keys_vals.items():
                self.set(key, old_val)
        except Exception:
            pass

    return success, msg
Edit the attributes for an existing user.
def get_user_attributes(
    self,
    user: 'mrsm.core.User',
    debug: bool = False
) -> Union[Dict[str, Any], None]:
    """
    Deserialize and return the user's attributes, or `None` on failure.
    """
    uid = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    attributes_key = self.get_user_key(uid, 'attributes')
    try:
        return json.loads(self.get(attributes_key))
    except Exception:
        return None
Return the user's attributes.
def delete_user(
    self,
    user: 'mrsm.core.User',
    debug: bool = False
) -> SuccessTuple:
    """
    Delete a user's keys.

    Parameters
    ----------
    user: mrsm.core.User
        The user to be deleted.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    users_pipe = self.get_users_pipe()
    keys_vals = self.get_user_keys_vals(user)
    # Snapshot the current values so a failed delete can be rolled back.
    try:
        old_keys_vals = {
            key: self.get(key)
            for key in keys_vals
        }
    except Exception as e:
        return False, f"Failed to delete user:\n{e}"

    # Remove the registration row before deleting the per-user keys.
    clear_success, clear_msg = users_pipe.clear(params={'user_id': user_id})
    if not clear_success:
        return clear_success, clear_msg

    try:
        for key in keys_vals:
            self.client.delete(key)
        success, msg = True, "Success"
    except Exception as e:
        success = False
        msg = f"Failed to delete user:\n{e}"

    # Best-effort restore of the snapshot taken above.
    if not success:
        try:
            for key, old_val in old_keys_vals.items():
                self.set(key, old_val)
        except Exception:
            pass

    return success, msg
Delete a user's keys.
def get_users(
    self,
    debug: bool = False,
    **kw: Any
) -> List[str]:
    """
    Return the registered usernames.
    """
    df = self.get_users_pipe().get_data()
    if df is None:
        return []

    return list(df['username'])
Get the registered usernames.
def get_user_password_hash(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kw: Any
) -> Union[str, None]:
    """
    Return the password hash for a user, or `None` if it cannot be retrieved.

    Parameters
    ----------
    user: mrsm.core.User
        The user whose password hash to fetch.

    Returns
    -------
    The stored password hash, or `None` on failure.
    """
    # (Fixed docstring typo: "password has" -> "password hash".)
    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    user_id_password_hash_key = self.get_user_key(user_id, 'password_hash')
    try:
        return self.get(user_id_password_hash_key)
    except Exception:
        return None
Return the password hash for a user.
def get_user_type(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kw: Any
) -> Union[str, None]:
    """
    Return the user's type, or `None` if it cannot be retrieved.
    """
    uid = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    type_key = self.get_user_key(uid, 'type')
    try:
        return self.get(type_key)
    except Exception:
        return None
Return the user's type.
def get_plugins_pipe(self) -> mrsm.Pipe:
    """
    Build the in-memory pipe which stores the plugins.
    """
    return mrsm.Pipe(
        'mrsm', 'plugins',
        target=PLUGINS_TABLE,
        columns=['plugin_name'],
        temporary=True,
        instance=self,
    )
Return the pipe to store the plugins.
@classmethod
def get_plugin_key(cls, plugin_name: str, sub_key: str) -> str:
    """
    Build the storage key for one of a plugin's attributes.

    Parameters
    ----------
    plugin_name: str
        The name of the plugin.

    sub_key: str
        The key suffix, e.g. `'attributes'`.

    Returns
    -------
    The key under which the attribute is stored.
    """
    return cls.get_entity_key(PLUGIN_PREFIX, plugin_name, sub_key)
Return the key for a plugin's attribute.
@classmethod
def get_plugin_keys_vals(
    cls,
    plugin: 'mrsm.core.Plugin',
    mutable_only: bool = False,
) -> Dict[str, str]:
    """
    Build the key-value pairs to persist for a plugin.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin for which to generate the keys.

    mutable_only: bool, default False
        If `True`, only return keys which may be edited.

    Returns
    -------
    A dictionary mapping a plugin's keys to values.
    """
    serialized_attributes = json.dumps(plugin.attributes, separators=(',', ':'))
    editable_keys_vals = {
        cls.get_plugin_key(plugin.name, 'attributes'): serialized_attributes,
        cls.get_plugin_key(plugin.name, 'version'): plugin.version,
    }
    if mutable_only:
        return editable_keys_vals

    # The owner is fixed at registration time.
    permanent_keys_vals = {
        cls.get_plugin_key(plugin.name, 'user_id'): plugin.user_id,
    }
    return {**permanent_keys_vals, **editable_keys_vals}
Return a dictionary containing keys and values to set for the plugin.
Parameters
- plugin (mrsm.core.Plugin): The plugin for which to generate the keys.
- mutable_only (bool, default False): If `True`, only return keys which may be edited.
Returns
- A dictionary mapping a plugin's keys to values.
def register_plugin(
    self,
    plugin: 'mrsm.core.Plugin',
    force: bool = False,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Register a new plugin to the `mrsm_plugins` "table".

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin to be registered.

    force: bool, default False
        Accepted for interface compatibility; unused in this body.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    # NOTE: removed the unused `generate_password` import (dead code copied
    # from `register_user`; plugin IDs are plugin names).
    plugins_pipe = self.get_plugins_pipe()
    keys_vals = self.get_plugin_keys_vals(plugin)

    try:
        # Register the (plugin_name, user_id) row, then write the per-plugin keys.
        sync_success, sync_msg = plugins_pipe.sync(
            [
                {
                    'plugin_name': plugin.name,
                    'user_id': plugin.user_id,
                },
            ],
            check_existing=False,
            debug=debug,
        )
        if not sync_success:
            return sync_success, sync_msg

        for key, val in keys_vals.items():
            if val is not None:
                self.set(key, val)

        success, msg = True, "Success"
    except Exception as e:
        success = False
        msg = f"Failed to register plugin '{plugin.name}':\n{e}"

    # Best-effort rollback of any keys written before the failure.
    if not success:
        for key in keys_vals:
            try:
                self.client.delete(key)
            except Exception:
                pass

    return success, msg
Register a new plugin to the `mrsm_plugins` "table".
def get_plugin_id(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Union[str, None]:
    """
    Return a plugin's ID (the plugin's name serves as its identifier).
    """
    return plugin.name
Return a plugin's ID.
def get_plugin_version(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False,
) -> Union[str, None]:
    """
    Return a plugin's stored version, or `None` if it cannot be retrieved.
    """
    try:
        return self.get(self.get_plugin_key(plugin.name, 'version'))
    except Exception:
        return None
Return a plugin's version.
def get_plugin_user_id(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Union[str, None]:
    """
    Return the user ID stored for a plugin, or `None` if it cannot be retrieved.
    """
    try:
        return self.get(self.get_plugin_key(plugin.name, 'user_id'))
    except Exception:
        return None
Return a plugin's user ID.
def get_plugin_username(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Union[str, None]:
    """
    Return the username of a plugin's owner, or `None` if unknown.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin whose owner's username to fetch.

    Returns
    -------
    The owner's username, or `None`.
    """
    # NOTE: fixed the return annotation (`Union[str]` -> `Union[str, None]`);
    # this function returns None on missing owner or lookup failure.
    user_id = self.get_plugin_user_id(plugin, debug=debug)
    if user_id is None:
        return None

    username_key = self.get_user_key(user_id, 'username')
    try:
        return self.get(username_key)
    except Exception:
        return None
Return the username of a plugin's owner.
def get_plugin_attributes(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Dict[str, Any]:
    """
    Return a plugin's attributes dictionary (empty on any failure).
    """
    try:
        serialized = self.get(self.get_plugin_key(plugin.name, 'attributes'))
        if not serialized:
            return {}
        return json.loads(serialized)
    except Exception:
        return {}
Return the attributes of a plugin.
def get_plugins(
    self,
    user_id: Optional[int] = None,
    search_term: Optional[str] = None,
    debug: bool = False,
    **kw: Any
) -> List[str]:
    """
    Return a list of plugin names, optionally filtered by owner and prefix.

    Parameters
    ----------
    user_id: Optional[int], default None
        If provided, only return plugins owned by this user.

    search_term: Optional[str], default None
        If provided, only return plugin names starting with this prefix.

    Returns
    -------
    A list of plugin names.
    """
    plugins_pipe = self.get_plugins_pipe()
    params = {}
    if user_id:
        params['user_id'] = user_id

    df = plugins_pipe.get_data(['plugin_name'], params=params, debug=debug)
    # Guard against a missing plugins table (the original crashed on None).
    if df is None:
        return []
    docs = df.to_dict(orient='records')

    # Use the walrus binding instead of re-reading doc['plugin_name'].
    return [
        plugin_name
        for doc in docs
        if (plugin_name := doc['plugin_name']).startswith(search_term or '')
    ]
Return a list of plugin names.
def delete_plugin(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Delete a plugin from the plugins table.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin to be deleted.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    # Remove the registration row before deleting the per-plugin keys.
    plugins_pipe = self.get_plugins_pipe()
    clear_success, clear_msg = plugins_pipe.clear(params={'plugin_name': plugin.name}, debug=debug)
    if not clear_success:
        return clear_success, clear_msg

    keys_vals = self.get_plugin_keys_vals(plugin)
    # Snapshot the current values so a failed delete can be rolled back.
    try:
        old_keys_vals = {
            key: self.get(key)
            for key in keys_vals
        }
    except Exception as e:
        return False, f"Failed to delete plugin '{plugin.name}':\n{e}"

    try:
        for key in keys_vals:
            self.client.delete(key)
        success, msg = True, "Success"
    except Exception as e:
        success = False
        msg = f"Failed to delete plugin '{plugin.name}':\n{e}"

    # Best-effort restore of the snapshot taken above.
    if not success:
        try:
            for key, old_val in old_keys_vals.items():
                self.set(key, old_val)
        except Exception:
            pass

    return success, msg
Delete a plugin from the plugins table.