meerschaum.jobs
Higher-level utilities for managing meerschaum.utils.daemon.Daemon.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Higher-level utilities for managing `meerschaum.utils.daemon.Daemon`.
"""

import meerschaum as mrsm
from meerschaum.utils.typing import Dict, Optional, List, SuccessTuple

from meerschaum.jobs._Job import Job, StopMonitoringLogs
from meerschaum.jobs._Executor import Executor

__all__ = (
    'Job',
    'StopMonitoringLogs',
    'systemd',
    'get_jobs',
    'get_filtered_jobs',
    'get_restart_jobs',
    'get_running_jobs',
    'get_stopped_jobs',
    'get_paused_jobs',
    'get_executor_keys_from_context',
    'make_executor',
    'Executor',
    'check_restart_jobs',
    'start_check_jobs_thread',
    'stop_check_jobs_thread',
)

### Known executor types; `make_executor()` appends new ones.
executor_types: List[str] = ['api', 'local']


def get_jobs(
    executor_keys: Optional[str] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of the existing jobs.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, return remote jobs on the given API instance.
        Otherwise return local jobs.

    include_hidden: bool, default False
        If `True`, include jobs with the `hidden` attribute.

    combine_local_and_systemd: bool, default True
        If `True` and the context executor is `systemd`,
        merge local and systemd jobs into one dictionary.

    Returns
    -------
    A dictionary mapping job names to jobs.
    """
    from meerschaum.connectors.parse import parse_executor_keys
    executor_keys = executor_keys or get_executor_keys_from_context()

    include_local_and_systemd = (
        combine_local_and_systemd
        and str(executor_keys).split(':', maxsplit=1)[0] in ('None', 'local', 'systemd')
        and get_executor_keys_from_context() == 'systemd'
    )

    def _get_local_jobs():
        """Return local daemon-backed jobs, excluding externally managed ones."""
        from meerschaum.utils.daemon import get_daemons
        daemons = get_daemons()
        jobs = {
            daemon.daemon_id: Job(name=daemon.daemon_id, executor_keys='local')
            for daemon in daemons
        }
        return {
            name: job
            for name, job in jobs.items()
            if (include_hidden or not job.hidden) and not job._is_externally_managed
        }

    def _get_systemd_jobs():
        """Return jobs managed by the systemd executor."""
        from meerschaum.jobs.systemd import SystemdExecutor
        conn = SystemdExecutor('systemd')
        jobs = conn.get_jobs(debug=debug)
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    if include_local_and_systemd:
        local_jobs = _get_local_jobs()
        systemd_jobs = _get_systemd_jobs()
        shared_jobs = set(local_jobs) & set(systemd_jobs)
        if shared_jobs:
            from meerschaum.utils.misc import items_str
            from meerschaum.utils.warnings import warn
            warn(
                "Job"
                + ('s' if len(shared_jobs) != 1 else '')
                + f" {items_str(list(shared_jobs))} "
                + "exist"
                + ('s' if len(shared_jobs) == 1 else '')
                + " in both `local` and `systemd`.",
                stack=False,
            )
        ### On a name collision, the systemd job wins.
        return {**local_jobs, **systemd_jobs}

    if executor_keys == 'local':
        return _get_local_jobs()

    if executor_keys == 'systemd':
        return _get_systemd_jobs()

    try:
        _ = parse_executor_keys(executor_keys, construct=False)
        conn = parse_executor_keys(executor_keys)
        jobs = conn.get_jobs(debug=debug)
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }
    except Exception:
        ### Invalid or unreachable executor keys: report no jobs
        ### rather than raising at the listing layer.
        return {}


def get_filtered_jobs(
    executor_keys: Optional[str] = None,
    filter_list: Optional[List[str]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    warn: bool = False,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of jobs filtered by the user.

    Parameters
    ----------
    filter_list: Optional[List[str]], default None
        The job names to select.
        Names prefixed with an underscore act as exclusions
        when no non-underscored name matched.

    warn: bool, default False
        If `True`, warn about names in `filter_list` which do not exist.

    Returns
    -------
    A dictionary mapping job names to jobs.
    """
    from meerschaum.utils.warnings import warn as _warn
    jobs = get_jobs(
        executor_keys,
        include_hidden=True,
        combine_local_and_systemd=combine_local_and_systemd,
        debug=debug,
    )
    if not filter_list:
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    jobs_to_return = {}
    filter_list_with_underscores = [name for name in filter_list if name.startswith('_')]
    for name in filter_list:
        job = jobs.get(name, None)
        if job is None:
            if warn:
                _warn(
                    f"Job '{name}' does not exist.",
                    stack=False,
                )
            continue
        jobs_to_return[name] = job

    ### If nothing matched directly, treat underscore-prefixed names as exclusions.
    if not jobs_to_return and filter_list_with_underscores:
        names_to_exclude = [name.lstrip('_') for name in filter_list_with_underscores]
        return {
            name: job
            for name, job in jobs.items()
            if name not in names_to_exclude
        }

    return jobs_to_return


def _resolve_jobs(
    executor_keys: Optional[str],
    jobs: Optional[Dict[str, Job]],
    include_hidden: bool,
    combine_local_and_systemd: bool,
    debug: bool,
) -> Dict[str, Job]:
    """Return `jobs` if provided, else fetch the current jobs."""
    if jobs is not None:
        return jobs
    return get_jobs(
        executor_keys,
        include_hidden=include_hidden,
        combine_local_and_systemd=combine_local_and_systemd,
        debug=debug,
    )


def get_restart_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return jobs which were created with `--restart` or `--loop`.
    """
    jobs = _resolve_jobs(executor_keys, jobs, include_hidden, combine_local_and_systemd, debug)
    return {
        name: job
        for name, job in jobs.items()
        if job.restart
    }


def get_running_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of running jobs.
    """
    jobs = _resolve_jobs(executor_keys, jobs, include_hidden, combine_local_and_systemd, debug)
    return {
        name: job
        for name, job in jobs.items()
        if job.status == 'running'
    }


def get_paused_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of paused jobs.
    """
    jobs = _resolve_jobs(executor_keys, jobs, include_hidden, combine_local_and_systemd, debug)
    return {
        name: job
        for name, job in jobs.items()
        if job.status == 'paused'
    }


def get_stopped_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of stopped jobs.
    """
    jobs = _resolve_jobs(executor_keys, jobs, include_hidden, combine_local_and_systemd, debug)
    return {
        name: job
        for name, job in jobs.items()
        if job.status == 'stopped'
    }


def make_executor(cls):
    """
    Register a class as an `Executor`.

    The executor type is the class name, lowercased,
    with any trailing 'executor' removed (e.g. `SystemdExecutor` -> 'systemd').
    """
    import re
    from meerschaum.connectors import make_connector
    suffix_regex = r'executor$'
    typ = re.sub(suffix_regex, '', cls.__name__.lower())
    if typ not in executor_types:
        executor_types.append(typ)
    return make_connector(cls, _is_executor=True)


def check_restart_jobs(
    executor_keys: Optional[str] = 'local',
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = True,
    silent: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Restart any stopped jobs which were created with `--restart`.

    Parameters
    ----------
    executor_keys: Optional[str], default 'local'
        If provided, check jobs on the given remote API instance.
        Otherwise check local jobs.

    include_hidden: bool, default True
        If `True`, include hidden jobs in the check.

    silent: bool, default False
        If `True`, do not print the restart success message.

    Returns
    -------
    A `SuccessTuple` which succeeds only if every restart check succeeded.
    """
    from meerschaum.utils.misc import items_str

    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=False,
            debug=debug,
        )

    if not jobs:
        return True, "No jobs to restart."

    results = {}
    for name, job in jobs.items():
        check_success, check_msg = job.check_restart()
        results[job.name] = (check_success, check_msg)
        if not silent:
            mrsm.pprint((check_success, check_msg))

    success_names = [name for name, (check_success, check_msg) in results.items() if check_success]
    fail_names = [name for name, (check_success, check_msg) in results.items() if not check_success]
    success = len(success_names) == len(jobs)
    ### NOTE: pluralize the failure message by the number of *failed* jobs.
    msg = (
        (
            "Successfully restarted job"
            + ('s' if len(success_names) != 1 else '')
            + ' ' + items_str(success_names) + '.'
        )
        if success
        else (
            "Failed to restart job"
            + ('s' if len(fail_names) != 1 else '')
            + ' ' + items_str(fail_names) + '.'
        )
    )
    return success, msg


def _check_restart_jobs_against_lock(*args, **kwargs):
    """
    Run `check_restart_jobs()` under an inter-process lock
    so concurrent monitors do not double-restart jobs.
    """
    import meerschaum.config.paths as paths
    fasteners = mrsm.attempt_import('fasteners', lazy=False)
    lock = fasteners.InterProcessLock(paths.CHECK_JOBS_LOCK_PATH)
    with lock:
        check_restart_jobs(*args, **kwargs)


_check_loop_stop_thread = None
def start_check_jobs_thread():
    """
    Start a daemon thread to regularly monitor jobs and restart any
    which stopped but were created with `--restart`.
    """
    import atexit
    from functools import partial
    from meerschaum.utils.threading import RepeatTimer
    from meerschaum._internal.static import STATIC_CONFIG

    global _check_loop_stop_thread
    sleep_seconds = STATIC_CONFIG['jobs']['check_restart_seconds']

    _check_loop_stop_thread = RepeatTimer(
        sleep_seconds,
        partial(
            _check_restart_jobs_against_lock,
            silent=True,
        )
    )
    _check_loop_stop_thread.daemon = True
    atexit.register(stop_check_jobs_thread)

    _check_loop_stop_thread.start()
    return _check_loop_stop_thread


def stop_check_jobs_thread():
    """
    Stop the job monitoring thread and remove its lock file.
    """
    import meerschaum.config.paths as paths
    from meerschaum.utils.warnings import warn
    if _check_loop_stop_thread is None:
        return

    _check_loop_stop_thread.cancel()

    try:
        if paths.CHECK_JOBS_LOCK_PATH.exists():
            paths.CHECK_JOBS_LOCK_PATH.unlink()
    except Exception as e:
        warn(f"Failed to remove check jobs lock file:\n{e}")


_context_keys = None
def get_executor_keys_from_context() -> str:
    """
    If we are running on the host with the default root, default to `'systemd'`.
    Otherwise return `'local'`.
    """
    global _context_keys

    if _context_keys is not None:
        return _context_keys

    import meerschaum.config.paths as paths
    from meerschaum.config import get_config
    from meerschaum.utils.misc import is_systemd_available
    from meerschaum.utils.packages import is_installed

    ### NOTE: an explicitly configured executor is returned without caching,
    ### so configuration changes take effect on the next call.
    configured_executor = get_config('meerschaum', 'executor', warn=False)
    if configured_executor is not None:
        return configured_executor

    _context_keys = (
        'systemd'
        if (
            is_systemd_available()
            and paths.ROOT_DIR_PATH == paths.DEFAULT_ROOT_DIR_PATH
            and is_installed('meerschaum', venv=None, allow_outside_venv=False)
        )
        else 'local'
    )
    return _context_keys


def _install_healthcheck_job() -> SuccessTuple:
    """
    Install the systemd job which checks local jobs.
    """
    from meerschaum.config import get_config

    enable_healthcheck = get_config('system', 'experimental', 'systemd_healthcheck')
    if not enable_healthcheck:
        return False, "Local healthcheck is disabled."

    if get_executor_keys_from_context() != 'systemd':
        return False, "Not running systemd."

    job = Job(
        '.local-healthcheck',
        ['restart', 'jobs', '-e', 'local', '--loop', '--min-seconds', '60'],
        executor_keys='systemd',
    )
    return job.start()
class Job:
    """
    Manage a `meerschaum.utils.daemon.Daemon`, locally or remotely via the API.
    """

    def __init__(
        self,
        name: str,
        sysargs: Union[List[str], str, None] = None,
        env: Optional[Dict[str, str]] = None,
        executor_keys: Optional[str] = None,
        delete_after_completion: bool = False,
        refresh_seconds: Union[int, float, None] = None,
        _properties: Optional[Dict[str, Any]] = None,
        _rotating_log=None,
        _stdin_file=None,
        _status_hook: Optional[Callable[[], str]] = None,
        _result_hook: Optional[Callable[[], SuccessTuple]] = None,
        _externally_managed: bool = False,
    ):
        """
        Create a new job to manage a `meerschaum.utils.daemon.Daemon`.

        Parameters
        ----------
        name: str
            The name of the job to be created.
            This will also be used as the Daemon ID.

        sysargs: Union[List[str], str, None], default None
            The sysargs of the command to be executed, e.g. 'start api'.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the job's process.

        executor_keys: Optional[str], default None
            If provided, execute the job remotely on an API instance, e.g. 'api:main'.

        delete_after_completion: bool, default False
            If `True`, delete this job when it has finished executing.

        refresh_seconds: Union[int, float, None], default None
            The number of seconds to sleep between refreshes.
            Defaults to the configured value `system.cli.refresh_seconds`.

        _properties: Optional[Dict[str, Any]], default None
            If provided, use this to patch the daemon's properties.
        """
        from meerschaum.utils.daemon import Daemon
        ### The name doubles as the daemon ID, so reject unsafe characters.
        for char in BANNED_CHARS:
            if char in name:
                raise ValueError(f"Invalid name: ({char}) is not allowed.")

        if isinstance(sysargs, str):
            sysargs = shlex.split(sysargs)

        ### Un-escape any escaped '+' separators back into real ones.
        and_key = STATIC_CONFIG['system']['arguments']['and_key']
        escaped_and_key = STATIC_CONFIG['system']['arguments']['escaped_and_key']
        if sysargs:
            sysargs = [
                (arg if arg != escaped_and_key else and_key)
                for arg in sysargs
            ]

        ### NOTE: 'local' and 'systemd' executors are being coalesced.
        if executor_keys is None:
            from meerschaum.jobs import get_executor_keys_from_context
            executor_keys = get_executor_keys_from_context()

        self.executor_keys = executor_keys
        self.name = name
        self.refresh_seconds = (
            refresh_seconds
            if refresh_seconds is not None
            else mrsm.get_config('system', 'cli', 'refresh_seconds')
        )
        ### Only local jobs have an in-process Daemon; remote jobs keep None.
        try:
            self._daemon = (
                Daemon(daemon_id=name)
                if executor_keys == 'local'
                else None
            )
        except Exception:
            self._daemon = None

        ### Handle any injected dependencies.
        if _rotating_log is not None:
            self._rotating_log = _rotating_log
            if self._daemon is not None:
                self._daemon._rotating_log = _rotating_log

        if _stdin_file is not None:
            self._stdin_file = _stdin_file
            if self._daemon is not None:
                self._daemon._stdin_file = _stdin_file
                self._daemon._blocking_stdin_file_path = _stdin_file.blocking_file_path

        if _status_hook is not None:
            self._status_hook = _status_hook

        if _result_hook is not None:
            self._result_hook = _result_hook

        ### Accumulate daemon property overrides in a patch dict,
        ### applied later by `get_daemon_properties()`.
        self._externally_managed = _externally_managed
        self._properties_patch = _properties or {}
        if _externally_managed:
            self._properties_patch.update({'externally_managed': _externally_managed})

        if env:
            self._properties_patch.update({'env': env})

        if delete_after_completion:
            self._properties_patch.update({'delete_after_completion': delete_after_completion})

        ### Prefer the sysargs already recorded on an existing daemon.
        daemon_sysargs = (
            self._daemon.properties.get('target', {}).get('args', [None])[0]
            if self._daemon is not None
            else None
        )

        if daemon_sysargs and sysargs and daemon_sysargs != sysargs:
            warn("Given sysargs differ from existing sysargs.")

        ### Strip daemonization flags; the Job itself manages daemonization.
        self._sysargs = [
            arg
            for arg in (daemon_sysargs or sysargs or [])
            if arg not in ('-d', '--daemon')
        ]
        for restart_flag in RESTART_FLAGS:
            if restart_flag in self._sysargs:
                self._properties_patch.update({'restart': True})
                break

    @staticmethod
    def from_pid(pid: int, executor_keys: Optional[str] = None) -> Job:
        """
        Build a `Job` from the PID of a running Meerschaum process.

        Parameters
        ----------
        pid: int
            The PID of the process.

        executor_keys: Optional[str], default None
            The executor keys to assign to the job.
215 """ 216 psutil = mrsm.attempt_import('psutil') 217 try: 218 process = psutil.Process(pid) 219 except psutil.NoSuchProcess as e: 220 warn(f"Process with PID {pid} does not exist.", stack=False) 221 raise e 222 223 command_args = process.cmdline() 224 is_daemon = command_args[1] == '-c' 225 226 if is_daemon: 227 daemon_id = command_args[-1].split('daemon_id=')[-1].split(')')[0].replace("'", '') 228 root_dir = process.environ().get(STATIC_CONFIG['environment']['root'], None) 229 if root_dir is None: 230 root_dir = paths.ROOT_DIR_PATH 231 else: 232 root_dir = pathlib.Path(root_dir) 233 jobs_dir = root_dir / paths.DAEMON_RESOURCES_PATH.name 234 daemon_dir = jobs_dir / daemon_id 235 pid_file = daemon_dir / 'process.pid' 236 237 if pid_file.exists(): 238 with open(pid_file, 'r', encoding='utf-8') as f: 239 daemon_pid = int(f.read()) 240 241 if pid != daemon_pid: 242 raise EnvironmentError(f"Differing PIDs: {pid=}, {daemon_pid=}") 243 else: 244 raise EnvironmentError(f"Is job '{daemon_id}' running?") 245 246 return Job(daemon_id, executor_keys=executor_keys) 247 248 from meerschaum._internal.arguments._parse_arguments import parse_arguments 249 from meerschaum.utils.daemon import get_new_daemon_name 250 251 mrsm_ix = 0 252 for i, arg in enumerate(command_args): 253 if 'mrsm' in arg or 'meerschaum' in arg.lower(): 254 mrsm_ix = i 255 break 256 257 sysargs = command_args[mrsm_ix+1:] 258 kwargs = parse_arguments(sysargs) 259 name = kwargs.get('name', get_new_daemon_name()) 260 return Job(name, sysargs, executor_keys=executor_keys) 261 262 def start(self, debug: bool = False) -> SuccessTuple: 263 """ 264 Start the job's daemon. 265 """ 266 if self.executor is not None: 267 if not self.exists(debug=debug): 268 return self.executor.create_job( 269 self.name, 270 self.sysargs, 271 properties=self.daemon.properties, 272 debug=debug, 273 ) 274 return self.executor.start_job(self.name, debug=debug) 275 276 if self.is_running(): 277 return True, f"{self} is already running." 
278 279 success, msg = self.daemon.run( 280 keep_daemon_output=(not self.delete_after_completion), 281 allow_dirty_run=True, 282 ) 283 if not success: 284 return success, msg 285 286 return success, f"Started {self}." 287 288 def stop( 289 self, 290 timeout_seconds: Union[int, float, None] = None, 291 debug: bool = False, 292 ) -> SuccessTuple: 293 """ 294 Stop the job's daemon. 295 """ 296 if self.executor is not None: 297 return self.executor.stop_job(self.name, debug=debug) 298 299 if self.daemon.status == 'stopped': 300 if not self.restart: 301 return True, f"{self} is not running." 302 elif self.stop_time is not None: 303 return True, f"{self} will not restart until manually started." 304 305 quit_success, quit_msg = self.daemon.quit(timeout=timeout_seconds) 306 if quit_success: 307 return quit_success, f"Stopped {self}." 308 309 warn( 310 f"Failed to gracefully quit {self}.", 311 stack=False, 312 ) 313 kill_success, kill_msg = self.daemon.kill(timeout=timeout_seconds) 314 if not kill_success: 315 return kill_success, kill_msg 316 317 return kill_success, f"Killed {self}." 318 319 def pause( 320 self, 321 timeout_seconds: Union[int, float, None] = None, 322 debug: bool = False, 323 ) -> SuccessTuple: 324 """ 325 Pause the job's daemon. 326 """ 327 if self.executor is not None: 328 return self.executor.pause_job(self.name, debug=debug) 329 330 pause_success, pause_msg = self.daemon.pause(timeout=timeout_seconds) 331 if not pause_success: 332 return pause_success, pause_msg 333 334 return pause_success, f"Paused {self}." 335 336 def delete(self, debug: bool = False) -> SuccessTuple: 337 """ 338 Delete the job and its daemon. 
339 """ 340 if self.executor is not None: 341 return self.executor.delete_job(self.name, debug=debug) 342 343 if self.is_running(): 344 stop_success, stop_msg = self.stop() 345 if not stop_success: 346 return stop_success, stop_msg 347 348 cleanup_success, cleanup_msg = self.daemon.cleanup() 349 if not cleanup_success: 350 return cleanup_success, cleanup_msg 351 352 _ = self.daemon._properties.pop('result', None) 353 return cleanup_success, f"Deleted {self}." 354 355 def is_running(self) -> bool: 356 """ 357 Determine whether the job's daemon is running. 358 """ 359 return self.status == 'running' 360 361 def exists(self, debug: bool = False) -> bool: 362 """ 363 Determine whether the job exists. 364 """ 365 if self.executor is not None: 366 return self.executor.get_job_exists(self.name, debug=debug) 367 368 return self.daemon.path.exists() 369 370 def get_logs(self) -> Union[str, None]: 371 """ 372 Return the output text of the job's daemon. 373 """ 374 if self.executor is not None: 375 return self.executor.get_logs(self.name) 376 377 return self.daemon.log_text 378 379 def monitor_logs( 380 self, 381 callback_function: Callable[[str], None] = _default_stdout_callback, 382 input_callback_function: Optional[Callable[[], str]] = None, 383 stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None, 384 stop_event: Optional[asyncio.Event] = None, 385 stop_on_exit: bool = False, 386 strip_timestamps: bool = False, 387 accept_input: bool = True, 388 debug: bool = False, 389 _logs_path: Optional[pathlib.Path] = None, 390 _log=None, 391 _stdin_file=None, 392 _wait_if_stopped: bool = True, 393 ): 394 """ 395 Monitor the job's log files and execute a callback on new lines. 396 397 Parameters 398 ---------- 399 callback_function: Callable[[str], None], default partial(print, end='') 400 The callback to execute as new data comes in. 401 Defaults to printing the output directly to `stdout`. 
402 403 input_callback_function: Optional[Callable[[], str]], default None 404 If provided, execute this callback when the daemon is blocking on stdin. 405 Defaults to `sys.stdin.readline()`. 406 407 stop_callback_function: Optional[Callable[[SuccessTuple]], str], default None 408 If provided, execute this callback when the daemon stops. 409 The job's SuccessTuple will be passed to the callback. 410 411 stop_event: Optional[asyncio.Event], default None 412 If provided, stop monitoring when this event is set. 413 You may instead raise `meerschaum.jobs.StopMonitoringLogs` 414 from within `callback_function` to stop monitoring. 415 416 stop_on_exit: bool, default False 417 If `True`, stop monitoring when the job stops. 418 419 strip_timestamps: bool, default False 420 If `True`, remove leading timestamps from lines. 421 422 accept_input: bool, default True 423 If `True`, accept input when the daemon blocks on stdin. 424 """ 425 if self.executor is not None: 426 self.executor.monitor_logs( 427 self.name, 428 callback_function, 429 input_callback_function=input_callback_function, 430 stop_callback_function=stop_callback_function, 431 stop_on_exit=stop_on_exit, 432 accept_input=accept_input, 433 strip_timestamps=strip_timestamps, 434 debug=debug, 435 ) 436 return 437 438 monitor_logs_coroutine = self.monitor_logs_async( 439 callback_function=callback_function, 440 input_callback_function=input_callback_function, 441 stop_callback_function=stop_callback_function, 442 stop_event=stop_event, 443 stop_on_exit=stop_on_exit, 444 strip_timestamps=strip_timestamps, 445 accept_input=accept_input, 446 debug=debug, 447 _logs_path=_logs_path, 448 _log=_log, 449 _stdin_file=_stdin_file, 450 _wait_if_stopped=_wait_if_stopped, 451 ) 452 return asyncio.run(monitor_logs_coroutine) 453 454 async def monitor_logs_async( 455 self, 456 callback_function: Callable[[str], None] = _default_stdout_callback, 457 input_callback_function: Optional[Callable[[], str]] = None, 458 
        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
        stop_event: Optional[asyncio.Event] = None,
        stop_on_exit: bool = False,
        strip_timestamps: bool = False,
        accept_input: bool = True,
        debug: bool = False,
        _logs_path: Optional[pathlib.Path] = None,
        _log=None,
        _stdin_file=None,
        _wait_if_stopped: bool = True,
    ):
        """
        Monitor the job's log files and await a callback on new lines.

        Parameters
        ----------
        callback_function: Callable[[str], None], default _default_stdout_callback
            The callback to execute as new data comes in.
            Defaults to printing the output directly to `stdout`.

        input_callback_function: Optional[Callable[[], str]], default None
            If provided, execute this callback when the daemon is blocking on stdin.
            Defaults to `sys.stdin.readline()`.

        stop_callback_function: Optional[Callable[[SuccessTuple], None]], default None
            If provided, execute this callback when the daemon stops.
            The job's SuccessTuple will be passed to the callback.

        stop_event: Optional[asyncio.Event], default None
            If provided, stop monitoring when this event is set.
            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
            from within `callback_function` to stop monitoring.

        stop_on_exit: bool, default False
            If `True`, stop monitoring when the job stops.

        strip_timestamps: bool, default False
            If `True`, remove leading timestamps from lines.

        accept_input: bool, default True
            If `True`, accept input when the daemon blocks on stdin.
        """
        from meerschaum.utils.prompt import prompt

        def default_input_callback_function():
            ### Prefer the daemon's recorded prompt kwargs; fall back to raw stdin.
            prompt_kwargs = self.get_prompt_kwargs(debug=debug)
            if prompt_kwargs:
                answer = prompt(**prompt_kwargs)
                return answer + '\n'
            return sys.stdin.readline()

        if input_callback_function is None:
            input_callback_function = default_input_callback_function

        if self.executor is not None:
            ### Remote jobs delegate monitoring to the executor.
            await self.executor.monitor_logs_async(
                self.name,
                callback_function,
                input_callback_function=input_callback_function,
                stop_callback_function=stop_callback_function,
                stop_on_exit=stop_on_exit,
                strip_timestamps=strip_timestamps,
                accept_input=accept_input,
                debug=debug,
            )
            return

        from meerschaum.utils.formatting._jobs import strip_timestamp_from_line

        ### Any one of these events (if set) ends the watch loop via `combined_event`.
        events = {
            'user': stop_event,
            'stopped': asyncio.Event(),
            'stop_token': asyncio.Event(),
            'stop_exception': asyncio.Event(),
            'stopped_timeout': asyncio.Event(),
        }
        combined_event = asyncio.Event()
        emitted_text = False
        stdin_file = _stdin_file if _stdin_file is not None else self.daemon.stdin_file

        async def check_job_status():
            ### Poll for the job stopping (only when stop_on_exit is requested).
            if not stop_on_exit:
                return

            nonlocal emitted_text

            ### Back off slightly (x1.1 per tick) while waiting for first output,
            ### giving a just-stopped job a chance to emit its final lines.
            sleep_time = 0.1
            while sleep_time < 0.2:
                if self.status == 'stopped':
                    if not emitted_text and _wait_if_stopped:
                        await asyncio.sleep(sleep_time)
                        sleep_time = round(sleep_time * 1.1, 3)
                        continue

                    if stop_callback_function is not None:
                        try:
                            if asyncio.iscoroutinefunction(stop_callback_function):
                                await stop_callback_function(self.result)
                            else:
                                stop_callback_function(self.result)
                        except asyncio.exceptions.CancelledError:
                            break
                        except Exception:
                            warn(traceback.format_exc())

                    if stop_on_exit:
                        events['stopped'].set()

                    break
                await asyncio.sleep(0.1)

            events['stopped_timeout'].set()

        async def check_blocking_on_input():
            ### Forward user input to the daemon whenever it blocks on stdin.
            while True:
                if not emitted_text or not self.is_blocking_on_stdin():
                    try:
                        await asyncio.sleep(self.refresh_seconds)
                    except asyncio.exceptions.CancelledError:
                        break
                    continue

                if not self.is_running():
                    break

                ### Flush pending output before prompting the user.
                await emit_latest_lines()

                try:
                    print('', end='', flush=True)
                    if asyncio.iscoroutinefunction(input_callback_function):
                        data = await input_callback_function()
                    else:
                        ### Blocking reads run in the default thread pool.
                        loop = asyncio.get_running_loop()
                        data = await loop.run_in_executor(None, input_callback_function)
                except KeyboardInterrupt:
                    break

                stdin_file.write(data)
                await asyncio.sleep(self.refresh_seconds)

        async def combine_events():
            ### Collapse all stop conditions into `combined_event`.
            event_tasks = [
                asyncio.create_task(event.wait())
                for event in events.values()
                if event is not None
            ]
            if not event_tasks:
                return

            try:
                done, pending = await asyncio.wait(
                    event_tasks,
                    return_when=asyncio.FIRST_COMPLETED,
                )
                for task in pending:
                    task.cancel()
            except asyncio.exceptions.CancelledError:
                pass
            finally:
                combined_event.set()

        check_job_status_task = asyncio.create_task(check_job_status())
        check_blocking_on_input_task = asyncio.create_task(check_blocking_on_input())
        combine_events_task = asyncio.create_task(combine_events())

        log = _log if _log is not None else self.daemon.rotating_log
        lines_to_show = (
            self.daemon.properties.get(
                'logs', {}
            ).get(
                'lines_to_show', get_config('jobs', 'logs', 'lines_to_show')
            )
        )

        async def emit_latest_lines():
            ### Read the tail of the log and feed each line to the callback,
            ### honoring the control tokens (STOP / CLEAR / FLUSH).
            nonlocal emitted_text
            nonlocal stop_event
            lines = log.readlines()
            for line in lines[(-1 * lines_to_show):]:
                if stop_event is not None and stop_event.is_set():
                    return

                line_stripped_extra = strip_timestamp_from_line(line.strip())
                line_stripped = strip_timestamp_from_line(line)

                if line_stripped_extra == STOP_TOKEN:
                    events['stop_token'].set()
                    return

                if line_stripped_extra == CLEAR_TOKEN:
                    clear_screen(debug=debug)
                    continue

                if line_stripped_extra == FLUSH_TOKEN.strip():
                    line_stripped = ''
                    line = ''

                if strip_timestamps:
                    line = line_stripped

                try:
                    if asyncio.iscoroutinefunction(callback_function):
                        await callback_function(line)
                    else:
                        callback_function(line)
                    emitted_text = True
                except StopMonitoringLogs:
                    events['stop_exception'].set()
                    return
                except Exception:
                    warn(f"Error in logs callback:\n{traceback.format_exc()}")

        await emit_latest_lines()

        tasks = (
            [check_job_status_task]
            + ([check_blocking_on_input_task] if accept_input else [])
            + [combine_events_task]
        )
        try:
            ### NOTE(review): this gather is not awaited — the tasks run in the
            ### background while the watch loop below blocks; confirm intentional.
            _ = asyncio.gather(*tasks, return_exceptions=True)
        except asyncio.exceptions.CancelledError:
            raise
        except Exception:
            warn(f"Failed to run async checks:\n{traceback.format_exc()}")

        ### Watch the logs directory; emit new lines only for the active subfile.
        watchfiles = mrsm.attempt_import('watchfiles', lazy=False)
        dir_path_to_monitor = (
            _logs_path
            or (log.file_path.parent if log else None)
            or paths.LOGS_RESOURCES_PATH
        )
        async for changes in watchfiles.awatch(
            dir_path_to_monitor,
            stop_event=combined_event,
        ):
            for change in changes:
                file_path_str = change[1]
                file_path = pathlib.Path(file_path_str)
                latest_subfile_path = log.get_latest_subfile_path()
                if latest_subfile_path != file_path:
                    continue

                await emit_latest_lines()

        ### Final flush after the watch loop ends.
        await emit_latest_lines()

    def is_blocking_on_stdin(self, debug: bool = False) -> bool:
        """
        Return whether a job's daemon is blocking on stdin.
710 """ 711 if self.executor is not None: 712 return self.executor.get_job_is_blocking_on_stdin(self.name, debug=debug) 713 714 return self.is_running() and self.daemon.blocking_stdin_file_path.exists() 715 716 def get_prompt_kwargs(self, debug: bool = False) -> Dict[str, Any]: 717 """ 718 Return the kwargs to the blocking `prompt()`, if available. 719 """ 720 if self.executor is not None: 721 return self.executor.get_job_prompt_kwargs(self.name, debug=debug) 722 723 if not self.daemon.prompt_kwargs_file_path.exists(): 724 return {} 725 726 try: 727 with open(self.daemon.prompt_kwargs_file_path, 'r', encoding='utf-8') as f: 728 prompt_kwargs = json.load(f) 729 730 return prompt_kwargs 731 732 except Exception: 733 import traceback 734 traceback.print_exc() 735 return {} 736 737 def write_stdin(self, data): 738 """ 739 Write to a job's daemon's `stdin`. 740 """ 741 self.daemon.stdin_file.write(data) 742 743 @property 744 def executor(self) -> Union[Executor, None]: 745 """ 746 If the job is remote, return the connector to the remote API instance. 747 """ 748 return ( 749 mrsm.get_connector(self.executor_keys) 750 if self.executor_keys != 'local' 751 else None 752 ) 753 754 @property 755 def status(self) -> str: 756 """ 757 Return the running status of the job's daemon. 758 """ 759 if '_status_hook' in self.__dict__: 760 return self._status_hook() 761 762 if self.executor is not None: 763 return self.executor.get_job_status(self.name) 764 765 return self.daemon.status 766 767 @property 768 def pid(self) -> Union[int, None]: 769 """ 770 Return the PID of the job's dameon. 771 """ 772 if self.executor is not None: 773 return self.executor.get_job_metadata(self.name).get('daemon', {}).get('pid', None) 774 775 return self.daemon.pid 776 777 @property 778 def restart(self) -> bool: 779 """ 780 Return whether to restart a stopped job. 
781 """ 782 if self.executor is not None: 783 return self.executor.get_job_metadata(self.name).get('restart', False) 784 785 return self.daemon.properties.get('restart', False) 786 787 @property 788 def result(self) -> SuccessTuple: 789 """ 790 Return the `SuccessTuple` when the job has terminated. 791 """ 792 if self.is_running(): 793 return True, f"{self} is running." 794 795 if '_result_hook' in self.__dict__: 796 return self._result_hook() 797 798 if self.executor is not None: 799 return ( 800 self.executor.get_job_metadata(self.name) 801 .get('result', (False, "No result available.")) 802 ) 803 804 _result = self.daemon.properties.get('result', None) 805 if _result is None: 806 from meerschaum.utils.daemon.Daemon import _results 807 return _results.get(self.daemon.daemon_id, (False, "No result available.")) 808 809 return tuple(_result) 810 811 @property 812 def sysargs(self) -> List[str]: 813 """ 814 Return the sysargs to use for the Daemon. 815 """ 816 if self._sysargs: 817 return self._sysargs 818 819 if self.executor is not None: 820 return self.executor.get_job_metadata(self.name).get('sysargs', []) 821 822 target_args = self.daemon.target_args 823 if target_args is None: 824 return [] 825 self._sysargs = target_args[0] if len(target_args) > 0 else [] 826 return self._sysargs 827 828 def get_daemon_properties(self) -> Dict[str, Any]: 829 """ 830 Return the `properties` dictionary for the job's daemon. 831 """ 832 remote_properties = ( 833 {} 834 if self.executor is None 835 else self.executor.get_job_properties(self.name) 836 ) 837 return { 838 **remote_properties, 839 **self._properties_patch 840 } 841 842 @property 843 def daemon(self) -> 'Daemon': 844 """ 845 Return the daemon which this job manages. 
846 """ 847 from meerschaum.utils.daemon import Daemon 848 if self._daemon is not None and self.executor is None and self._sysargs: 849 return self._daemon 850 851 self._daemon = Daemon( 852 target=entry, 853 target_args=[self._sysargs], 854 target_kw={}, 855 daemon_id=self.name, 856 label=shlex.join(self._sysargs), 857 properties=self.get_daemon_properties(), 858 ) 859 if '_rotating_log' in self.__dict__: 860 self._daemon._rotating_log = self._rotating_log 861 862 if '_stdin_file' in self.__dict__: 863 self._daemon._stdin_file = self._stdin_file 864 self._daemon._blocking_stdin_file_path = self._stdin_file.blocking_file_path 865 866 return self._daemon 867 868 @property 869 def began(self) -> Union[datetime, None]: 870 """ 871 The datetime when the job began running. 872 """ 873 if self.executor is not None: 874 began_str = self.executor.get_job_began(self.name) 875 if began_str is None: 876 return None 877 return ( 878 datetime.fromisoformat(began_str) 879 .astimezone(timezone.utc) 880 .replace(tzinfo=None) 881 ) 882 883 began_str = self.daemon.properties.get('process', {}).get('began', None) 884 if began_str is None: 885 return None 886 887 return datetime.fromisoformat(began_str) 888 889 @property 890 def ended(self) -> Union[datetime, None]: 891 """ 892 The datetime when the job stopped running. 893 """ 894 if self.executor is not None: 895 ended_str = self.executor.get_job_ended(self.name) 896 if ended_str is None: 897 return None 898 return ( 899 datetime.fromisoformat(ended_str) 900 .astimezone(timezone.utc) 901 .replace(tzinfo=None) 902 ) 903 904 ended_str = self.daemon.properties.get('process', {}).get('ended', None) 905 if ended_str is None: 906 return None 907 908 return datetime.fromisoformat(ended_str) 909 910 @property 911 def paused(self) -> Union[datetime, None]: 912 """ 913 The datetime when the job was suspended while running. 
914 """ 915 if self.executor is not None: 916 paused_str = self.executor.get_job_paused(self.name) 917 if paused_str is None: 918 return None 919 return ( 920 datetime.fromisoformat(paused_str) 921 .astimezone(timezone.utc) 922 .replace(tzinfo=None) 923 ) 924 925 paused_str = self.daemon.properties.get('process', {}).get('paused', None) 926 if paused_str is None: 927 return None 928 929 return datetime.fromisoformat(paused_str) 930 931 @property 932 def stop_time(self) -> Union[datetime, None]: 933 """ 934 Return the timestamp when the job was manually stopped. 935 """ 936 if self.executor is not None: 937 return self.executor.get_job_stop_time(self.name) 938 939 if not self.daemon.stop_path.exists(): 940 return None 941 942 stop_data = self.daemon._read_stop_file() 943 if not stop_data: 944 return None 945 946 stop_time_str = stop_data.get('stop_time', None) 947 if not stop_time_str: 948 warn(f"Could not read stop time for {self}.") 949 return None 950 951 return datetime.fromisoformat(stop_time_str) 952 953 @property 954 def hidden(self) -> bool: 955 """ 956 Return a bool indicating whether this job should be displayed. 957 """ 958 return ( 959 self.name.startswith('_') 960 or self.name.startswith('.') 961 or self._is_externally_managed 962 ) 963 964 def check_restart(self) -> SuccessTuple: 965 """ 966 If `restart` is `True` and the daemon is not running, 967 restart the job. 968 Do not restart if the job was manually stopped. 969 """ 970 if self.is_running(): 971 return True, f"{self} is running." 972 973 if not self.restart: 974 return True, f"{self} does not need to be restarted." 975 976 if self.stop_time is not None: 977 return True, f"{self} was manually stopped." 978 979 return self.start() 980 981 @property 982 def label(self) -> str: 983 """ 984 Return the job's Daemon label (joined sysargs). 
985 """ 986 from meerschaum._internal.arguments import compress_pipeline_sysargs 987 sysargs = compress_pipeline_sysargs(self.sysargs) 988 return shlex.join(sysargs).replace(' + ', '\n+ ').replace(' : ', '\n: ').lstrip().rstrip() 989 990 @property 991 def _externally_managed_file(self) -> pathlib.Path: 992 """ 993 Return the path to the externally managed file. 994 """ 995 return self.daemon.path / '.externally-managed' 996 997 def _set_externally_managed(self): 998 """ 999 Set this job as externally managed. 1000 """ 1001 self._externally_managed = True 1002 try: 1003 self._externally_managed_file.parent.mkdir(exist_ok=True, parents=True) 1004 self._externally_managed_file.touch() 1005 except Exception as e: 1006 warn(e) 1007 1008 @property 1009 def _is_externally_managed(self) -> bool: 1010 """ 1011 Return whether this job is externally managed. 1012 """ 1013 return self.executor_keys in (None, 'local') and ( 1014 self._externally_managed or self._externally_managed_file.exists() 1015 ) 1016 1017 @property 1018 def env(self) -> Dict[str, str]: 1019 """ 1020 Return the environment variables to set for the job's process. 1021 """ 1022 if '_env' in self.__dict__: 1023 return self.__dict__['_env'] 1024 1025 _env = self.daemon.properties.get('env', {}) 1026 default_env = { 1027 'PYTHONUNBUFFERED': '1', 1028 'LINES': str(get_config('jobs', 'terminal', 'lines')), 1029 'COLUMNS': str(get_config('jobs', 'terminal', 'columns')), 1030 STATIC_CONFIG['environment']['noninteractive']: 'true', 1031 } 1032 self._env = {**default_env, **_env} 1033 return self._env 1034 1035 @property 1036 def delete_after_completion(self) -> bool: 1037 """ 1038 Return whether this job is configured to delete itself after completion. 
1039 """ 1040 if '_delete_after_completion' in self.__dict__: 1041 return self.__dict__.get('_delete_after_completion', False) 1042 1043 self._delete_after_completion = self.daemon.properties.get('delete_after_completion', False) 1044 return self._delete_after_completion 1045 1046 def __str__(self) -> str: 1047 sysargs = self.sysargs 1048 sysargs_str = shlex.join(sysargs) if sysargs else '' 1049 job_str = f'Job("{self.name}"' 1050 if sysargs_str: 1051 job_str += f', "{sysargs_str}"' 1052 1053 job_str += ')' 1054 return job_str 1055 1056 def __repr__(self) -> str: 1057 return str(self) 1058 1059 def __hash__(self) -> int: 1060 return hash(self.name)
Manage a meerschaum.utils.daemon.Daemon, locally or remotely via the API.
    def __init__(
        self,
        name: str,
        sysargs: Union[List[str], str, None] = None,
        env: Optional[Dict[str, str]] = None,
        executor_keys: Optional[str] = None,
        delete_after_completion: bool = False,
        refresh_seconds: Union[int, float, None] = None,
        _properties: Optional[Dict[str, Any]] = None,
        _rotating_log=None,
        _stdin_file=None,
        _status_hook: Optional[Callable[[], str]] = None,
        _result_hook: Optional[Callable[[], SuccessTuple]] = None,
        _externally_managed: bool = False,
    ):
        """
        Create a new job to manage a `meerschaum.utils.daemon.Daemon`.

        Parameters
        ----------
        name: str
            The name of the job to be created.
            This will also be used as the Daemon ID.

        sysargs: Union[List[str], str, None], default None
            The sysargs of the command to be executed, e.g. 'start api'.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the job's process.

        executor_keys: Optional[str], default None
            If provided, execute the job remotely on an API instance, e.g. 'api:main'.

        delete_after_completion: bool, default False
            If `True`, delete this job when it has finished executing.

        refresh_seconds: Union[int, float, None], default None
            The number of seconds to sleep between refreshes.
            Defaults to the configured value `system.cli.refresh_seconds`.

        _properties: Optional[Dict[str, Any]], default None
            If provided, use this to patch the daemon's properties.

        Raises
        ------
        `ValueError` if `name` contains a banned character.
        """
        from meerschaum.utils.daemon import Daemon

        ### Reject names which would produce invalid daemon IDs / paths.
        for char in BANNED_CHARS:
            if char in name:
                raise ValueError(f"Invalid name: ({char}) is not allowed.")

        ### Accept a single command string and split it shell-style.
        if isinstance(sysargs, str):
            sysargs = shlex.split(sysargs)

        ### Substitute the escaped chaining token back to the real one.
        and_key = STATIC_CONFIG['system']['arguments']['and_key']
        escaped_and_key = STATIC_CONFIG['system']['arguments']['escaped_and_key']
        if sysargs:
            sysargs = [
                (arg if arg != escaped_and_key else and_key)
                for arg in sysargs
            ]

        ### NOTE: 'local' and 'systemd' executors are being coalesced.
        if executor_keys is None:
            from meerschaum.jobs import get_executor_keys_from_context
            executor_keys = get_executor_keys_from_context()

        self.executor_keys = executor_keys
        self.name = name
        self.refresh_seconds = (
            refresh_seconds
            if refresh_seconds is not None
            else mrsm.get_config('system', 'cli', 'refresh_seconds')
        )

        ### Probe for an existing local daemon; any failure leaves the
        ### daemon to be lazily constructed by the `daemon` property.
        try:
            self._daemon = (
                Daemon(daemon_id=name)
                if executor_keys == 'local'
                else None
            )
        except Exception:
            self._daemon = None

        ### Handle any injected dependencies.
        if _rotating_log is not None:
            self._rotating_log = _rotating_log
            if self._daemon is not None:
                self._daemon._rotating_log = _rotating_log

        if _stdin_file is not None:
            self._stdin_file = _stdin_file
            if self._daemon is not None:
                self._daemon._stdin_file = _stdin_file
                self._daemon._blocking_stdin_file_path = _stdin_file.blocking_file_path

        if _status_hook is not None:
            self._status_hook = _status_hook

        if _result_hook is not None:
            self._result_hook = _result_hook

        ### Accumulate property overrides to apply on top of the daemon's
        ### persisted properties (see `get_daemon_properties()`).
        self._externally_managed = _externally_managed
        self._properties_patch = _properties or {}
        if _externally_managed:
            self._properties_patch.update({'externally_managed': _externally_managed})

        if env:
            self._properties_patch.update({'env': env})

        if delete_after_completion:
            self._properties_patch.update({'delete_after_completion': delete_after_completion})

        ### Prefer the sysargs persisted on an existing daemon over the
        ### caller-provided ones; warn when they disagree.
        daemon_sysargs = (
            self._daemon.properties.get('target', {}).get('args', [None])[0]
            if self._daemon is not None
            else None
        )

        if daemon_sysargs and sysargs and daemon_sysargs != sysargs:
            warn("Given sysargs differ from existing sysargs.")

        ### Strip daemonization flags; the Job layer manages daemonization itself.
        self._sysargs = [
            arg
            for arg in (daemon_sysargs or sysargs or [])
            if arg not in ('-d', '--daemon')
        ]

        ### A restart flag in the sysargs implies the restart property.
        for restart_flag in RESTART_FLAGS:
            if restart_flag in self._sysargs:
                self._properties_patch.update({'restart': True})
                break
Create a new job to manage a meerschaum.utils.daemon.Daemon.
Parameters
- name (str): The name of the job to be created. This will also be used as the Daemon ID.
- sysargs (Union[List[str], str, None], default None): The sysargs of the command to be executed, e.g. 'start api'.
- env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the job's process.
- executor_keys (Optional[str], default None): If provided, execute the job remotely on an API instance, e.g. 'api:main'.
- delete_after_completion (bool, default False): If `True`, delete this job when it has finished executing.
- refresh_seconds (Union[int, float, None], default None): The number of seconds to sleep between refreshes. Defaults to the configured value `system.cli.refresh_seconds`.
- _properties (Optional[Dict[str, Any]], default None): If provided, use this to patch the daemon's properties.
203 @staticmethod 204 def from_pid(pid: int, executor_keys: Optional[str] = None) -> Job: 205 """ 206 Build a `Job` from the PID of a running Meerschaum process. 207 208 Parameters 209 ---------- 210 pid: int 211 The PID of the process. 212 213 executor_keys: Optional[str], default None 214 The executor keys to assign to the job. 215 """ 216 psutil = mrsm.attempt_import('psutil') 217 try: 218 process = psutil.Process(pid) 219 except psutil.NoSuchProcess as e: 220 warn(f"Process with PID {pid} does not exist.", stack=False) 221 raise e 222 223 command_args = process.cmdline() 224 is_daemon = command_args[1] == '-c' 225 226 if is_daemon: 227 daemon_id = command_args[-1].split('daemon_id=')[-1].split(')')[0].replace("'", '') 228 root_dir = process.environ().get(STATIC_CONFIG['environment']['root'], None) 229 if root_dir is None: 230 root_dir = paths.ROOT_DIR_PATH 231 else: 232 root_dir = pathlib.Path(root_dir) 233 jobs_dir = root_dir / paths.DAEMON_RESOURCES_PATH.name 234 daemon_dir = jobs_dir / daemon_id 235 pid_file = daemon_dir / 'process.pid' 236 237 if pid_file.exists(): 238 with open(pid_file, 'r', encoding='utf-8') as f: 239 daemon_pid = int(f.read()) 240 241 if pid != daemon_pid: 242 raise EnvironmentError(f"Differing PIDs: {pid=}, {daemon_pid=}") 243 else: 244 raise EnvironmentError(f"Is job '{daemon_id}' running?") 245 246 return Job(daemon_id, executor_keys=executor_keys) 247 248 from meerschaum._internal.arguments._parse_arguments import parse_arguments 249 from meerschaum.utils.daemon import get_new_daemon_name 250 251 mrsm_ix = 0 252 for i, arg in enumerate(command_args): 253 if 'mrsm' in arg or 'meerschaum' in arg.lower(): 254 mrsm_ix = i 255 break 256 257 sysargs = command_args[mrsm_ix+1:] 258 kwargs = parse_arguments(sysargs) 259 name = kwargs.get('name', get_new_daemon_name()) 260 return Job(name, sysargs, executor_keys=executor_keys)
Build a Job from the PID of a running Meerschaum process.
Parameters
- pid (int): The PID of the process.
- executor_keys (Optional[str], default None): The executor keys to assign to the job.
262 def start(self, debug: bool = False) -> SuccessTuple: 263 """ 264 Start the job's daemon. 265 """ 266 if self.executor is not None: 267 if not self.exists(debug=debug): 268 return self.executor.create_job( 269 self.name, 270 self.sysargs, 271 properties=self.daemon.properties, 272 debug=debug, 273 ) 274 return self.executor.start_job(self.name, debug=debug) 275 276 if self.is_running(): 277 return True, f"{self} is already running." 278 279 success, msg = self.daemon.run( 280 keep_daemon_output=(not self.delete_after_completion), 281 allow_dirty_run=True, 282 ) 283 if not success: 284 return success, msg 285 286 return success, f"Started {self}."
Start the job's daemon.
288 def stop( 289 self, 290 timeout_seconds: Union[int, float, None] = None, 291 debug: bool = False, 292 ) -> SuccessTuple: 293 """ 294 Stop the job's daemon. 295 """ 296 if self.executor is not None: 297 return self.executor.stop_job(self.name, debug=debug) 298 299 if self.daemon.status == 'stopped': 300 if not self.restart: 301 return True, f"{self} is not running." 302 elif self.stop_time is not None: 303 return True, f"{self} will not restart until manually started." 304 305 quit_success, quit_msg = self.daemon.quit(timeout=timeout_seconds) 306 if quit_success: 307 return quit_success, f"Stopped {self}." 308 309 warn( 310 f"Failed to gracefully quit {self}.", 311 stack=False, 312 ) 313 kill_success, kill_msg = self.daemon.kill(timeout=timeout_seconds) 314 if not kill_success: 315 return kill_success, kill_msg 316 317 return kill_success, f"Killed {self}."
Stop the job's daemon.
319 def pause( 320 self, 321 timeout_seconds: Union[int, float, None] = None, 322 debug: bool = False, 323 ) -> SuccessTuple: 324 """ 325 Pause the job's daemon. 326 """ 327 if self.executor is not None: 328 return self.executor.pause_job(self.name, debug=debug) 329 330 pause_success, pause_msg = self.daemon.pause(timeout=timeout_seconds) 331 if not pause_success: 332 return pause_success, pause_msg 333 334 return pause_success, f"Paused {self}."
Pause the job's daemon.
336 def delete(self, debug: bool = False) -> SuccessTuple: 337 """ 338 Delete the job and its daemon. 339 """ 340 if self.executor is not None: 341 return self.executor.delete_job(self.name, debug=debug) 342 343 if self.is_running(): 344 stop_success, stop_msg = self.stop() 345 if not stop_success: 346 return stop_success, stop_msg 347 348 cleanup_success, cleanup_msg = self.daemon.cleanup() 349 if not cleanup_success: 350 return cleanup_success, cleanup_msg 351 352 _ = self.daemon._properties.pop('result', None) 353 return cleanup_success, f"Deleted {self}."
Delete the job and its daemon.
355 def is_running(self) -> bool: 356 """ 357 Determine whether the job's daemon is running. 358 """ 359 return self.status == 'running'
Determine whether the job's daemon is running.
361 def exists(self, debug: bool = False) -> bool: 362 """ 363 Determine whether the job exists. 364 """ 365 if self.executor is not None: 366 return self.executor.get_job_exists(self.name, debug=debug) 367 368 return self.daemon.path.exists()
Determine whether the job exists.
370 def get_logs(self) -> Union[str, None]: 371 """ 372 Return the output text of the job's daemon. 373 """ 374 if self.executor is not None: 375 return self.executor.get_logs(self.name) 376 377 return self.daemon.log_text
Return the output text of the job's daemon.
379 def monitor_logs( 380 self, 381 callback_function: Callable[[str], None] = _default_stdout_callback, 382 input_callback_function: Optional[Callable[[], str]] = None, 383 stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None, 384 stop_event: Optional[asyncio.Event] = None, 385 stop_on_exit: bool = False, 386 strip_timestamps: bool = False, 387 accept_input: bool = True, 388 debug: bool = False, 389 _logs_path: Optional[pathlib.Path] = None, 390 _log=None, 391 _stdin_file=None, 392 _wait_if_stopped: bool = True, 393 ): 394 """ 395 Monitor the job's log files and execute a callback on new lines. 396 397 Parameters 398 ---------- 399 callback_function: Callable[[str], None], default partial(print, end='') 400 The callback to execute as new data comes in. 401 Defaults to printing the output directly to `stdout`. 402 403 input_callback_function: Optional[Callable[[], str]], default None 404 If provided, execute this callback when the daemon is blocking on stdin. 405 Defaults to `sys.stdin.readline()`. 406 407 stop_callback_function: Optional[Callable[[SuccessTuple]], str], default None 408 If provided, execute this callback when the daemon stops. 409 The job's SuccessTuple will be passed to the callback. 410 411 stop_event: Optional[asyncio.Event], default None 412 If provided, stop monitoring when this event is set. 413 You may instead raise `meerschaum.jobs.StopMonitoringLogs` 414 from within `callback_function` to stop monitoring. 415 416 stop_on_exit: bool, default False 417 If `True`, stop monitoring when the job stops. 418 419 strip_timestamps: bool, default False 420 If `True`, remove leading timestamps from lines. 421 422 accept_input: bool, default True 423 If `True`, accept input when the daemon blocks on stdin. 
424 """ 425 if self.executor is not None: 426 self.executor.monitor_logs( 427 self.name, 428 callback_function, 429 input_callback_function=input_callback_function, 430 stop_callback_function=stop_callback_function, 431 stop_on_exit=stop_on_exit, 432 accept_input=accept_input, 433 strip_timestamps=strip_timestamps, 434 debug=debug, 435 ) 436 return 437 438 monitor_logs_coroutine = self.monitor_logs_async( 439 callback_function=callback_function, 440 input_callback_function=input_callback_function, 441 stop_callback_function=stop_callback_function, 442 stop_event=stop_event, 443 stop_on_exit=stop_on_exit, 444 strip_timestamps=strip_timestamps, 445 accept_input=accept_input, 446 debug=debug, 447 _logs_path=_logs_path, 448 _log=_log, 449 _stdin_file=_stdin_file, 450 _wait_if_stopped=_wait_if_stopped, 451 ) 452 return asyncio.run(monitor_logs_coroutine)
Monitor the job's log files and execute a callback on new lines.
Parameters
- callback_function (Callable[[str], None], default partial(print, end='')): The callback to execute as new data comes in. Defaults to printing the output directly to `stdout`.
- input_callback_function (Optional[Callable[[], str]], default None): If provided, execute this callback when the daemon is blocking on stdin. Defaults to `sys.stdin.readline()`.
- stop_callback_function (Optional[Callable[[SuccessTuple], None]], default None): If provided, execute this callback when the daemon stops. The job's SuccessTuple will be passed to the callback.
- stop_event (Optional[asyncio.Event], default None): If provided, stop monitoring when this event is set. You may instead raise `meerschaum.jobs.StopMonitoringLogs` from within `callback_function` to stop monitoring.
- stop_on_exit (bool, default False): If `True`, stop monitoring when the job stops.
- strip_timestamps (bool, default False): If `True`, remove leading timestamps from lines.
- accept_input (bool, default True): If `True`, accept input when the daemon blocks on stdin.
    async def monitor_logs_async(
        self,
        callback_function: Callable[[str], None] = _default_stdout_callback,
        input_callback_function: Optional[Callable[[], str]] = None,
        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
        stop_event: Optional[asyncio.Event] = None,
        stop_on_exit: bool = False,
        strip_timestamps: bool = False,
        accept_input: bool = True,
        debug: bool = False,
        _logs_path: Optional[pathlib.Path] = None,
        _log=None,
        _stdin_file=None,
        _wait_if_stopped: bool = True,
    ):
        """
        Monitor the job's log files and await a callback on new lines.

        Parameters
        ----------
        callback_function: Callable[[str], None], default _default_stdout_callback
            The callback to execute as new data comes in.
            Defaults to printing the output directly to `stdout`.

        input_callback_function: Optional[Callable[[], str]], default None
            If provided, execute this callback when the daemon is blocking on stdin.
            Defaults to `sys.stdin.readline()`.

        stop_callback_function: Optional[Callable[[SuccessTuple], None]], default None
            If provided, execute this callback when the daemon stops.
            The job's SuccessTuple will be passed to the callback.

        stop_event: Optional[asyncio.Event], default None
            If provided, stop monitoring when this event is set.
            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
            from within `callback_function` to stop monitoring.

        stop_on_exit: bool, default False
            If `True`, stop monitoring when the job stops.

        strip_timestamps: bool, default False
            If `True`, remove leading timestamps from lines.

        accept_input: bool, default True
            If `True`, accept input when the daemon blocks on stdin.
        """
        from meerschaum.utils.prompt import prompt

        # Default stdin source: replay the daemon's pending prompt() if one
        # is recorded, otherwise read a raw line from this process's stdin.
        def default_input_callback_function():
            prompt_kwargs = self.get_prompt_kwargs(debug=debug)
            if prompt_kwargs:
                answer = prompt(**prompt_kwargs)
                return answer + '\n'
            return sys.stdin.readline()

        if input_callback_function is None:
            input_callback_function = default_input_callback_function

        # Remote jobs delegate monitoring to the executor entirely.
        if self.executor is not None:
            await self.executor.monitor_logs_async(
                self.name,
                callback_function,
                input_callback_function=input_callback_function,
                stop_callback_function=stop_callback_function,
                stop_on_exit=stop_on_exit,
                strip_timestamps=strip_timestamps,
                accept_input=accept_input,
                debug=debug,
            )
            return

        from meerschaum.utils.formatting._jobs import strip_timestamp_from_line

        # Any one of these events ending the monitor is folded into
        # `combined_event` (by `combine_events()` below), which is the
        # single stop signal handed to `watchfiles.awatch`.
        events = {
            'user': stop_event,
            'stopped': asyncio.Event(),
            'stop_token': asyncio.Event(),
            'stop_exception': asyncio.Event(),
            'stopped_timeout': asyncio.Event(),
        }
        combined_event = asyncio.Event()
        emitted_text = False
        stdin_file = _stdin_file if _stdin_file is not None else self.daemon.stdin_file

        # Poll the job's status; once stopped, invoke the stop callback and
        # set the 'stopped' event (when stop_on_exit is requested).
        async def check_job_status():
            if not stop_on_exit:
                return

            nonlocal emitted_text

            # Bounded backoff: keep waiting (sleep grows by 10% each pass)
            # while the job is stopped but nothing has been emitted yet,
            # until sleep_time reaches 0.2 and the loop times out.
            sleep_time = 0.1
            while sleep_time < 0.2:
                if self.status == 'stopped':
                    if not emitted_text and _wait_if_stopped:
                        await asyncio.sleep(sleep_time)
                        sleep_time = round(sleep_time * 1.1, 3)
                        continue

                    if stop_callback_function is not None:
                        try:
                            if asyncio.iscoroutinefunction(stop_callback_function):
                                await stop_callback_function(self.result)
                            else:
                                stop_callback_function(self.result)
                        except asyncio.exceptions.CancelledError:
                            break
                        except Exception:
                            warn(traceback.format_exc())

                    if stop_on_exit:
                        events['stopped'].set()

                    break
                await asyncio.sleep(0.1)

            events['stopped_timeout'].set()

        # Poll for the daemon blocking on stdin; when it is, gather input
        # from the input callback and forward it to the daemon's stdin file.
        async def check_blocking_on_input():
            while True:
                if not emitted_text or not self.is_blocking_on_stdin():
                    try:
                        await asyncio.sleep(self.refresh_seconds)
                    except asyncio.exceptions.CancelledError:
                        break
                    continue

                if not self.is_running():
                    break

                # NOTE: `emit_latest_lines` is defined later in this function
                # but is resolved at call time, so this closure is valid.
                await emit_latest_lines()

                try:
                    print('', end='', flush=True)
                    if asyncio.iscoroutinefunction(input_callback_function):
                        data = await input_callback_function()
                    else:
                        # Run the blocking reader off the event loop.
                        loop = asyncio.get_running_loop()
                        data = await loop.run_in_executor(None, input_callback_function)
                except KeyboardInterrupt:
                    break
                # if not data.endswith('\n'):
                #     data += '\n'

                stdin_file.write(data)
                await asyncio.sleep(self.refresh_seconds)

        # Wait for the first of the stop events and set `combined_event`.
        async def combine_events():
            event_tasks = [
                asyncio.create_task(event.wait())
                for event in events.values()
                if event is not None
            ]
            if not event_tasks:
                return

            try:
                done, pending = await asyncio.wait(
                    event_tasks,
                    return_when=asyncio.FIRST_COMPLETED,
                )
                for task in pending:
                    task.cancel()
            except asyncio.exceptions.CancelledError:
                pass
            finally:
                combined_event.set()

        check_job_status_task = asyncio.create_task(check_job_status())
        check_blocking_on_input_task = asyncio.create_task(check_blocking_on_input())
        combine_events_task = asyncio.create_task(combine_events())

        log = _log if _log is not None else self.daemon.rotating_log
        lines_to_show = (
            self.daemon.properties.get(
                'logs', {}
            ).get(
                'lines_to_show', get_config('jobs', 'logs', 'lines_to_show')
            )
        )

        # Read the latest log lines and feed them through the callback,
        # interpreting the control tokens (stop / clear / flush) inline.
        async def emit_latest_lines():
            nonlocal emitted_text
            nonlocal stop_event
            lines = log.readlines()
            for line in lines[(-1 * lines_to_show):]:
                if stop_event is not None and stop_event.is_set():
                    return

                line_stripped_extra = strip_timestamp_from_line(line.strip())
                line_stripped = strip_timestamp_from_line(line)

                if line_stripped_extra == STOP_TOKEN:
                    events['stop_token'].set()
                    return

                if line_stripped_extra == CLEAR_TOKEN:
                    clear_screen(debug=debug)
                    continue

                if line_stripped_extra == FLUSH_TOKEN.strip():
                    line_stripped = ''
                    line = ''

                if strip_timestamps:
                    line = line_stripped

                try:
                    if asyncio.iscoroutinefunction(callback_function):
                        await callback_function(line)
                    else:
                        callback_function(line)
                    emitted_text = True
                except StopMonitoringLogs:
                    events['stop_exception'].set()
                    return
                except Exception:
                    warn(f"Error in logs callback:\n{traceback.format_exc()}")

        await emit_latest_lines()

        tasks = (
            [check_job_status_task]
            + ([check_blocking_on_input_task] if accept_input else [])
            + [combine_events_task]
        )
        try:
            # NOTE(review): this gather is not awaited — the returned future
            # is discarded, so the try/except around it can only catch errors
            # raised while constructing the future, not task exceptions.
            # The tasks were already scheduled by create_task above, so they
            # still run; confirm whether an `await` was intended here.
            _ = asyncio.gather(*tasks, return_exceptions=True)
        except asyncio.exceptions.CancelledError:
            raise
        except Exception:
            warn(f"Failed to run async checks:\n{traceback.format_exc()}")

        watchfiles = mrsm.attempt_import('watchfiles', lazy=False)
        dir_path_to_monitor = (
            _logs_path
            or (log.file_path.parent if log else None)
            or paths.LOGS_RESOURCES_PATH
        )
        # Re-emit whenever the log's current subfile changes on disk,
        # until any stop condition sets `combined_event`.
        async for changes in watchfiles.awatch(
            dir_path_to_monitor,
            stop_event=combined_event,
        ):
            for change in changes:
                file_path_str = change[1]
                file_path = pathlib.Path(file_path_str)
                latest_subfile_path = log.get_latest_subfile_path()
                if latest_subfile_path != file_path:
                    continue

                await emit_latest_lines()

        # Final drain after the watcher exits.
        await emit_latest_lines()
Monitor the job's log files and await a callback on new lines.
Parameters
- callback_function (Callable[[str], None], default _default_stdout_callback): The callback to execute as new data comes in. Defaults to printing the output directly to `stdout`.
- input_callback_function (Optional[Callable[[], str]], default None): If provided, execute this callback when the daemon is blocking on stdin. Defaults to `sys.stdin.readline()`.
- stop_callback_function (Optional[Callable[[SuccessTuple], None]], default None): If provided, execute this callback when the daemon stops. The job's SuccessTuple will be passed to the callback.
- stop_event (Optional[asyncio.Event], default None): If provided, stop monitoring when this event is set. You may instead raise `meerschaum.jobs.StopMonitoringLogs` from within `callback_function` to stop monitoring.
- stop_on_exit (bool, default False): If `True`, stop monitoring when the job stops.
- strip_timestamps (bool, default False): If `True`, remove leading timestamps from lines.
- accept_input (bool, default True): If `True`, accept input when the daemon blocks on stdin.
707 def is_blocking_on_stdin(self, debug: bool = False) -> bool: 708 """ 709 Return whether a job's daemon is blocking on stdin. 710 """ 711 if self.executor is not None: 712 return self.executor.get_job_is_blocking_on_stdin(self.name, debug=debug) 713 714 return self.is_running() and self.daemon.blocking_stdin_file_path.exists()
Return whether a job's daemon is blocking on stdin.
716 def get_prompt_kwargs(self, debug: bool = False) -> Dict[str, Any]: 717 """ 718 Return the kwargs to the blocking `prompt()`, if available. 719 """ 720 if self.executor is not None: 721 return self.executor.get_job_prompt_kwargs(self.name, debug=debug) 722 723 if not self.daemon.prompt_kwargs_file_path.exists(): 724 return {} 725 726 try: 727 with open(self.daemon.prompt_kwargs_file_path, 'r', encoding='utf-8') as f: 728 prompt_kwargs = json.load(f) 729 730 return prompt_kwargs 731 732 except Exception: 733 import traceback 734 traceback.print_exc() 735 return {}
Return the kwargs to the blocking prompt(), if available.
737 def write_stdin(self, data): 738 """ 739 Write to a job's daemon's `stdin`. 740 """ 741 self.daemon.stdin_file.write(data)
Write to a job's daemon's stdin.
743 @property 744 def executor(self) -> Union[Executor, None]: 745 """ 746 If the job is remote, return the connector to the remote API instance. 747 """ 748 return ( 749 mrsm.get_connector(self.executor_keys) 750 if self.executor_keys != 'local' 751 else None 752 )
If the job is remote, return the connector to the remote API instance.
@property
def status(self) -> str:
    """
    Return the running status of the job's daemon.
    """
    # An injected status hook (if set on the instance) takes priority.
    if '_status_hook' in self.__dict__:
        return self._status_hook()

    # Remote jobs: query the executor.
    if self.executor is not None:
        return self.executor.get_job_status(self.name)

    # Local jobs: read the daemon's own status.
    return self.daemon.status
Return the running status of the job's daemon.
@property
def pid(self) -> Union[int, None]:
    """
    Return the PID of the job's daemon, or `None` if unavailable.
    """
    # Remote jobs: the PID lives under the 'daemon' key of the job metadata.
    if self.executor is not None:
        return self.executor.get_job_metadata(self.name).get('daemon', {}).get('pid', None)

    return self.daemon.pid
Return the PID of the job's daemon.
@property
def restart(self) -> bool:
    """
    Return whether to restart a stopped job.
    """
    # Remote jobs keep the flag in their metadata;
    # local jobs keep it in the daemon's properties.
    flags_source = (
        self.executor.get_job_metadata(self.name)
        if self.executor is not None
        else self.daemon.properties
    )
    return flags_source.get('restart', False)
Return whether to restart a stopped job.
@property
def result(self) -> SuccessTuple:
    """
    Return the `SuccessTuple` when the job has terminated.

    A running job reports a provisional success tuple.
    """
    # Running jobs have no final result yet.
    if self.is_running():
        return True, f"{self} is running."

    # An injected result hook (if set on the instance) takes priority.
    if '_result_hook' in self.__dict__:
        return self._result_hook()

    # Remote jobs: pull the result out of the job metadata.
    if self.executor is not None:
        return (
            self.executor.get_job_metadata(self.name)
            .get('result', (False, "No result available."))
        )

    # Local jobs: prefer the persisted result, else the in-memory cache.
    _result = self.daemon.properties.get('result', None)
    if _result is None:
        from meerschaum.utils.daemon.Daemon import _results
        return _results.get(self.daemon.daemon_id, (False, "No result available."))

    # Persisted results round-trip through JSON as lists; restore the tuple.
    return tuple(_result)
Return the SuccessTuple when the job has terminated.
@property
def sysargs(self) -> List[str]:
    """
    Return the sysargs to use for the Daemon.
    """
    # Serve the cached value when set at construction or on a prior access.
    if self._sysargs:
        return self._sysargs

    # Remote jobs: read sysargs from the job metadata (not cached).
    if self.executor is not None:
        return self.executor.get_job_metadata(self.name).get('sysargs', [])

    # Local jobs: recover sysargs from the daemon's stored target args.
    target_args = self.daemon.target_args
    if target_args is None:
        return []
    # The daemon target receives the sysargs list as its first positional arg.
    self._sysargs = target_args[0] if len(target_args) > 0 else []
    return self._sysargs
Return the sysargs to use for the Daemon.
def get_daemon_properties(self) -> Dict[str, Any]:
    """
    Return the `properties` dictionary for the job's daemon.

    Remote properties (when an executor is present) are overlaid
    with the local properties patch, which takes precedence.
    """
    if self.executor is None:
        remote_properties = {}
    else:
        remote_properties = self.executor.get_job_properties(self.name)

    properties = dict(remote_properties)
    properties.update(self._properties_patch)
    return properties
Return the properties dictionary for the job's daemon.
@property
def daemon(self) -> 'Daemon':
    """
    Return the daemon which this job manages.

    Lazily constructs (and caches) a `Daemon` for local jobs;
    remote jobs always rebuild from the latest remote properties.
    """
    from meerschaum.utils.daemon import Daemon
    # Reuse the cached daemon only for local jobs with known sysargs.
    if self._daemon is not None and self.executor is None and self._sysargs:
        return self._daemon

    self._daemon = Daemon(
        target=entry,
        target_args=[self._sysargs],
        target_kw={},
        daemon_id=self.name,
        label=shlex.join(self._sysargs),
        properties=self.get_daemon_properties(),
    )
    # Carry over any injected log / stdin handles (e.g. set by an executor).
    if '_rotating_log' in self.__dict__:
        self._daemon._rotating_log = self._rotating_log

    if '_stdin_file' in self.__dict__:
        self._daemon._stdin_file = self._stdin_file
        self._daemon._blocking_stdin_file_path = self._stdin_file.blocking_file_path

    return self._daemon
Return the daemon which this job manages.
@property
def began(self) -> Union[datetime, None]:
    """
    The datetime when the job began running.

    Remote timestamps are normalized to naive UTC;
    local timestamps are returned as stored.
    """
    if self.executor is not None:
        began_str = self.executor.get_job_began(self.name)
        if began_str is None:
            return None
        began = datetime.fromisoformat(began_str)
        return began.astimezone(timezone.utc).replace(tzinfo=None)

    began_str = self.daemon.properties.get('process', {}).get('began', None)
    if began_str is None:
        return None

    return datetime.fromisoformat(began_str)
The datetime when the job began running.
@property
def ended(self) -> Union[datetime, None]:
    """
    The datetime when the job stopped running.

    Remote timestamps are normalized to naive UTC;
    local timestamps are returned as stored.
    """
    if self.executor is not None:
        ended_str = self.executor.get_job_ended(self.name)
        if ended_str is None:
            return None
        ended = datetime.fromisoformat(ended_str)
        return ended.astimezone(timezone.utc).replace(tzinfo=None)

    ended_str = self.daemon.properties.get('process', {}).get('ended', None)
    if ended_str is None:
        return None

    return datetime.fromisoformat(ended_str)
The datetime when the job stopped running.
@property
def paused(self) -> Union[datetime, None]:
    """
    The datetime when the job was suspended while running.

    Remote timestamps are normalized to naive UTC;
    local timestamps are returned as stored.
    """
    if self.executor is not None:
        paused_str = self.executor.get_job_paused(self.name)
        if paused_str is None:
            return None
        paused = datetime.fromisoformat(paused_str)
        return paused.astimezone(timezone.utc).replace(tzinfo=None)

    paused_str = self.daemon.properties.get('process', {}).get('paused', None)
    if paused_str is None:
        return None

    return datetime.fromisoformat(paused_str)
The datetime when the job was suspended while running.
@property
def stop_time(self) -> Union[datetime, None]:
    """
    Return the timestamp when the job was manually stopped,
    or `None` if the job was never manually stopped.
    """
    if self.executor is not None:
        return self.executor.get_job_stop_time(self.name)

    # No stop file means the job was never manually stopped.
    if not self.daemon.stop_path.exists():
        return None

    stop_data = self.daemon._read_stop_file()
    if not stop_data:
        return None

    stop_time_str = stop_data.get('stop_time', None)
    if not stop_time_str:
        # A stop file without a timestamp is malformed; warn and move on.
        warn(f"Could not read stop time for {self}.")
        return None

    return datetime.fromisoformat(stop_time_str)
Return the timestamp when the job was manually stopped.
def check_restart(self) -> SuccessTuple:
    """
    If `restart` is `True` and the daemon is not running,
    restart the job.
    Do not restart if the job was manually stopped.

    Returns
    -------
    A `SuccessTuple` — either a no-op reason or the result of `start()`.
    """
    # Nothing to do for a job that's already running.
    if self.is_running():
        return True, f"{self} is running."

    # Only jobs flagged with `restart` are eligible.
    if not self.restart:
        return True, f"{self} does not need to be restarted."

    # A manual stop always overrides the restart flag.
    if self.stop_time is not None:
        return True, f"{self} was manually stopped."

    return self.start()
If restart is True and the daemon is not running,
restart the job.
Do not restart if the job was manually stopped.
@property
def label(self) -> str:
    """
    Return the job's Daemon label (joined sysargs).

    Pipeline separators are broken onto their own lines for readability.
    """
    from meerschaum._internal.arguments import compress_pipeline_sysargs
    sysargs = compress_pipeline_sysargs(self.sysargs)
    joined = shlex.join(sysargs)
    multiline = joined.replace(' + ', '\n+ ').replace(' : ', '\n: ')
    return multiline.strip()
Return the job's Daemon label (joined sysargs).
@property
def env(self) -> Dict[str, str]:
    """
    Return the environment variables to set for the job's process.

    Job-specific values from the daemon's properties override the defaults.
    The result is memoized on the instance.
    """
    # Serve the memoized environment if it was already built.
    if '_env' in self.__dict__:
        return self.__dict__['_env']

    _env = self.daemon.properties.get('env', {})
    default_env = {
        'PYTHONUNBUFFERED': '1',
        'LINES': str(get_config('jobs', 'terminal', 'lines')),
        'COLUMNS': str(get_config('jobs', 'terminal', 'columns')),
        STATIC_CONFIG['environment']['noninteractive']: 'true',
    }
    # Job-provided values win over the defaults.
    self._env = {**default_env, **_env}
    return self._env
Return the environment variables to set for the job's process.
@property
def delete_after_completion(self) -> bool:
    """
    Return whether this job is configured to delete itself after completion.

    The flag is memoized on the instance after the first lookup.
    """
    try:
        return self.__dict__['_delete_after_completion']
    except KeyError:
        pass

    self._delete_after_completion = self.daemon.properties.get('delete_after_completion', False)
    return self._delete_after_completion
Return whether this job is configured to delete itself after completion.
class StopMonitoringLogs(Exception):
    """
    Raise this exception from within a logs callback to stop monitoring.
    """
Raise this exception to stop the logs monitoring.
def get_jobs(
    executor_keys: Optional[str] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of the existing jobs.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, return remote jobs on the given API instance.
        Otherwise return local jobs.

    include_hidden: bool, default False
        If `True`, include jobs with the `hidden` attribute.

    combine_local_and_systemd: bool, default True
        If `True` and the context's default executor is `systemd`,
        merge local and systemd jobs into a single result.

    debug: bool, default False
        Verbosity toggle passed through to the executors.

    Returns
    -------
    A dictionary mapping job names to jobs.
    """
    from meerschaum.connectors.parse import parse_executor_keys
    executor_keys = executor_keys or get_executor_keys_from_context()

    # Merge local + systemd only when systemd is the context default and
    # the requested keys are local-ish ('None', 'local', 'systemd').
    include_local_and_system = (
        combine_local_and_systemd
        and str(executor_keys).split(':', maxsplit=1)[0] in ('None', 'local', 'systemd')
        and get_executor_keys_from_context() == 'systemd'
    )

    def _get_local_jobs():
        """Build jobs from on-disk daemons, excluding externally managed ones."""
        from meerschaum.utils.daemon import get_daemons
        daemons = get_daemons()
        jobs = {
            daemon.daemon_id: Job(name=daemon.daemon_id, executor_keys='local')
            for daemon in daemons
        }
        return {
            name: job
            for name, job in jobs.items()
            if (include_hidden or not job.hidden) and not job._is_externally_managed
        }

    def _get_systemd_jobs():
        """Fetch jobs from the systemd executor."""
        from meerschaum.jobs.systemd import SystemdExecutor
        conn = SystemdExecutor('systemd')
        jobs = conn.get_jobs(debug=debug)
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    if include_local_and_system:
        local_jobs = _get_local_jobs()
        systemd_jobs = _get_systemd_jobs()
        # Warn on name collisions; systemd wins in the merged dict below.
        shared_jobs = set(local_jobs) & set(systemd_jobs)
        if shared_jobs:
            from meerschaum.utils.misc import items_str
            from meerschaum.utils.warnings import warn
            warn(
                "Job"
                + ('s' if len(shared_jobs) != 1 else '')
                + f" {items_str(list(shared_jobs))} "
                + "exist"
                + ('s' if len(shared_jobs) == 1 else '')
                + " in both `local` and `systemd`.",
                stack=False,
            )
        return {**local_jobs, **systemd_jobs}

    if executor_keys == 'local':
        return _get_local_jobs()

    if executor_keys == 'systemd':
        return _get_systemd_jobs()

    # Remote executor: validate the keys first, then fetch the jobs.
    try:
        _ = parse_executor_keys(executor_keys, construct=False)
        conn = parse_executor_keys(executor_keys)
        jobs = conn.get_jobs(debug=debug)
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }
    except Exception:
        # Unparseable or unreachable executors yield no jobs.
        return {}
Return a dictionary of the existing jobs.
Parameters
- executor_keys (Optional[str], default None): If provided, return remote jobs on the given API instance. Otherwise return local jobs.
- include_hidden (bool, default False):
If
`True`, include jobs with the `hidden` attribute.
Returns
- A dictionary mapping job names to jobs.
def get_filtered_jobs(
    executor_keys: Optional[str] = None,
    filter_list: Optional[List[str]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    warn: bool = False,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of jobs filtered by the user.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, filter jobs on the given API instance.
        Otherwise filter local jobs.

    filter_list: Optional[List[str]], default None
        Job names to include. Names prefixed with an underscore
        (e.g. `'_myjob'`) act as exclusions instead, but only when no
        non-underscore names matched an existing job.
        If `None` or empty, return all jobs (honoring `include_hidden`).

    include_hidden: bool, default False
        If `True`, include hidden jobs when no filter is given.

    warn: bool, default False
        If `True`, warn about filter names which match no existing job.

    Returns
    -------
    A dictionary mapping job names to jobs.
    """
    from meerschaum.utils.warnings import warn as _warn
    jobs = get_jobs(
        executor_keys,
        include_hidden=True,
        combine_local_and_systemd=combine_local_and_systemd,
        debug=debug,
    )
    if not filter_list:
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    jobs_to_return = {}
    # Underscore-prefixed names mark exclusions rather than inclusions.
    # (Removed a dead `if ...: pass` branch and the unused
    # `filter_list_without_underscores` list from the original.)
    filter_list_with_underscores = [name for name in filter_list if name.startswith('_')]
    for name in filter_list:
        job = jobs.get(name, None)
        if job is None:
            if warn:
                _warn(
                    f"Job '{name}' does not exist.",
                    stack=False,
                )
            continue
        jobs_to_return[name] = job

    # If nothing matched and exclusions were given,
    # return everything except the excluded names.
    if not jobs_to_return and filter_list_with_underscores:
        names_to_exclude = [name.lstrip('_') for name in filter_list_with_underscores]
        return {
            name: job
            for name, job in jobs.items()
            if name not in names_to_exclude
        }

    return jobs_to_return
Return a list of jobs filtered by the user.
def get_restart_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return jobs which were created with `--restart` or `--loop`.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    # Keep only jobs flagged for automatic restart.
    restart_jobs = {}
    for name, job in jobs.items():
        if job.restart:
            restart_jobs[name] = job
    return restart_jobs
Return jobs which were created with --restart or --loop.
def get_running_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of running jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    # Keep only jobs whose daemons report a 'running' status.
    running_jobs = {}
    for name, job in jobs.items():
        if job.status == 'running':
            running_jobs[name] = job
    return running_jobs
Return a dictionary of running jobs.
def get_stopped_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of stopped jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    # Keep only jobs whose daemons report a 'stopped' status.
    stopped_jobs = {}
    for name, job in jobs.items():
        if job.status == 'stopped':
            stopped_jobs[name] = job
    return stopped_jobs
Return a dictionary of stopped jobs.
def get_paused_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of paused jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    # Keep only jobs whose daemons report a 'paused' status.
    paused_jobs = {}
    for name, job in jobs.items():
        if job.status == 'paused':
            paused_jobs[name] = job
    return paused_jobs
Return a dictionary of paused jobs.
def make_executor(cls):
    """
    Register a class as an `Executor`.

    Derives the executor type from the class name
    (e.g. `SystemdExecutor` -> `'systemd'`) and registers it
    both in `executor_types` and with the connector registry.
    """
    import re
    from meerschaum.connectors import make_connector
    typ = re.sub(r'executor$', '', cls.__name__.lower())
    if typ not in executor_types:
        executor_types.append(typ)
    return make_connector(cls, _is_executor=True)
Register a class as an Executor.
class Executor(Connector):
    """
    Define the methods for managing jobs.

    Concrete executors implement these abstract methods to manage jobs
    on their respective backends (e.g. systemd, a remote API instance).
    """

    @abstractmethod
    def get_job_names(self, debug: bool = False) -> List[str]:
        """
        Return a list of existing jobs, including hidden ones.
        """

    @abstractmethod
    def get_job_exists(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job exists.
        """

    @abstractmethod
    def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
        """
        Return a dictionary of existing jobs.
        """

    @abstractmethod
    def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return a job's metadata.
        """

    @abstractmethod
    def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return the underlying daemon's properties.
        """

    @abstractmethod
    def get_job_status(self, name: str, debug: bool = False) -> str:
        """
        Return the job's status.
        """

    @abstractmethod
    def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job began running, as an ISO-format timestamp string,
        or `None` if unavailable.
        """

    @abstractmethod
    def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job stopped running, as an ISO-format timestamp string,
        or `None` if unavailable.
        """

    @abstractmethod
    def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return a job's `paused` timestamp, if it exists.
        """

    @abstractmethod
    def create_job(
        self,
        name: str,
        sysargs: List[str],
        properties: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ) -> SuccessTuple:
        """
        Create a new job.
        """

    @abstractmethod
    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Start a job.
        """

    @abstractmethod
    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Stop a job.
        """

    @abstractmethod
    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Pause a job.
        """

    @abstractmethod
    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Delete a job.
        """

    @abstractmethod
    def get_logs(self, name: str, debug: bool = False) -> str:
        """
        Return a job's log output.
        """

    @abstractmethod
    def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
        """
        Return the job's manual stop time.
        """

    @abstractmethod
    async def monitor_logs_async(
        self,
        name: str,
        callback_function: Callable[[Any], Any],
        input_callback_function: Callable[[], str],
        stop_callback_function: Callable[[SuccessTuple], str],
        stop_on_exit: bool = False,
        strip_timestamps: bool = False,
        accept_input: bool = True,
        debug: bool = False,
    ):
        """
        Monitor a job's log files and await a callback with the changes.
        """

    @abstractmethod
    def monitor_logs(self, *args, **kwargs):
        """
        Monitor a job's log files.
        """

    @abstractmethod
    def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job is blocking on stdin.
        """

    @abstractmethod
    def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return the kwargs to the blocking prompt.
        """
Define the methods for managing jobs.
27 @abstractmethod 28 def get_job_names(self, debug: bool = False) -> List[str]: 29 """ 30 Return a list of existing jobs, including hidden ones. 31 """
Return a list of existing jobs, including hidden ones.
33 @abstractmethod 34 def get_job_exists(self, name: str, debug: bool = False) -> bool: 35 """ 36 Return whether a job exists. 37 """
Return whether a job exists.
39 @abstractmethod 40 def get_jobs(self, debug: bool = False) -> Dict[str, Job]: 41 """ 42 Return a dictionary of existing jobs. 43 """
Return a dictionary of existing jobs.
45 @abstractmethod 46 def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]: 47 """ 48 Return a job's metadata. 49 """
Return a job's metadata.
51 @abstractmethod 52 def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]: 53 """ 54 Return the underlying daemon's properties. 55 """
Return the underlying daemon's properties.
56 @abstractmethod 57 def get_job_status(self, name: str, debug: bool = False) -> str: 58 """ 59 Return the job's status. 60 """
Return the job's status.
62 @abstractmethod 63 def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]: 64 """ 65 Return when a job began running. 66 """
Return when a job began running.
68 @abstractmethod 69 def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]: 70 """ 71 Return when a job stopped running. 72 """
Return when a job stopped running.
74 @abstractmethod 75 def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]: 76 """ 77 Return a job's `paused` timestamp, if it exists. 78 """
Return a job's paused timestamp, if it exists.
80 @abstractmethod 81 def create_job( 82 self, 83 name: str, 84 sysargs: List[str], 85 properties: Optional[Dict[str, Any]] = None, 86 debug: bool = False, 87 ) -> SuccessTuple: 88 """ 89 Create a new job. 90 """
Create a new job.
92 @abstractmethod 93 def start_job(self, name: str, debug: bool = False) -> SuccessTuple: 94 """ 95 Start a job. 96 """
Start a job.
98 @abstractmethod 99 def stop_job(self, name: str, debug: bool = False) -> SuccessTuple: 100 """ 101 Stop a job. 102 """
Stop a job.
104 @abstractmethod 105 def pause_job(self, name: str, debug: bool = False) -> SuccessTuple: 106 """ 107 Pause a job. 108 """
Pause a job.
110 @abstractmethod 111 def delete_job(self, name: str, debug: bool = False) -> SuccessTuple: 112 """ 113 Delete a job. 114 """
Delete a job.
116 @abstractmethod 117 def get_logs(self, name: str, debug: bool = False) -> str: 118 """ 119 Return a job's log output. 120 """
Return a job's log output.
122 @abstractmethod 123 def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]: 124 """ 125 Return the job's manual stop time. 126 """
Return the job's manual stop time.
128 @abstractmethod 129 async def monitor_logs_async( 130 self, 131 name: str, 132 callback_function: Callable[[Any], Any], 133 input_callback_function: Callable[[], str], 134 stop_callback_function: Callable[[SuccessTuple], str], 135 stop_on_exit: bool = False, 136 strip_timestamps: bool = False, 137 accept_input: bool = True, 138 debug: bool = False, 139 ): 140 """ 141 Monitor a job's log files and await a callback with the changes. 142 """
Monitor a job's log files and await a callback with the changes.
144 @abstractmethod 145 def monitor_logs(self, *args, **kwargs): 146 """ 147 Monitor a job's log files. 148 """
Monitor a job's log files.
150 @abstractmethod 151 def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool: 152 """ 153 Return whether a job is blocking on stdin. 154 """
Return whether a job is blocking on stdin.
156 @abstractmethod 157 def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]: 158 """ 159 Return the kwargs to the blocking prompt. 160 """
Return the kwargs to the blocking prompt.
def check_restart_jobs(
    executor_keys: Optional[str] = 'local',
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = True,
    silent: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Restart any stopped jobs which were created with `--restart`.

    Parameters
    ----------
    executor_keys: Optional[str], default 'local'
        If provided, check jobs on the given remote API instance.
        Otherwise check local jobs.

    jobs: Optional[Dict[str, Job]], default None
        If provided, check these jobs instead of fetching them.

    include_hidden: bool, default True
        If `True`, include hidden jobs in the check.

    silent: bool, default False
        If `True`, do not print the restart success message.

    Returns
    -------
    A `SuccessTuple` which succeeds only if every job's check succeeded.
    """
    from meerschaum.utils.misc import items_str

    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=False,
            debug=debug,
        )

    if not jobs:
        return True, "No jobs to restart."

    results = {}
    for name, job in jobs.items():
        check_success, check_msg = job.check_restart()
        results[job.name] = (check_success, check_msg)
        if not silent:
            mrsm.pprint((check_success, check_msg))

    success_names = [name for name, (check_success, check_msg) in results.items() if check_success]
    fail_names = [name for name, (check_success, check_msg) in results.items() if not check_success]
    success = len(success_names) == len(jobs)
    msg = (
        (
            "Successfully restarted job"
            + ('s' if len(success_names) != 1 else '')
            + ' ' + items_str(success_names) + '.'
        )
        if success
        else (
            "Failed to restart job"
            # Fix: pluralize by the number of failures (was `success_names`).
            + ('s' if len(fail_names) != 1 else '')
            + ' ' + items_str(fail_names) + '.'
        )
    )
    return success, msg
Restart any stopped jobs which were created with --restart.
Parameters
- executor_keys (Optional[str], default None): If provided, check jobs on the given remote API instance. Otherwise check local jobs.
- include_hidden (bool, default True):
If
True, include hidden jobs in the check. - silent (bool, default False):
If
True, do not print the restart success message.
def start_check_jobs_thread():
    """
    Start a thread to regularly monitor jobs.

    Returns
    -------
    The started `RepeatTimer` daemon thread
    (also stored in the module-level `_check_loop_stop_thread`).
    """
    import atexit
    from functools import partial
    from meerschaum.utils.threading import RepeatTimer
    from meerschaum._internal.static import STATIC_CONFIG

    global _check_loop_stop_thread
    # Interval between restart checks, in seconds.
    sleep_seconds = STATIC_CONFIG['jobs']['check_restart_seconds']

    _check_loop_stop_thread = RepeatTimer(
        sleep_seconds,
        partial(
            _check_restart_jobs_against_lock,
            silent=True,
        )
    )
    # Daemonize so the timer never blocks interpreter shutdown.
    _check_loop_stop_thread.daemon = True
    # Ensure the timer is cancelled (and the lock file removed) at exit.
    atexit.register(stop_check_jobs_thread)

    _check_loop_stop_thread.start()
    return _check_loop_stop_thread
Start a thread to regularly monitor jobs.
def stop_check_jobs_thread():
    """
    Stop the job monitoring thread.

    Cancels the module-level `RepeatTimer` (if one was started)
    and removes the check-jobs lock file.
    """
    import meerschaum.config.paths as paths
    from meerschaum.utils.warnings import warn
    # Nothing to stop if the monitor thread was never started.
    if _check_loop_stop_thread is None:
        return

    _check_loop_stop_thread.cancel()

    # Best-effort cleanup of the lock file; warn rather than raise.
    try:
        if paths.CHECK_JOBS_LOCK_PATH.exists():
            paths.CHECK_JOBS_LOCK_PATH.unlink()
    except Exception as e:
        warn(f"Failed to remove check jobs lock file:\n{e}")
Stop the job monitoring thread.