meerschaum.connectors.sql

Subpackage for SQLConnector subclass

 1#! /usr/bin/env python
 2# -*- coding: utf-8 -*-
 3# vim:fenc=utf-8
 4
 5"""
 6Subpackage for SQLConnector subclass
 7"""
 8
 9from meerschaum.connectors.sql._SQLConnector import SQLConnector
10
11__all__ = ('SQLConnector',)
class SQLConnector(meerschaum.connectors.instance._InstanceConnector.InstanceConnector):
 20class SQLConnector(InstanceConnector):
 21    """
 22    Connect to SQL databases via `sqlalchemy`.
 23    
 24    SQLConnectors may be used as Meerschaum instance connectors.
 25    Read more about connectors and instances at
 26    https://meerschaum.io/reference/connectors/
 27
 28    """
 29
 30    from ._create_engine import flavor_configs, create_engine
 31    from ._sql import (
 32        read,
 33        value,
 34        exec,
 35        execute,
 36        to_sql,
 37        exec_queries,
 38        get_connection,
 39        _cleanup_connections,
 40        _init_geopackage_table,
 41    )
 42    from meerschaum.utils.sql import test_connection
 43    from ._fetch import fetch, get_pipe_metadef
 44    from ._cli import cli, _cli_exit
 45    from ._pipes import (
 46        fetch_pipes_keys,
 47        create_indices,
 48        drop_indices,
 49        get_create_index_queries,
 50        get_drop_index_queries,
 51        get_add_columns_queries,
 52        get_alter_columns_queries,
 53        delete_pipe,
 54        get_pipe_data,
 55        get_pipe_data_query,
 56        register_pipe,
 57        edit_pipe,
 58        get_pipe_id,
 59        get_pipe_attributes,
 60        sync_pipe,
 61        sync_pipe_inplace,
 62        get_sync_time,
 63        pipe_exists,
 64        get_pipe_rowcount,
 65        drop_pipe,
 66        clear_pipe,
 67        deduplicate_pipe,
 68        get_pipe_table,
 69        get_pipe_columns_types,
 70        get_to_sql_dtype,
 71        get_pipe_schema,
 72        create_pipe_table_from_df,
 73        get_pipe_columns_indices,
 74        get_temporary_target,
 75        create_pipe_indices,
 76        drop_pipe_indices,
 77        get_pipe_index_names,
 78    )
 79    from ._plugins import (
 80        get_plugins_pipe,
 81        register_plugin,
 82        delete_plugin,
 83        get_plugin_id,
 84        get_plugin_version,
 85        get_plugins,
 86        get_plugin_user_id,
 87        get_plugin_username,
 88        get_plugin_attributes,
 89    )
 90    from ._users import (
 91        get_users_pipe,
 92        register_user,
 93        get_user_id,
 94        get_users,
 95        edit_user,
 96        delete_user,
 97        get_user_password_hash,
 98        get_user_type,
 99        get_user_attributes,
100    )
101    from ._uri import from_uri, parse_uri
102    from ._instance import (
103        _log_temporary_tables_creation,
104        _drop_temporary_table,
105        _drop_temporary_tables,
106        _drop_old_temporary_tables,
107    )
108
109    def __init__(
110        self,
111        label: Optional[str] = None,
112        flavor: Optional[str] = None,
113        wait: bool = False,
114        connect: bool = False,
115        debug: bool = False,
116        **kw: Any
117    ):
118        """
119        Parameters
120        ----------
121        label: str, default 'main'
122            The identifying label for the connector.
123            E.g. for `sql:main`, 'main' is the label.
124            Defaults to 'main'.
125
126        flavor: Optional[str], default None
127            The database flavor, e.g.
128            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
129            To see supported flavors, run the `bootstrap connectors` command.
130
131        wait: bool, default False
132            If `True`, block until a database connection has been made.
133            Defaults to `False`.
134
135        connect: bool, default False
136            If `True`, immediately attempt to connect the database and raise
137            a warning if the connection fails.
138            Defaults to `False`.
139
140        debug: bool, default False
141            Verbosity toggle.
142            Defaults to `False`.
143
144        kw: Any
145            All other arguments will be passed to the connector's attributes.
146            Therefore, a connector may be made without being registered,
147            as long as enough parameters are supplied to the constructor.
148        """
149        if 'uri' in kw:
150            uri = kw['uri']
151            if uri.startswith('postgres') and not uri.startswith('postgresql'):
152                uri = uri.replace('postgres', 'postgresql', 1)
153            if uri.startswith('postgresql') and not uri.startswith('postgresql+'):
154                uri = uri.replace('postgresql://', 'postgresql+psycopg://', 1)
155            if uri.startswith('timescaledb://'):
156                uri = uri.replace('timescaledb://', 'postgresql+psycopg://', 1)
157                flavor = 'timescaledb'
158            if uri.startswith('timescaledb-ha://'):
159                uri = uri.replace('timescaledb-ha://', 'postgresql+psycopg://', 1)
160                flavor = 'timescaledb-ha'
161            if uri.startswith('postgis://'):
162                uri = uri.replace('postgis://', 'postgresql+psycopg://', 1)
163                flavor = 'postgis'
164            kw['uri'] = uri
165            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
166            label = label or from_uri_params.get('label', None)
167            _ = from_uri_params.pop('label', None)
168
169            ### Sometimes the flavor may be provided with a URI.
170            kw.update(from_uri_params)
171            if flavor:
172                kw['flavor'] = flavor
173
174        ### set __dict__ in base class
175        super().__init__(
176            'sql',
177            label = label or self.__dict__.get('label', None),
178            **kw
179        )
180
181        if self.__dict__.get('flavor', None) in ('sqlite', 'geopackage'):
182            self._reset_attributes()
183            self._set_attributes(
184                'sql',
185                label = label,
186                inherit_default = False,
187                **kw
188            )
189            ### For backwards compatibility reasons, set the path for sql:local if it's missing.
190            if self.label == 'local' and not self.__dict__.get('database', None):
191                from meerschaum.config._paths import SQLITE_DB_PATH
192                self.database = SQLITE_DB_PATH.as_posix()
193
194        ### ensure flavor and label are set accordingly
195        if 'flavor' not in self.__dict__:
196            if flavor is None and 'uri' not in self.__dict__:
197                raise ValueError(
198                    f"    Missing flavor. Provide flavor as a key for '{self}'."
199                )
200            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)
201
202        if self.flavor == 'postgres':
203            self.flavor = 'postgresql'
204
205        self._debug = debug
206        ### Store the PID and thread at initialization
207        ### so we can dispose of the Pool in child processes or threads.
208        import os
209        import threading
210        self._pid = os.getpid()
211        self._thread_ident = threading.current_thread().ident
212        self._sessions = {}
213        self._locks = {'_sessions': threading.RLock(), }
214
215        ### verify the flavor's requirements are met
216        if self.flavor not in self.flavor_configs:
217            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
218        if not self.__dict__.get('uri'):
219            self.verify_attributes(
220                self.flavor_configs[self.flavor].get('requirements', set()),
221                debug=debug,
222            )
223
224        if wait:
225            from meerschaum.connectors.poll import retry_connect
226            retry_connect(connector=self, debug=debug)
227
228        if connect:
229            if not self.test_connection(debug=debug):
230                warn(f"Failed to connect with connector '{self}'!", stack=False)
231
232    @property
233    def Session(self):
234        if '_Session' not in self.__dict__:
235            if self.engine is None:
236                return None
237
238            from meerschaum.utils.packages import attempt_import
239            sqlalchemy_orm = attempt_import('sqlalchemy.orm', lazy=False)
240            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
241            self._Session = sqlalchemy_orm.scoped_session(session_factory)
242
243        return self._Session
244
245    @property
246    def engine(self):
247        """
248        Return the SQLAlchemy engine connected to the configured database.
249        """
250        import os
251        import threading
252        if '_engine' not in self.__dict__:
253            self._engine, self._engine_str = self.create_engine(include_uri=True)
254
255        same_process = os.getpid() == self._pid
256        same_thread = threading.current_thread().ident == self._thread_ident
257
258        ### handle child processes
259        if not same_process:
260            self._pid = os.getpid()
261            self._thread = threading.current_thread()
262            warn("Different PID detected. Disposing of connections...")
263            self._engine.dispose()
264
265        ### handle different threads
266        if not same_thread:
267            if self.flavor == 'duckdb':
268                warn("Different thread detected.")
269                self._engine.dispose()
270
271        return self._engine
272
273    @property
274    def DATABASE_URL(self) -> str:
275        """
276        Return the URI connection string (alias for `SQLConnector.URI`).
277        """
278        _ = self.engine
279        return str(self._engine_str)
280
281    @property
282    def URI(self) -> str:
283        """
284        Return the URI connection string.
285        """
286        _ = self.engine
287        return str(self._engine_str)
288
289    @property
290    def IS_THREAD_SAFE(self) -> bool:
291        """
292        Return whether this connector may be multithreaded.
293        """
294        if self.flavor in ('duckdb', 'oracle'):
295            return False
296        if self.flavor in ('sqlite', 'geopackage'):
297            return ':memory:' not in self.URI
298        return True
299
300    @property
301    def metadata(self):
302        """
303        Return the metadata bound to this configured schema.
304        """
305        from meerschaum.utils.packages import attempt_import
306        sqlalchemy = attempt_import('sqlalchemy', lazy=False)
307        if '_metadata' not in self.__dict__:
308            self._metadata = sqlalchemy.MetaData(schema=self.schema)
309        return self._metadata
310
311    @property
312    def instance_schema(self):
313        """
314        Return the schema name for Meerschaum tables. 
315        """
316        return self.schema
317
318    @property
319    def internal_schema(self):
320        """
321        Return the schema name for internal tables. 
322        """
323        from meerschaum._internal.static import STATIC_CONFIG
324        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
325        schema_name = self.__dict__.get('internal_schema', None) or (
326            STATIC_CONFIG['sql']['internal_schema']
327            if self.flavor not in NO_SCHEMA_FLAVORS
328            else self.schema
329        )
330
331        if '_internal_schema' not in self.__dict__:
332            self._internal_schema = schema_name
333        return self._internal_schema
334
335    @property
336    def db(self) -> Optional[databases.Database]:
337        from meerschaum.utils.packages import attempt_import
338        databases = attempt_import('databases', lazy=False, install=True)
339        url = self.DATABASE_URL
340        if 'mysql' in url:
341            url = url.replace('+pymysql', '')
342        if '_db' not in self.__dict__:
343            try:
344                self._db = databases.Database(url)
345            except KeyError:
346                ### Likely encountered an unsupported flavor.
347                from meerschaum.utils.warnings import warn
348                self._db = None
349        return self._db
350
351    @property
352    def db_version(self) -> Union[str, None]:
353        """
354        Return the database version.
355        """
356        _db_version = self.__dict__.get('_db_version', None)
357        if _db_version is not None:
358            return _db_version
359
360        from meerschaum.utils.sql import get_db_version
361        self._db_version = get_db_version(self)
362        return self._db_version
363
364    @property
365    def schema(self) -> Union[str, None]:
366        """
367        Return the default schema to use.
368        A value of `None` will not prepend a schema.
369        """
370        if 'schema' in self.__dict__:
371            return self.__dict__['schema']
372
373        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
374        if self.flavor in NO_SCHEMA_FLAVORS:
375            self.__dict__['schema'] = None
376            return None
377
378        sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
379        _schema = sqlalchemy.inspect(self.engine).default_schema_name
380        self.__dict__['schema'] = _schema
381        return _schema
382
383    def get_metadata_cache_path(self, kind: str = 'json') -> pathlib.Path:
384        """
385        Return the path to the file to which to write metadata cache.
386        """
387        from meerschaum.config.paths import SQL_CONN_CACHE_RESOURCES_PATH
388        filename = (
389            f'{self.label}-metadata.pkl'
390            if kind == 'pkl'
391            else f'{self.label}.json'
392        )
393        return SQL_CONN_CACHE_RESOURCES_PATH / filename
394
395    def __getstate__(self):
396        return self.__dict__
397
398    def __setstate__(self, d):
399        self.__dict__.update(d)
400
401    def __call__(self):
402        return self

Connect to SQL databases via sqlalchemy.

SQLConnectors may be used as Meerschaum instance connectors. Read more about connectors and instances at https://meerschaum.io/reference/connectors/
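
For orientation, here is a minimal sketch of building an ad-hoc connector from a URI (the credentials and database below are placeholders):

    import meerschaum as mrsm

    # Build an unregistered SQLConnector from a placeholder URI.
    # Note the driver is normalized (postgresql:// becomes postgresql+psycopg://).
    conn = mrsm.get_connector('sql', 'example', uri='postgresql://user:pass@localhost:5432/mydb')
    print(conn.flavor)   # 'postgresql'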

SQLConnector( label: Optional[str] = None, flavor: Optional[str] = None, wait: bool = False, connect: bool = False, debug: bool = False, **kw: Any)
109    def __init__(
110        self,
111        label: Optional[str] = None,
112        flavor: Optional[str] = None,
113        wait: bool = False,
114        connect: bool = False,
115        debug: bool = False,
116        **kw: Any
117    ):
118        """
119        Parameters
120        ----------
121        label: str, default 'main'
122            The identifying label for the connector.
123            E.g. for `sql:main`, 'main' is the label.
124            Defaults to 'main'.
125
126        flavor: Optional[str], default None
127            The database flavor, e.g.
128            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
129            To see supported flavors, run the `bootstrap connectors` command.
130
131        wait: bool, default False
132            If `True`, block until a database connection has been made.
133            Defaults to `False`.
134
135        connect: bool, default False
136            If `True`, immediately attempt to connect the database and raise
137            a warning if the connection fails.
138            Defaults to `False`.
139
140        debug: bool, default False
141            Verbosity toggle.
142            Defaults to `False`.
143
144        kw: Any
145            All other arguments will be passed to the connector's attributes.
146            Therefore, a connector may be made without being registered,
147            as long as enough parameters are supplied to the constructor.
148        """
149        if 'uri' in kw:
150            uri = kw['uri']
151            if uri.startswith('postgres') and not uri.startswith('postgresql'):
152                uri = uri.replace('postgres', 'postgresql', 1)
153            if uri.startswith('postgresql') and not uri.startswith('postgresql+'):
154                uri = uri.replace('postgresql://', 'postgresql+psycopg://', 1)
155            if uri.startswith('timescaledb://'):
156                uri = uri.replace('timescaledb://', 'postgresql+psycopg://', 1)
157                flavor = 'timescaledb'
158            if uri.startswith('timescaledb-ha://'):
159                uri = uri.replace('timescaledb-ha://', 'postgresql+psycopg://', 1)
160                flavor = 'timescaledb-ha'
161            if uri.startswith('postgis://'):
162                uri = uri.replace('postgis://', 'postgresql+psycopg://', 1)
163                flavor = 'postgis'
164            kw['uri'] = uri
165            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
166            label = label or from_uri_params.get('label', None)
167            _ = from_uri_params.pop('label', None)
168
169            ### Sometimes the flavor may be provided with a URI.
170            kw.update(from_uri_params)
171            if flavor:
172                kw['flavor'] = flavor
173
174        ### set __dict__ in base class
175        super().__init__(
176            'sql',
177            label = label or self.__dict__.get('label', None),
178            **kw
179        )
180
181        if self.__dict__.get('flavor', None) in ('sqlite', 'geopackage'):
182            self._reset_attributes()
183            self._set_attributes(
184                'sql',
185                label = label,
186                inherit_default = False,
187                **kw
188            )
189            ### For backwards compatibility reasons, set the path for sql:local if it's missing.
190            if self.label == 'local' and not self.__dict__.get('database', None):
191                from meerschaum.config._paths import SQLITE_DB_PATH
192                self.database = SQLITE_DB_PATH.as_posix()
193
194        ### ensure flavor and label are set accordingly
195        if 'flavor' not in self.__dict__:
196            if flavor is None and 'uri' not in self.__dict__:
197                raise ValueError(
198                    f"    Missing flavor. Provide flavor as a key for '{self}'."
199                )
200            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)
201
202        if self.flavor == 'postgres':
203            self.flavor = 'postgresql'
204
205        self._debug = debug
206        ### Store the PID and thread at initialization
207        ### so we can dispose of the Pool in child processes or threads.
208        import os
209        import threading
210        self._pid = os.getpid()
211        self._thread_ident = threading.current_thread().ident
212        self._sessions = {}
213        self._locks = {'_sessions': threading.RLock(), }
214
215        ### verify the flavor's requirements are met
216        if self.flavor not in self.flavor_configs:
217            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
218        if not self.__dict__.get('uri'):
219            self.verify_attributes(
220                self.flavor_configs[self.flavor].get('requirements', set()),
221                debug=debug,
222            )
223
224        if wait:
225            from meerschaum.connectors.poll import retry_connect
226            retry_connect(connector=self, debug=debug)
227
228        if connect:
229            if not self.test_connection(debug=debug):
230                warn(f"Failed to connect with connector '{self}'!", stack=False)
Parameters
  • label (str, default 'main'): The identifying label for the connector. E.g. for sql:main, 'main' is the label. Defaults to 'main'.
  • flavor (Optional[str], default None): The database flavor, e.g. 'sqlite', 'postgresql', 'cockroachdb', etc. To see supported flavors, run the bootstrap connectors command.
  • wait (bool, default False): If True, block until a database connection has been made. Defaults to False.
  • connect (bool, default False): If True, immediately attempt to connect the database and raise a warning if the connection fails. Defaults to False.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
  • kw (Any): All other arguments will be passed to the connector's attributes. Therefore, a connector may be made without being registered, as long as enough parameters are supplied to the constructor.
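
As a sketch of the `**kw` passthrough described above, an unregistered connector may be built by passing its attributes directly to the constructor (the values here are hypothetical):

    from meerschaum.connectors.sql import SQLConnector

    # Hypothetical credentials; enough attributes are supplied
    # to satisfy the 'postgresql' flavor's requirements.
    conn = SQLConnector(
        label='analytics',
        flavor='postgresql',
        username='user',
        password='pass',
        host='localhost',
        port=5432,
        database='mydb',
    )
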
Session
232    @property
233    def Session(self):
234        if '_Session' not in self.__dict__:
235            if self.engine is None:
236                return None
237
238            from meerschaum.utils.packages import attempt_import
239            sqlalchemy_orm = attempt_import('sqlalchemy.orm', lazy=False)
240            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
241            self._Session = sqlalchemy_orm.scoped_session(session_factory)
242
243        return self._Session
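
A short usage sketch for the scoped session factory (assuming `conn` is a connected `SQLConnector`; `Session` returns `None` if no engine could be built):

    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)

    session = conn.Session()  # thread-local session from the scoped factory
    try:
        row = session.execute(sqlalchemy.text('SELECT 1')).fetchone()
    finally:
        session.close()
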
engine
245    @property
246    def engine(self):
247        """
248        Return the SQLAlchemy engine connected to the configured database.
249        """
250        import os
251        import threading
252        if '_engine' not in self.__dict__:
253            self._engine, self._engine_str = self.create_engine(include_uri=True)
254
255        same_process = os.getpid() == self._pid
256        same_thread = threading.current_thread().ident == self._thread_ident
257
258        ### handle child processes
259        if not same_process:
260            self._pid = os.getpid()
261            self._thread = threading.current_thread()
262            warn("Different PID detected. Disposing of connections...")
263            self._engine.dispose()
264
265        ### handle different threads
266        if not same_thread:
267            if self.flavor == 'duckdb':
268                warn("Different thread detected.")
269                self._engine.dispose()
270
271        return self._engine

Return the SQLAlchemy engine connected to the configured database.
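
The engine may be used directly with SQLAlchemy, e.g. (a sketch, assuming `conn` as above):

    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)

    with conn.engine.connect() as connection:
        result = connection.execute(sqlalchemy.text('SELECT 1'))
        print(result.scalar())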

DATABASE_URL: str
273    @property
274    def DATABASE_URL(self) -> str:
275        """
276        Return the URI connection string (alias for `SQLConnector.URI`).
277        """
278        _ = self.engine
279        return str(self._engine_str)

Return the URI connection string (alias for SQLConnector.URI).

URI: str
281    @property
282    def URI(self) -> str:
283        """
284        Return the URI connection string.
285        """
286        _ = self.engine
287        return str(self._engine_str)

Return the URI connection string.
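
Because `DATABASE_URL` is an alias, the two properties always agree, e.g.:

    # Both resolve to the same rendered engine string.
    assert conn.URI == conn.DATABASE_URL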

IS_THREAD_SAFE: bool
289    @property
290    def IS_THREAD_SAFE(self) -> bool:
291        """
292        Return whether this connector may be multithreaded.
293        """
294        if self.flavor in ('duckdb', 'oracle'):
295            return False
296        if self.flavor in ('sqlite', 'geopackage'):
297            return ':memory:' not in self.URI
298        return True

Return whether this connector may be multithreaded.
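
A sketch of gating parallelism on this flag (assuming `conn` as above; the queries are placeholders):

    from multiprocessing.pool import ThreadPool

    queries = ['SELECT 1', 'SELECT 2']
    if conn.IS_THREAD_SAFE:
        # Safe to issue queries from multiple threads.
        with ThreadPool(4) as pool:
            results = pool.map(conn.value, queries)
    else:
        results = [conn.value(q) for q in queries]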

metadata
300    @property
301    def metadata(self):
302        """
303        Return the metadata bound to this configured schema.
304        """
305        from meerschaum.utils.packages import attempt_import
306        sqlalchemy = attempt_import('sqlalchemy', lazy=False)
307        if '_metadata' not in self.__dict__:
308            self._metadata = sqlalchemy.MetaData(schema=self.schema)
309        return self._metadata

Return the metadata bound to this configured schema.
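
For example, the bound metadata may be used to reflect an existing table (`'my_table'` is hypothetical):

    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)

    # Reflect the table's columns under the connector's configured schema.
    table = sqlalchemy.Table('my_table', conn.metadata, autoload_with=conn.engine)
    print([col.name for col in table.columns])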

instance_schema
311    @property
312    def instance_schema(self):
313        """
314        Return the schema name for Meerschaum tables. 
315        """
316        return self.schema

Return the schema name for Meerschaum tables.

internal_schema
318    @property
319    def internal_schema(self):
320        """
321        Return the schema name for internal tables. 
322        """
323        from meerschaum._internal.static import STATIC_CONFIG
324        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
325        schema_name = self.__dict__.get('internal_schema', None) or (
326            STATIC_CONFIG['sql']['internal_schema']
327            if self.flavor not in NO_SCHEMA_FLAVORS
328            else self.schema
329        )
330
331        if '_internal_schema' not in self.__dict__:
332            self._internal_schema = schema_name
333        return self._internal_schema

Return the schema name for internal tables.

db: 'Optional[databases.Database]'
335    @property
336    def db(self) -> Optional[databases.Database]:
337        from meerschaum.utils.packages import attempt_import
338        databases = attempt_import('databases', lazy=False, install=True)
339        url = self.DATABASE_URL
340        if 'mysql' in url:
341            url = url.replace('+pymysql', '')
342        if '_db' not in self.__dict__:
343            try:
344                self._db = databases.Database(url)
345            except KeyError:
346                ### Likely encountered an unsupported flavor.
347                from meerschaum.utils.warnings import warn
348                self._db = None
349        return self._db
db_version: Optional[str]
351    @property
352    def db_version(self) -> Union[str, None]:
353        """
354        Return the database version.
355        """
356        _db_version = self.__dict__.get('_db_version', None)
357        if _db_version is not None:
358            return _db_version
359
360        from meerschaum.utils.sql import get_db_version
361        self._db_version = get_db_version(self)
362        return self._db_version

Return the database version.

schema: Optional[str]
364    @property
365    def schema(self) -> Union[str, None]:
366        """
367        Return the default schema to use.
368        A value of `None` will not prepend a schema.
369        """
370        if 'schema' in self.__dict__:
371            return self.__dict__['schema']
372
373        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
374        if self.flavor in NO_SCHEMA_FLAVORS:
375            self.__dict__['schema'] = None
376            return None
377
378        sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
379        _schema = sqlalchemy.inspect(self.engine).default_schema_name
380        self.__dict__['schema'] = _schema
381        return _schema

Return the default schema to use. A value of None will not prepend a schema.
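
Because `schema` may be `None`, qualify table names conditionally, e.g. with `meerschaum.utils.sql.sql_item_name` (the table name is hypothetical):

    from meerschaum.utils.sql import sql_item_name

    # Quotes the name for the flavor and prepends the schema when one is set.
    print(sql_item_name('my_table', conn.flavor, conn.schema))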

def get_metadata_cache_path(self, kind: str = 'json') -> pathlib.Path:
383    def get_metadata_cache_path(self, kind: str = 'json') -> pathlib.Path:
384        """
385        Return the path to the file to which to write metadata cache.
386        """
387        from meerschaum.config.paths import SQL_CONN_CACHE_RESOURCES_PATH
388        filename = (
389            f'{self.label}-metadata.pkl'
390            if kind == 'pkl'
391            else f'{self.label}.json'
392        )
393        return SQL_CONN_CACHE_RESOURCES_PATH / filename

Return the path to the file to which to write metadata cache.
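
A quick sketch of the two cache file names (assuming a connector labeled 'main'):

    json_path = conn.get_metadata_cache_path()           # main.json
    pkl_path = conn.get_metadata_cache_path(kind='pkl')  # main-metadata.pkl
    print(json_path.name, pkl_path.name)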

flavor_configs = {
    'timescaledb': {'engine': 'postgresql+psycopg', 'create_engine': {'pool_size': 4, 'max_overflow': 4, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {}, 'requirements': {'password', 'host', 'username', 'database'}, 'defaults': {'port': 5432}},
    'timescaledb-ha': {'engine': 'postgresql+psycopg', 'create_engine': {'pool_size': 4, 'max_overflow': 4, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {}, 'requirements': {'password', 'host', 'username', 'database'}, 'defaults': {'port': 5432}},
    'postgresql': {'engine': 'postgresql+psycopg', 'create_engine': {'pool_size': 4, 'max_overflow': 4, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {}, 'requirements': {'password', 'host', 'username', 'database'}, 'defaults': {'port': 5432}},
    'postgis': {'engine': 'postgresql+psycopg', 'create_engine': {'pool_size': 4, 'max_overflow': 4, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {}, 'requirements': {'password', 'host', 'username', 'database'}, 'defaults': {'port': 5432}},
    'citus': {'engine': 'postgresql+psycopg', 'create_engine': {'pool_size': 4, 'max_overflow': 4, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {}, 'requirements': {'password', 'host', 'username', 'database'}, 'defaults': {'port': 5432}},
    'mssql': {'engine': 'mssql+pyodbc', 'create_engine': {'fast_executemany': True, 'use_insertmanyvalues': False, 'isolation_level': 'AUTOCOMMIT', 'use_setinputsizes': False, 'pool_pre_ping': True, 'ignore_no_transaction_on_rollback': True}, 'omit_create_engine': {'method'}, 'to_sql': {'method': None}, 'requirements': {'password', 'host', 'username', 'database'}, 'defaults': {'port': 1433, 'options': 'driver=ODBC Driver 18 for SQL Server&UseFMTONLY=Yes&TrustServerCertificate=yes&Encrypt=no&MARS_Connection=yes'}},
    'mysql': {'engine': 'mysql+pymysql', 'create_engine': {'pool_size': 4, 'max_overflow': 4, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {'method': 'multi'}, 'requirements': {'password', 'host', 'username', 'database'}, 'defaults': {'port': 3306}},
    'mariadb': {'engine': 'mysql+pymysql', 'create_engine': {'pool_size': 4, 'max_overflow': 4, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {'method': 'multi'}, 'requirements': {'password', 'host', 'username', 'database'}, 'defaults': {'port': 3306}},
    'oracle': {'engine': 'oracle+oracledb', 'create_engine': {'pool_size': 4, 'max_overflow': 4, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {'method': None}, 'requirements': {'password', 'host', 'username', 'database'}, 'defaults': {'port': 1521}},
    'sqlite': {'engine': 'sqlite', 'create_engine': {'pool_size': 4, 'max_overflow': 4, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {'method': 'multi'}, 'requirements': {'database'}, 'defaults': {}},
    'geopackage': {'engine': 'sqlite', 'create_engine': {'pool_size': 4, 'max_overflow': 4, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {'method': 'multi'}, 'requirements': {'database'}, 'defaults': {}},
    'duckdb': {'engine': 'duckdb', 'create_engine': {}, 'omit_create_engine': {'ALL'}, 'to_sql': {'method': 'multi'}, 'requirements': '', 'defaults': {}},
    'cockroachdb': {'engine': 'cockroachdb', 'omit_create_engine': {'method'}, 'create_engine': {'pool_size': 4, 'max_overflow': 4, 'pool_recycle': 3600, 'connect_args': {}}, 'to_sql': {'method': 'multi'}, 'requirements': {'host'}, 'defaults': {'port': 26257, 'database': 'defaultdb', 'username': 'root', 'password': 'admin'}},
}
def create_engine( self, include_uri: bool = False, debug: bool = False, **kw) -> 'sqlalchemy.engine.Engine':
 45def create_engine(
 46    self,
 47    include_uri: bool = False,
 48    debug: bool = False,
 49    **kw
 50) -> 'sqlalchemy.engine.Engine':
 51    """Create a sqlalchemy engine by building the engine string."""
 52    from meerschaum.utils.packages import attempt_import
 53    from meerschaum.utils.warnings import error, warn
 54    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
 55    import urllib
 56    import copy
 57    ### Install and patch required drivers.
 58    if self.flavor in install_flavor_drivers:
 59        _ = attempt_import(
 60            *install_flavor_drivers[self.flavor],
 61            debug=debug,
 62            lazy=False,
 63            warn=False,
 64        )
 65        if self.flavor == 'mssql':
 66            _init_mssql_sqlalchemy()
 67
 68    ### supplement missing values with defaults (e.g. port number)
 69    for a, value in flavor_configs[self.flavor]['defaults'].items():
 70        if a not in self.__dict__:
 71            self.__dict__[a] = value
 72
 73    ### Verify that everything is in order.
 74    if self.flavor not in flavor_configs:
 75        error(f"Cannot create a connector with the flavor '{self.flavor}'.")
 76
 77    _engine = flavor_configs[self.flavor].get('engine', None)
 78    _username = self.__dict__.get('username', None)
 79    _password = self.__dict__.get('password', None)
 80    _host = self.__dict__.get('host', None)
 81    _port = self.__dict__.get('port', None)
 82    _database = self.__dict__.get('database', None)
 83    if _database == '{SQLITE_DB_PATH}':
 84        from meerschaum.config.paths import SQLITE_DB_PATH
 85        _database = SQLITE_DB_PATH.as_posix()
 86    _options = self.__dict__.get('options', {})
 87    if isinstance(_options, str):
 88        _options = dict(urllib.parse.parse_qsl(_options))
 89    _uri = self.__dict__.get('uri', None)
 90
 91    ### Handle registering specific dialects (due to installing in virtual environments).
 92    if self.flavor in flavor_dialects:
 93        sqlalchemy.dialects.registry.register(*flavor_dialects[self.flavor])
 94
 95    ### self._sys_config was deepcopied and can be updated safely
 96    if self.flavor in ("sqlite", "duckdb", "geopackage"):
 97        engine_str = f"{_engine}:///{_database}" if not _uri else _uri
 98        if 'create_engine' not in self._sys_config:
 99            self._sys_config['create_engine'] = {}
100        if 'connect_args' not in self._sys_config['create_engine']:
101            self._sys_config['create_engine']['connect_args'] = {}
102        self._sys_config['create_engine']['connect_args'].update({"check_same_thread": False})
103    else:
104        engine_str = (
105            _engine + "://" + (_username if _username is not None else '') +
106            ((":" + urllib.parse.quote_plus(_password)) if _password is not None else '') +
107            "@" + _host + ((":" + str(_port)) if _port is not None else '') +
108            (("/" + _database) if _database is not None else '')
109            + (("?" + urllib.parse.urlencode(_options)) if _options else '')
110        ) if not _uri else _uri
111
112        ### Sometimes the timescaledb:// flavor can slip in.
113        if _uri and self.flavor in _uri:
114            if self.flavor in ('timescaledb', 'timescaledb-ha', 'postgis'):
115                engine_str = engine_str.replace(self.flavor, 'postgresql', 1)
116            elif _uri.startswith('postgresql://'):
117                engine_str = engine_str.replace('postgresql://', 'postgresql+psycopg2://')
118
119    if debug:
120        dprint(
121            (
122                (engine_str.replace(':' + _password, ':' + ('*' * len(_password))))
123                    if _password is not None else engine_str
124            ) + '\n' + f"{self._sys_config.get('create_engine', {}).get('connect_args', {})}"
125        )
126
127    _kw_copy = copy.deepcopy(kw)
128
129    ### NOTE: Order of inheritance:
130    ###       1. Defaults
131    ###       2. System configuration
132    ###       3. Connector configuration
133    ###       4. Keyword arguments
134    _create_engine_args = flavor_configs.get(self.flavor, {}).get('create_engine', {})
135    def _apply_create_engine_args(update):
136        if 'ALL' not in flavor_configs[self.flavor].get('omit_create_engine', {}):
137            _create_engine_args.update(
138                { k: v for k, v in update.items()
139                    if 'omit_create_engine' not in flavor_configs[self.flavor]
140                        or k not in flavor_configs[self.flavor].get('omit_create_engine')
141                }
142            )
143    _apply_create_engine_args(self._sys_config.get('create_engine', {}))
144    _apply_create_engine_args(self.__dict__.get('create_engine', {}))
145    _apply_create_engine_args(_kw_copy)
146
147    try:
148        engine = sqlalchemy.create_engine(
149            engine_str,
150            ### I know this looks confusing, and maybe it's bad code,
151            ### but it's simple. It dynamically parses the config string
152            ### and splits it to separate the class name (QueuePool)
153            ### from the module name (sqlalchemy.pool).
154            poolclass    = getattr(
155                attempt_import(
156                    ".".join(self._sys_config['poolclass'].split('.')[:-1])
157                ),
158                self._sys_config['poolclass'].split('.')[-1]
159            ),
160            echo         = debug,
161            **_create_engine_args
162        )
163    except Exception:
164        warn(f"Failed to create connector '{self}':\n{traceback.format_exc()}", stack=False)
165        engine = None
166
167    if include_uri:
168        return engine, engine_str
169    return engine

Create a sqlalchemy engine by building the engine string.
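
For example (a sketch), `include_uri=True` also returns the rendered engine string, and the engine is `None` if creation fails:

    engine, engine_str = conn.create_engine(include_uri=True)
    if engine is None:
        print(f"Failed to build an engine from: {engine_str}")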

def read( self, query_or_table: 'Union[str, sqlalchemy.Query]', params: Union[Dict[str, Any], List[str], NoneType] = None, dtype: Optional[Dict[str, Any]] = None, coerce_float: bool = True, chunksize: Optional[int] = -1, workers: Optional[int] = None, chunk_hook: Optional[Callable[[pandas.core.frame.DataFrame], Any]] = None, as_hook_results: bool = False, chunks: Optional[int] = None, schema: Optional[str] = None, as_chunks: bool = False, as_iterator: bool = False, as_dask: bool = False, index_col: Optional[str] = None, silent: bool = False, debug: bool = False, **kw: Any) -> 'Union[pandas.DataFrame, dask.DataFrame, List[pandas.DataFrame], List[Any], None]':
 35def read(
 36    self,
 37    query_or_table: Union[str, sqlalchemy.Query],
 38    params: Union[Dict[str, Any], List[str], None] = None,
 39    dtype: Optional[Dict[str, Any]] = None,
 40    coerce_float: bool = True,
 41    chunksize: Optional[int] = -1,
 42    workers: Optional[int] = None,
 43    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None,
 44    as_hook_results: bool = False,
 45    chunks: Optional[int] = None,
 46    schema: Optional[str] = None,
 47    as_chunks: bool = False,
 48    as_iterator: bool = False,
 49    as_dask: bool = False,
 50    index_col: Optional[str] = None,
 51    silent: bool = False,
 52    debug: bool = False,
 53    **kw: Any
 54) -> Union[
 55    pandas.DataFrame,
 56    dask.DataFrame,
 57    List[pandas.DataFrame],
 58    List[Any],
 59    None,
 60]:
 61    """
 62    Read a SQL query or table into a pandas dataframe.
 63
 64    Parameters
 65    ----------
 66    query_or_table: Union[str, sqlalchemy.Query]
 67        The SQL query (sqlalchemy Query or string) or name of the table from which to select.
 68
 69    params: Optional[Dict[str, Any]], default None
 70        `List` or `Dict` of parameters to pass to `pandas.read_sql()`.
 71        See the pandas documentation for more information:
 72        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
 73
 74    dtype: Optional[Dict[str, Any]], default None
 75        A dictionary of data types to pass to `pandas.read_sql()`.
 76        See the pandas documentation for more information:
 77        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
 78
 79    chunksize: Optional[int], default -1
 80        How many chunks to read at a time. `None` will read everything in one large chunk.
 81        Defaults to system configuration.
 82
 83        **NOTE:** DuckDB does not allow for chunking.
 84
 85    workers: Optional[int], default None
 86        How many threads to use when consuming the generator.
 87        Only applies if `chunk_hook` is provided.
 88
 89    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None
 90        Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
 91        See `--sync-chunks` for an example.
 92        **NOTE:** `as_iterator` MUST be False (default).
 93
 94    as_hook_results: bool, default False
 95        If `True`, return a `List` of the outputs of the hook function.
 96        Only applicable if `chunk_hook` is not None.
 97
 98        **NOTE:** `as_iterator` MUST be `False` (default).
 99
100    chunks: Optional[int], default None
101        Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and
102        return into a single dataframe.
103        For example, to limit the returned dataframe to 100,000 rows,
104        you could specify a `chunksize` of `1000` and `chunks` of `100`.
105
106    schema: Optional[str], default None
107        If just a table name is provided, optionally specify the table schema.
108        Defaults to `SQLConnector.schema`.
109
110    as_chunks: bool, default False
111        If `True`, return a list of DataFrames.
112        Otherwise return a single DataFrame.
113
114    as_iterator: bool, default False
115        If `True`, return the pandas DataFrame iterator.
116        `chunksize` must not be `None` (falls back to 1000 if so),
117        and hooks are not called in this case.
118
119    index_col: Optional[str], default None
120        If using Dask, use this column as the index column.
121        If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
122
123    silent: bool, default False
124        If `True`, don't raise warnings in case of errors.
125        Defaults to `False`.
126
127    Returns
128    -------
129    A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators,
130    or `None` if something breaks.
131
132    """
133    if chunks is not None and chunks <= 0:
134        return []
135
136    from meerschaum.utils.sql import sql_item_name, truncate_item_name
137    from meerschaum.utils.dtypes import are_dtypes_equal, coerce_timezone
138    from meerschaum.utils.dtypes.sql import TIMEZONE_NAIVE_FLAVORS
139    from meerschaum.utils.packages import attempt_import, import_pandas
140    from meerschaum.utils.pool import get_pool
141    from meerschaum.utils.dataframe import chunksize_to_npartitions, get_numeric_cols
142    from meerschaum.utils.misc import filter_arguments
143    import warnings
144    import traceback
145    from decimal import Decimal
146
147    pd = import_pandas()
148    dd = None
149
150    is_dask = 'dask' in pd.__name__
151    pandas = attempt_import('pandas')
152    is_dask = dd is not None
153    npartitions = chunksize_to_npartitions(chunksize)
154    if is_dask:
155        chunksize = None
156
157    schema = schema or self.schema
158    utc_dt_cols = [
159        col
160        for col, typ in dtype.items()
161        if are_dtypes_equal(typ, 'datetime') and 'utc' in typ.lower()
162    ] if dtype else []
163
164    if dtype and utc_dt_cols and self.flavor in TIMEZONE_NAIVE_FLAVORS:
165        dtype = dtype.copy()
166        for col in utc_dt_cols:
167            dtype[col] = 'datetime64[us]'
168
169    pool = get_pool(workers=workers)
170    sqlalchemy = attempt_import("sqlalchemy", lazy=False)
171    default_chunksize = self._sys_config.get('chunksize', None)
172    chunksize = chunksize if chunksize != -1 else default_chunksize
173    if chunksize is None and as_iterator:
174        if not silent and self.flavor not in _disallow_chunks_flavors:
175            warn(
176                "An iterator may only be generated if chunksize is not None.\n"
177                + "Falling back to a chunksize of 1000.", stacklevel=3,
178            )
179        chunksize = 1000
180    if chunksize is not None and self.flavor in _max_chunks_flavors:
181        if chunksize > _max_chunks_flavors[self.flavor]:
182            if chunksize != default_chunksize:
183                warn(
184                    f"The specified chunksize of {chunksize} exceeds the maximum of "
185                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
186                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
187                    stacklevel=3,
188                )
189            chunksize = _max_chunks_flavors[self.flavor]
190
191    if chunksize is not None and self.flavor in _disallow_chunks_flavors:
192        chunksize = None
193
194    if debug:
195        import time
196        start = time.perf_counter()
197        dprint(f"[{self}]\n{query_or_table}")
198        dprint(f"[{self}] Fetching with chunksize: {chunksize}")
199
200    ### This might be sqlalchemy object or the string of a table name.
201    ### We check for spaces and quotes to see if it might be a weird table.
202    if (
203        ' ' not in str(query_or_table)
204        or (
205            ' ' in str(query_or_table)
206            and str(query_or_table).startswith('"')
207            and str(query_or_table).endswith('"')
208        )
209    ):
210        truncated_table_name = truncate_item_name(str(query_or_table), self.flavor)
211        if truncated_table_name != str(query_or_table) and not silent:
212            warn(
213                f"Table '{query_or_table}' is too long for '{self.flavor}',"
214                + f" will instead read the table '{truncated_table_name}'."
215            )
216
217        query_or_table = sql_item_name(str(query_or_table), self.flavor, schema)
218        if debug:
219            dprint(f"[{self}] Reading from table {query_or_table}")
220        formatted_query = sqlalchemy.text("SELECT * FROM " + str(query_or_table))
221        str_query = f"SELECT * FROM {query_or_table}"
222    else:
223        str_query = query_or_table
224
225    formatted_query = (
226        sqlalchemy.text(str_query)
227        if not is_dask and isinstance(str_query, str)
228        else format_sql_query_for_dask(str_query)
229    )
230
231    def _get_chunk_args_kwargs(_chunk):
232        return filter_arguments(
233            chunk_hook,
234            _chunk,
235            workers=workers,
236            chunksize=chunksize,
237            debug=debug,
238            **kw
239        )
240
241    chunk_list = []
242    chunk_hook_results = []
243    def _process_chunk(_chunk, _retry_on_failure: bool = True):
244        if self.flavor in TIMEZONE_NAIVE_FLAVORS:
245            for col in utc_dt_cols:
246                _chunk[col] = coerce_timezone(_chunk[col], strip_utc=False)
247        if not as_hook_results:
248            chunk_list.append(_chunk)
249
250        if chunk_hook is None:
251            return None
252
253        chunk_args, chunk_kwargs = _get_chunk_args_kwargs(_chunk)
254
255        result = None
256        try:
257            result = chunk_hook(*chunk_args, **chunk_kwargs)
258        except Exception:
259            result = False, traceback.format_exc()
260            from meerschaum.utils.formatting import get_console
261            if not silent:
262                get_console().print_exception()
263
264        ### If the chunk fails to process, try it again one more time.
265        if isinstance(result, tuple) and result[0] is False:
266            if _retry_on_failure:
267                return _process_chunk(_chunk, _retry_on_failure=False)
268
269        return result
270
271    try:
272        stream_results = not as_iterator and chunk_hook is not None and chunksize is not None
273        with warnings.catch_warnings():
274            warnings.filterwarnings('ignore', 'case sensitivity issues')
275
276            read_sql_query_kwargs = {
277                'params': params,
278                'dtype': dtype,
279                'coerce_float': coerce_float,
280                'index_col': index_col,
281            }
282            if is_dask:
283                if index_col is None:
284                    dd = None
285                    pd = attempt_import('pandas')
286                    read_sql_query_kwargs.update({
287                        'chunksize': chunksize,
288                    })
289            else:
290                read_sql_query_kwargs.update({
291                    'chunksize': chunksize,
292                })
293
294            if is_dask and dd is not None:
295                ddf = dd.read_sql_query(
296                    formatted_query,
297                    self.URI,
298                    **read_sql_query_kwargs
299                )
300            else:
301
302                def get_chunk_generator(connectable):
303                    chunk_generator = pd.read_sql_query(
304                        formatted_query,
305                        self.engine,
306                        **read_sql_query_kwargs
307                    )
308
309                    to_return = (
310                        (
311                            chunk_generator
312                            if not (as_hook_results or chunksize is None)
313                            else (
314                                _process_chunk(_chunk)
315                                for _chunk in chunk_generator
316                            )
317                        )
318                        if as_iterator or chunksize is None
319                        else (
320                            list(pool.imap(_process_chunk, chunk_generator))
321                            if as_hook_results
322                            else None
323                        )
324                    )
325                    return chunk_generator, to_return
326
327                if self.flavor in SKIP_READ_TRANSACTION_FLAVORS:
328                    chunk_generator, to_return = get_chunk_generator(self.engine)
329                else:
330                    with self.engine.begin() as transaction:
331                        with transaction.execution_options(stream_results=stream_results) as connection:
332                            chunk_generator, to_return = get_chunk_generator(connection)
333
334                if to_return is not None:
335                    return to_return
336
337    except Exception as e:
338        if debug:
339            dprint(f"[{self}] Failed to execute query:\n\n{query_or_table}\n\n")
340        if not silent:
341            warn(str(e), stacklevel=3)
342        from meerschaum.utils.formatting import get_console
343        if not silent:
344            get_console().print_exception()
345
346        return None
347
348    if is_dask and dd is not None:
349        ddf = ddf.reset_index()
350        return ddf
351
352    chunk_list = []
353    read_chunks = 0
354    chunk_hook_results = []
355    if chunksize is None:
356        chunk_list.append(chunk_generator)
357    elif as_iterator:
358        return chunk_generator
359    else:
360        try:
361            for chunk in chunk_generator:
362                if chunk_hook is not None:
363                    chunk_args, chunk_kwargs = _get_chunk_args_kwargs(chunk)
364                    chunk_hook_results.append(chunk_hook(*chunk_args, **chunk_kwargs))
365                chunk_list.append(chunk)
366                read_chunks += 1
367                if chunks is not None and read_chunks >= chunks:
368                    break
369        except Exception as e:
370            warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
371            from meerschaum.utils.formatting import get_console
372            if not silent:
373                get_console().print_exception()
374
375            return None
376
393    ### If no chunks returned, read without chunks
394    ### to get columns
395    if len(chunk_list) == 0:
396        with warnings.catch_warnings():
397            warnings.filterwarnings('ignore', 'case sensitivity issues')
398            _ = read_sql_query_kwargs.pop('chunksize', None)
399            with self.engine.begin() as connection:
400                chunk_list.append(
401                    pd.read_sql_query(
402                        formatted_query,
403                        connection,
404                        **read_sql_query_kwargs
405                    )
406                )
407
408    ### call the hook on any missed chunks.
409    if chunk_hook is not None and len(chunk_list) > len(chunk_hook_results):
410        for c in chunk_list[len(chunk_hook_results):]:
411            chunk_args, chunk_kwargs = _get_chunk_args_kwargs(c)
412            chunk_hook_results.append(chunk_hook(*chunk_args, **chunk_kwargs))
413
414    ### chunksize is not None so must iterate
415    if debug:
416        end = time.perf_counter()
417        dprint(f"Fetched {len(chunk_list)} chunks in {round(end - start, 2)} seconds.")
418
419    if as_hook_results:
420        return chunk_hook_results
421    
422    ### Skip `pd.concat()` if `as_chunks` is specified.
423    if as_chunks:
424        for c in chunk_list:
425            c.reset_index(drop=True, inplace=True)
426            for col in get_numeric_cols(c):
427                c[col] = c[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)
428        return chunk_list
429
430    df = pd.concat(chunk_list).reset_index(drop=True)
431    ### NOTE: The calls to `canonical()` are to drop leading and trailing zeroes.
432    for col in get_numeric_cols(df):
433        df[col] = df[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)
434
435    return df

Read a SQL query or table into a pandas dataframe.

Parameters
  • query_or_table (Union[str, sqlalchemy.Query]): The SQL query (sqlalchemy Query or string) or name of the table from which to select.
  • params (Optional[Dict[str, Any]], default None): List or Dict of parameters to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
  • dtype (Optional[Dict[str, Any]], default None): A dictionary of data types to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
  • chunksize (Optional[int], default -1): How many chunks to read at a time. None will read everything in one large chunk. Defaults to system configuration.

    NOTE: DuckDB does not allow for chunking.

  • workers (Optional[int], default None): How many threads to use when consuming the generator. Only applies if chunk_hook is provided.
  • chunk_hook (Optional[Callable[[pandas.DataFrame], Any]], default None): Hook function to execute once per chunk, e.g. writing and reading chunks intermittently. See --sync-chunks for an example. NOTE: as_iterator MUST be False (default).
  • as_hook_results (bool, default False): If True, return a List of the outputs of the hook function. Only applicable if chunk_hook is not None.

    NOTE: as_iterator MUST be False (default).

  • chunks (Optional[int], default None): Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and return into a single dataframe. For example, to limit the returned dataframe to 100,000 rows, you could specify a chunksize of 1000 and chunks of 100.
  • schema (Optional[str], default None): If just a table name is provided, optionally specify the table schema. Defaults to SQLConnector.schema.
  • as_chunks (bool, default False): If True, return a list of DataFrames. Otherwise return a single DataFrame.
  • as_iterator (bool, default False): If True, return the pandas DataFrame iterator. chunksize must not be None (falls back to 1000 if so), and hooks are not called in this case.
  • index_col (Optional[str], default None): If using Dask, use this column as the index column. If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
  • silent (bool, default False): If True, don't raise warnings in case of errors. Defaults to False.
Returns
  • A pd.DataFrame (default case), or an iterator, or a list of dataframes / iterators, or None if something breaks.
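
A usage sketch (the table name is hypothetical):

    # Read an entire table into a single dataframe.
    df = conn.read('my_table')

    # Or stream a query in chunks of 10,000 rows.
    chunks = conn.read('SELECT * FROM my_table', chunksize=10_000, as_iterator=True)
    for chunk in chunks:
        print(len(chunk))
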
def value(self, query: str, *args: Any, use_pandas: bool = False, **kw: Any) -> Any:
438def value(
439    self,
440    query: str,
441    *args: Any,
442    use_pandas: bool = False,
443    **kw: Any
444) -> Any:
445    """
446    Execute the provided query and return the first value.
447
448    Parameters
449    ----------
450    query: str
451        The SQL query to execute.
452        
453    *args: Any
454        The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec`
455        if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.
456        
457    use_pandas: bool, default False
458        If `True`, use `meerschaum.connectors.SQLConnector.read`, otherwise use
459        `meerschaum.connectors.sql.SQLConnector.exec` (default).
460        **NOTE:** This is always `True` for DuckDB.
461
462    **kw: Any
463        See `args`.
464
465    Returns
466    -------
467    Any value returned from the query.
468
469    """
470    from meerschaum.utils.packages import attempt_import
471    if self.flavor == 'duckdb':
472        use_pandas = True
473    if use_pandas:
474        try:
475            return self.read(query, *args, **kw).iloc[0, 0]
476        except Exception:
477            return None
478
479    _close = kw.get('close', True)
480    _commit = kw.get('commit', (self.flavor != 'mssql'))
481
482    try:
483        result, connection = self.exec(
484            query,
485            *args,
486            with_connection=True,
487            close=False,
488            commit=_commit,
489            **kw
490        )
491        first = result.first() if result is not None else None
492        _val = first[0] if first is not None else None
493    except Exception as e:
494        warn(e, stacklevel=3)
495        return None
496    if _close:
497        try:
498            connection.close()
499        except Exception as e:
500            warn("Failed to close connection with exception:\n" + str(e))
501    return _val

Execute the provided query and return the first value.

Parameters
  • query (str): The SQL query to execute.
  • *args (Any): The arguments passed to meerschaum.connectors.sql.SQLConnector.exec if use_pandas is False (default) or to meerschaum.connectors.sql.SQLConnector.read.
  • use_pandas (bool, default False): If True, use meerschaum.connectors.SQLConnector.read, otherwise use meerschaum.connectors.sql.SQLConnector.exec (default). NOTE: This is always True for DuckDB.
  • **kw (Any): See args.
Returns
  • Any value returned from the query.
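
A short sketch (the query and table name are illustrative):

    # Return the first value of the first row, or None on failure.
    row_count = conn.value("SELECT COUNT(*) FROM my_table")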
def exec( self, query: str, *args: Any, silent: bool = False, debug: bool = False, commit: Optional[bool] = None, close: Optional[bool] = None, with_connection: bool = False, _connection=None, _transaction=None, **kw: Any) -> 'Union[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.cursor.LegacyCursorResult, Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection], Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection], None]':
515def exec(
516    self,
517    query: str,
518    *args: Any,
519    silent: bool = False,
520    debug: bool = False,
521    commit: Optional[bool] = None,
522    close: Optional[bool] = None,
523    with_connection: bool = False,
524    _connection=None,
525    _transaction=None,
526    **kw: Any
527) -> Union[
528        sqlalchemy.engine.result.resultProxy,
529        sqlalchemy.engine.cursor.LegacyCursorResult,
530        Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection],
531        Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection],
532        None
533]:
534    """
535    Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.
536
537    If inserting data, please use bind variables to avoid SQL injection!
538
539    Parameters
540    ----------
541    query: Union[str, List[str], Tuple[str]]
542        The query to execute.
543        If `query` is a list or tuple, call `self.exec_queries()` instead.
544
545    args: Any
546        Arguments passed to `sqlalchemy.engine.execute`.
547
548    silent: bool, default False
549        If `True`, suppress warnings.
550
551    commit: Optional[bool], default None
552        If `True`, commit the changes after execution.
553        Causes issues with flavors like `'mssql'`.
554        This does not apply if `query` is a list of strings.
555
556    close: Optional[bool], default None
557        If `True`, close the connection after execution.
558        Causes issues with flavors like `'mssql'`.
559        This does not apply if `query` is a list of strings.
560
561    with_connection: bool, default False
562        If `True`, return a tuple including the connection object.
563        This does not apply if `query` is a list of strings.
564
565    Returns
566    -------
567    The `sqlalchemy` result object, or a tuple of the result and connection if `with_connection` is `True`.
568
569    """
570    if isinstance(query, (list, tuple)):
571        return self.exec_queries(
572            list(query),
573            *args,
574            silent=silent,
575            debug=debug,
576            **kw
577        )
578
579    from meerschaum.utils.packages import attempt_import
580    sqlalchemy = attempt_import("sqlalchemy", lazy=False)
581    if debug:
582        dprint(f"[{self}] Executing query:\n{query}")
583
584    _close = close if close is not None else (self.flavor != 'mssql')
585    _commit = commit if commit is not None else (
586        (self.flavor != 'mssql' or 'select' not in str(query).lower())
587    )
588
589    ### Select and Insert objects need to be compiled (SQLAlchemy 2.0.0+).
590    if not hasattr(query, 'compile'):
591        query = sqlalchemy.text(query)
592
593    connection = _connection if _connection is not None else self.get_connection()
594
595    try:
596        transaction = (
597            _transaction
598            if _transaction is not None else (
599                connection.begin()
600                if _commit
601                else None
602            )
603        )
604    except sqlalchemy.exc.InvalidRequestError as e:
605        if _connection is not None or _transaction is not None:
606            raise e
607        connection = self.get_connection(rebuild=True)
608        transaction = connection.begin()
609
610    if transaction is not None and not transaction.is_active and _transaction is not None:
611        connection = self.get_connection(rebuild=True)
612        transaction = connection.begin() if _commit else None
613
614    result = None
615    try:
616        result = connection.execute(query, *args, **kw)
617        if _commit:
618            transaction.commit()
619    except Exception as e:
620        if debug:
621            dprint(f"[{self}] Failed to execute query:\n\n{query}\n\n{e}")
622        if not silent:
623            warn(str(e), stacklevel=3)
624        result = None
625        if _commit:
626            transaction.rollback()
627            connection = self.get_connection(rebuild=True)
628    finally:
629        if _close:
630            connection.close()
631
632    if with_connection:
633        return result, connection
634
635    return result

Execute SQL code and return the sqlalchemy result, e.g. when calling stored procedures.

If inserting data, please use bind variables to avoid SQL injection!

Parameters
  • query (Union[str, List[str], Tuple[str]]): The query to execute. If query is a list or tuple, call self.exec_queries() instead.
  • args (Any): Arguments passed to sqlalchemy.engine.execute.
  • silent (bool, default False): If True, suppress warnings.
  • commit (Optional[bool], default None): If True, commit the changes after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
  • close (Optional[bool], default None): If True, close the connection after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
  • with_connection (bool, default False): If True, return a tuple including the connection object. This does not apply if query is a list of strings.
Returns
  • The sqlalchemy result object, or a tuple of the result and connection if with_connection is True.
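
Following the note above, a sketch of a parameterized insert (the table and bind-variable names are illustrative; the parameters dictionary is forwarded to sqlalchemy's Connection.execute()):

    # Bind variables avoid SQL injection.
    result = conn.exec(
        "INSERT INTO events (id, payload) VALUES (:id, :payload)",
        {'id': 1, 'payload': 'hello'},
    )
    if result is None:
        print("Execution failed; a warning was emitted unless silent=True.")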
def execute( self, *args: Any, **kw: Any) -> 'Optional[sqlalchemy.engine.result.resultProxy]':
504def execute(
505    self,
506    *args : Any,
507    **kw : Any
508) -> Optional[sqlalchemy.engine.result.resultProxy]:
509    """
510    An alias for `meerschaum.connectors.sql.SQLConnector.exec`.
511    """
512    return self.exec(*args, **kw)
def to_sql( self, df: pandas.core.frame.DataFrame, name: str = None, index: bool = False, if_exists: str = 'replace', method: str = '', chunksize: Optional[int] = -1, schema: Optional[str] = None, safe_copy: bool = True, silent: bool = False, debug: bool = False, as_tuple: bool = False, as_dict: bool = False, _connection=None, _transaction=None, **kw) -> Union[bool, Tuple[bool, str]]:
 733def to_sql(
 734    self,
 735    df: pandas.DataFrame,
 736    name: str = None,
 737    index: bool = False,
 738    if_exists: str = 'replace',
 739    method: str = "",
 740    chunksize: Optional[int] = -1,
 741    schema: Optional[str] = None,
 742    safe_copy: bool = True,
 743    silent: bool = False,
 744    debug: bool = False,
 745    as_tuple: bool = False,
 746    as_dict: bool = False,
 747    _connection=None,
 748    _transaction=None,
 749    **kw
 750) -> Union[bool, SuccessTuple]:
 751    """
 752    Upload a DataFrame's contents to the SQL server.
 753
 754    Parameters
 755    ----------
 756    df: pd.DataFrame
 757        The DataFrame to be inserted.
 758
 759    name: str
 760        The name of the table to be created.
 761
 762    index: bool, default False
 763        If True, creates the DataFrame's indices as columns.
 764
 765    if_exists: str, default 'replace'
 766        Drop and create the table ('replace') or append if it exists
 767        ('append') or raise Exception ('fail').
 768        Options are ['replace', 'append', 'fail'].
 769
 770    method: str, default ''
 771        `None` or `'multi'`. See the `method` parameter of `pandas.DataFrame.to_sql()`.
 772
 773    chunksize: Optional[int], default -1
 774        How many rows to insert at a time.
 775
 776    schema: Optional[str], default None
 777        Optionally override the schema for the table.
 778        Defaults to `SQLConnector.schema`.
 779
 780    safe_copy: bool, default True
 781        If `True`, copy the dataframe before making any changes.
 782
 783    as_tuple: bool, default False
 784        If `True`, return a (success_bool, message) tuple instead of a `bool`.
 785        Defaults to `False`.
 786
 787    as_dict: bool, default False
 788        If `True`, return a dictionary of transaction information.
 789        The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`,
 790        `method`, and `target`.
 791
 792    kw: Any
 793        Additional arguments will be passed to the DataFrame's `to_sql` function.
 794
 795    Returns
 796    -------
 797    Either a `bool` or a `SuccessTuple` (depends on `as_tuple`).
 798    """
 799    import time
 800    import json
 801    from datetime import timedelta
 802    from meerschaum.utils.warnings import error, warn
 803    import warnings
 804    import functools
 805    import traceback
 806
 807    if name is None:
 808        error(f"Name must not be `None` to insert data into {self}.")
 809
 810    ### We're requiring `name` to be positional, and sometimes it's passed in from background jobs.
 811    kw.pop('name', None)
 812
 813    schema = schema or self.schema
 814
 815    from meerschaum.utils.sql import (
 816        sql_item_name,
 817        table_exists,
 818        json_flavors,
 819        truncate_item_name,
 820        DROP_IF_EXISTS_FLAVORS,
 821    )
 822    from meerschaum.utils.dataframe import (
 823        get_json_cols,
 824        get_numeric_cols,
 825        get_uuid_cols,
 826        get_bytes_cols,
 827        get_geometry_cols,
 828    )
 829    from meerschaum.utils.dtypes import (
 830        are_dtypes_equal,
 831        coerce_timezone,
 832        encode_bytes_for_bytea,
 833        serialize_bytes,
 834        serialize_decimal,
 835        serialize_geometry,
 836        json_serialize_value,
 837        get_geometry_type_srid,
 838    )
 839    from meerschaum.utils.dtypes.sql import (
 840        PD_TO_SQLALCHEMY_DTYPES_FLAVORS,
 841        get_db_type_from_pd_type,
 842        get_pd_type_from_db_type,
 843        get_numeric_precision_scale,
 844    )
 845    from meerschaum.utils.misc import interval_str
 846    from meerschaum.connectors.sql._create_engine import flavor_configs
 847    from meerschaum.utils.packages import attempt_import, import_pandas
 848    sqlalchemy = attempt_import('sqlalchemy', debug=debug, lazy=False)
 849    pd = import_pandas()
 850    is_dask = 'dask' in df.__module__
 851
 852    bytes_cols = get_bytes_cols(df)
 853    numeric_cols = get_numeric_cols(df)
 854    geometry_cols = get_geometry_cols(df)
 855    ### NOTE: This excludes non-numeric serialized Decimals (e.g. SQLite).
 856    numeric_cols_dtypes = {
 857        col: typ
 858        for col, typ in kw.get('dtype', {}).items()
 859        if (
 860            col in df.columns
 861            and 'numeric' in str(typ).lower()
 862        )
 863    }
 864    numeric_cols.extend([col for col in numeric_cols_dtypes if col not in numeric_cols])
 865    numeric_cols_precisions_scales = {
 866        col: (
 867            (typ.precision, typ.scale)
 868            if hasattr(typ, 'precision')
 869            else get_numeric_precision_scale(self.flavor)
 870        )
 871        for col, typ in numeric_cols_dtypes.items()
 872    }
 873    geometry_cols_dtypes = {
 874        col: typ
 875        for col, typ in kw.get('dtype', {}).items()
 876        if (
 877            col in df.columns
 878            and ('geometry' in str(typ).lower() or 'geography' in str(typ).lower())
 879        )
 880    }
 881    geometry_cols.extend([col for col in geometry_cols_dtypes if col not in geometry_cols])
 882    geometry_cols_types_srids = {
 883        col: (typ.geometry_type, typ.srid)
 884        if hasattr(typ, 'srid')
 885        else get_geometry_type_srid()
 886        for col, typ in geometry_cols_dtypes.items()
 887    }
 888
 889    cols_pd_types = {
 890        col: get_pd_type_from_db_type(str(typ))
 891        for col, typ in kw.get('dtype', {}).items()
 892    }
 893    cols_pd_types.update({
 894        col: f'numeric[{precision},{scale}]'
 895        for col, (precision, scale) in numeric_cols_precisions_scales.items()
 896        if precision and scale
 897    })
 898    cols_db_types = {
 899        col: get_db_type_from_pd_type(typ, flavor=self.flavor)
 900        for col, typ in cols_pd_types.items()
 901    }
 902
 903    enable_bulk_insert = mrsm.get_config(
 904        'system', 'connectors', 'sql', 'bulk_insert', self.flavor,
 905        warn=False,
 906    ) or False
 907    stats = {'target': name}
 908    ### resort to defaults if None
 909    copied = False
 910    use_bulk_insert = False
 911    if method == "":
 912        if enable_bulk_insert:
 913            method = (
 914                functools.partial(mssql_insert_json, cols_types=cols_db_types, debug=debug)
 915                if self.flavor == 'mssql'
 916                else functools.partial(psql_insert_copy, debug=debug)
 917            )
 918            use_bulk_insert = True
 919        else:
 920            ### Should resolve to 'multi' or `None`.
 921            method = flavor_configs.get(self.flavor, {}).get('to_sql', {}).get('method', 'multi')
 922
 923    if bytes_cols and (use_bulk_insert or self.flavor == 'oracle'):
 924        if safe_copy and not copied:
 925            df = df.copy()
 926            copied = True
 927        bytes_serializer = (
 928            functools.partial(encode_bytes_for_bytea, with_prefix=(self.flavor != 'oracle'))
 929            if self.flavor != 'mssql'
 930            else serialize_bytes
 931        )
 932        for col in bytes_cols:
 933            df[col] = df[col].apply(bytes_serializer)
 934
 935    ### Check for numeric columns.
 936    for col in numeric_cols:
 937        precision, scale = numeric_cols_precisions_scales.get(
 938            col,
 939            get_numeric_precision_scale(self.flavor)
 940        )
 941        df[col] = df[col].apply(
 942            functools.partial(
 943                serialize_decimal,
 944                quantize=True,
 945                precision=precision,
 946                scale=scale,
 947            )
 948        )
 949
 950    for col in geometry_cols:
 951        geometry_type, srid = geometry_cols_types_srids.get(col, get_geometry_type_srid())
 952        with warnings.catch_warnings():
 953            warnings.simplefilter("ignore")
 954            df[col] = df[col].apply(
 955                functools.partial(
 956                    serialize_geometry,
 957                    geometry_format=('wkt' if self.flavor == 'mssql' else 'wkb_hex'),
 958                )
 959            )
 960
 961    stats['method'] = method.__name__ if hasattr(method, '__name__') else str(method)
 962
 963    default_chunksize = self._sys_config.get('chunksize', None)
 964    chunksize = chunksize if chunksize != -1 else default_chunksize
 965    if chunksize is not None and self.flavor in _max_chunks_flavors:
 966        if chunksize > _max_chunks_flavors[self.flavor]:
 967            if chunksize != default_chunksize:
 968                warn(
 969                    f"The specified chunksize of {chunksize} exceeds the maximum of "
 970                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
 971                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
 972                    stacklevel = 3,
 973                )
 974            chunksize = _max_chunks_flavors[self.flavor]
 975    stats['chunksize'] = chunksize
 976
 977    success, msg = False, "Default to_sql message"
 978    start = time.perf_counter()
 979    if debug:
 980        msg = f"[{self}] Inserting {len(df)} rows with chunksize: {chunksize}..."
 981        print(msg, end="", flush=True)
 982    stats['num_rows'] = len(df)
 983
 984    ### Check if the name is too long.
 985    truncated_name = truncate_item_name(name, self.flavor)
 986    if name != truncated_name:
 987        warn(
 988            f"Table '{name}' is too long for '{self.flavor}',"
 989            f" will instead create the table '{truncated_name}'."
 990        )
 991
 992    ### filter out non-pandas args
 993    import inspect
 994    to_sql_params = inspect.signature(df.to_sql).parameters
 995    to_sql_kw = {}
 996    for k, v in kw.items():
 997        if k in to_sql_params:
 998            to_sql_kw[k] = v
 999
1000    to_sql_kw.update({
1001        'name': truncated_name,
1002        'schema': schema,
1003        ('con' if not is_dask else 'uri'): (self.engine if not is_dask else self.URI),
1004        'index': index,
1005        'if_exists': if_exists,
1006        'method': method,
1007        'chunksize': chunksize,
1008    })
1009    if is_dask:
1010        to_sql_kw.update({
1011            'parallel': True,
1012        })
1013    elif _connection is not None:
1014        to_sql_kw['con'] = _connection
1015
1016    if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
1017    if self.flavor == 'oracle':
1018        ### For some reason 'replace' doesn't work properly in pandas,
1019        ### so try dropping first.
1020        if if_exists == 'replace' and table_exists(name, self, schema=schema, debug=debug):
1021            success = self.exec(
1022                f"DROP TABLE {if_exists_str}" + sql_item_name(name, 'oracle', schema)
1023            ) is not None
1024            if not success:
1025                warn(f"Unable to drop {name}")
1026
1027        ### Enforce NVARCHAR(2000) as text instead of CLOB.
1028        dtype = to_sql_kw.get('dtype', {})
1029        for col, typ in df.dtypes.items():
1030            if are_dtypes_equal(str(typ), 'object'):
1031                dtype[col] = sqlalchemy.types.NVARCHAR(2000)
1032            elif are_dtypes_equal(str(typ), 'int'):
1033                dtype[col] = sqlalchemy.types.INTEGER
1034        to_sql_kw['dtype'] = dtype
1035    elif self.flavor == 'duckdb':
1036        dtype = to_sql_kw.get('dtype', {})
1037        dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')]
1038        for col in dt_cols:
1039            df[col] = coerce_timezone(df[col], strip_utc=False)
1040    elif self.flavor == 'mssql':
1041        dtype = to_sql_kw.get('dtype', {})
1042        dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')]
1043        new_dtype = {}
1044        for col in dt_cols:
1045            if col in dtype:
1046                continue
1047            dt_typ = get_db_type_from_pd_type(str(df.dtypes[col]), self.flavor, as_sqlalchemy=True)
1048            if col not in dtype:
1049                new_dtype[col] = dt_typ
1050
1051        dtype.update(new_dtype)
1052        to_sql_kw['dtype'] = dtype
1053
1054    ### Check for JSON columns.
1055    if self.flavor not in json_flavors:
1056        json_cols = get_json_cols(df)
1057        for col in json_cols:
1058            df[col] = df[col].apply(
1059                (
1060                    lambda x: json.dumps(x, default=json_serialize_value, sort_keys=True)
1061                    if not isinstance(x, Hashable)
1062                    else x
1063                )
1064            )
1065
1066    if PD_TO_SQLALCHEMY_DTYPES_FLAVORS['uuid'].get(self.flavor, None) != 'Uuid':
1067        uuid_cols = get_uuid_cols(df)
1068        for col in uuid_cols:
1069            df[col] = df[col].astype(str)
1070
1071    try:
1072        with warnings.catch_warnings():
1073            warnings.filterwarnings('ignore')
1074            df.to_sql(**to_sql_kw)
1075        success = True
1076    except Exception as e:
1077        if not silent:
1078            warn(str(e))
1079        success, msg = False, traceback.format_exc()
1080
1081    end = time.perf_counter()
1082    if success:
1083        num_rows = len(df)
1084        msg = (
1085            f"It took {interval_str(timedelta(seconds=(end - start)))} "
1086            + f"to sync {num_rows:,} row"
1087            + ('s' if num_rows != 1 else '')
1088            + f" to {name}."
1089        )
1090    stats['start'] = start
1091    stats['end'] = end
1092    stats['duration'] = end - start
1093
1094    if debug:
1095        print(" done.", flush=True)
1096        dprint(msg)
1097
1098    stats['success'] = success
1099    stats['msg'] = msg
1100    if as_tuple:
1101        return success, msg
1102    if as_dict:
1103        return stats
1104    return success

Upload a DataFrame's contents to the SQL server.

Parameters
  • df (pd.DataFrame): The DataFrame to be inserted.
  • name (str): The name of the table to be created.
  • index (bool, default False): If True, creates the DataFrame's indices as columns.
  • if_exists (str, default 'replace'): Drop and create the table ('replace') or append if it exists ('append') or raise Exception ('fail'). Options are ['replace', 'append', 'fail'].
  • method (str, default ''): None or 'multi'. See the method parameter of pandas.DataFrame.to_sql().
  • chunksize (Optional[int], default -1): How many rows to insert at a time.
  • schema (Optional[str], default None): Optionally override the schema for the table. Defaults to SQLConnector.schema.
  • safe_copy (bool, default True): If True, copy the dataframe before making any changes.
  • as_tuple (bool, default False): If True, return a (success_bool, message) tuple instead of a bool. Defaults to False.
  • as_dict (bool, default False): If True, return a dictionary of transaction information. The keys are success, msg, start, end, duration, num_rows, chunksize, method, and target.
  • kw (Any): Additional arguments will be passed to the DataFrame's to_sql function.
Returns
  • Either a bool or a SuccessTuple (depends on as_tuple).
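
A minimal sketch (the table name my_table is an illustrative assumption):

    import pandas as pd

    df = pd.DataFrame({'id': [1, 2], 'value': [10.0, 20.0]})

    # Append to (or create) the table and request a (success, message) tuple.
    success, msg = conn.to_sql(df, name='my_table', if_exists='append', as_tuple=True)

    # Or request the full transaction statistics as a dictionary.
    stats = conn.to_sql(df, name='my_table', if_exists='append', as_dict=True)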
def exec_queries( self, queries: "List[Union[str, Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]]]", break_on_error: bool = False, rollback: bool = True, silent: bool = False, debug: bool = False) -> 'List[Union[sqlalchemy.engine.cursor.CursorResult, None]]':
638def exec_queries(
639    self,
640    queries: List[
641        Union[
642            str,
643            Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]
644        ]
645    ],
646    break_on_error: bool = False,
647    rollback: bool = True,
648    silent: bool = False,
649    debug: bool = False,
650) -> List[Union[sqlalchemy.engine.cursor.CursorResult, None]]:
651    """
652    Execute a list of queries in a single transaction.
653
654    Parameters
655    ----------
656    queries: List[
657        Union[
658            str,
659            Tuple[str, Callable[[], List[str]]]
660        ]
661    ]
662        The queries in the transaction to be executed.
663        If a query is a tuple, the second item of the tuple
664        will be considered a callable hook that returns a list of queries to be executed
665        before the next item in the list.
666
667    break_on_error: bool, default False
668        If `True`, stop executing when a query fails.
669
670    rollback: bool, default True
671        If `break_on_error` is `True`, rollback the transaction if a query fails.
672
673    silent: bool, default False
674        If `True`, suppress warnings.
675
676    Returns
677    -------
678    A list of SQLAlchemy results.
679    """
680    from meerschaum.utils.warnings import warn
681    from meerschaum.utils.debug import dprint
682    from meerschaum.utils.packages import attempt_import
683    sqlalchemy, sqlalchemy_orm = attempt_import('sqlalchemy', 'sqlalchemy.orm', lazy=False)
684    session = sqlalchemy_orm.Session(self.engine)
685
686    result = None
687    results = []
688    with session.begin():
689        for query in queries:
690            hook = None
691            result = None
692
693            if isinstance(query, tuple):
694                query, hook = query
695            if isinstance(query, str):
696                query = sqlalchemy.text(query)
697
698            if debug:
699                dprint(f"[{self}]\n" + str(query))
700
701            try:
702                result = session.execute(query)
703                session.flush()
704            except Exception as e:
705                msg = (f"Encountered error while executing:\n{e}")
706                if not silent:
707                    warn(msg)
708                elif debug:
709                    dprint(f"[{self}]\n" + str(msg))
710                result = None
711            if result is None and break_on_error:
712                if rollback:
713                    session.rollback()
714                results.append(result)
715                break
716            elif result is not None and hook is not None:
717                hook_queries = hook(session)
718                if hook_queries:
719                    hook_results = self.exec_queries(
720                        hook_queries,
721                        break_on_error = break_on_error,
722                        rollback=rollback,
723                        silent=silent,
724                        debug=debug,
725                    )
726                    result = (result, hook_results)
727
728            results.append(result)
729
730    return results

Execute a list of queries in a single transaction.

Parameters
  • queries (List[Union[str, Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]]]): The queries in the transaction to be executed. If a query is a tuple, the second item of the tuple will be considered a callable hook that returns a list of queries to be executed before the next item in the list.
  • break_on_error (bool, default False): If True, stop executing when a query fails.
  • rollback (bool, default True): If break_on_error is True, rollback the transaction if a query fails.
  • silent (bool, default False): If True, suppress warnings.
Returns
  • A list of SQLAlchemy results.
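
A sketch of the hook mechanism (the table and statements are illustrative):

    # The second entry pairs a query with a hook; the hook receives the session
    # and returns follow-up queries to run before the next item in the list.
    results = conn.exec_queries(
        [
            "CREATE TABLE tmp_demo (id INT)",
            (
                "INSERT INTO tmp_demo (id) VALUES (1)",
                (lambda session: ["UPDATE tmp_demo SET id = 2"]),
            ),
        ],
        break_on_error=True,
    )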
def get_connection(self, rebuild: bool = False) -> "'sqlalchemy.engine.base.Connection'":
1287def get_connection(self, rebuild: bool = False) -> 'sqlalchemy.engine.base.Connection':
1288    """
1289    Return the current alive connection.
1290
1291    Parameters
1292    ----------
1293    rebuild: bool, default False
1294        If `True`, close the previous connection and open a new one.
1295
1296    Returns
1297    -------
1298    A `sqlalchemy.engine.base.Connection` object.
1299    """
1300    import threading
1301    if '_thread_connections' not in self.__dict__:
1302        self.__dict__['_thread_connections'] = {}
1303
1304    self._cleanup_connections()
1305
1306    thread_id = threading.get_ident()
1307
1308    thread_connections = self.__dict__.get('_thread_connections', {})
1309    connection = thread_connections.get(thread_id, None)
1310
1311    if rebuild and connection is not None:
1312        try:
1313            connection.close()
1314        except Exception:
1315            pass
1316
1317        _ = thread_connections.pop(thread_id, None)
1318        connection = None
1319
1320    if connection is None or connection.closed:
1321        connection = self.engine.connect()
1322        thread_connections[thread_id] = connection
1323
1324    return connection

Return the current alive connection.

Parameters
  • rebuild (bool, default False): If True, close the previous connection and open a new one.
Returns
  • A sqlalchemy.engine.base.Connection object.
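
A short sketch:

    # Reuse this thread's connection (one connection is kept per thread).
    connection = conn.get_connection()

    # Close the previous connection and open a fresh one.
    connection = conn.get_connection(rebuild=True)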
def test_connection(self, **kw: Any) -> Optional[bool]:
863def test_connection(
864    self,
865    **kw: Any
866) -> Union[bool, None]:
867    """
868    Test if a successful connection to the database may be made.
869
870    Parameters
871    ----------
872    **kw:
873        The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.
874
875    Returns
876    -------
877    `True` if a connection is made, otherwise `False` or `None` in case of failure.
878
879    """
880    import warnings
881    from meerschaum.connectors.poll import retry_connect
882    _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self}
883    _default_kw.update(kw)
884    with warnings.catch_warnings():
885        warnings.filterwarnings('ignore', 'Could not')
886        try:
887            return retry_connect(**_default_kw)
888        except Exception:
889            return False

Test if a successful connection to the database may be made.

Parameters
  • **kw (Any): The keyword arguments are passed to meerschaum.connectors.poll.retry_connect.
Returns
  • True if a connection is made, otherwise False or None in case of failure.
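
A short sketch (keyword arguments such as max_retries are forwarded to retry_connect):

    if conn.test_connection(max_retries=3):
        print(f"{conn} is reachable.")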
def fetch( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, str, NoneType] = None, check_existing: bool = True, chunksize: Optional[int] = -1, workers: Optional[int] = None, debug: bool = False, **kw: Any) -> "Union['pd.DataFrame', List[Any], None]":
18def fetch(
19    self,
20    pipe: mrsm.Pipe,
21    begin: Union[datetime, int, str, None] = '',
22    end: Union[datetime, int, str, None] = None,
23    check_existing: bool = True,
24    chunksize: Optional[int] = -1,
25    workers: Optional[int] = None,
26    debug: bool = False,
27    **kw: Any
28) -> Union['pd.DataFrame', List[Any], None]:
29    """Execute the SQL definition and return a Pandas DataFrame.
30
31    Parameters
32    ----------
33    pipe: mrsm.Pipe
34        The pipe object which contains the `fetch` metadata.
35
36        - pipe.columns['datetime']: str
37            - Name of the datetime column for the remote table.
38        - pipe.parameters['fetch']: Dict[str, Any]
39            - Parameters necessary to execute a query.
40        - pipe.parameters['fetch']['definition']: str
41            - Raw SQL query to execute to generate the pandas DataFrame.
42        - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
43            - How many minutes before `begin` to search for data (*optional*).
44
45    begin: Union[datetime, int, str, None], default None
46        Most recent datetime to search for data.
47        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
48
49    end: Union[datetime, int, str, None], default None
50        The latest datetime to search for data.
51        If `end` is `None`, do not bound the search.
52
53    check_existing: bool, default True
54        If `False`, use a backtrack interval of 0 minutes.
55
56    chunksize: Optional[int], default -1
57        How many rows to load into memory at once.
58        If `None`, the entire result set is loaded into memory.
59
60    workers: Optional[int], default None
61        How many threads to use when consuming the generator.
62        Defaults to the number of cores.
63
64    debug: bool, default False
65        Verbosity toggle.
66
67    Returns
68    -------
69    A pandas DataFrame generator.
70    """
71    meta_def = self.get_pipe_metadef(
72        pipe,
73        begin=begin,
74        end=end,
75        check_existing=check_existing,
76        debug=debug,
77        **kw
78    )
79    chunks = self.read(
80        meta_def,
81        chunksize=chunksize,
82        workers=workers,
83        as_iterator=True,
84        debug=debug,
85    )
86    return chunks

Execute the SQL definition and return a generator of Pandas DataFrames.

Parameters
  • pipe (mrsm.Pipe): The pipe object which contains the fetch metadata.

    • pipe.columns['datetime']: str
      • Name of the datetime column for the remote table.
    • pipe.parameters['fetch']: Dict[str, Any]
      • Parameters necessary to execute a query.
    • pipe.parameters['fetch']['definition']: str
      • Raw SQL query to execute to generate the pandas DataFrame.
    • pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
      • How many minutes before begin to search for data (optional).
  • begin (Union[datetime, int, str, None], default None): Most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
  • end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If end is None, do not bound the search.
  • check_existing (bool, default True): If False, use a backtrack interval of 0 minutes.
  • chunksize (Optional[int], default -1): How many rows to load into memory at once. If None, the entire result set is loaded into memory.
  • workers (Optional[int], default None): How many threads to use when consuming the generator. Defaults to the number of cores.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pandas DataFrame generator.
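
In practice the SQL definition lives on the pipe. A sketch (the keys, the datetime column dt, and the query are illustrative assumptions):

    pipe = mrsm.Pipe(
        'sql:main', 'demo',
        instance='sql:main',
        columns={'datetime': 'dt'},
        parameters={'fetch': {'definition': "SELECT * FROM my_table"}},
    )

    # `fetch()` returns a generator of DataFrame chunks.
    for chunk in conn.fetch(pipe):
        print(len(chunk))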
def get_pipe_metadef( self, pipe: meerschaum.Pipe, params: Optional[Dict[str, Any]] = None, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, str, NoneType] = None, check_existing: bool = True, debug: bool = False, **kw: Any) -> Optional[str]:
 89def get_pipe_metadef(
 90    self,
 91    pipe: mrsm.Pipe,
 92    params: Optional[Dict[str, Any]] = None,
 93    begin: Union[datetime, int, str, None] = '',
 94    end: Union[datetime, int, str, None] = None,
 95    check_existing: bool = True,
 96    debug: bool = False,
 97    **kw: Any
 98) -> Union[str, None]:
 99    """
100    Return a pipe's meta definition fetch query.
101
102    params: Optional[Dict[str, Any]], default None
103        Optional params dictionary to build the `WHERE` clause.
104        See `meerschaum.utils.sql.build_where`.
105
106    begin: Union[datetime, int, str, None], default None
107        Most recent datetime to search for data.
108        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
109
110    end: Union[datetime, int, str, None], default None
111        The latest datetime to search for data.
112        If `end` is `None`, do not bound the search.
113
114    check_existing: bool, default True
115        If `True`, apply the backtrack interval.
116
117    debug: bool, default False
118        Verbosity toggle.
119
120    Returns
121    -------
122    A pipe's meta definition fetch query string.
123    """
124    from meerschaum.utils.warnings import warn
125    from meerschaum.utils.sql import sql_item_name, dateadd_str, build_where
126    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
127    from meerschaum.config import get_config
128
129    dt_col = pipe.columns.get('datetime', None)
130    if not dt_col:
131        dt_col = pipe.guess_datetime()
132        dt_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None
133        is_guess = True
134    else:
135        dt_name = sql_item_name(dt_col, self.flavor, None)
136        is_guess = False
137    dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None
138    db_dt_typ = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None
139
140    if begin not in (None, '') or end is not None:
141        if is_guess:
142            if dt_col is None:
143                warn(
144                    f"Unable to determine a datetime column for {pipe}."
145                    + "\n    Ignoring begin and end...",
146                    stack=False,
147                )
148                begin, end = '', None
149            else:
150                warn(
151                    f"A datetime wasn't specified for {pipe}.\n"
152                    + f"    Using column \"{dt_col}\" for datetime bounds...",
153                    stack=False
154                )
155
156    apply_backtrack = begin == '' and check_existing
157    backtrack_interval = pipe.get_backtrack_interval(check_existing=check_existing, debug=debug)
158    btm = (
159        int(backtrack_interval.total_seconds() / 60)
160        if isinstance(backtrack_interval, timedelta)
161        else backtrack_interval
162    )
163    begin = (
164        pipe.get_sync_time(debug=debug)
165        if begin == ''
166        else begin
167    )
168
169    if begin not in (None, '') and end is not None and begin >= end:
170        begin = None
171
172    if dt_name:
173        begin_da = (
174            dateadd_str(
175                flavor=self.flavor,
176                datepart='minute',
177                number=((-1 * btm) if apply_backtrack else 0),
178                begin=begin,
179                db_type=db_dt_typ,
180            )
181            if begin not in ('', None)
182            else None
183        )
184        end_da = (
185            dateadd_str(
186                flavor=self.flavor,
187                datepart='minute',
188                number=0,
189                begin=end,
190                db_type=db_dt_typ,
191            )
192            if end is not None
193            else None
194        )
195
196    definition_name = sql_item_name('definition', self.flavor, None)
197    meta_def = (
198        _simple_fetch_query(pipe, self.flavor) if (
199            (not (pipe.columns or {}).get('id', None))
200            or (not get_config('system', 'experimental', 'join_fetch'))
201        ) else _join_fetch_query(pipe, self.flavor, debug=debug, **kw)
202    )
203
204    has_where = 'where' in meta_def.lower()[meta_def.lower().rfind('definition'):]
205    if dt_name and (begin_da or end_da):
206        definition_dt_name = f"{definition_name}.{dt_name}"
207        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
208        has_where = True
209        if begin_da:
210            meta_def += f"\n    {definition_dt_name}\n    >=\n    {begin_da}\n"
211        if begin_da and end_da:
212            meta_def += "    AND"
213        if end_da:
214            meta_def += f"\n    {definition_dt_name}\n    <\n    {end_da}\n"
215
216    if params is not None:
217        params_where = build_where(params, self, with_where=False)
218        meta_def += "\n    " + ("AND" if has_where else "WHERE") + "    "
219        has_where = True
220        meta_def += params_where
221
222    return meta_def.rstrip()

Return a pipe's meta definition fetch query.

Parameters
  • params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause. See meerschaum.utils.sql.build_where.
  • begin (Union[datetime, int, str, None], default None): Most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
  • end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If end is None, do not bound the search.
  • check_existing (bool, default True): If True, apply the backtrack interval.
  • debug (bool, default False): Verbosity toggle.

Returns
  • A pipe's meta definition fetch query string.
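
To inspect the bounded query which fetch() would execute, a sketch (the bounds and params are illustrative):

    from datetime import datetime

    query = conn.get_pipe_metadef(
        pipe,
        begin=datetime(2024, 1, 1),
        params={'status': 'active'},
    )
    print(query)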
def cli(self, debug: bool = False) -> Tuple[bool, str]:
39def cli(
40    self,
41    debug: bool = False,
42) -> SuccessTuple:
43    """
44    Launch a subprocess for an interactive CLI.
45    """
46    from meerschaum.utils.warnings import dprint
47    from meerschaum.utils.venv import venv_exec
48
49    ### Initialize the engine so that dependencies are resolved.
50    _ = self.engine
51
52    env = copy.deepcopy(dict(os.environ))
53    env_key = f"MRSM_SQL_{self.label.upper()}"
54    env_val = json.dumps(self.meta)
55    env[env_key] = env_val
56    cli_code = (
57        "import sys\n"
58        "import meerschaum as mrsm\n"
59        "import os\n"
60        f"conn = mrsm.get_connector('sql:{self.label}')\n"
61        "success, msg = conn._cli_exit()\n"
62        "mrsm.pprint((success, msg))\n"
63        "if not success:\n"
64        "    raise Exception(msg)"
65    )
66    if debug:
67        dprint(cli_code)
68    try:
69        _ = venv_exec(cli_code, venv=None, env=env, debug=debug, capture_output=False)
70    except Exception as e:
71        return False, f"[{self}] Failed to start CLI:\n{e}"
72    return True, "Success"

Launch a subprocess for an interactive CLI.

def fetch_pipes_keys( self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> List[Tuple[str, str, Optional[str], Dict[str, Any]]]:
144def fetch_pipes_keys(
145    self,
146    connector_keys: Optional[List[str]] = None,
147    metric_keys: Optional[List[str]] = None,
148    location_keys: Optional[List[str]] = None,
149    tags: Optional[List[str]] = None,
150    params: Optional[Dict[str, Any]] = None,
151    debug: bool = False,
152) -> List[
153        Tuple[str, str, Union[str, None], Dict[str, Any]]
154    ]:
155    """
156    Return a list of tuples corresponding to the parameters provided.
157
158    Parameters
159    ----------
160    connector_keys: Optional[List[str]], default None
161        List of connector_keys to search by.
162
163    metric_keys: Optional[List[str]], default None
164        List of metric_keys to search by.
165
166    location_keys: Optional[List[str]], default None
167        List of location_keys to search by.
168
169    tags: Optional[List[str]], default None
170        List of tags to search by.
171
172    params: Optional[Dict[str, Any]], default None
173        Dictionary of additional parameters to search by.
174        E.g. `--params pipe_id:1`
175
176    debug: bool, default False
177        Verbosity toggle.
178
179    Returns
180    -------
181    A list of tuples of pipes' keys and parameters (connector_keys, metric_key, location_key, parameters).
182    """
183    from meerschaum.utils.packages import attempt_import
184    from meerschaum.utils.misc import separate_negation_values
185    from meerschaum.utils.sql import (
186        OMIT_NULLSFIRST_FLAVORS,
187        table_exists,
188        json_flavors,
189    )
190    from meerschaum._internal.static import STATIC_CONFIG
191    import json
192    from copy import deepcopy
193    sqlalchemy, sqlalchemy_sql_functions = attempt_import(
194        'sqlalchemy',
195        'sqlalchemy.sql.functions', lazy=False,
196    )
197    coalesce = sqlalchemy_sql_functions.coalesce
198
199    if connector_keys is None:
200        connector_keys = []
201    if metric_keys is None:
202        metric_keys = []
203    if location_keys is None:
204        location_keys = []
205    else:
206        location_keys = [
207            (
208                lk
209                if lk not in ('[None]', 'None', 'null')
210                else 'None'
211            )
212            for lk in location_keys
213        ]
214    if tags is None:
215        tags = []
216
217    if params is None:
218        params = {}
219
220    ### Add three primary keys to params dictionary
221    ###   (separated for convenience of arguments).
222    cols = {
223        'connector_keys': [str(ck) for ck in connector_keys],
224        'metric_key': [str(mk) for mk in metric_keys],
225        'location_key': [str(lk) for lk in location_keys],
226    }
227
228    ### Make deep copy so we don't mutate this somewhere else.
229    parameters = deepcopy(params)
230    for col, vals in cols.items():
231        if vals not in [[], ['*']]:
232            parameters[col] = vals
233
234    if not table_exists('mrsm_pipes', self, schema=self.instance_schema, debug=debug):
235        return []
236
237    from meerschaum.connectors.sql.tables import get_tables
238    pipes_tbl = get_tables(mrsm_instance=self, create=False, debug=debug)['pipes']
239
240    _params = {}
241    for k, v in parameters.items():
242        _v = json.dumps(v) if isinstance(v, dict) else v
243        _params[k] = _v
244
245    negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
246    ### Parse regular params.
247    ### If a param begins with '_', negate it instead.
248    _where = [
249        (
250            (coalesce(pipes_tbl.c[key], 'None') == val)
251            if not str(val).startswith(negation_prefix)
252            else (coalesce(pipes_tbl.c[key], 'None') != str(val)[len(negation_prefix):])
253        ) for key, val in _params.items()
254        if not isinstance(val, (list, tuple)) and key in pipes_tbl.c
255    ]
256    if self.flavor in json_flavors:
257        sqlalchemy_dialects = mrsm.attempt_import('sqlalchemy.dialects', lazy=False)
258        JSONB = sqlalchemy_dialects.postgresql.JSONB
259    else:
260        JSONB = sqlalchemy.String
261
262    select_cols = (
263        [
264            pipes_tbl.c.connector_keys,
265            pipes_tbl.c.metric_key,
266            pipes_tbl.c.location_key,
267            pipes_tbl.c.parameters,
268        ]
269    )
270
271    q = sqlalchemy.select(*select_cols).where(sqlalchemy.and_(True, *_where))
272    for c, vals in cols.items():
273        if not isinstance(vals, (list, tuple)) or not vals or c not in pipes_tbl.c:
274            continue
275        _in_vals, _ex_vals = separate_negation_values(vals)
276        q = q.where(coalesce(pipes_tbl.c[c], 'None').in_(_in_vals)) if _in_vals else q
277        q = q.where(coalesce(pipes_tbl.c[c], 'None').not_in(_ex_vals)) if _ex_vals else q
278
279    ### Finally, parse tags.
280    tag_groups = [tag.split(',') for tag in tags]
281    in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups]
282
283    ors, nands = [], []
284    if self.flavor in json_flavors:
285        tags_jsonb = pipes_tbl.c['parameters'].cast(JSONB).op('->')('tags').cast(JSONB)
286        for _in_tags, _ex_tags in in_ex_tag_groups:
287            if _in_tags:
288                ors.append(
289                    sqlalchemy.and_(
290                        tags_jsonb.contains(_in_tags)
291                    )
292                )
293            for xt in _ex_tags:
294                nands.append(
295                    sqlalchemy.not_(
296                        sqlalchemy.and_(
297                            tags_jsonb.contains([xt])
298                        )
299                    )
300                )
301    else:
302        for _in_tags, _ex_tags in in_ex_tag_groups:
303            sub_ands = []
304            for nt in _in_tags:
305                sub_ands.append(
306                    sqlalchemy.cast(
307                        pipes_tbl.c['parameters'],
308                        sqlalchemy.String,
309                    ).like(f'%"tags":%"{nt}"%')
310                )
311            if sub_ands:
312                ors.append(sqlalchemy.and_(*sub_ands))
313
314            for xt in _ex_tags:
315                nands.append(
316                    sqlalchemy.cast(
317                        pipes_tbl.c['parameters'],
318                        sqlalchemy.String,
319                    ).not_like(f'%"tags":%"{xt}"%')
320                )
321
322    q = q.where(sqlalchemy.and_(*nands)) if nands else q
323    q = q.where(sqlalchemy.or_(*ors)) if ors else q
324    loc_asc = sqlalchemy.asc(pipes_tbl.c['location_key'])
325    if self.flavor not in OMIT_NULLSFIRST_FLAVORS:
326        loc_asc = sqlalchemy.nullsfirst(loc_asc)
327    q = q.order_by(
328        sqlalchemy.asc(pipes_tbl.c['connector_keys']),
329        sqlalchemy.asc(pipes_tbl.c['metric_key']),
330        loc_asc,
331    )
332
333    ### execute the query and return a list of tuples
334    if debug:
335        dprint(q)
336    try:
337        rows = (
338            self.execute(q).fetchall()
339            if self.flavor != 'duckdb'
340            else [
341                (row['connector_keys'], row['metric_key'], row['location_key'])
342                for row in self.read(q).to_dict(orient='records')
343            ]
344        )
345    except Exception as e:
346        error(str(e))
347
348    return rows

Return a list of tuples corresponding to the parameters provided.

Parameters
  • connector_keys (Optional[List[str]], default None): List of connector_keys to search by.
  • metric_keys (Optional[List[str]], default None): List of metric_keys to search by.
  • location_keys (Optional[List[str]], default None): List of location_keys to search by.
  • tags (Optional[List[str]], default None): List of tags to search by.
  • params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. E.g. --params pipe_id:1
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of tuples of pipes' keys and parameters (connector_keys, metric_key, location_key, parameters).
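
A short sketch (the keys and tag are illustrative assumptions):

    # Returns (connector_keys, metric_key, location_key, parameters) tuples.
    keys = conn.fetch_pipes_keys(
        connector_keys=['sql:main'],
        tags=['prod'],
    )
    for ck, mk, lk, params in keys:
        print(ck, mk, lk)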
def create_indices( self, pipe: meerschaum.Pipe, columns: Optional[List[str]] = None, indices: Optional[List[str]] = None, debug: bool = False) -> bool:
369def create_indices(
370    self,
371    pipe: mrsm.Pipe,
372    columns: Optional[List[str]] = None,
373    indices: Optional[List[str]] = None,
374    debug: bool = False
375) -> bool:
376    """
377    Create a pipe's indices.
378    """
379    if pipe.__dict__.get('_skip_check_indices', False):
380        return True
381
382    if debug:
383        dprint(f"Creating indices for {pipe}...")
384
385    if not pipe.indices:
386        warn(f"{pipe} has no index columns; skipping index creation.", stack=False)
387        return True
388
389    cols_to_include = set((columns or []) + (indices or [])) or None
390
391    pipe._clear_cache_key('_columns_indices', debug=debug)
392    ix_queries = {
393        col: queries
394        for col, queries in self.get_create_index_queries(pipe, debug=debug).items()
395        if cols_to_include is None or col in cols_to_include
396    }
397    success = True
398    for col, queries in ix_queries.items():
399        ix_success = all(self.exec_queries(queries, debug=debug, silent=False))
400        success = success and ix_success
401        if not ix_success:
402            warn(f"Failed to create index on column: {col}")
403
404    return success

Create a pipe's indices.
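
A short sketch (restricting creation to the pipe's datetime column, assuming one is set):

    # Only (re)create indices for the datetime column.
    success = conn.create_indices(pipe, columns=[pipe.columns['datetime']])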

def drop_indices( self, pipe: meerschaum.Pipe, columns: Optional[List[str]] = None, indices: Optional[List[str]] = None, debug: bool = False) -> bool:
425def drop_indices(
426    self,
427    pipe: mrsm.Pipe,
428    columns: Optional[List[str]] = None,
429    indices: Optional[List[str]] = None,
430    debug: bool = False
431) -> bool:
432    """
433    Drop a pipe's indices.
434    """
435    if debug:
436        dprint(f"Dropping indices for {pipe}...")
437
438    if not pipe.indices:
439        warn(f"No indices to drop for {pipe}.", stack=False)
440        return False
441
442    cols_to_include = set((columns or []) + (indices or [])) or None
443
444    ix_queries = {
445        col: queries
446        for col, queries in self.get_drop_index_queries(pipe, debug=debug).items()
447        if cols_to_include is None or col in cols_to_include
448    }
449    success = True
450    for col, queries in ix_queries.items():
451        ix_success = all(self.exec_queries(queries, debug=debug, silent=(not debug)))
452        if not ix_success:
453            success = False
454            if debug:
455                dprint(f"Failed to drop index on column: {col}")
456    return success

Drop a pipe's indices.

def get_create_index_queries( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, List[str]]:
512def get_create_index_queries(
513    self,
514    pipe: mrsm.Pipe,
515    debug: bool = False,
516) -> Dict[str, List[str]]:
517    """
518    Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query.
519
520    Parameters
521    ----------
522    pipe: mrsm.Pipe
523        The pipe to which the queries will correspond.
524
525    Returns
526    -------
527    A dictionary of index names mapping to lists of queries.
528    """
529    ### NOTE: Due to recent breaking changes in DuckDB, indices don't behave properly.
530    if self.flavor == 'duckdb':
531        return {}
532    from meerschaum.utils.sql import (
533        sql_item_name,
534        get_distinct_col_count,
535        UPDATE_QUERIES,
536        get_null_replacement,
537        get_create_table_queries,
538        get_rename_table_queries,
539        COALESCE_UNIQUE_INDEX_FLAVORS,
540    )
541    from meerschaum.utils.dtypes import are_dtypes_equal
542    from meerschaum.utils.dtypes.sql import (
543        get_db_type_from_pd_type,
544        get_pd_type_from_db_type,
545        AUTO_INCREMENT_COLUMN_FLAVORS,
546    )
547    from meerschaum.config import get_config
548    index_queries = {}
549
550    upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in UPDATE_QUERIES
551    static = pipe.parameters.get('static', False)
552    null_indices = pipe.parameters.get('null_indices', True)
553    index_names = pipe.get_indices()
554    unique_index_name_unquoted = index_names.get('unique', None) or f'IX_{pipe.target}_unique'
555    if upsert:
556        _ = index_names.pop('unique', None)
557    indices = pipe.indices
558    existing_cols_types = pipe.get_columns_types(debug=debug)
559    existing_cols_pd_types = {
560        col: get_pd_type_from_db_type(typ)
561        for col, typ in existing_cols_types.items()
562    }
563    existing_cols_indices = self.get_pipe_columns_indices(pipe, debug=debug)
564    existing_ix_names = set()
565    existing_primary_keys = []
566    existing_clustered_primary_keys = []
567    for col, col_indices in existing_cols_indices.items():
568        for col_ix_doc in col_indices:
569            existing_ix_names.add(col_ix_doc.get('name', '').lower())
570            if col_ix_doc.get('type', None) == 'PRIMARY KEY':
571                existing_primary_keys.append(col.lower())
572                if col_ix_doc.get('clustered', True):
573                    existing_clustered_primary_keys.append(col.lower())
574
575    _datetime = pipe.get_columns('datetime', error=False)
576    _datetime_name = (
577        sql_item_name(_datetime, self.flavor, None)
578        if _datetime is not None else None
579    )
580    _datetime_index_name = (
581        sql_item_name(index_names['datetime'], flavor=self.flavor, schema=None)
582        if index_names.get('datetime', None)
583        else None
584    )
585    _id = pipe.get_columns('id', error=False)
586    _id_name = (
587        sql_item_name(_id, self.flavor, None)
588        if _id is not None
589        else None
590    )
591    primary_key = pipe.columns.get('primary', None)
592    primary_key_name = (
593        sql_item_name(primary_key, flavor=self.flavor, schema=None)
594        if primary_key
595        else None
596    )
597    autoincrement = (
598        pipe.parameters.get('autoincrement', False)
599        or (
600            primary_key is not None
601            and primary_key not in existing_cols_pd_types
602        )
603    )
604    primary_key_db_type = (
605        get_db_type_from_pd_type(pipe.dtypes.get(primary_key, 'int') or 'int', self.flavor)
606        if primary_key
607        else None
608    )
609    primary_key_constraint_name = (
610        sql_item_name(f'PK_{pipe.target}', self.flavor, None)
611        if primary_key is not None
612        else None
613    )
614    primary_key_clustered = "CLUSTERED" if _datetime is None else "NONCLUSTERED"
615    datetime_clustered = (
616        "CLUSTERED"
617        if not existing_clustered_primary_keys and _datetime is not None
618        else "NONCLUSTERED"
619    )
620    include_columns_str = "\n    ,".join(
621        [
622            sql_item_name(col, flavor=self.flavor) for col in existing_cols_types
623            if col != _datetime
624        ]
625    ).rstrip(',')
626    include_clause = (
627        (
628            f"\nINCLUDE (\n    {include_columns_str}\n)"
629        )
630        if datetime_clustered == 'NONCLUSTERED'
631        else ''
632    )
633
634    _id_index_name = (
635        sql_item_name(index_names['id'], self.flavor, None)
636        if index_names.get('id', None)
637        else None
638    )
639    _pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
640    _create_space_partition = get_config('system', 'experimental', 'space')
641
642    ### create datetime index
643    dt_query = None
644    if _datetime is not None:
645        if (
646            self.flavor in ('timescaledb', 'timescaledb-ha')
647            and pipe.parameters.get('hypertable', True)
648        ):
649            _id_count = (
650                get_distinct_col_count(_id, f"SELECT {_id_name} FROM {_pipe_name}", self)
651                if (_id is not None and _create_space_partition) else None
652            )
653
654            chunk_interval = pipe.get_chunk_interval(debug=debug)
655            chunk_interval_minutes = (
656                chunk_interval
657                if isinstance(chunk_interval, int)
658                else int(chunk_interval.total_seconds() / 60)
659            )
660            chunk_time_interval = (
661                f"INTERVAL '{chunk_interval_minutes} MINUTES'"
662                if isinstance(chunk_interval, timedelta)
663                else f'{chunk_interval_minutes}'
664            )
665
666            dt_query = (
667                f"SELECT public.create_hypertable('{_pipe_name}', " +
668                f"'{_datetime}', "
669                + (
670                    f"'{_id}', {_id_count}, " if (_id is not None and _create_space_partition)
671                    else ''
672                )
673                + f'chunk_time_interval => {chunk_time_interval}, '
674                + 'if_not_exists => true, '
675                + "migrate_data => true);"
676            )
677        elif _datetime_index_name and _datetime != primary_key:
678            if self.flavor == 'mssql':
679                dt_query = (
680                    f"CREATE {datetime_clustered} INDEX {_datetime_index_name} "
681                    f"\nON {_pipe_name} ({_datetime_name}){include_clause}"
682                )
683            else:
684                dt_query = (
685                    f"CREATE INDEX {_datetime_index_name} "
686                    + f"ON {_pipe_name} ({_datetime_name})"
687                )
688
689    if dt_query:
690        index_queries[_datetime] = [dt_query]
691
692    primary_queries = []
693    if (
694        primary_key is not None
695        and primary_key.lower() not in existing_primary_keys
696        and not static
697    ):
698        if autoincrement and primary_key not in existing_cols_pd_types:
699            autoincrement_str = AUTO_INCREMENT_COLUMN_FLAVORS.get(
700                self.flavor,
701                AUTO_INCREMENT_COLUMN_FLAVORS['default']
702            )
703            primary_queries.extend([
704                (
705                    f"ALTER TABLE {_pipe_name}\n"
706                    f"ADD {primary_key_name} {primary_key_db_type} {autoincrement_str}"
707                ),
708            ])
709        elif not autoincrement and primary_key in existing_cols_pd_types:
710            if self.flavor in ('sqlite', 'geopackage'):
711                new_table_name = sql_item_name(
712                    f'_new_{pipe.target}',
713                    self.flavor,
714                    self.get_pipe_schema(pipe)
715                )
716                select_cols_str = ', '.join(
717                    [
718                        sql_item_name(col, self.flavor, None)
719                        for col in existing_cols_types
720                    ]
721                )
722                primary_queries.extend(
723                    get_create_table_queries(
724                        existing_cols_pd_types,
725                        f'_new_{pipe.target}',
726                        self.flavor,
727                        schema=self.get_pipe_schema(pipe),
728                        primary_key=primary_key,
729                    ) + [
730                        (
731                            f"INSERT INTO {new_table_name} ({select_cols_str})\n"
732                            f"SELECT {select_cols_str}\nFROM {_pipe_name}"
733                        ),
734                        f"DROP TABLE {_pipe_name}",
735                    ] + get_rename_table_queries(
736                        f'_new_{pipe.target}',
737                        pipe.target,
738                        self.flavor,
739                        schema=self.get_pipe_schema(pipe),
740                    )
741                )
742            elif self.flavor == 'oracle':
743                primary_queries.extend([
744                    (
745                        f"ALTER TABLE {_pipe_name}\n"
746                        f"MODIFY {primary_key_name} NOT NULL"
747                    ),
748                    (
749                        f"ALTER TABLE {_pipe_name}\n"
750                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})"
751                    )
752                ])
753            elif self.flavor in ('mysql', 'mariadb'):
754                primary_queries.extend([
755                    (
756                        f"ALTER TABLE {_pipe_name}\n"
757                        f"MODIFY {primary_key_name} {primary_key_db_type} NOT NULL"
758                    ),
759                    (
760                        f"ALTER TABLE {_pipe_name}\n"
761                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})"
762                    )
763                ])
764            elif self.flavor in ('timescaledb', 'timescaledb-ha'):
765                primary_queries.extend([
766                    (
767                        f"ALTER TABLE {_pipe_name}\n"
768                        f"ALTER COLUMN {primary_key_name} SET NOT NULL"
769                    ),
770                    (
771                        f"ALTER TABLE {_pipe_name}\n"
772                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY (" + (
773                            f"{_datetime_name}, " if _datetime_name else ""
774                        ) + f"{primary_key_name})"
775                    ),
776                ])
777            elif self.flavor in ('citus', 'postgresql', 'duckdb', 'postgis'):
778                primary_queries.extend([
779                    (
780                        f"ALTER TABLE {_pipe_name}\n"
781                        f"ALTER COLUMN {primary_key_name} SET NOT NULL"
782                    ),
783                    (
784                        f"ALTER TABLE {_pipe_name}\n"
785                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})"
786                    ),
787                ])
788            else:
789                primary_queries.extend([
790                    (
791                        f"ALTER TABLE {_pipe_name}\n"
792                        f"ALTER COLUMN {primary_key_name} {primary_key_db_type} NOT NULL"
793                    ),
794                    (
795                        f"ALTER TABLE {_pipe_name}\n"
796                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY {primary_key_clustered} ({primary_key_name})"
797                    ),
798                ])
799        index_queries[primary_key] = primary_queries
800
801    ### create id index
802    if _id_name is not None:
803        if self.flavor in ('timescaledb', 'timescaledb-ha'):
804            ### Already created indices via create_hypertable.
805            id_query = (
806                None if (_id is not None and _create_space_partition)
807                else (
808                    f"CREATE INDEX IF NOT EXISTS {_id_index_name} ON {_pipe_name} ({_id_name})"
809                    if _id is not None
810                    else None
811                )
812            )
814        else: ### mssql, sqlite, etc.
815            id_query = f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})"
816
817        if id_query is not None:
818            index_queries[_id] = id_query if isinstance(id_query, list) else [id_query]
819
820    ### Create indices for other labels in `pipe.columns`.
821    other_index_names = {
822        ix_key: ix_unquoted
823        for ix_key, ix_unquoted in index_names.items()
824        if (
825            ix_key not in ('datetime', 'id', 'primary')
826            and ix_unquoted.lower() not in existing_ix_names
827        )
828    }
829    for ix_key, ix_unquoted in other_index_names.items():
830        ix_name = sql_item_name(ix_unquoted, self.flavor, None)
831        cols = indices[ix_key]
832        if not isinstance(cols, (list, tuple)):
833            cols = [cols]
834        if ix_key == 'unique' and upsert:
835            continue
836        cols_names = [sql_item_name(col, self.flavor, None) for col in cols if col]
837        if not cols_names:
838            continue
839
840        cols_names_str = ", ".join(cols_names)
841        index_query_params_clause = f" ({cols_names_str})"
842        if self.flavor in ('postgis', 'timescaledb-ha'):
843            for col in cols:
844                col_typ = existing_cols_pd_types.get(col, 'object')
845                if col_typ != 'object' and are_dtypes_equal(col_typ, 'geometry'):
846                    index_query_params_clause = f" USING GIST ({cols_names_str})"
847                    break
848
849        index_queries[ix_key] = [
850            f"CREATE INDEX {ix_name} ON {_pipe_name}{index_query_params_clause}"
851        ]
852
853    indices_cols_str = ', '.join(
854        list({
855            sql_item_name(ix, self.flavor)
856            for ix_key, ix in pipe.columns.items()
857            if ix and ix in existing_cols_types
858        })
859    )
860    coalesce_indices_cols_str = ', '.join(
861        [
862            (
863                (
864                    "COALESCE("
865                    + sql_item_name(ix, self.flavor)
866                    + ", "
867                    + get_null_replacement(existing_cols_types[ix], self.flavor)
868                    + ") "
869                )
870                if ix_key != 'datetime' and null_indices
871                else sql_item_name(ix, self.flavor)
872            )
873            for ix_key, ix in pipe.columns.items()
874            if ix and ix in existing_cols_types
875        ]
876    )
877    unique_index_name = sql_item_name(unique_index_name_unquoted, self.flavor)
878    constraint_name_unquoted = unique_index_name_unquoted.replace('IX_', 'UQ_')
879    constraint_name = sql_item_name(constraint_name_unquoted, self.flavor)
880    add_constraint_query = (
881        f"ALTER TABLE {_pipe_name} ADD CONSTRAINT {constraint_name} UNIQUE ({indices_cols_str})"
882    )
883    unique_index_cols_str = (
884        indices_cols_str
885        if self.flavor not in COALESCE_UNIQUE_INDEX_FLAVORS or not null_indices
886        else coalesce_indices_cols_str
887    )
888    create_unique_index_query = (
889        f"CREATE UNIQUE INDEX {unique_index_name} ON {_pipe_name} ({unique_index_cols_str})"
890    )
891    constraint_queries = [create_unique_index_query]
892    if self.flavor not in ('sqlite', 'geopackage'):
893        constraint_queries.append(add_constraint_query)
894    if upsert and indices_cols_str:
895        index_queries[unique_index_name] = constraint_queries
896    return index_queries

Return a dictionary mapping columns to a CREATE INDEX or equivalent query.

Parameters
  • pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
  • A dictionary of index names mapping to lists of queries.
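A minimal usage sketch (the connector keys, pipe keys, and column names below are invented for illustration, and the pipe's target table is assumed to already exist):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe(
        'demo', 'weather',
        mrsm_instance='sql:main',
        columns={'datetime': 'timestamp', 'id': 'station'},
    )

    ### Map each index key to its list of CREATE INDEX (or equivalent) queries.
    index_queries = conn.get_create_index_queries(pipe)
    for key, queries in index_queries.items():
        for query in queries:
            print(f'-- {key}\n{query}')
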
def get_drop_index_queries( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, List[str]]:
 899def get_drop_index_queries(
 900    self,
 901    pipe: mrsm.Pipe,
 902    debug: bool = False,
 903) -> Dict[str, List[str]]:
 904    """
 905    Return a dictionary mapping columns to a `DROP INDEX` or equivalent query.
 906
 907    Parameters
 908    ----------
 909    pipe: mrsm.Pipe
 910        The pipe to which the queries will correspond.
 911
 912    Returns
 913    -------
 914    A dictionary of column names mapping to lists of queries.
 915    """
 916    ### NOTE: Due to breaking changes within DuckDB, indices must be skipped.
 917    if self.flavor == 'duckdb':
 918        return {}
 919    if not pipe.exists(debug=debug):
 920        return {}
 921
 922    from collections import defaultdict
 923    from meerschaum.utils.sql import (
 924        sql_item_name,
 925        table_exists,
 926        hypertable_queries,
 927        DROP_INDEX_IF_EXISTS_FLAVORS,
 928    )
 929    drop_queries = defaultdict(lambda: [])
 930    schema = self.get_pipe_schema(pipe)
 931    index_schema = schema if self.flavor != 'mssql' else None
 932    indices = {
 933        ix_key: ix
 934        for ix_key, ix in pipe.get_indices().items()
 935    }
 936    cols_indices = pipe.get_columns_indices(debug=debug)
 937    existing_indices = set()
 938    clustered_ix = None
 939    for col, ix_metas in cols_indices.items():
 940        for ix_meta in ix_metas:
 941            ix_name = ix_meta.get('name', None)
 942            if ix_meta.get('clustered', False):
 943                clustered_ix = ix_name
 944            existing_indices.add((ix_name or '').lower())
 945    pipe_name = sql_item_name(pipe.target, self.flavor, schema)
 946    pipe_name_no_schema = sql_item_name(pipe.target, self.flavor, None)
 947    upsert = pipe.upsert
 948
 949    if self.flavor not in hypertable_queries:
 950        is_hypertable = False
 951    else:
 952        is_hypertable_query = hypertable_queries[self.flavor].format(table_name=pipe_name)
 953        is_hypertable = self.value(is_hypertable_query, silent=True, debug=debug) is not None
 954
 955    if_exists_str = "IF EXISTS " if self.flavor in DROP_INDEX_IF_EXISTS_FLAVORS else ""
 956    if is_hypertable:
 957        nuke_queries = []
 958        temp_table = '_' + pipe.target + '_temp_migration'
 959        temp_table_name = sql_item_name(temp_table, self.flavor, self.get_pipe_schema(pipe))
 960
 961        if table_exists(temp_table, self, schema=self.get_pipe_schema(pipe), debug=debug):
 962            nuke_queries.append(f"DROP TABLE {if_exists_str}{temp_table_name}")
 963        nuke_queries += [
 964            f"SELECT * INTO {temp_table_name} FROM {pipe_name}",
 965            f"DROP TABLE {if_exists_str}{pipe_name}",
 966            f"ALTER TABLE {temp_table_name} RENAME TO {pipe_name_no_schema}",
 967        ]
 968        nuke_ix_keys = ('datetime', 'id')
 969        nuked = False
 970        for ix_key in nuke_ix_keys:
 971            if ix_key in indices and not nuked:
 972                drop_queries[ix_key].extend(nuke_queries)
 973                nuked = True
 974
 975    for ix_key, ix_unquoted in indices.items():
 976        if ix_key in drop_queries:
 977            continue
 978        if ix_unquoted.lower() not in existing_indices:
 979            continue
 980
 981        if (
 982            ix_key == 'unique'
 983            and upsert
 984            and self.flavor not in ('sqlite', 'geopackage')
 985            and not is_hypertable
 986        ):
 987            constraint_name_unquoted = ix_unquoted.replace('IX_', 'UQ_')
 988            constraint_name = sql_item_name(constraint_name_unquoted, self.flavor)
 989            constraint_or_index = (
 990                "CONSTRAINT"
 991                if self.flavor not in ('mysql', 'mariadb')
 992                else 'INDEX'
 993            )
 994            drop_queries[ix_key].append(
 995                f"ALTER TABLE {pipe_name}\n"
 996                f"DROP {constraint_or_index} {constraint_name}"
 997            )
 998
 999        query = (
1000            (
1001                f"ALTER TABLE {pipe_name}\n"
1002                if self.flavor in ('mysql', 'mariadb')
1003                else ''
1004            )
1005            + f"DROP INDEX {if_exists_str}"
1006            + sql_item_name(ix_unquoted, self.flavor, index_schema)
1007        )
1008        if self.flavor == 'mssql':
1009            query += f"\nON {pipe_name}"
1010            if ix_unquoted == clustered_ix:
1011                query += "\nWITH (ONLINE = ON, MAXDOP = 4)"
1012        drop_queries[ix_key].append(query)
1013
1014
1015    return drop_queries

Return a dictionary mapping columns to a DROP INDEX or equivalent query.

Parameters
  • pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
  • A dictionary of column names mapping to lists of queries.
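To rebuild a pipe's indices, the drop queries pair naturally with the create queries, much like the DuckDB path in get_add_columns_queries below. A sketch, assuming the illustrative 'sql:main' connector and pipe from the first example:

    import meerschaum as mrsm
    from meerschaum.utils.misc import flatten_list

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe('demo', 'weather', mrsm_instance='sql:main')

    ### Drop whatever indices exist, then recreate them from the pipe's definition.
    drop_queries = list(flatten_list(conn.get_drop_index_queries(pipe).values()))
    create_queries = list(flatten_list(conn.get_create_index_queries(pipe).values()))
    results = conn.exec_queries(drop_queries + create_queries, debug=True)
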
def get_add_columns_queries( self, pipe: meerschaum.Pipe, df: 'Union[pd.DataFrame, Dict[str, str]]', _is_db_types: bool = False, debug: bool = False) -> List[str]:
3158def get_add_columns_queries(
3159    self,
3160    pipe: mrsm.Pipe,
3161    df: Union[pd.DataFrame, Dict[str, str]],
3162    _is_db_types: bool = False,
3163    debug: bool = False,
3164) -> List[str]:
3165    """
3166    Add new null columns of the correct type to a table from a dataframe.
3167
3168    Parameters
3169    ----------
3170    pipe: mrsm.Pipe
3171        The pipe to be altered.
3172
3173    df: Union[pd.DataFrame, Dict[str, str]]
3174        The pandas DataFrame which contains new columns.
3175        If a dictionary is provided, assume it maps columns to Pandas data types.
3176
3177    _is_db_types: bool, default False
3178        If `True`, assume `df` is a dictionary mapping columns to SQL native dtypes.
3179
3180    Returns
3181    -------
3182    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
3183    """
3184    if not pipe.exists(debug=debug):
3185        return []
3186
3187    if pipe.parameters.get('static', False):
3188        return []
3189
3190    from decimal import Decimal
3191    import copy
3192    from meerschaum.utils.sql import (
3193        sql_item_name,
3194        SINGLE_ALTER_TABLE_FLAVORS,
3195        get_table_cols_types,
3196    )
3197    from meerschaum.utils.dtypes.sql import (
3198        get_pd_type_from_db_type,
3199        get_db_type_from_pd_type,
3200    )
3201    from meerschaum.utils.misc import flatten_list
3202    is_dask = 'dask' in df.__module__ if not isinstance(df, dict) else False
3203    if is_dask:
3204        df = df.partitions[0].compute()
3205    df_cols_types = (
3206        {
3207            col: str(typ)
3208            for col, typ in df.dtypes.items()
3209        }
3210        if not isinstance(df, dict)
3211        else copy.deepcopy(df)
3212    )
3213    if not isinstance(df, dict) and len(df.index) > 0:
3214        for col, typ in list(df_cols_types.items()):
3215            if typ != 'object':
3216                continue
3217            val = df.iloc[0][col]
3218            if isinstance(val, (dict, list)):
3219                df_cols_types[col] = 'json'
3220            elif isinstance(val, Decimal):
3221                df_cols_types[col] = 'numeric'
3222            elif isinstance(val, str):
3223                df_cols_types[col] = 'str'
3224    db_cols_types = {
3225        col: get_pd_type_from_db_type(typ)
3226        for col, typ in get_table_cols_types(
3227            pipe.target,
3228            self,
3229            schema=self.get_pipe_schema(pipe),
3230            debug=debug,
3231        ).items()
3232    }
3233    new_cols = set(df_cols_types) - set(db_cols_types)
3234    if not new_cols:
3235        return []
3236
3237    new_cols_types = {
3238        col: get_db_type_from_pd_type(
3239            df_cols_types[col],
3240            self.flavor
3241        )
3242        for col in new_cols
3243        if col and df_cols_types.get(col, None)
3244    }
3245
3246    alter_table_query = "ALTER TABLE " + sql_item_name(
3247        pipe.target, self.flavor, self.get_pipe_schema(pipe)
3248    )
3249    queries = []
3250    for col, typ in new_cols_types.items():
3251        add_col_query = (
3252            "\nADD "
3253            + sql_item_name(col, self.flavor, None)
3254            + " " + typ + ","
3255        )
3256
3257        if self.flavor in SINGLE_ALTER_TABLE_FLAVORS:
3258            queries.append(alter_table_query + add_col_query[:-1])
3259        else:
3260            alter_table_query += add_col_query
3261
3262    ### For most flavors, only one query is required.
3263    ### This covers SQLite which requires one query per column.
3264    if not queries:
3265        queries.append(alter_table_query[:-1])
3266
3267    if self.flavor != 'duckdb':
3268        return queries
3269
3270    ### NOTE: For DuckDB, we must drop and rebuild the indices.
3271    drop_index_queries = list(flatten_list(
3272        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
3273    ))
3274    create_index_queries = list(flatten_list(
3275        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
3276    ))
3277
3278    return drop_index_queries + queries + create_index_queries

Add new null columns of the correct type to a table from a dataframe.

Parameters
  • pipe (mrsm.Pipe): The pipe to be altered.
  • df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which contains new columns. If a dictionary is provided, assume it maps columns to Pandas data types.
  • _is_db_types (bool, default False): If True, assume df is a dictionary mapping columns to SQL native dtypes.
Returns
  • A list of the ALTER TABLE SQL query or queries to be executed on the provided connector.
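A short sketch (the connector keys and the 'humidity' column are invented for illustration):

    import pandas as pd
    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe('demo', 'weather', mrsm_instance='sql:main')

    ### A frame containing a column not yet present on the pipe's table.
    df = pd.DataFrame({
        'timestamp': [pd.Timestamp('2024-01-01')],
        'station': ['KATL'],
        'humidity': [0.42],
    })
    queries = conn.get_add_columns_queries(pipe, df)

    ### A {column: pandas dtype} mapping is also accepted:
    queries = conn.get_add_columns_queries(pipe, {'humidity': 'float64'})
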
def get_alter_columns_queries( self, pipe: meerschaum.Pipe, df: 'Union[pd.DataFrame, Dict[str, str]]', debug: bool = False) -> List[str]:
3281def get_alter_columns_queries(
3282    self,
3283    pipe: mrsm.Pipe,
3284    df: Union[pd.DataFrame, Dict[str, str]],
3285    debug: bool = False,
3286) -> List[str]:
3287    """
3288    If we encounter a column of a different type, set the entire column to text.
3289    If the altered columns are numeric, alter to numeric instead.
3290
3291    Parameters
3292    ----------
3293    pipe: mrsm.Pipe
3294        The pipe to be altered.
3295
3296    df: Union[pd.DataFrame, Dict[str, str]]
3297        The pandas DataFrame which may contain altered columns.
3298        If a dict is provided, assume it maps columns to Pandas data types.
3299
3300    Returns
3301    -------
3302    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
3303    """
3304    if not pipe.exists(debug=debug) or pipe.static:
3305        return []
3306
3307    from meerschaum.utils.sql import (
3308        sql_item_name,
3309        get_table_cols_types,
3310        DROP_IF_EXISTS_FLAVORS,
3311        SINGLE_ALTER_TABLE_FLAVORS,
3312    )
3313    from meerschaum.utils.dataframe import get_numeric_cols
3314    from meerschaum.utils.dtypes import are_dtypes_equal
3315    from meerschaum.utils.dtypes.sql import (
3316        get_pd_type_from_db_type,
3317        get_db_type_from_pd_type,
3318    )
3319    from meerschaum.utils.misc import flatten_list, generate_password, items_str
3320    target = pipe.target
3321    session_id = generate_password(3)
3322    numeric_cols = (
3323        get_numeric_cols(df)
3324        if not isinstance(df, dict)
3325        else [
3326            col
3327            for col, typ in df.items()
3328            if typ.startswith('numeric')
3329        ]
3330    )
3331    df_cols_types = (
3332        {
3333            col: str(typ)
3334            for col, typ in df.dtypes.items()
3335        }
3336        if not isinstance(df, dict)
3337        else df
3338    )
3339    db_cols_types = {
3340        col: get_pd_type_from_db_type(typ)
3341        for col, typ in get_table_cols_types(
3342            pipe.target,
3343            self,
3344            schema=self.get_pipe_schema(pipe),
3345            debug=debug,
3346        ).items()
3347    }
3348    pipe_dtypes = pipe.get_dtypes(debug=debug)
3349    pipe_bool_cols = [col for col, typ in pipe_dtypes.items() if are_dtypes_equal(str(typ), 'bool')]
3350    pd_db_df_aliases = {
3351        'int': 'bool',
3352        'float': 'bool',
3353        'numeric': 'bool',
3354        'guid': 'object',
3355    }
3356    if self.flavor == 'oracle':
3357        pd_db_df_aliases.update({
3358            'int': 'numeric',
3359            'date': 'datetime',
3360            'numeric': 'int',
3361        })
3362
3363    altered_cols = {
3364        col: (db_cols_types.get(col, 'object'), typ)
3365        for col, typ in df_cols_types.items()
3366        if not are_dtypes_equal(typ, db_cols_types.get(col, 'object').lower())
3367        and not are_dtypes_equal(db_cols_types.get(col, 'object'), 'string')
3368    }
3369
3370    if debug and altered_cols:
3371        dprint("Columns to be altered:")
3372        mrsm.pprint(altered_cols)
3373
3374    ### NOTE: Special columns (numerics, bools, etc.) are captured and cached upon detection.
3375    new_special_cols = pipe._get_cached_value('new_special_cols', debug=debug) or {}
3376    new_special_db_cols_types = {
3377        col: (db_cols_types.get(col, 'object'), typ)
3378        for col, typ in new_special_cols.items()
3379    }
3380    if debug:
3381        dprint("Cached new special columns:")
3382        mrsm.pprint(new_special_cols)
3383        dprint("New special columns db types:")
3384        mrsm.pprint(new_special_db_cols_types)
3385
3386    altered_cols.update(new_special_db_cols_types)
3387
3388    ### NOTE: Sometimes bools are coerced into ints or floats.
3389    altered_cols_to_ignore = set()
3390    for col, (db_typ, df_typ) in altered_cols.items():
3391        for db_alias, df_alias in pd_db_df_aliases.items():
3392            if (
3393                db_alias in db_typ.lower()
3394                and df_alias in df_typ.lower()
3395                and col not in new_special_cols
3396            ):
3397                altered_cols_to_ignore.add(col)
3398
3399    ### Oracle's bool handling sometimes mixes NUMBER and INT.
3400    for bool_col in pipe_bool_cols:
3401        if bool_col not in altered_cols:
3402            continue
3403        db_is_bool_compatible = (
3404            are_dtypes_equal('int', altered_cols[bool_col][0])
3405            or are_dtypes_equal('float', altered_cols[bool_col][0])
3406            or are_dtypes_equal('numeric', altered_cols[bool_col][0])
3407            or are_dtypes_equal('bool', altered_cols[bool_col][0])
3408        )
3409        df_is_bool_compatible = (
3410            are_dtypes_equal('int', altered_cols[bool_col][1])
3411            or are_dtypes_equal('float', altered_cols[bool_col][1])
3412            or are_dtypes_equal('numeric', altered_cols[bool_col][1])
3413            or are_dtypes_equal('bool', altered_cols[bool_col][1])
3414        )
3415        if db_is_bool_compatible and df_is_bool_compatible:
3416            altered_cols_to_ignore.add(bool_col)
3417
3418    if debug and altered_cols_to_ignore:
3419        dprint("Ignoring the following altered columns (false positives):")
3420        mrsm.pprint(altered_cols_to_ignore)
3421
3422    for col in altered_cols_to_ignore:
3423        _ = altered_cols.pop(col, None)
3424
3425    if not altered_cols:
3426        return []
3427
3428    if numeric_cols:
3429        explicit_pipe_dtypes = pipe.get_dtypes(infer=False, debug=debug)
3430        explicit_pipe_dtypes.update({col: 'numeric' for col in numeric_cols})
3431        pipe.dtypes = explicit_pipe_dtypes
3432        if not pipe.temporary:
3433            edit_success, edit_msg = pipe.edit(debug=debug)
3434            if not edit_success:
3435                warn(
3436                    f"Failed to update dtypes for numeric columns {items_str(numeric_cols)}:\n"
3437                    + f"{edit_msg}"
3438                )
3439    else:
3440        numeric_cols.extend([col for col, typ in pipe_dtypes.items() if typ.startswith('numeric')])
3441
3442    numeric_type = get_db_type_from_pd_type('numeric', self.flavor, as_sqlalchemy=False)
3443    text_type = get_db_type_from_pd_type('str', self.flavor, as_sqlalchemy=False)
3444    altered_cols_types = {
3445        col: (
3446            numeric_type
3447            if col in numeric_cols
3448            else text_type
3449        )
3450        for col, (db_typ, typ) in altered_cols.items()
3451    }
3452
3453    if self.flavor in ('sqlite', 'geopackage'):
3454        temp_table_name = '-' + session_id + '_' + target
3455        rename_query = (
3456            "ALTER TABLE "
3457            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3458            + " RENAME TO "
3459            + sql_item_name(temp_table_name, self.flavor, None)
3460        )
3461        create_query = (
3462            "CREATE TABLE "
3463            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3464            + " (\n"
3465        )
3466        for col_name, col_typ in db_cols_types.items():
3467            create_query += (
3468                sql_item_name(col_name, self.flavor, None)
3469                + " "
3470                + (
3471                    col_typ
3472                    if col_name not in altered_cols
3473                    else altered_cols_types[col_name]
3474                )
3475                + ",\n"
3476            )
3477        create_query = create_query[:-2] + "\n)"
3478
3479        insert_query = (
3480            "INSERT INTO "
3481            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3482            + ' ('
3483            + ', '.join([
3484                sql_item_name(col_name, self.flavor, None)
3485                for col_name in db_cols_types
3486            ])
3487            + ')'
3488            + "\nSELECT\n"
3489        )
3490        for col_name in db_cols_types:
3491            new_col_str = (
3492                sql_item_name(col_name, self.flavor, None)
3493                if col_name not in altered_cols
3494                else (
3495                    "CAST("
3496                    + sql_item_name(col_name, self.flavor, None)
3497                    + " AS "
3498                    + altered_cols_types[col_name]
3499                    + ")"
3500                )
3501            )
3502            insert_query += new_col_str + ",\n"
3503
3504        insert_query = insert_query[:-2] + (
3505            f"\nFROM {sql_item_name(temp_table_name, self.flavor, self.get_pipe_schema(pipe))}"
3506        )
3507
3508        if_exists_str = "IF EXISTS " if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
3509
3510        drop_query = f"DROP TABLE {if_exists_str}" + sql_item_name(
3511            temp_table_name, self.flavor, self.get_pipe_schema(pipe)
3512        )
3513        return [
3514            rename_query,
3515            create_query,
3516            insert_query,
3517            drop_query,
3518        ]
3519
3520    queries = []
3521    if self.flavor == 'oracle':
3522        for col, typ in altered_cols_types.items():
3523            add_query = (
3524                "ALTER TABLE "
3525                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3526                + "\nADD " + sql_item_name(col + '_temp', self.flavor, None)
3527                + " " + typ
3528            )
3529            queries.append(add_query)
3530
3531        for col, typ in altered_cols_types.items():
3532            populate_temp_query = (
3533                "UPDATE "
3534                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3535                + "\nSET " + sql_item_name(col + '_temp', self.flavor, None)
3536                + ' = ' + sql_item_name(col, self.flavor, None)
3537            )
3538            queries.append(populate_temp_query)
3539
3540        for col, typ in altered_cols_types.items():
3541            set_old_cols_to_null_query = (
3542                "UPDATE "
3543                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3544                + "\nSET " + sql_item_name(col, self.flavor, None)
3545                + ' = NULL'
3546            )
3547            queries.append(set_old_cols_to_null_query)
3548
3549        for col, typ in altered_cols_types.items():
3550            alter_type_query = (
3551                "ALTER TABLE "
3552                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3553                + "\nMODIFY " + sql_item_name(col, self.flavor, None) + ' '
3554                + typ
3555            )
3556            queries.append(alter_type_query)
3557
3558        for col, typ in altered_cols_types.items():
3559            set_old_to_temp_query = (
3560                "UPDATE "
3561                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3562                + "\nSET " + sql_item_name(col, self.flavor, None)
3563                + ' = ' + sql_item_name(col + '_temp', self.flavor, None)
3564            )
3565            queries.append(set_old_to_temp_query)
3566
3567        for col, typ in altered_cols_types.items():
3568            drop_temp_query = (
3569                "ALTER TABLE "
3570                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3571                + "\nDROP COLUMN " + sql_item_name(col + '_temp', self.flavor, None)
3572            )
3573            queries.append(drop_temp_query)
3574
3575        return queries
3576
3577    query = "ALTER TABLE " + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3578    for col, typ in altered_cols_types.items():
3579        alter_col_prefix = (
3580            'ALTER' if self.flavor not in ('mysql', 'mariadb', 'oracle')
3581            else 'MODIFY'
3582        )
3583        type_prefix = (
3584            '' if self.flavor in ('mssql', 'mariadb', 'mysql')
3585            else 'TYPE '
3586        )
3587        column_str = 'COLUMN' if self.flavor != 'oracle' else ''
3588        query_suffix = (
3589            f"\n{alter_col_prefix} {column_str} "
3590            + sql_item_name(col, self.flavor, None)
3591            + " " + type_prefix + typ + ","
3592        )
3593        if self.flavor not in SINGLE_ALTER_TABLE_FLAVORS:
3594            query += query_suffix
3595        else:
3596            queries.append(query + query_suffix[:-1])
3597
3598    if self.flavor not in SINGLE_ALTER_TABLE_FLAVORS:
3599        queries.append(query[:-1])
3600
3601    if self.flavor != 'duckdb':
3602        return queries
3603
3604    drop_index_queries = list(flatten_list(
3605        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
3606    ))
3607    create_index_queries = list(flatten_list(
3608        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
3609    ))
3610
3611    return drop_index_queries + queries + create_index_queries

If we encounter a column of a different type, set the entire column to text. If the altered columns are numeric, alter to numeric instead.

Parameters
  • pipe (mrsm.Pipe): The pipe to be altered.
  • df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which may contain altered columns. If a dict is provided, assume it maps columns to Pandas data types.
Returns
  • A list of the ALTER TABLE SQL query or queries to be executed on the provided connector.
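For instance, if a column was created as an integer but later arrives as text, the generated queries widen the entire column to the text type (numeric columns widen to the numeric type instead). A sketch with invented keys and values:

    import pandas as pd
    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe('demo', 'weather', mrsm_instance='sql:main')

    ### Suppose 'station' exists as an INT column but now contains strings.
    df = pd.DataFrame({'station': ['KATL', 'KJFK']})
    queries = conn.get_alter_columns_queries(pipe, df)
    ### Most flavors emit ALTER TABLE ... ALTER/MODIFY COLUMN statements;
    ### SQLite rebuilds the table, and Oracle stages values through _temp columns.
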
def delete_pipe( self, pipe: meerschaum.Pipe, debug: bool = False) -> Tuple[bool, str]:
1018def delete_pipe(
1019    self,
1020    pipe: mrsm.Pipe,
1021    debug: bool = False,
1022) -> SuccessTuple:
1023    """
1024    Delete a Pipe's registration.
1025    """
1026    from meerschaum.utils.packages import attempt_import
1027    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
1028
1029    if not pipe.id:
1030        return False, f"{pipe} is not registered."
1031
1032    ### ensure pipes table exists
1033    from meerschaum.connectors.sql.tables import get_tables
1034    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']
1035
1036    q = sqlalchemy.delete(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id)
1037    if not self.exec(q, debug=debug):
1038        return False, f"Failed to delete registration for {pipe}."
1039
1040    return True, "Success"

Delete a Pipe's registration.
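Note that this only removes the registration row from the internal pipes table; the pipe's target table is left untouched (compare drop_pipe). A sketch, with illustrative keys:

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe('demo', 'weather', mrsm_instance='sql:main')

    success, msg = conn.delete_pipe(pipe)
    if not success:
        print(msg)
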

def get_pipe_data( self, pipe: meerschaum.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, str, NoneType] = None, end: Union[datetime.datetime, str, NoneType] = None, params: Optional[Dict[str, Any]] = None, order: str = 'asc', limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, chunksize: Optional[int] = -1, as_iterator: bool = False, debug: bool = False, **kw: Any) -> 'Union[pd.DataFrame, None]':
1043def get_pipe_data(
1044    self,
1045    pipe: mrsm.Pipe,
1046    select_columns: Optional[List[str]] = None,
1047    omit_columns: Optional[List[str]] = None,
1048    begin: Union[datetime, str, None] = None,
1049    end: Union[datetime, str, None] = None,
1050    params: Optional[Dict[str, Any]] = None,
1051    order: str = 'asc',
1052    limit: Optional[int] = None,
1053    begin_add_minutes: int = 0,
1054    end_add_minutes: int = 0,
1055    chunksize: Optional[int] = -1,
1056    as_iterator: bool = False,
1057    debug: bool = False,
1058    **kw: Any
1059) -> Union[pd.DataFrame, None]:
1060    """
1061    Access a pipe's data from the SQL instance.
1062
1063    Parameters
1064    ----------
1065    pipe: mrsm.Pipe
1066        The pipe to get data from.
1067
1068    select_columns: Optional[List[str]], default None
1069        If provided, only select these given columns.
1070        Otherwise select all available columns (i.e. `SELECT *`).
1071
1072    omit_columns: Optional[List[str]], default None
1073        If provided, remove these columns from the selection.
1074
1075    begin: Union[datetime, str, None], default None
1076        If provided, get rows newer than or equal to this value.
1077
1078    end: Union[datetime, str, None], default None
1079        If provided, get rows older than or equal to this value.
1080
1081    params: Optional[Dict[str, Any]], default None
1082        Additional parameters to filter by.
1083        See `meerschaum.connectors.sql.build_where`.
1084
1085    order: Optional[str], default 'asc'
1086        The selection order for all of the indices in the query.
1087        If `None`, omit the `ORDER BY` clause.
1088
1089    limit: Optional[int], default None
1090        If specified, limit the number of rows retrieved to this value.
1091
1092    begin_add_minutes: int, default 0
1093        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
1094
1095    end_add_minutes: int, default 0
1096        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
1097
1098    chunksize: Optional[int], default -1
1099        The size of dataframe chunks to load into memory.
1100
1101    as_iterator: bool, default False
1102        If `True`, return the chunks iterator directly.
1103
1104    debug: bool, default False
1105        Verbosity toggle.
1106
1107    Returns
1108    -------
1109    A `pd.DataFrame` of the pipe's data.
1110
1111    """
1112    import functools
1113    from meerschaum.utils.packages import import_pandas
1114    from meerschaum.utils.dtypes import to_pandas_dtype, are_dtypes_equal
1115    from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type
1116    pd = import_pandas()
1117    is_dask = 'dask' in pd.__name__
1118
1119    cols_types = pipe.get_columns_types(debug=debug) if pipe.enforce else {}
1120    pipe_dtypes = pipe.get_dtypes(infer=False, debug=debug) if pipe.enforce else {}
1121
1122    remote_pandas_types = {
1123        col: to_pandas_dtype(get_pd_type_from_db_type(typ))
1124        for col, typ in cols_types.items()
1125    }
1126    remote_dt_cols_types = {
1127        col: typ
1128        for col, typ in remote_pandas_types.items()
1129        if are_dtypes_equal(typ, 'datetime')
1130    }
1131    remote_dt_tz_aware_cols_types = {
1132        col: typ
1133        for col, typ in remote_dt_cols_types.items()
1134        if ',' in typ or typ == 'datetime'
1135    }
1136    remote_dt_tz_naive_cols_types = {
1137        col: typ
1138        for col, typ in remote_dt_cols_types.items()
1139        if col not in remote_dt_tz_aware_cols_types
1140    }
1141
1142    configured_pandas_types = {
1143        col: to_pandas_dtype(typ)
1144        for col, typ in pipe_dtypes.items()
1145    }
1146    configured_lower_precision_dt_cols_types = {
1147        col: typ
1148        for col, typ in pipe_dtypes.items()
1149        if (
1150            are_dtypes_equal('datetime', typ)
1151            and '[' in typ
1152            and 'ns' not in typ
1153        )
1155    }
1156
1157    dtypes = {
1158        **remote_pandas_types,
1159        **configured_pandas_types,
1160        **remote_dt_tz_aware_cols_types,
1161        **remote_dt_tz_naive_cols_types,
1162        **configured_lower_precision_dt_cols_types
1163    } if pipe.enforce else {}
1164
1165    existing_cols = cols_types.keys()
1166    select_columns = (
1167        [
1168            col
1169            for col in existing_cols
1170            if col not in (omit_columns or [])
1171        ]
1172        if not select_columns
1173        else [
1174            col
1175            for col in select_columns
1176            if col in existing_cols
1177            and col not in (omit_columns or [])
1178        ]
1179    ) if pipe.enforce else select_columns
1180
1181    if select_columns:
1182        dtypes = {col: typ for col, typ in dtypes.items() if col in select_columns}
1183
1184    dtypes = {
1185        col: typ
1186        for col, typ in dtypes.items()
1187        if col in (select_columns or [col]) and col not in (omit_columns or [])
1188    } if pipe.enforce else {}
1189
1190    if debug:
1191        dprint(f"[{self}] `read()` dtypes:")
1192        mrsm.pprint(dtypes)
1193
1194    query = self.get_pipe_data_query(
1195        pipe,
1196        select_columns=select_columns,
1197        omit_columns=omit_columns,
1198        begin=begin,
1199        end=end,
1200        params=params,
1201        order=order,
1202        limit=limit,
1203        begin_add_minutes=begin_add_minutes,
1204        end_add_minutes=end_add_minutes,
1205        debug=debug,
1206        **kw
1207    )
1208
1209    read_kwargs = {}
1210    if is_dask:
1211        index_col = pipe.columns.get('datetime', None)
1212        read_kwargs['index_col'] = index_col
1213
1214    chunks = self.read(
1215        query,
1216        chunksize=chunksize,
1217        as_iterator=True,
1218        coerce_float=False,
1219        dtype=dtypes,
1220        debug=debug,
1221        **read_kwargs
1222    )
1223
1224    if as_iterator:
1225        return chunks
1226
1227    return pd.concat(chunks)

Access a pipe's data from the SQL instance.

Parameters
  • pipe (mrsm.Pipe): The pipe to get data from.
  • select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
  • omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
  • begin (Union[datetime, str, None], default None): If provided, get rows newer than or equal to this value.
  • end (Union[datetime, str, None], default None): If provided, get rows older than or equal to this value.
  • params (Optional[Dict[str, Any]], default None): Additional parameters to filter by. See meerschaum.connectors.sql.build_where.
  • order (Optional[str], default 'asc'): The selection order for all of the indices in the query. If None, omit the ORDER BY clause.
  • limit (Optional[int], default None): If specified, limit the number of rows retrieved to this value.
  • begin_add_minutes (int, default 0): The number of minutes to add to the begin datetime (i.e. DATEADD).
  • end_add_minutes (int, default 0): The number of minutes to add to the end datetime (i.e. DATEADD).
  • chunksize (Optional[int], default -1): The size of dataframe chunks to load into memory.
  • as_iterator (bool, default False): If True, return the chunks iterator directly.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pd.DataFrame of the pipe's data.
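A usage sketch (the connector keys, bounds, params, and column values are illustrative):

    from datetime import datetime, timezone
    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe(
        'demo', 'weather',
        mrsm_instance='sql:main',
        columns={'datetime': 'timestamp', 'id': 'station'},
    )

    df = conn.get_pipe_data(
        pipe,
        begin=datetime(2024, 1, 1, tzinfo=timezone.utc),
        end=datetime(2024, 2, 1, tzinfo=timezone.utc),
        params={'station': 'KATL'},
        limit=100,
    )

    ### Or stream chunks instead of materializing one frame:
    for chunk in conn.get_pipe_data(pipe, as_iterator=True, chunksize=10_000):
        print(len(chunk))
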
def get_pipe_data_query( self, pipe: meerschaum.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, int, str, NoneType] = None, end: Union[datetime.datetime, int, str, NoneType] = None, params: Optional[Dict[str, Any]] = None, order: Optional[str] = 'asc', sort_datetimes: bool = False, limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, replace_nulls: Optional[str] = None, skip_existing_cols_check: bool = False, debug: bool = False, **kw: Any) -> Optional[str]:
1230def get_pipe_data_query(
1231    self,
1232    pipe: mrsm.Pipe,
1233    select_columns: Optional[List[str]] = None,
1234    omit_columns: Optional[List[str]] = None,
1235    begin: Union[datetime, int, str, None] = None,
1236    end: Union[datetime, int, str, None] = None,
1237    params: Optional[Dict[str, Any]] = None,
1238    order: Optional[str] = 'asc',
1239    sort_datetimes: bool = False,
1240    limit: Optional[int] = None,
1241    begin_add_minutes: int = 0,
1242    end_add_minutes: int = 0,
1243    replace_nulls: Optional[str] = None,
1244    skip_existing_cols_check: bool = False,
1245    debug: bool = False,
1246    **kw: Any
1247) -> Union[str, None]:
1248    """
1249    Return the `SELECT` query for retrieving a pipe's data from its instance.
1250
1251    Parameters
1252    ----------
1253    pipe: mrsm.Pipe
1254        The pipe to get data from.
1255
1256    select_columns: Optional[List[str]], default None
1257        If provided, only select these given columns.
1258        Otherwise select all available columns (i.e. `SELECT *`).
1259
1260    omit_columns: Optional[List[str]], default None
1261        If provided, remove these columns from the selection.
1262
1263    begin: Union[datetime, int, str, None], default None
1264        If provided, get rows newer than or equal to this value.
1265
1266    end: Union[datetime, int, str, None], default None
1267        If provided, get rows older than or equal to this value.
1268
1269    params: Optional[Dict[str, Any]], default None
1270        Additional parameters to filter by.
1271        See `meerschaum.connectors.sql.build_where`.
1272
1273    order: Optional[str], default 'asc'
1274        The selection order for all of the indices in the query.
1275        If `None`, omit the `ORDER BY` clause.
1276
1277    sort_datetimes: bool, default False
1278        Shorthand for `order='desc'`, applied only when `order` is `None`.
1279
1280    limit: Optional[int], default None
1281        If specified, limit the number of rows retrieved to this value.
1282
1283    begin_add_minutes: int, default 0
1284        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
1285
1286    end_add_minutes: int, default 0
1287        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
1288
1292    replace_nulls: Optional[str], default None
1293        If provided, replace null values with this value.
1294
1295    skip_existing_cols_check: bool, default False
1296        If `True`, do not verify that querying columns are actually on the table.
1297
1298    debug: bool, default False
1299        Verbosity toggle.
1300
1301    Returns
1302    -------
1303    A `SELECT` query to retrieve a pipe's data.
1304    """
1305    from meerschaum.utils.misc import items_str
1306    from meerschaum.utils.sql import sql_item_name, dateadd_str
1307    from meerschaum.utils.dtypes import coerce_timezone
1308    from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type, get_db_type_from_pd_type
1309
1310    dt_col = pipe.columns.get('datetime', None)
1311    existing_cols = pipe.get_columns_types(debug=debug) if pipe.enforce else []
1312    skip_existing_cols_check = skip_existing_cols_check or not pipe.enforce
1313    dt_typ = get_pd_type_from_db_type(existing_cols[dt_col]) if dt_col in existing_cols else None
1314    dt_db_type = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None
1315    select_columns = (
1316        [col for col in existing_cols]
1317        if not select_columns
1318        else [col for col in select_columns if skip_existing_cols_check or col in existing_cols]
1319    )
1320    if omit_columns:
1321        select_columns = [col for col in select_columns if col not in omit_columns]
1322
1323    if order is None and sort_datetimes:
1324        order = 'desc'
1325
1326    if begin == '':
1327        begin = pipe.get_sync_time(debug=debug)
1328        backtrack_interval = pipe.get_backtrack_interval(debug=debug)
1329        if begin is not None:
1330            begin -= backtrack_interval
1331
1332    begin, end = pipe.parse_date_bounds(begin, end)
1333    if isinstance(begin, datetime) and dt_typ:
1334        begin = coerce_timezone(begin, strip_utc=('utc' not in dt_typ.lower()))
1335    if isinstance(end, datetime) and dt_typ:
1336        end = coerce_timezone(end, strip_utc=('utc' not in dt_typ.lower()))
1337
1338    cols_names = [
1339        sql_item_name(col, self.flavor, None)
1340        for col in select_columns
1341    ]
1342    select_cols_str = (
1343        'SELECT\n    '
1344        + ',\n    '.join(
1345            [
1346                (
1347                    col_name
1348                    if not replace_nulls
1349                    else f"COALESCE({col_name}, '{replace_nulls}') AS {col_name}"
1350                )
1351                for col_name in cols_names
1352            ]
1353        )
1354    ) if cols_names else 'SELECT *'
1355    pipe_table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
1356    query = f"{select_cols_str}\nFROM {pipe_table_name}"
1357    where = ""
1358
1359    if order is not None:
1360        default_order = 'asc'
1361        if order not in ('asc', 'desc'):
1362            warn(f"Ignoring unsupported order '{order}'. Falling back to '{default_order}'.")
1363            order = default_order
1364        order = order.upper()
1365
1366    if not pipe.columns.get('datetime', None):
1367        _dt = pipe.guess_datetime()
1368        dt = sql_item_name(_dt, self.flavor, None) if _dt else None
1369        is_guess = True
1370    else:
1371        _dt = pipe.get_columns('datetime')
1372        dt = sql_item_name(_dt, self.flavor, None)
1373        is_guess = False
1374
1375    quoted_indices = {
1376        key: sql_item_name(val, self.flavor, None)
1377        for key, val in pipe.columns.items()
1378        if val in existing_cols or skip_existing_cols_check
1379    }
1380
1381    if begin is not None or end is not None:
1382        if is_guess:
1383            if _dt is None:
1384                warn(
1385                    f"No datetime could be determined for {pipe}."
1386                    + "\n    Ignoring begin and end...",
1387                    stack=False,
1388                )
1389                begin, end = None, None
1390            else:
1391                warn(
1392                    f"A datetime wasn't specified for {pipe}.\n"
1393                    + f"    Using column \"{_dt}\" for datetime bounds...",
1394                    stack=False,
1395                )
1396
1397    is_dt_bound = False
1398    if begin is not None and (_dt in existing_cols or skip_existing_cols_check):
1399        begin_da = dateadd_str(
1400            flavor=self.flavor,
1401            datepart='minute',
1402            number=begin_add_minutes,
1403            begin=begin,
1404            db_type=dt_db_type,
1405        )
1406        where += f"\n    {dt} >= {begin_da}" + ("\n    AND\n    " if end is not None else "")
1407        is_dt_bound = True
1408
1409    if end is not None and (_dt in existing_cols or skip_existing_cols_check):
1410        if 'int' in str(type(end)).lower() and end == begin:
1411            end += 1
1412        end_da = dateadd_str(
1413            flavor=self.flavor,
1414            datepart='minute',
1415            number=end_add_minutes,
1416            begin=end,
1417            db_type=dt_db_type,
1418        )
1419        where += f"{dt} <  {end_da}"
1420        is_dt_bound = True
1421
1422    if params is not None:
1423        from meerschaum.utils.sql import build_where
1424        valid_params = {
1425            k: v
1426            for k, v in params.items()
1427            if k in existing_cols or skip_existing_cols_check
1428        }
1429        if valid_params:
1430            where += '    ' + build_where(valid_params, self).lstrip().replace(
1431                'WHERE', ('    AND' if is_dt_bound else "    ")
1432            )
1433
1434    if len(where) > 0:
1435        query += "\nWHERE " + where
1436
1437    if order is not None:
1438        ### Sort by indices, starting with datetime.
1439        order_by = ""
1440        if quoted_indices:
1441            order_by += "\nORDER BY "
1442            if _dt and (_dt in existing_cols or skip_existing_cols_check):
1443                order_by += dt + ' ' + order + ','
1444            for key, quoted_col_name in quoted_indices.items():
1445                if dt == quoted_col_name:
1446                    continue
1447                order_by += ' ' + quoted_col_name + ' ' + order + ','
1448            order_by = order_by[:-1]
1449
1450        query += order_by
1451
1452    if isinstance(limit, int):
1453        if self.flavor == 'mssql':
1454            query = f'SELECT TOP {limit}\n' + query[len("SELECT "):]
1455        elif self.flavor == 'oracle':
1456            query = (
1457                f"SELECT * FROM (\n  {query}\n)\n"
1458                + f"WHERE ROWNUM IN ({', '.join([str(i) for i in range(1, limit+1)])})"
1459            )
1460        else:
1461            query += f"\nLIMIT {limit}"
1462
1463    if debug:
1464        to_print = (
1465            []
1466            + ([f"begin='{begin}'"] if begin else [])
1467            + ([f"end='{end}'"] if end else [])
1468            + ([f"params={params}"] if params else [])
1469        )
1470        dprint("Getting pipe data with constraints: " + items_str(to_print, quotes=False))
1471
1472    return query

Return the SELECT query for retrieving a pipe's data from its instance.

Parameters
  • pipe (mrsm.Pipe): The pipe to get data from.
  • select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
  • omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
  • begin (Union[datetime, int, str, None], default None): If provided, get rows newer than or equal to this value.
  • end (Union[datetime, int, str, None], default None): If provided, get rows older than or equal to this value.
  • params (Optional[Dict[str, Any]], default None): Additional parameters to filter by. See meerschaum.connectors.sql.build_where.
  • order (Optional[str], default 'asc'): The selection order for all of the indices in the query. If None, omit the ORDER BY clause.
  • sort_datetimes (bool, default False): Shorthand for order='desc', applied only when order is None.
  • limit (Optional[int], default None): If specified, limit the number of rows retrieved to this value.
  • begin_add_minutes (int, default 0): The number of minutes to add to the begin datetime (i.e. DATEADD).
  • end_add_minutes (int, default 0): The number of minutes to add to the end datetime (i.e. DATEADD).
  • replace_nulls (Optional[str], default None): If provided, replace null values with this value.
  • skip_existing_cols_check (bool, default False): If True, do not verify that querying columns are actually on the table.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SELECT query to retrieve a pipe's data.
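Because this builds the SELECT string without executing it, it's useful for inspecting what get_pipe_data() will run. A sketch with illustrative keys (the exact SQL varies by flavor):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe(
        'demo', 'weather',
        mrsm_instance='sql:main',
        columns={'datetime': 'timestamp', 'id': 'station'},
    )

    query = conn.get_pipe_data_query(
        pipe,
        begin='2024-01-01',
        params={'station': 'KATL'},
        order='desc',
        limit=10,
    )
    print(query)   ### e.g. SELECT ... WHERE ... ORDER BY ... LIMIT 10
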
def register_pipe( self, pipe: meerschaum.Pipe, debug: bool = False) -> Tuple[bool, str]:
21def register_pipe(
22    self,
23    pipe: mrsm.Pipe,
24    debug: bool = False,
25) -> SuccessTuple:
26    """
27    Register a new pipe.
28    A pipe's attributes must be set before registering.
29    """
30    from meerschaum.utils.packages import attempt_import
31    from meerschaum.utils.sql import json_flavors
32
33    ### ensure pipes table exists
34    from meerschaum.connectors.sql.tables import get_tables
35    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']
36
37    if pipe.get_id(debug=debug) is not None:
38        return False, f"{pipe} is already registered."
39
40    ### NOTE: if `parameters` is supplied in the Pipe constructor,
41    ###       then `pipe.parameters` will exist and not be fetched from the database.
42
43    ### 1. Prioritize the Pipe object's `parameters` first.
44    ###    E.g. if the user manually sets the `parameters` property
45    ###    or if the Pipe already exists
 46    ###    (which shouldn't be registerable anyway, but that's an issue for later).
47    parameters = None
48    try:
49        parameters = pipe.get_parameters(apply_symlinks=False)
50    except Exception as e:
51        if debug:
52            dprint(str(e))
53        parameters = None
54
55    ### ensure `parameters` is a dictionary
56    if parameters is None:
57        parameters = {}
58
59    import json
60    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
61    values = {
62        'connector_keys' : pipe.connector_keys,
63        'metric_key'     : pipe.metric_key,
64        'location_key'   : pipe.location_key,
65        'parameters'     : (
66            json.dumps(parameters)
67            if self.flavor not in json_flavors
68            else parameters
69        ),
70    }
71    query = sqlalchemy.insert(pipes_tbl).values(**values)
72    result = self.exec(query, debug=debug)
73    if result is None:
74        return False, f"Failed to register {pipe}."
75    return True, f"Successfully registered {pipe}."

Register a new pipe. A pipe's attributes must be set before registering.
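
As a minimal sketch (assuming a configured `sql:main` instance; the keys and parameters are hypothetical), registering inserts a single row into the pipes table:

>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> pipe = mrsm.Pipe(
...     'plugin:noaa', 'weather', 'atl',
...     instance='sql:main',
...     parameters={'columns': {'datetime': 'ts'}},
... )
>>> success, msg = conn.register_pipe(pipe)
>>> success  # False if the pipe was already registered
True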

def edit_pipe( self, pipe: meerschaum.Pipe, patch: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
 78def edit_pipe(
 79    self,
 80    pipe: mrsm.Pipe,
 81    patch: bool = False,
 82    debug: bool = False,
 83    **kw: Any
 84) -> SuccessTuple:
 85    """
 86    Persist a Pipe's parameters to its database.
 87
 88    Parameters
 89    ----------
 90    pipe: mrsm.Pipe
 91        The pipe to be edited.
 92    patch: bool, default False
 93        If patch is `True`, update the existing parameters by cascading.
 94        Otherwise overwrite the parameters (default).
 95    debug: bool, default False
 96        Verbosity toggle.
 97    """
 98
 99    if pipe.id is None:
100        return False, f"{pipe} is not registered and cannot be edited."
101
102    from meerschaum.utils.packages import attempt_import
103    from meerschaum.utils.sql import json_flavors
104    if not patch:
105        parameters = pipe.__dict__.get('_attributes', {}).get('parameters', {})
106    else:
107        from meerschaum import Pipe
108        from meerschaum.config._patch import apply_patch_to_config
109        original_parameters = Pipe(
110            pipe.connector_keys, pipe.metric_key, pipe.location_key,
111            mrsm_instance=pipe.instance_keys
112        ).get_parameters(apply_symlinks=False)
113        parameters = apply_patch_to_config(
114            original_parameters,
115            pipe._attributes['parameters']
116        )
117
118    ### ensure pipes table exists
119    from meerschaum.connectors.sql.tables import get_tables
120    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']
121
122    import json
123    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
124
125    values = {
126        'parameters': (
127            json.dumps(parameters)
128            if self.flavor not in json_flavors
129            else parameters
130        ),
131    }
132    q = sqlalchemy.update(pipes_tbl).values(**values).where(
133        pipes_tbl.c.pipe_id == pipe.id
134    )
135
136    result = self.exec(q, debug=debug)
137    message = (
138        f"Successfully edited {pipe}."
139        if result is not None else f"Failed to edit {pipe}."
140    )
141    return (result is not None), message

Persist a Pipe's parameters to its database.

Parameters
  • pipe (mrsm.Pipe): The pipe to be edited.
  • patch (bool, default False): If patch is True, update the existing parameters by cascading. Otherwise overwrite the parameters (default).
  • debug (bool, default False): Verbosity toggle.
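
A short sketch, continuing the hypothetical pipe above: pass `patch=True` to merge new keys into the stored parameters instead of overwriting them:

>>> pipe.parameters['fetch'] = {'backtrack_minutes': 1440}
>>> success, msg = conn.edit_pipe(pipe)              # overwrite the stored parameters
>>> success, msg = conn.edit_pipe(pipe, patch=True)  # or cascade into the stored parameters
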
def get_pipe_id(self, pipe: meerschaum.Pipe, debug: bool = False) -> Any:
1475def get_pipe_id(
1476    self,
1477    pipe: mrsm.Pipe,
1478    debug: bool = False,
1479) -> Any:
1480    """
1481    Get a Pipe's ID from the pipes table.
1482    """
1483    if pipe.temporary:
1484        return None
1485    from meerschaum.utils.packages import attempt_import
1486    sqlalchemy = attempt_import('sqlalchemy')
1487    from meerschaum.connectors.sql.tables import get_tables
1488    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']
1489
1490    query = sqlalchemy.select(pipes_tbl.c.pipe_id).where(
1491        pipes_tbl.c.connector_keys == pipe.connector_keys
1492    ).where(
1493        pipes_tbl.c.metric_key == pipe.metric_key
1494    ).where(
1495        (pipes_tbl.c.location_key == pipe.location_key) if pipe.location_key is not None
1496        else pipes_tbl.c.location_key.is_(None)
1497    )
1498    _id = self.value(query, debug=debug, silent=pipe.temporary)
1499    if _id is not None:
1500        _id = int(_id)
1501    return _id

Get a Pipe's ID from the pipes table.
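
A quick sketch (hypothetical keys as above): registered pipes return an `int`, and temporary pipes always return `None`:

>>> pipe_id = conn.get_pipe_id(pipe)  # e.g. 1 once registered
>>> temp_pipe = mrsm.Pipe('demo', 'scratch', instance='sql:main', temporary=True)
>>> conn.get_pipe_id(temp_pipe) is None
True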

def get_pipe_attributes( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, Any]:
1504def get_pipe_attributes(
1505    self,
1506    pipe: mrsm.Pipe,
1507    debug: bool = False,
1508) -> Dict[str, Any]:
1509    """
1510    Get a Pipe's attributes dictionary.
1511    """
1512    from meerschaum.connectors.sql.tables import get_tables
1513    from meerschaum.utils.packages import attempt_import
1514    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
1515
1516    if pipe.get_id(debug=debug) is None:
1517        return {}
1518
1519    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']
1520
1521    try:
1522        q = sqlalchemy.select(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id)
1523        if debug:
1524            dprint(q)
1525        rows = (
1526            self.exec(q, silent=True, debug=debug).mappings().all()
1527            if self.flavor != 'duckdb'
1528            else self.read(q, debug=debug).to_dict(orient='records')
1529        )
1530        if not rows:
1531            return {}
1532        attributes = dict(rows[0])
1533    except Exception:
1534        warn(traceback.format_exc())
1535        return {}
1536
1537    ### handle non-PostgreSQL databases (text vs JSON)
1538    if not isinstance(attributes.get('parameters', None), dict):
1539        try:
1540            import json
1541            parameters = json.loads(attributes['parameters'])
1542            if isinstance(parameters, str) and parameters[0] == '{':
1543                parameters = json.loads(parameters)
1544            attributes['parameters'] = parameters
1545        except Exception:
1546            attributes['parameters'] = {}
1547
1548    return attributes

Get a Pipe's attributes dictionary.
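
The returned dictionary mirrors the pipe's row in the pipes table; a brief sketch (continuing the example above):

>>> attrs = conn.get_pipe_attributes(pipe)
>>> sorted(attrs)  # e.g., for a registered pipe
['connector_keys', 'location_key', 'metric_key', 'parameters', 'pipe_id']
>>> conn.get_pipe_attributes(mrsm.Pipe('demo', 'unregistered', instance='sql:main'))
{}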

def sync_pipe( self, pipe: meerschaum.Pipe, df: 'Union[pd.DataFrame, str, Dict[Any, Any], None]' = None, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, chunksize: Optional[int] = -1, check_existing: bool = True, blocking: bool = True, debug: bool = False, _check_temporary_tables: bool = True, **kw: Any) -> Tuple[bool, str]:
1634def sync_pipe(
1635    self,
1636    pipe: mrsm.Pipe,
1637    df: Union[pd.DataFrame, str, Dict[Any, Any], None] = None,
1638    begin: Union[datetime, int, None] = None,
1639    end: Union[datetime, int, None] = None,
1640    chunksize: Optional[int] = -1,
1641    check_existing: bool = True,
1642    blocking: bool = True,
1643    debug: bool = False,
1644    _check_temporary_tables: bool = True,
1645    **kw: Any
1646) -> SuccessTuple:
1647    """
1648    Sync a pipe using a database connection.
1649
1650    Parameters
1651    ----------
1652    pipe: mrsm.Pipe
1653        The Meerschaum Pipe instance into which to sync the data.
1654
1655    df: Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]]
1656        An optional DataFrame or equivalent to sync into the pipe.
1657        Defaults to `None`.
1658
1659    begin: Union[datetime, int, None], default None
1660        Optionally specify the earliest datetime to search for data.
1661        Defaults to `None`.
1662
1663    end: Union[datetime, int, None], default None
1664        Optionally specify the latest datetime to search for data.
1665        Defaults to `None`.
1666
1667    chunksize: Optional[int], default -1
1668        Specify the number of rows to sync per chunk.
1669        If `-1`, resort to system configuration (default is `900`).
1670        A `chunksize` of `None` will sync all rows in one transaction.
1671        Defaults to `-1`.
1672
1673    check_existing: bool, default True
1674        If `True`, pull and diff with existing data from the pipe. Defaults to `True`.
1675
1676    blocking: bool, default True
 1677        If `True`, wait for sync to finish and return its result, otherwise asynchronously sync.
1678        Defaults to `True`.
1679
1680    debug: bool, default False
1681        Verbosity toggle. Defaults to False.
1682
1683    kw: Any
1684        Catch-all for keyword arguments.
1685
1686    Returns
1687    -------
1688    A `SuccessTuple` of success (`bool`) and message (`str`).
1689    """
1690    from meerschaum.utils.packages import import_pandas
1691    from meerschaum.utils.sql import (
1692        get_update_queries,
1693        sql_item_name,
1694        UPDATE_QUERIES,
1695        get_reset_autoincrement_queries,
1696    )
1697    from meerschaum.utils.dtypes import get_current_timestamp
1698    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
1699    from meerschaum.utils.dataframe import get_special_cols
1700    from meerschaum import Pipe
1701    import time
1702    import copy
1703    pd = import_pandas()
1704    if df is None:
1705        msg = f"DataFrame is None. Cannot sync {pipe}."
1706        warn(msg)
1707        return False, msg
1708
1709    start = time.perf_counter()
1710    pipe_name = sql_item_name(pipe.target, self.flavor, schema=self.get_pipe_schema(pipe))
1711    dtypes = pipe.get_dtypes(debug=debug)
1712
1713    if not pipe.temporary and not pipe.get_id(debug=debug):
1714        register_tuple = pipe.register(debug=debug)
1715        if not register_tuple[0]:
1716            return register_tuple
1717
1718    ### df is the dataframe returned from the remote source
1719    ### via the connector
1720    if debug:
1721        dprint("Fetched data:\n" + str(df))
1722
1723    if not isinstance(df, pd.DataFrame):
1724        df = pipe.enforce_dtypes(
1725            df,
1726            chunksize=chunksize,
1727            safe_copy=kw.get('safe_copy', False),
1728            dtypes=dtypes,
1729            debug=debug,
1730        )
1731
1732    ### if table does not exist, create it with indices
1733    is_new = False
1734    if not pipe.exists(debug=debug):
1735        check_existing = False
1736        is_new = True
1737    else:
1738        ### Check for new columns.
1739        add_cols_queries = self.get_add_columns_queries(pipe, df, debug=debug)
1740        if add_cols_queries:
1741            pipe._clear_cache_key('_columns_types', debug=debug)
1742            pipe._clear_cache_key('_columns_indices', debug=debug)
1743            if not self.exec_queries(add_cols_queries, debug=debug):
1744                warn(f"Failed to add new columns to {pipe}.")
1745
1746        alter_cols_queries = self.get_alter_columns_queries(pipe, df, debug=debug)
1747        if alter_cols_queries:
1748            pipe._clear_cache_key('_columns_types', debug=debug)
1750            if not self.exec_queries(alter_cols_queries, debug=debug):
1751                warn(f"Failed to alter columns for {pipe}.")
1752
1753    upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in UPDATE_QUERIES
1754    if upsert:
1755        check_existing = False
1756    kw['safe_copy'] = kw.get('safe_copy', False)
1757
1758    unseen_df, update_df, delta_df = (
1759        pipe.filter_existing(
1760            df,
1761            chunksize=chunksize,
1762            debug=debug,
1763            **kw
1764        ) if check_existing else (df, None, df)
1765    )
1766    if upsert:
1767        unseen_df, update_df, delta_df = (df.head(0), df, df)
1768
1769    if debug:
1770        dprint("Delta data:\n" + str(delta_df))
1771        dprint("Unseen data:\n" + str(unseen_df))
1772        if update_df is not None:
1773            dprint(("Update" if not upsert else "Upsert") + " data:\n" + str(update_df))
1774
1775    if_exists = kw.get('if_exists', 'append')
1776    if 'if_exists' in kw:
1777        kw.pop('if_exists')
1778    if 'name' in kw:
1779        kw.pop('name')
1780
1781    ### Insert new data into the target table.
1782    unseen_kw = copy.deepcopy(kw)
1783    unseen_kw.update({
1784        'name': pipe.target,
1785        'if_exists': if_exists,
1786        'debug': debug,
1787        'as_dict': True,
1788        'safe_copy': kw.get('safe_copy', False),
1789        'chunksize': chunksize,
1790        'dtype': self.get_to_sql_dtype(pipe, unseen_df, update_dtypes=True),
1791        'schema': self.get_pipe_schema(pipe),
1792    })
1793
1794    dt_col = pipe.columns.get('datetime', None)
1795    primary_key = pipe.columns.get('primary', None)
1796    autoincrement = (
1797        pipe.parameters.get('autoincrement', False)
1798        or (
1799            is_new
1800            and primary_key
 1801            and primary_key not in dtypes
1803            and primary_key not in unseen_df.columns
1804        )
1805    )
 1806    if autoincrement and 'autoincrement' not in pipe.parameters:
1807        update_success, update_msg = pipe.update_parameters(
1808            {'autoincrement': autoincrement},
1809            debug=debug,
1810        )
1811        if not update_success:
1812            return update_success, update_msg
1813
1814    def _check_pk(_df_to_clear):
1815        if _df_to_clear is None:
1816            return
1817        if primary_key not in _df_to_clear.columns:
1818            return
1819        if not _df_to_clear[primary_key].notnull().any():
1820            del _df_to_clear[primary_key]
1821
1822    autoincrement_needs_reset = bool(
1823        autoincrement
1824        and primary_key
1825        and primary_key in unseen_df.columns
1826        and unseen_df[primary_key].notnull().any()
1827    )
1828    if autoincrement and primary_key:
1829        for _df_to_clear in (unseen_df, update_df, delta_df):
1830            _check_pk(_df_to_clear)
1831
1832    if is_new:
1833        create_success, create_msg = self.create_pipe_table_from_df(
1834            pipe,
1835            unseen_df,
1836            debug=debug,
1837        )
1838        if not create_success:
1839            return create_success, create_msg
1840
1841    do_identity_insert = bool(
1842        self.flavor in ('mssql',)
1843        and primary_key
1844        and primary_key in unseen_df.columns
1845        and autoincrement
1846    )
1847    stats = {'success': True, 'msg': ''}
1848    if len(unseen_df) > 0:
1849        with self.engine.connect() as connection:
1850            with connection.begin():
1851                if do_identity_insert:
1852                    identity_on_result = self.exec(
1853                        f"SET IDENTITY_INSERT {pipe_name} ON",
1854                        commit=False,
1855                        _connection=connection,
1856                        close=False,
1857                        debug=debug,
1858                    )
1859                    if identity_on_result is None:
1860                        return False, f"Could not enable identity inserts on {pipe}."
1861
1862                stats = self.to_sql(
1863                    unseen_df,
1864                    _connection=connection,
1865                    **unseen_kw
1866                )
1867
1868                if do_identity_insert:
1869                    identity_off_result = self.exec(
1870                        f"SET IDENTITY_INSERT {pipe_name} OFF",
1871                        commit=False,
1872                        _connection=connection,
1873                        close=False,
1874                        debug=debug,
1875                    )
1876                    if identity_off_result is None:
1877                        return False, f"Could not disable identity inserts on {pipe}."
1878
1879    if is_new:
1880        if not self.create_indices(pipe, debug=debug):
1881            warn(f"Failed to create indices for {pipe}. Continuing...")
1882
1883    if autoincrement_needs_reset:
1884        reset_autoincrement_queries = get_reset_autoincrement_queries(
1885            pipe.target,
1886            primary_key,
1887            self,
1888            schema=self.get_pipe_schema(pipe),
1889            debug=debug,
1890        )
1891        results = self.exec_queries(reset_autoincrement_queries, debug=debug)
1892        for result in results:
1893            if result is None:
1894                warn(f"Could not reset auto-incrementing primary key for {pipe}.", stack=False)
1895
1896    if update_df is not None and len(update_df) > 0:
1897        temp_target = self.get_temporary_target(
1898            pipe.target,
1899            label=('update' if not upsert else 'upsert'),
1900        )
1901        self._log_temporary_tables_creation(temp_target, create=(not pipe.temporary), debug=debug)
1902        update_dtypes = {
1903            **{
1904                col: str(typ)
1905                for col, typ in update_df.dtypes.items()
1906            },
1907            **get_special_cols(update_df)
1908        }
1909
1910        temp_pipe = Pipe(
1911            pipe.connector_keys.replace(':', '_') + '_', pipe.metric_key, pipe.location_key,
1912            instance=pipe.instance_keys,
1913            columns={
1914                (ix_key if ix_key != 'primary' else 'primary_'): ix
1915                for ix_key, ix in pipe.columns.items()
1916                if ix and ix in update_df.columns
1917            },
1918            dtypes=update_dtypes,
1919            target=temp_target,
1920            temporary=True,
1921            enforce=False,
1922            static=True,
1923            autoincrement=False,
1924            cache=False,
1925            parameters={
1926                'schema': self.internal_schema,
1927                'hypertable': False,
1928            },
1929        )
1930        _temp_columns_types = {
1931            col: get_db_type_from_pd_type(typ, self.flavor)
1932            for col, typ in update_dtypes.items()
1933        }
1934        temp_pipe._cache_value('_columns_types', _temp_columns_types, memory_only=True, debug=debug)
1935        temp_pipe._cache_value('_skip_check_indices', True, memory_only=True, debug=debug)
1936        now_ts = get_current_timestamp('ms', as_int=True) / 1000
1937        temp_pipe._cache_value('_columns_types_timestamp', now_ts, memory_only=True, debug=debug)
1938        temp_success, temp_msg = temp_pipe.sync(update_df, check_existing=False, debug=debug)
1939        if not temp_success:
1940            return temp_success, temp_msg
1941
1942        existing_cols = pipe.get_columns_types(debug=debug)
1943        join_cols = [
1944            col
1945            for col_key, col in pipe.columns.items()
1946            if col and col in existing_cols
1947        ] if not primary_key or self.flavor == 'oracle' else (
1948            [dt_col, primary_key]
1949            if (
1950                self.flavor in ('timescaledb', 'timescaledb-ha')
1951                and dt_col
1952                and dt_col in update_df.columns
1953            )
1954            else [primary_key]
1955        )
1956        update_queries = get_update_queries(
1957            pipe.target,
1958            temp_target,
1959            self,
1960            join_cols,
1961            upsert=upsert,
1962            schema=self.get_pipe_schema(pipe),
1963            patch_schema=self.internal_schema,
1964            target_cols_types=pipe.get_columns_types(debug=debug),
1965            patch_cols_types=_temp_columns_types,
1966            datetime_col=(dt_col if dt_col in update_df.columns else None),
1967            identity_insert=(autoincrement and primary_key in update_df.columns),
1968            null_indices=pipe.null_indices,
1969            cast_columns=pipe.enforce,
1970            debug=debug,
1971        )
1972        update_results = self.exec_queries(
1973            update_queries,
1974            break_on_error=True,
1975            rollback=True,
1976            debug=debug,
1977        )
1978        update_success = all(update_results)
1979        self._log_temporary_tables_creation(
1980            temp_target,
1981            ready_to_drop=True,
1982            create=(not pipe.temporary),
1983            debug=debug,
1984        )
1985        if not update_success:
1986            warn(f"Failed to apply update to {pipe}.")
1987        stats['success'] = stats['success'] and update_success
1988        stats['msg'] = (
1989            (stats.get('msg', '') + f'\nFailed to apply update to {pipe}.').lstrip()
1990            if not update_success
1991            else stats.get('msg', '')
1992        )
1993
1994    stop = time.perf_counter()
1995    success = stats['success']
1996    if not success:
1997        return success, stats['msg'] or str(stats)
1998
1999    unseen_count = len(unseen_df.index) if unseen_df is not None else 0
2000    update_count = len(update_df.index) if update_df is not None else 0
2001    msg = (
2002        (
2003            f"Inserted {unseen_count:,}, "
2004            + f"updated {update_count:,} rows."
2005        )
2006        if not upsert
2007        else (
2008            f"Upserted {update_count:,} row"
2009            + ('s' if update_count != 1 else '')
2010            + "."
2011        )
2012    )
2013    if debug:
2014        msg = msg[:-1] + (
2015            f"\non table {sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))}\n"
2016            + f"in {round(stop - start, 2)} seconds."
2017        )
2018
2019    if _check_temporary_tables:
2020        drop_stale_success, drop_stale_msg = self._drop_old_temporary_tables(
2021            refresh=False, debug=debug
2022        )
2023        if not drop_stale_success:
2024            warn(drop_stale_msg)
2025
2026    return success, msg

Sync a pipe using a database connection.

Parameters
  • pipe (mrsm.Pipe): The Meerschaum Pipe instance into which to sync the data.
  • df (Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]]): An optional DataFrame or equivalent to sync into the pipe. Defaults to None.
  • begin (Union[datetime, int, None], default None): Optionally specify the earliest datetime to search for data. Defaults to None.
  • end (Union[datetime, int, None], default None): Optionally specify the latest datetime to search for data. Defaults to None.
  • chunksize (Optional[int], default -1): Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction. Defaults to -1.
  • check_existing (bool, default True): If True, pull and diff with existing data from the pipe. Defaults to True.
  • blocking (bool, default True): If True, wait for sync to finish and return its result, otherwise asynchronously sync. Defaults to True.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
  • kw (Any): Catch-all for keyword arguments.
Returns
  • A SuccessTuple of success (bool) and message (str).
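
A minimal end-to-end sketch (hypothetical keys and columns; assumes `pandas` is installed and the pipe's datetime column is `ts`):

>>> import pandas as pd
>>> df = pd.DataFrame({
...     'ts': pd.to_datetime(['2024-01-01', '2024-01-02']),
...     'value': [1.0, 2.0],
... })
>>> success, msg = conn.sync_pipe(pipe, df)
>>> print(msg)
Inserted 2, updated 0 rows.
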
def sync_pipe_inplace( self, pipe: meerschaum.Pipe, params: Optional[Dict[str, Any]] = None, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, chunksize: Optional[int] = -1, check_existing: bool = True, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
2029def sync_pipe_inplace(
2030    self,
2031    pipe: 'mrsm.Pipe',
2032    params: Optional[Dict[str, Any]] = None,
2033    begin: Union[datetime, int, None] = None,
2034    end: Union[datetime, int, None] = None,
2035    chunksize: Optional[int] = -1,
2036    check_existing: bool = True,
2037    debug: bool = False,
2038    **kw: Any
2039) -> SuccessTuple:
2040    """
2041    If a pipe's connector is the same as its instance connector,
2042    it's more efficient to sync the pipe in-place rather than reading data into Pandas.
2043
2044    Parameters
2045    ----------
2046    pipe: mrsm.Pipe
2047        The pipe whose connector is the same as its instance.
2048
2049    params: Optional[Dict[str, Any]], default None
2050        Optional params dictionary to build the `WHERE` clause.
2051        See `meerschaum.utils.sql.build_where`.
2052
2053    begin: Union[datetime, int, None], default None
2054        Optionally specify the earliest datetime to search for data.
2055        Defaults to `None`.
2056
2057    end: Union[datetime, int, None], default None
2058        Optionally specify the latest datetime to search for data.
2059        Defaults to `None`.
2060
2061    chunksize: Optional[int], default -1
2062        Specify the number of rows to sync per chunk.
2063        If `-1`, resort to system configuration (default is `900`).
2064        A `chunksize` of `None` will sync all rows in one transaction.
2065        Defaults to `-1`.
2066
2067    check_existing: bool, default True
2068        If `True`, pull and diff with existing data from the pipe.
2069
2070    debug: bool, default False
2071        Verbosity toggle.
2072
2073    Returns
2074    -------
2075    A SuccessTuple.
2076    """
2077    if self.flavor == 'duckdb':
2078        return pipe.sync(
2079            params=params,
2080            begin=begin,
2081            end=end,
2082            chunksize=chunksize,
2083            check_existing=check_existing,
2084            debug=debug,
2085            _inplace=False,
2086            **kw
2087        )
2088    from meerschaum.utils.sql import (
2089        sql_item_name,
2090        get_update_queries,
2091        get_null_replacement,
2092        get_create_table_queries,
2093        get_create_schema_if_not_exists_queries,
2094        get_table_cols_types,
2095        session_execute,
2096        dateadd_str,
2097        UPDATE_QUERIES,
2098    )
2099    from meerschaum.utils.dtypes.sql import (
2100        get_pd_type_from_db_type,
2101        get_db_type_from_pd_type,
2102    )
2103    from meerschaum.utils.misc import generate_password
2104
2105    transaction_id_length = (
2106        mrsm.get_config(
2107            'system', 'connectors', 'sql', 'instance', 'temporary_target', 'transaction_id_length'
2108        )
2109    )
2110    transact_id = generate_password(transaction_id_length)
2111
2112    internal_schema = self.internal_schema
2113    target = pipe.target
2114    temp_table_roots = ['backtrack', 'new', 'delta', 'joined', 'unseen', 'update']
2115    temp_tables = {
2116        table_root: self.get_temporary_target(target, transact_id=transact_id, label=table_root)
2117        for table_root in temp_table_roots
2118    }
2119    temp_table_names = {
2120        table_root: sql_item_name(table_name_raw, self.flavor, internal_schema)
2121        for table_root, table_name_raw in temp_tables.items()
2122    }
2123    temp_table_aliases = {
2124        table_root: sql_item_name(table_root, self.flavor)
2125        for table_root in temp_table_roots
2126    }
2127    table_alias_as = " AS" if self.flavor != 'oracle' else ''
2128    metadef = self.get_pipe_metadef(
2129        pipe,
2130        params=params,
2131        begin=begin,
2132        end=end,
2133        check_existing=check_existing,
2134        debug=debug,
2135    )
2136    pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
2137    upsert = pipe.parameters.get('upsert', False) and f'{self.flavor}-upsert' in UPDATE_QUERIES
2138    static = pipe.parameters.get('static', False)
2139    database = getattr(self, 'database', self.parse_uri(self.URI).get('database', None))
2140    primary_key = pipe.columns.get('primary', None)
2141    primary_key_typ = pipe.dtypes.get(primary_key, None) if primary_key else None
2142    primary_key_db_type = (
2143        get_db_type_from_pd_type(primary_key_typ, self.flavor)
2144        if primary_key_typ
2145        else None
2146    )
2147    autoincrement = pipe.parameters.get('autoincrement', False)
2148    dt_col = pipe.columns.get('datetime', None)
2149    dt_col_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None
2150    dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None
2151    dt_db_type = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None
2152
2153    def clean_up_temp_tables(ready_to_drop: bool = False):
2154        log_success, log_msg = self._log_temporary_tables_creation(
2155            [
2156                table
2157                for table in temp_tables.values()
2158            ] if not upsert else [temp_tables['update']],
2159            ready_to_drop=ready_to_drop,
2160            create=(not pipe.temporary),
2161            debug=debug,
2162        )
2163        if not log_success:
2164            warn(log_msg)
2165        drop_stale_success, drop_stale_msg = self._drop_old_temporary_tables(
2166            refresh=False,
2167            debug=debug,
2168        )
2169        if not drop_stale_success:
2170            warn(drop_stale_msg)
2171        return drop_stale_success, drop_stale_msg
2172
2173    sqlalchemy, sqlalchemy_orm = mrsm.attempt_import(
2174        'sqlalchemy',
2175        'sqlalchemy.orm',
2176    )
2177    if not pipe.exists(debug=debug):
2178        schema = self.get_pipe_schema(pipe)
2179        create_pipe_queries = get_create_table_queries(
2180            metadef,
2181            pipe.target,
2182            self.flavor,
2183            schema=schema,
2184            primary_key=primary_key,
2185            primary_key_db_type=primary_key_db_type,
2186            autoincrement=autoincrement,
2187            datetime_column=dt_col,
2188        )
2189        if schema:
2190            create_pipe_queries = (
2191                get_create_schema_if_not_exists_queries(schema, self.flavor)
2192                + create_pipe_queries
2193            )
2194
2195        results = self.exec_queries(create_pipe_queries, debug=debug)
2196        if not all(results):
2197            _ = clean_up_temp_tables()
2198            return False, f"Could not insert new data into {pipe} from its SQL query definition."
2199
2200        if not self.create_indices(pipe, debug=debug):
2201            warn(f"Failed to create indices for {pipe}. Continuing...")
2202
2203        rowcount = pipe.get_rowcount(debug=debug)
2204        _ = clean_up_temp_tables()
2205        return True, f"Inserted {rowcount:,}, updated 0 rows."
2206
2207    session = sqlalchemy_orm.Session(self.engine)
2208    connectable = session if self.flavor != 'duckdb' else self
2209
2210    create_new_query = get_create_table_queries(
2211        metadef,
2212        temp_tables[('new') if not upsert else 'update'],
2213        self.flavor,
2214        schema=internal_schema,
2215    )[0]
2216    (create_new_success, create_new_msg), create_new_results = session_execute(
2217        session,
2218        create_new_query,
2219        with_results=True,
2220        debug=debug,
2221    )
2222    if not create_new_success:
2223        _ = clean_up_temp_tables()
2224        return create_new_success, create_new_msg
2225    new_count = create_new_results[0].rowcount if create_new_results else 0
2226
2227    new_cols_types = get_table_cols_types(
2228        temp_tables[('new' if not upsert else 'update')],
2229        connectable=connectable,
2230        flavor=self.flavor,
2231        schema=internal_schema,
2232        database=database,
2233        debug=debug,
2234    ) if not static else pipe.get_columns_types(debug=debug)
2235    if not new_cols_types:
2236        return False, f"Failed to get new columns for {pipe}."
2237
2238    new_cols = {
2239        str(col_name): get_pd_type_from_db_type(str(col_type))
2240        for col_name, col_type in new_cols_types.items()
2241    }
2242    new_cols_str = '\n    ' + ',\n    '.join([
2243        sql_item_name(col, self.flavor)
2244        for col in new_cols
2245    ])
2246    def get_col_typ(col: str, cols_types: Dict[str, str]) -> str:
2247        if self.flavor == 'oracle' and new_cols_types.get(col, '').lower() == 'char':
2248            return new_cols_types[col]
2249        return cols_types[col]
2250
2251    add_cols_queries = self.get_add_columns_queries(pipe, new_cols, debug=debug)
2252    if add_cols_queries:
2253        pipe._clear_cache_key('_columns_types', debug=debug)
2254        pipe._clear_cache_key('_columns_indices', debug=debug)
2255        self.exec_queries(add_cols_queries, debug=debug)
2256
2257    alter_cols_queries = self.get_alter_columns_queries(pipe, new_cols, debug=debug)
2258    if alter_cols_queries:
2259        pipe._clear_cache_key('_columns_types', debug=debug)
2260        self.exec_queries(alter_cols_queries, debug=debug)
2261
2262    insert_queries = [
2263        (
2264            f"INSERT INTO {pipe_name} ({new_cols_str})\n"
2265            f"SELECT {new_cols_str}\nFROM {temp_table_names['new']}{table_alias_as}"
2266            f" {temp_table_aliases['new']}"
2267        )
2268    ] if not check_existing and not upsert else []
2269
2270    new_queries = insert_queries
2271    new_success, new_msg = (
2272        session_execute(session, new_queries, debug=debug)
2273        if new_queries
2274        else (True, "Success")
2275    )
2276    if not new_success:
2277        _ = clean_up_temp_tables()
2278        return new_success, new_msg
2279
2280    if not check_existing:
2281        session.commit()
2282        _ = clean_up_temp_tables()
2283        return True, f"Inserted {new_count}, updated 0 rows."
2284
2285    min_dt_col_name_da = dateadd_str(
2286        flavor=self.flavor, begin=f"MIN({dt_col_name})", db_type=dt_db_type,
2287    )
2288    max_dt_col_name_da = dateadd_str(
2289        flavor=self.flavor, begin=f"MAX({dt_col_name})", db_type=dt_db_type,
2290    )
2291
2292    (new_dt_bounds_success, new_dt_bounds_msg), new_dt_bounds_results = session_execute(
2293        session,
2294        [
2295            "SELECT\n"
2296            f"    {min_dt_col_name_da} AS {sql_item_name('min_dt', self.flavor)},\n"
2297            f"    {max_dt_col_name_da} AS {sql_item_name('max_dt', self.flavor)}\n"
2298            f"FROM {temp_table_names['new' if not upsert else 'update']}\n"
2299            f"WHERE {dt_col_name} IS NOT NULL"
2300        ],
2301        with_results=True,
2302        debug=debug,
2303    ) if dt_col and not upsert else ((True, "Success"), None)
2304    if not new_dt_bounds_success:
2305        return (
2306            new_dt_bounds_success,
2307            f"Could not determine in-place datetime bounds:\n{new_dt_bounds_msg}"
2308        )
2309
2310    if dt_col and not upsert:
2311        begin, end = new_dt_bounds_results[0].fetchone()
2312
2313    backtrack_def = self.get_pipe_data_query(
2314        pipe,
2315        begin=begin,
2316        end=end,
2317        begin_add_minutes=0,
2318        end_add_minutes=1,
2319        params=params,
2320        debug=debug,
2321        order=None,
2322    )
2323    create_backtrack_query = get_create_table_queries(
2324        backtrack_def,
2325        temp_tables['backtrack'],
2326        self.flavor,
2327        schema=internal_schema,
2328    )[0]
2329    (create_backtrack_success, create_backtrack_msg), create_backtrack_results = session_execute(
2330        session,
2331        create_backtrack_query,
2332        with_results=True,
2333        debug=debug,
2334    ) if not upsert else ((True, "Success"), None)
2335
2336    if not create_backtrack_success:
2337        _ = clean_up_temp_tables()
2338        return create_backtrack_success, create_backtrack_msg
2339
2340    backtrack_cols_types = get_table_cols_types(
2341        temp_tables['backtrack'],
2342        connectable=connectable,
2343        flavor=self.flavor,
2344        schema=internal_schema,
2345        database=database,
2346        debug=debug,
2347    ) if not (upsert or static) else new_cols_types
2348
2349    common_cols = [col for col in new_cols if col in backtrack_cols_types]
2350    primary_key = pipe.columns.get('primary', None)
2351    on_cols = {
2352        col: new_cols.get(col)
2353        for col_key, col in pipe.columns.items()
2354        if (
2355            col
2356            and
2357            col_key != 'value'
2358            and col in backtrack_cols_types
2359            and col in new_cols
2360        )
2361    } if not primary_key or self.flavor == 'oracle' else {primary_key: new_cols.get(primary_key)}
2362
2363    null_replace_new_cols_str = (
2364        '\n    ' + ',\n    '.join([
2365            f"COALESCE({temp_table_aliases['new']}.{sql_item_name(col, self.flavor)}, "
2366            + get_null_replacement(get_col_typ(col, new_cols_types), self.flavor)
2367            + ") AS "
2368            + sql_item_name(col, self.flavor, None)
2369            for col, typ in new_cols.items()
2370        ])
2371    )
2372
2373    select_delta_query = (
2374        "SELECT"
2375        + null_replace_new_cols_str
2376        + f"\nFROM {temp_table_names['new']}{table_alias_as} {temp_table_aliases['new']}\n"
2377        + f"LEFT OUTER JOIN {temp_table_names['backtrack']}{table_alias_as} {temp_table_aliases['backtrack']}"
2378        + "\n    ON\n    "
2379        + '\n    AND\n    '.join([
2380            (
2381                f"    COALESCE({temp_table_aliases['new']}."
2382                + sql_item_name(c, self.flavor, None)
2383                + ", "
2384                + get_null_replacement(get_col_typ(c, new_cols_types), self.flavor)
2385                + ")"
2386                + '\n        =\n    '
2387                + f"    COALESCE({temp_table_aliases['backtrack']}."
2388                + sql_item_name(c, self.flavor, None)
2389                + ", "
2390                + get_null_replacement(get_col_typ(c, backtrack_cols_types), self.flavor)
2391                + ") "
2392            ) for c in common_cols
2393        ])
2394        + "\nWHERE\n    "
2395        + '\n    AND\n    '.join([
2396            (
2397                f"{temp_table_aliases['backtrack']}." + sql_item_name(c, self.flavor) + ' IS NULL'
2398            ) for c in common_cols
2399        ])
2400    )
2401    create_delta_query = get_create_table_queries(
2402        select_delta_query,
2403        temp_tables['delta'],
2404        self.flavor,
2405        schema=internal_schema,
2406    )[0]
2407    create_delta_success, create_delta_msg = session_execute(
2408        session,
2409        create_delta_query,
2410        debug=debug,
2411    ) if not upsert else (True, "Success")
2412    if not create_delta_success:
2413        _ = clean_up_temp_tables()
2414        return create_delta_success, create_delta_msg
2415
2416    delta_cols_types = get_table_cols_types(
2417        temp_tables['delta'],
2418        connectable=connectable,
2419        flavor=self.flavor,
2420        schema=internal_schema,
2421        database=database,
2422        debug=debug,
2423    ) if not (upsert or static) else new_cols_types
2424
2425    ### This is a weird bug on SQLite.
2426    ### Sometimes the backtrack dtypes are all empty strings.
2427    if not all(delta_cols_types.values()):
2428        delta_cols_types = new_cols_types
2429
2430    delta_cols = {
2431        col: get_pd_type_from_db_type(typ)
2432        for col, typ in delta_cols_types.items()
2433    }
2434    delta_cols_str = ', '.join([
2435        sql_item_name(col, self.flavor)
2436        for col in delta_cols
2437    ])
2438
2439    select_joined_query = (
2440        "SELECT\n    "
2441        + (',\n    '.join([
2442            (
2443                f"{temp_table_aliases['delta']}." + sql_item_name(c, self.flavor, None)
2444                + " AS " + sql_item_name(c + '_delta', self.flavor, None)
2445            ) for c in delta_cols
2446        ]))
2447        + ",\n    "
2448        + (',\n    '.join([
2449            (
2450                f"{temp_table_aliases['backtrack']}." + sql_item_name(c, self.flavor, None)
2451                + " AS " + sql_item_name(c + '_backtrack', self.flavor, None)
2452            ) for c in backtrack_cols_types
2453        ]))
2454        + f"\nFROM {temp_table_names['delta']}{table_alias_as} {temp_table_aliases['delta']}\n"
2455        + f"LEFT OUTER JOIN {temp_table_names['backtrack']}{table_alias_as}"
2456        + f" {temp_table_aliases['backtrack']}"
2457        + "\n    ON\n    "
2458        + '\n    AND\n    '.join([
2459            (
2460                f"    COALESCE({temp_table_aliases['delta']}." + sql_item_name(c, self.flavor)
2461                + ", "
2462                + get_null_replacement(get_col_typ(c, new_cols_types), self.flavor) + ")"
2463                + '\n        =\n    '
2464                + f"    COALESCE({temp_table_aliases['backtrack']}." + sql_item_name(c, self.flavor)
2465                + ", "
2466                + get_null_replacement(get_col_typ(c, new_cols_types), self.flavor) + ")"
2467            ) for c, typ in on_cols.items()
2468        ])
2469    )
2470
2471    create_joined_query = get_create_table_queries(
2472        select_joined_query,
2473        temp_tables['joined'],
2474        self.flavor,
2475        schema=internal_schema,
2476    )[0]
2477    create_joined_success, create_joined_msg = session_execute(
2478        session,
2479        create_joined_query,
2480        debug=debug,
2481    ) if on_cols and not upsert else (True, "Success")
2482    if not create_joined_success:
2483        _ = clean_up_temp_tables()
2484        return create_joined_success, create_joined_msg
2485
2486    select_unseen_query = (
2487        "SELECT\n    "
2488        + (',\n    '.join([
2489            (
2490                "CASE\n        WHEN " + sql_item_name(c + '_delta', self.flavor, None)
2491                + " != " + get_null_replacement(get_col_typ(c, delta_cols_types), self.flavor)
2492                + " THEN " + sql_item_name(c + '_delta', self.flavor, None)
2493                + "\n        ELSE NULL\n    END"
2494                + " AS " + sql_item_name(c, self.flavor, None)
2495            ) for c, typ in delta_cols.items()
2496        ]))
2497        + f"\nFROM {temp_table_names['joined']}{table_alias_as} {temp_table_aliases['joined']}\n"
2498        + "WHERE\n    "
2499        + '\n    AND\n    '.join([
2500            (
2501                sql_item_name(c + '_backtrack', self.flavor, None) + ' IS NULL'
2502            ) for c in delta_cols
2503        ])
2504    )
2505    create_unseen_query = get_create_table_queries(
2506        select_unseen_query,
2507        temp_tables['unseen'],
2508        self.flavor,
2509        internal_schema,
2510    )[0]
2511    (create_unseen_success, create_unseen_msg), create_unseen_results = session_execute(
2512        session,
2513        create_unseen_query,
2514        with_results=True,
2515        debug=debug
2516    ) if not upsert else ((True, "Success"), None)
2517    if not create_unseen_success:
2518        _ = clean_up_temp_tables()
2519        return create_unseen_success, create_unseen_msg
2520
2521    select_update_query = (
2522        "SELECT\n    "
2523        + (',\n    '.join([
2524            (
2525                "CASE\n        WHEN " + sql_item_name(c + '_delta', self.flavor, None)
2526                + " != " + get_null_replacement(get_col_typ(c, delta_cols_types), self.flavor)
2527                + " THEN " + sql_item_name(c + '_delta', self.flavor, None)
2528                + "\n        ELSE NULL\n    END"
2529                + " AS " + sql_item_name(c, self.flavor, None)
2530            ) for c, typ in delta_cols.items()
2531        ]))
2532        + f"\nFROM {temp_table_names['joined']}{table_alias_as} {temp_table_aliases['joined']}\n"
2533        + "WHERE\n    "
2534        + '\n    OR\n    '.join([
2535            (
2536                sql_item_name(c + '_backtrack', self.flavor, None) + ' IS NOT NULL'
2537            ) for c in delta_cols
2538        ])
2539    )
2540
2541    create_update_query = get_create_table_queries(
2542        select_update_query,
2543        temp_tables['update'],
2544        self.flavor,
2545        internal_schema,
2546    )[0]
2547    (create_update_success, create_update_msg), create_update_results = session_execute(
2548        session,
2549        create_update_query,
2550        with_results=True,
2551        debug=debug,
2552    ) if on_cols and not upsert else ((True, "Success"), [])
2553    apply_update_queries = (
2554        get_update_queries(
2555            pipe.target,
2556            temp_tables['update'],
2557            session,
2558            on_cols,
2559            upsert=upsert,
2560            schema=self.get_pipe_schema(pipe),
2561            patch_schema=internal_schema,
2562            target_cols_types=pipe.get_columns_types(debug=debug),
2563            patch_cols_types=delta_cols_types,
2564            datetime_col=pipe.columns.get('datetime', None),
2565            flavor=self.flavor,
2566            null_indices=pipe.null_indices,
2567            cast_columns=pipe.enforce,
2568            debug=debug,
2569        )
2570        if on_cols else []
2571    )
2572
2573    apply_unseen_queries = [
2574        (
2575            f"INSERT INTO {pipe_name} ({delta_cols_str})\n"
2576            + f"SELECT {delta_cols_str}\nFROM "
2577            + (
2578                temp_table_names['unseen']
2579                if on_cols
2580                else temp_table_names['delta']
2581            )
2582        ),
2583    ]
2584
2585    (apply_unseen_success, apply_unseen_msg), apply_unseen_results = session_execute(
2586        session,
2587        apply_unseen_queries,
2588        with_results=True,
2589        debug=debug,
2590    ) if not upsert else ((True, "Success"), None)
2591    if not apply_unseen_success:
2592        _ = clean_up_temp_tables()
2593        return apply_unseen_success, apply_unseen_msg
2594    unseen_count = apply_unseen_results[0].rowcount if apply_unseen_results else 0
2595
2596    (apply_update_success, apply_update_msg), apply_update_results = session_execute(
2597        session,
2598        apply_update_queries,
2599        with_results=True,
2600        debug=debug,
2601    )
2602    if not apply_update_success:
2603        _ = clean_up_temp_tables()
2604        return apply_update_success, apply_update_msg
2605    update_count = apply_update_results[0].rowcount if apply_update_results else 0
2606
2607    session.commit()
2608
2609    msg = (
2610        f"Inserted {unseen_count:,}, updated {update_count:,} rows."
2611        if not upsert
2612        else f"Upserted {update_count:,} row" + ('s' if update_count != 1 else '') + "."
2613    )
2614    _ = clean_up_temp_tables(ready_to_drop=True)
2615
2616    return True, msg

If a pipe's connector is the same as its instance connector, it's more efficient to sync the pipe in-place rather than reading data into Pandas.

Parameters
  • pipe (mrsm.Pipe): The pipe whose connector is the same as its instance.
  • params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause. See meerschaum.utils.sql.build_where.
  • begin (Union[datetime, int, None], default None): Optionally specify the earliest datetime to search for data. Defaults to None.
  • end (Union[datetime, int, None], default None): Optionally specify the latest datetime to search for data. Defaults to None.
  • chunksize (Optional[int], default -1): Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction. Defaults to -1.
  • check_existing (bool, default True): If True, pull and diff with existing data from the pipe.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple.
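
In-place syncing applies to pipes defined by a SQL query on the same instance; a hedged sketch (the `fetch:definition` query and `remote_table` are hypothetical):

>>> sql_pipe = mrsm.Pipe(
...     'sql:main', 'demo',
...     instance='sql:main',
...     columns={'datetime': 'ts'},
...     parameters={'fetch': {'definition': 'SELECT ts, value FROM remote_table'}},
... )
>>> success, msg = conn.sync_pipe_inplace(sql_pipe)
>>> print(msg)  # e.g. 'Inserted 100, updated 0 rows.'
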
def get_sync_time( self, pipe: meerschaum.Pipe, params: Optional[Dict[str, Any]] = None, newest: bool = True, remote: bool = False, debug: bool = False) -> Union[datetime.datetime, int, NoneType]:
2619def get_sync_time(
2620    self,
2621    pipe: 'mrsm.Pipe',
2622    params: Optional[Dict[str, Any]] = None,
2623    newest: bool = True,
2624    remote: bool = False,
2625    debug: bool = False,
2626) -> Union[datetime, int, None]:
2627    """Get a Pipe's most recent datetime value.
2628
2629    Parameters
2630    ----------
2631    pipe: mrsm.Pipe
2632        The pipe to get the sync time for.
2633
2634    params: Optional[Dict[str, Any]], default None
2635        Optional params dictionary to build the `WHERE` clause.
2636        See `meerschaum.utils.sql.build_where`.
2637
2638    newest: bool, default True
2639        If `True`, get the most recent datetime (honoring `params`).
2640        If `False`, get the oldest datetime (ASC instead of DESC).
2641
2642    remote: bool, default False
2643        If `True`, return the sync time for the remote fetch definition.
2644
2645    Returns
2646    -------
2647    A `datetime` object (or `int` if using an integer axis) if the pipe exists, otherwise `None`.
2648    """
2649    from meerschaum.utils.sql import sql_item_name, build_where, wrap_query_with_cte
2650    src_name = sql_item_name('src', self.flavor)
2651    table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
2652
2653    dt_col = pipe.columns.get('datetime', None)
2654    if dt_col is None:
2655        return None
2656    dt_col_name = sql_item_name(dt_col, self.flavor, None)
2657
2658    if remote and pipe.connector.type != 'sql':
2659        warn(f"Cannot get the remote sync time for {pipe}.")
2660        return None
2661
2662    ASC_or_DESC = "DESC" if newest else "ASC"
2663    existing_cols = pipe.get_columns_types(debug=debug)
2664    valid_params = {}
2665    if params is not None:
2666        valid_params = {k: v for k, v in params.items() if k in existing_cols}
2667    flavor = self.flavor if not remote else pipe.connector.flavor
2668
2669    ### If no bounds are provided for the datetime column,
2670    ### add IS NOT NULL to the WHERE clause.
2671    if dt_col not in valid_params:
2672        valid_params[dt_col] = '_None'
2673    where = "" if not valid_params else build_where(valid_params, self)
2674    src_query = (
2675        f"SELECT {dt_col_name}\nFROM {table_name}{where}"
2676        if not remote
2677        else self.get_pipe_metadef(pipe, params=params, begin=None, end=None)
2678    )
2679
2680    base_query = (
2681        f"SELECT {dt_col_name}\n"
2682        f"FROM {src_name}{where}\n"
2683        f"ORDER BY {dt_col_name} {ASC_or_DESC}\n"
2684        f"LIMIT 1"
2685    )
2686    if self.flavor == 'mssql':
2687        base_query = (
2688            f"SELECT TOP 1 {dt_col_name}\n"
2689            f"FROM {src_name}{where}\n"
2690            f"ORDER BY {dt_col_name} {ASC_or_DESC}"
2691        )
2692    elif self.flavor == 'oracle':
2693        base_query = (
2694            "SELECT * FROM (\n"
2695            f"    SELECT {dt_col_name}\n"
2696            f"    FROM {src_name}{where}\n"
2697            f"    ORDER BY {dt_col_name} {ASC_or_DESC}\n"
2698            ") WHERE ROWNUM = 1"
2699        )
2700
2701    query = wrap_query_with_cte(src_query, base_query, flavor)
2702
2703    try:
2704        db_time = self.value(query, silent=True, debug=debug)
2705
2706        ### No datetime could be found.
2707        if db_time is None:
2708            return None
2709        ### sqlite returns str.
2710        if isinstance(db_time, str):
2711            dateutil_parser = mrsm.attempt_import('dateutil.parser')
2712            st = dateutil_parser.parse(db_time)
2713        ### Do nothing if a datetime object is returned.
2714        elif isinstance(db_time, datetime):
2715            if hasattr(db_time, 'to_pydatetime'):
2716                st = db_time.to_pydatetime()
2717            else:
2718                st = db_time
2719        ### Sometimes the datetime is actually a date.
2720        elif isinstance(db_time, date):
2721            st = datetime.combine(db_time, datetime.min.time())
2722        ### Adding support for an integer datetime axis.
2723        elif 'int' in str(type(db_time)).lower():
2724            st = int(db_time)
2725        ### Convert pandas timestamp to Python datetime.
2726        else:
2727            st = db_time.to_pydatetime()
2728
2729        sync_time = st
2730
2731    except Exception as e:
2732        sync_time = None
2733        warn(str(e))
2734
2735    return sync_time

Get a Pipe's most recent datetime value.

Parameters
  • pipe (mrsm.Pipe): The pipe to get the sync time for.
  • params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause. See meerschaum.utils.sql.build_where.
  • newest (bool, default True): If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
  • remote (bool, default False): If True, return the sync time for the remote fetch definition.
Returns
  • A datetime object (or int if using an integer axis) if the pipe exists, otherwise None.
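
A brief sketch (continuing the data synced above; the `station` filter column is hypothetical):

>>> conn.get_sync_time(pipe)               # newest value in the datetime column
datetime.datetime(2024, 1, 2, 0, 0)
>>> conn.get_sync_time(pipe, newest=False) # oldest value instead
datetime.datetime(2024, 1, 1, 0, 0)
>>> conn.get_sync_time(pipe, params={'station': 'KATL'})  # honor a WHERE clause
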
def pipe_exists(self, pipe: meerschaum.Pipe, debug: bool = False) -> bool:
2738def pipe_exists(
2739    self,
2740    pipe: mrsm.Pipe,
2741    debug: bool = False
2742) -> bool:
2743    """
2744    Check that a Pipe's table exists.
2745
2746    Parameters
2747    ----------
 2748    pipe: mrsm.Pipe
2749        The pipe to check.
2750
2751    debug: bool, default False
2752        Verbosity toggle.
2753
2754    Returns
2755    -------
2756    A `bool` corresponding to whether a pipe's table exists.
2757
2758    """
2759    from meerschaum.utils.sql import table_exists
2760    exists = table_exists(
2761        pipe.target,
2762        self,
2763        schema=self.get_pipe_schema(pipe),
2764        debug=debug,
2765    )
2766    if debug:
2767        dprint(f"{pipe} " + ('exists.' if exists else 'does not exist.'))
2768    return exists

Check that a Pipe's table exists.

Parameters
  • pipe (mrsm.Pipe): The pipe to check.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool corresponding to whether a pipe's table exists.
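
A one-line check (continuing the sketch above):

>>> conn.pipe_exists(pipe)  # True once a sync has created the target table
True
>>> conn.pipe_exists(mrsm.Pipe('demo', 'never_synced', instance='sql:main'))
False
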
def get_pipe_rowcount( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) -> Optional[int]:
2771def get_pipe_rowcount(
2772    self,
2773    pipe: mrsm.Pipe,
2774    begin: Union[datetime, int, None] = None,
2775    end: Union[datetime, int, None] = None,
2776    params: Optional[Dict[str, Any]] = None,
2777    remote: bool = False,
2778    debug: bool = False
2779) -> Union[int, None]:
2780    """
2781    Get the rowcount for a pipe in accordance with given parameters.
2782
2783    Parameters
2784    ----------
2785    pipe: mrsm.Pipe
2786        The pipe to query with.
2787
2788    begin: Union[datetime, int, None], default None
2789        The begin datetime value.
2790
2791    end: Union[datetime, int, None], default None
2792        The end datetime value.
2793
2794    params: Optional[Dict[str, Any]], default None
2795        See `meerschaum.utils.sql.build_where`.
2796
2797    remote: bool, default False
2798        If `True`, get the rowcount for the remote table.
2799
2800    debug: bool, default False
2801        Verbosity toggle.
2802
2803    Returns
2804    -------
2805    An `int` for the number of rows if the `pipe` exists, otherwise `None`.
2806
2807    """
2808    from meerschaum.utils.sql import dateadd_str, sql_item_name, wrap_query_with_cte, build_where
2809    from meerschaum.connectors.sql._fetch import get_pipe_query
2810    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
2811    if remote:
2812        msg = f"'fetch:definition' must be an attribute of {pipe} to get a remote rowcount."
2813        if 'fetch' not in pipe.parameters:
2814            error(msg)
2815            return None
2816        if 'definition' not in pipe.parameters['fetch']:
2817            error(msg)
2818            return None
2819
2820    flavor = self.flavor if not remote else pipe.connector.flavor
2821    conn = self if not remote else pipe.connector
2822    _pipe_name = sql_item_name(pipe.target, flavor, self.get_pipe_schema(pipe))
2823    dt_col = pipe.columns.get('datetime', None)
2824    dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None
2825    dt_db_type = get_db_type_from_pd_type(dt_typ, flavor) if dt_typ else None
2826    if not dt_col:
2827        dt_col = pipe.guess_datetime()
2828        dt_name = sql_item_name(dt_col, flavor, None) if dt_col else None
2829        is_guess = True
2830    else:
2831        dt_col = pipe.get_columns('datetime')
2832        dt_name = sql_item_name(dt_col, flavor, None)
2833        is_guess = False
2834
2835    if begin is not None or end is not None:
2836        if is_guess:
2837            if dt_col is None:
2838                warn(
2839                    f"No datetime could be determined for {pipe}."
2840                    + "\n    Ignoring begin and end...",
2841                    stack=False,
2842                )
2843                begin, end = None, None
2844            else:
2845                warn(
2846                    f"A datetime wasn't specified for {pipe}.\n"
2847                    + f"    Using column \"{dt_col}\" for datetime bounds...",
2848                    stack=False,
2849                )
2850
2851
2852    _datetime_name = sql_item_name(dt_col, flavor)
2853    _cols_names = [
2854        sql_item_name(col, flavor)
2855        for col in set(
2856            (
2857                [dt_col]
2858                if dt_col
2859                else []
2860            ) + (
2861                []
2862                if params is None
2863                else list(params.keys())
2864            )
2865        )
2866    ]
2867    if not _cols_names:
2868        _cols_names = ['*']
2869
2870    src = (
2871        f"SELECT {', '.join(_cols_names)}\nFROM {_pipe_name}"
2872        if not remote
2873        else get_pipe_query(pipe)
2874    )
2875    parent_query = f"SELECT COUNT(*)\nFROM {sql_item_name('src', flavor)}"
2876    query = wrap_query_with_cte(src, parent_query, flavor)
2877    if begin is not None or end is not None:
2878        query += "\nWHERE"
2879    if begin is not None:
2880        query += (
2881            f"\n    {dt_name} >= "
2882            + dateadd_str(flavor, datepart='minute', number=0, begin=begin, db_type=dt_db_type)
2883        )
2884    if end is not None and begin is not None:
2885        query += "\n    AND"
2886    if end is not None:
2887        query += (
2888            f"\n    {dt_name} <  "
2889            + dateadd_str(flavor, datepart='minute', number=0, begin=end, db_type=dt_db_type)
2890        )
2891    if params is not None:
2892        existing_cols = pipe.get_columns_types(debug=debug)
2893        valid_params = {k: v for k, v in params.items() if k in existing_cols}
2894        if valid_params:
2895            query += build_where(valid_params, conn).replace('WHERE', (
2896                'AND' if (begin is not None or end is not None)
2897                    else 'WHERE'
2898                )
2899            )
2900
2901    result = conn.value(query, debug=debug, silent=True)
2902    try:
2903        return int(result)
2904    except Exception:
2905        return None

Get the rowcount for a pipe in accordance with given parameters.

Parameters
  • pipe (mrsm.Pipe): The pipe to query with.
  • begin (Union[datetime, int, None], default None): The begin datetime value.
  • end (Union[datetime, int, None], default None): The end datetime value.
  • params (Optional[Dict[str, Any]], default None): See meerschaum.utils.sql.build_where.
  • remote (bool, default False): If True, get the rowcount for the remote table.
  • debug (bool, default False): Verbosity toggle.
Returns
  • An int for the number of rows if the pipe exists, otherwise None.
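Example (a hedged sketch, reusing conn and pipe from the pipe_exists example above; the bounds and the returned count are illustrative):
>>> from datetime import datetime
>>> conn.get_pipe_rowcount(pipe, begin=datetime(2024, 1, 1), end=datetime(2024, 2, 1))
744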
def drop_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kw) -> Tuple[bool, str]:
2908def drop_pipe(
2909    self,
2910    pipe: mrsm.Pipe,
2911    debug: bool = False,
2912    **kw
2913) -> SuccessTuple:
2914    """
2915    Drop a pipe's tables but maintain its registration.
2916
2917    Parameters
2918    ----------
2919    pipe: mrsm.Pipe
2920        The pipe to drop.
2921
2922    Returns
2923    -------
2924    A `SuccessTuple` indicating success.
2925    """
2926    from meerschaum.utils.sql import table_exists, sql_item_name, DROP_IF_EXISTS_FLAVORS
2927    success = True
2928    target = pipe.target
2929    schema = self.get_pipe_schema(pipe)
2930    target_name = (
2931        sql_item_name(target, self.flavor, schema)
2932    )
2933    if table_exists(target, self, schema=schema, debug=debug):
2934        if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
2935        success = self.exec(
2936            f"DROP TABLE {if_exists_str} {target_name}", silent=True, debug=debug
2937        ) is not None
2938
2939    msg = "Success" if success else f"Failed to drop {pipe}."
2940    return success, msg

Drop a pipe's tables but maintain its registration.

Parameters
  • pipe (mrsm.Pipe): The pipe to drop.
Returns
  • A SuccessTuple indicating success.
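Example (a hedged sketch; because the registration is kept, the pipe may be synced again afterwards):
>>> conn.drop_pipe(pipe)
(True, 'Success')
>>> conn.pipe_exists(pipe)
False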
def clear_pipe( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw) -> Tuple[bool, str]:
2943def clear_pipe(
2944    self,
2945    pipe: mrsm.Pipe,
2946    begin: Union[datetime, int, None] = None,
2947    end: Union[datetime, int, None] = None,
2948    params: Optional[Dict[str, Any]] = None,
2949    debug: bool = False,
2950    **kw
2951) -> SuccessTuple:
2952    """
2953    Delete a pipe's data within a bounded or unbounded interval without dropping the table.
2954
2955    Parameters
2956    ----------
2957    pipe: mrsm.Pipe
2958        The pipe to clear.
2959        
2960    begin: Union[datetime, int, None], default None
2961        Beginning datetime. Inclusive.
2962
2963    end: Union[datetime, int, None], default None
2964         Ending datetime. Exclusive.
2965
2966    params: Optional[Dict[str, Any]], default None
2967         See `meerschaum.utils.sql.build_where`.
2968
2969    """
2970    if not pipe.exists(debug=debug):
2971        return True, f"{pipe} does not exist, so nothing was cleared."
2972
2973    from meerschaum.utils.sql import sql_item_name, build_where, dateadd_str
2974    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
2975    pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
2976
2977    dt_col = pipe.columns.get('datetime', None)
2978    dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None
2979    dt_db_type = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None
2980    if not pipe.columns.get('datetime', None):
2981        dt_col = pipe.guess_datetime()
2982        dt_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None
2983        is_guess = True
2984    else:
2985        dt_col = pipe.get_columns('datetime')
2986        dt_name = sql_item_name(dt_col, self.flavor, None)
2987        is_guess = False
2988
2989    if begin is not None or end is not None:
2990        if is_guess:
2991            if dt_col is None:
2992                warn(
2993                    f"No datetime could be determined for {pipe}."
2994                    + "\n    Ignoring datetime bounds...",
2995                    stack=False,
2996                )
2997                begin, end = None, None
2998            else:
2999                warn(
3000                    f"A datetime wasn't specified for {pipe}.\n"
3001                    + f"    Using column \"{dt_col}\" for datetime bounds...",
3002                    stack=False,
3003                )
3004
3005    valid_params = {}
3006    if params is not None:
3007        existing_cols = pipe.get_columns_types(debug=debug)
3008        valid_params = {k: v for k, v in params.items() if k in existing_cols}
3009    clear_query = (
3010        f"DELETE FROM {pipe_name}\nWHERE 1 = 1\n"
3011        + ('\n    AND ' + build_where(valid_params, self, with_where=False) if valid_params else '')
3012        + (
3013            (
3014                f'\n    AND {dt_name} >= '
3015                + dateadd_str(self.flavor, 'day', 0, begin, db_type=dt_db_type)
3016            )
3017            if begin is not None
3018            else ''
3019        ) + (
3020            (
3021                f'\n    AND {dt_name} <  '
3022                + dateadd_str(self.flavor, 'day', 0, end, db_type=dt_db_type)
3023            )
3024            if end is not None
3025            else ''
3026        )
3027    )
3028    success = self.exec(clear_query, silent=True, debug=debug) is not None
3029    msg = "Success" if success else f"Failed to clear {pipe}."
3030    return success, msg

Delete a pipe's data within a bounded or unbounded interval without dropping the table.

Parameters
  • pipe (mrsm.Pipe): The pipe to clear.
  • begin (Union[datetime, int, None], default None): Beginning datetime. Inclusive.
  • end (Union[datetime, int, None], default None): Ending datetime. Exclusive.
  • params (Optional[Dict[str, Any]], default None): See meerschaum.utils.sql.build_where.
Returns
  • A SuccessTuple indicating success.
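Example (a hedged sketch deleting one day of rows; begin is inclusive, end is exclusive, and the bounds are illustrative):
>>> from datetime import datetime
>>> conn.clear_pipe(pipe, begin=datetime(2024, 1, 1), end=datetime(2024, 1, 2))
(True, 'Success')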
def deduplicate_pipe( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
3668def deduplicate_pipe(
3669    self,
3670    pipe: mrsm.Pipe,
3671    begin: Union[datetime, int, None] = None,
3672    end: Union[datetime, int, None] = None,
3673    params: Optional[Dict[str, Any]] = None,
3674    debug: bool = False,
3675    **kwargs: Any
3676) -> SuccessTuple:
3677    """
3678    Delete duplicate values within a pipe's table.
3679
3680    Parameters
3681    ----------
3682    pipe: mrsm.Pipe
3683        The pipe whose table to deduplicate.
3684
3685    begin: Union[datetime, int, None], default None
3686        If provided, only deduplicate values greater than or equal to this value.
3687
3688    end: Union[datetime, int, None], default None
3689        If provided, only deduplicate values less than this value.
3690
3691    params: Optional[Dict[str, Any]], default None
3692        If provided, further limit deduplication to values which match this query dictionary.
3693
3694    debug: bool, default False
3695        Verbosity toggle.
3696
3697    Returns
3698    -------
3699    A `SuccessTuple` indicating success.
3700    """
3701    from meerschaum.utils.sql import (
3702        sql_item_name,
3703        get_rename_table_queries,
3704        DROP_IF_EXISTS_FLAVORS,
3705        get_create_table_query,
3706        format_cte_subquery,
3707        get_null_replacement,
3708    )
3709    from meerschaum.utils.misc import generate_password, flatten_list
3710
3711    pipe_table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
3712
3713    if not pipe.exists(debug=debug):
3714        return False, f"Table {pipe_table_name} does not exist."
3715
3716    dt_col = pipe.columns.get('datetime', None)
3717    cols_types = pipe.get_columns_types(debug=debug)
3718    existing_cols = pipe.get_columns_types(debug=debug)
3719
3720    get_rowcount_query = f"SELECT COUNT(*) FROM {pipe_table_name}"
3721    old_rowcount = self.value(get_rowcount_query, debug=debug)
3722    if old_rowcount is None:
3723        return False, f"Failed to get rowcount for table {pipe_table_name}."
3724
3725    ### Non-datetime indices that in fact exist.
3726    indices = [
3727        col
3728        for key, col in pipe.columns.items()
3729        if col and col != dt_col and col in cols_types
3730    ]
3731    indices_names = [sql_item_name(index_col, self.flavor, None) for index_col in indices]
3732    existing_cols_names = [sql_item_name(col, self.flavor, None) for col in existing_cols]
3733    duplicate_row_number_name = sql_item_name('dup_row_num', self.flavor, None)
3734    previous_row_number_name = sql_item_name('prev_row_num', self.flavor, None)
3735
3736    index_list_str = (
3737        sql_item_name(dt_col, self.flavor, None)
3738        if dt_col
3739        else ''
3740    )
3741    index_list_str_ordered = (
3742        (
3743            sql_item_name(dt_col, self.flavor, None) + " DESC"
3744        )
3745        if dt_col
3746        else ''
3747    )
3748    if indices:
3749        index_list_str += ', ' + ', '.join(indices_names)
3750        index_list_str_ordered += ', ' + ', '.join(indices_names)
3751    if index_list_str.startswith(','):
3752        index_list_str = index_list_str.lstrip(',').lstrip()
3753    if index_list_str_ordered.startswith(','):
3754        index_list_str_ordered = index_list_str_ordered.lstrip(',').lstrip()
3755
3756    cols_list_str = ', '.join(existing_cols_names)
3757
3758    try:
3759        ### NOTE: MySQL 5 and below does not support window functions (ROW_NUMBER()).
3760        is_old_mysql = (
3761            self.flavor in ('mysql', 'mariadb')
3762            and
3763            int(self.db_version.split('.')[0]) < 8
3764        )
3765    except Exception:
3766        is_old_mysql = False
3767
3768    src_query = f"""
3769        SELECT
3770            {cols_list_str},
3771            ROW_NUMBER() OVER (
3772                PARTITION BY
3773                {index_list_str}
3774                ORDER BY {index_list_str_ordered}
3775            ) AS {duplicate_row_number_name}
3776        FROM {pipe_table_name}
3777    """
3778    duplicates_cte_subquery = format_cte_subquery(
3779        src_query,
3780        self.flavor,
3781        sub_name = 'src',
3782        cols_to_select = cols_list_str,
3783    ) + f"""
3784        WHERE {duplicate_row_number_name} = 1
3785        """
3786    old_mysql_query = (
3787        f"""
3788        SELECT
3789            {index_list_str}
3790        FROM (
3791          SELECT
3792            {index_list_str},
3793            IF(
3794                @{previous_row_number_name} <> {index_list_str.replace(', ', ' + ')},
3795                @{duplicate_row_number_name} := 0,
3796                @{duplicate_row_number_name}
3797            ),
3798            @{previous_row_number_name} := {index_list_str.replace(', ', ' + ')},
3799            @{duplicate_row_number_name} := @{duplicate_row_number_name} + 1 AS """
3800        + f"""{duplicate_row_number_name}
3801          FROM
3802            {pipe_table_name},
3803            (
3804                SELECT @{duplicate_row_number_name} := 0
3805            ) AS {duplicate_row_number_name},
3806            (
3807                SELECT @{previous_row_number_name} := '{get_null_replacement('str', 'mysql')}'
3808            ) AS {previous_row_number_name}
3809          ORDER BY {index_list_str_ordered}
3810        ) AS t
3811        WHERE {duplicate_row_number_name} = 1
3812        """
3813    )
3814    if is_old_mysql:
3815        duplicates_cte_subquery = old_mysql_query
3816
3817    session_id = generate_password(3)
3818
3819    dedup_table = self.get_temporary_target(pipe.target, transact_id=session_id, label='dedup')
3820    temp_old_table = self.get_temporary_target(pipe.target, transact_id=session_id, label='old')
3821    temp_old_table_name = sql_item_name(temp_old_table, self.flavor, self.get_pipe_schema(pipe))
3822
3823    create_temporary_table_query = get_create_table_query(
3824        duplicates_cte_subquery,
3825        dedup_table,
3826        self.flavor,
3827    ) + f"""
3828    ORDER BY {index_list_str_ordered}
3829    """
3830    if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
3831    alter_queries = flatten_list([
3832        get_rename_table_queries(
3833            pipe.target,
3834            temp_old_table,
3835            self.flavor,
3836            schema=self.get_pipe_schema(pipe),
3837        ),
3838        get_rename_table_queries(
3839            dedup_table,
3840            pipe.target,
3841            self.flavor,
3842            schema=self.get_pipe_schema(pipe),
3843        ),
3844        f"DROP TABLE {if_exists_str} {temp_old_table_name}",
3845    ])
3846
3847    self._log_temporary_tables_creation(temp_old_table, create=(not pipe.temporary), debug=debug)
3848    create_temporary_result = self.execute(create_temporary_table_query, debug=debug)
3849    if create_temporary_result is None:
3850        return False, f"Failed to deduplicate table {pipe_table_name}."
3851
3852    results = self.exec_queries(
3853        alter_queries,
3854        break_on_error=True,
3855        rollback=True,
3856        debug=debug,
3857    )
3858
3859    fail_query = None
3860    for result, query in zip(results, alter_queries):
3861        if result is None:
3862            fail_query = query
3863            break
3864    success = fail_query is None
3865
3866    new_rowcount = (
3867        self.value(get_rowcount_query, debug=debug)
3868        if success
3869        else None
3870    )
3871
3872    msg = (
3873        (
3874            f"Successfully deduplicated table {pipe_table_name}"
3875            + (
3876                f"\nfrom {old_rowcount:,} to {new_rowcount:,} rows"
3877                if old_rowcount != new_rowcount
3878                else ''
3879            ) + '.'
3880        )
3881        if success
3882        else f"Failed to execute query:\n{fail_query}"
3883    )
3884    return success, msg

Delete duplicate values within a pipe's table.

Parameters
  • pipe (mrsm.Pipe): The pipe whose table to deduplicate.
  • begin (Union[datetime, int, None], default None): If provided, only deduplicate values greater than or equal to this value.
  • end (Union[datetime, int, None], default None): If provided, only deduplicate values less than this value.
  • params (Optional[Dict[str, Any]], default None): If provided, further limit deduplication to values which match this query dictionary.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple indicating success.
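Example (a hedged sketch; the table name and rowcounts in the message are illustrative):
>>> conn.deduplicate_pipe(pipe)
(True, 'Successfully deduplicated table "weather"\nfrom 1,000 to 900 rows.')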
def get_pipe_table( self, pipe: meerschaum.Pipe, debug: bool = False) -> "Union['sqlalchemy.Table', None]":
3033def get_pipe_table(
3034    self,
3035    pipe: mrsm.Pipe,
3036    debug: bool = False,
3037) -> Union['sqlalchemy.Table', None]:
3038    """
3039    Return the `sqlalchemy.Table` object for a `mrsm.Pipe`.
3040
3041    Parameters
3042    ----------
3043    pipe: mrsm.Pipe
3044        The pipe in question.
3045
3046    Returns
3047    -------
3048    A `sqlalchemy.Table` object. 
3049
3050    """
3051    from meerschaum.utils.sql import get_sqlalchemy_table
3052    if not pipe.exists(debug=debug):
3053        return None
3054
3055    return get_sqlalchemy_table(
3056        pipe.target,
3057        connector=self,
3058        schema=self.get_pipe_schema(pipe),
3059        debug=debug,
3060        refresh=True,
3061    )

Return the sqlalchemy.Table object for a mrsm.Pipe.

Parameters
  • pipe (mrsm.Pipe): The pipe in question.
Returns
  • A sqlalchemy.Table object.
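Example (a hedged sketch; None is returned if the pipe's table does not yet exist):
>>> tbl = conn.get_pipe_table(pipe)
>>> tbl is not None
True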
def get_pipe_columns_types( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, str]:
3064def get_pipe_columns_types(
3065    self,
3066    pipe: mrsm.Pipe,
3067    debug: bool = False,
3068) -> Dict[str, str]:
3069    """
3070    Get the pipe's columns and types.
3071
3072    Parameters
3073    ----------
3074    pipe: mrsm.Pipe
3075        The pipe to get the columns for.
3076
3077    Returns
3078    -------
3079    A dictionary of column names (`str`) and types (`str`).
3080
3081    Examples
3082    --------
3083    >>> conn.get_pipe_columns_types(pipe)
3084    {
3085      'dt': 'TIMESTAMP WITHOUT TIMEZONE',
3086      'id': 'BIGINT',
3087      'val': 'DOUBLE PRECISION',
3088    }
3089    >>> 
3090    """
3091    from meerschaum.utils.sql import get_table_cols_types
3092    if not pipe.exists(debug=debug):
3093        return {}
3094
3095    if self.flavor not in ('oracle', 'mysql', 'mariadb', 'sqlite', 'geopackage'):
3096        return get_table_cols_types(
3097            pipe.target,
3098            self,
3099            flavor=self.flavor,
3100            schema=self.get_pipe_schema(pipe),
3101            debug=debug,
3102        )
3103
3104    if debug:
3105        dprint(f"Fetching columns_types for {pipe} via the SQLAlchemy table.")
3106
3107    table_columns = {}
3108    try:
3109        pipe_table = self.get_pipe_table(pipe, debug=debug)
3110        if pipe_table is None:
3111            return {}
3112
3113        if debug:
3114            dprint("Found columns:")
3115            mrsm.pprint(dict(pipe_table.columns))
3116
3117        for col in pipe_table.columns:
3118            table_columns[str(col.name)] = str(col.type)
3119    except Exception as e:
3120        traceback.print_exc()
3121        warn(e)
3122        table_columns = {}
3123
3124    return table_columns

Get the pipe's columns and types.

Parameters
  • pipe (mrsm.Pipe): The pipe to get the columns for.
Returns
  • A dictionary of column names (str) and types (str).
Examples
>>> conn.get_pipe_columns_types(pipe)
{
  'dt': 'TIMESTAMP WITHOUT TIMEZONE',
  'id': 'BIGINT',
  'val': 'DOUBLE PRECISION',
}
>>>
def get_to_sql_dtype( self, pipe: meerschaum.Pipe, df: "'pd.DataFrame'", update_dtypes: bool = True) -> "Dict[str, 'sqlalchemy.sql.visitors.TraversibleType']":
3614def get_to_sql_dtype(
3615    self,
3616    pipe: 'mrsm.Pipe',
3617    df: 'pd.DataFrame',
3618    update_dtypes: bool = True,
3619) -> Dict[str, 'sqlalchemy.sql.visitors.TraversibleType']:
3620    """
3621    Given a pipe and DataFrame, return the `dtype` dictionary for `to_sql()`.
3622
3623    Parameters
3624    ----------
3625    pipe: mrsm.Pipe
3626        The pipe which may contain a `dtypes` parameter.
3627
3628    df: pd.DataFrame
3629        The DataFrame to be pushed via `to_sql()`.
3630
3631    update_dtypes: bool, default True
3632        If `True`, patch the pipe's dtypes onto the DataFrame's dtypes.
3633
3634    Returns
3635    -------
3636    A dictionary with `sqlalchemy` datatypes.
3637
3638    Examples
3639    --------
3640    >>> import pandas as pd
3641    >>> import meerschaum as mrsm
3642    >>> 
3643    >>> conn = mrsm.get_connector('sql:memory')
3644    >>> df = pd.DataFrame([{'a': {'b': 1}}])
3645    >>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'})
3646    >>> conn.get_to_sql_dtype(pipe, df)
3647    {'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
3648    """
3649    from meerschaum.utils.dataframe import get_special_cols
3650    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
3651    df_dtypes = {
3652        col: str(typ)
3653        for col, typ in df.dtypes.items()
3654    }
3655    special_cols = get_special_cols(df)
3656    df_dtypes.update(special_cols)
3657
3658    if update_dtypes:
3659        df_dtypes.update(pipe.dtypes)
3660
3661    return {
3662        col: get_db_type_from_pd_type(typ, self.flavor, as_sqlalchemy=True)
3663        for col, typ in df_dtypes.items()
3664        if col and typ
3665    }

Given a pipe and DataFrame, return the dtype dictionary for to_sql().

Parameters
  • pipe (mrsm.Pipe): The pipe which may contain a dtypes parameter.
  • df (pd.DataFrame): The DataFrame to be pushed via to_sql().
  • update_dtypes (bool, default True): If True, patch the pipe's dtypes onto the DataFrame's dtypes.
Returns
  • A dictionary with sqlalchemy datatypes.
Examples
>>> import pandas as pd
>>> import meerschaum as mrsm
>>> 
>>> conn = mrsm.get_connector('sql:memory')
>>> df = pd.DataFrame([{'a': {'b': 1}}])
>>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'})
>>> conn.get_to_sql_dtype(pipe, df)
{'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
def get_pipe_schema(self, pipe: meerschaum.Pipe) -> Optional[str]:
3887def get_pipe_schema(self, pipe: mrsm.Pipe) -> Union[str, None]:
3888    """
3889    Return the schema to use for this pipe.
3890    First check `pipe.parameters['schema']`, then check `self.schema`.
3891
3892    Parameters
3893    ----------
3894    pipe: mrsm.Pipe
3895        The pipe which may contain a configured schema.
3896
3897    Returns
3898    -------
3899    A schema string or `None` if nothing is configured.
3900    """
3901    if self.flavor in ('sqlite', 'geopackage'):
3902        return self.schema
3903    return pipe.parameters.get('schema', self.schema)

Return the schema to use for this pipe. First check pipe.parameters['schema'], then check self.schema.

Parameters
  • pipe (mrsm.Pipe): The pipe which may contain a configured schema.
Returns
  • A schema string or None if nothing is configured.
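Example (a hedged sketch for a non-SQLite flavor, continuing with conn from above; the 'analytics' schema is hypothetical):
>>> pipe = mrsm.Pipe('a', 'b', instance=conn, parameters={'schema': 'analytics'})
>>> conn.get_pipe_schema(pipe)
'analytics'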
def create_pipe_table_from_df( self, pipe: meerschaum.Pipe, df: "'pd.DataFrame'", debug: bool = False) -> Tuple[bool, str]:
1551def create_pipe_table_from_df(
1552    self,
1553    pipe: mrsm.Pipe,
1554    df: 'pd.DataFrame',
1555    debug: bool = False,
1556) -> mrsm.SuccessTuple:
1557    """
1558    Create a pipe's table from its configured dtypes and an incoming dataframe.
1559    """
1560    from meerschaum.utils.dataframe import get_special_cols
1561    from meerschaum.utils.sql import (
1562        get_create_table_queries,
1563        sql_item_name,
1564        get_create_schema_if_not_exists_queries,
1565    )
1566    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
1567    if self.flavor == 'geopackage':
1568        init_success, init_msg = self._init_geopackage_table(df, pipe.target, debug=debug)
1569        if not init_success:
1570            return init_success, init_msg
1571
1572    primary_key = pipe.columns.get('primary', None)
1573    primary_key_typ = (
1574        pipe.dtypes.get(primary_key, str(df.dtypes.get(primary_key, 'int')))
1575        if primary_key
1576        else None
1577    )
1578    primary_key_db_type = (
1579        get_db_type_from_pd_type(primary_key_typ, self.flavor)
1580        if primary_key
1581        else None
1582    )
1583    dt_col = pipe.columns.get('datetime', None)
1584    new_dtypes = {
1585        **{
1586            col: str(typ)
1587            for col, typ in df.dtypes.items()
1588        },
1589        **{
1590            col: str(df.dtypes.get(col, 'int'))
1591            for col_ix, col in pipe.columns.items()
1592            if col and col_ix != 'primary'
1593        },
1594        **get_special_cols(df),
1595        **pipe.dtypes
1596    }
1597    autoincrement = (
1598        pipe.parameters.get('autoincrement', False)
1599        or (primary_key and primary_key not in new_dtypes)
1600    )
1601    if autoincrement:
1602        _ = new_dtypes.pop(primary_key, None)
1603
1604    schema = self.get_pipe_schema(pipe)
1605    create_table_queries = get_create_table_queries(
1606        new_dtypes,
1607        pipe.target,
1608        self.flavor,
1609        schema=schema,
1610        primary_key=primary_key,
1611        primary_key_db_type=primary_key_db_type,
1612        datetime_column=dt_col,
1613    )
1614    if schema:
1615        create_table_queries = (
1616            get_create_schema_if_not_exists_queries(schema, self.flavor)
1617            + create_table_queries
1618        )
1619    success = all(
1620        self.exec_queries(create_table_queries, break_on_error=True, rollback=True, debug=debug)
1621    )
1622    target_name = sql_item_name(pipe.target, schema=self.get_pipe_schema(pipe), flavor=self.flavor)
1623    msg = (
1624        "Success"
1625        if success
1626        else f"Failed to create {target_name}."
1627    )
1628    if success and self.flavor == 'geopackage':
1629        return self._init_geopackage_table(df, pipe.target, debug=debug)
1630
1631    return success, msg

Create a pipe's table from its configured dtypes and an incoming dataframe.
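
Example (a hedged sketch, continuing with conn from above; the column names and dtypes are illustrative):
>>> import pandas as pd
>>> import meerschaum as mrsm
>>> df = pd.DataFrame({'dt': pd.to_datetime(['2024-01-01']), 'id': [1], 'val': [1.5]})
>>> pipe = mrsm.Pipe('a', 'b', instance=conn, columns={'datetime': 'dt', 'id': 'id'})
>>> conn.create_pipe_table_from_df(pipe, df)
(True, 'Success')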

def get_pipe_columns_indices( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, List[Dict[str, str]]]:
3127def get_pipe_columns_indices(
3128    self,
3129    pipe: mrsm.Pipe,
3130    debug: bool = False,
3131) -> Dict[str, List[Dict[str, str]]]:
3132    """
3133    Return a dictionary mapping columns to the indices created on those columns.
3134
3135    Parameters
3136    ----------
3137    pipe: mrsm.Pipe
3138        The pipe to be queried against.
3139
3140    Returns
3141    -------
3142    A dictionary mapping column names to lists of dictionaries.
3143    The dictionaries in the lists contain the name and type of the indices.
3144    """
3145    if pipe.__dict__.get('_skip_check_indices', False):
3146        return {}
3147
3148    from meerschaum.utils.sql import get_table_cols_indices
3149    return get_table_cols_indices(
3150        pipe.target,
3151        self,
3152        flavor=self.flavor,
3153        schema=self.get_pipe_schema(pipe),
3154        debug=debug,
3155    )

Return a dictionary mapping columns to the indices created on those columns.

Parameters
  • pipe (mrsm.Pipe): The pipe to be queried against.
Returns
  • A dictionary mapping column names to lists of dictionaries; the dictionaries in the lists contain the name and type of the indices.
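Example (a hedged sketch; the index name and type depend on the flavor and are illustrative):
>>> conn.get_pipe_columns_indices(pipe)
{'id': [{'name': 'IX_weather_id', 'type': 'INDEX'}]}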
@staticmethod
def get_temporary_target( target: str, transact_id: Optional[str] = None, label: Optional[str] = None, separator: Optional[str] = None) -> str:
3906@staticmethod
3907def get_temporary_target(
3908    target: str,
3909    transact_id: Optional[str] = None,
3910    label: Optional[str] = None,
3911    separator: Optional[str] = None,
3912) -> str:
3913    """
3914    Return a unique(ish) temporary target for a pipe.
3915    """
3916    from meerschaum.utils.misc import generate_password
3917    temp_target_cf = (
3918        mrsm.get_config('system', 'connectors', 'sql', 'instance', 'temporary_target') or {}
3919    )
3920    transaction_id_len = temp_target_cf.get('transaction_id_length', 3)
3921    transact_id = transact_id or generate_password(transaction_id_len)
3922    temp_prefix = temp_target_cf.get('prefix', '_')
3923    separator = separator or temp_target_cf.get('separator', '_')
3924    return (
3925        temp_prefix
3926        + target
3927        + separator
3928        + transact_id
3929        + ((separator + label) if label else '')
3930    )

Return a unique(ish) temporary target for a pipe.
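
Example (a sketch assuming the default prefix '_' and separator '_' from the configuration; an explicit transact_id is passed for a deterministic result):
>>> from meerschaum.connectors.sql import SQLConnector
>>> SQLConnector.get_temporary_target('weather', transact_id='abc', label='old')
'_weather_abc_old'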

def create_pipe_indices( self, pipe: meerschaum.Pipe, columns: Optional[List[str]] = None, debug: bool = False) -> Tuple[bool, str]:
351def create_pipe_indices(
352    self,
353    pipe: mrsm.Pipe,
354    columns: Optional[List[str]] = None,
355    debug: bool = False,
356) -> SuccessTuple:
357    """
358    Create a pipe's indices.
359    """
360    success = self.create_indices(pipe, columns=columns, debug=debug)
361    msg = (
362        "Success"
363        if success
364        else f"Failed to create indices for {pipe}."
365    )
366    return success, msg

Create a pipe's indices.
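
Example (a hedged sketch; drop_pipe_indices below follows the same pattern):
>>> conn.create_pipe_indices(pipe)
(True, 'Success')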

def drop_pipe_indices( self, pipe: meerschaum.Pipe, columns: Optional[List[str]] = None, debug: bool = False) -> Tuple[bool, str]:
407def drop_pipe_indices(
408    self,
409    pipe: mrsm.Pipe,
410    columns: Optional[List[str]] = None,
411    debug: bool = False,
412) -> SuccessTuple:
413    """
414    Drop a pipe's indices.
415    """
416    success = self.drop_indices(pipe, columns=columns, debug=debug)
417    msg = (
418        "Success"
419        if success
420        else f"Failed to drop indices for {pipe}."
421    )
422    return success, msg

Drop a pipe's indices.

def get_pipe_index_names(self, pipe: meerschaum.Pipe) -> Dict[str, str]:
459def get_pipe_index_names(self, pipe: mrsm.Pipe) -> Dict[str, str]:
460    """
461    Return a dictionary mapping index keys to their names on the database.
462
463    Returns
464    -------
465    A dictionary of index keys to index names.
466    """
467    from meerschaum.utils.sql import DEFAULT_SCHEMA_FLAVORS, truncate_item_name
468    _parameters = pipe.parameters
469    _index_template = _parameters.get('index_template', "IX_{schema_str}{target}_{column_names}")
470    _schema = self.get_pipe_schema(pipe)
471    if _schema is None:
472        _schema = (
473            DEFAULT_SCHEMA_FLAVORS.get(self.flavor, None)
474            if self.flavor != 'mssql'
475            else None
476        )
477    schema_str = '' if _schema is None else f'{_schema}_'
478    schema_str = ''
479    _indices = pipe.indices
480    _target = pipe.target
481    _column_names = {
482        ix: (
483            '_'.join(cols)
484            if isinstance(cols, (list, tuple))
485            else str(cols)
486        )
487        for ix, cols in _indices.items()
488        if cols
489    }
490    _index_names = {
491        ix: _index_template.format(
492            target=_target,
493            column_names=column_names,
494            connector_keys=pipe.connector_keys,
495            metric_key=pipe.metric_key,
496            location_key=pipe.location_key,
497            schema_str=schema_str,
498        )
499        for ix, column_names in _column_names.items()
500    }
501    ### NOTE: Skip any duplicate indices.
502    seen_index_names = {}
503    for ix, index_name in _index_names.items():
504        if index_name in seen_index_names:
505            continue
506        seen_index_names[index_name] = ix
507    return {
508        ix: truncate_item_name(index_name, flavor=self.flavor)
509        for index_name, ix in seen_index_names.items()
510    }

Return a dictionary mapping index keys to their names on the database.

Returns
  • A dictionary of index keys to index names.
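Example (a hedged sketch; the actual names follow the pipe's index_template and are truncated per flavor):
>>> conn.get_pipe_index_names(pipe)
{'datetime': 'IX_weather_dt', 'id': 'IX_weather_id'}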
def get_plugins_pipe(self) -> meerschaum.Pipe:
18def get_plugins_pipe(self) -> mrsm.Pipe:
19    """
20    Return the internal metadata plugins pipe.
21    """
22    users_pipe = self.get_users_pipe()
23    user_id_dtype = users_pipe.dtypes.get('user_id', 'int')
24    return mrsm.Pipe(
25        'mrsm', 'plugins',
26        instance=self,
27        temporary=True,
28        static=True,
29        null_indices=False,
30        columns={
31            'primary': 'plugin_id',
32            'user_id': 'user_id',    
33        },
34        dtypes={
35            'plugin_name': 'string',
36            'user_id': user_id_dtype,
37            'attributes': 'json',
38            'version': 'string',
39        },
40        indices={
41            'unique': 'plugin_name',
42        },
43    )

Return the internal metadata plugins pipe.

def register_plugin( self, plugin: meerschaum.Plugin, force: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
 46def register_plugin(
 47    self,
 48    plugin: 'mrsm.core.Plugin',
 49    force: bool = False,
 50    debug: bool = False,
 51    **kw: Any
 52) -> SuccessTuple:
 53    """Register a new plugin to the plugins table."""
 54    from meerschaum.utils.packages import attempt_import
 55    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
 56    from meerschaum.utils.sql import json_flavors
 57    from meerschaum.connectors.sql.tables import get_tables
 58    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
 59
 60    old_id = self.get_plugin_id(plugin, debug=debug)
 61
 62    ### Check for version conflict. May be overridden with `--force`.
 63    if old_id is not None and not force:
 64        old_version = self.get_plugin_version(plugin, debug=debug)
 65        new_version = plugin.version
 66        if old_version is None:
 67            old_version = ''
 68        if new_version is None:
 69            new_version = ''
 70
 71        ### verify that the new version is greater than the old
 72        packaging_version = attempt_import('packaging.version')
 73        if (
 74            old_version and new_version
 75            and packaging_version.parse(old_version) >= packaging_version.parse(new_version)
 76        ):
 77            return False, (
 78                f"Version '{new_version}' of plugin '{plugin}' " +
 79                f"must be greater than existing version '{old_version}'."
 80            )
 81
 82    bind_variables = {
 83        'plugin_name': plugin.name,
 84        'version': plugin.version,
 85        'attributes': (
 86            json.dumps(plugin.attributes) if self.flavor not in json_flavors else plugin.attributes
 87        ),
 88        'user_id': plugin.user_id,
 89    }
 90
 91    if old_id is None:
 92        query = sqlalchemy.insert(plugins_tbl).values(**bind_variables)
 93    else:
 94        query = (
 95            sqlalchemy.update(plugins_tbl)
 96            .values(**bind_variables)
 97            .where(plugins_tbl.c.plugin_id == old_id)
 98        )
 99
100    result = self.exec(query, debug=debug)
101    if result is None:
102        return False, f"Failed to register plugin '{plugin}'."
103    return True, f"Successfully registered plugin '{plugin}'."

Register a new plugin to the plugins table.

def delete_plugin( self, plugin: meerschaum.Plugin, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
272def delete_plugin(
273    self,
274    plugin: 'mrsm.core.Plugin',
275    debug: bool = False,
276    **kw: Any
277) -> SuccessTuple:
278    """Delete a plugin from the plugins table."""
279    from meerschaum.utils.packages import attempt_import
280    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
281    from meerschaum.connectors.sql.tables import get_tables
282    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
283
284    plugin_id = self.get_plugin_id(plugin, debug=debug)
285    if plugin_id is None:
286        return True, f"Plugin '{plugin}' was not registered."
287
288    query = sqlalchemy.delete(plugins_tbl).where(plugins_tbl.c.plugin_id == plugin_id)
289    result = self.exec(query, debug=debug)
290    if result is None:
291        return False, f"Failed to delete plugin '{plugin}'."
292    return True, f"Successfully deleted plugin '{plugin}'."

Delete a plugin from the plugins table.

def get_plugin_id( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[int]:
105def get_plugin_id(
106    self,
107    plugin: 'mrsm.core.Plugin',
108    debug: bool = False
109) -> Optional[int]:
110    """
111    Return a plugin's ID.
112    """
113    ### ensure plugins table exists
114    from meerschaum.connectors.sql.tables import get_tables
115    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
116    from meerschaum.utils.packages import attempt_import
117    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
118
119    query = (
120        sqlalchemy
121        .select(plugins_tbl.c.plugin_id)
122        .where(plugins_tbl.c.plugin_name == plugin.name)
123    )
124    
125    try:
126        return int(self.value(query, debug=debug))
127    except Exception:
128        return None

Return a plugin's ID.

def get_plugin_version( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[str]:
131def get_plugin_version(
132    self,
133    plugin: 'mrsm.core.Plugin',
134    debug: bool = False
135) -> Optional[str]:
136    """
137    Return a plugin's version.
138    """
139    ### ensure plugins table exists
140    from meerschaum.connectors.sql.tables import get_tables
141    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
142    from meerschaum.utils.packages import attempt_import
143    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
144    query = sqlalchemy.select(plugins_tbl.c.version).where(plugins_tbl.c.plugin_name == plugin.name)
145    return self.value(query, debug=debug)

Return a plugin's version.

def get_plugins( self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False, **kw: Any) -> List[str]:
225def get_plugins(
226    self,
227    user_id: Optional[int] = None,
228    search_term: Optional[str] = None,
229    debug: bool = False,
230    **kw: Any
231) -> List[str]:
232    """
233    Return a list of all registered plugins.
234
235    Parameters
236    ----------
237    user_id: Optional[int], default None
238        If specified, filter plugins by a specific `user_id`.
239
240    search_term: Optional[str], default None
241        If specified, add a `WHERE plugin_name LIKE '{search_term}%'` clause to filter the plugins.
242
243
244    Returns
245    -------
246    A list of plugin names.
247    """
248    ### ensure plugins table exists
249    from meerschaum.connectors.sql.tables import get_tables
250    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
251    from meerschaum.utils.packages import attempt_import
252    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
253
254    query = sqlalchemy.select(plugins_tbl.c.plugin_name)
255    if user_id is not None:
256        query = query.where(plugins_tbl.c.user_id == user_id)
257    if search_term is not None:
258        query = query.where(plugins_tbl.c.plugin_name.like(search_term + '%'))
259
260    rows = (
261        self.execute(query).fetchall()
262        if self.flavor != 'duckdb'
263        else [
264            (row['plugin_name'],)
265            for row in self.read(query).to_dict(orient='records')
266        ]
267    )
268    
269    return [row[0] for row in rows]

Return a list of all registered plugins.

Parameters
  • user_id (Optional[int], default None): If specified, filter plugins by a specific user_id.
  • search_term (Optional[str], default None): If specified, add a WHERE plugin_name LIKE '{search_term}%' clause to filter the plugins.
Returns
  • A list of plugin names.
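Example (a hedged sketch; the plugin name is hypothetical):
>>> conn.get_plugins(search_term='noaa')
['noaa']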
def get_plugin_user_id( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[int]:
147def get_plugin_user_id(
148    self,
149    plugin: 'mrsm.core.Plugin',
150    debug: bool = False
151) -> Optional[int]:
152    """
153    Return a plugin's user ID.
154    """
155    ### ensure plugins table exists
156    from meerschaum.connectors.sql.tables import get_tables
157    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
158    from meerschaum.utils.packages import attempt_import
159    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
160
161    query = (
162        sqlalchemy
163        .select(plugins_tbl.c.user_id)
164        .where(plugins_tbl.c.plugin_name == plugin.name)
165    )
166
167    try:
168        return int(self.value(query, debug=debug))
169    except Exception:
170        return None

Return a plugin's user ID.

def get_plugin_username( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[str]:
172def get_plugin_username(
173    self,
174    plugin: 'mrsm.core.Plugin',
175    debug: bool = False
176) -> Optional[str]:
177    """
178    Return the username of a plugin's owner.
179    """
180    ### ensure plugins table exists
181    from meerschaum.connectors.sql.tables import get_tables
182    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
183    users = get_tables(mrsm_instance=self, debug=debug)['users']
184    from meerschaum.utils.packages import attempt_import
185    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
186
187    query = (
188        sqlalchemy.select(users.c.username)
189        .where(
190            users.c.user_id == plugins_tbl.c.user_id,
191            plugins_tbl.c.plugin_name == plugin.name
192        )
193    )
194
195    return self.value(query, debug=debug)

Return the username of a plugin's owner.

def get_plugin_attributes( self, plugin: meerschaum.Plugin, debug: bool = False) -> Dict[str, Any]:
198def get_plugin_attributes(
199    self,
200    plugin: 'mrsm.core.Plugin',
201    debug: bool = False
202) -> Dict[str, Any]:
203    """
204    Return the attributes of a plugin.
205    """
206    ### ensure plugins table exists
207    from meerschaum.connectors.sql.tables import get_tables
208    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
209    from meerschaum.utils.packages import attempt_import
210    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
211
212    query = (
213        sqlalchemy
214        .select(plugins_tbl.c.attributes)
215        .where(plugins_tbl.c.plugin_name == plugin.name)
216    )
217
218    _attr = self.value(query, debug=debug)
219    if isinstance(_attr, str):
220        _attr = json.loads(_attr)
221    elif _attr is None:
222        _attr = {}
223    return _attr

Return the attributes of a plugin.

def get_users_pipe(self) -> meerschaum.Pipe:
16def get_users_pipe(self) -> mrsm.Pipe:
17    """
18    Return the internal metadata pipe for users management.
19    """
20    if '_users_pipe' in self.__dict__:
21        return self._users_pipe
22
23    cache_connector = self.__dict__.get('_cache_connector', None)
24    self._users_pipe = mrsm.Pipe(
25        'mrsm', 'users',
26        temporary=True,
27        cache=True,
28        cache_connector_keys=cache_connector,
29        static=True,
30        null_indices=False,
31        enforce=False,
32        autoincrement=True,
33        columns={
34            'primary': 'user_id',
35        },
36        dtypes={
37            'user_id': 'int',
38            'username': 'string',
39            'attributes': 'json',
40            'user_type': 'string',
41        },
42        indices={
43            'unique': 'username',
44        },
45    )
46    return self._users_pipe

Return the internal metadata pipe for users management.

def register_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
49def register_user(
50    self,
51    user: mrsm.core.User,
52    debug: bool = False,
53    **kw: Any
54) -> SuccessTuple:
55    """Register a new user."""
56    from meerschaum.utils.packages import attempt_import
57    from meerschaum.utils.sql import json_flavors
58    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
59
60    valid_tuple = valid_username(user.username)
61    if not valid_tuple[0]:
62        return valid_tuple
63
64    old_id = self.get_user_id(user, debug=debug)
65
66    if old_id is not None:
67        return False, f"User '{user}' already exists."
68
69    ### ensure users table exists
70    from meerschaum.connectors.sql.tables import get_tables
71    tables = get_tables(mrsm_instance=self, debug=debug)
72
73    import json
74    bind_variables = {
75        'username': user.username,
76        'email': user.email,
77        'password_hash': user.password_hash,
78        'user_type': user.type,
79        'attributes': (
80            json.dumps(user.attributes)
81            if self.flavor not in json_flavors
82            else user.attributes
83        ),
84    }
85    if old_id is not None:
86        return False, f"User '{user.username}' already exists."
87    if old_id is None:
88        query = (
89            sqlalchemy.insert(tables['users']).
90            values(**bind_variables)
91        )
92
93    result = self.exec(query, debug=debug)
94    if result is None:
95        return False, f"Failed to register user '{user}'."
96    return True, f"Successfully registered user '{user}'."

Register a new user.
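
Example (a hedged sketch; the User constructor arguments and the output are illustrative):
>>> from meerschaum.core import User
>>> user = User('foo', 'password123')
>>> conn.register_user(user)
(True, "Successfully registered user 'foo'.")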

def get_user_id( self, user: meerschaum.core.User._User.User, debug: bool = False) -> Optional[int]:
188def get_user_id(
189    self,
190    user: 'mrsm.core.User',
191    debug: bool = False
192) -> Optional[int]:
193    """If a user is registered, return the `user_id`."""
194    ### ensure users table exists
195    from meerschaum.utils.packages import attempt_import
196    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
197    from meerschaum.connectors.sql.tables import get_tables
198    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
199
200    query = (
201        sqlalchemy.select(users_tbl.c.user_id)
202        .where(users_tbl.c.username == user.username)
203    )
204
205    result = self.value(query, debug=debug)
206    if result is not None:
207        return int(result)
208    return None

If a user is registered, return the user_id.

def get_users(self, debug: bool = False, **kw: Any) -> List[str]:
282def get_users(
283    self,
284    debug: bool = False,
285    **kw: Any
286) -> List[str]:
287    """
288    Get the registered usernames.
289    """
290    ### ensure users table exists
291    from meerschaum.connectors.sql.tables import get_tables
292    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
293    from meerschaum.utils.packages import attempt_import
294    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
295
296    query = sqlalchemy.select(users_tbl.c.username)
297
298    return list(self.read(query, debug=debug)['username'])

Get the registered usernames.

def edit_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
133def edit_user(
134    self,
135    user: 'mrsm.core.User',
136    debug: bool = False,
137    **kw: Any
138) -> SuccessTuple:
139    """Update an existing user's metadata."""
140    from meerschaum.utils.packages import attempt_import
141    from meerschaum.utils.sql import json_flavors
142    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
143    from meerschaum.connectors.sql.tables import get_tables
144    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
145
146    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
147    if user_id is None:
148        return False, (
149            f"User '{user.username}' does not exist. "
150            f"Register user '{user.username}' before editing."
151        )
152    user.user_id = user_id
153
154    import json
155    valid_tuple = valid_username(user.username)
156    if not valid_tuple[0]:
157        return valid_tuple
158
159    bind_variables = {
160        'user_id' : user_id,
161        'username' : user.username,
162    }
163    if user.password != '':
164        bind_variables['password_hash'] = user.password_hash
165    if user.email != '':
166        bind_variables['email'] = user.email
167    if user.attributes is not None and user.attributes != {}:
168        bind_variables['attributes'] = (
169            json.dumps(user.attributes) if self.flavor not in json_flavors
170            else user.attributes
171        )
172    if user.type != '':
173        bind_variables['user_type'] = user.type
174
175    query = (
176        sqlalchemy
177        .update(users_tbl)
178        .values(**bind_variables)
179        .where(users_tbl.c.user_id == user_id)
180    )
181
182    result = self.exec(query, debug=debug)
183    if result is None:
184        return False, f"Failed to edit user '{user}'."
185    return True, f"Successfully edited user '{user}'."

Update an existing user's metadata.

def delete_user( self, user: meerschaum.core.User._User.User, debug: bool = False) -> Tuple[bool, str]:
250def delete_user(
251    self,
252    user: 'mrsm.core.User',
253    debug: bool = False
254) -> SuccessTuple:
255    """Delete a user's record from the users table."""
256    ### ensure users table exists
257    from meerschaum.connectors.sql.tables import get_tables
258    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
259    plugins = get_tables(mrsm_instance=self, debug=debug)['plugins']
260    from meerschaum.utils.packages import attempt_import
261    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
262
263    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
264
265    if user_id is None:
266        return False, f"User '{user.username}' is not registered and cannot be deleted."
267
268    query = sqlalchemy.delete(users_tbl).where(users_tbl.c.user_id == user_id)
269
270    result = self.exec(query, debug=debug)
271    if result is None:
272        return False, f"Failed to delete user '{user}'."
273
274    query = sqlalchemy.delete(plugins).where(plugins.c.user_id == user_id)
275    result = self.exec(query, debug=debug)
276    if result is None:
277        return False, f"Failed to delete plugins of user '{user}'."
278
279    return True, f"Successfully deleted user '{user}'."

Delete a user's record from the users table.

def get_user_password_hash( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
301def get_user_password_hash(
302    self,
303    user: 'mrsm.core.User',
304    debug: bool = False,
305    **kw: Any
306) -> Optional[str]:
307    """
308    Return the password hash for a user.
309    **NOTE**: This may be dangerous and is only allowed if the security settings explicitly allow it.
310    """
311    from meerschaum.utils.debug import dprint
312    from meerschaum.connectors.sql.tables import get_tables
313    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
314    from meerschaum.utils.packages import attempt_import
315    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
316
317    if user.user_id is not None:
318        user_id = user.user_id
319        if debug:
320            dprint(f"Already given user_id: {user_id}")
321    else:
322        if debug:
323            dprint("Fetching user_id...")
324        user_id = self.get_user_id(user, debug=debug)
325
326    if user_id is None:
327        return None
328
329    query = sqlalchemy.select(users_tbl.c.password_hash).where(users_tbl.c.user_id == user_id)
330
331    return self.value(query, debug=debug)

Return the password hash for a user. NOTE: This may be dangerous and is only allowed if the security settings explicitly allow it.

def get_user_type( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
334def get_user_type(
335    self,
336    user: 'mrsm.core.User',
337    debug: bool = False,
338    **kw: Any
339) -> Optional[str]:
340    """
341    Return the user's type.
342    """
343    from meerschaum.connectors.sql.tables import get_tables
344    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
345    from meerschaum.utils.packages import attempt_import
346    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
347
348    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
349
350    if user_id is None:
351        return None
352
353    query = sqlalchemy.select(users_tbl.c.user_type).where(users_tbl.c.user_id == user_id)
354
355    return self.value(query, debug=debug)

Return the user's type.

def get_user_attributes( self, user: meerschaum.core.User._User.User, debug: bool = False) -> Optional[Dict[str, Any]]:
210def get_user_attributes(
211    self,
212    user: 'mrsm.core.User',
213    debug: bool = False
214) -> Union[Dict[str, Any], None]:
215    """
216    Return the user's attributes.
217    """
218    ### ensure users table exists
219    from meerschaum.utils.warnings import warn
220    from meerschaum.utils.packages import attempt_import
221    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
222    from meerschaum.connectors.sql.tables import get_tables
223    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
224
225    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
226
227    query = (
228        sqlalchemy.select(users_tbl.c.attributes)
229        .where(users_tbl.c.user_id == user_id)
230    )
231
232    result = self.value(query, debug=debug)
233    if result is not None and not isinstance(result, dict):
234        try:
235            result = dict(result)
236            _parsed = True
237        except Exception:
238            _parsed = False
239        if not _parsed:
240            try:
241                import json
242                result = json.loads(result)
243                _parsed = True
244            except Exception:
245                _parsed = False
246        if not _parsed:
247            warn(f"Received unexpected type for attributes: {result}")
248    return result

Return the user's attributes.

@classmethod
def from_uri( cls, uri: str, label: Optional[str] = None, as_dict: bool = False) -> Union[SQLConnector, Dict[str, Union[str, int]]]:
15@classmethod
16def from_uri(
17    cls,
18    uri: str,
19    label: Optional[str] = None,
20    as_dict: bool = False,
21) -> Union[
22    'meerschaum.connectors.SQLConnector',
23    Dict[str, Union[str, int]],
24]:
25    """
26    Create a new SQLConnector from a URI string.
27
28    Parameters
29    ----------
30    uri: str
31        The URI connection string.
32
33    label: Optional[str], default None
34        If provided, use this as the connector label.
35        Otherwise use the determined database name.
36
37    as_dict: bool, default False
38        If `True`, return a dictionary of the keyword arguments
39        necessary to create a new `SQLConnector`, otherwise create a new object.
40
41    Returns
42    -------
43    A new SQLConnector object or a dictionary of attributes (if `as_dict` is `True`).
44    """
45
46    params = cls.parse_uri(uri)
47    params['uri'] = uri
48    flavor = params.get('flavor', None)
49    if not flavor or flavor not in cls.flavor_configs:
50        error(f"Invalid flavor '{flavor}' detected from the provided URI.")
51
52    if 'database' not in params:
53        error("Unable to determine the database from the provided URI.")
54
55    if flavor in ('sqlite', 'duckdb', 'geopackage'):
56        if params['database'] == ':memory:':
57            params['label'] = label or f'memory_{flavor}'
58        else:
59            params['label'] = label or params['database'].split(os.path.sep)[-1].lower()
60    else:
61        params['label'] = label or (
62            (
63                (params['username'] + '@' if 'username' in params else '')
64                + params.get('host', '')
65                + ('/' if 'host' in params else '')
66                + params.get('database', '')
67            ).lower()
68        )
69
70    return cls(**params) if not as_dict else params

Create a new SQLConnector from a URI string.

Parameters
  • uri (str): The URI connection string.
  • label (Optional[str], default None): If provided, use this as the connector label. Otherwise use the determined database name.
  • as_dict (bool, default False): If True, return a dictionary of the keyword arguments necessary to create a new SQLConnector, otherwise create a new object.
Returns
  • A new SQLConnector object or a dictionary of attributes (if as_dict is True).
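Example (a sketch with hypothetical credentials; per the logic above, the label is derived from the username, host, and database):
>>> from meerschaum.connectors.sql import SQLConnector
>>> conn = SQLConnector.from_uri('postgresql://user:pass@localhost:5432/db')
>>> conn.label
'user@localhost/db'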
@staticmethod
def parse_uri(uri: str) -> Dict[str, Any]:
 73@staticmethod
 74def parse_uri(uri: str) -> Dict[str, Any]:
 75    """
 76    Parse a URI string into a dictionary of parameters.
 77
 78    Parameters
 79    ----------
 80    uri: str
 81        The database connection URI.
 82
 83    Returns
 84    -------
 85    A dictionary of attributes.
 86
 87    Examples
 88    --------
 89    >>> parse_uri('sqlite:////home/foo/bar.db')
 90    {'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
 91    >>> parse_uri(
 92    ...     'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
 93    ...     + '/master?driver=ODBC+Driver+17+for+SQL+Server'
 94    ... )
 95    {'host': 'localhost', 'database': 'master', 'username': 'sa',
 96    'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql',
 97    'driver': 'ODBC Driver 17 for SQL Server'}
 98    >>> 
 99    """
100    from urllib.parse import parse_qs, urlparse
101    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
102    parser = sqlalchemy.engine.url.make_url
103    params = parser(uri).translate_connect_args()
104    params['flavor'] = uri.split(':')[0].split('+')[0]
105    if params['flavor'] == 'postgres':
106        params['flavor'] = 'postgresql'
107    if '?' in uri:
108        parsed_uri = urlparse(uri)
109        for key, value in parse_qs(parsed_uri.query).items():
110            params.update({key: value[0]})
111
112        if '--search_path' in params.get('options', ''):
113            params.update({'schema': params['options'].replace('--search_path=', '', 1)})
114    return params

Parse a URI string into a dictionary of parameters.

Parameters
  • uri (str): The database connection URI.
Returns
  • A dictionary of attributes.
Examples
>>> parse_uri('sqlite:////home/foo/bar.db')
{'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
>>> parse_uri(
...     'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
...     + '/master?driver=ODBC+Driver+17+for+SQL+Server'
... )
{'host': 'localhost', 'database': 'master', 'username': 'sa',
'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql',
'driver': 'ODBC Driver 17 for SQL Server'}
>>>