meerschaum.utils.daemon
Manage Daemons via the Daemon class.
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Manage Daemons via the `Daemon` class.
"""

from __future__ import annotations

import os
import shlex

from meerschaum.utils.typing import SuccessTuple, List, Optional, Callable, Any, Union
from meerschaum.utils.daemon.StdinFile import StdinFile
from meerschaum.utils.daemon.Daemon import Daemon
from meerschaum.utils.daemon.RotatingFile import RotatingFile
from meerschaum.utils.daemon.FileDescriptorInterceptor import FileDescriptorInterceptor
from meerschaum.utils.daemon._names import get_new_daemon_name


__all__ = (
    'daemon_action',
    'daemon_entry',
    'get_daemons',
    'get_daemon_ids',
    'get_running_daemons',
    'get_stopped_daemons',
    'get_paused_daemons',
    'get_filtered_daemons',
    'get_new_daemon_name',
    'run_daemon',
    'running_in_daemon',
    'Daemon',
    'StdinFile',
    'RotatingFile',
    'FileDescriptorInterceptor',
)

### `Daemon` objects keyed by `daemon_id`; consulted by `get_current_daemon()`.
_daemons = {}


def daemon_entry(sysargs: Optional[List[str]] = None) -> SuccessTuple:
    """Parse sysargs and execute a Meerschaum action as a daemon.

    If `--name` or `--job-name` is given and a daemon with that name can be loaded,
    the existing daemon is re-run (or reported as already running) when its stored
    arguments match the new ones or when no action is provided.
    Otherwise a new daemon is created via `run_daemon()`.

    Parameters
    ----------
    sysargs: Optional[List[str]], default None
        The command line arguments used in a Meerschaum action.
        `None` is treated as an empty list.

    Returns
    -------
    A SuccessTuple.
    """
    from meerschaum._internal.entry import entry

    ### Treat the default `None` as "no arguments" instead of failing on the `in` checks.
    if sysargs is None:
        sysargs = []

    _args = {}
    if '--name' in sysargs or '--job-name' in sysargs:
        from meerschaum._internal.arguments._parse_arguments import parse_arguments
        _args = parse_arguments(sysargs)
    filtered_sysargs = [arg for arg in sysargs if arg not in ('-d', '--daemon')]
    try:
        label = shlex.join(filtered_sysargs) if sysargs else None
    except Exception:
        ### Best-effort fallback if shell-quoting the arguments fails.
        label = ' '.join(filtered_sysargs) if sysargs else None

    name = _args.get('name', None)
    daemon = None
    if name:
        try:
            daemon = Daemon(daemon_id=name)
        except Exception:
            ### No existing daemon could be loaded under this name; a new one is created below.
            daemon = None

    if daemon is not None:
        ### `parse_arguments` is bound here: `daemon` is only set when `_args` was parsed above.
        existing_sysargs = daemon.properties['target']['args'][0]
        existing_kwargs = parse_arguments(existing_sysargs)

        ### Remove sysargs because flags are aliased.
        _ = _args.pop('daemon', None)
        _ = _args.pop('sysargs', None)
        _ = _args.pop('filtered_sysargs', None)
        debug = _args.pop('debug', None)
        _args['sub_args'] = sorted(_args.get('sub_args', []))
        _ = existing_kwargs.pop('daemon', None)
        _ = existing_kwargs.pop('sysargs', None)
        _ = existing_kwargs.pop('filtered_sysargs', None)
        _ = existing_kwargs.pop('debug', None)
        existing_kwargs['sub_args'] = sorted(existing_kwargs.get('sub_args', []))

        ### Only run if the kwargs equal or no actions are provided.
        if existing_kwargs == _args or not _args.get('action', []):
            if daemon.status == 'running':
                return True, f"Daemon '{daemon}' is already running."
            return daemon.run(
                debug=debug,
                allow_dirty_run=True,
            )

    success_tuple = run_daemon(
        entry,
        filtered_sysargs,
        daemon_id=_args.get('name', None) if _args else None,
        label=label,
        keep_daemon_output=('--rm' not in sysargs),
    )
    return success_tuple


def daemon_action(**kw) -> SuccessTuple:
    """
    Execute a Meerschaum action as a daemon.

    The keyword arguments are converted back into sysargs (with `daemon=True` and
    `shell=False` forced) and run in a `meerschaum` Python subprocess.

    Parameters
    ----------
    **kw:
        Parsed Meerschaum action arguments. If `action` is not a registered
        Meerschaum action, `allow_shell_job` or `force` must be truthy,
        otherwise the job is rejected.

    Returns
    -------
    `(True, "Success")` if the subprocess exits with code 0,
    otherwise `(False, "Daemon returned code: <rc>")`.
    """
    from meerschaum.utils.packages import run_python_package
    from meerschaum._internal.arguments._parse_arguments import parse_dict_to_sysargs
    from meerschaum.actions import get_action

    kw['daemon'] = True
    kw['shell'] = False

    action = kw.get('action', None)
    if action and get_action(action) is None:
        if not kw.get('allow_shell_job') and not kw.get('force'):
            return False, (
                f"Action '{action}' isn't recognized.\n\n"
                + "  Include `--allow-shell-job`, `--force`, or `-f`\n  "
                + "to enable shell commands to run as Meerschaum jobs."
            )

    sysargs = parse_dict_to_sysargs(kw)
    rc = run_python_package('meerschaum', sysargs, venv=None, debug=False)
    msg = "Success" if rc == 0 else f"Daemon returned code: {rc}"
    return rc == 0, msg


def run_daemon(
    func: Callable[[Any], Any],
    *args,
    daemon_id: Optional[str] = None,
    keep_daemon_output: bool = True,
    allow_dirty_run: bool = False,
    label: Optional[str] = None,
    **kw
) -> Any:
    """
    Execute a function as a daemon.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The target function to run in the background.

    *args:
        Positional arguments passed to `func`.

    daemon_id: Optional[str], default None
        The ID for the new `Daemon`. If `None`, the `Daemon` generates a new name.

    keep_daemon_output: bool, default True
        If `False`, delete the daemon's output directory upon exiting.

    allow_dirty_run: bool, default False
        If `True`, run the daemon even if the `daemon_id` directory exists.

    label: Optional[str], default None
        A label to help identify the daemon.

    **kw:
        Keyword arguments passed to `func`.

    Returns
    -------
    The result of `Daemon.run()`.
    """
    daemon = Daemon(
        func,
        daemon_id=daemon_id,
        target_args=list(args),
        target_kw=kw,
        label=label,
    )
    return daemon.run(
        keep_daemon_output=keep_daemon_output,
        allow_dirty_run=allow_dirty_run,
    )


def get_daemons() -> List[Daemon]:
    """
    Return all existing Daemons, grouped by status.

    Stopped daemons come first (sorted by `process.ended`), then paused daemons
    (sorted by `process.paused`), then running daemons (sorted by `process.began`).
    Daemons missing the relevant timestamp sort last within their group.
    Daemons with any other status are omitted.
    """
    daemons = [Daemon(daemon_id=d_id) for d_id in get_daemon_ids()]

    ### Output order of the status groups, and the `process` timestamp sorting each one.
    status_sort_keys = {
        'stopped': 'ended',
        'paused': 'paused',
        'running': 'began',
    }
    groups = {status: [] for status in status_sort_keys}
    for daemon in daemons:
        status = daemon.status
        if status in groups:
            groups[status].append(daemon)

    ordered_daemons = []
    for status, timestamp_key in status_sort_keys.items():
        ordered_daemons.extend(
            sorted(
                groups[status],
                key=lambda d, k=timestamp_key: d.properties.get('process', {}).get(k, '9999'),
            )
        )
    return ordered_daemons


def get_daemon_ids() -> List[str]:
    """
    Return the IDs of all daemons on disk.

    A daemon ID is the name of a directory under `DAEMON_RESOURCES_PATH` that
    contains a `properties.json` file. IDs are returned in sorted order.
    If `DAEMON_RESOURCES_PATH` does not exist, an empty list is returned.
    """
    import meerschaum.config.paths as paths
    resources_path = paths.DAEMON_RESOURCES_PATH
    ### `os.listdir` raises `FileNotFoundError` when the directory is absent.
    if not resources_path.exists():
        return []
    return [
        daemon_dir
        for daemon_dir in sorted(os.listdir(resources_path))
        if (resources_path / daemon_dir / 'properties.json').exists()
    ]


def get_running_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return a list of currently running daemons.

    If `daemons` is `None`, all daemons from `get_daemons()` are considered.
    """
    if daemons is None:
        daemons = get_daemons()
    return [
        d
        for d in daemons
        if d.status == 'running'
    ]


def get_paused_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return a list of active but paused daemons.

    If `daemons` is `None`, all daemons from `get_daemons()` are considered.
    """
    if daemons is None:
        daemons = get_daemons()
    return [
        d
        for d in daemons
        if d.status == 'paused'
    ]


def get_stopped_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Return a list of stopped daemons.

    If `daemons` is `None`, all daemons from `get_daemons()` are considered.
    """
    if daemons is None:
        daemons = get_daemons()

    return [
        d
        for d in daemons
        if d.status == 'stopped'
    ]


def get_filtered_daemons(
    filter_list: Optional[List[str]] = None,
    warn: bool = False,
) -> List[Daemon]:
    """
    Return a list of `Daemons` filtered by a list of `daemon_ids`.
    Only `Daemons` that exist are returned.

    If `filter_list` is `None` or empty, return all non-hidden `Daemons`
    (from `get_daemons()`).

    Parameters
    ----------
    filter_list: Optional[List[str]], default None
        List of `daemon_ids` to include. If `filter_list` is `None` or empty,
        return all non-hidden `Daemons`.

    warn: bool, default False
        If `True`, emit warnings for non-existent `daemon_ids`.

    Returns
    -------
    A list of Daemon objects.

    """
    if not filter_list:
        daemons = get_daemons()
        return [d for d in daemons if not d.hidden]

    from meerschaum.utils.warnings import warn as _warn
    daemons = []
    for d_id in filter_list:
        try:
            d = Daemon(daemon_id=d_id)
            _exists = d.path.exists()
        except Exception:
            ### Loading failed; treat the ID as non-existent.
            _exists = False
        if not _exists:
            if warn:
                _warn(f"Daemon '{d_id}' does not exist.", stack=False)
            continue
        daemons.append(d)
    return daemons


def running_in_daemon() -> bool:
    """
    Return whether the current process is running in a Daemon context,
    i.e. whether the daemon ID environment variable is set.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    daemon_env_var = STATIC_CONFIG['environment']['daemon_id']
    return daemon_env_var in os.environ


def get_current_daemon() -> Union[Daemon, None]:
    """
    If running within a daemon context, return the corresponding `Daemon`.
    Otherwise return `None`.

    A cached instance from `_daemons` is returned when available; otherwise a
    `Daemon` is built from the daemon ID environment variable.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    daemon_env_var = STATIC_CONFIG['environment']['daemon_id']
    daemon_id = os.environ.get(daemon_env_var, None)
    if daemon_id is None:
        return None

    ### Only construct a `Daemon` on a cache miss: `dict.get(key, Daemon(...))`
    ### would build (and load from disk) a throwaway `Daemon` on every call.
    daemon = _daemons.get(daemon_id, None)
    if daemon is None:
        daemon = Daemon(daemon_id=daemon_id)
    return daemon
def daemon_action(**kw) -> SuccessTuple:
    """
    Execute a Meerschaum action as a daemon.

    The keyword arguments are converted back into sysargs (with `daemon=True` and
    `shell=False` forced) and run in a `meerschaum` Python subprocess.

    Parameters
    ----------
    **kw:
        Parsed Meerschaum action arguments. If `action` is not a registered
        Meerschaum action, `allow_shell_job` or `force` must be truthy,
        otherwise the job is rejected.

    Returns
    -------
    `(True, "Success")` if the subprocess exits with code 0,
    otherwise `(False, "Daemon returned code: <rc>")`.
    """
    from meerschaum.utils.packages import run_python_package
    from meerschaum._internal.arguments._parse_arguments import parse_dict_to_sysargs
    from meerschaum.actions import get_action

    kw['daemon'] = True
    kw['shell'] = False

    action = kw.get('action', None)
    if action and get_action(action) is None:
        if not kw.get('allow_shell_job') and not kw.get('force'):
            return False, (
                f"Action '{action}' isn't recognized.\n\n"
                + "  Include `--allow-shell-job`, `--force`, or `-f`\n  "
                + "to enable shell commands to run as Meerschaum jobs."
            )

    sysargs = parse_dict_to_sysargs(kw)
    rc = run_python_package('meerschaum', sysargs, venv=None, debug=False)
    msg = "Success" if rc == 0 else f"Daemon returned code: {rc}"
    return rc == 0, msg
Execute a Meerschaum action as a daemon.
def daemon_entry(sysargs: Optional[List[str]] = None) -> SuccessTuple:
    """Parse sysargs and execute a Meerschaum action as a daemon.

    If `--name` or `--job-name` is given and a daemon with that name can be loaded,
    the existing daemon is re-run (or reported as already running) when its stored
    arguments match the new ones or when no action is provided.
    Otherwise a new daemon is created via `run_daemon()`.

    Parameters
    ----------
    sysargs: Optional[List[str]], default None
        The command line arguments used in a Meerschaum action.
        `None` is treated as an empty list.

    Returns
    -------
    A SuccessTuple.
    """
    import shlex
    from meerschaum._internal.entry import entry

    ### Treat the default `None` as "no arguments" instead of failing on the `in` checks.
    if sysargs is None:
        sysargs = []

    _args = {}
    if '--name' in sysargs or '--job-name' in sysargs:
        from meerschaum._internal.arguments._parse_arguments import parse_arguments
        _args = parse_arguments(sysargs)
    filtered_sysargs = [arg for arg in sysargs if arg not in ('-d', '--daemon')]
    try:
        label = shlex.join(filtered_sysargs) if sysargs else None
    except Exception:
        ### Best-effort fallback if shell-quoting the arguments fails.
        label = ' '.join(filtered_sysargs) if sysargs else None

    name = _args.get('name', None)
    daemon = None
    if name:
        try:
            daemon = Daemon(daemon_id=name)
        except Exception:
            ### No existing daemon could be loaded under this name; a new one is created below.
            daemon = None

    if daemon is not None:
        ### `parse_arguments` is bound here: `daemon` is only set when `_args` was parsed above.
        existing_sysargs = daemon.properties['target']['args'][0]
        existing_kwargs = parse_arguments(existing_sysargs)

        ### Remove sysargs because flags are aliased.
        _ = _args.pop('daemon', None)
        _ = _args.pop('sysargs', None)
        _ = _args.pop('filtered_sysargs', None)
        debug = _args.pop('debug', None)
        _args['sub_args'] = sorted(_args.get('sub_args', []))
        _ = existing_kwargs.pop('daemon', None)
        _ = existing_kwargs.pop('sysargs', None)
        _ = existing_kwargs.pop('filtered_sysargs', None)
        _ = existing_kwargs.pop('debug', None)
        existing_kwargs['sub_args'] = sorted(existing_kwargs.get('sub_args', []))

        ### Only run if the kwargs equal or no actions are provided.
        if existing_kwargs == _args or not _args.get('action', []):
            if daemon.status == 'running':
                return True, f"Daemon '{daemon}' is already running."
            return daemon.run(
                debug=debug,
                allow_dirty_run=True,
            )

    success_tuple = run_daemon(
        entry,
        filtered_sysargs,
        daemon_id=_args.get('name', None) if _args else None,
        label=label,
        keep_daemon_output=('--rm' not in sysargs),
    )
    return success_tuple
Parse sysargs and execute a Meerschaum action as a daemon.
Parameters
- sysargs (Optional[List[str]], default None): The command line arguments used in a Meerschaum action.
Returns
- A SuccessTuple.
def get_daemons() -> List[Daemon]:
    """
    Return all existing Daemons, grouped by status.

    Stopped daemons come first (sorted by `process.ended`), then paused daemons
    (sorted by `process.paused`), then running daemons (sorted by `process.began`).
    Daemons missing the relevant timestamp sort last within their group.
    Daemons with any other status are omitted.
    """
    daemons = [Daemon(daemon_id=d_id) for d_id in get_daemon_ids()]

    ### Output order of the status groups, and the `process` timestamp sorting each one.
    status_sort_keys = {
        'stopped': 'ended',
        'paused': 'paused',
        'running': 'began',
    }
    groups = {status: [] for status in status_sort_keys}
    for daemon in daemons:
        status = daemon.status
        if status in groups:
            groups[status].append(daemon)

    ordered_daemons = []
    for status, timestamp_key in status_sort_keys.items():
        ordered_daemons.extend(
            sorted(
                groups[status],
                key=lambda d, k=timestamp_key: d.properties.get('process', {}).get(k, '9999'),
            )
        )
    return ordered_daemons
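Return all existing Daemons: stopped daemons first (sorted by end time), then paused daemons (sorted by pause time), then running daemons (sorted by start time).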
def get_daemon_ids() -> List[str]:
    """
    Return the IDs of all daemons on disk.

    A daemon ID is the name of a directory under `DAEMON_RESOURCES_PATH` that
    contains a `properties.json` file. IDs are returned in sorted order.
    If `DAEMON_RESOURCES_PATH` does not exist, an empty list is returned.
    """
    import meerschaum.config.paths as paths
    resources_path = paths.DAEMON_RESOURCES_PATH
    ### `os.listdir` raises `FileNotFoundError` when the directory is absent.
    if not resources_path.exists():
        return []
    return [
        daemon_dir
        for daemon_dir in sorted(os.listdir(resources_path))
        if (resources_path / daemon_dir / 'properties.json').exists()
    ]
Return the IDs of all daemons on disk.
def get_running_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Select the daemons whose current status is `'running'`.

    Parameters
    ----------
    daemons: Optional[List[Daemon]], default None
        The daemons to filter. When `None`, every daemon from `get_daemons()`
        is considered.

    Returns
    -------
    The running daemons, in their original order.
    """
    candidates = get_daemons() if daemons is None else daemons
    return list(filter(lambda candidate: candidate.status == 'running', candidates))
Return a list of currently running daemons.
def get_stopped_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Select the daemons whose current status is `'stopped'`.

    Parameters
    ----------
    daemons: Optional[List[Daemon]], default None
        The daemons to filter. When `None`, every daemon from `get_daemons()`
        is considered.

    Returns
    -------
    The stopped daemons, in their original order.
    """
    pool = daemons
    if pool is None:
        pool = get_daemons()

    stopped = []
    for member in pool:
        if member.status == 'stopped':
            stopped.append(member)
    return stopped
Return a list of stopped daemons.
def get_paused_daemons(daemons: Optional[List[Daemon]] = None) -> List[Daemon]:
    """
    Select the daemons that are active but suspended (status `'paused'`).

    Parameters
    ----------
    daemons: Optional[List[Daemon]], default None
        The daemons to filter. When `None`, every daemon from `get_daemons()`
        is considered.

    Returns
    -------
    The paused daemons, in their original order.
    """
    source = get_daemons() if daemons is None else daemons
    return [member for member in source if member.status == 'paused']
Return a list of active but paused daemons.
def get_filtered_daemons(
    filter_list: Optional[List[str]] = None,
    warn: bool = False,
) -> List[Daemon]:
    """
    Return a list of `Daemons` filtered by a list of `daemon_ids`.
    Only `Daemons` that exist are returned.

    If `filter_list` is `None` or empty, return all non-hidden `Daemons`
    (from `get_daemons()`).

    Parameters
    ----------
    filter_list: Optional[List[str]], default None
        List of `daemon_ids` to include. If `filter_list` is `None` or empty,
        return all non-hidden `Daemons`.

    warn: bool, default False
        If `True`, emit warnings for non-existent `daemon_ids`.

    Returns
    -------
    A list of Daemon objects.

    """
    if not filter_list:
        daemons = get_daemons()
        return [d for d in daemons if not d.hidden]

    from meerschaum.utils.warnings import warn as _warn
    daemons = []
    for d_id in filter_list:
        try:
            d = Daemon(daemon_id=d_id)
            _exists = d.path.exists()
        except Exception:
            ### Loading failed; treat the ID as non-existent.
            _exists = False
        if not _exists:
            if warn:
                _warn(f"Daemon '{d_id}' does not exist.", stack=False)
            continue
        daemons.append(d)
    return daemons
Return a list of Daemons filtered by a list of daemon_ids.
Only Daemons that exist are returned.
If filter_list is None or empty, return all non-hidden Daemons (from get_daemons()).
Parameters
- filter_list (Optional[List[str]], default None):
List of daemon_ids to include. If filter_list is None or empty, return all non-hidden Daemons.
- warn (bool, default False):
If True, emit warnings for non-existent daemon_ids.
Returns
- A list of Daemon objects.
def get_new_daemon_name() -> str:
    """
    Generate a new random name until a unique one is found
    (up to ~6000 maximum possibilities).

    A name is unique if no entry with that name exists in `DAEMON_RESOURCES_PATH`.

    NOTE(review): the loop is unbounded; if every possible name is already taken,
    this never returns.
    """
    import meerschaum.config.paths as paths
    ### A set gives O(1) membership checks on each retry.
    existing_names = (
        set(os.listdir(paths.DAEMON_RESOURCES_PATH))
        if paths.DAEMON_RESOURCES_PATH.exists()
        else set()
    )
    while True:
        _name = generate_random_name()
        if _name not in existing_names:
            return _name
Generate a new random name until a unique one is found (up to ~6000 maximum possibilities).
def run_daemon(
    func: Callable[[Any], Any],
    *args,
    daemon_id: Optional[str] = None,
    keep_daemon_output: bool = True,
    allow_dirty_run: bool = False,
    label: Optional[str] = None,
    **kw
) -> Any:
    """
    Execute a function as a daemon.

    Parameters
    ----------
    func: Callable[[Any], Any]
        The target function to run in the background.

    *args:
        Positional arguments passed to `func`.

    daemon_id: Optional[str], default None
        The ID for the new `Daemon`. If `None`, the `Daemon` generates a new name.

    keep_daemon_output: bool, default True
        If `False`, delete the daemon's output directory upon exiting.

    allow_dirty_run: bool, default False
        If `True`, run the daemon even if the `daemon_id` directory exists.

    label: Optional[str], default None
        A label to help identify the daemon.

    **kw:
        Keyword arguments passed to `func`.

    Returns
    -------
    The result of `Daemon.run()`.
    """
    daemon = Daemon(
        func,
        daemon_id=daemon_id,
        target_args=list(args),
        target_kw=kw,
        label=label,
    )
    return daemon.run(
        keep_daemon_output=keep_daemon_output,
        allow_dirty_run=allow_dirty_run,
    )
Execute a function as a daemon.
def running_in_daemon() -> bool:
    """
    Return whether the current process is running in a Daemon context.

    The check is process-wide: it only tests whether the daemon ID environment
    variable (`STATIC_CONFIG['environment']['daemon_id']`) is set in `os.environ`.
    """
    from meerschaum._internal.static import STATIC_CONFIG
    daemon_env_var = STATIC_CONFIG['environment']['daemon_id']
    return daemon_env_var in os.environ
Return whether the current process is running in a Daemon context.
42class Daemon: 43 """ 44 Daemonize Python functions into background processes. 45 46 Examples 47 -------- 48 >>> import meerschaum as mrsm 49 >>> from meerschaum.utils.daemons import Daemon 50 >>> daemon = Daemon(print, ('hi',)) 51 >>> success, msg = daemon.run() 52 >>> print(daemon.log_text) 53 54 2024-07-29 18:03 | hi 55 2024-07-29 18:03 | 56 >>> daemon.run(allow_dirty_run=True) 57 >>> print(daemon.log_text) 58 59 2024-07-29 18:03 | hi 60 2024-07-29 18:03 | 61 2024-07-29 18:05 | hi 62 2024-07-29 18:05 | 63 >>> mrsm.pprint(daemon.properties) 64 { 65 'label': 'print', 66 'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}}, 67 'result': None, 68 'process': {'ended': '2024-07-29T18:03:33.752806'} 69 } 70 71 """ 72 73 def __new__( 74 cls, 75 *args, 76 daemon_id: Optional[str] = None, 77 **kw 78 ): 79 """ 80 If a daemon_id is provided and already exists, read from its pickle file. 81 """ 82 instance = super(Daemon, cls).__new__(cls) 83 if daemon_id is not None: 84 instance.daemon_id = daemon_id 85 if instance.pickle_path.exists(): 86 instance = instance.read_pickle() 87 return instance 88 89 @classmethod 90 def from_properties_file(cls, daemon_id: str) -> Daemon: 91 """ 92 Return a Daemon from a properties dictionary. 
93 """ 94 properties_path = cls._get_properties_path_from_daemon_id(daemon_id) 95 if not properties_path.exists(): 96 raise OSError(f"Properties file '{properties_path}' does not exist.") 97 98 try: 99 with open(properties_path, 'r', encoding='utf-8') as f: 100 properties = json.load(f) 101 except Exception: 102 properties = {} 103 104 if not properties: 105 raise ValueError(f"No properties could be read for daemon '{daemon_id}'.") 106 107 daemon_id = properties_path.parent.name 108 target_cf = properties.get('target', {}) 109 target_module_name = target_cf.get('module', None) 110 target_function_name = target_cf.get('name', None) 111 target_args = target_cf.get('args', None) 112 target_kw = target_cf.get('kw', None) 113 label = properties.get('label', None) 114 115 if None in [ 116 target_module_name, 117 target_function_name, 118 target_args, 119 target_kw, 120 ]: 121 raise ValueError("Missing target function information.") 122 123 target_module = importlib.import_module(target_module_name) 124 target_function = getattr(target_module, target_function_name) 125 126 return Daemon( 127 daemon_id=daemon_id, 128 target=target_function, 129 target_args=target_args, 130 target_kw=target_kw, 131 properties=properties, 132 label=label, 133 ) 134 135 136 def __init__( 137 self, 138 target: Optional[Callable[[Any], Any]] = None, 139 target_args: Union[List[Any], Tuple[Any], None] = None, 140 target_kw: Optional[Dict[str, Any]] = None, 141 env: Optional[Dict[str, str]] = None, 142 daemon_id: Optional[str] = None, 143 label: Optional[str] = None, 144 properties: Optional[Dict[str, Any]] = None, 145 pickle: bool = True, 146 ): 147 """ 148 Parameters 149 ---------- 150 target: Optional[Callable[[Any], Any]], default None, 151 The function to execute in a child process. 152 153 target_args: Union[List[Any], Tuple[Any], None], default None 154 Positional arguments to pass to the target function. 
155 156 target_kw: Optional[Dict[str, Any]], default None 157 Keyword arguments to pass to the target function. 158 159 env: Optional[Dict[str, str]], default None 160 If provided, set these environment variables in the daemon process. 161 162 daemon_id: Optional[str], default None 163 Build a `Daemon` from an existing `daemon_id`. 164 If `daemon_id` is provided, other arguments are ignored and are derived 165 from the existing pickled `Daemon`. 166 167 label: Optional[str], default None 168 Label string to help identifiy a daemon. 169 If `None`, use the function name instead. 170 171 properties: Optional[Dict[str, Any]], default None 172 Override reading from the properties JSON by providing an existing dictionary. 173 """ 174 _pickle = self.__dict__.get('_pickle', False) 175 if daemon_id is not None: 176 self.daemon_id = daemon_id 177 if not self.pickle_path.exists() and not target and ('target' not in self.__dict__): 178 179 if not self.properties_path.exists(): 180 raise Exception( 181 f"Daemon '{self.daemon_id}' does not exist. " 182 + "Pass a target to create a new Daemon." 183 ) 184 185 try: 186 new_daemon = self.from_properties_file(daemon_id) 187 except Exception: 188 new_daemon = None 189 190 if new_daemon is not None: 191 new_daemon.write_pickle() 192 target = new_daemon.target 193 target_args = new_daemon.target_args 194 target_kw = new_daemon.target_kw 195 label = new_daemon.label 196 self._properties = new_daemon.properties 197 else: 198 try: 199 self.properties_path.unlink() 200 except Exception: 201 pass 202 203 raise Exception( 204 f"Could not recover daemon '{self.daemon_id}' " 205 + "from its properties file." 206 ) 207 208 if 'target' not in self.__dict__: 209 if target is None: 210 error("Cannot create a Daemon without a target.") 211 self.target = target 212 213 self.pickle = pickle 214 215 ### NOTE: We have to check self.__dict__ in case we un-pickling. 
216 if '_target_args' not in self.__dict__: 217 self._target_args = target_args 218 if '_target_kw' not in self.__dict__: 219 self._target_kw = target_kw 220 221 if 'label' not in self.__dict__: 222 if label is None: 223 label = ( 224 self.target.__name__ if '__name__' in self.target.__dir__() 225 else str(self.target) 226 ) 227 self.label = label 228 elif label is not None: 229 self.label = label 230 231 if 'daemon_id' not in self.__dict__: 232 self.daemon_id = get_new_daemon_name() 233 if '_properties' not in self.__dict__: 234 self._properties = properties 235 elif properties: 236 if self._properties is None: 237 self._properties = {} 238 self._properties.update(properties) 239 if self._properties is None: 240 self._properties = {} 241 242 self._properties.update({'label': self.label}) 243 if env: 244 self._properties.update({'env': env}) 245 246 ### Instantiate the process and if it doesn't exist, make sure the PID is removed. 247 _ = self.process 248 249 250 def _run_exit( 251 self, 252 keep_daemon_output: bool = True, 253 allow_dirty_run: bool = False, 254 ) -> Any: 255 """Run the daemon's target function. 256 NOTE: This WILL EXIT the parent process! 257 258 Parameters 259 ---------- 260 keep_daemon_output: bool, default True 261 If `False`, delete the daemon's output directory upon exiting. 262 263 allow_dirty_run, bool, default False: 264 If `True`, run the daemon, even if the `daemon_id` directory exists. 265 This option is dangerous because if the same `daemon_id` runs twice, 266 the last to finish will overwrite the output of the first. 267 268 Returns 269 ------- 270 Nothing — this will exit the parent process. 
271 """ 272 import platform 273 import sys 274 import os 275 import traceback 276 from meerschaum.utils.warnings import warn 277 from meerschaum.config import get_config 278 daemon = attempt_import('daemon') 279 lines = get_config('jobs', 'terminal', 'lines') 280 columns = get_config('jobs', 'terminal', 'columns') 281 282 if platform.system() == 'Windows': 283 return False, "Windows is no longer supported." 284 285 self._setup(allow_dirty_run) 286 287 _daemons.append(self) 288 289 logs_cf = self.properties.get('logs', {}) 290 log_refresh_seconds = logs_cf.get('refresh_files_seconds', None) 291 if log_refresh_seconds is None: 292 log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds') 293 write_timestamps = logs_cf.get('write_timestamps', None) 294 if write_timestamps is None: 295 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 296 297 self._log_refresh_timer = RepeatTimer( 298 log_refresh_seconds, 299 partial(self.rotating_log.refresh_files, start_interception=write_timestamps), 300 ) 301 302 capture_stdin = logs_cf.get('stdin', True) 303 cwd = self.properties.get('cwd', os.getcwd()) 304 305 ### NOTE: The SIGINT handler has been removed so that child processes may handle 306 ### KeyboardInterrupts themselves. 307 ### The previous aggressive approach was redundant because of the SIGTERM handler. 
308 self._daemon_context = daemon.DaemonContext( 309 pidfile=self.pid_lock, 310 stdout=self.rotating_log, 311 stderr=self.rotating_log, 312 stdin=(self.stdin_file if capture_stdin else None), 313 working_directory=cwd, 314 detach_process=True, 315 files_preserve=list(self.rotating_log.subfile_objects.values()), 316 signal_map={ 317 signal.SIGTERM: self._handle_sigterm, 318 }, 319 ) 320 321 if capture_stdin and sys.stdin is None: 322 raise OSError("Cannot daemonize without stdin.") 323 324 try: 325 os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns)) 326 with self._daemon_context: 327 if capture_stdin: 328 sys.stdin = self.stdin_file 329 _ = os.environ.pop(STATIC_CONFIG['environment']['systemd_stdin_path'], None) 330 os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id 331 os.environ['PYTHONUNBUFFERED'] = '1' 332 333 ### Allow the user to override environment variables. 334 env = self.properties.get('env', {}) 335 if env and isinstance(env, dict): 336 os.environ.update({str(k): str(v) for k, v in env.items()}) 337 338 self.rotating_log.refresh_files(start_interception=True) 339 result = None 340 try: 341 with open(self.pid_path, 'w+', encoding='utf-8') as f: 342 f.write(str(os.getpid())) 343 344 ### NOTE: The timer fails to start for remote actions to localhost. 
345 try: 346 if not self._log_refresh_timer.is_running(): 347 self._log_refresh_timer.start() 348 except Exception: 349 pass 350 351 self.properties['result'] = None 352 self._capture_process_timestamp('began') 353 result = self.target(*self.target_args, **self.target_kw) 354 self.properties['result'] = result 355 except (BrokenPipeError, KeyboardInterrupt, SystemExit): 356 result = False, traceback.format_exc() 357 except Exception as e: 358 warn( 359 f"Exception in daemon target function: {traceback.format_exc()}", 360 ) 361 result = False, str(e) 362 finally: 363 _results[self.daemon_id] = result 364 self.properties['result'] = result 365 366 if keep_daemon_output: 367 self._capture_process_timestamp('ended') 368 else: 369 self.cleanup() 370 371 self._log_refresh_timer.cancel() 372 try: 373 if self.pid is None and self.pid_path.exists(): 374 self.pid_path.unlink() 375 except Exception: 376 pass 377 378 if is_success_tuple(result): 379 try: 380 mrsm.pprint(result) 381 except BrokenPipeError: 382 pass 383 384 except Exception: 385 daemon_error = traceback.format_exc() 386 import meerschaum.config.paths as paths 387 with open(paths.DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f: 388 f.write( 389 f"Error in Daemon '{self}':\n\n" 390 f"{sys.stdin=}\n" 391 f"{self.stdin_file_path=}\n" 392 f"{self.stdin_file_path.exists()=}\n\n" 393 f"{daemon_error}\n\n" 394 ) 395 warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}") 396 397 def _capture_process_timestamp( 398 self, 399 process_key: str, 400 write_properties: bool = True, 401 ) -> None: 402 """ 403 Record the current timestamp to the parameters `process:<process_key>`. 404 405 Parameters 406 ---------- 407 process_key: str 408 Under which key to store the timestamp. 409 410 write_properties: bool, default True 411 If `True` persist the properties to disk immediately after capturing the timestamp. 
412 """ 413 if 'process' not in self.properties: 414 self.properties['process'] = {} 415 416 if process_key not in ('began', 'ended', 'paused', 'stopped'): 417 raise ValueError(f"Invalid key '{process_key}'.") 418 419 self.properties['process'][process_key] = ( 420 datetime.now(timezone.utc).replace(tzinfo=None).isoformat() 421 ) 422 if write_properties: 423 self.write_properties() 424 425 def run( 426 self, 427 keep_daemon_output: bool = True, 428 allow_dirty_run: bool = False, 429 wait: bool = False, 430 timeout: Union[int, float] = 4, 431 debug: bool = False, 432 ) -> SuccessTuple: 433 """Run the daemon as a child process and continue executing the parent. 434 435 Parameters 436 ---------- 437 keep_daemon_output: bool, default True 438 If `False`, delete the daemon's output directory upon exiting. 439 440 allow_dirty_run: bool, default False 441 If `True`, run the daemon, even if the `daemon_id` directory exists. 442 This option is dangerous because if the same `daemon_id` runs concurrently, 443 the last to finish will overwrite the output of the first. 444 445 wait: bool, default True 446 If `True`, block until `Daemon.status` is running (or the timeout expires). 447 448 timeout: Union[int, float], default 4 449 If `wait` is `True`, block for up to `timeout` seconds before returning a failure. 450 451 Returns 452 ------- 453 A SuccessTuple indicating success. 454 455 """ 456 import platform 457 if platform.system() == 'Windows': 458 return False, "Cannot run background jobs on Windows." 459 460 ### The daemon might exist and be paused. 461 if self.status == 'paused': 462 return self.resume() 463 464 self._remove_stop_file() 465 if self.status == 'running': 466 return True, f"Daemon '{self}' is already running." 
467 468 self.mkdir_if_not_exists(allow_dirty_run) 469 _write_pickle_success_tuple = self.write_pickle() 470 if not _write_pickle_success_tuple[0]: 471 return _write_pickle_success_tuple 472 473 _launch_daemon_code = ( 474 "from meerschaum.utils.daemon import Daemon, _daemons; " 475 f"daemon = Daemon(daemon_id='{self.daemon_id}'); " 476 f"_daemons['{self.daemon_id}'] = daemon; " 477 f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, " 478 "allow_dirty_run=True)" 479 ) 480 env = dict(os.environ) 481 _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env) 482 msg = ( 483 "Success" 484 if _launch_success_bool 485 else f"Failed to start daemon '{self.daemon_id}'." 486 ) 487 if not wait or not _launch_success_bool: 488 return _launch_success_bool, msg 489 490 timeout = self.get_timeout_seconds(timeout) 491 check_timeout_interval = self.get_check_timeout_interval_seconds() 492 493 if not timeout: 494 success = self.status == 'running' 495 msg = "Success" if success else f"Failed to run daemon '{self.daemon_id}'." 496 if success: 497 self._capture_process_timestamp('began') 498 return success, msg 499 500 begin = time.perf_counter() 501 while (time.perf_counter() - begin) < timeout: 502 if self.status == 'running': 503 self._capture_process_timestamp('began') 504 return True, "Success" 505 time.sleep(check_timeout_interval) 506 507 return False, ( 508 f"Failed to start daemon '{self.daemon_id}' within {timeout} second" 509 + ('s' if timeout != 1 else '') + '.' 510 ) 511 512 513 def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple: 514 """ 515 Forcibly terminate a running daemon. 516 Sends a SIGTERM signal to the process. 517 518 Parameters 519 ---------- 520 timeout: Optional[int], default 3 521 How many seconds to wait for the process to terminate. 522 523 Returns 524 ------- 525 A SuccessTuple indicating success. 
526 """ 527 if self.status != 'paused': 528 success, msg = self._send_signal(signal.SIGTERM, timeout=timeout) 529 if success: 530 self._write_stop_file('kill') 531 self.stdin_file.close() 532 self._remove_blocking_stdin_file() 533 return success, msg 534 535 if self.status == 'stopped': 536 self._write_stop_file('kill') 537 self.stdin_file.close() 538 self._remove_blocking_stdin_file() 539 return True, "Process has already stopped." 540 541 psutil = attempt_import('psutil') 542 process = self.process 543 try: 544 process.terminate() 545 process.kill() 546 process.wait(timeout=timeout) 547 except Exception as e: 548 return False, f"Failed to kill job {self} ({process}) with exception: {e}" 549 550 try: 551 if process.status(): 552 return False, "Failed to stop daemon '{self}' ({process})." 553 except psutil.NoSuchProcess: 554 pass 555 556 if self.pid_path.exists(): 557 try: 558 self.pid_path.unlink() 559 except Exception: 560 pass 561 562 self._write_stop_file('kill') 563 self.stdin_file.close() 564 self._remove_blocking_stdin_file() 565 return True, "Success" 566 567 def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple: 568 """Gracefully quit a running daemon.""" 569 if self.status == 'paused': 570 return self.kill(timeout) 571 572 signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout) 573 if signal_success: 574 self._write_stop_file('quit') 575 self.stdin_file.close() 576 self._remove_blocking_stdin_file() 577 return signal_success, signal_msg 578 579 def pause( 580 self, 581 timeout: Union[int, float, None] = None, 582 check_timeout_interval: Union[float, int, None] = None, 583 ) -> SuccessTuple: 584 """ 585 Pause the daemon if it is running. 586 587 Parameters 588 ---------- 589 timeout: Union[float, int, None], default None 590 The maximum number of seconds to wait for a process to suspend. 
591 592 check_timeout_interval: Union[float, int, None], default None 593 The number of seconds to wait between checking if the process is still running. 594 595 Returns 596 ------- 597 A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended. 598 """ 599 self._remove_blocking_stdin_file() 600 601 if self.process is None: 602 return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused." 603 604 if self.status == 'paused': 605 return True, f"Daemon '{self.daemon_id}' is already paused." 606 607 self._write_stop_file('pause') 608 self.stdin_file.close() 609 self._remove_blocking_stdin_file() 610 try: 611 self.process.suspend() 612 except Exception as e: 613 return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}" 614 615 timeout = self.get_timeout_seconds(timeout) 616 check_timeout_interval = self.get_check_timeout_interval_seconds( 617 check_timeout_interval 618 ) 619 620 psutil = attempt_import('psutil') 621 622 if not timeout: 623 try: 624 success = self.process.status() == 'stopped' 625 except psutil.NoSuchProcess: 626 success = True 627 msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'." 628 if success: 629 self._capture_process_timestamp('paused') 630 return success, msg 631 632 begin = time.perf_counter() 633 while (time.perf_counter() - begin) < timeout: 634 try: 635 if self.process.status() == 'stopped': 636 self._capture_process_timestamp('paused') 637 return True, "Success" 638 except psutil.NoSuchProcess as e: 639 return False, f"Process exited unexpectedly. Was it killed?\n{e}" 640 time.sleep(check_timeout_interval) 641 642 return False, ( 643 f"Failed to pause daemon '{self.daemon_id}' within {timeout} second" 644 + ('s' if timeout != 1 else '') + '.' 645 ) 646 647 def resume( 648 self, 649 timeout: Union[int, float, None] = None, 650 check_timeout_interval: Union[float, int, None] = None, 651 ) -> SuccessTuple: 652 """ 653 Resume the daemon if it is paused. 
654 655 Parameters 656 ---------- 657 timeout: Union[float, int, None], default None 658 The maximum number of seconds to wait for a process to resume. 659 660 check_timeout_interval: Union[float, int, None], default None 661 The number of seconds to wait between checking if the process is still stopped. 662 663 Returns 664 ------- 665 A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed. 666 """ 667 if self.status == 'running': 668 return True, f"Daemon '{self.daemon_id}' is already running." 669 670 if self.status == 'stopped': 671 return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed." 672 673 self._remove_stop_file() 674 try: 675 if self.process is None: 676 return False, f"Cannot resume daemon '{self.daemon_id}'." 677 678 self.process.resume() 679 except Exception as e: 680 return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}" 681 682 timeout = self.get_timeout_seconds(timeout) 683 check_timeout_interval = self.get_check_timeout_interval_seconds( 684 check_timeout_interval 685 ) 686 687 if not timeout: 688 success = self.status == 'running' 689 msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'." 690 if success: 691 self._capture_process_timestamp('began') 692 return success, msg 693 694 begin = time.perf_counter() 695 while (time.perf_counter() - begin) < timeout: 696 if self.status == 'running': 697 self._capture_process_timestamp('began') 698 return True, "Success" 699 time.sleep(check_timeout_interval) 700 701 return False, ( 702 f"Failed to resume daemon '{self.daemon_id}' within {timeout} second" 703 + ('s' if timeout != 1 else '') + '.' 704 ) 705 706 def _write_stop_file(self, action: str) -> SuccessTuple: 707 """Write the stop file timestamp and action.""" 708 if action not in ('quit', 'kill', 'pause'): 709 return False, f"Unsupported action '{action}'." 
710 711 if not self.stop_path.parent.exists(): 712 self.stop_path.parent.mkdir(parents=True, exist_ok=True) 713 714 with open(self.stop_path, 'w+', encoding='utf-8') as f: 715 json.dump( 716 { 717 'stop_time': datetime.now(timezone.utc).isoformat(), 718 'action': action, 719 }, 720 f 721 ) 722 723 return True, "Success" 724 725 def _remove_stop_file(self) -> SuccessTuple: 726 """Remove the stop file""" 727 if not self.stop_path.exists(): 728 return True, "Stop file does not exist." 729 730 try: 731 self.stop_path.unlink() 732 except Exception as e: 733 return False, f"Failed to remove stop file:\n{e}" 734 735 return True, "Success" 736 737 def _read_stop_file(self) -> Dict[str, Any]: 738 """ 739 Read the stop file if it exists. 740 """ 741 if not self.stop_path.exists(): 742 return {} 743 744 try: 745 with open(self.stop_path, 'r', encoding='utf-8') as f: 746 data = json.load(f) 747 return data 748 except Exception: 749 return {} 750 751 def _remove_blocking_stdin_file(self) -> mrsm.SuccessTuple: 752 """ 753 Remove the blocking STDIN file if it exists. 754 """ 755 try: 756 if self.blocking_stdin_file_path.exists(): 757 self.blocking_stdin_file_path.unlink() 758 except Exception as e: 759 return False, str(e) 760 761 return True, "Success" 762 763 def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None: 764 """ 765 Handle `SIGTERM` within the `Daemon` context. 766 This method is injected into the `DaemonContext`. 767 """ 768 from meerschaum.utils.process import signal_handler 769 from meerschaum.utils.threading import request_stop, interrupt_threads 770 signal_handler(signal_number, stack_frame) 771 772 ### Tell cooperative loops (e.g. `sync pipes`) to stop, then actively unwind 773 ### any worker threads so they cannot keep the process alive as a zombie. 
774 request_stop() 775 interrupt_threads(SystemExit) 776 777 timer = self.__dict__.get('_log_refresh_timer', None) 778 if timer is not None: 779 timer.cancel() 780 781 daemon_context = self.__dict__.get('_daemon_context', None) 782 if daemon_context is not None: 783 daemon_context.close() 784 785 _close_pools() 786 raise SystemExit(0) 787 788 def _send_signal( 789 self, 790 signal_to_send, 791 timeout: Union[float, int, None] = None, 792 check_timeout_interval: Union[float, int, None] = None, 793 ) -> SuccessTuple: 794 """Send a signal to the daemon process. 795 796 Parameters 797 ---------- 798 signal_to_send: 799 The signal the send to the daemon, e.g. `signals.SIGINT`. 800 801 timeout: Union[float, int, None], default None 802 The maximum number of seconds to wait for a process to terminate. 803 804 check_timeout_interval: Union[float, int, None], default None 805 The number of seconds to wait between checking if the process is still running. 806 807 Returns 808 ------- 809 A SuccessTuple indicating success. 810 """ 811 try: 812 pid = self.pid 813 if pid is None: 814 return ( 815 False, 816 f"Daemon '{self.daemon_id}' is not running, " 817 + f"cannot send signal '{signal_to_send}'." 818 ) 819 820 os.kill(pid, signal_to_send) 821 except Exception: 822 return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}" 823 824 timeout = self.get_timeout_seconds(timeout) 825 check_timeout_interval = self.get_check_timeout_interval_seconds( 826 check_timeout_interval 827 ) 828 829 if not timeout: 830 return True, f"Successfully sent '{signal_to_send}' to daemon '{self.daemon_id}'." 831 832 begin = time.perf_counter() 833 while (time.perf_counter() - begin) < timeout: 834 if not self.status == 'running': 835 return True, "Success" 836 time.sleep(check_timeout_interval) 837 838 return False, ( 839 f"Failed to stop daemon '{self.daemon_id}' (PID: {pid}) within {timeout} second" 840 + ('s' if timeout != 1 else '') + '.' 
841 ) 842 843 def mkdir_if_not_exists(self, allow_dirty_run: bool = False): 844 """Create the Daemon's directory. 845 If `allow_dirty_run` is `False` and the directory already exists, 846 raise a `FileExistsError`. 847 """ 848 try: 849 self.path.mkdir(parents=True, exist_ok=True) 850 _already_exists = any(os.scandir(self.path)) 851 except FileExistsError: 852 _already_exists = True 853 854 if _already_exists and not allow_dirty_run: 855 error( 856 f"Daemon '{self.daemon_id}' already exists. " + 857 "To allow this daemon to run, do one of the following:\n" 858 + " - Execute `daemon.cleanup()`.\n" 859 + f" - Delete the directory '{self.path}'.\n" 860 + " - Pass `allow_dirty_run=True` to `daemon.run()`.\n", 861 FileExistsError, 862 ) 863 864 @property 865 def process(self) -> Union['psutil.Process', None]: 866 """ 867 Return the psutil process for the Daemon. 868 """ 869 psutil = attempt_import('psutil') 870 pid = self.pid 871 if pid is None: 872 return None 873 if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid): 874 try: 875 self._process = psutil.Process(int(pid)) 876 process_exists = True 877 except Exception: 878 process_exists = False 879 if not process_exists: 880 _ = self.__dict__.pop('_process', None) 881 try: 882 if self.pid_path.exists(): 883 self.pid_path.unlink() 884 except Exception: 885 pass 886 return None 887 return self._process 888 889 @property 890 def status(self) -> str: 891 """ 892 Return the running status of this Daemon. 
893 """ 894 if self.process is None: 895 return 'stopped' 896 897 psutil = attempt_import('psutil', lazy=False) 898 try: 899 if self.process.status() == 'stopped': 900 return 'paused' 901 if self.process.status() == 'zombie': 902 raise psutil.NoSuchProcess(self.process.pid) 903 except (psutil.NoSuchProcess, AttributeError): 904 if self.pid_path.exists(): 905 try: 906 self.pid_path.unlink() 907 except Exception: 908 pass 909 return 'stopped' 910 911 return 'running' 912 913 @classmethod 914 def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path: 915 """ 916 Return a Daemon's path from its `daemon_id`. 917 """ 918 import meerschaum.config.paths as paths 919 return paths.DAEMON_RESOURCES_PATH / daemon_id 920 921 @property 922 def path(self) -> pathlib.Path: 923 """ 924 Return the path for this Daemon's directory. 925 """ 926 return self._get_path_from_daemon_id(self.daemon_id) 927 928 @classmethod 929 def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path: 930 """ 931 Return the `properties.json` path for a given `daemon_id`. 932 """ 933 return cls._get_path_from_daemon_id(daemon_id) / 'properties.json' 934 935 @property 936 def properties_path(self) -> pathlib.Path: 937 """ 938 Return the `propterties.json` path for this Daemon. 939 """ 940 return self._get_properties_path_from_daemon_id(self.daemon_id) 941 942 @property 943 def stop_path(self) -> pathlib.Path: 944 """ 945 Return the path for the stop file (created when manually stopped). 946 """ 947 return self.path / '.stop.json' 948 949 @property 950 def log_path(self) -> pathlib.Path: 951 """ 952 Return the log path. 953 """ 954 logs_cf = self.properties.get('logs', None) or {} 955 if 'path' not in logs_cf: 956 import meerschaum.config.paths as paths 957 return paths.LOGS_RESOURCES_PATH / (self.daemon_id + '.log') 958 959 return pathlib.Path(logs_cf['path']) 960 961 @property 962 def stdin_file_path(self) -> pathlib.Path: 963 """ 964 Return the stdin file path. 
965 """ 966 return self.path / 'input.stdin' 967 968 @property 969 def blocking_stdin_file_path(self) -> pathlib.Path: 970 """ 971 Return the stdin file path. 972 """ 973 if '_blocking_stdin_file_path' in self.__dict__: 974 return self._blocking_stdin_file_path 975 976 return self.path / 'input.stdin.block' 977 978 @property 979 def prompt_kwargs_file_path(self) -> pathlib.Path: 980 """ 981 Return the file path to the kwargs for the invoking `prompt()`. 982 """ 983 return self.path / 'prompt_kwargs.json' 984 985 @property 986 def log_offset_path(self) -> pathlib.Path: 987 """ 988 Return the log offset file path. 989 """ 990 import meerschaum.config.paths as paths 991 return paths.LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset') 992 993 @property 994 def log_offset_lock(self) -> 'fasteners.InterProcessLock': 995 """ 996 Return the process lock context manager. 997 """ 998 if '_log_offset_lock' in self.__dict__: 999 return self._log_offset_lock 1000 1001 fasteners = attempt_import('fasteners') 1002 self._log_offset_lock = fasteners.InterProcessLock(self.log_offset_path) 1003 return self._log_offset_lock 1004 1005 @property 1006 def rotating_log(self) -> RotatingFile: 1007 """ 1008 The rotating log file for the daemon's output. 
1009 """ 1010 if '_rotating_log' in self.__dict__: 1011 return self._rotating_log 1012 1013 logs_cf = self.properties.get('logs', None) or {} 1014 write_timestamps = logs_cf.get('write_timestamps', None) 1015 if write_timestamps is None: 1016 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 1017 1018 timestamp_format = logs_cf.get('timestamp_format', None) 1019 if timestamp_format is None: 1020 timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format') 1021 1022 num_files_to_keep = logs_cf.get('num_files_to_keep', None) 1023 if num_files_to_keep is None: 1024 num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep') 1025 1026 max_file_size = logs_cf.get('max_file_size', None) 1027 if max_file_size is None: 1028 max_file_size = get_config('jobs', 'logs', 'max_file_size') 1029 1030 redirect_streams = logs_cf.get('redirect_streams', True) 1031 1032 self._rotating_log = RotatingFile( 1033 self.log_path, 1034 redirect_streams=redirect_streams, 1035 write_timestamps=write_timestamps, 1036 timestamp_format=timestamp_format, 1037 num_files_to_keep=num_files_to_keep, 1038 max_file_size=max_file_size, 1039 ) 1040 return self._rotating_log 1041 1042 @property 1043 def stdin_file(self): 1044 """ 1045 Return the file handler for the stdin file. 1046 """ 1047 if (_stdin_file := self.__dict__.get('_stdin_file', None)): 1048 return _stdin_file 1049 1050 self._stdin_file = StdinFile( 1051 self.stdin_file_path, 1052 lock_file_path=self.blocking_stdin_file_path, 1053 ) 1054 return self._stdin_file 1055 1056 @property 1057 def log_text(self) -> Union[str, None]: 1058 """ 1059 Read the log files and return their contents. 1060 Returns `None` if the log file does not exist. 
1061 """ 1062 logs_cf = self.properties.get('logs', None) or {} 1063 write_timestamps = logs_cf.get('write_timestamps', None) 1064 if write_timestamps is None: 1065 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 1066 1067 timestamp_format = logs_cf.get('timestamp_format', None) 1068 if timestamp_format is None: 1069 timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format') 1070 1071 num_files_to_keep = logs_cf.get('num_files_to_keep', None) 1072 if num_files_to_keep is None: 1073 num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep') 1074 1075 max_file_size = logs_cf.get('max_file_size', None) 1076 if max_file_size is None: 1077 max_file_size = get_config('jobs', 'logs', 'max_file_size') 1078 1079 new_rotating_log = RotatingFile( 1080 self.rotating_log.file_path, 1081 num_files_to_keep=num_files_to_keep, 1082 max_file_size=max_file_size, 1083 write_timestamps=write_timestamps, 1084 timestamp_format=timestamp_format, 1085 ) 1086 return new_rotating_log.read() 1087 1088 def readlines(self) -> List[str]: 1089 """ 1090 Read the next log lines, persisting the cursor for later use. 1091 Note this will alter the cursor of `self.rotating_log`. 1092 """ 1093 self.rotating_log._cursor = self._read_log_offset() 1094 lines = self.rotating_log.readlines() 1095 self._write_log_offset() 1096 return lines 1097 1098 def _read_log_offset(self) -> Tuple[int, int]: 1099 """ 1100 Return the current log offset cursor. 1101 1102 Returns 1103 ------- 1104 A tuple of the form (`subfile_index`, `position`). 
1105 """ 1106 if not self.log_offset_path.exists(): 1107 return 0, 0 1108 1109 try: 1110 with open(self.log_offset_path, 'r', encoding='utf-8') as f: 1111 cursor_text = f.read() 1112 cursor_parts = cursor_text.split(' ') 1113 subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1]) 1114 return subfile_index, subfile_position 1115 except Exception as e: 1116 warn(f"Failed to read cursor:\n{e}") 1117 return 0, 0 1118 1119 def _write_log_offset(self) -> None: 1120 """ 1121 Write the current log offset file. 1122 """ 1123 with self.log_offset_lock: 1124 with open(self.log_offset_path, 'w+', encoding='utf-8') as f: 1125 subfile_index = self.rotating_log._cursor[0] 1126 subfile_position = self.rotating_log._cursor[1] 1127 f.write(f"{subfile_index} {subfile_position}") 1128 1129 @property 1130 def pid(self) -> Union[int, None]: 1131 """ 1132 Read the PID file and return its contents. 1133 Returns `None` if the PID file does not exist. 1134 """ 1135 if not self.pid_path.exists(): 1136 return None 1137 try: 1138 with open(self.pid_path, 'r', encoding='utf-8') as f: 1139 text = f.read() 1140 if len(text) == 0: 1141 return None 1142 pid = int(text.rstrip()) 1143 except Exception as e: 1144 warn(e) 1145 text = None 1146 pid = None 1147 return pid 1148 1149 @property 1150 def pid_path(self) -> pathlib.Path: 1151 """ 1152 Return the path to a file containing the PID for this Daemon. 1153 """ 1154 return self.path / 'process.pid' 1155 1156 @property 1157 def pid_lock(self) -> 'fasteners.InterProcessLock': 1158 """ 1159 Return the process lock context manager. 1160 """ 1161 if '_pid_lock' in self.__dict__: 1162 return self._pid_lock 1163 1164 fasteners = attempt_import('fasteners') 1165 self._pid_lock = fasteners.InterProcessLock(self.pid_path) 1166 return self._pid_lock 1167 1168 @property 1169 def pickle_path(self) -> pathlib.Path: 1170 """ 1171 Return the path for the pickle file. 
1172 """ 1173 return self.path / 'pickle.pkl' 1174 1175 def read_properties(self) -> Optional[Dict[str, Any]]: 1176 """Read the properties JSON file and return the dictionary.""" 1177 if not self.properties_path.exists(): 1178 return None 1179 try: 1180 with open(self.properties_path, 'r', encoding='utf-8') as file: 1181 properties = json.load(file) 1182 except Exception: 1183 properties = {} 1184 1185 return properties or {} 1186 1187 def read_pickle(self) -> Daemon: 1188 """Read a Daemon's pickle file and return the `Daemon`.""" 1189 import pickle 1190 import traceback 1191 if not self.pickle_path.exists(): 1192 error(f"Pickle file does not exist for daemon '{self.daemon_id}'.") 1193 1194 if self.pickle_path.stat().st_size == 0: 1195 error(f"Pickle was empty for daemon '{self.daemon_id}'.") 1196 1197 try: 1198 with open(self.pickle_path, 'rb') as pickle_file: 1199 daemon = pickle.load(pickle_file) 1200 success, msg = True, 'Success' 1201 except Exception as e: 1202 success, msg = False, str(e) 1203 daemon = None 1204 traceback.print_exception(type(e), e, e.__traceback__) 1205 if not success: 1206 error(msg) 1207 return daemon 1208 1209 @property 1210 def properties(self) -> Dict[str, Any]: 1211 """ 1212 Return the contents of the properties JSON file. 
1213 """ 1214 try: 1215 _file_properties = self.read_properties() or {} 1216 except Exception: 1217 traceback.print_exc() 1218 _file_properties = {} 1219 1220 if not self._properties: 1221 self._properties = _file_properties 1222 1223 if self._properties is None: 1224 self._properties = {} 1225 1226 if ( 1227 self._properties.get('result', None) is None 1228 and _file_properties.get('result', None) is not None 1229 ): 1230 _ = self._properties.pop('result', None) 1231 1232 if _file_properties is not None: 1233 self._properties = apply_patch_to_config( 1234 _file_properties, 1235 self._properties, 1236 ) 1237 1238 return self._properties 1239 1240 @property 1241 def hidden(self) -> bool: 1242 """ 1243 Return a bool indicating whether this Daemon should be displayed. 1244 """ 1245 return self.daemon_id.startswith('_') or self.daemon_id.startswith('.') 1246 1247 def write_properties(self) -> SuccessTuple: 1248 """Write the properties dictionary to the properties JSON file 1249 (only if self.properties exists). 1250 """ 1251 from meerschaum.utils.misc import generate_password 1252 success, msg = ( 1253 False, 1254 f"No properties to write for daemon '{self.daemon_id}'." 
1255 ) 1256 backup_path = self.properties_path.parent / (generate_password(8) + '.json') 1257 props = self.properties 1258 if props is not None: 1259 try: 1260 self.path.mkdir(parents=True, exist_ok=True) 1261 if self.properties_path.exists(): 1262 self.properties_path.rename(backup_path) 1263 with open(self.properties_path, 'w+', encoding='utf-8') as properties_file: 1264 json.dump(props, properties_file) 1265 success, msg = True, 'Success' 1266 except Exception as e: 1267 success, msg = False, str(e) 1268 1269 try: 1270 if backup_path.exists(): 1271 if not success: 1272 backup_path.rename(self.properties_path) 1273 else: 1274 backup_path.unlink() 1275 except Exception as e: 1276 success, msg = False, str(e) 1277 1278 return success, msg 1279 1280 def write_pickle(self) -> SuccessTuple: 1281 """Write the pickle file for the daemon.""" 1282 import pickle 1283 import traceback 1284 from meerschaum.utils.misc import generate_password 1285 1286 if not self.pickle: 1287 return True, "Success" 1288 1289 from meerschaum._internal.entry import _shells 1290 if _shells: 1291 from meerschaum._internal.shell.Shell import revert_input 1292 revert_input() 1293 1294 backup_path = self.pickle_path.parent / (generate_password(7) + '.pkl') 1295 try: 1296 self.path.mkdir(parents=True, exist_ok=True) 1297 if self.pickle_path.exists(): 1298 self.pickle_path.rename(backup_path) 1299 with open(self.pickle_path, 'wb+') as pickle_file: 1300 pickle.dump(self, pickle_file) 1301 success, msg = True, "Success" 1302 except Exception as e: 1303 success, msg = False, str(e) 1304 traceback.print_exception(type(e), e, e.__traceback__) 1305 try: 1306 if backup_path.exists(): 1307 if not success: 1308 backup_path.rename(self.pickle_path) 1309 else: 1310 backup_path.unlink() 1311 except Exception as e: 1312 success, msg = False, str(e) 1313 return success, msg 1314 1315 1316 def _setup( 1317 self, 1318 allow_dirty_run: bool = False, 1319 ) -> None: 1320 """ 1321 Update properties before starting the 
Daemon. 1322 """ 1323 if self.properties is None: 1324 self._properties = {} 1325 1326 self._properties.update({ 1327 'target': { 1328 'name': self.target.__name__, 1329 'module': self.target.__module__, 1330 'args': self.target_args, 1331 'kw': self.target_kw, 1332 }, 1333 }) 1334 self.mkdir_if_not_exists(allow_dirty_run) 1335 _write_properties_success_tuple = self.write_properties() 1336 if not _write_properties_success_tuple[0]: 1337 error(_write_properties_success_tuple[1]) 1338 1339 _write_pickle_success_tuple = self.write_pickle() 1340 if not _write_pickle_success_tuple[0]: 1341 error(_write_pickle_success_tuple[1]) 1342 1343 def cleanup(self, keep_logs: bool = False) -> SuccessTuple: 1344 """ 1345 Remove a daemon's directory after execution. 1346 1347 Parameters 1348 ---------- 1349 keep_logs: bool, default False 1350 If `True`, skip deleting the daemon's log files. 1351 1352 Returns 1353 ------- 1354 A `SuccessTuple` indicating success. 1355 """ 1356 if self.path.exists(): 1357 try: 1358 shutil.rmtree(self.path) 1359 except Exception as e: 1360 msg = f"Failed to clean up '{self.daemon_id}':\n{e}" 1361 warn(msg) 1362 return False, msg 1363 if not keep_logs: 1364 self.rotating_log.delete() 1365 try: 1366 if self.log_offset_path.exists(): 1367 self.log_offset_path.unlink() 1368 except Exception as e: 1369 msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}" 1370 warn(msg) 1371 return False, msg 1372 return True, "Success" 1373 1374 1375 def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]: 1376 """ 1377 Return the timeout value to use. Use `--timeout-seconds` if provided, 1378 else the configured default (8). 
1379 """ 1380 if isinstance(timeout, (int, float)): 1381 return timeout 1382 return get_config('jobs', 'timeout_seconds') 1383 1384 1385 def get_check_timeout_interval_seconds( 1386 self, 1387 check_timeout_interval: Union[int, float, None] = None, 1388 ) -> Union[int, float]: 1389 """ 1390 Return the interval value to check the status of timeouts. 1391 """ 1392 if isinstance(check_timeout_interval, (int, float)): 1393 return check_timeout_interval 1394 return get_config('jobs', 'check_timeout_interval_seconds') 1395 1396 @property 1397 def target_args(self) -> Union[Tuple[Any], None]: 1398 """ 1399 Return the positional arguments to pass to the target function. 1400 """ 1401 target_args = ( 1402 self.__dict__.get('_target_args', None) 1403 or self.properties.get('target', {}).get('args', None) 1404 ) 1405 if target_args is None: 1406 return tuple([]) 1407 1408 return tuple(target_args) 1409 1410 @property 1411 def target_kw(self) -> Union[Dict[str, Any], None]: 1412 """ 1413 Return the keyword arguments to pass to the target function. 1414 """ 1415 target_kw = ( 1416 self.__dict__.get('_target_kw', None) 1417 or self.properties.get('target', {}).get('kw', None) 1418 ) 1419 if target_kw is None: 1420 return {} 1421 1422 return {key: val for key, val in target_kw.items()} 1423 1424 @staticmethod 1425 def _get_target_reference(target) -> Union[Dict[str, str], None]: 1426 """ 1427 If `target` is an importable, top-level function (e.g. `entry`), return a 1428 ``{'module': ..., 'qualname': ...}`` reference so it can be re-imported in the 1429 daemon process instead of pickled by value. 1430 1431 Pickling such a target by value serializes its entire global graph, which can 1432 reach unpicklable live state (e.g. a connector caching an `_asyncio.Task`) and 1433 raise `TypeError: cannot pickle '_asyncio.Task' object`. 
dill also falls back to 1434 by-value pickling whenever it cannot match the function by identity — which 1435 happens when two copies of a package are importable (a stale install shadowing 1436 a venv), so `byref=True` alone is not enough. Returns `None` for closures, 1437 lambdas, and anything not importable, which fall back to dill. 1438 """ 1439 module = getattr(target, '__module__', None) 1440 qualname = getattr(target, '__qualname__', None) 1441 if not module or not qualname: 1442 return None 1443 if '<locals>' in qualname or '<lambda>' in qualname: 1444 return None 1445 return {'module': module, 'qualname': qualname} 1446 1447 @staticmethod 1448 def _load_target_reference(target_ref: Dict[str, str]): 1449 """ 1450 Re-import a target from a `{'module': ..., 'qualname': ...}` reference. 1451 """ 1452 import importlib 1453 obj = importlib.import_module(target_ref['module']) 1454 for part in target_ref['qualname'].split('.'): 1455 obj = getattr(obj, part) 1456 return obj 1457 1458 def __getstate__(self): 1459 """ 1460 Pickle this Daemon. 1461 """ 1462 dill = attempt_import('dill') 1463 state = { 1464 'target_args': self.target_args, 1465 'target_kw': self.target_kw, 1466 'daemon_id': self.daemon_id, 1467 'label': self.label, 1468 'properties': self.properties, 1469 } 1470 target_ref = self._get_target_reference(self.target) 1471 if target_ref is not None: 1472 ### Store by reference (re-imported in the daemon process), not by value. 1473 state['target'] = None 1474 state['target_ref'] = target_ref 1475 else: 1476 state['target'] = dill.dumps(self.target, byref=True) 1477 return state 1478 1479 def __setstate__(self, _state: Dict[str, Any]): 1480 """ 1481 Restore this Daemon from a pickled state. 1482 If the properties file exists, skip the old pickled version. 
1483 """ 1484 dill = attempt_import('dill') 1485 target_ref = _state.pop('target_ref', None) 1486 if target_ref is not None and _state.get('target', None) is None: 1487 _state['target'] = self._load_target_reference(target_ref) 1488 else: 1489 _state['target'] = dill.loads(_state['target']) 1490 self._pickle = True 1491 daemon_id = _state.get('daemon_id', None) 1492 if not daemon_id: 1493 raise ValueError("Need a daemon_id to un-pickle a Daemon.") 1494 1495 properties_path = self._get_properties_path_from_daemon_id(daemon_id) 1496 ignore_properties = properties_path.exists() 1497 if ignore_properties: 1498 _state = { 1499 key: val 1500 for key, val in _state.items() 1501 if key != 'properties' 1502 } 1503 self.__init__(**_state) 1504 1505 1506 def __repr__(self): 1507 return str(self) 1508 1509 def __str__(self): 1510 return self.daemon_id 1511 1512 def __eq__(self, other): 1513 if not isinstance(other, Daemon): 1514 return False 1515 return self.daemon_id == other.daemon_id 1516 1517 def __hash__(self): 1518 return hash(self.daemon_id)
Daemonize Python functions into background processes.
Examples
>>> import meerschaum as mrsm
>>> from meerschaum.utils.daemon import Daemon
>>> daemon = Daemon(print, ('hi',))
>>> success, msg = daemon.run()
>>> print(daemon.log_text)
2024-07-29 18:03 | hi
2024-07-29 18:03 |
>>> success, msg = daemon.run(allow_dirty_run=True)
>>> print(daemon.log_text)
2024-07-29 18:03 | hi
2024-07-29 18:03 |
2024-07-29 18:05 | hi
2024-07-29 18:05 |
>>> mrsm.pprint(daemon.properties)
{
'label': 'print',
'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
'result': None,
'process': {'ended': '2024-07-29T18:03:33.752806'}
}
def __init__(
    self,
    target: Optional[Callable[[Any], Any]] = None,
    target_args: Union[List[Any], Tuple[Any], None] = None,
    target_kw: Optional[Dict[str, Any]] = None,
    env: Optional[Dict[str, str]] = None,
    daemon_id: Optional[str] = None,
    label: Optional[str] = None,
    properties: Optional[Dict[str, Any]] = None,
    pickle: bool = True,
):
    """
    Build a new `Daemon`, or reattach to an existing one by its `daemon_id`.

    Parameters
    ----------
    target: Optional[Callable[[Any], Any]], default None
        The function to execute in a child process.

    target_args: Union[List[Any], Tuple[Any], None], default None
        Positional arguments to pass to the target function.

    target_kw: Optional[Dict[str, Any]], default None
        Keyword arguments to pass to the target function.

    env: Optional[Dict[str, str]], default None
        If provided, set these environment variables in the daemon process
        (stored under the `'env'` key of the daemon's properties).

    daemon_id: Optional[str], default None
        Build a `Daemon` from an existing `daemon_id`.
        If `daemon_id` is provided, other arguments are ignored and are derived
        from the existing pickled `Daemon`.
        If no pickle file exists and no `target` is given,
        the daemon is rebuilt from its properties file instead.

    label: Optional[str], default None
        Label string to help identify a daemon.
        If `None`, use the target's `__name__`, or `str(target)` if it has none.

    properties: Optional[Dict[str, Any]], default None
        Override reading from the properties JSON by providing an existing dictionary.
        The dictionary is shallow-copied, so the caller's object is not modified.

    pickle: bool, default True
        If `False`, `write_pickle()` does not write a pickle file for this daemon.

    Raises
    ------
    Exception
        If `daemon_id` is given with no `target` and no pickle file, and the daemon
        either has no properties file or cannot be rebuilt from it
        (in the latter case the properties file is deleted before raising).
    """
    if daemon_id is not None:
        self.daemon_id = daemon_id
        ### No pickle and no target: try to rebuild the daemon from its properties file.
        if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):

            if not self.properties_path.exists():
                raise Exception(
                    f"Daemon '{self.daemon_id}' does not exist. "
                    + "Pass a target to create a new Daemon."
                )

            try:
                new_daemon = self.from_properties_file(daemon_id)
            except Exception:
                new_daemon = None

            if new_daemon is not None:
                new_daemon.write_pickle()
                target = new_daemon.target
                target_args = new_daemon.target_args
                target_kw = new_daemon.target_kw
                label = new_daemon.label
                self._properties = new_daemon.properties
            else:
                ### Remove the unrecoverable properties file before failing.
                try:
                    self.properties_path.unlink()
                except Exception:
                    pass

                raise Exception(
                    f"Could not recover daemon '{self.daemon_id}' "
                    + "from its properties file."
                )

    ### Keep an already-set `target` (e.g. on an instance restored from a pickle).
    if 'target' not in self.__dict__:
        if target is None:
            error("Cannot create a Daemon without a target.")
        self.target = target

    self.pickle = pickle

    ### NOTE: We have to check `self.__dict__` in case we are un-pickling.
    if '_target_args' not in self.__dict__:
        self._target_args = target_args
    if '_target_kw' not in self.__dict__:
        self._target_kw = target_kw

    if 'label' not in self.__dict__:
        if label is None:
            ### Use `getattr` rather than `self.target.__dir__()`: for a class target,
            ### `__dir__` is an unbound method and calling it raises `TypeError`.
            target_name = getattr(self.target, '__name__', None)
            label = target_name if target_name is not None else str(self.target)
        self.label = label
    elif label is not None:
        self.label = label

    if 'daemon_id' not in self.__dict__:
        self.daemon_id = get_new_daemon_name()

    if '_properties' not in self.__dict__:
        ### Copy so the `label`/`env` updates below don't leak into the caller's dict.
        self._properties = dict(properties) if properties is not None else None
    elif properties:
        if self._properties is None:
            self._properties = {}
        self._properties.update(properties)
    if self._properties is None:
        self._properties = {}

    self._properties.update({'label': self.label})
    if env:
        self._properties.update({'env': env})

    ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
    _ = self.process
Parameters
- target (Optional[Callable[[Any], Any]], default None): The function to execute in a child process.
- target_args (Union[List[Any], Tuple[Any], None], default None): Positional arguments to pass to the target function.
- target_kw (Optional[Dict[str, Any]], default None): Keyword arguments to pass to the target function.
- env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the daemon process.
- daemon_id (Optional[str], default None):
Build a `Daemon` from an existing `daemon_id`. If `daemon_id` is provided, other arguments are ignored and are derived from the existing pickled `Daemon`.
- label (Optional[str], default None):
Label string to help identify a daemon.
If `None`, use the function name instead.
- properties (Optional[Dict[str, Any]], default None): Override reading from the properties JSON by providing an existing dictionary.
- pickle (bool, default True): If `False`, skip writing the daemon's pickle file.
@classmethod
def from_properties_file(cls, daemon_id: str) -> Daemon:
    """
    Build a Daemon from its properties JSON file.

    Parameters
    ----------
    daemon_id: str
        The ID of the daemon whose `properties.json` should be read.

    Returns
    -------
    A new instance of `cls` whose target function is imported from the
    module and function names recorded in the properties file.

    Raises
    ------
    OSError
        If the properties file does not exist.
    ValueError
        If the file is empty or unreadable, or lacks target information.
    """
    properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
    if not properties_path.exists():
        raise OSError(f"Properties file '{properties_path}' does not exist.")

    try:
        with open(properties_path, 'r', encoding='utf-8') as f:
            properties = json.load(f)
    except Exception:
        properties = {}

    if not properties:
        raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")

    ### The directory name is the canonical daemon ID.
    daemon_id = properties_path.parent.name
    target_cf = properties.get('target', {})
    target_module_name = target_cf.get('module', None)
    target_function_name = target_cf.get('name', None)
    target_args = target_cf.get('args', None)
    target_kw = target_cf.get('kw', None)
    label = properties.get('label', None)

    if None in [
        target_module_name,
        target_function_name,
        target_args,
        target_kw,
    ]:
        raise ValueError("Missing target function information.")

    target_module = importlib.import_module(target_module_name)
    target_function = getattr(target_module, target_function_name)

    ### Use `cls` so subclasses get instances of themselves.
    return cls(
        daemon_id=daemon_id,
        target=target_function,
        target_args=target_args,
        target_kw=target_kw,
        properties=properties,
        label=label,
    )
Return a Daemon built from its properties JSON file.
def run(
    self,
    keep_daemon_output: bool = True,
    allow_dirty_run: bool = False,
    wait: bool = False,
    timeout: Union[int, float] = 4,
    debug: bool = False,
) -> SuccessTuple:
    """Run the daemon as a child process and continue executing the parent.

    Parameters
    ----------
    keep_daemon_output: bool, default True
        If `False`, delete the daemon's output directory upon exiting.

    allow_dirty_run: bool, default False
        If `True`, run the daemon, even if the `daemon_id` directory exists.
        This option is dangerous because if the same `daemon_id` runs concurrently,
        the last to finish will overwrite the output of the first.

    wait: bool, default False
        If `True`, block until `Daemon.status` is running (or the timeout expires).

    timeout: Union[int, float], default 4
        If `wait` is `True`, block for up to `timeout` seconds before returning a failure.
        A falsy timeout (after `get_timeout_seconds()`) checks the status only once.

    debug: bool, default False
        Passed through to `venv_exec` when launching the child process.

    Returns
    -------
    A SuccessTuple indicating success.

    """
    import platform
    if platform.system() == 'Windows':
        return False, "Cannot run background jobs on Windows."

    ### The daemon might exist and be paused.
    if self.status == 'paused':
        return self.resume()

    self._remove_stop_file()
    if self.status == 'running':
        return True, f"Daemon '{self}' is already running."

    self.mkdir_if_not_exists(allow_dirty_run)
    _write_pickle_success_tuple = self.write_pickle()
    if not _write_pickle_success_tuple[0]:
        return _write_pickle_success_tuple

    ### Embed the daemon ID with repr() so it is always a valid, escaped Python
    ### string literal in the generated launch code (IDs may contain quotes).
    _launch_daemon_code = (
        "from meerschaum.utils.daemon import Daemon, _daemons; "
        f"daemon = Daemon(daemon_id={self.daemon_id!r}); "
        f"_daemons[{self.daemon_id!r}] = daemon; "
        f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
        "allow_dirty_run=True)"
    )
    env = dict(os.environ)
    _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
    msg = (
        "Success"
        if _launch_success_bool
        else f"Failed to start daemon '{self.daemon_id}'."
    )
    if not wait or not _launch_success_bool:
        return _launch_success_bool, msg

    timeout = self.get_timeout_seconds(timeout)
    check_timeout_interval = self.get_check_timeout_interval_seconds()

    if not timeout:
        success = self.status == 'running'
        msg = "Success" if success else f"Failed to run daemon '{self.daemon_id}'."
        if success:
            self._capture_process_timestamp('began')
        return success, msg

    ### Poll until the child reports 'running' or the timeout elapses.
    begin = time.perf_counter()
    while (time.perf_counter() - begin) < timeout:
        if self.status == 'running':
            self._capture_process_timestamp('began')
            return True, "Success"
        time.sleep(check_timeout_interval)

    return False, (
        f"Failed to start daemon '{self.daemon_id}' within {timeout} second"
        + ('s' if timeout != 1 else '') + '.'
    )
Run the daemon as a child process and continue executing the parent.
Parameters
- keep_daemon_output (bool, default True):
If `False`, delete the daemon's output directory upon exiting.
- allow_dirty_run (bool, default False):
If `True`, run the daemon even if the `daemon_id` directory exists. This option is dangerous because if the same `daemon_id` runs concurrently, the last to finish will overwrite the output of the first.
- wait (bool, default False):
If `True`, block until `Daemon.status` is running (or the timeout expires).
- timeout (Union[int, float], default 4):
If `wait` is `True`, block for up to `timeout` seconds before returning a failure.
Returns
- A SuccessTuple indicating success.
def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
    """
    Forcibly terminate a running daemon.

    Unless the daemon is paused, a SIGTERM signal is sent first. If that does not
    succeed, the daemon is reported as already stopped when applicable; otherwise
    its process is terminated and killed directly via psutil.

    Parameters
    ----------
    timeout: Union[int, float, None], default 8
        How many seconds to wait for the process to terminate.

    Returns
    -------
    A SuccessTuple indicating success.
    """
    if self.status != 'paused':
        success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
        if success:
            self._write_stop_file('kill')
            self.stdin_file.close()
            self._remove_blocking_stdin_file()
            return success, msg

    if self.status == 'stopped':
        self._write_stop_file('kill')
        self.stdin_file.close()
        self._remove_blocking_stdin_file()
        return True, "Process has already stopped."

    ### Fall back to terminating the process directly (also covers paused daemons).
    psutil = attempt_import('psutil')
    process = self.process
    try:
        process.terminate()
        process.kill()
        process.wait(timeout=timeout)
    except Exception as e:
        return False, f"Failed to kill job {self} ({process}) with exception: {e}"

    try:
        if process.status():
            return False, f"Failed to stop daemon '{self}' ({process})."
    except psutil.NoSuchProcess:
        pass

    if self.pid_path.exists():
        try:
            self.pid_path.unlink()
        except Exception:
            pass

    self._write_stop_file('kill')
    self.stdin_file.close()
    self._remove_blocking_stdin_file()
    return True, "Success"
Forcibly terminate a running daemon. Sends a SIGTERM signal to the process.
Parameters
- timeout (Union[int, float, None], default 8): How many seconds to wait for the process to terminate.
Returns
- A SuccessTuple indicating success.
def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
    """
    Gracefully quit a running daemon by sending it SIGINT.

    Paused daemons are delegated to `kill()` instead. On a successful signal,
    a 'quit' stop file is written and the stdin file is closed.
    """
    if self.status == 'paused':
        return self.kill(timeout)

    sent, message = self._send_signal(signal.SIGINT, timeout=timeout)
    if not sent:
        return sent, message

    self._write_stop_file('quit')
    self.stdin_file.close()
    self._remove_blocking_stdin_file()
    return sent, message
Gracefully quit a running daemon.
def pause(
    self,
    timeout: Union[int, float, None] = None,
    check_timeout_interval: Union[float, int, None] = None,
) -> SuccessTuple:
    """
    Suspend the daemon's process if it is currently running.

    Parameters
    ----------
    timeout: Union[float, int, None], default None
        Upper bound, in seconds, on how long to wait for the process to suspend.
        Resolved through `get_timeout_seconds()`.

    check_timeout_interval: Union[float, int, None], default None
        Seconds to sleep between status checks while waiting.
        Resolved through `get_check_timeout_interval_seconds()`.

    Returns
    -------
    A `SuccessTuple` telling whether the `Daemon` process was suspended.
    """
    self._remove_blocking_stdin_file()

    if self.process is None:
        return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."

    if self.status == 'paused':
        return True, f"Daemon '{self.daemon_id}' is already paused."

    self._write_stop_file('pause')
    self.stdin_file.close()
    self._remove_blocking_stdin_file()
    try:
        self.process.suspend()
    except Exception as e:
        return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"

    timeout = self.get_timeout_seconds(timeout)
    check_timeout_interval = self.get_check_timeout_interval_seconds(check_timeout_interval)

    psutil = attempt_import('psutil')

    if not timeout:
        ### Single check; a vanished process is treated as suspended here.
        try:
            suspended = self.process.status() == 'stopped'
        except psutil.NoSuchProcess:
            suspended = True
        if not suspended:
            return False, f"Failed to suspend daemon '{self.daemon_id}'."
        self._capture_process_timestamp('paused')
        return True, "Success"

    started_at = time.perf_counter()
    while time.perf_counter() - started_at < timeout:
        try:
            if self.process.status() == 'stopped':
                self._capture_process_timestamp('paused')
                return True, "Success"
        except psutil.NoSuchProcess as e:
            return False, f"Process exited unexpectedly. Was it killed?\n{e}"
        time.sleep(check_timeout_interval)

    plural = 's' if timeout != 1 else ''
    return False, f"Failed to pause daemon '{self.daemon_id}' within {timeout} second{plural}."
Pause the daemon if it is running.
Parameters
- timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to suspend.
- check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still running.
Returns
- A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
def resume(
    self,
    timeout: Union[int, float, None] = None,
    check_timeout_interval: Union[float, int, None] = None,
) -> SuccessTuple:
    """
    Continue a paused daemon's process.

    Parameters
    ----------
    timeout: Union[float, int, None], default None
        Upper bound, in seconds, on how long to wait for the process to run again.
        Resolved through `get_timeout_seconds()`.

    check_timeout_interval: Union[float, int, None], default None
        Seconds to sleep between status checks while waiting.
        Resolved through `get_check_timeout_interval_seconds()`.

    Returns
    -------
    A `SuccessTuple` telling whether the `Daemon` process was resumed.
    """
    if self.status == 'running':
        return True, f"Daemon '{self.daemon_id}' is already running."

    if self.status == 'stopped':
        return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."

    self._remove_stop_file()
    try:
        if self.process is None:
            return False, f"Cannot resume daemon '{self.daemon_id}'."
        self.process.resume()
    except Exception as e:
        return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"

    timeout = self.get_timeout_seconds(timeout)
    check_timeout_interval = self.get_check_timeout_interval_seconds(check_timeout_interval)

    if not timeout:
        if self.status != 'running':
            return False, f"Failed to resume daemon '{self.daemon_id}'."
        self._capture_process_timestamp('began')
        return True, "Success"

    started_at = time.perf_counter()
    while time.perf_counter() - started_at < timeout:
        if self.status == 'running':
            self._capture_process_timestamp('began')
            return True, "Success"
        time.sleep(check_timeout_interval)

    plural = 's' if timeout != 1 else ''
    return False, f"Failed to resume daemon '{self.daemon_id}' within {timeout} second{plural}."
Resume the daemon if it is paused.
Parameters
- timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to resume.
- check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still stopped.
Returns
- A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
    """Create the Daemon's directory.

    If `allow_dirty_run` is `False` and the directory already exists and is not
    empty (or the path exists as a non-directory), an error is raised through
    `error(..., FileExistsError)`.

    Parameters
    ----------
    allow_dirty_run: bool, default False
        If `True`, reuse an existing, non-empty directory without complaint.
    """
    try:
        self.path.mkdir(parents=True, exist_ok=True)
        ### Close the directory iterator explicitly: `any()` stops early and would
        ### otherwise leave the OS directory handle open until garbage collection.
        with os.scandir(self.path) as entries:
            _already_exists = any(entries)
    except FileExistsError:
        _already_exists = True

    if _already_exists and not allow_dirty_run:
        error(
            f"Daemon '{self.daemon_id}' already exists. " +
            "To allow this daemon to run, do one of the following:\n"
            + "  - Execute `daemon.cleanup()`.\n"
            + f"  - Delete the directory '{self.path}'.\n"
            + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
            FileExistsError,
        )
Create the Daemon's directory.
If `allow_dirty_run` is `False` and the directory already exists and is not empty,
raise a `FileExistsError`.
@property
def process(self) -> Union['psutil.Process', None]:
    """
    Return the `psutil.Process` for this Daemon's PID, or `None`.

    The process object is cached on `_process` and rebuilt when the PID changes.
    If no process exists for the PID, the cache entry and the PID file are removed.
    """
    psutil = attempt_import('psutil')
    pid = self.pid
    if pid is None:
        return None

    stale = (
        '_process' not in self.__dict__
        or self.__dict__['_process'].pid != int(pid)
    )
    if stale:
        try:
            self._process = psutil.Process(int(pid))
        except Exception:
            _ = self.__dict__.pop('_process', None)
            try:
                if self.pid_path.exists():
                    self.pid_path.unlink()
            except Exception:
                pass
            return None
    return self._process
Return the psutil process for the Daemon.
@property
def status(self) -> str:
    """
    Return the running status of this Daemon.

    Returns
    -------
    `'stopped'` if there is no process, or it no longer exists or is a zombie
    (the PID file is removed in that case); `'paused'` if the process is
    suspended; otherwise `'running'`.
    """
    process = self.process
    if process is None:
        return 'stopped'

    psutil = attempt_import('psutil', lazy=False)
    try:
        ### Query the OS once so both checks see the same state.
        process_status = process.status()
        if process_status == 'stopped':
            return 'paused'
        if process_status == 'zombie':
            raise psutil.NoSuchProcess(process.pid)
    except (psutil.NoSuchProcess, AttributeError):
        if self.pid_path.exists():
            try:
                self.pid_path.unlink()
            except Exception:
                pass
        return 'stopped'

    return 'running'
Return the running status of this Daemon.
@property
def path(self) -> pathlib.Path:
    """
    The directory holding this Daemon's files, derived from its `daemon_id`.
    """
    daemon_id = self.daemon_id
    return self._get_path_from_daemon_id(daemon_id)
Return the path for this Daemon's directory.
@property
def properties_path(self) -> pathlib.Path:
    """
    Return the `properties.json` path for this Daemon.
    """
    return self._get_properties_path_from_daemon_id(self.daemon_id)
Return the `properties.json` path for this Daemon.
@property
def stop_path(self) -> pathlib.Path:
    """
    Path of the `.stop.json` file that is written when the daemon is stopped manually.
    """
    return self.path.joinpath('.stop.json')
Return the path for the stop file (created when manually stopped).
@property
def log_path(self) -> pathlib.Path:
    """
    Path of the daemon's log file.

    Uses `properties['logs']['path']` when present; otherwise
    `<LOGS_RESOURCES_PATH>/<daemon_id>.log`.
    """
    logs_cf = self.properties.get('logs', None) or {}
    if 'path' in logs_cf:
        return pathlib.Path(logs_cf['path'])

    import meerschaum.config.paths as paths
    return paths.LOGS_RESOURCES_PATH / (self.daemon_id + '.log')
Return the log path.
@property
def stdin_file_path(self) -> pathlib.Path:
    """
    Path of the `input.stdin` FIFO inside the daemon's directory.
    """
    return self.path.joinpath('input.stdin')
Return the stdin file path.
@property
def blocking_stdin_file_path(self) -> pathlib.Path:
    """
    Return the path of the blocking stdin marker file.

    This is passed to `StdinFile` as its `lock_file_path`. A value stored on
    `_blocking_stdin_file_path` takes precedence over the default
    `input.stdin.block` inside the daemon's directory.
    """
    if '_blocking_stdin_file_path' in self.__dict__:
        return self._blocking_stdin_file_path

    return self.path / 'input.stdin.block'
Return the path of the blocking stdin marker file (`input.stdin.block` by default).
@property
def prompt_kwargs_file_path(self) -> pathlib.Path:
    """
    Path of the JSON file holding the keyword arguments of the invoking `prompt()`.
    """
    return self.path.joinpath('prompt_kwargs.json')
Return the file path to the kwargs for the invoking prompt().
@property
def log_offset_path(self) -> pathlib.Path:
    """
    Path of the hidden `.<daemon_id>.log.offset` file in the logs resources directory.
    """
    import meerschaum.config.paths as paths
    offset_name = '.' + self.daemon_id + '.log.offset'
    return paths.LOGS_RESOURCES_PATH / offset_name
Return the log offset file path.
@property
def log_offset_lock(self) -> 'fasteners.InterProcessLock':
    """
    Return the inter-process lock for the log offset file.

    The `fasteners.InterProcessLock` is built on `log_offset_path` the first time
    and cached on `_log_offset_lock`.
    """
    if '_log_offset_lock' in self.__dict__:
        return self._log_offset_lock

    fasteners = attempt_import('fasteners')
    self._log_offset_lock = fasteners.InterProcessLock(self.log_offset_path)
    return self._log_offset_lock
Return the inter-process lock for the log offset file.
@property
def rotating_log(self) -> RotatingFile:
    """
    The `RotatingFile` that receives the daemon's output (cached on `_rotating_log`).

    Settings come from `properties['logs']`; any that are missing or `None`
    fall back to the `jobs:logs` configuration.
    """
    if '_rotating_log' in self.__dict__:
        return self._rotating_log

    logs_cf = self.properties.get('logs', None) or {}

    def _setting(key, *config_keys):
        value = logs_cf.get(key, None)
        return get_config('jobs', 'logs', *config_keys) if value is None else value

    write_timestamps = _setting('write_timestamps', 'timestamps', 'enabled')
    timestamp_format = _setting('timestamp_format', 'timestamps', 'format')
    num_files_to_keep = _setting('num_files_to_keep', 'num_files_to_keep')
    max_file_size = _setting('max_file_size', 'max_file_size')
    redirect_streams = logs_cf.get('redirect_streams', True)

    self._rotating_log = RotatingFile(
        self.log_path,
        redirect_streams=redirect_streams,
        write_timestamps=write_timestamps,
        timestamp_format=timestamp_format,
        num_files_to_keep=num_files_to_keep,
        max_file_size=max_file_size,
    )
    return self._rotating_log
The rotating log file for the daemon's output.
@property
def stdin_file(self):
    """
    The `StdinFile` for this daemon, created on first use and cached on `_stdin_file`.
    """
    cached = self.__dict__.get('_stdin_file', None)
    if cached:
        return cached

    self._stdin_file = StdinFile(
        self.stdin_file_path,
        lock_file_path=self.blocking_stdin_file_path,
    )
    return self._stdin_file
Return the file handler for the stdin file.
@property
def log_text(self) -> Union[str, None]:
    """
    Read the daemon's log files and return their combined contents.

    A separate `RotatingFile` is opened on `rotating_log`'s path; settings
    come from `properties['logs']` with `jobs:logs` config fallbacks.
    Per the original contract, returns `None` if the log file does not exist
    (presumably via `RotatingFile.read()` — confirm).
    """
    logs_cf = self.properties.get('logs', None) or {}

    def _setting(key, *config_keys):
        value = logs_cf.get(key, None)
        return get_config('jobs', 'logs', *config_keys) if value is None else value

    write_timestamps = _setting('write_timestamps', 'timestamps', 'enabled')
    timestamp_format = _setting('timestamp_format', 'timestamps', 'format')
    num_files_to_keep = _setting('num_files_to_keep', 'num_files_to_keep')
    max_file_size = _setting('max_file_size', 'max_file_size')

    reader = RotatingFile(
        self.rotating_log.file_path,
        num_files_to_keep=num_files_to_keep,
        max_file_size=max_file_size,
        write_timestamps=write_timestamps,
        timestamp_format=timestamp_format,
    )
    return reader.read()
Read the log files and return their contents.
Returns None if the log file does not exist.
def readlines(self) -> List[str]:
    """
    Return the log lines written since the last call.

    The saved offset is loaded into `self.rotating_log` before reading and
    saved again afterwards, so this changes the rotating log's cursor.
    """
    log = self.rotating_log
    log._cursor = self._read_log_offset()
    new_lines = log.readlines()
    self._write_log_offset()
    return new_lines
Read the next log lines, persisting the cursor for later use.
Note this will alter the cursor of self.rotating_log.
@property
def pid(self) -> Union[int, None]:
    """
    Read the PID file and return its contents as an integer.

    Returns `None` if the PID file does not exist or holds only whitespace.
    If the contents cannot be parsed, a warning is emitted and `None` is returned.
    """
    if not self.pid_path.exists():
        return None
    try:
        with open(self.pid_path, 'r', encoding='utf-8') as f:
            text = f.read().strip()
        ### An empty or whitespace-only file means no PID has been written yet.
        if not text:
            return None
        return int(text)
    except Exception as e:
        warn(e)
        return None
Read the PID file and return its contents.
Returns None if the PID file does not exist.
@property
def pid_path(self) -> pathlib.Path:
    """
    Path of the `process.pid` file inside the daemon's directory.
    """
    return self.path.joinpath('process.pid')
Return the path to a file containing the PID for this Daemon.
@property
def pid_lock(self) -> 'fasteners.InterProcessLock':
    """
    Inter-process lock on the PID file, created lazily and cached on `_pid_lock`.
    """
    try:
        return self.__dict__['_pid_lock']
    except KeyError:
        pass

    fasteners = attempt_import('fasteners')
    self._pid_lock = fasteners.InterProcessLock(self.pid_path)
    return self._pid_lock
Return the process lock context manager.
@property
def pickle_path(self) -> pathlib.Path:
    """
    Path of the `pickle.pkl` file inside the daemon's directory.
    """
    return self.path.joinpath('pickle.pkl')
Return the path for the pickle file.
def read_properties(self) -> Optional[Dict[str, Any]]:
    """
    Load the properties JSON file.

    Returns `None` when the file is missing, and `{}` when it cannot be parsed
    or holds a falsy value.
    """
    if not self.properties_path.exists():
        return None

    try:
        with open(self.properties_path, 'r', encoding='utf-8') as fp:
            loaded = json.load(fp)
    except Exception:
        loaded = {}
    return loaded if loaded else {}
Read the properties JSON file and return the dictionary.
def read_pickle(self) -> Daemon:
    """
    Unpickle and return the `Daemon` stored in this daemon's pickle file.

    Missing or empty pickle files are reported through `error()`. A failed load
    prints the traceback and is reported through `error()` as well.
    """
    import pickle
    import traceback

    if not self.pickle_path.exists():
        error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")

    if self.pickle_path.stat().st_size == 0:
        error(f"Pickle was empty for daemon '{self.daemon_id}'.")

    ### NOTE: pickle.load executes arbitrary code; this file must only ever be
    ### written by `write_pickle()` for a trusted daemon directory.
    try:
        with open(self.pickle_path, 'rb') as pickle_file:
            loaded_daemon = pickle.load(pickle_file)
    except Exception as exc:
        traceback.print_exception(type(exc), exc, exc.__traceback__)
        error(str(exc))
        return None
    return loaded_daemon
Read a Daemon's pickle file and return the Daemon.
@property
def properties(self) -> Dict[str, Any]:
    """
    Return the daemon's properties: the properties JSON file merged with the
    in-memory `_properties` dictionary.

    The merge is done with `apply_patch_to_config(file_properties, in_memory)`;
    presumably the in-memory values win. The result is cached back onto
    `_properties`.
    """
    try:
        _file_properties = self.read_properties() or {}
    except Exception:
        traceback.print_exc()
        _file_properties = {}

    ### `_file_properties` is always a dict here, so an empty or `None` cache can be
    ### replaced directly (the former `is None` re-check could never trigger).
    if not self._properties:
        self._properties = _file_properties

    ### Drop an in-memory `None` result so it does not mask the file's result.
    if (
        self._properties.get('result', None) is None
        and _file_properties.get('result', None) is not None
    ):
        _ = self._properties.pop('result', None)

    self._properties = apply_patch_to_config(
        _file_properties,
        self._properties,
    )

    return self._properties
Return the contents of the properties JSON file.
def write_properties(self) -> SuccessTuple:
    """
    Persist the properties dictionary to the properties JSON file.

    An existing file is first renamed to a random backup name. The backup is
    restored if writing fails and deleted otherwise. Nothing is written when
    `self.properties` is `None`.
    """
    from meerschaum.utils.misc import generate_password

    missing_message = f"No properties to write for daemon '{self.daemon_id}'."
    backup_path = self.properties_path.parent / (generate_password(8) + '.json')
    props = self.properties

    if props is None:
        ok, message = False, missing_message
    else:
        try:
            self.path.mkdir(parents=True, exist_ok=True)
            if self.properties_path.exists():
                self.properties_path.rename(backup_path)
            with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
                json.dump(props, properties_file)
        except Exception as exc:
            ok, message = False, str(exc)
        else:
            ok, message = True, 'Success'

    try:
        if backup_path.exists():
            if ok:
                backup_path.unlink()
            else:
                backup_path.rename(self.properties_path)
    except Exception as exc:
        ok, message = False, str(exc)

    return ok, message
Write the properties dictionary to the properties JSON file (only if self.properties exists).
def write_pickle(self) -> SuccessTuple:
    """
    Serialize this Daemon into its pickle file.

    Reports success without writing anything when `self.pickle` is `False`.
    An existing pickle is moved to a random backup name first. The backup is
    restored if writing fails and deleted otherwise.
    """
    import pickle
    import traceback
    from meerschaum.utils.misc import generate_password

    if not self.pickle:
        return True, "Success"

    from meerschaum._internal.entry import _shells
    if _shells:
        ### NOTE(review): presumably undoes the interactive shell's input patching
        ### before pickling — confirm against `revert_input()`.
        from meerschaum._internal.shell.Shell import revert_input
        revert_input()

    backup_path = self.pickle_path.parent / (generate_password(7) + '.pkl')
    try:
        self.path.mkdir(parents=True, exist_ok=True)
        if self.pickle_path.exists():
            self.pickle_path.rename(backup_path)
        with open(self.pickle_path, 'wb+') as pickle_file:
            pickle.dump(self, pickle_file)
    except Exception as exc:
        outcome = (False, str(exc))
        traceback.print_exception(type(exc), exc, exc.__traceback__)
    else:
        outcome = (True, "Success")

    try:
        if backup_path.exists():
            if outcome[0]:
                backup_path.unlink()
            else:
                backup_path.rename(self.pickle_path)
    except Exception as exc:
        outcome = (False, str(exc))
    return outcome
Write the pickle file for the daemon.
def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
    """
    Delete the daemon's directory once it has finished.

    Parameters
    ----------
    keep_logs: bool, default False
        If `True`, leave the rotating log files and the log offset file in place.

    Returns
    -------
    A `SuccessTuple`; any failure is also emitted with `warn()`.
    """
    if self.path.exists():
        try:
            shutil.rmtree(self.path)
        except Exception as e:
            msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
            warn(msg)
            return False, msg

    if keep_logs:
        return True, "Success"

    self.rotating_log.delete()
    try:
        if self.log_offset_path.exists():
            self.log_offset_path.unlink()
    except Exception as e:
        msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
        warn(msg)
        return False, msg
    return True, "Success"
Remove a daemon's directory after execution.
Parameters
- keep_logs (bool, default False):
If `True`, skip deleting the daemon's log files.
Returns
- A `SuccessTuple` indicating success.
def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
    """
    Resolve the timeout to use: `timeout` itself when it is a number, otherwise
    the configured `jobs:timeout_seconds` value.
    """
    is_number = isinstance(timeout, (int, float))
    return timeout if is_number else get_config('jobs', 'timeout_seconds')
Return the timeout value to use. Use --timeout-seconds if provided,
else the configured default (8).
def get_check_timeout_interval_seconds(
    self,
    check_timeout_interval: Union[int, float, None] = None,
) -> Union[int, float]:
    """
    Resolve the polling interval for timeout checks: the given number, or the
    configured `jobs:check_timeout_interval_seconds` value.
    """
    is_number = isinstance(check_timeout_interval, (int, float))
    if is_number:
        return check_timeout_interval
    return get_config('jobs', 'check_timeout_interval_seconds')
Return the interval value to check the status of timeouts.
@property
def target_args(self) -> Union[Tuple[Any], None]:
    """
    Positional arguments for the target function, as a tuple.

    Prefers a truthy `_target_args` attribute, then `properties['target']['args']`,
    and falls back to an empty tuple.
    """
    own_args = self.__dict__.get('_target_args', None)
    args = own_args if own_args else self.properties.get('target', {}).get('args', None)
    return tuple() if args is None else tuple(args)
Return the positional arguments to pass to the target function.
@property
def target_kw(self) -> Union[Dict[str, Any], None]:
    """
    Keyword arguments for the target function, as a new dictionary.

    Prefers a truthy `_target_kw` attribute, then `properties['target']['kw']`,
    and falls back to an empty dict.
    """
    own_kw = self.__dict__.get('_target_kw', None)
    kw = own_kw if own_kw else self.properties.get('target', {}).get('kw', None)
    if kw is None:
        return {}
    return dict(kw.items())
Return the keyword arguments to pass to the target function.
class StdinFile(io.TextIOBase):
    """
    Redirect user input into a Daemon's context.

    Input travels through a named pipe (FIFO) at `file_path`. While `read()`
    waits for data it keeps an empty marker file at `blocking_file_path`.
    The marker is removed once data arrives or the file is closed.
    """
    def __init__(
        self,
        file_path: Union[pathlib.Path, str],
        lock_file_path: Optional[pathlib.Path] = None,
        decode: bool = True,
        refresh_seconds: Union[int, float, None] = None,
    ):
        """
        Parameters
        ----------
        file_path: Union[pathlib.Path, str]
            Path of the FIFO to read from and write to (created on demand).

        lock_file_path: Optional[pathlib.Path], default None
            Path of the blocking marker file. Defaults to `file_path`'s name
            with `.block` appended, in the same directory.

        decode: bool, default True
            If `True`, `read()`/`readline()` return UTF-8 decoded `str`;
            otherwise they return raw `bytes`.

        refresh_seconds: Union[int, float, None], default None
            Seconds to sleep between polls while waiting for input. If falsy,
            the `system:cli:refresh_seconds` config value is used.
        """
        if isinstance(file_path, str):
            file_path = pathlib.Path(file_path)

        self.file_path = file_path
        self.blocking_file_path = (
            lock_file_path
            if lock_file_path is not None
            else (file_path.parent / (file_path.name + '.block'))
        )
        self._file_handler = None
        self._fd = None
        self.sel = selectors.DefaultSelector()
        self.decode = decode
        self._write_fp = None
        self._refresh_seconds = refresh_seconds
        ### Incremental UTF-8 decoder, created lazily by `_get_decoder()`.
        self._decoder = None

    @property
    def encoding(self):
        return 'utf-8'

    @property
    def file_handler(self):
        """
        Return the non-blocking read handle to the FIFO.

        On first access, the FIFO is created (mode 0o600) if it does not exist,
        then opened read-only and registered with the selector.
        """
        if self._file_handler is not None:
            return self._file_handler

        if not self.file_path.exists():
            self.file_path.parent.mkdir(parents=True, exist_ok=True)
            os.mkfifo(self.file_path.as_posix(), mode=0o600)

        self._fd = os.open(self.file_path, os.O_RDONLY | os.O_NONBLOCK)
        ### `os.fdopen` takes ownership of `_fd`: closing the file object closes it.
        self._file_handler = os.fdopen(self._fd, 'rb', buffering=0)
        self.sel.register(self._file_handler, selectors.EVENT_READ)
        return self._file_handler

    def write(self, data):
        """
        Write `data` (a `str` is UTF-8 encoded first) into the FIFO.

        The FIFO is created and opened for writing on first use. Note that opening
        a FIFO for writing blocks until a reader has it open. A `BrokenPipeError`
        is ignored.
        """
        if self._write_fp is None:
            self.file_path.parent.mkdir(parents=True, exist_ok=True)
            if not self.file_path.exists():
                os.mkfifo(self.file_path.as_posix(), mode=0o600)
            self._write_fp = open(self.file_path, 'wb')

        if isinstance(data, str):
            data = data.encode('utf-8')
        try:
            self._write_fp.write(data)
            self._write_fp.flush()
        except BrokenPipeError:
            pass

    def fileno(self):
        fileno = self.file_handler.fileno()
        return fileno

    def _get_decoder(self):
        """Return the persistent incremental UTF-8 decoder, creating it if needed."""
        decoder = self.__dict__.get('_decoder', None)
        if decoder is None:
            import codecs
            decoder = codecs.getincrementaldecoder('utf-8')()
            self._decoder = decoder
        return decoder

    def _read_bytes(self, size=-1) -> bytes:
        """
        Block until data is available from the FIFO and return it as raw bytes.

        While waiting, the blocking marker file is created and the loop sleeps
        `refresh_seconds` between polls. The marker is removed once data arrives.
        """
        _ = self.file_handler
        while True:
            try:
                data = self._file_handler.read(size)
                if data:
                    try:
                        if self.blocking_file_path.exists():
                            self.blocking_file_path.unlink()
                    except Exception:
                        warn(traceback.format_exc())
                    return data
            except (OSError, EOFError):
                pass

            if not self.blocking_file_path.exists():
                self.blocking_file_path.touch()
            time.sleep(self.refresh_seconds)

    def read(self, size=-1):
        """
        Read from the FIFO pipe, blocking until data is available.

        When `decode` is `True`, bytes are decoded incrementally so a multi-byte
        UTF-8 character split across reads is returned once it is complete,
        rather than raising `UnicodeDecodeError`.
        """
        if not self.decode:
            return self._read_bytes(size)

        decoder = self._get_decoder()
        while True:
            text = decoder.decode(self._read_bytes(size))
            if text:
                return text

    def readline(self, size=-1):
        """
        Read characters (or bytes when `decode` is `False`) up to a newline.

        The newline is consumed but not included. `size` is accepted for API
        compatibility and ignored.
        """
        newline = '\n' if self.decode else b'\n'
        pieces = []
        while True:
            data = self.read(1)
            if not data or data == newline:
                break
            pieces.append(data)

        return ('' if self.decode else b'').join(pieces)

    def close(self):
        """Close the read and write handles and remove the blocking marker file."""
        if self._file_handler is not None:
            self.sel.unregister(self._file_handler)
            ### Closing the file object also closes `_fd` (owned via `os.fdopen`).
            ### A second `os.close(self._fd)` could close an unrelated file that
            ### has since been assigned the same descriptor number.
            self._file_handler.close()
            self._file_handler = None
            self._fd = None
            self._decoder = None

        if self._write_fp is not None:
            try:
                self._write_fp.close()
            except BrokenPipeError:
                pass
            self._write_fp = None

        try:
            if self.blocking_file_path.exists():
                self.blocking_file_path.unlink()
        except Exception:
            pass
        super().close()

    def is_open(self):
        return self._file_handler is not None

    def isatty(self) -> bool:
        return False

    @property
    def refresh_seconds(self) -> Union[int, float]:
        """
        How many seconds between checking for blocking functions.
        """
        if not self._refresh_seconds:
            self._refresh_seconds = mrsm.get_config('system', 'cli', 'refresh_seconds')
        return self._refresh_seconds

    def __str__(self) -> str:
        return f"StdinFile('{self.file_path}')"

    def __repr__(self) -> str:
        return str(self)
Redirect user input into a Daemon's context.
def __init__(
    self,
    file_path: Union[pathlib.Path, str],
    lock_file_path: Optional[pathlib.Path] = None,
    decode: bool = True,
    refresh_seconds: Union[int, float, None] = None,
):
    """
    Set up the FIFO paths and the lazily-opened handles.

    `file_path` may be a string or a `pathlib.Path`. When `lock_file_path` is
    not given, the blocking marker lives next to the FIFO with `.block`
    appended to its name. `refresh_seconds` is stored as-is and resolved later.
    """
    fifo_path = pathlib.Path(file_path) if isinstance(file_path, str) else file_path
    if lock_file_path is None:
        lock_file_path = fifo_path.parent / (fifo_path.name + '.block')

    self.file_path = fifo_path
    self.blocking_file_path = lock_file_path

    ### Handles are opened on demand by `file_handler` / `write`.
    self._file_handler = None
    self._fd = None
    self._write_fp = None

    self.sel = selectors.DefaultSelector()
    self.decode = decode
    self._refresh_seconds = refresh_seconds
@property
def file_handler(self):
    """
    Return the non-blocking read handle for the FIFO, opening it on first use.

    If the FIFO does not exist yet, it is created (with its parent
    directories) using mode 0o600. The new handle is registered with `self.sel`.
    """
    handler = self._file_handler
    if handler is None:
        fifo_path = self.file_path
        if not fifo_path.exists():
            fifo_path.parent.mkdir(parents=True, exist_ok=True)
            os.mkfifo(fifo_path.as_posix(), mode=0o600)

        self._fd = os.open(fifo_path, os.O_RDONLY | os.O_NONBLOCK)
        handler = self._file_handler = os.fdopen(self._fd, 'rb', buffering=0)
        self.sel.register(handler, selectors.EVENT_READ)
    return handler
Return the read file handler to the provided file path.
def write(self, data):
    """
    Write `data` (`str` or `bytes`) into the FIFO.

    The write end of the FIFO is opened lazily on the first call, creating the
    FIFO and its parent directories if needed. `str` data is encoded as UTF-8.

    Returns
    -------
    The number of characters (for `str`) or bytes (for `bytes`) written,
    as the `TextIOBase.write` contract requires, or 0 if a
    `BrokenPipeError` was raised while writing.
    """
    if self._write_fp is None:
        self.file_path.parent.mkdir(parents=True, exist_ok=True)
        if not self.file_path.exists():
            os.mkfifo(self.file_path.as_posix(), mode=0o600)
        self._write_fp = open(self.file_path, 'wb')

    ### Count before encoding: `TextIOBase.write` reports characters.
    num_written = len(data)
    if isinstance(data, str):
        data = data.encode('utf-8')
    try:
        self._write_fp.write(data)
        self._write_fp.flush()
    except BrokenPipeError:
        return 0
    return num_written
Write string s to stream.
Return the number of characters written (which is always equal to the length of the string).
Return underlying file descriptor if one exists.
Raise OSError if the IO object does not use a file descriptor.
def read(self, size=-1):
    """
    Read from the FIFO pipe, polling until data is available.

    While nothing can be read, the blocking marker file is created and the
    call sleeps `refresh_seconds` between attempts. Once data arrives, the
    marker is removed and the data is returned, UTF-8-decoded if `decode`
    is set.
    """
    _ = self.file_handler  # make sure the read end is open
    marker = self.blocking_file_path
    while True:
        try:
            chunk = self._file_handler.read(size)
            if chunk:
                try:
                    if marker.exists():
                        marker.unlink()
                except Exception:
                    warn(traceback.format_exc())
                if not self.decode:
                    return chunk
                return chunk.decode('utf-8')
        except (OSError, EOFError):
            pass

        if not marker.exists():
            marker.touch()
        time.sleep(self.refresh_seconds)
Read from the FIFO pipe, polling until data is available.
def readline(self, size=-1):
    """
    Read one line, one character at a time via `self.read(1)`.
    The trailing newline is consumed but not returned.

    Parameters
    ----------
    size: int, default -1
        If non-negative, stop after at most `size` characters
        (bytes when `decode` is `False`).

    Returns
    -------
    The line as `str` (or `bytes` when `decode` is `False`).

    NOTE(review): with `decode=True`, `read(1)` decodes single bytes, so
    multi-byte UTF-8 input would raise `UnicodeDecodeError` here.
    """
    line = '' if self.decode else b''
    newline = '\n' if self.decode else b'\n'
    while size is None or size < 0 or len(line) < size:
        data = self.read(1)
        if not data or data == newline:
            break
        line += data

    return line
Read until newline or EOF.
Return an empty string if EOF is hit immediately. If size is specified, at most size characters will be read.
def close(self):
    """
    Close the read and write ends of the FIFO and remove the blocking marker.
    """
    if self._file_handler is not None:
        self.sel.unregister(self._file_handler)
        ### The `os.fdopen` file object owns `self._fd`, so closing it closes
        ### the descriptor. A second `os.close(self._fd)` could close an
        ### unrelated descriptor that has since reused the same number.
        self._file_handler.close()
        self._file_handler = None
        self._fd = None

    if self._write_fp is not None:
        try:
            self._write_fp.close()
        except BrokenPipeError:
            pass
        self._write_fp = None

    try:
        if self.blocking_file_path.exists():
            self.blocking_file_path.unlink()
    except Exception:
        pass
    super().close()
Flush and close the IO object.
This method has no effect if the file is already closed.
Return whether this is an 'interactive' stream.
Return False if it can't be determined.
@property
def refresh_seconds(self) -> Union[int, float]:
    """
    Return the polling interval (in seconds) used while waiting for input.

    A falsy stored value is replaced, and cached, with the configured
    `system:cli:refresh_seconds`.
    """
    if self._refresh_seconds:
        return self._refresh_seconds
    self._refresh_seconds = mrsm.get_config('system', 'cli', 'refresh_seconds')
    return self._refresh_seconds
How many seconds between checking for blocking functions.
class RotatingFile(io.IOBase):
    """
    A `RotatingFile` may be treated like a normal file-like object.
    Under the hood, however, it will create new sub-files and delete old ones.

    Sub-files are named `<file_path.name>.<index>` and are created in the same
    directory as `file_path`.
    """

    ### How many positions to step back when a seek lands in the middle of a
    ### multi-byte character (see `read()` and `readlines()`).
    SEEK_BACK_ATTEMPTS: int = 5

    def __init__(
        self,
        file_path: pathlib.Path,
        num_files_to_keep: Optional[int] = None,
        max_file_size: Optional[int] = None,
        redirect_streams: bool = False,
        write_timestamps: bool = False,
        timestamp_format: Optional[str] = None,
        write_callback: Optional[Callable[[str], None]] = None,
    ):
        """
        Create a file-like object which manages other files.

        Parameters
        ----------
        file_path: pathlib.Path
            The base path. Sub-files are created next to it as `<name>.<index>`.

        num_files_to_keep: int, default None
            How many sub-files to keep at any given time.
            Defaults to the configured value (5).

        max_file_size: int, default None
            How large in bytes each sub-file can grow before another file is created.
            Note that this is not a hard limit but rather a threshold
            which may be slightly exceeded.
            Defaults to the configured value (100_000).

        redirect_streams: bool, default False
            If `True`, redirect previous file streams when opening a new file descriptor.

            NOTE: Only set this to `True` if you are entering into a daemon context.
            Doing so will redirect `sys.stdout` and `sys.stderr` into the log files.

        write_timestamps: bool, default False
            If `True`, prepend the current UTC timestamp to each line of the file.

        timestamp_format: str, default None
            If `write_timestamps` is `True`, use this format for the timestamps.
            Defaults to `'%Y-%m-%d %H:%M'`.

        write_callback: Optional[Callable[[str], None]], default None
            If provided, execute this callback with the data to be written.
        """
        self.file_path = pathlib.Path(file_path)
        if num_files_to_keep is None:
            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
        if max_file_size is None:
            max_file_size = get_config('jobs', 'logs', 'max_file_size')
        if timestamp_format is None:
            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
        if num_files_to_keep < 1:
            raise ValueError("At least 1 file must be kept.")
        if max_file_size < 100:
            raise ValueError("Subfiles must contain at least 100 bytes.")

        self.num_files_to_keep = num_files_to_keep
        self.max_file_size = max_file_size
        self.redirect_streams = redirect_streams
        self.write_timestamps = write_timestamps
        self.timestamp_format = timestamp_format
        self.write_callback = write_callback
        self.subfile_regex_pattern = re.compile(r'(.*)\.log(?:\.\d+)?')

        ### When subfiles are opened, map from their index to the file objects.
        self.subfile_objects = {}
        self._redirected_subfile_objects = {}
        self._current_file_obj = None
        self._previous_file_obj = None

        ### When reading, keep track of the file index and position.
        self._cursor: Tuple[int, int] = (0, 0)

        ### Don't forget to close any stray files.
        atexit.register(self.close)

    def fileno(self):
        """
        Return the file descriptor for the latest subfile.
        """
        self.refresh_files(start_interception=False)
        return self._current_file_obj.fileno()

    def get_latest_subfile_path(self) -> pathlib.Path:
        """
        Return the path of the latest subfile, which is the one written into.
        """
        return self.get_subfile_path_from_index(
            self.get_latest_subfile_index()
        )

    def get_remaining_subfile_size(self, subfile_index: int) -> int:
        """
        Return the remaining buffer size for a subfile.

        Parameters
        ----------
        subfile_index: int
            The index of the subfile to be checked.

        Returns
        -------
        The remaining size in bytes.
        """
        subfile_path = self.get_subfile_path_from_index(subfile_index)
        if not subfile_path.exists():
            return self.max_file_size

        return self.max_file_size - os.path.getsize(subfile_path)

    def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool:
        """
        Return whether a given subfile is too large.

        Parameters
        ----------
        subfile_index: int
            The index of the subfile to be checked.

        potential_new_len: int, default 0
            The length of a potential write of new data.

        Returns
        -------
        A bool indicating the subfile is or will be too large.
        """
        subfile_path = self.get_subfile_path_from_index(subfile_index)
        if not subfile_path.exists():
            return False

        ### Flush first so buffered writes are counted in the on-disk size.
        self.flush()

        return (
            (os.path.getsize(subfile_path) + potential_new_len)
            >=
            self.max_file_size
        )

    def get_latest_subfile_index(self) -> int:
        """
        Return the latest existing subfile index.
        If no subfiles exist yet, return 0 (the index of the first subfile).
        """
        existing_subfile_paths = self.get_existing_subfile_paths()
        latest_index = (
            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
            if existing_subfile_paths
            else 0
        )
        return latest_index

    def get_index_from_subfile_name(self, subfile_name: str) -> int:
        """
        Return the index from a given subfile name (e.g. `'job.log.3'` -> `3`).
        If the name is not exactly `<file_path.name>.<digits>`, return -1.
        """
        prefix = self.file_path.name + '.'
        if not subfile_name.startswith(prefix):
            return -1
        suffix = subfile_name[len(prefix):]
        ### `int()` also accepts signs, whitespace, and underscores,
        ### so require plain ASCII digits.
        if not (suffix.isascii() and suffix.isdigit()):
            return -1
        return int(suffix)

    def get_subfile_name_from_index(self, subfile_index: int) -> str:
        """
        Return the subfile name from the given index.
        """
        return f'{self.file_path.name}.{subfile_index}'

    def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path:
        """
        Return the subfile's path from its index.
        """
        return self.file_path.parent / self.get_subfile_name_from_index(subfile_index)

    def get_existing_subfile_indices(self) -> List[int]:
        """
        Return a list of subfile indices which exist on disk.
        """
        existing_subfile_paths = self.get_existing_subfile_paths()
        return [self.get_index_from_subfile_name(path.name) for path in existing_subfile_paths]

    def get_existing_subfile_paths(self) -> List[pathlib.Path]:
        """
        Return the paths of existing subfiles, sorted by ascending index.
        Only files named `<file_path.name>.<index>` are considered subfiles.
        """
        if not self.file_path.parent.exists():
            return []

        ### Skip unrelated files which merely share the prefix (e.g. `job.log.bak`).
        ### Otherwise they would be treated as the oldest subfiles and
        ### unlinked by `delete()`.
        prefix = self.file_path.name + '.'
        subfile_names_indices = []
        for file_name in os.listdir(self.file_path.parent):
            if not file_name.startswith(prefix):
                continue
            subfile_index = self.get_index_from_subfile_name(file_name)
            if subfile_index >= 0:
                subfile_names_indices.append((file_name, subfile_index))
        subfile_names_indices.sort(key=lambda x: x[1])

        return [
            (self.file_path.parent / file_name)
            for file_name, _ in subfile_names_indices
        ]

    def refresh_files(
        self,
        potential_new_len: int = 0,
        start_interception: bool = False,
    ) -> 'io.TextIOWrapper':
        """
        Check the state of the subfiles.
        If the latest subfile is too large, create a new file and delete old ones.

        Parameters
        ----------
        potential_new_len: int, default 0
            The length of data about to be written. It is used to decide whether
            the latest subfile would reach `max_file_size`.

        start_interception: bool, default False
            If `True`, kick off the file interception threads.

        Returns
        -------
        The file object which subsequent writes should go to.
        """
        self.flush()

        latest_subfile_index = self.get_latest_subfile_index()
        latest_subfile_path = self.get_subfile_path_from_index(latest_subfile_index)

        ### First run with existing log files: open the most recent log file.
        is_first_run_with_logs = ((latest_subfile_index > -1) and self._current_file_obj is None)

        ### Sometimes a new file is created but output doesn't switch over.
        ### NOTE(review): `_current_file_obj.name` is usually the subfile's full
        ### path, which `get_index_from_subfile_name()` does not parse. This is
        ### therefore typically `True` whenever a handle is open, so redirection
        ### is re-applied on each refresh. Confirm whether that is intended.
        lost_latest_handle = (
            self._current_file_obj is not None
            and
            self.get_index_from_subfile_name(self._current_file_obj.name) == -1
        )
        if is_first_run_with_logs or lost_latest_handle:
            ### Reuse an open handle for the latest subfile instead of opening
            ### (and leaking) a new one on every refresh. Track any newly opened
            ### handle so that `flush()` and `close()` also cover it.
            existing_file_obj = self.subfile_objects.get(latest_subfile_index, None)
            if existing_file_obj is not None and not existing_file_obj.closed:
                self._current_file_obj = existing_file_obj
            else:
                self._current_file_obj = open(latest_subfile_path, 'a+', encoding='utf-8')
                self.subfile_objects[latest_subfile_index] = self._current_file_obj
            if self.redirect_streams:
                try:
                    daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
                    daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
                except OSError:
                    warn(
                        f"Encountered an issue when redirecting streams:\n{traceback.format_exc()}"
                    )
            if start_interception and self.write_timestamps:
                self.start_log_fd_interception()

        create_new_file = (
            (latest_subfile_index == -1)
            or
            self._current_file_obj is None
            or
            self.is_subfile_too_large(latest_subfile_index, potential_new_len)
        )
        if create_new_file:
            self.increment_subfiles()

        return self._current_file_obj

    def increment_subfiles(self, increment_by: int = 1):
        """
        Create a new subfile and switch the file pointer over.
        """
        latest_subfile_index = self.get_latest_subfile_index()
        old_subfile_index = latest_subfile_index
        new_subfile_index = old_subfile_index + increment_by
        new_file_path = self.get_subfile_path_from_index(new_subfile_index)
        self._previous_file_obj = self._current_file_obj
        self._current_file_obj = open(new_file_path, 'a+', encoding='utf-8')
        self.subfile_objects[new_subfile_index] = self._current_file_obj
        self.flush()

        if self.redirect_streams:
            if self._previous_file_obj is not None:
                self._redirected_subfile_objects[old_subfile_index] = self._previous_file_obj
                daemon.daemon.redirect_stream(self._previous_file_obj, self._current_file_obj)
            daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
            daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
        self.close(unused_only=True)

        ### Sanity check in case writing somehow fails.
        if self._previous_file_obj is self._current_file_obj:
            self._previous_file_obj = None

        self.delete(unused_only=True)

    def close(self, unused_only: bool = False) -> None:
        """
        Close any open file descriptors.

        Parameters
        ----------
        unused_only: bool, default False
            If `True`, only close file descriptors not currently in use.
        """
        self.stop_log_fd_interception(unused_only=unused_only)
        subfile_indices = sorted(self.subfile_objects.keys())
        for subfile_index in subfile_indices:
            subfile_object = self.subfile_objects[subfile_index]
            if unused_only and subfile_object in (self._previous_file_obj, self._current_file_obj):
                continue
            try:
                if not subfile_object.closed:
                    subfile_object.close()
            except Exception:
                warn(f"Failed to close an open subfile:\n{traceback.format_exc()}")

            _ = self.subfile_objects.pop(subfile_index, None)
            if self.redirect_streams:
                _ = self._redirected_subfile_objects.pop(subfile_index, None)

        if not unused_only:
            self._previous_file_obj = None
            self._current_file_obj = None

    def get_timestamp_prefix_str(self) -> str:
        """
        Return the current minute prefix string.
        """
        return datetime.now(timezone.utc).strftime(self.timestamp_format) + ' | '

    def write(self, data: str) -> None:
        """
        Write the given text into the latest subfile.
        If the subfile will be too large, create a new subfile.
        If too many subfiles exist at once, the oldest one will be deleted.

        NOTE: This will not split data across multiple files.
        As such, if data is larger than max_file_size, then the corresponding subfile
        may exceed this limit.
        """
        try:
            if callable(self.write_callback):
                self.write_callback(data)
        except Exception:
            warn(f"Failed to execute write callback:\n{traceback.format_exc()}")

        try:
            self.file_path.parent.mkdir(exist_ok=True, parents=True)
            if isinstance(data, bytes):
                data = data.decode('utf-8')

            prefix_str = self.get_timestamp_prefix_str() if self.write_timestamps else ""
            suffix_str = "\n" if self.write_timestamps else ""
            self.refresh_files(
                potential_new_len = len(prefix_str + data + suffix_str),
                start_interception = self.write_timestamps,
            )
            try:
                if prefix_str:
                    self._current_file_obj.write(prefix_str)
                self._current_file_obj.write(data)
                if suffix_str:
                    self._current_file_obj.write(suffix_str)
            except BrokenPipeError:
                warn("BrokenPipeError encountered. The daemon may have been terminated.")
                return
            except Exception:
                warn(f"Failed to write to subfile:\n{traceback.format_exc()}")
            self.flush()
            self.delete(unused_only=True)
        except Exception as e:
            warn(f"Unexpected error in RotatingFile.write: {e}")

    def delete(self, unused_only: bool = False) -> None:
        """
        Delete old subfiles.

        Parameters
        ----------
        unused_only: bool, default False
            If `True`, only delete subfiles which are no longer needed.
        """
        existing_subfile_paths = self.get_existing_subfile_paths()
        if unused_only and len(existing_subfile_paths) <= self.num_files_to_keep:
            return

        self.flush()
        self.close(unused_only=unused_only)

        ### Keep the newest `num_files_to_keep` subfiles when pruning.
        end_ix = (
            (-1 * self.num_files_to_keep)
            if unused_only
            else len(existing_subfile_paths)
        )
        for subfile_path_to_delete in existing_subfile_paths[0:end_ix]:
            try:
                subfile_path_to_delete.unlink()
            except Exception:
                warn(
                    f"Unable to delete subfile '{subfile_path_to_delete}':\n"
                    + f"{traceback.format_exc()}"
                )

    def read(self, *args, **kwargs) -> str:
        """
        Read the contents of the existing subfiles, starting from the cursor.
        """
        existing_subfile_indices = [
            self.get_index_from_subfile_name(subfile_path.name)
            for subfile_path in self.get_existing_subfile_paths()
        ]
        paths_to_read = [
            self.get_subfile_path_from_index(subfile_index)
            for subfile_index in existing_subfile_indices
            if subfile_index >= self._cursor[0]
        ]
        buffer = ''
        refresh_cursor = True
        for subfile_path in paths_to_read:
            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
            seek_ix = (
                self._cursor[1]
                if subfile_index == self._cursor[0]
                else 0
            )

            if (
                subfile_index in self.subfile_objects
                and
                subfile_index not in self._redirected_subfile_objects
            ):
                subfile_object = self.subfile_objects[subfile_index]
                for i in range(self.SEEK_BACK_ATTEMPTS):
                    try:
                        subfile_object.seek(max(seek_ix - i, 0))
                        buffer += subfile_object.read()
                    except UnicodeDecodeError:
                        continue
                    break
            else:
                with open(subfile_path, 'r', encoding='utf-8') as f:
                    for i in range(self.SEEK_BACK_ATTEMPTS):
                        try:
                            f.seek(max(seek_ix - i, 0))
                            buffer += f.read()
                        except UnicodeDecodeError:
                            continue
                        break

                    ### Handle the case when no files have yet been opened.
                    ### (`f.tell()` must be called before `f` is closed.)
                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
                        self._cursor = (subfile_index, f.tell())
                        refresh_cursor = False

        if refresh_cursor:
            self.refresh_cursor()
        return buffer

    def refresh_cursor(self) -> None:
        """
        Update the cursor to the latest subfile index and file.tell() value.
        """
        self.flush()
        existing_subfile_paths = self.get_existing_subfile_paths()
        current_ix = (
            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
            if existing_subfile_paths
            else 0
        )
        position = self._current_file_obj.tell() if self._current_file_obj is not None else 0
        self._cursor = (current_ix, position)

    def readlines(self) -> List[str]:
        """
        Return a list of lines of text, starting from the cursor.
        """
        existing_subfile_indices = [
            self.get_index_from_subfile_name(subfile_path.name)
            for subfile_path in self.get_existing_subfile_paths()
        ]
        paths_to_read = [
            self.get_subfile_path_from_index(subfile_index)
            for subfile_index in existing_subfile_indices
            if subfile_index >= self._cursor[0]
        ]

        lines = []
        refresh_cursor = True
        for subfile_path in paths_to_read:
            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
            seek_ix = (
                self._cursor[1]
                if subfile_index == self._cursor[0]
                else 0
            )

            subfile_lines = []
            if (
                subfile_index in self.subfile_objects
                and
                subfile_index not in self._redirected_subfile_objects
            ):
                subfile_object = self.subfile_objects[subfile_index]
                for i in range(self.SEEK_BACK_ATTEMPTS):
                    try:
                        subfile_object.seek(max((seek_ix - i), 0))
                        subfile_lines = subfile_object.readlines()
                    except UnicodeDecodeError:
                        continue
                    break
            else:
                with open(subfile_path, 'r', encoding='utf-8') as f:
                    for i in range(self.SEEK_BACK_ATTEMPTS):
                        try:
                            f.seek(max(seek_ix - i, 0))
                            subfile_lines = f.readlines()
                        except UnicodeDecodeError:
                            continue
                        break

                    ### Handle the case when no files have yet been opened.
                    ### (`f.tell()` must be called before `f` is closed.)
                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
                        self._cursor = (subfile_index, f.tell())
                        refresh_cursor = False

            ### Sometimes a line may span multiple files.
            if lines and subfile_lines and not lines[-1].endswith('\n'):
                lines[-1] += subfile_lines[0]
                new_lines = subfile_lines[1:]
            else:
                new_lines = subfile_lines
            lines.extend(new_lines)

        if refresh_cursor:
            self.refresh_cursor()
        return lines

    def seekable(self) -> bool:
        return True

    def seek(self, position: int) -> None:
        """
        Seek to the beginning of the logs stream (position 0), or to `position`
        within the latest subfile.
        """
        existing_subfile_indices = self.get_existing_subfile_indices()
        min_ix = existing_subfile_indices[0] if existing_subfile_indices else 0
        max_ix = existing_subfile_indices[-1] if existing_subfile_indices else 0
        if position == 0:
            self._cursor = (min_ix, 0)
            return

        self._cursor = (max_ix, position)
        if self._current_file_obj is not None:
            self._current_file_obj.seek(position)

    def flush(self) -> None:
        """
        Flush any open subfiles.
        """
        for subfile_index, subfile_object in self.subfile_objects.items():
            if not subfile_object.closed:
                try:
                    subfile_object.flush()
                except Exception:
                    warn(f"Failed to flush subfile {subfile_index}:\n{traceback.format_exc()}")

        if self.redirect_streams:
            try:
                sys.stdout.flush()
            except BrokenPipeError:
                pass
            except Exception:
                warn(f"Failed to flush STDOUT:\n{traceback.format_exc()}")
            try:
                sys.stderr.flush()
            except BrokenPipeError:
                pass
            except Exception:
                warn(f"Failed to flush STDERR:\n{traceback.format_exc()}")

    def start_log_fd_interception(self):
        """
        Start the file descriptor monitoring threads.
        """
        if not self.write_timestamps:
            return

        self._stdout_interceptor = FileDescriptorInterceptor(
            sys.stdout.fileno(),
            self.get_timestamp_prefix_str,
        )
        self._stderr_interceptor = FileDescriptorInterceptor(
            sys.stderr.fileno(),
            self.get_timestamp_prefix_str,
        )

        self._stdout_interceptor_thread = Thread(
            target = self._stdout_interceptor.start_interception,
            daemon = True,
        )
        self._stderr_interceptor_thread = Thread(
            target = self._stderr_interceptor.start_interception,
            daemon = True,
        )
        self._stdout_interceptor_thread.start()
        self._stderr_interceptor_thread.start()
        self._intercepting = True

        if '_interceptor_threads' not in self.__dict__:
            self._interceptor_threads = []
        if '_interceptors' not in self.__dict__:
            self._interceptors = []
        self._interceptor_threads.extend([
            self._stdout_interceptor_thread,
            self._stderr_interceptor_thread,
        ])
        self._interceptors.extend([
            self._stdout_interceptor,
            self._stderr_interceptor,
        ])
        ### Stop all but the two interceptors just started.
        self.stop_log_fd_interception(unused_only=True)

    def stop_log_fd_interception(self, unused_only: bool = False):
        """
        Stop the file descriptor monitoring threads.
        If `unused_only`, keep the two most recently started interceptors.
        """
        if not self.write_timestamps:
            return

        interceptors = self.__dict__.get('_interceptors', [])
        interceptor_threads = self.__dict__.get('_interceptor_threads', [])

        end_ix = len(interceptors) if not unused_only else -2

        for interceptor in interceptors[:end_ix]:
            interceptor.stop_interception()
        del interceptors[:end_ix]

        for thread in interceptor_threads[:end_ix]:
            try:
                thread.join()
            except Exception:
                warn(f"Failed to join interceptor threads:\n{traceback.format_exc()}")
        del interceptor_threads[:end_ix]

    def touch(self):
        """
        Touch the latest subfile.
        """
        subfile_path = self.get_latest_subfile_path()
        subfile_path.touch()

    def isatty(self) -> bool:
        return True

    def __repr__(self) -> str:
        """
        Return basic info for this `RotatingFile`.
        """
        return (
            "RotatingFile("
            + f"'{self.file_path.as_posix()}', "
            + f"num_files_to_keep={self.num_files_to_keep}, "
            + f"max_file_size={self.max_file_size})"
        )
A RotatingFile may be treated like a normal file-like object.
Under the hood, however, it will create new sub-files and delete old ones.
def __init__(
    self,
    file_path: pathlib.Path,
    num_files_to_keep: Optional[int] = None,
    max_file_size: Optional[int] = None,
    redirect_streams: bool = False,
    write_timestamps: bool = False,
    timestamp_format: Optional[str] = None,
    write_callback: Optional[Callable[[str], None]] = None,
):
    """
    Build a rotating, file-like writer over numbered sub-files of `file_path`.

    Parameters
    ----------
    file_path: pathlib.Path
        Base path; sub-files are created next to it.

    num_files_to_keep: int, default None
        Maximum number of sub-files retained; taken from the
        `jobs:logs:num_files_to_keep` config when omitted. Must be at least 1.

    max_file_size: int, default None
        Size threshold in bytes that triggers a new sub-file; taken from
        `jobs:logs:max_file_size` when omitted. Must be at least 100.

    redirect_streams: bool, default False
        Whether to redirect `sys.stdout`/`sys.stderr` into the sub-files
        (only appropriate inside a daemon context).

    write_timestamps: bool, default False
        Whether to prefix written data with a UTC timestamp.

    timestamp_format: str, default None
        `strftime` format for timestamps; taken from
        `jobs:logs:timestamps:format` when omitted.

    write_callback: Optional[Callable[[str], None]], default None
        Called with each piece of data passed to `write()`.
    """
    self.file_path = pathlib.Path(file_path)

    ### Fill in configured defaults, in this order, for anything omitted.
    num_files_to_keep = (
        get_config('jobs', 'logs', 'num_files_to_keep')
        if num_files_to_keep is None
        else num_files_to_keep
    )
    max_file_size = (
        get_config('jobs', 'logs', 'max_file_size')
        if max_file_size is None
        else max_file_size
    )
    timestamp_format = (
        get_config('jobs', 'logs', 'timestamps', 'format')
        if timestamp_format is None
        else timestamp_format
    )

    if num_files_to_keep < 1:
        raise ValueError("At least 1 file must be kept.")
    if max_file_size < 100:
        raise ValueError("Subfiles must contain at least 100 bytes.")

    self.num_files_to_keep = num_files_to_keep
    self.max_file_size = max_file_size
    self.redirect_streams = redirect_streams
    self.write_timestamps = write_timestamps
    self.timestamp_format = timestamp_format
    self.write_callback = write_callback
    self.subfile_regex_pattern = re.compile(r'(.*)\.log(?:\.\d+)?')

    ### Open sub-file objects keyed by index, plus those redirected away from.
    self.subfile_objects = {}
    self._redirected_subfile_objects = {}
    self._current_file_obj = None
    self._previous_file_obj = None

    ### Read position as (subfile index, offset within that subfile).
    self._cursor: Tuple[int, int] = (0, 0)

    ### Close any open sub-files when the interpreter exits.
    atexit.register(self.close)
Create a file-like object which manages other files.
Parameters
- num_files_to_keep (int, default None): How many sub-files to keep at any given time. Defaults to the configured value (5).
- max_file_size (int, default None): How large in bytes each sub-file can grow before another file is created. Note that this is not a hard limit but rather a threshold which may be slightly exceeded. Defaults to the configured value (100_000).
- redirect_streams (bool, default False): If `True`, redirect previous file streams when opening a new file descriptor. NOTE: Only set this to `True` if you are entering into a daemon context. Doing so will redirect `sys.stdout` and `sys.stderr` into the log files.
- write_timestamps (bool, default False): If `True`, prepend the current UTC timestamp to each line of the file.
- timestamp_format (str, default None): If `write_timestamps` is `True`, use this format for the timestamps. Defaults to `'%Y-%m-%d %H:%M'`.
- write_callback (Optional[Callable[[str], None]], default None): If provided, execute this callback with the data to be written.
def fileno(self):
    """
    Return the OS-level file descriptor of the subfile currently written to.

    Subfiles are refreshed first (without starting timestamp interception),
    so the descriptor reflects any rotation that is due.
    """
    self.refresh_files(start_interception=False)
    current_file = self._current_file_obj
    return current_file.fileno()
Return the file descriptor for the latest subfile.
def get_latest_subfile_path(self) -> pathlib.Path:
    """
    Return the path of the newest subfile, which is the one writes go to.
    """
    latest_index = self.get_latest_subfile_index()
    return self.get_subfile_path_from_index(latest_index)
Return the path of the latest subfile, which is the one written into.
def get_remaining_subfile_size(self, subfile_index: int) -> int:
    """
    Return how many more bytes the given subfile may hold before reaching
    `max_file_size`.

    Parameters
    ----------
    subfile_index: int
        Index of the subfile to inspect. A subfile that does not exist yet
        counts as empty.

    Returns
    -------
    `max_file_size` minus the subfile's current size in bytes.
    """
    path = self.get_subfile_path_from_index(subfile_index)
    used_bytes = os.path.getsize(path) if path.exists() else 0
    return self.max_file_size - used_bytes
Return the remaining buffer size for a subfile.
Parameters
- subfile_index (int): The index of the subfile to be checked.
Returns
- The remaining size in bytes.
def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool:
    """
    Tell whether a subfile has reached (or would reach) `max_file_size`.

    Parameters
    ----------
    subfile_index: int
        Index of the subfile to inspect.

    potential_new_len: int, default 0
        Size of data about to be written, added to the current size.

    Returns
    -------
    `False` if the subfile does not exist. Otherwise, whether its size plus
    `potential_new_len` is at least `max_file_size`.
    """
    path = self.get_subfile_path_from_index(subfile_index)
    if path.exists():
        ### Flush first so buffered writes are reflected in the on-disk size.
        self.flush()
        projected_size = os.path.getsize(path) + potential_new_len
        return projected_size >= self.max_file_size
    return False
Return whether a given subfile is too large.
Parameters
- subfile_index (int): The index of the subfile to be checked.
- potential_new_len (int, default 0): The length of a potential write of new data.
Returns
- A bool indicating the subfile is or will be too large.
def get_latest_subfile_index(self) -> int:
    """
    Return the index of the latest existing subfile.

    Returns
    -------
    The index parsed from the name of the highest-indexed subfile on disk,
    or 0 if no subfiles exist yet.
    """
    existing_subfile_paths = self.get_existing_subfile_paths()
    if not existing_subfile_paths:
        ### NOTE: This returns 0 (not -1, as older docs claimed) when nothing exists;
        ### `refresh_files()` then opens subfile 0 on the first run.
        return 0
    return self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
Return the latest existing subfile index. If no subfiles exist, return 0.
def get_index_from_subfile_name(self, subfile_name: str) -> int:
    """
    Parse the numeric index out of a subfile name such as 'mrsm.log.3'.

    Returns -1 if the name cannot be parsed.
    """
    try:
        name_prefix = self.file_path.name + '.'
        parsed_index = int(subfile_name.replace(name_prefix, ''))
    except Exception:
        parsed_index = -1
    return parsed_index
Return the index from a given subfile name. If the file name cannot be parsed, return -1.
def get_subfile_name_from_index(self, subfile_index: int) -> str:
    """
    Build the file name of a subfile: the base file name, a dot, then the index.
    """
    return '{}.{}'.format(self.file_path.name, subfile_index)
Return the subfile name from the given index.
def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path:
    """
    Return the full path of a subfile, located next to the base file.
    """
    parent_dir = self.file_path.parent
    return parent_dir / self.get_subfile_name_from_index(subfile_index)
Return the subfile's path from its index.
def get_existing_subfile_indices(self) -> List[int]:
    """
    Return a list of the indices of the subfiles which exist on disk,
    in the same order as `get_existing_subfile_paths()`.
    """
    indices = []
    for existing_path in self.get_existing_subfile_paths():
        indices.append(self.get_index_from_subfile_name(existing_path.name))
    return indices
Return a list of subfile indices which exist on disk.
def get_existing_subfile_paths(self) -> List[pathlib.Path]:
    """
    Return the paths of the subfiles on disk, sorted by subfile index.

    A file counts as a subfile if its name starts with the base file name and
    matches `subfile_regex_pattern`. Returns an empty list if the parent
    directory does not exist.
    """
    parent_dir = self.file_path.parent
    if not parent_dir.exists():
        return []

    base_name = self.file_path.name
    named_indices = [
        (file_name, self.get_index_from_subfile_name(file_name))
        for file_name in os.listdir(parent_dir)
        if file_name.startswith(base_name) and re.match(self.subfile_regex_pattern, file_name)
    ]
    named_indices.sort(key=lambda pair: pair[1])
    return [parent_dir / file_name for file_name, _ in named_indices]
Return a list of file paths that match the input filename pattern.
def refresh_files(
    self,
    potential_new_len: int = 0,
    start_interception: bool = False,
) -> 'io.TextIOWrapper':
    """
    Check the state of the subfiles and return the file object to write into.

    If the latest subfile is too large, create a new file and delete old ones.

    Parameters
    ----------
    potential_new_len: int, default 0
        The length of data about to be written, used to check whether the
        latest subfile would grow too large.

    start_interception: bool, default False
        If `True`, kick off the file interception threads.
    """
    self.flush()

    latest_index = self.get_latest_subfile_index()
    latest_path = self.get_subfile_path_from_index(latest_index)
    handle = self._current_file_obj

    ### First run with existing log files: open the most recent log file.
    first_run_with_logs = latest_index > -1 and handle is None

    ### Sometimes a new file is created but output doesn't switch over.
    lost_handle = (
        handle is not None
        and self.get_index_from_subfile_name(handle.name) == -1
    )

    if first_run_with_logs or lost_handle:
        self._current_file_obj = open(latest_path, 'a+', encoding='utf-8')
        if self.redirect_streams:
            try:
                for stream in (sys.stdout, sys.stderr):
                    daemon.daemon.redirect_stream(stream, self._current_file_obj)
            except OSError:
                warn(
                    f"Encountered an issue when redirecting streams:\n{traceback.format_exc()}"
                )
            if start_interception and self.write_timestamps:
                self.start_log_fd_interception()

    needs_new_file = (
        latest_index == -1
        or self._current_file_obj is None
        or self.is_subfile_too_large(latest_index, potential_new_len)
    )
    if needs_new_file:
        self.increment_subfiles()

    return self._current_file_obj
Check the state of the subfiles. If the latest subfile is too large, create a new file and delete old ones.
Parameters
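potential_new_len (int, default 0): The length of data about to be written, used to check whether the latest subfile would grow too large.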
start_interception (bool, default False): If True, kick off the file interception threads.
def increment_subfiles(self, increment_by: int = 1):
    """
    Create a new subfile and switch the file pointer over to it.

    Parameters
    ----------
    increment_by: int, default 1
        How far past the latest existing index the new subfile's index lies.
    """
    old_subfile_index = self.get_latest_subfile_index()
    new_subfile_index = old_subfile_index + increment_by
    new_file_path = self.get_subfile_path_from_index(new_subfile_index)

    self._previous_file_obj = self._current_file_obj
    self._current_file_obj = open(new_file_path, 'a+', encoding='utf-8')
    self.subfile_objects[new_subfile_index] = self._current_file_obj
    self.flush()

    if self.redirect_streams:
        previous, current = self._previous_file_obj, self._current_file_obj
        if previous is not None:
            self._redirected_subfile_objects[old_subfile_index] = previous
            daemon.daemon.redirect_stream(previous, current)
        for stream in (sys.stdout, sys.stderr):
            daemon.daemon.redirect_stream(stream, current)
    self.close(unused_only=True)

    ### Sanity check in case writing somehow fails.
    if self._previous_file_obj is self._current_file_obj:
        self._previous_file_obj = None

    self.delete(unused_only=True)
Create a new subfile and switch the file pointer over.
def close(self, unused_only: bool = False) -> None:
    """
    Close open subfile objects and stop the matching interceptors.

    Parameters
    ----------
    unused_only: bool, default False
        If `True`, keep the previous and current subfile objects open and only
        close the rest. Otherwise close everything and clear both references.
    """
    self.stop_log_fd_interception(unused_only=unused_only)

    for subfile_index in sorted(self.subfile_objects.keys()):
        subfile_object = self.subfile_objects[subfile_index]
        still_in_use = (
            unused_only
            and subfile_object in (self._previous_file_obj, self._current_file_obj)
        )
        if still_in_use:
            continue

        try:
            if not subfile_object.closed:
                subfile_object.close()
        except Exception:
            warn(f"Failed to close an open subfile:\n{traceback.format_exc()}")

        self.subfile_objects.pop(subfile_index, None)
        if self.redirect_streams:
            self._redirected_subfile_objects.pop(subfile_index, None)

    if not unused_only:
        self._previous_file_obj = None
        self._current_file_obj = None
Close any open file descriptors.
Parameters
- unused_only (bool, default False): If True, only close file descriptors not currently in use.
def get_timestamp_prefix_str(self) -> str:
    """
    Return the current UTC time formatted with `timestamp_format`, followed by ' | '.
    """
    now_utc = datetime.now(timezone.utc)
    return f"{now_utc.strftime(self.timestamp_format)} | "
Return the timestamp prefix string for the current UTC time (formatted with timestamp_format and followed by ' | ').
def write(self, data: str) -> None:
    """
    Write text into the latest subfile, rotating subfiles when needed.

    If the latest subfile would become too large, a new subfile is created first.
    Afterwards, old subfiles may be deleted (see `delete(unused_only=True)`).

    NOTE: This will not split data across multiple files. As such, if data is
    larger than `max_file_size`, the corresponding subfile may exceed this limit.

    Parameters
    ----------
    data: str
        The text to write. `bytes` are decoded as UTF-8.
    """
    try:
        if callable(self.write_callback):
            self.write_callback(data)
    except Exception:
        warn(f"Failed to execute write callback:\n{traceback.format_exc()}")

    try:
        self.file_path.parent.mkdir(exist_ok=True, parents=True)
        if isinstance(data, bytes):
            data = data.decode('utf-8')

        if self.write_timestamps:
            prefix_str = self.get_timestamp_prefix_str()
            suffix_str = "\n"
        else:
            prefix_str = ""
            suffix_str = ""

        ### Rotate beforehand if this write would overflow the latest subfile.
        self.refresh_files(
            potential_new_len=len(prefix_str + data + suffix_str),
            start_interception=self.write_timestamps,
        )

        target = self._current_file_obj
        try:
            if prefix_str:
                target.write(prefix_str)
            target.write(data)
            if suffix_str:
                target.write(suffix_str)
        except BrokenPipeError:
            warn("BrokenPipeError encountered. The daemon may have been terminated.")
            return
        except Exception:
            warn(f"Failed to write to subfile:\n{traceback.format_exc()}")

        self.flush()
        self.delete(unused_only=True)
    except Exception as e:
        warn(f"Unexpected error in RotatingFile.write: {e}")
Write the given text into the latest subfile. If the subfile will be too large, create a new subfile. If too many subfiles exist at once, the oldest one will be deleted.
NOTE: This will not split data across multiple files. As such, if data is larger than max_file_size, then the corresponding subfile may exceed this limit.
def delete(self, unused_only: bool = False) -> None:
    """
    Delete old subfiles.

    Parameters
    ----------
    unused_only: bool, default False
        If `True`, only delete subfiles which are no longer needed, i.e. keep the
        newest `num_files_to_keep` subfiles. Otherwise delete every subfile.
    """
    existing_subfile_paths = self.get_existing_subfile_paths()
    if unused_only and len(existing_subfile_paths) <= self.num_files_to_keep:
        return

    self.flush()
    self.close(unused_only=unused_only)

    ### NOTE(review): if `num_files_to_keep` is 0, `-0 == 0` and nothing is deleted
    ### in `unused_only` mode.
    end_ix = (
        (-1 * self.num_files_to_keep)
        if unused_only
        else len(existing_subfile_paths)
    )
    for subfile_path_to_delete in existing_subfile_paths[0:end_ix]:
        try:
            subfile_path_to_delete.unlink()
        except Exception:
            warn(
                f"Unable to delete subfile '{subfile_path_to_delete}':\n"
                + f"{traceback.format_exc()}"
            )
Delete old subfiles.
Parameters
- unused_only (bool, default False): If True, only delete subfiles which are no longer needed.
def read(self, *args, **kwargs) -> str:
    """
    Read the contents of the existing subfiles, starting at the cursor.

    Subfiles with an index below the cursor's subfile are skipped, and the
    cursor's own subfile is read from the cursor's offset. Afterwards the
    cursor is moved past what was read.

    Returns
    -------
    The concatenated text.
    """
    start_index = self._cursor[0]
    paths_to_read = []
    for existing_path in self.get_existing_subfile_paths():
        existing_index = self.get_index_from_subfile_name(existing_path.name)
        if existing_index >= start_index:
            paths_to_read.append(self.get_subfile_path_from_index(existing_index))

    chunks = []
    should_refresh_cursor = True
    for subfile_path in paths_to_read:
        subfile_index = self.get_index_from_subfile_name(subfile_path.name)
        seek_ix = self._cursor[1] if subfile_index == self._cursor[0] else 0

        reuse_open_object = (
            subfile_index in self.subfile_objects
            and subfile_index not in self._redirected_subfile_objects
        )
        if reuse_open_object:
            subfile_object = self.subfile_objects[subfile_index]
            ### Back off one position per attempt on decode errors
            ### (presumably the offset landed inside a multi-byte character).
            for attempt in range(self.SEEK_BACK_ATTEMPTS):
                try:
                    subfile_object.seek(max(seek_ix - attempt, 0))
                    chunks.append(subfile_object.read())
                    break
                except UnicodeDecodeError:
                    continue
        else:
            with open(subfile_path, 'r', encoding='utf-8') as f:
                for attempt in range(self.SEEK_BACK_ATTEMPTS):
                    try:
                        f.seek(max(seek_ix - attempt, 0))
                        chunks.append(f.read())
                        break
                    except UnicodeDecodeError:
                        continue

                ### Handle the case when no files have yet been opened.
                if not self.subfile_objects and subfile_path == paths_to_read[-1]:
                    self._cursor = (subfile_index, f.tell())
                    should_refresh_cursor = False

    if should_refresh_cursor:
        self.refresh_cursor()
    return ''.join(chunks)
Read the contents of the existing subfiles.
def refresh_cursor(self) -> None:
    """
    Point the cursor at the latest subfile index and the current file's `tell()` position.

    Falls back to index 0 when no subfiles exist and position 0 when no file is open.
    """
    self.flush()
    subfile_paths = self.get_existing_subfile_paths()
    if subfile_paths:
        current_ix = self.get_index_from_subfile_name(subfile_paths[-1].name)
    else:
        current_ix = 0
    open_file = self._current_file_obj
    position = 0 if open_file is None else open_file.tell()
    self._cursor = (current_ix, position)
Update the cursor to the latest subfile index and file.tell() value.
def readlines(self) -> List[str]:
    """
    Return the lines of text from the cursor onward, across all subfiles.

    A line which was split across two subfiles is rejoined into a single entry.
    Afterwards the cursor is moved past what was read.
    """
    start_index = self._cursor[0]
    paths_to_read = []
    for existing_path in self.get_existing_subfile_paths():
        existing_index = self.get_index_from_subfile_name(existing_path.name)
        if existing_index >= start_index:
            paths_to_read.append(self.get_subfile_path_from_index(existing_index))

    lines = []
    should_refresh_cursor = True
    for subfile_path in paths_to_read:
        subfile_index = self.get_index_from_subfile_name(subfile_path.name)
        seek_ix = self._cursor[1] if subfile_index == self._cursor[0] else 0

        subfile_lines = []
        reuse_open_object = (
            subfile_index in self.subfile_objects
            and subfile_index not in self._redirected_subfile_objects
        )
        if reuse_open_object:
            subfile_object = self.subfile_objects[subfile_index]
            for attempt in range(self.SEEK_BACK_ATTEMPTS):
                try:
                    subfile_object.seek(max(seek_ix - attempt, 0))
                    subfile_lines = subfile_object.readlines()
                    break
                except UnicodeDecodeError:
                    continue
        else:
            with open(subfile_path, 'r', encoding='utf-8') as f:
                for attempt in range(self.SEEK_BACK_ATTEMPTS):
                    try:
                        f.seek(max(seek_ix - attempt, 0))
                        subfile_lines = f.readlines()
                        break
                    except UnicodeDecodeError:
                        continue

                ### Handle the case when no files have yet been opened.
                if not self.subfile_objects and subfile_path == paths_to_read[-1]:
                    self._cursor = (subfile_index, f.tell())
                    should_refresh_cursor = False

        ### Sometimes a line may span multiple files.
        if lines and subfile_lines and not lines[-1].endswith('\n'):
            lines[-1] += subfile_lines[0]
            subfile_lines = subfile_lines[1:]
        lines.extend(subfile_lines)

    if should_refresh_cursor:
        self.refresh_cursor()
    return lines
Return a list of lines of text.
Return whether object supports random access.
If False, seek(), tell() and truncate() will raise OSError. This method may need to do a test seek().
def seek(self, position: int) -> None:
    """
    Move the read cursor.

    A position of 0 moves to the beginning of the oldest subfile. Any other
    position is applied to the latest subfile (and to the open file object, if any).
    """
    subfile_indices = self.get_existing_subfile_indices()
    if subfile_indices:
        min_ix, max_ix = subfile_indices[0], subfile_indices[-1]
    else:
        min_ix = max_ix = 0

    if position == 0:
        self._cursor = (min_ix, 0)
        return

    self._cursor = (max_ix, position)
    if self._current_file_obj is not None:
        self._current_file_obj.seek(position)
Seek within the logs stream. A position of 0 moves to the beginning of the oldest subfile; any other position applies to the latest subfile.
def flush(self) -> None:
    """
    Flush every open subfile and, when redirecting streams, STDOUT and STDERR.

    Failures are reported through `warn()` instead of being raised, and a
    `BrokenPipeError` while flushing STDOUT/STDERR is ignored.
    """
    for subfile_index, subfile_object in self.subfile_objects.items():
        if subfile_object.closed:
            continue
        try:
            subfile_object.flush()
        except Exception:
            warn(f"Failed to flush subfile {subfile_index}:\n{traceback.format_exc()}")

    if not self.redirect_streams:
        return

    try:
        sys.stdout.flush()
    except BrokenPipeError:
        pass
    except Exception:
        warn(f"Failed to flush STDOUT:\n{traceback.format_exc()}")

    try:
        sys.stderr.flush()
    except BrokenPipeError:
        pass
    except Exception:
        warn(f"Failed to flush STDERR:\n{traceback.format_exc()}")
Flush any open subfiles.
def start_log_fd_interception(self):
    """
    Start the file descriptor monitoring threads for STDOUT and STDERR.

    Each stream gets a `FileDescriptorInterceptor` (with `get_timestamp_prefix_str`
    as its injection hook) running on a daemon thread. Afterwards, all but the
    newest interceptors are stopped. Does nothing unless `write_timestamps` is set.
    """
    if not self.write_timestamps:
        return

    prefix_hook = self.get_timestamp_prefix_str
    self._stdout_interceptor = FileDescriptorInterceptor(sys.stdout.fileno(), prefix_hook)
    self._stderr_interceptor = FileDescriptorInterceptor(sys.stderr.fileno(), prefix_hook)

    self._stdout_interceptor_thread = Thread(
        target=self._stdout_interceptor.start_interception,
        daemon=True,
    )
    self._stderr_interceptor_thread = Thread(
        target=self._stderr_interceptor.start_interception,
        daemon=True,
    )
    for new_thread in (self._stdout_interceptor_thread, self._stderr_interceptor_thread):
        new_thread.start()
    self._intercepting = True

    ### Lazily create the bookkeeping lists on first use.
    if '_interceptor_threads' not in self.__dict__:
        self._interceptor_threads = []
    if '_interceptors' not in self.__dict__:
        self._interceptors = []
    self._interceptor_threads.extend(
        [self._stdout_interceptor_thread, self._stderr_interceptor_thread]
    )
    self._interceptors.extend(
        [self._stdout_interceptor, self._stderr_interceptor]
    )
    self.stop_log_fd_interception(unused_only=True)
Start the file descriptor monitoring threads.
def stop_log_fd_interception(self, unused_only: bool = False):
    """
    Stop the file descriptor monitoring threads.

    Parameters
    ----------
    unused_only: bool, default False
        If `True`, keep the two most recently added interceptors (and threads)
        running and stop only the older ones.
    """
    if not self.write_timestamps:
        return

    interceptors = self.__dict__.get('_interceptors', [])
    interceptor_threads = self.__dict__.get('_interceptor_threads', [])

    end_ix = -2 if unused_only else len(interceptors)

    for stale_interceptor in interceptors[:end_ix]:
        stale_interceptor.stop_interception()
    del interceptors[:end_ix]

    for stale_thread in interceptor_threads[:end_ix]:
        try:
            stale_thread.join()
        except Exception:
            warn(f"Failed to join interceptor threads:\n{traceback.format_exc()}")
    del interceptor_threads[:end_ix]
Stop the file descriptor monitoring threads.
class FileDescriptorInterceptor:
    """
    A management class to intercept data written to a file descriptor.

    On construction, the given descriptor is re-pointed at an internal pipe.
    `start_interception()` reads from that pipe, prefixes each new line with the
    string returned by `injection_hook`, and writes the result to a duplicate of
    the descriptor's original target.
    """
    def __init__(
        self,
        file_descriptor: int,
        injection_hook: Callable[[], str],
    ):
        """
        Parameters
        ----------
        file_descriptor: int
            The OS file descriptor from which to read.

        injection_hook: Callable[[], str]
            A callable which returns a string to be injected into the written data.
        """
        self.stop_event = Event()
        self.injection_hook = injection_hook
        self.original_file_descriptor = file_descriptor
        ### Duplicate the original target before hijacking the descriptor.
        self.new_file_descriptor = os.dup(file_descriptor)
        self.read_pipe, self.write_pipe = os.pipe()
        ### Written to by `stop_interception()` to wake up the `select()` loop.
        self.signal_read_pipe, self.signal_write_pipe = os.pipe()
        ### From here on, writes to `file_descriptor` land in our pipe.
        os.dup2(self.write_pipe, file_descriptor)

    def start_interception(self):
        """
        Read from the file descriptor and write the modified data after injection.

        Each new line is prefixed with `self.injection_hook()`, evaluated when the
        line's first bytes arrive.

        NOTE: This is blocking and is meant to be run in a thread.
        """
        os.set_blocking(self.read_pipe, False)
        os.set_blocking(self.signal_read_pipe, False)
        is_first_read = True
        ### Whether the next byte read begins a new line (and so needs a prefix).
        at_line_start = True
        while not self.stop_event.is_set():
            try:
                rlist, _, _ = select.select([self.read_pipe, self.signal_read_pipe], [], [], 0.1)
                if self.signal_read_pipe in rlist:
                    break
                if not rlist:
                    continue
                data = os.read(self.read_pipe, 1024)
                if not data:
                    break
            except BlockingIOError:
                continue
            except OSError as e:
                if e.errno == errno.EINTR:
                    continue  # Interrupted system call, just try again
                if e.errno != errno.EBADF:
                    warn(f"OSError in FileDescriptorInterceptor: {e}")
                ### EBADF means the file descriptor is closed; stop either way.
                break

            try:
                injected_bytes = self.injection_hook().encode('utf-8')

                ### NOTE: `data[-1]` is an int for bytes, so use `endswith()`.
                ends_with_newline = data.endswith(b'\n')
                body = data[:-1] if ends_with_newline else data
                modified_data = body.replace(b'\n', b'\n' + injected_bytes)
                if ends_with_newline:
                    modified_data += b'\n'

                ### Prefix the chunk only when it starts a fresh line, so no dangling
                ### prefix is left after a trailing newline.
                if at_line_start:
                    modified_data = injected_bytes + modified_data

                ### As before, the very first chunk is preceded by a newline
                ### (presumably to separate it from earlier output).
                if is_first_read:
                    modified_data = b'\n' + modified_data
                    is_first_read = False

                at_line_start = ends_with_newline
                os.write(self.new_file_descriptor, modified_data)
            except (BrokenPipeError, OSError):
                break
            except Exception:
                with open(paths.DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
                    f.write(traceback.format_exc())
                break


    def stop_interception(self):
        """
        Stop the interception loop and close the new file descriptors.

        Calling this more than once is safe: already-closed descriptors are ignored.
        """
        self.stop_event.set()

        ### Wake up the `select()` call in `start_interception()`.
        try:
            os.write(self.signal_write_pipe, b'\0')
        except OSError as e:
            if e.errno not in (FD_CLOSED, errno.EPIPE):
                warn(
                    "Error while trying to signal the interception thread:\n"
                    + f"{traceback.format_exc()}"
                )

        descriptors_to_close = (
            (
                self.new_file_descriptor,
                "Error while trying to close the duplicated file descriptor:\n",
            ),
            (
                self.write_pipe,
                "Error while trying to close the write-pipe "
                + "to the intercepted file descriptor:\n",
            ),
            (
                self.read_pipe,
                "Error while trying to close the read-pipe "
                + "to the intercepted file descriptor:\n",
            ),
            (
                self.signal_read_pipe,
                "Error while trying to close the signal-read-pipe "
                + "to the intercepted file descriptor:\n",
            ),
            (
                self.signal_write_pipe,
                "Error while trying to close the signal-write-pipe "
                + "to the intercepted file descriptor:\n",
            ),
        )
        for file_descriptor, error_message in descriptors_to_close:
            try:
                os.close(file_descriptor)
            except OSError as e:
                if e.errno != FD_CLOSED:
                    warn(error_message + f"{traceback.format_exc()}")
A management class to intercept data written to a file descriptor.
def __init__(
    self,
    file_descriptor: int,
    injection_hook: Callable[[], str],
):
    """
    Set up interception of a file descriptor.

    Parameters
    ----------
    file_descriptor: int
        The OS file descriptor from which to read.

    injection_hook: Callable[[], str]
        A callable which returns a string to be injected into the written data.
    """
    self.stop_event = Event()
    self.injection_hook = injection_hook
    self.original_file_descriptor = file_descriptor
    ### Keep a duplicate of the original target so modified output can still reach it.
    self.new_file_descriptor = os.dup(file_descriptor)
    data_pipe = os.pipe()
    self.read_pipe, self.write_pipe = data_pipe
    ### A second pipe used only to wake the reader loop when stopping.
    signal_pipe = os.pipe()
    self.signal_read_pipe, self.signal_write_pipe = signal_pipe
    ### Re-point the original descriptor at our pipe so its writes are intercepted.
    os.dup2(self.write_pipe, file_descriptor)
Parameters
- file_descriptor (int): The OS file descriptor from which to read.
- injection_hook (Callable[[], str]): A callable which returns a string to be injected into the written data.
def start_interception(self):
    """
    Read from the file descriptor and write the modified data after injection.

    Each new line is prefixed with `self.injection_hook()`, evaluated when the
    line's first bytes arrive.

    NOTE: This is blocking and is meant to be run in a thread.
    """
    os.set_blocking(self.read_pipe, False)
    os.set_blocking(self.signal_read_pipe, False)
    is_first_read = True
    ### Whether the next byte read begins a new line (and so needs a prefix).
    at_line_start = True
    while not self.stop_event.is_set():
        try:
            rlist, _, _ = select.select([self.read_pipe, self.signal_read_pipe], [], [], 0.1)
            if self.signal_read_pipe in rlist:
                break
            if not rlist:
                continue
            data = os.read(self.read_pipe, 1024)
            if not data:
                break
        except BlockingIOError:
            continue
        except OSError as e:
            if e.errno == errno.EINTR:
                continue  # Interrupted system call, just try again
            if e.errno != errno.EBADF:
                warn(f"OSError in FileDescriptorInterceptor: {e}")
            ### EBADF means the file descriptor is closed; stop either way.
            break

        try:
            injected_bytes = self.injection_hook().encode('utf-8')

            ### NOTE: `data[-1]` is an int for bytes, so use `endswith()`.
            ends_with_newline = data.endswith(b'\n')
            body = data[:-1] if ends_with_newline else data
            modified_data = body.replace(b'\n', b'\n' + injected_bytes)
            if ends_with_newline:
                modified_data += b'\n'

            ### Prefix the chunk only when it starts a fresh line, so no dangling
            ### prefix is left after a trailing newline.
            if at_line_start:
                modified_data = injected_bytes + modified_data

            ### As before, the very first chunk is preceded by a newline
            ### (presumably to separate it from earlier output).
            if is_first_read:
                modified_data = b'\n' + modified_data
                is_first_read = False

            at_line_start = ends_with_newline
            os.write(self.new_file_descriptor, modified_data)
        except (BrokenPipeError, OSError):
            break
        except Exception:
            with open(paths.DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
                f.write(traceback.format_exc())
            break
Read from the file descriptor and write the modified data after injection.
NOTE: This is blocking and is meant to be run in a thread.
def stop_interception(self):
    """
    Stop the interception loop and close the new file descriptors.

    Calling this more than once is safe: already-closed descriptors are ignored.
    """
    self.stop_event.set()

    ### Wake up the `select()` call in `start_interception()`.
    try:
        os.write(self.signal_write_pipe, b'\0')
    except OSError as e:
        if e.errno not in (FD_CLOSED, errno.EPIPE):
            warn(
                "Error while trying to signal the interception thread:\n"
                + f"{traceback.format_exc()}"
            )

    descriptors_to_close = (
        (
            self.new_file_descriptor,
            "Error while trying to close the duplicated file descriptor:\n",
        ),
        (
            self.write_pipe,
            "Error while trying to close the write-pipe "
            + "to the intercepted file descriptor:\n",
        ),
        (
            self.read_pipe,
            "Error while trying to close the read-pipe "
            + "to the intercepted file descriptor:\n",
        ),
        (
            self.signal_read_pipe,
            "Error while trying to close the signal-read-pipe "
            + "to the intercepted file descriptor:\n",
        ),
        (
            self.signal_write_pipe,
            "Error while trying to close the signal-write-pipe "
            + "to the intercepted file descriptor:\n",
        ),
    )
    for file_descriptor, error_message in descriptors_to_close:
        try:
            os.close(file_descriptor)
        except OSError as e:
            if e.errno != FD_CLOSED:
                warn(error_message + f"{traceback.format_exc()}")
Close the new file descriptors.