meerschaum.utils
The utils module contains utility functions. These include tools from primary utilities (get_pipes) to miscellaneous helper functions.
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6The utils module contains utility functions. 7These include tools from primary utilities (get_pipes) 8to miscellaneous helper functions. 9""" 10 11__all__ = ( 12 'daemon', 13 'dataframe', 14 'debug', 15 'dtypes', 16 'formatting', 17 'interactive', 18 'misc', 19 'networking', 20 'packages', 21 'pipes', 22 'pool', 23 'process', 24 'prompt', 25 'schedule', 26 'sql', 27 'threading', 28 'typing', 29 'venv', 30 'warnings', 31 'yaml', 32 "get_pipes", 33 "fetch_pipes_keys", 34) 35from meerschaum.utils._get_pipes import get_pipes, fetch_pipes_keys
def
get_pipes( connector_keys: Union[str, List[str], NoneType] = None, metric_keys: Union[str, List[str], NoneType] = None, location_keys: Union[str, List[str], NoneType] = None, tags: Optional[List[str]] = None, targets: Optional[List[str]] = None, datetime_dtypes: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, mrsm_instance: Union[str, meerschaum.InstanceConnector, NoneType] = None, instance: Union[str, meerschaum.InstanceConnector, NoneType] = None, as_list: bool = False, as_tags_dict: bool = False, as_targets_dict: bool = False, method: str = 'registered', workers: Optional[int] = None, debug: bool = False, _cache_parameters: bool = True, **kw: Any) -> Union[Dict[str, Dict[str, Dict[Optional[str], meerschaum.Pipe]]], List[meerschaum.Pipe], Dict[str, meerschaum.Pipe]]:
29def get_pipes( 30 connector_keys: Union[str, List[str], None] = None, 31 metric_keys: Union[str, List[str], None] = None, 32 location_keys: Union[str, List[str], None] = None, 33 tags: Optional[List[str]] = None, 34 targets: Optional[List[str]] = None, 35 datetime_dtypes: Optional[List[str]] = None, 36 params: Optional[Dict[str, Any]] = None, 37 mrsm_instance: Union[str, InstanceConnector, None] = None, 38 instance: Union[str, InstanceConnector, None] = None, 39 as_list: bool = False, 40 as_tags_dict: bool = False, 41 as_targets_dict: bool = False, 42 method: str = 'registered', 43 workers: Optional[int] = None, 44 debug: bool = False, 45 _cache_parameters: bool = True, 46 **kw: Any 47) -> Union[PipesDict, List[mrsm.Pipe], Dict[str, mrsm.Pipe]]: 48 """ 49 Return a dictionary or list of `meerschaum.Pipe` objects. 50 51 Parameters 52 ---------- 53 connector_keys: Union[str, List[str], None], default None 54 String or list of connector keys. 55 If omitted or is `'*'`, fetch all possible keys. 56 If a string begins with `'_'`, select keys that do NOT match the string. 57 58 metric_keys: Union[str, List[str], None], default None 59 String or list of metric keys. See `connector_keys` for formatting. 60 61 location_keys: Union[str, List[str], None], default None 62 String or list of location keys. See `connector_keys` for formatting. 63 64 tags: Optional[List[str]], default None 65 If provided, only include pipes with these tags. 66 67 datetime_dtypes: Optional[List[str]], default None 68 If provided, only include pipes with the corresponding `datetime` axis dtypes. 69 Accepted values are `datetime`, `int`, `None` (or `null`, etc.). 70 May be negated by `_`. 71 72 params: Optional[Dict[str, Any]], default None 73 Dictionary of additional parameters to search by. 74 Params are parsed into a SQL WHERE clause. 75 E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'` 76 77 mrsm_instance: Union[str, InstanceConnector, None], default None 78 Connector keys for the Meerschaum instance of the pipes. 79 Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or 80 `meerschaum.connectors.api.APIConnector.APIConnector`. 81 82 as_list: bool, default False 83 If `True`, return pipes in a list instead of a hierarchical dictionary. 84 `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}` 85 `True` : `[Pipe]` 86 87 as_tags_dict: bool, default False 88 If `True`, return a dictionary mapping tags to pipes. 89 Pipes with multiple tags will be repeated. 90 91 as_targets_dict: bool, default False 92 If `True`, return a dictionary mapping `(schema, target)` tuples to pipes. 93 Pipes sharing the same target across different schemata are grouped separately. 94 95 method: str, default 'registered' 96 Available options: `['registered', 'explicit', 'all']` 97 If `'registered'` (default), create pipes based on registered keys in the connector's pipes table 98 (API or SQL connector, depends on mrsm_instance). 99 If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys 100 instead of consulting the pipes table. Useful for creating non-existent pipes. 101 If `'all'`, create pipes from predefined metrics and locations. Required `connector_keys`. 102 **NOTE:** Method `'all'` is not implemented! 103 104 workers: Optional[int], default None 105 If provided (and `as_tags_dict` or `as_targets_dict` is `True`), set the number of workers 106 for the pool to fetch tags or targets. 107 Only takes effect if the instance connector supports multi-threading. 108 109 **kw: Any: 110 Keyword arguments to pass to the `meerschaum.Pipe` constructor. 111 112 Returns 113 ------- 114 A dictionary of dictionaries and `meerschaum.Pipe` objects 115 in the connector, metric, location hierarchy. 116 If `as_list` is `True`, return a list of `meerschaum.Pipe` objects. 117 If `as_tags_dict` is `True`, return a dictionary mapping tags to pipes. 118 If `as_targets_dict` is `True`, return a dictionary mapping targets to pipes. 119 120 Examples 121 -------- 122 ``` 123 >>> ### Manual definition: 124 >>> pipes = { 125 ... <connector_keys>: { 126 ... <metric_key>: { 127 ... <location_key>: Pipe( 128 ... <connector_keys>, 129 ... <metric_key>, 130 ... <location_key>, 131 ... ), 132 ... }, 133 ... }, 134 ... }, 135 >>> ### Accessing a single pipe: 136 >>> pipes['sql:main']['weather'][None] 137 >>> ### Return a list instead: 138 >>> get_pipes(as_list=True) 139 [Pipe('sql:main', 'weather')] 140 >>> get_pipes(as_tags_dict=True) 141 {'gvl': Pipe('sql:main', 'weather')} 142 ``` 143 """ 144 import json 145 from collections import defaultdict 146 from meerschaum.config import get_config 147 from meerschaum.config.static import STATIC_CONFIG 148 from meerschaum.utils.warnings import error 149 from meerschaum.utils.misc import filter_keywords, separate_negation_values 150 from meerschaum.utils.pool import get_pool 151 from meerschaum.utils.pipes import replace_pipes_syntax 152 from meerschaum.utils.debug import dprint 153 from meerschaum.utils.dtypes import value_is_null, get_current_timestamp 154 from meerschaum import Pipe 155 156 negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix'] 157 if datetime_dtypes: 158 if isinstance(datetime_dtypes, str): 159 datetime_dtypes = [datetime_dtypes] 160 for _dt in datetime_dtypes: 161 _clean = str(_dt).lstrip(negation_prefix).lower() 162 if _clean not in ('datetime', 'int') and not value_is_null(_clean): 163 error(f"Invalid datetime dtype '{_dt}'.") 164 165 if connector_keys is None: 166 connector_keys = [] 167 if metric_keys is None: 168 metric_keys = [] 169 if location_keys is None: 170 location_keys = [] 171 if params is None: 172 params = {} 173 if tags is None: 174 tags = [] 175 176 if isinstance(connector_keys, str): 177 connector_keys = [connector_keys] 178 if isinstance(metric_keys, str): 179 metric_keys = [metric_keys] 180 if isinstance(location_keys, str): 181 location_keys = [location_keys] 182 183 ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`). 184 if mrsm_instance is None: 185 mrsm_instance = instance 186 if mrsm_instance is None: 187 mrsm_instance = get_config('meerschaum', 'instance', patch=True) 188 if isinstance(mrsm_instance, str): 189 from meerschaum.connectors.parse import parse_instance_keys 190 connector = parse_instance_keys(keys=mrsm_instance, debug=debug) 191 else: 192 from meerschaum.connectors import instance_types 193 valid_connector = False 194 if hasattr(mrsm_instance, 'type'): 195 if mrsm_instance.type in instance_types: 196 valid_connector = True 197 if not valid_connector: 198 error(f"Invalid instance connector: {mrsm_instance}") 199 connector = mrsm_instance 200 if debug: 201 dprint(f"Using instance connector: {connector}") 202 if not connector: 203 error(f"Could not create connector from keys: '{mrsm_instance}'") 204 205 ### Get a list of tuples for the keys needed to build pipes. 206 result = fetch_pipes_keys( 207 method, 208 connector, 209 connector_keys = connector_keys, 210 metric_keys = metric_keys, 211 location_keys = location_keys, 212 tags = tags, 213 params = params, 214 workers = workers, 215 debug = debug 216 ) 217 if result is None: 218 error("Unable to build pipes!") 219 result_items: List[Tuple] = ( 220 list(result.items()) 221 if isinstance(result, dict) 222 else [(None, keys_tuple) for keys_tuple in result] 223 ) 224 225 ### Populate the `pipes` dictionary with Pipes based on the keys 226 ### obtained from the chosen `method`. 227 in_dtypes, ex_dtypes = separate_negation_values(datetime_dtypes or []) 228 in_targets, ex_targets = separate_negation_values(targets or []) 229 pipes: PipesDict = {} 230 targets_pipes: Dict[Tuple[Optional[str], str], List[mrsm.Pipe]] = defaultdict(lambda: []) 231 connector_schema = getattr(connector, 'schema', None) 232 connector_is_sql = getattr(connector, 'type', None) == 'sql' 233 connector_flavor = getattr(connector, 'flavor', None) 234 for pipe_id, keys_tuple in result_items: 235 ck, mk, lk = keys_tuple[0], keys_tuple[1], keys_tuple[2] 236 pipe_tags_or_parameters = keys_tuple[3] if len(keys_tuple) == 4 else None 237 pipe_parameters = ( 238 pipe_tags_or_parameters 239 if isinstance(pipe_tags_or_parameters, (dict, str)) 240 else None 241 ) 242 if isinstance(pipe_parameters, str): 243 pipe_parameters = json.loads(pipe_parameters) 244 pipe_tags = ( 245 pipe_tags_or_parameters 246 if isinstance(pipe_tags_or_parameters, list) 247 else ( 248 pipe_tags_or_parameters.get('tags', []) 249 if isinstance(pipe_tags_or_parameters, dict) 250 else None 251 ) 252 ) 253 254 pipe = Pipe( 255 ck, mk, lk, 256 mrsm_instance = connector, 257 parameters = pipe_parameters, 258 tags = pipe_tags, 259 debug = debug, 260 **filter_keywords(Pipe, **kw) 261 ) 262 pipe.__dict__['_tags'] = pipe_tags 263 if pipe_id is not None: 264 pipe._cache_value('_id', pipe_id, memory_only=True, debug=debug) 265 if pipe_parameters is not None: 266 now = get_current_timestamp('ms', as_int=True) / 1000 267 full_attributes = { 268 'connector_keys': ck, 269 'metric_key': mk, 270 'location_key': lk, 271 'parameters': pipe_parameters, 272 } 273 if pipe_id is not None: 274 full_attributes['pipe_id'] = pipe_id 275 pipe._cache_value('attributes', full_attributes, memory_only=True, debug=debug) 276 pipe._cache_value('_attributes_sync_time', now, memory_only=True, debug=debug) 277 278 if datetime_dtypes or targets: 279 parameters_str = str(pipe_parameters) 280 if pipe_parameters is None or 'MRSM{' in parameters_str or 'Pipe(' in parameters_str: 281 pipe_parameters = pipe.get_parameters(debug=debug) 282 283 keep_pipe = True 284 285 if datetime_dtypes: 286 columns_val = (pipe_parameters or {}).get('columns', {}) or {} 287 dt_col = columns_val.get('datetime', None) 288 pipe_dtypes = ( 289 ((pipe_parameters or {}).get('dtypes', None) or {}) 290 if dt_col 291 else None 292 ) 293 dt_typ = pipe_dtypes.get(dt_col, None) if dt_col else None 294 295 def _dtype_matches(clean_d): 296 if not dt_col: 297 return value_is_null(clean_d) 298 return ( 299 (clean_d == 'int' and 'int' in str(dt_typ).lower()) 300 or 301 (clean_d == 'datetime' and 'int' not in str(dt_typ).lower()) 302 ) 303 304 in_match = not in_dtypes or any(_dtype_matches(d) for d in in_dtypes) 305 ex_match = bool(ex_dtypes and any(_dtype_matches(d) for d in ex_dtypes)) 306 keep_pipe = keep_pipe and in_match and not ex_match 307 if not keep_pipe: 308 continue 309 310 if targets: 311 pipe_target = pipe.target 312 in_target_match = not in_targets or any(t == pipe_target for t in in_targets) 313 ex_target_match = bool(ex_targets and any(t == pipe_target for t in ex_targets)) 314 keep_pipe = keep_pipe and in_target_match and not ex_target_match 315 if not keep_pipe: 316 continue 317 318 if ck not in pipes: 319 pipes[ck] = {} 320 321 if mk not in pipes[ck]: 322 pipes[ck][mk] = {} 323 324 325 pipes[ck][mk][lk] = pipe 326 327 if as_targets_dict: 328 raw_params = pipe_parameters if isinstance(pipe_parameters, dict) else {} 329 schema = raw_params.get('schema') or connector_schema 330 explicit_target = ( 331 raw_params.get('target') 332 or raw_params.get('target_name') 333 or raw_params.get('target_table') 334 or raw_params.get('target_table_name') 335 ) 336 if explicit_target: 337 target_name = ( 338 replace_pipes_syntax(explicit_target, _pipe=pipe) 339 if isinstance(explicit_target, str) and '{{' in explicit_target 340 else explicit_target 341 ) 342 else: 343 target_name = pipe._target_legacy() 344 if connector_is_sql and connector_flavor: 345 from meerschaum.utils.sql import truncate_item_name 346 target_name = truncate_item_name(target_name, connector_flavor) 347 targets_pipes[(schema, target_name)].append(pipe) 348 349 if not as_list and not as_tags_dict and not as_targets_dict: 350 return pipes 351 352 from meerschaum.utils.pipes import flatten_pipes_dict 353 pipes_list = flatten_pipes_dict(pipes) 354 if as_list: 355 return pipes_list 356 357 pool = get_pool(workers=(workers if connector.IS_THREAD_SAFE else 1)) 358 359 def gather_pipe_tags(pipe: mrsm.Pipe) -> Tuple[mrsm.Pipe, List[str]]: 360 _tags = pipe.__dict__.get('_tags', None) 361 gathered_tags = _tags if _tags is not None else pipe.tags 362 return pipe, (gathered_tags or []) 363 364 if as_tags_dict: 365 tags_pipes = defaultdict(lambda: []) 366 pipes_tags = dict(pool.map(gather_pipe_tags, pipes_list)) 367 for pipe, tags in pipes_tags.items(): 368 for tag in (tags or []): 369 tags_pipes[tag].append(pipe) 370 371 return dict(tags_pipes) 372 373 if as_targets_dict: 374 return dict(targets_pipes) 375 376 raise NotImplementedError("No futher options for returning pipes.")
Return a dictionary or list of meerschaum.Pipe objects.
Parameters
- connector_keys (Union[str, List[str], None], default None):
String or list of connector keys.
If omitted or is
'*', fetch all possible keys. If a string begins with'_', select keys that do NOT match the string. - metric_keys (Union[str, List[str], None], default None):
String or list of metric keys. See
connector_keysfor formatting. - location_keys (Union[str, List[str], None], default None):
String or list of location keys. See
connector_keysfor formatting. - tags (Optional[List[str]], default None): If provided, only include pipes with these tags.
- datetime_dtypes (Optional[List[str]], default None):
If provided, only include pipes with the corresponding
datetimeaxis dtypes. Accepted values aredatetime,int,None(ornull, etc.). May be negated by_. - params (Optional[Dict[str, Any]], default None):
Dictionary of additional parameters to search by.
Params are parsed into a SQL WHERE clause.
E.g.
{'a': 1, 'b': 2}equates to'WHERE a = 1 AND b = 2' - mrsm_instance (Union[str, InstanceConnector, None], default None):
Connector keys for the Meerschaum instance of the pipes.
Must be a
meerschaum.connectors.sql.SQLConnector.SQLConnectorormeerschaum.connectors.api.APIConnector.APIConnector. - as_list (bool, default False):
If
True, return pipes in a list instead of a hierarchical dictionary.False:{connector_keys: {metric_key: {location_key: Pipe}}}True:[Pipe] - as_tags_dict (bool, default False):
If
True, return a dictionary mapping tags to pipes. Pipes with multiple tags will be repeated. - as_targets_dict (bool, default False):
If
True, return a dictionary mapping(schema, target)tuples to pipes. Pipes sharing the same target across different schemata are grouped separately. - method (str, default 'registered'):
Available options:
['registered', 'explicit', 'all']If'registered'(default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depends on mrsm_instance). If'explicit', create pipes from provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table. Useful for creating non-existent pipes. If'all', create pipes from predefined metrics and locations. Requiredconnector_keys. NOTE: Method'all'is not implemented! - workers (Optional[int], default None):
If provided (and
as_tags_dictoras_targets_dictisTrue), set the number of workers for the pool to fetch tags or targets. Only takes effect if the instance connector supports multi-threading. - **kw (Any:):
Keyword arguments to pass to the
meerschaum.Pipeconstructor.
Returns
- A dictionary of dictionaries and
meerschaum.Pipeobjects - in the connector, metric, location hierarchy.
- If
as_listisTrue, return a list ofmeerschaum.Pipeobjects. - If
as_tags_dictisTrue, return a dictionary mapping tags to pipes. - If
as_targets_dictisTrue, return a dictionary mapping targets to pipes.
Examples
>>> ### Manual definition:
>>> pipes = {
... <connector_keys>: {
... <metric_key>: {
... <location_key>: Pipe(
... <connector_keys>,
... <metric_key>,
... <location_key>,
... ),
... },
... },
... },
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[Pipe('sql:main', 'weather')]
>>> get_pipes(as_tags_dict=True)
{'gvl': Pipe('sql:main', 'weather')}
def
fetch_pipes_keys( method: str, connector: meerschaum.InstanceConnector, **kw: Any) -> Union[List[Tuple[str, str, str]], List[Tuple[str, str, str, Union[str, Dict[str, Any]]]]]:
379def fetch_pipes_keys( 380 method: str, 381 connector: 'mrsm.connectors.InstanceConnector', 382 **kw: Any 383) -> Union[List[Tuple[str, str, str]], List[Tuple[str, str, str, Union[str, Dict[str, Any]]]]]: 384 """ 385 Fetch keys for pipes according to a method. 386 387 Parameters 388 ---------- 389 method: str 390 The method by which to fetch the keys. See `get_pipes()` above. 391 392 connector: meerschaum.connectors.InstanceConnector 393 The connector to use to fetch the keys. 394 Must be of type `meerschaum.connectors.sql.SQLConnector.SQLConnector` 395 or `meerschaum.connectors.api.APIConnector.APIConnector`. 396 397 connector_keys: Optional[List[str]], default None 398 The list of `connector_keys` to filter by. 399 400 metric_keys: Optional[List[str]], default None 401 The list of `metric_keys` to filter by. 402 403 location_keys: Optional[List[str]], default None 404 The list of `location_keys` to filter by. 405 406 params: Optional[Dict[str, Any]], default None 407 A dictionary of parameters to filter by. 408 409 debug: bool 410 Verbosity toggle. 411 412 Returns 413 ------- 414 A list or a dictionary (with pipe IDs as indices) of tuples of strings (or `None` for `location_key`) 415 in the form `(connector_keys, metric_key, location_key)`. 416 Optionally the parameters or tags may be returned alongside the keys. 417 Note the return value depends on the instance connector implementation. 418 419 Examples 420 -------- 421 >>> fetch_pipes_keys(metric_keys=['weather']) 422 {1: ('sql:main', 'weather', None, {'columns': {'datetime': 'ts', 'id': 'station'}})} 423 """ 424 from meerschaum.utils.warnings import error 425 426 def _registered( 427 connector_keys: Optional[List[str]] = None, 428 metric_keys: Optional[List[str]] = None, 429 location_keys: Optional[List[str]] = None, 430 tags: Optional[List[str]] = None, 431 params: Optional[Dict[str, Any]] = None, 432 debug: bool = False, 433 **kw 434 ) -> List[Tuple[str, str, str]]: 435 """ 436 Get keys from the pipes table or the API directly. 437 Builds query or URL based on provided keys and parameters. 438 439 Only works for SQL and API Connectors. 440 """ 441 if connector_keys is None: 442 connector_keys = [] 443 if metric_keys is None: 444 metric_keys = [] 445 if location_keys is None: 446 location_keys = [] 447 if params is None: 448 params = {} 449 if tags is None: 450 tags = [] 451 452 return connector.fetch_pipes_keys( 453 connector_keys = connector_keys, 454 metric_keys = metric_keys, 455 location_keys = location_keys, 456 tags = tags, 457 params = params, 458 debug = debug 459 ) 460 461 def _explicit( 462 connector_keys: Optional[List[str]] = None, 463 metric_keys: Optional[List[str]] = None, 464 location_keys: Optional[List[str]] = None, 465 params: Optional[Dict[str, Any]] = None, 466 tags: Optional[List[str]] = None, 467 debug: bool = False, 468 **kw 469 ) -> List[Tuple[str, str, str]]: 470 """ 471 Explicitly build Pipes based on provided keys. 472 Raises an error if `connector_keys` or `metric_keys` is empty, 473 and assumes `location_keys = [None]` if empty. 474 """ 475 476 if connector_keys is None: 477 connector_keys = [] 478 if metric_keys is None: 479 metric_keys = [] 480 if location_keys is None: 481 location_keys = [] 482 if params is None: 483 params = {} 484 485 if not isinstance(connector_keys, list): 486 connector_keys = [connector_keys] 487 if not isinstance(metric_keys, list): 488 metric_keys = [metric_keys] 489 if not isinstance(location_keys, list): 490 location_keys = [location_keys] 491 492 missing_keys = [] 493 if len(connector_keys) == 0: 494 missing_keys.append('connector_keys') 495 if len(metric_keys) == 0: 496 missing_keys.append('metric_keys') 497 if len(location_keys) == 0: 498 location_keys.append(None) 499 if len(missing_keys) > 0: 500 error_message = "Missing parameters: '" + "', '".join(missing_keys) + "'" 501 error_message += "\nSee --help for information for passing parameters." 502 error(error_message) 503 result = [] 504 for ck in connector_keys: 505 for mk in metric_keys: 506 for lk in location_keys: 507 result.append((ck, mk, lk)) 508 return result 509 510 _method_functions = { 511 'registered' : _registered, 512 'explicit' : _explicit, 513 } 514 if method not in _method_functions: 515 error(f"Method '{method}' is not supported!", NotImplementedError) 516 return _method_functions[method](**kw)
Fetch keys for pipes according to a method.
Parameters
- method (str):
The method by which to fetch the keys. See
get_pipes()above. - connector (meerschaum.connectors.InstanceConnector):
The connector to use to fetch the keys.
Must be of type
meerschaum.connectors.sql.SQLConnector.SQLConnectorormeerschaum.connectors.api.APIConnector.APIConnector. - connector_keys (Optional[List[str]], default None):
The list of
connector_keysto filter by. - metric_keys (Optional[List[str]], default None):
The list of
metric_keysto filter by. - location_keys (Optional[List[str]], default None):
The list of
location_keysto filter by. - params (Optional[Dict[str, Any]], default None): A dictionary of parameters to filter by.
- debug (bool): Verbosity toggle.
Returns
- A list or a dictionary (with pipe IDs as indices) of tuples of strings (or
Noneforlocation_key) - in the form
(connector_keys, metric_key, location_key). - Optionally the parameters or tags may be returned alongside the keys.
- Note the return value depends on the instance connector implementation.
Examples
>>> fetch_pipes_keys(metric_keys=['weather'])
{1: ('sql:main', 'weather', None, {'columns': {'datetime': 'ts', 'id': 'station'}})}