meerschaum.config.environment

Patch the runtime configuration from environment variables.

  1#! /usr/bin/env python3
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Patch the runtime configuration from environment variables.
  7"""
  8
  9import os
 10import re
 11import json
 12import contextlib
 13import copy
 14import pathlib
 15
 16from meerschaum.utils.typing import List, Union, Dict, Any, Optional
 17from meerschaum._internal.static import STATIC_CONFIG
 18
 19
 20def apply_environment_patches(env: Optional[Dict[str, Any]] = None) -> None:
 21    """
 22    Apply patches defined in `MRSM_CONFIG` and `MRSM_PATCH`.
 23    """
 24    config_var = STATIC_CONFIG['environment']['config']
 25    patch_var = STATIC_CONFIG['environment']['patch']
 26    apply_environment_config(config_var, env=env)
 27    apply_environment_config(patch_var, env=env)
 28
 29
 30def apply_environment_config(env_var: str, env: Optional[Dict[str, Any]] = None) -> None:
 31    """
 32    Parse a dictionary (simple or JSON) from an environment variable
 33    and apply it to the current configuration.
 34    """
 35    from meerschaum.config import get_config, set_config, _config
 36    from meerschaum.config._patch import apply_patch_to_config
 37
 38    env = env if env is not None else os.environ
 39
 40    if env_var not in env:
 41        return
 42
 43    from meerschaum.utils.misc import string_to_dict
 44    try:
 45        _patch = string_to_dict(str(os.environ[env_var]).lstrip())
 46    except Exception:
 47        _patch = None
 48
 49    error_msg = (
 50        f"Environment variable {env_var} is set but cannot be parsed.\n"
 51        f"Unset {env_var} or change to JSON or simplified dictionary format "
 52        "(see --help, under params for formatting)\n"
 53        f"{env_var} is set to:\n{os.environ[env_var]}\n"
 54        f"Skipping patching os environment into config..."
 55    )
 56
 57    if not isinstance(_patch, dict):
 58        print(error_msg)
 59        return
 60
 61    valids = []
 62
 63    def load_key(key: str) -> Union[Dict[str, Any], None]:
 64        try:
 65            c = get_config(key, warn=False)
 66        except Exception:
 67            c = None
 68        return c
 69
 70    ### This was multi-threaded, but I ran into all sorts of locking issues.
 71    keys = list(_patch.keys())
 72    for key in keys:
 73        _ = load_key(key)
 74
 75    ### Load and patch config files.
 76    set_config(
 77        apply_patch_to_config(
 78            _config(),
 79            _patch,
 80        )
 81    )
 82
 83
 84def apply_environment_uris(env: Optional[Dict[str, Any]] = None) -> None:
 85    """
 86    Patch temporary connectors defined in environment variables which start with
 87    `MRSM_SQL_` or `MRSM_API_`.
 88    """
 89    for env_var in get_connector_env_vars(env=env):
 90        apply_connector_uri(env_var, env=env)
 91
 92
 93def get_connector_env_regex() -> str:
 94    """
 95    Return the regex pattern for valid environment variable names for instance connectors.
 96    """
 97    return STATIC_CONFIG['environment']['uri_regex']
 98
 99
def get_connector_env_vars(env: Optional[Dict[str, Any]] = None) -> List[str]:
    """
    Get the names of the environment variables which match the Meerschaum connector regex.

    Names which are themselves values of `STATIC_CONFIG['environment']` are excluded.

    Parameters
    ----------
    env: Optional[Dict[str, Any]], default None
        Mapping whose keys are inspected. Defaults to `os.environ`.

    Returns
    -------
    A list of the matching variable names, in the iteration order of `env`.

    Examples
    --------
    >>> get_connector_env_vars()
    ['MRSM_SQL_FOO']
    """
    uri_regex = get_connector_env_regex()
    env = env if env is not None else os.environ

    ### Snapshot the reserved names once rather than on every iteration.
    reserved_names = tuple(STATIC_CONFIG['environment'].values())

    return [
        env_var
        for env_var in env
        if re.match(uri_regex, env_var) is not None
        and env_var not in reserved_names
    ]
123
124
def apply_connector_uri(env_var: str, env: Optional[Dict[str, Any]] = None) -> None:
    """
    Patch a connector into the configuration from an environment variable.

    The connector's type and label are taken from the first two regex groups
    matched against the variable's name (lower-cased). The variable's value is
    either a JSON object of connector attributes or a plain URI, which is stored
    as `{'uri': <value>}`.

    Parameters
    ----------
    env_var: str
        Name of the environment variable which defines the connector.

    env: Optional[Dict[str, Any]], default None
        Mapping from which `env_var` is read. Defaults to `os.environ`.

    Notes
    -----
    Nothing is patched if the variable is missing, its name does not match
    the connector regex, or the type or label is empty.
    """
    from meerschaum.config import get_config, set_config
    from meerschaum.config._patch import apply_patch_to_config
    from meerschaum.utils.warnings import warn

    env = env if env is not None else os.environ

    if env_var not in env:
        return

    uri_regex = get_connector_env_regex()
    matched = re.match(uri_regex, env_var)
    if matched is None:
        ### The name is not a connector variable, so there is nothing to patch.
        return

    groups = matched.groups()
    ### An unmatched optional group is `None`; treat it like an empty string.
    typ, label = (groups[0] or '').lower(), (groups[1] or '').lower()
    if not typ or not label:
        return

    uri = env[env_var]

    if uri.lstrip().startswith('{') and uri.rstrip().endswith('}'):
        try:
            conn_attrs = json.loads(uri)
        except Exception:
            warn(f"Unable to parse JSON for environment connector '{typ}:{label}'.")
            ### Fall back to treating the raw value as a URI.
            conn_attrs = {'uri': uri}
    else:
        conn_attrs = {'uri': uri}

    set_config(
        apply_patch_to_config(
            {'meerschaum': get_config('meerschaum')},
            {'meerschaum': {'connectors': {typ: {label: conn_attrs}}}},
        )
    )
163
164
def get_env_vars(env: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Return all environment variables which begin with `'MRSM_'`
    (the `prefix` entry of the static environment settings).

    Parameters
    ----------
    env: Optional[Dict[str, Any]], default None
        Mapping to filter. Defaults to `os.environ`.

    Returns
    -------
    A new dictionary of the matching names and their values.
    """
    prefix = STATIC_CONFIG['environment']['prefix']
    source = os.environ if env is None else env

    prefixed_vars = {}
    for name, value in source.items():
        if name.startswith(prefix):
            prefixed_vars[name] = value
    return prefixed_vars
176
177
def get_daemon_env_vars(env: Optional[Dict[str, Any]] = None) -> Dict[str, str]:
    """
    Return the daemon-specific environment vars which are present in the environment.

    Parameters
    ----------
    env: Optional[Dict[str, Any]], default None
        Mapping to read from. Defaults to `os.environ`.

    Returns
    -------
    A dictionary of the daemon variable names found in `env` and their values.
    """
    source = os.environ if env is None else env

    environment_names = STATIC_CONFIG['environment']
    daemon_env_var_names = tuple(
        environment_names[static_key]
        for static_key in (
            'systemd_log_path',
            'systemd_result_path',
            'systemd_delete_job',
            'systemd_stdin_path',
            'daemon_id',
        )
    )

    ### Only variables which are actually set are included.
    present = {}
    for name in daemon_env_var_names:
        if name in source:
            present[name] = source.get(name, '')
    return present
196
197
@contextlib.contextmanager
def replace_env(env: Union[Dict[str, Any], None]):
    """
    Temporarily replace environment variables and current configuration.

    On exit, `os.environ`, any replaced Meerschaum paths, and the configuration
    are restored to what they were on entry, even if setup or the `with` body raises.

    Parameters
    ----------
    env: Dict[str, Any]
        The new environment dictionary to be patched on `os.environ`.
        If `None`, nothing is replaced and the body simply runs.
    """
    if env is None:
        ### Nothing to replace; exceptions raised in the `with` body propagate as usual.
        yield
        return

    from meerschaum.config import _config, set_config
    import meerschaum.config.paths as paths

    ### Snapshot everything that may be modified so it can be restored on exit.
    old_environ = dict(os.environ)
    old_config = copy.deepcopy(_config())
    old_root_dir_path = paths.ROOT_DIR_PATH
    old_plugins_dir_paths = paths.PLUGINS_DIR_PATHS
    old_venvs_dir_path = paths.VIRTENV_RESOURCES_PATH
    old_config_dir_path = paths.CONFIG_DIR_PATH

    root_dir_env_var = STATIC_CONFIG['environment']['root']
    plugins_dir_env_var = STATIC_CONFIG['environment']['plugins']
    config_dir_env_var = STATIC_CONFIG['environment']['config_dir']
    venvs_dir_env_var = STATIC_CONFIG['environment']['venvs']

    replaced_root = False
    replaced_plugins = False
    replaced_venvs = False
    replaced_config_dir = False

    ### Setup happens inside the `try` so a failure midway is still rolled back.
    try:
        os.environ.update(env)

        if root_dir_env_var in env:
            root_dir_path = pathlib.Path(env[root_dir_env_var])
            paths.set_root(root_dir_path)
            replaced_root = True

        if plugins_dir_env_var in env:
            plugins_dir_paths = env[plugins_dir_env_var]
            paths.set_plugins_dir_paths(plugins_dir_paths)
            replaced_plugins = True

        if venvs_dir_env_var in env:
            venv_dir_path = pathlib.Path(env[venvs_dir_env_var])
            paths.set_venvs_dir_path(venv_dir_path)
            replaced_venvs = True

        if config_dir_env_var in env:
            config_dir_path = pathlib.Path(env[config_dir_env_var])
            paths.set_config_dir_path(config_dir_path)
            replaced_config_dir = True

        apply_environment_patches(env)
        apply_environment_uris(env)

        yield
    finally:
        os.environ.clear()
        os.environ.update(old_environ)

        if replaced_root:
            paths.set_root(old_root_dir_path)

        if replaced_plugins:
            paths.set_plugins_dir_paths(old_plugins_dir_paths)

        if replaced_venvs:
            paths.set_venvs_dir_path(old_venvs_dir_path)

        if replaced_config_dir:
            paths.set_config_dir_path(old_config_dir_path)

        _config().clear()
        set_config(old_config)
def apply_environment_patches(env: Optional[Dict[str, Any]] = None) -> None:
def apply_environment_patches(env: Optional[Dict[str, Any]] = None) -> None:
    """
    Apply the patches defined in `MRSM_CONFIG` and `MRSM_PATCH`, in that order.

    Parameters
    ----------
    env: Optional[Dict[str, Any]], default None
        Environment mapping forwarded to `apply_environment_config()`.
    """
    environment_names = STATIC_CONFIG['environment']
    ### Resolve both variable names up front, then apply `config` before `patch`.
    targets = [environment_names[static_key] for static_key in ('config', 'patch')]
    for target in targets:
        apply_environment_config(target, env=env)

Apply patches defined in MRSM_CONFIG and MRSM_PATCH.

def apply_environment_config(env_var: str, env: Optional[Dict[str, Any]] = None) -> None:
def apply_environment_config(env_var: str, env: Optional[Dict[str, Any]] = None) -> None:
    """
    Parse a dictionary (simple or JSON) from an environment variable
    and apply it to the current configuration.

    Parameters
    ----------
    env_var: str
        Name of the environment variable which holds the patch.

    env: Optional[Dict[str, Any]], default None
        Mapping from which `env_var` is read. Defaults to `os.environ`.

    Notes
    -----
    If the variable is missing, nothing happens.
    If its value cannot be parsed into a dictionary, a message is printed
    and the configuration is left untouched.
    """
    from meerschaum.config import get_config, set_config, _config
    from meerschaum.config._patch import apply_patch_to_config

    env = env if env is not None else os.environ

    if env_var not in env:
        return

    from meerschaum.utils.misc import string_to_dict

    ### Read from `env` (not `os.environ`) so that an explicitly passed mapping is honored.
    raw_value = env[env_var]
    try:
        _patch = string_to_dict(str(raw_value).lstrip())
    except Exception:
        _patch = None

    if not isinstance(_patch, dict):
        print(
            f"Environment variable {env_var} is set but cannot be parsed.\n"
            f"Unset {env_var} or change to JSON or simplified dictionary format "
            "(see --help, under params for formatting)\n"
            f"{env_var} is set to:\n{raw_value}\n"
            f"Skipping patching os environment into config..."
        )
        return

    ### This was multi-threaded, but I ran into all sorts of locking issues.
    ### Touch every top-level key before patching (best-effort: failures are ignored).
    ### NOTE(review): presumably `get_config()` loads the key into the in-memory config — confirm.
    for key in list(_patch):
        try:
            get_config(key, warn=False)
        except Exception:
            pass

    ### Load and patch config files.
    set_config(
        apply_patch_to_config(
            _config(),
            _patch,
        )
    )

Parse a dictionary (simple or JSON) from an environment variable and apply it to the current configuration.

def apply_environment_uris(env: Optional[Dict[str, Any]] = None) -> None:
def apply_environment_uris(env: Optional[Dict[str, Any]] = None) -> None:
    """
    Register the temporary connectors defined in environment variables
    which start with `MRSM_SQL_` or `MRSM_API_`.

    Parameters
    ----------
    env: Optional[Dict[str, Any]], default None
        Environment mapping forwarded to `get_connector_env_vars()`
        and `apply_connector_uri()`.
    """
    connector_env_vars = get_connector_env_vars(env=env)
    for connector_env_var in connector_env_vars:
        apply_connector_uri(connector_env_var, env=env)

Patch temporary connectors defined in environment variables which start with MRSM_SQL_ or MRSM_API_.

def get_connector_env_regex() -> str:
def get_connector_env_regex() -> str:
    """
    Return the regex pattern (`uri_regex` in the static environment settings)
    for valid environment variable names for instance connectors.
    """
    environment_settings = STATIC_CONFIG['environment']
    return environment_settings['uri_regex']

Return the regex pattern for valid environment variable names for instance connectors.

def get_connector_env_vars(env: Optional[Dict[str, Any]] = None) -> List[str]:
def get_connector_env_vars(env: Optional[Dict[str, Any]] = None) -> List[str]:
    """
    Get the names of the environment variables which match the Meerschaum connector regex.

    Names which are themselves values of `STATIC_CONFIG['environment']` are excluded.

    Parameters
    ----------
    env: Optional[Dict[str, Any]], default None
        Mapping whose keys are inspected. Defaults to `os.environ`.

    Returns
    -------
    A list of the matching variable names, in the iteration order of `env`.

    Examples
    --------
    >>> get_connector_env_vars()
    ['MRSM_SQL_FOO']
    """
    uri_regex = get_connector_env_regex()
    env = env if env is not None else os.environ

    ### Snapshot the reserved names once rather than on every iteration.
    reserved_names = tuple(STATIC_CONFIG['environment'].values())

    return [
        env_var
        for env_var in env
        if re.match(uri_regex, env_var) is not None
        and env_var not in reserved_names
    ]

Get the names of the environment variables which match the Meerschaum connector regex.

Examples
>>> get_connector_env_vars()
['MRSM_SQL_FOO']
def apply_connector_uri(env_var: str, env: Optional[Dict[str, Any]] = None) -> None:
def apply_connector_uri(env_var: str, env: Optional[Dict[str, Any]] = None) -> None:
    """
    Patch a connector into the configuration from an environment variable.

    The connector's type and label are taken from the first two regex groups
    matched against the variable's name (lower-cased). The variable's value is
    either a JSON object of connector attributes or a plain URI, which is stored
    as `{'uri': <value>}`.

    Parameters
    ----------
    env_var: str
        Name of the environment variable which defines the connector.

    env: Optional[Dict[str, Any]], default None
        Mapping from which `env_var` is read. Defaults to `os.environ`.

    Notes
    -----
    Nothing is patched if the variable is missing, its name does not match
    the connector regex, or the type or label is empty.
    """
    from meerschaum.config import get_config, set_config
    from meerschaum.config._patch import apply_patch_to_config
    from meerschaum.utils.warnings import warn

    env = env if env is not None else os.environ

    if env_var not in env:
        return

    uri_regex = get_connector_env_regex()
    matched = re.match(uri_regex, env_var)
    if matched is None:
        ### The name is not a connector variable, so there is nothing to patch.
        return

    groups = matched.groups()
    ### An unmatched optional group is `None`; treat it like an empty string.
    typ, label = (groups[0] or '').lower(), (groups[1] or '').lower()
    if not typ or not label:
        return

    uri = env[env_var]

    if uri.lstrip().startswith('{') and uri.rstrip().endswith('}'):
        try:
            conn_attrs = json.loads(uri)
        except Exception:
            warn(f"Unable to parse JSON for environment connector '{typ}:{label}'.")
            ### Fall back to treating the raw value as a URI.
            conn_attrs = {'uri': uri}
    else:
        conn_attrs = {'uri': uri}

    set_config(
        apply_patch_to_config(
            {'meerschaum': get_config('meerschaum')},
            {'meerschaum': {'connectors': {typ: {label: conn_attrs}}}},
        )
    )

Parse and validate a URI obtained from an environment variable.

def get_env_vars(env: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
def get_env_vars(env: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Return all environment variables which begin with `'MRSM_'`
    (the `prefix` entry of the static environment settings).

    Parameters
    ----------
    env: Optional[Dict[str, Any]], default None
        Mapping to filter. Defaults to `os.environ`.

    Returns
    -------
    A new dictionary of the matching names and their values.
    """
    prefix = STATIC_CONFIG['environment']['prefix']
    source = os.environ if env is None else env

    prefixed_vars = {}
    for name, value in source.items():
        if name.startswith(prefix):
            prefixed_vars[name] = value
    return prefixed_vars

Return all environment variables which begin with 'MRSM_'.

def get_daemon_env_vars(env: Optional[Dict[str, Any]] = None) -> Dict[str, str]:
def get_daemon_env_vars(env: Optional[Dict[str, Any]] = None) -> Dict[str, str]:
    """
    Return the daemon-specific environment vars which are present in the environment.

    Parameters
    ----------
    env: Optional[Dict[str, Any]], default None
        Mapping to read from. Defaults to `os.environ`.

    Returns
    -------
    A dictionary of the daemon variable names found in `env` and their values.
    """
    source = os.environ if env is None else env

    environment_names = STATIC_CONFIG['environment']
    daemon_env_var_names = tuple(
        environment_names[static_key]
        for static_key in (
            'systemd_log_path',
            'systemd_result_path',
            'systemd_delete_job',
            'systemd_stdin_path',
            'daemon_id',
        )
    )

    ### Only variables which are actually set are included.
    present = {}
    for name in daemon_env_var_names:
        if name in source:
            present[name] = source.get(name, '')
    return present

Return the daemon-specific environment vars in the current environment.

@contextlib.contextmanager
def replace_env(env: Optional[Dict[str, Any]]):
@contextlib.contextmanager
def replace_env(env: Union[Dict[str, Any], None]):
    """
    Temporarily replace environment variables and current configuration.

    On exit, `os.environ`, any replaced Meerschaum paths, and the configuration
    are restored to what they were on entry, even if setup or the `with` body raises.

    Parameters
    ----------
    env: Dict[str, Any]
        The new environment dictionary to be patched on `os.environ`.
        If `None`, nothing is replaced and the body simply runs.
    """
    if env is None:
        ### Nothing to replace; exceptions raised in the `with` body propagate as usual.
        yield
        return

    from meerschaum.config import _config, set_config
    import meerschaum.config.paths as paths

    ### Snapshot everything that may be modified so it can be restored on exit.
    old_environ = dict(os.environ)
    old_config = copy.deepcopy(_config())
    old_root_dir_path = paths.ROOT_DIR_PATH
    old_plugins_dir_paths = paths.PLUGINS_DIR_PATHS
    old_venvs_dir_path = paths.VIRTENV_RESOURCES_PATH
    old_config_dir_path = paths.CONFIG_DIR_PATH

    root_dir_env_var = STATIC_CONFIG['environment']['root']
    plugins_dir_env_var = STATIC_CONFIG['environment']['plugins']
    config_dir_env_var = STATIC_CONFIG['environment']['config_dir']
    venvs_dir_env_var = STATIC_CONFIG['environment']['venvs']

    replaced_root = False
    replaced_plugins = False
    replaced_venvs = False
    replaced_config_dir = False

    ### Setup happens inside the `try` so a failure midway is still rolled back.
    try:
        os.environ.update(env)

        if root_dir_env_var in env:
            root_dir_path = pathlib.Path(env[root_dir_env_var])
            paths.set_root(root_dir_path)
            replaced_root = True

        if plugins_dir_env_var in env:
            plugins_dir_paths = env[plugins_dir_env_var]
            paths.set_plugins_dir_paths(plugins_dir_paths)
            replaced_plugins = True

        if venvs_dir_env_var in env:
            venv_dir_path = pathlib.Path(env[venvs_dir_env_var])
            paths.set_venvs_dir_path(venv_dir_path)
            replaced_venvs = True

        if config_dir_env_var in env:
            config_dir_path = pathlib.Path(env[config_dir_env_var])
            paths.set_config_dir_path(config_dir_path)
            replaced_config_dir = True

        apply_environment_patches(env)
        apply_environment_uris(env)

        yield
    finally:
        os.environ.clear()
        os.environ.update(old_environ)

        if replaced_root:
            paths.set_root(old_root_dir_path)

        if replaced_plugins:
            paths.set_plugins_dir_paths(old_plugins_dir_paths)

        if replaced_venvs:
            paths.set_venvs_dir_path(old_venvs_dir_path)

        if replaced_config_dir:
            paths.set_config_dir_path(old_config_dir_path)

        _config().clear()
        set_config(old_config)

Temporarily replace environment variables and current configuration.

Parameters
  • env (Dict[str, Any]): The new environment dictionary to be patched on os.environ.