meerschaum.jobs
Higher-level utilities for managing `meerschaum.utils.daemon.Daemon`.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Higher-level utilities for managing `meerschaum.utils.daemon.Daemon`.
"""

import pathlib

import meerschaum as mrsm
from meerschaum.utils.typing import Dict, Optional, List, SuccessTuple

from meerschaum.jobs._Job import Job, StopMonitoringLogs
from meerschaum.jobs._Executor import Executor

__all__ = (
    'Job',
    'StopMonitoringLogs',
    'systemd',
    'get_jobs',
    'get_filtered_jobs',
    'get_restart_jobs',
    'get_running_jobs',
    'get_stopped_jobs',
    'get_paused_jobs',
    'make_executor',
    'Executor',
    'check_restart_jobs',
    'start_check_jobs_thread',
    'stop_check_jobs_thread',
)

#: Known executor types; `make_executor()` appends new types at registration.
executor_types: List[str] = ['api', 'local']


def get_jobs(
    executor_keys: Optional[str] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of the existing jobs.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, return remote jobs on the given API instance.
        Otherwise return local jobs.

    include_hidden: bool, default False
        If `True`, include jobs with the `hidden` attribute.

    combine_local_and_systemd: bool, default True
        If `True` and the context executor is `systemd`,
        merge `local` and `systemd` jobs into a single dictionary
        (warning if a name exists in both).

    debug: bool, default False
        Verbosity toggle passed through to the executor.

    Returns
    -------
    A dictionary mapping job names to jobs.
    """
    from meerschaum.connectors.parse import parse_executor_keys
    executor_keys = executor_keys or get_executor_keys_from_context()

    include_local_and_systemd = (
        combine_local_and_systemd
        and str(executor_keys).split(':', maxsplit=1)[0] in ('None', 'local', 'systemd')
        and get_executor_keys_from_context() == 'systemd'
    )

    def _get_local_jobs():
        """Build jobs from local daemons, dropping externally managed ones."""
        from meerschaum.utils.daemon import get_daemons
        daemons = get_daemons()
        jobs = {
            daemon.daemon_id: Job(name=daemon.daemon_id, executor_keys='local')
            for daemon in daemons
        }
        return {
            name: job
            for name, job in jobs.items()
            if (include_hidden or not job.hidden) and not job._is_externally_managed
        }

    def _get_systemd_jobs():
        """Fetch jobs from the `systemd` connector."""
        conn = mrsm.get_connector('systemd')
        jobs = conn.get_jobs(debug=debug)
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    if include_local_and_systemd:
        local_jobs = _get_local_jobs()
        systemd_jobs = _get_systemd_jobs()
        shared_jobs = set(local_jobs) & set(systemd_jobs)
        if shared_jobs:
            from meerschaum.utils.misc import items_str
            from meerschaum.utils.warnings import warn
            warn(
                "Job"
                + ('s' if len(shared_jobs) != 1 else '')
                + f" {items_str(list(shared_jobs))} "
                + "exist"
                + ('s' if len(shared_jobs) == 1 else '')
                + " in both `local` and `systemd`.",
                stack=False,
            )
        ### NOTE: `systemd` jobs win on name collisions.
        return {**local_jobs, **systemd_jobs}

    if executor_keys == 'local':
        return _get_local_jobs()

    if executor_keys == 'systemd':
        return _get_systemd_jobs()

    try:
        ### Validate the keys before constructing the connector.
        _ = parse_executor_keys(executor_keys, construct=False)
        conn = parse_executor_keys(executor_keys)
        jobs = conn.get_jobs(debug=debug)
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }
    except Exception:
        ### Best-effort: unreachable or invalid executors yield no jobs.
        return {}


def get_filtered_jobs(
    executor_keys: Optional[str] = None,
    filter_list: Optional[List[str]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    warn: bool = False,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of jobs filtered by the user.

    If `filter_list` is empty, return all jobs (honoring `include_hidden`);
    otherwise return only the named jobs, warning about missing names
    when `warn` is `True`.
    """
    from meerschaum.utils.warnings import warn as _warn
    jobs = get_jobs(
        executor_keys,
        include_hidden=True,
        combine_local_and_systemd=combine_local_and_systemd,
        debug=debug,
    )
    if not filter_list:
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    jobs_to_return = {}
    for name in filter_list:
        job = jobs.get(name, None)
        if job is None:
            if warn:
                _warn(
                    f"Job '{name}' does not exist.",
                    stack=False,
                )
            continue
        jobs_to_return[name] = job

    return jobs_to_return


def get_restart_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return jobs which were created with `--restart` or `--loop`.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    return {
        name: job
        for name, job in jobs.items()
        if job.restart
    }


def get_running_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of running jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    return {
        name: job
        for name, job in jobs.items()
        if job.status == 'running'
    }


def get_paused_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of paused jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    return {
        name: job
        for name, job in jobs.items()
        if job.status == 'paused'
    }


def get_stopped_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of stopped jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    return {
        name: job
        for name, job in jobs.items()
        if job.status == 'stopped'
    }


def make_executor(cls):
    """
    Register a class as an `Executor`.

    The executor type is derived from the class name
    (lower-cased, with a trailing 'executor' stripped)
    and appended to `executor_types` if new.
    """
    import re
    from meerschaum.connectors import make_connector
    suffix_regex = r'executor$'
    typ = re.sub(suffix_regex, '', cls.__name__.lower())
    if typ not in executor_types:
        executor_types.append(typ)
    return make_connector(cls, _is_executor=True)


def check_restart_jobs(
    executor_keys: Optional[str] = 'local',
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = True,
    silent: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Restart any stopped jobs which were created with `--restart`.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, check jobs on the given remote API instance.
        Otherwise check local jobs.

    include_hidden: bool, default True
        If `True`, include hidden jobs in the check.

    silent: bool, default False
        If `True`, do not print the restart success message.

    Returns
    -------
    A `SuccessTuple` summarizing the restart results.
    """
    from meerschaum.utils.misc import items_str

    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=False,
            debug=debug,
        )

    if not jobs:
        return True, "No jobs to restart."

    results = {}
    for name, job in jobs.items():
        check_success, check_msg = job.check_restart()
        results[job.name] = (check_success, check_msg)
        if not silent:
            mrsm.pprint((check_success, check_msg))

    success_names = [name for name, (check_success, check_msg) in results.items() if check_success]
    fail_names = [name for name, (check_success, check_msg) in results.items() if not check_success]
    success = len(success_names) == len(jobs)
    msg = (
        (
            "Successfully restarted job"
            + ('s' if len(success_names) != 1 else '')
            + ' ' + items_str(success_names) + '.'
        )
        if success
        else (
            "Failed to restart job"
            ### Fix: pluralize on the failed jobs, not the successful ones.
            + ('s' if len(fail_names) != 1 else '')
            + ' ' + items_str(fail_names) + '.'
        )
    )
    return success, msg


def _check_restart_jobs_against_lock(*args, **kwargs):
    """
    Run `check_restart_jobs()` under an inter-process lock
    so concurrent monitors don't race.
    """
    from meerschaum.config.paths import CHECK_JOBS_LOCK_PATH
    fasteners = mrsm.attempt_import('fasteners')
    lock = fasteners.InterProcessLock(CHECK_JOBS_LOCK_PATH)
    with lock:
        check_restart_jobs(*args, **kwargs)


_check_loop_stop_thread = None
def start_check_jobs_thread():
    """
    Start a thread to regularly monitor jobs.

    Returns
    -------
    The started `RepeatTimer` daemon thread.
    """
    import atexit
    from functools import partial
    from meerschaum.utils.threading import RepeatTimer
    from meerschaum.config.static import STATIC_CONFIG

    global _check_loop_stop_thread
    sleep_seconds = STATIC_CONFIG['jobs']['check_restart_seconds']

    _check_loop_stop_thread = RepeatTimer(
        sleep_seconds,
        partial(
            _check_restart_jobs_against_lock,
            silent=True,
        )
    )
    _check_loop_stop_thread.daemon = True
    atexit.register(stop_check_jobs_thread)

    _check_loop_stop_thread.start()
    return _check_loop_stop_thread


def stop_check_jobs_thread():
    """
    Stop the job monitoring thread.

    Cancels the timer (if running) and best-effort removes the
    inter-process lock file.
    """
    from meerschaum.config.paths import CHECK_JOBS_LOCK_PATH
    from meerschaum.utils.warnings import warn
    if _check_loop_stop_thread is None:
        return

    _check_loop_stop_thread.cancel()

    try:
        if CHECK_JOBS_LOCK_PATH.exists():
            CHECK_JOBS_LOCK_PATH.unlink()
    except Exception as e:
        warn(f"Failed to remove check jobs lock file:\n{e}")


_context_keys = None
def get_executor_keys_from_context() -> str:
    """
    If we are running on the host with the default root, default to `'systemd'`.
    Otherwise return `'local'`.

    A configured `meerschaum:executor` value takes precedence and
    bypasses the cached context detection.
    """
    global _context_keys

    if _context_keys is not None:
        return _context_keys

    from meerschaum.config import get_config
    from meerschaum.config.paths import ROOT_DIR_PATH, DEFAULT_ROOT_DIR_PATH
    from meerschaum.utils.misc import is_systemd_available

    configured_executor = get_config('meerschaum', 'executor', warn=False)
    if configured_executor is not None:
        return configured_executor

    _context_keys = (
        'systemd'
        if is_systemd_available() and ROOT_DIR_PATH == DEFAULT_ROOT_DIR_PATH
        else 'local'
    )
    return _context_keys


def _install_healthcheck_job() -> SuccessTuple:
    """
    Install the systemd job which checks local jobs.

    Returns
    -------
    A `SuccessTuple`; `False` when the healthcheck is disabled or
    systemd is not the context executor.
    """
    from meerschaum.config import get_config

    enable_healthcheck = get_config('system', 'experimental', 'systemd_healthcheck')
    if not enable_healthcheck:
        return False, "Local healthcheck is disabled."

    if get_executor_keys_from_context() != 'systemd':
        return False, "Not running systemd."

    job = Job(
        '.local-healthcheck',
        ['restart', 'jobs', '-e', 'local', '--loop', '--min-seconds', '60'],
        executor_keys='systemd',
    )
    return job.start()
50class Job: 51 """ 52 Manage a `meerschaum.utils.daemon.Daemon`, locally or remotely via the API. 53 """ 54 55 def __init__( 56 self, 57 name: str, 58 sysargs: Union[List[str], str, None] = None, 59 env: Optional[Dict[str, str]] = None, 60 executor_keys: Optional[str] = None, 61 delete_after_completion: bool = False, 62 _properties: Optional[Dict[str, Any]] = None, 63 _rotating_log=None, 64 _stdin_file=None, 65 _status_hook: Optional[Callable[[], str]] = None, 66 _result_hook: Optional[Callable[[], SuccessTuple]] = None, 67 _externally_managed: bool = False, 68 ): 69 """ 70 Create a new job to manage a `meerschaum.utils.daemon.Daemon`. 71 72 Parameters 73 ---------- 74 name: str 75 The name of the job to be created. 76 This will also be used as the Daemon ID. 77 78 sysargs: Union[List[str], str, None], default None 79 The sysargs of the command to be executed, e.g. 'start api'. 80 81 env: Optional[Dict[str, str]], default None 82 If provided, set these environment variables in the job's process. 83 84 executor_keys: Optional[str], default None 85 If provided, execute the job remotely on an API instance, e.g. 'api:main'. 86 87 delete_after_completion: bool, default False 88 If `True`, delete this job when it has finished executing. 89 90 _properties: Optional[Dict[str, Any]], default None 91 If provided, use this to patch the daemon's properties. 92 """ 93 from meerschaum.utils.daemon import Daemon 94 for char in BANNED_CHARS: 95 if char in name: 96 raise ValueError(f"Invalid name: ({char}) is not allowed.") 97 98 if isinstance(sysargs, str): 99 sysargs = shlex.split(sysargs) 100 101 and_key = STATIC_CONFIG['system']['arguments']['and_key'] 102 escaped_and_key = STATIC_CONFIG['system']['arguments']['escaped_and_key'] 103 if sysargs: 104 sysargs = [ 105 (arg if arg != escaped_and_key else and_key) 106 for arg in sysargs 107 ] 108 109 ### NOTE: 'local' and 'systemd' executors are being coalesced. 
110 if executor_keys is None: 111 from meerschaum.jobs import get_executor_keys_from_context 112 executor_keys = get_executor_keys_from_context() 113 114 self.executor_keys = executor_keys 115 self.name = name 116 try: 117 self._daemon = ( 118 Daemon(daemon_id=name) 119 if executor_keys == 'local' 120 else None 121 ) 122 except Exception: 123 self._daemon = None 124 125 ### Handle any injected dependencies. 126 if _rotating_log is not None: 127 self._rotating_log = _rotating_log 128 if self._daemon is not None: 129 self._daemon._rotating_log = _rotating_log 130 131 if _stdin_file is not None: 132 self._stdin_file = _stdin_file 133 if self._daemon is not None: 134 self._daemon._stdin_file = _stdin_file 135 self._daemon._blocking_stdin_file_path = _stdin_file.blocking_file_path 136 137 if _status_hook is not None: 138 self._status_hook = _status_hook 139 140 if _result_hook is not None: 141 self._result_hook = _result_hook 142 143 self._externally_managed = _externally_managed 144 self._properties_patch = _properties or {} 145 if _externally_managed: 146 self._properties_patch.update({'externally_managed': _externally_managed}) 147 148 if env: 149 self._properties_patch.update({'env': env}) 150 151 if delete_after_completion: 152 self._properties_patch.update({'delete_after_completion': delete_after_completion}) 153 154 daemon_sysargs = ( 155 self._daemon.properties.get('target', {}).get('args', [None])[0] 156 if self._daemon is not None 157 else None 158 ) 159 160 if daemon_sysargs and sysargs and daemon_sysargs != sysargs: 161 warn("Given sysargs differ from existing sysargs.") 162 163 self._sysargs = [ 164 arg 165 for arg in (daemon_sysargs or sysargs or []) 166 if arg not in ('-d', '--daemon') 167 ] 168 for restart_flag in RESTART_FLAGS: 169 if restart_flag in self._sysargs: 170 self._properties_patch.update({'restart': True}) 171 break 172 173 @staticmethod 174 def from_pid(pid: int, executor_keys: Optional[str] = None) -> Job: 175 """ 176 Build a `Job` from the 
PID of a running Meerschaum process. 177 178 Parameters 179 ---------- 180 pid: int 181 The PID of the process. 182 183 executor_keys: Optional[str], default None 184 The executor keys to assign to the job. 185 """ 186 from meerschaum.config.paths import DAEMON_RESOURCES_PATH 187 188 psutil = mrsm.attempt_import('psutil') 189 try: 190 process = psutil.Process(pid) 191 except psutil.NoSuchProcess as e: 192 warn(f"Process with PID {pid} does not exist.", stack=False) 193 raise e 194 195 command_args = process.cmdline() 196 is_daemon = command_args[1] == '-c' 197 198 if is_daemon: 199 daemon_id = command_args[-1].split('daemon_id=')[-1].split(')')[0].replace("'", '') 200 root_dir = process.environ().get(STATIC_CONFIG['environment']['root'], None) 201 if root_dir is None: 202 from meerschaum.config.paths import ROOT_DIR_PATH 203 root_dir = ROOT_DIR_PATH 204 jobs_dir = root_dir / DAEMON_RESOURCES_PATH.name 205 daemon_dir = jobs_dir / daemon_id 206 pid_file = daemon_dir / 'process.pid' 207 208 if pid_file.exists(): 209 with open(pid_file, 'r', encoding='utf-8') as f: 210 daemon_pid = int(f.read()) 211 212 if pid != daemon_pid: 213 raise EnvironmentError(f"Differing PIDs: {pid=}, {daemon_pid=}") 214 else: 215 raise EnvironmentError(f"Is job '{daemon_id}' running?") 216 217 return Job(daemon_id, executor_keys=executor_keys) 218 219 from meerschaum._internal.arguments._parse_arguments import parse_arguments 220 from meerschaum.utils.daemon import get_new_daemon_name 221 222 mrsm_ix = 0 223 for i, arg in enumerate(command_args): 224 if 'mrsm' in arg or 'meerschaum' in arg.lower(): 225 mrsm_ix = i 226 break 227 228 sysargs = command_args[mrsm_ix+1:] 229 kwargs = parse_arguments(sysargs) 230 name = kwargs.get('name', get_new_daemon_name()) 231 return Job(name, sysargs, executor_keys=executor_keys) 232 233 def start(self, debug: bool = False) -> SuccessTuple: 234 """ 235 Start the job's daemon. 
236 """ 237 if self.executor is not None: 238 if not self.exists(debug=debug): 239 return self.executor.create_job( 240 self.name, 241 self.sysargs, 242 properties=self.daemon.properties, 243 debug=debug, 244 ) 245 return self.executor.start_job(self.name, debug=debug) 246 247 if self.is_running(): 248 return True, f"{self} is already running." 249 250 success, msg = self.daemon.run( 251 keep_daemon_output=(not self.delete_after_completion), 252 allow_dirty_run=True, 253 ) 254 if not success: 255 return success, msg 256 257 return success, f"Started {self}." 258 259 def stop(self, timeout_seconds: Optional[int] = None, debug: bool = False) -> SuccessTuple: 260 """ 261 Stop the job's daemon. 262 """ 263 if self.executor is not None: 264 return self.executor.stop_job(self.name, debug=debug) 265 266 if self.daemon.status == 'stopped': 267 if not self.restart: 268 return True, f"{self} is not running." 269 elif self.stop_time is not None: 270 return True, f"{self} will not restart until manually started." 271 272 quit_success, quit_msg = self.daemon.quit(timeout=timeout_seconds) 273 if quit_success: 274 return quit_success, f"Stopped {self}." 275 276 warn( 277 f"Failed to gracefully quit {self}.", 278 stack=False, 279 ) 280 kill_success, kill_msg = self.daemon.kill(timeout=timeout_seconds) 281 if not kill_success: 282 return kill_success, kill_msg 283 284 return kill_success, f"Killed {self}." 285 286 def pause(self, timeout_seconds: Optional[int] = None, debug: bool = False) -> SuccessTuple: 287 """ 288 Pause the job's daemon. 289 """ 290 if self.executor is not None: 291 return self.executor.pause_job(self.name, debug=debug) 292 293 pause_success, pause_msg = self.daemon.pause(timeout=timeout_seconds) 294 if not pause_success: 295 return pause_success, pause_msg 296 297 return pause_success, f"Paused {self}." 298 299 def delete(self, debug: bool = False) -> SuccessTuple: 300 """ 301 Delete the job and its daemon. 
302 """ 303 if self.executor is not None: 304 return self.executor.delete_job(self.name, debug=debug) 305 306 if self.is_running(): 307 stop_success, stop_msg = self.stop() 308 if not stop_success: 309 return stop_success, stop_msg 310 311 cleanup_success, cleanup_msg = self.daemon.cleanup() 312 if not cleanup_success: 313 return cleanup_success, cleanup_msg 314 315 return cleanup_success, f"Deleted {self}." 316 317 def is_running(self) -> bool: 318 """ 319 Determine whether the job's daemon is running. 320 """ 321 return self.status == 'running' 322 323 def exists(self, debug: bool = False) -> bool: 324 """ 325 Determine whether the job exists. 326 """ 327 if self.executor is not None: 328 return self.executor.get_job_exists(self.name, debug=debug) 329 330 return self.daemon.path.exists() 331 332 def get_logs(self) -> Union[str, None]: 333 """ 334 Return the output text of the job's daemon. 335 """ 336 if self.executor is not None: 337 return self.executor.get_logs(self.name) 338 339 return self.daemon.log_text 340 341 def monitor_logs( 342 self, 343 callback_function: Callable[[str], None] = partial(print, end=''), 344 input_callback_function: Optional[Callable[[], str]] = None, 345 stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None, 346 stop_event: Optional[asyncio.Event] = None, 347 stop_on_exit: bool = False, 348 strip_timestamps: bool = False, 349 accept_input: bool = True, 350 debug: bool = False, 351 ): 352 """ 353 Monitor the job's log files and execute a callback on new lines. 354 355 Parameters 356 ---------- 357 callback_function: Callable[[str], None], default partial(print, end='') 358 The callback to execute as new data comes in. 359 Defaults to printing the output directly to `stdout`. 360 361 input_callback_function: Optional[Callable[[], str]], default None 362 If provided, execute this callback when the daemon is blocking on stdin. 363 Defaults to `sys.stdin.readline()`. 
364 365 stop_callback_function: Optional[Callable[[SuccessTuple]], str], default None 366 If provided, execute this callback when the daemon stops. 367 The job's SuccessTuple will be passed to the callback. 368 369 stop_event: Optional[asyncio.Event], default None 370 If provided, stop monitoring when this event is set. 371 You may instead raise `meerschaum.jobs.StopMonitoringLogs` 372 from within `callback_function` to stop monitoring. 373 374 stop_on_exit: bool, default False 375 If `True`, stop monitoring when the job stops. 376 377 strip_timestamps: bool, default False 378 If `True`, remove leading timestamps from lines. 379 380 accept_input: bool, default True 381 If `True`, accept input when the daemon blocks on stdin. 382 """ 383 def default_input_callback_function(): 384 return sys.stdin.readline() 385 386 if input_callback_function is None: 387 input_callback_function = default_input_callback_function 388 389 if self.executor is not None: 390 self.executor.monitor_logs( 391 self.name, 392 callback_function, 393 input_callback_function=input_callback_function, 394 stop_callback_function=stop_callback_function, 395 stop_on_exit=stop_on_exit, 396 accept_input=accept_input, 397 strip_timestamps=strip_timestamps, 398 debug=debug, 399 ) 400 return 401 402 monitor_logs_coroutine = self.monitor_logs_async( 403 callback_function=callback_function, 404 input_callback_function=input_callback_function, 405 stop_callback_function=stop_callback_function, 406 stop_event=stop_event, 407 stop_on_exit=stop_on_exit, 408 strip_timestamps=strip_timestamps, 409 accept_input=accept_input, 410 ) 411 return asyncio.run(monitor_logs_coroutine) 412 413 async def monitor_logs_async( 414 self, 415 callback_function: Callable[[str], None] = partial(print, end='', flush=True), 416 input_callback_function: Optional[Callable[[], str]] = None, 417 stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None, 418 stop_event: Optional[asyncio.Event] = None, 419 stop_on_exit: bool 
= False, 420 strip_timestamps: bool = False, 421 accept_input: bool = True, 422 _logs_path: Optional[pathlib.Path] = None, 423 _log=None, 424 _stdin_file=None, 425 debug: bool = False, 426 ): 427 """ 428 Monitor the job's log files and await a callback on new lines. 429 430 Parameters 431 ---------- 432 callback_function: Callable[[str], None], default partial(print, end='') 433 The callback to execute as new data comes in. 434 Defaults to printing the output directly to `stdout`. 435 436 input_callback_function: Optional[Callable[[], str]], default None 437 If provided, execute this callback when the daemon is blocking on stdin. 438 Defaults to `sys.stdin.readline()`. 439 440 stop_callback_function: Optional[Callable[[SuccessTuple]], str], default None 441 If provided, execute this callback when the daemon stops. 442 The job's SuccessTuple will be passed to the callback. 443 444 stop_event: Optional[asyncio.Event], default None 445 If provided, stop monitoring when this event is set. 446 You may instead raise `meerschaum.jobs.StopMonitoringLogs` 447 from within `callback_function` to stop monitoring. 448 449 stop_on_exit: bool, default False 450 If `True`, stop monitoring when the job stops. 451 452 strip_timestamps: bool, default False 453 If `True`, remove leading timestamps from lines. 454 455 accept_input: bool, default True 456 If `True`, accept input when the daemon blocks on stdin. 
457 """ 458 def default_input_callback_function(): 459 return sys.stdin.readline() 460 461 if input_callback_function is None: 462 input_callback_function = default_input_callback_function 463 464 if self.executor is not None: 465 await self.executor.monitor_logs_async( 466 self.name, 467 callback_function, 468 input_callback_function=input_callback_function, 469 stop_callback_function=stop_callback_function, 470 stop_on_exit=stop_on_exit, 471 strip_timestamps=strip_timestamps, 472 accept_input=accept_input, 473 debug=debug, 474 ) 475 return 476 477 from meerschaum.utils.formatting._jobs import strip_timestamp_from_line 478 479 events = { 480 'user': stop_event, 481 'stopped': asyncio.Event(), 482 } 483 combined_event = asyncio.Event() 484 emitted_text = False 485 stdin_file = _stdin_file if _stdin_file is not None else self.daemon.stdin_file 486 487 async def check_job_status(): 488 nonlocal emitted_text 489 stopped_event = events.get('stopped', None) 490 if stopped_event is None: 491 return 492 493 sleep_time = 0.1 494 while sleep_time < 60: 495 if self.status == 'stopped': 496 if not emitted_text: 497 await asyncio.sleep(sleep_time) 498 sleep_time = round(sleep_time * 1.1, 2) 499 continue 500 501 if stop_callback_function is not None: 502 try: 503 if asyncio.iscoroutinefunction(stop_callback_function): 504 await stop_callback_function(self.result) 505 else: 506 stop_callback_function(self.result) 507 except asyncio.exceptions.CancelledError: 508 break 509 except Exception: 510 warn(traceback.format_exc()) 511 512 if stop_on_exit: 513 events['stopped'].set() 514 515 break 516 await asyncio.sleep(0.1) 517 518 async def check_blocking_on_input(): 519 while True: 520 if not emitted_text or not self.is_blocking_on_stdin(): 521 try: 522 await asyncio.sleep(0.1) 523 except asyncio.exceptions.CancelledError: 524 break 525 continue 526 527 if not self.is_running(): 528 break 529 530 await emit_latest_lines() 531 532 try: 533 print('', end='', flush=True) 534 if 
asyncio.iscoroutinefunction(input_callback_function): 535 data = await input_callback_function() 536 else: 537 data = input_callback_function() 538 except KeyboardInterrupt: 539 break 540 if not data.endswith('\n'): 541 data += '\n' 542 543 stdin_file.write(data) 544 await asyncio.sleep(0.1) 545 546 async def combine_events(): 547 event_tasks = [ 548 asyncio.create_task(event.wait()) 549 for event in events.values() 550 if event is not None 551 ] 552 if not event_tasks: 553 return 554 555 try: 556 done, pending = await asyncio.wait( 557 event_tasks, 558 return_when=asyncio.FIRST_COMPLETED, 559 ) 560 for task in pending: 561 task.cancel() 562 except asyncio.exceptions.CancelledError: 563 pass 564 finally: 565 combined_event.set() 566 567 check_job_status_task = asyncio.create_task(check_job_status()) 568 check_blocking_on_input_task = asyncio.create_task(check_blocking_on_input()) 569 combine_events_task = asyncio.create_task(combine_events()) 570 571 log = _log if _log is not None else self.daemon.rotating_log 572 lines_to_show = get_config('jobs', 'logs', 'lines_to_show') 573 574 async def emit_latest_lines(): 575 nonlocal emitted_text 576 lines = log.readlines() 577 for line in lines[(-1 * lines_to_show):]: 578 if stop_event is not None and stop_event.is_set(): 579 return 580 581 if strip_timestamps: 582 line = strip_timestamp_from_line(line) 583 584 try: 585 if asyncio.iscoroutinefunction(callback_function): 586 await callback_function(line) 587 else: 588 callback_function(line) 589 emitted_text = True 590 except StopMonitoringLogs: 591 return 592 except Exception: 593 warn(f"Error in logs callback:\n{traceback.format_exc()}") 594 595 await emit_latest_lines() 596 597 tasks = ( 598 [check_job_status_task] 599 + ([check_blocking_on_input_task] if accept_input else []) 600 + [combine_events_task] 601 ) 602 try: 603 _ = asyncio.gather(*tasks, return_exceptions=True) 604 except asyncio.exceptions.CancelledError: 605 raise 606 except Exception: 607 warn(f"Failed to 
run async checks:\n{traceback.format_exc()}") 608 609 watchfiles = mrsm.attempt_import('watchfiles') 610 async for changes in watchfiles.awatch( 611 _logs_path or LOGS_RESOURCES_PATH, 612 stop_event=combined_event, 613 ): 614 for change in changes: 615 file_path_str = change[1] 616 file_path = pathlib.Path(file_path_str) 617 latest_subfile_path = log.get_latest_subfile_path() 618 if latest_subfile_path != file_path: 619 continue 620 621 await emit_latest_lines() 622 623 await emit_latest_lines() 624 625 def is_blocking_on_stdin(self, debug: bool = False) -> bool: 626 """ 627 Return whether a job's daemon is blocking on stdin. 628 """ 629 if self.executor is not None: 630 return self.executor.get_job_is_blocking_on_stdin(self.name, debug=debug) 631 632 return self.is_running() and self.daemon.blocking_stdin_file_path.exists() 633 634 def write_stdin(self, data): 635 """ 636 Write to a job's daemon's `stdin`. 637 """ 638 self.daemon.stdin_file.write(data) 639 640 @property 641 def executor(self) -> Union[Executor, None]: 642 """ 643 If the job is remote, return the connector to the remote API instance. 644 """ 645 return ( 646 mrsm.get_connector(self.executor_keys) 647 if self.executor_keys != 'local' 648 else None 649 ) 650 651 @property 652 def status(self) -> str: 653 """ 654 Return the running status of the job's daemon. 655 """ 656 if '_status_hook' in self.__dict__: 657 return self._status_hook() 658 659 if self.executor is not None: 660 return self.executor.get_job_status(self.name) 661 662 return self.daemon.status 663 664 @property 665 def pid(self) -> Union[int, None]: 666 """ 667 Return the PID of the job's dameon. 668 """ 669 if self.executor is not None: 670 return self.executor.get_job_metadata(self.name).get('daemon', {}).get('pid', None) 671 672 return self.daemon.pid 673 674 @property 675 def restart(self) -> bool: 676 """ 677 Return whether to restart a stopped job. 
678 """ 679 if self.executor is not None: 680 return self.executor.get_job_metadata(self.name).get('restart', False) 681 682 return self.daemon.properties.get('restart', False) 683 684 @property 685 def result(self) -> SuccessTuple: 686 """ 687 Return the `SuccessTuple` when the job has terminated. 688 """ 689 if self.is_running(): 690 return True, f"{self} is running." 691 692 if '_result_hook' in self.__dict__: 693 return self._result_hook() 694 695 if self.executor is not None: 696 return ( 697 self.executor.get_job_metadata(self.name) 698 .get('result', (False, "No result available.")) 699 ) 700 701 _result = self.daemon.properties.get('result', None) 702 if _result is None: 703 return False, "No result available." 704 705 return tuple(_result) 706 707 @property 708 def sysargs(self) -> List[str]: 709 """ 710 Return the sysargs to use for the Daemon. 711 """ 712 if self._sysargs: 713 return self._sysargs 714 715 if self.executor is not None: 716 return self.executor.get_job_metadata(self.name).get('sysargs', []) 717 718 target_args = self.daemon.target_args 719 if target_args is None: 720 return [] 721 self._sysargs = target_args[0] if len(target_args) > 0 else [] 722 return self._sysargs 723 724 @property 725 def daemon(self) -> 'Daemon': 726 """ 727 Return the daemon which this job manages. 
728 """ 729 from meerschaum.utils.daemon import Daemon 730 if self._daemon is not None and self.executor is None and self._sysargs: 731 return self._daemon 732 733 remote_properties = ( 734 {} 735 if self.executor is None 736 else self.executor.get_job_properties(self.name) 737 ) 738 properties = {**remote_properties, **self._properties_patch} 739 740 self._daemon = Daemon( 741 target=entry, 742 target_args=[self._sysargs], 743 target_kw={}, 744 daemon_id=self.name, 745 label=shlex.join(self._sysargs), 746 properties=properties, 747 ) 748 if '_rotating_log' in self.__dict__: 749 self._daemon._rotating_log = self._rotating_log 750 751 if '_stdin_file' in self.__dict__: 752 self._daemon._stdin_file = self._stdin_file 753 self._daemon._blocking_stdin_file_path = self._stdin_file.blocking_file_path 754 755 return self._daemon 756 757 @property 758 def began(self) -> Union[datetime, None]: 759 """ 760 The datetime when the job began running. 761 """ 762 if self.executor is not None: 763 began_str = self.executor.get_job_began(self.name) 764 if began_str is None: 765 return None 766 return ( 767 datetime.fromisoformat(began_str) 768 .astimezone(timezone.utc) 769 .replace(tzinfo=None) 770 ) 771 772 began_str = self.daemon.properties.get('process', {}).get('began', None) 773 if began_str is None: 774 return None 775 776 return datetime.fromisoformat(began_str) 777 778 @property 779 def ended(self) -> Union[datetime, None]: 780 """ 781 The datetime when the job stopped running. 
782 """ 783 if self.executor is not None: 784 ended_str = self.executor.get_job_ended(self.name) 785 if ended_str is None: 786 return None 787 return ( 788 datetime.fromisoformat(ended_str) 789 .astimezone(timezone.utc) 790 .replace(tzinfo=None) 791 ) 792 793 ended_str = self.daemon.properties.get('process', {}).get('ended', None) 794 if ended_str is None: 795 return None 796 797 return datetime.fromisoformat(ended_str) 798 799 @property 800 def paused(self) -> Union[datetime, None]: 801 """ 802 The datetime when the job was suspended while running. 803 """ 804 if self.executor is not None: 805 paused_str = self.executor.get_job_paused(self.name) 806 if paused_str is None: 807 return None 808 return ( 809 datetime.fromisoformat(paused_str) 810 .astimezone(timezone.utc) 811 .replace(tzinfo=None) 812 ) 813 814 paused_str = self.daemon.properties.get('process', {}).get('paused', None) 815 if paused_str is None: 816 return None 817 818 return datetime.fromisoformat(paused_str) 819 820 @property 821 def stop_time(self) -> Union[datetime, None]: 822 """ 823 Return the timestamp when the job was manually stopped. 824 """ 825 if self.executor is not None: 826 return self.executor.get_job_stop_time(self.name) 827 828 if not self.daemon.stop_path.exists(): 829 return None 830 831 stop_data = self.daemon._read_stop_file() 832 if not stop_data: 833 return None 834 835 stop_time_str = stop_data.get('stop_time', None) 836 if not stop_time_str: 837 warn(f"Could not read stop time for {self}.") 838 return None 839 840 return datetime.fromisoformat(stop_time_str) 841 842 @property 843 def hidden(self) -> bool: 844 """ 845 Return a bool indicating whether this job should be displayed. 846 """ 847 return ( 848 self.name.startswith('_') 849 or self.name.startswith('.') 850 or self._is_externally_managed 851 ) 852 853 def check_restart(self) -> SuccessTuple: 854 """ 855 If `restart` is `True` and the daemon is not running, 856 restart the job. 
857 Do not restart if the job was manually stopped. 858 """ 859 if self.is_running(): 860 return True, f"{self} is running." 861 862 if not self.restart: 863 return True, f"{self} does not need to be restarted." 864 865 if self.stop_time is not None: 866 return True, f"{self} was manually stopped." 867 868 return self.start() 869 870 @property 871 def label(self) -> str: 872 """ 873 Return the job's Daemon label (joined sysargs). 874 """ 875 from meerschaum._internal.arguments import compress_pipeline_sysargs 876 sysargs = compress_pipeline_sysargs(self.sysargs) 877 return shlex.join(sysargs).replace(' + ', '\n+ ') 878 879 @property 880 def _externally_managed_file(self) -> pathlib.Path: 881 """ 882 Return the path to the externally managed file. 883 """ 884 return self.daemon.path / '.externally-managed' 885 886 def _set_externally_managed(self): 887 """ 888 Set this job as externally managed. 889 """ 890 self._externally_managed = True 891 try: 892 self._externally_managed_file.parent.mkdir(exist_ok=True, parents=True) 893 self._externally_managed_file.touch() 894 except Exception as e: 895 warn(e) 896 897 @property 898 def _is_externally_managed(self) -> bool: 899 """ 900 Return whether this job is externally managed. 901 """ 902 return self.executor_keys in (None, 'local') and ( 903 self._externally_managed or self._externally_managed_file.exists() 904 ) 905 906 @property 907 def env(self) -> Dict[str, str]: 908 """ 909 Return the environment variables to set for the job's process. 
910 """ 911 if '_env' in self.__dict__: 912 return self.__dict__['_env'] 913 914 _env = self.daemon.properties.get('env', {}) 915 default_env = { 916 'PYTHONUNBUFFERED': '1', 917 'LINES': str(get_config('jobs', 'terminal', 'lines')), 918 'COLUMNS': str(get_config('jobs', 'terminal', 'columns')), 919 } 920 self._env = {**default_env, **_env} 921 return self._env 922 923 @property 924 def delete_after_completion(self) -> bool: 925 """ 926 Return whether this job is configured to delete itself after completion. 927 """ 928 if '_delete_after_completion' in self.__dict__: 929 return self.__dict__.get('_delete_after_completion', False) 930 931 self._delete_after_completion = self.daemon.properties.get('delete_after_completion', False) 932 return self._delete_after_completion 933 934 def __str__(self) -> str: 935 sysargs = self.sysargs 936 sysargs_str = shlex.join(sysargs) if sysargs else '' 937 job_str = f'Job("{self.name}"' 938 if sysargs_str: 939 job_str += f', "{sysargs_str}"' 940 941 job_str += ')' 942 return job_str 943 944 def __repr__(self) -> str: 945 return str(self) 946 947 def __hash__(self) -> int: 948 return hash(self.name)
Manage a meerschaum.utils.daemon.Daemon
, locally or remotely via the API.
def __init__(
    self,
    name: str,
    sysargs: Union[List[str], str, None] = None,
    env: Optional[Dict[str, str]] = None,
    executor_keys: Optional[str] = None,
    delete_after_completion: bool = False,
    _properties: Optional[Dict[str, Any]] = None,
    _rotating_log=None,
    _stdin_file=None,
    _status_hook: Optional[Callable[[], str]] = None,
    _result_hook: Optional[Callable[[], SuccessTuple]] = None,
    _externally_managed: bool = False,
):
    """
    Create a new job to manage a `meerschaum.utils.daemon.Daemon`.

    Parameters
    ----------
    name: str
        The name of the job to be created.
        This will also be used as the Daemon ID.

    sysargs: Union[List[str], str, None], default None
        The sysargs of the command to be executed, e.g. 'start api'.

    env: Optional[Dict[str, str]], default None
        If provided, set these environment variables in the job's process.

    executor_keys: Optional[str], default None
        If provided, execute the job remotely on an API instance, e.g. 'api:main'.

    delete_after_completion: bool, default False
        If `True`, delete this job when it has finished executing.

    _properties: Optional[Dict[str, Any]], default None
        If provided, use this to patch the daemon's properties.
    """
    from meerschaum.utils.daemon import Daemon
    # Reject names which cannot safely serve as a Daemon ID.
    for char in BANNED_CHARS:
        if char in name:
            raise ValueError(f"Invalid name: ({char}) is not allowed.")

    if isinstance(sysargs, str):
        sysargs = shlex.split(sysargs)

    # Un-escape the chained-command separator so pipelines round-trip.
    and_key = STATIC_CONFIG['system']['arguments']['and_key']
    escaped_and_key = STATIC_CONFIG['system']['arguments']['escaped_and_key']
    if sysargs:
        sysargs = [
            (arg if arg != escaped_and_key else and_key)
            for arg in sysargs
        ]

    ### NOTE: 'local' and 'systemd' executors are being coalesced.
    if executor_keys is None:
        from meerschaum.jobs import get_executor_keys_from_context
        executor_keys = get_executor_keys_from_context()

    self.executor_keys = executor_keys
    self.name = name
    # Eagerly bind an existing local daemon; remote jobs build one lazily.
    try:
        self._daemon = (
            Daemon(daemon_id=name)
            if executor_keys == 'local'
            else None
        )
    except Exception:
        self._daemon = None

    ### Handle any injected dependencies.
    if _rotating_log is not None:
        self._rotating_log = _rotating_log
        if self._daemon is not None:
            self._daemon._rotating_log = _rotating_log

    if _stdin_file is not None:
        self._stdin_file = _stdin_file
        if self._daemon is not None:
            self._daemon._stdin_file = _stdin_file
            self._daemon._blocking_stdin_file_path = _stdin_file.blocking_file_path

    if _status_hook is not None:
        self._status_hook = _status_hook

    if _result_hook is not None:
        self._result_hook = _result_hook

    self._externally_managed = _externally_managed
    # Properties accumulated here are later merged over the daemon's properties.
    self._properties_patch = _properties or {}
    if _externally_managed:
        self._properties_patch.update({'externally_managed': _externally_managed})

    if env:
        self._properties_patch.update({'env': env})

    if delete_after_completion:
        self._properties_patch.update({'delete_after_completion': delete_after_completion})

    # Prefer the sysargs recorded on an existing daemon over the given ones.
    daemon_sysargs = (
        self._daemon.properties.get('target', {}).get('args', [None])[0]
        if self._daemon is not None
        else None
    )

    if daemon_sysargs and sysargs and daemon_sysargs != sysargs:
        warn("Given sysargs differ from existing sysargs.")

    # Daemonization flags are stripped — the Job itself manages daemonization.
    self._sysargs = [
        arg
        for arg in (daemon_sysargs or sysargs or [])
        if arg not in ('-d', '--daemon')
    ]
    for restart_flag in RESTART_FLAGS:
        if restart_flag in self._sysargs:
            self._properties_patch.update({'restart': True})
            break
Create a new job to manage a meerschaum.utils.daemon.Daemon
.
Parameters
- name (str): The name of the job to be created. This will also be used as the Daemon ID.
- sysargs (Union[List[str], str, None], default None): The sysargs of the command to be executed, e.g. 'start api'.
- env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the job's process.
- executor_keys (Optional[str], default None): If provided, execute the job remotely on an API instance, e.g. 'api:main'.
- delete_after_completion (bool, default False):
If
True
, delete this job when it has finished executing. - _properties (Optional[Dict[str, Any]], default None): If provided, use this to patch the daemon's properties.
173 @staticmethod 174 def from_pid(pid: int, executor_keys: Optional[str] = None) -> Job: 175 """ 176 Build a `Job` from the PID of a running Meerschaum process. 177 178 Parameters 179 ---------- 180 pid: int 181 The PID of the process. 182 183 executor_keys: Optional[str], default None 184 The executor keys to assign to the job. 185 """ 186 from meerschaum.config.paths import DAEMON_RESOURCES_PATH 187 188 psutil = mrsm.attempt_import('psutil') 189 try: 190 process = psutil.Process(pid) 191 except psutil.NoSuchProcess as e: 192 warn(f"Process with PID {pid} does not exist.", stack=False) 193 raise e 194 195 command_args = process.cmdline() 196 is_daemon = command_args[1] == '-c' 197 198 if is_daemon: 199 daemon_id = command_args[-1].split('daemon_id=')[-1].split(')')[0].replace("'", '') 200 root_dir = process.environ().get(STATIC_CONFIG['environment']['root'], None) 201 if root_dir is None: 202 from meerschaum.config.paths import ROOT_DIR_PATH 203 root_dir = ROOT_DIR_PATH 204 jobs_dir = root_dir / DAEMON_RESOURCES_PATH.name 205 daemon_dir = jobs_dir / daemon_id 206 pid_file = daemon_dir / 'process.pid' 207 208 if pid_file.exists(): 209 with open(pid_file, 'r', encoding='utf-8') as f: 210 daemon_pid = int(f.read()) 211 212 if pid != daemon_pid: 213 raise EnvironmentError(f"Differing PIDs: {pid=}, {daemon_pid=}") 214 else: 215 raise EnvironmentError(f"Is job '{daemon_id}' running?") 216 217 return Job(daemon_id, executor_keys=executor_keys) 218 219 from meerschaum._internal.arguments._parse_arguments import parse_arguments 220 from meerschaum.utils.daemon import get_new_daemon_name 221 222 mrsm_ix = 0 223 for i, arg in enumerate(command_args): 224 if 'mrsm' in arg or 'meerschaum' in arg.lower(): 225 mrsm_ix = i 226 break 227 228 sysargs = command_args[mrsm_ix+1:] 229 kwargs = parse_arguments(sysargs) 230 name = kwargs.get('name', get_new_daemon_name()) 231 return Job(name, sysargs, executor_keys=executor_keys)
Build a Job
from the PID of a running Meerschaum process.
Parameters
- pid (int): The PID of the process.
- executor_keys (Optional[str], default None): The executor keys to assign to the job.
233 def start(self, debug: bool = False) -> SuccessTuple: 234 """ 235 Start the job's daemon. 236 """ 237 if self.executor is not None: 238 if not self.exists(debug=debug): 239 return self.executor.create_job( 240 self.name, 241 self.sysargs, 242 properties=self.daemon.properties, 243 debug=debug, 244 ) 245 return self.executor.start_job(self.name, debug=debug) 246 247 if self.is_running(): 248 return True, f"{self} is already running." 249 250 success, msg = self.daemon.run( 251 keep_daemon_output=(not self.delete_after_completion), 252 allow_dirty_run=True, 253 ) 254 if not success: 255 return success, msg 256 257 return success, f"Started {self}."
Start the job's daemon.
259 def stop(self, timeout_seconds: Optional[int] = None, debug: bool = False) -> SuccessTuple: 260 """ 261 Stop the job's daemon. 262 """ 263 if self.executor is not None: 264 return self.executor.stop_job(self.name, debug=debug) 265 266 if self.daemon.status == 'stopped': 267 if not self.restart: 268 return True, f"{self} is not running." 269 elif self.stop_time is not None: 270 return True, f"{self} will not restart until manually started." 271 272 quit_success, quit_msg = self.daemon.quit(timeout=timeout_seconds) 273 if quit_success: 274 return quit_success, f"Stopped {self}." 275 276 warn( 277 f"Failed to gracefully quit {self}.", 278 stack=False, 279 ) 280 kill_success, kill_msg = self.daemon.kill(timeout=timeout_seconds) 281 if not kill_success: 282 return kill_success, kill_msg 283 284 return kill_success, f"Killed {self}."
Stop the job's daemon.
286 def pause(self, timeout_seconds: Optional[int] = None, debug: bool = False) -> SuccessTuple: 287 """ 288 Pause the job's daemon. 289 """ 290 if self.executor is not None: 291 return self.executor.pause_job(self.name, debug=debug) 292 293 pause_success, pause_msg = self.daemon.pause(timeout=timeout_seconds) 294 if not pause_success: 295 return pause_success, pause_msg 296 297 return pause_success, f"Paused {self}."
Pause the job's daemon.
299 def delete(self, debug: bool = False) -> SuccessTuple: 300 """ 301 Delete the job and its daemon. 302 """ 303 if self.executor is not None: 304 return self.executor.delete_job(self.name, debug=debug) 305 306 if self.is_running(): 307 stop_success, stop_msg = self.stop() 308 if not stop_success: 309 return stop_success, stop_msg 310 311 cleanup_success, cleanup_msg = self.daemon.cleanup() 312 if not cleanup_success: 313 return cleanup_success, cleanup_msg 314 315 return cleanup_success, f"Deleted {self}."
Delete the job and its daemon.
317 def is_running(self) -> bool: 318 """ 319 Determine whether the job's daemon is running. 320 """ 321 return self.status == 'running'
Determine whether the job's daemon is running.
323 def exists(self, debug: bool = False) -> bool: 324 """ 325 Determine whether the job exists. 326 """ 327 if self.executor is not None: 328 return self.executor.get_job_exists(self.name, debug=debug) 329 330 return self.daemon.path.exists()
Determine whether the job exists.
332 def get_logs(self) -> Union[str, None]: 333 """ 334 Return the output text of the job's daemon. 335 """ 336 if self.executor is not None: 337 return self.executor.get_logs(self.name) 338 339 return self.daemon.log_text
Return the output text of the job's daemon.
def monitor_logs(
    self,
    callback_function: Callable[[str], None] = partial(print, end=''),
    input_callback_function: Optional[Callable[[], str]] = None,
    stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
    stop_event: Optional[asyncio.Event] = None,
    stop_on_exit: bool = False,
    strip_timestamps: bool = False,
    accept_input: bool = True,
    debug: bool = False,
):
    """
    Monitor the job's log files and execute a callback on new lines.

    Parameters
    ----------
    callback_function: Callable[[str], None], default partial(print, end='')
        The callback to execute as new data comes in.
        Defaults to printing the output directly to `stdout`.

    input_callback_function: Optional[Callable[[], str]], default None
        If provided, execute this callback when the daemon is blocking on stdin.
        Defaults to `sys.stdin.readline()`.

    stop_callback_function: Optional[Callable[[SuccessTuple], None]], default None
        If provided, execute this callback when the daemon stops.
        The job's SuccessTuple will be passed to the callback.

    stop_event: Optional[asyncio.Event], default None
        If provided, stop monitoring when this event is set.
        You may instead raise `meerschaum.jobs.StopMonitoringLogs`
        from within `callback_function` to stop monitoring.

    stop_on_exit: bool, default False
        If `True`, stop monitoring when the job stops.

    strip_timestamps: bool, default False
        If `True`, remove leading timestamps from lines.

    accept_input: bool, default True
        If `True`, accept input when the daemon blocks on stdin.
    """
    def default_input_callback_function():
        return sys.stdin.readline()

    if input_callback_function is None:
        input_callback_function = default_input_callback_function

    # Remote jobs delegate monitoring entirely to the executor.
    # NOTE(review): `stop_event` is not forwarded to the executor here —
    # confirm whether remote monitoring supports it.
    if self.executor is not None:
        self.executor.monitor_logs(
            self.name,
            callback_function,
            input_callback_function=input_callback_function,
            stop_callback_function=stop_callback_function,
            stop_on_exit=stop_on_exit,
            accept_input=accept_input,
            strip_timestamps=strip_timestamps,
            debug=debug,
        )
        return

    # Local jobs run the async implementation to completion.
    monitor_logs_coroutine = self.monitor_logs_async(
        callback_function=callback_function,
        input_callback_function=input_callback_function,
        stop_callback_function=stop_callback_function,
        stop_event=stop_event,
        stop_on_exit=stop_on_exit,
        strip_timestamps=strip_timestamps,
        accept_input=accept_input,
    )
    return asyncio.run(monitor_logs_coroutine)
Monitor the job's log files and execute a callback on new lines.
Parameters
- callback_function (Callable[[str], None], default partial(print, end='')):
The callback to execute as new data comes in.
Defaults to printing the output directly to
stdout
. - input_callback_function (Optional[Callable[[], str]], default None):
If provided, execute this callback when the daemon is blocking on stdin.
Defaults to
sys.stdin.readline()
. - stop_callback_function (Optional[Callable[[SuccessTuple], None]], default None): If provided, execute this callback when the daemon stops. The job's SuccessTuple will be passed to the callback.
- stop_event (Optional[asyncio.Event], default None):
If provided, stop monitoring when this event is set.
You may instead raise
meerschaum.jobs.StopMonitoringLogs
from within `callback_function`
to stop monitoring. - stop_on_exit (bool, default False):
If
True
, stop monitoring when the job stops. - strip_timestamps (bool, default False):
If
True
, remove leading timestamps from lines. - accept_input (bool, default True):
If
True
, accept input when the daemon blocks on stdin.
async def monitor_logs_async(
    self,
    callback_function: Callable[[str], None] = partial(print, end='', flush=True),
    input_callback_function: Optional[Callable[[], str]] = None,
    stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
    stop_event: Optional[asyncio.Event] = None,
    stop_on_exit: bool = False,
    strip_timestamps: bool = False,
    accept_input: bool = True,
    _logs_path: Optional[pathlib.Path] = None,
    _log=None,
    _stdin_file=None,
    debug: bool = False,
):
    """
    Monitor the job's log files and await a callback on new lines.

    Parameters
    ----------
    callback_function: Callable[[str], None], default partial(print, end='')
        The callback to execute as new data comes in.
        Defaults to printing the output directly to `stdout`.

    input_callback_function: Optional[Callable[[], str]], default None
        If provided, execute this callback when the daemon is blocking on stdin.
        Defaults to `sys.stdin.readline()`.

    stop_callback_function: Optional[Callable[[SuccessTuple], None]], default None
        If provided, execute this callback when the daemon stops.
        The job's SuccessTuple will be passed to the callback.

    stop_event: Optional[asyncio.Event], default None
        If provided, stop monitoring when this event is set.
        You may instead raise `meerschaum.jobs.StopMonitoringLogs`
        from within `callback_function` to stop monitoring.

    stop_on_exit: bool, default False
        If `True`, stop monitoring when the job stops.

    strip_timestamps: bool, default False
        If `True`, remove leading timestamps from lines.

    accept_input: bool, default True
        If `True`, accept input when the daemon blocks on stdin.
    """
    def default_input_callback_function():
        return sys.stdin.readline()

    if input_callback_function is None:
        input_callback_function = default_input_callback_function

    # Remote jobs delegate monitoring to the executor.
    if self.executor is not None:
        await self.executor.monitor_logs_async(
            self.name,
            callback_function,
            input_callback_function=input_callback_function,
            stop_callback_function=stop_callback_function,
            stop_on_exit=stop_on_exit,
            strip_timestamps=strip_timestamps,
            accept_input=accept_input,
            debug=debug,
        )
        return

    from meerschaum.utils.formatting._jobs import strip_timestamp_from_line

    # Either the user-supplied event or the internal 'stopped' event
    # may end the watch loop (via `combined_event`).
    events = {
        'user': stop_event,
        'stopped': asyncio.Event(),
    }
    combined_event = asyncio.Event()
    emitted_text = False
    stdin_file = _stdin_file if _stdin_file is not None else self.daemon.stdin_file

    async def check_job_status():
        # Poll the job status; fire the stop callback (and optionally
        # the 'stopped' event) once the job has stopped and output was seen.
        nonlocal emitted_text
        stopped_event = events.get('stopped', None)
        if stopped_event is None:
            return

        sleep_time = 0.1
        while sleep_time < 60:
            if self.status == 'stopped':
                if not emitted_text:
                    # Back off exponentially while waiting for first output.
                    await asyncio.sleep(sleep_time)
                    sleep_time = round(sleep_time * 1.1, 2)
                    continue

                if stop_callback_function is not None:
                    try:
                        if asyncio.iscoroutinefunction(stop_callback_function):
                            await stop_callback_function(self.result)
                        else:
                            stop_callback_function(self.result)
                    except asyncio.exceptions.CancelledError:
                        break
                    except Exception:
                        warn(traceback.format_exc())

                if stop_on_exit:
                    events['stopped'].set()

                break
            await asyncio.sleep(0.1)

    async def check_blocking_on_input():
        # Forward user input to the daemon's stdin while it is blocking.
        while True:
            if not emitted_text or not self.is_blocking_on_stdin():
                try:
                    await asyncio.sleep(0.1)
                except asyncio.exceptions.CancelledError:
                    break
                continue

            if not self.is_running():
                break

            await emit_latest_lines()

            try:
                print('', end='', flush=True)
                if asyncio.iscoroutinefunction(input_callback_function):
                    data = await input_callback_function()
                else:
                    data = input_callback_function()
            except KeyboardInterrupt:
                break
            if not data.endswith('\n'):
                data += '\n'

            stdin_file.write(data)
            await asyncio.sleep(0.1)

    async def combine_events():
        # Set `combined_event` as soon as any individual event fires.
        event_tasks = [
            asyncio.create_task(event.wait())
            for event in events.values()
            if event is not None
        ]
        if not event_tasks:
            return

        try:
            done, pending = await asyncio.wait(
                event_tasks,
                return_when=asyncio.FIRST_COMPLETED,
            )
            for task in pending:
                task.cancel()
        except asyncio.exceptions.CancelledError:
            pass
        finally:
            combined_event.set()

    check_job_status_task = asyncio.create_task(check_job_status())
    check_blocking_on_input_task = asyncio.create_task(check_blocking_on_input())
    combine_events_task = asyncio.create_task(combine_events())

    log = _log if _log is not None else self.daemon.rotating_log
    lines_to_show = get_config('jobs', 'logs', 'lines_to_show')

    async def emit_latest_lines():
        # Emit (at most) the last `lines_to_show` lines through the callback.
        nonlocal emitted_text
        lines = log.readlines()
        for line in lines[(-1 * lines_to_show):]:
            if stop_event is not None and stop_event.is_set():
                return

            if strip_timestamps:
                line = strip_timestamp_from_line(line)

            try:
                if asyncio.iscoroutinefunction(callback_function):
                    await callback_function(line)
                else:
                    callback_function(line)
                emitted_text = True
            except StopMonitoringLogs:
                return
            except Exception:
                warn(f"Error in logs callback:\n{traceback.format_exc()}")

    await emit_latest_lines()

    tasks = (
        [check_job_status_task]
        + ([check_blocking_on_input_task] if accept_input else [])
        + [combine_events_task]
    )
    try:
        # NOTE(review): this `gather` is not awaited — the tasks were already
        # scheduled with `create_task`, so this only aggregates them.
        # Confirm whether awaiting (or discarding) the future is intended.
        _ = asyncio.gather(*tasks, return_exceptions=True)
    except asyncio.exceptions.CancelledError:
        raise
    except Exception:
        warn(f"Failed to run async checks:\n{traceback.format_exc()}")

    # Watch the logs directory and re-emit whenever the current
    # rotating-log subfile changes, until `combined_event` is set.
    watchfiles = mrsm.attempt_import('watchfiles')
    async for changes in watchfiles.awatch(
        _logs_path or LOGS_RESOURCES_PATH,
        stop_event=combined_event,
    ):
        for change in changes:
            file_path_str = change[1]
            file_path = pathlib.Path(file_path_str)
            latest_subfile_path = log.get_latest_subfile_path()
            if latest_subfile_path != file_path:
                continue

            await emit_latest_lines()

    # Final flush after the watch loop ends.
    await emit_latest_lines()
Monitor the job's log files and await a callback on new lines.
Parameters
- callback_function (Callable[[str], None], default partial(print, end='')):
The callback to execute as new data comes in.
Defaults to printing the output directly to
stdout
. - input_callback_function (Optional[Callable[[], str]], default None):
If provided, execute this callback when the daemon is blocking on stdin.
Defaults to
sys.stdin.readline()
. - stop_callback_function (Optional[Callable[[SuccessTuple], None]], default None): If provided, execute this callback when the daemon stops. The job's SuccessTuple will be passed to the callback.
- stop_event (Optional[asyncio.Event], default None):
If provided, stop monitoring when this event is set.
You may instead raise
meerschaum.jobs.StopMonitoringLogs
from within `callback_function`
to stop monitoring. - stop_on_exit (bool, default False):
If
True
, stop monitoring when the job stops. - strip_timestamps (bool, default False):
If
True
, remove leading timestamps from lines. - accept_input (bool, default True):
If
True
, accept input when the daemon blocks on stdin.
625 def is_blocking_on_stdin(self, debug: bool = False) -> bool: 626 """ 627 Return whether a job's daemon is blocking on stdin. 628 """ 629 if self.executor is not None: 630 return self.executor.get_job_is_blocking_on_stdin(self.name, debug=debug) 631 632 return self.is_running() and self.daemon.blocking_stdin_file_path.exists()
Return whether a job's daemon is blocking on stdin.
634 def write_stdin(self, data): 635 """ 636 Write to a job's daemon's `stdin`. 637 """ 638 self.daemon.stdin_file.write(data)
Write to a job's daemon's stdin
.
640 @property 641 def executor(self) -> Union[Executor, None]: 642 """ 643 If the job is remote, return the connector to the remote API instance. 644 """ 645 return ( 646 mrsm.get_connector(self.executor_keys) 647 if self.executor_keys != 'local' 648 else None 649 )
If the job is remote, return the connector to the remote API instance.
651 @property 652 def status(self) -> str: 653 """ 654 Return the running status of the job's daemon. 655 """ 656 if '_status_hook' in self.__dict__: 657 return self._status_hook() 658 659 if self.executor is not None: 660 return self.executor.get_job_status(self.name) 661 662 return self.daemon.status
Return the running status of the job's daemon.
664 @property 665 def pid(self) -> Union[int, None]: 666 """ 667 Return the PID of the job's dameon. 668 """ 669 if self.executor is not None: 670 return self.executor.get_job_metadata(self.name).get('daemon', {}).get('pid', None) 671 672 return self.daemon.pid
Return the PID of the job's daemon.
674 @property 675 def restart(self) -> bool: 676 """ 677 Return whether to restart a stopped job. 678 """ 679 if self.executor is not None: 680 return self.executor.get_job_metadata(self.name).get('restart', False) 681 682 return self.daemon.properties.get('restart', False)
Return whether to restart a stopped job.
684 @property 685 def result(self) -> SuccessTuple: 686 """ 687 Return the `SuccessTuple` when the job has terminated. 688 """ 689 if self.is_running(): 690 return True, f"{self} is running." 691 692 if '_result_hook' in self.__dict__: 693 return self._result_hook() 694 695 if self.executor is not None: 696 return ( 697 self.executor.get_job_metadata(self.name) 698 .get('result', (False, "No result available.")) 699 ) 700 701 _result = self.daemon.properties.get('result', None) 702 if _result is None: 703 return False, "No result available." 704 705 return tuple(_result)
Return the SuccessTuple
when the job has terminated.
707 @property 708 def sysargs(self) -> List[str]: 709 """ 710 Return the sysargs to use for the Daemon. 711 """ 712 if self._sysargs: 713 return self._sysargs 714 715 if self.executor is not None: 716 return self.executor.get_job_metadata(self.name).get('sysargs', []) 717 718 target_args = self.daemon.target_args 719 if target_args is None: 720 return [] 721 self._sysargs = target_args[0] if len(target_args) > 0 else [] 722 return self._sysargs
Return the sysargs to use for the Daemon.
724 @property 725 def daemon(self) -> 'Daemon': 726 """ 727 Return the daemon which this job manages. 728 """ 729 from meerschaum.utils.daemon import Daemon 730 if self._daemon is not None and self.executor is None and self._sysargs: 731 return self._daemon 732 733 remote_properties = ( 734 {} 735 if self.executor is None 736 else self.executor.get_job_properties(self.name) 737 ) 738 properties = {**remote_properties, **self._properties_patch} 739 740 self._daemon = Daemon( 741 target=entry, 742 target_args=[self._sysargs], 743 target_kw={}, 744 daemon_id=self.name, 745 label=shlex.join(self._sysargs), 746 properties=properties, 747 ) 748 if '_rotating_log' in self.__dict__: 749 self._daemon._rotating_log = self._rotating_log 750 751 if '_stdin_file' in self.__dict__: 752 self._daemon._stdin_file = self._stdin_file 753 self._daemon._blocking_stdin_file_path = self._stdin_file.blocking_file_path 754 755 return self._daemon
Return the daemon which this job manages.
757 @property 758 def began(self) -> Union[datetime, None]: 759 """ 760 The datetime when the job began running. 761 """ 762 if self.executor is not None: 763 began_str = self.executor.get_job_began(self.name) 764 if began_str is None: 765 return None 766 return ( 767 datetime.fromisoformat(began_str) 768 .astimezone(timezone.utc) 769 .replace(tzinfo=None) 770 ) 771 772 began_str = self.daemon.properties.get('process', {}).get('began', None) 773 if began_str is None: 774 return None 775 776 return datetime.fromisoformat(began_str)
The datetime when the job began running.
778 @property 779 def ended(self) -> Union[datetime, None]: 780 """ 781 The datetime when the job stopped running. 782 """ 783 if self.executor is not None: 784 ended_str = self.executor.get_job_ended(self.name) 785 if ended_str is None: 786 return None 787 return ( 788 datetime.fromisoformat(ended_str) 789 .astimezone(timezone.utc) 790 .replace(tzinfo=None) 791 ) 792 793 ended_str = self.daemon.properties.get('process', {}).get('ended', None) 794 if ended_str is None: 795 return None 796 797 return datetime.fromisoformat(ended_str)
The datetime when the job stopped running.
799 @property 800 def paused(self) -> Union[datetime, None]: 801 """ 802 The datetime when the job was suspended while running. 803 """ 804 if self.executor is not None: 805 paused_str = self.executor.get_job_paused(self.name) 806 if paused_str is None: 807 return None 808 return ( 809 datetime.fromisoformat(paused_str) 810 .astimezone(timezone.utc) 811 .replace(tzinfo=None) 812 ) 813 814 paused_str = self.daemon.properties.get('process', {}).get('paused', None) 815 if paused_str is None: 816 return None 817 818 return datetime.fromisoformat(paused_str)
The datetime when the job was suspended while running.
820 @property 821 def stop_time(self) -> Union[datetime, None]: 822 """ 823 Return the timestamp when the job was manually stopped. 824 """ 825 if self.executor is not None: 826 return self.executor.get_job_stop_time(self.name) 827 828 if not self.daemon.stop_path.exists(): 829 return None 830 831 stop_data = self.daemon._read_stop_file() 832 if not stop_data: 833 return None 834 835 stop_time_str = stop_data.get('stop_time', None) 836 if not stop_time_str: 837 warn(f"Could not read stop time for {self}.") 838 return None 839 840 return datetime.fromisoformat(stop_time_str)
Return the timestamp when the job was manually stopped.
853 def check_restart(self) -> SuccessTuple: 854 """ 855 If `restart` is `True` and the daemon is not running, 856 restart the job. 857 Do not restart if the job was manually stopped. 858 """ 859 if self.is_running(): 860 return True, f"{self} is running." 861 862 if not self.restart: 863 return True, f"{self} does not need to be restarted." 864 865 if self.stop_time is not None: 866 return True, f"{self} was manually stopped." 867 868 return self.start()
If `restart` is `True` and the daemon is not running, restart the job. Do not restart if the job was manually stopped.
@property
def label(self) -> str:
    """
    Return the job's Daemon label (joined sysargs).
    """
    from meerschaum._internal.arguments import compress_pipeline_sysargs
    compressed_sysargs = compress_pipeline_sysargs(self.sysargs)
    joined = shlex.join(compressed_sysargs)
    # Break pipeline separators onto their own lines for readability.
    return joined.replace(' + ', '\n+ ')
Return the job's Daemon label (joined sysargs).
@property
def env(self) -> Dict[str, str]:
    """
    Return the environment variables to set for the job's process.

    The result is computed once and cached on the instance as `_env`.
    """
    if '_env' in self.__dict__:
        return self.__dict__['_env']

    job_env = self.daemon.properties.get('env', {})
    base_env = {
        'PYTHONUNBUFFERED': '1',
        'LINES': str(get_config('jobs', 'terminal', 'lines')),
        'COLUMNS': str(get_config('jobs', 'terminal', 'columns')),
    }
    # Job-specific variables override the defaults.
    self._env = {**base_env, **job_env}
    return self._env
Return the environment variables to set for the job's process.
@property
def delete_after_completion(self) -> bool:
    """
    Return whether this job is configured to delete itself after completion.

    The value is read once from the daemon's properties and cached
    on the instance as `_delete_after_completion`.
    """
    if '_delete_after_completion' not in self.__dict__:
        self._delete_after_completion = self.daemon.properties.get(
            'delete_after_completion', False
        )
    return self.__dict__.get('_delete_after_completion', False)
Return whether this job is configured to delete itself after completion.
class StopMonitoringLogs(Exception):
    """
    Raise this exception to stop monitoring a job's logs.
    """
Raise this exception to stop the logs monitoring.
def get_jobs(
    executor_keys: Optional[str] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of the existing jobs.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, return remote jobs on the given API instance.
        Otherwise return local jobs.

    include_hidden: bool, default False
        If `True`, include jobs with the `hidden` attribute.

    Returns
    -------
    A dictionary mapping job names to jobs.
    """
    from meerschaum.connectors.parse import parse_executor_keys
    if not executor_keys:
        executor_keys = get_executor_keys_from_context()

    executor_prefix = str(executor_keys).split(':', maxsplit=1)[0]
    merge_local_and_systemd = (
        combine_local_and_systemd
        and executor_prefix in ('None', 'local', 'systemd')
        and get_executor_keys_from_context() == 'systemd'
    )

    def _drop_hidden(jobs):
        """Filter out hidden jobs unless `include_hidden` is set."""
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    def _local_jobs():
        """Build Job objects for every local daemon, minus externally-managed ones."""
        from meerschaum.utils.daemon import get_daemons
        all_local = {
            daemon.daemon_id: Job(name=daemon.daemon_id, executor_keys='local')
            for daemon in get_daemons()
        }
        return {
            name: job
            for name, job in _drop_hidden(all_local).items()
            if not job._is_externally_managed
        }

    def _systemd_jobs():
        """Fetch jobs from the systemd connector."""
        conn = mrsm.get_connector('systemd')
        return _drop_hidden(conn.get_jobs(debug=debug))

    if merge_local_and_systemd:
        local_jobs = _local_jobs()
        systemd_jobs = _systemd_jobs()
        overlapping = set(local_jobs) & set(systemd_jobs)
        if overlapping:
            from meerschaum.utils.misc import items_str
            from meerschaum.utils.warnings import warn
            plural = len(overlapping) != 1
            warn(
                "Job"
                + ('s' if plural else '')
                + f" {items_str(list(overlapping))} "
                + "exist"
                + ('' if plural else 's')
                + " in both `local` and `systemd`.",
                stack=False,
            )
        # systemd entries take precedence over local ones on collision.
        return {**local_jobs, **systemd_jobs}

    if executor_keys == 'local':
        return _local_jobs()

    if executor_keys == 'systemd':
        return _systemd_jobs()

    try:
        _ = parse_executor_keys(executor_keys, construct=False)
        conn = parse_executor_keys(executor_keys)
        return _drop_hidden(conn.get_jobs(debug=debug))
    except Exception:
        # Best-effort: unknown or unreachable executors yield no jobs.
        return {}
Return a dictionary of the existing jobs.
Parameters
- executor_keys (Optional[str], default None): If provided, return remote jobs on the given API instance. Otherwise return local jobs.
- include_hidden (bool, default False): If `True`, include jobs with the `hidden` attribute.
Returns
- A dictionary mapping job names to jobs.
def get_filtered_jobs(
    executor_keys: Optional[str] = None,
    filter_list: Optional[List[str]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    warn: bool = False,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a list of jobs filtered by the user.
    """
    from meerschaum.utils.warnings import warn as _warn
    # Fetch everything (including hidden) so explicit filters can match hidden jobs.
    jobs = get_jobs(
        executor_keys,
        include_hidden=True,
        combine_local_and_systemd=combine_local_and_systemd,
        debug=debug,
    )
    if not filter_list:
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    filtered = {}
    for name in filter_list:
        matched = jobs.get(name, None)
        if matched is not None:
            filtered[name] = matched
            continue
        if warn:
            _warn(
                f"Job '{name}' does not exist.",
                stack=False,
            )
    return filtered
Return a list of jobs filtered by the user.
def get_restart_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return jobs which were created with `--restart` or `--loop`.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    return {name: job for name, job in jobs.items() if job.restart}
Return jobs which were created with `--restart` or `--loop`.
def get_running_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of running jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    running_jobs = {}
    for name, job in jobs.items():
        if job.status == 'running':
            running_jobs[name] = job
    return running_jobs
Return a dictionary of running jobs.
def get_stopped_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of stopped jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    return dict(
        (name, job)
        for name, job in jobs.items()
        if job.status == 'stopped'
    )
Return a dictionary of stopped jobs.
def get_paused_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of paused jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    def _is_paused(job):
        """Return whether the job reports a 'paused' status."""
        return job.status == 'paused'

    return {name: job for name, job in jobs.items() if _is_paused(job)}
Return a dictionary of paused jobs.
def make_executor(cls):
    """
    Register a class as an `Executor`.
    """
    import re
    from meerschaum.connectors import make_connector
    # Derive the executor type from the class name, e.g. `FooExecutor` -> 'foo'.
    typ = re.sub(r'executor$', '', cls.__name__.lower())
    if typ not in executor_types:
        executor_types.append(typ)
    return make_connector(cls, _is_executor=True)
Register a class as an `Executor`.
class Executor(Connector):
    """
    Define the methods for managing jobs.
    """

    @abstractmethod
    def get_job_exists(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job with the given name exists.
        """

    @abstractmethod
    def get_jobs(self) -> Dict[str, Job]:
        """
        Return a mapping of job names to `Job` objects.
        """

    @abstractmethod
    def create_job(
        self,
        name: str,
        sysargs: List[str],
        properties: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ) -> SuccessTuple:
        """
        Create a new job.
        """

    @abstractmethod
    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Start the job with the given name.
        """

    @abstractmethod
    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Stop the job with the given name.
        """

    @abstractmethod
    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Pause the job with the given name.
        """

    @abstractmethod
    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Delete the job with the given name.
        """

    @abstractmethod
    def get_logs(self, name: str, debug: bool = False) -> str:
        """
        Return a job's log output.
        """
Define the methods for managing jobs.
@abstractmethod
def get_job_exists(self, name: str, debug: bool = False) -> bool:
    """
    Return whether a job with the given name exists.
    """
Return whether a job exists.
@abstractmethod
def get_jobs(self) -> Dict[str, Job]:
    """
    Return a mapping of job names to `Job` objects.
    """
Return a dictionary of names -> Jobs.
@abstractmethod
def create_job(
    self,
    name: str,
    sysargs: List[str],
    properties: Optional[Dict[str, Any]] = None,
    debug: bool = False,
) -> SuccessTuple:
    """
    Create a new job.
    """
Create a new job.
@abstractmethod
def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Start the job with the given name.
    """
Start a job.
@abstractmethod
def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Stop the job with the given name.
    """
Stop a job.
@abstractmethod
def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Pause the job with the given name.
    """
Pause a job.
def check_restart_jobs(
    executor_keys: Optional[str] = 'local',
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = True,
    silent: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Restart any stopped jobs which were created with `--restart`.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, check jobs on the given remote API instance.
        Otherwise check local jobs.

    include_hidden: bool, default True
        If `True`, include hidden jobs in the check.

    silent: bool, default False
        If `True`, do not print the restart success message.

    Returns
    -------
    A `SuccessTuple` indicating whether all restart checks succeeded.
    """
    from meerschaum.utils.misc import items_str

    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=False,
            debug=debug,
        )

    if not jobs:
        return True, "No jobs to restart."

    results = {}
    for name, job in jobs.items():
        check_success, check_msg = job.check_restart()
        results[job.name] = (check_success, check_msg)
        if not silent:
            mrsm.pprint((check_success, check_msg))

    success_names = [name for name, (check_success, _) in results.items() if check_success]
    fail_names = [name for name, (check_success, _) in results.items() if not check_success]
    success = len(success_names) == len(jobs)
    if success:
        msg = (
            "Successfully restarted job"
            + ('s' if len(success_names) != 1 else '')
            + ' ' + items_str(success_names) + '.'
        )
    else:
        # BUGFIX: pluralize on the number of FAILED jobs; the original
        # keyed the 's' on `len(success_names)`, producing e.g.
        # "Failed to restart jobs 'a'." when one job failed but two succeeded.
        msg = (
            "Failed to restart job"
            + ('s' if len(fail_names) != 1 else '')
            + ' ' + items_str(fail_names) + '.'
        )
    return success, msg
Restart any stopped jobs which were created with `--restart`.
Parameters
- executor_keys (Optional[str], default None): If provided, check jobs on the given remote API instance. Otherwise check local jobs.
- include_hidden (bool, default True): If `True`, include hidden jobs in the check.
- silent (bool, default False): If `True`, do not print the restart success message.
def start_check_jobs_thread():
    """
    Start a thread to regularly monitor jobs.
    """
    import atexit
    from functools import partial
    from meerschaum.utils.threading import RepeatTimer
    from meerschaum.config.static import STATIC_CONFIG

    global _check_loop_stop_thread
    interval_seconds = STATIC_CONFIG['jobs']['check_restart_seconds']

    timer = RepeatTimer(
        interval_seconds,
        partial(_check_restart_jobs_against_lock, silent=True),
    )
    # Daemonize so the timer never blocks interpreter shutdown.
    timer.daemon = True
    atexit.register(stop_check_jobs_thread)

    _check_loop_stop_thread = timer
    _check_loop_stop_thread.start()
    return _check_loop_stop_thread
Start a thread to regularly monitor jobs.
def stop_check_jobs_thread():
    """
    Stop the job monitoring thread.
    """
    from meerschaum.config.paths import CHECK_JOBS_LOCK_PATH
    from meerschaum.utils.warnings import warn
    if _check_loop_stop_thread is None:
        return

    _check_loop_stop_thread.cancel()

    # Best-effort cleanup of the lock file; failure is non-fatal.
    try:
        if CHECK_JOBS_LOCK_PATH.exists():
            CHECK_JOBS_LOCK_PATH.unlink()
    except Exception as e:
        warn(f"Failed to remove check jobs lock file:\n{e}")
Stop the job monitoring thread.