meerschaum.jobs
Higher-level utilities for managing `meerschaum.utils.daemon.Daemon`.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Higher-level utilities for managing `meerschaum.utils.daemon.Daemon`.
"""

import meerschaum as mrsm
from meerschaum.utils.typing import Dict, Optional, List, SuccessTuple

from meerschaum.jobs._Job import Job, StopMonitoringLogs
from meerschaum.jobs._Executor import Executor

__all__ = (
    'Job',
    'StopMonitoringLogs',
    'systemd',
    'get_jobs',
    'get_filtered_jobs',
    'get_restart_jobs',
    'get_running_jobs',
    'get_stopped_jobs',
    'get_paused_jobs',
    'make_executor',
    'Executor',
    'check_restart_jobs',
    'start_check_jobs_thread',
    'stop_check_jobs_thread',
)

### Known executor types; `make_executor()` appends new types as they are registered.
executor_types: List[str] = ['api', 'local']


def get_jobs(
    executor_keys: Optional[str] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of the existing jobs.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, return remote jobs on the given API instance.
        Otherwise return local jobs.

    include_hidden: bool, default False
        If `True`, include jobs with the `hidden` attribute.

    combine_local_and_systemd: bool, default True
        If `True` and the context executor is `systemd`, merge local
        and systemd jobs into a single dictionary.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping job names to jobs.
    """
    from meerschaum.connectors.parse import parse_executor_keys
    executor_keys = executor_keys or get_executor_keys_from_context()

    include_local_and_systemd = (
        combine_local_and_systemd
        and str(executor_keys).split(':', maxsplit=1)[0] in ('None', 'local', 'systemd')
        and get_executor_keys_from_context() == 'systemd'
    )

    def _get_local_jobs():
        """Return jobs backed by local daemons, excluding externally managed ones."""
        from meerschaum.utils.daemon import get_daemons
        daemons = get_daemons()
        jobs = {
            daemon.daemon_id: Job(name=daemon.daemon_id, executor_keys='local')
            for daemon in daemons
        }
        return {
            name: job
            for name, job in jobs.items()
            if (include_hidden or not job.hidden) and not job._is_externally_managed
        }

    def _get_systemd_jobs():
        """Return jobs managed by the systemd executor."""
        from meerschaum.jobs.systemd import SystemdExecutor
        conn = SystemdExecutor('systemd')
        jobs = conn.get_jobs(debug=debug)
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    if include_local_and_systemd:
        local_jobs = _get_local_jobs()
        systemd_jobs = _get_systemd_jobs()
        shared_jobs = set(local_jobs) & set(systemd_jobs)
        if shared_jobs:
            from meerschaum.utils.misc import items_str
            from meerschaum.utils.warnings import warn
            warn(
                "Job"
                + ('s' if len(shared_jobs) != 1 else '')
                + f" {items_str(list(shared_jobs))} "
                + "exist"
                + ('s' if len(shared_jobs) == 1 else '')
                + " in both `local` and `systemd`.",
                stack=False,
            )
        ### On name collisions, the systemd job wins.
        return {**local_jobs, **systemd_jobs}

    if executor_keys == 'local':
        return _get_local_jobs()

    if executor_keys == 'systemd':
        return _get_systemd_jobs()

    try:
        ### Validate the keys before constructing the connector.
        _ = parse_executor_keys(executor_keys, construct=False)
        conn = parse_executor_keys(executor_keys)
        jobs = conn.get_jobs(debug=debug)
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }
    except Exception:
        ### Best-effort: unreachable or invalid executors yield no jobs.
        return {}


def get_filtered_jobs(
    executor_keys: Optional[str] = None,
    filter_list: Optional[List[str]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    warn: bool = False,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of jobs filtered by the user.

    Names in `filter_list` select jobs by name. If no names match and every
    filter begins with an underscore, the underscored names are treated as
    exclusions instead.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, filter jobs on the given API instance.

    filter_list: Optional[List[str]], default None
        Job names to select (or `_name` entries to exclude).

    warn: bool, default False
        If `True`, emit a warning for filter names which do not exist.

    Returns
    -------
    A dictionary mapping job names to jobs.
    """
    from meerschaum.utils.warnings import warn as _warn
    jobs = get_jobs(
        executor_keys,
        include_hidden=True,
        combine_local_and_systemd=combine_local_and_systemd,
        debug=debug,
    )
    if not filter_list:
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    jobs_to_return = {}
    filter_list_with_underscores = [name for name in filter_list if name.startswith('_')]
    for name in filter_list:
        job = jobs.get(name, None)
        if job is None:
            if warn:
                _warn(
                    f"Job '{name}' does not exist.",
                    stack=False,
                )
            continue
        jobs_to_return[name] = job

    ### Underscore-prefixed names act as exclusions when nothing matched directly.
    if not jobs_to_return and filter_list_with_underscores:
        names_to_exclude = [name.lstrip('_') for name in filter_list_with_underscores]
        return {
            name: job
            for name, job in jobs.items()
            if name not in names_to_exclude
        }

    return jobs_to_return


def get_restart_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return jobs which were created with `--restart` or `--loop`.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    return {
        name: job
        for name, job in jobs.items()
        if job.restart
    }


def get_running_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of running jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    return {
        name: job
        for name, job in jobs.items()
        if job.status == 'running'
    }


def get_paused_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of paused jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    return {
        name: job
        for name, job in jobs.items()
        if job.status == 'paused'
    }


def get_stopped_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of stopped jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    return {
        name: job
        for name, job in jobs.items()
        if job.status == 'stopped'
    }


def make_executor(cls):
    """
    Register a class as an `Executor`.

    The executor type is derived from the class name (lowercased, with a
    trailing 'executor' stripped) and appended to `executor_types`.
    """
    import re
    from meerschaum.connectors import make_connector
    suffix_regex = r'executor$'
    typ = re.sub(suffix_regex, '', cls.__name__.lower())
    if typ not in executor_types:
        executor_types.append(typ)
    return make_connector(cls, _is_executor=True)


def check_restart_jobs(
    executor_keys: Optional[str] = 'local',
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = True,
    silent: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Restart any stopped jobs which were created with `--restart`.

    Parameters
    ----------
    executor_keys: Optional[str], default 'local'
        If provided, check jobs on the given remote API instance.
        Otherwise check local jobs.

    include_hidden: bool, default True
        If `True`, include hidden jobs in the check.

    silent: bool, default False
        If `True`, do not print the restart success message.

    Returns
    -------
    A `SuccessTuple` indicating whether all restart checks succeeded.
    """
    from meerschaum.utils.misc import items_str

    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=False,
            debug=debug,
        )

    if not jobs:
        return True, "No jobs to restart."

    results = {}
    for name, job in jobs.items():
        check_success, check_msg = job.check_restart()
        results[job.name] = (check_success, check_msg)
        if not silent:
            mrsm.pprint((check_success, check_msg))

    success_names = [name for name, (check_success, check_msg) in results.items() if check_success]
    fail_names = [name for name, (check_success, check_msg) in results.items() if not check_success]
    success = len(success_names) == len(jobs)
    msg = (
        (
            "Successfully restarted job"
            + ('s' if len(success_names) != 1 else '')
            + ' ' + items_str(success_names) + '.'
        )
        if success
        else (
            ### NOTE: pluralize by the number of *failed* jobs.
            "Failed to restart job"
            + ('s' if len(fail_names) != 1 else '')
            + ' ' + items_str(fail_names) + '.'
        )
    )
    return success, msg


def _check_restart_jobs_against_lock(*args, **kwargs):
    """
    Run `check_restart_jobs()` under an inter-process lock so that
    concurrent monitors do not race each other.
    """
    from meerschaum.config.paths import CHECK_JOBS_LOCK_PATH
    fasteners = mrsm.attempt_import('fasteners', lazy=False)
    lock = fasteners.InterProcessLock(CHECK_JOBS_LOCK_PATH)
    with lock:
        check_restart_jobs(*args, **kwargs)


_check_loop_stop_thread = None
def start_check_jobs_thread():
    """
    Start a daemonized timer thread which regularly monitors jobs.

    Returns
    -------
    The started `RepeatTimer` thread.
    """
    import atexit
    from functools import partial
    from meerschaum.utils.threading import RepeatTimer
    from meerschaum._internal.static import STATIC_CONFIG

    global _check_loop_stop_thread
    sleep_seconds = STATIC_CONFIG['jobs']['check_restart_seconds']

    _check_loop_stop_thread = RepeatTimer(
        sleep_seconds,
        partial(
            _check_restart_jobs_against_lock,
            silent=True,
        )
    )
    _check_loop_stop_thread.daemon = True
    atexit.register(stop_check_jobs_thread)

    _check_loop_stop_thread.start()
    return _check_loop_stop_thread


def stop_check_jobs_thread():
    """
    Stop the job monitoring thread and remove its lock file.
    """
    from meerschaum.config.paths import CHECK_JOBS_LOCK_PATH
    from meerschaum.utils.warnings import warn
    if _check_loop_stop_thread is None:
        return

    _check_loop_stop_thread.cancel()

    try:
        if CHECK_JOBS_LOCK_PATH.exists():
            CHECK_JOBS_LOCK_PATH.unlink()
    except Exception as e:
        warn(f"Failed to remove check jobs lock file:\n{e}")


_context_keys = None
def get_executor_keys_from_context() -> str:
    """
    If we are running on the host with the default root, default to `'systemd'`.
    Otherwise return `'local'`.

    A `meerschaum:executor` config value, when set, always takes precedence
    and is intentionally not cached.
    """
    global _context_keys

    if _context_keys is not None:
        return _context_keys

    from meerschaum.config import get_config
    from meerschaum.config.paths import ROOT_DIR_PATH, DEFAULT_ROOT_DIR_PATH
    from meerschaum.utils.misc import is_systemd_available

    configured_executor = get_config('meerschaum', 'executor', warn=False)
    if configured_executor is not None:
        return configured_executor

    _context_keys = (
        'systemd'
        if is_systemd_available() and ROOT_DIR_PATH == DEFAULT_ROOT_DIR_PATH
        else 'local'
    )
    return _context_keys


def _install_healthcheck_job() -> SuccessTuple:
    """
    Install the systemd job which checks local jobs.
    """
    from meerschaum.config import get_config

    enable_healthcheck = get_config('system', 'experimental', 'systemd_healthcheck')
    if not enable_healthcheck:
        return False, "Local healthcheck is disabled."

    if get_executor_keys_from_context() != 'systemd':
        return False, "Not running systemd."

    job = Job(
        '.local-healthcheck',
        ['restart', 'jobs', '-e', 'local', '--loop', '--min-seconds', '60'],
        executor_keys='systemd',
    )
    return job.start()
class Job:
    """
    Manage a `meerschaum.utils.daemon.Daemon`, locally or remotely via the API.
    """

    def __init__(
        self,
        name: str,
        sysargs: Union[List[str], str, None] = None,
        env: Optional[Dict[str, str]] = None,
        executor_keys: Optional[str] = None,
        delete_after_completion: bool = False,
        refresh_seconds: Union[int, float, None] = None,
        _properties: Optional[Dict[str, Any]] = None,
        _rotating_log=None,
        _stdin_file=None,
        _status_hook: Optional[Callable[[], str]] = None,
        _result_hook: Optional[Callable[[], SuccessTuple]] = None,
        _externally_managed: bool = False,
    ):
        """
        Create a new job to manage a `meerschaum.utils.daemon.Daemon`.

        Parameters
        ----------
        name: str
            The name of the job to be created.
            This will also be used as the Daemon ID.

        sysargs: Union[List[str], str, None], default None
            The sysargs of the command to be executed, e.g. 'start api'.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the job's process.

        executor_keys: Optional[str], default None
            If provided, execute the job remotely on an API instance, e.g. 'api:main'.

        delete_after_completion: bool, default False
            If `True`, delete this job when it has finished executing.

        refresh_seconds: Union[int, float, None], default None
            The number of seconds to sleep between refreshes.
            Defaults to the configured value `system.cli.refresh_seconds`.

        _properties: Optional[Dict[str, Any]], default None
            If provided, use this to patch the daemon's properties.

        Raises
        ------
        ValueError
            If `name` contains a banned character.
        """
        from meerschaum.utils.daemon import Daemon
        for char in BANNED_CHARS:
            if char in name:
                raise ValueError(f"Invalid name: ({char}) is not allowed.")

        if isinstance(sysargs, str):
            sysargs = shlex.split(sysargs)

        ### Unescape the '+' chaining token if it was escaped for transport.
        and_key = STATIC_CONFIG['system']['arguments']['and_key']
        escaped_and_key = STATIC_CONFIG['system']['arguments']['escaped_and_key']
        if sysargs:
            sysargs = [
                (arg if arg != escaped_and_key else and_key)
                for arg in sysargs
            ]

        ### NOTE: 'local' and 'systemd' executors are being coalesced.
        if executor_keys is None:
            from meerschaum.jobs import get_executor_keys_from_context
            executor_keys = get_executor_keys_from_context()

        self.executor_keys = executor_keys
        self.name = name
        self.refresh_seconds = (
            refresh_seconds
            if refresh_seconds is not None
            else mrsm.get_config('system', 'cli', 'refresh_seconds')
        )
        try:
            ### Only local jobs bind to an existing daemon eagerly.
            self._daemon = (
                Daemon(daemon_id=name)
                if executor_keys == 'local'
                else None
            )
        except Exception:
            self._daemon = None

        ### Handle any injected dependencies.
        if _rotating_log is not None:
            self._rotating_log = _rotating_log
            if self._daemon is not None:
                self._daemon._rotating_log = _rotating_log

        if _stdin_file is not None:
            self._stdin_file = _stdin_file
            if self._daemon is not None:
                self._daemon._stdin_file = _stdin_file
                self._daemon._blocking_stdin_file_path = _stdin_file.blocking_file_path

        if _status_hook is not None:
            self._status_hook = _status_hook

        if _result_hook is not None:
            self._result_hook = _result_hook

        self._externally_managed = _externally_managed
        self._properties_patch = _properties or {}
        if _externally_managed:
            self._properties_patch.update({'externally_managed': _externally_managed})

        if env:
            self._properties_patch.update({'env': env})

        if delete_after_completion:
            self._properties_patch.update({'delete_after_completion': delete_after_completion})

        ### An existing daemon's recorded sysargs take precedence over the given ones.
        daemon_sysargs = (
            self._daemon.properties.get('target', {}).get('args', [None])[0]
            if self._daemon is not None
            else None
        )

        if daemon_sysargs and sysargs and daemon_sysargs != sysargs:
            warn("Given sysargs differ from existing sysargs.")

        ### Strip daemonization flags; the Daemon wrapper handles backgrounding.
        self._sysargs = [
            arg
            for arg in (daemon_sysargs or sysargs or [])
            if arg not in ('-d', '--daemon')
        ]
        for restart_flag in RESTART_FLAGS:
            if restart_flag in self._sysargs:
                self._properties_patch.update({'restart': True})
                break

    @staticmethod
    def from_pid(pid: int, executor_keys: Optional[str] = None) -> Job:
        """
        Build a `Job` from the PID of a running Meerschaum process.

        Parameters
        ----------
        pid: int
            The PID of the process.

        executor_keys: Optional[str], default None
            The executor keys to assign to the job.

        Raises
        ------
        psutil.NoSuchProcess
            If no process with the given PID exists.
        EnvironmentError
            If the daemon's recorded PID disagrees with `pid` or is missing.
        """
        from meerschaum.config.paths import DAEMON_RESOURCES_PATH

        psutil = mrsm.attempt_import('psutil')
        try:
            process = psutil.Process(pid)
        except psutil.NoSuchProcess as e:
            warn(f"Process with PID {pid} does not exist.", stack=False)
            raise e

        command_args = process.cmdline()
        ### Guard against short cmdlines before probing for the daemon marker.
        is_daemon = len(command_args) > 1 and command_args[1] == '-c'

        if is_daemon:
            daemon_id = command_args[-1].split('daemon_id=')[-1].split(')')[0].replace("'", '')
            root_dir = process.environ().get(STATIC_CONFIG['environment']['root'], None)
            if root_dir is None:
                from meerschaum.config.paths import ROOT_DIR_PATH
                root_dir = ROOT_DIR_PATH
            else:
                root_dir = pathlib.Path(root_dir)
            jobs_dir = root_dir / DAEMON_RESOURCES_PATH.name
            daemon_dir = jobs_dir / daemon_id
            pid_file = daemon_dir / 'process.pid'

            if pid_file.exists():
                with open(pid_file, 'r', encoding='utf-8') as f:
                    daemon_pid = int(f.read())

                if pid != daemon_pid:
                    raise EnvironmentError(f"Differing PIDs: {pid=}, {daemon_pid=}")
            else:
                raise EnvironmentError(f"Is job '{daemon_id}' running?")

            return Job(daemon_id, executor_keys=executor_keys)

        from meerschaum._internal.arguments._parse_arguments import parse_arguments
        from meerschaum.utils.daemon import get_new_daemon_name

        ### Locate the `mrsm` entry point in the cmdline; sysargs follow it.
        mrsm_ix = 0
        for i, arg in enumerate(command_args):
            if 'mrsm' in arg or 'meerschaum' in arg.lower():
                mrsm_ix = i
                break

        sysargs = command_args[mrsm_ix+1:]
        kwargs = parse_arguments(sysargs)
        name = kwargs.get('name', get_new_daemon_name())
        return Job(name, sysargs, executor_keys=executor_keys)

    def start(self, debug: bool = False) -> SuccessTuple:
        """
        Start the job's daemon.

        Remote jobs are created on first start; local jobs run via the Daemon.
        """
        if self.executor is not None:
            if not self.exists(debug=debug):
                return self.executor.create_job(
                    self.name,
                    self.sysargs,
                    properties=self.daemon.properties,
                    debug=debug,
                )
            return self.executor.start_job(self.name, debug=debug)

        if self.is_running():
            return True, f"{self} is already running."

        success, msg = self.daemon.run(
            keep_daemon_output=(not self.delete_after_completion),
            allow_dirty_run=True,
        )
        if not success:
            return success, msg

        return success, f"Started {self}."

    def stop(
        self,
        timeout_seconds: Union[int, float, None] = None,
        debug: bool = False,
    ) -> SuccessTuple:
        """
        Stop the job's daemon.

        Attempts a graceful quit first and falls back to killing the process.
        """
        if self.executor is not None:
            return self.executor.stop_job(self.name, debug=debug)

        if self.daemon.status == 'stopped':
            if not self.restart:
                return True, f"{self} is not running."
            elif self.stop_time is not None:
                return True, f"{self} will not restart until manually started."

        quit_success, quit_msg = self.daemon.quit(timeout=timeout_seconds)
        if quit_success:
            return quit_success, f"Stopped {self}."

        warn(
            f"Failed to gracefully quit {self}.",
            stack=False,
        )
        kill_success, kill_msg = self.daemon.kill(timeout=timeout_seconds)
        if not kill_success:
            return kill_success, kill_msg

        return kill_success, f"Killed {self}."

    def pause(
        self,
        timeout_seconds: Union[int, float, None] = None,
        debug: bool = False,
    ) -> SuccessTuple:
        """
        Pause the job's daemon.
        """
        if self.executor is not None:
            return self.executor.pause_job(self.name, debug=debug)

        pause_success, pause_msg = self.daemon.pause(timeout=timeout_seconds)
        if not pause_success:
            return pause_success, pause_msg

        return pause_success, f"Paused {self}."
    def delete(self, debug: bool = False) -> SuccessTuple:
        """
        Delete the job and its daemon.

        Stops the job first if it is running, then cleans up the daemon's files.
        """
        if self.executor is not None:
            return self.executor.delete_job(self.name, debug=debug)

        if self.is_running():
            stop_success, stop_msg = self.stop()
            if not stop_success:
                return stop_success, stop_msg

        cleanup_success, cleanup_msg = self.daemon.cleanup()
        if not cleanup_success:
            return cleanup_success, cleanup_msg

        ### Drop any cached result so a future job with the same name starts clean.
        _ = self.daemon._properties.pop('result', None)
        return cleanup_success, f"Deleted {self}."

    def is_running(self) -> bool:
        """
        Determine whether the job's daemon is running.
        """
        return self.status == 'running'

    def exists(self, debug: bool = False) -> bool:
        """
        Determine whether the job exists.
        """
        if self.executor is not None:
            return self.executor.get_job_exists(self.name, debug=debug)

        return self.daemon.path.exists()

    def get_logs(self) -> Union[str, None]:
        """
        Return the output text of the job's daemon.
        """
        if self.executor is not None:
            return self.executor.get_logs(self.name)

        return self.daemon.log_text

    def monitor_logs(
        self,
        callback_function: Callable[[str], None] = _default_stdout_callback,
        input_callback_function: Optional[Callable[[], str]] = None,
        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
        stop_event: Optional[asyncio.Event] = None,
        stop_on_exit: bool = False,
        strip_timestamps: bool = False,
        accept_input: bool = True,
        debug: bool = False,
        _logs_path: Optional[pathlib.Path] = None,
        _log=None,
        _stdin_file=None,
        _wait_if_stopped: bool = True,
    ):
        """
        Monitor the job's log files and execute a callback on new lines.

        Synchronous wrapper around `monitor_logs_async()` for local jobs;
        delegates directly to the executor for remote jobs.

        Parameters
        ----------
        callback_function: Callable[[str], None], default partial(print, end='')
            The callback to execute as new data comes in.
            Defaults to printing the output directly to `stdout`.

        input_callback_function: Optional[Callable[[], str]], default None
            If provided, execute this callback when the daemon is blocking on stdin.
            Defaults to `sys.stdin.readline()`.

        stop_callback_function: Optional[Callable[[SuccessTuple], None]], default None
            If provided, execute this callback when the daemon stops.
            The job's SuccessTuple will be passed to the callback.

        stop_event: Optional[asyncio.Event], default None
            If provided, stop monitoring when this event is set.
            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
            from within `callback_function` to stop monitoring.

        stop_on_exit: bool, default False
            If `True`, stop monitoring when the job stops.

        strip_timestamps: bool, default False
            If `True`, remove leading timestamps from lines.

        accept_input: bool, default True
            If `True`, accept input when the daemon blocks on stdin.
        """
        if self.executor is not None:
            self.executor.monitor_logs(
                self.name,
                callback_function,
                input_callback_function=input_callback_function,
                stop_callback_function=stop_callback_function,
                stop_on_exit=stop_on_exit,
                accept_input=accept_input,
                strip_timestamps=strip_timestamps,
                debug=debug,
            )
            return

        monitor_logs_coroutine = self.monitor_logs_async(
            callback_function=callback_function,
            input_callback_function=input_callback_function,
            stop_callback_function=stop_callback_function,
            stop_event=stop_event,
            stop_on_exit=stop_on_exit,
            strip_timestamps=strip_timestamps,
            accept_input=accept_input,
            debug=debug,
            _logs_path=_logs_path,
            _log=_log,
            _stdin_file=_stdin_file,
            _wait_if_stopped=_wait_if_stopped,
        )
        return asyncio.run(monitor_logs_coroutine)

    async def monitor_logs_async(
        self,
        callback_function: Callable[[str], None] = _default_stdout_callback,
        input_callback_function: Optional[Callable[[], str]] = None,
        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
        stop_event: Optional[asyncio.Event] = None,
        stop_on_exit: bool = False,
        strip_timestamps: bool = False,
        accept_input: bool = True,
        debug: bool = False,
        _logs_path: Optional[pathlib.Path] = None,
        _log=None,
        _stdin_file=None,
        _wait_if_stopped: bool = True,
    ):
        """
        Monitor the job's log files and await a callback on new lines.

        Parameters
        ----------
        callback_function: Callable[[str], None], default _default_stdout_callback
            The callback to execute as new data comes in.
            Defaults to printing the output directly to `stdout`.

        input_callback_function: Optional[Callable[[], str]], default None
            If provided, execute this callback when the daemon is blocking on stdin.
            Defaults to `sys.stdin.readline()`.

        stop_callback_function: Optional[Callable[[SuccessTuple], None]], default None
            If provided, execute this callback when the daemon stops.
            The job's SuccessTuple will be passed to the callback.

        stop_event: Optional[asyncio.Event], default None
            If provided, stop monitoring when this event is set.
            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
            from within `callback_function` to stop monitoring.

        stop_on_exit: bool, default False
            If `True`, stop monitoring when the job stops.

        strip_timestamps: bool, default False
            If `True`, remove leading timestamps from lines.

        accept_input: bool, default True
            If `True`, accept input when the daemon blocks on stdin.
        """
        from meerschaum.utils.prompt import prompt

        def default_input_callback_function():
            # Prefer the daemon's recorded prompt kwargs; fall back to raw stdin.
            prompt_kwargs = self.get_prompt_kwargs(debug=debug)
            if prompt_kwargs:
                answer = prompt(**prompt_kwargs)
                return answer + '\n'
            return sys.stdin.readline()

        if input_callback_function is None:
            input_callback_function = default_input_callback_function

        if self.executor is not None:
            await self.executor.monitor_logs_async(
                self.name,
                callback_function,
                input_callback_function=input_callback_function,
                stop_callback_function=stop_callback_function,
                stop_on_exit=stop_on_exit,
                strip_timestamps=strip_timestamps,
                accept_input=accept_input,
                debug=debug,
            )
            return

        from meerschaum.utils.formatting._jobs import strip_timestamp_from_line

        ### Any one of these events firing ends the monitoring loop.
        events = {
            'user': stop_event,
            'stopped': asyncio.Event(),
            'stop_token': asyncio.Event(),
            'stop_exception': asyncio.Event(),
            'stopped_timeout': asyncio.Event(),
        }
        combined_event = asyncio.Event()
        emitted_text = False
        stdin_file = _stdin_file if _stdin_file is not None else self.daemon.stdin_file

        async def check_job_status():
            # Watch for the job stopping; fire the 'stopped' event when it does.
            if not stop_on_exit:
                return

            nonlocal emitted_text

            # Back off slowly (x1.1) until sleep_time crosses 0.2s, giving a
            # just-started job a grace period before declaring it stopped.
            sleep_time = 0.1
            while sleep_time < 0.2:
                if self.status == 'stopped':
                    if not emitted_text and _wait_if_stopped:
                        await asyncio.sleep(sleep_time)
                        sleep_time = round(sleep_time * 1.1, 3)
                        continue

                    if stop_callback_function is not None:
                        try:
                            if asyncio.iscoroutinefunction(stop_callback_function):
                                await stop_callback_function(self.result)
                            else:
                                stop_callback_function(self.result)
                        except asyncio.exceptions.CancelledError:
                            break
                        except Exception:
                            warn(traceback.format_exc())

                    if stop_on_exit:
                        events['stopped'].set()

                    break
                await asyncio.sleep(0.1)

            events['stopped_timeout'].set()

        async def check_blocking_on_input():
            # Poll for the daemon blocking on stdin and forward user input to it.
            while True:
                if not emitted_text or not self.is_blocking_on_stdin():
                    try:
                        await asyncio.sleep(self.refresh_seconds)
                    except asyncio.exceptions.CancelledError:
                        break
                    continue

                if not self.is_running():
                    break

                await emit_latest_lines()

                try:
                    print('', end='', flush=True)
                    if asyncio.iscoroutinefunction(input_callback_function):
                        data = await input_callback_function()
                    else:
                        # Blocking reads run in the default executor to avoid
                        # stalling the event loop.
                        loop = asyncio.get_running_loop()
                        data = await loop.run_in_executor(None, input_callback_function)
                except KeyboardInterrupt:
                    break
                # if not data.endswith('\n'):
                #     data += '\n'

                stdin_file.write(data)
                await asyncio.sleep(self.refresh_seconds)

        async def combine_events():
            # Collapse all stop conditions into the single `combined_event`.
            event_tasks = [
                asyncio.create_task(event.wait())
                for event in events.values()
                if event is not None
            ]
            if not event_tasks:
                return

            try:
                done, pending = await asyncio.wait(
                    event_tasks,
                    return_when=asyncio.FIRST_COMPLETED,
                )
                for task in pending:
                    task.cancel()
            except asyncio.exceptions.CancelledError:
                pass
            finally:
                combined_event.set()

        check_job_status_task = asyncio.create_task(check_job_status())
        check_blocking_on_input_task = asyncio.create_task(check_blocking_on_input())
        combine_events_task = asyncio.create_task(combine_events())

        log = _log if _log is not None else self.daemon.rotating_log
        lines_to_show = (
            self.daemon.properties.get(
                'logs', {}
            ).get(
                'lines_to_show', get_config('jobs', 'logs', 'lines_to_show')
            )
        )

        async def emit_latest_lines():
            # Feed the newest log lines through the callback, honoring the
            # STOP/CLEAR/FLUSH control tokens embedded in the log stream.
            nonlocal emitted_text
            nonlocal stop_event
            lines = log.readlines()
            for line in lines[(-1 * lines_to_show):]:
                if stop_event is not None and stop_event.is_set():
                    return

                line_stripped_extra = strip_timestamp_from_line(line.strip())
                line_stripped = strip_timestamp_from_line(line)

                if line_stripped_extra == STOP_TOKEN:
                    events['stop_token'].set()
                    return

                if line_stripped_extra == CLEAR_TOKEN:
                    clear_screen(debug=debug)
                    continue

                if line_stripped_extra == FLUSH_TOKEN.strip():
                    line_stripped = ''
                    line = ''

                if strip_timestamps:
                    line = line_stripped

                try:
                    if asyncio.iscoroutinefunction(callback_function):
                        await callback_function(line)
                    else:
                        callback_function(line)
                    emitted_text = True
                except StopMonitoringLogs:
                    events['stop_exception'].set()
                    return
                except Exception:
                    warn(f"Error in logs callback:\n{traceback.format_exc()}")

        await emit_latest_lines()

        tasks = (
            [check_job_status_task]
            + ([check_blocking_on_input_task] if accept_input else [])
            + [combine_events_task]
        )
        try:
            # NOTE(review): gather() is not awaited here — the background tasks
            # run concurrently with the watch loop below; confirm intentional.
            _ = asyncio.gather(*tasks, return_exceptions=True)
        except asyncio.exceptions.CancelledError:
            raise
        except Exception:
            warn(f"Failed to run async checks:\n{traceback.format_exc()}")

        watchfiles = mrsm.attempt_import('watchfiles', lazy=False)
        dir_path_to_monitor = (
            _logs_path
            or (log.file_path.parent if log else None)
            or LOGS_RESOURCES_PATH
        )
        ### Watch the logs directory until any stop condition fires.
        async for changes in watchfiles.awatch(
            dir_path_to_monitor,
            stop_event=combined_event,
        ):
            for change in changes:
                file_path_str = change[1]
                file_path = pathlib.Path(file_path_str)
                latest_subfile_path = log.get_latest_subfile_path()
                # Only react to changes in the active log subfile.
                if latest_subfile_path != file_path:
                    continue

                await emit_latest_lines()

        # Final drain after the watch loop exits.
        await emit_latest_lines()

    def is_blocking_on_stdin(self, debug: bool = False) -> bool:
        """
        Return whether a job's daemon is blocking on stdin.
        """
        if self.executor is not None:
            return self.executor.get_job_is_blocking_on_stdin(self.name, debug=debug)

        return self.is_running() and self.daemon.blocking_stdin_file_path.exists()

    def get_prompt_kwargs(self, debug: bool = False) -> Dict[str, Any]:
        """
        Return the kwargs to the blocking `prompt()`, if available.
        """
        if self.executor is not None:
            return self.executor.get_job_prompt_kwargs(self.name, debug=debug)

        if not self.daemon.prompt_kwargs_file_path.exists():
            return {}

        try:
            with open(self.daemon.prompt_kwargs_file_path, 'r', encoding='utf-8') as f:
                prompt_kwargs = json.load(f)

            return prompt_kwargs

        except Exception:
            # Best-effort: a malformed prompt-kwargs file yields no kwargs.
            import traceback
            traceback.print_exc()
            return {}

    def write_stdin(self, data):
        """
        Write to a job's daemon's `stdin`.
        """
        self.daemon.stdin_file.write(data)

    @property
    def executor(self) -> Union[Executor, None]:
        """
        If the job is remote, return the connector to the remote API instance.
        """
        return (
            mrsm.get_connector(self.executor_keys)
            if self.executor_keys != 'local'
            else None
        )

    @property
    def status(self) -> str:
        """
        Return the running status of the job's daemon.
        """
        ### An injected status hook takes precedence over executor and daemon.
        if '_status_hook' in self.__dict__:
            return self._status_hook()

        if self.executor is not None:
            return self.executor.get_job_status(self.name)

        return self.daemon.status

    @property
    def pid(self) -> Union[int, None]:
        """
        Return the PID of the job's daemon.
        """
        if self.executor is not None:
            return self.executor.get_job_metadata(self.name).get('daemon', {}).get('pid', None)

        return self.daemon.pid

    @property
    def restart(self) -> bool:
        """
        Return whether to restart a stopped job.
        """
        if self.executor is not None:
            return self.executor.get_job_metadata(self.name).get('restart', False)

        return self.daemon.properties.get('restart', False)

    @property
    def result(self) -> SuccessTuple:
        """
        Return the `SuccessTuple` when the job has terminated.
        """
        if self.is_running():
            return True, f"{self} is running."

        if '_result_hook' in self.__dict__:
            return self._result_hook()

        if self.executor is not None:
            return (
                self.executor.get_job_metadata(self.name)
                .get('result', (False, "No result available."))
            )

        _result = self.daemon.properties.get('result', None)
        if _result is None:
            # Fall back to the in-memory results cache for freshly-finished daemons.
            from meerschaum.utils.daemon.Daemon import _results
            return _results.get(self.daemon.daemon_id, (False, "No result available."))

        return tuple(_result)

    @property
    def sysargs(self) -> List[str]:
        """
        Return the sysargs to use for the Daemon.
        """
        if self._sysargs:
            return self._sysargs

        if self.executor is not None:
            return self.executor.get_job_metadata(self.name).get('sysargs', [])

        target_args = self.daemon.target_args
        if target_args is None:
            return []
        self._sysargs = target_args[0] if len(target_args) > 0 else []
        return self._sysargs

    def get_daemon_properties(self) -> Dict[str, Any]:
        """
        Return the `properties` dictionary for the job's daemon.

        Local patches take precedence over remote properties.
        """
        remote_properties = (
            {}
            if self.executor is None
            else self.executor.get_job_properties(self.name)
        )
        return {
            **remote_properties,
            **self._properties_patch
        }

    @property
    def daemon(self) -> 'Daemon':
        """
        Return the daemon which this job manages.

        Lazily constructs (and caches) a Daemon when one is not already bound.
        """
        from meerschaum.utils.daemon import Daemon
        if self._daemon is not None and self.executor is None and self._sysargs:
            return self._daemon

        self._daemon = Daemon(
            target=entry,
            target_args=[self._sysargs],
            target_kw={},
            daemon_id=self.name,
            label=shlex.join(self._sysargs),
            properties=self.get_daemon_properties(),
        )
        ### Re-apply any injected dependencies to the fresh daemon.
        if '_rotating_log' in self.__dict__:
            self._daemon._rotating_log = self._rotating_log

        if '_stdin_file' in self.__dict__:
            self._daemon._stdin_file = self._stdin_file
            self._daemon._blocking_stdin_file_path = self._stdin_file.blocking_file_path

        return self._daemon

    @property
    def began(self) -> Union[datetime, None]:
        """
        The datetime when the job began running.
        """
        if self.executor is not None:
            began_str = self.executor.get_job_began(self.name)
            if began_str is None:
                return None
            # Remote timestamps are normalized to naive UTC.
            return (
                datetime.fromisoformat(began_str)
                .astimezone(timezone.utc)
                .replace(tzinfo=None)
            )

        began_str = self.daemon.properties.get('process', {}).get('began', None)
        if began_str is None:
            return None

        return datetime.fromisoformat(began_str)

    @property
    def ended(self) -> Union[datetime, None]:
        """
        The datetime when the job stopped running.
        """
        if self.executor is not None:
            ended_str = self.executor.get_job_ended(self.name)
            if ended_str is None:
                return None
            # Remote timestamps are normalized to naive UTC.
            return (
                datetime.fromisoformat(ended_str)
                .astimezone(timezone.utc)
                .replace(tzinfo=None)
            )

        ended_str = self.daemon.properties.get('process', {}).get('ended', None)
        if ended_str is None:
            return None

        return datetime.fromisoformat(ended_str)

    @property
    def paused(self) -> Union[datetime, None]:
        """
        The datetime when the job was suspended while running.
917 """ 918 if self.executor is not None: 919 paused_str = self.executor.get_job_paused(self.name) 920 if paused_str is None: 921 return None 922 return ( 923 datetime.fromisoformat(paused_str) 924 .astimezone(timezone.utc) 925 .replace(tzinfo=None) 926 ) 927 928 paused_str = self.daemon.properties.get('process', {}).get('paused', None) 929 if paused_str is None: 930 return None 931 932 return datetime.fromisoformat(paused_str) 933 934 @property 935 def stop_time(self) -> Union[datetime, None]: 936 """ 937 Return the timestamp when the job was manually stopped. 938 """ 939 if self.executor is not None: 940 return self.executor.get_job_stop_time(self.name) 941 942 if not self.daemon.stop_path.exists(): 943 return None 944 945 stop_data = self.daemon._read_stop_file() 946 if not stop_data: 947 return None 948 949 stop_time_str = stop_data.get('stop_time', None) 950 if not stop_time_str: 951 warn(f"Could not read stop time for {self}.") 952 return None 953 954 return datetime.fromisoformat(stop_time_str) 955 956 @property 957 def hidden(self) -> bool: 958 """ 959 Return a bool indicating whether this job should be displayed. 960 """ 961 return ( 962 self.name.startswith('_') 963 or self.name.startswith('.') 964 or self._is_externally_managed 965 ) 966 967 def check_restart(self) -> SuccessTuple: 968 """ 969 If `restart` is `True` and the daemon is not running, 970 restart the job. 971 Do not restart if the job was manually stopped. 972 """ 973 if self.is_running(): 974 return True, f"{self} is running." 975 976 if not self.restart: 977 return True, f"{self} does not need to be restarted." 978 979 if self.stop_time is not None: 980 return True, f"{self} was manually stopped." 981 982 return self.start() 983 984 @property 985 def label(self) -> str: 986 """ 987 Return the job's Daemon label (joined sysargs). 
988 """ 989 from meerschaum._internal.arguments import compress_pipeline_sysargs 990 sysargs = compress_pipeline_sysargs(self.sysargs) 991 return shlex.join(sysargs).replace(' + ', '\n+ ').replace(' : ', '\n: ').lstrip().rstrip() 992 993 @property 994 def _externally_managed_file(self) -> pathlib.Path: 995 """ 996 Return the path to the externally managed file. 997 """ 998 return self.daemon.path / '.externally-managed' 999 1000 def _set_externally_managed(self): 1001 """ 1002 Set this job as externally managed. 1003 """ 1004 self._externally_managed = True 1005 try: 1006 self._externally_managed_file.parent.mkdir(exist_ok=True, parents=True) 1007 self._externally_managed_file.touch() 1008 except Exception as e: 1009 warn(e) 1010 1011 @property 1012 def _is_externally_managed(self) -> bool: 1013 """ 1014 Return whether this job is externally managed. 1015 """ 1016 return self.executor_keys in (None, 'local') and ( 1017 self._externally_managed or self._externally_managed_file.exists() 1018 ) 1019 1020 @property 1021 def env(self) -> Dict[str, str]: 1022 """ 1023 Return the environment variables to set for the job's process. 1024 """ 1025 if '_env' in self.__dict__: 1026 return self.__dict__['_env'] 1027 1028 _env = self.daemon.properties.get('env', {}) 1029 default_env = { 1030 'PYTHONUNBUFFERED': '1', 1031 'LINES': str(get_config('jobs', 'terminal', 'lines')), 1032 'COLUMNS': str(get_config('jobs', 'terminal', 'columns')), 1033 STATIC_CONFIG['environment']['noninteractive']: 'true', 1034 } 1035 self._env = {**default_env, **_env} 1036 return self._env 1037 1038 @property 1039 def delete_after_completion(self) -> bool: 1040 """ 1041 Return whether this job is configured to delete itself after completion. 
1042 """ 1043 if '_delete_after_completion' in self.__dict__: 1044 return self.__dict__.get('_delete_after_completion', False) 1045 1046 self._delete_after_completion = self.daemon.properties.get('delete_after_completion', False) 1047 return self._delete_after_completion 1048 1049 def __str__(self) -> str: 1050 sysargs = self.sysargs 1051 sysargs_str = shlex.join(sysargs) if sysargs else '' 1052 job_str = f'Job("{self.name}"' 1053 if sysargs_str: 1054 job_str += f', "{sysargs_str}"' 1055 1056 job_str += ')' 1057 return job_str 1058 1059 def __repr__(self) -> str: 1060 return str(self) 1061 1062 def __hash__(self) -> int: 1063 return hash(self.name)
Manage a meerschaum.utils.daemon.Daemon
, locally or remotely via the API.
75 def __init__( 76 self, 77 name: str, 78 sysargs: Union[List[str], str, None] = None, 79 env: Optional[Dict[str, str]] = None, 80 executor_keys: Optional[str] = None, 81 delete_after_completion: bool = False, 82 refresh_seconds: Union[int, float, None] = None, 83 _properties: Optional[Dict[str, Any]] = None, 84 _rotating_log=None, 85 _stdin_file=None, 86 _status_hook: Optional[Callable[[], str]] = None, 87 _result_hook: Optional[Callable[[], SuccessTuple]] = None, 88 _externally_managed: bool = False, 89 ): 90 """ 91 Create a new job to manage a `meerschaum.utils.daemon.Daemon`. 92 93 Parameters 94 ---------- 95 name: str 96 The name of the job to be created. 97 This will also be used as the Daemon ID. 98 99 sysargs: Union[List[str], str, None], default None 100 The sysargs of the command to be executed, e.g. 'start api'. 101 102 env: Optional[Dict[str, str]], default None 103 If provided, set these environment variables in the job's process. 104 105 executor_keys: Optional[str], default None 106 If provided, execute the job remotely on an API instance, e.g. 'api:main'. 107 108 delete_after_completion: bool, default False 109 If `True`, delete this job when it has finished executing. 110 111 refresh_seconds: Union[int, float, None], default None 112 The number of seconds to sleep between refreshes. 113 Defaults to the configured value `system.cli.refresh_seconds`. 114 115 _properties: Optional[Dict[str, Any]], default None 116 If provided, use this to patch the daemon's properties. 
117 """ 118 from meerschaum.utils.daemon import Daemon 119 for char in BANNED_CHARS: 120 if char in name: 121 raise ValueError(f"Invalid name: ({char}) is not allowed.") 122 123 if isinstance(sysargs, str): 124 sysargs = shlex.split(sysargs) 125 126 and_key = STATIC_CONFIG['system']['arguments']['and_key'] 127 escaped_and_key = STATIC_CONFIG['system']['arguments']['escaped_and_key'] 128 if sysargs: 129 sysargs = [ 130 (arg if arg != escaped_and_key else and_key) 131 for arg in sysargs 132 ] 133 134 ### NOTE: 'local' and 'systemd' executors are being coalesced. 135 if executor_keys is None: 136 from meerschaum.jobs import get_executor_keys_from_context 137 executor_keys = get_executor_keys_from_context() 138 139 self.executor_keys = executor_keys 140 self.name = name 141 self.refresh_seconds = ( 142 refresh_seconds 143 if refresh_seconds is not None 144 else mrsm.get_config('system', 'cli', 'refresh_seconds') 145 ) 146 try: 147 self._daemon = ( 148 Daemon(daemon_id=name) 149 if executor_keys == 'local' 150 else None 151 ) 152 except Exception: 153 self._daemon = None 154 155 ### Handle any injected dependencies. 
156 if _rotating_log is not None: 157 self._rotating_log = _rotating_log 158 if self._daemon is not None: 159 self._daemon._rotating_log = _rotating_log 160 161 if _stdin_file is not None: 162 self._stdin_file = _stdin_file 163 if self._daemon is not None: 164 self._daemon._stdin_file = _stdin_file 165 self._daemon._blocking_stdin_file_path = _stdin_file.blocking_file_path 166 167 if _status_hook is not None: 168 self._status_hook = _status_hook 169 170 if _result_hook is not None: 171 self._result_hook = _result_hook 172 173 self._externally_managed = _externally_managed 174 self._properties_patch = _properties or {} 175 if _externally_managed: 176 self._properties_patch.update({'externally_managed': _externally_managed}) 177 178 if env: 179 self._properties_patch.update({'env': env}) 180 181 if delete_after_completion: 182 self._properties_patch.update({'delete_after_completion': delete_after_completion}) 183 184 daemon_sysargs = ( 185 self._daemon.properties.get('target', {}).get('args', [None])[0] 186 if self._daemon is not None 187 else None 188 ) 189 190 if daemon_sysargs and sysargs and daemon_sysargs != sysargs: 191 warn("Given sysargs differ from existing sysargs.") 192 193 self._sysargs = [ 194 arg 195 for arg in (daemon_sysargs or sysargs or []) 196 if arg not in ('-d', '--daemon') 197 ] 198 for restart_flag in RESTART_FLAGS: 199 if restart_flag in self._sysargs: 200 self._properties_patch.update({'restart': True}) 201 break
Create a new job to manage a meerschaum.utils.daemon.Daemon
.
Parameters
- name (str): The name of the job to be created. This will also be used as the Daemon ID.
- sysargs (Union[List[str], str, None], default None): The sysargs of the command to be executed, e.g. 'start api'.
- env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the job's process.
- executor_keys (Optional[str], default None): If provided, execute the job remotely on an API instance, e.g. 'api:main'.
- delete_after_completion (bool, default False):
If
True
, delete this job when it has finished executing. - refresh_seconds (Union[int, float, None], default None):
The number of seconds to sleep between refreshes.
Defaults to the configured value
system.cli.refresh_seconds
. - _properties (Optional[Dict[str, Any]], default None): If provided, use this to patch the daemon's properties.
203 @staticmethod 204 def from_pid(pid: int, executor_keys: Optional[str] = None) -> Job: 205 """ 206 Build a `Job` from the PID of a running Meerschaum process. 207 208 Parameters 209 ---------- 210 pid: int 211 The PID of the process. 212 213 executor_keys: Optional[str], default None 214 The executor keys to assign to the job. 215 """ 216 from meerschaum.config.paths import DAEMON_RESOURCES_PATH 217 218 psutil = mrsm.attempt_import('psutil') 219 try: 220 process = psutil.Process(pid) 221 except psutil.NoSuchProcess as e: 222 warn(f"Process with PID {pid} does not exist.", stack=False) 223 raise e 224 225 command_args = process.cmdline() 226 is_daemon = command_args[1] == '-c' 227 228 if is_daemon: 229 daemon_id = command_args[-1].split('daemon_id=')[-1].split(')')[0].replace("'", '') 230 root_dir = process.environ().get(STATIC_CONFIG['environment']['root'], None) 231 if root_dir is None: 232 from meerschaum.config.paths import ROOT_DIR_PATH 233 root_dir = ROOT_DIR_PATH 234 else: 235 root_dir = pathlib.Path(root_dir) 236 jobs_dir = root_dir / DAEMON_RESOURCES_PATH.name 237 daemon_dir = jobs_dir / daemon_id 238 pid_file = daemon_dir / 'process.pid' 239 240 if pid_file.exists(): 241 with open(pid_file, 'r', encoding='utf-8') as f: 242 daemon_pid = int(f.read()) 243 244 if pid != daemon_pid: 245 raise EnvironmentError(f"Differing PIDs: {pid=}, {daemon_pid=}") 246 else: 247 raise EnvironmentError(f"Is job '{daemon_id}' running?") 248 249 return Job(daemon_id, executor_keys=executor_keys) 250 251 from meerschaum._internal.arguments._parse_arguments import parse_arguments 252 from meerschaum.utils.daemon import get_new_daemon_name 253 254 mrsm_ix = 0 255 for i, arg in enumerate(command_args): 256 if 'mrsm' in arg or 'meerschaum' in arg.lower(): 257 mrsm_ix = i 258 break 259 260 sysargs = command_args[mrsm_ix+1:] 261 kwargs = parse_arguments(sysargs) 262 name = kwargs.get('name', get_new_daemon_name()) 263 return Job(name, sysargs, executor_keys=executor_keys)
Build a Job
from the PID of a running Meerschaum process.
Parameters
- pid (int): The PID of the process.
- executor_keys (Optional[str], default None): The executor keys to assign to the job.
265 def start(self, debug: bool = False) -> SuccessTuple: 266 """ 267 Start the job's daemon. 268 """ 269 if self.executor is not None: 270 if not self.exists(debug=debug): 271 return self.executor.create_job( 272 self.name, 273 self.sysargs, 274 properties=self.daemon.properties, 275 debug=debug, 276 ) 277 return self.executor.start_job(self.name, debug=debug) 278 279 if self.is_running(): 280 return True, f"{self} is already running." 281 282 success, msg = self.daemon.run( 283 keep_daemon_output=(not self.delete_after_completion), 284 allow_dirty_run=True, 285 ) 286 if not success: 287 return success, msg 288 289 return success, f"Started {self}."
Start the job's daemon.
291 def stop( 292 self, 293 timeout_seconds: Union[int, float, None] = None, 294 debug: bool = False, 295 ) -> SuccessTuple: 296 """ 297 Stop the job's daemon. 298 """ 299 if self.executor is not None: 300 return self.executor.stop_job(self.name, debug=debug) 301 302 if self.daemon.status == 'stopped': 303 if not self.restart: 304 return True, f"{self} is not running." 305 elif self.stop_time is not None: 306 return True, f"{self} will not restart until manually started." 307 308 quit_success, quit_msg = self.daemon.quit(timeout=timeout_seconds) 309 if quit_success: 310 return quit_success, f"Stopped {self}." 311 312 warn( 313 f"Failed to gracefully quit {self}.", 314 stack=False, 315 ) 316 kill_success, kill_msg = self.daemon.kill(timeout=timeout_seconds) 317 if not kill_success: 318 return kill_success, kill_msg 319 320 return kill_success, f"Killed {self}."
Stop the job's daemon.
322 def pause( 323 self, 324 timeout_seconds: Union[int, float, None] = None, 325 debug: bool = False, 326 ) -> SuccessTuple: 327 """ 328 Pause the job's daemon. 329 """ 330 if self.executor is not None: 331 return self.executor.pause_job(self.name, debug=debug) 332 333 pause_success, pause_msg = self.daemon.pause(timeout=timeout_seconds) 334 if not pause_success: 335 return pause_success, pause_msg 336 337 return pause_success, f"Paused {self}."
Pause the job's daemon.
339 def delete(self, debug: bool = False) -> SuccessTuple: 340 """ 341 Delete the job and its daemon. 342 """ 343 if self.executor is not None: 344 return self.executor.delete_job(self.name, debug=debug) 345 346 if self.is_running(): 347 stop_success, stop_msg = self.stop() 348 if not stop_success: 349 return stop_success, stop_msg 350 351 cleanup_success, cleanup_msg = self.daemon.cleanup() 352 if not cleanup_success: 353 return cleanup_success, cleanup_msg 354 355 _ = self.daemon._properties.pop('result', None) 356 return cleanup_success, f"Deleted {self}."
Delete the job and its daemon.
358 def is_running(self) -> bool: 359 """ 360 Determine whether the job's daemon is running. 361 """ 362 return self.status == 'running'
Determine whether the job's daemon is running.
364 def exists(self, debug: bool = False) -> bool: 365 """ 366 Determine whether the job exists. 367 """ 368 if self.executor is not None: 369 return self.executor.get_job_exists(self.name, debug=debug) 370 371 return self.daemon.path.exists()
Determine whether the job exists.
373 def get_logs(self) -> Union[str, None]: 374 """ 375 Return the output text of the job's daemon. 376 """ 377 if self.executor is not None: 378 return self.executor.get_logs(self.name) 379 380 return self.daemon.log_text
Return the output text of the job's daemon.
382 def monitor_logs( 383 self, 384 callback_function: Callable[[str], None] = _default_stdout_callback, 385 input_callback_function: Optional[Callable[[], str]] = None, 386 stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None, 387 stop_event: Optional[asyncio.Event] = None, 388 stop_on_exit: bool = False, 389 strip_timestamps: bool = False, 390 accept_input: bool = True, 391 debug: bool = False, 392 _logs_path: Optional[pathlib.Path] = None, 393 _log=None, 394 _stdin_file=None, 395 _wait_if_stopped: bool = True, 396 ): 397 """ 398 Monitor the job's log files and execute a callback on new lines. 399 400 Parameters 401 ---------- 402 callback_function: Callable[[str], None], default partial(print, end='') 403 The callback to execute as new data comes in. 404 Defaults to printing the output directly to `stdout`. 405 406 input_callback_function: Optional[Callable[[], str]], default None 407 If provided, execute this callback when the daemon is blocking on stdin. 408 Defaults to `sys.stdin.readline()`. 409 410 stop_callback_function: Optional[Callable[[SuccessTuple]], str], default None 411 If provided, execute this callback when the daemon stops. 412 The job's SuccessTuple will be passed to the callback. 413 414 stop_event: Optional[asyncio.Event], default None 415 If provided, stop monitoring when this event is set. 416 You may instead raise `meerschaum.jobs.StopMonitoringLogs` 417 from within `callback_function` to stop monitoring. 418 419 stop_on_exit: bool, default False 420 If `True`, stop monitoring when the job stops. 421 422 strip_timestamps: bool, default False 423 If `True`, remove leading timestamps from lines. 424 425 accept_input: bool, default True 426 If `True`, accept input when the daemon blocks on stdin. 
427 """ 428 if self.executor is not None: 429 self.executor.monitor_logs( 430 self.name, 431 callback_function, 432 input_callback_function=input_callback_function, 433 stop_callback_function=stop_callback_function, 434 stop_on_exit=stop_on_exit, 435 accept_input=accept_input, 436 strip_timestamps=strip_timestamps, 437 debug=debug, 438 ) 439 return 440 441 monitor_logs_coroutine = self.monitor_logs_async( 442 callback_function=callback_function, 443 input_callback_function=input_callback_function, 444 stop_callback_function=stop_callback_function, 445 stop_event=stop_event, 446 stop_on_exit=stop_on_exit, 447 strip_timestamps=strip_timestamps, 448 accept_input=accept_input, 449 debug=debug, 450 _logs_path=_logs_path, 451 _log=_log, 452 _stdin_file=_stdin_file, 453 _wait_if_stopped=_wait_if_stopped, 454 ) 455 return asyncio.run(monitor_logs_coroutine)
Monitor the job's log files and execute a callback on new lines.
Parameters
- callback_function (Callable[[str], None], default partial(print, end='')):
The callback to execute as new data comes in.
Defaults to printing the output directly to
stdout
. - input_callback_function (Optional[Callable[[], str]], default None):
If provided, execute this callback when the daemon is blocking on stdin.
Defaults to
sys.stdin.readline()
. - stop_callback_function (Optional[Callable[[SuccessTuple]], str], default None): If provided, execute this callback when the daemon stops. The job's SuccessTuple will be passed to the callback.
- stop_event (Optional[asyncio.Event], default None):
If provided, stop monitoring when this event is set.
You may instead raise
meerschaum.jobs.StopMonitoringLogs
from within `callback_function`
to stop monitoring. - stop_on_exit (bool, default False):
If
True
, stop monitoring when the job stops. - strip_timestamps (bool, default False):
If
True
, remove leading timestamps from lines. - accept_input (bool, default True):
If
True
, accept input when the daemon blocks on stdin.
457 async def monitor_logs_async( 458 self, 459 callback_function: Callable[[str], None] = _default_stdout_callback, 460 input_callback_function: Optional[Callable[[], str]] = None, 461 stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None, 462 stop_event: Optional[asyncio.Event] = None, 463 stop_on_exit: bool = False, 464 strip_timestamps: bool = False, 465 accept_input: bool = True, 466 debug: bool = False, 467 _logs_path: Optional[pathlib.Path] = None, 468 _log=None, 469 _stdin_file=None, 470 _wait_if_stopped: bool = True, 471 ): 472 """ 473 Monitor the job's log files and await a callback on new lines. 474 475 Parameters 476 ---------- 477 callback_function: Callable[[str], None], default _default_stdout_callback 478 The callback to execute as new data comes in. 479 Defaults to printing the output directly to `stdout`. 480 481 input_callback_function: Optional[Callable[[], str]], default None 482 If provided, execute this callback when the daemon is blocking on stdin. 483 Defaults to `sys.stdin.readline()`. 484 485 stop_callback_function: Optional[Callable[[SuccessTuple]], str], default None 486 If provided, execute this callback when the daemon stops. 487 The job's SuccessTuple will be passed to the callback. 488 489 stop_event: Optional[asyncio.Event], default None 490 If provided, stop monitoring when this event is set. 491 You may instead raise `meerschaum.jobs.StopMonitoringLogs` 492 from within `callback_function` to stop monitoring. 493 494 stop_on_exit: bool, default False 495 If `True`, stop monitoring when the job stops. 496 497 strip_timestamps: bool, default False 498 If `True`, remove leading timestamps from lines. 499 500 accept_input: bool, default True 501 If `True`, accept input when the daemon blocks on stdin. 
502 """ 503 from meerschaum.utils.prompt import prompt 504 505 def default_input_callback_function(): 506 prompt_kwargs = self.get_prompt_kwargs(debug=debug) 507 if prompt_kwargs: 508 answer = prompt(**prompt_kwargs) 509 return answer + '\n' 510 return sys.stdin.readline() 511 512 if input_callback_function is None: 513 input_callback_function = default_input_callback_function 514 515 if self.executor is not None: 516 await self.executor.monitor_logs_async( 517 self.name, 518 callback_function, 519 input_callback_function=input_callback_function, 520 stop_callback_function=stop_callback_function, 521 stop_on_exit=stop_on_exit, 522 strip_timestamps=strip_timestamps, 523 accept_input=accept_input, 524 debug=debug, 525 ) 526 return 527 528 from meerschaum.utils.formatting._jobs import strip_timestamp_from_line 529 530 events = { 531 'user': stop_event, 532 'stopped': asyncio.Event(), 533 'stop_token': asyncio.Event(), 534 'stop_exception': asyncio.Event(), 535 'stopped_timeout': asyncio.Event(), 536 } 537 combined_event = asyncio.Event() 538 emitted_text = False 539 stdin_file = _stdin_file if _stdin_file is not None else self.daemon.stdin_file 540 541 async def check_job_status(): 542 if not stop_on_exit: 543 return 544 545 nonlocal emitted_text 546 547 sleep_time = 0.1 548 while sleep_time < 0.2: 549 if self.status == 'stopped': 550 if not emitted_text and _wait_if_stopped: 551 await asyncio.sleep(sleep_time) 552 sleep_time = round(sleep_time * 1.1, 3) 553 continue 554 555 if stop_callback_function is not None: 556 try: 557 if asyncio.iscoroutinefunction(stop_callback_function): 558 await stop_callback_function(self.result) 559 else: 560 stop_callback_function(self.result) 561 except asyncio.exceptions.CancelledError: 562 break 563 except Exception: 564 warn(traceback.format_exc()) 565 566 if stop_on_exit: 567 events['stopped'].set() 568 569 break 570 await asyncio.sleep(0.1) 571 572 events['stopped_timeout'].set() 573 574 async def check_blocking_on_input(): 575 
while True: 576 if not emitted_text or not self.is_blocking_on_stdin(): 577 try: 578 await asyncio.sleep(self.refresh_seconds) 579 except asyncio.exceptions.CancelledError: 580 break 581 continue 582 583 if not self.is_running(): 584 break 585 586 await emit_latest_lines() 587 588 try: 589 print('', end='', flush=True) 590 if asyncio.iscoroutinefunction(input_callback_function): 591 data = await input_callback_function() 592 else: 593 loop = asyncio.get_running_loop() 594 data = await loop.run_in_executor(None, input_callback_function) 595 except KeyboardInterrupt: 596 break 597 # if not data.endswith('\n'): 598 # data += '\n' 599 600 stdin_file.write(data) 601 await asyncio.sleep(self.refresh_seconds) 602 603 async def combine_events(): 604 event_tasks = [ 605 asyncio.create_task(event.wait()) 606 for event in events.values() 607 if event is not None 608 ] 609 if not event_tasks: 610 return 611 612 try: 613 done, pending = await asyncio.wait( 614 event_tasks, 615 return_when=asyncio.FIRST_COMPLETED, 616 ) 617 for task in pending: 618 task.cancel() 619 except asyncio.exceptions.CancelledError: 620 pass 621 finally: 622 combined_event.set() 623 624 check_job_status_task = asyncio.create_task(check_job_status()) 625 check_blocking_on_input_task = asyncio.create_task(check_blocking_on_input()) 626 combine_events_task = asyncio.create_task(combine_events()) 627 628 log = _log if _log is not None else self.daemon.rotating_log 629 lines_to_show = ( 630 self.daemon.properties.get( 631 'logs', {} 632 ).get( 633 'lines_to_show', get_config('jobs', 'logs', 'lines_to_show') 634 ) 635 ) 636 637 async def emit_latest_lines(): 638 nonlocal emitted_text 639 nonlocal stop_event 640 lines = log.readlines() 641 for line in lines[(-1 * lines_to_show):]: 642 if stop_event is not None and stop_event.is_set(): 643 return 644 645 line_stripped_extra = strip_timestamp_from_line(line.strip()) 646 line_stripped = strip_timestamp_from_line(line) 647 648 if line_stripped_extra == STOP_TOKEN: 
649 events['stop_token'].set() 650 return 651 652 if line_stripped_extra == CLEAR_TOKEN: 653 clear_screen(debug=debug) 654 continue 655 656 if line_stripped_extra == FLUSH_TOKEN.strip(): 657 line_stripped = '' 658 line = '' 659 660 if strip_timestamps: 661 line = line_stripped 662 663 try: 664 if asyncio.iscoroutinefunction(callback_function): 665 await callback_function(line) 666 else: 667 callback_function(line) 668 emitted_text = True 669 except StopMonitoringLogs: 670 events['stop_exception'].set() 671 return 672 except Exception: 673 warn(f"Error in logs callback:\n{traceback.format_exc()}") 674 675 await emit_latest_lines() 676 677 tasks = ( 678 [check_job_status_task] 679 + ([check_blocking_on_input_task] if accept_input else []) 680 + [combine_events_task] 681 ) 682 try: 683 _ = asyncio.gather(*tasks, return_exceptions=True) 684 except asyncio.exceptions.CancelledError: 685 raise 686 except Exception: 687 warn(f"Failed to run async checks:\n{traceback.format_exc()}") 688 689 watchfiles = mrsm.attempt_import('watchfiles', lazy=False) 690 dir_path_to_monitor = ( 691 _logs_path 692 or (log.file_path.parent if log else None) 693 or LOGS_RESOURCES_PATH 694 ) 695 async for changes in watchfiles.awatch( 696 dir_path_to_monitor, 697 stop_event=combined_event, 698 ): 699 for change in changes: 700 file_path_str = change[1] 701 file_path = pathlib.Path(file_path_str) 702 latest_subfile_path = log.get_latest_subfile_path() 703 if latest_subfile_path != file_path: 704 continue 705 706 await emit_latest_lines() 707 708 await emit_latest_lines()
Monitor the job's log files and await a callback on new lines.
Parameters
- callback_function (Callable[[str], None], default _default_stdout_callback):
The callback to execute as new data comes in.
Defaults to printing the output directly to
stdout
. - input_callback_function (Optional[Callable[[], str]], default None):
If provided, execute this callback when the daemon is blocking on stdin.
Defaults to
sys.stdin.readline()
. - stop_callback_function (Optional[Callable[[SuccessTuple]], str], default None): If provided, execute this callback when the daemon stops. The job's SuccessTuple will be passed to the callback.
- stop_event (Optional[asyncio.Event], default None):
If provided, stop monitoring when this event is set.
You may instead raise
meerschaum.jobs.StopMonitoringLogs
from within `callback_function`
to stop monitoring. - stop_on_exit (bool, default False):
If
True
, stop monitoring when the job stops. - strip_timestamps (bool, default False):
If
True
, remove leading timestamps from lines. - accept_input (bool, default True):
If
True
, accept input when the daemon blocks on stdin.
710 def is_blocking_on_stdin(self, debug: bool = False) -> bool: 711 """ 712 Return whether a job's daemon is blocking on stdin. 713 """ 714 if self.executor is not None: 715 return self.executor.get_job_is_blocking_on_stdin(self.name, debug=debug) 716 717 return self.is_running() and self.daemon.blocking_stdin_file_path.exists()
Return whether a job's daemon is blocking on stdin.
719 def get_prompt_kwargs(self, debug: bool = False) -> Dict[str, Any]: 720 """ 721 Return the kwargs to the blocking `prompt()`, if available. 722 """ 723 if self.executor is not None: 724 return self.executor.get_job_prompt_kwargs(self.name, debug=debug) 725 726 if not self.daemon.prompt_kwargs_file_path.exists(): 727 return {} 728 729 try: 730 with open(self.daemon.prompt_kwargs_file_path, 'r', encoding='utf-8') as f: 731 prompt_kwargs = json.load(f) 732 733 return prompt_kwargs 734 735 except Exception: 736 import traceback 737 traceback.print_exc() 738 return {}
Return the kwargs to the blocking prompt()
, if available.
740 def write_stdin(self, data): 741 """ 742 Write to a job's daemon's `stdin`. 743 """ 744 self.daemon.stdin_file.write(data)
Write to a job's daemon's stdin
.
746 @property 747 def executor(self) -> Union[Executor, None]: 748 """ 749 If the job is remote, return the connector to the remote API instance. 750 """ 751 return ( 752 mrsm.get_connector(self.executor_keys) 753 if self.executor_keys != 'local' 754 else None 755 )
If the job is remote, return the connector to the remote API instance.
@property
def status(self) -> str:
    """
    Return the running status of the job's daemon
    (e.g. `'running'`, `'paused'`, `'stopped'`).
    """
    # An injected status hook takes precedence over everything else.
    if '_status_hook' in self.__dict__:
        return self._status_hook()

    remote_executor = self.executor
    if remote_executor is not None:
        return remote_executor.get_job_status(self.name)

    return self.daemon.status
Return the running status of the job's daemon.
@property
def pid(self) -> Union[int, None]:
    """
    Return the PID of the job's daemon, or `None` if unavailable.
    """
    if self.executor is not None:
        return self.executor.get_job_metadata(self.name).get('daemon', {}).get('pid', None)

    return self.daemon.pid
Return the PID of the job's daemon.
@property
def restart(self) -> bool:
    """
    Return whether a stopped job should be restarted automatically.
    """
    remote_executor = self.executor
    if remote_executor is None:
        return self.daemon.properties.get('restart', False)
    return remote_executor.get_job_metadata(self.name).get('restart', False)
Return whether to restart a stopped job.
@property
def result(self) -> SuccessTuple:
    """
    Return the `SuccessTuple` when the job has terminated.

    While the job is still running, return a success tuple noting as much.
    """
    if self.is_running():
        return True, f"{self} is running."

    # An injected result hook takes precedence.
    if '_result_hook' in self.__dict__:
        return self._result_hook()

    remote_executor = self.executor
    if remote_executor is not None:
        metadata = remote_executor.get_job_metadata(self.name)
        return metadata.get('result', (False, "No result available."))

    stored_result = self.daemon.properties.get('result', None)
    if stored_result is not None:
        return tuple(stored_result)

    # Fall back to the in-memory results cache for daemons which
    # have not persisted a result to their properties.
    from meerschaum.utils.daemon.Daemon import _results
    return _results.get(self.daemon.daemon_id, (False, "No result available."))
Return the SuccessTuple
when the job has terminated.
@property
def sysargs(self) -> List[str]:
    """
    Return the sysargs to use for the Daemon.
    """
    if self._sysargs:
        return self._sysargs

    remote_executor = self.executor
    if remote_executor is not None:
        return remote_executor.get_job_metadata(self.name).get('sysargs', [])

    target_args = self.daemon.target_args
    if target_args is None:
        return []

    # Cache the daemon's first target arg (the sysargs list) for reuse.
    self._sysargs = target_args[0] if len(target_args) > 0 else []
    return self._sysargs
Return the sysargs to use for the Daemon.
def get_daemon_properties(self) -> Dict[str, Any]:
    """
    Return the `properties` dictionary for the job's daemon.

    Remote properties (if any) are overlaid with the local patch,
    so locally patched keys win.
    """
    remote_executor = self.executor
    remote_properties = (
        remote_executor.get_job_properties(self.name)
        if remote_executor is not None
        else {}
    )
    return {**remote_properties, **self._properties_patch}
Return the properties
dictionary for the job's daemon.
@property
def daemon(self) -> 'Daemon':
    """
    Return the daemon which this job manages.

    The cached daemon is reused only for local jobs with known sysargs;
    otherwise a new `Daemon` is constructed from the job's current properties.
    """
    from meerschaum.utils.daemon import Daemon
    # Reuse the cached daemon only for local jobs (no executor) with sysargs.
    if self._daemon is not None and self.executor is None and self._sysargs:
        return self._daemon

    self._daemon = Daemon(
        target=entry,
        target_args=[self._sysargs],
        target_kw={},
        daemon_id=self.name,
        label=shlex.join(self._sysargs),
        properties=self.get_daemon_properties(),
    )
    # NOTE(review): these handles appear to be pre-seeded elsewhere (e.g. by
    # log monitoring) — carry them over so the rebuilt daemon keeps the same
    # log / stdin state. Confirm against the setters.
    if '_rotating_log' in self.__dict__:
        self._daemon._rotating_log = self._rotating_log

    if '_stdin_file' in self.__dict__:
        self._daemon._stdin_file = self._stdin_file
        self._daemon._blocking_stdin_file_path = self._stdin_file.blocking_file_path

    return self._daemon
Return the daemon which this job manages.
@property
def began(self) -> Union[datetime, None]:
    """
    The datetime when the job began running, or `None` if unknown.
    """
    remote_executor = self.executor
    if remote_executor is not None:
        began_str = remote_executor.get_job_began(self.name)
        if began_str is None:
            return None
        # Normalize remote timestamps to naive UTC.
        return (
            datetime.fromisoformat(began_str)
            .astimezone(timezone.utc)
            .replace(tzinfo=None)
        )

    began_str = self.daemon.properties.get('process', {}).get('began', None)
    return datetime.fromisoformat(began_str) if began_str is not None else None
The datetime when the job began running.
@property
def ended(self) -> Union[datetime, None]:
    """
    The datetime when the job stopped running, or `None` if unknown.
    """
    remote_executor = self.executor
    if remote_executor is not None:
        ended_str = remote_executor.get_job_ended(self.name)
        if ended_str is None:
            return None
        # Normalize remote timestamps to naive UTC.
        return (
            datetime.fromisoformat(ended_str)
            .astimezone(timezone.utc)
            .replace(tzinfo=None)
        )

    ended_str = self.daemon.properties.get('process', {}).get('ended', None)
    return datetime.fromisoformat(ended_str) if ended_str is not None else None
The datetime when the job stopped running.
@property
def paused(self) -> Union[datetime, None]:
    """
    The datetime when the job was suspended while running, or `None`.
    """
    remote_executor = self.executor
    if remote_executor is not None:
        paused_str = remote_executor.get_job_paused(self.name)
        if paused_str is None:
            return None
        # Normalize remote timestamps to naive UTC.
        return (
            datetime.fromisoformat(paused_str)
            .astimezone(timezone.utc)
            .replace(tzinfo=None)
        )

    paused_str = self.daemon.properties.get('process', {}).get('paused', None)
    return datetime.fromisoformat(paused_str) if paused_str is not None else None
The datetime when the job was suspended while running.
@property
def stop_time(self) -> Union[datetime, None]:
    """
    Return the timestamp when the job was manually stopped, or `None`.
    """
    remote_executor = self.executor
    if remote_executor is not None:
        return remote_executor.get_job_stop_time(self.name)

    if not self.daemon.stop_path.exists():
        return None

    stop_file_data = self.daemon._read_stop_file()
    if not stop_file_data:
        return None

    stop_time_str = stop_file_data.get('stop_time', None)
    if not stop_time_str:
        warn(f"Could not read stop time for {self}.")
        return None

    return datetime.fromisoformat(stop_time_str)
Return the timestamp when the job was manually stopped.
def check_restart(self) -> SuccessTuple:
    """
    If `restart` is `True` and the daemon is not running,
    restart the job.
    Do not restart if the job was manually stopped.

    Returns
    -------
    A `SuccessTuple` explaining why the job was or was not restarted.
    """
    # A running job never needs a restart.
    if self.is_running():
        return True, f"{self} is running."

    # Only jobs flagged for restart are eligible.
    if not self.restart:
        return True, f"{self} does not need to be restarted."

    # A recorded manual stop overrides the restart flag.
    if self.stop_time is not None:
        return True, f"{self} was manually stopped."

    return self.start()
If restart
is True
and the daemon is not running,
restart the job.
Do not restart if the job was manually stopped.
@property
def label(self) -> str:
    """
    Return the job's Daemon label (the joined sysargs).
    """
    from meerschaum._internal.arguments import compress_pipeline_sysargs
    compressed_sysargs = compress_pipeline_sysargs(self.sysargs)
    joined = shlex.join(compressed_sysargs)
    # Break pipeline separators onto their own lines for readability.
    return joined.replace(' + ', '\n+ ').replace(' : ', '\n: ').strip()
Return the job's Daemon label (joined sysargs).
@property
def env(self) -> Dict[str, str]:
    """
    Return the environment variables to set for the job's process.

    Values stored under the daemon's `env` property override the defaults.
    """
    if '_env' in self.__dict__:
        return self.__dict__['_env']

    job_env = self.daemon.properties.get('env', {})
    base_env = {
        'PYTHONUNBUFFERED': '1',
        'LINES': str(get_config('jobs', 'terminal', 'lines')),
        'COLUMNS': str(get_config('jobs', 'terminal', 'columns')),
        STATIC_CONFIG['environment']['noninteractive']: 'true',
    }
    self._env = {**base_env, **job_env}
    return self._env
Return the environment variables to set for the job's process.
@property
def delete_after_completion(self) -> bool:
    """
    Return whether this job is configured to delete itself after completion.
    """
    cached = self.__dict__.get('_delete_after_completion', None)
    if '_delete_after_completion' in self.__dict__:
        return cached if cached is not None else False

    self._delete_after_completion = self.daemon.properties.get(
        'delete_after_completion', False
    )
    return self._delete_after_completion
Return whether this job is configured to delete itself after completion.
class StopMonitoringLogs(Exception):
    """
    Raise this exception from a monitoring callback to stop the logs monitoring.
    """
Raise this exception to stop the logs monitoring.
def get_jobs(
    executor_keys: Optional[str] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of the existing jobs.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, return remote jobs on the given API instance.
        Otherwise return local jobs.

    include_hidden: bool, default False
        If `True`, include jobs with the `hidden` attribute.

    combine_local_and_systemd: bool, default True
        If `True` and the context executor is `systemd`,
        merge local and systemd jobs into one dictionary.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping job names to jobs.
    """
    from meerschaum.connectors.parse import parse_executor_keys
    executor_keys = executor_keys or get_executor_keys_from_context()

    executor_prefix = str(executor_keys).split(':', maxsplit=1)[0]
    merge_local_and_systemd = (
        combine_local_and_systemd
        and executor_prefix in ('None', 'local', 'systemd')
        and get_executor_keys_from_context() == 'systemd'
    )

    def _filter_hidden(jobs):
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    def _local_jobs():
        from meerschaum.utils.daemon import get_daemons
        jobs = {
            daemon.daemon_id: Job(name=daemon.daemon_id, executor_keys='local')
            for daemon in get_daemons()
        }
        # Externally managed daemons are never surfaced as local jobs.
        return {
            name: job
            for name, job in _filter_hidden(jobs).items()
            if not job._is_externally_managed
        }

    def _systemd_jobs():
        from meerschaum.jobs.systemd import SystemdExecutor
        conn = SystemdExecutor('systemd')
        return _filter_hidden(conn.get_jobs(debug=debug))

    if merge_local_and_systemd:
        local_jobs = _local_jobs()
        systemd_jobs = _systemd_jobs()
        shared_names = set(local_jobs) & set(systemd_jobs)
        if shared_names:
            from meerschaum.utils.misc import items_str
            from meerschaum.utils.warnings import warn
            plural = len(shared_names) != 1
            warn(
                "Job"
                + ('s' if plural else '')
                + f" {items_str(list(shared_names))} "
                + "exist"
                + ('' if plural else 's')
                + " in both `local` and `systemd`.",
                stack=False,
            )
        # systemd jobs take precedence over local jobs with the same name.
        return {**local_jobs, **systemd_jobs}

    if executor_keys == 'local':
        return _local_jobs()

    if executor_keys == 'systemd':
        return _systemd_jobs()

    try:
        _ = parse_executor_keys(executor_keys, construct=False)
        conn = parse_executor_keys(executor_keys)
        return _filter_hidden(conn.get_jobs(debug=debug))
    except Exception:
        return {}
Return a dictionary of the existing jobs.
Parameters
- executor_keys (Optional[str], default None): If provided, return remote jobs on the given API instance. Otherwise return local jobs.
- include_hidden (bool, default False):
If
True
, include jobs with the hidden
attribute.
Returns
- A dictionary mapping job names to jobs.
def get_filtered_jobs(
    executor_keys: Optional[str] = None,
    filter_list: Optional[List[str]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    warn: bool = False,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of jobs filtered by the user.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, search jobs on the given API instance.

    filter_list: Optional[List[str]], default None
        Job names to include. Names prefixed with an underscore act as
        exclusions when no non-underscore names match.

    include_hidden: bool, default False
        If `True` and no filter is given, include hidden jobs.

    warn: bool, default False
        If `True`, warn when a requested job does not exist.

    Returns
    -------
    A dictionary mapping job names to jobs.
    """
    from meerschaum.utils.warnings import warn as _warn
    jobs = get_jobs(
        executor_keys,
        include_hidden=True,
        combine_local_and_systemd=combine_local_and_systemd,
        debug=debug,
    )
    if not filter_list:
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    jobs_to_return = {}
    filter_list_with_underscores = [name for name in filter_list if name.startswith('_')]
    for name in filter_list:
        job = jobs.get(name, None)
        if job is None:
            if warn:
                _warn(
                    f"Job '{name}' does not exist.",
                    stack=False,
                )
            continue
        jobs_to_return[name] = job

    # If nothing matched and exclusions (underscore-prefixed names) were given,
    # return everything except the excluded names.
    if not jobs_to_return and filter_list_with_underscores:
        names_to_exclude = [name.lstrip('_') for name in filter_list_with_underscores]
        return {
            name: job
            for name, job in jobs.items()
            if name not in names_to_exclude
        }

    return jobs_to_return
Return a list of jobs filtered by the user.
def get_restart_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, 'Job']] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, 'Job']:
    """
    Return jobs which were created with `--restart` or `--loop`.

    Parameters
    ----------
    jobs: Optional[Dict[str, Job]], default None
        If provided, filter these jobs instead of fetching them.

    Returns
    -------
    A dictionary of jobs whose `restart` flag is truthy.
    """
    candidate_jobs = jobs if jobs is not None else get_jobs(
        executor_keys,
        include_hidden=include_hidden,
        combine_local_and_systemd=combine_local_and_systemd,
        debug=debug,
    )
    return {
        name: job
        for name, job in candidate_jobs.items()
        if job.restart
    }
Return jobs which were created with --restart
or --loop
.
def get_running_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, 'Job']] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, 'Job']:
    """
    Return a dictionary of running jobs.

    Parameters
    ----------
    jobs: Optional[Dict[str, Job]], default None
        If provided, filter these jobs instead of fetching them.

    Returns
    -------
    A dictionary of jobs whose status is `'running'`.
    """
    candidate_jobs = jobs if jobs is not None else get_jobs(
        executor_keys,
        include_hidden=include_hidden,
        combine_local_and_systemd=combine_local_and_systemd,
        debug=debug,
    )
    return {
        name: job
        for name, job in candidate_jobs.items()
        if job.status == 'running'
    }
Return a dictionary of running jobs.
def get_stopped_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, 'Job']] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, 'Job']:
    """
    Return a dictionary of stopped jobs.

    Parameters
    ----------
    jobs: Optional[Dict[str, Job]], default None
        If provided, filter these jobs instead of fetching them.

    Returns
    -------
    A dictionary of jobs whose status is `'stopped'`.
    """
    candidate_jobs = jobs if jobs is not None else get_jobs(
        executor_keys,
        include_hidden=include_hidden,
        combine_local_and_systemd=combine_local_and_systemd,
        debug=debug,
    )
    return {
        name: job
        for name, job in candidate_jobs.items()
        if job.status == 'stopped'
    }
Return a dictionary of stopped jobs.
def get_paused_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, 'Job']] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, 'Job']:
    """
    Return a dictionary of paused jobs.

    Parameters
    ----------
    jobs: Optional[Dict[str, Job]], default None
        If provided, filter these jobs instead of fetching them.

    Returns
    -------
    A dictionary of jobs whose status is `'paused'`.
    """
    candidate_jobs = jobs if jobs is not None else get_jobs(
        executor_keys,
        include_hidden=include_hidden,
        combine_local_and_systemd=combine_local_and_systemd,
        debug=debug,
    )
    return {
        name: job
        for name, job in candidate_jobs.items()
        if job.status == 'paused'
    }
Return a dictionary of paused jobs.
def make_executor(cls):
    """
    Register a class as an `Executor`.

    The lowercased class name, minus a trailing `executor` suffix,
    becomes the executor type and is appended to `executor_types`.

    Returns
    -------
    The class registered as a connector via `make_connector`.
    """
    import re
    from meerschaum.connectors import make_connector
    executor_type = re.sub(r'executor$', '', cls.__name__.lower())
    if executor_type not in executor_types:
        executor_types.append(executor_type)
    return make_connector(cls, _is_executor=True)
Register a class as an Executor
.
class Executor(Connector):
    """
    Define the methods for managing jobs.

    Concrete subclasses implement these abstract methods to create,
    inspect, and control jobs on a given backend.
    """

    @abstractmethod
    def get_job_names(self, debug: bool = False) -> List[str]:
        """
        Return a list of existing jobs, including hidden ones.
        """

    @abstractmethod
    def get_job_exists(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job exists.
        """

    @abstractmethod
    def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
        """
        Return a dictionary of existing jobs.
        """

    @abstractmethod
    def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return a job's metadata.
        """

    @abstractmethod
    def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return the underlying daemon's properties.
        """

    @abstractmethod
    def get_job_status(self, name: str, debug: bool = False) -> str:
        """
        Return the job's status.
        """

    @abstractmethod
    def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job began running.
        """

    @abstractmethod
    def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job stopped running.
        """

    @abstractmethod
    def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return a job's `paused` timestamp, if it exists.
        """

    @abstractmethod
    def create_job(
        self,
        name: str,
        sysargs: List[str],
        properties: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ) -> SuccessTuple:
        """
        Create a new job.
        """

    @abstractmethod
    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Start a job.
        """

    @abstractmethod
    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Stop a job.
        """

    @abstractmethod
    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Pause a job.
        """

    @abstractmethod
    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Delete a job.
        """

    @abstractmethod
    def get_logs(self, name: str, debug: bool = False) -> str:
        """
        Return a job's log output.
        """

    @abstractmethod
    def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
        """
        Return the job's manual stop time.
        """

    @abstractmethod
    async def monitor_logs_async(
        self,
        name: str,
        callback_function: Callable[[Any], Any],
        input_callback_function: Callable[[], str],
        stop_callback_function: Callable[[SuccessTuple], str],
        stop_on_exit: bool = False,
        strip_timestamps: bool = False,
        accept_input: bool = True,
        debug: bool = False,
    ):
        """
        Monitor a job's log files and await a callback with the changes.
        """

    @abstractmethod
    def monitor_logs(self, *args, **kwargs):
        """
        Monitor a job's log files.
        """

    @abstractmethod
    def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job is blocking on stdin.
        """

    @abstractmethod
    def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return the kwargs to the blocking prompt.
        """
Define the methods for managing jobs.
27 @abstractmethod 28 def get_job_names(self, debug: bool = False) -> List[str]: 29 """ 30 Return a list of existing jobs, including hidden ones. 31 """
Return a list of existing jobs, including hidden ones.
33 @abstractmethod 34 def get_job_exists(self, name: str, debug: bool = False) -> bool: 35 """ 36 Return whether a job exists. 37 """
Return whether a job exists.
39 @abstractmethod 40 def get_jobs(self, debug: bool = False) -> Dict[str, Job]: 41 """ 42 Return a dictionary of existing jobs. 43 """
Return a dictionary of existing jobs.
45 @abstractmethod 46 def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]: 47 """ 48 Return a job's metadata. 49 """
Return a job's metadata.
51 @abstractmethod 52 def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]: 53 """ 54 Return the underlying daemon's properties. 55 """
Return the underlying daemon's properties.
56 @abstractmethod 57 def get_job_status(self, name: str, debug: bool = False) -> str: 58 """ 59 Return the job's status. 60 """
Return the job's status.
62 @abstractmethod 63 def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]: 64 """ 65 Return when a job began running. 66 """
Return when a job began running.
68 @abstractmethod 69 def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]: 70 """ 71 Return when a job stopped running. 72 """
Return when a job stopped running.
74 @abstractmethod 75 def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]: 76 """ 77 Return a job's `paused` timestamp, if it exists. 78 """
Return a job's paused
timestamp, if it exists.
80 @abstractmethod 81 def create_job( 82 self, 83 name: str, 84 sysargs: List[str], 85 properties: Optional[Dict[str, Any]] = None, 86 debug: bool = False, 87 ) -> SuccessTuple: 88 """ 89 Create a new job. 90 """
Create a new job.
92 @abstractmethod 93 def start_job(self, name: str, debug: bool = False) -> SuccessTuple: 94 """ 95 Start a job. 96 """
Start a job.
98 @abstractmethod 99 def stop_job(self, name: str, debug: bool = False) -> SuccessTuple: 100 """ 101 Stop a job. 102 """
Stop a job.
104 @abstractmethod 105 def pause_job(self, name: str, debug: bool = False) -> SuccessTuple: 106 """ 107 Pause a job. 108 """
Pause a job.
110 @abstractmethod 111 def delete_job(self, name: str, debug: bool = False) -> SuccessTuple: 112 """ 113 Delete a job. 114 """
Delete a job.
116 @abstractmethod 117 def get_logs(self, name: str, debug: bool = False) -> str: 118 """ 119 Return a job's log output. 120 """
Return a job's log output.
122 @abstractmethod 123 def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]: 124 """ 125 Return the job's manual stop time. 126 """
Return the job's manual stop time.
128 @abstractmethod 129 async def monitor_logs_async( 130 self, 131 name: str, 132 callback_function: Callable[[Any], Any], 133 input_callback_function: Callable[[], str], 134 stop_callback_function: Callable[[SuccessTuple], str], 135 stop_on_exit: bool = False, 136 strip_timestamps: bool = False, 137 accept_input: bool = True, 138 debug: bool = False, 139 ): 140 """ 141 Monitor a job's log files and await a callback with the changes. 142 """
Monitor a job's log files and await a callback with the changes.
144 @abstractmethod 145 def monitor_logs(self, *args, **kwargs): 146 """ 147 Monitor a job's log files. 148 """
Monitor a job's log files.
150 @abstractmethod 151 def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool: 152 """ 153 Return whether a job is blocking on stdin. 154 """
Return whether a job is blocking on stdin.
156 @abstractmethod 157 def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]: 158 """ 159 Return the kwargs to the blocking prompt. 160 """
Return the kwargs to the blocking prompt.
def check_restart_jobs(
    executor_keys: Optional[str] = 'local',
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = True,
    silent: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Restart any stopped jobs which were created with `--restart`.

    Parameters
    ----------
    executor_keys: Optional[str], default 'local'
        If provided, check jobs on the given remote API instance.
        Otherwise check local jobs.

    jobs: Optional[Dict[str, Job]], default None
        If provided, check these jobs instead of fetching them.

    include_hidden: bool, default True
        If `True`, include hidden jobs in the check.

    silent: bool, default False
        If `True`, do not print the restart success message.

    Returns
    -------
    A `SuccessTuple` summarizing which jobs restarted successfully.
    """
    from meerschaum.utils.misc import items_str

    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=False,
            debug=debug,
        )

    if not jobs:
        return True, "No jobs to restart."

    results = {}
    for name, job in jobs.items():
        check_success, check_msg = job.check_restart()
        results[job.name] = (check_success, check_msg)
        if not silent:
            mrsm.pprint((check_success, check_msg))

    success_names = [name for name, (check_success, check_msg) in results.items() if check_success]
    fail_names = [name for name, (check_success, check_msg) in results.items() if not check_success]
    success = len(success_names) == len(jobs)
    msg = (
        (
            "Successfully restarted job"
            + ('s' if len(success_names) != 1 else '')
            + ' ' + items_str(success_names) + '.'
        )
        if success
        else (
            # Pluralize by the number of *failed* jobs (the names listed),
            # not the number of successes.
            "Failed to restart job"
            + ('s' if len(fail_names) != 1 else '')
            + ' ' + items_str(fail_names) + '.'
        )
    )
    return success, msg
Restart any stopped jobs which were created with --restart
.
Parameters
- executor_keys (Optional[str], default None): If provided, check jobs on the given remote API instance. Otherwise check local jobs.
- include_hidden (bool, default True):
If
True
, include hidden jobs in the check. - silent (bool, default False):
If
True
, do not print the restart success message.
def start_check_jobs_thread():
    """
    Start a background thread to regularly monitor jobs.

    Returns
    -------
    The started `RepeatTimer` thread (also stored in the module-level
    `_check_loop_stop_thread`).
    """
    import atexit
    from functools import partial
    from meerschaum.utils.threading import RepeatTimer
    from meerschaum._internal.static import STATIC_CONFIG

    global _check_loop_stop_thread
    check_interval_seconds = STATIC_CONFIG['jobs']['check_restart_seconds']

    timer = RepeatTimer(
        check_interval_seconds,
        partial(_check_restart_jobs_against_lock, silent=True),
    )
    timer.daemon = True
    _check_loop_stop_thread = timer
    # Ensure the monitor thread is cancelled when the interpreter exits.
    atexit.register(stop_check_jobs_thread)

    _check_loop_stop_thread.start()
    return _check_loop_stop_thread
Start a thread to regularly monitor jobs.
def stop_check_jobs_thread():
    """
    Stop the job monitoring thread and remove its lock file.

    No-op if the monitoring thread was never started.
    """
    from meerschaum.config.paths import CHECK_JOBS_LOCK_PATH
    from meerschaum.utils.warnings import warn

    monitor_thread = _check_loop_stop_thread
    if monitor_thread is None:
        return

    monitor_thread.cancel()

    try:
        if CHECK_JOBS_LOCK_PATH.exists():
            CHECK_JOBS_LOCK_PATH.unlink()
    except Exception as e:
        warn(f"Failed to remove check jobs lock file:\n{e}")
Stop the job monitoring thread.