Module meerschaum.connectors.api.APIConnector

Interact with Meerschaum APIs. May be chained together (see 'meerschaum:api_instance' in your config.yaml).

Expand source code
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Interact with Meerschaum APIs. May be chained together (see 'meerschaum:api_instance' in your config.yaml).
"""

import datetime
from meerschaum.utils.typing import Optional
from meerschaum.connectors import Connector
from meerschaum.utils.warnings import warn, error
from meerschaum.utils.packages import attempt_import

required_attributes = {
    'host',
}

class APIConnector(Connector):
    """
    Connect to a Meerschaum API instance.
    """

    IS_INSTANCE: bool = True

    from ._delete import delete
    from ._post import post
    from ._patch import patch
    from ._get import get, wget
    from ._actions import get_actions, do_action
    from ._misc import get_mrsm_version, get_chaining_status
    from ._pipes import (
        register_pipe,
        fetch_pipes_keys,
        edit_pipe,
        sync_pipe,
        delete_pipe,
        get_pipe_data,
        get_backtrack_data,
        get_pipe_id,
        get_pipe_attributes,
        get_sync_time,
        pipe_exists,
        create_metadata,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        get_pipe_columns_types,
    )
    from ._fetch import fetch
    from ._plugins import (
        register_plugin,
        install_plugin,
        delete_plugin,
        get_plugins,
        get_plugin_attributes,
    )
    from ._login import login, test_connection
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri

    def __init__(
        self,
        label: Optional[str] = None,
        wait: bool = False,
        debug: bool = False,
        **kw
    ):
        self.wait = wait
        if 'uri' in kw:
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            from_uri_params.pop('label', None)
            kw.update(from_uri_params)

        super().__init__('api', label=label, **kw)
        if 'protocol' not in self.__dict__:
            self.protocol = 'http'
        if 'port' not in self.__dict__:
            self.port = 8000
        if 'uri' not in self.__dict__:
            self.verify_attributes(required_attributes)
        else:
            from meerschaum.connectors.sql import SQLConnector
            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
            if 'host' not in conn_attrs:
                raise Exception(f"Invalid URI for '{self}'.")
            self.__dict__.update(conn_attrs)
        self.url = (
            self.protocol + '://' +
            self.host + ':' +
            str(self.port)
        )
        self._token = None
        self._expires = None
        self._session = None


    @property
    def URI(self) -> str:
        """
        Return the fully qualified URI.
        """
        username = self.__dict__.get('username', None)
        password = self.__dict__.get('password', None)
        creds = (username + ':' + password + '@') if username and password else ''

        return (
            self.protocol + '://' + creds
            + self.host + ':' + str(self.port)
        )


    @property
    def session(self):
        if self._session is None:
            requests = attempt_import('requests')
            if requests:
                self._session = requests.Session()
            if self._session is None:
                error(f"Failed to import requests. Is requests installed?")
        return self._session

    @property
    def token(self):
        expired = (
            True if self._expires is None else (
                (self._expires < datetime.datetime.utcnow() + datetime.timedelta(minutes=1))
            )
        )

        if self._token is None or expired:
            success, msg = self.login()
            if not success:
                warn(msg, stack=False)
        return self._token

Classes

class APIConnector (label: Optional[str] = None, wait: bool = False, debug: bool = False, **kw)

Connect to a Meerschaum API instance.

Parameters

type : str
The type of the connection. Used as a key in config.yaml to get attributes. Supported values are 'sql', 'api', 'mqtt', 'plugin'.
label : str
The label for the connection. Used as a key within config.yaml
pandas : str
Custom pandas implementation name. E.g. May change to modin.pandas. NOTE: This is experimental!

Run mrsm edit config and to edit connectors in the YAML file:

meerschaum:
    connections:
        {type}:
            {label}:
                ### attributes go here
Expand source code
class APIConnector(Connector):
    """
    Connect to a Meerschaum API instance.
    """

    IS_INSTANCE: bool = True

    from ._delete import delete
    from ._post import post
    from ._patch import patch
    from ._get import get, wget
    from ._actions import get_actions, do_action
    from ._misc import get_mrsm_version, get_chaining_status
    from ._pipes import (
        register_pipe,
        fetch_pipes_keys,
        edit_pipe,
        sync_pipe,
        delete_pipe,
        get_pipe_data,
        get_backtrack_data,
        get_pipe_id,
        get_pipe_attributes,
        get_sync_time,
        pipe_exists,
        create_metadata,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        get_pipe_columns_types,
    )
    from ._fetch import fetch
    from ._plugins import (
        register_plugin,
        install_plugin,
        delete_plugin,
        get_plugins,
        get_plugin_attributes,
    )
    from ._login import login, test_connection
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri

    def __init__(
        self,
        label: Optional[str] = None,
        wait: bool = False,
        debug: bool = False,
        **kw
    ):
        self.wait = wait
        if 'uri' in kw:
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            from_uri_params.pop('label', None)
            kw.update(from_uri_params)

        super().__init__('api', label=label, **kw)
        if 'protocol' not in self.__dict__:
            self.protocol = 'http'
        if 'port' not in self.__dict__:
            self.port = 8000
        if 'uri' not in self.__dict__:
            self.verify_attributes(required_attributes)
        else:
            from meerschaum.connectors.sql import SQLConnector
            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
            if 'host' not in conn_attrs:
                raise Exception(f"Invalid URI for '{self}'.")
            self.__dict__.update(conn_attrs)
        self.url = (
            self.protocol + '://' +
            self.host + ':' +
            str(self.port)
        )
        self._token = None
        self._expires = None
        self._session = None


    @property
    def URI(self) -> str:
        """
        Return the fully qualified URI.
        """
        username = self.__dict__.get('username', None)
        password = self.__dict__.get('password', None)
        creds = (username + ':' + password + '@') if username and password else ''

        return (
            self.protocol + '://' + creds
            + self.host + ':' + str(self.port)
        )


    @property
    def session(self):
        if self._session is None:
            requests = attempt_import('requests')
            if requests:
                self._session = requests.Session()
            if self._session is None:
                error(f"Failed to import requests. Is requests installed?")
        return self._session

    @property
    def token(self):
        expired = (
            True if self._expires is None else (
                (self._expires < datetime.datetime.utcnow() + datetime.timedelta(minutes=1))
            )
        )

        if self._token is None or expired:
            success, msg = self.login()
            if not success:
                warn(msg, stack=False)
        return self._token

Ancestors

  • meerschaum.connectors.Connector.Connector

Class variables

var IS_INSTANCE : bool

Static methods

def from_uri(uri: str, label: Optional[str] = None, as_dict: bool = False) ‑> Union[APIConnector, Dict[str, Union[str, int]]]

Create a new APIConnector from a URI string.

Parameters

uri : str
The URI connection string.
label : Optional[str], default None
If provided, use this as the connector label. Otherwise use the determined database name.
as_dict : bool, default False
If True, return a dictionary of the keyword arguments necessary to create a new APIConnector, otherwise create a new object.

Returns

A new APIConnector object or a dictionary of attributes (if as_dict is True).

Expand source code
@classmethod
def from_uri(
        cls,
        uri: str,
        label: Optional[str] = None,
        as_dict: bool = False,
    ) -> Union[
        'meerschaum.connectors.APIConnector',
        Dict[str, Union[str, int]],
    ]:
    """
    Create a new APIConnector from a URI string.

    Parameters
    ----------
    uri: str
        The URI connection string.

    label: Optional[str], default None
        If provided, use this as the connector label.
        Otherwise use the determined database name.

    as_dict: bool, default False
        If `True`, return a dictionary of the keyword arguments
        necessary to create a new `APIConnector`, otherwise create a new object.

    Returns
    -------
    A new APIConnector object or a dictionary of attributes (if `as_dict` is `True`).
    """
    from meerschaum.connectors.sql import SQLConnector
    params = SQLConnector.parse_uri(uri)
    if 'host' not in params:
        error("No host was found in the provided URI.")
    params['protocol'] = params.pop('flavor')
    params['label'] = label or (
        (
            (params['username'] + '@' if 'username' in params else '')
            + params['host']
        ).lower()
    )

    return cls(**params) if not as_dict else params

Instance variables

var URI : str

Return the fully qualified URI.

Expand source code
@property
def URI(self) -> str:
    """
    Return the fully qualified URI.
    """
    username = self.__dict__.get('username', None)
    password = self.__dict__.get('password', None)
    creds = (username + ':' + password + '@') if username and password else ''

    return (
        self.protocol + '://' + creds
        + self.host + ':' + str(self.port)
    )
var session
Expand source code
@property
def session(self):
    if self._session is None:
        requests = attempt_import('requests')
        if requests:
            self._session = requests.Session()
        if self._session is None:
            error(f"Failed to import requests. Is requests installed?")
    return self._session
var token
Expand source code
@property
def token(self):
    expired = (
        True if self._expires is None else (
            (self._expires < datetime.datetime.utcnow() + datetime.timedelta(minutes=1))
        )
    )

    if self._token is None or expired:
        success, msg = self.login()
        if not success:
            warn(msg, stack=False)
    return self._token

Methods

def clear_pipe(self, pipe: Pipe, debug: bool = False, **kw) ‑> SuccessTuple

Delete rows in a pipe's table.

Parameters

pipe : Pipe
The pipe with rows to be deleted.

Returns

A success tuple.

Expand source code
def clear_pipe(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Delete rows in a pipe's table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe with rows to be deleted.
        
    Returns
    -------
    A success tuple.
    """
    kw.pop('metric_keys', None)
    kw.pop('connector_keys', None)
    kw.pop('location_keys', None)
    kw.pop('action', None)
    kw.pop('force', None)
    return self.do_action(
        ['clear', 'pipes'],
        connector_keys = pipe.connector_keys,
        metric_keys = pipe.metric_key,
        location_keys = pipe.location_key,
        force = True,
        debug = debug,
        **kw
    )
def create_metadata(self, debug: bool = False) ‑> bool

Create metadata tables.

Returns

A bool indicating success.

Expand source code
def create_metadata(
        self,
        debug: bool = False
    ) -> bool:
    """Create metadata tables.

    Returns
    -------
    A bool indicating success.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.config.static import STATIC_CONFIG
    import json
    r_url = STATIC_CONFIG['api']['endpoints']['metadata']
    response = self.post(r_url, debug=debug)
    if debug:
        dprint("Create metadata response: {response.text}")
    try:
        metadata_response = json.loads(response.text)
    except Exception as e:
        metadata_response = False
    return False
def delete(self, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Ahy) ‑> requests.Response

Wrapper for requests.delete.

Expand source code
def delete(
        self,
        r_url : str,
        headers : Optional[Dict[str, Any]] = None,
        use_token : bool = True,
        debug : bool = False,
        **kw : Ahy,
    ) -> requests.Response:
    """Wrapper for `requests.delete`."""
    if debug:
        from meerschaum.utils.debug import dprint
    
    if headers is None:
        headers = {}

    if use_token:
        if debug:
            dprint(f"Checking token...")
        headers.update({ 'Authorization': f'Bearer {self.token}' })

    if debug:
        from meerschaum.utils.formatting import pprint
        dprint(f"Sending DELETE request to {self.url + r_url}.")

    return self.session.delete(
        self.url + r_url,
        headers = headers,
        **kw
    )
def delete_pipe(self, pipe: Optional[Pipe] = None, debug: bool = None) ‑> SuccessTuple

Delete a Pipe and drop its table.

Expand source code
def delete_pipe(
        self,
        pipe: Optional[meerschaum.Pipe] = None,
        debug: bool = None,        
    ) -> SuccessTuple:
    """Delete a Pipe and drop its table."""
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint
    if pipe is None:
        error(f"Pipe cannot be None.")
    r_url = pipe_r_url(pipe)
    response = self.delete(
        r_url + '/delete',
        debug = debug,
    )
    if debug:
        dprint(response.text)
    if isinstance(response.json(), list):
        response_tuple = response.__bool__(), response.json()[1]
    elif 'detail' in response.json():
        response_tuple = response.__bool__(), response.json()['detail']
    else:
        response_tuple = response.__bool__(), response.text
    return response_tuple
def delete_plugin(self, plugin: meerschaum.core.Plugin, debug: bool = False) ‑> SuccessTuple

Delete a plugin from an API repository.

Expand source code
def delete_plugin(
        self,
        plugin: meerschaum.core.Plugin,
        debug: bool = False
    ) -> SuccessTuple:
    """Delete a plugin from an API repository."""
    import json
    r_url = plugin_r_url(plugin)
    try:
        response = self.delete(r_url, debug=debug)
    except Exception as e:
        return False, f"Failed to delete plugin '{plugin}'."

    try:
        success, msg = json.loads(response.text)
    except Exception as e:
        return False, response.text

    return success, msg
def delete_user(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> SuccessTuple

Delete a user.

Expand source code
def delete_user(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """Delete a user."""
    from meerschaum.config.static import _static_config
    import json
    r_url = f"{_static_config()['api']['endpoints']['users']}/{user.username}"
    response = self.delete(r_url, debug=debug)
    try:
        _json = json.loads(response.text)
        if isinstance(_json, dict) and 'detail' in _json:
            return False, _json['detail']
        success_tuple = tuple(_json)
    except Exception as e:
        success_tuple = False, f"Failed to delete user '{user.username}'."
    return success_tuple
def do_action(self, action: Optional[List[str]] = None, sysargs: Optional[List[str]] = None, debug: bool = False, **kw) ‑> Tuple[bool, str]

Execute a Meerschaum action remotely.

If sysargs is provided, parse those instead. Otherwise infer everything from keyword arguments.

NOTE: The first index of action should NOT be removed! Example: action = ['show', 'config']

Returns: tuple (succeeded : bool, message : str)

Parameters

action : Optional[List[str]] :
(Default value = None)
sysargs : Optional[List[str]] :
(Default value = None)
debug : bool :
(Default value = False)

**kw :

Returns

Expand source code
def do_action(
        self,
        action: Optional[List[str]] = None,
        sysargs: Optional[List[str]] = None,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """Execute a Meerschaum action remotely.
    
    If sysargs is provided, parse those instead. Otherwise infer everything from keyword arguments.
    
    NOTE: The first index of `action` should NOT be removed!
    Example: action = ['show', 'config']
    
    Returns: tuple (succeeded : bool, message : str)

    Parameters
    ----------
    action: Optional[List[str]] :
         (Default value = None)
    sysargs: Optional[List[str]] :
         (Default value = None)
    debug: bool :
         (Default value = False)
    **kw :
        

    Returns
    -------

    """
    import sys, json
    from meerschaum.utils.debug import dprint
    from meerschaum.config.static import _static_config
    from meerschaum.utils.misc import json_serialize_datetime
    if action is None:
        action = []

    if sysargs is not None and action and action[0] == '':
        from meerschaum._internal.arguments import parse_arguments
        if debug:
            dprint(f"Parsing sysargs:\n{sysargs}")
        json_dict = parse_arguments(sysargs)
    else:
        json_dict = kw
        json_dict['action'] = action
        json_dict['debug'] = debug

    root_action = json_dict['action'][0]
    del json_dict['action'][0]
    r_url = f"{_static_config()['api']['endpoints']['actions']}/{root_action}"
    
    if debug:
        from meerschaum.utils.formatting import pprint
        dprint(f"Sending data to '{self.url + r_url}':")
        pprint(json_dict, stream=sys.stderr)

    response = self.post(
        r_url,
        data = json.dumps(json_dict, default=json_serialize_datetime),
        debug = debug,
    )
    try:
        response_list = json.loads(response.text)
        if isinstance(response_list, dict) and 'detail' in response_list:
            return False, response_list['detail']
    except Exception as e:
        print(f"Invalid response: {response}")
        print(e)
        return False, response.text
    if debug:
        dprint(response)
    try:
        return response_list[0], response_list[1]
    except Exception as e:
        return False, f"Failed to parse result from action '{root_action}'"
def drop_pipe(self, pipe: Pipe, debug: bool = False) ‑> SuccessTuple

Drop a pipe's table but maintain its registration.

Parameters

pipe : meerschaum.Pipe:
The pipe to be dropped.

Returns

A success tuple (bool, str).

Expand source code
def drop_pipe(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False
    ) -> SuccessTuple:
    """Drop a pipe's table but maintain its registration.

    Parameters
    ----------
    pipe: meerschaum.Pipe:
        The pipe to be dropped.
        
    Returns
    -------
    A success tuple (bool, str).
    """
    return self.do_action(
        ['drop', 'pipes'],
        connector_keys = pipe.connector_keys,
        metric_keys = pipe.metric_key,
        location_keys = pipe.location_key,
        force = True,
        debug = debug
    )
def edit_pipe(self, pipe: Pipe, patch: bool = False, debug: bool = False) ‑> SuccessTuple

Submit a PATCH to the API to edit an existing Pipe object. Returns a tuple of (success_bool, response_dict).

Expand source code
def edit_pipe(
        self,
        pipe: meerschaum.Pipe,
        patch: bool = False,
        debug: bool = False,
    ) -> SuccessTuple:
    """Submit a PATCH to the API to edit an existing Pipe object.
    Returns a tuple of (success_bool, response_dict).
    """
    from meerschaum.utils.debug import dprint
    ### NOTE: if `parameters` is supplied in the Pipe constructor,
    ###       then `pipe.parameters` will exist and not be fetched from the database.
    r_url = pipe_r_url(pipe)
    response = self.patch(
        r_url + '/edit',
        params = {'patch': patch,},
        json = pipe.parameters,
        debug = debug,
    )
    if debug:
        dprint(response.text)
    if isinstance(response.json(), list):
        response_tuple = response.__bool__(), response.json()[1]
    elif 'detail' in response.json():
        response_tuple = response.__bool__(), response.json()['detail']
    else:
        response_tuple = response.__bool__(), response.text
    return response_tuple
def edit_user(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> SuccessTuple

Edit an existing user.

Expand source code
def edit_user(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """Edit an existing user."""
    import json
    from meerschaum.config.static import STATIC_CONFIG
    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/edit"
    data = {
        'username': user.username,
        'password': user.password,
        'type': user.type,
        'email': user.email,
        'attributes': json.dumps(user.attributes),
    }
    response = self.post(r_url, data=data, debug=debug)
    try:
        _json = json.loads(response.text)
        if isinstance(_json, dict) and 'detail' in _json:
            return False, _json['detail']
        success_tuple = tuple(_json)
    except Exception as e:
        msg = response.text if response else f"Failed to edit user '{user}'."
        return False, msg

    return tuple(success_tuple)
def fetch(self, pipe: Pipe, begin: Optional[datetime.datetime, str] = '', end: Optional[datetime.datetime] = None, params: Optional[Dict, Any] = None, debug: bool = False, **kw: Any) ‑> pandas.DataFrame

Get the Pipe data from the remote Pipe.

Expand source code
def fetch(
        self,
        pipe: meerschaum.Pipe,
        begin: Optional[datetime.datetime, str] = '',
        end: Optional[datetime.datetime] = None,
        params: Optional[Dict, Any] = None,
        debug: bool = False,
        **kw: Any
    ) -> pandas.DataFrame:
    """Get the Pipe data from the remote Pipe."""
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, error
    from meerschaum.config.static import _static_config
    from meerschaum.config._patch import apply_patch_to_config

    if 'fetch' not in pipe.parameters:
        warn(f"Missing 'fetch' parameters for Pipe '{pipe}'.", stack=False)
        return None

    instructions = pipe.parameters['fetch']

    if 'connector_keys' not in instructions:
        warn(f"Missing connector_keys in fetch parameters for Pipe '{pipe}'", stack=False)
        return None
    remote_connector_keys = instructions.get('connector_keys', None)
    if 'metric_key' not in instructions:
        warn(f"Missing metric_key in fetch parameters for Pipe '{pipe}'", stack=False)
        return None
    remote_metric_key = instructions.get('metric_key', None)
    remote_location_key = instructions.get('location_key', None)
    if begin is None:
        begin = pipe.sync_time

    _params = copy.deepcopy(params) if params is not None else {}
    _params = apply_patch_to_config(_params, instructions.get('params', {}))

    from meerschaum import Pipe
    p = Pipe(
        remote_connector_keys,
        remote_metric_key,
        remote_location_key,
        mrsm_instance = self
    )
    begin = (
        begin if not (isinstance(begin, str) and begin == '')
        else pipe.get_sync_time(debug=debug)
    )

    return p.get_data(
        begin=begin, end=end,
        params=_params,
        debug=debug
    )
def fetch_pipes_keys(self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) ‑> List[Tuple[str, str, Optional[str]]]

Fetch registered Pipes' keys from the API.

Parameters

connector_keys : Optional[List[str]], default None
The connector keys for the query.
metric_keys : Optional[List[str]], default None
The metric keys for the query.
location_keys : Optional[List[str]], default None
The location keys for the query.
tags : Optional[List[str]], default None
A list of tags for the query.
params : Optional[Dict[str, Any]], default None
A parameters dictionary for filtering against the pipes table (e.g. {'connector_keys': 'plugin:foo'}). Not recommeded to be used.
debug : bool, default False
Verbosity toggle.

Returns

A list of tuples containing pipes' keys.

Expand source code
def fetch_pipes_keys(
        self,
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False
    ) -> Union[List[Tuple[str, str, Union[str, None]]]]:
    """
    Fetch registered Pipes' keys from the API.
    
    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        The connector keys for the query.

    metric_keys: Optional[List[str]], default None
        The metric keys for the query.

    location_keys: Optional[List[str]], default None
        The location keys for the query.

    tags: Optional[List[str]], default None
        A list of tags for the query.

    params: Optional[Dict[str, Any]], default None
        A parameters dictionary for filtering against the `pipes` table
        (e.g. `{'connector_keys': 'plugin:foo'}`).
        Not recommeded to be used.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of tuples containing pipes' keys.
    """
    from meerschaum.utils.warnings import error
    from meerschaum.config.static import STATIC_CONFIG
    import json
    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    if tags is None:
        tags = []

    r_url = STATIC_CONFIG['api']['endpoints']['pipes'] + '/keys'
    try:
        j = self.get(
            r_url,
            params = {
                'connector_keys': json.dumps(connector_keys),
                'metric_keys': json.dumps(metric_keys),
                'location_keys': json.dumps(location_keys),
                'tags': json.dumps(tags),
                'params': json.dumps(params),
            },
            debug=debug
        ).json()
    except Exception as e:
        error(str(e))

    if 'detail' in j:
        error(j['detail'], stack=False)
    return [tuple(r) for r in j]
def get(self, r_url: str, headers: Optional[Dict[str, str]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> requests.Reponse

Wrapper for requests.get.

Expand source code
def get(
        self,
        r_url: str,
        headers: Optional[Dict[str, str]] = None,
        use_token: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> requests.Reponse:
    """Wrapper for `requests.get`."""
    if debug:
        from meerschaum.utils.debug import dprint

    if headers is None:
        headers = {}

    if use_token:
        if debug:
            dprint(f"Checking login token.")
        headers.update({'Authorization': f'Bearer {self.token}'})

    if debug:
        from meerschaum.utils.formatting import pprint
        dprint(f"Sending GET request to {self.url + r_url}.")

    return self.session.get(
        self.url + r_url,
        headers = headers,
        **kw
    )
def get_actions(self) ‑> list

Get available actions from the API server

Expand source code
def get_actions(
        self,
    ) -> list:
    """Get available actions from the API server"""
    from meerschaum.config.static import STATIC_CONFIG
    return self.get(STATIC_CONFIG['api']['endpoints']['actions'])
def get_backtrack_data(self, pipe: Pipe, begin: datetime.datetime, backtrack_minutes: int = 0, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw: Any) ‑> pandas.DataFrame

Get a Pipe's backtrack data from the API.

Expand source code
def get_backtrack_data(
        self,
        pipe: meerschaum.Pipe,
        begin: datetime.datetime,
        backtrack_minutes: int = 0,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw: Any,
    ) -> pandas.DataFrame:
    """Get a Pipe's backtrack data from the API."""
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    r_url = pipe_r_url(pipe)
    try:
        response = self.get(
            r_url + "/backtrack_data",
            params = {
                'begin': begin,
                'backtrack_minutes': backtrack_minutes,
                'params': json.dumps(params),
            },
            debug = debug
        )
    except Exception as e:
        warn(f"Failed to parse backtrack data JSON for {pipe}. Exception:\n" + str(e))
        return None
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.misc import parse_df_datetimes
    if debug:
        dprint(response.text)
    pd = import_pandas()
    try:
        df = pd.read_json(response.text)
    except Exception as e:
        warn(str(e))
        return None
    df = parse_df_datetimes(pd.read_json(response.text), debug=debug)
    return df
def get_chaining_status(self, **kw) ‑> Optional[bool]

Parameters

**kw :

Returns

type
 
Expand source code
def get_chaining_status(self, **kw) -> Optional[bool]:
    """

    Parameters
    ----------
    **kw :
        

    Returns
    -------
    type
        

    """
    from meerschaum.config.static import _static_config
    try:
        response = self.get(
            _static_config()['api']['endpoints']['chaining'],
            use_token = True,
            **kw
        )
        if not response:
            return None
    except Exception as e:
        return None

    return response.json()
def get_mrsm_version(self, **kw) ‑> Optional[str]

Parameters

**kw :

Returns

type
 
Expand source code
def get_mrsm_version(self, **kw) -> Optional[str]:
    """

    Parameters
    ----------
    **kw :
        

    Returns
    -------
    type
        

    """
    from meerschaum.config.static import _static_config
    try:
        j = self.get(
            _static_config()['api']['endpoints']['version'] + '/mrsm',
            use_token = True,
            **kw
        ).json()
    except Exception as e:
        return None
    if isinstance(j, dict) and 'detail' in j:
        return None
    return j
def get_pipe_attributes(self, pipe: Pipe, debug: bool = False) ‑> Dict[str, Any]

Get a Pipe's attributes from the API

Parameters

pipe : Pipe
The pipe whose attributes we are fetching.

Returns

A dictionary of a pipe's attributes. If the pipe does not exist, return an empty dictionary.

Expand source code
def get_pipe_attributes(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Dict[str, Any]:
    """Get a Pipe's attributes from the API

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose attributes we are fetching.
        
    Returns
    -------
    A dictionary of a pipe's attributes.
    If the pipe does not exist, return an empty dictionary.
    """
    r_url = pipe_r_url(pipe)
    response = self.get(r_url + '/attributes', debug=debug)
    import json
    try:
        return json.loads(response.text)
    except Exception as e:
        return {}
def get_pipe_columns_types(self, pipe: Pipe, debug: bool = False) ‑> Union[Dict[str, str], None]

Fetch the columns and types of the pipe's table.

Parameters

pipe : Pipe
The pipe whose columns to be queried.

Returns

A dictionary mapping column names to their database types.

Examples

>>> {
...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
...   'id': 'BIGINT',
...   'val': 'DOUBLE PRECISION',
... }
>>>
Expand source code
def get_pipe_columns_types(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Union[Dict[str, str], None]:
    """
    Fetch the columns and types of the pipe's table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose columns to be queried.
        
    Returns
    -------
    A dictionary mapping column names to their database types.

    Examples
    --------
    >>> {
    ...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
    ...   'id': 'BIGINT',
    ...   'val': 'DOUBLE PRECISION',
    ... }
    >>>
    """
    r_url = pipe_r_url(pipe) + '/columns/types'
    response = self.get(
        r_url,
        debug = debug
    )
    j = response.json()
    if isinstance(j, dict) and 'detail' in j and len(j.keys()) == 1:
        from meerschaum.utils.warnings import warn
        warn(j['detail'])
        return None
    if not isinstance(j, dict):
        warn(response.text)
        return None
    return j
def get_pipe_data(self, pipe: Pipe, begin: Union[str, datetime.datetime, int, None] = None, end: Union[str, datetime.datetime, int, None] = None, params: Optional[Dict[str, Any]] = None, as_chunks: bool = False, debug: bool = False, **kw: Any) ‑> Union[pandas.DataFrame, None]

Fetch data from the API.

Expand source code
def get_pipe_data(
        self,
        pipe: meerschaum.Pipe,
        begin: Union[str, datetime.datetime, int, None] = None,
        end: Union[str, datetime.datetime, int, None] = None,
        params: Optional[Dict[str, Any]] = None,
        as_chunks: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Union[pandas.DataFrame, None]:
    """Fetch data from the API."""
    import json
    from meerschaum.utils.warnings import warn
    r_url = pipe_r_url(pipe)
    chunks_list = []
    while True:
        try:
            response = self.get(
                r_url + "/data",
                params = {'begin': begin, 'end': end, 'params': json.dumps(params)},
                debug = debug
            )
            if not response.ok:
                return None
            j = response.json()
        except Exception as e:
            warn(str(e))
            return None
        if isinstance(j, dict) and 'detail' in j:
            return False, j['detail']
        break
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.misc import parse_df_datetimes
    pd = import_pandas()
    try:
        df = pd.read_json(response.text)
    except Exception as e:
        warn(str(e))
        return None
    df = parse_df_datetimes(df, debug=debug)
    return df
def get_pipe_id(self, pipe: meerschuam.Pipe, debug: bool = False) ‑> int

Get a Pipe's ID from the API.

Expand source code
def get_pipe_id(
        self,
        pipe: meerschuam.Pipe,
        debug: bool = False,
    ) -> int:
    """Get a Pipe's ID from the API."""
    from meerschaum.utils.debug import dprint
    r_url = pipe_r_url(pipe)
    response = self.get(
        r_url + '/id',
        debug = debug
    )
    if debug:
        dprint(f"Got pipe ID: {response.text}")
    try:
        return int(response.text)
    except Exception as e:
        return None
def get_pipe_rowcount(self, pipe: "'Pipe'", begin: "Optional['datetime.datetime']" = None, end: "Optional['datetime.datetime']" = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) ‑> int

Get a pipe's row count from the API.

Parameters

pipe : 'meerschaum.Pipe':
The pipe whose row count we are counting.
begin : Optional[datetime.datetime], default None
If provided, bound the count by this datetime.
end : Optional[datetime.datetime]
If provided, bound the count by this datetime.
params : Optional[Dict[str, Any]], default None
If provided, bound the count by these parameters.
remote : bool, default False
 

Returns

The number of rows in the pipe's table, bound the given parameters. If the table does not exist, return 0.

Expand source code
def get_pipe_rowcount(
        self,
        pipe: 'meerschaum.Pipe',
        begin: Optional['datetime.datetime'] = None,
        end: Optional['datetime.datetime'] = None,
        params: Optional[Dict[str, Any]] = None,
        remote: bool = False,
        debug: bool = False,
    ) -> int:
    """Get a pipe's row count from the API.

    Parameters
    ----------
    pipe: 'meerschaum.Pipe':
        The pipe whose row count we are counting.
        
    begin: Optional[datetime.datetime], default None
        If provided, bound the count by this datetime.

    end: Optional[datetime.datetime]
        If provided, bound the count by this datetime.

    params: Optional[Dict[str, Any]], default None
        If provided, bound the count by these parameters.

    remote: bool, default False

    Returns
    -------
    The number of rows in the pipe's table, bound the given parameters.
    If the table does not exist, return 0.
    """
    import json
    r_url = pipe_r_url(pipe)
    response = self.get(
        r_url + "/rowcount",
        json = params,
        params = {
            'begin' : begin,
            'end' : end,
            'remote' : remote,
        },
        debug = debug
    )
    try:
        return int(json.loads(response.text))
    except Exception as e:
        return 0
def get_plugin_attributes(self, plugin: meerschaum.core.Plugin, debug: bool = False) ‑> Mapping[str, Any]

Return a plugin's attributes.

Expand source code
def get_plugin_attributes(
        self,
        plugin: meerschaum.core.Plugin,
        debug: bool = False
    ) -> Mapping[str, Any]:
    """
    Return a plugin's attributes.
    """
    import json
    from meerschaum.utils.warnings import warn, error
    from meerschaum.config.static import _static_config
    r_url = plugin_r_url(plugin) + '/attributes'
    response = self.get(r_url, use_token=True, debug=debug)
    attributes = response.json()
    if isinstance(attributes, str) and attributes and attributes[0] == '{':
        try:
            attributes = json.loads(attributes)
        except Exception as e:
            pass
    if not isinstance(attributes, dict):
        error(response.text)
    elif not response and 'detail' in attributes:
        warn(attributes['detail'])
        return {}
    return attributes
def get_plugins(self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False) ‑> Sequence[str]

Return a list of registered plugin names.

Parameters

user_id :
If specified, return all plugins from a certain user.
user_id : Optional[int] :
(Default value = None)
search_term : Optional[str] :
(Default value = None)
debug : bool :
(Default value = False)

Returns

Expand source code
def get_plugins(
        self,
        user_id : Optional[int] = None,
        search_term : Optional[str] = None,
        debug : bool = False
    ) -> Sequence[str]:
    """Return a list of registered plugin names.

    Parameters
    ----------
    user_id :
        If specified, return all plugins from a certain user.
    user_id : Optional[int] :
         (Default value = None)
    search_term : Optional[str] :
         (Default value = None)
    debug : bool :
         (Default value = False)

    Returns
    -------

    """
    import json
    from meerschaum.utils.warnings import warn, error
    from meerschaum.config.static import _static_config
    response = self.get(
        _static_config()['api']['endpoints']['plugins'],
        params = {'user_id' : user_id, 'search_term' : search_term},
        use_token = True,
        debug = debug
    )
    if not response:
        return []
    plugins = json.loads(response.text)
    if not isinstance(plugins, list):
        error(response.text)
    return plugins
def get_sync_time(self, pipe: "'Pipe'", params: Optional[Dict[str, Any]] = None, newest: bool = True, round_down: bool = True, debug: bool = False) ‑> Union[datetime.datetime, int, None]

Get a Pipe's most recent datetime value from the API.

Parameters

pipe : Pipe
The pipe to select from.
params : Optional[Dict[str, Any]], default None
Optional params dictionary to build the WHERE clause.
newest : bool, default True
If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
round_down : bool, default True
If True, round the resulting datetime value down to the nearest minute.

Returns

The most recent (or oldest if newest is False) datetime of a pipe, rounded down to the closest minute.

Expand source code
def get_sync_time(
        self,
        pipe: 'meerschaum.Pipe',
        params: Optional[Dict[str, Any]] = None,
        newest: bool = True,
        round_down: bool = True,
        debug: bool = False,
    ) -> Union[datetime.datetime, int, None]:
    """Get a Pipe's most recent datetime value from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to select from.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the WHERE clause.

    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (ASC instead of DESC).

    round_down: bool, default True
        If `True`, round the resulting datetime value down to the nearest minute.

    Returns
    -------
    The most recent (or oldest if `newest` is `False`) datetime of a pipe,
    rounded down to the closest minute.
    """
    from meerschaum.utils.misc import is_int
    from meerschaum.utils.warnings import warn
    import datetime, json
    r_url = pipe_r_url(pipe)
    response = self.get(
        r_url + '/sync_time',
        json = params,
        params = {'newest': newest, 'debug': debug, 'round_down': round_down},
        debug = debug,
    )
    if not response:
        warn(response.text)
        return None
    j = response.json()
    if j is None:
        dt = None
    else:
        try:
            dt = (
                datetime.datetime.fromisoformat(j)
            ) if not is_int(j) else int(j)
        except Exception as e:
            warn(e)
            dt = None
    return dt
def get_user_attributes(self, user: "'meerschaum.core.User'", debug: bool = False, **kw) ‑> int

Get a user's attributes.

Expand source code
def get_user_attributes(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw
    ) -> int:
    """Get a user's attributes."""
    from meerschaum.config.static import _static_config
    import json
    r_url = f"{_static_config()['api']['endpoints']['users']}/{user.username}/attributes"
    response = self.get(r_url, debug=debug, **kw)
    try:
        attributes = json.loads(response.text)
    except Exception as e:
        attributes = None
    return attributes
def get_user_id(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> Optional[int]

Get a user's ID.

Expand source code
def get_user_id(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> Optional[int]:
    """Get a user's ID."""
    from meerschaum.config.static import _static_config
    import json
    r_url = f"{_static_config()['api']['endpoints']['users']}/{user.username}/id"
    response = self.get(r_url, debug=debug, **kw)
    try:
        user_id = int(json.loads(response.text))
    except Exception as e:
        user_id = None
    return user_id
def get_user_password_hash(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> Optional[str]

If configured, get a user's password hash.

Expand source code
def get_user_password_hash(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """If configured, get a user's password hash."""
    from meerschaum.config.static import _static_config
    r_url = _static_config()['api']['endpoints']['users'] + '/' + user.username + '/password_hash'
    response = self.get(r_url, debug=debug, **kw)
    if not response:
        return None
    return response.json()
def get_user_type(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> Optional[str]

If configured, get a user's type.

Expand source code
def get_user_type(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """If configured, get a user's type."""
    from meerschaum.config.static import _static_config
    r_url = _static_config()['api']['endpoints']['users'] + '/' + user.username + '/type'
    response = self.get(r_url, debug=debug, **kw)
    if not response:
        return None
    return response.json()
def get_users(self, debug: bool = False, **kw: Any) ‑> List[str]

Return a list of registered usernames.

Expand source code
def get_users(
        self,
        debug: bool = False,
        **kw : Any
    ) -> List[str]:
    """
    Return a list of registered usernames.
    """
    from meerschaum.config.static import _static_config
    import json
    response = self.get(
        f"{_static_config()['api']['endpoints']['users']}",
        debug = debug,
        use_token = True,
    )
    if not response:
        return []
    try:
        return response.json()
    except Exception as e:
        return []
def install_plugin(self, name: str, force: bool = False, debug: bool = False) ‑> Tuple[bool, str]

Download and attempt to install a plugin from the API.

Expand source code
def install_plugin(
        self,
        name: str,
        force: bool = False,
        debug: bool = False
    ) -> SuccessTuple:
    """Download and attempt to install a plugin from the API."""
    import os, pathlib, json
    from meerschaum.core import Plugin
    from meerschaum.config._paths import PLUGINS_TEMP_RESOURCES_PATH
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    binaryornot_check = attempt_import('binaryornot.check', lazy=False)
    r_url = plugin_r_url(name)
    dest = pathlib.Path(os.path.join(PLUGINS_TEMP_RESOURCES_PATH, name + '.tar.gz'))
    if debug:
        dprint(f"Fetching from '{self.url + r_url}' to '{dest}'...")
    archive_path = self.wget(r_url, dest, debug=debug) 
    is_binary = binaryornot_check.is_binary(str(archive_path))
    if not is_binary:
        fail_msg = f"Failed to download binary for plugin '{name}'."
        try:
            with open(archive_path, 'r') as f:
                j = json.load(f)
            if isinstance(j, list):
                success, msg = tuple(j)
            elif isinstance(j, dict) and 'detail' in j:
                success, msg = False, fail_msg
        except Exception as e:
            success, msg = False, fail_msg
        return success, msg
    plugin = Plugin(name, archive_path=archive_path, repo_connector=self)
    return plugin.install(force=force, debug=debug)
def login(self, debug: bool = False, warn: bool = True, **kw: Any) ‑> Tuple[bool, str]

Log in and set the session token.

Expand source code
def login(
        self,
        debug: bool = False,
        warn: bool = True,
        **kw: Any
    ) -> SuccessTuple:
    """Log in and set the session token."""
    from meerschaum.utils.warnings import warn as _warn, info, error
    from meerschaum.core import User
    from meerschaum.config.static import _static_config
    import json, datetime
    try:
        login_data = {
            'username': self.username,
            'password': self.password,
        }
    except AttributeError:
        return False, f"Please login with the command `login {self}`."
    response = self.post(
        _static_config()['api']['endpoints']['login'],
        data = login_data,
        use_token = False,
        debug = debug
    )
    if response:
        msg = f"Successfully logged into '{self}' as user '{login_data['username']}'."
        self._token = json.loads(response.text)['access_token']
        self._expires = datetime.datetime.strptime(
            json.loads(response.text)['expires'], 
            '%Y-%m-%dT%H:%M:%S.%f'
        )
    else:
        msg = (
            f"Failed to log into '{self}' as user '{login_data['username']}'.\n" +
            f"    Please verify login details for connector '{self}'."
        )
        if warn:
            _warn(msg, stack=False)

    return response.__bool__(), msg
def patch(self, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> requests.Response

Wrapper for requests.patch.

Expand source code
def patch(
        self,
        r_url: str,
        headers: Optional[Dict[str, Any]] = None,
        use_token: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> requests.Response:
    """Wrapper for `requests.patch`."""
    if debug:
        from meerschaum.utils.debug import dprint

    if headers is None:
        headers = {}

    if use_token:
        if debug:
            dprint(f"Checking login token.")
        headers.update({ 'Authorization': f'Bearer {self.token}' })

    if debug:
        from meerschaum.utils.formatting import pprint
        dprint(f"Sending PATCH request to {self.url + r_url}")

    return self.session.patch(
        self.url + r_url,
        headers = headers,
        **kw
    )
def pipe_exists(self, pipe: "'Pipe'", debug: bool = False) ‑> bool

Check the API to see if a Pipe exists.

Parameters

pipe : 'meerschaum.Pipe'
The pipe which were are querying.

Returns

A bool indicating whether a pipe's underlying table exists.

Expand source code
def pipe_exists(
        self,
        pipe: 'meerschaum.Pipe',
        debug: bool = False
    ) -> bool:
    """Check the API to see if a Pipe exists.

    Parameters
    ----------
    pipe: 'meerschaum.Pipe'
        The pipe which were are querying.
        
    Returns
    -------
    A bool indicating whether a pipe's underlying table exists.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    r_url = pipe_r_url(pipe)
    response = self.get(r_url + '/exists', debug=debug)
    if debug:
        dprint("Received response: " + str(response.text))
    j = response.json()
    if isinstance(j, dict) and 'detail' in j:
        warn(j['detail'])
    return j
def post(self, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> requests.Response

Wrapper for requests.post.

Expand source code
def post(
        self,
        r_url: str,
        headers: Optional[Dict[str, Any]] = None,
        use_token: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> requests.Response:
    """Wrapper for `requests.post`."""
    if debug:
        from meerschaum.utils.debug import dprint

    if headers is None:
        headers = {}

    if use_token:
        if debug:
            dprint(f"Checking token...")
        headers.update({'Authorization': f'Bearer {self.token}'})

    if debug:
        from meerschaum.utils.formatting import pprint
        dprint(f"Sending POST request to {self.url + r_url}")

    return self.session.post(
        self.url + r_url,
        headers = headers,
        **kw
    )
def register_pipe(self, pipe: Pipe, debug: bool = False) ‑> SuccessTuple

Submit a POST to the API to register a new Pipe object. Returns a tuple of (success_bool, response_dict).

Expand source code
def register_pipe(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False
    ) -> SuccessTuple:
    """Submit a POST to the API to register a new Pipe object.
    Returns a tuple of (success_bool, response_dict).
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.config.static import STATIC_CONFIG
    ### NOTE: if `parameters` is supplied in the Pipe constructor,
    ###       then `pipe.parameters` will exist and not be fetched from the database.
    r_url = pipe_r_url(pipe)
    response = self.post(
        r_url + '/register',
        json = pipe.parameters,
        debug = debug,
    )
    if debug:
        dprint(response.text)
    if isinstance(response.json(), list):
        response_tuple = response.__bool__(), response.json()[1]
    elif 'detail' in response.json():
        response_tuple = response.__bool__(), response.json()['detail']
    else:
        response_tuple = response.__bool__(), response.text
    return response_tuple
def register_plugin(self, plugin: meerschaum.core.Plugin, make_archive: bool = True, debug: bool = False) ‑> SuccessTuple

Register a plugin and upload its archive.

Expand source code
def register_plugin(
        self,
        plugin: meerschaum.core.Plugin,
        make_archive: bool = True,
        debug: bool = False,
    ) -> SuccessTuple:
    """Register a plugin and upload its archive."""
    import json
    archive_path = plugin.make_tar(debug=debug) if make_archive else plugin.archive_path
    file_pointer = open(archive_path, 'rb')
    files = {'archive': file_pointer}
    metadata = {
        'version': plugin.version,
        'attributes': json.dumps(plugin.attributes),
    }
    r_url = plugin_r_url(plugin)
    try:
        response = self.post(r_url, files=files, params=metadata, debug=debug)
    except Exception as e:
        return False, f"Failed to register plugin '{plugin}'."
    finally:
        file_pointer.close()

    try:
        success, msg = json.loads(response.text)
    except Exception as e:
        return False, response.text

    return success, msg
def register_user(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> SuccessTuple

Register a new user.

Expand source code
def register_user(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """Register a new user."""
    import json
    from meerschaum.config.static import _static_config
    r_url = f"{_static_config()['api']['endpoints']['users']}/register"
    data = {
        'username': user.username,
        'password': user.password,
        'attributes': json.dumps(user.attributes),
    }
    if user.type:
        data['type'] = user.type
    if user.email:
        data['email'] = user.email
    response = self.post(r_url, data=data, debug=debug)
    try:
        _json = json.loads(response.text)
        if isinstance(_json, dict) and 'detail' in _json:
            return False, _json['detail']
        success_tuple = tuple(_json)
    except Exception:
        msg = response.text if response else f"Failed to register user '{user}'."
        return False, msg

    return tuple(success_tuple)
def sync_pipe(self, pipe: Optional[Pipe] = None, df: Optional[Union[pandas.DataFrame, Dict[Any, Any], str]] = None, chunksize: Optional[int] = -1, debug: bool = False, **kw: Any) ‑> SuccessTuple

Append a pandas DataFrame to a Pipe. If Pipe does not exist, it is registered with supplied metadata.

Expand source code
def sync_pipe(
        self,
        pipe: Optional[meerschaum.Pipe] = None,
        df: Optional[Union[pandas.DataFrame, Dict[Any, Any], str]] = None,
        chunksize: Optional[int] = -1,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """Append a pandas DataFrame to a Pipe.
    If Pipe does not exist, it is registered with supplied metadata.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.misc import json_serialize_datetime
    from meerschaum.config import get_config
    from meerschaum.utils.packages import attempt_import
    import json, time
    begin = time.time()
    more_itertools = attempt_import('more_itertools')
    if df is None:
        msg = f"DataFrame is `None`. Cannot sync {pipe}."
        return False, msg

    def get_json_str(c):
        ### allow syncing dict or JSON without needing to import pandas (for IOT devices)
        return (
            json.dumps(c, default=json_serialize_datetime) if isinstance(c, (dict, list))
            else c.to_json(date_format='iso', date_unit='ns')
        )

    df = json.loads(df) if isinstance(df, str) else df

    ### TODO Make separate chunksize for API?
    _chunksize : Optional[int] = (1 if chunksize is None else (
        get_config('system', 'connectors', 'sql', 'chunksize') if chunksize == -1
        else chunksize
    ))
    keys : list = list(df.keys())
    chunks = []
    if hasattr(df, 'index'):
        rowcount = len(df)
        chunks = [df.iloc[i] for i in more_itertools.chunked(df.index, _chunksize)]
    elif isinstance(df, dict):
        ### `_chunks` is a dict of lists of dicts.
        ### e.g. {'a' : [ {'a':[1, 2]}, {'a':[3, 4]} ] }
        _chunks = {k: [] for k in keys}
        rowcount = len(df[keys[0]])
        for k in keys:
            if len(df[k]) != rowcount:
                return False, "Arrays must all be the same length."
            chunk_iter = more_itertools.chunked(df[k], _chunksize)
            for l in chunk_iter:
                _chunks[k].append({k: l})

        ### `chunks` is a list of dicts (e.g. orient by rows in pandas JSON).
        for k, l in _chunks.items():
            for i, c in enumerate(l):
                try:
                    chunks[i].update(c)
                except IndexError:
                    chunks.append(c)
    elif isinstance(df, list):
        chunks = [df[i] for i in more_itertools.chunked(df, _chunksize)]
        rowcount = len(chunks)

    ### Send columns in case the user has defined them locally.
    if pipe.columns:
        kw['columns'] = json.dumps(pipe.columns)
    r_url = pipe_r_url(pipe) + '/data'

    for i, c in enumerate(chunks):
        if debug:
            dprint(f"Posting chunk ({i + 1} / {len(chunks)}) to {r_url}...")
        json_str = get_json_str(c)

        try:
            response = self.post(
                r_url,
                ### handles check_existing
                params = kw,
                data = json_str,
                debug = debug
            )
        except Exception as e:
            warn(str(e))
            return False, str(e)
            
        if not response:
            return False, f"Failed to receive response. Response text: {response.text}"

        try:
            j = json.loads(response.text)
        except Exception as e:
            return False, str(e)

        if isinstance(j, dict) and 'detail' in j:
            return False, j['detail']

        try:
            j = tuple(j)
        except Exception as e:
            return False, response.text

        if debug:
            dprint("Received response: " + str(j))
        if not j[0]:
            return j

    len_chunks = len(chunks)

    success_tuple = True, (
        f"It took {round(time.time() - begin, 2)} seconds to sync {rowcount} row"
        + ('s' if rowcount != 1 else '')
        + f" across {len_chunks} chunk" + ('s' if len_chunks != 1 else '') +
        f" to {pipe}."
    )
    return success_tuple
def test_connection(self, **kw: Any) ‑> Optional[bool]

Test if a successful connection to the API may be made.

Expand source code
def test_connection(
        self,
        **kw: Any
    ) -> Union[bool, None]:
    """Test if a successful connection to the API may be made."""
    from meerschaum.connectors.poll import retry_connect
    _default_kw = {
        'max_retries': 1, 'retry_wait': 0, 'warn': False,
        'connector': self, 'enforce_chaining': False,
        'enforce_login': False,
    }
    _default_kw.update(kw)
    try:
        return retry_connect(**_default_kw)
    except Exception as e:
        return False
def wget(self, r_url: str, dest: Optional[Union[str, pathlib.Path]] = None, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> pathlib.Path

Mimic wget with requests.

Expand source code
def wget(
        self,
        r_url: str,
        dest: Optional[Union[str, pathlib.Path]] = None,
        headers: Optional[Dict[str, Any]] = None,
        use_token: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> pathlib.Path:
    """Mimic wget with requests.
    """
    from meerschaum.utils.misc import wget
    from meerschaum.utils.debug import dprint
    if headers is None:
        headers = {}
    if use_token:
        if debug:
            dprint(f"Checking login token.")
        headers.update({'Authorization': f'Bearer {self.token}'})
    return wget(self.url + r_url, dest=dest, headers=headers, **kw)