meerschaum.connectors.valkey

Import the ValkeyConnector.

 1#! /usr/bin/env python3
 2# vim:fenc=utf-8
 3
 4"""
 5Import the `ValkeyConnector`.
 6"""
 7
 8from meerschaum.connectors.valkey._ValkeyConnector import ValkeyConnector
 9
10__all__ = ('ValkeyConnector',)
@make_connector
class ValkeyConnector(meerschaum.connectors._Connector.Connector):
 19@make_connector
 20class ValkeyConnector(Connector):
 21    """
 22    Manage a Valkey instance.
 23
 24    Build a `ValkeyConnector` from connection attributes or a URI string.
 25    """
 26    IS_INSTANCE: bool = True
 27    REQUIRED_ATTRIBUTES: List[str] = ['host']
 28    OPTIONAL_ATTRIBUTES: List[str] = [
 29        'port', 'username', 'password', 'db', 'socket_timeout',
 30    ]
 31    DEFAULT_ATTRIBUTES: Dict[str, Any] = {
 32        'username': 'default',
 33        'port': 6379,
 34        'db': 0,
 35        'socket_timeout': 300,
 36    }
 37    KEY_SEPARATOR: str = ':'
 38
 39    from ._pipes import (
 40        register_pipe,
 41        get_pipe_id,
 42        get_pipe_attributes,
 43        edit_pipe,
 44        pipe_exists,
 45        drop_pipe,
 46        delete_pipe,
 47        get_pipe_data,
 48        sync_pipe,
 49        get_pipe_columns_types,
 50        clear_pipe,
 51        get_sync_time,
 52        get_pipe_rowcount,
 53        fetch_pipes_keys,
 54    )
 55    from ._fetch import (
 56        fetch,
 57    )
 58
 59    from ._users import (
 60        get_users_pipe,
 61        get_user_key,
 62        get_user_keys_vals,
 63        register_user,
 64        get_user_id,
 65        edit_user,
 66        get_user_attributes,
 67        delete_user,
 68        get_users,
 69        get_user_password_hash,
 70        get_user_type,
 71    )
 72    from ._plugins import (
 73        get_plugins_pipe,
 74        get_plugin_key,
 75        get_plugin_keys_vals,
 76        register_plugin,
 77        get_plugin_id,
 78        get_plugin_version,
 79        get_plugin_user_id,
 80        get_plugin_username,
 81        get_plugin_attributes,
 82        get_plugins,
 83        delete_plugin,
 84    )
 85
 86    @property
 87    def client(self):
 88        """
 89        Return the Valkey client.
 90        """
 91        if '_client' in self.__dict__:
 92            return self.__dict__['_client']
 93
 94        valkey = mrsm.attempt_import('valkey')
 95
 96        if 'uri' in self.__dict__:
 97            self._client = valkey.Valkey.from_url(self.__dict__.get('uri'))
 98            return self._client
 99
100        optional_kwargs = {
101            key: self.__dict__.get(key)
102            for key in self.OPTIONAL_ATTRIBUTES
103            if key in self.__dict__
104        }
105        connection_kwargs = {
106            'host': self.host,
107            **optional_kwargs
108        }
109
110        self._client = valkey.Valkey(**connection_kwargs)
111        return self._client
112
113    @property
114    def URI(self) -> str:
115        """
116        Return the connection URI for this connector.
117        """
118        import urllib.parse
119
120        if 'uri' in self.__dict__:
121            return self.__dict__.get('uri')
122
123        uri = "valkey://"
124        if 'username' in self.__dict__:
125            uri += urllib.parse.quote_plus(self.username) + ':'
126
127        if 'password' in self.__dict__:
128            uri += urllib.parse.quote_plus(self.password) + '@'
129
130        if 'host' in self.__dict__:
131            uri += self.host
132
133        if 'port' in self.__dict__:
134            uri += f':{self.port}'
135
136        if 'db' in self.__dict__:
137            uri += f"/{self.db}"
138
139        if 'socket_timeout' in self.__dict__:
140            uri += f"?timeout={self.socket_timeout}s"
141
142        return uri
143
144    def set(self, key: str, value: Any, **kwargs: Any) -> None:
145        """
146        Set the `key` to `value`.
147        """
148        return self.client.set(key, value, **kwargs)
149
150    def get(self, key: str) -> Union[str, None]:
151        """
152        Get the value for `key`.
153        """
154        val = self.client.get(key)
155        if val is None:
156            return None
157
158        return val.decode('utf-8')
159
160    def test_connection(self) -> bool:
161        """
162        Return whether a connection may be established.
163        """
164        return self.client.ping()
165
166    @classmethod
167    def quote_table(cls, table: str) -> str:
168        """
169        Return a quoted key.
170        """
171        return shlex.quote(table)
172
173    @classmethod
174    def get_counter_key(cls, table: str) -> str:
175        """
176        Return the counter key for a given table.
177        """
178        table_name = cls.quote_table(table)
179        return f"{table_name}:counter"
180
181    def push_df(
182        self,
183        df: 'pd.DataFrame',
184        table: str,
185        datetime_column: Optional[str] = None,
186        debug: bool = False,
187    ) -> int:
188        """
189        Append a pandas DataFrame to a table.
190
191        Parameters
192        ----------
193        df: pd.DataFrame
194            The pandas DataFrame to append to the table.
195
196        table: str
197            The "table" name (root key).
198
199        datetime_column: Optional[str], default None
200            If provided, use this key as the datetime index.
201
202        Returns
203        -------
204        The current index counter value (how many docs have been pushed).
205        """
206        from meerschaum.utils.dataframe import to_json
207        docs_str = to_json(df)
208        docs = json.loads(docs_str)
209        return self.push_docs(
210            docs,
211            table,
212            datetime_column=datetime_column,
213            debug=debug,
214        )
215
216    def push_docs(
217        self,
218        docs: List[Dict[str, Any]],
219        table: str,
220        datetime_column: Optional[str] = None,
221        debug: bool = False,
222    ) -> int:
223        """
224        Append a list of documents to a table.
225
226        Parameters
227        ----------
228        docs: List[Dict[str, Any]]
229            The docs to be pushed.
230            All keys and values will be coerced into strings.
231
232        table: str
233            The "table" name (root key).
234
235        datetime_column: Optional[str], default None
236            If set, create a sorted set with this datetime column as the index.
237            Otherwise push the docs to a list.
238
239        Returns
240        -------
241        The current index counter value (how many docs have been pushed).
242        """
243        from meerschaum.utils.misc import json_serialize_datetime
244        table_name = self.quote_table(table)
245        datetime_column_key = self.get_datetime_column_key(table)
246        remote_datetime_column = self.get(datetime_column_key)
247        datetime_column = datetime_column or remote_datetime_column
248        dateutil_parser = mrsm.attempt_import('dateutil.parser')
249
250        old_len = (
251            self.client.zcard(table_name)
252            if datetime_column
253            else self.client.scard(table_name)
254        )
255        for doc in docs:
256            original_dt_val = (
257                doc[datetime_column]
258                if datetime_column and datetime_column in doc
259                else 0
260            )
261            dt_val = (
262                dateutil_parser.parse(str(original_dt_val))
263                if not isinstance(original_dt_val, int)
264                else int(original_dt_val)
265            ) if datetime_column else None
266            ts = (
267                int(dt_val.replace(tzinfo=timezone.utc).timestamp())
268                if isinstance(dt_val, datetime)
269                else int(dt_val)
270            ) if datetime_column else None
271            doc_str = json.dumps(
272                doc,
273                default=(lambda x: json_serialize_datetime(x) if hasattr(x, 'tzinfo') else str(x)),
274                separators=(',', ':'),
275                sort_keys=True,
276            )
277            if datetime_column:
278                self.client.zadd(table_name, {doc_str: ts})
279            else:
280                self.client.sadd(table_name, doc_str)
281
282        if datetime_column:
283            self.set(datetime_column_key, datetime_column)
284        new_len = (
285            self.client.zcard(table_name)
286            if datetime_column
287            else self.client.scard(table_name)
288        )
289
290        return new_len - old_len
291
292    def _push_hash_docs_to_list(self, docs: List[Dict[str, Any]], table: str) -> int:
293        table_name = self.quote_table(table)
294        next_ix = max(self.client.llen(table_name) or 0, 1)
295        for i, doc in enumerate(docs):
296            doc_key = f"{table_name}:{next_ix + i}"
297            self.client.hset(
298                doc_key,
299                mapping={
300                    str(k): str(v)
301                    for k, v in doc.items()
302                },
303            )
304            self.client.rpush(table_name, doc_key)
305
306        return next_ix + len(docs)
307
308    def get_datetime_column_key(self, table: str) -> str:
309        """
310        Return the key to store the datetime index for `table`.
311        """
312        table_name = self.quote_table(table)
313        return f'{table_name}:datetime_column'
314
315    def read(
316        self,
317        table: str,
318        begin: Union[datetime, int, str, None] = None,
319        end: Union[datetime, int, str, None] = None,
320        params: Optional[Dict[str, Any]] = None,
321        datetime_column: Optional[str] = None,
322        select_columns: Optional[List[str]] = None,
323        omit_columns: Optional[List[str]] = None,
324        debug: bool = False
325    ) -> Union['pd.DataFrame', None]:
326        """
327        Query the table and return the result dataframe.
328
329        Parameters
330        ----------
331        table: str
332            The "table" name to be queried.
333
334        begin: Union[datetime, int, str, None], default None
335            If provided, only return rows greater than or equal to this datetime.
336
337        end: Union[datetime, int, str, None], default None
338            If provided, only return rows older than this datetime.
339
340        params: Optional[Dict[str, Any]]
341            Additional Meerschaum filter parameters.
342
343        datetime_column: Optional[str], default None
344            If provided, use this column for the datetime index.
345            Otherwise infer from the table metadata.
346
347        select_columns: Optional[List[str]], default None
348            If provided, only return these columns.
349
350        omit_columns: Optional[List[str]], default None
351            If provided, do not include these columns in the result.
352
353        Returns
354        -------
355        A Pandas DataFrame of the result, or `None`.
356        """
357        from meerschaum.utils.dataframe import parse_df_datetimes, query_df
358        docs = self.read_docs(
359            table,
360            begin=begin,
361            end=end,
362            debug=debug,
363        )
364        df = parse_df_datetimes(docs)
365        datetime_column_key = self.get_datetime_column_key(table)
366        datetime_column = datetime_column or self.get(datetime_column_key)
367
368        return query_df(
369            df,
370            begin=(begin if datetime_column is not None else None),
371            end=(end if datetime_column is not None else None),
372            params=params,
373            datetime_column=datetime_column,
374            select_columns=select_columns,
375            omit_columns=omit_columns,
376            inplace=True,
377            reset_index=True,
378            debug=debug,
379        )
380
381    def read_docs(
382        self,
383        table: str,
384        begin: Union[datetime, int, str, None] = None,
385        end: Union[datetime, int, str, None] = None,
386        debug: bool = False,
387    ) -> List[Dict[str, str]]:
388        """
389        Return a list of previously pushed docs.
390
391        Parameters
392        ----------
393        table: str
394            The "table" name (root key) under which the docs were pushed.
395
396        begin: Union[datetime, int, str, None], default None
397            If provided and the table was created with a datetime index, only return documents
398            newer than this datetime.
399            If the table was not created with a datetime index and `begin` is an `int`,
400            return documents with a positional index greater than or equal to this value.
401
402        end: Union[datetime, int, str, None], default None
403            If provided and the table was created with a datetime index, only return documents
404            older than this datetime.
405            If the table was not created with a datetime index and `begin` is an `int`,
406            return documents with a positional index less than this value.
407
408        Returns
409        -------
410        A list of dictionaries, where all keys and values are strings.
411        """
412        from meerschaum.utils.dtypes import coerce_timezone
413        table_name = self.quote_table(table)
414        datetime_column_key = self.get_datetime_column_key(table)
415        datetime_column = self.get(datetime_column_key)
416
417        if debug:
418            dprint(f"Reading documents from '{table}' with {begin=}, {end=}")
419
420        if not datetime_column:
421            return [
422                json.loads(doc_bytes.decode('utf-8'))
423                for doc_bytes in self.client.smembers(table_name)
424            ]
425
426        dateutil_parser = mrsm.attempt_import('dateutil.parser')
427
428        if isinstance(begin, str):
429            begin = coerce_timezone(dateutil_parser.parse(begin))
430
431        if isinstance(end, str):
432            end = coerce_timezone(dateutil_parser.parse(end))
433
434        begin_ts = (
435            (
436                int(begin.replace(tzinfo=timezone.utc).timestamp())
437                if isinstance(begin, datetime)
438                else int(begin)
439            )
440            if begin is not None else '-inf'
441        )
442        end_ts = (
443            (
444                int(end.replace(tzinfo=timezone.utc).timestamp())
445                if isinstance(end, datetime)
446                else int(end)
447            )
448            if end is not None else '+inf'
449        )
450
451        if debug:
452            dprint(f"Reading documents with {begin_ts=}, {end_ts=}")
453
454        return [
455            json.loads(doc_bytes.decode('utf-8'))
456            for doc_bytes in self.client.zrangebyscore(
457                table_name,
458                begin_ts,
459                end_ts,
460                withscores=False,
461            )
462        ]
463
464    def _read_docs_from_list(
465        self,
466        table: str,
467        begin_ix: Optional[int] = 0,
468        end_ix: Optional[int] = -1,
469        debug: bool = False,
470    ):
471        """
472        Read a list of documents from a "table".
473
474        Parameters
475        ----------
476        table: str
477            The "table" (root key) from which to read docs.
478
479        begin_ix: Optional[int], default 0
480            If provided, only read documents from this starting index.
481
482        end_ix: Optional[int], default -1
483            If provided, only read documents up to (not including) this index.
484
485        Returns
486        -------
487        A list of documents.
488        """
489        if begin_ix is None:
490            begin_ix = 0
491
492        if end_ix == 0:
493            return
494
495        if end_ix is None:
496            end_ix = -1
497        else:
498            end_ix -= 1
499
500        table_name = self.quote_table(table)
501        doc_keys = self.client.lrange(table_name, begin_ix, end_ix)
502        for doc_key in doc_keys:
503            yield {
504                key.decode('utf-8'): value.decode('utf-8')
505                for key, value in self.client.hgetall(doc_key).items()
506            }
507
508    def drop_table(self, table: str, debug: bool = False) -> None:
509        """
510        Drop a "table" of documents.
511
512        Parameters
513        ----------
514        table: str
515            The "table" name (root key) to be deleted.
516        """
517        table_name = self.quote_table(table)
518        datetime_column_key = self.get_datetime_column_key(table)
519        self.client.delete(table_name)
520        self.client.delete(datetime_column_key)
521
522    @classmethod
523    def get_entity_key(cls, *keys: Any) -> str:
524        """
525        Return a joined key to set an entity.
526        """
527        if not keys:
528            raise ValueError("No keys to be joined.")
529
530        for key in keys:
531            if cls.KEY_SEPARATOR in str(key):
532                raise ValueError(f"Key cannot contain separator '{cls.KEY_SEPARATOR}'.")
533
534        return cls.KEY_SEPARATOR.join([str(key) for key in keys])

Manage a Valkey instance.

Build a ValkeyConnector from connection attributes or a URI string.

IS_INSTANCE: bool = True
REQUIRED_ATTRIBUTES: List[str] = ['host']
OPTIONAL_ATTRIBUTES: List[str] = ['port', 'username', 'password', 'db', 'socket_timeout']
DEFAULT_ATTRIBUTES: Dict[str, Any] = {'username': 'default', 'port': 6379, 'db': 0, 'socket_timeout': 300}
KEY_SEPARATOR: str = ':'
client
 86    @property
 87    def client(self):
 88        """
 89        Return the Valkey client.
 90        """
 91        if '_client' in self.__dict__:
 92            return self.__dict__['_client']
 93
 94        valkey = mrsm.attempt_import('valkey')
 95
 96        if 'uri' in self.__dict__:
 97            self._client = valkey.Valkey.from_url(self.__dict__.get('uri'))
 98            return self._client
 99
100        optional_kwargs = {
101            key: self.__dict__.get(key)
102            for key in self.OPTIONAL_ATTRIBUTES
103            if key in self.__dict__
104        }
105        connection_kwargs = {
106            'host': self.host,
107            **optional_kwargs
108        }
109
110        self._client = valkey.Valkey(**connection_kwargs)
111        return self._client

Return the Valkey client.

URI: str
113    @property
114    def URI(self) -> str:
115        """
116        Return the connection URI for this connector.
117        """
118        import urllib.parse
119
120        if 'uri' in self.__dict__:
121            return self.__dict__.get('uri')
122
123        uri = "valkey://"
124        if 'username' in self.__dict__:
125            uri += urllib.parse.quote_plus(self.username) + ':'
126
127        if 'password' in self.__dict__:
128            uri += urllib.parse.quote_plus(self.password) + '@'
129
130        if 'host' in self.__dict__:
131            uri += self.host
132
133        if 'port' in self.__dict__:
134            uri += f':{self.port}'
135
136        if 'db' in self.__dict__:
137            uri += f"/{self.db}"
138
139        if 'socket_timeout' in self.__dict__:
140            uri += f"?timeout={self.socket_timeout}s"
141
142        return uri

Return the connection URI for this connector.

def set(self, key: str, value: Any, **kwargs: Any) -> None:
144    def set(self, key: str, value: Any, **kwargs: Any) -> None:
145        """
146        Set the `key` to `value`.
147        """
148        return self.client.set(key, value, **kwargs)

Set the key to value.

def get(self, key: str) -> Optional[str]:
150    def get(self, key: str) -> Union[str, None]:
151        """
152        Get the value for `key`.
153        """
154        val = self.client.get(key)
155        if val is None:
156            return None
157
158        return val.decode('utf-8')

Get the value for key.

def test_connection(self) -> bool:
160    def test_connection(self) -> bool:
161        """
162        Return whether a connection may be established.
163        """
164        return self.client.ping()

Return whether a connection may be established.

@classmethod
def quote_table(cls, table: str) -> str:
166    @classmethod
167    def quote_table(cls, table: str) -> str:
168        """
169        Return a quoted key.
170        """
171        return shlex.quote(table)

Return a quoted key.

@classmethod
def get_counter_key(cls, table: str) -> str:
173    @classmethod
174    def get_counter_key(cls, table: str) -> str:
175        """
176        Return the counter key for a given table.
177        """
178        table_name = cls.quote_table(table)
179        return f"{table_name}:counter"

Return the counter key for a given table.

def push_df( self, df: 'pd.DataFrame', table: str, datetime_column: Optional[str] = None, debug: bool = False) -> int:
181    def push_df(
182        self,
183        df: 'pd.DataFrame',
184        table: str,
185        datetime_column: Optional[str] = None,
186        debug: bool = False,
187    ) -> int:
188        """
189        Append a pandas DataFrame to a table.
190
191        Parameters
192        ----------
193        df: pd.DataFrame
194            The pandas DataFrame to append to the table.
195
196        table: str
197            The "table" name (root key).
198
199        datetime_column: Optional[str], default None
200            If provided, use this key as the datetime index.
201
202        Returns
203        -------
204        The current index counter value (how many docs have been pushed).
205        """
206        from meerschaum.utils.dataframe import to_json
207        docs_str = to_json(df)
208        docs = json.loads(docs_str)
209        return self.push_docs(
210            docs,
211            table,
212            datetime_column=datetime_column,
213            debug=debug,
214        )

Append a pandas DataFrame to a table.

Parameters
  • df (pd.DataFrame): The pandas DataFrame to append to the table.
  • table (str): The "table" name (root key).
  • datetime_column (Optional[str], default None): If provided, use this key as the datetime index.
Returns
  • The current index counter value (how many docs have been pushed).
def push_docs( self, docs: List[Dict[str, Any]], table: str, datetime_column: Optional[str] = None, debug: bool = False) -> int:
216    def push_docs(
217        self,
218        docs: List[Dict[str, Any]],
219        table: str,
220        datetime_column: Optional[str] = None,
221        debug: bool = False,
222    ) -> int:
223        """
224        Append a list of documents to a table.
225
226        Parameters
227        ----------
228        docs: List[Dict[str, Any]]
229            The docs to be pushed.
230            All keys and values will be coerced into strings.
231
232        table: str
233            The "table" name (root key).
234
235        datetime_column: Optional[str], default None
236            If set, create a sorted set with this datetime column as the index.
237            Otherwise push the docs to a list.
238
239        Returns
240        -------
241        The current index counter value (how many docs have been pushed).
242        """
243        from meerschaum.utils.misc import json_serialize_datetime
244        table_name = self.quote_table(table)
245        datetime_column_key = self.get_datetime_column_key(table)
246        remote_datetime_column = self.get(datetime_column_key)
247        datetime_column = datetime_column or remote_datetime_column
248        dateutil_parser = mrsm.attempt_import('dateutil.parser')
249
250        old_len = (
251            self.client.zcard(table_name)
252            if datetime_column
253            else self.client.scard(table_name)
254        )
255        for doc in docs:
256            original_dt_val = (
257                doc[datetime_column]
258                if datetime_column and datetime_column in doc
259                else 0
260            )
261            dt_val = (
262                dateutil_parser.parse(str(original_dt_val))
263                if not isinstance(original_dt_val, int)
264                else int(original_dt_val)
265            ) if datetime_column else None
266            ts = (
267                int(dt_val.replace(tzinfo=timezone.utc).timestamp())
268                if isinstance(dt_val, datetime)
269                else int(dt_val)
270            ) if datetime_column else None
271            doc_str = json.dumps(
272                doc,
273                default=(lambda x: json_serialize_datetime(x) if hasattr(x, 'tzinfo') else str(x)),
274                separators=(',', ':'),
275                sort_keys=True,
276            )
277            if datetime_column:
278                self.client.zadd(table_name, {doc_str: ts})
279            else:
280                self.client.sadd(table_name, doc_str)
281
282        if datetime_column:
283            self.set(datetime_column_key, datetime_column)
284        new_len = (
285            self.client.zcard(table_name)
286            if datetime_column
287            else self.client.scard(table_name)
288        )
289
290        return new_len - old_len

Append a list of documents to a table.

Parameters
  • docs (List[Dict[str, Any]]): The docs to be pushed. All keys and values will be coerced into strings.
  • table (str): The "table" name (root key).
  • datetime_column (Optional[str], default None): If set, create a sorted set with this datetime column as the index. Otherwise push the docs to a list.
Returns
  • The current index counter value (how many docs have been pushed).
def get_datetime_column_key(self, table: str) -> str:
308    def get_datetime_column_key(self, table: str) -> str:
309        """
310        Return the key to store the datetime index for `table`.
311        """
312        table_name = self.quote_table(table)
313        return f'{table_name}:datetime_column'

Return the key to store the datetime index for table.

def read( self, table: str, begin: Union[datetime.datetime, int, str, NoneType] = None, end: Union[datetime.datetime, int, str, NoneType] = None, params: Optional[Dict[str, Any]] = None, datetime_column: Optional[str] = None, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, debug: bool = False) -> Optional[ForwardRef('pd.DataFrame')]:
315    def read(
316        self,
317        table: str,
318        begin: Union[datetime, int, str, None] = None,
319        end: Union[datetime, int, str, None] = None,
320        params: Optional[Dict[str, Any]] = None,
321        datetime_column: Optional[str] = None,
322        select_columns: Optional[List[str]] = None,
323        omit_columns: Optional[List[str]] = None,
324        debug: bool = False
325    ) -> Union['pd.DataFrame', None]:
326        """
327        Query the table and return the result dataframe.
328
329        Parameters
330        ----------
331        table: str
332            The "table" name to be queried.
333
334        begin: Union[datetime, int, str, None], default None
335            If provided, only return rows greater than or equal to this datetime.
336
337        end: Union[datetime, int, str, None], default None
338            If provided, only return rows older than this datetime.
339
340        params: Optional[Dict[str, Any]]
341            Additional Meerschaum filter parameters.
342
343        datetime_column: Optional[str], default None
344            If provided, use this column for the datetime index.
345            Otherwise infer from the table metadata.
346
347        select_columns: Optional[List[str]], default None
348            If provided, only return these columns.
349
350        omit_columns: Optional[List[str]], default None
351            If provided, do not include these columns in the result.
352
353        Returns
354        -------
355        A Pandas DataFrame of the result, or `None`.
356        """
357        from meerschaum.utils.dataframe import parse_df_datetimes, query_df
358        docs = self.read_docs(
359            table,
360            begin=begin,
361            end=end,
362            debug=debug,
363        )
364        df = parse_df_datetimes(docs)
365        datetime_column_key = self.get_datetime_column_key(table)
366        datetime_column = datetime_column or self.get(datetime_column_key)
367
368        return query_df(
369            df,
370            begin=(begin if datetime_column is not None else None),
371            end=(end if datetime_column is not None else None),
372            params=params,
373            datetime_column=datetime_column,
374            select_columns=select_columns,
375            omit_columns=omit_columns,
376            inplace=True,
377            reset_index=True,
378            debug=debug,
379        )

Query the table and return the result dataframe.

Parameters
  • table (str): The "table" name to be queried.
  • begin (Union[datetime, int, str, None], default None): If provided, only return rows greater than or equal to this datetime.
  • end (Union[datetime, int, str, None], default None): If provided, only return rows older than this datetime.
  • params (Optional[Dict[str, Any]]): Additional Meerschaum filter parameters.
  • datetime_column (Optional[str], default None): If provided, use this column for the datetime index. Otherwise infer from the table metadata.
  • select_columns (Optional[List[str]], default None): If provided, only return these columns.
  • omit_columns (Optional[List[str]], default None): If provided, do not include these columns in the result.
Returns
  • A Pandas DataFrame of the result, or None.
def read_docs( self, table: str, begin: Union[datetime.datetime, int, str, NoneType] = None, end: Union[datetime.datetime, int, str, NoneType] = None, debug: bool = False) -> List[Dict[str, str]]:
381    def read_docs(
382        self,
383        table: str,
384        begin: Union[datetime, int, str, None] = None,
385        end: Union[datetime, int, str, None] = None,
386        debug: bool = False,
387    ) -> List[Dict[str, str]]:
388        """
389        Return a list of previously pushed docs.
390
391        Parameters
392        ----------
393        table: str
394            The "table" name (root key) under which the docs were pushed.
395
396        begin: Union[datetime, int, str, None], default None
397            If provided and the table was created with a datetime index, only return documents
398            newer than this datetime.
399            If the table was not created with a datetime index and `begin` is an `int`,
400            return documents with a positional index greater than or equal to this value.
401
402        end: Union[datetime, int, str, None], default None
403            If provided and the table was created with a datetime index, only return documents
404            older than this datetime.
405            If the table was not created with a datetime index and `begin` is an `int`,
406            return documents with a positional index less than this value.
407
408        Returns
409        -------
410        A list of dictionaries, where all keys and values are strings.
411        """
412        from meerschaum.utils.dtypes import coerce_timezone
413        table_name = self.quote_table(table)
414        datetime_column_key = self.get_datetime_column_key(table)
415        datetime_column = self.get(datetime_column_key)
416
417        if debug:
418            dprint(f"Reading documents from '{table}' with {begin=}, {end=}")
419
420        if not datetime_column:
421            return [
422                json.loads(doc_bytes.decode('utf-8'))
423                for doc_bytes in self.client.smembers(table_name)
424            ]
425
426        dateutil_parser = mrsm.attempt_import('dateutil.parser')
427
428        if isinstance(begin, str):
429            begin = coerce_timezone(dateutil_parser.parse(begin))
430
431        if isinstance(end, str):
432            end = coerce_timezone(dateutil_parser.parse(end))
433
434        begin_ts = (
435            (
436                int(begin.replace(tzinfo=timezone.utc).timestamp())
437                if isinstance(begin, datetime)
438                else int(begin)
439            )
440            if begin is not None else '-inf'
441        )
442        end_ts = (
443            (
444                int(end.replace(tzinfo=timezone.utc).timestamp())
445                if isinstance(end, datetime)
446                else int(end)
447            )
448            if end is not None else '+inf'
449        )
450
451        if debug:
452            dprint(f"Reading documents with {begin_ts=}, {end_ts=}")
453
454        return [
455            json.loads(doc_bytes.decode('utf-8'))
456            for doc_bytes in self.client.zrangebyscore(
457                table_name,
458                begin_ts,
459                end_ts,
460                withscores=False,
461            )
462        ]

Return a list of previously pushed docs.

Parameters
  • table (str): The "table" name (root key) under which the docs were pushed.
  • begin (Union[datetime, int, str, None], default None): If provided and the table was created with a datetime index, only return documents newer than this datetime. If the table was not created with a datetime index and begin is an int, return documents with a positional index greater than or equal to this value.
  • end (Union[datetime, int, str, None], default None): If provided and the table was created with a datetime index, only return documents older than this datetime. If the table was not created with a datetime index and begin is an int, return documents with a positional index less than this value.
Returns
  • A list of dictionaries, where all keys and values are strings.
def drop_table(self, table: str, debug: bool = False) -> None:
508    def drop_table(self, table: str, debug: bool = False) -> None:
509        """
510        Drop a "table" of documents.
511
512        Parameters
513        ----------
514        table: str
515            The "table" name (root key) to be deleted.
516        """
517        table_name = self.quote_table(table)
518        datetime_column_key = self.get_datetime_column_key(table)
519        self.client.delete(table_name)
520        self.client.delete(datetime_column_key)

Drop a "table" of documents.

Parameters
  • table (str): The "table" name (root key) to be deleted.
@classmethod
def get_entity_key(cls, *keys: Any) -> str:
522    @classmethod
523    def get_entity_key(cls, *keys: Any) -> str:
524        """
525        Return a joined key to set an entity.
526        """
527        if not keys:
528            raise ValueError("No keys to be joined.")
529
530        for key in keys:
531            if cls.KEY_SEPARATOR in str(key):
532                raise ValueError(f"Key cannot contain separator '{cls.KEY_SEPARATOR}'.")
533
534        return cls.KEY_SEPARATOR.join([str(key) for key in keys])

Return a joined key to set an entity.

def register_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
126def register_pipe(
127    self,
128    pipe: mrsm.Pipe,
129    debug: bool = False,
130    **kwargs: Any
131) -> SuccessTuple:
132    """
133    Insert the pipe's attributes into the internal `pipes` table.
134
135    Parameters
136    ----------
137    pipe: mrsm.Pipe
138        The pipe to be registered.
139
140    Returns
141    -------
142    A `SuccessTuple` of the result.
143    """
144    attributes = {
145        'connector_keys': str(pipe.connector_keys),
146        'metric_key': str(pipe.metric_key),
147        'location_key': str(pipe.location_key),
148    }
149    parameters_str = json.dumps(
150        pipe._attributes.get('parameters', {}),
151        separators=(',', ':'),
152    )
153
154    pipe_key = get_pipe_key(pipe)
155    parameters_key = get_pipe_parameters_key(pipe)
156
157    try:
158        existing_pipe_id = self.get(pipe_key)
159        if existing_pipe_id is not None:
160            return False, f"{pipe} is already registered."
161
162        pipe_id = self.client.incr(PIPES_COUNTER)
163        _ = self.push_docs(
164            [{'pipe_id': pipe_id, **attributes}],
165            PIPES_TABLE,
166            datetime_column='pipe_id',
167            debug=debug,
168        )
169        self.set(pipe_key, pipe_id)
170        self.set(parameters_key, parameters_str)
171
172    except Exception as e:
173        return False, f"Failed to register {pipe}:\n{e}"
174
175    return True, "Success"

Insert the pipe's attributes into the internal pipes table.

Parameters
  • pipe (mrsm.Pipe): The pipe to be registered.
Returns
  • A SuccessTuple of the result.
def get_pipe_id( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Union[str, int, NoneType]:
178def get_pipe_id(
179    self,
180    pipe: mrsm.Pipe,
181    debug: bool = False,
182    **kwargs: Any
183) -> Union[str, int, None]:
184    """
185    Return the `_id` for the pipe if it exists.
186
187    Parameters
188    ----------
189    pipe: mrsm.Pipe
190        The pipe whose `_id` to fetch.
191
192    Returns
193    -------
194    The `_id` for the pipe's document or `None`.
195    """
196    pipe_key = get_pipe_key(pipe)
197    try:
198        return int(self.get(pipe_key))
199    except Exception:
200        pass
201    return None

Return the _id for the pipe if it exists.

Parameters
  • pipe (mrsm.Pipe): The pipe whose _id to fetch.
Returns
  • The _id for the pipe's document or None.
def get_pipe_attributes( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Dict[str, Any]:
204def get_pipe_attributes(
205    self,
206    pipe: mrsm.Pipe,
207    debug: bool = False,
208    **kwargs: Any
209) -> Dict[str, Any]:
210    """
211    Return the pipe's document from the internal `pipes` collection.
212
213    Parameters
214    ----------
215    pipe: mrsm.Pipe
216        The pipe whose attributes should be retrieved.
217
218    Returns
219    -------
220    The document that matches the keys of the pipe.
221    """
222    pipe_id = pipe.get_id(debug=debug)
223    if pipe_id is None:
224        return {}
225
226    parameters_key = get_pipe_parameters_key(pipe)
227    parameters_str = self.get(parameters_key)
228
229    parameters = json.loads(parameters_str) if parameters_str else {}
230
231    attributes = {
232        'connector_keys': pipe.connector_keys,
233        'metric_key': pipe.metric_key,
234        'location_key': pipe.location_key,
235        'parameters': parameters,
236        'pipe_id': pipe_id,
237    }
238    return attributes

Return the pipe's document from the internal pipes collection.

Parameters
  • pipe (mrsm.Pipe): The pipe whose attributes should be retrieved.
Returns
  • The document that matches the keys of the pipe.
def edit_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
241def edit_pipe(
242    self,
243    pipe: mrsm.Pipe,
244    debug: bool = False,
245    **kwargs: Any
246) -> mrsm.SuccessTuple:
247    """
248    Edit the attributes of the pipe.
249
250    Parameters
251    ----------
252    pipe: mrsm.Pipe
253        The pipe whose in-memory parameters must be persisted.
254
255    Returns
256    -------
257    A `SuccessTuple` indicating success.
258    """
259    pipe_id = pipe.get_id(debug=debug)
260    if pipe_id is None:
261        return False, f"{pipe} is not registered."
262
263    parameters_key = get_pipe_parameters_key(pipe)
264    parameters_str = json.dumps(pipe.parameters, separators=(',', ':'))
265    self.set(parameters_key, parameters_str)
266    return True, "Success"

Edit the attributes of the pipe.

Parameters
  • pipe (mrsm.Pipe): The pipe whose in-memory parameters must be persisted.
Returns
  • A SuccessTuple indicating success.
def pipe_exists( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> bool:
269def pipe_exists(
270    self,
271    pipe: mrsm.Pipe,
272    debug: bool = False,
273    **kwargs: Any
274) -> bool:
275    """
276    Check whether a pipe's target table exists.
277
278    Parameters
279    ----------
280    pipe: mrsm.Pipe
281        The pipe to check whether its table exists.
282
283    Returns
284    -------
285    A `bool` indicating the table exists.
286    """
287    table_name = self.quote_table(pipe.target)
288    return self.client.exists(table_name) != 0

Check whether a pipe's target table exists.

Parameters
  • pipe (mrsm.Pipe): The pipe to check whether its table exists.
Returns
  • A bool indicating the table exists.
def drop_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
291def drop_pipe(
292    self,
293    pipe: mrsm.Pipe,
294    debug: bool = False,
295    **kwargs: Any
296) -> mrsm.SuccessTuple:
297    """
298    Drop a pipe's collection if it exists.
299
300    Parameters
301    ----------
302    pipe: mrsm.Pipe
303        The pipe to be dropped.
304
305    Returns
306    -------
307    A `SuccessTuple` indicating success.
308    """
309    for chunk_begin, chunk_end in pipe.get_chunk_bounds(debug=debug):
310        clear_chunk_success, clear_chunk_msg = pipe.clear(
311            begin=chunk_begin,
312            end=chunk_end,
313            debug=debug,
314        )
315        if not clear_chunk_success:
316            return clear_chunk_success, clear_chunk_msg
317    try:
318        self.drop_table(pipe.target, debug=debug)
319    except Exception as e:
320        return False, f"Failed to drop {pipe}:\n{e}"
321
322    if 'valkey' not in pipe.parameters:
323        return True, "Success"
324
325    pipe.parameters['valkey']['dtypes'] = {}
326    if not pipe.temporary:
327        edit_success, edit_msg = pipe.edit(debug=debug)
328        if not edit_success:
329            return edit_success, edit_msg
330
331    return True, "Success"

Drop a pipe's collection if it exists.

Parameters
  • pipe (mrsm.Pipe): The pipe to be dropped.
Returns
  • A SuccessTuple indicating success.
def delete_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
334def delete_pipe(
335    self,
336    pipe: mrsm.Pipe,
337    debug: bool = False,
338    **kwargs: Any
339) -> mrsm.SuccessTuple:
340    """
341    Delete a pipe's registration from the `pipes` collection.
342
343    Parameters
344    ----------
345    pipe: mrsm.Pipe
346        The pipe to be deleted.
347
348    Returns
349    -------
350    A `SuccessTuple` indicating success.
351    """
352    drop_success, drop_message = pipe.drop(debug=debug)
353    if not drop_success:
354        return drop_success, drop_message
355
356    pipe_id = self.get_pipe_id(pipe, debug=debug)
357    if pipe_id is None:
358        return False, f"{pipe} is not registered."
359
360    pipe_key = get_pipe_key(pipe)
361    parameters_key = get_pipe_parameters_key(pipe)
362    self.client.delete(pipe_key)
363    self.client.delete(parameters_key)
364    df = self.read(PIPES_TABLE, params={'pipe_id': pipe_id})
365    docs = df.to_dict(orient='records')
366    if docs:
367        doc = docs[0]
368        doc_str = json.dumps(
369            doc,
370            default=(lambda x: json_serialize_datetime(x) if hasattr(x, 'tzinfo') else str(x)),
371            separators=(',', ':'),
372            sort_keys=True,
373        )
374        self.client.zrem(PIPES_TABLE, doc_str)
375    return True, "Success"

Delete a pipe's registration from the pipes collection.

Parameters
  • pipe (mrsm.Pipe): The pipe to be deleted.
Returns
  • A SuccessTuple indicating success.
def get_pipe_data( self, pipe: meerschaum.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kwargs: Any) -> Optional[ForwardRef('pd.DataFrame')]:
378def get_pipe_data(
379    self,
380    pipe: mrsm.Pipe,
381    select_columns: Optional[List[str]] = None,
382    omit_columns: Optional[List[str]] = None,
383    begin: Union[datetime, int, None] = None,
384    end: Union[datetime, int, None] = None,
385    params: Optional[Dict[str, Any]] = None,
386    debug: bool = False,
387    **kwargs: Any
388) -> Union['pd.DataFrame', None]:
389    """
390    Query a pipe's target table and return the DataFrame.
391
392    Parameters
393    ----------
394    pipe: mrsm.Pipe
395        The pipe with the target table from which to read.
396
397    select_columns: Optional[List[str]], default None
398        If provided, only select these given columns.
399        Otherwise select all available columns (i.e. `SELECT *`).
400
401    omit_columns: Optional[List[str]], default None
402        If provided, remove these columns from the selection.
403
404    begin: Union[datetime, int, None], default None
405        The earliest `datetime` value to search from (inclusive).
406
407    end: Union[datetime, int, None], default None
408        The lastest `datetime` value to search from (exclusive).
409
410    params: Optional[Dict[str, str]], default None
411        Additional filters to apply to the query.
412
413    Returns
414    -------
415    The target table's data as a DataFrame.
416    """
417    if not pipe.exists(debug=debug):
418        return None
419
420    from meerschaum.utils.dataframe import query_df, parse_df_datetimes
421    from meerschaum.utils.dtypes import are_dtypes_equal
422
423    valkey_dtypes = pipe.parameters.get('valkey', {}).get('dtypes', {})
424    dt_col = pipe.columns.get('datetime', None)
425    table_name = self.quote_table(pipe.target)
426    indices = [col for col in pipe.columns.values() if col]
427    ix_docs = [
428        string_to_dict(doc.get('ix', '').replace(COLON, ':'))
429        for doc in self.read_docs(
430            pipe.target,
431            begin=begin,
432            end=end,
433            debug=debug,
434        )
435    ]
436    try:
437        docs_strings = [
438            self.get(get_document_key(
439                doc, indices, table_name
440            ))
441            for doc in ix_docs
442        ]
443    except Exception as e:
444        warn(f"Failed to fetch documents for {pipe}:\n{e}")
445        docs_strings = []
446
447    docs = [
448        json.loads(doc_str)
449        for doc_str in docs_strings
450        if doc_str
451    ]
452    ignore_dt_cols = [
453        col
454        for col, dtype in pipe.dtypes.items()
455        if not are_dtypes_equal(str(dtype), 'datetime')
456    ]
457
458    df = parse_df_datetimes(
459        docs,
460        ignore_cols=ignore_dt_cols,
461        chunksize=kwargs.get('chunksize', None),
462        strip_timezone=(pipe.tzinfo is None),
463        debug=debug,
464    )
465    for col, typ in valkey_dtypes.items():
466        try:
467            df[col] = df[col].astype(typ)
468        except Exception:
469            pass
470
471    df = pipe.enforce_dtypes(df, debug=debug)
472
473    if len(df) == 0:
474        return query_df(df, select_columns=select_columns, omit_columns=omit_columns)
475
476    return query_df(
477        df,
478        select_columns=select_columns,
479        omit_columns=omit_columns,
480        params=params,
481        begin=begin,
482        end=end,
483        datetime_column=dt_col,
484        inplace=True,
485        reset_index=True,
486    )

Query a pipe's target table and return the DataFrame.

Parameters
  • pipe (mrsm.Pipe): The pipe with the target table from which to read.
  • select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
  • omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
  • begin (Union[datetime, int, None], default None): The earliest datetime value to search from (inclusive).
  • end (Union[datetime, int, None], default None): The lastest datetime value to search from (exclusive).
  • params (Optional[Dict[str, str]], default None): Additional filters to apply to the query.
Returns
  • The target table's data as a DataFrame.
def sync_pipe( self, pipe: meerschaum.Pipe, df: 'pd.DataFrame' = None, check_existing: bool = True, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
489def sync_pipe(
490    self,
491    pipe: mrsm.Pipe,
492    df: 'pd.DataFrame' = None,
493    check_existing: bool = True,
494    debug: bool = False,
495    **kwargs: Any
496) -> mrsm.SuccessTuple:
497    """
498    Upsert new documents into the pipe's collection.
499
500    Parameters
501    ----------
502    pipe: mrsm.Pipe
503        The pipe whose collection should receive the new documents.
504
505    df: Union['pd.DataFrame', Iterator['pd.DataFrame']], default None
506        The data to be synced.
507
508    check_existing: bool, default True
509        If `False`, do not check the documents against existing data and instead insert directly.
510
511    Returns
512    -------
513    A `SuccessTuple` indicating success.
514    """
515    from meerschaum.utils.dtypes import are_dtypes_equal
516    dt_col = pipe.columns.get('datetime', None)
517    indices = [col for col in pipe.columns.values() if col]
518    table_name = self.quote_table(pipe.target)
519    is_dask = 'dask' in df.__module__
520    if is_dask:
521        df = df.compute()
522    upsert = pipe.parameters.get('upsert', False)
523    static = pipe.parameters.get('static', False)
524
525    def _serialize_indices_docs(_docs):
526        return [
527            {
528                'ix': get_document_key(doc, indices),
529                **(
530                    {
531                        dt_col: doc.get(dt_col, 0)
532                    }
533                    if dt_col
534                    else {}
535                )
536            }
537            for doc in _docs
538        ]
539
540    valkey_dtypes = pipe.parameters.get('valkey', {}).get('dtypes', {})
541    new_dtypes = {
542        str(key): (
543            str(val)
544            if not are_dtypes_equal(str(val), 'datetime')
545            else 'datetime64[ns, UTC]'
546        )
547        for key, val in df.dtypes.items()
548        if str(key) not in valkey_dtypes
549    }
550    for col, typ in {c: v for c, v in valkey_dtypes.items()}.items():
551        if col in df.columns:
552            try:
553                df[col] = df[col].astype(typ)
554            except Exception:
555                valkey_dtypes[col] = 'string'
556                new_dtypes[col] = 'string'
557                df[col] = df[col].astype('string')
558
559    if new_dtypes and (not static or not valkey_dtypes):
560        valkey_dtypes.update(new_dtypes)
561        if 'valkey' not in pipe.parameters:
562            pipe.parameters['valkey'] = {}
563        pipe.parameters['valkey']['dtypes'] = valkey_dtypes
564        if not pipe.temporary:
565            edit_success, edit_msg = pipe.edit(debug=debug)
566            if not edit_success:
567                return edit_success, edit_msg
568
569    unseen_df, update_df, delta_df = (
570        pipe.filter_existing(df, include_unchanged_columns=True, debug=debug)
571        if check_existing and not upsert
572        else (None, df, df)
573    )
574    num_insert = len(unseen_df) if unseen_df is not None else 0
575    num_update = len(update_df) if update_df is not None else 0
576    msg = (
577        f"Inserted {num_insert}, updated {num_update} rows."
578        if not upsert
579        else f"Upserted {num_update} rows."
580    )
581    if len(delta_df) == 0:
582        return True, msg
583
584    unseen_docs = unseen_df.to_dict(orient='records') if unseen_df is not None else []
585    unseen_indices_docs = _serialize_indices_docs(unseen_docs)
586    unseen_ix_vals = {
587        get_document_key(doc, indices, table_name): serialize_document(doc)
588        for doc in unseen_docs
589    }
590    for key, val in unseen_ix_vals.items():
591        try:
592            self.set(key, val)
593        except Exception as e:
594            return False, f"Failed to set keys for {pipe}:\n{e}"
595
596    try:
597        self.push_docs(
598            unseen_indices_docs,
599            pipe.target,
600            datetime_column=dt_col,
601            debug=debug,
602        )
603    except Exception as e:
604        return False, f"Failed to push docs to '{pipe.target}':\n{e}"
605
606    update_docs = update_df.to_dict(orient='records') if update_df is not None else []
607    update_ix_docs = {
608        get_document_key(doc, indices, table_name): doc
609        for doc in update_docs
610    }
611    existing_docs_data = {
612        key: self.get(key)
613        for key in update_ix_docs
614    } if pipe.exists(debug=debug) else {}
615    existing_docs = {
616        key: json.loads(data)
617        for key, data in existing_docs_data.items()
618        if data
619    }
620    new_update_docs = {
621        key: doc
622        for key, doc in update_ix_docs.items()
623        if key not in existing_docs
624    }
625    new_ix_vals = {
626        get_document_key(doc, indices, table_name): serialize_document(doc)
627        for doc in new_update_docs.values()
628    }
629    for key, val in new_ix_vals.items():
630        try:
631            self.set(key, val)
632        except Exception as e:
633            return False, f"Failed to set keys for {pipe}:\n{e}"
634
635    old_update_docs = {
636        key: {
637            **existing_docs[key],
638            **doc
639        }
640        for key, doc in update_ix_docs.items()
641        if key in existing_docs
642    }
643    new_indices_docs = _serialize_indices_docs([doc for doc in new_update_docs.values()])
644    try:
645        if new_indices_docs:
646            self.push_docs(
647                new_indices_docs,
648                pipe.target,
649                datetime_column=dt_col,
650                debug=debug,
651            )
652    except Exception as e:
653        return False, f"Failed to upsert '{pipe.target}':\n{e}"
654
655    for key, doc in old_update_docs.items():
656        try:
657            self.set(key, serialize_document(doc))
658        except Exception as e:
659            return False, f"Failed to set keys for {pipe}:\n{e}"
660
661    return True, msg

Upsert new documents into the pipe's collection.

Parameters
  • pipe (mrsm.Pipe): The pipe whose collection should receive the new documents.
  • df (Union['pd.DataFrame', Iterator['pd.DataFrame']], default None): The data to be synced.
  • check_existing (bool, default True): If False, do not check the documents against existing data and instead insert directly.
Returns
  • A SuccessTuple indicating success.
def get_pipe_columns_types( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Dict[str, str]:
664def get_pipe_columns_types(
665    self,
666    pipe: mrsm.Pipe,
667    debug: bool = False,
668    **kwargs: Any
669) -> Dict[str, str]:
670    """
671    Return the data types for the columns in the target table for data type enforcement.
672
673    Parameters
674    ----------
675    pipe: mrsm.Pipe
676        The pipe whose target table contains columns and data types.
677
678    Returns
679    -------
680    A dictionary mapping columns to data types.
681    """
682    if not pipe.exists(debug=debug):
683        return {}
684
685    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
686    return {
687        col: get_db_type_from_pd_type(typ, flavor='postgresql')
688        for col, typ in pipe.parameters.get('valkey', {}).get('dtypes', {}).items()
689    }

Return the data types for the columns in the target table for data type enforcement.

Parameters
  • pipe (mrsm.Pipe): The pipe whose target table contains columns and data types.
Returns
  • A dictionary mapping columns to data types.
def clear_pipe( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> Tuple[bool, str]:
692def clear_pipe(
693    self,
694    pipe: mrsm.Pipe,
695    begin: Union[datetime, int, None] = None,
696    end: Union[datetime, int, None] = None,
697    params: Optional[Dict[str, Any]] = None,
698    debug: bool = False,
699) -> mrsm.SuccessTuple:
700    """
701    Delete rows within `begin`, `end`, and `params`.
702
703    Parameters
704    ----------
705    pipe: mrsm.Pipe
706        The pipe whose rows to clear.
707
708    begin: Union[datetime, int, None], default None
709        If provided, remove rows >= `begin`.
710
711    end: Union[datetime, int, None], default None
712        If provided, remove rows < `end`.
713
714    params: Optional[Dict[str, Any]], default None
715        If provided, only remove rows which match the `params` filter.
716
717    Returns
718    -------
719    A `SuccessTuple` indicating success.
720    """
721    dt_col = pipe.columns.get('datetime', None)
722
723    existing_df = pipe.get_data(
724        begin=begin,
725        end=end,
726        params=params,
727        debug=debug,
728    )
729    if existing_df is None or len(existing_df) == 0:
730        return True, "Deleted 0 rows."
731
732    docs = existing_df.to_dict(orient='records')
733    table_name = self.quote_table(pipe.target)
734    indices = [col for col in pipe.columns.values() if col]
735    for doc in docs:
736        set_doc_key = get_document_key(doc, indices)
737        table_doc_key = get_document_key(doc, indices, table_name)
738        try:
739            if dt_col:
740                self.client.zrem(table_name, set_doc_key)
741            else:
742                self.client.srem(table_name, set_doc_key)
743            self.client.delete(table_doc_key)
744        except Exception as e:
745            return False, f"Failed to delete documents:\n{e}"
746    msg = (
747        f"Deleted {len(docs)} row"
748        + ('s' if len(docs) != 1 else '')
749        + '.'
750    )
751    return True, msg

Delete rows within begin, end, and params.

Parameters
  • pipe (mrsm.Pipe): The pipe whose rows to clear.
  • begin (Union[datetime, int, None], default None): If provided, remove rows >= begin.
  • end (Union[datetime, int, None], default None): If provided, remove rows < end.
  • params (Optional[Dict[str, Any]], default None): If provided, only remove rows which match the params filter.
Returns
  • A SuccessTuple indicating success.
def get_sync_time( self, pipe: meerschaum.Pipe, newest: bool = True, **kwargs: Any) -> Union[datetime.datetime, int, NoneType]:
754def get_sync_time(
755    self,
756    pipe: mrsm.Pipe,
757    newest: bool = True,
758    **kwargs: Any
759) -> Union[datetime, int, None]:
760    """
761    Return the newest (or oldest) timestamp in a pipe.
762    """
763    from meerschaum.utils.dtypes import are_dtypes_equal
764    dt_col = pipe.columns.get('datetime', None)
765    dt_typ = pipe.dtypes.get(dt_col, 'datetime64[ns, UTC]')
766    if not dt_col:
767        return None
768
769    dateutil_parser = mrsm.attempt_import('dateutil.parser')
770    table_name = self.quote_table(pipe.target)
771    try:
772        vals = (
773            self.client.zrevrange(table_name, 0, 0)
774            if newest
775            else self.client.zrange(table_name, 0, 0)
776        )
777        if not vals:
778            return None
779        val = vals[0]
780    except Exception:
781        return None
782
783    doc = json.loads(val)
784    dt_val = doc.get(dt_col, None)
785    if dt_val is None:
786        return None
787
788    try:
789        return (
790            int(dt_val)
791            if are_dtypes_equal(dt_typ, 'int')
792            else dateutil_parser.parse(str(dt_val))
793        )
794    except Exception as e:
795        warn(f"Failed to parse sync time for {pipe}:\n{e}")
796
797    return None

Return the newest (or oldest) timestamp in a pipe.

def get_pipe_rowcount( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kwargs: Any) -> Optional[int]:
800def get_pipe_rowcount(
801    self,
802    pipe: mrsm.Pipe,
803    begin: Union[datetime, int, None] = None,
804    end: Union[datetime, int, None] = None,
805    params: Optional[Dict[str, Any]] = None,
806    debug: bool = False,
807    **kwargs: Any
808) -> Union[int, None]:
809    """
810    Return the number of documents in the pipe's set.
811    """
812    dt_col = pipe.columns.get('datetime', None)
813    table_name = self.quote_table(pipe.target)
814
815    if not pipe.exists():
816        return 0
817
818    try:
819        if begin is None and end is None and params is None:
820            return (
821                self.client.zcard(table_name)
822                if dt_col
823                else self.client.llen(table_name)
824            )
825    except Exception:
826        return None
827
828    df = pipe.get_data(begin=begin, end=end, params=params, debug=debug)
829    if df is None:
830        return 0
831
832    return len(df)

Return the number of documents in the pipe's set.

def fetch_pipes_keys( self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> Optional[List[Tuple[str, str, Optional[str]]]]:
835def fetch_pipes_keys(
836    self,
837    connector_keys: Optional[List[str]] = None,
838    metric_keys: Optional[List[str]] = None,
839    location_keys: Optional[List[str]] = None,
840    tags: Optional[List[str]] = None,
841    params: Optional[Dict[str, Any]] = None,
842    debug: bool = False
843) -> Optional[List[Tuple[str, str, Optional[str]]]]:
844    """
845    Return the keys for the registered pipes.
846    """
847    from meerschaum.utils.dataframe import query_df
848    from meerschaum.utils.misc import separate_negation_values
849    try:
850        df = self.read(PIPES_TABLE, debug=debug)
851    except Exception:
852        return []
853
854    if df is None or len(df) == 0:
855        return []
856
857    query = {}
858    if connector_keys:
859        query['connector_keys'] = [str(k) for k in connector_keys]
860    if metric_keys:
861        query['metric_key'] = [str(k) for k in metric_keys]
862    if location_keys:
863        query['location_key'] = [str(k) for k in location_keys]
864    if params:
865        query.update(params)
866
867    df = query_df(df, query, inplace=True)
868
869    keys = [
870        (
871            doc['connector_keys'],
872            doc['metric_key'],
873            doc['location_key'],
874        )
875        for doc in df.to_dict(orient='records')
876    ]
877    if not tags:
878        return keys
879
880    tag_groups = [tag.split(',') for tag in tags]
881    in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups]
882
883    filtered_keys = []
884    for ck, mk, lk in keys:
885        pipe = mrsm.Pipe(ck, mk, lk, instance=self)
886        pipe_tags = set(pipe.tags)
887        
888        include_pipe = True
889        for in_tags, ex_tags in in_ex_tag_groups:
890            all_in = all(tag in pipe_tags for tag in in_tags)
891            any_ex = any(tag in pipe_tags for tag in ex_tags)
892
893            if (not all_in) or any_ex:
894                include_pipe = False
895                continue
896
897        if include_pipe:
898            filtered_keys.append((ck, mk, lk))
899
900    return filtered_keys

Return the keys for the registered pipes.

def fetch( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kwargs: Any) -> List[Dict[str, Any]]:
17def fetch(
18    self,
19    pipe: mrsm.Pipe,
20    begin: Union[datetime, int, None] = None,
21    end: Union[datetime, int, None] = None,
22    params: Optional[Dict[str, Any]] = None,
23    debug: bool = False,
24    **kwargs: Any
25) -> List[Dict[str, Any]]:
26    """
27    Return data from a source database.
28    """
29    source_key = pipe.parameters.get('valkey', {}).get('key', None)
30    if not source_key:
31        return []
32
33    try:
34        key_type = self.client.type(source_key).decode('utf-8')
35    except Exception:
36        warn(f"Could not determine the type for key '{source_key}'.")
37        return []
38
39    begin_ts = (
40        (
41            int(begin.replace(tzinfo=timezone.utc).timestamp())
42            if isinstance(begin, datetime)
43            else int(begin)
44        )
45        if begin is not None else '-inf'
46    )
47    end_ts = (
48        (
49            int(end.replace(tzinfo=timezone.utc).timestamp())
50            if isinstance(end, datetime)
51            else int(end)
52        )
53        if end is not None else '+inf'
54    )
55
56    if debug:
57        dprint(f"Reading documents with {begin_ts=}, {end_ts=}")
58
59    if key_type == 'set':
60        return [
61            json.loads(doc_bytes.decode('utf-8'))
62            for doc_bytes in self.client.smembers(source_key)
63        ]
64
65    if key_type == 'zset':
66        return [
67            json.loads(doc_bytes.decode('utf-8'))
68            for doc_bytes in self.client.zrangebyscore(
69                source_key,
70                begin_ts,
71                end_ts,
72                withscores=False,
73            )
74        ]
75
76    return [{source_key: self.get(source_key)}]

Return data from a source database.

def get_users_pipe(self):
20def get_users_pipe(self):
21    """
22    Return the pipe which stores the registered users.
23    """
24    return mrsm.Pipe(
25        'mrsm', 'users',
26        columns=['user_id'],
27        temporary=True,
28        target=USERS_TABLE,
29        instance=self,
30    )

Return the pipe which stores the registered users.

@classmethod
def get_user_key( cls, user_id_or_username: str, sub_key: str, by_username: bool = False) -> str:
33@classmethod
34def get_user_key(cls, user_id_or_username: str, sub_key: str, by_username: bool = False) -> str:
35    """
36    Return the key to store metadata about a user.
37
38    Parameters
39    ----------
40    user_id_or_username: str
41        The user ID or username of the given user.
42        If `by_username` is `True`, then provide the username.
43
44    sub_key: str
45        The key suffix, e.g. `'attributes'`.
46
47    by_username: bool, default False
48        If `True`, then treat `user_id_or_username` as a username.
49
50    Returns
51    -------
52    A key to store information about a user.
53
54    Examples
55    --------
56    >>> get_user_key('deadbeef', 'attributes')
57    'mrsm_user:user_id:deadbeef:attributes'
58    >>> get_user_key('foo', 'user_id', by_username=True)
59    'mrsm_user:username:foo:user_id'
60    """
61    key_type = 'user_id' if not by_username else 'username'
62    return cls.get_entity_key(USER_PREFIX, key_type, user_id_or_username, sub_key)

Return the key to store metadata about a user.

Parameters
  • user_id_or_username (str): The user ID or username of the given user. If by_username is True, then provide the username.
  • sub_key (str): The key suffix, e.g. 'attributes'.
  • by_username (bool, default False): If True, then treat user_id_or_username as a username.
Returns
  • A key to store information about a user.
Examples
>>> get_user_key('deadbeef', 'attributes')
'mrsm_user:user_id:deadbeef:attributes'
>>> get_user_key('foo', 'user_id', by_username=True)
'mrsm_user:username:foo:user_id'
@classmethod
def get_user_keys_vals( cls, user: meerschaum.core.User._User.User, mutable_only: bool = False) -> Dict[str, str]:
 65@classmethod
 66def get_user_keys_vals(
 67    cls,
 68    user: 'mrsm.core.User',
 69    mutable_only: bool = False,
 70) -> Dict[str, str]:
 71    """
 72    Return a dictionary containing keys and values to set for the user.
 73
 74    Parameters
 75    ----------
 76    user: mrsm.core.User
 77        The user for which to generate the keys.
 78
 79    mutable_only: bool, default False
 80        If `True`, only return keys which may be edited.
 81
 82    Returns
 83    -------
 84    A dictionary mapping a user's keys to values.
 85    """
 86    user_attributes_str = json.dumps(user.attributes, separators=(',', ':'))
 87    mutable_keys_vals = {
 88        cls.get_user_key(user.user_id, 'attributes'): user_attributes_str,
 89        cls.get_user_key(user.user_id, 'email'): user.email,
 90        cls.get_user_key(user.user_id, 'type'): user.type,
 91        cls.get_user_key(user.user_id, 'password_hash'): user.password_hash,
 92    }
 93    if mutable_only:
 94        return mutable_keys_vals
 95
 96    immutable_keys_vals = {
 97        cls.get_user_key(user.user_id, 'username'): user.username,
 98        cls.get_user_key(user.username, 'user_id', by_username=True): user.user_id,
 99    }
100
101    return {**immutable_keys_vals, **mutable_keys_vals}

Return a dictionary containing keys and values to set for the user.

Parameters
  • user (mrsm.core.User): The user for which to generate the keys.
  • mutable_only (bool, default False): If True, only return keys which may be edited.
Returns
  • A dictionary mapping a user's keys to values.
def register_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
104def register_user(
105    self,
106    user: 'mrsm.core.User',
107    debug: bool = False,
108    **kwargs: Any
109) -> SuccessTuple:
110    """
111    Register a new user.
112    """
113    from meerschaum.utils.misc import generate_password
114
115    user.user_id = generate_password(12)
116    users_pipe = self.get_users_pipe()
117    keys_vals = self.get_user_keys_vals(user)
118
119    try:
120        sync_success, sync_msg = users_pipe.sync(
121            [
122                {
123                    'user_id': user.user_id,
124                    'username': user.username,
125                },
126            ],
127            check_existing=False,
128            debug=debug,
129        )
130        if not sync_success:
131            return sync_success, sync_msg
132
133        for key, val in keys_vals.items():
134            if val is not None:
135                self.set(key, val)
136
137        success, msg = True, "Success"
138    except Exception as e:
139        success = False
140        import traceback
141        traceback.print_exc()
142        msg = f"Failed to register '{user.username}':\n{e}"
143
144    if not success:
145        for key in keys_vals:
146            try:
147                self.client.delete(key)
148            except Exception:
149                pass
150
151    return success, msg

Register a new user.

def get_user_id( self, user: meerschaum.core.User._User.User, debug: bool = False) -> Optional[str]:
154def get_user_id(self, user: 'mrsm.core.User', debug: bool = False) -> Union[str, None]:
155    """
156    Return the ID for a user, or `None`.
157    """
158    username_user_id_key = self.get_user_key(user.username, 'user_id', by_username=True)
159    try:
160        user_id = self.get(username_user_id_key)
161    except Exception:
162        user_id = None
163    return user_id

Return the ID for a user, or None.

def edit_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
166def edit_user(
167    self,
168    user: 'mrsm.core.User',
169    debug: bool = False,
170    **kw: Any
171) -> SuccessTuple:
172    """
173    Edit the attributes for an existing user.
174    """
175    keys_vals = self.get_user_keys_vals(user, mutable_only=True)
176    try:
177        old_keys_vals = {
178            key: self.get(key)
179            for key in keys_vals
180        }
181    except Exception as e:
182        return False, f"Failed to edit user:\n{e}"
183
184    try:
185        for key, val in keys_vals.items():
186            self.set(key, val)
187        success, msg = True, "Success"
188    except Exception as e:
189        success = False
190        msg = f"Failed to edit user:\n{e}"
191
192    if not success:
193        try:
194            for key, old_val in old_keys_vals.items():
195                self.set(key, old_val)
196        except Exception:
197            pass
198
199    return success, msg

Edit the attributes for an existing user.

def get_user_attributes( self, user: meerschaum.core.User._User.User, debug: bool = False) -> Optional[Dict[str, Any]]:
202def get_user_attributes(
203    self,
204    user: 'mrsm.core.User',
205    debug: bool = False
206) -> Union[Dict[str, Any], None]:
207    """
208    Return the user's attributes.
209    """
210    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
211    user_id_attributes_key = self.get_user_key(user_id, 'attributes')
212    try:
213        return json.loads(self.get(user_id_attributes_key))
214    except Exception:
215        return None

Return the user's attributes.

def delete_user( self, user: meerschaum.core.User._User.User, debug: bool = False) -> Tuple[bool, str]:
218def delete_user(
219    self,
220    user: 'mrsm.core.User',
221    debug: bool = False
222) -> SuccessTuple:
223    """
224    Delete a user's keys.
225    """
226    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
227    users_pipe = self.get_users_pipe()
228    keys_vals = self.get_user_keys_vals(user)
229    try:
230        old_keys_vals = {
231            key: self.get(key)
232            for key in keys_vals
233        }
234    except Exception as e:
235        return False, f"Failed to delete user:\n{e}"
236
237    clear_success, clear_msg = users_pipe.clear(params={'user_id': user_id})
238    if not clear_success:
239        return clear_success, clear_msg
240
241    try:
242        for key in keys_vals:
243            self.client.delete(key)
244        success, msg = True, "Success"
245    except Exception as e:
246        success = False
247        msg = f"Failed to delete user:\n{e}"
248
249    if not success:
250        try:
251            for key, old_val in old_keys_vals.items():
252                self.set(key, old_val)
253        except Exception:
254            pass
255
256    return success, msg

Delete a user's keys.

def get_users(self, debug: bool = False, **kw: Any) -> List[str]:
259def get_users(
260    self,
261    debug: bool = False,
262    **kw: Any
263) -> List[str]:
264    """
265    Get the registered usernames.
266    """
267    users_pipe = self.get_users_pipe()
268    df = users_pipe.get_data()
269    if df is None:
270        return []
271
272    return list(df['username'])

Get the registered usernames.

def get_user_password_hash( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
275def get_user_password_hash(
276    self,
277    user: 'mrsm.core.User',
278    debug: bool = False,
279    **kw: Any
280) -> Union[str, None]:
281    """
282    Return the password has for a user.
283    """
284    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
285    user_id_password_hash_key = self.get_user_key(user_id, 'password_hash')
286    try:
287        return self.get(user_id_password_hash_key)
288    except Exception:
289        return None

Return the password has for a user.

def get_user_type( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
292def get_user_type(
293    self,
294    user: 'mrsm.core.User',
295    debug: bool = False,
296    **kw: Any
297) -> Union[str, None]:
298    """
299    Return the user's type.
300    """
301    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
302    user_id_type_key = self.get_user_key(user_id, 'type')
303    try:
304        return self.get(user_id_type_key)
305    except Exception:
306        return None

Return the user's type.

def get_plugins_pipe(self) -> meerschaum.Pipe:
20def get_plugins_pipe(self) -> mrsm.Pipe:
21    """
22    Return the pipe to store the plugins.
23    """
24    return mrsm.Pipe(
25        'mrsm', 'plugins',
26        columns=['plugin_name'],
27        temporary=True,
28        target=PLUGINS_TABLE,
29        instance=self,
30    )

Return the pipe to store the plugins.

@classmethod
def get_plugin_key(cls, plugin_name: str, sub_key: str) -> str:
33@classmethod
34def get_plugin_key(cls, plugin_name: str, sub_key: str) -> str:
35    """
36    Return the key for a plugin's attribute.
37    """
38    return cls.get_entity_key(PLUGIN_PREFIX, plugin_name, sub_key)

Return the key for a plugin's attribute.

@classmethod
def get_plugin_keys_vals( cls, plugin: meerschaum.Plugin, mutable_only: bool = False) -> Dict[str, str]:
41@classmethod
42def get_plugin_keys_vals(
43    cls,
44    plugin: 'mrsm.core.Plugin',
45    mutable_only: bool = False,
46) -> Dict[str, str]:
47    """
48    Return a dictionary containing keys and values to set for the plugin.
49
50    Parameters
51    ----------
52    plugin: mrsm.core.Plugin
53        The plugin for which to generate the keys.
54
55    mutable_only: bool, default False
56        If `True`, only return keys which may be edited.
57
58    Returns
59    -------
60    A dictionary mapping a plugins's keys to values.
61    """
62    plugin_attributes_str = json.dumps(plugin.attributes, separators=(',', ':'))
63    mutable_keys_vals = {
64        cls.get_plugin_key(plugin.name, 'attributes'): plugin_attributes_str,
65        cls.get_plugin_key(plugin.name, 'version'): plugin.version,
66    }
67    if mutable_only:
68        return mutable_keys_vals
69
70    immutable_keys_vals = {
71        cls.get_plugin_key(plugin.name, 'user_id'): plugin.user_id,
72    }
73
74    return {**immutable_keys_vals, **mutable_keys_vals}

Return a dictionary containing keys and values to set for the plugin.

Parameters
  • plugin (mrsm.core.Plugin): The plugin for which to generate the keys.
  • mutable_only (bool, default False): If True, only return keys which may be edited.
Returns
  • A dictionary mapping a plugins's keys to values.
def register_plugin( self, plugin: meerschaum.Plugin, force: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
 77def register_plugin(
 78    self,
 79    plugin: 'mrsm.core.Plugin',
 80    force: bool = False,
 81    debug: bool = False,
 82    **kw: Any
 83) -> SuccessTuple:
 84    """Register a new plugin to the `mrsm_plugins` "table"."""
 85    from meerschaum.utils.misc import generate_password
 86
 87    plugins_pipe = self.get_plugins_pipe()
 88    keys_vals = self.get_plugin_keys_vals(plugin)
 89
 90    try:
 91        sync_success, sync_msg = plugins_pipe.sync(
 92            [
 93                {
 94                    'plugin_name': plugin.name,
 95                    'user_id': plugin.user_id,
 96                },
 97            ],
 98            check_existing=False,
 99            debug=debug,
100        )
101        if not sync_success:
102            return sync_success, sync_msg
103
104        for key, val in keys_vals.items():
105            if val is not None:
106                self.set(key, val)
107
108        success, msg = True, "Success"
109    except Exception as e:
110        success = False
111        msg = f"Failed to register plugin '{plugin.name}':\n{e}"
112
113    if not success:
114        for key in keys_vals:
115            try:
116                self.client.delete(key)
117            except Exception:
118                pass
119
120    return success, msg

Register a new plugin to the mrsm_plugins "table".

def get_plugin_id( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[str]:
123def get_plugin_id(
124    self,
125    plugin: 'mrsm.core.Plugin',
126    debug: bool = False
127) -> Union[str, None]:
128    """
129    Return a plugin's ID.
130    """
131    return plugin.name

Return a plugin's ID.

def get_plugin_version( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[str]:
134def get_plugin_version(
135    self,
136    plugin: 'mrsm.core.Plugin',
137    debug: bool = False,
138) -> Union[str, None]:
139    """
140    Return a plugin's version.
141    """
142    version_key = self.get_plugin_key(plugin.name, 'version')
143
144    try:
145        return self.get(version_key)
146    except Exception:
147        return None

Return a plugin's version.

def get_plugin_user_id( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[str]:
150def get_plugin_user_id(
151    self,
152    plugin: 'mrsm.core.Plugin',
153    debug: bool = False
154) -> Union[str, None]:
155    """
156    Return a plugin's user ID.
157    """
158    user_id_key = self.get_plugin_key(plugin.name, 'user_id')
159
160    try:
161        return self.get(user_id_key)
162    except Exception:
163        return None

Return a plugin's user ID.

def get_plugin_username( self, plugin: meerschaum.Plugin, debug: bool = False) -> str:
166def get_plugin_username(
167    self,
168    plugin: 'mrsm.core.Plugin',
169    debug: bool = False
170) -> Union[str]:
171    """
172    Return the username of a plugin's owner.
173    """
174    user_id = self.get_plugin_user_id(plugin, debug=debug)
175    if user_id is None:
176        return None
177
178    username_key = self.get_user_key(user_id, 'username')
179    try:
180        return self.get(username_key)
181    except Exception:
182        return None

Return the username of a plugin's owner.

def get_plugin_attributes( self, plugin: meerschaum.Plugin, debug: bool = False) -> Dict[str, Any]:
185def get_plugin_attributes(
186    self,
187    plugin: 'mrsm.core.Plugin',
188    debug: bool = False
189) -> Dict[str, Any]:
190    """
191    Return the attributes of a plugin.
192    """
193    attributes_key = self.get_plugin_key(plugin.name, 'attributes')
194    try:
195        attributes_str = self.get(attributes_key)
196        if not attributes_str:
197            return {}
198        return json.loads(attributes_str)
199    except Exception:
200        return {}

Return the attributes of a plugin.

def get_plugins( self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False, **kw: Any) -> List[str]:
203def get_plugins(
204    self,
205    user_id: Optional[int] = None,
206    search_term: Optional[str] = None,
207    debug: bool = False,
208    **kw: Any
209) -> List[str]:
210    """
211    Return a list of plugin names.
212    """
213    plugins_pipe = self.get_plugins_pipe()
214    params = {}
215    if user_id:
216        params['user_id'] = user_id
217
218    df = plugins_pipe.get_data(['plugin_name'], params=params, debug=debug)
219    docs = df.to_dict(orient='records')
220
221    return [
222        doc['plugin_name']
223        for doc in docs
224        if (plugin_name := doc['plugin_name']).startswith(search_term or '')
225    ]

Return a list of plugin names.

def delete_plugin( self, plugin: meerschaum.Plugin, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
228def delete_plugin(
229    self,
230    plugin: 'mrsm.core.Plugin',
231    debug: bool = False,
232    **kw: Any
233) -> SuccessTuple:
234    """
235    Delete a plugin from the plugins table.
236    """
237    plugins_pipe = self.get_plugins_pipe()
238    clear_success, clear_msg = plugins_pipe.clear(params={'plugin_name': plugin.name}, debug=debug)
239    if not clear_success:
240        return clear_success, clear_msg
241
242    keys_vals = self.get_plugin_keys_vals(plugin)
243    try:
244        old_keys_vals = {
245            key: self.get(key)
246            for key in keys_vals
247        }
248    except Exception as e:
249        return False, f"Failed to delete plugin '{plugin.name}':\n{e}"
250
251    try:
252        for key in keys_vals:
253            self.client.delete(key)
254        success, msg = True, "Success"
255    except Exception as e:
256        success = False
257        msg = f"Failed to delete plugin '{plugin.name}':\n{e}"
258
259    if not success:
260        try:
261            for key, old_val in old_keys_vals.items():
262                self.set(key, old_val)
263        except Exception:
264            pass
265
266    return success, msg

Delete a plugin from the plugins table.