meerschaum.connectors.valkey

Import the ValkeyConnector.

 1#! /usr/bin/env python3
 2# vim:fenc=utf-8
 3
 4"""
 5Import the `ValkeyConnector`.
 6"""
 7
 8from meerschaum.connectors.valkey._ValkeyConnector import ValkeyConnector
 9
10__all__ = ('ValkeyConnector',)
@make_connector
class ValkeyConnector(meerschaum.connectors._Connector.Connector):
 19@make_connector
 20class ValkeyConnector(Connector):
 21    """
 22    Manage a Valkey instance.
 23
 24    Build a `ValkeyConnector` from connection attributes or a URI string.
 25    """
 26    IS_INSTANCE: bool = True
 27    REQUIRED_ATTRIBUTES: List[str] = ['host']
 28    OPTIONAL_ATTRIBUTES: List[str] = [
 29        'port', 'username', 'password', 'db', 'socket_timeout',
 30    ]
 31    DEFAULT_ATTRIBUTES: Dict[str, Any] = {
 32        'username': 'default',
 33        'port': 6379,
 34        'db': 0,
 35        'socket_timeout': 300,
 36    }
 37    KEY_SEPARATOR: str = ':'
 38
 39    from ._pipes import (
 40        register_pipe,
 41        get_pipe_id,
 42        get_pipe_attributes,
 43        edit_pipe,
 44        pipe_exists,
 45        drop_pipe,
 46        delete_pipe,
 47        get_pipe_data,
 48        sync_pipe,
 49        get_pipe_columns_types,
 50        clear_pipe,
 51        get_sync_time,
 52        get_pipe_rowcount,
 53        fetch_pipes_keys,
 54        get_document_key,
 55        get_table_quoted_doc_key,
 56    )
 57    from ._fetch import (
 58        fetch,
 59    )
 60
 61    from ._users import (
 62        get_users_pipe,
 63        get_user_key,
 64        get_user_keys_vals,
 65        register_user,
 66        get_user_id,
 67        edit_user,
 68        get_user_attributes,
 69        delete_user,
 70        get_users,
 71        get_user_password_hash,
 72        get_user_type,
 73    )
 74    from ._plugins import (
 75        get_plugins_pipe,
 76        get_plugin_key,
 77        get_plugin_keys_vals,
 78        register_plugin,
 79        get_plugin_id,
 80        get_plugin_version,
 81        get_plugin_user_id,
 82        get_plugin_username,
 83        get_plugin_attributes,
 84        get_plugins,
 85        delete_plugin,
 86    )
 87
 88    @property
 89    def client(self):
 90        """
 91        Return the Valkey client.
 92        """
 93        if '_client' in self.__dict__:
 94            return self.__dict__['_client']
 95
 96        valkey = mrsm.attempt_import('valkey')
 97
 98        if 'uri' in self.__dict__:
 99            self._client = valkey.Valkey.from_url(self.__dict__.get('uri'))
100            return self._client
101
102        optional_kwargs = {
103            key: self.__dict__.get(key)
104            for key in self.OPTIONAL_ATTRIBUTES
105            if key in self.__dict__
106        }
107        connection_kwargs = {
108            'host': self.host,
109            **optional_kwargs
110        }
111
112        self._client = valkey.Valkey(**connection_kwargs)
113        return self._client
114
115    @property
116    def URI(self) -> str:
117        """
118        Return the connection URI for this connector.
119        """
120        import urllib.parse
121
122        if 'uri' in self.__dict__:
123            return self.__dict__.get('uri')
124
125        uri = "valkey://"
126        if 'username' in self.__dict__:
127            uri += urllib.parse.quote_plus(self.username) + ':'
128
129        if 'password' in self.__dict__:
130            uri += urllib.parse.quote_plus(self.password) + '@'
131
132        if 'host' in self.__dict__:
133            uri += self.host
134
135        if 'port' in self.__dict__:
136            uri += f':{self.port}'
137
138        if 'db' in self.__dict__:
139            uri += f"/{self.db}"
140
141        if 'socket_timeout' in self.__dict__:
142            uri += f"?timeout={self.socket_timeout}s"
143
144        return uri
145
146    def set(self, key: str, value: Any, **kwargs: Any) -> None:
147        """
148        Set the `key` to `value`.
149        """
150        return self.client.set(key, value, **kwargs)
151
152    def get(self, key: str) -> Union[str, None]:
153        """
154        Get the value for `key`.
155        """
156        val = self.client.get(key)
157        if val is None:
158            return None
159
160        return val.decode('utf-8')
161
162    def test_connection(self) -> bool:
163        """
164        Return whether a connection may be established.
165        """
166        return self.client.ping()
167
168    @classmethod
169    def quote_table(cls, table: str) -> str:
170        """
171        Return a quoted key.
172        """
173        return shlex.quote(table)
174
175    @classmethod
176    def get_counter_key(cls, table: str) -> str:
177        """
178        Return the counter key for a given table.
179        """
180        table_name = cls.quote_table(table)
181        return f"{table_name}:counter"
182
183    def push_df(
184        self,
185        df: 'pd.DataFrame',
186        table: str,
187        datetime_column: Optional[str] = None,
188        debug: bool = False,
189    ) -> int:
190        """
191        Append a pandas DataFrame to a table.
192
193        Parameters
194        ----------
195        df: pd.DataFrame
196            The pandas DataFrame to append to the table.
197
198        table: str
199            The "table" name (root key).
200
201        datetime_column: Optional[str], default None
202            If provided, use this key as the datetime index.
203
204        Returns
205        -------
206        The current index counter value (how many docs have been pushed).
207        """
208        from meerschaum.utils.dataframe import to_json
209        docs_str = to_json(df)
210        docs = json.loads(docs_str)
211        return self.push_docs(
212            docs,
213            table,
214            datetime_column=datetime_column,
215            debug=debug,
216        )
217
218    def push_docs(
219        self,
220        docs: List[Dict[str, Any]],
221        table: str,
222        datetime_column: Optional[str] = None,
223        debug: bool = False,
224    ) -> int:
225        """
226        Append a list of documents to a table.
227
228        Parameters
229        ----------
230        docs: List[Dict[str, Any]]
231            The docs to be pushed.
232            All keys and values will be coerced into strings.
233
234        table: str
235            The "table" name (root key).
236
237        datetime_column: Optional[str], default None
238            If set, create a sorted set with this datetime column as the index.
239            Otherwise push the docs to a list.
240
241        Returns
242        -------
243        The current index counter value (how many docs have been pushed).
244        """
245        from meerschaum.utils.dtypes import json_serialize_value
246        table_name = self.quote_table(table)
247        datetime_column_key = self.get_datetime_column_key(table)
248        remote_datetime_column = self.get(datetime_column_key)
249        datetime_column = datetime_column or remote_datetime_column
250        dateutil_parser = mrsm.attempt_import('dateutil.parser')
251
252        old_len = (
253            self.client.zcard(table_name)
254            if datetime_column
255            else self.client.scard(table_name)
256        )
257        for doc in docs:
258            original_dt_val = (
259                doc[datetime_column]
260                if datetime_column and datetime_column in doc
261                else 0
262            )
263            dt_val = (
264                dateutil_parser.parse(str(original_dt_val))
265                if not isinstance(original_dt_val, int)
266                else int(original_dt_val)
267            ) if datetime_column else None
268            ts = (
269                int(dt_val.replace(tzinfo=timezone.utc).timestamp())
270                if isinstance(dt_val, datetime)
271                else int(dt_val)
272            ) if datetime_column else None
273            doc_str = json.dumps(
274                doc,
275                default=json_serialize_value,
276                separators=(',', ':'),
277                sort_keys=True,
278            )
279            if datetime_column:
280                self.client.zadd(table_name, {doc_str: ts})
281            else:
282                self.client.sadd(table_name, doc_str)
283
284        if datetime_column:
285            self.set(datetime_column_key, datetime_column)
286        new_len = (
287            self.client.zcard(table_name)
288            if datetime_column
289            else self.client.scard(table_name)
290        )
291
292        return new_len - old_len
293
294    def _push_hash_docs_to_list(self, docs: List[Dict[str, Any]], table: str) -> int:
295        table_name = self.quote_table(table)
296        next_ix = max(self.client.llen(table_name) or 0, 1)
297        for i, doc in enumerate(docs):
298            doc_key = f"{table_name}:{next_ix + i}"
299            self.client.hset(
300                doc_key,
301                mapping={
302                    str(k): str(v)
303                    for k, v in doc.items()
304                },
305            )
306            self.client.rpush(table_name, doc_key)
307
308        return next_ix + len(docs)
309
310    def get_datetime_column_key(self, table: str) -> str:
311        """
312        Return the key to store the datetime index for `table`.
313        """
314        table_name = self.quote_table(table)
315        return f'{table_name}:datetime_column'
316
317    def read(
318        self,
319        table: str,
320        begin: Union[datetime, int, str, None] = None,
321        end: Union[datetime, int, str, None] = None,
322        params: Optional[Dict[str, Any]] = None,
323        datetime_column: Optional[str] = None,
324        select_columns: Optional[List[str]] = None,
325        omit_columns: Optional[List[str]] = None,
326        debug: bool = False
327    ) -> Union['pd.DataFrame', None]:
328        """
329        Query the table and return the result dataframe.
330
331        Parameters
332        ----------
333        table: str
334            The "table" name to be queried.
335
336        begin: Union[datetime, int, str, None], default None
337            If provided, only return rows greater than or equal to this datetime.
338
339        end: Union[datetime, int, str, None], default None
340            If provided, only return rows older than this datetime.
341
342        params: Optional[Dict[str, Any]]
343            Additional Meerschaum filter parameters.
344
345        datetime_column: Optional[str], default None
346            If provided, use this column for the datetime index.
347            Otherwise infer from the table metadata.
348
349        select_columns: Optional[List[str]], default None
350            If provided, only return these columns.
351
352        omit_columns: Optional[List[str]], default None
353            If provided, do not include these columns in the result.
354
355        Returns
356        -------
357        A Pandas DataFrame of the result, or `None`.
358        """
359        from meerschaum.utils.dataframe import parse_df_datetimes, query_df
360        docs = self.read_docs(
361            table,
362            begin=begin,
363            end=end,
364            debug=debug,
365        )
366        df = parse_df_datetimes(docs)
367        datetime_column_key = self.get_datetime_column_key(table)
368        datetime_column = datetime_column or self.get(datetime_column_key)
369
370        return query_df(
371            df,
372            begin=(begin if datetime_column is not None else None),
373            end=(end if datetime_column is not None else None),
374            params=params,
375            datetime_column=datetime_column,
376            select_columns=select_columns,
377            omit_columns=omit_columns,
378            inplace=True,
379            reset_index=True,
380            debug=debug,
381        )
382
383    def read_docs(
384        self,
385        table: str,
386        begin: Union[datetime, int, str, None] = None,
387        end: Union[datetime, int, str, None] = None,
388        debug: bool = False,
389    ) -> List[Dict[str, str]]:
390        """
391        Return a list of previously pushed docs.
392
393        Parameters
394        ----------
395        table: str
396            The "table" name (root key) under which the docs were pushed.
397
398        begin: Union[datetime, int, str, None], default None
399            If provided and the table was created with a datetime index, only return documents
400            newer than this datetime.
401            If the table was not created with a datetime index and `begin` is an `int`,
402            return documents with a positional index greater than or equal to this value.
403
404        end: Union[datetime, int, str, None], default None
405            If provided and the table was created with a datetime index, only return documents
406            older than this datetime.
407            If the table was not created with a datetime index and `begin` is an `int`,
408            return documents with a positional index less than this value.
409
410        Returns
411        -------
412        A list of dictionaries, where all keys and values are strings.
413        """
414        from meerschaum.utils.dtypes import coerce_timezone
415        table_name = self.quote_table(table)
416        datetime_column_key = self.get_datetime_column_key(table)
417        datetime_column = self.get(datetime_column_key)
418
419        if debug:
420            dprint(f"Reading documents from '{table}' with {begin=}, {end=}")
421
422        if not datetime_column:
423            return [
424                json.loads(doc_bytes.decode('utf-8'))
425                for doc_bytes in self.client.smembers(table_name)
426            ]
427
428        dateutil_parser = mrsm.attempt_import('dateutil.parser')
429
430        if isinstance(begin, str):
431            begin = coerce_timezone(dateutil_parser.parse(begin))
432
433        if isinstance(end, str):
434            end = coerce_timezone(dateutil_parser.parse(end))
435
436        begin_ts = (
437            (
438                int(begin.replace(tzinfo=timezone.utc).timestamp())
439                if isinstance(begin, datetime)
440                else int(begin)
441            )
442            if begin is not None else '-inf'
443        )
444        end_ts = (
445            (
446                int(end.replace(tzinfo=timezone.utc).timestamp())
447                if isinstance(end, datetime)
448                else int(end)
449            )
450            if end is not None else '+inf'
451        )
452
453        if debug:
454            dprint(f"Reading documents with {begin_ts=}, {end_ts=}")
455
456        return [
457            json.loads(doc_bytes.decode('utf-8'))
458            for doc_bytes in self.client.zrangebyscore(
459                table_name,
460                begin_ts,
461                end_ts,
462                withscores=False,
463            )
464        ]
465
466    def _read_docs_from_list(
467        self,
468        table: str,
469        begin_ix: Optional[int] = 0,
470        end_ix: Optional[int] = -1,
471        debug: bool = False,
472    ):
473        """
474        Read a list of documents from a "table".
475
476        Parameters
477        ----------
478        table: str
479            The "table" (root key) from which to read docs.
480
481        begin_ix: Optional[int], default 0
482            If provided, only read documents from this starting index.
483
484        end_ix: Optional[int], default -1
485            If provided, only read documents up to (not including) this index.
486
487        Returns
488        -------
489        A list of documents.
490        """
491        if begin_ix is None:
492            begin_ix = 0
493
494        if end_ix == 0:
495            return
496
497        if end_ix is None:
498            end_ix = -1
499        else:
500            end_ix -= 1
501
502        table_name = self.quote_table(table)
503        doc_keys = self.client.lrange(table_name, begin_ix, end_ix)
504        for doc_key in doc_keys:
505            yield {
506                key.decode('utf-8'): value.decode('utf-8')
507                for key, value in self.client.hgetall(doc_key).items()
508            }
509
510    def drop_table(self, table: str, debug: bool = False) -> None:
511        """
512        Drop a "table" of documents.
513
514        Parameters
515        ----------
516        table: str
517            The "table" name (root key) to be deleted.
518        """
519        table_name = self.quote_table(table)
520        datetime_column_key = self.get_datetime_column_key(table)
521        self.client.delete(table_name)
522        self.client.delete(datetime_column_key)
523
524    @classmethod
525    def get_entity_key(cls, *keys: Any) -> str:
526        """
527        Return a joined key to set an entity.
528        """
529        if not keys:
530            raise ValueError("No keys to be joined.")
531
532        for key in keys:
533            if cls.KEY_SEPARATOR in str(key):
534                raise ValueError(f"Key cannot contain separator '{cls.KEY_SEPARATOR}'.")
535
536        return cls.KEY_SEPARATOR.join([str(key) for key in keys])

Manage a Valkey instance.

Build a ValkeyConnector from connection attributes or a URI string.

IS_INSTANCE: bool = True
REQUIRED_ATTRIBUTES: List[str] = ['host']
OPTIONAL_ATTRIBUTES: List[str] = ['port', 'username', 'password', 'db', 'socket_timeout']
DEFAULT_ATTRIBUTES: Dict[str, Any] = {'username': 'default', 'port': 6379, 'db': 0, 'socket_timeout': 300}
KEY_SEPARATOR: str = ':'
client
 88    @property
 89    def client(self):
 90        """
 91        Return the Valkey client.
 92        """
 93        if '_client' in self.__dict__:
 94            return self.__dict__['_client']
 95
 96        valkey = mrsm.attempt_import('valkey')
 97
 98        if 'uri' in self.__dict__:
 99            self._client = valkey.Valkey.from_url(self.__dict__.get('uri'))
100            return self._client
101
102        optional_kwargs = {
103            key: self.__dict__.get(key)
104            for key in self.OPTIONAL_ATTRIBUTES
105            if key in self.__dict__
106        }
107        connection_kwargs = {
108            'host': self.host,
109            **optional_kwargs
110        }
111
112        self._client = valkey.Valkey(**connection_kwargs)
113        return self._client

Return the Valkey client.

URI: str
115    @property
116    def URI(self) -> str:
117        """
118        Return the connection URI for this connector.
119        """
120        import urllib.parse
121
122        if 'uri' in self.__dict__:
123            return self.__dict__.get('uri')
124
125        uri = "valkey://"
126        if 'username' in self.__dict__:
127            uri += urllib.parse.quote_plus(self.username) + ':'
128
129        if 'password' in self.__dict__:
130            uri += urllib.parse.quote_plus(self.password) + '@'
131
132        if 'host' in self.__dict__:
133            uri += self.host
134
135        if 'port' in self.__dict__:
136            uri += f':{self.port}'
137
138        if 'db' in self.__dict__:
139            uri += f"/{self.db}"
140
141        if 'socket_timeout' in self.__dict__:
142            uri += f"?timeout={self.socket_timeout}s"
143
144        return uri

Return the connection URI for this connector.

def set(self, key: str, value: Any, **kwargs: Any) -> None:
146    def set(self, key: str, value: Any, **kwargs: Any) -> None:
147        """
148        Set the `key` to `value`.
149        """
150        return self.client.set(key, value, **kwargs)

Set the key to value.

def get(self, key: str) -> Optional[str]:
152    def get(self, key: str) -> Union[str, None]:
153        """
154        Get the value for `key`.
155        """
156        val = self.client.get(key)
157        if val is None:
158            return None
159
160        return val.decode('utf-8')

Get the value for key.

def test_connection(self) -> bool:
162    def test_connection(self) -> bool:
163        """
164        Return whether a connection may be established.
165        """
166        return self.client.ping()

Return whether a connection may be established.

@classmethod
def quote_table(cls, table: str) -> str:
168    @classmethod
169    def quote_table(cls, table: str) -> str:
170        """
171        Return a quoted key.
172        """
173        return shlex.quote(table)

Return a quoted key.

@classmethod
def get_counter_key(cls, table: str) -> str:
175    @classmethod
176    def get_counter_key(cls, table: str) -> str:
177        """
178        Return the counter key for a given table.
179        """
180        table_name = cls.quote_table(table)
181        return f"{table_name}:counter"

Return the counter key for a given table.

def push_df( self, df: 'pd.DataFrame', table: str, datetime_column: Optional[str] = None, debug: bool = False) -> int:
183    def push_df(
184        self,
185        df: 'pd.DataFrame',
186        table: str,
187        datetime_column: Optional[str] = None,
188        debug: bool = False,
189    ) -> int:
190        """
191        Append a pandas DataFrame to a table.
192
193        Parameters
194        ----------
195        df: pd.DataFrame
196            The pandas DataFrame to append to the table.
197
198        table: str
199            The "table" name (root key).
200
201        datetime_column: Optional[str], default None
202            If provided, use this key as the datetime index.
203
204        Returns
205        -------
206        The current index counter value (how many docs have been pushed).
207        """
208        from meerschaum.utils.dataframe import to_json
209        docs_str = to_json(df)
210        docs = json.loads(docs_str)
211        return self.push_docs(
212            docs,
213            table,
214            datetime_column=datetime_column,
215            debug=debug,
216        )

Append a pandas DataFrame to a table.

Parameters
  • df (pd.DataFrame): The pandas DataFrame to append to the table.
  • table (str): The "table" name (root key).
  • datetime_column (Optional[str], default None): If provided, use this key as the datetime index.
Returns
  • The current index counter value (how many docs have been pushed).
def push_docs( self, docs: List[Dict[str, Any]], table: str, datetime_column: Optional[str] = None, debug: bool = False) -> int:
218    def push_docs(
219        self,
220        docs: List[Dict[str, Any]],
221        table: str,
222        datetime_column: Optional[str] = None,
223        debug: bool = False,
224    ) -> int:
225        """
226        Append a list of documents to a table.
227
228        Parameters
229        ----------
230        docs: List[Dict[str, Any]]
231            The docs to be pushed.
232            All keys and values will be coerced into strings.
233
234        table: str
235            The "table" name (root key).
236
237        datetime_column: Optional[str], default None
238            If set, create a sorted set with this datetime column as the index.
239            Otherwise push the docs to a list.
240
241        Returns
242        -------
243        The current index counter value (how many docs have been pushed).
244        """
245        from meerschaum.utils.dtypes import json_serialize_value
246        table_name = self.quote_table(table)
247        datetime_column_key = self.get_datetime_column_key(table)
248        remote_datetime_column = self.get(datetime_column_key)
249        datetime_column = datetime_column or remote_datetime_column
250        dateutil_parser = mrsm.attempt_import('dateutil.parser')
251
252        old_len = (
253            self.client.zcard(table_name)
254            if datetime_column
255            else self.client.scard(table_name)
256        )
257        for doc in docs:
258            original_dt_val = (
259                doc[datetime_column]
260                if datetime_column and datetime_column in doc
261                else 0
262            )
263            dt_val = (
264                dateutil_parser.parse(str(original_dt_val))
265                if not isinstance(original_dt_val, int)
266                else int(original_dt_val)
267            ) if datetime_column else None
268            ts = (
269                int(dt_val.replace(tzinfo=timezone.utc).timestamp())
270                if isinstance(dt_val, datetime)
271                else int(dt_val)
272            ) if datetime_column else None
273            doc_str = json.dumps(
274                doc,
275                default=json_serialize_value,
276                separators=(',', ':'),
277                sort_keys=True,
278            )
279            if datetime_column:
280                self.client.zadd(table_name, {doc_str: ts})
281            else:
282                self.client.sadd(table_name, doc_str)
283
284        if datetime_column:
285            self.set(datetime_column_key, datetime_column)
286        new_len = (
287            self.client.zcard(table_name)
288            if datetime_column
289            else self.client.scard(table_name)
290        )
291
292        return new_len - old_len

Append a list of documents to a table.

Parameters
  • docs (List[Dict[str, Any]]): The docs to be pushed. All keys and values will be coerced into strings.
  • table (str): The "table" name (root key).
  • datetime_column (Optional[str], default None): If set, create a sorted set with this datetime column as the index. Otherwise push the docs to a list.
Returns
  • The current index counter value (how many docs have been pushed).
def get_datetime_column_key(self, table: str) -> str:
310    def get_datetime_column_key(self, table: str) -> str:
311        """
312        Return the key to store the datetime index for `table`.
313        """
314        table_name = self.quote_table(table)
315        return f'{table_name}:datetime_column'

Return the key to store the datetime index for table.

def read( self, table: str, begin: Union[datetime.datetime, int, str, NoneType] = None, end: Union[datetime.datetime, int, str, NoneType] = None, params: Optional[Dict[str, Any]] = None, datetime_column: Optional[str] = None, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, debug: bool = False) -> Optional[ForwardRef('pd.DataFrame')]:
317    def read(
318        self,
319        table: str,
320        begin: Union[datetime, int, str, None] = None,
321        end: Union[datetime, int, str, None] = None,
322        params: Optional[Dict[str, Any]] = None,
323        datetime_column: Optional[str] = None,
324        select_columns: Optional[List[str]] = None,
325        omit_columns: Optional[List[str]] = None,
326        debug: bool = False
327    ) -> Union['pd.DataFrame', None]:
328        """
329        Query the table and return the result dataframe.
330
331        Parameters
332        ----------
333        table: str
334            The "table" name to be queried.
335
336        begin: Union[datetime, int, str, None], default None
337            If provided, only return rows greater than or equal to this datetime.
338
339        end: Union[datetime, int, str, None], default None
340            If provided, only return rows older than this datetime.
341
342        params: Optional[Dict[str, Any]]
343            Additional Meerschaum filter parameters.
344
345        datetime_column: Optional[str], default None
346            If provided, use this column for the datetime index.
347            Otherwise infer from the table metadata.
348
349        select_columns: Optional[List[str]], default None
350            If provided, only return these columns.
351
352        omit_columns: Optional[List[str]], default None
353            If provided, do not include these columns in the result.
354
355        Returns
356        -------
357        A Pandas DataFrame of the result, or `None`.
358        """
359        from meerschaum.utils.dataframe import parse_df_datetimes, query_df
360        docs = self.read_docs(
361            table,
362            begin=begin,
363            end=end,
364            debug=debug,
365        )
366        df = parse_df_datetimes(docs)
367        datetime_column_key = self.get_datetime_column_key(table)
368        datetime_column = datetime_column or self.get(datetime_column_key)
369
370        return query_df(
371            df,
372            begin=(begin if datetime_column is not None else None),
373            end=(end if datetime_column is not None else None),
374            params=params,
375            datetime_column=datetime_column,
376            select_columns=select_columns,
377            omit_columns=omit_columns,
378            inplace=True,
379            reset_index=True,
380            debug=debug,
381        )

Query the table and return the result dataframe.

Parameters
  • table (str): The "table" name to be queried.
  • begin (Union[datetime, int, str, None], default None): If provided, only return rows greater than or equal to this datetime.
  • end (Union[datetime, int, str, None], default None): If provided, only return rows older than this datetime.
  • params (Optional[Dict[str, Any]]): Additional Meerschaum filter parameters.
  • datetime_column (Optional[str], default None): If provided, use this column for the datetime index. Otherwise infer from the table metadata.
  • select_columns (Optional[List[str]], default None): If provided, only return these columns.
  • omit_columns (Optional[List[str]], default None): If provided, do not include these columns in the result.
Returns
  • A Pandas DataFrame of the result, or None.
def read_docs( self, table: str, begin: Union[datetime.datetime, int, str, NoneType] = None, end: Union[datetime.datetime, int, str, NoneType] = None, debug: bool = False) -> List[Dict[str, str]]:
383    def read_docs(
384        self,
385        table: str,
386        begin: Union[datetime, int, str, None] = None,
387        end: Union[datetime, int, str, None] = None,
388        debug: bool = False,
389    ) -> List[Dict[str, str]]:
390        """
391        Return a list of previously pushed docs.
392
393        Parameters
394        ----------
395        table: str
396            The "table" name (root key) under which the docs were pushed.
397
398        begin: Union[datetime, int, str, None], default None
399            If provided and the table was created with a datetime index, only return documents
400            newer than this datetime.
401            If the table was not created with a datetime index and `begin` is an `int`,
402            return documents with a positional index greater than or equal to this value.
403
404        end: Union[datetime, int, str, None], default None
405            If provided and the table was created with a datetime index, only return documents
406            older than this datetime.
407            If the table was not created with a datetime index and `begin` is an `int`,
408            return documents with a positional index less than this value.
409
410        Returns
411        -------
412        A list of dictionaries, where all keys and values are strings.
413        """
414        from meerschaum.utils.dtypes import coerce_timezone
415        table_name = self.quote_table(table)
416        datetime_column_key = self.get_datetime_column_key(table)
417        datetime_column = self.get(datetime_column_key)
418
419        if debug:
420            dprint(f"Reading documents from '{table}' with {begin=}, {end=}")
421
422        if not datetime_column:
423            return [
424                json.loads(doc_bytes.decode('utf-8'))
425                for doc_bytes in self.client.smembers(table_name)
426            ]
427
428        dateutil_parser = mrsm.attempt_import('dateutil.parser')
429
430        if isinstance(begin, str):
431            begin = coerce_timezone(dateutil_parser.parse(begin))
432
433        if isinstance(end, str):
434            end = coerce_timezone(dateutil_parser.parse(end))
435
436        begin_ts = (
437            (
438                int(begin.replace(tzinfo=timezone.utc).timestamp())
439                if isinstance(begin, datetime)
440                else int(begin)
441            )
442            if begin is not None else '-inf'
443        )
444        end_ts = (
445            (
446                int(end.replace(tzinfo=timezone.utc).timestamp())
447                if isinstance(end, datetime)
448                else int(end)
449            )
450            if end is not None else '+inf'
451        )
452
453        if debug:
454            dprint(f"Reading documents with {begin_ts=}, {end_ts=}")
455
456        return [
457            json.loads(doc_bytes.decode('utf-8'))
458            for doc_bytes in self.client.zrangebyscore(
459                table_name,
460                begin_ts,
461                end_ts,
462                withscores=False,
463            )
464        ]

Return a list of previously pushed docs.

Parameters
  • table (str): The "table" name (root key) under which the docs were pushed.
  • begin (Union[datetime, int, str, None], default None): If provided and the table was created with a datetime index, only return documents newer than this datetime. If the table was not created with a datetime index and begin is an int, return documents with a positional index greater than or equal to this value.
  • end (Union[datetime, int, str, None], default None): If provided and the table was created with a datetime index, only return documents older than this datetime. If the table was not created with a datetime index and begin is an int, return documents with a positional index less than this value.
Returns
  • A list of dictionaries, where all keys and values are strings.
def drop_table(self, table: str, debug: bool = False) -> None:
510    def drop_table(self, table: str, debug: bool = False) -> None:
511        """
512        Drop a "table" of documents.
513
514        Parameters
515        ----------
516        table: str
517            The "table" name (root key) to be deleted.
518        """
519        table_name = self.quote_table(table)
520        datetime_column_key = self.get_datetime_column_key(table)
521        self.client.delete(table_name)
522        self.client.delete(datetime_column_key)

Drop a "table" of documents.

Parameters
  • table (str): The "table" name (root key) to be deleted.
@classmethod
def get_entity_key(cls, *keys: Any) -> str:
524    @classmethod
525    def get_entity_key(cls, *keys: Any) -> str:
526        """
527        Return a joined key to set an entity.
528        """
529        if not keys:
530            raise ValueError("No keys to be joined.")
531
532        for key in keys:
533            if cls.KEY_SEPARATOR in str(key):
534                raise ValueError(f"Key cannot contain separator '{cls.KEY_SEPARATOR}'.")
535
536        return cls.KEY_SEPARATOR.join([str(key) for key in keys])

Return a joined key to set an entity.

def register_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
142def register_pipe(
143    self,
144    pipe: mrsm.Pipe,
145    debug: bool = False,
146    **kwargs: Any
147) -> SuccessTuple:
148    """
149    Insert the pipe's attributes into the internal `pipes` table.
150
151    Parameters
152    ----------
153    pipe: mrsm.Pipe
154        The pipe to be registered.
155
156    Returns
157    -------
158    A `SuccessTuple` of the result.
159    """
160    attributes = {
161        'connector_keys': str(pipe.connector_keys),
162        'metric_key': str(pipe.metric_key),
163        'location_key': str(pipe.location_key),
164    }
165    parameters_str = json.dumps(
166        pipe._attributes.get('parameters', {}),
167        separators=(',', ':'),
168    )
169
170    pipe_key = get_pipe_key(pipe)
171    parameters_key = get_pipe_parameters_key(pipe)
172
173    try:
174        existing_pipe_id = self.get(pipe_key)
175        if existing_pipe_id is not None:
176            return False, f"{pipe} is already registered."
177
178        pipe_id = self.client.incr(PIPES_COUNTER)
179        _ = self.push_docs(
180            [{'pipe_id': pipe_id, **attributes}],
181            PIPES_TABLE,
182            datetime_column='pipe_id',
183            debug=debug,
184        )
185        self.set(pipe_key, pipe_id)
186        self.set(parameters_key, parameters_str)
187
188    except Exception as e:
189        return False, f"Failed to register {pipe}:\n{e}"
190
191    return True, "Success"

Insert the pipe's attributes into the internal pipes table.

Parameters
  • pipe (mrsm.Pipe): The pipe to be registered.
Returns
  • A SuccessTuple of the result.
def get_pipe_id( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Union[str, int, NoneType]:
194def get_pipe_id(
195    self,
196    pipe: mrsm.Pipe,
197    debug: bool = False,
198    **kwargs: Any
199) -> Union[str, int, None]:
200    """
201    Return the `_id` for the pipe if it exists.
202
203    Parameters
204    ----------
205    pipe: mrsm.Pipe
206        The pipe whose `_id` to fetch.
207
208    Returns
209    -------
210    The `_id` for the pipe's document or `None`.
211    """
212    pipe_key = get_pipe_key(pipe)
213    try:
214        return int(self.get(pipe_key))
215    except Exception:
216        pass
217    return None

Return the _id for the pipe if it exists.

Parameters
  • pipe (mrsm.Pipe): The pipe whose _id to fetch.
Returns
  • The _id for the pipe's document or None.
def get_pipe_attributes( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Dict[str, Any]:
220def get_pipe_attributes(
221    self,
222    pipe: mrsm.Pipe,
223    debug: bool = False,
224    **kwargs: Any
225) -> Dict[str, Any]:
226    """
227    Return the pipe's document from the internal `pipes` collection.
228
229    Parameters
230    ----------
231    pipe: mrsm.Pipe
232        The pipe whose attributes should be retrieved.
233
234    Returns
235    -------
236    The document that matches the keys of the pipe.
237    """
238    pipe_id = pipe.get_id(debug=debug)
239    if pipe_id is None:
240        return {}
241
242    parameters_key = get_pipe_parameters_key(pipe)
243    parameters_str = self.get(parameters_key)
244
245    parameters = json.loads(parameters_str) if parameters_str else {}
246
247    attributes = {
248        'connector_keys': pipe.connector_keys,
249        'metric_key': pipe.metric_key,
250        'location_key': pipe.location_key,
251        'parameters': parameters,
252        'pipe_id': pipe_id,
253    }
254    return attributes

Return the pipe's document from the internal pipes collection.

Parameters
  • pipe (mrsm.Pipe): The pipe whose attributes should be retrieved.
Returns
  • The document that matches the keys of the pipe.
def edit_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
257def edit_pipe(
258    self,
259    pipe: mrsm.Pipe,
260    debug: bool = False,
261    **kwargs: Any
262) -> mrsm.SuccessTuple:
263    """
264    Edit the attributes of the pipe.
265
266    Parameters
267    ----------
268    pipe: mrsm.Pipe
269        The pipe whose in-memory parameters must be persisted.
270
271    Returns
272    -------
273    A `SuccessTuple` indicating success.
274    """
275    pipe_id = pipe.get_id(debug=debug)
276    if pipe_id is None:
277        return False, f"{pipe} is not registered."
278
279    parameters_key = get_pipe_parameters_key(pipe)
280    parameters_str = json.dumps(pipe.parameters, separators=(',', ':'))
281    self.set(parameters_key, parameters_str)
282    return True, "Success"

Edit the attributes of the pipe.

Parameters
  • pipe (mrsm.Pipe): The pipe whose in-memory parameters must be persisted.
Returns
  • A SuccessTuple indicating success.
def pipe_exists( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> bool:
285def pipe_exists(
286    self,
287    pipe: mrsm.Pipe,
288    debug: bool = False,
289    **kwargs: Any
290) -> bool:
291    """
292    Check whether a pipe's target table exists.
293
294    Parameters
295    ----------
296    pipe: mrsm.Pipe
297        The pipe to check whether its table exists.
298
299    Returns
300    -------
301    A `bool` indicating the table exists.
302    """
303    table_name = self.quote_table(pipe.target)
304    return self.client.exists(table_name) != 0

Check whether a pipe's target table exists.

Parameters
  • pipe (mrsm.Pipe): The pipe to check whether its table exists.
Returns
  • A bool indicating the table exists.
def drop_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
307def drop_pipe(
308    self,
309    pipe: mrsm.Pipe,
310    debug: bool = False,
311    **kwargs: Any
312) -> mrsm.SuccessTuple:
313    """
314    Drop a pipe's collection if it exists.
315
316    Parameters
317    ----------
318    pipe: mrsm.Pipe
319        The pipe to be dropped.
320
321    Returns
322    -------
323    A `SuccessTuple` indicating success.
324    """
325    for chunk_begin, chunk_end in pipe.get_chunk_bounds(debug=debug):
326        clear_chunk_success, clear_chunk_msg = pipe.clear(
327            begin=chunk_begin,
328            end=chunk_end,
329            debug=debug,
330        )
331        if not clear_chunk_success:
332            return clear_chunk_success, clear_chunk_msg
333    try:
334        self.drop_table(pipe.target, debug=debug)
335    except Exception as e:
336        return False, f"Failed to drop {pipe}:\n{e}"
337
338    if 'valkey' not in pipe.parameters:
339        return True, "Success"
340
341    pipe.parameters['valkey']['dtypes'] = {}
342    if not pipe.temporary:
343        edit_success, edit_msg = pipe.edit(debug=debug)
344        if not edit_success:
345            return edit_success, edit_msg
346
347    return True, "Success"

Drop a pipe's collection if it exists.

Parameters
  • pipe (mrsm.Pipe): The pipe to be dropped.
Returns
  • A SuccessTuple indicating success.
def delete_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
350def delete_pipe(
351    self,
352    pipe: mrsm.Pipe,
353    debug: bool = False,
354    **kwargs: Any
355) -> mrsm.SuccessTuple:
356    """
357    Delete a pipe's registration from the `pipes` collection.
358
359    Parameters
360    ----------
361    pipe: mrsm.Pipe
362        The pipe to be deleted.
363
364    Returns
365    -------
366    A `SuccessTuple` indicating success.
367    """
368    drop_success, drop_message = pipe.drop(debug=debug)
369    if not drop_success:
370        return drop_success, drop_message
371
372    pipe_id = self.get_pipe_id(pipe, debug=debug)
373    if pipe_id is None:
374        return False, f"{pipe} is not registered."
375
376    pipe_key = get_pipe_key(pipe)
377    parameters_key = get_pipe_parameters_key(pipe)
378    self.client.delete(pipe_key)
379    self.client.delete(parameters_key)
380    df = self.read(PIPES_TABLE, params={'pipe_id': pipe_id})
381    docs = df.to_dict(orient='records')
382    if docs:
383        doc = docs[0]
384        doc_str = json.dumps(
385            doc,
386            default=json_serialize_value,
387            separators=(',', ':'),
388            sort_keys=True,
389        )
390        self.client.zrem(PIPES_TABLE, doc_str)
391    return True, "Success"

Delete a pipe's registration from the pipes collection.

Parameters
  • pipe (mrsm.Pipe): The pipe to be deleted.
Returns
  • A SuccessTuple indicating success.
def get_pipe_data( self, pipe: meerschaum.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kwargs: Any) -> Optional[ForwardRef('pd.DataFrame')]:
394def get_pipe_data(
395    self,
396    pipe: mrsm.Pipe,
397    select_columns: Optional[List[str]] = None,
398    omit_columns: Optional[List[str]] = None,
399    begin: Union[datetime, int, None] = None,
400    end: Union[datetime, int, None] = None,
401    params: Optional[Dict[str, Any]] = None,
402    debug: bool = False,
403    **kwargs: Any
404) -> Union['pd.DataFrame', None]:
405    """
406    Query a pipe's target table and return the DataFrame.
407
408    Parameters
409    ----------
410    pipe: mrsm.Pipe
411        The pipe with the target table from which to read.
412
413    select_columns: Optional[List[str]], default None
414        If provided, only select these given columns.
415        Otherwise select all available columns (i.e. `SELECT *`).
416
417    omit_columns: Optional[List[str]], default None
418        If provided, remove these columns from the selection.
419
420    begin: Union[datetime, int, None], default None
421        The earliest `datetime` value to search from (inclusive).
422
423    end: Union[datetime, int, None], default None
424        The lastest `datetime` value to search from (exclusive).
425
426    params: Optional[Dict[str, str]], default None
427        Additional filters to apply to the query.
428
429    Returns
430    -------
431    The target table's data as a DataFrame.
432    """
433    if not pipe.exists(debug=debug):
434        return None
435
436    from meerschaum.utils.dataframe import query_df, parse_df_datetimes
437    from meerschaum.utils.dtypes import are_dtypes_equal
438
439    valkey_dtypes = pipe.parameters.get('valkey', {}).get('dtypes', {})
440    dt_col = pipe.columns.get('datetime', None)
441    table_name = self.quote_table(pipe.target)
442    indices = [col for col in pipe.columns.values() if col]
443    ix_docs = [
444        string_to_dict(doc.get('ix', '').replace(COLON, ':'))
445        for doc in self.read_docs(
446            pipe.target,
447            begin=begin,
448            end=end,
449            debug=debug,
450        )
451    ]
452    try:
453        docs_strings = [
454            self.get(
455                self.get_document_key(
456                    doc,
457                    indices,
458                    table_name,
459                )
460            )
461            for doc in ix_docs
462        ]
463    except Exception as e:
464        warn(f"Failed to fetch documents for {pipe}:\n{e}")
465        docs_strings = []
466
467    docs = [
468        json.loads(doc_str)
469        for doc_str in docs_strings
470        if doc_str
471    ]
472    ignore_dt_cols = [
473        col
474        for col, dtype in pipe.dtypes.items()
475        if not are_dtypes_equal(str(dtype), 'datetime')
476    ]
477
478    df = parse_df_datetimes(
479        docs,
480        ignore_cols=ignore_dt_cols,
481        chunksize=kwargs.get('chunksize', None),
482        strip_timezone=(pipe.tzinfo is None),
483        debug=debug,
484    )
485    for col, typ in valkey_dtypes.items():
486        try:
487            df[col] = df[col].astype(typ)
488        except Exception:
489            pass
490
491    df = pipe.enforce_dtypes(df, debug=debug)
492
493    if len(df) == 0:
494        return query_df(df, select_columns=select_columns, omit_columns=omit_columns)
495
496    return query_df(
497        df,
498        select_columns=select_columns,
499        omit_columns=omit_columns,
500        params=params,
501        begin=begin,
502        end=end,
503        datetime_column=dt_col,
504        inplace=True,
505        reset_index=True,
506    )

Query a pipe's target table and return the DataFrame.

Parameters
  • pipe (mrsm.Pipe): The pipe with the target table from which to read.
  • select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
  • omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
  • begin (Union[datetime, int, None], default None): The earliest datetime value to search from (inclusive).
  • end (Union[datetime, int, None], default None): The lastest datetime value to search from (exclusive).
  • params (Optional[Dict[str, str]], default None): Additional filters to apply to the query.
Returns
  • The target table's data as a DataFrame.
def sync_pipe( self, pipe: meerschaum.Pipe, df: 'pd.DataFrame' = None, check_existing: bool = True, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
509def sync_pipe(
510    self,
511    pipe: mrsm.Pipe,
512    df: 'pd.DataFrame' = None,
513    check_existing: bool = True,
514    debug: bool = False,
515    **kwargs: Any
516) -> mrsm.SuccessTuple:
517    """
518    Upsert new documents into the pipe's collection.
519
520    Parameters
521    ----------
522    pipe: mrsm.Pipe
523        The pipe whose collection should receive the new documents.
524
525    df: Union['pd.DataFrame', Iterator['pd.DataFrame']], default None
526        The data to be synced.
527
528    check_existing: bool, default True
529        If `False`, do not check the documents against existing data and instead insert directly.
530
531    Returns
532    -------
533    A `SuccessTuple` indicating success.
534    """
535    from meerschaum.utils.dtypes import are_dtypes_equal
536    dt_col = pipe.columns.get('datetime', None)
537    indices = [col for col in pipe.columns.values() if col]
538    table_name = self.quote_table(pipe.target)
539    is_dask = 'dask' in df.__module__
540    if is_dask:
541        df = df.compute()
542    upsert = pipe.parameters.get('upsert', False)
543    static = pipe.parameters.get('static', False)
544
545    def _serialize_indices_docs(_docs):
546        return [
547            {
548                'ix': self.get_document_key(doc, indices),
549                **(
550                    {
551                        dt_col: doc.get(dt_col, 0)
552                    }
553                    if dt_col
554                    else {}
555                )
556            }
557            for doc in _docs
558        ]
559
560    valkey_dtypes = pipe.parameters.get('valkey', {}).get('dtypes', {})
561    new_dtypes = {
562        str(key): (
563            str(val)
564            if not are_dtypes_equal(str(val), 'datetime')
565            else 'datetime64[ns, UTC]'
566        )
567        for key, val in df.dtypes.items()
568        if str(key) not in valkey_dtypes
569    }
570    for col, typ in {c: v for c, v in valkey_dtypes.items()}.items():
571        if col in df.columns:
572            try:
573                df[col] = df[col].astype(typ)
574            except Exception:
575                valkey_dtypes[col] = 'string'
576                new_dtypes[col] = 'string'
577                df[col] = df[col].astype('string')
578
579    if new_dtypes and (not static or not valkey_dtypes):
580        valkey_dtypes.update(new_dtypes)
581        if 'valkey' not in pipe.parameters:
582            pipe.parameters['valkey'] = {}
583        pipe.parameters['valkey']['dtypes'] = valkey_dtypes
584        if not pipe.temporary:
585            edit_success, edit_msg = pipe.edit(debug=debug)
586            if not edit_success:
587                return edit_success, edit_msg
588
589    unseen_df, update_df, delta_df = (
590        pipe.filter_existing(df, include_unchanged_columns=True, debug=debug)
591        if check_existing and not upsert
592        else (None, df, df)
593    )
594    num_insert = len(unseen_df) if unseen_df is not None else 0
595    num_update = len(update_df) if update_df is not None else 0
596    msg = (
597        f"Inserted {num_insert}, updated {num_update} rows."
598        if not upsert
599        else f"Upserted {num_update} rows."
600    )
601    if len(delta_df) == 0:
602        return True, msg
603
604    unseen_docs = unseen_df.to_dict(orient='records') if unseen_df is not None else []
605    unseen_indices_docs = _serialize_indices_docs(unseen_docs)
606    unseen_ix_vals = {
607        self.get_document_key(doc, indices, table_name): serialize_document(doc)
608        for doc in unseen_docs
609    }
610    for key, val in unseen_ix_vals.items():
611        try:
612            self.set(key, val)
613        except Exception as e:
614            return False, f"Failed to set keys for {pipe}:\n{e}"
615
616    try:
617        self.push_docs(
618            unseen_indices_docs,
619            pipe.target,
620            datetime_column=dt_col,
621            debug=debug,
622        )
623    except Exception as e:
624        return False, f"Failed to push docs to '{pipe.target}':\n{e}"
625
626    update_docs = update_df.to_dict(orient='records') if update_df is not None else []
627    update_ix_docs = {
628        self.get_document_key(doc, indices, table_name): doc
629        for doc in update_docs
630    }
631    existing_docs_data = {
632        key: self.get(key)
633        for key in update_ix_docs
634    } if pipe.exists(debug=debug) else {}
635    existing_docs = {
636        key: json.loads(data)
637        for key, data in existing_docs_data.items()
638        if data
639    }
640    new_update_docs = {
641        key: doc
642        for key, doc in update_ix_docs.items()
643        if key not in existing_docs
644    }
645    new_ix_vals = {
646        self.get_document_key(doc, indices, table_name): serialize_document(doc)
647        for doc in new_update_docs.values()
648    }
649    for key, val in new_ix_vals.items():
650        try:
651            self.set(key, val)
652        except Exception as e:
653            return False, f"Failed to set keys for {pipe}:\n{e}"
654
655    old_update_docs = {
656        key: {
657            **existing_docs[key],
658            **doc
659        }
660        for key, doc in update_ix_docs.items()
661        if key in existing_docs
662    }
663    new_indices_docs = _serialize_indices_docs([doc for doc in new_update_docs.values()])
664    try:
665        if new_indices_docs:
666            self.push_docs(
667                new_indices_docs,
668                pipe.target,
669                datetime_column=dt_col,
670                debug=debug,
671            )
672    except Exception as e:
673        return False, f"Failed to upsert '{pipe.target}':\n{e}"
674
675    for key, doc in old_update_docs.items():
676        try:
677            self.set(key, serialize_document(doc))
678        except Exception as e:
679            return False, f"Failed to set keys for {pipe}:\n{e}"
680
681    return True, msg

Upsert new documents into the pipe's collection.

Parameters
  • pipe (mrsm.Pipe): The pipe whose collection should receive the new documents.
  • df (Union['pd.DataFrame', Iterator['pd.DataFrame']], default None): The data to be synced.
  • check_existing (bool, default True): If False, do not check the documents against existing data and instead insert directly.
Returns
  • A SuccessTuple indicating success.
def get_pipe_columns_types( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Dict[str, str]:
684def get_pipe_columns_types(
685    self,
686    pipe: mrsm.Pipe,
687    debug: bool = False,
688    **kwargs: Any
689) -> Dict[str, str]:
690    """
691    Return the data types for the columns in the target table for data type enforcement.
692
693    Parameters
694    ----------
695    pipe: mrsm.Pipe
696        The pipe whose target table contains columns and data types.
697
698    Returns
699    -------
700    A dictionary mapping columns to data types.
701    """
702    if not pipe.exists(debug=debug):
703        return {}
704
705    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
706    return {
707        col: get_db_type_from_pd_type(typ, flavor='postgresql')
708        for col, typ in pipe.parameters.get('valkey', {}).get('dtypes', {}).items()
709    }

Return the data types for the columns in the target table for data type enforcement.

Parameters
  • pipe (mrsm.Pipe): The pipe whose target table contains columns and data types.
Returns
  • A dictionary mapping columns to data types.
def clear_pipe( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> Tuple[bool, str]:
712def clear_pipe(
713    self,
714    pipe: mrsm.Pipe,
715    begin: Union[datetime, int, None] = None,
716    end: Union[datetime, int, None] = None,
717    params: Optional[Dict[str, Any]] = None,
718    debug: bool = False,
719) -> mrsm.SuccessTuple:
720    """
721    Delete rows within `begin`, `end`, and `params`.
722
723    Parameters
724    ----------
725    pipe: mrsm.Pipe
726        The pipe whose rows to clear.
727
728    begin: Union[datetime, int, None], default None
729        If provided, remove rows >= `begin`.
730
731    end: Union[datetime, int, None], default None
732        If provided, remove rows < `end`.
733
734    params: Optional[Dict[str, Any]], default None
735        If provided, only remove rows which match the `params` filter.
736
737    Returns
738    -------
739    A `SuccessTuple` indicating success.
740    """
741    dt_col = pipe.columns.get('datetime', None)
742
743    existing_df = pipe.get_data(
744        begin=begin,
745        end=end,
746        params=params,
747        debug=debug,
748    )
749    if existing_df is None or len(existing_df) == 0:
750        return True, "Deleted 0 rows."
751
752    docs = existing_df.to_dict(orient='records')
753    table_name = self.quote_table(pipe.target)
754    indices = [col for col in pipe.columns.values() if col]
755    for doc in docs:
756        set_doc_key = self.get_document_key(doc, indices)
757        table_doc_key = self.get_document_key(doc, indices, table_name)
758        try:
759            if dt_col:
760                self.client.zrem(table_name, set_doc_key)
761            else:
762                self.client.srem(table_name, set_doc_key)
763            self.client.delete(table_doc_key)
764        except Exception as e:
765            return False, f"Failed to delete documents:\n{e}"
766    msg = (
767        f"Deleted {len(docs)} row"
768        + ('s' if len(docs) != 1 else '')
769        + '.'
770    )
771    return True, msg

Delete rows within begin, end, and params.

Parameters
  • pipe (mrsm.Pipe): The pipe whose rows to clear.
  • begin (Union[datetime, int, None], default None): If provided, remove rows >= begin.
  • end (Union[datetime, int, None], default None): If provided, remove rows < end.
  • params (Optional[Dict[str, Any]], default None): If provided, only remove rows which match the params filter.
Returns
  • A SuccessTuple indicating success.
def get_sync_time( self, pipe: meerschaum.Pipe, newest: bool = True, **kwargs: Any) -> Union[datetime.datetime, int, NoneType]:
774def get_sync_time(
775    self,
776    pipe: mrsm.Pipe,
777    newest: bool = True,
778    **kwargs: Any
779) -> Union[datetime, int, None]:
780    """
781    Return the newest (or oldest) timestamp in a pipe.
782    """
783    from meerschaum.utils.dtypes import are_dtypes_equal
784    dt_col = pipe.columns.get('datetime', None)
785    dt_typ = pipe.dtypes.get(dt_col, 'datetime64[ns, UTC]')
786    if not dt_col:
787        return None
788
789    dateutil_parser = mrsm.attempt_import('dateutil.parser')
790    table_name = self.quote_table(pipe.target)
791    try:
792        vals = (
793            self.client.zrevrange(table_name, 0, 0)
794            if newest
795            else self.client.zrange(table_name, 0, 0)
796        )
797        if not vals:
798            return None
799        val = vals[0]
800    except Exception:
801        return None
802
803    doc = json.loads(val)
804    dt_val = doc.get(dt_col, None)
805    if dt_val is None:
806        return None
807
808    try:
809        return (
810            int(dt_val)
811            if are_dtypes_equal(dt_typ, 'int')
812            else dateutil_parser.parse(str(dt_val))
813        )
814    except Exception as e:
815        warn(f"Failed to parse sync time for {pipe}:\n{e}")
816
817    return None

Return the newest (or oldest) timestamp in a pipe.

def get_pipe_rowcount( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kwargs: Any) -> Optional[int]:
820def get_pipe_rowcount(
821    self,
822    pipe: mrsm.Pipe,
823    begin: Union[datetime, int, None] = None,
824    end: Union[datetime, int, None] = None,
825    params: Optional[Dict[str, Any]] = None,
826    debug: bool = False,
827    **kwargs: Any
828) -> Union[int, None]:
829    """
830    Return the number of documents in the pipe's set.
831    """
832    dt_col = pipe.columns.get('datetime', None)
833    table_name = self.quote_table(pipe.target)
834
835    if not pipe.exists():
836        return 0
837
838    try:
839        if begin is None and end is None and not params:
840            return (
841                self.client.zcard(table_name)
842                if dt_col
843                else self.client.scard(table_name)
844            )
845    except Exception as e:
846        if debug:
847            dprint(f"Failed to get rowcount for {pipe}:\n{e}")
848        return None
849
850    df = pipe.get_data(begin=begin, end=end, params=params, debug=debug)
851    if df is None:
852        return 0
853
854    return len(df)

Return the number of documents in the pipe's set.

def fetch_pipes_keys( self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> Optional[List[Tuple[str, str, Optional[str]]]]:
857def fetch_pipes_keys(
858    self,
859    connector_keys: Optional[List[str]] = None,
860    metric_keys: Optional[List[str]] = None,
861    location_keys: Optional[List[str]] = None,
862    tags: Optional[List[str]] = None,
863    params: Optional[Dict[str, Any]] = None,
864    debug: bool = False
865) -> Optional[List[Tuple[str, str, Optional[str]]]]:
866    """
867    Return the keys for the registered pipes.
868    """
869    from meerschaum.utils.dataframe import query_df
870    from meerschaum.utils.misc import separate_negation_values
871    try:
872        df = self.read(PIPES_TABLE, debug=debug)
873    except Exception:
874        return []
875
876    if df is None or len(df) == 0:
877        return []
878
879    query = {}
880    if connector_keys:
881        query['connector_keys'] = [str(k) for k in connector_keys]
882    if metric_keys:
883        query['metric_key'] = [str(k) for k in metric_keys]
884    if location_keys:
885        query['location_key'] = [str(k) for k in location_keys]
886    if params:
887        query.update(params)
888
889    df = query_df(df, query, inplace=True)
890
891    keys = [
892        (
893            doc['connector_keys'],
894            doc['metric_key'],
895            doc['location_key'],
896        )
897        for doc in df.to_dict(orient='records')
898    ]
899    if not tags:
900        return keys
901
902    tag_groups = [tag.split(',') for tag in tags]
903    in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups]
904
905    filtered_keys = []
906    for ck, mk, lk in keys:
907        pipe = mrsm.Pipe(ck, mk, lk, instance=self)
908        pipe_tags = set(pipe.tags)
909        
910        include_pipe = True
911        for in_tags, ex_tags in in_ex_tag_groups:
912            all_in = all(tag in pipe_tags for tag in in_tags)
913            any_ex = any(tag in pipe_tags for tag in ex_tags)
914
915            if (not all_in) or any_ex:
916                include_pipe = False
917                continue
918
919        if include_pipe:
920            filtered_keys.append((ck, mk, lk))
921
922    return filtered_keys

Return the keys for the registered pipes.

@staticmethod
def get_document_key( doc: Dict[str, Any], indices: List[str], table_name: Optional[str] = None) -> str:
 59@staticmethod
 60def get_document_key(
 61    doc: Dict[str, Any],
 62    indices: List[str],
 63    table_name: Optional[str] = None,
 64) -> str:
 65    """
 66    Return a serialized string for a document's indices only.
 67
 68    Parameters
 69    ----------
 70    doc: Dict[str, Any]
 71        The document containing index values to be serialized.
 72
 73    indices: List[str]
 74        The name of the indices to be serialized.
 75
 76    table_name: Optional[str], default None
 77        If provided, prepend the table to the key.
 78
 79    Returns
 80    -------
 81    A serialized string of the document's indices.
 82    """
 83    from meerschaum.utils.dtypes import coerce_timezone
 84    index_vals = {
 85        key: (
 86            str(val).replace(':', COLON)
 87            if not isinstance(val, datetime)
 88            else str(int(coerce_timezone(val).replace(tzinfo=timezone.utc).timestamp()))
 89        )
 90        for key, val in doc.items()
 91        if ((key in indices) if indices else True)
 92    }
 93    indices_str = (
 94        (
 95            (
 96                (
 97                    table_name
 98                    + ':'
 99                    + ('indices:' if True else '')
100                )
101            )
102            if table_name
103            else ''
104        ) + ','.join(
105            sorted(
106                [
107                    f'{key}{COLON}{val}'
108                    for key, val in index_vals.items()
109                ]
110            )
111        )
112    )
113    return indices_str

Return a serialized string for a document's indices only.

Parameters
  • doc (Dict[str, Any]): The document containing index values to be serialized.
  • indices (List[str]): The name of the indices to be serialized.
  • table_name (Optional[str], default None): If provided, prepend the table to the key.
Returns
  • A serialized string of the document's indices.
@classmethod
def get_table_quoted_doc_key( cls, table_name: str, doc: Dict[str, Any], indices: List[str], datetime_column: Optional[str] = None) -> str:
116@classmethod
117def get_table_quoted_doc_key(
118    cls,
119    table_name: str,
120    doc: Dict[str, Any],
121    indices: List[str],
122    datetime_column: Optional[str] = None,
123) -> str:
124    """
125    Return the document string as stored in the underling set.
126    """
127    return json.dumps(
128        {
129            cls.get_document_key(doc, indices, table_name): serialize_document(doc),
130            **(
131                {datetime_column: doc.get(datetime_column, 0)}
132                if datetime_column
133                else {}
134            )
135        },
136        sort_keys=True,
137        separators=(',', ':'),
138        default=json_serialize_value,
139    )

Return the document string as stored in the underling set.

def fetch( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kwargs: Any) -> List[Dict[str, Any]]:
17def fetch(
18    self,
19    pipe: mrsm.Pipe,
20    begin: Union[datetime, int, None] = None,
21    end: Union[datetime, int, None] = None,
22    params: Optional[Dict[str, Any]] = None,
23    debug: bool = False,
24    **kwargs: Any
25) -> List[Dict[str, Any]]:
26    """
27    Return data from a source database.
28    """
29    source_key = pipe.parameters.get('valkey', {}).get('key', None)
30    if not source_key:
31        return []
32
33    try:
34        key_type = self.client.type(source_key).decode('utf-8')
35    except Exception:
36        warn(f"Could not determine the type for key '{source_key}'.")
37        return []
38
39    begin_ts = (
40        (
41            int(begin.replace(tzinfo=timezone.utc).timestamp())
42            if isinstance(begin, datetime)
43            else int(begin)
44        )
45        if begin is not None else '-inf'
46    )
47    end_ts = (
48        (
49            int(end.replace(tzinfo=timezone.utc).timestamp())
50            if isinstance(end, datetime)
51            else int(end)
52        )
53        if end is not None else '+inf'
54    )
55
56    if debug:
57        dprint(f"Reading documents with {begin_ts=}, {end_ts=}")
58
59    if key_type == 'set':
60        return [
61            json.loads(doc_bytes.decode('utf-8'))
62            for doc_bytes in self.client.smembers(source_key)
63        ]
64
65    if key_type == 'zset':
66        return [
67            json.loads(doc_bytes.decode('utf-8'))
68            for doc_bytes in self.client.zrangebyscore(
69                source_key,
70                begin_ts,
71                end_ts,
72                withscores=False,
73            )
74        ]
75
76    return [{source_key: self.get(source_key)}]

Return data from a source database.

def get_users_pipe(self):
20def get_users_pipe(self):
21    """
22    Return the pipe which stores the registered users.
23    """
24    return mrsm.Pipe(
25        'mrsm', 'users',
26        columns=['user_id'],
27        temporary=True,
28        target=USERS_TABLE,
29        instance=self,
30    )

Return the pipe which stores the registered users.

@classmethod
def get_user_key( cls, user_id_or_username: str, sub_key: str, by_username: bool = False) -> str:
33@classmethod
34def get_user_key(cls, user_id_or_username: str, sub_key: str, by_username: bool = False) -> str:
35    """
36    Return the key to store metadata about a user.
37
38    Parameters
39    ----------
40    user_id_or_username: str
41        The user ID or username of the given user.
42        If `by_username` is `True`, then provide the username.
43
44    sub_key: str
45        The key suffix, e.g. `'attributes'`.
46
47    by_username: bool, default False
48        If `True`, then treat `user_id_or_username` as a username.
49
50    Returns
51    -------
52    A key to store information about a user.
53
54    Examples
55    --------
56    >>> get_user_key('deadbeef', 'attributes')
57    'mrsm_user:user_id:deadbeef:attributes'
58    >>> get_user_key('foo', 'user_id', by_username=True)
59    'mrsm_user:username:foo:user_id'
60    """
61    key_type = 'user_id' if not by_username else 'username'
62    return cls.get_entity_key(USER_PREFIX, key_type, user_id_or_username, sub_key)

Return the key to store metadata about a user.

Parameters
  • user_id_or_username (str): The user ID or username of the given user. If by_username is True, then provide the username.
  • sub_key (str): The key suffix, e.g. 'attributes'.
  • by_username (bool, default False): If True, then treat user_id_or_username as a username.
Returns
  • A key to store information about a user.
Examples
>>> get_user_key('deadbeef', 'attributes')
'mrsm_user:user_id:deadbeef:attributes'
>>> get_user_key('foo', 'user_id', by_username=True)
'mrsm_user:username:foo:user_id'
@classmethod
def get_user_keys_vals( cls, user: meerschaum.core.User._User.User, mutable_only: bool = False) -> Dict[str, str]:
 65@classmethod
 66def get_user_keys_vals(
 67    cls,
 68    user: 'mrsm.core.User',
 69    mutable_only: bool = False,
 70) -> Dict[str, str]:
 71    """
 72    Return a dictionary containing keys and values to set for the user.
 73
 74    Parameters
 75    ----------
 76    user: mrsm.core.User
 77        The user for which to generate the keys.
 78
 79    mutable_only: bool, default False
 80        If `True`, only return keys which may be edited.
 81
 82    Returns
 83    -------
 84    A dictionary mapping a user's keys to values.
 85    """
 86    user_attributes_str = json.dumps(user.attributes, separators=(',', ':'))
 87    mutable_keys_vals = {
 88        cls.get_user_key(user.user_id, 'attributes'): user_attributes_str,
 89        cls.get_user_key(user.user_id, 'email'): user.email,
 90        cls.get_user_key(user.user_id, 'type'): user.type,
 91        cls.get_user_key(user.user_id, 'password_hash'): user.password_hash,
 92    }
 93    if mutable_only:
 94        return mutable_keys_vals
 95
 96    immutable_keys_vals = {
 97        cls.get_user_key(user.user_id, 'username'): user.username,
 98        cls.get_user_key(user.username, 'user_id', by_username=True): user.user_id,
 99    }
100
101    return {**immutable_keys_vals, **mutable_keys_vals}

Return a dictionary containing keys and values to set for the user.

Parameters
  • user (mrsm.core.User): The user for which to generate the keys.
  • mutable_only (bool, default False): If True, only return keys which may be edited.
Returns
  • A dictionary mapping a user's keys to values.
def register_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
104def register_user(
105    self,
106    user: 'mrsm.core.User',
107    debug: bool = False,
108    **kwargs: Any
109) -> SuccessTuple:
110    """
111    Register a new user.
112    """
113    from meerschaum.utils.misc import generate_password
114
115    user.user_id = generate_password(12)
116    users_pipe = self.get_users_pipe()
117    keys_vals = self.get_user_keys_vals(user)
118
119    try:
120        sync_success, sync_msg = users_pipe.sync(
121            [
122                {
123                    'user_id': user.user_id,
124                    'username': user.username,
125                },
126            ],
127            check_existing=False,
128            debug=debug,
129        )
130        if not sync_success:
131            return sync_success, sync_msg
132
133        for key, val in keys_vals.items():
134            if val is not None:
135                self.set(key, val)
136
137        success, msg = True, "Success"
138    except Exception as e:
139        success = False
140        import traceback
141        traceback.print_exc()
142        msg = f"Failed to register '{user.username}':\n{e}"
143
144    if not success:
145        for key in keys_vals:
146            try:
147                self.client.delete(key)
148            except Exception:
149                pass
150
151    return success, msg

Register a new user.

def get_user_id( self, user: meerschaum.core.User._User.User, debug: bool = False) -> Optional[str]:
154def get_user_id(self, user: 'mrsm.core.User', debug: bool = False) -> Union[str, None]:
155    """
156    Return the ID for a user, or `None`.
157    """
158    username_user_id_key = self.get_user_key(user.username, 'user_id', by_username=True)
159    try:
160        user_id = self.get(username_user_id_key)
161    except Exception:
162        user_id = None
163    return user_id

Return the ID for a user, or None.

def edit_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
166def edit_user(
167    self,
168    user: 'mrsm.core.User',
169    debug: bool = False,
170    **kw: Any
171) -> SuccessTuple:
172    """
173    Edit the attributes for an existing user.
174    """
175    keys_vals = self.get_user_keys_vals(user, mutable_only=True)
176    try:
177        old_keys_vals = {
178            key: self.get(key)
179            for key in keys_vals
180        }
181    except Exception as e:
182        return False, f"Failed to edit user:\n{e}"
183
184    try:
185        for key, val in keys_vals.items():
186            self.set(key, val)
187        success, msg = True, "Success"
188    except Exception as e:
189        success = False
190        msg = f"Failed to edit user:\n{e}"
191
192    if not success:
193        try:
194            for key, old_val in old_keys_vals.items():
195                self.set(key, old_val)
196        except Exception:
197            pass
198
199    return success, msg

Edit the attributes for an existing user.

def get_user_attributes( self, user: meerschaum.core.User._User.User, debug: bool = False) -> Optional[Dict[str, Any]]:
202def get_user_attributes(
203    self,
204    user: 'mrsm.core.User',
205    debug: bool = False
206) -> Union[Dict[str, Any], None]:
207    """
208    Return the user's attributes.
209    """
210    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
211    user_id_attributes_key = self.get_user_key(user_id, 'attributes')
212    try:
213        return json.loads(self.get(user_id_attributes_key))
214    except Exception:
215        return None

Return the user's attributes.

def delete_user( self, user: meerschaum.core.User._User.User, debug: bool = False) -> Tuple[bool, str]:
218def delete_user(
219    self,
220    user: 'mrsm.core.User',
221    debug: bool = False
222) -> SuccessTuple:
223    """
224    Delete a user's keys.
225    """
226    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
227    users_pipe = self.get_users_pipe()
228    keys_vals = self.get_user_keys_vals(user)
229    try:
230        old_keys_vals = {
231            key: self.get(key)
232            for key in keys_vals
233        }
234    except Exception as e:
235        return False, f"Failed to delete user:\n{e}"
236
237    clear_success, clear_msg = users_pipe.clear(params={'user_id': user_id})
238    if not clear_success:
239        return clear_success, clear_msg
240
241    try:
242        for key in keys_vals:
243            self.client.delete(key)
244        success, msg = True, "Success"
245    except Exception as e:
246        success = False
247        msg = f"Failed to delete user:\n{e}"
248
249    if not success:
250        try:
251            for key, old_val in old_keys_vals.items():
252                self.set(key, old_val)
253        except Exception:
254            pass
255
256    return success, msg

Delete a user's keys.

def get_users(self, debug: bool = False, **kw: Any) -> List[str]:
259def get_users(
260    self,
261    debug: bool = False,
262    **kw: Any
263) -> List[str]:
264    """
265    Get the registered usernames.
266    """
267    users_pipe = self.get_users_pipe()
268    df = users_pipe.get_data()
269    if df is None:
270        return []
271
272    return list(df['username'])

Get the registered usernames.

def get_user_password_hash( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
275def get_user_password_hash(
276    self,
277    user: 'mrsm.core.User',
278    debug: bool = False,
279    **kw: Any
280) -> Union[str, None]:
281    """
282    Return the password has for a user.
283    """
284    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
285    user_id_password_hash_key = self.get_user_key(user_id, 'password_hash')
286    try:
287        return self.get(user_id_password_hash_key)
288    except Exception:
289        return None

Return the password has for a user.

def get_user_type( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
292def get_user_type(
293    self,
294    user: 'mrsm.core.User',
295    debug: bool = False,
296    **kw: Any
297) -> Union[str, None]:
298    """
299    Return the user's type.
300    """
301    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
302    user_id_type_key = self.get_user_key(user_id, 'type')
303    try:
304        return self.get(user_id_type_key)
305    except Exception:
306        return None

Return the user's type.

def get_plugins_pipe(self) -> meerschaum.Pipe:
20def get_plugins_pipe(self) -> mrsm.Pipe:
21    """
22    Return the pipe to store the plugins.
23    """
24    return mrsm.Pipe(
25        'mrsm', 'plugins',
26        columns=['plugin_name'],
27        temporary=True,
28        target=PLUGINS_TABLE,
29        instance=self,
30    )

Return the pipe to store the plugins.

@classmethod
def get_plugin_key(cls, plugin_name: str, sub_key: str) -> str:
33@classmethod
34def get_plugin_key(cls, plugin_name: str, sub_key: str) -> str:
35    """
36    Return the key for a plugin's attribute.
37    """
38    return cls.get_entity_key(PLUGIN_PREFIX, plugin_name, sub_key)

Return the key for a plugin's attribute.

@classmethod
def get_plugin_keys_vals( cls, plugin: meerschaum.Plugin, mutable_only: bool = False) -> Dict[str, str]:
41@classmethod
42def get_plugin_keys_vals(
43    cls,
44    plugin: 'mrsm.core.Plugin',
45    mutable_only: bool = False,
46) -> Dict[str, str]:
47    """
48    Return a dictionary containing keys and values to set for the plugin.
49
50    Parameters
51    ----------
52    plugin: mrsm.core.Plugin
53        The plugin for which to generate the keys.
54
55    mutable_only: bool, default False
56        If `True`, only return keys which may be edited.
57
58    Returns
59    -------
60    A dictionary mapping a plugins's keys to values.
61    """
62    plugin_attributes_str = json.dumps(plugin.attributes, separators=(',', ':'))
63    mutable_keys_vals = {
64        cls.get_plugin_key(plugin.name, 'attributes'): plugin_attributes_str,
65        cls.get_plugin_key(plugin.name, 'version'): plugin.version,
66    }
67    if mutable_only:
68        return mutable_keys_vals
69
70    immutable_keys_vals = {
71        cls.get_plugin_key(plugin.name, 'user_id'): plugin.user_id,
72    }
73
74    return {**immutable_keys_vals, **mutable_keys_vals}

Return a dictionary containing keys and values to set for the plugin.

Parameters
  • plugin (mrsm.core.Plugin): The plugin for which to generate the keys.
  • mutable_only (bool, default False): If True, only return keys which may be edited.
Returns
  • A dictionary mapping a plugins's keys to values.
def register_plugin( self, plugin: meerschaum.Plugin, force: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
 77def register_plugin(
 78    self,
 79    plugin: 'mrsm.core.Plugin',
 80    force: bool = False,
 81    debug: bool = False,
 82    **kw: Any
 83) -> SuccessTuple:
 84    """Register a new plugin to the `mrsm_plugins` "table"."""
 85    from meerschaum.utils.misc import generate_password
 86
 87    plugins_pipe = self.get_plugins_pipe()
 88    keys_vals = self.get_plugin_keys_vals(plugin)
 89
 90    try:
 91        sync_success, sync_msg = plugins_pipe.sync(
 92            [
 93                {
 94                    'plugin_name': plugin.name,
 95                    'user_id': plugin.user_id,
 96                },
 97            ],
 98            check_existing=False,
 99            debug=debug,
100        )
101        if not sync_success:
102            return sync_success, sync_msg
103
104        for key, val in keys_vals.items():
105            if val is not None:
106                self.set(key, val)
107
108        success, msg = True, "Success"
109    except Exception as e:
110        success = False
111        msg = f"Failed to register plugin '{plugin.name}':\n{e}"
112
113    if not success:
114        for key in keys_vals:
115            try:
116                self.client.delete(key)
117            except Exception:
118                pass
119
120    return success, msg

Register a new plugin to the mrsm_plugins "table".

def get_plugin_id( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[str]:
123def get_plugin_id(
124    self,
125    plugin: 'mrsm.core.Plugin',
126    debug: bool = False
127) -> Union[str, None]:
128    """
129    Return a plugin's ID.
130    """
131    return plugin.name

Return a plugin's ID.

def get_plugin_version( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[str]:
134def get_plugin_version(
135    self,
136    plugin: 'mrsm.core.Plugin',
137    debug: bool = False,
138) -> Union[str, None]:
139    """
140    Return a plugin's version.
141    """
142    version_key = self.get_plugin_key(plugin.name, 'version')
143
144    try:
145        return self.get(version_key)
146    except Exception:
147        return None

Return a plugin's version.

def get_plugin_user_id( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[str]:
150def get_plugin_user_id(
151    self,
152    plugin: 'mrsm.core.Plugin',
153    debug: bool = False
154) -> Union[str, None]:
155    """
156    Return a plugin's user ID.
157    """
158    user_id_key = self.get_plugin_key(plugin.name, 'user_id')
159
160    try:
161        return self.get(user_id_key)
162    except Exception:
163        return None

Return a plugin's user ID.

def get_plugin_username( self, plugin: meerschaum.Plugin, debug: bool = False) -> str:
166def get_plugin_username(
167    self,
168    plugin: 'mrsm.core.Plugin',
169    debug: bool = False
170) -> Union[str]:
171    """
172    Return the username of a plugin's owner.
173    """
174    user_id = self.get_plugin_user_id(plugin, debug=debug)
175    if user_id is None:
176        return None
177
178    username_key = self.get_user_key(user_id, 'username')
179    try:
180        return self.get(username_key)
181    except Exception:
182        return None

Return the username of a plugin's owner.

def get_plugin_attributes( self, plugin: meerschaum.Plugin, debug: bool = False) -> Dict[str, Any]:
185def get_plugin_attributes(
186    self,
187    plugin: 'mrsm.core.Plugin',
188    debug: bool = False
189) -> Dict[str, Any]:
190    """
191    Return the attributes of a plugin.
192    """
193    attributes_key = self.get_plugin_key(plugin.name, 'attributes')
194    try:
195        attributes_str = self.get(attributes_key)
196        if not attributes_str:
197            return {}
198        return json.loads(attributes_str)
199    except Exception:
200        return {}

Return the attributes of a plugin.

def get_plugins( self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False, **kw: Any) -> List[str]:
203def get_plugins(
204    self,
205    user_id: Optional[int] = None,
206    search_term: Optional[str] = None,
207    debug: bool = False,
208    **kw: Any
209) -> List[str]:
210    """
211    Return a list of plugin names.
212    """
213    plugins_pipe = self.get_plugins_pipe()
214    params = {}
215    if user_id:
216        params['user_id'] = user_id
217
218    df = plugins_pipe.get_data(['plugin_name'], params=params, debug=debug)
219    docs = df.to_dict(orient='records')
220
221    return [
222        doc['plugin_name']
223        for doc in docs
224        if (plugin_name := doc['plugin_name']).startswith(search_term or '')
225    ]

Return a list of plugin names.

def delete_plugin( self, plugin: meerschaum.Plugin, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
228def delete_plugin(
229    self,
230    plugin: 'mrsm.core.Plugin',
231    debug: bool = False,
232    **kw: Any
233) -> SuccessTuple:
234    """
235    Delete a plugin from the plugins table.
236    """
237    plugins_pipe = self.get_plugins_pipe()
238    clear_success, clear_msg = plugins_pipe.clear(params={'plugin_name': plugin.name}, debug=debug)
239    if not clear_success:
240        return clear_success, clear_msg
241
242    keys_vals = self.get_plugin_keys_vals(plugin)
243    try:
244        old_keys_vals = {
245            key: self.get(key)
246            for key in keys_vals
247        }
248    except Exception as e:
249        return False, f"Failed to delete plugin '{plugin.name}':\n{e}"
250
251    try:
252        for key in keys_vals:
253            self.client.delete(key)
254        success, msg = True, "Success"
255    except Exception as e:
256        success = False
257        msg = f"Failed to delete plugin '{plugin.name}':\n{e}"
258
259    if not success:
260        try:
261            for key, old_val in old_keys_vals.items():
262                self.set(key, old_val)
263        except Exception:
264            pass
265
266    return success, msg

Delete a plugin from the plugins table.