Module meerschaum.connectors.api.APIConnector
Interact with Meerschaum APIs. May be chained together (see 'meerschaum:api_instance' in your config.yaml).
Expand source code
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
"""
Interact with Meerschaum APIs. May be chained together (see 'meerschaum:api_instance' in your config.yaml).
"""
import datetime
from meerschaum.utils.typing import Optional
from meerschaum.connectors import Connector
from meerschaum.utils.warnings import warn, error
from meerschaum.utils.packages import attempt_import
# Attributes verified on the connector when it is not configured through a URI.
required_attributes = {'host'}
class APIConnector(Connector):
    """
    Connector for a Meerschaum API instance.

    Most behaviour is implemented in sibling modules and bound onto the class
    through the imports below; this class body only handles construction,
    the HTTP session and the login token.
    """

    IS_INSTANCE: bool = True
    IS_THREAD_SAFE: bool = False

    from ._delete import delete
    from ._post import post
    from ._patch import patch
    from ._get import get, wget
    from ._actions import get_actions, do_action
    from ._misc import get_mrsm_version, get_chaining_status
    from ._pipes import (
        register_pipe,
        fetch_pipes_keys,
        edit_pipe,
        sync_pipe,
        delete_pipe,
        get_pipe_data,
        get_backtrack_data,
        get_pipe_id,
        get_pipe_attributes,
        get_sync_time,
        pipe_exists,
        create_metadata,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        get_pipe_columns_types,
    )
    from ._fetch import fetch
    from ._plugins import (
        register_plugin,
        install_plugin,
        delete_plugin,
        get_plugins,
        get_plugin_attributes,
    )
    from ._login import login, test_connection
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri

    def __init__(
        self,
        label: Optional[str] = None,
        wait: bool = False,
        debug: bool = False,
        **kw
    ):
        """
        Build the connector from keyword attributes or from a `uri` keyword.

        Parameters
        ----------
        label: Optional[str], default None
            The connector label. When omitted and `uri` is given, the label
            produced by `from_uri` is used.

        wait: bool, default False
            Accepted but not used by this constructor.

        debug: bool, default False
            Accepted but not used by this constructor.

        **kw
            Connector attributes, forwarded to the parent constructor.
        """
        if 'uri' in kw:
            uri_kwargs = self.from_uri(kw['uri'], as_dict=True)
            uri_label = uri_kwargs.pop('label', None)
            label = label or uri_label
            kw.update(uri_kwargs)

        super().__init__('api', label=label, **kw)

        # Fall back to plain HTTP on port 8000 when not configured.
        for attr, default in (('protocol', 'http'), ('port', 8000)):
            if attr not in self.__dict__:
                setattr(self, attr, default)

        if 'uri' in self.__dict__:
            from meerschaum.connectors.sql import SQLConnector
            parsed = SQLConnector.parse_uri(self.__dict__['uri'])
            if 'host' not in parsed:
                raise Exception(f"Invalid URI for '{self}'.")
            self.__dict__.update(parsed)
        else:
            self.verify_attributes(required_attributes)

        self.url = ''.join((self.protocol, '://', self.host, ':', str(self.port)))
        self._token = None
        self._expires = None
        self._session = None

    @property
    def URI(self) -> str:
        """
        Return the fully qualified URI.

        Credentials are embedded only when both a username and a password are set.
        """
        attrs = self.__dict__
        username = attrs.get('username', None)
        password = attrs.get('password', None)
        creds = ''
        if username and password:
            creds = username + ':' + password + '@'
        return ''.join((self.protocol, '://', creds, self.host, ':', str(self.port)))

    @property
    def session(self):
        """
        Return the cached `requests.Session`, creating it on first access.
        """
        if self._session is not None:
            return self._session
        requests = attempt_import('requests')
        if requests:
            self._session = requests.Session()
        if self._session is None:
            error("Failed to import requests. Is requests installed?")
        return self._session

    @property
    def token(self):
        """
        Return the login token, logging in again when it is missing or
        expires within the next minute.
        """
        if self._expires is None:
            expired = True
        else:
            cutoff = datetime.datetime.utcnow() + datetime.timedelta(minutes=1)
            expired = self._expires < cutoff

        if expired or self._token is None:
            success, msg = self.login()
            if not success:
                warn(msg, stack=False)
        return self._token
Classes
class APIConnector (label: Optional[str] = None, wait: bool = False, debug: bool = False, **kw)
-
Connect to a Meerschaum API instance.
Parameters
type
:str
- The `type` of the connector (e.g. `sql`, `api`, `plugin`).
label
:str
- The
label
for the connector.
Run `mrsm edit config` to edit connectors in the YAML file:

    meerschaum:
      connections:
        {type}:
          {label}:
            ### attributes go here
Expand source code
class APIConnector(Connector):
    """
    Connector for a Meerschaum API instance.

    Most behaviour is implemented in sibling modules and bound onto the class
    through the imports below; this class body only handles construction,
    the HTTP session and the login token.
    """

    IS_INSTANCE: bool = True
    IS_THREAD_SAFE: bool = False

    from ._delete import delete
    from ._post import post
    from ._patch import patch
    from ._get import get, wget
    from ._actions import get_actions, do_action
    from ._misc import get_mrsm_version, get_chaining_status
    from ._pipes import (
        register_pipe,
        fetch_pipes_keys,
        edit_pipe,
        sync_pipe,
        delete_pipe,
        get_pipe_data,
        get_backtrack_data,
        get_pipe_id,
        get_pipe_attributes,
        get_sync_time,
        pipe_exists,
        create_metadata,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        get_pipe_columns_types,
    )
    from ._fetch import fetch
    from ._plugins import (
        register_plugin,
        install_plugin,
        delete_plugin,
        get_plugins,
        get_plugin_attributes,
    )
    from ._login import login, test_connection
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri

    def __init__(
        self,
        label: Optional[str] = None,
        wait: bool = False,
        debug: bool = False,
        **kw
    ):
        """
        Build the connector from keyword attributes or from a `uri` keyword.

        Parameters
        ----------
        label: Optional[str], default None
            The connector label. When omitted and `uri` is given, the label
            produced by `from_uri` is used.

        wait: bool, default False
            Accepted but not used by this constructor.

        debug: bool, default False
            Accepted but not used by this constructor.

        **kw
            Connector attributes, forwarded to the parent constructor.
        """
        if 'uri' in kw:
            uri_kwargs = self.from_uri(kw['uri'], as_dict=True)
            uri_label = uri_kwargs.pop('label', None)
            label = label or uri_label
            kw.update(uri_kwargs)

        super().__init__('api', label=label, **kw)

        # Fall back to plain HTTP on port 8000 when not configured.
        for attr, default in (('protocol', 'http'), ('port', 8000)):
            if attr not in self.__dict__:
                setattr(self, attr, default)

        if 'uri' in self.__dict__:
            from meerschaum.connectors.sql import SQLConnector
            parsed = SQLConnector.parse_uri(self.__dict__['uri'])
            if 'host' not in parsed:
                raise Exception(f"Invalid URI for '{self}'.")
            self.__dict__.update(parsed)
        else:
            self.verify_attributes(required_attributes)

        self.url = ''.join((self.protocol, '://', self.host, ':', str(self.port)))
        self._token = None
        self._expires = None
        self._session = None

    @property
    def URI(self) -> str:
        """
        Return the fully qualified URI.

        Credentials are embedded only when both a username and a password are set.
        """
        attrs = self.__dict__
        username = attrs.get('username', None)
        password = attrs.get('password', None)
        creds = ''
        if username and password:
            creds = username + ':' + password + '@'
        return ''.join((self.protocol, '://', creds, self.host, ':', str(self.port)))

    @property
    def session(self):
        """
        Return the cached `requests.Session`, creating it on first access.
        """
        if self._session is not None:
            return self._session
        requests = attempt_import('requests')
        if requests:
            self._session = requests.Session()
        if self._session is None:
            error("Failed to import requests. Is requests installed?")
        return self._session

    @property
    def token(self):
        """
        Return the login token, logging in again when it is missing or
        expires within the next minute.
        """
        if self._expires is None:
            expired = True
        else:
            cutoff = datetime.datetime.utcnow() + datetime.timedelta(minutes=1)
            expired = self._expires < cutoff

        if expired or self._token is None:
            success, msg = self.login()
            if not success:
                warn(msg, stack=False)
        return self._token
Ancestors
- meerschaum.connectors.Connector.Connector
Class variables
var IS_INSTANCE : bool
var IS_THREAD_SAFE : bool
Static methods
def from_uri(uri: str, label: Optional[str] = None, as_dict: bool = False) ‑> Union[APIConnector, Dict[str, Union[str, int]]]
-
Create a new APIConnector from a URI string.
Parameters
uri
:str
- The URI connection string.
label
:Optional[str]
, defaultNone
- If provided, use this as the connector label. Otherwise build the label from the URI's username (if any) and host, lowercased (e.g. `user@host`).
as_dict
:bool
, defaultFalse
- If `True`, return a dictionary of the keyword arguments necessary to create a new `APIConnector`, otherwise create a new object.
Returns
A new APIConnector object or a dictionary of attributes (if `as_dict` is `True`).
Expand source code
@classmethod
def from_uri(
    cls,
    uri: str,
    label: Optional[str] = None,
    as_dict: bool = False,
) -> Union[
    'meerschaum.connectors.APIConnector',
    Dict[str, Union[str, int]],
]:
    """
    Create a new APIConnector from a URI string.

    Parameters
    ----------
    uri: str
        The URI connection string.

    label: Optional[str], default None
        If provided, use this as the connector label.
        Otherwise the label is the lowercased host, prefixed with
        `username@` when the URI contains a username.

    as_dict: bool, default False
        If `True`, return a dictionary of the keyword arguments necessary
        to create a new `APIConnector`, otherwise create a new object.

    Returns
    -------
    A new APIConnector object or a dictionary of attributes (if `as_dict` is `True`).
    """
    from meerschaum.connectors.sql import SQLConnector

    attrs = SQLConnector.parse_uri(uri)
    if 'host' not in attrs:
        error("No host was found in the provided URI.")

    # The parsed 'flavor' plays the role of the protocol for this connector.
    attrs['protocol'] = attrs.pop('flavor')

    if not label:
        user_prefix = (attrs['username'] + '@') if 'username' in attrs else ''
        label = (user_prefix + attrs['host']).lower()
    attrs['label'] = label

    if as_dict:
        return attrs
    return cls(**attrs)
Instance variables
var URI : str
-
Return the fully qualified URI.
Expand source code
@property
def URI(self) -> str:
    """
    Return the fully qualified URI.

    Credentials are embedded only when both a username and a password are set.
    """
    attrs = self.__dict__
    username = attrs.get('username', None)
    password = attrs.get('password', None)
    creds = ''
    if username and password:
        creds = username + ':' + password + '@'
    return ''.join((self.protocol, '://', creds, self.host, ':', str(self.port)))
var session
-
Expand source code
@property
def session(self):
    """
    Return the cached `requests.Session`, creating it on first access.
    """
    if self._session is not None:
        return self._session
    requests = attempt_import('requests')
    if requests:
        self._session = requests.Session()
    if self._session is None:
        error("Failed to import requests. Is requests installed?")
    return self._session
var token
-
Expand source code
@property
def token(self):
    """
    Return the login token, logging in again when it is missing or
    expires within the next minute.
    """
    if self._expires is None:
        expired = True
    else:
        cutoff = datetime.datetime.utcnow() + datetime.timedelta(minutes=1)
        expired = self._expires < cutoff

    if expired or self._token is None:
        success, msg = self.login()
        if not success:
            warn(msg, stack=False)
    return self._token
Methods
def clear_pipe(self, pipe: Pipe, debug: bool = False, **kw) ‑> SuccessTuple
-
Delete rows in a pipe's table.
Parameters
pipe
:Pipe
- The pipe with rows to be deleted.
Returns
A success tuple.
Expand source code
def clear_pipe(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """
    Delete rows in a pipe's table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe with rows to be deleted.

    Returns
    -------
    A success tuple.
    """
    # These keys are passed explicitly below, so discard caller-supplied duplicates.
    for reserved in ('metric_keys', 'connector_keys', 'location_keys', 'action', 'force'):
        kw.pop(reserved, None)

    return self.do_action(
        ['clear', 'pipes'],
        connector_keys=pipe.connector_keys,
        metric_keys=pipe.metric_key,
        location_keys=pipe.location_key,
        force=True,
        debug=debug,
        **kw
    )
def create_metadata(self, debug: bool = False) ‑> bool
-
Create metadata tables.
Returns
A bool indicating success.
Expand source code
def create_metadata(
    self,
    debug: bool = False
) -> bool:
    """Create metadata tables.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating success.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.config.static import STATIC_CONFIG
    import json

    r_url = STATIC_CONFIG['api']['endpoints']['metadata']
    response = self.post(r_url, debug=debug)
    if debug:
        # The `f` prefix was missing, so the placeholder was printed literally.
        dprint(f"Create metadata response: {response.text}")

    try:
        metadata_response = json.loads(response.text)
    except Exception:
        return False

    # Previously this function returned False unconditionally.
    # NOTE(review): the exact payload shape is not visible from here; the
    # branches below cover an error dict, a (success, message) pair and a
    # bare truthy value -- confirm against the API endpoint.
    if isinstance(metadata_response, dict) and 'detail' in metadata_response:
        return False
    if isinstance(metadata_response, (list, tuple)) and metadata_response:
        return bool(metadata_response[0])
    return bool(metadata_response)
def delete(self, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> requests.Response
-
Wrapper for
requests.delete
.Expand source code
def delete(
    self,
    r_url: str,
    headers: Optional[Dict[str, Any]] = None,
    use_token: bool = True,
    debug: bool = False,
    **kw: Any,
) -> requests.Response:
    """Wrapper for `requests.delete`.

    Parameters
    ----------
    r_url: str
        The path appended to the connector's base `url`.

    headers: Optional[Dict[str, Any]], default None
        Extra request headers. The dictionary is copied, never modified.

    use_token: bool, default True
        If `True`, add a `Bearer` authorization header built from `self.token`.

    debug: bool, default False
        Verbosity toggle.

    **kw: Any
        Passed through to the session's `delete` call.

    Returns
    -------
    The response returned by the session.
    """
    if debug:
        from meerschaum.utils.debug import dprint

    # Work on a copy so the caller's dictionary is left untouched.
    headers = dict(headers) if headers is not None else {}

    if use_token:
        if debug:
            dprint("Checking token...")
        headers['Authorization'] = f'Bearer {self.token}'

    if debug:
        dprint(f"Sending DELETE request to {self.url + r_url}.")

    return self.session.delete(
        self.url + r_url,
        headers=headers,
        **kw
    )
def delete_pipe(self, pipe: Optional[Pipe] = None, debug: bool = None) ‑> SuccessTuple
-
Delete a Pipe and drop its table.
Expand source code
def delete_pipe(
    self,
    pipe: Optional[meerschaum.Pipe] = None,
    debug: bool = None,
) -> SuccessTuple:
    """Delete a Pipe and drop its table.

    Parameters
    ----------
    pipe: Optional[meerschaum.Pipe], default None
        The pipe to delete; `error` is called when it is `None`.

    debug: bool, default None
        Verbosity toggle.

    Returns
    -------
    A tuple of the response's truthiness and a message taken from the
    response body.
    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint

    if pipe is None:
        error(f"Pipe cannot be None.")

    response = self.delete(pipe_r_url(pipe) + '/delete', debug=debug)
    if debug:
        dprint(response.text)

    payload = response.json()
    succeeded = bool(response)
    if isinstance(payload, list):
        return succeeded, payload[1]
    if 'detail' in payload:
        return succeeded, payload['detail']
    return succeeded, response.text
def delete_plugin(self, plugin: meerschaum.core.Plugin, debug: bool = False) ‑> SuccessTuple
-
Delete a plugin from an API repository.
Expand source code
def delete_plugin(
    self,
    plugin: meerschaum.core.Plugin,
    debug: bool = False
) -> SuccessTuple:
    """Delete a plugin from an API repository.

    Returns
    -------
    The `(success, message)` pair decoded from the response, or
    `(False, <text>)` when the request or the decoding fails.
    """
    import json

    r_url = plugin_r_url(plugin)
    try:
        response = self.delete(r_url, debug=debug)
    except Exception:
        return False, f"Failed to delete plugin '{plugin}'."

    try:
        success, msg = json.loads(response.text)
    except Exception:
        # Not a two-item JSON payload: hand back the raw body as the message.
        return False, response.text
    else:
        return success, msg
def delete_user(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> SuccessTuple
-
Delete a user.
Expand source code
def delete_user(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """Delete a user.

    Returns
    -------
    The response body as a tuple, `(False, detail)` when the body is a
    dictionary with a `detail` key, or a generic failure tuple when the body
    cannot be decoded.
    """
    from meerschaum.config.static import _static_config
    import json

    users_endpoint = _static_config()['api']['endpoints']['users']
    response = self.delete(f"{users_endpoint}/{user.username}", debug=debug)

    try:
        payload = json.loads(response.text)
        if isinstance(payload, dict) and 'detail' in payload:
            return False, payload['detail']
        return tuple(payload)
    except Exception:
        return False, f"Failed to delete user '{user.username}'."
def do_action(self, action: Optional[List[str]] = None, sysargs: Optional[List[str]] = None, debug: bool = False, **kw) ‑> Tuple[bool, str]
-
Execute a Meerschaum action remotely.
If sysargs is provided, parse those instead. Otherwise infer everything from keyword arguments.
NOTE: The first index of
action
should NOT be removed! Example: action = ['show', 'config']Returns: tuple (succeeded : bool, message : str)
Parameters
action
:Optional[List[str]] :
- (Default value = None)
sysargs
:Optional[List[str]] :
- (Default value = None)
debug
:bool :
- (Default value = False)
**kw :
Returns
Expand source code
def do_action(
    self,
    action: Optional[List[str]] = None,
    sysargs: Optional[List[str]] = None,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """Execute a Meerschaum action remotely.

    If `sysargs` is provided and `action` starts with an empty string, parse
    `sysargs` instead. Otherwise infer everything from keyword arguments.

    NOTE: Pass the full `action` list, including the root action,
    e.g. `action = ['show', 'config']`. The list is not modified.

    Parameters
    ----------
    action: Optional[List[str]], default None
        The action to execute; the first item selects the API endpoint.

    sysargs: Optional[List[str]], default None
        Command-line arguments to parse in place of the keyword arguments.

    debug: bool, default False
        Verbosity toggle.

    **kw
        Additional arguments, sent in the request body.

    Returns
    -------
    A tuple of (succeeded: bool, message: str).
    """
    import sys, json
    from meerschaum.utils.debug import dprint
    from meerschaum.config.static import _static_config
    from meerschaum.utils.misc import json_serialize_datetime

    if action is None:
        action = []

    if sysargs is not None and action and action[0] == '':
        from meerschaum._internal.arguments import parse_arguments
        if debug:
            dprint(f"Parsing sysargs:\n{sysargs}")
        json_dict = parse_arguments(sysargs)
    else:
        json_dict = kw
        json_dict['action'] = action
        json_dict['debug'] = debug

    # Split off the root action on a copy: deleting in place used to shorten
    # the list object owned by the caller.
    action_items = list(json_dict['action'] or [])
    if not action_items:
        return False, "No action was provided."
    root_action = action_items[0]
    json_dict['action'] = action_items[1:]

    r_url = f"{_static_config()['api']['endpoints']['actions']}/{root_action}"

    if debug:
        from meerschaum.utils.formatting import pprint
        dprint(f"Sending data to '{self.url + r_url}':")
        pprint(json_dict, stream=sys.stderr)

    response = self.post(
        r_url,
        data=json.dumps(json_dict, default=json_serialize_datetime),
        debug=debug,
    )
    try:
        response_list = json.loads(response.text)
        if isinstance(response_list, dict) and 'detail' in response_list:
            return False, response_list['detail']
    except Exception as e:
        print(f"Invalid response: {response}")
        print(e)
        return False, response.text

    if debug:
        dprint(response)

    try:
        return response_list[0], response_list[1]
    except Exception:
        return False, f"Failed to parse result from action '{root_action}'"
def drop_pipe(self, pipe: Pipe, debug: bool = False) ‑> SuccessTuple
-
Drop a pipe's table but maintain its registration.
Parameters
pipe
:meerschaum.Pipe:
- The pipe to be dropped.
Returns
A success tuple (bool, str).
Expand source code
def drop_pipe(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False
) -> SuccessTuple:
    """Drop a pipe's table but maintain its registration.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to be dropped.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A success tuple (bool, str).
    """
    # The remote `drop pipes` action is always run with `force=True`.
    action_kwargs = {
        'connector_keys': pipe.connector_keys,
        'metric_keys': pipe.metric_key,
        'location_keys': pipe.location_key,
        'force': True,
        'debug': debug,
    }
    return self.do_action(['drop', 'pipes'], **action_kwargs)
def edit_pipe(self, pipe: Pipe, patch: bool = False, debug: bool = False) ‑> SuccessTuple
-
Submit a PATCH to the API to edit an existing Pipe object. Returns a tuple of (success_bool, response_dict).
Expand source code
def edit_pipe(
    self,
    pipe: meerschaum.Pipe,
    patch: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """Submit a PATCH to the API to edit an existing Pipe object.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose `parameters` are sent as the request body.

    patch: bool, default False
        Forwarded to the API as the `patch` query parameter.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A tuple of the response's truthiness and a message taken from the
    response body.
    """
    from meerschaum.utils.debug import dprint

    ### NOTE: if `parameters` is supplied in the Pipe constructor,
    ###       then `pipe.parameters` will exist and not be fetched from the database.
    edit_url = pipe_r_url(pipe) + '/edit'
    response = self.patch(
        edit_url,
        params={'patch': patch},
        json=pipe.parameters,
        debug=debug,
    )
    if debug:
        dprint(response.text)

    payload = response.json()
    succeeded = bool(response)
    if isinstance(payload, list):
        return succeeded, payload[1]
    if 'detail' in payload:
        return succeeded, payload['detail']
    return succeeded, response.text
def edit_user(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> SuccessTuple
-
Edit an existing user.
Expand source code
def edit_user(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """Edit an existing user.

    Returns
    -------
    The response body as a tuple, `(False, detail)` when the body is a
    dictionary with a `detail` key, or `(False, <message>)` when the body
    cannot be decoded.
    """
    import json
    from meerschaum.config.static import STATIC_CONFIG

    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/edit"
    form = {
        'username': user.username,
        'password': user.password,
        'type': user.type,
        'email': user.email,
        'attributes': json.dumps(user.attributes),
    }
    response = self.post(r_url, data=form, debug=debug)

    try:
        payload = json.loads(response.text)
        if isinstance(payload, dict) and 'detail' in payload:
            return False, payload['detail']
        result = tuple(payload)
    except Exception:
        if response:
            return False, response.text
        return False, f"Failed to edit user '{user}'."
    return result
def fetch(self, pipe: Pipe, begin: Optional[datetime.datetime, str] = '', end: Optional[datetime.datetime] = None, params: Optional[Dict, Any] = None, debug: bool = False, **kw: Any) ‑> pandas.DataFrame
-
Get the Pipe data from the remote Pipe.
Expand source code
def fetch(
    self,
    pipe: meerschaum.Pipe,
    begin: 'Union[datetime.datetime, str, None]' = '',
    end: Optional[datetime.datetime] = None,
    params: 'Optional[Dict[str, Any]]' = None,
    debug: bool = False,
    **kw: Any
) -> pandas.DataFrame:
    """Get the Pipe data from the remote Pipe.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The local pipe; `pipe.parameters['fetch']` must hold at least
        `connector_keys` and `metric_key` for the remote pipe.

    begin: Union[datetime.datetime, str, None], default ''
        Lower bound passed to the remote pipe's `get_data`. An empty string
        is replaced with `pipe.get_sync_time()`; `None` with `pipe.sync_time`.

    end: Optional[datetime.datetime], default None
        Upper bound passed to the remote pipe's `get_data`.

    params: Optional[Dict[str, Any]], default None
        Filter parameters; a copy is patched with the fetch instructions'
        own `params`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The result of the remote pipe's `get_data`, or `None` when the fetch
    parameters are incomplete.
    """
    # Imported locally so the block does not rely on a module-level import.
    import copy
    from meerschaum.utils.warnings import warn
    from meerschaum.config._patch import apply_patch_to_config
    from meerschaum import Pipe

    if 'fetch' not in pipe.parameters:
        warn(f"Missing 'fetch' parameters for Pipe '{pipe}'.", stack=False)
        return None

    instructions = pipe.parameters['fetch']
    for required_key in ('connector_keys', 'metric_key'):
        if required_key not in instructions:
            warn(f"Missing {required_key} in fetch parameters for Pipe '{pipe}'", stack=False)
            return None

    remote_connector_keys = instructions['connector_keys']
    remote_metric_key = instructions['metric_key']
    remote_location_key = instructions.get('location_key', None)

    if begin is None:
        begin = pipe.sync_time

    _params = copy.deepcopy(params) if params is not None else {}
    _params = apply_patch_to_config(_params, instructions.get('params', {}))

    remote_pipe = Pipe(
        remote_connector_keys,
        remote_metric_key,
        remote_location_key,
        mrsm_instance=self
    )

    # The default empty string means "resume from the last sync time".
    if isinstance(begin, str) and begin == '':
        begin = pipe.get_sync_time(debug=debug)

    return remote_pipe.get_data(
        begin=begin, end=end,
        params=_params,
        debug=debug
    )
def fetch_pipes_keys(self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) ‑> List[Tuple[str, str, Optional[str]]]
-
Fetch registered Pipes' keys from the API.
Parameters
connector_keys
:Optional[List[str]]
, defaultNone
- The connector keys for the query.
metric_keys
:Optional[List[str]]
, defaultNone
- The metric keys for the query.
location_keys
:Optional[List[str]]
, defaultNone
- The location keys for the query.
tags
:Optional[List[str]]
, defaultNone
- A list of tags for the query.
params
:Optional[Dict[str, Any]]
, defaultNone
- A parameters dictionary for filtering against the `pipes` table (e.g. `{'connector_keys': 'plugin:foo'}`). Not recommended to be used.
debug
:bool
, defaultFalse
- Verbosity toggle.
Returns
A list of tuples containing pipes' keys.
Expand source code
def fetch_pipes_keys(
    self,
    connector_keys: Optional[List[str]] = None,
    metric_keys: Optional[List[str]] = None,
    location_keys: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False
) -> Union[List[Tuple[str, str, Union[str, None]]]]:
    """
    Fetch registered Pipes' keys from the API.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        The connector keys for the query.

    metric_keys: Optional[List[str]], default None
        The metric keys for the query.

    location_keys: Optional[List[str]], default None
        The location keys for the query.

    tags: Optional[List[str]], default None
        A list of tags for the query.

    params: Optional[Dict[str, Any]], default None
        A parameters dictionary for filtering against the `pipes` table
        (e.g. `{'connector_keys': 'plugin:foo'}`).
        Not recommended to be used.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of tuples containing pipes' keys.
    """
    from meerschaum.utils.warnings import error
    from meerschaum.config.static import STATIC_CONFIG
    import json

    # Missing filters are sent as empty lists.
    connector_keys = [] if connector_keys is None else connector_keys
    metric_keys = [] if metric_keys is None else metric_keys
    location_keys = [] if location_keys is None else location_keys
    tags = [] if tags is None else tags

    r_url = STATIC_CONFIG['api']['endpoints']['pipes'] + '/keys'
    try:
        query = {
            'connector_keys': json.dumps(connector_keys),
            'metric_keys': json.dumps(metric_keys),
            'location_keys': json.dumps(location_keys),
            'tags': json.dumps(tags),
            'params': json.dumps(params),
        }
        response = self.get(r_url, params=query, debug=debug)
        j = response.json()
    except Exception as e:
        error(str(e))

    if 'detail' in j:
        error(j['detail'], stack=False)

    return [tuple(keys) for keys in j]
def get(self, r_url: str, headers: Optional[Dict[str, str]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> requests.Response
-
Wrapper for
requests.get
.Expand source code
def get(
    self,
    r_url: str,
    headers: Optional[Dict[str, str]] = None,
    use_token: bool = True,
    debug: bool = False,
    **kw: Any
) -> requests.Response:
    """Wrapper for `requests.get`.

    Parameters
    ----------
    r_url: str
        The path appended to the connector's base `url`.

    headers: Optional[Dict[str, str]], default None
        Extra request headers. The dictionary is copied, never modified.

    use_token: bool, default True
        If `True`, add a `Bearer` authorization header built from `self.token`.

    debug: bool, default False
        Verbosity toggle.

    **kw: Any
        Passed through to the session's `get` call.

    Returns
    -------
    The response returned by the session.
    """
    if debug:
        from meerschaum.utils.debug import dprint

    # Work on a copy so the caller's dictionary is left untouched.
    headers = dict(headers) if headers is not None else {}

    if use_token:
        if debug:
            dprint("Checking login token.")
        headers['Authorization'] = f'Bearer {self.token}'

    if debug:
        dprint(f"Sending GET request to {self.url + r_url}.")

    return self.session.get(
        self.url + r_url,
        headers=headers,
        **kw
    )
def get_actions(self) ‑> list
-
Get available actions from the API server
Expand source code
def get_actions(
    self,
) -> list:
    """Get available actions from the API server.

    Returns
    -------
    Whatever `self.get` returns for the actions endpoint; the result is
    passed through without decoding.
    """
    from meerschaum.config.static import STATIC_CONFIG

    actions_endpoint = STATIC_CONFIG['api']['endpoints']['actions']
    return self.get(actions_endpoint)
def get_backtrack_data(self, pipe: Pipe, begin: datetime.datetime, backtrack_minutes: int = 0, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw: Any) ‑> pandas.DataFrame
-
Get a Pipe's backtrack data from the API.
Expand source code
def get_backtrack_data(
    self,
    pipe: meerschaum.Pipe,
    begin: datetime.datetime,
    backtrack_minutes: int = 0,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kw: Any,
) -> pandas.DataFrame:
    """Get a Pipe's backtrack data from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose backtrack data is requested.

    begin: datetime.datetime
        Sent to the API as the `begin` query parameter.

    backtrack_minutes: int, default 0
        Sent to the API as the `backtrack_minutes` query parameter.

    params: Optional[Dict[str, Any]], default None
        Filter parameters, sent JSON-encoded.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A DataFrame with datetimes parsed, or `None` if the request or the
    JSON parsing fails.
    """
    import io
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.misc import parse_df_datetimes

    r_url = pipe_r_url(pipe)
    try:
        response = self.get(
            r_url + "/backtrack_data",
            params={
                'begin': begin,
                'backtrack_minutes': backtrack_minutes,
                'params': json.dumps(params),
            },
            debug=debug
        )
    except Exception as e:
        warn(f"Failed to parse backtrack data JSON for {pipe}. Exception:\n" + str(e))
        return None

    if debug:
        dprint(response.text)

    pd = import_pandas()
    try:
        # Wrap the body in a buffer: passing a literal JSON string to
        # `read_json` is deprecated in recent pandas releases.
        df = pd.read_json(io.StringIO(response.text))
    except Exception as e:
        warn(str(e))
        return None

    # Reuse the frame parsed above instead of decoding the body a second time.
    return parse_df_datetimes(df, debug=debug)
def get_chaining_status(self, **kw) ‑> Optional[bool]
-
Parameters
**kw :
Returns
type
Expand source code
def get_chaining_status(self, **kw) -> Optional[bool]:
    """
    Query the API's chaining endpoint.

    Parameters
    ----------
    **kw
        Passed through to `self.get`.

    Returns
    -------
    The decoded JSON body of the response, or `None` if the request raises
    or the response is falsy.
    """
    from meerschaum.config.static import _static_config

    try:
        chaining_endpoint = _static_config()['api']['endpoints']['chaining']
        response = self.get(chaining_endpoint, use_token=True, **kw)
        reachable = bool(response)
    except Exception:
        return None

    if not reachable:
        return None
    return response.json()
def get_mrsm_version(self, **kw) ‑> Optional[str]
-
Parameters
**kw :
Returns
type
Expand source code
def get_mrsm_version(self, **kw) -> Optional[str]:
    """
    Query the API's version endpoint (`/mrsm` suffix).

    Parameters
    ----------
    **kw
        Passed through to `self.get`.

    Returns
    -------
    The decoded JSON body, or `None` if the request or decoding raises or
    the body is a dictionary containing a `detail` key.
    """
    from meerschaum.config.static import _static_config

    try:
        version_url = _static_config()['api']['endpoints']['version'] + '/mrsm'
        payload = self.get(version_url, use_token=True, **kw).json()
    except Exception:
        return None

    is_error = isinstance(payload, dict) and 'detail' in payload
    return None if is_error else payload
def get_pipe_attributes(self, pipe: Pipe, debug: bool = False) ‑> Dict[str, Any]
-
Get a Pipe's attributes from the API
Parameters
pipe
:Pipe
- The pipe whose attributes we are fetching.
Returns
A dictionary of a pipe's attributes. If the pipe does not exist, return an empty dictionary.
Expand source code
def get_pipe_attributes(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False,
) -> Dict[str, Any]:
    """Get a Pipe's attributes from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose attributes we are fetching.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary of a pipe's attributes.
    If the response body cannot be decoded as JSON, return an empty dictionary.
    """
    import json

    attributes_url = pipe_r_url(pipe) + '/attributes'
    response = self.get(attributes_url, debug=debug)
    try:
        attributes = json.loads(response.text)
    except Exception:
        attributes = {}
    return attributes
def get_pipe_columns_types(self, pipe: Pipe, debug: bool = False) ‑> Union[Dict[str, str], None]
-
Fetch the columns and types of the pipe's table.
Parameters
pipe
:Pipe
- The pipe whose columns to be queried.
Returns
A dictionary mapping column names to their database types.
Examples
>>> { ... 'dt': 'TIMESTAMP WITHOUT TIMEZONE', ... 'id': 'BIGINT', ... 'val': 'DOUBLE PRECISION', ... } >>>
Expand source code
def get_pipe_columns_types(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False,
) -> Union[Dict[str, str], None]:
    """
    Fetch the columns and types of the pipe's table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose columns to be queried.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping column names to their database types, or `None`
    (after a warning) when the API answers with an error detail or a
    non-dictionary body.

    Examples
    --------
    >>> {
    ...     'dt': 'TIMESTAMP WITHOUT TIMEZONE',
    ...     'id': 'BIGINT',
    ...     'val': 'DOUBLE PRECISION',
    ... }
    >>>
    """
    # Import up front: `warn` was previously imported inside only one branch,
    # so the non-dictionary branch failed with an unbound local name.
    from meerschaum.utils.warnings import warn

    r_url = pipe_r_url(pipe) + '/columns/types'
    response = self.get(
        r_url,
        debug=debug
    )
    j = response.json()

    # A dictionary holding nothing but 'detail' is treated as an API error.
    if isinstance(j, dict) and 'detail' in j and len(j.keys()) == 1:
        warn(j['detail'])
        return None

    if not isinstance(j, dict):
        warn(response.text)
        return None

    return j
def get_pipe_data(self, pipe: Pipe, begin: Union[str, datetime.datetime, int, None] = None, end: Union[str, datetime.datetime, int, None] = None, params: Optional[Dict[str, Any]] = None, as_chunks: bool = False, debug: bool = False, **kw: Any) ‑> Union[pandas.DataFrame, None]
-
Fetch data from the API.
Expand source code
def get_pipe_data(
    self,
    pipe: meerschaum.Pipe,
    begin: Union[str, datetime.datetime, int, None] = None,
    end: Union[str, datetime.datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    as_chunks: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[pandas.DataFrame, None]:
    """Fetch data from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose data is requested.

    begin: Union[str, datetime.datetime, int, None], default None
        Sent to the API as the `begin` query parameter.

    end: Union[str, datetime.datetime, int, None], default None
        Sent to the API as the `end` query parameter.

    params: Optional[Dict[str, Any]], default None
        Filter parameters, sent JSON-encoded.

    as_chunks: bool, default False
        Accepted but not used by this implementation.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A DataFrame with datetime columns parsed, or `None` when the request
    fails, the API reports an error, or the body cannot be parsed.
    """
    import io
    import json
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.misc import parse_df_datetimes

    r_url = pipe_r_url(pipe)
    try:
        response = self.get(
            r_url + "/data",
            params={'begin': begin, 'end': end, 'params': json.dumps(params)},
            debug=debug
        )
        if not response.ok:
            return None
        j = response.json()
    except Exception as e:
        warn(str(e))
        return None

    if isinstance(j, dict) and 'detail' in j:
        # Previously a `(False, detail)` tuple leaked out here, contradicting
        # the declared DataFrame-or-None return type.
        warn(j['detail'])
        return None

    pd = import_pandas()
    try:
        # Wrap the body in a buffer: passing a literal JSON string to
        # `read_json` is deprecated in recent pandas releases.
        df = pd.read_json(io.StringIO(response.text))
    except Exception as e:
        warn(str(e))
        return None

    # Only columns whose declared dtype mentions 'datetime' are parsed.
    non_datetime_cols = [
        col for col, dtype in pipe.dtypes.items()
        if 'datetime' not in str(dtype)
    ]
    return parse_df_datetimes(df, ignore_cols=non_datetime_cols, debug=debug)
def get_pipe_id(self, pipe: meerschaum.Pipe, debug: bool = False) ‑> int
-
Get a Pipe's ID from the API.
Expand source code
def get_pipe_id(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False,
) -> int:
    """Get a Pipe's ID from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose ID is requested.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The pipe's ID as an integer, or `None` if the response body is not an
    integer.
    """
    from meerschaum.utils.debug import dprint

    r_url = pipe_r_url(pipe)
    response = self.get(
        r_url + '/id',
        debug=debug
    )
    if debug:
        dprint(f"Got pipe ID: {response.text}")
    try:
        return int(response.text)
    except Exception:
        return None
def get_pipe_rowcount(self, pipe: "'Pipe'", begin: "Optional['datetime.datetime']" = None, end: "Optional['datetime.datetime']" = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) ‑> int
-
Get a pipe's row count from the API.
Parameters
pipe
:'meerschaum.Pipe':
- The pipe whose row count we are counting.
begin
:Optional[datetime.datetime]
, defaultNone
- If provided, bound the count by this datetime.
end
:Optional[datetime.datetime]
- If provided, bound the count by this datetime.
params
:Optional[Dict[str, Any]]
, defaultNone
- If provided, bound the count by these parameters.
remote
:bool
, defaultFalse
Returns
The number of rows in the pipe's table, bounded by the given parameters. If the table does not exist, return 0.
Expand source code
def get_pipe_rowcount(
    self,
    pipe: 'meerschaum.Pipe',
    begin: Optional['datetime.datetime'] = None,
    end: Optional['datetime.datetime'] = None,
    params: Optional[Dict[str, Any]] = None,
    remote: bool = False,
    debug: bool = False,
) -> int:
    """Get a pipe's row count from the API.

    Parameters
    ----------
    pipe: 'meerschaum.Pipe'
        The pipe whose rows are counted.

    begin: Optional[datetime.datetime], default None
        If provided, bound the count by this datetime.

    end: Optional[datetime.datetime], default None
        If provided, bound the count by this datetime.

    params: Optional[Dict[str, Any]], default None
        If provided, bound the count by these parameters.

    remote: bool, default False
        Forwarded to the API as the `remote` query parameter.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The number of rows in the pipe's table, bounded by the given parameters.
    If the response cannot be read as an integer, return 0.
    """
    import json

    query = {
        'begin': begin,
        'end': end,
        'remote': remote,
    }
    response = self.get(
        pipe_r_url(pipe) + "/rowcount",
        json=params,
        params=query,
        debug=debug
    )
    try:
        rowcount = int(json.loads(response.text))
    except Exception:
        rowcount = 0
    return rowcount
def get_plugin_attributes(self, plugin: meerschaum.core.Plugin, debug: bool = False) ‑> Mapping[str, Any]
-
Return a plugin's attributes.
Expand source code
def get_plugin_attributes(
    self,
    plugin: meerschaum.core.Plugin,
    debug: bool = False
) -> Mapping[str, Any]:
    """
    Return a plugin's attributes.

    Parameters
    ----------
    plugin: meerschaum.core.Plugin
        The plugin whose attributes are requested.

    debug: bool, default False
        Forwarded to `self.get`.

    Returns
    -------
    The attributes dictionary from the API.
    An empty dictionary is returned (after a warning) when the request failed
    and the API supplied a `detail` message.
    Any response that is not a JSON object is reported through `error`.
    """
    import json
    from meerschaum.utils.warnings import warn, error

    r_url = plugin_r_url(plugin) + '/attributes'
    response = self.get(r_url, use_token=True, debug=debug)

    try:
        attributes = response.json()
    except Exception:
        # A non-JSON body used to escape as a raw decode error;
        # route it through the same "not a dict" error path below instead.
        attributes = None

    # The body may be a JSON string that itself contains a JSON object.
    if isinstance(attributes, str) and attributes.startswith('{'):
        try:
            attributes = json.loads(attributes)
        except Exception:
            # Leave the string untouched; it is reported as an error below.
            pass

    if not isinstance(attributes, dict):
        # NOTE(review): `error` is presumably expected to raise here — confirm.
        error(response.text)
    elif not response and 'detail' in attributes:
        warn(attributes['detail'])
        return {}
    return attributes
def get_plugins(self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False) ‑> Sequence[str]
-
Return a list of registered plugin names.
Parameters
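user_id
:Optional[int]
, default None
- If specified, return all plugins from a certain user.
search_term
:Optional[str]
, default None
- Sent to the API as the `search_term` query parameter.
debug
:bool
, default False
- Passed through to the underlying GET request.
Returns
A list of registered plugin names (an empty list if the request fails).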
Expand source code
def get_plugins(
    self,
    user_id: Optional[int] = None,
    search_term: Optional[str] = None,
    debug: bool = False
) -> Sequence[str]:
    """
    Return a list of registered plugin names.

    Parameters
    ----------
    user_id: Optional[int], default None
        If specified, return all plugins from a certain user.

    search_term: Optional[str], default None
        Sent to the API as the `search_term` query parameter.

    debug: bool, default False
        Forwarded to `self.get`.

    Returns
    -------
    The list decoded from the response body, or an empty list when the
    request fails. A body that is not a JSON list is reported through `error`.
    """
    import json
    from meerschaum.utils.warnings import warn, error
    from meerschaum.config.static import _static_config

    plugins_endpoint = _static_config()['api']['endpoints']['plugins']
    query = {'user_id': user_id, 'search_term': search_term}
    response = self.get(
        plugins_endpoint,
        params=query,
        use_token=True,
        debug=debug,
    )
    if response:
        plugin_names = json.loads(response.text)
        if not isinstance(plugin_names, list):
            error(response.text)
        return plugin_names
    return []
def get_sync_time(self, pipe: "'Pipe'", params: Optional[Dict[str, Any]] = None, newest: bool = True, round_down: bool = True, debug: bool = False) ‑> Union[datetime.datetime, int, None]
-
Get a Pipe's most recent datetime value from the API.
Parameters
pipe
:Pipe
- The pipe to select from.
params
:Optional[Dict[str, Any]]
, default None
- Optional params dictionary to build the WHERE clause.
newest
:bool
, default True
- If `True`, get the most recent datetime (honoring `params`). If `False`, get the oldest datetime (ASC instead of DESC).
round_down
:bool
, default True
- If `True`, round the resulting datetime value down to the nearest minute.
Returns
The most recent (or oldest if `newest` is `False`) datetime of a pipe, rounded down to the closest minute.
Expand source code
def get_sync_time(
    self,
    pipe: 'meerschaum.Pipe',
    params: Optional[Dict[str, Any]] = None,
    newest: bool = True,
    round_down: bool = True,
    debug: bool = False,
) -> Union[datetime.datetime, int, None]:
    """
    Fetch a pipe's sync time from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to select from.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the WHERE clause (sent as the JSON body).

    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (ASC instead of DESC).

    round_down: bool, default True
        If `True`, round the resulting datetime value down to the nearest minute.

    debug: bool, default False
        Sent as a query parameter and forwarded to `self.get`.

    Returns
    -------
    The most recent (or oldest if `newest` is `False`) sync time of the pipe:
    an `int` when the API returns an integer-like value, otherwise a datetime
    parsed from the ISO string. `None` is returned when the request fails,
    the API returns null, or the value cannot be parsed.
    """
    from meerschaum.utils.misc import is_int
    from meerschaum.utils.warnings import warn
    import datetime
    import json

    sync_time_url = pipe_r_url(pipe) + '/sync_time'
    query = {'newest': newest, 'debug': debug, 'round_down': round_down}
    response = self.get(sync_time_url, json=params, params=query, debug=debug)
    if not response:
        warn(response.text)
        return None

    payload = response.json()
    if payload is None:
        return None

    try:
        if is_int(payload):
            return int(payload)
        return datetime.datetime.fromisoformat(payload)
    except Exception as e:
        warn(e)
        return None
def get_user_attributes(self, user: "'meerschaum.core.User'", debug: bool = False, **kw) ‑> int
-
Get a user's attributes.
Expand source code
def get_user_attributes(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw
) -> 'Optional[Dict[str, Any]]':
    """
    Get a user's attributes.

    Parameters
    ----------
    user: 'meerschaum.core.User'
        The user whose attributes are requested (looked up by `user.username`).

    debug: bool, default False
        Forwarded to `self.get`.

    **kw
        Additional keyword arguments forwarded to `self.get`.

    Returns
    -------
    The JSON value decoded from the response body (presumably a dictionary of
    attributes — verify against the API), or `None` if the body cannot be decoded.
    """
    from meerschaum.config.static import _static_config
    import json

    r_url = f"{_static_config()['api']['endpoints']['users']}/{user.username}/attributes"
    response = self.get(r_url, debug=debug, **kw)
    try:
        return json.loads(response.text)
    except Exception:
        # Best effort: an undecodable body is treated as "no attributes".
        return None
def get_user_id(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> Optional[int]
-
Get a user's ID.
Expand source code
def get_user_id(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> Optional[int]:
    """
    Look up a user's ID through the API.

    Parameters
    ----------
    user: 'meerschaum.core.User'
        The user to look up (by `user.username`).

    debug: bool, default False
        Forwarded to `self.get`.

    **kw: Any
        Additional keyword arguments forwarded to `self.get`.

    Returns
    -------
    The ID as an `int`, or `None` if the response cannot be decoded
    into an integer.
    """
    from meerschaum.config.static import _static_config
    import json

    users_endpoint = _static_config()['api']['endpoints']['users']
    response = self.get(f"{users_endpoint}/{user.username}/id", debug=debug, **kw)
    try:
        return int(json.loads(response.text))
    except Exception:
        return None
def get_user_password_hash(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> Optional[str]
-
If configured, get a user's password hash.
Expand source code
def get_user_password_hash(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> Optional[str]:
    """
    If configured, get a user's password hash.

    Parameters
    ----------
    user: 'meerschaum.core.User'
        The user to look up (by `user.username`).

    debug: bool, default False
        Forwarded to `self.get`.

    **kw: Any
        Additional keyword arguments forwarded to `self.get`.

    Returns
    -------
    The decoded JSON body of the response, or `None` if the request fails.
    """
    from meerschaum.config.static import _static_config

    users_endpoint = _static_config()['api']['endpoints']['users']
    hash_url = '/'.join((users_endpoint, user.username, 'password_hash'))
    response = self.get(hash_url, debug=debug, **kw)
    return response.json() if response else None
def get_user_type(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> Optional[str]
-
If configured, get a user's type.
Expand source code
def get_user_type(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> Optional[str]:
    """
    If configured, get a user's type.

    Parameters
    ----------
    user: 'meerschaum.core.User'
        The user to look up (by `user.username`).

    debug: bool, default False
        Forwarded to `self.get`.

    **kw: Any
        Additional keyword arguments forwarded to `self.get`.

    Returns
    -------
    The decoded JSON body of the response, or `None` if the request fails.
    """
    from meerschaum.config.static import _static_config

    users_endpoint = _static_config()['api']['endpoints']['users']
    type_url = '/'.join((users_endpoint, user.username, 'type'))
    response = self.get(type_url, debug=debug, **kw)
    return response.json() if response else None
def get_users(self, debug: bool = False, **kw: Any) ‑> List[str]
-
Return a list of registered usernames.
Expand source code
def get_users(
    self,
    debug: bool = False,
    **kw: Any
) -> List[str]:
    """
    Return a list of registered usernames.

    Parameters
    ----------
    debug: bool, default False
        Forwarded to `self.get`.

    **kw: Any
        Accepted for signature compatibility; not forwarded to the request.

    Returns
    -------
    The decoded JSON body of the response, or an empty list if the request
    fails or the body cannot be decoded.
    """
    from meerschaum.config.static import _static_config
    import json

    users_endpoint = f"{_static_config()['api']['endpoints']['users']}"
    response = self.get(users_endpoint, debug=debug, use_token=True)
    if response:
        try:
            return response.json()
        except Exception:
            pass
    return []
def install_plugin(self, name: str, force: bool = False, debug: bool = False) ‑> Tuple[bool, str]
-
Download and attempt to install a plugin from the API.
Expand source code
def install_plugin(
    self,
    name: str,
    force: bool = False,
    debug: bool = False
) -> SuccessTuple:
    """
    Download and attempt to install a plugin from the API.

    Parameters
    ----------
    name: str
        The name of the plugin to download.

    force: bool, default False
        Forwarded to `Plugin.install`.

    debug: bool, default False
        If `True`, print the download location. Also forwarded to
        `self.wget` and `Plugin.install`.

    Returns
    -------
    A `(success, message)` tuple. If the downloaded file is not binary, it is
    read as the API's JSON reply: a JSON list is returned as the tuple,
    and anything else yields a failure message.
    """
    import os, pathlib, json
    from meerschaum.core import Plugin
    from meerschaum.config._paths import PLUGINS_TEMP_RESOURCES_PATH
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import

    binaryornot_check = attempt_import('binaryornot.check', lazy=False)
    r_url = plugin_r_url(name)
    dest = pathlib.Path(os.path.join(PLUGINS_TEMP_RESOURCES_PATH, name + '.tar.gz'))
    if debug:
        dprint(f"Fetching from '{self.url + r_url}' to '{dest}'...")
    archive_path = self.wget(r_url, dest, debug=debug)

    is_binary = binaryornot_check.is_binary(str(archive_path))
    if not is_binary:
        fail_msg = f"Failed to download binary for plugin '{name}'."
        # Default to failure so every JSON shape (not only lists and
        # `detail` dicts) produces a defined result instead of an
        # UnboundLocalError at the return below.
        success, msg = False, fail_msg
        try:
            with open(archive_path, 'r') as f:
                j = json.load(f)
            if isinstance(j, list):
                success, msg = tuple(j)
        except Exception:
            # Unreadable file, invalid JSON, or a list that is not a pair.
            success, msg = False, fail_msg
        return success, msg

    plugin = Plugin(name, archive_path=archive_path, repo_connector=self)
    return plugin.install(force=force, debug=debug)
def login(self, debug: bool = False, warn: bool = True, **kw: Any) ‑> Tuple[bool, str]
-
Log in and set the session token.
Expand source code
def login(
    self,
    debug: bool = False,
    warn: bool = True,
    **kw: Any
) -> SuccessTuple:
    """
    Log in and set the session token.

    Parameters
    ----------
    debug: bool, default False
        Forwarded to `self.post`.

    warn: bool, default True
        If `True`, emit a warning when the login fails.

    **kw: Any
        Accepted for signature compatibility; not used.

    Returns
    -------
    A `(success, message)` tuple. On success, `self._token` and
    `self._expires` are set from the response body.
    """
    from meerschaum.utils.warnings import warn as _warn, info, error
    from meerschaum.core import User
    from meerschaum.config.static import _static_config
    import json, datetime

    try:
        credentials = {
            'username': self.username,
            'password': self.password,
        }
    except AttributeError:
        # The connector has no stored username or password.
        return False, f"Please login with the command `login {self}`."

    login_endpoint = _static_config()['api']['endpoints']['login']
    response = self.post(
        login_endpoint,
        data=credentials,
        use_token=False,
        debug=debug,
    )

    if not response:
        msg = (
            f"Failed to log into '{self}' as user '{credentials['username']}'.\n"
            + f" Please verify login details for connector '{self}'."
        )
        if warn:
            _warn(msg, stack=False)
        return response.__bool__(), msg

    msg = f"Successfully logged into '{self}' as user '{credentials['username']}'."
    payload = json.loads(response.text)
    self._token = payload['access_token']
    self._expires = datetime.datetime.strptime(
        payload['expires'],
        '%Y-%m-%dT%H:%M:%S.%f',
    )
    return response.__bool__(), msg
def patch(self, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> requests.Response
-
Wrapper for `requests.patch`.
Expand source code
def patch(
    self,
    r_url: str,
    headers: Optional[Dict[str, Any]] = None,
    use_token: bool = True,
    debug: bool = False,
    **kw: Any
) -> requests.Response:
    """
    Wrapper for `requests.patch`.

    Parameters
    ----------
    r_url: str
        The relative URL, appended to `self.url`.

    headers: Optional[Dict[str, Any]], default None
        Extra request headers. The dictionary is copied, never modified.

    use_token: bool, default True
        If `True`, add an `Authorization: Bearer` header built from `self.token`.

    debug: bool, default False
        If `True`, print progress messages.

    **kw: Any
        Additional keyword arguments forwarded to `self.session.patch`.

    Returns
    -------
    The response returned by `self.session.patch`.
    """
    if debug:
        from meerschaum.utils.debug import dprint

    # Work on a copy so the bearer token never leaks into the caller's dict.
    headers = dict(headers) if headers is not None else {}

    if use_token:
        if debug:
            dprint("Checking login token.")
        headers['Authorization'] = f'Bearer {self.token}'

    if debug:
        dprint(f"Sending PATCH request to {self.url + r_url}")

    return self.session.patch(
        self.url + r_url,
        headers=headers,
        **kw
    )
def pipe_exists(self, pipe: "'Pipe'", debug: bool = False) ‑> bool
-
Check the API to see if a Pipe exists.
Parameters
pipe
:'meerschaum.Pipe'
- The pipe which we are querying.
Returns
A bool indicating whether a pipe's underlying table exists.
Expand source code
def pipe_exists(
    self,
    pipe: 'meerschaum.Pipe',
    debug: bool = False
) -> bool:
    """
    Check the API to see if a Pipe exists.

    Parameters
    ----------
    pipe: 'meerschaum.Pipe'
        The pipe which we are querying.

    debug: bool, default False
        If `True`, print the raw response text. Also forwarded to `self.get`.

    Returns
    -------
    The value decoded from the API response (a bool indicating whether the
    pipe's underlying table exists). If the API answers with an error `detail`,
    a warning is emitted and `False` is returned.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn

    response = self.get(pipe_r_url(pipe) + '/exists', debug=debug)
    if debug:
        dprint("Received response: " + str(response.text))

    j = response.json()
    if isinstance(j, dict) and 'detail' in j:
        warn(j['detail'])
        # An error payload is a truthy dict; returning it would make callers
        # believe the pipe exists.
        return False
    return j
def post(self, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> requests.Response
-
Wrapper for `requests.post`.
Expand source code
def post(
    self,
    r_url: str,
    headers: Optional[Dict[str, Any]] = None,
    use_token: bool = True,
    debug: bool = False,
    **kw: Any
) -> requests.Response:
    """
    Wrapper for `requests.post`.

    Parameters
    ----------
    r_url: str
        The relative URL, appended to `self.url`.

    headers: Optional[Dict[str, Any]], default None
        Extra request headers. The dictionary is copied, never modified.

    use_token: bool, default True
        If `True`, add an `Authorization: Bearer` header built from `self.token`.

    debug: bool, default False
        If `True`, print progress messages.

    **kw: Any
        Additional keyword arguments forwarded to `self.session.post`.

    Returns
    -------
    The response returned by `self.session.post`.
    """
    if debug:
        from meerschaum.utils.debug import dprint

    # Work on a copy so the bearer token never leaks into the caller's dict.
    headers = dict(headers) if headers is not None else {}

    if use_token:
        if debug:
            dprint("Checking token...")
        headers['Authorization'] = f'Bearer {self.token}'

    if debug:
        dprint(f"Sending POST request to {self.url + r_url}")

    return self.session.post(
        self.url + r_url,
        headers=headers,
        **kw
    )
def register_pipe(self, pipe: Pipe, debug: bool = False) ‑> SuccessTuple
-
Submit a POST to the API to register a new Pipe object. Returns a tuple of (success_bool, response_dict).
Expand source code
def register_pipe(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False
) -> SuccessTuple:
    """
    Submit a POST to the API to register a new Pipe object.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to register. Its `parameters` are sent as the JSON body.

    debug: bool, default False
        If `True`, print the raw response text. Also forwarded to `self.post`.

    Returns
    -------
    A `(success, message)` tuple. `success` is the truthiness of the response.
    `message` is the second element of a JSON list reply, the `detail` of a
    JSON error object, or otherwise the raw response text.
    """
    from meerschaum.utils.debug import dprint

    ### NOTE: if `parameters` is supplied in the Pipe constructor,
    ###       then `pipe.parameters` will exist and not be fetched from the database.
    r_url = pipe_r_url(pipe)
    response = self.post(
        r_url + '/register',
        json=pipe.parameters,
        debug=debug,
    )
    if debug:
        dprint(response.text)

    # Decode the body once, tolerating non-JSON replies (e.g. an HTML error page).
    try:
        payload = response.json()
    except Exception:
        payload = None

    success = response.__bool__()
    if isinstance(payload, list) and len(payload) > 1:
        return success, payload[1]
    if isinstance(payload, dict) and 'detail' in payload:
        return success, payload['detail']
    return success, response.text
def register_plugin(self, plugin: meerschaum.core.Plugin, make_archive: bool = True, debug: bool = False) ‑> SuccessTuple
-
Register a plugin and upload its archive.
Expand source code
def register_plugin(
    self,
    plugin: meerschaum.core.Plugin,
    make_archive: bool = True,
    debug: bool = False,
) -> SuccessTuple:
    """
    Register a plugin and upload its archive.

    Parameters
    ----------
    plugin: meerschaum.core.Plugin
        The plugin to register.

    make_archive: bool, default True
        If `True`, build the archive with `plugin.make_tar`;
        otherwise upload the file at `plugin.archive_path`.

    debug: bool, default False
        Forwarded to `plugin.make_tar` and `self.post`.

    Returns
    -------
    A `(success, message)` tuple decoded from the API response.
    If the request raises, or the body is not a JSON pair, `success` is `False`.
    """
    import json

    archive_path = plugin.make_tar(debug=debug) if make_archive else plugin.archive_path
    metadata = {
        'version': plugin.version,
        'attributes': json.dumps(plugin.attributes),
    }
    r_url = plugin_r_url(plugin)

    # Open the archive only once everything else is ready, and let the
    # context manager close it on every path out of this block.
    with open(archive_path, 'rb') as archive_file:
        try:
            response = self.post(
                r_url,
                files={'archive': archive_file},
                params=metadata,
                debug=debug,
            )
        except Exception:
            return False, f"Failed to register plugin '{plugin}'."

    try:
        success, msg = json.loads(response.text)
    except Exception:
        return False, response.text
    return success, msg
def register_user(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> SuccessTuple
-
Register a new user.
Expand source code
def register_user(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Register a new user.

    Parameters
    ----------
    user: 'meerschaum.core.User'
        The user to register. `type` and `email` are sent only when set.

    debug: bool, default False
        Forwarded to `self.post`.

    **kw: Any
        Accepted for signature compatibility; not forwarded to the request.

    Returns
    -------
    A `(success, message)` tuple decoded from the API response.
    If the API returns a `detail` error or the body cannot be decoded,
    `success` is `False`.
    """
    import json
    from meerschaum.config.static import _static_config

    r_url = f"{_static_config()['api']['endpoints']['users']}/register"
    form = {
        'username': user.username,
        'password': user.password,
        'attributes': json.dumps(user.attributes),
    }
    for optional_field in ('type', 'email'):
        value = getattr(user, optional_field)
        if value:
            form[optional_field] = value

    response = self.post(r_url, data=form, debug=debug)
    try:
        decoded = json.loads(response.text)
        has_detail = isinstance(decoded, dict) and 'detail' in decoded
        result = None if has_detail else tuple(decoded)
    except Exception:
        return False, (response.text if response else f"Failed to register user '{user}'.")

    if has_detail:
        return False, decoded['detail']
    return result
def sync_pipe(self, pipe: Optional[Pipe] = None, df: Optional[Union[pandas.DataFrame, Dict[Any, Any], str]] = None, chunksize: Optional[int] = -1, debug: bool = False, **kw: Any) ‑> SuccessTuple
-
Append a pandas DataFrame to a Pipe. If Pipe does not exist, it is registered with supplied metadata.
Expand source code
def sync_pipe(
    self,
    pipe: Optional[meerschaum.Pipe] = None,
    df: Optional[Union[pandas.DataFrame, Dict[Any, Any], str]] = None,
    chunksize: Optional[int] = -1,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Append a pandas DataFrame to a Pipe.
    If Pipe does not exist, it is registered with supplied metadata.

    Parameters
    ----------
    pipe: Optional[meerschaum.Pipe], default None
        The pipe to sync into.

    df: Optional[Union[pandas.DataFrame, Dict[Any, Any], str]], default None
        The data to sync: a DataFrame, a dictionary of equal-length columns,
        a list (presumably of row dictionaries — verify against the API),
        or a JSON string that decodes to one of the latter two.

    chunksize: Optional[int], default -1
        Number of rows per request. `-1` uses the configured SQL chunksize
        and `None` sends one row per request.

    debug: bool, default False
        If `True`, print progress messages. Also forwarded to `self.post`.

    **kw: Any
        Sent to the API as query parameters.

    Returns
    -------
    A `(success, message)` tuple. The first failing chunk stops the sync
    and its failure is returned.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.misc import json_serialize_datetime
    from meerschaum.config import get_config
    from meerschaum.utils.packages import attempt_import
    import json, time
    begin = time.time()
    more_itertools = attempt_import('more_itertools')
    if df is None:
        msg = f"DataFrame is `None`. Cannot sync {pipe}."
        return False, msg

    def get_json_str(c):
        ### allow syncing dict or JSON without needing to import pandas (for IOT devices)
        return (
            json.dumps(c, default=json_serialize_datetime)
            if isinstance(c, (dict, list))
            else c.to_json(date_format='iso', date_unit='ns')
        )

    df = json.loads(df) if isinstance(df, str) else df

    ### TODO Make separate chunksize for API?
    _chunksize: Optional[int] = (1 if chunksize is None else (
        get_config('system', 'connectors', 'sql', 'chunksize') if chunksize == -1
        else chunksize
    ))

    ### Check the plain containers first: lists also have an `index` attribute,
    ### so testing `hasattr(df, 'index')` first would mistake them for DataFrames.
    if isinstance(df, dict):
        keys = list(df.keys())
        rowcount = len(df[keys[0]]) if keys else 0
        for k in keys:
            if len(df[k]) != rowcount:
                return False, "Arrays must all be the same length."
        ### Chunk every column, then stitch the i-th pieces together
        ### so that each chunk is a dict holding every column.
        column_chunks = {
            k: list(more_itertools.chunked(df[k], _chunksize))
            for k in keys
        }
        num_chunks = len(column_chunks[keys[0]]) if keys else 0
        chunks = [
            {k: column_chunks[k][i] for k in keys}
            for i in range(num_chunks)
        ]
    elif isinstance(df, list):
        rowcount = len(df)
        chunks = list(more_itertools.chunked(df, _chunksize))
    else:
        ### DataFrame: `iloc` is positional, so chunk positions rather than index labels.
        rowcount = len(df)
        chunks = [
            df.iloc[positions]
            for positions in more_itertools.chunked(range(rowcount), _chunksize)
        ]

    ### Send columns in case the user has defined them locally.
    if pipe.columns:
        kw['columns'] = json.dumps(pipe.columns)
    r_url = pipe_r_url(pipe) + '/data'

    for i, c in enumerate(chunks):
        if debug:
            dprint(f"Posting chunk ({i + 1} / {len(chunks)}) to {r_url}...")
        json_str = get_json_str(c)

        try:
            response = self.post(
                r_url,
                ### handles check_existing
                params=kw,
                data=json_str,
                debug=debug,
            )
        except Exception as e:
            warn(str(e))
            return False, str(e)

        if not response:
            return False, f"Failed to receive response. Response text: {response.text}"

        try:
            j = json.loads(response.text)
        except Exception as e:
            return False, str(e)

        if isinstance(j, dict) and 'detail' in j:
            return False, j['detail']

        try:
            j = tuple(j)
        except Exception:
            return False, response.text

        if debug:
            dprint("Received response: " + str(j))
        if not j[0]:
            return j

    len_chunks = len(chunks)
    success_tuple = True, (
        f"It took {round(time.time() - begin, 2)} seconds to sync {rowcount} row"
        + ('s' if rowcount != 1 else '')
        + f" across {len_chunks} chunk" + ('s' if len_chunks != 1 else '')
        + f" to {pipe}."
    )
    return success_tuple
def test_connection(self, **kw: Any) ‑> Optional[bool]
-
Test if a successful connection to the API may be made.
Expand source code
def test_connection(
    self,
    **kw: Any
) -> Union[bool, None]:
    """
    Test if a successful connection to the API may be made.

    Parameters
    ----------
    **kw: Any
        Overrides for the defaults passed to `retry_connect`
        (a single attempt, no waiting, no warnings, no enforced chaining or login).

    Returns
    -------
    Whatever `retry_connect` returns, or `False` if it raises.
    """
    from meerschaum.connectors.poll import retry_connect

    connect_kwargs = {
        'max_retries': 1,
        'retry_wait': 0,
        'warn': False,
        'connector': self,
        'enforce_chaining': False,
        'enforce_login': False,
        **kw,
    }
    try:
        return retry_connect(**connect_kwargs)
    except Exception:
        return False
def wget(self, r_url: str, dest: Optional[Union[str, pathlib.Path]] = None, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> pathlib.Path
-
Mimic wget with requests.
Expand source code
def wget(
    self,
    r_url: str,
    dest: Optional[Union[str, pathlib.Path]] = None,
    headers: Optional[Dict[str, Any]] = None,
    use_token: bool = True,
    debug: bool = False,
    **kw: Any
) -> pathlib.Path:
    """
    Mimic wget with requests.

    Parameters
    ----------
    r_url: str
        The relative URL, appended to `self.url`.

    dest: Optional[Union[str, pathlib.Path]], default None
        Destination forwarded to `meerschaum.utils.misc.wget`.

    headers: Optional[Dict[str, Any]], default None
        Extra request headers. The dictionary is copied, never modified.

    use_token: bool, default True
        If `True`, add an `Authorization: Bearer` header built from `self.token`.

    debug: bool, default False
        If `True`, print a message before the token is read.

    **kw: Any
        Additional keyword arguments forwarded to `meerschaum.utils.misc.wget`.

    Returns
    -------
    The value returned by `meerschaum.utils.misc.wget`.
    """
    from meerschaum.utils.misc import wget
    from meerschaum.utils.debug import dprint

    # Work on a copy so the bearer token never leaks into the caller's dict.
    headers = dict(headers) if headers is not None else {}

    if use_token:
        if debug:
            dprint("Checking login token.")
        headers['Authorization'] = f'Bearer {self.token}'

    return wget(self.url + r_url, dest=dest, headers=headers, **kw)