meerschaum.connectors.sql

Subpackage for SQLConnector subclass

 1#! /usr/bin/env python
 2# -*- coding: utf-8 -*-
 3# vim:fenc=utf-8
 4
 5"""
 6Subpackage for SQLConnector subclass
 7"""
 8
 9from meerschaum.connectors.sql._SQLConnector import SQLConnector
10
11__all__ = ('SQLConnector',)
class SQLConnector(meerschaum.connectors._Connector.Connector):
 18class SQLConnector(Connector):
 19    """
 20    Connect to SQL databases via `sqlalchemy`.
 21    
 22    SQLConnectors may be used as Meerschaum instance connectors.
 23    Read more about connectors and instances at
 24    https://meerschaum.io/reference/connectors/
 25
 26    """
 27
 28    IS_INSTANCE: bool = True
 29
 30    from ._create_engine import flavor_configs, create_engine
 31    from ._sql import (
 32        read,
 33        value,
 34        exec,
 35        execute,
 36        to_sql,
 37        exec_queries,
 38        get_connection,
 39        _cleanup_connections,
 40    )
 41    from meerschaum.utils.sql import test_connection
 42    from ._fetch import fetch, get_pipe_metadef
 43    from ._cli import cli, _cli_exit
 44    from ._pipes import (
 45        fetch_pipes_keys,
 46        create_indices,
 47        drop_indices,
 48        get_create_index_queries,
 49        get_drop_index_queries,
 50        get_add_columns_queries,
 51        get_alter_columns_queries,
 52        delete_pipe,
 53        get_pipe_data,
 54        get_pipe_data_query,
 55        register_pipe,
 56        edit_pipe,
 57        get_pipe_id,
 58        get_pipe_attributes,
 59        sync_pipe,
 60        sync_pipe_inplace,
 61        get_sync_time,
 62        pipe_exists,
 63        get_pipe_rowcount,
 64        drop_pipe,
 65        clear_pipe,
 66        deduplicate_pipe,
 67        get_pipe_table,
 68        get_pipe_columns_types,
 69        get_to_sql_dtype,
 70        get_pipe_schema,
 71        create_pipe_table_from_df,
 72        get_pipe_columns_indices,
 73        get_temporary_target,
 74        create_pipe_indices,
 75        drop_pipe_indices,
 76        get_pipe_index_names,
 77    )
 78    from ._plugins import (
 79        register_plugin,
 80        delete_plugin,
 81        get_plugin_id,
 82        get_plugin_version,
 83        get_plugins,
 84        get_plugin_user_id,
 85        get_plugin_username,
 86        get_plugin_attributes,
 87    )
 88    from ._users import (
 89        register_user,
 90        get_user_id,
 91        get_users,
 92        edit_user,
 93        delete_user,
 94        get_user_password_hash,
 95        get_user_type,
 96        get_user_attributes,
 97    )
 98    from ._uri import from_uri, parse_uri
 99    from ._instance import (
100        _log_temporary_tables_creation,
101        _drop_temporary_table,
102        _drop_temporary_tables,
103        _drop_old_temporary_tables,
104    )
105
106    def __init__(
107        self,
108        label: Optional[str] = None,
109        flavor: Optional[str] = None,
110        wait: bool = False,
111        connect: bool = False,
112        debug: bool = False,
113        **kw: Any
114    ):
115        """
116        Parameters
117        ----------
118        label: str, default 'main'
119            The identifying label for the connector.
120            E.g. for `sql:main`, 'main' is the label.
121            Defaults to 'main'.
122
123        flavor: Optional[str], default None
124            The database flavor, e.g.
125            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
126            To see supported flavors, run the `bootstrap connectors` command.
127
128        wait: bool, default False
129            If `True`, block until a database connection has been made.
130            Defaults to `False`.
131
132        connect: bool, default False
133            If `True`, immediately attempt to connect to the database and raise
134            a warning if the connection fails.
135            Defaults to `False`.
136
137        debug: bool, default False
138            Verbosity toggle.
139            Defaults to `False`.
140
141        kw: Any
142            All other arguments will be passed to the connector's attributes.
143            Therefore, a connector may be made without being registered,
144            as long as enough parameters are supplied to the constructor.
145        """
146        if 'uri' in kw:
147            uri = kw['uri']
148            if uri.startswith('postgres') and not uri.startswith('postgresql'):
149                uri = uri.replace('postgres', 'postgresql', 1)
150            if uri.startswith('postgresql') and not uri.startswith('postgresql+'):
151                uri = uri.replace('postgresql://', 'postgresql+psycopg://', 1)
152            if uri.startswith('timescaledb://'):
153                uri = uri.replace('timescaledb://', 'postgresql+psycopg://', 1)
154                flavor = 'timescaledb'
155            if uri.startswith('postgis://'):
156                uri = uri.replace('postgis://', 'postgresql+psycopg://', 1)
157                flavor = 'postgis'
158            kw['uri'] = uri
159            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
160            label = label or from_uri_params.get('label', None)
161            _ = from_uri_params.pop('label', None)
162
163            ### Sometimes the flavor may be provided with a URI.
164            kw.update(from_uri_params)
165            if flavor:
166                kw['flavor'] = flavor
167
168        ### set __dict__ in base class
169        super().__init__(
170            'sql',
171            label = label or self.__dict__.get('label', None),
172            **kw
173        )
174
175        if self.__dict__.get('flavor', None) == 'sqlite':
176            self._reset_attributes()
177            self._set_attributes(
178                'sql',
179                label = label,
180                inherit_default = False,
181                **kw
182            )
183            ### For backwards compatibility reasons, set the path for sql:local if it's missing.
184            if self.label == 'local' and not self.__dict__.get('database', None):
185                from meerschaum.config._paths import SQLITE_DB_PATH
186                self.database = str(SQLITE_DB_PATH)
187
188        ### ensure flavor and label are set accordingly
189        if 'flavor' not in self.__dict__:
190            if flavor is None and 'uri' not in self.__dict__:
191                raise ValueError(
192                    f"    Missing flavor. Provide flavor as a key for '{self}'."
193                )
194            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)
195
196        if self.flavor == 'postgres':
197            self.flavor = 'postgresql'
198
199        self._debug = debug
200        ### Store the PID and thread at initialization
201        ### so we can dispose of the Pool in child processes or threads.
202        import os
203        import threading
204        self._pid = os.getpid()
205        self._thread_ident = threading.current_thread().ident
206        self._sessions = {}
207        self._locks = {'_sessions': threading.RLock(), }
208
209        ### verify the flavor's requirements are met
210        if self.flavor not in self.flavor_configs:
211            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
212        if not self.__dict__.get('uri'):
213            self.verify_attributes(
214                self.flavor_configs[self.flavor].get('requirements', set()),
215                debug=debug,
216            )
217
218        if wait:
219            from meerschaum.connectors.poll import retry_connect
220            retry_connect(connector=self, debug=debug)
221
222        if connect:
223            if not self.test_connection(debug=debug):
224                warn(f"Failed to connect with connector '{self}'!", stack=False)
225
226    @property
227    def Session(self):
228        if '_Session' not in self.__dict__:
229            if self.engine is None:
230                return None
231
232            from meerschaum.utils.packages import attempt_import
233            sqlalchemy_orm = attempt_import('sqlalchemy.orm', lazy=False)
234            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
235            self._Session = sqlalchemy_orm.scoped_session(session_factory)
236
237        return self._Session
238
239    @property
240    def engine(self):
241        """
242        Return the SQLAlchemy engine connected to the configured database.
243        """
244        import os
245        import threading
246        if '_engine' not in self.__dict__:
247            self._engine, self._engine_str = self.create_engine(include_uri=True)
248
249        same_process = os.getpid() == self._pid
250        same_thread = threading.current_thread().ident == self._thread_ident
251
252        ### handle child processes
253        if not same_process:
254            self._pid = os.getpid()
255            self._thread = threading.current_thread()
256            warn("Different PID detected. Disposing of connections...")
257            self._engine.dispose()
258
259        ### handle different threads
260        if not same_thread:
261            if self.flavor == 'duckdb':
262                warn("Different thread detected.")
263                self._engine.dispose()
264
265        return self._engine
266
267    @property
268    def DATABASE_URL(self) -> str:
269        """
270        Return the URI connection string (alias for `SQLConnector.URI`).
271        """
272        _ = self.engine
273        return str(self._engine_str)
274
275    @property
276    def URI(self) -> str:
277        """
278        Return the URI connection string.
279        """
280        _ = self.engine
281        return str(self._engine_str)
282
283    @property
284    def IS_THREAD_SAFE(self) -> bool:
285        """
286        Return whether this connector may be multithreaded.
287        """
288        if self.flavor in ('duckdb', 'oracle'):
289            return False
290        if self.flavor == 'sqlite':
291            return ':memory:' not in self.URI
292        return True
293
294    @property
295    def metadata(self):
296        """
297        Return the metadata bound to this configured schema.
298        """
299        from meerschaum.utils.packages import attempt_import
300        sqlalchemy = attempt_import('sqlalchemy', lazy=False)
301        if '_metadata' not in self.__dict__:
302            self._metadata = sqlalchemy.MetaData(schema=self.schema)
303        return self._metadata
304
305    @property
306    def instance_schema(self):
307        """
308        Return the schema name for Meerschaum tables. 
309        """
310        return self.schema
311
312    @property
313    def internal_schema(self):
314        """
315        Return the schema name for internal tables. 
316        """
317        from meerschaum.config.static import STATIC_CONFIG
318        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
319        schema_name = self.__dict__.get('internal_schema', None) or (
320            STATIC_CONFIG['sql']['internal_schema']
321            if self.flavor not in NO_SCHEMA_FLAVORS
322            else self.schema
323        )
324
325        if '_internal_schema' not in self.__dict__:
326            self._internal_schema = schema_name
327        return self._internal_schema
328
329    @property
330    def db(self) -> Optional[databases.Database]:
331        from meerschaum.utils.packages import attempt_import
332        databases = attempt_import('databases', lazy=False, install=True)
333        url = self.DATABASE_URL
334        if 'mysql' in url:
335            url = url.replace('+pymysql', '')
336        if '_db' not in self.__dict__:
337            try:
338                self._db = databases.Database(url)
339            except KeyError:
340                ### Likely encountered an unsupported flavor.
341                from meerschaum.utils.warnings import warn
342                self._db = None
343        return self._db
344
345    @property
346    def db_version(self) -> Union[str, None]:
347        """
348        Return the database version.
349        """
350        _db_version = self.__dict__.get('_db_version', None)
351        if _db_version is not None:
352            return _db_version
353
354        from meerschaum.utils.sql import get_db_version
355        self._db_version = get_db_version(self)
356        return self._db_version
357
358    @property
359    def schema(self) -> Union[str, None]:
360        """
361        Return the default schema to use.
362        A value of `None` will not prepend a schema.
363        """
364        if 'schema' in self.__dict__:
365            return self.__dict__['schema']
366
367        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
368        if self.flavor in NO_SCHEMA_FLAVORS:
369            self.__dict__['schema'] = None
370            return None
371
372        sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
373        _schema = sqlalchemy.inspect(self.engine).default_schema_name
374        self.__dict__['schema'] = _schema
375        return _schema
376
377    def __getstate__(self):
378        return self.__dict__
379
380    def __setstate__(self, d):
381        self.__dict__.update(d)
382
383    def __call__(self):
384        return self

Connect to SQL databases via sqlalchemy.

SQLConnectors may be used as Meerschaum instance connectors. Read more about connectors and instances at https://meerschaum.io/reference/connectors/
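
Instances are usually obtained through Meerschaum's connector registry rather than constructed directly; a quick sketch:

    import meerschaum as mrsm

    # 'sql:local' is the built-in SQLite connector; any configured
    # 'sql:<label>' key resolves the same way.
    conn = mrsm.get_connector('sql:local')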

SQLConnector( label: Optional[str] = None, flavor: Optional[str] = None, wait: bool = False, connect: bool = False, debug: bool = False, **kw: Any)
Parameters
  • label (str, default 'main'): The identifying label for the connector. E.g. for sql:main, 'main' is the label. Defaults to 'main'.
  • flavor (Optional[str], default None): The database flavor, e.g. 'sqlite', 'postgresql', 'cockroachdb', etc. To see supported flavors, run the bootstrap connectors command.
  • wait (bool, default False): If True, block until a database connection has been made. Defaults to False.
  • connect (bool, default False): If True, immediately attempt to connect to the database and raise a warning if the connection fails. Defaults to False.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
  • kw (Any): All other arguments will be passed to the connector's attributes. Therefore, a connector may be made without being registered, as long as enough parameters are supplied to the constructor.
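
For illustration, a minimal sketch of constructing connectors directly (the URI, host, and credentials below are placeholders):

    from meerschaum.connectors.sql import SQLConnector

    # From a URI: the flavor and attributes are parsed from the connection string.
    conn = SQLConnector('remote_pg', uri='postgresql://user:pass@localhost:5432/db')

    # From keyword attributes: supply enough parameters to satisfy the flavor's
    # requirements (host, database, username, password for PostgreSQL-family flavors).
    conn = SQLConnector(
        'analytics',
        flavor='postgresql',
        host='localhost',
        port=5432,
        database='db',
        username='user',
        password='pass',
    )
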
IS_INSTANCE: bool = True
Session
Return a scoped sqlalchemy.orm session factory bound to the connector's engine, or None if the engine could not be created.

engine

Return the SQLAlchemy engine connected to the configured database.
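
As a hedged usage sketch (assuming an already-constructed connector conn), the engine works with sqlalchemy directly:

    import sqlalchemy

    with conn.engine.connect() as connection:
        result = connection.execute(sqlalchemy.text('SELECT 1'))
        print(result.scalar())  # -> 1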

DATABASE_URL: str

Return the URI connection string (alias for SQLConnector.URI).

URI: str

Return the URI connection string.

IS_THREAD_SAFE: bool

Return whether this connector may be multithreaded.

metadata

Return the metadata bound to this configured schema.

instance_schema

Return the schema name for Meerschaum tables.

internal_schema

Return the schema name for internal tables.

db: 'Optional[databases.Database]'
Return a databases.Database instance built from the connection URI (with the +pymysql driver suffix stripped for MySQL), or None if the flavor is unsupported.
db_version: Optional[str]

Return the database version.

schema: Optional[str]

Return the default schema to use. A value of None will not prepend a schema.
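
For example (values depend on the configured database; these are typical):

    conn.schema           # e.g. 'public' on PostgreSQL, None on SQLite
    conn.instance_schema  # alias for conn.schema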

flavor_configs = {
    'timescaledb': {
        'engine': 'postgresql+psycopg',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {},
        'requirements': {'host', 'database', 'password', 'username'},
        'defaults': {'port': 5432},
    },
    'postgresql': {
        'engine': 'postgresql+psycopg',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {},
        'requirements': {'host', 'database', 'password', 'username'},
        'defaults': {'port': 5432},
    },
    'postgis': {
        'engine': 'postgresql+psycopg',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {},
        'requirements': {'host', 'database', 'password', 'username'},
        'defaults': {'port': 5432},
    },
    'citus': {
        'engine': 'postgresql+psycopg',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {},
        'requirements': {'host', 'database', 'password', 'username'},
        'defaults': {'port': 5432},
    },
    'mssql': {
        'engine': 'mssql+pyodbc',
        'create_engine': {
            'fast_executemany': True,
            'use_insertmanyvalues': False,
            'isolation_level': 'AUTOCOMMIT',
            'use_setinputsizes': False,
            'pool_pre_ping': True,
            'ignore_no_transaction_on_rollback': True,
        },
        'omit_create_engine': {'method'},
        'to_sql': {'method': None},
        'requirements': {'host', 'database', 'password', 'username'},
        'defaults': {
            'port': 1433,
            'options': 'driver=ODBC Driver 18 for SQL Server&UseFMTONLY=Yes&TrustServerCertificate=yes&Encrypt=no&MARS_Connection=yes',
        },
    },
    'mysql': {
        'engine': 'mysql+pymysql',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {'method': 'multi'},
        'requirements': {'host', 'database', 'password', 'username'},
        'defaults': {'port': 3306},
    },
    'mariadb': {
        'engine': 'mysql+pymysql',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {'method': 'multi'},
        'requirements': {'host', 'database', 'password', 'username'},
        'defaults': {'port': 3306},
    },
    'oracle': {
        'engine': 'oracle+oracledb',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {'method': None},
        'requirements': {'host', 'database', 'password', 'username'},
        'defaults': {'port': 1521},
    },
    'sqlite': {
        'engine': 'sqlite',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {'method': 'multi'},
        'requirements': {'database'},
        'defaults': {},
    },
    'duckdb': {
        'engine': 'duckdb',
        'create_engine': {},
        'omit_create_engine': {'ALL'},
        'to_sql': {'method': 'multi'},
        'requirements': '',
        'defaults': {},
    },
    'cockroachdb': {
        'engine': 'cockroachdb',
        'omit_create_engine': {'method'},
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'to_sql': {'method': 'multi'},
        'requirements': {'host'},
        'defaults': {'port': 26257, 'database': 'defaultdb', 'username': 'root', 'password': 'admin'},
    },
}
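
These per-flavor defaults supplement missing connector attributes in create_engine() below; for instance:

    SQLConnector.flavor_configs['postgresql']['defaults']['port']  # 5432
    SQLConnector.flavor_configs['mssql']['engine']                 # 'mssql+pyodbc'
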
def create_engine( self, include_uri: bool = False, debug: bool = False, **kw) -> 'sqlalchemy.engine.Engine':
193def create_engine(
194    self,
195    include_uri: bool = False,
196    debug: bool = False,
197    **kw
198) -> 'sqlalchemy.engine.Engine':
199    """Create a sqlalchemy engine by building the engine string."""
200    from meerschaum.utils.packages import attempt_import
201    from meerschaum.utils.warnings import error, warn
202    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
203    import urllib
204    import copy
205    ### Install and patch required drivers.
206    if self.flavor in install_flavor_drivers:
207        _ = attempt_import(
208            *install_flavor_drivers[self.flavor],
209            debug=debug,
210            lazy=False,
211            warn=False,
212        )
213        if self.flavor == 'mssql':
214            _init_mssql_sqlalchemy()
215    if self.flavor in require_patching_flavors:
216        from meerschaum.utils.packages import determine_version, _monkey_patch_get_distribution
217        import pathlib
218        for install_name, import_name in require_patching_flavors[self.flavor]:
219            pkg = attempt_import(
220                import_name,
221                debug=debug,
222                lazy=False,
223                warn=False
224            )
225            _monkey_patch_get_distribution(
226                install_name, determine_version(pathlib.Path(pkg.__file__), venv='mrsm')
227            )
228
229    ### supplement missing values with defaults (e.g. port number)
230    for a, value in flavor_configs[self.flavor]['defaults'].items():
231        if a not in self.__dict__:
232            self.__dict__[a] = value
233
234    ### Verify that everything is in order.
235    if self.flavor not in flavor_configs:
236        error(f"Cannot create a connector with the flavor '{self.flavor}'.")
237
238    _engine = flavor_configs[self.flavor].get('engine', None)
239    _username = self.__dict__.get('username', None)
240    _password = self.__dict__.get('password', None)
241    _host = self.__dict__.get('host', None)
242    _port = self.__dict__.get('port', None)
243    _database = self.__dict__.get('database', None)
244    _options = self.__dict__.get('options', {})
245    if isinstance(_options, str):
246        _options = dict(urllib.parse.parse_qsl(_options))
247    _uri = self.__dict__.get('uri', None)
248
249    ### Handle registering specific dialects (due to installing in virtual environments).
250    if self.flavor in flavor_dialects:
251        sqlalchemy.dialects.registry.register(*flavor_dialects[self.flavor])
252
253    ### self._sys_config was deepcopied and can be updated safely
254    if self.flavor in ("sqlite", "duckdb"):
255        engine_str = f"{_engine}:///{_database}" if not _uri else _uri
256        if 'create_engine' not in self._sys_config:
257            self._sys_config['create_engine'] = {}
258        if 'connect_args' not in self._sys_config['create_engine']:
259            self._sys_config['create_engine']['connect_args'] = {}
260        self._sys_config['create_engine']['connect_args'].update({"check_same_thread": False})
261    else:
262        engine_str = (
263            _engine + "://" + (_username if _username is not None else '') +
264            ((":" + urllib.parse.quote_plus(_password)) if _password is not None else '') +
265            "@" + _host + ((":" + str(_port)) if _port is not None else '') +
266            (("/" + _database) if _database is not None else '')
267            + (("?" + urllib.parse.urlencode(_options)) if _options else '')
268        ) if not _uri else _uri
269
270        ### Sometimes the timescaledb:// flavor can slip in.
271        if _uri and self.flavor in _uri:
272            if self.flavor in ('timescaledb', 'postgis'):
273                engine_str = engine_str.replace(self.flavor, 'postgresql', 1)
274            elif _uri.startswith('postgresql://'):
275                engine_str = engine_str.replace('postgresql://', 'postgresql+psycopg2://')
276
277    if debug:
278        dprint(
279            (
280                (engine_str.replace(':' + _password, ':' + ('*' * len(_password))))
281                    if _password is not None else engine_str
282            ) + '\n' + f"{self._sys_config.get('create_engine', {}).get('connect_args', {})}"
283        )
284
285    _kw_copy = copy.deepcopy(kw)
286
287    ### NOTE: Order of inheritance:
288    ###       1. Defaults
289    ###       2. System configuration
290    ###       3. Connector configuration
291    ###       4. Keyword arguments
292    _create_engine_args = flavor_configs.get(self.flavor, {}).get('create_engine', {})
293    def _apply_create_engine_args(update):
294        if 'ALL' not in flavor_configs[self.flavor].get('omit_create_engine', {}):
295            _create_engine_args.update(
296                { k: v for k, v in update.items()
297                    if 'omit_create_engine' not in flavor_configs[self.flavor]
298                        or k not in flavor_configs[self.flavor].get('omit_create_engine')
299                }
300            )
301    _apply_create_engine_args(self._sys_config.get('create_engine', {}))
302    _apply_create_engine_args(self.__dict__.get('create_engine', {}))
303    _apply_create_engine_args(_kw_copy)
304
305    try:
306        engine = sqlalchemy.create_engine(
307            engine_str,
308            ### I know this looks confusing, and maybe it's bad code,
309            ### but it's simple. It dynamically parses the config string
310            ### and splits it to separate the class name (QueuePool)
311            ### from the module name (sqlalchemy.pool).
312            poolclass    = getattr(
313                attempt_import(
314                    ".".join(self._sys_config['poolclass'].split('.')[:-1])
315                ),
316                self._sys_config['poolclass'].split('.')[-1]
317            ),
318            echo         = debug,
319            **_create_engine_args
320        )
321    except Exception:
322        warn(f"Failed to create connector '{self}':\n{traceback.format_exc()}", stack=False)
323        engine = None
324
325    if include_uri:
326        return engine, engine_str
327    return engine

Create a sqlalchemy engine by building the engine string.
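
To make the string assembly above concrete, here is a standalone sketch of the same pattern (illustrative only; build_engine_str is not part of the library):

    from urllib.parse import quote_plus, urlencode

    def build_engine_str(engine, username, password, host, port=None, database=None, options=None):
        # Mirror the engine-string assembly in create_engine() above.
        engine_str = engine + '://' + username + ':' + quote_plus(password) + '@' + host
        if port is not None:
            engine_str += ':' + str(port)
        if database is not None:
            engine_str += '/' + database
        if options:
            engine_str += '?' + urlencode(options)
        return engine_str

    build_engine_str('postgresql+psycopg', 'user', 'p@ss', 'localhost', 5432, 'db')
    # -> 'postgresql+psycopg://user:p%40ss@localhost:5432/db'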

def read( self, query_or_table: 'Union[str, sqlalchemy.Query]', params: Union[Dict[str, Any], List[str], NoneType] = None, dtype: Optional[Dict[str, Any]] = None, coerce_float: bool = True, chunksize: Optional[int] = -1, workers: Optional[int] = None, chunk_hook: Optional[Callable[[pandas.core.frame.DataFrame], Any]] = None, as_hook_results: bool = False, chunks: Optional[int] = None, schema: Optional[str] = None, as_chunks: bool = False, as_iterator: bool = False, as_dask: bool = False, index_col: Optional[str] = None, silent: bool = False, debug: bool = False, **kw: Any) -> 'Union[pandas.DataFrame, dask.DataFrame, List[pandas.DataFrame], List[Any], None]':
 28def read(
 29    self,
 30    query_or_table: Union[str, sqlalchemy.Query],
 31    params: Union[Dict[str, Any], List[str], None] = None,
 32    dtype: Optional[Dict[str, Any]] = None,
 33    coerce_float: bool = True,
 34    chunksize: Optional[int] = -1,
 35    workers: Optional[int] = None,
 36    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None,
 37    as_hook_results: bool = False,
 38    chunks: Optional[int] = None,
 39    schema: Optional[str] = None,
 40    as_chunks: bool = False,
 41    as_iterator: bool = False,
 42    as_dask: bool = False,
 43    index_col: Optional[str] = None,
 44    silent: bool = False,
 45    debug: bool = False,
 46    **kw: Any
 47) -> Union[
 48    pandas.DataFrame,
 49    dask.DataFrame,
 50    List[pandas.DataFrame],
 51    List[Any],
 52    None,
 53]:
 54    """
 55    Read a SQL query or table into a pandas dataframe.
 56
 57    Parameters
 58    ----------
 59    query_or_table: Union[str, sqlalchemy.Query]
 60        The SQL query (sqlalchemy Query or string) or name of the table from which to select.
 61
 62    params: Optional[Dict[str, Any]], default None
 63        `List` or `Dict` of parameters to pass to `pandas.read_sql()`.
 64        See the pandas documentation for more information:
 65        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
 66
 67    dtype: Optional[Dict[str, Any]], default None
 68        A dictionary of data types to pass to `pandas.read_sql()`.
 69        See the pandas documentation for more information:
 70        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
 71
 72    chunksize: Optional[int], default -1
 73        How many chunks to read at a time. `None` will read everything in one large chunk.
 74        Defaults to system configuration.
 75
 76        **NOTE:** DuckDB does not allow for chunking.
 77
 78    workers: Optional[int], default None
 79        How many threads to use when consuming the generator.
 80        Only applies if `chunk_hook` is provided.
 81
 82    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None
 83        Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
 84        See `--sync-chunks` for an example.
 85        **NOTE:** `as_iterator` MUST be False (default).
 86
 87    as_hook_results: bool, default False
 88        If `True`, return a `List` of the outputs of the hook function.
 89        Only applicable if `chunk_hook` is not None.
 90
 91        **NOTE:** `as_iterator` MUST be `False` (default).
 92
 93    chunks: Optional[int], default None
 94        Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and
 95        return into a single dataframe.
 96        For example, to limit the returned dataframe to 100,000 rows,
 97        you could specify a `chunksize` of `1000` and `chunks` of `100`.
 98
 99    schema: Optional[str], default None
100        If just a table name is provided, optionally specify the table schema.
101        Defaults to `SQLConnector.schema`.
102
103    as_chunks: bool, default False
104        If `True`, return a list of DataFrames.
105        Otherwise return a single DataFrame.
106
107    as_iterator: bool, default False
108        If `True`, return the pandas DataFrame iterator.
109        `chunksize` must not be `None` (falls back to 1000 if so),
110        and hooks are not called in this case.
111
112    index_col: Optional[str], default None
113        If using Dask, use this column as the index column.
114        If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
115
116    silent: bool, default False
117        If `True`, don't raise warnings in case of errors.
118        Defaults to `False`.
119
120    Returns
121    -------
122    A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators,
123    or `None` if something breaks.
124
125    """
126    if chunks is not None and chunks <= 0:
127        return []
128    from meerschaum.utils.sql import sql_item_name, truncate_item_name
129    from meerschaum.utils.dtypes import are_dtypes_equal, coerce_timezone
130    from meerschaum.utils.dtypes.sql import TIMEZONE_NAIVE_FLAVORS
131    from meerschaum.utils.packages import attempt_import, import_pandas
132    from meerschaum.utils.pool import get_pool
133    from meerschaum.utils.dataframe import chunksize_to_npartitions, get_numeric_cols
134    import warnings
135    import traceback
136    from decimal import Decimal
137    pd = import_pandas()
138    dd = None
139    is_dask = 'dask' in pd.__name__
140    pandas = attempt_import('pandas')
141    is_dask = dd is not None
142    npartitions = chunksize_to_npartitions(chunksize)
143    if is_dask:
144        chunksize = None
145    schema = schema or self.schema
146    utc_dt_cols = [
147        col
148        for col, typ in dtype.items()
149        if are_dtypes_equal(typ, 'datetime') and 'utc' in typ.lower()
150    ] if dtype else []
151
152    if dtype and utc_dt_cols and self.flavor in TIMEZONE_NAIVE_FLAVORS:
153        dtype = dtype.copy()
154        for col in utc_dt_cols:
155            dtype[col] = 'datetime64[ns]'
156
157    pool = get_pool(workers=workers)
158    sqlalchemy = attempt_import("sqlalchemy", lazy=False)
159    default_chunksize = self._sys_config.get('chunksize', None)
160    chunksize = chunksize if chunksize != -1 else default_chunksize
161    if chunksize is None and as_iterator:
162        if not silent and self.flavor not in _disallow_chunks_flavors:
163            warn(
164                "An iterator may only be generated if chunksize is not None.\n"
165                + "Falling back to a chunksize of 1000.", stacklevel=3,
166            )
167        chunksize = 1000
168    if chunksize is not None and self.flavor in _max_chunks_flavors:
169        if chunksize > _max_chunks_flavors[self.flavor]:
170            if chunksize != default_chunksize:
171                warn(
172                    f"The specified chunksize of {chunksize} exceeds the maximum of "
173                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
174                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
175                    stacklevel=3,
176                )
177            chunksize = _max_chunks_flavors[self.flavor]
178
179    if chunksize is not None and self.flavor in _disallow_chunks_flavors:
180        chunksize = None
181
182    if debug:
183        import time
184        start = time.perf_counter()
185        dprint(f"[{self}]\n{query_or_table}")
186        dprint(f"[{self}] Fetching with chunksize: {chunksize}")
187
188    ### This might be sqlalchemy object or the string of a table name.
189    ### We check for spaces and quotes to see if it might be a weird table.
190    if (
191        ' ' not in str(query_or_table)
192        or (
193            ' ' in str(query_or_table)
194            and str(query_or_table).startswith('"')
195            and str(query_or_table).endswith('"')
196        )
197    ):
198        truncated_table_name = truncate_item_name(str(query_or_table), self.flavor)
199        if truncated_table_name != str(query_or_table) and not silent:
200            warn(
201                f"Table '{query_or_table}' is too long for '{self.flavor}',"
202                + f" will instead read the table '{truncated_table_name}'."
203            )
204
205        query_or_table = sql_item_name(str(query_or_table), self.flavor, schema)
206        if debug:
207            dprint(f"[{self}] Reading from table {query_or_table}")
208        formatted_query = sqlalchemy.text("SELECT * FROM " + str(query_or_table))
209        str_query = f"SELECT * FROM {query_or_table}"
210    else:
211        str_query = query_or_table
212
213    formatted_query = (
214        sqlalchemy.text(str_query)
215        if not is_dask and isinstance(str_query, str)
216        else format_sql_query_for_dask(str_query)
217    )
218
219    chunk_list = []
220    chunk_hook_results = []
221    def _process_chunk(_chunk, _retry_on_failure: bool = True):
222        if self.flavor in TIMEZONE_NAIVE_FLAVORS:
223            for col in utc_dt_cols:
224                _chunk[col] = coerce_timezone(_chunk[col], strip_timezone=False)
225        if not as_hook_results:
226            chunk_list.append(_chunk)
227        if chunk_hook is None:
228            return None
229
230        result = None
231        try:
232            result = chunk_hook(
233                _chunk,
234                workers=workers,
235                chunksize=chunksize,
236                debug=debug,
237                **kw
238            )
239        except Exception:
240            result = False, traceback.format_exc()
241            from meerschaum.utils.formatting import get_console
242            if not silent:
243                get_console().print_exception()
244
245        ### If the chunk fails to process, try it again one more time.
246        if isinstance(result, tuple) and result[0] is False:
247            if _retry_on_failure:
248                return _process_chunk(_chunk, _retry_on_failure=False)
249
250        return result
251
252    try:
253        stream_results = not as_iterator and chunk_hook is not None and chunksize is not None
254        with warnings.catch_warnings():
255            warnings.filterwarnings('ignore', 'case sensitivity issues')
256
257            read_sql_query_kwargs = {
258                'params': params,
259                'dtype': dtype,
260                'coerce_float': coerce_float,
261                'index_col': index_col,
262            }
263            if is_dask:
264                if index_col is None:
265                    dd = None
266                    pd = attempt_import('pandas')
267                    read_sql_query_kwargs.update({
268                        'chunksize': chunksize,
269                    })
270            else:
271                read_sql_query_kwargs.update({
272                    'chunksize': chunksize,
273                })
274
275            if is_dask and dd is not None:
276                ddf = dd.read_sql_query(
277                    formatted_query,
278                    self.URI,
279                    **read_sql_query_kwargs
280                )
281            else:
282
283                def get_chunk_generator(connectable):
284                    chunk_generator = pd.read_sql_query(
285                        formatted_query,
286                        self.engine,
287                        **read_sql_query_kwargs
288                    )
289                    to_return = (
290                        chunk_generator
291                        if as_iterator or chunksize is None
292                        else (
293                            list(pool.imap(_process_chunk, chunk_generator))
294                            if as_hook_results
295                            else None
296                        )
297                    )
298                    return chunk_generator, to_return
299
300                if self.flavor in SKIP_READ_TRANSACTION_FLAVORS:
301                    chunk_generator, to_return = get_chunk_generator(self.engine)
302                else:
303                    with self.engine.begin() as transaction:
304                        with transaction.execution_options(stream_results=stream_results) as connection:
305                            chunk_generator, to_return = get_chunk_generator(connection)
306
307                if to_return is not None:
308                    return to_return
309
310    except Exception as e:
311        if debug:
312            dprint(f"[{self}] Failed to execute query:\n\n{query_or_table}\n\n")
313        if not silent:
314            warn(str(e), stacklevel=3)
315        from meerschaum.utils.formatting import get_console
316        if not silent:
317            get_console().print_exception()
318
319        return None
320
321    if is_dask and dd is not None:
322        ddf = ddf.reset_index()
323        return ddf
324
325    chunk_list = []
326    read_chunks = 0
327    chunk_hook_results = []
328    if chunksize is None:
329        chunk_list.append(chunk_generator)
330    elif as_iterator:
331        return chunk_generator
332    else:
333        try:
334            for chunk in chunk_generator:
335                if chunk_hook is not None:
336                    chunk_hook_results.append(
337                        chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
338                    )
339                chunk_list.append(chunk)
340                read_chunks += 1
341                if chunks is not None and read_chunks >= chunks:
342                    break
343        except Exception as e:
344            warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
345            from meerschaum.utils.formatting import get_console
346            if not silent:
347                get_console().print_exception()
348
349            return None
350
368    ### If no chunks returned, read without chunks
369    ### to get columns
370    if len(chunk_list) == 0:
371        with warnings.catch_warnings():
372            warnings.filterwarnings('ignore', 'case sensitivity issues')
373            _ = read_sql_query_kwargs.pop('chunksize', None)
374            with self.engine.begin() as connection:
375                chunk_list.append(
376                    pd.read_sql_query(
377                        formatted_query,
378                        connection,
379                        **read_sql_query_kwargs
380                    )
381                )
382
383    ### call the hook on any missed chunks.
384    if chunk_hook is not None and len(chunk_list) > len(chunk_hook_results):
385        for c in chunk_list[len(chunk_hook_results):]:
386            chunk_hook_results.append(
387                chunk_hook(c, chunksize=chunksize, debug=debug, **kw)
388            )
389
390    ### chunksize is not None so must iterate
391    if debug:
392        end = time.perf_counter()
393        dprint(f"Fetched {len(chunk_list)} chunks in {round(end - start, 2)} seconds.")
394
395    if as_hook_results:
396        return chunk_hook_results
397    
398    ### Skip `pd.concat()` if `as_chunks` is specified.
399    if as_chunks:
400        for c in chunk_list:
401            c.reset_index(drop=True, inplace=True)
402            for col in get_numeric_cols(c):
403                c[col] = c[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)
404        return chunk_list
405
406    df = pd.concat(chunk_list).reset_index(drop=True)
407    ### NOTE: The calls to `canonical()` are to drop leading and trailing zeroes.
408    for col in get_numeric_cols(df):
409        df[col] = df[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)
410
411    return df

Read a SQL query or table into a pandas dataframe.

Parameters
  • query_or_table (Union[str, sqlalchemy.Query]): The SQL query (sqlalchemy Query or string) or name of the table from which to select.
  • params (Optional[Dict[str, Any]], default None): List or Dict of parameters to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
  • dtype (Optional[Dict[str, Any]], default None): A dictionary of data types to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
  • chunksize (Optional[int], default -1): How many chunks to read at a time. None will read everything in one large chunk. Defaults to system configuration.

    NOTE: DuckDB does not allow for chunking.

  • workers (Optional[int], default None): How many threads to use when consuming the generator. Only applies if chunk_hook is provided.
  • chunk_hook (Optional[Callable[[pandas.DataFrame], Any]], default None): Hook function to execute once per chunk, e.g. writing and reading chunks intermittently. See --sync-chunks for an example. NOTE: as_iterator MUST be False (default).
  • as_hook_results (bool, default False): If True, return a List of the outputs of the hook function. Only applicable if chunk_hook is not None.

    NOTE: as_iterator MUST be False (default).

  • chunks (Optional[int], default None): Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and return into a single dataframe. For example, to limit the returned dataframe to 100,000 rows, you could specify a chunksize of 1000 and chunks of 100.
  • schema (Optional[str], default None): If just a table name is provided, optionally specify the table schema. Defaults to SQLConnector.schema.
  • as_chunks (bool, default False): If True, return a list of DataFrames. Otherwise return a single DataFrame.
  • as_iterator (bool, default False): If True, return the pandas DataFrame iterator. chunksize must not be None (falls back to 1000 if so), and hooks are not called in this case.
  • index_col (Optional[str], default None): If using Dask, use this column as the index column. If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
  • silent (bool, default False): If True, don't raise warnings in case of errors. Defaults to False.
Returns
  • A pd.DataFrame (default case), an iterator, a list of dataframes / iterators, or None if something breaks.
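
A few hedged usage sketches (table and column names are placeholders; conn is an existing SQLConnector):

    # Read an entire table into a DataFrame.
    df = conn.read('my_table')

    # Read a parameterized query (params are forwarded to pandas.read_sql_query()).
    df = conn.read('SELECT * FROM my_table WHERE id > :min_id', params={'min_id': 100})

    # Consume a large result set chunk-by-chunk, collecting each hook's output.
    # Hooks receive extra keyword arguments (workers, chunksize, debug), hence **kw.
    row_counts = conn.read(
        'SELECT * FROM my_table',
        chunksize=1000,
        chunk_hook=lambda chunk, **kw: len(chunk),
        as_hook_results=True,
    )
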
def value(self, query: str, *args: Any, use_pandas: bool = False, **kw: Any) -> Any:
414def value(
415    self,
416    query: str,
417    *args: Any,
418    use_pandas: bool = False,
419    **kw: Any
420) -> Any:
421    """
422    Execute the provided query and return the first value.
423
424    Parameters
425    ----------
426    query: str
427        The SQL query to execute.
428        
429    *args: Any
430        The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec`
431        if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.
432        
433    use_pandas: bool, default False
434        If `True`, use `meerschaum.connectors.SQLConnector.read`, otherwise use
435        `meerschaum.connectors.sql.SQLConnector.exec` (default).
436        **NOTE:** This is always `True` for DuckDB.
437
438    **kw: Any
439        See `args`.
440
441    Returns
442    -------
443    Any value returned from the query.
444
445    """
446    from meerschaum.utils.packages import attempt_import
447    if self.flavor == 'duckdb':
448        use_pandas = True
449    if use_pandas:
450        try:
451            return self.read(query, *args, **kw).iloc[0, 0]
452        except Exception:
453            return None
454
455    _close = kw.get('close', True)
456    _commit = kw.get('commit', (self.flavor != 'mssql'))
457
458    try:
459        result, connection = self.exec(
460            query,
461            *args,
462            with_connection=True,
463            close=False,
464            commit=_commit,
465            **kw
466        )
467        first = result.first() if result is not None else None
468        _val = first[0] if first is not None else None
469    except Exception as e:
470        warn(e, stacklevel=3)
471        return None
472    if _close:
473        try:
474            connection.close()
475        except Exception as e:
476            warn("Failed to close connection with exception:\n" + str(e))
477    return _val

Execute the provided query and return the first value.

Parameters
  • query (str): The SQL query to execute.
  • *args (Any): The arguments passed to meerschaum.connectors.sql.SQLConnector.exec if use_pandas is False (default) or to meerschaum.connectors.sql.SQLConnector.read.
  • use_pandas (bool, default False): If True, use meerschaum.connectors.SQLConnector.read, otherwise use meerschaum.connectors.sql.SQLConnector.exec (default). NOTE: This is always True for DuckDB.
  • **kw (Any): See args.
Returns
  • Any value returned from the query.
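
For example (assuming an existing connector conn and a placeholder table):

    row_count = conn.value('SELECT COUNT(*) FROM my_table')
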
def exec( self, query: str, *args: Any, silent: bool = False, debug: bool = False, commit: Optional[bool] = None, close: Optional[bool] = None, with_connection: bool = False, _connection=None, _transaction=None, **kw: Any) -> 'Union[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.cursor.LegacyCursorResult, Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection], Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection], None]':
491def exec(
492    self,
493    query: str,
494    *args: Any,
495    silent: bool = False,
496    debug: bool = False,
497    commit: Optional[bool] = None,
498    close: Optional[bool] = None,
499    with_connection: bool = False,
500    _connection=None,
501    _transaction=None,
502    **kw: Any
503) -> Union[
504        sqlalchemy.engine.result.ResultProxy,
505        sqlalchemy.engine.cursor.LegacyCursorResult,
506        Tuple[sqlalchemy.engine.result.ResultProxy, sqlalchemy.engine.base.Connection],
507        Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection],
508        None
509]:
510    """
511    Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.
512
513    If inserting data, please use bind variables to avoid SQL injection!
514
515    Parameters
516    ----------
517    query: Union[str, List[str], Tuple[str]]
518        The query to execute.
519        If `query` is a list or tuple, call `self.exec_queries()` instead.
520
521    args: Any
522        Arguments passed to `sqlalchemy.engine.execute`.
523
524    silent: bool, default False
525        If `True`, suppress warnings.
526
527    commit: Optional[bool], default None
528        If `True`, commit the changes after execution.
529        Causes issues with flavors like `'mssql'`.
530        This does not apply if `query` is a list of strings.
531
532    close: Optional[bool], default None
533        If `True`, close the connection after execution.
534        Causes issues with flavors like `'mssql'`.
535        This does not apply if `query` is a list of strings.
536
537    with_connection: bool, default False
538        If `True`, return a tuple including the connection object.
539        This does not apply if `query` is a list of strings.
540
541    Returns
542    -------
543    The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is `True`.
544
545    """
546    if isinstance(query, (list, tuple)):
547        return self.exec_queries(
548            list(query),
549            *args,
550            silent=silent,
551            debug=debug,
552            **kw
553        )
554
555    from meerschaum.utils.packages import attempt_import
556    sqlalchemy = attempt_import("sqlalchemy", lazy=False)
557    if debug:
558        dprint(f"[{self}] Executing query:\n{query}")
559
560    _close = close if close is not None else (self.flavor != 'mssql')
561    _commit = commit if commit is not None else (
562        (self.flavor != 'mssql' or 'select' not in str(query).lower())
563    )
564
565    ### Select and Insert objects need to be compiled (SQLAlchemy 2.0.0+).
566    if not hasattr(query, 'compile'):
567        query = sqlalchemy.text(query)
568
569    connection = _connection if _connection is not None else self.get_connection()
570
571    try:
572        transaction = (
573            _transaction
574            if _transaction is not None else (
575                connection.begin()
576                if _commit
577                else None
578            )
579        )
580    except sqlalchemy.exc.InvalidRequestError as e:
581        if _connection is not None or _transaction is not None:
582            raise e
583        connection = self.get_connection(rebuild=True)
584        transaction = connection.begin()
585
586    if transaction is not None and not transaction.is_active and _transaction is not None:
587        connection = self.get_connection(rebuild=True)
588        transaction = connection.begin() if _commit else None
589
590    result = None
591    try:
592        result = connection.execute(query, *args, **kw)
593        if _commit:
594            transaction.commit()
595    except Exception as e:
596        if debug:
597            dprint(f"[{self}] Failed to execute query:\n\n{query}\n\n{e}")
598        if not silent:
599            warn(str(e), stacklevel=3)
600        result = None
601        if _commit:
602            transaction.rollback()
603            connection = self.get_connection(rebuild=True)
604    finally:
605        if _close:
606            connection.close()
607
608    if with_connection:
609        return result, connection
610
611    return result

Execute SQL code and return the sqlalchemy result, e.g. when calling stored procedures.

If inserting data, please use bind variables to avoid SQL injection!

Parameters
  • query (Union[str, List[str], Tuple[str]]): The query to execute. If query is a list or tuple, call self.exec_queries() instead.
  • args (Any): Arguments passed to sqlalchemy.engine.execute.
  • silent (bool, default False): If True, suppress warnings.
  • commit (Optional[bool], default None): If True, commit the changes after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
  • close (Optional[bool], default None): If True, close the connection after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
  • with_connection (bool, default False): If True, return a tuple including the connection object. This does not apply if query is a list of strings.
Returns
  • The sqlalchemy result object, or a tuple with the connection if with_connection is True.
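
A sketch of exec with bind variables (the connector and fruits table are the hypothetical ones from above); remaining positional arguments are forwarded to sqlalchemy's Connection.execute:

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql', 'tmp', flavor='sqlite', database='/tmp/example.db')

    ### Bind variables (:name, :qty) guard against SQL injection;
    ### plain strings are wrapped in sqlalchemy.text() internally.
    result = conn.exec(
        "INSERT INTO fruits (name, qty) VALUES (:name, :qty)",
        {'name': 'pear', 'qty': 5},
    )

    ### A list or tuple of queries is dispatched to exec_queries().
    results = conn.exec(["SELECT 1", "SELECT 2"])
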
def execute( self, *args: Any, **kw: Any) -> 'Optional[sqlalchemy.engine.result.ResultProxy]':
480def execute(
481    self,
482    *args : Any,
483    **kw : Any
484) -> Optional[sqlalchemy.engine.result.ResultProxy]:
485    """
486    An alias for `meerschaum.connectors.sql.SQLConnector.exec`.
487    """
488    return self.exec(*args, **kw)
def to_sql( self, df: pandas.core.frame.DataFrame, name: str = None, index: bool = False, if_exists: str = 'replace', method: str = '', chunksize: Optional[int] = -1, schema: Optional[str] = None, safe_copy: bool = True, silent: bool = False, debug: bool = False, as_tuple: bool = False, as_dict: bool = False, _connection=None, _transaction=None, **kw) -> Union[bool, Tuple[bool, str]]:
 709def to_sql(
 710    self,
 711    df: pandas.DataFrame,
 712    name: str = None,
 713    index: bool = False,
 714    if_exists: str = 'replace',
 715    method: str = "",
 716    chunksize: Optional[int] = -1,
 717    schema: Optional[str] = None,
 718    safe_copy: bool = True,
 719    silent: bool = False,
 720    debug: bool = False,
 721    as_tuple: bool = False,
 722    as_dict: bool = False,
 723    _connection=None,
 724    _transaction=None,
 725    **kw
 726) -> Union[bool, SuccessTuple]:
 727    """
 728    Upload a DataFrame's contents to the SQL server.
 729
 730    Parameters
 731    ----------
 732    df: pd.DataFrame
 733        The DataFrame to be inserted.
 734
 735    name: str
 736        The name of the table to be created.
 737
 738    index: bool, default False
 739        If True, creates the DataFrame's indices as columns.
 740
 741    if_exists: str, default 'replace'
 742        Drop and create the table ('replace') or append if it exists
 743        ('append') or raise Exception ('fail').
 744        Options are ['replace', 'append', 'fail'].
 745
 746    method: str, default ''
 747        `None` or `'multi'`. See `pandas.DataFrame.to_sql` for details.
 748
 749    chunksize: Optional[int], default -1
 750        How many rows to insert at a time.
 751
 752    schema: Optional[str], default None
 753        Optionally override the schema for the table.
 754        Defaults to `SQLConnector.schema`.
 755
 756    safe_copy: bool, default True
 757        If `True`, copy the dataframe before making any changes.
 758
 759    as_tuple: bool, default False
 760        If `True`, return a (success_bool, message) tuple instead of a `bool`.
 761        Defaults to `False`.
 762
 763    as_dict: bool, default False
 764        If `True`, return a dictionary of transaction information.
 765        The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`,
 766        `method`, and `target`.
 767
 768    kw: Any
 769        Additional arguments will be passed to the DataFrame's `to_sql` function.
 770
 771    Returns
 772    -------
 773    Either a `bool` or a `SuccessTuple` (depends on `as_tuple`).
 774    """
 775    import time
 776    import json
 777    from datetime import timedelta
 778    from meerschaum.utils.warnings import error, warn
 779    import warnings
 780    import functools
 781
 782    if name is None:
 783        error(f"Name must not be `None` to insert data into {self}.")
 784
 785    ### We're requiring `name` to be positional, and sometimes it's passed in from background jobs.
 786    kw.pop('name', None)
 787
 788    schema = schema or self.schema
 789
 790    from meerschaum.utils.sql import (
 791        sql_item_name,
 792        table_exists,
 793        json_flavors,
 794        truncate_item_name,
 795        DROP_IF_EXISTS_FLAVORS,
 796    )
 797    from meerschaum.utils.dataframe import (
 798        get_json_cols,
 799        get_numeric_cols,
 800        get_uuid_cols,
 801        get_bytes_cols,
 802        get_geometry_cols,
 803    )
 804    from meerschaum.utils.dtypes import (
 805        are_dtypes_equal,
 806        coerce_timezone,
 807        encode_bytes_for_bytea,
 808        serialize_bytes,
 809        serialize_decimal,
 810        serialize_geometry,
 811        json_serialize_value,
 812        get_geometry_type_srid,
 813    )
 814    from meerschaum.utils.dtypes.sql import (
 815        PD_TO_SQLALCHEMY_DTYPES_FLAVORS,
 816        get_db_type_from_pd_type,
 817        get_pd_type_from_db_type,
 818        get_numeric_precision_scale,
 819    )
 820    from meerschaum.utils.misc import interval_str
 821    from meerschaum.connectors.sql._create_engine import flavor_configs
 822    from meerschaum.utils.packages import attempt_import, import_pandas
 823    sqlalchemy = attempt_import('sqlalchemy', debug=debug, lazy=False)
 824    pd = import_pandas()
 825    is_dask = 'dask' in df.__module__
 826
 827    bytes_cols = get_bytes_cols(df)
 828    numeric_cols = get_numeric_cols(df)
 829    geometry_cols = get_geometry_cols(df)
 830    ### NOTE: This excludes non-numeric serialized Decimals (e.g. SQLite).
 831    numeric_cols_dtypes = {
 832        col: typ
 833        for col, typ in kw.get('dtype', {}).items()
 834        if (
 835            col in df.columns
 836            and 'numeric' in str(typ).lower()
 837        )
 838    }
 839    numeric_cols.extend([col for col in numeric_cols_dtypes if col not in numeric_cols])
 840    numeric_cols_precisions_scales = {
 841        col: (
 842            (typ.precision, typ.scale)
 843            if hasattr(typ, 'precision')
 844            else get_numeric_precision_scale(self.flavor)
 845        )
 846        for col, typ in numeric_cols_dtypes.items()
 847    }
 848    geometry_cols_dtypes = {
 849        col: typ
 850        for col, typ in kw.get('dtype', {}).items()
 851        if (
 852            col in df.columns
 853            and ('geometry' in str(typ).lower() or 'geography' in str(typ).lower())
 854        )
 855    }
 856    geometry_cols.extend([col for col in geometry_cols_dtypes if col not in geometry_cols])
 857    geometry_cols_types_srids = {
 858        col: (typ.geometry_type, typ.srid)
 859        if hasattr(typ, 'srid')
 860        else get_geometry_type_srid()
 861        for col, typ in geometry_cols_dtypes.items()
 862    }
 863
 864    cols_pd_types = {
 865        col: get_pd_type_from_db_type(str(typ))
 866        for col, typ in kw.get('dtype', {}).items()
 867    }
 868    cols_pd_types.update({
 869        col: f'numeric[{precision},{scale}]'
 870        for col, (precision, scale) in numeric_cols_precisions_scales.items()
 871        if precision and scale
 872    })
 873    cols_db_types = {
 874        col: get_db_type_from_pd_type(typ, flavor=self.flavor)
 875        for col, typ in cols_pd_types.items()
 876    }
 877
 878    enable_bulk_insert = mrsm.get_config(
 879        'system', 'connectors', 'sql', 'bulk_insert', self.flavor,
 880        warn=False,
 881    ) or False
 882    stats = {'target': name}
 883    ### resort to defaults if None
 884    copied = False
 885    use_bulk_insert = False
 886    if method == "":
 887        if enable_bulk_insert:
 888            method = (
 889                functools.partial(mssql_insert_json, cols_types=cols_db_types, debug=debug)
 890                if self.flavor == 'mssql'
 891                else functools.partial(psql_insert_copy, debug=debug)
 892            )
 893            use_bulk_insert = True
 894        else:
 895            ### Should resolve to 'multi' or `None`.
 896            method = flavor_configs.get(self.flavor, {}).get('to_sql', {}).get('method', 'multi')
 897
 898    if bytes_cols and (use_bulk_insert or self.flavor == 'oracle'):
 899        if safe_copy and not copied:
 900            df = df.copy()
 901            copied = True
 902        bytes_serializer = (
 903            functools.partial(encode_bytes_for_bytea, with_prefix=(self.flavor != 'oracle'))
 904            if self.flavor != 'mssql'
 905            else serialize_bytes
 906        )
 907        for col in bytes_cols:
 908            df[col] = df[col].apply(bytes_serializer)
 909
 910    ### Check for numeric columns.
 911    for col in numeric_cols:
 912        precision, scale = numeric_cols_precisions_scales.get(
 913            col,
 914            get_numeric_precision_scale(self.flavor)
 915        )
 916        df[col] = df[col].apply(
 917            functools.partial(
 918                serialize_decimal,
 919                quantize=True,
 920                precision=precision,
 921                scale=scale,
 922            )
 923        )
 924
 925    for col in geometry_cols:
 926        geometry_type, srid = geometry_cols_types_srids.get(col, get_geometry_type_srid())
 927        with warnings.catch_warnings():
 928            warnings.simplefilter("ignore")
 929            df[col] = df[col].apply(
 930                functools.partial(
 931                    serialize_geometry,
 932                    as_wkt=(self.flavor == 'mssql')
 933                )
 934            )
 935
 936    stats['method'] = method.__name__ if hasattr(method, '__name__') else str(method)
 937
 938    default_chunksize = self._sys_config.get('chunksize', None)
 939    chunksize = chunksize if chunksize != -1 else default_chunksize
 940    if chunksize is not None and self.flavor in _max_chunks_flavors:
 941        if chunksize > _max_chunks_flavors[self.flavor]:
 942            if chunksize != default_chunksize:
 943                warn(
 944                    f"The specified chunksize of {chunksize} exceeds the maximum of "
 945                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
 946                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
 947                    stacklevel = 3,
 948                )
 949            chunksize = _max_chunks_flavors[self.flavor]
 950    stats['chunksize'] = chunksize
 951
 952    success, msg = False, "Default to_sql message"
 953    start = time.perf_counter()
 954    if debug:
 955        msg = f"[{self}] Inserting {len(df)} rows with chunksize: {chunksize}..."
 956        print(msg, end="", flush=True)
 957    stats['num_rows'] = len(df)
 958
 959    ### Check if the name is too long.
 960    truncated_name = truncate_item_name(name, self.flavor)
 961    if name != truncated_name:
 962        warn(
 963            f"Table '{name}' is too long for '{self.flavor}',"
 964            f" will instead create the table '{truncated_name}'."
 965        )
 966
 967    ### filter out non-pandas args
 968    import inspect
 969    to_sql_params = inspect.signature(df.to_sql).parameters
 970    to_sql_kw = {}
 971    for k, v in kw.items():
 972        if k in to_sql_params:
 973            to_sql_kw[k] = v
 974
 975    to_sql_kw.update({
 976        'name': truncated_name,
 977        'schema': schema,
 978        ('con' if not is_dask else 'uri'): (self.engine if not is_dask else self.URI),
 979        'index': index,
 980        'if_exists': if_exists,
 981        'method': method,
 982        'chunksize': chunksize,
 983    })
 984    if is_dask:
 985        to_sql_kw.update({
 986            'parallel': True,
 987        })
 988    elif _connection is not None:
 989        to_sql_kw['con'] = _connection
 990
 991    if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
 992    if self.flavor == 'oracle':
 993        ### For some reason 'replace' doesn't work properly in pandas,
 994        ### so try dropping first.
 995        if if_exists == 'replace' and table_exists(name, self, schema=schema, debug=debug):
 996            success = self.exec(
 997                f"DROP TABLE {if_exists_str}" + sql_item_name(name, 'oracle', schema)
 998            ) is not None
 999            if not success:
1000                warn(f"Unable to drop {name}")
1001
1002        ### Enforce NVARCHAR(2000) as text instead of CLOB.
1003        dtype = to_sql_kw.get('dtype', {})
1004        for col, typ in df.dtypes.items():
1005            if are_dtypes_equal(str(typ), 'object'):
1006                dtype[col] = sqlalchemy.types.NVARCHAR(2000)
1007            elif are_dtypes_equal(str(typ), 'int'):
1008                dtype[col] = sqlalchemy.types.INTEGER
1009        to_sql_kw['dtype'] = dtype
1010    elif self.flavor == 'duckdb':
1011        dtype = to_sql_kw.get('dtype', {})
1012        dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')]
1013        for col in dt_cols:
1014            df[col] = coerce_timezone(df[col], strip_utc=False)
1015    elif self.flavor == 'mssql':
1016        dtype = to_sql_kw.get('dtype', {})
1017        dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')]
1018        new_dtype = {}
1019        for col in dt_cols:
1020            if col in dtype:
1021                continue
1022            dt_typ = get_db_type_from_pd_type(str(df.dtypes[col]), self.flavor, as_sqlalchemy=True)
1023            if col not in dtype:
1024                new_dtype[col] = dt_typ
1025
1026        dtype.update(new_dtype)
1027        to_sql_kw['dtype'] = dtype
1028
1029    ### Check for JSON columns.
1030    if self.flavor not in json_flavors:
1031        json_cols = get_json_cols(df)
1032        for col in json_cols:
1033            df[col] = df[col].apply(
1034                (
1035                    lambda x: json.dumps(x, default=json_serialize_value, sort_keys=True)
1036                    if not isinstance(x, Hashable)
1037                    else x
1038                )
1039            )
1040
1041    if PD_TO_SQLALCHEMY_DTYPES_FLAVORS['uuid'].get(self.flavor, None) != 'Uuid':
1042        uuid_cols = get_uuid_cols(df)
1043        for col in uuid_cols:
1044            df[col] = df[col].astype(str)
1045
1046    try:
1047        with warnings.catch_warnings():
1048            warnings.filterwarnings('ignore')
1049            df.to_sql(**to_sql_kw)
1050        success = True
1051    except Exception as e:
1052        if not silent:
1053            warn(str(e))
1054        success, msg = False, str(e)
1055
1056    end = time.perf_counter()
1057    if success:
1058        num_rows = len(df)
1059        msg = (
1060            f"It took {interval_str(timedelta(seconds=(end - start)))} "
1061            + f"to sync {num_rows:,} row"
1062            + ('s' if num_rows != 1 else '')
1063            + f" to {name}."
1064        )
1065    stats['start'] = start
1066    stats['end'] = end
1067    stats['duration'] = end - start
1068
1069    if debug:
1070        print(" done.", flush=True)
1071        dprint(msg)
1072
1073    stats['success'] = success
1074    stats['msg'] = msg
1075    if as_tuple:
1076        return success, msg
1077    if as_dict:
1078        return stats
1079    return success

Upload a DataFrame's contents to the SQL server.

Parameters
  • df (pd.DataFrame): The DataFrame to be inserted.
  • name (str): The name of the table to be created.
  • index (bool, default False): If True, creates the DataFrame's indices as columns.
  • if_exists (str, default 'replace'): Drop and create the table ('replace') or append if it exists ('append') or raise Exception ('fail'). Options are ['replace', 'append', 'fail'].
  • method (str, default ''): None or 'multi'. See pandas.DataFrame.to_sql for details.
  • chunksize (Optional[int], default -1): How many rows to insert at a time.
  • schema (Optional[str], default None): Optionally override the schema for the table. Defaults to SQLConnector.schema.
  • safe_copy (bool, default True): If True, copy the dataframe before making any changes.
  • as_tuple (bool, default False): If True, return a (success_bool, message) tuple instead of a bool. Defaults to False.
  • as_dict (bool, default False): If True, return a dictionary of transaction information. The keys are success, msg, start, end, duration, num_rows, chunksize, method, and target.
  • kw (Any): Additional arguments will be passed to the DataFrame's to_sql function.
Returns
  • Either a bool or a SuccessTuple (depends on as_tuple).
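
A minimal sketch of uploading a DataFrame (hypothetical connector and table name):

    import pandas as pd
    import meerschaum as mrsm

    conn = mrsm.get_connector('sql', 'tmp', flavor='sqlite', database='/tmp/example.db')
    df = pd.DataFrame({'name': ['apple', 'pear'], 'qty': [3, 5]})

    ### as_tuple=True returns (success, message) instead of a bare bool.
    success, msg = conn.to_sql(df, name='fruits', if_exists='replace', as_tuple=True)
    print(success, msg)
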
def exec_queries( self, queries: "List[Union[str, Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]]]", break_on_error: bool = False, rollback: bool = True, silent: bool = False, debug: bool = False) -> 'List[Union[sqlalchemy.engine.cursor.CursorResult, None]]':
614def exec_queries(
615    self,
616    queries: List[
617        Union[
618            str,
619            Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]
620        ]
621    ],
622    break_on_error: bool = False,
623    rollback: bool = True,
624    silent: bool = False,
625    debug: bool = False,
626) -> List[Union[sqlalchemy.engine.cursor.CursorResult, None]]:
627    """
628    Execute a list of queries in a single transaction.
629
630    Parameters
631    ----------
632    queries: List[
633        Union[
634            str,
635            Tuple[str, Callable[[], List[str]]]
636        ]
637    ]
638        The queries in the transaction to be executed.
639        If a query is a tuple, the second item of the tuple
640        will be considered a callable hook that returns a list of queries to be executed
641        before the next item in the list.
642
643    break_on_error: bool, default False
644        If `True`, stop executing when a query fails.
645
646    rollback: bool, default True
647        If `break_on_error` is `True`, rollback the transaction if a query fails.
648
649    silent: bool, default False
650        If `True`, suppress warnings.
651
652    Returns
653    -------
654    A list of SQLAlchemy results.
655    """
656    from meerschaum.utils.warnings import warn
657    from meerschaum.utils.debug import dprint
658    from meerschaum.utils.packages import attempt_import
659    sqlalchemy, sqlalchemy_orm = attempt_import('sqlalchemy', 'sqlalchemy.orm', lazy=False)
660    session = sqlalchemy_orm.Session(self.engine)
661
662    result = None
663    results = []
664    with session.begin():
665        for query in queries:
666            hook = None
667            result = None
668
669            if isinstance(query, tuple):
670                query, hook = query
671            if isinstance(query, str):
672                query = sqlalchemy.text(query)
673
674            if debug:
675                dprint(f"[{self}]\n" + str(query))
676
677            try:
678                result = session.execute(query)
679                session.flush()
680            except Exception as e:
681                msg = (f"Encountered error while executing:\n{e}")
682                if not silent:
683                    warn(msg)
684                elif debug:
685                    dprint(f"[{self}]\n" + str(msg))
686                result = None
687            if result is None and break_on_error:
688                if rollback:
689                    session.rollback()
690                results.append(result)
691                break
692            elif result is not None and hook is not None:
693                hook_queries = hook(session)
694                if hook_queries:
695                    hook_results = self.exec_queries(
696                        hook_queries,
697                        break_on_error = break_on_error,
698                        rollback=rollback,
699                        silent=silent,
700                        debug=debug,
701                    )
702                    result = (result, hook_results)
703
704            results.append(result)
705
706    return results

Execute a list of queries in a single transaction.

Parameters
  • queries (List[Union[str, Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]]]): The queries in the transaction to be executed. If a query is a tuple, the second item of the tuple will be considered a callable hook that returns a list of queries to be executed before the next item in the list.
  • break_on_error (bool, default False): If True, stop executing when a query fails.
  • rollback (bool, default True): If break_on_error is True, rollback the transaction if a query fails.
  • silent (bool, default False): If True, suppress warnings.
Returns
  • A list of SQLAlchemy results.
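
A sketch of a multi-query transaction with a hook (the table name is illustrative):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql', 'tmp', flavor='sqlite', database='/tmp/example.db')
    queries = [
        "CREATE TABLE IF NOT EXISTS tmp_example (id INTEGER)",
        "INSERT INTO tmp_example (id) VALUES (1)",
        ### A (query, hook) tuple: the hook receives the session
        ### and returns follow-up queries to run before the next item.
        (
            "INSERT INTO tmp_example (id) VALUES (2)",
            lambda session: ["INSERT INTO tmp_example (id) VALUES (3)"],
        ),
    ]
    results = conn.exec_queries(queries, break_on_error=True)
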
def get_connection(self, rebuild: bool = False) -> "'sqlalchemy.engine.base.Connection'":
1262def get_connection(self, rebuild: bool = False) -> 'sqlalchemy.engine.base.Connection':
1263    """
1264    Return the current alive connection.
1265
1266    Parameters
1267    ----------
1268    rebuild: bool, default False
1269        If `True`, close the previous connection and open a new one.
1270
1271    Returns
1272    -------
1273    A `sqlalchemy.engine.base.Connection` object.
1274    """
1275    import threading
1276    if '_thread_connections' not in self.__dict__:
1277        self.__dict__['_thread_connections'] = {}
1278
1279    self._cleanup_connections()
1280
1281    thread_id = threading.get_ident()
1282
1283    thread_connections = self.__dict__.get('_thread_connections', {})
1284    connection = thread_connections.get(thread_id, None)
1285
1286    if rebuild and connection is not None:
1287        try:
1288            connection.close()
1289        except Exception:
1290            pass
1291
1292        _ = thread_connections.pop(thread_id, None)
1293        connection = None
1294
1295    if connection is None or connection.closed:
1296        connection = self.engine.connect()
1297        thread_connections[thread_id] = connection
1298
1299    return connection

Return the current alive connection.

Parameters
  • rebuild (bool, default False): If True, close the previous connection and open a new one.
Returns
  • A sqlalchemy.engine.base.Connection object.
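
A short sketch (hypothetical connector); connections are cached per thread:

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql', 'tmp', flavor='sqlite', database='/tmp/example.db')

    connection = conn.get_connection()                     ### Reused within this thread.
    fresh_connection = conn.get_connection(rebuild=True)   ### Closed and reopened.
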
def test_connection(self, **kw: Any) -> Optional[bool]:
717def test_connection(
718    self,
719    **kw: Any
720) -> Union[bool, None]:
721    """
722    Test if a successful connection to the database may be made.
723
724    Parameters
725    ----------
726    **kw:
727        The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.
728
729    Returns
730    -------
731    `True` if a connection is made, otherwise `False` or `None` in case of failure.
732
733    """
734    import warnings
735    from meerschaum.connectors.poll import retry_connect
736    _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self}
737    _default_kw.update(kw)
738    with warnings.catch_warnings():
739        warnings.filterwarnings('ignore', 'Could not')
740        try:
741            return retry_connect(**_default_kw)
742        except Exception:
743            return False

Test if a successful connection to the database may be made.

Parameters
  • **kw (Any): The keyword arguments are passed to meerschaum.connectors.poll.retry_connect.
Returns
  • True if a connection is made, otherwise False or None in case of failure.
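
A sketch of a pre-flight check (hypothetical connector); keyword arguments are forwarded to retry_connect:

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql', 'tmp', flavor='sqlite', database='/tmp/example.db')
    if not conn.test_connection(max_retries=1):
        print(f"Unable to connect to {conn}.")
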
def fetch( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, str, NoneType] = None, check_existing: bool = True, chunksize: Optional[int] = -1, workers: Optional[int] = None, debug: bool = False, **kw: Any) -> "Union['pd.DataFrame', List[Any], None]":
18def fetch(
19    self,
20    pipe: mrsm.Pipe,
21    begin: Union[datetime, int, str, None] = '',
22    end: Union[datetime, int, str, None] = None,
23    check_existing: bool = True,
24    chunksize: Optional[int] = -1,
25    workers: Optional[int] = None,
26    debug: bool = False,
27    **kw: Any
28) -> Union['pd.DataFrame', List[Any], None]:
29    """Execute the SQL definition and return a Pandas DataFrame.
30
31    Parameters
32    ----------
33    pipe: mrsm.Pipe
34        The pipe object which contains the `fetch` metadata.
35
36        - pipe.columns['datetime']: str
37            - Name of the datetime column for the remote table.
38        - pipe.parameters['fetch']: Dict[str, Any]
39            - Parameters necessary to execute a query.
40        - pipe.parameters['fetch']['definition']: str
41            - Raw SQL query to execute to generate the pandas DataFrame.
42        - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
43            - How many minutes before `begin` to search for data (*optional*).
44
45    begin: Union[datetime, int, str, None], default ''
46        The most recent datetime to search for data.
47        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
48
49    end: Union[datetime, int, str, None], default None
50        The latest datetime to search for data.
51        If `end` is `None`, do not bound the query.
52
53    check_existing: bool, default True
54        If `False`, use a backtrack interval of 0 minutes.
55
56    chunksize: Optional[int], default -1
57        How many rows to load into memory at once.
58        Otherwise the entire result set is loaded into memory.
59
60    workers: Optional[int], default None
61        How many threads to use when consuming the generator.
62        Defaults to the number of cores.
63
64    debug: bool, default False
65        Verbosity toggle.
66
67    Returns
68    -------
69    A pandas DataFrame generator.
70    """
71    meta_def = self.get_pipe_metadef(
72        pipe,
73        begin=begin,
74        end=end,
75        check_existing=check_existing,
76        debug=debug,
77        **kw
78    )
79    chunks = self.read(
80        meta_def,
81        chunksize=chunksize,
82        workers=workers,
83        as_iterator=True,
84        debug=debug,
85    )
86    return chunks

Execute the SQL definition and return a Pandas DataFrame.

Parameters
  • pipe (mrsm.Pipe): The pipe object which contains the fetch metadata.

    • pipe.columns['datetime']: str
      • Name of the datetime column for the remote table.
    • pipe.parameters['fetch']: Dict[str, Any]
      • Parameters necessary to execute a query.
    • pipe.parameters['fetch']['definition']: str
      • Raw SQL query to execute to generate the pandas DataFrame.
    • pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
      • How many minutes before begin to search for data (optional).
  • begin (Union[datetime, int, str, None], default ''): The most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
  • end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If end is None, do not bound the query.
  • check_existing (bool, default True): If False, use a backtrack interval of 0 minutes.
  • chunksize (Optional[int], default -1): How many rows to load into memory at once. Otherwise the entire result set is loaded into memory.
  • workers (Optional[int], default None): How many threads to use when consuming the generator. Defaults to the number of cores.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pandas DataFrame generator.
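
A sketch of fetching via a pipe's SQL definition (the pipe keys and definition are illustrative):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql', 'tmp', flavor='sqlite', database='/tmp/example.db')
    pipe = mrsm.Pipe(
        'sql:tmp', 'fruits',
        instance='sql:tmp',
        parameters={'fetch': {'definition': "SELECT * FROM fruits"}},
    )

    ### A generator of DataFrame chunks; pipe.sync() normally consumes this.
    chunks = conn.fetch(pipe, begin=None, chunksize=1000)
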
def get_pipe_metadef( self, pipe: meerschaum.Pipe, params: Optional[Dict[str, Any]] = None, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, str, NoneType] = None, check_existing: bool = True, debug: bool = False, **kw: Any) -> Optional[str]:
 89def get_pipe_metadef(
 90    self,
 91    pipe: mrsm.Pipe,
 92    params: Optional[Dict[str, Any]] = None,
 93    begin: Union[datetime, int, str, None] = '',
 94    end: Union[datetime, int, str, None] = None,
 95    check_existing: bool = True,
 96    debug: bool = False,
 97    **kw: Any
 98) -> Union[str, None]:
 99    """
100    Return a pipe's meta definition fetch query.
101
102    params: Optional[Dict[str, Any]], default None
103        Optional params dictionary to build the `WHERE` clause.
104        See `meerschaum.utils.sql.build_where`.
105
106    begin: Union[datetime, int, str, None], default ''
107        The most recent datetime to search for data.
108        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
109
110    end: Union[datetime, int, str, None], default None
111        The latest datetime to search for data.
112        If `end` is `None`, do not bound the query.
113
114    check_existing: bool, default True
115        If `True`, apply the backtrack interval.
116
117    debug: bool, default False
118        Verbosity toggle.
119
120    Returns
121    -------
122    A pipe's meta definition fetch query string.
123    """
124    from meerschaum.utils.warnings import warn
125    from meerschaum.utils.sql import sql_item_name, dateadd_str, build_where
126    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
127    from meerschaum.config import get_config
128
129    dt_col = pipe.columns.get('datetime', None)
130    if not dt_col:
131        dt_col = pipe.guess_datetime()
132        dt_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None
133        is_guess = True
134    else:
135        dt_name = sql_item_name(dt_col, self.flavor, None)
136        is_guess = False
137    dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None
138    db_dt_typ = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None
139
140    if begin not in (None, '') or end is not None:
141        if is_guess:
142            if dt_col is None:
143                warn(
144                    f"Unable to determine a datetime column for {pipe}."
145                    + "\n    Ignoring begin and end...",
146                    stack=False,
147                )
148                begin, end = '', None
149            else:
150                warn(
151                    f"A datetime wasn't specified for {pipe}.\n"
152                    + f"    Using column \"{dt_col}\" for datetime bounds...",
153                    stack=False
154                )
155
156    apply_backtrack = begin == '' and check_existing
157    backtrack_interval = pipe.get_backtrack_interval(check_existing=check_existing, debug=debug)
158    btm = (
159        int(backtrack_interval.total_seconds() / 60)
160        if isinstance(backtrack_interval, timedelta)
161        else backtrack_interval
162    )
163    begin = (
164        pipe.get_sync_time(debug=debug)
165        if begin == ''
166        else begin
167    )
168
169    if begin not in (None, '') and end is not None and begin >= end:
170        begin = None
171
172    if dt_name:
173        begin_da = (
174            dateadd_str(
175                flavor=self.flavor,
176                datepart='minute',
177                number=((-1 * btm) if apply_backtrack else 0),
178                begin=begin,
179                db_type=db_dt_typ,
180            )
181            if begin not in ('', None)
182            else None
183        )
184        end_da = (
185            dateadd_str(
186                flavor=self.flavor,
187                datepart='minute',
188                number=0,
189                begin=end,
190                db_type=db_dt_typ,
191            )
192            if end is not None
193            else None
194        )
195
196    definition_name = sql_item_name('definition', self.flavor, None)
197    meta_def = (
198        _simple_fetch_query(pipe, self.flavor) if (
199            (not (pipe.columns or {}).get('id', None))
200            or (not get_config('system', 'experimental', 'join_fetch'))
201        ) else _join_fetch_query(pipe, self.flavor, debug=debug, **kw)
202    )
203
204    has_where = 'where' in meta_def.lower()[meta_def.lower().rfind('definition'):]
205    if dt_name and (begin_da or end_da):
206        definition_dt_name = f"{definition_name}.{dt_name}"
207        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
208        has_where = True
209        if begin_da:
210            meta_def += f"\n    {definition_dt_name}\n    >=\n    {begin_da}\n"
211        if begin_da and end_da:
212            meta_def += "    AND"
213        if end_da:
214            meta_def += f"\n    {definition_dt_name}\n    <\n    {end_da}\n"
215
216    if params is not None:
217        params_where = build_where(params, self, with_where=False)
218        meta_def += "\n    " + ("AND" if has_where else "WHERE") + "    "
219        has_where = True
220        meta_def += params_where
221
222    return meta_def.rstrip()

Return a pipe's meta definition fetch query.

Parameters
  • params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause. See meerschaum.utils.sql.build_where.
  • begin (Union[datetime, int, str, None], default ''): The most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
  • end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If end is None, do not bound the query.
  • check_existing (bool, default True): If True, apply the backtrack interval.
  • debug (bool, default False): Verbosity toggle.

Returns
  • A pipe's meta definition fetch query string.
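
A sketch of inspecting the generated query, assuming the illustrative pipe from the fetch example above:

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql', 'tmp', flavor='sqlite', database='/tmp/example.db')
    pipe = mrsm.Pipe(
        'sql:tmp', 'fruits',
        instance='sql:tmp',
        parameters={'fetch': {'definition': "SELECT * FROM fruits"}},
    )

    ### The definition wrapped in a subquery, plus any datetime bounds.
    meta_def = conn.get_pipe_metadef(pipe, begin=None, end=None)
    print(meta_def)
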
def cli(self, debug: bool = False) -> Tuple[bool, str]:
37def cli(
38    self,
39    debug: bool = False,
40) -> SuccessTuple:
41    """
42    Launch a subprocess for an interactive CLI.
43    """
44    from meerschaum.utils.debug import dprint
45    from meerschaum.utils.venv import venv_exec
46    env = copy.deepcopy(dict(os.environ))
47    env_key = f"MRSM_SQL_{self.label.upper()}"
48    env_val = json.dumps(self.meta)
49    env[env_key] = env_val
50    cli_code = (
51        "import sys\n"
52        "import meerschaum as mrsm\n"
53        "import os\n"
54        f"conn = mrsm.get_connector('sql:{self.label}')\n"
55        "success, msg = conn._cli_exit()\n"
56        "mrsm.pprint((success, msg))\n"
57        "if not success:\n"
58        "    raise Exception(msg)"
59    )
60    if debug:
61        dprint(cli_code)
62    try:
63        _ = venv_exec(cli_code, venv=None, env=env, debug=debug, capture_output=False)
64    except Exception as e:
65        return False, f"[{self}] Failed to start CLI:\n{e}"
66    return True, "Success"

Launch a subprocess for an interactive CLI.
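
A one-line sketch (assuming the flavor's CLI tool, e.g. litecli for SQLite, is available in the virtual environment):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql', 'tmp', flavor='sqlite', database='/tmp/example.db')
    success, msg = conn.cli()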

def fetch_pipes_keys( self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> Optional[List[Tuple[str, str, Optional[str]]]]:
144def fetch_pipes_keys(
145    self,
146    connector_keys: Optional[List[str]] = None,
147    metric_keys: Optional[List[str]] = None,
148    location_keys: Optional[List[str]] = None,
149    tags: Optional[List[str]] = None,
150    params: Optional[Dict[str, Any]] = None,
151    debug: bool = False
152) -> Optional[List[Tuple[str, str, Optional[str]]]]:
153    """
154    Return a list of tuples corresponding to the parameters provided.
155
156    Parameters
157    ----------
158    connector_keys: Optional[List[str]], default None
159        List of connector_keys to search by.
160
161    metric_keys: Optional[List[str]], default None
162        List of metric_keys to search by.
163
164    location_keys: Optional[List[str]], default None
165        List of location_keys to search by.
166
167    params: Optional[Dict[str, Any]], default None
168        Dictionary of additional parameters to search by.
169        E.g. `--params pipe_id:1`
170
171    debug: bool, default False
172        Verbosity toggle.
173    """
174    from meerschaum.utils.debug import dprint
175    from meerschaum.utils.packages import attempt_import
176    from meerschaum.utils.misc import separate_negation_values
177    from meerschaum.utils.sql import OMIT_NULLSFIRST_FLAVORS, table_exists
178    from meerschaum.config.static import STATIC_CONFIG
179    import json
180    from copy import deepcopy
181    sqlalchemy, sqlalchemy_sql_functions = attempt_import(
182        'sqlalchemy',
183        'sqlalchemy.sql.functions', lazy=False,
184    )
185    coalesce = sqlalchemy_sql_functions.coalesce
186
187    if connector_keys is None:
188        connector_keys = []
189    if metric_keys is None:
190        metric_keys = []
191    if location_keys is None:
192        location_keys = []
193    else:
194        location_keys = [
195            (
196                lk
197                if lk not in ('[None]', 'None', 'null')
198                else 'None'
199            )
200            for lk in location_keys
201        ]
202    if tags is None:
203        tags = []
204
205    if params is None:
206        params = {}
207
208    ### Add three primary keys to params dictionary
209    ###   (separated for convenience of arguments).
210    cols = {
211        'connector_keys': [str(ck) for ck in connector_keys],
212        'metric_key': [str(mk) for mk in metric_keys],
213        'location_key': [str(lk) for lk in location_keys],
214    }
215
216    ### Make deep copy so we don't mutate this somewhere else.
217    parameters = deepcopy(params)
218    for col, vals in cols.items():
219        if vals not in [[], ['*']]:
220            parameters[col] = vals
221
222    if not table_exists('mrsm_pipes', self, schema=self.instance_schema, debug=debug):
223        return []
224
225    from meerschaum.connectors.sql.tables import get_tables
226    pipes_tbl = get_tables(mrsm_instance=self, create=False, debug=debug)['pipes']
227
228    _params = {}
229    for k, v in parameters.items():
230        _v = json.dumps(v) if isinstance(v, dict) else v
231        _params[k] = _v
232
233    negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
234    ### Parse regular params.
235    ### If a param begins with '_', negate it instead.
236    _where = [
237        (
238            (coalesce(pipes_tbl.c[key], 'None') == val)
239            if not str(val).startswith(negation_prefix)
240            else (coalesce(pipes_tbl.c[key], 'None') != str(val)[len(negation_prefix):])
241        ) for key, val in _params.items()
242        if not isinstance(val, (list, tuple)) and key in pipes_tbl.c
243    ]
244    select_cols = (
245        [
246            pipes_tbl.c.connector_keys,
247            pipes_tbl.c.metric_key,
248            pipes_tbl.c.location_key,
249        ]
250    )
251
252    q = sqlalchemy.select(*select_cols).where(sqlalchemy.and_(True, *_where))
253    for c, vals in cols.items():
254        if not isinstance(vals, (list, tuple)) or not vals or c not in pipes_tbl.c:
255            continue
256        _in_vals, _ex_vals = separate_negation_values(vals)
257        q = q.where(coalesce(pipes_tbl.c[c], 'None').in_(_in_vals)) if _in_vals else q
258        q = q.where(coalesce(pipes_tbl.c[c], 'None').not_in(_ex_vals)) if _ex_vals else q
259
260    ### Finally, parse tags.
261    tag_groups = [tag.split(',') for tag in tags]
262    in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups]
263
264    ors, nands = [], []
265    for _in_tags, _ex_tags in in_ex_tag_groups:
266        sub_ands = []
267        for nt in _in_tags:
268            sub_ands.append(
269                sqlalchemy.cast(
270                    pipes_tbl.c['parameters'],
271                    sqlalchemy.String,
272                ).like(f'%"tags":%"{nt}"%')
273            )
274        if sub_ands:
275            ors.append(sqlalchemy.and_(*sub_ands))
276
277        for xt in _ex_tags:
278            nands.append(
279                sqlalchemy.cast(
280                    pipes_tbl.c['parameters'],
281                    sqlalchemy.String,
282                ).not_like(f'%"tags":%"{xt}"%')
283            )
284
285    q = q.where(sqlalchemy.and_(*nands)) if nands else q
286    q = q.where(sqlalchemy.or_(*ors)) if ors else q
287    loc_asc = sqlalchemy.asc(pipes_tbl.c['location_key'])
288    if self.flavor not in OMIT_NULLSFIRST_FLAVORS:
289        loc_asc = sqlalchemy.nullsfirst(loc_asc)
290    q = q.order_by(
291        sqlalchemy.asc(pipes_tbl.c['connector_keys']),
292        sqlalchemy.asc(pipes_tbl.c['metric_key']),
293        loc_asc,
294    )
295
296    ### execute the query and return a list of tuples
297    if debug:
298        dprint(q.compile(compile_kwargs={'literal_binds': True}))
299    try:
300        rows = (
301            self.execute(q).fetchall()
302            if self.flavor != 'duckdb'
303            else [
304                (row['connector_keys'], row['metric_key'], row['location_key'])
305                for row in self.read(q).to_dict(orient='records')
306            ]
307        )
308    except Exception as e:
309        error(str(e))
310
311    return [(row[0], row[1], row[2]) for row in rows]

Return a list of tuples corresponding to the parameters provided.

Parameters
  • connector_keys (Optional[List[str]], default None): List of connector_keys to search by.
  • metric_keys (Optional[List[str]], default None): List of metric_keys to search by.
  • location_keys (Optional[List[str]], default None): List of location_keys to search by.
  • tags (Optional[List[str]], default None): List of tags to search by.
  • params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. E.g. --params pipe_id:1
  • debug (bool, default False): Verbosity toggle.
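
A sketch of filtering registered pipes on this instance (the keys and tag are illustrative):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql', 'tmp', flavor='sqlite', database='/tmp/example.db')
    keys = conn.fetch_pipes_keys(
        connector_keys=['sql:tmp'],
        tags=['production'],
    )
    for ck, mk, lk in (keys or []):
        print(ck, mk, lk)
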
def create_indices( self, pipe: meerschaum.Pipe, columns: Optional[List[str]] = None, indices: Optional[List[str]] = None, debug: bool = False) -> bool:
332def create_indices(
333    self,
334    pipe: mrsm.Pipe,
335    columns: Optional[List[str]] = None,
336    indices: Optional[List[str]] = None,
337    debug: bool = False
338) -> bool:
339    """
340    Create a pipe's indices.
341    """
342    from meerschaum.utils.debug import dprint
343    if debug:
344        dprint(f"Creating indices for {pipe}...")
345
346    if not pipe.indices:
347        warn(f"{pipe} has no index columns; skipping index creation.", stack=False)
348        return True
349
350    cols_to_include = set((columns or []) + (indices or [])) or None
351
352    _ = pipe.__dict__.pop('_columns_indices', None)
353    ix_queries = {
354        col: queries
355        for col, queries in self.get_create_index_queries(pipe, debug=debug).items()
356        if cols_to_include is None or col in cols_to_include
357    }
358    success = True
359    for col, queries in ix_queries.items():
360        ix_success = all(self.exec_queries(queries, debug=debug, silent=False))
361        success = success and ix_success
362        if not ix_success:
363            warn(f"Failed to create index on column: {col}")
364
365    return success

Create a pipe's indices.
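
A sketch, assuming an illustrative pipe whose table already exists with a datetime column 'dt':

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql', 'tmp', flavor='sqlite', database='/tmp/example.db')
    pipe = mrsm.Pipe('sql:tmp', 'fruits', instance='sql:tmp', columns={'datetime': 'dt'})

    ### Preview the per-index queries, then create only the datetime index.
    queries = conn.get_create_index_queries(pipe)
    success = conn.create_indices(pipe, columns=['dt'])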

def drop_indices( self, pipe: meerschaum.Pipe, columns: Optional[List[str]] = None, indices: Optional[List[str]] = None, debug: bool = False) -> bool:
386def drop_indices(
387    self,
388    pipe: mrsm.Pipe,
389    columns: Optional[List[str]] = None,
390    indices: Optional[List[str]] = None,
391    debug: bool = False
392) -> bool:
393    """
394    Drop a pipe's indices.
395    """
396    from meerschaum.utils.debug import dprint
397    if debug:
398        dprint(f"Dropping indices for {pipe}...")
399
400    if not pipe.indices:
401        warn(f"No indices to drop for {pipe}.", stack=False)
402        return False
403
404    cols_to_include = set((columns or []) + (indices or [])) or None
405
406    ix_queries = {
407        col: queries
408        for col, queries in self.get_drop_index_queries(pipe, debug=debug).items()
409        if cols_to_include is None or col in cols_to_include
410    }
411    success = True
412    for col, queries in ix_queries.items():
413        ix_success = all(self.exec_queries(queries, debug=debug, silent=(not debug)))
414        if not ix_success:
415            success = False
416            if debug:
417                dprint(f"Failed to drop index on column: {col}")
418    return success

Drop a pipe's indices.
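
The inverse sketch, dropping the same illustrative index:

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql', 'tmp', flavor='sqlite', database='/tmp/example.db')
    pipe = mrsm.Pipe('sql:tmp', 'fruits', instance='sql:tmp', columns={'datetime': 'dt'})
    success = conn.drop_indices(pipe, columns=['dt'])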

def get_create_index_queries( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, List[str]]:
474def get_create_index_queries(
475    self,
476    pipe: mrsm.Pipe,
477    debug: bool = False,
478) -> Dict[str, List[str]]:
479    """
480    Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query.
481
482    Parameters
483    ----------
484    pipe: mrsm.Pipe
485        The pipe to which the queries will correspond.
486
487    Returns
488    -------
489    A dictionary of index names mapping to lists of queries.
490    """
491    ### NOTE: Due to recent breaking changes in DuckDB, indices don't behave properly.
492    if self.flavor == 'duckdb':
493        return {}
494    from meerschaum.utils.sql import (
495        sql_item_name,
496        get_distinct_col_count,
497        UPDATE_QUERIES,
498        get_null_replacement,
499        get_create_table_queries,
500        get_rename_table_queries,
501        COALESCE_UNIQUE_INDEX_FLAVORS,
502    )
503    from meerschaum.utils.dtypes import are_dtypes_equal
504    from meerschaum.utils.dtypes.sql import (
505        get_db_type_from_pd_type,
506        get_pd_type_from_db_type,
507        AUTO_INCREMENT_COLUMN_FLAVORS,
508    )
509    from meerschaum.config import get_config
510    index_queries = {}
511
512    upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in UPDATE_QUERIES
513    static = pipe.parameters.get('static', False)
514    null_indices = pipe.parameters.get('null_indices', True)
515    index_names = pipe.get_indices()
516    unique_index_name_unquoted = index_names.get('unique', None) or f'IX_{pipe.target}_unique'
517    if upsert:
518        _ = index_names.pop('unique', None)
519    indices = pipe.indices
520    existing_cols_types = pipe.get_columns_types(debug=debug)
521    existing_cols_pd_types = {
522        col: get_pd_type_from_db_type(typ)
523        for col, typ in existing_cols_types.items()
524    }
525    existing_cols_indices = self.get_pipe_columns_indices(pipe, debug=debug)
526    existing_ix_names = set()
527    existing_primary_keys = []
528    existing_clustered_primary_keys = []
529    for col, col_indices in existing_cols_indices.items():
530        for col_ix_doc in col_indices:
531            existing_ix_names.add(col_ix_doc.get('name', '').lower())
532            if col_ix_doc.get('type', None) == 'PRIMARY KEY':
533                existing_primary_keys.append(col.lower())
534                if col_ix_doc.get('clustered', True):
535                    existing_clustered_primary_keys.append(col.lower())
536
537    _datetime = pipe.get_columns('datetime', error=False)
538    _datetime_name = (
539        sql_item_name(_datetime, self.flavor, None)
540        if _datetime is not None else None
541    )
542    _datetime_index_name = (
543        sql_item_name(index_names['datetime'], flavor=self.flavor, schema=None)
544        if index_names.get('datetime', None)
545        else None
546    )
547    _id = pipe.get_columns('id', error=False)
548    _id_name = (
549        sql_item_name(_id, self.flavor, None)
550        if _id is not None
551        else None
552    )
553    primary_key = pipe.columns.get('primary', None)
554    primary_key_name = (
555        sql_item_name(primary_key, flavor=self.flavor, schema=None)
556        if primary_key
557        else None
558    )
559    autoincrement = (
560        pipe.parameters.get('autoincrement', False)
561        or (
562            primary_key is not None
563            and primary_key not in existing_cols_pd_types
564        )
565    )
566    primary_key_db_type = (
567        get_db_type_from_pd_type(pipe.dtypes.get(primary_key, 'int') or 'int', self.flavor)
568        if primary_key
569        else None
570    )
571    primary_key_constraint_name = (
572        sql_item_name(f'PK_{pipe.target}', self.flavor, None)
573        if primary_key is not None
574        else None
575    )
576    primary_key_clustered = "CLUSTERED" if _datetime is None else "NONCLUSTERED"
577    datetime_clustered = (
578        "CLUSTERED"
579        if not existing_clustered_primary_keys and _datetime is not None
580        else "NONCLUSTERED"
581    )
582    include_columns_str = "\n    ,".join(
583        [
584            sql_item_name(col, flavor=self.flavor) for col in existing_cols_types
585            if col != _datetime
586        ]
587    ).rstrip(',')
588    include_clause = (
589        (
590            f"\nINCLUDE (\n    {include_columns_str}\n)"
591        )
592        if datetime_clustered == 'NONCLUSTERED'
593        else ''
594    )
595
596    _id_index_name = (
597        sql_item_name(index_names['id'], self.flavor, None)
598        if index_names.get('id', None)
599        else None
600    )
601    _pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
602    _create_space_partition = get_config('system', 'experimental', 'space')
603
604    ### create datetime index
605    dt_query = None
606    if _datetime is not None:
607        if self.flavor == 'timescaledb' and pipe.parameters.get('hypertable', True):
608            _id_count = (
609                get_distinct_col_count(_id, f"SELECT {_id_name} FROM {_pipe_name}", self)
610                if (_id is not None and _create_space_partition) else None
611            )
612
613            chunk_interval = pipe.get_chunk_interval(debug=debug)
614            chunk_interval_minutes = (
615                chunk_interval
616                if isinstance(chunk_interval, int)
617                else int(chunk_interval.total_seconds() / 60)
618            )
619            chunk_time_interval = (
620                f"INTERVAL '{chunk_interval_minutes} MINUTES'"
621                if isinstance(chunk_interval, timedelta)
622                else f'{chunk_interval_minutes}'
623            )
624
625            dt_query = (
626                f"SELECT public.create_hypertable('{_pipe_name}', " +
627                f"'{_datetime}', "
628                + (
629                    f"'{_id}', {_id_count}, " if (_id is not None and _create_space_partition)
630                    else ''
631                )
632                + f'chunk_time_interval => {chunk_time_interval}, '
633                + 'if_not_exists => true, '
634                + "migrate_data => true);"
635            )
636        elif _datetime_index_name and _datetime != primary_key:
637            if self.flavor == 'mssql':
638                dt_query = (
639                    f"CREATE {datetime_clustered} INDEX {_datetime_index_name} "
640                    f"\nON {_pipe_name} ({_datetime_name}){include_clause}"
641                )
642            else:
643                dt_query = (
644                    f"CREATE INDEX {_datetime_index_name} "
645                    + f"ON {_pipe_name} ({_datetime_name})"
646                )
647
648    if dt_query:
649        index_queries[_datetime] = [dt_query]
650
651    primary_queries = []
652    if (
653        primary_key is not None
654        and primary_key.lower() not in existing_primary_keys
655        and not static
656    ):
657        if autoincrement and primary_key not in existing_cols_pd_types:
658            autoincrement_str = AUTO_INCREMENT_COLUMN_FLAVORS.get(
659                self.flavor,
660                AUTO_INCREMENT_COLUMN_FLAVORS['default']
661            )
662            primary_queries.extend([
663                (
664                    f"ALTER TABLE {_pipe_name}\n"
665                    f"ADD {primary_key_name} {primary_key_db_type} {autoincrement_str}"
666                ),
667            ])
668        elif not autoincrement and primary_key in existing_cols_pd_types:
669            if self.flavor == 'sqlite':
670                new_table_name = sql_item_name(
671                    f'_new_{pipe.target}',
672                    self.flavor,
673                    self.get_pipe_schema(pipe)
674                )
675                select_cols_str = ', '.join(
676                    [
677                        sql_item_name(col, self.flavor, None)
678                        for col in existing_cols_types
679                    ]
680                )
681                primary_queries.extend(
682                    get_create_table_queries(
683                        existing_cols_pd_types,
684                        f'_new_{pipe.target}',
685                        self.flavor,
686                        schema=self.get_pipe_schema(pipe),
687                        primary_key=primary_key,
688                    ) + [
689                        (
690                            f"INSERT INTO {new_table_name} ({select_cols_str})\n"
691                            f"SELECT {select_cols_str}\nFROM {_pipe_name}"
692                        ),
693                        f"DROP TABLE {_pipe_name}",
694                    ] + get_rename_table_queries(
695                        f'_new_{pipe.target}',
696                        pipe.target,
697                        self.flavor,
698                        schema=self.get_pipe_schema(pipe),
699                    )
700                )
701            elif self.flavor == 'oracle':
702                primary_queries.extend([
703                    (
704                        f"ALTER TABLE {_pipe_name}\n"
705                        f"MODIFY {primary_key_name} NOT NULL"
706                    ),
707                    (
708                        f"ALTER TABLE {_pipe_name}\n"
709                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})"
710                    )
711                ])
712            elif self.flavor in ('mysql', 'mariadb'):
713                primary_queries.extend([
714                    (
715                        f"ALTER TABLE {_pipe_name}\n"
716                        f"MODIFY {primary_key_name} {primary_key_db_type} NOT NULL"
717                    ),
718                    (
719                        f"ALTER TABLE {_pipe_name}\n"
720                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})"
721                    )
722                ])
723            elif self.flavor == 'timescaledb':
724                primary_queries.extend([
725                    (
726                        f"ALTER TABLE {_pipe_name}\n"
727                        f"ALTER COLUMN {primary_key_name} SET NOT NULL"
728                    ),
729                    (
730                        f"ALTER TABLE {_pipe_name}\n"
731                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY (" + (
732                            f"{_datetime_name}, " if _datetime_name else ""
733                        ) + f"{primary_key_name})"
734                    ),
735                ])
736            elif self.flavor in ('citus', 'postgresql', 'duckdb', 'postgis'):
737                primary_queries.extend([
738                    (
739                        f"ALTER TABLE {_pipe_name}\n"
740                        f"ALTER COLUMN {primary_key_name} SET NOT NULL"
741                    ),
742                    (
743                        f"ALTER TABLE {_pipe_name}\n"
744                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})"
745                    ),
746                ])
747            else:
748                primary_queries.extend([
749                    (
750                        f"ALTER TABLE {_pipe_name}\n"
751                        f"ALTER COLUMN {primary_key_name} {primary_key_db_type} NOT NULL"
752                    ),
753                    (
754                        f"ALTER TABLE {_pipe_name}\n"
755                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY {primary_key_clustered} ({primary_key_name})"
756                    ),
757                ])
758        index_queries[primary_key] = primary_queries
759
760    ### create id index
761    if _id_name is not None:
762        if self.flavor == 'timescaledb':
763            ### Already created indices via create_hypertable.
764            id_query = (
765                None if (_id is not None and _create_space_partition)
766                else (
767                    f"CREATE INDEX IF NOT EXISTS {_id_index_name} ON {_pipe_name} ({_id_name})"
768                    if _id is not None
769                    else None
770                )
771            )
772
773        else: ### mssql, sqlite, etc.
774            id_query = f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})"
775
776        if id_query is not None:
777            index_queries[_id] = id_query if isinstance(id_query, list) else [id_query]
778
779    ### Create indices for other labels in `pipe.columns`.
780    other_index_names = {
781        ix_key: ix_unquoted
782        for ix_key, ix_unquoted in index_names.items()
783        if (
784            ix_key not in ('datetime', 'id', 'primary')
785            and ix_unquoted.lower() not in existing_ix_names
786        )
787    }
788    for ix_key, ix_unquoted in other_index_names.items():
789        ix_name = sql_item_name(ix_unquoted, self.flavor, None)
790        cols = indices[ix_key]
791        if not isinstance(cols, (list, tuple)):
792            cols = [cols]
793        if ix_key == 'unique' and upsert:
794            continue
795        cols_names = [sql_item_name(col, self.flavor, None) for col in cols if col]
796        if not cols_names:
797            continue
798
799        cols_names_str = ", ".join(cols_names)
800        index_query_params_clause = f" ({cols_names_str})"
801        if self.flavor == 'postgis':
802            for col in cols:
803                col_typ = existing_cols_pd_types.get(col, 'object')
804                if col_typ != 'object' and are_dtypes_equal(col_typ, 'geometry'):
805                    index_query_params_clause = f" USING GIST ({cols_names_str})"
806                    break
807
808        index_queries[ix_key] = [
809            f"CREATE INDEX {ix_name} ON {_pipe_name}{index_query_params_clause}"
810        ]
811
812    indices_cols_str = ', '.join(
813        list({
814            sql_item_name(ix, self.flavor)
815            for ix_key, ix in pipe.columns.items()
816            if ix and ix in existing_cols_types
817        })
818    )
819    coalesce_indices_cols_str = ', '.join(
820        [
821            (
822                (
823                    "COALESCE("
824                    + sql_item_name(ix, self.flavor)
825                    + ", "
826                    + get_null_replacement(existing_cols_types[ix], self.flavor)
827                    + ") "
828                )
829                if ix_key != 'datetime' and null_indices
830                else sql_item_name(ix, self.flavor)
831            )
832            for ix_key, ix in pipe.columns.items()
833            if ix and ix in existing_cols_types
834        ]
835    )
836    unique_index_name = sql_item_name(unique_index_name_unquoted, self.flavor)
837    constraint_name_unquoted = unique_index_name_unquoted.replace('IX_', 'UQ_')
838    constraint_name = sql_item_name(constraint_name_unquoted, self.flavor)
839    add_constraint_query = (
840        f"ALTER TABLE {_pipe_name} ADD CONSTRAINT {constraint_name} UNIQUE ({indices_cols_str})"
841    )
842    unique_index_cols_str = (
843        indices_cols_str
844        if self.flavor not in COALESCE_UNIQUE_INDEX_FLAVORS or not null_indices
845        else coalesce_indices_cols_str
846    )
847    create_unique_index_query = (
848        f"CREATE UNIQUE INDEX {unique_index_name} ON {_pipe_name} ({unique_index_cols_str})"
849    )
850    constraint_queries = [create_unique_index_query]
851    if self.flavor != 'sqlite':
852        constraint_queries.append(add_constraint_query)
853    if upsert and indices_cols_str:
854        index_queries[unique_index_name] = constraint_queries
855    return index_queries

Return a dictionary mapping columns to a CREATE INDEX or equivalent query.

Parameters
  • pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
  • A dictionary of index names mapping to lists of queries.
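
Example (a minimal sketch: the connector label `sql:main` and the pipe keys are hypothetical, and the pipe is assumed to be registered and to exist on that instance):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')

    ### Build the queries per index key, then execute them.
    index_queries = conn.get_create_index_queries(pipe)
    for ix_key, queries in index_queries.items():
        conn.exec_queries(queries)
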
def get_drop_index_queries( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, List[str]]:
858def get_drop_index_queries(
859    self,
860    pipe: mrsm.Pipe,
861    debug: bool = False,
862) -> Dict[str, List[str]]:
863    """
864    Return a dictionary mapping columns to a `DROP INDEX` or equivalent query.
865
866    Parameters
867    ----------
868    pipe: mrsm.Pipe
869        The pipe to which the queries will correspond.
870
871    Returns
872    -------
873    A dictionary of column names mapping to lists of queries.
874    """
875    ### NOTE: Due to breaking changes within DuckDB, indices must be skipped.
876    if self.flavor == 'duckdb':
877        return {}
878    if not pipe.exists(debug=debug):
879        return {}
880
881    from collections import defaultdict
882    from meerschaum.utils.sql import (
883        sql_item_name,
884        table_exists,
885        hypertable_queries,
886        DROP_INDEX_IF_EXISTS_FLAVORS,
887    )
888    drop_queries = defaultdict(lambda: [])
889    schema = self.get_pipe_schema(pipe)
890    index_schema = schema if self.flavor != 'mssql' else None
891    indices = {
892        ix_key: ix
893        for ix_key, ix in pipe.get_indices().items()
894    }
895    cols_indices = pipe.get_columns_indices(debug=debug)
896    existing_indices = set()
897    clustered_ix = None
898    for col, ix_metas in cols_indices.items():
899        for ix_meta in ix_metas:
900            ix_name = ix_meta.get('name', None)
901            if ix_meta.get('clustered', False):
902                clustered_ix = ix_name
903            existing_indices.add(ix_name.lower())
904    pipe_name = sql_item_name(pipe.target, self.flavor, schema)
905    pipe_name_no_schema = sql_item_name(pipe.target, self.flavor, None)
906    upsert = pipe.upsert
907
908    if self.flavor not in hypertable_queries:
909        is_hypertable = False
910    else:
911        is_hypertable_query = hypertable_queries[self.flavor].format(table_name=pipe_name)
912        is_hypertable = self.value(is_hypertable_query, silent=True, debug=debug) is not None
913
914    if_exists_str = "IF EXISTS " if self.flavor in DROP_INDEX_IF_EXISTS_FLAVORS else ""
915    if is_hypertable:
916        nuke_queries = []
917        temp_table = '_' + pipe.target + '_temp_migration'
918        temp_table_name = sql_item_name(temp_table, self.flavor, self.get_pipe_schema(pipe))
919
920        if table_exists(temp_table, self, schema=self.get_pipe_schema(pipe), debug=debug):
921            nuke_queries.append(f"DROP TABLE {if_exists_str}{temp_table_name}")
922        nuke_queries += [
923            f"SELECT * INTO {temp_table_name} FROM {pipe_name}",
924            f"DROP TABLE {if_exists_str}{pipe_name}",
925            f"ALTER TABLE {temp_table_name} RENAME TO {pipe_name_no_schema}",
926        ]
927        nuke_ix_keys = ('datetime', 'id')
928        nuked = False
929        for ix_key in nuke_ix_keys:
930            if ix_key in indices and not nuked:
931                drop_queries[ix_key].extend(nuke_queries)
932                nuked = True
933
934    for ix_key, ix_unquoted in indices.items():
935        if ix_key in drop_queries:
936            continue
937        if ix_unquoted.lower() not in existing_indices:
938            continue
939
940        if ix_key == 'unique' and upsert and self.flavor not in ('sqlite',) and not is_hypertable:
941            constraint_name_unquoted = ix_unquoted.replace('IX_', 'UQ_')
942            constraint_name = sql_item_name(constraint_name_unquoted, self.flavor)
943            constraint_or_index = (
944                "CONSTRAINT"
945                if self.flavor not in ('mysql', 'mariadb')
946                else 'INDEX'
947            )
948            drop_queries[ix_key].append(
949                f"ALTER TABLE {pipe_name}\n"
950                f"DROP {constraint_or_index} {constraint_name}"
951            )
952
953        query = (
954            (
955                f"ALTER TABLE {pipe_name}\n"
956                if self.flavor in ('mysql', 'mariadb')
957                else ''
958            )
959            + f"DROP INDEX {if_exists_str}"
960            + sql_item_name(ix_unquoted, self.flavor, index_schema)
961        )
962        if self.flavor == 'mssql':
963            query += f"\nON {pipe_name}"
964            if ix_unquoted == clustered_ix:
965                query += "\nWITH (ONLINE = ON, MAXDOP = 4)"
966        drop_queries[ix_key].append(query)
967
968
969    return drop_queries

Return a dictionary mapping columns to a DROP INDEX or equivalent query.

Parameters
  • pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
  • A dictionary of column names mapping to lists of queries.
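
Example (a sketch continuing the hypothetical `conn` and `pipe` from above; indices are assumed to have been created already):

    ### Generate the DROP INDEX (or equivalent) queries and execute them.
    drop_queries = conn.get_drop_index_queries(pipe)
    for ix_key, queries in drop_queries.items():
        conn.exec_queries(queries)
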
def get_add_columns_queries( self, pipe: meerschaum.Pipe, df: 'Union[pd.DataFrame, Dict[str, str]]', _is_db_types: bool = False, debug: bool = False) -> List[str]:
3167def get_add_columns_queries(
3168    self,
3169    pipe: mrsm.Pipe,
3170    df: Union[pd.DataFrame, Dict[str, str]],
3171    _is_db_types: bool = False,
3172    debug: bool = False,
3173) -> List[str]:
3174    """
3175    Add new null columns of the correct type to a table from a dataframe.
3176
3177    Parameters
3178    ----------
3179    pipe: mrsm.Pipe
3180        The pipe to be altered.
3181
3182    df: Union[pd.DataFrame, Dict[str, str]]
3183        The pandas DataFrame which contains new columns.
3184        If a dictionary is provided, assume it maps columns to Pandas data types.
3185
3186    _is_db_types: bool, default False
3187        If `True`, assume `df` is a dictionary mapping columns to SQL native dtypes.
3188
3189    Returns
3190    -------
3191    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
3192    """
3193    if not pipe.exists(debug=debug):
3194        return []
3195
3196    if pipe.parameters.get('static', False):
3197        return []
3198
3199    from decimal import Decimal
3200    import copy
3201    from meerschaum.utils.sql import (
3202        sql_item_name,
3203        SINGLE_ALTER_TABLE_FLAVORS,
3204        get_table_cols_types,
3205    )
3206    from meerschaum.utils.dtypes.sql import (
3207        get_pd_type_from_db_type,
3208        get_db_type_from_pd_type,
3209    )
3210    from meerschaum.utils.misc import flatten_list
3211    table_obj = self.get_pipe_table(pipe, debug=debug)
3212    is_dask = 'dask' in df.__module__ if not isinstance(df, dict) else False
3213    if is_dask:
3214        df = df.partitions[0].compute()
3215    df_cols_types = (
3216        {
3217            col: str(typ)
3218            for col, typ in df.dtypes.items()
3219        }
3220        if not isinstance(df, dict)
3221        else copy.deepcopy(df)
3222    )
3223    if not isinstance(df, dict) and len(df.index) > 0:
3224        for col, typ in list(df_cols_types.items()):
3225            if typ != 'object':
3226                continue
3227            val = df.iloc[0][col]
3228            if isinstance(val, (dict, list)):
3229                df_cols_types[col] = 'json'
3230            elif isinstance(val, Decimal):
3231                df_cols_types[col] = 'numeric'
3232            elif isinstance(val, str):
3233                df_cols_types[col] = 'str'
3234    db_cols_types = {
3235        col: get_pd_type_from_db_type(str(typ.type))
3236        for col, typ in table_obj.columns.items()
3237    } if table_obj is not None else {
3238        col: get_pd_type_from_db_type(typ)
3239        for col, typ in get_table_cols_types(
3240            pipe.target,
3241            self,
3242            schema=self.get_pipe_schema(pipe),
3243            debug=debug,
3244        ).items()
3245    }
3246    new_cols = set(df_cols_types) - set(db_cols_types)
3247    if not new_cols:
3248        return []
3249
3250    new_cols_types = {
3251        col: get_db_type_from_pd_type(
3252            df_cols_types[col],
3253            self.flavor
3254        )
3255        for col in new_cols
3256        if col and df_cols_types.get(col, None)
3257    }
3258
3259    alter_table_query = "ALTER TABLE " + sql_item_name(
3260        pipe.target, self.flavor, self.get_pipe_schema(pipe)
3261    )
3262    queries = []
3263    for col, typ in new_cols_types.items():
3264        add_col_query = (
3265            "\nADD "
3266            + sql_item_name(col, self.flavor, None)
3267            + " " + typ + ","
3268        )
3269
3270        if self.flavor in SINGLE_ALTER_TABLE_FLAVORS:
3271            queries.append(alter_table_query + add_col_query[:-1])
3272        else:
3273            alter_table_query += add_col_query
3274
3275    ### For most flavors, only one query is required.
3276    ### This covers SQLite which requires one query per column.
3277    if not queries:
3278        queries.append(alter_table_query[:-1])
3279
3280    if self.flavor != 'duckdb':
3281        return queries
3282
3283    ### NOTE: For DuckDB, we must drop and rebuild the indices.
3284    drop_index_queries = list(flatten_list(
3285        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
3286    ))
3287    create_index_queries = list(flatten_list(
3288        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
3289    ))
3290
3291    return drop_index_queries + queries + create_index_queries

Add new null columns of the correct type to a table from a dataframe.

Parameters
  • pipe (mrsm.Pipe): The pipe to be altered.
  • df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which contains new columns. If a dictionary is provided, assume it maps columns to Pandas data types.
  • _is_db_types (bool, default False): If True, assume df is a dictionary mapping columns to SQL native dtypes.
Returns
  • A list of the ALTER TABLE SQL query or queries to be executed on the provided connector.
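
Example (a sketch continuing the hypothetical `conn` and `pipe`; the column `humidity` is assumed not to exist on the target table yet):

    import pandas as pd

    df = pd.DataFrame({'humidity': [0.42]})
    add_queries = conn.get_add_columns_queries(pipe, df)
    if add_queries:
        conn.exec_queries(add_queries)
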
def get_alter_columns_queries( self, pipe: meerschaum.Pipe, df: 'Union[pd.DataFrame, Dict[str, str]]', debug: bool = False) -> List[str]:
3294def get_alter_columns_queries(
3295    self,
3296    pipe: mrsm.Pipe,
3297    df: Union[pd.DataFrame, Dict[str, str]],
3298    debug: bool = False,
3299) -> List[str]:
3300    """
3301    If we encounter a column of a different type, set the entire column to text.
3302    If the altered columns are numeric, alter to numeric instead.
3303
3304    Parameters
3305    ----------
3306    pipe: mrsm.Pipe
3307        The pipe to be altered.
3308
3309    df: Union[pd.DataFrame, Dict[str, str]]
3310        The pandas DataFrame which may contain altered columns.
3311        If a dict is provided, assume it maps columns to Pandas data types.
3312
3313    Returns
3314    -------
3315    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
3316    """
3317    if not pipe.exists(debug=debug):
3318        return []
3319    if pipe.static:
3320        return []
3321    from meerschaum.utils.sql import (
3322        sql_item_name,
3323        get_table_cols_types,
3324        DROP_IF_EXISTS_FLAVORS,
3325        SINGLE_ALTER_TABLE_FLAVORS,
3326    )
3327    from meerschaum.utils.dataframe import get_numeric_cols
3328    from meerschaum.utils.dtypes import are_dtypes_equal
3329    from meerschaum.utils.dtypes.sql import (
3330        get_pd_type_from_db_type,
3331        get_db_type_from_pd_type,
3332    )
3333    from meerschaum.utils.misc import flatten_list, generate_password, items_str
3334    table_obj = self.get_pipe_table(pipe, debug=debug)
3335    target = pipe.target
3336    session_id = generate_password(3)
3337    numeric_cols = (
3338        get_numeric_cols(df)
3339        if not isinstance(df, dict)
3340        else [
3341            col
3342            for col, typ in df.items()
3343            if typ.startswith('numeric')
3344        ]
3345    )
3346    df_cols_types = (
3347        {
3348            col: str(typ)
3349            for col, typ in df.dtypes.items()
3350        }
3351        if not isinstance(df, dict)
3352        else df
3353    )
3354    db_cols_types = {
3355        col: get_pd_type_from_db_type(str(typ.type))
3356        for col, typ in table_obj.columns.items()
3357    } if table_obj is not None else {
3358        col: get_pd_type_from_db_type(typ)
3359        for col, typ in get_table_cols_types(
3360            pipe.target,
3361            self,
3362            schema=self.get_pipe_schema(pipe),
3363            debug=debug,
3364        ).items()
3365    }
3366    pipe_bool_cols = [col for col, typ in pipe.dtypes.items() if are_dtypes_equal(str(typ), 'bool')]
3367    pd_db_df_aliases = {
3368        'int': 'bool',
3369        'float': 'bool',
3370        'numeric': 'bool',
3371        'guid': 'object',
3372    }
3373    if self.flavor == 'oracle':
3374        pd_db_df_aliases['int'] = 'numeric'
3375
3376    altered_cols = {
3377        col: (db_cols_types.get(col, 'object'), typ)
3378        for col, typ in df_cols_types.items()
3379        if not are_dtypes_equal(typ, db_cols_types.get(col, 'object').lower())
3380        and not are_dtypes_equal(db_cols_types.get(col, 'object'), 'string')
3381    }
3382
3383    ### NOTE: Sometimes bools are coerced into ints or floats.
3384    altered_cols_to_ignore = set()
3385    for col, (db_typ, df_typ) in altered_cols.items():
3386        for db_alias, df_alias in pd_db_df_aliases.items():
3387            if db_alias in db_typ.lower() and df_alias in df_typ.lower():
3388                altered_cols_to_ignore.add(col)
3389
3390    ### Oracle's bool handling sometimes mixes NUMBER and INT.
3391    for bool_col in pipe_bool_cols:
3392        if bool_col not in altered_cols:
3393            continue
3394        db_is_bool_compatible = (
3395            are_dtypes_equal('int', altered_cols[bool_col][0])
3396            or are_dtypes_equal('float', altered_cols[bool_col][0])
3397            or are_dtypes_equal('numeric', altered_cols[bool_col][0])
3398            or are_dtypes_equal('bool', altered_cols[bool_col][0])
3399        )
3400        df_is_bool_compatible = (
3401            are_dtypes_equal('int', altered_cols[bool_col][1])
3402            or are_dtypes_equal('float', altered_cols[bool_col][1])
3403            or are_dtypes_equal('numeric', altered_cols[bool_col][1])
3404            or are_dtypes_equal('bool', altered_cols[bool_col][1])
3405        )
3406        if db_is_bool_compatible and df_is_bool_compatible:
3407            altered_cols_to_ignore.add(bool_col)
3408
3409    for col in altered_cols_to_ignore:
3410        _ = altered_cols.pop(col, None)
3411    if not altered_cols:
3412        return []
3413
3414    if numeric_cols:
3415        pipe.dtypes.update({col: 'numeric' for col in numeric_cols})
3416        edit_success, edit_msg = pipe.edit(debug=debug)
3417        if not edit_success:
3418            warn(
3419                f"Failed to update dtypes for numeric columns {items_str(numeric_cols)}:\n"
3420                + f"{edit_msg}"
3421            )
3422    else:
3423        numeric_cols.extend([col for col, typ in pipe.dtypes.items() if typ.startswith('numeric')])
3424
3425    numeric_type = get_db_type_from_pd_type('numeric', self.flavor, as_sqlalchemy=False)
3426    text_type = get_db_type_from_pd_type('str', self.flavor, as_sqlalchemy=False)
3427    altered_cols_types = {
3428        col: (
3429            numeric_type
3430            if col in numeric_cols
3431            else text_type
3432        )
3433        for col, (db_typ, typ) in altered_cols.items()
3434    }
3435
3436    if self.flavor == 'sqlite':
3437        temp_table_name = '-' + session_id + '_' + target
3438        rename_query = (
3439            "ALTER TABLE "
3440            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3441            + " RENAME TO "
3442            + sql_item_name(temp_table_name, self.flavor, None)
3443        )
3444        create_query = (
3445            "CREATE TABLE "
3446            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3447            + " (\n"
3448        )
3449        for col_name, col_obj in table_obj.columns.items():
3450            create_query += (
3451                sql_item_name(col_name, self.flavor, None)
3452                + " "
3453                + (
3454                    str(col_obj.type)
3455                    if col_name not in altered_cols
3456                    else altered_cols_types[col_name]
3457                )
3458                + ",\n"
3459            )
3460        create_query = create_query[:-2] + "\n)"
3461
3462        insert_query = (
3463            "INSERT INTO "
3464            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3465            + ' ('
3466            + ', '.join([
3467                sql_item_name(col_name, self.flavor, None)
3468                for col_name, _ in table_obj.columns.items()
3469            ])
3470            + ')'
3471            + "\nSELECT\n"
3472        )
3473        for col_name, col_obj in table_obj.columns.items():
3474            new_col_str = (
3475                sql_item_name(col_name, self.flavor, None)
3476                if col_name not in altered_cols
3477                else (
3478                    "CAST("
3479                    + sql_item_name(col_name, self.flavor, None)
3480                    + " AS "
3481                    + altered_cols_types[col_name]
3482                    + ")"
3483                )
3484            )
3485            insert_query += new_col_str + ",\n"
3486        insert_query = insert_query[:-2] + (
3487            f"\nFROM {sql_item_name(temp_table_name, self.flavor, self.get_pipe_schema(pipe))}"
3488        )
3489
3490        if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
3491
3492    drop_query = f"DROP TABLE {if_exists_str} " + sql_item_name(
3493            temp_table_name, self.flavor, self.get_pipe_schema(pipe)
3494        )
3495        return [
3496            rename_query,
3497            create_query,
3498            insert_query,
3499            drop_query,
3500        ]
3501
3502    queries = []
3503    if self.flavor == 'oracle':
3504        for col, typ in altered_cols_types.items():
3505            add_query = (
3506                "ALTER TABLE "
3507                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3508                + "\nADD " + sql_item_name(col + '_temp', self.flavor, None)
3509                + " " + typ
3510            )
3511            queries.append(add_query)
3512
3513        for col, typ in altered_cols_types.items():
3514            populate_temp_query = (
3515                "UPDATE "
3516                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3517                + "\nSET " + sql_item_name(col + '_temp', self.flavor, None)
3518                + ' = ' + sql_item_name(col, self.flavor, None)
3519            )
3520            queries.append(populate_temp_query)
3521
3522        for col, typ in altered_cols_types.items():
3523            set_old_cols_to_null_query = (
3524                "UPDATE "
3525                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3526                + "\nSET " + sql_item_name(col, self.flavor, None)
3527                + ' = NULL'
3528            )
3529            queries.append(set_old_cols_to_null_query)
3530
3531        for col, typ in altered_cols_types.items():
3532            alter_type_query = (
3533                "ALTER TABLE "
3534                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3535                + "\nMODIFY " + sql_item_name(col, self.flavor, None) + ' '
3536                + typ
3537            )
3538            queries.append(alter_type_query)
3539
3540        for col, typ in altered_cols_types.items():
3541            set_old_to_temp_query = (
3542                "UPDATE "
3543                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3544                + "\nSET " + sql_item_name(col, self.flavor, None)
3545                + ' = ' + sql_item_name(col + '_temp', self.flavor, None)
3546            )
3547            queries.append(set_old_to_temp_query)
3548
3549        for col, typ in altered_cols_types.items():
3550            drop_temp_query = (
3551                "ALTER TABLE "
3552                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3553                + "\nDROP COLUMN " + sql_item_name(col + '_temp', self.flavor, None)
3554            )
3555            queries.append(drop_temp_query)
3556
3557        return queries
3558
3559    query = "ALTER TABLE " + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3560    for col, typ in altered_cols_types.items():
3561        alter_col_prefix = (
3562            'ALTER' if self.flavor not in ('mysql', 'mariadb', 'oracle')
3563            else 'MODIFY'
3564        )
3565        type_prefix = (
3566            '' if self.flavor in ('mssql', 'mariadb', 'mysql')
3567            else 'TYPE '
3568        )
3569        column_str = 'COLUMN' if self.flavor != 'oracle' else ''
3570        query_suffix = (
3571            f"\n{alter_col_prefix} {column_str} "
3572            + sql_item_name(col, self.flavor, None)
3573            + " " + type_prefix + typ + ","
3574        )
3575        if self.flavor not in SINGLE_ALTER_TABLE_FLAVORS:
3576            query += query_suffix
3577        else:
3578            queries.append(query + query_suffix[:-1])
3579
3580    if self.flavor not in SINGLE_ALTER_TABLE_FLAVORS:
3581        queries.append(query[:-1])
3582
3583    if self.flavor != 'duckdb':
3584        return queries
3585
3586    drop_index_queries = list(flatten_list(
3587        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
3588    ))
3589    create_index_queries = list(flatten_list(
3590        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
3591    ))
3592
3593    return drop_index_queries + queries + create_index_queries

If we encounter a column of a different type, set the entire column to text. If the altered columns are numeric, alter to numeric instead.

Parameters
  • pipe (mrsm.Pipe): The pipe to be altered.
  • df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which may contain altered columns. If a dict is provided, assume it maps columns to Pandas data types.
Returns
  • A list of the ALTER TABLE SQL query or queries to be executed on the provided connector.
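
Example (a sketch continuing the hypothetical `conn` and `pipe`; `station_id` is assumed to exist as an integer column but to now arrive as text). A dictionary of Pandas dtypes may be passed in place of a DataFrame:

    alter_queries = conn.get_alter_columns_queries(pipe, {'station_id': 'object'})
    if alter_queries:
        conn.exec_queries(alter_queries)
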
def delete_pipe( self, pipe: meerschaum.Pipe, debug: bool = False) -> Tuple[bool, str]:
972def delete_pipe(
973    self,
974    pipe: mrsm.Pipe,
975    debug: bool = False,
976) -> SuccessTuple:
977    """
978    Delete a Pipe's registration.
979    """
980    from meerschaum.utils.packages import attempt_import
981    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
982
983    if not pipe.id:
984        return False, f"{pipe} is not registered."
985
986    ### ensure pipes table exists
987    from meerschaum.connectors.sql.tables import get_tables
988    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']
989
990    q = sqlalchemy.delete(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id)
991    if not self.exec(q, debug=debug):
992        return False, f"Failed to delete registration for {pipe}."
993
994    return True, "Success"

Delete a Pipe's registration.
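
Example (continuing the hypothetical `conn` and `pipe`; this removes only the registration row, not the pipe's target table):

    success, msg = conn.delete_pipe(pipe)
    print(success, msg)
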

def get_pipe_data( self, pipe: meerschaum.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, str, NoneType] = None, end: Union[datetime.datetime, str, NoneType] = None, params: Optional[Dict[str, Any]] = None, order: str = 'asc', limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, debug: bool = False, **kw: Any) -> 'Union[pd.DataFrame, None]':
 997def get_pipe_data(
 998    self,
 999    pipe: mrsm.Pipe,
1000    select_columns: Optional[List[str]] = None,
1001    omit_columns: Optional[List[str]] = None,
1002    begin: Union[datetime, str, None] = None,
1003    end: Union[datetime, str, None] = None,
1004    params: Optional[Dict[str, Any]] = None,
1005    order: str = 'asc',
1006    limit: Optional[int] = None,
1007    begin_add_minutes: int = 0,
1008    end_add_minutes: int = 0,
1009    debug: bool = False,
1010    **kw: Any
1011) -> Union[pd.DataFrame, None]:
1012    """
1013    Access a pipe's data from the SQL instance.
1014
1015    Parameters
1016    ----------
1017    pipe: mrsm.Pipe
1018        The pipe to get data from.
1019
1020    select_columns: Optional[List[str]], default None
1021        If provided, only select these given columns.
1022        Otherwise select all available columns (i.e. `SELECT *`).
1023
1024    omit_columns: Optional[List[str]], default None
1025        If provided, remove these columns from the selection.
1026
1027    begin: Union[datetime, str, None], default None
1028        If provided, get rows newer than or equal to this value.
1029
1030    end: Union[datetime, str, None], default None
1031        If provided, get rows older than or equal to this value.
1032
1033    params: Optional[Dict[str, Any]], default None
1034        Additional parameters to filter by.
1035        See `meerschaum.connectors.sql.build_where`.
1036
1037    order: Optional[str], default 'asc'
1038        The selection order for all of the indices in the query.
1039        If `None`, omit the `ORDER BY` clause.
1040
1041    limit: Optional[int], default None
1042        If specified, limit the number of rows retrieved to this value.
1043
1044    begin_add_minutes: int, default 0
1045        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
1046
1047    end_add_minutes: int, default 0
1048        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
1049
1050    chunksize: Optional[int], default -1
1051        The size of dataframe chunks to load into memory.
1052
1053    debug: bool, default False
1054        Verbosity toggle.
1055
1056    Returns
1057    -------
1058    A `pd.DataFrame` of the pipe's data.
1059
1060    """
1061    import json
1062    from meerschaum.utils.misc import parse_df_datetimes, to_pandas_dtype
1063    from meerschaum.utils.packages import import_pandas
1064    from meerschaum.utils.dtypes import (
1065        attempt_cast_to_numeric,
1066        attempt_cast_to_uuid,
1067        attempt_cast_to_bytes,
1068        attempt_cast_to_geometry,
1069        are_dtypes_equal,
1070    )
1071    from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type
1072    pd = import_pandas()
1073    is_dask = 'dask' in pd.__name__
1074
1075    cols_types = pipe.get_columns_types(debug=debug) if pipe.enforce else {}
1076    dtypes = {
1077        **{
1078            p_col: to_pandas_dtype(p_typ)
1079            for p_col, p_typ in pipe.dtypes.items()
1080        },
1081        **{
1082            col: get_pd_type_from_db_type(typ)
1083            for col, typ in cols_types.items()
1084        }
1085    } if pipe.enforce else {}
1086    if dtypes:
1087        if self.flavor == 'sqlite':
1088            if not pipe.columns.get('datetime', None):
1089                _dt = pipe.guess_datetime()
1090            else:
1091                _dt = pipe.get_columns('datetime')
1092
1093            if _dt:
1094                dt_type = dtypes.get(_dt, 'object').lower()
1095                if 'datetime' not in dt_type:
1096                    if 'int' not in dt_type:
1097                        dtypes[_dt] = 'datetime64[ns, UTC]'
1098
1099    existing_cols = cols_types.keys()
1100    select_columns = (
1101        [
1102            col
1103            for col in existing_cols
1104            if col not in (omit_columns or [])
1105        ]
1106        if not select_columns
1107        else [
1108            col
1109            for col in select_columns
1110            if col in existing_cols
1111            and col not in (omit_columns or [])
1112        ]
1113    ) if pipe.enforce else select_columns
1114    if select_columns:
1115        dtypes = {col: typ for col, typ in dtypes.items() if col in select_columns}
1116    dtypes = {
1117        col: to_pandas_dtype(typ)
1118        for col, typ in dtypes.items()
1119        if col in select_columns and col not in (omit_columns or [])
1120    } if pipe.enforce else {}
1121    query = self.get_pipe_data_query(
1122        pipe,
1123        select_columns=select_columns,
1124        omit_columns=omit_columns,
1125        begin=begin,
1126        end=end,
1127        params=params,
1128        order=order,
1129        limit=limit,
1130        begin_add_minutes=begin_add_minutes,
1131        end_add_minutes=end_add_minutes,
1132        debug=debug,
1133        **kw
1134    )
1135
1136    if is_dask:
1137        index_col = pipe.columns.get('datetime', None)
1138        kw['index_col'] = index_col
1139
1140    numeric_columns = [
1141        col
1142        for col, typ in pipe.dtypes.items()
1143        if typ.startswith('numeric') and col in dtypes
1144    ]
1145    uuid_columns = [
1146        col
1147        for col, typ in pipe.dtypes.items()
1148        if typ == 'uuid' and col in dtypes
1149    ]
1150    bytes_columns = [
1151        col
1152        for col, typ in pipe.dtypes.items()
1153        if typ == 'bytes' and col in dtypes
1154    ]
1155    geometry_columns = [
1156        col
1157        for col, typ in pipe.dtypes.items()
1158        if typ.startswith('geometry') and col in dtypes
1159    ]
1160
1161    kw['coerce_float'] = kw.get('coerce_float', (len(numeric_columns) == 0))
1162
1163    df = self.read(
1164        query,
1165        dtype=dtypes,
1166        debug=debug,
1167        **kw
1168    )
1169    for col in numeric_columns:
1170        if col not in df.columns:
1171            continue
1172        df[col] = df[col].apply(attempt_cast_to_numeric)
1173
1174    for col in uuid_columns:
1175        if col not in df.columns:
1176            continue
1177        df[col] = df[col].apply(attempt_cast_to_uuid)
1178
1179    for col in bytes_columns:
1180        if col not in df.columns:
1181            continue
1182        df[col] = df[col].apply(attempt_cast_to_bytes)
1183
1184    for col in geometry_columns:
1185        if col not in df.columns:
1186            continue
1187        df[col] = df[col].apply(attempt_cast_to_geometry)
1188
1189    if self.flavor == 'sqlite':
1190        ignore_dt_cols = [
1191            col
1192            for col, dtype in pipe.dtypes.items()
1193            if not are_dtypes_equal(str(dtype), 'datetime')
1194        ]
1195        ### NOTE: We have to consume the iterator here to ensure that datetimes are parsed correctly
1196        df = (
1197            parse_df_datetimes(
1198                df,
1199                ignore_cols=ignore_dt_cols,
1200                chunksize=kw.get('chunksize', None),
1201                strip_timezone=(pipe.tzinfo is None),
1202                debug=debug,
1203            ) if isinstance(df, pd.DataFrame) else (
1204                [
1205                    parse_df_datetimes(
1206                        c,
1207                        ignore_cols=ignore_dt_cols,
1208                        chunksize=kw.get('chunksize', None),
1209                        strip_timezone=(pipe.tzinfo is None),
1210                        debug=debug,
1211                    )
1212                    for c in df
1213                ]
1214            )
1215        )
1216        for col, typ in dtypes.items():
1217            if typ != 'json':
1218                continue
1219            df[col] = df[col].apply(lambda x: json.loads(x) if x is not None else x)
1220    return df

Access a pipe's data from the SQL instance.

Parameters
  • pipe (mrsm.Pipe): The pipe to get data from.
  • select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
  • omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
  • begin (Union[datetime, str, None], default None): If provided, get rows newer than or equal to this value.
  • end (Union[datetime, str, None], default None): If provided, get rows older than or equal to this value.
  • params (Optional[Dict[str, Any]], default None): Additional parameters to filter by. See meerschaum.connectors.sql.build_where.
  • order (Optional[str], default 'asc'): The selection order for all of the indices in the query. If None, omit the ORDER BY clause.
  • limit (Optional[int], default None): If specified, limit the number of rows retrieved to this value.
  • begin_add_minutes (int, default 0): The number of minutes to add to the begin datetime (i.e. DATEADD).
  • end_add_minutes (int, default 0): The number of minutes to add to the end datetime (i.e. DATEADD).
  • chunksize (Optional[int], default -1): The size of dataframe chunks to load into memory.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pd.DataFrame of the pipe's data.
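
Example (a sketch continuing the hypothetical `conn` and `pipe`; in practice this is usually reached via `pipe.get_data()`, which delegates to the instance connector):

    from datetime import datetime

    df = conn.get_pipe_data(
        pipe,
        begin=datetime(2024, 1, 1),
        end=datetime(2024, 2, 1),
        params={'station': 'KATL'},  # hypothetical column filter
        limit=100,
    )
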
def get_pipe_data_query( self, pipe: meerschaum.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, int, str, NoneType] = None, end: Union[datetime.datetime, int, str, NoneType] = None, params: Optional[Dict[str, Any]] = None, order: Optional[str] = 'asc', sort_datetimes: bool = False, limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, replace_nulls: Optional[str] = None, skip_existing_cols_check: bool = False, debug: bool = False, **kw: Any) -> Optional[str]:
1223def get_pipe_data_query(
1224    self,
1225    pipe: mrsm.Pipe,
1226    select_columns: Optional[List[str]] = None,
1227    omit_columns: Optional[List[str]] = None,
1228    begin: Union[datetime, int, str, None] = None,
1229    end: Union[datetime, int, str, None] = None,
1230    params: Optional[Dict[str, Any]] = None,
1231    order: Optional[str] = 'asc',
1232    sort_datetimes: bool = False,
1233    limit: Optional[int] = None,
1234    begin_add_minutes: int = 0,
1235    end_add_minutes: int = 0,
1236    replace_nulls: Optional[str] = None,
1237    skip_existing_cols_check: bool = False,
1238    debug: bool = False,
1239    **kw: Any
1240) -> Union[str, None]:
1241    """
1242    Return the `SELECT` query for retrieving a pipe's data from its instance.
1243
1244    Parameters
1245    ----------
1246    pipe: mrsm.Pipe
1247        The pipe to get data from.
1248
1249    select_columns: Optional[List[str]], default None
1250        If provided, only select these given columns.
1251        Otherwise select all available columns (i.e. `SELECT *`).
1252
1253    omit_columns: Optional[List[str]], default None
1254        If provided, remove these columns from the selection.
1255
1256    begin: Union[datetime, int, str, None], default None
1257        If provided, get rows newer than or equal to this value.
1258
1259    end: Union[datetime, int, str, None], default None
1260        If provided, get rows older than or equal to this value.
1261
1262    params: Optional[Dict[str, Any]], default None
1263        Additional parameters to filter by.
1264        See `meerschaum.connectors.sql.build_where`.
1265
1266    order: Optional[str], default 'asc'
1267        The selection order for all of the indices in the query.
1268        If `None`, omit the `ORDER BY` clause.
1269
1270    sort_datetimes: bool, default False
1271        Alias for `order='desc'`.
1272
1273    limit: Optional[int], default None
1274        If specified, limit the number of rows retrieved to this value.
1275
1276    begin_add_minutes: int, default 0
1277        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
1278
1279    end_add_minutes: int, default 0
1280        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
1281
1282    chunksize: Optional[int], default -1
1283        The size of dataframe chunks to load into memory.
1284
1285    replace_nulls: Optional[str], default None
1286        If provided, replace null values with this value.
1287
1288    skip_existing_cols_check: bool, default False
1289        If `True`, do not verify that querying columns are actually on the table.
1290
1291    debug: bool, default False
1292        Verbosity toggle.
1293
1294    Returns
1295    -------
1296    A `SELECT` query to retrieve a pipe's data.
1297    """
1298    from meerschaum.utils.misc import items_str
1299    from meerschaum.utils.sql import sql_item_name, dateadd_str
1300    from meerschaum.utils.dtypes import coerce_timezone
1301    from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type, get_db_type_from_pd_type
1302
1303    dt_col = pipe.columns.get('datetime', None)
1304    existing_cols = pipe.get_columns_types(debug=debug) if pipe.enforce else []
1305    skip_existing_cols_check = skip_existing_cols_check or not pipe.enforce
1306    dt_typ = get_pd_type_from_db_type(existing_cols[dt_col]) if dt_col in existing_cols else None
1307    dt_db_type = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None
1308    select_columns = (
1309        [col for col in existing_cols]
1310        if not select_columns
1311        else [col for col in select_columns if skip_existing_cols_check or col in existing_cols]
1312    )
1313    if omit_columns:
1314        select_columns = [col for col in select_columns if col not in omit_columns]
1315
1316    if order is None and sort_datetimes:
1317        order = 'desc'
1318
1319    if begin == '':
1320        begin = pipe.get_sync_time(debug=debug)
1321        backtrack_interval = pipe.get_backtrack_interval(debug=debug)
1322        if begin is not None:
1323            begin -= backtrack_interval
1324
1325    begin, end = pipe.parse_date_bounds(begin, end)
1326    if isinstance(begin, datetime) and dt_typ:
1327        begin = coerce_timezone(begin, strip_utc=('utc' not in dt_typ.lower()))
1328    if isinstance(end, datetime) and dt_typ:
1329        end = coerce_timezone(end, strip_utc=('utc' not in dt_typ.lower()))
1330
1331    cols_names = [
1332        sql_item_name(col, self.flavor, None)
1333        for col in select_columns
1334    ]
1335    select_cols_str = (
1336        'SELECT\n    '
1337        + ',\n    '.join(
1338            [
1339                (
1340                    col_name
1341                    if not replace_nulls
1342                    else f"COALESCE({col_name}, '{replace_nulls}') AS {col_name}"
1343                )
1344                for col_name in cols_names
1345            ]
1346        )
1347    ) if cols_names else 'SELECT *'
1348    pipe_table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
1349    query = f"{select_cols_str}\nFROM {pipe_table_name}"
1350    where = ""
1351
1352    if order is not None:
1353        default_order = 'asc'
1354        if order not in ('asc', 'desc'):
1355            warn(f"Ignoring unsupported order '{order}'. Falling back to '{default_order}'.")
1356            order = default_order
1357        order = order.upper()
1358
1359    if not pipe.columns.get('datetime', None):
1360        _dt = pipe.guess_datetime()
1361        dt = sql_item_name(_dt, self.flavor, None) if _dt else None
1362        is_guess = True
1363    else:
1364        _dt = pipe.get_columns('datetime')
1365        dt = sql_item_name(_dt, self.flavor, None)
1366        is_guess = False
1367
1368    quoted_indices = {
1369        key: sql_item_name(val, self.flavor, None)
1370        for key, val in pipe.columns.items()
1371        if val in existing_cols or skip_existing_cols_check
1372    }
1373
1374    if begin is not None or end is not None:
1375        if is_guess:
1376            if _dt is None:
1377                warn(
1378                    f"No datetime could be determined for {pipe}."
1379                    + "\n    Ignoring begin and end...",
1380                    stack=False,
1381                )
1382                begin, end = None, None
1383            else:
1384                warn(
1385                    f"A datetime wasn't specified for {pipe}.\n"
1386                    + f"    Using column \"{_dt}\" for datetime bounds...",
1387                    stack=False,
1388                )
1389
1390    is_dt_bound = False
1391    if begin is not None and (_dt in existing_cols or skip_existing_cols_check):
1392        begin_da = dateadd_str(
1393            flavor=self.flavor,
1394            datepart='minute',
1395            number=begin_add_minutes,
1396            begin=begin,
1397            db_type=dt_db_type,
1398        )
1399        where += f"\n    {dt} >= {begin_da}" + ("\n    AND\n    " if end is not None else "")
1400        is_dt_bound = True
1401
1402    if end is not None and (_dt in existing_cols or skip_existing_cols_check):
1403        if 'int' in str(type(end)).lower() and end == begin:
1404            end += 1
1405        end_da = dateadd_str(
1406            flavor=self.flavor,
1407            datepart='minute',
1408            number=end_add_minutes,
1409            begin=end,
1410            db_type=dt_db_type,
1411        )
1412        where += f"{dt} <  {end_da}"
1413        is_dt_bound = True
1414
1415    if params is not None:
1416        from meerschaum.utils.sql import build_where
1417        valid_params = {
1418            k: v
1419            for k, v in params.items()
1420            if k in existing_cols or skip_existing_cols_check
1421        }
1422        if valid_params:
1423            where += build_where(valid_params, self).replace(
1424                'WHERE', ('    AND' if is_dt_bound else "    ")
1425            )
1426
1427    if len(where) > 0:
1428        query += "\nWHERE " + where
1429
1430    if order is not None:
1431        ### Sort by indices, starting with datetime.
1432        order_by = ""
1433        if quoted_indices:
1434            order_by += "\nORDER BY "
1435            if _dt and (_dt in existing_cols or skip_existing_cols_check):
1436                order_by += dt + ' ' + order + ','
1437            for key, quoted_col_name in quoted_indices.items():
1438                if dt == quoted_col_name:
1439                    continue
1440                order_by += ' ' + quoted_col_name + ' ' + order + ','
1441            order_by = order_by[:-1]
1442
1443        query += order_by
1444
1445    if isinstance(limit, int):
1446        if self.flavor == 'mssql':
1447            query = f'SELECT TOP {limit}\n' + query[len("SELECT "):]
1448        elif self.flavor == 'oracle':
1449            query = (
1450                f"SELECT * FROM (\n  {query}\n)\n"
1451                + f"WHERE ROWNUM IN ({', '.join([str(i) for i in range(1, limit+1)])})"
1452            )
1453        else:
1454            query += f"\nLIMIT {limit}"
1455
1456    if debug:
1457        to_print = (
1458            []
1459            + ([f"begin='{begin}'"] if begin else [])
1460            + ([f"end='{end}'"] if end else [])
1461            + ([f"params={params}"] if params else [])
1462        )
1463        dprint("Getting pipe data with constraints: " + items_str(to_print, quotes=False))
1464
1465    return query

Return the SELECT query for retrieving a pipe's data from its instance.

Parameters
  • pipe (mrsm.Pipe): The pipe to get data from.
  • select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
  • omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
  • begin (Union[datetime, int, str, None], default None): If provided, get rows newer than or equal to this value.
  • end (Union[datetime, int, str, None], default None): If provided, get rows older than or equal to this value.
  • params (Optional[Dict[str, Any]], default None): Additional parameters to filter by. See meerschaum.connectors.sql.build_where.
  • order (Optional[str], default 'asc'): The selection order for all of the indices in the query. If None, omit the ORDER BY clause.
  • sort_datetimes (bool, default False): Alias for order='desc'.
  • limit (Optional[int], default None): If specified, limit the number of rows retrieved to this value.
  • begin_add_minutes (int, default 0): The number of minutes to add to the begin datetime (i.e. DATEADD).
  • end_add_minutes (int, default 0): The number of minutes to add to the end datetime (i.e. DATEADD).
  • chunksize (Optional[int], default -1): The size of dataframe chunks to load into memory.
  • replace_nulls (Optional[str], default None): If provided, replace null values with this value.
  • skip_existing_cols_check (bool, default False): If True, do not verify that querying columns are actually on the table.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SELECT query to retrieve a pipe's data.
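
Example (continuing the hypothetical `conn` and `pipe`; useful for inspecting the generated SQL without executing it):

    query = conn.get_pipe_data_query(
        pipe,
        begin='2024-01-01',
        order='desc',
        limit=10,
    )
    print(query)
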
def register_pipe( self, pipe: meerschaum.Pipe, debug: bool = False) -> Tuple[bool, str]:
20def register_pipe(
21    self,
22    pipe: mrsm.Pipe,
23    debug: bool = False,
24) -> SuccessTuple:
25    """
26    Register a new pipe.
27    A pipe's attributes must be set before registering.
28    """
29    from meerschaum.utils.debug import dprint
30    from meerschaum.utils.packages import attempt_import
31    from meerschaum.utils.sql import json_flavors
32
33    ### ensure pipes table exists
34    from meerschaum.connectors.sql.tables import get_tables
35    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']
36
37    if pipe.get_id(debug=debug) is not None:
38        return False, f"{pipe} is already registered."
39
40    ### NOTE: if `parameters` is supplied in the Pipe constructor,
41    ###       then `pipe.parameters` will exist and not be fetched from the database.
42
43    ### 1. Prioritize the Pipe object's `parameters` first.
44    ###    E.g. if the user manually sets the `parameters` property
45    ###    or if the Pipe already exists
46    ###    (which shouldn't be registerable anyway, but that's an issue for later).
47    parameters = None
48    try:
49        parameters = pipe.parameters
50    except Exception as e:
51        if debug:
52            dprint(str(e))
53        parameters = None
54
55    ### ensure `parameters` is a dictionary
56    if parameters is None:
57        parameters = {}
58
59    import json
60    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
61    values = {
62        'connector_keys' : pipe.connector_keys,
63        'metric_key'     : pipe.metric_key,
64        'location_key'   : pipe.location_key,
65        'parameters'     : (
66            json.dumps(parameters)
67            if self.flavor not in json_flavors
68            else parameters
69        ),
70    }
71    query = sqlalchemy.insert(pipes_tbl).values(**values)
72    result = self.exec(query, debug=debug)
73    if result is None:
74        return False, f"Failed to register {pipe}."
75    return True, f"Successfully registered {pipe}."

Register a new pipe. A pipe's attributes must be set before registering.
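
Example (a sketch with hypothetical keys; set the pipe's attributes, e.g. `parameters`, before registering):

    new_pipe = mrsm.Pipe(
        'sql:main', 'temperature', 'atl',
        instance='sql:main',
        parameters={'columns': {'datetime': 'dt', 'id': 'station_id'}},
    )
    success, msg = conn.register_pipe(new_pipe)
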

def edit_pipe( self, pipe: meerschaum.Pipe = None, patch: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
 78def edit_pipe(
 79    self,
 80    pipe : mrsm.Pipe = None,
 81    patch: bool = False,
 82    debug: bool = False,
 83    **kw : Any
 84) -> SuccessTuple:
 85    """
 86    Persist a Pipe's parameters to its database.
 87
 88    Parameters
 89    ----------
 90    pipe: mrsm.Pipe, default None
 91        The pipe to be edited.
 92    patch: bool, default False
 93        If patch is `True`, update the existing parameters by cascading.
 94        Otherwise overwrite the parameters (default).
 95    debug: bool, default False
 96        Verbosity toggle.
 97    """
 98
 99    if pipe.id is None:
100        return False, f"{pipe} is not registered and cannot be edited."
101
102    from meerschaum.utils.packages import attempt_import
103    from meerschaum.utils.sql import json_flavors
104    if not patch:
105        parameters = pipe.__dict__.get('_attributes', {}).get('parameters', {})
106    else:
107        from meerschaum import Pipe
108        from meerschaum.config._patch import apply_patch_to_config
109        original_parameters = Pipe(
110            pipe.connector_keys, pipe.metric_key, pipe.location_key,
111            mrsm_instance=pipe.instance_keys
112        ).parameters
113        parameters = apply_patch_to_config(
114            original_parameters,
115            pipe.parameters
116        )
117
118    ### ensure pipes table exists
119    from meerschaum.connectors.sql.tables import get_tables
120    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']
121
122    import json
123    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
124
125    values = {
126        'parameters': (
127            json.dumps(parameters)
128            if self.flavor not in json_flavors
129            else parameters
130        ),
131    }
132    q = sqlalchemy.update(pipes_tbl).values(**values).where(
133        pipes_tbl.c.pipe_id == pipe.id
134    )
135
136    result = self.exec(q, debug=debug)
137    message = (
138        f"Successfully edited {pipe}."
139        if result is not None else f"Failed to edit {pipe}."
140    )
141    return (result is not None), message

Persist a Pipe's parameters to its database.

Parameters
  • pipe (mrsm.Pipe, default None): The pipe to be edited.
  • patch (bool, default False): If patch is True, update the existing parameters by cascading. Otherwise overwrite the parameters (default).
  • debug (bool, default False): Verbosity toggle.
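
A short sketch of persisting parameter changes (assuming conn and pipe from the registration example above, and that the pipe is already registered):

    # Overwrite the registered parameters with the in-memory ones (the default).
    pipe.parameters['fetch'] = {'backtrack_minutes': 1440}    # hypothetical change
    success, msg = conn.edit_pipe(pipe)

    # Or cascade the in-memory parameters onto the registered ones.
    success, msg = conn.edit_pipe(pipe, patch=True)
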
def get_pipe_id(self, pipe: meerschaum.Pipe, debug: bool = False) -> Any:
1468def get_pipe_id(
1469    self,
1470    pipe: mrsm.Pipe,
1471    debug: bool = False,
1472) -> Any:
1473    """
1474    Get a Pipe's ID from the pipes table.
1475    """
1476    if pipe.temporary:
1477        return None
1478    from meerschaum.utils.packages import attempt_import
1479    sqlalchemy = attempt_import('sqlalchemy')
1480    from meerschaum.connectors.sql.tables import get_tables
1481    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']
1482
1483    query = sqlalchemy.select(pipes_tbl.c.pipe_id).where(
1484        pipes_tbl.c.connector_keys == pipe.connector_keys
1485    ).where(
1486        pipes_tbl.c.metric_key == pipe.metric_key
1487    ).where(
1488        (pipes_tbl.c.location_key == pipe.location_key) if pipe.location_key is not None
1489        else pipes_tbl.c.location_key.is_(None)
1490    )
1491    _id = self.value(query, debug=debug, silent=pipe.temporary)
1492    if _id is not None:
1493        _id = int(_id)
1494    return _id

Get a Pipe's ID from the pipes table.
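
Temporary pipes are never registered, so they always yield None. A quick sketch (assuming conn and pipe from the examples above):

    pipe_id = conn.get_pipe_id(pipe)
    if pipe_id is None:
        print(f"{pipe} is not registered.")
    else:
        print(f"{pipe} has ID {pipe_id}.")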

def get_pipe_attributes( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, Any]:
1497def get_pipe_attributes(
1498    self,
1499    pipe: mrsm.Pipe,
1500    debug: bool = False,
1501) -> Dict[str, Any]:
1502    """
1503    Get a Pipe's attributes dictionary.
1504    """
1505    from meerschaum.connectors.sql.tables import get_tables
1506    from meerschaum.utils.packages import attempt_import
1507    sqlalchemy = attempt_import('sqlalchemy')
1508
1509    if pipe.get_id(debug=debug) is None:
1510        return {}
1511
1512    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']
1513
1514    try:
1515        q = sqlalchemy.select(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id)
1516        if debug:
1517            dprint(q)
1518        attributes = (
1519            dict(self.exec(q, silent=True, debug=debug).first()._mapping)
1520            if self.flavor != 'duckdb'
1521            else self.read(q, debug=debug).to_dict(orient='records')[0]
1522        )
1523    except Exception as e:
1524        import traceback
1525        traceback.print_exc()
1526        warn(e)
1527        print(pipe)
1528        return {}
1529
1530    ### handle non-PostgreSQL databases (text vs JSON)
1531    if not isinstance(attributes.get('parameters', None), dict):
1532        try:
1533            import json
1534            parameters = json.loads(attributes['parameters'])
1535            if isinstance(parameters, str) and parameters[0] == '{':
1536                parameters = json.loads(parameters)
1537            attributes['parameters'] = parameters
1538        except Exception:
1539            attributes['parameters'] = {}
1540
1541    return attributes

Get a Pipe's attributes dictionary.
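
The returned dictionary mirrors the pipe's row in the pipes table, with parameters decoded from JSON on flavors that store it as text. A sketch:

    attrs = conn.get_pipe_attributes(pipe)
    print(attrs.get('connector_keys'), attrs.get('metric_key'))
    print(attrs.get('parameters', {}))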

def sync_pipe( self, pipe: meerschaum.Pipe, df: 'Union[pd.DataFrame, str, Dict[Any, Any], None]' = None, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, chunksize: Optional[int] = -1, check_existing: bool = True, blocking: bool = True, debug: bool = False, _check_temporary_tables: bool = True, **kw: Any) -> Tuple[bool, str]:
1648def sync_pipe(
1649    self,
1650    pipe: mrsm.Pipe,
1651    df: Union[pd.DataFrame, str, Dict[Any, Any], None] = None,
1652    begin: Optional[datetime] = None,
1653    end: Optional[datetime] = None,
1654    chunksize: Optional[int] = -1,
1655    check_existing: bool = True,
1656    blocking: bool = True,
1657    debug: bool = False,
1658    _check_temporary_tables: bool = True,
1659    **kw: Any
1660) -> SuccessTuple:
1661    """
1662    Sync a pipe using a database connection.
1663
1664    Parameters
1665    ----------
1666    pipe: mrsm.Pipe
1667        The Meerschaum Pipe instance into which to sync the data.
1668
1669    df: Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]]
1670        An optional DataFrame or equivalent to sync into the pipe.
1671        Defaults to `None`.
1672
1673    begin: Optional[datetime], default None
1674        Optionally specify the earliest datetime to search for data.
1675        Defaults to `None`.
1676
1677    end: Optional[datetime], default None
1678        Optionally specify the latest datetime to search for data.
1679        Defaults to `None`.
1680
1681    chunksize: Optional[int], default -1
1682        Specify the number of rows to sync per chunk.
1683        If `-1`, resort to system configuration (default is `900`).
1684        A `chunksize` of `None` will sync all rows in one transaction.
1685        Defaults to `-1`.
1686
1687    check_existing: bool, default True
1688        If `True`, pull and diff with existing data from the pipe. Defaults to `True`.
1689
1690    blocking: bool, default True
1691        If `True`, wait for sync to finish and return its result; otherwise sync asynchronously.
1692        Defaults to `True`.
1693
1694    debug: bool, default False
1695        Verbosity toggle. Defaults to False.
1696
1697    kw: Any
1698        Catch-all for keyword arguments.
1699
1700    Returns
1701    -------
1702    A `SuccessTuple` of success (`bool`) and message (`str`).
1703    """
1704    from meerschaum.utils.packages import import_pandas
1705    from meerschaum.utils.sql import (
1706        get_update_queries,
1707        sql_item_name,
1708        UPDATE_QUERIES,
1709        get_reset_autoincrement_queries,
1710    )
1711    from meerschaum.utils.dtypes import are_dtypes_equal
1712    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
1713    from meerschaum import Pipe
1714    import time
1715    import copy
1716    pd = import_pandas()
1717    if df is None:
1718        msg = f"DataFrame is None. Cannot sync {pipe}."
1719        warn(msg)
1720        return False, msg
1721
1722    start = time.perf_counter()
1723    pipe_name = sql_item_name(pipe.target, self.flavor, schema=self.get_pipe_schema(pipe))
1724
1725    if not pipe.temporary and not pipe.get_id(debug=debug):
1726        register_tuple = pipe.register(debug=debug)
1727        if not register_tuple[0]:
1728            return register_tuple
1729
1730    ### df is the dataframe returned from the remote source
1731    ### via the connector
1732    if debug:
1733        dprint("Fetched data:\n" + str(df))
1734
1735    if not isinstance(df, pd.DataFrame):
1736        df = pipe.enforce_dtypes(
1737            df,
1738            chunksize=chunksize,
1739            safe_copy=kw.get('safe_copy', False),
1740            debug=debug,
1741        )
1742
1743    ### if table does not exist, create it with indices
1744    is_new = False
1745    if not pipe.exists(debug=debug):
1746        check_existing = False
1747        is_new = True
1748    else:
1749        ### Check for new columns.
1750        add_cols_queries = self.get_add_columns_queries(pipe, df, debug=debug)
1751        if add_cols_queries:
1752            _ = pipe.__dict__.pop('_columns_indices', None)
1753            _ = pipe.__dict__.pop('_columns_types', None)
1754            if not self.exec_queries(add_cols_queries, debug=debug):
1755                warn(f"Failed to add new columns to {pipe}.")
1756
1757        alter_cols_queries = self.get_alter_columns_queries(pipe, df, debug=debug)
1758        if alter_cols_queries:
1759            _ = pipe.__dict__.pop('_columns_indices', None)
1760            _ = pipe.__dict__.pop('_columns_types', None)
1761            if not self.exec_queries(alter_cols_queries, debug=debug):
1762                warn(f"Failed to alter columns for {pipe}.")
1763            else:
1764                _ = pipe.infer_dtypes(persist=True)
1765
1766    ### NOTE: Oracle SQL < 23c (2023) and SQLite do not support booleans,
1767    ### so infer bools and persist them to `dtypes`.
1768    if self.flavor in ('oracle', 'sqlite', 'mysql', 'mariadb'):
1769        pipe_dtypes = pipe.dtypes
1770        new_bool_cols = {
1771            col: 'bool[pyarrow]'
1772            for col, typ in df.dtypes.items()
1773            if col not in pipe_dtypes
1774            and are_dtypes_equal(str(typ), 'bool')
1775        }
1776        pipe_dtypes.update(new_bool_cols)
1777        pipe.dtypes = pipe_dtypes
1778        if new_bool_cols and not pipe.temporary:
1779            infer_bool_success, infer_bool_msg = pipe.edit(debug=debug)
1780            if not infer_bool_success:
1781                return infer_bool_success, infer_bool_msg
1782
1783    upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in UPDATE_QUERIES
1784    if upsert:
1785        check_existing = False
1786    kw['safe_copy'] = kw.get('safe_copy', False)
1787
1788    unseen_df, update_df, delta_df = (
1789        pipe.filter_existing(
1790            df,
1791            chunksize=chunksize,
1792            debug=debug,
1793            **kw
1794        ) if check_existing else (df, None, df)
1795    )
1796    if upsert:
1797        unseen_df, update_df, delta_df = (df.head(0), df, df)
1798
1799    if debug:
1800        dprint("Delta data:\n" + str(delta_df))
1801        dprint("Unseen data:\n" + str(unseen_df))
1802        if update_df is not None:
1803            dprint(("Update" if not upsert else "Upsert") + " data:\n" + str(update_df))
1804
1805    if_exists = kw.get('if_exists', 'append')
1806    if 'if_exists' in kw:
1807        kw.pop('if_exists')
1808    if 'name' in kw:
1809        kw.pop('name')
1810
1811    ### Insert new data into Pipe's table.
1812    unseen_kw = copy.deepcopy(kw)
1813    unseen_kw.update({
1814        'name': pipe.target,
1815        'if_exists': if_exists,
1816        'debug': debug,
1817        'as_dict': True,
1818        'safe_copy': kw.get('safe_copy', False),
1819        'chunksize': chunksize,
1820        'dtype': self.get_to_sql_dtype(pipe, unseen_df, update_dtypes=True),
1821        'schema': self.get_pipe_schema(pipe),
1822    })
1823
1824    dt_col = pipe.columns.get('datetime', None)
1825    primary_key = pipe.columns.get('primary', None)
1826    autoincrement = (
1827        pipe.parameters.get('autoincrement', False)
1828        or (
1829            is_new
1830            and primary_key
1831            and primary_key
1832            not in pipe.dtypes
1833            and primary_key not in unseen_df.columns
1834        )
1835    )
1836    if autoincrement and 'autoincrement' not in pipe.parameters:
1837        pipe.parameters['autoincrement'] = autoincrement
1838        edit_success, edit_msg = pipe.edit(debug=debug)
1839        if not edit_success:
1840            return edit_success, edit_msg
1841
1842    def _check_pk(_df_to_clear):
1843        if _df_to_clear is None:
1844            return
1845        if primary_key not in _df_to_clear.columns:
1846            return
1847        if not _df_to_clear[primary_key].notnull().any():
1848            del _df_to_clear[primary_key]
1849
1850    autoincrement_needs_reset = bool(
1851        autoincrement
1852        and primary_key
1853        and primary_key in unseen_df.columns
1854        and unseen_df[primary_key].notnull().any()
1855    )
1856    if autoincrement and primary_key:
1857        for _df_to_clear in (unseen_df, update_df, delta_df):
1858            _check_pk(_df_to_clear)
1859
1860    if is_new:
1861        create_success, create_msg = self.create_pipe_table_from_df(
1862            pipe,
1863            unseen_df,
1864            debug=debug,
1865        )
1866        if not create_success:
1867            return create_success, create_msg
1868
1869    do_identity_insert = bool(
1870        self.flavor in ('mssql',)
1871        and primary_key
1872        and primary_key in unseen_df.columns
1873        and autoincrement
1874    )
1875    stats = {'success': True, 'msg': ''}
1876    if len(unseen_df) > 0:
1877        with self.engine.connect() as connection:
1878            with connection.begin():
1879                if do_identity_insert:
1880                    identity_on_result = self.exec(
1881                        f"SET IDENTITY_INSERT {pipe_name} ON",
1882                        commit=False,
1883                        _connection=connection,
1884                        close=False,
1885                        debug=debug,
1886                    )
1887                    if identity_on_result is None:
1888                        return False, f"Could not enable identity inserts on {pipe}."
1889
1890                stats = self.to_sql(
1891                    unseen_df,
1892                    _connection=connection,
1893                    **unseen_kw
1894                )
1895
1896                if do_identity_insert:
1897                    identity_off_result = self.exec(
1898                        f"SET IDENTITY_INSERT {pipe_name} OFF",
1899                        commit=False,
1900                        _connection=connection,
1901                        close=False,
1902                        debug=debug,
1903                    )
1904                    if identity_off_result is None:
1905                        return False, f"Could not disable identity inserts on {pipe}."
1906
1907    if is_new:
1908        if not self.create_indices(pipe, debug=debug):
1909            warn(f"Failed to create indices for {pipe}. Continuing...")
1910
1911    if autoincrement_needs_reset:
1912        reset_autoincrement_queries = get_reset_autoincrement_queries(
1913            pipe.target,
1914            primary_key,
1915            self,
1916            schema=self.get_pipe_schema(pipe),
1917            debug=debug,
1918        )
1919        results = self.exec_queries(reset_autoincrement_queries, debug=debug)
1920        for result in results:
1921            if result is None:
1922                warn(f"Could not reset auto-incrementing primary key for {pipe}.", stack=False)
1923
1924    if update_df is not None and len(update_df) > 0:
1925        temp_target = self.get_temporary_target(
1926            pipe.target,
1927            label=('update' if not upsert else 'upsert'),
1928        )
1929        self._log_temporary_tables_creation(temp_target, create=(not pipe.temporary), debug=debug)
1930        temp_pipe = Pipe(
1931            pipe.connector_keys.replace(':', '_') + '_', pipe.metric_key, pipe.location_key,
1932            instance=pipe.instance_keys,
1933            columns={
1934                (ix_key if ix_key != 'primary' else 'primary_'): ix
1935                for ix_key, ix in pipe.columns.items()
1936                if ix and ix in update_df.columns
1937            },
1938            dtypes={
1939                col: typ
1940                for col, typ in pipe.dtypes.items()
1941                if col in update_df.columns
1942            },
1943            target=temp_target,
1944            temporary=True,
1945            enforce=False,
1946            static=True,
1947            autoincrement=False,
1948            parameters={
1949                'schema': self.internal_schema,
1950                'hypertable': False,
1951            },
1952        )
1953        temp_pipe.__dict__['_columns_types'] = {
1954            col: get_db_type_from_pd_type(
1955                pipe.dtypes.get(col, str(typ)),
1956                self.flavor,
1957            )
1958            for col, typ in update_df.dtypes.items()
1959        }
1960        now_ts = time.perf_counter()
1961        temp_pipe.__dict__['_columns_types_timestamp'] = now_ts
1962        temp_pipe.__dict__['_skip_check_indices'] = True
1963        temp_success, temp_msg = temp_pipe.sync(update_df, check_existing=False, debug=debug)
1964        if not temp_success:
1965            return temp_success, temp_msg
1966        existing_cols = pipe.get_columns_types(debug=debug)
1967        join_cols = [
1968            col
1969            for col_key, col in pipe.columns.items()
1970            if col and col in existing_cols
1971        ] if not primary_key or self.flavor == 'oracle' else (
1972            [dt_col, primary_key]
1973            if self.flavor == 'timescaledb' and dt_col and dt_col in update_df.columns
1974            else [primary_key]
1975        )
1976        update_queries = get_update_queries(
1977            pipe.target,
1978            temp_target,
1979            self,
1980            join_cols,
1981            upsert=upsert,
1982            schema=self.get_pipe_schema(pipe),
1983            patch_schema=self.internal_schema,
1984            datetime_col=(dt_col if dt_col in update_df.columns else None),
1985            identity_insert=(autoincrement and primary_key in update_df.columns),
1986            null_indices=pipe.null_indices,
1987            cast_columns=pipe.enforce,
1988            debug=debug,
1989        )
1990        update_results = self.exec_queries(
1991            update_queries,
1992            break_on_error=True,
1993            rollback=True,
1994            debug=debug,
1995        )
1996        update_success = all(update_results)
1997        self._log_temporary_tables_creation(
1998            temp_target,
1999            ready_to_drop=True,
2000            create=(not pipe.temporary),
2001            debug=debug,
2002        )
2003        if not update_success:
2004            warn(f"Failed to apply update to {pipe}.")
2005        stats['success'] = stats['success'] and update_success
2006        stats['msg'] = (
2007            (stats.get('msg', '') + f'\nFailed to apply update to {pipe}.').lstrip()
2008            if not update_success
2009            else stats.get('msg', '')
2010        )
2011
2012    stop = time.perf_counter()
2013    success = stats['success']
2014    if not success:
2015        return success, stats['msg'] or str(stats)
2016
2017    unseen_count = len(unseen_df.index) if unseen_df is not None else 0
2018    update_count = len(update_df.index) if update_df is not None else 0
2019    msg = (
2020        (
2021            f"Inserted {unseen_count:,}, "
2022            + f"updated {update_count:,} rows."
2023        )
2024        if not upsert
2025        else (
2026            f"Upserted {update_count:,} row"
2027            + ('s' if update_count != 1 else '')
2028            + "."
2029        )
2030    )
2031    if debug:
2032        msg = msg[:-1] + (
2033            f"\non table {sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))}\n"
2034            + f"in {round(stop - start, 2)} seconds."
2035        )
2036
2037    if _check_temporary_tables:
2038        drop_stale_success, drop_stale_msg = self._drop_old_temporary_tables(
2039            refresh=False, debug=debug
2040        )
2041        if not drop_stale_success:
2042            warn(drop_stale_msg)
2043
2044    return success, msg

Sync a pipe using a database connection.

Parameters
  • pipe (mrsm.Pipe): The Meerschaum Pipe instance into which to sync the data.
  • df (Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]]): An optional DataFrame or equivalent to sync into the pipe. Defaults to None.
  • begin (Optional[datetime], default None): Optionally specify the earliest datetime to search for data. Defaults to None.
  • end (Optional[datetime], default None): Optionally specify the latest datetime to search for data. Defaults to None.
  • chunksize (Optional[int], default -1): Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction. Defaults to -1.
  • check_existing (bool, default True): If True, pull and diff with existing data from the pipe. Defaults to True.
  • blocking (bool, default True): If True, wait for sync to finish and return its result; otherwise sync asynchronously. Defaults to True.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
  • kw (Any): Catch-all for keyword arguments.
Returns
  • A SuccessTuple of success (bool) and message (str).
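
A minimal sketch of syncing a DataFrame (the column names are hypothetical; pipe.sync(df) is the usual high-level entrypoint and delegates here for SQL instances):

    import pandas as pd

    df = pd.DataFrame({
        'timestamp': [pd.Timestamp('2024-01-01 00:00:00')],    # hypothetical datetime column
        'temperature': [20.5],
    })
    success, msg = conn.sync_pipe(pipe, df, chunksize=900)
    print(success, msg)
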
def sync_pipe_inplace( self, pipe: meerschaum.Pipe, params: Optional[Dict[str, Any]] = None, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, chunksize: Optional[int] = -1, check_existing: bool = True, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
2047def sync_pipe_inplace(
2048    self,
2049    pipe: 'mrsm.Pipe',
2050    params: Optional[Dict[str, Any]] = None,
2051    begin: Union[datetime, int, None] = None,
2052    end: Union[datetime, int, None] = None,
2053    chunksize: Optional[int] = -1,
2054    check_existing: bool = True,
2055    debug: bool = False,
2056    **kw: Any
2057) -> SuccessTuple:
2058    """
2059    If a pipe's connector is the same as its instance connector,
2060    it's more efficient to sync the pipe in-place rather than reading data into Pandas.
2061
2062    Parameters
2063    ----------
2064    pipe: mrsm.Pipe
2065        The pipe whose connector is the same as its instance.
2066
2067    params: Optional[Dict[str, Any]], default None
2068        Optional params dictionary to build the `WHERE` clause.
2069        See `meerschaum.utils.sql.build_where`.
2070
2071    begin: Union[datetime, int, None], default None
2072        Optionally specify the earliest datetime to search for data.
2073        Defaults to `None`.
2074
2075    end: Union[datetime, int, None], default None
2076        Optionally specify the latest datetime to search for data.
2077        Defaults to `None`.
2078
2079    chunksize: Optional[int], default -1
2080        Specify the number of rows to sync per chunk.
2081        If `-1`, resort to system configuration (default is `900`).
2082        A `chunksize` of `None` will sync all rows in one transaction.
2083        Defaults to `-1`.
2084
2085    check_existing: bool, default True
2086        If `True`, pull and diff with existing data from the pipe.
2087
2088    debug: bool, default False
2089        Verbosity toggle.
2090
2091    Returns
2092    -------
2093    A SuccessTuple.
2094    """
2095    if self.flavor == 'duckdb':
2096        return pipe.sync(
2097            params=params,
2098            begin=begin,
2099            end=end,
2100            chunksize=chunksize,
2101            check_existing=check_existing,
2102            debug=debug,
2103            _inplace=False,
2104            **kw
2105        )
2106    from meerschaum.utils.sql import (
2107        sql_item_name,
2108        get_update_queries,
2109        get_null_replacement,
2110        get_create_table_queries,
2111        get_create_schema_if_not_exists_queries,
2112        get_table_cols_types,
2113        session_execute,
2114        dateadd_str,
2115        UPDATE_QUERIES,
2116    )
2117    from meerschaum.utils.dtypes.sql import (
2118        get_pd_type_from_db_type,
2119        get_db_type_from_pd_type,
2120    )
2121    from meerschaum.utils.misc import generate_password
2122
2123    transaction_id_length = (
2124        mrsm.get_config(
2125            'system', 'connectors', 'sql', 'instance', 'temporary_target', 'transaction_id_length'
2126        )
2127    )
2128    transact_id = generate_password(transaction_id_length)
2129
2130    internal_schema = self.internal_schema
2131    target = pipe.target
2132    temp_table_roots = ['backtrack', 'new', 'delta', 'joined', 'unseen', 'update']
2133    temp_tables = {
2134        table_root: self.get_temporary_target(target, transact_id=transact_id, label=table_root)
2135        for table_root in temp_table_roots
2136    }
2137    temp_table_names = {
2138        table_root: sql_item_name(table_name_raw, self.flavor, internal_schema)
2139        for table_root, table_name_raw in temp_tables.items()
2140    }
2141    temp_table_aliases = {
2142        table_root: sql_item_name(table_root, self.flavor)
2143        for table_root in temp_table_roots
2144    }
2145    table_alias_as = " AS" if self.flavor != 'oracle' else ''
2146    metadef = self.get_pipe_metadef(
2147        pipe,
2148        params=params,
2149        begin=begin,
2150        end=end,
2151        check_existing=check_existing,
2152        debug=debug,
2153    )
2154    pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
2155    upsert = pipe.parameters.get('upsert', False) and f'{self.flavor}-upsert' in UPDATE_QUERIES
2156    static = pipe.parameters.get('static', False)
2157    database = getattr(self, 'database', self.parse_uri(self.URI).get('database', None))
2158    primary_key = pipe.columns.get('primary', None)
2159    primary_key_typ = pipe.dtypes.get(primary_key, None) if primary_key else None
2160    primary_key_db_type = (
2161        get_db_type_from_pd_type(primary_key_typ, self.flavor)
2162        if primary_key_typ
2163        else None
2164    )
2165    autoincrement = pipe.parameters.get('autoincrement', False)
2166    dt_col = pipe.columns.get('datetime', None)
2167    dt_col_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None
2168    dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None
2169    dt_db_type = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None
2170
2171    def clean_up_temp_tables(ready_to_drop: bool = False):
2172        log_success, log_msg = self._log_temporary_tables_creation(
2173            [
2174                table
2175                for table in temp_tables.values()
2176            ] if not upsert else [temp_tables['update']],
2177            ready_to_drop=ready_to_drop,
2178            create=(not pipe.temporary),
2179            debug=debug,
2180        )
2181        if not log_success:
2182            warn(log_msg)
2183        drop_stale_success, drop_stale_msg = self._drop_old_temporary_tables(
2184            refresh=False,
2185            debug=debug,
2186        )
2187        if not drop_stale_success:
2188            warn(drop_stale_msg)
2189        return drop_stale_success, drop_stale_msg
2190
2191    sqlalchemy, sqlalchemy_orm = mrsm.attempt_import(
2192        'sqlalchemy',
2193        'sqlalchemy.orm',
2194    )
2195    if not pipe.exists(debug=debug):
2196        schema = self.get_pipe_schema(pipe)
2197        create_pipe_queries = get_create_table_queries(
2198            metadef,
2199            pipe.target,
2200            self.flavor,
2201            schema=schema,
2202            primary_key=primary_key,
2203            primary_key_db_type=primary_key_db_type,
2204            autoincrement=autoincrement,
2205            datetime_column=dt_col,
2206        )
2207        if schema:
2208            create_pipe_queries = (
2209                get_create_schema_if_not_exists_queries(schema, self.flavor)
2210                + create_pipe_queries
2211            )
2212
2213        results = self.exec_queries(create_pipe_queries, debug=debug)
2214        if not all(results):
2215            _ = clean_up_temp_tables()
2216            return False, f"Could not insert new data into {pipe} from its SQL query definition."
2217
2218        if not self.create_indices(pipe, debug=debug):
2219            warn(f"Failed to create indices for {pipe}. Continuing...")
2220
2221        rowcount = pipe.get_rowcount(debug=debug)
2222        _ = clean_up_temp_tables()
2223        return True, f"Inserted {rowcount:,}, updated 0 rows."
2224
2225    session = sqlalchemy_orm.Session(self.engine)
2226    connectable = session if self.flavor != 'duckdb' else self
2227
2228    create_new_query = get_create_table_queries(
2229        metadef,
2230        temp_tables[('new') if not upsert else 'update'],
2231        self.flavor,
2232        schema=internal_schema,
2233    )[0]
2234    (create_new_success, create_new_msg), create_new_results = session_execute(
2235        session,
2236        create_new_query,
2237        with_results=True,
2238        debug=debug,
2239    )
2240    if not create_new_success:
2241        _ = clean_up_temp_tables()
2242        return create_new_success, create_new_msg
2243    new_count = create_new_results[0].rowcount if create_new_results else 0
2244
2245    new_cols_types = get_table_cols_types(
2246        temp_tables[('new' if not upsert else 'update')],
2247        connectable=connectable,
2248        flavor=self.flavor,
2249        schema=internal_schema,
2250        database=database,
2251        debug=debug,
2252    ) if not static else pipe.get_columns_types(debug=debug)
2253    if not new_cols_types:
2254        return False, f"Failed to get new columns for {pipe}."
2255
2256    new_cols = {
2257        str(col_name): get_pd_type_from_db_type(str(col_type))
2258        for col_name, col_type in new_cols_types.items()
2259    }
2260    new_cols_str = '\n    ' + ',\n    '.join([
2261        sql_item_name(col, self.flavor)
2262        for col in new_cols
2263    ])
2264    def get_col_typ(col: str, cols_types: Dict[str, str]) -> str:
2265        if self.flavor == 'oracle' and new_cols_types.get(col, '').lower() == 'char':
2266            return new_cols_types[col]
2267        return cols_types[col]
2268
2269    add_cols_queries = self.get_add_columns_queries(pipe, new_cols, debug=debug)
2270    if add_cols_queries:
2271        _ = pipe.__dict__.pop('_columns_types', None)
2272        _ = pipe.__dict__.pop('_columns_indices', None)
2273        self.exec_queries(add_cols_queries, debug=debug)
2274
2275    alter_cols_queries = self.get_alter_columns_queries(pipe, new_cols, debug=debug)
2276    if alter_cols_queries:
2277        _ = pipe.__dict__.pop('_columns_types', None)
2278        self.exec_queries(alter_cols_queries, debug=debug)
2279
2280    insert_queries = [
2281        (
2282            f"INSERT INTO {pipe_name} ({new_cols_str})\n"
2283            f"SELECT {new_cols_str}\nFROM {temp_table_names['new']}{table_alias_as}"
2284            f" {temp_table_aliases['new']}"
2285        )
2286    ] if not check_existing and not upsert else []
2287
2288    new_queries = insert_queries
2289    new_success, new_msg = (
2290        session_execute(session, new_queries, debug=debug)
2291        if new_queries
2292        else (True, "Success")
2293    )
2294    if not new_success:
2295        _ = clean_up_temp_tables()
2296        return new_success, new_msg
2297
2298    if not check_existing:
2299        session.commit()
2300        _ = clean_up_temp_tables()
2301        return True, f"Inserted {new_count}, updated 0 rows."
2302
2303    min_dt_col_name_da = dateadd_str(
2304        flavor=self.flavor, begin=f"MIN({dt_col_name})", db_type=dt_db_type,
2305    )
2306    max_dt_col_name_da = dateadd_str(
2307        flavor=self.flavor, begin=f"MAX({dt_col_name})", db_type=dt_db_type,
2308    )
2309
2310    (new_dt_bounds_success, new_dt_bounds_msg), new_dt_bounds_results = session_execute(
2311        session,
2312        [
2313            "SELECT\n"
2314            f"    {min_dt_col_name_da} AS {sql_item_name('min_dt', self.flavor)},\n"
2315            f"    {max_dt_col_name_da} AS {sql_item_name('max_dt', self.flavor)}\n"
2316            f"FROM {temp_table_names['new' if not upsert else 'update']}\n"
2317            f"WHERE {dt_col_name} IS NOT NULL"
2318        ],
2319        with_results=True,
2320        debug=debug,
2321    ) if dt_col and not upsert else ((True, "Success"), None)
2322    if not new_dt_bounds_success:
2323        return (
2324            new_dt_bounds_success,
2325            f"Could not determine in-place datetime bounds:\n{new_dt_bounds_msg}"
2326        )
2327
2328    if dt_col and not upsert:
2329        begin, end = new_dt_bounds_results[0].fetchone()
2330
2331    backtrack_def = self.get_pipe_data_query(
2332        pipe,
2333        begin=begin,
2334        end=end,
2335        begin_add_minutes=0,
2336        end_add_minutes=1,
2337        params=params,
2338        debug=debug,
2339        order=None,
2340    )
2341    create_backtrack_query = get_create_table_queries(
2342        backtrack_def,
2343        temp_tables['backtrack'],
2344        self.flavor,
2345        schema=internal_schema,
2346    )[0]
2347    (create_backtrack_success, create_backtrack_msg), create_backtrack_results = session_execute(
2348        session,
2349        create_backtrack_query,
2350        with_results=True,
2351        debug=debug,
2352    ) if not upsert else ((True, "Success"), None)
2353
2354    if not create_backtrack_success:
2355        _ = clean_up_temp_tables()
2356        return create_backtrack_success, create_backtrack_msg
2357
2358    backtrack_cols_types = get_table_cols_types(
2359        temp_tables['backtrack'],
2360        connectable=connectable,
2361        flavor=self.flavor,
2362        schema=internal_schema,
2363        database=database,
2364        debug=debug,
2365    ) if not (upsert or static) else new_cols_types
2366
2367    common_cols = [col for col in new_cols if col in backtrack_cols_types]
2368    primary_key = pipe.columns.get('primary', None)
2369    on_cols = {
2370        col: new_cols.get(col)
2371        for col_key, col in pipe.columns.items()
2372        if (
2373            col
2374            and
2375            col_key != 'value'
2376            and col in backtrack_cols_types
2377            and col in new_cols
2378        )
2379    } if not primary_key or self.flavor == 'oracle' else {primary_key: new_cols.get(primary_key)}
2380
2381    null_replace_new_cols_str = (
2382        '\n    ' + ',\n    '.join([
2383            f"COALESCE({temp_table_aliases['new']}.{sql_item_name(col, self.flavor)}, "
2384            + get_null_replacement(get_col_typ(col, new_cols_types), self.flavor)
2385            + ") AS "
2386            + sql_item_name(col, self.flavor, None)
2387            for col, typ in new_cols.items()
2388        ])
2389    )
2390
2391    select_delta_query = (
2392        "SELECT"
2393        + null_replace_new_cols_str
2394        + f"\nFROM {temp_table_names['new']}{table_alias_as} {temp_table_aliases['new']}\n"
2395        + f"LEFT OUTER JOIN {temp_table_names['backtrack']}{table_alias_as} {temp_table_aliases['backtrack']}"
2396        + "\n    ON\n    "
2397        + '\n    AND\n    '.join([
2398            (
2399                f"    COALESCE({temp_table_aliases['new']}."
2400                + sql_item_name(c, self.flavor, None)
2401                + ", "
2402                + get_null_replacement(get_col_typ(c, new_cols_types), self.flavor)
2403                + ")"
2404                + '\n        =\n    '
2405                + f"    COALESCE({temp_table_aliases['backtrack']}."
2406                + sql_item_name(c, self.flavor, None)
2407                + ", "
2408                + get_null_replacement(get_col_typ(c, backtrack_cols_types), self.flavor)
2409                + ") "
2410            ) for c in common_cols
2411        ])
2412        + "\nWHERE\n    "
2413        + '\n    AND\n    '.join([
2414            (
2415                f"{temp_table_aliases['backtrack']}." + sql_item_name(c, self.flavor) + ' IS NULL'
2416            ) for c in common_cols
2417        ])
2418    )
2419    create_delta_query = get_create_table_queries(
2420        select_delta_query,
2421        temp_tables['delta'],
2422        self.flavor,
2423        schema=internal_schema,
2424    )[0]
2425    create_delta_success, create_delta_msg = session_execute(
2426        session,
2427        create_delta_query,
2428        debug=debug,
2429    ) if not upsert else (True, "Success")
2430    if not create_delta_success:
2431        _ = clean_up_temp_tables()
2432        return create_delta_success, create_delta_msg
2433
2434    delta_cols_types = get_table_cols_types(
2435        temp_tables['delta'],
2436        connectable=connectable,
2437        flavor=self.flavor,
2438        schema=internal_schema,
2439        database=database,
2440        debug=debug,
2441    ) if not (upsert or static) else new_cols_types
2442
2443    ### This is a weird bug on SQLite.
2444    ### Sometimes the backtrack dtypes are all empty strings.
2445    if not all(delta_cols_types.values()):
2446        delta_cols_types = new_cols_types
2447
2448    delta_cols = {
2449        col: get_pd_type_from_db_type(typ)
2450        for col, typ in delta_cols_types.items()
2451    }
2452    delta_cols_str = ', '.join([
2453        sql_item_name(col, self.flavor)
2454        for col in delta_cols
2455    ])
2456
2457    select_joined_query = (
2458        "SELECT\n    "
2459        + (',\n    '.join([
2460            (
2461                f"{temp_table_aliases['delta']}." + sql_item_name(c, self.flavor, None)
2462                + " AS " + sql_item_name(c + '_delta', self.flavor, None)
2463            ) for c in delta_cols
2464        ]))
2465        + ",\n    "
2466        + (',\n    '.join([
2467            (
2468                f"{temp_table_aliases['backtrack']}." + sql_item_name(c, self.flavor, None)
2469                + " AS " + sql_item_name(c + '_backtrack', self.flavor, None)
2470            ) for c in backtrack_cols_types
2471        ]))
2472        + f"\nFROM {temp_table_names['delta']}{table_alias_as} {temp_table_aliases['delta']}\n"
2473        + f"LEFT OUTER JOIN {temp_table_names['backtrack']}{table_alias_as}"
2474        + f" {temp_table_aliases['backtrack']}"
2475        + "\n    ON\n    "
2476        + '\n    AND\n    '.join([
2477            (
2478                f"    COALESCE({temp_table_aliases['delta']}." + sql_item_name(c, self.flavor)
2479                + ", "
2480                + get_null_replacement(get_col_typ(c, new_cols_types), self.flavor) + ")"
2481                + '\n        =\n    '
2482                + f"    COALESCE({temp_table_aliases['backtrack']}." + sql_item_name(c, self.flavor)
2483                + ", "
2484                + get_null_replacement(get_col_typ(c, new_cols_types), self.flavor) + ")"
2485            ) for c, typ in on_cols.items()
2486        ])
2487    )
2488
2489    create_joined_query = get_create_table_queries(
2490        select_joined_query,
2491        temp_tables['joined'],
2492        self.flavor,
2493        schema=internal_schema,
2494    )[0]
2495    create_joined_success, create_joined_msg = session_execute(
2496        session,
2497        create_joined_query,
2498        debug=debug,
2499    ) if on_cols and not upsert else (True, "Success")
2500    if not create_joined_success:
2501        _ = clean_up_temp_tables()
2502        return create_joined_success, create_joined_msg
2503
2504    select_unseen_query = (
2505        "SELECT\n    "
2506        + (',\n    '.join([
2507            (
2508                "CASE\n        WHEN " + sql_item_name(c + '_delta', self.flavor, None)
2509                + " != " + get_null_replacement(get_col_typ(c, delta_cols_types), self.flavor)
2510                + " THEN " + sql_item_name(c + '_delta', self.flavor, None)
2511                + "\n        ELSE NULL\n    END"
2512                + " AS " + sql_item_name(c, self.flavor, None)
2513            ) for c, typ in delta_cols.items()
2514        ]))
2515        + f"\nFROM {temp_table_names['joined']}{table_alias_as} {temp_table_aliases['joined']}\n"
2516        + "WHERE\n    "
2517        + '\n    AND\n    '.join([
2518            (
2519                sql_item_name(c + '_backtrack', self.flavor, None) + ' IS NULL'
2520            ) for c in delta_cols
2521        ])
2522    )
2523    create_unseen_query = get_create_table_queries(
2524        select_unseen_query,
2525        temp_tables['unseen'],
2526        self.flavor,
2527        internal_schema,
2528    )[0]
2529    (create_unseen_success, create_unseen_msg), create_unseen_results = session_execute(
2530        session,
2531        create_unseen_query,
2532        with_results=True,
2533        debug=debug
2534    ) if not upsert else ((True, "Success"), None)
2535    if not create_unseen_success:
2536        _ = clean_up_temp_tables()
2537        return create_unseen_success, create_unseen_msg
2538
2539    select_update_query = (
2540        "SELECT\n    "
2541        + (',\n    '.join([
2542            (
2543                "CASE\n        WHEN " + sql_item_name(c + '_delta', self.flavor, None)
2544                + " != " + get_null_replacement(get_col_typ(c, delta_cols_types), self.flavor)
2545                + " THEN " + sql_item_name(c + '_delta', self.flavor, None)
2546                + "\n        ELSE NULL\n    END"
2547                + " AS " + sql_item_name(c, self.flavor, None)
2548            ) for c, typ in delta_cols.items()
2549        ]))
2550        + f"\nFROM {temp_table_names['joined']}{table_alias_as} {temp_table_aliases['joined']}\n"
2551        + "WHERE\n    "
2552        + '\n    OR\n    '.join([
2553            (
2554                sql_item_name(c + '_backtrack', self.flavor, None) + ' IS NOT NULL'
2555            ) for c in delta_cols
2556        ])
2557    )
2558
2559    create_update_query = get_create_table_queries(
2560        select_update_query,
2561        temp_tables['update'],
2562        self.flavor,
2563        internal_schema,
2564    )[0]
2565    (create_update_success, create_update_msg), create_update_results = session_execute(
2566        session,
2567        create_update_query,
2568        with_results=True,
2569        debug=debug,
2570    ) if on_cols and not upsert else ((True, "Success"), [])
2571    apply_update_queries = (
2572        get_update_queries(
2573            pipe.target,
2574            temp_tables['update'],
2575            session,
2576            on_cols,
2577            upsert=upsert,
2578            schema=self.get_pipe_schema(pipe),
2579            patch_schema=internal_schema,
2580            datetime_col=pipe.columns.get('datetime', None),
2581            flavor=self.flavor,
2582            null_indices=pipe.null_indices,
2583            cast_columns=pipe.enforce,
2584            debug=debug,
2585        )
2586        if on_cols else []
2587    )
2588
2589    apply_unseen_queries = [
2590        (
2591            f"INSERT INTO {pipe_name} ({delta_cols_str})\n"
2592            + f"SELECT {delta_cols_str}\nFROM "
2593            + (
2594                temp_table_names['unseen']
2595                if on_cols
2596                else temp_table_names['delta']
2597            )
2598        ),
2599    ]
2600
2601    (apply_unseen_success, apply_unseen_msg), apply_unseen_results = session_execute(
2602        session,
2603        apply_unseen_queries,
2604        with_results=True,
2605        debug=debug,
2606    ) if not upsert else ((True, "Success"), None)
2607    if not apply_unseen_success:
2608        _ = clean_up_temp_tables()
2609        return apply_unseen_success, apply_unseen_msg
2610    unseen_count = apply_unseen_results[0].rowcount if apply_unseen_results else 0
2611
2612    (apply_update_success, apply_update_msg), apply_update_results = session_execute(
2613        session,
2614        apply_update_queries,
2615        with_results=True,
2616        debug=debug,
2617    )
2618    if not apply_update_success:
2619        _ = clean_up_temp_tables()
2620        return apply_update_success, apply_update_msg
2621    update_count = apply_update_results[0].rowcount if apply_update_results else 0
2622
2623    session.commit()
2624
2625    msg = (
2626        f"Inserted {unseen_count:,}, updated {update_count:,} rows."
2627        if not upsert
2628        else f"Upserted {update_count:,} row" + ('s' if update_count != 1 else '') + "."
2629    )
2630    _ = clean_up_temp_tables(ready_to_drop=True)
2631
2632    return True, msg

If a pipe's connector is the same as its instance connector, it's more efficient to sync the pipe in-place rather than reading data into Pandas.

Parameters
  • pipe (mrsm.Pipe): The pipe whose connector is the same as its instance.
  • params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause. See meerschaum.utils.sql.build_where.
  • begin (Union[datetime, int, None], default None): Optionally specify the earliest datetime to search for data. Defaults to None.
  • end (Union[datetime, int, None], default None): Optionally specify the latest datetime to search for data. Defaults to None.
  • chunksize (Optional[int], default -1): Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction. Defaults to -1.
  • check_existing (bool, default True): If True, pull and diff with existing data from the pipe.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple.
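
This path applies when a pipe's source connector is also its instance connector and its definition is stored under fetch:definition. A sketch with a hypothetical query:

    sql_pipe = mrsm.Pipe(
        'sql:main', 'sales',    # hypothetical keys; source and instance are the same connector
        instance='sql:main',
        parameters={'fetch': {'definition': 'SELECT * FROM sales'}},
    )
    success, msg = conn.sync_pipe_inplace(sql_pipe)
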
def get_sync_time( self, pipe: meerschaum.Pipe, params: Optional[Dict[str, Any]] = None, newest: bool = True, remote: bool = False, debug: bool = False) -> Union[datetime.datetime, int, NoneType]:
2635def get_sync_time(
2636    self,
2637    pipe: 'mrsm.Pipe',
2638    params: Optional[Dict[str, Any]] = None,
2639    newest: bool = True,
2640    remote: bool = False,
2641    debug: bool = False,
2642) -> Union[datetime, int, None]:
2643    """Get a Pipe's most recent datetime value.
2644
2645    Parameters
2646    ----------
2647    pipe: mrsm.Pipe
2648        The pipe to get the sync time for.
2649
2650    params: Optional[Dict[str, Any]], default None
2651        Optional params dictionary to build the `WHERE` clause.
2652        See `meerschaum.utils.sql.build_where`.
2653
2654    newest: bool, default True
2655        If `True`, get the most recent datetime (honoring `params`).
2656        If `False`, get the oldest datetime (ASC instead of DESC).
2657
2658    remote: bool, default False
2659        If `True`, return the sync time for the remote fetch definition.
2660
2661    Returns
2662    -------
2663    A `datetime` object (or `int` if using an integer axis) if the pipe exists, otherwise `None`.
2664    """
2665    from meerschaum.utils.sql import sql_item_name, build_where, wrap_query_with_cte
2666    src_name = sql_item_name('src', self.flavor)
2667    table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
2668
2669    dt_col = pipe.columns.get('datetime', None)
2670    if dt_col is None:
2671        return None
2672    dt_col_name = sql_item_name(dt_col, self.flavor, None)
2673
2674    if remote and pipe.connector.type != 'sql':
2675        warn(f"Cannot get the remote sync time for {pipe}.")
2676        return None
2677
2678    ASC_or_DESC = "DESC" if newest else "ASC"
2679    existing_cols = pipe.get_columns_types(debug=debug)
2680    valid_params = {}
2681    if params is not None:
2682        valid_params = {k: v for k, v in params.items() if k in existing_cols}
2683    flavor = self.flavor if not remote else pipe.connector.flavor
2684
2685    ### If no bounds are provided for the datetime column,
2686    ### add IS NOT NULL to the WHERE clause.
2687    if dt_col not in valid_params:
2688        valid_params[dt_col] = '_None'
2689    where = "" if not valid_params else build_where(valid_params, self)
2690    src_query = (
2691        f"SELECT {dt_col_name}\nFROM {table_name}{where}"
2692        if not remote
2693        else self.get_pipe_metadef(pipe, params=params, begin=None, end=None)
2694    )
2695
2696    base_query = (
2697        f"SELECT {dt_col_name}\n"
2698        f"FROM {src_name}{where}\n"
2699        f"ORDER BY {dt_col_name} {ASC_or_DESC}\n"
2700        f"LIMIT 1"
2701    )
2702    if self.flavor == 'mssql':
2703        base_query = (
2704            f"SELECT TOP 1 {dt_col_name}\n"
2705            f"FROM {src_name}{where}\n"
2706            f"ORDER BY {dt_col_name} {ASC_or_DESC}"
2707        )
2708    elif self.flavor == 'oracle':
2709        base_query = (
2710            "SELECT * FROM (\n"
2711            f"    SELECT {dt_col_name}\n"
2712            f"    FROM {src_name}{where}\n"
2713            f"    ORDER BY {dt_col_name} {ASC_or_DESC}\n"
2714            ") WHERE ROWNUM = 1"
2715        )
2716
2717    query = wrap_query_with_cte(src_query, base_query, flavor)
2718
2719    try:
2720        db_time = self.value(query, silent=True, debug=debug)
2721
2722        ### No datetime could be found.
2723        if db_time is None:
2724            return None
2725        ### sqlite returns str.
2726        if isinstance(db_time, str):
2727            dateutil_parser = mrsm.attempt_import('dateutil.parser')
2728            st = dateutil_parser.parse(db_time)
2729        ### Do nothing if a datetime object is returned.
2730        elif isinstance(db_time, datetime):
2731            if hasattr(db_time, 'to_pydatetime'):
2732                st = db_time.to_pydatetime()
2733            else:
2734                st = db_time
2735        ### Sometimes the datetime is actually a date.
2736        elif isinstance(db_time, date):
2737            st = datetime.combine(db_time, datetime.min.time())
2738        ### Adding support for an integer datetime axis.
2739        elif 'int' in str(type(db_time)).lower():
2740            st = int(db_time)
2741        ### Convert pandas timestamp to Python datetime.
2742        else:
2743            st = db_time.to_pydatetime()
2744
2745        sync_time = st
2746
2747    except Exception as e:
2748        sync_time = None
2749        warn(str(e))
2750
2751    return sync_time

Get a Pipe's most recent datetime value.

Parameters
  • pipe (mrsm.Pipe): The pipe to get the sync time for.
  • params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause. See meerschaum.utils.sql.build_where.
  • newest (bool, default True): If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
  • remote (bool, default False): If True, return the sync time for the remote fetch definition.
Returns
  • A datetime object (or int if using an integer axis) if the pipe exists, otherwise None.
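
A short sketch (the result is often used to derive begin for the next sync):

    newest = conn.get_sync_time(pipe)                  # most recent datetime value
    oldest = conn.get_sync_time(pipe, newest=False)    # oldest value instead
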
def pipe_exists(self, pipe: meerschaum.Pipe, debug: bool = False) -> bool:
2754def pipe_exists(
2755    self,
2756    pipe: mrsm.Pipe,
2757    debug: bool = False
2758) -> bool:
2759    """
2760    Check that a Pipe's table exists.
2761
2762    Parameters
2763    ----------
2764    pipe: mrsm.Pipe
2765        The pipe to check.
2766
2767    debug: bool, default False
2768        Verbosity toggle.
2769
2770    Returns
2771    -------
2772    A `bool` corresponding to whether a pipe's table exists.
2773
2774    """
2775    from meerschaum.utils.sql import table_exists
2776    exists = table_exists(
2777        pipe.target,
2778        self,
2779        schema=self.get_pipe_schema(pipe),
2780        debug=debug,
2781    )
2782    if debug:
2783        from meerschaum.utils.debug import dprint
2784        dprint(f"{pipe} " + ('exists.' if exists else 'does not exist.'))
2785    return exists

Check that a Pipe's table exists.

Parameters
  • pipe (mrsm.Pipe): The pipe to check.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool corresponding to whether a pipe's table exists.
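
A quick sketch:

    if not conn.pipe_exists(pipe):
        print(f"No table exists yet for {pipe}.")
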
def get_pipe_rowcount( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) -> Optional[int]:
2788def get_pipe_rowcount(
2789    self,
2790    pipe: mrsm.Pipe,
2791    begin: Union[datetime, int, None] = None,
2792    end: Union[datetime, int, None] = None,
2793    params: Optional[Dict[str, Any]] = None,
2794    remote: bool = False,
2795    debug: bool = False
2796) -> Union[int, None]:
2797    """
2798    Get the rowcount for a pipe in accordance with given parameters.
2799
2800    Parameters
2801    ----------
2802    pipe: mrsm.Pipe
2803        The pipe to query with.
2804
2805    begin: Union[datetime, int, None], default None
2806        The begin datetime value.
2807
2808    end: Union[datetime, int, None], default None
2809        The end datetime value.
2810
2811    params: Optional[Dict[str, Any]], default None
2812        See `meerschaum.utils.sql.build_where`.
2813
2814    remote: bool, default False
2815        If `True`, get the rowcount for the remote table.
2816
2817    debug: bool, default False
2818        Verbosity toggle.
2819
2820    Returns
2821    -------
2822    An `int` for the number of rows if the `pipe` exists, otherwise `None`.
2823
2824    """
2825    from meerschaum.utils.sql import dateadd_str, sql_item_name, wrap_query_with_cte, build_where
2826    from meerschaum.connectors.sql._fetch import get_pipe_query
2827    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
2828    if remote:
2829        msg = f"'fetch:definition' must be an attribute of {pipe} to get a remote rowcount."
2830        if 'fetch' not in pipe.parameters:
2831            error(msg)
2832            return None
2833        if 'definition' not in pipe.parameters['fetch']:
2834            error(msg)
2835            return None
2836
2837
2838    flavor = self.flavor if not remote else pipe.connector.flavor
2839    conn = self if not remote else pipe.connector
2840    _pipe_name = sql_item_name(pipe.target, flavor, self.get_pipe_schema(pipe))
2841    dt_col = pipe.columns.get('datetime', None)
2842    dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None
2843    dt_db_type = get_db_type_from_pd_type(dt_typ, flavor) if dt_typ else None
2844    if not dt_col:
2845        dt_col = pipe.guess_datetime()
2846        dt_name = sql_item_name(dt_col, flavor, None) if dt_col else None
2847        is_guess = True
2848    else:
2849        dt_col = pipe.get_columns('datetime')
2850        dt_name = sql_item_name(dt_col, flavor, None)
2851        is_guess = False
2852
2853    if begin is not None or end is not None:
2854        if is_guess:
2855            if dt_col is None:
2856                warn(
2857                    f"No datetime could be determined for {pipe}."
2858                    + "\n    Ignoring begin and end...",
2859                    stack=False,
2860                )
2861                begin, end = None, None
2862            else:
2863                warn(
2864                    f"A datetime wasn't specified for {pipe}.\n"
2865                    + f"    Using column \"{dt_col}\" for datetime bounds...",
2866                    stack=False,
2867                )
2868
2869
2870    _datetime_name = sql_item_name(dt_col, flavor)
2871    _cols_names = [
2872        sql_item_name(col, flavor)
2873        for col in set(
2874            (
2875                [dt_col]
2876                if dt_col
2877                else []
2878            ) + (
2879                []
2880                if params is None
2881                else list(params.keys())
2882            )
2883        )
2884    ]
2885    if not _cols_names:
2886        _cols_names = ['*']
2887
2888    src = (
2889        f"SELECT {', '.join(_cols_names)}\nFROM {_pipe_name}"
2890        if not remote
2891        else get_pipe_query(pipe)
2892    )
2893    parent_query = f"SELECT COUNT(*)\nFROM {sql_item_name('src', flavor)}"
2894    query = wrap_query_with_cte(src, parent_query, flavor)
2895    if begin is not None or end is not None:
2896        query += "\nWHERE"
2897    if begin is not None:
2898        query += (
2899            f"\n    {dt_name} >= "
2900            + dateadd_str(flavor, datepart='minute', number=0, begin=begin, db_type=dt_db_type)
2901        )
2902    if end is not None and begin is not None:
2903        query += "\n    AND"
2904    if end is not None:
2905        query += (
2906            f"\n    {dt_name} <  "
2907            + dateadd_str(flavor, datepart='minute', number=0, begin=end, db_type=dt_db_type)
2908        )
2909    if params is not None:
2910        existing_cols = pipe.get_columns_types(debug=debug)
2911        valid_params = {k: v for k, v in params.items() if k in existing_cols}
2912        if valid_params:
2913            query += build_where(valid_params, conn).replace('WHERE', (
2914                'AND' if (begin is not None or end is not None)
2915                    else 'WHERE'
2916                )
2917            )
2918
2919    result = conn.value(query, debug=debug, silent=True)
2920    try:
2921        return int(result)
2922    except Exception:
2923        return None

Get the rowcount for a pipe in accordance with given parameters.

Parameters
  • pipe (mrsm.Pipe): The pipe to query with.
  • begin (Union[datetime, int, None], default None): The begin datetime value.
  • end (Union[datetime, int, None], default None): The end datetime value.
  • params (Optional[Dict[str, Any]], default None): See meerschaum.utils.sql.build_where.
  • remote (bool, default False): If True, get the rowcount for the remote table.
  • debug (bool, default False): Verbosity toggle.
Returns
  • An int for the number of rows if the pipe exists, otherwise None.
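
A sketch of bounded and unbounded rowcounts (begin is inclusive, end is exclusive):

    from datetime import datetime

    total = conn.get_pipe_rowcount(pipe)
    january = conn.get_pipe_rowcount(
        pipe,
        begin=datetime(2024, 1, 1),    # inclusive
        end=datetime(2024, 2, 1),      # exclusive
    )
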
def drop_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kw) -> Tuple[bool, str]:
2926def drop_pipe(
2927    self,
2928    pipe: mrsm.Pipe,
2929    debug: bool = False,
2930    **kw
2931) -> SuccessTuple:
2932    """
2933    Drop a pipe's tables but maintain its registration.
2934
2935    Parameters
2936    ----------
2937    pipe: mrsm.Pipe
2938        The pipe to drop.
2939
2940    Returns
2941    -------
2942    A `SuccessTuple` indicating success.
2943    """
2944    from meerschaum.utils.sql import table_exists, sql_item_name, DROP_IF_EXISTS_FLAVORS
2945    success = True
2946    target = pipe.target
2947    schema = self.get_pipe_schema(pipe)
2948    target_name = (
2949        sql_item_name(target, self.flavor, schema)
2950    )
2951    if table_exists(target, self, schema=schema, debug=debug):
2952        if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
2953        success = self.exec(
2954            f"DROP TABLE {if_exists_str} {target_name}", silent=True, debug=debug
2955        ) is not None
2956
2957    msg = "Success" if success else f"Failed to drop {pipe}."
2958    return success, msg

Drop a pipe's tables but maintain its registration.

Parameters
  • pipe (mrsm.Pipe): The pipe to drop.
Returns
  • A SuccessTuple indicating success.
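Examples
A minimal hedged sketch (the message format follows the source above): dropping removes the table but keeps the pipe's registration, so the pipe may be synced again later.
>>> conn.drop_pipe(pipe)
(True, 'Success')
>>> pipe.exists()
False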
def clear_pipe( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw) -> Tuple[bool, str]:
2961def clear_pipe(
2962    self,
2963    pipe: mrsm.Pipe,
2964    begin: Union[datetime, int, None] = None,
2965    end: Union[datetime, int, None] = None,
2966    params: Optional[Dict[str, Any]] = None,
2967    debug: bool = False,
2968    **kw
2969) -> SuccessTuple:
2970    """
2971    Delete a pipe's data within a bounded or unbounded interval without dropping the table.
2972
2973    Parameters
2974    ----------
2975    pipe: mrsm.Pipe
2976        The pipe to clear.
2977        
2978    begin: Union[datetime, int, None], default None
2979        Beginning datetime. Inclusive.
2980
2981    end: Union[datetime, int, None], default None
2982         Ending datetime. Exclusive.
2983
2984    params: Optional[Dict[str, Any]], default None
2985         See `meerschaum.utils.sql.build_where`.
2986
2987    """
2988    if not pipe.exists(debug=debug):
2989        return True, f"{pipe} does not exist, so nothing was cleared."
2990
2991    from meerschaum.utils.sql import sql_item_name, build_where, dateadd_str
2992    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
2993    pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
2994
2995    dt_col = pipe.columns.get('datetime', None)
2996    dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None
2997    dt_db_type = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None
2998    if not pipe.columns.get('datetime', None):
2999        dt_col = pipe.guess_datetime()
3000        dt_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None
3001        is_guess = True
3002    else:
3003        dt_col = pipe.get_columns('datetime')
3004        dt_name = sql_item_name(dt_col, self.flavor, None)
3005        is_guess = False
3006
3007    if begin is not None or end is not None:
3008        if is_guess:
3009            if dt_col is None:
3010                warn(
3011                    f"No datetime could be determined for {pipe}."
3012                    + "\n    Ignoring datetime bounds...",
3013                    stack=False,
3014                )
3015                begin, end = None, None
3016            else:
3017                warn(
3018                    f"A datetime wasn't specified for {pipe}.\n"
3019                    + f"    Using column \"{dt_col}\" for datetime bounds...",
3020                    stack=False,
3021                )
3022
3023    valid_params = {}
3024    if params is not None:
3025        existing_cols = pipe.get_columns_types(debug=debug)
3026        valid_params = {k: v for k, v in params.items() if k in existing_cols}
3027    clear_query = (
3028        f"DELETE FROM {pipe_name}\nWHERE 1 = 1\n"
3029        + ('\n    AND ' + build_where(valid_params, self, with_where=False) if valid_params else '')
3030        + (
3031            (
3032                f'\n    AND {dt_name} >= '
3033                + dateadd_str(self.flavor, 'day', 0, begin, db_type=dt_db_type)
3034            )
3035            if begin is not None
3036            else ''
3037        ) + (
3038            (
3039                f'\n    AND {dt_name} <  '
3040                + dateadd_str(self.flavor, 'day', 0, end, db_type=dt_db_type)
3041            )
3042            if end is not None
3043            else ''
3044        )
3045    )
3046    success = self.exec(clear_query, silent=True, debug=debug) is not None
3047    msg = "Success" if success else f"Failed to clear {pipe}."
3048    return success, msg

Delete a pipe's data within a bounded or unbounded interval without dropping the table.

Parameters
  • pipe (mrsm.Pipe): The pipe to clear.
  • begin (Union[datetime, int, None], default None): Beginning datetime. Inclusive.
  • end (Union[datetime, int, None], default None): Ending datetime. Exclusive.
  • params (Optional[Dict[str, Any]], default None): See meerschaum.utils.sql.build_where.
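Examples
A hedged sketch assuming the pipe's datetime column is 'dt'; rows are deleted within the half-open interval [begin, end) and the table itself is kept:
>>> from datetime import datetime
>>> conn.clear_pipe(
...     pipe,
...     begin=datetime(2024, 1, 1),
...     end=datetime(2024, 2, 1),
...     params={'id': 1},
... )
(True, 'Success')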
def deduplicate_pipe( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
3652def deduplicate_pipe(
3653    self,
3654    pipe: mrsm.Pipe,
3655    begin: Union[datetime, int, None] = None,
3656    end: Union[datetime, int, None] = None,
3657    params: Optional[Dict[str, Any]] = None,
3658    debug: bool = False,
3659    **kwargs: Any
3660) -> SuccessTuple:
3661    """
3662    Delete duplicate values within a pipe's table.
3663
3664    Parameters
3665    ----------
3666    pipe: mrsm.Pipe
3667        The pipe whose table to deduplicate.
3668
3669    begin: Union[datetime, int, None], default None
3670        If provided, only deduplicate values greater than or equal to this value.
3671
3672    end: Union[datetime, int, None], default None
3673        If provided, only deduplicate values less than this value.
3674
3675    params: Optional[Dict[str, Any]], default None
3676        If provided, further limit deduplication to values which match this query dictionary.
3677
3678    debug: bool, default False
3679        Verbosity toggle.
3680
3681    Returns
3682    -------
3683    A `SuccessTuple` indicating success.
3684    """
3685    from meerschaum.utils.sql import (
3686        sql_item_name,
3687        get_rename_table_queries,
3688        DROP_IF_EXISTS_FLAVORS,
3689        get_create_table_query,
3690        format_cte_subquery,
3691        get_null_replacement,
3692    )
3693    from meerschaum.utils.misc import generate_password, flatten_list
3694
3695    pipe_table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
3696
3697    if not pipe.exists(debug=debug):
3698        return False, f"Table {pipe_table_name} does not exist."
3699
3700    dt_col = pipe.columns.get('datetime', None)
3701    cols_types = pipe.get_columns_types(debug=debug)
3702    existing_cols = cols_types  ### Same mapping as above; reuse it rather than querying again.
3703
3704    get_rowcount_query = f"SELECT COUNT(*) FROM {pipe_table_name}"
3705    old_rowcount = self.value(get_rowcount_query, debug=debug)
3706    if old_rowcount is None:
3707        return False, f"Failed to get rowcount for table {pipe_table_name}."
3708
3709    ### Non-datetime index columns which actually exist in the table.
3710    indices = [
3711        col
3712        for key, col in pipe.columns.items()
3713        if col and col != dt_col and col in cols_types
3714    ]
3715    indices_names = [sql_item_name(index_col, self.flavor, None) for index_col in indices]
3716    existing_cols_names = [sql_item_name(col, self.flavor, None) for col in existing_cols]
3717    duplicate_row_number_name = sql_item_name('dup_row_num', self.flavor, None)
3718    previous_row_number_name = sql_item_name('prev_row_num', self.flavor, None)
3719
3720    index_list_str = (
3721        sql_item_name(dt_col, self.flavor, None)
3722        if dt_col
3723        else ''
3724    )
3725    index_list_str_ordered = (
3726        (
3727            sql_item_name(dt_col, self.flavor, None) + " DESC"
3728        )
3729        if dt_col
3730        else ''
3731    )
3732    if indices:
3733        index_list_str += ', ' + ', '.join(indices_names)
3734        index_list_str_ordered += ', ' + ', '.join(indices_names)
3735    if index_list_str.startswith(','):
3736        index_list_str = index_list_str.lstrip(',').lstrip()
3737    if index_list_str_ordered.startswith(','):
3738        index_list_str_ordered = index_list_str_ordered.lstrip(',').lstrip()
3739
3740    cols_list_str = ', '.join(existing_cols_names)
3741
3742    try:
3743        ### NOTE: MySQL 5 and below does not support window functions (ROW_NUMBER()).
3744        is_old_mysql = (
3745            self.flavor in ('mysql', 'mariadb')
3746            and
3747            int(self.db_version.split('.')[0]) < 8
3748        )
3749    except Exception:
3750        is_old_mysql = False
3751
3752    src_query = f"""
3753        SELECT
3754            {cols_list_str},
3755            ROW_NUMBER() OVER (
3756                PARTITION BY
3757                {index_list_str}
3758                ORDER BY {index_list_str_ordered}
3759            ) AS {duplicate_row_number_name}
3760        FROM {pipe_table_name}
3761    """
3762    duplicates_cte_subquery = format_cte_subquery(
3763        src_query,
3764        self.flavor,
3765        sub_name = 'src',
3766        cols_to_select = cols_list_str,
3767    ) + f"""
3768        WHERE {duplicate_row_number_name} = 1
3769        """
3770    old_mysql_query = (
3771        f"""
3772        SELECT
3773            {index_list_str}
3774        FROM (
3775          SELECT
3776            {index_list_str},
3777            IF(
3778                @{previous_row_number_name} <> {index_list_str.replace(', ', ' + ')},
3779                @{duplicate_row_number_name} := 0,
3780                @{duplicate_row_number_name}
3781            ),
3782            @{previous_row_number_name} := {index_list_str.replace(', ', ' + ')},
3783            @{duplicate_row_number_name} := @{duplicate_row_number_name} + 1 AS """
3784        + f"""{duplicate_row_number_name}
3785          FROM
3786            {pipe_table_name},
3787            (
3788                SELECT @{duplicate_row_number_name} := 0
3789            ) AS {duplicate_row_number_name},
3790            (
3791                SELECT @{previous_row_number_name} := '{get_null_replacement('str', 'mysql')}'
3792            ) AS {previous_row_number_name}
3793          ORDER BY {index_list_str_ordered}
3794        ) AS t
3795        WHERE {duplicate_row_number_name} = 1
3796        """
3797    )
3798    if is_old_mysql:
3799        duplicates_cte_subquery = old_mysql_query
3800
3801    session_id = generate_password(3)
3802
3803    dedup_table = self.get_temporary_target(pipe.target, transact_id=session_id, label='dedup')
3804    temp_old_table = self.get_temporary_target(pipe.target, transact_id=session_id, label='old')
3805    temp_old_table_name = sql_item_name(temp_old_table, self.flavor, self.get_pipe_schema(pipe))
3806
3807    create_temporary_table_query = get_create_table_query(
3808        duplicates_cte_subquery,
3809        dedup_table,
3810        self.flavor,
3811    ) + f"""
3812    ORDER BY {index_list_str_ordered}
3813    """
3814    if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
3815    alter_queries = flatten_list([
3816        get_rename_table_queries(
3817            pipe.target,
3818            temp_old_table,
3819            self.flavor,
3820            schema=self.get_pipe_schema(pipe),
3821        ),
3822        get_rename_table_queries(
3823            dedup_table,
3824            pipe.target,
3825            self.flavor,
3826            schema=self.get_pipe_schema(pipe),
3827        ),
3828        f"DROP TABLE {if_exists_str} {temp_old_table_name}",
3829    ])
3830
3831    self._log_temporary_tables_creation(temp_old_table, create=(not pipe.temporary), debug=debug)
3832    create_temporary_result = self.execute(create_temporary_table_query, debug=debug)
3833    if create_temporary_result is None:
3834        return False, f"Failed to deduplicate table {pipe_table_name}."
3835
3836    results = self.exec_queries(
3837        alter_queries,
3838        break_on_error=True,
3839        rollback=True,
3840        debug=debug,
3841    )
3842
3843    fail_query = None
3844    for result, query in zip(results, alter_queries):
3845        if result is None:
3846            fail_query = query
3847            break
3848    success = fail_query is None
3849
3850    new_rowcount = (
3851        self.value(get_rowcount_query, debug=debug)
3852        if success
3853        else None
3854    )
3855
3856    msg = (
3857        (
3858            f"Successfully deduplicated table {pipe_table_name}"
3859            + (
3860                f"\nfrom {old_rowcount:,} to {new_rowcount:,} rows"
3861                if old_rowcount != new_rowcount
3862                else ''
3863            ) + '.'
3864        )
3865        if success
3866        else f"Failed to execute query:\n{fail_query}"
3867    )
3868    return success, msg

Delete duplicate values within a pipe's table.

Parameters
  • pipe (mrsm.Pipe): The pipe whose table to deduplicate.
  • begin (Union[datetime, int, None], default None): If provided, only deduplicate values greater than or equal to this value.
  • end (Union[datetime, int, None], default None): If provided, only deduplicate values less than this value.
  • params (Optional[Dict[str, Any]], default None): If provided, further limit deduplication to values which match this query dictionary.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple indicating success.
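Examples
A hedged sketch; the table name and rowcounts in the message are illustrative:
>>> success, msg = conn.deduplicate_pipe(pipe)
>>> print(msg)
Successfully deduplicated table "demo_rowcount"
from 1,000 to 900 rows.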
def get_pipe_table( self, pipe: meerschaum.Pipe, debug: bool = False) -> "Union['sqlalchemy.Table', None]":
3051def get_pipe_table(
3052    self,
3053    pipe: mrsm.Pipe,
3054    debug: bool = False,
3055) -> Union['sqlalchemy.Table', None]:
3056    """
3057    Return the `sqlalchemy.Table` object for a `mrsm.Pipe`.
3058
3059    Parameters
3060    ----------
3061    pipe: mrsm.Pipe
3062        The pipe in question.
3063
3064    Returns
3065    -------
3066    A `sqlalchemy.Table` object. 
3067
3068    """
3069    from meerschaum.utils.sql import get_sqlalchemy_table
3070    if not pipe.exists(debug=debug):
3071        return None
3072    return get_sqlalchemy_table(
3073        pipe.target,
3074        connector=self,
3075        schema=self.get_pipe_schema(pipe),
3076        debug=debug,
3077        refresh=True,
3078    )

Return the sqlalchemy.Table object for a mrsm.Pipe.

Parameters
  • pipe (mrsm.Pipe): The pipe in question.
Returns
  • A sqlalchemy.Table object.
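Examples
A hedged sketch; the column names are illustrative:
>>> table = conn.get_pipe_table(pipe)
>>> table is None  ### `None` is returned if the pipe's table doesn't exist.
False
>>> [col.name for col in table.columns]
['dt', 'id', 'val']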
def get_pipe_columns_types( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, str]:
3081def get_pipe_columns_types(
3082    self,
3083    pipe: mrsm.Pipe,
3084    debug: bool = False,
3085) -> Dict[str, str]:
3086    """
3087    Get the pipe's columns and types.
3088
3089    Parameters
3090    ----------
3091    pipe: mrsm.Pipe
3092        The pipe to get the columns for.
3093
3094    Returns
3095    -------
3096    A dictionary of column names (`str`) and types (`str`).
3097
3098    Examples
3099    --------
3100    >>> conn.get_pipe_columns_types(pipe)
3101    {
3102      'dt': 'TIMESTAMP WITHOUT TIME ZONE',
3103      'id': 'BIGINT',
3104      'val': 'DOUBLE PRECISION',
3105    }
3106    >>> 
3107    """
3108    from meerschaum.utils.sql import get_table_cols_types
3109    if not pipe.exists(debug=debug):
3110        return {}
3111
3112    if self.flavor not in ('oracle', 'mysql', 'mariadb', 'sqlite'):
3113        return get_table_cols_types(
3114            pipe.target,
3115            self,
3116            flavor=self.flavor,
3117            schema=self.get_pipe_schema(pipe),
3118            debug=debug,
3119        )
3120
3121    table_columns = {}
3122    try:
3123        pipe_table = self.get_pipe_table(pipe, debug=debug)
3124        if pipe_table is None:
3125            return {}
3126        for col in pipe_table.columns:
3127            table_columns[str(col.name)] = str(col.type)
3128    except Exception as e:
3129        import traceback
3130        traceback.print_exc()
3131        warn(e)
3132        table_columns = {}
3133
3134    return table_columns

Get the pipe's columns and types.

Parameters
  • pipe (mrsm.Pipe): The pipe to get the columns for.
Returns
  • A dictionary of column names (str) and types (str).
Examples
>>> conn.get_pipe_columns_types(pipe)
{
  'dt': 'TIMESTAMP WITHOUT TIME ZONE',
  'id': 'BIGINT',
  'val': 'DOUBLE PRECISION',
}
>>>
def get_to_sql_dtype( self, pipe: meerschaum.Pipe, df: "'pd.DataFrame'", update_dtypes: bool = True) -> "Dict[str, 'sqlalchemy.sql.visitors.TraversibleType']":
3596def get_to_sql_dtype(
3597    self,
3598    pipe: 'mrsm.Pipe',
3599    df: 'pd.DataFrame',
3600    update_dtypes: bool = True,
3601) -> Dict[str, 'sqlalchemy.sql.visitors.TraversibleType']:
3602    """
3603    Given a pipe and DataFrame, return the `dtype` dictionary for `to_sql()`.
3604
3605    Parameters
3606    ----------
3607    pipe: mrsm.Pipe
3608        The pipe which may contain a `dtypes` parameter.
3609
3610    df: pd.DataFrame
3611        The DataFrame to be pushed via `to_sql()`.
3612
3613    update_dtypes: bool, default True
3614        If `True`, patch the pipe's dtypes onto the DataFrame's dtypes.
3615
3616    Returns
3617    -------
3618    A dictionary with `sqlalchemy` datatypes.
3619
3620    Examples
3621    --------
3622    >>> import pandas as pd
3623    >>> import meerschaum as mrsm
3624    >>> 
3625    >>> conn = mrsm.get_connector('sql:memory')
3626    >>> df = pd.DataFrame([{'a': {'b': 1}}])
3627    >>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'})
3628    >>> get_to_sql_dtype(pipe, df)
3629    {'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
3630    """
3631    from meerschaum.utils.dataframe import get_json_cols, get_numeric_cols, get_uuid_cols
3632    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
3633    df_dtypes = {
3634        col: str(typ)
3635        for col, typ in df.dtypes.items()
3636    }
3637    json_cols = get_json_cols(df)
3638    numeric_cols = get_numeric_cols(df)
3639    uuid_cols = get_uuid_cols(df)
3640    df_dtypes.update({col: 'json' for col in json_cols})
3641    df_dtypes.update({col: 'numeric' for col in numeric_cols})
3642    df_dtypes.update({col: 'uuid' for col in uuid_cols})
3643    if update_dtypes:
3644        df_dtypes.update(pipe.dtypes)
3645    return {
3646        col: get_db_type_from_pd_type(typ, self.flavor, as_sqlalchemy=True)
3647        for col, typ in df_dtypes.items()
3648        if col and typ
3649    }

Given a pipe and DataFrame, return the dtype dictionary for to_sql().

Parameters
  • pipe (mrsm.Pipe): The pipe which may contain a dtypes parameter.
  • df (pd.DataFrame): The DataFrame to be pushed via to_sql().
  • update_dtypes (bool, default True): If True, patch the pipe's dtypes onto the DataFrame's dtypes.
Returns
  • A dictionary with sqlalchemy datatypes.
Examples
>>> import pandas as pd
>>> import meerschaum as mrsm
>>> 
>>> conn = mrsm.get_connector('sql:memory')
>>> df = pd.DataFrame([{'a': {'b': 1}}])
>>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'})
>>> get_to_sql_dtype(pipe, df)
{'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
def get_pipe_schema(self, pipe: meerschaum.Pipe) -> Optional[str]:
3871def get_pipe_schema(self, pipe: mrsm.Pipe) -> Union[str, None]:
3872    """
3873    Return the schema to use for this pipe.
3874    First check `pipe.parameters['schema']`, then check `self.schema`.
3875
3876    Parameters
3877    ----------
3878    pipe: mrsm.Pipe
3879        The pipe which may contain a configured schema.
3880
3881    Returns
3882    -------
3883    A schema string or `None` if nothing is configured.
3884    """
3885    return pipe.parameters.get('schema', self.schema)

Return the schema to use for this pipe. First check pipe.parameters['schema'], then check self.schema.

Parameters
  • pipe (mrsm.Pipe): The pipe which may contain a configured schema.
Returns
  • A schema string or None if nothing is configured.
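Examples
A hedged sketch; the schema name is illustrative:
>>> pipe = mrsm.Pipe('demo', 'schema', instance=conn, parameters={'schema': 'analytics'})
>>> conn.get_pipe_schema(pipe)
'analytics'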
def create_pipe_table_from_df( self, pipe: meerschaum.Pipe, df: "'pd.DataFrame'", debug: bool = False) -> Tuple[bool, str]:
1544def create_pipe_table_from_df(
1545    self,
1546    pipe: mrsm.Pipe,
1547    df: 'pd.DataFrame',
1548    debug: bool = False,
1549) -> mrsm.SuccessTuple:
1550    """
1551    Create a pipe's table from its configured dtypes and an incoming dataframe.
1552    """
1553    from meerschaum.utils.dataframe import (
1554        get_json_cols,
1555        get_numeric_cols,
1556        get_uuid_cols,
1557        get_datetime_cols,
1558        get_bytes_cols,
1559    )
1560    from meerschaum.utils.sql import (
1561        get_create_table_queries,
1562        sql_item_name,
1563        get_create_schema_if_not_exists_queries,
1564    )
1565    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
1566    primary_key = pipe.columns.get('primary', None)
1567    primary_key_typ = (
1568        pipe.dtypes.get(primary_key, str(df.dtypes.get(primary_key, 'int')))
1569        if primary_key
1570        else None
1571    )
1572    primary_key_db_type = (
1573        get_db_type_from_pd_type(primary_key_typ, self.flavor)
1574        if primary_key
1575        else None
1576    )
1577    dt_col = pipe.columns.get('datetime', None)
1578    new_dtypes = {
1579        **{
1580            col: str(typ)
1581            for col, typ in df.dtypes.items()
1582        },
1583        **{
1584            col: str(df.dtypes.get(col, 'int'))
1585            for col_ix, col in pipe.columns.items()
1586            if col and col_ix != 'primary'
1587        },
1588        **{
1589            col: 'uuid'
1590            for col in get_uuid_cols(df)
1591        },
1592        **{
1593            col: 'json'
1594            for col in get_json_cols(df)
1595        },
1596        **{
1597            col: 'numeric'
1598            for col in get_numeric_cols(df)
1599        },
1600        **{
1601            col: 'bytes'
1602            for col in get_bytes_cols(df)
1603        },
1604        **{
1605            col: 'datetime64[ns, UTC]'
1606            for col in get_datetime_cols(df, timezone_aware=True, timezone_naive=False)
1607        },
1608        **{
1609            col: 'datetime64[ns]'
1610            for col in get_datetime_cols(df, timezone_aware=False, timezone_naive=True)
1611        },
1612        **pipe.dtypes
1613    }
1614    autoincrement = (
1615        pipe.parameters.get('autoincrement', False)
1616        or (primary_key and primary_key not in new_dtypes)
1617    )
1618    if autoincrement:
1619        _ = new_dtypes.pop(primary_key, None)
1620
1621    schema = self.get_pipe_schema(pipe)
1622    create_table_queries = get_create_table_queries(
1623        new_dtypes,
1624        pipe.target,
1625        self.flavor,
1626        schema=schema,
1627        primary_key=primary_key,
1628        primary_key_db_type=primary_key_db_type,
1629        datetime_column=dt_col,
1630    )
1631    if schema:
1632        create_table_queries = (
1633            get_create_schema_if_not_exists_queries(schema, self.flavor)
1634            + create_table_queries
1635        )
1636    success = all(
1637        self.exec_queries(create_table_queries, break_on_error=True, rollback=True, debug=debug)
1638    )
1639    target_name = sql_item_name(pipe.target, schema=self.get_pipe_schema(pipe), flavor=self.flavor)
1640    msg = (
1641        "Success"
1642        if success
1643        else f"Failed to create {target_name}."
1644    )
1645    return success, msg

Create a pipe's table from its configured dtypes and an incoming dataframe.
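Examples
A hedged sketch; dtypes are inferred from the DataFrame and patched with the pipe's configured dtypes:
>>> import pandas as pd
>>> df = pd.DataFrame({'dt': pd.to_datetime(['2024-01-01']), 'val': [1.5]})
>>> pipe = mrsm.Pipe('demo', 'create', instance=conn, columns={'datetime': 'dt'})
>>> conn.create_pipe_table_from_df(pipe, df)
(True, 'Success')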

def get_pipe_columns_indices( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, List[Dict[str, str]]]:
3137def get_pipe_columns_indices(
3138    self,
3139    pipe: mrsm.Pipe,
3140    debug: bool = False,
3141) -> Dict[str, List[Dict[str, str]]]:
3142    """
3143    Return a dictionary mapping columns to the indices created on those columns.
3144
3145    Parameters
3146    ----------
3147    pipe: mrsm.Pipe
3148        The pipe to be queried against.
3149
3150    Returns
3151    -------
3152    A dictionary mapping column names to lists of dictionaries.
3153    The dictionaries in the lists contain the name and type of the indices.
3154    """
3155    if pipe.__dict__.get('_skip_check_indices', False):
3156        return {}
3157    from meerschaum.utils.sql import get_table_cols_indices
3158    return get_table_cols_indices(
3159        pipe.target,
3160        self,
3161        flavor=self.flavor,
3162        schema=self.get_pipe_schema(pipe),
3163        debug=debug,
3164    )

Return a dictionary mapping columns to the indices created on those columns.

Parameters
  • pipe (mrsm.Pipe): The pipe to be queried against.
Returns
  • A dictionary mapping column names to lists of dictionaries.
  • The dictionaries in the lists contain the name and type of the indices.
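Examples
A hedged sketch of the return shape; the index name and type strings vary by flavor:
>>> conn.get_pipe_columns_indices(pipe)
{'id': [{'name': 'IX_demo_id', 'type': 'INDEX'}]}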
@staticmethod
def get_temporary_target( target: str, transact_id: Optional[str] = None, label: Optional[str] = None, separator: Optional[str] = None) -> str:
3888@staticmethod
3889def get_temporary_target(
3890    target: str,
3891    transact_id: Optional[str] = None,
3892    label: Optional[str] = None,
3893    separator: Optional[str] = None,
3894) -> str:
3895    """
3896    Return a unique(ish) temporary target for a pipe.
3897    """
3898    from meerschaum.utils.misc import generate_password
3899    temp_target_cf = (
3900        mrsm.get_config('system', 'connectors', 'sql', 'instance', 'temporary_target') or {}
3901    )
3902    transaction_id_len = temp_target_cf.get('transaction_id_length', 3)
3903    transact_id = transact_id or generate_password(transaction_id_len)
3904    temp_prefix = temp_target_cf.get('prefix', '_')
3905    separator = separator or temp_target_cf.get('separator', '_')
3906    return (
3907        temp_prefix
3908        + target
3909        + separator
3910        + transact_id
3911        + ((separator + label) if label else '')
3912    )

Return a unique(ish) temporary target for a pipe.
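Examples
A hedged sketch assuming the default prefix ('_') and separator ('_') from the instance configuration:
>>> SQLConnector.get_temporary_target('weather', transact_id='abc', label='dedup')
'_weather_abc_dedup'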

def create_pipe_indices( self, pipe: meerschaum.Pipe, columns: Optional[List[str]] = None, debug: bool = False) -> Tuple[bool, str]:
314def create_pipe_indices(
315    self,
316    pipe: mrsm.Pipe,
317    columns: Optional[List[str]] = None,
318    debug: bool = False,
319) -> SuccessTuple:
320    """
321    Create a pipe's indices.
322    """
323    success = self.create_indices(pipe, columns=columns, debug=debug)
324    msg = (
325        "Success"
326        if success
327        else f"Failed to create indices for {pipe}."
328    )
329    return success, msg

Create a pipe's indices.
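Examples
A hedged sketch; omit `columns` to create indices for all of the pipe's configured index columns:
>>> conn.create_pipe_indices(pipe, columns=['id'])
(True, 'Success')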

def drop_pipe_indices( self, pipe: meerschaum.Pipe, columns: Optional[List[str]] = None, debug: bool = False) -> Tuple[bool, str]:
368def drop_pipe_indices(
369    self,
370    pipe: mrsm.Pipe,
371    columns: Optional[List[str]] = None,
372    debug: bool = False,
373) -> SuccessTuple:
374    """
375    Drop a pipe's indices.
376    """
377    success = self.drop_indices(pipe, columns=columns, debug=debug)
378    msg = (
379        "Success"
380        if success
381        else f"Failed to drop indices for {pipe}."
382    )
383    return success, msg

Drop a pipe's indices.

def get_pipe_index_names(self, pipe: meerschaum.Pipe) -> Dict[str, str]:
421def get_pipe_index_names(self, pipe: mrsm.Pipe) -> Dict[str, str]:
422    """
423    Return a dictionary mapping index keys to their names on the database.
424
425    Returns
426    -------
427    A dictionary mapping index keys to index names.
428    """
429    from meerschaum.utils.sql import DEFAULT_SCHEMA_FLAVORS
430    _parameters = pipe.parameters
431    _index_template = _parameters.get('index_template', "IX_{schema_str}{target}_{column_names}")
432    _schema = self.get_pipe_schema(pipe)
433    if _schema is None:
434        _schema = (
435            DEFAULT_SCHEMA_FLAVORS.get(self.flavor, None)
436            if self.flavor != 'mssql'
437            else None
438        )
439    schema_str = '' if _schema is None else f'{_schema}_'
440    schema_str = ''  ### NOTE: Overrides the above; schema prefixes are currently disabled.
441    _indices = pipe.indices
442    _target = pipe.target
443    _column_names = {
444        ix: (
445            '_'.join(cols)
446            if isinstance(cols, (list, tuple))
447            else str(cols)
448        )
449        for ix, cols in _indices.items()
450        if cols
451    }
452    _index_names = {
453        ix: _index_template.format(
454            target=_target,
455            column_names=column_names,
456            connector_keys=pipe.connector_keys,
457            metric_key=pipe.metric_key,
458            location_key=pipe.location_key,
459            schema_str=schema_str,
460        )
461        for ix, column_names in _column_names.items()
462    }
463    ### NOTE: Skip any duplicate indices.
464    seen_index_names = {}
465    for ix, index_name in _index_names.items():
466        if index_name in seen_index_names:
467            continue
468        seen_index_names[index_name] = ix
469    return {
470        ix: index_name
471        for index_name, ix in seen_index_names.items()
472    }

Return a dictionary mapping index keys to their names on the database.

Returns
  • A dictionary mapping index keys to index names.
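Examples
A hedged sketch assuming the default index template and an explicit target of 'weather':
>>> pipe = mrsm.Pipe('demo', 'indices', instance=conn, target='weather',
...     columns={'datetime': 'dt', 'id': 'id'})
>>> conn.get_pipe_index_names(pipe)
{'datetime': 'IX_weather_dt', 'id': 'IX_weather_id'}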
def register_plugin( self, plugin: meerschaum.Plugin, force: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
17def register_plugin(
18    self,
19    plugin: 'mrsm.core.Plugin',
20    force: bool = False,
21    debug: bool = False,
22    **kw: Any
23) -> SuccessTuple:
24    """Register a new plugin to the plugins table."""
25    from meerschaum.utils.packages import attempt_import
26    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
27    from meerschaum.utils.sql import json_flavors
28    from meerschaum.connectors.sql.tables import get_tables
29    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
30
31    old_id = self.get_plugin_id(plugin, debug=debug)
32
33    ### Check for version conflict. May be overridden with `--force`.
34    if old_id is not None and not force:
35        old_version = self.get_plugin_version(plugin, debug=debug)
36        new_version = plugin.version
37        if old_version is None:
38            old_version = ''
39        if new_version is None:
40            new_version = ''
41
42        ### verify that the new version is greater than the old
43        packaging_version = attempt_import('packaging.version')
44        if (
45            old_version and new_version
46            and packaging_version.parse(old_version) >= packaging_version.parse(new_version)
47        ):
48            return False, (
49                f"Version '{new_version}' of plugin '{plugin}' " +
50                f"must be greater than existing version '{old_version}'."
51            )
52
53    bind_variables = {
54        'plugin_name': plugin.name,
55        'version': plugin.version,
56        'attributes': (
57            json.dumps(plugin.attributes) if self.flavor not in json_flavors else plugin.attributes
58        ),
59        'user_id': plugin.user_id,
60    }
61
62    if old_id is None:
63        query = sqlalchemy.insert(plugins_tbl).values(**bind_variables)
64    else:
65        query = (
66            sqlalchemy.update(plugins_tbl)
67            .values(**bind_variables)
68            .where(plugins_tbl.c.plugin_id == old_id)
69        )
70
71    result = self.exec(query, debug=debug)
72    if result is None:
73        return False, f"Failed to register plugin '{plugin}'."
74    return True, f"Successfully registered plugin '{plugin}'."

Register a new plugin to the plugins table.
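Examples
A hedged sketch assuming a locally installed plugin named 'example'; pass `force=True` to skip the version check:
>>> plugin = mrsm.Plugin('example')
>>> conn.register_plugin(plugin)
(True, "Successfully registered plugin 'example'.")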

def delete_plugin( self, plugin: meerschaum.Plugin, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
243def delete_plugin(
244    self,
245    plugin: 'mrsm.core.Plugin',
246    debug: bool = False,
247    **kw: Any
248) -> SuccessTuple:
249    """Delete a plugin from the plugins table."""
250    from meerschaum.utils.packages import attempt_import
251    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
252    from meerschaum.connectors.sql.tables import get_tables
253    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
254
255    plugin_id = self.get_plugin_id(plugin, debug=debug)
256    if plugin_id is None:
257        return True, f"Plugin '{plugin}' was not registered."
258
259    query = sqlalchemy.delete(plugins_tbl).where(plugins_tbl.c.plugin_id == plugin_id)
260    result = self.exec(query, debug=debug)
261    if result is None:
262        return False, f"Failed to delete plugin '{plugin}'."
263    return True, f"Successfully deleted plugin '{plugin}'."

Delete a plugin from the plugins table.

def get_plugin_id( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[int]:
76def get_plugin_id(
77    self,
78    plugin: 'mrsm.core.Plugin',
79    debug: bool = False
80) -> Optional[int]:
81    """
82    Return a plugin's ID.
83    """
84    ### ensure plugins table exists
85    from meerschaum.connectors.sql.tables import get_tables
86    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
87    from meerschaum.utils.packages import attempt_import
88    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
89
90    query = (
91        sqlalchemy
92        .select(plugins_tbl.c.plugin_id)
93        .where(plugins_tbl.c.plugin_name == plugin.name)
94    )
95    
96    try:
97        return int(self.value(query, debug=debug))
98    except Exception:
99        return None

Return a plugin's ID.

def get_plugin_version( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[str]:
102def get_plugin_version(
103    self,
104    plugin: 'mrsm.core.Plugin',
105    debug: bool = False
106) -> Optional[str]:
107    """
108    Return a plugin's version.
109    """
110    ### ensure plugins table exists
111    from meerschaum.connectors.sql.tables import get_tables
112    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
113    from meerschaum.utils.packages import attempt_import
114    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
115    query = sqlalchemy.select(plugins_tbl.c.version).where(plugins_tbl.c.plugin_name == plugin.name)
116    return self.value(query, debug=debug)

Return a plugin's version.

def get_plugins( self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False, **kw: Any) -> List[str]:
196def get_plugins(
197    self,
198    user_id: Optional[int] = None,
199    search_term: Optional[str] = None,
200    debug: bool = False,
201    **kw: Any
202) -> List[str]:
203    """
204    Return a list of all registered plugins.
205
206    Parameters
207    ----------
208    user_id: Optional[int], default None
209        If specified, filter plugins by a specific `user_id`.
210
211    search_term: Optional[str], default None
212        If specified, add a `WHERE plugin_name LIKE '{search_term}%'` clause to filter the plugins.
213
214
215    Returns
216    -------
217    A list of plugin names.
218    """
219    ### ensure plugins table exists
220    from meerschaum.connectors.sql.tables import get_tables
221    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
222    from meerschaum.utils.packages import attempt_import
223    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
224
225    query = sqlalchemy.select(plugins_tbl.c.plugin_name)
226    if user_id is not None:
227        query = query.where(plugins_tbl.c.user_id == user_id)
228    if search_term is not None:
229        query = query.where(plugins_tbl.c.plugin_name.like(search_term + '%'))
230
231    rows = (
232        self.execute(query).fetchall()
233        if self.flavor != 'duckdb'
234        else [
235            (row['plugin_name'],)
236            for row in self.read(query).to_dict(orient='records')
237        ]
238    )
239    
240    return [row[0] for row in rows]

Return a list of all registered plugins.

Parameters
  • user_id (Optional[int], default None): If specified, filter plugins by a specific user_id.
  • search_term (Optional[str], default None): If specified, add a WHERE plugin_name LIKE '{search_term}%' clause to filter the plugins.
Returns
  • A list of plugin names.
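Examples
A hedged sketch; the registered plugin names are illustrative:
>>> conn.get_plugins(search_term='noaa')
['noaa']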
def get_plugin_user_id( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[int]:
118def get_plugin_user_id(
119    self,
120    plugin: 'mrsm.core.Plugin',
121    debug: bool = False
122) -> Optional[int]:
123    """
124    Return a plugin's user ID.
125    """
126    ### ensure plugins table exists
127    from meerschaum.connectors.sql.tables import get_tables
128    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
129    from meerschaum.utils.packages import attempt_import
130    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
131
132    query = (
133        sqlalchemy
134        .select(plugins_tbl.c.user_id)
135        .where(plugins_tbl.c.plugin_name == plugin.name)
136    )
137
138    try:
139        return int(self.value(query, debug=debug))
140    except Exception:
141        return None

Return a plugin's user ID.

def get_plugin_username( self, plugin: meerschaum.Plugin, debug: bool = False) -> Optional[str]:
143def get_plugin_username(
144    self,
145    plugin: 'mrsm.core.Plugin',
146    debug: bool = False
147) -> Optional[str]:
148    """
149    Return the username of a plugin's owner.
150    """
151    ### ensure plugins table exists
152    from meerschaum.connectors.sql.tables import get_tables
153    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
154    users = get_tables(mrsm_instance=self, debug=debug)['users']
155    from meerschaum.utils.packages import attempt_import
156    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
157
158    query = (
159        sqlalchemy.select(users.c.username)
160        .where(sqlalchemy.and_(  ### Python `and` cannot combine SQL predicates.
161            users.c.user_id == plugins_tbl.c.user_id,
162            plugins_tbl.c.plugin_name == plugin.name,
163        ))
164    )
165
166    return self.value(query, debug=debug)

Return the username of a plugin's owner.

def get_plugin_attributes( self, plugin: meerschaum.Plugin, debug: bool = False) -> Dict[str, Any]:
169def get_plugin_attributes(
170    self,
171    plugin: 'mrsm.core.Plugin',
172    debug: bool = False
173) -> Dict[str, Any]:
174    """
175    Return the attributes of a plugin.
176    """
177    ### ensure plugins table exists
178    from meerschaum.connectors.sql.tables import get_tables
179    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
180    from meerschaum.utils.packages import attempt_import
181    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
182
183    query = (
184        sqlalchemy
185        .select(plugins_tbl.c.attributes)
186        .where(plugins_tbl.c.plugin_name == plugin.name)
187    )
188
189    _attr = self.value(query, debug=debug)
190    if isinstance(_attr, str):
191        _attr = json.loads(_attr)
192    elif _attr is None:
193        _attr = {}
194    return _attr

Return the attributes of a plugin.

def register_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
16def register_user(
17    self,
18    user: mrsm.core.User,
19    debug: bool = False,
20    **kw: Any
21) -> SuccessTuple:
22    """Register a new user."""
23    from meerschaum.utils.packages import attempt_import
24    from meerschaum.utils.sql import json_flavors
25    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
26
27    valid_tuple = valid_username(user.username)
28    if not valid_tuple[0]:
29        return valid_tuple
30
31    old_id = self.get_user_id(user, debug=debug)
32
33    if old_id is not None:
34        return False, f"User '{user}' already exists."
35
36    ### ensure users table exists
37    from meerschaum.connectors.sql.tables import get_tables
38    tables = get_tables(mrsm_instance=self, debug=debug)
39
40    import json
41    bind_variables = {
42        'username': user.username,
43        'email': user.email,
44        'password_hash': user.password_hash,
45        'user_type': user.type,
46        'attributes': (
47            json.dumps(user.attributes)
48            if self.flavor not in json_flavors
49            else user.attributes
50        ),
51    }
52    ### `old_id` is None at this point (checked above),
53    ### so insert a new row for the user.
54    query = (
55        sqlalchemy
56        .insert(tables['users'])
57        .values(**bind_variables)
58    )
59
60    result = self.exec(query, debug=debug)
61    if result is None:
62        return False, f"Failed to register user '{user}'."
63    return True, f"Successfully registered user '{user}'."

Register a new user.
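Examples
A hedged sketch assuming the `User` class from `meerschaum.core`; the username and password are illustrative:
>>> from meerschaum.core import User
>>> user = User('foo', 'mypassword')
>>> conn.register_user(user)
(True, "Successfully registered user 'foo'.")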

def get_user_id( self, user: meerschaum.core.User._User.User, debug: bool = False) -> Optional[int]:
154def get_user_id(
155    self,
156    user: 'mrsm.core.User',
157    debug: bool = False
158) -> Optional[int]:
159    """If a user is registered, return the `user_id`."""
160    ### ensure users table exists
161    from meerschaum.utils.packages import attempt_import
162    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
163    from meerschaum.connectors.sql.tables import get_tables
164    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
165
166    query = (
167        sqlalchemy.select(users_tbl.c.user_id)
168        .where(users_tbl.c.username == user.username)
169    )
170
171    result = self.value(query, debug=debug)
172    if result is not None:
173        return int(result)
174    return None

If a user is registered, return the user_id.

def get_users(self, debug: bool = False, **kw: Any) -> List[str]:
248def get_users(
249    self,
250    debug: bool = False,
251    **kw: Any
252) -> List[str]:
253    """
254    Get the registered usernames.
255    """
256    ### ensure users table exists
257    from meerschaum.connectors.sql.tables import get_tables
258    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
259    from meerschaum.utils.packages import attempt_import
260    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
261
262    query = sqlalchemy.select(users_tbl.c.username)
263
264    return list(self.read(query, debug=debug)['username'])

Get the registered usernames.

def edit_user( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
100def edit_user(
101    self,
102    user: 'mrsm.core.User',
103    debug: bool = False,
104    **kw: Any
105) -> SuccessTuple:
106    """Update an existing user's metadata."""
107    from meerschaum.utils.packages import attempt_import
108    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
109    from meerschaum.connectors.sql.tables import get_tables
110    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
111
112    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
113    if user_id is None:
114        return False, (
115            f"User '{user.username}' does not exist. "
116            f"Register user '{user.username}' before editing."
117        )
118    user.user_id = user_id
119
120    import json
121    valid_tuple = valid_username(user.username)
122    if not valid_tuple[0]:
123        return valid_tuple
124
125    bind_variables = {
126        'user_id' : user_id,
127        'username' : user.username,
128    }
129    if user.password != '':
130        bind_variables['password_hash'] = user.password_hash
131    if user.email != '':
132        bind_variables['email'] = user.email
133    if user.attributes is not None and user.attributes != {}:
134        bind_variables['attributes'] = (
135            json.dumps(user.attributes) if self.flavor in ('duckdb',)
136            else user.attributes
137        )
138    if user.type != '':
139        bind_variables['user_type'] = user.type
140
141    query = (
142        sqlalchemy
143        .update(users_tbl)
144        .values(**bind_variables)
145        .where(users_tbl.c.user_id == user_id)
146    )
147
148    result = self.exec(query, debug=debug)
149    if result is None:
150        return False, f"Failed to edit user '{user}'."
151    return True, f"Successfully edited user '{user}'."

Update an existing user's metadata.

def delete_user( self, user: meerschaum.core.User._User.User, debug: bool = False) -> Tuple[bool, str]:
216def delete_user(
217    self,
218    user: 'mrsm.core.User',
219    debug: bool = False
220) -> SuccessTuple:
221    """Delete a user's record from the users table."""
222    ### ensure users table exists
223    from meerschaum.connectors.sql.tables import get_tables
224    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
225    plugins = get_tables(mrsm_instance=self, debug=debug)['plugins']
226    from meerschaum.utils.packages import attempt_import
227    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
228
229    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
230
231    if user_id is None:
232        return False, f"User '{user.username}' is not registered and cannot be deleted."
233
234    query = sqlalchemy.delete(users_tbl).where(users_tbl.c.user_id == user_id)
235
236    result = self.exec(query, debug=debug)
237    if result is None:
238        return False, f"Failed to delete user '{user}'."
239
240    query = sqlalchemy.delete(plugins).where(plugins.c.user_id == user_id)
241    result = self.exec(query, debug=debug)
242    if result is None:
243        return False, f"Failed to delete plugins of user '{user}'."
244
245    return True, f"Successfully deleted user '{user}'"

Delete a user's record from the users table.

def get_user_password_hash( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
267def get_user_password_hash(
268    self,
269    user: 'mrsm.core.User',
270    debug: bool = False,
271    **kw: Any
272) -> Optional[str]:
273    """
274    Return the password hash for a user.
275    **NOTE**: This may be dangerous and is only allowed if the security settings explicitly allow it.
276    """
277    from meerschaum.utils.debug import dprint
278    from meerschaum.connectors.sql.tables import get_tables
279    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
280    from meerschaum.utils.packages import attempt_import
281    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
282
283    if user.user_id is not None:
284        user_id = user.user_id
285        if debug:
286            dprint(f"Already given user_id: {user_id}")
287    else:
288        if debug:
289            dprint("Fetching user_id...")
290        user_id = self.get_user_id(user, debug=debug)
291
292    if user_id is None:
293        return None
294
295    query = sqlalchemy.select(users_tbl.c.password_hash).where(users_tbl.c.user_id == user_id)
296
297    return self.value(query, debug=debug)

Return the password hash for a user. NOTE: This may be dangerous and is only allowed if the security settings explicitly allow it.

def get_user_type( self, user: meerschaum.core.User._User.User, debug: bool = False, **kw: Any) -> Optional[str]:
300def get_user_type(
301    self,
302    user: 'mrsm.core.User',
303    debug: bool = False,
304    **kw: Any
305) -> Optional[str]:
306    """
307    Return the user's type.
308    """
309    from meerschaum.connectors.sql.tables import get_tables
310    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
311    from meerschaum.utils.packages import attempt_import
312    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
313
314    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
315
316    if user_id is None:
317        return None
318
319    query = sqlalchemy.select(users_tbl.c.user_type).where(users_tbl.c.user_id == user_id)
320
321    return self.value(query, debug=debug)

Return the user's type.

def get_user_attributes( self, user: meerschaum.core.User._User.User, debug: bool = False) -> Optional[Dict[str, Any]]:
176def get_user_attributes(
177    self,
178    user: 'mrsm.core.User',
179    debug: bool = False
180) -> Union[Dict[str, Any], None]:
181    """
182    Return the user's attributes.
183    """
184    ### ensure users table exists
185    from meerschaum.utils.warnings import warn
186    from meerschaum.utils.packages import attempt_import
187    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
188    from meerschaum.connectors.sql.tables import get_tables
189    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
190
191    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
192
193    query = (
194        sqlalchemy.select(users_tbl.c.attributes)
195        .where(users_tbl.c.user_id == user_id)
196    )
197
198    result = self.value(query, debug=debug)
199    if result is not None and not isinstance(result, dict):
200        try:
201            result = dict(result)
202            _parsed = True
203        except Exception:
204            _parsed = False
205        if not _parsed:
206            try:
207                import json
208                result = json.loads(result)
209                _parsed = True
210            except Exception:
211                _parsed = False
212        if not _parsed:
213            warn(f"Received unexpected type for attributes: {result}")
214    return result

Return the user's attributes.

@classmethod
def from_uri( cls, uri: str, label: Optional[str] = None, as_dict: bool = False) -> Union[SQLConnector, Dict[str, Union[str, int]]]:
15@classmethod
16def from_uri(
17    cls,
18    uri: str,
19    label: Optional[str] = None,
20    as_dict: bool = False,
21) -> Union[
22    'meerschaum.connectors.SQLConnector',
23    Dict[str, Union[str, int]],
24]:
25    """
26    Create a new SQLConnector from a URI string.
27
28    Parameters
29    ----------
30    uri: str
31        The URI connection string.
32
33    label: Optional[str], default None
34        If provided, use this as the connector label.
35        Otherwise use the determined database name.
36
37    as_dict: bool, default False
38        If `True`, return a dictionary of the keyword arguments
39        necessary to create a new `SQLConnector`, otherwise create a new object.
40
41    Returns
42    -------
43    A new SQLConnector object or a dictionary of attributes (if `as_dict` is `True`).
44    """
45
46    params = cls.parse_uri(uri)
47    params['uri'] = uri
48    flavor = params.get('flavor', None)
49    if not flavor or flavor not in cls.flavor_configs:
50        error(f"Invalid flavor '{flavor}' detected from the provided URI.")
51
52    if 'database' not in params:
53        error("Unable to determine the database from the provided URI.")
54
55    if flavor in ('sqlite', 'duckdb'):
56        if params['database'] == ':memory:':
57            params['label'] = label or f'memory_{flavor}'
58        else:
59            params['label'] = label or params['database'].split(os.path.sep)[-1].lower()
60    else:
61        params['label'] = label or (
62            (
63                (params['username'] + '@' if 'username' in params else '')
64                + params.get('host', '')
65                + ('/' if 'host' in params else '')
66                + params.get('database', '')
67            ).lower()
68        )
69
70    return cls(**params) if not as_dict else params

Create a new SQLConnector from a URI string.

Parameters
  • uri (str): The URI connection string.
  • label (Optional[str], default None): If provided, use this as the connector label. Otherwise use the determined database name.
  • as_dict (bool, default False): If True, return a dictionary of the keyword arguments necessary to create a new SQLConnector, otherwise create a new object.
Returns
  • A new SQLConnector object or a dictionary of attributes (if as_dict is True).
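Examples
A hedged sketch; the credentials and paths are illustrative:
>>> from meerschaum.connectors.sql import SQLConnector
>>> conn = SQLConnector.from_uri('postgresql://user:pass@localhost:5432/mydb')
>>> conn.label
'user@localhost/mydb'
>>> SQLConnector.from_uri('sqlite:////tmp/demo.db', as_dict=True)
{'database': '/tmp/demo.db', 'flavor': 'sqlite', 'uri': 'sqlite:////tmp/demo.db', 'label': 'demo.db'}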
@staticmethod
def parse_uri(uri: str) -> Dict[str, Any]:
 73@staticmethod
 74def parse_uri(uri: str) -> Dict[str, Any]:
 75    """
 76    Parse a URI string into a dictionary of parameters.
 77
 78    Parameters
 79    ----------
 80    uri: str
 81        The database connection URI.
 82
 83    Returns
 84    -------
 85    A dictionary of attributes.
 86
 87    Examples
 88    --------
 89    >>> parse_uri('sqlite:////home/foo/bar.db')
 90    {'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
 91    >>> parse_uri(
 92    ...     'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
 93    ...     + '/master?driver=ODBC+Driver+17+for+SQL+Server'
 94    ... )
 95    {'host': 'localhost', 'database': 'master', 'username': 'sa',
 96    'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql',
 97    'driver': 'ODBC Driver 17 for SQL Server'}
 98    >>> 
 99    """
100    from urllib.parse import parse_qs, urlparse
101    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
102    parser = sqlalchemy.engine.url.make_url
103    params = parser(uri).translate_connect_args()
104    params['flavor'] = uri.split(':')[0].split('+')[0]
105    if params['flavor'] == 'postgres':
106        params['flavor'] = 'postgresql'
107    if '?' in uri:
108        parsed_uri = urlparse(uri)
109        for key, value in parse_qs(parsed_uri.query).items():
110            params.update({key: value[0]})
111
112        if '--search_path' in params.get('options', ''):
113            params.update({'schema': params['options'].replace('--search_path=', '', 1)})
114    return params

Parse a URI string into a dictionary of parameters.

Parameters
  • uri (str): The database connection URI.
Returns
  • A dictionary of attributes.
Examples
>>> parse_uri('sqlite:////home/foo/bar.db')
{'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
>>> parse_uri(
...     'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
...     + '/master?driver=ODBC+Driver+17+for+SQL+Server'
... )
{'host': 'localhost', 'database': 'master', 'username': 'sa',
'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql',
'driver': 'ODBC Driver 17 for SQL Server'}
>>>