meerschaum.connectors.api

Interact with the Meerschaum API (send commands, pull data, etc.)

 1#! /usr/bin/env python
 2# -*- coding: utf-8 -*-
 3# vim:fenc=utf-8
 4
 5"""
 6Interact with the Meerschaum API (send commands, pull data, etc.)
 7"""
 8
 9from meerschaum.connectors.api._APIConnector import APIConnector
10
11__all__ = ('APIConnector',)
class APIConnector(meerschaum.connectors.instance._InstanceConnector.InstanceConnector):
 22class APIConnector(InstanceConnector):
 23    """
 24    Connect to a Meerschaum API instance.
 25    """
 26
 27    IS_THREAD_SAFE: bool = False
 28    OPTIONAL_ATTRIBUTES: List[str] = ['port', 'client_secret', 'client_id', 'api_key']
 29
 30    from ._request import (
 31        make_request,
 32        get,
 33        post,
 34        put,
 35        patch,
 36        delete,
 37        wget,
 38    )
 39    from ._actions import (
 40        get_actions,
 41        do_action,
 42        do_action_async,
 43        do_action_legacy,
 44    )
 45    from ._misc import get_mrsm_version, get_chaining_status
 46    from ._pipes import (
 47        get_pipe_instance_keys,
 48        register_pipe,
 49        fetch_pipes_keys,
 50        edit_pipe,
 51        sync_pipe,
 52        delete_pipe,
 53        delete_pipe_cache,
 54        get_pipe_data,
 55        get_pipe_id,
 56        get_pipe_attributes,
 57        get_sync_time,
 58        pipe_exists,
 59        create_metadata,
 60        get_pipe_rowcount,
 61        drop_pipe,
 62        clear_pipe,
 63        get_pipe_columns_types,
 64        get_pipe_columns_indices,
 65        get_pipe_docs,
 66        get_pipe_size,
 67        compress_pipe,
 68        decompress_pipe,
 69        vacuum_pipe,
 70        analyze_pipe,
 71        partition_pipe,
 72    )
 73    from ._fetch import fetch
 74    from ._plugins import (
 75        register_plugin,
 76        install_plugin,
 77        delete_plugin,
 78        get_plugins,
 79        get_plugin_attributes,
 80    )
 81    from ._login import login, test_connection
 82    from ._users import (
 83        register_user,
 84        get_user_id,
 85        get_users,
 86        edit_user,
 87        delete_user,
 88        get_user_password_hash,
 89        get_user_type,
 90        get_user_attributes,
 91    )
 92    from ._tokens import (
 93        register_token,
 94        get_token_model,
 95        get_tokens,
 96        edit_token,
 97        invalidate_token,
 98        get_token_scopes,
 99        token_exists,
100        delete_token,
101    )
102    from ._uri import from_uri
103    from ._jobs import (
104        get_jobs,
105        get_job,
106        get_job_metadata,
107        get_job_properties,
108        get_job_exists,
109        delete_job,
110        start_job,
111        create_job,
112        stop_job,
113        pause_job,
114        get_logs,
115        get_job_stop_time,
116        monitor_logs,
117        monitor_logs_async,
118        get_job_is_blocking_on_stdin,
119        get_job_began,
120        get_job_ended,
121        get_job_paused,
122        get_job_status,
123    )
124
125    def __init__(
126        self,
127        label: Optional[str] = None,
128        wait: bool = False,
129        debug: bool = False,
130        **kw
131    ):
132        if 'uri' in kw:
133            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
134            label = label or from_uri_params.get('label', None)
135            _ = from_uri_params.pop('label', None)
136            kw.update(from_uri_params)
137
138        super().__init__('api', label=label, **kw)
139        if 'protocol' not in self.__dict__:
140            self.protocol = (
141                'https' if self.__dict__.get('uri', '').startswith('https')
142                else 'http'
143            )
144
145        if 'uri' not in self.__dict__:
146            self.verify_attributes(required_attributes)
147        else:
148            from meerschaum.connectors.sql import SQLConnector
149            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
150            if 'host' not in conn_attrs:
151                raise Exception(f"Invalid URI for '{self}'.")
152            self.__dict__.update(conn_attrs)
153
154        self.url = (
155            self.protocol + '://' +
156            self.host
157            + (
158                (':' + str(self.port))
159                if self.__dict__.get('port', None)
160                else ''
161            )
162        )
163        self._token = None
164        self._expires = None
165        self._session = None
166        self._instance_keys = self.__dict__.get('instance_keys', None)
167
168
169    @property
170    def URI(self) -> str:
171        """
172        Return the fully qualified URI.
173        """
174        import urllib.parse
175        username = self.__dict__.get('username', None)
176        password = self.__dict__.get('password', None)
177        client_id = self.__dict__.get('client_id', None)
178        client_secret = self.__dict__.get('client_secret', None)
179        api_key = self.__dict__.get('api_key', None)
180        creds = (username + ':' + password + '@') if username and password else ''
181        params = {}
182        params_str = ('?' + urllib.parse.urlencode(params)) if params else ''
183        return (
184            self.protocol
185            + '://'
186            + creds
187            + self.host
188            + (
189                (':' + str(self.port))
190                if self.__dict__.get('port', None)
191                else ''
192            )
193            + params_str
194        )
195
196    @property
197    def session(self):
198        if self._session is None:
199            _ = attempt_import('certifi', lazy=False)
200            requests = attempt_import('requests', lazy=False)
201            if requests:
202                self._session = requests.Session()
203            if self._session is None:
204                error("Failed to import requests. Is requests installed?")
205        return self._session
206
207    @property
208    def token(self):
209        if self.login_scheme == 'api_key':
210            return self.api_key
211
212        expired = (
213            True if self._expires is None else (
214                (
215                    self._expires
216                    <
217                    datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
218                )
219            )
220        )
221
222        if self._token is None or expired:
223            success, msg = self.login()
224            if not success and not self.__dict__.get('_emitted_warning'):
225                warn(msg, stack=False)
226                self._emitted_warning = True
227        return self._token
228
229    @property
230    def instance_keys(self) -> Union[str, None]:
231        """
232        Return the instance keys to be sent alongside pipe requests.
233        """
234        return self._instance_keys
235
236    @property
237    def login_scheme(self) -> str:
238        """
239        Return the login scheme to use based on the configured credentials.
240        """
241        if 'username' in self.__dict__:
242            return 'password'
243        if 'client_id' in self.__dict__:
244            return 'client_credentials'
245        elif 'api_key' in self.__dict__:
246            return 'api_key'
247
248        return 'password'

Connect to a Meerschaum API instance.

APIConnector( label: Optional[str] = None, wait: bool = False, debug: bool = False, **kw)
125    def __init__(
126        self,
127        label: Optional[str] = None,
128        wait: bool = False,
129        debug: bool = False,
130        **kw
131    ):
132        if 'uri' in kw:
133            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
134            label = label or from_uri_params.get('label', None)
135            _ = from_uri_params.pop('label', None)
136            kw.update(from_uri_params)
137
138        super().__init__('api', label=label, **kw)
139        if 'protocol' not in self.__dict__:
140            self.protocol = (
141                'https' if self.__dict__.get('uri', '').startswith('https')
142                else 'http'
143            )
144
145        if 'uri' not in self.__dict__:
146            self.verify_attributes(required_attributes)
147        else:
148            from meerschaum.connectors.sql import SQLConnector
149            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
150            if 'host' not in conn_attrs:
151                raise Exception(f"Invalid URI for '{self}'.")
152            self.__dict__.update(conn_attrs)
153
154        self.url = (
155            self.protocol + '://' +
156            self.host
157            + (
158                (':' + str(self.port))
159                if self.__dict__.get('port', None)
160                else ''
161            )
162        )
163        self._token = None
164        self._expires = None
165        self._session = None
166        self._instance_keys = self.__dict__.get('instance_keys', None)

Set the given keyword arguments as attributes.

Parameters
  • type (str): The type of the connector (e.g. sql, api, plugin).
  • label (str): The label for the connector.
Examples

Run mrsm edit config and to edit connectors in the YAML file:

meerschaum:
    connections:
        {type}:
            {label}:
                ### attributes go here
IS_THREAD_SAFE: bool = False
OPTIONAL_ATTRIBUTES: List[str] = ['port', 'client_secret', 'client_id', 'api_key']
url
URI: str
169    @property
170    def URI(self) -> str:
171        """
172        Return the fully qualified URI.
173        """
174        import urllib.parse
175        username = self.__dict__.get('username', None)
176        password = self.__dict__.get('password', None)
177        client_id = self.__dict__.get('client_id', None)
178        client_secret = self.__dict__.get('client_secret', None)
179        api_key = self.__dict__.get('api_key', None)
180        creds = (username + ':' + password + '@') if username and password else ''
181        params = {}
182        params_str = ('?' + urllib.parse.urlencode(params)) if params else ''
183        return (
184            self.protocol
185            + '://'
186            + creds
187            + self.host
188            + (
189                (':' + str(self.port))
190                if self.__dict__.get('port', None)
191                else ''
192            )
193            + params_str
194        )

Return the fully qualified URI.

session
196    @property
197    def session(self):
198        if self._session is None:
199            _ = attempt_import('certifi', lazy=False)
200            requests = attempt_import('requests', lazy=False)
201            if requests:
202                self._session = requests.Session()
203            if self._session is None:
204                error("Failed to import requests. Is requests installed?")
205        return self._session
token
207    @property
208    def token(self):
209        if self.login_scheme == 'api_key':
210            return self.api_key
211
212        expired = (
213            True if self._expires is None else (
214                (
215                    self._expires
216                    <
217                    datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
218                )
219            )
220        )
221
222        if self._token is None or expired:
223            success, msg = self.login()
224            if not success and not self.__dict__.get('_emitted_warning'):
225                warn(msg, stack=False)
226                self._emitted_warning = True
227        return self._token
instance_keys: Optional[str]
229    @property
230    def instance_keys(self) -> Union[str, None]:
231        """
232        Return the instance keys to be sent alongside pipe requests.
233        """
234        return self._instance_keys

Return the instance keys to be sent alongside pipe requests.

login_scheme: str
236    @property
237    def login_scheme(self) -> str:
238        """
239        Return the login scheme to use based on the configured credentials.
240        """
241        if 'username' in self.__dict__:
242            return 'password'
243        if 'client_id' in self.__dict__:
244            return 'client_credentials'
245        elif 'api_key' in self.__dict__:
246            return 'api_key'
247
248        return 'password'

Return the login scheme to use based on the configured credentials.

def make_request( self, method: str, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kwargs: Any) -> requests.models.Response:
 28def make_request(
 29    self,
 30    method: str,
 31    r_url: str,
 32    headers: Optional[Dict[str, Any]] = None,
 33    use_token: bool = True,
 34    debug: bool = False,
 35    **kwargs: Any
 36) -> 'requests.Response':
 37    """
 38    Make a request to this APIConnector's endpoint using the in-memory session.
 39
 40    Parameters
 41    ----------
 42    method: str
 43        The kind of request to make.
 44        Accepted values:
 45        - `'GET'`
 46        - `'OPTIONS'`
 47        - `'HEAD'`
 48        - `'POST'`
 49        - `'PUT'`
 50        - `'PATCH'`
 51        - `'DELETE'`
 52
 53    r_url: str
 54        The relative URL for the endpoint (e.g. `'/pipes'`).
 55
 56    headers: Optional[Dict[str, Any]], default None
 57        The headers to use for the request.
 58        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
 59
 60    use_token: bool, default True
 61        If `True`, add the authorization token to the headers.
 62
 63    debug: bool, default False
 64        Verbosity toggle.
 65
 66    kwargs: Any
 67        All other keyword arguments are passed to `requests.request`.
 68
 69    Returns
 70    -------
 71    A `requests.Reponse` object.
 72    """
 73    if method.upper() not in METHODS:
 74        raise ValueError(f"Method '{method}' is not supported.")
 75
 76    verify = self.__dict__.get('verify', None)
 77    if 'verify' not in kwargs and isinstance(verify, bool):
 78        kwargs['verify'] = verify
 79
 80    headers = (
 81        copy.deepcopy(headers)
 82        if isinstance(headers, dict)
 83        else {}
 84    )
 85
 86    if use_token:
 87        headers.update({'Authorization': f'Bearer {self.token}'})
 88
 89    if 'timeout' not in kwargs:
 90        kwargs['timeout'] = STATIC_CONFIG['api']['default_timeout']
 91
 92    request_url = urllib.parse.urljoin(self.url, r_url)
 93    if debug:
 94        dprint(f"[{self}] Sending a '{method.upper()}' request to {request_url}")
 95
 96    return self.session.request(
 97        method.upper(),
 98        request_url,
 99        headers=headers,
100        **kwargs
101    )

Make a request to this APIConnector's endpoint using the in-memory session.

Parameters
  • method (str): The kind of request to make. Accepted values:
    • 'GET'
    • 'OPTIONS'
    • 'HEAD'
    • 'POST'
    • 'PUT'
    • 'PATCH'
    • 'DELETE'
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def get(self, r_url: str, **kwargs: Any) -> requests.models.Response:
104def get(self, r_url: str, **kwargs: Any) -> 'requests.Response':
105    """
106    Wrapper for `requests.get`.
107
108    Parameters
109    ----------
110    r_url: str
111        The relative URL for the endpoint (e.g. `'/pipes'`).
112
113    headers: Optional[Dict[str, Any]], default None
114        The headers to use for the request.
115        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
116
117    use_token: bool, default True
118        If `True`, add the authorization token to the headers.
119
120    debug: bool, default False
121        Verbosity toggle.
122
123    kwargs: Any
124        All other keyword arguments are passed to `requests.request`.
125
126    Returns
127    -------
128    A `requests.Reponse` object.
129
130    """
131    return self.make_request('GET', r_url, **kwargs)

Wrapper for requests.get.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def post(self, r_url: str, **kwargs: Any) -> requests.models.Response:
134def post(self, r_url: str, **kwargs: Any) -> 'requests.Response':
135    """
136    Wrapper for `requests.post`.
137
138    Parameters
139    ----------
140    r_url: str
141        The relative URL for the endpoint (e.g. `'/pipes'`).
142
143    headers: Optional[Dict[str, Any]], default None
144        The headers to use for the request.
145        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
146
147    use_token: bool, default True
148        If `True`, add the authorization token to the headers.
149
150    debug: bool, default False
151        Verbosity toggle.
152
153    kwargs: Any
154        All other keyword arguments are passed to `requests.request`.
155
156    Returns
157    -------
158    A `requests.Reponse` object.
159
160    """
161    return self.make_request('POST', r_url, **kwargs)

Wrapper for requests.post.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def put(self, r_url: str, **kwargs: Any) -> requests.models.Response:
193def put(self, r_url: str, **kwargs: Any) -> 'requests.Response':
194    """
195    Wrapper for `requests.put`.
196
197    Parameters
198    ----------
199    r_url: str
200        The relative URL for the endpoint (e.g. `'/pipes'`).
201
202    headers: Optional[Dict[str, Any]], default None
203        The headers to use for the request.
204        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
205
206    use_token: bool, default True
207        If `True`, add the authorization token to the headers.
208
209    debug: bool, default False
210        Verbosity toggle.
211
212    kwargs: Any
213        All other keyword arguments are passed to `requests.request`.
214
215    Returns
216    -------
217    A `requests.Reponse` object.
218    """
219    return self.make_request('PUT', r_url, **kwargs)

Wrapper for requests.put.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def patch(self, r_url: str, **kwargs: Any) -> requests.models.Response:
164def patch(self, r_url: str, **kwargs: Any) -> 'requests.Response':
165    """
166    Wrapper for `requests.patch`.
167
168    Parameters
169    ----------
170    r_url: str
171        The relative URL for the endpoint (e.g. `'/pipes'`).
172
173    headers: Optional[Dict[str, Any]], default None
174        The headers to use for the request.
175        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
176
177    use_token: bool, default True
178        If `True`, add the authorization token to the headers.
179
180    debug: bool, default False
181        Verbosity toggle.
182
183    kwargs: Any
184        All other keyword arguments are passed to `requests.request`.
185
186    Returns
187    -------
188    A `requests.Reponse` object.
189    """
190    return self.make_request('PATCH', r_url, **kwargs)

Wrapper for requests.patch.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def delete(self, r_url: str, **kwargs: Any) -> requests.models.Response:
222def delete(self, r_url: str, **kwargs: Any) -> 'requests.Response':
223    """
224    Wrapper for `requests.delete`.
225
226    Parameters
227    ----------
228    r_url: str
229        The relative URL for the endpoint (e.g. `'/pipes'`).
230
231    headers: Optional[Dict[str, Any]], default None
232        The headers to use for the request.
233        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
234
235    use_token: bool, default True
236        If `True`, add the authorization token to the headers.
237
238    debug: bool, default False
239        Verbosity toggle.
240
241    kwargs: Any
242        All other keyword arguments are passed to `requests.request`.
243
244    Returns
245    -------
246    A `requests.Reponse` object.
247    """
248    return self.make_request('DELETE', r_url, **kwargs)

Wrapper for requests.delete.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def wget( self, r_url: str, dest: Union[str, pathlib.Path, NoneType] = None, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) -> pathlib.Path:
251def wget(
252    self,
253    r_url: str,
254    dest: Optional[Union[str, pathlib.Path]] = None,
255    headers: Optional[Dict[str, Any]] = None,
256    use_token: bool = True,
257    debug: bool = False,
258    **kw: Any
259) -> pathlib.Path:
260    """Mimic wget with requests."""
261    from meerschaum.utils.misc import wget
262    if headers is None:
263        headers = {}
264    if use_token:
265        headers.update({'Authorization': f'Bearer {self.token}'})
266    request_url = urllib.parse.urljoin(self.url, r_url)
267    if debug:
268        dprint(
269            f"[{self}] Downloading {request_url}"
270            + (f' to {dest}' if dest is not None else '')
271            + "..."
272        )
273    return wget(request_url, dest=dest, headers=headers, **kw)

Mimic wget with requests.

def get_actions(self):
24def get_actions(self):
25    """Get available actions from the API instance."""
26    return self.get(ACTIONS_ENDPOINT)

Get available actions from the API instance.

def do_action(self, sysargs: List[str]) -> Tuple[bool, str]:
29def do_action(self, sysargs: List[str]) -> SuccessTuple:
30    """
31    Execute a Meerschaum action remotely.
32    """
33    return asyncio.run(self.do_action_async(sysargs))

Execute a Meerschaum action remotely.

async def do_action_async( self, sysargs: List[str], callback_function: Callable[[str], NoneType] = functools.partial(<built-in function print>, end='')) -> Tuple[bool, str]:
36async def do_action_async(
37    self,
38    sysargs: List[str],
39    callback_function: Callable[[str], None] = partial(print, end=''),
40) -> SuccessTuple:
41    """
42    Execute an action as a temporary remote job.
43    """
44    from meerschaum._internal.arguments import remove_api_executor_keys
45    from meerschaum.utils.misc import generate_password
46    sysargs = remove_api_executor_keys(sysargs)
47
48    job_name = TEMP_PREFIX + generate_password(12)
49    job = mrsm.Job(job_name, sysargs, executor_keys=str(self))
50
51    start_success, start_msg = job.start()
52    if not start_success:
53        return start_success, start_msg
54
55    await job.monitor_logs_async(
56        callback_function=callback_function,
57        stop_on_exit=True,
58        strip_timestamps=True,
59    )
60
61    success, msg = job.result
62    job.delete()
63    return success, msg

Execute an action as a temporary remote job.

def do_action_legacy( self, action: Optional[List[str]] = None, sysargs: Optional[List[str]] = None, debug: bool = False, **kw) -> Tuple[bool, str]:
 66def do_action_legacy(
 67    self,
 68    action: Optional[List[str]] = None,
 69    sysargs: Optional[List[str]] = None,
 70    debug: bool = False,
 71    **kw
 72) -> SuccessTuple:
 73    """
 74    NOTE: This method is deprecated.
 75    Please use `do_action()` or `do_action_async()`.
 76
 77    Execute a Meerschaum action remotely.
 78
 79    If `sysargs` are provided, parse those instead.
 80    Otherwise infer everything from keyword arguments.
 81
 82    Examples
 83    --------
 84    >>> conn = mrsm.get_connector('api:main')
 85    >>> conn.do_action(['show', 'pipes'])
 86    (True, "Success")
 87    >>> conn.do_action(['show', 'arguments'], name='test')
 88    (True, "Success")
 89    """
 90    import sys, json
 91    from meerschaum.utils.debug import dprint
 92    from meerschaum._internal.static import STATIC_CONFIG
 93    from meerschaum.utils.misc import json_serialize_datetime
 94    if action is None:
 95        action = []
 96
 97    if sysargs is not None and action and action[0] == '':
 98        from meerschaum._internal.arguments import parse_arguments
 99        if debug:
100            dprint(f"Parsing sysargs:\n{sysargs}")
101        json_dict = parse_arguments(sysargs)
102    else:
103        json_dict = kw
104        json_dict['action'] = action
105        if 'noask' not in kw:
106            json_dict['noask'] = True
107        if 'yes' not in kw:
108            json_dict['yes'] = True
109        if debug:
110            json_dict['debug'] = debug
111
112    root_action = json_dict['action'][0]
113    del json_dict['action'][0]
114    r_url = f"{STATIC_CONFIG['api']['endpoints']['actions']}/{root_action}"
115    
116    if debug:
117        from meerschaum.utils.formatting import pprint
118        dprint(f"Sending data to '{self.url + r_url}':")
119        pprint(json_dict, stream=sys.stderr)
120
121    response = self.post(
122        r_url,
123        data = json.dumps(json_dict, default=json_serialize_datetime),
124        debug = debug,
125    )
126    try:
127        response_list = json.loads(response.text)
128        if isinstance(response_list, dict) and 'detail' in response_list:
129            return False, response_list['detail']
130    except Exception as e:
131        print(f"Invalid response: {response}")
132        print(e)
133        return False, response.text
134    if debug:
135        dprint(response)
136    try:
137        return response_list[0], response_list[1]
138    except Exception as e:
139        return False, f"Failed to parse result from action '{root_action}'"

NOTE: This method is deprecated. Please use do_action() or do_action_async().

Execute a Meerschaum action remotely.

If sysargs are provided, parse those instead. Otherwise infer everything from keyword arguments.

Examples
>>> conn = mrsm.get_connector('api:main')
>>> conn.do_action(['show', 'pipes'])
(True, "Success")
>>> conn.do_action(['show', 'arguments'], name='test')
(True, "Success")
def get_mrsm_version(self, **kw) -> Optional[str]:
13def get_mrsm_version(self, **kw) -> Optional[str]:
14    """
15    Return the Meerschaum version of the API instance.
16    """
17    from meerschaum._internal.static import STATIC_CONFIG
18    try:
19        j = self.get(
20            STATIC_CONFIG['api']['endpoints']['version'] + '/mrsm',
21            use_token=False,
22            **kw
23        ).json()
24    except Exception:
25        return None
26    if isinstance(j, dict) and 'detail' in j:
27        return None
28    return j

Return the Meerschaum version of the API instance.

def get_chaining_status(self, **kw) -> Optional[bool]:
31def get_chaining_status(self, **kw) -> Optional[bool]:
32    """
33    Fetch the chaining status of the API instance.
34    """
35    from meerschaum._internal.static import STATIC_CONFIG
36    try:
37        response = self.get(
38            STATIC_CONFIG['api']['endpoints']['chaining'],
39            use_token = True,
40            **kw
41        )
42        if not response:
43            return None
44    except Exception:
45        return None
46
47    return response.json()

Fetch the chaining status of the API instance.

def get_pipe_instance_keys(self, pipe: meerschaum.Pipe) -> Optional[str]:
35def get_pipe_instance_keys(self, pipe: mrsm.Pipe) -> Union[str, None]:
36    """
37    Return the configured instance keys for a pipe if set,
38    else fall back to the default `instance_keys` for this `APIConnector`.
39    """
40    return pipe.parameters.get('instance_keys', self.instance_keys)

Return the configured instance keys for a pipe if set, else fall back to the default instance_keys for this APIConnector.

def register_pipe( self, pipe: meerschaum.Pipe, debug: bool = False) -> Tuple[bool, str]:
43def register_pipe(
44    self,
45    pipe: mrsm.Pipe,
46    debug: bool = False
47) -> SuccessTuple:
48    """Submit a POST to the API to register a new Pipe object.
49    Returns a tuple of (success_bool, response_dict).
50    """
51    from meerschaum.utils.debug import dprint
52    r_url = pipe_r_url(pipe)
53    response = self.post(
54        r_url + '/register',
55        json=pipe._attributes.get('parameters', {}),
56        params={'instance_keys': self.get_pipe_instance_keys(pipe)},
57        debug=debug,
58    )
59    if debug:
60        dprint(response.text)
61
62    if not response:
63        return False, response.text
64
65    response_data = response.json()
66    if isinstance(response_data, list):
67        response_tuple = response_data[0], response_data[1]
68    elif 'detail' in response.json():
69        response_tuple = response.__bool__(), response_data['detail']
70    else:
71        response_tuple = response.__bool__(), response.text
72    return response_tuple

Submit a POST to the API to register a new Pipe object. Returns a tuple of (success_bool, response_dict).

def fetch_pipes_keys( self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> Union[Dict[Union[int, str], Tuple[str, str, Optional[str], Dict[str, Any]]], List[Tuple[str, str, Optional[str]]]]:
108def fetch_pipes_keys(
109    self,
110    connector_keys: Optional[List[str]] = None,
111    metric_keys: Optional[List[str]] = None,
112    location_keys: Optional[List[str]] = None,
113    tags: Optional[List[str]] = None,
114    params: Optional[Dict[str, Any]] = None,
115    debug: bool = False
116) -> Union[
117    Dict[Union[int, str], Tuple[str, str, Union[str, None], Dict[str, Any]]],
118    List[Tuple[str, str, Union[str, None]]],
119]:
120    """
121    Fetch registered Pipes' keys from the API.
122
123    Parameters
124    ----------
125    connector_keys: Optional[List[str]], default None
126        The connector keys for the query.
127
128    metric_keys: Optional[List[str]], default None
129        The metric keys for the query.
130
131    location_keys: Optional[List[str]], default None
132        The location keys for the query.
133
134    tags: Optional[List[str]], default None
135        A list of tags for the query.
136
137    params: Optional[Dict[str, Any]], default None
138        A parameters dictionary for filtering against the `pipes` table
139        (e.g. `{'connector_keys': 'plugin:foo'}`).
140        Not recommeded to be used.
141
142    debug: bool, default False
143        Verbosity toggle.
144
145    Returns
146    -------
147    A dictionary mapping pipe IDs to key tuples, or a list of key tuples for older servers.
148    """
149    from meerschaum._internal.static import STATIC_CONFIG
150    if connector_keys is None:
151        connector_keys = []
152    if metric_keys is None:
153        metric_keys = []
154    if location_keys is None:
155        location_keys = []
156    if tags is None:
157        tags = []
158
159    r_url = STATIC_CONFIG['api']['endpoints']['pipes'] + '/keys'
160    try:
161        j = self.get(
162            r_url,
163            params={
164                'connector_keys': json.dumps(connector_keys),
165                'metric_keys': json.dumps(metric_keys),
166                'location_keys': json.dumps(location_keys),
167                'tags': json.dumps(tags),
168                'params': json.dumps(params),
169                'instance_keys': self.instance_keys,
170                'as_dict': True,
171            },
172            debug=debug
173        ).json()
174    except Exception as e:
175        import traceback
176        traceback.print_exc()
177        error(str(e))
178
179    if 'detail' in j:
180        error(j['detail'], stack=False)
181
182    if isinstance(j, dict):
183        return {
184            (int(k) if str(k).isdigit() else k): tuple(v)
185            for k, v in j.items()
186        }
187    return [tuple(r) for r in j]

Fetch registered Pipes' keys from the API.

Parameters
  • connector_keys (Optional[List[str]], default None): The connector keys for the query.
  • metric_keys (Optional[List[str]], default None): The metric keys for the query.
  • location_keys (Optional[List[str]], default None): The location keys for the query.
  • tags (Optional[List[str]], default None): A list of tags for the query.
  • params (Optional[Dict[str, Any]], default None): A parameters dictionary for filtering against the pipes table (e.g. {'connector_keys': 'plugin:foo'}). Not recommeded to be used.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A dictionary mapping pipe IDs to key tuples, or a list of key tuples for older servers.
def edit_pipe( self, pipe: meerschaum.Pipe, patch: bool = False, debug: bool = False) -> Tuple[bool, str]:
 75def edit_pipe(
 76    self,
 77    pipe: mrsm.Pipe,
 78    patch: bool = False,
 79    debug: bool = False,
 80) -> SuccessTuple:
 81    """Submit a PATCH to the API to edit an existing Pipe object.
 82    Returns a tuple of (success_bool, response_dict).
 83    """
 84    from meerschaum.utils.debug import dprint
 85    ### NOTE: if `parameters` is supplied in the Pipe constructor,
 86    ###       then `pipe.parameters` will exist and not be fetched from the database.
 87    r_url = pipe_r_url(pipe)
 88    response = self.patch(
 89        r_url + '/edit',
 90        params={'patch': patch, 'instance_keys': self.get_pipe_instance_keys(pipe)},
 91        json=pipe.get_parameters(apply_symlinks=False),
 92        debug=debug,
 93    )
 94    if debug:
 95        dprint(response.text)
 96
 97    response_data = response.json()
 98
 99    if isinstance(response.json(), list):
100        response_tuple = response_data[0], response_data[1]
101    elif 'detail' in response.json():
102        response_tuple = response.__bool__(), response_data['detail']
103    else:
104        response_tuple = response.__bool__(), response.text
105    return response_tuple

Submit a PATCH to the API to edit an existing Pipe object. Returns a tuple of (success_bool, response_dict).

def sync_pipe( self, pipe: meerschaum.Pipe, df: "Optional[Union['pd.DataFrame', Dict[Any, Any], str]]" = None, chunksize: Optional[int] = -1, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
190def sync_pipe(
191    self,
192    pipe: mrsm.Pipe,
193    df: Optional[Union['pd.DataFrame', Dict[Any, Any], str]] = None,
194    chunksize: Optional[int] = -1,
195    debug: bool = False,
196    **kw: Any
197) -> SuccessTuple:
198    """Sync a DataFrame into a Pipe."""
199    from decimal import Decimal
200    from meerschaum.utils.debug import dprint
201    from meerschaum.utils.dtypes import json_serialize_value
202    from meerschaum.utils.misc import items_str, interval_str
203    from meerschaum.config import get_config
204    from meerschaum.utils.packages import attempt_import
205    from meerschaum.utils.dataframe import get_special_cols, to_json
206    begin = time.perf_counter()
207    more_itertools = attempt_import('more_itertools')
208    if df is None:
209        msg = f"DataFrame is `None`. Cannot sync {pipe}."
210        return False, msg
211
212    def get_json_str(c):
213        if isinstance(c, str):
214            return c
215        if isinstance(c, (dict, list, tuple)):
216            return json.dumps(c, default=json_serialize_value)
217        return to_json(c, orient='columns', geometry_format='wkb_hex')
218
219    df = json.loads(df) if isinstance(df, str) else df
220
221    _chunksize: Optional[int] = (1 if chunksize is None else (
222        get_config('system', 'connectors', 'sql', 'chunksize') if chunksize == -1
223        else chunksize
224    ))
225    keys: List[str] = list(df.columns)
226    chunks = []
227    if hasattr(df, 'index'):
228        df = df.reset_index(drop=True)
229        is_dask = 'dask' in df.__module__
230        chunks = (
231            (df.iloc[i] for i in more_itertools.chunked(df.index, _chunksize))
232            if not is_dask
233            else [partition.compute() for partition in df.partitions]
234        )
235
236    elif isinstance(df, dict):
237        ### `_chunks` is a dict of lists of dicts.
238        ### e.g. {'a' : [ {'a':[1, 2]}, {'a':[3, 4]} ] }
239        _chunks = {k: [] for k in keys}
240        for k in keys:
241            chunk_iter = more_itertools.chunked(df[k], _chunksize)
242            for l in chunk_iter:
243                _chunks[k].append({k: l})
244
245        ### `chunks` is a list of dicts (e.g. orient by rows in pandas JSON).
246        for k, l in _chunks.items():
247            for i, c in enumerate(l):
248                try:
249                    chunks[i].update(c)
250                except IndexError:
251                    chunks.append(c)
252    elif isinstance(df, list):
253        chunks = (df[i] for i in more_itertools.chunked(df, _chunksize))
254
255    ### Send columns in case the user has defined them locally.
256    request_params = kw.copy()
257    if pipe.columns:
258        request_params['columns'] = json.dumps(pipe.columns)
259    request_params['instance_keys'] = self.get_pipe_instance_keys(pipe)
260    r_url = pipe_r_url(pipe) + '/data'
261
262    rowcount = 0
263    num_success_chunks = 0
264    for i, c in enumerate(chunks):
265        if debug:
266            dprint(f"[{self}] Posting chunk {i} to {r_url}...")
267        if len(c) == 0:
268            if debug:
269                dprint(f"[{self}] Skipping empty chunk...")
270            continue
271        json_str = get_json_str(c)
272
273        try:
274            response = self.post(
275                r_url,
276                params=request_params,
277                data=json_str,
278                debug=debug,
279            )
280        except Exception as e:
281            msg = f"Failed to post a chunk to {pipe}:\n{e}"
282            warn(msg)
283            return False, msg
284            
285        if not response:
286            return False, f"Failed to sync a chunk:\n{response.text}"
287
288        try:
289            j = json.loads(response.text)
290        except Exception as e:
291            return False, f"Failed to parse response from syncing {pipe}:\n{e}"
292
293        if isinstance(j, dict) and 'detail' in j:
294            return False, j['detail']
295
296        try:
297            j = tuple(j)
298        except Exception:
299            return False, response.text
300
301        if debug:
302            dprint("Received response: " + str(j))
303        if not j[0]:
304            return j
305
306        rowcount += len(c)
307        num_success_chunks += 1
308
309    self.delete_pipe_cache(pipe, debug=debug)
310    success_tuple = True, (
311        f"It took {interval_str(timedelta(seconds=(time.perf_counter() - begin)))} "
312        + f"to sync {rowcount:,} row"
313        + ('s' if rowcount != 1 else '')
314        + f" across {num_success_chunks:,} chunk" + ('s' if num_success_chunks != 1 else '') +
315        f" to {pipe}."
316    )
317    return success_tuple

Sync a DataFrame into a Pipe.

def delete_pipe( self, pipe: Optional[meerschaum.Pipe] = None, debug: bool = False) -> Tuple[bool, str]:
342def delete_pipe(
343    self,
344    pipe: Optional[mrsm.Pipe] = None,
345    debug: bool = False,
346) -> SuccessTuple:
347    """Delete a Pipe and drop its table."""
348    if pipe is None:
349        error("Pipe cannot be None.")
350    r_url = pipe_r_url(pipe)
351    response = self.delete(
352        r_url + '/delete',
353        params={'instance_keys': self.get_pipe_instance_keys(pipe)},
354        debug=debug,
355    )
356    if debug:
357        dprint(response.text)
358
359    response_data = response.json()
360    if isinstance(response.json(), list):
361        response_tuple = response_data[0], response_data[1]
362    elif 'detail' in response.json():
363        response_tuple = response.__bool__(), response_data['detail']
364    else:
365        response_tuple = response.__bool__(), response.text
366    return response_tuple

Delete a Pipe and drop its table.

def delete_pipe_cache( self, pipe: meerschaum.Pipe, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
320def delete_pipe_cache(
321    self,
322    pipe: mrsm.Pipe,
323    debug: bool = False,
324    **kw: Any
325) -> SuccessTuple:
326    """Invalidate the server-side cache for a pipe."""
327    r_url = pipe_r_url(pipe)
328    response = self.delete(
329        r_url + '/cache',
330        params={'instance_keys': self.get_pipe_instance_keys(pipe)},
331        debug=debug,
332    )
333    if not response.ok:
334        return False, f"Failed to invalidate cache for {pipe}: {response.text}"
335    try:
336        data = response.json()
337        return tuple(data) if isinstance(data, list) else (response.ok, response.text)
338    except Exception:
339        return response.ok, response.text

Invalidate the server-side cache for a pipe.

def get_pipe_data( self, pipe: meerschaum.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[str, datetime.datetime, int, NoneType] = None, end: Union[str, datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, as_chunks: bool = False, debug: bool = False, **kw: Any) -> Optional[pandas.DataFrame]:
369def get_pipe_data(
370    self,
371    pipe: mrsm.Pipe,
372    select_columns: Optional[List[str]] = None,
373    omit_columns: Optional[List[str]] = None,
374    begin: Union[str, datetime, int, None] = None,
375    end: Union[str, datetime, int, None] = None,
376    params: Optional[Dict[str, Any]] = None,
377    as_chunks: bool = False,
378    debug: bool = False,
379    **kw: Any
380) -> Union[pandas.DataFrame, None]:
381    """Fetch data from the API."""
382    r_url = pipe_r_url(pipe)
383    while True:
384        try:
385            response = self.get(
386                r_url + "/data",
387                params={
388                    'select_columns': json.dumps(select_columns),
389                    'omit_columns': json.dumps(omit_columns),
390                    'begin': begin,
391                    'end': end,
392                    'params': json.dumps(params, default=str),
393                    'instance': self.get_pipe_instance_keys(pipe),
394                    'as_chunks': as_chunks,
395                },
396                debug=debug
397            )
398            if not response.ok:
399                return None
400            j = response.json()
401        except Exception as e:
402            warn(f"Failed to get data for {pipe}:\n{e}")
403            return None
404        if isinstance(j, dict) and 'detail' in j:
405            return False, j['detail']
406        break
407
408    from meerschaum.utils.dataframe import parse_df_datetimes, add_missing_cols_to_df
409    from meerschaum.utils.dtypes import are_dtypes_equal
410    try:
411        df = parse_df_datetimes(
412            j,
413            ignore_cols=[
414                col
415                for col, dtype in pipe.dtypes.items()
416                if not are_dtypes_equal(str(dtype), 'datetime')
417            ],
418            strip_timezone=(pipe.tzinfo is None),
419            debug=debug,
420        )
421    except Exception as e:
422        warn(f"Failed to parse response for {pipe}:\n{e}")
423        return None
424
425    if len(df.columns) == 0:
426        return add_missing_cols_to_df(df, pipe.dtypes)
427
428    return df

Fetch data from the API.

def get_pipe_id( self, pipe: meerschaum.Pipe, debug: bool = False) -> Union[int, str, NoneType]:
431def get_pipe_id(
432    self,
433    pipe: mrsm.Pipe,
434    debug: bool = False,
435) -> Union[int, str, None]:
436    """Get a Pipe's ID from the API."""
437    from meerschaum.utils.misc import is_int
438    r_url = pipe_r_url(pipe)
439    response = self.get(
440        r_url + '/id',
441        params={
442            'instance': self.get_pipe_instance_keys(pipe),
443        },
444        debug=debug,
445    )
446    if debug:
447        dprint(f"Got pipe ID: {response.text}")
448    try:
449        if is_int(response.text):
450            return int(response.text)
451        if response.text and response.text[0] != '{':
452            return response.text
453    except Exception as e:
454        warn(f"Failed to get the ID for {pipe}:\n{e}")
455    return None

Get a Pipe's ID from the API.

def get_pipe_attributes( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, Any]:
458def get_pipe_attributes(
459    self,
460    pipe: mrsm.Pipe,
461    debug: bool = False,
462) -> Dict[str, Any]:
463    """Get a Pipe's attributes from the API
464
465    Parameters
466    ----------
467    pipe: meerschaum.Pipe
468        The pipe whose attributes we are fetching.
469        
470    Returns
471    -------
472    A dictionary of a pipe's attributes.
473    If the pipe does not exist, return an empty dictionary.
474    """
475    r_url = pipe_r_url(pipe)
476    response = self.get(
477        r_url + '/attributes',
478        params={
479            'instance': self.get_pipe_instance_keys(pipe),
480        },
481        debug=debug
482    )
483    try:
484        return json.loads(response.text)
485    except Exception as e:
486        warn(f"Failed to get the attributes for {pipe}:\n{e}")
487    return {}

Get a Pipe's attributes from the API

Parameters
Returns
  • A dictionary of a pipe's attributes.
  • If the pipe does not exist, return an empty dictionary.
def get_sync_time( self, pipe: meerschaum.Pipe, params: Optional[Dict[str, Any]] = None, newest: bool = True, debug: bool = False) -> Union[datetime.datetime, int, NoneType]:
490def get_sync_time(
491    self,
492    pipe: mrsm.Pipe,
493    params: Optional[Dict[str, Any]] = None,
494    newest: bool = True,
495    debug: bool = False,
496) -> Union[datetime, int, None]:
497    """Get a Pipe's most recent datetime value from the API.
498
499    Parameters
500    ----------
501    pipe: meerschaum.Pipe
502        The pipe to select from.
503
504    params: Optional[Dict[str, Any]], default None
505        Optional params dictionary to build the WHERE clause.
506
507    newest: bool, default True
508        If `True`, get the most recent datetime (honoring `params`).
509        If `False`, get the oldest datetime (ASC instead of DESC).
510
511    Returns
512    -------
513    The most recent (or oldest if `newest` is `False`) datetime of a pipe,
514    rounded down to the closest minute.
515    """
516    from meerschaum.utils.misc import is_int
517    from meerschaum.utils.warnings import warn
518    r_url = pipe_r_url(pipe)
519    response = self.get(
520        r_url + '/sync_time',
521        json=params,
522        params={
523            'instance': self.get_pipe_instance_keys(pipe),
524            'newest': newest,
525            'debug': debug,
526        },
527        debug=debug,
528    )
529    if not response:
530        warn(f"Failed to get the sync time for {pipe}:\n" + response.text)
531        return None
532
533    j = response.json()
534    if j is None:
535        dt = None
536    else:
537        try:
538            dt = (
539                datetime.fromisoformat(j)
540                if not is_int(j)
541                else int(j)
542            )
543        except Exception as e:
544            warn(f"Failed to parse the sync time '{j}' for {pipe}:\n{e}")
545            dt = None
546    return dt

Get a Pipe's most recent datetime value from the API.

Parameters
  • pipe (meerschaum.Pipe): The pipe to select from.
  • params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause.
  • newest (bool, default True): If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
Returns
  • The most recent (or oldest if newest is False) datetime of a pipe,
  • rounded down to the closest minute.
def pipe_exists(self, pipe: meerschaum.Pipe, debug: bool = False) -> bool:
549def pipe_exists(
550    self,
551    pipe: mrsm.Pipe,
552    debug: bool = False
553) -> bool:
554    """Check the API to see if a Pipe exists.
555
556    Parameters
557    ----------
558    pipe: 'meerschaum.Pipe'
559        The pipe which were are querying.
560        
561    Returns
562    -------
563    A bool indicating whether a pipe's underlying table exists.
564    """
565    from meerschaum.utils.debug import dprint
566    from meerschaum.utils.warnings import warn
567    r_url = pipe_r_url(pipe)
568    response = self.get(
569        r_url + '/exists',
570        params={
571            'instance': self.get_pipe_instance_keys(pipe),
572        },
573        debug=debug,
574    )
575    if not response:
576        warn(f"Failed to check if {pipe} exists:\n{response.text}")
577        return False
578    if debug:
579        dprint("Received response: " + str(response.text))
580    j = response.json()
581    if isinstance(j, dict) and 'detail' in j:
582        warn(j['detail'])
583    return j

Check the API to see if a Pipe exists.

Parameters
Returns
  • A bool indicating whether a pipe's underlying table exists.
def create_metadata(self, debug: bool = False) -> bool:
586def create_metadata(
587    self,
588    debug: bool = False
589) -> bool:
590    """Create metadata tables.
591
592    Returns
593    -------
594    A bool indicating success.
595    """
596    from meerschaum.utils.debug import dprint
597    from meerschaum._internal.static import STATIC_CONFIG
598    r_url = STATIC_CONFIG['api']['endpoints']['metadata']
599    response = self.post(r_url, debug=debug)
600    if debug:
601        dprint("Create metadata response: {response.text}")
602    try:
603        _ = json.loads(response.text)
604    except Exception as e:
605        warn(f"Failed to create metadata on {self}:\n{e}")
606    return False

Create metadata tables.

Returns
  • A bool indicating success.
def get_pipe_rowcount( self, pipe: meerschaum.Pipe, begin: Union[str, datetime.datetime, int, NoneType] = None, end: Union[str, datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) -> int:
609def get_pipe_rowcount(
610    self,
611    pipe: mrsm.Pipe,
612    begin: Union[str, datetime, int, None] = None,
613    end: Union[str, datetime, int, None] = None,
614    params: Optional[Dict[str, Any]] = None,
615    remote: bool = False,
616    debug: bool = False,
617) -> int:
618    """Get a pipe's row count from the API.
619
620    Parameters
621    ----------
622    pipe: 'meerschaum.Pipe':
623        The pipe whose row count we are counting.
624        
625    begin: Union[str, datetime, int, None], default None
626        If provided, bound the count by this datetime.
627
628    end: Union[str, datetime, int, None], default None
629        If provided, bound the count by this datetime.
630
631    params: Optional[Dict[str, Any]], default None
632        If provided, bound the count by these parameters.
633
634    remote: bool, default False
635        If `True`, return the rowcount for the fetch definition.
636
637    Returns
638    -------
639    The number of rows in the pipe's table, bound the given parameters.
640    If the table does not exist, return 0.
641    """
642    r_url = pipe_r_url(pipe)
643    response = self.get(
644        r_url + "/rowcount",
645        json = params,
646        params = {
647            'begin': begin,
648            'end': end,
649            'remote': remote,
650            'instance': self.get_pipe_instance_keys(pipe),
651        },
652        debug = debug
653    )
654    if not response:
655        warn(f"Failed to get the rowcount for {pipe}:\n{response.text}")
656        return 0
657    try:
658        return int(json.loads(response.text))
659    except Exception as e:
660        warn(f"Failed to get the rowcount for {pipe}:\n{e}")
661    return 0

Get a pipe's row count from the API.

Parameters
  • pipe ('meerschaum.Pipe':): The pipe whose row count we are counting.
  • begin (Union[str, datetime, int, None], default None): If provided, bound the count by this datetime.
  • end (Union[str, datetime, int, None], default None): If provided, bound the count by this datetime.
  • params (Optional[Dict[str, Any]], default None): If provided, bound the count by these parameters.
  • remote (bool, default False): If True, return the rowcount for the fetch definition.
Returns
  • The number of rows in the pipe's table, bound the given parameters.
  • If the table does not exist, return 0.
def drop_pipe( self, pipe: meerschaum.Pipe, debug: bool = False) -> Tuple[bool, str]:
664def drop_pipe(
665    self,
666    pipe: mrsm.Pipe,
667    debug: bool = False
668) -> SuccessTuple:
669    """
670    Drop a pipe's table but maintain its registration.
671
672    Parameters
673    ----------
674    pipe: meerschaum.Pipe:
675        The pipe to be dropped.
676        
677    Returns
678    -------
679    A success tuple (bool, str).
680    """
681    from meerschaum.utils.warnings import error
682    from meerschaum.utils.debug import dprint
683    if pipe is None:
684        error("Pipe cannot be None.")
685    r_url = pipe_r_url(pipe)
686    response = self.delete(
687        r_url + '/drop',
688        params={
689            'instance': self.get_pipe_instance_keys(pipe),
690        },
691        debug=debug,
692    )
693    if debug:
694        dprint(response.text)
695
696    try:
697        data = response.json()
698    except Exception as e:
699        return False, f"Failed to drop {pipe}."
700
701    if isinstance(data, list):
702        response_tuple = data[0], data[1]
703    elif 'detail' in response.json():
704        response_tuple = response.__bool__(), data['detail']
705    else:
706        response_tuple = response.__bool__(), response.text
707
708    return response_tuple

Drop a pipe's table but maintain its registration.

Parameters
Returns
  • A success tuple (bool, str).
def clear_pipe( self, pipe: meerschaum.Pipe, begin: Union[str, datetime.datetime, int, NoneType] = None, end: Union[str, datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw) -> Tuple[bool, str]:
711def clear_pipe(
712    self,
713    pipe: mrsm.Pipe,
714    begin: Union[str, datetime, int, None] = None,
715    end: Union[str, datetime, int, None] = None,
716    params: Optional[Dict[str, Any]] = None,
717    debug: bool = False,
718    **kw
719) -> SuccessTuple:
720    """
721    Delete rows in a pipe's table.
722
723    Parameters
724    ----------
725    pipe: meerschaum.Pipe
726        The pipe with rows to be deleted.
727        
728    Returns
729    -------
730    A success tuple.
731    """
732    r_url = pipe_r_url(pipe)
733    response = self.delete(
734        r_url + '/clear',
735        params={
736            'begin': begin,
737            'end': end,
738            'params': json.dumps(params),
739            'instance': self.get_pipe_instance_keys(pipe),
740        },
741        debug=debug,
742    )
743    if debug:
744        dprint(response.text)
745
746    try:
747        data = response.json()
748    except Exception as e:
749        return False, f"Failed to clear {pipe} with constraints {begin=}, {end=}, {params=}."
750
751    if isinstance(data, list):
752        response_tuple = data[0], data[1]
753    elif 'detail' in response.json():
754        response_tuple = response.__bool__(), data['detail']
755    else:
756        response_tuple = response.__bool__(), response.text
757
758    return response_tuple

Delete rows in a pipe's table.

Parameters
Returns
  • A success tuple.
def get_pipe_columns_types( self, pipe: meerschaum.Pipe, debug: bool = False) -> Optional[Dict[str, str]]:
761def get_pipe_columns_types(
762    self,
763    pipe: mrsm.Pipe,
764    debug: bool = False,
765) -> Union[Dict[str, str], None]:
766    """
767    Fetch the columns and types of the pipe's table.
768
769    Parameters
770    ----------
771    pipe: meerschaum.Pipe
772        The pipe whose columns to be queried.
773
774    Returns
775    -------
776    A dictionary mapping column names to their database types.
777
778    Examples
779    --------
780    >>> {
781    ...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
782    ...   'id': 'BIGINT',
783    ...   'val': 'DOUBLE PRECISION',
784    ... }
785    >>>
786    """
787    r_url = pipe_r_url(pipe) + '/columns/types'
788    response = self.get(
789        r_url,
790        params={
791            'instance': self.get_pipe_instance_keys(pipe),
792        },
793        debug=debug,
794    )
795    j = response.json()
796    if isinstance(j, dict) and 'detail' in j and len(j.keys()) == 1:
797        warn(j['detail'])
798        return None
799    if not isinstance(j, dict):
800        warn(response.text)
801        return None
802    return j

Fetch the columns and types of the pipe's table.

Parameters
Returns
  • A dictionary mapping column names to their database types.
Examples
>>> {
...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
...   'id': 'BIGINT',
...   'val': 'DOUBLE PRECISION',
... }
>>>
def get_pipe_columns_indices( self, pipe: meerschaum.Pipe, debug: bool = False) -> Optional[Dict[str, str]]:
805def get_pipe_columns_indices(
806    self,
807    pipe: mrsm.Pipe,
808    debug: bool = False,
809) -> Union[Dict[str, str], None]:
810    """
811    Fetch the index information for a pipe.
812
813    Parameters
814    ----------
815    pipe: mrsm.Pipe
816        The pipe whose columns to be queried.
817
818    Returns
819    -------
820    A dictionary mapping column names to a list of associated index information.
821    """
822    r_url = pipe_r_url(pipe) + '/columns/indices'
823    response = self.get(
824        r_url,
825        params={
826            'instance': self.get_pipe_instance_keys(pipe),
827        },
828        debug=debug,
829    )
830    j = response.json()
831    if isinstance(j, dict) and 'detail' in j and len(j.keys()) == 1:
832        warn(j['detail'])
833        return None
834    if not isinstance(j, dict):
835        warn(response.text)
836        return None
837    return j

Fetch the index information for a pipe.

Parameters
  • pipe (mrsm.Pipe): The pipe whose columns to be queried.
Returns
  • A dictionary mapping column names to a list of associated index information.
def get_pipe_docs( self, pipe: meerschaum.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[str, datetime.datetime, int, NoneType] = None, end: Union[str, datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, order: str = 'asc', limit: Optional[int] = None, debug: bool = False, **kw: Any) -> List[Dict[str, Any]]:
840def get_pipe_docs(
841    self,
842    pipe: mrsm.Pipe,
843    select_columns: Optional[List[str]] = None,
844    omit_columns: Optional[List[str]] = None,
845    begin: Union[str, datetime, int, None] = None,
846    end: Union[str, datetime, int, None] = None,
847    params: Optional[Dict[str, Any]] = None,
848    order: str = 'asc',
849    limit: Optional[int] = None,
850    debug: bool = False,
851    **kw: Any
852) -> List[Dict[str, Any]]:
853    """Fetch a pipe's data as a list of documents from the API."""
854    r_url = pipe_r_url(pipe)
855    try:
856        response = self.get(
857            r_url + "/docs",
858            params={
859                'select_columns': json.dumps(select_columns),
860                'omit_columns': json.dumps(omit_columns),
861                'begin': begin,
862                'end': end,
863                'params': json.dumps(params, default=str),
864                'order': order,
865                'limit': limit,
866                'instance_keys': self.get_pipe_instance_keys(pipe),
867            },
868            debug=debug,
869        )
870        if not response.ok:
871            warn(f"Failed to get docs for {pipe}:\n{response.text}")
872            return []
873        j = response.json()
874        if isinstance(j, list):
875            return j
876        return []
877    except Exception as e:
878        warn(f"Failed to get docs for {pipe}:\n{e}")
879        return []

Fetch a pipe's data as a list of documents from the API.

def get_pipe_size( self, pipe: meerschaum.Pipe, debug: bool = False, **kw: Any) -> Optional[int]:
882def get_pipe_size(
883    self,
884    pipe: mrsm.Pipe,
885    debug: bool = False,
886    **kw: Any
887) -> Union[int, None]:
888    """
889    Return the on-disk size of a pipe's target table in bytes via the API.
890
891    Parameters
892    ----------
893    pipe: mrsm.Pipe
894        The pipe whose target table size to measure.
895
896    Returns
897    -------
898    An `int` of the number of bytes occupied by the target table,
899    or `None` if the size could not be determined.
900    """
901    r_url = pipe_r_url(pipe) + '/size'
902    response = self.get(
903        r_url,
904        params={
905            'instance': self.get_pipe_instance_keys(pipe),
906        },
907        debug=debug,
908    )
909    if not response:
910        warn(f"Failed to get the size for {pipe}:\n{response.text}")
911        return None
912    try:
913        j = json.loads(response.text)
914    except Exception as e:
915        warn(f"Failed to parse the size for {pipe}:\n{e}")
916        return None
917    if j is None:
918        return None
919    if isinstance(j, dict) and 'detail' in j:
920        warn(j['detail'])
921        return None
922    try:
923        return int(j)
924    except Exception:
925        return None

Return the on-disk size of a pipe's target table in bytes via the API.

Parameters
  • pipe (mrsm.Pipe): The pipe whose target table size to measure.
Returns
  • An int of the number of bytes occupied by the target table,
  • or None if the size could not be determined.
def compress_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
928def compress_pipe(
929    self,
930    pipe: mrsm.Pipe,
931    debug: bool = False,
932    **kw: Any
933) -> SuccessTuple:
934    """
935    Compress a pipe's target table via the API.
936
937    Parameters
938    ----------
939    pipe: mrsm.Pipe
940        The pipe whose target table to compress.
941
942    Returns
943    -------
944    A `SuccessTuple` indicating success.
945    """
946    r_url = pipe_r_url(pipe) + '/compress'
947    response = self.post(
948        r_url,
949        params={
950            'instance_keys': self.get_pipe_instance_keys(pipe),
951        },
952        debug=debug,
953    )
954    if debug:
955        dprint(response.text)
956
957    try:
958        data = response.json()
959    except Exception:
960        return False, f"Failed to compress {pipe}."
961
962    if isinstance(data, list):
963        return data[0], data[1]
964    if isinstance(data, dict) and 'detail' in data:
965        return response.__bool__(), data['detail']
966    return response.__bool__(), response.text

Compress a pipe's target table via the API.

Parameters
  • pipe (mrsm.Pipe): The pipe whose target table to compress.
Returns
  • A SuccessTuple indicating success.
def decompress_pipe( self, pipe: meerschaum.Pipe, no_policy: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
 969def decompress_pipe(
 970    self,
 971    pipe: mrsm.Pipe,
 972    no_policy: bool = False,
 973    debug: bool = False,
 974    **kw: Any
 975) -> SuccessTuple:
 976    """
 977    Decompress a pipe's target table via the API, the inverse of `compress_pipe()`.
 978
 979    Parameters
 980    ----------
 981    pipe: mrsm.Pipe
 982        The pipe whose target table to decompress.
 983
 984    no_policy: bool, default False
 985        If `True`, decompress existing data now but leave the compression policy in place.
 986
 987    Returns
 988    -------
 989    A `SuccessTuple` indicating success.
 990    """
 991    r_url = pipe_r_url(pipe) + '/decompress'
 992    response = self.post(
 993        r_url,
 994        params={
 995            'instance_keys': self.get_pipe_instance_keys(pipe),
 996            'no_policy': no_policy,
 997        },
 998        debug=debug,
 999    )
1000    if debug:
1001        dprint(response.text)
1002
1003    try:
1004        data = response.json()
1005    except Exception:
1006        return False, f"Failed to decompress {pipe}."
1007
1008    if isinstance(data, list):
1009        return data[0], data[1]
1010    if isinstance(data, dict) and 'detail' in data:
1011        return response.__bool__(), data['detail']
1012    return response.__bool__(), response.text

Decompress a pipe's target table via the API, the inverse of compress_pipe().

Parameters
  • pipe (mrsm.Pipe): The pipe whose target table to decompress.
  • no_policy (bool, default False): If True, decompress existing data now but leave the compression policy in place.
Returns
  • A SuccessTuple indicating success.
def vacuum_pipe( self, pipe: meerschaum.Pipe, full: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
1015def vacuum_pipe(
1016    self,
1017    pipe: mrsm.Pipe,
1018    full: bool = False,
1019    debug: bool = False,
1020    **kw: Any
1021) -> SuccessTuple:
1022    """
1023    Vacuum a pipe's target table via the API.
1024
1025    Parameters
1026    ----------
1027    pipe: mrsm.Pipe
1028        The pipe whose target table to vacuum.
1029
1030    full: bool, default False
1031        If `True`, run `VACUUM FULL` (PostgreSQL family only).
1032
1033    Returns
1034    -------
1035    A `SuccessTuple` indicating success.
1036    """
1037    r_url = pipe_r_url(pipe) + '/vacuum'
1038    response = self.post(
1039        r_url,
1040        params={
1041            'instance_keys': self.get_pipe_instance_keys(pipe),
1042            'full': full,
1043        },
1044        debug=debug,
1045    )
1046    if debug:
1047        dprint(response.text)
1048
1049    try:
1050        data = response.json()
1051    except Exception:
1052        return False, f"Failed to vacuum {pipe}."
1053
1054    if isinstance(data, list):
1055        return data[0], data[1]
1056    if isinstance(data, dict) and 'detail' in data:
1057        return response.__bool__(), data['detail']
1058    return response.__bool__(), response.text

Vacuum a pipe's target table via the API.

Parameters
  • pipe (mrsm.Pipe): The pipe whose target table to vacuum.
  • full (bool, default False): If True, run VACUUM FULL (PostgreSQL family only).
Returns
  • A SuccessTuple indicating success.
def analyze_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
1107def analyze_pipe(
1108    self,
1109    pipe: mrsm.Pipe,
1110    debug: bool = False,
1111    **kw: Any
1112) -> SuccessTuple:
1113    """
1114    Analyze a pipe's target table via the API.
1115
1116    Parameters
1117    ----------
1118    pipe: mrsm.Pipe
1119        The pipe whose target table to analyze.
1120
1121    Returns
1122    -------
1123    A `SuccessTuple` indicating success.
1124    """
1125    r_url = pipe_r_url(pipe) + '/analyze'
1126    response = self.post(
1127        r_url,
1128        params={
1129            'instance_keys': self.get_pipe_instance_keys(pipe),
1130        },
1131        debug=debug,
1132    )
1133    if debug:
1134        dprint(response.text)
1135
1136    try:
1137        data = response.json()
1138    except Exception:
1139        return False, f"Failed to analyze {pipe}."
1140
1141    if isinstance(data, list):
1142        return data[0], data[1]
1143    if isinstance(data, dict) and 'detail' in data:
1144        return response.__bool__(), data['detail']
1145    return response.__bool__(), response.text

Analyze a pipe's target table via the API.

Parameters
  • pipe (mrsm.Pipe): The pipe whose target table to analyze.
Returns
  • A SuccessTuple indicating success.
def partition_pipe( self, pipe: meerschaum.Pipe, chunk_minutes: Optional[int] = None, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
1061def partition_pipe(
1062    self,
1063    pipe: mrsm.Pipe,
1064    chunk_minutes: Optional[int] = None,
1065    debug: bool = False,
1066    **kw: Any
1067) -> SuccessTuple:
1068    """
1069    Repartition a pipe's target table to a new chunk width via the API.
1070
1071    Parameters
1072    ----------
1073    pipe: mrsm.Pipe
1074        The partitioned pipe whose target table to repartition.
1075
1076    chunk_minutes: Optional[int], default None
1077        The new partition width in minutes. Defaults to the pipe's `verify.chunk_minutes`.
1078
1079    Returns
1080    -------
1081    A `SuccessTuple` indicating success.
1082    """
1083    r_url = pipe_r_url(pipe) + '/partition'
1084    response = self.post(
1085        r_url,
1086        params={
1087            'instance_keys': self.get_pipe_instance_keys(pipe),
1088            **({'chunk_minutes': chunk_minutes} if chunk_minutes is not None else {}),
1089        },
1090        debug=debug,
1091    )
1092    if debug:
1093        dprint(response.text)
1094
1095    try:
1096        data = response.json()
1097    except Exception:
1098        return False, f"Failed to repartition {pipe}."
1099
1100    if isinstance(data, list):
1101        return data[0], data[1]
1102    if isinstance(data, dict) and 'detail' in data:
1103        return response.__bool__(), data['detail']
1104    return response.__bool__(), response.text

Repartition a pipe's target table to a new chunk width via the API.

Parameters
  • pipe (mrsm.Pipe): The partitioned pipe whose target table to repartition.
  • chunk_minutes (Optional[int], default None): The new partition width in minutes. Defaults to the pipe's verify.chunk_minutes.
Returns
  • A SuccessTuple indicating success.
def fetch( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, str, int] = '', end: Union[datetime.datetime, int] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw: Any) -> "Iterator['pd.DataFrame']":
16def fetch(
17        self,
18        pipe: mrsm.Pipe,
19        begin: Union[datetime, str, int] = '',
20        end: Union[datetime, int] = None,
21        params: Optional[Dict[str, Any]] = None,
22        debug: bool = False,
23        **kw: Any
24    ) -> Iterator['pd.DataFrame']:
25    """Get the Pipe data from the remote Pipe."""
26    from meerschaum.utils.debug import dprint
27    from meerschaum.utils.warnings import warn, error
28    from meerschaum.config._patch import apply_patch_to_config
29
30    fetch_params = pipe.parameters.get('fetch', {})
31    if not fetch_params:
32        warn(f"Missing 'fetch' parameters for {pipe}.", stack=False)
33        return None
34
35    pipe_meta = fetch_params.get('pipe', {})
36    ### Legacy: check for `connector_keys`, etc. at the root.
37    if not pipe_meta:
38        ck, mk, lk = (
39            fetch_params.get('connector_keys', None),
40            fetch_params.get('metric_key', None),
41            fetch_params.get('location_key', None),
42        )
43        if not ck or not mk:
44            warn(f"Missing `fetch:pipe` keys for {pipe}.", stack=False)
45            return None
46
47        pipe_meta.update({
48            'connector': ck,
49            'metric': mk,
50            'location': lk,
51        })
52
53    pipe_meta['instance'] = self
54    source_pipe = mrsm.Pipe(**pipe_meta)
55
56    _params = copy.deepcopy(params) if params is not None else {}
57    _params = apply_patch_to_config(_params, fetch_params.get('params', {}))
58    select_columns = fetch_params.get('select_columns', [])
59    omit_columns = fetch_params.get('omit_columns', [])
60
61    return source_pipe.get_data(
62        select_columns = select_columns,
63        omit_columns = omit_columns,
64        begin = begin,
65        end = end,
66        params = _params,
67        debug = debug,
68        as_iterator = True,
69    )

Get the Pipe data from the remote Pipe.

def register_plugin( self, plugin: meerschaum.Plugin, make_archive: bool = True, debug: bool = False) -> Tuple[bool, str]:
24def register_plugin(
25    self,
26    plugin: mrsm.core.Plugin,
27    make_archive: bool = True,
28    debug: bool = False,
29) -> SuccessTuple:
30    """Register a plugin and upload its archive."""
31    import json
32    archive_path = plugin.make_tar(debug=debug) if make_archive else plugin.archive_path
33    file_pointer = open(archive_path, 'rb')
34    files = {'archive': file_pointer}
35    metadata = {
36        'version': plugin.version,
37        'attributes': json.dumps(plugin.attributes),
38    }
39    r_url = plugin_r_url(plugin)
40    try:
41        response = self.post(r_url, files=files, params=metadata, debug=debug)
42    except Exception:
43        return False, f"Failed to register plugin '{plugin}'."
44    finally:
45        file_pointer.close()
46
47    try:
48        success, msg = json.loads(response.text)
49    except Exception:
50        return False, response.text
51
52    return success, msg

Register a plugin and upload its archive.

def install_plugin( self, name: str, skip_deps: bool = False, force: bool = False, debug: bool = False) -> Tuple[bool, str]:
55def install_plugin(
56    self,
57    name: str,
58    skip_deps: bool = False,
59    force: bool = False,
60    debug: bool = False
61) -> SuccessTuple:
62    """Download and attempt to install a plugin from the API."""
63    import os
64    import pathlib
65    import json
66    from meerschaum.core import Plugin
67    import meerschaum.config.paths as paths
68    from meerschaum.utils.debug import dprint
69    from meerschaum.utils.packages import attempt_import
70    binaryornot_check = attempt_import('binaryornot.check', lazy=False)
71    r_url = plugin_r_url(name)
72    dest = pathlib.Path(os.path.join(paths.PLUGINS_TEMP_RESOURCES_PATH, name + '.tar.gz'))
73    if debug:
74        dprint(f"Fetching from '{self.url + r_url}' to '{dest}'...")
75    archive_path = self.wget(r_url, dest, debug=debug) 
76    is_binary = binaryornot_check.is_binary(str(archive_path))
77    if not is_binary:
78        fail_msg = f"Failed to download binary for plugin '{name}'."
79        try:
80            with open(archive_path, 'r') as f:
81                j = json.load(f)
82            if isinstance(j, list):
83                success, msg = tuple(j)
84            elif isinstance(j, dict) and 'detail' in j:
85                success, msg = False, fail_msg
86        except Exception:
87            success, msg = False, fail_msg
88        return success, msg
89    plugin = Plugin(name, archive_path=archive_path, repo_connector=self)
90    return plugin.install(skip_deps=skip_deps, force=force, debug=debug)

Download and attempt to install a plugin from the API.

def delete_plugin( self, plugin: meerschaum.Plugin, debug: bool = False) -> Tuple[bool, str]:
156def delete_plugin(
157    self,
158    plugin: mrsm.core.Plugin,
159    debug: bool = False
160) -> SuccessTuple:
161    """Delete a plugin from an API repository."""
162    import json
163    r_url = plugin_r_url(plugin)
164    try:
165        response = self.delete(r_url, debug=debug)
166    except Exception:
167        return False, f"Failed to delete plugin '{plugin}'."
168
169    try:
170        success, msg = json.loads(response.text)
171    except Exception:
172        return False, response.text
173
174    return success, msg

Delete a plugin from an API repository.

def get_plugins( self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False) -> List[str]:
 93def get_plugins(
 94    self,
 95    user_id: Optional[int] = None,
 96    search_term: Optional[str] = None,
 97    debug: bool = False
 98) -> List[str]:
 99    """Return a list of registered plugin names.
100
101    Parameters
102    ----------
103    user_id: Optional[int], default None
104        If specified, return all plugins from a certain user.
105
106    search_term: Optional[str], default None
107        If specified, return plugins beginning with this string.
108
109    Returns
110    -------
111    A list of plugin names.
112    """
113    import json
114    from meerschaum.utils.warnings import error
115    from meerschaum._internal.static import STATIC_CONFIG
116    response = self.get(
117        STATIC_CONFIG['api']['endpoints']['plugins'],
118        params = {'user_id': user_id, 'search_term': search_term},
119        use_token = True,
120        debug = debug
121    )
122    if not response:
123        return []
124    plugins = json.loads(response.text)
125    if not isinstance(plugins, list):
126        error(response.text)
127    return plugins

Return a list of registered plugin names.

Parameters
  • user_id (Optional[int], default None): If specified, return all plugins from a certain user.
  • search_term (Optional[str], default None): If specified, return plugins beginning with this string.
Returns
  • A list of plugin names.
def get_plugin_attributes( self, plugin: meerschaum.Plugin, debug: bool = False) -> Dict[str, Any]:
130def get_plugin_attributes(
131    self,
132    plugin: mrsm.core.Plugin,
133    debug: bool = False
134) -> Dict[str, Any]:
135    """
136    Return a plugin's attributes.
137    """
138    import json
139    from meerschaum.utils.warnings import warn, error
140    r_url = plugin_r_url(plugin) + '/attributes'
141    response = self.get(r_url, use_token=True, debug=debug)
142    attributes = response.json()
143    if isinstance(attributes, str) and attributes and attributes[0] == '{':
144        try:
145            attributes = json.loads(attributes)
146        except Exception:
147            pass
148    if not isinstance(attributes, dict):
149        error(response.text)
150    elif not response and 'detail' in attributes:
151        warn(attributes['detail'])
152        return {}
153    return attributes

Return a plugin's attributes.

def login( self, debug: bool = False, warn: bool = True, **kw: Any) -> Tuple[bool, str]:
19def login(
20    self,
21    debug: bool = False,
22    warn: bool = True,
23    **kw: Any
24) -> SuccessTuple:
25    """Log in and set the session token."""
26    if self.login_scheme == 'api_key':
27        validate_response = self.post(
28            STATIC_CONFIG['api']['endpoints']['tokens'] + '/validate',
29            headers={'Authorization': f'Bearer {self.api_key}'},
30            use_token=False,
31            debug=debug,
32        )
33        if not validate_response:
34            return False, "API key is not valid."
35        return True, "API key is valid."
36
37    try:
38        if self.login_scheme == 'password':
39            login_data = {
40                'username': self.username,
41                'password': self.password,
42            }
43        elif self.login_scheme == 'client_credentials':
44            login_data = {
45                'client_id': self.client_id,
46                'client_secret': self.client_secret,
47            }
48    except AttributeError:
49        login_data = {}
50
51    if not login_data:
52        return False, f"Please login with the command `login {self}`."
53
54    login_scheme_msg = (
55        f" as user '{login_data['username']}'"
56        if self.login_scheme == 'username'
57        else ''
58    )
59
60    response = self.post(
61        STATIC_CONFIG['api']['endpoints']['login'],
62        data=login_data,
63        use_token=False,
64        debug=debug,
65    )
66    if response:
67        msg = f"Successfully logged into '{self}'{login_scheme_msg}'."
68        self._token = json.loads(response.text)['access_token']
69        self._expires = datetime.datetime.strptime(
70            json.loads(response.text)['expires'], 
71            '%Y-%m-%dT%H:%M:%S.%f'
72        )
73    else:
74        msg = (
75            f"Failed to log into '{self}'{login_scheme_msg}.\n" +
76            f"    Please verify login details for connector '{self}'."
77        )
78        if warn and not self.__dict__.get('_emitted_warning', False):
79            _warn(msg, stack=False)
80            self._emitted_warning = True
81
82    return response.__bool__(), msg

Log in and set the session token.

def test_connection(self, **kw: Any) -> Optional[bool]:
 85def test_connection(
 86    self,
 87    **kw: Any
 88) -> Union[bool, None]:
 89    """Test if a successful connection to the API may be made."""
 90    from meerschaum.connectors.poll import retry_connect
 91    _default_kw = {
 92        'max_retries': 1, 'retry_wait': 0, 'warn': False,
 93        'connector': self, 'enforce_chaining': False,
 94        'enforce_login': False,
 95    }
 96    _default_kw.update(kw)
 97    try:
 98        return retry_connect(**_default_kw)
 99    except Exception:
100        return False

Test if a successful connection to the API may be made.

def register_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
70def register_user(
71    self,
72    user: mrsm.core.User,
73    debug: bool = False,
74    **kw: Any
75) -> SuccessTuple:
76    """Register a new user."""
77    from meerschaum._internal.static import STATIC_CONFIG
78    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/register"
79    data = {
80        'username': user.username,
81        'password': user.password,
82        'attributes': json.dumps(user.attributes),
83    }
84    if user.type:
85        data['type'] = user.type
86    if user.email:
87        data['email'] = user.email
88    response = self.post(r_url, data=data, debug=debug)
89    try:
90        _json = json.loads(response.text)
91        if isinstance(_json, dict) and 'detail' in _json:
92            return False, _json['detail']
93        success_tuple = tuple(_json)
94    except Exception:
95        msg = response.text if response else f"Failed to register user '{user}'."
96        return False, msg
97
98    return tuple(success_tuple)

Register a new user.

def get_user_id( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Union[int, str, uuid.UUID, NoneType]:
101def get_user_id(
102    self,
103    user: mrsm.core.User,
104    debug: bool = False,
105    **kw: Any
106) -> Union[int, str, UUID, None]:
107    """Get a user's ID."""
108    from meerschaum._internal.static import STATIC_CONFIG
109    from meerschaum.utils.misc import is_int, is_uuid
110    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/id"
111    response = self.get(r_url, debug=debug, **kw)
112    try:
113        id_text = str(json.loads(response.text))
114        if is_int(id_text):
115            user_id = int(id_text)
116        elif is_uuid(id_text):
117            user_id = UUID(id_text)
118        else:
119            user_id = id_text
120    except Exception as e:
121        user_id = None
122    return user_id

Get a user's ID.

def get_users(self, debug: bool = False, **kw: Any) -> List[str]:
19def get_users(
20    self,
21    debug: bool = False,
22    **kw: Any
23) -> List[str]:
24    """
25    Return a list of registered usernames.
26    """
27    from meerschaum._internal.static import STATIC_CONFIG
28    response = self.get(
29        f"{STATIC_CONFIG['api']['endpoints']['users']}",
30        debug = debug,
31        use_token = True,
32    )
33    if not response:
34        return []
35    try:
36        return response.json()
37    except Exception as e:
38        return []

Return a list of registered usernames.

def edit_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
41def edit_user(
42    self,
43    user: mrsm.core.User,
44    debug: bool = False,
45    **kw: Any
46) -> SuccessTuple:
47    """Edit an existing user."""
48    from meerschaum._internal.static import STATIC_CONFIG
49    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/edit"
50    data = {
51        'username': user.username,
52        'password': user.password,
53        'type': user.type,
54        'email': user.email,
55        'attributes': json.dumps(user.attributes),
56    }
57    response = self.post(r_url, data=data, debug=debug)
58    try:
59        _json = json.loads(response.text)
60        if isinstance(_json, dict) and 'detail' in _json:
61            return False, _json['detail']
62        success_tuple = tuple(_json)
63    except Exception:
64        msg = response.text if response else f"Failed to edit user '{user}'."
65        return False, msg
66
67    return tuple(success_tuple)

Edit an existing user.

def delete_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
125def delete_user(
126    self,
127    user: mrsm.core.User,
128    debug: bool = False,
129    **kw: Any
130) -> SuccessTuple:
131    """Delete a user."""
132    from meerschaum._internal.static import STATIC_CONFIG
133    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}"
134    response = self.delete(r_url, debug=debug)
135    try:
136        _json = json.loads(response.text)
137        if isinstance(_json, dict) and 'detail' in _json:
138            return False, _json['detail']
139        success_tuple = tuple(_json)
140    except Exception:
141        success_tuple = False, f"Failed to delete user '{user.username}'."
142    return success_tuple

Delete a user.

def get_user_password_hash( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
166def get_user_password_hash(
167    self,
168    user: mrsm.core.User,
169    debug: bool = False,
170    **kw: Any
171) -> Optional[str]:
172    """If configured, get a user's password hash."""
173    from meerschaum._internal.static import STATIC_CONFIG
174    r_url = STATIC_CONFIG['api']['endpoints']['users'] + '/' + user.username + '/password_hash'
175    response = self.get(r_url, debug=debug, **kw)
176    if not response:
177        return None
178    return response.json()

If configured, get a user's password hash.

def get_user_type( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
181def get_user_type(
182    self,
183    user: mrsm.core.User,
184    debug: bool = False,
185    **kw: Any
186) -> Optional[str]:
187    """If configured, get a user's type."""
188    from meerschaum._internal.static import STATIC_CONFIG
189    r_url = STATIC_CONFIG['api']['endpoints']['users'] + '/' + user.username + '/type'
190    response = self.get(r_url, debug=debug, **kw)
191    if not response:
192        return None
193    return response.json()

If configured, get a user's type.

def get_user_attributes( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw) -> int:
145def get_user_attributes(
146    self,
147    user: mrsm.core.User,
148    debug: bool = False,
149    **kw
150) -> int:
151    """Get a user's attributes."""
152    from meerschaum._internal.static import STATIC_CONFIG
153    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/attributes"
154    response = self.get(r_url, debug=debug, **kw)
155    try:
156        attributes = json.loads(response.text)
157    except Exception:
158        attributes = None
159    return attributes

Get a user's attributes.

def register_token( self, token: meerschaum.core.Token._Token.Token, debug: bool = False) -> Tuple[bool, str]:
20def register_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
21    """
22    Register the provided token to the API.
23    """
24    from meerschaum.utils.dtypes import json_serialize_value
25    r_url = tokens_endpoint + '/register'
26    response = self.post(
27        r_url,
28        data=json.dumps({
29            'label': token.label,
30            'scopes': token.scopes,
31            'expiration': token.expiration,
32        }, default=json_serialize_value),
33        debug=debug,
34    )
35    if not response:
36        return False, f"Failed to register token:\n{response.text}"
37
38    data = response.json()
39    token.label = data['label']
40    token.secret = data['secret']
41    token.id = uuid.UUID(data['id'])
42    if data.get('expiration', None):
43        token.expiration = datetime.fromisoformat(data['expiration'])
44
45    return True, f"Registered token '{token.label}'."

Register the provided token to the API.

def get_token_model( self, token_id: uuid.UUID, debug: bool = False) -> 'Union[TokenModel, None]':
48def get_token_model(self, token_id: uuid.UUID, debug: bool = False) -> 'Union[TokenModel, None]':
49    """
50    Return a token's model from the API instance.
51    """
52    from meerschaum.models import TokenModel
53    r_url = tokens_endpoint + f'/{token_id}'
54    response = self.get(r_url, debug=debug)
55    if not response:
56        return None
57    data = response.json()
58    return TokenModel(**data)

Return a token's model from the API instance.

def get_tokens( self, labels: Optional[List[str]] = None, debug: bool = False) -> List[meerschaum.core.Token._Token.Token]:
61def get_tokens(self, labels: Optional[List[str]] = None, debug: bool = False) -> List[Token]:
62    """
63    Return the tokens registered to the current user.
64    """
65    from meerschaum.utils.warnings import warn
66    r_url = tokens_endpoint
67    params = {}
68    if labels:
69        params['labels'] = ','.join(labels)
70    response = self.get(r_url, params={'labels': labels}, debug=debug)
71    if not response:
72        warn(f"Could not get tokens from '{self}':\n{response.text}")
73        return []
74
75    tokens = [
76        Token(instance=self, **payload)
77        for payload in response.json()
78    ]
79    return tokens

Return the tokens registered to the current user.

def edit_token( self, token: meerschaum.core.Token._Token.Token, debug: bool = False) -> Tuple[bool, str]:
 82def edit_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
 83    """
 84    Persist the token's in-memory state to the API.
 85    """
 86    r_url = tokens_endpoint + f"/{token.id}/edit"
 87    response = self.post(
 88        r_url,
 89        json={
 90            'creation': token.creation.isoformat() if token.creation else None,
 91            'expiration': token.expiration.isoformat() if token.expiration else None,
 92            'label': token.label,
 93            'is_valid': token.is_valid,
 94            'scopes': token.scopes,
 95        },
 96    )
 97    if not response:
 98        return False, f"Failed to edit token:\n{response.text}"
 99
100    success, msg = response.json()
101    return success, msg

Persist the token's in-memory state to the API.

def invalidate_token( self, token: meerschaum.core.Token._Token.Token, debug: bool = False) -> Tuple[bool, str]:
104def invalidate_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
105    """
106    Invalidate the token, disabling it for future requests.
107    """
108    r_url = tokens_endpoint + f"/{token.id}/invalidate"
109    response = self.post(r_url)
110    if not response:
111        return False, f"Failed to invalidate token:\n{response.text}"
112
113    success, msg = response.json()
114    return success, msg

Invalidate the token, disabling it for future requests.

def get_token_scopes( self, token_id: Union[uuid.UUID, meerschaum.core.Token._Token.Token], debug: bool = False) -> List[str]:
117def get_token_scopes(self, token_id: Union[uuid.UUID, Token], debug: bool = False) -> List[str]:
118    """
119    Return the scopes for a token.
120    """
121    _token_id = (token_id.id if isinstance(token_id, Token) else token_id)
122    model = self.get_token_model(_token_id, debug=debug).scopes
123    return getattr(model, 'scopes', [])

Return the scopes for a token.

def token_exists( self, token_id: Union[uuid.UUID, meerschaum.core.Token._Token.Token], debug: bool = False) -> bool:
126def token_exists(self, token_id: Union[uuid.UUID, Token], debug: bool = False) -> bool:
127    """
128    Return `True` if a token exists.
129    """
130    _token_id = (token_id.id if isinstance(token_id, Token) else token_id)
131    model = self.get_token_model(_token_id, debug=debug)
132    if model is None:
133        return False
134    return model.creation is not None

Return True if a token exists.

def delete_token( self, token: meerschaum.core.Token._Token.Token, debug: bool = False) -> Tuple[bool, str]:
137def delete_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
138    """
139    Delete the token from the API.
140    """
141    r_url = tokens_endpoint + f"/{token.id}"
142    response = self.delete(r_url, debug=debug)
143    if not response:
144        return False, f"Failed to delete token:\n{response.text}"
145    
146    success, msg = response.json()
147    return success, msg

Delete the token from the API.

@classmethod
def from_uri( cls, uri: str, label: Optional[str] = None, as_dict: bool = False) -> Union[APIConnector, Dict[str, Union[str, int]]]:
13@classmethod
14def from_uri(
15    cls,
16    uri: str,
17    label: Optional[str] = None,
18    as_dict: bool = False,
19) -> Union[
20        'meerschaum.connectors.APIConnector',
21        Dict[str, Union[str, int]],
22    ]:
23    """
24    Create a new APIConnector from a URI string.
25
26    Parameters
27    ----------
28    uri: str
29        The URI connection string.
30
31    label: Optional[str], default None
32        If provided, use this as the connector label.
33        Otherwise use the determined database name.
34
35    as_dict: bool, default False
36        If `True`, return a dictionary of the keyword arguments
37        necessary to create a new `APIConnector`, otherwise create a new object.
38
39    Returns
40    -------
41    A new APIConnector object or a dictionary of attributes (if `as_dict` is `True`).
42    """
43    from meerschaum.connectors.sql import SQLConnector
44    params = SQLConnector.parse_uri(uri)
45    if 'host' not in params:
46        error("No host was found in the provided URI.")
47    params['protocol'] = params.pop('flavor')
48    params['label'] = label or (
49        (
50            (params['username'] + '@' if 'username' in params else '')
51            + params['host']
52        ).lower()
53    )
54
55    return cls(**params) if not as_dict else params

Create a new APIConnector from a URI string.

Parameters
  • uri (str): The URI connection string.
  • label (Optional[str], default None): If provided, use this as the connector label. Otherwise use the determined database name.
  • as_dict (bool, default False): If True, return a dictionary of the keyword arguments necessary to create a new APIConnector, otherwise create a new object.
Returns
  • A new APIConnector object or a dictionary of attributes (if as_dict is True).
def get_jobs(self, debug: bool = False) -> Dict[str, meerschaum.Job]:
28def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
29    """
30    Return a dictionary of remote jobs.
31    """
32    response = self.get(JOBS_ENDPOINT, debug=debug)
33    if not response:
34        warn(f"Failed to get remote jobs from {self}.")
35        return {}
36    return {
37        name: Job(
38            name,
39            job_meta['sysargs'],
40            executor_keys=str(self),
41            _properties=job_meta['daemon']['properties']
42        )
43        for name, job_meta in response.json().items()
44    }

Return a dictionary of remote jobs.

def get_job(self, name: str, debug: bool = False) -> meerschaum.Job:
47def get_job(self, name: str, debug: bool = False) -> Job:
48    """
49    Return a single Job object.
50    """
51    metadata = self.get_job_metadata(name, debug=debug)
52    if not metadata:
53        raise ValueError(f"Job '{name}' does not exist.")
54
55    return Job(
56        name,
57        metadata['sysargs'],
58        executor_keys=str(self),
59        _properties=metadata['daemon']['properties'],
60    )

Return a single Job object.

def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
 63def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
 64    """
 65    Return the metadata for a single job.
 66    """
 67    now = time.perf_counter()
 68    _job_metadata_cache = self.__dict__.get('_job_metadata_cache', None)
 69    _job_metadata_timestamp = (
 70        _job_metadata_cache.get(name, {}).get('timestamp', None)
 71    ) if _job_metadata_cache is not None else None
 72
 73    if (
 74        _job_metadata_timestamp is not None
 75        and (now - _job_metadata_timestamp) < JOB_METADATA_CACHE_SECONDS
 76    ):
 77        if debug:
 78            dprint(f"Returning cached metadata for job '{name}'.")
 79        return _job_metadata_cache[name]['metadata']
 80
 81    response = self.get(JOBS_ENDPOINT + f"/{name}", debug=debug)
 82    if not response:
 83        if debug:
 84            msg = (
 85                response.json()['detail']
 86                if 'detail' in response.text
 87                else response.text
 88            )
 89            warn(f"Failed to get metadata for job '{name}':\n{msg}")
 90        return {}
 91
 92    metadata = response.json()
 93    if _job_metadata_cache is None:
 94        self._job_metadata_cache = {}
 95
 96    self._job_metadata_cache[name] = {
 97        'timestamp': now,
 98        'metadata': metadata,
 99    }
100    return metadata

Return the metadata for a single job.

def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
102def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
103    """
104    Return the daemon properties for a single job.
105    """
106    metadata = self.get_job_metadata(name, debug=debug)
107    return metadata.get('daemon', {}).get('properties', {})

Return the daemon properties for a single job.

def get_job_exists(self, name: str, debug: bool = False) -> bool:
149def get_job_exists(self, name: str, debug: bool = False) -> bool:
150    """
151    Return whether a job exists.
152    """
153    response = self.get(JOBS_ENDPOINT + f'/{name}/exists', debug=debug)
154    if not response:
155        warn(f"Failed to determine whether job '{name}' exists.")
156        return False
157
158    return response.json()

Return whether a job exists.

def delete_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
161def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
162    """
163    Delete a job.
164    """
165    response = self.delete(JOBS_ENDPOINT + f"/{name}", debug=debug)
166    if not response:
167        if 'detail' in response.text:
168            return False, response.json()['detail']
169
170        return False, response.text
171
172    return tuple(response.json())

Delete a job.

def start_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
175def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
176    """
177    Start a job.
178    """
179    response = self.post(JOBS_ENDPOINT + f"/{name}/start", debug=debug)
180    if not response:
181        if 'detail' in response.text:
182            return False, response.json()['detail']
183        return False, response.text
184
185    return tuple(response.json())

Start a job.

def create_job( self, name: str, sysargs: List[str], properties: Optional[Dict[str, str]] = None, debug: bool = False) -> Tuple[bool, str]:
188def create_job(
189    self,
190    name: str,
191    sysargs: List[str],
192    properties: Optional[Dict[str, str]] = None,
193    debug: bool = False,
194) -> SuccessTuple:
195    """
196    Create a job.
197    """
198    response = self.post(
199        JOBS_ENDPOINT + f"/{name}",
200        json={
201            'sysargs': sysargs,
202            'properties': properties,
203        },
204        debug=debug,
205    )
206    if not response:
207        if 'detail' in response.text:
208            return False, response.json()['detail']
209        return False, response.text
210
211    return tuple(response.json())

Create a job.

def stop_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
214def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
215    """
216    Stop a job.
217    """
218    response = self.post(JOBS_ENDPOINT + f"/{name}/stop", debug=debug)
219    if not response:
220        if 'detail' in response.text:
221            return False, response.json()['detail']
222        return False, response.text
223
224    return tuple(response.json())

Stop a job.

def pause_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
227def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
228    """
229    Pause a job.
230    """
231    response = self.post(JOBS_ENDPOINT + f"/{name}/pause", debug=debug)
232    if not response:
233        if 'detail' in response.text:
234            return False, response.json()['detail']
235        return False, response.text
236
237    return tuple(response.json())

Pause a job.

def get_logs(self, name: str, debug: bool = False) -> str:
240def get_logs(self, name: str, debug: bool = False) -> str:
241    """
242    Return the logs for a job.
243    """
244    response = self.get(LOGS_ENDPOINT + f"/{name}")
245    if not response:
246        raise ValueError(f"Cannot fetch logs for job '{name}':\n{response.text}")
247
248    return response.json()

Return the logs for a job.

def get_job_stop_time(self, name: str, debug: bool = False) -> Optional[datetime.datetime]:
251def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
252    """
253    Return the job's manual stop time.
254    """
255    response = self.get(JOBS_ENDPOINT + f"/{name}/stop_time")
256    if not response:
257        warn(f"Failed to get stop time for job '{name}':\n{response.text}")
258        return None
259
260    data = response.json()
261    if data is None:
262        return None
263
264    return datetime.fromisoformat(data)

Return the job's manual stop time.

def monitor_logs( self, name: str, callback_function: Callable[[Any], Any], input_callback_function: Callable[[NoneType], str], stop_callback_function: Callable[[NoneType], str], stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False):
348def monitor_logs(
349    self,
350    name: str,
351    callback_function: Callable[[Any], Any],
352    input_callback_function: Callable[[None], str],
353    stop_callback_function: Callable[[None], str],
354    stop_on_exit: bool = False,
355    strip_timestamps: bool = False,
356    accept_input: bool = True,
357    debug: bool = False,
358):
359    """
360    Monitor a job's log files and execute a callback with the changes.
361    """
362    return asyncio.run(
363        self.monitor_logs_async(
364            name,
365            callback_function,
366            input_callback_function=input_callback_function,
367            stop_callback_function=stop_callback_function,
368            stop_on_exit=stop_on_exit,
369            strip_timestamps=strip_timestamps,
370            accept_input=accept_input,
371            debug=debug
372        )
373    )

Monitor a job's log files and execute a callback with the changes.

async def monitor_logs_async( self, name: str, callback_function: Callable[[Any], Any], input_callback_function: Callable[[], str], stop_callback_function: Callable[[Tuple[bool, str]], str], stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False):
267async def monitor_logs_async(
268    self,
269    name: str,
270    callback_function: Callable[[Any], Any],
271    input_callback_function: Callable[[], str],
272    stop_callback_function: Callable[[SuccessTuple], str],
273    stop_on_exit: bool = False,
274    strip_timestamps: bool = False,
275    accept_input: bool = True,
276    debug: bool = False,
277):
278    """
279    Monitor a job's log files and await a callback with the changes.
280    """
281    import traceback
282    from meerschaum.jobs import StopMonitoringLogs
283    from meerschaum.utils.formatting._jobs import strip_timestamp_from_line
284
285    websockets, websockets_exceptions = mrsm.attempt_import('websockets', 'websockets.exceptions')
286    protocol = 'ws' if self.URI.startswith('http://') else 'wss'
287    port = self.port if 'port' in self.__dict__ else ''
288    uri = f"{protocol}://{self.host}:{port}{LOGS_ENDPOINT}/{name}/ws"
289
290    async def _stdin_callback(client):
291        if input_callback_function is None:
292            return
293
294        if asyncio.iscoroutinefunction(input_callback_function):
295            data = await input_callback_function()
296        else:
297            data = input_callback_function()
298
299        await client.send(data)
300
301    async def _stop_callback(client):
302        try:
303            result = tuple(json.loads(await client.recv()))
304        except Exception as e:
305            warn(traceback.format_exc())
306            result = False, str(e)
307
308        if stop_callback_function is not None:
309            if asyncio.iscoroutinefunction(stop_callback_function):
310                await stop_callback_function(result)
311            else:
312                stop_callback_function(result)
313
314        if stop_on_exit:
315            raise StopMonitoringLogs
316
317    message_callbacks = {
318        JOBS_STDIN_MESSAGE: _stdin_callback,
319        JOBS_STOP_MESSAGE: _stop_callback,
320    }
321
322    async with websockets.connect(uri) as websocket:
323        try:
324            await websocket.send(self.token or 'no-login')
325        except websockets_exceptions.ConnectionClosedOK:
326            pass
327
328        while True:
329            try:
330                response = await websocket.recv()
331                callback = message_callbacks.get(response, None)
332                if callback is not None:
333                    await callback(websocket)
334                    continue
335
336                if strip_timestamps:
337                    response = strip_timestamp_from_line(response)
338
339                if asyncio.iscoroutinefunction(callback_function):
340                    await callback_function(response)
341                else:
342                    callback_function(response)
343            except (KeyboardInterrupt, StopMonitoringLogs):
344                await websocket.close()
345                break

Monitor a job's log files and await a callback with the changes.

def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
375def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
376    """
377    Return whether a remote job is blocking on stdin.
378    """
379    response = self.get(JOBS_ENDPOINT + f'/{name}/is_blocking_on_stdin', debug=debug)
380    if not response:
381        return False
382
383    return response.json()

Return whether a remote job is blocking on stdin.

def get_job_began(self, name: str, debug: bool = False) -> Optional[str]:
116def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
117    """
118    Return a job's `began` timestamp, if it exists.
119    """
120    properties = self.get_job_properties(name, debug=debug)
121    began_str = properties.get('daemon', {}).get('began', None)
122    if began_str is None:
123        return None
124
125    return began_str

Return a job's began timestamp, if it exists.

def get_job_ended(self, name: str, debug: bool = False) -> Optional[str]:
127def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
128    """
129    Return a job's `ended` timestamp, if it exists.
130    """
131    properties = self.get_job_properties(name, debug=debug)
132    ended_str = properties.get('daemon', {}).get('ended', None)
133    if ended_str is None:
134        return None
135
136    return ended_str

Return a job's ended timestamp, if it exists.

def get_job_paused(self, name: str, debug: bool = False) -> Optional[str]:
138def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
139    """
140    Return a job's `paused` timestamp, if it exists.
141    """
142    properties = self.get_job_properties(name, debug=debug)
143    paused_str = properties.get('daemon', {}).get('paused', None)
144    if paused_str is None:
145        return None
146
147    return paused_str

Return a job's paused timestamp, if it exists.

def get_job_status(self, name: str, debug: bool = False) -> str:
109def get_job_status(self, name: str, debug: bool = False) -> str:
110    """
111    Return the job's status.
112    """
113    metadata = self.get_job_metadata(name, debug=debug)
114    return metadata.get('status', 'stopped')

Return the job's status.