meerschaum.connectors.sql

Subpackage for SQLConnector subclass

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Subpackage for SQLConnector subclass
"""

from meerschaum.connectors.sql._SQLConnector import SQLConnector

__all__ = ('SQLConnector',)
class SQLConnector(Connector):
    """
    Connect to SQL databases via `sqlalchemy`.

    SQLConnectors may be used as Meerschaum instance connectors.
    Read more about connectors and instances at
    https://meerschaum.io/reference/connectors/
    """

    IS_INSTANCE: bool = True

    from ._create_engine import flavor_configs, create_engine
    from ._sql import (
        read,
        value,
        exec,
        execute,
        to_sql,
        exec_queries,
        get_connection,
        _cleanup_connections,
    )
    from meerschaum.utils.sql import test_connection
    from ._fetch import fetch, get_pipe_metadef
    from ._cli import cli, _cli_exit
    from ._pipes import (
        fetch_pipes_keys,
        create_indices,
        drop_indices,
        get_create_index_queries,
        get_drop_index_queries,
        get_add_columns_queries,
        get_alter_columns_queries,
        delete_pipe,
        get_pipe_data,
        get_pipe_data_query,
        register_pipe,
        edit_pipe,
        get_pipe_id,
        get_pipe_attributes,
        sync_pipe,
        sync_pipe_inplace,
        get_sync_time,
        pipe_exists,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        deduplicate_pipe,
        get_pipe_table,
        get_pipe_columns_types,
        get_to_sql_dtype,
        get_pipe_schema,
        create_pipe_table_from_df,
        get_pipe_columns_indices,
        get_temporary_target,
        create_pipe_indices,
        drop_pipe_indices,
        get_pipe_index_names,
    )
    from ._plugins import (
        register_plugin,
        delete_plugin,
        get_plugin_id,
        get_plugin_version,
        get_plugins,
        get_plugin_user_id,
        get_plugin_username,
        get_plugin_attributes,
    )
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri, parse_uri
    from ._instance import (
        _log_temporary_tables_creation,
        _drop_temporary_table,
        _drop_temporary_tables,
        _drop_old_temporary_tables,
    )

    def __init__(
        self,
        label: Optional[str] = None,
        flavor: Optional[str] = None,
        wait: bool = False,
        connect: bool = False,
        debug: bool = False,
        **kw: Any
    ):
        """
        Parameters
        ----------
        label: str, default 'main'
            The identifying label for the connector.
            E.g. for `sql:main`, 'main' is the label.
            Defaults to 'main'.

        flavor: Optional[str], default None
            The database flavor, e.g.
            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
            To see supported flavors, run the `bootstrap connectors` command.

        wait: bool, default False
            If `True`, block until a database connection has been made.
            Defaults to `False`.

        connect: bool, default False
            If `True`, immediately attempt to connect the database and raise
            a warning if the connection fails.
            Defaults to `False`.

        debug: bool, default False
            Verbosity toggle.
            Defaults to `False`.

        kw: Any
            All other arguments will be passed to the connector's attributes.
            Therefore, a connector may be made without being registered,
            as long as enough parameters are supplied to the constructor.
        """
        if 'uri' in kw:
            uri = kw['uri']
            if uri.startswith('postgres') and not uri.startswith('postgresql'):
                uri = uri.replace('postgres', 'postgresql', 1)
            if uri.startswith('postgresql') and not uri.startswith('postgresql+'):
                uri = uri.replace('postgresql://', 'postgresql+psycopg://', 1)
            if uri.startswith('timescaledb://'):
                uri = uri.replace('timescaledb://', 'postgresql+psycopg://', 1)
                flavor = 'timescaledb'
            kw['uri'] = uri
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            _ = from_uri_params.pop('label', None)

            ### Sometimes the flavor may be provided with a URI.
            kw.update(from_uri_params)
            if flavor:
                kw['flavor'] = flavor

        ### set __dict__ in base class
        super().__init__(
            'sql',
            label = label or self.__dict__.get('label', None),
            **kw
        )

        if self.__dict__.get('flavor', None) == 'sqlite':
            self._reset_attributes()
            self._set_attributes(
                'sql',
                label = label,
                inherit_default = False,
                **kw
            )
            ### For backwards compatibility reasons, set the path for sql:local if it's missing.
            if self.label == 'local' and not self.__dict__.get('database', None):
                from meerschaum.config._paths import SQLITE_DB_PATH
                self.database = str(SQLITE_DB_PATH)

        ### ensure flavor and label are set accordingly
        if 'flavor' not in self.__dict__:
            if flavor is None and 'uri' not in self.__dict__:
                raise Exception(
                    f"    Missing flavor. Provide flavor as a key for '{self}'."
                )
            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)

        if self.flavor == 'postgres':
            self.flavor = 'postgresql'

        self._debug = debug
        ### Store the PID and thread at initialization
        ### so we can dispose of the Pool in child processes or threads.
        import os
        import threading
        self._pid = os.getpid()
        self._thread_ident = threading.current_thread().ident
        self._sessions = {}
        self._locks = {'_sessions': threading.RLock(), }

        ### verify the flavor's requirements are met
        if self.flavor not in self.flavor_configs:
            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
        if not self.__dict__.get('uri'):
            self.verify_attributes(
                self.flavor_configs[self.flavor].get('requirements', set()),
                debug=debug,
            )

        if wait:
            from meerschaum.connectors.poll import retry_connect
            retry_connect(connector=self, debug=debug)

        if connect:
            if not self.test_connection(debug=debug):
                warn(f"Failed to connect with connector '{self}'!", stack=False)

    @property
    def Session(self):
        if '_Session' not in self.__dict__:
            if self.engine is None:
                return None

            from meerschaum.utils.packages import attempt_import
            sqlalchemy_orm = attempt_import('sqlalchemy.orm', lazy=False)
            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
            self._Session = sqlalchemy_orm.scoped_session(session_factory)

        return self._Session

    @property
    def engine(self):
        """
        Return the SQLAlchemy engine connected to the configured database.
        """
        import os
        import threading
        if '_engine' not in self.__dict__:
            self._engine, self._engine_str = self.create_engine(include_uri=True)

        same_process = os.getpid() == self._pid
        same_thread = threading.current_thread().ident == self._thread_ident

        ### handle child processes
        if not same_process:
            self._pid = os.getpid()
            self._thread = threading.current_thread()
            warn("Different PID detected. Disposing of connections...")
            self._engine.dispose()

        ### handle different threads
        if not same_thread:
            if self.flavor == 'duckdb':
                warn("Different thread detected.")
                self._engine.dispose()

        return self._engine

    @property
    def DATABASE_URL(self) -> str:
        """
        Return the URI connection string (alias for `SQLConnector.URI`).
        """
        _ = self.engine
        return str(self._engine_str)

    @property
    def URI(self) -> str:
        """
        Return the URI connection string.
        """
        _ = self.engine
        return str(self._engine_str)

    @property
    def IS_THREAD_SAFE(self) -> bool:
        """
        Return whether this connector may be multithreaded.
        """
        if self.flavor in ('duckdb', 'oracle'):
            return False
        if self.flavor == 'sqlite':
            return ':memory:' not in self.URI
        return True

    @property
    def metadata(self):
        """
        Return the metadata bound to this configured schema.
        """
        from meerschaum.utils.packages import attempt_import
        sqlalchemy = attempt_import('sqlalchemy', lazy=False)
        if '_metadata' not in self.__dict__:
            self._metadata = sqlalchemy.MetaData(schema=self.schema)
        return self._metadata

    @property
    def instance_schema(self):
        """
        Return the schema name for Meerschaum tables.
        """
        return self.schema

    @property
    def internal_schema(self):
        """
        Return the schema name for internal tables.
        """
        from meerschaum.config.static import STATIC_CONFIG
        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
        schema_name = self.__dict__.get('internal_schema', None) or (
            STATIC_CONFIG['sql']['internal_schema']
            if self.flavor not in NO_SCHEMA_FLAVORS
            else self.schema
        )

        if '_internal_schema' not in self.__dict__:
            self._internal_schema = schema_name
        return self._internal_schema

    @property
    def db(self) -> 'Optional[databases.Database]':
        from meerschaum.utils.packages import attempt_import
        databases = attempt_import('databases', lazy=False, install=True)
        url = self.DATABASE_URL
        if 'mysql' in url:
            url = url.replace('+pymysql', '')
        if '_db' not in self.__dict__:
            try:
                self._db = databases.Database(url)
            except KeyError:
                ### Likely encountered an unsupported flavor.
                from meerschaum.utils.warnings import warn
                self._db = None
        return self._db

    @property
    def db_version(self) -> Union[str, None]:
        """
        Return the database version.
        """
        _db_version = self.__dict__.get('_db_version', None)
        if _db_version is not None:
            return _db_version

        from meerschaum.utils.sql import get_db_version
        self._db_version = get_db_version(self)
        return self._db_version

    @property
    def schema(self) -> Union[str, None]:
        """
        Return the default schema to use.
        A value of `None` will not prepend a schema.
        """
        if 'schema' in self.__dict__:
            return self.__dict__['schema']

        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
        if self.flavor in NO_SCHEMA_FLAVORS:
            self.__dict__['schema'] = None
            return None

        sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
        _schema = sqlalchemy.inspect(self.engine).default_schema_name
        self.__dict__['schema'] = _schema
        return _schema

    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, d):
        self.__dict__.update(d)

    def __call__(self):
        return self

Connect to SQL databases via sqlalchemy.

SQLConnectors may be used as Meerschaum instance connectors. Read more about connectors and instances at https://meerschaum.io/reference/connectors/

SQLConnector( label: Optional[str] = None, flavor: Optional[str] = None, wait: bool = False, connect: bool = False, debug: bool = False, **kw: Any)
Parameters
  • label (str, default 'main'): The identifying label for the connector. E.g. for sql:main, 'main' is the label. Defaults to 'main'.
  • flavor (Optional[str], default None): The database flavor, e.g. 'sqlite', 'postgresql', 'cockroachdb', etc. To see supported flavors, run the bootstrap connectors command.
  • wait (bool, default False): If True, block until a database connection has been made. Defaults to False.
  • connect (bool, default False): If True, immediately attempt to connect the database and raise a warning if the connection fails. Defaults to False.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
  • kw (Any): All other arguments will be passed to the connector's attributes. Therefore, a connector may be made without being registered, as long as enough parameters are supplied to the constructor.
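
For illustration, a minimal sketch of constructing a connector directly (the import path is shown in the module source above; the SQLite path and credentials are hypothetical):

    from meerschaum.connectors.sql import SQLConnector

    ### Keyword attributes: per the docstring, enough parameters
    ### let the connector work without being registered.
    conn = SQLConnector(label='temp', flavor='sqlite', database='/tmp/demo.db')

    ### From a URI: __init__ normalizes 'postgres://' and 'timescaledb://'
    ### schemes to 'postgresql+psycopg://' as shown above.
    conn_pg = SQLConnector(uri='postgresql://user:pass@localhost:5432/db')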
IS_INSTANCE: bool = True
Session
    @property
    def Session(self):
        if '_Session' not in self.__dict__:
            if self.engine is None:
                return None

            from meerschaum.utils.packages import attempt_import
            sqlalchemy_orm = attempt_import('sqlalchemy.orm', lazy=False)
            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
            self._Session = sqlalchemy_orm.scoped_session(session_factory)

        return self._Session
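
A hedged usage sketch: the property returns a standard sqlalchemy.orm scoped_session factory (or None when no engine could be built), so it follows the usual SQLAlchemy session lifecycle:

    import sqlalchemy

    session = conn.Session() if conn.Session is not None else None
    if session is not None:
        try:
            rows = session.execute(sqlalchemy.text('SELECT 1')).fetchall()
        finally:
            conn.Session.remove()  ### scoped_session cleanup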
engine

Return the SQLAlchemy engine connected to the configured database.
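
Since this is an ordinary SQLAlchemy engine, the standard connection patterns apply; a minimal sketch (assuming conn is an SQLConnector as constructed above):

    import sqlalchemy

    with conn.engine.connect() as connection:
        print(connection.execute(sqlalchemy.text('SELECT 1')).scalar())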

DATABASE_URL: str

Return the URI connection string (alias for SQLConnector.URI).

URI: str

Return the URI connection string.

IS_THREAD_SAFE: bool

Return whether this connector may be multithreaded.
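
A hedged sketch of gating parallelism on this flag (the queries are placeholders):

    from multiprocessing.pool import ThreadPool

    queries = ['SELECT 1', 'SELECT 2']
    if conn.IS_THREAD_SAFE:
        with ThreadPool(2) as pool:
            results = pool.map(conn.exec, queries)
    else:
        results = [conn.exec(query) for query in queries]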

metadata

Return the metadata bound to this configured schema.
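
A minimal sketch of using the bound metadata to reflect an existing table (the table name is hypothetical and must already exist):

    import sqlalchemy

    table = sqlalchemy.Table(
        'my_table',                 ### hypothetical table name
        conn.metadata,
        autoload_with=conn.engine,  ### reflect columns from the database
    )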

instance_schema

Return the schema name for Meerschaum tables.

internal_schema

Return the schema name for internal tables.

db: 'Optional[databases.Database]'
    @property
    def db(self) -> 'Optional[databases.Database]':
        from meerschaum.utils.packages import attempt_import
        databases = attempt_import('databases', lazy=False, install=True)
        url = self.DATABASE_URL
        if 'mysql' in url:
            url = url.replace('+pymysql', '')
        if '_db' not in self.__dict__:
            try:
                self._db = databases.Database(url)
            except KeyError:
                ### Likely encountered an unsupported flavor.
                from meerschaum.utils.warnings import warn
                self._db = None
        return self._db
db_version: Optional[str]

Return the database version.

schema: Optional[str]

Return the default schema to use. A value of None will not prepend a schema.
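
A hedged sketch of how the default schema feeds into name quoting, using sql_item_name exactly as read() calls it below:

    from meerschaum.utils.sql import sql_item_name

    ### A schema of None omits the schema prefix entirely.
    print(sql_item_name('my_table', conn.flavor, conn.schema))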

flavor_configs = {
    'timescaledb': {
        'engine': 'postgresql+psycopg',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {},
        'requirements': {'database', 'host', 'password', 'username'},
        'defaults': {'port': 5432},
    },
    'postgresql': {
        'engine': 'postgresql+psycopg',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {},
        'requirements': {'database', 'host', 'password', 'username'},
        'defaults': {'port': 5432},
    },
    'citus': {
        'engine': 'postgresql+psycopg',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {},
        'requirements': {'database', 'host', 'password', 'username'},
        'defaults': {'port': 5432},
    },
    'mssql': {
        'engine': 'mssql+pyodbc',
        'create_engine': {
            'fast_executemany': True,
            'use_insertmanyvalues': False,
            'isolation_level': 'AUTOCOMMIT',
            'use_setinputsizes': False,
            'pool_pre_ping': True,
            'ignore_no_transaction_on_rollback': True,
        },
        'omit_create_engine': {'method'},
        'to_sql': {'method': None},
        'requirements': {'database', 'host', 'password', 'username'},
        'defaults': {
            'port': 1433,
            'options': 'driver=ODBC Driver 18 for SQL Server&UseFMTONLY=Yes&TrustServerCertificate=yes&Encrypt=no&MARS_Connection=yes',
        },
    },
    'mysql': {
        'engine': 'mysql+pymysql',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {'method': 'multi'},
        'requirements': {'database', 'host', 'password', 'username'},
        'defaults': {'port': 3306},
    },
    'mariadb': {
        'engine': 'mysql+pymysql',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {'method': 'multi'},
        'requirements': {'database', 'host', 'password', 'username'},
        'defaults': {'port': 3306},
    },
    'oracle': {
        'engine': 'oracle+oracledb',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {'method': None},
        'requirements': {'database', 'host', 'password', 'username'},
        'defaults': {'port': 1521},
    },
    'sqlite': {
        'engine': 'sqlite',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {'method': 'multi'},
        'requirements': {'database'},
        'defaults': {},
    },
    'duckdb': {
        'engine': 'duckdb',
        'create_engine': {},
        'omit_create_engine': {'ALL'},
        'to_sql': {'method': 'multi'},
        'requirements': '',
        'defaults': {},
    },
    'cockroachdb': {
        'engine': 'cockroachdb',
        'omit_create_engine': {'method'},
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'to_sql': {'method': 'multi'},
        'requirements': {'host'},
        'defaults': {'port': 26257, 'database': 'defaultdb', 'username': 'root', 'password': 'admin'},
    },
}
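
A minimal sketch of inspecting these per-flavor settings before connecting (flavor_configs is imported into the class body above, so it is reachable from the class):

    from meerschaum.connectors.sql import SQLConnector

    pg_config = SQLConnector.flavor_configs['postgresql']
    print(pg_config['defaults'])      ### {'port': 5432}
    print(pg_config['requirements'])  ### {'database', 'host', 'password', 'username'}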
def create_engine( self, include_uri: bool = False, debug: bool = False, **kw) -> 'sqlalchemy.engine.Engine':
def create_engine(
    self,
    include_uri: bool = False,
    debug: bool = False,
    **kw
) -> 'sqlalchemy.engine.Engine':
    """Create a sqlalchemy engine by building the engine string."""
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import error, warn
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
    import urllib
    import copy
    ### Install and patch required drivers.
    if self.flavor in install_flavor_drivers:
        _ = attempt_import(
            *install_flavor_drivers[self.flavor],
            debug=debug,
            lazy=False,
            warn=False,
        )
        if self.flavor == 'mssql':
            pyodbc = attempt_import('pyodbc', debug=debug, lazy=False, warn=False)
            pyodbc.pooling = False
    if self.flavor in require_patching_flavors:
        from meerschaum.utils.packages import determine_version, _monkey_patch_get_distribution
        import pathlib
        for install_name, import_name in require_patching_flavors[self.flavor]:
            pkg = attempt_import(
                import_name,
                debug=debug,
                lazy=False,
                warn=False
            )
            _monkey_patch_get_distribution(
                install_name, determine_version(pathlib.Path(pkg.__file__), venv='mrsm')
            )

    ### supplement missing values with defaults (e.g. port number)
    for a, value in flavor_configs[self.flavor]['defaults'].items():
        if a not in self.__dict__:
            self.__dict__[a] = value

    ### Verify that everything is in order.
    if self.flavor not in flavor_configs:
        error(f"Cannot create a connector with the flavor '{self.flavor}'.")

    _engine = flavor_configs[self.flavor].get('engine', None)
    _username = self.__dict__.get('username', None)
    _password = self.__dict__.get('password', None)
    _host = self.__dict__.get('host', None)
    _port = self.__dict__.get('port', None)
    _database = self.__dict__.get('database', None)
    _options = self.__dict__.get('options', {})
    if isinstance(_options, str):
        _options = dict(urllib.parse.parse_qsl(_options))
    _uri = self.__dict__.get('uri', None)

    ### Handle registering specific dialects (due to installing in virtual environments).
    if self.flavor in flavor_dialects:
        sqlalchemy.dialects.registry.register(*flavor_dialects[self.flavor])

    ### self._sys_config was deepcopied and can be updated safely
    if self.flavor in ("sqlite", "duckdb"):
        engine_str = f"{_engine}:///{_database}" if not _uri else _uri
        if 'create_engine' not in self._sys_config:
            self._sys_config['create_engine'] = {}
        if 'connect_args' not in self._sys_config['create_engine']:
            self._sys_config['create_engine']['connect_args'] = {}
        self._sys_config['create_engine']['connect_args'].update({"check_same_thread": False})
    else:
        engine_str = (
            _engine + "://" + (_username if _username is not None else '') +
            ((":" + urllib.parse.quote_plus(_password)) if _password is not None else '') +
            "@" + _host + ((":" + str(_port)) if _port is not None else '') +
            (("/" + _database) if _database is not None else '')
            + (("?" + urllib.parse.urlencode(_options)) if _options else '')
        ) if not _uri else _uri

        ### Sometimes the timescaledb:// flavor can slip in.
        if _uri and self.flavor in ('timescaledb',) and self.flavor in _uri:
            engine_str = engine_str.replace(f'{self.flavor}', 'postgresql', 1)

    if debug:
        dprint(
            (
                (engine_str.replace(':' + _password, ':' + ('*' * len(_password))))
                    if _password is not None else engine_str
            ) + '\n' + f"{self._sys_config.get('create_engine', {}).get('connect_args', {})}"
        )

    _kw_copy = copy.deepcopy(kw)

    ### NOTE: Order of inheritance:
    ###       1. Defaults
    ###       2. System configuration
    ###       3. Connector configuration
    ###       4. Keyword arguments
    _create_engine_args = flavor_configs.get(self.flavor, {}).get('create_engine', {})
    def _apply_create_engine_args(update):
        if 'ALL' not in flavor_configs[self.flavor].get('omit_create_engine', {}):
            _create_engine_args.update(
                { k: v for k, v in update.items()
                    if 'omit_create_engine' not in flavor_configs[self.flavor]
                        or k not in flavor_configs[self.flavor].get('omit_create_engine')
                }
            )
    _apply_create_engine_args(self._sys_config.get('create_engine', {}))
    _apply_create_engine_args(self.__dict__.get('create_engine', {}))
    _apply_create_engine_args(_kw_copy)

    try:
        engine = sqlalchemy.create_engine(
            engine_str,
            ### I know this looks confusing, and maybe it's bad code,
            ### but it's simple. It dynamically parses the config string
            ### and splits it to separate the class name (QueuePool)
            ### from the module name (sqlalchemy.pool).
            poolclass    = getattr(
                attempt_import(
                    ".".join(self._sys_config['poolclass'].split('.')[:-1])
                ),
                self._sys_config['poolclass'].split('.')[-1]
            ),
            echo         = debug,
            **_create_engine_args
        )
    except Exception as e:
        warn(f"Failed to create connector '{self}':\n{traceback.format_exc()}", stack=False)
        engine = None

    if include_uri:
        return engine, engine_str
    return engine

Create a sqlalchemy engine by building the engine string.
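
A hedged usage sketch: with include_uri=True, the engine is returned together with the rendered engine string (this is how the engine property above consumes it):

    engine, engine_str = conn.create_engine(include_uri=True)
    print(engine_str)  ### e.g. 'postgresql+psycopg://user:...@localhost:5432/db'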

def read( self, query_or_table: 'Union[str, sqlalchemy.Query]', params: Union[Dict[str, Any], List[str], NoneType] = None, dtype: Optional[Dict[str, Any]] = None, coerce_float: bool = True, chunksize: Optional[int] = -1, workers: Optional[int] = None, chunk_hook: Optional[Callable[[pandas.core.frame.DataFrame], Any]] = None, as_hook_results: bool = False, chunks: Optional[int] = None, schema: Optional[str] = None, as_chunks: bool = False, as_iterator: bool = False, as_dask: bool = False, index_col: Optional[str] = None, silent: bool = False, debug: bool = False, **kw: Any) -> 'Union[pandas.DataFrame, dask.DataFrame, List[pandas.DataFrame], List[Any], None]':
def read(
    self,
    query_or_table: Union[str, sqlalchemy.Query],
    params: Union[Dict[str, Any], List[str], None] = None,
    dtype: Optional[Dict[str, Any]] = None,
    coerce_float: bool = True,
    chunksize: Optional[int] = -1,
    workers: Optional[int] = None,
    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None,
    as_hook_results: bool = False,
    chunks: Optional[int] = None,
    schema: Optional[str] = None,
    as_chunks: bool = False,
    as_iterator: bool = False,
    as_dask: bool = False,
    index_col: Optional[str] = None,
    silent: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[
    pandas.DataFrame,
    dask.DataFrame,
    List[pandas.DataFrame],
    List[Any],
    None,
]:
    """
    Read a SQL query or table into a pandas dataframe.

    Parameters
    ----------
    query_or_table: Union[str, sqlalchemy.Query]
        The SQL query (sqlalchemy Query or string) or name of the table from which to select.

    params: Optional[Dict[str, Any]], default None
        `List` or `Dict` of parameters to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html

    dtype: Optional[Dict[str, Any]], default None
        A dictionary of data types to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html

    chunksize: Optional[int], default -1
        How many chunks to read at a time. `None` will read everything in one large chunk.
        Defaults to system configuration.

        **NOTE:** DuckDB does not allow for chunking.

    workers: Optional[int], default None
        How many threads to use when consuming the generator.
        Only applies if `chunk_hook` is provided.

    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None
        Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
        See `--sync-chunks` for an example.
        **NOTE:** `as_iterator` MUST be False (default).

    as_hook_results: bool, default False
        If `True`, return a `List` of the outputs of the hook function.
        Only applicable if `chunk_hook` is not None.

        **NOTE:** `as_iterator` MUST be `False` (default).

    chunks: Optional[int], default None
        Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and
        return into a single dataframe.
        For example, to limit the returned dataframe to 100,000 rows,
        you could specify a `chunksize` of `1000` and `chunks` of `100`.

    schema: Optional[str], default None
        If just a table name is provided, optionally specify the table schema.
        Defaults to `SQLConnector.schema`.

    as_chunks: bool, default False
        If `True`, return a list of DataFrames.
        Otherwise return a single DataFrame.

    as_iterator: bool, default False
        If `True`, return the pandas DataFrame iterator.
        `chunksize` must not be `None` (falls back to 1000 if so),
        and hooks are not called in this case.

    index_col: Optional[str], default None
        If using Dask, use this column as the index column.
        If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.

    silent: bool, default False
        If `True`, don't raise warnings in case of errors.
        Defaults to `False`.

    Returns
    -------
    A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators,
    or `None` if something breaks.

    """
    if chunks is not None and chunks <= 0:
        return []
    from meerschaum.utils.sql import sql_item_name, truncate_item_name
    from meerschaum.utils.dtypes import are_dtypes_equal, coerce_timezone
    from meerschaum.utils.dtypes.sql import TIMEZONE_NAIVE_FLAVORS
    from meerschaum.utils.packages import attempt_import, import_pandas
    from meerschaum.utils.pool import get_pool
    from meerschaum.utils.dataframe import chunksize_to_npartitions, get_numeric_cols
    import warnings
    import traceback
    from decimal import Decimal
    pd = import_pandas()
    dd = None
    is_dask = 'dask' in pd.__name__
    pandas = attempt_import('pandas')
    is_dask = dd is not None
    npartitions = chunksize_to_npartitions(chunksize)
    if is_dask:
        chunksize = None
    schema = schema or self.schema
    utc_dt_cols = [
        col
        for col, typ in dtype.items()
        if are_dtypes_equal(typ, 'datetime') and 'utc' in typ.lower()
    ] if dtype else []

    if dtype and utc_dt_cols and self.flavor in TIMEZONE_NAIVE_FLAVORS:
        dtype = dtype.copy()
        for col in utc_dt_cols:
            dtype[col] = 'datetime64[ns]'

    pool = get_pool(workers=workers)
    sqlalchemy = attempt_import("sqlalchemy", lazy=False)
    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    if chunksize is None and as_iterator:
        if not silent and self.flavor not in _disallow_chunks_flavors:
            warn(
                "An iterator may only be generated if chunksize is not None.\n"
                + "Falling back to a chunksize of 1000.", stacklevel=3,
            )
        chunksize = 1000
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel=3,
                )
            chunksize = _max_chunks_flavors[self.flavor]

    if chunksize is not None and self.flavor in _disallow_chunks_flavors:
        chunksize = None

    if debug:
        import time
        start = time.perf_counter()
        dprint(f"[{self}]\n{query_or_table}")
        dprint(f"[{self}] Fetching with chunksize: {chunksize}")

    ### This might be sqlalchemy object or the string of a table name.
    ### We check for spaces and quotes to see if it might be a weird table.
    if (
        ' ' not in str(query_or_table)
        or (
            ' ' in str(query_or_table)
            and str(query_or_table).startswith('"')
            and str(query_or_table).endswith('"')
        )
    ):
        truncated_table_name = truncate_item_name(str(query_or_table), self.flavor)
        if truncated_table_name != str(query_or_table) and not silent:
            warn(
                f"Table '{query_or_table}' is too long for '{self.flavor}',"
                + f" will instead read the table '{truncated_table_name}'."
            )

        query_or_table = sql_item_name(str(query_or_table), self.flavor, schema)
        if debug:
            dprint(f"[{self}] Reading from table {query_or_table}")
        formatted_query = sqlalchemy.text("SELECT * FROM " + str(query_or_table))
        str_query = f"SELECT * FROM {query_or_table}"
    else:
        str_query = query_or_table

    formatted_query = (
        sqlalchemy.text(str_query)
        if not is_dask and isinstance(str_query, str)
        else format_sql_query_for_dask(str_query)
    )

    chunk_list = []
    chunk_hook_results = []
    def _process_chunk(_chunk, _retry_on_failure: bool = True):
        if self.flavor in TIMEZONE_NAIVE_FLAVORS:
            for col in utc_dt_cols:
                _chunk[col] = coerce_timezone(_chunk[col], strip_timezone=False)
        if not as_hook_results:
            chunk_list.append(_chunk)
        if chunk_hook is None:
            return None

        result = None
        try:
            result = chunk_hook(
                _chunk,
                workers=workers,
                chunksize=chunksize,
                debug=debug,
                **kw
            )
        except Exception:
            result = False, traceback.format_exc()
            from meerschaum.utils.formatting import get_console
            if not silent:
                get_console().print_exception()

        ### If the chunk fails to process, try it again one more time.
        if isinstance(result, tuple) and result[0] is False:
            if _retry_on_failure:
                return _process_chunk(_chunk, _retry_on_failure=False)

        return result

    try:
        stream_results = not as_iterator and chunk_hook is not None and chunksize is not None
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')

            read_sql_query_kwargs = {
                'params': params,
                'dtype': dtype,
                'coerce_float': coerce_float,
                'index_col': index_col,
            }
            if is_dask:
                if index_col is None:
                    dd = None
                    pd = attempt_import('pandas')
                    read_sql_query_kwargs.update({
                        'chunksize': chunksize,
                    })
            else:
                read_sql_query_kwargs.update({
                    'chunksize': chunksize,
                })

            if is_dask and dd is not None:
                ddf = dd.read_sql_query(
                    formatted_query,
                    self.URI,
                    **read_sql_query_kwargs
                )
            else:

                def get_chunk_generator(connectable):
                    chunk_generator = pd.read_sql_query(
                        formatted_query,
                        self.engine,
                        **read_sql_query_kwargs
                    )
                    to_return = (
                        chunk_generator
                        if as_iterator or chunksize is None
                        else (
                            list(pool.imap(_process_chunk, chunk_generator))
                            if as_hook_results
                            else None
                        )
                    )
                    return chunk_generator, to_return

                if self.flavor in SKIP_READ_TRANSACTION_FLAVORS:
                    chunk_generator, to_return = get_chunk_generator(self.engine)
                else:
                    with self.engine.begin() as transaction:
                        with transaction.execution_options(stream_results=stream_results) as connection:
                            chunk_generator, to_return = get_chunk_generator(connection)

                if to_return is not None:
                    return to_return

    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query_or_table}\n\n")
        if not silent:
            warn(str(e), stacklevel=3)
        from meerschaum.utils.formatting import get_console
        if not silent:
            get_console().print_exception()

        return None

    if is_dask and dd is not None:
        ddf = ddf.reset_index()
        return ddf

    chunk_list = []
    read_chunks = 0
    chunk_hook_results = []
    if chunksize is None:
        chunk_list.append(chunk_generator)
    elif as_iterator:
        return chunk_generator
    else:
        try:
            for chunk in chunk_generator:
                if chunk_hook is not None:
                    chunk_hook_results.append(
                        chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
                    )
                chunk_list.append(chunk)
                read_chunks += 1
                if chunks is not None and read_chunks >= chunks:
                    break
        except Exception as e:
            warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
            from meerschaum.utils.formatting import get_console
            if not silent:
                get_console().print_exception()

    read_chunks = 0
    try:
        for chunk in chunk_generator:
            if chunk_hook is not None:
                chunk_hook_results.append(
                    chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
                )
            chunk_list.append(chunk)
            read_chunks += 1
            if chunks is not None and read_chunks >= chunks:
                break
    except Exception as e:
        warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
        from meerschaum.utils.formatting import get_console
        if not silent:
            get_console().print_exception()

        return None

    ### If no chunks returned, read without chunks
    ### to get columns
    if len(chunk_list) == 0:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            _ = read_sql_query_kwargs.pop('chunksize', None)
            with self.engine.begin() as connection:
                chunk_list.append(
                    pd.read_sql_query(
                        formatted_query,
                        connection,
                        **read_sql_query_kwargs
                    )
                )

    ### call the hook on any missed chunks.
    if chunk_hook is not None and len(chunk_list) > len(chunk_hook_results):
        for c in chunk_list[len(chunk_hook_results):]:
            chunk_hook_results.append(
                chunk_hook(c, chunksize=chunksize, debug=debug, **kw)
            )

    ### chunksize is not None so must iterate
    if debug:
        end = time.perf_counter()
        dprint(f"Fetched {len(chunk_list)} chunks in {round(end - start, 2)} seconds.")

    if as_hook_results:
        return chunk_hook_results

    ### Skip `pd.concat()` if `as_chunks` is specified.
    if as_chunks:
        for c in chunk_list:
            c.reset_index(drop=True, inplace=True)
            for col in get_numeric_cols(c):
                c[col] = c[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)
        return chunk_list

    df = pd.concat(chunk_list).reset_index(drop=True)
    ### NOTE: The calls to `canonical()` are to drop leading and trailing zeroes.
    for col in get_numeric_cols(df):
        df[col] = df[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)

    return df

Read a SQL query or table into a pandas dataframe.

Parameters
  • query_or_table (Union[str, sqlalchemy.Query]): The SQL query (sqlalchemy Query or string) or name of the table from which to select.
  • params (Optional[Dict[str, Any]], default None): List or Dict of parameters to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
  • dtype (Optional[Dict[str, Any]], default None): A dictionary of data types to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
  • chunksize (Optional[int], default -1): How many chunks to read at a time. None will read everything in one large chunk. Defaults to system configuration.

    NOTE: DuckDB does not allow for chunking.

  • workers (Optional[int], default None): How many threads to use when consuming the generator. Only applies if chunk_hook is provided.
  • chunk_hook (Optional[Callable[[pandas.DataFrame], Any]], default None): Hook function to execute once per chunk, e.g. writing and reading chunks intermittently. See --sync-chunks for an example. NOTE: as_iterator MUST be False (default).
  • as_hook_results (bool, default False): If True, return a List of the outputs of the hook function. Only applicable if chunk_hook is not None.

    NOTE: as_iterator MUST be False (default).

  • chunks (Optional[int], default None): Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and return into a single dataframe. For example, to limit the returned dataframe to 100,000 rows, you could specify a chunksize of 1000 and chunks of 100.
  • schema (Optional[str], default None): If just a table name is provided, optionally specify the table schema. Defaults to SQLConnector.schema.
  • as_chunks (bool, default False): If True, return a list of DataFrames. Otherwise return a single DataFrame.
  • as_iterator (bool, default False): If True, return the pandas DataFrame iterator. chunksize must not be None (falls back to 1000 if so), and hooks are not called in this case.
  • index_col (Optional[str], default None): If using Dask, use this column as the index column. If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
  • silent (bool, default False): If True, don't raise warnings in case of errors. Defaults to False.
Returns
  • A pd.DataFrame (default case), an iterator, or a list of dataframes / iterators, or None if something breaks.
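
A few hedged usage sketches (the table name is hypothetical):

    ### Default: one DataFrame.
    df = conn.read('my_table')

    ### Iterate over chunks without loading everything at once.
    for chunk in conn.read('SELECT * FROM my_table', chunksize=1000, as_iterator=True):
        print(len(chunk))

    ### Run a hook per chunk and collect its outputs.
    results = conn.read(
        'SELECT * FROM my_table',
        chunk_hook=lambda chunk, **kw: len(chunk),
        as_hook_results=True,
    )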
def value(self, query: str, *args: Any, use_pandas: bool = False, **kw: Any) -> Any:
def value(
    self,
    query: str,
    *args: Any,
    use_pandas: bool = False,
    **kw: Any
) -> Any:
    """
    Execute the provided query and return the first value.

    Parameters
    ----------
    query: str
        The SQL query to execute.

    *args: Any
        The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec`
        if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.

    use_pandas: bool, default False
        If `True`, use `meerschaum.connectors.SQLConnector.read`, otherwise use
        `meerschaum.connectors.sql.SQLConnector.exec` (default).
        **NOTE:** This is always `True` for DuckDB.

    **kw: Any
        See `args`.

    Returns
    -------
    Any value returned from the query.

    """
    from meerschaum.utils.packages import attempt_import
    if self.flavor == 'duckdb':
        use_pandas = True
    if use_pandas:
        try:
            return self.read(query, *args, **kw).iloc[0, 0]
        except Exception:
            return None

    _close = kw.get('close', True)
    _commit = kw.get('commit', (self.flavor != 'mssql'))

    try:
        result, connection = self.exec(
            query,
            *args,
            with_connection=True,
            close=False,
            commit=_commit,
            **kw
        )
        first = result.first() if result is not None else None
        _val = first[0] if first is not None else None
    except Exception as e:
        warn(e, stacklevel=3)
        return None
    if _close:
        try:
            connection.close()
        except Exception as e:
            warn("Failed to close connection with exception:\n" + str(e))
    return _val

Execute the provided query and return the first value.

Parameters
  • query (str): The SQL query to execute.
  • *args (Any): The arguments passed to meerschaum.connectors.sql.SQLConnector.exec if use_pandas is False (default) or to meerschaum.connectors.sql.SQLConnector.read.
  • use_pandas (bool, default False): If True, use meerschaum.connectors.SQLConnector.read, otherwise use meerschaum.connectors.sql.SQLConnector.exec (default). NOTE: This is always True for DuckDB.
  • **kw (Any): See args.
Returns
  • Any value returned from the query.
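
A minimal hedged sketch (the table is hypothetical):

    row_count = conn.value('SELECT COUNT(*) FROM my_table')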
def exec( self, query: str, *args: Any, silent: bool = False, debug: bool = False, commit: Optional[bool] = None, close: Optional[bool] = None, with_connection: bool = False, _connection=None, _transaction=None, **kw: Any) -> 'Union[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.cursor.LegacyCursorResult, Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection], Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection], None]':
491def exec(
492    self,
493    query: str,
494    *args: Any,
495    silent: bool = False,
496    debug: bool = False,
497    commit: Optional[bool] = None,
498    close: Optional[bool] = None,
499    with_connection: bool = False,
500    _connection=None,
501    _transaction=None,
502    **kw: Any
503) -> Union[
504        sqlalchemy.engine.result.resultProxy,
505        sqlalchemy.engine.cursor.LegacyCursorResult,
506        Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection],
507        Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection],
508        None
509]:
510    """
511    Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.
512
513    If inserting data, please use bind variables to avoid SQL injection!
514
515    Parameters
516    ----------
517    query: Union[str, List[str], Tuple[str]]
518        The query to execute.
519        If `query` is a list or tuple, call `self.exec_queries()` instead.
520
521    args: Any
522        Arguments passed to `sqlalchemy.engine.execute`.
523
524    silent: bool, default False
525        If `True`, suppress warnings.
526
527    commit: Optional[bool], default None
528        If `True`, commit the changes after execution.
529        Causes issues with flavors like `'mssql'`.
530        This does not apply if `query` is a list of strings.
531
532    close: Optional[bool], default None
533        If `True`, close the connection after execution.
534        Causes issues with flavors like `'mssql'`.
535        This does not apply if `query` is a list of strings.
536
537    with_connection: bool, default False
538        If `True`, return a tuple including the connection object.
539        This does not apply if `query` is a list of strings.
540
541    Returns
542    -------
543    The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is `True`.
544
545    """
546    if isinstance(query, (list, tuple)):
547        return self.exec_queries(
548            list(query),
549            *args,
550            silent=silent,
551            debug=debug,
552            **kw
553        )
554
555    from meerschaum.utils.packages import attempt_import
556    sqlalchemy = attempt_import("sqlalchemy", lazy=False)
557    if debug:
558        dprint(f"[{self}] Executing query:\n{query}")
559
560    _close = close if close is not None else (self.flavor != 'mssql')
561    _commit = commit if commit is not None else (
562        (self.flavor != 'mssql' or 'select' not in str(query).lower())
563    )
564
565    ### Select and Insert objects need to be compiled (SQLAlchemy 2.0.0+).
566    if not hasattr(query, 'compile'):
567        query = sqlalchemy.text(query)
568
569    connection = _connection if _connection is not None else self.get_connection()
570
571    try:
572        transaction = (
573            _transaction
574            if _transaction is not None else (
575                connection.begin()
576                if _commit
577                else None
578            )
579        )
580    except sqlalchemy.exc.InvalidRequestError as e:
581        if _connection is not None or _transaction is not None:
582            raise e
583        connection = self.get_connection(rebuild=True)
584        transaction = connection.begin()
585
586    if transaction is not None and not transaction.is_active and _transaction is not None:
587        connection = self.get_connection(rebuild=True)
588        transaction = connection.begin() if _commit else None
589
590    result = None
591    try:
592        result = connection.execute(query, *args, **kw)
593        if _commit:
594            transaction.commit()
595    except Exception as e:
596        if debug:
597            dprint(f"[{self}] Failed to execute query:\n\n{query}\n\n{e}")
598        if not silent:
599            warn(str(e), stacklevel=3)
600        result = None
601        if _commit:
602            transaction.rollback()
603            connection = self.get_connection(rebuild=True)
604    finally:
605        if _close:
606            connection.close()
607
608    if with_connection:
609        return result, connection
610
611    return result

Execute SQL code and return the sqlalchemy result, e.g. when calling stored procedures.

If inserting data, please use bind variables to avoid SQL injection!

Parameters
  • query (Union[str, List[str], Tuple[str]]): The query to execute. If query is a list or tuple, call self.exec_queries() instead.
  • args (Any): Arguments passed to sqlalchemy.engine.execute.
  • silent (bool, default False): If True, suppress warnings.
  • commit (Optional[bool], default None): If True, commit the changes after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
  • close (Optional[bool], default None): If True, close the connection after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
  • with_connection (bool, default False): If True, return a tuple including the connection object. This does not apply if query is a list of strings.
Returns
  • The sqlalchemy result object, or a tuple with the connection if with_connection is True.
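For example, a sketch assuming a connector `conn` and a hypothetical `fruits` table:

### Use bind variables instead of string interpolation.
result = conn.exec(
    "INSERT INTO fruits (name, price) VALUES (:name, :price)",
    {'name': 'banana', 'price': 0.25},
)
if result is None:
    print("The query failed.")

### A list or tuple of queries is delegated to exec_queries().
results = conn.exec(["SELECT 1", "SELECT 2"])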
def execute( self, *args: Any, **kw: Any) -> 'Optional[sqlalchemy.engine.result.ResultProxy]':
480def execute(
481    self,
482    *args: Any,
483    **kw: Any
484) -> Optional[sqlalchemy.engine.result.ResultProxy]:
485    """
486    An alias for `meerschaum.connectors.sql.SQLConnector.exec`.
487    """
488    return self.exec(*args, **kw)
def to_sql( self, df: pandas.core.frame.DataFrame, name: str = None, index: bool = False, if_exists: str = 'replace', method: str = '', chunksize: Optional[int] = -1, schema: Optional[str] = None, safe_copy: bool = True, silent: bool = False, debug: bool = False, as_tuple: bool = False, as_dict: bool = False, _connection=None, _transaction=None, **kw) -> Union[bool, Tuple[bool, str]]:
 709def to_sql(
 710    self,
 711    df: pandas.DataFrame,
 712    name: str = None,
 713    index: bool = False,
 714    if_exists: str = 'replace',
 715    method: str = "",
 716    chunksize: Optional[int] = -1,
 717    schema: Optional[str] = None,
 718    safe_copy: bool = True,
 719    silent: bool = False,
 720    debug: bool = False,
 721    as_tuple: bool = False,
 722    as_dict: bool = False,
 723    _connection=None,
 724    _transaction=None,
 725    **kw
 726) -> Union[bool, SuccessTuple]:
 727    """
 728    Upload a DataFrame's contents to the SQL server.
 729
 730    Parameters
 731    ----------
 732    df: pd.DataFrame
 733        The DataFrame to be inserted.
 734
 735    name: str
 736        The name of the table to be created.
 737
 738    index: bool, default False
 739        If True, creates the DataFrame's indices as columns.
 740
 741    if_exists: str, default 'replace'
 742        Drop and create the table ('replace') or append if it exists
 743        ('append') or raise Exception ('fail').
 744        Options are ['replace', 'append', 'fail'].
 745
 746    method: str, default ''
 747        `None` or `'multi'`. See the `method` parameter of `pandas.DataFrame.to_sql`.
 748
 749    chunksize: Optional[int], default -1
 750        How many rows to insert at a time.
 751
 752    schema: Optional[str], default None
 753        Optionally override the schema for the table.
 754        Defaults to `SQLConnector.schema`.
 755
 756    safe_copy: bool, default True
 757        If `True`, copy the dataframe before making any changes.
 758
 759    as_tuple: bool, default False
 760        If `True`, return a (success_bool, message) tuple instead of a `bool`.
 761        Defaults to `False`.
 762
 763    as_dict: bool, default False
 764        If `True`, return a dictionary of transaction information.
 765        The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`,
 766        `method`, and `target`.
 767
 768    kw: Any
 769        Additional arguments will be passed to the DataFrame's `to_sql` function.
 770
 771    Returns
 772    -------
 773    Either a `bool` or a `SuccessTuple` (depends on `as_tuple`).
 774    """
 775    import time
 776    import json
 777    from datetime import timedelta
 778    from meerschaum.utils.warnings import error, warn
 779    import warnings
 780    import functools
 781
 782    if name is None:
 783        error(f"Name must not be `None` to insert data into {self}.")
 784
 785    ### We're requiring `name` to be positional, and sometimes it's passed in from background jobs.
 786    kw.pop('name', None)
 787
 788    schema = schema or self.schema
 789
 790    from meerschaum.utils.sql import (
 791        sql_item_name,
 792        table_exists,
 793        json_flavors,
 794        truncate_item_name,
 795        DROP_IF_EXISTS_FLAVORS,
 796    )
 797    from meerschaum.utils.dataframe import (
 798        get_json_cols,
 799        get_numeric_cols,
 800        get_uuid_cols,
 801        get_bytes_cols,
 802    )
 803    from meerschaum.utils.dtypes import (
 804        are_dtypes_equal,
 805        coerce_timezone,
 806        encode_bytes_for_bytea,
 807        serialize_bytes,
 808        serialize_decimal,
 809        json_serialize_value,
 810    )
 811    from meerschaum.utils.dtypes.sql import (
 812        PD_TO_SQLALCHEMY_DTYPES_FLAVORS,
 813        get_db_type_from_pd_type,
 814        get_pd_type_from_db_type,
 815        get_numeric_precision_scale,
 816    )
 817    from meerschaum.utils.misc import interval_str
 818    from meerschaum.connectors.sql._create_engine import flavor_configs
 819    from meerschaum.utils.packages import attempt_import, import_pandas
 820    sqlalchemy = attempt_import('sqlalchemy', debug=debug, lazy=False)
 821    pd = import_pandas()
 822    is_dask = 'dask' in df.__module__
 823
 824    bytes_cols = get_bytes_cols(df)
 825    numeric_cols = get_numeric_cols(df)
 826    ### NOTE: This excludes non-numeric serialized Decimals (e.g. SQLite).
 827    numeric_cols_dtypes = {
 828        col: typ
 829        for col, typ in kw.get('dtype', {}).items()
 830        if (
 831            col in df.columns
 832            and 'numeric' in str(typ).lower()
 833        )
 834        
 835    }
 836    numeric_cols.extend([col for col in numeric_cols_dtypes if col not in numeric_cols])
 837    numeric_cols_precisions_scales = {
 838        col: (
 839            (typ.precision, typ.scale)
 840            if hasattr(typ, 'precision')
 841            else get_numeric_precision_scale(self.flavor)
 842        )
 843        for col, typ in numeric_cols_dtypes.items()
 844    }
 845    cols_pd_types = {
 846        col: get_pd_type_from_db_type(str(typ))
 847        for col, typ in kw.get('dtype', {}).items()
 848    }
 849    cols_pd_types.update({
 850        col: f'numeric[{precision},{scale}]'
 851        for col, (precision, scale) in numeric_cols_precisions_scales.items()
 852        if precision and scale
 853    })
 854    cols_db_types = {
 855        col: get_db_type_from_pd_type(typ, flavor=self.flavor)
 856        for col, typ in cols_pd_types.items()
 857    }
 858
 859    enable_bulk_insert = mrsm.get_config(
 860        'system', 'connectors', 'sql', 'bulk_insert'
 861    ).get(self.flavor, False)
 862    stats = {'target': name}
 863    ### resort to defaults if None
 864    copied = False
 865    use_bulk_insert = False
 866    if method == "":
 867        if enable_bulk_insert:
 868            method = (
 869                functools.partial(mssql_insert_json, cols_types=cols_db_types, debug=debug)
 870                if self.flavor == 'mssql'
 871                else functools.partial(psql_insert_copy, debug=debug)
 872            )
 873            use_bulk_insert = True
 874        else:
 875            ### Should resolve to 'multi' or `None`.
 876            method = flavor_configs.get(self.flavor, {}).get('to_sql', {}).get('method', 'multi')
 877
 878    if bytes_cols and (use_bulk_insert or self.flavor == 'oracle'):
 879        if safe_copy and not copied:
 880            df = df.copy()
 881            copied = True
 882        bytes_serializer = (
 883            functools.partial(encode_bytes_for_bytea, with_prefix=(self.flavor != 'oracle'))
 884            if self.flavor != 'mssql'
 885            else serialize_bytes
 886        )
 887        for col in bytes_cols:
 888            df[col] = df[col].apply(bytes_serializer)
 889
 890    ### Check for numeric columns.
 891    for col in numeric_cols:
 892        precision, scale = numeric_cols_precisions_scales.get(
 893            col,
 894            get_numeric_precision_scale(self.flavor)
 895        )
 896        df[col] = df[col].apply(
 897            functools.partial(
 898                serialize_decimal,
 899                quantize=True,
 900                precision=precision,
 901                scale=scale,
 902            )
 903        )
 904
 905    stats['method'] = method.__name__ if hasattr(method, '__name__') else str(method)
 906
 907    default_chunksize = self._sys_config.get('chunksize', None)
 908    chunksize = chunksize if chunksize != -1 else default_chunksize
 909    if chunksize is not None and self.flavor in _max_chunks_flavors:
 910        if chunksize > _max_chunks_flavors[self.flavor]:
 911            if chunksize != default_chunksize:
 912                warn(
 913                    f"The specified chunksize of {chunksize} exceeds the maximum of "
 914                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
 915                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
 916                    stacklevel = 3,
 917                )
 918            chunksize = _max_chunks_flavors[self.flavor]
 919    stats['chunksize'] = chunksize
 920
 921    success, msg = False, "Default to_sql message"
 922    start = time.perf_counter()
 923    if debug:
 924        msg = f"[{self}] Inserting {len(df)} rows with chunksize: {chunksize}..."
 925        print(msg, end="", flush=True)
 926    stats['num_rows'] = len(df)
 927
 928    ### Check if the name is too long.
 929    truncated_name = truncate_item_name(name, self.flavor)
 930    if name != truncated_name:
 931        warn(
 932            f"Table '{name}' is too long for '{self.flavor}',"
 933            f" will instead create the table '{truncated_name}'."
 934        )
 935
 936    ### filter out non-pandas args
 937    import inspect
 938    to_sql_params = inspect.signature(df.to_sql).parameters
 939    to_sql_kw = {}
 940    for k, v in kw.items():
 941        if k in to_sql_params:
 942            to_sql_kw[k] = v
 943
 944    to_sql_kw.update({
 945        'name': truncated_name,
 946        'schema': schema,
 947        ('con' if not is_dask else 'uri'): (self.engine if not is_dask else self.URI),
 948        'index': index,
 949        'if_exists': if_exists,
 950        'method': method,
 951        'chunksize': chunksize,
 952    })
 953    if is_dask:
 954        to_sql_kw.update({
 955            'parallel': True,
 956        })
 957    elif _connection is not None:
 958        to_sql_kw['con'] = _connection
 959
 960    if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
 961    if self.flavor == 'oracle':
 962        ### For some reason 'replace' doesn't work properly in pandas,
 963        ### so try dropping first.
 964        if if_exists == 'replace' and table_exists(name, self, schema=schema, debug=debug):
 965            success = self.exec(
 966                f"DROP TABLE {if_exists_str}" + sql_item_name(name, 'oracle', schema)
 967            ) is not None
 968            if not success:
 969                warn(f"Unable to drop {name}")
 970
 971        ### Enforce NVARCHAR(2000) as text instead of CLOB.
 972        dtype = to_sql_kw.get('dtype', {})
 973        for col, typ in df.dtypes.items():
 974            if are_dtypes_equal(str(typ), 'object'):
 975                dtype[col] = sqlalchemy.types.NVARCHAR(2000)
 976            elif are_dtypes_equal(str(typ), 'int'):
 977                dtype[col] = sqlalchemy.types.INTEGER
 978        to_sql_kw['dtype'] = dtype
 979    elif self.flavor == 'duckdb':
 980        dtype = to_sql_kw.get('dtype', {})
 981        dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')]
 982        for col in dt_cols:
 983            df[col] = coerce_timezone(df[col], strip_utc=False)
 984    elif self.flavor == 'mssql':
 985        dtype = to_sql_kw.get('dtype', {})
 986        dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')]
 987        new_dtype = {}
 988        for col in dt_cols:
 989            if col in dtype:
 990                continue
 991            dt_typ = get_db_type_from_pd_type(str(df.dtypes[col]), self.flavor, as_sqlalchemy=True)
 992            if col not in dtype:
 993                new_dtype[col] = dt_typ
 994
 995        dtype.update(new_dtype)
 996        to_sql_kw['dtype'] = dtype
 997
 998    ### Check for JSON columns.
 999    if self.flavor not in json_flavors:
1000        json_cols = get_json_cols(df)
1001        for col in json_cols:
1002            df[col] = df[col].apply(
1003                (
1004                    lambda x: json.dumps(x, default=json_serialize_value, sort_keys=True)
1005                    if not isinstance(x, Hashable)
1006                    else x
1007                )
1008            )
1009
1010    if PD_TO_SQLALCHEMY_DTYPES_FLAVORS['uuid'].get(self.flavor, None) != 'Uuid':
1011        uuid_cols = get_uuid_cols(df)
1012        for col in uuid_cols:
1013            df[col] = df[col].astype(str)
1014
1015    try:
1016        with warnings.catch_warnings():
1017            warnings.filterwarnings('ignore')
1018            df.to_sql(**to_sql_kw)
1019        success = True
1020    except Exception as e:
1021        if not silent:
1022            warn(str(e))
1023        success, msg = False, str(e)
1024
1025    end = time.perf_counter()
1026    if success:
1027        num_rows = len(df)
1028        msg = (
1029            f"It took {interval_str(timedelta(seconds=(end - start)))} "
1030            + f"to sync {num_rows:,} row"
1031            + ('s' if num_rows != 1 else '')
1032            + f" to {name}."
1033        )
1034    stats['start'] = start
1035    stats['end'] = end
1036    stats['duration'] = end - start
1037
1038    if debug:
1039        print(" done.", flush=True)
1040        dprint(msg)
1041
1042    stats['success'] = success
1043    stats['msg'] = msg
1044    if as_tuple:
1045        return success, msg
1046    if as_dict:
1047        return stats
1048    return success

Upload a DataFrame's contents to the SQL server.

Parameters
  • df (pd.DataFrame): The DataFrame to be inserted.
  • name (str): The name of the table to be created.
  • index (bool, default False): If True, creates the DataFrame's indices as columns.
  • if_exists (str, default 'replace'): Drop and create the table ('replace') or append if it exists ('append') or raise Exception ('fail'). Options are ['replace', 'append', 'fail'].
  • method (str, default ''): None or 'multi'. See the method parameter of pandas.DataFrame.to_sql.
  • chunksize (Optional[int], default -1): How many rows to insert at a time.
  • schema (Optional[str], default None): Optionally override the schema for the table. Defaults to SQLConnector.schema.
  • safe_copy (bool, default True): If True, copy the dataframe before making any changes.
  • as_tuple (bool, default False): If True, return a (success_bool, message) tuple instead of a bool. Defaults to False.
  • as_dict (bool, default False): If True, return a dictionary of transaction information. The keys are success, msg, start, end, duration, num_rows, chunksize, method, and target.
  • kw (Any): Additional arguments will be passed to the DataFrame's to_sql function
Returns
  • Either a bool or a SuccessTuple (depends on as_tuple).
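A usage sketch (the `fruits` table name is hypothetical):

import pandas as pd
df = pd.DataFrame([{'name': 'apple', 'price': 0.50}])

### Append to the table, creating it if it does not yet exist.
success = conn.to_sql(df, name='fruits', if_exists='append')

### Request a (success, message) tuple for more context on failures.
success, msg = conn.to_sql(df, name='fruits', if_exists='append', as_tuple=True)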
def exec_queries( self, queries: "List[Union[str, Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]]]", break_on_error: bool = False, rollback: bool = True, silent: bool = False, debug: bool = False) -> 'List[Union[sqlalchemy.engine.cursor.CursorResult, None]]':
614def exec_queries(
615    self,
616    queries: List[
617        Union[
618            str,
619            Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]
620        ]
621    ],
622    break_on_error: bool = False,
623    rollback: bool = True,
624    silent: bool = False,
625    debug: bool = False,
626) -> List[Union[sqlalchemy.engine.cursor.CursorResult, None]]:
627    """
628    Execute a list of queries in a single transaction.
629
630    Parameters
631    ----------
632    queries: List[
633        Union[
634            str,
635            Tuple[str, Callable[[], List[str]]]
636        ]
637    ]
638        The queries in the transaction to be executed.
639        If a query is a tuple, the second item of the tuple
640        will be considered a callable hook that returns a list of queries to be executed
641        before the next item in the list.
642
643    break_on_error: bool, default False
644        If `True`, stop executing when a query fails.
645
646    rollback: bool, default True
647        If `break_on_error` is `True`, rollback the transaction if a query fails.
648
649    silent: bool, default False
650        If `True`, suppress warnings.
651
652    Returns
653    -------
654    A list of SQLAlchemy results.
655    """
656    from meerschaum.utils.warnings import warn
657    from meerschaum.utils.debug import dprint
658    from meerschaum.utils.packages import attempt_import
659    sqlalchemy, sqlalchemy_orm = attempt_import('sqlalchemy', 'sqlalchemy.orm', lazy=False)
660    session = sqlalchemy_orm.Session(self.engine)
661
662    result = None
663    results = []
664    with session.begin():
665        for query in queries:
666            hook = None
667            result = None
668
669            if isinstance(query, tuple):
670                query, hook = query
671            if isinstance(query, str):
672                query = sqlalchemy.text(query)
673
674            if debug:
675                dprint(f"[{self}]\n" + str(query))
676
677            try:
678                result = session.execute(query)
679                session.flush()
680            except Exception as e:
681                msg = (f"Encountered error while executing:\n{e}")
682                if not silent:
683                    warn(msg)
684                elif debug:
685                    dprint(f"[{self}]\n" + str(msg))
686                result = None
687            if result is None and break_on_error:
688                if rollback:
689                    session.rollback()
690                results.append(result)
691                break
692            elif result is not None and hook is not None:
693                hook_queries = hook(session)
694                if hook_queries:
695                    hook_results = self.exec_queries(
696                        hook_queries,
697                        break_on_error = break_on_error,
698                        rollback=rollback,
699                        silent=silent,
700                        debug=debug,
701                    )
702                    result = (result, hook_results)
703
704            results.append(result)
705
706    return results

Execute a list of queries in a single transaction.

Parameters
  • queries (List[Union[str, Tuple[str, Callable[[], List[str]]]]]): The queries in the transaction to be executed. If a query is a tuple, the second item of the tuple will be considered a callable hook that returns a list of queries to be executed before the next item in the list.
  • break_on_error (bool, default False): If True, stop executing when a query fails.
  • rollback (bool, default True): If break_on_error is True, rollback the transaction if a query fails.
  • silent (bool, default False): If True, suppress warnings.
Returns
  • A list of SQLAlchemy results.
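A sketch of the hook mechanism (the table and queries are hypothetical); each hook receives the active session and returns follow-up queries to run before the next item:

queries = [
    "CREATE TABLE tmp_fruits (name VARCHAR(100))",
    (
        "INSERT INTO tmp_fruits (name) VALUES ('apple')",
        lambda session: ["INSERT INTO tmp_fruits (name) VALUES ('banana')"],
    ),
]
results = conn.exec_queries(queries, break_on_error=True, debug=True)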
def get_connection(self, rebuild: bool = False) -> "'sqlalchemy.engine.base.Connection'":
1231def get_connection(self, rebuild: bool = False) -> 'sqlalchemy.engine.base.Connection':
1232    """
1233    Return the current alive connection.
1234
1235    Parameters
1236    ----------
1237    rebuild: bool, default False
1238        If `True`, close the previous connection and open a new one.
1239
1240    Returns
1241    -------
1242    A `sqlalchemy.engine.base.Connection` object.
1243    """
1244    import threading
1245    if '_thread_connections' not in self.__dict__:
1246        self.__dict__['_thread_connections'] = {}
1247
1248    self._cleanup_connections()
1249
1250    thread_id = threading.get_ident()
1251
1252    thread_connections = self.__dict__.get('_thread_connections', {})
1253    connection = thread_connections.get(thread_id, None)
1254
1255    if rebuild and connection is not None:
1256        try:
1257            connection.close()
1258        except Exception:
1259            pass
1260
1261        _ = thread_connections.pop(thread_id, None)
1262        connection = None
1263
1264    if connection is None or connection.closed:
1265        connection = self.engine.connect()
1266        thread_connections[thread_id] = connection
1267
1268    return connection

Return the current alive connection.

Parameters
  • rebuild (bool, default False): If True, close the previous connection and open a new one.
Returns
  • A sqlalchemy.engine.base.Connection object.
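Connections are cached per thread, so repeated calls from the same thread return the same connection (a sketch):

import sqlalchemy
connection = conn.get_connection()
result = connection.execute(sqlalchemy.text("SELECT 1"))

### Close the previous connection and open a fresh one.
connection = conn.get_connection(rebuild=True)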
def test_connection(self, **kw: Any) -> Optional[bool]:
707def test_connection(
708    self,
709    **kw: Any
710) -> Union[bool, None]:
711    """
712    Test if a successful connection to the database may be made.
713
714    Parameters
715    ----------
716    **kw:
717        The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.
718
719    Returns
720    -------
721    `True` if a connection is made, otherwise `False` or `None` in case of failure.
722
723    """
724    import warnings
725    from meerschaum.connectors.poll import retry_connect
726    _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self}
727    _default_kw.update(kw)
728    with warnings.catch_warnings():
729        warnings.filterwarnings('ignore', 'Could not')
730        try:
731            return retry_connect(**_default_kw)
732        except Exception:
733            return False

Test if a successful connection to the database may be made.

Parameters
  • **kw: The keyword arguments are passed to meerschaum.connectors.poll.retry_connect.
Returns
  • True if a connection is made, otherwise False or None in case of failure.
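The keyword arguments are forwarded to `retry_connect`, e.g. (a sketch):

if not conn.test_connection(max_retries=3, retry_wait=1):
    print(f"Unable to connect to {conn}.")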
def fetch( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, str, NoneType] = None, check_existing: bool = True, chunksize: Optional[int] = -1, workers: Optional[int] = None, debug: bool = False, **kw: Any) -> "Union['pd.DataFrame', List[Any], None]":
18def fetch(
19    self,
20    pipe: mrsm.Pipe,
21    begin: Union[datetime, int, str, None] = '',
22    end: Union[datetime, int, str, None] = None,
23    check_existing: bool = True,
24    chunksize: Optional[int] = -1,
25    workers: Optional[int] = None,
26    debug: bool = False,
27    **kw: Any
28) -> Union['pd.DataFrame', List[Any], None]:
29    """Execute the SQL definition and return a Pandas DataFrame.
30
31    Parameters
32    ----------
33    pipe: mrsm.Pipe
34        The pipe object which contains the `fetch` metadata.
35
36        - pipe.columns['datetime']: str
37            - Name of the datetime column for the remote table.
38        - pipe.parameters['fetch']: Dict[str, Any]
39            - Parameters necessary to execute a query.
40        - pipe.parameters['fetch']['definition']: str
41            - Raw SQL query to execute to generate the pandas DataFrame.
42        - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
43            - How many minutes before `begin` to search for data (*optional*).
44
45    begin: Union[datetime, int, str, None], default ''
46        Most recent datetime to search for data.
47        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
48
49    end: Union[datetime, int, str, None], default None
50        The latest datetime to search for data.
51        If `end` is `None`, do not bound the search.
52
53    check_existing: bool, default True
54        If `False`, use a backtrack interval of 0 minutes.
55
56    chunksize: Optional[int], default -1
57        How many rows to load into memory at once.
58        Otherwise the entire result set is loaded into memory.
59
60    workers: Optional[int], default None
61        How many threads to use when consuming the generator.
62        Defaults to the number of cores.
63
64    debug: bool, default False
65        Verbosity toggle.
66
67    Returns
68    -------
69    A pandas DataFrame generator.
70    """
71    meta_def = self.get_pipe_metadef(
72        pipe,
73        begin=begin,
74        end=end,
75        check_existing=check_existing,
76        debug=debug,
77        **kw
78    )
79    chunks = self.read(
80        meta_def,
81        chunksize=chunksize,
82        workers=workers,
83        as_iterator=True,
84        debug=debug,
85    )
86    return chunks

Execute the SQL definition and return a Pandas DataFrame.

Parameters
  • pipe (mrsm.Pipe): The pipe object which contains the fetch metadata.

    • pipe.columns['datetime']: str
      • Name of the datetime column for the remote table.
    • pipe.parameters['fetch']: Dict[str, Any]
      • Parameters necessary to execute a query.
    • pipe.parameters['fetch']['definition']: str
      • Raw SQL query to execute to generate the pandas DataFrame.
    • pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
      • How many minutes before begin to search for data (optional).
  • begin (Union[datetime, int, str, None], default ''): Most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
  • end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If end is None, do not bound the search.
  • check_existing (bool, default True): If False, use a backtrack interval of 0 minutes.
  • chunksize (Optional[int], default -1): How many rows to load into memory at once. Otherwise the entire result set is loaded into memory.
  • workers (Optional[int], default None): How many threads to use when consuming the generator. Defaults to the number of cores.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pandas DataFrame generator.
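A sketch of fetching through a pipe's definition (the pipe keys, table, and column names are hypothetical, and the `mrsm.Pipe` keyword arguments are assumptions):

import meerschaum as mrsm
pipe = mrsm.Pipe(
    'sql:main', 'fruits',
    columns={'datetime': 'picked_at'},
    parameters={'fetch': {'definition': 'SELECT * FROM fruits'}},
)
chunks = conn.fetch(pipe, begin='2024-01-01', chunksize=10_000)
for chunk in chunks:
    print(len(chunk))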
def get_pipe_metadef( self, pipe: meerschaum.Pipe, params: Optional[Dict[str, Any]] = None, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, str, NoneType] = None, check_existing: bool = True, debug: bool = False, **kw: Any) -> Optional[str]:
 89def get_pipe_metadef(
 90    self,
 91    pipe: mrsm.Pipe,
 92    params: Optional[Dict[str, Any]] = None,
 93    begin: Union[datetime, int, str, None] = '',
 94    end: Union[datetime, int, str, None] = None,
 95    check_existing: bool = True,
 96    debug: bool = False,
 97    **kw: Any
 98) -> Union[str, None]:
 99    """
100    Return a pipe's meta definition fetch query.
101
102    params: Optional[Dict[str, Any]], default None
103        Optional params dictionary to build the `WHERE` clause.
104        See `meerschaum.utils.sql.build_where`.
105
106    begin: Union[datetime, int, str, None], default ''
107        Most recent datetime to search for data.
108        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
109
110    end: Union[datetime, int, str, None], default None
111        The latest datetime to search for data.
112        If `end` is `None`, do not bound the search.
113
114    check_existing: bool, default True
115        If `True`, apply the backtrack interval.
116
117    debug: bool, default False
118        Verbosity toggle.
119
120    Returns
121    -------
122    A pipe's meta definition fetch query string.
123    """
124    from meerschaum.utils.warnings import warn
125    from meerschaum.utils.sql import sql_item_name, dateadd_str, build_where
126    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
127    from meerschaum.config import get_config
128
129    dt_col = pipe.columns.get('datetime', None)
130    if not dt_col:
131        dt_col = pipe.guess_datetime()
132        dt_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None
133        is_guess = True
134    else:
135        dt_name = sql_item_name(dt_col, self.flavor, None)
136        is_guess = False
137    dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None
138    db_dt_typ = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None
139
140    if begin not in (None, '') or end is not None:
141        if is_guess:
142            if dt_col is None:
143                warn(
144                    f"Unable to determine a datetime column for {pipe}."
145                    + "\n    Ignoring begin and end...",
146                    stack=False,
147                )
148                begin, end = '', None
149            else:
150                warn(
151                    f"A datetime wasn't specified for {pipe}.\n"
152                    + f"    Using column \"{dt_col}\" for datetime bounds...",
153                    stack=False
154                )
155
156    apply_backtrack = begin == '' and check_existing
157    backtrack_interval = pipe.get_backtrack_interval(check_existing=check_existing, debug=debug)
158    btm = (
159        int(backtrack_interval.total_seconds() / 60)
160        if isinstance(backtrack_interval, timedelta)
161        else backtrack_interval
162    )
163    begin = (
164        pipe.get_sync_time(debug=debug)
165        if begin == ''
166        else begin
167    )
168
169    if begin not in (None, '') and end is not None and begin >= end:
170        begin = None
171
172    if dt_name:
173        begin_da = (
174            dateadd_str(
175                flavor=self.flavor,
176                datepart='minute',
177                number=((-1 * btm) if apply_backtrack else 0),
178                begin=begin,
179                db_type=db_dt_typ,
180            )
181            if begin not in ('', None)
182            else None
183        )
184        end_da = (
185            dateadd_str(
186                flavor=self.flavor,
187                datepart='minute',
188                number=0,
189                begin=end,
190                db_type=db_dt_typ,
191            )
192            if end is not None
193            else None
194        )
195
196    definition_name = sql_item_name('definition', self.flavor, None)
197    meta_def = (
198        _simple_fetch_query(pipe, self.flavor) if (
199            (not (pipe.columns or {}).get('id', None))
200            or (not get_config('system', 'experimental', 'join_fetch'))
201        ) else _join_fetch_query(pipe, self.flavor, debug=debug, **kw)
202    )
203
204    has_where = 'where' in meta_def.lower()[meta_def.lower().rfind('definition'):]
205    if dt_name and (begin_da or end_da):
206        definition_dt_name = f"{definition_name}.{dt_name}"
207        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
208        has_where = True
209        if begin_da:
210            meta_def += f"\n    {definition_dt_name}\n    >=\n    {begin_da}\n"
211        if begin_da and end_da:
212            meta_def += "    AND"
213        if end_da:
214            meta_def += f"\n    {definition_dt_name}\n    <\n    {end_da}\n"
215
216    if params is not None:
217        params_where = build_where(params, self, with_where=False)
218        meta_def += "\n    " + ("AND" if has_where else "WHERE") + "    "
219        has_where = True
220        meta_def += params_where
221
222    return meta_def.rstrip()

Return a pipe's meta definition fetch query.

Parameters
  • params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause. See meerschaum.utils.sql.build_where.
  • begin (Union[datetime, int, str, None], default ''): Most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
  • end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If end is None, do not bound the search.
  • check_existing (bool, default True): If True, apply the backtrack interval.
  • debug (bool, default False): Verbosity toggle.

Returns
  • A pipe's meta definition fetch query string.
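The generated query may be inspected or passed to `read()` directly (a sketch reusing the hypothetical pipe above):

meta_def = conn.get_pipe_metadef(
    pipe,
    begin='2024-01-01',
    params={'color': 'red'},
)
print(meta_def)
df = conn.read(meta_def)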
def cli(self, debug: bool = False) -> Tuple[bool, str]:
36def cli(
37    self,
38    debug: bool = False,
39) -> SuccessTuple:
40    """
41    Launch a subprocess for an interactive CLI.
42    """
43    from meerschaum.utils.debug import dprint
44    from meerschaum.utils.venv import venv_exec
45    env = copy.deepcopy(dict(os.environ))
46    env_key = f"MRSM_SQL_{self.label.upper()}"
47    env_val = json.dumps(self.meta)
48    env[env_key] = env_val
49    cli_code = (
50        "import sys\n"
51        "import meerschaum as mrsm\n"
52        "import os\n"
53        f"conn = mrsm.get_connector('sql:{self.label}')\n"
54        "success, msg = conn._cli_exit()\n"
55        "mrsm.pprint((success, msg))\n"
56        "if not success:\n"
57        "    raise Exception(msg)"
58    )
59    if debug:
60        dprint(cli_code)
61    try:
62        _ = venv_exec(cli_code, venv=None, env=env, debug=debug, capture_output=False)
63    except Exception as e:
64        return False, f"[{self}] Failed to start CLI:\n{e}"
65    return True, "Success"

Launch a subprocess for an interactive CLI.

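For example, to drop into the flavor's interactive CLI from Python (a sketch):

conn = mrsm.get_connector('sql:main')
success, msg = conn.cli(debug=True)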
def fetch_pipes_keys( self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> Optional[List[Tuple[str, str, Optional[str]]]]:
144def fetch_pipes_keys(
145    self,
146    connector_keys: Optional[List[str]] = None,
147    metric_keys: Optional[List[str]] = None,
148    location_keys: Optional[List[str]] = None,
149    tags: Optional[List[str]] = None,
150    params: Optional[Dict[str, Any]] = None,
151    debug: bool = False
152) -> Optional[List[Tuple[str, str, Optional[str]]]]:
153    """
154    Return a list of tuples corresponding to the parameters provided.
155
156    Parameters
157    ----------
158    connector_keys: Optional[List[str]], default None
159        List of connector_keys to search by.
160
161    metric_keys: Optional[List[str]], default None
162        List of metric_keys to search by.
163
164    location_keys: Optional[List[str]], default None
165        List of location_keys to search by.
166
167    params: Optional[Dict[str, Any]], default None
168        Dictionary of additional parameters to search by.
169        E.g. `--params pipe_id:1`
170
171    debug: bool, default False
172        Verbosity toggle.
173    """
174    from meerschaum.utils.debug import dprint
175    from meerschaum.utils.packages import attempt_import
176    from meerschaum.utils.misc import separate_negation_values
177    from meerschaum.utils.sql import OMIT_NULLSFIRST_FLAVORS, table_exists
178    from meerschaum.config.static import STATIC_CONFIG
179    import json
180    from copy import deepcopy
181    sqlalchemy, sqlalchemy_sql_functions = attempt_import(
182        'sqlalchemy',
183        'sqlalchemy.sql.functions', lazy=False,
184    )
185    coalesce = sqlalchemy_sql_functions.coalesce
186
187    if connector_keys is None:
188        connector_keys = []
189    if metric_keys is None:
190        metric_keys = []
191    if location_keys is None:
192        location_keys = []
193    else:
194        location_keys = [
195            (
196                lk
197                if lk not in ('[None]', 'None', 'null')
198                else 'None'
199            )
200            for lk in location_keys
201        ]
202    if tags is None:
203        tags = []
204
205    if params is None:
206        params = {}
207
208    ### Add three primary keys to params dictionary
209    ###   (separated for convenience of arguments).
210    cols = {
211        'connector_keys': [str(ck) for ck in connector_keys],
212        'metric_key': [str(mk) for mk in metric_keys],
213        'location_key': [str(lk) for lk in location_keys],
214    }
215
216    ### Make deep copy so we don't mutate this somewhere else.
217    parameters = deepcopy(params)
218    for col, vals in cols.items():
219        if vals not in [[], ['*']]:
220            parameters[col] = vals
221
222    if not table_exists('mrsm_pipes', self, schema=self.instance_schema, debug=debug):
223        return []
224
225    from meerschaum.connectors.sql.tables import get_tables
226    pipes_tbl = get_tables(mrsm_instance=self, create=False, debug=debug)['pipes']
227
228    _params = {}
229    for k, v in parameters.items():
230        _v = json.dumps(v) if isinstance(v, dict) else v
231        _params[k] = _v
232
233    negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
234    ### Parse regular params.
235    ### If a param begins with '_', negate it instead.
236    _where = [
237        (
238            (coalesce(pipes_tbl.c[key], 'None') == val)
239            if not str(val).startswith(negation_prefix)
240            else (coalesce(pipes_tbl.c[key], 'None') != str(val)[len(negation_prefix):])
241        ) for key, val in _params.items()
242        if not isinstance(val, (list, tuple)) and key in pipes_tbl.c
243    ]
244    select_cols = (
245        [
246            pipes_tbl.c.connector_keys,
247            pipes_tbl.c.metric_key,
248            pipes_tbl.c.location_key,
249        ]
250    )
251
252    q = sqlalchemy.select(*select_cols).where(sqlalchemy.and_(True, *_where))
253    for c, vals in cols.items():
254        if not isinstance(vals, (list, tuple)) or not vals or c not in pipes_tbl.c:
255            continue
256        _in_vals, _ex_vals = separate_negation_values(vals)
257        q = q.where(coalesce(pipes_tbl.c[c], 'None').in_(_in_vals)) if _in_vals else q
258        q = q.where(coalesce(pipes_tbl.c[c], 'None').not_in(_ex_vals)) if _ex_vals else q
259
260    ### Finally, parse tags.
261    tag_groups = [tag.split(',') for tag in tags]
262    in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups]
263
264    ors, nands = [], []
265    for _in_tags, _ex_tags in in_ex_tag_groups:
266        sub_ands = []
267        for nt in _in_tags:
268            sub_ands.append(
269                sqlalchemy.cast(
270                    pipes_tbl.c['parameters'],
271                    sqlalchemy.String,
272                ).like(f'%"tags":%"{nt}"%')
273            )
274        if sub_ands:
275            ors.append(sqlalchemy.and_(*sub_ands))
276
277        for xt in _ex_tags:
278            nands.append(
279                sqlalchemy.cast(
280                    pipes_tbl.c['parameters'],
281                    sqlalchemy.String,
282                ).not_like(f'%"tags":%"{xt}"%')
283            )
284
285    q = q.where(sqlalchemy.and_(*nands)) if nands else q
286    q = q.where(sqlalchemy.or_(*ors)) if ors else q
287    loc_asc = sqlalchemy.asc(pipes_tbl.c['location_key'])
288    if self.flavor not in OMIT_NULLSFIRST_FLAVORS:
289        loc_asc = sqlalchemy.nullsfirst(loc_asc)
290    q = q.order_by(
291        sqlalchemy.asc(pipes_tbl.c['connector_keys']),
292        sqlalchemy.asc(pipes_tbl.c['metric_key']),
293        loc_asc,
294    )
295
296    ### execute the query and return a list of tuples
297    if debug:
298        dprint(q.compile(compile_kwargs={'literal_binds': True}))
299    try:
300        rows = (
301            self.execute(q).fetchall()
302            if self.flavor != 'duckdb'
303            else [
304                (row['connector_keys'], row['metric_key'], row['location_key'])
305                for row in self.read(q).to_dict(orient='records')
306            ]
307        )
308    except Exception as e:
309        error(str(e))
310
311    return [(row[0], row[1], row[2]) for row in rows]

Return a list of tuples corresponding to the parameters provided.

Parameters
  • connector_keys (Optional[List[str]], default None): List of connector_keys to search by.
  • metric_keys (Optional[List[str]], default None): List of metric_keys to search by.
  • location_keys (Optional[List[str]], default None): List of location_keys to search by.
  • tags (Optional[List[str]], default None): List of tags to search by.
  • params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. E.g. --params pipe_id:1
  • debug (bool, default False): Verbosity toggle.
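A sketch (the connector keys and tag are hypothetical):

keys = conn.fetch_pipes_keys(
    connector_keys=['sql:main'],
    tags=['production'],
)
for connector_keys, metric_key, location_key in keys:
    print(connector_keys, metric_key, location_key)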
def create_indices( self, pipe: meerschaum.Pipe, columns: Optional[List[str]] = None, indices: Optional[List[str]] = None, debug: bool = False) -> bool:
332def create_indices(
333    self,
334    pipe: mrsm.Pipe,
335    columns: Optional[List[str]] = None,
336    indices: Optional[List[str]] = None,
337    debug: bool = False
338) -> bool:
339    """
340    Create a pipe's indices.
341    """
342    from meerschaum.utils.debug import dprint
343    if debug:
344        dprint(f"Creating indices for {pipe}...")
345
346    if not pipe.indices:
347        warn(f"{pipe} has no index columns; skipping index creation.", stack=False)
348        return True
349
350    cols_to_include = set((columns or []) + (indices or [])) or None
351
352    _ = pipe.__dict__.pop('_columns_indices', None)
353    ix_queries = {
354        col: queries
355        for col, queries in self.get_create_index_queries(pipe, debug=debug).items()
356        if cols_to_include is None or col in cols_to_include
357    }
358    success = True
359    for col, queries in ix_queries.items():
360        ix_success = all(self.exec_queries(queries, debug=debug, silent=False))
361        success = success and ix_success
362        if not ix_success:
363            warn(f"Failed to create index on column: {col}")
364
365    return success

Create a pipe's indices.

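A sketch ('picked_at' is a hypothetical datetime column):

### Create only the index covering the datetime column.
success = conn.create_indices(pipe, columns=['picked_at'], debug=True)

### Or create all of the pipe's indices.
success = conn.create_indices(pipe)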
def drop_indices( self, pipe: meerschaum.Pipe, columns: Optional[List[str]] = None, indices: Optional[List[str]] = None, debug: bool = False) -> bool:
386def drop_indices(
387    self,
388    pipe: mrsm.Pipe,
389    columns: Optional[List[str]] = None,
390    indices: Optional[List[str]] = None,
391    debug: bool = False
392) -> bool:
393    """
394    Drop a pipe's indices.
395    """
396    from meerschaum.utils.debug import dprint
397    if debug:
398        dprint(f"Dropping indices for {pipe}...")
399
400    if not pipe.indices:
401        warn(f"No indices to drop for {pipe}.", stack=False)
402        return False
403
404    cols_to_include = set((columns or []) + (indices or [])) or None
405
406    ix_queries = {
407        col: queries
408        for col, queries in self.get_drop_index_queries(pipe, debug=debug).items()
409        if cols_to_include is None or col in cols_to_include
410    }
411    success = True
412    for col, queries in ix_queries.items():
413        ix_success = all(self.exec_queries(queries, debug=debug, silent=(not debug)))
414        if not ix_success:
415            success = False
416            if debug:
417                dprint(f"Failed to drop index on column: {col}")
418    return success

Drop a pipe's indices.

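Dropping and recreating indices may be combined, e.g. around a large backfill (a sketch):

if conn.drop_indices(pipe, debug=True):
    ### ... perform a large sync here ...
    conn.create_indices(pipe, debug=True)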
def get_create_index_queries( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, List[str]]:
474def get_create_index_queries(
475    self,
476    pipe: mrsm.Pipe,
477    debug: bool = False,
478) -> Dict[str, List[str]]:
479    """
480    Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query.
481
482    Parameters
483    ----------
484    pipe: mrsm.Pipe
485        The pipe to which the queries will correspond.
486
487    Returns
488    -------
489    A dictionary of index names mapping to lists of queries.
490    """
491    ### NOTE: Due to recent breaking changes in DuckDB, indices don't behave properly.
492    if self.flavor == 'duckdb':
493        return {}
494    from meerschaum.utils.sql import (
495        sql_item_name,
496        get_distinct_col_count,
497        UPDATE_QUERIES,
498        get_null_replacement,
499        get_create_table_queries,
500        get_rename_table_queries,
501        COALESCE_UNIQUE_INDEX_FLAVORS,
502    )
503    from meerschaum.utils.dtypes.sql import (
504        get_db_type_from_pd_type,
505        get_pd_type_from_db_type,
506        AUTO_INCREMENT_COLUMN_FLAVORS,
507    )
508    from meerschaum.config import get_config
509    index_queries = {}
510
511    upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in UPDATE_QUERIES
512    static = pipe.parameters.get('static', False)
513    null_indices = pipe.parameters.get('null_indices', True)
514    index_names = pipe.get_indices()
515    unique_index_name_unquoted = index_names.get('unique', None) or f'IX_{pipe.target}_unique'
516    if upsert:
517        _ = index_names.pop('unique', None)
518    indices = pipe.indices
519    existing_cols_types = pipe.get_columns_types(debug=debug)
520    existing_cols_pd_types = {
521        col: get_pd_type_from_db_type(typ)
522        for col, typ in existing_cols_types.items()
523    }
524    existing_cols_indices = self.get_pipe_columns_indices(pipe, debug=debug)
525    existing_ix_names = set()
526    existing_primary_keys = []
527    existing_clustered_primary_keys = []
528    for col, col_indices in existing_cols_indices.items():
529        for col_ix_doc in col_indices:
530            existing_ix_names.add(col_ix_doc.get('name', '').lower())
531            if col_ix_doc.get('type', None) == 'PRIMARY KEY':
532                existing_primary_keys.append(col.lower())
533                if col_ix_doc.get('clustered', True):
534                    existing_clustered_primary_keys.append(col.lower())
535
536    _datetime = pipe.get_columns('datetime', error=False)
537    _datetime_name = (
538        sql_item_name(_datetime, self.flavor, None)
539        if _datetime is not None else None
540    )
541    _datetime_index_name = (
542        sql_item_name(index_names['datetime'], flavor=self.flavor, schema=None)
543        if index_names.get('datetime', None)
544        else None
545    )
546    _id = pipe.get_columns('id', error=False)
547    _id_name = (
548        sql_item_name(_id, self.flavor, None)
549        if _id is not None
550        else None
551    )
552    primary_key = pipe.columns.get('primary', None)
553    primary_key_name = (
554        sql_item_name(primary_key, flavor=self.flavor, schema=None)
555        if primary_key
556        else None
557    )
558    autoincrement = (
559        pipe.parameters.get('autoincrement', False)
560        or (
561            primary_key is not None
562            and primary_key not in existing_cols_pd_types
563        )
564    )
565    primary_key_db_type = (
566        get_db_type_from_pd_type(pipe.dtypes.get(primary_key, 'int') or 'int', self.flavor)
567        if primary_key
568        else None
569    )
570    primary_key_constraint_name = (
571        sql_item_name(f'PK_{pipe.target}', self.flavor, None)
572        if primary_key is not None
573        else None
574    )
575    primary_key_clustered = "CLUSTERED" if _datetime is None else "NONCLUSTERED"
576    datetime_clustered = (
577        "CLUSTERED"
578        if not existing_clustered_primary_keys and _datetime is not None
579        else "NONCLUSTERED"
580    )
581    include_columns_str = "\n    ,".join(
582        [
583            sql_item_name(col, flavor=self.flavor) for col in existing_cols_types
584            if col != _datetime
585        ]
586    ).rstrip(',')
587    include_clause = (
588        (
589            f"\nINCLUDE (\n    {include_columns_str}\n)"
590        )
591        if datetime_clustered == 'NONCLUSTERED'
592        else ''
593    )
594
595    _id_index_name = (
596        sql_item_name(index_names['id'], self.flavor, None)
597        if index_names.get('id', None)
598        else None
599    )
600    _pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
601    _create_space_partition = get_config('system', 'experimental', 'space')
602
603    ### create datetime index
604    dt_query = None
605    if _datetime is not None:
606        if self.flavor == 'timescaledb' and pipe.parameters.get('hypertable', True):
607            _id_count = (
608                get_distinct_col_count(_id, f"SELECT {_id_name} FROM {_pipe_name}", self)
609                if (_id is not None and _create_space_partition) else None
610            )
611
612            chunk_interval = pipe.get_chunk_interval(debug=debug)
613            chunk_interval_minutes = (
614                chunk_interval
615                if isinstance(chunk_interval, int)
616                else int(chunk_interval.total_seconds() / 60)
617            )
618            chunk_time_interval = (
619                f"INTERVAL '{chunk_interval_minutes} MINUTES'"
620                if isinstance(chunk_interval, timedelta)
621                else f'{chunk_interval_minutes}'
622            )
623
624            dt_query = (
625                f"SELECT public.create_hypertable('{_pipe_name}', " +
626                f"'{_datetime}', "
627                + (
628                    f"'{_id}', {_id_count}, " if (_id is not None and _create_space_partition)
629                    else ''
630                )
631                + f'chunk_time_interval => {chunk_time_interval}, '
632                + 'if_not_exists => true, '
633                + "migrate_data => true);"
634            )
635        elif _datetime_index_name and _datetime != primary_key:
636            if self.flavor == 'mssql':
637                dt_query = (
638                    f"CREATE {datetime_clustered} INDEX {_datetime_index_name} "
639                    f"\nON {_pipe_name} ({_datetime_name}){include_clause}"
640                )
641            else:
642                dt_query = (
643                    f"CREATE INDEX {_datetime_index_name} "
644                    + f"ON {_pipe_name} ({_datetime_name})"
645                )
646
647    if dt_query:
648        index_queries[_datetime] = [dt_query]
649
650    primary_queries = []
651    if (
652        primary_key is not None
653        and primary_key.lower() not in existing_primary_keys
654        and not static
655    ):
656        if autoincrement and primary_key not in existing_cols_pd_types:
657            autoincrement_str = AUTO_INCREMENT_COLUMN_FLAVORS.get(
658                self.flavor,
659                AUTO_INCREMENT_COLUMN_FLAVORS['default']
660            )
661            primary_queries.extend([
662                (
663                    f"ALTER TABLE {_pipe_name}\n"
664                    f"ADD {primary_key_name} {primary_key_db_type} {autoincrement_str}"
665                ),
666            ])
667        elif not autoincrement and primary_key in existing_cols_pd_types:
668            if self.flavor == 'sqlite':
669                new_table_name = sql_item_name(
670                    f'_new_{pipe.target}',
671                    self.flavor,
672                    self.get_pipe_schema(pipe)
673                )
674                select_cols_str = ', '.join(
675                    [
676                        sql_item_name(col, self.flavor, None)
677                        for col in existing_cols_types
678                    ]
679                )
680                primary_queries.extend(
681                    get_create_table_queries(
682                        existing_cols_pd_types,
683                        f'_new_{pipe.target}',
684                        self.flavor,
685                        schema=self.get_pipe_schema(pipe),
686                        primary_key=primary_key,
687                    ) + [
688                        (
689                            f"INSERT INTO {new_table_name} ({select_cols_str})\n"
690                            f"SELECT {select_cols_str}\nFROM {_pipe_name}"
691                        ),
692                        f"DROP TABLE {_pipe_name}",
693                    ] + get_rename_table_queries(
694                        f'_new_{pipe.target}',
695                        pipe.target,
696                        self.flavor,
697                        schema=self.get_pipe_schema(pipe),
698                    )
699                )
700            elif self.flavor == 'oracle':
701                primary_queries.extend([
702                    (
703                        f"ALTER TABLE {_pipe_name}\n"
704                        f"MODIFY {primary_key_name} NOT NULL"
705                    ),
706                    (
707                        f"ALTER TABLE {_pipe_name}\n"
708                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})"
709                    )
710                ])
711            elif self.flavor in ('mysql', 'mariadb'):
712                primary_queries.extend([
713                    (
714                        f"ALTER TABLE {_pipe_name}\n"
715                        f"MODIFY {primary_key_name} {primary_key_db_type} NOT NULL"
716                    ),
717                    (
718                        f"ALTER TABLE {_pipe_name}\n"
719                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})"
720                    )
721                ])
722            elif self.flavor == 'timescaledb':
723                primary_queries.extend([
724                    (
725                        f"ALTER TABLE {_pipe_name}\n"
726                        f"ALTER COLUMN {primary_key_name} SET NOT NULL"
727                    ),
728                    (
729                        f"ALTER TABLE {_pipe_name}\n"
730                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY (" + (
731                            f"{_datetime_name}, " if _datetime_name else ""
732                        ) + f"{primary_key_name})"
733                    ),
734                ])
735            elif self.flavor in ('citus', 'postgresql', 'duckdb'):
736                primary_queries.extend([
737                    (
738                        f"ALTER TABLE {_pipe_name}\n"
739                        f"ALTER COLUMN {primary_key_name} SET NOT NULL"
740                    ),
741                    (
742                        f"ALTER TABLE {_pipe_name}\n"
743                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})"
744                    ),
745                ])
746            else:
747                primary_queries.extend([
748                    (
749                        f"ALTER TABLE {_pipe_name}\n"
750                        f"ALTER COLUMN {primary_key_name} {primary_key_db_type} NOT NULL"
751                    ),
752                    (
753                        f"ALTER TABLE {_pipe_name}\n"
754                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY {primary_key_clustered} ({primary_key_name})"
755                    ),
756                ])
757        index_queries[primary_key] = primary_queries
758
759    ### create id index
760    if _id_name is not None:
761        if self.flavor == 'timescaledb':
762            ### Already created indices via create_hypertable.
763            id_query = (
764                None if (_id is not None and _create_space_partition)
765                else (
766                    f"CREATE INDEX IF NOT EXISTS {_id_index_name} ON {_pipe_name} ({_id_name})"
767                    if _id is not None
768                    else None
769                )
770            )
771            pass
772        else: ### mssql, sqlite, etc.
773            id_query = f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})"
774
775        if id_query is not None:
776            index_queries[_id] = id_query if isinstance(id_query, list) else [id_query]
777
778    ### Create indices for other labels in `pipe.columns`.
779    other_index_names = {
780        ix_key: ix_unquoted
781        for ix_key, ix_unquoted in index_names.items()
782        if (
783            ix_key not in ('datetime', 'id', 'primary')
784            and ix_unquoted.lower() not in existing_ix_names
785        )
786    }
787    for ix_key, ix_unquoted in other_index_names.items():
788        ix_name = sql_item_name(ix_unquoted, self.flavor, None)
789        cols = indices[ix_key]
790        if not isinstance(cols, (list, tuple)):
791            cols = [cols]
792        if ix_key == 'unique' and upsert:
793            continue
794        cols_names = [sql_item_name(col, self.flavor, None) for col in cols if col]
795        if not cols_names:
796            continue
797        cols_names_str = ", ".join(cols_names)
798        index_queries[ix_key] = [f"CREATE INDEX {ix_name} ON {_pipe_name} ({cols_names_str})"]
799
800    indices_cols_str = ', '.join(
801        list({
802            sql_item_name(ix, self.flavor)
803            for ix_key, ix in pipe.columns.items()
804            if ix and ix in existing_cols_types
805        })
806    )
807    coalesce_indices_cols_str = ', '.join(
808        [
809            (
810                (
811                    "COALESCE("
812                    + sql_item_name(ix, self.flavor)
813                    + ", "
814                    + get_null_replacement(existing_cols_types[ix], self.flavor)
815                    + ") "
816                )
817                if ix_key != 'datetime' and null_indices
818                else sql_item_name(ix, self.flavor)
819            )
820            for ix_key, ix in pipe.columns.items()
821            if ix and ix in existing_cols_types
822        ]
823    )
824    unique_index_name = sql_item_name(unique_index_name_unquoted, self.flavor)
825    constraint_name_unquoted = unique_index_name_unquoted.replace('IX_', 'UQ_')
826    constraint_name = sql_item_name(constraint_name_unquoted, self.flavor)
827    add_constraint_query = (
828        f"ALTER TABLE {_pipe_name} ADD CONSTRAINT {constraint_name} UNIQUE ({indices_cols_str})"
829    )
830    unique_index_cols_str = (
831        indices_cols_str
832        if self.flavor not in COALESCE_UNIQUE_INDEX_FLAVORS or not null_indices
833        else coalesce_indices_cols_str
834    )
835    create_unique_index_query = (
836        f"CREATE UNIQUE INDEX {unique_index_name} ON {_pipe_name} ({unique_index_cols_str})"
837    )
838    constraint_queries = [create_unique_index_query]
839    if self.flavor != 'sqlite':
840        constraint_queries.append(add_constraint_query)
841    if upsert and indices_cols_str:
842        index_queries[unique_index_name] = constraint_queries
843    return index_queries

Return a dictionary mapping columns to a CREATE INDEX or equivalent query.

Parameters
  • pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
  • A dictionary of index names mapping to lists of queries.
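
For example, a minimal sketch of previewing a pipe's index queries (the `sql:local` connector keys and the pipe below are hypothetical):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    pipe = mrsm.Pipe(
        'plugin:noaa', 'weather',
        instance='sql:local',
        columns={'datetime': 'timestamp', 'id': 'station_id'},
    )

    ### Inspect the generated queries before executing them (e.g. via `exec_queries()`).
    for ix_name, queries in conn.get_create_index_queries(pipe).items():
        for query in queries:
            print(query)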
def get_drop_index_queries( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, List[str]]:
846def get_drop_index_queries(
847    self,
848    pipe: mrsm.Pipe,
849    debug: bool = False,
850) -> Dict[str, List[str]]:
851    """
852    Return a dictionary mapping columns to a `DROP INDEX` or equivalent query.
853
854    Parameters
855    ----------
856    pipe: mrsm.Pipe
857        The pipe to which the queries will correspond.
858
859    Returns
860    -------
861    A dictionary of column names mapping to lists of queries.
862    """
863    ### NOTE: Due to breaking changes within DuckDB, indices must be skipped.
864    if self.flavor == 'duckdb':
865        return {}
866    if not pipe.exists(debug=debug):
867        return {}
868
869    from collections import defaultdict
870    from meerschaum.utils.sql import (
871        sql_item_name,
872        table_exists,
873        hypertable_queries,
874        DROP_INDEX_IF_EXISTS_FLAVORS,
875    )
876    drop_queries = defaultdict(lambda: [])
877    schema = self.get_pipe_schema(pipe)
878    index_schema = schema if self.flavor != 'mssql' else None
879    indices = {
880        ix_key: ix
881        for ix_key, ix in pipe.get_indices().items()
882    }
883    cols_indices = pipe.get_columns_indices(debug=debug)
884    existing_indices = set()
885    clustered_ix = None
886    for col, ix_metas in cols_indices.items():
887        for ix_meta in ix_metas:
888            ix_name = ix_meta.get('name', None)
889            if ix_meta.get('clustered', False):
890                clustered_ix = ix_name
891            existing_indices.add(ix_name.lower())
892    pipe_name = sql_item_name(pipe.target, self.flavor, schema)
893    pipe_name_no_schema = sql_item_name(pipe.target, self.flavor, None)
894    upsert = pipe.upsert
895
896    if self.flavor not in hypertable_queries:
897        is_hypertable = False
898    else:
899        is_hypertable_query = hypertable_queries[self.flavor].format(table_name=pipe_name)
900        is_hypertable = self.value(is_hypertable_query, silent=True, debug=debug) is not None
901
902    if_exists_str = "IF EXISTS " if self.flavor in DROP_INDEX_IF_EXISTS_FLAVORS else ""
903    if is_hypertable:
904        nuke_queries = []
905        temp_table = '_' + pipe.target + '_temp_migration'
906        temp_table_name = sql_item_name(temp_table, self.flavor, self.get_pipe_schema(pipe))
907
908        if table_exists(temp_table, self, schema=self.get_pipe_schema(pipe), debug=debug):
909            nuke_queries.append(f"DROP TABLE {if_exists_str}{temp_table_name}")
910        nuke_queries += [
911            f"SELECT * INTO {temp_table_name} FROM {pipe_name}",
912            f"DROP TABLE {if_exists_str}{pipe_name}",
913            f"ALTER TABLE {temp_table_name} RENAME TO {pipe_name_no_schema}",
914        ]
915        nuke_ix_keys = ('datetime', 'id')
916        nuked = False
917        for ix_key in nuke_ix_keys:
918            if ix_key in indices and not nuked:
919                drop_queries[ix_key].extend(nuke_queries)
920                nuked = True
921
922    for ix_key, ix_unquoted in indices.items():
923        if ix_key in drop_queries:
924            continue
925        if ix_unquoted.lower() not in existing_indices:
926            continue
927
928        if ix_key == 'unique' and upsert and self.flavor not in ('sqlite',) and not is_hypertable:
929            constraint_name_unquoted = ix_unquoted.replace('IX_', 'UQ_')
930            constraint_name = sql_item_name(constraint_name_unquoted, self.flavor)
931            constraint_or_index = (
932                "CONSTRAINT"
933                if self.flavor not in ('mysql', 'mariadb')
934                else 'INDEX'
935            )
936            drop_queries[ix_key].append(
937                f"ALTER TABLE {pipe_name}\n"
938                f"DROP {constraint_or_index} {constraint_name}"
939            )
940
941        query = (
942            (
943                f"ALTER TABLE {pipe_name}\n"
944                if self.flavor in ('mysql', 'mariadb')
945                else ''
946            )
947            + f"DROP INDEX {if_exists_str}"
948            + sql_item_name(ix_unquoted, self.flavor, index_schema)
949        )
950        if self.flavor == 'mssql':
951            query += f"\nON {pipe_name}"
952            if ix_unquoted == clustered_ix:
953                query += "\nWITH (ONLINE = ON, MAXDOP = 4)"
954        drop_queries[ix_key].append(query)
955
956
957    return drop_queries

Return a dictionary mapping columns to a DROP INDEX or equivalent query.

Parameters
  • pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
  • A dictionary of column names mapping to lists of queries.
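
A minimal sketch of previewing the drop queries before executing them (connector and pipe keys are hypothetical):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:local')

    drop_queries = conn.get_drop_index_queries(pipe)
    for ix_key, queries in drop_queries.items():
        ### Index names and query shapes vary by flavor.
        for query in queries:
            print(query)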
def get_add_columns_queries( self, pipe: meerschaum.Pipe, df: 'Union[pd.DataFrame, Dict[str, str]]', _is_db_types: bool = False, debug: bool = False) -> List[str]:
3123def get_add_columns_queries(
3124    self,
3125    pipe: mrsm.Pipe,
3126    df: Union[pd.DataFrame, Dict[str, str]],
3127    _is_db_types: bool = False,
3128    debug: bool = False,
3129) -> List[str]:
3130    """
3131    Add new null columns of the correct type to a table from a dataframe.
3132
3133    Parameters
3134    ----------
3135    pipe: mrsm.Pipe
3136        The pipe to be altered.
3137
3138    df: Union[pd.DataFrame, Dict[str, str]]
3139        The pandas DataFrame which contains new columns.
3140        If a dictionary is provided, assume it maps columns to Pandas data types.
3141
3142    _is_db_types: bool, default False
3143        If `True`, assume `df` is a dictionary mapping columns to SQL native dtypes.
3144
3145    Returns
3146    -------
3147    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
3148    """
3149    if not pipe.exists(debug=debug):
3150        return []
3151
3152    if pipe.parameters.get('static', False):
3153        return []
3154
3155    from decimal import Decimal
3156    import copy
3157    from meerschaum.utils.sql import (
3158        sql_item_name,
3159        SINGLE_ALTER_TABLE_FLAVORS,
3160        get_table_cols_types,
3161    )
3162    from meerschaum.utils.dtypes.sql import (
3163        get_pd_type_from_db_type,
3164        get_db_type_from_pd_type,
3165    )
3166    from meerschaum.utils.misc import flatten_list
3167    table_obj = self.get_pipe_table(pipe, debug=debug)
3168    is_dask = 'dask' in df.__module__ if not isinstance(df, dict) else False
3169    if is_dask:
3170        df = df.partitions[0].compute()
3171    df_cols_types = (
3172        {
3173            col: str(typ)
3174            for col, typ in df.dtypes.items()
3175        }
3176        if not isinstance(df, dict)
3177        else copy.deepcopy(df)
3178    )
3179    if not isinstance(df, dict) and len(df.index) > 0:
3180        for col, typ in list(df_cols_types.items()):
3181            if typ != 'object':
3182                continue
3183            val = df.iloc[0][col]
3184            if isinstance(val, (dict, list)):
3185                df_cols_types[col] = 'json'
3186            elif isinstance(val, Decimal):
3187                df_cols_types[col] = 'numeric'
3188            elif isinstance(val, str):
3189                df_cols_types[col] = 'str'
3190    db_cols_types = {
3191        col: get_pd_type_from_db_type(str(typ.type))
3192        for col, typ in table_obj.columns.items()
3193    } if table_obj is not None else {
3194        col: get_pd_type_from_db_type(typ)
3195        for col, typ in get_table_cols_types(
3196            pipe.target,
3197            self,
3198            schema=self.get_pipe_schema(pipe),
3199            debug=debug,
3200        ).items()
3201    }
3202    new_cols = set(df_cols_types) - set(db_cols_types)
3203    if not new_cols:
3204        return []
3205
3206    new_cols_types = {
3207        col: get_db_type_from_pd_type(
3208            df_cols_types[col],
3209            self.flavor
3210        )
3211        for col in new_cols
3212        if col and df_cols_types.get(col, None)
3213    }
3214
3215    alter_table_query = "ALTER TABLE " + sql_item_name(
3216        pipe.target, self.flavor, self.get_pipe_schema(pipe)
3217    )
3218    queries = []
3219    for col, typ in new_cols_types.items():
3220        add_col_query = (
3221            "\nADD "
3222            + sql_item_name(col, self.flavor, None)
3223            + " " + typ + ","
3224        )
3225
3226        if self.flavor in SINGLE_ALTER_TABLE_FLAVORS:
3227            queries.append(alter_table_query + add_col_query[:-1])
3228        else:
3229            alter_table_query += add_col_query
3230
3231    ### For most flavors, one combined `ALTER TABLE` query suffices.
3232    ### Flavors in SINGLE_ALTER_TABLE_FLAVORS (e.g. SQLite) appended one query per column above.
3233    if not queries:
3234        queries.append(alter_table_query[:-1])
3235
3236    if self.flavor != 'duckdb':
3237        return queries
3238
3239    ### NOTE: For DuckDB, we must drop and rebuild the indices.
3240    drop_index_queries = list(flatten_list(
3241        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
3242    ))
3243    create_index_queries = list(flatten_list(
3244        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
3245    ))
3246
3247    return drop_index_queries + queries + create_index_queries

Add new null columns of the correct type to a table from a dataframe.

Parameters
  • pipe (mrsm.Pipe): The pipe to be altered.
  • df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which contains new columns. If a dictionary is provided, assume it maps columns to Pandas data types.
  • _is_db_types (bool, default False): If True, assume df is a dictionary mapping columns to SQL native dtypes.
Returns
  • A list of the ALTER TABLE SQL query or queries to be executed on the provided connector.
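
A dictionary of column names to Pandas dtypes may stand in for a DataFrame. A minimal sketch (the `humidity` column is hypothetical and assumed missing from the target table):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:local')

    queries = conn.get_add_columns_queries(pipe, {'humidity': 'float64'})
    for query in queries:
        conn.exec(query)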
def get_alter_columns_queries( self, pipe: meerschaum.Pipe, df: 'Union[pd.DataFrame, Dict[str, str]]', debug: bool = False) -> List[str]:
3250def get_alter_columns_queries(
3251    self,
3252    pipe: mrsm.Pipe,
3253    df: Union[pd.DataFrame, Dict[str, str]],
3254    debug: bool = False,
3255) -> List[str]:
3256    """
3257    If we encounter a column of a different type, set the entire column to text.
3258    If the altered columns are numeric, alter to numeric instead.
3259
3260    Parameters
3261    ----------
3262    pipe: mrsm.Pipe
3263        The pipe to be altered.
3264
3265    df: Union[pd.DataFrame, Dict[str, str]]
3266        The pandas DataFrame which may contain altered columns.
3267        If a dict is provided, assume it maps columns to Pandas data types.
3268
3269    Returns
3270    -------
3271    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
3272    """
3273    if not pipe.exists(debug=debug):
3274        return []
3275    if pipe.static:
3276        return []
3277    from meerschaum.utils.sql import (
3278        sql_item_name,
3279        get_table_cols_types,
3280        DROP_IF_EXISTS_FLAVORS,
3281        SINGLE_ALTER_TABLE_FLAVORS,
3282    )
3283    from meerschaum.utils.dataframe import get_numeric_cols
3284    from meerschaum.utils.dtypes import are_dtypes_equal
3285    from meerschaum.utils.dtypes.sql import (
3286        get_pd_type_from_db_type,
3287        get_db_type_from_pd_type,
3288    )
3289    from meerschaum.utils.misc import flatten_list, generate_password, items_str
3290    table_obj = self.get_pipe_table(pipe, debug=debug)
3291    target = pipe.target
3292    session_id = generate_password(3)
3293    numeric_cols = (
3294        get_numeric_cols(df)
3295        if not isinstance(df, dict)
3296        else [
3297            col
3298            for col, typ in df.items()
3299            if typ.startswith('numeric')
3300        ]
3301    )
3302    df_cols_types = (
3303        {
3304            col: str(typ)
3305            for col, typ in df.dtypes.items()
3306        }
3307        if not isinstance(df, dict)
3308        else df
3309    )
3310    db_cols_types = {
3311        col: get_pd_type_from_db_type(str(typ.type))
3312        for col, typ in table_obj.columns.items()
3313    } if table_obj is not None else {
3314        col: get_pd_type_from_db_type(typ)
3315        for col, typ in get_table_cols_types(
3316            pipe.target,
3317            self,
3318            schema=self.get_pipe_schema(pipe),
3319            debug=debug,
3320        ).items()
3321    }
3322    pipe_bool_cols = [col for col, typ in pipe.dtypes.items() if are_dtypes_equal(str(typ), 'bool')]
3323    pd_db_df_aliases = {
3324        'int': 'bool',
3325        'float': 'bool',
3326        'numeric': 'bool',
3327        'guid': 'object',
3328    }
3329    if self.flavor == 'oracle':
3330        pd_db_df_aliases['int'] = 'numeric'
3331
3332    altered_cols = {
3333        col: (db_cols_types.get(col, 'object'), typ)
3334        for col, typ in df_cols_types.items()
3335        if not are_dtypes_equal(typ, db_cols_types.get(col, 'object').lower())
3336        and not are_dtypes_equal(db_cols_types.get(col, 'object'), 'string')
3337    }
3338
3339    ### NOTE: Sometimes bools are coerced into ints or floats.
3340    altered_cols_to_ignore = set()
3341    for col, (db_typ, df_typ) in altered_cols.items():
3342        for db_alias, df_alias in pd_db_df_aliases.items():
3343            if db_alias in db_typ.lower() and df_alias in df_typ.lower():
3344                altered_cols_to_ignore.add(col)
3345
3346    ### Oracle's bool handling sometimes mixes NUMBER and INT.
3347    for bool_col in pipe_bool_cols:
3348        if bool_col not in altered_cols:
3349            continue
3350        db_is_bool_compatible = (
3351            are_dtypes_equal('int', altered_cols[bool_col][0])
3352            or are_dtypes_equal('float', altered_cols[bool_col][0])
3353            or are_dtypes_equal('numeric', altered_cols[bool_col][0])
3354            or are_dtypes_equal('bool', altered_cols[bool_col][0])
3355        )
3356        df_is_bool_compatible = (
3357            are_dtypes_equal('int', altered_cols[bool_col][1])
3358            or are_dtypes_equal('float', altered_cols[bool_col][1])
3359            or are_dtypes_equal('numeric', altered_cols[bool_col][1])
3360            or are_dtypes_equal('bool', altered_cols[bool_col][1])
3361        )
3362        if db_is_bool_compatible and df_is_bool_compatible:
3363            altered_cols_to_ignore.add(bool_col)
3364
3365    for col in altered_cols_to_ignore:
3366        _ = altered_cols.pop(col, None)
3367    if not altered_cols:
3368        return []
3369
3370    if numeric_cols:
3371        pipe.dtypes.update({col: 'numeric' for col in numeric_cols})
3372        edit_success, edit_msg = pipe.edit(debug=debug)
3373        if not edit_success:
3374            warn(
3375                f"Failed to update dtypes for numeric columns {items_str(numeric_cols)}:\n"
3376                + f"{edit_msg}"
3377            )
3378    else:
3379        numeric_cols.extend([col for col, typ in pipe.dtypes.items() if typ.startswith('numeric')])
3380
3381    numeric_type = get_db_type_from_pd_type('numeric', self.flavor, as_sqlalchemy=False)
3382    text_type = get_db_type_from_pd_type('str', self.flavor, as_sqlalchemy=False)
3383    altered_cols_types = {
3384        col: (
3385            numeric_type
3386            if col in numeric_cols
3387            else text_type
3388        )
3389        for col, (db_typ, typ) in altered_cols.items()
3390    }
3391
3392    if self.flavor == 'sqlite':
3393        temp_table_name = '-' + session_id + '_' + target
3394        rename_query = (
3395            "ALTER TABLE "
3396            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3397            + " RENAME TO "
3398            + sql_item_name(temp_table_name, self.flavor, None)
3399        )
3400        create_query = (
3401            "CREATE TABLE "
3402            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3403            + " (\n"
3404        )
3405        for col_name, col_obj in table_obj.columns.items():
3406            create_query += (
3407                sql_item_name(col_name, self.flavor, None)
3408                + " "
3409                + (
3410                    str(col_obj.type)
3411                    if col_name not in altered_cols
3412                    else altered_cols_types[col_name]
3413                )
3414                + ",\n"
3415            )
3416        create_query = create_query[:-2] + "\n)"
3417
3418        insert_query = (
3419            "INSERT INTO "
3420            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3421            + ' ('
3422            + ', '.join([
3423                sql_item_name(col_name, self.flavor, None)
3424                for col_name, _ in table_obj.columns.items()
3425            ])
3426            + ')'
3427            + "\nSELECT\n"
3428        )
3429        for col_name, col_obj in table_obj.columns.items():
3430            new_col_str = (
3431                sql_item_name(col_name, self.flavor, None)
3432                if col_name not in altered_cols
3433                else (
3434                    "CAST("
3435                    + sql_item_name(col_name, self.flavor, None)
3436                    + " AS "
3437                    + altered_cols_types[col_name]
3438                    + ")"
3439                )
3440            )
3441            insert_query += new_col_str + ",\n"
3442        insert_query = insert_query[:-2] + (
3443            f"\nFROM {sql_item_name(temp_table_name, self.flavor, self.get_pipe_schema(pipe))}"
3444        )
3445
3446        if_exists_str = "IF EXISTS " if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
3447
3448        drop_query = f"DROP TABLE {if_exists_str}" + sql_item_name(
3449            temp_table_name, self.flavor, self.get_pipe_schema(pipe)
3450        )
3451        return [
3452            rename_query,
3453            create_query,
3454            insert_query,
3455            drop_query,
3456        ]
3457
3458    queries = []
3459    if self.flavor == 'oracle':
3460        for col, typ in altered_cols_types.items():
3461            add_query = (
3462                "ALTER TABLE "
3463                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3464                + "\nADD " + sql_item_name(col + '_temp', self.flavor, None)
3465                + " " + typ
3466            )
3467            queries.append(add_query)
3468
3469        for col, typ in altered_cols_types.items():
3470            populate_temp_query = (
3471                "UPDATE "
3472                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3473                + "\nSET " + sql_item_name(col + '_temp', self.flavor, None)
3474                + ' = ' + sql_item_name(col, self.flavor, None)
3475            )
3476            queries.append(populate_temp_query)
3477
3478        for col, typ in altered_cols_types.items():
3479            set_old_cols_to_null_query = (
3480                "UPDATE "
3481                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3482                + "\nSET " + sql_item_name(col, self.flavor, None)
3483                + ' = NULL'
3484            )
3485            queries.append(set_old_cols_to_null_query)
3486
3487        for col, typ in altered_cols_types.items():
3488            alter_type_query = (
3489                "ALTER TABLE "
3490                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3491                + "\nMODIFY " + sql_item_name(col, self.flavor, None) + ' '
3492                + typ
3493            )
3494            queries.append(alter_type_query)
3495
3496        for col, typ in altered_cols_types.items():
3497            set_old_to_temp_query = (
3498                "UPDATE "
3499                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3500                + "\nSET " + sql_item_name(col, self.flavor, None)
3501                + ' = ' + sql_item_name(col + '_temp', self.flavor, None)
3502            )
3503            queries.append(set_old_to_temp_query)
3504
3505        for col, typ in altered_cols_types.items():
3506            drop_temp_query = (
3507                "ALTER TABLE "
3508                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3509                + "\nDROP COLUMN " + sql_item_name(col + '_temp', self.flavor, None)
3510            )
3511            queries.append(drop_temp_query)
3512
3513        return queries
3514
3515    query = "ALTER TABLE " + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3516    for col, typ in altered_cols_types.items():
3517        alter_col_prefix = (
3518            'ALTER' if self.flavor not in ('mysql', 'mariadb', 'oracle')
3519            else 'MODIFY'
3520        )
3521        type_prefix = (
3522            '' if self.flavor in ('mssql', 'mariadb', 'mysql')
3523            else 'TYPE '
3524        )
3525        column_str = 'COLUMN' if self.flavor != 'oracle' else ''
3526        query_suffix = (
3527            f"\n{alter_col_prefix} {column_str} "
3528            + sql_item_name(col, self.flavor, None)
3529            + " " + type_prefix + typ + ","
3530        )
3531        if self.flavor not in SINGLE_ALTER_TABLE_FLAVORS:
3532            query += query_suffix
3533        else:
3534            queries.append(query + query_suffix[:-1])
3535
3536    if self.flavor not in SINGLE_ALTER_TABLE_FLAVORS:
3537        queries.append(query[:-1])
3538
3539    if self.flavor != 'duckdb':
3540        return queries
3541
3542    drop_index_queries = list(flatten_list(
3543        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
3544    ))
3545    create_index_queries = list(flatten_list(
3546        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
3547    ))
3548
3549    return drop_index_queries + queries + create_index_queries

If we encounter a column of a different type, set the entire column to text. If the altered columns are numeric, alter to numeric instead.

Parameters
  • pipe (mrsm.Pipe): The pipe to be altered.
  • df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which may contain altered columns. If a dict is provided, assume it maps columns to Pandas data types.
Returns
  • A list of the ALTER TABLE SQL query or queries to be executed on the provided connector.
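
A sketch of generating the retyping queries when an incoming dtype no longer matches the table (the `station_id` column is hypothetical):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:local')

    ### Suppose `station_id` now arrives as text where the table stores an integer.
    queries = conn.get_alter_columns_queries(pipe, {'station_id': 'string'})
    for query in queries:
        print(query)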
def delete_pipe( self, pipe: meerschaum.Pipe, debug: bool = False) -> Tuple[bool, str]:
960def delete_pipe(
961    self,
962    pipe: mrsm.Pipe,
963    debug: bool = False,
964) -> SuccessTuple:
965    """
966    Delete a Pipe's registration.
967    """
968    from meerschaum.utils.packages import attempt_import
969    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
970
971    if not pipe.id:
972        return False, f"{pipe} is not registered."
973
974    ### ensure pipes table exists
975    from meerschaum.connectors.sql.tables import get_tables
976    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']
977
978    q = sqlalchemy.delete(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id)
979    if not self.exec(q, debug=debug):
980        return False, f"Failed to delete registration for {pipe}."
981
982    return True, "Success"

Delete a Pipe's registration.
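
A minimal sketch, assuming the pipe was previously registered on this instance (keys are hypothetical):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:local')

    success, msg = conn.delete_pipe(pipe)
    if not success:
        print(msg)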

def get_pipe_data( self, pipe: meerschaum.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, str, NoneType] = None, end: Union[datetime.datetime, str, NoneType] = None, params: Optional[Dict[str, Any]] = None, order: str = 'asc', limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, debug: bool = False, **kw: Any) -> 'Union[pd.DataFrame, None]':
 985def get_pipe_data(
 986    self,
 987    pipe: mrsm.Pipe,
 988    select_columns: Optional[List[str]] = None,
 989    omit_columns: Optional[List[str]] = None,
 990    begin: Union[datetime, str, None] = None,
 991    end: Union[datetime, str, None] = None,
 992    params: Optional[Dict[str, Any]] = None,
 993    order: str = 'asc',
 994    limit: Optional[int] = None,
 995    begin_add_minutes: int = 0,
 996    end_add_minutes: int = 0,
 997    debug: bool = False,
 998    **kw: Any
 999) -> Union[pd.DataFrame, None]:
1000    """
1001    Access a pipe's data from the SQL instance.
1002
1003    Parameters
1004    ----------
1005    pipe: mrsm.Pipe
1006        The pipe to get data from.
1007
1008    select_columns: Optional[List[str]], default None
1009        If provided, only select these given columns.
1010        Otherwise select all available columns (i.e. `SELECT *`).
1011
1012    omit_columns: Optional[List[str]], default None
1013        If provided, remove these columns from the selection.
1014
1015    begin: Union[datetime, str, None], default None
1016        If provided, get rows newer than or equal to this value.
1017
1018    end: Union[datetime, str, None], default None
1019        If provided, get rows older than this value (exclusive).
1020
1021    params: Optional[Dict[str, Any]], default None
1022        Additional parameters to filter by.
1023        See `meerschaum.connectors.sql.build_where`.
1024
1025    order: Optional[str], default 'asc'
1026        The selection order for all of the indices in the query.
1027        If `None`, omit the `ORDER BY` clause.
1028
1029    limit: Optional[int], default None
1030        If specified, limit the number of rows retrieved to this value.
1031
1032    begin_add_minutes: int, default 0
1033        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
1034
1035    end_add_minutes: int, default 0
1036        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
1037
1038    chunksize: Optional[int], default -1
1039        The size of dataframe chunks to load into memory.
1040
1041    debug: bool, default False
1042        Verbosity toggle.
1043
1044    Returns
1045    -------
1046    A `pd.DataFrame` of the pipe's data.
1047
1048    """
1049    import json
1050    from meerschaum.utils.misc import parse_df_datetimes, to_pandas_dtype
1051    from meerschaum.utils.packages import import_pandas
1052    from meerschaum.utils.dtypes import (
1053        attempt_cast_to_numeric,
1054        attempt_cast_to_uuid,
1055        attempt_cast_to_bytes,
1056        are_dtypes_equal,
1057    )
1058    from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type
1059    pd = import_pandas()
1060    is_dask = 'dask' in pd.__name__
1061
1062    cols_types = pipe.get_columns_types(debug=debug) if pipe.enforce else {}
1063    dtypes = {
1064        **{
1065            p_col: to_pandas_dtype(p_typ)
1066            for p_col, p_typ in pipe.dtypes.items()
1067        },
1068        **{
1069            col: get_pd_type_from_db_type(typ)
1070            for col, typ in cols_types.items()
1071        }
1072    } if pipe.enforce else {}
1073    if dtypes:
1074        if self.flavor == 'sqlite':
1075            if not pipe.columns.get('datetime', None):
1076                _dt = pipe.guess_datetime()
1077            else:
1078                _dt = pipe.get_columns('datetime')
1079
1080            if _dt:
1081                dt_type = dtypes.get(_dt, 'object').lower()
1082                if 'datetime' not in dt_type:
1083                    if 'int' not in dt_type:
1084                        dtypes[_dt] = 'datetime64[ns, UTC]'
1085
1086    existing_cols = cols_types.keys()
1087    select_columns = (
1088        [
1089            col
1090            for col in existing_cols
1091            if col not in (omit_columns or [])
1092        ]
1093        if not select_columns
1094        else [
1095            col
1096            for col in select_columns
1097            if col in existing_cols
1098            and col not in (omit_columns or [])
1099        ]
1100    ) if pipe.enforce else select_columns
1101    if select_columns:
1102        dtypes = {col: typ for col, typ in dtypes.items() if col in select_columns}
1103    dtypes = {
1104        col: to_pandas_dtype(typ)
1105        for col, typ in dtypes.items()
1106        if col in select_columns and col not in (omit_columns or [])
1107    } if pipe.enforce else {}
1108    query = self.get_pipe_data_query(
1109        pipe,
1110        select_columns=select_columns,
1111        omit_columns=omit_columns,
1112        begin=begin,
1113        end=end,
1114        params=params,
1115        order=order,
1116        limit=limit,
1117        begin_add_minutes=begin_add_minutes,
1118        end_add_minutes=end_add_minutes,
1119        debug=debug,
1120        **kw
1121    )
1122
1123    if is_dask:
1124        index_col = pipe.columns.get('datetime', None)
1125        kw['index_col'] = index_col
1126
1127    numeric_columns = [
1128        col
1129        for col, typ in pipe.dtypes.items()
1130        if typ.startswith('numeric') and col in dtypes
1131    ]
1132    uuid_columns = [
1133        col
1134        for col, typ in pipe.dtypes.items()
1135        if typ == 'uuid' and col in dtypes
1136    ]
1137    bytes_columns = [
1138        col
1139        for col, typ in pipe.dtypes.items()
1140        if typ == 'bytes' and col in dtypes
1141    ]
1142
1143    kw['coerce_float'] = kw.get('coerce_float', (len(numeric_columns) == 0))
1144
1145    df = self.read(
1146        query,
1147        dtype=dtypes,
1148        debug=debug,
1149        **kw
1150    )
1151    for col in numeric_columns:
1152        if col not in df.columns:
1153            continue
1154        df[col] = df[col].apply(attempt_cast_to_numeric)
1155
1156    for col in uuid_columns:
1157        if col not in df.columns:
1158            continue
1159        df[col] = df[col].apply(attempt_cast_to_uuid)
1160
1161    for col in bytes_columns:
1162        if col not in df.columns:
1163            continue
1164        df[col] = df[col].apply(attempt_cast_to_bytes)
1165
1166    if self.flavor == 'sqlite':
1167        ignore_dt_cols = [
1168            col
1169            for col, dtype in pipe.dtypes.items()
1170            if not are_dtypes_equal(str(dtype), 'datetime')
1171        ]
1172        ### NOTE: We have to consume the iterator here to ensure that datetimes are parsed correctly
1173        df = (
1174            parse_df_datetimes(
1175                df,
1176                ignore_cols=ignore_dt_cols,
1177                chunksize=kw.get('chunksize', None),
1178                strip_timezone=(pipe.tzinfo is None),
1179                debug=debug,
1180            ) if isinstance(df, pd.DataFrame) else (
1181                [
1182                    parse_df_datetimes(
1183                        c,
1184                        ignore_cols=ignore_dt_cols,
1185                        chunksize=kw.get('chunksize', None),
1186                        strip_timezone=(pipe.tzinfo is None),
1187                        debug=debug,
1188                    )
1189                    for c in df
1190                ]
1191            )
1192        )
1193        for col, typ in dtypes.items():
1194            if typ != 'json':
1195                continue
1196            df[col] = df[col].apply(lambda x: json.loads(x) if x is not None else x)
1197    return df

Access a pipe's data from the SQL instance.

Parameters
  • pipe (mrsm.Pipe): The pipe to get data from.
  • select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
  • omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
  • begin (Union[datetime, str, None], default None): If provided, get rows newer than or equal to this value.
  • end (Union[datetime, str, None], default None): If provided, get rows older than this value (exclusive).
  • params (Optional[Dict[str, Any]], default None): Additional parameters to filter by. See meerschaum.connectors.sql.build_where.
  • order (Optional[str], default 'asc'): The selection order for all of the indices in the query. If None, omit the ORDER BY clause.
  • limit (Optional[int], default None): If specified, limit the number of rows retrieved to this value.
  • begin_add_minutes (int, default 0): The number of minutes to add to the begin datetime (i.e. DATEADD).
  • end_add_minutes (int, default 0): The number of minutes to add to the end datetime (i.e. DATEADD).
  • chunksize (Optional[int], default -1): The size of dataframe chunks to load into memory.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pd.DataFrame of the pipe's data.
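
A hedged sketch of a bounded, filtered read (the `station` column filter and keys are hypothetical):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:local')

    df = conn.get_pipe_data(
        pipe,
        begin='2024-01-01',
        end='2024-02-01',
        params={'station': 'KATL'},
        order='desc',
        limit=100,
    )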
def get_pipe_data_query( self, pipe: meerschaum.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, int, str, NoneType] = None, end: Union[datetime.datetime, int, str, NoneType] = None, params: Optional[Dict[str, Any]] = None, order: Optional[str] = 'asc', sort_datetimes: bool = False, limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, replace_nulls: Optional[str] = None, skip_existing_cols_check: bool = False, debug: bool = False, **kw: Any) -> Optional[str]:
1200def get_pipe_data_query(
1201    self,
1202    pipe: mrsm.Pipe,
1203    select_columns: Optional[List[str]] = None,
1204    omit_columns: Optional[List[str]] = None,
1205    begin: Union[datetime, int, str, None] = None,
1206    end: Union[datetime, int, str, None] = None,
1207    params: Optional[Dict[str, Any]] = None,
1208    order: Optional[str] = 'asc',
1209    sort_datetimes: bool = False,
1210    limit: Optional[int] = None,
1211    begin_add_minutes: int = 0,
1212    end_add_minutes: int = 0,
1213    replace_nulls: Optional[str] = None,
1214    skip_existing_cols_check: bool = False,
1215    debug: bool = False,
1216    **kw: Any
1217) -> Union[str, None]:
1218    """
1219    Return the `SELECT` query for retrieving a pipe's data from its instance.
1220
1221    Parameters
1222    ----------
1223    pipe: mrsm.Pipe
1224        The pipe to get data from.
1225
1226    select_columns: Optional[List[str]], default None
1227        If provided, only select these given columns.
1228        Otherwise select all available columns (i.e. `SELECT *`).
1229
1230    omit_columns: Optional[List[str]], default None
1231        If provided, remove these columns from the selection.
1232
1233    begin: Union[datetime, int, str, None], default None
1234        If provided, get rows newer than or equal to this value.
1235
1236    end: Union[datetime, int, str, None], default None
1237        If provided, get rows older than this value (exclusive).
1238
1239    params: Optional[Dict[str, Any]], default None
1240        Additional parameters to filter by.
1241        See `meerschaum.connectors.sql.build_where`.
1242
1243    order: Optional[str], default 'asc'
1244        The selection order for all of the indices in the query.
1245        If `None`, omit the `ORDER BY` clause.
1246
1247    sort_datetimes: bool, default False
1248        Alias for `order='desc'`.
1249
1250    limit: Optional[int], default None
1251        If specified, limit the number of rows retrieved to this value.
1252
1253    begin_add_minutes: int, default 0
1254        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
1255
1256    end_add_minutes: int, default 0
1257        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
1258
1259    chunksize: Optional[int], default -1
1260        The size of dataframe chunks to load into memory.
1261
1262    replace_nulls: Optional[str], default None
1263        If provided, replace null values with this value.
1264
1265    skip_existing_cols_check: bool, default False
1266        If `True`, do not verify that querying columns are actually on the table.
1267
1268    debug: bool, default False
1269        Verbosity toggle.
1270
1271    Returns
1272    -------
1273    A `SELECT` query to retrieve a pipe's data.
1274    """
1275    from meerschaum.utils.misc import items_str
1276    from meerschaum.utils.sql import sql_item_name, dateadd_str
1277    from meerschaum.utils.dtypes import coerce_timezone
1278    from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type, get_db_type_from_pd_type
1279
1280    dt_col = pipe.columns.get('datetime', None)
1281    existing_cols = pipe.get_columns_types(debug=debug) if pipe.enforce else []
1282    skip_existing_cols_check = skip_existing_cols_check or not pipe.enforce
1283    dt_typ = get_pd_type_from_db_type(existing_cols[dt_col]) if dt_col in existing_cols else None
1284    dt_db_type = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None
1285    select_columns = (
1286        [col for col in existing_cols]
1287        if not select_columns
1288        else [col for col in select_columns if skip_existing_cols_check or col in existing_cols]
1289    )
1290    if omit_columns:
1291        select_columns = [col for col in select_columns if col not in omit_columns]
1292
1293    if order is None and sort_datetimes:
1294        order = 'desc'
1295
1296    if begin == '':
1297        begin = pipe.get_sync_time(debug=debug)
1298        backtrack_interval = pipe.get_backtrack_interval(debug=debug)
1299        if begin is not None:
1300            begin -= backtrack_interval
1301
1302    begin, end = pipe.parse_date_bounds(begin, end)
1303    if isinstance(begin, datetime) and dt_typ:
1304        begin = coerce_timezone(begin, strip_utc=('utc' not in dt_typ.lower()))
1305    if isinstance(end, datetime) and dt_typ:
1306        end = coerce_timezone(end, strip_utc=('utc' not in dt_typ.lower()))
1307
1308    cols_names = [
1309        sql_item_name(col, self.flavor, None)
1310        for col in select_columns
1311    ]
1312    select_cols_str = (
1313        'SELECT\n    '
1314        + ',\n    '.join(
1315            [
1316                (
1317                    col_name
1318                    if not replace_nulls
1319                    else f"COALESCE({col_name}, '{replace_nulls}') AS {col_name}"
1320                )
1321                for col_name in cols_names
1322            ]
1323        )
1324    ) if cols_names else 'SELECT *'
1325    pipe_table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
1326    query = f"{select_cols_str}\nFROM {pipe_table_name}"
1327    where = ""
1328
1329    if order is not None:
1330        default_order = 'asc'
1331        if order not in ('asc', 'desc'):
1332            warn(f"Ignoring unsupported order '{order}'. Falling back to '{default_order}'.")
1333            order = default_order
1334        order = order.upper()
1335
1336    if not pipe.columns.get('datetime', None):
1337        _dt = pipe.guess_datetime()
1338        dt = sql_item_name(_dt, self.flavor, None) if _dt else None
1339        is_guess = True
1340    else:
1341        _dt = pipe.get_columns('datetime')
1342        dt = sql_item_name(_dt, self.flavor, None)
1343        is_guess = False
1344
1345    quoted_indices = {
1346        key: sql_item_name(val, self.flavor, None)
1347        for key, val in pipe.columns.items()
1348        if val in existing_cols or skip_existing_cols_check
1349    }
1350
1351    if begin is not None or end is not None:
1352        if is_guess:
1353            if _dt is None:
1354                warn(
1355                    f"No datetime could be determined for {pipe}."
1356                    + "\n    Ignoring begin and end...",
1357                    stack=False,
1358                )
1359                begin, end = None, None
1360            else:
1361                warn(
1362                    f"A datetime wasn't specified for {pipe}.\n"
1363                    + f"    Using column \"{_dt}\" for datetime bounds...",
1364                    stack=False,
1365                )
1366
1367    is_dt_bound = False
1368    if begin is not None and (_dt in existing_cols or skip_existing_cols_check):
1369        begin_da = dateadd_str(
1370            flavor=self.flavor,
1371            datepart='minute',
1372            number=begin_add_minutes,
1373            begin=begin,
1374            db_type=dt_db_type,
1375        )
1376        where += f"\n    {dt} >= {begin_da}" + ("\n    AND\n    " if end is not None else "")
1377        is_dt_bound = True
1378
1379    if end is not None and (_dt in existing_cols or skip_existing_cols_check):
1380        if 'int' in str(type(end)).lower() and end == begin:
1381            end += 1
1382        end_da = dateadd_str(
1383            flavor=self.flavor,
1384            datepart='minute',
1385            number=end_add_minutes,
1386            begin=end,
1387            db_type=dt_db_type,
1388        )
1389        where += f"{dt} <  {end_da}"
1390        is_dt_bound = True
1391
1392    if params is not None:
1393        from meerschaum.utils.sql import build_where
1394        valid_params = {
1395            k: v
1396            for k, v in params.items()
1397            if k in existing_cols or skip_existing_cols_check
1398        }
1399        if valid_params:
1400            where += build_where(valid_params, self).replace(
1401                'WHERE', ('    AND' if is_dt_bound else "    ")
1402            )
1403
1404    if len(where) > 0:
1405        query += "\nWHERE " + where
1406
1407    if order is not None:
1408        ### Sort by indices, starting with datetime.
1409        order_by = ""
1410        if quoted_indices:
1411            order_by += "\nORDER BY "
1412            if _dt and (_dt in existing_cols or skip_existing_cols_check):
1413                order_by += dt + ' ' + order + ','
1414            for key, quoted_col_name in quoted_indices.items():
1415                if dt == quoted_col_name:
1416                    continue
1417                order_by += ' ' + quoted_col_name + ' ' + order + ','
1418            order_by = order_by[:-1]
1419
1420        query += order_by
1421
1422    if isinstance(limit, int):
1423        if self.flavor == 'mssql':
1424            query = f'SELECT TOP {limit}\n' + query[len("SELECT "):]
1425        elif self.flavor == 'oracle':
1426            query = (
1427                f"SELECT * FROM (\n  {query}\n)\n"
1428                + f"WHERE ROWNUM IN ({', '.join([str(i) for i in range(1, limit+1)])})"
1429            )
1430        else:
1431            query += f"\nLIMIT {limit}"
1432
1433    if debug:
1434        to_print = (
1435            []
1436            + ([f"begin='{begin}'"] if begin else [])
1437            + ([f"end='{end}'"] if end else [])
1438            + ([f"params={params}"] if params else [])
1439        )
1440        dprint("Getting pipe data with constraints: " + items_str(to_print, quotes=False))
1441
1442    return query

Return the SELECT query for retrieving a pipe's data from its instance.

Parameters
  • pipe (mrsm.Pipe): The pipe to get data from.
  • select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
  • omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
  • begin (Union[datetime, int, str, None], default None): If provided, get rows newer than or equal to this value.
  • end (Union[datetime, int, str, None], default None): If provided, get rows older than this value (exclusive).
  • params (Optional[Dict[str, Any]], default None): Additional parameters to filter by. See meerschaum.connectors.sql.build_where.
  • order (Optional[str], default 'asc'): The selection order for all of the indices in the query. If None, omit the ORDER BY clause.
  • sort_datetimes (bool, default False): Alias for order='desc'.
  • limit (Optional[int], default None): If specified, limit the number of rows retrieved to this value.
  • begin_add_minutes (int, default 0): The number of minutes to add to the begin datetime (i.e. DATEADD).
  • end_add_minutes (int, default 0): The number of minutes to add to the end datetime (i.e. DATEADD).
  • chunksize (Optional[int], default -1): The size of dataframe chunks to load into memory.
  • replace_nulls (Optional[str], default None): If provided, replace null values with this value.
  • skip_existing_cols_check (bool, default False): If True, do not verify that querying columns are actually on the table.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SELECT query to retrieve a pipe's data.
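
Because this method only builds the `SELECT` string, it is useful for verifying bounds and filters before reading. A minimal sketch (keys are hypothetical):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:local')

    query = conn.get_pipe_data_query(
        pipe,
        begin='2024-01-01',
        order='desc',
        limit=10,
    )
    print(query)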
def register_pipe( self, pipe: meerschaum.Pipe, debug: bool = False) -> Tuple[bool, str]:
20def register_pipe(
21    self,
22    pipe: mrsm.Pipe,
23    debug: bool = False,
24) -> SuccessTuple:
25    """
26    Register a new pipe.
27    A pipe's attributes must be set before registering.
28    """
29    from meerschaum.utils.debug import dprint
30    from meerschaum.utils.packages import attempt_import
31    from meerschaum.utils.sql import json_flavors
32
33    ### ensure pipes table exists
34    from meerschaum.connectors.sql.tables import get_tables
35    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']
36
37    if pipe.get_id(debug=debug) is not None:
38        return False, f"{pipe} is already registered."
39
40    ### NOTE: if `parameters` is supplied in the Pipe constructor,
41    ###       then `pipe.parameters` will exist and not be fetched from the database.
42
43    ### 1. Prioritize the Pipe object's `parameters` first.
44    ###    E.g. if the user manually sets the `parameters` property
45    ###    or if the Pipe already exists
46    ###    (which shouldn't be registerable anyway, but that's an issue for later).
47    parameters = None
48    try:
49        parameters = pipe.parameters
50    except Exception as e:
51        if debug:
52            dprint(str(e))
53        parameters = None
54
55    ### ensure `parameters` is a dictionary
56    if parameters is None:
57        parameters = {}
58
59    import json
60    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
61    values = {
62        'connector_keys' : pipe.connector_keys,
63        'metric_key'     : pipe.metric_key,
64        'location_key'   : pipe.location_key,
65        'parameters'     : (
66            json.dumps(parameters)
67            if self.flavor not in json_flavors
68            else parameters
69        ),
70    }
71    query = sqlalchemy.insert(pipes_tbl).values(**values)
72    result = self.exec(query, debug=debug)
73    if result is None:
74        return False, f"Failed to register {pipe}."
75    return True, f"Successfully registered {pipe}."

Register a new pipe. A pipe's attributes must be set before registering.
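
A minimal sketch of registering a new pipe (the keys and fetch definition are hypothetical):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    pipe = mrsm.Pipe(
        'sql:remote', 'sales',
        instance='sql:local',
        parameters={'fetch': {'definition': 'SELECT * FROM sales'}},
    )

    success, msg = conn.register_pipe(pipe)
    print(success, msg)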

def edit_pipe( self, pipe: meerschaum.Pipe = None, patch: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
 78def edit_pipe(
 79    self,
 80    pipe : mrsm.Pipe = None,
 81    patch: bool = False,
 82    debug: bool = False,
 83    **kw : Any
 84) -> SuccessTuple:
 85    """
 86    Persist a Pipe's parameters to its database.
 87
 88    Parameters
 89    ----------
 90    pipe: mrsm.Pipe, default None
 91        The pipe to be edited.
 92    patch: bool, default False
 93        If patch is `True`, update the existing parameters by cascading.
 94        Otherwise overwrite the parameters (default).
 95    debug: bool, default False
 96        Verbosity toggle.
 97    """
 98
 99    if pipe.id is None:
100        return False, f"{pipe} is not registered and cannot be edited."
101
102    from meerschaum.utils.packages import attempt_import
103    from meerschaum.utils.sql import json_flavors
104    if not patch:
105        parameters = pipe.__dict__.get('_attributes', {}).get('parameters', {})
106    else:
107        from meerschaum import Pipe
108        from meerschaum.config._patch import apply_patch_to_config
109        original_parameters = Pipe(
110            pipe.connector_keys, pipe.metric_key, pipe.location_key,
111            mrsm_instance=pipe.instance_keys
112        ).parameters
113        parameters = apply_patch_to_config(
114            original_parameters,
115            pipe.parameters
116        )
117
118    ### ensure pipes table exists
119    from meerschaum.connectors.sql.tables import get_tables
120    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']
121
122    import json
123    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
124
125    values = {
126        'parameters': (
127            json.dumps(parameters)
128            if self.flavor not in json_flavors
129            else parameters
130        ),
131    }
132    q = sqlalchemy.update(pipes_tbl).values(**values).where(
133        pipes_tbl.c.pipe_id == pipe.id
134    )
135
136    result = self.exec(q, debug=debug)
137    message = (
138        f"Successfully edited {pipe}."
139        if result is not None else f"Failed to edit {pipe}."
140    )
141    return (result is not None), message

Persist a Pipe's parameters to its database.

Parameters
  • pipe (mrsm.Pipe, default None): The pipe to be edited.
  • patch (bool, default False): If patch is True, update the existing parameters by cascading. Otherwise overwrite the parameters (default).
  • debug (bool, default False): Verbosity toggle.
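
A sketch of persisting an updated parameters dictionary (the `tags` value is arbitrary; keys are hypothetical):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:local')

    pipe.parameters['tags'] = ['production']
    success, msg = conn.edit_pipe(pipe)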
def get_pipe_id(self, pipe: meerschaum.Pipe, debug: bool = False) -> Any:
1445def get_pipe_id(
1446    self,
1447    pipe: mrsm.Pipe,
1448    debug: bool = False,
1449) -> Any:
1450    """
1451    Get a Pipe's ID from the pipes table.
1452    """
1453    if pipe.temporary:
1454        return None
1455    from meerschaum.utils.packages import attempt_import
1456    sqlalchemy = attempt_import('sqlalchemy')
1457    from meerschaum.connectors.sql.tables import get_tables
1458    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']
1459
1460    query = sqlalchemy.select(pipes_tbl.c.pipe_id).where(
1461        pipes_tbl.c.connector_keys == pipe.connector_keys
1462    ).where(
1463        pipes_tbl.c.metric_key == pipe.metric_key
1464    ).where(
1465        (pipes_tbl.c.location_key == pipe.location_key) if pipe.location_key is not None
1466        else pipes_tbl.c.location_key.is_(None)
1467    )
1468    _id = self.value(query, debug=debug, silent=pipe.temporary)
1469    if _id is not None:
1470        _id = int(_id)
1471    return _id

Get a Pipe's ID from the pipes table.
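
A minimal sketch (keys are hypothetical):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:local')

    pipe_id = conn.get_pipe_id(pipe)
    ### `None` for temporary or unregistered pipes; otherwise an `int`.
    print(pipe_id)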

def get_pipe_attributes( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, Any]:
1474def get_pipe_attributes(
1475    self,
1476    pipe: mrsm.Pipe,
1477    debug: bool = False,
1478) -> Dict[str, Any]:
1479    """
1480    Get a Pipe's attributes dictionary.
1481    """
1482    from meerschaum.connectors.sql.tables import get_tables
1483    from meerschaum.utils.packages import attempt_import
1484    sqlalchemy = attempt_import('sqlalchemy')
1485
1486    if pipe.get_id(debug=debug) is None:
1487        return {}
1488
1489    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']
1490
1491    try:
1492        q = sqlalchemy.select(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id)
1493        if debug:
1494            dprint(q)
1495        attributes = (
1496            dict(self.exec(q, silent=True, debug=debug).first()._mapping)
1497            if self.flavor != 'duckdb'
1498            else self.read(q, debug=debug).to_dict(orient='records')[0]
1499        )
1500    except Exception as e:
1501        import traceback
1502        traceback.print_exc()
1503        warn(e)
1504        print(pipe)
1505        return {}
1506
1507    ### handle non-PostgreSQL databases (text vs JSON)
1508    if not isinstance(attributes.get('parameters', None), dict):
1509        try:
1510            import json
1511            parameters = json.loads(attributes['parameters'])
1512            if isinstance(parameters, str) and parameters[0] == '{':
1513                parameters = json.loads(parameters)
1514            attributes['parameters'] = parameters
1515        except Exception as e:
1516            attributes['parameters'] = {}
1517
1518    return attributes

Get a Pipe's attributes dictionary.
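
A short sketch (keys are hypothetical); note that 'parameters' is always deserialized into a dict, even on flavors which store it as text:

    import meerschaum as mrsm
    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe('plugin:noaa', 'weather', instance=conn)
    attrs = conn.get_pipe_attributes(pipe)  # {} if the pipe is unregistered
    parameters = attrs.get('parameters', {})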

def sync_pipe( self, pipe: meerschaum.Pipe, df: 'Union[pd.DataFrame, str, Dict[Any, Any], None]' = None, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, chunksize: Optional[int] = -1, check_existing: bool = True, blocking: bool = True, debug: bool = False, _check_temporary_tables: bool = True, **kw: Any) -> Tuple[bool, str]:
1615def sync_pipe(
1616    self,
1617    pipe: mrsm.Pipe,
1618    df: Union[pd.DataFrame, str, Dict[Any, Any], None] = None,
1619    begin: Optional[datetime] = None,
1620    end: Optional[datetime] = None,
1621    chunksize: Optional[int] = -1,
1622    check_existing: bool = True,
1623    blocking: bool = True,
1624    debug: bool = False,
1625    _check_temporary_tables: bool = True,
1626    **kw: Any
1627) -> SuccessTuple:
1628    """
1629    Sync a pipe using a database connection.
1630
1631    Parameters
1632    ----------
1633    pipe: mrsm.Pipe
1634        The Meerschaum Pipe instance into which to sync the data.
1635
1636    df: Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]]
1637        An optional DataFrame or equivalent to sync into the pipe.
1638        Defaults to `None`.
1639
1640    begin: Optional[datetime], default None
1641        Optionally specify the earliest datetime to search for data.
1642        Defaults to `None`.
1643
1644    end: Optional[datetime], default None
1645        Optionally specify the latest datetime to search for data.
1646        Defaults to `None`.
1647
1648    chunksize: Optional[int], default -1
1649        Specify the number of rows to sync per chunk.
1650        If `-1`, resort to system configuration (default is `900`).
1651        A `chunksize` of `None` will sync all rows in one transaction.
1652        Defaults to `-1`.
1653
1654    check_existing: bool, default True
1655        If `True`, pull and diff with existing data from the pipe. Defaults to `True`.
1656
1657    blocking: bool, default True
1658        If `True`, wait for the sync to finish and return its result; otherwise sync asynchronously.
1659        Defaults to `True`.
1660
1661    debug: bool, default False
1662        Verbosity toggle. Defaults to False.
1663
1664    kw: Any
1665        Catch-all for keyword arguments.
1666
1667    Returns
1668    -------
1669    A `SuccessTuple` of success (`bool`) and message (`str`).
1670    """
1671    from meerschaum.utils.packages import import_pandas
1672    from meerschaum.utils.sql import (
1673        get_update_queries,
1674        sql_item_name,
1675        UPDATE_QUERIES,
1676        get_reset_autoincrement_queries,
1677    )
1678    from meerschaum.utils.dtypes import are_dtypes_equal
1679    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
1680    from meerschaum import Pipe
1681    import time
1682    import copy
1683    pd = import_pandas()
1684    if df is None:
1685        msg = f"DataFrame is None. Cannot sync {pipe}."
1686        warn(msg)
1687        return False, msg
1688
1689    start = time.perf_counter()
1690    pipe_name = sql_item_name(pipe.target, self.flavor, schema=self.get_pipe_schema(pipe))
1691
1692    if not pipe.temporary and not pipe.get_id(debug=debug):
1693        register_tuple = pipe.register(debug=debug)
1694        if not register_tuple[0]:
1695            return register_tuple
1696
1697    ### df is the dataframe returned from the remote source
1698    ### via the connector
1699    if debug:
1700        dprint("Fetched data:\n" + str(df))
1701
1702    if not isinstance(df, pd.DataFrame):
1703        df = pipe.enforce_dtypes(
1704            df,
1705            chunksize=chunksize,
1706            safe_copy=kw.get('safe_copy', False),
1707            debug=debug,
1708        )
1709
1710    ### if table does not exist, create it with indices
1711    is_new = False
1712    if not pipe.exists(debug=debug):
1713        check_existing = False
1714        is_new = True
1715    else:
1716        ### Check for new columns.
1717        add_cols_queries = self.get_add_columns_queries(pipe, df, debug=debug)
1718        if add_cols_queries:
1719            _ = pipe.__dict__.pop('_columns_indices', None)
1720            _ = pipe.__dict__.pop('_columns_types', None)
1721            if not self.exec_queries(add_cols_queries, debug=debug):
1722                warn(f"Failed to add new columns to {pipe}.")
1723
1724        alter_cols_queries = self.get_alter_columns_queries(pipe, df, debug=debug)
1725        if alter_cols_queries:
1726            _ = pipe.__dict__.pop('_columns_indices', None)
1727            _ = pipe.__dict__.pop('_columns_types', None)
1728            if not self.exec_queries(alter_cols_queries, debug=debug):
1729                warn(f"Failed to alter columns for {pipe}.")
1730            else:
1731                _ = pipe.infer_dtypes(persist=True)
1732
1733    ### NOTE: Oracle SQL < 23c (2023) and SQLite do not support booleans,
1734    ### so infer bools and persist them to `dtypes`.
1735    if self.flavor in ('oracle', 'sqlite', 'mysql', 'mariadb'):
1736        pipe_dtypes = pipe.dtypes
1737        new_bool_cols = {
1738            col: 'bool[pyarrow]'
1739            for col, typ in df.dtypes.items()
1740            if col not in pipe_dtypes
1741            and are_dtypes_equal(str(typ), 'bool')
1742        }
1743        pipe_dtypes.update(new_bool_cols)
1744        pipe.dtypes = pipe_dtypes
1745        if new_bool_cols and not pipe.temporary:
1746            infer_bool_success, infer_bool_msg = pipe.edit(debug=debug)
1747            if not infer_bool_success:
1748                return infer_bool_success, infer_bool_msg
1749
1750    upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in UPDATE_QUERIES
1751    if upsert:
1752        check_existing = False
1753    kw['safe_copy'] = kw.get('safe_copy', False)
1754
1755    unseen_df, update_df, delta_df = (
1756        pipe.filter_existing(
1757            df,
1758            chunksize=chunksize,
1759            debug=debug,
1760            **kw
1761        ) if check_existing else (df, None, df)
1762    )
1763    if upsert:
1764        unseen_df, update_df, delta_df = (df.head(0), df, df)
1765
1766    if debug:
1767        dprint("Delta data:\n" + str(delta_df))
1768        dprint("Unseen data:\n" + str(unseen_df))
1769        if update_df is not None:
1770            dprint(("Update" if not upsert else "Upsert") + " data:\n" + str(update_df))
1771
1772    if_exists = kw.get('if_exists', 'append')
1773    if 'if_exists' in kw:
1774        kw.pop('if_exists')
1775    if 'name' in kw:
1776        kw.pop('name')
1777
1778    ### Insert new data into Pipe's table.
1779    unseen_kw = copy.deepcopy(kw)
1780    unseen_kw.update({
1781        'name': pipe.target,
1782        'if_exists': if_exists,
1783        'debug': debug,
1784        'as_dict': True,
1785        'safe_copy': kw.get('safe_copy', False),
1786        'chunksize': chunksize,
1787        'dtype': self.get_to_sql_dtype(pipe, unseen_df, update_dtypes=True),
1788        'schema': self.get_pipe_schema(pipe),
1789    })
1790
1791    dt_col = pipe.columns.get('datetime', None)
1792    primary_key = pipe.columns.get('primary', None)
1793    autoincrement = (
1794        pipe.parameters.get('autoincrement', False)
1795        or (
1796            is_new
1797            and primary_key
1798            and primary_key
1799            not in pipe.dtypes
1800            and primary_key not in unseen_df.columns
1801        )
1802    )
1803    if autoincrement and 'autoincrement' not in pipe.parameters:
1804        pipe.parameters['autoincrement'] = autoincrement
1805        edit_success, edit_msg = pipe.edit(debug=debug)
1806        if not edit_success:
1807            return edit_success, edit_msg
1808
1809    def _check_pk(_df_to_clear):
1810        if _df_to_clear is None:
1811            return
1812        if primary_key not in _df_to_clear.columns:
1813            return
1814        if not _df_to_clear[primary_key].notnull().any():
1815            del _df_to_clear[primary_key]
1816
1817    autoincrement_needs_reset = bool(
1818        autoincrement
1819        and primary_key
1820        and primary_key in unseen_df.columns
1821        and unseen_df[primary_key].notnull().any()
1822    )
1823    if autoincrement and primary_key:
1824        for _df_to_clear in (unseen_df, update_df, delta_df):
1825            _check_pk(_df_to_clear)
1826
1827    if is_new:
1828        create_success, create_msg = self.create_pipe_table_from_df(
1829            pipe,
1830            unseen_df,
1831            debug=debug,
1832        )
1833        if not create_success:
1834            return create_success, create_msg
1835
1836    do_identity_insert = bool(
1837        self.flavor in ('mssql',)
1838        and primary_key
1839        and primary_key in unseen_df.columns
1840        and autoincrement
1841    )
1842    stats = {'success': True, 'msg': ''}
1843    if len(unseen_df) > 0:
1844        with self.engine.connect() as connection:
1845            with connection.begin():
1846                if do_identity_insert:
1847                    identity_on_result = self.exec(
1848                        f"SET IDENTITY_INSERT {pipe_name} ON",
1849                        commit=False,
1850                        _connection=connection,
1851                        close=False,
1852                        debug=debug,
1853                    )
1854                    if identity_on_result is None:
1855                        return False, f"Could not enable identity inserts on {pipe}."
1856
1857                stats = self.to_sql(
1858                    unseen_df,
1859                    _connection=connection,
1860                    **unseen_kw
1861                )
1862
1863                if do_identity_insert:
1864                    identity_off_result = self.exec(
1865                        f"SET IDENTITY_INSERT {pipe_name} OFF",
1866                        commit=False,
1867                        _connection=connection,
1868                        close=False,
1869                        debug=debug,
1870                    )
1871                    if identity_off_result is None:
1872                        return False, f"Could not disable identity inserts on {pipe}."
1873
1874    if is_new:
1875        if not self.create_indices(pipe, debug=debug):
1876            warn(f"Failed to create indices for {pipe}. Continuing...")
1877
1878    if autoincrement_needs_reset:
1879        reset_autoincrement_queries = get_reset_autoincrement_queries(
1880            pipe.target,
1881            primary_key,
1882            self,
1883            schema=self.get_pipe_schema(pipe),
1884            debug=debug,
1885        )
1886        results = self.exec_queries(reset_autoincrement_queries, debug=debug)
1887        for result in results:
1888            if result is None:
1889                warn(f"Could not reset auto-incrementing primary key for {pipe}.", stack=False)
1890
1891    if update_df is not None and len(update_df) > 0:
1892        temp_target = self.get_temporary_target(
1893            pipe.target,
1894            label=('update' if not upsert else 'upsert'),
1895        )
1896        self._log_temporary_tables_creation(temp_target, create=(not pipe.temporary), debug=debug)
1897        temp_pipe = Pipe(
1898            pipe.connector_keys.replace(':', '_') + '_', pipe.metric_key, pipe.location_key,
1899            instance=pipe.instance_keys,
1900            columns={
1901                (ix_key if ix_key != 'primary' else 'primary_'): ix
1902                for ix_key, ix in pipe.columns.items()
1903                if ix and ix in update_df.columns
1904            },
1905            dtypes={
1906                col: typ
1907                for col, typ in pipe.dtypes.items()
1908                if col in update_df.columns
1909            },
1910            target=temp_target,
1911            temporary=True,
1912            enforce=False,
1913            static=True,
1914            autoincrement=False,
1915            parameters={
1916                'schema': self.internal_schema,
1917                'hypertable': False,
1918            },
1919        )
1920        temp_pipe.__dict__['_columns_types'] = {
1921            col: get_db_type_from_pd_type(
1922                pipe.dtypes.get(col, str(typ)),
1923                self.flavor,
1924            )
1925            for col, typ in update_df.dtypes.items()
1926        }
1927        now_ts = time.perf_counter()
1928        temp_pipe.__dict__['_columns_types_timestamp'] = now_ts
1929        temp_pipe.__dict__['_skip_check_indices'] = True
1930        temp_success, temp_msg = temp_pipe.sync(update_df, check_existing=False, debug=debug)
1931        if not temp_success:
1932            return temp_success, temp_msg
1933        existing_cols = pipe.get_columns_types(debug=debug)
1934        join_cols = [
1935            col
1936            for col_key, col in pipe.columns.items()
1937            if col and col in existing_cols
1938        ] if not primary_key or self.flavor == 'oracle' else (
1939            [dt_col, primary_key]
1940            if self.flavor == 'timescaledb' and dt_col and dt_col in update_df.columns
1941            else [primary_key]
1942        )
1943        update_queries = get_update_queries(
1944            pipe.target,
1945            temp_target,
1946            self,
1947            join_cols,
1948            upsert=upsert,
1949            schema=self.get_pipe_schema(pipe),
1950            patch_schema=self.internal_schema,
1951            datetime_col=(dt_col if dt_col in update_df.columns else None),
1952            identity_insert=(autoincrement and primary_key in update_df.columns),
1953            null_indices=pipe.null_indices,
1954            cast_columns=pipe.enforce,
1955            debug=debug,
1956        )
1957        update_results = self.exec_queries(
1958            update_queries,
1959            break_on_error=True,
1960            rollback=True,
1961            debug=debug,
1962        )
1963        update_success = all(update_results)
1964        self._log_temporary_tables_creation(
1965            temp_target,
1966            ready_to_drop=True,
1967            create=(not pipe.temporary),
1968            debug=debug,
1969        )
1970        if not update_success:
1971            warn(f"Failed to apply update to {pipe}.")
1972        stats['success'] = stats['success'] and update_success
1973        stats['msg'] = (
1974            (stats.get('msg', '') + f'\nFailed to apply update to {pipe}.').lstrip()
1975            if not update_success
1976            else stats.get('msg', '')
1977        )
1978
1979    stop = time.perf_counter()
1980    success = stats['success']
1981    if not success:
1982        return success, stats['msg'] or str(stats)
1983
1984    unseen_count = len(unseen_df.index) if unseen_df is not None else 0
1985    update_count = len(update_df.index) if update_df is not None else 0
1986    msg = (
1987        (
1988            f"Inserted {unseen_count:,}, "
1989            + f"updated {update_count:,} rows."
1990        )
1991        if not upsert
1992        else (
1993            f"Upserted {update_count:,} row"
1994            + ('s' if update_count != 1 else '')
1995            + "."
1996        )
1997    )
1998    if debug:
1999        msg = msg[:-1] + (
2000            f"\non table {sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))}\n"
2001            + f"in {round(stop - start, 2)} seconds."
2002        )
2003
2004    if _check_temporary_tables:
2005        drop_stale_success, drop_stale_msg = self._drop_old_temporary_tables(
2006            refresh=False, debug=debug
2007        )
2008        if not drop_stale_success:
2009            warn(drop_stale_msg)
2010
2011    return success, msg

Sync a pipe using a database connection.

Parameters
  • pipe (mrsm.Pipe): The Meerschaum Pipe instance into which to sync the data.
  • df (Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]]): An optional DataFrame or equivalent to sync into the pipe. Defaults to None.
  • begin (Optional[datetime], default None): Optionally specify the earliest datetime to search for data. Defaults to None.
  • end (Optional[datetime], default None): Optionally specify the latest datetime to search for data. Defaults to None.
  • chunksize (Optional[int], default -1): Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction. Defaults to -1.
  • check_existing (bool, default True): If True, pull and diff with existing data from the pipe. Defaults to True.
  • blocking (bool, default True): If True, wait for the sync to finish and return its result; otherwise sync asynchronously. Defaults to True.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
  • kw (Any): Catch-all for keyword arguments.
Returns
  • A SuccessTuple of success (bool) and message (str).
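A minimal sketch, assuming a hypothetical instance label sql:main (in practice this is usually invoked via pipe.sync()):

    import pandas as pd
    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe(
        'demo', 'temperature',
        instance=conn,
        columns={'datetime': 'dt'},
    )
    df = pd.DataFrame({'dt': pd.to_datetime(['2024-01-01']), 'temp': [20.5]})
    success, msg = conn.sync_pipe(pipe, df)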
def sync_pipe_inplace( self, pipe: meerschaum.Pipe, params: Optional[Dict[str, Any]] = None, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, chunksize: Optional[int] = -1, check_existing: bool = True, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
2014def sync_pipe_inplace(
2015    self,
2016    pipe: 'mrsm.Pipe',
2017    params: Optional[Dict[str, Any]] = None,
2018    begin: Union[datetime, int, None] = None,
2019    end: Union[datetime, int, None] = None,
2020    chunksize: Optional[int] = -1,
2021    check_existing: bool = True,
2022    debug: bool = False,
2023    **kw: Any
2024) -> SuccessTuple:
2025    """
2026    If a pipe's connector is the same as its instance connector,
2027    it's more efficient to sync the pipe in-place rather than reading data into Pandas.
2028
2029    Parameters
2030    ----------
2031    pipe: mrsm.Pipe
2032        The pipe whose connector is the same as its instance.
2033
2034    params: Optional[Dict[str, Any]], default None
2035        Optional params dictionary to build the `WHERE` clause.
2036        See `meerschaum.utils.sql.build_where`.
2037
2038    begin: Union[datetime, int, None], default None
2039        Optionally specify the earliest datetime to search for data.
2040        Defaults to `None`.
2041
2042    end: Union[datetime, int, None], default None
2043        Optionally specify the latest datetime to search for data.
2044        Defaults to `None`.
2045
2046    chunksize: Optional[int], default -1
2047        Specify the number of rows to sync per chunk.
2048        If `-1`, resort to system configuration (default is `900`).
2049        A `chunksize` of `None` will sync all rows in one transaction.
2050        Defaults to `-1`.
2051
2052    check_existing: bool, default True
2053        If `True`, pull and diff with existing data from the pipe.
2054
2055    debug: bool, default False
2056        Verbosity toggle.
2057
2058    Returns
2059    -------
2060    A SuccessTuple.
2061    """
2062    if self.flavor == 'duckdb':
2063        return pipe.sync(
2064            params=params,
2065            begin=begin,
2066            end=end,
2067            chunksize=chunksize,
2068            check_existing=check_existing,
2069            debug=debug,
2070            _inplace=False,
2071            **kw
2072        )
2073    from meerschaum.utils.sql import (
2074        sql_item_name,
2075        get_update_queries,
2076        get_null_replacement,
2077        get_create_table_queries,
2078        get_table_cols_types,
2079        session_execute,
2080        dateadd_str,
2081        UPDATE_QUERIES,
2082    )
2083    from meerschaum.utils.dtypes.sql import (
2084        get_pd_type_from_db_type,
2085        get_db_type_from_pd_type,
2086    )
2087    from meerschaum.utils.misc import generate_password
2088
2089    transaction_id_length = (
2090        mrsm.get_config(
2091            'system', 'connectors', 'sql', 'instance', 'temporary_target', 'transaction_id_length'
2092        )
2093    )
2094    transact_id = generate_password(transaction_id_length)
2095
2096    internal_schema = self.internal_schema
2097    target = pipe.target
2098    temp_table_roots = ['backtrack', 'new', 'delta', 'joined', 'unseen', 'update']
2099    temp_tables = {
2100        table_root: self.get_temporary_target(target, transact_id=transact_id, label=table_root)
2101        for table_root in temp_table_roots
2102    }
2103    temp_table_names = {
2104        table_root: sql_item_name(table_name_raw, self.flavor, internal_schema)
2105        for table_root, table_name_raw in temp_tables.items()
2106    }
2107    temp_table_aliases = {
2108        table_root: sql_item_name(table_root, self.flavor)
2109        for table_root in temp_table_roots
2110    }
2111    table_alias_as = " AS" if self.flavor != 'oracle' else ''
2112    metadef = self.get_pipe_metadef(
2113        pipe,
2114        params=params,
2115        begin=begin,
2116        end=end,
2117        check_existing=check_existing,
2118        debug=debug,
2119    )
2120    pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
2121    upsert = pipe.parameters.get('upsert', False) and f'{self.flavor}-upsert' in UPDATE_QUERIES
2122    static = pipe.parameters.get('static', False)
2123    database = getattr(self, 'database', self.parse_uri(self.URI).get('database', None))
2124    primary_key = pipe.columns.get('primary', None)
2125    primary_key_typ = pipe.dtypes.get(primary_key, None) if primary_key else None
2126    primary_key_db_type = (
2127        get_db_type_from_pd_type(primary_key_typ, self.flavor)
2128        if primary_key_typ
2129        else None
2130    )
2131    autoincrement = pipe.parameters.get('autoincrement', False)
2132    dt_col = pipe.columns.get('datetime', None)
2133    dt_col_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None
2134    dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None
2135    dt_db_type = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None
2136
2137    def clean_up_temp_tables(ready_to_drop: bool = False):
2138        log_success, log_msg = self._log_temporary_tables_creation(
2139            [
2140                table
2141                for table in temp_tables.values()
2142            ] if not upsert else [temp_tables['update']],
2143            ready_to_drop=ready_to_drop,
2144            create=(not pipe.temporary),
2145            debug=debug,
2146        )
2147        if not log_success:
2148            warn(log_msg)
2149        drop_stale_success, drop_stale_msg = self._drop_old_temporary_tables(
2150            refresh=False,
2151            debug=debug,
2152        )
2153        if not drop_stale_success:
2154            warn(drop_stale_msg)
2155        return drop_stale_success, drop_stale_msg
2156
2157    sqlalchemy, sqlalchemy_orm = mrsm.attempt_import('sqlalchemy', 'sqlalchemy.orm')
2158    if not pipe.exists(debug=debug):
2159        create_pipe_queries = get_create_table_queries(
2160            metadef,
2161            pipe.target,
2162            self.flavor,
2163            schema=self.get_pipe_schema(pipe),
2164            primary_key=primary_key,
2165            primary_key_db_type=primary_key_db_type,
2166            autoincrement=autoincrement,
2167            datetime_column=dt_col,
2168        )
2169        results = self.exec_queries(create_pipe_queries, debug=debug)
2170        if not all(results):
2171            _ = clean_up_temp_tables()
2172            return False, f"Could not insert new data into {pipe} from its SQL query definition."
2173
2174        if not self.create_indices(pipe, debug=debug):
2175            warn(f"Failed to create indices for {pipe}. Continuing...")
2176
2177        rowcount = pipe.get_rowcount(debug=debug)
2178        _ = clean_up_temp_tables()
2179        return True, f"Inserted {rowcount:,}, updated 0 rows."
2180
2181    session = sqlalchemy_orm.Session(self.engine)
2182    connectable = session if self.flavor != 'duckdb' else self
2183
2184    create_new_query = get_create_table_queries(
2185        metadef,
2186        temp_tables[('new') if not upsert else 'update'],
2187        self.flavor,
2188        schema=internal_schema,
2189    )[0]
2190    (create_new_success, create_new_msg), create_new_results = session_execute(
2191        session,
2192        create_new_query,
2193        with_results=True,
2194        debug=debug,
2195    )
2196    if not create_new_success:
2197        _ = clean_up_temp_tables()
2198        return create_new_success, create_new_msg
2199    new_count = create_new_results[0].rowcount if create_new_results else 0
2200
2201    new_cols_types = get_table_cols_types(
2202        temp_tables[('new' if not upsert else 'update')],
2203        connectable=connectable,
2204        flavor=self.flavor,
2205        schema=internal_schema,
2206        database=database,
2207        debug=debug,
2208    ) if not static else pipe.get_columns_types(debug=debug)
2209    if not new_cols_types:
2210        return False, f"Failed to get new columns for {pipe}."
2211
2212    new_cols = {
2213        str(col_name): get_pd_type_from_db_type(str(col_type))
2214        for col_name, col_type in new_cols_types.items()
2215    }
2216    new_cols_str = '\n    ' + ',\n    '.join([
2217        sql_item_name(col, self.flavor)
2218        for col in new_cols
2219    ])
2220    def get_col_typ(col: str, cols_types: Dict[str, str]) -> str:
2221        if self.flavor == 'oracle' and new_cols_types.get(col, '').lower() == 'char':
2222            return new_cols_types[col]
2223        return cols_types[col]
2224
2225    add_cols_queries = self.get_add_columns_queries(pipe, new_cols, debug=debug)
2226    if add_cols_queries:
2227        _ = pipe.__dict__.pop('_columns_types', None)
2228        _ = pipe.__dict__.pop('_columns_indices', None)
2229        self.exec_queries(add_cols_queries, debug=debug)
2230
2231    alter_cols_queries = self.get_alter_columns_queries(pipe, new_cols, debug=debug)
2232    if alter_cols_queries:
2233        _ = pipe.__dict__.pop('_columns_types', None)
2234        self.exec_queries(alter_cols_queries, debug=debug)
2235
2236    insert_queries = [
2237        (
2238            f"INSERT INTO {pipe_name} ({new_cols_str})\n"
2239            f"SELECT {new_cols_str}\nFROM {temp_table_names['new']}{table_alias_as}"
2240            f" {temp_table_aliases['new']}"
2241        )
2242    ] if not check_existing and not upsert else []
2243
2244    new_queries = insert_queries
2245    new_success, new_msg = (
2246        session_execute(session, new_queries, debug=debug)
2247        if new_queries
2248        else (True, "Success")
2249    )
2250    if not new_success:
2251        _ = clean_up_temp_tables()
2252        return new_success, new_msg
2253
2254    if not check_existing:
2255        session.commit()
2256        _ = clean_up_temp_tables()
2257        return True, f"Inserted {new_count:,}, updated 0 rows."
2258
2259    min_dt_col_name_da = dateadd_str(
2260        flavor=self.flavor, begin=f"MIN({dt_col_name})", db_type=dt_db_type,
2261    )
2262    max_dt_col_name_da = dateadd_str(
2263        flavor=self.flavor, begin=f"MAX({dt_col_name})", db_type=dt_db_type,
2264    )
2265
2266    (new_dt_bounds_success, new_dt_bounds_msg), new_dt_bounds_results = session_execute(
2267        session,
2268        [
2269            "SELECT\n"
2270            f"    {min_dt_col_name_da} AS {sql_item_name('min_dt', self.flavor)},\n"
2271            f"    {max_dt_col_name_da} AS {sql_item_name('max_dt', self.flavor)}\n"
2272            f"FROM {temp_table_names['new' if not upsert else 'update']}\n"
2273            f"WHERE {dt_col_name} IS NOT NULL"
2274        ],
2275        with_results=True,
2276        debug=debug,
2277    ) if dt_col and not upsert else ((True, "Success"), None)
2278    if not new_dt_bounds_success:
2279        return (
2280            new_dt_bounds_success,
2281            f"Could not determine in-place datetime bounds:\n{new_dt_bounds_msg}"
2282        )
2283
2284    if dt_col and not upsert:
2285        begin, end = new_dt_bounds_results[0].fetchone()
2286
2287    backtrack_def = self.get_pipe_data_query(
2288        pipe,
2289        begin=begin,
2290        end=end,
2291        begin_add_minutes=0,
2292        end_add_minutes=1,
2293        params=params,
2294        debug=debug,
2295        order=None,
2296    )
2297    create_backtrack_query = get_create_table_queries(
2298        backtrack_def,
2299        temp_tables['backtrack'],
2300        self.flavor,
2301        schema=internal_schema,
2302    )[0]
2303    (create_backtrack_success, create_backtrack_msg), create_backtrack_results = session_execute(
2304        session,
2305        create_backtrack_query,
2306        with_results=True,
2307        debug=debug,
2308    ) if not upsert else ((True, "Success"), None)
2309
2310    if not create_backtrack_success:
2311        _ = clean_up_temp_tables()
2312        return create_backtrack_success, create_backtrack_msg
2313
2314    backtrack_cols_types = get_table_cols_types(
2315        temp_tables['backtrack'],
2316        connectable=connectable,
2317        flavor=self.flavor,
2318        schema=internal_schema,
2319        database=database,
2320        debug=debug,
2321    ) if not (upsert or static) else new_cols_types
2322
2323    common_cols = [col for col in new_cols if col in backtrack_cols_types]
2324    primary_key = pipe.columns.get('primary', None)
2325    on_cols = {
2326        col: new_cols.get(col)
2327        for col_key, col in pipe.columns.items()
2328        if (
2329            col
2330            and
2331            col_key != 'value'
2332            and col in backtrack_cols_types
2333            and col in new_cols
2334        )
2335    } if not primary_key or self.flavor == 'oracle' else {primary_key: new_cols.get(primary_key)}
2336
2337    null_replace_new_cols_str = (
2338        '\n    ' + ',\n    '.join([
2339            f"COALESCE({temp_table_aliases['new']}.{sql_item_name(col, self.flavor)}, "
2340            + get_null_replacement(get_col_typ(col, new_cols_types), self.flavor)
2341            + ") AS "
2342            + sql_item_name(col, self.flavor, None)
2343            for col, typ in new_cols.items()
2344        ])
2345    )
2346
2347    select_delta_query = (
2348        "SELECT"
2349        + null_replace_new_cols_str
2350        + f"\nFROM {temp_table_names['new']}{table_alias_as} {temp_table_aliases['new']}\n"
2351        + f"LEFT OUTER JOIN {temp_table_names['backtrack']}{table_alias_as} {temp_table_aliases['backtrack']}"
2352        + "\n    ON\n    "
2353        + '\n    AND\n    '.join([
2354            (
2355                f"    COALESCE({temp_table_aliases['new']}."
2356                + sql_item_name(c, self.flavor, None)
2357                + ", "
2358                + get_null_replacement(get_col_typ(c, new_cols_types), self.flavor)
2359                + ")"
2360                + '\n        =\n    '
2361                + f"    COALESCE({temp_table_aliases['backtrack']}."
2362                + sql_item_name(c, self.flavor, None)
2363                + ", "
2364                + get_null_replacement(get_col_typ(c, backtrack_cols_types), self.flavor)
2365                + ") "
2366            ) for c in common_cols
2367        ])
2368        + "\nWHERE\n    "
2369        + '\n    AND\n    '.join([
2370            (
2371                f"{temp_table_aliases['backtrack']}." + sql_item_name(c, self.flavor) + ' IS NULL'
2372            ) for c in common_cols
2373        ])
2374    )
2375    create_delta_query = get_create_table_queries(
2376        select_delta_query,
2377        temp_tables['delta'],
2378        self.flavor,
2379        schema=internal_schema,
2380    )[0]
2381    create_delta_success, create_delta_msg = session_execute(
2382        session,
2383        create_delta_query,
2384        debug=debug,
2385    ) if not upsert else (True, "Success")
2386    if not create_delta_success:
2387        _ = clean_up_temp_tables()
2388        return create_delta_success, create_delta_msg
2389
2390    delta_cols_types = get_table_cols_types(
2391        temp_tables['delta'],
2392        connectable=connectable,
2393        flavor=self.flavor,
2394        schema=internal_schema,
2395        database=database,
2396        debug=debug,
2397    ) if not (upsert or static) else new_cols_types
2398
2399    ### This is a weird bug on SQLite.
2400    ### Sometimes the backtrack dtypes are all empty strings.
2401    if not all(delta_cols_types.values()):
2402        delta_cols_types = new_cols_types
2403
2404    delta_cols = {
2405        col: get_pd_type_from_db_type(typ)
2406        for col, typ in delta_cols_types.items()
2407    }
2408    delta_cols_str = ', '.join([
2409        sql_item_name(col, self.flavor)
2410        for col in delta_cols
2411    ])
2412
2413    select_joined_query = (
2414        "SELECT\n    "
2415        + (',\n    '.join([
2416            (
2417                f"{temp_table_aliases['delta']}." + sql_item_name(c, self.flavor, None)
2418                + " AS " + sql_item_name(c + '_delta', self.flavor, None)
2419            ) for c in delta_cols
2420        ]))
2421        + ",\n    "
2422        + (',\n    '.join([
2423            (
2424                f"{temp_table_aliases['backtrack']}." + sql_item_name(c, self.flavor, None)
2425                + " AS " + sql_item_name(c + '_backtrack', self.flavor, None)
2426            ) for c in backtrack_cols_types
2427        ]))
2428        + f"\nFROM {temp_table_names['delta']}{table_alias_as} {temp_table_aliases['delta']}\n"
2429        + f"LEFT OUTER JOIN {temp_table_names['backtrack']}{table_alias_as}"
2430        + f" {temp_table_aliases['backtrack']}"
2431        + "\n    ON\n    "
2432        + '\n    AND\n    '.join([
2433            (
2434                f"    COALESCE({temp_table_aliases['delta']}." + sql_item_name(c, self.flavor)
2435                + ", "
2436                + get_null_replacement(get_col_typ(c, new_cols_types), self.flavor) + ")"
2437                + '\n        =\n    '
2438                + f"    COALESCE({temp_table_aliases['backtrack']}." + sql_item_name(c, self.flavor)
2439                + ", "
2440                + get_null_replacement(get_col_typ(c, new_cols_types), self.flavor) + ")"
2441            ) for c, typ in on_cols.items()
2442        ])
2443    )
2444
2445    create_joined_query = get_create_table_queries(
2446        select_joined_query,
2447        temp_tables['joined'],
2448        self.flavor,
2449        schema=internal_schema,
2450    )[0]
2451    create_joined_success, create_joined_msg = session_execute(
2452        session,
2453        create_joined_query,
2454        debug=debug,
2455    ) if on_cols and not upsert else (True, "Success")
2456    if not create_joined_success:
2457        _ = clean_up_temp_tables()
2458        return create_joined_success, create_joined_msg
2459
2460    select_unseen_query = (
2461        "SELECT\n    "
2462        + (',\n    '.join([
2463            (
2464                "CASE\n        WHEN " + sql_item_name(c + '_delta', self.flavor, None)
2465                + " != " + get_null_replacement(get_col_typ(c, delta_cols_types), self.flavor)
2466                + " THEN " + sql_item_name(c + '_delta', self.flavor, None)
2467                + "\n        ELSE NULL\n    END"
2468                + " AS " + sql_item_name(c, self.flavor, None)
2469            ) for c, typ in delta_cols.items()
2470        ]))
2471        + f"\nFROM {temp_table_names['joined']}{table_alias_as} {temp_table_aliases['joined']}\n"
2472        + "WHERE\n    "
2473        + '\n    AND\n    '.join([
2474            (
2475                sql_item_name(c + '_backtrack', self.flavor, None) + ' IS NULL'
2476            ) for c in delta_cols
2477        ])
2478    )
2479    create_unseen_query = get_create_table_queries(
2480        select_unseen_query,
2481        temp_tables['unseen'],
2482        self.flavor,
2483        internal_schema,
2484    )[0]
2485    (create_unseen_success, create_unseen_msg), create_unseen_results = session_execute(
2486        session,
2487        create_unseen_query,
2488        with_results=True,
2489        debug=debug
2490    ) if not upsert else ((True, "Success"), None)
2491    if not create_unseen_success:
2492        _ = clean_up_temp_tables()
2493        return create_unseen_success, create_unseen_msg
2494
2495    select_update_query = (
2496        "SELECT\n    "
2497        + (',\n    '.join([
2498            (
2499                "CASE\n        WHEN " + sql_item_name(c + '_delta', self.flavor, None)
2500                + " != " + get_null_replacement(get_col_typ(c, delta_cols_types), self.flavor)
2501                + " THEN " + sql_item_name(c + '_delta', self.flavor, None)
2502                + "\n        ELSE NULL\n    END"
2503                + " AS " + sql_item_name(c, self.flavor, None)
2504            ) for c, typ in delta_cols.items()
2505        ]))
2506        + f"\nFROM {temp_table_names['joined']}{table_alias_as} {temp_table_aliases['joined']}\n"
2507        + "WHERE\n    "
2508        + '\n    OR\n    '.join([
2509            (
2510                sql_item_name(c + '_backtrack', self.flavor, None) + ' IS NOT NULL'
2511            ) for c in delta_cols
2512        ])
2513    )
2514
2515    create_update_query = get_create_table_queries(
2516        select_update_query,
2517        temp_tables['update'],
2518        self.flavor,
2519        internal_schema,
2520    )[0]
2521    (create_update_success, create_update_msg), create_update_results = session_execute(
2522        session,
2523        create_update_query,
2524        with_results=True,
2525        debug=debug,
2526    ) if on_cols and not upsert else ((True, "Success"), [])
2527    apply_update_queries = (
2528        get_update_queries(
2529            pipe.target,
2530            temp_tables['update'],
2531            session,
2532            on_cols,
2533            upsert=upsert,
2534            schema=self.get_pipe_schema(pipe),
2535            patch_schema=internal_schema,
2536            datetime_col=pipe.columns.get('datetime', None),
2537            flavor=self.flavor,
2538            null_indices=pipe.null_indices,
2539            cast_columns=pipe.enforce,
2540            debug=debug,
2541        )
2542        if on_cols else []
2543    )
2544
2545    apply_unseen_queries = [
2546        (
2547            f"INSERT INTO {pipe_name} ({delta_cols_str})\n"
2548            + f"SELECT {delta_cols_str}\nFROM "
2549            + (
2550                temp_table_names['unseen']
2551                if on_cols
2552                else temp_table_names['delta']
2553            )
2554        ),
2555    ]
2556
2557    (apply_unseen_success, apply_unseen_msg), apply_unseen_results = session_execute(
2558        session,
2559        apply_unseen_queries,
2560        with_results=True,
2561        debug=debug,
2562    ) if not upsert else ((True, "Success"), None)
2563    if not apply_unseen_success:
2564        _ = clean_up_temp_tables()
2565        return apply_unseen_success, apply_unseen_msg
2566    unseen_count = apply_unseen_results[0].rowcount if apply_unseen_results else 0
2567
2568    (apply_update_success, apply_update_msg), apply_update_results = session_execute(
2569        session,
2570        apply_update_queries,
2571        with_results=True,
2572        debug=debug,
2573    )
2574    if not apply_update_success:
2575        _ = clean_up_temp_tables()
2576        return apply_update_success, apply_update_msg
2577    update_count = apply_update_results[0].rowcount if apply_update_results else 0
2578
2579    session.commit()
2580
2581    msg = (
2582        f"Inserted {unseen_count:,}, updated {update_count:,} rows."
2583        if not upsert
2584        else f"Upserted {update_count:,} row" + ('s' if update_count != 1 else '') + "."
2585    )
2586    _ = clean_up_temp_tables(ready_to_drop=True)
2587
2588    return True, msg

If a pipe's connector is the same as its instance connector, it's more efficient to sync the pipe in-place rather than reading data into Pandas.

Parameters
  • pipe (mrsm.Pipe): The pipe whose connector is the same as its instance.
  • params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause. See meerschaum.utils.sql.build_where.
  • begin (Union[datetime, int, None], default None): Optionally specify the earliest datetime to search for data. Defaults to None.
  • end (Union[datetime, int, None], default None): Optionally specify the latest datetime to search for data. Defaults to None.
  • chunksize (Optional[int], default -1): Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction. Defaults to -1.
  • check_existing (bool, default True): If True, pull and diff with existing data from the pipe.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple.
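A hedged sketch: the pipe below fetches from the same sql:main instance it syncs into (the label and the table weather are hypothetical), so the diff-and-apply logic runs entirely in SQL:

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe(
        'sql:main', 'weather',
        instance='sql:main',
        columns={'datetime': 'dt'},
        parameters={'fetch': {'definition': 'SELECT * FROM weather'}},
    )
    success, msg = conn.sync_pipe_inplace(pipe)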
def get_sync_time( self, pipe: meerschaum.Pipe, params: Optional[Dict[str, Any]] = None, newest: bool = True, remote: bool = False, debug: bool = False) -> Union[datetime.datetime, int, NoneType]:
2591def get_sync_time(
2592    self,
2593    pipe: 'mrsm.Pipe',
2594    params: Optional[Dict[str, Any]] = None,
2595    newest: bool = True,
2596    remote: bool = False,
2597    debug: bool = False,
2598) -> Union[datetime, int, None]:
2599    """Get a Pipe's most recent datetime value.
2600
2601    Parameters
2602    ----------
2603    pipe: mrsm.Pipe
2604        The pipe to get the sync time for.
2605
2606    params: Optional[Dict[str, Any]], default None
2607        Optional params dictionary to build the `WHERE` clause.
2608        See `meerschaum.utils.sql.build_where`.
2609
2610    newest: bool, default True
2611        If `True`, get the most recent datetime (honoring `params`).
2612        If `False`, get the oldest datetime (ASC instead of DESC).
2613
2614    remote: bool, default False
2615        If `True`, return the sync time for the remote fetch definition.
2616
2617    Returns
2618    -------
2619    A `datetime` object (or `int` if using an integer axis) if the pipe exists, otherwise `None`.
2620    """
2621    from meerschaum.utils.sql import sql_item_name, build_where, wrap_query_with_cte
2622    src_name = sql_item_name('src', self.flavor)
2623    table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
2624
2625    dt_col = pipe.columns.get('datetime', None)
2626    if dt_col is None:
2627        return None
2628    dt_col_name = sql_item_name(dt_col, self.flavor, None)
2629
2630    if remote and pipe.connector.type != 'sql':
2631        warn(f"Cannot get the remote sync time for {pipe}.")
2632        return None
2633
2634    ASC_or_DESC = "DESC" if newest else "ASC"
2635    existing_cols = pipe.get_columns_types(debug=debug)
2636    valid_params = {}
2637    if params is not None:
2638        valid_params = {k: v for k, v in params.items() if k in existing_cols}
2639    flavor = self.flavor if not remote else pipe.connector.flavor
2640
2641    ### If no bounds are provided for the datetime column,
2642    ### add IS NOT NULL to the WHERE clause.
2643    if dt_col not in valid_params:
2644        valid_params[dt_col] = '_None'
2645    where = "" if not valid_params else build_where(valid_params, self)
2646    src_query = (
2647        f"SELECT {dt_col_name}\nFROM {table_name}{where}"
2648        if not remote
2649        else self.get_pipe_metadef(pipe, params=params, begin=None, end=None)
2650    )
2651
2652    base_query = (
2653        f"SELECT {dt_col_name}\n"
2654        f"FROM {src_name}{where}\n"
2655        f"ORDER BY {dt_col_name} {ASC_or_DESC}\n"
2656        f"LIMIT 1"
2657    )
2658    if self.flavor == 'mssql':
2659        base_query = (
2660            f"SELECT TOP 1 {dt_col_name}\n"
2661            f"FROM {src_name}{where}\n"
2662            f"ORDER BY {dt_col_name} {ASC_or_DESC}"
2663        )
2664    elif self.flavor == 'oracle':
2665        base_query = (
2666            "SELECT * FROM (\n"
2667            f"    SELECT {dt_col_name}\n"
2668            f"    FROM {src_name}{where}\n"
2669            f"    ORDER BY {dt_col_name} {ASC_or_DESC}\n"
2670            ") WHERE ROWNUM = 1"
2671        )
2672
2673    query = wrap_query_with_cte(src_query, base_query, flavor)
2674
2675    try:
2676        db_time = self.value(query, silent=True, debug=debug)
2677
2678        ### No datetime could be found.
2679        if db_time is None:
2680            return None
2681        ### sqlite returns str.
2682        if isinstance(db_time, str):
2683            dateutil_parser = mrsm.attempt_import('dateutil.parser')
2684            st = dateutil_parser.parse(db_time)
2685        ### Do nothing if a datetime object is returned.
2686        elif isinstance(db_time, datetime):
2687            if hasattr(db_time, 'to_pydatetime'):
2688                st = db_time.to_pydatetime()
2689            else:
2690                st = db_time
2691        ### Sometimes the datetime is actually a date.
2692        elif isinstance(db_time, date):
2693            st = datetime.combine(db_time, datetime.min.time())
2694        ### Adding support for an integer datetime axis.
2695        elif 'int' in str(type(db_time)).lower():
2696            st = int(db_time)
2697        ### Convert pandas timestamp to Python datetime.
2698        else:
2699            st = db_time.to_pydatetime()
2700
2701        sync_time = st
2702
2703    except Exception as e:
2704        sync_time = None
2705        warn(str(e))
2706
2707    return sync_time

Get a Pipe's most recent datetime value.

Parameters
  • pipe (mrsm.Pipe): The pipe to get the sync time for.
  • params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause. See meerschaum.utils.sql.build_where.
  • newest (bool, default True): If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
  • remote (bool, default False): If True, return the sync time for the remote fetch definition.
Returns
  • A datetime object (or int if using an integer axis) if the pipe exists, otherwise None.
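For example (assuming the pipe has a datetime index column; keys are hypothetical):

    import meerschaum as mrsm
    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe('plugin:noaa', 'weather', instance=conn)
    newest = conn.get_sync_time(pipe)                # latest datetime value
    oldest = conn.get_sync_time(pipe, newest=False)  # earliest datetime value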
def pipe_exists(self, pipe: meerschaum.Pipe, debug: bool = False) -> bool:
2710def pipe_exists(
2711    self,
2712    pipe: mrsm.Pipe,
2713    debug: bool = False
2714) -> bool:
2715    """
2716    Check that a Pipe's table exists.
2717
2718    Parameters
2719    ----------
2720    pipe: mrsm.Pipe
2721        The pipe to check.
2722
2723    debug: bool, default False
2724        Verbosity toggle.
2725
2726    Returns
2727    -------
2728    A `bool` corresponding to whether a pipe's table exists.
2729
2730    """
2731    from meerschaum.utils.sql import table_exists
2732    exists = table_exists(
2733        pipe.target,
2734        self,
2735        schema=self.get_pipe_schema(pipe),
2736        debug=debug,
2737    )
2738    if debug:
2739        from meerschaum.utils.debug import dprint
2740        dprint(f"{pipe} " + ('exists.' if exists else 'does not exist.'))
2741    return exists

Check that a Pipe's table exists.

Parameters
  • pipe (mrsm.Pipe): The pipe to check.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool corresponding to whether a pipe's table exists.
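For example (hypothetical keys):

    import meerschaum as mrsm
    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe('plugin:noaa', 'weather', instance=conn)
    if not conn.pipe_exists(pipe):
        print(f"No table exists yet for {pipe}.")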
def get_pipe_rowcount( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) -> Optional[int]:
2744def get_pipe_rowcount(
2745    self,
2746    pipe: mrsm.Pipe,
2747    begin: Union[datetime, int, None] = None,
2748    end: Union[datetime, int, None] = None,
2749    params: Optional[Dict[str, Any]] = None,
2750    remote: bool = False,
2751    debug: bool = False
2752) -> Union[int, None]:
2753    """
2754    Get the rowcount for a pipe in accordance with given parameters.
2755
2756    Parameters
2757    ----------
2758    pipe: mrsm.Pipe
2759        The pipe to query with.
2760
2761    begin: Union[datetime, int, None], default None
2762        The begin datetime value.
2763
2764    end: Union[datetime, int, None], default None
2765        The end datetime value.
2766
2767    params: Optional[Dict[str, Any]], default None
2768        See `meerschaum.utils.sql.build_where`.
2769
2770    remote: bool, default False
2771        If `True`, get the rowcount for the remote table.
2772
2773    debug: bool, default False
2774        Verbosity toggle.
2775
2776    Returns
2777    -------
2778    An `int` for the number of rows if the `pipe` exists, otherwise `None`.
2779
2780    """
2781    from meerschaum.utils.sql import dateadd_str, sql_item_name, wrap_query_with_cte, build_where
2782    from meerschaum.connectors.sql._fetch import get_pipe_query
2783    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
2784    if remote:
2785        msg = f"'fetch:definition' must be an attribute of {pipe} to get a remote rowcount."
2786        if 'fetch' not in pipe.parameters:
2787            error(msg)
2788            return None
2789        if 'definition' not in pipe.parameters['fetch']:
2790            error(msg)
2791            return None
2792
2793
2794    flavor = self.flavor if not remote else pipe.connector.flavor
2795    conn = self if not remote else pipe.connector
2796    _pipe_name = sql_item_name(pipe.target, flavor, self.get_pipe_schema(pipe))
2797    dt_col = pipe.columns.get('datetime', None)
2798    dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None
2799    dt_db_type = get_db_type_from_pd_type(dt_typ, flavor) if dt_typ else None
2800    if not dt_col:
2801        dt_col = pipe.guess_datetime()
2802        dt_name = sql_item_name(dt_col, flavor, None) if dt_col else None
2803        is_guess = True
2804    else:
2805        dt_col = pipe.get_columns('datetime')
2806        dt_name = sql_item_name(dt_col, flavor, None)
2807        is_guess = False
2808
2809    if begin is not None or end is not None:
2810        if is_guess:
2811            if dt_col is None:
2812                warn(
2813                    f"No datetime could be determined for {pipe}."
2814                    + "\n    Ignoring begin and end...",
2815                    stack=False,
2816                )
2817                begin, end = None, None
2818            else:
2819                warn(
2820                    f"A datetime wasn't specified for {pipe}.\n"
2821                    + f"    Using column \"{dt_col}\" for datetime bounds...",
2822                    stack=False,
2823                )
2824
2825
2826    _datetime_name = sql_item_name(dt_col, flavor)
2827    _cols_names = [
2828        sql_item_name(col, flavor)
2829        for col in set(
2830            (
2831                [dt_col]
2832                if dt_col
2833                else []
2834            ) + (
2835                []
2836                if params is None
2837                else list(params.keys())
2838            )
2839        )
2840    ]
2841    if not _cols_names:
2842        _cols_names = ['*']
2843
2844    src = (
2845        f"SELECT {', '.join(_cols_names)}\nFROM {_pipe_name}"
2846        if not remote
2847        else get_pipe_query(pipe)
2848    )
2849    parent_query = f"SELECT COUNT(*)\nFROM {sql_item_name('src', flavor)}"
2850    query = wrap_query_with_cte(src, parent_query, flavor)
2851    if begin is not None or end is not None:
2852        query += "\nWHERE"
2853    if begin is not None:
2854        query += (
2855            f"\n    {dt_name} >= "
2856            + dateadd_str(flavor, datepart='minute', number=0, begin=begin, db_type=dt_db_type)
2857        )
2858    if end is not None and begin is not None:
2859        query += "\n    AND"
2860    if end is not None:
2861        query += (
2862            f"\n    {dt_name} <  "
2863            + dateadd_str(flavor, datepart='minute', number=0, begin=end, db_type=dt_db_type)
2864        )
2865    if params is not None:
2866        existing_cols = pipe.get_columns_types(debug=debug)
2867        valid_params = {k: v for k, v in params.items() if k in existing_cols}
2868        if valid_params:
2869            query += build_where(valid_params, conn).replace('WHERE', (
2870                'AND' if (begin is not None or end is not None)
2871                    else 'WHERE'
2872                )
2873            )
2874
2875    result = conn.value(query, debug=debug, silent=True)
2876    try:
2877        return int(result)
2878    except Exception:
2879        return None

Get the rowcount for a pipe in accordance with given parameters.

Parameters
  • pipe (mrsm.Pipe): The pipe to query with.
  • begin (Union[datetime, int, None], default None): The begin datetime value.
  • end (Union[datetime, int, None], default None): The end datetime value.
  • params (Optional[Dict[str, Any]], default None): See meerschaum.utils.sql.build_where.
  • remote (bool, default False): If True, get the rowcount for the remote table.
  • debug (bool, default False): Verbosity toggle.
Returns
  • An int for the number of rows if the pipe exists, otherwise None.
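A short sketch (hypothetical keys); note that begin is inclusive and end is exclusive:

    from datetime import datetime
    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe('plugin:noaa', 'weather', instance=conn)
    total = conn.get_pipe_rowcount(pipe)
    january = conn.get_pipe_rowcount(
        pipe,
        begin=datetime(2024, 1, 1),
        end=datetime(2024, 2, 1),
    )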
def drop_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kw) -> Tuple[bool, str]:
2882def drop_pipe(
2883    self,
2884    pipe: mrsm.Pipe,
2885    debug: bool = False,
2886    **kw
2887) -> SuccessTuple:
2888    """
2889    Drop a pipe's tables but maintain its registration.
2890
2891    Parameters
2892    ----------
2893    pipe: mrsm.Pipe
2894        The pipe to drop.
2895
2896    Returns
2897    -------
2898    A `SuccessTuple` indicating success.
2899    """
2900    from meerschaum.utils.sql import table_exists, sql_item_name, DROP_IF_EXISTS_FLAVORS
2901    success = True
2902    target = pipe.target
2903    schema = self.get_pipe_schema(pipe)
2904    target_name = (
2905        sql_item_name(target, self.flavor, schema)
2906    )
2907    if table_exists(target, self, schema=schema, debug=debug):
2908        if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
2909        success = self.exec(
2910            f"DROP TABLE {if_exists_str} {target_name}", silent=True, debug=debug
2911        ) is not None
2912
2913    msg = "Success" if success else f"Failed to drop {pipe}."
2914    return success, msg

Drop a pipe's tables but maintain its registration.

Parameters
  • pipe (mrsm.Pipe): The pipe to drop.
Returns
  • A SuccessTuple indicating success.
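For example (hypothetical keys); the registration survives, so a later sync recreates the table:

    import meerschaum as mrsm
    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe('plugin:noaa', 'weather', instance=conn)
    success, msg = conn.drop_pipe(pipe)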
def clear_pipe( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw) -> Tuple[bool, str]:
2917def clear_pipe(
2918    self,
2919    pipe: mrsm.Pipe,
2920    begin: Union[datetime, int, None] = None,
2921    end: Union[datetime, int, None] = None,
2922    params: Optional[Dict[str, Any]] = None,
2923    debug: bool = False,
2924    **kw
2925) -> SuccessTuple:
2926    """
2927    Delete a pipe's data within a bounded or unbounded interval without dropping the table.
2928
2929    Parameters
2930    ----------
2931    pipe: mrsm.Pipe
2932        The pipe to clear.
2933        
2934    begin: Union[datetime, int, None], default None
2935        Beginning datetime. Inclusive.
2936
2937    end: Union[datetime, int, None], default None
2938         Ending datetime. Exclusive.
2939
2940    params: Optional[Dict[str, Any]], default None
2941         See `meerschaum.utils.sql.build_where`.
2942
2943    """
2944    if not pipe.exists(debug=debug):
2945        return True, f"{pipe} does not exist, so nothing was cleared."
2946
2947    from meerschaum.utils.sql import sql_item_name, build_where, dateadd_str
2948    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
2949    pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
2950
2951    dt_col = pipe.columns.get('datetime', None)
2952    dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None
2953    dt_db_type = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None
2954    if not pipe.columns.get('datetime', None):
2955        dt_col = pipe.guess_datetime()
2956        dt_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None
2957        is_guess = True
2958    else:
2959        dt_col = pipe.get_columns('datetime')
2960        dt_name = sql_item_name(dt_col, self.flavor, None)
2961        is_guess = False
2962
2963    if begin is not None or end is not None:
2964        if is_guess:
2965            if dt_col is None:
2966                warn(
2967                    f"No datetime could be determined for {pipe}."
2968                    + "\n    Ignoring datetime bounds...",
2969                    stack=False,
2970                )
2971                begin, end = None, None
2972            else:
2973                warn(
2974                    f"A datetime wasn't specified for {pipe}.\n"
2975                    + f"    Using column \"{dt_col}\" for datetime bounds...",
2976                    stack=False,
2977                )
2978
2979    valid_params = {}
2980    if params is not None:
2981        existing_cols = pipe.get_columns_types(debug=debug)
2982        valid_params = {k: v for k, v in params.items() if k in existing_cols}
2983    clear_query = (
2984        f"DELETE FROM {pipe_name}\nWHERE 1 = 1\n"
2985        + ('\n    AND ' + build_where(valid_params, self, with_where=False) if valid_params else '')
2986        + (
2987            (
2988                f'\n    AND {dt_name} >= '
2989                + dateadd_str(self.flavor, 'day', 0, begin, db_type=dt_db_type)
2990            )
2991            if begin is not None
2992            else ''
2993        ) + (
2994            (
2995                f'\n    AND {dt_name} <  '
2996                + dateadd_str(self.flavor, 'day', 0, end, db_type=dt_db_type)
2997            )
2998            if end is not None
2999            else ''
3000        )
3001    )
3002    success = self.exec(clear_query, silent=True, debug=debug) is not None
3003    msg = "Success" if success else f"Failed to clear {pipe}."
3004    return success, msg

Delete a pipe's data within a bounded or unbounded interval without dropping the table.

Parameters
  • pipe (mrsm.Pipe): The pipe to clear.
  • begin (Union[datetime, int, None], default None): Beginning datetime. Inclusive.
  • end (Union[datetime, int, None], default None): Ending datetime. Exclusive.
  • params (Optional[Dict[str, Any]], default None): See meerschaum.utils.sql.build_where.
Returns
  • A SuccessTuple indicating success.
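A hedged sketch (assuming the pipe has a configured datetime column; keys and bounds are illustrative). Because `begin` is inclusive and `end` is exclusive, this clears January 2024 only:
>>> from datetime import datetime
>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> pipe = mrsm.Pipe('plugin:noaa', 'weather', instance=conn)
>>> success, msg = conn.clear_pipe(
...     pipe,
...     begin=datetime(2024, 1, 1),
...     end=datetime(2024, 2, 1),
... )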
def deduplicate_pipe( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
3608def deduplicate_pipe(
3609    self,
3610    pipe: mrsm.Pipe,
3611    begin: Union[datetime, int, None] = None,
3612    end: Union[datetime, int, None] = None,
3613    params: Optional[Dict[str, Any]] = None,
3614    debug: bool = False,
3615    **kwargs: Any
3616) -> SuccessTuple:
3617    """
3618    Delete duplicate values within a pipe's table.
3619
3620    Parameters
3621    ----------
3622    pipe: mrsm.Pipe
3623        The pipe whose table to deduplicate.
3624
3625    begin: Union[datetime, int, None], default None
3626        If provided, only deduplicate values greater than or equal to this value.
3627
3628    end: Union[datetime, int, None], default None
3629        If provided, only deduplicate values less than this value.
3630
3631    params: Optional[Dict[str, Any]], default None
3632        If provided, further limit deduplication to values which match this query dictionary.
3633
3634    debug: bool, default False
3635        Verbosity toggle.
3636
3637    Returns
3638    -------
3639    A `SuccessTuple` indicating success.
3640    """
3641    from meerschaum.utils.sql import (
3642        sql_item_name,
3643        get_rename_table_queries,
3644        DROP_IF_EXISTS_FLAVORS,
3645        get_create_table_query,
3646        format_cte_subquery,
3647        get_null_replacement,
3648    )
3649    from meerschaum.utils.misc import generate_password, flatten_list
3650
3651    pipe_table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
3652
3653    if not pipe.exists(debug=debug):
3654        return False, f"Table {pipe_table_name} does not exist."
3655
3656    dt_col = pipe.columns.get('datetime', None)
3657    cols_types = pipe.get_columns_types(debug=debug)
3658    existing_cols = pipe.get_columns_types(debug=debug)
3659
3660    get_rowcount_query = f"SELECT COUNT(*) FROM {pipe_table_name}"
3661    old_rowcount = self.value(get_rowcount_query, debug=debug)
3662    if old_rowcount is None:
3663        return False, f"Failed to get rowcount for table {pipe_table_name}."
3664
3665    ### Non-datetime indices that in fact exist.
3666    indices = [
3667        col
3668        for key, col in pipe.columns.items()
3669        if col and col != dt_col and col in cols_types
3670    ]
3671    indices_names = [sql_item_name(index_col, self.flavor, None) for index_col in indices]
3672    existing_cols_names = [sql_item_name(col, self.flavor, None) for col in existing_cols]
3673    duplicate_row_number_name = sql_item_name('dup_row_num', self.flavor, None)
3674    previous_row_number_name = sql_item_name('prev_row_num', self.flavor, None)
3675
3676    index_list_str = (
3677        sql_item_name(dt_col, self.flavor, None)
3678        if dt_col
3679        else ''
3680    )
3681    index_list_str_ordered = (
3682        (
3683            sql_item_name(dt_col, self.flavor, None) + " DESC"
3684        )
3685        if dt_col
3686        else ''
3687    )
3688    if indices:
3689        index_list_str += ', ' + ', '.join(indices_names)
3690        index_list_str_ordered += ', ' + ', '.join(indices_names)
3691    if index_list_str.startswith(','):
3692        index_list_str = index_list_str.lstrip(',').lstrip()
3693    if index_list_str_ordered.startswith(','):
3694        index_list_str_ordered = index_list_str_ordered.lstrip(',').lstrip()
3695
3696    cols_list_str = ', '.join(existing_cols_names)
3697
3698    try:
3699        ### NOTE: MySQL 5 and below does not support window functions (ROW_NUMBER()).
3700        is_old_mysql = (
3701            self.flavor in ('mysql', 'mariadb')
3702            and
3703            int(self.db_version.split('.')[0]) < 8
3704        )
3705    except Exception:
3706        is_old_mysql = False
3707
3708    src_query = f"""
3709        SELECT
3710            {cols_list_str},
3711            ROW_NUMBER() OVER (
3712                PARTITION BY
3713                {index_list_str}
3714                ORDER BY {index_list_str_ordered}
3715            ) AS {duplicate_row_number_name}
3716        FROM {pipe_table_name}
3717    """
3718    duplicates_cte_subquery = format_cte_subquery(
3719        src_query,
3720        self.flavor,
3721        sub_name = 'src',
3722        cols_to_select = cols_list_str,
3723    ) + f"""
3724        WHERE {duplicate_row_number_name} = 1
3725        """
3726    old_mysql_query = (
3727        f"""
3728        SELECT
3729            {index_list_str}
3730        FROM (
3731          SELECT
3732            {index_list_str},
3733            IF(
3734                @{previous_row_number_name} <> {index_list_str.replace(', ', ' + ')},
3735                @{duplicate_row_number_name} := 0,
3736                @{duplicate_row_number_name}
3737            ),
3738            @{previous_row_number_name} := {index_list_str.replace(', ', ' + ')},
3739            @{duplicate_row_number_name} := @{duplicate_row_number_name} + 1 AS """
3740        + f"""{duplicate_row_number_name}
3741          FROM
3742            {pipe_table_name},
3743            (
3744                SELECT @{duplicate_row_number_name} := 0
3745            ) AS {duplicate_row_number_name},
3746            (
3747                SELECT @{previous_row_number_name} := '{get_null_replacement('str', 'mysql')}'
3748            ) AS {previous_row_number_name}
3749          ORDER BY {index_list_str_ordered}
3750        ) AS t
3751        WHERE {duplicate_row_number_name} = 1
3752        """
3753    )
3754    if is_old_mysql:
3755        duplicates_cte_subquery = old_mysql_query
3756
3757    session_id = generate_password(3)
3758
3759    dedup_table = self.get_temporary_target(pipe.target, transact_id=session_id, label='dedup')
3760    temp_old_table = self.get_temporary_target(pipe.target, transact_id=session_id, label='old')
3761    temp_old_table_name = sql_item_name(temp_old_table, self.flavor, self.get_pipe_schema(pipe))
3762
3763    create_temporary_table_query = get_create_table_query(
3764        duplicates_cte_subquery,
3765        dedup_table,
3766        self.flavor,
3767    ) + f"""
3768    ORDER BY {index_list_str_ordered}
3769    """
3770    if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
3771    alter_queries = flatten_list([
3772        get_rename_table_queries(
3773            pipe.target,
3774            temp_old_table,
3775            self.flavor,
3776            schema=self.get_pipe_schema(pipe),
3777        ),
3778        get_rename_table_queries(
3779            dedup_table,
3780            pipe.target,
3781            self.flavor,
3782            schema=self.get_pipe_schema(pipe),
3783        ),
3784        f"DROP TABLE {if_exists_str} {temp_old_table_name}",
3785    ])
3786
3787    self._log_temporary_tables_creation(temp_old_table, create=(not pipe.temporary), debug=debug)
3788    create_temporary_result = self.execute(create_temporary_table_query, debug=debug)
3789    if create_temporary_result is None:
3790        return False, f"Failed to deduplicate table {pipe_table_name}."
3791
3792    results = self.exec_queries(
3793        alter_queries,
3794        break_on_error=True,
3795        rollback=True,
3796        debug=debug,
3797    )
3798
3799    fail_query = None
3800    for result, query in zip(results, alter_queries):
3801        if result is None:
3802            fail_query = query
3803            break
3804    success = fail_query is None
3805
3806    new_rowcount = (
3807        self.value(get_rowcount_query, debug=debug)
3808        if success
3809        else None
3810    )
3811
3812    msg = (
3813        (
3814            f"Successfully deduplicated table {pipe_table_name}"
3815            + (
3816                f"\nfrom {old_rowcount:,} to {new_rowcount:,} rows"
3817                if old_rowcount != new_rowcount
3818                else ''
3819            ) + '.'
3820        )
3821        if success
3822        else f"Failed to execute query:\n{fail_query}"
3823    )
3824    return success, msg

Delete duplicate values within a pipe's table.

Parameters
  • pipe (mrsm.Pipe): The pipe whose table to deduplicate.
  • begin (Union[datetime, int, None], default None): If provided, only deduplicate values greater than or equal to this value.
  • end (Union[datetime, int, None], default None): If provided, only deduplicate values less than this value.
  • params (Optional[Dict[str, Any]], default None): If provided, further limit deduplication to values which match this query dictionary.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple indicating success.
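A hedged sketch (keys and rowcounts illustrative); duplicates are removed by keeping one row per index partition, ordered by the datetime column descending:
>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> pipe = mrsm.Pipe('plugin:noaa', 'weather', instance=conn)
>>> success, msg = conn.deduplicate_pipe(pipe)
>>> print(msg)
Successfully deduplicated table "weather"
from 2,000 to 1,000 rows.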
def get_pipe_table( self, pipe: meerschaum.Pipe, debug: bool = False) -> "Union['sqlalchemy.Table', None]":
3007def get_pipe_table(
3008    self,
3009    pipe: mrsm.Pipe,
3010    debug: bool = False,
3011) -> Union['sqlalchemy.Table', None]:
3012    """
3013    Return the `sqlalchemy.Table` object for a `mrsm.Pipe`.
3014
3015    Parameters
3016    ----------
3017    pipe: mrsm.Pipe
3018        The pipe in question.
3019
3020    Returns
3021    -------
3022    A `sqlalchemy.Table` object. 
3023
3024    """
3025    from meerschaum.utils.sql import get_sqlalchemy_table
3026    if not pipe.exists(debug=debug):
3027        return None
3028    return get_sqlalchemy_table(
3029        pipe.target,
3030        connector=self,
3031        schema=self.get_pipe_schema(pipe),
3032        debug=debug,
3033        refresh=True,
3034    )

Return the sqlalchemy.Table object for a mrsm.Pipe.

Parameters
  • pipe (mrsm.Pipe): The pipe in question.
Returns
  • A sqlalchemy.Table object.
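A brief sketch, reusing a `conn` and `pipe` as in the examples above (column names illustrative; `None` is returned if the table does not exist):
>>> table = conn.get_pipe_table(pipe)
>>> [col.name for col in table.columns]  # e.g. ['dt', 'id', 'val']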
def get_pipe_columns_types( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, str]:
3037def get_pipe_columns_types(
3038    self,
3039    pipe: mrsm.Pipe,
3040    debug: bool = False,
3041) -> Dict[str, str]:
3042    """
3043    Get the pipe's columns and types.
3044
3045    Parameters
3046    ----------
3047    pipe: mrsm.Pipe
3048        The pipe to get the columns for.
3049
3050    Returns
3051    -------
3052    A dictionary of column names (`str`) and types (`str`).
3053
3054    Examples
3055    --------
3056    >>> conn.get_pipe_columns_types(pipe)
3057    {
3058      'dt': 'TIMESTAMP WITHOUT TIMEZONE',
3059      'id': 'BIGINT',
3060      'val': 'DOUBLE PRECISION',
3061    }
3062    >>> 
3063    """
3064    from meerschaum.utils.sql import get_table_cols_types
3065    if not pipe.exists(debug=debug):
3066        return {}
3067
3068    if self.flavor not in ('oracle', 'mysql', 'mariadb', 'sqlite'):
3069        return get_table_cols_types(
3070            pipe.target,
3071            self,
3072            flavor=self.flavor,
3073            schema=self.get_pipe_schema(pipe),
3074            debug=debug,
3075        )
3076
3077    table_columns = {}
3078    try:
3079        pipe_table = self.get_pipe_table(pipe, debug=debug)
3080        if pipe_table is None:
3081            return {}
3082        for col in pipe_table.columns:
3083            table_columns[str(col.name)] = str(col.type)
3084    except Exception as e:
3085        import traceback
3086        traceback.print_exc()
3087        warn(e)
3088        table_columns = {}
3089
3090    return table_columns

Get the pipe's columns and types.

Parameters
  • pipe (mrsm.Pipe): The pipe to get the columns for.
Returns
  • A dictionary of column names (str) and types (str).
Examples
>>> conn.get_pipe_columns_types(pipe)
{
  'dt': 'TIMESTAMP WITHOUT TIMEZONE',
  'id': 'BIGINT',
  'val': 'DOUBLE PRECISION',
}
>>>
def get_to_sql_dtype( self, pipe: meerschaum.Pipe, df: "'pd.DataFrame'", update_dtypes: bool = True) -> "Dict[str, 'sqlalchemy.sql.visitors.TraversibleType']":
3552def get_to_sql_dtype(
3553    self,
3554    pipe: 'mrsm.Pipe',
3555    df: 'pd.DataFrame',
3556    update_dtypes: bool = True,
3557) -> Dict[str, 'sqlalchemy.sql.visitors.TraversibleType']:
3558    """
3559    Given a pipe and DataFrame, return the `dtype` dictionary for `to_sql()`.
3560
3561    Parameters
3562    ----------
3563    pipe: mrsm.Pipe
3564        The pipe which may contain a `dtypes` parameter.
3565
3566    df: pd.DataFrame
3567        The DataFrame to be pushed via `to_sql()`.
3568
3569    update_dtypes: bool, default True
3570        If `True`, patch the pipe's dtypes onto the DataFrame's dtypes.
3571
3572    Returns
3573    -------
3574    A dictionary with `sqlalchemy` datatypes.
3575
3576    Examples
3577    --------
3578    >>> import pandas as pd
3579    >>> import meerschaum as mrsm
3580    >>> 
3581    >>> conn = mrsm.get_connector('sql:memory')
3582    >>> df = pd.DataFrame([{'a': {'b': 1}}])
3583    >>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'})
3584    >>> conn.get_to_sql_dtype(pipe, df)
3585    {'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
3586    """
3587    from meerschaum.utils.dataframe import get_json_cols, get_numeric_cols, get_uuid_cols
3588    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
3589    df_dtypes = {
3590        col: str(typ)
3591        for col, typ in df.dtypes.items()
3592    }
3593    json_cols = get_json_cols(df)
3594    numeric_cols = get_numeric_cols(df)
3595    uuid_cols = get_uuid_cols(df)
3596    df_dtypes.update({col: 'json' for col in json_cols})
3597    df_dtypes.update({col: 'numeric' for col in numeric_cols})
3598    df_dtypes.update({col: 'uuid' for col in uuid_cols})
3599    if update_dtypes:
3600        df_dtypes.update(pipe.dtypes)
3601    return {
3602        col: get_db_type_from_pd_type(typ, self.flavor, as_sqlalchemy=True)
3603        for col, typ in df_dtypes.items()
3604        if col and typ
3605    }

Given a pipe and DataFrame, return the dtype dictionary for to_sql().

Parameters
  • pipe (mrsm.Pipe): The pipe which may contain a dtypes parameter.
  • df (pd.DataFrame): The DataFrame to be pushed via to_sql().
  • update_dtypes (bool, default True): If True, patch the pipe's dtypes onto the DataFrame's dtypes.
Returns
  • A dictionary with sqlalchemy datatypes.
Examples
>>> import pandas as pd
>>> import meerschaum as mrsm
>>> 
>>> conn = mrsm.get_connector('sql:memory')
>>> df = pd.DataFrame([{'a': {'b': 1}}])
>>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'})
>>> conn.get_to_sql_dtype(pipe, df)
{'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
def get_pipe_schema(self, pipe: meerschaum.Pipe) -> Optional[str]:
3827def get_pipe_schema(self, pipe: mrsm.Pipe) -> Union[str, None]:
3828    """
3829    Return the schema to use for this pipe.
3830    First check `pipe.parameters['schema']`, then check `self.schema`.
3831
3832    Parameters
3833    ----------
3834    pipe: mrsm.Pipe
3835        The pipe which may contain a configured schema.
3836
3837    Returns
3838    -------
3839    A schema string or `None` if nothing is configured.
3840    """
3841    return pipe.parameters.get('schema', self.schema)

Return the schema to use for this pipe. First check pipe.parameters['schema'], then check self.schema.

Parameters
  • pipe (mrsm.Pipe): The pipe which may contain a configured schema.
Returns
  • A schema string or None if nothing is configured.
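A short sketch of the precedence (the 'analytics' schema is illustrative):
>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> pipe = mrsm.Pipe('a', 'b', parameters={'schema': 'analytics'}, instance=conn)
>>> conn.get_pipe_schema(pipe)
'analytics'
>>> conn.get_pipe_schema(mrsm.Pipe('a', 'c', instance=conn)) == conn.schema
True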
def create_pipe_table_from_df( self, pipe: meerschaum.Pipe, df: "'pd.DataFrame'", debug: bool = False) -> Tuple[bool, str]:
1521def create_pipe_table_from_df(
1522    self,
1523    pipe: mrsm.Pipe,
1524    df: 'pd.DataFrame',
1525    debug: bool = False,
1526) -> mrsm.SuccessTuple:
1527    """
1528    Create a pipe's table from its configured dtypes and an incoming dataframe.
1529    """
1530    from meerschaum.utils.dataframe import (
1531        get_json_cols,
1532        get_numeric_cols,
1533        get_uuid_cols,
1534        get_datetime_cols,
1535        get_bytes_cols,
1536    )
1537    from meerschaum.utils.sql import get_create_table_queries, sql_item_name
1538    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
1539    primary_key = pipe.columns.get('primary', None)
1540    primary_key_typ = (
1541        pipe.dtypes.get(primary_key, str(df.dtypes.get(primary_key, 'int')))
1542        if primary_key
1543        else None
1544    )
1545    primary_key_db_type = (
1546        get_db_type_from_pd_type(primary_key_typ, self.flavor)
1547        if primary_key
1548        else None
1549    )
1550    dt_col = pipe.columns.get('datetime', None)
1551    new_dtypes = {
1552        **{
1553            col: str(typ)
1554            for col, typ in df.dtypes.items()
1555        },
1556        **{
1557            col: str(df.dtypes.get(col, 'int'))
1558            for col_ix, col in pipe.columns.items()
1559            if col and col_ix != 'primary'
1560        },
1561        **{
1562            col: 'uuid'
1563            for col in get_uuid_cols(df)
1564        },
1565        **{
1566            col: 'json'
1567            for col in get_json_cols(df)
1568        },
1569        **{
1570            col: 'numeric'
1571            for col in get_numeric_cols(df)
1572        },
1573        **{
1574            col: 'bytes'
1575            for col in get_bytes_cols(df)
1576        },
1577        **{
1578            col: 'datetime64[ns, UTC]'
1579            for col in get_datetime_cols(df, timezone_aware=True, timezone_naive=False)
1580        },
1581        **{
1582            col: 'datetime64[ns]'
1583            for col in get_datetime_cols(df, timezone_aware=False, timezone_naive=True)
1584        },
1585        **pipe.dtypes
1586    }
1587    autoincrement = (
1588        pipe.parameters.get('autoincrement', False)
1589        or (primary_key and primary_key not in new_dtypes)
1590    )
1591    if autoincrement:
1592        _ = new_dtypes.pop(primary_key, None)
1593
1594    create_table_queries = get_create_table_queries(
1595        new_dtypes,
1596        pipe.target,
1597        self.flavor,
1598        schema=self.get_pipe_schema(pipe),
1599        primary_key=primary_key,
1600        primary_key_db_type=primary_key_db_type,
1601        datetime_column=dt_col,
1602    )
1603    success = all(
1604        self.exec_queries(create_table_queries, break_on_error=True, rollback=True, debug=debug)
1605    )
1606    target_name = sql_item_name(pipe.target, schema=self.get_pipe_schema(pipe), flavor=self.flavor)
1607    msg = (
1608        "Success"
1609        if success
1610        else f"Failed to create {target_name}."
1611    )
1612    return success, msg

Create a pipe's table from its configured dtypes and an incoming dataframe.
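A hedged sketch (column names and dtypes illustrative); the primary-key and datetime hints are read from `pipe.columns`:
>>> import pandas as pd
>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> df = pd.DataFrame({'id': [1, 2], 'val': [1.5, 2.5]})
>>> pipe = mrsm.Pipe('demo', 'table', columns={'primary': 'id'}, instance=conn)
>>> success, msg = conn.create_pipe_table_from_df(pipe, df)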

def get_pipe_columns_indices( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, List[Dict[str, str]]]:
3093def get_pipe_columns_indices(
3094    self,
3095    pipe: mrsm.Pipe,
3096    debug: bool = False,
3097) -> Dict[str, List[Dict[str, str]]]:
3098    """
3099    Return a dictionary mapping columns to the indices created on those columns.
3100
3101    Parameters
3102    ----------
3103    pipe: mrsm.Pipe
3104        The pipe to be queried against.
3105
3106    Returns
3107    -------
3108    A dictionary mapping column names to lists of dictionaries.
3109    The dictionaries in the lists contain the name and type of the indices.
3110    """
3111    if pipe.__dict__.get('_skip_check_indices', False):
3112        return {}
3113    from meerschaum.utils.sql import get_table_cols_indices
3114    return get_table_cols_indices(
3115        pipe.target,
3116        self,
3117        flavor=self.flavor,
3118        schema=self.get_pipe_schema(pipe),
3119        debug=debug,
3120    )

Return a dictionary mapping columns to the indices created on those columns.

Parameters
  • pipe (mrsm.Pipe): The pipe to be queried against.
Returns
  • A dictionary mapping column names to lists of dictionaries; the dictionaries in the lists contain the name and type of the indices.
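A short sketch, reusing a `conn` and `pipe` as above (output illustrative; index names and types vary by flavor):
>>> conn.get_pipe_columns_indices(pipe)
{'dt': [{'name': 'IX_a_b_dt', 'type': 'INDEX'}]}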
@staticmethod
def get_temporary_target( target: str, transact_id: Optional[str] = None, label: Optional[str] = None, separator: Optional[str] = None) -> str:
3844@staticmethod
3845def get_temporary_target(
3846    target: str,
3847    transact_id: Optional[str] = None,
3848    label: Optional[str] = None,
3849    separator: Optional[str] = None,
3850) -> str:
3851    """
3852    Return a unique(ish) temporary target for a pipe.
3853    """
3854    from meerschaum.utils.misc import generate_password
3855    temp_target_cf = (
3856        mrsm.get_config('system', 'connectors', 'sql', 'instance', 'temporary_target') or {}
3857    )
3858    transaction_id_len = temp_target_cf.get('transaction_id_length', 3)
3859    transact_id = transact_id or generate_password(transaction_id_len)
3860    temp_prefix = temp_target_cf.get('prefix', '_')
3861    separator = separator or temp_target_cf.get('separator', '_')
3862    return (
3863        temp_prefix
3864        + target
3865        + separator
3866        + transact_id
3867        + ((separator + label) if label else '')
3868    )

Return a unique(ish) temporary target for a pipe.
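A quick sketch, assuming the default prefix '_' and separator '_' from the configuration:
>>> SQLConnector.get_temporary_target('weather', transact_id='abc', label='dedup')
'_weather_abc_dedup'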

def create_pipe_indices( self, pipe: meerschaum.Pipe, columns: Optional[List[str]] = None, debug: bool = False) -> Tuple[bool, str]:
314def create_pipe_indices(
315    self,
316    pipe: mrsm.Pipe,
317    columns: Optional[List[str]] = None,
318    debug: bool = False,
319) -> SuccessTuple:
320    """
321    Create a pipe's indices.
322    """
323    success = self.create_indices(pipe, columns=columns, debug=debug)
324    msg = (
325        "Success"
326        if success
327        else f"Failed to create indices for {pipe}."
328    )
329    return success, msg

Create a pipe's indices.
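For example, reusing a `conn` and `pipe` as above (column list illustrative; passing `columns=None` presumably covers all configured index columns):
>>> success, msg = conn.create_pipe_indices(pipe, columns=['dt'])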

def drop_pipe_indices( self, pipe: meerschaum.Pipe, columns: Optional[List[str]] = None, debug: bool = False) -> Tuple[bool, str]:
368def drop_pipe_indices(
369    self,
370    pipe: mrsm.Pipe,
371    columns: Optional[List[str]] = None,
372    debug: bool = False,
373) -> SuccessTuple:
374    """
375    Drop a pipe's indices.
376    """
377    success = self.drop_indices(pipe, columns=columns, debug=debug)
378    msg = (
379        "Success"
380        if success
381        else f"Failed to drop indices for {pipe}."
382    )
383    return success, msg

Drop a pipe's indices.

def get_pipe_index_names(self, pipe: meerschaum.Pipe) -> Dict[str, str]:
421def get_pipe_index_names(self, pipe: mrsm.Pipe) -> Dict[str, str]:
422    """
423    Return a dictionary mapping index keys to their names on the database.
424
425    Returns
426    -------
427    A dictionary of index keys to column names.
428    """
429    from meerschaum.utils.sql import DEFAULT_SCHEMA_FLAVORS
430    _parameters = pipe.parameters
431    _index_template = _parameters.get('index_template', "IX_{schema_str}{target}_{column_names}")
432    _schema = self.get_pipe_schema(pipe)
433    if _schema is None:
434        _schema = (
435            DEFAULT_SCHEMA_FLAVORS.get(self.flavor, None)
436            if self.flavor != 'mssql'
437            else None
438        )
439    schema_str = '' if _schema is None else f'{_schema}_'
440    schema_str = ''
441    _indices = pipe.indices
442    _target = pipe.target
443    _column_names = {
444        ix: (
445            '_'.join(cols)
446            if isinstance(cols, (list, tuple))
447            else str(cols)
448        )
449        for ix, cols in _indices.items()
450        if cols
451    }
452    _index_names = {
453        ix: _index_template.format(
454            target=_target,
455            column_names=column_names,
456            connector_keys=pipe.connector_keys,
457            metric_key=pipe.metric_key,
458            location_key=pipe.location_key,
459            schema_str=schema_str,
460        )
461        for ix, column_names in _column_names.items()
462    }
463    ### NOTE: Skip any duplicate indices.
464    seen_index_names = {}
465    for ix, index_name in _index_names.items():
466        if index_name in seen_index_names:
467            continue
468        seen_index_names[index_name] = ix
469    return {
470        ix: index_name
471        for index_name, ix in seen_index_names.items()
472    }

Return a dictionary mapping index keys to their names on the database.

Returns
  • A dictionary of index keys to column names.
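A sketch under the default template `IX_{schema_str}{target}_{column_names}` (keys, target, and output illustrative):
>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> pipe = mrsm.Pipe('a', 'b', target='a_b', columns={'datetime': 'dt', 'id': 'id'}, instance=conn)
>>> conn.get_pipe_index_names(pipe)  # e.g. {'datetime': 'IX_a_b_dt', 'id': 'IX_a_b_id'}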
def register_plugin( self, plugin: meerschaum.Plugin, force: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
17def register_plugin(
18    self,
19    plugin: 'mrsm.core.Plugin',
20    force: bool = False,
21    debug: bool = False,
22    **kw: Any
23) -> SuccessTuple:
24    """Register a new plugin to the plugins table."""
25    from meerschaum.utils.packages import attempt_import
26    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
27    from meerschaum.utils.sql import json_flavors
28    from meerschaum.connectors.sql.tables import get_tables
29    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
30
31    old_id = self.get_plugin_id(plugin, debug=debug)
32
33    ### Check for version conflict. May be overridden with `--force`.
34    if old_id is not None and not force:
35        old_version = self.get_plugin_version(plugin, debug=debug)
36        new_version = plugin.version
37        if old_version is None:
38            old_version = ''
39        if new_version is None:
40            new_version = ''
41
42        ### verify that the new version is greater than the old
43        packaging_version = attempt_import('packaging.version')
44        if (
45            old_version and new_version
46            and packaging_version.parse(old_version) >= packaging_version.parse(new_version)
47        ):
48            return False, (
49                f"Version '{new_version}' of plugin '{plugin}' " +
50                f"must be greater than existing version '{old_version}'."
51            )
52
53    bind_variables = {
54        'plugin_name': plugin.name,
55        'version': plugin.version,
56        'attributes': (
57            json.dumps(plugin.attributes) if self.flavor not in json_flavors else plugin.attributes
58        ),
59        'user_id': plugin.user_id,
60    }
61
62    if old_id is None:
63        query = sqlalchemy.insert(plugins_tbl).values(**bind_variables)
64    else:
65        query = (
66            sqlalchemy.update(plugins_tbl)
67            .values(**bind_variables)
68            .where(plugins_tbl.c.plugin_id == old_id)
69        )
70
71    result = self.exec(query, debug=debug)
72    if result is None:
73        return False, f"Failed to register plugin '{plugin}'."
74    return True, f"Successfully registered plugin '{plugin}'."

Register a new plugin to the plugins table.

def delete_plugin( self, plugin: meerschaum.Plugin, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
243def delete_plugin(
244    self,
245    plugin: 'mrsm.core.Plugin',
246    debug: bool = False,
247    **kw: Any
248) -> SuccessTuple:
249    """Delete a plugin from the plugins table."""
250    from meerschaum.utils.packages import attempt_import
251    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
252    from meerschaum.connectors.sql.tables import get_tables
253    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
254
255    plugin_id = self.get_plugin_id(plugin, debug=debug)
256    if plugin_id is None:
257        return True, f"Plugin '{plugin}' was not registered."
258
259    query = sqlalchemy.delete(plugins_tbl).where(plugins_tbl.c.plugin_id == plugin_id)
260    result = self.exec(query, debug=debug)
261    if result is None:
262        return False, f"Failed to delete plugin '{plugin}'."
263    return True, f"Successfully deleted plugin '{plugin}'."

Delete a plugin from the plugins table.

def get_plugin_id( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[int]:
76def get_plugin_id(
77    self,
78    plugin: 'mrsm.core.Plugin',
79    debug: bool = False
80) -> Optional[int]:
81    """
82    Return a plugin's ID.
83    """
84    ### ensure plugins table exists
85    from meerschaum.connectors.sql.tables import get_tables
86    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
87    from meerschaum.utils.packages import attempt_import
88    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
89
90    query = (
91        sqlalchemy
92        .select(plugins_tbl.c.plugin_id)
93        .where(plugins_tbl.c.plugin_name == plugin.name)
94    )
95    
96    try:
97        return int(self.value(query, debug=debug))
98    except Exception:
99        return None

Return a plugin's ID.

def get_plugin_version( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[str]:
102def get_plugin_version(
103    self,
104    plugin: 'mrsm.core.Plugin',
105    debug: bool = False
106) -> Optional[str]:
107    """
108    Return a plugin's version.
109    """
110    ### ensure plugins table exists
111    from meerschaum.connectors.sql.tables import get_tables
112    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
113    from meerschaum.utils.packages import attempt_import
114    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
115    query = sqlalchemy.select(plugins_tbl.c.version).where(plugins_tbl.c.plugin_name == plugin.name)
116    return self.value(query, debug=debug)

Return a plugin's version.

def get_plugins( self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False, **kw: Any) -> List[str]:
196def get_plugins(
197    self,
198    user_id: Optional[int] = None,
199    search_term: Optional[str] = None,
200    debug: bool = False,
201    **kw: Any
202) -> List[str]:
203    """
204    Return a list of all registered plugins.
205
206    Parameters
207    ----------
208    user_id: Optional[int], default None
209        If specified, filter plugins by a specific `user_id`.
210
211    search_term: Optional[str], default None
212        If specified, add a `WHERE plugin_name LIKE '{search_term}%'` clause to filter the plugins.
213
214
215    Returns
216    -------
217    A list of plugin names.
218    """
219    ### ensure plugins table exists
220    from meerschaum.connectors.sql.tables import get_tables
221    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
222    from meerschaum.utils.packages import attempt_import
223    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
224
225    query = sqlalchemy.select(plugins_tbl.c.plugin_name)
226    if user_id is not None:
227        query = query.where(plugins_tbl.c.user_id == user_id)
228    if search_term is not None:
229        query = query.where(plugins_tbl.c.plugin_name.like(search_term + '%'))
230
231    rows = (
232        self.execute(query).fetchall()
233        if self.flavor != 'duckdb'
234        else [
235            (row['plugin_name'],)
236            for row in self.read(query).to_dict(orient='records')
237        ]
238    )
239    
240    return [row[0] for row in rows]

Return a list of all registered plugins.

Parameters
  • user_id (Optional[int], default None): If specified, filter plugins by a specific user_id.
  • search_term (Optional[str], default None): If specified, add a WHERE plugin_name LIKE '{search_term}%' clause to filter the plugins.
Returns
  • A list of plugin names.
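For instance, reusing a `conn` as above (plugin names illustrative):
>>> conn.get_plugins(search_term='noaa')  # e.g. ['noaa']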
def get_plugin_user_id( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[int]:
118def get_plugin_user_id(
119    self,
120    plugin: 'mrsm.core.Plugin',
121    debug: bool = False
122) -> Optional[int]:
123    """
124    Return a plugin's user ID.
125    """
126    ### ensure plugins table exists
127    from meerschaum.connectors.sql.tables import get_tables
128    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
129    from meerschaum.utils.packages import attempt_import
130    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
131
132    query = (
133        sqlalchemy
134        .select(plugins_tbl.c.user_id)
135        .where(plugins_tbl.c.plugin_name == plugin.name)
136    )
137
138    try:
139        return int(self.value(query, debug=debug))
140    except Exception:
141        return None

Return a plugin's user ID.

def get_plugin_username( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[str]:
143def get_plugin_username(
144    self,
145    plugin: 'mrsm.core.Plugin',
146    debug: bool = False
147) -> Optional[str]:
148    """
149    Return the username of a plugin's owner.
150    """
151    ### ensure plugins table exists
152    from meerschaum.connectors.sql.tables import get_tables
153    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
154    users = get_tables(mrsm_instance=self, debug=debug)['users']
155    from meerschaum.utils.packages import attempt_import
156    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
157
158    query = (
159        sqlalchemy.select(users.c.username)
160        .where(
161            sqlalchemy.and_(users.c.user_id == plugins_tbl.c.user_id,
162                            plugins_tbl.c.plugin_name == plugin.name)
163        )
164    )
165
166    return self.value(query, debug=debug)

Return the username of a plugin's owner.

def get_plugin_attributes( self, plugin: meerschaum.Plugin, debug: bool = False) -> Dict[str, Any]:
169def get_plugin_attributes(
170    self,
171    plugin: 'mrsm.core.Plugin',
172    debug: bool = False
173) -> Dict[str, Any]:
174    """
175    Return the attributes of a plugin.
176    """
177    ### ensure plugins table exists
178    from meerschaum.connectors.sql.tables import get_tables
179    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
180    from meerschaum.utils.packages import attempt_import
181    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
182
183    query = (
184        sqlalchemy
185        .select(plugins_tbl.c.attributes)
186        .where(plugins_tbl.c.plugin_name == plugin.name)
187    )
188
189    _attr = self.value(query, debug=debug)
190    if isinstance(_attr, str):
191        _attr = json.loads(_attr)
192    elif _attr is None:
193        _attr = {}
194    return _attr

Return the attributes of a plugin.

def register_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
16def register_user(
17    self,
18    user: mrsm.core.User,
19    debug: bool = False,
20    **kw: Any
21) -> SuccessTuple:
22    """Register a new user."""
23    from meerschaum.utils.packages import attempt_import
24    from meerschaum.utils.sql import json_flavors
25    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
26
27    valid_tuple = valid_username(user.username)
28    if not valid_tuple[0]:
29        return valid_tuple
30
31    old_id = self.get_user_id(user, debug=debug)
32
33    if old_id is not None:
34        return False, f"User '{user}' already exists."
35
36    ### ensure users table exists
37    from meerschaum.connectors.sql.tables import get_tables
38    tables = get_tables(mrsm_instance=self, debug=debug)
39
40    import json
41    bind_variables = {
42        'username': user.username,
43        'email': user.email,
44        'password_hash': user.password_hash,
45        'user_type': user.type,
46        'attributes': (
47            json.dumps(user.attributes) if self.flavor not in json_flavors else user.attributes
48        ),
49    }
50    if old_id is not None:
51        return False, f"User '{user.username}' already exists."
52    if old_id is None:
53        query = (
54            sqlalchemy.insert(tables['users']).
55            values(**bind_variables)
56        )
57
58    result = self.exec(query, debug=debug)
59    if result is None:
60        return False, f"Failed to register user '{user}'."
61    return True, f"Successfully registered user '{user}'."

Register a new user.
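A hedged sketch (assuming `meerschaum.core.User` is constructed with a username and password; credentials illustrative):
>>> import meerschaum as mrsm
>>> from meerschaum.core import User
>>> conn = mrsm.get_connector('sql:main')
>>> success, msg = conn.register_user(User('alice', 'correct-horse-battery'))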

def get_user_id( self, user: meerschaum.core.User._User.User, debug: bool = False) -> Optional[int]:
152def get_user_id(
153    self,
154    user: 'mrsm.core.User',
155    debug: bool = False
156) -> Optional[int]:
157    """If a user is registered, return the `user_id`."""
158    ### ensure users table exists
159    from meerschaum.utils.packages import attempt_import
160    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
161    from meerschaum.connectors.sql.tables import get_tables
162    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
163
164    query = (
165        sqlalchemy.select(users_tbl.c.user_id)
166        .where(users_tbl.c.username == user.username)
167    )
168
169    result = self.value(query, debug=debug)
170    if result is not None:
171        return int(result)
172    return None

If a user is registered, return the user_id.

def get_users(self, debug: bool = False, **kw: Any) -> List[str]:
246def get_users(
247    self,
248    debug: bool = False,
249    **kw: Any
250) -> List[str]:
251    """
252    Get the registered usernames.
253    """
254    ### ensure users table exists
255    from meerschaum.connectors.sql.tables import get_tables
256    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
257    from meerschaum.utils.packages import attempt_import
258    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
259
260    query = sqlalchemy.select(users_tbl.c.username)
261
262    return list(self.read(query, debug=debug)['username'])

Get the registered usernames.

def edit_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
 98def edit_user(
 99    self,
100    user: 'mrsm.core.User',
101    debug: bool = False,
102    **kw: Any
103) -> SuccessTuple:
104    """Update an existing user's metadata."""
105    from meerschaum.utils.packages import attempt_import
106    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
107    from meerschaum.connectors.sql.tables import get_tables
108    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
109
110    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
111    if user_id is None:
112        return False, (
113            f"User '{user.username}' does not exist. " +
114            f"Register user '{user.username}' before editing."
115        )
116    user.user_id = user_id
117
118    import json
119    valid_tuple = valid_username(user.username)
120    if not valid_tuple[0]:
121        return valid_tuple
122
123    bind_variables = {
124        'user_id' : user_id,
125        'username' : user.username,
126    }
127    if user.password != '':
128        bind_variables['password_hash'] = user.password_hash
129    if user.email != '':
130        bind_variables['email'] = user.email
131    if user.attributes is not None and user.attributes != {}:
132        bind_variables['attributes'] = (
133            json.dumps(user.attributes) if self.flavor in ('duckdb',)
134            else user.attributes
135        )
136    if user.type != '':
137        bind_variables['user_type'] = user.type
138
139    query = (
140        sqlalchemy
141        .update(users_tbl)
142        .values(**bind_variables)
143        .where(users_tbl.c.user_id == user_id)
144    )
145
146    result = self.exec(query, debug=debug)
147    if result is None:
148        return False, f"Failed to edit user '{user}'."
149    return True, f"Successfully edited user '{user}'."

Update an existing user's metadata.

def delete_user( self, user: meerschaum.core.User._User.User, debug: bool = False) -> Tuple[bool, str]:
214def delete_user(
215    self,
216    user: 'mrsm.core.User',
217    debug: bool = False
218) -> SuccessTuple:
219    """Delete a user's record from the users table."""
220    ### ensure users table exists
221    from meerschaum.connectors.sql.tables import get_tables
222    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
223    plugins = get_tables(mrsm_instance=self, debug=debug)['plugins']
224    from meerschaum.utils.packages import attempt_import
225    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
226
227    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
228
229    if user_id is None:
230        return False, f"User '{user.username}' is not registered and cannot be deleted."
231
232    query = sqlalchemy.delete(users_tbl).where(users_tbl.c.user_id == user_id)
233
234    result = self.exec(query, debug=debug)
235    if result is None:
236        return False, f"Failed to delete user '{user}'."
237
238    query = sqlalchemy.delete(plugins).where(plugins.c.user_id == user_id)
239    result = self.exec(query, debug=debug)
240    if result is None:
241        return False, f"Failed to delete plugins of user '{user}'."
242
243    return True, f"Successfully deleted user '{user}'"

Delete a user's record from the users table.

def get_user_password_hash( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
265def get_user_password_hash(
266    self,
267    user: 'mrsm.core.User',
268    debug: bool = False,
269    **kw: Any
270) -> Optional[str]:
271    """
272    Return the password hash for a user.
273    **NOTE**: This may be dangerous and is only allowed if the security settings explicitly allow it.
274    """
275    from meerschaum.utils.debug import dprint
276    from meerschaum.connectors.sql.tables import get_tables
277    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
278    from meerschaum.utils.packages import attempt_import
279    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
280
281    if user.user_id is not None:
282        user_id = user.user_id
283        if debug:
284            dprint(f"Already given user_id: {user_id}")
285    else:
286        if debug:
287            dprint("Fetching user_id...")
288        user_id = self.get_user_id(user, debug=debug)
289
290    if user_id is None:
291        return None
292
293    query = sqlalchemy.select(users_tbl.c.password_hash).where(users_tbl.c.user_id == user_id)
294
295    return self.value(query, debug=debug)

Return the password hash for a user. NOTE: This may be dangerous and is only allowed if the security settings explicitly allow it.

def get_user_type( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
298def get_user_type(
299    self,
300    user: 'mrsm.core.User',
301    debug: bool = False,
302    **kw: Any
303) -> Optional[str]:
304    """
305    Return the user's type.
306    """
307    from meerschaum.connectors.sql.tables import get_tables
308    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
309    from meerschaum.utils.packages import attempt_import
310    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
311
312    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
313
314    if user_id is None:
315        return None
316
317    query = sqlalchemy.select(users_tbl.c.user_type).where(users_tbl.c.user_id == user_id)
318
319    return self.value(query, debug=debug)

Return the user's type.

def get_user_attributes( self, user: meerschaum.core.User._User.User, debug: bool = False) -> Optional[Dict[str, Any]]:
174def get_user_attributes(
175    self,
176    user: 'mrsm.core.User',
177    debug: bool = False
178) -> Union[Dict[str, Any], None]:
179    """
180    Return the user's attributes.
181    """
182    ### ensure users table exists
183    from meerschaum.utils.warnings import warn
184    from meerschaum.utils.packages import attempt_import
185    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
186    from meerschaum.connectors.sql.tables import get_tables
187    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
188
189    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
190
191    query = (
192        sqlalchemy.select(users_tbl.c.attributes)
193        .where(users_tbl.c.user_id == user_id)
194    )
195
196    result = self.value(query, debug=debug)
197    if result is not None and not isinstance(result, dict):
198        try:
199            result = dict(result)
200            _parsed = True
201        except Exception:
202            _parsed = False
203        if not _parsed:
204            try:
205                import json
206                result = json.loads(result)
207                _parsed = True
208            except Exception:
209                _parsed = False
210        if not _parsed:
211            warn(f"Received unexpected type for attributes: {result}")
212    return result

Return the user's attributes.

@classmethod
def from_uri( cls, uri: str, label: Optional[str] = None, as_dict: bool = False) -> Union[SQLConnector, Dict[str, Union[str, int]]]:
15@classmethod
16def from_uri(
17    cls,
18    uri: str,
19    label: Optional[str] = None,
20    as_dict: bool = False,
21) -> Union[
22    'meerschaum.connectors.SQLConnector',
23    Dict[str, Union[str, int]],
24]:
25    """
26    Create a new SQLConnector from a URI string.
27
28    Parameters
29    ----------
30    uri: str
31        The URI connection string.
32
33    label: Optional[str], default None
34        If provided, use this as the connector label.
35        Otherwise use the determined database name.
36
37    as_dict: bool, default False
38        If `True`, return a dictionary of the keyword arguments
39        necessary to create a new `SQLConnector`, otherwise create a new object.
40
41    Returns
42    -------
43    A new SQLConnector object or a dictionary of attributes (if `as_dict` is `True`).
44    """
45
46    params = cls.parse_uri(uri)
47    params['uri'] = uri
48    flavor = params.get('flavor', None)
49    if not flavor or flavor not in cls.flavor_configs:
50        error(f"Invalid flavor '{flavor}' detected from the provided URI.")
51
52    if 'database' not in params:
53        error("Unable to determine the database from the provided URI.")
54
55    if flavor in ('sqlite', 'duckdb'):
56        if params['database'] == ':memory:':
57            params['label'] = label or f'memory_{flavor}'
58        else:
59            params['label'] = label or params['database'].split(os.path.sep)[-1].lower()
60    else:
61        params['label'] = label or (
62            (
63                (params['username'] + '@' if 'username' in params else '')
64                + params.get('host', '')
65                + ('/' if 'host' in params else '')
66                + params.get('database', '')
67            ).lower()
68        )
69
70    return cls(**params) if not as_dict else params

Create a new SQLConnector from a URI string.

Parameters
  • uri (str): The URI connection string.
  • label (Optional[str], default None): If provided, use this as the connector label. Otherwise use the determined database name.
  • as_dict (bool, default False): If True, return a dictionary of the keyword arguments necessary to create a new SQLConnector, otherwise create a new object.
Returns
  • A new SQLConnector object or a dictionary of attributes (if as_dict is True).
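A brief sketch (credentials illustrative); when no label is given, one is derived from the username, host, and database:
>>> conn = SQLConnector.from_uri('postgresql://user:pass@localhost:5432/db')
>>> conn.label
'user@localhost/db'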
@staticmethod
def parse_uri(uri: str) -> Dict[str, Any]:
 73@staticmethod
 74def parse_uri(uri: str) -> Dict[str, Any]:
 75    """
 76    Parse a URI string into a dictionary of parameters.
 77
 78    Parameters
 79    ----------
 80    uri: str
 81        The database connection URI.
 82
 83    Returns
 84    -------
 85    A dictionary of attributes.
 86
 87    Examples
 88    --------
 89    >>> parse_uri('sqlite:////home/foo/bar.db')
 90    {'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
 91    >>> parse_uri(
 92    ...     'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
 93    ...     + '/master?driver=ODBC+Driver+17+for+SQL+Server'
 94    ... )
 95    {'host': 'localhost', 'database': 'master', 'username': 'sa',
 96    'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql',
 97    'driver': 'ODBC Driver 17 for SQL Server'}
 98    >>> 
 99    """
100    from urllib.parse import parse_qs, urlparse
101    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
102    parser = sqlalchemy.engine.url.make_url
103    params = parser(uri).translate_connect_args()
104    params['flavor'] = uri.split(':')[0].split('+')[0]
105    if params['flavor'] == 'postgres':
106        params['flavor'] = 'postgresql'
107    if '?' in uri:
108        parsed_uri = urlparse(uri)
109        for key, value in parse_qs(parsed_uri.query).items():
110            params.update({key: value[0]})
111
112        if '--search_path' in params.get('options', ''):
113            params.update({'schema': params['options'].replace('--search_path=', '', 1)})
114    return params

Parse a URI string into a dictionary of parameters.

Parameters
  • uri (str): The database connection URI.
Returns
  • A dictionary of attributes.
Examples
>>> parse_uri('sqlite:////home/foo/bar.db')
{'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
>>> parse_uri(
...     'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
...     + '/master?driver=ODBC+Driver+17+for+SQL+Server'
... )
{'host': 'localhost', 'database': 'master', 'username': 'sa',
'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql',
'driver': 'ODBC Driver 17 for SQL Server'}
>>>