meerschaum.connectors.api

Interact with the Meerschaum API (send commands, pull data, etc.)

 1#! /usr/bin/env python
 2# -*- coding: utf-8 -*-
 3# vim:fenc=utf-8
 4
 5"""
 6Interact with the Meerschaum API (send commands, pull data, etc.)
 7"""
 8
 9from meerschaum.connectors.api._APIConnector import APIConnector
10
11__all__ = ('APIConnector',)
class APIConnector(meerschaum.connectors._Connector.Connector):
 20class APIConnector(Connector):
 21    """
 22    Connect to a Meerschaum API instance.
 23    """
 24
 25    IS_INSTANCE: bool = True
 26    IS_THREAD_SAFE: bool = False
 27
 28    OPTIONAL_ATTRIBUTES: List[str] = ['port']
 29
 30    from ._request import (
 31        make_request,
 32        get,
 33        post,
 34        put,
 35        patch,
 36        delete,
 37        wget,
 38    )
 39    from ._actions import (
 40        get_actions,
 41        do_action,
 42        do_action_async,
 43        do_action_legacy,
 44    )
 45    from ._misc import get_mrsm_version, get_chaining_status
 46    from ._pipes import (
 47        register_pipe,
 48        fetch_pipes_keys,
 49        edit_pipe,
 50        sync_pipe,
 51        delete_pipe,
 52        get_pipe_data,
 53        get_pipe_id,
 54        get_pipe_attributes,
 55        get_sync_time,
 56        pipe_exists,
 57        create_metadata,
 58        get_pipe_rowcount,
 59        drop_pipe,
 60        clear_pipe,
 61        get_pipe_columns_types,
 62        get_pipe_columns_indices,
 63    )
 64    from ._fetch import fetch
 65    from ._plugins import (
 66        register_plugin,
 67        install_plugin,
 68        delete_plugin,
 69        get_plugins,
 70        get_plugin_attributes,
 71    )
 72    from ._login import login, test_connection
 73    from ._users import (
 74        register_user,
 75        get_user_id,
 76        get_users,
 77        edit_user,
 78        delete_user,
 79        get_user_password_hash,
 80        get_user_type,
 81        get_user_attributes,
 82    )
 83    from ._uri import from_uri
 84    from ._jobs import (
 85        get_jobs,
 86        get_job,
 87        get_job_metadata,
 88        get_job_properties,
 89        get_job_exists,
 90        delete_job,
 91        start_job,
 92        create_job,
 93        stop_job,
 94        pause_job,
 95        get_logs,
 96        get_job_stop_time,
 97        monitor_logs,
 98        monitor_logs_async,
 99        get_job_is_blocking_on_stdin,
100        get_job_began,
101        get_job_ended,
102        get_job_paused,
103        get_job_status,
104    )
105
106    def __init__(
107        self,
108        label: Optional[str] = None,
109        wait: bool = False,
110        debug: bool = False,
111        **kw
112    ):
113        if 'uri' in kw:
114            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
115            label = label or from_uri_params.get('label', None)
116            _ = from_uri_params.pop('label', None)
117            kw.update(from_uri_params)
118
119        super().__init__('api', label=label, **kw)
120        if 'protocol' not in self.__dict__:
121            self.protocol = (
122                'https' if self.__dict__.get('uri', '').startswith('https')
123                else 'http'
124            )
125
126        if 'uri' not in self.__dict__:
127            self.verify_attributes(required_attributes)
128        else:
129            from meerschaum.connectors.sql import SQLConnector
130            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
131            if 'host' not in conn_attrs:
132                raise Exception(f"Invalid URI for '{self}'.")
133            self.__dict__.update(conn_attrs)
134
135        self.url = (
136            self.protocol + '://' +
137            self.host
138            + (
139                (':' + str(self.port))
140                if self.__dict__.get('port', None)
141                else ''
142            )
143        )
144        self._token = None
145        self._expires = None
146        self._session = None
147
148
149    @property
150    def URI(self) -> str:
151        """
152        Return the fully qualified URI.
153        """
154        username = self.__dict__.get('username', None)
155        password = self.__dict__.get('password', None)
156        creds = (username + ':' + password + '@') if username and password else ''
157        return (
158            self.protocol
159            + '://'
160            + creds
161            + self.host
162            + (
163                (':' + str(self.port))
164                if self.__dict__.get('port', None)
165                else ''
166            )
167        )
168
169
170    @property
171    def session(self):
172        if self._session is None:
173            certifi = attempt_import('certifi', lazy=False)
174            requests = attempt_import('requests', lazy=False)
175            if requests:
176                self._session = requests.Session()
177            if self._session is None:
178                error(f"Failed to import requests. Is requests installed?")
179        return self._session
180
181    @property
182    def token(self):
183        expired = (
184            True if self._expires is None else (
185                (
186                    self._expires
187                    <
188                    datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
189                )
190            )
191        )
192
193        if self._token is None or expired:
194            success, msg = self.login()
195            if not success:
196                warn(msg, stack=False)
197        return self._token

Connect to a Meerschaum API instance.

APIConnector( label: Optional[str] = None, wait: bool = False, debug: bool = False, **kw)
106    def __init__(
107        self,
108        label: Optional[str] = None,
109        wait: bool = False,
110        debug: bool = False,
111        **kw
112    ):
113        if 'uri' in kw:
114            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
115            label = label or from_uri_params.get('label', None)
116            _ = from_uri_params.pop('label', None)
117            kw.update(from_uri_params)
118
119        super().__init__('api', label=label, **kw)
120        if 'protocol' not in self.__dict__:
121            self.protocol = (
122                'https' if self.__dict__.get('uri', '').startswith('https')
123                else 'http'
124            )
125
126        if 'uri' not in self.__dict__:
127            self.verify_attributes(required_attributes)
128        else:
129            from meerschaum.connectors.sql import SQLConnector
130            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
131            if 'host' not in conn_attrs:
132                raise Exception(f"Invalid URI for '{self}'.")
133            self.__dict__.update(conn_attrs)
134
135        self.url = (
136            self.protocol + '://' +
137            self.host
138            + (
139                (':' + str(self.port))
140                if self.__dict__.get('port', None)
141                else ''
142            )
143        )
144        self._token = None
145        self._expires = None
146        self._session = None

Set the given keyword arguments as attributes.

Parameters
  • type (str): The type of the connector (e.g. sql, api, plugin).
  • label (str): The label for the connector.
Examples

Run `mrsm edit config` to edit connectors in the YAML file:

meerschaum:
    connections:
        {type}:
            {label}:
                ### attributes go here
IS_INSTANCE: bool = True
IS_THREAD_SAFE: bool = False
OPTIONAL_ATTRIBUTES: List[str] = ['port']
url
URI: str
149    @property
150    def URI(self) -> str:
151        """
152        Return the fully qualified URI.
153        """
154        username = self.__dict__.get('username', None)
155        password = self.__dict__.get('password', None)
156        creds = (username + ':' + password + '@') if username and password else ''
157        return (
158            self.protocol
159            + '://'
160            + creds
161            + self.host
162            + (
163                (':' + str(self.port))
164                if self.__dict__.get('port', None)
165                else ''
166            )
167        )

Return the fully qualified URI.

session
170    @property
171    def session(self):
172        if self._session is None:
173            certifi = attempt_import('certifi', lazy=False)
174            requests = attempt_import('requests', lazy=False)
175            if requests:
176                self._session = requests.Session()
177            if self._session is None:
178                error(f"Failed to import requests. Is requests installed?")
179        return self._session
token
181    @property
182    def token(self):
183        expired = (
184            True if self._expires is None else (
185                (
186                    self._expires
187                    <
188                    datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
189                )
190            )
191        )
192
193        if self._token is None or expired:
194            success, msg = self.login()
195            if not success:
196                warn(msg, stack=False)
197        return self._token
def make_request( self, method: str, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kwargs: Any) -> requests.models.Response:
 28def make_request(
 29    self,
 30    method: str,
 31    r_url: str,
 32    headers: Optional[Dict[str, Any]] = None,
 33    use_token: bool = True,
 34    debug: bool = False,
 35    **kwargs: Any
 36) -> 'requests.Response':
 37    """
 38    Make a request to this APIConnector's endpoint using the in-memory session.
 39
 40    Parameters
 41    ----------
 42    method: str
 43        The kind of request to make.
 44        Accepted values:
 45        - `'GET'`
 46        - `'OPTIONS'`
 47        - `'HEAD'`
 48        - `'POST'`
 49        - `'PUT'`
 50        - `'PATCH'`
 51        - `'DELETE'`
 52
 53    r_url: str
 54        The relative URL for the endpoint (e.g. `'/pipes'`).
 55
 56    headers: Optional[Dict[str, Any]], default None
 57        The headers to use for the request.
 58        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
 59
 60    use_token: bool, default True
 61        If `True`, add the authorization token to the headers.
 62
 63    debug: bool, default False
 64        Verbosity toggle.
 65
 66    kwargs: Any
 67        All other keyword arguments are passed to `requests.request`.
 68
 69    Returns
 70    -------
 71    A `requests.Reponse` object.
 72    """
 73    if method.upper() not in METHODS:
 74        raise ValueError(f"Method '{method}' is not supported.")
 75
 76    verify = self.__dict__.get('verify', None)
 77    if 'verify' not in kwargs and isinstance(verify, bool):
 78        kwargs['verify'] = verify
 79
 80    headers = (
 81        copy.deepcopy(headers)
 82        if isinstance(headers, dict)
 83        else {}
 84    )
 85
 86    if use_token:
 87        headers.update({'Authorization': f'Bearer {self.token}'})
 88
 89    if 'timeout' not in kwargs:
 90        kwargs['timeout'] = STATIC_CONFIG['api']['default_timeout']
 91
 92    request_url = urllib.parse.urljoin(self.url, r_url)
 93    if debug:
 94        dprint(f"[{self}] Sending a '{method.upper()}' request to {request_url}")
 95
 96    return self.session.request(
 97        method.upper(),
 98        request_url,
 99        headers = headers,
100        **kwargs
101    )

Make a request to this APIConnector's endpoint using the in-memory session.

Parameters
  • method (str): The kind of request to make. Accepted values:
    • 'GET'
    • 'OPTIONS'
    • 'HEAD'
    • 'POST'
    • 'PUT'
    • 'PATCH'
    • 'DELETE'
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Response object.
def get(self, r_url: str, **kwargs: Any) -> requests.models.Response:
def get(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `GET` request via `make_request`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        Passed unchanged to `make_request`
        (e.g. `headers`, `use_token`, `debug`, or request options).

    Returns
    -------
    The result of `make_request` (a `requests.Response` object).
    """
    http_method = 'GET'
    return self.make_request(http_method, r_url, **kwargs)

Wrapper for requests.get.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Response object.
def post(self, r_url: str, **kwargs: Any) -> requests.models.Response:
def post(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `POST` request via `make_request`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        Passed unchanged to `make_request`
        (e.g. `headers`, `use_token`, `debug`, or request options).

    Returns
    -------
    The result of `make_request` (a `requests.Response` object).
    """
    http_method = 'POST'
    return self.make_request(http_method, r_url, **kwargs)

Wrapper for requests.post.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Response object.
def put(self, r_url: str, **kwargs: Any) -> requests.models.Response:
def put(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `PUT` request via `make_request`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        Passed unchanged to `make_request`
        (e.g. `headers`, `use_token`, `debug`, or request options).

    Returns
    -------
    The result of `make_request` (a `requests.Response` object).
    """
    http_method = 'PUT'
    return self.make_request(http_method, r_url, **kwargs)

Wrapper for requests.put.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Response object.
def patch(self, r_url: str, **kwargs: Any) -> requests.models.Response:
def patch(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `PATCH` request via `make_request`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        Passed unchanged to `make_request`
        (e.g. `headers`, `use_token`, `debug`, or request options).

    Returns
    -------
    The result of `make_request` (a `requests.Response` object).
    """
    http_method = 'PATCH'
    return self.make_request(http_method, r_url, **kwargs)

Wrapper for requests.patch.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Response object.
def delete(self, r_url: str, **kwargs: Any) -> requests.models.Response:
def delete(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `DELETE` request via `make_request`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        Passed unchanged to `make_request`
        (e.g. `headers`, `use_token`, `debug`, or request options).

    Returns
    -------
    The result of `make_request` (a `requests.Response` object).
    """
    http_method = 'DELETE'
    return self.make_request(http_method, r_url, **kwargs)

Wrapper for requests.delete.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Response object.
def wget( self, r_url: str, dest: Union[str, pathlib.Path, NoneType] = None, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) -> pathlib.Path:
def wget(
    self,
    r_url: str,
    dest: Optional[Union[str, pathlib.Path]] = None,
    headers: Optional[Dict[str, Any]] = None,
    use_token: bool = True,
    debug: bool = False,
    **kw: Any
) -> pathlib.Path:
    """
    Download a file from this connector's API instance (mimics `wget`).

    Parameters
    ----------
    r_url: str
        The relative URL of the file, joined onto the connector's `url`.

    dest: Optional[Union[str, pathlib.Path]], default None
        The destination, passed through to `meerschaum.utils.misc.wget`.

    headers: Optional[Dict[str, Any]], default None
        Headers for the request.
        The token is added to a copy, so the provided dictionary is not modified.

    use_token: bool, default True
        If `True`, add the authorization token to the headers.

    debug: bool, default False
        Verbosity toggle.

    kw: Any
        All other keyword arguments are passed to `meerschaum.utils.misc.wget`.

    Returns
    -------
    The value returned by `meerschaum.utils.misc.wget` (annotated as a `pathlib.Path`).
    """
    ### Aliased so the helper is not confused with this method's own name.
    from meerschaum.utils.misc import wget as _wget

    ### Copy the headers so the bearer token never leaks into the caller's dictionary.
    request_headers = dict(headers) if headers is not None else {}
    if use_token:
        request_headers['Authorization'] = f'Bearer {self.token}'

    request_url = urllib.parse.urljoin(self.url, r_url)
    if debug:
        dprint(
            f"[{self}] Downloading {request_url}"
            + (f' to {dest}' if dest is not None else '')
            + "..."
        )
    return _wget(request_url, dest=dest, headers=request_headers, **kw)

Mimic wget with requests.

def get_actions(self):
def get_actions(self):
    """
    Request the available actions from the API instance.
    Returns the response of the `GET` request to the actions endpoint.
    """
    response = self.get(ACTIONS_ENDPOINT)
    return response

Get available actions from the API instance.

def do_action(self, sysargs: List[str]) -> Tuple[bool, str]:
def do_action(self, sysargs: List[str]) -> SuccessTuple:
    """
    Run a Meerschaum action remotely and block until it finishes.

    This is the synchronous entry point for `do_action_async()`:
    the coroutine is driven to completion with `asyncio.run()`.
    """
    action_coroutine = self.do_action_async(sysargs)
    return asyncio.run(action_coroutine)

Execute a Meerschaum action remotely.

async def do_action_async( self, sysargs: List[str], callback_function: Callable[[str], NoneType] = functools.partial(<built-in function print>, end='')) -> Tuple[bool, str]:
async def do_action_async(
    self,
    sysargs: List[str],
    callback_function: Callable[[str], None] = partial(print, end=''),
) -> SuccessTuple:
    """
    Run an action remotely as a temporary job on this API instance.

    Parameters
    ----------
    sysargs: List[str]
        The arguments of the action to execute.
        They are first passed through `remove_api_executor_keys()`.

    callback_function: Callable[[str], None], default partial(print, end='')
        Handed to the job's `monitor_logs_async()` while the job's logs are monitored.

    Returns
    -------
    The job's start failure if it could not be started,
    otherwise the job's `result` (the job is deleted afterwards).
    """
    from meerschaum._internal.arguments import remove_api_executor_keys
    from meerschaum.utils.misc import generate_password

    cleaned_sysargs = remove_api_executor_keys(sysargs)

    ### The temporary job receives a random name behind the temporary prefix.
    temp_job_name = TEMP_PREFIX + generate_password(12)
    temp_job = mrsm.Job(temp_job_name, cleaned_sysargs, executor_keys=str(self))

    started, start_message = temp_job.start()
    if not started:
        return started, start_message

    await temp_job.monitor_logs_async(
        callback_function=callback_function,
        stop_on_exit=True,
        strip_timestamps=True,
    )

    ### Read the result before deleting the job.
    success, msg = temp_job.result
    temp_job.delete()
    return success, msg

Execute an action as a temporary remote job.

def do_action_legacy( self, action: Optional[List[str]] = None, sysargs: Optional[List[str]] = None, debug: bool = False, **kw) -> Tuple[bool, str]:
 66def do_action_legacy(
 67    self,
 68    action: Optional[List[str]] = None,
 69    sysargs: Optional[List[str]] = None,
 70    debug: bool = False,
 71    **kw
 72) -> SuccessTuple:
 73    """
 74    NOTE: This method is deprecated.
 75    Please use `do_action()` or `do_action_async()`.
 76
 77    Execute a Meerschaum action remotely.
 78
 79    If `sysargs` are provided, parse those instead.
 80    Otherwise infer everything from keyword arguments.
 81
 82    Examples
 83    --------
 84    >>> conn = mrsm.get_connector('api:main')
 85    >>> conn.do_action(['show', 'pipes'])
 86    (True, "Success")
 87    >>> conn.do_action(['show', 'arguments'], name='test')
 88    (True, "Success")
 89    """
 90    import sys, json
 91    from meerschaum.utils.debug import dprint
 92    from meerschaum.config.static import STATIC_CONFIG
 93    from meerschaum.utils.misc import json_serialize_datetime
 94    if action is None:
 95        action = []
 96
 97    if sysargs is not None and action and action[0] == '':
 98        from meerschaum._internal.arguments import parse_arguments
 99        if debug:
100            dprint(f"Parsing sysargs:\n{sysargs}")
101        json_dict = parse_arguments(sysargs)
102    else:
103        json_dict = kw
104        json_dict['action'] = action
105        if 'noask' not in kw:
106            json_dict['noask'] = True
107        if 'yes' not in kw:
108            json_dict['yes'] = True
109        if debug:
110            json_dict['debug'] = debug
111
112    root_action = json_dict['action'][0]
113    del json_dict['action'][0]
114    r_url = f"{STATIC_CONFIG['api']['endpoints']['actions']}/{root_action}"
115    
116    if debug:
117        from meerschaum.utils.formatting import pprint
118        dprint(f"Sending data to '{self.url + r_url}':")
119        pprint(json_dict, stream=sys.stderr)
120
121    response = self.post(
122        r_url,
123        data = json.dumps(json_dict, default=json_serialize_datetime),
124        debug = debug,
125    )
126    try:
127        response_list = json.loads(response.text)
128        if isinstance(response_list, dict) and 'detail' in response_list:
129            return False, response_list['detail']
130    except Exception as e:
131        print(f"Invalid response: {response}")
132        print(e)
133        return False, response.text
134    if debug:
135        dprint(response)
136    try:
137        return response_list[0], response_list[1]
138    except Exception as e:
139        return False, f"Failed to parse result from action '{root_action}'"

NOTE: This method is deprecated. Please use do_action() or do_action_async().

Execute a Meerschaum action remotely.

If sysargs are provided, parse those instead. Otherwise infer everything from keyword arguments.

Examples
>>> conn = mrsm.get_connector('api:main')
>>> conn.do_action_legacy(['show', 'pipes'])
(True, "Success")
>>> conn.do_action_legacy(['show', 'arguments'], name='test')
(True, "Success")
def get_mrsm_version(self, **kw) -> Optional[str]:
def get_mrsm_version(self, **kw) -> Optional[str]:
    """
    Ask the API instance for its Meerschaum version (without using a token).

    Returns `None` if the request or the JSON parsing fails,
    or if the response is a dictionary containing `detail`.
    """
    from meerschaum.config.static import STATIC_CONFIG
    try:
        version_endpoint = STATIC_CONFIG['api']['endpoints']['version'] + '/mrsm'
        response = self.get(version_endpoint, use_token=False, **kw)
        version = response.json()
    except Exception:
        return None

    ### A `detail` key is treated as a failed lookup.
    returned_detail = isinstance(version, dict) and 'detail' in version
    return None if returned_detail else version

Return the Meerschaum version of the API instance.

def get_chaining_status(self, **kw) -> Optional[bool]:
def get_chaining_status(self, **kw) -> Optional[bool]:
    """
    Fetch the chaining status of the API instance.

    Parameters
    ----------
    kw: Any
        Keyword arguments passed to the `GET` request.

    Returns
    -------
    The parsed JSON body of the chaining endpoint,
    or `None` if the request fails, the response is falsy,
    or the body cannot be parsed as JSON.
    """
    from meerschaum.config.static import STATIC_CONFIG
    try:
        response = self.get(
            STATIC_CONFIG['api']['endpoints']['chaining'],
            use_token=True,
            **kw
        )
        if not response:
            return None
        ### Parse inside the `try` so an invalid body yields `None` instead of raising.
        return response.json()
    except Exception:
        return None

Fetch the chaining status of the API instance.

def register_pipe( self, pipe: meerschaum.Pipe, debug: bool = False) -> Tuple[bool, str]:
def register_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False
) -> SuccessTuple:
    """
    Register a new pipe by POSTing its parameters to the API.

    Returns a tuple of a success flag and a message:
    the raw response text for a falsy response,
    the two elements of the body when it is a list,
    the `detail` value when present, and the raw response text otherwise.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.config.static import STATIC_CONFIG
    ### NOTE: if `parameters` is supplied in the Pipe constructor,
    ###       then `pipe.parameters` will exist and not be fetched from the database.
    register_url = pipe_r_url(pipe) + '/register'
    response = self.post(register_url, json=pipe.parameters, debug=debug)
    if debug:
        dprint(response.text)

    if not response:
        return False, response.text

    payload = response.json()
    if isinstance(payload, list):
        return payload[0], payload[1]

    if 'detail' in payload:
        return bool(response), payload['detail']

    return bool(response), response.text

Submit a POST to the API to register a new Pipe object. Returns a tuple of (success_bool, message).

def fetch_pipes_keys( self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> List[Tuple[str, str, Optional[str]]]:
def fetch_pipes_keys(
    self,
    connector_keys: Optional[List[str]] = None,
    metric_keys: Optional[List[str]] = None,
    location_keys: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False
) -> Union[List[Tuple[str, str, Union[str, None]]]]:
    """
    Retrieve the keys of the registered pipes from the API.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        The connector keys for the query.

    metric_keys: Optional[List[str]], default None
        The metric keys for the query.

    location_keys: Optional[List[str]], default None
        The location keys for the query.

    tags: Optional[List[str]], default None
        A list of tags for the query.

    params: Optional[Dict[str, Any]], default None
        A parameters dictionary for filtering against the `pipes` table
        (e.g. `{'connector_keys': 'plugin:foo'}`).
        Not recommended to be used.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of tuples containing pipes' keys.
    """
    from meerschaum.config.static import STATIC_CONFIG

    ### Omitted key lists are sent as empty lists; `params` is sent as-is.
    list_filters = {
        'connector_keys': connector_keys,
        'metric_keys': metric_keys,
        'location_keys': location_keys,
        'tags': tags,
    }

    keys_url = STATIC_CONFIG['api']['endpoints']['pipes'] + '/keys'
    try:
        ### Every filter is JSON-encoded into its own query parameter.
        query = {
            name: json.dumps([] if value is None else value)
            for name, value in list_filters.items()
        }
        query['params'] = json.dumps(params)
        keys_json = self.get(keys_url, params=query, debug=debug).json()
    except Exception as e:
        ### NOTE(review): presumably `error()` raises, otherwise `keys_json` is unbound below.
        error(str(e))

    if 'detail' in keys_json:
        error(keys_json['detail'], stack=False)
    return list(map(tuple, keys_json))

Fetch registered Pipes' keys from the API.

Parameters
  • connector_keys (Optional[List[str]], default None): The connector keys for the query.
  • metric_keys (Optional[List[str]], default None): The metric keys for the query.
  • location_keys (Optional[List[str]], default None): The location keys for the query.
  • tags (Optional[List[str]], default None): A list of tags for the query.
  • params (Optional[Dict[str, Any]], default None): A parameters dictionary for filtering against the pipes table (e.g. {'connector_keys': 'plugin:foo'}). Not recommended to be used.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of tuples containing pipes' keys.
def edit_pipe( self, pipe: meerschaum.Pipe, patch: bool = False, debug: bool = False) -> Tuple[bool, str]:
def edit_pipe(
    self,
    pipe: mrsm.Pipe,
    patch: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Submit a PATCH to the API to edit an existing Pipe object.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose `parameters` are sent as the request's JSON body.

    patch: bool, default False
        Sent to the API as the `patch` query parameter.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A tuple of a success flag and a message:
    the two elements of the body when it is a list,
    the `detail` value when the body is a dictionary containing it,
    and the raw response text otherwise (including when the body is not valid JSON).
    """
    from meerschaum.utils.debug import dprint
    ### NOTE: if `parameters` is supplied in the Pipe constructor,
    ###       then `pipe.parameters` will exist and not be fetched from the database.
    r_url = pipe_r_url(pipe)
    response = self.patch(
        r_url + '/edit',
        params={'patch': patch},
        json=pipe.parameters,
        debug=debug,
    )
    if debug:
        dprint(response.text)

    ### Error responses are not guaranteed to carry a JSON body,
    ### so fall back to the raw text rather than raising.
    try:
        response_data = response.json()
    except ValueError:
        return bool(response), response.text

    if isinstance(response_data, list):
        return response_data[0], response_data[1]

    if isinstance(response_data, dict) and 'detail' in response_data:
        return bool(response), response_data['detail']

    return bool(response), response.text

Submit a PATCH to the API to edit an existing Pipe object. Returns a tuple of (success_bool, message).

def sync_pipe( self, pipe: meerschaum.Pipe, df: "Optional[Union['pd.DataFrame', Dict[Any, Any], str]]" = None, chunksize: Optional[int] = -1, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
def sync_pipe(
    self,
    pipe: mrsm.Pipe,
    df: Optional[Union['pd.DataFrame', Dict[Any, Any], str]] = None,
    chunksize: Optional[int] = -1,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Sync a DataFrame into a pipe by posting it to the API in chunks.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to sync into.

    df: Optional[Union['pd.DataFrame', Dict[Any, Any], str]], default None
        The data to sync. May be a DataFrame, a dictionary mapping columns to lists of values,
        a list, or a JSON string which decodes to a dictionary or list.
        If `None`, nothing is synced and a failure tuple is returned.

    chunksize: Optional[int], default -1
        The number of rows per posted chunk.
        `-1` reads `system:connectors:sql:chunksize` from the configuration,
        and `None` falls back to a chunksize of 1.

    debug: bool, default False
        Verbosity toggle.

    **kw: Any
        Sent as query parameters with every chunk.
        `columns` is added when the pipe defines columns locally.

    Returns
    -------
    A tuple of (success_bool, message).
    Syncing stops at the first chunk which fails.
    """
    from decimal import Decimal
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import json_serialize_datetime, items_str
    from meerschaum.config import get_config
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.dataframe import get_numeric_cols, to_json
    begin = time.time()
    more_itertools = attempt_import('more_itertools')
    if df is None:
        msg = f"DataFrame is `None`. Cannot sync {pipe}."
        return False, msg

    def get_json_str(c):
        ### allow syncing dict or JSON without needing to import pandas (for IOT devices)
        if isinstance(c, (dict, list)):
            return json.dumps(c, default=json_serialize_datetime)
        return to_json(c, orient='columns')

    def get_chunk_rowcount(c):
        ### A dict chunk maps columns to lists of values, so `len(c)` would count columns.
        if isinstance(c, dict):
            return max((len(vals) for vals in c.values()), default=0)
        return len(c)

    df = json.loads(df) if isinstance(df, str) else df

    _chunksize: Optional[int] = (1 if chunksize is None else (
        get_config('system', 'connectors', 'sql', 'chunksize') if chunksize == -1
        else chunksize
    ))
    chunks = []
    ### NOTE: Check for dicts and lists first: they have no `.columns`,
    ###       and lists expose an `index` method which would pass `hasattr(df, 'index')`.
    if isinstance(df, dict):
        keys: List[str] = list(df.keys())
        ### `_chunks` is a dict of lists of dicts.
        ### e.g. {'a' : [ {'a':[1, 2]}, {'a':[3, 4]} ] }
        _chunks = {k: [] for k in keys}
        for k in keys:
            for vals in more_itertools.chunked(df[k], _chunksize):
                _chunks[k].append({k: vals})

        ### `chunks` is a list of dicts, each holding the same slice of every column.
        for col_chunks in _chunks.values():
            for i, c in enumerate(col_chunks):
                try:
                    chunks[i].update(c)
                except IndexError:
                    chunks.append(c)
    elif isinstance(df, list):
        ### `chunked()` already yields sub-lists of at most `_chunksize` elements.
        chunks = more_itertools.chunked(df, _chunksize)
    elif hasattr(df, 'index'):
        df = df.reset_index(drop=True)
        is_dask = 'dask' in df.__module__
        ### The generator is lazy, so the numeric conversion below is applied before slicing.
        chunks = (
            (df.iloc[i] for i in more_itertools.chunked(df.index, _chunksize))
            if not is_dask
            else [partition.compute() for partition in df.partitions]
        )

        numeric_cols = get_numeric_cols(df)
        if numeric_cols:
            for col in numeric_cols:
                df[col] = df[col].apply(lambda x: f'{x:f}' if isinstance(x, Decimal) else x)
            pipe_dtypes = pipe.dtypes
            new_numeric_cols = [
                col
                for col in numeric_cols
                if pipe_dtypes.get(col, None) != 'numeric'
            ]
            pipe.dtypes.update({
                col: 'numeric'
                for col in new_numeric_cols
            })
            edit_success, edit_msg = pipe.edit(debug=debug)
            if not edit_success:
                warn(
                    "Failed to update new numeric columns "
                    + f"{items_str(new_numeric_cols)}:\n{edit_msg}"
                )

    ### Send columns in case the user has defined them locally.
    if pipe.columns:
        kw['columns'] = json.dumps(pipe.columns)
    r_url = pipe_r_url(pipe) + '/data'

    rowcount = 0
    num_success_chunks = 0
    for i, c in enumerate(chunks):
        if debug:
            dprint(f"[{self}] Posting chunk {i} to {r_url}...")
        if len(c) == 0:
            if debug:
                dprint(f"[{self}] Skipping empty chunk...")
            continue
        json_str = get_json_str(c)

        try:
            response = self.post(
                r_url,
                ### handles check_existing
                params=kw,
                data=json_str,
                debug=debug,
            )
        except Exception as e:
            msg = f"Failed to post a chunk to {pipe}:\n{e}"
            warn(msg)
            return False, msg

        if not response:
            return False, f"Failed to sync a chunk:\n{response.text}"

        try:
            j = json.loads(response.text)
        except Exception as e:
            return False, f"Failed to parse response from syncing {pipe}:\n{e}"

        if isinstance(j, dict) and 'detail' in j:
            return False, j['detail']

        try:
            j = tuple(j)
        except Exception:
            return False, response.text

        if debug:
            dprint("Received response: " + str(j))
        if not j[0]:
            return j

        rowcount += get_chunk_rowcount(c)
        num_success_chunks += 1

    success_tuple = True, (
        f"It took {round(time.time() - begin, 2)} seconds to sync {rowcount} row"
        + ('s' if rowcount != 1 else '')
        + f" across {num_success_chunks} chunk" + ('s' if num_success_chunks != 1 else '') +
        f" to {pipe}."
    )
    return success_tuple

Sync a DataFrame into a Pipe.

def delete_pipe( self, pipe: Optional[meerschaum.Pipe] = None, debug: bool = None) -> Tuple[bool, str]:
def delete_pipe(
    self,
    pipe: Optional[meerschaum.Pipe] = None,
    debug: bool = None,
) -> SuccessTuple:
    """
    Delete a pipe and drop its table.

    Parameters
    ----------
    pipe: Optional[meerschaum.Pipe], default None
        The pipe to delete. `error()` is called if this is `None`.

    debug: bool, default None
        Verbosity toggle.

    Returns
    -------
    A tuple of (success_bool, message).
    """
    if pipe is None:
        error("Pipe cannot be None.")
    r_url = pipe_r_url(pipe)
    response = self.delete(
        r_url + '/delete',
        debug=debug,
    )
    if debug:
        dprint(response.text)

    ### Parse the body once; report a non-JSON body instead of raising.
    try:
        response_data = response.json()
    except Exception:
        return False, f"Failed to delete {pipe}:\n{response.text}"

    if isinstance(response_data, list):
        return response_data[0], response_data[1]

    ### Only dictionaries may carry a `detail` message.
    if isinstance(response_data, dict) and 'detail' in response_data:
        return bool(response), response_data['detail']

    return bool(response), response.text

Delete a Pipe and drop its table.

def get_pipe_data( self, pipe: meerschaum.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[str, datetime.datetime, int, NoneType] = None, end: Union[str, datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, as_chunks: bool = False, debug: bool = False, **kw: Any) -> Optional[pandas.core.frame.DataFrame]:
def get_pipe_data(
    self,
    pipe: meerschaum.Pipe,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    begin: Union[str, datetime, int, None] = None,
    end: Union[str, datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    as_chunks: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[pandas.DataFrame, None]:
    """
    Fetch a pipe's data from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose data to fetch.

    select_columns: Optional[List[str]], default None
        Sent JSON-encoded as the `select_columns` query parameter.

    omit_columns: Optional[List[str]], default None
        Sent JSON-encoded as the `omit_columns` query parameter.

    begin: Union[str, datetime, int, None], default None
        Sent as the `begin` query parameter.

    end: Union[str, datetime, int, None], default None
        Sent as the `end` query parameter.

    params: Optional[Dict[str, Any]], default None
        Sent JSON-encoded as the `params` query parameter.

    as_chunks: bool, default False
        Accepted for interface compatibility; not used by this implementation.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A DataFrame of the pipe's data, or `None` if the request or parsing failed.
    """
    r_url = pipe_r_url(pipe)
    try:
        response = self.get(
            r_url + "/data",
            params={
                'select_columns': json.dumps(select_columns),
                'omit_columns': json.dumps(omit_columns),
                'begin': begin,
                'end': end,
                'params': json.dumps(params, default=str),
            },
            debug=debug,
        )
        if not response.ok:
            return None
        j = response.json()
    except Exception as e:
        warn(f"Failed to get data for {pipe}:\n{e}")
        return None

    ### An error body is a lone `detail` key whose value is not a column mapping.
    ### Warn and return `None` to honor the declared return type (not a success tuple).
    if (
        isinstance(j, dict)
        and 'detail' in j
        and len(j) == 1
        and not isinstance(j['detail'], dict)
    ):
        warn(f"Failed to get data for {pipe}:\n{j['detail']}")
        return None

    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.dataframe import parse_df_datetimes, add_missing_cols_to_df
    from meerschaum.utils.dtypes import are_dtypes_equal
    pd = import_pandas()
    try:
        df = pd.read_json(StringIO(response.text))
    except Exception as e:
        warn(f"Failed to parse response for {pipe}:\n{e}")
        return None

    if len(df.columns) == 0:
        return add_missing_cols_to_df(df, pipe.dtypes)

    ### Only columns the pipe declares as datetimes are eligible for datetime parsing.
    df = parse_df_datetimes(
        df,
        ignore_cols=[
            col
            for col, dtype in pipe.dtypes.items()
            if not are_dtypes_equal(str(dtype), 'datetime')
        ],
        strip_timezone=(pipe.tzinfo is None),
        debug=debug,
    )
    return df

Fetch data from the API.

def get_pipe_id(self, pipe: meerschaum.Pipe, debug: bool = False) -> int:
def get_pipe_id(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> int:
    """
    Get a pipe's ID from the API.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose ID to fetch.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The pipe's ID as an integer, or `None` if the response text is not an integer.
    """
    from meerschaum.utils.misc import is_int
    id_response = self.get(pipe_r_url(pipe) + '/id', debug=debug)
    if debug:
        dprint(f"Got pipe ID: {id_response.text}")

    pipe_id = None
    try:
        raw_id = id_response.text
        if is_int(raw_id):
            pipe_id = int(raw_id)
    except Exception as e:
        warn(f"Failed to get the ID for {pipe}:\n{e}")
    return pipe_id

Get a Pipe's ID from the API.

def get_pipe_attributes( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, Any]:
def get_pipe_attributes(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Dict[str, Any]:
    """
    Fetch a pipe's attributes from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose attributes we are fetching.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The decoded JSON body of the `/attributes` endpoint.
    If the body cannot be decoded, warn and return an empty dictionary.
    """
    attributes_response = self.get(pipe_r_url(pipe) + '/attributes', debug=debug)
    try:
        attributes = json.loads(attributes_response.text)
    except Exception as e:
        warn(f"Failed to get the attributes for {pipe}:\n{e}")
        return {}
    return attributes

Get a Pipe's attributes from the API

Parameters
Returns
  • A dictionary of a pipe's attributes.
  • If the pipe does not exist, return an empty dictionary.
def get_sync_time( self, pipe: meerschaum.Pipe, params: Optional[Dict[str, Any]] = None, newest: bool = True, debug: bool = False) -> Union[datetime.datetime, int, NoneType]:
def get_sync_time(
    self,
    pipe: mrsm.Pipe,
    params: Optional[Dict[str, Any]] = None,
    newest: bool = True,
    debug: bool = False,
) -> Union[datetime, int, None]:
    """
    Get a pipe's sync time from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to select from.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary, sent as the JSON body of the request.

    newest: bool, default True
        Sent as the `newest` query parameter.
        If `True`, request the most recent datetime; if `False`, the oldest.

    debug: bool, default False
        Verbosity toggle (also sent as the `debug` query parameter).

    Returns
    -------
    An integer if the API's value is an integer, otherwise a datetime parsed from ISO format.
    Returns `None` if the request failed, the API returned null, or the value could not be parsed.
    """
    from meerschaum.utils.misc import is_int
    from meerschaum.utils.warnings import warn
    response = self.get(
        pipe_r_url(pipe) + '/sync_time',
        json=params,
        params={'newest': newest, 'debug': debug},
        debug=debug,
    )
    if not response:
        warn(f"Failed to get the sync time for {pipe}:\n" + response.text)
        return None

    raw_sync_time = response.json()
    if raw_sync_time is None:
        return None

    try:
        ### Integer sync times are returned as-is; anything else must be an ISO timestamp.
        if is_int(raw_sync_time):
            return int(raw_sync_time)
        return datetime.fromisoformat(raw_sync_time)
    except Exception as e:
        warn(f"Failed to parse the sync time '{raw_sync_time}' for {pipe}:\n{e}")
    return None

Get a Pipe's most recent datetime value from the API.

Parameters
  • pipe (meerschaum.Pipe): The pipe to select from.
  • params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause.
  • newest (bool, default True): If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
Returns
  • The most recent (or oldest if newest is False) datetime of a pipe,
  • rounded down to the closest minute.
def pipe_exists(self, pipe: meerschaum.Pipe, debug: bool = False) -> bool:
def pipe_exists(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False
) -> bool:
    """
    Check the API to see if a pipe exists.

    Parameters
    ----------
    pipe: 'meerschaum.Pipe'
        The pipe which we are querying.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The value returned by the `/exists` endpoint.
    Returns `False` if the request failed or the API responded with an error `detail`.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    r_url = pipe_r_url(pipe)
    response = self.get(r_url + '/exists', debug=debug)
    if not response:
        warn(f"Failed to check if {pipe} exists:\n{response.text}")
        return False
    if debug:
        dprint("Received response: " + str(response.text))
    j = response.json()
    if isinstance(j, dict) and 'detail' in j:
        ### An error payload is a truthy dict, so it must not be returned as the result.
        warn(j['detail'])
        return False
    return j

Check the API to see if a Pipe exists.

Parameters
Returns
  • A bool indicating whether a pipe's underlying table exists.
def create_metadata(self, debug: bool = False) -> bool:
def create_metadata(
    self,
    debug: bool = False
) -> bool:
    """
    Create metadata tables by posting to the API's metadata endpoint.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating success.
    Returns `False` if the response is not OK or its body cannot be decoded as JSON.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.config.static import STATIC_CONFIG
    r_url = STATIC_CONFIG['api']['endpoints']['metadata']
    response = self.post(r_url, debug=debug)
    if debug:
        dprint(f"Create metadata response: {response.text}")
    try:
        metadata_response = json.loads(response.text)
    except Exception as e:
        warn(f"Failed to create metadata on {self}:\n{e}")
        return False

    ### NOTE(review): the endpoint's body shape is not visible from here.
    ###               A `[success, msg]` body is assumed to carry its own flag - confirm.
    if isinstance(metadata_response, (list, tuple)) and metadata_response:
        return bool(response) and bool(metadata_response[0])
    return bool(response) and bool(metadata_response)

Create metadata tables.

Returns
  • A bool indicating success.
def get_pipe_rowcount( self, pipe: meerschaum.Pipe, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) -> int:
def get_pipe_rowcount(
    self,
    pipe: mrsm.Pipe,
    begin: Optional[datetime] = None,
    end: Optional[datetime] = None,
    params: Optional[Dict[str, Any]] = None,
    remote: bool = False,
    debug: bool = False,
) -> int:
    """
    Get a pipe's row count from the API.

    Parameters
    ----------
    pipe: 'meerschaum.Pipe':
        The pipe whose rows we are counting.

    begin: Optional[datetime], default None
        If provided, bound the count by this datetime.

    end: Optional[datetime], default None
        If provided, bound the count by this datetime.

    params: Optional[Dict[str, Any]], default None
        If provided, bound the count by these parameters (sent as the JSON body).

    remote: bool, default False
        Sent as the `remote` query parameter.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The row count reported by the API.
    If the request fails or the body is not an integer, warn and return 0.
    """
    rowcount_response = self.get(
        pipe_r_url(pipe) + "/rowcount",
        json=params,
        params={'begin': begin, 'end': end, 'remote': remote},
        debug=debug,
    )
    if not rowcount_response:
        warn(f"Failed to get the rowcount for {pipe}:\n{rowcount_response.text}")
        return 0

    try:
        rowcount = int(json.loads(rowcount_response.text))
    except Exception as e:
        warn(f"Failed to get the rowcount for {pipe}:\n{e}")
        rowcount = 0
    return rowcount

Get a pipe's row count from the API.

Parameters
  • pipe ('meerschaum.Pipe':): The pipe whose row count we are counting.
  • begin (Optional[datetime], default None): If provided, bound the count by this datetime.
  • end (Optional[datetime]): If provided, bound the count by this datetime.
  • params (Optional[Dict[str, Any]], default None): If provided, bound the count by these parameters.
  • remote (bool, default False):
Returns
  • The number of rows in the pipe's table, bound by the given parameters.
  • If the table does not exist, return 0.
def drop_pipe( self, pipe: meerschaum.Pipe, debug: bool = False) -> Tuple[bool, str]:
def drop_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False
) -> SuccessTuple:
    """
    Drop a pipe's table but maintain its registration.

    Parameters
    ----------
    pipe: meerschaum.Pipe:
        The pipe to be dropped. `error()` is called if this is `None`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A success tuple (bool, str).
    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint
    if pipe is None:
        error("Pipe cannot be None.")
    r_url = pipe_r_url(pipe)
    response = self.delete(
        r_url + '/drop',
        debug=debug,
    )
    if debug:
        dprint(response.text)

    try:
        data = response.json()
    except Exception:
        return False, f"Failed to drop {pipe}."

    if isinstance(data, list):
        return data[0], data[1]

    ### Reuse the parsed body; the `dict` check avoids a `TypeError` on scalar JSON bodies.
    if isinstance(data, dict) and 'detail' in data:
        return bool(response), data['detail']

    return bool(response), response.text

Drop a pipe's table but maintain its registration.

Parameters
Returns
  • A success tuple (bool, str).
def clear_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kw) -> Tuple[bool, str]:
def clear_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """
    Delete rows in a pipe's table by running the `clear pipes` action on the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe with rows to be deleted.

    debug: bool, default False
        Verbosity toggle.

    **kw
        Additional keyword arguments forwarded to `do_action_legacy()`.
        The pipe's keys, `action`, and `force` are always set by this method.

    Returns
    -------
    The result of `do_action_legacy()` (a success tuple).
    """
    ### These are supplied explicitly below, so discard any caller-provided values.
    for reserved_key in ('metric_keys', 'connector_keys', 'location_keys', 'action', 'force'):
        kw.pop(reserved_key, None)

    action = ['clear', 'pipes']
    return self.do_action_legacy(
        action,
        connector_keys=pipe.connector_keys,
        metric_keys=pipe.metric_key,
        location_keys=pipe.location_key,
        force=True,
        debug=debug,
        **kw
    )

Delete rows in a pipe's table.

Parameters
Returns
  • A success tuple.
def get_pipe_columns_types( self, pipe: meerschaum.Pipe, debug: bool = False) -> Optional[Dict[str, str]]:
def get_pipe_columns_types(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Union[Dict[str, str], None]:
    """
    Fetch the columns and types of the pipe's table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose columns to be queried.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping column names to their database types,
    or `None` if the API returned an error or a non-dictionary body.

    Examples
    --------
    >>> {
    ...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
    ...   'id': 'BIGINT',
    ...   'val': 'DOUBLE PRECISION',
    ... }
    >>>
    """
    response = self.get(pipe_r_url(pipe) + '/columns/types', debug=debug)
    columns_types = response.json()

    if not isinstance(columns_types, dict):
        warn(response.text)
        return None

    ### A dictionary whose only key is `detail` is an error payload.
    if 'detail' in columns_types and len(columns_types) == 1:
        warn(columns_types['detail'])
        return None

    return columns_types

Fetch the columns and types of the pipe's table.

Parameters
Returns
  • A dictionary mapping column names to their database types.
Examples
>>> {
...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
...   'id': 'BIGINT',
...   'val': 'DOUBLE PRECISION',
... }
>>>
def get_pipe_columns_indices( self, pipe: meerschaum.Pipe, debug: bool = False) -> Optional[Dict[str, str]]:
def get_pipe_columns_indices(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Union[Dict[str, str], None]:
    """
    Fetch the index information for a pipe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose columns to be queried.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping column names to a list of associated index information,
    or `None` if the API returned an error or a non-dictionary body.
    """
    indices_url = pipe_r_url(pipe) + '/columns/indices'
    response = self.get(indices_url, debug=debug)
    payload = response.json()

    is_mapping = isinstance(payload, dict)
    ### A dictionary whose only key is `detail` is an error payload.
    is_error_payload = is_mapping and 'detail' in payload and len(payload.keys()) == 1

    if is_error_payload:
        warn(payload['detail'])
        return None
    if not is_mapping:
        warn(response.text)
        return None
    return payload

Fetch the index information for a pipe.

Parameters
  • pipe (mrsm.Pipe): The pipe whose columns to be queried.
Returns
  • A dictionary mapping column names to a list of associated index information.
def fetch( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, str, int] = '', end: Union[datetime.datetime, int] = None, params: 'Optional[Dict, Any]' = None, debug: bool = False, **kw: Any) -> Iterator[pandas.core.frame.DataFrame]:
def fetch(
        self,
        pipe: mrsm.Pipe,
        begin: Union[datetime, str, int] = '',
        end: Union[datetime, int] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw: Any
    ) -> Iterator['pd.DataFrame']:
    """
    Get the pipe's data from the remote pipe defined under its `fetch` parameters.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose `fetch` parameters describe the remote (source) pipe.

    begin: Union[datetime, str, int], default ''
        Passed to the source pipe's `get_data()`.

    end: Union[datetime, int], default None
        Passed to the source pipe's `get_data()`.

    params: Optional[Dict[str, Any]], default None
        Filter parameters. The `fetch:params` values are patched on top of a copy of these.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The result of the source pipe's `get_data(as_iterator=True)`,
    or `None` (with a warning) if the `fetch` parameters are missing or incomplete.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, error
    from meerschaum.config._patch import apply_patch_to_config

    fetch_params = pipe.parameters.get('fetch', {})
    if not fetch_params:
        warn(f"Missing 'fetch' parameters for {pipe}.", stack=False)
        return None

    ### Work on a copy so that the connector is never written into the pipe's parameters.
    pipe_meta = dict(fetch_params.get('pipe', None) or {})
    ### Legacy: check for `connector_keys`, etc. at the root.
    if not pipe_meta:
        ck, mk, lk = (
            fetch_params.get('connector_keys', None),
            fetch_params.get('metric_key', None),
            fetch_params.get('location_key', None),
        )
        if not ck or not mk:
            warn(f"Missing `fetch:pipe` keys for {pipe}.", stack=False)
            return None

        pipe_meta.update({
            'connector': ck,
            'metric': mk,
            'location': lk,
        })

    pipe_meta['instance'] = self
    source_pipe = mrsm.Pipe(**pipe_meta)

    _params = copy.deepcopy(params) if params is not None else {}
    _params = apply_patch_to_config(_params, fetch_params.get('params', {}))
    select_columns = fetch_params.get('select_columns', [])
    omit_columns = fetch_params.get('omit_columns', [])

    return source_pipe.get_data(
        select_columns=select_columns,
        omit_columns=omit_columns,
        begin=begin,
        end=end,
        params=_params,
        debug=debug,
        as_iterator=True,
    )

Get the Pipe data from the remote Pipe.

def register_plugin( self, plugin: meerschaum.Plugin, make_archive: bool = True, debug: bool = False) -> Tuple[bool, str]:
def register_plugin(
        self,
        plugin: meerschaum.core.Plugin,
        make_archive: bool = True,
        debug: bool = False,
    ) -> SuccessTuple:
    """
    Register a plugin and upload its archive.

    Parameters
    ----------
    plugin: meerschaum.core.Plugin
        The plugin to register. Its `version` and `attributes` are sent as query parameters.

    make_archive: bool, default True
        If `True`, build the archive with `plugin.make_tar()`.
        Otherwise upload the file at `plugin.archive_path`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A tuple of (success_bool, message).
    """
    import json
    archive_path = plugin.make_tar(debug=debug) if make_archive else plugin.archive_path
    metadata = {
        'version': plugin.version,
        'attributes': json.dumps(plugin.attributes),
    }
    r_url = plugin_r_url(plugin)

    ### The context manager guarantees the archive is closed, even if the upload raises.
    with open(archive_path, 'rb') as file_pointer:
        try:
            response = self.post(
                r_url,
                files={'archive': file_pointer},
                params=metadata,
                debug=debug,
            )
        except Exception as e:
            return False, f"Failed to register plugin '{plugin}':\n{e}"

    try:
        success, msg = json.loads(response.text)
    except Exception:
        return False, response.text

    return success, msg

Register a plugin and upload its archive.

def install_plugin( self, name: str, skip_deps: bool = False, force: bool = False, debug: bool = False) -> Tuple[bool, str]:
def install_plugin(
        self,
        name: str,
        skip_deps: bool = False,
        force: bool = False,
        debug: bool = False
    ) -> SuccessTuple:
    """
    Download and attempt to install a plugin from the API.

    Parameters
    ----------
    name: str
        The name of the plugin to download.

    skip_deps: bool, default False
        Passed to `Plugin.install()`.

    force: bool, default False
        Passed to `Plugin.install()`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A tuple of (success_bool, message).
    If the download is not a binary file, it is read as a JSON response from the API
    and a failure is returned unless that response is itself a success tuple.
    """
    import os, pathlib, json
    from meerschaum.core import Plugin
    from meerschaum.config._paths import PLUGINS_TEMP_RESOURCES_PATH
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    binaryornot_check = attempt_import('binaryornot.check', lazy=False)
    r_url = plugin_r_url(name)
    dest = pathlib.Path(os.path.join(PLUGINS_TEMP_RESOURCES_PATH, name + '.tar.gz'))
    if debug:
        dprint(f"Fetching from '{self.url + r_url}' to '{dest}'...")
    archive_path = self.wget(r_url, dest, debug=debug)
    is_binary = binaryornot_check.is_binary(str(archive_path))
    if not is_binary:
        fail_msg = f"Failed to download binary for plugin '{name}'."
        ### Default to failure so every JSON shape (not just lists) yields a defined result.
        success, msg = False, fail_msg
        try:
            with open(archive_path, 'r') as f:
                j = json.load(f)
            if isinstance(j, list):
                success, msg = tuple(j)
        except Exception:
            success, msg = False, fail_msg
        return success, msg
    plugin = Plugin(name, archive_path=archive_path, repo_connector=self)
    return plugin.install(skip_deps=skip_deps, force=force, debug=debug)

Download and attempt to install a plugin from the API.

def delete_plugin( self, plugin: meerschaum.Plugin, debug: bool = False) -> Tuple[bool, str]:
def delete_plugin(
        self,
        plugin: meerschaum.core.Plugin,
        debug: bool = False
    ) -> SuccessTuple:
    """
    Delete a plugin from an API repository.

    Parameters
    ----------
    plugin: meerschaum.core.Plugin
        The plugin to delete.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A tuple of (success_bool, message).
    If the response body is not a two-element JSON array, return `False` and the raw text.
    """
    import json
    r_url = plugin_r_url(plugin)
    try:
        response = self.delete(r_url, debug=debug)
    except Exception:
        return False, f"Failed to delete plugin '{plugin}'."

    try:
        deletion_ok, deletion_msg = json.loads(response.text)
    except Exception:
        deletion_ok, deletion_msg = False, response.text
    return deletion_ok, deletion_msg

Delete a plugin from an API repository.

def get_plugins( self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False) -> Sequence[str]:
 85def get_plugins(
 86        self,
 87        user_id : Optional[int] = None,
 88        search_term : Optional[str] = None,
 89        debug : bool = False
 90    ) -> Sequence[str]:
 91    """Return a list of registered plugin names.
 92
 93    Parameters
 94    ----------
 95    user_id :
 96        If specified, return all plugins from a certain user.
 97    user_id : Optional[int] :
 98         (Default value = None)
 99    search_term : Optional[str] :
100         (Default value = None)
101    debug : bool :
102         (Default value = False)
103
104    Returns
105    -------
106
107    """
108    import json
109    from meerschaum.utils.warnings import warn, error
110    from meerschaum.config.static import STATIC_CONFIG
111    response = self.get(
112        STATIC_CONFIG['api']['endpoints']['plugins'],
113        params = {'user_id' : user_id, 'search_term' : search_term},
114        use_token = True,
115        debug = debug
116    )
117    if not response:
118        return []
119    plugins = json.loads(response.text)
120    if not isinstance(plugins, list):
121        error(response.text)
122    return plugins

Return a list of registered plugin names.

Parameters
  • user_id (Optional[int], default None): If specified, return all plugins from a certain user.
  • search_term (Optional[str], default None)
  • debug (bool, default False)
Returns
  • A list of registered plugin names.
def get_plugin_attributes( self, plugin: meerschaum.Plugin, debug: bool = False) -> Mapping[str, Any]:
def get_plugin_attributes(
        self,
        plugin: meerschaum.core.Plugin,
        debug: bool = False
    ) -> Mapping[str, Any]:
    """
    Return a plugin's attributes.

    Parameters
    ----------
    plugin: meerschaum.core.Plugin
        The plugin whose attributes to fetch.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary of the plugin's attributes.
    If the request failed and the body has a `detail` key, warn and return an empty dictionary.
    `error()` is called if the body is not a dictionary.
    """
    import json
    from meerschaum.utils.warnings import warn, error
    attributes_url = plugin_r_url(plugin) + '/attributes'
    response = self.get(attributes_url, use_token=True, debug=debug)
    attributes = response.json()

    ### The body may be a JSON object which was itself encoded as a string.
    is_encoded_object = isinstance(attributes, str) and attributes[:1] == '{'
    if is_encoded_object:
        try:
            attributes = json.loads(attributes)
        except Exception:
            ### Leave the string as-is; the type check below reports it.
            pass

    if not isinstance(attributes, dict):
        error(response.text)
    elif not response and 'detail' in attributes:
        warn(attributes['detail'])
        return {}
    return attributes

Return a plugin's attributes.

def login( self, debug: bool = False, warn: bool = True, **kw: Any) -> Tuple[bool, str]:
def login(
        self,
        debug: bool = False,
        warn: bool = True,
        **kw: Any
    ) -> SuccessTuple:
    """
    Log in and set the session token.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    warn: bool, default True
        If `True`, print a warning when the login fails.

    Returns
    -------
    A tuple of (success_bool, message).
    On success, `self._token` and `self._expires` are set from the response.
    """
    from meerschaum.utils.warnings import warn as _warn, info, error
    from meerschaum.core import User
    from meerschaum.config.static import STATIC_CONFIG
    import json
    import datetime

    ### The connector may not have credentials defined.
    try:
        username, password = self.username, self.password
    except AttributeError:
        return False, f"Please login with the command `login {self}`."
    login_data = {'username': username, 'password': password}

    response = self.post(
        STATIC_CONFIG['api']['endpoints']['login'],
        data=login_data,
        use_token=False,
        debug=debug,
    )
    if not response:
        msg = (
            f"Failed to log into '{self}' as user '{login_data['username']}'.\n" +
            f"    Please verify login details for connector '{self}'."
        )
        if warn:
            _warn(msg, stack=False)
        return response.__bool__(), msg

    msg = f"Successfully logged into '{self}' as user '{login_data['username']}'."
    payload = json.loads(response.text)
    self._token = payload['access_token']
    self._expires = datetime.datetime.strptime(
        payload['expires'],
        '%Y-%m-%dT%H:%M:%S.%f'
    )
    return response.__bool__(), msg

Log in and set the session token.

def test_connection(self, **kw: Any) -> Optional[bool]:
def test_connection(
        self,
        **kw: Any
    ) -> Union[bool, None]:
    """
    Test if a successful connection to the API may be made.

    Keyword arguments override the defaults passed to `retry_connect`
    (a single attempt, no waiting, no warnings, no chaining or login enforcement).
    Any exception raised while connecting is reported as `False`.
    """
    from meerschaum.connectors.poll import retry_connect

    connect_kw = {
        'max_retries': 1,
        'retry_wait': 0,
        'warn': False,
        'connector': self,
        'enforce_chaining': False,
        'enforce_login': False,
        **kw,
    }
    try:
        result = retry_connect(**connect_kw)
    except Exception:
        return False
    return result

Test if a successful connection to the API may be made.

def register_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
def register_user(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Register a new user.

    The user's `type` and `email` are only sent when they are set.
    Returns the success tuple from the API, or `(False, message)`
    when the response carries a `detail` error or cannot be decoded.
    """
    import json
    from meerschaum.config.static import STATIC_CONFIG

    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    r_url = f"{users_endpoint}/register"

    payload = {
        'username': user.username,
        'password': user.password,
        'attributes': json.dumps(user.attributes),
    }
    if user.type:
        payload['type'] = user.type
    if user.email:
        payload['email'] = user.email

    response = self.post(r_url, data=payload, debug=debug)
    try:
        decoded = json.loads(response.text)
        if isinstance(decoded, dict) and 'detail' in decoded:
            return False, decoded['detail']
        return tuple(decoded)
    except Exception:
        if response:
            return False, response.text
        return False, f"Failed to register user '{user}'."

Register a new user.

def get_user_id( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[int]:
 97def get_user_id(
 98        self,
 99        user: 'meerschaum.core.User',
100        debug: bool = False,
101        **kw: Any
102    ) -> Optional[int]:
103    """Get a user's ID."""
104    from meerschaum.config.static import STATIC_CONFIG
105    import json
106    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/id"
107    response = self.get(r_url, debug=debug, **kw)
108    try:
109        user_id = int(json.loads(response.text))
110    except Exception as e:
111        user_id = None
112    return user_id

Get a user's ID.

def get_users(self, debug: bool = False, **kw: Any) -> List[str]:
def get_users(
        self,
        debug: bool = False,
        **kw : Any
    ) -> List[str]:
    """
    Return a list of registered usernames.

    An empty list is returned when the request fails
    or the response body cannot be decoded.
    """
    import json
    from meerschaum.config.static import STATIC_CONFIG

    users_url = f"{STATIC_CONFIG['api']['endpoints']['users']}"
    response = self.get(users_url, debug=debug, use_token=True)
    if response:
        try:
            return response.json()
        except Exception:
            pass
    return []

Return a list of registered usernames.

def edit_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
def edit_user(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Edit an existing user.

    Returns the success tuple from the API, or `(False, message)`
    when the response carries a `detail` error or cannot be decoded.
    """
    import json
    from meerschaum.config.static import STATIC_CONFIG

    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    payload = {
        'username': user.username,
        'password': user.password,
        'type': user.type,
        'email': user.email,
        'attributes': json.dumps(user.attributes),
    }
    response = self.post(f"{users_endpoint}/edit", data=payload, debug=debug)
    try:
        decoded = json.loads(response.text)
        if isinstance(decoded, dict) and 'detail' in decoded:
            return False, decoded['detail']
        return tuple(decoded)
    except Exception:
        if response:
            return False, response.text
        return False, f"Failed to edit user '{user}'."

Edit an existing user.

def delete_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
def delete_user(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Delete a user.

    Returns the success tuple from the API, or `(False, message)`
    when the response carries a `detail` error or cannot be decoded.
    """
    import json
    from meerschaum.config.static import STATIC_CONFIG

    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    response = self.delete(f"{users_endpoint}/{user.username}", debug=debug)
    try:
        decoded = json.loads(response.text)
        if isinstance(decoded, dict) and 'detail' in decoded:
            return False, decoded['detail']
        return tuple(decoded)
    except Exception:
        return False, f"Failed to delete user '{user.username}'."

Delete a user.

def get_user_password_hash( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
def get_user_password_hash(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """
    If configured, get a user's password hash.

    Returns `None` when the request fails.
    """
    from meerschaum.config.static import STATIC_CONFIG

    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    r_url = users_endpoint + '/' + user.username + '/password_hash'
    response = self.get(r_url, debug=debug, **kw)
    return response.json() if response else None

If configured, get a user's password hash.

def get_user_type( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
def get_user_type(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """
    If configured, get a user's type.

    Returns `None` when the request fails.
    """
    from meerschaum.config.static import STATIC_CONFIG

    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    r_url = users_endpoint + '/' + user.username + '/type'
    response = self.get(r_url, debug=debug, **kw)
    return response.json() if response else None

If configured, get a user's type.

def get_user_attributes( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw) -> int:
def get_user_attributes(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw
    ) -> 'Optional[Dict[str, Any]]':
    """
    Get a user's attributes.

    Returns
    -------
    The decoded JSON body of the attributes endpoint
    (NOTE(review): presumably a dictionary — confirm against the API),
    or `None` if the response body cannot be decoded.
    """
    import json
    from meerschaum.config.static import STATIC_CONFIG

    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/attributes"
    response = self.get(r_url, debug=debug, **kw)
    try:
        return json.loads(response.text)
    except Exception:
        # Best-effort: an undecodable body is reported as "no attributes".
        return None

Get a user's attributes.

@classmethod
def from_uri( cls, uri: str, label: Optional[str] = None, as_dict: bool = False) -> Union[APIConnector, Dict[str, Union[str, int]]]:
@classmethod
def from_uri(
    cls,
    uri: str,
    label: Optional[str] = None,
    as_dict: bool = False,
) -> Union[
        'meerschaum.connectors.APIConnector',
        Dict[str, Union[str, int]],
    ]:
    """
    Create a new APIConnector from a URI string.

    Parameters
    ----------
    uri: str
        The URI connection string.

    label: Optional[str], default None
        If provided, use this as the connector label.
        Otherwise build the label from the lowercased `username@host`
        (or just the host when the URI has no username).

    as_dict: bool, default False
        If `True`, return a dictionary of the keyword arguments
        necessary to create a new `APIConnector`, otherwise create a new object.

    Returns
    -------
    A new APIConnector object or a dictionary of attributes (if `as_dict` is `True`).
    """
    from meerschaum.connectors.sql import SQLConnector

    params = SQLConnector.parse_uri(uri)
    if 'host' not in params:
        error("No host was found in the provided URI.")

    # The parsed URI scheme is stored under 'flavor'; this connector expects 'protocol'.
    params['protocol'] = params.pop('flavor')

    if not label:
        user_prefix = params['username'] + '@' if 'username' in params else ''
        label = (user_prefix + params['host']).lower()
    params['label'] = label

    if as_dict:
        return params
    return cls(**params)

Create a new APIConnector from a URI string.

Parameters
  • uri (str): The URI connection string.
  • label (Optional[str], default None): If provided, use this as the connector label. Otherwise use the lowercased `username@host` (or just the host when the URI has no username).
  • as_dict (bool, default False): If True, return a dictionary of the keyword arguments necessary to create a new APIConnector, otherwise create a new object.
Returns
  • A new APIConnector object or a dictionary of attributes (if as_dict is True).
def get_jobs(self, debug: bool = False) -> Dict[str, meerschaum.Job]:
def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
    """
    Return a dictionary of remote jobs, keyed by job name.

    A warning is emitted and an empty dictionary is returned if the request fails.
    """
    response = self.get(JOBS_ENDPOINT, debug=debug)
    if not response:
        warn(f"Failed to get remote jobs from {self}.")
        return {}

    jobs = {}
    for job_name, job_meta in response.json().items():
        jobs[job_name] = Job(
            job_name,
            job_meta['sysargs'],
            executor_keys=str(self),
            _properties=job_meta['daemon']['properties'],
        )
    return jobs

Return a dictionary of remote jobs.

def get_job(self, name: str, debug: bool = False) -> meerschaum.Job:
def get_job(self, name: str, debug: bool = False) -> Job:
    """
    Return a single Job object built from the job's remote metadata.

    Raises a `ValueError` if no metadata is found for the job.
    """
    job_meta = self.get_job_metadata(name, debug=debug)
    if not job_meta:
        raise ValueError(f"Job '{name}' does not exist.")

    sysargs = job_meta['sysargs']
    properties = job_meta['daemon']['properties']
    return Job(name, sysargs, executor_keys=str(self), _properties=properties)

Return a single Job object.

def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
 63def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
 64    """
 65    Return the metadata for a single job.
 66    """
 67    now = time.perf_counter()
 68    _job_metadata_cache = self.__dict__.get('_job_metadata_cache', None)
 69    _job_metadata_timestamp = (
 70        _job_metadata_cache.get(name, {}).get('timestamp', None)
 71    ) if _job_metadata_cache is not None else None
 72
 73    if (
 74        _job_metadata_timestamp is not None
 75        and (now - _job_metadata_timestamp) < JOB_METADATA_CACHE_SECONDS
 76    ):
 77        if debug:
 78            dprint(f"Returning cached metadata for job '{name}'.")
 79        return _job_metadata_cache[name]['metadata']
 80
 81    response = self.get(JOBS_ENDPOINT + f"/{name}", debug=debug)
 82    if not response:
 83        if debug:
 84            msg = (
 85                response.json()['detail']
 86                if 'detail' in response.text
 87                else response.text
 88            )
 89            warn(f"Failed to get metadata for job '{name}':\n{msg}")
 90        return {}
 91
 92    metadata = response.json()
 93    if _job_metadata_cache is None:
 94        self._job_metadata_cache = {}
 95
 96    self._job_metadata_cache[name] = {
 97        'timestamp': now,
 98        'metadata': metadata,
 99    }
100    return metadata

Return the metadata for a single job.

def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
    """
    Return the daemon properties for a single job.

    Falls back to an empty dictionary when the metadata
    has no `daemon` or `properties` entry.
    """
    job_meta = self.get_job_metadata(name, debug=debug)
    daemon_meta = job_meta.get('daemon', {})
    return daemon_meta.get('properties', {})

Return the daemon properties for a single job.

def get_job_exists(self, name: str, debug: bool = False) -> bool:
def get_job_exists(self, name: str, debug: bool = False) -> bool:
    """
    Return whether a job exists.

    A warning is emitted and `False` is returned if the request fails.
    """
    exists_url = JOBS_ENDPOINT + f'/{name}/exists'
    response = self.get(exists_url, debug=debug)
    if response:
        return response.json()

    warn(f"Failed to determine whether job '{name}' exists.")
    return False

Return whether a job exists.

def delete_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Delete a job.

    On failure, the message is the response's `detail` (when present)
    or the raw response text.
    """
    response = self.delete(JOBS_ENDPOINT + f"/{name}", debug=debug)
    if response:
        return tuple(response.json())

    failure_msg = response.json()['detail'] if 'detail' in response.text else response.text
    return False, failure_msg

Delete a job.

def start_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Start a job.

    On failure, the message is the response's `detail` (when present)
    or the raw response text.
    """
    response = self.post(JOBS_ENDPOINT + f"/{name}/start", debug=debug)
    if response:
        return tuple(response.json())

    failure_msg = response.json()['detail'] if 'detail' in response.text else response.text
    return False, failure_msg

Start a job.

def create_job( self, name: str, sysargs: List[str], properties: Optional[Dict[str, str]] = None, debug: bool = False) -> Tuple[bool, str]:
def create_job(
    self,
    name: str,
    sysargs: List[str],
    properties: Optional[Dict[str, str]] = None,
    debug: bool = False,
) -> SuccessTuple:
    """
    Create a job from its `sysargs` and optional `properties`.

    On failure, the message is the response's `detail` (when present)
    or the raw response text.
    """
    job_payload = {
        'sysargs': sysargs,
        'properties': properties,
    }
    response = self.post(JOBS_ENDPOINT + f"/{name}", json=job_payload, debug=debug)
    if response:
        return tuple(response.json())

    failure_msg = response.json()['detail'] if 'detail' in response.text else response.text
    return False, failure_msg

Create a job.

def stop_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Stop a job.

    On failure, the message is the response's `detail` (when present)
    or the raw response text.
    """
    response = self.post(JOBS_ENDPOINT + f"/{name}/stop", debug=debug)
    if response:
        return tuple(response.json())

    failure_msg = response.json()['detail'] if 'detail' in response.text else response.text
    return False, failure_msg

Stop a job.

def pause_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Pause a job.

    On failure, the message is the response's `detail` (when present)
    or the raw response text.
    """
    response = self.post(JOBS_ENDPOINT + f"/{name}/pause", debug=debug)
    if response:
        return tuple(response.json())

    failure_msg = response.json()['detail'] if 'detail' in response.text else response.text
    return False, failure_msg

Pause a job.

def get_logs(self, name: str, debug: bool = False) -> str:
def get_logs(self, name: str, debug: bool = False) -> str:
    """
    Return the logs for a job.

    Parameters
    ----------
    name: str
        The name of the job.

    debug: bool, default False
        Verbosity toggle, forwarded to the request.

    Raises
    ------
    A `ValueError` if the request fails.
    """
    # Forward `debug` like the other job requests do; it was previously dropped.
    response = self.get(LOGS_ENDPOINT + f"/{name}", debug=debug)
    if not response:
        raise ValueError(f"Cannot fetch logs for job '{name}':\n{response.text}")

    return response.json()

Return the logs for a job.

def get_job_stop_time(self, name: str, debug: bool = False) -> Optional[datetime.datetime]:
def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
    """
    Return the job's manual stop time.

    Parameters
    ----------
    name: str
        The name of the job.

    debug: bool, default False
        Verbosity toggle, forwarded to the request.

    Returns
    -------
    The stop time parsed from its ISO-format string,
    or `None` if the request fails or the API returns no stop time.
    """
    # Forward `debug` like the other job requests do; it was previously dropped.
    response = self.get(JOBS_ENDPOINT + f"/{name}/stop_time", debug=debug)
    if not response:
        warn(f"Failed to get stop time for job '{name}':\n{response.text}")
        return None

    data = response.json()
    if data is None:
        return None

    return datetime.fromisoformat(data)

Return the job's manual stop time.

def monitor_logs( self, name: str, callback_function: Callable[[Any], Any], input_callback_function: Callable[[NoneType], str], stop_callback_function: Callable[[NoneType], str], stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False):
def monitor_logs(
    self,
    name: str,
    callback_function: Callable[[Any], Any],
    input_callback_function: Callable[[], str],
    stop_callback_function: Callable[[SuccessTuple], str],
    stop_on_exit: bool = False,
    strip_timestamps: bool = False,
    accept_input: bool = True,
    debug: bool = False,
):
    """
    Monitor a job's log files and execute a callback with the changes.

    This is the blocking counterpart of `monitor_logs_async`:
    every argument is forwarded unchanged and the coroutine is run to completion
    with `asyncio.run()`.

    Parameters
    ----------
    name: str
        The name of the job to monitor.

    callback_function: Callable[[Any], Any]
        Called with each log message.

    input_callback_function: Callable[[], str]
        Called with no arguments; returns the data to send to the job.

    stop_callback_function: Callable[[SuccessTuple], str]
        Called with the job's result tuple.

    stop_on_exit: bool, default False
        Forwarded to `monitor_logs_async`.

    strip_timestamps: bool, default False
        Forwarded to `monitor_logs_async`.

    accept_input: bool, default True
        Forwarded to `monitor_logs_async`.

    debug: bool, default False
        Verbosity toggle.
    """
    return asyncio.run(
        self.monitor_logs_async(
            name,
            callback_function,
            input_callback_function=input_callback_function,
            stop_callback_function=stop_callback_function,
            stop_on_exit=stop_on_exit,
            strip_timestamps=strip_timestamps,
            accept_input=accept_input,
            debug=debug
        )
    )

Monitor a job's log files and execute a callback with the changes.

async def monitor_logs_async( self, name: str, callback_function: Callable[[Any], Any], input_callback_function: Callable[[], str], stop_callback_function: Callable[[Tuple[bool, str]], str], stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False):
async def monitor_logs_async(
    self,
    name: str,
    callback_function: Callable[[Any], Any],
    input_callback_function: Callable[[], str],
    stop_callback_function: Callable[[SuccessTuple], str],
    stop_on_exit: bool = False,
    strip_timestamps: bool = False,
    accept_input: bool = True,
    debug: bool = False,
):
    """
    Monitor a job's log files and await a callback with the changes.

    A websocket is opened to the job's logs endpoint and the connector's token
    (or `'no-login'`) is sent first. Each received message is then either
    dispatched to an internal handler (stdin request or stop notification)
    or passed to `callback_function`. Callbacks may be plain functions or coroutines.
    Monitoring ends on `KeyboardInterrupt` or `StopMonitoringLogs`.
    """
    import traceback
    from meerschaum.jobs import StopMonitoringLogs
    from meerschaum.utils.formatting._jobs import strip_timestamp_from_line

    websockets, websockets_exceptions = mrsm.attempt_import('websockets', 'websockets.exceptions')

    # Mirror the connector's scheme: plain HTTP maps to `ws`, anything else to `wss`.
    if self.URI.startswith('http://'):
        protocol = 'ws'
    else:
        protocol = 'wss'

    if 'port' in self.__dict__:
        port = self.port
    else:
        port = ''
    uri = f"{protocol}://{self.host}:{port}{LOGS_ENDPOINT}/{name}/ws"

    async def _send_input(client):
        """Ask the input callback for data and send it over the socket."""
        if input_callback_function is None:
            return

        is_async = asyncio.iscoroutinefunction(input_callback_function)
        data = (await input_callback_function()) if is_async else input_callback_function()
        await client.send(data)

    async def _handle_stop(client):
        """Read the job's result from the socket and hand it to the stop callback."""
        try:
            result = tuple(json.loads(await client.recv()))
        except Exception as e:
            warn(traceback.format_exc())
            result = False, str(e)

        if stop_callback_function is not None:
            if asyncio.iscoroutinefunction(stop_callback_function):
                await stop_callback_function(result)
            else:
                stop_callback_function(result)

        if stop_on_exit:
            raise StopMonitoringLogs

    # Control messages are handled internally instead of being treated as log lines.
    control_handlers = {
        JOBS_STDIN_MESSAGE: _send_input,
        JOBS_STOP_MESSAGE: _handle_stop,
    }

    async with websockets.connect(uri) as websocket:
        try:
            await websocket.send(self.token or 'no-login')
        except websockets_exceptions.ConnectionClosedOK:
            pass

        while True:
            try:
                message = await websocket.recv()
                handler = control_handlers.get(message, None)
                if handler is not None:
                    await handler(websocket)
                else:
                    if strip_timestamps:
                        message = strip_timestamp_from_line(message)

                    if asyncio.iscoroutinefunction(callback_function):
                        await callback_function(message)
                    else:
                        callback_function(message)
            except (KeyboardInterrupt, StopMonitoringLogs):
                await websocket.close()
                break

Monitor a job's log files and await a callback with the changes.

def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
    """
    Return whether a remote job is blocking on stdin.

    `False` is returned if the request fails.
    """
    stdin_url = JOBS_ENDPOINT + f'/{name}/is_blocking_on_stdin'
    response = self.get(stdin_url, debug=debug)
    return response.json() if response else False

Return whether a remote job is blocking on stdin.

def get_job_began(self, name: str, debug: bool = False) -> Optional[str]:
def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return a job's `began` timestamp, if it exists.

    The value is returned as stored in the job's properties (no parsing is done).
    """
    # NOTE(review): `get_job_properties()` already returns the daemon's properties,
    # so this reads a nested 'daemon' key inside them — confirm the intended schema.
    daemon_section = self.get_job_properties(name, debug=debug).get('daemon', {})
    return daemon_section.get('began', None)

Return a job's began timestamp, if it exists.

def get_job_ended(self, name: str, debug: bool = False) -> Optional[str]:
def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return a job's `ended` timestamp, if it exists.

    The value is returned as stored in the job's properties (no parsing is done).
    """
    # NOTE(review): `get_job_properties()` already returns the daemon's properties,
    # so this reads a nested 'daemon' key inside them — confirm the intended schema.
    daemon_section = self.get_job_properties(name, debug=debug).get('daemon', {})
    return daemon_section.get('ended', None)

Return a job's ended timestamp, if it exists.

def get_job_paused(self, name: str, debug: bool = False) -> Optional[str]:
def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return a job's `paused` timestamp, if it exists.

    The value is returned as stored in the job's properties (no parsing is done).
    """
    # NOTE(review): `get_job_properties()` already returns the daemon's properties,
    # so this reads a nested 'daemon' key inside them — confirm the intended schema.
    daemon_section = self.get_job_properties(name, debug=debug).get('daemon', {})
    return daemon_section.get('paused', None)

Return a job's paused timestamp, if it exists.

def get_job_status(self, name: str, debug: bool = False) -> str:
def get_job_status(self, name: str, debug: bool = False) -> str:
    """
    Return the job's status.

    Defaults to `'stopped'` when the metadata has no `status` entry.
    """
    job_meta = self.get_job_metadata(name, debug=debug)
    status = job_meta.get('status', 'stopped')
    return status

Return the job's status.