meerschaum.connectors.api

Interact with the Meerschaum API (send commands, pull data, etc.)

 1#! /usr/bin/env python
 2# -*- coding: utf-8 -*-
 3# vim:fenc=utf-8
 4
 5"""
 6Interact with the Meerschaum API (send commands, pull data, etc.)
 7"""
 8
 9from meerschaum.connectors.api._APIConnector import APIConnector
10
11__all__ = ('APIConnector',)
class APIConnector(meerschaum.connectors._Connector.Connector):
 20class APIConnector(Connector):
 21    """
 22    Connect to a Meerschaum API instance.
 23    """
 24
 25    IS_INSTANCE: bool = True
 26    IS_THREAD_SAFE: bool = False
 27
 28    OPTIONAL_ATTRIBUTES: List[str] = ['port']
 29
 30    from ._request import (
 31        make_request,
 32        get,
 33        post,
 34        put,
 35        patch,
 36        delete,
 37        wget,
 38    )
 39    from ._actions import (
 40        get_actions,
 41        do_action,
 42        do_action_async,
 43        do_action_legacy,
 44    )
 45    from ._misc import get_mrsm_version, get_chaining_status
 46    from ._pipes import (
 47        register_pipe,
 48        fetch_pipes_keys,
 49        edit_pipe,
 50        sync_pipe,
 51        delete_pipe,
 52        get_pipe_data,
 53        get_pipe_id,
 54        get_pipe_attributes,
 55        get_sync_time,
 56        pipe_exists,
 57        create_metadata,
 58        get_pipe_rowcount,
 59        drop_pipe,
 60        clear_pipe,
 61        get_pipe_columns_types,
 62    )
 63    from ._fetch import fetch
 64    from ._plugins import (
 65        register_plugin,
 66        install_plugin,
 67        delete_plugin,
 68        get_plugins,
 69        get_plugin_attributes,
 70    )
 71    from ._login import login, test_connection
 72    from ._users import (
 73        register_user,
 74        get_user_id,
 75        get_users,
 76        edit_user,
 77        delete_user,
 78        get_user_password_hash,
 79        get_user_type,
 80        get_user_attributes,
 81    )
 82    from ._uri import from_uri
 83    from ._jobs import (
 84        get_jobs,
 85        get_job,
 86        get_job_metadata,
 87        get_job_properties,
 88        get_job_exists,
 89        delete_job,
 90        start_job,
 91        create_job,
 92        stop_job,
 93        pause_job,
 94        get_logs,
 95        get_job_stop_time,
 96        monitor_logs,
 97        monitor_logs_async,
 98        get_job_is_blocking_on_stdin,
 99        get_job_began,
100        get_job_ended,
101        get_job_paused,
102        get_job_status,
103    )
104
105    def __init__(
106        self,
107        label: Optional[str] = None,
108        wait: bool = False,
109        debug: bool = False,
110        **kw
111    ):
112        if 'uri' in kw:
113            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
114            label = label or from_uri_params.get('label', None)
115            _ = from_uri_params.pop('label', None)
116            kw.update(from_uri_params)
117
118        super().__init__('api', label=label, **kw)
119        if 'protocol' not in self.__dict__:
120            self.protocol = (
121                'https' if self.__dict__.get('uri', '').startswith('https')
122                else 'http'
123            )
124
125        if 'uri' not in self.__dict__:
126            self.verify_attributes(required_attributes)
127        else:
128            from meerschaum.connectors.sql import SQLConnector
129            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
130            if 'host' not in conn_attrs:
131                raise Exception(f"Invalid URI for '{self}'.")
132            self.__dict__.update(conn_attrs)
133
134        self.url = (
135            self.protocol + '://' +
136            self.host
137            + (
138                (':' + str(self.port))
139                if self.__dict__.get('port', None)
140                else ''
141            )
142        )
143        self._token = None
144        self._expires = None
145        self._session = None
146
147
148    @property
149    def URI(self) -> str:
150        """
151        Return the fully qualified URI.
152        """
153        username = self.__dict__.get('username', None)
154        password = self.__dict__.get('password', None)
155        creds = (username + ':' + password + '@') if username and password else ''
156        return (
157            self.protocol
158            + '://'
159            + creds
160            + self.host
161            + (
162                (':' + str(self.port))
163                if self.__dict__.get('port', None)
164                else ''
165            )
166        )
167
168
169    @property
170    def session(self):
171        if self._session is None:
172            certifi = attempt_import('certifi', lazy=False)
173            requests = attempt_import('requests', lazy=False)
174            if requests:
175                self._session = requests.Session()
176            if self._session is None:
177                error(f"Failed to import requests. Is requests installed?")
178        return self._session
179
180    @property
181    def token(self):
182        expired = (
183            True if self._expires is None else (
184                (
185                    self._expires
186                    <
187                    datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
188                )
189            )
190        )
191
192        if self._token is None or expired:
193            success, msg = self.login()
194            if not success:
195                warn(msg, stack=False)
196        return self._token

Connect to a Meerschaum API instance.

APIConnector( label: Optional[str] = None, wait: bool = False, debug: bool = False, **kw)
105    def __init__(
106        self,
107        label: Optional[str] = None,
108        wait: bool = False,
109        debug: bool = False,
110        **kw
111    ):
112        if 'uri' in kw:
113            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
114            label = label or from_uri_params.get('label', None)
115            _ = from_uri_params.pop('label', None)
116            kw.update(from_uri_params)
117
118        super().__init__('api', label=label, **kw)
119        if 'protocol' not in self.__dict__:
120            self.protocol = (
121                'https' if self.__dict__.get('uri', '').startswith('https')
122                else 'http'
123            )
124
125        if 'uri' not in self.__dict__:
126            self.verify_attributes(required_attributes)
127        else:
128            from meerschaum.connectors.sql import SQLConnector
129            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
130            if 'host' not in conn_attrs:
131                raise Exception(f"Invalid URI for '{self}'.")
132            self.__dict__.update(conn_attrs)
133
134        self.url = (
135            self.protocol + '://' +
136            self.host
137            + (
138                (':' + str(self.port))
139                if self.__dict__.get('port', None)
140                else ''
141            )
142        )
143        self._token = None
144        self._expires = None
145        self._session = None

Set the given keyword arguments as attributes.

Parameters
  • type (str): The type of the connector (e.g. sql, api, plugin).
  • label (str): The label for the connector.
Examples

Run mrsm edit config and to edit connectors in the YAML file:

meerschaum:
    connections:
        {type}:
            {label}:
                ### attributes go here
IS_INSTANCE: bool = True
IS_THREAD_SAFE: bool = False
OPTIONAL_ATTRIBUTES: List[str] = ['port']
url
URI: str
148    @property
149    def URI(self) -> str:
150        """
151        Return the fully qualified URI.
152        """
153        username = self.__dict__.get('username', None)
154        password = self.__dict__.get('password', None)
155        creds = (username + ':' + password + '@') if username and password else ''
156        return (
157            self.protocol
158            + '://'
159            + creds
160            + self.host
161            + (
162                (':' + str(self.port))
163                if self.__dict__.get('port', None)
164                else ''
165            )
166        )

Return the fully qualified URI.

session
169    @property
170    def session(self):
171        if self._session is None:
172            certifi = attempt_import('certifi', lazy=False)
173            requests = attempt_import('requests', lazy=False)
174            if requests:
175                self._session = requests.Session()
176            if self._session is None:
177                error(f"Failed to import requests. Is requests installed?")
178        return self._session
token
180    @property
181    def token(self):
182        expired = (
183            True if self._expires is None else (
184                (
185                    self._expires
186                    <
187                    datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
188                )
189            )
190        )
191
192        if self._token is None or expired:
193            success, msg = self.login()
194            if not success:
195                warn(msg, stack=False)
196        return self._token
def make_request( self, method: str, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kwargs: Any) -> requests.models.Response:
 28def make_request(
 29    self,
 30    method: str,
 31    r_url: str,
 32    headers: Optional[Dict[str, Any]] = None,
 33    use_token: bool = True,
 34    debug: bool = False,
 35    **kwargs: Any
 36) -> 'requests.Response':
 37    """
 38    Make a request to this APIConnector's endpoint using the in-memory session.
 39
 40    Parameters
 41    ----------
 42    method: str
 43        The kind of request to make.
 44        Accepted values:
 45        - `'GET'`
 46        - `'OPTIONS'`
 47        - `'HEAD'`
 48        - `'POST'`
 49        - `'PUT'`
 50        - `'PATCH'`
 51        - `'DELETE'`
 52
 53    r_url: str
 54        The relative URL for the endpoint (e.g. `'/pipes'`).
 55
 56    headers: Optional[Dict[str, Any]], default None
 57        The headers to use for the request.
 58        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
 59
 60    use_token: bool, default True
 61        If `True`, add the authorization token to the headers.
 62
 63    debug: bool, default False
 64        Verbosity toggle.
 65
 66    kwargs: Any
 67        All other keyword arguments are passed to `requests.request`.
 68
 69    Returns
 70    -------
 71    A `requests.Reponse` object.
 72    """
 73    if method.upper() not in METHODS:
 74        raise ValueError(f"Method '{method}' is not supported.")
 75
 76    verify = self.__dict__.get('verify', None)
 77    if 'verify' not in kwargs and isinstance(verify, bool):
 78        kwargs['verify'] = verify
 79
 80    headers = (
 81        copy.deepcopy(headers)
 82        if isinstance(headers, dict)
 83        else {}
 84    )
 85
 86    if use_token:
 87        headers.update({'Authorization': f'Bearer {self.token}'})
 88
 89    if 'timeout' not in kwargs:
 90        kwargs['timeout'] = STATIC_CONFIG['api']['default_timeout']
 91
 92    request_url = urllib.parse.urljoin(self.url, r_url)
 93    if debug:
 94        dprint(f"[{self}] Sending a '{method.upper()}' request to {request_url}")
 95
 96    return self.session.request(
 97        method.upper(),
 98        request_url,
 99        headers = headers,
100        **kwargs
101    )

Make a request to this APIConnector's endpoint using the in-memory session.

Parameters
  • method (str): The kind of request to make. Accepted values:
    • 'GET'
    • 'OPTIONS'
    • 'HEAD'
    • 'POST'
    • 'PUT'
    • 'PATCH'
    • 'DELETE'
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def get(self, r_url: str, **kwargs: Any) -> requests.models.Response:
104def get(self, r_url: str, **kwargs: Any) -> 'requests.Response':
105    """
106    Wrapper for `requests.get`.
107
108    Parameters
109    ----------
110    r_url: str
111        The relative URL for the endpoint (e.g. `'/pipes'`).
112
113    headers: Optional[Dict[str, Any]], default None
114        The headers to use for the request.
115        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
116
117    use_token: bool, default True
118        If `True`, add the authorization token to the headers.
119
120    debug: bool, default False
121        Verbosity toggle.
122
123    kwargs: Any
124        All other keyword arguments are passed to `requests.request`.
125
126    Returns
127    -------
128    A `requests.Reponse` object.
129
130    """
131    return self.make_request('GET', r_url, **kwargs)

Wrapper for requests.get.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def post(self, r_url: str, **kwargs: Any) -> requests.models.Response:
134def post(self, r_url: str, **kwargs: Any) -> 'requests.Response':
135    """
136    Wrapper for `requests.post`.
137
138    Parameters
139    ----------
140    r_url: str
141        The relative URL for the endpoint (e.g. `'/pipes'`).
142
143    headers: Optional[Dict[str, Any]], default None
144        The headers to use for the request.
145        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
146
147    use_token: bool, default True
148        If `True`, add the authorization token to the headers.
149
150    debug: bool, default False
151        Verbosity toggle.
152
153    kwargs: Any
154        All other keyword arguments are passed to `requests.request`.
155
156    Returns
157    -------
158    A `requests.Reponse` object.
159
160    """
161    return self.make_request('POST', r_url, **kwargs)

Wrapper for requests.post.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def put(self, r_url: str, **kwargs: Any) -> requests.models.Response:
193def put(self, r_url: str, **kwargs: Any) -> 'requests.Response':
194    """
195    Wrapper for `requests.put`.
196
197    Parameters
198    ----------
199    r_url: str
200        The relative URL for the endpoint (e.g. `'/pipes'`).
201
202    headers: Optional[Dict[str, Any]], default None
203        The headers to use for the request.
204        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
205
206    use_token: bool, default True
207        If `True`, add the authorization token to the headers.
208
209    debug: bool, default False
210        Verbosity toggle.
211
212    kwargs: Any
213        All other keyword arguments are passed to `requests.request`.
214
215    Returns
216    -------
217    A `requests.Reponse` object.
218    """
219    return self.make_request('PUT', r_url, **kwargs)

Wrapper for requests.put.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def patch(self, r_url: str, **kwargs: Any) -> requests.models.Response:
164def patch(self, r_url: str, **kwargs: Any) -> 'requests.Response':
165    """
166    Wrapper for `requests.patch`.
167
168    Parameters
169    ----------
170    r_url: str
171        The relative URL for the endpoint (e.g. `'/pipes'`).
172
173    headers: Optional[Dict[str, Any]], default None
174        The headers to use for the request.
175        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
176
177    use_token: bool, default True
178        If `True`, add the authorization token to the headers.
179
180    debug: bool, default False
181        Verbosity toggle.
182
183    kwargs: Any
184        All other keyword arguments are passed to `requests.request`.
185
186    Returns
187    -------
188    A `requests.Reponse` object.
189    """
190    return self.make_request('PATCH', r_url, **kwargs)

Wrapper for requests.patch.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def delete(self, r_url: str, **kwargs: Any) -> requests.models.Response:
222def delete(self, r_url: str, **kwargs: Any) -> 'requests.Response':
223    """
224    Wrapper for `requests.delete`.
225
226    Parameters
227    ----------
228    r_url: str
229        The relative URL for the endpoint (e.g. `'/pipes'`).
230
231    headers: Optional[Dict[str, Any]], default None
232        The headers to use for the request.
233        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
234
235    use_token: bool, default True
236        If `True`, add the authorization token to the headers.
237
238    debug: bool, default False
239        Verbosity toggle.
240
241    kwargs: Any
242        All other keyword arguments are passed to `requests.request`.
243
244    Returns
245    -------
246    A `requests.Reponse` object.
247    """
248    return self.make_request('DELETE', r_url, **kwargs)

Wrapper for requests.delete.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Reponse object.
def wget( self, r_url: str, dest: Union[str, pathlib.Path, NoneType] = None, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) -> pathlib.Path:
251def wget(
252        self,
253        r_url: str,
254        dest: Optional[Union[str, pathlib.Path]] = None,
255        headers: Optional[Dict[str, Any]] = None,
256        use_token: bool = True,
257        debug: bool = False,
258        **kw: Any
259    ) -> pathlib.Path:
260    """Mimic wget with requests.
261    """
262    from meerschaum.utils.misc import wget
263    if headers is None:
264        headers = {}
265    if use_token:
266        headers.update({'Authorization': f'Bearer {self.token}'})
267    request_url = urllib.parse.urljoin(self.url, r_url)
268    if debug:
269        dprint(
270            f"[{self}] Downloading {request_url}"
271            + (f' to {dest}' if dest is not None else '')
272            + "..."
273        )
274    return wget(request_url, dest=dest, headers=headers, **kw)

Mimic wget with requests.

def get_actions(self):
24def get_actions(self):
25    """Get available actions from the API instance."""
26    return self.get(ACTIONS_ENDPOINT)

Get available actions from the API instance.

def do_action(self, sysargs: List[str]) -> Tuple[bool, str]:
29def do_action(self, sysargs: List[str]) -> SuccessTuple:
30    """
31    Execute a Meerschaum action remotely.
32    """
33    return asyncio.run(self.do_action_async(sysargs))

Execute a Meerschaum action remotely.

async def do_action_async( self, sysargs: List[str], callback_function: Callable[[str], NoneType] = functools.partial(<built-in function print>, end='')) -> Tuple[bool, str]:
36async def do_action_async(
37    self,
38    sysargs: List[str],
39    callback_function: Callable[[str], None] = partial(print, end=''),
40) -> SuccessTuple:
41    """
42    Execute an action as a temporary remote job.
43    """
44    from meerschaum._internal.arguments import remove_api_executor_keys
45    from meerschaum.utils.misc import generate_password
46    sysargs = remove_api_executor_keys(sysargs)
47
48    job_name = TEMP_PREFIX + generate_password(12)
49    job = mrsm.Job(job_name, sysargs, executor_keys=str(self))
50
51    start_success, start_msg = job.start()
52    if not start_success:
53        return start_success, start_msg
54
55    await job.monitor_logs_async(
56        callback_function=callback_function,
57        stop_on_exit=True,
58        strip_timestamps=True,
59    )
60
61    success, msg = job.result
62    job.delete()
63    return success, msg

Execute an action as a temporary remote job.

def do_action_legacy( self, action: Optional[List[str]] = None, sysargs: Optional[List[str]] = None, debug: bool = False, **kw) -> Tuple[bool, str]:
 66def do_action_legacy(
 67    self,
 68    action: Optional[List[str]] = None,
 69    sysargs: Optional[List[str]] = None,
 70    debug: bool = False,
 71    **kw
 72) -> SuccessTuple:
 73    """
 74    NOTE: This method is deprecated.
 75    Please use `do_action()` or `do_action_async()`.
 76
 77    Execute a Meerschaum action remotely.
 78
 79    If `sysargs` are provided, parse those instead.
 80    Otherwise infer everything from keyword arguments.
 81
 82    Examples
 83    --------
 84    >>> conn = mrsm.get_connector('api:main')
 85    >>> conn.do_action(['show', 'pipes'])
 86    (True, "Success")
 87    >>> conn.do_action(['show', 'arguments'], name='test')
 88    (True, "Success")
 89    """
 90    import sys, json
 91    from meerschaum.utils.debug import dprint
 92    from meerschaum.config.static import STATIC_CONFIG
 93    from meerschaum.utils.misc import json_serialize_datetime
 94    if action is None:
 95        action = []
 96
 97    if sysargs is not None and action and action[0] == '':
 98        from meerschaum._internal.arguments import parse_arguments
 99        if debug:
100            dprint(f"Parsing sysargs:\n{sysargs}")
101        json_dict = parse_arguments(sysargs)
102    else:
103        json_dict = kw
104        json_dict['action'] = action
105        if 'noask' not in kw:
106            json_dict['noask'] = True
107        if 'yes' not in kw:
108            json_dict['yes'] = True
109        if debug:
110            json_dict['debug'] = debug
111
112    root_action = json_dict['action'][0]
113    del json_dict['action'][0]
114    r_url = f"{STATIC_CONFIG['api']['endpoints']['actions']}/{root_action}"
115    
116    if debug:
117        from meerschaum.utils.formatting import pprint
118        dprint(f"Sending data to '{self.url + r_url}':")
119        pprint(json_dict, stream=sys.stderr)
120
121    response = self.post(
122        r_url,
123        data = json.dumps(json_dict, default=json_serialize_datetime),
124        debug = debug,
125    )
126    try:
127        response_list = json.loads(response.text)
128        if isinstance(response_list, dict) and 'detail' in response_list:
129            return False, response_list['detail']
130    except Exception as e:
131        print(f"Invalid response: {response}")
132        print(e)
133        return False, response.text
134    if debug:
135        dprint(response)
136    try:
137        return response_list[0], response_list[1]
138    except Exception as e:
139        return False, f"Failed to parse result from action '{root_action}'"

NOTE: This method is deprecated. Please use do_action() or do_action_async().

Execute a Meerschaum action remotely.

If sysargs are provided, parse those instead. Otherwise infer everything from keyword arguments.

Examples
>>> conn = mrsm.get_connector('api:main')
>>> conn.do_action(['show', 'pipes'])
(True, "Success")
>>> conn.do_action(['show', 'arguments'], name='test')
(True, "Success")
def get_mrsm_version(self, **kw) -> Optional[str]:
13def get_mrsm_version(self, **kw) -> Optional[str]:
14    """
15    Return the Meerschaum version of the API instance.
16    """
17    from meerschaum.config.static import STATIC_CONFIG
18    try:
19        j = self.get(
20            STATIC_CONFIG['api']['endpoints']['version'] + '/mrsm',
21            use_token=False,
22            **kw
23        ).json()
24    except Exception as e:
25        return None
26    if isinstance(j, dict) and 'detail' in j:
27        return None
28    return j

Return the Meerschaum version of the API instance.

def get_chaining_status(self, **kw) -> Optional[bool]:
30def get_chaining_status(self, **kw) -> Optional[bool]:
31    """
32    Fetch the chaining status of the API instance.
33    """
34    from meerschaum.config.static import STATIC_CONFIG
35    try:
36        response = self.get(
37            STATIC_CONFIG['api']['endpoints']['chaining'],
38            use_token = True,
39            **kw
40        )
41        if not response:
42            return None
43    except Exception as e:
44        return None
45
46    return response.json()

Fetch the chaining status of the API instance.

def register_pipe( self, pipe: meerschaum.Pipe, debug: bool = False) -> Tuple[bool, str]:
34def register_pipe(
35    self,
36    pipe: mrsm.Pipe,
37    debug: bool = False
38) -> SuccessTuple:
39    """Submit a POST to the API to register a new Pipe object.
40    Returns a tuple of (success_bool, response_dict).
41    """
42    from meerschaum.utils.debug import dprint
43    from meerschaum.config.static import STATIC_CONFIG
44    ### NOTE: if `parameters` is supplied in the Pipe constructor,
45    ###       then `pipe.parameters` will exist and not be fetched from the database.
46    r_url = pipe_r_url(pipe)
47    response = self.post(
48        r_url + '/register',
49        json = pipe.parameters,
50        debug = debug,
51    )
52    if debug:
53        dprint(response.text)
54    if isinstance(response.json(), list):
55        response_tuple = response.__bool__(), response.json()[1]
56    elif 'detail' in response.json():
57        response_tuple = response.__bool__(), response.json()['detail']
58    else:
59        response_tuple = response.__bool__(), response.text
60    return response_tuple

Submit a POST to the API to register a new Pipe object. Returns a tuple of (success_bool, response_dict).

def fetch_pipes_keys( self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> List[Tuple[str, str, Optional[str]]]:
 93def fetch_pipes_keys(
 94    self,
 95    connector_keys: Optional[List[str]] = None,
 96    metric_keys: Optional[List[str]] = None,
 97    location_keys: Optional[List[str]] = None,
 98    tags: Optional[List[str]] = None,
 99    params: Optional[Dict[str, Any]] = None,
100    debug: bool = False
101) -> Union[List[Tuple[str, str, Union[str, None]]]]:
102    """
103    Fetch registered Pipes' keys from the API.
104    
105    Parameters
106    ----------
107    connector_keys: Optional[List[str]], default None
108        The connector keys for the query.
109
110    metric_keys: Optional[List[str]], default None
111        The metric keys for the query.
112
113    location_keys: Optional[List[str]], default None
114        The location keys for the query.
115
116    tags: Optional[List[str]], default None
117        A list of tags for the query.
118
119    params: Optional[Dict[str, Any]], default None
120        A parameters dictionary for filtering against the `pipes` table
121        (e.g. `{'connector_keys': 'plugin:foo'}`).
122        Not recommeded to be used.
123
124    debug: bool, default False
125        Verbosity toggle.
126
127    Returns
128    -------
129    A list of tuples containing pipes' keys.
130    """
131    from meerschaum.config.static import STATIC_CONFIG
132    if connector_keys is None:
133        connector_keys = []
134    if metric_keys is None:
135        metric_keys = []
136    if location_keys is None:
137        location_keys = []
138    if tags is None:
139        tags = []
140
141    r_url = STATIC_CONFIG['api']['endpoints']['pipes'] + '/keys'
142    try:
143        j = self.get(
144            r_url,
145            params = {
146                'connector_keys': json.dumps(connector_keys),
147                'metric_keys': json.dumps(metric_keys),
148                'location_keys': json.dumps(location_keys),
149                'tags': json.dumps(tags),
150                'params': json.dumps(params),
151            },
152            debug=debug
153        ).json()
154    except Exception as e:
155        error(str(e))
156
157    if 'detail' in j:
158        error(j['detail'], stack=False)
159    return [tuple(r) for r in j]

Fetch registered Pipes' keys from the API.

Parameters
  • connector_keys (Optional[List[str]], default None): The connector keys for the query.
  • metric_keys (Optional[List[str]], default None): The metric keys for the query.
  • location_keys (Optional[List[str]], default None): The location keys for the query.
  • tags (Optional[List[str]], default None): A list of tags for the query.
  • params (Optional[Dict[str, Any]], default None): A parameters dictionary for filtering against the pipes table (e.g. {'connector_keys': 'plugin:foo'}). Not recommeded to be used.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of tuples containing pipes' keys.
def edit_pipe( self, pipe: meerschaum.Pipe, patch: bool = False, debug: bool = False) -> Tuple[bool, str]:
63def edit_pipe(
64    self,
65    pipe: mrsm.Pipe,
66    patch: bool = False,
67    debug: bool = False,
68) -> SuccessTuple:
69    """Submit a PATCH to the API to edit an existing Pipe object.
70    Returns a tuple of (success_bool, response_dict).
71    """
72    from meerschaum.utils.debug import dprint
73    ### NOTE: if `parameters` is supplied in the Pipe constructor,
74    ###       then `pipe.parameters` will exist and not be fetched from the database.
75    r_url = pipe_r_url(pipe)
76    response = self.patch(
77        r_url + '/edit',
78        params = {'patch': patch,},
79        json = pipe.parameters,
80        debug = debug,
81    )
82    if debug:
83        dprint(response.text)
84    if isinstance(response.json(), list):
85        response_tuple = response.__bool__(), response.json()[1]
86    elif 'detail' in response.json():
87        response_tuple = response.__bool__(), response.json()['detail']
88    else:
89        response_tuple = response.__bool__(), response.text
90    return response_tuple

Submit a PATCH to the API to edit an existing Pipe object. Returns a tuple of (success_bool, response_dict).

def sync_pipe( self, pipe: meerschaum.Pipe, df: "Optional[Union['pd.DataFrame', Dict[Any, Any], str]]" = None, chunksize: Optional[int] = -1, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
162def sync_pipe(
163    self,
164    pipe: mrsm.Pipe,
165    df: Optional[Union['pd.DataFrame', Dict[Any, Any], str]] = None,
166    chunksize: Optional[int] = -1,
167    debug: bool = False,
168    **kw: Any
169) -> SuccessTuple:
170    """Sync a DataFrame into a Pipe."""
171    from decimal import Decimal
172    from meerschaum.utils.debug import dprint
173    from meerschaum.utils.misc import json_serialize_datetime, items_str
174    from meerschaum.config import get_config
175    from meerschaum.utils.packages import attempt_import
176    from meerschaum.utils.dataframe import get_numeric_cols, to_json
177    begin = time.time()
178    more_itertools = attempt_import('more_itertools')
179    if df is None:
180        msg = f"DataFrame is `None`. Cannot sync {pipe}."
181        return False, msg
182
183    def get_json_str(c):
184        ### allow syncing dict or JSON without needing to import pandas (for IOT devices)
185        if isinstance(c, (dict, list)):
186            return json.dumps(c, default=json_serialize_datetime)
187        return to_json(c, orient='columns')
188
189    df = json.loads(df) if isinstance(df, str) else df
190
191    _chunksize: Optional[int] = (1 if chunksize is None else (
192        get_config('system', 'connectors', 'sql', 'chunksize') if chunksize == -1
193        else chunksize
194    ))
195    keys: List[str] = list(df.columns)
196    chunks = []
197    if hasattr(df, 'index'):
198        df = df.reset_index(drop=True)
199        is_dask = 'dask' in df.__module__
200        chunks = (
201            (df.iloc[i] for i in more_itertools.chunked(df.index, _chunksize))
202            if not is_dask
203            else [partition.compute() for partition in df.partitions]
204        )
205
206        numeric_cols = get_numeric_cols(df)
207        if numeric_cols:
208            for col in numeric_cols:
209                df[col] = df[col].apply(lambda x: f'{x:f}' if isinstance(x, Decimal) else x)
210            pipe_dtypes = pipe.dtypes
211            new_numeric_cols = [
212                col
213                for col in numeric_cols
214                if pipe_dtypes.get(col, None) != 'numeric'
215            ]
216            pipe.dtypes.update({
217                col: 'numeric'
218                for col in new_numeric_cols
219            })
220            edit_success, edit_msg = pipe.edit(debug=debug)
221            if not edit_success:
222                warn(
223                    "Failed to update new numeric columns "
224                    + f"{items_str(new_numeric_cols)}:\n{edit_msg}"
225                )
226    elif isinstance(df, dict):
227        ### `_chunks` is a dict of lists of dicts.
228        ### e.g. {'a' : [ {'a':[1, 2]}, {'a':[3, 4]} ] }
229        _chunks = {k: [] for k in keys}
230        for k in keys:
231            chunk_iter = more_itertools.chunked(df[k], _chunksize)
232            for l in chunk_iter:
233                _chunks[k].append({k: l})
234
235        ### `chunks` is a list of dicts (e.g. orient by rows in pandas JSON).
236        for k, l in _chunks.items():
237            for i, c in enumerate(l):
238                try:
239                    chunks[i].update(c)
240                except IndexError:
241                    chunks.append(c)
242    elif isinstance(df, list):
243        chunks = (df[i] for i in more_itertools.chunked(df, _chunksize))
244
245    ### Send columns in case the user has defined them locally.
246    if pipe.columns:
247        kw['columns'] = json.dumps(pipe.columns)
248    r_url = pipe_r_url(pipe) + '/data'
249
250    rowcount = 0
251    num_success_chunks = 0
252    for i, c in enumerate(chunks):
253        if debug:
254            dprint(f"[{self}] Posting chunk {i} to {r_url}...")
255        if len(c) == 0:
256            if debug:
257                dprint(f"[{self}] Skipping empty chunk...")
258            continue
259        json_str = get_json_str(c)
260
261        try:
262            response = self.post(
263                r_url,
264                ### handles check_existing
265                params = kw,
266                data = json_str,
267                debug = debug
268            )
269        except Exception as e:
270            msg = f"Failed to post a chunk to {pipe}:\n{e}"
271            warn(msg)
272            return False, msg
273            
274        if not response:
275            return False, f"Failed to sync a chunk:\n{response.text}"
276
277        try:
278            j = json.loads(response.text)
279        except Exception as e:
280            return False, f"Failed to parse response from syncing {pipe}:\n{e}"
281
282        if isinstance(j, dict) and 'detail' in j:
283            return False, j['detail']
284
285        try:
286            j = tuple(j)
287        except Exception as e:
288            return False, response.text
289
290        if debug:
291            dprint("Received response: " + str(j))
292        if not j[0]:
293            return j
294
295        rowcount += len(c)
296        num_success_chunks += 1
297
298    success_tuple = True, (
299        f"It took {round(time.time() - begin, 2)} seconds to sync {rowcount} row"
300        + ('s' if rowcount != 1 else '')
301        + f" across {num_success_chunks} chunk" + ('s' if num_success_chunks != 1 else '') +
302        f" to {pipe}."
303    )
304    return success_tuple

Sync a DataFrame into a Pipe.

def delete_pipe( self, pipe: Optional[meerschaum.Pipe] = None, debug: bool = None) -> Tuple[bool, str]:
307def delete_pipe(
308    self,
309    pipe: Optional[meerschaum.Pipe] = None,
310    debug: bool = None,        
311) -> SuccessTuple:
312    """Delete a Pipe and drop its table."""
313    if pipe is None:
314        error(f"Pipe cannot be None.")
315    r_url = pipe_r_url(pipe)
316    response = self.delete(
317        r_url + '/delete',
318        debug = debug,
319    )
320    if debug:
321        dprint(response.text)
322    if isinstance(response.json(), list):
323        response_tuple = response.__bool__(), response.json()[1]
324    elif 'detail' in response.json():
325        response_tuple = response.__bool__(), response.json()['detail']
326    else:
327        response_tuple = response.__bool__(), response.text
328    return response_tuple

Delete a Pipe and drop its table.

def get_pipe_data( self, pipe: meerschaum.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[str, datetime.datetime, int, NoneType] = None, end: Union[str, datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, as_chunks: bool = False, debug: bool = False, **kw: Any) -> Optional[pandas.core.frame.DataFrame]:
331def get_pipe_data(
332    self,
333    pipe: meerschaum.Pipe,
334    select_columns: Optional[List[str]] = None,
335    omit_columns: Optional[List[str]] = None,
336    begin: Union[str, datetime, int, None] = None,
337    end: Union[str, datetime, int, None] = None,
338    params: Optional[Dict[str, Any]] = None,
339    as_chunks: bool = False,
340    debug: bool = False,
341    **kw: Any
342) -> Union[pandas.DataFrame, None]:
343    """Fetch data from the API."""
344    r_url = pipe_r_url(pipe)
345    chunks_list = []
346    while True:
347        try:
348            response = self.get(
349                r_url + "/data",
350                params={
351                    'select_columns': json.dumps(select_columns),
352                    'omit_columns': json.dumps(omit_columns),
353                    'begin': begin,
354                    'end': end,
355                    'params': json.dumps(params, default=str)
356                },
357                debug=debug
358            )
359            if not response.ok:
360                return None
361            j = response.json()
362        except Exception as e:
363            warn(f"Failed to get data for {pipe}:\n{e}")
364            return None
365        if isinstance(j, dict) and 'detail' in j:
366            return False, j['detail']
367        break
368
369    from meerschaum.utils.packages import import_pandas
370    from meerschaum.utils.dataframe import parse_df_datetimes, add_missing_cols_to_df
371    pd = import_pandas()
372    try:
373        df = pd.read_json(StringIO(response.text))
374    except Exception as e:
375        warn(f"Failed to parse response for {pipe}:\n{e}")
376        return None
377
378    if len(df.columns) == 0:
379        return add_missing_cols_to_df(df, pipe.dtypes)
380
381    df = parse_df_datetimes(
382        df,
383        ignore_cols = [
384            col
385            for col, dtype in pipe.dtypes.items()
386            if 'datetime' not in str(dtype)
387        ],
388        debug = debug,
389    )
390    return df

Fetch data from the API.

def get_pipe_id(self, pipe: 'meerschuam.Pipe', debug: bool = False) -> int:
393def get_pipe_id(
394    self,
395    pipe: meerschuam.Pipe,
396    debug: bool = False,
397) -> int:
398    """Get a Pipe's ID from the API."""
399    from meerschaum.utils.misc import is_int
400    r_url = pipe_r_url(pipe)
401    response = self.get(
402        r_url + '/id',
403        debug = debug
404    )
405    if debug:
406        dprint(f"Got pipe ID: {response.text}")
407    try:
408        if is_int(response.text):
409            return int(response.text)
410    except Exception as e:
411        warn(f"Failed to get the ID for {pipe}:\n{e}")
412    return None

Get a Pipe's ID from the API.

def get_pipe_attributes( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, Any]:
415def get_pipe_attributes(
416    self,
417    pipe: meerschaum.Pipe,
418    debug: bool = False,
419) -> Dict[str, Any]:
420    """Get a Pipe's attributes from the API
421
422    Parameters
423    ----------
424    pipe: meerschaum.Pipe
425        The pipe whose attributes we are fetching.
426        
427    Returns
428    -------
429    A dictionary of a pipe's attributes.
430    If the pipe does not exist, return an empty dictionary.
431    """
432    r_url = pipe_r_url(pipe)
433    response = self.get(r_url + '/attributes', debug=debug)
434    try:
435        return json.loads(response.text)
436    except Exception as e:
437        warn(f"Failed to get the attributes for {pipe}:\n{e}")
438    return {}

Get a Pipe's attributes from the API

Parameters
Returns
  • A dictionary of a pipe's attributes.
  • If the pipe does not exist, return an empty dictionary.
def get_sync_time( self, pipe: meerschaum.Pipe, params: Optional[Dict[str, Any]] = None, newest: bool = True, debug: bool = False) -> Union[datetime.datetime, int, NoneType]:
441def get_sync_time(
442    self,
443    pipe: 'meerschaum.Pipe',
444    params: Optional[Dict[str, Any]] = None,
445    newest: bool = True,
446    debug: bool = False,
447) -> Union[datetime, int, None]:
448    """Get a Pipe's most recent datetime value from the API.
449
450    Parameters
451    ----------
452    pipe: meerschaum.Pipe
453        The pipe to select from.
454
455    params: Optional[Dict[str, Any]], default None
456        Optional params dictionary to build the WHERE clause.
457
458    newest: bool, default True
459        If `True`, get the most recent datetime (honoring `params`).
460        If `False`, get the oldest datetime (ASC instead of DESC).
461
462    Returns
463    -------
464    The most recent (or oldest if `newest` is `False`) datetime of a pipe,
465    rounded down to the closest minute.
466    """
467    from meerschaum.utils.misc import is_int
468    from meerschaum.utils.warnings import warn
469    r_url = pipe_r_url(pipe)
470    response = self.get(
471        r_url + '/sync_time',
472        json = params,
473        params = {'newest': newest, 'debug': debug},
474        debug = debug,
475    )
476    if not response:
477        warn(f"Failed to get the sync time for {pipe}:\n" + response.text)
478        return None
479
480    j = response.json()
481    if j is None:
482        dt = None
483    else:
484        try:
485            dt = (
486                datetime.fromisoformat(j)
487                if not is_int(j)
488                else int(j)
489            )
490        except Exception as e:
491            warn(f"Failed to parse the sync time '{j}' for {pipe}:\n{e}")
492            dt = None
493    return dt

Get a Pipe's most recent datetime value from the API.

Parameters
  • pipe (meerschaum.Pipe): The pipe to select from.
  • params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause.
  • newest (bool, default True): If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
Returns
  • The most recent (or oldest if newest is False) datetime of a pipe,
  • rounded down to the closest minute.
def pipe_exists(self, pipe: meerschaum.Pipe, debug: bool = False) -> bool:
496def pipe_exists(
497    self,
498    pipe: mrsm.Pipe,
499    debug: bool = False
500) -> bool:
501    """Check the API to see if a Pipe exists.
502
503    Parameters
504    ----------
505    pipe: 'meerschaum.Pipe'
506        The pipe which were are querying.
507        
508    Returns
509    -------
510    A bool indicating whether a pipe's underlying table exists.
511    """
512    from meerschaum.utils.debug import dprint
513    from meerschaum.utils.warnings import warn
514    r_url = pipe_r_url(pipe)
515    response = self.get(r_url + '/exists', debug=debug)
516    if not response:
517        warn(f"Failed to check if {pipe} exists:\n{response.text}")
518        return False
519    if debug:
520        dprint("Received response: " + str(response.text))
521    j = response.json()
522    if isinstance(j, dict) and 'detail' in j:
523        warn(j['detail'])
524    return j

Check the API to see if a Pipe exists.

Parameters
Returns
  • A bool indicating whether a pipe's underlying table exists.
def create_metadata(self, debug: bool = False) -> bool:
527def create_metadata(
528    self,
529    debug: bool = False
530) -> bool:
531    """Create metadata tables.
532
533    Returns
534    -------
535    A bool indicating success.
536    """
537    from meerschaum.utils.debug import dprint
538    from meerschaum.config.static import STATIC_CONFIG
539    r_url = STATIC_CONFIG['api']['endpoints']['metadata']
540    response = self.post(r_url, debug=debug)
541    if debug:
542        dprint("Create metadata response: {response.text}")
543    try:
544        metadata_response = json.loads(response.text)
545    except Exception as e:
546        warn(f"Failed to create metadata on {self}:\n{e}")
547        metadata_response = False
548    return False

Create metadata tables.

Returns
  • A bool indicating success.
def get_pipe_rowcount( self, pipe: meerschaum.Pipe, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) -> int:
551def get_pipe_rowcount(
552    self,
553    pipe: mrsm.Pipe,
554    begin: Optional[datetime] = None,
555    end: Optional[datetime] = None,
556    params: Optional[Dict[str, Any]] = None,
557    remote: bool = False,
558    debug: bool = False,
559) -> int:
560    """Get a pipe's row count from the API.
561
562    Parameters
563    ----------
564    pipe: 'meerschaum.Pipe':
565        The pipe whose row count we are counting.
566        
567    begin: Optional[datetime], default None
568        If provided, bound the count by this datetime.
569
570    end: Optional[datetime]
571        If provided, bound the count by this datetime.
572
573    params: Optional[Dict[str, Any]], default None
574        If provided, bound the count by these parameters.
575
576    remote: bool, default False
577
578    Returns
579    -------
580    The number of rows in the pipe's table, bound the given parameters.
581    If the table does not exist, return 0.
582    """
583    r_url = pipe_r_url(pipe)
584    response = self.get(
585        r_url + "/rowcount",
586        json = params,
587        params = {
588            'begin': begin,
589            'end': end,
590            'remote': remote,
591        },
592        debug = debug
593    )
594    if not response:
595        warn(f"Failed to get the rowcount for {pipe}:\n{response.text}")
596        return 0
597    try:
598        return int(json.loads(response.text))
599    except Exception as e:
600        warn(f"Failed to get the rowcount for {pipe}:\n{e}")
601    return 0

Get a pipe's row count from the API.

Parameters
  • pipe ('meerschaum.Pipe':): The pipe whose row count we are counting.
  • begin (Optional[datetime], default None): If provided, bound the count by this datetime.
  • end (Optional[datetime]): If provided, bound the count by this datetime.
  • params (Optional[Dict[str, Any]], default None): If provided, bound the count by these parameters.
  • remote (bool, default False):
Returns
  • The number of rows in the pipe's table, bound the given parameters.
  • If the table does not exist, return 0.
def drop_pipe( self, pipe: meerschaum.Pipe, debug: bool = False) -> Tuple[bool, str]:
604def drop_pipe(
605    self,
606    pipe: mrsm.Pipe,
607    debug: bool = False
608) -> SuccessTuple:
609    """
610    Drop a pipe's table but maintain its registration.
611
612    Parameters
613    ----------
614    pipe: meerschaum.Pipe:
615        The pipe to be dropped.
616        
617    Returns
618    -------
619    A success tuple (bool, str).
620    """
621    from meerschaum.utils.warnings import error
622    from meerschaum.utils.debug import dprint
623    if pipe is None:
624        error(f"Pipe cannot be None.")
625    r_url = pipe_r_url(pipe)
626    response = self.delete(
627        r_url + '/drop',
628        debug = debug,
629    )
630    if debug:
631        dprint(response.text)
632
633    try:
634        data = response.json()
635    except Exception as e:
636        return False, f"Failed to drop {pipe}."
637
638    if isinstance(data, list):
639        response_tuple = response.__bool__(), data[1]
640    elif 'detail' in response.json():
641        response_tuple = response.__bool__(), data['detail']
642    else:
643        response_tuple = response.__bool__(), response.text
644
645    return response_tuple

Drop a pipe's table but maintain its registration.

Parameters
Returns
  • A success tuple (bool, str).
def clear_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kw) -> Tuple[bool, str]:
648def clear_pipe(
649    self,
650    pipe: mrsm.Pipe,
651    debug: bool = False,
652    **kw
653) -> SuccessTuple:
654    """
655    Delete rows in a pipe's table.
656
657    Parameters
658    ----------
659    pipe: meerschaum.Pipe
660        The pipe with rows to be deleted.
661        
662    Returns
663    -------
664    A success tuple.
665    """
666    kw.pop('metric_keys', None)
667    kw.pop('connector_keys', None)
668    kw.pop('location_keys', None)
669    kw.pop('action', None)
670    kw.pop('force', None)
671    return self.do_action_legacy(
672        ['clear', 'pipes'],
673        connector_keys = pipe.connector_keys,
674        metric_keys = pipe.metric_key,
675        location_keys = pipe.location_key,
676        force = True,
677        debug = debug,
678        **kw
679    )

Delete rows in a pipe's table.

Parameters
Returns
  • A success tuple.
def get_pipe_columns_types( self, pipe: meerschaum.Pipe, debug: bool = False) -> Optional[Dict[str, str]]:
682def get_pipe_columns_types(
683    self,
684    pipe: mrsm.Pipe,
685    debug: bool = False,
686) -> Union[Dict[str, str], None]:
687    """
688    Fetch the columns and types of the pipe's table.
689
690    Parameters
691    ----------
692    pipe: meerschaum.Pipe
693        The pipe whose columns to be queried.
694        
695    Returns
696    -------
697    A dictionary mapping column names to their database types.
698
699    Examples
700    --------
701    >>> {
702    ...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
703    ...   'id': 'BIGINT',
704    ...   'val': 'DOUBLE PRECISION',
705    ... }
706    >>>
707    """
708    r_url = pipe_r_url(pipe) + '/columns/types'
709    response = self.get(
710        r_url,
711        debug = debug
712    )
713    j = response.json()
714    if isinstance(j, dict) and 'detail' in j and len(j.keys()) == 1:
715        from meerschaum.utils.warnings import warn
716        warn(j['detail'])
717        return None
718    if not isinstance(j, dict):
719        warn(response.text)
720        return None
721    return j

Fetch the columns and types of the pipe's table.

Parameters
Returns
  • A dictionary mapping column names to their database types.
Examples
>>> {
...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
...   'id': 'BIGINT',
...   'val': 'DOUBLE PRECISION',
... }
>>>
def fetch( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, str, int] = '', end: Union[datetime.datetime, int] = None, params: 'Optional[Dict, Any]' = None, debug: bool = False, **kw: Any) -> Iterator[pandas.core.frame.DataFrame]:
16def fetch(
17        self,
18        pipe: mrsm.Pipe,
19        begin: Union[datetime, str, int] = '',
20        end: Union[datetime, int] = None,
21        params: Optional[Dict, Any] = None,
22        debug: bool = False,
23        **kw: Any
24    ) -> Iterator['pd.DataFrame']:
25    """Get the Pipe data from the remote Pipe."""
26    from meerschaum.utils.debug import dprint
27    from meerschaum.utils.warnings import warn, error
28    from meerschaum.config._patch import apply_patch_to_config
29
30    fetch_params = pipe.parameters.get('fetch', {})
31    if not fetch_params:
32        warn(f"Missing 'fetch' parameters for {pipe}.", stack=False)
33        return None
34
35    pipe_meta = fetch_params.get('pipe', {})
36    ### Legacy: check for `connector_keys`, etc. at the root.
37    if not pipe_meta:
38        ck, mk, lk = (
39            fetch_params.get('connector_keys', None),
40            fetch_params.get('metric_key', None),
41            fetch_params.get('location_key', None),
42        )
43        if not ck or not mk:
44            warn(f"Missing `fetch:pipe` keys for {pipe}.", stack=False)
45            return None
46
47        pipe_meta.update({
48            'connector': ck,
49            'metric': mk,
50            'location': lk,
51        })
52
53    pipe_meta['instance'] = self
54    source_pipe = mrsm.Pipe(**pipe_meta)
55
56    _params = copy.deepcopy(params) if params is not None else {}
57    _params = apply_patch_to_config(_params, fetch_params.get('params', {}))
58    select_columns = fetch_params.get('select_columns', [])
59    omit_columns = fetch_params.get('omit_columns', [])
60
61    return source_pipe.get_data(
62        select_columns = select_columns,
63        omit_columns = omit_columns,
64        begin = begin,
65        end = end,
66        params = _params,
67        debug = debug,
68        as_iterator = True,
69    )

Get the Pipe data from the remote Pipe.

def register_plugin( self, plugin: meerschaum.Plugin, make_archive: bool = True, debug: bool = False) -> Tuple[bool, str]:
20def register_plugin(
21        self,
22        plugin: meerschaum.core.Plugin,
23        make_archive: bool = True,
24        debug: bool = False,
25    ) -> SuccessTuple:
26    """Register a plugin and upload its archive."""
27    import json
28    archive_path = plugin.make_tar(debug=debug) if make_archive else plugin.archive_path
29    file_pointer = open(archive_path, 'rb')
30    files = {'archive': file_pointer}
31    metadata = {
32        'version': plugin.version,
33        'attributes': json.dumps(plugin.attributes),
34    }
35    r_url = plugin_r_url(plugin)
36    try:
37        response = self.post(r_url, files=files, params=metadata, debug=debug)
38    except Exception as e:
39        return False, f"Failed to register plugin '{plugin}'."
40    finally:
41        file_pointer.close()
42
43    try:
44        success, msg = json.loads(response.text)
45    except Exception as e:
46        return False, response.text
47
48    return success, msg

Register a plugin and upload its archive.

def install_plugin( self, name: str, skip_deps: bool = False, force: bool = False, debug: bool = False) -> Tuple[bool, str]:
50def install_plugin(
51        self,
52        name: str,
53        skip_deps: bool = False,
54        force: bool = False,
55        debug: bool = False
56    ) -> SuccessTuple:
57    """Download and attempt to install a plugin from the API."""
58    import os, pathlib, json
59    from meerschaum.core import Plugin
60    from meerschaum.config._paths import PLUGINS_TEMP_RESOURCES_PATH
61    from meerschaum.utils.debug import dprint
62    from meerschaum.utils.packages import attempt_import
63    binaryornot_check = attempt_import('binaryornot.check', lazy=False)
64    r_url = plugin_r_url(name)
65    dest = pathlib.Path(os.path.join(PLUGINS_TEMP_RESOURCES_PATH, name + '.tar.gz'))
66    if debug:
67        dprint(f"Fetching from '{self.url + r_url}' to '{dest}'...")
68    archive_path = self.wget(r_url, dest, debug=debug) 
69    is_binary = binaryornot_check.is_binary(str(archive_path))
70    if not is_binary:
71        fail_msg = f"Failed to download binary for plugin '{name}'."
72        try:
73            with open(archive_path, 'r') as f:
74                j = json.load(f)
75            if isinstance(j, list):
76                success, msg = tuple(j)
77            elif isinstance(j, dict) and 'detail' in j:
78                success, msg = False, fail_msg
79        except Exception as e:
80            success, msg = False, fail_msg
81        return success, msg
82    plugin = Plugin(name, archive_path=archive_path, repo_connector=self)
83    return plugin.install(skip_deps=skip_deps, force=force, debug=debug)

Download and attempt to install a plugin from the API.

def delete_plugin( self, plugin: meerschaum.Plugin, debug: bool = False) -> Tuple[bool, str]:
149def delete_plugin(
150        self,
151        plugin: meerschaum.core.Plugin,
152        debug: bool = False
153    ) -> SuccessTuple:
154    """Delete a plugin from an API repository."""
155    import json
156    r_url = plugin_r_url(plugin)
157    try:
158        response = self.delete(r_url, debug=debug)
159    except Exception as e:
160        return False, f"Failed to delete plugin '{plugin}'."
161
162    try:
163        success, msg = json.loads(response.text)
164    except Exception as e:
165        return False, response.text
166
167    return success, msg

Delete a plugin from an API repository.

def get_plugins( self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False) -> Sequence[str]:
 85def get_plugins(
 86        self,
 87        user_id : Optional[int] = None,
 88        search_term : Optional[str] = None,
 89        debug : bool = False
 90    ) -> Sequence[str]:
 91    """Return a list of registered plugin names.
 92
 93    Parameters
 94    ----------
 95    user_id :
 96        If specified, return all plugins from a certain user.
 97    user_id : Optional[int] :
 98         (Default value = None)
 99    search_term : Optional[str] :
100         (Default value = None)
101    debug : bool :
102         (Default value = False)
103
104    Returns
105    -------
106
107    """
108    import json
109    from meerschaum.utils.warnings import warn, error
110    from meerschaum.config.static import STATIC_CONFIG
111    response = self.get(
112        STATIC_CONFIG['api']['endpoints']['plugins'],
113        params = {'user_id' : user_id, 'search_term' : search_term},
114        use_token = True,
115        debug = debug
116    )
117    if not response:
118        return []
119    plugins = json.loads(response.text)
120    if not isinstance(plugins, list):
121        error(response.text)
122    return plugins

Return a list of registered plugin names.

Parameters
  • user_id :: If specified, return all plugins from a certain user.
  • user_id (Optional[int] :): (Default value = None)
  • search_term (Optional[str] :): (Default value = None)
  • debug (bool :): (Default value = False)
  • Returns
  • -------
def get_plugin_attributes( self, plugin: meerschaum.Plugin, debug: bool = False) -> Mapping[str, Any]:
124def get_plugin_attributes(
125        self,
126        plugin: meerschaum.core.Plugin,
127        debug: bool = False
128    ) -> Mapping[str, Any]:
129    """
130    Return a plugin's attributes.
131    """
132    import json
133    from meerschaum.utils.warnings import warn, error
134    r_url = plugin_r_url(plugin) + '/attributes'
135    response = self.get(r_url, use_token=True, debug=debug)
136    attributes = response.json()
137    if isinstance(attributes, str) and attributes and attributes[0] == '{':
138        try:
139            attributes = json.loads(attributes)
140        except Exception as e:
141            pass
142    if not isinstance(attributes, dict):
143        error(response.text)
144    elif not response and 'detail' in attributes:
145        warn(attributes['detail'])
146        return {}
147    return attributes

Return a plugin's attributes.

def login( self, debug: bool = False, warn: bool = True, **kw: Any) -> Tuple[bool, str]:
13def login(
14        self,
15        debug: bool = False,
16        warn: bool = True,
17        **kw: Any
18    ) -> SuccessTuple:
19    """Log in and set the session token."""
20    from meerschaum.utils.warnings import warn as _warn, info, error
21    from meerschaum.core import User
22    from meerschaum.config.static import STATIC_CONFIG
23    import json, datetime
24    try:
25        login_data = {
26            'username': self.username,
27            'password': self.password,
28        }
29    except AttributeError:
30        return False, f"Please login with the command `login {self}`."
31    response = self.post(
32        STATIC_CONFIG['api']['endpoints']['login'],
33        data = login_data,
34        use_token = False,
35        debug = debug
36    )
37    if response:
38        msg = f"Successfully logged into '{self}' as user '{login_data['username']}'."
39        self._token = json.loads(response.text)['access_token']
40        self._expires = datetime.datetime.strptime(
41            json.loads(response.text)['expires'], 
42            '%Y-%m-%dT%H:%M:%S.%f'
43        )
44    else:
45        msg = (
46            f"Failed to log into '{self}' as user '{login_data['username']}'.\n" +
47            f"    Please verify login details for connector '{self}'."
48        )
49        if warn:
50            _warn(msg, stack=False)
51
52    return response.__bool__(), msg

Log in and set the session token.

def test_connection(self, **kw: Any) -> Optional[bool]:
55def test_connection(
56        self,
57        **kw: Any
58    ) -> Union[bool, None]:
59    """Test if a successful connection to the API may be made."""
60    from meerschaum.connectors.poll import retry_connect
61    _default_kw = {
62        'max_retries': 1, 'retry_wait': 0, 'warn': False,
63        'connector': self, 'enforce_chaining': False,
64        'enforce_login': False,
65    }
66    _default_kw.update(kw)
67    try:
68        return retry_connect(**_default_kw)
69    except Exception as e:
70        return False

Test if a successful connection to the API may be made.

def register_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
65def register_user(
66        self,
67        user: 'meerschaum.core.User',
68        debug: bool = False,
69        **kw: Any
70    ) -> SuccessTuple:
71    """Register a new user."""
72    import json
73    from meerschaum.config.static import STATIC_CONFIG
74    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/register"
75    data = {
76        'username': user.username,
77        'password': user.password,
78        'attributes': json.dumps(user.attributes),
79    }
80    if user.type:
81        data['type'] = user.type
82    if user.email:
83        data['email'] = user.email
84    response = self.post(r_url, data=data, debug=debug)
85    try:
86        _json = json.loads(response.text)
87        if isinstance(_json, dict) and 'detail' in _json:
88            return False, _json['detail']
89        success_tuple = tuple(_json)
90    except Exception:
91        msg = response.text if response else f"Failed to register user '{user}'."
92        return False, msg
93
94    return tuple(success_tuple)

Register a new user.

def get_user_id( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[int]:
 97def get_user_id(
 98        self,
 99        user: 'meerschaum.core.User',
100        debug: bool = False,
101        **kw: Any
102    ) -> Optional[int]:
103    """Get a user's ID."""
104    from meerschaum.config.static import STATIC_CONFIG
105    import json
106    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/id"
107    response = self.get(r_url, debug=debug, **kw)
108    try:
109        user_id = int(json.loads(response.text))
110    except Exception as e:
111        user_id = None
112    return user_id

Get a user's ID.

def get_users(self, debug: bool = False, **kw: Any) -> List[str]:
13def get_users(
14        self,
15        debug: bool = False,
16        **kw : Any
17    ) -> List[str]:
18    """
19    Return a list of registered usernames.
20    """
21    from meerschaum.config.static import STATIC_CONFIG
22    import json
23    response = self.get(
24        f"{STATIC_CONFIG['api']['endpoints']['users']}",
25        debug = debug,
26        use_token = True,
27    )
28    if not response:
29        return []
30    try:
31        return response.json()
32    except Exception as e:
33        return []

Return a list of registered usernames.

def edit_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
35def edit_user(
36        self,
37        user: 'meerschaum.core.User',
38        debug: bool = False,
39        **kw: Any
40    ) -> SuccessTuple:
41    """Edit an existing user."""
42    import json
43    from meerschaum.config.static import STATIC_CONFIG
44    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/edit"
45    data = {
46        'username': user.username,
47        'password': user.password,
48        'type': user.type,
49        'email': user.email,
50        'attributes': json.dumps(user.attributes),
51    }
52    response = self.post(r_url, data=data, debug=debug)
53    try:
54        _json = json.loads(response.text)
55        if isinstance(_json, dict) and 'detail' in _json:
56            return False, _json['detail']
57        success_tuple = tuple(_json)
58    except Exception as e:
59        msg = response.text if response else f"Failed to edit user '{user}'."
60        return False, msg
61
62    return tuple(success_tuple)

Edit an existing user.

def delete_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
114def delete_user(
115        self,
116        user: 'meerschaum.core.User',
117        debug: bool = False,
118        **kw: Any
119    ) -> SuccessTuple:
120    """Delete a user."""
121    from meerschaum.config.static import STATIC_CONFIG
122    import json
123    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}"
124    response = self.delete(r_url, debug=debug)
125    try:
126        _json = json.loads(response.text)
127        if isinstance(_json, dict) and 'detail' in _json:
128            return False, _json['detail']
129        success_tuple = tuple(_json)
130    except Exception as e:
131        success_tuple = False, f"Failed to delete user '{user.username}'."
132    return success_tuple

Delete a user.

def get_user_password_hash( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
155def get_user_password_hash(
156        self,
157        user: 'meerschaum.core.User',
158        debug: bool = False,
159        **kw: Any
160    ) -> Optional[str]:
161    """If configured, get a user's password hash."""
162    from meerschaum.config.static import STATIC_CONFIG
163    r_url = STATIC_CONFIG['api']['endpoints']['users'] + '/' + user.username + '/password_hash'
164    response = self.get(r_url, debug=debug, **kw)
165    if not response:
166        return None
167    return response.json()

If configured, get a user's password hash.

def get_user_type( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
169def get_user_type(
170        self,
171        user: 'meerschaum.core.User',
172        debug: bool = False,
173        **kw: Any
174    ) -> Optional[str]:
175    """If configured, get a user's type."""
176    from meerschaum.config.static import STATIC_CONFIG
177    r_url = STATIC_CONFIG['api']['endpoints']['users'] + '/' + user.username + '/type'
178    response = self.get(r_url, debug=debug, **kw)
179    if not response:
180        return None
181    return response.json()

If configured, get a user's type.

def get_user_attributes( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw) -> int:
134def get_user_attributes(
135        self,
136        user: 'meerschaum.core.User',
137        debug: bool = False,
138        **kw
139    ) -> int:
140    """Get a user's attributes."""
141    from meerschaum.config.static import STATIC_CONFIG
142    import json
143    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/attributes"
144    response = self.get(r_url, debug=debug, **kw)
145    try:
146        attributes = json.loads(response.text)
147    except Exception as e:
148        attributes = None
149    return attributes

Get a user's attributes.

@classmethod
def from_uri( cls, uri: str, label: Optional[str] = None, as_dict: bool = False) -> Union[APIConnector, Dict[str, Union[str, int]]]:
13@classmethod
14def from_uri(
15    cls,
16    uri: str,
17    label: Optional[str] = None,
18    as_dict: bool = False,
19) -> Union[
20        'meerschaum.connectors.APIConnector',
21        Dict[str, Union[str, int]],
22    ]:
23    """
24    Create a new APIConnector from a URI string.
25
26    Parameters
27    ----------
28    uri: str
29        The URI connection string.
30
31    label: Optional[str], default None
32        If provided, use this as the connector label.
33        Otherwise use the determined database name.
34
35    as_dict: bool, default False
36        If `True`, return a dictionary of the keyword arguments
37        necessary to create a new `APIConnector`, otherwise create a new object.
38
39    Returns
40    -------
41    A new APIConnector object or a dictionary of attributes (if `as_dict` is `True`).
42    """
43    from meerschaum.connectors.sql import SQLConnector
44    params = SQLConnector.parse_uri(uri)
45    if 'host' not in params:
46        error("No host was found in the provided URI.")
47    params['protocol'] = params.pop('flavor')
48    params['label'] = label or (
49        (
50            (params['username'] + '@' if 'username' in params else '')
51            + params['host']
52        ).lower()
53    )
54
55    return cls(**params) if not as_dict else params

Create a new APIConnector from a URI string.

Parameters
  • uri (str): The URI connection string.
  • label (Optional[str], default None): If provided, use this as the connector label. Otherwise use the determined database name.
  • as_dict (bool, default False): If True, return a dictionary of the keyword arguments necessary to create a new APIConnector, otherwise create a new object.
Returns
  • A new APIConnector object or a dictionary of attributes (if as_dict is True).
def get_jobs(self, debug: bool = False) -> Dict[str, meerschaum.Job]:
28def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
29    """
30    Return a dictionary of remote jobs.
31    """
32    response = self.get(JOBS_ENDPOINT, debug=debug)
33    if not response:
34        warn(f"Failed to get remote jobs from {self}.")
35        return {}
36    return {
37        name: Job(
38            name,
39            job_meta['sysargs'],
40            executor_keys=str(self),
41            _properties=job_meta['daemon']['properties']
42        )
43        for name, job_meta in response.json().items()
44    }

Return a dictionary of remote jobs.

def get_job(self, name: str, debug: bool = False) -> meerschaum.Job:
47def get_job(self, name: str, debug: bool = False) -> Job:
48    """
49    Return a single Job object.
50    """
51    metadata = self.get_job_metadata(name, debug=debug)
52    if not metadata:
53        raise ValueError(f"Job '{name}' does not exist.")
54
55    return Job(
56        name,
57        metadata['sysargs'],
58        executor_keys=str(self),
59        _properties=metadata['daemon']['properties'],
60    )

Return a single Job object.

def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
 63def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
 64    """
 65    Return the metadata for a single job.
 66    """
 67    now = time.perf_counter()
 68    _job_metadata_cache = self.__dict__.get('_job_metadata_cache', None)
 69    _job_metadata_timestamp = (
 70        _job_metadata_cache.get(name, {}).get('timestamp', None)
 71    ) if _job_metadata_cache is not None else None
 72
 73    if (
 74        _job_metadata_timestamp is not None
 75        and (now - _job_metadata_timestamp) < JOB_METADATA_CACHE_SECONDS
 76    ):
 77        if debug:
 78            dprint(f"Returning cached metadata for job '{name}'.")
 79        return _job_metadata_cache[name]['metadata']
 80
 81    response = self.get(JOBS_ENDPOINT + f"/{name}", debug=debug)
 82    if not response:
 83        if debug:
 84            msg = (
 85                response.json()['detail']
 86                if 'detail' in response.text
 87                else response.text
 88            )
 89            warn(f"Failed to get metadata for job '{name}':\n{msg}")
 90        return {}
 91
 92    metadata = response.json()
 93    if _job_metadata_cache is None:
 94        self._job_metadata_cache = {}
 95
 96    self._job_metadata_cache[name] = {
 97        'timestamp': now,
 98        'metadata': metadata,
 99    }
100    return metadata

Return the metadata for a single job.

def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
102def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
103    """
104    Return the daemon properties for a single job.
105    """
106    metadata = self.get_job_metadata(name, debug=debug)
107    return metadata.get('daemon', {}).get('properties', {})

Return the daemon properties for a single job.

def get_job_exists(self, name: str, debug: bool = False) -> bool:
149def get_job_exists(self, name: str, debug: bool = False) -> bool:
150    """
151    Return whether a job exists.
152    """
153    response = self.get(JOBS_ENDPOINT + f'/{name}/exists', debug=debug)
154    if not response:
155        warn(f"Failed to determine whether job '{name}' exists.")
156        return False
157
158    return response.json()

Return whether a job exists.

def delete_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
161def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
162    """
163    Delete a job.
164    """
165    response = self.delete(JOBS_ENDPOINT + f"/{name}", debug=debug)
166    if not response:
167        if 'detail' in response.text:
168            return False, response.json()['detail']
169
170        return False, response.text
171
172    return tuple(response.json())

Delete a job.

def start_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
175def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
176    """
177    Start a job.
178    """
179    response = self.post(JOBS_ENDPOINT + f"/{name}/start", debug=debug)
180    if not response:
181        if 'detail' in response.text:
182            return False, response.json()['detail']
183        return False, response.text
184
185    return tuple(response.json())

Start a job.

def create_job( self, name: str, sysargs: List[str], properties: Optional[Dict[str, str]] = None, debug: bool = False) -> Tuple[bool, str]:
188def create_job(
189    self,
190    name: str,
191    sysargs: List[str],
192    properties: Optional[Dict[str, str]] = None,
193    debug: bool = False,
194) -> SuccessTuple:
195    """
196    Create a job.
197    """
198    response = self.post(
199        JOBS_ENDPOINT + f"/{name}",
200        json={
201            'sysargs': sysargs,
202            'properties': properties,
203        },
204        debug=debug,
205    )
206    if not response:
207        if 'detail' in response.text:
208            return False, response.json()['detail']
209        return False, response.text
210
211    return tuple(response.json())

Create a job.

def stop_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
214def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
215    """
216    Stop a job.
217    """
218    response = self.post(JOBS_ENDPOINT + f"/{name}/stop", debug=debug)
219    if not response:
220        if 'detail' in response.text:
221            return False, response.json()['detail']
222        return False, response.text
223
224    return tuple(response.json())

Stop a job.

def pause_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
227def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
228    """
229    Pause a job.
230    """
231    response = self.post(JOBS_ENDPOINT + f"/{name}/pause", debug=debug)
232    if not response:
233        if 'detail' in response.text:
234            return False, response.json()['detail']
235        return False, response.text
236
237    return tuple(response.json())

Pause a job.

def get_logs(self, name: str, debug: bool = False) -> str:
240def get_logs(self, name: str, debug: bool = False) -> str:
241    """
242    Return the logs for a job.
243    """
244    response = self.get(LOGS_ENDPOINT + f"/{name}")
245    if not response:
246        raise ValueError(f"Cannot fetch logs for job '{name}':\n{response.text}")
247
248    return response.json()

Return the logs for a job.

def get_job_stop_time(self, name: str, debug: bool = False) -> Optional[datetime.datetime]:
251def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
252    """
253    Return the job's manual stop time.
254    """
255    response = self.get(JOBS_ENDPOINT + f"/{name}/stop_time")
256    if not response:
257        warn(f"Failed to get stop time for job '{name}':\n{response.text}")
258        return None
259
260    data = response.json()
261    if data is None:
262        return None
263
264    return datetime.fromisoformat(data)

Return the job's manual stop time.

def monitor_logs( self, name: str, callback_function: Callable[[Any], Any], input_callback_function: Callable[[NoneType], str], stop_callback_function: Callable[[NoneType], str], stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False):
348def monitor_logs(
349    self,
350    name: str,
351    callback_function: Callable[[Any], Any],
352    input_callback_function: Callable[[None], str],
353    stop_callback_function: Callable[[None], str],
354    stop_on_exit: bool = False,
355    strip_timestamps: bool = False,
356    accept_input: bool = True,
357    debug: bool = False,
358):
359    """
360    Monitor a job's log files and execute a callback with the changes.
361    """
362    return asyncio.run(
363        self.monitor_logs_async(
364            name,
365            callback_function,
366            input_callback_function=input_callback_function,
367            stop_callback_function=stop_callback_function,
368            stop_on_exit=stop_on_exit,
369            strip_timestamps=strip_timestamps,
370            accept_input=accept_input,
371            debug=debug
372        )
373    )

Monitor a job's log files and execute a callback with the changes.

async def monitor_logs_async( self, name: str, callback_function: Callable[[Any], Any], input_callback_function: Callable[[], str], stop_callback_function: Callable[[Tuple[bool, str]], str], stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False):
267async def monitor_logs_async(
268    self,
269    name: str,
270    callback_function: Callable[[Any], Any],
271    input_callback_function: Callable[[], str],
272    stop_callback_function: Callable[[SuccessTuple], str],
273    stop_on_exit: bool = False,
274    strip_timestamps: bool = False,
275    accept_input: bool = True,
276    debug: bool = False,
277):
278    """
279    Monitor a job's log files and await a callback with the changes.
280    """
281    import traceback
282    from meerschaum.jobs import StopMonitoringLogs
283    from meerschaum.utils.formatting._jobs import strip_timestamp_from_line
284
285    websockets, websockets_exceptions = mrsm.attempt_import('websockets', 'websockets.exceptions')
286    protocol = 'ws' if self.URI.startswith('http://') else 'wss'
287    port = self.port if 'port' in self.__dict__ else ''
288    uri = f"{protocol}://{self.host}:{port}{LOGS_ENDPOINT}/{name}/ws"
289
290    async def _stdin_callback(client):
291        if input_callback_function is None:
292            return
293
294        if asyncio.iscoroutinefunction(input_callback_function):
295            data = await input_callback_function()
296        else:
297            data = input_callback_function()
298
299        await client.send(data)
300
301    async def _stop_callback(client):
302        try:
303            result = tuple(json.loads(await client.recv()))
304        except Exception as e:
305            warn(traceback.format_exc())
306            result = False, str(e)
307
308        if stop_callback_function is not None:
309            if asyncio.iscoroutinefunction(stop_callback_function):
310                await stop_callback_function(result)
311            else:
312                stop_callback_function(result)
313
314        if stop_on_exit:
315            raise StopMonitoringLogs
316
317    message_callbacks = {
318        JOBS_STDIN_MESSAGE: _stdin_callback,
319        JOBS_STOP_MESSAGE: _stop_callback,
320    }
321
322    async with websockets.connect(uri) as websocket:
323        try:
324            await websocket.send(self.token or 'no-login')
325        except websockets_exceptions.ConnectionClosedOK:
326            pass
327
328        while True:
329            try:
330                response = await websocket.recv()
331                callback = message_callbacks.get(response, None)
332                if callback is not None:
333                    await callback(websocket)
334                    continue
335
336                if strip_timestamps:
337                    response = strip_timestamp_from_line(response)
338
339                if asyncio.iscoroutinefunction(callback_function):
340                    await callback_function(response)
341                else:
342                    callback_function(response)
343            except (KeyboardInterrupt, StopMonitoringLogs):
344                await websocket.close()
345                break

Monitor a job's log files and await a callback with the changes.

def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
375def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
376    """
377    Return whether a remote job is blocking on stdin.
378    """
379    response = self.get(JOBS_ENDPOINT + f'/{name}/is_blocking_on_stdin', debug=debug)
380    if not response:
381        return False
382
383    return response.json()

Return whether a remote job is blocking on stdin.

def get_job_began(self, name: str, debug: bool = False) -> Optional[str]:
116def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
117    """
118    Return a job's `began` timestamp, if it exists.
119    """
120    properties = self.get_job_properties(name, debug=debug)
121    began_str = properties.get('daemon', {}).get('began', None)
122    if began_str is None:
123        return None
124
125    return began_str

Return a job's began timestamp, if it exists.

def get_job_ended(self, name: str, debug: bool = False) -> Optional[str]:
127def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
128    """
129    Return a job's `ended` timestamp, if it exists.
130    """
131    properties = self.get_job_properties(name, debug=debug)
132    ended_str = properties.get('daemon', {}).get('ended', None)
133    if ended_str is None:
134        return None
135
136    return ended_str

Return a job's ended timestamp, if it exists.

def get_job_paused(self, name: str, debug: bool = False) -> Optional[str]:
138def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
139    """
140    Return a job's `paused` timestamp, if it exists.
141    """
142    properties = self.get_job_properties(name, debug=debug)
143    paused_str = properties.get('daemon', {}).get('paused', None)
144    if paused_str is None:
145        return None
146
147    return paused_str

Return a job's paused timestamp, if it exists.

def get_job_status(self, name: str, debug: bool = False) -> str:
109def get_job_status(self, name: str, debug: bool = False) -> str:
110    """
111    Return the job's status.
112    """
113    metadata = self.get_job_metadata(name, debug=debug)
114    return metadata.get('status', 'stopped')

Return the job's status.