meerschaum.connectors.api

Interact with the Meerschaum API (send commands, pull data, etc.)

 1#! /usr/bin/env python
 2# -*- coding: utf-8 -*-
 3# vim:fenc=utf-8
 4
 5"""
 6Interact with the Meerschaum API (send commands, pull data, etc.)
 7"""
 8
 9from meerschaum.connectors.api._APIConnector import APIConnector
10
11__all__ = ('APIConnector',)
class APIConnector(meerschaum.connectors._Connector.Connector):
 20class APIConnector(Connector):
 21    """
 22    Connect to a Meerschaum API instance.
 23    """
 24
 25    IS_INSTANCE: bool = True
 26    IS_THREAD_SAFE: bool = False
 27
 28    OPTIONAL_ATTRIBUTES: List[str] = ['port']
 29
 30    from ._request import (
 31        make_request,
 32        get,
 33        post,
 34        put,
 35        patch,
 36        delete,
 37        wget,
 38    )
 39    from ._actions import (
 40        get_actions,
 41        do_action,
 42        do_action_async,
 43        do_action_legacy,
 44    )
 45    from ._misc import get_mrsm_version, get_chaining_status
 46    from ._pipes import (
 47        register_pipe,
 48        fetch_pipes_keys,
 49        edit_pipe,
 50        sync_pipe,
 51        delete_pipe,
 52        get_pipe_data,
 53        get_pipe_id,
 54        get_pipe_attributes,
 55        get_sync_time,
 56        pipe_exists,
 57        create_metadata,
 58        get_pipe_rowcount,
 59        drop_pipe,
 60        clear_pipe,
 61        get_pipe_columns_types,
 62        get_pipe_columns_indices,
 63    )
 64    from ._fetch import fetch
 65    from ._plugins import (
 66        register_plugin,
 67        install_plugin,
 68        delete_plugin,
 69        get_plugins,
 70        get_plugin_attributes,
 71    )
 72    from ._login import login, test_connection
 73    from ._users import (
 74        register_user,
 75        get_user_id,
 76        get_users,
 77        edit_user,
 78        delete_user,
 79        get_user_password_hash,
 80        get_user_type,
 81        get_user_attributes,
 82    )
 83    from ._uri import from_uri
 84    from ._jobs import (
 85        get_jobs,
 86        get_job,
 87        get_job_metadata,
 88        get_job_properties,
 89        get_job_exists,
 90        delete_job,
 91        start_job,
 92        create_job,
 93        stop_job,
 94        pause_job,
 95        get_logs,
 96        get_job_stop_time,
 97        monitor_logs,
 98        monitor_logs_async,
 99        get_job_is_blocking_on_stdin,
100        get_job_began,
101        get_job_ended,
102        get_job_paused,
103        get_job_status,
104    )
105
106    def __init__(
107        self,
108        label: Optional[str] = None,
109        wait: bool = False,
110        debug: bool = False,
111        **kw
112    ):
113        if 'uri' in kw:
114            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
115            label = label or from_uri_params.get('label', None)
116            _ = from_uri_params.pop('label', None)
117            kw.update(from_uri_params)
118
119        super().__init__('api', label=label, **kw)
120        if 'protocol' not in self.__dict__:
121            self.protocol = (
122                'https' if self.__dict__.get('uri', '').startswith('https')
123                else 'http'
124            )
125
126        if 'uri' not in self.__dict__:
127            self.verify_attributes(required_attributes)
128        else:
129            from meerschaum.connectors.sql import SQLConnector
130            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
131            if 'host' not in conn_attrs:
132                raise Exception(f"Invalid URI for '{self}'.")
133            self.__dict__.update(conn_attrs)
134
135        self.url = (
136            self.protocol + '://' +
137            self.host
138            + (
139                (':' + str(self.port))
140                if self.__dict__.get('port', None)
141                else ''
142            )
143        )
144        self._token = None
145        self._expires = None
146        self._session = None
147
148
149    @property
150    def URI(self) -> str:
151        """
152        Return the fully qualified URI.
153        """
154        username = self.__dict__.get('username', None)
155        password = self.__dict__.get('password', None)
156        creds = (username + ':' + password + '@') if username and password else ''
157        return (
158            self.protocol
159            + '://'
160            + creds
161            + self.host
162            + (
163                (':' + str(self.port))
164                if self.__dict__.get('port', None)
165                else ''
166            )
167        )
168
169
170    @property
171    def session(self):
172        if self._session is None:
173            _ = attempt_import('certifi', lazy=False)
174            requests = attempt_import('requests', lazy=False)
175            if requests:
176                self._session = requests.Session()
177            if self._session is None:
178                error("Failed to import requests. Is requests installed?")
179        return self._session
180
181    @property
182    def token(self):
183        expired = (
184            True if self._expires is None else (
185                (
186                    self._expires
187                    <
188                    datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
189                )
190            )
191        )
192
193        if self._token is None or expired:
194            success, msg = self.login()
195            if not success and not self.__dict__.get('_emitted_warning'):
196                warn(msg, stack=False)
197                self._emitted_warning = True
198        return self._token

Connect to a Meerschaum API instance.

APIConnector( label: Optional[str] = None, wait: bool = False, debug: bool = False, **kw)
106    def __init__(
107        self,
108        label: Optional[str] = None,
109        wait: bool = False,
110        debug: bool = False,
111        **kw
112    ):
113        if 'uri' in kw:
114            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
115            label = label or from_uri_params.get('label', None)
116            _ = from_uri_params.pop('label', None)
117            kw.update(from_uri_params)
118
119        super().__init__('api', label=label, **kw)
120        if 'protocol' not in self.__dict__:
121            self.protocol = (
122                'https' if self.__dict__.get('uri', '').startswith('https')
123                else 'http'
124            )
125
126        if 'uri' not in self.__dict__:
127            self.verify_attributes(required_attributes)
128        else:
129            from meerschaum.connectors.sql import SQLConnector
130            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
131            if 'host' not in conn_attrs:
132                raise Exception(f"Invalid URI for '{self}'.")
133            self.__dict__.update(conn_attrs)
134
135        self.url = (
136            self.protocol + '://' +
137            self.host
138            + (
139                (':' + str(self.port))
140                if self.__dict__.get('port', None)
141                else ''
142            )
143        )
144        self._token = None
145        self._expires = None
146        self._session = None

Set the given keyword arguments as attributes.

Parameters
  • type (str): The type of the connector (e.g. sql, api, plugin).
  • label (str): The label for the connector.
Examples

Run mrsm edit config and to edit connectors in the YAML file:

meerschaum:
    connections:
        {type}:
            {label}:
                ### attributes go here
IS_INSTANCE: bool = True
IS_THREAD_SAFE: bool = False
OPTIONAL_ATTRIBUTES: List[str] = ['port']
url
URI: str
149    @property
150    def URI(self) -> str:
151        """
152        Return the fully qualified URI.
153        """
154        username = self.__dict__.get('username', None)
155        password = self.__dict__.get('password', None)
156        creds = (username + ':' + password + '@') if username and password else ''
157        return (
158            self.protocol
159            + '://'
160            + creds
161            + self.host
162            + (
163                (':' + str(self.port))
164                if self.__dict__.get('port', None)
165                else ''
166            )
167        )

Return the fully qualified URI.

session
170    @property
171    def session(self):
172        if self._session is None:
173            _ = attempt_import('certifi', lazy=False)
174            requests = attempt_import('requests', lazy=False)
175            if requests:
176                self._session = requests.Session()
177            if self._session is None:
178                error("Failed to import requests. Is requests installed?")
179        return self._session
token
181    @property
182    def token(self):
183        expired = (
184            True if self._expires is None else (
185                (
186                    self._expires
187                    <
188                    datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
189                )
190            )
191        )
192
193        if self._token is None or expired:
194            success, msg = self.login()
195            if not success and not self.__dict__.get('_emitted_warning'):
196                warn(msg, stack=False)
197                self._emitted_warning = True
198        return self._token
def make_request( self, method: str, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kwargs: Any) -> requests.models.Response:
 28def make_request(
 29    self,
 30    method: str,
 31    r_url: str,
 32    headers: Optional[Dict[str, Any]] = None,
 33    use_token: bool = True,
 34    debug: bool = False,
 35    **kwargs: Any
 36) -> 'requests.Response':
 37    """
 38    Make a request to this APIConnector's endpoint using the in-memory session.
 39
 40    Parameters
 41    ----------
 42    method: str
 43        The kind of request to make.
 44        Accepted values:
 45        - `'GET'`
 46        - `'OPTIONS'`
 47        - `'HEAD'`
 48        - `'POST'`
 49        - `'PUT'`
 50        - `'PATCH'`
 51        - `'DELETE'`
 52
 53    r_url: str
 54        The relative URL for the endpoint (e.g. `'/pipes'`).
 55
 56    headers: Optional[Dict[str, Any]], default None
 57        The headers to use for the request.
 58        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
 59
 60    use_token: bool, default True
 61        If `True`, add the authorization token to the headers.
 62
 63    debug: bool, default False
 64        Verbosity toggle.
 65
 66    kwargs: Any
 67        All other keyword arguments are passed to `requests.request`.
 68
 69    Returns
 70    -------
 71    A `requests.Reponse` object.
 72    """
 73    if method.upper() not in METHODS:
 74        raise ValueError(f"Method '{method}' is not supported.")
 75
 76    verify = self.__dict__.get('verify', None)
 77    if 'verify' not in kwargs and isinstance(verify, bool):
 78        kwargs['verify'] = verify
 79
 80    headers = (
 81        copy.deepcopy(headers)
 82        if isinstance(headers, dict)
 83        else {}
 84    )
 85
 86    if use_token:
 87        headers.update({'Authorization': f'Bearer {self.token}'})
 88
 89    if 'timeout' not in kwargs:
 90        kwargs['timeout'] = STATIC_CONFIG['api']['default_timeout']
 91
 92    request_url = urllib.parse.urljoin(self.url, r_url)
 93    if debug:
 94        dprint(f"[{self}] Sending a '{method.upper()}' request to {request_url}")
 95
 96    return self.session.request(
 97        method.upper(),
 98        request_url,
 99        headers = headers,
100        **kwargs
101    )

Make a request to this APIConnector's endpoint using the in-memory session.

Parameters
  • method (str): The kind of request to make. Accepted values:
    • 'GET'
    • 'OPTIONS'
    • 'HEAD'
    • 'POST'
    • 'PUT'
    • 'PATCH'
    • 'DELETE'
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def get(self, r_url: str, **kwargs: Any) -> requests.models.Response:
104def get(self, r_url: str, **kwargs: Any) -> 'requests.Response':
105    """
106    Wrapper for `requests.get`.
107
108    Parameters
109    ----------
110    r_url: str
111        The relative URL for the endpoint (e.g. `'/pipes'`).
112
113    headers: Optional[Dict[str, Any]], default None
114        The headers to use for the request.
115        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
116
117    use_token: bool, default True
118        If `True`, add the authorization token to the headers.
119
120    debug: bool, default False
121        Verbosity toggle.
122
123    kwargs: Any
124        All other keyword arguments are passed to `requests.request`.
125
126    Returns
127    -------
128    A `requests.Reponse` object.
129
130    """
131    return self.make_request('GET', r_url, **kwargs)

Wrapper for requests.get.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def post(self, r_url: str, **kwargs: Any) -> requests.models.Response:
134def post(self, r_url: str, **kwargs: Any) -> 'requests.Response':
135    """
136    Wrapper for `requests.post`.
137
138    Parameters
139    ----------
140    r_url: str
141        The relative URL for the endpoint (e.g. `'/pipes'`).
142
143    headers: Optional[Dict[str, Any]], default None
144        The headers to use for the request.
145        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
146
147    use_token: bool, default True
148        If `True`, add the authorization token to the headers.
149
150    debug: bool, default False
151        Verbosity toggle.
152
153    kwargs: Any
154        All other keyword arguments are passed to `requests.request`.
155
156    Returns
157    -------
158    A `requests.Reponse` object.
159
160    """
161    return self.make_request('POST', r_url, **kwargs)

Wrapper for requests.post.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def put(self, r_url: str, **kwargs: Any) -> requests.models.Response:
193def put(self, r_url: str, **kwargs: Any) -> 'requests.Response':
194    """
195    Wrapper for `requests.put`.
196
197    Parameters
198    ----------
199    r_url: str
200        The relative URL for the endpoint (e.g. `'/pipes'`).
201
202    headers: Optional[Dict[str, Any]], default None
203        The headers to use for the request.
204        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
205
206    use_token: bool, default True
207        If `True`, add the authorization token to the headers.
208
209    debug: bool, default False
210        Verbosity toggle.
211
212    kwargs: Any
213        All other keyword arguments are passed to `requests.request`.
214
215    Returns
216    -------
217    A `requests.Reponse` object.
218    """
219    return self.make_request('PUT', r_url, **kwargs)

Wrapper for requests.put.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def patch(self, r_url: str, **kwargs: Any) -> requests.models.Response:
164def patch(self, r_url: str, **kwargs: Any) -> 'requests.Response':
165    """
166    Wrapper for `requests.patch`.
167
168    Parameters
169    ----------
170    r_url: str
171        The relative URL for the endpoint (e.g. `'/pipes'`).
172
173    headers: Optional[Dict[str, Any]], default None
174        The headers to use for the request.
175        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
176
177    use_token: bool, default True
178        If `True`, add the authorization token to the headers.
179
180    debug: bool, default False
181        Verbosity toggle.
182
183    kwargs: Any
184        All other keyword arguments are passed to `requests.request`.
185
186    Returns
187    -------
188    A `requests.Reponse` object.
189    """
190    return self.make_request('PATCH', r_url, **kwargs)

Wrapper for requests.patch.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def delete(self, r_url: str, **kwargs: Any) -> requests.models.Response:
222def delete(self, r_url: str, **kwargs: Any) -> 'requests.Response':
223    """
224    Wrapper for `requests.delete`.
225
226    Parameters
227    ----------
228    r_url: str
229        The relative URL for the endpoint (e.g. `'/pipes'`).
230
231    headers: Optional[Dict[str, Any]], default None
232        The headers to use for the request.
233        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
234
235    use_token: bool, default True
236        If `True`, add the authorization token to the headers.
237
238    debug: bool, default False
239        Verbosity toggle.
240
241    kwargs: Any
242        All other keyword arguments are passed to `requests.request`.
243
244    Returns
245    -------
246    A `requests.Reponse` object.
247    """
248    return self.make_request('DELETE', r_url, **kwargs)

Wrapper for requests.delete.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def wget( self, r_url: str, dest: Union[str, pathlib.Path, NoneType] = None, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) -> pathlib.Path:
251def wget(
252    self,
253    r_url: str,
254    dest: Optional[Union[str, pathlib.Path]] = None,
255    headers: Optional[Dict[str, Any]] = None,
256    use_token: bool = True,
257    debug: bool = False,
258    **kw: Any
259) -> pathlib.Path:
260    """Mimic wget with requests."""
261    from meerschaum.utils.misc import wget
262    if headers is None:
263        headers = {}
264    if use_token:
265        headers.update({'Authorization': f'Bearer {self.token}'})
266    request_url = urllib.parse.urljoin(self.url, r_url)
267    if debug:
268        dprint(
269            f"[{self}] Downloading {request_url}"
270            + (f' to {dest}' if dest is not None else '')
271            + "..."
272        )
273    return wget(request_url, dest=dest, headers=headers, **kw)

Mimic wget with requests.

def get_actions(self):
24def get_actions(self):
25    """Get available actions from the API instance."""
26    return self.get(ACTIONS_ENDPOINT)

Get available actions from the API instance.

def do_action(self, sysargs: List[str]) -> Tuple[bool, str]:
29def do_action(self, sysargs: List[str]) -> SuccessTuple:
30    """
31    Execute a Meerschaum action remotely.
32    """
33    return asyncio.run(self.do_action_async(sysargs))

Execute a Meerschaum action remotely.

async def do_action_async( self, sysargs: List[str], callback_function: Callable[[str], NoneType] = functools.partial(<built-in function print>, end='')) -> Tuple[bool, str]:
36async def do_action_async(
37    self,
38    sysargs: List[str],
39    callback_function: Callable[[str], None] = partial(print, end=''),
40) -> SuccessTuple:
41    """
42    Execute an action as a temporary remote job.
43    """
44    from meerschaum._internal.arguments import remove_api_executor_keys
45    from meerschaum.utils.misc import generate_password
46    sysargs = remove_api_executor_keys(sysargs)
47
48    job_name = TEMP_PREFIX + generate_password(12)
49    job = mrsm.Job(job_name, sysargs, executor_keys=str(self))
50
51    start_success, start_msg = job.start()
52    if not start_success:
53        return start_success, start_msg
54
55    await job.monitor_logs_async(
56        callback_function=callback_function,
57        stop_on_exit=True,
58        strip_timestamps=True,
59    )
60
61    success, msg = job.result
62    job.delete()
63    return success, msg

Execute an action as a temporary remote job.

def do_action_legacy( self, action: Optional[List[str]] = None, sysargs: Optional[List[str]] = None, debug: bool = False, **kw) -> Tuple[bool, str]:
 66def do_action_legacy(
 67    self,
 68    action: Optional[List[str]] = None,
 69    sysargs: Optional[List[str]] = None,
 70    debug: bool = False,
 71    **kw
 72) -> SuccessTuple:
 73    """
 74    NOTE: This method is deprecated.
 75    Please use `do_action()` or `do_action_async()`.
 76
 77    Execute a Meerschaum action remotely.
 78
 79    If `sysargs` are provided, parse those instead.
 80    Otherwise infer everything from keyword arguments.
 81
 82    Examples
 83    --------
 84    >>> conn = mrsm.get_connector('api:main')
 85    >>> conn.do_action(['show', 'pipes'])
 86    (True, "Success")
 87    >>> conn.do_action(['show', 'arguments'], name='test')
 88    (True, "Success")
 89    """
 90    import sys, json
 91    from meerschaum.utils.debug import dprint
 92    from meerschaum.config.static import STATIC_CONFIG
 93    from meerschaum.utils.misc import json_serialize_datetime
 94    if action is None:
 95        action = []
 96
 97    if sysargs is not None and action and action[0] == '':
 98        from meerschaum._internal.arguments import parse_arguments
 99        if debug:
100            dprint(f"Parsing sysargs:\n{sysargs}")
101        json_dict = parse_arguments(sysargs)
102    else:
103        json_dict = kw
104        json_dict['action'] = action
105        if 'noask' not in kw:
106            json_dict['noask'] = True
107        if 'yes' not in kw:
108            json_dict['yes'] = True
109        if debug:
110            json_dict['debug'] = debug
111
112    root_action = json_dict['action'][0]
113    del json_dict['action'][0]
114    r_url = f"{STATIC_CONFIG['api']['endpoints']['actions']}/{root_action}"
115    
116    if debug:
117        from meerschaum.utils.formatting import pprint
118        dprint(f"Sending data to '{self.url + r_url}':")
119        pprint(json_dict, stream=sys.stderr)
120
121    response = self.post(
122        r_url,
123        data = json.dumps(json_dict, default=json_serialize_datetime),
124        debug = debug,
125    )
126    try:
127        response_list = json.loads(response.text)
128        if isinstance(response_list, dict) and 'detail' in response_list:
129            return False, response_list['detail']
130    except Exception as e:
131        print(f"Invalid response: {response}")
132        print(e)
133        return False, response.text
134    if debug:
135        dprint(response)
136    try:
137        return response_list[0], response_list[1]
138    except Exception as e:
139        return False, f"Failed to parse result from action '{root_action}'"

NOTE: This method is deprecated. Please use do_action() or do_action_async().

Execute a Meerschaum action remotely.

If sysargs are provided, parse those instead. Otherwise infer everything from keyword arguments.

Examples
>>> conn = mrsm.get_connector('api:main')
>>> conn.do_action(['show', 'pipes'])
(True, "Success")
>>> conn.do_action(['show', 'arguments'], name='test')
(True, "Success")
def get_mrsm_version(self, **kw) -> Optional[str]:
13def get_mrsm_version(self, **kw) -> Optional[str]:
14    """
15    Return the Meerschaum version of the API instance.
16    """
17    from meerschaum.config.static import STATIC_CONFIG
18    try:
19        j = self.get(
20            STATIC_CONFIG['api']['endpoints']['version'] + '/mrsm',
21            use_token=False,
22            **kw
23        ).json()
24    except Exception:
25        return None
26    if isinstance(j, dict) and 'detail' in j:
27        return None
28    return j

Return the Meerschaum version of the API instance.

def get_chaining_status(self, **kw) -> Optional[bool]:
31def get_chaining_status(self, **kw) -> Optional[bool]:
32    """
33    Fetch the chaining status of the API instance.
34    """
35    from meerschaum.config.static import STATIC_CONFIG
36    try:
37        response = self.get(
38            STATIC_CONFIG['api']['endpoints']['chaining'],
39            use_token = True,
40            **kw
41        )
42        if not response:
43            return None
44    except Exception:
45        return None
46
47    return response.json()

Fetch the chaining status of the API instance.

def register_pipe( self, pipe: meerschaum.Pipe, debug: bool = False) -> Tuple[bool, str]:
35def register_pipe(
36    self,
37    pipe: mrsm.Pipe,
38    debug: bool = False
39) -> SuccessTuple:
40    """Submit a POST to the API to register a new Pipe object.
41    Returns a tuple of (success_bool, response_dict).
42    """
43    from meerschaum.utils.debug import dprint
44    ### NOTE: if `parameters` is supplied in the Pipe constructor,
45    ###       then `pipe.parameters` will exist and not be fetched from the database.
46    r_url = pipe_r_url(pipe)
47    response = self.post(
48        r_url + '/register',
49        json = pipe.parameters,
50        debug = debug,
51    )
52    if debug:
53        dprint(response.text)
54
55    if not response:
56        return False, response.text
57
58    response_data = response.json()
59    if isinstance(response_data, list):
60        response_tuple = response_data[0], response_data[1]
61    elif 'detail' in response.json():
62        response_tuple = response.__bool__(), response_data['detail']
63    else:
64        response_tuple = response.__bool__(), response.text
65    return response_tuple

Submit a POST to the API to register a new Pipe object. Returns a tuple of (success_bool, response_dict).

def fetch_pipes_keys( self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> List[Tuple[str, str, Optional[str]]]:
101def fetch_pipes_keys(
102    self,
103    connector_keys: Optional[List[str]] = None,
104    metric_keys: Optional[List[str]] = None,
105    location_keys: Optional[List[str]] = None,
106    tags: Optional[List[str]] = None,
107    params: Optional[Dict[str, Any]] = None,
108    debug: bool = False
109) -> Union[List[Tuple[str, str, Union[str, None]]]]:
110    """
111    Fetch registered Pipes' keys from the API.
112    
113    Parameters
114    ----------
115    connector_keys: Optional[List[str]], default None
116        The connector keys for the query.
117
118    metric_keys: Optional[List[str]], default None
119        The metric keys for the query.
120
121    location_keys: Optional[List[str]], default None
122        The location keys for the query.
123
124    tags: Optional[List[str]], default None
125        A list of tags for the query.
126
127    params: Optional[Dict[str, Any]], default None
128        A parameters dictionary for filtering against the `pipes` table
129        (e.g. `{'connector_keys': 'plugin:foo'}`).
130        Not recommeded to be used.
131
132    debug: bool, default False
133        Verbosity toggle.
134
135    Returns
136    -------
137    A list of tuples containing pipes' keys.
138    """
139    from meerschaum.config.static import STATIC_CONFIG
140    if connector_keys is None:
141        connector_keys = []
142    if metric_keys is None:
143        metric_keys = []
144    if location_keys is None:
145        location_keys = []
146    if tags is None:
147        tags = []
148
149    r_url = STATIC_CONFIG['api']['endpoints']['pipes'] + '/keys'
150    try:
151        j = self.get(
152            r_url,
153            params = {
154                'connector_keys': json.dumps(connector_keys),
155                'metric_keys': json.dumps(metric_keys),
156                'location_keys': json.dumps(location_keys),
157                'tags': json.dumps(tags),
158                'params': json.dumps(params),
159            },
160            debug=debug
161        ).json()
162    except Exception as e:
163        error(str(e))
164
165    if 'detail' in j:
166        error(j['detail'], stack=False)
167    return [tuple(r) for r in j]

Fetch registered Pipes' keys from the API.

Parameters
  • connector_keys (Optional[List[str]], default None): The connector keys for the query.
  • metric_keys (Optional[List[str]], default None): The metric keys for the query.
  • location_keys (Optional[List[str]], default None): The location keys for the query.
  • tags (Optional[List[str]], default None): A list of tags for the query.
  • params (Optional[Dict[str, Any]], default None): A parameters dictionary for filtering against the pipes table (e.g. {'connector_keys': 'plugin:foo'}). Not recommeded to be used.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of tuples containing pipes' keys.
def edit_pipe( self, pipe: meerschaum.Pipe, patch: bool = False, debug: bool = False) -> Tuple[bool, str]:
68def edit_pipe(
69    self,
70    pipe: mrsm.Pipe,
71    patch: bool = False,
72    debug: bool = False,
73) -> SuccessTuple:
74    """Submit a PATCH to the API to edit an existing Pipe object.
75    Returns a tuple of (success_bool, response_dict).
76    """
77    from meerschaum.utils.debug import dprint
78    ### NOTE: if `parameters` is supplied in the Pipe constructor,
79    ###       then `pipe.parameters` will exist and not be fetched from the database.
80    r_url = pipe_r_url(pipe)
81    response = self.patch(
82        r_url + '/edit',
83        params = {'patch': patch,},
84        json = pipe.parameters,
85        debug = debug,
86    )
87    if debug:
88        dprint(response.text)
89
90    response_data = response.json()
91
92    if isinstance(response.json(), list):
93        response_tuple = response_data[0], response_data[1]
94    elif 'detail' in response.json():
95        response_tuple = response.__bool__(), response_data['detail']
96    else:
97        response_tuple = response.__bool__(), response.text
98    return response_tuple

Submit a PATCH to the API to edit an existing Pipe object. Returns a tuple of (success_bool, response_dict).

def sync_pipe( self, pipe: meerschaum.Pipe, df: "Optional[Union['pd.DataFrame', Dict[Any, Any], str]]" = None, chunksize: Optional[int] = -1, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
170def sync_pipe(
171    self,
172    pipe: mrsm.Pipe,
173    df: Optional[Union['pd.DataFrame', Dict[Any, Any], str]] = None,
174    chunksize: Optional[int] = -1,
175    debug: bool = False,
176    **kw: Any
177) -> SuccessTuple:
178    """Sync a DataFrame into a Pipe."""
179    from decimal import Decimal
180    from meerschaum.utils.debug import dprint
181    from meerschaum.utils.misc import json_serialize_datetime, items_str, interval_str
182    from meerschaum.config import get_config
183    from meerschaum.utils.packages import attempt_import
184    from meerschaum.utils.dataframe import get_numeric_cols, to_json
185    begin = time.perf_counter()
186    more_itertools = attempt_import('more_itertools')
187    if df is None:
188        msg = f"DataFrame is `None`. Cannot sync {pipe}."
189        return False, msg
190
191    def get_json_str(c):
192        ### allow syncing dict or JSON without needing to import pandas (for IOT devices)
193        if isinstance(c, (dict, list)):
194            return json.dumps(c, default=json_serialize_datetime)
195        return to_json(c, orient='columns')
196
197    df = json.loads(df) if isinstance(df, str) else df
198
199    _chunksize: Optional[int] = (1 if chunksize is None else (
200        get_config('system', 'connectors', 'sql', 'chunksize') if chunksize == -1
201        else chunksize
202    ))
203    keys: List[str] = list(df.columns)
204    chunks = []
205    if hasattr(df, 'index'):
206        df = df.reset_index(drop=True)
207        is_dask = 'dask' in df.__module__
208        chunks = (
209            (df.iloc[i] for i in more_itertools.chunked(df.index, _chunksize))
210            if not is_dask
211            else [partition.compute() for partition in df.partitions]
212        )
213
214        numeric_cols = get_numeric_cols(df)
215        if numeric_cols:
216            for col in numeric_cols:
217                df[col] = df[col].apply(lambda x: f'{x:f}' if isinstance(x, Decimal) else x)
218            pipe_dtypes = pipe.dtypes
219            new_numeric_cols = [
220                col
221                for col in numeric_cols
222                if pipe_dtypes.get(col, None) != 'numeric'
223            ]
224            pipe.dtypes.update({
225                col: 'numeric'
226                for col in new_numeric_cols
227            })
228            edit_success, edit_msg = pipe.edit(debug=debug)
229            if not edit_success:
230                warn(
231                    "Failed to update new numeric columns "
232                    + f"{items_str(new_numeric_cols)}:\n{edit_msg}"
233                )
234    elif isinstance(df, dict):
235        ### `_chunks` is a dict of lists of dicts.
236        ### e.g. {'a' : [ {'a':[1, 2]}, {'a':[3, 4]} ] }
237        _chunks = {k: [] for k in keys}
238        for k in keys:
239            chunk_iter = more_itertools.chunked(df[k], _chunksize)
240            for l in chunk_iter:
241                _chunks[k].append({k: l})
242
243        ### `chunks` is a list of dicts (e.g. orient by rows in pandas JSON).
244        for k, l in _chunks.items():
245            for i, c in enumerate(l):
246                try:
247                    chunks[i].update(c)
248                except IndexError:
249                    chunks.append(c)
250    elif isinstance(df, list):
251        chunks = (df[i] for i in more_itertools.chunked(df, _chunksize))
252
253    ### Send columns in case the user has defined them locally.
254    if pipe.columns:
255        kw['columns'] = json.dumps(pipe.columns)
256    r_url = pipe_r_url(pipe) + '/data'
257
258    rowcount = 0
259    num_success_chunks = 0
260    for i, c in enumerate(chunks):
261        if debug:
262            dprint(f"[{self}] Posting chunk {i} to {r_url}...")
263        if len(c) == 0:
264            if debug:
265                dprint(f"[{self}] Skipping empty chunk...")
266            continue
267        json_str = get_json_str(c)
268
269        try:
270            response = self.post(
271                r_url,
272                ### handles check_existing
273                params = kw,
274                data = json_str,
275                debug = debug
276            )
277        except Exception as e:
278            msg = f"Failed to post a chunk to {pipe}:\n{e}"
279            warn(msg)
280            return False, msg
281            
282        if not response:
283            return False, f"Failed to sync a chunk:\n{response.text}"
284
285        try:
286            j = json.loads(response.text)
287        except Exception as e:
288            return False, f"Failed to parse response from syncing {pipe}:\n{e}"
289
290        if isinstance(j, dict) and 'detail' in j:
291            return False, j['detail']
292
293        try:
294            j = tuple(j)
295        except Exception:
296            return False, response.text
297
298        if debug:
299            dprint("Received response: " + str(j))
300        if not j[0]:
301            return j
302
303        rowcount += len(c)
304        num_success_chunks += 1
305
306    success_tuple = True, (
307        f"It took {interval_str(timedelta(seconds=(time.perf_counter() - begin)))} "
308        + "to sync {rowcount:,} row"
309        + ('s' if rowcount != 1 else '')
310        + f" across {num_success_chunks:,} chunk" + ('s' if num_success_chunks != 1 else '') +
311        f" to {pipe}."
312    )
313    return success_tuple

Sync a DataFrame into a Pipe.

def delete_pipe( self, pipe: Optional[meerschaum.Pipe] = None, debug: bool = None) -> Tuple[bool, str]:
316def delete_pipe(
317    self,
318    pipe: Optional[mrsm.Pipe] = None,
319    debug: bool = None,        
320) -> SuccessTuple:
321    """Delete a Pipe and drop its table."""
322    if pipe is None:
323        error("Pipe cannot be None.")
324    r_url = pipe_r_url(pipe)
325    response = self.delete(
326        r_url + '/delete',
327        debug = debug,
328    )
329    if debug:
330        dprint(response.text)
331
332    response_data = response.json()
333    if isinstance(response.json(), list):
334        response_tuple = response_data[0], response_data[1]
335    elif 'detail' in response.json():
336        response_tuple = response.__bool__(), response_data['detail']
337    else:
338        response_tuple = response.__bool__(), response.text
339    return response_tuple

Delete a Pipe and drop its table.

def get_pipe_data( self, pipe: meerschaum.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[str, datetime.datetime, int, NoneType] = None, end: Union[str, datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, as_chunks: bool = False, debug: bool = False, **kw: Any) -> Optional[pandas.core.frame.DataFrame]:
342def get_pipe_data(
343    self,
344    pipe: mrsm.Pipe,
345    select_columns: Optional[List[str]] = None,
346    omit_columns: Optional[List[str]] = None,
347    begin: Union[str, datetime, int, None] = None,
348    end: Union[str, datetime, int, None] = None,
349    params: Optional[Dict[str, Any]] = None,
350    as_chunks: bool = False,
351    debug: bool = False,
352    **kw: Any
353) -> Union[pandas.DataFrame, None]:
354    """Fetch data from the API."""
355    r_url = pipe_r_url(pipe)
356    while True:
357        try:
358            response = self.get(
359                r_url + "/data",
360                params={
361                    'select_columns': json.dumps(select_columns),
362                    'omit_columns': json.dumps(omit_columns),
363                    'begin': begin,
364                    'end': end,
365                    'params': json.dumps(params, default=str)
366                },
367                debug=debug
368            )
369            if not response.ok:
370                return None
371            j = response.json()
372        except Exception as e:
373            warn(f"Failed to get data for {pipe}:\n{e}")
374            return None
375        if isinstance(j, dict) and 'detail' in j:
376            return False, j['detail']
377        break
378
379    from meerschaum.utils.dataframe import parse_df_datetimes, add_missing_cols_to_df
380    from meerschaum.utils.dtypes import are_dtypes_equal
381    try:
382        df = parse_df_datetimes(
383            j,
384            ignore_cols=[
385                col
386                for col, dtype in pipe.dtypes.items()
387                if not are_dtypes_equal(str(dtype), 'datetime')
388            ],
389            strip_timezone=(pipe.tzinfo is None),
390            debug=debug,
391        )
392    except Exception as e:
393        warn(f"Failed to parse response for {pipe}:\n{e}")
394        return None
395
396    if len(df.columns) == 0:
397        return add_missing_cols_to_df(df, pipe.dtypes)
398
399    return df

Fetch data from the API.

def get_pipe_id(self, pipe: meerschaum.Pipe, debug: bool = False) -> int:
402def get_pipe_id(
403    self,
404    pipe: mrsm.Pipe,
405    debug: bool = False,
406) -> int:
407    """Get a Pipe's ID from the API."""
408    from meerschaum.utils.misc import is_int
409    r_url = pipe_r_url(pipe)
410    response = self.get(
411        r_url + '/id',
412        debug = debug
413    )
414    if debug:
415        dprint(f"Got pipe ID: {response.text}")
416    try:
417        if is_int(response.text):
418            return int(response.text)
419    except Exception as e:
420        warn(f"Failed to get the ID for {pipe}:\n{e}")
421    return None

Get a Pipe's ID from the API.

def get_pipe_attributes( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, Any]:
424def get_pipe_attributes(
425    self,
426    pipe: mrsm.Pipe,
427    debug: bool = False,
428) -> Dict[str, Any]:
429    """Get a Pipe's attributes from the API
430
431    Parameters
432    ----------
433    pipe: meerschaum.Pipe
434        The pipe whose attributes we are fetching.
435        
436    Returns
437    -------
438    A dictionary of a pipe's attributes.
439    If the pipe does not exist, return an empty dictionary.
440    """
441    r_url = pipe_r_url(pipe)
442    response = self.get(r_url + '/attributes', debug=debug)
443    try:
444        return json.loads(response.text)
445    except Exception as e:
446        warn(f"Failed to get the attributes for {pipe}:\n{e}")
447    return {}

Get a Pipe's attributes from the API

Parameters
Returns
  • A dictionary of a pipe's attributes.
  • If the pipe does not exist, return an empty dictionary.
def get_sync_time( self, pipe: meerschaum.Pipe, params: Optional[Dict[str, Any]] = None, newest: bool = True, debug: bool = False) -> Union[datetime.datetime, int, NoneType]:
450def get_sync_time(
451    self,
452    pipe: mrsm.Pipe,
453    params: Optional[Dict[str, Any]] = None,
454    newest: bool = True,
455    debug: bool = False,
456) -> Union[datetime, int, None]:
457    """Get a Pipe's most recent datetime value from the API.
458
459    Parameters
460    ----------
461    pipe: meerschaum.Pipe
462        The pipe to select from.
463
464    params: Optional[Dict[str, Any]], default None
465        Optional params dictionary to build the WHERE clause.
466
467    newest: bool, default True
468        If `True`, get the most recent datetime (honoring `params`).
469        If `False`, get the oldest datetime (ASC instead of DESC).
470
471    Returns
472    -------
473    The most recent (or oldest if `newest` is `False`) datetime of a pipe,
474    rounded down to the closest minute.
475    """
476    from meerschaum.utils.misc import is_int
477    from meerschaum.utils.warnings import warn
478    r_url = pipe_r_url(pipe)
479    response = self.get(
480        r_url + '/sync_time',
481        json = params,
482        params = {'newest': newest, 'debug': debug},
483        debug = debug,
484    )
485    if not response:
486        warn(f"Failed to get the sync time for {pipe}:\n" + response.text)
487        return None
488
489    j = response.json()
490    if j is None:
491        dt = None
492    else:
493        try:
494            dt = (
495                datetime.fromisoformat(j)
496                if not is_int(j)
497                else int(j)
498            )
499        except Exception as e:
500            warn(f"Failed to parse the sync time '{j}' for {pipe}:\n{e}")
501            dt = None
502    return dt

Get a Pipe's most recent datetime value from the API.

Parameters
  • pipe (meerschaum.Pipe): The pipe to select from.
  • params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause.
  • newest (bool, default True): If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
Returns
  • The most recent (or oldest if newest is False) datetime of a pipe,
  • rounded down to the closest minute.
def pipe_exists(self, pipe: meerschaum.Pipe, debug: bool = False) -> bool:
505def pipe_exists(
506    self,
507    pipe: mrsm.Pipe,
508    debug: bool = False
509) -> bool:
510    """Check the API to see if a Pipe exists.
511
512    Parameters
513    ----------
514    pipe: 'meerschaum.Pipe'
515        The pipe which were are querying.
516        
517    Returns
518    -------
519    A bool indicating whether a pipe's underlying table exists.
520    """
521    from meerschaum.utils.debug import dprint
522    from meerschaum.utils.warnings import warn
523    r_url = pipe_r_url(pipe)
524    response = self.get(r_url + '/exists', debug=debug)
525    if not response:
526        warn(f"Failed to check if {pipe} exists:\n{response.text}")
527        return False
528    if debug:
529        dprint("Received response: " + str(response.text))
530    j = response.json()
531    if isinstance(j, dict) and 'detail' in j:
532        warn(j['detail'])
533    return j

Check the API to see if a Pipe exists.

Parameters
Returns
  • A bool indicating whether a pipe's underlying table exists.
def create_metadata(self, debug: bool = False) -> bool:
536def create_metadata(
537    self,
538    debug: bool = False
539) -> bool:
540    """Create metadata tables.
541
542    Returns
543    -------
544    A bool indicating success.
545    """
546    from meerschaum.utils.debug import dprint
547    from meerschaum.config.static import STATIC_CONFIG
548    r_url = STATIC_CONFIG['api']['endpoints']['metadata']
549    response = self.post(r_url, debug=debug)
550    if debug:
551        dprint("Create metadata response: {response.text}")
552    try:
553        _ = json.loads(response.text)
554    except Exception as e:
555        warn(f"Failed to create metadata on {self}:\n{e}")
556    return False

Create metadata tables.

Returns
  • A bool indicating success.
def get_pipe_rowcount( self, pipe: meerschaum.Pipe, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) -> int:
559def get_pipe_rowcount(
560    self,
561    pipe: mrsm.Pipe,
562    begin: Optional[datetime] = None,
563    end: Optional[datetime] = None,
564    params: Optional[Dict[str, Any]] = None,
565    remote: bool = False,
566    debug: bool = False,
567) -> int:
568    """Get a pipe's row count from the API.
569
570    Parameters
571    ----------
572    pipe: 'meerschaum.Pipe':
573        The pipe whose row count we are counting.
574        
575    begin: Optional[datetime], default None
576        If provided, bound the count by this datetime.
577
578    end: Optional[datetime]
579        If provided, bound the count by this datetime.
580
581    params: Optional[Dict[str, Any]], default None
582        If provided, bound the count by these parameters.
583
584    remote: bool, default False
585        If `True`, return the rowcount for the fetch definition.
586
587    Returns
588    -------
589    The number of rows in the pipe's table, bound the given parameters.
590    If the table does not exist, return 0.
591    """
592    r_url = pipe_r_url(pipe)
593    response = self.get(
594        r_url + "/rowcount",
595        json = params,
596        params = {
597            'begin': begin,
598            'end': end,
599            'remote': remote,
600        },
601        debug = debug
602    )
603    if not response:
604        warn(f"Failed to get the rowcount for {pipe}:\n{response.text}")
605        return 0
606    try:
607        return int(json.loads(response.text))
608    except Exception as e:
609        warn(f"Failed to get the rowcount for {pipe}:\n{e}")
610    return 0

Get a pipe's row count from the API.

Parameters
  • pipe ('meerschaum.Pipe':): The pipe whose row count we are counting.
  • begin (Optional[datetime], default None): If provided, bound the count by this datetime.
  • end (Optional[datetime]): If provided, bound the count by this datetime.
  • params (Optional[Dict[str, Any]], default None): If provided, bound the count by these parameters.
  • remote (bool, default False): If True, return the rowcount for the fetch definition.
Returns
  • The number of rows in the pipe's table, bound the given parameters.
  • If the table does not exist, return 0.
def drop_pipe( self, pipe: meerschaum.Pipe, debug: bool = False) -> Tuple[bool, str]:
613def drop_pipe(
614    self,
615    pipe: mrsm.Pipe,
616    debug: bool = False
617) -> SuccessTuple:
618    """
619    Drop a pipe's table but maintain its registration.
620
621    Parameters
622    ----------
623    pipe: meerschaum.Pipe:
624        The pipe to be dropped.
625        
626    Returns
627    -------
628    A success tuple (bool, str).
629    """
630    from meerschaum.utils.warnings import error
631    from meerschaum.utils.debug import dprint
632    if pipe is None:
633        error(f"Pipe cannot be None.")
634    r_url = pipe_r_url(pipe)
635    response = self.delete(
636        r_url + '/drop',
637        debug = debug,
638    )
639    if debug:
640        dprint(response.text)
641
642    try:
643        data = response.json()
644    except Exception as e:
645        return False, f"Failed to drop {pipe}."
646
647    if isinstance(data, list):
648        response_tuple = data[0], data[1]
649    elif 'detail' in response.json():
650        response_tuple = response.__bool__(), data['detail']
651    else:
652        response_tuple = response.__bool__(), response.text
653
654    return response_tuple

Drop a pipe's table but maintain its registration.

Parameters
Returns
  • A success tuple (bool, str).
def clear_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kw) -> Tuple[bool, str]:
657def clear_pipe(
658    self,
659    pipe: mrsm.Pipe,
660    debug: bool = False,
661    **kw
662) -> SuccessTuple:
663    """
664    Delete rows in a pipe's table.
665
666    Parameters
667    ----------
668    pipe: meerschaum.Pipe
669        The pipe with rows to be deleted.
670        
671    Returns
672    -------
673    A success tuple.
674    """
675    kw.pop('metric_keys', None)
676    kw.pop('connector_keys', None)
677    kw.pop('location_keys', None)
678    kw.pop('action', None)
679    kw.pop('force', None)
680    return self.do_action_legacy(
681        ['clear', 'pipes'],
682        connector_keys=pipe.connector_keys,
683        metric_keys=pipe.metric_key,
684        location_keys=pipe.location_key,
685        force=True,
686        debug=debug,
687        **kw
688    )

Delete rows in a pipe's table.

Parameters
Returns
  • A success tuple.
def get_pipe_columns_types( self, pipe: meerschaum.Pipe, debug: bool = False) -> Optional[Dict[str, str]]:
691def get_pipe_columns_types(
692    self,
693    pipe: mrsm.Pipe,
694    debug: bool = False,
695) -> Union[Dict[str, str], None]:
696    """
697    Fetch the columns and types of the pipe's table.
698
699    Parameters
700    ----------
701    pipe: meerschaum.Pipe
702        The pipe whose columns to be queried.
703
704    Returns
705    -------
706    A dictionary mapping column names to their database types.
707
708    Examples
709    --------
710    >>> {
711    ...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
712    ...   'id': 'BIGINT',
713    ...   'val': 'DOUBLE PRECISION',
714    ... }
715    >>>
716    """
717    r_url = pipe_r_url(pipe) + '/columns/types'
718    response = self.get(
719        r_url,
720        debug=debug
721    )
722    j = response.json()
723    if isinstance(j, dict) and 'detail' in j and len(j.keys()) == 1:
724        warn(j['detail'])
725        return None
726    if not isinstance(j, dict):
727        warn(response.text)
728        return None
729    return j

Fetch the columns and types of the pipe's table.

Parameters
Returns
  • A dictionary mapping column names to their database types.
Examples
>>> {
...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
...   'id': 'BIGINT',
...   'val': 'DOUBLE PRECISION',
... }
>>>
def get_pipe_columns_indices( self, pipe: meerschaum.Pipe, debug: bool = False) -> Optional[Dict[str, str]]:
732def get_pipe_columns_indices(
733    self,
734    pipe: mrsm.Pipe,
735    debug: bool = False,
736) -> Union[Dict[str, str], None]:
737    """
738    Fetch the index information for a pipe.
739
740    Parameters
741    ----------
742    pipe: mrsm.Pipe
743        The pipe whose columns to be queried.
744
745    Returns
746    -------
747    A dictionary mapping column names to a list of associated index information.
748    """
749    r_url = pipe_r_url(pipe) + '/columns/indices'
750    response = self.get(
751        r_url,
752        debug=debug
753    )
754    j = response.json()
755    if isinstance(j, dict) and 'detail' in j and len(j.keys()) == 1:
756        warn(j['detail'])
757        return None
758    if not isinstance(j, dict):
759        warn(response.text)
760        return None
761    return j

Fetch the index information for a pipe.

Parameters
  • pipe (mrsm.Pipe): The pipe whose columns to be queried.
Returns
  • A dictionary mapping column names to a list of associated index information.
def fetch( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, str, int] = '', end: Union[datetime.datetime, int] = None, params: 'Optional[Dict, Any]' = None, debug: bool = False, **kw: Any) -> Iterator[pandas.core.frame.DataFrame]:
16def fetch(
17        self,
18        pipe: mrsm.Pipe,
19        begin: Union[datetime, str, int] = '',
20        end: Union[datetime, int] = None,
21        params: Optional[Dict, Any] = None,
22        debug: bool = False,
23        **kw: Any
24    ) -> Iterator['pd.DataFrame']:
25    """Get the Pipe data from the remote Pipe."""
26    from meerschaum.utils.debug import dprint
27    from meerschaum.utils.warnings import warn, error
28    from meerschaum.config._patch import apply_patch_to_config
29
30    fetch_params = pipe.parameters.get('fetch', {})
31    if not fetch_params:
32        warn(f"Missing 'fetch' parameters for {pipe}.", stack=False)
33        return None
34
35    pipe_meta = fetch_params.get('pipe', {})
36    ### Legacy: check for `connector_keys`, etc. at the root.
37    if not pipe_meta:
38        ck, mk, lk = (
39            fetch_params.get('connector_keys', None),
40            fetch_params.get('metric_key', None),
41            fetch_params.get('location_key', None),
42        )
43        if not ck or not mk:
44            warn(f"Missing `fetch:pipe` keys for {pipe}.", stack=False)
45            return None
46
47        pipe_meta.update({
48            'connector': ck,
49            'metric': mk,
50            'location': lk,
51        })
52
53    pipe_meta['instance'] = self
54    source_pipe = mrsm.Pipe(**pipe_meta)
55
56    _params = copy.deepcopy(params) if params is not None else {}
57    _params = apply_patch_to_config(_params, fetch_params.get('params', {}))
58    select_columns = fetch_params.get('select_columns', [])
59    omit_columns = fetch_params.get('omit_columns', [])
60
61    return source_pipe.get_data(
62        select_columns = select_columns,
63        omit_columns = omit_columns,
64        begin = begin,
65        end = end,
66        params = _params,
67        debug = debug,
68        as_iterator = True,
69    )

Get the Pipe data from the remote Pipe.

def register_plugin( self, plugin: meerschaum.Plugin, make_archive: bool = True, debug: bool = False) -> Tuple[bool, str]:
20def register_plugin(
21        self,
22        plugin: meerschaum.core.Plugin,
23        make_archive: bool = True,
24        debug: bool = False,
25    ) -> SuccessTuple:
26    """Register a plugin and upload its archive."""
27    import json
28    archive_path = plugin.make_tar(debug=debug) if make_archive else plugin.archive_path
29    file_pointer = open(archive_path, 'rb')
30    files = {'archive': file_pointer}
31    metadata = {
32        'version': plugin.version,
33        'attributes': json.dumps(plugin.attributes),
34    }
35    r_url = plugin_r_url(plugin)
36    try:
37        response = self.post(r_url, files=files, params=metadata, debug=debug)
38    except Exception as e:
39        return False, f"Failed to register plugin '{plugin}'."
40    finally:
41        file_pointer.close()
42
43    try:
44        success, msg = json.loads(response.text)
45    except Exception as e:
46        return False, response.text
47
48    return success, msg

Register a plugin and upload its archive.

def install_plugin( self, name: str, skip_deps: bool = False, force: bool = False, debug: bool = False) -> Tuple[bool, str]:
50def install_plugin(
51        self,
52        name: str,
53        skip_deps: bool = False,
54        force: bool = False,
55        debug: bool = False
56    ) -> SuccessTuple:
57    """Download and attempt to install a plugin from the API."""
58    import os, pathlib, json
59    from meerschaum.core import Plugin
60    from meerschaum.config._paths import PLUGINS_TEMP_RESOURCES_PATH
61    from meerschaum.utils.debug import dprint
62    from meerschaum.utils.packages import attempt_import
63    binaryornot_check = attempt_import('binaryornot.check', lazy=False)
64    r_url = plugin_r_url(name)
65    dest = pathlib.Path(os.path.join(PLUGINS_TEMP_RESOURCES_PATH, name + '.tar.gz'))
66    if debug:
67        dprint(f"Fetching from '{self.url + r_url}' to '{dest}'...")
68    archive_path = self.wget(r_url, dest, debug=debug) 
69    is_binary = binaryornot_check.is_binary(str(archive_path))
70    if not is_binary:
71        fail_msg = f"Failed to download binary for plugin '{name}'."
72        try:
73            with open(archive_path, 'r') as f:
74                j = json.load(f)
75            if isinstance(j, list):
76                success, msg = tuple(j)
77            elif isinstance(j, dict) and 'detail' in j:
78                success, msg = False, fail_msg
79        except Exception as e:
80            success, msg = False, fail_msg
81        return success, msg
82    plugin = Plugin(name, archive_path=archive_path, repo_connector=self)
83    return plugin.install(skip_deps=skip_deps, force=force, debug=debug)

Download and attempt to install a plugin from the API.

def delete_plugin( self, plugin: meerschaum.Plugin, debug: bool = False) -> Tuple[bool, str]:
149def delete_plugin(
150        self,
151        plugin: meerschaum.core.Plugin,
152        debug: bool = False
153    ) -> SuccessTuple:
154    """Delete a plugin from an API repository."""
155    import json
156    r_url = plugin_r_url(plugin)
157    try:
158        response = self.delete(r_url, debug=debug)
159    except Exception as e:
160        return False, f"Failed to delete plugin '{plugin}'."
161
162    try:
163        success, msg = json.loads(response.text)
164    except Exception as e:
165        return False, response.text
166
167    return success, msg

Delete a plugin from an API repository.

def get_plugins( self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False) -> Sequence[str]:
 85def get_plugins(
 86        self,
 87        user_id : Optional[int] = None,
 88        search_term : Optional[str] = None,
 89        debug : bool = False
 90    ) -> Sequence[str]:
 91    """Return a list of registered plugin names.
 92
 93    Parameters
 94    ----------
 95    user_id :
 96        If specified, return all plugins from a certain user.
 97    user_id : Optional[int] :
 98         (Default value = None)
 99    search_term : Optional[str] :
100         (Default value = None)
101    debug : bool :
102         (Default value = False)
103
104    Returns
105    -------
106
107    """
108    import json
109    from meerschaum.utils.warnings import warn, error
110    from meerschaum.config.static import STATIC_CONFIG
111    response = self.get(
112        STATIC_CONFIG['api']['endpoints']['plugins'],
113        params = {'user_id' : user_id, 'search_term' : search_term},
114        use_token = True,
115        debug = debug
116    )
117    if not response:
118        return []
119    plugins = json.loads(response.text)
120    if not isinstance(plugins, list):
121        error(response.text)
122    return plugins

Return a list of registered plugin names.

Parameters
  • user_id :: If specified, return all plugins from a certain user.
  • user_id (Optional[int] :): (Default value = None)
  • search_term (Optional[str] :): (Default value = None)
  • debug (bool :): (Default value = False)
  • Returns
  • -------
def get_plugin_attributes( self, plugin: meerschaum.Plugin, debug: bool = False) -> Mapping[str, Any]:
124def get_plugin_attributes(
125        self,
126        plugin: meerschaum.core.Plugin,
127        debug: bool = False
128    ) -> Mapping[str, Any]:
129    """
130    Return a plugin's attributes.
131    """
132    import json
133    from meerschaum.utils.warnings import warn, error
134    r_url = plugin_r_url(plugin) + '/attributes'
135    response = self.get(r_url, use_token=True, debug=debug)
136    attributes = response.json()
137    if isinstance(attributes, str) and attributes and attributes[0] == '{':
138        try:
139            attributes = json.loads(attributes)
140        except Exception as e:
141            pass
142    if not isinstance(attributes, dict):
143        error(response.text)
144    elif not response and 'detail' in attributes:
145        warn(attributes['detail'])
146        return {}
147    return attributes

Return a plugin's attributes.

def login( self, debug: bool = False, warn: bool = True, **kw: Any) -> Tuple[bool, str]:
19def login(
20    self,
21    debug: bool = False,
22    warn: bool = True,
23    **kw: Any
24) -> SuccessTuple:
25    """Log in and set the session token."""
26    try:
27        login_data = {
28            'username': self.username,
29            'password': self.password,
30        }
31    except AttributeError:
32        return False, f"Please login with the command `login {self}`."
33
34    response = self.post(
35        STATIC_CONFIG['api']['endpoints']['login'],
36        data=login_data,
37        use_token=False,
38        debug=debug,
39    )
40    if response:
41        msg = f"Successfully logged into '{self}' as user '{login_data['username']}'."
42        self._token = json.loads(response.text)['access_token']
43        self._expires = datetime.datetime.strptime(
44            json.loads(response.text)['expires'], 
45            '%Y-%m-%dT%H:%M:%S.%f'
46        )
47    else:
48        msg = (
49            f"Failed to log into '{self}' as user '{login_data['username']}'.\n" +
50            f"    Please verify login details for connector '{self}'."
51        )
52        if warn and not self.__dict__.get('_emitted_warning', False):
53            _warn(msg, stack=False)
54            self._emitted_warning = True
55
56    return response.__bool__(), msg

Log in and set the session token.

def test_connection(self, **kw: Any) -> Optional[bool]:
59def test_connection(
60    self,
61    **kw: Any
62) -> Union[bool, None]:
63    """Test if a successful connection to the API may be made."""
64    from meerschaum.connectors.poll import retry_connect
65    _default_kw = {
66        'max_retries': 1, 'retry_wait': 0, 'warn': False,
67        'connector': self, 'enforce_chaining': False,
68        'enforce_login': False,
69    }
70    _default_kw.update(kw)
71    try:
72        return retry_connect(**_default_kw)
73    except Exception:
74        return False

Test if a successful connection to the API may be made.

def register_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
65def register_user(
66        self,
67        user: 'meerschaum.core.User',
68        debug: bool = False,
69        **kw: Any
70    ) -> SuccessTuple:
71    """Register a new user."""
72    import json
73    from meerschaum.config.static import STATIC_CONFIG
74    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/register"
75    data = {
76        'username': user.username,
77        'password': user.password,
78        'attributes': json.dumps(user.attributes),
79    }
80    if user.type:
81        data['type'] = user.type
82    if user.email:
83        data['email'] = user.email
84    response = self.post(r_url, data=data, debug=debug)
85    try:
86        _json = json.loads(response.text)
87        if isinstance(_json, dict) and 'detail' in _json:
88            return False, _json['detail']
89        success_tuple = tuple(_json)
90    except Exception:
91        msg = response.text if response else f"Failed to register user '{user}'."
92        return False, msg
93
94    return tuple(success_tuple)

Register a new user.

def get_user_id( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[int]:
 97def get_user_id(
 98        self,
 99        user: 'meerschaum.core.User',
100        debug: bool = False,
101        **kw: Any
102    ) -> Optional[int]:
103    """Get a user's ID."""
104    from meerschaum.config.static import STATIC_CONFIG
105    import json
106    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/id"
107    response = self.get(r_url, debug=debug, **kw)
108    try:
109        user_id = int(json.loads(response.text))
110    except Exception as e:
111        user_id = None
112    return user_id

Get a user's ID.

def get_users(self, debug: bool = False, **kw: Any) -> List[str]:
13def get_users(
14        self,
15        debug: bool = False,
16        **kw : Any
17    ) -> List[str]:
18    """
19    Return a list of registered usernames.
20    """
21    from meerschaum.config.static import STATIC_CONFIG
22    import json
23    response = self.get(
24        f"{STATIC_CONFIG['api']['endpoints']['users']}",
25        debug = debug,
26        use_token = True,
27    )
28    if not response:
29        return []
30    try:
31        return response.json()
32    except Exception as e:
33        return []

Return a list of registered usernames.

def edit_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
35def edit_user(
36        self,
37        user: 'meerschaum.core.User',
38        debug: bool = False,
39        **kw: Any
40    ) -> SuccessTuple:
41    """Edit an existing user."""
42    import json
43    from meerschaum.config.static import STATIC_CONFIG
44    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/edit"
45    data = {
46        'username': user.username,
47        'password': user.password,
48        'type': user.type,
49        'email': user.email,
50        'attributes': json.dumps(user.attributes),
51    }
52    response = self.post(r_url, data=data, debug=debug)
53    try:
54        _json = json.loads(response.text)
55        if isinstance(_json, dict) and 'detail' in _json:
56            return False, _json['detail']
57        success_tuple = tuple(_json)
58    except Exception as e:
59        msg = response.text if response else f"Failed to edit user '{user}'."
60        return False, msg
61
62    return tuple(success_tuple)

Edit an existing user.

def delete_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
114def delete_user(
115        self,
116        user: 'meerschaum.core.User',
117        debug: bool = False,
118        **kw: Any
119    ) -> SuccessTuple:
120    """Delete a user."""
121    from meerschaum.config.static import STATIC_CONFIG
122    import json
123    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}"
124    response = self.delete(r_url, debug=debug)
125    try:
126        _json = json.loads(response.text)
127        if isinstance(_json, dict) and 'detail' in _json:
128            return False, _json['detail']
129        success_tuple = tuple(_json)
130    except Exception as e:
131        success_tuple = False, f"Failed to delete user '{user.username}'."
132    return success_tuple

Delete a user.

def get_user_password_hash( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
155def get_user_password_hash(
156        self,
157        user: 'meerschaum.core.User',
158        debug: bool = False,
159        **kw: Any
160    ) -> Optional[str]:
161    """If configured, get a user's password hash."""
162    from meerschaum.config.static import STATIC_CONFIG
163    r_url = STATIC_CONFIG['api']['endpoints']['users'] + '/' + user.username + '/password_hash'
164    response = self.get(r_url, debug=debug, **kw)
165    if not response:
166        return None
167    return response.json()

If configured, get a user's password hash.

def get_user_type( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
169def get_user_type(
170        self,
171        user: 'meerschaum.core.User',
172        debug: bool = False,
173        **kw: Any
174    ) -> Optional[str]:
175    """If configured, get a user's type."""
176    from meerschaum.config.static import STATIC_CONFIG
177    r_url = STATIC_CONFIG['api']['endpoints']['users'] + '/' + user.username + '/type'
178    response = self.get(r_url, debug=debug, **kw)
179    if not response:
180        return None
181    return response.json()

If configured, get a user's type.

def get_user_attributes( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw) -> int:
134def get_user_attributes(
135        self,
136        user: 'meerschaum.core.User',
137        debug: bool = False,
138        **kw
139    ) -> int:
140    """Get a user's attributes."""
141    from meerschaum.config.static import STATIC_CONFIG
142    import json
143    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/attributes"
144    response = self.get(r_url, debug=debug, **kw)
145    try:
146        attributes = json.loads(response.text)
147    except Exception as e:
148        attributes = None
149    return attributes

Get a user's attributes.

@classmethod
def from_uri( cls, uri: str, label: Optional[str] = None, as_dict: bool = False) -> Union[APIConnector, Dict[str, Union[str, int]]]:
13@classmethod
14def from_uri(
15    cls,
16    uri: str,
17    label: Optional[str] = None,
18    as_dict: bool = False,
19) -> Union[
20        'meerschaum.connectors.APIConnector',
21        Dict[str, Union[str, int]],
22    ]:
23    """
24    Create a new APIConnector from a URI string.
25
26    Parameters
27    ----------
28    uri: str
29        The URI connection string.
30
31    label: Optional[str], default None
32        If provided, use this as the connector label.
33        Otherwise use the determined database name.
34
35    as_dict: bool, default False
36        If `True`, return a dictionary of the keyword arguments
37        necessary to create a new `APIConnector`, otherwise create a new object.
38
39    Returns
40    -------
41    A new APIConnector object or a dictionary of attributes (if `as_dict` is `True`).
42    """
43    from meerschaum.connectors.sql import SQLConnector
44    params = SQLConnector.parse_uri(uri)
45    if 'host' not in params:
46        error("No host was found in the provided URI.")
47    params['protocol'] = params.pop('flavor')
48    params['label'] = label or (
49        (
50            (params['username'] + '@' if 'username' in params else '')
51            + params['host']
52        ).lower()
53    )
54
55    return cls(**params) if not as_dict else params

Create a new APIConnector from a URI string.

Parameters
  • uri (str): The URI connection string.
  • label (Optional[str], default None): If provided, use this as the connector label. Otherwise use the determined database name.
  • as_dict (bool, default False): If True, return a dictionary of the keyword arguments necessary to create a new APIConnector, otherwise create a new object.
Returns
  • A new APIConnector object or a dictionary of attributes (if as_dict is True).
def get_jobs(self, debug: bool = False) -> Dict[str, meerschaum.Job]:
28def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
29    """
30    Return a dictionary of remote jobs.
31    """
32    response = self.get(JOBS_ENDPOINT, debug=debug)
33    if not response:
34        warn(f"Failed to get remote jobs from {self}.")
35        return {}
36    return {
37        name: Job(
38            name,
39            job_meta['sysargs'],
40            executor_keys=str(self),
41            _properties=job_meta['daemon']['properties']
42        )
43        for name, job_meta in response.json().items()
44    }

Return a dictionary of remote jobs.

def get_job(self, name: str, debug: bool = False) -> meerschaum.Job:
47def get_job(self, name: str, debug: bool = False) -> Job:
48    """
49    Return a single Job object.
50    """
51    metadata = self.get_job_metadata(name, debug=debug)
52    if not metadata:
53        raise ValueError(f"Job '{name}' does not exist.")
54
55    return Job(
56        name,
57        metadata['sysargs'],
58        executor_keys=str(self),
59        _properties=metadata['daemon']['properties'],
60    )

Return a single Job object.

def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
 63def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
 64    """
 65    Return the metadata for a single job.
 66    """
 67    now = time.perf_counter()
 68    _job_metadata_cache = self.__dict__.get('_job_metadata_cache', None)
 69    _job_metadata_timestamp = (
 70        _job_metadata_cache.get(name, {}).get('timestamp', None)
 71    ) if _job_metadata_cache is not None else None
 72
 73    if (
 74        _job_metadata_timestamp is not None
 75        and (now - _job_metadata_timestamp) < JOB_METADATA_CACHE_SECONDS
 76    ):
 77        if debug:
 78            dprint(f"Returning cached metadata for job '{name}'.")
 79        return _job_metadata_cache[name]['metadata']
 80
 81    response = self.get(JOBS_ENDPOINT + f"/{name}", debug=debug)
 82    if not response:
 83        if debug:
 84            msg = (
 85                response.json()['detail']
 86                if 'detail' in response.text
 87                else response.text
 88            )
 89            warn(f"Failed to get metadata for job '{name}':\n{msg}")
 90        return {}
 91
 92    metadata = response.json()
 93    if _job_metadata_cache is None:
 94        self._job_metadata_cache = {}
 95
 96    self._job_metadata_cache[name] = {
 97        'timestamp': now,
 98        'metadata': metadata,
 99    }
100    return metadata

Return the metadata for a single job.

def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
102def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
103    """
104    Return the daemon properties for a single job.
105    """
106    metadata = self.get_job_metadata(name, debug=debug)
107    return metadata.get('daemon', {}).get('properties', {})

Return the daemon properties for a single job.

def get_job_exists(self, name: str, debug: bool = False) -> bool:
149def get_job_exists(self, name: str, debug: bool = False) -> bool:
150    """
151    Return whether a job exists.
152    """
153    response = self.get(JOBS_ENDPOINT + f'/{name}/exists', debug=debug)
154    if not response:
155        warn(f"Failed to determine whether job '{name}' exists.")
156        return False
157
158    return response.json()

Return whether a job exists.

def delete_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
161def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
162    """
163    Delete a job.
164    """
165    response = self.delete(JOBS_ENDPOINT + f"/{name}", debug=debug)
166    if not response:
167        if 'detail' in response.text:
168            return False, response.json()['detail']
169
170        return False, response.text
171
172    return tuple(response.json())

Delete a job.

def start_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
175def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
176    """
177    Start a job.
178    """
179    response = self.post(JOBS_ENDPOINT + f"/{name}/start", debug=debug)
180    if not response:
181        if 'detail' in response.text:
182            return False, response.json()['detail']
183        return False, response.text
184
185    return tuple(response.json())

Start a job.

def create_job( self, name: str, sysargs: List[str], properties: Optional[Dict[str, str]] = None, debug: bool = False) -> Tuple[bool, str]:
188def create_job(
189    self,
190    name: str,
191    sysargs: List[str],
192    properties: Optional[Dict[str, str]] = None,
193    debug: bool = False,
194) -> SuccessTuple:
195    """
196    Create a job.
197    """
198    response = self.post(
199        JOBS_ENDPOINT + f"/{name}",
200        json={
201            'sysargs': sysargs,
202            'properties': properties,
203        },
204        debug=debug,
205    )
206    if not response:
207        if 'detail' in response.text:
208            return False, response.json()['detail']
209        return False, response.text
210
211    return tuple(response.json())

Create a job.

def stop_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
214def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
215    """
216    Stop a job.
217    """
218    response = self.post(JOBS_ENDPOINT + f"/{name}/stop", debug=debug)
219    if not response:
220        if 'detail' in response.text:
221            return False, response.json()['detail']
222        return False, response.text
223
224    return tuple(response.json())

Stop a job.

def pause_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
227def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
228    """
229    Pause a job.
230    """
231    response = self.post(JOBS_ENDPOINT + f"/{name}/pause", debug=debug)
232    if not response:
233        if 'detail' in response.text:
234            return False, response.json()['detail']
235        return False, response.text
236
237    return tuple(response.json())

Pause a job.

def get_logs(self, name: str, debug: bool = False) -> str:
240def get_logs(self, name: str, debug: bool = False) -> str:
241    """
242    Return the logs for a job.
243    """
244    response = self.get(LOGS_ENDPOINT + f"/{name}")
245    if not response:
246        raise ValueError(f"Cannot fetch logs for job '{name}':\n{response.text}")
247
248    return response.json()

Return the logs for a job.

def get_job_stop_time(self, name: str, debug: bool = False) -> Optional[datetime.datetime]:
251def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
252    """
253    Return the job's manual stop time.
254    """
255    response = self.get(JOBS_ENDPOINT + f"/{name}/stop_time")
256    if not response:
257        warn(f"Failed to get stop time for job '{name}':\n{response.text}")
258        return None
259
260    data = response.json()
261    if data is None:
262        return None
263
264    return datetime.fromisoformat(data)

Return the job's manual stop time.

def monitor_logs( self, name: str, callback_function: Callable[[Any], Any], input_callback_function: Callable[[NoneType], str], stop_callback_function: Callable[[NoneType], str], stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False):
348def monitor_logs(
349    self,
350    name: str,
351    callback_function: Callable[[Any], Any],
352    input_callback_function: Callable[[None], str],
353    stop_callback_function: Callable[[None], str],
354    stop_on_exit: bool = False,
355    strip_timestamps: bool = False,
356    accept_input: bool = True,
357    debug: bool = False,
358):
359    """
360    Monitor a job's log files and execute a callback with the changes.
361    """
362    return asyncio.run(
363        self.monitor_logs_async(
364            name,
365            callback_function,
366            input_callback_function=input_callback_function,
367            stop_callback_function=stop_callback_function,
368            stop_on_exit=stop_on_exit,
369            strip_timestamps=strip_timestamps,
370            accept_input=accept_input,
371            debug=debug
372        )
373    )

Monitor a job's log files and execute a callback with the changes.

async def monitor_logs_async( self, name: str, callback_function: Callable[[Any], Any], input_callback_function: Callable[[], str], stop_callback_function: Callable[[Tuple[bool, str]], str], stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False):
267async def monitor_logs_async(
268    self,
269    name: str,
270    callback_function: Callable[[Any], Any],
271    input_callback_function: Callable[[], str],
272    stop_callback_function: Callable[[SuccessTuple], str],
273    stop_on_exit: bool = False,
274    strip_timestamps: bool = False,
275    accept_input: bool = True,
276    debug: bool = False,
277):
278    """
279    Monitor a job's log files and await a callback with the changes.
280    """
281    import traceback
282    from meerschaum.jobs import StopMonitoringLogs
283    from meerschaum.utils.formatting._jobs import strip_timestamp_from_line
284
285    websockets, websockets_exceptions = mrsm.attempt_import('websockets', 'websockets.exceptions')
286    protocol = 'ws' if self.URI.startswith('http://') else 'wss'
287    port = self.port if 'port' in self.__dict__ else ''
288    uri = f"{protocol}://{self.host}:{port}{LOGS_ENDPOINT}/{name}/ws"
289
290    async def _stdin_callback(client):
291        if input_callback_function is None:
292            return
293
294        if asyncio.iscoroutinefunction(input_callback_function):
295            data = await input_callback_function()
296        else:
297            data = input_callback_function()
298
299        await client.send(data)
300
301    async def _stop_callback(client):
302        try:
303            result = tuple(json.loads(await client.recv()))
304        except Exception as e:
305            warn(traceback.format_exc())
306            result = False, str(e)
307
308        if stop_callback_function is not None:
309            if asyncio.iscoroutinefunction(stop_callback_function):
310                await stop_callback_function(result)
311            else:
312                stop_callback_function(result)
313
314        if stop_on_exit:
315            raise StopMonitoringLogs
316
317    message_callbacks = {
318        JOBS_STDIN_MESSAGE: _stdin_callback,
319        JOBS_STOP_MESSAGE: _stop_callback,
320    }
321
322    async with websockets.connect(uri) as websocket:
323        try:
324            await websocket.send(self.token or 'no-login')
325        except websockets_exceptions.ConnectionClosedOK:
326            pass
327
328        while True:
329            try:
330                response = await websocket.recv()
331                callback = message_callbacks.get(response, None)
332                if callback is not None:
333                    await callback(websocket)
334                    continue
335
336                if strip_timestamps:
337                    response = strip_timestamp_from_line(response)
338
339                if asyncio.iscoroutinefunction(callback_function):
340                    await callback_function(response)
341                else:
342                    callback_function(response)
343            except (KeyboardInterrupt, StopMonitoringLogs):
344                await websocket.close()
345                break

Monitor a job's log files and await a callback with the changes.

def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
375def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
376    """
377    Return whether a remote job is blocking on stdin.
378    """
379    response = self.get(JOBS_ENDPOINT + f'/{name}/is_blocking_on_stdin', debug=debug)
380    if not response:
381        return False
382
383    return response.json()

Return whether a remote job is blocking on stdin.

def get_job_began(self, name: str, debug: bool = False) -> Optional[str]:
116def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
117    """
118    Return a job's `began` timestamp, if it exists.
119    """
120    properties = self.get_job_properties(name, debug=debug)
121    began_str = properties.get('daemon', {}).get('began', None)
122    if began_str is None:
123        return None
124
125    return began_str

Return a job's began timestamp, if it exists.

def get_job_ended(self, name: str, debug: bool = False) -> Optional[str]:
127def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
128    """
129    Return a job's `ended` timestamp, if it exists.
130    """
131    properties = self.get_job_properties(name, debug=debug)
132    ended_str = properties.get('daemon', {}).get('ended', None)
133    if ended_str is None:
134        return None
135
136    return ended_str

Return a job's ended timestamp, if it exists.

def get_job_paused(self, name: str, debug: bool = False) -> Optional[str]:
138def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
139    """
140    Return a job's `paused` timestamp, if it exists.
141    """
142    properties = self.get_job_properties(name, debug=debug)
143    paused_str = properties.get('daemon', {}).get('paused', None)
144    if paused_str is None:
145        return None
146
147    return paused_str

Return a job's paused timestamp, if it exists.

def get_job_status(self, name: str, debug: bool = False) -> str:
109def get_job_status(self, name: str, debug: bool = False) -> str:
110    """
111    Return the job's status.
112    """
113    metadata = self.get_job_metadata(name, debug=debug)
114    return metadata.get('status', 'stopped')

Return the job's status.