meerschaum.jobs
Higher-level utilities for managing meerschaum.utils.daemon.Daemon.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Higher-level utilities for managing `meerschaum.utils.daemon.Daemon`.
"""

import meerschaum as mrsm
from meerschaum.utils.typing import Dict, Optional, List, SuccessTuple

from meerschaum.jobs._Job import Job, StopMonitoringLogs
from meerschaum.jobs._Executor import Executor

__all__ = (
    'Job',
    'StopMonitoringLogs',
    'systemd',
    'get_jobs',
    'get_filtered_jobs',
    'get_restart_jobs',
    'get_running_jobs',
    'get_stopped_jobs',
    'get_paused_jobs',
    'make_executor',
    'Executor',
    'check_restart_jobs',
    'start_check_jobs_thread',
    'stop_check_jobs_thread',
)

### Known executor types; `make_executor()` appends newly registered types.
executor_types: List[str] = ['api', 'local']


def get_jobs(
    executor_keys: Optional[str] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of the existing jobs.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, return remote jobs on the given API instance.
        Otherwise return local jobs.

    include_hidden: bool, default False
        If `True`, include jobs with the `hidden` attribute.

    combine_local_and_systemd: bool, default True
        If `True` and the context executor is `systemd`, merge `local`
        and `systemd` jobs into a single dictionary.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping job names to jobs.
    """
    from meerschaum.connectors.parse import parse_executor_keys
    executor_keys = executor_keys or get_executor_keys_from_context()

    include_local_and_systemd = (
        combine_local_and_systemd
        and str(executor_keys).split(':', maxsplit=1)[0] in ('None', 'local', 'systemd')
        and get_executor_keys_from_context() == 'systemd'
    )

    def _get_local_jobs():
        """Build `Job` objects from the on-disk daemons."""
        from meerschaum.utils.daemon import get_daemons
        daemons = get_daemons()
        jobs = {
            daemon.daemon_id: Job(name=daemon.daemon_id, executor_keys='local')
            for daemon in daemons
        }
        return {
            name: job
            for name, job in jobs.items()
            if (include_hidden or not job.hidden) and not job._is_externally_managed
        }

    def _get_systemd_jobs():
        """Fetch jobs from the systemd connector."""
        conn = mrsm.get_connector('systemd')
        jobs = conn.get_jobs(debug=debug)
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    if include_local_and_systemd:
        local_jobs = _get_local_jobs()
        systemd_jobs = _get_systemd_jobs()
        shared_jobs = set(local_jobs) & set(systemd_jobs)
        if shared_jobs:
            from meerschaum.utils.misc import items_str
            from meerschaum.utils.warnings import warn
            warn(
                "Job"
                + ('s' if len(shared_jobs) != 1 else '')
                + f" {items_str(list(shared_jobs))} "
                + "exist"
                + ('s' if len(shared_jobs) == 1 else '')
                + " in both `local` and `systemd`.",
                stack=False,
            )
        ### NOTE: systemd jobs take precedence over same-named local jobs.
        return {**local_jobs, **systemd_jobs}

    if executor_keys == 'local':
        return _get_local_jobs()

    if executor_keys == 'systemd':
        return _get_systemd_jobs()

    try:
        _ = parse_executor_keys(executor_keys, construct=False)
        conn = parse_executor_keys(executor_keys)
        jobs = conn.get_jobs(debug=debug)
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }
    except Exception:
        ### Best-effort: unreachable or invalid executors yield no jobs.
        return {}


def get_filtered_jobs(
    executor_keys: Optional[str] = None,
    filter_list: Optional[List[str]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    warn: bool = False,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of jobs filtered by the user.

    Parameters
    ----------
    filter_list: Optional[List[str]], default None
        Job names to select. Names prefixed with an underscore act as
        exclusions when no non-underscore names match.

    warn: bool, default False
        If `True`, emit a warning for each requested job that does not exist.

    Returns
    -------
    A dictionary mapping job names to jobs.
    """
    from meerschaum.utils.warnings import warn as _warn
    jobs = get_jobs(
        executor_keys,
        include_hidden=True,
        combine_local_and_systemd=combine_local_and_systemd,
        debug=debug,
    )
    if not filter_list:
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    jobs_to_return = {}
    filter_list_with_underscores = [name for name in filter_list if name.startswith('_')]
    for name in filter_list:
        job = jobs.get(name, None)
        if job is None:
            if warn:
                _warn(
                    f"Job '{name}' does not exist.",
                    stack=False,
                )
            continue
        jobs_to_return[name] = job

    ### Underscore-prefixed names invert the filter when nothing matched directly.
    if not jobs_to_return and filter_list_with_underscores:
        names_to_exclude = [name.lstrip('_') for name in filter_list_with_underscores]
        return {
            name: job
            for name, job in jobs.items()
            if name not in names_to_exclude
        }

    return jobs_to_return


def get_restart_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return jobs which were created with `--restart` or `--loop`.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    return {
        name: job
        for name, job in jobs.items()
        if job.restart
    }


def get_running_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of running jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    return {
        name: job
        for name, job in jobs.items()
        if job.status == 'running'
    }


def get_paused_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of paused jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    return {
        name: job
        for name, job in jobs.items()
        if job.status == 'paused'
    }


def get_stopped_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of stopped jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    return {
        name: job
        for name, job in jobs.items()
        if job.status == 'stopped'
    }


def make_executor(cls):
    """
    Register a class as an `Executor`.

    The executor type is the lowercased class name with any trailing
    'executor' suffix removed, and is appended to `executor_types`.
    """
    import re
    from meerschaum.connectors import make_connector
    suffix_regex = r'executor$'
    typ = re.sub(suffix_regex, '', cls.__name__.lower())
    if typ not in executor_types:
        executor_types.append(typ)
    return make_connector(cls, _is_executor=True)


def check_restart_jobs(
    executor_keys: Optional[str] = 'local',
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = True,
    silent: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Restart any stopped jobs which were created with `--restart`.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, check jobs on the given remote API instance.
        Otherwise check local jobs.

    include_hidden: bool, default True
        If `True`, include hidden jobs in the check.

    silent: bool, default False
        If `True`, do not print the restart success message.
    """
    from meerschaum.utils.misc import items_str

    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=False,
            debug=debug,
        )

    if not jobs:
        return True, "No jobs to restart."

    results = {}
    for name, job in jobs.items():
        check_success, check_msg = job.check_restart()
        results[job.name] = (check_success, check_msg)
        if not silent:
            mrsm.pprint((check_success, check_msg))

    success_names = [name for name, (check_success, check_msg) in results.items() if check_success]
    fail_names = [name for name, (check_success, check_msg) in results.items() if not check_success]
    success = len(success_names) == len(jobs)
    msg = (
        (
            "Successfully restarted job"
            + ('s' if len(success_names) != 1 else '')
            + ' ' + items_str(success_names) + '.'
        )
        if success
        ### Pluralize the failure message by the number of FAILED jobs.
        else (
            "Failed to restart job"
            + ('s' if len(fail_names) != 1 else '')
            + ' ' + items_str(fail_names) + '.'
        )
    )
    return success, msg


def _check_restart_jobs_against_lock(*args, **kwargs):
    """
    Run `check_restart_jobs()` behind an inter-process lock so that
    concurrent monitors do not race on restarting the same jobs.
    """
    from meerschaum.config.paths import CHECK_JOBS_LOCK_PATH
    fasteners = mrsm.attempt_import('fasteners', lazy=False)
    lock = fasteners.InterProcessLock(CHECK_JOBS_LOCK_PATH)
    with lock:
        check_restart_jobs(*args, **kwargs)


_check_loop_stop_thread = None
def start_check_jobs_thread():
    """
    Start a thread to regularly monitor jobs.
    """
    import atexit
    from functools import partial
    from meerschaum.utils.threading import RepeatTimer
    from meerschaum.config.static import STATIC_CONFIG

    global _check_loop_stop_thread
    sleep_seconds = STATIC_CONFIG['jobs']['check_restart_seconds']

    _check_loop_stop_thread = RepeatTimer(
        sleep_seconds,
        partial(
            _check_restart_jobs_against_lock,
            silent=True,
        )
    )
    _check_loop_stop_thread.daemon = True
    atexit.register(stop_check_jobs_thread)

    _check_loop_stop_thread.start()
    return _check_loop_stop_thread


def stop_check_jobs_thread():
    """
    Stop the job monitoring thread.
    """
    from meerschaum.config.paths import CHECK_JOBS_LOCK_PATH
    from meerschaum.utils.warnings import warn
    if _check_loop_stop_thread is None:
        return

    _check_loop_stop_thread.cancel()

    try:
        if CHECK_JOBS_LOCK_PATH.exists():
            CHECK_JOBS_LOCK_PATH.unlink()
    except Exception as e:
        warn(f"Failed to remove check jobs lock file:\n{e}")


_context_keys = None
def get_executor_keys_from_context() -> str:
    """
    If we are running on the host with the default root, default to `'systemd'`.
    Otherwise return `'local'`.

    A `meerschaum:executor` config value, when set, overrides the detection
    (and is intentionally not cached, so config changes take effect).
    """
    global _context_keys

    if _context_keys is not None:
        return _context_keys

    from meerschaum.config import get_config
    from meerschaum.config.paths import ROOT_DIR_PATH, DEFAULT_ROOT_DIR_PATH
    from meerschaum.utils.misc import is_systemd_available

    configured_executor = get_config('meerschaum', 'executor', warn=False)
    if configured_executor is not None:
        return configured_executor

    _context_keys = (
        'systemd'
        if is_systemd_available() and ROOT_DIR_PATH == DEFAULT_ROOT_DIR_PATH
        else 'local'
    )
    return _context_keys


def _install_healthcheck_job() -> SuccessTuple:
    """
    Install the systemd job which checks local jobs.
    """
    from meerschaum.config import get_config

    enable_healthcheck = get_config('system', 'experimental', 'systemd_healthcheck')
    if not enable_healthcheck:
        return False, "Local healthcheck is disabled."

    if get_executor_keys_from_context() != 'systemd':
        return False, "Not running systemd."

    job = Job(
        '.local-healthcheck',
        ['restart', 'jobs', '-e', 'local', '--loop', '--min-seconds', '60'],
        executor_keys='systemd',
    )
    return job.start()
class Job:
    """
    Manage a `meerschaum.utils.daemon.Daemon`, locally or remotely via the API.
    """

    def __init__(
        self,
        name: str,
        sysargs: Union[List[str], str, None] = None,
        env: Optional[Dict[str, str]] = None,
        executor_keys: Optional[str] = None,
        delete_after_completion: bool = False,
        _properties: Optional[Dict[str, Any]] = None,
        _rotating_log=None,
        _stdin_file=None,
        _status_hook: Optional[Callable[[], str]] = None,
        _result_hook: Optional[Callable[[], SuccessTuple]] = None,
        _externally_managed: bool = False,
    ):
        """
        Create a new job to manage a `meerschaum.utils.daemon.Daemon`.

        Parameters
        ----------
        name: str
            The name of the job to be created.
            This will also be used as the Daemon ID.

        sysargs: Union[List[str], str, None], default None
            The sysargs of the command to be executed, e.g. 'start api'.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the job's process.

        executor_keys: Optional[str], default None
            If provided, execute the job remotely on an API instance, e.g. 'api:main'.

        delete_after_completion: bool, default False
            If `True`, delete this job when it has finished executing.

        _properties: Optional[Dict[str, Any]], default None
            If provided, use this to patch the daemon's properties.

        Raises
        ------
        ValueError
            If `name` contains a character from `BANNED_CHARS`.
        """
        from meerschaum.utils.daemon import Daemon
        # Reject names which would produce invalid daemon IDs / paths.
        for char in BANNED_CHARS:
            if char in name:
                raise ValueError(f"Invalid name: ({char}) is not allowed.")

        if isinstance(sysargs, str):
            sysargs = shlex.split(sysargs)

        # Un-escape the 'and' separator token within the sysargs.
        and_key = STATIC_CONFIG['system']['arguments']['and_key']
        escaped_and_key = STATIC_CONFIG['system']['arguments']['escaped_and_key']
        if sysargs:
            sysargs = [
                (arg if arg != escaped_and_key else and_key)
                for arg in sysargs
            ]

        ### NOTE: 'local' and 'systemd' executors are being coalesced.
        if executor_keys is None:
            from meerschaum.jobs import get_executor_keys_from_context
            executor_keys = get_executor_keys_from_context()

        self.executor_keys = executor_keys
        self.name = name
        # Only bind an existing local daemon; remote jobs build one lazily
        # via the `daemon` property.
        try:
            self._daemon = (
                Daemon(daemon_id=name)
                if executor_keys == 'local'
                else None
            )
        except Exception:
            self._daemon = None

        ### Handle any injected dependencies.
        if _rotating_log is not None:
            self._rotating_log = _rotating_log
            if self._daemon is not None:
                self._daemon._rotating_log = _rotating_log

        if _stdin_file is not None:
            self._stdin_file = _stdin_file
            if self._daemon is not None:
                self._daemon._stdin_file = _stdin_file
                self._daemon._blocking_stdin_file_path = _stdin_file.blocking_file_path

        if _status_hook is not None:
            self._status_hook = _status_hook

        if _result_hook is not None:
            self._result_hook = _result_hook

        self._externally_managed = _externally_managed
        # Accumulate property overrides to apply when the daemon is (re)built.
        self._properties_patch = _properties or {}
        if _externally_managed:
            self._properties_patch.update({'externally_managed': _externally_managed})

        if env:
            self._properties_patch.update({'env': env})

        if delete_after_completion:
            self._properties_patch.update({'delete_after_completion': delete_after_completion})

        # Existing daemon sysargs (if any) win over the ones passed in.
        daemon_sysargs = (
            self._daemon.properties.get('target', {}).get('args', [None])[0]
            if self._daemon is not None
            else None
        )

        if daemon_sysargs and sysargs and daemon_sysargs != sysargs:
            warn("Given sysargs differ from existing sysargs.")

        # Strip daemonization flags; jobs manage daemonization themselves.
        self._sysargs = [
            arg
            for arg in (daemon_sysargs or sysargs or [])
            if arg not in ('-d', '--daemon')
        ]
        for restart_flag in RESTART_FLAGS:
            if restart_flag in self._sysargs:
                self._properties_patch.update({'restart': True})
                break

    @staticmethod
    def from_pid(pid: int, executor_keys: Optional[str] = None) -> Job:
        """
        Build a `Job` from the PID of a running Meerschaum process.

        Parameters
        ----------
        pid: int
            The PID of the process.

        executor_keys: Optional[str], default None
            The executor keys to assign to the job.

        Raises
        ------
        psutil.NoSuchProcess
            If no process exists for `pid`.
        EnvironmentError
            If the daemon's recorded PID differs from `pid` or is missing.
        """
        from meerschaum.config.paths import DAEMON_RESOURCES_PATH

        psutil = mrsm.attempt_import('psutil')
        try:
            process = psutil.Process(pid)
        except psutil.NoSuchProcess as e:
            warn(f"Process with PID {pid} does not exist.", stack=False)
            raise e

        command_args = process.cmdline()
        # NOTE(review): assumes cmdline has >= 2 entries ('python -c ...') —
        # confirm callers only pass Meerschaum PIDs.
        is_daemon = command_args[1] == '-c'

        if is_daemon:
            # Recover the daemon ID from the '-c' payload and verify the
            # on-disk PID file agrees with the observed process.
            daemon_id = command_args[-1].split('daemon_id=')[-1].split(')')[0].replace("'", '')
            root_dir = process.environ().get(STATIC_CONFIG['environment']['root'], None)
            if root_dir is None:
                from meerschaum.config.paths import ROOT_DIR_PATH
                root_dir = ROOT_DIR_PATH
            else:
                root_dir = pathlib.Path(root_dir)
            jobs_dir = root_dir / DAEMON_RESOURCES_PATH.name
            daemon_dir = jobs_dir / daemon_id
            pid_file = daemon_dir / 'process.pid'

            if pid_file.exists():
                with open(pid_file, 'r', encoding='utf-8') as f:
                    daemon_pid = int(f.read())

                if pid != daemon_pid:
                    raise EnvironmentError(f"Differing PIDs: {pid=}, {daemon_pid=}")
            else:
                raise EnvironmentError(f"Is job '{daemon_id}' running?")

            return Job(daemon_id, executor_keys=executor_keys)

        # Not a daemonized process: reconstruct sysargs from the command line
        # after the 'mrsm'/'meerschaum' entry point.
        from meerschaum._internal.arguments._parse_arguments import parse_arguments
        from meerschaum.utils.daemon import get_new_daemon_name

        mrsm_ix = 0
        for i, arg in enumerate(command_args):
            if 'mrsm' in arg or 'meerschaum' in arg.lower():
                mrsm_ix = i
                break

        sysargs = command_args[mrsm_ix+1:]
        kwargs = parse_arguments(sysargs)
        name = kwargs.get('name', get_new_daemon_name())
        return Job(name, sysargs, executor_keys=executor_keys)

    def start(self, debug: bool = False) -> SuccessTuple:
        """
        Start the job's daemon.

        Remote jobs are created on the executor if they do not yet exist;
        local jobs run the daemon directly.
        """
        if self.executor is not None:
            if not self.exists(debug=debug):
                return self.executor.create_job(
                    self.name,
                    self.sysargs,
                    properties=self.daemon.properties,
                    debug=debug,
                )
            return self.executor.start_job(self.name, debug=debug)

        if self.is_running():
            return True, f"{self} is already running."

        success, msg = self.daemon.run(
            keep_daemon_output=(not self.delete_after_completion),
            allow_dirty_run=True,
        )
        if not success:
            return success, msg

        return success, f"Started {self}."

    def stop(self, timeout_seconds: Optional[int] = None, debug: bool = False) -> SuccessTuple:
        """
        Stop the job's daemon.

        Attempts a graceful quit first, then falls back to killing the daemon.
        """
        if self.executor is not None:
            return self.executor.stop_job(self.name, debug=debug)

        if self.daemon.status == 'stopped':
            if not self.restart:
                return True, f"{self} is not running."
            elif self.stop_time is not None:
                # A manual stop file exists, so the restart loop is suspended.
                return True, f"{self} will not restart until manually started."

        quit_success, quit_msg = self.daemon.quit(timeout=timeout_seconds)
        if quit_success:
            return quit_success, f"Stopped {self}."

        warn(
            f"Failed to gracefully quit {self}.",
            stack=False,
        )
        kill_success, kill_msg = self.daemon.kill(timeout=timeout_seconds)
        if not kill_success:
            return kill_success, kill_msg

        return kill_success, f"Killed {self}."

    def pause(self, timeout_seconds: Optional[int] = None, debug: bool = False) -> SuccessTuple:
        """
        Pause the job's daemon.
        """
        if self.executor is not None:
            return self.executor.pause_job(self.name, debug=debug)

        pause_success, pause_msg = self.daemon.pause(timeout=timeout_seconds)
        if not pause_success:
            return pause_success, pause_msg

        return pause_success, f"Paused {self}."

    def delete(self, debug: bool = False) -> SuccessTuple:
        """
        Delete the job and its daemon.

        Stops the job first if it is running, then removes the daemon's files.
        """
        if self.executor is not None:
            return self.executor.delete_job(self.name, debug=debug)

        if self.is_running():
            stop_success, stop_msg = self.stop()
            if not stop_success:
                return stop_success, stop_msg

        cleanup_success, cleanup_msg = self.daemon.cleanup()
        if not cleanup_success:
            return cleanup_success, cleanup_msg

        # Drop the cached result so a recreated job starts fresh.
        _ = self.daemon._properties.pop('result', None)
        return cleanup_success, f"Deleted {self}."

    def is_running(self) -> bool:
        """
        Determine whether the job's daemon is running.
        """
        return self.status == 'running'

    def exists(self, debug: bool = False) -> bool:
        """
        Determine whether the job exists.
        """
        if self.executor is not None:
            return self.executor.get_job_exists(self.name, debug=debug)

        return self.daemon.path.exists()

    def get_logs(self) -> Union[str, None]:
        """
        Return the output text of the job's daemon.
        """
        if self.executor is not None:
            return self.executor.get_logs(self.name)

        return self.daemon.log_text

    def monitor_logs(
        self,
        callback_function: Callable[[str], None] = partial(print, end=''),
        input_callback_function: Optional[Callable[[], str]] = None,
        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
        stop_event: Optional[asyncio.Event] = None,
        stop_on_exit: bool = False,
        strip_timestamps: bool = False,
        accept_input: bool = True,
        debug: bool = False,
    ):
        """
        Monitor the job's log files and execute a callback on new lines.

        Parameters
        ----------
        callback_function: Callable[[str], None], default partial(print, end='')
            The callback to execute as new data comes in.
            Defaults to printing the output directly to `stdout`.

        input_callback_function: Optional[Callable[[], str]], default None
            If provided, execute this callback when the daemon is blocking on stdin.
            Defaults to `sys.stdin.readline()`.

        stop_callback_function: Optional[Callable[[SuccessTuple], None]], default None
            If provided, execute this callback when the daemon stops.
            The job's SuccessTuple will be passed to the callback.

        stop_event: Optional[asyncio.Event], default None
            If provided, stop monitoring when this event is set.
            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
            from within `callback_function` to stop monitoring.

        stop_on_exit: bool, default False
            If `True`, stop monitoring when the job stops.

        strip_timestamps: bool, default False
            If `True`, remove leading timestamps from lines.

        accept_input: bool, default True
            If `True`, accept input when the daemon blocks on stdin.
        """
        def default_input_callback_function():
            return sys.stdin.readline()

        if input_callback_function is None:
            input_callback_function = default_input_callback_function

        if self.executor is not None:
            self.executor.monitor_logs(
                self.name,
                callback_function,
                input_callback_function=input_callback_function,
                stop_callback_function=stop_callback_function,
                stop_on_exit=stop_on_exit,
                accept_input=accept_input,
                strip_timestamps=strip_timestamps,
                debug=debug,
            )
            return

        # Local jobs delegate to the async implementation.
        monitor_logs_coroutine = self.monitor_logs_async(
            callback_function=callback_function,
            input_callback_function=input_callback_function,
            stop_callback_function=stop_callback_function,
            stop_event=stop_event,
            stop_on_exit=stop_on_exit,
            strip_timestamps=strip_timestamps,
            accept_input=accept_input,
        )
        return asyncio.run(monitor_logs_coroutine)

    async def monitor_logs_async(
        self,
        callback_function: Callable[[str], None] = partial(print, end='', flush=True),
        input_callback_function: Optional[Callable[[], str]] = None,
        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
        stop_event: Optional[asyncio.Event] = None,
        stop_on_exit: bool = False,
        strip_timestamps: bool = False,
        accept_input: bool = True,
        _logs_path: Optional[pathlib.Path] = None,
        _log=None,
        _stdin_file=None,
        debug: bool = False,
    ):
        """
        Monitor the job's log files and await a callback on new lines.

        Parameters
        ----------
        callback_function: Callable[[str], None], default partial(print, end='')
            The callback to execute as new data comes in.
            Defaults to printing the output directly to `stdout`.

        input_callback_function: Optional[Callable[[], str]], default None
            If provided, execute this callback when the daemon is blocking on stdin.
            Defaults to `sys.stdin.readline()`.

        stop_callback_function: Optional[Callable[[SuccessTuple], None]], default None
            If provided, execute this callback when the daemon stops.
            The job's SuccessTuple will be passed to the callback.

        stop_event: Optional[asyncio.Event], default None
            If provided, stop monitoring when this event is set.
            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
            from within `callback_function` to stop monitoring.

        stop_on_exit: bool, default False
            If `True`, stop monitoring when the job stops.

        strip_timestamps: bool, default False
            If `True`, remove leading timestamps from lines.

        accept_input: bool, default True
            If `True`, accept input when the daemon blocks on stdin.
        """
        def default_input_callback_function():
            return sys.stdin.readline()

        if input_callback_function is None:
            input_callback_function = default_input_callback_function

        if self.executor is not None:
            await self.executor.monitor_logs_async(
                self.name,
                callback_function,
                input_callback_function=input_callback_function,
                stop_callback_function=stop_callback_function,
                stop_on_exit=stop_on_exit,
                strip_timestamps=strip_timestamps,
                accept_input=accept_input,
                debug=debug,
            )
            return

        from meerschaum.utils.formatting._jobs import strip_timestamp_from_line

        # Either event stops the file-watching loop via `combined_event`.
        events = {
            'user': stop_event,
            'stopped': asyncio.Event(),
        }
        combined_event = asyncio.Event()
        emitted_text = False
        stdin_file = _stdin_file if _stdin_file is not None else self.daemon.stdin_file

        async def check_job_status():
            # Poll (with backoff) until the job stops, then fire the
            # stop callback and optionally set the 'stopped' event.
            nonlocal emitted_text
            stopped_event = events.get('stopped', None)
            if stopped_event is None:
                return

            sleep_time = 0.1
            while sleep_time < 60:
                if self.status == 'stopped':
                    if not emitted_text:
                        # Wait for at least one emitted line before reacting.
                        await asyncio.sleep(sleep_time)
                        sleep_time = round(sleep_time * 1.1, 2)
                        continue

                    if stop_callback_function is not None:
                        try:
                            if asyncio.iscoroutinefunction(stop_callback_function):
                                await stop_callback_function(self.result)
                            else:
                                stop_callback_function(self.result)
                        except asyncio.exceptions.CancelledError:
                            break
                        except Exception:
                            warn(traceback.format_exc())

                    if stop_on_exit:
                        events['stopped'].set()

                    break
                await asyncio.sleep(0.1)

        async def check_blocking_on_input():
            # Forward user input to the daemon's stdin file whenever the
            # daemon blocks on stdin.
            while True:
                if not emitted_text or not self.is_blocking_on_stdin():
                    try:
                        await asyncio.sleep(0.1)
                    except asyncio.exceptions.CancelledError:
                        break
                    continue

                if not self.is_running():
                    break

                await emit_latest_lines()

                try:
                    print('', end='', flush=True)
                    if asyncio.iscoroutinefunction(input_callback_function):
                        data = await input_callback_function()
                    else:
                        data = input_callback_function()
                except KeyboardInterrupt:
                    break
                if not data.endswith('\n'):
                    data += '\n'

                stdin_file.write(data)
                await asyncio.sleep(0.1)

        async def combine_events():
            # Set `combined_event` as soon as any individual event fires.
            event_tasks = [
                asyncio.create_task(event.wait())
                for event in events.values()
                if event is not None
            ]
            if not event_tasks:
                return

            try:
                done, pending = await asyncio.wait(
                    event_tasks,
                    return_when=asyncio.FIRST_COMPLETED,
                )
                for task in pending:
                    task.cancel()
            except asyncio.exceptions.CancelledError:
                pass
            finally:
                combined_event.set()

        check_job_status_task = asyncio.create_task(check_job_status())
        check_blocking_on_input_task = asyncio.create_task(check_blocking_on_input())
        combine_events_task = asyncio.create_task(combine_events())

        log = _log if _log is not None else self.daemon.rotating_log
        lines_to_show = get_config('jobs', 'logs', 'lines_to_show')

        async def emit_latest_lines():
            # Emit the tail of the log through the callback.
            nonlocal emitted_text
            lines = log.readlines()
            for line in lines[(-1 * lines_to_show):]:
                if stop_event is not None and stop_event.is_set():
                    return

                if strip_timestamps:
                    line = strip_timestamp_from_line(line)

                try:
                    if asyncio.iscoroutinefunction(callback_function):
                        await callback_function(line)
                    else:
                        callback_function(line)
                    emitted_text = True
                except StopMonitoringLogs:
                    return
                except Exception:
                    warn(f"Error in logs callback:\n{traceback.format_exc()}")

        await emit_latest_lines()

        tasks = (
            [check_job_status_task]
            + ([check_blocking_on_input_task] if accept_input else [])
            + [combine_events_task]
        )
        try:
            # NOTE(review): `asyncio.gather` is not awaited here — the tasks
            # run concurrently with the watch loop below; confirm intentional.
            _ = asyncio.gather(*tasks, return_exceptions=True)
        except asyncio.exceptions.CancelledError:
            raise
        except Exception:
            warn(f"Failed to run async checks:\n{traceback.format_exc()}")

        # Watch the logs directory; re-emit whenever the active subfile changes.
        watchfiles = mrsm.attempt_import('watchfiles')
        async for changes in watchfiles.awatch(
            _logs_path or LOGS_RESOURCES_PATH,
            stop_event=combined_event,
        ):
            for change in changes:
                file_path_str = change[1]
                file_path = pathlib.Path(file_path_str)
                latest_subfile_path = log.get_latest_subfile_path()
                if latest_subfile_path != file_path:
                    continue

                await emit_latest_lines()

        await emit_latest_lines()

    def is_blocking_on_stdin(self, debug: bool = False) -> bool:
        """
        Return whether a job's daemon is blocking on stdin.
        """
        if self.executor is not None:
            return self.executor.get_job_is_blocking_on_stdin(self.name, debug=debug)

        return self.is_running() and self.daemon.blocking_stdin_file_path.exists()

    def write_stdin(self, data):
        """
        Write to a job's daemon's `stdin`.
        """
        self.daemon.stdin_file.write(data)

    @property
    def executor(self) -> Union[Executor, None]:
        """
        If the job is remote, return the connector to the remote API instance.
        """
        return (
            mrsm.get_connector(self.executor_keys)
            if self.executor_keys != 'local'
            else None
        )

    @property
    def status(self) -> str:
        """
        Return the running status of the job's daemon.
        """
        # Injected status hooks take precedence over executor / daemon state.
        if '_status_hook' in self.__dict__:
            return self._status_hook()

        if self.executor is not None:
            return self.executor.get_job_status(self.name)

        return self.daemon.status

    @property
    def pid(self) -> Union[int, None]:
        """
        Return the PID of the job's daemon.
        """
        if self.executor is not None:
            return self.executor.get_job_metadata(self.name).get('daemon', {}).get('pid', None)

        return self.daemon.pid

    @property
    def restart(self) -> bool:
        """
        Return whether to restart a stopped job.
        """
        if self.executor is not None:
            return self.executor.get_job_metadata(self.name).get('restart', False)

        return self.daemon.properties.get('restart', False)

    @property
    def result(self) -> SuccessTuple:
        """
        Return the `SuccessTuple` when the job has terminated.
        """
        if self.is_running():
            return True, f"{self} is running."

        if '_result_hook' in self.__dict__:
            return self._result_hook()

        if self.executor is not None:
            return (
                self.executor.get_job_metadata(self.name)
                .get('result', (False, "No result available."))
            )

        _result = self.daemon.properties.get('result', None)
        if _result is None:
            return False, "No result available."

        return tuple(_result)

    @property
    def sysargs(self) -> List[str]:
        """
        Return the sysargs to use for the Daemon.
        """
        if self._sysargs:
            return self._sysargs

        if self.executor is not None:
            return self.executor.get_job_metadata(self.name).get('sysargs', [])

        target_args = self.daemon.target_args
        if target_args is None:
            return []
        self._sysargs = target_args[0] if len(target_args) > 0 else []
        return self._sysargs

    @property
    def daemon(self) -> 'Daemon':
        """
        Return the daemon which this job manages.

        Rebuilds the daemon (applying `_properties_patch` over any remote
        properties) unless a valid local daemon is already cached.
        """
        from meerschaum.utils.daemon import Daemon
        if self._daemon is not None and self.executor is None and self._sysargs:
            return self._daemon

        remote_properties = (
            {}
            if self.executor is None
            else self.executor.get_job_properties(self.name)
        )
        properties = {**remote_properties, **self._properties_patch}

        self._daemon = Daemon(
            target=entry,
            target_args=[self._sysargs],
            target_kw={},
            daemon_id=self.name,
            label=shlex.join(self._sysargs),
            properties=properties,
        )
        # Re-apply injected dependencies to the freshly built daemon.
        if '_rotating_log' in self.__dict__:
            self._daemon._rotating_log = self._rotating_log

        if '_stdin_file' in self.__dict__:
            self._daemon._stdin_file = self._stdin_file
            self._daemon._blocking_stdin_file_path = self._stdin_file.blocking_file_path

        return self._daemon

    @property
    def began(self) -> Union[datetime, None]:
        """
        The datetime when the job began running.

        Remote timestamps are normalized to naive UTC; local timestamps are
        returned as stored.
        """
        if self.executor is not None:
            began_str = self.executor.get_job_began(self.name)
            if began_str is None:
                return None
            return (
                datetime.fromisoformat(began_str)
                .astimezone(timezone.utc)
                .replace(tzinfo=None)
            )

        began_str = self.daemon.properties.get('process', {}).get('began', None)
        if began_str is None:
            return None

        return datetime.fromisoformat(began_str)

    @property
    def ended(self) -> Union[datetime, None]:
        """
        The datetime when the job stopped running.

        Remote timestamps are normalized to naive UTC; local timestamps are
        returned as stored.
        """
        if self.executor is not None:
            ended_str = self.executor.get_job_ended(self.name)
            if ended_str is None:
                return None
            return (
                datetime.fromisoformat(ended_str)
                .astimezone(timezone.utc)
                .replace(tzinfo=None)
            )

        ended_str = self.daemon.properties.get('process', {}).get('ended', None)
        if ended_str is None:
            return None

        return datetime.fromisoformat(ended_str)

    @property
    def paused(self) -> Union[datetime, None]:
        """
        The datetime when the job was suspended while running.

        Remote timestamps are normalized to naive UTC; local timestamps are
        returned as stored.
        """
        if self.executor is not None:
            paused_str = self.executor.get_job_paused(self.name)
            if paused_str is None:
                return None
            return (
                datetime.fromisoformat(paused_str)
                .astimezone(timezone.utc)
                .replace(tzinfo=None)
            )

        paused_str = self.daemon.properties.get('process', {}).get('paused', None)
        if paused_str is None:
            return None

        return datetime.fromisoformat(paused_str)

    @property
    def stop_time(self) -> Union[datetime, None]:
        """
        Return the timestamp when the job was manually stopped.
        """
        if self.executor is not None:
            return self.executor.get_job_stop_time(self.name)

        if not self.daemon.stop_path.exists():
            return None

        stop_data = self.daemon._read_stop_file()
        if not stop_data:
            return None

        stop_time_str = stop_data.get('stop_time', None)
        if not stop_time_str:
            warn(f"Could not read stop time for {self}.")
            return None

        return datetime.fromisoformat(stop_time_str)

    @property
    def hidden(self) -> bool:
        """
        Return a bool indicating whether this job should be displayed.
        """
        return (
            self.name.startswith('_')
            or self.name.startswith('.')
            or self._is_externally_managed
        )

    def check_restart(self) -> SuccessTuple:
        """
        If `restart` is `True` and the daemon is not running,
        restart the job.
860 Do not restart if the job was manually stopped. 861 """ 862 if self.is_running(): 863 return True, f"{self} is running." 864 865 if not self.restart: 866 return True, f"{self} does not need to be restarted." 867 868 if self.stop_time is not None: 869 return True, f"{self} was manually stopped." 870 871 return self.start() 872 873 @property 874 def label(self) -> str: 875 """ 876 Return the job's Daemon label (joined sysargs). 877 """ 878 from meerschaum._internal.arguments import compress_pipeline_sysargs 879 sysargs = compress_pipeline_sysargs(self.sysargs) 880 return shlex.join(sysargs).replace(' + ', '\n+ ').replace(' : ', '\n: ').lstrip().rstrip() 881 882 @property 883 def _externally_managed_file(self) -> pathlib.Path: 884 """ 885 Return the path to the externally managed file. 886 """ 887 return self.daemon.path / '.externally-managed' 888 889 def _set_externally_managed(self): 890 """ 891 Set this job as externally managed. 892 """ 893 self._externally_managed = True 894 try: 895 self._externally_managed_file.parent.mkdir(exist_ok=True, parents=True) 896 self._externally_managed_file.touch() 897 except Exception as e: 898 warn(e) 899 900 @property 901 def _is_externally_managed(self) -> bool: 902 """ 903 Return whether this job is externally managed. 904 """ 905 return self.executor_keys in (None, 'local') and ( 906 self._externally_managed or self._externally_managed_file.exists() 907 ) 908 909 @property 910 def env(self) -> Dict[str, str]: 911 """ 912 Return the environment variables to set for the job's process. 
913 """ 914 if '_env' in self.__dict__: 915 return self.__dict__['_env'] 916 917 _env = self.daemon.properties.get('env', {}) 918 default_env = { 919 'PYTHONUNBUFFERED': '1', 920 'LINES': str(get_config('jobs', 'terminal', 'lines')), 921 'COLUMNS': str(get_config('jobs', 'terminal', 'columns')), 922 STATIC_CONFIG['environment']['noninteractive']: 'true', 923 } 924 self._env = {**default_env, **_env} 925 return self._env 926 927 @property 928 def delete_after_completion(self) -> bool: 929 """ 930 Return whether this job is configured to delete itself after completion. 931 """ 932 if '_delete_after_completion' in self.__dict__: 933 return self.__dict__.get('_delete_after_completion', False) 934 935 self._delete_after_completion = self.daemon.properties.get('delete_after_completion', False) 936 return self._delete_after_completion 937 938 def __str__(self) -> str: 939 sysargs = self.sysargs 940 sysargs_str = shlex.join(sysargs) if sysargs else '' 941 job_str = f'Job("{self.name}"' 942 if sysargs_str: 943 job_str += f', "{sysargs_str}"' 944 945 job_str += ')' 946 return job_str 947 948 def __repr__(self) -> str: 949 return str(self) 950 951 def __hash__(self) -> int: 952 return hash(self.name)
Manage a meerschaum.utils.daemon.Daemon
, locally or remotely via the API.
55 def __init__( 56 self, 57 name: str, 58 sysargs: Union[List[str], str, None] = None, 59 env: Optional[Dict[str, str]] = None, 60 executor_keys: Optional[str] = None, 61 delete_after_completion: bool = False, 62 _properties: Optional[Dict[str, Any]] = None, 63 _rotating_log=None, 64 _stdin_file=None, 65 _status_hook: Optional[Callable[[], str]] = None, 66 _result_hook: Optional[Callable[[], SuccessTuple]] = None, 67 _externally_managed: bool = False, 68 ): 69 """ 70 Create a new job to manage a `meerschaum.utils.daemon.Daemon`. 71 72 Parameters 73 ---------- 74 name: str 75 The name of the job to be created. 76 This will also be used as the Daemon ID. 77 78 sysargs: Union[List[str], str, None], default None 79 The sysargs of the command to be executed, e.g. 'start api'. 80 81 env: Optional[Dict[str, str]], default None 82 If provided, set these environment variables in the job's process. 83 84 executor_keys: Optional[str], default None 85 If provided, execute the job remotely on an API instance, e.g. 'api:main'. 86 87 delete_after_completion: bool, default False 88 If `True`, delete this job when it has finished executing. 89 90 _properties: Optional[Dict[str, Any]], default None 91 If provided, use this to patch the daemon's properties. 92 """ 93 from meerschaum.utils.daemon import Daemon 94 for char in BANNED_CHARS: 95 if char in name: 96 raise ValueError(f"Invalid name: ({char}) is not allowed.") 97 98 if isinstance(sysargs, str): 99 sysargs = shlex.split(sysargs) 100 101 and_key = STATIC_CONFIG['system']['arguments']['and_key'] 102 escaped_and_key = STATIC_CONFIG['system']['arguments']['escaped_and_key'] 103 if sysargs: 104 sysargs = [ 105 (arg if arg != escaped_and_key else and_key) 106 for arg in sysargs 107 ] 108 109 ### NOTE: 'local' and 'systemd' executors are being coalesced. 
110 if executor_keys is None: 111 from meerschaum.jobs import get_executor_keys_from_context 112 executor_keys = get_executor_keys_from_context() 113 114 self.executor_keys = executor_keys 115 self.name = name 116 try: 117 self._daemon = ( 118 Daemon(daemon_id=name) 119 if executor_keys == 'local' 120 else None 121 ) 122 except Exception: 123 self._daemon = None 124 125 ### Handle any injected dependencies. 126 if _rotating_log is not None: 127 self._rotating_log = _rotating_log 128 if self._daemon is not None: 129 self._daemon._rotating_log = _rotating_log 130 131 if _stdin_file is not None: 132 self._stdin_file = _stdin_file 133 if self._daemon is not None: 134 self._daemon._stdin_file = _stdin_file 135 self._daemon._blocking_stdin_file_path = _stdin_file.blocking_file_path 136 137 if _status_hook is not None: 138 self._status_hook = _status_hook 139 140 if _result_hook is not None: 141 self._result_hook = _result_hook 142 143 self._externally_managed = _externally_managed 144 self._properties_patch = _properties or {} 145 if _externally_managed: 146 self._properties_patch.update({'externally_managed': _externally_managed}) 147 148 if env: 149 self._properties_patch.update({'env': env}) 150 151 if delete_after_completion: 152 self._properties_patch.update({'delete_after_completion': delete_after_completion}) 153 154 daemon_sysargs = ( 155 self._daemon.properties.get('target', {}).get('args', [None])[0] 156 if self._daemon is not None 157 else None 158 ) 159 160 if daemon_sysargs and sysargs and daemon_sysargs != sysargs: 161 warn("Given sysargs differ from existing sysargs.") 162 163 self._sysargs = [ 164 arg 165 for arg in (daemon_sysargs or sysargs or []) 166 if arg not in ('-d', '--daemon') 167 ] 168 for restart_flag in RESTART_FLAGS: 169 if restart_flag in self._sysargs: 170 self._properties_patch.update({'restart': True}) 171 break
Create a new job to manage a meerschaum.utils.daemon.Daemon
.
Parameters
- name (str): The name of the job to be created. This will also be used as the Daemon ID.
- sysargs (Union[List[str], str, None], default None): The sysargs of the command to be executed, e.g. 'start api'.
- env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the job's process.
- executor_keys (Optional[str], default None): If provided, execute the job remotely on an API instance, e.g. 'api:main'.
- delete_after_completion (bool, default False):
If
True
, delete this job when it has finished executing. - _properties (Optional[Dict[str, Any]], default None): If provided, use this to patch the daemon's properties.
173 @staticmethod 174 def from_pid(pid: int, executor_keys: Optional[str] = None) -> Job: 175 """ 176 Build a `Job` from the PID of a running Meerschaum process. 177 178 Parameters 179 ---------- 180 pid: int 181 The PID of the process. 182 183 executor_keys: Optional[str], default None 184 The executor keys to assign to the job. 185 """ 186 from meerschaum.config.paths import DAEMON_RESOURCES_PATH 187 188 psutil = mrsm.attempt_import('psutil') 189 try: 190 process = psutil.Process(pid) 191 except psutil.NoSuchProcess as e: 192 warn(f"Process with PID {pid} does not exist.", stack=False) 193 raise e 194 195 command_args = process.cmdline() 196 is_daemon = command_args[1] == '-c' 197 198 if is_daemon: 199 daemon_id = command_args[-1].split('daemon_id=')[-1].split(')')[0].replace("'", '') 200 root_dir = process.environ().get(STATIC_CONFIG['environment']['root'], None) 201 if root_dir is None: 202 from meerschaum.config.paths import ROOT_DIR_PATH 203 root_dir = ROOT_DIR_PATH 204 else: 205 root_dir = pathlib.Path(root_dir) 206 jobs_dir = root_dir / DAEMON_RESOURCES_PATH.name 207 daemon_dir = jobs_dir / daemon_id 208 pid_file = daemon_dir / 'process.pid' 209 210 if pid_file.exists(): 211 with open(pid_file, 'r', encoding='utf-8') as f: 212 daemon_pid = int(f.read()) 213 214 if pid != daemon_pid: 215 raise EnvironmentError(f"Differing PIDs: {pid=}, {daemon_pid=}") 216 else: 217 raise EnvironmentError(f"Is job '{daemon_id}' running?") 218 219 return Job(daemon_id, executor_keys=executor_keys) 220 221 from meerschaum._internal.arguments._parse_arguments import parse_arguments 222 from meerschaum.utils.daemon import get_new_daemon_name 223 224 mrsm_ix = 0 225 for i, arg in enumerate(command_args): 226 if 'mrsm' in arg or 'meerschaum' in arg.lower(): 227 mrsm_ix = i 228 break 229 230 sysargs = command_args[mrsm_ix+1:] 231 kwargs = parse_arguments(sysargs) 232 name = kwargs.get('name', get_new_daemon_name()) 233 return Job(name, sysargs, executor_keys=executor_keys)
Build a Job
from the PID of a running Meerschaum process.
Parameters
- pid (int): The PID of the process.
- executor_keys (Optional[str], default None): The executor keys to assign to the job.
235 def start(self, debug: bool = False) -> SuccessTuple: 236 """ 237 Start the job's daemon. 238 """ 239 if self.executor is not None: 240 if not self.exists(debug=debug): 241 return self.executor.create_job( 242 self.name, 243 self.sysargs, 244 properties=self.daemon.properties, 245 debug=debug, 246 ) 247 return self.executor.start_job(self.name, debug=debug) 248 249 if self.is_running(): 250 return True, f"{self} is already running." 251 252 success, msg = self.daemon.run( 253 keep_daemon_output=(not self.delete_after_completion), 254 allow_dirty_run=True, 255 ) 256 if not success: 257 return success, msg 258 259 return success, f"Started {self}."
Start the job's daemon.
261 def stop(self, timeout_seconds: Optional[int] = None, debug: bool = False) -> SuccessTuple: 262 """ 263 Stop the job's daemon. 264 """ 265 if self.executor is not None: 266 return self.executor.stop_job(self.name, debug=debug) 267 268 if self.daemon.status == 'stopped': 269 if not self.restart: 270 return True, f"{self} is not running." 271 elif self.stop_time is not None: 272 return True, f"{self} will not restart until manually started." 273 274 quit_success, quit_msg = self.daemon.quit(timeout=timeout_seconds) 275 if quit_success: 276 return quit_success, f"Stopped {self}." 277 278 warn( 279 f"Failed to gracefully quit {self}.", 280 stack=False, 281 ) 282 kill_success, kill_msg = self.daemon.kill(timeout=timeout_seconds) 283 if not kill_success: 284 return kill_success, kill_msg 285 286 return kill_success, f"Killed {self}."
Stop the job's daemon.
288 def pause(self, timeout_seconds: Optional[int] = None, debug: bool = False) -> SuccessTuple: 289 """ 290 Pause the job's daemon. 291 """ 292 if self.executor is not None: 293 return self.executor.pause_job(self.name, debug=debug) 294 295 pause_success, pause_msg = self.daemon.pause(timeout=timeout_seconds) 296 if not pause_success: 297 return pause_success, pause_msg 298 299 return pause_success, f"Paused {self}."
Pause the job's daemon.
301 def delete(self, debug: bool = False) -> SuccessTuple: 302 """ 303 Delete the job and its daemon. 304 """ 305 if self.executor is not None: 306 return self.executor.delete_job(self.name, debug=debug) 307 308 if self.is_running(): 309 stop_success, stop_msg = self.stop() 310 if not stop_success: 311 return stop_success, stop_msg 312 313 cleanup_success, cleanup_msg = self.daemon.cleanup() 314 if not cleanup_success: 315 return cleanup_success, cleanup_msg 316 317 _ = self.daemon._properties.pop('result', None) 318 return cleanup_success, f"Deleted {self}."
Delete the job and its daemon.
320 def is_running(self) -> bool: 321 """ 322 Determine whether the job's daemon is running. 323 """ 324 return self.status == 'running'
Determine whether the job's daemon is running.
326 def exists(self, debug: bool = False) -> bool: 327 """ 328 Determine whether the job exists. 329 """ 330 if self.executor is not None: 331 return self.executor.get_job_exists(self.name, debug=debug) 332 333 return self.daemon.path.exists()
Determine whether the job exists.
335 def get_logs(self) -> Union[str, None]: 336 """ 337 Return the output text of the job's daemon. 338 """ 339 if self.executor is not None: 340 return self.executor.get_logs(self.name) 341 342 return self.daemon.log_text
Return the output text of the job's daemon.
344 def monitor_logs( 345 self, 346 callback_function: Callable[[str], None] = partial(print, end=''), 347 input_callback_function: Optional[Callable[[], str]] = None, 348 stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None, 349 stop_event: Optional[asyncio.Event] = None, 350 stop_on_exit: bool = False, 351 strip_timestamps: bool = False, 352 accept_input: bool = True, 353 debug: bool = False, 354 ): 355 """ 356 Monitor the job's log files and execute a callback on new lines. 357 358 Parameters 359 ---------- 360 callback_function: Callable[[str], None], default partial(print, end='') 361 The callback to execute as new data comes in. 362 Defaults to printing the output directly to `stdout`. 363 364 input_callback_function: Optional[Callable[[], str]], default None 365 If provided, execute this callback when the daemon is blocking on stdin. 366 Defaults to `sys.stdin.readline()`. 367 368 stop_callback_function: Optional[Callable[[SuccessTuple]], str], default None 369 If provided, execute this callback when the daemon stops. 370 The job's SuccessTuple will be passed to the callback. 371 372 stop_event: Optional[asyncio.Event], default None 373 If provided, stop monitoring when this event is set. 374 You may instead raise `meerschaum.jobs.StopMonitoringLogs` 375 from within `callback_function` to stop monitoring. 376 377 stop_on_exit: bool, default False 378 If `True`, stop monitoring when the job stops. 379 380 strip_timestamps: bool, default False 381 If `True`, remove leading timestamps from lines. 382 383 accept_input: bool, default True 384 If `True`, accept input when the daemon blocks on stdin. 
385 """ 386 def default_input_callback_function(): 387 return sys.stdin.readline() 388 389 if input_callback_function is None: 390 input_callback_function = default_input_callback_function 391 392 if self.executor is not None: 393 self.executor.monitor_logs( 394 self.name, 395 callback_function, 396 input_callback_function=input_callback_function, 397 stop_callback_function=stop_callback_function, 398 stop_on_exit=stop_on_exit, 399 accept_input=accept_input, 400 strip_timestamps=strip_timestamps, 401 debug=debug, 402 ) 403 return 404 405 monitor_logs_coroutine = self.monitor_logs_async( 406 callback_function=callback_function, 407 input_callback_function=input_callback_function, 408 stop_callback_function=stop_callback_function, 409 stop_event=stop_event, 410 stop_on_exit=stop_on_exit, 411 strip_timestamps=strip_timestamps, 412 accept_input=accept_input, 413 ) 414 return asyncio.run(monitor_logs_coroutine)
Monitor the job's log files and execute a callback on new lines.
Parameters
- callback_function (Callable[[str], None], default partial(print, end='')):
The callback to execute as new data comes in.
Defaults to printing the output directly to
stdout
. - input_callback_function (Optional[Callable[[], str]], default None):
If provided, execute this callback when the daemon is blocking on stdin.
Defaults to
sys.stdin.readline()
. - stop_callback_function (Optional[Callable[[SuccessTuple], None]], default None): If provided, execute this callback when the daemon stops. The job's SuccessTuple will be passed to the callback.
- stop_event (Optional[asyncio.Event], default None):
If provided, stop monitoring when this event is set.
You may instead raise
meerschaum.jobs.StopMonitoringLogs
from within callback_function
to stop monitoring. - stop_on_exit (bool, default False):
If
True
, stop monitoring when the job stops. - strip_timestamps (bool, default False):
If
True
, remove leading timestamps from lines. - accept_input (bool, default True):
If
True
, accept input when the daemon blocks on stdin.
416 async def monitor_logs_async( 417 self, 418 callback_function: Callable[[str], None] = partial(print, end='', flush=True), 419 input_callback_function: Optional[Callable[[], str]] = None, 420 stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None, 421 stop_event: Optional[asyncio.Event] = None, 422 stop_on_exit: bool = False, 423 strip_timestamps: bool = False, 424 accept_input: bool = True, 425 _logs_path: Optional[pathlib.Path] = None, 426 _log=None, 427 _stdin_file=None, 428 debug: bool = False, 429 ): 430 """ 431 Monitor the job's log files and await a callback on new lines. 432 433 Parameters 434 ---------- 435 callback_function: Callable[[str], None], default partial(print, end='') 436 The callback to execute as new data comes in. 437 Defaults to printing the output directly to `stdout`. 438 439 input_callback_function: Optional[Callable[[], str]], default None 440 If provided, execute this callback when the daemon is blocking on stdin. 441 Defaults to `sys.stdin.readline()`. 442 443 stop_callback_function: Optional[Callable[[SuccessTuple]], str], default None 444 If provided, execute this callback when the daemon stops. 445 The job's SuccessTuple will be passed to the callback. 446 447 stop_event: Optional[asyncio.Event], default None 448 If provided, stop monitoring when this event is set. 449 You may instead raise `meerschaum.jobs.StopMonitoringLogs` 450 from within `callback_function` to stop monitoring. 451 452 stop_on_exit: bool, default False 453 If `True`, stop monitoring when the job stops. 454 455 strip_timestamps: bool, default False 456 If `True`, remove leading timestamps from lines. 457 458 accept_input: bool, default True 459 If `True`, accept input when the daemon blocks on stdin. 
460 """ 461 def default_input_callback_function(): 462 return sys.stdin.readline() 463 464 if input_callback_function is None: 465 input_callback_function = default_input_callback_function 466 467 if self.executor is not None: 468 await self.executor.monitor_logs_async( 469 self.name, 470 callback_function, 471 input_callback_function=input_callback_function, 472 stop_callback_function=stop_callback_function, 473 stop_on_exit=stop_on_exit, 474 strip_timestamps=strip_timestamps, 475 accept_input=accept_input, 476 debug=debug, 477 ) 478 return 479 480 from meerschaum.utils.formatting._jobs import strip_timestamp_from_line 481 482 events = { 483 'user': stop_event, 484 'stopped': asyncio.Event(), 485 } 486 combined_event = asyncio.Event() 487 emitted_text = False 488 stdin_file = _stdin_file if _stdin_file is not None else self.daemon.stdin_file 489 490 async def check_job_status(): 491 nonlocal emitted_text 492 stopped_event = events.get('stopped', None) 493 if stopped_event is None: 494 return 495 496 sleep_time = 0.1 497 while sleep_time < 60: 498 if self.status == 'stopped': 499 if not emitted_text: 500 await asyncio.sleep(sleep_time) 501 sleep_time = round(sleep_time * 1.1, 2) 502 continue 503 504 if stop_callback_function is not None: 505 try: 506 if asyncio.iscoroutinefunction(stop_callback_function): 507 await stop_callback_function(self.result) 508 else: 509 stop_callback_function(self.result) 510 except asyncio.exceptions.CancelledError: 511 break 512 except Exception: 513 warn(traceback.format_exc()) 514 515 if stop_on_exit: 516 events['stopped'].set() 517 518 break 519 await asyncio.sleep(0.1) 520 521 async def check_blocking_on_input(): 522 while True: 523 if not emitted_text or not self.is_blocking_on_stdin(): 524 try: 525 await asyncio.sleep(0.1) 526 except asyncio.exceptions.CancelledError: 527 break 528 continue 529 530 if not self.is_running(): 531 break 532 533 await emit_latest_lines() 534 535 try: 536 print('', end='', flush=True) 537 if 
asyncio.iscoroutinefunction(input_callback_function): 538 data = await input_callback_function() 539 else: 540 data = input_callback_function() 541 except KeyboardInterrupt: 542 break 543 if not data.endswith('\n'): 544 data += '\n' 545 546 stdin_file.write(data) 547 await asyncio.sleep(0.1) 548 549 async def combine_events(): 550 event_tasks = [ 551 asyncio.create_task(event.wait()) 552 for event in events.values() 553 if event is not None 554 ] 555 if not event_tasks: 556 return 557 558 try: 559 done, pending = await asyncio.wait( 560 event_tasks, 561 return_when=asyncio.FIRST_COMPLETED, 562 ) 563 for task in pending: 564 task.cancel() 565 except asyncio.exceptions.CancelledError: 566 pass 567 finally: 568 combined_event.set() 569 570 check_job_status_task = asyncio.create_task(check_job_status()) 571 check_blocking_on_input_task = asyncio.create_task(check_blocking_on_input()) 572 combine_events_task = asyncio.create_task(combine_events()) 573 574 log = _log if _log is not None else self.daemon.rotating_log 575 lines_to_show = get_config('jobs', 'logs', 'lines_to_show') 576 577 async def emit_latest_lines(): 578 nonlocal emitted_text 579 lines = log.readlines() 580 for line in lines[(-1 * lines_to_show):]: 581 if stop_event is not None and stop_event.is_set(): 582 return 583 584 if strip_timestamps: 585 line = strip_timestamp_from_line(line) 586 587 try: 588 if asyncio.iscoroutinefunction(callback_function): 589 await callback_function(line) 590 else: 591 callback_function(line) 592 emitted_text = True 593 except StopMonitoringLogs: 594 return 595 except Exception: 596 warn(f"Error in logs callback:\n{traceback.format_exc()}") 597 598 await emit_latest_lines() 599 600 tasks = ( 601 [check_job_status_task] 602 + ([check_blocking_on_input_task] if accept_input else []) 603 + [combine_events_task] 604 ) 605 try: 606 _ = asyncio.gather(*tasks, return_exceptions=True) 607 except asyncio.exceptions.CancelledError: 608 raise 609 except Exception: 610 warn(f"Failed to 
run async checks:\n{traceback.format_exc()}") 611 612 watchfiles = mrsm.attempt_import('watchfiles') 613 async for changes in watchfiles.awatch( 614 _logs_path or LOGS_RESOURCES_PATH, 615 stop_event=combined_event, 616 ): 617 for change in changes: 618 file_path_str = change[1] 619 file_path = pathlib.Path(file_path_str) 620 latest_subfile_path = log.get_latest_subfile_path() 621 if latest_subfile_path != file_path: 622 continue 623 624 await emit_latest_lines() 625 626 await emit_latest_lines()
Monitor the job's log files and await a callback on new lines.
Parameters
- callback_function (Callable[[str], None], default partial(print, end='')):
The callback to execute as new data comes in.
Defaults to printing the output directly to
stdout
. - input_callback_function (Optional[Callable[[], str]], default None):
If provided, execute this callback when the daemon is blocking on stdin.
Defaults to
sys.stdin.readline()
. - stop_callback_function (Optional[Callable[[SuccessTuple], None]], default None): If provided, execute this callback when the daemon stops. The job's SuccessTuple will be passed to the callback.
- stop_event (Optional[asyncio.Event], default None):
If provided, stop monitoring when this event is set.
You may instead raise
meerschaum.jobs.StopMonitoringLogs
from within callback_function
to stop monitoring. - stop_on_exit (bool, default False):
If
True
, stop monitoring when the job stops. - strip_timestamps (bool, default False):
If
True
, remove leading timestamps from lines. - accept_input (bool, default True):
If
True
, accept input when the daemon blocks on stdin.
628 def is_blocking_on_stdin(self, debug: bool = False) -> bool: 629 """ 630 Return whether a job's daemon is blocking on stdin. 631 """ 632 if self.executor is not None: 633 return self.executor.get_job_is_blocking_on_stdin(self.name, debug=debug) 634 635 return self.is_running() and self.daemon.blocking_stdin_file_path.exists()
Return whether a job's daemon is blocking on stdin.
637 def write_stdin(self, data): 638 """ 639 Write to a job's daemon's `stdin`. 640 """ 641 self.daemon.stdin_file.write(data)
Write to a job's daemon's stdin
.
643 @property 644 def executor(self) -> Union[Executor, None]: 645 """ 646 If the job is remote, return the connector to the remote API instance. 647 """ 648 return ( 649 mrsm.get_connector(self.executor_keys) 650 if self.executor_keys != 'local' 651 else None 652 )
If the job is remote, return the connector to the remote API instance.
654 @property 655 def status(self) -> str: 656 """ 657 Return the running status of the job's daemon. 658 """ 659 if '_status_hook' in self.__dict__: 660 return self._status_hook() 661 662 if self.executor is not None: 663 return self.executor.get_job_status(self.name) 664 665 return self.daemon.status
Return the running status of the job's daemon.
667 @property 668 def pid(self) -> Union[int, None]: 669 """ 670 Return the PID of the job's dameon. 671 """ 672 if self.executor is not None: 673 return self.executor.get_job_metadata(self.name).get('daemon', {}).get('pid', None) 674 675 return self.daemon.pid
Return the PID of the job's daemon.
677 @property 678 def restart(self) -> bool: 679 """ 680 Return whether to restart a stopped job. 681 """ 682 if self.executor is not None: 683 return self.executor.get_job_metadata(self.name).get('restart', False) 684 685 return self.daemon.properties.get('restart', False)
Return whether to restart a stopped job.
687 @property 688 def result(self) -> SuccessTuple: 689 """ 690 Return the `SuccessTuple` when the job has terminated. 691 """ 692 if self.is_running(): 693 return True, f"{self} is running." 694 695 if '_result_hook' in self.__dict__: 696 return self._result_hook() 697 698 if self.executor is not None: 699 return ( 700 self.executor.get_job_metadata(self.name) 701 .get('result', (False, "No result available.")) 702 ) 703 704 _result = self.daemon.properties.get('result', None) 705 if _result is None: 706 return False, "No result available." 707 708 return tuple(_result)
Return the SuccessTuple
when the job has terminated.
710 @property 711 def sysargs(self) -> List[str]: 712 """ 713 Return the sysargs to use for the Daemon. 714 """ 715 if self._sysargs: 716 return self._sysargs 717 718 if self.executor is not None: 719 return self.executor.get_job_metadata(self.name).get('sysargs', []) 720 721 target_args = self.daemon.target_args 722 if target_args is None: 723 return [] 724 self._sysargs = target_args[0] if len(target_args) > 0 else [] 725 return self._sysargs
Return the sysargs to use for the Daemon.
727 @property 728 def daemon(self) -> 'Daemon': 729 """ 730 Return the daemon which this job manages. 731 """ 732 from meerschaum.utils.daemon import Daemon 733 if self._daemon is not None and self.executor is None and self._sysargs: 734 return self._daemon 735 736 remote_properties = ( 737 {} 738 if self.executor is None 739 else self.executor.get_job_properties(self.name) 740 ) 741 properties = {**remote_properties, **self._properties_patch} 742 743 self._daemon = Daemon( 744 target=entry, 745 target_args=[self._sysargs], 746 target_kw={}, 747 daemon_id=self.name, 748 label=shlex.join(self._sysargs), 749 properties=properties, 750 ) 751 if '_rotating_log' in self.__dict__: 752 self._daemon._rotating_log = self._rotating_log 753 754 if '_stdin_file' in self.__dict__: 755 self._daemon._stdin_file = self._stdin_file 756 self._daemon._blocking_stdin_file_path = self._stdin_file.blocking_file_path 757 758 return self._daemon
Return the daemon which this job manages.
760 @property 761 def began(self) -> Union[datetime, None]: 762 """ 763 The datetime when the job began running. 764 """ 765 if self.executor is not None: 766 began_str = self.executor.get_job_began(self.name) 767 if began_str is None: 768 return None 769 return ( 770 datetime.fromisoformat(began_str) 771 .astimezone(timezone.utc) 772 .replace(tzinfo=None) 773 ) 774 775 began_str = self.daemon.properties.get('process', {}).get('began', None) 776 if began_str is None: 777 return None 778 779 return datetime.fromisoformat(began_str)
The datetime when the job began running.
781 @property 782 def ended(self) -> Union[datetime, None]: 783 """ 784 The datetime when the job stopped running. 785 """ 786 if self.executor is not None: 787 ended_str = self.executor.get_job_ended(self.name) 788 if ended_str is None: 789 return None 790 return ( 791 datetime.fromisoformat(ended_str) 792 .astimezone(timezone.utc) 793 .replace(tzinfo=None) 794 ) 795 796 ended_str = self.daemon.properties.get('process', {}).get('ended', None) 797 if ended_str is None: 798 return None 799 800 return datetime.fromisoformat(ended_str)
The datetime when the job stopped running.
@property
def paused(self) -> Union[datetime, None]:
    """
    Return the datetime when the job was suspended while running,
    or `None` if it was never paused.
    """
    if self.executor is not None:
        raw = self.executor.get_job_paused(self.name)
        if raw is None:
            return None
        # Remote timestamps are normalized to naive UTC.
        utc_dt = datetime.fromisoformat(raw).astimezone(timezone.utc)
        return utc_dt.replace(tzinfo=None)

    raw = self.daemon.properties.get('process', {}).get('paused', None)
    if raw is None:
        return None
    return datetime.fromisoformat(raw)
The datetime when the job was suspended while running.
@property
def stop_time(self) -> Union[datetime, None]:
    """
    Return the timestamp when the job was manually stopped,
    or `None` if no stop has been recorded.
    """
    if self.executor is not None:
        # Remote jobs report their stop time via the executor.
        return self.executor.get_job_stop_time(self.name)

    daemon = self.daemon
    # Only read the stop file if it exists.
    stop_data = daemon._read_stop_file() if daemon.stop_path.exists() else None
    if not stop_data:
        return None

    stop_time_str = stop_data.get('stop_time', None)
    if stop_time_str:
        return datetime.fromisoformat(stop_time_str)

    warn(f"Could not read stop time for {self}.")
    return None
Return the timestamp when the job was manually stopped.
def check_restart(self) -> SuccessTuple:
    """
    Restart the job when it was created with `--restart` and is not running.
    A job which was manually stopped is never restarted.

    Returns
    -------
    A `SuccessTuple` describing whether a restart was needed and succeeded.
    """
    if self.is_running():
        return True, f"{self} is running."

    # Only restart-enabled jobs that were not manually stopped qualify.
    needs_restart = self.restart and self.stop_time is None
    if not needs_restart:
        msg = (
            f"{self} was manually stopped."
            if self.restart
            else f"{self} does not need to be restarted."
        )
        return True, msg

    return self.start()
If `restart` is `True` and the daemon is not running, restart the job.
Do not restart if the job was manually stopped.
@property
def label(self) -> str:
    """
    Return the job's Daemon label (the joined sysargs), with pipeline
    separators broken onto their own lines for readability.
    """
    from meerschaum._internal.arguments import compress_pipeline_sysargs
    sysargs = compress_pipeline_sysargs(self.sysargs)
    # NOTE: `.strip()` replaces the redundant `.lstrip().rstrip()` chain.
    return shlex.join(sysargs).replace(' + ', '\n+ ').replace(' : ', '\n: ').strip()
Return the job's Daemon label (joined sysargs).
@property
def env(self) -> Dict[str, str]:
    """
    Return the environment variables to set for the job's process,
    caching the merged result on the instance.
    """
    # Return the cached environment when it has already been built.
    if '_env' in self.__dict__:
        return self.__dict__['_env']

    job_env = self.daemon.properties.get('env', {})
    base_env = {
        'PYTHONUNBUFFERED': '1',
        'LINES': str(get_config('jobs', 'terminal', 'lines')),
        'COLUMNS': str(get_config('jobs', 'terminal', 'columns')),
        STATIC_CONFIG['environment']['noninteractive']: 'true',
    }
    # Job-specific variables override the defaults.
    self._env = {**base_env, **job_env}
    return self._env
Return the environment variables to set for the job's process.
@property
def delete_after_completion(self) -> bool:
    """
    Return whether this job is configured to delete itself once it completes.
    """
    # EAFP: return the cached flag when present.
    try:
        return self.__dict__['_delete_after_completion']
    except KeyError:
        pass

    flag = self.daemon.properties.get('delete_after_completion', False)
    self._delete_after_completion = flag
    return flag
Return whether this job is configured to delete itself after completion.
class StopMonitoringLogs(Exception):
    """
    Raised from a logs callback to stop monitoring a job's logs.
    """
Raise this exception to stop the logs monitoring.
def get_jobs(
    executor_keys: Optional[str] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of the existing jobs.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, return remote jobs on the given API instance.
        Otherwise return local jobs.

    include_hidden: bool, default False
        If `True`, include jobs with the `hidden` attribute.

    combine_local_and_systemd: bool, default True
        If `True` and the context executor is `systemd`,
        merge local daemon jobs and systemd jobs into a single dictionary.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping job names to jobs.
    """
    from meerschaum.connectors.parse import parse_executor_keys
    executor_keys = executor_keys or get_executor_keys_from_context()

    ### Merge local and systemd jobs only when the requested keys are
    ### ambiguous ('None', 'local', 'systemd') and systemd is the context executor.
    include_local_and_system = (
        combine_local_and_systemd
        and str(executor_keys).split(':', maxsplit=1)[0] in ('None', 'local', 'systemd')
        and get_executor_keys_from_context() == 'systemd'
    )

    def _get_local_jobs():
        """Return visible local daemon jobs, skipping externally managed ones."""
        from meerschaum.utils.daemon import get_daemons
        daemons = get_daemons()
        jobs = {
            daemon.daemon_id: Job(name=daemon.daemon_id, executor_keys='local')
            for daemon in daemons
        }
        return {
            name: job
            for name, job in jobs.items()
            if (include_hidden or not job.hidden) and not job._is_externally_managed
        }

    def _get_systemd_jobs():
        """Return visible jobs managed by the systemd connector."""
        conn = mrsm.get_connector('systemd')
        jobs = conn.get_jobs(debug=debug)
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    if include_local_and_system:
        local_jobs = _get_local_jobs()
        systemd_jobs = _get_systemd_jobs()
        ### Warn about name collisions; systemd jobs win in the merged dict.
        shared_jobs = set(local_jobs) & set(systemd_jobs)
        if shared_jobs:
            from meerschaum.utils.misc import items_str
            from meerschaum.utils.warnings import warn
            warn(
                "Job"
                + ('s' if len(shared_jobs) != 1 else '')
                + f" {items_str(list(shared_jobs))} "
                + "exist"
                + ('s' if len(shared_jobs) == 1 else '')
                + " in both `local` and `systemd`.",
                stack=False,
            )
        return {**local_jobs, **systemd_jobs}

    if executor_keys == 'local':
        return _get_local_jobs()

    if executor_keys == 'systemd':
        return _get_systemd_jobs()

    ### Remote executor: validate the keys before constructing the connector,
    ### and fall back to an empty mapping on any failure (best-effort).
    try:
        _ = parse_executor_keys(executor_keys, construct=False)
        conn = parse_executor_keys(executor_keys)
        jobs = conn.get_jobs(debug=debug)
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }
    except Exception:
        return {}
Return a dictionary of the existing jobs.
Parameters
- executor_keys (Optional[str], default None): If provided, return remote jobs on the given API instance. Otherwise return local jobs.
- include_hidden (bool, default False):
If `True`, include jobs with the `hidden` attribute.
Returns
- A dictionary mapping job names to jobs.
def get_filtered_jobs(
    executor_keys: Optional[str] = None,
    filter_list: Optional[List[str]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    warn: bool = False,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of jobs filtered by the user.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, return remote jobs on the given API instance.

    filter_list: Optional[List[str]], default None
        If provided, only return jobs with these names.
        Names prefixed with an underscore are treated as exclusions
        when no requested jobs match.

    include_hidden: bool, default False
        If `True` and no `filter_list` is given, include hidden jobs.

    warn: bool, default False
        If `True`, warn when a requested job does not exist.

    Returns
    -------
    A dictionary mapping job names to jobs.
    """
    from meerschaum.utils.warnings import warn as _warn

    ### Fetch hidden jobs too so that explicitly requested hidden jobs are found.
    jobs = get_jobs(
        executor_keys,
        include_hidden=True,
        combine_local_and_systemd=combine_local_and_systemd,
        debug=debug,
    )
    if not filter_list:
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    ### NOTE: removed a no-op `if ...: pass` branch and the unused
    ### `filter_list_without_underscores` list from the original.
    jobs_to_return = {}
    filter_list_with_underscores = [name for name in filter_list if name.startswith('_')]
    for name in filter_list:
        job = jobs.get(name, None)
        if job is None:
            if warn:
                _warn(
                    f"Job '{name}' does not exist.",
                    stack=False,
                )
            continue
        jobs_to_return[name] = job

    ### When nothing matched and underscore-prefixed names were given,
    ### interpret those names as exclusions.
    if not jobs_to_return and filter_list_with_underscores:
        names_to_exclude = [name.lstrip('_') for name in filter_list_with_underscores]
        return {
            name: job
            for name, job in jobs.items()
            if name not in names_to_exclude
        }

    return jobs_to_return
Return a list of jobs filtered by the user.
def get_restart_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return the jobs which were created with `--restart` or `--loop`.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    restart_jobs = {}
    for name, job in jobs.items():
        if job.restart:
            restart_jobs[name] = job
    return restart_jobs
Return jobs which were created with `--restart` or `--loop`.
def get_running_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return the subset of jobs whose status is `'running'`.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    return {name: job for name, job in jobs.items() if job.status == 'running'}
Return a dictionary of running jobs.
def get_stopped_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return the subset of jobs whose status is `'stopped'`.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    stopped_jobs = {}
    for name, job in jobs.items():
        if job.status == 'stopped':
            stopped_jobs[name] = job
    return stopped_jobs
Return a dictionary of stopped jobs.
def get_paused_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return the subset of jobs whose status is `'paused'`.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    return dict(
        (name, job)
        for name, job in jobs.items()
        if job.status == 'paused'
    )
Return a dictionary of paused jobs.
def make_executor(cls):
    """
    Class decorator which registers a class as an `Executor`.

    The executor type is derived from the class name (minus a trailing
    'executor') and appended to the module-level `executor_types` registry.
    """
    import re
    from meerschaum.connectors import make_connector
    typ = re.sub(r'executor$', '', cls.__name__.lower())
    if typ not in executor_types:
        executor_types.append(typ)
    return make_connector(cls, _is_executor=True)
Register a class as an `Executor`.
class Executor(Connector):
    """
    Define the abstract interface which job executors must implement.
    """

    @abstractmethod
    def get_job_exists(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job named `name` exists on this executor.
        """

    @abstractmethod
    def get_jobs(self) -> Dict[str, Job]:
        """
        Return a dictionary mapping job names to `Job` objects.
        """

    @abstractmethod
    def create_job(
        self,
        name: str,
        sysargs: List[str],
        properties: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ) -> SuccessTuple:
        """
        Create a new job from `sysargs` and optional `properties`.
        """

    @abstractmethod
    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Start the job named `name`.
        """

    @abstractmethod
    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Stop the job named `name`.
        """

    @abstractmethod
    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Pause the job named `name`.
        """

    @abstractmethod
    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Delete the job named `name`.
        """

    @abstractmethod
    def get_logs(self, name: str, debug: bool = False) -> str:
        """
        Return the log output of the job named `name`.
        """
Define the methods for managing jobs.
@abstractmethod
def get_job_exists(self, name: str, debug: bool = False) -> bool:
    """
    Return whether a job named `name` exists on this executor.
    """
Return whether a job exists.
@abstractmethod
def get_jobs(self) -> Dict[str, Job]:
    """
    Return a dictionary mapping job names to `Job` objects.
    """
Return a dictionary of names -> Jobs.
@abstractmethod
def create_job(
    self,
    name: str,
    sysargs: List[str],
    properties: Optional[Dict[str, Any]] = None,
    debug: bool = False,
) -> SuccessTuple:
    """
    Create a new job from `sysargs` and optional `properties`.
    """
Create a new job.
@abstractmethod
def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Start the job named `name`.
    """
Start a job.
@abstractmethod
def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Stop the job named `name`.
    """
Stop a job.
@abstractmethod
def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Pause the job named `name`.
    """
Pause a job.
def check_restart_jobs(
    executor_keys: Optional[str] = 'local',
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = True,
    silent: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Restart any stopped jobs which were created with `--restart`.

    Parameters
    ----------
    executor_keys: Optional[str], default 'local'
        If provided, check jobs on the given remote API instance.
        Otherwise check local jobs.

    jobs: Optional[Dict[str, Job]], default None
        If provided, check these jobs instead of fetching them.

    include_hidden: bool, default True
        If `True`, include hidden jobs in the check.

    silent: bool, default False
        If `True`, do not print the restart success message.

    Returns
    -------
    A `SuccessTuple` which is successful only if every job's
    restart check succeeded.
    """
    from meerschaum.utils.misc import items_str

    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=False,
            debug=debug,
        )

    if not jobs:
        return True, "No jobs to restart."

    results = {}
    for name, job in jobs.items():
        check_success, check_msg = job.check_restart()
        results[job.name] = (check_success, check_msg)
        if not silent:
            mrsm.pprint((check_success, check_msg))

    success_names = [name for name, (check_success, _) in results.items() if check_success]
    fail_names = [name for name, (check_success, _) in results.items() if not check_success]
    success = len(success_names) == len(jobs)
    msg = (
        (
            "Successfully restarted job"
            + ('s' if len(success_names) != 1 else '')
            + ' ' + items_str(success_names) + '.'
        )
        if success
        else (
            ### NOTE: bug fix — pluralize by the number of *failed* jobs
            ### (the original incorrectly used `len(success_names)` here).
            "Failed to restart job"
            + ('s' if len(fail_names) != 1 else '')
            + ' ' + items_str(fail_names) + '.'
        )
    )
    return success, msg
Restart any stopped jobs which were created with `--restart`.
Parameters
- executor_keys (Optional[str], default None): If provided, check jobs on the given remote API instance. Otherwise check local jobs.
- include_hidden (bool, default True):
If `True`, include hidden jobs in the check.
- silent (bool, default False): If `True`, do not print the restart success message.
def start_check_jobs_thread():
    """
    Start a background thread which periodically restarts failed jobs.

    Returns
    -------
    The started `RepeatTimer` thread (also stored in the module-level
    `_check_loop_stop_thread` handle so it can be cancelled later).
    """
    import atexit
    from functools import partial
    from meerschaum.utils.threading import RepeatTimer
    from meerschaum.config.static import STATIC_CONFIG

    global _check_loop_stop_thread
    # Interval (in seconds) between restart checks.
    sleep_seconds = STATIC_CONFIG['jobs']['check_restart_seconds']

    _check_loop_stop_thread = RepeatTimer(
        sleep_seconds,
        partial(
            _check_restart_jobs_against_lock,
            silent=True,
        )
    )
    # Daemonize before starting so the timer never blocks interpreter shutdown.
    _check_loop_stop_thread.daemon = True
    # Make sure the timer is cancelled (and the lock file removed) on exit.
    atexit.register(stop_check_jobs_thread)

    _check_loop_stop_thread.start()
    return _check_loop_stop_thread
Start a thread to regularly monitor jobs.
def stop_check_jobs_thread():
    """
    Cancel the job monitoring thread (if one was started) and remove
    the check-jobs lock file.
    """
    from meerschaum.config.paths import CHECK_JOBS_LOCK_PATH
    from meerschaum.utils.warnings import warn
    # Nothing to do if the monitoring thread was never started.
    if _check_loop_stop_thread is None:
        return

    _check_loop_stop_thread.cancel()

    # Best-effort cleanup of the lock file; warn instead of raising.
    try:
        if CHECK_JOBS_LOCK_PATH.exists():
            CHECK_JOBS_LOCK_PATH.unlink()
    except Exception as e:
        warn(f"Failed to remove check jobs lock file:\n{e}")
Stop the job monitoring thread.