meerschaum.connectors.valkey

Import the ValkeyConnector.

#! /usr/bin/env python3
# vim:fenc=utf-8

"""
Import the `ValkeyConnector`.

This package module re-exports `ValkeyConnector` from the private
`_ValkeyConnector` submodule.
"""

from meerschaum.connectors.valkey._ValkeyConnector import ValkeyConnector

# The only public name exported by this package.
__all__ = ('ValkeyConnector',)
@make_connector
class ValkeyConnector(meerschaum.connectors.instance._InstanceConnector.InstanceConnector):
@make_connector
class ValkeyConnector(InstanceConnector):
    """
    Manage a Valkey instance.

    Build a `ValkeyConnector` from connection attributes or a URI string.
    """
    # Attribute names declared as required for this connector.
    REQUIRED_ATTRIBUTES: List[str] = ['host']
    # Attribute names which the `client` property forwards to the Valkey client
    # when they are set on the instance.
    OPTIONAL_ATTRIBUTES: List[str] = [
        'port', 'username', 'password', 'db', 'socket_timeout',
    ]
    # Default attribute values.
    # NOTE(review): presumably applied by the base connector machinery -- confirm.
    DEFAULT_ATTRIBUTES: Dict[str, Any] = {
        'username': 'default',
        'port': 6379,
        'db': 0,
        'socket_timeout': 300,
    }
    # Separator used by `get_entity_key()` when joining key parts.
    KEY_SEPARATOR: str = ':'
 37
 38    from ._pipes import (
 39        register_pipe,
 40        get_pipe_id,
 41        get_pipe_attributes,
 42        edit_pipe,
 43        pipe_exists,
 44        drop_pipe,
 45        delete_pipe,
 46        get_pipe_data,
 47        sync_pipe,
 48        get_pipe_columns_types,
 49        clear_pipe,
 50        get_sync_time,
 51        get_pipe_rowcount,
 52        fetch_pipes_keys,
 53        get_document_key,
 54        get_table_quoted_doc_key,
 55    )
 56    from ._fetch import (
 57        fetch,
 58    )
 59
 60    from ._users import (
 61        get_users_pipe,
 62        get_user_key,
 63        get_user_keys_vals,
 64        register_user,
 65        get_user_id,
 66        edit_user,
 67        get_user_attributes,
 68        delete_user,
 69        get_users,
 70        get_user_password_hash,
 71        get_user_type,
 72    )
 73    from ._plugins import (
 74        get_plugins_pipe,
 75        get_plugin_key,
 76        get_plugin_keys_vals,
 77        register_plugin,
 78        get_plugin_id,
 79        get_plugin_version,
 80        get_plugin_user_id,
 81        get_plugin_username,
 82        get_plugin_attributes,
 83        delete_plugin,
 84    )
 85
 86    @property
 87    def client(self):
 88        """
 89        Return the Valkey client.
 90        """
 91        if '_client' in self.__dict__:
 92            return self.__dict__['_client']
 93
 94        valkey = mrsm.attempt_import('valkey')
 95
 96        if 'uri' in self.__dict__:
 97            self._client = valkey.Valkey.from_url(self.__dict__.get('uri'))
 98            return self._client
 99
100        optional_kwargs = {
101            key: self.__dict__.get(key)
102            for key in self.OPTIONAL_ATTRIBUTES
103            if key in self.__dict__
104        }
105        connection_kwargs = {
106            'host': self.host,
107            **optional_kwargs
108        }
109
110        self._client = valkey.Valkey(**connection_kwargs)
111        return self._client
112
113    @property
114    def URI(self) -> str:
115        """
116        Return the connection URI for this connector.
117        """
118        import urllib.parse
119
120        if 'uri' in self.__dict__:
121            return self.__dict__.get('uri')
122
123        uri = "valkey://"
124        if 'username' in self.__dict__:
125            uri += urllib.parse.quote_plus(self.username) + ':'
126
127        if 'password' in self.__dict__:
128            uri += urllib.parse.quote_plus(self.password) + '@'
129
130        if 'host' in self.__dict__:
131            uri += self.host
132
133        if 'port' in self.__dict__:
134            uri += f':{self.port}'
135
136        if 'db' in self.__dict__:
137            uri += f"/{self.db}"
138
139        if 'socket_timeout' in self.__dict__:
140            uri += f"?timeout={self.socket_timeout}s"
141
142        return uri
143
144    def set(self, key: str, value: Any, **kwargs: Any) -> bool:
145        """
146        Set the `key` to `value`.
147        """
148        return self.client.set(key, value, **kwargs)
149
150    def get(self, key: str, decode: bool = True) -> Union[str, None]:
151        """
152        Get the value for `key`.
153        """
154        val = self.client.get(key)
155        if val is None:
156            return None
157
158        return val.decode('utf-8') if decode else val
159
160    def test_connection(self) -> bool:
161        """
162        Return whether a connection may be established.
163        """
164        return self.client.ping()
165
166    @classmethod
167    def quote_table(cls, table: str) -> str:
168        """
169        Return a quoted key.
170        """
171        return shlex.quote(table)
172
173    @classmethod
174    def get_counter_key(cls, table: str) -> str:
175        """
176        Return the counter key for a given table.
177        """
178        table_name = cls.quote_table(table)
179        return f"{table_name}:counter"
180
181    def push_df(
182        self,
183        df: 'pd.DataFrame',
184        table: str,
185        datetime_column: Optional[str] = None,
186        debug: bool = False,
187    ) -> int:
188        """
189        Append a pandas DataFrame to a table.
190
191        Parameters
192        ----------
193        df: pd.DataFrame
194            The pandas DataFrame to append to the table.
195
196        table: str
197            The "table" name (root key).
198
199        datetime_column: Optional[str], default None
200            If provided, use this key as the datetime index.
201
202        Returns
203        -------
204        The current index counter value (how many docs have been pushed).
205        """
206        from meerschaum.utils.dataframe import to_json
207        docs_str = to_json(df)
208        docs = json.loads(docs_str)
209        return self.push_docs(
210            docs,
211            table,
212            datetime_column=datetime_column,
213            debug=debug,
214        )
215
216    def push_docs(
217        self,
218        docs: List[Dict[str, Any]],
219        table: str,
220        datetime_column: Optional[str] = None,
221        debug: bool = False,
222    ) -> int:
223        """
224        Append a list of documents to a table.
225
226        Parameters
227        ----------
228        docs: List[Dict[str, Any]]
229            The docs to be pushed.
230            All keys and values will be coerced into strings.
231
232        table: str
233            The "table" name (root key).
234
235        datetime_column: Optional[str], default None
236            If set, create a sorted set with this datetime column as the index.
237            Otherwise push the docs to a list.
238
239        Returns
240        -------
241        The current index counter value (how many docs have been pushed).
242        """
243        from meerschaum.utils.dtypes import json_serialize_value
244        table_name = self.quote_table(table)
245        datetime_column_key = self.get_datetime_column_key(table)
246        remote_datetime_column = self.get(datetime_column_key)
247        datetime_column = datetime_column or remote_datetime_column
248        dateutil_parser = mrsm.attempt_import('dateutil.parser')
249
250        old_len = (
251            self.client.zcard(table_name)
252            if datetime_column
253            else self.client.scard(table_name)
254        )
255        for doc in docs:
256            original_dt_val = (
257                doc[datetime_column]
258                if datetime_column and datetime_column in doc
259                else 0
260            )
261            dt_val = (
262                dateutil_parser.parse(str(original_dt_val))
263                if not isinstance(original_dt_val, int)
264                else int(original_dt_val)
265            ) if datetime_column else None
266            ts = (
267                int(dt_val.replace(tzinfo=timezone.utc).timestamp())
268                if isinstance(dt_val, datetime)
269                else int(dt_val)
270            ) if datetime_column else None
271            doc_str = json.dumps(
272                doc,
273                default=json_serialize_value,
274                separators=(',', ':'),
275                sort_keys=True,
276            )
277            if datetime_column:
278                self.client.zadd(table_name, {doc_str: ts})
279            else:
280                self.client.sadd(table_name, doc_str)
281
282        if datetime_column:
283            self.set(datetime_column_key, datetime_column)
284        new_len = (
285            self.client.zcard(table_name)
286            if datetime_column
287            else self.client.scard(table_name)
288        )
289
290        return new_len - old_len
291
292    def _push_hash_docs_to_list(self, docs: List[Dict[str, Any]], table: str) -> int:
293        table_name = self.quote_table(table)
294        next_ix = max(self.client.llen(table_name) or 0, 1)
295        for i, doc in enumerate(docs):
296            doc_key = f"{table_name}:{next_ix + i}"
297            self.client.hset(
298                doc_key,
299                mapping={
300                    str(k): str(v)
301                    for k, v in doc.items()
302                },
303            )
304            self.client.rpush(table_name, doc_key)
305
306        return next_ix + len(docs)
307
308    def get_datetime_column_key(self, table: str) -> str:
309        """
310        Return the key to store the datetime index for `table`.
311        """
312        table_name = self.quote_table(table)
313        return f'{table_name}:datetime_column'
314
315    def read(
316        self,
317        table: str,
318        begin: Union[datetime, int, str, None] = None,
319        end: Union[datetime, int, str, None] = None,
320        params: Optional[Dict[str, Any]] = None,
321        datetime_column: Optional[str] = None,
322        select_columns: Optional[List[str]] = None,
323        omit_columns: Optional[List[str]] = None,
324        debug: bool = False
325    ) -> Union['pd.DataFrame', None]:
326        """
327        Query the table and return the result dataframe.
328
329        Parameters
330        ----------
331        table: str
332            The "table" name to be queried.
333
334        begin: Union[datetime, int, str, None], default None
335            If provided, only return rows greater than or equal to this datetime.
336
337        end: Union[datetime, int, str, None], default None
338            If provided, only return rows older than this datetime.
339
340        params: Optional[Dict[str, Any]]
341            Additional Meerschaum filter parameters.
342
343        datetime_column: Optional[str], default None
344            If provided, use this column for the datetime index.
345            Otherwise infer from the table metadata.
346
347        select_columns: Optional[List[str]], default None
348            If provided, only return these columns.
349
350        omit_columns: Optional[List[str]], default None
351            If provided, do not include these columns in the result.
352
353        Returns
354        -------
355        A Pandas DataFrame of the result, or `None`.
356        """
357        from meerschaum.utils.dataframe import parse_df_datetimes, query_df
358        docs = self.read_docs(
359            table,
360            begin=begin,
361            end=end,
362            debug=debug,
363        )
364        df = parse_df_datetimes(docs)
365        datetime_column_key = self.get_datetime_column_key(table)
366        datetime_column = datetime_column or self.get(datetime_column_key)
367
368        return query_df(
369            df,
370            begin=(begin if datetime_column is not None else None),
371            end=(end if datetime_column is not None else None),
372            params=params,
373            datetime_column=datetime_column,
374            select_columns=select_columns,
375            omit_columns=omit_columns,
376            inplace=True,
377            reset_index=True,
378            debug=debug,
379        )
380
381    def read_docs(
382        self,
383        table: str,
384        begin: Union[datetime, int, str, None] = None,
385        end: Union[datetime, int, str, None] = None,
386        debug: bool = False,
387    ) -> List[Dict[str, str]]:
388        """
389        Return a list of previously pushed docs.
390
391        Parameters
392        ----------
393        table: str
394            The "table" name (root key) under which the docs were pushed.
395
396        begin: Union[datetime, int, str, None], default None
397            If provided and the table was created with a datetime index, only return documents
398            newer than this datetime.
399            If the table was not created with a datetime index and `begin` is an `int`,
400            return documents with a positional index greater than or equal to this value.
401
402        end: Union[datetime, int, str, None], default None
403            If provided and the table was created with a datetime index, only return documents
404            older than this datetime.
405            If the table was not created with a datetime index and `begin` is an `int`,
406            return documents with a positional index less than this value.
407
408        Returns
409        -------
410        A list of dictionaries, where all keys and values are strings.
411        """
412        from meerschaum.utils.dtypes import coerce_timezone
413        table_name = self.quote_table(table)
414        datetime_column_key = self.get_datetime_column_key(table)
415        datetime_column = self.get(datetime_column_key)
416
417        if debug:
418            dprint(f"Reading documents from '{table}' with {begin=}, {end=}")
419
420        if not datetime_column:
421            return [
422                json.loads(doc_bytes.decode('utf-8'))
423                for doc_bytes in self.client.smembers(table_name)
424            ]
425
426        dateutil_parser = mrsm.attempt_import('dateutil.parser')
427
428        if isinstance(begin, str):
429            begin = coerce_timezone(dateutil_parser.parse(begin))
430
431        if isinstance(end, str):
432            end = coerce_timezone(dateutil_parser.parse(end))
433
434        begin_ts = (
435            (
436                int(begin.replace(tzinfo=timezone.utc).timestamp())
437                if isinstance(begin, datetime)
438                else int(begin)
439            )
440            if begin is not None else '-inf'
441        )
442        end_ts = (
443            (
444                int(end.replace(tzinfo=timezone.utc).timestamp())
445                if isinstance(end, datetime)
446                else int(end)
447            )
448            if end is not None else '+inf'
449        )
450
451        if debug:
452            dprint(f"Reading documents with {begin_ts=}, {end_ts=}")
453
454        return [
455            json.loads(doc_bytes.decode('utf-8'))
456            for doc_bytes in self.client.zrangebyscore(
457                table_name,
458                begin_ts,
459                end_ts,
460                withscores=False,
461            )
462        ]
463
464    def _read_docs_from_list(
465        self,
466        table: str,
467        begin_ix: Optional[int] = 0,
468        end_ix: Optional[int] = -1,
469        debug: bool = False,
470    ):
471        """
472        Read a list of documents from a "table".
473
474        Parameters
475        ----------
476        table: str
477            The "table" (root key) from which to read docs.
478
479        begin_ix: Optional[int], default 0
480            If provided, only read documents from this starting index.
481
482        end_ix: Optional[int], default -1
483            If provided, only read documents up to (not including) this index.
484
485        Returns
486        -------
487        A list of documents.
488        """
489        if begin_ix is None:
490            begin_ix = 0
491
492        if end_ix == 0:
493            return
494
495        if end_ix is None:
496            end_ix = -1
497        else:
498            end_ix -= 1
499
500        table_name = self.quote_table(table)
501        doc_keys = self.client.lrange(table_name, begin_ix, end_ix)
502        for doc_key in doc_keys:
503            yield {
504                key.decode('utf-8'): value.decode('utf-8')
505                for key, value in self.client.hgetall(doc_key).items()
506            }
507
508    def drop_table(self, table: str, debug: bool = False) -> None:
509        """
510        Drop a "table" of documents.
511
512        Parameters
513        ----------
514        table: str
515            The "table" name (root key) to be deleted.
516        """
517        table_name = self.quote_table(table)
518        datetime_column_key = self.get_datetime_column_key(table)
519        self.client.delete(table_name)
520        self.client.delete(datetime_column_key)
521
522    @classmethod
523    def get_entity_key(cls, *keys: Any) -> str:
524        """
525        Return a joined key to set an entity.
526        """
527        if not keys:
528            raise ValueError("No keys to be joined.")
529
530        for key in keys:
531            if cls.KEY_SEPARATOR in str(key):
532                raise ValueError(f"Key cannot contain separator '{cls.KEY_SEPARATOR}'.")
533
534        return cls.KEY_SEPARATOR.join([str(key) for key in keys])

Manage a Valkey instance.

Build a ValkeyConnector from connection attributes or a URI string.

REQUIRED_ATTRIBUTES: List[str] = ['host']
OPTIONAL_ATTRIBUTES: List[str] = ['port', 'username', 'password', 'db', 'socket_timeout']
DEFAULT_ATTRIBUTES: Dict[str, Any] = {'username': 'default', 'port': 6379, 'db': 0, 'socket_timeout': 300}
KEY_SEPARATOR: str = ':'
client
 86    @property
 87    def client(self):
 88        """
 89        Return the Valkey client.
 90        """
 91        if '_client' in self.__dict__:
 92            return self.__dict__['_client']
 93
 94        valkey = mrsm.attempt_import('valkey')
 95
 96        if 'uri' in self.__dict__:
 97            self._client = valkey.Valkey.from_url(self.__dict__.get('uri'))
 98            return self._client
 99
100        optional_kwargs = {
101            key: self.__dict__.get(key)
102            for key in self.OPTIONAL_ATTRIBUTES
103            if key in self.__dict__
104        }
105        connection_kwargs = {
106            'host': self.host,
107            **optional_kwargs
108        }
109
110        self._client = valkey.Valkey(**connection_kwargs)
111        return self._client

Return the Valkey client.

URI: str
113    @property
114    def URI(self) -> str:
115        """
116        Return the connection URI for this connector.
117        """
118        import urllib.parse
119
120        if 'uri' in self.__dict__:
121            return self.__dict__.get('uri')
122
123        uri = "valkey://"
124        if 'username' in self.__dict__:
125            uri += urllib.parse.quote_plus(self.username) + ':'
126
127        if 'password' in self.__dict__:
128            uri += urllib.parse.quote_plus(self.password) + '@'
129
130        if 'host' in self.__dict__:
131            uri += self.host
132
133        if 'port' in self.__dict__:
134            uri += f':{self.port}'
135
136        if 'db' in self.__dict__:
137            uri += f"/{self.db}"
138
139        if 'socket_timeout' in self.__dict__:
140            uri += f"?timeout={self.socket_timeout}s"
141
142        return uri

Return the connection URI for this connector.

def set(self, key: str, value: Any, **kwargs: Any) -> bool:
144    def set(self, key: str, value: Any, **kwargs: Any) -> bool:
145        """
146        Set the `key` to `value`.
147        """
148        return self.client.set(key, value, **kwargs)

Set the key to value.

def get(self, key: str, decode: bool = True) -> Optional[str]:
150    def get(self, key: str, decode: bool = True) -> Union[str, None]:
151        """
152        Get the value for `key`.
153        """
154        val = self.client.get(key)
155        if val is None:
156            return None
157
158        return val.decode('utf-8') if decode else val

Get the value for key.

def test_connection(self) -> bool:
160    def test_connection(self) -> bool:
161        """
162        Return whether a connection may be established.
163        """
164        return self.client.ping()

Return whether a connection may be established.

@classmethod
def quote_table(cls, table: str) -> str:
166    @classmethod
167    def quote_table(cls, table: str) -> str:
168        """
169        Return a quoted key.
170        """
171        return shlex.quote(table)

Return a quoted key.

@classmethod
def get_counter_key(cls, table: str) -> str:
173    @classmethod
174    def get_counter_key(cls, table: str) -> str:
175        """
176        Return the counter key for a given table.
177        """
178        table_name = cls.quote_table(table)
179        return f"{table_name}:counter"

Return the counter key for a given table.

def push_df( self, df: 'pd.DataFrame', table: str, datetime_column: Optional[str] = None, debug: bool = False) -> int:
181    def push_df(
182        self,
183        df: 'pd.DataFrame',
184        table: str,
185        datetime_column: Optional[str] = None,
186        debug: bool = False,
187    ) -> int:
188        """
189        Append a pandas DataFrame to a table.
190
191        Parameters
192        ----------
193        df: pd.DataFrame
194            The pandas DataFrame to append to the table.
195
196        table: str
197            The "table" name (root key).
198
199        datetime_column: Optional[str], default None
200            If provided, use this key as the datetime index.
201
202        Returns
203        -------
204        The current index counter value (how many docs have been pushed).
205        """
206        from meerschaum.utils.dataframe import to_json
207        docs_str = to_json(df)
208        docs = json.loads(docs_str)
209        return self.push_docs(
210            docs,
211            table,
212            datetime_column=datetime_column,
213            debug=debug,
214        )

Append a pandas DataFrame to a table.

Parameters
  • df (pd.DataFrame): The pandas DataFrame to append to the table.
  • table (str): The "table" name (root key).
  • datetime_column (Optional[str], default None): If provided, use this key as the datetime index.
Returns
  • The number of documents newly added to the table by this call (the change in the table's cardinality).
def push_docs( self, docs: List[Dict[str, Any]], table: str, datetime_column: Optional[str] = None, debug: bool = False) -> int:
216    def push_docs(
217        self,
218        docs: List[Dict[str, Any]],
219        table: str,
220        datetime_column: Optional[str] = None,
221        debug: bool = False,
222    ) -> int:
223        """
224        Append a list of documents to a table.
225
226        Parameters
227        ----------
228        docs: List[Dict[str, Any]]
229            The docs to be pushed.
230            All keys and values will be coerced into strings.
231
232        table: str
233            The "table" name (root key).
234
235        datetime_column: Optional[str], default None
236            If set, create a sorted set with this datetime column as the index.
237            Otherwise push the docs to a list.
238
239        Returns
240        -------
241        The current index counter value (how many docs have been pushed).
242        """
243        from meerschaum.utils.dtypes import json_serialize_value
244        table_name = self.quote_table(table)
245        datetime_column_key = self.get_datetime_column_key(table)
246        remote_datetime_column = self.get(datetime_column_key)
247        datetime_column = datetime_column or remote_datetime_column
248        dateutil_parser = mrsm.attempt_import('dateutil.parser')
249
250        old_len = (
251            self.client.zcard(table_name)
252            if datetime_column
253            else self.client.scard(table_name)
254        )
255        for doc in docs:
256            original_dt_val = (
257                doc[datetime_column]
258                if datetime_column and datetime_column in doc
259                else 0
260            )
261            dt_val = (
262                dateutil_parser.parse(str(original_dt_val))
263                if not isinstance(original_dt_val, int)
264                else int(original_dt_val)
265            ) if datetime_column else None
266            ts = (
267                int(dt_val.replace(tzinfo=timezone.utc).timestamp())
268                if isinstance(dt_val, datetime)
269                else int(dt_val)
270            ) if datetime_column else None
271            doc_str = json.dumps(
272                doc,
273                default=json_serialize_value,
274                separators=(',', ':'),
275                sort_keys=True,
276            )
277            if datetime_column:
278                self.client.zadd(table_name, {doc_str: ts})
279            else:
280                self.client.sadd(table_name, doc_str)
281
282        if datetime_column:
283            self.set(datetime_column_key, datetime_column)
284        new_len = (
285            self.client.zcard(table_name)
286            if datetime_column
287            else self.client.scard(table_name)
288        )
289
290        return new_len - old_len

Append a list of documents to a table.

Parameters
  • docs (List[Dict[str, Any]]): The docs to be pushed. Each document is serialized to a JSON string before being stored.
  • table (str): The "table" name (root key).
  • datetime_column (Optional[str], default None): If set, create a sorted set with this datetime column as the index. Otherwise add the docs to an unordered set.
Returns
  • The number of documents newly added to the table by this call (the change in the table's cardinality).
def get_datetime_column_key(self, table: str) -> str:
308    def get_datetime_column_key(self, table: str) -> str:
309        """
310        Return the key to store the datetime index for `table`.
311        """
312        table_name = self.quote_table(table)
313        return f'{table_name}:datetime_column'

Return the key to store the datetime index for table.

def read( self, table: str, begin: Union[datetime.datetime, int, str, NoneType] = None, end: Union[datetime.datetime, int, str, NoneType] = None, params: Optional[Dict[str, Any]] = None, datetime_column: Optional[str] = None, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, debug: bool = False) -> Optional[ForwardRef('pd.DataFrame')]:
315    def read(
316        self,
317        table: str,
318        begin: Union[datetime, int, str, None] = None,
319        end: Union[datetime, int, str, None] = None,
320        params: Optional[Dict[str, Any]] = None,
321        datetime_column: Optional[str] = None,
322        select_columns: Optional[List[str]] = None,
323        omit_columns: Optional[List[str]] = None,
324        debug: bool = False
325    ) -> Union['pd.DataFrame', None]:
326        """
327        Query the table and return the result dataframe.
328
329        Parameters
330        ----------
331        table: str
332            The "table" name to be queried.
333
334        begin: Union[datetime, int, str, None], default None
335            If provided, only return rows greater than or equal to this datetime.
336
337        end: Union[datetime, int, str, None], default None
338            If provided, only return rows older than this datetime.
339
340        params: Optional[Dict[str, Any]]
341            Additional Meerschaum filter parameters.
342
343        datetime_column: Optional[str], default None
344            If provided, use this column for the datetime index.
345            Otherwise infer from the table metadata.
346
347        select_columns: Optional[List[str]], default None
348            If provided, only return these columns.
349
350        omit_columns: Optional[List[str]], default None
351            If provided, do not include these columns in the result.
352
353        Returns
354        -------
355        A Pandas DataFrame of the result, or `None`.
356        """
357        from meerschaum.utils.dataframe import parse_df_datetimes, query_df
358        docs = self.read_docs(
359            table,
360            begin=begin,
361            end=end,
362            debug=debug,
363        )
364        df = parse_df_datetimes(docs)
365        datetime_column_key = self.get_datetime_column_key(table)
366        datetime_column = datetime_column or self.get(datetime_column_key)
367
368        return query_df(
369            df,
370            begin=(begin if datetime_column is not None else None),
371            end=(end if datetime_column is not None else None),
372            params=params,
373            datetime_column=datetime_column,
374            select_columns=select_columns,
375            omit_columns=omit_columns,
376            inplace=True,
377            reset_index=True,
378            debug=debug,
379        )

Query the table and return the result dataframe.

Parameters
  • table (str): The "table" name to be queried.
  • begin (Union[datetime, int, str, None], default None): If provided, only return rows greater than or equal to this datetime.
  • end (Union[datetime, int, str, None], default None): If provided, only return rows older than this datetime.
  • params (Optional[Dict[str, Any]]): Additional Meerschaum filter parameters.
  • datetime_column (Optional[str], default None): If provided, use this column for the datetime index. Otherwise infer from the table metadata.
  • select_columns (Optional[List[str]], default None): If provided, only return these columns.
  • omit_columns (Optional[List[str]], default None): If provided, do not include these columns in the result.
Returns
  • A Pandas DataFrame of the result, or None.
def read_docs( self, table: str, begin: Union[datetime.datetime, int, str, NoneType] = None, end: Union[datetime.datetime, int, str, NoneType] = None, debug: bool = False) -> List[Dict[str, str]]:
381    def read_docs(
382        self,
383        table: str,
384        begin: Union[datetime, int, str, None] = None,
385        end: Union[datetime, int, str, None] = None,
386        debug: bool = False,
387    ) -> List[Dict[str, str]]:
388        """
389        Return a list of previously pushed docs.
390
391        Parameters
392        ----------
393        table: str
394            The "table" name (root key) under which the docs were pushed.
395
396        begin: Union[datetime, int, str, None], default None
397            If provided and the table was created with a datetime index, only return documents
398            newer than this datetime.
399            If the table was not created with a datetime index and `begin` is an `int`,
400            return documents with a positional index greater than or equal to this value.
401
402        end: Union[datetime, int, str, None], default None
403            If provided and the table was created with a datetime index, only return documents
404            older than this datetime.
405            If the table was not created with a datetime index and `begin` is an `int`,
406            return documents with a positional index less than this value.
407
408        Returns
409        -------
410        A list of dictionaries, where all keys and values are strings.
411        """
412        from meerschaum.utils.dtypes import coerce_timezone
413        table_name = self.quote_table(table)
414        datetime_column_key = self.get_datetime_column_key(table)
415        datetime_column = self.get(datetime_column_key)
416
417        if debug:
418            dprint(f"Reading documents from '{table}' with {begin=}, {end=}")
419
420        if not datetime_column:
421            return [
422                json.loads(doc_bytes.decode('utf-8'))
423                for doc_bytes in self.client.smembers(table_name)
424            ]
425
426        dateutil_parser = mrsm.attempt_import('dateutil.parser')
427
428        if isinstance(begin, str):
429            begin = coerce_timezone(dateutil_parser.parse(begin))
430
431        if isinstance(end, str):
432            end = coerce_timezone(dateutil_parser.parse(end))
433
434        begin_ts = (
435            (
436                int(begin.replace(tzinfo=timezone.utc).timestamp())
437                if isinstance(begin, datetime)
438                else int(begin)
439            )
440            if begin is not None else '-inf'
441        )
442        end_ts = (
443            (
444                int(end.replace(tzinfo=timezone.utc).timestamp())
445                if isinstance(end, datetime)
446                else int(end)
447            )
448            if end is not None else '+inf'
449        )
450
451        if debug:
452            dprint(f"Reading documents with {begin_ts=}, {end_ts=}")
453
454        return [
455            json.loads(doc_bytes.decode('utf-8'))
456            for doc_bytes in self.client.zrangebyscore(
457                table_name,
458                begin_ts,
459                end_ts,
460                withscores=False,
461            )
462        ]

Return a list of previously pushed docs.

Parameters
  • table (str): The "table" name (root key) under which the docs were pushed.
  • begin (Union[datetime, int, str, None], default None): If provided and the table was created with a datetime index, only return documents newer than this datetime. If the table was not created with a datetime index and begin is an int, return documents with a positional index greater than or equal to this value.
  • end (Union[datetime, int, str, None], default None): If provided and the table was created with a datetime index, only return documents older than this datetime. If the table was not created with a datetime index and end is an int, return documents with a positional index less than this value.
Returns
  • A list of dictionaries, where all keys and values are strings.
def drop_table(self, table: str, debug: bool = False) -> None:
508    def drop_table(self, table: str, debug: bool = False) -> None:
509        """
510        Drop a "table" of documents.
511
512        Parameters
513        ----------
514        table: str
515            The "table" name (root key) to be deleted.
516        """
517        table_name = self.quote_table(table)
518        datetime_column_key = self.get_datetime_column_key(table)
519        self.client.delete(table_name)
520        self.client.delete(datetime_column_key)

Drop a "table" of documents.

Parameters
  • table (str): The "table" name (root key) to be deleted.
@classmethod
def get_entity_key(cls, *keys: Any) -> str:
522    @classmethod
523    def get_entity_key(cls, *keys: Any) -> str:
524        """
525        Return a joined key to set an entity.
526        """
527        if not keys:
528            raise ValueError("No keys to be joined.")
529
530        for key in keys:
531            if cls.KEY_SEPARATOR in str(key):
532                raise ValueError(f"Key cannot contain separator '{cls.KEY_SEPARATOR}'.")
533
534        return cls.KEY_SEPARATOR.join([str(key) for key in keys])

Return a joined key to set an entity.

def register_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
def register_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> SuccessTuple:
    """
    Register a pipe: allocate an ID, record its keys, and store its parameters.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be registered.

    Returns
    -------
    A `SuccessTuple` of the result.
    Registration fails if the pipe's key already holds an ID
    or if any of the writes raise an exception.
    """
    pipe_doc = {
        attr: str(getattr(pipe, attr))
        for attr in ('connector_keys', 'metric_key', 'location_key')
    }
    # Compact separators keep the stored parameters string small.
    parameters_str = json.dumps(
        pipe._attributes.get('parameters', {}),
        separators=(',', ':'),
    )

    pipe_key = get_pipe_key(pipe)
    parameters_key = get_pipe_parameters_key(pipe)

    try:
        if self.get(pipe_key) is not None:
            return False, f"{pipe} is already registered."

        # The counter yields the next pipe ID.
        pipe_id = self.client.incr(PIPES_COUNTER)
        self.push_docs(
            [{'pipe_id': pipe_id, **pipe_doc}],
            PIPES_TABLE,
            datetime_column='pipe_id',
            debug=debug,
        )
        self.set(pipe_key, pipe_id)
        self.set(parameters_key, parameters_str)
    except Exception as e:
        return False, f"Failed to register {pipe}:\n{e}"

    return True, "Success"

Insert the pipe's attributes into the internal pipes table.

Parameters
  • pipe (mrsm.Pipe): The pipe to be registered.
Returns
  • A SuccessTuple of the result.
def get_pipe_id( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Union[str, int, NoneType]:
def get_pipe_id(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> Union[str, int, None]:
    """
    Return the integer ID stored under the pipe's key, if any.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose ID to fetch.

    Returns
    -------
    The pipe's ID as an `int`, or `None` if the lookup or the
    integer conversion fails (e.g. the pipe is not registered).
    """
    pipe_key = get_pipe_key(pipe)
    try:
        stored_id = self.get(pipe_key)
        return int(stored_id)
    except Exception:
        # Best-effort lookup: a missing or malformed value means "no ID".
        return None

Return the _id for the pipe if it exists.

Parameters
  • pipe (mrsm.Pipe): The pipe whose _id to fetch.
Returns
  • The _id for the pipe's document or None.
def get_pipe_attributes( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Dict[str, Any]:
def get_pipe_attributes(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> Dict[str, Any]:
    """
    Build the pipe's attributes from its keys and its stored parameters.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose attributes should be retrieved.

    Returns
    -------
    A dictionary with the pipe's keys, its decoded `parameters`, and its `pipe_id`,
    or an empty dictionary if the pipe has no ID.
    """
    pipe_id = pipe.get_id(debug=debug)
    if pipe_id is None:
        return {}

    stored_parameters = self.get(get_pipe_parameters_key(pipe))

    return {
        'connector_keys': pipe.connector_keys,
        'metric_key': pipe.metric_key,
        'location_key': pipe.location_key,
        # A missing or empty parameters string decodes to no parameters.
        'parameters': json.loads(stored_parameters) if stored_parameters else {},
        'pipe_id': pipe_id,
    }

Return the pipe's document from the internal pipes collection.

Parameters
  • pipe (mrsm.Pipe): The pipe whose attributes should be retrieved.
Returns
  • The document that matches the keys of the pipe.
def edit_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
def edit_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Persist the pipe's in-memory parameters.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose in-memory parameters must be persisted.

    Returns
    -------
    A `SuccessTuple` indicating success.
    Fails if the pipe is not registered.
    """
    if pipe.get_id(debug=debug) is None:
        return False, f"{pipe} is not registered."

    parameters_key = get_pipe_parameters_key(pipe)
    # Store the parameters without resolving symlinks, as compact JSON.
    serialized_parameters = json.dumps(
        pipe.get_parameters(apply_symlinks=False),
        separators=(',', ':'),
    )
    self.set(parameters_key, serialized_parameters)
    return True, "Success"

Edit the attributes of the pipe.

Parameters
  • pipe (mrsm.Pipe): The pipe whose in-memory parameters must be persisted.
Returns
  • A SuccessTuple indicating success.
def pipe_exists( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> bool:
def pipe_exists(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> bool:
    """
    Check whether the key for a pipe's target table exists.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to check whether its table exists.

    Returns
    -------
    A `bool` indicating the table exists.
    """
    # `exists` reports how many of the given keys are present.
    num_found = self.client.exists(self.quote_table(pipe.target))
    return num_found != 0

Check whether a pipe's target table exists.

Parameters
  • pipe (mrsm.Pipe): The pipe to check whether its table exists.
Returns
  • A bool indicating the table exists.
def drop_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
def drop_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Delete a pipe's documents and its target table, then reset its stored dtypes.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be dropped.

    Returns
    -------
    A `SuccessTuple` indicating success.
    A pipe which does not exist is reported as a success.
    """
    if not pipe.exists(debug=debug):
        return True, f"{pipe} does not exist, so it was not dropped."

    table_name = self.quote_table(pipe.target)
    dt_col = pipe.columns.get('datetime', None)

    try:
        # Tables with a datetime column are sorted sets; the rest are plain sets.
        if dt_col:
            index_members = self.client.zrange(table_name, 0, -1)
        else:
            index_members = self.client.smembers(table_name)

        # Each index member is a JSON document whose `ix` value names a document key.
        doc_keys = []
        for raw_member in index_members:
            ix_str = json.loads(raw_member.decode('utf-8')).get('ix')
            if ix_str:
                ix_doc = string_to_dict(ix_str.replace(COLON, ':'))
                doc_keys.append(
                    self.get_document_key(ix_doc, list(ix_doc.keys()), table_name)
                )

        # Delete the document keys in batches.
        batch_size = 1000
        for start in range(0, len(doc_keys), batch_size):
            self.client.delete(*doc_keys[start:start + batch_size])

    except Exception as e:
        return False, f"Failed to delete documents for {pipe}:\n{e}"

    try:
        self.drop_table(pipe.target, debug=debug)
    except Exception as e:
        return False, f"Failed to drop {pipe}:\n{e}"

    # Clear the remembered dtypes so that a later sync starts fresh.
    if 'valkey' in pipe.parameters:
        pipe._attributes['parameters']['valkey']['dtypes'] = {}
        if not pipe.temporary:
            edit_success, edit_msg = pipe.edit(debug=debug)
            if not edit_success:
                return edit_success, edit_msg

    return True, "Success"

Drop a pipe's collection if it exists.

Parameters
  • pipe (mrsm.Pipe): The pipe to be dropped.
Returns
  • A SuccessTuple indicating success.
def delete_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
def delete_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Drop a pipe's data and delete its registration.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be deleted.

    Returns
    -------
    A `SuccessTuple` indicating success.
    Fails if the pipe cannot be dropped or is not registered.
    """
    drop_success, drop_message = pipe.drop(debug=debug)
    if not drop_success:
        return drop_success, drop_message

    pipe_id = self.get_pipe_id(pipe, debug=debug)
    if pipe_id is None:
        return False, f"{pipe} is not registered."

    self.client.delete(get_pipe_key(pipe))
    self.client.delete(get_pipe_parameters_key(pipe))

    # `read` may return `None`, so treat that as "no registration document".
    df = self.read(PIPES_TABLE, params={'pipe_id': pipe_id}, debug=debug)
    docs = df.to_dict(orient='records') if df is not None else []
    if docs:
        # NOTE(review): this relies on re-serializing the document exactly as it
        # was stored (compact separators, sorted keys) -- confirm against `push_docs`.
        doc_str = json.dumps(
            docs[0],
            default=json_serialize_value,
            separators=(',', ':'),
            sort_keys=True,
        )
        # Documents are read from the quoted table name, so remove from that same key.
        self.client.zrem(self.quote_table(PIPES_TABLE), doc_str)
    return True, "Success"

Delete a pipe's registration from the pipes collection.

Parameters
  • pipe (mrsm.Pipe): The pipe to be deleted.
Returns
  • A SuccessTuple indicating success.
def get_pipe_data( self, pipe: meerschaum.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kwargs: Any) -> Optional[ForwardRef('pd.DataFrame')]:
def get_pipe_data(
    self,
    pipe: mrsm.Pipe,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kwargs: Any
) -> Union['pd.DataFrame', None]:
    """
    Read a pipe's documents and return them as a filtered DataFrame.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe with the target table from which to read.

    select_columns: Optional[List[str]], default None
        If provided, only select these given columns.
        Otherwise select all available columns.

    omit_columns: Optional[List[str]], default None
        If provided, remove these columns from the selection.

    begin: Union[datetime, int, None], default None
        The earliest `datetime` value to search from (inclusive).

    end: Union[datetime, int, None], default None
        The latest `datetime` value to search from (exclusive).

    params: Optional[Dict[str, str]], default None
        Additional filters to apply to the query.

    Returns
    -------
    The target table's data as a DataFrame, or `None` if the pipe does not exist.
    """
    if not pipe.exists(debug=debug):
        return None

    from meerschaum.utils.dataframe import query_df, parse_df_datetimes
    from meerschaum.utils.dtypes import are_dtypes_equal

    valkey_dtypes = pipe.parameters.get('valkey', {}).get('dtypes', {})
    dt_col = pipe.columns.get('datetime', None)
    table_name = self.quote_table(pipe.target)
    indices = [col for col in pipe.columns.values() if col]

    # The table holds index documents; their `ix` values identify the real documents.
    ix_docs = []
    for index_doc in self.read_docs(pipe.target, begin=begin, end=end, debug=debug):
        ix_docs.append(string_to_dict(index_doc.get('ix', '').replace(COLON, ':')))

    docs_strings = []
    try:
        for ix_doc in ix_docs:
            document_key = self.get_document_key(ix_doc, indices, table_name)
            docs_strings.append(self.get(document_key))
    except Exception as e:
        # Any failure discards the partial results rather than returning a subset.
        warn(f"Failed to fetch documents for {pipe}:\n{e}")
        docs_strings = []

    # Skip keys that returned nothing.
    docs = [json.loads(doc_str) for doc_str in docs_strings if doc_str]

    # Only columns typed as datetimes should be parsed as datetimes.
    ignore_dt_cols = [
        col
        for col, dtype in pipe.dtypes.items()
        if not are_dtypes_equal(str(dtype), 'datetime')
    ]

    df = parse_df_datetimes(
        docs,
        ignore_cols=ignore_dt_cols,
        chunksize=kwargs.get('chunksize', None),
        strip_timezone=(pipe.tzinfo is None),
        debug=debug,
    )

    # Re-apply the remembered dtypes on a best-effort basis.
    for col, typ in valkey_dtypes.items():
        try:
            df[col] = df[col].astype(typ)
        except Exception:
            pass

    df = pipe.enforce_dtypes(df, debug=debug)

    query_kwargs = {
        'select_columns': select_columns,
        'omit_columns': omit_columns,
    }
    # An empty frame only needs its columns selected; skip the row filters.
    if len(df) != 0:
        query_kwargs.update(
            params=params,
            begin=begin,
            end=end,
            datetime_column=dt_col,
            inplace=True,
            reset_index=True,
        )

    return query_df(df, **query_kwargs)

Query a pipe's target table and return the DataFrame.

Parameters
  • pipe (mrsm.Pipe): The pipe with the target table from which to read.
  • select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
  • omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
  • begin (Union[datetime, int, None], default None): The earliest datetime value to search from (inclusive).
  • end (Union[datetime, int, None], default None): The latest datetime value to search from (exclusive).
  • params (Optional[Dict[str, str]], default None): Additional filters to apply to the query.
Returns
  • The target table's data as a DataFrame.
def sync_pipe( self, pipe: meerschaum.Pipe, df: 'pd.DataFrame' = None, check_existing: bool = True, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
def sync_pipe(
    self,
    pipe: mrsm.Pipe,
    df: 'pd.DataFrame' = None,
    check_existing: bool = True,
    debug: bool = False,
    **kwargs: Any
) -> mrsm.SuccessTuple:
    """
    Upsert new documents into the pipe's collection.

    Each row is stored as a JSON document under its own key, and an index document
    (`ix` plus the datetime value, if any) is pushed to the pipe's target table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose collection should receive the new documents.

    df: 'pd.DataFrame', default None
        The data to be synced. Dask DataFrames are computed first.
        If `None`, nothing is synced and a failure tuple is returned.

    check_existing: bool, default True
        If `False`, do not check the documents against existing data and instead insert directly.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    # Guard first: the default `df=None` would otherwise fail on `df.__module__` below.
    if df is None:
        return False, f"Cannot sync {pipe}: no data were provided."

    dt_col = pipe.columns.get('datetime', None)
    indices = [col for col in pipe.columns.values() if col]
    table_name = self.quote_table(pipe.target)
    if 'dask' in df.__module__:
        df = df.compute()
    upsert = pipe.parameters.get('upsert', False)
    static = pipe.parameters.get('static', False)

    def _serialize_indices_docs(_docs):
        # Build the index documents which are pushed to the target table.
        return [
            {
                'ix': self.get_document_key(doc, indices),
                **(
                    {
                        dt_col: doc.get(dt_col, 0)
                    }
                    if dt_col
                    else {}
                )
            }
            for doc in _docs
        ]

    valkey_dtypes = pipe.parameters.get('valkey', {}).get('dtypes', {})
    new_dtypes = {
        str(key): str(val)
        for key, val in df.dtypes.items()
        if str(key) not in valkey_dtypes
    }
    # Iterate over a snapshot because failed casts rewrite `valkey_dtypes` entries.
    for col, typ in list(valkey_dtypes.items()):
        if col in df.columns:
            try:
                df[col] = df[col].astype(typ)
            except Exception:
                # Fall back to strings when the remembered dtype no longer fits.
                import traceback
                traceback.print_exc()
                valkey_dtypes[col] = 'string'
                new_dtypes[col] = 'string'
                df[col] = df[col].astype('string')

    # Persist newly seen dtypes unless the pipe is static and already has dtypes.
    if new_dtypes and (not static or not valkey_dtypes):
        valkey_dtypes.update(new_dtypes)
        update_success, update_msg = pipe.update_parameters(
            {'valkey': {'dtypes': valkey_dtypes}},
            debug=debug,
        )
        if not update_success:
            return False, update_msg

    # Without the existence check (or in upsert mode) every row is treated as an update.
    unseen_df, update_df, delta_df = (
        pipe.filter_existing(df, include_unchanged_columns=True, debug=debug)
        if check_existing and not upsert
        else (None, df, df)
    )
    num_insert = len(unseen_df) if unseen_df is not None else 0
    num_update = len(update_df) if update_df is not None else 0
    msg = (
        f"Inserted {num_insert}, updated {num_update} rows."
        if not upsert
        else f"Upserted {num_update} rows."
    )
    if len(delta_df) == 0:
        return True, msg

    # Write the unseen documents, then push their index documents.
    unseen_docs = unseen_df.to_dict(orient='records') if unseen_df is not None else []
    unseen_indices_docs = _serialize_indices_docs(unseen_docs)
    unseen_ix_vals = {
        self.get_document_key(doc, indices, table_name): serialize_document(doc)
        for doc in unseen_docs
    }
    for key, val in unseen_ix_vals.items():
        try:
            self.set(key, val)
        except Exception as e:
            return False, f"Failed to set keys for {pipe}:\n{e}"

    try:
        self.push_docs(
            unseen_indices_docs,
            pipe.target,
            datetime_column=dt_col,
            debug=debug,
        )
    except Exception as e:
        return False, f"Failed to push docs to '{pipe.target}':\n{e}"

    # Split the update rows into those with and without an existing stored document.
    update_docs = update_df.to_dict(orient='records') if update_df is not None else []
    update_ix_docs = {
        self.get_document_key(doc, indices, table_name): doc
        for doc in update_docs
    }
    existing_docs_data = {
        key: self.get(key)
        for key in update_ix_docs
    } if pipe.exists(debug=debug) else {}
    existing_docs = {
        key: json.loads(data)
        for key, data in existing_docs_data.items()
        if data
    }
    new_update_docs = {
        key: doc
        for key, doc in update_ix_docs.items()
        if key not in existing_docs
    }
    new_ix_vals = {
        self.get_document_key(doc, indices, table_name): serialize_document(doc)
        for doc in new_update_docs.values()
    }
    for key, val in new_ix_vals.items():
        try:
            self.set(key, val)
        except Exception as e:
            return False, f"Failed to set keys for {pipe}:\n{e}"

    # Existing documents are patched: incoming values override the stored ones.
    old_update_docs = {
        key: {
            **existing_docs[key],
            **doc
        }
        for key, doc in update_ix_docs.items()
        if key in existing_docs
    }
    new_indices_docs = _serialize_indices_docs(list(new_update_docs.values()))
    try:
        if new_indices_docs:
            self.push_docs(
                new_indices_docs,
                pipe.target,
                datetime_column=dt_col,
                debug=debug,
            )
    except Exception as e:
        return False, f"Failed to upsert '{pipe.target}':\n{e}"

    for key, doc in old_update_docs.items():
        try:
            self.set(key, serialize_document(doc))
        except Exception as e:
            return False, f"Failed to set keys for {pipe}:\n{e}"

    return True, msg

Upsert new documents into the pipe's collection.

Parameters
  • pipe (mrsm.Pipe): The pipe whose collection should receive the new documents.
  • df (Union['pd.DataFrame', Iterator['pd.DataFrame']], default None): The data to be synced.
  • check_existing (bool, default True): If False, do not check the documents against existing data and instead insert directly.
Returns
  • A SuccessTuple indicating success.
def get_pipe_columns_types( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Dict[str, str]:
def get_pipe_columns_types(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> Dict[str, str]:
    """
    Map the pipe's remembered dtypes to database types for data type enforcement.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table contains columns and data types.

    Returns
    -------
    A dictionary mapping columns to data types (using the `postgresql` flavor),
    or an empty dictionary if the pipe does not exist.
    """
    if not pipe.exists(debug=debug):
        return {}

    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type

    stored_dtypes = pipe.parameters.get('valkey', {}).get('dtypes', {})
    columns_types = {}
    for col, typ in stored_dtypes.items():
        columns_types[col] = get_db_type_from_pd_type(typ, flavor='postgresql')
    return columns_types

Return the data types for the columns in the target table for data type enforcement.

Parameters
  • pipe (mrsm.Pipe): The pipe whose target table contains columns and data types.
Returns
  • A dictionary mapping columns to data types.
def clear_pipe( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> Tuple[bool, str]:
def clear_pipe(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
) -> mrsm.SuccessTuple:
    """
    Delete rows within `begin`, `end`, and `params`.

    Both the per-document keys and the matching index members of the
    pipe's target table are removed.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose rows to clear.

    begin: Union[datetime, int, None], default None
        If provided, remove rows >= `begin`.

    end: Union[datetime, int, None], default None
        If provided, remove rows < `end`.

    params: Optional[Dict[str, Any]], default None
        If provided, only remove rows which match the `params` filter.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    dt_col = pipe.columns.get('datetime', None)

    existing_df = pipe.get_data(
        begin=begin,
        end=end,
        params=params,
        debug=debug,
    )
    if existing_df is None or len(existing_df) == 0:
        return True, "Deleted 0 rows."

    docs = existing_df.to_dict(orient='records')
    table_name = self.quote_table(pipe.target)
    indices = [col for col in pipe.columns.values() if col]

    # Index keys (as stored under `ix`) and the keys holding the documents themselves.
    ix_keys = {self.get_document_key(doc, indices) for doc in docs}
    table_doc_keys = [self.get_document_key(doc, indices, table_name) for doc in docs]

    def _get_member_ix(member):
        # Index members are JSON documents; the index key lives under `ix`.
        member_str = member.decode('utf-8') if isinstance(member, bytes) else member
        member_doc = json.loads(member_str)
        return member_doc.get('ix') if isinstance(member_doc, dict) else None

    try:
        # The table's members are whole JSON index documents rather than bare index
        # keys, so find the raw members whose `ix` matches and remove those exactly.
        members = (
            self.client.zrange(table_name, 0, -1)
            if dt_col
            else self.client.smembers(table_name)
        )
        stale_members = [
            member
            for member in members
            if _get_member_ix(member) in ix_keys
        ]
        remove_members = self.client.zrem if dt_col else self.client.srem
        batch_size = 1000
        for start in range(0, len(stale_members), batch_size):
            remove_members(table_name, *stale_members[start:start + batch_size])

        for table_doc_key in table_doc_keys:
            self.client.delete(table_doc_key)
    except Exception as e:
        return False, f"Failed to delete documents:\n{e}"

    msg = (
        f"Deleted {len(docs)} row"
        + ('s' if len(docs) != 1 else '')
        + '.'
    )
    return True, msg

Delete rows within begin, end, and params.

Parameters
  • pipe (mrsm.Pipe): The pipe whose rows to clear.
  • begin (Union[datetime, int, None], default None): If provided, remove rows >= begin.
  • end (Union[datetime, int, None], default None): If provided, remove rows < end.
  • params (Optional[Dict[str, Any]], default None): If provided, only remove rows which match the params filter.
Returns
  • A SuccessTuple indicating success.
def get_sync_time( self, pipe: meerschaum.Pipe, newest: bool = True, **kwargs: Any) -> Union[datetime.datetime, int, NoneType]:
def get_sync_time(
    self,
    pipe: mrsm.Pipe,
    newest: bool = True,
    **kwargs: Any
) -> Union[datetime, int, None]:
    """
    Return the newest (or oldest) timestamp in a pipe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose sync time to fetch.

    newest: bool, default True
        If `True`, read the member at the high end of the sorted set,
        otherwise the member at the low end.

    Returns
    -------
    The datetime column's value from that member (an `int` for integer datetime
    columns, otherwise a parsed `datetime`), or `None` if it is unavailable.
    """
    from meerschaum.utils.dtypes import are_dtypes_equal
    dt_col = pipe.columns.get('datetime', None)
    dt_typ = pipe.dtypes.get(dt_col, 'datetime')
    if not dt_col:
        return None

    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    table_name = self.quote_table(pipe.target)
    try:
        fetch_edge = self.client.zrevrange if newest else self.client.zrange
        vals = fetch_edge(table_name, 0, 0, withscores=True)
        if not vals:
            return None
        # Each result is a (member, score) pair; only the member is needed.
        val = vals[0][0]
        if isinstance(val, bytes):
            val = val.decode('utf-8')
    except Exception:
        import traceback
        traceback.print_exc()
        return None

    dt_val = json.loads(val).get(dt_col, None)
    if dt_val is None:
        return None

    try:
        if are_dtypes_equal(dt_typ, 'int'):
            return int(dt_val)
        return dateutil_parser.parse(str(dt_val))
    except Exception as e:
        warn(f"Failed to parse sync time for {pipe}:\n{e}")

    return None

Return the newest (or oldest) timestamp in a pipe.

def get_pipe_rowcount( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kwargs: Any) -> Optional[int]:
def get_pipe_rowcount(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kwargs: Any
) -> Union[int, None]:
    """
    Count the documents stored for a pipe.

    When no `begin`, `end`, or `params` are given, the cardinality of the
    pipe's key is read directly (`zcard` if the pipe has a `datetime` column,
    otherwise `scard`). With any filter, the matching rows are loaded through
    `pipe.get_data()` and counted.

    Returns
    -------
    The number of documents, `0` if the pipe does not exist or no data are
    returned, or `None` if reading the cardinality fails.
    """
    dt_col = pipe.columns.get('datetime', None)
    table_name = self.quote_table(pipe.target)

    if not pipe.exists():
        return 0

    try:
        unfiltered = begin is None and end is None and not params
        if unfiltered:
            if dt_col:
                return self.client.zcard(table_name)
            return self.client.scard(table_name)
    except Exception as e:
        if debug:
            dprint(f"Failed to get rowcount for {pipe}:\n{e}")
        return None

    df = pipe.get_data(begin=begin, end=end, params=params, debug=debug)
    return 0 if df is None else len(df)

Return the number of documents in the pipe's set.

def fetch_pipes_keys( self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> List[Tuple[str, str, Optional[str], Dict[str, Any]]]:
def fetch_pipes_keys(
    self,
    connector_keys: Optional[List[str]] = None,
    metric_keys: Optional[List[str]] = None,
    location_keys: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False
) -> List[
        Tuple[str, str, Union[str, None], Dict[str, Any]]
    ]:
    """
    Return the keys for the registered pipes.

    Parameters
    ----------
    connector_keys, metric_keys, location_keys: Optional[List[str]], default None
        If provided, only return pipes whose corresponding key is in the list.

    tags: Optional[List[str]], default None
        If provided, only return pipes matching every tag group.
        Each item is split on commas into a group; within a group,
        a pipe must carry all included tags and none of the excluded tags
        (as separated by `separate_negation_values`).

    params: Optional[Dict[str, Any]], default None
        Additional filters merged into the query passed to `query_df`.

    Returns
    -------
    A list of `(connector_keys, metric_key, location_key, parameters)` tuples.
    An empty list is returned if the pipes table cannot be read or is empty.
    """
    from meerschaum.utils.dataframe import query_df
    from meerschaum.utils.misc import separate_negation_values
    try:
        df = self.read(PIPES_TABLE, debug=debug)
    except Exception:
        return []

    if df is None or len(df) == 0:
        return []

    query = {}
    if connector_keys:
        query['connector_keys'] = [str(k) for k in connector_keys]
    if metric_keys:
        query['metric_key'] = [str(k) for k in metric_keys]
    if location_keys:
        query['location_key'] = [str(k) for k in location_keys]
    if params:
        query.update(params)

    df = query_df(df, query, inplace=True)

    keys = [
        (
            doc['connector_keys'],
            doc['metric_key'],
            doc['location_key'],
            doc.get('parameters', {})
        )
        for doc in df.to_dict(orient='records')
    ]
    if not tags:
        return keys

    tag_groups = [tag.split(',') for tag in tags]
    in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups]

    filtered_keys = []
    for ck, mk, lk, parameters in keys:
        # A record may carry no parameters (or no tags); treat both as "untagged"
        # rather than failing on a missing `.get()` or on `set(None)`.
        raw_tags = parameters.get('tags', None) if isinstance(parameters, dict) else None
        pipe_tags = set(raw_tags or [])

        # `all()` stops at the first group which rejects the pipe.
        include_pipe = all(
            all(tag in pipe_tags for tag in in_tags)
            and not any(tag in pipe_tags for tag in ex_tags)
            for in_tags, ex_tags in in_ex_tag_groups
        )
        if include_pipe:
            filtered_keys.append((ck, mk, lk, parameters))

    return filtered_keys

Return the keys for the registered pipes.

@staticmethod
def get_document_key( doc: Dict[str, Any], indices: List[str], table_name: Optional[str] = None) -> str:
 59@staticmethod
 60def get_document_key(
 61    doc: Dict[str, Any],
 62    indices: List[str],
 63    table_name: Optional[str] = None,
 64) -> str:
 65    """
 66    Return a serialized string for a document's indices only.
 67
 68    Parameters
 69    ----------
 70    doc: Dict[str, Any]
 71        The document containing index values to be serialized.
 72
 73    indices: List[str]
 74        The name of the indices to be serialized.
 75
 76    table_name: Optional[str], default None
 77        If provided, prepend the table to the key.
 78
 79    Returns
 80    -------
 81    A serialized string of the document's indices.
 82    """
 83    from meerschaum.utils.dtypes import coerce_timezone
 84    index_vals = {
 85        key: (
 86            str(val).replace(':', COLON)
 87            if not isinstance(val, datetime)
 88            else str(int(coerce_timezone(val).replace(tzinfo=timezone.utc).timestamp()))
 89        )
 90        for key, val in doc.items()
 91        if ((key in indices) if indices else True)
 92    }
 93    indices_str = (
 94        (
 95            (
 96                (
 97                    table_name
 98                    + ':'
 99                    + ('indices:' if True else '')
100                )
101            )
102            if table_name
103            else ''
104        ) + ','.join(
105            sorted(
106                [
107                    f'{key}{COLON}{val}'
108                    for key, val in index_vals.items()
109                ]
110            )
111        )
112    )
113    return indices_str

Return a serialized string for a document's indices only.

Parameters
  • doc (Dict[str, Any]): The document containing index values to be serialized.
  • indices (List[str]): The name of the indices to be serialized.
  • table_name (Optional[str], default None): If provided, prepend the table to the key.
Returns
  • A serialized string of the document's indices.
@classmethod
def get_table_quoted_doc_key( cls, table_name: str, doc: Dict[str, Any], indices: List[str], datetime_column: Optional[str] = None) -> str:
@classmethod
def get_table_quoted_doc_key(
    cls,
    table_name: str,
    doc: Dict[str, Any],
    indices: List[str],
    datetime_column: Optional[str] = None,
) -> str:
    """
    Return the document string as stored in the underlying set.

    The result is compact JSON with sorted keys: the document key
    (see `get_document_key()`) maps to the serialized document, and when
    `datetime_column` is given, that column maps to the document's value
    for it (`0` if absent).
    """
    doc_key = cls.get_document_key(doc, indices, table_name)
    payload = {doc_key: serialize_document(doc)}
    if datetime_column:
        payload[datetime_column] = doc.get(datetime_column, 0)

    return json.dumps(
        payload,
        sort_keys=True,
        separators=(',', ':'),
        default=json_serialize_value,
    )

Return the document string as stored in the underlying set.

def fetch( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kwargs: Any) -> List[Dict[str, Any]]:
def fetch(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kwargs: Any
) -> List[Dict[str, Any]]:
    """
    Return data from a source database.

    The source key is read from `pipe.parameters['valkey']['key']`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose parameters name the source key.

    begin: Union[datetime, int, None], default None
        Lower score bound for `zset` keys (`'-inf'` when omitted).

    end: Union[datetime, int, None], default None
        Upper score bound for `zset` keys (`'+inf'` when omitted).

    Returns
    -------
    A list of documents: every member parsed as JSON for `set` keys,
    the members within the score bounds for `zset` keys, and otherwise a
    single document mapping the key to `self.get(key)`.
    An empty list is returned if no key is configured or its type
    cannot be determined.
    """
    source_key = pipe.parameters.get('valkey', {}).get('key', None)
    if not source_key:
        return []

    try:
        key_type = self.client.type(source_key).decode('utf-8')
    except Exception:
        warn(f"Could not determine the type for key '{source_key}'.")
        return []

    def _to_score(bound: Union[datetime, int, None], unbounded: str) -> Union[int, str]:
        """Convert a bound into an integer score, or `unbounded` if it is `None`."""
        if bound is None:
            return unbounded
        if not isinstance(bound, datetime):
            return int(bound)
        # Aware datetimes are converted to UTC so their instant is preserved;
        # naive datetimes are interpreted as already being in UTC.
        bound_utc = (
            bound.astimezone(timezone.utc)
            if bound.utcoffset() is not None
            else bound.replace(tzinfo=timezone.utc)
        )
        return int(bound_utc.timestamp())

    begin_ts = _to_score(begin, '-inf')
    end_ts = _to_score(end, '+inf')

    if debug:
        dprint(f"Reading documents with {begin_ts=}, {end_ts=}")

    if key_type == 'set':
        return [
            json.loads(doc_bytes.decode('utf-8'))
            for doc_bytes in self.client.smembers(source_key)
        ]

    if key_type == 'zset':
        return [
            json.loads(doc_bytes.decode('utf-8'))
            for doc_bytes in self.client.zrangebyscore(
                source_key,
                begin_ts,
                end_ts,
                withscores=False,
            )
        ]

    return [{source_key: self.get(source_key)}]

Return data from a source database.

def get_users_pipe(self):
def get_users_pipe(self):
    """
    Build the temporary pipe (`mrsm`, `users`) which holds the registered users.

    The pipe is indexed on `user_id`, targets `USERS_TABLE`,
    and uses this connector as its instance.
    """
    pipe_options = {
        'columns': ['user_id'],
        'temporary': True,
        'target': USERS_TABLE,
        'instance': self,
    }
    return mrsm.Pipe('mrsm', 'users', **pipe_options)

Return the pipe which stores the registered users.

@classmethod
def get_user_key( cls, user_id_or_username: str, sub_key: str, by_username: bool = False) -> str:
@classmethod
def get_user_key(cls, user_id_or_username: str, sub_key: str, by_username: bool = False) -> str:
    """
    Build the key under which a piece of user metadata is stored.

    Parameters
    ----------
    user_id_or_username: str
        The user's ID, or the username when `by_username` is `True`.

    sub_key: str
        The key suffix, e.g. `'attributes'`.

    by_username: bool, default False
        If `True`, then treat `user_id_or_username` as a username.

    Returns
    -------
    A key to store information about a user.

    Examples
    --------
    >>> get_user_key('deadbeef', 'attributes')
    'mrsm_user:user_id:deadbeef:attributes'
    >>> get_user_key('foo', 'user_id', by_username=True)
    'mrsm_user:username:foo:user_id'
    """
    if by_username:
        key_type = 'username'
    else:
        key_type = 'user_id'
    return cls.get_entity_key(USER_PREFIX, key_type, user_id_or_username, sub_key)

Return the key to store metadata about a user.

Parameters
  • user_id_or_username (str): The user ID or username of the given user. If by_username is True, then provide the username.
  • sub_key (str): The key suffix, e.g. 'attributes'.
  • by_username (bool, default False): If True, then treat user_id_or_username as a username.
Returns
  • A key to store information about a user.
Examples
>>> get_user_key('deadbeef', 'attributes')
'mrsm_user:user_id:deadbeef:attributes'
>>> get_user_key('foo', 'user_id', by_username=True)
'mrsm_user:username:foo:user_id'
@classmethod
def get_user_keys_vals( cls, user: meerschaum.core.User._User.User, mutable_only: bool = False) -> Dict[str, str]:
 65@classmethod
 66def get_user_keys_vals(
 67    cls,
 68    user: 'mrsm.core.User',
 69    mutable_only: bool = False,
 70) -> Dict[str, str]:
 71    """
 72    Return a dictionary containing keys and values to set for the user.
 73
 74    Parameters
 75    ----------
 76    user: mrsm.core.User
 77        The user for which to generate the keys.
 78
 79    mutable_only: bool, default False
 80        If `True`, only return keys which may be edited.
 81
 82    Returns
 83    -------
 84    A dictionary mapping a user's keys to values.
 85    """
 86    user_attributes_str = json.dumps(user.attributes, separators=(',', ':'))
 87    mutable_keys_vals = {
 88        cls.get_user_key(user.user_id, 'attributes'): user_attributes_str,
 89        cls.get_user_key(user.user_id, 'email'): user.email,
 90        cls.get_user_key(user.user_id, 'type'): user.type,
 91        cls.get_user_key(user.user_id, 'password_hash'): user.password_hash,
 92    }
 93    if mutable_only:
 94        return mutable_keys_vals
 95
 96    immutable_keys_vals = {
 97        cls.get_user_key(user.user_id, 'username'): user.username,
 98        cls.get_user_key(user.username, 'user_id', by_username=True): user.user_id,
 99    }
100
101    return {**immutable_keys_vals, **mutable_keys_vals}

Return a dictionary containing keys and values to set for the user.

Parameters
  • user (mrsm.core.User): The user for which to generate the keys.
  • mutable_only (bool, default False): If True, only return keys which may be edited.
Returns
  • A dictionary mapping a user's keys to values.
def register_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
def register_user(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kwargs: Any
) -> SuccessTuple:
    """
    Register a new user.

    A fresh `user_id` is generated and assigned to `user`, a row is synced
    to the users pipe, and each non-`None` user key is written.
    If an exception is raised along the way, the user's keys are deleted
    on a best-effort basis.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    from meerschaum.utils.misc import generate_password

    user.user_id = generate_password(12)
    users_pipe = self.get_users_pipe()
    keys_vals = self.get_user_keys_vals(user)

    try:
        user_doc = {
            'user_id': user.user_id,
            'username': user.username,
        }
        sync_success, sync_msg = users_pipe.sync(
            [user_doc],
            check_existing=False,
            debug=debug,
        )
        if not sync_success:
            return sync_success, sync_msg

        for key, val in keys_vals.items():
            if val is None:
                continue
            self.set(key, val)
    except Exception as e:
        import traceback
        traceback.print_exc()
        failure_msg = f"Failed to register '{user.username}':\n{e}"

        # Roll back whatever keys were written; failures here are ignored.
        for key in keys_vals:
            try:
                self.client.delete(key)
            except Exception:
                pass

        return False, failure_msg

    return True, "Success"

Register a new user.

def get_user_id( self, user: meerschaum.core.User._User.User, debug: bool = False) -> Optional[str]:
def get_user_id(self, user: 'mrsm.core.User', debug: bool = False) -> Union[str, None]:
    """
    Look up a user's ID by username.

    Returns the value stored under the username's `user_id` key,
    or `None` if reading it raises.
    """
    lookup_key = self.get_user_key(user.username, 'user_id', by_username=True)
    try:
        return self.get(lookup_key)
    except Exception:
        return None

Return the ID for a user, or None.

def edit_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
def edit_user(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Overwrite the editable keys of an existing user.

    The current values are read first so that they may be restored
    (best-effort) if writing the new values fails.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    new_keys_vals = self.get_user_keys_vals(user, mutable_only=True)

    # Snapshot the existing values for a potential rollback.
    previous_keys_vals = {}
    try:
        for key in new_keys_vals:
            previous_keys_vals[key] = self.get(key)
    except Exception as e:
        return False, f"Failed to edit user:\n{e}"

    try:
        for key, val in new_keys_vals.items():
            self.set(key, val)
    except Exception as e:
        failure_msg = f"Failed to edit user:\n{e}"
    else:
        return True, "Success"

    # Restore the snapshot; stop silently at the first failure.
    try:
        for key, previous_val in previous_keys_vals.items():
            self.set(key, previous_val)
    except Exception:
        pass

    return False, failure_msg

Edit the attributes for an existing user.

def get_user_attributes( self, user: meerschaum.core.User._User.User, debug: bool = False) -> Optional[Dict[str, Any]]:
def get_user_attributes(
    self,
    user: 'mrsm.core.User',
    debug: bool = False
) -> Union[Dict[str, Any], None]:
    """
    Read and parse the JSON attributes stored for a user.

    The user's ID is looked up by username when `user.user_id` is `None`.
    Returns `None` if the attributes cannot be read or parsed.
    """
    user_id = user.user_id
    if user_id is None:
        user_id = self.get_user_id(user, debug=debug)

    attributes_key = self.get_user_key(user_id, 'attributes')
    try:
        attributes_str = self.get(attributes_key)
        return json.loads(attributes_str)
    except Exception:
        return None

Return the user's attributes.

def delete_user( self, user: meerschaum.core.User._User.User, debug: bool = False) -> Tuple[bool, str]:
def delete_user(
    self,
    user: 'mrsm.core.User',
    debug: bool = False
) -> SuccessTuple:
    """
    Remove a user's row from the users pipe and delete the user's keys.

    The existing key values are read up front so that they may be restored
    (best-effort) if deleting the keys fails.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    user_id = user.user_id
    if user_id is None:
        user_id = self.get_user_id(user, debug=debug)

    users_pipe = self.get_users_pipe()
    keys_vals = self.get_user_keys_vals(user)

    # Snapshot the existing values for a potential rollback.
    previous_keys_vals = {}
    try:
        for key in keys_vals:
            previous_keys_vals[key] = self.get(key)
    except Exception as e:
        return False, f"Failed to delete user:\n{e}"

    clear_success, clear_msg = users_pipe.clear(params={'user_id': user_id})
    if not clear_success:
        return clear_success, clear_msg

    try:
        for key in keys_vals:
            self.client.delete(key)
    except Exception as e:
        failure_msg = f"Failed to delete user:\n{e}"
    else:
        return True, "Success"

    # Restore the snapshot; stop silently at the first failure.
    try:
        for key, previous_val in previous_keys_vals.items():
            self.set(key, previous_val)
    except Exception:
        pass

    return False, failure_msg

Delete a user's keys.

def get_users(self, debug: bool = False, **kw: Any) -> List[str]:
def get_users(
    self,
    debug: bool = False,
    **kw: Any
) -> List[str]:
    """
    List the registered usernames.

    Reads the `username` column of the users pipe's data;
    an empty list is returned when the pipe yields no data (`None`).
    """
    df = self.get_users_pipe().get_data()
    return [] if df is None else list(df['username'])

Get the registered usernames.

def get_user_password_hash( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
def get_user_password_hash(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kw: Any
) -> Union[str, None]:
    """
    Return the password hash for a user.

    The user's ID is looked up by username when `user.user_id` is `None`.
    Returns `None` if reading the key raises.
    """
    user_id = user.user_id
    if user_id is None:
        user_id = self.get_user_id(user, debug=debug)

    password_hash_key = self.get_user_key(user_id, 'password_hash')
    try:
        return self.get(password_hash_key)
    except Exception:
        return None

Return the password hash for a user.

def get_user_type( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
def get_user_type(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kw: Any
) -> Union[str, None]:
    """
    Return the type stored for a user.

    The user's ID is looked up by username when `user.user_id` is `None`.
    Returns `None` if reading the key raises.
    """
    user_id = user.user_id
    if user_id is None:
        user_id = self.get_user_id(user, debug=debug)

    type_key = self.get_user_key(user_id, 'type')
    try:
        return self.get(type_key)
    except Exception:
        return None

Return the user's type.

def get_plugins_pipe(self) -> meerschaum.Pipe:
def get_plugins_pipe(self) -> mrsm.Pipe:
    """
    Build the temporary pipe (`mrsm`, `plugins`) which holds the registered plugins.

    The pipe is indexed on `plugin_name`, targets `PLUGINS_TABLE`,
    and uses this connector as its instance.
    """
    pipe_options = {
        'columns': ['plugin_name'],
        'temporary': True,
        'target': PLUGINS_TABLE,
        'instance': self,
    }
    return mrsm.Pipe('mrsm', 'plugins', **pipe_options)

Return the pipe to store the plugins.

@classmethod
def get_plugin_key(cls, plugin_name: str, sub_key: str) -> str:
@classmethod
def get_plugin_key(cls, plugin_name: str, sub_key: str) -> str:
    """
    Build the key under which one attribute (`sub_key`) of a plugin is stored.
    """
    key_parts = (PLUGIN_PREFIX, plugin_name, sub_key)
    return cls.get_entity_key(*key_parts)

Return the key for a plugin's attribute.

@classmethod
def get_plugin_keys_vals( cls, plugin: meerschaum.Plugin, mutable_only: bool = False) -> Dict[str, str]:
@classmethod
def get_plugin_keys_vals(
    cls,
    plugin: 'mrsm.core.Plugin',
    mutable_only: bool = False,
) -> Dict[str, str]:
    """
    Map each of a plugin's storage keys to the value to be written.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin for which to generate the keys.

    mutable_only: bool, default False
        If `True`, only return the editable keys (attributes and version).

    Returns
    -------
    A dictionary mapping a plugin's keys to values.
    The non-editable key is the owner's user ID.
    """
    plugin_name = plugin.name
    # Attributes are stored as compact JSON.
    attributes_json = json.dumps(plugin.attributes, separators=(',', ':'))

    keys_vals = {}
    if not mutable_only:
        keys_vals[cls.get_plugin_key(plugin_name, 'user_id')] = plugin.user_id
    keys_vals[cls.get_plugin_key(plugin_name, 'attributes')] = attributes_json
    keys_vals[cls.get_plugin_key(plugin_name, 'version')] = plugin.version
    return keys_vals

Return a dictionary containing keys and values to set for the plugin.

Parameters
  • plugin (mrsm.core.Plugin): The plugin for which to generate the keys.
  • mutable_only (bool, default False): If True, only return keys which may be edited.
Returns
  • A dictionary mapping a plugin's keys to values.
def register_plugin( self, plugin: meerschaum.Plugin, force: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
 77def register_plugin(
 78    self,
 79    plugin: 'mrsm.core.Plugin',
 80    force: bool = False,
 81    debug: bool = False,
 82    **kw: Any
 83) -> SuccessTuple:
 84    """Register a new plugin to the `mrsm_plugins` "table"."""
 85    from meerschaum.utils.misc import generate_password
 86
 87    plugins_pipe = self.get_plugins_pipe()
 88    keys_vals = self.get_plugin_keys_vals(plugin)
 89
 90    try:
 91        sync_success, sync_msg = plugins_pipe.sync(
 92            [
 93                {
 94                    'plugin_name': plugin.name,
 95                    'user_id': plugin.user_id,
 96                },
 97            ],
 98            check_existing=False,
 99            debug=debug,
100        )
101        if not sync_success:
102            return sync_success, sync_msg
103
104        for key, val in keys_vals.items():
105            if val is not None:
106                self.set(key, val)
107
108        success, msg = True, "Success"
109    except Exception as e:
110        success = False
111        msg = f"Failed to register plugin '{plugin.name}':\n{e}"
112
113    if not success:
114        for key in keys_vals:
115            try:
116                self.client.delete(key)
117            except Exception:
118                pass
119
120    return success, msg

Register a new plugin to the mrsm_plugins "table".

def get_plugin_id( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[str]:
def get_plugin_id(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Union[str, None]:
    """
    Return a plugin's ID, which is its name.

    `None` is returned when no owner user ID is found for the plugin.
    """
    owner_id = self.get_plugin_user_id(plugin, debug=debug)
    if owner_id is None:
        return None
    return plugin.name

Return a plugin's ID.

def get_plugin_version( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[str]:
def get_plugin_version(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False,
) -> Union[str, None]:
    """
    Return the version stored for a plugin, or `None` if reading it raises.
    """
    try:
        return self.get(self.get_plugin_key(plugin.name, 'version'))
    except Exception:
        return None

Return a plugin's version.

def get_plugin_user_id( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[str]:
def get_plugin_user_id(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Union[str, None]:
    """
    Return the user ID stored for a plugin, or `None` if reading it raises.
    """
    try:
        return self.get(self.get_plugin_key(plugin.name, 'user_id'))
    except Exception:
        return None

Return a plugin's user ID.

def get_plugin_username( self, plugin: meerschaum.Plugin, debug: bool = False) -> str:
def get_plugin_username(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Union[str, None]:
    """
    Return the username of a plugin's owner.

    The plugin's owner user ID is read first, then the username stored
    for that user ID.

    Returns
    -------
    The owner's username, or `None` if the plugin has no stored user ID
    or reading the username raises.
    """
    user_id = self.get_plugin_user_id(plugin, debug=debug)
    if user_id is None:
        return None

    username_key = self.get_user_key(user_id, 'username')
    try:
        return self.get(username_key)
    except Exception:
        return None

Return the username of a plugin's owner.

def get_plugin_attributes( self, plugin: meerschaum.Plugin, debug: bool = False) -> Dict[str, Any]:
def get_plugin_attributes(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Dict[str, Any]:
    """
    Read and parse the JSON attributes stored for a plugin.

    An empty dictionary is returned when nothing is stored
    or when reading or parsing raises.
    """
    attributes_key = self.get_plugin_key(plugin.name, 'attributes')
    try:
        raw_attributes = self.get(attributes_key)
        if raw_attributes:
            return json.loads(raw_attributes)
    except Exception:
        pass
    return {}

Return the attributes of a plugin.

def delete_plugin( self, plugin: meerschaum.Plugin, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
def delete_plugin(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Remove a plugin's row from the plugins pipe and delete the plugin's keys.

    The row is cleared first. The existing key values are then read so that
    they may be restored (best-effort) if deleting the keys fails.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    clear_success, clear_msg = self.get_plugins_pipe().clear(
        params={'plugin_name': plugin.name},
        debug=debug,
    )
    if not clear_success:
        return clear_success, clear_msg

    keys_vals = self.get_plugin_keys_vals(plugin)

    # Snapshot the existing values for a potential rollback.
    previous_keys_vals = {}
    try:
        for key in keys_vals:
            previous_keys_vals[key] = self.get(key)
    except Exception as e:
        return False, f"Failed to delete plugin '{plugin.name}':\n{e}"

    try:
        for key in keys_vals:
            self.client.delete(key)
    except Exception as e:
        failure_msg = f"Failed to delete plugin '{plugin.name}':\n{e}"
    else:
        return True, "Success"

    # Restore the snapshot; stop silently at the first failure.
    try:
        for key, previous_val in previous_keys_vals.items():
            self.set(key, previous_val)
    except Exception:
        pass

    return False, failure_msg

Delete a plugin from the plugins table.