Module meerschaum.connectors.sql.SQLConnector
Interface with SQL servers using sqlalchemy.
Expand source code
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
"""
Interface with SQL servers using sqlalchemy.
"""
from __future__ import annotations
from meerschaum.utils.typing import Optional, Any, Union
from meerschaum.connectors import Connector
from meerschaum.utils.warnings import error
class SQLConnector(Connector):
"""
Connect to SQL databases via `sqlalchemy`.
SQLConnectors may be used as Meerschaum instance connectors.
Read more about connectors and instances at
https://meerschaum.io/reference/connectors/
"""
IS_INSTANCE: bool = True
from ._create_engine import flavor_configs, create_engine
from ._sql import read, value, exec, execute, to_sql, exec_queries
from meerschaum.utils.sql import test_connection
from ._fetch import fetch, get_pipe_metadef
from ._cli import cli
from ._pipes import (
fetch_pipes_keys,
create_indices,
drop_indices,
get_create_index_queries,
get_drop_index_queries,
get_add_columns_queries,
get_alter_columns_queries,
delete_pipe,
get_pipe_data,
get_pipe_data_query,
register_pipe,
edit_pipe,
get_pipe_id,
get_pipe_attributes,
sync_pipe,
sync_pipe_inplace,
get_sync_time,
pipe_exists,
get_pipe_rowcount,
drop_pipe,
clear_pipe,
deduplicate_pipe,
get_pipe_table,
get_pipe_columns_types,
get_to_sql_dtype,
)
from ._plugins import (
register_plugin,
delete_plugin,
get_plugin_id,
get_plugin_version,
get_plugins,
get_plugin_user_id,
get_plugin_username,
get_plugin_attributes,
)
from ._users import (
register_user,
get_user_id,
get_users,
edit_user,
delete_user,
get_user_password_hash,
get_user_type,
get_user_attributes,
)
from ._uri import from_uri, parse_uri
def __init__(
self,
label: Optional[str] = None,
flavor: Optional[str] = None,
wait: bool = False,
connect: bool = False,
debug: bool = False,
**kw: Any
):
"""
Parameters
----------
label: str, default 'main'
The identifying label for the connector.
E.g. for `sql:main`, 'main' is the label.
Defaults to 'main'.
flavor: Optional[str], default None
The database flavor, e.g.
`'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
To see supported flavors, run the `bootstrap connectors` command.
wait: bool, default False
If `True`, block until a database connection has been made.
Defaults to `False`.
connect: bool, default False
If `True`, immediately attempt to connect to the database and raise
a warning if the connection fails.
Defaults to `False`.
debug: bool, default False
Verbosity toggle.
Defaults to `False`.
kw: Any
All other arguments will be passed to the connector's attributes.
Therefore, a connector may be made without being registered,
as long as enough parameters are supplied to the constructor.
"""
if 'uri' in kw:
uri = kw['uri']
if uri.startswith('postgres://'):
uri = uri.replace('postgres://', 'postgresql://', 1)
if uri.startswith('timescaledb://'):
uri = uri.replace('timescaledb://', 'postgresql://', 1)
flavor = 'timescaledb'
kw['uri'] = uri
from_uri_params = self.from_uri(kw['uri'], as_dict=True)
label = label or from_uri_params.get('label', None)
_ = from_uri_params.pop('label', None)
### Sometimes the flavor may be provided with a URI.
kw.update(from_uri_params)
if flavor:
kw['flavor'] = flavor
### set __dict__ in base class
super().__init__(
'sql',
label = label or self.__dict__.get('label', None),
**kw
)
if self.__dict__.get('flavor', None) == 'sqlite':
self._reset_attributes()
self._set_attributes(
'sql',
label = label,
inherit_default = False,
**kw
)
### For backwards compatibility, set the path for sql:local if it's missing.
if self.label == 'local' and not self.__dict__.get('database', None):
from meerschaum.config._paths import SQLITE_DB_PATH
self.database = str(SQLITE_DB_PATH)
### ensure flavor and label are set accordingly
if 'flavor' not in self.__dict__:
if flavor is None and 'uri' not in self.__dict__:
raise Exception(
f" Missing flavor. Provide flavor as a key for '{self}'."
)
self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)
if self.flavor == 'postgres':
self.flavor = 'postgresql'
self._debug = debug
### Store the PID and thread at initialization
### so we can dispose of the Pool in child processes or threads.
import os, threading
self._pid = os.getpid()
self._thread_ident = threading.current_thread().ident
self._sessions = {}
self._locks = {'_sessions': threading.RLock(), }
### verify the flavor's requirements are met
if self.flavor not in self.flavor_configs:
error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
if not self.__dict__.get('uri'):
self.verify_attributes(
self.flavor_configs[self.flavor].get('requirements', set()),
debug=debug,
)
if wait:
from meerschaum.connectors.poll import retry_connect
retry_connect(connector=self, debug=debug)
if connect:
if not self.test_connection(debug=debug):
from meerschaum.utils.warnings import warn
warn(f"Failed to connect with connector '{self}'!", stack=False)
@property
def Session(self):
if '_Session' not in self.__dict__:
if self.engine is None:
return None
from meerschaum.utils.packages import attempt_import
sqlalchemy_orm = attempt_import('sqlalchemy.orm')
session_factory = sqlalchemy_orm.sessionmaker(self.engine)
self._Session = sqlalchemy_orm.scoped_session(session_factory)
return self._Session
@property
def engine(self):
import os, threading
### build the sqlalchemy engine
if '_engine' not in self.__dict__:
self._engine, self._engine_str = self.create_engine(include_uri=True)
same_process = os.getpid() == self._pid
same_thread = threading.current_thread().ident == self._thread_ident
### handle child processes
if not same_process:
self._pid = os.getpid()
self._thread = threading.current_thread()
from meerschaum.utils.warnings import warn
warn(f"Different PID detected. Disposing of connections...")
self._engine.dispose()
### handle different threads
if not same_thread:
pass
return self._engine
@property
def DATABASE_URL(self) -> str:
"""
Return the URI connection string (alias for `SQLConnector.URI`).
"""
_ = self.engine
return str(self._engine_str)
@property
def URI(self) -> str:
"""
Return the URI connection string.
"""
_ = self.engine
return str(self._engine_str)
@property
def IS_THREAD_SAFE(self) -> bool:
"""
Return whether this connector may be multithreaded.
"""
if self.flavor == 'duckdb':
return False
if self.flavor == 'sqlite':
return ':memory:' not in self.URI
return True
@property
def metadata(self):
from meerschaum.utils.packages import attempt_import
sqlalchemy = attempt_import('sqlalchemy')
if '_metadata' not in self.__dict__:
self._metadata = sqlalchemy.MetaData()
return self._metadata
@property
def db(self) -> Optional[databases.Database]:
from meerschaum.utils.packages import attempt_import
databases = attempt_import('databases', lazy=False, install=True)
url = self.DATABASE_URL
if 'mysql' in url:
url = url.replace('+pymysql', '')
if '_db' not in self.__dict__:
try:
self._db = databases.Database(url)
except KeyError:
### Likely encountered an unsupported flavor.
from meerschaum.utils.warnings import warn
self._db = None
return self._db
@property
def db_version(self) -> Union[str, None]:
"""
Return the database version.
"""
_db_version = self.__dict__.get('_db_version', None)
if _db_version is not None:
return _db_version
from meerschaum.utils.sql import get_db_version
self._db_version = get_db_version(self)
return self._db_version
def __getstate__(self):
return self.__dict__
def __setstate__(self, d):
self.__dict__.update(d)
def __call__(self):
return self
Classes
class SQLConnector (label: Optional[str] = None, flavor: Optional[str] = None, wait: bool = False, connect: bool = False, debug: bool = False, **kw: Any)
- Connect to SQL databases via `sqlalchemy`. SQLConnectors may be used as Meerschaum instance connectors. Read more about connectors and instances at https://meerschaum.io/reference/connectors/
Parameters
label: str, default 'main'
- The identifying label for the connector. E.g. for `sql:main`, 'main' is the label.
flavor: Optional[str], default None
- The database flavor, e.g. `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc. To see supported flavors, run the `bootstrap connectors` command.
wait: bool, default False
- If `True`, block until a database connection has been made.
connect: bool, default False
- If `True`, immediately attempt to connect to the database and raise a warning if the connection fails.
debug: bool, default False
- Verbosity toggle.
kw: Any
- All other arguments will be passed to the connector's attributes. Therefore, a connector may be made without being registered, as long as enough parameters are supplied to the constructor.
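For illustration, here is a minimal construction sketch; the credentials and database below are hypothetical:

from meerschaum.connectors.sql import SQLConnector

### Hypothetical attributes; any flavor-required keys may be passed directly.
conn = SQLConnector(
    label = 'example',
    flavor = 'postgresql',
    username = 'user',
    password = 'pass',
    host = 'localhost',
    port = 5432,
    database = 'mrsm',
)

### Alternatively, supply a URI and let the flavor (and label) be inferred.
conn_from_uri = SQLConnector(uri='postgresql://user:pass@localhost:5432/mrsm')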
Ancestors
- meerschaum.connectors.Connector.Connector
Class variables
var IS_INSTANCE : bool
var flavor_configs
Static methods
def from_uri(uri: str, label: Optional[str] = None, as_dict: bool = False) ‑> Union[SQLConnector, Dict[str, Union[str, int]]]
- Create a new SQLConnector from a URI string.
Parameters
uri: str
- The URI connection string.
label: Optional[str], default None
- If provided, use this as the connector label. Otherwise use the determined database name.
as_dict: bool, default False
- If `True`, return a dictionary of the keyword arguments necessary to create a new `SQLConnector`; otherwise create a new object.
Returns
A new SQLConnector object, or a dictionary of attributes (if `as_dict` is `True`).
Expand source code
@classmethod
def from_uri(
    cls,
    uri: str,
    label: Optional[str] = None,
    as_dict: bool = False,
) -> Union[
    'meerschaum.connectors.SQLConnector',
    Dict[str, Union[str, int]],
]:
    """
    Create a new SQLConnector from a URI string.

    Parameters
    ----------
    uri: str
        The URI connection string.

    label: Optional[str], default None
        If provided, use this as the connector label.
        Otherwise use the determined database name.

    as_dict: bool, default False
        If `True`, return a dictionary of the keyword arguments
        necessary to create a new `SQLConnector`, otherwise create a new object.

    Returns
    -------
    A new SQLConnector object or a dictionary of attributes (if `as_dict` is `True`).
    """
    params = cls.parse_uri(uri)
    params['uri'] = uri
    flavor = params.get('flavor', None)
    if not flavor or flavor not in cls.flavor_configs:
        error(f"Invalid flavor '{flavor}' detected from the provided URI.")

    if 'database' not in params:
        error("Unable to determine the database from the provided URI.")

    if flavor in ('sqlite', 'duckdb'):
        if params['database'] == ':memory:':
            params['label'] = label or f'memory_{flavor}'
        else:
            params['label'] = label or params['database'].split(os.path.sep)[-1].lower()
    else:
        params['label'] = label or (
            (
                (params['username'] + '@' if 'username' in params else '')
                + params.get('host', '')
                + ('/' if 'host' in params else '')
                + params.get('database', '')
            ).lower()
        )

    return cls(**params) if not as_dict else params
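A brief usage sketch with a hypothetical URI; per the source above, the label is derived from the username, host, and database when one isn't given:

conn = SQLConnector.from_uri('postgresql://user:pass@localhost:5432/weather')
print(conn.label)
### 'user@localhost/weather'

### Request the parsed keyword arguments instead of a new connector.
attrs = SQLConnector.from_uri(
    'postgresql://user:pass@localhost:5432/weather',
    as_dict = True,
)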
def parse_uri(uri: str) ‑> Dict[str, Any]
- Parse a URI string into a dictionary of parameters.
Parameters
uri: str
- The database connection URI.
Returns
A dictionary of attributes.
Examples
>>> parse_uri('sqlite:////home/foo/bar.db')
{'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
>>> parse_uri(
...     'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
...     + '/master?driver=ODBC+Driver+17+for+SQL+Server'
... )
{'host': 'localhost', 'database': 'master', 'username': 'sa', 'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql', 'driver': 'ODBC Driver 17 for SQL Server'}
>>>
Expand source code
@staticmethod
def parse_uri(uri: str) -> Dict[str, Any]:
    """
    Parse a URI string into a dictionary of parameters.

    Parameters
    ----------
    uri: str
        The database connection URI.

    Returns
    -------
    A dictionary of attributes.

    Examples
    --------
    >>> parse_uri('sqlite:////home/foo/bar.db')
    {'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
    >>> parse_uri(
    ...     'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
    ...     + '/master?driver=ODBC+Driver+17+for+SQL+Server'
    ... )
    {'host': 'localhost', 'database': 'master', 'username': 'sa', 'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql', 'driver': 'ODBC Driver 17 for SQL Server'}
    >>>
    """
    from urllib.parse import parse_qs, urlparse
    sqlalchemy = attempt_import('sqlalchemy')
    parser = sqlalchemy.engine.url.make_url
    params = parser(uri).translate_connect_args()
    params['flavor'] = uri.split(':')[0].split('+')[0]
    if params['flavor'] == 'postgres':
        params['flavor'] = 'postgresql'
    if '?' in uri:
        parsed_uri = urlparse(uri)
        for key, value in parse_qs(parsed_uri.query).items():
            params.update({key: value[0]})
    return params
Instance variables
var DATABASE_URL : str
- Return the URI connection string (alias for `SQLConnector.URI`).
var IS_THREAD_SAFE : bool
- Return whether this connector may be multithreaded.
var Session
- A scoped `sqlalchemy` session factory bound to the connector's engine.
var URI : str
- Return the URI connection string.
var db : Optional[databases.Database]
- A `databases.Database` instance built from the connection URL, or `None` if the flavor is unsupported.
var db_version : Union[str, None]
- Return the database version.
var engine
- The lazily created `sqlalchemy` engine. When a different process (PID) is detected, existing connections are disposed of and the engine is rebuilt.
var metadata
- A lazily created `sqlalchemy.MetaData` instance.
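A rough sketch of how these properties interact, assuming the hypothetical `conn` connector from the constructor example above:

engine = conn.engine          ### Built lazily on first access via `create_engine()`.
uri = conn.URI                ### The rendered connection string.
url = conn.DATABASE_URL       ### Alias for `conn.URI`.
safe = conn.IS_THREAD_SAFE    ### `False` for DuckDB and in-memory SQLite.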
Methods
def clear_pipe(self, pipe: mrsm.Pipe, begin: Union[datetime, int, None] = None, end: Union[datetime, int, None] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw) ‑> Tuple[bool, str]
- Delete a pipe's data within a bounded or unbounded interval without dropping the table.
Parameters
pipe: mrsm.Pipe
- The pipe to clear.
begin: Union[datetime, int, None], default None
- Beginning datetime. Inclusive.
end: Union[datetime, int, None], default None
- Ending datetime. Exclusive.
params: Optional[Dict[str, Any]], default None
- See `meerschaum.utils.sql.build_where`.
Expand source code
def clear_pipe(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """
    Delete a pipe's data within a bounded or unbounded interval without dropping the table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to clear.

    begin: Union[datetime, int, None], default None
        Beginning datetime. Inclusive.

    end: Union[datetime, int, None], default None
        Ending datetime. Exclusive.

    params: Optional[Dict[str, Any]], default None
        See `meerschaum.utils.sql.build_where`.
    """
    if not pipe.exists(debug=debug):
        return True, f"{pipe} does not exist, so nothing was cleared."

    from meerschaum.utils.sql import sql_item_name, build_where, dateadd_str
    from meerschaum.utils.warnings import warn
    pipe_name = sql_item_name(pipe.target, self.flavor)

    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt_name = sql_item_name(_dt, self.flavor) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt_name = sql_item_name(_dt, self.flavor)
        is_guess = False

    if begin is not None or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"No datetime could be determined for {pipe}."
                    + "\n    Ignoring datetime bounds...",
                    stack = False,
                )
                begin, end = None, None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False,
                )

    valid_params = {}
    if params is not None:
        existing_cols = pipe.get_columns_types(debug=debug)
        valid_params = {k: v for k, v in params.items() if k in existing_cols}
    clear_query = (
        f"DELETE FROM {pipe_name}\nWHERE 1 = 1\n"
        + ('  AND ' + build_where(valid_params, self, with_where=False) if valid_params else '')
        + (
            f'  AND {dt_name} >= ' + dateadd_str(self.flavor, 'day', 0, begin)
            if begin is not None else ''
        ) + (
            f'  AND {dt_name} < ' + dateadd_str(self.flavor, 'day', 0, end)
            if end is not None else ''
        )
    )
    success = self.exec(clear_query, silent=True, debug=debug) is not None
    msg = "Success" if success else f"Failed to clear {pipe}."
    return success, msg
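For example, clearing a bounded month of data might look like the following sketch; the pipe keys are hypothetical:

import meerschaum as mrsm
from datetime import datetime

pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')
conn = mrsm.get_connector('sql:main')

### Delete rows where 2023-01-01 <= datetime < 2023-02-01.
success, msg = conn.clear_pipe(
    pipe,
    begin = datetime(2023, 1, 1),
    end = datetime(2023, 2, 1),
)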
def cli(self, debug: bool = False) ‑> Tuple[bool, str]
- Launch an interactive CLI for the SQLConnector's flavor.
Expand source code
def cli(
    self,
    debug: bool = False,
) -> SuccessTuple:
    """Launch an interactive CLI for the SQLConnector's flavor."""
    from meerschaum.utils.packages import venv_exec, attempt_import
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import error
    import sys, subprocess, os

    if self.flavor not in flavor_clis:
        return False, f"No CLI available for flavor '{self.flavor}'."

    if self.flavor == 'duckdb':
        gadwall = attempt_import('gadwall', debug=debug, lazy=False)
        gadwall_shell = gadwall.Gadwall(self.database)
        try:
            gadwall_shell.cmdloop()
        except KeyboardInterrupt:
            pass
        return True, "Success"
    elif self.flavor == 'mssql':
        if 'DOTNET_SYSTEM_GLOBALIZATION_INVARIANT' not in os.environ:
            os.environ['DOTNET_SYSTEM_GLOBALIZATION_INVARIANT'] = '1'

    cli_name = flavor_clis[self.flavor]

    ### Install the CLI package and any dependencies.
    cli, cli_main = attempt_import(cli_name, (cli_name + '.main'), lazy=False, debug=debug)
    if cli_name in cli_deps:
        for dep in cli_deps[cli_name]:
            locals()[dep] = attempt_import(dep, lazy=False, warn=False, debug=debug)

    ### NOTE: The `DATABASE_URL` property must be initialized first in case the database is not
    ### yet defined (e.g. 'sql:local').
    cli_arg_str = self.DATABASE_URL
    if self.flavor in ('sqlite', 'duckdb'):
        cli_arg_str = str(self.database)

    ### Define the script to execute to launch the CLI.
    ### The `mssqlcli` script is manually written to avoid telemetry
    ### and because `main.cli()` is not defined.
    launch_cli = f"cli_main.cli(['{cli_arg_str}'])"
    if self.flavor == 'mssql':
        launch_cli = (
            "mssqlclioptionsparser, mssql_cli = attempt_import("
            + "'mssqlcli.mssqlclioptionsparser', 'mssqlcli.mssql_cli', lazy=False)\n"
            + "ms_parser = mssqlclioptionsparser.create_parser()\n"
            + f"ms_options = ms_parser.parse_args(['--server', 'tcp:{self.host},{self.port}', "
            + f"'--database', '{self.database}', "
            + f"'--username', '{self.username}', '--password', '{self.password}'])\n"
            + "ms_object = mssql_cli.MssqlCli(ms_options)\n"
            + "try:\n"
            + "    ms_object.connect_to_database()\n"
            + "    ms_object.run()\n"
            + "finally:\n"
            + "    ms_object.shutdown()"
        )
    elif self.flavor == 'duckdb':
        launch_cli = ()

    try:
        if debug:
            dprint(f'Launching CLI:\n{launch_cli}')
        exec(launch_cli)
        success, msg = True, 'Success'
    except Exception as e:
        success, msg = False, str(e)

    return success, msg
def create_engine(self, include_uri: bool = False, debug: bool = False, **kw) ‑> sqlalchemy.engine.Engine
- Create a sqlalchemy engine by building the engine string.
Expand source code
def create_engine(
    self,
    include_uri: bool = False,
    debug: bool = False,
    **kw
) -> 'sqlalchemy.engine.Engine':
    """Create a sqlalchemy engine by building the engine string."""
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import error, warn
    sqlalchemy = attempt_import('sqlalchemy')
    import urllib
    import copy

    ### Install and patch required drivers.
    if self.flavor in install_flavor_drivers:
        attempt_import(*install_flavor_drivers[self.flavor], debug=debug, lazy=False, warn=False)
    if self.flavor in require_patching_flavors:
        from meerschaum.utils.packages import determine_version, _monkey_patch_get_distribution
        import pathlib
        for install_name, import_name in require_patching_flavors[self.flavor]:
            pkg = attempt_import(
                import_name,
                debug = debug,
                lazy = False,
                warn = False
            )
            _monkey_patch_get_distribution(
                install_name, determine_version(pathlib.Path(pkg.__file__), venv='mrsm')
            )

    ### supplement missing values with defaults (e.g. port number)
    for a, value in flavor_configs[self.flavor]['defaults'].items():
        if a not in self.__dict__:
            self.__dict__[a] = value

    ### Verify that everything is in order.
    if self.flavor not in flavor_configs:
        error(f"Cannot create a connector with the flavor '{self.flavor}'.")

    _engine = flavor_configs[self.flavor].get('engine', None)
    _username = self.__dict__.get('username', None)
    _password = self.__dict__.get('password', None)
    _host = self.__dict__.get('host', None)
    _port = self.__dict__.get('port', None)
    _database = self.__dict__.get('database', None)
    _driver = self.__dict__.get('driver', None)
    if _driver is not None:
        ### URL-encode the driver if a space is detected.
        ### Not a bullet-proof solution, but this should work for most cases.
        _driver = urllib.parse.quote_plus(_driver) if ' ' in _driver else _driver
    _uri = self.__dict__.get('uri', None)

    ### Handle registering specific dialects (due to installing in virtual environments).
    if self.flavor in flavor_dialects:
        sqlalchemy.dialects.registry.register(*flavor_dialects[self.flavor])

    ### self._sys_config was deepcopied and can be updated safely
    if self.flavor in ("sqlite", "duckdb"):
        engine_str = f"{_engine}:///{_database}" if not _uri else _uri
        if 'create_engine' not in self._sys_config:
            self._sys_config['create_engine'] = {}
        if 'connect_args' not in self._sys_config['create_engine']:
            self._sys_config['create_engine']['connect_args'] = {}
        self._sys_config['create_engine']['connect_args'].update({"check_same_thread": False})
    else:
        engine_str = (
            _engine + "://" + (_username if _username is not None else '')
            + ((":" + urllib.parse.quote_plus(_password)) if _password is not None else '')
            + "@" + _host
            + ((":" + str(_port)) if _port is not None else '')
            + (("/" + _database) if _database is not None else '')
            + (("?driver=" + _driver) if _driver is not None else '')
        ) if not _uri else _uri

    ### Sometimes the timescaledb:// flavor can slip in.
    if _uri and self.flavor in ('timescaledb',) and self.flavor in _uri:
        engine_str = engine_str.replace(f'{self.flavor}://', 'postgresql://')

    if debug:
        dprint(
            (
                (engine_str.replace(':' + _password, ':' + ('*' * len(_password))))
                if _password is not None else engine_str
            ) + '\n' + f"{self._sys_config.get('create_engine', {}).get('connect_args', {})}"
        )

    _kw_copy = copy.deepcopy(kw)

    ### NOTE: Order of inheritance:
    ###   1. Defaults
    ###   2. System configuration
    ###   3. Connector configuration
    ###   4. Keyword arguments
    _create_engine_args = flavor_configs.get(self.flavor, {}).get('create_engine', {})
    def _apply_create_engine_args(update):
        if 'ALL' not in flavor_configs[self.flavor].get('omit_create_engine', {}):
            _create_engine_args.update(
                {
                    k: v for k, v in update.items()
                    if 'omit_create_engine' not in flavor_configs[self.flavor]
                    or k not in flavor_configs[self.flavor].get('omit_create_engine')
                }
            )
    _apply_create_engine_args(self._sys_config.get('create_engine', {}))
    _apply_create_engine_args(self.__dict__.get('create_engine', {}))
    _apply_create_engine_args(_kw_copy)

    try:
        engine = sqlalchemy.create_engine(
            engine_str,
            ### I know this looks confusing, and maybe it's bad code,
            ### but it's simple. It dynamically parses the config string
            ### and splits it to separate the class name (QueuePool)
            ### from the module name (sqlalchemy.pool).
            poolclass = getattr(
                attempt_import(
                    ".".join(self._sys_config['poolclass'].split('.')[:-1])
                ),
                self._sys_config['poolclass'].split('.')[-1]
            ),
            echo = debug,
            **_create_engine_args
        )
    except Exception as e:
        warn(f"Failed to create connector '{self}':\n{traceback.format_exc()}", stack=False)
        engine = None

    if include_uri:
        return engine, engine_str
    return engine
def create_indices(self, pipe: mrsm.Pipe, indices: Optional[List[str]] = None, debug: bool = False) ‑> bool
- Create a pipe's indices.
Expand source code
def create_indices(
    self,
    pipe: mrsm.Pipe,
    indices: Optional[List[str]] = None,
    debug: bool = False
) -> bool:
    """
    Create a pipe's indices.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    if debug:
        dprint(f"Creating indices for {pipe}...")
    if not pipe.columns:
        warn(f"Unable to create indices for {pipe} without columns.", stack=False)
        return False
    ix_queries = {
        ix: queries
        for ix, queries in self.get_create_index_queries(pipe, debug=debug).items()
        if indices is None or ix in indices
    }
    success = True
    for ix, queries in ix_queries.items():
        ix_success = all(self.exec_queries(queries, debug=debug, silent=True))
        if not ix_success:
            success = False
            if debug:
                dprint(f"Failed to create index on column: {ix}")
    return success
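A hypothetical invocation, rebuilding only the datetime index (reusing `pipe` and `conn` from the clear_pipe sketch above):

success = conn.create_indices(pipe, indices=['datetime'])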
def deduplicate_pipe(self, pipe: mrsm.Pipe, begin: Union[datetime, int, None] = None, end: Union[datetime, int, None] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kwargs: Any) ‑> Tuple[bool, str]
- Delete duplicate values within a pipe's table.
Parameters
pipe: mrsm.Pipe
- The pipe whose table to deduplicate.
begin: Union[datetime, int, None], default None
- If provided, only deduplicate values greater than or equal to this value.
end: Union[datetime, int, None], default None
- If provided, only deduplicate values less than this value.
params: Optional[Dict[str, Any]], default None
- If provided, further limit deduplication to values which match this query dictionary.
debug: bool, default False
- Verbosity toggle.
Returns
A `SuccessTuple` indicating success.
Expand source code
def deduplicate_pipe(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kwargs: Any
) -> SuccessTuple:
    """
    Delete duplicate values within a pipe's table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose table to deduplicate.

    begin: Union[datetime, int, None], default None
        If provided, only deduplicate values greater than or equal to this value.

    end: Union[datetime, int, None], default None
        If provided, only deduplicate values less than this value.

    params: Optional[Dict[str, Any]], default None
        If provided, further limit deduplication to values which match this query dictionary.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    from meerschaum.utils.sql import (
        sql_item_name,
        NO_CTE_FLAVORS,
        get_rename_table_queries,
        NO_SELECT_INTO_FLAVORS,
        get_create_table_query,
        format_cte_subquery,
        get_null_replacement,
    )
    from meerschaum.utils.misc import generate_password, flatten_list

    pipe_table_name = sql_item_name(pipe.target, self.flavor)

    if not pipe.exists(debug=debug):
        return False, f"Table {pipe_table_name} does not exist."

    ### TODO: Handle deleting duplicates without a datetime axis.
    dt_col = pipe.columns.get('datetime', None)
    dt_col_name = sql_item_name(dt_col, self.flavor)
    cols_types = pipe.get_columns_types(debug=debug)
    existing_cols = pipe.get_columns_types(debug=debug)

    get_rowcount_query = f"SELECT COUNT(*) FROM {pipe_table_name}"
    old_rowcount = self.value(get_rowcount_query, debug=debug)
    if old_rowcount is None:
        return False, f"Failed to get rowcount for table {pipe_table_name}."

    ### Non-datetime indices that in fact exist.
    indices = [
        col
        for key, col in pipe.columns.items()
        if col and col != dt_col and col in cols_types
    ]
    indices_names = [sql_item_name(index_col, self.flavor) for index_col in indices]
    existing_cols_names = [sql_item_name(col, self.flavor) for col in existing_cols]
    duplicates_cte_name = sql_item_name('dups', self.flavor)
    duplicate_row_number_name = sql_item_name('dup_row_num', self.flavor)
    previous_row_number_name = sql_item_name('prev_row_num', self.flavor)

    index_list_str = (
        sql_item_name(dt_col, self.flavor)
        if dt_col
        else ''
    )
    index_list_str_ordered = (
        (
            sql_item_name(dt_col, self.flavor) + " DESC"
        )
        if dt_col
        else ''
    )
    if indices:
        index_list_str += ', ' + ', '.join(indices_names)
        index_list_str_ordered += ', ' + ', '.join(indices_names)
    if index_list_str.startswith(','):
        index_list_str = index_list_str.lstrip(',').lstrip()
    if index_list_str_ordered.startswith(','):
        index_list_str_ordered = index_list_str_ordered.lstrip(',').lstrip()

    cols_list_str = ', '.join(existing_cols_names)

    try:
        ### NOTE: MySQL 5 and below does not support window functions (ROW_NUMBER()).
        is_old_mysql = (
            self.flavor in ('mysql', 'mariadb')
            and int(self.db_version.split('.')[0]) < 8
        )
    except Exception as e:
        is_old_mysql = False

    src_query = f"""
        SELECT
            {cols_list_str},
            ROW_NUMBER() OVER (
                PARTITION BY {index_list_str}
                ORDER BY {index_list_str_ordered}
            ) AS {duplicate_row_number_name}
        FROM {pipe_table_name}
    """
    duplicates_cte_subquery = format_cte_subquery(
        src_query,
        self.flavor,
        sub_name = 'src',
        cols_to_select = cols_list_str,
    ) + f"""
        WHERE {duplicate_row_number_name} = 1
    """
    old_mysql_query = (
        f"""
        SELECT
            {index_list_str}
        FROM (
            SELECT
                {index_list_str},
                IF(
                    @{previous_row_number_name} <> {index_list_str.replace(', ', ' + ')},
                    @{duplicate_row_number_name} := 0,
                    @{duplicate_row_number_name}
                ),
                @{previous_row_number_name} := {index_list_str.replace(', ', ' + ')},
                @{duplicate_row_number_name} := @{duplicate_row_number_name} + 1 AS """
        + f"""{duplicate_row_number_name}
            FROM {pipe_table_name},
            (
                SELECT @{duplicate_row_number_name} := 0
            ) AS {duplicate_row_number_name},
            (
                SELECT @{previous_row_number_name} := '{get_null_replacement('str', 'mysql')}'
            ) AS {previous_row_number_name}
            ORDER BY {index_list_str_ordered}
        ) AS t
        WHERE {duplicate_row_number_name} = 1
        """
    )
    if is_old_mysql:
        duplicates_cte_subquery = old_mysql_query

    session_id = generate_password(3)

    dedup_table = '_' + session_id + f'_dedup_{pipe.target}'
    temp_old_table = '_' + session_id + f"_old_{pipe.target}"

    dedup_table_name = sql_item_name(dedup_table, self.flavor)
    temp_old_table_name = sql_item_name(temp_old_table, self.flavor)
    duplicates_count_name = sql_item_name('num_duplicates', self.flavor)

    create_temporary_table_query = get_create_table_query(
        duplicates_cte_subquery,
        dedup_table,
        self.flavor,
    ) + f"""
    ORDER BY {index_list_str_ordered}
    """
    alter_queries = flatten_list([
        get_rename_table_queries(pipe.target, temp_old_table, self.flavor),
        get_rename_table_queries(dedup_table, pipe.target, self.flavor),
        f"""
        DROP TABLE {temp_old_table_name}
        """,
    ])

    create_temporary_result = self.execute(create_temporary_table_query, debug=debug)
    if create_temporary_result is None:
        return False, f"Failed to deduplicate table {pipe_table_name}."

    results = self.exec_queries(
        alter_queries,
        break_on_error = True,
        rollback = True,
        debug = debug,
    )

    fail_query = None
    for result, query in zip(results, alter_queries):
        if result is None:
            fail_query = query
            break
    success = fail_query is None

    new_rowcount = (
        self.value(get_rowcount_query, debug=debug)
        if success
        else None
    )

    msg = (
        (
            f"Successfully deduplicated table {pipe_table_name}"
            + (
                f"\nfrom {old_rowcount} to {new_rowcount} rows"
                if old_rowcount != new_rowcount
                else ''
            ) + '.'
        )
        if success
        else f"Failed to execute query:\n{fail_query}"
    )
    return success, msg
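Deduplication accepts the same kinds of bounds; a sketch reusing the hypothetical `pipe`, `conn`, and `datetime` import from the clear_pipe example above:

success, msg = conn.deduplicate_pipe(
    pipe,
    begin = datetime(2023, 1, 1),
    end = datetime(2023, 2, 1),
)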
def delete_pipe(self, pipe: mrsm.Pipe, debug: bool = False) ‑> Tuple[bool, str]
- Delete a Pipe's registration and drop its table.
Expand source code
def delete_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> SuccessTuple:
    """
    Delete a Pipe's registration and drop its table.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.sql import sql_item_name
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    ### try dropping first
    drop_tuple = pipe.drop(debug=debug)
    if not drop_tuple[0]:
        return drop_tuple

    if not pipe.id:
        return False, f"{pipe} is not registered."

    ### ensure pipes table exists
    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    q = sqlalchemy.delete(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id)
    if not self.exec(q, debug=debug):
        return False, f"Failed to delete registration for {pipe}."

    return True, "Success"
def delete_plugin(self, plugin: 'meerschaum.core.Plugin', debug: bool = False, **kw: Any) ‑> SuccessTuple
- Delete a plugin from the plugins table.
Expand source code
def delete_plugin(
    self,
    plugin: 'meerschaum.core.Plugin',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """Delete a plugin from the plugins table."""
    from meerschaum.utils.warnings import warn, error
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    plugin_id = self.get_plugin_id(plugin, debug=debug)
    if plugin_id is None:
        return True, f"Plugin '{plugin}' was not registered."

    bind_variables = {
        'plugin_id': plugin_id,
    }

    query = sqlalchemy.delete(plugins_tbl).where(plugins_tbl.c.plugin_id == plugin_id)
    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to delete plugin '{plugin}'."
    return True, f"Successfully deleted plugin '{plugin}'."
def delete_user(self, user: meerschaum.core.User, debug: bool = False) ‑> SuccessTuple
- Delete a user's record from the users table.
Expand source code
def delete_user(
    self,
    user: meerschaum.core.User,
    debug: bool = False
) -> SuccessTuple:
    """Delete a user's record from the users table."""
    ### ensure users table exists
    from meerschaum.connectors.sql.tables import get_tables
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
    plugins = get_tables(mrsm_instance=self, debug=debug)['plugins']

    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)

    if user_id is None:
        return False, f"User '{user.username}' is not registered and cannot be deleted."

    query = sqlalchemy.delete(users_tbl).where(users_tbl.c.user_id == user_id)

    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to delete user '{user}'."

    query = sqlalchemy.delete(plugins).where(plugins.c.user_id == user_id)
    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to delete plugins of user '{user}'."

    return True, f"Successfully deleted user '{user}'"
def drop_indices(self, pipe: mrsm.Pipe, indices: Optional[List[str]] = None, debug: bool = False) ‑> bool
- Drop a pipe's indices.
Expand source code
def drop_indices(
    self,
    pipe: mrsm.Pipe,
    indices: Optional[List[str]] = None,
    debug: bool = False
) -> bool:
    """
    Drop a pipe's indices.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    if debug:
        dprint(f"Dropping indices for {pipe}...")
    if not pipe.columns:
        warn(f"Unable to drop indices for {pipe} without columns.", stack=False)
        return False
    ix_queries = {
        ix: queries
        for ix, queries in self.get_drop_index_queries(pipe, debug=debug).items()
        if indices is None or ix in indices
    }
    success = True
    for ix, queries in ix_queries.items():
        ix_success = all(self.exec_queries(queries, debug=debug, silent=True))
        if not ix_success:
            success = False
            if debug:
                dprint(f"Failed to drop index on column: {ix}")
    return success
def drop_pipe(self, pipe: mrsm.Pipe, debug: bool = False, **kw) ‑> Tuple[bool, str]
- Drop a pipe's tables but maintain its registration.
Parameters
pipe: mrsm.Pipe
- The pipe to drop.
Expand source code
def drop_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """
    Drop a pipe's tables but maintain its registration.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to drop.
    """
    from meerschaum.utils.sql import table_exists, sql_item_name
    success = True
    target, temp_target = pipe.target, '_' + pipe.target
    target_name, temp_name = (
        sql_item_name(target, self.flavor),
        sql_item_name(temp_target, self.flavor),
    )
    if table_exists(target, self, debug=debug):
        success = self.exec(f"DROP TABLE {target_name}", silent=True, debug=debug) is not None
    if table_exists(temp_target, self, debug=debug):
        success = (
            success
            and self.exec(f"DROP TABLE {temp_name}", silent=True, debug=debug) is not None
        )

    msg = "Success" if success else f"Failed to drop {pipe}."
    return success, msg
def edit_pipe(self, pipe: mrsm.Pipe = None, patch: bool = False, debug: bool = False, **kw: Any) ‑> Tuple[bool, str]
- Persist a Pipe's parameters to its database.
Parameters
pipe: mrsm.Pipe, default None
- The pipe to be edited.
patch: bool, default False
- If patch is `True`, update the existing parameters by cascading. Otherwise overwrite the parameters (default).
debug: bool, default False
- Verbosity toggle.
Expand source code
def edit_pipe(
    self,
    pipe: mrsm.Pipe = None,
    patch: bool = False,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Persist a Pipe's parameters to its database.

    Parameters
    ----------
    pipe: mrsm.Pipe, default None
        The pipe to be edited.
    patch: bool, default False
        If patch is `True`, update the existing parameters by cascading.
        Otherwise overwrite the parameters (default).
    debug: bool, default False
        Verbosity toggle.
    """
    if pipe.id is None:
        return False, f"{pipe} is not registered and cannot be edited."

    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    if not patch:
        parameters = pipe.__dict__.get('_attributes', {}).get('parameters', {})
    else:
        from meerschaum import Pipe
        from meerschaum.config._patch import apply_patch_to_config
        original_parameters = Pipe(
            pipe.connector_keys, pipe.metric_key, pipe.location_key,
            mrsm_instance=pipe.instance_keys
        ).parameters
        parameters = apply_patch_to_config(
            original_parameters,
            pipe.parameters
        )

    ### ensure pipes table exists
    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    import json
    sqlalchemy = attempt_import('sqlalchemy')

    values = {
        'parameters': (
            json.dumps(parameters)
            if self.flavor not in json_flavors
            else parameters
        ),
    }
    q = sqlalchemy.update(pipes_tbl).values(**values).where(
        pipes_tbl.c.pipe_id == pipe.id
    )

    result = self.exec(q, debug=debug)
    message = (
        f"Successfully edited {pipe}."
        if result is not None else f"Failed to edit {pipe}."
    )
    return (result is not None), message
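A sketch of persisting parameter changes, reusing the hypothetical `pipe` and `conn` from the examples above:

pipe.parameters['fetch'] = {'definition': 'SELECT * FROM remote_table'}
success, msg = conn.edit_pipe(pipe)

### Or cascade the new keys into the existing registered parameters.
success, msg = conn.edit_pipe(pipe, patch=True)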
def edit_user(self, user: meerschaum.core.User, debug: bool = False, **kw: Any) ‑> SuccessTuple
- Update an existing user's metadata.
Expand source code
def edit_user(
    self,
    user: meerschaum.core.User,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """Update an existing user's metadata."""
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.sql import json_flavors
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    if user_id is None:
        return False, (
            f"User '{user.username}' does not exist. "
            + f"Register user '{user.username}' before editing."
        )
    user.user_id = user_id

    import json
    valid_tuple = valid_username(user.username)
    if not valid_tuple[0]:
        return valid_tuple

    bind_variables = {
        'user_id': user_id,
        'username': user.username,
    }
    if user.password != '':
        bind_variables['password_hash'] = user.password_hash
    if user.email != '':
        bind_variables['email'] = user.email
    if user.attributes is not None and user.attributes != {}:
        bind_variables['attributes'] = (
            json.dumps(user.attributes)
            if self.flavor in ('duckdb',)
            else user.attributes
        )
    if user.type != '':
        bind_variables['user_type'] = user.type

    query = (
        sqlalchemy
        .update(users_tbl)
        .values(**bind_variables)
        .where(users_tbl.c.user_id == user_id)
    )

    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to edit user '{user}'."
    return True, f"Successfully edited user '{user}'."
def exec(self, query: str, *args: Any, silent: bool = False, debug: bool = False, commit: Optional[bool] = None, close: Optional[bool] = None, with_connection: bool = False, **kw: Any) ‑> Union[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.cursor.LegacyCursorResult, Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection], Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection], None]
- Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures. If inserting data, please use bind variables to avoid SQL injection!
Parameters
query: Union[str, List[str], Tuple[str]]
- The query to execute. If `query` is a list or tuple, call `self.exec_queries()` instead.
args: Any
- Arguments passed to `sqlalchemy.engine.execute`.
silent: bool, default False
- If `True`, suppress warnings.
commit: Optional[bool], default None
- If `True`, commit the changes after execution. Causes issues with flavors like `'mssql'`. This does not apply if `query` is a list of strings.
close: Optional[bool], default None
- If `True`, close the connection after execution. Causes issues with flavors like `'mssql'`. This does not apply if `query` is a list of strings.
with_connection: bool, default False
- If `True`, return a tuple including the connection object. This does not apply if `query` is a list of strings.
Returns
The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided.
Expand source code
def exec(
    self,
    query: str,
    *args: Any,
    silent: bool = False,
    debug: bool = False,
    commit: Optional[bool] = None,
    close: Optional[bool] = None,
    with_connection: bool = False,
    **kw: Any
) -> Union[
    sqlalchemy.engine.result.resultProxy,
    sqlalchemy.engine.cursor.LegacyCursorResult,
    Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection],
    Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection],
    None
]:
    """
    Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.

    If inserting data, please use bind variables to avoid SQL injection!

    Parameters
    ----------
    query: Union[str, List[str], Tuple[str]]
        The query to execute.
        If `query` is a list or tuple, call `self.exec_queries()` instead.

    args: Any
        Arguments passed to `sqlalchemy.engine.execute`.

    silent: bool, default False
        If `True`, suppress warnings.

    commit: Optional[bool], default None
        If `True`, commit the changes after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    close: Optional[bool], default None
        If `True`, close the connection after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    with_connection: bool, default False
        If `True`, return a tuple including the connection object.
        This does not apply if `query` is a list of strings.

    Returns
    -------
    The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided.
    """
    if isinstance(query, (list, tuple)):
        return self.exec_queries(
            list(query),
            *args,
            silent = silent,
            debug = debug,
            **kw
        )

    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import("sqlalchemy")
    if debug:
        dprint(f"[{self}] Executing query:\n{query}")

    _close = close if close is not None else (self.flavor != 'mssql')
    _commit = commit if commit is not None else (
        (self.flavor != 'mssql' or 'select' not in str(query).lower())
    )

    ### Select and Insert objects need to be compiled (SQLAlchemy 2.0.0+).
    if not hasattr(query, 'compile'):
        query = sqlalchemy.text(query)

    connection = self.engine.connect()
    transaction = connection.begin() if _commit else None
    try:
        result = connection.execute(query, *args, **kw)
        if _commit:
            transaction.commit()
    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query}\n\n{e}")
        if not silent:
            warn(str(e))
        result = None
        if _commit:
            transaction.rollback()
    finally:
        if _close:
            connection.close()

    if with_connection:
        return result, connection

    return result
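Per the warning above, pass bind variables rather than interpolating values; a minimal sketch with a hypothetical table, reusing the hypothetical `conn`:

result = conn.exec(
    "INSERT INTO my_table (id, val) VALUES (:id, :val)",
    {'id': 1, 'val': 'hello'},
)
if result is None:
    print('The query failed.')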
def exec_queries(self, queries: List[str], break_on_error: bool = False, rollback: bool = True, silent: bool = False, debug: bool = False) ‑> List[sqlalchemy.engine.cursor.LegacyCursorResult]
- Execute a list of queries in a single transaction.
Parameters
queries: List[str]
- The queries in the transaction to be executed.
break_on_error: bool, default False
- If `True`, stop executing when a query fails.
rollback: bool, default True
- If `break_on_error` is `True`, roll back the transaction if a query fails.
silent: bool, default False
- If `True`, suppress warnings.
Returns
A list of SQLAlchemy results.
Expand source code
def exec_queries(
    self,
    queries: List[str],
    break_on_error: bool = False,
    rollback: bool = True,
    silent: bool = False,
    debug: bool = False,
) -> List[sqlalchemy.engine.cursor.LegacyCursorResult]:
    """
    Execute a list of queries in a single transaction.

    Parameters
    ----------
    queries: List[str]
        The queries in the transaction to be executed.

    break_on_error: bool, default False
        If `True`, stop executing when a query fails.

    rollback: bool, default True
        If `break_on_error` is `True`, roll back the transaction if a query fails.

    silent: bool, default False
        If `True`, suppress warnings.

    Returns
    -------
    A list of SQLAlchemy results.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    results = []
    with self.engine.begin() as connection:
        for query in queries:
            if debug:
                dprint(f"[{self}]\n" + str(query))
            if isinstance(query, str):
                query = sqlalchemy.text(query)

            try:
                result = connection.execute(query)
            except Exception as e:
                msg = (f"Encountered error while executing:\n{e}")
                if not silent:
                    warn(msg)
                elif debug:
                    dprint(f"[{self}]\n" + str(msg))
                result = None
            results.append(result)
            if result is None and break_on_error:
                if rollback:
                    connection.rollback()
                break
    return results
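For example, a small transactional batch of hypothetical DDL and DML; a `None` entry in the results marks a failed query:

results = conn.exec_queries(
    [
        'CREATE TABLE foo (id INT)',
        'INSERT INTO foo (id) VALUES (1)',
    ],
    break_on_error = True,
    rollback = True,
)
success = all(result is not None for result in results)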
def execute(self, *args: Any, **kw: Any) ‑> Optional[sqlalchemy.engine.result.resultProxy]
- An alias for `meerschaum.connectors.sql.SQLConnector.exec`.
Expand source code
def execute(
    self,
    *args: Any,
    **kw: Any
) -> Optional[sqlalchemy.engine.result.resultProxy]:
    """
    An alias for `meerschaum.connectors.sql.SQLConnector.exec`.
    """
    return self.exec(*args, **kw)
def fetch(self, pipe: Pipe, begin: Union[datetime, int, str, None] = '', end: Union[datetime, int, str, None] = None, check_existing: bool = True, chunk_hook: Optional[Callable[[pd.DataFrame], Any]] = None, chunksize: Optional[int] = -1, workers: Optional[int] = None, debug: bool = False, **kw: Any) ‑> Union['pd.DataFrame', List[Any], None]
- Execute the SQL definition and return a Pandas DataFrame.
Parameters
pipe: meerschaum.Pipe
- The pipe object which contains the `fetch` metadata.
  - pipe.columns['datetime']: str
    Name of the datetime column for the remote table.
  - pipe.parameters['fetch']: Dict[str, Any]
    Parameters necessary to execute a query.
  - pipe.parameters['fetch']['definition']: str
    Raw SQL query to execute to generate the pandas DataFrame.
  - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
    How many minutes before `begin` to search for data (optional).
begin: Union[datetime, int, str, None], default None
- Most recent datetime to search for data. If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
end: Union[datetime, int, str, None], default None
- The latest datetime to search for data. If `end` is `None`, do not bound the search.
check_existing: bool, default True
- If `False`, use a backtrack interval of 0 minutes.
chunk_hook: Callable[[pd.DataFrame], Any], default None
- A function to pass to `SQLConnector.read()` that accepts a Pandas DataFrame.
chunksize: Optional[int], default -1
- How many rows to load into memory at once (when `chunk_hook` is provided). Otherwise the entire result set is loaded into memory.
workers: Optional[int], default None
- How many threads to use when consuming the generator (when `chunk_hook` is provided). Defaults to the number of cores.
debug: bool, default False
- Verbosity toggle.
Returns
A pandas DataFrame or `None`. If `chunk_hook` is not None, return a list of the hook function's results.
Expand source code
def fetch(
    self,
    pipe: meerschaum.Pipe,
    begin: Union[datetime, int, str, None] = '',
    end: Union[datetime, int, str, None] = None,
    check_existing: bool = True,
    chunk_hook: Optional[Callable[[pd.DataFrame], Any]] = None,
    chunksize: Optional[int] = -1,
    workers: Optional[int] = None,
    debug: bool = False,
    **kw: Any
) -> Union['pd.DataFrame', List[Any], None]:
    """
    Execute the SQL definition and return a Pandas DataFrame.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe object which contains the `fetch` metadata.

        - pipe.columns['datetime']: str
            - Name of the datetime column for the remote table.
        - pipe.parameters['fetch']: Dict[str, Any]
            - Parameters necessary to execute a query.
        - pipe.parameters['fetch']['definition']: str
            - Raw SQL query to execute to generate the pandas DataFrame.
        - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
            - How many minutes before `begin` to search for data (*optional*).

    begin: Union[datetime, int, str, None], default ''
        The most recent datetime to search for data.
        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.

    end: Union[datetime, int, str, None], default None
        The latest datetime to search for data.
        If `end` is `None`, do not bound the search.

    check_existing: bool, default True
        If `False`, use a backtrack interval of 0 minutes.

    chunk_hook: Callable[[pd.DataFrame], Any], default None
        A function to pass to `SQLConnector.read()` that accepts a Pandas DataFrame.

    chunksize: Optional[int], default -1
        How many rows to load into memory at once (when `chunk_hook` is provided).
        Otherwise the entire result set is loaded into memory.

    workers: Optional[int], default None
        How many threads to use when consuming the generator (when `chunk_hook` is provided).
        Defaults to the number of cores.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pandas DataFrame or `None`.
    If `chunk_hook` is not None, return a list of the hook function's results.
    """
    meta_def = self.get_pipe_metadef(
        pipe,
        begin = begin,
        end = end,
        check_existing = check_existing,
        debug = debug,
        **kw
    )
    as_hook_results = chunk_hook is not None
    chunks = self.read(
        meta_def,
        chunk_hook = chunk_hook,
        as_hook_results = as_hook_results,
        chunksize = chunksize,
        workers = workers,
        debug = debug,
    )
    ### If sqlite, parse for datetimes.
    if not as_hook_results and self.flavor == 'sqlite':
        from meerschaum.utils.misc import parse_df_datetimes
        ignore_cols = [
            col
            for col, dtype in pipe.dtypes.items()
            if 'datetime' not in str(dtype)
        ]
        return (
            parse_df_datetimes(
                chunk,
                ignore_cols = ignore_cols,
                debug = debug,
            )
            for chunk in chunks
        )
    return chunks
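As a hedged sketch of driving `fetch()` (not from the original reference): the connector label `sql:source`, the metric key, and the table `readings` are hypothetical, and it assumes `mrsm.Pipe` accepts a `parameters` dictionary:

>>> import meerschaum as mrsm
>>> source_conn = mrsm.get_connector('sql:source')
>>> pipe = mrsm.Pipe(
...     'sql:source', 'readings',
...     parameters = {
...         'columns': {'datetime': 'dt'},
...         'fetch': {
...             'definition': 'SELECT * FROM readings',
...             'backtrack_minutes': 10,
...         },
...     },
... )
>>> ### Execute the definition with datetime bounds and return a DataFrame.
>>> df = source_conn.fetch(pipe, begin='2023-01-01', end='2023-02-01')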
def fetch_pipes_keys(self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) ‑> Optional[List[Tuple[str, str, Optional[str]]]]
-
Return a list of tuples corresponding to the parameters provided.
Parameters
connector_keys: Optional[List[str]], default None
    List of connector_keys to search by.
metric_keys: Optional[List[str]], default None
    List of metric_keys to search by.
location_keys: Optional[List[str]], default None
    List of location_keys to search by.
tags: Optional[List[str]], default None
    List of tags to search by.
params: Optional[Dict[str, Any]], default None
    Dictionary of additional parameters to search by.
    E.g. `--params pipe_id:1`
debug: bool, default False
    Verbosity toggle.
Expand source code
def fetch_pipes_keys( self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False ) -> Optional[List[Tuple[str, str, Optional[str]]]]: """ Return a list of tuples corresponding to the parameters provided. Parameters ---------- connector_keys: Optional[List[str]], default None List of connector_keys to search by. metric_keys: Optional[List[str]], default None List of metric_keys to search by. location_keys: Optional[List[str]], default None List of location_keys to search by. params: Optional[Dict[str, Any]], default None Dictionary of additional parameters to search by. E.g. `--params pipe_id:1` debug: bool, default False Verbosity toggle. """ from meerschaum.utils.warnings import error from meerschaum.utils.debug import dprint from meerschaum.utils.packages import attempt_import from meerschaum.utils.misc import separate_negation_values from meerschaum.utils.sql import OMIT_NULLSFIRST_FLAVORS, table_exists from meerschaum.config.static import STATIC_CONFIG sqlalchemy = attempt_import('sqlalchemy') import json from copy import deepcopy if connector_keys is None: connector_keys = [] if metric_keys is None: metric_keys = [] if location_keys is None: location_keys = [] else: location_keys = [ (lk if lk not in ('[None]', 'None', 'null') else None) for lk in location_keys ] if tags is None: tags = [] if params is None: params = {} ### Add three primary keys to params dictionary ### (separated for convenience of arguments). cols = { 'connector_keys': connector_keys, 'metric_key': metric_keys, 'location_key': location_keys, } ### Make deep copy so we don't mutate this somewhere else. parameters = deepcopy(params) for col, vals in cols.items(): ### Allow for IS NULL to be declared as a single-item list ([None]). if vals == [None]: vals = None if vals not in [[], ['*']]: parameters[col] = vals cols = {k: v for k, v in cols.items() if v != [None]} if not table_exists('pipes', self, debug=debug): return [] from meerschaum.connectors.sql.tables import get_tables pipes_tbl = get_tables(mrsm_instance=self, create=False, debug=debug)['pipes'] _params = {} for k, v in parameters.items(): _v = json.dumps(v) if isinstance(v, dict) else v _params[k] = _v negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix'] ### Parse regular params. ### If a param begins with '_', negate it instead. _where = [ ( (pipes_tbl.c[key] == val) if not str(val).startswith(negation_prefix) else (pipes_tbl.c[key] != key) ) for key, val in _params.items() if not isinstance(val, (list, tuple)) and key in pipes_tbl.c ] select_cols = ( [pipes_tbl.c.connector_keys, pipes_tbl.c.metric_key, pipes_tbl.c.location_key] + ([pipes_tbl.c.parameters] if tags else []) ) q = sqlalchemy.select(*select_cols).where(sqlalchemy.and_(True, *_where)) ### Parse IN params and add OR IS NULL if None in list. for c, vals in cols.items(): if not isinstance(vals, (list, tuple)) or not vals or not c in pipes_tbl.c: continue _in_vals, _ex_vals = separate_negation_values(vals) ### Include params (positive) q = ( q.where(pipes_tbl.c[c].in_(_in_vals)) if None not in _in_vals else q.where(sqlalchemy.or_(pipes_tbl.c[c].in_(_in_vals), pipes_tbl.c[c].is_(None))) ) if _in_vals else q ### Exclude params (negative) q = q.where(pipes_tbl.c[c].not_in(_ex_vals)) if _ex_vals else q ### Finally, parse tags. 
_in_tags, _ex_tags = separate_negation_values(tags) ors = [] for nt in _in_tags: ors.append( sqlalchemy.cast( pipes_tbl.c['parameters'], sqlalchemy.String, ).like(f'%"tags":%"{nt}"%') ) q = q.where(sqlalchemy.and_(sqlalchemy.or_(*ors).self_group())) if ors else q ors = [] for xt in _ex_tags: ors.append( sqlalchemy.cast( pipes_tbl.c['parameters'], sqlalchemy.String, ).not_like(f'%"tags":%"{xt}"%') ) q = q.where(sqlalchemy.and_(sqlalchemy.or_(*ors).self_group())) if ors else q loc_asc = sqlalchemy.asc(pipes_tbl.c['location_key']) if self.flavor not in OMIT_NULLSFIRST_FLAVORS: loc_asc = sqlalchemy.nullsfirst(loc_asc) q = q.order_by( sqlalchemy.asc(pipes_tbl.c['connector_keys']), sqlalchemy.asc(pipes_tbl.c['metric_key']), loc_asc, ) ### execute the query and return a list of tuples if debug: dprint(q.compile(compile_kwargs={'literal_binds': True})) try: rows = ( self.execute(q).fetchall() if self.flavor != 'duckdb' else [ (row['connector_keys'], row['metric_key'], row['location_key']) for row in self.read(q).to_dict(orient='records') ] ) except Exception as e: error(str(e)) _keys = [(row[0], row[1], row[2]) for row in rows] if not tags: return _keys ### Make 100% sure that the tags are correct. keys = [] for row in rows: ktup = (row[0], row[1], row[2]) _actual_tags = ( json.loads(row[3]) if isinstance(row[3], str) else row[3] ).get('tags', []) for nt in _in_tags: if nt in _actual_tags: keys.append(ktup) for xt in _ex_tags: if xt in _actual_tags: keys.remove(ktup) else: keys.append(ktup) return keys
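A short sketch (hypothetical keys and tags; assumes an instance connector):

>>> conn = mrsm.get_connector('sql:main')
>>> ### Returns a list of (connector_keys, metric_key, location_key) tuples.
>>> conn.fetch_pipes_keys(
...     connector_keys = ['plugin:noaa'],
...     tags = ['production'],
...     params = {'pipe_id': 1},
... )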
def get_add_columns_queries(self, pipe: mrsm.Pipe, df: Union[pd.DataFrame, Dict[str, str]], debug: bool = False) ‑> List[str]
-
Add new null columns of the correct type to a table from a dataframe.
Parameters
pipe: mrsm.Pipe
    The pipe to be altered.
df: Union[pd.DataFrame, Dict[str, str]]
    The pandas DataFrame which contains new columns.
    If a dictionary is provided, assume it maps columns to Pandas data types.
Returns
A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
Expand source code
def get_add_columns_queries( self, pipe: mrsm.Pipe, df: Union[pd.DataFrame, Dict[str, str]], debug: bool = False, ) -> List[str]: """ Add new null columns of the correct type to a table from a dataframe. Parameters ---------- pipe: mrsm.Pipe The pipe to be altered. df: Union[pd.DataFrame, Dict[str, str]] The pandas DataFrame which contains new columns. If a dictionary is provided, assume it maps columns to Pandas data types. Returns ------- A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector. """ if not pipe.exists(debug=debug): return [] import copy from meerschaum.utils.sql import ( sql_item_name, SINGLE_ALTER_TABLE_FLAVORS, ) from meerschaum.utils.dtypes.sql import ( get_pd_type_from_db_type, get_db_type_from_pd_type, ) from meerschaum.utils.misc import flatten_list table_obj = self.get_pipe_table(pipe, debug=debug) is_dask = 'dask' in df.__module__ if not isinstance(df, dict) else False if is_dask: df = df.partitions[0].compute() df_cols_types = ( { col: str(typ) for col, typ in df.dtypes.items() } if not isinstance(df, dict) else copy.deepcopy(df) ) if not isinstance(df, dict) and len(df.index) > 0: for col, typ in list(df_cols_types.items()): if typ != 'object': continue val = df.iloc[0][col] if isinstance(val, (dict, list)): df_cols_types[col] = 'json' elif isinstance(val, str): df_cols_types[col] = 'str' db_cols_types = { col: get_pd_type_from_db_type(str(typ.type)) for col, typ in table_obj.columns.items() } new_cols = set(df_cols_types) - set(db_cols_types) if not new_cols: return [] new_cols_types = { col: get_db_type_from_pd_type( df_cols_types[col], self.flavor ) for col in new_cols } alter_table_query = "ALTER TABLE " + sql_item_name(pipe.target, self.flavor) queries = [] for col, typ in new_cols_types.items(): add_col_query = "\nADD " + sql_item_name(col, self.flavor) + " " + typ + "," if self.flavor in SINGLE_ALTER_TABLE_FLAVORS: queries.append(alter_table_query + add_col_query[:-1]) else: alter_table_query += add_col_query ### For most flavors, only one query is required. ### This covers SQLite which requires one query per column. if not queries: queries.append(alter_table_query[:-1]) if self.flavor != 'duckdb': return queries ### NOTE: For DuckDB, we must drop and rebuild the indices. drop_index_queries = list(flatten_list( [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()] )) create_index_queries = list(flatten_list( [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()] )) return drop_index_queries + queries + create_index_queries
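A sketch of pairing the generated statements with `exec_queries()` (hypothetical pipe and DataFrame; assumes `conn` is the pipe's instance connector):

>>> import pandas as pd
>>> df = pd.DataFrame([{'dt': '2023-01-01', 'val': 1.0, 'notes': 'a new column'}])
>>> ### Build ALTER TABLE statements for columns missing from the pipe's table.
>>> queries = conn.get_add_columns_queries(pipe, df)
>>> results = conn.exec_queries(queries, break_on_error=True)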
def get_alter_columns_queries(self, pipe: mrsm.Pipe, df: Union[pd.DataFrame, Dict[str, str]], debug: bool = False) ‑> List[str]
-
If we encounter a column of a different type, set the entire column to text.
Parameters
pipe: mrsm.Pipe
    The pipe to be altered.
df: Union[pd.DataFrame, Dict[str, str]]
    The pandas DataFrame which may contain altered columns.
    If a dict is provided, assume it maps columns to Pandas data types.
Returns
A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
Expand source code
def get_alter_columns_queries( self, pipe: mrsm.Pipe, df: Union[pd.DataFrame, Dict[str, str]], debug: bool = False, ) -> List[str]: """ If we encounter a column of a different type, set the entire column to text. Parameters ---------- pipe: mrsm.Pipe The pipe to be altered. df: Union[pd.DataFrame, Dict[str, str]] The pandas DataFrame which may contain altered columns. If a dict is provided, assume it maps columns to Pandas data types. Returns ------- A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector. """ if not pipe.exists(debug=debug): return [] from meerschaum.utils.sql import sql_item_name from meerschaum.utils.dtypes import are_dtypes_equal from meerschaum.utils.dtypes.sql import ( get_pd_type_from_db_type, get_db_type_from_pd_type, ) from meerschaum.utils.misc import flatten_list, generate_password table_obj = self.get_pipe_table(pipe, debug=debug) target = pipe.target session_id = generate_password(3) df_cols_types = ( { col: str(typ) for col, typ in df.dtypes.items() } if not isinstance(df, dict) else df ) db_cols_types = { col: get_pd_type_from_db_type(str(typ.type)) for col, typ in table_obj.columns.items() } pd_db_df_aliases = { 'int': 'bool', } altered_cols = { col: (db_cols_types.get(col, 'object'), typ) for col, typ in df_cols_types.items() if not are_dtypes_equal(typ, db_cols_types.get(col, 'object').lower()) and not are_dtypes_equal(db_cols_types.get(col, 'object'), 'string') } ### NOTE: Sometimes bools are coerced into ints. altered_cols_to_ignore = set() for col, (db_typ, df_typ) in altered_cols.items(): for db_alias, df_alias in pd_db_df_aliases.items(): if db_alias in db_typ.lower() and df_alias in df_typ.lower(): altered_cols_to_ignore.add(col) continue for col in altered_cols_to_ignore: _ = altered_cols.pop(col, None) if not altered_cols: return [] text_type = get_db_type_from_pd_type('str', self.flavor) altered_cols_types = { col: text_type for col in altered_cols } if self.flavor == 'sqlite': temp_table_name = '_' + session_id + '_' + target rename_query = ( "ALTER TABLE " + sql_item_name(target, self.flavor) + " RENAME TO " + sql_item_name(temp_table_name, self.flavor) ) create_query = ( "CREATE TABLE " + sql_item_name(target, self.flavor) + " (\n" ) for col_name, col_obj in table_obj.columns.items(): create_query += ( sql_item_name(col_name, self.flavor) + " " + (str(col_obj.type) if col_name not in altered_cols else text_type) + ",\n" ) create_query = create_query[:-2] + "\n)" insert_query = ( "INSERT INTO " + sql_item_name(target, self.flavor) + "\nSELECT\n" ) for col_name, col_obj in table_obj.columns.items(): new_col_str = ( sql_item_name(col_name, self.flavor) if col_name not in altered_cols else f"CAST({sql_item_name(col_name, self.flavor)} AS {text_type})" ) insert_query += new_col_str + ",\n" insert_query = insert_query[:-2] + f"\nFROM {sql_item_name(temp_table_name, self.flavor)}" drop_query = f"DROP TABLE {sql_item_name(temp_table_name, self.flavor)}" return [ rename_query, create_query, insert_query, drop_query, ] queries = [] if self.flavor == 'oracle': add_query = "ALTER TABLE " + sql_item_name(target, self.flavor) for col, typ in altered_cols_types.items(): add_query += "\nADD " + sql_item_name(col + '_temp', self.flavor) + " " + typ + "," add_query = add_query[:-1] queries.append(add_query) populate_temp_query = "UPDATE " + sql_item_name(target, self.flavor) for col, typ in altered_cols_types.items(): populate_temp_query += ( "\nSET " + sql_item_name(col + '_temp', self.flavor) + ' = ' + sql_item_name(col, 
self.flavor) + ',' ) populate_temp_query = populate_temp_query[:-1] queries.append(populate_temp_query) set_old_cols_to_null_query = "UPDATE " + sql_item_name(target, self.flavor) for col, typ in altered_cols_types.items(): set_old_cols_to_null_query += ( "\nSET " + sql_item_name(col, self.flavor) + ' = NULL,' ) set_old_cols_to_null_query = set_old_cols_to_null_query[:-1] queries.append(set_old_cols_to_null_query) alter_type_query = "ALTER TABLE " + sql_item_name(target, self.flavor) for col, typ in altered_cols_types.items(): alter_type_query += ( "\nMODIFY " + sql_item_name(col, self.flavor) + ' ' + typ + ',' ) alter_type_query = alter_type_query[:-1] queries.append(alter_type_query) set_old_to_temp_query = "UPDATE " + sql_item_name(target, self.flavor) for col, typ in altered_cols_types.items(): set_old_to_temp_query += ( "\nSET " + sql_item_name(col, self.flavor) + ' = ' + sql_item_name(col + '_temp', self.flavor) + ',' ) set_old_to_temp_query = set_old_to_temp_query[:-1] queries.append(set_old_to_temp_query) drop_temp_query = "ALTER TABLE " + sql_item_name(target, self.flavor) for col, typ in altered_cols_types.items(): drop_temp_query += ( "\nDROP COLUMN " + sql_item_name(col + '_temp', self.flavor) + ',' ) drop_temp_query = drop_temp_query[:-1] queries.append(drop_temp_query) return queries query = "ALTER TABLE " + sql_item_name(target, self.flavor) for col, typ in altered_cols_types.items(): alter_col_prefix = ( 'ALTER' if self.flavor not in ('mysql', 'mariadb', 'oracle') else 'MODIFY' ) type_prefix = ( '' if self.flavor in ('mssql', 'mariadb', 'mysql') else 'TYPE ' ) column_str = 'COLUMN' if self.flavor != 'oracle' else '' query += ( f"\n{alter_col_prefix} {column_str} " + sql_item_name(col, self.flavor) + " " + type_prefix + typ + "," ) query = query[:-1] queries.append(query) if self.flavor != 'duckdb': return queries drop_index_queries = list(flatten_list( [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()] )) create_index_queries = list(flatten_list( [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()] )) return drop_index_queries + queries + create_index_queries
def get_create_index_queries(self, pipe: mrsm.Pipe, debug: bool = False) ‑> Dict[str, List[str]]
-
Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query.
Parameters
pipe: mrsm.Pipe
    The pipe to which the queries will correspond.
Returns
A dictionary of column names mapping to lists of queries.
Expand source code
def get_create_index_queries( self, pipe: mrsm.Pipe, debug: bool = False, ) -> Dict[str, List[str]]: """ Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query. Parameters ---------- pipe: mrsm.Pipe The pipe to which the queries will correspond. Returns ------- A dictionary of column names mapping to lists of queries. """ from meerschaum.utils.debug import dprint from meerschaum.utils.sql import sql_item_name, get_distinct_col_count from meerschaum.utils.warnings import warn from meerschaum.config import get_config index_queries = {} indices = pipe.get_indices() _datetime = pipe.get_columns('datetime', error=False) _datetime_type = pipe.dtypes.get(_datetime, 'datetime64[ns]') _datetime_name = sql_item_name(_datetime, self.flavor) if _datetime is not None else None _datetime_index_name = ( sql_item_name(indices['datetime'], self.flavor) if indices.get('datetime', None) else None ) _id = pipe.get_columns('id', error=False) _id_name = sql_item_name(_id, self.flavor) if _id is not None else None _id_index_name = sql_item_name(indices['id'], self.flavor) if indices.get('id') else None _pipe_name = sql_item_name(pipe.target, self.flavor) _create_space_partition = get_config('system', 'experimental', 'space') ### create datetime index if _datetime is not None: if self.flavor == 'timescaledb': _id_count = ( get_distinct_col_count(_id, f"SELECT {_id_name} FROM {_pipe_name}", self) if (_id is not None and _create_space_partition) else None ) chunk_interval = pipe.get_chunk_interval(debug=debug) chunk_interval_minutes = ( chunk_interval if isinstance(chunk_interval, int) else int(chunk_interval.total_seconds() / 60) ) chunk_time_interval = ( f"INTERVAL '{chunk_interval_minutes} MINUTES'" if isinstance(chunk_interval, timedelta) else f'{chunk_interval_minutes}' ) dt_query = ( f"SELECT create_hypertable('{_pipe_name}', " + f"'{_datetime}', " + ( f"'{_id}', {_id_count}, " if (_id is not None and _create_space_partition) else '' ) + f'chunk_time_interval => {chunk_time_interval}, ' + "migrate_data => true);" ) else: ### mssql, sqlite, etc. dt_query = ( f"CREATE INDEX {_datetime_index_name} " + f"ON {_pipe_name} ({_datetime_name})" ) index_queries[_datetime] = [dt_query] ### create id index if _id_name is not None: if self.flavor == 'timescaledb': ### Already created indices via create_hypertable. id_query = ( None if (_id is not None and _create_space_partition) else ( f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})" if _id is not None else None ) ) pass elif self.flavor == 'citus': id_query = [( f"CREATE INDEX {_id_index_name} " + f"ON {_pipe_name} ({_id_name});" ), ( f"SELECT create_distributed_table('{_pipe_name}', '{_id}');" )] else: ### mssql, sqlite, etc. id_query = f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})" if id_query is not None: index_queries[_id] = [id_query] ### Create indices for other labels in `pipe.columns`. other_indices = { ix_key: ix_unquoted for ix_key, ix_unquoted in pipe.get_indices().items() if ix_key not in ('datetime', 'id') } for ix_key, ix_unquoted in other_indices.items(): ix_name = sql_item_name(ix_unquoted, self.flavor) col = pipe.columns[ix_key] col_name = sql_item_name(col, self.flavor) index_queries[col] = [f"CREATE INDEX {ix_name} ON {_pipe_name} ({col_name})"] return index_queries
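A sketch of rebuilding a pipe's indices by chaining the drop and create queries, mirroring the DuckDB handling in `get_add_columns_queries()` (hypothetical pipe):

>>> from meerschaum.utils.misc import flatten_list
>>> drop_queries = list(flatten_list(list(conn.get_drop_index_queries(pipe).values())))
>>> create_queries = list(flatten_list(list(conn.get_create_index_queries(pipe).values())))
>>> conn.exec_queries(drop_queries + create_queries, break_on_error=True)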
def get_drop_index_queries(self, pipe: mrsm.Pipe, debug: bool = False) ‑> Dict[str, List[str]]
-
Return a dictionary mapping columns to a `DROP INDEX` or equivalent query.
Parameters
pipe: mrsm.Pipe
    The pipe to which the queries will correspond.
Returns
A dictionary of column names mapping to lists of queries.
Expand source code
def get_drop_index_queries( self, pipe: mrsm.Pipe, debug: bool = False, ) -> Dict[str, List[str]]: """ Return a dictionary mapping columns to a `DROP INDEX` or equivalent query. Parameters ---------- pipe: mrsm.Pipe The pipe to which the queries will correspond. Returns ------- A dictionary of column names mapping to lists of queries. """ if not pipe.exists(debug=debug): return {} from meerschaum.utils.sql import sql_item_name, table_exists, hypertable_queries drop_queries = {} indices = pipe.get_indices() pipe_name = sql_item_name(pipe.target, self.flavor) if self.flavor not in hypertable_queries: is_hypertable = False else: is_hypertable_query = hypertable_queries[self.flavor].format(table_name=pipe_name) is_hypertable = self.value(is_hypertable_query, silent=True, debug=debug) is not None if is_hypertable: nuke_queries = [] temp_table = '_' + pipe.target + '_temp_migration' temp_table_name = sql_item_name(temp_table, self.flavor) if table_exists(temp_table, self, debug=debug): nuke_queries.append(f"DROP TABLE {temp_table_name}") nuke_queries += [ f"SELECT * INTO {temp_table_name} FROM {pipe_name}", f"DROP TABLE {pipe_name}", f"ALTER TABLE {temp_table_name} RENAME TO {pipe_name}", ] nuke_ix_keys = ('datetime', 'id') nuked = False for ix_key in nuke_ix_keys: if ix_key in indices and not nuked: drop_queries[ix_key] = nuke_queries nuked = True drop_queries.update({ ix_key: ["DROP INDEX " + sql_item_name(ix_unquoted, self.flavor)] for ix_key, ix_unquoted in indices.items() if ix_key not in drop_queries }) return drop_queries
def get_pipe_attributes(self, pipe: mrsm.Pipe, debug: bool = False) ‑> Dict[str, Any]
-
Get a Pipe's attributes dictionary.
Expand source code
def get_pipe_attributes( self, pipe: mrsm.Pipe, debug: bool = False, ) -> Dict[str, Any]: """ Get a Pipe's attributes dictionary. """ from meerschaum.utils.warnings import warn from meerschaum.utils.debug import dprint from meerschaum.connectors.sql.tables import get_tables from meerschaum.utils.packages import attempt_import sqlalchemy = attempt_import('sqlalchemy') if pipe.get_id(debug=debug) is None: return {} pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] try: q = sqlalchemy.select(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id) if debug: dprint(q) attributes = ( dict(self.exec(q, silent=True, debug=debug).first()._mapping) if self.flavor != 'duckdb' else self.read(q, debug=debug).to_dict(orient='records')[0] ) except Exception as e: import traceback traceback.print_exc() warn(e) print(pipe) return {} ### handle non-PostgreSQL databases (text vs JSON) if not isinstance(attributes.get('parameters', None), dict): try: import json parameters = json.loads(attributes['parameters']) if isinstance(parameters, str) and parameters[0] == '{': parameters = json.loads(parameters) attributes['parameters'] = parameters except Exception as e: attributes['parameters'] = {} return attributes
def get_pipe_columns_types(self, pipe: mrsm.Pipe, debug: bool = False) ‑> Optional[Dict[str, str]]
-
Get the pipe's columns and types.
Parameters
pipe: mrsm.Pipe
    The pipe to get the columns for.
Returns
A dictionary of column names (`str`) and types (`str`).
Examples
>>> conn.get_pipe_columns_types(pipe)
{
    'dt': 'TIMESTAMP WITHOUT TIMEZONE',
    'id': 'BIGINT',
    'val': 'DOUBLE PRECISION',
}
>>>
Expand source code
def get_pipe_columns_types( self, pipe: mrsm.Pipe, debug: bool = False, ) -> Optional[Dict[str, str]]: """ Get the pipe's columns and types. Parameters ---------- pipe: mrsm.Pipe: The pipe to get the columns for. Returns ------- A dictionary of columns names (`str`) and types (`str`). Examples -------- >>> conn.get_pipe_columns_types(pipe) { 'dt': 'TIMESTAMP WITHOUT TIMEZONE', 'id': 'BIGINT', 'val': 'DOUBLE PRECISION', } >>> """ if not pipe.exists(debug=debug): return {} table_columns = {} try: pipe_table = self.get_pipe_table(pipe, debug=debug) if pipe_table is None: return {} for col in pipe_table.columns: table_columns[str(col.name)] = str(col.type) except Exception as e: import traceback traceback.print_exc() from meerschaum.utils.warnings import warn warn(e) table_columns = None return table_columns
def get_pipe_data(self, pipe: Optional[mrsm.Pipe] = None, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime, str, None] = None, end: Union[datetime, str, None] = None, params: Optional[Dict[str, Any]] = None, order: str = 'asc', limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, debug: bool = False, **kw: Any) ‑> Union[pd.DataFrame, None]
-
Access a pipe's data from the SQL instance.
Parameters
pipe: mrsm.Pipe
    The pipe to get data from.
select_columns: Optional[List[str]], default None
    If provided, only select these given columns.
    Otherwise select all available columns (i.e. `SELECT *`).
omit_columns: Optional[List[str]], default None
    If provided, remove these columns from the selection.
begin: Union[datetime, str, None], default None
    If provided, get rows newer than or equal to this value.
end: Union[datetime, str, None], default None
    If provided, get rows older than or equal to this value.
params: Optional[Dict[str, Any]], default None
    Additional parameters to filter by.
    See `meerschaum.connectors.sql.build_where`.
order: Optional[str], default 'asc'
    The selection order for all of the indices in the query.
    If `None`, omit the `ORDER BY` clause.
limit: Optional[int], default None
    If specified, limit the number of rows retrieved to this value.
begin_add_minutes: int, default 0
    The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
end_add_minutes: int, default 0
    The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
chunksize: Optional[int], default -1
    The size of dataframe chunks to load into memory.
debug: bool, default False
    Verbosity toggle.
Returns
A `pd.DataFrame` of the pipe's data.
Expand source code
def get_pipe_data( self, pipe: Optional[mrsm.Pipe] = None, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime, str, None] = None, end: Union[datetime, str, None] = None, params: Optional[Dict[str, Any]] = None, order: str = 'asc', limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, debug: bool = False, **kw: Any ) -> Union[pd.DataFrame, None]: """ Access a pipe's data from the SQL instance. Parameters ---------- pipe: mrsm.Pipe: The pipe to get data from. select_columns: Optional[List[str]], default None If provided, only select these given columns. Otherwise select all available columns (i.e. `SELECT *`). omit_columns: Optional[List[str]], default None If provided, remove these columns from the selection. begin: Union[datetime, str, None], default None If provided, get rows newer than or equal to this value. end: Union[datetime, str, None], default None If provided, get rows older than or equal to this value. params: Optional[Dict[str, Any]], default None Additional parameters to filter by. See `meerschaum.connectors.sql.build_where`. order: Optional[str], default 'asc' The selection order for all of the indices in the query. If `None`, omit the `ORDER BY` clause. limit: Optional[int], default None If specified, limit the number of rows retrieved to this value. begin_add_minutes: int, default 0 The number of minutes to add to the `begin` datetime (i.e. `DATEADD`. end_add_minutes: int, default 0 The number of minutes to add to the `end` datetime (i.e. `DATEADD`. chunksize: Optional[int], default -1 The size of dataframe chunks to load into memory. debug: bool, default False Verbosity toggle. Returns ------- A `pd.DataFrame` of the pipe's data. """ import json from meerschaum.utils.sql import sql_item_name from meerschaum.utils.misc import parse_df_datetimes, to_pandas_dtype from meerschaum.utils.packages import import_pandas pd = import_pandas() is_dask = 'dask' in pd.__name__ dtypes = pipe.dtypes if dtypes: if self.flavor == 'sqlite': if not pipe.columns.get('datetime', None): _dt = pipe.guess_datetime() dt = sql_item_name(_dt, self.flavor) if _dt else None is_guess = True else: _dt = pipe.get_columns('datetime') dt = sql_item_name(_dt, self.flavor) is_guess = False if _dt: dt_type = dtypes.get(_dt, 'object').lower() if 'datetime' not in dt_type: if 'int' not in dt_type: dtypes[_dt] = 'datetime64[ns]' existing_cols = pipe.get_columns_types(debug=debug) select_columns = ( [ col for col in existing_cols if col not in (omit_columns or []) ] if not select_columns else [ col for col in select_columns if col in existing_cols and col not in (omit_columns or []) ] ) if select_columns: dtypes = {col: typ for col, typ in dtypes.items() if col in select_columns} dtypes = { col: to_pandas_dtype(typ) for col, typ in dtypes.items() if col in select_columns and col not in (omit_columns or []) } query = self.get_pipe_data_query( pipe, select_columns = select_columns, omit_columns = omit_columns, begin = begin, end = end, params = params, order = order, limit = limit, begin_add_minutes = begin_add_minutes, end_add_minutes = end_add_minutes, debug = debug, **kw ) if is_dask: index_col = pipe.columns.get('datetime', None) kw['index_col'] = index_col df = self.read( query, dtype = dtypes, debug = debug, **kw ) if self.flavor == 'sqlite': ### NOTE: We have to consume the iterator here to ensure that datetimes are parsed correctly df = ( parse_df_datetimes( df, ignore_cols = [ col for col, dtype in pipe.dtypes.items() if 
'datetime' not in str(dtype) ], chunksize = kw.get('chunksize', None), debug = debug, ) if isinstance(df, pd.DataFrame) else ( [ parse_df_datetimes( c, ignore_cols = [ col for col, dtype in pipe.dtypes.items() if 'datetime' not in str(dtype) ], chunksize = kw.get('chunksize', None), debug = debug, ) for c in df ] ) ) for col, typ in dtypes.items(): if typ != 'json': continue df[col] = df[col].apply(lambda x: json.loads(x) if x is not None else x) return df
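A usage sketch (hypothetical pipe with a datetime column `dt` and an `id` index):

>>> from datetime import datetime
>>> df = conn.get_pipe_data(
...     pipe,
...     select_columns = ['dt', 'id', 'val'],
...     begin = datetime(2023, 1, 1),
...     end = datetime(2023, 2, 1),
...     params = {'id': 1},
...     limit = 100,
... )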
def get_pipe_data_query(self, pipe: mrsm.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime, int, str, None] = None, end: Union[datetime, int, str, None] = None, params: Optional[Dict[str, Any]] = None, order: str = 'asc', limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, replace_nulls: Optional[str] = None, debug: bool = False, **kw: Any) ‑> Optional[str]
-
Return the `SELECT` query for retrieving a pipe's data from its instance.
Parameters
pipe: mrsm.Pipe
    The pipe to get data from.
select_columns: Optional[List[str]], default None
    If provided, only select these given columns.
    Otherwise select all available columns (i.e. `SELECT *`).
omit_columns: Optional[List[str]], default None
    If provided, remove these columns from the selection.
begin: Union[datetime, int, str, None], default None
    If provided, get rows newer than or equal to this value.
end: Union[datetime, int, str, None], default None
    If provided, get rows older than or equal to this value.
params: Optional[Dict[str, Any]], default None
    Additional parameters to filter by.
    See `meerschaum.connectors.sql.build_where`.
order: Optional[str], default 'asc'
    The selection order for all of the indices in the query.
    If `None`, omit the `ORDER BY` clause.
limit: Optional[int], default None
    If specified, limit the number of rows retrieved to this value.
begin_add_minutes: int, default 0
    The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
end_add_minutes: int, default 0
    The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
chunksize: Optional[int], default -1
    The size of dataframe chunks to load into memory.
replace_nulls: Optional[str], default None
    If provided, replace null values with this value.
debug: bool, default False
    Verbosity toggle.
Returns
A `SELECT` query to retrieve a pipe's data.
Expand source code
def get_pipe_data_query( self, pipe: mrsm.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime, int, str, None] = None, end: Union[datetime, int, str, None] = None, params: Optional[Dict[str, Any]] = None, order: str = 'asc', limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, replace_nulls: Optional[str] = None, debug: bool = False, **kw: Any ) -> Union[str, None]: """ Return the `SELECT` query for retrieving a pipe's data from its instance. Parameters ---------- pipe: mrsm.Pipe: The pipe to get data from. select_columns: Optional[List[str]], default None If provided, only select these given columns. Otherwise select all available columns (i.e. `SELECT *`). omit_columns: Optional[List[str]], default None If provided, remove these columns from the selection. begin: Union[datetime, int, str, None], default None If provided, get rows newer than or equal to this value. end: Union[datetime, str, None], default None If provided, get rows older than or equal to this value. params: Optional[Dict[str, Any]], default None Additional parameters to filter by. See `meerschaum.connectors.sql.build_where`. order: Optional[str], default 'asc' The selection order for all of the indices in the query. If `None`, omit the `ORDER BY` clause. limit: Optional[int], default None If specified, limit the number of rows retrieved to this value. begin_add_minutes: int, default 0 The number of minutes to add to the `begin` datetime (i.e. `DATEADD`). end_add_minutes: int, default 0 The number of minutes to add to the `end` datetime (i.e. `DATEADD`). chunksize: Optional[int], default -1 The size of dataframe chunks to load into memory. replace_nulls: Optional[str], default None If provided, replace null values with this value. debug: bool, default False Verbosity toggle. Returns ------- A `SELECT` query to retrieve a pipe's data. """ import json from meerschaum.utils.debug import dprint from meerschaum.utils.misc import items_str from meerschaum.utils.sql import sql_item_name, dateadd_str from meerschaum.utils.packages import import_pandas from meerschaum.utils.warnings import warn pd = import_pandas() existing_cols = pipe.get_columns_types(debug=debug) select_columns = ( [col for col in existing_cols] if not select_columns else [col for col in select_columns if col in existing_cols] ) if omit_columns: select_columns = [col for col in select_columns if col not in omit_columns] if begin == '': begin = pipe.get_sync_time(debug=debug) backtrack_interval = pipe.get_backtrack_interval(debug=debug) if begin is not None: begin -= backtrack_interval cols_names = [sql_item_name(col, self.flavor) for col in select_columns] select_cols_str = ( 'SELECT\n' + ',\n '.join( [ ( col_name if not replace_nulls else f"COALESCE(col_name, '{replace_nulls}') AS {col_name}" ) for col_name in cols_names ] ) ) pipe_table_name = sql_item_name(pipe.target, self.flavor) query = f"{select_cols_str}\nFROM {pipe_table_name}" where = "" if order is not None: default_order = 'asc' if order not in ('asc', 'desc'): warn(f"Ignoring unsupported order '{order}'. 
Falling back to '{default_order}'.") order = default_order order = order.upper() if not pipe.columns.get('datetime', None): _dt = pipe.guess_datetime() dt = sql_item_name(_dt, self.flavor) if _dt else None is_guess = True else: _dt = pipe.get_columns('datetime') dt = sql_item_name(_dt, self.flavor) is_guess = False quoted_indices = { key: sql_item_name(val, self.flavor) for key, val in pipe.columns.items() if val in existing_cols } if begin is not None or end is not None: if is_guess: if _dt is None: warn( f"No datetime could be determined for {pipe}." + "\n Ignoring begin and end...", stack = False, ) begin, end = None, None else: warn( f"A datetime wasn't specified for {pipe}.\n" + f" Using column \"{_dt}\" for datetime bounds...", stack = False, ) is_dt_bound = False if begin is not None and _dt in existing_cols: begin_da = dateadd_str( flavor = self.flavor, datepart = 'minute', number = begin_add_minutes, begin = begin ) where += f"{dt} >= {begin_da}" + (" AND " if end is not None else "") is_dt_bound = True if end is not None and _dt in existing_cols: if 'int' in str(type(end)).lower() and end == begin: end += 1 end_da = dateadd_str( flavor = self.flavor, datepart = 'minute', number = end_add_minutes, begin = end ) where += f"{dt} < {end_da}" is_dt_bound = True if params is not None: from meerschaum.utils.sql import build_where valid_params = {k: v for k, v in params.items() if k in existing_cols} if valid_params: where += build_where(valid_params, self).replace( 'WHERE', ('AND' if is_dt_bound else "") ) if len(where) > 0: query += "\nWHERE " + where if order is not None: ### Sort by indices, starting with datetime. order_by = "" if quoted_indices: order_by += "\nORDER BY " if _dt and _dt in existing_cols: order_by += dt + ' ' + order + ',' for key, quoted_col_name in quoted_indices.items(): if key == 'datetime': continue order_by += ' ' + quoted_col_name + ' ' + order + ',' order_by = order_by[:-1] query += order_by if isinstance(limit, int): if self.flavor == 'mssql': query = f'SELECT TOP {limit} ' + query[len("SELECT *"):] elif self.flavor == 'oracle': query = f"SELECT * FROM (\n {query}\n)\nWHERE ROWNUM = 1" else: query += f"\nLIMIT {limit}" if debug: to_print = ( [] + ([f"begin='{begin}'"] if begin else []) + ([f"end='{end}'"] if end else []) + ([f"params='{json.dumps(params)}'"] if params else []) ) dprint("Getting pipe data with constraints: " + items_str(to_print, quotes=False)) return query
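Because this method only builds a string, the query may be inspected before anything is executed; a sketch with a hypothetical pipe:

>>> from datetime import datetime
>>> query = conn.get_pipe_data_query(
...     pipe,
...     begin = datetime(2023, 1, 1),
...     order = 'desc',
...     limit = 10,
... )
>>> ### query is a SELECT ... FROM ... [WHERE ...] [ORDER BY ...] statement.
>>> print(query)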
def get_pipe_id(self, pipe: mrsm.Pipe, debug: bool = False) ‑> Any
-
Get a Pipe's ID from the pipes table.
Expand source code
def get_pipe_id( self, pipe: mrsm.Pipe, debug: bool = False, ) -> Any: """ Get a Pipe's ID from the pipes table. """ if pipe.temporary: return None from meerschaum.utils.packages import attempt_import import json sqlalchemy = attempt_import('sqlalchemy') from meerschaum.connectors.sql.tables import get_tables pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] query = sqlalchemy.select(pipes_tbl.c.pipe_id).where( pipes_tbl.c.connector_keys == pipe.connector_keys ).where( pipes_tbl.c.metric_key == pipe.metric_key ).where( (pipes_tbl.c.location_key == pipe.location_key) if pipe.location_key is not None else pipes_tbl.c.location_key.is_(None) ) _id = self.value(query, debug=debug, silent=pipe.temporary) if _id is not None: _id = int(_id) return _id
def get_pipe_metadef(self, pipe: Pipe, params: Optional[Dict[str, Any]] = None, begin: Union[datetime, int, str, None] = '', end: Union[datetime, int, str, None] = None, check_existing: bool = True, debug: bool = False, **kw: Any) ‑> Union[str, None]
-
Return a pipe's meta definition fetch query.
Parameters
params: Optional[Dict[str, Any]], default None
    Optional params dictionary to build the `WHERE` clause.
    See `build_where()`.
begin: Union[datetime, int, str, None], default ''
    The most recent datetime to search for data.
    If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
end: Union[datetime, int, str, None], default None
    The latest datetime to search for data.
    If `end` is `None`, do not bound the search.
check_existing: bool, default True
    If `True`, apply the backtrack interval.
debug: bool, default False
    Verbosity toggle.
Returns
A pipe's meta definition fetch query string.
Expand source code
def get_pipe_metadef( self, pipe: meerschaum.Pipe, params: Optional[Dict[str, Any]] = None, begin: Union[datetime, int, str, None] = '', end: Union[datetime, int, str, None] = None, check_existing: bool = True, debug: bool = False, **kw: Any ) -> Union[str, None]: """ Return a pipe's meta definition fetch query. params: Optional[Dict[str, Any]], default None Optional params dictionary to build the `WHERE` clause. See `meerschaum.utils.sql.build_where`. begin: Union[datetime, int, str, None], default None Most recent datatime to search for data. If `backtrack_minutes` is provided, subtract `backtrack_minutes`. end: Union[datetime, int, str, None], default None The latest datetime to search for data. If `end` is `None`, do not bound check_existing: bool, default True If `True`, apply the backtrack interval. debug: bool, default False Verbosity toggle. Returns ------- A pipe's meta definition fetch query string. """ from meerschaum.utils.debug import dprint from meerschaum.utils.warnings import warn, error from meerschaum.utils.sql import sql_item_name, dateadd_str, build_where from meerschaum.utils.misc import is_int from meerschaum.config import get_config definition = get_pipe_query(pipe) if not pipe.columns.get('datetime', None): _dt = pipe.guess_datetime() dt_name = sql_item_name(_dt, self.flavor) if _dt else None is_guess = True else: _dt = pipe.get_columns('datetime') dt_name = sql_item_name(_dt, self.flavor) is_guess = False if begin not in (None, '') or end is not None: if is_guess: if _dt is None: warn( f"Unable to determine a datetime column for {pipe}." + "\n Ignoring begin and end...", stack = False, ) begin, end = '', None else: warn( f"A datetime wasn't specified for {pipe}.\n" + f" Using column \"{_dt}\" for datetime bounds...", stack = False ) if 'order by' in definition.lower() and 'over' not in definition.lower(): error("Cannot fetch with an ORDER clause in the definition") apply_backtrack = begin == '' and check_existing backtrack_interval = pipe.get_backtrack_interval(check_existing=check_existing, debug=debug) btm = ( int(backtrack_interval.total_seconds() / 60) if isinstance(backtrack_interval, timedelta) else backtrack_interval ) begin = ( pipe.get_sync_time(debug=debug) if begin == '' else begin ) if begin and end and begin >= end: begin = None da = None if dt_name: begin_da = ( dateadd_str( flavor = self.flavor, datepart = 'minute', number = ((-1 * btm) if apply_backtrack else 0), begin = begin, ) if begin else None ) end_da = ( dateadd_str( flavor = self.flavor, datepart = 'minute', number = 0, begin = end, ) if end else None ) meta_def = ( _simple_fetch_query(pipe) if ( (not (pipe.columns or {}).get('id', None)) or (not get_config('system', 'experimental', 'join_fetch')) ) else _join_fetch_query(pipe, debug=debug, **kw) ) has_where = 'where' in meta_def.lower()[meta_def.lower().rfind('definition'):] if dt_name and (begin_da or end_da): definition_dt_name = ( dateadd_str(self.flavor, 'minute', 0, f"definition.{dt_name}") if not is_int((begin_da or end_da)) else f"definition.{dt_name}" ) meta_def += "\n" + ("AND" if has_where else "WHERE") + " " has_where = True if begin_da: meta_def += f"{definition_dt_name} >= {begin_da}" if begin_da and end_da: meta_def += " AND " if end_da: meta_def += f"{definition_dt_name} < {end_da}" if params is not None: params_where = build_where(params, self, with_where=False) meta_def += "\n" + ("AND" if has_where else "WHERE") + " " has_where = True meta_def += params_where return meta_def
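A sketch for previewing the wrapped definition (hypothetical pipe with a `fetch:definition`):

>>> meta_def = conn.get_pipe_metadef(
...     pipe,
...     begin = '2023-01-01',
...     check_existing = False,
... )
>>> ### The pipe's definition wrapped in a subquery with datetime bounds.
>>> print(meta_def)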
def get_pipe_rowcount(self, pipe: mrsm.Pipe, begin: Union[datetime, int, None] = None, end: Union[datetime, int, None] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) ‑> Optional[int]
-
Get the rowcount for a pipe in accordance with given parameters.
Parameters
pipe: mrsm.Pipe
    The pipe to query with.
begin: Union[datetime, int, None], default None
    The begin datetime value.
end: Union[datetime, int, None], default None
    The end datetime value.
params: Optional[Dict[str, Any]], default None
    See `build_where()`.
remote: bool, default False
    If `True`, get the rowcount for the remote table.
debug: bool, default False
    Verbosity toggle.
Returns
An `int` for the number of rows if the `pipe` exists, otherwise `None`.
Expand source code
def get_pipe_rowcount( self, pipe: mrsm.Pipe, begin: Union[datetime, int, None] = None, end: Union[datetime, int, None] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False ) -> Union[int, None]: """ Get the rowcount for a pipe in accordance with given parameters. Parameters ---------- pipe: mrsm.Pipe The pipe to query with. begin: Union[datetime, int, None], default None The begin datetime value. end: Union[datetime, int, None], default None The end datetime value. params: Optional[Dict[str, Any]], default None See `meerschaum.utils.sql.build_where`. remote: bool, default False If `True`, get the rowcount for the remote table. debug: bool, default False Verbosity toggle. Returns ------- An `int` for the number of rows if the `pipe` exists, otherwise `None`. """ from meerschaum.utils.sql import dateadd_str, sql_item_name, NO_CTE_FLAVORS from meerschaum.utils.warnings import error, warn from meerschaum.connectors.sql._fetch import get_pipe_query if remote: msg = f"'fetch:definition' must be an attribute of {pipe} to get a remote rowcount." if 'fetch' not in pipe.parameters: error(msg) return None if 'definition' not in pipe.parameters['fetch']: error(msg) return None _pipe_name = sql_item_name(pipe.target, self.flavor) if not pipe.columns.get('datetime', None): _dt = pipe.guess_datetime() dt = sql_item_name(_dt, self.flavor) if _dt else None is_guess = True else: _dt = pipe.get_columns('datetime') dt = sql_item_name(_dt, self.flavor) is_guess = False if begin is not None or end is not None: if is_guess: if _dt is None: warn( f"No datetime could be determined for {pipe}." + "\n Ignoring begin and end...", stack = False, ) begin, end = None, None else: warn( f"A datetime wasn't specified for {pipe}.\n" + f" Using column \"{_dt}\" for datetime bounds...", stack = False, ) _datetime_name = sql_item_name( _dt, pipe.instance_connector.flavor if not remote else pipe.connector.flavor ) _cols_names = [ sql_item_name(col, pipe.instance_connector.flavor if not remote else pipe.connector.flavor) for col in set( ([_dt] if _dt else []) + ([] if params is None else list(params.keys())) ) ] if not _cols_names: _cols_names = ['*'] src = ( f"SELECT {', '.join(_cols_names)} FROM {_pipe_name}" if not remote else get_pipe_query(pipe) ) query = ( f""" WITH src AS ({src}) SELECT COUNT(*) FROM src """ ) if self.flavor not in ('mysql', 'mariadb') else ( f""" SELECT COUNT(*) FROM ({src}) AS src """ ) if begin is not None or end is not None: query += "WHERE" if begin is not None: query += f""" {dt} >= {dateadd_str(self.flavor, datepart='minute', number=0, begin=begin)} """ if end is not None and begin is not None: query += "AND" if end is not None: query += f""" {dt} < {dateadd_str(self.flavor, datepart='minute', number=0, begin=end)} """ if params is not None: from meerschaum.utils.sql import build_where existing_cols = pipe.get_columns_types(debug=debug) valid_params = {k: v for k, v in params.items() if k in existing_cols} if valid_params: query += build_where(valid_params, self).replace('WHERE', ( 'AND' if (begin is not None or end is not None) else 'WHERE' ) ) result = self.value(query, debug=debug, silent=True) try: return int(result) except Exception as e: return None
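A sketch (hypothetical pipe):

>>> from datetime import datetime
>>> conn.get_pipe_rowcount(pipe)
>>> ### Count only rows on or after a given datetime.
>>> conn.get_pipe_rowcount(pipe, begin=datetime(2023, 1, 1))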
def get_pipe_table(self, pipe: mrsm.Pipe, debug: bool = False) ‑> sqlalchemy.Table
-
Return the `sqlalchemy.Table` object for a `mrsm.Pipe`.
Parameters
pipe: mrsm.Pipe
    The pipe in question.
Returns
A `sqlalchemy.Table` object.
Expand source code
def get_pipe_table( self, pipe: mrsm.Pipe, debug: bool = False, ) -> sqlalchemy.Table: """ Return the `sqlalchemy.Table` object for a `mrsm.Pipe`. Parameters ---------- pipe: mrsm.Pipe: The pipe in question. Returns ------- A `sqlalchemy.Table` object. """ from meerschaum.utils.sql import get_sqlalchemy_table if not pipe.exists(debug=debug): return None return get_sqlalchemy_table(pipe.target, connector=self, debug=debug, refresh=True)
def get_plugin_attributes(self, plugin: 'meerschaum.core.Plugin', debug: bool = False) ‑> Dict[str, Any]
-
Return the attributes of a plugin.
Expand source code
def get_plugin_attributes( self, plugin: 'meerschaum.core.Plugin', debug: bool = False ) -> Dict[str, Any]: """ Return the attributes of a plugin. """ ### ensure plugins table exists from meerschaum.connectors.sql.tables import get_tables import json plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] from meerschaum.utils.packages import attempt_import sqlalchemy = attempt_import('sqlalchemy') query = ( sqlalchemy .select(plugins_tbl.c.attributes) .where(plugins_tbl.c.plugin_name == plugin.name) ) _attr = self.value(query, debug=debug) if isinstance(_attr, str): _attr = json.loads(_attr) elif _attr is None: _attr = {} return _attr
def get_plugin_id(self, plugin: 'meerschaum.core.Plugin', debug: bool = False) ‑> Optional[int]
-
Return a plugin's ID.
Expand source code
def get_plugin_id( self, plugin: 'meerschaum.core.Plugin', debug: bool = False ) -> Optional[int]: """ Return a plugin's ID. """ ### ensure plugins table exists from meerschaum.connectors.sql.tables import get_tables plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] from meerschaum.utils.packages import attempt_import sqlalchemy = attempt_import('sqlalchemy') query = ( sqlalchemy .select(plugins_tbl.c.plugin_id) .where(plugins_tbl.c.plugin_name == plugin.name) ) try: return int(self.value(query, debug=debug)) except Exception as e: return None
def get_plugin_user_id(self, plugin: 'meerschaum.core.Plugin', debug: bool = False) ‑> Optional[int]
-
Return a plugin's user ID.
Expand source code
def get_plugin_user_id( self, plugin: 'meerschaum.core.Plugin', debug: bool = False ) -> Optional[int]: """ Return a plugin's user ID. """ ### ensure plugins table exists from meerschaum.connectors.sql.tables import get_tables plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] from meerschaum.utils.packages import attempt_import sqlalchemy = attempt_import('sqlalchemy') query = ( sqlalchemy .select(plugins_tbl.c.user_id) .where(plugins_tbl.c.plugin_name == plugin.name) ) try: return int(self.value(query, debug=debug)) except Exception as e: return None
def get_plugin_username(self, plugin: 'meerschaum.core.Plugin', debug: bool = False) ‑> Optional[str]
-
Return the username of a plugin's owner.
Expand source code
def get_plugin_username(
    self,
    plugin: 'meerschaum.core.Plugin',
    debug: bool = False
) -> Optional[str]:
    """
    Return the username of a plugin's owner.
    """
    ### Ensure the plugins table exists.
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    users = get_tables(mrsm_instance=self, debug=debug)['users']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    query = (
        sqlalchemy.select(users.c.username)
        .where(
            ### NOTE: Combine the conditions with `sqlalchemy.and_`;
            ### a bare Python `and` would silently discard the first clause.
            sqlalchemy.and_(
                users.c.user_id == plugins_tbl.c.user_id,
                plugins_tbl.c.plugin_name == plugin.name,
            )
        )
    )
    return self.value(query, debug=debug)
def get_plugin_version(self, plugin: 'meerschaum.core.Plugin', debug: bool = False) ‑> Optional[str]
-
Return a plugin's version.
Expand source code
def get_plugin_version( self, plugin: 'meerschaum.core.Plugin', debug: bool = False ) -> Optional[str]: """ Return a plugin's version. """ ### ensure plugins table exists from meerschaum.connectors.sql.tables import get_tables plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] from meerschaum.utils.packages import attempt_import sqlalchemy = attempt_import('sqlalchemy') query = sqlalchemy.select(plugins_tbl.c.version).where(plugins_tbl.c.plugin_name == plugin.name) return self.value(query, debug=debug)
def get_plugins(self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False, **kw: Any) ‑> List[str]
-
Return a list of all registered plugins.
Parameters
user_id: Optional[int], default None
    If specified, filter plugins by a specific `user_id`.
search_term: Optional[str], default None
    If specified, add a `WHERE plugin_name LIKE '{search_term}%'` clause to filter the plugins.
Returns
A list of plugin names.
Expand source code
def get_plugins( self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False, **kw: Any ) -> List[str]: """ Return a list of all registered plugins. Parameters ---------- user_id: Optional[int], default None If specified, filter plugins by a specific `user_id`. search_term: Optional[str], default None If specified, add a `WHERE plugin_name LIKE '{search_term}%'` clause to filter the plugins. Returns ------- A list of plugin names. """ ### ensure plugins table exists from meerschaum.connectors.sql.tables import get_tables plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] from meerschaum.utils.packages import attempt_import sqlalchemy = attempt_import('sqlalchemy') query = sqlalchemy.select(plugins_tbl.c.plugin_name) if user_id is not None: query = query.where(plugins_tbl.c.user_id == user_id) if search_term is not None: query = query.where(plugins_tbl.c.plugin_name.like(search_term + '%')) rows = ( self.execute(query).fetchall() if self.flavor != 'duckdb' else [ (row['plugin_name'],) for row in self.read(query).to_dict(orient='records') ] ) return [row[0] for row in rows]
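A sketch (hypothetical search term and user ID):

>>> conn.get_plugins()                      ### All registered plugin names.
>>> conn.get_plugins(search_term='color')   ### Names beginning with 'color'.
>>> conn.get_plugins(user_id=1)             ### Plugins owned by user 1.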
def get_sync_time(self, pipe: 'mrsm.Pipe', params: Optional[Dict[str, Any]] = None, newest: bool = True, debug: bool = False) ‑> Union[datetime, int, None]
-
Get a Pipe's most recent datetime value.
Parameters
pipe: mrsm.Pipe
    The pipe to get the sync time for.
params: Optional[Dict[str, Any]], default None
    Optional params dictionary to build the `WHERE` clause.
    See `build_where()`.
newest: bool, default True
    If `True`, get the most recent datetime (honoring `params`).
    If `False`, get the oldest datetime (ASC instead of DESC).
Returns
A `datetime` object (or `int` if using an integer axis) if the pipe exists, otherwise `None`.
Expand source code
def get_sync_time( self, pipe: 'mrsm.Pipe', params: Optional[Dict[str, Any]] = None, newest: bool = True, debug: bool = False, ) -> Union[datetime, int, None]: """Get a Pipe's most recent datetime value. Parameters ---------- pipe: mrsm.Pipe The pipe to get the sync time for. params: Optional[Dict[str, Any]], default None Optional params dictionary to build the `WHERE` clause. See `meerschaum.utils.sql.build_where`. newest: bool, default True If `True`, get the most recent datetime (honoring `params`). If `False`, get the oldest datetime (ASC instead of DESC). Returns ------- A `datetime` object (or `int` if using an integer axis) if the pipe exists, otherwise `None`. """ from meerschaum.utils.sql import sql_item_name, build_where from meerschaum.utils.warnings import warn table = sql_item_name(pipe.target, self.flavor) dt_col = pipe.columns.get('datetime', None) dt_type = pipe.dtypes.get(dt_col, 'datetime64[ns]') if not dt_col: _dt = pipe.guess_datetime() dt = sql_item_name(_dt, self.flavor) if _dt else None is_guess = True else: _dt = dt_col dt = sql_item_name(_dt, self.flavor) is_guess = False if _dt is None: return None ASC_or_DESC = "DESC" if newest else "ASC" existing_cols = pipe.get_columns_types(debug=debug) valid_params = {} if params is not None: valid_params = {k: v for k, v in params.items() if k in existing_cols} ### If no bounds are provided for the datetime column, ### add IS NOT NULL to the WHERE clause. if _dt not in valid_params: valid_params[_dt] = '_None' where = "" if not valid_params else build_where(valid_params, self) q = f"SELECT {dt}\nFROM {table}{where}\nORDER BY {dt} {ASC_or_DESC}\nLIMIT 1" if self.flavor == 'mssql': q = f"SELECT TOP 1 {dt}\nFROM {table}{where}\nORDER BY {dt} {ASC_or_DESC}" elif self.flavor == 'oracle': q = ( "SELECT * FROM (\n" + f" SELECT {dt}\nFROM {table}{where}\n ORDER BY {dt} {ASC_or_DESC}\n" + ") WHERE ROWNUM = 1" ) try: db_time = self.value(q, silent=True, debug=debug) ### No datetime could be found. if db_time is None: return None ### sqlite returns str. if isinstance(db_time, str): from meerschaum.utils.packages import attempt_import dateutil_parser = attempt_import('dateutil.parser') st = dateutil_parser.parse(db_time) ### Do nothing if a datetime object is returned. elif isinstance(db_time, datetime): if hasattr(db_time, 'to_pydatetime'): st = db_time.to_pydatetime() else: st = db_time ### Sometimes the datetime is actually a date. elif isinstance(db_time, date): st = datetime.combine(db_time, datetime.min.time()) ### Adding support for an integer datetime axis. elif 'int' in str(type(db_time)).lower(): st = int(db_time) ### Convert pandas timestamp to Python datetime. else: st = db_time.to_pydatetime() sync_time = st except Exception as e: sync_time = None warn(str(e)) return sync_time
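A sketch (hypothetical pipe with a datetime axis):

>>> conn.get_sync_time(pipe)
>>> ### Get the oldest datetime instead of the newest.
>>> conn.get_sync_time(pipe, newest=False)
>>> ### Honor a params filter, e.g. the newest datetime where id = 1.
>>> conn.get_sync_time(pipe, params={'id': 1})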
def get_to_sql_dtype(self, pipe: mrsm.Pipe, df: pd.DataFrame, update_dtypes: bool = True) ‑> Dict[str, 'sqlalchemy.sql.visitors.TraversibleType']
-
Given a pipe and DataFrame, return the dtype dictionary for to_sql().
Parameters
pipe: mrsm.Pipe
- The pipe which may contain a dtypes parameter.
df: pd.DataFrame
- The DataFrame to be pushed via to_sql().
update_dtypes: bool, default True
- If True, patch the pipe's dtypes onto the DataFrame's dtypes.
Returns
A dictionary with sqlalchemy datatypes.
Examples
>>> import pandas as pd
>>> import meerschaum as mrsm
>>>
>>> conn = mrsm.get_connector('sql:memory')
>>> df = pd.DataFrame([{'a': {'b': 1}}])
>>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'})
>>> get_to_sql_dtype(pipe, df)
{'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
Expand source code
def get_to_sql_dtype( self, pipe: 'mrsm.Pipe', df: 'pd.DataFrame', update_dtypes: bool = True, ) -> Dict[str, 'sqlalchemy.sql.visitors.TraversibleType']: """ Given a pipe and DataFrame, return the `dtype` dictionary for `to_sql()`. Parameters ---------- pipe: mrsm.Pipe The pipe which may contain a `dtypes` parameter. df: pd.DataFrame The DataFrame to be pushed via `to_sql()`. update_dtypes: bool, default True If `True`, patch the pipe's dtypes onto the DataFrame's dtypes. Returns ------- A dictionary with `sqlalchemy` datatypes. Examples -------- >>> import pandas as pd >>> import meerschaum as mrsm >>> >>> conn = mrsm.get_connector('sql:memory') >>> df = pd.DataFrame([{'a': {'b': 1}}]) >>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'}) >>> get_to_sql_dtype(pipe, df) {'a': <class 'sqlalchemy.sql.sqltypes.JSON'>} """ from meerschaum.utils.misc import get_json_cols from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type df_dtypes = { col: str(typ) for col, typ in df.dtypes.items() } json_cols = get_json_cols(df) df_dtypes.update({col: 'json' for col in json_cols}) if update_dtypes: df_dtypes.update(pipe.dtypes) return { col: get_db_type_from_pd_type(typ, self.flavor, as_sqlalchemy=True) for col, typ in df_dtypes.items() }
def get_user_attributes(self, user: meerschaum.core.User, debug: bool = False) ‑> Union[Dict[str, Any], None]
-
Return the user's attributes.
Expand source code
def get_user_attributes(
        self,
        user: meerschaum.core.User,
        debug: bool = False
    ) -> Union[Dict[str, Any], None]:
    """
    Return the user's attributes.
    """
    ### ensure users table exists
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    query = (
        sqlalchemy.select(users_tbl.c.attributes)
        .where(users_tbl.c.user_id == user_id)
    )
    result = self.value(query, debug=debug)
    if result is not None and not isinstance(result, dict):
        try:
            result = dict(result)
            _parsed = True
        except Exception as e:
            _parsed = False
        if not _parsed:
            try:
                import json
                result = json.loads(result)
                _parsed = True
            except Exception as e:
                _parsed = False
        if not _parsed:
            warn(f"Received unexpected type for attributes: {result}")
    return result
def get_user_id(self, user: meerschaum.core.User, debug: bool = False) ‑> Optional[int]
-
If a user is registered, return the user_id.
Expand source code
def get_user_id(
        self,
        user: meerschaum.core.User,
        debug: bool = False
    ) -> Optional[int]:
    """If a user is registered, return the `user_id`."""
    ### ensure users table exists
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    query = (
        sqlalchemy.select(users_tbl.c.user_id)
        .where(users_tbl.c.username == user.username)
    )

    result = self.value(query, debug=debug)
    if result is not None:
        return int(result)
    return None
def get_user_password_hash(self, user: meerschaum.core.User, debug: bool = False, **kw: Any) ‑> Optional[str]
-
Return the password hash for a user. NOTE: This may be dangerous and is only allowed if the security settings explicitly allow it.
Expand source code
def get_user_password_hash(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """
    Return the password hash for a user.
    **NOTE**: This may be dangerous and is only allowed if the security settings explicitly allow it.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.connectors.sql.tables import get_tables
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    if user.user_id is not None:
        user_id = user.user_id
        if debug:
            dprint(f"Already given user_id: {user_id}")
    else:
        if debug:
            dprint("Fetching user_id...")
        user_id = self.get_user_id(user, debug=debug)

    if user_id is None:
        return None

    query = sqlalchemy.select(users_tbl.c.password_hash).where(users_tbl.c.user_id == user_id)
    return self.value(query, debug=debug)
def get_user_type(self, user: meerschaum.core.User, debug: bool = False, **kw: Any) ‑> Optional[str]
-
Return the user's type.
Expand source code
def get_user_type(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """
    Return the user's type.
    """
    from meerschaum.connectors.sql.tables import get_tables
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    if user_id is None:
        return None

    query = sqlalchemy.select(users_tbl.c.user_type).where(users_tbl.c.user_id == user_id)
    return self.value(query, debug=debug)
def get_users(self, debug: bool = False, **kw: Any) ‑> List[str]
-
Get the registered usernames.
Expand source code
def get_users(
        self,
        debug: bool = False,
        **kw: Any
    ) -> List[str]:
    """
    Get the registered usernames.
    """
    ### ensure users table exists
    from meerschaum.connectors.sql.tables import get_tables
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    query = sqlalchemy.select(users_tbl.c.username)
    return list(self.read(query, debug=debug)['username'])
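The user accessors above compose naturally. A hedged sketch (assumes a user 'alice' is registered and that the User constructor accepts a bare username):
>>> import meerschaum as mrsm
>>> from meerschaum.core import User
>>> conn = mrsm.get_connector('sql:main')
>>> conn.get_users()             ### e.g. ['alice']
>>> user = User('alice')
>>> conn.get_user_id(user)       ### e.g. 1, or None if unregistered
>>> conn.get_user_type(user)     ### e.g. 'admin'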
def pipe_exists(self, pipe: mrsm.Pipe, debug: bool = False) ‑> bool
-
Check that a Pipe's table exists.
Parameters
pipe: mrsm.Pipe
- The pipe to check.
debug: bool, default False
- Verbosity toggle.
Returns
A bool corresponding to whether a pipe's table exists.
Expand source code
def pipe_exists(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False
    ) -> bool:
    """
    Check that a Pipe's table exists.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to check.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `bool` corresponding to whether a pipe's table exists.
    """
    from meerschaum.utils.sql import table_exists
    exists = table_exists(pipe.target, self, debug=debug)
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"{pipe} " + ('exists.' if exists else 'does not exist.'))
    return exists
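A quick sketch (pipe keys are illustrative):
>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')
>>> conn.pipe_exists(pipe)   ### True only if the pipe's target table exists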
def read(self, query_or_table: Union[str, sqlalchemy.Query], params: Optional[Dict[str, Any], List[str]] = None, dtype: Optional[Dict[str, Any]] = None, dtype_backend: str = 'pyarrow', chunksize: Optional[int] = -1, workers: Optional[int] = None, chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None, as_hook_results: bool = False, chunks: Optional[int] = None, as_chunks: bool = False, as_iterator: bool = False, as_dask: bool = False, index_col: Optional[str] = None, silent: bool = False, debug: bool = False, **kw: Any) ‑> Union[pandas.DataFrame, dask.DataFrame, List[pandas.DataFrame], List[Any], None]
-
Read a SQL query or table into a pandas dataframe.
Parameters
query_or_table: Union[str, sqlalchemy.Query]
- The SQL query (sqlalchemy Query or string) or name of the table from which to select.
params: Optional[Dict[str, Any]], default None
- List or Dict of parameters to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
dtype: Optional[Dict[str, Any]], default None
- A dictionary of data types to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
dtype_backend: str, default 'pyarrow'
- Which pandas dtype engine to use.
chunksize: Optional[int], default -1
- How many chunks to read at a time. None will read everything in one large chunk. Defaults to system configuration. NOTE: DuckDB does not allow for chunking.
workers: Optional[int], default None
- How many threads to use when consuming the generator. Only applies if chunk_hook is provided.
chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None
- Hook function to execute once per chunk, e.g. writing and reading chunks intermittently. See --sync-chunks for an example. NOTE: as_iterator MUST be False (default).
as_hook_results: bool, default False
- If True, return a List of the outputs of the hook function. Only applicable if chunk_hook is not None. NOTE: as_iterator MUST be False (default).
chunks: Optional[int], default None
- Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and return into a single dataframe. For example, to limit the returned dataframe to 100,000 rows, you could specify a chunksize of 1000 and chunks of 100.
as_chunks: bool, default False
- If True, return a list of DataFrames. Otherwise return a single DataFrame.
as_iterator: bool, default False
- If True, return the pandas DataFrame iterator. chunksize must not be None (falls back to 1000 if so), and hooks are not called in this case.
index_col: Optional[str], default None
- If using Dask, use this column as the index column. If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
silent: bool, default False
- If True, don't raise warnings in case of errors.
Returns
A pd.DataFrame (default case), or an iterator, or a list of dataframes / iterators, or None if something breaks.
Expand source code
def read( self, query_or_table: Union[str, sqlalchemy.Query], params: Optional[Dict[str, Any], List[str]] = None, dtype: Optional[Dict[str, Any]] = None, dtype_backend: str = 'pyarrow', chunksize: Optional[int] = -1, workers: Optional[int] = None, chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None, as_hook_results: bool = False, chunks: Optional[int] = None, as_chunks: bool = False, as_iterator: bool = False, as_dask: bool = False, index_col: Optional[str] = None, silent: bool = False, debug: bool = False, **kw: Any ) -> Union[ pandas.DataFrame, dask.DataFrame, List[pandas.DataFrame], List[Any], None, ]: """ Read a SQL query or table into a pandas dataframe. Parameters ---------- query_or_table: Union[str, sqlalchemy.Query] The SQL query (sqlalchemy Query or string) or name of the table from which to select. params: Optional[Dict[str, Any]], default None `List` or `Dict` of parameters to pass to `pandas.read_sql()`. See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html dtype: Optional[Dict[str, Any]], default None A dictionary of data types to pass to `pandas.read_sql()`. See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html dtype_backend: str, default 'pyarrow' Which pandas dtype engine to use. chunksize: Optional[int], default -1 How many chunks to read at a time. `None` will read everything in one large chunk. Defaults to system configuration. **NOTE:** DuckDB does not allow for chunking. workers: Optional[int], default None How many threads to use when consuming the generator. Only applies if `chunk_hook` is provided. chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None Hook function to execute once per chunk, e.g. writing and reading chunks intermittently. See `--sync-chunks` for an example. **NOTE:** `as_iterator` MUST be False (default). as_hook_results: bool, default False If `True`, return a `List` of the outputs of the hook function. Only applicable if `chunk_hook` is not None. **NOTE:** `as_iterator` MUST be `False` (default). chunks: Optional[int], default None Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and return into a single dataframe. For example, to limit the returned dataframe to 100,000 rows, you could specify a `chunksize` of `1000` and `chunks` of `100`. as_chunks: bool, default False If `True`, return a list of DataFrames. Otherwise return a single DataFrame. as_iterator: bool, default False If `True`, return the pandas DataFrame iterator. `chunksize` must not be `None` (falls back to 1000 if so), and hooks are not called in this case. index_col: Optional[str], default None If using Dask, use this column as the index column. If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame. silent: bool, default False If `True`, don't raise warnings in case of errors. Defaults to `False`. Returns ------- A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators, or `None` if something breaks. 
""" if chunks is not None and chunks <= 0: return [] from meerschaum.utils.sql import sql_item_name, truncate_item_name from meerschaum.utils.packages import attempt_import, import_pandas from meerschaum.utils.pool import get_pool from meerschaum.utils.dataframe import chunksize_to_npartitions import warnings import inspect import traceback pd = import_pandas() dd = None is_dask = 'dask' in pd.__name__ pd = attempt_import('pandas') # pd = import_pandas() is_dask = dd is not None npartitions = chunksize_to_npartitions(chunksize) if is_dask: chunksize = None sqlalchemy = attempt_import("sqlalchemy") default_chunksize = self._sys_config.get('chunksize', None) chunksize = chunksize if chunksize != -1 else default_chunksize if chunksize is None and as_iterator: if not silent and self.flavor not in _disallow_chunks_flavors: warn( f"An iterator may only be generated if chunksize is not None.\n" + "Falling back to a chunksize of 1000.", stacklevel=3, ) chunksize = 1000 if chunksize is not None and self.flavor in _max_chunks_flavors: if chunksize > _max_chunks_flavors[self.flavor]: if chunksize != default_chunksize: warn( f"The specified chunksize of {chunksize} exceeds the maximum of " + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n" + f" Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.", stacklevel = 3, ) chunksize = _max_chunks_flavors[self.flavor] ### NOTE: A bug in duckdb_engine does not allow for chunks. if chunksize is not None and self.flavor in _disallow_chunks_flavors: chunksize = None if debug: import time start = time.perf_counter() dprint(query_or_table) dprint(f"[{self}] Fetching with chunksize: {chunksize}") ### This might be sqlalchemy object or the string of a table name. ### We check for spaces and quotes to see if it might be a weird table. if ( ' ' not in str(query_or_table) or ( ' ' in str(query_or_table) and str(query_or_table).startswith('"') and str(query_or_table).endswith('"') ) ): truncated_table_name = truncate_item_name(str(query_or_table), self.flavor) if truncated_table_name != str(query_or_table) and not silent: warn( f"Table '{name}' is too long for '{self.flavor}'," + f" will instead create the table '{truncated_name}'." 
) query_or_table = sql_item_name(str(query_or_table), self.flavor) if debug: dprint(f"[{self}] Reading from table {query_or_table}") formatted_query = sqlalchemy.text("SELECT * FROM " + str(query_or_table)) str_query = f"SELECT * FROM {query_or_table}" else: str_query = query_or_table formatted_query = ( sqlalchemy.text(str_query) if not is_dask and isinstance(str_query, str) else format_sql_query_for_dask(str_query) ) chunk_list = [] chunk_hook_results = [] try: stream_results = not as_iterator and chunk_hook is not None and chunksize is not None with warnings.catch_warnings(): warnings.filterwarnings('ignore', 'case sensitivity issues') read_sql_query_kwargs = { 'params': params, 'dtype': dtype, 'dtype_backend': dtype_backend, 'index_col': index_col, } if is_dask: if index_col is None: dd = None pd = attempt_import('pandas') read_sql_query_kwargs.update({ 'chunksize': chunksize, }) else: read_sql_query_kwargs.update({ 'chunksize': chunksize, }) if is_dask and dd is not None: ddf = dd.read_sql_query( formatted_query, self.URI, **read_sql_query_kwargs ) else: with self.engine.begin() as transaction: with transaction.execution_options(stream_results=stream_results) as connection: chunk_generator = pd.read_sql_query( formatted_query, connection, params = params, chunksize = chunksize, dtype = dtype, ) ### `stream_results` must be False (will load everything into memory). if as_iterator or chunksize is None: return chunk_generator ### We must consume the generator in this context if using server-side cursors. if stream_results: pool = get_pool(workers=workers) def _process_chunk(_chunk, _retry_on_failure: bool = True): if not as_hook_results: chunk_list.append(_chunk) result = None if chunk_hook is not None: try: result = chunk_hook( _chunk, workers = workers, chunksize = chunksize, debug = debug, **kw ) except Exception as e: result = False, traceback.format_exc() from meerschaum.utils.formatting import get_console get_console().print_exception() ### If the chunk fails to process, try it again one more time. 
if isinstance(result, tuple) and result[0] is False: if _retry_on_failure: return _process_chunk(_chunk, _retry_on_failure=False) return result chunk_hook_results = list(pool.imap(_process_chunk, chunk_generator)) if as_hook_results: return chunk_hook_results except Exception as e: if debug: dprint(f"[{self}] Failed to execute query:\n\n{query_or_table}\n\n") if not silent: warn(str(e), stacklevel=3) from meerschaum.utils.formatting import get_console get_console().print_exception() return None if is_dask and dd is not None: ddf = ddf.reset_index() return ddf chunk_list = [] read_chunks = 0 chunk_hook_results = [] if chunksize is None: chunk_list.append(chunk_generator) elif as_iterator: return chunk_generator else: try: for chunk in chunk_generator: if chunk_hook is not None: chunk_hook_results.append( chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw) ) chunk_list.append(chunk) read_chunks += 1 if chunks is not None and read_chunks >= chunks: break except Exception as e: warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3) from meerschaum.utils.formatting import get_console get_console().print_exception() read_chunks = 0 try: for chunk in chunk_generator: if chunk_hook is not None: chunk_hook_results.append( chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw) ) chunk_list.append(chunk) read_chunks += 1 if chunks is not None and read_chunks >= chunks: break except Exception as e: warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3) from meerschaum.utils.formatting import get_console get_console().print_exception() return None ### If no chunks returned, read without chunks ### to get columns if len(chunk_list) == 0: with warnings.catch_warnings(): warnings.filterwarnings('ignore', 'case sensitivity issues') with self.engine.begin() as connection: chunk_list.append( pd.read_sql_query( formatted_query, connection, params = params, dtype = dtype, ) ) ### call the hook on any missed chunks. if chunk_hook is not None and len(chunk_list) > len(chunk_hook_results): for c in chunk_list[len(chunk_hook_results):]: chunk_hook_results.append( chunk_hook(c, chunksize=chunksize, debug=debug, **kw) ) ### chunksize is not None so must iterate if debug: end = time.perf_counter() dprint(f"Fetched {len(chunk_list)} chunks in {round(end - start, 2)} seconds.") if as_hook_results: return chunk_hook_results ### Skip `pd.concat()` if `as_chunks` is specified. if as_chunks: for c in chunk_list: c.reset_index(drop=True, inplace=True) return chunk_list return pd.concat(chunk_list).reset_index(drop=True)
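A hedged sketch of common read() invocations (the table name 'weather' is illustrative):
>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> df = conn.read('weather')                               ### select an entire table
>>> df = conn.read("SELECT * FROM weather WHERE temp > 50") ### or run an arbitrary query
>>> lengths = conn.read(
...     'weather',
...     chunksize = 1000,
...     chunk_hook = (lambda chunk, **kw: len(chunk)),
...     as_hook_results = True,
... )                                                       ### e.g. a list of per-chunk row counts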
def register_pipe(self, pipe: mrsm.Pipe, debug: bool = False) ‑> Tuple[bool, str]
-
Register a new pipe. A pipe's attributes must be set before registering.
Expand source code
def register_pipe( self, pipe: mrsm.Pipe, debug: bool = False, ) -> SuccessTuple: """ Register a new pipe. A pipe's attributes must be set before registering. """ from meerschaum.utils.debug import dprint from meerschaum.utils.packages import attempt_import from meerschaum.utils.sql import json_flavors ### ensure pipes table exists from meerschaum.connectors.sql.tables import get_tables pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] if pipe.get_id(debug=debug) is not None: return False, f"{pipe} is already registered." ### NOTE: if `parameters` is supplied in the Pipe constructor, ### then `pipe.parameters` will exist and not be fetched from the database. ### 1. Prioritize the Pipe object's `parameters` first. ### E.g. if the user manually sets the `parameters` property ### or if the Pipe already exists ### (which shouldn't be able to be registered anyway but that's an issue for later). parameters = None try: parameters = pipe.parameters except Exception as e: if debug: dprint(str(e)) parameters = None ### ensure `parameters` is a dictionary if parameters is None: parameters = {} import json sqlalchemy = attempt_import('sqlalchemy') values = { 'connector_keys' : pipe.connector_keys, 'metric_key' : pipe.metric_key, 'location_key' : pipe.location_key, 'parameters' : ( json.dumps(parameters) if self.flavor not in json_flavors else parameters ), } query = sqlalchemy.insert(pipes_tbl).values(**values) result = self.exec(query, debug=debug) if result is None: return False, f"Failed to register {pipe}." return True, f"Successfully registered {pipe}."
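A minimal sketch (assumes the pipe is not yet registered on this instance):
>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> pipe = mrsm.Pipe('plugin:noaa', 'weather', columns={'datetime': 'timestamp'})
>>> conn.register_pipe(pipe)   ### e.g. (True, "Successfully registered ...")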
def register_plugin(self, plugin: meerschaum.core.Plugin, force: bool = False, debug: bool = False, **kw: Any) ‑> SuccessTuple
-
Register a new plugin to the plugins table.
Expand source code
def register_plugin( self, plugin: 'meerschaum.core.Plugin', force: bool = False, debug: bool = False, **kw: Any ) -> SuccessTuple: """Register a new plugin to the plugins table.""" from meerschaum.utils.warnings import warn, error from meerschaum.utils.packages import attempt_import sqlalchemy = attempt_import('sqlalchemy') from meerschaum.utils.sql import json_flavors from meerschaum.connectors.sql.tables import get_tables plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] old_id = self.get_plugin_id(plugin, debug=debug) ### Check for version conflict. May be overridden with `--force`. if old_id is not None and not force: old_version = self.get_plugin_version(plugin, debug=debug) new_version = plugin.version if old_version is None: old_version = '' if new_version is None: new_version = '' ### verify that the new version is greater than the old packaging_version = attempt_import('packaging.version') if ( old_version and new_version and packaging_version.parse(old_version) >= packaging_version.parse(new_version) ): return False, ( f"Version '{new_version}' of plugin '{plugin}' " + f"must be greater than existing version '{old_version}'." ) import json bind_variables = { 'plugin_name' : plugin.name, 'version' : plugin.version, 'attributes' : ( json.dumps(plugin.attributes) if self.flavor not in json_flavors else plugin.attributes ), 'user_id' : plugin.user_id, } if old_id is None: query = sqlalchemy.insert(plugins_tbl).values(**bind_variables) else: query = ( sqlalchemy.update(plugins_tbl) .values(**bind_variables) .where(plugins_tbl.c.plugin_id == old_id) ) result = self.exec(query, debug=debug) if result is None: return False, f"Failed to register plugin '{plugin}'." return True, f"Successfully registered plugin '{plugin}'."
def register_user(self, user: meerschaum.core.User, debug: bool = False, **kw: Any) ‑> SuccessTuple
-
Register a new user.
Expand source code
def register_user( self, user: meerschaum.core.User, debug: bool = False, **kw: Any ) -> SuccessTuple: """Register a new user.""" from meerschaum.utils.warnings import warn, error, info from meerschaum.utils.packages import attempt_import from meerschaum.utils.sql import json_flavors sqlalchemy = attempt_import('sqlalchemy') valid_tuple = valid_username(user.username) if not valid_tuple[0]: return valid_tuple old_id = self.get_user_id(user, debug=debug) if old_id is not None: return False, f"User '{user}' already exists." ### ensure users table exists from meerschaum.connectors.sql.tables import get_tables tables = get_tables(mrsm_instance=self, debug=debug) import json bind_variables = { 'username' : user.username, 'email' : user.email, 'password_hash' : user.password_hash, 'user_type' : user.type, 'attributes' : ( json.dumps(user.attributes) if self.flavor not in json_flavors else user.attributes ), } if old_id is not None: return False, f"User '{user.username}' already exists." if old_id is None: query = ( sqlalchemy.insert(tables['users']). values(**bind_variables) ) result = self.exec(query, debug=debug) if result is None: return False, f"Failed to register user '{user}'." return True, f"Successfully registered user '{user}'."
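A hedged sketch (assumes the User constructor accepts a username and password, as in meerschaum.core.User):
>>> import meerschaum as mrsm
>>> from meerschaum.core import User
>>> conn = mrsm.get_connector('sql:main')
>>> conn.register_user(User('alice', 'hunter2'))   ### e.g. (True, "Successfully registered user 'alice'.")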
def sync_pipe(self, pipe: mrsm.Pipe, df: Union[pandas.DataFrame, str, Dict[Any, Any], None] = None, begin: Optional[datetime] = None, end: Optional[datetime] = None, chunksize: Optional[int] = -1, check_existing: bool = True, blocking: bool = True, debug: bool = False, **kw: Any) ‑> SuccessTuple
-
Sync a pipe using a database connection.
Parameters
pipe: mrsm.Pipe
- The Meerschaum Pipe instance into which to sync the data.
df: Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]], default None
- An optional DataFrame or equivalent to sync into the pipe.
begin: Optional[datetime], default None
- Optionally specify the earliest datetime to search for data.
end: Optional[datetime], default None
- Optionally specify the latest datetime to search for data.
chunksize: Optional[int], default -1
- Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction.
check_existing: bool, default True
- If True, pull and diff with existing data from the pipe.
blocking: bool, default True
- If True, wait for the sync to finish and return its result; otherwise sync asynchronously.
debug: bool, default False
- Verbosity toggle.
kw: Any
- Catch-all for keyword arguments.
Returns
A SuccessTuple of success (bool) and message (str).
Expand source code
def sync_pipe( self, pipe: mrsm.Pipe, df: Union[pandas.DataFrame, str, Dict[Any, Any], None] = None, begin: Optional[datetime] = None, end: Optional[datetime] = None, chunksize: Optional[int] = -1, check_existing: bool = True, blocking: bool = True, debug: bool = False, **kw: Any ) -> SuccessTuple: """ Sync a pipe using a database connection. Parameters ---------- pipe: mrsm.Pipe The Meerschaum Pipe instance into which to sync the data. df: Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]] An optional DataFrame or equivalent to sync into the pipe. Defaults to `None`. begin: Optional[datetime], default None Optionally specify the earliest datetime to search for data. Defaults to `None`. end: Optional[datetime], default None Optionally specify the latest datetime to search for data. Defaults to `None`. chunksize: Optional[int], default -1 Specify the number of rows to sync per chunk. If `-1`, resort to system configuration (default is `900`). A `chunksize` of `None` will sync all rows in one transaction. Defaults to `-1`. check_existing: bool, default True If `True`, pull and diff with existing data from the pipe. Defaults to `True`. blocking: bool, default True If `True`, wait for sync to finish and return its result, otherwise asyncronously sync. Defaults to `True`. debug: bool, default False Verbosity toggle. Defaults to False. kw: Any Catch-all for keyword arguments. Returns ------- A `SuccessTuple` of success (`bool`) and message (`str`). """ from meerschaum.utils.warnings import warn from meerschaum.utils.debug import dprint from meerschaum.utils.packages import import_pandas from meerschaum.utils.sql import get_update_queries, sql_item_name, json_flavors from meerschaum.utils.misc import generate_password from meerschaum.utils.dataframe import get_json_cols from meerschaum.utils.dtypes import are_dtypes_equal from meerschaum import Pipe import time import copy pd = import_pandas() if df is None: msg = f"DataFrame is None. Cannot sync {pipe}." warn(msg) return False, msg start = time.perf_counter() if not pipe.temporary and not pipe.get_id(debug=debug): register_tuple = pipe.register(debug=debug) if not register_tuple[0]: return register_tuple ### df is the dataframe returned from the remote source ### via the connector if debug: dprint("Fetched data:\n" + str(df)) if not isinstance(df, pd.DataFrame): df = pipe.enforce_dtypes(df, chunksize=chunksize, debug=debug) ### if table does not exist, create it with indices is_new = False add_cols_query = None if not pipe.exists(debug=debug): check_existing = False is_new = True else: ### Check for new columns. add_cols_queries = self.get_add_columns_queries(pipe, df, debug=debug) if add_cols_queries: if not self.exec_queries(add_cols_queries, debug=debug): warn(f"Failed to add new columns to {pipe}.") alter_cols_queries = self.get_alter_columns_queries(pipe, df, debug=debug) if alter_cols_queries: if not self.exec_queries(alter_cols_queries, debug=debug): warn(f"Failed to alter columns for {pipe}.") else: _ = pipe.infer_dtypes(persist=True) ### NOTE: Oracle SQL < 23c (2023) and SQLite does not support booleans, ### so infer bools and persist them to `dtypes`. ### MSSQL supports `BIT` for booleans, but we coerce bools to int for MSSQL ### to avoid merge issues. 
if self.flavor in ('oracle', 'sqlite', 'mssql'): pipe_dtypes = pipe.dtypes new_bool_cols = { col: 'bool[pyarrow]' for col, typ in df.dtypes.items() if col not in pipe_dtypes and are_dtypes_equal(str(typ), 'bool') } pipe_dtypes.update(new_bool_cols) pipe.dtypes = pipe_dtypes if not pipe.temporary: infer_bool_success, infer_bool_msg = pipe.edit(debug=debug) if not infer_bool_success: return infer_bool_success, infer_bool_msg unseen_df, update_df, delta_df = ( pipe.filter_existing( df, chunksize = chunksize, debug = debug, **kw ) if check_existing else (df, None, df) ) if debug: dprint("Delta data:\n" + str(delta_df)) dprint("Unseen data:\n" + str(unseen_df)) if update_df is not None: dprint("Update data:\n" + str(update_df)) if update_df is not None and not len(update_df) == 0: transact_id = generate_password(3) temp_target = '_' + transact_id + '_' + pipe.target update_kw = copy.deepcopy(kw) update_kw.update({ 'name': temp_target, 'if_exists': 'append', 'chunksize': chunksize, 'dtype': self.get_to_sql_dtype(pipe, update_df, update_dtypes=False), 'debug': debug, }) self.to_sql(update_df, **update_kw) temp_pipe = Pipe( pipe.connector_keys + '_', pipe.metric_key, pipe.location_key, instance = pipe.instance_keys, columns = pipe.columns, target = temp_target, temporary = True, ) existing_cols = pipe.get_columns_types(debug=debug) join_cols = [ col for col_key, col in pipe.columns.items() if col and col_key != 'value' and col in existing_cols ] queries = get_update_queries( pipe.target, temp_target, self, join_cols, debug = debug ) success = all(self.exec_queries(queries, break_on_error=True, debug=debug)) drop_success, drop_msg = temp_pipe.drop(debug=debug) if not drop_success: warn(drop_msg) if not success: return False, f"Failed to apply update to {pipe}." if_exists = kw.get('if_exists', 'append') if 'if_exists' in kw: kw.pop('if_exists') if 'name' in kw: kw.pop('name') ### Account for first-time syncs of JSON columns. unseen_json_cols = get_json_cols(unseen_df) update_json_cols = get_json_cols(update_df) if update_df is not None else [] json_cols = list(set(unseen_json_cols + update_json_cols)) existing_json_cols = [col for col, typ in pipe.dtypes.items() if typ == 'json'] new_json_cols = [col for col in json_cols if col not in existing_json_cols] if new_json_cols: pipe.dtypes.update({col: 'json' for col in json_cols}) if not pipe.temporary: edit_success, edit_msg = pipe.edit(interactive=False, debug=debug) if not edit_success: warn(f"Unable to update JSON dtypes for {pipe}:\n{edit_msg}") ### Insert new data into Pipe's table. unseen_kw = copy.deepcopy(kw) unseen_kw.update({ 'name': pipe.target, 'if_exists': if_exists, 'debug': debug, 'as_dict': True, 'chunksize': chunksize, 'dtype': self.get_to_sql_dtype(pipe, unseen_df, update_dtypes=True), }) stats = self.to_sql(unseen_df, **unseen_kw) if is_new: if not self.create_indices(pipe, debug=debug): if debug: dprint(f"Failed to create indices for {pipe}. Continuing...") end = time.perf_counter() success = stats['success'] if not success: return success, stats['msg'] msg = ( f"Inserted {len(unseen_df.index)}, " + f"updated {len(update_df.index) if update_df is not None else 0} rows." ) if debug: msg = msg[:-1] + ( f"\non table {sql_item_name(pipe.target, self.flavor)}\n" + f"in {round(end-start, 2)} seconds." ) return success, msg
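A minimal sketch of a manual sync (keys and data are illustrative):
>>> import pandas as pd
>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> pipe = mrsm.Pipe('a', 'b', instance='sql:main', columns={'datetime': 'dt'})
>>> df = pd.DataFrame({'dt': [pd.Timestamp('2023-01-01')], 'val': [1.0]})
>>> conn.sync_pipe(pipe, df)   ### e.g. (True, "Inserted 1, updated 0 rows.")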
def sync_pipe_inplace(self, pipe: mrsm.Pipe, params: Optional[Dict[str, Any]] = None, begin: Optional[datetime] = None, end: Optional[datetime] = None, chunksize: Optional[int] = -1, check_existing: bool = True, debug: bool = False, **kw: Any) ‑> Tuple[bool, str]
-
If a pipe's connector is the same as its instance connector, it's more efficient to sync the pipe in-place rather than reading data into Pandas.
Parameters
pipe: mrsm.Pipe
- The pipe whose connector is the same as its instance.
params: Optional[Dict[str, Any]], default None
- Optional params dictionary to build the WHERE clause. See build_where().
begin: Optional[datetime], default None
- Optionally specify the earliest datetime to search for data.
end: Optional[datetime], default None
- Optionally specify the latest datetime to search for data.
chunksize: Optional[int], default -1
- Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction.
check_existing: bool, default True
- If True, pull and diff with existing data from the pipe.
debug: bool, default False
- Verbosity toggle.
Returns
A SuccessTuple.
Expand source code
def sync_pipe_inplace( self, pipe: 'mrsm.Pipe', params: Optional[Dict[str, Any]] = None, begin: Optional[datetime] = None, end: Optional[datetime] = None, chunksize: Optional[int] = -1, check_existing: bool = True, debug: bool = False, **kw: Any ) -> SuccessTuple: """ If a pipe's connector is the same as its instance connector, it's more efficient to sync the pipe in-place rather than reading data into Pandas. Parameters ---------- pipe: mrsm.Pipe The pipe whose connector is the same as its instance. params: Optional[Dict[str, Any]], default None Optional params dictionary to build the `WHERE` clause. See `meerschaum.utils.sql.build_where`. begin: Optional[datetime], default None Optionally specify the earliest datetime to search for data. Defaults to `None`. end: Optional[datetime], default None Optionally specify the latest datetime to search for data. Defaults to `None`. chunksize: Optional[int], default -1 Specify the number of rows to sync per chunk. If `-1`, resort to system configuration (default is `900`). A `chunksize` of `None` will sync all rows in one transaction. Defaults to `-1`. check_existing: bool, default True If `True`, pull and diff with existing data from the pipe. debug: bool, default False Verbosity toggle. Returns ------- A SuccessTuple. """ from meerschaum.utils.sql import ( sql_item_name, table_exists, get_sqlalchemy_table, get_update_queries, get_null_replacement, NO_CTE_FLAVORS, NO_SELECT_INTO_FLAVORS, format_cte_subquery, get_create_table_query, ) from meerschaum.utils.dtypes.sql import ( get_pd_type_from_db_type, ) from meerschaum.utils.misc import generate_password from meerschaum.utils.debug import dprint metadef = self.get_pipe_metadef( pipe, params = params, begin = begin, end = end, check_existing = check_existing, debug = debug, ) metadef_name = sql_item_name('metadef', self.flavor) pipe_name = sql_item_name(pipe.target, self.flavor) if not pipe.exists(debug=debug): create_pipe_query = get_create_table_query(metadef, pipe.target, self.flavor) result = self.exec(create_pipe_query, debug=debug) if result is None: return False, f"Could not insert new data into {pipe} from its SQL query definition." if not self.create_indices(pipe, debug=debug): if debug: dprint(f"Failed to create indices for {pipe}. Continuing...") rowcount = pipe.get_rowcount(debug=debug) return True, f"Inserted {rowcount}, updated 0 rows." ### Generate names for the tables. 
transact_id = generate_password(3) def get_temp_table_name(label: str) -> str: return '_' + transact_id + '_' + label + '_' + pipe.target backtrack_table_raw = get_temp_table_name('backtrack') backtrack_table_name = sql_item_name(backtrack_table_raw, self.flavor) new_table_raw = get_temp_table_name('new') new_table_name = sql_item_name(new_table_raw, self.flavor) delta_table_raw = get_temp_table_name('delta') delta_table_name = sql_item_name(delta_table_raw, self.flavor) joined_table_raw = get_temp_table_name('joined') joined_table_name = sql_item_name(joined_table_raw, self.flavor) unseen_table_raw = get_temp_table_name('unseen') unseen_table_name = sql_item_name(unseen_table_raw, self.flavor) update_table_raw = get_temp_table_name('update') update_table_name = sql_item_name(update_table_raw, self.flavor) metadef_name = sql_item_name('metadef', self.flavor) new_queries = [] drop_new_query = f"DROP TABLE {new_table_name}" if table_exists(new_table_raw, self, debug=debug): new_queries.append(drop_new_query) create_new_query = get_create_table_query(metadef, new_table_raw, self.flavor) new_queries.append(create_new_query) new_success = all(self.exec_queries(new_queries, break_on_error=True, debug=debug)) if not new_success: self.exec_queries([drop_new_query], break_on_error=False, debug=debug) return False, f"Could not fetch new data for {pipe}." new_table_obj = get_sqlalchemy_table( new_table_raw, connector = self, refresh = True, debug = debug, ) new_cols = { str(col.name): get_pd_type_from_db_type(str(col.type)) for col in new_table_obj.columns } add_cols_queries = self.get_add_columns_queries(pipe, new_cols, debug=debug) if add_cols_queries: if not self.exec_queries(add_cols_queries, debug=debug): warn(f"Failed to add new columns to {pipe}.") alter_cols_queries = self.get_alter_columns_queries(pipe, new_cols, debug=debug) if alter_cols_queries: if not self.exec_queries(alter_cols_queries, debug=debug): warn(f"Failed to alter columns for {pipe}.") else: _ = pipe.infer_dtypes(persist=True) if not check_existing: new_count = self.value(f"SELECT COUNT(*) FROM {new_table_name}", debug=debug) insert_queries = [ ( f"INSERT INTO {pipe_name}\n" + f"SELECT *\nFROM {new_table_name}" ), f"DROP TABLE {new_table_name}" ] if not self.exec_queries(insert_queries, debug=debug, break_on_error=False): return False, f"Failed to insert into rows into {pipe}." return True, f"Inserted {new_count}, updated 0 rows." backtrack_queries = [] drop_backtrack_query = f"DROP TABLE {backtrack_table_name}" if table_exists(backtrack_table_raw, self, debug=debug): backtrack_queries.append(drop_backtrack_query) backtrack_def = self.get_pipe_data_query( pipe, begin = begin, end = end, begin_add_minutes = 0, end_add_minutes = 1, params = params, debug = debug, order = None, ) select_backtrack_query = format_cte_subquery( backtrack_def, self.flavor, sub_name = 'backtrack_def', ) create_backtrack_query = get_create_table_query( backtrack_def, backtrack_table_raw, self.flavor, ) backtrack_queries.append(create_backtrack_query) backtrack_success = all(self.exec_queries(backtrack_queries, break_on_error=True, debug=debug)) if not backtrack_success: self.exec_queries([drop_new_query, drop_backtrack_query], break_on_error=False, debug=debug) return False, f"Could not fetch backtrack data from {pipe}." ### Determine which index columns are present in both tables. 
backtrack_table_obj = get_sqlalchemy_table( backtrack_table_raw, connector = self, refresh = True, debug = debug, ) backtrack_cols = {str(col.name): str(col.type) for col in backtrack_table_obj.columns} common_cols = [col for col in new_cols if col in backtrack_cols] on_cols = { col: new_cols.get(col, 'object') for col_key, col in pipe.columns.items() if ( col and col_key != 'value' and col in backtrack_cols and col in new_cols ) } delta_queries = [] drop_delta_query = f"DROP TABLE {delta_table_name}" if table_exists(delta_table_raw, self, debug=debug): delta_queries.append(drop_delta_query) null_replace_new_cols_str = ( ', '.join([ f"COALESCE({new_table_name}.{sql_item_name(col, self.flavor)}, " + f"{get_null_replacement(typ, self.flavor)}) AS " + sql_item_name(col, self.flavor) for col, typ in new_cols.items() ]) ) select_delta_query = ( f"SELECT\n" + null_replace_new_cols_str + "\n" + f"\nFROM {new_table_name}\n" + f"LEFT OUTER JOIN {backtrack_table_name}\nON\n" + '\nAND\n'.join([ ( f'COALESCE({new_table_name}.' + sql_item_name(c, self.flavor) + ", " + get_null_replacement(new_cols[c], self.flavor) + ") " + ' = ' + f'COALESCE({backtrack_table_name}.' + sql_item_name(c, self.flavor) + ", " + get_null_replacement(backtrack_cols[c], self.flavor) + ") " ) for c in common_cols ]) + "\nWHERE\n" + '\nAND\n'.join([ ( f'{backtrack_table_name}.' + sql_item_name(c, self.flavor) + ' IS NULL' ) for c in common_cols ]) ) create_delta_query = get_create_table_query(select_delta_query, delta_table_raw, self.flavor) delta_queries.append(create_delta_query) delta_success = all(self.exec_queries(delta_queries, break_on_error=True, debug=debug)) if not delta_success: self.exec_queries( [ drop_new_query, drop_backtrack_query, drop_delta_query, ], break_on_error = False, debug = debug, ) return False, f"Could not filter data for {pipe}." delta_table_obj = get_sqlalchemy_table( delta_table_raw, connector = self, refresh = True, debug = debug, ) delta_cols = { str(col.name): get_pd_type_from_db_type(str(col.type)) for col in delta_table_obj.columns } joined_queries = [] drop_joined_query = f"DROP TABLE {joined_table_name}" if on_cols and table_exists(joined_table_raw, self, debug=debug): joined_queries.append(drop_joined_query) select_joined_query = ( "SELECT " + (', '.join([ ( f'{delta_table_name}.' + sql_item_name(c, self.flavor) + " AS " + sql_item_name(c + '_delta', self.flavor) ) for c in delta_cols ])) + ", " + (', '.join([ ( f'{backtrack_table_name}.' + sql_item_name(c, self.flavor) + " AS " + sql_item_name(c + '_backtrack', self.flavor) ) for c in backtrack_cols ])) + f"\nFROM {delta_table_name}\n" + f"LEFT OUTER JOIN {backtrack_table_name}\nON\n" + '\nAND\n'.join([ ( f'COALESCE({delta_table_name}.' + sql_item_name(c, self.flavor) + ", " + get_null_replacement(typ, self.flavor) + ")" + ' = ' + f'COALESCE({backtrack_table_name}.' + sql_item_name(c, self.flavor) + ", " + get_null_replacement(typ, self.flavor) + ")" ) for c, typ in on_cols.items() ]) ) create_joined_query = get_create_table_query(select_joined_query, joined_table_raw, self.flavor) joined_queries.append(create_joined_query) joined_success = ( all(self.exec_queries(joined_queries, break_on_error=True, debug=debug)) if on_cols else True ) if not joined_success: self.exec_queries( [ drop_new_query, drop_backtrack_query, drop_delta_query, drop_joined_query, ], break_on_error = False, debug = debug, ) return False, f"Could not separate new and updated data for {pipe}." 
unseen_queries = [] drop_unseen_query = f"DROP TABLE {unseen_table_name}" if on_cols and table_exists(unseen_table_raw, self, debug=debug): unseen_queries.append(drop_unseen_query) select_unseen_query = ( "SELECT " + (', '.join([ ( "CASE\n WHEN " + sql_item_name(c + '_delta', self.flavor) + " != " + get_null_replacement(typ, self.flavor) + " THEN " + sql_item_name(c + '_delta', self.flavor) + "\n ELSE NULL\nEND " + " AS " + sql_item_name(c, self.flavor) ) for c, typ in delta_cols.items() ])) + f"\nFROM {joined_table_name}\n" + f"WHERE " + '\nAND\n'.join([ ( sql_item_name(c + '_backtrack', self.flavor) + ' IS NULL' ) for c in delta_cols ]) ) create_unseen_query = get_create_table_query(select_unseen_query, unseen_table_raw, self.flavor) unseen_queries.append(create_unseen_query) unseen_success = ( all(self.exec_queries(unseen_queries, break_on_error=True, debug=debug)) if on_cols else True ) if not unseen_success: self.exec_queries( [ drop_new_query, drop_backtrack_query, drop_delta_query, drop_joined_query, drop_unseen_query, ], break_on_error = False, debug = debug, ) return False, f"Could not determine new data for {pipe}." unseen_count = self.value( ( "SELECT COUNT(*) FROM " + (unseen_table_name if on_cols else delta_table_name) ), debug = debug, ) update_queries = [] drop_update_query = f"DROP TABLE {update_table_name}" if on_cols and table_exists(update_table_raw, self, debug=debug): update_queries.append(drop_unseen_query) select_update_query = ( "SELECT " + (', '.join([ ( "CASE\n WHEN " + sql_item_name(c + '_delta', self.flavor) + " != " + get_null_replacement(typ, self.flavor) + " THEN " + sql_item_name(c + '_delta', self.flavor) + "\n ELSE NULL\nEND " + " AS " + sql_item_name(c, self.flavor) ) for c, typ in delta_cols.items() ])) + f"\nFROM {joined_table_name}\n" + f"WHERE " + '\nOR\n'.join([ ( sql_item_name(c + '_backtrack', self.flavor) + ' IS NOT NULL' ) for c in delta_cols ]) ) create_update_query = get_create_table_query(select_update_query, update_table_raw, self.flavor) update_queries.append(create_update_query) update_success = ( all(self.exec_queries(update_queries, break_on_error=True, debug=debug)) if on_cols else True ) if not update_success: self.exec_queries( [ drop_new_query, drop_backtrack_query, drop_delta_query, drop_joined_query, drop_unseen_query, drop_update_query, ], break_on_error = False, debug = debug, ) return False, "Could not determine updated data for {pipe}." update_count = ( self.value(f"SELECT COUNT(*) FROM {update_table_name}", debug=debug) if on_cols else 0 ) apply_update_queries = ( get_update_queries( pipe.target, update_table_raw, self, on_cols, debug = debug ) if on_cols else [] ) apply_unseen_queries = [ ( f"INSERT INTO {pipe_name}\n" + f"SELECT *\nFROM " + ( unseen_table_name if on_cols else delta_table_name ) ), ] apply_queries = ( (apply_unseen_queries if unseen_count > 0 else []) + (apply_update_queries if update_count > 0 else []) + [ drop_new_query, drop_backtrack_query, drop_delta_query, ] + ( [ drop_joined_query, drop_unseen_query, drop_update_query, ] if on_cols else [] ) ) success = all(self.exec_queries(apply_queries, break_on_error=False, debug=debug)) msg = ( f"Was not able to apply changes to {pipe}." if not success else f"Inserted {unseen_count}, updated {update_count} rows." ) return success, msg
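A hedged sketch: in-place syncing applies when a pipe's connector keys point at the instance itself. The 'fetch': {'definition': ...} parameter shape below is an assumption for illustration; consult get_pipe_metadef() for the exact definition keys.
>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> pipe = mrsm.Pipe(
...     'sql:main', 'weather',
...     instance = 'sql:main',
...     parameters = {'fetch': {'definition': "SELECT * FROM remote_weather"}},
... )
>>> conn.sync_pipe_inplace(pipe)   ### e.g. (True, "Inserted 100, updated 0 rows.")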
def test_connection(self, **kw: Any) ‑> Optional[bool]
-
Test if a successful connection to the database may be made.
Parameters
**kw: Any
- The keyword arguments are passed to retry_connect().
Returns
True if a connection is made, otherwise False or None in case of failure.
Expand source code
def test_connection(
        self,
        **kw: Any
    ) -> Union[bool, None]:
    """
    Test if a successful connection to the database may be made.

    Parameters
    ----------
    **kw:
        The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

    Returns
    -------
    `True` if a connection is made, otherwise `False` or `None` in case of failure.
    """
    import warnings
    from meerschaum.connectors.poll import retry_connect
    _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self}
    _default_kw.update(kw)
    with warnings.catch_warnings():
        warnings.filterwarnings('ignore', 'Could not')
        try:
            return retry_connect(**_default_kw)
        except Exception as e:
            return False
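A quick sketch:
>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> conn.test_connection(max_retries=1)   ### True if reachable, else False (or None on failure)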
def to_sql(self, df: pandas.DataFrame, name: str = None, index: bool = False, if_exists: str = 'replace', method: str = '', chunksize: Optional[int] = -1, silent: bool = False, debug: bool = False, as_tuple: bool = False, as_dict: bool = False, **kw) ‑> Union[bool, SuccessTuple]
-
Upload a DataFrame's contents to the SQL server.
Parameters
df: pd.DataFrame
- The DataFrame to be uploaded.
name: str
- The name of the table to be created.
index: bool, default False
- If True, creates the DataFrame's indices as columns.
if_exists: str, default 'replace'
- Drop and create the table ('replace'), append if it exists ('append'), or raise an Exception ('fail'). Options are ['replace', 'append', 'fail'].
method: str, default ''
- None or 'multi'. See the pandas.DataFrame.to_sql documentation for details.
as_tuple: bool, default False
- If True, return a (success_bool, message) tuple instead of a bool.
as_dict: bool, default False
- If True, return a dictionary of transaction information. The keys are success, msg, start, end, duration, num_rows, chunksize, method, and target.
kw: Any
- Additional arguments will be passed to the DataFrame's to_sql function.
Returns
Either a bool or a SuccessTuple (depends on as_tuple).
Expand source code
def to_sql( self, df: pandas.DataFrame, name: str = None, index: bool = False, if_exists: str = 'replace', method: str = "", chunksize: Optional[int] = -1, silent: bool = False, debug: bool = False, as_tuple: bool = False, as_dict: bool = False, **kw ) -> Union[bool, SuccessTuple]: """ Upload a DataFrame's contents to the SQL server. Parameters ---------- df: pd.DataFrame The DataFrame to be uploaded. name: str The name of the table to be created. index: bool, default False If True, creates the DataFrame's indices as columns. if_exists: str, default 'replace' Drop and create the table ('replace') or append if it exists ('append') or raise Exception ('fail'). Options are ['replace', 'append', 'fail']. method: str, default '' None or multi. Details on pandas.to_sql. as_tuple: bool, default False If `True`, return a (success_bool, message) tuple instead of a `bool`. Defaults to `False`. as_dict: bool, default False If `True`, return a dictionary of transaction information. The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`, `method`, and `target`. kw: Any Additional arguments will be passed to the DataFrame's `to_sql` function Returns ------- Either a `bool` or a `SuccessTuple` (depends on `as_tuple`). """ import time import json from meerschaum.utils.warnings import error, warn import warnings import functools if name is None: error(f"Name must not be `None` to insert data into {self}.") ### We're requiring `name` to be positional, and sometimes it's passed in from background jobs. kw.pop('name', None) from meerschaum.utils.sql import sql_item_name, table_exists, json_flavors, truncate_item_name from meerschaum.utils.dataframe import get_json_cols from meerschaum.utils.dtypes import are_dtypes_equal from meerschaum.connectors.sql._create_engine import flavor_configs from meerschaum.utils.packages import attempt_import, import_pandas sqlalchemy = attempt_import('sqlalchemy', debug=debug) pd = import_pandas() is_dask = 'dask' in df.__module__ stats = {'target': name, } ### resort to defaults if None if method == "": if self.flavor in _bulk_flavors: method = psql_insert_copy else: ### Should resolve to 'multi' or `None`. method = flavor_configs.get(self.flavor, {}).get('to_sql', {}).get('method', 'multi') stats['method'] = method.__name__ if hasattr(method, '__name__') else str(method) default_chunksize = self._sys_config.get('chunksize', None) chunksize = chunksize if chunksize != -1 else default_chunksize if chunksize is not None and self.flavor in _max_chunks_flavors: if chunksize > _max_chunks_flavors[self.flavor]: if chunksize != default_chunksize: warn( f"The specified chunksize of {chunksize} exceeds the maximum of " + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n" + f" Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.", stacklevel = 3, ) chunksize = _max_chunks_flavors[self.flavor] stats['chunksize'] = chunksize success, msg = False, "Default to_sql message" start = time.perf_counter() if debug: msg = f"[{self}] Inserting {len(df)} rows with chunksize: {chunksize}..." print(msg, end="", flush=True) stats['num_rows'] = len(df) ### Check if the name is too long. truncated_name = truncate_item_name(name, self.flavor) if name != truncated_name: warn( f"Table '{name}' is too long for '{self.flavor}'," + f" will instead create the table '{truncated_name}'." 
) ### filter out non-pandas args import inspect to_sql_params = inspect.signature(df.to_sql).parameters to_sql_kw = {} for k, v in kw.items(): if k in to_sql_params: to_sql_kw[k] = v to_sql_kw.update({ 'name': truncated_name, ('con' if not is_dask else 'uri'): (self.engine if not is_dask else self.URI), 'index': index, 'if_exists': if_exists, 'method': method, 'chunksize': chunksize, }) if is_dask: to_sql_kw.update({ 'parallel': True, }) if self.flavor == 'oracle': ### For some reason 'replace' doesn't work properly in pandas, ### so try dropping first. if if_exists == 'replace' and table_exists(name, self, debug=debug): success = self.exec("DROP TABLE " + sql_item_name(name, 'oracle')) is not None if not success: warn(f"Unable to drop {name}") ### Enforce NVARCHAR(2000) as text instead of CLOB. dtype = to_sql_kw.get('dtype', {}) for col, typ in df.dtypes.items(): if are_dtypes_equal(str(typ), 'object'): dtype[col] = sqlalchemy.types.NVARCHAR(2000) elif are_dtypes_equal(str(typ), 'int'): dtype[col] = sqlalchemy.types.INTEGER to_sql_kw['dtype'] = dtype elif self.flavor == 'mssql': dtype = to_sql_kw.get('dtype', {}) for col, typ in df.dtypes.items(): if are_dtypes_equal(str(typ), 'bool'): dtype[col] = sqlalchemy.types.INTEGER to_sql_kw['dtype'] = dtype ### Check for JSON columns. if self.flavor not in json_flavors: json_cols = get_json_cols(df) if json_cols: for col in json_cols: df[col] = df[col].apply( ( lambda x: json.dumps(x, default=str, sort_keys=True) if not isinstance(x, Hashable) else x ) ) try: with warnings.catch_warnings(): warnings.filterwarnings('ignore', 'case sensitivity issues') df.to_sql(**to_sql_kw) success = True except Exception as e: if not silent: warn(str(e)) success, msg = False, str(e) end = time.perf_counter() if success: msg = f"It took {round(end - start, 2)} seconds to sync {len(df)} rows to {name}." stats['start'] = start stats['end'] = end stats['duration'] = end - start if debug: print(f" done.", flush=True) dprint(msg) stats['success'] = success stats['msg'] = msg if as_tuple: return success, msg if as_dict: return stats return success
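A minimal sketch (the table name is illustrative):
>>> import pandas as pd
>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> df = pd.DataFrame({'a': [1, 2, 3]})
>>> conn.to_sql(df, name='my_table', if_exists='append', as_tuple=True)   ### e.g. (True, "It took 0.01 seconds ...")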
def value(self, query: str, *args: Any, use_pandas: bool = False, **kw: Any) ‑> Any
-
Execute the provided query and return the first value.
Parameters
query: str
- The SQL query to execute.
*args: Any
- The arguments passed to meerschaum.connectors.sql.SQLConnector.exec if use_pandas is False (default) or to meerschaum.connectors.sql.SQLConnector.read.
use_pandas: bool, default False
- If True, use read(), otherwise use meerschaum.connectors.sql.SQLConnector.exec (default). NOTE: This is always True for DuckDB.
**kw: Any
- See *args.
Returns
Any value returned from the query.
Expand source code
def value(
        self,
        query: str,
        *args: Any,
        use_pandas: bool = False,
        **kw: Any
    ) -> Any:
    """
    Execute the provided query and return the first value.

    Parameters
    ----------
    query: str
        The SQL query to execute.

    *args: Any
        The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec`
        if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.

    use_pandas: bool, default False
        If `True`, use `meerschaum.connectors.sql.SQLConnector.read`, otherwise use
        `meerschaum.connectors.sql.SQLConnector.exec` (default).
        **NOTE:** This is always `True` for DuckDB.

    **kw: Any
        See `args`.

    Returns
    -------
    Any value returned from the query.
    """
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import warn
    sqlalchemy = attempt_import('sqlalchemy')
    if self.flavor == 'duckdb':
        use_pandas = True
    if use_pandas:
        try:
            return self.read(query, *args, **kw).iloc[0, 0]
        except Exception as e:
            return None

    _close = kw.get('close', True)
    _commit = kw.get('commit', (self.flavor != 'mssql'))
    try:
        result, connection = self.exec(
            query,
            *args,
            with_connection=True,
            close=False,
            commit=_commit,
            **kw
        )
        first = result.first() if result is not None else None
        _val = first[0] if first is not None else None
    except Exception as e:
        warn(e, stacklevel=3)
        return None
    if _close:
        try:
            connection.close()
        except Exception as e:
            warn("Failed to close connection with exception:\n" + str(e))
    return _val
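A quick sketch (assumes the table from the to_sql() example above):
>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> conn.value("SELECT COUNT(*) FROM my_table")   ### e.g. 3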