meerschaum.connectors.valkey

Import the ValkeyConnector.

 1#! /usr/bin/env python3
 2# vim:fenc=utf-8
 3
 4"""
 5Import the `ValkeyConnector`.
 6"""
 7
 8from meerschaum.connectors.valkey._ValkeyConnector import ValkeyConnector
 9
10__all__ = ('ValkeyConnector',)
@make_connector
class ValkeyConnector(meerschaum.connectors._Connector.Connector):
 19@make_connector
 20class ValkeyConnector(Connector):
 21    """
 22    Manage a Valkey instance.
 23
 24    Build a `ValkeyConnector` from connection attributes or a URI string.
 25    """
 26    IS_INSTANCE: bool = True
 27    REQUIRED_ATTRIBUTES: List[str] = ['host']
 28    OPTIONAL_ATTRIBUTES: List[str] = [
 29        'port', 'username', 'password', 'db', 'socket_timeout',
 30    ]
 31    DEFAULT_ATTRIBUTES: Dict[str, Any] = {
 32        'username': 'default',
 33        'port': 6379,
 34        'db': 0,
 35        'socket_timeout': 300,
 36    }
 37    KEY_SEPARATOR: str = ':'
 38
 39    from ._pipes import (
 40        register_pipe,
 41        get_pipe_id,
 42        get_pipe_attributes,
 43        edit_pipe,
 44        pipe_exists,
 45        drop_pipe,
 46        delete_pipe,
 47        get_pipe_data,
 48        sync_pipe,
 49        get_pipe_columns_types,
 50        clear_pipe,
 51        get_sync_time,
 52        get_pipe_rowcount,
 53        fetch_pipes_keys,
 54    )
 55    from ._fetch import (
 56        fetch,
 57    )
 58
 59    from ._users import (
 60        get_users_pipe,
 61        get_user_key,
 62        get_user_keys_vals,
 63        register_user,
 64        get_user_id,
 65        edit_user,
 66        get_user_attributes,
 67        delete_user,
 68        get_users,
 69        get_user_password_hash,
 70        get_user_type,
 71    )
 72    from ._plugins import (
 73        get_plugins_pipe,
 74        get_plugin_key,
 75        get_plugin_keys_vals,
 76        register_plugin,
 77        get_plugin_id,
 78        get_plugin_version,
 79        get_plugin_user_id,
 80        get_plugin_username,
 81        get_plugin_attributes,
 82        get_plugins,
 83        delete_plugin,
 84    )
 85
 86    @property
 87    def client(self):
 88        """
 89        Return the Valkey client.
 90        """
 91        if '_client' in self.__dict__:
 92            return self.__dict__['_client']
 93
 94        valkey = mrsm.attempt_import('valkey')
 95
 96        if 'uri' in self.__dict__:
 97            self._client = valkey.Valkey.from_url(self.__dict__.get('uri'))
 98            return self._client
 99
100        optional_kwargs = {
101            key: self.__dict__.get(key)
102            for key in self.OPTIONAL_ATTRIBUTES
103            if key in self.__dict__
104        }
105        connection_kwargs = {
106            'host': self.host,
107            **optional_kwargs
108        }
109
110        self._client = valkey.Valkey(**connection_kwargs)
111        return self._client
112
113    @property
114    def URI(self) -> str:
115        """
116        Return the connection URI for this connector.
117        """
118        import urllib.parse
119
120        if 'uri' in self.__dict__:
121            return self.__dict__.get('uri')
122
123        uri = "valkey://"
124        if 'username' in self.__dict__:
125            uri += urllib.parse.quote_plus(self.username) + ':'
126
127        if 'password' in self.__dict__:
128            uri += urllib.parse.quote_plus(self.password) + '@'
129
130        if 'host' in self.__dict__:
131            uri += self.host
132
133        if 'port' in self.__dict__:
134            uri += f':{self.port}'
135
136        if 'db' in self.__dict__:
137            uri += f"/{self.db}"
138
139        if 'socket_timeout' in self.__dict__:
140            uri += f"?timeout={self.socket_timeout}s"
141
142        return uri
143
144    def set(self, key: str, value: Any, **kwargs: Any) -> None:
145        """
146        Set the `key` to `value`.
147        """
148        return self.client.set(key, value, **kwargs)
149
150    def get(self, key: str) -> Union[str, None]:
151        """
152        Get the value for `key`.
153        """
154        val = self.client.get(key)
155        if val is None:
156            return None
157
158        return val.decode('utf-8')
159
160    def test_connection(self) -> bool:
161        """
162        Return whether a connection may be established.
163        """
164        return self.client.ping()
165
166    @classmethod
167    def quote_table(cls, table: str) -> str:
168        """
169        Return a quoted key.
170        """
171        return shlex.quote(table)
172
173    @classmethod
174    def get_counter_key(cls, table: str) -> str:
175        """
176        Return the counter key for a given table.
177        """
178        table_name = cls.quote_table(table)
179        return f"{table_name}:counter"
180
181    def push_df(
182        self,
183        df: 'pd.DataFrame',
184        table: str,
185        datetime_column: Optional[str] = None,
186        debug: bool = False,
187    ) -> int:
188        """
189        Append a pandas DataFrame to a table.
190
191        Parameters
192        ----------
193        df: pd.DataFrame
194            The pandas DataFrame to append to the table.
195
196        table: str
197            The "table" name (root key).
198
199        datetime_column: Optional[str], default None
200            If provided, use this key as the datetime index.
201
202        Returns
203        -------
204        The current index counter value (how many docs have been pushed).
205        """
206        from meerschaum.utils.dataframe import to_json
207        docs_str = to_json(df)
208        docs = json.loads(docs_str)
209        return self.push_docs(
210            docs,
211            table,
212            datetime_column=datetime_column,
213            debug=debug,
214        )
215
216    def push_docs(
217        self,
218        docs: List[Dict[str, Any]],
219        table: str,
220        datetime_column: Optional[str] = None,
221        debug: bool = False,
222    ) -> int:
223        """
224        Append a list of documents to a table.
225
226        Parameters
227        ----------
228        docs: List[Dict[str, Any]]
229            The docs to be pushed.
230            All keys and values will be coerced into strings.
231
232        table: str
233            The "table" name (root key).
234
235        datetime_column: Optional[str], default None
236            If set, create a sorted set with this datetime column as the index.
237            Otherwise push the docs to a list.
238
239        Returns
240        -------
241        The current index counter value (how many docs have been pushed).
242        """
243        from meerschaum.utils.dtypes import json_serialize_value
244        table_name = self.quote_table(table)
245        datetime_column_key = self.get_datetime_column_key(table)
246        remote_datetime_column = self.get(datetime_column_key)
247        datetime_column = datetime_column or remote_datetime_column
248        dateutil_parser = mrsm.attempt_import('dateutil.parser')
249
250        old_len = (
251            self.client.zcard(table_name)
252            if datetime_column
253            else self.client.scard(table_name)
254        )
255        for doc in docs:
256            original_dt_val = (
257                doc[datetime_column]
258                if datetime_column and datetime_column in doc
259                else 0
260            )
261            dt_val = (
262                dateutil_parser.parse(str(original_dt_val))
263                if not isinstance(original_dt_val, int)
264                else int(original_dt_val)
265            ) if datetime_column else None
266            ts = (
267                int(dt_val.replace(tzinfo=timezone.utc).timestamp())
268                if isinstance(dt_val, datetime)
269                else int(dt_val)
270            ) if datetime_column else None
271            doc_str = json.dumps(
272                doc,
273                default=json_serialize_value,
274                separators=(',', ':'),
275                sort_keys=True,
276            )
277            if datetime_column:
278                self.client.zadd(table_name, {doc_str: ts})
279            else:
280                self.client.sadd(table_name, doc_str)
281
282        if datetime_column:
283            self.set(datetime_column_key, datetime_column)
284        new_len = (
285            self.client.zcard(table_name)
286            if datetime_column
287            else self.client.scard(table_name)
288        )
289
290        return new_len - old_len
291
292    def _push_hash_docs_to_list(self, docs: List[Dict[str, Any]], table: str) -> int:
293        table_name = self.quote_table(table)
294        next_ix = max(self.client.llen(table_name) or 0, 1)
295        for i, doc in enumerate(docs):
296            doc_key = f"{table_name}:{next_ix + i}"
297            self.client.hset(
298                doc_key,
299                mapping={
300                    str(k): str(v)
301                    for k, v in doc.items()
302                },
303            )
304            self.client.rpush(table_name, doc_key)
305
306        return next_ix + len(docs)
307
308    def get_datetime_column_key(self, table: str) -> str:
309        """
310        Return the key to store the datetime index for `table`.
311        """
312        table_name = self.quote_table(table)
313        return f'{table_name}:datetime_column'
314
315    def read(
316        self,
317        table: str,
318        begin: Union[datetime, int, str, None] = None,
319        end: Union[datetime, int, str, None] = None,
320        params: Optional[Dict[str, Any]] = None,
321        datetime_column: Optional[str] = None,
322        select_columns: Optional[List[str]] = None,
323        omit_columns: Optional[List[str]] = None,
324        debug: bool = False
325    ) -> Union['pd.DataFrame', None]:
326        """
327        Query the table and return the result dataframe.
328
329        Parameters
330        ----------
331        table: str
332            The "table" name to be queried.
333
334        begin: Union[datetime, int, str, None], default None
335            If provided, only return rows greater than or equal to this datetime.
336
337        end: Union[datetime, int, str, None], default None
338            If provided, only return rows older than this datetime.
339
340        params: Optional[Dict[str, Any]]
341            Additional Meerschaum filter parameters.
342
343        datetime_column: Optional[str], default None
344            If provided, use this column for the datetime index.
345            Otherwise infer from the table metadata.
346
347        select_columns: Optional[List[str]], default None
348            If provided, only return these columns.
349
350        omit_columns: Optional[List[str]], default None
351            If provided, do not include these columns in the result.
352
353        Returns
354        -------
355        A Pandas DataFrame of the result, or `None`.
356        """
357        from meerschaum.utils.dataframe import parse_df_datetimes, query_df
358        docs = self.read_docs(
359            table,
360            begin=begin,
361            end=end,
362            debug=debug,
363        )
364        df = parse_df_datetimes(docs)
365        datetime_column_key = self.get_datetime_column_key(table)
366        datetime_column = datetime_column or self.get(datetime_column_key)
367
368        return query_df(
369            df,
370            begin=(begin if datetime_column is not None else None),
371            end=(end if datetime_column is not None else None),
372            params=params,
373            datetime_column=datetime_column,
374            select_columns=select_columns,
375            omit_columns=omit_columns,
376            inplace=True,
377            reset_index=True,
378            debug=debug,
379        )
380
381    def read_docs(
382        self,
383        table: str,
384        begin: Union[datetime, int, str, None] = None,
385        end: Union[datetime, int, str, None] = None,
386        debug: bool = False,
387    ) -> List[Dict[str, str]]:
388        """
389        Return a list of previously pushed docs.
390
391        Parameters
392        ----------
393        table: str
394            The "table" name (root key) under which the docs were pushed.
395
396        begin: Union[datetime, int, str, None], default None
397            If provided and the table was created with a datetime index, only return documents
398            newer than this datetime.
399            If the table was not created with a datetime index and `begin` is an `int`,
400            return documents with a positional index greater than or equal to this value.
401
402        end: Union[datetime, int, str, None], default None
403            If provided and the table was created with a datetime index, only return documents
404            older than this datetime.
405            If the table was not created with a datetime index and `begin` is an `int`,
406            return documents with a positional index less than this value.
407
408        Returns
409        -------
410        A list of dictionaries, where all keys and values are strings.
411        """
412        from meerschaum.utils.dtypes import coerce_timezone
413        table_name = self.quote_table(table)
414        datetime_column_key = self.get_datetime_column_key(table)
415        datetime_column = self.get(datetime_column_key)
416
417        if debug:
418            dprint(f"Reading documents from '{table}' with {begin=}, {end=}")
419
420        if not datetime_column:
421            return [
422                json.loads(doc_bytes.decode('utf-8'))
423                for doc_bytes in self.client.smembers(table_name)
424            ]
425
426        dateutil_parser = mrsm.attempt_import('dateutil.parser')
427
428        if isinstance(begin, str):
429            begin = coerce_timezone(dateutil_parser.parse(begin))
430
431        if isinstance(end, str):
432            end = coerce_timezone(dateutil_parser.parse(end))
433
434        begin_ts = (
435            (
436                int(begin.replace(tzinfo=timezone.utc).timestamp())
437                if isinstance(begin, datetime)
438                else int(begin)
439            )
440            if begin is not None else '-inf'
441        )
442        end_ts = (
443            (
444                int(end.replace(tzinfo=timezone.utc).timestamp())
445                if isinstance(end, datetime)
446                else int(end)
447            )
448            if end is not None else '+inf'
449        )
450
451        if debug:
452            dprint(f"Reading documents with {begin_ts=}, {end_ts=}")
453
454        return [
455            json.loads(doc_bytes.decode('utf-8'))
456            for doc_bytes in self.client.zrangebyscore(
457                table_name,
458                begin_ts,
459                end_ts,
460                withscores=False,
461            )
462        ]
463
464    def _read_docs_from_list(
465        self,
466        table: str,
467        begin_ix: Optional[int] = 0,
468        end_ix: Optional[int] = -1,
469        debug: bool = False,
470    ):
471        """
472        Read a list of documents from a "table".
473
474        Parameters
475        ----------
476        table: str
477            The "table" (root key) from which to read docs.
478
479        begin_ix: Optional[int], default 0
480            If provided, only read documents from this starting index.
481
482        end_ix: Optional[int], default -1
483            If provided, only read documents up to (not including) this index.
484
485        Returns
486        -------
487        A list of documents.
488        """
489        if begin_ix is None:
490            begin_ix = 0
491
492        if end_ix == 0:
493            return
494
495        if end_ix is None:
496            end_ix = -1
497        else:
498            end_ix -= 1
499
500        table_name = self.quote_table(table)
501        doc_keys = self.client.lrange(table_name, begin_ix, end_ix)
502        for doc_key in doc_keys:
503            yield {
504                key.decode('utf-8'): value.decode('utf-8')
505                for key, value in self.client.hgetall(doc_key).items()
506            }
507
508    def drop_table(self, table: str, debug: bool = False) -> None:
509        """
510        Drop a "table" of documents.
511
512        Parameters
513        ----------
514        table: str
515            The "table" name (root key) to be deleted.
516        """
517        table_name = self.quote_table(table)
518        datetime_column_key = self.get_datetime_column_key(table)
519        self.client.delete(table_name)
520        self.client.delete(datetime_column_key)
521
522    @classmethod
523    def get_entity_key(cls, *keys: Any) -> str:
524        """
525        Return a joined key to set an entity.
526        """
527        if not keys:
528            raise ValueError("No keys to be joined.")
529
530        for key in keys:
531            if cls.KEY_SEPARATOR in str(key):
532                raise ValueError(f"Key cannot contain separator '{cls.KEY_SEPARATOR}'.")
533
534        return cls.KEY_SEPARATOR.join([str(key) for key in keys])

Manage a Valkey instance.

Build a ValkeyConnector from connection attributes or a URI string.

IS_INSTANCE: bool = True
REQUIRED_ATTRIBUTES: List[str] = ['host']
OPTIONAL_ATTRIBUTES: List[str] = ['port', 'username', 'password', 'db', 'socket_timeout']
DEFAULT_ATTRIBUTES: Dict[str, Any] = {'username': 'default', 'port': 6379, 'db': 0, 'socket_timeout': 300}
KEY_SEPARATOR: str = ':'
client
 86    @property
 87    def client(self):
 88        """
 89        Return the Valkey client.
 90        """
 91        if '_client' in self.__dict__:
 92            return self.__dict__['_client']
 93
 94        valkey = mrsm.attempt_import('valkey')
 95
 96        if 'uri' in self.__dict__:
 97            self._client = valkey.Valkey.from_url(self.__dict__.get('uri'))
 98            return self._client
 99
100        optional_kwargs = {
101            key: self.__dict__.get(key)
102            for key in self.OPTIONAL_ATTRIBUTES
103            if key in self.__dict__
104        }
105        connection_kwargs = {
106            'host': self.host,
107            **optional_kwargs
108        }
109
110        self._client = valkey.Valkey(**connection_kwargs)
111        return self._client

Return the Valkey client.

URI: str
113    @property
114    def URI(self) -> str:
115        """
116        Return the connection URI for this connector.
117        """
118        import urllib.parse
119
120        if 'uri' in self.__dict__:
121            return self.__dict__.get('uri')
122
123        uri = "valkey://"
124        if 'username' in self.__dict__:
125            uri += urllib.parse.quote_plus(self.username) + ':'
126
127        if 'password' in self.__dict__:
128            uri += urllib.parse.quote_plus(self.password) + '@'
129
130        if 'host' in self.__dict__:
131            uri += self.host
132
133        if 'port' in self.__dict__:
134            uri += f':{self.port}'
135
136        if 'db' in self.__dict__:
137            uri += f"/{self.db}"
138
139        if 'socket_timeout' in self.__dict__:
140            uri += f"?timeout={self.socket_timeout}s"
141
142        return uri

Return the connection URI for this connector.

def set(self, key: str, value: Any, **kwargs: Any) -> None:
144    def set(self, key: str, value: Any, **kwargs: Any) -> None:
145        """
146        Set the `key` to `value`.
147        """
148        return self.client.set(key, value, **kwargs)

Set the key to value.

def get(self, key: str) -> Optional[str]:
150    def get(self, key: str) -> Union[str, None]:
151        """
152        Get the value for `key`.
153        """
154        val = self.client.get(key)
155        if val is None:
156            return None
157
158        return val.decode('utf-8')

Get the value for key.

def test_connection(self) -> bool:
160    def test_connection(self) -> bool:
161        """
162        Return whether a connection may be established.
163        """
164        return self.client.ping()

Return whether a connection may be established.

@classmethod
def quote_table(cls, table: str) -> str:
166    @classmethod
167    def quote_table(cls, table: str) -> str:
168        """
169        Return a quoted key.
170        """
171        return shlex.quote(table)

Return a quoted key.

@classmethod
def get_counter_key(cls, table: str) -> str:
173    @classmethod
174    def get_counter_key(cls, table: str) -> str:
175        """
176        Return the counter key for a given table.
177        """
178        table_name = cls.quote_table(table)
179        return f"{table_name}:counter"

Return the counter key for a given table.

def push_df( self, df: 'pd.DataFrame', table: str, datetime_column: Optional[str] = None, debug: bool = False) -> int:
181    def push_df(
182        self,
183        df: 'pd.DataFrame',
184        table: str,
185        datetime_column: Optional[str] = None,
186        debug: bool = False,
187    ) -> int:
188        """
189        Append a pandas DataFrame to a table.
190
191        Parameters
192        ----------
193        df: pd.DataFrame
194            The pandas DataFrame to append to the table.
195
196        table: str
197            The "table" name (root key).
198
199        datetime_column: Optional[str], default None
200            If provided, use this key as the datetime index.
201
202        Returns
203        -------
204        The current index counter value (how many docs have been pushed).
205        """
206        from meerschaum.utils.dataframe import to_json
207        docs_str = to_json(df)
208        docs = json.loads(docs_str)
209        return self.push_docs(
210            docs,
211            table,
212            datetime_column=datetime_column,
213            debug=debug,
214        )

Append a pandas DataFrame to a table.

Parameters
  • df (pd.DataFrame): The pandas DataFrame to append to the table.
  • table (str): The "table" name (root key).
  • datetime_column (Optional[str], default None): If provided, use this key as the datetime index.
Returns
  • The current index counter value (how many docs have been pushed).
def push_docs( self, docs: List[Dict[str, Any]], table: str, datetime_column: Optional[str] = None, debug: bool = False) -> int:
216    def push_docs(
217        self,
218        docs: List[Dict[str, Any]],
219        table: str,
220        datetime_column: Optional[str] = None,
221        debug: bool = False,
222    ) -> int:
223        """
224        Append a list of documents to a table.
225
226        Parameters
227        ----------
228        docs: List[Dict[str, Any]]
229            The docs to be pushed.
230            All keys and values will be coerced into strings.
231
232        table: str
233            The "table" name (root key).
234
235        datetime_column: Optional[str], default None
236            If set, create a sorted set with this datetime column as the index.
237            Otherwise push the docs to a list.
238
239        Returns
240        -------
241        The current index counter value (how many docs have been pushed).
242        """
243        from meerschaum.utils.dtypes import json_serialize_value
244        table_name = self.quote_table(table)
245        datetime_column_key = self.get_datetime_column_key(table)
246        remote_datetime_column = self.get(datetime_column_key)
247        datetime_column = datetime_column or remote_datetime_column
248        dateutil_parser = mrsm.attempt_import('dateutil.parser')
249
250        old_len = (
251            self.client.zcard(table_name)
252            if datetime_column
253            else self.client.scard(table_name)
254        )
255        for doc in docs:
256            original_dt_val = (
257                doc[datetime_column]
258                if datetime_column and datetime_column in doc
259                else 0
260            )
261            dt_val = (
262                dateutil_parser.parse(str(original_dt_val))
263                if not isinstance(original_dt_val, int)
264                else int(original_dt_val)
265            ) if datetime_column else None
266            ts = (
267                int(dt_val.replace(tzinfo=timezone.utc).timestamp())
268                if isinstance(dt_val, datetime)
269                else int(dt_val)
270            ) if datetime_column else None
271            doc_str = json.dumps(
272                doc,
273                default=json_serialize_value,
274                separators=(',', ':'),
275                sort_keys=True,
276            )
277            if datetime_column:
278                self.client.zadd(table_name, {doc_str: ts})
279            else:
280                self.client.sadd(table_name, doc_str)
281
282        if datetime_column:
283            self.set(datetime_column_key, datetime_column)
284        new_len = (
285            self.client.zcard(table_name)
286            if datetime_column
287            else self.client.scard(table_name)
288        )
289
290        return new_len - old_len

Append a list of documents to a table.

Parameters
  • docs (List[Dict[str, Any]]): The docs to be pushed. All keys and values will be coerced into strings.
  • table (str): The "table" name (root key).
  • datetime_column (Optional[str], default None): If set, create a sorted set with this datetime column as the index. Otherwise push the docs to a list.
Returns
  • The current index counter value (how many docs have been pushed).
def get_datetime_column_key(self, table: str) -> str:
308    def get_datetime_column_key(self, table: str) -> str:
309        """
310        Return the key to store the datetime index for `table`.
311        """
312        table_name = self.quote_table(table)
313        return f'{table_name}:datetime_column'

Return the key to store the datetime index for table.

def read( self, table: str, begin: Union[datetime.datetime, int, str, NoneType] = None, end: Union[datetime.datetime, int, str, NoneType] = None, params: Optional[Dict[str, Any]] = None, datetime_column: Optional[str] = None, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, debug: bool = False) -> Optional[ForwardRef('pd.DataFrame')]:
315    def read(
316        self,
317        table: str,
318        begin: Union[datetime, int, str, None] = None,
319        end: Union[datetime, int, str, None] = None,
320        params: Optional[Dict[str, Any]] = None,
321        datetime_column: Optional[str] = None,
322        select_columns: Optional[List[str]] = None,
323        omit_columns: Optional[List[str]] = None,
324        debug: bool = False
325    ) -> Union['pd.DataFrame', None]:
326        """
327        Query the table and return the result dataframe.
328
329        Parameters
330        ----------
331        table: str
332            The "table" name to be queried.
333
334        begin: Union[datetime, int, str, None], default None
335            If provided, only return rows greater than or equal to this datetime.
336
337        end: Union[datetime, int, str, None], default None
338            If provided, only return rows older than this datetime.
339
340        params: Optional[Dict[str, Any]]
341            Additional Meerschaum filter parameters.
342
343        datetime_column: Optional[str], default None
344            If provided, use this column for the datetime index.
345            Otherwise infer from the table metadata.
346
347        select_columns: Optional[List[str]], default None
348            If provided, only return these columns.
349
350        omit_columns: Optional[List[str]], default None
351            If provided, do not include these columns in the result.
352
353        Returns
354        -------
355        A Pandas DataFrame of the result, or `None`.
356        """
357        from meerschaum.utils.dataframe import parse_df_datetimes, query_df
358        docs = self.read_docs(
359            table,
360            begin=begin,
361            end=end,
362            debug=debug,
363        )
364        df = parse_df_datetimes(docs)
365        datetime_column_key = self.get_datetime_column_key(table)
366        datetime_column = datetime_column or self.get(datetime_column_key)
367
368        return query_df(
369            df,
370            begin=(begin if datetime_column is not None else None),
371            end=(end if datetime_column is not None else None),
372            params=params,
373            datetime_column=datetime_column,
374            select_columns=select_columns,
375            omit_columns=omit_columns,
376            inplace=True,
377            reset_index=True,
378            debug=debug,
379        )

Query the table and return the result dataframe.

Parameters
  • table (str): The "table" name to be queried.
  • begin (Union[datetime, int, str, None], default None): If provided, only return rows greater than or equal to this datetime.
  • end (Union[datetime, int, str, None], default None): If provided, only return rows older than this datetime.
  • params (Optional[Dict[str, Any]]): Additional Meerschaum filter parameters.
  • datetime_column (Optional[str], default None): If provided, use this column for the datetime index. Otherwise infer from the table metadata.
  • select_columns (Optional[List[str]], default None): If provided, only return these columns.
  • omit_columns (Optional[List[str]], default None): If provided, do not include these columns in the result.
Returns
  • A Pandas DataFrame of the result, or None.
def read_docs( self, table: str, begin: Union[datetime.datetime, int, str, NoneType] = None, end: Union[datetime.datetime, int, str, NoneType] = None, debug: bool = False) -> List[Dict[str, str]]:
381    def read_docs(
382        self,
383        table: str,
384        begin: Union[datetime, int, str, None] = None,
385        end: Union[datetime, int, str, None] = None,
386        debug: bool = False,
387    ) -> List[Dict[str, str]]:
388        """
389        Return a list of previously pushed docs.
390
391        Parameters
392        ----------
393        table: str
394            The "table" name (root key) under which the docs were pushed.
395
396        begin: Union[datetime, int, str, None], default None
397            If provided and the table was created with a datetime index, only return documents
398            newer than this datetime.
399            If the table was not created with a datetime index and `begin` is an `int`,
400            return documents with a positional index greater than or equal to this value.
401
402        end: Union[datetime, int, str, None], default None
403            If provided and the table was created with a datetime index, only return documents
404            older than this datetime.
405            If the table was not created with a datetime index and `begin` is an `int`,
406            return documents with a positional index less than this value.
407
408        Returns
409        -------
410        A list of dictionaries, where all keys and values are strings.
411        """
412        from meerschaum.utils.dtypes import coerce_timezone
413        table_name = self.quote_table(table)
414        datetime_column_key = self.get_datetime_column_key(table)
415        datetime_column = self.get(datetime_column_key)
416
417        if debug:
418            dprint(f"Reading documents from '{table}' with {begin=}, {end=}")
419
420        if not datetime_column:
421            return [
422                json.loads(doc_bytes.decode('utf-8'))
423                for doc_bytes in self.client.smembers(table_name)
424            ]
425
426        dateutil_parser = mrsm.attempt_import('dateutil.parser')
427
428        if isinstance(begin, str):
429            begin = coerce_timezone(dateutil_parser.parse(begin))
430
431        if isinstance(end, str):
432            end = coerce_timezone(dateutil_parser.parse(end))
433
434        begin_ts = (
435            (
436                int(begin.replace(tzinfo=timezone.utc).timestamp())
437                if isinstance(begin, datetime)
438                else int(begin)
439            )
440            if begin is not None else '-inf'
441        )
442        end_ts = (
443            (
444                int(end.replace(tzinfo=timezone.utc).timestamp())
445                if isinstance(end, datetime)
446                else int(end)
447            )
448            if end is not None else '+inf'
449        )
450
451        if debug:
452            dprint(f"Reading documents with {begin_ts=}, {end_ts=}")
453
454        return [
455            json.loads(doc_bytes.decode('utf-8'))
456            for doc_bytes in self.client.zrangebyscore(
457                table_name,
458                begin_ts,
459                end_ts,
460                withscores=False,
461            )
462        ]

Return a list of previously pushed docs.

Parameters
  • table (str): The "table" name (root key) under which the docs were pushed.
  • begin (Union[datetime, int, str, None], default None): If provided and the table was created with a datetime index, only return documents newer than this datetime. If the table was not created with a datetime index and begin is an int, return documents with a positional index greater than or equal to this value.
  • end (Union[datetime, int, str, None], default None): If provided and the table was created with a datetime index, only return documents older than this datetime. If the table was not created with a datetime index and begin is an int, return documents with a positional index less than this value.
Returns
  • A list of dictionaries, where all keys and values are strings.
def drop_table(self, table: str, debug: bool = False) -> None:
508    def drop_table(self, table: str, debug: bool = False) -> None:
509        """
510        Drop a "table" of documents.
511
512        Parameters
513        ----------
514        table: str
515            The "table" name (root key) to be deleted.
516        """
517        table_name = self.quote_table(table)
518        datetime_column_key = self.get_datetime_column_key(table)
519        self.client.delete(table_name)
520        self.client.delete(datetime_column_key)

Drop a "table" of documents.

Parameters
  • table (str): The "table" name (root key) to be deleted.
@classmethod
def get_entity_key(cls, *keys: Any) -> str:
522    @classmethod
523    def get_entity_key(cls, *keys: Any) -> str:
524        """
525        Return a joined key to set an entity.
526        """
527        if not keys:
528            raise ValueError("No keys to be joined.")
529
530        for key in keys:
531            if cls.KEY_SEPARATOR in str(key):
532                raise ValueError(f"Key cannot contain separator '{cls.KEY_SEPARATOR}'.")
533
534        return cls.KEY_SEPARATOR.join([str(key) for key in keys])

Return a joined key to set an entity.

def register_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
137def register_pipe(
138    self,
139    pipe: mrsm.Pipe,
140    debug: bool = False,
141    **kwargs: Any
142) -> SuccessTuple:
143    """
144    Insert the pipe's attributes into the internal `pipes` table.
145
146    Parameters
147    ----------
148    pipe: mrsm.Pipe
149        The pipe to be registered.
150
151    Returns
152    -------
153    A `SuccessTuple` of the result.
154    """
155    attributes = {
156        'connector_keys': str(pipe.connector_keys),
157        'metric_key': str(pipe.metric_key),
158        'location_key': str(pipe.location_key),
159    }
160    parameters_str = json.dumps(
161        pipe._attributes.get('parameters', {}),
162        separators=(',', ':'),
163    )
164
165    pipe_key = get_pipe_key(pipe)
166    parameters_key = get_pipe_parameters_key(pipe)
167
168    try:
169        existing_pipe_id = self.get(pipe_key)
170        if existing_pipe_id is not None:
171            return False, f"{pipe} is already registered."
172
173        pipe_id = self.client.incr(PIPES_COUNTER)
174        _ = self.push_docs(
175            [{'pipe_id': pipe_id, **attributes}],
176            PIPES_TABLE,
177            datetime_column='pipe_id',
178            debug=debug,
179        )
180        self.set(pipe_key, pipe_id)
181        self.set(parameters_key, parameters_str)
182
183    except Exception as e:
184        return False, f"Failed to register {pipe}:\n{e}"
185
186    return True, "Success"

Insert the pipe's attributes into the internal pipes table.

Parameters
  • pipe (mrsm.Pipe): The pipe to be registered.
Returns
  • A SuccessTuple of the result.
def get_pipe_id( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Union[str, int, NoneType]:
189def get_pipe_id(
190    self,
191    pipe: mrsm.Pipe,
192    debug: bool = False,
193    **kwargs: Any
194) -> Union[str, int, None]:
195    """
196    Return the `_id` for the pipe if it exists.
197
198    Parameters
199    ----------
200    pipe: mrsm.Pipe
201        The pipe whose `_id` to fetch.
202
203    Returns
204    -------
205    The `_id` for the pipe's document or `None`.
206    """
207    pipe_key = get_pipe_key(pipe)
208    try:
209        return int(self.get(pipe_key))
210    except Exception:
211        pass
212    return None

Return the _id for the pipe if it exists.

Parameters
  • pipe (mrsm.Pipe): The pipe whose _id to fetch.
Returns
  • The _id for the pipe's document or None.
def get_pipe_attributes( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Dict[str, Any]:
215def get_pipe_attributes(
216    self,
217    pipe: mrsm.Pipe,
218    debug: bool = False,
219    **kwargs: Any
220) -> Dict[str, Any]:
221    """
222    Return the pipe's document from the internal `pipes` collection.
223
224    Parameters
225    ----------
226    pipe: mrsm.Pipe
227        The pipe whose attributes should be retrieved.
228
229    Returns
230    -------
231    The document that matches the keys of the pipe.
232    """
233    pipe_id = pipe.get_id(debug=debug)
234    if pipe_id is None:
235        return {}
236
237    parameters_key = get_pipe_parameters_key(pipe)
238    parameters_str = self.get(parameters_key)
239
240    parameters = json.loads(parameters_str) if parameters_str else {}
241
242    attributes = {
243        'connector_keys': pipe.connector_keys,
244        'metric_key': pipe.metric_key,
245        'location_key': pipe.location_key,
246        'parameters': parameters,
247        'pipe_id': pipe_id,
248    }
249    return attributes

Return the pipe's document from the internal pipes collection.

Parameters
  • pipe (mrsm.Pipe): The pipe whose attributes should be retrieved.
Returns
  • The document that matches the keys of the pipe.
def edit_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
252def edit_pipe(
253    self,
254    pipe: mrsm.Pipe,
255    debug: bool = False,
256    **kwargs: Any
257) -> mrsm.SuccessTuple:
258    """
259    Edit the attributes of the pipe.
260
261    Parameters
262    ----------
263    pipe: mrsm.Pipe
264        The pipe whose in-memory parameters must be persisted.
265
266    Returns
267    -------
268    A `SuccessTuple` indicating success.
269    """
270    pipe_id = pipe.get_id(debug=debug)
271    if pipe_id is None:
272        return False, f"{pipe} is not registered."
273
274    parameters_key = get_pipe_parameters_key(pipe)
275    parameters_str = json.dumps(pipe.parameters, separators=(',', ':'))
276    self.set(parameters_key, parameters_str)
277    return True, "Success"

Edit the attributes of the pipe.

Parameters
  • pipe (mrsm.Pipe): The pipe whose in-memory parameters must be persisted.
Returns
  • A SuccessTuple indicating success.
def pipe_exists( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> bool:
280def pipe_exists(
281    self,
282    pipe: mrsm.Pipe,
283    debug: bool = False,
284    **kwargs: Any
285) -> bool:
286    """
287    Check whether a pipe's target table exists.
288
289    Parameters
290    ----------
291    pipe: mrsm.Pipe
292        The pipe to check whether its table exists.
293
294    Returns
295    -------
296    A `bool` indicating the table exists.
297    """
298    table_name = self.quote_table(pipe.target)
299    return self.client.exists(table_name) != 0

Check whether a pipe's target table exists.

Parameters
  • pipe (mrsm.Pipe): The pipe to check whether its table exists.
Returns
  • A bool indicating the table exists.
def drop_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
302def drop_pipe(
303    self,
304    pipe: mrsm.Pipe,
305    debug: bool = False,
306    **kwargs: Any
307) -> mrsm.SuccessTuple:
308    """
309    Drop a pipe's collection if it exists.
310
311    Parameters
312    ----------
313    pipe: mrsm.Pipe
314        The pipe to be dropped.
315
316    Returns
317    -------
318    A `SuccessTuple` indicating success.
319    """
320    for chunk_begin, chunk_end in pipe.get_chunk_bounds(debug=debug):
321        clear_chunk_success, clear_chunk_msg = pipe.clear(
322            begin=chunk_begin,
323            end=chunk_end,
324            debug=debug,
325        )
326        if not clear_chunk_success:
327            return clear_chunk_success, clear_chunk_msg
328    try:
329        self.drop_table(pipe.target, debug=debug)
330    except Exception as e:
331        return False, f"Failed to drop {pipe}:\n{e}"
332
333    if 'valkey' not in pipe.parameters:
334        return True, "Success"
335
336    pipe.parameters['valkey']['dtypes'] = {}
337    if not pipe.temporary:
338        edit_success, edit_msg = pipe.edit(debug=debug)
339        if not edit_success:
340            return edit_success, edit_msg
341
342    return True, "Success"

Drop a pipe's collection if it exists.

Parameters
  • pipe (mrsm.Pipe): The pipe to be dropped.
Returns
  • A SuccessTuple indicating success.
def delete_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
345def delete_pipe(
346    self,
347    pipe: mrsm.Pipe,
348    debug: bool = False,
349    **kwargs: Any
350) -> mrsm.SuccessTuple:
351    """
352    Delete a pipe's registration from the `pipes` collection.
353
354    Parameters
355    ----------
356    pipe: mrsm.Pipe
357        The pipe to be deleted.
358
359    Returns
360    -------
361    A `SuccessTuple` indicating success.
362    """
363    drop_success, drop_message = pipe.drop(debug=debug)
364    if not drop_success:
365        return drop_success, drop_message
366
367    pipe_id = self.get_pipe_id(pipe, debug=debug)
368    if pipe_id is None:
369        return False, f"{pipe} is not registered."
370
371    pipe_key = get_pipe_key(pipe)
372    parameters_key = get_pipe_parameters_key(pipe)
373    self.client.delete(pipe_key)
374    self.client.delete(parameters_key)
375    df = self.read(PIPES_TABLE, params={'pipe_id': pipe_id})
376    docs = df.to_dict(orient='records')
377    if docs:
378        doc = docs[0]
379        doc_str = json.dumps(
380            doc,
381            default=(lambda x: json_serialize_datetime(x) if hasattr(x, 'tzinfo') else str(x)),
382            separators=(',', ':'),
383            sort_keys=True,
384        )
385        self.client.zrem(PIPES_TABLE, doc_str)
386    return True, "Success"

Delete a pipe's registration from the pipes collection.

Parameters
  • pipe (mrsm.Pipe): The pipe to be deleted.
Returns
  • A SuccessTuple indicating success.
def get_pipe_data( self, pipe: meerschaum.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kwargs: Any) -> Optional[ForwardRef('pd.DataFrame')]:
389def get_pipe_data(
390    self,
391    pipe: mrsm.Pipe,
392    select_columns: Optional[List[str]] = None,
393    omit_columns: Optional[List[str]] = None,
394    begin: Union[datetime, int, None] = None,
395    end: Union[datetime, int, None] = None,
396    params: Optional[Dict[str, Any]] = None,
397    debug: bool = False,
398    **kwargs: Any
399) -> Union['pd.DataFrame', None]:
400    """
401    Query a pipe's target table and return the DataFrame.
402
403    Parameters
404    ----------
405    pipe: mrsm.Pipe
406        The pipe with the target table from which to read.
407
408    select_columns: Optional[List[str]], default None
409        If provided, only select these given columns.
410        Otherwise select all available columns (i.e. `SELECT *`).
411
412    omit_columns: Optional[List[str]], default None
413        If provided, remove these columns from the selection.
414
415    begin: Union[datetime, int, None], default None
416        The earliest `datetime` value to search from (inclusive).
417
418    end: Union[datetime, int, None], default None
419        The lastest `datetime` value to search from (exclusive).
420
421    params: Optional[Dict[str, str]], default None
422        Additional filters to apply to the query.
423
424    Returns
425    -------
426    The target table's data as a DataFrame.
427    """
428    if not pipe.exists(debug=debug):
429        return None
430
431    from meerschaum.utils.dataframe import query_df, parse_df_datetimes
432    from meerschaum.utils.dtypes import are_dtypes_equal
433
434    valkey_dtypes = pipe.parameters.get('valkey', {}).get('dtypes', {})
435    dt_col = pipe.columns.get('datetime', None)
436    table_name = self.quote_table(pipe.target)
437    indices = [col for col in pipe.columns.values() if col]
438    ix_docs = [
439        string_to_dict(doc.get('ix', '').replace(COLON, ':'))
440        for doc in self.read_docs(
441            pipe.target,
442            begin=begin,
443            end=end,
444            debug=debug,
445        )
446    ]
447    try:
448        docs_strings = [
449            self.get(get_document_key(
450                doc, indices, table_name
451            ))
452            for doc in ix_docs
453        ]
454    except Exception as e:
455        warn(f"Failed to fetch documents for {pipe}:\n{e}")
456        docs_strings = []
457
458    docs = [
459        json.loads(doc_str)
460        for doc_str in docs_strings
461        if doc_str
462    ]
463    ignore_dt_cols = [
464        col
465        for col, dtype in pipe.dtypes.items()
466        if not are_dtypes_equal(str(dtype), 'datetime')
467    ]
468
469    df = parse_df_datetimes(
470        docs,
471        ignore_cols=ignore_dt_cols,
472        chunksize=kwargs.get('chunksize', None),
473        strip_timezone=(pipe.tzinfo is None),
474        debug=debug,
475    )
476    for col, typ in valkey_dtypes.items():
477        try:
478            df[col] = df[col].astype(typ)
479        except Exception:
480            pass
481
482    df = pipe.enforce_dtypes(df, debug=debug)
483
484    if len(df) == 0:
485        return query_df(df, select_columns=select_columns, omit_columns=omit_columns)
486
487    return query_df(
488        df,
489        select_columns=select_columns,
490        omit_columns=omit_columns,
491        params=params,
492        begin=begin,
493        end=end,
494        datetime_column=dt_col,
495        inplace=True,
496        reset_index=True,
497    )

Query a pipe's target table and return the DataFrame.

Parameters
  • pipe (mrsm.Pipe): The pipe with the target table from which to read.
  • select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
  • omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
  • begin (Union[datetime, int, None], default None): The earliest datetime value to search from (inclusive).
  • end (Union[datetime, int, None], default None): The lastest datetime value to search from (exclusive).
  • params (Optional[Dict[str, str]], default None): Additional filters to apply to the query.
Returns
  • The target table's data as a DataFrame.
def sync_pipe( self, pipe: meerschaum.Pipe, df: 'pd.DataFrame' = None, check_existing: bool = True, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
500def sync_pipe(
501    self,
502    pipe: mrsm.Pipe,
503    df: 'pd.DataFrame' = None,
504    check_existing: bool = True,
505    debug: bool = False,
506    **kwargs: Any
507) -> mrsm.SuccessTuple:
508    """
509    Upsert new documents into the pipe's collection.
510
511    Parameters
512    ----------
513    pipe: mrsm.Pipe
514        The pipe whose collection should receive the new documents.
515
516    df: Union['pd.DataFrame', Iterator['pd.DataFrame']], default None
517        The data to be synced.
518
519    check_existing: bool, default True
520        If `False`, do not check the documents against existing data and instead insert directly.
521
522    Returns
523    -------
524    A `SuccessTuple` indicating success.
525    """
526    from meerschaum.utils.dtypes import are_dtypes_equal
527    dt_col = pipe.columns.get('datetime', None)
528    indices = [col for col in pipe.columns.values() if col]
529    table_name = self.quote_table(pipe.target)
530    is_dask = 'dask' in df.__module__
531    if is_dask:
532        df = df.compute()
533    upsert = pipe.parameters.get('upsert', False)
534    static = pipe.parameters.get('static', False)
535
536    def _serialize_indices_docs(_docs):
537        return [
538            {
539                'ix': get_document_key(doc, indices),
540                **(
541                    {
542                        dt_col: doc.get(dt_col, 0)
543                    }
544                    if dt_col
545                    else {}
546                )
547            }
548            for doc in _docs
549        ]
550
551    valkey_dtypes = pipe.parameters.get('valkey', {}).get('dtypes', {})
552    new_dtypes = {
553        str(key): (
554            str(val)
555            if not are_dtypes_equal(str(val), 'datetime')
556            else 'datetime64[ns, UTC]'
557        )
558        for key, val in df.dtypes.items()
559        if str(key) not in valkey_dtypes
560    }
561    for col, typ in {c: v for c, v in valkey_dtypes.items()}.items():
562        if col in df.columns:
563            try:
564                df[col] = df[col].astype(typ)
565            except Exception:
566                valkey_dtypes[col] = 'string'
567                new_dtypes[col] = 'string'
568                df[col] = df[col].astype('string')
569
570    if new_dtypes and (not static or not valkey_dtypes):
571        valkey_dtypes.update(new_dtypes)
572        if 'valkey' not in pipe.parameters:
573            pipe.parameters['valkey'] = {}
574        pipe.parameters['valkey']['dtypes'] = valkey_dtypes
575        if not pipe.temporary:
576            edit_success, edit_msg = pipe.edit(debug=debug)
577            if not edit_success:
578                return edit_success, edit_msg
579
580    unseen_df, update_df, delta_df = (
581        pipe.filter_existing(df, include_unchanged_columns=True, debug=debug)
582        if check_existing and not upsert
583        else (None, df, df)
584    )
585    num_insert = len(unseen_df) if unseen_df is not None else 0
586    num_update = len(update_df) if update_df is not None else 0
587    msg = (
588        f"Inserted {num_insert}, updated {num_update} rows."
589        if not upsert
590        else f"Upserted {num_update} rows."
591    )
592    if len(delta_df) == 0:
593        return True, msg
594
595    unseen_docs = unseen_df.to_dict(orient='records') if unseen_df is not None else []
596    unseen_indices_docs = _serialize_indices_docs(unseen_docs)
597    unseen_ix_vals = {
598        get_document_key(doc, indices, table_name): serialize_document(doc)
599        for doc in unseen_docs
600    }
601    for key, val in unseen_ix_vals.items():
602        try:
603            self.set(key, val)
604        except Exception as e:
605            return False, f"Failed to set keys for {pipe}:\n{e}"
606
607    try:
608        self.push_docs(
609            unseen_indices_docs,
610            pipe.target,
611            datetime_column=dt_col,
612            debug=debug,
613        )
614    except Exception as e:
615        return False, f"Failed to push docs to '{pipe.target}':\n{e}"
616
617    update_docs = update_df.to_dict(orient='records') if update_df is not None else []
618    update_ix_docs = {
619        get_document_key(doc, indices, table_name): doc
620        for doc in update_docs
621    }
622    existing_docs_data = {
623        key: self.get(key)
624        for key in update_ix_docs
625    } if pipe.exists(debug=debug) else {}
626    existing_docs = {
627        key: json.loads(data)
628        for key, data in existing_docs_data.items()
629        if data
630    }
631    new_update_docs = {
632        key: doc
633        for key, doc in update_ix_docs.items()
634        if key not in existing_docs
635    }
636    new_ix_vals = {
637        get_document_key(doc, indices, table_name): serialize_document(doc)
638        for doc in new_update_docs.values()
639    }
640    for key, val in new_ix_vals.items():
641        try:
642            self.set(key, val)
643        except Exception as e:
644            return False, f"Failed to set keys for {pipe}:\n{e}"
645
646    old_update_docs = {
647        key: {
648            **existing_docs[key],
649            **doc
650        }
651        for key, doc in update_ix_docs.items()
652        if key in existing_docs
653    }
654    new_indices_docs = _serialize_indices_docs([doc for doc in new_update_docs.values()])
655    try:
656        if new_indices_docs:
657            self.push_docs(
658                new_indices_docs,
659                pipe.target,
660                datetime_column=dt_col,
661                debug=debug,
662            )
663    except Exception as e:
664        return False, f"Failed to upsert '{pipe.target}':\n{e}"
665
666    for key, doc in old_update_docs.items():
667        try:
668            self.set(key, serialize_document(doc))
669        except Exception as e:
670            return False, f"Failed to set keys for {pipe}:\n{e}"
671
672    return True, msg

Upsert new documents into the pipe's collection.

Parameters
  • pipe (mrsm.Pipe): The pipe whose collection should receive the new documents.
  • df (Union['pd.DataFrame', Iterator['pd.DataFrame']], default None): The data to be synced.
  • check_existing (bool, default True): If False, do not check the documents against existing data and instead insert directly.
Returns
  • A SuccessTuple indicating success.
def get_pipe_columns_types( self, pipe: meerschaum.Pipe, debug: bool = False, **kwargs: Any) -> Dict[str, str]:
675def get_pipe_columns_types(
676    self,
677    pipe: mrsm.Pipe,
678    debug: bool = False,
679    **kwargs: Any
680) -> Dict[str, str]:
681    """
682    Return the data types for the columns in the target table for data type enforcement.
683
684    Parameters
685    ----------
686    pipe: mrsm.Pipe
687        The pipe whose target table contains columns and data types.
688
689    Returns
690    -------
691    A dictionary mapping columns to data types.
692    """
693    if not pipe.exists(debug=debug):
694        return {}
695
696    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
697    return {
698        col: get_db_type_from_pd_type(typ, flavor='postgresql')
699        for col, typ in pipe.parameters.get('valkey', {}).get('dtypes', {}).items()
700    }

Return the data types for the columns in the target table for data type enforcement.

Parameters
  • pipe (mrsm.Pipe): The pipe whose target table contains columns and data types.
Returns
  • A dictionary mapping columns to data types.
def clear_pipe( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> Tuple[bool, str]:
703def clear_pipe(
704    self,
705    pipe: mrsm.Pipe,
706    begin: Union[datetime, int, None] = None,
707    end: Union[datetime, int, None] = None,
708    params: Optional[Dict[str, Any]] = None,
709    debug: bool = False,
710) -> mrsm.SuccessTuple:
711    """
712    Delete rows within `begin`, `end`, and `params`.
713
714    Parameters
715    ----------
716    pipe: mrsm.Pipe
717        The pipe whose rows to clear.
718
719    begin: Union[datetime, int, None], default None
720        If provided, remove rows >= `begin`.
721
722    end: Union[datetime, int, None], default None
723        If provided, remove rows < `end`.
724
725    params: Optional[Dict[str, Any]], default None
726        If provided, only remove rows which match the `params` filter.
727
728    Returns
729    -------
730    A `SuccessTuple` indicating success.
731    """
732    dt_col = pipe.columns.get('datetime', None)
733
734    existing_df = pipe.get_data(
735        begin=begin,
736        end=end,
737        params=params,
738        debug=debug,
739    )
740    if existing_df is None or len(existing_df) == 0:
741        return True, "Deleted 0 rows."
742
743    docs = existing_df.to_dict(orient='records')
744    table_name = self.quote_table(pipe.target)
745    indices = [col for col in pipe.columns.values() if col]
746    for doc in docs:
747        set_doc_key = get_document_key(doc, indices)
748        table_doc_key = get_document_key(doc, indices, table_name)
749        try:
750            if dt_col:
751                self.client.zrem(table_name, set_doc_key)
752            else:
753                self.client.srem(table_name, set_doc_key)
754            self.client.delete(table_doc_key)
755        except Exception as e:
756            return False, f"Failed to delete documents:\n{e}"
757    msg = (
758        f"Deleted {len(docs)} row"
759        + ('s' if len(docs) != 1 else '')
760        + '.'
761    )
762    return True, msg

Delete rows within begin, end, and params.

Parameters
  • pipe (mrsm.Pipe): The pipe whose rows to clear.
  • begin (Union[datetime, int, None], default None): If provided, remove rows >= begin.
  • end (Union[datetime, int, None], default None): If provided, remove rows < end.
  • params (Optional[Dict[str, Any]], default None): If provided, only remove rows which match the params filter.
Returns
  • A SuccessTuple indicating success.
def get_sync_time( self, pipe: meerschaum.Pipe, newest: bool = True, **kwargs: Any) -> Union[datetime.datetime, int, NoneType]:
765def get_sync_time(
766    self,
767    pipe: mrsm.Pipe,
768    newest: bool = True,
769    **kwargs: Any
770) -> Union[datetime, int, None]:
771    """
772    Return the newest (or oldest) timestamp in a pipe.
773    """
774    from meerschaum.utils.dtypes import are_dtypes_equal
775    dt_col = pipe.columns.get('datetime', None)
776    dt_typ = pipe.dtypes.get(dt_col, 'datetime64[ns, UTC]')
777    if not dt_col:
778        return None
779
780    dateutil_parser = mrsm.attempt_import('dateutil.parser')
781    table_name = self.quote_table(pipe.target)
782    try:
783        vals = (
784            self.client.zrevrange(table_name, 0, 0)
785            if newest
786            else self.client.zrange(table_name, 0, 0)
787        )
788        if not vals:
789            return None
790        val = vals[0]
791    except Exception:
792        return None
793
794    doc = json.loads(val)
795    dt_val = doc.get(dt_col, None)
796    if dt_val is None:
797        return None
798
799    try:
800        return (
801            int(dt_val)
802            if are_dtypes_equal(dt_typ, 'int')
803            else dateutil_parser.parse(str(dt_val))
804        )
805    except Exception as e:
806        warn(f"Failed to parse sync time for {pipe}:\n{e}")
807
808    return None

Return the newest (or oldest) timestamp in a pipe.

def get_pipe_rowcount( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kwargs: Any) -> Optional[int]:
811def get_pipe_rowcount(
812    self,
813    pipe: mrsm.Pipe,
814    begin: Union[datetime, int, None] = None,
815    end: Union[datetime, int, None] = None,
816    params: Optional[Dict[str, Any]] = None,
817    debug: bool = False,
818    **kwargs: Any
819) -> Union[int, None]:
820    """
821    Return the number of documents in the pipe's set.
822    """
823    dt_col = pipe.columns.get('datetime', None)
824    table_name = self.quote_table(pipe.target)
825
826    if not pipe.exists():
827        return 0
828
829    try:
830        if begin is None and end is None and params is None:
831            return (
832                self.client.zcard(table_name)
833                if dt_col
834                else self.client.llen(table_name)
835            )
836    except Exception:
837        return None
838
839    df = pipe.get_data(begin=begin, end=end, params=params, debug=debug)
840    if df is None:
841        return 0
842
843    return len(df)

Return the number of documents in the pipe's set.

def fetch_pipes_keys( self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> Optional[List[Tuple[str, str, Optional[str]]]]:
846def fetch_pipes_keys(
847    self,
848    connector_keys: Optional[List[str]] = None,
849    metric_keys: Optional[List[str]] = None,
850    location_keys: Optional[List[str]] = None,
851    tags: Optional[List[str]] = None,
852    params: Optional[Dict[str, Any]] = None,
853    debug: bool = False
854) -> Optional[List[Tuple[str, str, Optional[str]]]]:
855    """
856    Return the keys for the registered pipes.
857    """
858    from meerschaum.utils.dataframe import query_df
859    from meerschaum.utils.misc import separate_negation_values
860    try:
861        df = self.read(PIPES_TABLE, debug=debug)
862    except Exception:
863        return []
864
865    if df is None or len(df) == 0:
866        return []
867
868    query = {}
869    if connector_keys:
870        query['connector_keys'] = [str(k) for k in connector_keys]
871    if metric_keys:
872        query['metric_key'] = [str(k) for k in metric_keys]
873    if location_keys:
874        query['location_key'] = [str(k) for k in location_keys]
875    if params:
876        query.update(params)
877
878    df = query_df(df, query, inplace=True)
879
880    keys = [
881        (
882            doc['connector_keys'],
883            doc['metric_key'],
884            doc['location_key'],
885        )
886        for doc in df.to_dict(orient='records')
887    ]
888    if not tags:
889        return keys
890
891    tag_groups = [tag.split(',') for tag in tags]
892    in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups]
893
894    filtered_keys = []
895    for ck, mk, lk in keys:
896        pipe = mrsm.Pipe(ck, mk, lk, instance=self)
897        pipe_tags = set(pipe.tags)
898        
899        include_pipe = True
900        for in_tags, ex_tags in in_ex_tag_groups:
901            all_in = all(tag in pipe_tags for tag in in_tags)
902            any_ex = any(tag in pipe_tags for tag in ex_tags)
903
904            if (not all_in) or any_ex:
905                include_pipe = False
906                continue
907
908        if include_pipe:
909            filtered_keys.append((ck, mk, lk))
910
911    return filtered_keys

Return the keys for the registered pipes.

def fetch( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kwargs: Any) -> List[Dict[str, Any]]:
17def fetch(
18    self,
19    pipe: mrsm.Pipe,
20    begin: Union[datetime, int, None] = None,
21    end: Union[datetime, int, None] = None,
22    params: Optional[Dict[str, Any]] = None,
23    debug: bool = False,
24    **kwargs: Any
25) -> List[Dict[str, Any]]:
26    """
27    Return data from a source database.
28    """
29    source_key = pipe.parameters.get('valkey', {}).get('key', None)
30    if not source_key:
31        return []
32
33    try:
34        key_type = self.client.type(source_key).decode('utf-8')
35    except Exception:
36        warn(f"Could not determine the type for key '{source_key}'.")
37        return []
38
39    begin_ts = (
40        (
41            int(begin.replace(tzinfo=timezone.utc).timestamp())
42            if isinstance(begin, datetime)
43            else int(begin)
44        )
45        if begin is not None else '-inf'
46    )
47    end_ts = (
48        (
49            int(end.replace(tzinfo=timezone.utc).timestamp())
50            if isinstance(end, datetime)
51            else int(end)
52        )
53        if end is not None else '+inf'
54    )
55
56    if debug:
57        dprint(f"Reading documents with {begin_ts=}, {end_ts=}")
58
59    if key_type == 'set':
60        return [
61            json.loads(doc_bytes.decode('utf-8'))
62            for doc_bytes in self.client.smembers(source_key)
63        ]
64
65    if key_type == 'zset':
66        return [
67            json.loads(doc_bytes.decode('utf-8'))
68            for doc_bytes in self.client.zrangebyscore(
69                source_key,
70                begin_ts,
71                end_ts,
72                withscores=False,
73            )
74        ]
75
76    return [{source_key: self.get(source_key)}]

Return data from a source database.

def get_users_pipe(self):
20def get_users_pipe(self):
21    """
22    Return the pipe which stores the registered users.
23    """
24    return mrsm.Pipe(
25        'mrsm', 'users',
26        columns=['user_id'],
27        temporary=True,
28        target=USERS_TABLE,
29        instance=self,
30    )

Return the pipe which stores the registered users.

@classmethod
def get_user_key( cls, user_id_or_username: str, sub_key: str, by_username: bool = False) -> str:
33@classmethod
34def get_user_key(cls, user_id_or_username: str, sub_key: str, by_username: bool = False) -> str:
35    """
36    Return the key to store metadata about a user.
37
38    Parameters
39    ----------
40    user_id_or_username: str
41        The user ID or username of the given user.
42        If `by_username` is `True`, then provide the username.
43
44    sub_key: str
45        The key suffix, e.g. `'attributes'`.
46
47    by_username: bool, default False
48        If `True`, then treat `user_id_or_username` as a username.
49
50    Returns
51    -------
52    A key to store information about a user.
53
54    Examples
55    --------
56    >>> get_user_key('deadbeef', 'attributes')
57    'mrsm_user:user_id:deadbeef:attributes'
58    >>> get_user_key('foo', 'user_id', by_username=True)
59    'mrsm_user:username:foo:user_id'
60    """
61    key_type = 'user_id' if not by_username else 'username'
62    return cls.get_entity_key(USER_PREFIX, key_type, user_id_or_username, sub_key)

Return the key to store metadata about a user.

Parameters
  • user_id_or_username (str): The user ID or username of the given user. If by_username is True, then provide the username.
  • sub_key (str): The key suffix, e.g. 'attributes'.
  • by_username (bool, default False): If True, then treat user_id_or_username as a username.
Returns
  • A key to store information about a user.
Examples
>>> get_user_key('deadbeef', 'attributes')
'mrsm_user:user_id:deadbeef:attributes'
>>> get_user_key('foo', 'user_id', by_username=True)
'mrsm_user:username:foo:user_id'
@classmethod
def get_user_keys_vals( cls, user: meerschaum.core.User._User.User, mutable_only: bool = False) -> Dict[str, str]:
 65@classmethod
 66def get_user_keys_vals(
 67    cls,
 68    user: 'mrsm.core.User',
 69    mutable_only: bool = False,
 70) -> Dict[str, str]:
 71    """
 72    Return a dictionary containing keys and values to set for the user.
 73
 74    Parameters
 75    ----------
 76    user: mrsm.core.User
 77        The user for which to generate the keys.
 78
 79    mutable_only: bool, default False
 80        If `True`, only return keys which may be edited.
 81
 82    Returns
 83    -------
 84    A dictionary mapping a user's keys to values.
 85    """
 86    user_attributes_str = json.dumps(user.attributes, separators=(',', ':'))
 87    mutable_keys_vals = {
 88        cls.get_user_key(user.user_id, 'attributes'): user_attributes_str,
 89        cls.get_user_key(user.user_id, 'email'): user.email,
 90        cls.get_user_key(user.user_id, 'type'): user.type,
 91        cls.get_user_key(user.user_id, 'password_hash'): user.password_hash,
 92    }
 93    if mutable_only:
 94        return mutable_keys_vals
 95
 96    immutable_keys_vals = {
 97        cls.get_user_key(user.user_id, 'username'): user.username,
 98        cls.get_user_key(user.username, 'user_id', by_username=True): user.user_id,
 99    }
100
101    return {**immutable_keys_vals, **mutable_keys_vals}

Return a dictionary containing keys and values to set for the user.

Parameters
  • user (mrsm.core.User): The user for which to generate the keys.
  • mutable_only (bool, default False): If True, only return keys which may be edited.
Returns
  • A dictionary mapping a user's keys to values.
def register_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
104def register_user(
105    self,
106    user: 'mrsm.core.User',
107    debug: bool = False,
108    **kwargs: Any
109) -> SuccessTuple:
110    """
111    Register a new user.
112    """
113    from meerschaum.utils.misc import generate_password
114
115    user.user_id = generate_password(12)
116    users_pipe = self.get_users_pipe()
117    keys_vals = self.get_user_keys_vals(user)
118
119    try:
120        sync_success, sync_msg = users_pipe.sync(
121            [
122                {
123                    'user_id': user.user_id,
124                    'username': user.username,
125                },
126            ],
127            check_existing=False,
128            debug=debug,
129        )
130        if not sync_success:
131            return sync_success, sync_msg
132
133        for key, val in keys_vals.items():
134            if val is not None:
135                self.set(key, val)
136
137        success, msg = True, "Success"
138    except Exception as e:
139        success = False
140        import traceback
141        traceback.print_exc()
142        msg = f"Failed to register '{user.username}':\n{e}"
143
144    if not success:
145        for key in keys_vals:
146            try:
147                self.client.delete(key)
148            except Exception:
149                pass
150
151    return success, msg

Register a new user.

def get_user_id( self, user: meerschaum.core.User._User.User, debug: bool = False) -> Optional[str]:
154def get_user_id(self, user: 'mrsm.core.User', debug: bool = False) -> Union[str, None]:
155    """
156    Return the ID for a user, or `None`.
157    """
158    username_user_id_key = self.get_user_key(user.username, 'user_id', by_username=True)
159    try:
160        user_id = self.get(username_user_id_key)
161    except Exception:
162        user_id = None
163    return user_id

Return the ID for a user, or None.

def edit_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
166def edit_user(
167    self,
168    user: 'mrsm.core.User',
169    debug: bool = False,
170    **kw: Any
171) -> SuccessTuple:
172    """
173    Edit the attributes for an existing user.
174    """
175    keys_vals = self.get_user_keys_vals(user, mutable_only=True)
176    try:
177        old_keys_vals = {
178            key: self.get(key)
179            for key in keys_vals
180        }
181    except Exception as e:
182        return False, f"Failed to edit user:\n{e}"
183
184    try:
185        for key, val in keys_vals.items():
186            self.set(key, val)
187        success, msg = True, "Success"
188    except Exception as e:
189        success = False
190        msg = f"Failed to edit user:\n{e}"
191
192    if not success:
193        try:
194            for key, old_val in old_keys_vals.items():
195                self.set(key, old_val)
196        except Exception:
197            pass
198
199    return success, msg

Edit the attributes for an existing user.

def get_user_attributes( self, user: meerschaum.core.User._User.User, debug: bool = False) -> Optional[Dict[str, Any]]:
202def get_user_attributes(
203    self,
204    user: 'mrsm.core.User',
205    debug: bool = False
206) -> Union[Dict[str, Any], None]:
207    """
208    Return the user's attributes.
209    """
210    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
211    user_id_attributes_key = self.get_user_key(user_id, 'attributes')
212    try:
213        return json.loads(self.get(user_id_attributes_key))
214    except Exception:
215        return None

Return the user's attributes.

def delete_user( self, user: meerschaum.core.User._User.User, debug: bool = False) -> Tuple[bool, str]:
218def delete_user(
219    self,
220    user: 'mrsm.core.User',
221    debug: bool = False
222) -> SuccessTuple:
223    """
224    Delete a user's keys.
225    """
226    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
227    users_pipe = self.get_users_pipe()
228    keys_vals = self.get_user_keys_vals(user)
229    try:
230        old_keys_vals = {
231            key: self.get(key)
232            for key in keys_vals
233        }
234    except Exception as e:
235        return False, f"Failed to delete user:\n{e}"
236
237    clear_success, clear_msg = users_pipe.clear(params={'user_id': user_id})
238    if not clear_success:
239        return clear_success, clear_msg
240
241    try:
242        for key in keys_vals:
243            self.client.delete(key)
244        success, msg = True, "Success"
245    except Exception as e:
246        success = False
247        msg = f"Failed to delete user:\n{e}"
248
249    if not success:
250        try:
251            for key, old_val in old_keys_vals.items():
252                self.set(key, old_val)
253        except Exception:
254            pass
255
256    return success, msg

Delete a user's keys.

def get_users(self, debug: bool = False, **kw: Any) -> List[str]:
259def get_users(
260    self,
261    debug: bool = False,
262    **kw: Any
263) -> List[str]:
264    """
265    Get the registered usernames.
266    """
267    users_pipe = self.get_users_pipe()
268    df = users_pipe.get_data()
269    if df is None:
270        return []
271
272    return list(df['username'])

Get the registered usernames.

def get_user_password_hash( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
275def get_user_password_hash(
276    self,
277    user: 'mrsm.core.User',
278    debug: bool = False,
279    **kw: Any
280) -> Union[str, None]:
281    """
282    Return the password has for a user.
283    """
284    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
285    user_id_password_hash_key = self.get_user_key(user_id, 'password_hash')
286    try:
287        return self.get(user_id_password_hash_key)
288    except Exception:
289        return None

Return the password has for a user.

def get_user_type( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
292def get_user_type(
293    self,
294    user: 'mrsm.core.User',
295    debug: bool = False,
296    **kw: Any
297) -> Union[str, None]:
298    """
299    Return the user's type.
300    """
301    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
302    user_id_type_key = self.get_user_key(user_id, 'type')
303    try:
304        return self.get(user_id_type_key)
305    except Exception:
306        return None

Return the user's type.

def get_plugins_pipe(self) -> meerschaum.Pipe:
20def get_plugins_pipe(self) -> mrsm.Pipe:
21    """
22    Return the pipe to store the plugins.
23    """
24    return mrsm.Pipe(
25        'mrsm', 'plugins',
26        columns=['plugin_name'],
27        temporary=True,
28        target=PLUGINS_TABLE,
29        instance=self,
30    )

Return the pipe to store the plugins.

@classmethod
def get_plugin_key(cls, plugin_name: str, sub_key: str) -> str:
33@classmethod
34def get_plugin_key(cls, plugin_name: str, sub_key: str) -> str:
35    """
36    Return the key for a plugin's attribute.
37    """
38    return cls.get_entity_key(PLUGIN_PREFIX, plugin_name, sub_key)

Return the key for a plugin's attribute.

@classmethod
def get_plugin_keys_vals( cls, plugin: meerschaum.Plugin, mutable_only: bool = False) -> Dict[str, str]:
41@classmethod
42def get_plugin_keys_vals(
43    cls,
44    plugin: 'mrsm.core.Plugin',
45    mutable_only: bool = False,
46) -> Dict[str, str]:
47    """
48    Return a dictionary containing keys and values to set for the plugin.
49
50    Parameters
51    ----------
52    plugin: mrsm.core.Plugin
53        The plugin for which to generate the keys.
54
55    mutable_only: bool, default False
56        If `True`, only return keys which may be edited.
57
58    Returns
59    -------
60    A dictionary mapping a plugins's keys to values.
61    """
62    plugin_attributes_str = json.dumps(plugin.attributes, separators=(',', ':'))
63    mutable_keys_vals = {
64        cls.get_plugin_key(plugin.name, 'attributes'): plugin_attributes_str,
65        cls.get_plugin_key(plugin.name, 'version'): plugin.version,
66    }
67    if mutable_only:
68        return mutable_keys_vals
69
70    immutable_keys_vals = {
71        cls.get_plugin_key(plugin.name, 'user_id'): plugin.user_id,
72    }
73
74    return {**immutable_keys_vals, **mutable_keys_vals}

Return a dictionary containing keys and values to set for the plugin.

Parameters
  • plugin (mrsm.core.Plugin): The plugin for which to generate the keys.
  • mutable_only (bool, default False): If True, only return keys which may be edited.
Returns
  • A dictionary mapping a plugins's keys to values.
def register_plugin( self, plugin: meerschaum.Plugin, force: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
 77def register_plugin(
 78    self,
 79    plugin: 'mrsm.core.Plugin',
 80    force: bool = False,
 81    debug: bool = False,
 82    **kw: Any
 83) -> SuccessTuple:
 84    """Register a new plugin to the `mrsm_plugins` "table"."""
 85    from meerschaum.utils.misc import generate_password
 86
 87    plugins_pipe = self.get_plugins_pipe()
 88    keys_vals = self.get_plugin_keys_vals(plugin)
 89
 90    try:
 91        sync_success, sync_msg = plugins_pipe.sync(
 92            [
 93                {
 94                    'plugin_name': plugin.name,
 95                    'user_id': plugin.user_id,
 96                },
 97            ],
 98            check_existing=False,
 99            debug=debug,
100        )
101        if not sync_success:
102            return sync_success, sync_msg
103
104        for key, val in keys_vals.items():
105            if val is not None:
106                self.set(key, val)
107
108        success, msg = True, "Success"
109    except Exception as e:
110        success = False
111        msg = f"Failed to register plugin '{plugin.name}':\n{e}"
112
113    if not success:
114        for key in keys_vals:
115            try:
116                self.client.delete(key)
117            except Exception:
118                pass
119
120    return success, msg

Register a new plugin to the mrsm_plugins "table".

def get_plugin_id( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[str]:
123def get_plugin_id(
124    self,
125    plugin: 'mrsm.core.Plugin',
126    debug: bool = False
127) -> Union[str, None]:
128    """
129    Return a plugin's ID.
130    """
131    return plugin.name

Return a plugin's ID.

def get_plugin_version( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[str]:
134def get_plugin_version(
135    self,
136    plugin: 'mrsm.core.Plugin',
137    debug: bool = False,
138) -> Union[str, None]:
139    """
140    Return a plugin's version.
141    """
142    version_key = self.get_plugin_key(plugin.name, 'version')
143
144    try:
145        return self.get(version_key)
146    except Exception:
147        return None

Return a plugin's version.

def get_plugin_user_id( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[str]:
150def get_plugin_user_id(
151    self,
152    plugin: 'mrsm.core.Plugin',
153    debug: bool = False
154) -> Union[str, None]:
155    """
156    Return a plugin's user ID.
157    """
158    user_id_key = self.get_plugin_key(plugin.name, 'user_id')
159
160    try:
161        return self.get(user_id_key)
162    except Exception:
163        return None

Return a plugin's user ID.

def get_plugin_username( self, plugin: meerschaum.Plugin, debug: bool = False) -> str:
166def get_plugin_username(
167    self,
168    plugin: 'mrsm.core.Plugin',
169    debug: bool = False
170) -> Union[str]:
171    """
172    Return the username of a plugin's owner.
173    """
174    user_id = self.get_plugin_user_id(plugin, debug=debug)
175    if user_id is None:
176        return None
177
178    username_key = self.get_user_key(user_id, 'username')
179    try:
180        return self.get(username_key)
181    except Exception:
182        return None

Return the username of a plugin's owner.

def get_plugin_attributes( self, plugin: meerschaum.Plugin, debug: bool = False) -> Dict[str, Any]:
185def get_plugin_attributes(
186    self,
187    plugin: 'mrsm.core.Plugin',
188    debug: bool = False
189) -> Dict[str, Any]:
190    """
191    Return the attributes of a plugin.
192    """
193    attributes_key = self.get_plugin_key(plugin.name, 'attributes')
194    try:
195        attributes_str = self.get(attributes_key)
196        if not attributes_str:
197            return {}
198        return json.loads(attributes_str)
199    except Exception:
200        return {}

Return the attributes of a plugin.

def get_plugins( self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False, **kw: Any) -> List[str]:
203def get_plugins(
204    self,
205    user_id: Optional[int] = None,
206    search_term: Optional[str] = None,
207    debug: bool = False,
208    **kw: Any
209) -> List[str]:
210    """
211    Return a list of plugin names.
212    """
213    plugins_pipe = self.get_plugins_pipe()
214    params = {}
215    if user_id:
216        params['user_id'] = user_id
217
218    df = plugins_pipe.get_data(['plugin_name'], params=params, debug=debug)
219    docs = df.to_dict(orient='records')
220
221    return [
222        doc['plugin_name']
223        for doc in docs
224        if (plugin_name := doc['plugin_name']).startswith(search_term or '')
225    ]

Return a list of plugin names.

def delete_plugin( self, plugin: meerschaum.Plugin, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
228def delete_plugin(
229    self,
230    plugin: 'mrsm.core.Plugin',
231    debug: bool = False,
232    **kw: Any
233) -> SuccessTuple:
234    """
235    Delete a plugin from the plugins table.
236    """
237    plugins_pipe = self.get_plugins_pipe()
238    clear_success, clear_msg = plugins_pipe.clear(params={'plugin_name': plugin.name}, debug=debug)
239    if not clear_success:
240        return clear_success, clear_msg
241
242    keys_vals = self.get_plugin_keys_vals(plugin)
243    try:
244        old_keys_vals = {
245            key: self.get(key)
246            for key in keys_vals
247        }
248    except Exception as e:
249        return False, f"Failed to delete plugin '{plugin.name}':\n{e}"
250
251    try:
252        for key in keys_vals:
253            self.client.delete(key)
254        success, msg = True, "Success"
255    except Exception as e:
256        success = False
257        msg = f"Failed to delete plugin '{plugin.name}':\n{e}"
258
259    if not success:
260        try:
261            for key, old_val in old_keys_vals.items():
262                self.set(key, old_val)
263        except Exception:
264            pass
265
266    return success, msg

Delete a plugin from the plugins table.