Module meerschaum.Pipe

Pipes are the primary metaphor of the Meerschaum system.
You can interact with pipe data via `meerschaum.Pipe` objects.
If you are working with multiple pipes, it is highly recommended that you instead use
`meerschaum.utils.get_pipes` (available as `meerschaum.get_pipes`)
to create a dictionary of Pipe objects.

```
>>> from meerschaum import get_pipes
>>> pipes = get_pipes()
```

Examples

For the below examples to work, `sql:remote_server` must be defined (check with `edit config`)
with correct credentials, as well as a network connection and valid permissions.

Manually Adding Data

```
>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'energy')
>>>
>>> ### Columns only need to be defined if you're creating a new pipe.
>>> pipe.columns = { 'datetime' : 'time', 'id' : 'station_id' }
>>>
>>> ### Create a Pandas DataFrame somehow,
>>> ### or you can use a dictionary of lists instead.
>>> df = pd.read_csv('data.csv')
>>>
>>> pipe.sync(df)
```

Registering a Remote Pipe

```
>>> from meerschaum import Pipe
>>> pipe = Pipe('sql:remote_server', 'energy')
>>>
>>> pipe.attributes = {
...     'fetch' : {
...         'definition' : 'SELECT * FROM energy_table',
...     },
... }
>>>
>>> ### Columns are a subset of attributes, so define columns
>>> ### after defining attributes.
>>> pipe.columns = { 'datetime' : 'time', 'id' : 'station_id' }
>>>
>>> pipe.sync()
```
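Once a pipe has been synced, its rows can be read back through the same object. Below is a minimal sketch (the `sql:remote_server` keys are carried over from the example above); `get_data()` and the `sync_time` property are documented later on this page.

```
>>> import datetime
>>> from meerschaum import Pipe
>>> pipe = Pipe('sql:remote_server', 'energy')
>>>
>>> ### Read everything synced so far as a DataFrame.
>>> df = pipe.get_data()
>>>
>>> ### Or bound the query by the pipe's datetime column.
>>> recent = pipe.get_data(begin=datetime.datetime(2021, 1, 1))
>>>
>>> ### The most recent datetime value in the pipe.
>>> pipe.sync_time
```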
Expand source code
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
"""
Pipes are the primary metaphor of the Meerschaum system.
You can interact with pipe data via `meerschaum.Pipe` objects.
If you are working with multiple pipes, it is highly recommended that you instead use
`meerschaum.utils.get_pipes` (available as `meerschaum.get_pipes`)
to create a dictionary of Pipe objects.
```
>>> from meerschaum import get_pipes
>>> pipes = get_pipes()
```
# Examples
For the below examples to work, `sql:remote_server` must be defined (check with `edit config`)
with correct credentials, as well as a network connection and valid permissions.
## Manually Adding Data
---
```
>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'energy')
>>>
>>> ### Columns only need to be defined if you're creating a new pipe.
>>> pipe.columns = { 'datetime' : 'time', 'id' : 'station_id' }
>>>
>>> ### Create a Pandas DataFrame somehow,
>>> ### or you can use a dictionary of lists instead.
>>> df = pd.read_csv('data.csv')
>>>
>>> pipe.sync(df)
```
## Registering a Remote Pipe
---
```
>>> from meerschaum import Pipe
>>> pipe = Pipe('sql:remote_server', 'energy')
>>>
>>> pipe.attributes = {
... 'fetch' : {
... 'definition' : 'SELECT * FROM energy_table',
... },
... }
>>>
>>> ### Columns are a subset of attributes, so define columns
>>> ### after defining attributes.
>>> pipe.columns = { 'datetime' : 'time', 'id' : 'station_id' }
>>>
>>> pipe.sync()
```
"""
from __future__ import annotations
from meerschaum.utils.typing import Optional, Dict, Any, Union, InstanceConnector
class Pipe:
"""
Access Meerschaum pipes via Pipe objects.
Pipes are identified by the following:
1. Connector keys (e.g. `'sql:main'`)
2. Metric key (e.g. `'weather'`)
3. Location (optional; e.g. `None`)
A pipe's connector keys correspond to a data source, and when the pipe is synced,
its `fetch` definition is evaluated and executed to produce new data.
Alternatively, new data may be directly synced via `pipe.sync()`:
```
>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'weather')
>>>
>>> import pandas as pd
>>> df = pd.read_csv('weather.csv')
>>> pipe.sync(df)
```
"""
from ._fetch import fetch
from ._data import get_data, get_backtrack_data, get_rowcount
from ._register import register
from ._attributes import (
attributes,
parameters,
columns,
get_columns,
get_columns_types,
get_id,
id,
get_val_column,
parents,
)
from ._show import show
from ._edit import edit, edit_definition
from ._sync import sync, get_sync_time, exists, filter_existing
from ._delete import delete
from ._drop import drop
from ._clear import clear
from ._bootstrap import bootstrap
def __init__(
self,
connector_keys: str,
metric_key: str,
location_key: Optional[str] = None,
parameters: Optional[Dict[str, Any]] = None,
columns: Optional[Dict[str, str]] = None,
mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
instance: Optional[Union[str, InstanceConnector]] = None,
cache: bool = False,
debug: bool = False
):
"""
Parameters
----------
connector_keys: str
Keys for the pipe's source connector, e.g. `'sql:main'`.
metric_key: str
Label for the pipe's contents, e.g. `'weather'`.
location_key: str, default None
Label for the pipe's location. Defaults to `None`.
parameters: Optional[Dict[str, Any]], default None
Optionally set a pipe's parameters from the constructor,
e.g. columns and other attributes.
Defaults to `None`.
columns: Optional[Dict[str, str]], default None
Subset of parameters for ease of use.
If `parameters` is provided, `columns` has no effect.
Defaults to `None`.
mrsm_instance: Optional[Union[str, InstanceConnector]], default None
Connector for the Meerschaum instance where the pipe resides.
Defaults to the preconfigured default instance (`'sql:main'`).
instance: Optional[Union[str, InstanceConnector]], default None
Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.
cache: bool, default False
If `True`, cache fetched data into a local database file.
Experimental features must be enabled.
You can enable experimental caching under `system:experimental:cache`.
Defaults to `False`.
"""
if location_key in ('[None]', 'None'):
location_key = None
self.connector_keys = connector_keys
self.metric_key = metric_key
self.location_key = location_key
### only set parameters if values are provided
if parameters is not None:
self._parameters = parameters
if columns is not None:
if self.__dict__.get('_parameters', None) is None:
self._parameters = {}
self._parameters['columns'] = columns
### NOTE: The parameters dictionary is {} by default.
### A Pipe may be registered without parameters, then edited,
### or a Pipe may be registered with parameters set in-memory first.
from meerschaum.config import get_config
_mrsm_instance = mrsm_instance if mrsm_instance is not None else instance
if _mrsm_instance is None:
_mrsm_instance = get_config('meerschaum', 'instance', patch=True)
if not isinstance(_mrsm_instance, str):
self._instance_connector = _mrsm_instance
self.instance_keys = str(_mrsm_instance)
else: ### NOTE: must be a SQL or API Connector for this to work
self.instance_keys = _mrsm_instance
self._cache = cache and get_config('system', 'experimental', 'cache')
@property
def meta(self):
"""Simulate the MetaPipe model without importing FastAPI."""
refresh = False
if '_meta' not in self.__dict__:
refresh = True
# elif self.parameters != self.__dict__['_meta']['parameters']:
# refresh = True
if refresh:
# parameters = self.parameters
# if parameters is None:
# parameters = dict()
self._meta = {
'connector_keys' : self.connector_keys,
'metric_key' : self.metric_key,
'location_key' : self.location_key,
# 'parameters' : parameters,
'instance' : self.instance_keys,
}
return self._meta
@property
def instance_connector(self) -> Union[InstanceConnector, None]:
"""
The connector to where this pipe resides.
May either be of type `'sql'` (`meerschaum.connectors.sql.SQLConnector`) or of type `'api'`
(`meerschaum.connectors.api.APIConnector`).
"""
if '_instance_connector' not in self.__dict__:
from meerschaum.connectors.parse import parse_instance_keys
conn = parse_instance_keys(self.instance_keys)
if conn:
self._instance_connector = conn
else:
return None
return self._instance_connector
@property
def connector(self) -> Union[meerschaum.connectors.Connector, None]:
"""
The connector to the data source.
May be of type `'sql'`, `'api'`, `'mqtt'`, or `'plugin'`.
"""
if '_connector' not in self.__dict__:
from meerschaum.connectors.parse import parse_instance_keys
conn = parse_instance_keys(self.connector_keys)
if conn:
self._connector = conn
else:
return None
return self._connector
@property
def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
"""
If the pipe was created with `cache=True`, return the connector to the pipe's
SQLite database for caching.
"""
if not self._cache:
return None
if '_cache_connector' not in self.__dict__:
from meerschaum.connectors import get_connector
from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
_resources_path = SQLITE_RESOURCES_PATH
self._cache_connector = get_connector(
'sql', '_cache_' + str(self),
flavor='sqlite',
database=str(_resources_path / ('_cache_' + str(self) + '.db')),
)
return self._cache_connector
@property
def cache_pipe(self) -> Union['meerschaum.Pipe.Pipe', None]:
"""
If the pipe was created with `cache=True`, return another `meerschaum.Pipe.Pipe` used to
manage the local data.
"""
if self.cache_connector is None:
return None
if '_cache_pipe' not in self.__dict__:
from meerschaum import Pipe
from meerschaum.config._patch import apply_patch_to_config
from meerschaum.connectors.sql.tools import sql_item_name
_parameters = self.parameters.copy()
_fetch_patch = {
'fetch': ({
'definition': (
f"SELECT * FROM {sql_item_name(str(self), self.instance_connector.flavor)}"
),
}) if self.instance_connector.type == 'sql' else ({
'connector_keys': self.connector_keys,
'metric_key': self.metric_key,
'location_key': self.location_key,
})
}
_parameters = apply_patch_to_config(_parameters, _fetch_patch)
self._cache_pipe = Pipe(
self.instance_keys,
(self.connector_keys + '_' + self.metric_key + '_cache'),
self.location_key,
mrsm_instance=self.cache_connector,
parameters=_parameters,
cache=False,
)
return self._cache_pipe
@property
def sync_time(self) -> Union[datetime.datetime, None]:
"""
Convenience function to get the pipe's latest datetime.
Use `meerschaum.Pipe.Pipe.get_sync_time()` instead.
"""
return self.get_sync_time()
def __str__(self):
"""
The Pipe's SQL table name. Converts the `':'` in the `connector_keys` to an `'_'`.
"""
name = f"{self.connector_keys.replace(':', '_')}_{self.metric_key}"
if self.location_key is not None:
name += f"_{self.location_key}"
return name
def __eq__(self, other):
try:
return (
type(self) == type(other)
and self.connector_keys == other.connector_keys
and self.metric_key == other.metric_key
and self.location_key == other.location_key
and self.instance_keys == other.instance_keys
)
except Exception as e:
return False
def __hash__(self):
### Using an esoteric separator to avoid collisions.
sep = "[\"']"
return hash(
str(self.connector_keys) + sep
+ str(self.metric_key) + sep
+ str(self.location_key) + sep
+ str(self.instance_keys) + sep
)
def __repr__(self):
return str(self)
def __getstate__(self):
"""
Define the state dictionary (pickling).
"""
state = {
'connector_keys' : self.connector_keys,
'metric_key' : self.metric_key,
'location_key' : self.location_key,
'parameters' : self.parameters,
'mrsm_instance' : self.instance_keys,
}
return state
def __setstate__(self, _state : dict):
"""
Read the state (unpickling).
"""
self.__init__(**_state)
Classes

class Pipe (connector_keys: str, metric_key: str, location_key: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None, columns: Optional[Dict[str, str]] = None, mrsm_instance: Optional[Union[str, InstanceConnector]] = None, instance: Optional[Union[str, InstanceConnector]] = None, cache: bool = False, debug: bool = False)
-
Access Meerschaum pipes via Pipe objects.

Pipes are identified by the following:

1. Connector keys (e.g. `'sql:main'`)
2. Metric key (e.g. `'weather'`)
3. Location (optional; e.g. `None`)

A pipe's connector keys correspond to a data source, and when the pipe is synced,
its `fetch` definition is evaluated and executed to produce new data.

Alternatively, new data may be directly synced via `pipe.sync()`:

```
>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'weather')
>>>
>>> import pandas as pd
>>> df = pd.read_csv('weather.csv')
>>> pipe.sync(df)
```
Parameters

connector_keys : str
    Keys for the pipe's source connector, e.g. `'sql:main'`.

metric_key : str
    Label for the pipe's contents, e.g. `'weather'`.

location_key : str, default None
    Label for the pipe's location. Defaults to `None`.

parameters : Optional[Dict[str, Any]], default None
    Optionally set a pipe's parameters from the constructor,
    e.g. columns and other attributes.
    Defaults to `None`.

columns : Optional[Dict[str, str]], default None
    Subset of parameters for ease of use.
    If `parameters` is provided, `columns` has no effect.
    Defaults to `None`.

mrsm_instance : Optional[Union[str, InstanceConnector]], default None
    Connector for the Meerschaum instance where the pipe resides.
    Defaults to the preconfigured default instance (`'sql:main'`).

instance : Optional[Union[str, InstanceConnector]], default None
    Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.

cache : bool, default False
    If `True`, cache fetched data into a local database file.
    Experimental features must be enabled.
    You can enable experimental caching under `system:experimental:cache`.
    Defaults to `False`.
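As an illustrative sketch of how the constructor arguments fit together (the connector keys, location, and column names here are assumptions, not required values):

```
>>> from meerschaum import Pipe
>>>
>>> pipe = Pipe(
...     'sql:remote_server', 'energy', 'atlanta',
...     instance = 'sql:main',
...     columns = {'datetime': 'time', 'id': 'station_id'},
... )
>>> pipe.location_key
'atlanta'
>>> pipe.columns
{'datetime': 'time', 'id': 'station_id'}
```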
Instance variables
var attributes : Union[Dict[str, Any], NoneType]
-
Return a dictionary of a pipe's keys and parameters. Is a superset of `Pipe.parameters`.

Expand source code

```
@property
def attributes(self) -> Optional[Dict[str, Any]]:
    """
    Return a dictionary of a pipe's keys and parameters.
    Is a superset of `meerschaum.Pipe.Pipe.parameters`.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    if '_attributes' not in self.__dict__:
        if self.id is None:
            return None
        self._attributes = self.instance_connector.get_pipe_attributes(self)
    return self._attributes
```
var cache_connector : Union[meerschaum.connectors.sql.SQLConnector, None]
-
If the pipe was created with `cache=True`, return the connector to the pipe's SQLite database for caching.
var cache_pipe : Union[Pipe, NoneType]
-
If the pipe was created with `cache=True`, return another `Pipe` used to manage the local data.
var columns : Union[Dict[str, str], NoneType]
-
If defined, return the `columns` dictionary defined in `Pipe.parameters`.

Expand source code

```
@property
def columns(self) -> Union[Dict[str, str], None]:
    """
    If defined, return the `columns` dictionary defined in `meerschaum.Pipe.Pipe.parameters`.
    """
    if not self.parameters:
        if '_columns' in self.__dict__:
            return self._columns
        return None
    if 'columns' not in self.parameters:
        return None
    return self.parameters['columns']
```
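For illustration, a doctest-style sketch of how `columns` mirrors the `'columns'` key of `parameters` (the connector keys and column names are assumptions, and the pipe is assumed to be unregistered):

```
>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'energy', columns={'datetime': 'time', 'id': 'station_id'})
>>> pipe.columns
{'datetime': 'time', 'id': 'station_id'}
>>>
>>> ### `columns` lives under the 'columns' key of `parameters`.
>>> pipe.parameters
{'columns': {'datetime': 'time', 'id': 'station_id'}}
```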
var connector : Union[meerschaum.connectors.Connector, None]
-
The connector to the data source. May be of type `'sql'`, `'api'`, `'mqtt'`, or `'plugin'`.
var id : Union[int, NoneType]
-
Fetch and cache a pipe's ID.
Expand source code
```
@property
def id(self) -> Union[int, None]:
    """
    Fetch and cache a pipe's ID.
    """
    if not ('_id' in self.__dict__ and self._id):
        self._id = self.get_id()
    return self._id
```
var instance_connector : Union[meerschaum.connectors.sql.SQLConnector, meerschaum.connectors.api.APIConnector, NoneType]
-
The connector to where this pipe resides. May either be of type `'sql'` (`meerschaum.connectors.sql.SQLConnector`) or of type `'api'` (`meerschaum.connectors.api.APIConnector`).
var meta
-
Simulate the MetaPipe model without importing FastAPI.
var parameters : Union[Dict[str, Any], NoneType]
-
Return the parameters dictionary of the pipe.
Expand source code
```
@property
def parameters(self) -> Optional[Dict[str, Any]]:
    """
    Return the parameters dictionary of the pipe.
    """
    if '_parameters' not in self.__dict__:
        if not self.attributes:
            return None
        self._parameters = self.attributes['parameters']
    return self._parameters
```
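As a sketch of the in-memory versus registered distinction, parameters can be assigned on the object and then persisted with `edit()` (documented below); the `fetch` definition and column names are taken from the module-level example:

```
>>> from meerschaum import Pipe
>>> pipe = Pipe('sql:remote_server', 'energy')
>>>
>>> ### Assign parameters in memory...
>>> pipe.parameters = {
...     'fetch': {'definition': 'SELECT * FROM energy_table'},
...     'columns': {'datetime': 'time', 'id': 'station_id'},
... }
>>>
>>> ### ...then persist them to the instance with `edit()`.
>>> pipe.edit()
```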
var parents : List[Pipe]
-
Return a list of `Pipe` objects. These pipes will be synced before this pipe.
NOTE: Not yet in use!

Expand source code

```
@property
def parents(self) -> List[meerschaum.Pipe.Pipe]:
    """
    Return a list of `meerschaum.Pipe.Pipe` objects.
    These pipes will be synced before this pipe.

    NOTE: Not yet in use!
    """
    if 'parents' not in self.parameters:
        return []
    from meerschaum.utils.warnings import warn
    _parents_keys = self.parameters['parents']
    if not isinstance(_parents_keys, list):
        warn(
            f"Please ensure the parents for pipe '{self}' are defined as a list of keys.",
            stacklevel = 4
        )
        return []
    from meerschaum import Pipe
    _parents = []
    for keys in _parents_keys:
        try:
            p = Pipe(**keys)
        except Exception as e:
            warn(f"Unable to build parent with keys '{keys}' for pipe '{self}':\n{e}")
            continue
        _parents.append(p)
    return _parents
```
var sync_time : Union[datetime.datetime, None]
-
Convenience function to get the pipe's latest datetime. Use `get_sync_time()` instead.
Methods
def bootstrap(self, debug: bool = False, yes: bool = False, force: bool = False, noask: bool = False, shell: bool = False, **kw) ‑> Tuple[bool, str]
-
Prompt the user to create a pipe's requirements all from one method. This method shouldn't be used in any automated scripts because it interactively prompts the user and therefore may hang.
Parameters

debug : bool, default False
    Verbosity toggle.

yes : bool, default False
    Print the questions and automatically agree.

force : bool, default False
    Skip the questions and agree anyway.

noask : bool, default False
    Print the questions but go with the default answer.

shell : bool, default False
    Used to determine if we are in the interactive shell.

Returns

A `SuccessTuple` corresponding to the success of this procedure.

Expand source code
def bootstrap( self, debug: bool = False, yes: bool = False, force: bool = False, noask: bool = False, shell: bool = False, **kw ) -> SuccessTuple: """ Prompt the user to create a pipe's requirements all from one method. This method shouldn't be used in any automated scripts because it interactively prompts the user and therefore may hang. Parameters ---------- debug: bool, default False: Verbosity toggle. yes: bool, default False: Print the questions and automatically agree. force: bool, default False: Skip the questions and agree anyway. noask: bool, default False: Print the questions but go with the default answer. shell: bool, default False: Used to determine if we are in the interactive shell. Returns ------- A `SuccessTuple` corresponding to the success of this procedure. """ from meerschaum.utils.warnings import warn, info, error from meerschaum.utils.prompt import prompt, yes_no from meerschaum.utils.formatting import pprint from meerschaum.config import get_config from meerschaum.utils.formatting._shell import clear_screen from meerschaum.utils.formatting import print_tuple from meerschaum.actions import actions _clear = get_config('shell', 'clear_screen', patch=True) if self.get_id(debug=debug) is not None: delete_tuple = self.delete(debug=debug) if not delete_tuple[0]: return delete_tuple if _clear: clear_screen(debug=debug) _parameters = _get_parameters(self, debug=debug) self.parameters = _parameters pprint(self.parameters) try: prompt( f"\n Press [Enter] to register pipe '{self}' with the above configuration:", icon = False ) except KeyboardInterrupt as e: return False, f"Aborting bootstrapping pipe '{self}'." register_tuple = self.instance_connector.register_pipe(self, debug=debug) if not register_tuple[0]: return register_tuple if _clear: clear_screen(debug=debug) try: if yes_no( f"Would you like to edit the definition for pipe '{self}'?", yes=yes, noask=noask ): edit_tuple = self.edit_definition(debug=debug) if not edit_tuple[0]: return edit_tuple if yes_no(f"Would you like to try syncing pipe '{self}' now?", yes=yes, noask=noask): # sync_tuple = self.sync(debug=debug) sync_tuple = actions['sync']( ['pipes'], connector_keys = [self.connector_keys], metric_keys = [self.metric_key], location_keys = [self.location_key], mrsm_instance = str(self.instance_connector), debug = debug, shell = shell, ) if not sync_tuple[0]: return sync_tuple except Exception as e: return False, f"Failed to bootstrap pipe '{self}':\n" + str(e) print_tuple((True, f"Finished bootstrapping pipe '{self}'!")) info( f"You can edit this pipe later with `edit pipes` or set the definition with `edit pipes definition`.\n" + " To sync data into your pipe, run `sync pipes`." ) return True, "Success"
def clear(self, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, debug: bool = False, **kw: Any) ‑> SuccessTuple
-
Call the Pipe's instance connector's `clear_pipe` method.

Parameters

begin : Optional[datetime.datetime], default None
    If provided, only remove rows newer than this datetime value.

end : Optional[datetime.datetime], default None
    If provided, only remove rows older than this datetime column (not including end).

debug : bool, default False
    Verbosity toggle.

Returns

A `SuccessTuple` corresponding to whether this procedure completed successfully.

Examples
```
>>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
>>> pipe.sync({'dt': [datetime.datetime(2020, 1, 1, 0, 0)]})
>>> pipe.sync({'dt': [datetime.datetime(2021, 1, 1, 0, 0)]})
>>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1, 0, 0)]})
>>>
>>> pipe.clear(begin=datetime.datetime(2021, 1, 1, 0, 0))
>>> pipe.get_data()
          dt
0 2020-01-01
```
Expand source code
def clear( self, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, debug: bool = False, **kw: Any ) -> SuccessTuple: """ Call the Pipe's instance connector's `clear_pipe` method. Parameters ---------- begin: Optional[datetime.datetime], default None: If provided, only remove rows newer than this datetime value. end: Optional[datetime.datetime], default None: If provided, only remove rows older than this datetime column (not including end). debug: bool, default False: Verbositity toggle. Returns ------- A `SuccessTuple` corresponding to whether this procedure completed successfully. Examples -------- >>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local') >>> pipe.sync({'dt': [datetime.datetime(2020, 1, 1, 0, 0)]}) >>> pipe.sync({'dt': [datetime.datetime(2021, 1, 1, 0, 0)]}) >>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1, 0, 0)]}) >>> >>> pipe.clear(begin=datetime.datetime(2021, 1, 1, 0, 0)) >>> pipe.get_data() dt 0 2020-01-01 """ from meerschaum.utils.warnings import warn if self.cache_pipe is not None: success, msg = self.cache_pipe.clear(begin=begin, end=end, debug=debug, **kw) if not success: warn(msg) return self.instance_connector.clear_pipe(self, begin=begin, end=end, debug=debug, **kw)
def delete(self, debug: bool = False, **kw) ‑> Tuple[bool, str]
-
Call the Pipe's instance connector's `delete_pipe()` method.

Parameters

debug : bool, default False
    Verbosity toggle.

Returns

A `SuccessTuple` of success (`bool`), message (`str`).

Expand source code
def delete( self, debug: bool = False, **kw ) -> SuccessTuple: """ Call the Pipe's instance connector's `delete_pipe()` method. Parameters ---------- debug : bool, default False: Verbosity toggle. Returns ------- A `SuccessTuple` of success (`bool`), message (`str`). """ import os, pathlib from meerschaum.utils.warnings import warn if self.cache_pipe is not None: _delete_cache_tuple = self.cache_pipe.delete(debug=debug, **kw) if not _delete_cache_tuple[0]: warn(_delete_cache_tuple[1]) _cache_db_path = pathlib.Path(self.cache_connector.database) try: os.remove(_cache_db_path) except Exception as e: warn(f"Could not delete cache file '{_cache_db_path}' for pipe '{self}':\n{e}") result = self.instance_connector.delete_pipe(self, debug=debug, **kw) if not isinstance(result, tuple): return False, f"Received unexpected result from '{self.instance_connector}': {result}" if result[0]: to_delete = ['_id', '_attributes', '_parameters', '_columns', '_data'] for member in to_delete: if member in self.__dict__: del self.__dict__[member] return result
def drop(self, debug: bool = False, **kw: Any) ‑> Tuple[bool, str]
-
Call the Pipe's instance connector's `drop_pipe()` method.

Parameters

debug : bool, default False
    Verbosity toggle.

Returns

A `SuccessTuple` of success, message.

Expand source code
```
def drop(
        self,
        debug: bool = False,
        **kw : Any
    ) -> SuccessTuple:
    """
    Call the Pipe's instance connector's `drop_pipe()` method.

    Parameters
    ----------
    debug: bool, default False:
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    from meerschaum.utils.warnings import warn
    if self.cache_pipe is not None:
        _drop_cache_tuple = self.cache_pipe.drop(debug=debug, **kw)
        if not _drop_cache_tuple[0]:
            warn(_drop_cache_tuple[1])
    return self.instance_connector.drop_pipe(self, debug=debug, **kw)
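To illustrate the difference between the two destructive methods above (a sketch; the keys are from the earlier examples): `drop()` removes the pipe's table on the instance, while `delete()` also removes the pipe's registration.

```
>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'energy', instance='sql:main')
>>>
>>> ### Drop the underlying table but keep the registration and parameters.
>>> pipe.drop()
>>> pipe.exists()
False
>>>
>>> ### Remove the pipe's registration from the instance entirely.
>>> pipe.delete()
```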
def edit(self, patch: bool = False, interactive: bool = False, debug: bool = False, **kw: Any) ‑> Tuple[bool, str]
-
Edit a Pipe's configuration.

Parameters

patch : bool, default False
    If `patch` is True, update parameters by cascading rather than overwriting.

interactive : bool, default False
    If `True`, open an editor for the user to make changes to the pipe's YAML file.

debug : bool, default False
    Verbosity toggle.

Returns

A `SuccessTuple` of success, message.

Expand source code
def edit( self, patch: bool = False, interactive: bool = False, debug: bool = False, **kw: Any ) -> SuccessTuple: """ Edit a Pipe's configuration. Parameters ---------- patch: bool, default False If `patch` is True, update parameters by cascading rather than overwriting. interactive: bool, default False If `True`, open an editor for the user to make changes to the pipe's YAML file. debug: bool, default False Verbosity toggle. Returns ------- A `SuccessTuple` of success, message. """ if not interactive: return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw) from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH from meerschaum.utils.misc import edit_file import pathlib, os parameters_filename = str(self) + '.yaml' parameters_path = pathlib.Path(os.path.join(PIPES_CACHE_RESOURCES_PATH, parameters_filename)) from meerschaum.utils.yaml import yaml edit_header = "#######################################" for i in range(len(str(self))): edit_header += "#" edit_header += "\n" edit_header += f"# Edit the parameters for the Pipe '{self}' #" edit_header += "\n#######################################" for i in range(len(str(self))): edit_header += "#" edit_header += "\n\n" from meerschaum.config import get_config parameters = dict(get_config('pipes', 'parameters', patch=True)) from meerschaum.config._patch import apply_patch_to_config parameters = apply_patch_to_config(parameters, self.parameters) ### write parameters to yaml file with open(parameters_path, 'w+') as f: f.write(edit_header) yaml.dump(parameters, stream=f, sort_keys=False) ### only quit editing if yaml is valid editing = True while editing: edit_file(parameters_path) try: with open(parameters_path, 'r') as f: file_parameters = yaml.load(f.read()) except Exception as e: from meerschaum.utils.warnings import warn warn(f"Invalid format defined for '{self}':\n\n{e}") input(f"Press [Enter] to correct the configuration for '{self}': ") else: editing = False self.parameters = file_parameters if debug: from meerschaum.utils.formatting import pprint pprint(self.parameters) return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
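A brief sketch of a non-interactive edit: assign a single key in memory and cascade it into the registered configuration with `patch=True`. The `backtrack_minutes` value below is illustrative.

```
>>> from meerschaum import Pipe
>>> pipe = Pipe('sql:remote_server', 'energy')
>>>
>>> ### With patch=True, only this key is updated; other registered parameters are preserved.
>>> pipe.parameters = {'fetch': {'backtrack_minutes': 1440}}
>>> pipe.edit(patch=True)
```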
def edit_definition(self, yes: bool = False, noask: bool = False, force: bool = False, debug: bool = False, **kw: Any) ‑> Tuple[bool, str]
-
Edit a pipe's definition file and update its configuration.
NOTE: This function is interactive and should not be used in automated scripts!

Returns

A `SuccessTuple` of success, message.

Expand source code
def edit_definition( self, yes: bool = False, noask: bool = False, force: bool = False, debug : bool = False, **kw : Any ) -> SuccessTuple: """ Edit a pipe's definition file and update its configuration. **NOTE:** This function is interactive and should not be used in automated scripts! Returns ------- A `SuccessTuple` of success, message. """ if self.connector.type not in ('sql', 'api'): return self.edit(interactive=True, debug=debug, **kw) import json from meerschaum.utils.warnings import info, warn from meerschaum.utils.debug import dprint from meerschaum.config._patch import apply_patch_to_config from meerschaum.utils.misc import edit_file _parameters = self.parameters if 'fetch' not in _parameters: _parameters['fetch'] = {} def _edit_api(): from meerschaum.utils.prompt import prompt, yes_no info( f"Please enter the keys of the source pipe from '{self.connector}'.\n" + "Type 'None' for None, or empty when there is no default. Press [CTRL+C] to skip." ) _keys = { 'connector_keys' : None, 'metric_key' : None, 'location_key' : None } for k in _keys: _keys[k] = _parameters['fetch'].get(k, None) for k, v in _keys.items(): try: _keys[k] = prompt(k.capitalize().replace('_', ' ') + ':', icon=True, default=v) except KeyboardInterrupt: continue if _keys[k] in ('', 'None', '\'None\'', '[None]'): _keys[k] = None _parameters['fetch'] = apply_patch_to_config(_parameters['fetch'], _keys) info("You may optionally specify additional filter parameters as JSON.") print(" Parameters are translated into a 'WHERE x AND y' clause, and lists are IN clauses.") print(" For example, the following JSON would correspond to 'WHERE x = 1 AND y IN (2, 3)':") print(json.dumps({'x': 1, 'y': [2, 3]}, indent=2, separators=(',', ': '))) if force or yes_no( "Would you like to add additional filter parameters?", yes=yes, noask=noask ): from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH definition_filename = str(self) + '.json' definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename try: definition_path.touch() with open(definition_path, 'w+') as f: json.dump(_parameters.get('fetch', {}).get('params', {}), f, indent=2) except Exception as e: return False, f"Failed writing file '{definition_path}':\n" + str(e) _params = None while True: edit_file(definition_path) try: with open(definition_path, 'r') as f: _params = json.load(f) except Exception as e: warn(f'Failed to read parameters JSON:\n{e}', stack=False) if force or yes_no( "Would you like to try again?\n " + "If not, the parameters JSON file will be ignored.", noask=noask, yes=yes ): continue _params = None break if _params is not None: if 'fetch' not in _parameters: _parameters['fetch'] = {} _parameters['fetch']['params'] = _params self.parameters = _parameters return True, "Success" def _edit_sql(): import pathlib, os, textwrap from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH from meerschaum.utils.misc import edit_file definition_filename = str(self) + '.sql' definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename sql_definition = _parameters['fetch'].get('definition', None) if sql_definition is None: sql_definition = '' sql_definition = textwrap.dedent(sql_definition).lstrip() try: definition_path.touch() with open(definition_path, 'w+') as f: f.write(sql_definition) except Exception as e: return False, f"Failed writing file '{definition_path}':\n" + str(e) edit_file(definition_path) try: with open(definition_path, 'r') as f: file_definition = f.read() except Exception as e: return False, f"Failed reading file 
'{definition_path}':\n" + str(e) if sql_definition == file_definition: return False, f"No changes made to definition for pipe '{self}'." if ' ' not in file_definition: return False, f"Invalid SQL definition for pipe '{self}'." if debug: dprint("Read SQL definition:\n\n" + file_definition) _parameters['fetch']['definition'] = file_definition self.parameters = _parameters return True, "Success" locals()['_edit_' + str(self.connector.type)]() return self.edit(interactive=False, debug=debug, **kw)
def exists(self, debug: bool = False) ‑> bool
-
See if a Pipe's table exists.

Parameters

debug : bool, default False
    Verbosity toggle.

Returns

A `bool` corresponding to whether a pipe's underlying table exists.

Expand source code
```
def exists(
        self,
        debug : bool = False
    ) -> bool:
    """
    See if a Pipe's table exists.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `bool` corresponding to whether a pipe's underlying table exists.
    """
    ### TODO test against views
    return self.instance_connector.pipe_exists(pipe=self, debug=debug)
```
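A small sketch of checking for the underlying table before reading (keys from the earlier examples):

```
>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'energy')
>>> if pipe.exists():
...     df = pipe.get_data()
... else:
...     print(f"Pipe '{pipe}' has not been synced yet.")
```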
def fetch(self, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, sync_chunks: bool = False, deactivate_plugin_venv: bool = True, debug: bool = False, **kw: Any) ‑> 'pd.DataFrame or None'
-
Fetch a Pipe's latest data from its connector.

Parameters

begin : Optional[datetime.datetime], default None
    If provided, only fetch data newer than or equal to `begin`.

end : Optional[datetime.datetime], default None
    If provided, only fetch data older than or equal to `end`.

sync_chunks : bool, default False
    If `True` and the pipe's connector is of type `'sql'`, begin syncing chunks while fetching loads chunks into memory.

deactivate_plugin_venv : bool, default True
    If `True` and the pipe's connector is of type `'plugin'`, deactivate the plugin's virtual environment after retrieving the dataframe.
    Not intended for general use.

debug : bool, default False
    Verbosity toggle.

Returns

A `pd.DataFrame` of the newest unseen data.

Expand source code
def fetch( self, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, sync_chunks: bool = False, deactivate_plugin_venv: bool = True, debug: bool = False, **kw: Any ) -> 'pd.DataFrame or None': """ Fetch a Pipe's latest data from its connector. Parameters ---------- begin: Optional[datetime.datetime], default None: If provided, only fetch data newer than or equal to `begin`. end: Optional[datetime.datetime], default None: If provided, only fetch data older than or equal to `end`. sync_chunks: bool, default False If `True` and the pipe's connector is of type `'sql'`, begin syncing chunks while fetching loads chunks into memory. deactivate_plugin_venv: bool, default True If `True` and the pipe's connector is of type `'plugin'`, deactivate the plugin's virtual environment after retrieving the dataframe. Not intended for general use. debug: bool, default False Verbosity toggle. Returns ------- A `pd.DataFrame` of the newest unseen data. """ if 'fetch' not in dir(self.connector): from meerschaum.utils.warnings import warn warn(f"No `fetch()` function defined for connector '{self.connector}'") return None from meerschaum.utils.debug import dprint, _checkpoint if self.connector.type == 'plugin': from meerschaum.utils.packages import activate_venv, deactivate_venv activate_venv(self.connector.label, debug=debug) _chunk_hook = kw.pop('chunk_hook') if 'chunk_hook' in kw else None df = self.connector.fetch( self, begin = begin, end = end, chunk_hook = ( self.sync if sync_chunks and _chunk_hook is None else _chunk_hook ), debug = debug, **kw ) if self.connector.type == 'plugin' and deactivate_plugin_venv: deactivate_venv(self.connector.label, debug=debug) ### Return True if we're syncing in parallel, else continue as usual. if sync_chunks: return True return df
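In day-to-day use, syncing evaluates the pipe's `fetch` definition for you, but `fetch()` may also be called directly to inspect the remote data first. A sketch, assuming the remote pipe from the module-level example:

```
>>> import datetime
>>> from meerschaum import Pipe
>>> pipe = Pipe('sql:remote_server', 'energy')
>>>
>>> ### Pull the remote rows newer than a given datetime without syncing them.
>>> df = pipe.fetch(begin=datetime.datetime(2021, 1, 1))
>>>
>>> ### Sync the fetched DataFrame once it looks correct.
>>> pipe.sync(df)
```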
def filter_existing(self, df: "'pd.DataFrame'", begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, chunksize: Optional[int] = -1, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw) ‑> 'pd.DataFrame'
-
Inspect a dataframe and filter out rows which already exist in the pipe.

Parameters

df : 'pd.DataFrame'
    The dataframe to inspect and filter.

begin : Optional[datetime.datetime], default None
    If provided, use this boundary when searching for existing data.

end : Optional[datetime.datetime], default None
    If provided, use this boundary when searching for existing data.

chunksize : Optional[int], default -1
    The `chunksize` used when fetching existing data.

params : Optional[Dict[str, Any]], default None
    If provided, use this filter when searching for existing data.

debug : bool, default False
    Verbosity toggle.

Returns

A `pd.DataFrame` with existing rows removed.

Expand source code
def filter_existing( self, df: 'pd.DataFrame', begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, chunksize: Optional[int] = -1, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw ) -> 'pd.DataFrame': """ Inspect a dataframe and filter out rows which already exist in the pipe. Parameters ---------- df: 'pd.DataFrame' The dataframe to inspect and filter. begin: Optional[datetime.datetime], default None If provided, use this boundary when searching for existing data. end: Optional[datetime.datetime], default If provided, use this boundary when searching for existing data. chunksize: Optional[int], default -1 The `chunksize` used when fetching existing data. params: Optional[Dict[str, Any]], default None If provided, use this filter when searching for existing data. debug: bool, default False Verbosity toggle. Returns ------- A `pd.DataFrame` with existing rows removed. """ from meerschaum.utils.warnings import warn from meerschaum.utils.debug import dprint from meerschaum.utils.misc import round_time from meerschaum.utils.packages import attempt_import, import_pandas import datetime pd = import_pandas() ### begin is the oldest data in the new dataframe try: min_dt = pd.to_datetime(df[self.get_columns('datetime')].min(skipna=True)).to_pydatetime() except Exception as e: ### NOTE: This will fetch the entire pipe! min_dt = self.get_sync_time(newest=False, debug=debug) if not isinstance(min_dt, datetime.datetime) or str(min_dt) == 'NaT': ### min_dt might be None, a user-supplied value, or the sync time. min_dt = begin ### If `min_dt` is None, use `datetime.utcnow()`. begin = round_time( min_dt, to = 'down' ) - datetime.timedelta(minutes=1) ### end is the newest data in the new dataframe try: max_dt = pd.to_datetime(df[self.get_columns('datetime')].max(skipna=True)).to_pydatetime() except Exception as e: max_dt = end if not isinstance(max_dt, datetime.datetime) or str(max_dt) == 'NaT': max_dt = None if max_dt is not None and min_dt > max_dt: warn(f"Detected minimum datetime greater than maximum datetime.") ### If `max_dt` is `None`, unbound the search. end = ( round_time( max_dt, to = 'down' ) + datetime.timedelta(minutes=1) ) if max_dt is not None else end if begin is not None and end is not None and begin > end: begin = end - datetime.timedelta(minutes=1) if debug: dprint(f"Looking at data between '{begin}' and '{end}'.", **kw) ### backtrack_df is existing Pipe data that overlaps with the fetched df try: backtrack_minutes = self.parameters['fetch']['backtrack_minutes'] except Exception as e: backtrack_minutes = 0 backtrack_df = self.get_data( begin = begin, end = end, chunksize = chunksize, params = params, debug = debug, **kw ) if debug: dprint("Existing data:\n" + str(backtrack_df), **kw) ### remove data we've already seen before from meerschaum.utils.misc import filter_unseen_df return filter_unseen_df(backtrack_df, df, debug=debug)
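A sketch of calling `filter_existing()` directly to see which rows of a DataFrame would be new (syncing applies this same filtering); the keys mirror the `clear()` example above:

```
>>> import datetime
>>> import meerschaum as mrsm
>>> import pandas as pd
>>>
>>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
>>> pipe.sync({'dt': [datetime.datetime(2021, 1, 1)]})
>>>
>>> df = pd.DataFrame({'dt': [datetime.datetime(2021, 1, 1), datetime.datetime(2021, 1, 2)]})
>>>
>>> ### Only the unseen 2021-01-02 row remains.
>>> new_rows = pipe.filter_existing(df)
>>> len(new_rows)
1
```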
def get_backtrack_data(self, backtrack_minutes: int = 0, begin: "Optional['datetime.datetime']" = None, fresh: bool = False, debug: bool = False, **kw: Any) ‑> Union[pd.DataFrame, NoneType]
-
Get the most recent data from the instance connector as a Pandas DataFrame.

Parameters

backtrack_minutes : int, default 0
    How many minutes from `begin` to select from. Defaults to 0.
    This may return a few rows due to a rounding quirk.

begin : Optional[datetime.datetime], default None
    The starting point to search for data.
    If begin is `None` (default), use the most recent observed datetime (AKA sync_time).

    ```
    E.g. begin = 02:00

    Search this region.            Ignore this, even if there's data.
    / / / / / / / / / |
    -----|----------|----------|----------|----------|----------|
    00:00      01:00      02:00      03:00      04:00      05:00
    ```

fresh : bool, default False
    If `True`, ignore local cache and pull directly from the instance connector.
    Only comes into effect if a pipe was created with `cache=True`.

debug : bool, default False
    Verbosity toggle.

Returns

A `pd.DataFrame` for the pipe's data corresponding to the provided parameters.
Backtrack data is a convenient way to get a pipe's data "backtracked" from the most recent datetime.

Expand source code
def get_backtrack_data( self, backtrack_minutes: int = 0, begin: Optional['datetime.datetime'] = None, fresh: bool = False, debug : bool = False, **kw : Any ) -> Optional['pd.DataFrame']: """ Get the most recent data from the instance connector as a Pandas DataFrame. Parameters ---------- backtrack_minutes: int, default 0 How many minutes from `begin` to select from. Defaults to 0. This may return a few rows due to a rounding quirk. begin: Optional[datetime.datetime], default None The starting point to search for data. If begin is `None` (default), use the most recent observed datetime (AKA sync_time). ``` E.g. begin = 02:00 Search this region. Ignore this, even if there's data. / / / / / / / / / | -----|----------|----------|----------|----------|----------| 00:00 01:00 02:00 03:00 04:00 05:00 ``` fresh: bool, default False If `True`, Ignore local cache and pull directly from the instance connector. Only comes into effect if a pipe was created with `cache=True`. debug: bool default False Verbosity toggle. Returns ------- A `pd.DataFrame` for the pipe's data corresponding to the provided parameters. Backtrack data is a convenient way to get a pipe's data "backtracked" from the most recent datetime. """ from meerschaum.utils.warnings import warn kw.update({'backtrack_minutes': backtrack_minutes, 'begin': begin,}) if not self.exists(debug=debug): return None if self.cache_pipe is not None: if not fresh: _sync_cache_tuple = self.cache_pipe.sync(debug=debug, **kw) if not _sync_cache_tuple[0]: warn(f"Failed to sync cache for pipe '{self}':\n" + _sync_cache_tuple[1]) fresh = True else: ### Successfully synced cache. return self.cache_pipe.get_backtrack_data(debug=debug, fresh=True, **kw) ### If `fresh` or the syncing failed, directly pull from the instance connector. return self.instance_connector.get_backtrack_data( pipe = self, debug = debug, **kw )
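A sketch of backtracking from the most recent datetime, e.g. to re-examine the last day of data (the pipe keys are from the earlier examples):

```
>>> import datetime
>>> from meerschaum import Pipe
>>> pipe = Pipe('sql:remote_server', 'energy')
>>>
>>> ### Roughly the last 24 hours of rows, counted back from the pipe's sync time.
>>> recent_df = pipe.get_backtrack_data(backtrack_minutes=1440)
>>>
>>> ### Or backtrack from an explicit starting point.
>>> older_df = pipe.get_backtrack_data(
...     backtrack_minutes=60,
...     begin=datetime.datetime(2021, 1, 1, 2, 0),
... )
```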
def get_columns(self, *args: str, error: bool = True) ‑> Tuple[str]
-
Check if the requested columns are defined.

Parameters

*args : str
    The column names to be retrieved.

error : bool, default True
    If `True`, raise an `Exception` if the specified column is not defined.

Returns

A tuple of the same size of `args`.

Examples
```
>>> pipe = mrsm.Pipe('test', 'test')
>>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
>>> pipe.get_columns('datetime', 'id')
('dt', 'id')
>>> pipe.get_columns('value')
Exception:  🛑 Missing 'value' column for Pipe 'test_test'.
```
Expand source code
def get_columns(self, *args: str, error : bool = True) -> Tuple[str]: """ Check if the requested columns are defined. Parameters ---------- *args : str : The column names to be retrieved. error : bool, default True: If `True`, raise an `Exception` if the specified column is not defined. Returns ------- A tuple of the same size of `args`. Examples -------- >>> pipe = mrsm.Pipe('test', 'test') >>> pipe.columns = {'datetime': 'dt', 'id': 'id'} >>> pipe.get_columns('datetime', 'id') ('dt', 'id') >>> pipe.get_columns('value') Exception: 🛑 Missing 'value' column for Pipe 'test_test'. """ from meerschaum.utils.warnings import error as _error, warn if not args: args = tuple(self.columns.keys()) col_names = [] for col in args: col_name = None try: col_name = self.columns[col] if col_name is None and error: _error(f"Please define the name of the '{col}' column for Pipe '{self}'.") except Exception as e: col_name = None if col_name is None and error: _error(f"Missing '{col}'" + f" column for Pipe '{self}'.") col_names.append(col_name) if len(col_names) == 1: return col_names[0] return tuple(col_names)
def get_columns_types(self, debug: bool = False) ‑> Union[Dict[str, str], NoneType]
-
Get a dictionary of a pipe's column names and their types.

Parameters

debug : bool, default False
    Verbosity toggle.

Returns

A dictionary of column names (`str`) to column types (`str`).

Examples
```
>>> pipe.get_columns_types()
{
  'dt': 'TIMESTAMP WITHOUT TIMEZONE',
  'id': 'BIGINT',
  'val': 'DOUBLE PRECISION',
}
>>>
```
Expand source code
def get_columns_types(self, debug : bool = False) -> Union[Dict[str, str], None]: """ Get a dictionary of a pipe's column names and their types. Parameters ---------- debug : bool, default False: Verbosity toggle. Returns ------- A dictionary of column names (`str`) to column types (`str`). Examples -------- >>> pipe.get_columns_types() { 'dt': 'TIMESTAMP WITHOUT TIMEZONE', 'id': 'BIGINT', 'val': 'DOUBLE PRECISION', } >>> """ return self.instance_connector.get_pipe_columns_types(self, debug=debug)
def get_data(self, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, fresh: bool = False, debug: bool = False, **kw: Any) ‑> Optional[pandas.DataFrame]
-
Get a pipe's data from the instance connector.

Parameters

begin : Optional[datetime.datetime], default None
    Lower bound datetime to begin searching for data (inclusive).
    Translates to a `WHERE` clause like `WHERE datetime >= begin`.
    Defaults to `None`.

end : Optional[datetime.datetime], default None
    Upper bound datetime to stop searching for data (inclusive).
    Translates to a `WHERE` clause like `WHERE datetime <= end`.
    Defaults to `None`.

params : Optional[Dict[str, Any]], default None
    Filter the retrieved data by a dictionary of parameters.
    See `build_where()` for more details.

fresh : bool, default False
    If `True`, skip local cache and directly query the instance connector.
    Defaults to `False`.

debug : bool, default False
    Verbosity toggle.
    Defaults to `False`.

Returns

A `pd.DataFrame` for the pipe's data corresponding to the provided parameters.

Expand source code
def get_data(
        self,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        params: Optional[Dict[str, Any]] = None,
        fresh: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Optional[pandas.DataFrame]:
    """
    Get a pipe's data from the instance connector.

    Parameters
    ----------
    begin: Optional[datetime.datetime], default None
        Lower bound datetime to begin searching for data (inclusive).
        Translates to a `WHERE` clause like `WHERE datetime >= begin`.
        Defaults to `None`.

    end: Optional[datetime.datetime], default None
        Upper bound datetime to stop searching for data (inclusive).
        Translates to a `WHERE` clause like `WHERE datetime <= end`.
        Defaults to `None`.

    params: Optional[Dict[str, Any]], default None
        Filter the retrieved data by a dictionary of parameters.
        See `meerschaum.connectors.sql.tools.build_where` for more details.

    fresh: bool, default False
        If `True`, skip the local cache and directly query the instance connector.
        Defaults to `False`.

    debug: bool, default False
        Verbosity toggle.
        Defaults to `False`.

    Returns
    -------
    A `pd.DataFrame` of the pipe's data corresponding to the provided parameters.
    """
    from meerschaum.utils.warnings import warn
    kw.update({'begin': begin, 'end': end, 'params': params,})

    if not self.exists(debug=debug):
        return None

    if self.cache_pipe is not None:
        if not fresh:
            _sync_cache_tuple = self.cache_pipe.sync(debug=debug, **kw)
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync cache for pipe '{self}':\n" + _sync_cache_tuple[1])
                fresh = True
            else: ### Successfully synced cache.
                return self.cache_pipe.get_data(debug=debug, fresh=True, **kw)

    ### If `fresh` or the syncing failed, directly pull from the instance connector.
    return self.instance_connector.get_pipe_data(
        pipe = self,
        debug = debug,
        **kw
    )
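As an illustrative sketch (the connector keys 'sql:main', the metric 'weather', and the 'station_id' filter below are assumptions for the example, not part of this module), a bounded, cache-skipping query might look like:
>>> from datetime import datetime
>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe('sql:main', 'weather')   ### hypothetical pipe, assumed to exist
>>> df = pipe.get_data(
...     begin = datetime(2021, 1, 1),
...     end = datetime(2021, 2, 1),
...     params = {'station_id': 1},            ### hypothetical column, filtered via build_where()
...     fresh = True,                          ### bypass the local cache pipe
... )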
def get_id(self, **kw: Any) ‑> Union[int, NoneType]
-
Fetch a pipe's ID from its instance connector. If the pipe does not exist, return `None`.
Expand source code
def get_id(self, **kw: Any) -> Union[int, None]:
    """
    Fetch a pipe's ID from its instance connector.
    If the pipe does not exist, return `None`.
    """
    return self.instance_connector.get_pipe_id(self, **kw)
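A common pattern, sketched here with the hypothetical pipe from above, is to treat a `None` ID as "not yet registered":
>>> pipe_id = pipe.get_id()
>>> if pipe_id is None:
...     pipe.register()        ### registering stores the pipe on the instance and assigns an ID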
def get_rowcount(self, begin: "Optional['datetime.datetime']" = None, end: "Optional['datetime.datetime']" = None, remote: bool = False, params: Optional[Dict[str, Any]] = None, debug: bool = False) ‑> Union[int, None]
-
Get a Pipe's instance or remote rowcount.
Parameters
begin: Optional[datetime.datetime], default None
- Count rows where datetime > begin.
end: Optional[datetime.datetime], default None
- Count rows where datetime <= end.
remote: bool, default False
- Count rows from a pipe's remote source. NOTE: This is experimental!
debug: bool, default False
- Verbosity toggle.
Returns
An `int` of the number of rows in the pipe corresponding to the provided parameters.
`None` is returned if the pipe does not exist.
Expand source code
def get_rowcount(
        self,
        begin: Optional['datetime.datetime'] = None,
        end: Optional['datetime.datetime'] = None,
        remote: bool = False,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False
    ) -> Union[int, None]:
    """
    Get a Pipe's instance or remote rowcount.

    Parameters
    ----------
    begin: Optional[datetime.datetime], default None
        Count rows where datetime > begin.

    end: Optional[datetime.datetime], default None
        Count rows where datetime <= end.

    remote: bool, default False
        Count rows from a pipe's remote source.
        **NOTE**: This is experimental!

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    An `int` of the number of rows in the pipe corresponding to the provided parameters.
    `None` is returned if the pipe does not exist.
    """
    from meerschaum.utils.warnings import warn
    connector = self.instance_connector if not remote else self.connector
    try:
        return connector.get_pipe_rowcount(
            self, begin=begin, end=end, remote=remote, params=params, debug=debug
        )
    except AttributeError as e:
        warn(e)
        if remote:
            return None
    warn(f"Failed to get a rowcount for pipe '{self}'.")
    return None
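An illustrative sketch, assuming the hypothetical pipe from the earlier examples exists on its instance:
>>> from datetime import datetime
>>> pipe.get_rowcount()                                                  ### all rows on the instance
>>> pipe.get_rowcount(begin=datetime(2021, 1, 1), end=datetime(2021, 2, 1))
>>> pipe.get_rowcount(remote=True)                                       ### experimental: count rows at the source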
def get_sync_time(self, params: Optional[Dict[str, Any]] = None, newest: bool = True, round_down: bool = True, debug: bool = False) ‑> Union[datetime.datetime, NoneType]
-
Get the most recent datetime value for a Pipe.
Parameters
params: Optional[Dict[str, Any]], default None
- Dictionary to build a WHERE clause for a specific column.
See `build_where()`.
newest: bool, default True
- If `True`, get the most recent datetime (honoring `params`).
If `False`, get the oldest datetime (`ASC` instead of `DESC`).
round_down: bool, default True
- If `True`, round down the sync time to the nearest minute.
debug: bool, default False
- Verbosity toggle.
Returns
A `datetime.datetime` object if the pipe exists, otherwise `None`.
Expand source code
def get_sync_time(
        self,
        params: Optional[Dict[str, Any]] = None,
        newest: bool = True,
        round_down: bool = True,
        debug: bool = False
    ) -> Union['datetime.datetime', None]:
    """
    Get the most recent datetime value for a Pipe.

    Parameters
    ----------
    params: Optional[Dict[str, Any]], default None
        Dictionary to build a WHERE clause for a specific column.
        See `meerschaum.connectors.sql.tools.build_where`.

    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (`ASC` instead of `DESC`).

    round_down: bool, default True
        If `True`, round down the sync time to the nearest minute.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `datetime.datetime` object if the pipe exists, otherwise `None`.
    """
    from meerschaum.utils.warnings import error, warn
    if self.columns is None:
        warn(
            f"No columns found for Pipe '{self}'. "
            + "Pipe might not be registered or is missing columns in parameters."
        )
        return None

    if 'datetime' not in self.columns:
        warn(
            f"'datetime' must be declared in parameters:columns for Pipe '{self}'.\n\n"
            + f"You can add parameters for this Pipe with the following command:\n\n"
            + f"mrsm edit pipes -C {self.connector_keys} -M "
            + f"{self.metric_key} -L "
            + (f"[None]" if self.location_key is None else f"{self.location_key}")
        )
        return None

    return self.instance_connector.get_sync_time(
        self,
        params = params,
        newest = newest,
        round_down = round_down,
        debug = debug,
    )
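The sync time pairs naturally with `get_data()` for incremental reads. A sketch, assuming the hypothetical pipe above has been synced at least once:
>>> last_dt = pipe.get_sync_time()                  ### newest datetime, rounded down to the minute
>>> first_dt = pipe.get_sync_time(newest=False)     ### oldest datetime instead
>>> recent_df = pipe.get_data(begin=last_dt)        ### rows at or after the last sync time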
def get_val_column(self, debug: bool = False) ‑> Union[str, NoneType]
-
Return the name of the value column if it's defined, otherwise make an educated guess.
If not set in the `columns` dictionary, return the first numeric column that is not an ID or datetime column.
If none can be found, return `None`.
Parameters
debug: bool, default False
- Verbosity toggle.
Returns
Either a string or `None`.
Expand source code
def get_val_column(self, debug: bool = False) -> Union[str, None]:
    """
    Return the name of the value column if it's defined, otherwise make an educated guess.
    If not set in the `columns` dictionary, return the first numeric column
    that is not an ID or datetime column.
    If none can be found, return `None`.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    Either a string or `None`.
    """
    from meerschaum.utils.debug import dprint
    if debug:
        dprint('Attempting to determine the value column...')
    try:
        val_name = self.get_columns('value')
    except Exception as e:
        val_name = None
    if val_name is not None:
        if debug:
            dprint(f"Value column: {val_name}")
        return val_name

    cols = self.columns
    if cols is None:
        if debug:
            dprint('No columns could be determined. Returning...')
        return None
    try:
        dt_name = self.get_columns('datetime')
    except Exception as e:
        dt_name = None
    try:
        id_name = self.get_columns('id')
    except Exception as e:
        id_name = None

    if debug:
        dprint(f"dt_name: {dt_name}")
        dprint(f"id_name: {id_name}")

    cols_types = self.get_columns_types(debug=debug)
    if cols_types is None:
        return None
    if debug:
        dprint(f"cols_types: {cols_types}")
    if dt_name is not None:
        cols_types.pop(dt_name, None)
    if id_name is not None:
        cols_types.pop(id_name, None)

    candidates = []
    candidate_keywords = {'float', 'double', 'precision', 'int', 'numeric',}
    for search_term in candidate_keywords:
        for col, typ in cols_types.items():
            if search_term in typ.lower():
                candidates.append(col)
                break

    if not candidates:
        if debug:
            dprint(f"No value column could be determined.")
        return None

    return candidates[0]
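A sketch of the guessing behavior, assuming a table like the one in the `get_columns_types()` example above (a numeric 'val' column alongside 'dt' and 'id'):
>>> pipe.columns = {'datetime': 'dt', 'id': 'id'}   ### no 'value' column declared
>>> pipe.get_val_column()                           ### falls back to the first numeric, non-ID, non-datetime column
'val'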
def register(self, debug: bool = False)
-
Register a new Pipe along with its attributes.
Parameters
debug: bool, default False
- Verbosity toggle.
Returns
A `SuccessTuple` of success, message.
Expand source code
def register(
        self,
        debug: bool = False
    ):
    """
    Register a new Pipe along with its attributes.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    import warnings
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        try:
            _conn = self.connector
        except Exception as e:
            _conn = None

    if _conn is not None and _conn.type == 'plugin' and _conn.register is not None:
        params = self.connector.register(self)
        params = {} if params is None else params
        if not isinstance(params, dict):
            from meerschaum.utils.warnings import warn
            warn(
                f"Invalid parameters returned from `register()` in plugin {self.connector}:\n"
                + f"{params}"
            )
        else:
            self.parameters = params

    if not self.parameters:
        self.parameters = {
            'columns': self.columns,
        }

    return self.instance_connector.register_pipe(self, debug=debug)
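Explicit registration is rarely required (syncing registers an unregistered pipe automatically), but it can be useful for storing columns up front. A sketch with hypothetical keys:
>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe('sql:main', 'power', 'plant_a')       ### hypothetical connector, metric, and location keys
>>> pipe.columns = {'datetime': 'time', 'id': 'plant_id'}
>>> success, msg = pipe.register()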
def show(self, nopretty: bool = False, debug: bool = False, **kw) ‑> Tuple[bool, str]
-
Show attributes of a Pipe.
Parameters
nopretty: bool, default False
- If `True`, simply print the JSON of the pipe's attributes.
debug: bool, default False
- Verbosity toggle.
Returns
A `SuccessTuple` of success, message.
Expand source code
def show(
        self,
        nopretty: bool = False,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Show attributes of a Pipe.

    Parameters
    ----------
    nopretty: bool, default False
        If `True`, simply print the JSON of the pipe's attributes.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    import json
    from meerschaum.utils.formatting import pprint, make_header
    from meerschaum.utils.warnings import info
    if not nopretty:
        print(make_header(f"Attributes for pipe '{self}':"))
        pprint(self.attributes)
    else:
        print(json.dumps(self.attributes))

    return True, "Success"
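A short illustrative sketch with the hypothetical pipe above; `nopretty=True` prints raw JSON, which is handy for piping to other tools:
>>> success, msg = pipe.show()                      ### pretty-printed attributes
>>> success, msg = pipe.show(nopretty=True)         ### raw JSON on stdout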
def sync(self, df: Optional[Union[pandas.DataFrame, Dict[str, List[Any]]]] = None, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, force: bool = False, retries: int = 10, min_seconds: int = 1, check_existing: bool = True, blocking: bool = True, workers: Optional[int] = None, callback: Optional[Callable[[Tuple[bool, str]], Any]] = None, error_callback: Optional[Callable[[Exception], Any]] = None, chunksize: Optional[int] = -1, sync_chunks: bool = False, deactivate_plugin_venv: bool = True, debug: bool = False, **kw: Any) ‑> SuccessTuple
-
Fetch new data from the source and update the pipe's table with new data.
Get new remote data via fetch, get existing data in the same time period, and merge the two, only keeping the unseen data.
Parameters
df: Union[None, pd.DataFrame, Dict[str, List[Any]]], default None
- An optional DataFrame to sync into the pipe. Defaults to `None`.
begin: Optional[datetime.datetime], default None
- Optionally specify the earliest datetime to search for data.
Defaults to `None`.
end: Optional[datetime.datetime], default None
- Optionally specify the latest datetime to search for data.
Defaults to `None`.
force: bool, default False
- If `True`, keep trying to sync until `retries` attempts have been made. Defaults to `False`.
retries: int, default 10
- If `force`, how many attempts to try syncing before declaring failure. Defaults to `10`.
min_seconds: Union[int, float], default 1
- If `force`, how many seconds to sleep between retries. Defaults to `1`.
check_existing: bool, default True
- If `True`, pull and diff with existing data from the pipe. Defaults to `True`.
blocking: bool, default True
- If `True`, wait for the sync to finish and return its result; otherwise sync asynchronously and return success.
Defaults to `True`. Only intended for specific scenarios.
workers: Optional[int], default None
- Not used directly within `sync()`. Instead it is passed on to instance connectors' `sync_pipe()` methods (e.g. `meerschaum.connectors.plugin.PluginConnector`). Defaults to `None`.
callback: Optional[Callable[[Tuple[bool, str]], Any]], default None
- Callback function which expects a `SuccessTuple` as input.
Only applies when `blocking=False`.
error_callback: Optional[Callable[[Exception], Any]], default None
- Callback function which expects an `Exception` as input.
Only applies when `blocking=False`.
chunksize: Optional[int], default -1
- Specify the number of rows to sync per chunk.
If `-1`, resort to system configuration (default is `900`).
A `chunksize` of `None` will sync all rows in one transaction. Defaults to `-1`.
sync_chunks: bool, default False
- If possible, sync chunks while fetching them into memory.
Defaults to `False`.
deactivate_plugin_venv: bool, default True
- If `True`, deactivate a plugin's virtual environment after syncing. Defaults to `True`.
debug: bool, default False
- Verbosity toggle. Defaults to `False`.
Returns
A `SuccessTuple` of success (`bool`) and message (`str`).
Expand source code
def sync(
        self,
        df: Optional[Union[pandas.DataFrame, Dict[str, List[Any]]]] = None,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        force: bool = False,
        retries: int = 10,
        min_seconds: int = 1,
        check_existing: bool = True,
        blocking: bool = True,
        workers: Optional[int] = None,
        callback: Optional[Callable[[Tuple[bool, str]], Any]] = None,
        error_callback: Optional[Callable[[Exception], Any]] = None,
        chunksize: Optional[int] = -1,
        sync_chunks: bool = False,
        deactivate_plugin_venv: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Fetch new data from the source and update the pipe's table with new data.

    Get new remote data via fetch, get existing data in the same time period,
    and merge the two, only keeping the unseen data.

    Parameters
    ----------
    df: Union[None, pd.DataFrame, Dict[str, List[Any]]], default None
        An optional DataFrame to sync into the pipe. Defaults to `None`.

    begin: Optional[datetime.datetime], default None
        Optionally specify the earliest datetime to search for data.
        Defaults to `None`.

    end: Optional[datetime.datetime], default None
        Optionally specify the latest datetime to search for data.
        Defaults to `None`.

    force: bool, default False
        If `True`, keep trying to sync until `retries` attempts have been made.
        Defaults to `False`.

    retries: int, default 10
        If `force`, how many attempts to try syncing before declaring failure.
        Defaults to `10`.

    min_seconds: Union[int, float], default 1
        If `force`, how many seconds to sleep between retries.
        Defaults to `1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe.
        Defaults to `True`.

    blocking: bool, default True
        If `True`, wait for the sync to finish and return its result;
        otherwise sync asynchronously and return success.
        Defaults to `True`. Only intended for specific scenarios.

    workers: Optional[int], default None
        Not used directly within `Pipe.sync()`. Instead it is passed on to
        instance connectors' `sync_pipe()` methods
        (e.g. `meerschaum.connectors.plugin.PluginConnector`).
        Defaults to `None`.

    callback: Optional[Callable[[Tuple[bool, str]], Any]], default None
        Callback function which expects a SuccessTuple as input.
        Only applies when `blocking=False`.

    error_callback: Optional[Callable[[Exception], Any]], default None
        Callback function which expects an Exception as input.
        Only applies when `blocking=False`.

    chunksize: Optional[int], default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.
        Defaults to `-1`.

    sync_chunks: bool, default False
        If possible, sync chunks while fetching them into memory.
        Defaults to `False`.

    deactivate_plugin_venv: bool, default True
        If `True`, deactivate a plugin's virtual environment after syncing.
        Defaults to `True`.

    debug: bool, default False
        Verbosity toggle. Defaults to `False`.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).
    """
    from meerschaum.utils.debug import dprint, _checkpoint
    from meerschaum.utils.warnings import warn, error
    import datetime
    import time
    if (callback is not None or error_callback is not None) and blocking:
        warn("Callback functions are only executed when blocking = False. Ignoring...")

    _checkpoint(_total=2, **kw)

    if (
        not self.connector_keys.startswith('plugin:')
        and not self.get_columns('datetime', error=False)
    ):
        return False, f"Cannot sync pipe '{self}' without a datetime column."

    ### NOTE: Setting begin to the sync time for Simple Sync.
    ### TODO: Add flag for specifying syncing method.
    begin = _determine_begin(self, begin, debug=debug)
    kw.update({
        'begin': begin, 'end': end, 'force': force, 'retries': retries,
        'min_seconds': min_seconds, 'check_existing': check_existing,
        'blocking': blocking, 'workers': workers, 'callback': callback,
        'error_callback': error_callback, 'sync_chunks': sync_chunks,
        'chunksize': chunksize,
    })

    def _sync(
            p: 'meerschaum.Pipe',
            df: Optional['pandas.DataFrame'] = None
        ) -> SuccessTuple:
        ### Ensure that Pipe is registered.
        if p.get_id(debug=debug) is None:
            ### NOTE: This may trigger an interactive session for plugins!
            register_tuple = p.register(debug=debug)
            if not register_tuple[0]:
                return register_tuple

        ### If connector is a plugin with a `sync()` method, return that instead.
        ### If the plugin does not have a `sync()` method but does have a `fetch()` method,
        ### use that instead.
        ### NOTE: The DataFrame must be None for the plugin sync method to apply.
        ### If a DataFrame is provided, continue as expected.
        if df is None:
            try:
                if p.connector.type == 'plugin' and p.connector.sync is not None:
                    from meerschaum.utils.packages import activate_venv, deactivate_venv
                    activate_venv(p.connector.label, debug=debug)
                    return_tuple = p.connector.sync(p, debug=debug, **kw)
                    if deactivate_plugin_venv:
                        deactivate_venv(p.connector.label, debug=debug)
                    if not isinstance(return_tuple, tuple):
                        return_tuple = (
                            False,
                            f"Plugin '{p.connector.label}' returned non-tuple value: {return_tuple}"
                        )
                    return return_tuple
            except Exception as e:
                msg = f"Failed to sync pipe '{p}' with exception: '" + str(e) + "'"
                if debug:
                    error(msg, silent=False)
                return False, msg

        ### default: fetch new data via the connector.
        ### If new data is provided, skip fetching.
        if df is None:
            if p.connector is None:
                return False, f"Cannot fetch data for pipe '{p}' without a connector."
            df = p.fetch(debug=debug, **kw)
            if df is None:
                return False, f"Unable to fetch data for pipe '{p}'."
            if df is True:
                return True, f"Pipe '{p}' was synced in parallel."

        ### CHECKPOINT: Retrieved the DataFrame.
        _checkpoint(**kw)
        if debug:
            dprint(
                "DataFrame to sync:\n"
                + (str(df)[:255] + '...' if len(str(df)) >= 256 else str(df)),
                **kw
            )

        ### if force, continue to sync until success
        return_tuple = False, f"Did not sync pipe '{p}'."
        run = True
        _retries = 1
        while run:
            return_tuple = p.instance_connector.sync_pipe(
                pipe = p,
                df = df,
                debug = debug,
                **kw
            )
            _retries += 1
            run = (not return_tuple[0]) and force and _retries <= retries
            if run and debug:
                dprint(f"Syncing failed for pipe '{p}'. Attempt ( {_retries} / {retries} )", **kw)
                dprint(f"Sleeping for {min_seconds} seconds...", **kw)
                time.sleep(min_seconds)
        if _retries > retries:
            warn(
                f"Unable to sync pipe '{p}' within {retries} attempt"
                + ("s" if retries != 1 else "") + "!"
            )

        ### CHECKPOINT: Finished syncing. Handle caching.
        _checkpoint(**kw)

        if self.cache_pipe is not None:
            if debug:
                dprint(f"Caching retrieved dataframe.", **kw)
            _sync_cache_tuple = self.cache_pipe.sync(df, debug=debug, **kw)
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync local cache for pipe '{self}'.")

        return return_tuple

    if blocking:
        return _sync(self, df = df)

    ### TODO implement concurrent syncing (split DataFrame? mimic the functionality of modin?)
    from meerschaum.utils.threading import Thread
    def default_callback(result_tuple: SuccessTuple):
        dprint(f"Asynchronous result from Pipe '{self}': {result_tuple}", **kw)
    def default_error_callback(x: Exception):
        dprint(f"Error received for Pipe '{self}': {x}", **kw)
    if callback is None and debug:
        callback = default_callback
    if error_callback is None and debug:
        error_callback = default_error_callback
    try:
        thread = Thread(
            target = _sync,
            args = (self,),
            kwargs = {'df': df},
            daemon = False,
            callback = callback,
            error_callback = error_callback
        )
        thread.start()
    except Exception as e:
        return False, str(e)
    return True, f"Spawned asynchronous sync for pipe '{self}'."
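To tie the pieces together, here is an illustrative sketch of both syncing styles. The 'csv'/'temperature' keys and the column names are assumptions for the example only, and the second call assumes the pipe's connector can fetch new data on its own:
>>> import pandas as pd
>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe('csv', 'temperature')          ### hypothetical keys
>>> pipe.columns = {'datetime': 'time', 'id': 'sensor_id'}
>>> df = pd.DataFrame({
...     'time': pd.to_datetime(['2021-01-01 00:00', '2021-01-01 01:00']),
...     'sensor_id': [1, 1],
...     'temp': [20.5, 21.0],
... })
>>> success, msg = pipe.sync(df)                    ### sync a DataFrame directly
>>> success, msg = pipe.sync(force=True, retries=3, min_seconds=5)   ### fetch via the connector, retrying on failure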
- Connector keys (e.g.