meerschaum.connectors.api

Interact with the Meerschaum API (send commands, pull data, etc.)

 1#! /usr/bin/env python
 2# -*- coding: utf-8 -*-
 3# vim:fenc=utf-8
 4
 5"""
 6Interact with the Meerschaum API (send commands, pull data, etc.)
 7"""
 8
 9from meerschaum.connectors.api._APIConnector import APIConnector
10
11__all__ = ('APIConnector',)
class APIConnector(meerschaum.connectors.instance._InstanceConnector.InstanceConnector):
 22class APIConnector(InstanceConnector):
 23    """
 24    Connect to a Meerschaum API instance.
 25    """
 26
 27    IS_THREAD_SAFE: bool = False
 28    OPTIONAL_ATTRIBUTES: List[str] = ['port', 'client_secret', 'client_id', 'api_key']
 29
 30    from ._request import (
 31        make_request,
 32        get,
 33        post,
 34        put,
 35        patch,
 36        delete,
 37        wget,
 38    )
 39    from ._actions import (
 40        get_actions,
 41        do_action,
 42        do_action_async,
 43        do_action_legacy,
 44    )
 45    from ._misc import get_mrsm_version, get_chaining_status
 46    from ._pipes import (
 47        get_pipe_instance_keys,
 48        register_pipe,
 49        fetch_pipes_keys,
 50        edit_pipe,
 51        sync_pipe,
 52        delete_pipe,
 53        get_pipe_data,
 54        get_pipe_id,
 55        get_pipe_attributes,
 56        get_sync_time,
 57        pipe_exists,
 58        create_metadata,
 59        get_pipe_rowcount,
 60        drop_pipe,
 61        clear_pipe,
 62        get_pipe_columns_types,
 63        get_pipe_columns_indices,
 64    )
 65    from ._fetch import fetch
 66    from ._plugins import (
 67        register_plugin,
 68        install_plugin,
 69        delete_plugin,
 70        get_plugins,
 71        get_plugin_attributes,
 72    )
 73    from ._login import login, test_connection
 74    from ._users import (
 75        register_user,
 76        get_user_id,
 77        get_users,
 78        edit_user,
 79        delete_user,
 80        get_user_password_hash,
 81        get_user_type,
 82        get_user_attributes,
 83    )
 84    from ._tokens import (
 85        register_token,
 86        get_token_model,
 87        get_tokens,
 88        edit_token,
 89        invalidate_token,
 90        get_token_scopes,
 91        token_exists,
 92        delete_token,
 93    )
 94    from ._uri import from_uri
 95    from ._jobs import (
 96        get_jobs,
 97        get_job,
 98        get_job_metadata,
 99        get_job_properties,
100        get_job_exists,
101        delete_job,
102        start_job,
103        create_job,
104        stop_job,
105        pause_job,
106        get_logs,
107        get_job_stop_time,
108        monitor_logs,
109        monitor_logs_async,
110        get_job_is_blocking_on_stdin,
111        get_job_began,
112        get_job_ended,
113        get_job_paused,
114        get_job_status,
115    )
116
117    def __init__(
118        self,
119        label: Optional[str] = None,
120        wait: bool = False,
121        debug: bool = False,
122        **kw
123    ):
124        if 'uri' in kw:
125            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
126            label = label or from_uri_params.get('label', None)
127            _ = from_uri_params.pop('label', None)
128            kw.update(from_uri_params)
129
130        super().__init__('api', label=label, **kw)
131        if 'protocol' not in self.__dict__:
132            self.protocol = (
133                'https' if self.__dict__.get('uri', '').startswith('https')
134                else 'http'
135            )
136
137        if 'uri' not in self.__dict__:
138            self.verify_attributes(required_attributes)
139        else:
140            from meerschaum.connectors.sql import SQLConnector
141            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
142            if 'host' not in conn_attrs:
143                raise Exception(f"Invalid URI for '{self}'.")
144            self.__dict__.update(conn_attrs)
145
146        self.url = (
147            self.protocol + '://' +
148            self.host
149            + (
150                (':' + str(self.port))
151                if self.__dict__.get('port', None)
152                else ''
153            )
154        )
155        self._token = None
156        self._expires = None
157        self._session = None
158        self._instance_keys = self.__dict__.get('instance_keys', None)
159
160
161    @property
162    def URI(self) -> str:
163        """
164        Return the fully qualified URI.
165        """
166        import urllib.parse
167        username = self.__dict__.get('username', None)
168        password = self.__dict__.get('password', None)
169        client_id = self.__dict__.get('client_id', None)
170        client_secret = self.__dict__.get('client_secret', None)
171        api_key = self.__dict__.get('api_key', None)
172        creds = (username + ':' + password + '@') if username and password else ''
173        params = {}
174        params_str = ('?' + urllib.parse.urlencode(params)) if params else ''
175        return (
176            self.protocol
177            + '://'
178            + creds
179            + self.host
180            + (
181                (':' + str(self.port))
182                if self.__dict__.get('port', None)
183                else ''
184            )
185            + params_str
186        )
187
188    @property
189    def session(self):
190        if self._session is None:
191            _ = attempt_import('certifi', lazy=False)
192            requests = attempt_import('requests', lazy=False)
193            if requests:
194                self._session = requests.Session()
195            if self._session is None:
196                error("Failed to import requests. Is requests installed?")
197        return self._session
198
199    @property
200    def token(self):
201        expired = (
202            True if self._expires is None else (
203                (
204                    self._expires
205                    <
206                    datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
207                )
208            )
209        )
210
211        if self._token is None or expired:
212            success, msg = self.login()
213            if not success and not self.__dict__.get('_emitted_warning'):
214                warn(msg, stack=False)
215                self._emitted_warning = True
216        return self._token
217
218    @property
219    def instance_keys(self) -> Union[str, None]:
220        """
221        Return the instance keys to be sent alongside pipe requests.
222        """
223        return self._instance_keys
224
225    @property
226    def login_scheme(self) -> str:
227        """
228        Return the login scheme to use based on the configured credentials.
229        """
230        if 'username' in self.__dict__:
231            return 'password'
232        if 'client_id' in self.__dict__:
233            return 'client_credentials'
234        elif 'api_key' in self.__dict__:
235            return 'api_key'
236
237        return 'password'

Connect to a Meerschaum API instance.

APIConnector( label: Optional[str] = None, wait: bool = False, debug: bool = False, **kw)
117    def __init__(
118        self,
119        label: Optional[str] = None,
120        wait: bool = False,
121        debug: bool = False,
122        **kw
123    ):
124        if 'uri' in kw:
125            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
126            label = label or from_uri_params.get('label', None)
127            _ = from_uri_params.pop('label', None)
128            kw.update(from_uri_params)
129
130        super().__init__('api', label=label, **kw)
131        if 'protocol' not in self.__dict__:
132            self.protocol = (
133                'https' if self.__dict__.get('uri', '').startswith('https')
134                else 'http'
135            )
136
137        if 'uri' not in self.__dict__:
138            self.verify_attributes(required_attributes)
139        else:
140            from meerschaum.connectors.sql import SQLConnector
141            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
142            if 'host' not in conn_attrs:
143                raise Exception(f"Invalid URI for '{self}'.")
144            self.__dict__.update(conn_attrs)
145
146        self.url = (
147            self.protocol + '://' +
148            self.host
149            + (
150                (':' + str(self.port))
151                if self.__dict__.get('port', None)
152                else ''
153            )
154        )
155        self._token = None
156        self._expires = None
157        self._session = None
158        self._instance_keys = self.__dict__.get('instance_keys', None)

Set the given keyword arguments as attributes.

Parameters
  • type (str): The type of the connector (e.g. sql, api, plugin).
  • label (str): The label for the connector.
Examples

Run mrsm edit config and to edit connectors in the YAML file:

meerschaum:
    connections:
        {type}:
            {label}:
                ### attributes go here
IS_THREAD_SAFE: bool = False
OPTIONAL_ATTRIBUTES: List[str] = ['port', 'client_secret', 'client_id', 'api_key']
url
URI: str
161    @property
162    def URI(self) -> str:
163        """
164        Return the fully qualified URI.
165        """
166        import urllib.parse
167        username = self.__dict__.get('username', None)
168        password = self.__dict__.get('password', None)
169        client_id = self.__dict__.get('client_id', None)
170        client_secret = self.__dict__.get('client_secret', None)
171        api_key = self.__dict__.get('api_key', None)
172        creds = (username + ':' + password + '@') if username and password else ''
173        params = {}
174        params_str = ('?' + urllib.parse.urlencode(params)) if params else ''
175        return (
176            self.protocol
177            + '://'
178            + creds
179            + self.host
180            + (
181                (':' + str(self.port))
182                if self.__dict__.get('port', None)
183                else ''
184            )
185            + params_str
186        )

Return the fully qualified URI.

session
188    @property
189    def session(self):
190        if self._session is None:
191            _ = attempt_import('certifi', lazy=False)
192            requests = attempt_import('requests', lazy=False)
193            if requests:
194                self._session = requests.Session()
195            if self._session is None:
196                error("Failed to import requests. Is requests installed?")
197        return self._session
token
199    @property
200    def token(self):
201        expired = (
202            True if self._expires is None else (
203                (
204                    self._expires
205                    <
206                    datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
207                )
208            )
209        )
210
211        if self._token is None or expired:
212            success, msg = self.login()
213            if not success and not self.__dict__.get('_emitted_warning'):
214                warn(msg, stack=False)
215                self._emitted_warning = True
216        return self._token
instance_keys: Optional[str]
218    @property
219    def instance_keys(self) -> Union[str, None]:
220        """
221        Return the instance keys to be sent alongside pipe requests.
222        """
223        return self._instance_keys

Return the instance keys to be sent alongside pipe requests.

login_scheme: str
225    @property
226    def login_scheme(self) -> str:
227        """
228        Return the login scheme to use based on the configured credentials.
229        """
230        if 'username' in self.__dict__:
231            return 'password'
232        if 'client_id' in self.__dict__:
233            return 'client_credentials'
234        elif 'api_key' in self.__dict__:
235            return 'api_key'
236
237        return 'password'

Return the login scheme to use based on the configured credentials.

def make_request( self, method: str, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kwargs: Any) -> requests.models.Response:
 28def make_request(
 29    self,
 30    method: str,
 31    r_url: str,
 32    headers: Optional[Dict[str, Any]] = None,
 33    use_token: bool = True,
 34    debug: bool = False,
 35    **kwargs: Any
 36) -> 'requests.Response':
 37    """
 38    Make a request to this APIConnector's endpoint using the in-memory session.
 39
 40    Parameters
 41    ----------
 42    method: str
 43        The kind of request to make.
 44        Accepted values:
 45        - `'GET'`
 46        - `'OPTIONS'`
 47        - `'HEAD'`
 48        - `'POST'`
 49        - `'PUT'`
 50        - `'PATCH'`
 51        - `'DELETE'`
 52
 53    r_url: str
 54        The relative URL for the endpoint (e.g. `'/pipes'`).
 55
 56    headers: Optional[Dict[str, Any]], default None
 57        The headers to use for the request.
 58        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
 59
 60    use_token: bool, default True
 61        If `True`, add the authorization token to the headers.
 62
 63    debug: bool, default False
 64        Verbosity toggle.
 65
 66    kwargs: Any
 67        All other keyword arguments are passed to `requests.request`.
 68
 69    Returns
 70    -------
 71    A `requests.Reponse` object.
 72    """
 73    if method.upper() not in METHODS:
 74        raise ValueError(f"Method '{method}' is not supported.")
 75
 76    verify = self.__dict__.get('verify', None)
 77    if 'verify' not in kwargs and isinstance(verify, bool):
 78        kwargs['verify'] = verify
 79
 80    headers = (
 81        copy.deepcopy(headers)
 82        if isinstance(headers, dict)
 83        else {}
 84    )
 85
 86    if use_token:
 87        headers.update({'Authorization': f'Bearer {self.token}'})
 88
 89    if 'timeout' not in kwargs:
 90        kwargs['timeout'] = STATIC_CONFIG['api']['default_timeout']
 91
 92    request_url = urllib.parse.urljoin(self.url, r_url)
 93    if debug:
 94        dprint(f"[{self}] Sending a '{method.upper()}' request to {request_url}")
 95
 96    return self.session.request(
 97        method.upper(),
 98        request_url,
 99        headers=headers,
100        **kwargs
101    )

Make a request to this APIConnector's endpoint using the in-memory session.

Parameters
  • method (str): The kind of request to make. Accepted values:
    • 'GET'
    • 'OPTIONS'
    • 'HEAD'
    • 'POST'
    • 'PUT'
    • 'PATCH'
    • 'DELETE'
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def get(self, r_url: str, **kwargs: Any) -> requests.models.Response:
104def get(self, r_url: str, **kwargs: Any) -> 'requests.Response':
105    """
106    Wrapper for `requests.get`.
107
108    Parameters
109    ----------
110    r_url: str
111        The relative URL for the endpoint (e.g. `'/pipes'`).
112
113    headers: Optional[Dict[str, Any]], default None
114        The headers to use for the request.
115        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
116
117    use_token: bool, default True
118        If `True`, add the authorization token to the headers.
119
120    debug: bool, default False
121        Verbosity toggle.
122
123    kwargs: Any
124        All other keyword arguments are passed to `requests.request`.
125
126    Returns
127    -------
128    A `requests.Reponse` object.
129
130    """
131    return self.make_request('GET', r_url, **kwargs)

Wrapper for requests.get.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def post(self, r_url: str, **kwargs: Any) -> requests.models.Response:
134def post(self, r_url: str, **kwargs: Any) -> 'requests.Response':
135    """
136    Wrapper for `requests.post`.
137
138    Parameters
139    ----------
140    r_url: str
141        The relative URL for the endpoint (e.g. `'/pipes'`).
142
143    headers: Optional[Dict[str, Any]], default None
144        The headers to use for the request.
145        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
146
147    use_token: bool, default True
148        If `True`, add the authorization token to the headers.
149
150    debug: bool, default False
151        Verbosity toggle.
152
153    kwargs: Any
154        All other keyword arguments are passed to `requests.request`.
155
156    Returns
157    -------
158    A `requests.Reponse` object.
159
160    """
161    return self.make_request('POST', r_url, **kwargs)

Wrapper for requests.post.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def put(self, r_url: str, **kwargs: Any) -> requests.models.Response:
193def put(self, r_url: str, **kwargs: Any) -> 'requests.Response':
194    """
195    Wrapper for `requests.put`.
196
197    Parameters
198    ----------
199    r_url: str
200        The relative URL for the endpoint (e.g. `'/pipes'`).
201
202    headers: Optional[Dict[str, Any]], default None
203        The headers to use for the request.
204        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
205
206    use_token: bool, default True
207        If `True`, add the authorization token to the headers.
208
209    debug: bool, default False
210        Verbosity toggle.
211
212    kwargs: Any
213        All other keyword arguments are passed to `requests.request`.
214
215    Returns
216    -------
217    A `requests.Reponse` object.
218    """
219    return self.make_request('PUT', r_url, **kwargs)

Wrapper for requests.put.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def patch(self, r_url: str, **kwargs: Any) -> requests.models.Response:
164def patch(self, r_url: str, **kwargs: Any) -> 'requests.Response':
165    """
166    Wrapper for `requests.patch`.
167
168    Parameters
169    ----------
170    r_url: str
171        The relative URL for the endpoint (e.g. `'/pipes'`).
172
173    headers: Optional[Dict[str, Any]], default None
174        The headers to use for the request.
175        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
176
177    use_token: bool, default True
178        If `True`, add the authorization token to the headers.
179
180    debug: bool, default False
181        Verbosity toggle.
182
183    kwargs: Any
184        All other keyword arguments are passed to `requests.request`.
185
186    Returns
187    -------
188    A `requests.Reponse` object.
189    """
190    return self.make_request('PATCH', r_url, **kwargs)

Wrapper for requests.patch.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def delete(self, r_url: str, **kwargs: Any) -> requests.models.Response:
222def delete(self, r_url: str, **kwargs: Any) -> 'requests.Response':
223    """
224    Wrapper for `requests.delete`.
225
226    Parameters
227    ----------
228    r_url: str
229        The relative URL for the endpoint (e.g. `'/pipes'`).
230
231    headers: Optional[Dict[str, Any]], default None
232        The headers to use for the request.
233        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
234
235    use_token: bool, default True
236        If `True`, add the authorization token to the headers.
237
238    debug: bool, default False
239        Verbosity toggle.
240
241    kwargs: Any
242        All other keyword arguments are passed to `requests.request`.
243
244    Returns
245    -------
246    A `requests.Reponse` object.
247    """
248    return self.make_request('DELETE', r_url, **kwargs)

Wrapper for requests.delete.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def wget( self, r_url: str, dest: Union[str, pathlib.Path, NoneType] = None, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) -> pathlib.Path:
251def wget(
252    self,
253    r_url: str,
254    dest: Optional[Union[str, pathlib.Path]] = None,
255    headers: Optional[Dict[str, Any]] = None,
256    use_token: bool = True,
257    debug: bool = False,
258    **kw: Any
259) -> pathlib.Path:
260    """Mimic wget with requests."""
261    from meerschaum.utils.misc import wget
262    if headers is None:
263        headers = {}
264    if use_token:
265        headers.update({'Authorization': f'Bearer {self.token}'})
266    request_url = urllib.parse.urljoin(self.url, r_url)
267    if debug:
268        dprint(
269            f"[{self}] Downloading {request_url}"
270            + (f' to {dest}' if dest is not None else '')
271            + "..."
272        )
273    return wget(request_url, dest=dest, headers=headers, **kw)

Mimic wget with requests.

def get_actions(self):
24def get_actions(self):
25    """Get available actions from the API instance."""
26    return self.get(ACTIONS_ENDPOINT)

Get available actions from the API instance.

def do_action(self, sysargs: List[str]) -> Tuple[bool, str]:
29def do_action(self, sysargs: List[str]) -> SuccessTuple:
30    """
31    Execute a Meerschaum action remotely.
32    """
33    return asyncio.run(self.do_action_async(sysargs))

Execute a Meerschaum action remotely.

async def do_action_async( self, sysargs: List[str], callback_function: Callable[[str], NoneType] = functools.partial(<built-in function print>, end='')) -> Tuple[bool, str]:
36async def do_action_async(
37    self,
38    sysargs: List[str],
39    callback_function: Callable[[str], None] = partial(print, end=''),
40) -> SuccessTuple:
41    """
42    Execute an action as a temporary remote job.
43    """
44    from meerschaum._internal.arguments import remove_api_executor_keys
45    from meerschaum.utils.misc import generate_password
46    sysargs = remove_api_executor_keys(sysargs)
47
48    job_name = TEMP_PREFIX + generate_password(12)
49    job = mrsm.Job(job_name, sysargs, executor_keys=str(self))
50
51    start_success, start_msg = job.start()
52    if not start_success:
53        return start_success, start_msg
54
55    await job.monitor_logs_async(
56        callback_function=callback_function,
57        stop_on_exit=True,
58        strip_timestamps=True,
59    )
60
61    success, msg = job.result
62    job.delete()
63    return success, msg

Execute an action as a temporary remote job.

def do_action_legacy( self, action: Optional[List[str]] = None, sysargs: Optional[List[str]] = None, debug: bool = False, **kw) -> Tuple[bool, str]:
 66def do_action_legacy(
 67    self,
 68    action: Optional[List[str]] = None,
 69    sysargs: Optional[List[str]] = None,
 70    debug: bool = False,
 71    **kw
 72) -> SuccessTuple:
 73    """
 74    NOTE: This method is deprecated.
 75    Please use `do_action()` or `do_action_async()`.
 76
 77    Execute a Meerschaum action remotely.
 78
 79    If `sysargs` are provided, parse those instead.
 80    Otherwise infer everything from keyword arguments.
 81
 82    Examples
 83    --------
 84    >>> conn = mrsm.get_connector('api:main')
 85    >>> conn.do_action(['show', 'pipes'])
 86    (True, "Success")
 87    >>> conn.do_action(['show', 'arguments'], name='test')
 88    (True, "Success")
 89    """
 90    import sys, json
 91    from meerschaum.utils.debug import dprint
 92    from meerschaum._internal.static import STATIC_CONFIG
 93    from meerschaum.utils.misc import json_serialize_datetime
 94    if action is None:
 95        action = []
 96
 97    if sysargs is not None and action and action[0] == '':
 98        from meerschaum._internal.arguments import parse_arguments
 99        if debug:
100            dprint(f"Parsing sysargs:\n{sysargs}")
101        json_dict = parse_arguments(sysargs)
102    else:
103        json_dict = kw
104        json_dict['action'] = action
105        if 'noask' not in kw:
106            json_dict['noask'] = True
107        if 'yes' not in kw:
108            json_dict['yes'] = True
109        if debug:
110            json_dict['debug'] = debug
111
112    root_action = json_dict['action'][0]
113    del json_dict['action'][0]
114    r_url = f"{STATIC_CONFIG['api']['endpoints']['actions']}/{root_action}"
115    
116    if debug:
117        from meerschaum.utils.formatting import pprint
118        dprint(f"Sending data to '{self.url + r_url}':")
119        pprint(json_dict, stream=sys.stderr)
120
121    response = self.post(
122        r_url,
123        data = json.dumps(json_dict, default=json_serialize_datetime),
124        debug = debug,
125    )
126    try:
127        response_list = json.loads(response.text)
128        if isinstance(response_list, dict) and 'detail' in response_list:
129            return False, response_list['detail']
130    except Exception as e:
131        print(f"Invalid response: {response}")
132        print(e)
133        return False, response.text
134    if debug:
135        dprint(response)
136    try:
137        return response_list[0], response_list[1]
138    except Exception as e:
139        return False, f"Failed to parse result from action '{root_action}'"

NOTE: This method is deprecated. Please use do_action() or do_action_async().

Execute a Meerschaum action remotely.

If sysargs are provided, parse those instead. Otherwise infer everything from keyword arguments.

Examples
>>> conn = mrsm.get_connector('api:main')
>>> conn.do_action(['show', 'pipes'])
(True, "Success")
>>> conn.do_action(['show', 'arguments'], name='test')
(True, "Success")
def get_mrsm_version(self, **kw) -> Optional[str]:
13def get_mrsm_version(self, **kw) -> Optional[str]:
14    """
15    Return the Meerschaum version of the API instance.
16    """
17    from meerschaum._internal.static import STATIC_CONFIG
18    try:
19        j = self.get(
20            STATIC_CONFIG['api']['endpoints']['version'] + '/mrsm',
21            use_token=False,
22            **kw
23        ).json()
24    except Exception:
25        return None
26    if isinstance(j, dict) and 'detail' in j:
27        return None
28    return j

Return the Meerschaum version of the API instance.

def get_chaining_status(self, **kw) -> Optional[bool]:
31def get_chaining_status(self, **kw) -> Optional[bool]:
32    """
33    Fetch the chaining status of the API instance.
34    """
35    from meerschaum._internal.static import STATIC_CONFIG
36    try:
37        response = self.get(
38            STATIC_CONFIG['api']['endpoints']['chaining'],
39            use_token = True,
40            **kw
41        )
42        if not response:
43            return None
44    except Exception:
45        return None
46
47    return response.json()

Fetch the chaining status of the API instance.

def get_pipe_instance_keys(self, pipe: meerschaum.Pipe) -> Optional[str]:
35def get_pipe_instance_keys(self, pipe: mrsm.Pipe) -> Union[str, None]:
36    """
37    Return the configured instance keys for a pipe if set,
38    else fall back to the default `instance_keys` for this `APIConnector`.
39    """
40    return pipe.parameters.get('instance_keys', self.instance_keys)

Return the configured instance keys for a pipe if set, else fall back to the default instance_keys for this APIConnector.

def register_pipe( self, pipe: meerschaum.Pipe, debug: bool = False) -> Tuple[bool, str]:
43def register_pipe(
44    self,
45    pipe: mrsm.Pipe,
46    debug: bool = False
47) -> SuccessTuple:
48    """Submit a POST to the API to register a new Pipe object.
49    Returns a tuple of (success_bool, response_dict).
50    """
51    from meerschaum.utils.debug import dprint
52    r_url = pipe_r_url(pipe)
53    response = self.post(
54        r_url + '/register',
55        json=pipe._attributes.get('parameters', {}),
56        params={'instance_keys': self.get_pipe_instance_keys(pipe)},
57        debug=debug,
58    )
59    if debug:
60        dprint(response.text)
61
62    if not response:
63        return False, response.text
64
65    response_data = response.json()
66    if isinstance(response_data, list):
67        response_tuple = response_data[0], response_data[1]
68    elif 'detail' in response.json():
69        response_tuple = response.__bool__(), response_data['detail']
70    else:
71        response_tuple = response.__bool__(), response.text
72    return response_tuple

Submit a POST to the API to register a new Pipe object. Returns a tuple of (success_bool, response_dict).

def fetch_pipes_keys( self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> List[Union[Tuple[str, str, Optional[str]], Tuple[str, str, Optional[str], List[str]], Tuple[str, str, Optional[str], Dict[str, Any]]]]:
108def fetch_pipes_keys(
109    self,
110    connector_keys: Optional[List[str]] = None,
111    metric_keys: Optional[List[str]] = None,
112    location_keys: Optional[List[str]] = None,
113    tags: Optional[List[str]] = None,
114    params: Optional[Dict[str, Any]] = None,
115    debug: bool = False
116) -> List[
117        Union[
118            Tuple[str, str, Union[str, None]],
119            Tuple[str, str, Union[str, None], List[str]],
120            Tuple[str, str, Union[str, None], Dict[str, Any]]
121        ]
122    ]:
123    """
124    Fetch registered Pipes' keys from the API.
125    
126    Parameters
127    ----------
128    connector_keys: Optional[List[str]], default None
129        The connector keys for the query.
130
131    metric_keys: Optional[List[str]], default None
132        The metric keys for the query.
133
134    location_keys: Optional[List[str]], default None
135        The location keys for the query.
136
137    tags: Optional[List[str]], default None
138        A list of tags for the query.
139
140    params: Optional[Dict[str, Any]], default None
141        A parameters dictionary for filtering against the `pipes` table
142        (e.g. `{'connector_keys': 'plugin:foo'}`).
143        Not recommeded to be used.
144
145    debug: bool, default False
146        Verbosity toggle.
147
148    Returns
149    -------
150    A list of tuples containing pipes' keys.
151    """
152    from meerschaum._internal.static import STATIC_CONFIG
153    if connector_keys is None:
154        connector_keys = []
155    if metric_keys is None:
156        metric_keys = []
157    if location_keys is None:
158        location_keys = []
159    if tags is None:
160        tags = []
161
162    r_url = STATIC_CONFIG['api']['endpoints']['pipes'] + '/keys'
163    try:
164        j = self.get(
165            r_url,
166            params={
167                'connector_keys': json.dumps(connector_keys),
168                'metric_keys': json.dumps(metric_keys),
169                'location_keys': json.dumps(location_keys),
170                'tags': json.dumps(tags),
171                'params': json.dumps(params),
172                'instance_keys': self.instance_keys,
173            },
174            debug=debug
175        ).json()
176    except Exception as e:
177        import traceback
178        traceback.print_exc()
179        error(str(e))
180
181    if 'detail' in j:
182        error(j['detail'], stack=False)
183    return [tuple(r) for r in j]

Fetch registered Pipes' keys from the API.

Parameters
  • connector_keys (Optional[List[str]], default None): The connector keys for the query.
  • metric_keys (Optional[List[str]], default None): The metric keys for the query.
  • location_keys (Optional[List[str]], default None): The location keys for the query.
  • tags (Optional[List[str]], default None): A list of tags for the query.
  • params (Optional[Dict[str, Any]], default None): A parameters dictionary for filtering against the pipes table (e.g. {'connector_keys': 'plugin:foo'}). Not recommeded to be used.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of tuples containing pipes' keys.
def edit_pipe( self, pipe: meerschaum.Pipe, patch: bool = False, debug: bool = False) -> Tuple[bool, str]:
 75def edit_pipe(
 76    self,
 77    pipe: mrsm.Pipe,
 78    patch: bool = False,
 79    debug: bool = False,
 80) -> SuccessTuple:
 81    """Submit a PATCH to the API to edit an existing Pipe object.
 82    Returns a tuple of (success_bool, response_dict).
 83    """
 84    from meerschaum.utils.debug import dprint
 85    ### NOTE: if `parameters` is supplied in the Pipe constructor,
 86    ###       then `pipe.parameters` will exist and not be fetched from the database.
 87    r_url = pipe_r_url(pipe)
 88    response = self.patch(
 89        r_url + '/edit',
 90        params={'patch': patch, 'instance_keys': self.get_pipe_instance_keys(pipe)},
 91        json=pipe.get_parameters(apply_symlinks=False),
 92        debug=debug,
 93    )
 94    if debug:
 95        dprint(response.text)
 96
 97    response_data = response.json()
 98
 99    if isinstance(response.json(), list):
100        response_tuple = response_data[0], response_data[1]
101    elif 'detail' in response.json():
102        response_tuple = response.__bool__(), response_data['detail']
103    else:
104        response_tuple = response.__bool__(), response.text
105    return response_tuple

Submit a PATCH to the API to edit an existing Pipe object. Returns a tuple of (success_bool, response_dict).

def sync_pipe( self, pipe: meerschaum.Pipe, df: "Optional[Union['pd.DataFrame', Dict[Any, Any], str]]" = None, chunksize: Optional[int] = -1, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
186def sync_pipe(
187    self,
188    pipe: mrsm.Pipe,
189    df: Optional[Union['pd.DataFrame', Dict[Any, Any], str]] = None,
190    chunksize: Optional[int] = -1,
191    debug: bool = False,
192    **kw: Any
193) -> SuccessTuple:
194    """Sync a DataFrame into a Pipe."""
195    from decimal import Decimal
196    from meerschaum.utils.debug import dprint
197    from meerschaum.utils.dtypes import json_serialize_value
198    from meerschaum.utils.misc import items_str, interval_str
199    from meerschaum.config import get_config
200    from meerschaum.utils.packages import attempt_import
201    from meerschaum.utils.dataframe import get_special_cols, to_json
202    begin = time.perf_counter()
203    more_itertools = attempt_import('more_itertools')
204    if df is None:
205        msg = f"DataFrame is `None`. Cannot sync {pipe}."
206        return False, msg
207
208    def get_json_str(c):
209        ### allow syncing dict or JSON without needing to import pandas (for IOT devices)
210        if isinstance(c, str):
211            return c
212        if isinstance(c, (dict, list, tuple)):
213            return json.dumps(c, default=json_serialize_value)
214        return to_json(c, orient='columns')
215
216    df = json.loads(df) if isinstance(df, str) else df
217
218    _chunksize: Optional[int] = (1 if chunksize is None else (
219        get_config('system', 'connectors', 'sql', 'chunksize') if chunksize == -1
220        else chunksize
221    ))
222    keys: List[str] = list(df.columns)
223    chunks = []
224    if hasattr(df, 'index'):
225        df = df.reset_index(drop=True)
226        is_dask = 'dask' in df.__module__
227        chunks = (
228            (df.iloc[i] for i in more_itertools.chunked(df.index, _chunksize))
229            if not is_dask
230            else [partition.compute() for partition in df.partitions]
231        )
232
233    elif isinstance(df, dict):
234        ### `_chunks` is a dict of lists of dicts.
235        ### e.g. {'a' : [ {'a':[1, 2]}, {'a':[3, 4]} ] }
236        _chunks = {k: [] for k in keys}
237        for k in keys:
238            chunk_iter = more_itertools.chunked(df[k], _chunksize)
239            for l in chunk_iter:
240                _chunks[k].append({k: l})
241
242        ### `chunks` is a list of dicts (e.g. orient by rows in pandas JSON).
243        for k, l in _chunks.items():
244            for i, c in enumerate(l):
245                try:
246                    chunks[i].update(c)
247                except IndexError:
248                    chunks.append(c)
249    elif isinstance(df, list):
250        chunks = (df[i] for i in more_itertools.chunked(df, _chunksize))
251
252    ### Send columns in case the user has defined them locally.
253    request_params = kw.copy()
254    if pipe.columns:
255        request_params['columns'] = json.dumps(pipe.columns)
256    request_params['instance_keys'] = self.get_pipe_instance_keys(pipe)
257    r_url = pipe_r_url(pipe) + '/data'
258
259    rowcount = 0
260    num_success_chunks = 0
261    for i, c in enumerate(chunks):
262        if debug:
263            dprint(f"[{self}] Posting chunk {i} to {r_url}...")
264        if len(c) == 0:
265            if debug:
266                dprint(f"[{self}] Skipping empty chunk...")
267            continue
268        json_str = get_json_str(c)
269
270        try:
271            response = self.post(
272                r_url,
273                params=request_params,
274                data=json_str,
275                debug=debug,
276            )
277        except Exception as e:
278            msg = f"Failed to post a chunk to {pipe}:\n{e}"
279            warn(msg)
280            return False, msg
281            
282        if not response:
283            return False, f"Failed to sync a chunk:\n{response.text}"
284
285        try:
286            j = json.loads(response.text)
287        except Exception as e:
288            return False, f"Failed to parse response from syncing {pipe}:\n{e}"
289
290        if isinstance(j, dict) and 'detail' in j:
291            return False, j['detail']
292
293        try:
294            j = tuple(j)
295        except Exception:
296            return False, response.text
297
298        if debug:
299            dprint("Received response: " + str(j))
300        if not j[0]:
301            return j
302
303        rowcount += len(c)
304        num_success_chunks += 1
305
306    success_tuple = True, (
307        f"It took {interval_str(timedelta(seconds=(time.perf_counter() - begin)))} "
308        + f"to sync {rowcount:,} row"
309        + ('s' if rowcount != 1 else '')
310        + f" across {num_success_chunks:,} chunk" + ('s' if num_success_chunks != 1 else '') +
311        f" to {pipe}."
312    )
313    return success_tuple

Sync a DataFrame into a Pipe.

def delete_pipe( self, pipe: Optional[meerschaum.Pipe] = None, debug: bool = False) -> Tuple[bool, str]:
316def delete_pipe(
317    self,
318    pipe: Optional[mrsm.Pipe] = None,
319    debug: bool = False,
320) -> SuccessTuple:
321    """Delete a Pipe and drop its table."""
322    if pipe is None:
323        error("Pipe cannot be None.")
324    r_url = pipe_r_url(pipe)
325    response = self.delete(
326        r_url + '/delete',
327        params={'instance_keys': self.get_pipe_instance_keys(pipe)},
328        debug=debug,
329    )
330    if debug:
331        dprint(response.text)
332
333    response_data = response.json()
334    if isinstance(response.json(), list):
335        response_tuple = response_data[0], response_data[1]
336    elif 'detail' in response.json():
337        response_tuple = response.__bool__(), response_data['detail']
338    else:
339        response_tuple = response.__bool__(), response.text
340    return response_tuple

Delete a Pipe and drop its table.

def get_pipe_data( self, pipe: meerschaum.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[str, datetime.datetime, int, NoneType] = None, end: Union[str, datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, as_chunks: bool = False, debug: bool = False, **kw: Any) -> Optional[pandas.core.frame.DataFrame]:
343def get_pipe_data(
344    self,
345    pipe: mrsm.Pipe,
346    select_columns: Optional[List[str]] = None,
347    omit_columns: Optional[List[str]] = None,
348    begin: Union[str, datetime, int, None] = None,
349    end: Union[str, datetime, int, None] = None,
350    params: Optional[Dict[str, Any]] = None,
351    as_chunks: bool = False,
352    debug: bool = False,
353    **kw: Any
354) -> Union[pandas.DataFrame, None]:
355    """Fetch data from the API."""
356    r_url = pipe_r_url(pipe)
357    while True:
358        try:
359            response = self.get(
360                r_url + "/data",
361                params={
362                    'select_columns': json.dumps(select_columns),
363                    'omit_columns': json.dumps(omit_columns),
364                    'begin': begin,
365                    'end': end,
366                    'params': json.dumps(params, default=str),
367                    'instance': self.get_pipe_instance_keys(pipe),
368                    'as_chunks': as_chunks,
369                },
370                debug=debug
371            )
372            if not response.ok:
373                return None
374            j = response.json()
375        except Exception as e:
376            warn(f"Failed to get data for {pipe}:\n{e}")
377            return None
378        if isinstance(j, dict) and 'detail' in j:
379            return False, j['detail']
380        break
381
382    from meerschaum.utils.dataframe import parse_df_datetimes, add_missing_cols_to_df
383    from meerschaum.utils.dtypes import are_dtypes_equal
384    try:
385        df = parse_df_datetimes(
386            j,
387            ignore_cols=[
388                col
389                for col, dtype in pipe.dtypes.items()
390                if not are_dtypes_equal(str(dtype), 'datetime')
391            ],
392            strip_timezone=(pipe.tzinfo is None),
393            debug=debug,
394        )
395    except Exception as e:
396        warn(f"Failed to parse response for {pipe}:\n{e}")
397        return None
398
399    if len(df.columns) == 0:
400        return add_missing_cols_to_df(df, pipe.dtypes)
401
402    return df

Fetch data from the API.

def get_pipe_id( self, pipe: meerschaum.Pipe, debug: bool = False) -> Union[int, str, NoneType]:
405def get_pipe_id(
406    self,
407    pipe: mrsm.Pipe,
408    debug: bool = False,
409) -> Union[int, str, None]:
410    """Get a Pipe's ID from the API."""
411    from meerschaum.utils.misc import is_int
412    r_url = pipe_r_url(pipe)
413    response = self.get(
414        r_url + '/id',
415        params={
416            'instance': self.get_pipe_instance_keys(pipe),
417        },
418        debug=debug,
419    )
420    if debug:
421        dprint(f"Got pipe ID: {response.text}")
422    try:
423        if is_int(response.text):
424            return int(response.text)
425        if response.text and response.text[0] != '{':
426            return response.text
427    except Exception as e:
428        warn(f"Failed to get the ID for {pipe}:\n{e}")
429    return None

Get a Pipe's ID from the API.

def get_pipe_attributes( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, Any]:
432def get_pipe_attributes(
433    self,
434    pipe: mrsm.Pipe,
435    debug: bool = False,
436) -> Dict[str, Any]:
437    """Get a Pipe's attributes from the API
438
439    Parameters
440    ----------
441    pipe: meerschaum.Pipe
442        The pipe whose attributes we are fetching.
443        
444    Returns
445    -------
446    A dictionary of a pipe's attributes.
447    If the pipe does not exist, return an empty dictionary.
448    """
449    r_url = pipe_r_url(pipe)
450    response = self.get(
451        r_url + '/attributes',
452        params={
453            'instance': self.get_pipe_instance_keys(pipe),
454        },
455        debug=debug
456    )
457    try:
458        return json.loads(response.text)
459    except Exception as e:
460        warn(f"Failed to get the attributes for {pipe}:\n{e}")
461    return {}

Get a Pipe's attributes from the API

Parameters
Returns
  • A dictionary of a pipe's attributes.
  • If the pipe does not exist, return an empty dictionary.
def get_sync_time( self, pipe: meerschaum.Pipe, params: Optional[Dict[str, Any]] = None, newest: bool = True, debug: bool = False) -> Union[datetime.datetime, int, NoneType]:
464def get_sync_time(
465    self,
466    pipe: mrsm.Pipe,
467    params: Optional[Dict[str, Any]] = None,
468    newest: bool = True,
469    debug: bool = False,
470) -> Union[datetime, int, None]:
471    """Get a Pipe's most recent datetime value from the API.
472
473    Parameters
474    ----------
475    pipe: meerschaum.Pipe
476        The pipe to select from.
477
478    params: Optional[Dict[str, Any]], default None
479        Optional params dictionary to build the WHERE clause.
480
481    newest: bool, default True
482        If `True`, get the most recent datetime (honoring `params`).
483        If `False`, get the oldest datetime (ASC instead of DESC).
484
485    Returns
486    -------
487    The most recent (or oldest if `newest` is `False`) datetime of a pipe,
488    rounded down to the closest minute.
489    """
490    from meerschaum.utils.misc import is_int
491    from meerschaum.utils.warnings import warn
492    r_url = pipe_r_url(pipe)
493    response = self.get(
494        r_url + '/sync_time',
495        json=params,
496        params={
497            'instance': self.get_pipe_instance_keys(pipe),
498            'newest': newest,
499            'debug': debug,
500        },
501        debug=debug,
502    )
503    if not response:
504        warn(f"Failed to get the sync time for {pipe}:\n" + response.text)
505        return None
506
507    j = response.json()
508    if j is None:
509        dt = None
510    else:
511        try:
512            dt = (
513                datetime.fromisoformat(j)
514                if not is_int(j)
515                else int(j)
516            )
517        except Exception as e:
518            warn(f"Failed to parse the sync time '{j}' for {pipe}:\n{e}")
519            dt = None
520    return dt

Get a Pipe's most recent datetime value from the API.

Parameters
  • pipe (meerschaum.Pipe): The pipe to select from.
  • params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause.
  • newest (bool, default True): If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
Returns
  • The most recent (or oldest if newest is False) datetime of a pipe,
  • rounded down to the closest minute.
def pipe_exists(self, pipe: meerschaum.Pipe, debug: bool = False) -> bool:
523def pipe_exists(
524    self,
525    pipe: mrsm.Pipe,
526    debug: bool = False
527) -> bool:
528    """Check the API to see if a Pipe exists.
529
530    Parameters
531    ----------
532    pipe: 'meerschaum.Pipe'
533        The pipe which were are querying.
534        
535    Returns
536    -------
537    A bool indicating whether a pipe's underlying table exists.
538    """
539    from meerschaum.utils.debug import dprint
540    from meerschaum.utils.warnings import warn
541    r_url = pipe_r_url(pipe)
542    response = self.get(
543        r_url + '/exists',
544        params={
545            'instance': self.get_pipe_instance_keys(pipe),
546        },
547        debug=debug,
548    )
549    if not response:
550        warn(f"Failed to check if {pipe} exists:\n{response.text}")
551        return False
552    if debug:
553        dprint("Received response: " + str(response.text))
554    j = response.json()
555    if isinstance(j, dict) and 'detail' in j:
556        warn(j['detail'])
557    return j

Check the API to see if a Pipe exists.

Parameters
Returns
  • A bool indicating whether a pipe's underlying table exists.
def create_metadata(self, debug: bool = False) -> bool:
560def create_metadata(
561    self,
562    debug: bool = False
563) -> bool:
564    """Create metadata tables.
565
566    Returns
567    -------
568    A bool indicating success.
569    """
570    from meerschaum.utils.debug import dprint
571    from meerschaum._internal.static import STATIC_CONFIG
572    r_url = STATIC_CONFIG['api']['endpoints']['metadata']
573    response = self.post(r_url, debug=debug)
574    if debug:
575        dprint("Create metadata response: {response.text}")
576    try:
577        _ = json.loads(response.text)
578    except Exception as e:
579        warn(f"Failed to create metadata on {self}:\n{e}")
580    return False

Create metadata tables.

Returns
  • A bool indicating success.
def get_pipe_rowcount( self, pipe: meerschaum.Pipe, begin: Union[str, datetime.datetime, int, NoneType] = None, end: Union[str, datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) -> int:
583def get_pipe_rowcount(
584    self,
585    pipe: mrsm.Pipe,
586    begin: Union[str, datetime, int, None] = None,
587    end: Union[str, datetime, int, None] = None,
588    params: Optional[Dict[str, Any]] = None,
589    remote: bool = False,
590    debug: bool = False,
591) -> int:
592    """Get a pipe's row count from the API.
593
594    Parameters
595    ----------
596    pipe: 'meerschaum.Pipe':
597        The pipe whose row count we are counting.
598        
599    begin: Union[str, datetime, int, None], default None
600        If provided, bound the count by this datetime.
601
602    end: Union[str, datetime, int, None], default None
603        If provided, bound the count by this datetime.
604
605    params: Optional[Dict[str, Any]], default None
606        If provided, bound the count by these parameters.
607
608    remote: bool, default False
609        If `True`, return the rowcount for the fetch definition.
610
611    Returns
612    -------
613    The number of rows in the pipe's table, bound the given parameters.
614    If the table does not exist, return 0.
615    """
616    r_url = pipe_r_url(pipe)
617    response = self.get(
618        r_url + "/rowcount",
619        json = params,
620        params = {
621            'begin': begin,
622            'end': end,
623            'remote': remote,
624            'instance': self.get_pipe_instance_keys(pipe),
625        },
626        debug = debug
627    )
628    if not response:
629        warn(f"Failed to get the rowcount for {pipe}:\n{response.text}")
630        return 0
631    try:
632        return int(json.loads(response.text))
633    except Exception as e:
634        warn(f"Failed to get the rowcount for {pipe}:\n{e}")
635    return 0

Get a pipe's row count from the API.

Parameters
  • pipe ('meerschaum.Pipe':): The pipe whose row count we are counting.
  • begin (Union[str, datetime, int, None], default None): If provided, bound the count by this datetime.
  • end (Union[str, datetime, int, None], default None): If provided, bound the count by this datetime.
  • params (Optional[Dict[str, Any]], default None): If provided, bound the count by these parameters.
  • remote (bool, default False): If True, return the rowcount for the fetch definition.
Returns
  • The number of rows in the pipe's table, bound the given parameters.
  • If the table does not exist, return 0.
def drop_pipe( self, pipe: meerschaum.Pipe, debug: bool = False) -> Tuple[bool, str]:
638def drop_pipe(
639    self,
640    pipe: mrsm.Pipe,
641    debug: bool = False
642) -> SuccessTuple:
643    """
644    Drop a pipe's table but maintain its registration.
645
646    Parameters
647    ----------
648    pipe: meerschaum.Pipe:
649        The pipe to be dropped.
650        
651    Returns
652    -------
653    A success tuple (bool, str).
654    """
655    from meerschaum.utils.warnings import error
656    from meerschaum.utils.debug import dprint
657    if pipe is None:
658        error("Pipe cannot be None.")
659    r_url = pipe_r_url(pipe)
660    response = self.delete(
661        r_url + '/drop',
662        params={
663            'instance': self.get_pipe_instance_keys(pipe),
664        },
665        debug=debug,
666    )
667    if debug:
668        dprint(response.text)
669
670    try:
671        data = response.json()
672    except Exception as e:
673        return False, f"Failed to drop {pipe}."
674
675    if isinstance(data, list):
676        response_tuple = data[0], data[1]
677    elif 'detail' in response.json():
678        response_tuple = response.__bool__(), data['detail']
679    else:
680        response_tuple = response.__bool__(), response.text
681
682    return response_tuple

Drop a pipe's table but maintain its registration.

Parameters
Returns
  • A success tuple (bool, str).
def clear_pipe( self, pipe: meerschaum.Pipe, begin: Union[str, datetime.datetime, int, NoneType] = None, end: Union[str, datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw) -> Tuple[bool, str]:
685def clear_pipe(
686    self,
687    pipe: mrsm.Pipe,
688    begin: Union[str, datetime, int, None] = None,
689    end: Union[str, datetime, int, None] = None,
690    params: Optional[Dict[str, Any]] = None,
691    debug: bool = False,
692    **kw
693) -> SuccessTuple:
694    """
695    Delete rows in a pipe's table.
696
697    Parameters
698    ----------
699    pipe: meerschaum.Pipe
700        The pipe with rows to be deleted.
701        
702    Returns
703    -------
704    A success tuple.
705    """
706    r_url = pipe_r_url(pipe)
707    response = self.delete(
708        r_url + '/clear',
709        params={
710            'begin': begin,
711            'end': end,
712            'params': json.dumps(params),
713            'instance': self.get_pipe_instance_keys(pipe),
714        },
715        debug=debug,
716    )
717    if debug:
718        dprint(response.text)
719
720    try:
721        data = response.json()
722    except Exception as e:
723        return False, f"Failed to clear {pipe} with constraints {begin=}, {end=}, {params=}."
724
725    if isinstance(data, list):
726        response_tuple = data[0], data[1]
727    elif 'detail' in response.json():
728        response_tuple = response.__bool__(), data['detail']
729    else:
730        response_tuple = response.__bool__(), response.text
731
732    return response_tuple

Delete rows in a pipe's table.

Parameters
Returns
  • A success tuple.
def get_pipe_columns_types( self, pipe: meerschaum.Pipe, debug: bool = False) -> Optional[Dict[str, str]]:
735def get_pipe_columns_types(
736    self,
737    pipe: mrsm.Pipe,
738    debug: bool = False,
739) -> Union[Dict[str, str], None]:
740    """
741    Fetch the columns and types of the pipe's table.
742
743    Parameters
744    ----------
745    pipe: meerschaum.Pipe
746        The pipe whose columns to be queried.
747
748    Returns
749    -------
750    A dictionary mapping column names to their database types.
751
752    Examples
753    --------
754    >>> {
755    ...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
756    ...   'id': 'BIGINT',
757    ...   'val': 'DOUBLE PRECISION',
758    ... }
759    >>>
760    """
761    r_url = pipe_r_url(pipe) + '/columns/types'
762    response = self.get(
763        r_url,
764        params={
765            'instance': self.get_pipe_instance_keys(pipe),
766        },
767        debug=debug,
768    )
769    j = response.json()
770    if isinstance(j, dict) and 'detail' in j and len(j.keys()) == 1:
771        warn(j['detail'])
772        return None
773    if not isinstance(j, dict):
774        warn(response.text)
775        return None
776    return j

Fetch the columns and types of the pipe's table.

Parameters
Returns
  • A dictionary mapping column names to their database types.
Examples
>>> {
...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
...   'id': 'BIGINT',
...   'val': 'DOUBLE PRECISION',
... }
>>>
def get_pipe_columns_indices( self, pipe: meerschaum.Pipe, debug: bool = False) -> Optional[Dict[str, str]]:
779def get_pipe_columns_indices(
780    self,
781    pipe: mrsm.Pipe,
782    debug: bool = False,
783) -> Union[Dict[str, str], None]:
784    """
785    Fetch the index information for a pipe.
786
787    Parameters
788    ----------
789    pipe: mrsm.Pipe
790        The pipe whose columns to be queried.
791
792    Returns
793    -------
794    A dictionary mapping column names to a list of associated index information.
795    """
796    r_url = pipe_r_url(pipe) + '/columns/indices'
797    response = self.get(
798        r_url,
799        params={
800            'instance': self.get_pipe_instance_keys(pipe),
801        },
802        debug=debug,
803    )
804    j = response.json()
805    if isinstance(j, dict) and 'detail' in j and len(j.keys()) == 1:
806        warn(j['detail'])
807        return None
808    if not isinstance(j, dict):
809        warn(response.text)
810        return None
811    return j

Fetch the index information for a pipe.

Parameters
  • pipe (mrsm.Pipe): The pipe whose columns to be queried.
Returns
  • A dictionary mapping column names to a list of associated index information.
def fetch( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, str, int] = '', end: Union[datetime.datetime, int] = None, params: 'Optional[Dict, Any]' = None, debug: bool = False, **kw: Any) -> Iterator[pandas.core.frame.DataFrame]:
16def fetch(
17        self,
18        pipe: mrsm.Pipe,
19        begin: Union[datetime, str, int] = '',
20        end: Union[datetime, int] = None,
21        params: Optional[Dict, Any] = None,
22        debug: bool = False,
23        **kw: Any
24    ) -> Iterator['pd.DataFrame']:
25    """Get the Pipe data from the remote Pipe."""
26    from meerschaum.utils.debug import dprint
27    from meerschaum.utils.warnings import warn, error
28    from meerschaum.config._patch import apply_patch_to_config
29
30    fetch_params = pipe.parameters.get('fetch', {})
31    if not fetch_params:
32        warn(f"Missing 'fetch' parameters for {pipe}.", stack=False)
33        return None
34
35    pipe_meta = fetch_params.get('pipe', {})
36    ### Legacy: check for `connector_keys`, etc. at the root.
37    if not pipe_meta:
38        ck, mk, lk = (
39            fetch_params.get('connector_keys', None),
40            fetch_params.get('metric_key', None),
41            fetch_params.get('location_key', None),
42        )
43        if not ck or not mk:
44            warn(f"Missing `fetch:pipe` keys for {pipe}.", stack=False)
45            return None
46
47        pipe_meta.update({
48            'connector': ck,
49            'metric': mk,
50            'location': lk,
51        })
52
53    pipe_meta['instance'] = self
54    source_pipe = mrsm.Pipe(**pipe_meta)
55
56    _params = copy.deepcopy(params) if params is not None else {}
57    _params = apply_patch_to_config(_params, fetch_params.get('params', {}))
58    select_columns = fetch_params.get('select_columns', [])
59    omit_columns = fetch_params.get('omit_columns', [])
60
61    return source_pipe.get_data(
62        select_columns = select_columns,
63        omit_columns = omit_columns,
64        begin = begin,
65        end = end,
66        params = _params,
67        debug = debug,
68        as_iterator = True,
69    )

Get the Pipe data from the remote Pipe.

def register_plugin( self, plugin: meerschaum.Plugin, make_archive: bool = True, debug: bool = False) -> Tuple[bool, str]:
24def register_plugin(
25    self,
26    plugin: mrsm.core.Plugin,
27    make_archive: bool = True,
28    debug: bool = False,
29) -> SuccessTuple:
30    """Register a plugin and upload its archive."""
31    import json
32    archive_path = plugin.make_tar(debug=debug) if make_archive else plugin.archive_path
33    file_pointer = open(archive_path, 'rb')
34    files = {'archive': file_pointer}
35    metadata = {
36        'version': plugin.version,
37        'attributes': json.dumps(plugin.attributes),
38    }
39    r_url = plugin_r_url(plugin)
40    try:
41        response = self.post(r_url, files=files, params=metadata, debug=debug)
42    except Exception:
43        return False, f"Failed to register plugin '{plugin}'."
44    finally:
45        file_pointer.close()
46
47    try:
48        success, msg = json.loads(response.text)
49    except Exception:
50        return False, response.text
51
52    return success, msg

Register a plugin and upload its archive.

def install_plugin( self, name: str, skip_deps: bool = False, force: bool = False, debug: bool = False) -> Tuple[bool, str]:
55def install_plugin(
56    self,
57    name: str,
58    skip_deps: bool = False,
59    force: bool = False,
60    debug: bool = False
61) -> SuccessTuple:
62    """Download and attempt to install a plugin from the API."""
63    import os
64    import pathlib
65    import json
66    from meerschaum.core import Plugin
67    from meerschaum.config._paths import PLUGINS_TEMP_RESOURCES_PATH
68    from meerschaum.utils.debug import dprint
69    from meerschaum.utils.packages import attempt_import
70    binaryornot_check = attempt_import('binaryornot.check', lazy=False)
71    r_url = plugin_r_url(name)
72    dest = pathlib.Path(os.path.join(PLUGINS_TEMP_RESOURCES_PATH, name + '.tar.gz'))
73    if debug:
74        dprint(f"Fetching from '{self.url + r_url}' to '{dest}'...")
75    archive_path = self.wget(r_url, dest, debug=debug) 
76    is_binary = binaryornot_check.is_binary(str(archive_path))
77    if not is_binary:
78        fail_msg = f"Failed to download binary for plugin '{name}'."
79        try:
80            with open(archive_path, 'r') as f:
81                j = json.load(f)
82            if isinstance(j, list):
83                success, msg = tuple(j)
84            elif isinstance(j, dict) and 'detail' in j:
85                success, msg = False, fail_msg
86        except Exception:
87            success, msg = False, fail_msg
88        return success, msg
89    plugin = Plugin(name, archive_path=archive_path, repo_connector=self)
90    return plugin.install(skip_deps=skip_deps, force=force, debug=debug)

Download and attempt to install a plugin from the API.

def delete_plugin( self, plugin: meerschaum.Plugin, debug: bool = False) -> Tuple[bool, str]:
156def delete_plugin(
157    self,
158    plugin: mrsm.core.Plugin,
159    debug: bool = False
160) -> SuccessTuple:
161    """Delete a plugin from an API repository."""
162    import json
163    r_url = plugin_r_url(plugin)
164    try:
165        response = self.delete(r_url, debug=debug)
166    except Exception:
167        return False, f"Failed to delete plugin '{plugin}'."
168
169    try:
170        success, msg = json.loads(response.text)
171    except Exception:
172        return False, response.text
173
174    return success, msg

Delete a plugin from an API repository.

def get_plugins( self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False) -> List[str]:
 93def get_plugins(
 94    self,
 95    user_id: Optional[int] = None,
 96    search_term: Optional[str] = None,
 97    debug: bool = False
 98) -> List[str]:
 99    """Return a list of registered plugin names.
100
101    Parameters
102    ----------
103    user_id: Optional[int], default None
104        If specified, return all plugins from a certain user.
105
106    search_term: Optional[str], default None
107        If specified, return plugins beginning with this string.
108
109    Returns
110    -------
111    A list of plugin names.
112    """
113    import json
114    from meerschaum.utils.warnings import error
115    from meerschaum._internal.static import STATIC_CONFIG
116    response = self.get(
117        STATIC_CONFIG['api']['endpoints']['plugins'],
118        params = {'user_id': user_id, 'search_term': search_term},
119        use_token = True,
120        debug = debug
121    )
122    if not response:
123        return []
124    plugins = json.loads(response.text)
125    if not isinstance(plugins, list):
126        error(response.text)
127    return plugins

Return a list of registered plugin names.

Parameters
  • user_id (Optional[int], default None): If specified, return all plugins from a certain user.
  • search_term (Optional[str], default None): If specified, return plugins beginning with this string.
Returns
  • A list of plugin names.
def get_plugin_attributes( self, plugin: meerschaum.Plugin, debug: bool = False) -> Dict[str, Any]:
130def get_plugin_attributes(
131    self,
132    plugin: mrsm.core.Plugin,
133    debug: bool = False
134) -> Dict[str, Any]:
135    """
136    Return a plugin's attributes.
137    """
138    import json
139    from meerschaum.utils.warnings import warn, error
140    r_url = plugin_r_url(plugin) + '/attributes'
141    response = self.get(r_url, use_token=True, debug=debug)
142    attributes = response.json()
143    if isinstance(attributes, str) and attributes and attributes[0] == '{':
144        try:
145            attributes = json.loads(attributes)
146        except Exception:
147            pass
148    if not isinstance(attributes, dict):
149        error(response.text)
150    elif not response and 'detail' in attributes:
151        warn(attributes['detail'])
152        return {}
153    return attributes

Return a plugin's attributes.

def login( self, debug: bool = False, warn: bool = True, **kw: Any) -> Tuple[bool, str]:
19def login(
20    self,
21    debug: bool = False,
22    warn: bool = True,
23    **kw: Any
24) -> SuccessTuple:
25    """Log in and set the session token."""
26    if self.login_scheme == 'api_key':
27        validate_response = self.post(
28            STATIC_CONFIG['api']['endpoints']['tokens'] + '/validate',
29            headers={'Authorization': f'Bearer {self.api_key}'},
30            use_token=False,
31            debug=debug,
32        )
33        if not validate_response:
34            return False, "API key is not valid."
35        return True, "API key is valid."
36
37    try:
38        if self.login_scheme == 'password':
39            login_data = {
40                'username': self.username,
41                'password': self.password,
42            }
43        elif self.login_scheme == 'client_credentials':
44            login_data = {
45                'client_id': self.client_id,
46                'client_secret': self.client_secret,
47            }
48    except AttributeError:
49        login_data = {}
50
51    if not login_data:
52        return False, f"Please login with the command `login {self}`."
53
54    login_scheme_msg = (
55        f" as user '{login_data['username']}'"
56        if self.login_scheme == 'username'
57        else ''
58    )
59
60    response = self.post(
61        STATIC_CONFIG['api']['endpoints']['login'],
62        data=login_data,
63        use_token=False,
64        debug=debug,
65    )
66    if response:
67        msg = f"Successfully logged into '{self}'{login_scheme_msg}'."
68        self._token = json.loads(response.text)['access_token']
69        self._expires = datetime.datetime.strptime(
70            json.loads(response.text)['expires'], 
71            '%Y-%m-%dT%H:%M:%S.%f'
72        )
73    else:
74        msg = (
75            f"Failed to log into '{self}'{login_scheme_msg}.\n" +
76            f"    Please verify login details for connector '{self}'."
77        )
78        if warn and not self.__dict__.get('_emitted_warning', False):
79            _warn(msg, stack=False)
80            self._emitted_warning = True
81
82    return response.__bool__(), msg

Log in and set the session token.

def test_connection(self, **kw: Any) -> Optional[bool]:
 85def test_connection(
 86    self,
 87    **kw: Any
 88) -> Union[bool, None]:
 89    """Test if a successful connection to the API may be made."""
 90    from meerschaum.connectors.poll import retry_connect
 91    _default_kw = {
 92        'max_retries': 1, 'retry_wait': 0, 'warn': False,
 93        'connector': self, 'enforce_chaining': False,
 94        'enforce_login': False,
 95    }
 96    _default_kw.update(kw)
 97    try:
 98        return retry_connect(**_default_kw)
 99    except Exception:
100        return False

Test if a successful connection to the API may be made.

def register_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
70def register_user(
71    self,
72    user: mrsm.core.User,
73    debug: bool = False,
74    **kw: Any
75) -> SuccessTuple:
76    """Register a new user."""
77    from meerschaum._internal.static import STATIC_CONFIG
78    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/register"
79    data = {
80        'username': user.username,
81        'password': user.password,
82        'attributes': json.dumps(user.attributes),
83    }
84    if user.type:
85        data['type'] = user.type
86    if user.email:
87        data['email'] = user.email
88    response = self.post(r_url, data=data, debug=debug)
89    try:
90        _json = json.loads(response.text)
91        if isinstance(_json, dict) and 'detail' in _json:
92            return False, _json['detail']
93        success_tuple = tuple(_json)
94    except Exception:
95        msg = response.text if response else f"Failed to register user '{user}'."
96        return False, msg
97
98    return tuple(success_tuple)

Register a new user.

def get_user_id( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Union[int, str, uuid.UUID, NoneType]:
101def get_user_id(
102    self,
103    user: mrsm.core.User,
104    debug: bool = False,
105    **kw: Any
106) -> Union[int, str, UUID, None]:
107    """Get a user's ID."""
108    from meerschaum._internal.static import STATIC_CONFIG
109    from meerschaum.utils.misc import is_int, is_uuid
110    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/id"
111    response = self.get(r_url, debug=debug, **kw)
112    try:
113        id_text = str(json.loads(response.text))
114        if is_int(id_text):
115            user_id = int(id_text)
116        elif is_uuid(id_text):
117            user_id = UUID(id_text)
118        else:
119            user_id = id_text
120    except Exception as e:
121        user_id = None
122    return user_id

Get a user's ID.

def get_users(self, debug: bool = False, **kw: Any) -> List[str]:
19def get_users(
20    self,
21    debug: bool = False,
22    **kw: Any
23) -> List[str]:
24    """
25    Return a list of registered usernames.
26    """
27    from meerschaum._internal.static import STATIC_CONFIG
28    response = self.get(
29        f"{STATIC_CONFIG['api']['endpoints']['users']}",
30        debug = debug,
31        use_token = True,
32    )
33    if not response:
34        return []
35    try:
36        return response.json()
37    except Exception as e:
38        return []

Return a list of registered usernames.

def edit_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
41def edit_user(
42    self,
43    user: mrsm.core.User,
44    debug: bool = False,
45    **kw: Any
46) -> SuccessTuple:
47    """Edit an existing user."""
48    from meerschaum._internal.static import STATIC_CONFIG
49    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/edit"
50    data = {
51        'username': user.username,
52        'password': user.password,
53        'type': user.type,
54        'email': user.email,
55        'attributes': json.dumps(user.attributes),
56    }
57    response = self.post(r_url, data=data, debug=debug)
58    try:
59        _json = json.loads(response.text)
60        if isinstance(_json, dict) and 'detail' in _json:
61            return False, _json['detail']
62        success_tuple = tuple(_json)
63    except Exception:
64        msg = response.text if response else f"Failed to edit user '{user}'."
65        return False, msg
66
67    return tuple(success_tuple)

Edit an existing user.

def delete_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
125def delete_user(
126    self,
127    user: mrsm.core.User,
128    debug: bool = False,
129    **kw: Any
130) -> SuccessTuple:
131    """Delete a user."""
132    from meerschaum._internal.static import STATIC_CONFIG
133    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}"
134    response = self.delete(r_url, debug=debug)
135    try:
136        _json = json.loads(response.text)
137        if isinstance(_json, dict) and 'detail' in _json:
138            return False, _json['detail']
139        success_tuple = tuple(_json)
140    except Exception:
141        success_tuple = False, f"Failed to delete user '{user.username}'."
142    return success_tuple

Delete a user.

def get_user_password_hash( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
166def get_user_password_hash(
167    self,
168    user: mrsm.core.User,
169    debug: bool = False,
170    **kw: Any
171) -> Optional[str]:
172    """If configured, get a user's password hash."""
173    from meerschaum._internal.static import STATIC_CONFIG
174    r_url = STATIC_CONFIG['api']['endpoints']['users'] + '/' + user.username + '/password_hash'
175    response = self.get(r_url, debug=debug, **kw)
176    if not response:
177        return None
178    return response.json()

If configured, get a user's password hash.

def get_user_type( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
181def get_user_type(
182    self,
183    user: mrsm.core.User,
184    debug: bool = False,
185    **kw: Any
186) -> Optional[str]:
187    """If configured, get a user's type."""
188    from meerschaum._internal.static import STATIC_CONFIG
189    r_url = STATIC_CONFIG['api']['endpoints']['users'] + '/' + user.username + '/type'
190    response = self.get(r_url, debug=debug, **kw)
191    if not response:
192        return None
193    return response.json()

If configured, get a user's type.

def get_user_attributes( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw) -> int:
145def get_user_attributes(
146    self,
147    user: mrsm.core.User,
148    debug: bool = False,
149    **kw
150) -> int:
151    """Get a user's attributes."""
152    from meerschaum._internal.static import STATIC_CONFIG
153    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/attributes"
154    response = self.get(r_url, debug=debug, **kw)
155    try:
156        attributes = json.loads(response.text)
157    except Exception:
158        attributes = None
159    return attributes

Get a user's attributes.

def register_token( self, token: meerschaum.core.Token._Token.Token, debug: bool = False) -> Tuple[bool, str]:
20def register_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
21    """
22    Register the provided token to the API.
23    """
24    from meerschaum.utils.dtypes import json_serialize_value
25    r_url = tokens_endpoint + '/register'
26    response = self.post(
27        r_url,
28        data=json.dumps({
29            'label': token.label,
30            'scopes': token.scopes,
31            'expiration': token.expiration,
32        }, default=json_serialize_value),
33        debug=debug,
34    )
35    if not response:
36        return False, f"Failed to register token:\n{response.text}"
37
38    data = response.json()
39    token.label = data['label']
40    token.secret = data['secret']
41    token.id = uuid.UUID(data['id'])
42    if data.get('expiration', None):
43        token.expiration = datetime.fromisoformat(data['expiration'])
44
45    return True, f"Registered token '{token.label}'."

Register the provided token to the API.

def get_token_model( self, token_id: uuid.UUID, debug: bool = False) -> 'Union[TokenModel, None]':
48def get_token_model(self, token_id: uuid.UUID, debug: bool = False) -> 'Union[TokenModel, None]':
49    """
50    Return a token's model from the API instance.
51    """
52    from meerschaum.models import TokenModel
53    r_url = tokens_endpoint + f'/{token_id}'
54    response = self.get(r_url, debug=debug)
55    if not response:
56        return None
57    data = response.json()
58    return TokenModel(**data)

Return a token's model from the API instance.

def get_tokens( self, labels: Optional[List[str]] = None, debug: bool = False) -> List[meerschaum.core.Token._Token.Token]:
61def get_tokens(self, labels: Optional[List[str]] = None, debug: bool = False) -> List[Token]:
62    """
63    Return the tokens registered to the current user.
64    """
65    from meerschaum.utils.warnings import warn
66    r_url = tokens_endpoint
67    params = {}
68    if labels:
69        params['labels'] = ','.join(labels)
70    response = self.get(r_url, params={'labels': labels}, debug=debug)
71    if not response:
72        warn(f"Could not get tokens from '{self}':\n{response.text}")
73        return []
74
75    tokens = [
76        Token(instance=self, **payload)
77        for payload in response.json()
78    ]
79    return tokens

Return the tokens registered to the current user.

def edit_token( self, token: meerschaum.core.Token._Token.Token, debug: bool = False) -> Tuple[bool, str]:
 82def edit_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
 83    """
 84    Persist the token's in-memory state to the API.
 85    """
 86    r_url = tokens_endpoint + f"/{token.id}/edit"
 87    response = self.post(
 88        r_url,
 89        json={
 90            'creation': token.creation.isoformat() if token.creation else None,
 91            'expiration': token.expiration.isoformat() if token.expiration else None,
 92            'label': token.label,
 93            'is_valid': token.is_valid,
 94            'scopes': token.scopes,
 95        },
 96    )
 97    if not response:
 98        return False, f"Failed to edit token:\n{response.text}"
 99
100    success, msg = response.json()
101    return success, msg

Persist the token's in-memory state to the API.

def invalidate_token( self, token: meerschaum.core.Token._Token.Token, debug: bool = False) -> Tuple[bool, str]:
104def invalidate_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
105    """
106    Invalidate the token, disabling it for future requests.
107    """
108    r_url = tokens_endpoint + f"/{token.id}/invalidate"
109    response = self.post(r_url)
110    if not response:
111        return False, f"Failed to invalidate token:\n{response.text}"
112
113    success, msg = response.json()
114    return success, msg

Invalidate the token, disabling it for future requests.

def get_token_scopes( self, token_id: Union[uuid.UUID, meerschaum.core.Token._Token.Token], debug: bool = False) -> List[str]:
117def get_token_scopes(self, token_id: Union[uuid.UUID, Token], debug: bool = False) -> List[str]:
118    """
119    Return the scopes for a token.
120    """
121    _token_id = (token_id.id if isinstance(token_id, Token) else token_id)
122    model = self.get_token_model(_token_id, debug=debug).scopes
123    return getattr(model, 'scopes', [])

Return the scopes for a token.

def token_exists( self, token_id: Union[uuid.UUID, meerschaum.core.Token._Token.Token], debug: bool = False) -> bool:
126def token_exists(self, token_id: Union[uuid.UUID, Token], debug: bool = False) -> bool:
127    """
128    Return `True` if a token exists.
129    """
130    _token_id = (token_id.id if isinstance(token_id, Token) else token_id)
131    model = self.get_token_model(_token_id, debug=debug)
132    if model is None:
133        return False
134    return model.creation is not None

Return True if a token exists.

def delete_token( self, token: meerschaum.core.Token._Token.Token, debug: bool = False) -> Tuple[bool, str]:
137def delete_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
138    """
139    Delete the token from the API.
140    """
141    r_url = tokens_endpoint + f"/{token.id}"
142    response = self.delete(r_url, debug=debug)
143    if not response:
144        return False, f"Failed to delete token:\n{response.text}"
145    
146    success, msg = response.json()
147    return success, msg

Delete the token from the API.

@classmethod
def from_uri( cls, uri: str, label: Optional[str] = None, as_dict: bool = False) -> Union[APIConnector, Dict[str, Union[str, int]]]:
13@classmethod
14def from_uri(
15    cls,
16    uri: str,
17    label: Optional[str] = None,
18    as_dict: bool = False,
19) -> Union[
20        'meerschaum.connectors.APIConnector',
21        Dict[str, Union[str, int]],
22    ]:
23    """
24    Create a new APIConnector from a URI string.
25
26    Parameters
27    ----------
28    uri: str
29        The URI connection string.
30
31    label: Optional[str], default None
32        If provided, use this as the connector label.
33        Otherwise use the determined database name.
34
35    as_dict: bool, default False
36        If `True`, return a dictionary of the keyword arguments
37        necessary to create a new `APIConnector`, otherwise create a new object.
38
39    Returns
40    -------
41    A new APIConnector object or a dictionary of attributes (if `as_dict` is `True`).
42    """
43    from meerschaum.connectors.sql import SQLConnector
44    params = SQLConnector.parse_uri(uri)
45    if 'host' not in params:
46        error("No host was found in the provided URI.")
47    params['protocol'] = params.pop('flavor')
48    params['label'] = label or (
49        (
50            (params['username'] + '@' if 'username' in params else '')
51            + params['host']
52        ).lower()
53    )
54
55    return cls(**params) if not as_dict else params

Create a new APIConnector from a URI string.

Parameters
  • uri (str): The URI connection string.
  • label (Optional[str], default None): If provided, use this as the connector label. Otherwise use the determined database name.
  • as_dict (bool, default False): If True, return a dictionary of the keyword arguments necessary to create a new APIConnector, otherwise create a new object.
Returns
  • A new APIConnector object or a dictionary of attributes (if as_dict is True).
def get_jobs(self, debug: bool = False) -> Dict[str, meerschaum.Job]:
28def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
29    """
30    Return a dictionary of remote jobs.
31    """
32    response = self.get(JOBS_ENDPOINT, debug=debug)
33    if not response:
34        warn(f"Failed to get remote jobs from {self}.")
35        return {}
36    return {
37        name: Job(
38            name,
39            job_meta['sysargs'],
40            executor_keys=str(self),
41            _properties=job_meta['daemon']['properties']
42        )
43        for name, job_meta in response.json().items()
44    }

Return a dictionary of remote jobs.

def get_job(self, name: str, debug: bool = False) -> meerschaum.Job:
47def get_job(self, name: str, debug: bool = False) -> Job:
48    """
49    Return a single Job object.
50    """
51    metadata = self.get_job_metadata(name, debug=debug)
52    if not metadata:
53        raise ValueError(f"Job '{name}' does not exist.")
54
55    return Job(
56        name,
57        metadata['sysargs'],
58        executor_keys=str(self),
59        _properties=metadata['daemon']['properties'],
60    )

Return a single Job object.

def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
 63def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
 64    """
 65    Return the metadata for a single job.
 66    """
 67    now = time.perf_counter()
 68    _job_metadata_cache = self.__dict__.get('_job_metadata_cache', None)
 69    _job_metadata_timestamp = (
 70        _job_metadata_cache.get(name, {}).get('timestamp', None)
 71    ) if _job_metadata_cache is not None else None
 72
 73    if (
 74        _job_metadata_timestamp is not None
 75        and (now - _job_metadata_timestamp) < JOB_METADATA_CACHE_SECONDS
 76    ):
 77        if debug:
 78            dprint(f"Returning cached metadata for job '{name}'.")
 79        return _job_metadata_cache[name]['metadata']
 80
 81    response = self.get(JOBS_ENDPOINT + f"/{name}", debug=debug)
 82    if not response:
 83        if debug:
 84            msg = (
 85                response.json()['detail']
 86                if 'detail' in response.text
 87                else response.text
 88            )
 89            warn(f"Failed to get metadata for job '{name}':\n{msg}")
 90        return {}
 91
 92    metadata = response.json()
 93    if _job_metadata_cache is None:
 94        self._job_metadata_cache = {}
 95
 96    self._job_metadata_cache[name] = {
 97        'timestamp': now,
 98        'metadata': metadata,
 99    }
100    return metadata

Return the metadata for a single job.

def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
102def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
103    """
104    Return the daemon properties for a single job.
105    """
106    metadata = self.get_job_metadata(name, debug=debug)
107    return metadata.get('daemon', {}).get('properties', {})

Return the daemon properties for a single job.

def get_job_exists(self, name: str, debug: bool = False) -> bool:
149def get_job_exists(self, name: str, debug: bool = False) -> bool:
150    """
151    Return whether a job exists.
152    """
153    response = self.get(JOBS_ENDPOINT + f'/{name}/exists', debug=debug)
154    if not response:
155        warn(f"Failed to determine whether job '{name}' exists.")
156        return False
157
158    return response.json()

Return whether a job exists.

def delete_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
161def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
162    """
163    Delete a job.
164    """
165    response = self.delete(JOBS_ENDPOINT + f"/{name}", debug=debug)
166    if not response:
167        if 'detail' in response.text:
168            return False, response.json()['detail']
169
170        return False, response.text
171
172    return tuple(response.json())

Delete a job.

def start_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
175def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
176    """
177    Start a job.
178    """
179    response = self.post(JOBS_ENDPOINT + f"/{name}/start", debug=debug)
180    if not response:
181        if 'detail' in response.text:
182            return False, response.json()['detail']
183        return False, response.text
184
185    return tuple(response.json())

Start a job.

def create_job( self, name: str, sysargs: List[str], properties: Optional[Dict[str, str]] = None, debug: bool = False) -> Tuple[bool, str]:
188def create_job(
189    self,
190    name: str,
191    sysargs: List[str],
192    properties: Optional[Dict[str, str]] = None,
193    debug: bool = False,
194) -> SuccessTuple:
195    """
196    Create a job.
197    """
198    response = self.post(
199        JOBS_ENDPOINT + f"/{name}",
200        json={
201            'sysargs': sysargs,
202            'properties': properties,
203        },
204        debug=debug,
205    )
206    if not response:
207        if 'detail' in response.text:
208            return False, response.json()['detail']
209        return False, response.text
210
211    return tuple(response.json())

Create a job.

def stop_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
214def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
215    """
216    Stop a job.
217    """
218    response = self.post(JOBS_ENDPOINT + f"/{name}/stop", debug=debug)
219    if not response:
220        if 'detail' in response.text:
221            return False, response.json()['detail']
222        return False, response.text
223
224    return tuple(response.json())

Stop a job.

def pause_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
227def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
228    """
229    Pause a job.
230    """
231    response = self.post(JOBS_ENDPOINT + f"/{name}/pause", debug=debug)
232    if not response:
233        if 'detail' in response.text:
234            return False, response.json()['detail']
235        return False, response.text
236
237    return tuple(response.json())

Pause a job.

def get_logs(self, name: str, debug: bool = False) -> str:
240def get_logs(self, name: str, debug: bool = False) -> str:
241    """
242    Return the logs for a job.
243    """
244    response = self.get(LOGS_ENDPOINT + f"/{name}")
245    if not response:
246        raise ValueError(f"Cannot fetch logs for job '{name}':\n{response.text}")
247
248    return response.json()

Return the logs for a job.

def get_job_stop_time(self, name: str, debug: bool = False) -> Optional[datetime.datetime]:
251def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
252    """
253    Return the job's manual stop time.
254    """
255    response = self.get(JOBS_ENDPOINT + f"/{name}/stop_time")
256    if not response:
257        warn(f"Failed to get stop time for job '{name}':\n{response.text}")
258        return None
259
260    data = response.json()
261    if data is None:
262        return None
263
264    return datetime.fromisoformat(data)

Return the job's manual stop time.

def monitor_logs( self, name: str, callback_function: Callable[[Any], Any], input_callback_function: Callable[[NoneType], str], stop_callback_function: Callable[[NoneType], str], stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False):
348def monitor_logs(
349    self,
350    name: str,
351    callback_function: Callable[[Any], Any],
352    input_callback_function: Callable[[None], str],
353    stop_callback_function: Callable[[None], str],
354    stop_on_exit: bool = False,
355    strip_timestamps: bool = False,
356    accept_input: bool = True,
357    debug: bool = False,
358):
359    """
360    Monitor a job's log files and execute a callback with the changes.
361    """
362    return asyncio.run(
363        self.monitor_logs_async(
364            name,
365            callback_function,
366            input_callback_function=input_callback_function,
367            stop_callback_function=stop_callback_function,
368            stop_on_exit=stop_on_exit,
369            strip_timestamps=strip_timestamps,
370            accept_input=accept_input,
371            debug=debug
372        )
373    )

Monitor a job's log files and execute a callback with the changes.

async def monitor_logs_async( self, name: str, callback_function: Callable[[Any], Any], input_callback_function: Callable[[], str], stop_callback_function: Callable[[Tuple[bool, str]], str], stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False):
267async def monitor_logs_async(
268    self,
269    name: str,
270    callback_function: Callable[[Any], Any],
271    input_callback_function: Callable[[], str],
272    stop_callback_function: Callable[[SuccessTuple], str],
273    stop_on_exit: bool = False,
274    strip_timestamps: bool = False,
275    accept_input: bool = True,
276    debug: bool = False,
277):
278    """
279    Monitor a job's log files and await a callback with the changes.
280    """
281    import traceback
282    from meerschaum.jobs import StopMonitoringLogs
283    from meerschaum.utils.formatting._jobs import strip_timestamp_from_line
284
285    websockets, websockets_exceptions = mrsm.attempt_import('websockets', 'websockets.exceptions')
286    protocol = 'ws' if self.URI.startswith('http://') else 'wss'
287    port = self.port if 'port' in self.__dict__ else ''
288    uri = f"{protocol}://{self.host}:{port}{LOGS_ENDPOINT}/{name}/ws"
289
290    async def _stdin_callback(client):
291        if input_callback_function is None:
292            return
293
294        if asyncio.iscoroutinefunction(input_callback_function):
295            data = await input_callback_function()
296        else:
297            data = input_callback_function()
298
299        await client.send(data)
300
301    async def _stop_callback(client):
302        try:
303            result = tuple(json.loads(await client.recv()))
304        except Exception as e:
305            warn(traceback.format_exc())
306            result = False, str(e)
307
308        if stop_callback_function is not None:
309            if asyncio.iscoroutinefunction(stop_callback_function):
310                await stop_callback_function(result)
311            else:
312                stop_callback_function(result)
313
314        if stop_on_exit:
315            raise StopMonitoringLogs
316
317    message_callbacks = {
318        JOBS_STDIN_MESSAGE: _stdin_callback,
319        JOBS_STOP_MESSAGE: _stop_callback,
320    }
321
322    async with websockets.connect(uri) as websocket:
323        try:
324            await websocket.send(self.token or 'no-login')
325        except websockets_exceptions.ConnectionClosedOK:
326            pass
327
328        while True:
329            try:
330                response = await websocket.recv()
331                callback = message_callbacks.get(response, None)
332                if callback is not None:
333                    await callback(websocket)
334                    continue
335
336                if strip_timestamps:
337                    response = strip_timestamp_from_line(response)
338
339                if asyncio.iscoroutinefunction(callback_function):
340                    await callback_function(response)
341                else:
342                    callback_function(response)
343            except (KeyboardInterrupt, StopMonitoringLogs):
344                await websocket.close()
345                break

Monitor a job's log files and await a callback with the changes.

def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
375def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
376    """
377    Return whether a remote job is blocking on stdin.
378    """
379    response = self.get(JOBS_ENDPOINT + f'/{name}/is_blocking_on_stdin', debug=debug)
380    if not response:
381        return False
382
383    return response.json()

Return whether a remote job is blocking on stdin.

def get_job_began(self, name: str, debug: bool = False) -> Optional[str]:
116def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
117    """
118    Return a job's `began` timestamp, if it exists.
119    """
120    properties = self.get_job_properties(name, debug=debug)
121    began_str = properties.get('daemon', {}).get('began', None)
122    if began_str is None:
123        return None
124
125    return began_str

Return a job's began timestamp, if it exists.

def get_job_ended(self, name: str, debug: bool = False) -> Optional[str]:
127def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
128    """
129    Return a job's `ended` timestamp, if it exists.
130    """
131    properties = self.get_job_properties(name, debug=debug)
132    ended_str = properties.get('daemon', {}).get('ended', None)
133    if ended_str is None:
134        return None
135
136    return ended_str

Return a job's ended timestamp, if it exists.

def get_job_paused(self, name: str, debug: bool = False) -> Optional[str]:
138def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
139    """
140    Return a job's `paused` timestamp, if it exists.
141    """
142    properties = self.get_job_properties(name, debug=debug)
143    paused_str = properties.get('daemon', {}).get('paused', None)
144    if paused_str is None:
145        return None
146
147    return paused_str

Return a job's paused timestamp, if it exists.

def get_job_status(self, name: str, debug: bool = False) -> str:
109def get_job_status(self, name: str, debug: bool = False) -> str:
110    """
111    Return the job's status.
112    """
113    metadata = self.get_job_metadata(name, debug=debug)
114    return metadata.get('status', 'stopped')

Return the job's status.