meerschaum.connectors.api

Interact with the Meerschaum API (send commands, pull data, etc.)

 1#! /usr/bin/env python
 2# -*- coding: utf-8 -*-
 3# vim:fenc=utf-8
 4
 5"""
 6Interact with the Meerschaum API (send commands, pull data, etc.)
 7"""
 8
 9from meerschaum.connectors.api._APIConnector import APIConnector
10
11__all__ = ('APIConnector',)
class APIConnector(meerschaum.connectors.instance._InstanceConnector.InstanceConnector):
 22class APIConnector(InstanceConnector):
 23    """
 24    Connect to a Meerschaum API instance.
 25    """
 26
 27    IS_THREAD_SAFE: bool = False
 28    OPTIONAL_ATTRIBUTES: List[str] = ['port', 'client_secret', 'client_id', 'api_key']
 29
 30    from ._request import (
 31        make_request,
 32        get,
 33        post,
 34        put,
 35        patch,
 36        delete,
 37        wget,
 38    )
 39    from ._actions import (
 40        get_actions,
 41        do_action,
 42        do_action_async,
 43        do_action_legacy,
 44    )
 45    from ._misc import get_mrsm_version, get_chaining_status
 46    from ._pipes import (
 47        get_pipe_instance_keys,
 48        register_pipe,
 49        fetch_pipes_keys,
 50        edit_pipe,
 51        sync_pipe,
 52        delete_pipe,
 53        get_pipe_data,
 54        get_pipe_id,
 55        get_pipe_attributes,
 56        get_sync_time,
 57        pipe_exists,
 58        create_metadata,
 59        get_pipe_rowcount,
 60        drop_pipe,
 61        clear_pipe,
 62        get_pipe_columns_types,
 63        get_pipe_columns_indices,
 64    )
 65    from ._fetch import fetch
 66    from ._plugins import (
 67        register_plugin,
 68        install_plugin,
 69        delete_plugin,
 70        get_plugins,
 71        get_plugin_attributes,
 72    )
 73    from ._login import login, test_connection
 74    from ._users import (
 75        register_user,
 76        get_user_id,
 77        get_users,
 78        edit_user,
 79        delete_user,
 80        get_user_password_hash,
 81        get_user_type,
 82        get_user_attributes,
 83    )
 84    from ._tokens import (
 85        register_token,
 86        get_token_model,
 87        get_tokens,
 88        edit_token,
 89        invalidate_token,
 90        get_token_scopes,
 91        token_exists,
 92        delete_token,
 93    )
 94    from ._uri import from_uri
 95    from ._jobs import (
 96        get_jobs,
 97        get_job,
 98        get_job_metadata,
 99        get_job_properties,
100        get_job_exists,
101        delete_job,
102        start_job,
103        create_job,
104        stop_job,
105        pause_job,
106        get_logs,
107        get_job_stop_time,
108        monitor_logs,
109        monitor_logs_async,
110        get_job_is_blocking_on_stdin,
111        get_job_began,
112        get_job_ended,
113        get_job_paused,
114        get_job_status,
115    )
116
117    def __init__(
118        self,
119        label: Optional[str] = None,
120        wait: bool = False,
121        debug: bool = False,
122        **kw
123    ):
124        if 'uri' in kw:
125            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
126            label = label or from_uri_params.get('label', None)
127            _ = from_uri_params.pop('label', None)
128            kw.update(from_uri_params)
129
130        super().__init__('api', label=label, **kw)
131        if 'protocol' not in self.__dict__:
132            self.protocol = (
133                'https' if self.__dict__.get('uri', '').startswith('https')
134                else 'http'
135            )
136
137        if 'uri' not in self.__dict__:
138            self.verify_attributes(required_attributes)
139        else:
140            from meerschaum.connectors.sql import SQLConnector
141            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
142            if 'host' not in conn_attrs:
143                raise Exception(f"Invalid URI for '{self}'.")
144            self.__dict__.update(conn_attrs)
145
146        self.url = (
147            self.protocol + '://' +
148            self.host
149            + (
150                (':' + str(self.port))
151                if self.__dict__.get('port', None)
152                else ''
153            )
154        )
155        self._token = None
156        self._expires = None
157        self._session = None
158        self._instance_keys = self.__dict__.get('instance_keys', None)
159
160
161    @property
162    def URI(self) -> str:
163        """
164        Return the fully qualified URI.
165        """
166        import urllib.parse
167        username = self.__dict__.get('username', None)
168        password = self.__dict__.get('password', None)
169        client_id = self.__dict__.get('client_id', None)
170        client_secret = self.__dict__.get('client_secret', None)
171        api_key = self.__dict__.get('api_key', None)
172        creds = (username + ':' + password + '@') if username and password else ''
173        params = {}
174        params_str = ('?' + urllib.parse.urlencode(params)) if params else ''
175        return (
176            self.protocol
177            + '://'
178            + creds
179            + self.host
180            + (
181                (':' + str(self.port))
182                if self.__dict__.get('port', None)
183                else ''
184            )
185            + params_str
186        )
187
188    @property
189    def session(self):
190        if self._session is None:
191            _ = attempt_import('certifi', lazy=False)
192            requests = attempt_import('requests', lazy=False)
193            if requests:
194                self._session = requests.Session()
195            if self._session is None:
196                error("Failed to import requests. Is requests installed?")
197        return self._session
198
199    @property
200    def token(self):
201        if self.login_scheme == 'api_key':
202            return self.api_key
203
204        expired = (
205            True if self._expires is None else (
206                (
207                    self._expires
208                    <
209                    datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
210                )
211            )
212        )
213
214        if self._token is None or expired:
215            success, msg = self.login()
216            if not success and not self.__dict__.get('_emitted_warning'):
217                warn(msg, stack=False)
218                self._emitted_warning = True
219        return self._token
220
221    @property
222    def instance_keys(self) -> Union[str, None]:
223        """
224        Return the instance keys to be sent alongside pipe requests.
225        """
226        return self._instance_keys
227
228    @property
229    def login_scheme(self) -> str:
230        """
231        Return the login scheme to use based on the configured credentials.
232        """
233        if 'username' in self.__dict__:
234            return 'password'
235        if 'client_id' in self.__dict__:
236            return 'client_credentials'
237        elif 'api_key' in self.__dict__:
238            return 'api_key'
239
240        return 'password'

Connect to a Meerschaum API instance.

APIConnector( label: Optional[str] = None, wait: bool = False, debug: bool = False, **kw)
117    def __init__(
118        self,
119        label: Optional[str] = None,
120        wait: bool = False,
121        debug: bool = False,
122        **kw
123    ):
124        if 'uri' in kw:
125            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
126            label = label or from_uri_params.get('label', None)
127            _ = from_uri_params.pop('label', None)
128            kw.update(from_uri_params)
129
130        super().__init__('api', label=label, **kw)
131        if 'protocol' not in self.__dict__:
132            self.protocol = (
133                'https' if self.__dict__.get('uri', '').startswith('https')
134                else 'http'
135            )
136
137        if 'uri' not in self.__dict__:
138            self.verify_attributes(required_attributes)
139        else:
140            from meerschaum.connectors.sql import SQLConnector
141            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
142            if 'host' not in conn_attrs:
143                raise Exception(f"Invalid URI for '{self}'.")
144            self.__dict__.update(conn_attrs)
145
146        self.url = (
147            self.protocol + '://' +
148            self.host
149            + (
150                (':' + str(self.port))
151                if self.__dict__.get('port', None)
152                else ''
153            )
154        )
155        self._token = None
156        self._expires = None
157        self._session = None
158        self._instance_keys = self.__dict__.get('instance_keys', None)

Set the given keyword arguments as attributes.

Parameters
  • type (str): The type of the connector (e.g. sql, api, plugin).
  • label (str): The label for the connector.
Examples

Run mrsm edit config and to edit connectors in the YAML file:

meerschaum:
    connections:
        {type}:
            {label}:
                ### attributes go here
IS_THREAD_SAFE: bool = False
OPTIONAL_ATTRIBUTES: List[str] = ['port', 'client_secret', 'client_id', 'api_key']
url
URI: str
161    @property
162    def URI(self) -> str:
163        """
164        Return the fully qualified URI.
165        """
166        import urllib.parse
167        username = self.__dict__.get('username', None)
168        password = self.__dict__.get('password', None)
169        client_id = self.__dict__.get('client_id', None)
170        client_secret = self.__dict__.get('client_secret', None)
171        api_key = self.__dict__.get('api_key', None)
172        creds = (username + ':' + password + '@') if username and password else ''
173        params = {}
174        params_str = ('?' + urllib.parse.urlencode(params)) if params else ''
175        return (
176            self.protocol
177            + '://'
178            + creds
179            + self.host
180            + (
181                (':' + str(self.port))
182                if self.__dict__.get('port', None)
183                else ''
184            )
185            + params_str
186        )

Return the fully qualified URI.

session
188    @property
189    def session(self):
190        if self._session is None:
191            _ = attempt_import('certifi', lazy=False)
192            requests = attempt_import('requests', lazy=False)
193            if requests:
194                self._session = requests.Session()
195            if self._session is None:
196                error("Failed to import requests. Is requests installed?")
197        return self._session
token
199    @property
200    def token(self):
201        if self.login_scheme == 'api_key':
202            return self.api_key
203
204        expired = (
205            True if self._expires is None else (
206                (
207                    self._expires
208                    <
209                    datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
210                )
211            )
212        )
213
214        if self._token is None or expired:
215            success, msg = self.login()
216            if not success and not self.__dict__.get('_emitted_warning'):
217                warn(msg, stack=False)
218                self._emitted_warning = True
219        return self._token
instance_keys: Optional[str]
221    @property
222    def instance_keys(self) -> Union[str, None]:
223        """
224        Return the instance keys to be sent alongside pipe requests.
225        """
226        return self._instance_keys

Return the instance keys to be sent alongside pipe requests.

login_scheme: str
228    @property
229    def login_scheme(self) -> str:
230        """
231        Return the login scheme to use based on the configured credentials.
232        """
233        if 'username' in self.__dict__:
234            return 'password'
235        if 'client_id' in self.__dict__:
236            return 'client_credentials'
237        elif 'api_key' in self.__dict__:
238            return 'api_key'
239
240        return 'password'

Return the login scheme to use based on the configured credentials.

def make_request( self, method: str, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kwargs: Any) -> requests.models.Response:
 28def make_request(
 29    self,
 30    method: str,
 31    r_url: str,
 32    headers: Optional[Dict[str, Any]] = None,
 33    use_token: bool = True,
 34    debug: bool = False,
 35    **kwargs: Any
 36) -> 'requests.Response':
 37    """
 38    Make a request to this APIConnector's endpoint using the in-memory session.
 39
 40    Parameters
 41    ----------
 42    method: str
 43        The kind of request to make.
 44        Accepted values:
 45        - `'GET'`
 46        - `'OPTIONS'`
 47        - `'HEAD'`
 48        - `'POST'`
 49        - `'PUT'`
 50        - `'PATCH'`
 51        - `'DELETE'`
 52
 53    r_url: str
 54        The relative URL for the endpoint (e.g. `'/pipes'`).
 55
 56    headers: Optional[Dict[str, Any]], default None
 57        The headers to use for the request.
 58        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
 59
 60    use_token: bool, default True
 61        If `True`, add the authorization token to the headers.
 62
 63    debug: bool, default False
 64        Verbosity toggle.
 65
 66    kwargs: Any
 67        All other keyword arguments are passed to `requests.request`.
 68
 69    Returns
 70    -------
 71    A `requests.Reponse` object.
 72    """
 73    if method.upper() not in METHODS:
 74        raise ValueError(f"Method '{method}' is not supported.")
 75
 76    verify = self.__dict__.get('verify', None)
 77    if 'verify' not in kwargs and isinstance(verify, bool):
 78        kwargs['verify'] = verify
 79
 80    headers = (
 81        copy.deepcopy(headers)
 82        if isinstance(headers, dict)
 83        else {}
 84    )
 85
 86    if use_token:
 87        headers.update({'Authorization': f'Bearer {self.token}'})
 88
 89    if 'timeout' not in kwargs:
 90        kwargs['timeout'] = STATIC_CONFIG['api']['default_timeout']
 91
 92    request_url = urllib.parse.urljoin(self.url, r_url)
 93    if debug:
 94        dprint(f"[{self}] Sending a '{method.upper()}' request to {request_url}")
 95
 96    return self.session.request(
 97        method.upper(),
 98        request_url,
 99        headers=headers,
100        **kwargs
101    )

Make a request to this APIConnector's endpoint using the in-memory session.

Parameters
  • method (str): The kind of request to make. Accepted values:
    • 'GET'
    • 'OPTIONS'
    • 'HEAD'
    • 'POST'
    • 'PUT'
    • 'PATCH'
    • 'DELETE'
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def get(self, r_url: str, **kwargs: Any) -> requests.models.Response:
104def get(self, r_url: str, **kwargs: Any) -> 'requests.Response':
105    """
106    Wrapper for `requests.get`.
107
108    Parameters
109    ----------
110    r_url: str
111        The relative URL for the endpoint (e.g. `'/pipes'`).
112
113    headers: Optional[Dict[str, Any]], default None
114        The headers to use for the request.
115        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
116
117    use_token: bool, default True
118        If `True`, add the authorization token to the headers.
119
120    debug: bool, default False
121        Verbosity toggle.
122
123    kwargs: Any
124        All other keyword arguments are passed to `requests.request`.
125
126    Returns
127    -------
128    A `requests.Reponse` object.
129
130    """
131    return self.make_request('GET', r_url, **kwargs)

Wrapper for requests.get.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def post(self, r_url: str, **kwargs: Any) -> requests.models.Response:
134def post(self, r_url: str, **kwargs: Any) -> 'requests.Response':
135    """
136    Wrapper for `requests.post`.
137
138    Parameters
139    ----------
140    r_url: str
141        The relative URL for the endpoint (e.g. `'/pipes'`).
142
143    headers: Optional[Dict[str, Any]], default None
144        The headers to use for the request.
145        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
146
147    use_token: bool, default True
148        If `True`, add the authorization token to the headers.
149
150    debug: bool, default False
151        Verbosity toggle.
152
153    kwargs: Any
154        All other keyword arguments are passed to `requests.request`.
155
156    Returns
157    -------
158    A `requests.Reponse` object.
159
160    """
161    return self.make_request('POST', r_url, **kwargs)

Wrapper for requests.post.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def put(self, r_url: str, **kwargs: Any) -> requests.models.Response:
193def put(self, r_url: str, **kwargs: Any) -> 'requests.Response':
194    """
195    Wrapper for `requests.put`.
196
197    Parameters
198    ----------
199    r_url: str
200        The relative URL for the endpoint (e.g. `'/pipes'`).
201
202    headers: Optional[Dict[str, Any]], default None
203        The headers to use for the request.
204        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
205
206    use_token: bool, default True
207        If `True`, add the authorization token to the headers.
208
209    debug: bool, default False
210        Verbosity toggle.
211
212    kwargs: Any
213        All other keyword arguments are passed to `requests.request`.
214
215    Returns
216    -------
217    A `requests.Reponse` object.
218    """
219    return self.make_request('PUT', r_url, **kwargs)

Wrapper for requests.put.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def patch(self, r_url: str, **kwargs: Any) -> requests.models.Response:
164def patch(self, r_url: str, **kwargs: Any) -> 'requests.Response':
165    """
166    Wrapper for `requests.patch`.
167
168    Parameters
169    ----------
170    r_url: str
171        The relative URL for the endpoint (e.g. `'/pipes'`).
172
173    headers: Optional[Dict[str, Any]], default None
174        The headers to use for the request.
175        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
176
177    use_token: bool, default True
178        If `True`, add the authorization token to the headers.
179
180    debug: bool, default False
181        Verbosity toggle.
182
183    kwargs: Any
184        All other keyword arguments are passed to `requests.request`.
185
186    Returns
187    -------
188    A `requests.Reponse` object.
189    """
190    return self.make_request('PATCH', r_url, **kwargs)

Wrapper for requests.patch.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def delete(self, r_url: str, **kwargs: Any) -> requests.models.Response:
222def delete(self, r_url: str, **kwargs: Any) -> 'requests.Response':
223    """
224    Wrapper for `requests.delete`.
225
226    Parameters
227    ----------
228    r_url: str
229        The relative URL for the endpoint (e.g. `'/pipes'`).
230
231    headers: Optional[Dict[str, Any]], default None
232        The headers to use for the request.
233        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
234
235    use_token: bool, default True
236        If `True`, add the authorization token to the headers.
237
238    debug: bool, default False
239        Verbosity toggle.
240
241    kwargs: Any
242        All other keyword arguments are passed to `requests.request`.
243
244    Returns
245    -------
246    A `requests.Reponse` object.
247    """
248    return self.make_request('DELETE', r_url, **kwargs)

Wrapper for requests.delete.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def wget( self, r_url: str, dest: Union[str, pathlib.Path, NoneType] = None, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) -> pathlib.Path:
251def wget(
252    self,
253    r_url: str,
254    dest: Optional[Union[str, pathlib.Path]] = None,
255    headers: Optional[Dict[str, Any]] = None,
256    use_token: bool = True,
257    debug: bool = False,
258    **kw: Any
259) -> pathlib.Path:
260    """Mimic wget with requests."""
261    from meerschaum.utils.misc import wget
262    if headers is None:
263        headers = {}
264    if use_token:
265        headers.update({'Authorization': f'Bearer {self.token}'})
266    request_url = urllib.parse.urljoin(self.url, r_url)
267    if debug:
268        dprint(
269            f"[{self}] Downloading {request_url}"
270            + (f' to {dest}' if dest is not None else '')
271            + "..."
272        )
273    return wget(request_url, dest=dest, headers=headers, **kw)

Mimic wget with requests.

def get_actions(self):
24def get_actions(self):
25    """Get available actions from the API instance."""
26    return self.get(ACTIONS_ENDPOINT)

Get available actions from the API instance.

def do_action(self, sysargs: List[str]) -> Tuple[bool, str]:
29def do_action(self, sysargs: List[str]) -> SuccessTuple:
30    """
31    Execute a Meerschaum action remotely.
32    """
33    return asyncio.run(self.do_action_async(sysargs))

Execute a Meerschaum action remotely.

async def do_action_async( self, sysargs: List[str], callback_function: Callable[[str], NoneType] = functools.partial(<built-in function print>, end='')) -> Tuple[bool, str]:
36async def do_action_async(
37    self,
38    sysargs: List[str],
39    callback_function: Callable[[str], None] = partial(print, end=''),
40) -> SuccessTuple:
41    """
42    Execute an action as a temporary remote job.
43    """
44    from meerschaum._internal.arguments import remove_api_executor_keys
45    from meerschaum.utils.misc import generate_password
46    sysargs = remove_api_executor_keys(sysargs)
47
48    job_name = TEMP_PREFIX + generate_password(12)
49    job = mrsm.Job(job_name, sysargs, executor_keys=str(self))
50
51    start_success, start_msg = job.start()
52    if not start_success:
53        return start_success, start_msg
54
55    await job.monitor_logs_async(
56        callback_function=callback_function,
57        stop_on_exit=True,
58        strip_timestamps=True,
59    )
60
61    success, msg = job.result
62    job.delete()
63    return success, msg

Execute an action as a temporary remote job.

def do_action_legacy( self, action: Optional[List[str]] = None, sysargs: Optional[List[str]] = None, debug: bool = False, **kw) -> Tuple[bool, str]:
 66def do_action_legacy(
 67    self,
 68    action: Optional[List[str]] = None,
 69    sysargs: Optional[List[str]] = None,
 70    debug: bool = False,
 71    **kw
 72) -> SuccessTuple:
 73    """
 74    NOTE: This method is deprecated.
 75    Please use `do_action()` or `do_action_async()`.
 76
 77    Execute a Meerschaum action remotely.
 78
 79    If `sysargs` are provided, parse those instead.
 80    Otherwise infer everything from keyword arguments.
 81
 82    Examples
 83    --------
 84    >>> conn = mrsm.get_connector('api:main')
 85    >>> conn.do_action(['show', 'pipes'])
 86    (True, "Success")
 87    >>> conn.do_action(['show', 'arguments'], name='test')
 88    (True, "Success")
 89    """
 90    import sys, json
 91    from meerschaum.utils.debug import dprint
 92    from meerschaum._internal.static import STATIC_CONFIG
 93    from meerschaum.utils.misc import json_serialize_datetime
 94    if action is None:
 95        action = []
 96
 97    if sysargs is not None and action and action[0] == '':
 98        from meerschaum._internal.arguments import parse_arguments
 99        if debug:
100            dprint(f"Parsing sysargs:\n{sysargs}")
101        json_dict = parse_arguments(sysargs)
102    else:
103        json_dict = kw
104        json_dict['action'] = action
105        if 'noask' not in kw:
106            json_dict['noask'] = True
107        if 'yes' not in kw:
108            json_dict['yes'] = True
109        if debug:
110            json_dict['debug'] = debug
111
112    root_action = json_dict['action'][0]
113    del json_dict['action'][0]
114    r_url = f"{STATIC_CONFIG['api']['endpoints']['actions']}/{root_action}"
115    
116    if debug:
117        from meerschaum.utils.formatting import pprint
118        dprint(f"Sending data to '{self.url + r_url}':")
119        pprint(json_dict, stream=sys.stderr)
120
121    response = self.post(
122        r_url,
123        data = json.dumps(json_dict, default=json_serialize_datetime),
124        debug = debug,
125    )
126    try:
127        response_list = json.loads(response.text)
128        if isinstance(response_list, dict) and 'detail' in response_list:
129            return False, response_list['detail']
130    except Exception as e:
131        print(f"Invalid response: {response}")
132        print(e)
133        return False, response.text
134    if debug:
135        dprint(response)
136    try:
137        return response_list[0], response_list[1]
138    except Exception as e:
139        return False, f"Failed to parse result from action '{root_action}'"

NOTE: This method is deprecated. Please use do_action() or do_action_async().

Execute a Meerschaum action remotely.

If sysargs are provided, parse those instead. Otherwise infer everything from keyword arguments.

Examples
>>> conn = mrsm.get_connector('api:main')
>>> conn.do_action(['show', 'pipes'])
(True, "Success")
>>> conn.do_action(['show', 'arguments'], name='test')
(True, "Success")
def get_mrsm_version(self, **kw) -> Optional[str]:
13def get_mrsm_version(self, **kw) -> Optional[str]:
14    """
15    Return the Meerschaum version of the API instance.
16    """
17    from meerschaum._internal.static import STATIC_CONFIG
18    try:
19        j = self.get(
20            STATIC_CONFIG['api']['endpoints']['version'] + '/mrsm',
21            use_token=False,
22            **kw
23        ).json()
24    except Exception:
25        return None
26    if isinstance(j, dict) and 'detail' in j:
27        return None
28    return j

Return the Meerschaum version of the API instance.

def get_chaining_status(self, **kw) -> Optional[bool]:
31def get_chaining_status(self, **kw) -> Optional[bool]:
32    """
33    Fetch the chaining status of the API instance.
34    """
35    from meerschaum._internal.static import STATIC_CONFIG
36    try:
37        response = self.get(
38            STATIC_CONFIG['api']['endpoints']['chaining'],
39            use_token = True,
40            **kw
41        )
42        if not response:
43            return None
44    except Exception:
45        return None
46
47    return response.json()

Fetch the chaining status of the API instance.

def get_pipe_instance_keys(self, pipe: meerschaum.Pipe) -> Optional[str]:
35def get_pipe_instance_keys(self, pipe: mrsm.Pipe) -> Union[str, None]:
36    """
37    Return the configured instance keys for a pipe if set,
38    else fall back to the default `instance_keys` for this `APIConnector`.
39    """
40    return pipe.parameters.get('instance_keys', self.instance_keys)

Return the configured instance keys for a pipe if set, else fall back to the default instance_keys for this APIConnector.

def register_pipe( self, pipe: meerschaum.Pipe, debug: bool = False) -> Tuple[bool, str]:
43def register_pipe(
44    self,
45    pipe: mrsm.Pipe,
46    debug: bool = False
47) -> SuccessTuple:
48    """Submit a POST to the API to register a new Pipe object.
49    Returns a tuple of (success_bool, response_dict).
50    """
51    from meerschaum.utils.debug import dprint
52    r_url = pipe_r_url(pipe)
53    response = self.post(
54        r_url + '/register',
55        json=pipe._attributes.get('parameters', {}),
56        params={'instance_keys': self.get_pipe_instance_keys(pipe)},
57        debug=debug,
58    )
59    if debug:
60        dprint(response.text)
61
62    if not response:
63        return False, response.text
64
65    response_data = response.json()
66    if isinstance(response_data, list):
67        response_tuple = response_data[0], response_data[1]
68    elif 'detail' in response.json():
69        response_tuple = response.__bool__(), response_data['detail']
70    else:
71        response_tuple = response.__bool__(), response.text
72    return response_tuple

Submit a POST to the API to register a new Pipe object. Returns a tuple of (success_bool, response_dict).

def fetch_pipes_keys( self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> List[Union[Tuple[str, str, Optional[str]], Tuple[str, str, Optional[str], List[str]], Tuple[str, str, Optional[str], Dict[str, Any]]]]:
108def fetch_pipes_keys(
109    self,
110    connector_keys: Optional[List[str]] = None,
111    metric_keys: Optional[List[str]] = None,
112    location_keys: Optional[List[str]] = None,
113    tags: Optional[List[str]] = None,
114    params: Optional[Dict[str, Any]] = None,
115    debug: bool = False
116) -> List[
117        Union[
118            Tuple[str, str, Union[str, None]],
119            Tuple[str, str, Union[str, None], List[str]],
120            Tuple[str, str, Union[str, None], Dict[str, Any]]
121        ]
122    ]:
123    """
124    Fetch registered Pipes' keys from the API.
125    
126    Parameters
127    ----------
128    connector_keys: Optional[List[str]], default None
129        The connector keys for the query.
130
131    metric_keys: Optional[List[str]], default None
132        The metric keys for the query.
133
134    location_keys: Optional[List[str]], default None
135        The location keys for the query.
136
137    tags: Optional[List[str]], default None
138        A list of tags for the query.
139
140    params: Optional[Dict[str, Any]], default None
141        A parameters dictionary for filtering against the `pipes` table
142        (e.g. `{'connector_keys': 'plugin:foo'}`).
143        Not recommeded to be used.
144
145    debug: bool, default False
146        Verbosity toggle.
147
148    Returns
149    -------
150    A list of tuples containing pipes' keys.
151    """
152    from meerschaum._internal.static import STATIC_CONFIG
153    if connector_keys is None:
154        connector_keys = []
155    if metric_keys is None:
156        metric_keys = []
157    if location_keys is None:
158        location_keys = []
159    if tags is None:
160        tags = []
161
162    r_url = STATIC_CONFIG['api']['endpoints']['pipes'] + '/keys'
163    try:
164        j = self.get(
165            r_url,
166            params={
167                'connector_keys': json.dumps(connector_keys),
168                'metric_keys': json.dumps(metric_keys),
169                'location_keys': json.dumps(location_keys),
170                'tags': json.dumps(tags),
171                'params': json.dumps(params),
172                'instance_keys': self.instance_keys,
173            },
174            debug=debug
175        ).json()
176    except Exception as e:
177        import traceback
178        traceback.print_exc()
179        error(str(e))
180
181    if 'detail' in j:
182        error(j['detail'], stack=False)
183    return [tuple(r) for r in j]

Fetch registered Pipes' keys from the API.

Parameters
  • connector_keys (Optional[List[str]], default None): The connector keys for the query.
  • metric_keys (Optional[List[str]], default None): The metric keys for the query.
  • location_keys (Optional[List[str]], default None): The location keys for the query.
  • tags (Optional[List[str]], default None): A list of tags for the query.
  • params (Optional[Dict[str, Any]], default None): A parameters dictionary for filtering against the pipes table (e.g. {'connector_keys': 'plugin:foo'}). Not recommeded to be used.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of tuples containing pipes' keys.
def edit_pipe( self, pipe: meerschaum.Pipe, patch: bool = False, debug: bool = False) -> Tuple[bool, str]:
 75def edit_pipe(
 76    self,
 77    pipe: mrsm.Pipe,
 78    patch: bool = False,
 79    debug: bool = False,
 80) -> SuccessTuple:
 81    """Submit a PATCH to the API to edit an existing Pipe object.
 82    Returns a tuple of (success_bool, response_dict).
 83    """
 84    from meerschaum.utils.debug import dprint
 85    ### NOTE: if `parameters` is supplied in the Pipe constructor,
 86    ###       then `pipe.parameters` will exist and not be fetched from the database.
 87    r_url = pipe_r_url(pipe)
 88    response = self.patch(
 89        r_url + '/edit',
 90        params={'patch': patch, 'instance_keys': self.get_pipe_instance_keys(pipe)},
 91        json=pipe.get_parameters(apply_symlinks=False),
 92        debug=debug,
 93    )
 94    if debug:
 95        dprint(response.text)
 96
 97    response_data = response.json()
 98
 99    if isinstance(response.json(), list):
100        response_tuple = response_data[0], response_data[1]
101    elif 'detail' in response.json():
102        response_tuple = response.__bool__(), response_data['detail']
103    else:
104        response_tuple = response.__bool__(), response.text
105    return response_tuple

Submit a PATCH to the API to edit an existing Pipe object. Returns a tuple of (success_bool, response_dict).

def sync_pipe( self, pipe: meerschaum.Pipe, df: "Optional[Union['pd.DataFrame', Dict[Any, Any], str]]" = None, chunksize: Optional[int] = -1, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
186def sync_pipe(
187    self,
188    pipe: mrsm.Pipe,
189    df: Optional[Union['pd.DataFrame', Dict[Any, Any], str]] = None,
190    chunksize: Optional[int] = -1,
191    debug: bool = False,
192    **kw: Any
193) -> SuccessTuple:
194    """Sync a DataFrame into a Pipe."""
195    from decimal import Decimal
196    from meerschaum.utils.debug import dprint
197    from meerschaum.utils.dtypes import json_serialize_value
198    from meerschaum.utils.misc import items_str, interval_str
199    from meerschaum.config import get_config
200    from meerschaum.utils.packages import attempt_import
201    from meerschaum.utils.dataframe import get_special_cols, to_json
202    begin = time.perf_counter()
203    more_itertools = attempt_import('more_itertools')
204    if df is None:
205        msg = f"DataFrame is `None`. Cannot sync {pipe}."
206        return False, msg
207
208    def get_json_str(c):
209        if isinstance(c, str):
210            return c
211        if isinstance(c, (dict, list, tuple)):
212            return json.dumps(c, default=json_serialize_value)
213        return to_json(c, orient='columns', geometry_format='wkb_hex')
214
215    df = json.loads(df) if isinstance(df, str) else df
216
217    _chunksize: Optional[int] = (1 if chunksize is None else (
218        get_config('system', 'connectors', 'sql', 'chunksize') if chunksize == -1
219        else chunksize
220    ))
221    keys: List[str] = list(df.columns)
222    chunks = []
223    if hasattr(df, 'index'):
224        df = df.reset_index(drop=True)
225        is_dask = 'dask' in df.__module__
226        chunks = (
227            (df.iloc[i] for i in more_itertools.chunked(df.index, _chunksize))
228            if not is_dask
229            else [partition.compute() for partition in df.partitions]
230        )
231
232    elif isinstance(df, dict):
233        ### `_chunks` is a dict of lists of dicts.
234        ### e.g. {'a' : [ {'a':[1, 2]}, {'a':[3, 4]} ] }
235        _chunks = {k: [] for k in keys}
236        for k in keys:
237            chunk_iter = more_itertools.chunked(df[k], _chunksize)
238            for l in chunk_iter:
239                _chunks[k].append({k: l})
240
241        ### `chunks` is a list of dicts (e.g. orient by rows in pandas JSON).
242        for k, l in _chunks.items():
243            for i, c in enumerate(l):
244                try:
245                    chunks[i].update(c)
246                except IndexError:
247                    chunks.append(c)
248    elif isinstance(df, list):
249        chunks = (df[i] for i in more_itertools.chunked(df, _chunksize))
250
251    ### Send columns in case the user has defined them locally.
252    request_params = kw.copy()
253    if pipe.columns:
254        request_params['columns'] = json.dumps(pipe.columns)
255    request_params['instance_keys'] = self.get_pipe_instance_keys(pipe)
256    r_url = pipe_r_url(pipe) + '/data'
257
258    rowcount = 0
259    num_success_chunks = 0
260    for i, c in enumerate(chunks):
261        if debug:
262            dprint(f"[{self}] Posting chunk {i} to {r_url}...")
263        if len(c) == 0:
264            if debug:
265                dprint(f"[{self}] Skipping empty chunk...")
266            continue
267        json_str = get_json_str(c)
268
269        try:
270            response = self.post(
271                r_url,
272                params=request_params,
273                data=json_str,
274                debug=debug,
275            )
276        except Exception as e:
277            msg = f"Failed to post a chunk to {pipe}:\n{e}"
278            warn(msg)
279            return False, msg
280            
281        if not response:
282            return False, f"Failed to sync a chunk:\n{response.text}"
283
284        try:
285            j = json.loads(response.text)
286        except Exception as e:
287            return False, f"Failed to parse response from syncing {pipe}:\n{e}"
288
289        if isinstance(j, dict) and 'detail' in j:
290            return False, j['detail']
291
292        try:
293            j = tuple(j)
294        except Exception:
295            return False, response.text
296
297        if debug:
298            dprint("Received response: " + str(j))
299        if not j[0]:
300            return j
301
302        rowcount += len(c)
303        num_success_chunks += 1
304
305    success_tuple = True, (
306        f"It took {interval_str(timedelta(seconds=(time.perf_counter() - begin)))} "
307        + f"to sync {rowcount:,} row"
308        + ('s' if rowcount != 1 else '')
309        + f" across {num_success_chunks:,} chunk" + ('s' if num_success_chunks != 1 else '') +
310        f" to {pipe}."
311    )
312    return success_tuple

Sync a DataFrame into a Pipe.

def delete_pipe( self, pipe: Optional[meerschaum.Pipe] = None, debug: bool = False) -> Tuple[bool, str]:
315def delete_pipe(
316    self,
317    pipe: Optional[mrsm.Pipe] = None,
318    debug: bool = False,
319) -> SuccessTuple:
320    """Delete a Pipe and drop its table."""
321    if pipe is None:
322        error("Pipe cannot be None.")
323    r_url = pipe_r_url(pipe)
324    response = self.delete(
325        r_url + '/delete',
326        params={'instance_keys': self.get_pipe_instance_keys(pipe)},
327        debug=debug,
328    )
329    if debug:
330        dprint(response.text)
331
332    response_data = response.json()
333    if isinstance(response.json(), list):
334        response_tuple = response_data[0], response_data[1]
335    elif 'detail' in response.json():
336        response_tuple = response.__bool__(), response_data['detail']
337    else:
338        response_tuple = response.__bool__(), response.text
339    return response_tuple

Delete a Pipe and drop its table.

def get_pipe_data( self, pipe: meerschaum.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[str, datetime.datetime, int, NoneType] = None, end: Union[str, datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, as_chunks: bool = False, debug: bool = False, **kw: Any) -> Optional[pandas.DataFrame]:
342def get_pipe_data(
343    self,
344    pipe: mrsm.Pipe,
345    select_columns: Optional[List[str]] = None,
346    omit_columns: Optional[List[str]] = None,
347    begin: Union[str, datetime, int, None] = None,
348    end: Union[str, datetime, int, None] = None,
349    params: Optional[Dict[str, Any]] = None,
350    as_chunks: bool = False,
351    debug: bool = False,
352    **kw: Any
353) -> Union[pandas.DataFrame, None]:
354    """Fetch data from the API."""
355    r_url = pipe_r_url(pipe)
356    while True:
357        try:
358            response = self.get(
359                r_url + "/data",
360                params={
361                    'select_columns': json.dumps(select_columns),
362                    'omit_columns': json.dumps(omit_columns),
363                    'begin': begin,
364                    'end': end,
365                    'params': json.dumps(params, default=str),
366                    'instance': self.get_pipe_instance_keys(pipe),
367                    'as_chunks': as_chunks,
368                },
369                debug=debug
370            )
371            if not response.ok:
372                return None
373            j = response.json()
374        except Exception as e:
375            warn(f"Failed to get data for {pipe}:\n{e}")
376            return None
377        if isinstance(j, dict) and 'detail' in j:
378            return False, j['detail']
379        break
380
381    from meerschaum.utils.dataframe import parse_df_datetimes, add_missing_cols_to_df
382    from meerschaum.utils.dtypes import are_dtypes_equal
383    try:
384        df = parse_df_datetimes(
385            j,
386            ignore_cols=[
387                col
388                for col, dtype in pipe.dtypes.items()
389                if not are_dtypes_equal(str(dtype), 'datetime')
390            ],
391            strip_timezone=(pipe.tzinfo is None),
392            debug=debug,
393        )
394    except Exception as e:
395        warn(f"Failed to parse response for {pipe}:\n{e}")
396        return None
397
398    if len(df.columns) == 0:
399        return add_missing_cols_to_df(df, pipe.dtypes)
400
401    return df

Fetch data from the API.

def get_pipe_id( self, pipe: meerschaum.Pipe, debug: bool = False) -> Union[int, str, NoneType]:
404def get_pipe_id(
405    self,
406    pipe: mrsm.Pipe,
407    debug: bool = False,
408) -> Union[int, str, None]:
409    """Get a Pipe's ID from the API."""
410    from meerschaum.utils.misc import is_int
411    r_url = pipe_r_url(pipe)
412    response = self.get(
413        r_url + '/id',
414        params={
415            'instance': self.get_pipe_instance_keys(pipe),
416        },
417        debug=debug,
418    )
419    if debug:
420        dprint(f"Got pipe ID: {response.text}")
421    try:
422        if is_int(response.text):
423            return int(response.text)
424        if response.text and response.text[0] != '{':
425            return response.text
426    except Exception as e:
427        warn(f"Failed to get the ID for {pipe}:\n{e}")
428    return None

Get a Pipe's ID from the API.

def get_pipe_attributes( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, Any]:
431def get_pipe_attributes(
432    self,
433    pipe: mrsm.Pipe,
434    debug: bool = False,
435) -> Dict[str, Any]:
436    """Get a Pipe's attributes from the API
437
438    Parameters
439    ----------
440    pipe: meerschaum.Pipe
441        The pipe whose attributes we are fetching.
442        
443    Returns
444    -------
445    A dictionary of a pipe's attributes.
446    If the pipe does not exist, return an empty dictionary.
447    """
448    r_url = pipe_r_url(pipe)
449    response = self.get(
450        r_url + '/attributes',
451        params={
452            'instance': self.get_pipe_instance_keys(pipe),
453        },
454        debug=debug
455    )
456    try:
457        return json.loads(response.text)
458    except Exception as e:
459        warn(f"Failed to get the attributes for {pipe}:\n{e}")
460    return {}

Get a Pipe's attributes from the API

Parameters
Returns
  • A dictionary of a pipe's attributes.
  • If the pipe does not exist, return an empty dictionary.
def get_sync_time( self, pipe: meerschaum.Pipe, params: Optional[Dict[str, Any]] = None, newest: bool = True, debug: bool = False) -> Union[datetime.datetime, int, NoneType]:
463def get_sync_time(
464    self,
465    pipe: mrsm.Pipe,
466    params: Optional[Dict[str, Any]] = None,
467    newest: bool = True,
468    debug: bool = False,
469) -> Union[datetime, int, None]:
470    """Get a Pipe's most recent datetime value from the API.
471
472    Parameters
473    ----------
474    pipe: meerschaum.Pipe
475        The pipe to select from.
476
477    params: Optional[Dict[str, Any]], default None
478        Optional params dictionary to build the WHERE clause.
479
480    newest: bool, default True
481        If `True`, get the most recent datetime (honoring `params`).
482        If `False`, get the oldest datetime (ASC instead of DESC).
483
484    Returns
485    -------
486    The most recent (or oldest if `newest` is `False`) datetime of a pipe,
487    rounded down to the closest minute.
488    """
489    from meerschaum.utils.misc import is_int
490    from meerschaum.utils.warnings import warn
491    r_url = pipe_r_url(pipe)
492    response = self.get(
493        r_url + '/sync_time',
494        json=params,
495        params={
496            'instance': self.get_pipe_instance_keys(pipe),
497            'newest': newest,
498            'debug': debug,
499        },
500        debug=debug,
501    )
502    if not response:
503        warn(f"Failed to get the sync time for {pipe}:\n" + response.text)
504        return None
505
506    j = response.json()
507    if j is None:
508        dt = None
509    else:
510        try:
511            dt = (
512                datetime.fromisoformat(j)
513                if not is_int(j)
514                else int(j)
515            )
516        except Exception as e:
517            warn(f"Failed to parse the sync time '{j}' for {pipe}:\n{e}")
518            dt = None
519    return dt

Get a Pipe's most recent datetime value from the API.

Parameters
  • pipe (meerschaum.Pipe): The pipe to select from.
  • params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause.
  • newest (bool, default True): If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
Returns
  • The most recent (or oldest if newest is False) datetime of a pipe,
  • rounded down to the closest minute.
def pipe_exists(self, pipe: meerschaum.Pipe, debug: bool = False) -> bool:
522def pipe_exists(
523    self,
524    pipe: mrsm.Pipe,
525    debug: bool = False
526) -> bool:
527    """Check the API to see if a Pipe exists.
528
529    Parameters
530    ----------
531    pipe: 'meerschaum.Pipe'
532        The pipe which were are querying.
533        
534    Returns
535    -------
536    A bool indicating whether a pipe's underlying table exists.
537    """
538    from meerschaum.utils.debug import dprint
539    from meerschaum.utils.warnings import warn
540    r_url = pipe_r_url(pipe)
541    response = self.get(
542        r_url + '/exists',
543        params={
544            'instance': self.get_pipe_instance_keys(pipe),
545        },
546        debug=debug,
547    )
548    if not response:
549        warn(f"Failed to check if {pipe} exists:\n{response.text}")
550        return False
551    if debug:
552        dprint("Received response: " + str(response.text))
553    j = response.json()
554    if isinstance(j, dict) and 'detail' in j:
555        warn(j['detail'])
556    return j

Check the API to see if a Pipe exists.

Parameters
Returns
  • A bool indicating whether a pipe's underlying table exists.
def create_metadata(self, debug: bool = False) -> bool:
559def create_metadata(
560    self,
561    debug: bool = False
562) -> bool:
563    """Create metadata tables.
564
565    Returns
566    -------
567    A bool indicating success.
568    """
569    from meerschaum.utils.debug import dprint
570    from meerschaum._internal.static import STATIC_CONFIG
571    r_url = STATIC_CONFIG['api']['endpoints']['metadata']
572    response = self.post(r_url, debug=debug)
573    if debug:
574        dprint("Create metadata response: {response.text}")
575    try:
576        _ = json.loads(response.text)
577    except Exception as e:
578        warn(f"Failed to create metadata on {self}:\n{e}")
579    return False

Create metadata tables.

Returns
  • A bool indicating success.
def get_pipe_rowcount( self, pipe: meerschaum.Pipe, begin: Union[str, datetime.datetime, int, NoneType] = None, end: Union[str, datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) -> int:
582def get_pipe_rowcount(
583    self,
584    pipe: mrsm.Pipe,
585    begin: Union[str, datetime, int, None] = None,
586    end: Union[str, datetime, int, None] = None,
587    params: Optional[Dict[str, Any]] = None,
588    remote: bool = False,
589    debug: bool = False,
590) -> int:
591    """Get a pipe's row count from the API.
592
593    Parameters
594    ----------
595    pipe: 'meerschaum.Pipe':
596        The pipe whose row count we are counting.
597        
598    begin: Union[str, datetime, int, None], default None
599        If provided, bound the count by this datetime.
600
601    end: Union[str, datetime, int, None], default None
602        If provided, bound the count by this datetime.
603
604    params: Optional[Dict[str, Any]], default None
605        If provided, bound the count by these parameters.
606
607    remote: bool, default False
608        If `True`, return the rowcount for the fetch definition.
609
610    Returns
611    -------
612    The number of rows in the pipe's table, bound the given parameters.
613    If the table does not exist, return 0.
614    """
615    r_url = pipe_r_url(pipe)
616    response = self.get(
617        r_url + "/rowcount",
618        json = params,
619        params = {
620            'begin': begin,
621            'end': end,
622            'remote': remote,
623            'instance': self.get_pipe_instance_keys(pipe),
624        },
625        debug = debug
626    )
627    if not response:
628        warn(f"Failed to get the rowcount for {pipe}:\n{response.text}")
629        return 0
630    try:
631        return int(json.loads(response.text))
632    except Exception as e:
633        warn(f"Failed to get the rowcount for {pipe}:\n{e}")
634    return 0

Get a pipe's row count from the API.

Parameters
  • pipe ('meerschaum.Pipe':): The pipe whose row count we are counting.
  • begin (Union[str, datetime, int, None], default None): If provided, bound the count by this datetime.
  • end (Union[str, datetime, int, None], default None): If provided, bound the count by this datetime.
  • params (Optional[Dict[str, Any]], default None): If provided, bound the count by these parameters.
  • remote (bool, default False): If True, return the rowcount for the fetch definition.
Returns
  • The number of rows in the pipe's table, bound the given parameters.
  • If the table does not exist, return 0.
def drop_pipe( self, pipe: meerschaum.Pipe, debug: bool = False) -> Tuple[bool, str]:
637def drop_pipe(
638    self,
639    pipe: mrsm.Pipe,
640    debug: bool = False
641) -> SuccessTuple:
642    """
643    Drop a pipe's table but maintain its registration.
644
645    Parameters
646    ----------
647    pipe: meerschaum.Pipe:
648        The pipe to be dropped.
649        
650    Returns
651    -------
652    A success tuple (bool, str).
653    """
654    from meerschaum.utils.warnings import error
655    from meerschaum.utils.debug import dprint
656    if pipe is None:
657        error("Pipe cannot be None.")
658    r_url = pipe_r_url(pipe)
659    response = self.delete(
660        r_url + '/drop',
661        params={
662            'instance': self.get_pipe_instance_keys(pipe),
663        },
664        debug=debug,
665    )
666    if debug:
667        dprint(response.text)
668
669    try:
670        data = response.json()
671    except Exception as e:
672        return False, f"Failed to drop {pipe}."
673
674    if isinstance(data, list):
675        response_tuple = data[0], data[1]
676    elif 'detail' in response.json():
677        response_tuple = response.__bool__(), data['detail']
678    else:
679        response_tuple = response.__bool__(), response.text
680
681    return response_tuple

Drop a pipe's table but maintain its registration.

Parameters
Returns
  • A success tuple (bool, str).
def clear_pipe( self, pipe: meerschaum.Pipe, begin: Union[str, datetime.datetime, int, NoneType] = None, end: Union[str, datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw) -> Tuple[bool, str]:
684def clear_pipe(
685    self,
686    pipe: mrsm.Pipe,
687    begin: Union[str, datetime, int, None] = None,
688    end: Union[str, datetime, int, None] = None,
689    params: Optional[Dict[str, Any]] = None,
690    debug: bool = False,
691    **kw
692) -> SuccessTuple:
693    """
694    Delete rows in a pipe's table.
695
696    Parameters
697    ----------
698    pipe: meerschaum.Pipe
699        The pipe with rows to be deleted.
700        
701    Returns
702    -------
703    A success tuple.
704    """
705    r_url = pipe_r_url(pipe)
706    response = self.delete(
707        r_url + '/clear',
708        params={
709            'begin': begin,
710            'end': end,
711            'params': json.dumps(params),
712            'instance': self.get_pipe_instance_keys(pipe),
713        },
714        debug=debug,
715    )
716    if debug:
717        dprint(response.text)
718
719    try:
720        data = response.json()
721    except Exception as e:
722        return False, f"Failed to clear {pipe} with constraints {begin=}, {end=}, {params=}."
723
724    if isinstance(data, list):
725        response_tuple = data[0], data[1]
726    elif 'detail' in response.json():
727        response_tuple = response.__bool__(), data['detail']
728    else:
729        response_tuple = response.__bool__(), response.text
730
731    return response_tuple

Delete rows in a pipe's table.

Parameters
Returns
  • A success tuple.
def get_pipe_columns_types( self, pipe: meerschaum.Pipe, debug: bool = False) -> Optional[Dict[str, str]]:
734def get_pipe_columns_types(
735    self,
736    pipe: mrsm.Pipe,
737    debug: bool = False,
738) -> Union[Dict[str, str], None]:
739    """
740    Fetch the columns and types of the pipe's table.
741
742    Parameters
743    ----------
744    pipe: meerschaum.Pipe
745        The pipe whose columns to be queried.
746
747    Returns
748    -------
749    A dictionary mapping column names to their database types.
750
751    Examples
752    --------
753    >>> {
754    ...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
755    ...   'id': 'BIGINT',
756    ...   'val': 'DOUBLE PRECISION',
757    ... }
758    >>>
759    """
760    r_url = pipe_r_url(pipe) + '/columns/types'
761    response = self.get(
762        r_url,
763        params={
764            'instance': self.get_pipe_instance_keys(pipe),
765        },
766        debug=debug,
767    )
768    j = response.json()
769    if isinstance(j, dict) and 'detail' in j and len(j.keys()) == 1:
770        warn(j['detail'])
771        return None
772    if not isinstance(j, dict):
773        warn(response.text)
774        return None
775    return j

Fetch the columns and types of the pipe's table.

Parameters
Returns
  • A dictionary mapping column names to their database types.
Examples
>>> {
...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
...   'id': 'BIGINT',
...   'val': 'DOUBLE PRECISION',
... }
>>>
def get_pipe_columns_indices( self, pipe: meerschaum.Pipe, debug: bool = False) -> Optional[Dict[str, str]]:
778def get_pipe_columns_indices(
779    self,
780    pipe: mrsm.Pipe,
781    debug: bool = False,
782) -> Union[Dict[str, str], None]:
783    """
784    Fetch the index information for a pipe.
785
786    Parameters
787    ----------
788    pipe: mrsm.Pipe
789        The pipe whose columns to be queried.
790
791    Returns
792    -------
793    A dictionary mapping column names to a list of associated index information.
794    """
795    r_url = pipe_r_url(pipe) + '/columns/indices'
796    response = self.get(
797        r_url,
798        params={
799            'instance': self.get_pipe_instance_keys(pipe),
800        },
801        debug=debug,
802    )
803    j = response.json()
804    if isinstance(j, dict) and 'detail' in j and len(j.keys()) == 1:
805        warn(j['detail'])
806        return None
807    if not isinstance(j, dict):
808        warn(response.text)
809        return None
810    return j

Fetch the index information for a pipe.

Parameters
  • pipe (mrsm.Pipe): The pipe whose columns to be queried.
Returns
  • A dictionary mapping column names to a list of associated index information.
def fetch( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, str, int] = '', end: Union[datetime.datetime, int] = None, params: 'Optional[Dict, Any]' = None, debug: bool = False, **kw: Any) -> "Iterator['pd.DataFrame']":
16def fetch(
17        self,
18        pipe: mrsm.Pipe,
19        begin: Union[datetime, str, int] = '',
20        end: Union[datetime, int] = None,
21        params: Optional[Dict, Any] = None,
22        debug: bool = False,
23        **kw: Any
24    ) -> Iterator['pd.DataFrame']:
25    """Get the Pipe data from the remote Pipe."""
26    from meerschaum.utils.debug import dprint
27    from meerschaum.utils.warnings import warn, error
28    from meerschaum.config._patch import apply_patch_to_config
29
30    fetch_params = pipe.parameters.get('fetch', {})
31    if not fetch_params:
32        warn(f"Missing 'fetch' parameters for {pipe}.", stack=False)
33        return None
34
35    pipe_meta = fetch_params.get('pipe', {})
36    ### Legacy: check for `connector_keys`, etc. at the root.
37    if not pipe_meta:
38        ck, mk, lk = (
39            fetch_params.get('connector_keys', None),
40            fetch_params.get('metric_key', None),
41            fetch_params.get('location_key', None),
42        )
43        if not ck or not mk:
44            warn(f"Missing `fetch:pipe` keys for {pipe}.", stack=False)
45            return None
46
47        pipe_meta.update({
48            'connector': ck,
49            'metric': mk,
50            'location': lk,
51        })
52
53    pipe_meta['instance'] = self
54    source_pipe = mrsm.Pipe(**pipe_meta)
55
56    _params = copy.deepcopy(params) if params is not None else {}
57    _params = apply_patch_to_config(_params, fetch_params.get('params', {}))
58    select_columns = fetch_params.get('select_columns', [])
59    omit_columns = fetch_params.get('omit_columns', [])
60
61    return source_pipe.get_data(
62        select_columns = select_columns,
63        omit_columns = omit_columns,
64        begin = begin,
65        end = end,
66        params = _params,
67        debug = debug,
68        as_iterator = True,
69    )

Get the Pipe data from the remote Pipe.

def register_plugin( self, plugin: meerschaum.Plugin, make_archive: bool = True, debug: bool = False) -> Tuple[bool, str]:
24def register_plugin(
25    self,
26    plugin: mrsm.core.Plugin,
27    make_archive: bool = True,
28    debug: bool = False,
29) -> SuccessTuple:
30    """Register a plugin and upload its archive."""
31    import json
32    archive_path = plugin.make_tar(debug=debug) if make_archive else plugin.archive_path
33    file_pointer = open(archive_path, 'rb')
34    files = {'archive': file_pointer}
35    metadata = {
36        'version': plugin.version,
37        'attributes': json.dumps(plugin.attributes),
38    }
39    r_url = plugin_r_url(plugin)
40    try:
41        response = self.post(r_url, files=files, params=metadata, debug=debug)
42    except Exception:
43        return False, f"Failed to register plugin '{plugin}'."
44    finally:
45        file_pointer.close()
46
47    try:
48        success, msg = json.loads(response.text)
49    except Exception:
50        return False, response.text
51
52    return success, msg

Register a plugin and upload its archive.

def install_plugin( self, name: str, skip_deps: bool = False, force: bool = False, debug: bool = False) -> Tuple[bool, str]:
55def install_plugin(
56    self,
57    name: str,
58    skip_deps: bool = False,
59    force: bool = False,
60    debug: bool = False
61) -> SuccessTuple:
62    """Download and attempt to install a plugin from the API."""
63    import os
64    import pathlib
65    import json
66    from meerschaum.core import Plugin
67    import meerschaum.config.paths as paths
68    from meerschaum.utils.debug import dprint
69    from meerschaum.utils.packages import attempt_import
70    binaryornot_check = attempt_import('binaryornot.check', lazy=False)
71    r_url = plugin_r_url(name)
72    dest = pathlib.Path(os.path.join(paths.PLUGINS_TEMP_RESOURCES_PATH, name + '.tar.gz'))
73    if debug:
74        dprint(f"Fetching from '{self.url + r_url}' to '{dest}'...")
75    archive_path = self.wget(r_url, dest, debug=debug) 
76    is_binary = binaryornot_check.is_binary(str(archive_path))
77    if not is_binary:
78        fail_msg = f"Failed to download binary for plugin '{name}'."
79        try:
80            with open(archive_path, 'r') as f:
81                j = json.load(f)
82            if isinstance(j, list):
83                success, msg = tuple(j)
84            elif isinstance(j, dict) and 'detail' in j:
85                success, msg = False, fail_msg
86        except Exception:
87            success, msg = False, fail_msg
88        return success, msg
89    plugin = Plugin(name, archive_path=archive_path, repo_connector=self)
90    return plugin.install(skip_deps=skip_deps, force=force, debug=debug)

Download and attempt to install a plugin from the API.

def delete_plugin( self, plugin: meerschaum.Plugin, debug: bool = False) -> Tuple[bool, str]:
156def delete_plugin(
157    self,
158    plugin: mrsm.core.Plugin,
159    debug: bool = False
160) -> SuccessTuple:
161    """Delete a plugin from an API repository."""
162    import json
163    r_url = plugin_r_url(plugin)
164    try:
165        response = self.delete(r_url, debug=debug)
166    except Exception:
167        return False, f"Failed to delete plugin '{plugin}'."
168
169    try:
170        success, msg = json.loads(response.text)
171    except Exception:
172        return False, response.text
173
174    return success, msg

Delete a plugin from an API repository.

def get_plugins( self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False) -> List[str]:
 93def get_plugins(
 94    self,
 95    user_id: Optional[int] = None,
 96    search_term: Optional[str] = None,
 97    debug: bool = False
 98) -> List[str]:
 99    """Return a list of registered plugin names.
100
101    Parameters
102    ----------
103    user_id: Optional[int], default None
104        If specified, return all plugins from a certain user.
105
106    search_term: Optional[str], default None
107        If specified, return plugins beginning with this string.
108
109    Returns
110    -------
111    A list of plugin names.
112    """
113    import json
114    from meerschaum.utils.warnings import error
115    from meerschaum._internal.static import STATIC_CONFIG
116    response = self.get(
117        STATIC_CONFIG['api']['endpoints']['plugins'],
118        params = {'user_id': user_id, 'search_term': search_term},
119        use_token = True,
120        debug = debug
121    )
122    if not response:
123        return []
124    plugins = json.loads(response.text)
125    if not isinstance(plugins, list):
126        error(response.text)
127    return plugins

Return a list of registered plugin names.

Parameters
  • user_id (Optional[int], default None): If specified, return all plugins from a certain user.
  • search_term (Optional[str], default None): If specified, return plugins beginning with this string.
Returns
  • A list of plugin names.
def get_plugin_attributes( self, plugin: meerschaum.Plugin, debug: bool = False) -> Dict[str, Any]:
130def get_plugin_attributes(
131    self,
132    plugin: mrsm.core.Plugin,
133    debug: bool = False
134) -> Dict[str, Any]:
135    """
136    Return a plugin's attributes.
137    """
138    import json
139    from meerschaum.utils.warnings import warn, error
140    r_url = plugin_r_url(plugin) + '/attributes'
141    response = self.get(r_url, use_token=True, debug=debug)
142    attributes = response.json()
143    if isinstance(attributes, str) and attributes and attributes[0] == '{':
144        try:
145            attributes = json.loads(attributes)
146        except Exception:
147            pass
148    if not isinstance(attributes, dict):
149        error(response.text)
150    elif not response and 'detail' in attributes:
151        warn(attributes['detail'])
152        return {}
153    return attributes

Return a plugin's attributes.

def login( self, debug: bool = False, warn: bool = True, **kw: Any) -> Tuple[bool, str]:
19def login(
20    self,
21    debug: bool = False,
22    warn: bool = True,
23    **kw: Any
24) -> SuccessTuple:
25    """Log in and set the session token."""
26    if self.login_scheme == 'api_key':
27        validate_response = self.post(
28            STATIC_CONFIG['api']['endpoints']['tokens'] + '/validate',
29            headers={'Authorization': f'Bearer {self.api_key}'},
30            use_token=False,
31            debug=debug,
32        )
33        if not validate_response:
34            return False, "API key is not valid."
35        return True, "API key is valid."
36
37    try:
38        if self.login_scheme == 'password':
39            login_data = {
40                'username': self.username,
41                'password': self.password,
42            }
43        elif self.login_scheme == 'client_credentials':
44            login_data = {
45                'client_id': self.client_id,
46                'client_secret': self.client_secret,
47            }
48    except AttributeError:
49        login_data = {}
50
51    if not login_data:
52        return False, f"Please login with the command `login {self}`."
53
54    login_scheme_msg = (
55        f" as user '{login_data['username']}'"
56        if self.login_scheme == 'username'
57        else ''
58    )
59
60    response = self.post(
61        STATIC_CONFIG['api']['endpoints']['login'],
62        data=login_data,
63        use_token=False,
64        debug=debug,
65    )
66    if response:
67        msg = f"Successfully logged into '{self}'{login_scheme_msg}'."
68        self._token = json.loads(response.text)['access_token']
69        self._expires = datetime.datetime.strptime(
70            json.loads(response.text)['expires'], 
71            '%Y-%m-%dT%H:%M:%S.%f'
72        )
73    else:
74        msg = (
75            f"Failed to log into '{self}'{login_scheme_msg}.\n" +
76            f"    Please verify login details for connector '{self}'."
77        )
78        if warn and not self.__dict__.get('_emitted_warning', False):
79            _warn(msg, stack=False)
80            self._emitted_warning = True
81
82    return response.__bool__(), msg

Log in and set the session token.

def test_connection(self, **kw: Any) -> Optional[bool]:
 85def test_connection(
 86    self,
 87    **kw: Any
 88) -> Union[bool, None]:
 89    """Test if a successful connection to the API may be made."""
 90    from meerschaum.connectors.poll import retry_connect
 91    _default_kw = {
 92        'max_retries': 1, 'retry_wait': 0, 'warn': False,
 93        'connector': self, 'enforce_chaining': False,
 94        'enforce_login': False,
 95    }
 96    _default_kw.update(kw)
 97    try:
 98        return retry_connect(**_default_kw)
 99    except Exception:
100        return False

Test if a successful connection to the API may be made.

def register_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
70def register_user(
71    self,
72    user: mrsm.core.User,
73    debug: bool = False,
74    **kw: Any
75) -> SuccessTuple:
76    """Register a new user."""
77    from meerschaum._internal.static import STATIC_CONFIG
78    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/register"
79    data = {
80        'username': user.username,
81        'password': user.password,
82        'attributes': json.dumps(user.attributes),
83    }
84    if user.type:
85        data['type'] = user.type
86    if user.email:
87        data['email'] = user.email
88    response = self.post(r_url, data=data, debug=debug)
89    try:
90        _json = json.loads(response.text)
91        if isinstance(_json, dict) and 'detail' in _json:
92            return False, _json['detail']
93        success_tuple = tuple(_json)
94    except Exception:
95        msg = response.text if response else f"Failed to register user '{user}'."
96        return False, msg
97
98    return tuple(success_tuple)

Register a new user.

def get_user_id( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Union[int, str, uuid.UUID, NoneType]:
101def get_user_id(
102    self,
103    user: mrsm.core.User,
104    debug: bool = False,
105    **kw: Any
106) -> Union[int, str, UUID, None]:
107    """Get a user's ID."""
108    from meerschaum._internal.static import STATIC_CONFIG
109    from meerschaum.utils.misc import is_int, is_uuid
110    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/id"
111    response = self.get(r_url, debug=debug, **kw)
112    try:
113        id_text = str(json.loads(response.text))
114        if is_int(id_text):
115            user_id = int(id_text)
116        elif is_uuid(id_text):
117            user_id = UUID(id_text)
118        else:
119            user_id = id_text
120    except Exception as e:
121        user_id = None
122    return user_id

Get a user's ID.

def get_users(self, debug: bool = False, **kw: Any) -> List[str]:
19def get_users(
20    self,
21    debug: bool = False,
22    **kw: Any
23) -> List[str]:
24    """
25    Return a list of registered usernames.
26    """
27    from meerschaum._internal.static import STATIC_CONFIG
28    response = self.get(
29        f"{STATIC_CONFIG['api']['endpoints']['users']}",
30        debug = debug,
31        use_token = True,
32    )
33    if not response:
34        return []
35    try:
36        return response.json()
37    except Exception as e:
38        return []

Return a list of registered usernames.

def edit_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
41def edit_user(
42    self,
43    user: mrsm.core.User,
44    debug: bool = False,
45    **kw: Any
46) -> SuccessTuple:
47    """Edit an existing user."""
48    from meerschaum._internal.static import STATIC_CONFIG
49    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/edit"
50    data = {
51        'username': user.username,
52        'password': user.password,
53        'type': user.type,
54        'email': user.email,
55        'attributes': json.dumps(user.attributes),
56    }
57    response = self.post(r_url, data=data, debug=debug)
58    try:
59        _json = json.loads(response.text)
60        if isinstance(_json, dict) and 'detail' in _json:
61            return False, _json['detail']
62        success_tuple = tuple(_json)
63    except Exception:
64        msg = response.text if response else f"Failed to edit user '{user}'."
65        return False, msg
66
67    return tuple(success_tuple)

Edit an existing user.

def delete_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
125def delete_user(
126    self,
127    user: mrsm.core.User,
128    debug: bool = False,
129    **kw: Any
130) -> SuccessTuple:
131    """Delete a user."""
132    from meerschaum._internal.static import STATIC_CONFIG
133    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}"
134    response = self.delete(r_url, debug=debug)
135    try:
136        _json = json.loads(response.text)
137        if isinstance(_json, dict) and 'detail' in _json:
138            return False, _json['detail']
139        success_tuple = tuple(_json)
140    except Exception:
141        success_tuple = False, f"Failed to delete user '{user.username}'."
142    return success_tuple

Delete a user.

def get_user_password_hash( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
166def get_user_password_hash(
167    self,
168    user: mrsm.core.User,
169    debug: bool = False,
170    **kw: Any
171) -> Optional[str]:
172    """If configured, get a user's password hash."""
173    from meerschaum._internal.static import STATIC_CONFIG
174    r_url = STATIC_CONFIG['api']['endpoints']['users'] + '/' + user.username + '/password_hash'
175    response = self.get(r_url, debug=debug, **kw)
176    if not response:
177        return None
178    return response.json()

If configured, get a user's password hash.

def get_user_type( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
181def get_user_type(
182    self,
183    user: mrsm.core.User,
184    debug: bool = False,
185    **kw: Any
186) -> Optional[str]:
187    """If configured, get a user's type."""
188    from meerschaum._internal.static import STATIC_CONFIG
189    r_url = STATIC_CONFIG['api']['endpoints']['users'] + '/' + user.username + '/type'
190    response = self.get(r_url, debug=debug, **kw)
191    if not response:
192        return None
193    return response.json()

If configured, get a user's type.

def get_user_attributes( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw) -> int:
145def get_user_attributes(
146    self,
147    user: mrsm.core.User,
148    debug: bool = False,
149    **kw
150) -> int:
151    """Get a user's attributes."""
152    from meerschaum._internal.static import STATIC_CONFIG
153    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/attributes"
154    response = self.get(r_url, debug=debug, **kw)
155    try:
156        attributes = json.loads(response.text)
157    except Exception:
158        attributes = None
159    return attributes

Get a user's attributes.

def register_token( self, token: meerschaum.core.Token._Token.Token, debug: bool = False) -> Tuple[bool, str]:
20def register_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
21    """
22    Register the provided token to the API.
23    """
24    from meerschaum.utils.dtypes import json_serialize_value
25    r_url = tokens_endpoint + '/register'
26    response = self.post(
27        r_url,
28        data=json.dumps({
29            'label': token.label,
30            'scopes': token.scopes,
31            'expiration': token.expiration,
32        }, default=json_serialize_value),
33        debug=debug,
34    )
35    if not response:
36        return False, f"Failed to register token:\n{response.text}"
37
38    data = response.json()
39    token.label = data['label']
40    token.secret = data['secret']
41    token.id = uuid.UUID(data['id'])
42    if data.get('expiration', None):
43        token.expiration = datetime.fromisoformat(data['expiration'])
44
45    return True, f"Registered token '{token.label}'."

Register the provided token to the API.

def get_token_model( self, token_id: uuid.UUID, debug: bool = False) -> 'Union[TokenModel, None]':
48def get_token_model(self, token_id: uuid.UUID, debug: bool = False) -> 'Union[TokenModel, None]':
49    """
50    Return a token's model from the API instance.
51    """
52    from meerschaum.models import TokenModel
53    r_url = tokens_endpoint + f'/{token_id}'
54    response = self.get(r_url, debug=debug)
55    if not response:
56        return None
57    data = response.json()
58    return TokenModel(**data)

Return a token's model from the API instance.

def get_tokens( self, labels: Optional[List[str]] = None, debug: bool = False) -> List[meerschaum.core.Token._Token.Token]:
61def get_tokens(self, labels: Optional[List[str]] = None, debug: bool = False) -> List[Token]:
62    """
63    Return the tokens registered to the current user.
64    """
65    from meerschaum.utils.warnings import warn
66    r_url = tokens_endpoint
67    params = {}
68    if labels:
69        params['labels'] = ','.join(labels)
70    response = self.get(r_url, params={'labels': labels}, debug=debug)
71    if not response:
72        warn(f"Could not get tokens from '{self}':\n{response.text}")
73        return []
74
75    tokens = [
76        Token(instance=self, **payload)
77        for payload in response.json()
78    ]
79    return tokens

Return the tokens registered to the current user.

def edit_token( self, token: meerschaum.core.Token._Token.Token, debug: bool = False) -> Tuple[bool, str]:
 82def edit_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
 83    """
 84    Persist the token's in-memory state to the API.
 85    """
 86    r_url = tokens_endpoint + f"/{token.id}/edit"
 87    response = self.post(
 88        r_url,
 89        json={
 90            'creation': token.creation.isoformat() if token.creation else None,
 91            'expiration': token.expiration.isoformat() if token.expiration else None,
 92            'label': token.label,
 93            'is_valid': token.is_valid,
 94            'scopes': token.scopes,
 95        },
 96    )
 97    if not response:
 98        return False, f"Failed to edit token:\n{response.text}"
 99
100    success, msg = response.json()
101    return success, msg

Persist the token's in-memory state to the API.

def invalidate_token( self, token: meerschaum.core.Token._Token.Token, debug: bool = False) -> Tuple[bool, str]:
104def invalidate_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
105    """
106    Invalidate the token, disabling it for future requests.
107    """
108    r_url = tokens_endpoint + f"/{token.id}/invalidate"
109    response = self.post(r_url)
110    if not response:
111        return False, f"Failed to invalidate token:\n{response.text}"
112
113    success, msg = response.json()
114    return success, msg

Invalidate the token, disabling it for future requests.

def get_token_scopes( self, token_id: Union[uuid.UUID, meerschaum.core.Token._Token.Token], debug: bool = False) -> List[str]:
117def get_token_scopes(self, token_id: Union[uuid.UUID, Token], debug: bool = False) -> List[str]:
118    """
119    Return the scopes for a token.
120    """
121    _token_id = (token_id.id if isinstance(token_id, Token) else token_id)
122    model = self.get_token_model(_token_id, debug=debug).scopes
123    return getattr(model, 'scopes', [])

Return the scopes for a token.

def token_exists( self, token_id: Union[uuid.UUID, meerschaum.core.Token._Token.Token], debug: bool = False) -> bool:
126def token_exists(self, token_id: Union[uuid.UUID, Token], debug: bool = False) -> bool:
127    """
128    Return `True` if a token exists.
129    """
130    _token_id = (token_id.id if isinstance(token_id, Token) else token_id)
131    model = self.get_token_model(_token_id, debug=debug)
132    if model is None:
133        return False
134    return model.creation is not None

Return True if a token exists.

def delete_token( self, token: meerschaum.core.Token._Token.Token, debug: bool = False) -> Tuple[bool, str]:
137def delete_token(self, token: Token, debug: bool = False) -> mrsm.SuccessTuple:
138    """
139    Delete the token from the API.
140    """
141    r_url = tokens_endpoint + f"/{token.id}"
142    response = self.delete(r_url, debug=debug)
143    if not response:
144        return False, f"Failed to delete token:\n{response.text}"
145    
146    success, msg = response.json()
147    return success, msg

Delete the token from the API.

@classmethod
def from_uri( cls, uri: str, label: Optional[str] = None, as_dict: bool = False) -> Union[APIConnector, Dict[str, Union[str, int]]]:
13@classmethod
14def from_uri(
15    cls,
16    uri: str,
17    label: Optional[str] = None,
18    as_dict: bool = False,
19) -> Union[
20        'meerschaum.connectors.APIConnector',
21        Dict[str, Union[str, int]],
22    ]:
23    """
24    Create a new APIConnector from a URI string.
25
26    Parameters
27    ----------
28    uri: str
29        The URI connection string.
30
31    label: Optional[str], default None
32        If provided, use this as the connector label.
33        Otherwise use the determined database name.
34
35    as_dict: bool, default False
36        If `True`, return a dictionary of the keyword arguments
37        necessary to create a new `APIConnector`, otherwise create a new object.
38
39    Returns
40    -------
41    A new APIConnector object or a dictionary of attributes (if `as_dict` is `True`).
42    """
43    from meerschaum.connectors.sql import SQLConnector
44    params = SQLConnector.parse_uri(uri)
45    if 'host' not in params:
46        error("No host was found in the provided URI.")
47    params['protocol'] = params.pop('flavor')
48    params['label'] = label or (
49        (
50            (params['username'] + '@' if 'username' in params else '')
51            + params['host']
52        ).lower()
53    )
54
55    return cls(**params) if not as_dict else params

Create a new APIConnector from a URI string.

Parameters
  • uri (str): The URI connection string.
  • label (Optional[str], default None): If provided, use this as the connector label. Otherwise use the determined database name.
  • as_dict (bool, default False): If True, return a dictionary of the keyword arguments necessary to create a new APIConnector, otherwise create a new object.
Returns
  • A new APIConnector object or a dictionary of attributes (if as_dict is True).
def get_jobs(self, debug: bool = False) -> Dict[str, meerschaum.Job]:
28def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
29    """
30    Return a dictionary of remote jobs.
31    """
32    response = self.get(JOBS_ENDPOINT, debug=debug)
33    if not response:
34        warn(f"Failed to get remote jobs from {self}.")
35        return {}
36    return {
37        name: Job(
38            name,
39            job_meta['sysargs'],
40            executor_keys=str(self),
41            _properties=job_meta['daemon']['properties']
42        )
43        for name, job_meta in response.json().items()
44    }

Return a dictionary of remote jobs.

def get_job(self, name: str, debug: bool = False) -> meerschaum.Job:
47def get_job(self, name: str, debug: bool = False) -> Job:
48    """
49    Return a single Job object.
50    """
51    metadata = self.get_job_metadata(name, debug=debug)
52    if not metadata:
53        raise ValueError(f"Job '{name}' does not exist.")
54
55    return Job(
56        name,
57        metadata['sysargs'],
58        executor_keys=str(self),
59        _properties=metadata['daemon']['properties'],
60    )

Return a single Job object.

def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
 63def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
 64    """
 65    Return the metadata for a single job.
 66    """
 67    now = time.perf_counter()
 68    _job_metadata_cache = self.__dict__.get('_job_metadata_cache', None)
 69    _job_metadata_timestamp = (
 70        _job_metadata_cache.get(name, {}).get('timestamp', None)
 71    ) if _job_metadata_cache is not None else None
 72
 73    if (
 74        _job_metadata_timestamp is not None
 75        and (now - _job_metadata_timestamp) < JOB_METADATA_CACHE_SECONDS
 76    ):
 77        if debug:
 78            dprint(f"Returning cached metadata for job '{name}'.")
 79        return _job_metadata_cache[name]['metadata']
 80
 81    response = self.get(JOBS_ENDPOINT + f"/{name}", debug=debug)
 82    if not response:
 83        if debug:
 84            msg = (
 85                response.json()['detail']
 86                if 'detail' in response.text
 87                else response.text
 88            )
 89            warn(f"Failed to get metadata for job '{name}':\n{msg}")
 90        return {}
 91
 92    metadata = response.json()
 93    if _job_metadata_cache is None:
 94        self._job_metadata_cache = {}
 95
 96    self._job_metadata_cache[name] = {
 97        'timestamp': now,
 98        'metadata': metadata,
 99    }
100    return metadata

Return the metadata for a single job.

def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
102def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
103    """
104    Return the daemon properties for a single job.
105    """
106    metadata = self.get_job_metadata(name, debug=debug)
107    return metadata.get('daemon', {}).get('properties', {})

Return the daemon properties for a single job.

def get_job_exists(self, name: str, debug: bool = False) -> bool:
149def get_job_exists(self, name: str, debug: bool = False) -> bool:
150    """
151    Return whether a job exists.
152    """
153    response = self.get(JOBS_ENDPOINT + f'/{name}/exists', debug=debug)
154    if not response:
155        warn(f"Failed to determine whether job '{name}' exists.")
156        return False
157
158    return response.json()

Return whether a job exists.

def delete_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
161def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
162    """
163    Delete a job.
164    """
165    response = self.delete(JOBS_ENDPOINT + f"/{name}", debug=debug)
166    if not response:
167        if 'detail' in response.text:
168            return False, response.json()['detail']
169
170        return False, response.text
171
172    return tuple(response.json())

Delete a job.

def start_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
175def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
176    """
177    Start a job.
178    """
179    response = self.post(JOBS_ENDPOINT + f"/{name}/start", debug=debug)
180    if not response:
181        if 'detail' in response.text:
182            return False, response.json()['detail']
183        return False, response.text
184
185    return tuple(response.json())

Start a job.

def create_job( self, name: str, sysargs: List[str], properties: Optional[Dict[str, str]] = None, debug: bool = False) -> Tuple[bool, str]:
188def create_job(
189    self,
190    name: str,
191    sysargs: List[str],
192    properties: Optional[Dict[str, str]] = None,
193    debug: bool = False,
194) -> SuccessTuple:
195    """
196    Create a job.
197    """
198    response = self.post(
199        JOBS_ENDPOINT + f"/{name}",
200        json={
201            'sysargs': sysargs,
202            'properties': properties,
203        },
204        debug=debug,
205    )
206    if not response:
207        if 'detail' in response.text:
208            return False, response.json()['detail']
209        return False, response.text
210
211    return tuple(response.json())

Create a job.

def stop_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
214def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
215    """
216    Stop a job.
217    """
218    response = self.post(JOBS_ENDPOINT + f"/{name}/stop", debug=debug)
219    if not response:
220        if 'detail' in response.text:
221            return False, response.json()['detail']
222        return False, response.text
223
224    return tuple(response.json())

Stop a job.

def pause_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
227def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
228    """
229    Pause a job.
230    """
231    response = self.post(JOBS_ENDPOINT + f"/{name}/pause", debug=debug)
232    if not response:
233        if 'detail' in response.text:
234            return False, response.json()['detail']
235        return False, response.text
236
237    return tuple(response.json())

Pause a job.

def get_logs(self, name: str, debug: bool = False) -> str:
240def get_logs(self, name: str, debug: bool = False) -> str:
241    """
242    Return the logs for a job.
243    """
244    response = self.get(LOGS_ENDPOINT + f"/{name}")
245    if not response:
246        raise ValueError(f"Cannot fetch logs for job '{name}':\n{response.text}")
247
248    return response.json()

Return the logs for a job.

def get_job_stop_time(self, name: str, debug: bool = False) -> Optional[datetime.datetime]:
251def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
252    """
253    Return the job's manual stop time.
254    """
255    response = self.get(JOBS_ENDPOINT + f"/{name}/stop_time")
256    if not response:
257        warn(f"Failed to get stop time for job '{name}':\n{response.text}")
258        return None
259
260    data = response.json()
261    if data is None:
262        return None
263
264    return datetime.fromisoformat(data)

Return the job's manual stop time.

def monitor_logs( self, name: str, callback_function: Callable[[Any], Any], input_callback_function: Callable[[NoneType], str], stop_callback_function: Callable[[NoneType], str], stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False):
348def monitor_logs(
349    self,
350    name: str,
351    callback_function: Callable[[Any], Any],
352    input_callback_function: Callable[[None], str],
353    stop_callback_function: Callable[[None], str],
354    stop_on_exit: bool = False,
355    strip_timestamps: bool = False,
356    accept_input: bool = True,
357    debug: bool = False,
358):
359    """
360    Monitor a job's log files and execute a callback with the changes.
361    """
362    return asyncio.run(
363        self.monitor_logs_async(
364            name,
365            callback_function,
366            input_callback_function=input_callback_function,
367            stop_callback_function=stop_callback_function,
368            stop_on_exit=stop_on_exit,
369            strip_timestamps=strip_timestamps,
370            accept_input=accept_input,
371            debug=debug
372        )
373    )

Monitor a job's log files and execute a callback with the changes.

async def monitor_logs_async( self, name: str, callback_function: Callable[[Any], Any], input_callback_function: Callable[[], str], stop_callback_function: Callable[[Tuple[bool, str]], str], stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False):
267async def monitor_logs_async(
268    self,
269    name: str,
270    callback_function: Callable[[Any], Any],
271    input_callback_function: Callable[[], str],
272    stop_callback_function: Callable[[SuccessTuple], str],
273    stop_on_exit: bool = False,
274    strip_timestamps: bool = False,
275    accept_input: bool = True,
276    debug: bool = False,
277):
278    """
279    Monitor a job's log files and await a callback with the changes.
280    """
281    import traceback
282    from meerschaum.jobs import StopMonitoringLogs
283    from meerschaum.utils.formatting._jobs import strip_timestamp_from_line
284
285    websockets, websockets_exceptions = mrsm.attempt_import('websockets', 'websockets.exceptions')
286    protocol = 'ws' if self.URI.startswith('http://') else 'wss'
287    port = self.port if 'port' in self.__dict__ else ''
288    uri = f"{protocol}://{self.host}:{port}{LOGS_ENDPOINT}/{name}/ws"
289
290    async def _stdin_callback(client):
291        if input_callback_function is None:
292            return
293
294        if asyncio.iscoroutinefunction(input_callback_function):
295            data = await input_callback_function()
296        else:
297            data = input_callback_function()
298
299        await client.send(data)
300
301    async def _stop_callback(client):
302        try:
303            result = tuple(json.loads(await client.recv()))
304        except Exception as e:
305            warn(traceback.format_exc())
306            result = False, str(e)
307
308        if stop_callback_function is not None:
309            if asyncio.iscoroutinefunction(stop_callback_function):
310                await stop_callback_function(result)
311            else:
312                stop_callback_function(result)
313
314        if stop_on_exit:
315            raise StopMonitoringLogs
316
317    message_callbacks = {
318        JOBS_STDIN_MESSAGE: _stdin_callback,
319        JOBS_STOP_MESSAGE: _stop_callback,
320    }
321
322    async with websockets.connect(uri) as websocket:
323        try:
324            await websocket.send(self.token or 'no-login')
325        except websockets_exceptions.ConnectionClosedOK:
326            pass
327
328        while True:
329            try:
330                response = await websocket.recv()
331                callback = message_callbacks.get(response, None)
332                if callback is not None:
333                    await callback(websocket)
334                    continue
335
336                if strip_timestamps:
337                    response = strip_timestamp_from_line(response)
338
339                if asyncio.iscoroutinefunction(callback_function):
340                    await callback_function(response)
341                else:
342                    callback_function(response)
343            except (KeyboardInterrupt, StopMonitoringLogs):
344                await websocket.close()
345                break

Monitor a job's log files and await a callback with the changes.

def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
375def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
376    """
377    Return whether a remote job is blocking on stdin.
378    """
379    response = self.get(JOBS_ENDPOINT + f'/{name}/is_blocking_on_stdin', debug=debug)
380    if not response:
381        return False
382
383    return response.json()

Return whether a remote job is blocking on stdin.

def get_job_began(self, name: str, debug: bool = False) -> Optional[str]:
116def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
117    """
118    Return a job's `began` timestamp, if it exists.
119    """
120    properties = self.get_job_properties(name, debug=debug)
121    began_str = properties.get('daemon', {}).get('began', None)
122    if began_str is None:
123        return None
124
125    return began_str

Return a job's began timestamp, if it exists.

def get_job_ended(self, name: str, debug: bool = False) -> Optional[str]:
127def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
128    """
129    Return a job's `ended` timestamp, if it exists.
130    """
131    properties = self.get_job_properties(name, debug=debug)
132    ended_str = properties.get('daemon', {}).get('ended', None)
133    if ended_str is None:
134        return None
135
136    return ended_str

Return a job's ended timestamp, if it exists.

def get_job_paused(self, name: str, debug: bool = False) -> Optional[str]:
138def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
139    """
140    Return a job's `paused` timestamp, if it exists.
141    """
142    properties = self.get_job_properties(name, debug=debug)
143    paused_str = properties.get('daemon', {}).get('paused', None)
144    if paused_str is None:
145        return None
146
147    return paused_str

Return a job's paused timestamp, if it exists.

def get_job_status(self, name: str, debug: bool = False) -> str:
109def get_job_status(self, name: str, debug: bool = False) -> str:
110    """
111    Return the job's status.
112    """
113    metadata = self.get_job_metadata(name, debug=debug)
114    return metadata.get('status', 'stopped')

Return the job's status.