meerschaum.connectors.api

Interact with the Meerschaum API (send commands, pull data, etc.)

 1#! /usr/bin/env python
 2# -*- coding: utf-8 -*-
 3# vim:fenc=utf-8
 4
 5"""
 6Interact with the Meerschaum API (send commands, pull data, etc.)
 7"""
 8
 9from meerschaum.connectors.api._APIConnector import APIConnector
10
11__all__ = ('APIConnector',)
class APIConnector(meerschaum.connectors._Connector.Connector):
 22class APIConnector(Connector):
 23    """
 24    Connect to a Meerschaum API instance.
 25    """
 26
 27    IS_INSTANCE: bool = True
 28    IS_THREAD_SAFE: bool = False
 29
 30    OPTIONAL_ATTRIBUTES: List[str] = ['port']
 31
 32    from ._request import (
 33        make_request,
 34        get,
 35        post,
 36        put,
 37        patch,
 38        delete,
 39        wget,
 40    )
 41    from ._actions import (
 42        get_actions,
 43        do_action,
 44        do_action_async,
 45        do_action_legacy,
 46    )
 47    from ._misc import get_mrsm_version, get_chaining_status
 48    from ._pipes import (
 49        get_pipe_instance_keys,
 50        register_pipe,
 51        fetch_pipes_keys,
 52        edit_pipe,
 53        sync_pipe,
 54        delete_pipe,
 55        get_pipe_data,
 56        get_pipe_id,
 57        get_pipe_attributes,
 58        get_sync_time,
 59        pipe_exists,
 60        create_metadata,
 61        get_pipe_rowcount,
 62        drop_pipe,
 63        clear_pipe,
 64        get_pipe_columns_types,
 65        get_pipe_columns_indices,
 66    )
 67    from ._fetch import fetch
 68    from ._plugins import (
 69        register_plugin,
 70        install_plugin,
 71        delete_plugin,
 72        get_plugins,
 73        get_plugin_attributes,
 74    )
 75    from ._login import login, test_connection
 76    from ._users import (
 77        register_user,
 78        get_user_id,
 79        get_users,
 80        edit_user,
 81        delete_user,
 82        get_user_password_hash,
 83        get_user_type,
 84        get_user_attributes,
 85    )
 86    from ._uri import from_uri
 87    from ._jobs import (
 88        get_jobs,
 89        get_job,
 90        get_job_metadata,
 91        get_job_properties,
 92        get_job_exists,
 93        delete_job,
 94        start_job,
 95        create_job,
 96        stop_job,
 97        pause_job,
 98        get_logs,
 99        get_job_stop_time,
100        monitor_logs,
101        monitor_logs_async,
102        get_job_is_blocking_on_stdin,
103        get_job_began,
104        get_job_ended,
105        get_job_paused,
106        get_job_status,
107    )
108
109    def __init__(
110        self,
111        label: Optional[str] = None,
112        wait: bool = False,
113        debug: bool = False,
114        **kw
115    ):
116        if 'uri' in kw:
117            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
118            label = label or from_uri_params.get('label', None)
119            _ = from_uri_params.pop('label', None)
120            kw.update(from_uri_params)
121
122        super().__init__('api', label=label, **kw)
123        if 'protocol' not in self.__dict__:
124            self.protocol = (
125                'https' if self.__dict__.get('uri', '').startswith('https')
126                else 'http'
127            )
128
129        if 'uri' not in self.__dict__:
130            self.verify_attributes(required_attributes)
131        else:
132            from meerschaum.connectors.sql import SQLConnector
133            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
134            if 'host' not in conn_attrs:
135                raise Exception(f"Invalid URI for '{self}'.")
136            self.__dict__.update(conn_attrs)
137
138        self.url = (
139            self.protocol + '://' +
140            self.host
141            + (
142                (':' + str(self.port))
143                if self.__dict__.get('port', None)
144                else ''
145            )
146        )
147        self._token = None
148        self._expires = None
149        self._session = None
150        self._instance_keys = self.__dict__.get('instance_keys', None)
151
152
153    @property
154    def URI(self) -> str:
155        """
156        Return the fully qualified URI.
157        """
158        username = self.__dict__.get('username', None)
159        password = self.__dict__.get('password', None)
160        creds = (username + ':' + password + '@') if username and password else ''
161        return (
162            self.protocol
163            + '://'
164            + creds
165            + self.host
166            + (
167                (':' + str(self.port))
168                if self.__dict__.get('port', None)
169                else ''
170            )
171        )
172
173
174    @property
175    def session(self):
176        if self._session is None:
177            _ = attempt_import('certifi', lazy=False)
178            requests = attempt_import('requests', lazy=False)
179            if requests:
180                self._session = requests.Session()
181            if self._session is None:
182                error("Failed to import requests. Is requests installed?")
183        return self._session
184
185    @property
186    def token(self):
187        expired = (
188            True if self._expires is None else (
189                (
190                    self._expires
191                    <
192                    datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
193                )
194            )
195        )
196
197        if self._token is None or expired:
198            success, msg = self.login()
199            if not success and not self.__dict__.get('_emitted_warning'):
200                warn(msg, stack=False)
201                self._emitted_warning = True
202        return self._token
203
204    @property
205    def instance_keys(self) -> Union[str, None]:
206        """
207        Return the instance keys to be sent alongside pipe requests.
208        """
209        return self._instance_keys

Connect to a Meerschaum API instance.

APIConnector( label: Optional[str] = None, wait: bool = False, debug: bool = False, **kw)
109    def __init__(
110        self,
111        label: Optional[str] = None,
112        wait: bool = False,
113        debug: bool = False,
114        **kw
115    ):
116        if 'uri' in kw:
117            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
118            label = label or from_uri_params.get('label', None)
119            _ = from_uri_params.pop('label', None)
120            kw.update(from_uri_params)
121
122        super().__init__('api', label=label, **kw)
123        if 'protocol' not in self.__dict__:
124            self.protocol = (
125                'https' if self.__dict__.get('uri', '').startswith('https')
126                else 'http'
127            )
128
129        if 'uri' not in self.__dict__:
130            self.verify_attributes(required_attributes)
131        else:
132            from meerschaum.connectors.sql import SQLConnector
133            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
134            if 'host' not in conn_attrs:
135                raise Exception(f"Invalid URI for '{self}'.")
136            self.__dict__.update(conn_attrs)
137
138        self.url = (
139            self.protocol + '://' +
140            self.host
141            + (
142                (':' + str(self.port))
143                if self.__dict__.get('port', None)
144                else ''
145            )
146        )
147        self._token = None
148        self._expires = None
149        self._session = None
150        self._instance_keys = self.__dict__.get('instance_keys', None)

Set the given keyword arguments as attributes.

Parameters
  • type (str): The type of the connector (e.g. sql, api, plugin).
  • label (str): The label for the connector.
Examples

Run `mrsm edit config` to edit connectors in the YAML file:

meerschaum:
    connections:
        {type}:
            {label}:
                ### attributes go here
IS_INSTANCE: bool = True
IS_THREAD_SAFE: bool = False
OPTIONAL_ATTRIBUTES: List[str] = ['port']
url
URI: str
153    @property
154    def URI(self) -> str:
155        """
156        Return the fully qualified URI.
157        """
158        username = self.__dict__.get('username', None)
159        password = self.__dict__.get('password', None)
160        creds = (username + ':' + password + '@') if username and password else ''
161        return (
162            self.protocol
163            + '://'
164            + creds
165            + self.host
166            + (
167                (':' + str(self.port))
168                if self.__dict__.get('port', None)
169                else ''
170            )
171        )

Return the fully qualified URI.

session
174    @property
175    def session(self):
176        if self._session is None:
177            _ = attempt_import('certifi', lazy=False)
178            requests = attempt_import('requests', lazy=False)
179            if requests:
180                self._session = requests.Session()
181            if self._session is None:
182                error("Failed to import requests. Is requests installed?")
183        return self._session
token
185    @property
186    def token(self):
187        expired = (
188            True if self._expires is None else (
189                (
190                    self._expires
191                    <
192                    datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
193                )
194            )
195        )
196
197        if self._token is None or expired:
198            success, msg = self.login()
199            if not success and not self.__dict__.get('_emitted_warning'):
200                warn(msg, stack=False)
201                self._emitted_warning = True
202        return self._token
instance_keys: Optional[str]
204    @property
205    def instance_keys(self) -> Union[str, None]:
206        """
207        Return the instance keys to be sent alongside pipe requests.
208        """
209        return self._instance_keys

Return the instance keys to be sent alongside pipe requests.

def make_request( self, method: str, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kwargs: Any) -> requests.models.Response:
 28def make_request(
 29    self,
 30    method: str,
 31    r_url: str,
 32    headers: Optional[Dict[str, Any]] = None,
 33    use_token: bool = True,
 34    debug: bool = False,
 35    **kwargs: Any
 36) -> 'requests.Response':
 37    """
 38    Make a request to this APIConnector's endpoint using the in-memory session.
 39
 40    Parameters
 41    ----------
 42    method: str
 43        The kind of request to make.
 44        Accepted values:
 45        - `'GET'`
 46        - `'OPTIONS'`
 47        - `'HEAD'`
 48        - `'POST'`
 49        - `'PUT'`
 50        - `'PATCH'`
 51        - `'DELETE'`
 52
 53    r_url: str
 54        The relative URL for the endpoint (e.g. `'/pipes'`).
 55
 56    headers: Optional[Dict[str, Any]], default None
 57        The headers to use for the request.
 58        If `use_token` is `True`, the authorization token will be added to a copy of these headers.
 59
 60    use_token: bool, default True
 61        If `True`, add the authorization token to the headers.
 62
 63    debug: bool, default False
 64        Verbosity toggle.
 65
 66    kwargs: Any
 67        All other keyword arguments are passed to `requests.request`.
 68
 69    Returns
 70    -------
 71    A `requests.Reponse` object.
 72    """
 73    if method.upper() not in METHODS:
 74        raise ValueError(f"Method '{method}' is not supported.")
 75
 76    verify = self.__dict__.get('verify', None)
 77    if 'verify' not in kwargs and isinstance(verify, bool):
 78        kwargs['verify'] = verify
 79
 80    headers = (
 81        copy.deepcopy(headers)
 82        if isinstance(headers, dict)
 83        else {}
 84    )
 85
 86    if use_token:
 87        headers.update({'Authorization': f'Bearer {self.token}'})
 88
 89    if 'timeout' not in kwargs:
 90        kwargs['timeout'] = STATIC_CONFIG['api']['default_timeout']
 91
 92    request_url = urllib.parse.urljoin(self.url, r_url)
 93    if debug:
 94        dprint(f"[{self}] Sending a '{method.upper()}' request to {request_url}")
 95
 96    return self.session.request(
 97        method.upper(),
 98        request_url,
 99        headers=headers,
100        **kwargs
101    )

Make a request to this APIConnector's endpoint using the in-memory session.

Parameters
  • method (str): The kind of request to make. Accepted values:
    • 'GET'
    • 'OPTIONS'
    • 'HEAD'
    • 'POST'
    • 'PUT'
    • 'PATCH'
    • 'DELETE'
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Response object.
def get(self, r_url: str, **kwargs: Any) -> requests.models.Response:
def get(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `GET` request through `make_request()`.

    Parameters
    ----------
    r_url: str
        The endpoint's relative URL (e.g. `'/pipes'`).

    kwargs: Any
        Forwarded unchanged to `make_request()`
        (e.g. `headers`, `use_token`, `debug`).

    Returns
    -------
    A `requests.Response` object.
    """
    http_method = 'GET'
    return self.make_request(http_method, r_url, **kwargs)

Wrapper for requests.get.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Response object.
def post(self, r_url: str, **kwargs: Any) -> requests.models.Response:
def post(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `POST` request through `make_request()`.

    Parameters
    ----------
    r_url: str
        The endpoint's relative URL (e.g. `'/pipes'`).

    kwargs: Any
        Forwarded unchanged to `make_request()`
        (e.g. `headers`, `use_token`, `debug`).

    Returns
    -------
    A `requests.Response` object.
    """
    http_method = 'POST'
    return self.make_request(http_method, r_url, **kwargs)

Wrapper for requests.post.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Response object.
def put(self, r_url: str, **kwargs: Any) -> requests.models.Response:
def put(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `PUT` request through `make_request()`.

    Parameters
    ----------
    r_url: str
        The endpoint's relative URL (e.g. `'/pipes'`).

    kwargs: Any
        Forwarded unchanged to `make_request()`
        (e.g. `headers`, `use_token`, `debug`).

    Returns
    -------
    A `requests.Response` object.
    """
    http_method = 'PUT'
    return self.make_request(http_method, r_url, **kwargs)

Wrapper for requests.put.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Response object.
def patch(self, r_url: str, **kwargs: Any) -> requests.models.Response:
def patch(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `PATCH` request through `make_request()`.

    Parameters
    ----------
    r_url: str
        The endpoint's relative URL (e.g. `'/pipes'`).

    kwargs: Any
        Forwarded unchanged to `make_request()`
        (e.g. `headers`, `use_token`, `debug`).

    Returns
    -------
    A `requests.Response` object.
    """
    http_method = 'PATCH'
    return self.make_request(http_method, r_url, **kwargs)

Wrapper for requests.patch.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Response object.
def delete(self, r_url: str, **kwargs: Any) -> requests.models.Response:
def delete(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Send a `DELETE` request through `make_request()`.

    Parameters
    ----------
    r_url: str
        The endpoint's relative URL (e.g. `'/pipes'`).

    kwargs: Any
        Forwarded unchanged to `make_request()`
        (e.g. `headers`, `use_token`, `debug`).

    Returns
    -------
    A `requests.Response` object.
    """
    http_method = 'DELETE'
    return self.make_request(http_method, r_url, **kwargs)

Wrapper for requests.delete.

Parameters
  • r_url (str): The relative URL for the endpoint (e.g. '/pipes').
  • headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If use_token is True, the authorization token will be added to a copy of these headers.
  • use_token (bool, default True): If True, add the authorization token to the headers.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to requests.request.
Returns
  • A requests.Response object.
def wget( self, r_url: str, dest: Union[str, pathlib.Path, NoneType] = None, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) -> pathlib.Path:
def wget(
    self,
    r_url: str,
    dest: Optional[Union[str, pathlib.Path]] = None,
    headers: Optional[Dict[str, Any]] = None,
    use_token: bool = True,
    debug: bool = False,
    **kw: Any
) -> pathlib.Path:
    """
    Download a file from the API, mimicking `wget` with requests.

    Parameters
    ----------
    r_url: str
        The relative URL of the file to download.

    dest: Optional[Union[str, pathlib.Path]], default None
        The destination, passed through to `meerschaum.utils.misc.wget`.

    headers: Optional[Dict[str, Any]], default None
        Request headers. The authorization token is added to a copy,
        so the caller's dictionary is left untouched.

    use_token: bool, default True
        If `True`, add the authorization token to the headers.

    debug: bool, default False
        Verbosity toggle.

    kw: Any
        All other keyword arguments are passed to `meerschaum.utils.misc.wget`.

    Returns
    -------
    The value returned by `meerschaum.utils.misc.wget`
    (annotated as a `pathlib.Path`).
    """
    from meerschaum.utils.misc import wget

    # Copy so that injecting the token never mutates the caller's headers
    # (consistent with `make_request()`).
    request_headers = dict(headers) if headers is not None else {}
    if use_token:
        request_headers['Authorization'] = f'Bearer {self.token}'

    request_url = urllib.parse.urljoin(self.url, r_url)
    if debug:
        dprint(
            f"[{self}] Downloading {request_url}"
            + (f' to {dest}' if dest is not None else '')
            + "..."
        )
    return wget(request_url, dest=dest, headers=request_headers, **kw)

Mimic wget with requests.

def get_actions(self):
def get_actions(self):
    """Request the list of available actions from the API instance."""
    actions_response = self.get(ACTIONS_ENDPOINT)
    return actions_response

Get available actions from the API instance.

def do_action(self, sysargs: List[str]) -> Tuple[bool, str]:
def do_action(self, sysargs: List[str]) -> SuccessTuple:
    """
    Run a Meerschaum action on the remote instance and block until it finishes.

    This is the synchronous counterpart of `do_action_async()`:
    the coroutine is driven to completion with `asyncio.run()`.
    """
    action_coroutine = self.do_action_async(sysargs)
    return asyncio.run(action_coroutine)

Execute a Meerschaum action remotely.

async def do_action_async( self, sysargs: List[str], callback_function: Callable[[str], NoneType] = functools.partial(<built-in function print>, end='')) -> Tuple[bool, str]:
async def do_action_async(
    self,
    sysargs: List[str],
    callback_function: Callable[[str], None] = partial(print, end=''),
) -> SuccessTuple:
    """
    Run an action remotely as a temporary job and stream its logs.

    Parameters
    ----------
    sysargs: List[str]
        The action's arguments. API executor keys are removed before
        the job is created.

    callback_function: Callable[[str], None], default partial(print, end='')
        Passed to the job's `monitor_logs_async()` as the log callback.

    Returns
    -------
    The job's result tuple, or the failed start tuple
    if the job could not be started.
    """
    from meerschaum._internal.arguments import remove_api_executor_keys
    from meerschaum.utils.misc import generate_password

    cleaned_sysargs = remove_api_executor_keys(sysargs)

    # Temporary jobs get a random name under the temp prefix.
    temp_job = mrsm.Job(
        TEMP_PREFIX + generate_password(12),
        cleaned_sysargs,
        executor_keys=str(self),
    )

    started, start_message = temp_job.start()
    if not started:
        return started, start_message

    await temp_job.monitor_logs_async(
        callback_function=callback_function,
        stop_on_exit=True,
        strip_timestamps=True,
    )

    # Read the result before the temporary job is deleted.
    success, msg = temp_job.result
    temp_job.delete()
    return success, msg

Execute an action as a temporary remote job.

def do_action_legacy( self, action: Optional[List[str]] = None, sysargs: Optional[List[str]] = None, debug: bool = False, **kw) -> Tuple[bool, str]:
 66def do_action_legacy(
 67    self,
 68    action: Optional[List[str]] = None,
 69    sysargs: Optional[List[str]] = None,
 70    debug: bool = False,
 71    **kw
 72) -> SuccessTuple:
 73    """
 74    NOTE: This method is deprecated.
 75    Please use `do_action()` or `do_action_async()`.
 76
 77    Execute a Meerschaum action remotely.
 78
 79    If `sysargs` are provided, parse those instead.
 80    Otherwise infer everything from keyword arguments.
 81
 82    Examples
 83    --------
 84    >>> conn = mrsm.get_connector('api:main')
 85    >>> conn.do_action(['show', 'pipes'])
 86    (True, "Success")
 87    >>> conn.do_action(['show', 'arguments'], name='test')
 88    (True, "Success")
 89    """
 90    import sys, json
 91    from meerschaum.utils.debug import dprint
 92    from meerschaum.config.static import STATIC_CONFIG
 93    from meerschaum.utils.misc import json_serialize_datetime
 94    if action is None:
 95        action = []
 96
 97    if sysargs is not None and action and action[0] == '':
 98        from meerschaum._internal.arguments import parse_arguments
 99        if debug:
100            dprint(f"Parsing sysargs:\n{sysargs}")
101        json_dict = parse_arguments(sysargs)
102    else:
103        json_dict = kw
104        json_dict['action'] = action
105        if 'noask' not in kw:
106            json_dict['noask'] = True
107        if 'yes' not in kw:
108            json_dict['yes'] = True
109        if debug:
110            json_dict['debug'] = debug
111
112    root_action = json_dict['action'][0]
113    del json_dict['action'][0]
114    r_url = f"{STATIC_CONFIG['api']['endpoints']['actions']}/{root_action}"
115    
116    if debug:
117        from meerschaum.utils.formatting import pprint
118        dprint(f"Sending data to '{self.url + r_url}':")
119        pprint(json_dict, stream=sys.stderr)
120
121    response = self.post(
122        r_url,
123        data = json.dumps(json_dict, default=json_serialize_datetime),
124        debug = debug,
125    )
126    try:
127        response_list = json.loads(response.text)
128        if isinstance(response_list, dict) and 'detail' in response_list:
129            return False, response_list['detail']
130    except Exception as e:
131        print(f"Invalid response: {response}")
132        print(e)
133        return False, response.text
134    if debug:
135        dprint(response)
136    try:
137        return response_list[0], response_list[1]
138    except Exception as e:
139        return False, f"Failed to parse result from action '{root_action}'"

NOTE: This method is deprecated. Please use do_action() or do_action_async().

Execute a Meerschaum action remotely.

If sysargs are provided, parse those instead. Otherwise infer everything from keyword arguments.

Examples
>>> conn = mrsm.get_connector('api:main')
>>> conn.do_action(['show', 'pipes'])
(True, "Success")
>>> conn.do_action(['show', 'arguments'], name='test')
(True, "Success")
def get_mrsm_version(self, **kw) -> Optional[str]:
def get_mrsm_version(self, **kw) -> Optional[str]:
    """
    Ask the API instance for its Meerschaum version.

    The request is sent without a token. Returns `None` if the request
    or JSON decoding fails, or if the response is a dictionary
    containing `'detail'`.
    """
    from meerschaum.config.static import STATIC_CONFIG
    try:
        version_response = self.get(
            STATIC_CONFIG['api']['endpoints']['version'] + '/mrsm',
            use_token=False,
            **kw
        )
        payload = version_response.json()
    except Exception:
        return None

    has_detail = isinstance(payload, dict) and 'detail' in payload
    return None if has_detail else payload

Return the Meerschaum version of the API instance.

def get_chaining_status(self, **kw) -> Optional[bool]:
def get_chaining_status(self, **kw) -> Optional[bool]:
    """
    Ask the API instance for its chaining status.

    Returns `None` if the request raises or the response is falsy;
    otherwise returns the decoded JSON body.
    """
    from meerschaum.config.static import STATIC_CONFIG
    try:
        chaining_response = self.get(
            STATIC_CONFIG['api']['endpoints']['chaining'],
            use_token=True,
            **kw
        )
        request_failed = not chaining_response
    except Exception:
        return None

    if request_failed:
        return None
    return chaining_response.json()

Fetch the chaining status of the API instance.

def get_pipe_instance_keys(self, pipe: meerschaum.Pipe) -> Optional[str]:
def get_pipe_instance_keys(self, pipe: mrsm.Pipe) -> Union[str, None]:
    """
    Look up `instance_keys` in the pipe's parameters,
    defaulting to this connector's own `instance_keys` when the key is absent.
    """
    pipe_parameters = pipe.parameters
    return pipe_parameters.get('instance_keys', self.instance_keys)

Return the configured instance keys for a pipe if set, else fall back to the default instance_keys for this APIConnector.

def register_pipe( self, pipe: meerschaum.Pipe, debug: bool = False) -> Tuple[bool, str]:
def register_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False
) -> SuccessTuple:
    """
    Submit a POST to the API to register a new Pipe object.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to register. Its `parameters` are sent as the JSON body.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of the success flag and a message:
    the tuple returned by the API if the body is a list,
    otherwise the `detail` field or the raw response text.
    """
    from meerschaum.utils.debug import dprint
    r_url = pipe_r_url(pipe)
    response = self.post(
        r_url + '/register',
        json=pipe._attributes.get('parameters', {}),
        params={'instance_keys': self.get_pipe_instance_keys(pipe)},
        debug=debug,
    )
    if debug:
        dprint(response.text)

    if not response:
        return False, response.text

    # Parse the body once; a non-JSON body falls back to the raw text.
    try:
        response_data = response.json()
    except ValueError:
        return bool(response), response.text

    if isinstance(response_data, list):
        return response_data[0], response_data[1]
    if isinstance(response_data, dict) and 'detail' in response_data:
        return bool(response), response_data['detail']
    return bool(response), response.text

Submit a POST to the API to register a new Pipe object. Returns a tuple of (success_bool, message).

def fetch_pipes_keys( self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> List[Tuple[str, str, Optional[str]]]:
def fetch_pipes_keys(
    self,
    connector_keys: Optional[List[str]] = None,
    metric_keys: Optional[List[str]] = None,
    location_keys: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False
) -> Union[List[Tuple[str, str, Union[str, None]]]]:
    """
    Query the API for the keys of registered pipes.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        Connector keys to filter by.

    metric_keys: Optional[List[str]], default None
        Metric keys to filter by.

    location_keys: Optional[List[str]], default None
        Location keys to filter by.

    tags: Optional[List[str]], default None
        Tags to filter by.

    params: Optional[Dict[str, Any]], default None
        A parameters dictionary for filtering against the `pipes` table
        (e.g. `{'connector_keys': 'plugin:foo'}`).
        Not recommended.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of tuples, one per row of the API's response.
    """
    from meerschaum.config.static import STATIC_CONFIG
    connector_keys = [] if connector_keys is None else connector_keys
    metric_keys = [] if metric_keys is None else metric_keys
    location_keys = [] if location_keys is None else location_keys
    tags = [] if tags is None else tags

    keys_url = STATIC_CONFIG['api']['endpoints']['pipes'] + '/keys'
    try:
        # Each filter is JSON-encoded into its own query parameter.
        query = {
            'connector_keys': json.dumps(connector_keys),
            'metric_keys': json.dumps(metric_keys),
            'location_keys': json.dumps(location_keys),
            'tags': json.dumps(tags),
            'params': json.dumps(params),
            'instance_keys': self.instance_keys,
        }
        j = self.get(keys_url, params=query, debug=debug).json()
    except Exception as e:
        error(str(e))

    if 'detail' in j:
        error(j['detail'], stack=False)
    return list(map(tuple, j))

Fetch registered Pipes' keys from the API.

Parameters
  • connector_keys (Optional[List[str]], default None): The connector keys for the query.
  • metric_keys (Optional[List[str]], default None): The metric keys for the query.
  • location_keys (Optional[List[str]], default None): The location keys for the query.
  • tags (Optional[List[str]], default None): A list of tags for the query.
  • params (Optional[Dict[str, Any]], default None): A parameters dictionary for filtering against the pipes table (e.g. {'connector_keys': 'plugin:foo'}). Not recommended.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of tuples containing pipes' keys.
def edit_pipe( self, pipe: meerschaum.Pipe, patch: bool = False, debug: bool = False) -> Tuple[bool, str]:
 75def edit_pipe(
 76    self,
 77    pipe: mrsm.Pipe,
 78    patch: bool = False,
 79    debug: bool = False,
 80) -> SuccessTuple:
 81    """Submit a PATCH to the API to edit an existing Pipe object.
 82    Returns a tuple of (success_bool, response_dict).
 83    """
 84    from meerschaum.utils.debug import dprint
 85    ### NOTE: if `parameters` is supplied in the Pipe constructor,
 86    ###       then `pipe.parameters` will exist and not be fetched from the database.
 87    r_url = pipe_r_url(pipe)
 88    response = self.patch(
 89        r_url + '/edit',
 90        params={'patch': patch, 'instance_keys': self.get_pipe_instance_keys(pipe)},
 91        json=pipe.parameters,
 92        debug=debug,
 93    )
 94    if debug:
 95        dprint(response.text)
 96
 97    response_data = response.json()
 98
 99    if isinstance(response.json(), list):
100        response_tuple = response_data[0], response_data[1]
101    elif 'detail' in response.json():
102        response_tuple = response.__bool__(), response_data['detail']
103    else:
104        response_tuple = response.__bool__(), response.text
105    return response_tuple

Submit a PATCH to the API to edit an existing Pipe object. Returns a tuple of (success_bool, message).

def sync_pipe( self, pipe: meerschaum.Pipe, df: "Optional[Union['pd.DataFrame', Dict[Any, Any], str]]" = None, chunksize: Optional[int] = -1, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
def sync_pipe(
    self,
    pipe: mrsm.Pipe,
    df: Optional[Union['pd.DataFrame', Dict[Any, Any], str]] = None,
    chunksize: Optional[int] = -1,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Sync a DataFrame into a pipe by posting it to the API in chunks.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to sync into.

    df: Optional[Union['pd.DataFrame', Dict[Any, Any], str]], default None
        The data to sync. A string is parsed as JSON.
        Dictionaries and lists are serialized with `json.dumps`,
        and anything else is serialized with `to_json`.

    chunksize: Optional[int], default -1
        The number of rows per request.
        `-1` reads `system:connectors:sql:chunksize` from the configuration,
        and `None` falls back to a chunksize of 1.

    debug: bool, default False
        Verbosity toggle.

    **kw: Any
        Additional keyword arguments are sent as query parameters.

    Returns
    -------
    A `SuccessTuple` of the success flag and a message.
    The first failed chunk aborts the sync.
    """
    from decimal import Decimal
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import json_serialize_datetime, items_str, interval_str
    from meerschaum.config import get_config
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.dataframe import get_numeric_cols, to_json
    begin = time.perf_counter()
    more_itertools = attempt_import('more_itertools')
    if df is None:
        msg = f"DataFrame is `None`. Cannot sync {pipe}."
        return False, msg

    def get_json_str(c):
        ### allow syncing dict or JSON without needing to import pandas (for IOT devices)
        if isinstance(c, (dict, list)):
            return json.dumps(c, default=json_serialize_datetime)
        return to_json(c, orient='columns')

    df = json.loads(df) if isinstance(df, str) else df

    _chunksize: Optional[int] = (1 if chunksize is None else (
        get_config('system', 'connectors', 'sql', 'chunksize') if chunksize == -1
        else chunksize
    ))
    chunks = []

    ### NOTE: Check for dictionaries and lists before the DataFrame branch:
    ###       lists expose an `index` method and neither type has `columns`.
    if isinstance(df, dict):
        keys: List[str] = list(df.keys())

        ### `_chunks` is a dict of lists of dicts.
        ### e.g. {'a' : [ {'a':[1, 2]}, {'a':[3, 4]} ] }
        _chunks = {k: [] for k in keys}
        for k in keys:
            for values_chunk in more_itertools.chunked(df[k], _chunksize):
                _chunks[k].append({k: values_chunk})

        ### `chunks` is a list of dicts, each holding one slice of every column.
        for column_chunks in _chunks.values():
            for i, c in enumerate(column_chunks):
                try:
                    chunks[i].update(c)
                except IndexError:
                    chunks.append(c)

    elif isinstance(df, list):
        ### `chunked()` already yields sub-lists of rows.
        chunks = more_itertools.chunked(df, _chunksize)

    elif hasattr(df, 'index'):
        df = df.reset_index(drop=True)
        is_dask = 'dask' in df.__module__
        chunks = (
            (df.iloc[i] for i in more_itertools.chunked(df.index, _chunksize))
            if not is_dask
            else [partition.compute() for partition in df.partitions]
        )

        numeric_cols = get_numeric_cols(df)
        if numeric_cols:
            ### Serialize `Decimal` values as fixed-point strings.
            for col in numeric_cols:
                df[col] = df[col].apply(lambda x: f'{x:f}' if isinstance(x, Decimal) else x)
            pipe_dtypes = pipe.dtypes
            new_numeric_cols = [
                col
                for col in numeric_cols
                if pipe_dtypes.get(col, None) != 'numeric'
            ]
            ### Only edit the pipe when there is something new to register.
            if new_numeric_cols:
                pipe.dtypes.update({
                    col: 'numeric'
                    for col in new_numeric_cols
                })
                edit_success, edit_msg = pipe.edit(debug=debug)
                if not edit_success:
                    warn(
                        "Failed to update new numeric columns "
                        + f"{items_str(new_numeric_cols)}:\n{edit_msg}"
                    )
    else:
        return False, f"Unsupported data type '{type(df).__name__}'. Cannot sync {pipe}."

    ### Send columns in case the user has defined them locally.
    request_params = kw.copy()
    if pipe.columns:
        request_params['columns'] = json.dumps(pipe.columns)
    request_params['instance_keys'] = self.get_pipe_instance_keys(pipe)
    r_url = pipe_r_url(pipe) + '/data'

    rowcount = 0
    num_success_chunks = 0
    for i, c in enumerate(chunks):
        if debug:
            dprint(f"[{self}] Posting chunk {i} to {r_url}...")
        if len(c) == 0:
            if debug:
                dprint(f"[{self}] Skipping empty chunk...")
            continue
        json_str = get_json_str(c)

        try:
            response = self.post(
                r_url,
                params=request_params,
                data=json_str,
                debug=debug,
            )
        except Exception as e:
            msg = f"Failed to post a chunk to {pipe}:\n{e}"
            warn(msg)
            return False, msg

        if not response:
            return False, f"Failed to sync a chunk:\n{response.text}"

        try:
            j = json.loads(response.text)
        except Exception as e:
            return False, f"Failed to parse response from syncing {pipe}:\n{e}"

        if isinstance(j, dict) and 'detail' in j:
            return False, j['detail']

        try:
            j = tuple(j)
        except Exception:
            return False, response.text

        if debug:
            dprint("Received response: " + str(j))
        if not j[0]:
            return j

        rowcount += len(c)
        num_success_chunks += 1

    success_tuple = True, (
        f"It took {interval_str(timedelta(seconds=(time.perf_counter() - begin)))} "
        + f"to sync {rowcount:,} row"
        + ('s' if rowcount != 1 else '')
        + f" across {num_success_chunks:,} chunk" + ('s' if num_success_chunks != 1 else '') +
        f" to {pipe}."
    )
    return success_tuple

Sync a DataFrame into a Pipe.

def delete_pipe( self, pipe: Optional[meerschaum.Pipe] = None, debug: bool = None) -> Tuple[bool, str]:
def delete_pipe(
    self,
    pipe: Optional[mrsm.Pipe] = None,
    debug: bool = None,
) -> SuccessTuple:
    """
    Delete a pipe and drop its table.

    Parameters
    ----------
    pipe: Optional[mrsm.Pipe], default None
        The pipe to delete. `error()` is called if this is `None`.

    debug: bool, default None
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of the success flag and a message.
    """
    if pipe is None:
        error("Pipe cannot be None.")
    r_url = pipe_r_url(pipe)
    response = self.delete(
        r_url + '/delete',
        params={'instance_keys': self.get_pipe_instance_keys(pipe)},
        debug=debug,
    )
    if debug:
        dprint(response.text)

    ### Parse the body once and tolerate non-JSON responses.
    try:
        response_data = response.json()
    except Exception:
        return False, f"Failed to delete {pipe}:\n{response.text}"

    ### A JSON list is treated as a serialized success tuple.
    if isinstance(response_data, list) and len(response_data) >= 2:
        return response_data[0], response_data[1]

    ### Only look up `detail` on dictionaries.
    if isinstance(response_data, dict) and 'detail' in response_data:
        return bool(response), response_data['detail']

    return bool(response), response.text

Delete a Pipe and drop its table.

def get_pipe_data( self, pipe: meerschaum.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[str, datetime.datetime, int, NoneType] = None, end: Union[str, datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, as_chunks: bool = False, debug: bool = False, **kw: Any) -> Optional[pandas.core.frame.DataFrame]:
def get_pipe_data(
    self,
    pipe: mrsm.Pipe,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    begin: Union[str, datetime, int, None] = None,
    end: Union[str, datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    as_chunks: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[pandas.DataFrame, None]:
    """
    Fetch a pipe's data from the API.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose data to fetch.

    select_columns: Optional[List[str]], default None
        Sent JSON-encoded as the `select_columns` query parameter.

    omit_columns: Optional[List[str]], default None
        Sent JSON-encoded as the `omit_columns` query parameter.

    begin: Union[str, datetime, int, None], default None
        Sent as the `begin` query parameter.

    end: Union[str, datetime, int, None], default None
        Sent as the `end` query parameter.

    params: Optional[Dict[str, Any]], default None
        Sent JSON-encoded as the `params` query parameter.

    as_chunks: bool, default False
        Sent as the `as_chunks` query parameter.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A DataFrame parsed from the response,
    or `None` if the request failed or the response could not be parsed.
    """
    r_url = pipe_r_url(pipe)
    try:
        response = self.get(
            r_url + "/data",
            params={
                'select_columns': json.dumps(select_columns),
                'omit_columns': json.dumps(omit_columns),
                'begin': begin,
                'end': end,
                'params': json.dumps(params, default=str),
                'instance': self.get_pipe_instance_keys(pipe),
                'as_chunks': as_chunks,
            },
            debug=debug
        )
        if not response.ok:
            return None
        j = response.json()
    except Exception as e:
        warn(f"Failed to get data for {pipe}:\n{e}")
        return None

    ### An error payload must not leak out as a `(False, msg)` tuple:
    ### callers expect a DataFrame or `None`.
    if isinstance(j, dict) and 'detail' in j:
        warn(f"Failed to get data for {pipe}:\n{j['detail']}")
        return None

    from meerschaum.utils.dataframe import parse_df_datetimes, add_missing_cols_to_df
    from meerschaum.utils.dtypes import are_dtypes_equal
    try:
        df = parse_df_datetimes(
            j,
            ### Skip datetime detection for columns registered with another dtype.
            ignore_cols=[
                col
                for col, dtype in pipe.dtypes.items()
                if not are_dtypes_equal(str(dtype), 'datetime')
            ],
            strip_timezone=(pipe.tzinfo is None),
            debug=debug,
        )
    except Exception as e:
        warn(f"Failed to parse response for {pipe}:\n{e}")
        return None

    ### An empty result still carries the pipe's registered columns.
    if len(df.columns) == 0:
        return add_missing_cols_to_df(df, pipe.dtypes)

    return df

Fetch data from the API.

def get_pipe_id( self, pipe: meerschaum.Pipe, debug: bool = False) -> Union[int, str, NoneType]:
def get_pipe_id(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Union[int, str, None]:
    """
    Ask the API for a pipe's ID.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose ID to look up.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The ID as an `int` when the response body is an integer,
    the raw body when it is non-empty and does not begin with `{`,
    otherwise `None`.
    """
    from meerschaum.utils.misc import is_int
    id_response = self.get(
        pipe_r_url(pipe) + '/id',
        params={
            'instance': self.get_pipe_instance_keys(pipe),
        },
        debug=debug,
    )
    if debug:
        dprint(f"Got pipe ID: {id_response.text}")
    try:
        raw_id = id_response.text
        if is_int(raw_id):
            return int(raw_id)
        ### A body starting with `{` is a JSON object, not an ID.
        if raw_id and not raw_id.startswith('{'):
            return raw_id
    except Exception as exc:
        warn(f"Failed to get the ID for {pipe}:\n{exc}")
    return None

Get a Pipe's ID from the API.

def get_pipe_attributes( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, Any]:
def get_pipe_attributes(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Dict[str, Any]:
    """
    Request a pipe's attributes from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose attributes we are fetching.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The parsed JSON response body.
    If the body cannot be parsed, warn and return an empty dictionary.
    """
    attributes_url = pipe_r_url(pipe) + '/attributes'
    query = {
        'instance': self.get_pipe_instance_keys(pipe),
    }
    response = self.get(attributes_url, params=query, debug=debug)
    try:
        attributes = json.loads(response.text)
    except Exception as exc:
        warn(f"Failed to get the attributes for {pipe}:\n{exc}")
        return {}
    return attributes

Get a Pipe's attributes from the API

Parameters
Returns
  • A dictionary of a pipe's attributes.
  • If the pipe does not exist, return an empty dictionary.
def get_sync_time( self, pipe: meerschaum.Pipe, params: Optional[Dict[str, Any]] = None, newest: bool = True, debug: bool = False) -> Union[datetime.datetime, int, NoneType]:
def get_sync_time(
    self,
    pipe: mrsm.Pipe,
    params: Optional[Dict[str, Any]] = None,
    newest: bool = True,
    debug: bool = False,
) -> Union[datetime, int, None]:
    """
    Request a pipe's sync time from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to select from.

    params: Optional[Dict[str, Any]], default None
        Optional filter dictionary, sent as the JSON body of the request.

    newest: bool, default True
        Forwarded as the `newest` query parameter:
        `True` asks for the most recent value, `False` for the oldest.

    debug: bool, default False
        Verbosity toggle (also forwarded as a query parameter).

    Returns
    -------
    An `int` if the response is an integer, a `datetime` parsed from an ISO string,
    or `None` if the request failed, the value was null, or parsing failed.
    """
    from meerschaum.utils.misc import is_int
    from meerschaum.utils.warnings import warn
    response = self.get(
        pipe_r_url(pipe) + '/sync_time',
        json=params,
        params={
            'instance': self.get_pipe_instance_keys(pipe),
            'newest': newest,
            'debug': debug,
        },
        debug=debug,
    )
    if not response:
        warn(f"Failed to get the sync time for {pipe}:\n" + response.text)
        return None

    raw_sync_time = response.json()
    if raw_sync_time is None:
        return None

    try:
        if is_int(raw_sync_time):
            return int(raw_sync_time)
        return datetime.fromisoformat(raw_sync_time)
    except Exception as exc:
        warn(f"Failed to parse the sync time '{raw_sync_time}' for {pipe}:\n{exc}")
    return None

Get a Pipe's most recent datetime value from the API.

Parameters
  • pipe (meerschaum.Pipe): The pipe to select from.
  • params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause.
  • newest (bool, default True): If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
Returns
  • The most recent (or oldest if newest is False) datetime of a pipe,
  • rounded down to the closest minute.
def pipe_exists(self, pipe: meerschaum.Pipe, debug: bool = False) -> bool:
def pipe_exists(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False
) -> bool:
    """
    Check the API to see if a pipe exists.

    Parameters
    ----------
    pipe: 'meerschaum.Pipe'
        The pipe which we are querying.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating whether a pipe's underlying table exists.
    Failed requests and error payloads produce a warning and `False`.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    r_url = pipe_r_url(pipe)
    response = self.get(
        r_url + '/exists',
        params={
            'instance': self.get_pipe_instance_keys(pipe),
        },
        debug=debug,
    )
    if not response:
        warn(f"Failed to check if {pipe} exists:\n{response.text}")
        return False
    if debug:
        dprint("Received response: " + str(response.text))

    try:
        j = response.json()
    except Exception as e:
        warn(f"Failed to check if {pipe} exists:\n{e}")
        return False

    ### An error payload is a truthy dict, so it must not be returned as the answer.
    if isinstance(j, dict) and 'detail' in j:
        warn(j['detail'])
        return False

    return bool(j)

Check the API to see if a Pipe exists.

Parameters
Returns
  • A bool indicating whether a pipe's underlying table exists.
def create_metadata(self, debug: bool = False) -> bool:
def create_metadata(
    self,
    debug: bool = False
) -> bool:
    """
    Create metadata tables by posting to the API's metadata endpoint.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating success.
    `False` is returned when the response cannot be parsed,
    contains an error `detail`, or has a failing status.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.config.static import STATIC_CONFIG
    r_url = STATIC_CONFIG['api']['endpoints']['metadata']
    response = self.post(r_url, debug=debug)
    if debug:
        dprint(f"Create metadata response: {response.text}")

    try:
        metadata_response = json.loads(response.text)
    except Exception as e:
        warn(f"Failed to create metadata on {self}:\n{e}")
        return False

    if isinstance(metadata_response, dict) and 'detail' in metadata_response:
        warn(f"Failed to create metadata on {self}:\n{metadata_response['detail']}")
        return False

    ### Previously `False` was returned unconditionally; report the real outcome.
    return bool(response)

Create metadata tables.

Returns
  • A bool indicating success.
def get_pipe_rowcount( self, pipe: meerschaum.Pipe, begin: Union[str, datetime.datetime, int, NoneType] = None, end: Union[str, datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) -> int:
def get_pipe_rowcount(
    self,
    pipe: mrsm.Pipe,
    begin: Union[str, datetime, int, None] = None,
    end: Union[str, datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    remote: bool = False,
    debug: bool = False,
) -> int:
    """
    Request a pipe's row count from the API.

    Parameters
    ----------
    pipe: 'meerschaum.Pipe'
        The pipe whose rows we are counting.

    begin: Union[str, datetime, int, None], default None
        If provided, bound the count by this datetime.

    end: Union[str, datetime, int, None], default None
        If provided, bound the count by this datetime.

    params: Optional[Dict[str, Any]], default None
        If provided, bound the count by these parameters
        (sent as the JSON body of the request).

    remote: bool, default False
        If `True`, return the rowcount for the fetch definition.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The row count reported by the API.
    A failed request or an unparsable response produces a warning and 0.
    """
    response = self.get(
        pipe_r_url(pipe) + "/rowcount",
        json=params,
        params={
            'begin': begin,
            'end': end,
            'remote': remote,
            'instance': self.get_pipe_instance_keys(pipe),
        },
        debug=debug,
    )
    if not response:
        warn(f"Failed to get the rowcount for {pipe}:\n{response.text}")
        return 0

    try:
        rowcount = int(json.loads(response.text))
    except Exception as exc:
        warn(f"Failed to get the rowcount for {pipe}:\n{exc}")
        return 0
    return rowcount

Get a pipe's row count from the API.

Parameters
  • pipe ('meerschaum.Pipe'): The pipe whose row count we are counting.
  • begin (Union[str, datetime, int, None], default None): If provided, bound the count by this datetime.
  • end (Union[str, datetime, int, None], default None): If provided, bound the count by this datetime.
  • params (Optional[Dict[str, Any]], default None): If provided, bound the count by these parameters.
  • remote (bool, default False): If True, return the rowcount for the fetch definition.
Returns
  • The number of rows in the pipe's table, bounded by the given parameters.
  • If the table does not exist, return 0.
def drop_pipe( self, pipe: meerschaum.Pipe, debug: bool = False) -> Tuple[bool, str]:
def drop_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False
) -> SuccessTuple:
    """
    Drop a pipe's table but maintain its registration.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to be dropped.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A success tuple (bool, str).
    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint
    if pipe is None:
        error("Pipe cannot be None.")
    r_url = pipe_r_url(pipe)
    response = self.delete(
        r_url + '/drop',
        params={
            'instance': self.get_pipe_instance_keys(pipe),
        },
        debug=debug,
    )
    if debug:
        dprint(response.text)

    try:
        data = response.json()
    except Exception:
        return False, f"Failed to drop {pipe}."

    ### A JSON list is treated as a serialized success tuple.
    if isinstance(data, list) and len(data) >= 2:
        return data[0], data[1]

    ### Only look up `detail` on dictionaries:
    ### `'detail' in data` raises for scalars and does a substring check on strings.
    if isinstance(data, dict) and 'detail' in data:
        return bool(response), data['detail']

    return bool(response), response.text

Drop a pipe's table but maintain its registration.

Parameters
Returns
  • A success tuple (bool, str).
def clear_pipe( self, pipe: meerschaum.Pipe, begin: Union[str, datetime.datetime, int, NoneType] = None, end: Union[str, datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw) -> Tuple[bool, str]:
def clear_pipe(
    self,
    pipe: mrsm.Pipe,
    begin: Union[str, datetime, int, None] = None,
    end: Union[str, datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """
    Delete rows in a pipe's table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe with rows to be deleted.

    begin: Union[str, datetime, int, None], default None
        Sent as the `begin` query parameter.

    end: Union[str, datetime, int, None], default None
        Sent as the `end` query parameter.

    params: Optional[Dict[str, Any]], default None
        Sent JSON-encoded as the `params` query parameter.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A success tuple.
    """
    r_url = pipe_r_url(pipe)
    response = self.delete(
        r_url + '/clear',
        params={
            'begin': begin,
            'end': end,
            'params': json.dumps(params),
            'instance': self.get_pipe_instance_keys(pipe),
        },
        debug=debug,
    )
    if debug:
        dprint(response.text)

    try:
        data = response.json()
    except Exception:
        return False, f"Failed to clear {pipe} with constraints {begin=}, {end=}, {params=}."

    ### A JSON list is treated as a serialized success tuple.
    if isinstance(data, list) and len(data) >= 2:
        return data[0], data[1]

    ### Only look up `detail` on dictionaries:
    ### `'detail' in data` raises for scalars and does a substring check on strings.
    if isinstance(data, dict) and 'detail' in data:
        return bool(response), data['detail']

    return bool(response), response.text

Delete rows in a pipe's table.

Parameters
Returns
  • A success tuple.
def get_pipe_columns_types( self, pipe: meerschaum.Pipe, debug: bool = False) -> Optional[Dict[str, str]]:
def get_pipe_columns_types(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Union[Dict[str, str], None]:
    """
    Request the columns and types of a pipe's table from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose columns are to be queried.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping column names to their database types,
    or `None` (with a warning) if the response is not a dictionary
    or only contains an error `detail`.

    Examples
    --------
    >>> {
    ...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
    ...   'id': 'BIGINT',
    ...   'val': 'DOUBLE PRECISION',
    ... }
    >>>
    """
    response = self.get(
        pipe_r_url(pipe) + '/columns/types',
        params={
            'instance': self.get_pipe_instance_keys(pipe),
        },
        debug=debug,
    )
    columns_types = response.json()

    if not isinstance(columns_types, dict):
        warn(response.text)
        return None

    ### A lone `detail` key is an error message rather than a column.
    if len(columns_types) == 1 and 'detail' in columns_types:
        warn(columns_types['detail'])
        return None

    return columns_types

Fetch the columns and types of the pipe's table.

Parameters
Returns
  • A dictionary mapping column names to their database types.
Examples
>>> {
...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
...   'id': 'BIGINT',
...   'val': 'DOUBLE PRECISION',
... }
>>>
def get_pipe_columns_indices( self, pipe: meerschaum.Pipe, debug: bool = False) -> Optional[Dict[str, str]]:
def get_pipe_columns_indices(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Union[Dict[str, str], None]:
    """
    Request the index information for a pipe from the API.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose columns are to be queried.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping column names to a list of associated index information,
    or `None` (with a warning) if the response is not a dictionary
    or only contains an error `detail`.
    """
    response = self.get(
        pipe_r_url(pipe) + '/columns/indices',
        params={
            'instance': self.get_pipe_instance_keys(pipe),
        },
        debug=debug,
    )
    columns_indices = response.json()

    if not isinstance(columns_indices, dict):
        warn(response.text)
        return None

    ### A lone `detail` key is an error message rather than a column.
    if len(columns_indices) == 1 and 'detail' in columns_indices:
        warn(columns_indices['detail'])
        return None

    return columns_indices

Fetch the index information for a pipe.

Parameters
  • pipe (mrsm.Pipe): The pipe whose columns to be queried.
Returns
  • A dictionary mapping column names to a list of associated index information.
def fetch( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, str, int] = '', end: Union[datetime.datetime, int] = None, params: 'Optional[Dict, Any]' = None, debug: bool = False, **kw: Any) -> Iterator[pandas.core.frame.DataFrame]:
def fetch(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, str, int] = '',
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kw: Any
) -> Iterator['pd.DataFrame']:
    """
    Get the pipe's data from the remote pipe defined under its `fetch` parameters.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose `fetch` parameters describe the remote source pipe.
        The source pipe's keys are read from `fetch:pipe`
        (or the legacy root-level `connector_keys`, `metric_key`, `location_key`).

    begin: Union[datetime, str, int], default ''
        Passed to the source pipe's `get_data()`.

    end: Union[datetime, int, None], default None
        Passed to the source pipe's `get_data()`.

    params: Optional[Dict[str, Any]], default None
        Filter parameters, patched with `fetch:params` before being passed on.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The result of the source pipe's `get_data(as_iterator=True)`,
    or `None` (with a warning) if the fetch parameters are missing.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.config._patch import apply_patch_to_config

    fetch_params = pipe.parameters.get('fetch', {})
    if not fetch_params:
        warn(f"Missing 'fetch' parameters for {pipe}.", stack=False)
        return None

    ### Copy so that injecting `instance` below never leaks into the pipe's parameters.
    pipe_meta = dict(fetch_params.get('pipe', None) or {})

    ### Legacy: check for `connector_keys`, etc. at the root.
    if not pipe_meta:
        ck, mk, lk = (
            fetch_params.get('connector_keys', None),
            fetch_params.get('metric_key', None),
            fetch_params.get('location_key', None),
        )
        if not ck or not mk:
            warn(f"Missing `fetch:pipe` keys for {pipe}.", stack=False)
            return None

        pipe_meta.update({
            'connector': ck,
            'metric': mk,
            'location': lk,
        })

    pipe_meta['instance'] = self
    source_pipe = mrsm.Pipe(**pipe_meta)

    _params = copy.deepcopy(params) if params is not None else {}
    _params = apply_patch_to_config(_params, fetch_params.get('params', {}))
    select_columns = fetch_params.get('select_columns', [])
    omit_columns = fetch_params.get('omit_columns', [])

    return source_pipe.get_data(
        select_columns=select_columns,
        omit_columns=omit_columns,
        begin=begin,
        end=end,
        params=_params,
        debug=debug,
        as_iterator=True,
    )

Get the Pipe data from the remote Pipe.

def register_plugin( self, plugin: meerschaum.Plugin, make_archive: bool = True, debug: bool = False) -> Tuple[bool, str]:
def register_plugin(
    self,
    plugin: mrsm.core.Plugin,
    make_archive: bool = True,
    debug: bool = False,
) -> SuccessTuple:
    """
    Register a plugin and upload its archive.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin to register.

    make_archive: bool, default True
        If `True`, build the archive with `plugin.make_tar()`.
        Otherwise upload the file at `plugin.archive_path`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` parsed from the response,
    or `(False, message)` if the request or the parsing failed.
    """
    import json
    if make_archive:
        archive_path = plugin.make_tar(debug=debug)
    else:
        archive_path = plugin.archive_path

    ### The archive stays open only for the duration of the upload.
    with open(archive_path, 'rb') as archive_file:
        query = {
            'version': plugin.version,
            'attributes': json.dumps(plugin.attributes),
        }
        upload_url = plugin_r_url(plugin)
        try:
            response = self.post(
                upload_url,
                files={'archive': archive_file},
                params=query,
                debug=debug,
            )
        except Exception:
            return False, f"Failed to register plugin '{plugin}'."

    try:
        success, msg = json.loads(response.text)
    except Exception:
        return False, response.text

    return success, msg

Register a plugin and upload its archive.

def install_plugin( self, name: str, skip_deps: bool = False, force: bool = False, debug: bool = False) -> Tuple[bool, str]:
def install_plugin(
    self,
    name: str,
    skip_deps: bool = False,
    force: bool = False,
    debug: bool = False
) -> SuccessTuple:
    """
    Download and attempt to install a plugin from the API.

    Parameters
    ----------
    name: str
        The name of the plugin to download.

    skip_deps: bool, default False
        Passed to `Plugin.install()`.

    force: bool, default False
        Passed to `Plugin.install()`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The `SuccessTuple` from `Plugin.install()`,
    or a failure tuple if the downloaded file is not a binary archive.
    """
    import os
    import pathlib
    import json
    from meerschaum.core import Plugin
    from meerschaum.config._paths import PLUGINS_TEMP_RESOURCES_PATH
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    binaryornot_check = attempt_import('binaryornot.check', lazy=False)
    r_url = plugin_r_url(name)
    dest = pathlib.Path(os.path.join(PLUGINS_TEMP_RESOURCES_PATH, name + '.tar.gz'))
    if debug:
        dprint(f"Fetching from '{self.url + r_url}' to '{dest}'...")
    archive_path = self.wget(r_url, dest, debug=debug)
    is_binary = binaryornot_check.is_binary(str(archive_path))
    if not is_binary:
        ### A text download means the API answered with JSON instead of the archive.
        fail_msg = f"Failed to download binary for plugin '{name}'."

        ### Default to failure so unexpected payloads (including `detail` dictionaries)
        ### can never leave `success` and `msg` unbound.
        success, msg = False, fail_msg
        try:
            with open(archive_path, 'r') as f:
                j = json.load(f)
            if isinstance(j, list):
                success, msg = tuple(j)
        except Exception:
            success, msg = False, fail_msg
        return success, msg
    plugin = Plugin(name, archive_path=archive_path, repo_connector=self)
    return plugin.install(skip_deps=skip_deps, force=force, debug=debug)

Download and attempt to install a plugin from the API.

def delete_plugin( self, plugin: meerschaum.Plugin, debug: bool = False) -> Tuple[bool, str]:
def delete_plugin(
    self,
    plugin: mrsm.core.Plugin,
    debug: bool = False
) -> SuccessTuple:
    """
    Delete a plugin from an API repository.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin to delete.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` parsed from the response,
    or `(False, message)` if the request or the parsing failed.
    """
    import json
    plugin_url = plugin_r_url(plugin)
    try:
        response = self.delete(plugin_url, debug=debug)
    except Exception:
        return False, f"Failed to delete plugin '{plugin}'."

    try:
        result = json.loads(response.text)
        success, msg = result
    except Exception:
        return False, response.text

    return success, msg

Delete a plugin from an API repository.

def get_plugins( self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False) -> List[str]:
 93def get_plugins(
 94    self,
 95    user_id: Optional[int] = None,
 96    search_term: Optional[str] = None,
 97    debug: bool = False
 98) -> List[str]:
 99    """Return a list of registered plugin names.
100
101    Parameters
102    ----------
103    user_id: Optional[int], default None
104        If specified, return all plugins from a certain user.
105
106    search_term: Optional[str], default None
107        If specified, return plugins beginning with this string.
108
109    Returns
110    -------
111    A list of plugin names.
112    """
113    import json
114    from meerschaum.utils.warnings import error
115    from meerschaum.config.static import STATIC_CONFIG
116    response = self.get(
117        STATIC_CONFIG['api']['endpoints']['plugins'],
118        params = {'user_id': user_id, 'search_term': search_term},
119        use_token = True,
120        debug = debug
121    )
122    if not response:
123        return []
124    plugins = json.loads(response.text)
125    if not isinstance(plugins, list):
126        error(response.text)
127    return plugins

Return a list of registered plugin names.

Parameters
  • user_id (Optional[int], default None): If specified, return all plugins from a certain user.
  • search_term (Optional[str], default None): If specified, return plugins beginning with this string.
Returns
  • A list of plugin names.
def get_plugin_attributes( self, plugin: meerschaum.Plugin, debug: bool = False) -> Dict[str, Any]:
def get_plugin_attributes(
    self,
    plugin: mrsm.core.Plugin,
    debug: bool = False
) -> Dict[str, Any]:
    """
    Return a plugin's attributes.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin whose `/attributes` endpoint is queried.

    debug: bool, default False
        Forwarded to the underlying GET request.

    Returns
    -------
    The attributes dictionary, or an empty dictionary when the request
    was unsuccessful and the payload carries a `detail` message.
    """
    import json
    from meerschaum.utils.warnings import warn, error

    attributes_url = plugin_r_url(plugin) + '/attributes'
    response = self.get(attributes_url, use_token=True, debug=debug)
    attributes = response.json()

    # The payload may itself be a JSON-encoded string; try decoding it again.
    looks_like_json_object = (
        isinstance(attributes, str)
        and attributes
        and attributes[0] == '{'
    )
    if looks_like_json_object:
        try:
            attributes = json.loads(attributes)
        except Exception:
            pass

    if not isinstance(attributes, dict):
        error(response.text)
    elif not response and 'detail' in attributes:
        warn(attributes['detail'])
        return {}
    return attributes

Return a plugin's attributes.

def login( self, debug: bool = False, warn: bool = True, **kw: Any) -> Tuple[bool, str]:
def login(
    self,
    debug: bool = False,
    warn: bool = True,
    **kw: Any
) -> SuccessTuple:
    """Log in and set the session token.

    Parameters
    ----------
    debug: bool, default False
        Forwarded to the underlying POST request.

    warn: bool, default True
        If `True`, emit a warning on a failed login
        (only the first failure per connector object warns).

    Returns
    -------
    A `SuccessTuple` of the login outcome and a message.
    On success, `self._token` and `self._expires` are set from the response.
    """
    try:
        credentials = {
            'username': self.username,
            'password': self.password,
        }
    except AttributeError:
        # The connector has no username and/or password attribute.
        return False, f"Please login with the command `login {self}`."

    login_endpoint = STATIC_CONFIG['api']['endpoints']['login']
    response = self.post(
        login_endpoint,
        data=credentials,
        use_token=False,
        debug=debug,
    )

    if not response:
        msg = (
            f"Failed to log into '{self}' as user '{credentials['username']}'.\n" +
            f"    Please verify login details for connector '{self}'."
        )
        if warn and not self.__dict__.get('_emitted_warning', False):
            _warn(msg, stack=False)
            self._emitted_warning = True
        return response.__bool__(), msg

    msg = f"Successfully logged into '{self}' as user '{credentials['username']}'."
    payload = json.loads(response.text)
    self._token = payload['access_token']
    self._expires = datetime.datetime.strptime(
        payload['expires'],
        '%Y-%m-%dT%H:%M:%S.%f'
    )
    return response.__bool__(), msg

Log in and set the session token.

def test_connection(self, **kw: Any) -> Optional[bool]:
def test_connection(
    self,
    **kw: Any
) -> Union[bool, None]:
    """Test if a successful connection to the API may be made.

    Keyword arguments override the defaults passed to `retry_connect`
    (a single attempt, no waiting, no warnings, and no enforced
    chaining or login). Any exception results in `False`.
    """
    from meerschaum.connectors.poll import retry_connect
    connect_kwargs = {
        'max_retries': 1,
        'retry_wait': 0,
        'warn': False,
        'connector': self,
        'enforce_chaining': False,
        'enforce_login': False,
        **kw,
    }
    try:
        return retry_connect(**connect_kwargs)
    except Exception:
        return False

Test if a successful connection to the API may be made.

def register_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
def register_user(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """Register a new user.

    The username, password, and JSON-encoded attributes are always posted;
    `type` and `email` are included only when set on the user.

    Returns
    -------
    The `SuccessTuple` decoded from the response, or `(False, message)`
    when the response carries a `detail` key or cannot be decoded.
    """
    import json
    from meerschaum.config.static import STATIC_CONFIG

    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    register_url = f"{users_endpoint}/register"
    payload = {
        'username': user.username,
        'password': user.password,
        'attributes': json.dumps(user.attributes),
    }
    if user.type:
        payload['type'] = user.type
    if user.email:
        payload['email'] = user.email

    response = self.post(register_url, data=payload, debug=debug)
    try:
        decoded = json.loads(response.text)
        if isinstance(decoded, dict) and 'detail' in decoded:
            return False, decoded['detail']
        result = tuple(decoded)
    except Exception:
        if response:
            return False, response.text
        return False, f"Failed to register user '{user}'."

    return result

Register a new user.

def get_user_id( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[int]:
 97def get_user_id(
 98        self,
 99        user: 'meerschaum.core.User',
100        debug: bool = False,
101        **kw: Any
102    ) -> Optional[int]:
103    """Get a user's ID."""
104    from meerschaum.config.static import STATIC_CONFIG
105    import json
106    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/id"
107    response = self.get(r_url, debug=debug, **kw)
108    try:
109        user_id = int(json.loads(response.text))
110    except Exception as e:
111        user_id = None
112    return user_id

Get a user's ID.

def get_users(self, debug: bool = False, **kw: Any) -> List[str]:
def get_users(
    self,
    debug: bool = False,
    **kw: Any
) -> List[str]:
    """
    Return a list of registered usernames.

    An empty list is returned when the request is unsuccessful
    or the response body is not valid JSON.
    """
    from meerschaum.config.static import STATIC_CONFIG
    import json

    users_endpoint = f"{STATIC_CONFIG['api']['endpoints']['users']}"
    response = self.get(users_endpoint, debug=debug, use_token=True)
    if response:
        try:
            return response.json()
        except Exception:
            return []
    return []

Return a list of registered usernames.

def edit_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
def edit_user(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """Edit an existing user.

    Posts the user's username, password, type, email, and JSON-encoded
    attributes to the users `/edit` endpoint.

    Returns
    -------
    The `SuccessTuple` decoded from the response, or `(False, message)`
    when the response carries a `detail` key or cannot be decoded.
    """
    import json
    from meerschaum.config.static import STATIC_CONFIG

    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    edit_url = f"{users_endpoint}/edit"
    payload = {
        'username': user.username,
        'password': user.password,
        'type': user.type,
        'email': user.email,
        'attributes': json.dumps(user.attributes),
    }
    response = self.post(edit_url, data=payload, debug=debug)
    try:
        decoded = json.loads(response.text)
        if isinstance(decoded, dict) and 'detail' in decoded:
            return False, decoded['detail']
        result = tuple(decoded)
    except Exception:
        if response:
            return False, response.text
        return False, f"Failed to edit user '{user}'."

    return result

Edit an existing user.

def delete_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
def delete_user(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """Delete a user.

    Returns
    -------
    The `SuccessTuple` decoded from the response, or `(False, message)`
    when the response carries a `detail` key or cannot be decoded.
    """
    from meerschaum.config.static import STATIC_CONFIG
    import json

    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    user_url = f"{users_endpoint}/{user.username}"
    response = self.delete(user_url, debug=debug)
    try:
        decoded = json.loads(response.text)
        if isinstance(decoded, dict) and 'detail' in decoded:
            return False, decoded['detail']
        return tuple(decoded)
    except Exception:
        return False, f"Failed to delete user '{user.username}'."

Delete a user.

def get_user_password_hash( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
def get_user_password_hash(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> Optional[str]:
    """If configured, get a user's password hash.

    Returns `None` when the request is unsuccessful,
    otherwise the decoded JSON body of the response.
    """
    from meerschaum.config.static import STATIC_CONFIG

    hash_url = '/'.join([
        STATIC_CONFIG['api']['endpoints']['users'],
        user.username,
        'password_hash',
    ])
    response = self.get(hash_url, debug=debug, **kw)
    return response.json() if response else None

If configured, get a user's password hash.

def get_user_type( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
def get_user_type(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> Optional[str]:
    """If configured, get a user's type.

    Returns `None` when the request is unsuccessful,
    otherwise the decoded JSON body of the response.
    """
    from meerschaum.config.static import STATIC_CONFIG

    type_url = '/'.join([
        STATIC_CONFIG['api']['endpoints']['users'],
        user.username,
        'type',
    ])
    response = self.get(type_url, debug=debug, **kw)
    return response.json() if response else None

If configured, get a user's type.

def get_user_attributes( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw) -> int:
def get_user_attributes(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> 'Optional[Dict[str, Any]]':
    """Get a user's attributes.

    Parameters
    ----------
    user: meerschaum.core.User
        The user whose `/attributes` endpoint is queried.

    debug: bool, default False
        Forwarded to the underlying GET request.

    Returns
    -------
    The JSON-decoded response body (presumably a dictionary of attributes),
    or `None` if the body cannot be decoded.
    """
    from meerschaum.config.static import STATIC_CONFIG
    import json

    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/attributes"
    response = self.get(r_url, debug=debug, **kw)
    try:
        return json.loads(response.text)
    except Exception:
        # Best-effort: an undecodable body is reported as "no attributes".
        return None

Get a user's attributes.

@classmethod
def from_uri( cls, uri: str, label: Optional[str] = None, as_dict: bool = False) -> Union[APIConnector, Dict[str, Union[str, int]]]:
@classmethod
def from_uri(
    cls,
    uri: str,
    label: Optional[str] = None,
    as_dict: bool = False,
) -> Union[
    'meerschaum.connectors.APIConnector',
    Dict[str, Union[str, int]],
]:
    """
    Create a new APIConnector from a URI string.

    Parameters
    ----------
    uri: str
        The URI connection string.

    label: Optional[str], default None
        If provided, use this as the connector label.
        Otherwise build the label from the lowercased `username@host`
        (or just the host when the URI has no username).

    as_dict: bool, default False
        If `True`, return a dictionary of the keyword arguments
        necessary to create a new `APIConnector`, otherwise create a new object.

    Returns
    -------
    A new APIConnector object or a dictionary of attributes (if `as_dict` is `True`).
    """
    from meerschaum.connectors.sql import SQLConnector

    params = SQLConnector.parse_uri(uri)
    if 'host' not in params:
        error("No host was found in the provided URI.")

    # The parsed URI scheme ("flavor") is the API connector's protocol.
    params['protocol'] = params.pop('flavor')

    if label:
        params['label'] = label
    else:
        user_prefix = params['username'] + '@' if 'username' in params else ''
        params['label'] = (user_prefix + params['host']).lower()

    if as_dict:
        return params
    return cls(**params)

Create a new APIConnector from a URI string.

Parameters
  • uri (str): The URI connection string.
  • label (Optional[str], default None): If provided, use this as the connector label. Otherwise use the lowercased username@host (or just the host if the URI has no username).
  • as_dict (bool, default False): If True, return a dictionary of the keyword arguments necessary to create a new APIConnector, otherwise create a new object.
Returns
  • A new APIConnector object or a dictionary of attributes (if as_dict is True).
def get_jobs(self, debug: bool = False) -> Dict[str, meerschaum.Job]:
def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
    """
    Return a dictionary of remote jobs.

    Each entry of the jobs endpoint's JSON payload is turned into a `Job`
    keyed by its name, with this connector as the executor.
    An empty dictionary is returned (with a warning) if the request fails.
    """
    response = self.get(JOBS_ENDPOINT, debug=debug)
    if not response:
        warn(f"Failed to get remote jobs from {self}.")
        return {}

    jobs = {}
    for name, job_meta in response.json().items():
        jobs[name] = Job(
            name,
            job_meta['sysargs'],
            executor_keys=str(self),
            _properties=job_meta['daemon']['properties'],
        )
    return jobs

Return a dictionary of remote jobs.

def get_job(self, name: str, debug: bool = False) -> meerschaum.Job:
def get_job(self, name: str, debug: bool = False) -> Job:
    """
    Return a single Job object.

    Raises
    ------
    ValueError
        If no metadata could be found for the job `name`.
    """
    metadata = self.get_job_metadata(name, debug=debug)
    if metadata:
        return Job(
            name,
            metadata['sysargs'],
            executor_keys=str(self),
            _properties=metadata['daemon']['properties'],
        )

    raise ValueError(f"Job '{name}' does not exist.")

Return a single Job object.

def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
 63def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
 64    """
 65    Return the metadata for a single job.
 66    """
 67    now = time.perf_counter()
 68    _job_metadata_cache = self.__dict__.get('_job_metadata_cache', None)
 69    _job_metadata_timestamp = (
 70        _job_metadata_cache.get(name, {}).get('timestamp', None)
 71    ) if _job_metadata_cache is not None else None
 72
 73    if (
 74        _job_metadata_timestamp is not None
 75        and (now - _job_metadata_timestamp) < JOB_METADATA_CACHE_SECONDS
 76    ):
 77        if debug:
 78            dprint(f"Returning cached metadata for job '{name}'.")
 79        return _job_metadata_cache[name]['metadata']
 80
 81    response = self.get(JOBS_ENDPOINT + f"/{name}", debug=debug)
 82    if not response:
 83        if debug:
 84            msg = (
 85                response.json()['detail']
 86                if 'detail' in response.text
 87                else response.text
 88            )
 89            warn(f"Failed to get metadata for job '{name}':\n{msg}")
 90        return {}
 91
 92    metadata = response.json()
 93    if _job_metadata_cache is None:
 94        self._job_metadata_cache = {}
 95
 96    self._job_metadata_cache[name] = {
 97        'timestamp': now,
 98        'metadata': metadata,
 99    }
100    return metadata

Return the metadata for a single job.

def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
    """
    Return the daemon properties for a single job.

    Reads `metadata['daemon']['properties']`, falling back to an empty
    dictionary when either key is missing.
    """
    daemon_meta = self.get_job_metadata(name, debug=debug).get('daemon', {})
    return daemon_meta.get('properties', {})

Return the daemon properties for a single job.

def get_job_exists(self, name: str, debug: bool = False) -> bool:
def get_job_exists(self, name: str, debug: bool = False) -> bool:
    """
    Return whether a job exists.

    Warns and returns `False` if the request is unsuccessful;
    otherwise returns the decoded JSON body of the response.
    """
    exists_url = JOBS_ENDPOINT + f'/{name}/exists'
    response = self.get(exists_url, debug=debug)
    if response:
        return response.json()

    warn(f"Failed to determine whether job '{name}' exists.")
    return False

Return whether a job exists.

def delete_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Delete a job.

    Returns the `SuccessTuple` decoded from the response, or
    `(False, message)` built from the response's `detail` or raw text
    when the request is unsuccessful.
    """
    job_url = JOBS_ENDPOINT + f"/{name}"
    response = self.delete(job_url, debug=debug)
    if response:
        return tuple(response.json())

    if 'detail' in response.text:
        return False, response.json()['detail']
    return False, response.text

Delete a job.

def start_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Start a job.

    Returns the `SuccessTuple` decoded from the response, or
    `(False, message)` built from the response's `detail` or raw text
    when the request is unsuccessful.
    """
    start_url = JOBS_ENDPOINT + f"/{name}/start"
    response = self.post(start_url, debug=debug)
    if response:
        return tuple(response.json())

    if 'detail' in response.text:
        return False, response.json()['detail']
    return False, response.text

Start a job.

def create_job( self, name: str, sysargs: List[str], properties: Optional[Dict[str, str]] = None, debug: bool = False) -> Tuple[bool, str]:
def create_job(
    self,
    name: str,
    sysargs: List[str],
    properties: Optional[Dict[str, str]] = None,
    debug: bool = False,
) -> SuccessTuple:
    """
    Create a job.

    Posts `sysargs` and `properties` as a JSON body to the job's endpoint.

    Returns the `SuccessTuple` decoded from the response, or
    `(False, message)` built from the response's `detail` or raw text
    when the request is unsuccessful.
    """
    job_url = JOBS_ENDPOINT + f"/{name}"
    body = {
        'sysargs': sysargs,
        'properties': properties,
    }
    response = self.post(job_url, json=body, debug=debug)
    if response:
        return tuple(response.json())

    if 'detail' in response.text:
        return False, response.json()['detail']
    return False, response.text

Create a job.

def stop_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Stop a job.

    Returns the `SuccessTuple` decoded from the response, or
    `(False, message)` built from the response's `detail` or raw text
    when the request is unsuccessful.
    """
    stop_url = JOBS_ENDPOINT + f"/{name}/stop"
    response = self.post(stop_url, debug=debug)
    if response:
        return tuple(response.json())

    if 'detail' in response.text:
        return False, response.json()['detail']
    return False, response.text

Stop a job.

def pause_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Pause a job.

    Returns the `SuccessTuple` decoded from the response, or
    `(False, message)` built from the response's `detail` or raw text
    when the request is unsuccessful.
    """
    pause_url = JOBS_ENDPOINT + f"/{name}/pause"
    response = self.post(pause_url, debug=debug)
    if response:
        return tuple(response.json())

    if 'detail' in response.text:
        return False, response.json()['detail']
    return False, response.text

Pause a job.

def get_logs(self, name: str, debug: bool = False) -> str:
def get_logs(self, name: str, debug: bool = False) -> str:
    """
    Return the logs for a job.

    Parameters
    ----------
    name: str
        The name of the job whose logs to fetch.

    debug: bool, default False
        Forwarded to the underlying GET request.

    Returns
    -------
    The decoded JSON body of the logs endpoint's response.

    Raises
    ------
    ValueError
        If the request is unsuccessful.
    """
    # Forward `debug` like the sibling job methods do.
    response = self.get(LOGS_ENDPOINT + f"/{name}", debug=debug)
    if not response:
        raise ValueError(f"Cannot fetch logs for job '{name}':\n{response.text}")

    return response.json()

Return the logs for a job.

def get_job_stop_time(self, name: str, debug: bool = False) -> Optional[datetime.datetime]:
def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
    """
    Return the job's manual stop time.

    Parameters
    ----------
    name: str
        The name of the job to query.

    debug: bool, default False
        Forwarded to the underlying GET request.

    Returns
    -------
    The stop time parsed from the response's ISO-format string,
    or `None` if the request is unsuccessful (a warning is emitted)
    or the response body is null.
    """
    # Forward `debug` like the sibling job methods do.
    response = self.get(JOBS_ENDPOINT + f"/{name}/stop_time", debug=debug)
    if not response:
        warn(f"Failed to get stop time for job '{name}':\n{response.text}")
        return None

    data = response.json()
    if data is None:
        return None

    return datetime.fromisoformat(data)

Return the job's manual stop time.

def monitor_logs( self, name: str, callback_function: Callable[[Any], Any], input_callback_function: Callable[[NoneType], str], stop_callback_function: Callable[[NoneType], str], stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False):
def monitor_logs(
    self,
    name: str,
    callback_function: Callable[[Any], Any],
    input_callback_function: Callable[[None], str],
    stop_callback_function: Callable[[None], str],
    stop_on_exit: bool = False,
    strip_timestamps: bool = False,
    accept_input: bool = True,
    debug: bool = False,
):
    """
    Monitor a job's log files and execute a callback with the changes.

    Synchronous wrapper: runs `monitor_logs_async` to completion with
    `asyncio.run`, forwarding every argument unchanged, and returns its result.
    """
    monitor_coroutine = self.monitor_logs_async(
        name,
        callback_function,
        input_callback_function=input_callback_function,
        stop_callback_function=stop_callback_function,
        stop_on_exit=stop_on_exit,
        strip_timestamps=strip_timestamps,
        accept_input=accept_input,
        debug=debug,
    )
    return asyncio.run(monitor_coroutine)

Monitor a job's log files and execute a callback with the changes.

async def monitor_logs_async( self, name: str, callback_function: Callable[[Any], Any], input_callback_function: Callable[[], str], stop_callback_function: Callable[[Tuple[bool, str]], str], stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False):
async def monitor_logs_async(
    self,
    name: str,
    callback_function: Callable[[Any], Any],
    input_callback_function: Callable[[], str],
    stop_callback_function: Callable[[SuccessTuple], str],
    stop_on_exit: bool = False,
    strip_timestamps: bool = False,
    accept_input: bool = True,
    debug: bool = False,
):
    """
    Monitor a job's log files and await a callback with the changes.

    Opens a websocket to the job's logs endpoint and loops over incoming
    messages until interrupted.

    Parameters
    ----------
    name: str
        The name of the job whose logs to monitor.

    callback_function: Callable[[Any], Any]
        Called (or awaited, if a coroutine function) with each log message.

    input_callback_function: Callable[[], str]
        Called (or awaited) when the stdin control message is received;
        its return value is sent back over the websocket.
        If `None`, stdin requests are ignored.

    stop_callback_function: Callable[[SuccessTuple], str]
        Called (or awaited) with the job's result tuple when the
        stop control message is received. May be `None`.

    stop_on_exit: bool, default False
        If `True`, stop monitoring after the stop message is handled.

    strip_timestamps: bool, default False
        If `True`, pass each log line through `strip_timestamp_from_line`
        before invoking `callback_function`.

    accept_input: bool, default True
        NOTE(review): not referenced in this function's body.

    debug: bool, default False
        NOTE(review): not referenced in this function's body.
    """
    import traceback
    from meerschaum.jobs import StopMonitoringLogs
    from meerschaum.utils.formatting._jobs import strip_timestamp_from_line

    websockets, websockets_exceptions = mrsm.attempt_import('websockets', 'websockets.exceptions')
    # Plain-HTTP connectors use `ws`; everything else uses `wss`.
    protocol = 'ws' if self.URI.startswith('http://') else 'wss'
    # NOTE(review): with no `port` attribute, the URI below still contains the ':' separator.
    port = self.port if 'port' in self.__dict__ else ''
    uri = f"{protocol}://{self.host}:{port}{LOGS_ENDPOINT}/{name}/ws"

    async def _stdin_callback(client):
        # Answer a stdin request with the value produced by the input callback.
        if input_callback_function is None:
            return

        if asyncio.iscoroutinefunction(input_callback_function):
            data = await input_callback_function()
        else:
            data = input_callback_function()

        await client.send(data)

    async def _stop_callback(client):
        # The message following the stop marker is decoded as the JSON result tuple.
        try:
            result = tuple(json.loads(await client.recv()))
        except Exception as e:
            warn(traceback.format_exc())
            result = False, str(e)

        if stop_callback_function is not None:
            if asyncio.iscoroutinefunction(stop_callback_function):
                await stop_callback_function(result)
            else:
                stop_callback_function(result)

        # Raised here and caught in the receive loop below to end monitoring.
        if stop_on_exit:
            raise StopMonitoringLogs

    # Control messages are dispatched to handlers instead of `callback_function`.
    message_callbacks = {
        JOBS_STDIN_MESSAGE: _stdin_callback,
        JOBS_STOP_MESSAGE: _stop_callback,
    }

    async with websockets.connect(uri) as websocket:
        # The first message sent is the session token (or a placeholder when there is none).
        try:
            await websocket.send(self.token or 'no-login')
        except websockets_exceptions.ConnectionClosedOK:
            pass

        while True:
            try:
                response = await websocket.recv()
                callback = message_callbacks.get(response, None)
                if callback is not None:
                    await callback(websocket)
                    continue

                if strip_timestamps:
                    response = strip_timestamp_from_line(response)

                if asyncio.iscoroutinefunction(callback_function):
                    await callback_function(response)
                else:
                    callback_function(response)
            except (KeyboardInterrupt, StopMonitoringLogs):
                # Close the socket and leave the loop; other exceptions propagate.
                await websocket.close()
                break

Monitor a job's log files and await a callback with the changes.

def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
    """
    Return whether a remote job is blocking on stdin.

    Returns `False` if the request is unsuccessful;
    otherwise returns the decoded JSON body of the response.
    """
    blocking_url = JOBS_ENDPOINT + f'/{name}/is_blocking_on_stdin'
    response = self.get(blocking_url, debug=debug)
    return response.json() if response else False

Return whether a remote job is blocking on stdin.

def get_job_began(self, name: str, debug: bool = False) -> Optional[str]:
def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return a job's `began` timestamp, if it exists.

    Looks up `['daemon']['began']` in the job's properties
    and returns `None` when it is absent.
    """
    daemon_properties = self.get_job_properties(name, debug=debug).get('daemon', {})
    return daemon_properties.get('began', None)

Return a job's began timestamp, if it exists.

def get_job_ended(self, name: str, debug: bool = False) -> Optional[str]:
def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return a job's `ended` timestamp, if it exists.

    Looks up `['daemon']['ended']` in the job's properties
    and returns `None` when it is absent.
    """
    daemon_properties = self.get_job_properties(name, debug=debug).get('daemon', {})
    return daemon_properties.get('ended', None)

Return a job's ended timestamp, if it exists.

def get_job_paused(self, name: str, debug: bool = False) -> Optional[str]:
def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return a job's `paused` timestamp, if it exists.

    Looks up `['daemon']['paused']` in the job's properties
    and returns `None` when it is absent.
    """
    daemon_properties = self.get_job_properties(name, debug=debug).get('daemon', {})
    return daemon_properties.get('paused', None)

Return a job's paused timestamp, if it exists.

def get_job_status(self, name: str, debug: bool = False) -> str:
def get_job_status(self, name: str, debug: bool = False) -> str:
    """
    Return the job's status.

    Falls back to `'stopped'` when the job's metadata has no `status` key.
    """
    job_metadata = self.get_job_metadata(name, debug=debug)
    status = job_metadata.get('status', 'stopped')
    return status

Return the job's status.