meerschaum.config.environment
Patch the runtime configuration from environment variables.
1#! /usr/bin/env python3 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Patch the runtime configuration from environment variables. 7""" 8 9import os 10import re 11import json 12import contextlib 13import copy 14import pathlib 15 16from meerschaum.utils.typing import List, Union, Dict, Any, Optional 17from meerschaum._internal.static import STATIC_CONFIG 18 19 20def apply_environment_patches(env: Optional[Dict[str, Any]] = None) -> None: 21 """ 22 Apply patches defined in `MRSM_CONFIG` and `MRSM_PATCH`. 23 """ 24 config_var = STATIC_CONFIG['environment']['config'] 25 patch_var = STATIC_CONFIG['environment']['patch'] 26 apply_environment_config(config_var, env=env) 27 apply_environment_config(patch_var, env=env) 28 29 30def apply_environment_config(env_var: str, env: Optional[Dict[str, Any]] = None) -> None: 31 """ 32 Parse a dictionary (simple or JSON) from an environment variable 33 and apply it to the current configuration. 34 """ 35 from meerschaum.config import get_config, set_config, _config 36 from meerschaum.config._patch import apply_patch_to_config 37 38 env = env if env is not None else os.environ 39 40 if env_var not in env: 41 return 42 43 from meerschaum.utils.misc import string_to_dict 44 try: 45 _patch = string_to_dict(str(os.environ[env_var]).lstrip()) 46 except Exception: 47 _patch = None 48 49 error_msg = ( 50 f"Environment variable {env_var} is set but cannot be parsed.\n" 51 f"Unset {env_var} or change to JSON or simplified dictionary format " 52 "(see --help, under params for formatting)\n" 53 f"{env_var} is set to:\n{os.environ[env_var]}\n" 54 f"Skipping patching os environment into config..." 55 ) 56 57 if not isinstance(_patch, dict): 58 print(error_msg) 59 return 60 61 valids = [] 62 63 def load_key(key: str) -> Union[Dict[str, Any], None]: 64 try: 65 c = get_config(key, warn=False) 66 except Exception: 67 c = None 68 return c 69 70 ### This was multi-threaded, but I ran into all sorts of locking issues. 71 keys = list(_patch.keys()) 72 for key in keys: 73 _ = load_key(key) 74 75 ### Load and patch config files. 76 set_config( 77 apply_patch_to_config( 78 _config(), 79 _patch, 80 ) 81 ) 82 83 84def apply_environment_uris(env: Optional[Dict[str, Any]] = None) -> None: 85 """ 86 Patch temporary connectors defined in environment variables which start with 87 `MRSM_SQL_` or `MRSM_API_`. 88 """ 89 for env_var in get_connector_env_vars(env=env): 90 apply_connector_uri(env_var, env=env) 91 92 93def get_connector_env_regex() -> str: 94 """ 95 Return the regex pattern for valid environment variable names for instance connectors. 96 """ 97 return STATIC_CONFIG['environment']['uri_regex'] 98 99 100def get_connector_env_vars(env: Optional[Dict[str, Any]] = None) -> List[str]: 101 """ 102 Get the names of the environment variables which match the Meerschaum connector regex. 103 104 Examples 105 -------- 106 >>> get_connector_environment_vars() 107 ['MRSM_SQL_FOO'] 108 """ 109 uri_regex = get_connector_env_regex() 110 env_vars = [] 111 112 env = env if env is not None else os.environ 113 114 for env_var in env: 115 matched = re.match(uri_regex, env_var) 116 if matched is None: 117 continue 118 if env_var in STATIC_CONFIG['environment'].values(): 119 continue 120 env_vars.append(env_var) 121 122 return env_vars 123 124 125def apply_connector_uri(env_var: str, env: Optional[Dict[str, Any]] = None) -> None: 126 """ 127 Parse and validate a URI obtained from an environment variable. 128 """ 129 from meerschaum.config import get_config, set_config, _config 130 from meerschaum.config._patch import apply_patch_to_config 131 from meerschaum.config._read_config import search_and_substitute_config 132 from meerschaum.utils.warnings import warn 133 134 env = env if env is not None else os.environ 135 136 if env_var not in env: 137 return 138 139 uri_regex = get_connector_env_regex() 140 matched = re.match(uri_regex, env_var) 141 groups = matched.groups() 142 typ, label = groups[0].lower(), groups[1].lower() 143 if not typ or not label: 144 return 145 146 uri = env[env_var] 147 148 if uri.lstrip().startswith('{') and uri.rstrip().endswith('}'): 149 try: 150 conn_attrs = json.loads(uri) 151 except Exception: 152 warn(f"Unable to parse JSON for environment connector '{typ}:{label}'.") 153 conn_attrs = {'uri': uri} 154 else: 155 conn_attrs = {'uri': uri} 156 157 set_config( 158 apply_patch_to_config( 159 {'meerschaum': get_config('meerschaum')}, 160 {'meerschaum': {'connectors': {typ: {label: conn_attrs}}}}, 161 ) 162 ) 163 164 165def get_env_vars(env: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: 166 """ 167 Return all environment variables which begin with `'MRSM_'`. 168 """ 169 prefix = STATIC_CONFIG['environment']['prefix'] 170 env = env if env is not None else os.environ 171 return { 172 env_var: env_val 173 for env_var, env_val in env.items() 174 if env_var.startswith(prefix) 175 } 176 177 178def get_daemon_env_vars(env: Optional[Dict[str, Any]] = None) -> Dict[str, str]: 179 """ 180 Return the daemon-specific environment vars in the current environment. 181 """ 182 env = env if env is not None else os.environ 183 184 daemon_env_var_names = ( 185 STATIC_CONFIG['environment']['systemd_log_path'], 186 STATIC_CONFIG['environment']['systemd_result_path'], 187 STATIC_CONFIG['environment']['systemd_delete_job'], 188 STATIC_CONFIG['environment']['systemd_stdin_path'], 189 STATIC_CONFIG['environment']['daemon_id'], 190 ) 191 return { 192 env_var: env.get(env_var, '') 193 for env_var in daemon_env_var_names 194 if env_var in env 195 } 196 197 198@contextlib.contextmanager 199def replace_env(env: Union[Dict[str, Any], None]): 200 """ 201 Temporarily replace environment variables and current configuration. 202 203 Parameters 204 ---------- 205 env: Dict[str, Any] 206 The new environment dictionary to be patched on `os.environ`. 207 """ 208 if env is None: 209 try: 210 yield 211 except Exception: 212 pass 213 return 214 215 from meerschaum.config import _config, set_config 216 import meerschaum.config.paths as paths 217 218 old_environ = dict(os.environ) 219 old_config = copy.deepcopy(_config()) 220 old_root_dir_path = paths.ROOT_DIR_PATH 221 old_plugins_dir_paths = paths.PLUGINS_DIR_PATHS 222 old_venvs_dir_path = paths.VIRTENV_RESOURCES_PATH 223 old_config_dir_path = paths.CONFIG_DIR_PATH 224 225 os.environ.update(env) 226 227 root_dir_env_var = STATIC_CONFIG['environment']['root'] 228 plugins_dir_env_var = STATIC_CONFIG['environment']['plugins'] 229 config_dir_env_var = STATIC_CONFIG['environment']['config_dir'] 230 venvs_dir_env_var = STATIC_CONFIG['environment']['venvs'] 231 232 replaced_root = False 233 if root_dir_env_var in env: 234 root_dir_path = pathlib.Path(env[root_dir_env_var]) 235 paths.set_root(root_dir_path) 236 replaced_root = True 237 238 replaced_plugins = False 239 if plugins_dir_env_var in env: 240 plugins_dir_paths = env[plugins_dir_env_var] 241 paths.set_plugins_dir_paths(plugins_dir_paths) 242 replaced_plugins = True 243 244 replaced_venvs = False 245 if venvs_dir_env_var in env: 246 venv_dir_path = pathlib.Path(env[venvs_dir_env_var]) 247 paths.set_venvs_dir_path(venv_dir_path) 248 replaced_venvs = True 249 250 replaced_config_dir = False 251 if config_dir_env_var in env: 252 config_dir_path = pathlib.Path(env[config_dir_env_var]) 253 paths.set_config_dir_path(config_dir_path) 254 replaced_config_dir = True 255 256 apply_environment_patches(env) 257 apply_environment_uris(env) 258 259 try: 260 yield 261 finally: 262 os.environ.clear() 263 os.environ.update(old_environ) 264 265 if replaced_root: 266 paths.set_root(old_root_dir_path) 267 268 if replaced_plugins: 269 paths.set_plugins_dir_paths(old_plugins_dir_paths) 270 271 if replaced_venvs: 272 paths.set_venvs_dir_path(old_venvs_dir_path) 273 274 if replaced_config_dir: 275 paths.set_config_dir_path(old_config_dir_path) 276 277 _config().clear() 278 set_config(old_config)
21def apply_environment_patches(env: Optional[Dict[str, Any]] = None) -> None: 22 """ 23 Apply patches defined in `MRSM_CONFIG` and `MRSM_PATCH`. 24 """ 25 config_var = STATIC_CONFIG['environment']['config'] 26 patch_var = STATIC_CONFIG['environment']['patch'] 27 apply_environment_config(config_var, env=env) 28 apply_environment_config(patch_var, env=env)
Apply patches defined in MRSM_CONFIG and MRSM_PATCH.
31def apply_environment_config(env_var: str, env: Optional[Dict[str, Any]] = None) -> None: 32 """ 33 Parse a dictionary (simple or JSON) from an environment variable 34 and apply it to the current configuration. 35 """ 36 from meerschaum.config import get_config, set_config, _config 37 from meerschaum.config._patch import apply_patch_to_config 38 39 env = env if env is not None else os.environ 40 41 if env_var not in env: 42 return 43 44 from meerschaum.utils.misc import string_to_dict 45 try: 46 _patch = string_to_dict(str(os.environ[env_var]).lstrip()) 47 except Exception: 48 _patch = None 49 50 error_msg = ( 51 f"Environment variable {env_var} is set but cannot be parsed.\n" 52 f"Unset {env_var} or change to JSON or simplified dictionary format " 53 "(see --help, under params for formatting)\n" 54 f"{env_var} is set to:\n{os.environ[env_var]}\n" 55 f"Skipping patching os environment into config..." 56 ) 57 58 if not isinstance(_patch, dict): 59 print(error_msg) 60 return 61 62 valids = [] 63 64 def load_key(key: str) -> Union[Dict[str, Any], None]: 65 try: 66 c = get_config(key, warn=False) 67 except Exception: 68 c = None 69 return c 70 71 ### This was multi-threaded, but I ran into all sorts of locking issues. 72 keys = list(_patch.keys()) 73 for key in keys: 74 _ = load_key(key) 75 76 ### Load and patch config files. 77 set_config( 78 apply_patch_to_config( 79 _config(), 80 _patch, 81 ) 82 )
Parse a dictionary (simple or JSON) from an environment variable and apply it to the current configuration.
85def apply_environment_uris(env: Optional[Dict[str, Any]] = None) -> None: 86 """ 87 Patch temporary connectors defined in environment variables which start with 88 `MRSM_SQL_` or `MRSM_API_`. 89 """ 90 for env_var in get_connector_env_vars(env=env): 91 apply_connector_uri(env_var, env=env)
Patch temporary connectors defined in environment variables which start with
MRSM_SQL_ or MRSM_API_.
94def get_connector_env_regex() -> str: 95 """ 96 Return the regex pattern for valid environment variable names for instance connectors. 97 """ 98 return STATIC_CONFIG['environment']['uri_regex']
Return the regex pattern for valid environment variable names for instance connectors.
101def get_connector_env_vars(env: Optional[Dict[str, Any]] = None) -> List[str]: 102 """ 103 Get the names of the environment variables which match the Meerschaum connector regex. 104 105 Examples 106 -------- 107 >>> get_connector_environment_vars() 108 ['MRSM_SQL_FOO'] 109 """ 110 uri_regex = get_connector_env_regex() 111 env_vars = [] 112 113 env = env if env is not None else os.environ 114 115 for env_var in env: 116 matched = re.match(uri_regex, env_var) 117 if matched is None: 118 continue 119 if env_var in STATIC_CONFIG['environment'].values(): 120 continue 121 env_vars.append(env_var) 122 123 return env_vars
Get the names of the environment variables which match the Meerschaum connector regex.
Examples
>>> get_connector_environment_vars()
['MRSM_SQL_FOO']
126def apply_connector_uri(env_var: str, env: Optional[Dict[str, Any]] = None) -> None: 127 """ 128 Parse and validate a URI obtained from an environment variable. 129 """ 130 from meerschaum.config import get_config, set_config, _config 131 from meerschaum.config._patch import apply_patch_to_config 132 from meerschaum.config._read_config import search_and_substitute_config 133 from meerschaum.utils.warnings import warn 134 135 env = env if env is not None else os.environ 136 137 if env_var not in env: 138 return 139 140 uri_regex = get_connector_env_regex() 141 matched = re.match(uri_regex, env_var) 142 groups = matched.groups() 143 typ, label = groups[0].lower(), groups[1].lower() 144 if not typ or not label: 145 return 146 147 uri = env[env_var] 148 149 if uri.lstrip().startswith('{') and uri.rstrip().endswith('}'): 150 try: 151 conn_attrs = json.loads(uri) 152 except Exception: 153 warn(f"Unable to parse JSON for environment connector '{typ}:{label}'.") 154 conn_attrs = {'uri': uri} 155 else: 156 conn_attrs = {'uri': uri} 157 158 set_config( 159 apply_patch_to_config( 160 {'meerschaum': get_config('meerschaum')}, 161 {'meerschaum': {'connectors': {typ: {label: conn_attrs}}}}, 162 ) 163 )
Parse and validate a URI obtained from an environment variable.
166def get_env_vars(env: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: 167 """ 168 Return all environment variables which begin with `'MRSM_'`. 169 """ 170 prefix = STATIC_CONFIG['environment']['prefix'] 171 env = env if env is not None else os.environ 172 return { 173 env_var: env_val 174 for env_var, env_val in env.items() 175 if env_var.startswith(prefix) 176 }
Return all environment variables which begin with 'MRSM_'.
179def get_daemon_env_vars(env: Optional[Dict[str, Any]] = None) -> Dict[str, str]: 180 """ 181 Return the daemon-specific environment vars in the current environment. 182 """ 183 env = env if env is not None else os.environ 184 185 daemon_env_var_names = ( 186 STATIC_CONFIG['environment']['systemd_log_path'], 187 STATIC_CONFIG['environment']['systemd_result_path'], 188 STATIC_CONFIG['environment']['systemd_delete_job'], 189 STATIC_CONFIG['environment']['systemd_stdin_path'], 190 STATIC_CONFIG['environment']['daemon_id'], 191 ) 192 return { 193 env_var: env.get(env_var, '') 194 for env_var in daemon_env_var_names 195 if env_var in env 196 }
Return the daemon-specific environment vars in the current environment.
199@contextlib.contextmanager 200def replace_env(env: Union[Dict[str, Any], None]): 201 """ 202 Temporarily replace environment variables and current configuration. 203 204 Parameters 205 ---------- 206 env: Dict[str, Any] 207 The new environment dictionary to be patched on `os.environ`. 208 """ 209 if env is None: 210 try: 211 yield 212 except Exception: 213 pass 214 return 215 216 from meerschaum.config import _config, set_config 217 import meerschaum.config.paths as paths 218 219 old_environ = dict(os.environ) 220 old_config = copy.deepcopy(_config()) 221 old_root_dir_path = paths.ROOT_DIR_PATH 222 old_plugins_dir_paths = paths.PLUGINS_DIR_PATHS 223 old_venvs_dir_path = paths.VIRTENV_RESOURCES_PATH 224 old_config_dir_path = paths.CONFIG_DIR_PATH 225 226 os.environ.update(env) 227 228 root_dir_env_var = STATIC_CONFIG['environment']['root'] 229 plugins_dir_env_var = STATIC_CONFIG['environment']['plugins'] 230 config_dir_env_var = STATIC_CONFIG['environment']['config_dir'] 231 venvs_dir_env_var = STATIC_CONFIG['environment']['venvs'] 232 233 replaced_root = False 234 if root_dir_env_var in env: 235 root_dir_path = pathlib.Path(env[root_dir_env_var]) 236 paths.set_root(root_dir_path) 237 replaced_root = True 238 239 replaced_plugins = False 240 if plugins_dir_env_var in env: 241 plugins_dir_paths = env[plugins_dir_env_var] 242 paths.set_plugins_dir_paths(plugins_dir_paths) 243 replaced_plugins = True 244 245 replaced_venvs = False 246 if venvs_dir_env_var in env: 247 venv_dir_path = pathlib.Path(env[venvs_dir_env_var]) 248 paths.set_venvs_dir_path(venv_dir_path) 249 replaced_venvs = True 250 251 replaced_config_dir = False 252 if config_dir_env_var in env: 253 config_dir_path = pathlib.Path(env[config_dir_env_var]) 254 paths.set_config_dir_path(config_dir_path) 255 replaced_config_dir = True 256 257 apply_environment_patches(env) 258 apply_environment_uris(env) 259 260 try: 261 yield 262 finally: 263 os.environ.clear() 264 os.environ.update(old_environ) 265 266 if replaced_root: 267 paths.set_root(old_root_dir_path) 268 269 if replaced_plugins: 270 paths.set_plugins_dir_paths(old_plugins_dir_paths) 271 272 if replaced_venvs: 273 paths.set_venvs_dir_path(old_venvs_dir_path) 274 275 if replaced_config_dir: 276 paths.set_config_dir_path(old_config_dir_path) 277 278 _config().clear() 279 set_config(old_config)
Temporarily replace environment variables and current configuration.
Parameters
- env (Dict[str, Any]):
The new environment dictionary to be patched on
os.environ.