Module meerschaum.connectors.api.APIConnector

Interact with Meerschaum APIs. May be chained together (see 'meerschaum:api_instance' in your config.yaml).

Expand source code
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Interact with Meerschaum APIs. May be chained together (see 'meerschaum:api_instance' in your config.yaml).
"""

import datetime
from meerschaum.utils.typing import Optional
from meerschaum.connectors import Connector
from meerschaum.utils.warnings import warn, error
from meerschaum.utils.packages import attempt_import

### Attributes which must be present on an `APIConnector` that is not built from a URI;
### `APIConnector.__init__()` passes this set to `verify_attributes()`.
required_attributes = {
    'host',
}

class APIConnector(Connector):
    """
    Connect to a Meerschaum API instance.

    The connector is configured either from explicit attributes
    (`host` is required; `protocol` defaults to `'http'` and `port` to `8000`)
    or from a single `uri` keyword argument.
    """

    IS_INSTANCE: bool = True
    IS_THREAD_SAFE: bool = False

    ### The connector's methods are implemented in sibling modules
    ### and attached to the class through these class-level imports.
    from ._delete import delete
    from ._post import post
    from ._patch import patch
    from ._get import get, wget
    from ._actions import get_actions, do_action
    from ._misc import get_mrsm_version, get_chaining_status
    from ._pipes import (
        register_pipe,
        fetch_pipes_keys,
        edit_pipe,
        sync_pipe,
        delete_pipe,
        get_pipe_data,
        get_backtrack_data,
        get_pipe_id,
        get_pipe_attributes,
        get_sync_time,
        pipe_exists,
        create_metadata,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        get_pipe_columns_types,
    )
    from ._fetch import fetch
    from ._plugins import (
        register_plugin,
        install_plugin,
        delete_plugin,
        get_plugins,
        get_plugin_attributes,
    )
    from ._login import login, test_connection
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri

    def __init__(
        self,
        label: Optional[str] = None,
        wait: bool = False,
        debug: bool = False,
        **kw
    ):
        """
        Build the connector and derive its base URL.

        Parameters
        ----------
        label: Optional[str], default None
            The label for the connector.
            If omitted and `uri` is given, the label produced by `from_uri()` is used.

        wait: bool, default False
            Accepted but not referenced in this constructor.

        debug: bool, default False
            Accepted but not referenced in this constructor.

        **kw
            Connector attributes (e.g. `host`, `port`, `protocol`, `uri`),
            forwarded to `Connector.__init__()`.
        """
        if 'uri' in kw:
            ### Expand the URI into individual attributes before calling the base constructor.
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            from_uri_params.pop('label', None)
            kw.update(from_uri_params)

        super().__init__('api', label=label, **kw)
        if 'protocol' not in self.__dict__:
            self.protocol = 'http'
        if 'port' not in self.__dict__:
            self.port = 8000
        if 'uri' not in self.__dict__:
            self.verify_attributes(required_attributes)
        else:
            ### A URI-based connector must at least yield a host.
            from meerschaum.connectors.sql import SQLConnector
            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
            if 'host' not in conn_attrs:
                raise Exception(f"Invalid URI for '{self}'.")
            self.__dict__.update(conn_attrs)
        self.url = (
            self.protocol + '://' +
            self.host + ':' +
            str(self.port)
        )
        ### Login state and the HTTP session are created lazily
        ### (see the `token` and `session` properties).
        self._token = None
        self._expires = None
        self._session = None


    @property
    def URI(self) -> str:
        """
        Return the fully qualified URI.
        Credentials are included only when both a username and a password are set.
        """
        username = self.__dict__.get('username', None)
        password = self.__dict__.get('password', None)
        creds = (username + ':' + password + '@') if username and password else ''

        return (
            self.protocol + '://' + creds
            + self.host + ':' + str(self.port)
        )


    @property
    def session(self):
        """
        Return the cached `requests.Session`, creating it on first access.
        """
        if self._session is None:
            requests = attempt_import('requests')
            if requests:
                self._session = requests.Session()
            if self._session is None:
                error("Failed to import requests. Is requests installed?")
        return self._session

    @property
    def token(self):
        """
        Return the login token, calling `login()` first when there is no token
        or when the token expires within the next minute.
        A failed login emits a warning.
        """
        ### `datetime.utcnow()` is deprecated as of Python 3.12.
        ### Build the same naive UTC timestamp so the comparison with `_expires` is unchanged.
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        expired = (
            self._expires is None
            or self._expires < now + datetime.timedelta(minutes=1)
        )

        if self._token is None or expired:
            success, msg = self.login()
            if not success:
                warn(msg, stack=False)
        return self._token

Classes

class APIConnector (label: Optional[str] = None, wait: bool = False, debug: bool = False, **kw)

Connect to a Meerschaum API instance.

Parameters

type : str
The type of the connector (e.g. sql, api, plugin).
label : str
The label for the connector.

Run `mrsm edit config` to edit connectors in the YAML file:

meerschaum:
    connections:
        {type}:
            {label}:
                ### attributes go here
Expand source code
class APIConnector(Connector):
    """
    Connect to a Meerschaum API instance.

    The connector is configured either from explicit attributes
    (`host` is required; `protocol` defaults to `'http'` and `port` to `8000`)
    or from a single `uri` keyword argument.
    """

    IS_INSTANCE: bool = True
    IS_THREAD_SAFE: bool = False

    ### The connector's methods are implemented in sibling modules
    ### and attached to the class through these class-level imports.
    from ._delete import delete
    from ._post import post
    from ._patch import patch
    from ._get import get, wget
    from ._actions import get_actions, do_action
    from ._misc import get_mrsm_version, get_chaining_status
    from ._pipes import (
        register_pipe,
        fetch_pipes_keys,
        edit_pipe,
        sync_pipe,
        delete_pipe,
        get_pipe_data,
        get_backtrack_data,
        get_pipe_id,
        get_pipe_attributes,
        get_sync_time,
        pipe_exists,
        create_metadata,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        get_pipe_columns_types,
    )
    from ._fetch import fetch
    from ._plugins import (
        register_plugin,
        install_plugin,
        delete_plugin,
        get_plugins,
        get_plugin_attributes,
    )
    from ._login import login, test_connection
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri

    def __init__(
        self,
        label: Optional[str] = None,
        wait: bool = False,
        debug: bool = False,
        **kw
    ):
        """
        Build the connector and derive its base URL.

        Parameters
        ----------
        label: Optional[str], default None
            The label for the connector.
            If omitted and `uri` is given, the label produced by `from_uri()` is used.

        wait: bool, default False
            Accepted but not referenced in this constructor.

        debug: bool, default False
            Accepted but not referenced in this constructor.

        **kw
            Connector attributes (e.g. `host`, `port`, `protocol`, `uri`),
            forwarded to `Connector.__init__()`.
        """
        if 'uri' in kw:
            ### Expand the URI into individual attributes before calling the base constructor.
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            from_uri_params.pop('label', None)
            kw.update(from_uri_params)

        super().__init__('api', label=label, **kw)
        if 'protocol' not in self.__dict__:
            self.protocol = 'http'
        if 'port' not in self.__dict__:
            self.port = 8000
        if 'uri' not in self.__dict__:
            self.verify_attributes(required_attributes)
        else:
            ### A URI-based connector must at least yield a host.
            from meerschaum.connectors.sql import SQLConnector
            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
            if 'host' not in conn_attrs:
                raise Exception(f"Invalid URI for '{self}'.")
            self.__dict__.update(conn_attrs)
        self.url = (
            self.protocol + '://' +
            self.host + ':' +
            str(self.port)
        )
        ### Login state and the HTTP session are created lazily
        ### (see the `token` and `session` properties).
        self._token = None
        self._expires = None
        self._session = None


    @property
    def URI(self) -> str:
        """
        Return the fully qualified URI.
        Credentials are included only when both a username and a password are set.
        """
        username = self.__dict__.get('username', None)
        password = self.__dict__.get('password', None)
        creds = (username + ':' + password + '@') if username and password else ''

        return (
            self.protocol + '://' + creds
            + self.host + ':' + str(self.port)
        )


    @property
    def session(self):
        """
        Return the cached `requests.Session`, creating it on first access.
        """
        if self._session is None:
            requests = attempt_import('requests')
            if requests:
                self._session = requests.Session()
            if self._session is None:
                error("Failed to import requests. Is requests installed?")
        return self._session

    @property
    def token(self):
        """
        Return the login token, calling `login()` first when there is no token
        or when the token expires within the next minute.
        A failed login emits a warning.
        """
        ### `datetime.utcnow()` is deprecated as of Python 3.12.
        ### Build the same naive UTC timestamp so the comparison with `_expires` is unchanged.
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        expired = (
            self._expires is None
            or self._expires < now + datetime.timedelta(minutes=1)
        )

        if self._token is None or expired:
            success, msg = self.login()
            if not success:
                warn(msg, stack=False)
        return self._token

Ancestors

  • meerschaum.connectors.Connector.Connector

Class variables

var IS_INSTANCE : bool
var IS_THREAD_SAFE : bool

Static methods

def from_uri(uri: str, label: Optional[str] = None, as_dict: bool = False) ‑> Union[APIConnector, Dict[str, Union[str, int]]]

Create a new APIConnector from a URI string.

Parameters

uri : str
The URI connection string.
label : Optional[str], default None
If provided, use this as the connector label. Otherwise build the label from the URI as `username@host` (lowercased), or just the host when the URI has no username.
as_dict : bool, default False
If True, return a dictionary of the keyword arguments necessary to create a new APIConnector, otherwise create a new object.

Returns

A new APIConnector object or a dictionary of attributes (if as_dict is True).

Expand source code
@classmethod
def from_uri(
        cls,
        uri: str,
        label: Optional[str] = None,
        as_dict: bool = False,
    ) -> Union[
        'meerschaum.connectors.APIConnector',
        Dict[str, Union[str, int]],
    ]:
    """
    Create a new APIConnector from a URI string.

    Parameters
    ----------
    uri: str
        The URI connection string.

    label: Optional[str], default None
        If provided, use this as the connector label.
        Otherwise the label is `username@host` (lowercased),
        or just the host when the URI carries no username.

    as_dict: bool, default False
        If `True`, return the dictionary of keyword arguments
        needed to create a new `APIConnector` instead of creating the object.

    Returns
    -------
    A new APIConnector object or a dictionary of attributes (if `as_dict` is `True`).
    """
    from meerschaum.connectors.sql import SQLConnector

    attrs = SQLConnector.parse_uri(uri)
    if 'host' not in attrs:
        error("No host was found in the provided URI.")

    ### The parsed `flavor` is stored as this connector's protocol.
    attrs['protocol'] = attrs.pop('flavor')

    if label:
        attrs['label'] = label
    else:
        user_prefix = (attrs['username'] + '@') if 'username' in attrs else ''
        attrs['label'] = (user_prefix + attrs['host']).lower()

    if as_dict:
        return attrs
    return cls(**attrs)

Instance variables

var URI : str

Return the fully qualified URI.

Expand source code
@property
def URI(self) -> str:
    """
    Return the fully qualified URI (`protocol://[username:password@]host:port`).

    The credentials segment is included only when both
    a username and a password are set on the connector.
    """
    attrs = self.__dict__
    user = attrs.get('username', None)
    secret = attrs.get('password', None)
    auth = ''.join((user, ':', secret, '@')) if (user and secret) else ''
    return ''.join((self.protocol, '://', auth, self.host, ':', str(self.port)))
var session
Expand source code
@property
def session(self):
    """
    Return the cached `requests.Session`, creating it on first access.
    `error()` is called when `requests` cannot be imported.
    """
    if self._session is not None:
        return self._session

    requests_module = attempt_import('requests')
    if requests_module:
        self._session = requests_module.Session()
    if self._session is None:
        error("Failed to import requests. Is requests installed?")
    return self._session
var token
Expand source code
@property
def token(self):
    """
    Return the login token, calling `login()` first when there is no token
    or when the token expires within the next minute.
    A failed login emits a warning.
    """
    ### `datetime.utcnow()` is deprecated as of Python 3.12.
    ### Build the same naive UTC timestamp so the comparison with `_expires` is unchanged.
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    expired = (
        self._expires is None
        or self._expires < now + datetime.timedelta(minutes=1)
    )

    if self._token is None or expired:
        success, msg = self.login()
        if not success:
            warn(msg, stack=False)
    return self._token

Methods

def clear_pipe(self, pipe: Pipe, debug: bool = False, **kw) ‑> SuccessTuple

Delete rows in a pipe's table.

Parameters

pipe : Pipe
The pipe with rows to be deleted.

Returns

A success tuple.

Expand source code
def clear_pipe(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Delete rows in a pipe's table by running the remote `clear pipes` action.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe with rows to be deleted.

    debug: bool, default False
        Passed through to `do_action()`.

    **kw
        Additional keyword arguments forwarded to `do_action()`.

    Returns
    -------
    A success tuple.
    """
    ### These values are supplied explicitly below, so discard any caller-provided ones.
    for overridden_key in (
        'metric_keys', 'connector_keys', 'location_keys', 'action', 'force',
    ):
        kw.pop(overridden_key, None)

    return self.do_action(
        ['clear', 'pipes'],
        connector_keys=pipe.connector_keys,
        metric_keys=pipe.metric_key,
        location_keys=pipe.location_key,
        force=True,
        debug=debug,
        **kw
    )
def create_metadata(self, debug: bool = False) ‑> bool

Create metadata tables.

Returns

A bool indicating success.

Expand source code
def create_metadata(
        self,
        debug: bool = False
    ) -> bool:
    """Create metadata tables.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating success.
    `False` is returned when the response is not valid JSON or carries a `detail` error.
    """
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.config.static import STATIC_CONFIG
    r_url = STATIC_CONFIG['api']['endpoints']['metadata']
    response = self.post(r_url, debug=debug)
    if debug:
        dprint(f"Create metadata response: {response.text}")
    try:
        metadata_response = json.loads(response.text)
    except Exception as e:
        warn(f"Failed to create metadata on {self}:\n{e}")
        return False

    ### Error payloads elsewhere in this connector are dictionaries with a `detail` key.
    if isinstance(metadata_response, dict) and 'detail' in metadata_response:
        return False
    ### NOTE(review): a list is presumably a success tuple (as in `do_action()`),
    ### so its first element is used as the flag; confirm against the endpoint.
    if isinstance(metadata_response, (list, tuple)):
        return bool(metadata_response) and bool(metadata_response[0])
    return bool(metadata_response)
def delete(self, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> requests.Response

Wrapper for requests.delete.

Expand source code
def delete(
        self,
        r_url: str,
        headers: Optional[Dict[str, Any]] = None,
        use_token: bool = True,
        debug: bool = False,
        **kw: Any,
    ) -> 'requests.Response':
    """
    Wrapper for `requests.delete`.

    Parameters
    ----------
    r_url: str
        The path appended to the connector's base `url`.

    headers: Optional[Dict[str, Any]], default None
        Extra request headers. The caller's dictionary is not modified.

    use_token: bool, default True
        If `True`, add an `Authorization: Bearer <token>` header built from `self.token`.

    debug: bool, default False
        Verbosity toggle.

    **kw: Any
        Additional keyword arguments passed to the session's `delete()`.

    Returns
    -------
    The response returned by the session's `delete()`.
    """
    if debug:
        from meerschaum.utils.debug import dprint

    ### Work on a copy so the bearer token never leaks into the caller's dictionary.
    headers = dict(headers) if headers is not None else {}

    if use_token:
        if debug:
            dprint("Checking token...")
        headers['Authorization'] = f'Bearer {self.token}'

    if debug:
        dprint(f"Sending DELETE request to {self.url + r_url}.")

    return self.session.delete(
        self.url + r_url,
        headers=headers,
        **kw
    )
def delete_pipe(self, pipe: Optional[Pipe] = None, debug: bool = None) ‑> SuccessTuple

Delete a Pipe and drop its table.

Expand source code
def delete_pipe(
        self,
        pipe: Optional[meerschaum.Pipe] = None,
        debug: bool = None,
    ) -> SuccessTuple:
    """
    Delete a Pipe and drop its table.

    Parameters
    ----------
    pipe: Optional[meerschaum.Pipe], default None
        The pipe to be deleted. Must not be `None`.

    debug: bool, default None
        Verbosity toggle.

    Returns
    -------
    A success tuple: the truthiness of the response and a message
    taken from the JSON body (or the raw text as a fallback).
    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint
    if pipe is None:
        error("Pipe cannot be None.")
    r_url = pipe_r_url(pipe)
    response = self.delete(
        r_url + '/delete',
        debug=debug,
    )
    if debug:
        dprint(response.text)

    ### Parse the body once and tolerate non-JSON responses, as `drop_pipe()` does.
    try:
        data = response.json()
    except Exception:
        return False, f"Failed to delete {pipe}."

    succeeded = bool(response)
    if isinstance(data, list) and len(data) > 1:
        return succeeded, data[1]
    if isinstance(data, dict) and 'detail' in data:
        return succeeded, data['detail']
    return succeeded, response.text
def delete_plugin(self, plugin: meerschaum.core.Plugin, debug: bool = False) ‑> SuccessTuple

Delete a plugin from an API repository.

Expand source code
def delete_plugin(
        self,
        plugin: meerschaum.core.Plugin,
        debug: bool = False
    ) -> SuccessTuple:
    """
    Delete a plugin from an API repository.

    Returns
    -------
    The success tuple decoded from the response,
    or `(False, message)` when the request or the decoding fails.
    """
    import json
    endpoint = plugin_r_url(plugin)

    try:
        response = self.delete(endpoint, debug=debug)
    except Exception:
        return False, f"Failed to delete plugin '{plugin}'."

    try:
        succeeded, message = json.loads(response.text)
    except Exception:
        ### Not a two-item JSON payload, so hand back the raw text.
        return False, response.text
    return succeeded, message
def delete_user(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> SuccessTuple

Delete a user.

Expand source code
def delete_user(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Delete a user.

    Returns
    -------
    The success tuple decoded from the response.
    A `detail` payload or an undecodable response yields `(False, message)`.
    """
    from meerschaum.config.static import STATIC_CONFIG
    import json
    response = self.delete(
        f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}",
        debug=debug,
    )
    try:
        payload = json.loads(response.text)
        if isinstance(payload, dict) and 'detail' in payload:
            return False, payload['detail']
        return tuple(payload)
    except Exception:
        return False, f"Failed to delete user '{user.username}'."
def do_action(self, action: Optional[List[str]] = None, sysargs: Optional[List[str]] = None, debug: bool = False, **kw) ‑> Tuple[bool, str]

Execute a Meerschaum action remotely.

If sysargs is provided, parse those instead. Otherwise infer everything from keyword arguments.

NOTE: The first index of action should NOT be removed! Example: action = ['show', 'config']

Returns: tuple (succeeded : bool, message : str)

Parameters

action : Optional[List[str]] :
(Default value = None)
sysargs : Optional[List[str]] :
(Default value = None)
debug : bool :
(Default value = False)

**kw :

Returns

Expand source code
def do_action(
        self,
        action: Optional[List[str]] = None,
        sysargs: Optional[List[str]] = None,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """Execute a Meerschaum action remotely.

    If `sysargs` is provided (and `action` begins with an empty string), parse those instead.
    Otherwise infer everything from keyword arguments.

    NOTE: The first index of `action` should NOT be removed!
    Example: action = ['show', 'config']

    Parameters
    ----------
    action: Optional[List[str]], default None
        The action to execute, root action first. The list is not modified.

    sysargs: Optional[List[str]], default None
        Command-line arguments to parse instead of the keyword arguments.

    debug: bool, default False
        Verbosity toggle.

    **kw
        Keyword arguments sent to the API as the JSON body.

    Returns
    -------
    A success tuple (succeeded : bool, message : str).
    """
    import sys, json
    from meerschaum.utils.debug import dprint
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.misc import json_serialize_datetime
    if action is None:
        action = []

    if sysargs is not None and action and action[0] == '':
        from meerschaum._internal.arguments import parse_arguments
        if debug:
            dprint(f"Parsing sysargs:\n{sysargs}")
        json_dict = parse_arguments(sysargs)
    else:
        json_dict = kw
        json_dict['debug'] = debug
        json_dict['action'] = action

    ### Split off the root action on a copy so the caller's list is left untouched.
    action_parts = list(json_dict.get('action', None) or [])
    if not action_parts:
        return False, "No action was provided."
    root_action = action_parts[0]
    json_dict['action'] = action_parts[1:]
    r_url = f"{STATIC_CONFIG['api']['endpoints']['actions']}/{root_action}"

    if debug:
        from meerschaum.utils.formatting import pprint
        dprint(f"Sending data to '{self.url + r_url}':")
        pprint(json_dict, stream=sys.stderr)

    response = self.post(
        r_url,
        data=json.dumps(json_dict, default=json_serialize_datetime),
        debug=debug,
    )
    try:
        response_list = json.loads(response.text)
        if isinstance(response_list, dict) and 'detail' in response_list:
            return False, response_list['detail']
    except Exception as e:
        print(f"Invalid response: {response}")
        print(e)
        return False, response.text
    if debug:
        dprint(response)
    try:
        return response_list[0], response_list[1]
    except Exception:
        return False, f"Failed to parse result from action '{root_action}'"
def drop_pipe(self, pipe: Pipe, debug: bool = False) ‑> SuccessTuple

Drop a pipe's table but maintain its registration.

Parameters

pipe : meerschaum.Pipe:
The pipe to be dropped.

Returns

A success tuple (bool, str).

Expand source code
def drop_pipe(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False
    ) -> SuccessTuple:
    """
    Drop a pipe's table but maintain its registration.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to be dropped.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A success tuple (bool, str).
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import error
    if pipe is None:
        error("Pipe cannot be None.")

    response = self.delete(pipe_r_url(pipe) + '/drop', debug=debug)
    if debug:
        dprint(response.text)

    try:
        payload = response.json()
    except Exception:
        return False, f"Failed to drop {pipe}."

    ### Success is the truthiness of the response; the message depends on the payload shape.
    succeeded = response.__bool__()
    if isinstance(payload, list):
        message = payload[1]
    elif 'detail' in payload:
        message = payload['detail']
    else:
        message = response.text
    return succeeded, message
def edit_pipe(self, pipe: Pipe, patch: bool = False, debug: bool = False) ‑> SuccessTuple

Submit a PATCH to the API to edit an existing Pipe object. Returns a tuple of (success_bool, response_dict).

Expand source code
def edit_pipe(
        self,
        pipe: meerschaum.Pipe,
        patch: bool = False,
        debug: bool = False,
    ) -> SuccessTuple:
    """Submit a PATCH to the API to edit an existing Pipe object.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose `parameters` are sent as the JSON body.

    patch: bool, default False
        Sent to the API as the `patch` query parameter.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A success tuple: the truthiness of the response and a message
    taken from the JSON body (or the raw text as a fallback).
    """
    from meerschaum.utils.debug import dprint
    ### NOTE: if `parameters` is supplied in the Pipe constructor,
    ###       then `pipe.parameters` will exist and not be fetched from the database.
    r_url = pipe_r_url(pipe)
    response = self.patch(
        r_url + '/edit',
        params={'patch': patch},
        json=pipe.parameters,
        debug=debug,
    )
    if debug:
        dprint(response.text)

    ### Parse the body once and tolerate non-JSON responses, as `drop_pipe()` does.
    try:
        data = response.json()
    except Exception:
        return False, f"Failed to edit {pipe}."

    succeeded = bool(response)
    if isinstance(data, list) and len(data) > 1:
        return succeeded, data[1]
    if isinstance(data, dict) and 'detail' in data:
        return succeeded, data['detail']
    return succeeded, response.text
def edit_user(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> SuccessTuple

Edit an existing user.

Expand source code
def edit_user(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Edit an existing user.

    The user's username, password, type, email and JSON-encoded attributes
    are posted to the users `edit` endpoint.

    Returns
    -------
    The success tuple decoded from the response.
    A `detail` payload or an undecodable response yields `(False, message)`.
    """
    import json
    from meerschaum.config.static import STATIC_CONFIG
    endpoint = f"{STATIC_CONFIG['api']['endpoints']['users']}/edit"
    form = {
        'username': user.username,
        'password': user.password,
        'type': user.type,
        'email': user.email,
        'attributes': json.dumps(user.attributes),
    }
    response = self.post(endpoint, data=form, debug=debug)
    try:
        parsed = json.loads(response.text)
        if isinstance(parsed, dict) and 'detail' in parsed:
            return False, parsed['detail']
        result = tuple(parsed)
    except Exception:
        ### Prefer the server's own text when the response is truthy.
        if response:
            return False, response.text
        return False, f"Failed to edit user '{user}'."
    return result
def fetch(self, pipe: mrsm.Pipe, begin: Union[datetime, str, int] = '', end: Union[datetime, int] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw: Any) ‑> Iterator['pd.DataFrame']

Get the Pipe data from the remote Pipe.

Expand source code
def fetch(
        self,
        pipe: mrsm.Pipe,
        begin: Union[datetime, str, int] = '',
        end: Union[datetime, int] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw: Any
    ) -> Optional[Iterator['pd.DataFrame']]:
    """
    Get the Pipe data from the remote Pipe.

    The remote (source) pipe is described by `fetch:pipe` in the pipe's parameters
    or, for legacy configurations, by `connector_keys`, `metric_key` and `location_key`
    directly under `fetch`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose `fetch` parameters describe the source pipe.

    begin: Union[datetime, str, int], default ''
        Passed to the source pipe's `get_data()`.

    end: Union[datetime, int], default None
        Passed to the source pipe's `get_data()`.

    params: Optional[Dict[str, Any]], default None
        Filter parameters, patched with `fetch:params`. The caller's dictionary is not modified.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The result of the source pipe's `get_data(as_iterator=True)`,
    or `None` when the fetch parameters are missing or incomplete.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.config._patch import apply_patch_to_config

    fetch_params = pipe.parameters.get('fetch', {})
    if not fetch_params:
        warn(f"Missing 'fetch' parameters for {pipe}.", stack=False)
        return None

    ### Copy so that adding `instance` below never leaks into `pipe.parameters`.
    pipe_meta = dict(fetch_params.get('pipe', None) or {})
    ### Legacy: check for `connector_keys`, etc. at the root.
    if not pipe_meta:
        ck, mk, lk = (
            fetch_params.get('connector_keys', None),
            fetch_params.get('metric_key', None),
            fetch_params.get('location_key', None),
        )
        if not ck or not mk:
            warn(f"Missing `fetch:pipe` keys for {pipe}.", stack=False)
            return None

        pipe_meta.update({
            'connector': ck,
            'metric': mk,
            'location': lk,
        })

    pipe_meta['instance'] = self
    source_pipe = mrsm.Pipe(**pipe_meta)

    _params = copy.deepcopy(params) if params is not None else {}
    _params = apply_patch_to_config(_params, fetch_params.get('params', {}))
    select_columns = fetch_params.get('select_columns', [])
    omit_columns = fetch_params.get('omit_columns', [])

    return source_pipe.get_data(
        select_columns=select_columns,
        omit_columns=omit_columns,
        begin=begin,
        end=end,
        params=_params,
        debug=debug,
        as_iterator=True,
    )
def fetch_pipes_keys(self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) ‑> List[Tuple[str, str, Optional[str]]]

Fetch registered Pipes' keys from the API.

Parameters

connector_keys : Optional[List[str]], default None
The connector keys for the query.
metric_keys : Optional[List[str]], default None
The metric keys for the query.
location_keys : Optional[List[str]], default None
The location keys for the query.
tags : Optional[List[str]], default None
A list of tags for the query.
params : Optional[Dict[str, Any]], default None
A parameters dictionary for filtering against the pipes table (e.g. {'connector_keys': 'plugin:foo'}). Not recommended.
debug : bool, default False
Verbosity toggle.

Returns

A list of tuples containing pipes' keys.

Expand source code
def fetch_pipes_keys(
        self,
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False
    ) -> Union[List[Tuple[str, str, Union[str, None]]]]:
    """
    Fetch registered Pipes' keys from the API.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        The connector keys for the query.

    metric_keys: Optional[List[str]], default None
        The metric keys for the query.

    location_keys: Optional[List[str]], default None
        The location keys for the query.

    tags: Optional[List[str]], default None
        A list of tags for the query.

    params: Optional[Dict[str, Any]], default None
        A parameters dictionary for filtering against the `pipes` table
        (e.g. `{'connector_keys': 'plugin:foo'}`).
        Not recommended.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of tuples containing pipes' keys.
    """
    from meerschaum.config.static import STATIC_CONFIG

    endpoint = STATIC_CONFIG['api']['endpoints']['pipes'] + '/keys'
    try:
        ### Every filter is sent JSON-encoded; omitted key lists are sent as empty lists.
        response = self.get(
            endpoint,
            params={
                'connector_keys': json.dumps(
                    connector_keys if connector_keys is not None else []
                ),
                'metric_keys': json.dumps(
                    metric_keys if metric_keys is not None else []
                ),
                'location_keys': json.dumps(
                    location_keys if location_keys is not None else []
                ),
                'tags': json.dumps(tags if tags is not None else []),
                'params': json.dumps(params),
            },
            debug=debug
        )
        keys_json = response.json()
    except Exception as e:
        error(str(e))

    if 'detail' in keys_json:
        error(keys_json['detail'], stack=False)
    return [tuple(row) for row in keys_json]
def get(self, r_url: str, headers: Optional[Dict[str, str]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> requests.Response

Wrapper for requests.get.

Expand source code
def get(
        self,
        r_url: str,
        headers: Optional[Dict[str, str]] = None,
        use_token: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> 'requests.Response':
    """
    Wrapper for `requests.get`.

    Parameters
    ----------
    r_url: str
        The path appended to the connector's base `url`.

    headers: Optional[Dict[str, str]], default None
        Extra request headers. The caller's dictionary is not modified.

    use_token: bool, default True
        If `True`, add an `Authorization: Bearer <token>` header built from `self.token`.

    debug: bool, default False
        Verbosity toggle.

    **kw: Any
        Additional keyword arguments passed to the session's `get()`.

    Returns
    -------
    The response returned by the session's `get()`.
    """
    if debug:
        from meerschaum.utils.debug import dprint

    ### Work on a copy so the bearer token never leaks into the caller's dictionary.
    headers = dict(headers) if headers is not None else {}

    if use_token:
        if debug:
            dprint("Checking login token.")
        headers['Authorization'] = f'Bearer {self.token}'

    if debug:
        dprint(f"Sending GET request to {self.url + r_url}.")

    return self.session.get(
        self.url + r_url,
        headers=headers,
        **kw
    )
def get_actions(self) ‑> list

Get available actions from the API server

Expand source code
def get_actions(
        self,
    ) -> list:
    """
    Get available actions from the API server.

    The result of `self.get()` on the `actions` endpoint is returned unchanged.
    """
    from meerschaum.config.static import STATIC_CONFIG
    actions_endpoint = STATIC_CONFIG['api']['endpoints']['actions']
    return self.get(actions_endpoint)
def get_backtrack_data(self, pipe: Pipe, begin: datetime, backtrack_minutes: int = 0, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw: Any) ‑> pandas.DataFrame

Get a Pipe's backtrack data from the API.

Expand source code
def get_backtrack_data(
        self,
        pipe: meerschaum.Pipe,
        begin: datetime,
        backtrack_minutes: int = 0,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw: Any,
    ) -> pandas.DataFrame:
    """
    Get a Pipe's backtrack data from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose backtrack data is requested.

    begin: datetime
        Sent to the API as the `begin` query parameter.

    backtrack_minutes: int, default 0
        Sent to the API as the `backtrack_minutes` query parameter.

    params: Optional[Dict[str, Any]], default None
        Filter parameters, sent JSON-encoded.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dataframe with its datetimes parsed,
    or `None` (after a warning) when the request or the parsing fails.
    """
    r_url = pipe_r_url(pipe)
    try:
        response = self.get(
            r_url + "/backtrack_data",
            params={
                'begin': begin,
                'backtrack_minutes': backtrack_minutes,
                'params': json.dumps(params),
            },
            debug=debug
        )
    except Exception as e:
        warn(f"Failed to parse backtrack data JSON for {pipe}:\n{e}")
        return None

    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.dataframe import parse_df_datetimes
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(response.text)
    pd = import_pandas()
    try:
        df = pd.read_json(StringIO(response.text))
    except Exception as e:
        warn(f"Failed to read response into a dataframe:\n{e}")
        return None

    ### Reuse the dataframe parsed above instead of reading the response a second time.
    return parse_df_datetimes(df, debug=debug)
def get_chaining_status(self, **kw) ‑> Optional[bool]

Fetch the chaining status of the API instance.

Expand source code
def get_chaining_status(self, **kw) -> Optional[bool]:
    """
    Fetch the chaining status of the API instance.

    Returns
    -------
    The decoded JSON body of the `chaining` endpoint,
    or `None` when the request fails or the response is falsy.
    """
    from meerschaum.config.static import STATIC_CONFIG
    try:
        response = self.get(
            STATIC_CONFIG['api']['endpoints']['chaining'],
            use_token=True,
            **kw
        )
        got_response = bool(response)
    except Exception:
        return None

    return response.json() if got_response else None
def get_mrsm_version(self, **kw) ‑> Optional[str]

Return the Meerschaum version of the API instance.

Expand source code
def get_mrsm_version(self, **kw) -> Optional[str]:
    """
    Return the Meerschaum version of the API instance.

    Returns
    -------
    The decoded JSON body of the `version/mrsm` endpoint,
    or `None` when the request fails or the body is a `detail` error payload.
    """
    from meerschaum.config.static import STATIC_CONFIG
    try:
        response = self.get(
            STATIC_CONFIG['api']['endpoints']['version'] + '/mrsm',
            use_token=True,
            **kw
        )
        version_json = response.json()
    except Exception:
        return None

    is_error_payload = isinstance(version_json, dict) and 'detail' in version_json
    return None if is_error_payload else version_json
def get_pipe_attributes(self, pipe: Pipe, debug: bool = False) ‑> Dict[str, Any]

Get a Pipe's attributes from the API

Parameters

pipe : Pipe
The pipe whose attributes we are fetching.

Returns

A dictionary of a pipe's attributes. If the pipe does not exist, return an empty dictionary.

Expand source code
def get_pipe_attributes(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Dict[str, Any]:
    """Request a pipe's attributes from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose attributes are requested.

    debug: bool, default False
        Forwarded to `self.get()`.

    Returns
    -------
    The JSON-decoded body of the pipe's `/attributes` endpoint.
    If the body cannot be decoded, a warning is emitted and an empty dictionary is returned.
    """
    attributes_url = pipe_r_url(pipe) + '/attributes'
    response = self.get(attributes_url, debug=debug)
    try:
        attributes = json.loads(response.text)
    except Exception as e:
        warn(f"Failed to get the attributes for {pipe}:\n{e}")
        attributes = {}
    return attributes
def get_pipe_columns_types(self, pipe: Pipe, debug: bool = False) ‑> Union[Dict[str, str], None]

Fetch the columns and types of the pipe's table.

Parameters

pipe : Pipe
The pipe whose columns to be queried.

Returns

A dictionary mapping column names to their database types.

Examples

>>> {
...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
...   'id': 'BIGINT',
...   'val': 'DOUBLE PRECISION',
... }
>>>
Expand source code
def get_pipe_columns_types(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Union[Dict[str, str], None]:
    """
    Fetch the columns and types of the pipe's table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose columns to be queried.

    debug: bool, default False
        Forwarded to `self.get()`.

    Returns
    -------
    A dictionary mapping column names to their database types.
    `None` is returned (after a warning) if the body is not valid JSON,
    is not a dictionary, or is a dictionary whose only key is `'detail'`.

    Examples
    --------
    >>> {
    ...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
    ...   'id': 'BIGINT',
    ...   'val': 'DOUBLE PRECISION',
    ... }
    >>>
    """
    ### Import at the top of the function: importing inside only one branch
    ### makes `warn` a local name that is unbound in the other branches.
    from meerschaum.utils.warnings import warn
    r_url = pipe_r_url(pipe) + '/columns/types'
    response = self.get(
        r_url,
        debug = debug
    )
    try:
        j = response.json()
    except Exception as e:
        warn(f"Failed to parse the columns and types for {pipe}:\n{e}")
        return None

    if not isinstance(j, dict):
        warn(response.text)
        return None

    ### A dictionary holding nothing but 'detail' is an error message, not a column mapping.
    if 'detail' in j and len(j) == 1:
        warn(j['detail'])
        return None
    return j
def get_pipe_data(self, pipe: Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[str, datetime, int, None] = None, end: Union[str, datetime, int, None] = None, params: Optional[Dict[str, Any]] = None, as_chunks: bool = False, debug: bool = False, **kw: Any) ‑> Union[pandas.DataFrame, None]

Fetch data from the API.

Expand source code
def get_pipe_data(
        self,
        pipe: meerschaum.Pipe,
        select_columns: Optional[List[str]] = None,
        omit_columns: Optional[List[str]] = None,
        begin: Union[str, datetime, int, None] = None,
        end: Union[str, datetime, int, None] = None,
        params: Optional[Dict[str, Any]] = None,
        as_chunks: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Union[pandas.DataFrame, None]:
    """Fetch data from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose data is requested.

    select_columns: Optional[List[str]], default None
        Sent JSON-encoded as the `select_columns` query parameter.

    omit_columns: Optional[List[str]], default None
        Sent JSON-encoded as the `omit_columns` query parameter.

    begin: Union[str, datetime, int, None], default None
        Sent as the `begin` query parameter.

    end: Union[str, datetime, int, None], default None
        Sent as the `end` query parameter.

    params: Optional[Dict[str, Any]], default None
        Sent JSON-encoded as the `params` query parameter.

    as_chunks: bool, default False
        Accepted for interface compatibility; not used by this implementation.

    debug: bool, default False
        Forwarded to `self.get()` and to the datetime parsing helper.

    Returns
    -------
    A DataFrame built from the response body, or `None` (with a warning where applicable)
    if the request failed, the response was not OK, the body held an error `'detail'`,
    or the body could not be read into a DataFrame.
    """
    r_url = pipe_r_url(pipe)
    try:
        response = self.get(
            r_url + "/data",
            params = {
                'select_columns': json.dumps(select_columns),
                'omit_columns': json.dumps(omit_columns),
                'begin': begin,
                'end': end,
                'params': json.dumps(params)
            },
            debug = debug
        )
        if not response.ok:
            return None
        j = response.json()
    except Exception as e:
        warn(f"Failed to get data for {pipe}:\n{e}")
        return None

    ### An error payload must not leak out as a tuple:
    ### the declared return is a DataFrame or `None`.
    if isinstance(j, dict) and 'detail' in j:
        warn(f"Failed to get data for {pipe}:\n{j['detail']}")
        return None

    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.dataframe import parse_df_datetimes, add_missing_cols_to_df
    pd = import_pandas()
    try:
        df = pd.read_json(StringIO(response.text))
    except Exception as e:
        warn(f"Failed to parse response for {pipe}:\n{e}")
        return None

    ### With no columns in the response, build them from the pipe's dtypes.
    if len(df.columns) == 0:
        return add_missing_cols_to_df(df, pipe.dtypes)

    ### Only columns whose dtype mentions 'datetime' are left eligible for parsing.
    df = parse_df_datetimes(
        df,
        ignore_cols = [
            col
            for col, dtype in pipe.dtypes.items()
            if 'datetime' not in str(dtype)
        ],
        debug = debug,
    )
    return df
def get_pipe_id(self, pipe: meerschaum.Pipe, debug: bool = False) ‑> int

Get a Pipe's ID from the API.

Expand source code
def get_pipe_id(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Optional[int]:
    """Get a Pipe's ID from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose ID is requested.

    debug: bool, default False
        Forwarded to `self.get()`; also prints the raw response text.

    Returns
    -------
    The pipe's ID as an `int`, or `None` if the response body is not an integer.
    """
    from meerschaum.utils.misc import is_int
    r_url = pipe_r_url(pipe)
    response = self.get(
        r_url + '/id',
        debug = debug
    )
    if debug:
        dprint(f"Got pipe ID: {response.text}")
    try:
        if is_int(response.text):
            return int(response.text)
    except Exception as e:
        warn(f"Failed to get the ID for {pipe}:\n{e}")
    return None
def get_pipe_rowcount(self, pipe: "'Pipe'", begin: Optional[datetime] = None, end: Optional[datetime] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) ‑> int

Get a pipe's row count from the API.

Parameters

pipe : 'meerschaum.Pipe':
The pipe whose row count we are counting.
begin : Optional[datetime], default None
If provided, bound the count by this datetime.
end : Optional[datetime]
If provided, bound the count by this datetime.
params : Optional[Dict[str, Any]], default None
If provided, bound the count by these parameters.
remote : bool, default False
 

Returns

The number of rows in the pipe's table, bounded by the given parameters. If the table does not exist, return 0.

Expand source code
def get_pipe_rowcount(
        self,
        pipe: 'meerschaum.Pipe',
        begin: Optional[datetime] = None,
        end: Optional[datetime] = None,
        params: Optional[Dict[str, Any]] = None,
        remote: bool = False,
        debug: bool = False,
    ) -> int:
    """Get a pipe's row count from the API.

    Parameters
    ----------
    pipe: 'meerschaum.Pipe'
        The pipe whose rows are counted.

    begin: Optional[datetime], default None
        If provided, bound the count by this datetime.

    end: Optional[datetime], default None
        If provided, bound the count by this datetime.

    params: Optional[Dict[str, Any]], default None
        If provided, bound the count by these parameters (sent as the JSON body).

    remote: bool, default False
        Sent to the API as the `remote` query parameter.

    debug: bool, default False
        Forwarded to `self.get()`.

    Returns
    -------
    The number of rows in the pipe's table, bounded by the given parameters.
    If the request fails or the body cannot be parsed as an integer,
    a warning is emitted and 0 is returned.
    """
    r_url = pipe_r_url(pipe)
    response = self.get(
        r_url + "/rowcount",
        json = params,
        params = {
            'begin': begin,
            'end': end,
            'remote': remote,
        },
        debug = debug
    )
    if not response:
        warn(f"Failed to get the rowcount for {pipe}:\n{response.text}")
        ### Stop here: parsing an error body would only produce a second warning.
        return 0
    try:
        return int(json.loads(response.text))
    except Exception as e:
        warn(f"Failed to get the rowcount for {pipe}:\n{e}")
    return 0
def get_plugin_attributes(self, plugin: meerschaum.core.Plugin, debug: bool = False) ‑> Mapping[str, Any]

Return a plugin's attributes.

Expand source code
def get_plugin_attributes(
        self,
        plugin: meerschaum.core.Plugin,
        debug: bool = False
    ) -> Mapping[str, Any]:
    """
    Request a plugin's attributes from the API.

    Parameters
    ----------
    plugin: meerschaum.core.Plugin
        The plugin whose attributes are requested.

    debug: bool, default False
        Forwarded to `self.get()`.

    Returns
    -------
    The plugin's attributes dictionary.
    An empty dictionary is returned (after a warning) when the response is falsy
    and the body contains a `'detail'` key.
    `error()` is called with the response text when the body is not a dictionary.
    """
    import json
    from meerschaum.utils.warnings import warn, error
    attributes_url = plugin_r_url(plugin) + '/attributes'
    response = self.get(attributes_url, use_token=True, debug=debug)
    attributes = response.json()

    ### The body may decode to a string which itself looks like a JSON object;
    ### try one more round of decoding and keep the string if that fails.
    if isinstance(attributes, str) and attributes.startswith('{'):
        try:
            attributes = json.loads(attributes)
        except Exception:
            pass

    if isinstance(attributes, dict):
        if not response and 'detail' in attributes:
            warn(attributes['detail'])
            return {}
    else:
        error(response.text)
    return attributes
def get_plugins(self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False) ‑> Sequence[str]

Return a list of registered plugin names.

Parameters

user_id : Optional[int], default None
If specified, return all plugins from a certain user.
search_term : Optional[str], default None
If specified, sent to the API as the search_term query parameter.
debug : bool, default False
Verbosity toggle.

Returns

Expand source code
def get_plugins(
        self,
        user_id : Optional[int] = None,
        search_term : Optional[str] = None,
        debug : bool = False
    ) -> Sequence[str]:
    """Return a list of registered plugin names.

    Parameters
    ----------
    user_id: Optional[int], default None
        If specified, return all plugins from a certain user.

    search_term: Optional[str], default None
        Sent to the API as the `search_term` query parameter.

    debug: bool, default False
        Forwarded to `self.get()`.

    Returns
    -------
    The JSON-decoded list from the plugins endpoint.
    An empty list is returned when the response is falsy.
    `error()` is called with the response text when the body is not a list.
    """
    import json
    from meerschaum.utils.warnings import warn, error
    from meerschaum.config.static import STATIC_CONFIG
    plugins_endpoint = STATIC_CONFIG['api']['endpoints']['plugins']
    query = {'user_id' : user_id, 'search_term' : search_term}
    response = self.get(plugins_endpoint, params=query, use_token=True, debug=debug)
    if response:
        plugins = json.loads(response.text)
        if not isinstance(plugins, list):
            error(response.text)
        return plugins
    return []
def get_sync_time(self, pipe: "'Pipe'", params: Optional[Dict[str, Any]] = None, newest: bool = True, debug: bool = False) ‑> Union[datetime, int, None]

Get a Pipe's most recent datetime value from the API.

Parameters

pipe : Pipe
The pipe to select from.
params : Optional[Dict[str, Any]], default None
Optional params dictionary to build the WHERE clause.
newest : bool, default True
If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).

Returns

The most recent (or oldest if newest is False) datetime of a pipe, rounded down to the closest minute.

Expand source code
def get_sync_time(
        self,
        pipe: 'meerschaum.Pipe',
        params: Optional[Dict[str, Any]] = None,
        newest: bool = True,
        debug: bool = False,
    ) -> Union[datetime, int, None]:
    """Ask the API for a pipe's sync time.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to select from.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary, sent as the JSON body of the request.

    newest: bool, default True
        Sent as the `newest` query parameter.
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime.

    debug: bool, default False
        Sent as the `debug` query parameter and forwarded to `self.get()`.

    Returns
    -------
    The sync time as an `int` when the value is integer-like,
    otherwise as a `datetime` parsed from an ISO-format string.
    `None` is returned when the request fails, the body is `null`,
    or the value cannot be parsed (a warning is emitted for failures).
    """
    from meerschaum.utils.misc import is_int
    from meerschaum.utils.warnings import warn
    sync_time_url = pipe_r_url(pipe) + '/sync_time'
    response = self.get(
        sync_time_url,
        json = params,
        params = {'newest': newest, 'debug': debug},
        debug = debug,
    )
    if not response:
        warn(f"Failed to get the sync time for {pipe}:\n" + response.text)
        return None

    raw_sync_time = response.json()
    if raw_sync_time is None:
        return None

    try:
        if is_int(raw_sync_time):
            return int(raw_sync_time)
        return datetime.fromisoformat(raw_sync_time)
    except Exception as e:
        warn(f"Failed to parse the sync time '{raw_sync_time}' for {pipe}:\n{e}")
        return None
def get_user_attributes(self, user: "'meerschaum.core.User'", debug: bool = False, **kw) ‑> int

Get a user's attributes.

Expand source code
def get_user_attributes(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw
    ) -> 'Optional[Dict[str, Any]]':
    """Get a user's attributes.

    Parameters
    ----------
    user: 'meerschaum.core.User'
        The user whose attributes are requested (looked up by `user.username`).

    debug: bool, default False
        Forwarded to `self.get()`.

    **kw:
        Additional keyword arguments forwarded to `self.get()`.

    Returns
    -------
    The JSON-decoded body of the user's `/attributes` endpoint,
    or `None` if the body cannot be decoded.
    """
    from meerschaum.config.static import STATIC_CONFIG
    import json
    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/attributes"
    response = self.get(r_url, debug=debug, **kw)
    try:
        return json.loads(response.text)
    except Exception:
        return None
def get_user_id(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> Optional[int]

Get a user's ID.

Expand source code
def get_user_id(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> Optional[int]:
    """Look up a user's ID through the API.

    Parameters
    ----------
    user: 'meerschaum.core.User'
        The user to look up (by `user.username`).

    debug: bool, default False
        Forwarded to `self.get()`.

    **kw: Any
        Additional keyword arguments forwarded to `self.get()`.

    Returns
    -------
    The user's ID as an `int`, or `None` if the body cannot be decoded into an integer.
    """
    from meerschaum.config.static import STATIC_CONFIG
    import json
    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    response = self.get(f"{users_endpoint}/{user.username}/id", debug=debug, **kw)
    try:
        return int(json.loads(response.text))
    except Exception:
        return None
def get_user_password_hash(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> Optional[str]

If configured, get a user's password hash.

Expand source code
def get_user_password_hash(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """If configured, get a user's password hash.

    Parameters
    ----------
    user: 'meerschaum.core.User'
        The user to look up (by `user.username`).

    debug: bool, default False
        Forwarded to `self.get()`.

    **kw: Any
        Additional keyword arguments forwarded to `self.get()`.

    Returns
    -------
    The JSON-decoded body of the user's `/password_hash` endpoint,
    or `None` when the response is falsy.
    """
    from meerschaum.config.static import STATIC_CONFIG
    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    hash_url = '/'.join((users_endpoint, user.username, 'password_hash'))
    response = self.get(hash_url, debug=debug, **kw)
    return response.json() if response else None
def get_user_type(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> Optional[str]

If configured, get a user's type.

Expand source code
def get_user_type(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """If configured, get a user's type.

    Parameters
    ----------
    user: 'meerschaum.core.User'
        The user to look up (by `user.username`).

    debug: bool, default False
        Forwarded to `self.get()`.

    **kw: Any
        Additional keyword arguments forwarded to `self.get()`.

    Returns
    -------
    The JSON-decoded body of the user's `/type` endpoint,
    or `None` when the response is falsy.
    """
    from meerschaum.config.static import STATIC_CONFIG
    users_endpoint = STATIC_CONFIG['api']['endpoints']['users']
    type_url = '/'.join((users_endpoint, user.username, 'type'))
    response = self.get(type_url, debug=debug, **kw)
    return response.json() if response else None
def get_users(self, debug: bool = False, **kw: Any) ‑> List[str]

Return a list of registered usernames.

Expand source code
def get_users(
        self,
        debug: bool = False,
        **kw : Any
    ) -> List[str]:
    """
    List the usernames registered on the API instance.

    Parameters
    ----------
    debug: bool, default False
        Forwarded to `self.get()`.

    **kw: Any
        Accepted for interface compatibility; not forwarded to the request.

    Returns
    -------
    The JSON-decoded body of the users endpoint,
    or an empty list if the response is falsy or cannot be decoded.
    """
    from meerschaum.config.static import STATIC_CONFIG
    users_endpoint = f"{STATIC_CONFIG['api']['endpoints']['users']}"
    response = self.get(users_endpoint, debug=debug, use_token=True)
    if not response:
        return []
    try:
        usernames = response.json()
    except Exception:
        usernames = []
    return usernames
def install_plugin(self, name: str, force: bool = False, debug: bool = False) ‑> Tuple[bool, str]

Download and attempt to install a plugin from the API.

Expand source code
def install_plugin(
        self,
        name: str,
        force: bool = False,
        debug: bool = False
    ) -> SuccessTuple:
    """Download and attempt to install a plugin from the API.

    Parameters
    ----------
    name: str
        The name of the plugin to download.

    force: bool, default False
        Forwarded to `Plugin.install()`.

    debug: bool, default False
        Verbosity toggle; forwarded to the download and install calls.

    Returns
    -------
    A `SuccessTuple` of success and message.
    If the downloaded file is not binary, it is read as a JSON error payload instead:
    a list payload is returned as the tuple, anything else yields a generic failure.
    """
    import os
    import pathlib
    import json
    from meerschaum.core import Plugin
    from meerschaum.config._paths import PLUGINS_TEMP_RESOURCES_PATH
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    binaryornot_check = attempt_import('binaryornot.check', lazy=False)
    r_url = plugin_r_url(name)
    dest = pathlib.Path(os.path.join(PLUGINS_TEMP_RESOURCES_PATH, name + '.tar.gz'))
    if debug:
        dprint(f"Fetching from '{self.url + r_url}' to '{dest}'...")
    archive_path = self.wget(r_url, dest, debug=debug)
    is_binary = binaryornot_check.is_binary(str(archive_path))
    if not is_binary:
        fail_msg = f"Failed to download binary for plugin '{name}'."
        ### Default to failure so an unexpected payload shape cannot leave these names unbound.
        success, msg = False, fail_msg
        try:
            with open(archive_path, 'r') as f:
                j = json.load(f)
            if isinstance(j, list):
                success, msg = tuple(j)
        except Exception:
            success, msg = False, fail_msg
        return success, msg
    plugin = Plugin(name, archive_path=archive_path, repo_connector=self)
    return plugin.install(force=force, debug=debug)
def login(self, debug: bool = False, warn: bool = True, **kw: Any) ‑> Tuple[bool, str]

Log in and set the session token.

Expand source code
def login(
        self,
        debug: bool = False,
        warn: bool = True,
        **kw: Any
    ) -> SuccessTuple:
    """Log in and set the session token.

    Parameters
    ----------
    debug: bool, default False
        Forwarded to `self.post()`.

    warn: bool, default True
        If `True`, emit a warning when the login fails.

    **kw: Any
        Accepted for interface compatibility; not used by this implementation.

    Returns
    -------
    A `SuccessTuple` of the response's truthiness and a message.
    On success, `self._token` and `self._expires` are set from the response body.
    """
    from meerschaum.utils.warnings import warn as _warn
    from meerschaum.config.static import STATIC_CONFIG
    import json
    import datetime
    try:
        login_data = {
            'username': self.username,
            'password': self.password,
        }
    except AttributeError:
        return False, f"Please login with the command `login {self}`."
    response = self.post(
        STATIC_CONFIG['api']['endpoints']['login'],
        data = login_data,
        use_token = False,
        debug = debug
    )
    if response:
        msg = f"Successfully logged into '{self}' as user '{login_data['username']}'."
        ### Decode the body once and read both fields from the same payload.
        login_json = json.loads(response.text)
        self._token = login_json['access_token']
        expires_str = login_json['expires']
        try:
            self._expires = datetime.datetime.strptime(expires_str, '%Y-%m-%dT%H:%M:%S.%f')
        except ValueError:
            ### ISO-formatted timestamps omit the fractional part when microseconds
            ### are zero, so accept that form as well.
            self._expires = datetime.datetime.strptime(expires_str, '%Y-%m-%dT%H:%M:%S')
    else:
        msg = (
            f"Failed to log into '{self}' as user '{login_data['username']}'.\n" +
            f"    Please verify login details for connector '{self}'."
        )
        if warn:
            _warn(msg, stack=False)

    return bool(response), msg
def patch(self, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> requests.Response

Wrapper for requests.patch.

Expand source code
def patch(
        self,
        r_url: str,
        headers: Optional[Dict[str, Any]] = None,
        use_token: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> requests.Response:
    """Wrapper for `requests.patch`.

    Parameters
    ----------
    r_url: str
        The relative URL, appended to `self.url`.

    headers: Optional[Dict[str, Any]], default None
        Extra headers to send. The dictionary is copied, never modified.

    use_token: bool, default True
        If `True`, add an `Authorization: Bearer` header built from `self.token`.

    debug: bool, default False
        Verbosity toggle.

    **kw: Any
        Additional keyword arguments forwarded to `self.session.patch()`.

    Returns
    -------
    Whatever `self.session.patch()` returns.
    """
    if debug:
        from meerschaum.utils.debug import dprint

    ### Work on a copy so that adding the token never mutates the caller's dictionary.
    headers = dict(headers) if headers is not None else {}

    if use_token:
        if debug:
            dprint("Checking login token.")
        headers['Authorization'] = f'Bearer {self.token}'

    if debug:
        dprint(f"Sending PATCH request to {self.url + r_url}")

    return self.session.patch(
        self.url + r_url,
        headers = headers,
        **kw
    )
def pipe_exists(self, pipe: "'Pipe'", debug: bool = False) ‑> bool

Check the API to see if a Pipe exists.

Parameters

pipe : 'meerschaum.Pipe'
The pipe which we are querying.

Returns

A bool indicating whether a pipe's underlying table exists.

Expand source code
def pipe_exists(
        self,
        pipe: 'meerschaum.Pipe',
        debug: bool = False
    ) -> bool:
    """Check the API to see if a Pipe exists.

    Parameters
    ----------
    pipe: 'meerschaum.Pipe'
        The pipe which we are querying.

    debug: bool, default False
        Verbosity toggle; forwarded to `self.get()`.

    Returns
    -------
    The JSON-decoded body of the pipe's `/exists` endpoint.
    `False` is returned (after a warning) if the request fails
    or the body is an error dictionary containing `'detail'`.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    r_url = pipe_r_url(pipe)
    response = self.get(r_url + '/exists', debug=debug)
    if not response:
        warn(f"Failed to check if {pipe} exists:\n{response.text}")
        return False
    if debug:
        dprint("Received response: " + str(response.text))
    j = response.json()
    if isinstance(j, dict) and 'detail' in j:
        warn(j['detail'])
        ### An error dictionary is truthy; returning it would read as "the pipe exists".
        return False
    return j
def post(self, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> requests.Response

Wrapper for requests.post.

Expand source code
def post(
        self,
        r_url: str,
        headers: Optional[Dict[str, Any]] = None,
        use_token: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> requests.Response:
    """Wrapper for `requests.post`.

    Parameters
    ----------
    r_url: str
        The relative URL, appended to `self.url`.

    headers: Optional[Dict[str, Any]], default None
        Extra headers to send. The dictionary is copied, never modified.

    use_token: bool, default True
        If `True`, add an `Authorization: Bearer` header built from `self.token`.

    debug: bool, default False
        Verbosity toggle.

    **kw: Any
        Additional keyword arguments forwarded to `self.session.post()`.

    Returns
    -------
    Whatever `self.session.post()` returns.
    """
    if debug:
        from meerschaum.utils.debug import dprint

    ### Work on a copy so that adding the token never mutates the caller's dictionary.
    headers = dict(headers) if headers is not None else {}

    if use_token:
        if debug:
            dprint("Checking token...")
        headers['Authorization'] = f'Bearer {self.token}'

    if debug:
        dprint(f"Sending POST request to {self.url + r_url}")

    return self.session.post(
        self.url + r_url,
        headers = headers,
        **kw
    )
def register_pipe(self, pipe: Pipe, debug: bool = False) ‑> SuccessTuple

Submit a POST to the API to register a new Pipe object. Returns a tuple of (success_bool, response_dict).

Expand source code
def register_pipe(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False
    ) -> SuccessTuple:
    """Submit a POST to the API to register a new Pipe object.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to register; `pipe.parameters` is sent as the JSON body.

    debug: bool, default False
        Verbosity toggle; forwarded to `self.post()`.

    Returns
    -------
    A tuple of the response's truthiness and a message.
    The message is the second element of a list body, the `'detail'` of a dictionary body,
    or otherwise the raw response text.
    """
    from meerschaum.utils.debug import dprint
    ### NOTE: if `parameters` is supplied in the Pipe constructor,
    ###       then `pipe.parameters` will exist and not be fetched from the database.
    r_url = pipe_r_url(pipe)
    response = self.post(
        r_url + '/register',
        json = pipe.parameters,
        debug = debug,
    )
    if debug:
        dprint(response.text)

    success = bool(response)
    ### Decode the body once; a non-JSON body falls back to the raw text.
    try:
        j = response.json()
    except Exception:
        return success, response.text

    if isinstance(j, list) and len(j) > 1:
        return success, j[1]
    if isinstance(j, dict) and 'detail' in j:
        return success, j['detail']
    return success, response.text
def register_plugin(self, plugin: meerschaum.core.Plugin, make_archive: bool = True, debug: bool = False) ‑> SuccessTuple

Register a plugin and upload its archive.

Expand source code
def register_plugin(
        self,
        plugin: meerschaum.core.Plugin,
        make_archive: bool = True,
        debug: bool = False,
    ) -> SuccessTuple:
    """Register a plugin and upload its archive.

    Parameters
    ----------
    plugin: meerschaum.core.Plugin
        The plugin to register.

    make_archive: bool, default True
        If `True`, build the archive with `plugin.make_tar()`;
        otherwise upload the existing `plugin.archive_path`.

    debug: bool, default False
        Verbosity toggle; forwarded to the archive and request calls.

    Returns
    -------
    A `SuccessTuple` decoded from the response body.
    If the request raises, a generic failure is returned;
    if the body cannot be decoded, the raw response text is the message.
    """
    import json
    archive_path = plugin.make_tar(debug=debug) if make_archive else plugin.archive_path
    ### Build everything that may raise before opening the archive,
    ### and let the context manager guarantee the handle is closed.
    metadata = {
        'version': plugin.version,
        'attributes': json.dumps(plugin.attributes),
    }
    r_url = plugin_r_url(plugin)
    with open(archive_path, 'rb') as file_pointer:
        try:
            response = self.post(
                r_url,
                files = {'archive': file_pointer},
                params = metadata,
                debug = debug,
            )
        except Exception:
            return False, f"Failed to register plugin '{plugin}'."

    try:
        success, msg = json.loads(response.text)
    except Exception:
        return False, response.text

    return success, msg
def register_user(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> SuccessTuple

Register a new user.

Expand source code
def register_user(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """Register a new user through the API.

    Parameters
    ----------
    user: 'meerschaum.core.User'
        The user to register. The username, password and JSON-encoded attributes are
        always sent; `type` and `email` are sent only when they are truthy.

    debug: bool, default False
        Forwarded to `self.post()`.

    **kw: Any
        Accepted for interface compatibility; not used by this implementation.

    Returns
    -------
    A `SuccessTuple` decoded from the response body.
    A dictionary body containing `'detail'` yields a failure with that detail;
    an undecodable body yields a failure with the response text (or a generic message).
    """
    import json
    from meerschaum.config.static import STATIC_CONFIG
    register_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/register"
    form = {
        'username': user.username,
        'password': user.password,
        'attributes': json.dumps(user.attributes),
    }
    ### Optional fields are only included when they are set.
    for optional_key in ('type', 'email'):
        optional_value = getattr(user, optional_key)
        if optional_value:
            form[optional_key] = optional_value

    response = self.post(register_url, data=form, debug=debug)
    try:
        payload = json.loads(response.text)
        if isinstance(payload, dict) and 'detail' in payload:
            return False, payload['detail']
        return tuple(payload)
    except Exception:
        return False, (response.text if response else f"Failed to register user '{user}'.")
def sync_pipe(self, pipe: Optional[Pipe] = None, df: Optional[Union[pandas.DataFrame, Dict[Any, Any], str]] = None, chunksize: Optional[int] = -1, debug: bool = False, **kw: Any) ‑> SuccessTuple

Sync a DataFrame into a Pipe.

Expand source code
def sync_pipe(
        self,
        pipe: Optional[meerschaum.Pipe] = None,
        df: Optional[Union[pandas.DataFrame, Dict[Any, Any], str]] = None,
        chunksize: Optional[int] = -1,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """Sync a DataFrame into a Pipe.

    Parameters
    ----------
    pipe: Optional[meerschaum.Pipe], default None
        The pipe to sync into.

    df: Optional[Union[pandas.DataFrame, Dict[Any, Any], str]], default None
        The data to sync: a DataFrame, a dictionary of column lists, a list,
        or a JSON string which decodes to a dictionary or list.

    chunksize: Optional[int], default -1
        The number of rows per posted chunk.
        `None` means 1, and -1 means the configured `system:connectors:sql:chunksize`.

    debug: bool, default False
        Verbosity toggle; forwarded to `self.post()`.

    **kw: Any
        Sent as query parameters with every chunk.

    Returns
    -------
    A `SuccessTuple`. The first failing chunk aborts the sync and its failure is returned;
    otherwise the message reports the elapsed time, rows and chunks synced.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import json_serialize_datetime
    from meerschaum.config import get_config
    from meerschaum.utils.packages import attempt_import
    begin = time.time()
    more_itertools = attempt_import('more_itertools')
    if df is None:
        msg = f"DataFrame is `None`. Cannot sync {pipe}."
        return False, msg

    def get_json_str(c):
        ### allow syncing dict or JSON without needing to import pandas (for IOT devices)
        return (
            json.dumps(c, default=json_serialize_datetime) if isinstance(c, (dict, list))
            else c.to_json(date_format='iso', date_unit='ns')
        )

    def get_chunk_rowcount(c):
        ### A dict chunk maps columns to lists, so its length is the column count, not rows.
        if isinstance(c, dict):
            return max((len(col) for col in c.values()), default=0)
        return len(c)

    df = json.loads(df) if isinstance(df, str) else df

    if chunksize is None:
        _chunksize = 1
    elif chunksize == -1:
        _chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
    else:
        _chunksize = chunksize

    ### Dispatch on dict / list before the DataFrame check:
    ### plain containers have no `.columns`, and lists do have an `.index` attribute.
    chunks = []
    if isinstance(df, dict):
        keys = list(df.keys())
        ### `_chunks` is a dict of lists of dicts.
        ### e.g. {'a' : [ {'a':[1, 2]}, {'a':[3, 4]} ] }
        _chunks = {k: [] for k in keys}
        for k in keys:
            for l in more_itertools.chunked(df[k], _chunksize):
                _chunks[k].append({k: l})

        ### `chunks` is a list of dicts, merging the i-th slice of every column.
        for k, l in _chunks.items():
            for i, c in enumerate(l):
                try:
                    chunks[i].update(c)
                except IndexError:
                    chunks.append(c)
    elif isinstance(df, list):
        ### Each chunk is itself a list of up to `_chunksize` items.
        chunks = more_itertools.chunked(df, _chunksize)
    elif hasattr(df, 'index'):
        df = df.reset_index(drop=True)
        is_dask = 'dask' in df.__module__
        chunks = (
            (df.iloc[i] for i in more_itertools.chunked(df.index, _chunksize))
            if not is_dask
            else [partition.compute() for partition in df.partitions]
        )

    ### Send columns in case the user has defined them locally.
    if pipe.columns:
        kw['columns'] = json.dumps(pipe.columns)
    r_url = pipe_r_url(pipe) + '/data'

    rowcount = 0
    num_success_chunks = 0
    for i, c in enumerate(chunks):
        if debug:
            dprint(f"[{self}] Posting chunk {i} to {r_url}...")
        if len(c) == 0:
            if debug:
                dprint(f"[{self}] Skipping empty chunk...")
            continue
        json_str = get_json_str(c)

        try:
            response = self.post(
                r_url,
                ### handles check_existing
                params = kw,
                data = json_str,
                debug = debug
            )
        except Exception as e:
            msg = f"Failed to post a chunk to {pipe}:\n{e}"
            warn(msg)
            return False, msg

        if not response:
            return False, f"Failed to sync a chunk:\n{response.text}"

        try:
            j = json.loads(response.text)
        except Exception as e:
            return False, f"Failed to parse response from syncing {pipe}:\n{e}"

        if isinstance(j, dict) and 'detail' in j:
            return False, j['detail']

        try:
            j = tuple(j)
        except Exception:
            return False, response.text

        if debug:
            dprint("Received response: " + str(j))
        ### The API reported a failure for this chunk: stop and surface its tuple.
        if not j[0]:
            return j

        rowcount += get_chunk_rowcount(c)
        num_success_chunks += 1

    success_tuple = True, (
        f"It took {round(time.time() - begin, 2)} seconds to sync {rowcount} row"
        + ('s' if rowcount != 1 else '')
        + f" across {num_success_chunks} chunk" + ('s' if num_success_chunks != 1 else '') +
        f" to {pipe}."
    )
    return success_tuple
def test_connection(self, **kw: Any) ‑> Optional[bool]

Test if a successful connection to the API may be made.

Expand source code
def test_connection(
        self,
        **kw: Any
    ) -> Union[bool, None]:
    """Check whether a connection to the API can be made.

    Parameters
    ----------
    **kw: Any
        Overrides for the default keyword arguments passed to `retry_connect()`.

    Returns
    -------
    Whatever `retry_connect()` returns, or `False` if it raises.
    """
    from meerschaum.connectors.poll import retry_connect
    ### Defaults come first so that any caller-supplied keys take precedence.
    connect_kw = {
        'max_retries': 1,
        'retry_wait': 0,
        'warn': False,
        'connector': self,
        'enforce_chaining': False,
        'enforce_login': False,
        **kw,
    }
    try:
        return retry_connect(**connect_kw)
    except Exception:
        return False
def wget(self, r_url: str, dest: Optional[Union[str, pathlib.Path]] = None, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> pathlib.Path

Mimic wget with requests.

Expand source code
def wget(
        self,
        r_url: str,
        dest: Optional[Union[str, pathlib.Path]] = None,
        headers: Optional[Dict[str, Any]] = None,
        use_token: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> pathlib.Path:
    """Mimic wget with requests.

    Parameters
    ----------
    r_url: str
        The relative URL, appended to `self.url`.

    dest: Optional[Union[str, pathlib.Path]], default None
        Forwarded to `meerschaum.utils.misc.wget()` as the destination.

    headers: Optional[Dict[str, Any]], default None
        Extra headers to send. The dictionary is copied, never modified.

    use_token: bool, default True
        If `True`, add an `Authorization: Bearer` header built from `self.token`.

    debug: bool, default False
        Verbosity toggle.

    **kw: Any
        Additional keyword arguments forwarded to `meerschaum.utils.misc.wget()`.

    Returns
    -------
    Whatever `meerschaum.utils.misc.wget()` returns.
    """
    from meerschaum.utils.misc import wget
    from meerschaum.utils.debug import dprint
    ### Work on a copy so that adding the token never mutates the caller's dictionary.
    headers = dict(headers) if headers is not None else {}
    if use_token:
        if debug:
            dprint("Checking login token.")
        headers['Authorization'] = f'Bearer {self.token}'
    return wget(self.url + r_url, dest=dest, headers=headers, **kw)