meerschaum.connectors.sql

Subpackage for the SQLConnector subclass

 1#! /usr/bin/env python
 2# -*- coding: utf-8 -*-
 3# vim:fenc=utf-8
 4
 5"""
 6Subpackage for the SQLConnector subclass
 7"""
 8
 9from meerschaum.connectors.sql._SQLConnector import SQLConnector
10
11__all__ = ('SQLConnector',)
class SQLConnector(meerschaum.connectors._Connector.Connector):
 18class SQLConnector(Connector):
 19    """
 20    Connect to SQL databases via `sqlalchemy`.
 21    
 22    SQLConnectors may be used as Meerschaum instance connectors.
 23    Read more about connectors and instances at
 24    https://meerschaum.io/reference/connectors/
 25
 26    """
 27
 28    IS_INSTANCE: bool = True
 29
 30    from ._create_engine import flavor_configs, create_engine
 31    from ._sql import (
 32        read,
 33        value,
 34        exec,
 35        execute,
 36        to_sql,
 37        exec_queries,
 38        get_connection,
 39        _cleanup_connections,
 40    )
 41    from meerschaum.utils.sql import test_connection
 42    from ._fetch import fetch, get_pipe_metadef
 43    from ._cli import cli, _cli_exit
 44    from ._pipes import (
 45        fetch_pipes_keys,
 46        create_indices,
 47        drop_indices,
 48        get_create_index_queries,
 49        get_drop_index_queries,
 50        get_add_columns_queries,
 51        get_alter_columns_queries,
 52        delete_pipe,
 53        get_pipe_data,
 54        get_pipe_data_query,
 55        register_pipe,
 56        edit_pipe,
 57        get_pipe_id,
 58        get_pipe_attributes,
 59        sync_pipe,
 60        sync_pipe_inplace,
 61        get_sync_time,
 62        pipe_exists,
 63        get_pipe_rowcount,
 64        drop_pipe,
 65        clear_pipe,
 66        deduplicate_pipe,
 67        get_pipe_table,
 68        get_pipe_columns_types,
 69        get_to_sql_dtype,
 70        get_pipe_schema,
 71        create_pipe_table_from_df,
 72        get_pipe_columns_indices,
 73    )
 74    from ._plugins import (
 75        register_plugin,
 76        delete_plugin,
 77        get_plugin_id,
 78        get_plugin_version,
 79        get_plugins,
 80        get_plugin_user_id,
 81        get_plugin_username,
 82        get_plugin_attributes,
 83    )
 84    from ._users import (
 85        register_user,
 86        get_user_id,
 87        get_users,
 88        edit_user,
 89        delete_user,
 90        get_user_password_hash,
 91        get_user_type,
 92        get_user_attributes,
 93    )
 94    from ._uri import from_uri, parse_uri
 95    from ._instance import (
 96        _log_temporary_tables_creation,
 97        _drop_temporary_table,
 98        _drop_temporary_tables,
 99        _drop_old_temporary_tables,
100    )
101
102    def __init__(
103        self,
104        label: Optional[str] = None,
105        flavor: Optional[str] = None,
106        wait: bool = False,
107        connect: bool = False,
108        debug: bool = False,
109        **kw: Any
110    ):
111        """
112        Parameters
113        ----------
114        label: str, default 'main'
115            The identifying label for the connector.
116            E.g. for `sql:main`, 'main' is the label.
117            Defaults to 'main'.
118
119        flavor: Optional[str], default None
120            The database flavor, e.g.
121            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
122            To see supported flavors, run the `bootstrap connectors` command.
123
124        wait: bool, default False
125            If `True`, block until a database connection has been made.
126            Defaults to `False`.
127
128        connect: bool, default False
129            If `True`, immediately attempt to connect to the database and raise
130            a warning if the connection fails.
131            Defaults to `False`.
132
133        debug: bool, default False
134            Verbosity toggle.
135            Defaults to `False`.
136
137        kw: Any
138            All other arguments will be passed to the connector's attributes.
139            Therefore, a connector may be made without being registered,
140            as long as enough parameters are supplied to the constructor.
141        """
142        if 'uri' in kw:
143            uri = kw['uri']
144            if uri.startswith('postgres') and not uri.startswith('postgresql'):
145                uri = uri.replace('postgres', 'postgresql', 1)
146            if uri.startswith('postgresql') and not uri.startswith('postgresql+'):
147                uri = uri.replace('postgresql://', 'postgresql+psycopg://', 1)
148            if uri.startswith('timescaledb://'):
149                uri = uri.replace('timescaledb://', 'postgresql+psycopg://', 1)
150                flavor = 'timescaledb'
151            kw['uri'] = uri
152            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
153            label = label or from_uri_params.get('label', None)
154            _ = from_uri_params.pop('label', None)
155
156            ### Sometimes the flavor may be provided with a URI.
157            kw.update(from_uri_params)
158            if flavor:
159                kw['flavor'] = flavor
160
161        ### set __dict__ in base class
162        super().__init__(
163            'sql',
164            label = label or self.__dict__.get('label', None),
165            **kw
166        )
167
168        if self.__dict__.get('flavor', None) == 'sqlite':
169            self._reset_attributes()
170            self._set_attributes(
171                'sql',
172                label = label,
173                inherit_default = False,
174                **kw
175            )
176            ### For backwards compatibility, set the path for sql:local if it's missing.
177            if self.label == 'local' and not self.__dict__.get('database', None):
178                from meerschaum.config._paths import SQLITE_DB_PATH
179                self.database = str(SQLITE_DB_PATH)
180
181        ### ensure flavor and label are set accordingly
182        if 'flavor' not in self.__dict__:
183            if flavor is None and 'uri' not in self.__dict__:
184                raise Exception(
185                    f"    Missing flavor. Provide flavor as a key for '{self}'."
186                )
187            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)
188
189        if self.flavor == 'postgres':
190            self.flavor = 'postgresql'
191
192        self._debug = debug
193        ### Store the PID and thread at initialization
194        ### so we can dispose of the Pool in child processes or threads.
195        import os, threading
196        self._pid = os.getpid()
197        self._thread_ident = threading.current_thread().ident
198        self._sessions = {}
199        self._locks = {'_sessions': threading.RLock(), }
200
201        ### verify the flavor's requirements are met
202        if self.flavor not in self.flavor_configs:
203            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
204        if not self.__dict__.get('uri'):
205            self.verify_attributes(
206                self.flavor_configs[self.flavor].get('requirements', set()),
207                debug=debug,
208            )
209
210        if wait:
211            from meerschaum.connectors.poll import retry_connect
212            retry_connect(connector=self, debug=debug)
213
214        if connect:
215            if not self.test_connection(debug=debug):
216                warn(f"Failed to connect with connector '{self}'!", stack=False)
217
218    @property
219    def Session(self):
220        if '_Session' not in self.__dict__:
221            if self.engine is None:
222                return None
223
224            from meerschaum.utils.packages import attempt_import
225            sqlalchemy_orm = attempt_import('sqlalchemy.orm')
226            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
227            self._Session = sqlalchemy_orm.scoped_session(session_factory)
228
229        return self._Session
230
231    @property
232    def engine(self):
233        """
234        Return the SQLAlchemy engine connected to the configured database.
235        """
236        import os
237        import threading
238        if '_engine' not in self.__dict__:
239            self._engine, self._engine_str = self.create_engine(include_uri=True)
240
241        same_process = os.getpid() == self._pid
242        same_thread = threading.current_thread().ident == self._thread_ident
243
244        ### handle child processes
245        if not same_process:
246            self._pid = os.getpid()
247            self._thread = threading.current_thread()
248            warn("Different PID detected. Disposing of connections...")
249            self._engine.dispose()
250
251        ### handle different threads
252        if not same_thread:
253            if self.flavor == 'duckdb':
254                warn("Different thread detected.")
255                self._engine.dispose()
256
257        return self._engine
258
259    @property
260    def DATABASE_URL(self) -> str:
261        """
262        Return the URI connection string (alias for `SQLConnector.URI`).
263        """
264        _ = self.engine
265        return str(self._engine_str)
266
267    @property
268    def URI(self) -> str:
269        """
270        Return the URI connection string.
271        """
272        _ = self.engine
273        return str(self._engine_str)
274
275    @property
276    def IS_THREAD_SAFE(self) -> bool:
277        """
278        Return whether this connector may be multithreaded.
279        """
280        if self.flavor in ('duckdb', 'oracle'):
281            return False
282        if self.flavor == 'sqlite':
283            return ':memory:' not in self.URI
284        return True
285
286
287    @property
288    def metadata(self):
289        """
290        Return the metadata bound to this configured schema.
291        """
292        from meerschaum.utils.packages import attempt_import
293        sqlalchemy = attempt_import('sqlalchemy')
294        if '_metadata' not in self.__dict__:
295            self._metadata = sqlalchemy.MetaData(schema=self.schema)
296        return self._metadata
297
298
299    @property
300    def instance_schema(self):
301        """
302        Return the schema name for Meerschaum tables. 
303        """
304        return self.schema
305
306
307    @property
308    def internal_schema(self):
309        """
310        Return the schema name for internal tables. 
311        """
312        from meerschaum.config.static import STATIC_CONFIG
313        from meerschaum.utils.packages import attempt_import
314        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
315        schema_name = self.__dict__.get('internal_schema', None) or (
316            STATIC_CONFIG['sql']['internal_schema']
317            if self.flavor not in NO_SCHEMA_FLAVORS
318            else self.schema
319        )
320
321        if '_internal_schema' not in self.__dict__:
322            self._internal_schema = schema_name
323        return self._internal_schema
324
325
326    @property
327    def db(self) -> Optional[databases.Database]:
328        from meerschaum.utils.packages import attempt_import
329        databases = attempt_import('databases', lazy=False, install=True)
330        url = self.DATABASE_URL
331        if 'mysql' in url:
332            url = url.replace('+pymysql', '')
333        if '_db' not in self.__dict__:
334            try:
335                self._db = databases.Database(url)
336            except KeyError:
337                ### Likely encountered an unsupported flavor.
338                from meerschaum.utils.warnings import warn
339                self._db = None
340        return self._db
341
342
343    @property
344    def db_version(self) -> Union[str, None]:
345        """
346        Return the database version.
347        """
348        _db_version = self.__dict__.get('_db_version', None)
349        if _db_version is not None:
350            return _db_version
351
352        from meerschaum.utils.sql import get_db_version
353        self._db_version = get_db_version(self)
354        return self._db_version
355
356
357    @property
358    def schema(self) -> Union[str, None]:
359        """
360        Return the default schema to use.
361        A value of `None` will not prepend a schema.
362        """
363        if 'schema' in self.__dict__:
364            return self.__dict__['schema']
365
366        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
367        if self.flavor in NO_SCHEMA_FLAVORS:
368            self.__dict__['schema'] = None
369            return None
370
371        sqlalchemy = mrsm.attempt_import('sqlalchemy')
372        _schema = sqlalchemy.inspect(self.engine).default_schema_name
373        self.__dict__['schema'] = _schema
374        return _schema
375
376
377    def __getstate__(self):
378        return self.__dict__
379
380    def __setstate__(self, d):
381        self.__dict__.update(d)
382
383    def __call__(self):
384        return self

Connect to SQL databases via sqlalchemy.

SQLConnectors may be used as Meerschaum instance connectors. Read more about connectors and instances at https://meerschaum.io/reference/connectors/

SQLConnector( label: Optional[str] = None, flavor: Optional[str] = None, wait: bool = False, connect: bool = False, debug: bool = False, **kw: Any)
102    def __init__(
103        self,
104        label: Optional[str] = None,
105        flavor: Optional[str] = None,
106        wait: bool = False,
107        connect: bool = False,
108        debug: bool = False,
109        **kw: Any
110    ):
111        """
112        Parameters
113        ----------
114        label: str, default 'main'
115            The identifying label for the connector.
116            E.g. for `sql:main`, 'main' is the label.
117            Defaults to 'main'.
118
119        flavor: Optional[str], default None
120            The database flavor, e.g.
121            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
122            To see supported flavors, run the `bootstrap connectors` command.
123
124        wait: bool, default False
125            If `True`, block until a database connection has been made.
126            Defaults to `False`.
127
128        connect: bool, default False
129            If `True`, immediately attempt to connect to the database and raise
130            a warning if the connection fails.
131            Defaults to `False`.
132
133        debug: bool, default False
134            Verbosity toggle.
135            Defaults to `False`.
136
137        kw: Any
138            All other arguments will be passed to the connector's attributes.
139            Therefore, a connector may be made without being registered,
140            as long as enough parameters are supplied to the constructor.
141        """
142        if 'uri' in kw:
143            uri = kw['uri']
144            if uri.startswith('postgres') and not uri.startswith('postgresql'):
145                uri = uri.replace('postgres', 'postgresql', 1)
146            if uri.startswith('postgresql') and not uri.startswith('postgresql+'):
147                uri = uri.replace('postgresql://', 'postgresql+psycopg://', 1)
148            if uri.startswith('timescaledb://'):
149                uri = uri.replace('timescaledb://', 'postgresql+psycopg://', 1)
150                flavor = 'timescaledb'
151            kw['uri'] = uri
152            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
153            label = label or from_uri_params.get('label', None)
154            _ = from_uri_params.pop('label', None)
155
156            ### Sometimes the flavor may be provided with a URI.
157            kw.update(from_uri_params)
158            if flavor:
159                kw['flavor'] = flavor
160
161        ### set __dict__ in base class
162        super().__init__(
163            'sql',
164            label = label or self.__dict__.get('label', None),
165            **kw
166        )
167
168        if self.__dict__.get('flavor', None) == 'sqlite':
169            self._reset_attributes()
170            self._set_attributes(
171                'sql',
172                label = label,
173                inherit_default = False,
174                **kw
175            )
176            ### For backwards compatibility, set the path for sql:local if it's missing.
177            if self.label == 'local' and not self.__dict__.get('database', None):
178                from meerschaum.config._paths import SQLITE_DB_PATH
179                self.database = str(SQLITE_DB_PATH)
180
181        ### ensure flavor and label are set accordingly
182        if 'flavor' not in self.__dict__:
183            if flavor is None and 'uri' not in self.__dict__:
184                raise Exception(
185                    f"    Missing flavor. Provide flavor as a key for '{self}'."
186                )
187            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)
188
189        if self.flavor == 'postgres':
190            self.flavor = 'postgresql'
191
192        self._debug = debug
193        ### Store the PID and thread at initialization
194        ### so we can dispose of the Pool in child processes or threads.
195        import os, threading
196        self._pid = os.getpid()
197        self._thread_ident = threading.current_thread().ident
198        self._sessions = {}
199        self._locks = {'_sessions': threading.RLock(), }
200
201        ### verify the flavor's requirements are met
202        if self.flavor not in self.flavor_configs:
203            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
204        if not self.__dict__.get('uri'):
205            self.verify_attributes(
206                self.flavor_configs[self.flavor].get('requirements', set()),
207                debug=debug,
208            )
209
210        if wait:
211            from meerschaum.connectors.poll import retry_connect
212            retry_connect(connector=self, debug=debug)
213
214        if connect:
215            if not self.test_connection(debug=debug):
216                warn(f"Failed to connect with connector '{self}'!", stack=False)
Parameters
  • label (str, default 'main'): The identifying label for the connector. E.g. for sql:main, 'main' is the label. Defaults to 'main'.
  • flavor (Optional[str], default None): The database flavor, e.g. 'sqlite', 'postgresql', 'cockroachdb', etc. To see supported flavors, run the bootstrap connectors command.
  • wait (bool, default False): If True, block until a database connection has been made. Defaults to False.
  • connect (bool, default False): If True, immediately attempt to connect to the database and raise a warning if the connection fails. Defaults to False.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
  • kw (Any): All other arguments will be passed to the connector's attributes. Therefore, a connector may be made without being registered, as long as enough parameters are supplied to the constructor.
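
A minimal construction sketch (the connection details below are placeholders):

from meerschaum.connectors.sql import SQLConnector

### SQLite only requires a `database` path (see `flavor_configs`).
conn = SQLConnector(label='temp', flavor='sqlite', database='/tmp/example.db')

### Alternatively, pass a URI; the flavor and credentials are parsed from it.
conn = SQLConnector(
    label='example',
    uri='postgresql://user:pass@localhost:5432/db',  # placeholder credentials
    connect=True,  # warn immediately if the connection fails
)
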
IS_INSTANCE: bool = True
Session
218    @property
219    def Session(self):
220        if '_Session' not in self.__dict__:
221            if self.engine is None:
222                return None
223
224            from meerschaum.utils.packages import attempt_import
225            sqlalchemy_orm = attempt_import('sqlalchemy.orm')
226            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
227            self._Session = sqlalchemy_orm.scoped_session(session_factory)
228
229        return self._Session
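
Return a sqlalchemy.orm.scoped_session factory bound to the connector's engine, or None if no engine could be created.

A hedged usage sketch (assumes conn is an existing SQLConnector):

import sqlalchemy

session = conn.Session()
try:
    result = session.execute(sqlalchemy.text('SELECT 1'))
    print(result.scalar())
finally:
    session.close()  # return the underlying connection to the pool
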
engine
231    @property
232    def engine(self):
233        """
234        Return the SQLAlchemy engine connected to the configured database.
235        """
236        import os
237        import threading
238        if '_engine' not in self.__dict__:
239            self._engine, self._engine_str = self.create_engine(include_uri=True)
240
241        same_process = os.getpid() == self._pid
242        same_thread = threading.current_thread().ident == self._thread_ident
243
244        ### handle child processes
245        if not same_process:
246            self._pid = os.getpid()
247            self._thread = threading.current_thread()
248            warn("Different PID detected. Disposing of connections...")
249            self._engine.dispose()
250
251        ### handle different threads
252        if not same_thread:
253            if self.flavor == 'duckdb':
254                warn("Different thread detected.")
255                self._engine.dispose()
256
257        return self._engine

Return the SQLAlchemy engine connected to the configured database.

DATABASE_URL: str
259    @property
260    def DATABASE_URL(self) -> str:
261        """
262        Return the URI connection string (alias for `SQLConnector.URI`).
263        """
264        _ = self.engine
265        return str(self._engine_str)

Return the URI connection string (alias for SQLConnector.URI).

URI: str
267    @property
268    def URI(self) -> str:
269        """
270        Return the URI connection string.
271        """
272        _ = self.engine
273        return str(self._engine_str)

Return the URI connection string.
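
Because both properties read self._engine_str, accessing URI or DATABASE_URL builds the engine as a side effect if it does not yet exist. For example:

print(conn.URI)                       # the full connection string, credentials included
assert conn.DATABASE_URL == conn.URI  # DATABASE_URL is an alias of URI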

IS_THREAD_SAFE: bool
275    @property
276    def IS_THREAD_SAFE(self) -> bool:
277        """
278        Return whether this connector may be multithreaded.
279        """
280        if self.flavor in ('duckdb', 'oracle'):
281            return False
282        if self.flavor == 'sqlite':
283            return ':memory:' not in self.URI
284        return True

Return whether this connector may be multithreaded.

metadata
287    @property
288    def metadata(self):
289        """
290        Return the metadata bound to this configured schema.
291        """
292        from meerschaum.utils.packages import attempt_import
293        sqlalchemy = attempt_import('sqlalchemy')
294        if '_metadata' not in self.__dict__:
295            self._metadata = sqlalchemy.MetaData(schema=self.schema)
296        return self._metadata

Return the metadata bound to this configured schema.
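
As a hedged sketch, this metadata may be used with standard SQLAlchemy reflection (the table name is a placeholder):

import sqlalchemy

table = sqlalchemy.Table('my_table', conn.metadata, autoload_with=conn.engine)
print([col.name for col in table.columns])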

instance_schema
299    @property
300    def instance_schema(self):
301        """
302        Return the schema name for Meerschaum tables. 
303        """
304        return self.schema

Return the schema name for Meerschaum tables.

internal_schema
307    @property
308    def internal_schema(self):
309        """
310        Return the schema name for internal tables. 
311        """
312        from meerschaum.config.static import STATIC_CONFIG
313        from meerschaum.utils.packages import attempt_import
314        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
315        schema_name = self.__dict__.get('internal_schema', None) or (
316            STATIC_CONFIG['sql']['internal_schema']
317            if self.flavor not in NO_SCHEMA_FLAVORS
318            else self.schema
319        )
320
321        if '_internal_schema' not in self.__dict__:
322            self._internal_schema = schema_name
323        return self._internal_schema

Return the schema name for internal tables.

db: 'Optional[databases.Database]'
326    @property
327    def db(self) -> Optional[databases.Database]:
328        from meerschaum.utils.packages import attempt_import
329        databases = attempt_import('databases', lazy=False, install=True)
330        url = self.DATABASE_URL
331        if 'mysql' in url:
332            url = url.replace('+pymysql', '')
333        if '_db' not in self.__dict__:
334            try:
335                self._db = databases.Database(url)
336            except KeyError:
337                ### Likely encountered an unsupported flavor.
338                from meerschaum.utils.warnings import warn
339                self._db = None
340        return self._db
db_version: Optional[str]
343    @property
344    def db_version(self) -> Union[str, None]:
345        """
346        Return the database version.
347        """
348        _db_version = self.__dict__.get('_db_version', None)
349        if _db_version is not None:
350            return _db_version
351
352        from meerschaum.utils.sql import get_db_version
353        self._db_version = get_db_version(self)
354        return self._db_version

Return the database version.

schema: Optional[str]
357    @property
358    def schema(self) -> Union[str, None]:
359        """
360        Return the default schema to use.
361        A value of `None` will not prepend a schema.
362        """
363        if 'schema' in self.__dict__:
364            return self.__dict__['schema']
365
366        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
367        if self.flavor in NO_SCHEMA_FLAVORS:
368            self.__dict__['schema'] = None
369            return None
370
371        sqlalchemy = mrsm.attempt_import('sqlalchemy')
372        _schema = sqlalchemy.inspect(self.engine).default_schema_name
373        self.__dict__['schema'] = _schema
374        return _schema

Return the default schema to use. A value of None will not prepend a schema.

flavor_configs = {'timescaledb': {'engine': 'postgresql+psycopg', 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {}, 'requirements': {'host', 'database', 'username', 'password'}, 'defaults': {'port': 5432}}, 'postgresql': {'engine': 'postgresql+psycopg', 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {}, 'requirements': {'host', 'database', 'username', 'password'}, 'defaults': {'port': 5432}}, 'citus': {'engine': 'postgresql+psycopg', 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {}, 'requirements': {'host', 'database', 'username', 'password'}, 'defaults': {'port': 5432}}, 'mssql': {'engine': 'mssql+pyodbc', 'create_engine': {'fast_executemany': True, 'isolation_level': 'AUTOCOMMIT', 'use_setinputsizes': False, 'pool_pre_ping': True, 'ignore_no_transaction_on_rollback': True}, 'omit_create_engine': {'method'}, 'to_sql': {'method': None}, 'requirements': {'host', 'database', 'username', 'password'}, 'defaults': {'port': 1433, 'options': 'driver=ODBC Driver 18 for SQL Server&UseFMTONLY=Yes&TrustServerCertificate=yes&Encrypt=no&MARS_Connection=yes'}}, 'mysql': {'engine': 'mysql+pymysql', 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {'method': 'multi'}, 'requirements': {'host', 'database', 'username', 'password'}, 'defaults': {'port': 3306}}, 'mariadb': {'engine': 'mysql+pymysql', 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {'method': 'multi'}, 'requirements': {'host', 'database', 'username', 'password'}, 'defaults': {'port': 3306}}, 'oracle': {'engine': 'oracle+cx_oracle', 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {'method': None}, 'requirements': {'host', 'database', 'username', 'password'}, 'defaults': {'port': 1521}}, 'sqlite': {'engine': 'sqlite', 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {'method': 'multi'}, 'requirements': {'database'}, 'defaults': {}}, 'duckdb': {'engine': 'duckdb', 'create_engine': {}, 'omit_create_engine': {'ALL'}, 'to_sql': {'method': 'multi'}, 'requirements': '', 'defaults': {}}, 'cockroachdb': {'engine': 'cockroachdb', 'omit_create_engine': {'method'}, 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'to_sql': {'method': 'multi'}, 'requirements': {'host'}, 'defaults': {'port': 26257, 'database': 'defaultdb', 'username': 'root', 'password': 'admin'}}}
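
These per-flavor settings drive validation and engine construction: requirements lists the attributes checked by verify_attributes() when no URI is supplied, and defaults fills in missing values (such as the port) before the engine string is built. An illustrative lookup:

from meerschaum.connectors.sql import SQLConnector

cfg = SQLConnector.flavor_configs['postgresql']
print(cfg['engine'])            # 'postgresql+psycopg'
print(cfg['requirements'])      # {'host', 'database', 'username', 'password'}
print(cfg['defaults']['port'])  # 5432
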
def create_engine( self, include_uri: bool = False, debug: bool = False, **kw) -> 'sqlalchemy.engine.Engine':
180def create_engine(
181    self,
182    include_uri: bool = False,
183    debug: bool = False,
184    **kw
185) -> 'sqlalchemy.engine.Engine':
186    """Create a sqlalchemy engine by building the engine string."""
187    from meerschaum.utils.packages import attempt_import
188    from meerschaum.utils.warnings import error, warn
189    sqlalchemy = attempt_import('sqlalchemy')
190    import urllib
191    import copy
192    ### Install and patch required drivers.
193    if self.flavor in install_flavor_drivers:
194        attempt_import(*install_flavor_drivers[self.flavor], debug=debug, lazy=False, warn=False)
195        if self.flavor == 'mssql':
196            pyodbc = attempt_import('pyodbc', debug=debug, lazy=False, warn=False)
197            pyodbc.pooling = False
198    if self.flavor in require_patching_flavors:
199        from meerschaum.utils.packages import determine_version, _monkey_patch_get_distribution
200        import pathlib
201        for install_name, import_name in require_patching_flavors[self.flavor]:
202            pkg = attempt_import(
203                import_name,
204                debug=debug,
205                lazy=False,
206                warn=False
207            )
208            _monkey_patch_get_distribution(
209                install_name, determine_version(pathlib.Path(pkg.__file__), venv='mrsm')
210            )
211
212    ### supplement missing values with defaults (e.g. port number)
213    for a, value in flavor_configs[self.flavor]['defaults'].items():
214        if a not in self.__dict__:
215            self.__dict__[a] = value
216
217    ### Verify that everything is in order.
218    if self.flavor not in flavor_configs:
219        error(f"Cannot create a connector with the flavor '{self.flavor}'.")
220
221    _engine = flavor_configs[self.flavor].get('engine', None)
222    _username = self.__dict__.get('username', None)
223    _password = self.__dict__.get('password', None)
224    _host = self.__dict__.get('host', None)
225    _port = self.__dict__.get('port', None)
226    _database = self.__dict__.get('database', None)
227    _options = self.__dict__.get('options', {})
228    if isinstance(_options, str):
229        _options = dict(urllib.parse.parse_qsl(_options))
230    _uri = self.__dict__.get('uri', None)
231
232    ### Handle registering specific dialects (due to installing in virtual environments).
233    if self.flavor in flavor_dialects:
234        sqlalchemy.dialects.registry.register(*flavor_dialects[self.flavor])
235
236    ### self._sys_config was deepcopied and can be updated safely
237    if self.flavor in ("sqlite", "duckdb"):
238        engine_str = f"{_engine}:///{_database}" if not _uri else _uri
239        if 'create_engine' not in self._sys_config:
240            self._sys_config['create_engine'] = {}
241        if 'connect_args' not in self._sys_config['create_engine']:
242            self._sys_config['create_engine']['connect_args'] = {}
243        self._sys_config['create_engine']['connect_args'].update({"check_same_thread" : False})
244    else:
245        engine_str = (
246            _engine + "://" + (_username if _username is not None else '') +
247            ((":" + urllib.parse.quote_plus(_password)) if _password is not None else '') +
248            "@" + _host + ((":" + str(_port)) if _port is not None else '') +
249            (("/" + _database) if _database is not None else '')
250            + (("?" + urllib.parse.urlencode(_options)) if _options else '')
251        ) if not _uri else _uri
252
253        ### Sometimes the timescaledb:// flavor can slip in.
254        if _uri and self.flavor in ('timescaledb',) and self.flavor in _uri:
255            engine_str = engine_str.replace(f'{self.flavor}', 'postgresql', 1)
256
257    if debug:
258        dprint(
259            (
260                (engine_str.replace(':' + _password, ':' + ('*' * len(_password))))
261                    if _password is not None else engine_str
262            ) + '\n' + f"{self._sys_config.get('create_engine', {}).get('connect_args', {})}"
263        )
264
265    _kw_copy = copy.deepcopy(kw)
266
267    ### NOTE: Order of inheritance:
268    ###       1. Defaults
269    ###       2. System configuration
270    ###       3. Connector configuration
271    ###       4. Keyword arguments
272    _create_engine_args = flavor_configs.get(self.flavor, {}).get('create_engine', {})
273    def _apply_create_engine_args(update):
274        if 'ALL' not in flavor_configs[self.flavor].get('omit_create_engine', {}):
275            _create_engine_args.update(
276                { k: v for k, v in update.items()
277                    if 'omit_create_engine' not in flavor_configs[self.flavor]
278                        or k not in flavor_configs[self.flavor].get('omit_create_engine')
279                }
280            )
281    _apply_create_engine_args(self._sys_config.get('create_engine', {}))
282    _apply_create_engine_args(self.__dict__.get('create_engine', {}))
283    _apply_create_engine_args(_kw_copy)
284
285    try:
286        engine = sqlalchemy.create_engine(
287            engine_str,
288            ### I know this looks confusing, and maybe it's bad code,
289            ### but it's simple. It dynamically parses the config string
290            ### and splits it to separate the class name (QueuePool)
291            ### from the module name (sqlalchemy.pool).
292            poolclass    = getattr(
293                attempt_import(
294                    ".".join(self._sys_config['poolclass'].split('.')[:-1])
295                ),
296                self._sys_config['poolclass'].split('.')[-1]
297            ),
298            echo         = debug,
299            **_create_engine_args
300        )
301    except Exception as e:
302        warn(f"Failed to create connector '{self}':\n{traceback.format_exc()}", stack=False)
303        engine = None
304
305    if include_uri:
306        return engine, engine_str
307    return engine

Create a sqlalchemy engine by building the engine string.
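
In practice this method is invoked lazily by the engine property, but it may also be called directly; with include_uri=True it returns an (engine, engine_str) tuple. A sketch, assuming conn is an existing SQLConnector:

engine = conn.create_engine()
engine, engine_str = conn.create_engine(include_uri=True)
print(engine_str)  # the rendered connection string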

def read( self, query_or_table: 'Union[str, sqlalchemy.Query]', params: Union[Dict[str, Any], List[str], NoneType] = None, dtype: Optional[Dict[str, Any]] = None, coerce_float: bool = True, chunksize: Optional[int] = -1, workers: Optional[int] = None, chunk_hook: Optional[Callable[[pandas.core.frame.DataFrame], Any]] = None, as_hook_results: bool = False, chunks: Optional[int] = None, schema: Optional[str] = None, as_chunks: bool = False, as_iterator: bool = False, as_dask: bool = False, index_col: Optional[str] = None, silent: bool = False, debug: bool = False, **kw: Any) -> 'Union[pandas.DataFrame, dask.DataFrame, List[pandas.DataFrame], List[Any], None]':
 26def read(
 27    self,
 28    query_or_table: Union[str, sqlalchemy.Query],
 29    params: Union[Dict[str, Any], List[str], None] = None,
 30    dtype: Optional[Dict[str, Any]] = None,
 31    coerce_float: bool = True,
 32    chunksize: Optional[int] = -1,
 33    workers: Optional[int] = None,
 34    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None,
 35    as_hook_results: bool = False,
 36    chunks: Optional[int] = None,
 37    schema: Optional[str] = None,
 38    as_chunks: bool = False,
 39    as_iterator: bool = False,
 40    as_dask: bool = False,
 41    index_col: Optional[str] = None,
 42    silent: bool = False,
 43    debug: bool = False,
 44    **kw: Any
 45) -> Union[
 46    pandas.DataFrame,
 47    dask.DataFrame,
 48    List[pandas.DataFrame],
 49    List[Any],
 50    None,
 51]:
 52    """
 53    Read a SQL query or table into a pandas dataframe.
 54
 55    Parameters
 56    ----------
 57    query_or_table: Union[str, sqlalchemy.Query]
 58        The SQL query (sqlalchemy Query or string) or name of the table from which to select.
 59
 60    params: Optional[Dict[str, Any]], default None
 61        `List` or `Dict` of parameters to pass to `pandas.read_sql()`.
 62        See the pandas documentation for more information:
 63        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
 64
 65    dtype: Optional[Dict[str, Any]], default None
 66        A dictionary of data types to pass to `pandas.read_sql()`.
 67        See the pandas documentation for more information:
 68        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
 69
 70    chunksize: Optional[int], default -1
 71        The number of rows per chunk. `None` will read everything in one large chunk.
 72        Defaults to system configuration.
 73
 74        **NOTE:** DuckDB does not allow for chunking.
 75
 76    workers: Optional[int], default None
 77        How many threads to use when consuming the generator.
 78        Only applies if `chunk_hook` is provided.
 79
 80    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None
 81        Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
 82        See `--sync-chunks` for an example.
 83        **NOTE:** `as_iterator` MUST be False (default).
 84
 85    as_hook_results: bool, default False
 86        If `True`, return a `List` of the outputs of the hook function.
 87        Only applicable if `chunk_hook` is not None.
 88
 89        **NOTE:** `as_iterator` MUST be `False` (default).
 90
 91    chunks: Optional[int], default None
 92        Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and
 93        return into a single dataframe.
 94        For example, to limit the returned dataframe to 100,000 rows,
 95        you could specify a `chunksize` of `1000` and `chunks` of `100`.
 96
 97    schema: Optional[str], default None
 98        If just a table name is provided, optionally specify the table schema.
 99        Defaults to `SQLConnector.schema`.
100
101    as_chunks: bool, default False
102        If `True`, return a list of DataFrames.
103        Otherwise return a single DataFrame.
104
105    as_iterator: bool, default False
106        If `True`, return the pandas DataFrame iterator.
107        `chunksize` must not be `None` (falls back to 1000 if so),
108        and hooks are not called in this case.
109
110    index_col: Optional[str], default None
111        If using Dask, use this column as the index column.
112        If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
113
114    silent: bool, default False
115        If `True`, don't raise warnings in case of errors.
116        Defaults to `False`.
117
118    Returns
119    -------
120    A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators,
121    or `None` if something breaks.
122
123    """
124    if chunks is not None and chunks <= 0:
125        return []
126    from meerschaum.utils.sql import sql_item_name, truncate_item_name
127    from meerschaum.utils.dtypes import are_dtypes_equal, coerce_timezone
128    from meerschaum.utils.dtypes.sql import NUMERIC_PRECISION_FLAVORS, TIMEZONE_NAIVE_FLAVORS
129    from meerschaum.utils.packages import attempt_import, import_pandas
130    from meerschaum.utils.pool import get_pool
131    from meerschaum.utils.dataframe import chunksize_to_npartitions, get_numeric_cols
132    import warnings
133    import traceback
134    from decimal import Decimal
135    pd = import_pandas()
136    dd = None
137    is_dask = 'dask' in pd.__name__
138    pandas = attempt_import('pandas')
139    is_dask = dd is not None
140    npartitions = chunksize_to_npartitions(chunksize)
141    if is_dask:
142        chunksize = None
143    schema = schema or self.schema
144    utc_dt_cols = [
145        col
146        for col, typ in dtype.items()
147        if are_dtypes_equal(typ, 'datetime') and 'utc' in typ.lower()
148    ] if dtype else []
149
150    if dtype and utc_dt_cols and self.flavor in TIMEZONE_NAIVE_FLAVORS:
151        dtype = dtype.copy()
152        for col in utc_dt_cols:
153            dtype[col] = 'datetime64[ns]'
154
155    pool = get_pool(workers=workers)
156    sqlalchemy = attempt_import("sqlalchemy")
157    default_chunksize = self._sys_config.get('chunksize', None)
158    chunksize = chunksize if chunksize != -1 else default_chunksize
159    if chunksize is None and as_iterator:
160        if not silent and self.flavor not in _disallow_chunks_flavors:
161            warn(
162                "An iterator may only be generated if chunksize is not None.\n"
163                + "Falling back to a chunksize of 1000.", stacklevel=3,
164            )
165        chunksize = 1000
166    if chunksize is not None and self.flavor in _max_chunks_flavors:
167        if chunksize > _max_chunks_flavors[self.flavor]:
168            if chunksize != default_chunksize:
169                warn(
170                    f"The specified chunksize of {chunksize} exceeds the maximum of "
171                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
172                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
173                    stacklevel=3,
174                )
175            chunksize = _max_chunks_flavors[self.flavor]
176
177    if chunksize is not None and self.flavor in _disallow_chunks_flavors:
178        chunksize = None
179
180    if debug:
181        import time
182        start = time.perf_counter()
183        dprint(f"[{self}]\n{query_or_table}")
184        dprint(f"[{self}] Fetching with chunksize: {chunksize}")
185
186    ### This might be sqlalchemy object or the string of a table name.
187    ### We check for spaces and quotes to see if it might be a weird table.
188    if (
189        ' ' not in str(query_or_table)
190        or (
191            ' ' in str(query_or_table)
192            and str(query_or_table).startswith('"')
193            and str(query_or_table).endswith('"')
194        )
195    ):
196        truncated_table_name = truncate_item_name(str(query_or_table), self.flavor)
197        if truncated_table_name != str(query_or_table) and not silent:
198            warn(
199                f"Table '{query_or_table}' is too long for '{self.flavor}',"
200                + f" will instead read the table '{truncated_table_name}'."
201            )
202
203        query_or_table = sql_item_name(str(query_or_table), self.flavor, schema)
204        if debug:
205            dprint(f"[{self}] Reading from table {query_or_table}")
206        formatted_query = sqlalchemy.text("SELECT * FROM " + str(query_or_table))
207        str_query = f"SELECT * FROM {query_or_table}"
208    else:
209        str_query = query_or_table
210
211    formatted_query = (
212        sqlalchemy.text(str_query)
213        if not is_dask and isinstance(str_query, str)
214        else format_sql_query_for_dask(str_query)
215    )
216
217    chunk_list = []
218    chunk_hook_results = []
219    def _process_chunk(_chunk, _retry_on_failure: bool = True):
220        if self.flavor in TIMEZONE_NAIVE_FLAVORS:
221            for col in utc_dt_cols:
222                _chunk[col] = coerce_timezone(_chunk[col], strip_timezone=False)
223        if not as_hook_results:
224            chunk_list.append(_chunk)
225        if chunk_hook is None:
226            return None
227
228        result = None
229        try:
230            result = chunk_hook(
231                _chunk,
232                workers=workers,
233                chunksize=chunksize,
234                debug=debug,
235                **kw
236            )
237        except Exception:
238            result = False, traceback.format_exc()
239            from meerschaum.utils.formatting import get_console
240            if not silent:
241                get_console().print_exception()
242
243        ### If the chunk fails to process, try it again one more time.
244        if isinstance(result, tuple) and result[0] is False:
245            if _retry_on_failure:
246                return _process_chunk(_chunk, _retry_on_failure=False)
247
248        return result
249
250    try:
251        stream_results = not as_iterator and chunk_hook is not None and chunksize is not None
252        with warnings.catch_warnings():
253            warnings.filterwarnings('ignore', 'case sensitivity issues')
254
255            read_sql_query_kwargs = {
256                'params': params,
257                'dtype': dtype,
258                'coerce_float': coerce_float,
259                'index_col': index_col,
260            }
261            if is_dask:
262                if index_col is None:
263                    dd = None
264                    pd = attempt_import('pandas')
265                    read_sql_query_kwargs.update({
266                        'chunksize': chunksize,
267                    })
268            else:
269                read_sql_query_kwargs.update({
270                    'chunksize': chunksize,
271                })
272
273            if is_dask and dd is not None:
274                ddf = dd.read_sql_query(
275                    formatted_query,
276                    self.URI,
277                    **read_sql_query_kwargs
278                )
279            else:
280
281                def get_chunk_generator(connectable):
282                    chunk_generator = pd.read_sql_query(
283                        formatted_query,
284                        self.engine,
285                        **read_sql_query_kwargs
286                    )
287                    to_return = (
288                        chunk_generator
289                        if as_iterator or chunksize is None
290                        else (
291                            list(pool.imap(_process_chunk, chunk_generator))
292                            if as_hook_results
293                            else None
294                        )
295                    )
296                    return chunk_generator, to_return
297
298                if self.flavor in SKIP_READ_TRANSACTION_FLAVORS:
299                    chunk_generator, to_return = get_chunk_generator(self.engine)
300                else:
301                    with self.engine.begin() as transaction:
302                        with transaction.execution_options(stream_results=stream_results) as connection:
303                            chunk_generator, to_return = get_chunk_generator(connection)
304
305                if to_return is not None:
306                    return to_return
307
308    except Exception as e:
309        if debug:
310            dprint(f"[{self}] Failed to execute query:\n\n{query_or_table}\n\n")
311        if not silent:
312            warn(str(e), stacklevel=3)
313        from meerschaum.utils.formatting import get_console
314        if not silent:
315            get_console().print_exception()
316
317        return None
318
319    if is_dask and dd is not None:
320        ddf = ddf.reset_index()
321        return ddf
322
323    chunk_list = []
324    read_chunks = 0
325    chunk_hook_results = []
326    if chunksize is None:
327        chunk_list.append(chunk_generator)
328    elif as_iterator:
329        return chunk_generator
330    else:
331        try:
332            for chunk in chunk_generator:
333                if chunk_hook is not None:
334                    chunk_hook_results.append(
335                        chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
336                    )
337                chunk_list.append(chunk)
338                read_chunks += 1
339                if chunks is not None and read_chunks >= chunks:
340                    break
341        except Exception as e:
342            warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
343            from meerschaum.utils.formatting import get_console
344            if not silent:
345                get_console().print_exception()
346
347    read_chunks = 0
348    try:
349        for chunk in chunk_generator:
350            if chunk_hook is not None:
351                chunk_hook_results.append(
352                    chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
353                )
354            chunk_list.append(chunk)
355            read_chunks += 1
356            if chunks is not None and read_chunks >= chunks:
357                break
358    except Exception as e:
359        warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
360        from meerschaum.utils.formatting import get_console
361        if not silent:
362            get_console().print_exception()
363
364        return None
365
366    ### If no chunks returned, read without chunks
367    ### to get columns
368    if len(chunk_list) == 0:
369        with warnings.catch_warnings():
370            warnings.filterwarnings('ignore', 'case sensitivity issues')
371            _ = read_sql_query_kwargs.pop('chunksize', None)
372            with self.engine.begin() as connection:
373                chunk_list.append(
374                    pd.read_sql_query(
375                        formatted_query,
376                        connection,
377                        **read_sql_query_kwargs
378                    )
379                )
380
381    ### call the hook on any missed chunks.
382    if chunk_hook is not None and len(chunk_list) > len(chunk_hook_results):
383        for c in chunk_list[len(chunk_hook_results):]:
384            chunk_hook_results.append(
385                chunk_hook(c, chunksize=chunksize, debug=debug, **kw)
386            )
387
388    ### chunksize is not None so must iterate
389    if debug:
390        end = time.perf_counter()
391        dprint(f"Fetched {len(chunk_list)} chunks in {round(end - start, 2)} seconds.")
392
393    if as_hook_results:
394        return chunk_hook_results
395    
396    ### Skip `pd.concat()` if `as_chunks` is specified.
397    if as_chunks:
398        for c in chunk_list:
399            c.reset_index(drop=True, inplace=True)
400            for col in get_numeric_cols(c):
401                c[col] = c[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)
402        return chunk_list
403
404    df = pd.concat(chunk_list).reset_index(drop=True)
405    ### NOTE: The calls to `canonical()` are to drop leading and trailing zeroes.
406    for col in get_numeric_cols(df):
407        df[col] = df[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)
408
409    return df

Read a SQL query or table into a pandas dataframe.

Parameters
  • query_or_table (Union[str, sqlalchemy.Query]): The SQL query (sqlalchemy Query or string) or name of the table from which to select.
  • params (Optional[Dict[str, Any]], default None): List or Dict of parameters to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
  • dtype (Optional[Dict[str, Any]], default None): A dictionary of data types to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
  • chunksize (Optional[int], default -1): The number of rows per chunk. None will read everything in one large chunk. Defaults to system configuration.

    NOTE: DuckDB does not allow for chunking.

  • workers (Optional[int], default None): How many threads to use when consuming the generator. Only applies if chunk_hook is provided.
  • chunk_hook (Optional[Callable[[pandas.DataFrame], Any]], default None): Hook function to execute once per chunk, e.g. writing and reading chunks intermittently. See --sync-chunks for an example. NOTE: as_iterator MUST be False (default).
  • as_hook_results (bool, default False): If True, return a List of the outputs of the hook function. Only applicable if chunk_hook is not None.

    NOTE: as_iterator MUST be False (default).

  • chunks (Optional[int], default None): Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and return into a single dataframe. For example, to limit the returned dataframe to 100,000 rows, you could specify a chunksize of 1000 and chunks of 100.
  • schema (Optional[str], default None): If just a table name is provided, optionally specify the table schema. Defaults to SQLConnector.schema.
  • as_chunks (bool, default False): If True, return a list of DataFrames. Otherwise return a single DataFrame.
  • as_iterator (bool, default False): If True, return the pandas DataFrame iterator. chunksize must not be None (falls back to 1000 if so), and hooks are not called in this case.
  • index_col (Optional[str], default None): If using Dask, use this column as the index column. If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
  • silent (bool, default False): If True, don't raise warnings in case of errors. Defaults to False.
Returns
  • A pd.DataFrame (default case), an iterator, a list of dataframes / iterators, or None if something breaks.
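
A short usage sketch (table and column names are placeholders):

df = conn.read('my_table')  # equivalent to SELECT * FROM my_table

### Bind parameters are passed through to pandas.read_sql_query().
df = conn.read('SELECT * FROM my_table WHERE id > :id', params={'id': 100})

### Stream the results chunk-by-chunk instead of loading everything at once.
for chunk in conn.read('my_table', chunksize=1000, as_iterator=True):
    print(len(chunk))
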
def value(self, query: str, *args: Any, use_pandas: bool = False, **kw: Any) -> Any:
412def value(
413    self,
414    query: str,
415    *args: Any,
416    use_pandas: bool = False,
417    **kw: Any
418) -> Any:
419    """
420    Execute the provided query and return the first value.
421
422    Parameters
423    ----------
424    query: str
425        The SQL query to execute.
426        
427    *args: Any
428        The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec`
429        if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.
430        
431    use_pandas: bool, default False
432        If `True`, use `meerschaum.connectors.SQLConnector.read`, otherwise use
433        `meerschaum.connectors.sql.SQLConnector.exec` (default).
434        **NOTE:** This is always `True` for DuckDB.
435
436    **kw: Any
437        See `args`.
438
439    Returns
440    -------
441    Any value returned from the query.
442
443    """
444    from meerschaum.utils.packages import attempt_import
445    sqlalchemy = attempt_import('sqlalchemy')
446    if self.flavor == 'duckdb':
447        use_pandas = True
448    if use_pandas:
449        try:
450            return self.read(query, *args, **kw).iloc[0, 0]
451        except Exception:
452            return None
453
454    _close = kw.get('close', True)
455    _commit = kw.get('commit', (self.flavor != 'mssql'))
456
457    #  _close = True
458    #  _commit = True
459
460    try:
461        result, connection = self.exec(
462            query,
463            *args,
464            with_connection=True,
465            close=False,
466            commit=_commit,
467            **kw
468        )
469        first = result.first() if result is not None else None
470        _val = first[0] if first is not None else None
471    except Exception as e:
472        warn(e, stacklevel=3)
473        return None
474    if _close:
475        try:
476            connection.close()
477        except Exception as e:
478            warn("Failed to close connection with exception:\n" + str(e))
479    return _val

Execute the provided query and return the first value.

Parameters
  • query (str): The SQL query to execute.
  • *args (Any): The arguments passed to meerschaum.connectors.sql.SQLConnector.exec if use_pandas is False (default) or to meerschaum.connectors.sql.SQLConnector.read.
  • use_pandas (bool, default False): If True, use meerschaum.connectors.SQLConnector.read, otherwise use meerschaum.connectors.sql.SQLConnector.exec (default). NOTE: This is always True for DuckDB.
  • **kw (Any): See args.
Returns
  • Any value returned from the query.
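
For example (the table name is a placeholder):

row_count = conn.value('SELECT COUNT(*) FROM my_table')
print(row_count)
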
def exec( self, query: str, *args: Any, silent: bool = False, debug: bool = False, commit: Optional[bool] = None, close: Optional[bool] = None, with_connection: bool = False, _connection=None, _transaction=None, **kw: Any) -> 'Union[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.cursor.LegacyCursorResult, Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection], Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection], None]':
493def exec(
494    self,
495    query: str,
496    *args: Any,
497    silent: bool = False,
498    debug: bool = False,
499    commit: Optional[bool] = None,
500    close: Optional[bool] = None,
501    with_connection: bool = False,
502    _connection=None,
503    _transaction=None,
504    **kw: Any
505) -> Union[
506        sqlalchemy.engine.result.resultProxy,
507        sqlalchemy.engine.cursor.LegacyCursorResult,
508        Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection],
509        Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection],
510        None
511]:
512    """
513    Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.
514
515    If inserting data, please use bind variables to avoid SQL injection!
516
517    Parameters
518    ----------
519    query: Union[str, List[str], Tuple[str]]
520        The query to execute.
521        If `query` is a list or tuple, call `self.exec_queries()` instead.
522
523    args: Any
524        Arguments passed to `sqlalchemy.engine.Connection.execute`.
525
526    silent: bool, default False
527        If `True`, suppress warnings.
528
529    commit: Optional[bool], default None
530        If `True`, commit the changes after execution.
531        Causes issues with flavors like `'mssql'`.
532        This does not apply if `query` is a list of strings.
533
534    close: Optional[bool], default None
535        If `True`, close the connection after execution.
536        Causes issues with flavors like `'mssql'`.
537        This does not apply if `query` is a list of strings.
538
539    with_connection: bool, default False
540        If `True`, return a tuple including the connection object.
541        This does not apply if `query` is a list of strings.
542
543    Returns
544    -------
545    The `sqlalchemy` result object, or a tuple including the connection if `with_connection` is `True`.
546
547    """
548    if isinstance(query, (list, tuple)):
549        return self.exec_queries(
550            list(query),
551            *args,
552            silent=silent,
553            debug=debug,
554            **kw
555        )
556
557    from meerschaum.utils.packages import attempt_import
558    sqlalchemy = attempt_import("sqlalchemy")
559    if debug:
560        dprint(f"[{self}] Executing query:\n{query}")
561
562    _close = close if close is not None else (self.flavor != 'mssql')
563    _commit = commit if commit is not None else (
564        (self.flavor != 'mssql' or 'select' not in str(query).lower())
565    )
566
567    ### Select and Insert objects need to be compiled (SQLAlchemy 2.0.0+).
568    if not hasattr(query, 'compile'):
569        query = sqlalchemy.text(query)
570
571    connection = _connection if _connection is not None else self.get_connection()
572
573    try:
574        transaction = (
575            _transaction
576            if _transaction is not None else (
577                connection.begin()
578                if _commit
579                else None
580            )
581        )
582    except sqlalchemy.exc.InvalidRequestError as e:
583        if _connection is not None or _transaction is not None:
584            raise e
585        connection = self.get_connection(rebuild=True)
586        transaction = connection.begin()
587
588    if transaction is not None and not transaction.is_active and _transaction is not None:
589        connection = self.get_connection(rebuild=True)
590        transaction = connection.begin() if _commit else None
591
592    result = None
593    try:
594        result = connection.execute(query, *args, **kw)
595        if _commit:
596            transaction.commit()
597    except Exception as e:
598        if debug:
599            dprint(f"[{self}] Failed to execute query:\n\n{query}\n\n{e}")
600        if not silent:
601            warn(str(e), stacklevel=3)
602        result = None
603        if _commit:
604            transaction.rollback()
605            connection = self.get_connection(rebuild=True)
606    finally:
607        if _close:
608            connection.close()
609
610    if with_connection:
611        return result, connection
612
613    return result

Execute SQL code and return the sqlalchemy result, e.g. when calling stored procedures.

If inserting data, please use bind variables to avoid SQL injection!

Parameters
  • query (Union[str, List[str], Tuple[str]]): The query to execute. If query is a list or tuple, call self.exec_queries() instead.
  • args (Any): Arguments passed to sqlalchemy.engine.Connection.execute.
  • silent (bool, default False): If True, suppress warnings.
  • commit (Optional[bool], default None): If True, commit the changes after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
  • close (Optional[bool], default None): If True, close the connection after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
  • with_connection (bool, default False): If True, return a tuple including the connection object. This does not apply if query is a list of strings.
Returns
  • The sqlalchemy result object, or a tuple including the connection if with_connection is True.
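Example (a hedged sketch using bind variables, per the note above; the table and parameter names are hypothetical):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    ### Bind variables (:status, :id) avoid SQL injection.
    result = conn.exec(
        "UPDATE my_table SET status = :status WHERE id = :id",
        {'status': 'done', 'id': 1},
    )
    if result is not None:
        print(result.rowcount)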
def execute( self, *args: Any, **kw: Any) -> 'Optional[sqlalchemy.engine.result.ResultProxy]':
482def execute(
483    self,
484    *args: Any,
485    **kw: Any
486) -> Optional[sqlalchemy.engine.result.ResultProxy]:
487    """
488    An alias for `meerschaum.connectors.sql.SQLConnector.exec`.
489    """
490    return self.exec(*args, **kw)
def to_sql( self, df: pandas.core.frame.DataFrame, name: str = None, index: bool = False, if_exists: str = 'replace', method: str = '', chunksize: Optional[int] = -1, schema: Optional[str] = None, silent: bool = False, debug: bool = False, as_tuple: bool = False, as_dict: bool = False, _connection=None, _transaction=None, **kw) -> Union[bool, Tuple[bool, str]]:
710def to_sql(
711    self,
712    df: pandas.DataFrame,
713    name: str = None,
714    index: bool = False,
715    if_exists: str = 'replace',
716    method: str = "",
717    chunksize: Optional[int] = -1,
718    schema: Optional[str] = None,
719    silent: bool = False,
720    debug: bool = False,
721    as_tuple: bool = False,
722    as_dict: bool = False,
723    _connection=None,
724    _transaction=None,
725    **kw
726) -> Union[bool, SuccessTuple]:
727    """
728    Upload a DataFrame's contents to the SQL server.
729
730    Parameters
731    ----------
732    df: pd.DataFrame
733        The DataFrame to be uploaded.
734
735    name: str
736        The name of the table to be created.
737
738    index: bool, default False
739        If True, creates the DataFrame's indices as columns.
740
741    if_exists: str, default 'replace'
742        Drop and create the table ('replace') or append if it exists
743        ('append') or raise Exception ('fail').
744        Options are ['replace', 'append', 'fail'].
745
746    method: str, default ''
747        Either `None` or `'multi'`. See the `method` parameter of `pandas.DataFrame.to_sql`.
748
749    chunksize: Optional[int], default -1
750        How many rows to insert at a time.
751
752    schema: Optional[str], default None
753        Optionally override the schema for the table.
754        Defaults to `SQLConnector.schema`.
755
756    as_tuple: bool, default False
757        If `True`, return a (success_bool, message) tuple instead of a `bool`.
758        Defaults to `False`.
759
760    as_dict: bool, default False
761        If `True`, return a dictionary of transaction information.
762        The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`,
763        `method`, and `target`.
764
765    kw: Any
766        Additional arguments will be passed to the DataFrame's `to_sql` function.
767
768    Returns
769    -------
770    Either a `bool` or a `SuccessTuple` (depends on `as_tuple`).
771    """
772    import time
773    import json
774    import decimal
775    from decimal import Decimal, Context
776    from meerschaum.utils.warnings import error, warn
777    import warnings
778    import functools
779    if name is None:
780        error(f"Name must not be `None` to insert data into {self}.")
781
782    ### We're requiring `name` to be positional, and sometimes it's passed in from background jobs.
783    kw.pop('name', None)
784
785    schema = schema or self.schema
786
787    from meerschaum.utils.sql import (
788        sql_item_name,
789        table_exists,
790        json_flavors,
791        truncate_item_name,
792        DROP_IF_EXISTS_FLAVORS,
793    )
794    from meerschaum.utils.dataframe import get_json_cols, get_numeric_cols, get_uuid_cols
795    from meerschaum.utils.dtypes import are_dtypes_equal, quantize_decimal, coerce_timezone
796    from meerschaum.utils.dtypes.sql import (
797        NUMERIC_PRECISION_FLAVORS,
798        PD_TO_SQLALCHEMY_DTYPES_FLAVORS,
799        get_db_type_from_pd_type,
800    )
801    from meerschaum.connectors.sql._create_engine import flavor_configs
802    from meerschaum.utils.packages import attempt_import, import_pandas
803    sqlalchemy = attempt_import('sqlalchemy', debug=debug)
804    pd = import_pandas()
805    is_dask = 'dask' in df.__module__
806
807    stats = {'target': name, }
808    ### resort to defaults if None
809    if method == "":
810        if self.flavor in _bulk_flavors:
811            method = functools.partial(psql_insert_copy, schema=self.schema)
812        else:
813            ### Should resolve to 'multi' or `None`.
814            method = flavor_configs.get(self.flavor, {}).get('to_sql', {}).get('method', 'multi')
815    stats['method'] = method.__name__ if hasattr(method, '__name__') else str(method)
816
817    default_chunksize = self._sys_config.get('chunksize', None)
818    chunksize = chunksize if chunksize != -1 else default_chunksize
819    if chunksize is not None and self.flavor in _max_chunks_flavors:
820        if chunksize > _max_chunks_flavors[self.flavor]:
821            if chunksize != default_chunksize:
822                warn(
823                    f"The specified chunksize of {chunksize} exceeds the maximum of "
824                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
825                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
826                    stacklevel = 3,
827                )
828            chunksize = _max_chunks_flavors[self.flavor]
829    stats['chunksize'] = chunksize
830
831    success, msg = False, "Default to_sql message"
832    start = time.perf_counter()
833    if debug:
834        msg = f"[{self}] Inserting {len(df)} rows with chunksize: {chunksize}..."
835        print(msg, end="", flush=True)
836    stats['num_rows'] = len(df)
837
838    ### Check if the name is too long.
839    truncated_name = truncate_item_name(name, self.flavor)
840    if name != truncated_name:
841        warn(
842            f"Table '{name}' is too long for '{self.flavor}',"
843            + f" will instead create the table '{truncated_name}'."
844        )
845
846    ### filter out non-pandas args
847    import inspect
848    to_sql_params = inspect.signature(df.to_sql).parameters
849    to_sql_kw = {}
850    for k, v in kw.items():
851        if k in to_sql_params:
852            to_sql_kw[k] = v
853
854    to_sql_kw.update({
855        'name': truncated_name,
856        'schema': schema,
857        ('con' if not is_dask else 'uri'): (self.engine if not is_dask else self.URI),
858        'index': index,
859        'if_exists': if_exists,
860        'method': method,
861        'chunksize': chunksize,
862    })
863    if is_dask:
864        to_sql_kw.update({
865            'parallel': True,
866        })
867    elif _connection is not None:
868        to_sql_kw['con'] = _connection
869
870    if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
871    if self.flavor == 'oracle':
872        ### For some reason 'replace' doesn't work properly in pandas,
873        ### so try dropping first.
874        if if_exists == 'replace' and table_exists(name, self, schema=schema, debug=debug):
875            success = self.exec(
876                f"DROP TABLE {if_exists_str}" + sql_item_name(name, 'oracle', schema)
877            ) is not None
878            if not success:
879                warn(f"Unable to drop {name}")
880
881        ### Enforce NVARCHAR(2000) as text instead of CLOB.
882        dtype = to_sql_kw.get('dtype', {})
883        for col, typ in df.dtypes.items():
884            if are_dtypes_equal(str(typ), 'object'):
885                dtype[col] = sqlalchemy.types.NVARCHAR(2000)
886            elif are_dtypes_equal(str(typ), 'int'):
887                dtype[col] = sqlalchemy.types.INTEGER
888        to_sql_kw['dtype'] = dtype
889    elif self.flavor == 'duckdb':
890        dtype = to_sql_kw.get('dtype', {})
891        dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')]
892        for col in dt_cols:
893            df[col] = coerce_timezone(df[col], strip_utc=False)
894    elif self.flavor == 'mssql':
895        dtype = to_sql_kw.get('dtype', {})
896        dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')]
897        new_dtype = {}
898        for col in dt_cols:
899            if col in dtype:
900                continue
901            dt_typ = get_db_type_from_pd_type(str(df.dtypes[col]), self.flavor, as_sqlalchemy=True)
902            ### `col` is guaranteed absent from `dtype` here (see the `continue` above).
903            new_dtype[col] = dt_typ
904
905        dtype.update(new_dtype)
906        to_sql_kw['dtype'] = dtype
907
908    ### Check for JSON columns.
909    if self.flavor not in json_flavors:
910        json_cols = get_json_cols(df)
911        if json_cols:
912            for col in json_cols:
913                df[col] = df[col].apply(
914                    (
915                        lambda x: json.dumps(x, default=str, sort_keys=True)
916                        if not isinstance(x, Hashable)
917                        else x
918                    )
919                )
920
921    ### Check for numeric columns.
922    numeric_scale, numeric_precision = NUMERIC_PRECISION_FLAVORS.get(self.flavor, (None, None))
923    if numeric_precision is not None and numeric_scale is not None:
924        numeric_cols = get_numeric_cols(df)
925        for col in numeric_cols:
926            df[col] = df[col].apply(
927                lambda x: (
928                    quantize_decimal(x, numeric_scale, numeric_precision)
929                    if isinstance(x, Decimal)
930                    else x
931                )
932            )
933
934    if PD_TO_SQLALCHEMY_DTYPES_FLAVORS['uuid'].get(self.flavor, None) != 'Uuid':
935        uuid_cols = get_uuid_cols(df)
936        for col in uuid_cols:
937            df[col] = df[col].astype(str)
938
939    try:
940        with warnings.catch_warnings():
941            warnings.filterwarnings('ignore')
942            df.to_sql(**to_sql_kw)
943        success = True
944    except Exception as e:
945        if not silent:
946            warn(str(e))
947        success, msg = False, str(e)
948
949    end = time.perf_counter()
950    if success:
951        msg = f"It took {round(end - start, 2)} seconds to sync {len(df)} rows to {name}."
952    stats['start'] = start
953    stats['end'] = end
954    stats['duration'] = end - start
955
956    if debug:
957        print(f" done.", flush=True)
958        dprint(msg)
959
960    stats['success'] = success
961    stats['msg'] = msg
962    if as_tuple:
963        return success, msg
964    if as_dict:
965        return stats
966    return success

Upload a DataFrame's contents to the SQL server.

Parameters
  • df (pd.DataFrame): The DataFrame to be uploaded.
  • name (str): The name of the table to be created.
  • index (bool, default False): If True, creates the DataFrame's indices as columns.
  • if_exists (str, default 'replace'): Drop and create the table ('replace') or append if it exists ('append') or raise Exception ('fail'). Options are ['replace', 'append', 'fail'].
  • method (str, default ''): Either None or 'multi'. See the method parameter of pandas.DataFrame.to_sql.
  • chunksize (Optional[int], default -1): How many rows to insert at a time.
  • schema (Optional[str], default None): Optionally override the schema for the table. Defaults to SQLConnector.schema.
  • as_tuple (bool, default False): If True, return a (success_bool, message) tuple instead of a bool. Defaults to False.
  • as_dict (bool, default False): If True, return a dictionary of transaction information. The keys are success, msg, start, end, duration, num_rows, chunksize, method, and target.
  • kw (Any): Additional arguments will be passed to the DataFrame's to_sql function.
Returns
  • Either a bool or a SuccessTuple (depends on as_tuple).
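Example (a minimal sketch, assuming a configured sql:main connector; the table name is hypothetical):

    import pandas as pd
    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    df = pd.DataFrame({'id': [1, 2], 'color': ['red', 'blue']})
    ### as_tuple=True returns (success, message) instead of a bool.
    success, msg = conn.to_sql(df, name='my_table', if_exists='replace', as_tuple=True)
    print(success, msg)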
def exec_queries( self, queries: "List[Union[str, Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]]]", break_on_error: bool = False, rollback: bool = True, silent: bool = False, debug: bool = False) -> 'List[sqlalchemy.engine.cursor.LegacyCursorResult]':
616def exec_queries(
617    self,
618    queries: List[
619        Union[
620            str,
621            Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]
622        ]
623    ],
624    break_on_error: bool = False,
625    rollback: bool = True,
626    silent: bool = False,
627    debug: bool = False,
628) -> List[sqlalchemy.engine.cursor.LegacyCursorResult]:
629    """
630    Execute a list of queries in a single transaction.
631
632    Parameters
633    ----------
634    queries: List[
635        Union[
636            str,
637            Tuple[str, Callable[[], List[str]]]
638        ]
639    ]
640        The queries in the transaction to be executed.
641        If a query is a tuple, the second item of the tuple
642        will be considered a callable hook that returns a list of queries to be executed
643        before the next item in the list.
644
645    break_on_error: bool, default False
646        If `True`, stop executing when a query fails.
647
648    rollback: bool, default True
649        If `break_on_error` is `True`, rollback the transaction if a query fails.
650
651    silent: bool, default False
652        If `True`, suppress warnings.
653
654    Returns
655    -------
656    A list of SQLAlchemy results.
657    """
658    from meerschaum.utils.warnings import warn
659    from meerschaum.utils.debug import dprint
660    from meerschaum.utils.packages import attempt_import
661    sqlalchemy, sqlalchemy_orm = attempt_import('sqlalchemy', 'sqlalchemy.orm')
662    session = sqlalchemy_orm.Session(self.engine)
663
664    result = None
665    results = []
666    with session.begin():
667        for query in queries:
668            hook = None
669            result = None
670
671            if isinstance(query, tuple):
672                query, hook = query
673            if isinstance(query, str):
674                query = sqlalchemy.text(query)
675
676            if debug:
677                dprint(f"[{self}]\n" + str(query))
678
679            try:
680                result = session.execute(query)
681                session.flush()
682            except Exception as e:
683                msg = (f"Encountered error while executing:\n{e}")
684                if not silent:
685                    warn(msg)
686                elif debug:
687                    dprint(f"[{self}]\n" + str(msg))
688                result = None
689            if result is None and break_on_error:
690                if rollback:
691                    session.rollback()
692                break
693            elif result is not None and hook is not None:
694                hook_queries = hook(session)
695                if hook_queries:
696                    hook_results = self.exec_queries(
697                        hook_queries,
698                        break_on_error=break_on_error,
699                        rollback=rollback,
700                        silent=silent,
701                        debug=debug,
702                    )
703                    result = (result, hook_results)
704
705            results.append(result)
706
707    return results

Execute a list of queries in a single transaction.

Parameters
  • queries (List[Union[str, Tuple[str, Callable[[], List[str]]]]]): The queries in the transaction to be executed. If a query is a tuple, the second item of the tuple will be considered a callable hook that returns a list of queries to be executed before the next item in the list.
  • break_on_error (bool, default False): If True, stop executing when a query fails.
  • rollback (bool, default True): If break_on_error is True, rollback the transaction if a query fails.
  • silent (bool, default False): If True, suppress warnings.
Returns
  • A list of SQLAlchemy results.
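Example (a sketch of running several statements in one transaction; the DDL below is illustrative only):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    queries = [
        "CREATE TABLE tmp_demo (id INT)",
        "INSERT INTO tmp_demo (id) VALUES (1)",
    ]
    ### A result of None means the corresponding query failed.
    results = conn.exec_queries(queries, break_on_error=True)
    print(results)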
def get_connection(self, rebuild: bool = False) -> "'sqlalchemy.engine.base.Connection'":
1065def get_connection(self, rebuild: bool = False) -> 'sqlalchemy.engine.base.Connection':
1066    """
1067    Return the current alive connection.
1068
1069    Parameters
1070    ----------
1071    rebuild: bool, default False
1072        If `True`, close the previous connection and open a new one.
1073
1074    Returns
1075    -------
1076    A `sqlalchemy.engine.base.Connection` object.
1077    """
1078    import threading
1079    if '_thread_connections' not in self.__dict__:
1080        self.__dict__['_thread_connections'] = {}
1081
1082    self._cleanup_connections()
1083
1084    thread_id = threading.get_ident()
1085
1086    thread_connections = self.__dict__.get('_thread_connections', {})
1087    connection = thread_connections.get(thread_id, None)
1088
1089    if rebuild and connection is not None:
1090        try:
1091            connection.close()
1092        except Exception:
1093            pass
1094
1095        _ = thread_connections.pop(thread_id, None)
1096        connection = None
1097
1098    if connection is None or connection.closed:
1099        connection = self.engine.connect()
1100        thread_connections[thread_id] = connection
1101
1102    return connection

Return the current alive connection.

Parameters
  • rebuild (bool, default False): If True, close the previous connection and open a new one.
Returns
  • A sqlalchemy.engine.base.Connection object.
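Example (a sketch of borrowing the thread-local connection directly; exec wraps plain strings in sqlalchemy.text(), so the same is done here):

    import meerschaum as mrsm
    import sqlalchemy

    conn = mrsm.get_connector('sql:main')
    connection = conn.get_connection()
    result = connection.execute(sqlalchemy.text("SELECT 1"))
    print(result.fetchall())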
def test_connection(self, **kw: Any) -> Optional[bool]:
642def test_connection(
643    self,
644    **kw: Any
645) -> Union[bool, None]:
646    """
647    Test if a successful connection to the database may be made.
648
649    Parameters
650    ----------
651    **kw:
652        The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.
653
654    Returns
655    -------
656    `True` if a connection is made, otherwise `False` or `None` in case of failure.
657
658    """
659    import warnings
660    from meerschaum.connectors.poll import retry_connect
661    _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self}
662    _default_kw.update(kw)
663    with warnings.catch_warnings():
664        warnings.filterwarnings('ignore', 'Could not')
665        try:
666            return retry_connect(**_default_kw)
667        except Exception:
668            return False

Test if a successful connection to the database may be made.

Parameters
  • **kw: The keyword arguments are passed to meerschaum.connectors.poll.retry_connect.
Returns
  • True if a connection is made, otherwise False or None in case of failure.
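Example (a short sketch; the keyword arguments below are among those forwarded to meerschaum.connectors.poll.retry_connect):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    if not conn.test_connection(max_retries=3, retry_wait=1):
        print(f"Unable to connect to {conn}.")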
def fetch( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, str, NoneType] = None, check_existing: bool = True, chunk_hook: "Optional[Callable[['pd.DataFrame'], Any]]" = None, chunksize: Optional[int] = -1, workers: Optional[int] = None, debug: bool = False, **kw: Any) -> "Union['pd.DataFrame', List[Any], None]":
 17def fetch(
 18    self,
 19    pipe: mrsm.Pipe,
 20    begin: Union[datetime, int, str, None] = '',
 21    end: Union[datetime, int, str, None] = None,
 22    check_existing: bool = True,
 23    chunk_hook: Optional[Callable[['pd.DataFrame'], Any]] = None,
 24    chunksize: Optional[int] = -1,
 25    workers: Optional[int] = None,
 26    debug: bool = False,
 27    **kw: Any
 28) -> Union['pd.DataFrame', List[Any], None]:
 29    """Execute the SQL definition and return a Pandas DataFrame.
 30
 31    Parameters
 32    ----------
 33    pipe: mrsm.Pipe
 34        The pipe object which contains the `fetch` metadata.
 35
 36        - pipe.columns['datetime']: str
 37            - Name of the datetime column for the remote table.
 38        - pipe.parameters['fetch']: Dict[str, Any]
 39            - Parameters necessary to execute a query.
 40        - pipe.parameters['fetch']['definition']: str
 41            - Raw SQL query to execute to generate the pandas DataFrame.
 42        - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
 43            - How many minutes before `begin` to search for data (*optional*).
 44
 45    begin: Union[datetime, int, str, None], default ''
 46        Most recent datetime to search for data.
 47        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
 48
 49    end: Union[datetime, int, str, None], default None
 50        The latest datetime to search for data.
 51        If `end` is `None`, do not bound the search.
 52
 53    check_existing: bool, default True
 54        If `False`, use a backtrack interval of 0 minutes.
 55
 56    chunk_hook: Callable[[pd.DataFrame], Any], default None
 57        A function to pass to `SQLConnector.read()` that accepts a Pandas DataFrame.
 58
 59    chunksize: Optional[int], default -1
 60        How many rows to load into memory at once (when `chunk_hook` is provided).
 61        Otherwise the entire result set is loaded into memory.
 62
 63    workers: Optional[int], default None
 64        How many threads to use when consuming the generator (when `chunk_hook` is provided).
 65        Defaults to the number of cores.
 66
 67    debug: bool, default False
 68        Verbosity toggle.
 69
 70    Returns
 71    -------
 72    A pandas DataFrame or `None`.
 73    If `chunk_hook` is not None, return a list of the hook function's results.
 74    """
 75    meta_def = self.get_pipe_metadef(
 76        pipe,
 77        begin=begin,
 78        end=end,
 79        check_existing=check_existing,
 80        debug=debug,
 81        **kw
 82    )
 83    as_hook_results = chunk_hook is not None
 84    chunks = self.read(
 85        meta_def,
 86        chunk_hook=chunk_hook,
 87        as_hook_results=as_hook_results,
 88        chunksize=chunksize,
 89        workers=workers,
 90        debug=debug,
 91    )
 92    ### if sqlite, parse for datetimes
 93    if not as_hook_results and self.flavor == 'sqlite':
 94        from meerschaum.utils.dataframe import parse_df_datetimes
 95        from meerschaum.utils.dtypes import are_dtypes_equal
 96        ignore_cols = [
 97            col
 98            for col, dtype in pipe.dtypes.items()
 99            if not are_dtypes_equal(str(dtype), 'datetime')
100        ]
101        return (
102            parse_df_datetimes(
103                chunk,
104                ignore_cols=ignore_cols,
105                strip_timezone=(pipe.tzinfo is None),
106                debug=debug,
107            )
108            for chunk in chunks
109        )
110    return chunks

Execute the SQL definition and return a Pandas DataFrame.

Parameters
  • pipe (mrsm.Pipe): The pipe object which contains the fetch metadata.

    • pipe.columns['datetime']: str
      • Name of the datetime column for the remote table.
    • pipe.parameters['fetch']: Dict[str, Any]
      • Parameters necessary to execute a query.
    • pipe.parameters['fetch']['definition']: str
      • Raw SQL query to execute to generate the pandas DataFrame.
    • pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
      • How many minutes before begin to search for data (optional).
  • begin (Union[datetime, int, str, None], default ''): Most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
  • end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If end is None, do not bound the search.
  • check_existing (bool, default True): If False, use a backtrack interval of 0 minutes.
  • chunk_hook (Callable[[pd.DataFrame], Any], default None): A function to pass to SQLConnector.read() that accepts a Pandas DataFrame.
  • chunksize (Optional[int], default -1): How many rows to load into memory at once (when chunk_hook is provided). Otherwise the entire result set is loaded into memory.
  • workers (Optional[int], default None): How many threads to use when consuming the generator (when chunk_hook is provided). Defaults to the number of cores.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pandas DataFrame or None.
  • If chunk_hook is not None, return a list of the hook function's results.
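Example (a hedged sketch of fetching via a pipe whose fetch definition is a raw SQL query; the keys, table, and column names are hypothetical):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe(
        'sql:main', 'demo',
        columns={'datetime': 'dt'},
        parameters={'fetch': {'definition': 'SELECT dt, val FROM my_table'}},
    )
    df = conn.fetch(pipe, begin='2024-01-01')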
def get_pipe_metadef( self, pipe: meerschaum.Pipe, params: Optional[Dict[str, Any]] = None, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, str, NoneType] = None, check_existing: bool = True, debug: bool = False, **kw: Any) -> Optional[str]:
113def get_pipe_metadef(
114    self,
115    pipe: mrsm.Pipe,
116    params: Optional[Dict[str, Any]] = None,
117    begin: Union[datetime, int, str, None] = '',
118    end: Union[datetime, int, str, None] = None,
119    check_existing: bool = True,
120    debug: bool = False,
121    **kw: Any
122) -> Union[str, None]:
123    """
124    Return a pipe's meta definition fetch query.
125
126    params: Optional[Dict[str, Any]], default None
127        Optional params dictionary to build the `WHERE` clause.
128        See `meerschaum.utils.sql.build_where`.
129
130    begin: Union[datetime, int, str, None], default ''
131        Most recent datetime to search for data.
132        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
133
134    end: Union[datetime, int, str, None], default None
135        The latest datetime to search for data.
136        If `end` is `None`, do not bound the search.
137
138    check_existing: bool, default True
139        If `True`, apply the backtrack interval.
140
141    debug: bool, default False
142        Verbosity toggle.
143
144    Returns
145    -------
146    A pipe's meta definition fetch query string.
147    """
148    from meerschaum.utils.debug import dprint
149    from meerschaum.utils.warnings import warn, error
150    from meerschaum.utils.sql import sql_item_name, dateadd_str, build_where
151    from meerschaum.utils.misc import is_int
152    from meerschaum.config import get_config
153
154    definition = get_pipe_query(pipe)
155
156    if not pipe.columns.get('datetime', None):
157        _dt = pipe.guess_datetime()
158        dt_name = sql_item_name(_dt, self.flavor, None) if _dt else None
159        is_guess = True
160    else:
161        _dt = pipe.get_columns('datetime')
162        dt_name = sql_item_name(_dt, self.flavor, None)
163        is_guess = False
164
165    if begin not in (None, '') or end is not None:
166        if is_guess:
167            if _dt is None:
168                warn(
169                    f"Unable to determine a datetime column for {pipe}."
170                    + "\n    Ignoring begin and end...",
171                    stack = False,
172                )
173                begin, end = '', None
174            else:
175                warn(
176                    f"A datetime wasn't specified for {pipe}.\n"
177                    + f"    Using column \"{_dt}\" for datetime bounds...",
178                    stack = False
179                )
180
181    apply_backtrack = begin == '' and check_existing
182    backtrack_interval = pipe.get_backtrack_interval(check_existing=check_existing, debug=debug)
183    btm = (
184        int(backtrack_interval.total_seconds() / 60)
185        if isinstance(backtrack_interval, timedelta)
186        else backtrack_interval
187    )
188    begin = (
189        pipe.get_sync_time(debug=debug)
190        if begin == ''
191        else begin
192    )
193
194    if begin and end and begin >= end:
195        begin = None
196
197    if dt_name:
198        begin_da = (
199            dateadd_str(
200                flavor=self.flavor,
201                datepart='minute',
202                number=((-1 * btm) if apply_backtrack else 0),
203                begin=begin,
204            )
205            if begin
206            else None
207        )
208        end_da = (
209            dateadd_str(
210                flavor=self.flavor,
211                datepart='minute',
212                number=0,
213                begin=end,
214            )
215            if end
216            else None
217        )
218
219    meta_def = (
220        _simple_fetch_query(pipe, self.flavor) if (
221            (not (pipe.columns or {}).get('id', None))
222            or (not get_config('system', 'experimental', 'join_fetch'))
223        ) else _join_fetch_query(pipe, self.flavor, debug=debug, **kw)
224    )
225
226    has_where = 'where' in meta_def.lower()[meta_def.lower().rfind('definition'):]
227    if dt_name and (begin_da or end_da):
228        definition_dt_name = (
229            dateadd_str(self.flavor, 'minute', 0, f"definition.{dt_name}")
230            if not is_int((begin_da or end_da))
231            else f"definition.{dt_name}"
232        )
233        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
234        has_where = True
235        if begin_da:
236            meta_def += f"{definition_dt_name} >= {begin_da}"
237        if begin_da and end_da:
238            meta_def += " AND "
239        if end_da:
240            meta_def += f"{definition_dt_name} < {end_da}"
241
242    if params is not None:
243        params_where = build_where(params, self, with_where=False)
244        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
245        has_where = True
246        meta_def += params_where
247
248    return meta_def

Return a pipe's meta definition fetch query.

Parameters
  • params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause. See meerschaum.utils.sql.build_where.
  • begin (Union[datetime, int, str, None], default ''): Most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
  • end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If end is None, do not bound the search.
  • check_existing (bool, default True): If True, apply the backtrack interval.
  • debug (bool, default False): Verbosity toggle.

Returns
  • A pipe's meta definition fetch query string.
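Example (continuing the hypothetical pipe sketched under fetch above, the bounded query may be inspected directly):

    meta_def = conn.get_pipe_metadef(pipe, begin='2024-01-01', end='2024-02-01')
    ### The definition wrapped in datetime bounds.
    print(meta_def)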
def cli(self, debug: bool = False) -> Tuple[bool, str]:
35def cli(
36        self,
37        debug: bool = False,
38    ) -> SuccessTuple:
39    """
40    Launch a subprocess for an interactive CLI.
41    """
42    from meerschaum.utils.venv import venv_exec
43    env = copy.deepcopy(dict(os.environ))
44    env[f'MRSM_SQL_{self.label.upper()}'] = json.dumps(self.meta)
45    cli_code = (
46        "import sys\n"
47        "import meerschaum as mrsm\n"
48        f"conn = mrsm.get_connector('sql:{self.label}')\n"
49        "success, msg = conn._cli_exit()\n"
50        "mrsm.pprint((success, msg))\n"
51        "if not success:\n"
52        "    raise Exception(msg)"
53    )
54    try:
55        _ = venv_exec(cli_code, venv=None, debug=debug, capture_output=False)
56    except Exception as e:
57        return False, f"[{self}] Failed to start CLI:\n{e}"
58    return True, "Success"

Launch a subprocess for an interactive CLI.

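Example (a one-line sketch, assuming conn is an existing SQLConnector):

    success, msg = conn.cli()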
def fetch_pipes_keys( self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> Optional[List[Tuple[str, str, Optional[str]]]]:
144def fetch_pipes_keys(
145    self,
146    connector_keys: Optional[List[str]] = None,
147    metric_keys: Optional[List[str]] = None,
148    location_keys: Optional[List[str]] = None,
149    tags: Optional[List[str]] = None,
150    params: Optional[Dict[str, Any]] = None,
151    debug: bool = False
152) -> Optional[List[Tuple[str, str, Optional[str]]]]:
153    """
154    Return a list of tuples corresponding to the parameters provided.
155
156    Parameters
157    ----------
158    connector_keys: Optional[List[str]], default None
159        List of connector_keys to search by.
160
161    metric_keys: Optional[List[str]], default None
162        List of metric_keys to search by.
163
164    location_keys: Optional[List[str]], default None
165        List of location_keys to search by.
166
167    params: Optional[Dict[str, Any]], default None
168        Dictionary of additional parameters to search by.
169        E.g. `--params pipe_id:1`
170
171    debug: bool, default False
172        Verbosity toggle.
173    """
174    from meerschaum.utils.debug import dprint
175    from meerschaum.utils.packages import attempt_import
176    from meerschaum.utils.misc import separate_negation_values, flatten_list
177    from meerschaum.utils.sql import OMIT_NULLSFIRST_FLAVORS, table_exists
178    from meerschaum.config.static import STATIC_CONFIG
179    import json
180    from copy import deepcopy
181    sqlalchemy, sqlalchemy_sql_functions = attempt_import('sqlalchemy', 'sqlalchemy.sql.functions')
182    coalesce = sqlalchemy_sql_functions.coalesce
183
184    if connector_keys is None:
185        connector_keys = []
186    if metric_keys is None:
187        metric_keys = []
188    if location_keys is None:
189        location_keys = []
190    else:
191        location_keys = [
192            (
193                lk
194                if lk not in ('[None]', 'None', 'null')
195                else 'None'
196            )
197            for lk in location_keys
198        ]
199    if tags is None:
200        tags = []
201
202    if params is None:
203        params = {}
204
205    ### Add three primary keys to params dictionary
206    ###   (separated for convenience of arguments).
207    cols = {
208        'connector_keys': [str(ck) for ck in connector_keys],
209        'metric_key': [str(mk) for mk in metric_keys],
210        'location_key': [str(lk) for lk in location_keys],
211    }
212
213    ### Make deep copy so we don't mutate this somewhere else.
214    parameters = deepcopy(params)
215    for col, vals in cols.items():
216        if vals not in [[], ['*']]:
217            parameters[col] = vals
218
219    if not table_exists('mrsm_pipes', self, schema=self.instance_schema, debug=debug):
220        return []
221
222    from meerschaum.connectors.sql.tables import get_tables
223    pipes_tbl = get_tables(mrsm_instance=self, create=False, debug=debug)['pipes']
224
225    _params = {}
226    for k, v in parameters.items():
227        _v = json.dumps(v) if isinstance(v, dict) else v
228        _params[k] = _v
229
230    negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
231    ### Parse regular params.
232    ### If a param begins with '_', negate it instead.
233    _where = [
234        (
235            (coalesce(pipes_tbl.c[key], 'None') == val)
236            if not str(val).startswith(negation_prefix)
237            else (coalesce(pipes_tbl.c[key], 'None') != str(val)[len(negation_prefix):])
238        ) for key, val in _params.items()
239        if not isinstance(val, (list, tuple)) and key in pipes_tbl.c
240    ]
241    select_cols = (
242        [
243            pipes_tbl.c.connector_keys,
244            pipes_tbl.c.metric_key,
245            pipes_tbl.c.location_key,
246        ]
247    )
248
249    q = sqlalchemy.select(*select_cols).where(sqlalchemy.and_(True, *_where))
250    for c, vals in cols.items():
251        if not isinstance(vals, (list, tuple)) or not vals or c not in pipes_tbl.c:
252            continue
253        _in_vals, _ex_vals = separate_negation_values(vals)
254        q = q.where(coalesce(pipes_tbl.c[c], 'None').in_(_in_vals)) if _in_vals else q
255        q = q.where(coalesce(pipes_tbl.c[c], 'None').not_in(_ex_vals)) if _ex_vals else q
256
257    ### Finally, parse tags.
258    tag_groups = [tag.split(',') for tag in tags]
259    in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups]
260
261    ors, nands = [], []
262    for _in_tags, _ex_tags in in_ex_tag_groups:
263        sub_ands = []
264        for nt in _in_tags:
265            sub_ands.append(
266                sqlalchemy.cast(
267                    pipes_tbl.c['parameters'],
268                    sqlalchemy.String,
269                ).like(f'%"tags":%"{nt}"%')
270            )
271        if sub_ands:
272            ors.append(sqlalchemy.and_(*sub_ands))
273
274        for xt in _ex_tags:
275            nands.append(
276                sqlalchemy.cast(
277                    pipes_tbl.c['parameters'],
278                    sqlalchemy.String,
279                ).not_like(f'%"tags":%"{xt}"%')
280            )
281
282    q = q.where(sqlalchemy.and_(*nands)) if nands else q
283    q = q.where(sqlalchemy.or_(*ors)) if ors else q
284    loc_asc = sqlalchemy.asc(pipes_tbl.c['location_key'])
285    if self.flavor not in OMIT_NULLSFIRST_FLAVORS:
286        loc_asc = sqlalchemy.nullsfirst(loc_asc)
287    q = q.order_by(
288        sqlalchemy.asc(pipes_tbl.c['connector_keys']),
289        sqlalchemy.asc(pipes_tbl.c['metric_key']),
290        loc_asc,
291    )
292
293    ### execute the query and return a list of tuples
294    if debug:
295        dprint(q.compile(compile_kwargs={'literal_binds': True}))
296    try:
297        rows = (
298            self.execute(q).fetchall()
299            if self.flavor != 'duckdb'
300            else [
301                (row['connector_keys'], row['metric_key'], row['location_key'])
302                for row in self.read(q).to_dict(orient='records')
303            ]
304        )
305    except Exception as e:
306        error(str(e))
307
308    return [(row[0], row[1], row[2]) for row in rows]

Return a list of tuples corresponding to the parameters provided.

Parameters
  • connector_keys (Optional[List[str]], default None): List of connector_keys to search by.
  • metric_keys (Optional[List[str]], default None): List of metric_keys to search by.
  • location_keys (Optional[List[str]], default None): List of location_keys to search by.
  • params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. E.g. --params pipe_id:1
  • debug (bool, default False): Verbosity toggle.
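Example (a sketch of filtering registered pipes on this instance; the connector keys and tag below are hypothetical):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    keys = conn.fetch_pipes_keys(connector_keys=['plugin:noaa'], tags=['production'])
    for connector_keys, metric_key, location_key in keys:
        print(connector_keys, metric_key, location_key)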
def create_indices( self, pipe: meerschaum.Pipe, indices: Optional[List[str]] = None, debug: bool = False) -> bool:
311def create_indices(
312    self,
313    pipe: mrsm.Pipe,
314    indices: Optional[List[str]] = None,
315    debug: bool = False
316) -> bool:
317    """
318    Create a pipe's indices.
319    """
320    from meerschaum.utils.sql import sql_item_name, update_queries
321    from meerschaum.utils.debug import dprint
322    if debug:
323        dprint(f"Creating indices for {pipe}...")
324    if not pipe.indices:
325        warn(f"{pipe} has no index columns; skipping index creation.", stack=False)
326        return True
327
328    _ = pipe.__dict__.pop('_columns_indices', None)
329    ix_queries = {
330        ix: queries
331        for ix, queries in self.get_create_index_queries(pipe, debug=debug).items()
332        if indices is None or ix in indices
333    }
334    success = True
335    for ix, queries in ix_queries.items():
336        ix_success = all(self.exec_queries(queries, debug=debug, silent=False))
337        success = success and ix_success
338        if not ix_success:
339            warn(f"Failed to create index on column: {ix}")
340
341    return success

Create a pipe's indices.

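Example (a sketch, assuming a synced pipe registered on this instance; keys and columns are hypothetical):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe('sql:main', 'demo', instance=conn, columns={'datetime': 'dt'})
    if pipe.exists():
        ### Builds the queries via get_create_index_queries() and executes them.
        conn.create_indices(pipe)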
def drop_indices( self, pipe: meerschaum.Pipe, indices: Optional[List[str]] = None, debug: bool = False) -> bool:
344def drop_indices(
345    self,
346    pipe: mrsm.Pipe,
347    indices: Optional[List[str]] = None,
348    debug: bool = False
349) -> bool:
350    """
351    Drop a pipe's indices.
352    """
353    from meerschaum.utils.debug import dprint
354    if debug:
355        dprint(f"Dropping indices for {pipe}...")
356    if not pipe.columns:
357        warn(f"Unable to drop indices for {pipe} without columns.", stack=False)
358        return False
359    ix_queries = {
360        ix: queries
361        for ix, queries in self.get_drop_index_queries(pipe, debug=debug).items()
362        if indices is None or ix in indices
363    }
364    success = True
365    for ix, queries in ix_queries.items():
366        ix_success = all(self.exec_queries(queries, debug=debug, silent=True))
367        if not ix_success:
368            success = False
369            if debug:
370                dprint(f"Failed to drop index on column: {ix}")
371    return success

Drop a pipe's indices.

def get_create_index_queries( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, List[str]]:
374def get_create_index_queries(
375    self,
376    pipe: mrsm.Pipe,
377    debug: bool = False,
378) -> Dict[str, List[str]]:
379    """
380    Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query.
381
382    Parameters
383    ----------
384    pipe: mrsm.Pipe
385        The pipe to which the queries will correspond.
386
387    Returns
388    -------
389    A dictionary of index names mapping to lists of queries.
390    """
391    ### NOTE: Due to recent breaking changes in DuckDB, indices don't behave properly.
392    if self.flavor == 'duckdb':
393        return {}
394    from meerschaum.utils.sql import (
395        sql_item_name,
396        get_distinct_col_count,
397        update_queries,
398        get_null_replacement,
399        get_create_table_queries,
400        get_rename_table_queries,
401        COALESCE_UNIQUE_INDEX_FLAVORS,
402    )
403    from meerschaum.utils.dtypes.sql import (
404        get_db_type_from_pd_type,
405        get_pd_type_from_db_type,
406        AUTO_INCREMENT_COLUMN_FLAVORS,
407    )
408    from meerschaum.config import get_config
409    index_queries = {}
410
411    upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in update_queries
412    static = pipe.parameters.get('static', False)
413    index_names = pipe.get_indices()
414    indices = pipe.indices
415    existing_cols_types = pipe.get_columns_types(debug=debug)
416    existing_cols_pd_types = {
417        col: get_pd_type_from_db_type(typ)
418        for col, typ in existing_cols_types.items()
419    }
420    existing_cols_indices = self.get_pipe_columns_indices(pipe, debug=debug)
421    existing_ix_names = set()
422    existing_primary_keys = []
423    for col, col_indices in existing_cols_indices.items():
424        for col_ix_doc in col_indices:
425            existing_ix_names.add(col_ix_doc.get('name', None))
426            if col_ix_doc.get('type', None) == 'PRIMARY KEY':
427                existing_primary_keys.append(col)
428
429    _datetime = pipe.get_columns('datetime', error=False)
430    _datetime_name = (
431        sql_item_name(_datetime, self.flavor, None)
432        if _datetime is not None else None
433    )
434    _datetime_index_name = (
435        sql_item_name(index_names['datetime'], flavor=self.flavor, schema=None)
436        if index_names.get('datetime', None)
437        else None
438    )
439    _id = pipe.get_columns('id', error=False)
440    _id_name = (
441        sql_item_name(_id, self.flavor, None)
442        if _id is not None
443        else None
444    )
445    primary_key = pipe.columns.get('primary', None)
446    primary_key_name = (
447        sql_item_name(primary_key, flavor=self.flavor, schema=None)
448        if primary_key
449        else None
450    )
451    autoincrement = (
452        pipe.parameters.get('autoincrement', False)
453        or (
454            primary_key is not None
455            and primary_key not in existing_cols_pd_types
456        )
457    )
458    primary_key_db_type = (
459        get_db_type_from_pd_type(pipe.dtypes.get(primary_key, 'int'), self.flavor)
460        if primary_key
461        else None
462    )
463    primary_key_constraint_name = (
464        sql_item_name(f'pk_{pipe.target}', self.flavor, None)
465        if primary_key is not None
466        else None
467    )
468
469    _id_index_name = (
470        sql_item_name(index_names['id'], self.flavor, None)
471        if index_names.get('id', None)
472        else None
473    )
474    _pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
475    _create_space_partition = get_config('system', 'experimental', 'space')
476
477    ### create datetime index
478    if _datetime is not None:
479        if self.flavor == 'timescaledb' and pipe.parameters.get('hypertable', True):
480            _id_count = (
481                get_distinct_col_count(_id, f"SELECT {_id_name} FROM {_pipe_name}", self)
482                if (_id is not None and _create_space_partition) else None
483            )
484
485            chunk_interval = pipe.get_chunk_interval(debug=debug)
486            chunk_interval_minutes = (
487                chunk_interval
488                if isinstance(chunk_interval, int)
489                else int(chunk_interval.total_seconds() / 60)
490            )
491            chunk_time_interval = (
492                f"INTERVAL '{chunk_interval_minutes} MINUTES'"
493                if isinstance(chunk_interval, timedelta)
494                else f'{chunk_interval_minutes}'
495            )
496
497            dt_query = (
498                f"SELECT public.create_hypertable('{_pipe_name}', " +
499                f"'{_datetime}', "
500                + (
501                    f"'{_id}', {_id_count}, " if (_id is not None and _create_space_partition)
502                    else ''
503                )
504                + f'chunk_time_interval => {chunk_time_interval}, '
505                + 'if_not_exists => true, '
506                + "migrate_data => true);"
507            )
508        elif self.flavor == 'mssql':
509            dt_query = (
510                "CREATE "
511                + ("CLUSTERED " if not primary_key else '')
512                + f"INDEX {_datetime_index_name} "
513                + f"ON {_pipe_name} ({_datetime_name})"
514            )
515        else: ### sqlite, postgresql, etc.
516            dt_query = (
517                f"CREATE INDEX {_datetime_index_name} "
518                + f"ON {_pipe_name} ({_datetime_name})"
519            )
520
521        index_queries[_datetime] = [dt_query]
522
523    primary_queries = []
524    if (
525        primary_key is not None
526        and primary_key not in existing_primary_keys
527        and not static
528    ):
529        if autoincrement and primary_key not in existing_cols_pd_types:
530            autoincrement_str = AUTO_INCREMENT_COLUMN_FLAVORS.get(
531                self.flavor,
532                AUTO_INCREMENT_COLUMN_FLAVORS['default']
533            )
534            primary_queries.extend([
535                (
536                    f"ALTER TABLE {_pipe_name}\n"
537                    f"ADD {primary_key_name} {primary_key_db_type} {autoincrement_str}"
538                ),
539            ])
540        elif not autoincrement and primary_key in existing_cols_pd_types:
541            if self.flavor == 'sqlite':
542                new_table_name = sql_item_name(
543                    f'_new_{pipe.target}',
544                    self.flavor,
545                    self.get_pipe_schema(pipe)
546                )
547                select_cols_str = ', '.join(
548                    [
549                        sql_item_name(col, self.flavor, None)
550                        for col in existing_cols_types
551                    ]
552                )
553                primary_queries.extend(
554                    get_create_table_queries(
555                        existing_cols_pd_types,
556                        f'_new_{pipe.target}',
557                        self.flavor,
558                        schema=self.get_pipe_schema(pipe),
559                        primary_key=primary_key,
560                    ) + [
561                        (
562                            f"INSERT INTO {new_table_name} ({select_cols_str})\n"
563                            f"SELECT {select_cols_str}\nFROM {_pipe_name}"
564                        ),
565                        f"DROP TABLE {_pipe_name}",
566                    ] + get_rename_table_queries(
567                        f'_new_{pipe.target}',
568                        pipe.target,
569                        self.flavor,
570                        schema=self.get_pipe_schema(pipe),
571                    )
572                )
573            elif self.flavor == 'oracle':
574                primary_queries.extend([
575                    (
576                        f"ALTER TABLE {_pipe_name}\n"
577                        f"MODIFY {primary_key_name} NOT NULL"
578                    ),
579                    (
580                        f"ALTER TABLE {_pipe_name}\n"
581                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})"
582                    )
583                ])
584            elif self.flavor in ('mysql', 'mariadb'):
585                primary_queries.extend([
586                    (
587                        f"ALTER TABLE {_pipe_name}\n"
588                        f"MODIFY {primary_key_name} {primary_key_db_type} NOT NULL"
589                    ),
590                    (
591                        f"ALTER TABLE {_pipe_name}\n"
592                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})"
593                    )
594                ])
595            elif self.flavor == 'timescaledb':
596                primary_queries.extend([
597                    (
598                        f"ALTER TABLE {_pipe_name}\n"
599                        f"ALTER COLUMN {primary_key_name} SET NOT NULL"
600                    ),
601                    (
602                        f"ALTER TABLE {_pipe_name}\n"
603                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY (" + (
604                            f"{_datetime_name}, " if _datetime_name else ""
605                        ) + f"{primary_key_name})"
606                    ),
607                ])
608            elif self.flavor in ('citus', 'postgresql', 'duckdb'):
609                primary_queries.extend([
610                    (
611                        f"ALTER TABLE {_pipe_name}\n"
612                        f"ALTER COLUMN {primary_key_name} SET NOT NULL"
613                    ),
614                    (
615                        f"ALTER TABLE {_pipe_name}\n"
616                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})"
617                    ),
618                ])
619            else:
620                primary_queries.extend([
621                    (
622                        f"ALTER TABLE {_pipe_name}\n"
623                        f"ALTER COLUMN {primary_key_name} {primary_key_db_type} NOT NULL"
624                    ),
625                    (
626                        f"ALTER TABLE {_pipe_name}\n"
627                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})"
628                    ),
629                ])
630        index_queries[primary_key] = primary_queries
631
632    ### create id index
633    if _id_name is not None:
634        if self.flavor == 'timescaledb':
635            ### Already created indices via create_hypertable.
636            id_query = (
637                None if (_id is not None and _create_space_partition)
638                else (
639                    f"CREATE INDEX IF NOT EXISTS {_id_index_name} ON {_pipe_name} ({_id_name})"
640                    if _id is not None
641                    else None
642                )
643            )
644            pass
645        else: ### mssql, sqlite, etc.
646            id_query = f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})"
647
648        if id_query is not None:
649            index_queries[_id] = id_query if isinstance(id_query, list) else [id_query]
650
651    ### Create indices for other labels in `pipe.columns`.
652    other_index_names = {
653        ix_key: ix_unquoted
654        for ix_key, ix_unquoted in index_names.items()
655        if ix_key not in ('datetime', 'id', 'primary') and ix_unquoted not in existing_ix_names
656    }
657    for ix_key, ix_unquoted in other_index_names.items():
658        ix_name = sql_item_name(ix_unquoted, self.flavor, None)
659        cols = indices[ix_key]
660        if not isinstance(cols, (list, tuple)):
661            cols = [cols]
662        cols_names = [sql_item_name(col, self.flavor, None) for col in cols if col]
663        if not cols_names:
664            continue
665        cols_names_str = ", ".join(cols_names)
666        index_queries[ix_key] = [f"CREATE INDEX {ix_name} ON {_pipe_name} ({cols_names_str})"]
667
668    indices_cols_str = ', '.join(
669        list({
670            sql_item_name(ix, self.flavor)
671            for ix_key, ix in pipe.columns.items()
672            if ix and ix in existing_cols_types
673        })
674    )
675    coalesce_indices_cols_str = ', '.join(
676        [
677            (
678                "COALESCE("
679                + sql_item_name(ix, self.flavor)
680                + ", "
681                + get_null_replacement(existing_cols_types[ix], self.flavor)
682                + ") "
683            ) if ix_key != 'datetime' else (sql_item_name(ix, self.flavor))
684            for ix_key, ix in pipe.columns.items()
685            if ix and ix in existing_cols_types
686        ]
687    )
688    unique_index_name = sql_item_name(pipe.target + '_unique_index', self.flavor)
689    constraint_name = sql_item_name(pipe.target + '_constraint', self.flavor)
690    add_constraint_query = (
691        f"ALTER TABLE {_pipe_name} ADD CONSTRAINT {constraint_name} UNIQUE ({indices_cols_str})"
692    )
693    unique_index_cols_str = (
694        indices_cols_str
695        if self.flavor not in COALESCE_UNIQUE_INDEX_FLAVORS
696        else coalesce_indices_cols_str
697    )
698    create_unique_index_query = (
699        f"CREATE UNIQUE INDEX {unique_index_name} ON {_pipe_name} ({unique_index_cols_str})"
700    )
701    constraint_queries = [create_unique_index_query]
702    if self.flavor != 'sqlite':
703        constraint_queries.append(add_constraint_query)
704    if upsert and indices_cols_str:
705        index_queries[unique_index_name] = constraint_queries
706    return index_queries

Return a dictionary mapping columns to a CREATE INDEX or equivalent query.

Parameters
  • pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
  • A dictionary of index names mapping to lists of queries.
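As a minimal, hedged sketch (assuming a configured `sql:main` instance connector and a hypothetical `demo` pipe; the keys and column names are illustrative, not from the source), the resulting queries might be executed with `exec_queries()`:

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql', 'main')
    ### Hypothetical pipe with a datetime index column named 'dt'.
    pipe = mrsm.Pipe('demo', 'temperature', instance=conn, columns={'datetime': 'dt'})

    index_queries = conn.get_create_index_queries(pipe)
    for index_name, queries in index_queries.items():
        ### Each value is a list of queries to run in order.
        conn.exec_queries(queries, debug=True)
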
def get_drop_index_queries( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, List[str]]:
709def get_drop_index_queries(
710    self,
711    pipe: mrsm.Pipe,
712    debug: bool = False,
713) -> Dict[str, List[str]]:
714    """
715    Return a dictionary mapping columns to a `DROP INDEX` or equivalent query.
716
717    Parameters
718    ----------
719    pipe: mrsm.Pipe
720        The pipe to which the queries will correspond.
721
722    Returns
723    -------
724    A dictionary of column names mapping to lists of queries.
725    """
726    ### NOTE: Due to breaking changes within DuckDB, indices must be skipped.
727    if self.flavor == 'duckdb':
728        return {}
729    if not pipe.exists(debug=debug):
730        return {}
731    from meerschaum.utils.sql import (
732        sql_item_name,
733        table_exists,
734        hypertable_queries,
735        DROP_IF_EXISTS_FLAVORS,
736    )
737    drop_queries = {}
738    schema = self.get_pipe_schema(pipe)
739    schema_prefix = (schema + '_') if schema else ''
740    indices = {
741        col: schema_prefix + ix
742        for col, ix in pipe.get_indices().items()
743    }
744    pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
745    pipe_name_no_schema = sql_item_name(pipe.target, self.flavor, None)
746
747    if self.flavor not in hypertable_queries:
748        is_hypertable = False
749    else:
750        is_hypertable_query = hypertable_queries[self.flavor].format(table_name=pipe_name)
751        is_hypertable = self.value(is_hypertable_query, silent=True, debug=debug) is not None
752
753    if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
754    if is_hypertable:
755        nuke_queries = []
756        temp_table = '_' + pipe.target + '_temp_migration'
757        temp_table_name = sql_item_name(temp_table, self.flavor, self.get_pipe_schema(pipe))
758
759        if table_exists(temp_table, self, schema=self.get_pipe_schema(pipe), debug=debug):
760            nuke_queries.append(f"DROP TABLE {if_exists_str} {temp_table_name}")
761        nuke_queries += [
762            f"SELECT * INTO {temp_table_name} FROM {pipe_name}",
763            f"DROP TABLE {if_exists_str} {pipe_name}",
764            f"ALTER TABLE {temp_table_name} RENAME TO {pipe_name_no_schema}",
765        ]
766        nuke_ix_keys = ('datetime', 'id')
767        nuked = False
768        for ix_key in nuke_ix_keys:
769            if ix_key in indices and not nuked:
770                drop_queries[ix_key] = nuke_queries
771                nuked = True
772
773    drop_queries.update({
774        ix_key: ["DROP INDEX " + sql_item_name(ix_unquoted, self.flavor, None)]
775        for ix_key, ix_unquoted in indices.items()
776        if ix_key not in drop_queries
777    })
778    return drop_queries

Return a dictionary mapping columns to a DROP INDEX or equivalent query.

Parameters
  • pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
  • A dictionary of column names mapping to lists of queries.
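A short sketch (reusing the hypothetical `conn` and `pipe` from above) of dropping and rebuilding a pipe's indices, chaining the two query sets the same way the DuckDB branch of `get_add_columns_queries()` does:

    from meerschaum.utils.misc import flatten_list

    drop_queries = list(flatten_list(conn.get_drop_index_queries(pipe).values()))
    create_queries = list(flatten_list(conn.get_create_index_queries(pipe).values()))

    ### Drop the existing indices, then recreate them.
    conn.exec_queries(drop_queries + create_queries)
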
def get_add_columns_queries( self, pipe: meerschaum.Pipe, df: 'Union[pd.DataFrame, Dict[str, str]]', _is_db_types: bool = False, debug: bool = False) -> List[str]:
2827def get_add_columns_queries(
2828    self,
2829    pipe: mrsm.Pipe,
2830    df: Union[pd.DataFrame, Dict[str, str]],
2831    _is_db_types: bool = False,
2832    debug: bool = False,
2833) -> List[str]:
2834    """
2835    Add new null columns of the correct type to a table from a dataframe.
2836
2837    Parameters
2838    ----------
2839    pipe: mrsm.Pipe
2840        The pipe to be altered.
2841
2842    df: Union[pd.DataFrame, Dict[str, str]]
2843        The pandas DataFrame which contains new columns.
2844        If a dictionary is provided, assume it maps columns to Pandas data types.
2845
2846    _is_db_types: bool, default False
2847        If `True`, assume `df` is a dictionary mapping columns to SQL native dtypes.
2848
2849    Returns
2850    -------
2851    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
2852    """
2853    if not pipe.exists(debug=debug):
2854        return []
2855
2856    if pipe.parameters.get('static', False):
2857        return []
2858
2859    from decimal import Decimal
2860    import copy
2861    from meerschaum.utils.sql import (
2862        sql_item_name,
2863        SINGLE_ALTER_TABLE_FLAVORS,
2864        get_table_cols_types,
2865    )
2866    from meerschaum.utils.dtypes.sql import (
2867        get_pd_type_from_db_type,
2868        get_db_type_from_pd_type,
2869    )
2870    from meerschaum.utils.misc import flatten_list
2871    table_obj = self.get_pipe_table(pipe, debug=debug)
2872    is_dask = 'dask' in df.__module__ if not isinstance(df, dict) else False
2873    if is_dask:
2874        df = df.partitions[0].compute()
2875    df_cols_types = (
2876        {
2877            col: str(typ)
2878            for col, typ in df.dtypes.items()
2879        }
2880        if not isinstance(df, dict)
2881        else copy.deepcopy(df)
2882    )
2883    if not isinstance(df, dict) and len(df.index) > 0:
2884        for col, typ in list(df_cols_types.items()):
2885            if typ != 'object':
2886                continue
2887            val = df.iloc[0][col]
2888            if isinstance(val, (dict, list)):
2889                df_cols_types[col] = 'json'
2890            elif isinstance(val, Decimal):
2891                df_cols_types[col] = 'numeric'
2892            elif isinstance(val, str):
2893                df_cols_types[col] = 'str'
2894    db_cols_types = {
2895        col: get_pd_type_from_db_type(str(typ.type))
2896        for col, typ in table_obj.columns.items()
2897    } if table_obj is not None else {
2898        col: get_pd_type_from_db_type(typ)
2899        for col, typ in get_table_cols_types(
2900            pipe.target,
2901            self,
2902            schema=self.get_pipe_schema(pipe),
2903            debug=debug,
2904        ).items()
2905    }
2906    new_cols = set(df_cols_types) - set(db_cols_types)
2907    if not new_cols:
2908        return []
2909
2910    new_cols_types = {
2911        col: get_db_type_from_pd_type(
2912            df_cols_types[col],
2913            self.flavor
2914        ) for col in new_cols
2915    }
2916
2917    alter_table_query = "ALTER TABLE " + sql_item_name(
2918        pipe.target, self.flavor, self.get_pipe_schema(pipe)
2919    )
2920    queries = []
2921    for col, typ in new_cols_types.items():
2922        add_col_query = (
2923            "\nADD "
2924            + sql_item_name(col, self.flavor, None)
2925            + " " + typ + ","
2926        )
2927
2928        if self.flavor in SINGLE_ALTER_TABLE_FLAVORS:
2929            queries.append(alter_table_query + add_col_query[:-1])
2930        else:
2931            alter_table_query += add_col_query
2932
2933    ### For most flavors, the new columns accumulate into a single ALTER TABLE query.
2934    ### SQLite and other single-alter flavors already appended one query per column above.
2935    if not queries:
2936        queries.append(alter_table_query[:-1])
2937
2938    if self.flavor != 'duckdb':
2939        return queries
2940
2941    ### NOTE: For DuckDB, we must drop and rebuild the indices.
2942    drop_index_queries = list(flatten_list(
2943        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
2944    ))
2945    create_index_queries = list(flatten_list(
2946        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
2947    ))
2948
2949    return drop_index_queries + queries + create_index_queries

Add new null columns of the correct type to a table from a dataframe.

Parameters
  • pipe (mrsm.Pipe): The pipe to be altered.
  • df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which contains new columns. If a dictionary is provided, assume it maps columns to Pandas data types.
  • _is_db_types (bool, default False): If True, assume df is a dictionary mapping columns to SQL native dtypes.
Returns
  • A list of the ALTER TABLE SQL query or queries to be executed on the provided connector.
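For illustration, a hedged sketch (reusing the hypothetical `conn` and `pipe` above) of turning a dataframe's new columns into `ALTER TABLE` statements; the column name is an assumption for the example:

    import pandas as pd

    ### 'humidity' is a hypothetical column not yet present on the pipe's table.
    df = pd.DataFrame({'dt': ['2024-01-01 00:00:00'], 'humidity': [0.52]})
    add_queries = conn.get_add_columns_queries(pipe, df)
    if add_queries:
        conn.exec_queries(add_queries)

    ### A dictionary mapping columns to Pandas dtypes is also accepted.
    add_queries = conn.get_add_columns_queries(pipe, {'humidity': 'float64'})
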
def get_alter_columns_queries( self, pipe: meerschaum.Pipe, df: 'Union[pd.DataFrame, Dict[str, str]]', debug: bool = False) -> List[str]:
2952def get_alter_columns_queries(
2953    self,
2954    pipe: mrsm.Pipe,
2955    df: Union[pd.DataFrame, Dict[str, str]],
2956    debug: bool = False,
2957) -> List[str]:
2958    """
2959    If we encounter a column of a different type, set the entire column to text.
2960    If the altered columns are numeric, alter to numeric instead.
2961
2962    Parameters
2963    ----------
2964    pipe: mrsm.Pipe
2965        The pipe to be altered.
2966
2967    df: Union[pd.DataFrame, Dict[str, str]]
2968        The pandas DataFrame which may contain altered columns.
2969        If a dict is provided, assume it maps columns to Pandas data types.
2970
2971    Returns
2972    -------
2973    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
2974    """
2975    if not pipe.exists(debug=debug):
2976        return []
2977    if pipe.static:
2978        return []
2979    from meerschaum.utils.sql import sql_item_name, DROP_IF_EXISTS_FLAVORS, get_table_cols_types
2980    from meerschaum.utils.dataframe import get_numeric_cols
2981    from meerschaum.utils.dtypes import are_dtypes_equal
2982    from meerschaum.utils.dtypes.sql import (
2983        get_pd_type_from_db_type,
2984        get_db_type_from_pd_type,
2985    )
2986    from meerschaum.utils.misc import flatten_list, generate_password, items_str
2987    table_obj = self.get_pipe_table(pipe, debug=debug)
2988    target = pipe.target
2989    session_id = generate_password(3)
2990    numeric_cols = (
2991        get_numeric_cols(df)
2992        if not isinstance(df, dict)
2993        else [
2994            col
2995            for col, typ in df.items()
2996            if typ == 'numeric'
2997        ]
2998    )
2999    df_cols_types = (
3000        {
3001            col: str(typ)
3002            for col, typ in df.dtypes.items()
3003        }
3004        if not isinstance(df, dict)
3005        else df
3006    )
3007    db_cols_types = {
3008        col: get_pd_type_from_db_type(str(typ.type))
3009        for col, typ in table_obj.columns.items()
3010    } if table_obj is not None else {
3011        col: get_pd_type_from_db_type(typ)
3012        for col, typ in get_table_cols_types(
3013            pipe.target,
3014            self,
3015            schema=self.get_pipe_schema(pipe),
3016            debug=debug,
3017        ).items()
3018    }
3019    pipe_bool_cols = [col for col, typ in pipe.dtypes.items() if are_dtypes_equal(str(typ), 'bool')]
3020    pd_db_df_aliases = {
3021        'int': 'bool',
3022        'float': 'bool',
3023        'numeric': 'bool',
3024        'guid': 'object',
3025    }
3026    if self.flavor == 'oracle':
3027        pd_db_df_aliases['int'] = 'numeric'
3028
3029    altered_cols = {
3030        col: (db_cols_types.get(col, 'object'), typ)
3031        for col, typ in df_cols_types.items()
3032        if not are_dtypes_equal(typ, db_cols_types.get(col, 'object').lower())
3033        and not are_dtypes_equal(db_cols_types.get(col, 'object'), 'string')
3034    }
3035
3036    ### NOTE: Sometimes bools are coerced into ints or floats.
3037    altered_cols_to_ignore = set()
3038    for col, (db_typ, df_typ) in altered_cols.items():
3039        for db_alias, df_alias in pd_db_df_aliases.items():
3040            if db_alias in db_typ.lower() and df_alias in df_typ.lower():
3041                altered_cols_to_ignore.add(col)
3042
3043    ### Oracle's bool handling sometimes mixes NUMBER and INT.
3044    for bool_col in pipe_bool_cols:
3045        if bool_col not in altered_cols:
3046            continue
3047        db_is_bool_compatible = (
3048            are_dtypes_equal('int', altered_cols[bool_col][0])
3049            or are_dtypes_equal('float', altered_cols[bool_col][0])
3050            or are_dtypes_equal('numeric', altered_cols[bool_col][0])
3051            or are_dtypes_equal('bool', altered_cols[bool_col][0])
3052        )
3053        df_is_bool_compatible = (
3054            are_dtypes_equal('int', altered_cols[bool_col][1])
3055            or are_dtypes_equal('float', altered_cols[bool_col][1])
3056            or are_dtypes_equal('numeric', altered_cols[bool_col][1])
3057            or are_dtypes_equal('bool', altered_cols[bool_col][1])
3058        )
3059        if db_is_bool_compatible and df_is_bool_compatible:
3060            altered_cols_to_ignore.add(bool_col)
3061
3062    for col in altered_cols_to_ignore:
3063        _ = altered_cols.pop(col, None)
3064    if not altered_cols:
3065        return []
3066
3067    if numeric_cols:
3068        pipe.dtypes.update({col: 'numeric' for col in numeric_cols})
3069        edit_success, edit_msg = pipe.edit(debug=debug)
3070        if not edit_success:
3071            warn(
3072                f"Failed to update dtypes for numeric columns {items_str(numeric_cols)}:\n"
3073                + f"{edit_msg}"
3074            )
3075    else:
3076        numeric_cols.extend([col for col, typ in pipe.dtypes.items() if typ == 'numeric'])
3077
3078    numeric_type = get_db_type_from_pd_type('numeric', self.flavor, as_sqlalchemy=False)
3079    text_type = get_db_type_from_pd_type('str', self.flavor, as_sqlalchemy=False)
3080    altered_cols_types = {
3081        col: (
3082            numeric_type
3083            if col in numeric_cols
3084            else text_type
3085        )
3086        for col, (db_typ, typ) in altered_cols.items()
3087    }
3088
3089    if self.flavor == 'sqlite':
3090        temp_table_name = '-' + session_id + '_' + target
3091        rename_query = (
3092            "ALTER TABLE "
3093            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3094            + " RENAME TO "
3095            + sql_item_name(temp_table_name, self.flavor, None)
3096        )
3097        create_query = (
3098            "CREATE TABLE "
3099            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3100            + " (\n"
3101        )
3102        for col_name, col_obj in table_obj.columns.items():
3103            create_query += (
3104                sql_item_name(col_name, self.flavor, None)
3105                + " "
3106                + (
3107                    str(col_obj.type)
3108                    if col_name not in altered_cols
3109                    else altered_cols_types[col_name]
3110                )
3111                + ",\n"
3112            )
3113        create_query = create_query[:-2] + "\n)"
3114
3115        insert_query = (
3116            "INSERT INTO "
3117            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3118            + ' ('
3119            + ', '.join([
3120                sql_item_name(col_name, self.flavor, None)
3121                for col_name, _ in table_obj.columns.items()
3122            ])
3123            + ')'
3124            + "\nSELECT\n"
3125        )
3126        for col_name, col_obj in table_obj.columns.items():
3127            new_col_str = (
3128                sql_item_name(col_name, self.flavor, None)
3129                if col_name not in altered_cols
3130                else (
3131                    "CAST("
3132                    + sql_item_name(col_name, self.flavor, None)
3133                    + " AS "
3134                    + altered_cols_types[col_name]
3135                    + ")"
3136                )
3137            )
3138            insert_query += new_col_str + ",\n"
3139        insert_query = insert_query[:-2] + (
3140            f"\nFROM {sql_item_name(temp_table_name, self.flavor, self.get_pipe_schema(pipe))}"
3141        )
3142
3143        if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
3144
3145    drop_query = f"DROP TABLE {if_exists_str} " + sql_item_name(
3146            temp_table_name, self.flavor, self.get_pipe_schema(pipe)
3147        )
3148        return [
3149            rename_query,
3150            create_query,
3151            insert_query,
3152            drop_query,
3153        ]
3154
3155    queries = []
3156    if self.flavor == 'oracle':
3157        for col, typ in altered_cols_types.items():
3158            add_query = (
3159                "ALTER TABLE "
3160                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3161                + "\nADD " + sql_item_name(col + '_temp', self.flavor, None)
3162                + " " + typ
3163            )
3164            queries.append(add_query)
3165
3166        for col, typ in altered_cols_types.items():
3167            populate_temp_query = (
3168                "UPDATE "
3169                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3170                + "\nSET " + sql_item_name(col + '_temp', self.flavor, None)
3171                + ' = ' + sql_item_name(col, self.flavor, None)
3172            )
3173            queries.append(populate_temp_query)
3174
3175        for col, typ in altered_cols_types.items():
3176            set_old_cols_to_null_query = (
3177                "UPDATE "
3178                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3179                + "\nSET " + sql_item_name(col, self.flavor, None)
3180                + ' = NULL'
3181            )
3182            queries.append(set_old_cols_to_null_query)
3183
3184        for col, typ in altered_cols_types.items():
3185            alter_type_query = (
3186                "ALTER TABLE "
3187                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3188                + "\nMODIFY " + sql_item_name(col, self.flavor, None) + ' '
3189                + typ
3190            )
3191            queries.append(alter_type_query)
3192
3193        for col, typ in altered_cols_types.items():
3194            set_old_to_temp_query = (
3195                "UPDATE "
3196                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3197                + "\nSET " + sql_item_name(col, self.flavor, None)
3198                + ' = ' + sql_item_name(col + '_temp', self.flavor, None)
3199            )
3200            queries.append(set_old_to_temp_query)
3201
3202        for col, typ in altered_cols_types.items():
3203            drop_temp_query = (
3204                "ALTER TABLE "
3205                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3206                + "\nDROP COLUMN " + sql_item_name(col + '_temp', self.flavor, None)
3207            )
3208            queries.append(drop_temp_query)
3209
3210        return queries
3211
3212    query = "ALTER TABLE " + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
3213    for col, typ in altered_cols_types.items():
3214        alter_col_prefix = (
3215            'ALTER' if self.flavor not in ('mysql', 'mariadb', 'oracle')
3216            else 'MODIFY'
3217        )
3218        type_prefix = (
3219            '' if self.flavor in ('mssql', 'mariadb', 'mysql')
3220            else 'TYPE '
3221        )
3222        column_str = 'COLUMN' if self.flavor != 'oracle' else ''
3223        query += (
3224            f"\n{alter_col_prefix} {column_str} "
3225            + sql_item_name(col, self.flavor, None)
3226            + " " + type_prefix + typ + ","
3227        )
3228
3229    query = query[:-1]
3230    queries.append(query)
3231    if self.flavor != 'duckdb':
3232        return queries
3233
3234    drop_index_queries = list(flatten_list(
3235        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
3236    ))
3237    create_index_queries = list(flatten_list(
3238        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
3239    ))
3240
3241    return drop_index_queries + queries + create_index_queries

If we encounter a column of a different type, set the entire column to text. If the altered columns are numeric, alter to numeric instead.

Parameters
  • pipe (mrsm.Pipe): The pipe to be altered.
  • df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which may contain altered columns. If a dict is provided, assume it maps columns to Pandas data types.
Returns
  • A list of the ALTER TABLE SQL query or queries to be executed on the provided connector.
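A hedged sketch (same hypothetical names as above) of handling a column whose type has drifted, e.g. an integer column that now carries decimals:

    ### Passing a dict maps column names to Pandas data types.
    alter_queries = conn.get_alter_columns_queries(pipe, {'humidity': 'numeric'})

    ### Per the source above: SQLite yields a rename/create/insert/drop
    ### sequence, while most other flavors yield ALTER TABLE statements.
    if alter_queries:
        conn.exec_queries(alter_queries)
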
def delete_pipe( self, pipe: meerschaum.Pipe, debug: bool = False) -> Tuple[bool, str]:
781def delete_pipe(
782    self,
783    pipe: mrsm.Pipe,
784    debug: bool = False,
785) -> SuccessTuple:
786    """
787    Delete a Pipe's registration.
788    """
789    from meerschaum.utils.sql import sql_item_name
790    from meerschaum.utils.debug import dprint
791    from meerschaum.utils.packages import attempt_import
792    sqlalchemy = attempt_import('sqlalchemy')
793
794    if not pipe.id:
795        return False, f"{pipe} is not registered."
796
797    ### ensure pipes table exists
798    from meerschaum.connectors.sql.tables import get_tables
799    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']
800
801    q = sqlalchemy.delete(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id)
802    if not self.exec(q, debug=debug):
803        return False, f"Failed to delete registration for {pipe}."
804
805    return True, "Success"

Delete a Pipe's registration.
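A minimal sketch (hypothetical `conn` and `pipe` as above); note from the source that only the registration row is deleted, not the pipe's data table:

    success, msg = conn.delete_pipe(pipe)
    if not success:
        print(msg)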

def get_pipe_data( self, pipe: meerschaum.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, str, NoneType] = None, end: Union[datetime.datetime, str, NoneType] = None, params: Optional[Dict[str, Any]] = None, order: str = 'asc', limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, debug: bool = False, **kw: Any) -> 'Union[pd.DataFrame, None]':
 808def get_pipe_data(
 809    self,
 810    pipe: mrsm.Pipe,
 811    select_columns: Optional[List[str]] = None,
 812    omit_columns: Optional[List[str]] = None,
 813    begin: Union[datetime, str, None] = None,
 814    end: Union[datetime, str, None] = None,
 815    params: Optional[Dict[str, Any]] = None,
 816    order: str = 'asc',
 817    limit: Optional[int] = None,
 818    begin_add_minutes: int = 0,
 819    end_add_minutes: int = 0,
 820    debug: bool = False,
 821    **kw: Any
 822) -> Union[pd.DataFrame, None]:
 823    """
 824    Access a pipe's data from the SQL instance.
 825
 826    Parameters
 827    ----------
 828    pipe: mrsm.Pipe
 829        The pipe to get data from.
 830
 831    select_columns: Optional[List[str]], default None
 832        If provided, only select these given columns.
 833        Otherwise select all available columns (i.e. `SELECT *`).
 834
 835    omit_columns: Optional[List[str]], default None
 836        If provided, remove these columns from the selection.
 837
 838    begin: Union[datetime, str, None], default None
 839        If provided, get rows newer than or equal to this value.
 840
 841    end: Union[datetime, str, None], default None
 842        If provided, get rows older than or equal to this value.
 843
 844    params: Optional[Dict[str, Any]], default None
 845        Additional parameters to filter by.
 846        See `meerschaum.connectors.sql.build_where`.
 847
 848    order: Optional[str], default 'asc'
 849        The selection order for all of the indices in the query.
 850        If `None`, omit the `ORDER BY` clause.
 851
 852    limit: Optional[int], default None
 853        If specified, limit the number of rows retrieved to this value.
 854
 855    begin_add_minutes: int, default 0
 856        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
 857
 858    end_add_minutes: int, default 0
 859        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
 860
 861    chunksize: Optional[int], default -1
 862        The size of dataframe chunks to load into memory.
 863
 864    debug: bool, default False
 865        Verbosity toggle.
 866
 867    Returns
 868    -------
 869    A `pd.DataFrame` of the pipe's data.
 870
 871    """
 872    import json
 873    from meerschaum.utils.sql import sql_item_name
 874    from meerschaum.utils.misc import parse_df_datetimes, to_pandas_dtype
 875    from meerschaum.utils.packages import import_pandas
 876    from meerschaum.utils.dtypes import (
 877        attempt_cast_to_numeric,
 878        attempt_cast_to_uuid,
 879        are_dtypes_equal,
 880    )
 881    from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type
 882    pd = import_pandas()
 883    is_dask = 'dask' in pd.__name__
 884
 885    cols_types = pipe.get_columns_types(debug=debug)
 886    dtypes = {
 887        **{
 888            p_col: to_pandas_dtype(p_typ)
 889            for p_col, p_typ in pipe.dtypes.items()
 890        },
 891        **{
 892            col: get_pd_type_from_db_type(typ)
 893            for col, typ in cols_types.items()
 894        }
 895    }
 896    if dtypes:
 897        if self.flavor == 'sqlite':
 898            if not pipe.columns.get('datetime', None):
 899                _dt = pipe.guess_datetime()
 900                dt = sql_item_name(_dt, self.flavor, None) if _dt else None
 901                is_guess = True
 902            else:
 903                _dt = pipe.get_columns('datetime')
 904                dt = sql_item_name(_dt, self.flavor, None)
 905                is_guess = False
 906
 907            if _dt:
 908                dt_type = dtypes.get(_dt, 'object').lower()
 909                if 'datetime' not in dt_type:
 910                    if 'int' not in dt_type:
 911                        dtypes[_dt] = 'datetime64[ns, UTC]'
 912    existing_cols = pipe.get_columns_types(debug=debug)
 913    select_columns = (
 914        [
 915            col
 916            for col in existing_cols
 917            if col not in (omit_columns or [])
 918        ]
 919        if not select_columns
 920        else [
 921            col
 922            for col in select_columns
 923            if col in existing_cols
 924            and col not in (omit_columns or [])
 925        ]
 926    )
 927    if select_columns:
 928        dtypes = {col: typ for col, typ in dtypes.items() if col in select_columns}
 929    dtypes = {
 930        col: to_pandas_dtype(typ)
 931        for col, typ in dtypes.items()
 932        if col in select_columns and col not in (omit_columns or [])
 933    }
 934    query = self.get_pipe_data_query(
 935        pipe,
 936        select_columns=select_columns,
 937        omit_columns=omit_columns,
 938        begin=begin,
 939        end=end,
 940        params=params,
 941        order=order,
 942        limit=limit,
 943        begin_add_minutes=begin_add_minutes,
 944        end_add_minutes=end_add_minutes,
 945        debug=debug,
 946        **kw
 947    )
 948
 949    if is_dask:
 950        index_col = pipe.columns.get('datetime', None)
 951        kw['index_col'] = index_col
 952
 953    numeric_columns = [
 954        col
 955        for col, typ in pipe.dtypes.items()
 956        if typ == 'numeric' and col in dtypes
 957    ]
 958    uuid_columns = [
 959        col
 960        for col, typ in pipe.dtypes.items()
 961        if typ == 'uuid' and col in dtypes
 962    ]
 963
 964    kw['coerce_float'] = kw.get('coerce_float', (len(numeric_columns) == 0))
 965
 966    df = self.read(
 967        query,
 968        dtype=dtypes,
 969        debug=debug,
 970        **kw
 971    )
 972    for col in numeric_columns:
 973        if col not in df.columns:
 974            continue
 975        df[col] = df[col].apply(attempt_cast_to_numeric)
 976
 977    for col in uuid_columns:
 978        if col not in df.columns:
 979            continue
 980        df[col] = df[col].apply(attempt_cast_to_uuid)
 981
 982    if self.flavor == 'sqlite':
 983        ignore_dt_cols = [
 984            col
 985            for col, dtype in pipe.dtypes.items()
 986            if not are_dtypes_equal(str(dtype), 'datetime')
 987        ]
 988        ### NOTE: We have to consume the iterator here to ensure that datetimes are parsed correctly
 989        df = (
 990            parse_df_datetimes(
 991                df,
 992                ignore_cols=ignore_dt_cols,
 993                chunksize=kw.get('chunksize', None),
 994                strip_timezone=(pipe.tzinfo is None),
 995                debug=debug,
 996            ) if isinstance(df, pd.DataFrame) else (
 997                [
 998                    parse_df_datetimes(
 999                        c,
1000                        ignore_cols=ignore_dt_cols,
1001                        chunksize=kw.get('chunksize', None),
1002                        strip_timezone=(pipe.tzinfo is None),
1003                        debug=debug,
1004                    )
1005                    for c in df
1006                ]
1007            )
1008        )
1009        for col, typ in dtypes.items():
1010            if typ != 'json':
1011                continue
1012            df[col] = df[col].apply(lambda x: json.loads(x) if x is not None else x)
1013    return df

Access a pipe's data from the SQL instance.

Parameters
  • pipe (mrsm.Pipe): The pipe to get data from.
  • select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
  • omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
  • begin (Union[datetime, str, None], default None): If provided, get rows newer than or equal to this value.
  • end (Union[datetime, str, None], default None): If provided, get rows older than or equal to this value.
  • params (Optional[Dict[str, Any]], default None): Additional parameters to filter by. See meerschaum.connectors.sql.build_where.
  • order (Optional[str], default 'asc'): The selection order for all of the indices in the query. If None, omit the ORDER BY clause.
  • limit (Optional[int], default None): If specified, limit the number of rows retrieved to this value.
  • begin_add_minutes (int, default 0): The number of minutes to add to the begin datetime (i.e. DATEADD).
  • end_add_minutes (int, default 0): The number of minutes to add to the end datetime (i.e. DATEADD).
  • chunksize (Optional[int], default -1): The size of dataframe chunks to load into memory.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pd.DataFrame of the pipe's data.
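For example (reusing the hypothetical pipe), a bounded retrieval of a subset of columns:

    from datetime import datetime

    df = conn.get_pipe_data(
        pipe,
        select_columns=['dt', 'humidity'],
        begin=datetime(2024, 1, 1),
        end=datetime(2024, 2, 1),
        limit=100,
    )

On SQL instances, `Pipe.get_data()` delegates to this method, so the same keyword arguments generally apply there.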
def get_pipe_data_query( self, pipe: meerschaum.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, int, str, NoneType] = None, end: Union[datetime.datetime, int, str, NoneType] = None, params: Optional[Dict[str, Any]] = None, order: Optional[str] = 'asc', sort_datetimes: bool = False, limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, replace_nulls: Optional[str] = None, skip_existing_cols_check: bool = False, debug: bool = False, **kw: Any) -> Optional[str]:
1016def get_pipe_data_query(
1017    self,
1018    pipe: mrsm.Pipe,
1019    select_columns: Optional[List[str]] = None,
1020    omit_columns: Optional[List[str]] = None,
1021    begin: Union[datetime, int, str, None] = None,
1022    end: Union[datetime, int, str, None] = None,
1023    params: Optional[Dict[str, Any]] = None,
1024    order: Optional[str] = 'asc',
1025    sort_datetimes: bool = False,
1026    limit: Optional[int] = None,
1027    begin_add_minutes: int = 0,
1028    end_add_minutes: int = 0,
1029    replace_nulls: Optional[str] = None,
1030    skip_existing_cols_check: bool = False,
1031    debug: bool = False,
1032    **kw: Any
1033) -> Union[str, None]:
1034    """
1035    Return the `SELECT` query for retrieving a pipe's data from its instance.
1036
1037    Parameters
1038    ----------
1039    pipe: mrsm.Pipe
1040        The pipe to get data from.
1041
1042    select_columns: Optional[List[str]], default None
1043        If provided, only select these given columns.
1044        Otherwise select all available columns (i.e. `SELECT *`).
1045
1046    omit_columns: Optional[List[str]], default None
1047        If provided, remove these columns from the selection.
1048
1049    begin: Union[datetime, int, str, None], default None
1050        If provided, get rows newer than or equal to this value.
1051
1052    end: Union[datetime, int, str, None], default None
1053        If provided, get rows older than or equal to this value.
1054
1055    params: Optional[Dict[str, Any]], default None
1056        Additional parameters to filter by.
1057        See `meerschaum.connectors.sql.build_where`.
1058
1059    order: Optional[str], default 'asc'
1060        The selection order for all of the indices in the query.
1061        If `None`, omit the `ORDER BY` clause.
1062
1063    sort_datetimes: bool, default False
1064        Alias for `order='desc'`.
1065
1066    limit: Optional[int], default None
1067        If specified, limit the number of rows retrieved to this value.
1068
1069    begin_add_minutes: int, default 0
1070        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
1071
1072    end_add_minutes: int, default 0
1073        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
1074
1075    chunksize: Optional[int], default -1
1076        The size of dataframe chunks to load into memory.
1077
1078    replace_nulls: Optional[str], default None
1079        If provided, replace null values with this value.
1080
1081    skip_existing_cols_check: bool, default False
1082        If `True`, do not verify that querying columns are actually on the table.
1083
1084    debug: bool, default False
1085        Verbosity toggle.
1086
1087    Returns
1088    -------
1089    A `SELECT` query to retrieve a pipe's data.
1090    """
1091    from meerschaum.utils.misc import items_str
1092    from meerschaum.utils.sql import sql_item_name, dateadd_str
1093    from meerschaum.utils.dtypes import coerce_timezone
1094    from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type
1095
1096    dt_col = pipe.columns.get('datetime', None)
1097    existing_cols = pipe.get_columns_types(debug=debug)
1098    dt_typ = get_pd_type_from_db_type(existing_cols[dt_col]) if dt_col in existing_cols else None
1099    select_columns = (
1100        [col for col in existing_cols]
1101        if not select_columns
1102        else [col for col in select_columns if col in existing_cols or skip_existing_cols_check]
1103    )
1104    if omit_columns:
1105        select_columns = [col for col in select_columns if col not in omit_columns]
1106
1107    if order is None and sort_datetimes:
1108        order = 'desc'
1109
1110    if begin == '':
1111        begin = pipe.get_sync_time(debug=debug)
1112        backtrack_interval = pipe.get_backtrack_interval(debug=debug)
1113        if begin is not None:
1114            begin -= backtrack_interval
1115
1116    begin, end = pipe.parse_date_bounds(begin, end)
1117    if isinstance(begin, datetime) and dt_typ:
1118        begin = coerce_timezone(begin, strip_utc=('utc' not in dt_typ.lower()))
1119    if isinstance(end, datetime) and dt_typ:
1120        end = coerce_timezone(end, strip_utc=('utc' not in dt_typ.lower()))
1121
1122    cols_names = [
1123        sql_item_name(col, self.flavor, None)
1124        for col in select_columns
1125    ]
1126    select_cols_str = (
1127        'SELECT\n    '
1128        + ',\n    '.join(
1129            [
1130                (
1131                    col_name
1132                    if not replace_nulls
1133                    else f"COALESCE({col_name}, '{replace_nulls}') AS {col_name}"
1134                )
1135                for col_name in cols_names
1136            ]
1137        )
1138    ) if cols_names else 'SELECT *'
1139    pipe_table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
1140    query = f"{select_cols_str}\nFROM {pipe_table_name}"
1141    where = ""
1142
1143    if order is not None:
1144        default_order = 'asc'
1145        if order not in ('asc', 'desc'):
1146            warn(f"Ignoring unsupported order '{order}'. Falling back to '{default_order}'.")
1147            order = default_order
1148        order = order.upper()
1149
1150    if not pipe.columns.get('datetime', None):
1151        _dt = pipe.guess_datetime()
1152        dt = sql_item_name(_dt, self.flavor, None) if _dt else None
1153        is_guess = True
1154    else:
1155        _dt = pipe.get_columns('datetime')
1156        dt = sql_item_name(_dt, self.flavor, None)
1157        is_guess = False
1158
1159    quoted_indices = {
1160        key: sql_item_name(val, self.flavor, None)
1161        for key, val in pipe.columns.items()
1162        if val in existing_cols or skip_existing_cols_check
1163    }
1164
1165    if begin is not None or end is not None:
1166        if is_guess:
1167            if _dt is None:
1168                warn(
1169                    f"No datetime could be determined for {pipe}."
1170                    + "\n    Ignoring begin and end...",
1171                    stack=False,
1172                )
1173                begin, end = None, None
1174            else:
1175                warn(
1176                    f"A datetime wasn't specified for {pipe}.\n"
1177