meerschaum.jobs

Higher-level utilities for managing meerschaum.utils.daemon.Daemon.

  1#! /usr/bin/env python3
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Higher-level utilities for managing `meerschaum.utils.daemon.Daemon`.
  7"""
  8
  9import meerschaum as mrsm
 10from meerschaum.utils.typing import Dict, Optional, List, SuccessTuple
 11
 12from meerschaum.jobs._Job import Job, StopMonitoringLogs
 13from meerschaum.jobs._Executor import Executor
 14
 15__all__ = (
 16    'Job',
 17    'StopMonitoringLogs',
 18    'systemd',
 19    'get_jobs',
 20    'get_filtered_jobs',
 21    'get_restart_jobs',
 22    'get_running_jobs',
 23    'get_stopped_jobs',
 24    'get_paused_jobs',
 25    'get_restart_jobs',
 26    'make_executor',
 27    'Executor',
 28    'check_restart_jobs',
 29    'start_check_jobs_thread',
 30    'stop_check_jobs_thread',
 31)
 32
 33executor_types: List[str] = ['api', 'local']
 34
 35
def get_jobs(
    executor_keys: Optional[str] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of the existing jobs.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, return remote jobs on the given API instance.
        Otherwise return local jobs.

    include_hidden: bool, default False
        If `True`, include jobs with the `hidden` attribute.

    combine_local_and_systemd: bool, default True
        If `True` and the context's default executor is `systemd`,
        merge the jobs from both the `local` and `systemd` executors.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping job names to jobs.
    """
    from meerschaum.connectors.parse import parse_executor_keys
    executor_keys = executor_keys or get_executor_keys_from_context()

    ### Only merge `local` and `systemd` jobs when no remote executor was requested
    ### and the context's default executor is `systemd`.
    include_local_and_system = (
        combine_local_and_systemd
        and str(executor_keys).split(':', maxsplit=1)[0] in ('None', 'local', 'systemd')
        and get_executor_keys_from_context() == 'systemd'
    )

    def _get_local_jobs():
        """Return local daemon-backed jobs, excluding externally managed ones."""
        from meerschaum.utils.daemon import get_daemons
        daemons = get_daemons()
        jobs = {
            daemon.daemon_id: Job(name=daemon.daemon_id, executor_keys='local')
            for daemon in daemons
        }
        return {
            name: job
            for name, job in jobs.items()
            if (include_hidden or not job.hidden) and not job._is_externally_managed

        }

    def _get_systemd_jobs():
        """Return jobs managed by the `systemd` connector."""
        conn = mrsm.get_connector('systemd')
        jobs = conn.get_jobs(debug=debug)
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    if include_local_and_system:
        local_jobs = _get_local_jobs()
        systemd_jobs = _get_systemd_jobs()
        ### Warn about name collisions; the systemd job wins in the merged dict below.
        shared_jobs = set(local_jobs) & set(systemd_jobs)
        if shared_jobs:
            from meerschaum.utils.misc import items_str
            from meerschaum.utils.warnings import warn
            warn(
                "Job"
                + ('s' if len(shared_jobs) != 1 else '')
                + f" {items_str(list(shared_jobs))} "
                + "exist"
                + ('s' if len(shared_jobs) == 1 else '')
                + " in both `local` and `systemd`.",
                stack=False,
            )
        return {**local_jobs, **systemd_jobs}

    if executor_keys == 'local':
        return _get_local_jobs()

    if executor_keys == 'systemd':
        return _get_systemd_jobs()

    ### Remote executor (e.g. 'api:main'): validate the keys, then fetch jobs.
    ### Best-effort: any failure (bad keys, unreachable instance) yields an empty dict.
    try:
        _ = parse_executor_keys(executor_keys, construct=False)
        conn = parse_executor_keys(executor_keys)
        jobs = conn.get_jobs(debug=debug)
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }
    except Exception:
        return {}
125
126
def get_filtered_jobs(
    executor_keys: Optional[str] = None,
    filter_list: Optional[List[str]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    warn: bool = False,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of jobs filtered by the user.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, filter jobs on the given API instance.
        Otherwise filter local jobs.

    filter_list: Optional[List[str]], default None
        Job names to select. Names prefixed with an underscore are treated
        as exclusions when no other names match directly.

    include_hidden: bool, default False
        If `True` and no filter is given, include hidden jobs.

    warn: bool, default False
        If `True`, warn when a requested job does not exist.

    Returns
    -------
    A dictionary mapping job names to jobs.
    """
    from meerschaum.utils.warnings import warn as _warn
    jobs = get_jobs(
        executor_keys,
        include_hidden=True,
        combine_local_and_systemd=combine_local_and_systemd,
        debug=debug,
    )
    if not filter_list:
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    ### NOTE: a dead `if ...: pass` block and the unused
    ### `filter_list_without_underscores` list were removed here.
    jobs_to_return = {}
    filter_list_with_underscores = [name for name in filter_list if name.startswith('_')]
    for name in filter_list:
        job = jobs.get(name, None)
        if job is None:
            if warn:
                _warn(
                    f"Job '{name}' does not exist.",
                    stack=False,
                )
            continue
        jobs_to_return[name] = job

    ### Underscore-prefixed names act as exclusions when nothing matched directly.
    if not jobs_to_return and filter_list_with_underscores:
        names_to_exclude = [name.lstrip('_') for name in filter_list_with_underscores]
        return {
            name: job
            for name, job in jobs.items()
            if name not in names_to_exclude
        }

    return jobs_to_return
180
181
def get_restart_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return jobs which were created with `--restart` or `--loop`.
    """
    ### Fetch the jobs only when the caller did not supply them.
    jobs_to_search = (
        jobs
        if jobs is not None
        else get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )
    )
    return {
        name: job
        for name, job in jobs_to_search.items()
        if job.restart
    }
205
206
def get_running_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of running jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    ### Keep only the jobs whose status reads 'running'.
    running_jobs = {}
    for name, job in jobs.items():
        if job.status == 'running':
            running_jobs[name] = job
    return running_jobs
230
231
def get_paused_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of paused jobs.
    """
    ### Fetch the jobs only when the caller did not supply them.
    known_jobs = (
        jobs
        if jobs is not None
        else get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )
    )
    return {
        name: job
        for name, job in known_jobs.items()
        if job.status == 'paused'
    }
255
256
def get_stopped_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of stopped jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    ### Keep only the jobs whose status reads 'stopped'.
    stopped_jobs = {}
    for name, job in jobs.items():
        if job.status == 'stopped':
            stopped_jobs[name] = job
    return stopped_jobs
280
281
def make_executor(cls):
    """
    Register a class as an `Executor`.
    """
    from meerschaum.connectors import make_connector

    ### Derive the executor type from the class name, dropping a trailing 'executor'.
    suffix = 'executor'
    typ = cls.__name__.lower()
    if typ.endswith(suffix):
        typ = typ[:-len(suffix)]

    if typ not in executor_types:
        executor_types.append(typ)
    return make_connector(cls, _is_executor=True)
293
294
def check_restart_jobs(
    executor_keys: Optional[str] = 'local',
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = True,
    silent: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Restart any stopped jobs which were created with `--restart`.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, check jobs on the given remote API instance.
        Otherwise check local jobs.

    jobs: Optional[Dict[str, Job]], default None
        If provided, check these jobs instead of fetching them.

    include_hidden: bool, default True
        If `True`, include hidden jobs in the check.

    silent: bool, default False
        If `True`, do not print the restart success message.

    Returns
    -------
    A `SuccessTuple` which succeeds only if every checked job restarted cleanly.
    """
    from meerschaum.utils.misc import items_str

    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=False,
            debug=debug,
        )

    if not jobs:
        return True, "No jobs to restart."

    results = {}
    for name, job in jobs.items():
        check_success, check_msg = job.check_restart()
        results[job.name] = (check_success, check_msg)
        if not silent:
            mrsm.pprint((check_success, check_msg))

    success_names = [name for name, (check_success, _) in results.items() if check_success]
    fail_names = [name for name, (check_success, _) in results.items() if not check_success]
    success = len(success_names) == len(jobs)
    msg = (
        (
            "Successfully restarted job"
            + ('s' if len(success_names) != 1 else '')
            + ' ' + items_str(success_names) + '.'
        )
        if success
        else (
            "Failed to restart job"
            ### BUGFIX: pluralize by the number of *failed* jobs (was `success_names`).
            + ('s' if len(fail_names) != 1 else '')
            + ' ' + items_str(fail_names) + '.'
        )
    )
    return success, msg
354
355
def _check_restart_jobs_against_lock(*args, **kwargs):
    """
    Run `check_restart_jobs` under an inter-process lock so that concurrent
    monitors (e.g. multiple daemons) do not restart the same jobs twice.

    Returns
    -------
    The `SuccessTuple` from `check_restart_jobs` (previously discarded).
    """
    from meerschaum.config.paths import CHECK_JOBS_LOCK_PATH
    fasteners = mrsm.attempt_import('fasteners', lazy=False)
    lock = fasteners.InterProcessLock(CHECK_JOBS_LOCK_PATH)
    with lock:
        ### Propagate the result so callers may inspect the outcome.
        return check_restart_jobs(*args, **kwargs)
362
363
### Module-level handle on the monitoring timer (set by `start_check_jobs_thread`).
_check_loop_stop_thread = None
def start_check_jobs_thread():
    """
    Start a thread to regularly monitor jobs.
    """
    import atexit
    from functools import partial
    from meerschaum.utils.threading import RepeatTimer
    from meerschaum.config.static import STATIC_CONFIG

    global _check_loop_stop_thread
    interval_seconds = STATIC_CONFIG['jobs']['check_restart_seconds']
    monitor_callable = partial(
        _check_restart_jobs_against_lock,
        silent=True,
    )

    timer = RepeatTimer(interval_seconds, monitor_callable)
    timer.daemon = True
    _check_loop_stop_thread = timer

    ### Cancel the timer when the interpreter exits.
    atexit.register(stop_check_jobs_thread)

    timer.start()
    return timer
389
390
def stop_check_jobs_thread():
    """
    Stop the job monitoring thread.
    """
    from meerschaum.config.paths import CHECK_JOBS_LOCK_PATH
    from meerschaum.utils.warnings import warn

    timer = _check_loop_stop_thread
    if timer is None:
        ### Nothing to stop; the monitor was never started.
        return

    timer.cancel()

    ### Best-effort removal of the inter-process lock file.
    try:
        if CHECK_JOBS_LOCK_PATH.exists():
            CHECK_JOBS_LOCK_PATH.unlink()
    except Exception as e:
        warn(f"Failed to remove check jobs lock file:\n{e}")
407
408
### Cache for the context-derived executor keys.
_context_keys = None
def get_executor_keys_from_context() -> str:
    """
    If we are running on the host with the default root, default to `'systemd'`.
    Otherwise return `'local'`.
    """
    global _context_keys

    if _context_keys is not None:
        return _context_keys

    from meerschaum.config import get_config
    from meerschaum.config.paths import ROOT_DIR_PATH, DEFAULT_ROOT_DIR_PATH
    from meerschaum.utils.misc import is_systemd_available

    ### An explicitly configured executor always wins and is not cached,
    ### so later configuration changes are respected.
    configured_executor = get_config('meerschaum', 'executor', warn=False)
    if configured_executor is not None:
        return configured_executor

    use_systemd = is_systemd_available() and ROOT_DIR_PATH == DEFAULT_ROOT_DIR_PATH
    _context_keys = 'systemd' if use_systemd else 'local'
    return _context_keys
434
435
def _install_healthcheck_job() -> SuccessTuple:
    """
    Install the systemd job which checks local jobs.
    """
    from meerschaum.config import get_config

    ### Both the experimental flag and a systemd context are required.
    if not get_config('system', 'experimental', 'systemd_healthcheck'):
        return False, "Local healthcheck is disabled."

    if get_executor_keys_from_context() != 'systemd':
        return False, "Not running systemd."

    healthcheck_job = Job(
        '.local-healthcheck',
        ['restart', 'jobs', '-e', 'local', '--loop', '--min-seconds', '60'],
        executor_keys='systemd',
    )
    return healthcheck_job.start()
class Job:
 50class Job:
 51    """
 52    Manage a `meerschaum.utils.daemon.Daemon`, locally or remotely via the API.
 53    """
 54
    def __init__(
        self,
        name: str,
        sysargs: Union[List[str], str, None] = None,
        env: Optional[Dict[str, str]] = None,
        executor_keys: Optional[str] = None,
        delete_after_completion: bool = False,
        _properties: Optional[Dict[str, Any]] = None,
        _rotating_log=None,
        _stdin_file=None,
        _status_hook: Optional[Callable[[], str]] = None,
        _result_hook: Optional[Callable[[], SuccessTuple]] = None,
        _externally_managed: bool = False,
    ):
        """
        Create a new job to manage a `meerschaum.utils.daemon.Daemon`.

        Parameters
        ----------
        name: str
            The name of the job to be created.
            This will also be used as the Daemon ID.

        sysargs: Union[List[str], str, None], default None
            The sysargs of the command to be executed, e.g. 'start api'.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the job's process.

        executor_keys: Optional[str], default None
            If provided, execute the job remotely on an API instance, e.g. 'api:main'.

        delete_after_completion: bool, default False
            If `True`, delete this job when it has finished executing.

        _properties: Optional[Dict[str, Any]], default None
            If provided, use this to patch the daemon's properties.

        _rotating_log: default None
            If provided, inject this rotating log into the job's daemon.

        _stdin_file: default None
            If provided, inject this file object as the daemon's standard input.

        _status_hook: Optional[Callable[[], str]], default None
            If provided, call this hook to determine the job's status.

        _result_hook: Optional[Callable[[], SuccessTuple]], default None
            If provided, call this hook to determine the job's result.

        _externally_managed: bool, default False
            If `True`, mark this job's properties as externally managed.
        """
        from meerschaum.utils.daemon import Daemon
        for char in BANNED_CHARS:
            if char in name:
                raise ValueError(f"Invalid name: ({char}) is not allowed.")

        if isinstance(sysargs, str):
            sysargs = shlex.split(sysargs)

        ### Replace the escaped 'and' argument key with the real one.
        and_key = STATIC_CONFIG['system']['arguments']['and_key']
        escaped_and_key = STATIC_CONFIG['system']['arguments']['escaped_and_key']
        if sysargs:
            sysargs = [
                (arg if arg != escaped_and_key else and_key)
                for arg in sysargs
            ]

        ### NOTE: 'local' and 'systemd' executors are being coalesced.
        if executor_keys is None:
            from meerschaum.jobs import get_executor_keys_from_context
            executor_keys = get_executor_keys_from_context()

        self.executor_keys = executor_keys
        self.name = name
        ### Only local jobs get a daemon; remote jobs are managed via their executor.
        try:
            self._daemon = (
                Daemon(daemon_id=name)
                if executor_keys == 'local'
                else None
            )
        except Exception:
            self._daemon = None

        ### Handle any injected dependencies.
        if _rotating_log is not None:
            self._rotating_log = _rotating_log
            if self._daemon is not None:
                self._daemon._rotating_log = _rotating_log

        if _stdin_file is not None:
            self._stdin_file = _stdin_file
            if self._daemon is not None:
                self._daemon._stdin_file = _stdin_file
                self._daemon._blocking_stdin_file_path = _stdin_file.blocking_file_path

        if _status_hook is not None:
            self._status_hook = _status_hook

        if _result_hook is not None:
            self._result_hook = _result_hook

        ### Accumulate property overrides to be patched onto the daemon.
        self._externally_managed = _externally_managed
        self._properties_patch = _properties or {}
        if _externally_managed:
            self._properties_patch.update({'externally_managed': _externally_managed})

        if env:
            self._properties_patch.update({'env': env})

        if delete_after_completion:
            self._properties_patch.update({'delete_after_completion': delete_after_completion})

        ### Prefer sysargs persisted in the daemon's properties over the given ones.
        daemon_sysargs = (
            self._daemon.properties.get('target', {}).get('args', [None])[0]
            if self._daemon is not None
            else None
        )

        if daemon_sysargs and sysargs and daemon_sysargs != sysargs:
            warn("Given sysargs differ from existing sysargs.")

        ### Strip daemonization flags; a restart flag marks the job as restarting.
        self._sysargs = [
            arg
            for arg in (daemon_sysargs or sysargs or [])
            if arg not in ('-d', '--daemon')
        ]
        for restart_flag in RESTART_FLAGS:
            if restart_flag in self._sysargs:
                self._properties_patch.update({'restart': True})
                break
172
173    @staticmethod
174    def from_pid(pid: int, executor_keys: Optional[str] = None) -> Job:
175        """
176        Build a `Job` from the PID of a running Meerschaum process.
177
178        Parameters
179        ----------
180        pid: int
181            The PID of the process.
182
183        executor_keys: Optional[str], default None
184            The executor keys to assign to the job.
185        """
186        from meerschaum.config.paths import DAEMON_RESOURCES_PATH
187
188        psutil = mrsm.attempt_import('psutil')
189        try:
190            process = psutil.Process(pid)
191        except psutil.NoSuchProcess as e:
192            warn(f"Process with PID {pid} does not exist.", stack=False)
193            raise e
194
195        command_args = process.cmdline()
196        is_daemon = command_args[1] == '-c'
197
198        if is_daemon:
199            daemon_id = command_args[-1].split('daemon_id=')[-1].split(')')[0].replace("'", '')
200            root_dir = process.environ().get(STATIC_CONFIG['environment']['root'], None)
201            if root_dir is None:
202                from meerschaum.config.paths import ROOT_DIR_PATH
203                root_dir = ROOT_DIR_PATH
204            else:
205                root_dir = pathlib.Path(root_dir)
206            jobs_dir = root_dir / DAEMON_RESOURCES_PATH.name
207            daemon_dir = jobs_dir / daemon_id
208            pid_file = daemon_dir / 'process.pid'
209
210            if pid_file.exists():
211                with open(pid_file, 'r', encoding='utf-8') as f:
212                    daemon_pid = int(f.read())
213
214                if pid != daemon_pid:
215                    raise EnvironmentError(f"Differing PIDs: {pid=}, {daemon_pid=}")
216            else:
217                raise EnvironmentError(f"Is job '{daemon_id}' running?")
218
219            return Job(daemon_id, executor_keys=executor_keys)
220
221        from meerschaum._internal.arguments._parse_arguments import parse_arguments
222        from meerschaum.utils.daemon import get_new_daemon_name
223
224        mrsm_ix = 0
225        for i, arg in enumerate(command_args):
226            if 'mrsm' in arg or 'meerschaum' in arg.lower():
227                mrsm_ix = i
228                break
229
230        sysargs = command_args[mrsm_ix+1:]
231        kwargs = parse_arguments(sysargs)
232        name = kwargs.get('name', get_new_daemon_name())
233        return Job(name, sysargs, executor_keys=executor_keys)
234
235    def start(self, debug: bool = False) -> SuccessTuple:
236        """
237        Start the job's daemon.
238        """
239        if self.executor is not None:
240            if not self.exists(debug=debug):
241                return self.executor.create_job(
242                    self.name,
243                    self.sysargs,
244                    properties=self.daemon.properties,
245                    debug=debug,
246                )
247            return self.executor.start_job(self.name, debug=debug)
248
249        if self.is_running():
250            return True, f"{self} is already running."
251
252        success, msg = self.daemon.run(
253            keep_daemon_output=(not self.delete_after_completion),
254            allow_dirty_run=True,
255        )
256        if not success:
257            return success, msg
258
259        return success, f"Started {self}."
260
261    def stop(self, timeout_seconds: Optional[int] = None, debug: bool = False) -> SuccessTuple:
262        """
263        Stop the job's daemon.
264        """
265        if self.executor is not None:
266            return self.executor.stop_job(self.name, debug=debug)
267
268        if self.daemon.status == 'stopped':
269            if not self.restart:
270                return True, f"{self} is not running."
271            elif self.stop_time is not None:
272                return True, f"{self} will not restart until manually started."
273
274        quit_success, quit_msg = self.daemon.quit(timeout=timeout_seconds)
275        if quit_success:
276            return quit_success, f"Stopped {self}."
277
278        warn(
279            f"Failed to gracefully quit {self}.",
280            stack=False,
281        )
282        kill_success, kill_msg = self.daemon.kill(timeout=timeout_seconds)
283        if not kill_success:
284            return kill_success, kill_msg
285
286        return kill_success, f"Killed {self}."
287
288    def pause(self, timeout_seconds: Optional[int] = None, debug: bool = False) -> SuccessTuple:
289        """
290        Pause the job's daemon.
291        """
292        if self.executor is not None:
293            return self.executor.pause_job(self.name, debug=debug)
294
295        pause_success, pause_msg = self.daemon.pause(timeout=timeout_seconds)
296        if not pause_success:
297            return pause_success, pause_msg
298
299        return pause_success, f"Paused {self}."
300
301    def delete(self, debug: bool = False) -> SuccessTuple:
302        """
303        Delete the job and its daemon.
304        """
305        if self.executor is not None:
306            return self.executor.delete_job(self.name, debug=debug)
307
308        if self.is_running():
309            stop_success, stop_msg = self.stop()
310            if not stop_success:
311                return stop_success, stop_msg
312
313        cleanup_success, cleanup_msg = self.daemon.cleanup()
314        if not cleanup_success:
315            return cleanup_success, cleanup_msg
316
317        _ = self.daemon._properties.pop('result', None)
318        return cleanup_success, f"Deleted {self}."
319
320    def is_running(self) -> bool:
321        """
322        Determine whether the job's daemon is running.
323        """
324        return self.status == 'running'
325
326    def exists(self, debug: bool = False) -> bool:
327        """
328        Determine whether the job exists.
329        """
330        if self.executor is not None:
331            return self.executor.get_job_exists(self.name, debug=debug)
332
333        return self.daemon.path.exists()
334
335    def get_logs(self) -> Union[str, None]:
336        """
337        Return the output text of the job's daemon.
338        """
339        if self.executor is not None:
340            return self.executor.get_logs(self.name)
341
342        return self.daemon.log_text
343
344    def monitor_logs(
345        self,
346        callback_function: Callable[[str], None] = partial(print, end=''),
347        input_callback_function: Optional[Callable[[], str]] = None,
348        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
349        stop_event: Optional[asyncio.Event] = None,
350        stop_on_exit: bool = False,
351        strip_timestamps: bool = False,
352        accept_input: bool = True,
353        debug: bool = False,
354    ):
355        """
356        Monitor the job's log files and execute a callback on new lines.
357
358        Parameters
359        ----------
360        callback_function: Callable[[str], None], default partial(print, end='')
361            The callback to execute as new data comes in.
362            Defaults to printing the output directly to `stdout`.
363
364        input_callback_function: Optional[Callable[[], str]], default None
365            If provided, execute this callback when the daemon is blocking on stdin.
366            Defaults to `sys.stdin.readline()`.
367
368        stop_callback_function: Optional[Callable[[SuccessTuple]], str], default None
369            If provided, execute this callback when the daemon stops.
370            The job's SuccessTuple will be passed to the callback.
371
372        stop_event: Optional[asyncio.Event], default None
373            If provided, stop monitoring when this event is set.
374            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
375            from within `callback_function` to stop monitoring.
376
377        stop_on_exit: bool, default False
378            If `True`, stop monitoring when the job stops.
379
380        strip_timestamps: bool, default False
381            If `True`, remove leading timestamps from lines.
382
383        accept_input: bool, default True
384            If `True`, accept input when the daemon blocks on stdin.
385        """
386        def default_input_callback_function():
387            return sys.stdin.readline()
388
389        if input_callback_function is None:
390            input_callback_function = default_input_callback_function
391
392        if self.executor is not None:
393            self.executor.monitor_logs(
394                self.name,
395                callback_function,
396                input_callback_function=input_callback_function,
397                stop_callback_function=stop_callback_function,
398                stop_on_exit=stop_on_exit,
399                accept_input=accept_input,
400                strip_timestamps=strip_timestamps,
401                debug=debug,
402            )
403            return
404
405        monitor_logs_coroutine = self.monitor_logs_async(
406            callback_function=callback_function,
407            input_callback_function=input_callback_function,
408            stop_callback_function=stop_callback_function,
409            stop_event=stop_event,
410            stop_on_exit=stop_on_exit,
411            strip_timestamps=strip_timestamps,
412            accept_input=accept_input,
413        )
414        return asyncio.run(monitor_logs_coroutine)
415
416    async def monitor_logs_async(
417        self,
418        callback_function: Callable[[str], None] = partial(print, end='', flush=True),
419        input_callback_function: Optional[Callable[[], str]] = None,
420        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
421        stop_event: Optional[asyncio.Event] = None,
422        stop_on_exit: bool = False,
423        strip_timestamps: bool = False,
424        accept_input: bool = True,
425        _logs_path: Optional[pathlib.Path] = None,
426        _log=None,
427        _stdin_file=None,
428        debug: bool = False,
429    ):
430        """
431        Monitor the job's log files and await a callback on new lines.
432
433        Parameters
434        ----------
435        callback_function: Callable[[str], None], default partial(print, end='')
436            The callback to execute as new data comes in.
437            Defaults to printing the output directly to `stdout`.
438
439        input_callback_function: Optional[Callable[[], str]], default None
440            If provided, execute this callback when the daemon is blocking on stdin.
441            Defaults to `sys.stdin.readline()`.
442
443        stop_callback_function: Optional[Callable[[SuccessTuple]], str], default None
444            If provided, execute this callback when the daemon stops.
445            The job's SuccessTuple will be passed to the callback.
446
447        stop_event: Optional[asyncio.Event], default None
448            If provided, stop monitoring when this event is set.
449            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
450            from within `callback_function` to stop monitoring.
451
452        stop_on_exit: bool, default False
453            If `True`, stop monitoring when the job stops.
454
455        strip_timestamps: bool, default False
456            If `True`, remove leading timestamps from lines.
457
458        accept_input: bool, default True
459            If `True`, accept input when the daemon blocks on stdin.
460        """
461        def default_input_callback_function():
462            return sys.stdin.readline()
463
464        if input_callback_function is None:
465            input_callback_function = default_input_callback_function
466
467        if self.executor is not None:
468            await self.executor.monitor_logs_async(
469                self.name,
470                callback_function,
471                input_callback_function=input_callback_function,
472                stop_callback_function=stop_callback_function,
473                stop_on_exit=stop_on_exit,
474                strip_timestamps=strip_timestamps,
475                accept_input=accept_input,
476                debug=debug,
477            )
478            return
479
480        from meerschaum.utils.formatting._jobs import strip_timestamp_from_line
481
482        events = {
483            'user': stop_event,
484            'stopped': asyncio.Event(),
485        }
486        combined_event = asyncio.Event()
487        emitted_text = False
488        stdin_file = _stdin_file if _stdin_file is not None else self.daemon.stdin_file
489
490        async def check_job_status():
491            nonlocal emitted_text
492            stopped_event = events.get('stopped', None)
493            if stopped_event is None:
494                return
495
496            sleep_time = 0.1
497            while sleep_time < 60:
498                if self.status == 'stopped':
499                    if not emitted_text:
500                        await asyncio.sleep(sleep_time)
501                        sleep_time = round(sleep_time * 1.1, 2)
502                        continue
503
504                    if stop_callback_function is not None:
505                        try:
506                            if asyncio.iscoroutinefunction(stop_callback_function):
507                                await stop_callback_function(self.result)
508                            else:
509                                stop_callback_function(self.result)
510                        except asyncio.exceptions.CancelledError:
511                            break
512                        except Exception:
513                            warn(traceback.format_exc())
514
515                    if stop_on_exit:
516                        events['stopped'].set()
517
518                    break
519                await asyncio.sleep(0.1)
520
521        async def check_blocking_on_input():
522            while True:
523                if not emitted_text or not self.is_blocking_on_stdin():
524                    try:
525                        await asyncio.sleep(0.1)
526                    except asyncio.exceptions.CancelledError:
527                        break
528                    continue
529
530                if not self.is_running():
531                    break
532
533                await emit_latest_lines()
534
535                try:
536                    print('', end='', flush=True)
537                    if asyncio.iscoroutinefunction(input_callback_function):
538                        data = await input_callback_function()
539                    else:
540                        data = input_callback_function()
541                except KeyboardInterrupt:
542                    break
543                if not data.endswith('\n'):
544                    data += '\n'
545
546                stdin_file.write(data)
547                await asyncio.sleep(0.1)
548
549        async def combine_events():
550            event_tasks = [
551                asyncio.create_task(event.wait())
552                for event in events.values()
553                if event is not None
554            ]
555            if not event_tasks:
556                return
557
558            try:
559                done, pending = await asyncio.wait(
560                    event_tasks,
561                    return_when=asyncio.FIRST_COMPLETED,
562                )
563                for task in pending:
564                    task.cancel()
565            except asyncio.exceptions.CancelledError:
566                pass
567            finally:
568                combined_event.set()
569
570        check_job_status_task = asyncio.create_task(check_job_status())
571        check_blocking_on_input_task = asyncio.create_task(check_blocking_on_input())
572        combine_events_task = asyncio.create_task(combine_events())
573
574        log = _log if _log is not None else self.daemon.rotating_log
575        lines_to_show = get_config('jobs', 'logs', 'lines_to_show')
576
577        async def emit_latest_lines():
578            nonlocal emitted_text
579            lines = log.readlines()
580            for line in lines[(-1 * lines_to_show):]:
581                if stop_event is not None and stop_event.is_set():
582                    return
583
584                if strip_timestamps:
585                    line = strip_timestamp_from_line(line)
586
587                try:
588                    if asyncio.iscoroutinefunction(callback_function):
589                        await callback_function(line)
590                    else:
591                        callback_function(line)
592                    emitted_text = True
593                except StopMonitoringLogs:
594                    return
595                except Exception:
596                    warn(f"Error in logs callback:\n{traceback.format_exc()}")
597
598        await emit_latest_lines()
599
600        tasks = (
601            [check_job_status_task]
602            + ([check_blocking_on_input_task] if accept_input else [])
603            + [combine_events_task]
604        )
605        try:
606            _ = asyncio.gather(*tasks, return_exceptions=True)
607        except asyncio.exceptions.CancelledError:
608            raise
609        except Exception:
610            warn(f"Failed to run async checks:\n{traceback.format_exc()}")
611
612        watchfiles = mrsm.attempt_import('watchfiles')
613        async for changes in watchfiles.awatch(
614            _logs_path or LOGS_RESOURCES_PATH,
615            stop_event=combined_event,
616        ):
617            for change in changes:
618                file_path_str = change[1]
619                file_path = pathlib.Path(file_path_str)
620                latest_subfile_path = log.get_latest_subfile_path()
621                if latest_subfile_path != file_path:
622                    continue
623
624                await emit_latest_lines()
625
626        await emit_latest_lines()
627
628    def is_blocking_on_stdin(self, debug: bool = False) -> bool:
629        """
630        Return whether a job's daemon is blocking on stdin.
631        """
632        if self.executor is not None:
633            return self.executor.get_job_is_blocking_on_stdin(self.name, debug=debug)
634
635        return self.is_running() and self.daemon.blocking_stdin_file_path.exists()
636
    def write_stdin(self, data):
        """
        Write to a job's daemon's `stdin`.

        Parameters
        ----------
        data: str
            The text to write to the daemon's stdin file.
        """
        self.daemon.stdin_file.write(data)
642
643    @property
644    def executor(self) -> Union[Executor, None]:
645        """
646        If the job is remote, return the connector to the remote API instance.
647        """
648        return (
649            mrsm.get_connector(self.executor_keys)
650            if self.executor_keys != 'local'
651            else None
652        )
653
    @property
    def status(self) -> str:
        """
        Return the running status of the job's daemon
        (e.g. `'running'` or `'stopped'`).
        """
        ### An injected status hook takes precedence over everything else.
        if '_status_hook' in self.__dict__:
            return self._status_hook()

        ### Remote jobs defer to the executor.
        if self.executor is not None:
            return self.executor.get_job_status(self.name)

        return self.daemon.status
666
    @property
    def pid(self) -> Union[int, None]:
        """
        Return the PID of the job's daemon, or `None` if unavailable.
        """
        ### Remote jobs: the PID lives in the executor's job metadata.
        if self.executor is not None:
            return self.executor.get_job_metadata(self.name).get('daemon', {}).get('pid', None)

        return self.daemon.pid
676
677    @property
678    def restart(self) -> bool:
679        """
680        Return whether to restart a stopped job.
681        """
682        if self.executor is not None:
683            return self.executor.get_job_metadata(self.name).get('restart', False)
684
685        return self.daemon.properties.get('restart', False)
686
687    @property
688    def result(self) -> SuccessTuple:
689        """
690        Return the `SuccessTuple` when the job has terminated.
691        """
692        if self.is_running():
693            return True, f"{self} is running."
694
695        if '_result_hook' in self.__dict__:
696            return self._result_hook()
697
698        if self.executor is not None:
699            return (
700                self.executor.get_job_metadata(self.name)
701                .get('result', (False, "No result available."))
702            )
703
704        _result = self.daemon.properties.get('result', None)
705        if _result is None:
706            return False, "No result available."
707
708        return tuple(_result)
709
    @property
    def sysargs(self) -> List[str]:
        """
        Return the sysargs to use for the Daemon.
        """
        ### Return the cached value once determined.
        if self._sysargs:
            return self._sysargs

        ### Remote jobs: read sysargs from the executor's job metadata (not cached).
        if self.executor is not None:
            return self.executor.get_job_metadata(self.name).get('sysargs', [])

        ### Local jobs: sysargs are stored as the daemon's first positional target arg.
        target_args = self.daemon.target_args
        if target_args is None:
            return []
        self._sysargs = target_args[0] if len(target_args) > 0 else []
        return self._sysargs
726
    @property
    def daemon(self) -> 'Daemon':
        """
        Return the daemon which this job manages.
        Builds (and caches) a new `Daemon` when none is cached yet,
        or when the job is remote or sysargs are not yet known.
        """
        from meerschaum.utils.daemon import Daemon
        ### Reuse the cached daemon only for local jobs with known sysargs.
        if self._daemon is not None and self.executor is None and self._sysargs:
            return self._daemon

        ### Remote jobs: fetch properties from the executor;
        ### the local patch always wins on key conflicts.
        remote_properties = (
            {}
            if self.executor is None
            else self.executor.get_job_properties(self.name)
        )
        properties = {**remote_properties, **self._properties_patch}

        self._daemon = Daemon(
            target=entry,
            target_args=[self._sysargs],
            target_kw={},
            daemon_id=self.name,
            label=shlex.join(self._sysargs),
            properties=properties,
        )
        ### Propagate any injected dependencies onto the new daemon.
        if '_rotating_log' in self.__dict__:
            self._daemon._rotating_log = self._rotating_log

        if '_stdin_file' in self.__dict__:
            self._daemon._stdin_file = self._stdin_file
            self._daemon._blocking_stdin_file_path = self._stdin_file.blocking_file_path

        return self._daemon
759
760    @property
761    def began(self) -> Union[datetime, None]:
762        """
763        The datetime when the job began running.
764        """
765        if self.executor is not None:
766            began_str = self.executor.get_job_began(self.name)
767            if began_str is None:
768                return None
769            return (
770                datetime.fromisoformat(began_str)
771                .astimezone(timezone.utc)
772                .replace(tzinfo=None)
773            )
774
775        began_str = self.daemon.properties.get('process', {}).get('began', None)
776        if began_str is None:
777            return None
778
779        return datetime.fromisoformat(began_str)
780
781    @property
782    def ended(self) -> Union[datetime, None]:
783        """
784        The datetime when the job stopped running.
785        """
786        if self.executor is not None:
787            ended_str = self.executor.get_job_ended(self.name)
788            if ended_str is None:
789                return None
790            return (
791                datetime.fromisoformat(ended_str)
792                .astimezone(timezone.utc)
793                .replace(tzinfo=None)
794            )
795
796        ended_str = self.daemon.properties.get('process', {}).get('ended', None)
797        if ended_str is None:
798            return None
799
800        return datetime.fromisoformat(ended_str)
801
802    @property
803    def paused(self) -> Union[datetime, None]:
804        """
805        The datetime when the job was suspended while running.
806        """
807        if self.executor is not None:
808            paused_str = self.executor.get_job_paused(self.name)
809            if paused_str is None:
810                return None
811            return (
812                datetime.fromisoformat(paused_str)
813                .astimezone(timezone.utc)
814                .replace(tzinfo=None)
815            )
816
817        paused_str = self.daemon.properties.get('process', {}).get('paused', None)
818        if paused_str is None:
819            return None
820
821        return datetime.fromisoformat(paused_str)
822
    @property
    def stop_time(self) -> Union[datetime, None]:
        """
        Return the timestamp when the job was manually stopped,
        or `None` if the job was not manually stopped
        (or the stop time cannot be read).
        """
        if self.executor is not None:
            return self.executor.get_job_stop_time(self.name)

        ### No stop file means the job was never manually stopped.
        if not self.daemon.stop_path.exists():
            return None

        stop_data = self.daemon._read_stop_file()
        if not stop_data:
            return None

        stop_time_str = stop_data.get('stop_time', None)
        if not stop_time_str:
            warn(f"Could not read stop time for {self}.")
            return None

        return datetime.fromisoformat(stop_time_str)
844
845    @property
846    def hidden(self) -> bool:
847        """
848        Return a bool indicating whether this job should be displayed.
849        """
850        return (
851            self.name.startswith('_')
852            or self.name.startswith('.')
853            or self._is_externally_managed
854        )
855
856    def check_restart(self) -> SuccessTuple:
857        """
858        If `restart` is `True` and the daemon is not running,
859        restart the job.
860        Do not restart if the job was manually stopped.
861        """
862        if self.is_running():
863            return True, f"{self} is running."
864
865        if not self.restart:
866            return True, f"{self} does not need to be restarted."
867
868        if self.stop_time is not None:
869            return True, f"{self} was manually stopped."
870
871        return self.start()
872
873    @property
874    def label(self) -> str:
875        """
876        Return the job's Daemon label (joined sysargs).
877        """
878        from meerschaum._internal.arguments import compress_pipeline_sysargs
879        sysargs = compress_pipeline_sysargs(self.sysargs)
880        return shlex.join(sysargs).replace(' + ', '\n+ ').replace(' : ', '\n: ').lstrip().rstrip()
881
    @property
    def _externally_managed_file(self) -> pathlib.Path:
        """
        Return the path to the externally-managed marker file
        (inside the daemon's directory).
        """
        return self.daemon.path / '.externally-managed'
888
889    def _set_externally_managed(self):
890        """
891        Set this job as externally managed.
892        """
893        self._externally_managed = True
894        try:
895            self._externally_managed_file.parent.mkdir(exist_ok=True, parents=True)
896            self._externally_managed_file.touch()
897        except Exception as e:
898            warn(e)
899
900    @property
901    def _is_externally_managed(self) -> bool:
902        """
903        Return whether this job is externally managed.
904        """
905        return self.executor_keys in (None, 'local') and (
906            self._externally_managed or self._externally_managed_file.exists()
907        )
908
    @property
    def env(self) -> Dict[str, str]:
        """
        Return the environment variables to set for the job's process.
        """
        ### Return the cached environment once built.
        if '_env' in self.__dict__:
            return self.__dict__['_env']

        ### Job-specific variables (from the daemon's properties) override the defaults.
        _env = self.daemon.properties.get('env', {})
        default_env = {
            'PYTHONUNBUFFERED': '1',
            'LINES': str(get_config('jobs', 'terminal', 'lines')),
            'COLUMNS': str(get_config('jobs', 'terminal', 'columns')),
            STATIC_CONFIG['environment']['noninteractive']: 'true',
        }
        self._env = {**default_env, **_env}
        return self._env
926
927    @property
928    def delete_after_completion(self) -> bool:
929        """
930        Return whether this job is configured to delete itself after completion.
931        """
932        if '_delete_after_completion' in self.__dict__:
933            return self.__dict__.get('_delete_after_completion', False)
934
935        self._delete_after_completion = self.daemon.properties.get('delete_after_completion', False)
936        return self._delete_after_completion
937
938    def __str__(self) -> str:
939        sysargs = self.sysargs
940        sysargs_str = shlex.join(sysargs) if sysargs else ''
941        job_str = f'Job("{self.name}"'
942        if sysargs_str:
943            job_str += f', "{sysargs_str}"'
944
945        job_str += ')'
946        return job_str
947
    def __repr__(self) -> str:
        ### Reuse __str__ so the repr shows the job name and sysargs.
        return str(self)
950
    def __hash__(self) -> int:
        ### Jobs hash by name (the name doubles as the daemon ID).
        return hash(self.name)

Manage a meerschaum.utils.daemon.Daemon, locally or remotely via the API.

Job( name: str, sysargs: Union[List[str], str, NoneType] = None, env: Optional[Dict[str, str]] = None, executor_keys: Optional[str] = None, delete_after_completion: bool = False, _properties: Optional[Dict[str, Any]] = None, _rotating_log=None, _stdin_file=None, _status_hook: Optional[Callable[[], str]] = None, _result_hook: Optional[Callable[[], Tuple[bool, str]]] = None, _externally_managed: bool = False)
    def __init__(
        self,
        name: str,
        sysargs: Union[List[str], str, None] = None,
        env: Optional[Dict[str, str]] = None,
        executor_keys: Optional[str] = None,
        delete_after_completion: bool = False,
        _properties: Optional[Dict[str, Any]] = None,
        _rotating_log=None,
        _stdin_file=None,
        _status_hook: Optional[Callable[[], str]] = None,
        _result_hook: Optional[Callable[[], SuccessTuple]] = None,
        _externally_managed: bool = False,
    ):
        """
        Create a new job to manage a `meerschaum.utils.daemon.Daemon`.

        Parameters
        ----------
        name: str
            The name of the job to be created.
            This will also be used as the Daemon ID.

        sysargs: Union[List[str], str, None], default None
            The sysargs of the command to be executed, e.g. 'start api'.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the job's process.

        executor_keys: Optional[str], default None
            If provided, execute the job remotely on an API instance, e.g. 'api:main'.

        delete_after_completion: bool, default False
            If `True`, delete this job when it has finished executing.

        _properties: Optional[Dict[str, Any]], default None
            If provided, use this to patch the daemon's properties.

        _rotating_log: default None
            If provided, inject this rotating log onto the daemon.

        _stdin_file: default None
            If provided, inject this stdin file onto the daemon.

        _status_hook: Optional[Callable[[], str]], default None
            If provided, call this instead of the daemon to determine the job's status.

        _result_hook: Optional[Callable[[], SuccessTuple]], default None
            If provided, call this instead of the daemon to determine the job's result.

        _externally_managed: bool, default False
            If `True`, mark this job as externally managed.

        Raises
        ------
        `ValueError` if `name` contains a banned character.
        """
        from meerschaum.utils.daemon import Daemon
        ### Job names double as daemon IDs, so reject unsafe characters.
        for char in BANNED_CHARS:
            if char in name:
                raise ValueError(f"Invalid name: ({char}) is not allowed.")

        if isinstance(sysargs, str):
            sysargs = shlex.split(sysargs)

        ### Unescape any escaped pipeline 'and' separators.
        and_key = STATIC_CONFIG['system']['arguments']['and_key']
        escaped_and_key = STATIC_CONFIG['system']['arguments']['escaped_and_key']
        if sysargs:
            sysargs = [
                (arg if arg != escaped_and_key else and_key)
                for arg in sysargs
            ]

        ### NOTE: 'local' and 'systemd' executors are being coalesced.
        if executor_keys is None:
            from meerschaum.jobs import get_executor_keys_from_context
            executor_keys = get_executor_keys_from_context()

        self.executor_keys = executor_keys
        self.name = name
        ### Only local jobs get an eager daemon; failures fall back to lazy creation.
        try:
            self._daemon = (
                Daemon(daemon_id=name)
                if executor_keys == 'local'
                else None
            )
        except Exception:
            self._daemon = None

        ### Handle any injected dependencies.
        if _rotating_log is not None:
            self._rotating_log = _rotating_log
            if self._daemon is not None:
                self._daemon._rotating_log = _rotating_log

        if _stdin_file is not None:
            self._stdin_file = _stdin_file
            if self._daemon is not None:
                self._daemon._stdin_file = _stdin_file
                self._daemon._blocking_stdin_file_path = _stdin_file.blocking_file_path

        if _status_hook is not None:
            self._status_hook = _status_hook

        if _result_hook is not None:
            self._result_hook = _result_hook

        self._externally_managed = _externally_managed
        self._properties_patch = _properties or {}
        if _externally_managed:
            self._properties_patch.update({'externally_managed': _externally_managed})

        if env:
            self._properties_patch.update({'env': env})

        if delete_after_completion:
            self._properties_patch.update({'delete_after_completion': delete_after_completion})

        ### Prefer sysargs persisted on an existing daemon over the given ones.
        daemon_sysargs = (
            self._daemon.properties.get('target', {}).get('args', [None])[0]
            if self._daemon is not None
            else None
        )

        if daemon_sysargs and sysargs and daemon_sysargs != sysargs:
            warn("Given sysargs differ from existing sysargs.")

        ### Strip daemonization flags; jobs manage the daemon lifecycle themselves.
        self._sysargs = [
            arg
            for arg in (daemon_sysargs or sysargs or [])
            if arg not in ('-d', '--daemon')
        ]
        for restart_flag in RESTART_FLAGS:
            if restart_flag in self._sysargs:
                self._properties_patch.update({'restart': True})
                break

Create a new job to manage a meerschaum.utils.daemon.Daemon.

Parameters
  • name (str): The name of the job to be created. This will also be used as the Daemon ID.
  • sysargs (Union[List[str], str, None], default None): The sysargs of the command to be executed, e.g. 'start api'.
  • env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the job's process.
  • executor_keys (Optional[str], default None): If provided, execute the job remotely on an API instance, e.g. 'api:main'.
  • delete_after_completion (bool, default False): If True, delete this job when it has finished executing.
  • _properties (Optional[Dict[str, Any]], default None): If provided, use this to patch the daemon's properties.
executor_keys
name
@staticmethod
def from_pid( pid: int, executor_keys: Optional[str] = None) -> Job:
173    @staticmethod
174    def from_pid(pid: int, executor_keys: Optional[str] = None) -> Job:
175        """
176        Build a `Job` from the PID of a running Meerschaum process.
177
178        Parameters
179        ----------
180        pid: int
181            The PID of the process.
182
183        executor_keys: Optional[str], default None
184            The executor keys to assign to the job.
185        """
186        from meerschaum.config.paths import DAEMON_RESOURCES_PATH
187
188        psutil = mrsm.attempt_import('psutil')
189        try:
190            process = psutil.Process(pid)
191        except psutil.NoSuchProcess as e:
192            warn(f"Process with PID {pid} does not exist.", stack=False)
193            raise e
194
195        command_args = process.cmdline()
196        is_daemon = command_args[1] == '-c'
197
198        if is_daemon:
199            daemon_id = command_args[-1].split('daemon_id=')[-1].split(')')[0].replace("'", '')
200            root_dir = process.environ().get(STATIC_CONFIG['environment']['root'], None)
201            if root_dir is None:
202                from meerschaum.config.paths import ROOT_DIR_PATH
203                root_dir = ROOT_DIR_PATH
204            else:
205                root_dir = pathlib.Path(root_dir)
206            jobs_dir = root_dir / DAEMON_RESOURCES_PATH.name
207            daemon_dir = jobs_dir / daemon_id
208            pid_file = daemon_dir / 'process.pid'
209
210            if pid_file.exists():
211                with open(pid_file, 'r', encoding='utf-8') as f:
212                    daemon_pid = int(f.read())
213
214                if pid != daemon_pid:
215                    raise EnvironmentError(f"Differing PIDs: {pid=}, {daemon_pid=}")
216            else:
217                raise EnvironmentError(f"Is job '{daemon_id}' running?")
218
219            return Job(daemon_id, executor_keys=executor_keys)
220
221        from meerschaum._internal.arguments._parse_arguments import parse_arguments
222        from meerschaum.utils.daemon import get_new_daemon_name
223
224        mrsm_ix = 0
225        for i, arg in enumerate(command_args):
226            if 'mrsm' in arg or 'meerschaum' in arg.lower():
227                mrsm_ix = i
228                break
229
230        sysargs = command_args[mrsm_ix+1:]
231        kwargs = parse_arguments(sysargs)
232        name = kwargs.get('name', get_new_daemon_name())
233        return Job(name, sysargs, executor_keys=executor_keys)

Build a Job from the PID of a running Meerschaum process.

Parameters
  • pid (int): The PID of the process.
  • executor_keys (Optional[str], default None): The executor keys to assign to the job.
def start(self, debug: bool = False) -> Tuple[bool, str]:
235    def start(self, debug: bool = False) -> SuccessTuple:
236        """
237        Start the job's daemon.
238        """
239        if self.executor is not None:
240            if not self.exists(debug=debug):
241                return self.executor.create_job(
242                    self.name,
243                    self.sysargs,
244                    properties=self.daemon.properties,
245                    debug=debug,
246                )
247            return self.executor.start_job(self.name, debug=debug)
248
249        if self.is_running():
250            return True, f"{self} is already running."
251
252        success, msg = self.daemon.run(
253            keep_daemon_output=(not self.delete_after_completion),
254            allow_dirty_run=True,
255        )
256        if not success:
257            return success, msg
258
259        return success, f"Started {self}."

Start the job's daemon.

def stop( self, timeout_seconds: Optional[int] = None, debug: bool = False) -> Tuple[bool, str]:
    def stop(self, timeout_seconds: Optional[int] = None, debug: bool = False) -> SuccessTuple:
        """
        Stop the job's daemon.

        Parameters
        ----------
        timeout_seconds: Optional[int], default None
            If provided, pass this timeout to the daemon's `quit()` (and `kill()`).

        Returns
        -------
        A `SuccessTuple` indicating whether the job was stopped.
        """
        if self.executor is not None:
            return self.executor.stop_job(self.name, debug=debug)

        if self.daemon.status == 'stopped':
            ### NOTE(review): a stopped job with `restart=True` and no recorded
            ### stop time falls through to `quit()` below — presumably to record
            ### the manual stop so it won't auto-restart; confirm against Daemon.quit().
            if not self.restart:
                return True, f"{self} is not running."
            elif self.stop_time is not None:
                return True, f"{self} will not restart until manually started."

        ### Prefer a graceful quit; fall back to a hard kill on failure.
        quit_success, quit_msg = self.daemon.quit(timeout=timeout_seconds)
        if quit_success:
            return quit_success, f"Stopped {self}."

        warn(
            f"Failed to gracefully quit {self}.",
            stack=False,
        )
        kill_success, kill_msg = self.daemon.kill(timeout=timeout_seconds)
        if not kill_success:
            return kill_success, kill_msg

        return kill_success, f"Killed {self}."

Stop the job's daemon.

def pause( self, timeout_seconds: Optional[int] = None, debug: bool = False) -> Tuple[bool, str]:
288    def pause(self, timeout_seconds: Optional[int] = None, debug: bool = False) -> SuccessTuple:
289        """
290        Pause the job's daemon.
291        """
292        if self.executor is not None:
293            return self.executor.pause_job(self.name, debug=debug)
294
295        pause_success, pause_msg = self.daemon.pause(timeout=timeout_seconds)
296        if not pause_success:
297            return pause_success, pause_msg
298
299        return pause_success, f"Paused {self}."

Pause the job's daemon.

def delete(self, debug: bool = False) -> Tuple[bool, str]:
301    def delete(self, debug: bool = False) -> SuccessTuple:
302        """
303        Delete the job and its daemon.
304        """
305        if self.executor is not None:
306            return self.executor.delete_job(self.name, debug=debug)
307
308        if self.is_running():
309            stop_success, stop_msg = self.stop()
310            if not stop_success:
311                return stop_success, stop_msg
312
313        cleanup_success, cleanup_msg = self.daemon.cleanup()
314        if not cleanup_success:
315            return cleanup_success, cleanup_msg
316
317        _ = self.daemon._properties.pop('result', None)
318        return cleanup_success, f"Deleted {self}."

Delete the job and its daemon.

def is_running(self) -> bool:
    def is_running(self) -> bool:
        """
        Determine whether the job's daemon is running.
        """
        ### Delegates to `status` (which may consult a hook or remote executor).
        return self.status == 'running'

Determine whether the job's daemon is running.

def exists(self, debug: bool = False) -> bool:
326    def exists(self, debug: bool = False) -> bool:
327        """
328        Determine whether the job exists.
329        """
330        if self.executor is not None:
331            return self.executor.get_job_exists(self.name, debug=debug)
332
333        return self.daemon.path.exists()

Determine whether the job exists.

def get_logs(self) -> Optional[str]:
335    def get_logs(self) -> Union[str, None]:
336        """
337        Return the output text of the job's daemon.
338        """
339        if self.executor is not None:
340            return self.executor.get_logs(self.name)
341
342        return self.daemon.log_text

Return the output text of the job's daemon.

def monitor_logs( self, callback_function: Callable[[str], NoneType] = functools.partial(<built-in function print>, end=''), input_callback_function: Optional[Callable[[], str]] = None, stop_callback_function: Optional[Callable[[Tuple[bool, str]], NoneType]] = None, stop_event: Optional[asyncio.locks.Event] = None, stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False):
    def monitor_logs(
        self,
        callback_function: Callable[[str], None] = partial(print, end=''),
        input_callback_function: Optional[Callable[[], str]] = None,
        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
        stop_event: Optional[asyncio.Event] = None,
        stop_on_exit: bool = False,
        strip_timestamps: bool = False,
        accept_input: bool = True,
        debug: bool = False,
    ):
        """
        Monitor the job's log files and execute a callback on new lines.

        Parameters
        ----------
        callback_function: Callable[[str], None], default partial(print, end='')
            The callback to execute as new data comes in.
            Defaults to printing the output directly to `stdout`.

        input_callback_function: Optional[Callable[[], str]], default None
            If provided, execute this callback when the daemon is blocking on stdin.
            Defaults to `sys.stdin.readline()`.

        stop_callback_function: Optional[Callable[[SuccessTuple], None]], default None
            If provided, execute this callback when the daemon stops.
            The job's SuccessTuple will be passed to the callback.

        stop_event: Optional[asyncio.Event], default None
            If provided, stop monitoring when this event is set.
            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
            from within `callback_function` to stop monitoring.

        stop_on_exit: bool, default False
            If `True`, stop monitoring when the job stops.

        strip_timestamps: bool, default False
            If `True`, remove leading timestamps from lines.

        accept_input: bool, default True
            If `True`, accept input when the daemon blocks on stdin.
        """
        def default_input_callback_function():
            return sys.stdin.readline()

        if input_callback_function is None:
            input_callback_function = default_input_callback_function

        ### Remote jobs are monitored through the executor.
        ### NOTE(review): `stop_event` is not forwarded to the executor here —
        ### confirm whether that is intentional.
        if self.executor is not None:
            self.executor.monitor_logs(
                self.name,
                callback_function,
                input_callback_function=input_callback_function,
                stop_callback_function=stop_callback_function,
                stop_on_exit=stop_on_exit,
                accept_input=accept_input,
                strip_timestamps=strip_timestamps,
                debug=debug,
            )
            return

        ### Local jobs: run the async monitor to completion.
        monitor_logs_coroutine = self.monitor_logs_async(
            callback_function=callback_function,
            input_callback_function=input_callback_function,
            stop_callback_function=stop_callback_function,
            stop_event=stop_event,
            stop_on_exit=stop_on_exit,
            strip_timestamps=strip_timestamps,
            accept_input=accept_input,
        )
        return asyncio.run(monitor_logs_coroutine)

Monitor the job's log files and execute a callback on new lines.

Parameters
  • callback_function (Callable[[str], None], default partial(print, end='')): The callback to execute as new data comes in. Defaults to printing the output directly to stdout.
  • input_callback_function (Optional[Callable[[], str]], default None): If provided, execute this callback when the daemon is blocking on stdin. Defaults to sys.stdin.readline().
  • stop_callback_function (Optional[Callable[[SuccessTuple], None]], default None): If provided, execute this callback when the daemon stops. The job's SuccessTuple will be passed to the callback.
  • stop_event (Optional[asyncio.Event], default None): If provided, stop monitoring when this event is set. You may instead raise meerschaum.jobs.StopMonitoringLogs from within callback_function to stop monitoring.
  • stop_on_exit (bool, default False): If True, stop monitoring when the job stops.
  • strip_timestamps (bool, default False): If True, remove leading timestamps from lines.
  • accept_input (bool, default True): If True, accept input when the daemon blocks on stdin.
async def monitor_logs_async( self, callback_function: Callable[[str], NoneType] = functools.partial(<built-in function print>, end='', flush=True), input_callback_function: Optional[Callable[[], str]] = None, stop_callback_function: Optional[Callable[[Tuple[bool, str]], NoneType]] = None, stop_event: Optional[asyncio.locks.Event] = None, stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, _logs_path: Optional[pathlib.Path] = None, _log=None, _stdin_file=None, debug: bool = False):
async def monitor_logs_async(
    self,
    callback_function: Callable[[str], None] = partial(print, end='', flush=True),
    input_callback_function: Optional[Callable[[], str]] = None,
    stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
    stop_event: Optional[asyncio.Event] = None,
    stop_on_exit: bool = False,
    strip_timestamps: bool = False,
    accept_input: bool = True,
    _logs_path: Optional[pathlib.Path] = None,
    _log=None,
    _stdin_file=None,
    debug: bool = False,
):
    """
    Monitor the job's log files and await a callback on new lines.

    Parameters
    ----------
    callback_function: Callable[[str], None], default partial(print, end='')
        The callback to execute as new data comes in.
        Defaults to printing the output directly to `stdout`.

    input_callback_function: Optional[Callable[[], str]], default None
        If provided, execute this callback when the daemon is blocking on stdin.
        Defaults to `sys.stdin.readline()`.

    stop_callback_function: Optional[Callable[[SuccessTuple], None]], default None
        If provided, execute this callback when the daemon stops.
        The job's SuccessTuple will be passed to the callback.

    stop_event: Optional[asyncio.Event], default None
        If provided, stop monitoring when this event is set.
        You may instead raise `meerschaum.jobs.StopMonitoringLogs`
        from within `callback_function` to stop monitoring.

    stop_on_exit: bool, default False
        If `True`, stop monitoring when the job stops.

    strip_timestamps: bool, default False
        If `True`, remove leading timestamps from lines.

    accept_input: bool, default True
        If `True`, accept input when the daemon blocks on stdin.
    """
    ### Fall back to reading stdin directly when no input callback is given.
    def default_input_callback_function():
        return sys.stdin.readline()

    if input_callback_function is None:
        input_callback_function = default_input_callback_function

    ### Remote jobs delegate monitoring to their executor.
    if self.executor is not None:
        await self.executor.monitor_logs_async(
            self.name,
            callback_function,
            input_callback_function=input_callback_function,
            stop_callback_function=stop_callback_function,
            stop_on_exit=stop_on_exit,
            strip_timestamps=strip_timestamps,
            accept_input=accept_input,
            debug=debug,
        )
        return

    from meerschaum.utils.formatting._jobs import strip_timestamp_from_line

    ### Monitoring ends when either the caller's `stop_event` fires or
    ### the job stops (when `stop_on_exit` is set).
    events = {
        'user': stop_event,
        'stopped': asyncio.Event(),
    }
    combined_event = asyncio.Event()
    emitted_text = False
    stdin_file = _stdin_file if _stdin_file is not None else self.daemon.stdin_file

    async def check_job_status():
        """Poll the job's status; fire the stop callback and 'stopped' event once it stops."""
        nonlocal emitted_text
        stopped_event = events.get('stopped', None)
        if stopped_event is None:
            return

        sleep_time = 0.1
        while sleep_time < 60:
            if self.status == 'stopped':
                ### Back off (capped near 60s total) until some output has been
                ### emitted, so the final log lines are seen before stopping.
                if not emitted_text:
                    await asyncio.sleep(sleep_time)
                    sleep_time = round(sleep_time * 1.1, 2)
                    continue

                if stop_callback_function is not None:
                    try:
                        if asyncio.iscoroutinefunction(stop_callback_function):
                            await stop_callback_function(self.result)
                        else:
                            stop_callback_function(self.result)
                    except asyncio.exceptions.CancelledError:
                        break
                    except Exception:
                        warn(traceback.format_exc())

                if stop_on_exit:
                    events['stopped'].set()

                break
            await asyncio.sleep(0.1)

    async def check_blocking_on_input():
        """Forward user input to the daemon's stdin file while it blocks on stdin."""
        while True:
            if not emitted_text or not self.is_blocking_on_stdin():
                try:
                    await asyncio.sleep(0.1)
                except asyncio.exceptions.CancelledError:
                    break
                continue

            if not self.is_running():
                break

            ### Show the prompt lines before soliciting input.
            await emit_latest_lines()

            try:
                print('', end='', flush=True)
                if asyncio.iscoroutinefunction(input_callback_function):
                    data = await input_callback_function()
                else:
                    data = input_callback_function()
            except KeyboardInterrupt:
                break
            if not data.endswith('\n'):
                data += '\n'

            stdin_file.write(data)
            await asyncio.sleep(0.1)

    async def combine_events():
        """Set `combined_event` as soon as any configured stop event fires."""
        event_tasks = [
            asyncio.create_task(event.wait())
            for event in events.values()
            if event is not None
        ]
        if not event_tasks:
            return

        try:
            done, pending = await asyncio.wait(
                event_tasks,
                return_when=asyncio.FIRST_COMPLETED,
            )
            for task in pending:
                task.cancel()
        except asyncio.exceptions.CancelledError:
            pass
        finally:
            combined_event.set()

    check_job_status_task = asyncio.create_task(check_job_status())
    check_blocking_on_input_task = asyncio.create_task(check_blocking_on_input())
    combine_events_task = asyncio.create_task(combine_events())

    log = _log if _log is not None else self.daemon.rotating_log
    lines_to_show = get_config('jobs', 'logs', 'lines_to_show')

    async def emit_latest_lines():
        """Run the callback on the latest `lines_to_show` lines of the log."""
        nonlocal emitted_text
        lines = log.readlines()
        for line in lines[(-1 * lines_to_show):]:
            if stop_event is not None and stop_event.is_set():
                return

            if strip_timestamps:
                line = strip_timestamp_from_line(line)

            try:
                if asyncio.iscoroutinefunction(callback_function):
                    await callback_function(line)
                else:
                    callback_function(line)
                emitted_text = True
            except StopMonitoringLogs:
                return
            except Exception:
                warn(f"Error in logs callback:\n{traceback.format_exc()}")

    await emit_latest_lines()

    tasks = (
        [check_job_status_task]
        + ([check_blocking_on_input_task] if accept_input else [])
        + [combine_events_task]
    )
    try:
        ### NOTE(review): `gather` is not awaited here — the tasks were already
        ### scheduled by `create_task` above, so this call only aggregates them,
        ### and the `except` clauses below cannot fire. Confirm whether an
        ### `await` was intended.
        _ = asyncio.gather(*tasks, return_exceptions=True)
    except asyncio.exceptions.CancelledError:
        raise
    except Exception:
        warn(f"Failed to run async checks:\n{traceback.format_exc()}")

    ### Watch the logs directory and re-emit whenever the active log subfile
    ### changes, until a stop event fires.
    watchfiles = mrsm.attempt_import('watchfiles')
    async for changes in watchfiles.awatch(
        _logs_path or LOGS_RESOURCES_PATH,
        stop_event=combined_event,
    ):
        for change in changes:
            file_path_str = change[1]
            file_path = pathlib.Path(file_path_str)
            latest_subfile_path = log.get_latest_subfile_path()
            if latest_subfile_path != file_path:
                continue

            await emit_latest_lines()

    ### Flush any remaining lines after the watch loop exits.
    await emit_latest_lines()

Monitor the job's log files and await a callback on new lines.

Parameters
  • callback_function (Callable[[str], None], default partial(print, end='')): The callback to execute as new data comes in. Defaults to printing the output directly to stdout.
  • input_callback_function (Optional[Callable[[], str]], default None): If provided, execute this callback when the daemon is blocking on stdin. Defaults to sys.stdin.readline().
  • stop_callback_function (Optional[Callable[[SuccessTuple], None]], default None): If provided, execute this callback when the daemon stops. The job's SuccessTuple will be passed to the callback.
  • stop_event (Optional[asyncio.Event], default None): If provided, stop monitoring when this event is set. You may instead raise meerschaum.jobs.StopMonitoringLogs from within callback_function to stop monitoring.
  • stop_on_exit (bool, default False): If True, stop monitoring when the job stops.
  • strip_timestamps (bool, default False): If True, remove leading timestamps from lines.
  • accept_input (bool, default True): If True, accept input when the daemon blocks on stdin.
def is_blocking_on_stdin(self, debug: bool = False) -> bool:
def is_blocking_on_stdin(self, debug: bool = False) -> bool:
    """
    Return whether a job's daemon is blocking on stdin.
    """
    executor = self.executor
    if executor is not None:
        return executor.get_job_is_blocking_on_stdin(self.name, debug=debug)

    ### A stopped job can never be blocking on stdin.
    running_and_blocking = self.is_running() and self.daemon.blocking_stdin_file_path.exists()
    return running_and_blocking

Return whether a job's daemon is blocking on stdin.

def write_stdin(self, data):
def write_stdin(self, data):
    """
    Write `data` to the job's daemon's `stdin` file.
    """
    stdin_file = self.daemon.stdin_file
    stdin_file.write(data)

Write to a job's daemon's stdin.

executor: Optional[Executor]
@property
def executor(self) -> Union[Executor, None]:
    """
    If the job is remote, return the connector to the remote API instance.
    """
    ### Local jobs have no executor connector.
    if self.executor_keys == 'local':
        return None
    return mrsm.get_connector(self.executor_keys)

If the job is remote, return the connector to the remote API instance.

status: str
@property
def status(self) -> str:
    """
    Return the running status of the job's daemon.
    """
    ### An injected hook (e.g. for testing) takes precedence.
    if '_status_hook' in self.__dict__:
        return self._status_hook()

    executor = self.executor
    if executor is not None:
        return executor.get_job_status(self.name)

    return self.daemon.status

Return the running status of the job's daemon.

pid: Optional[int]
@property
def pid(self) -> Union[int, None]:
    """
    Return the PID of the job's daemon.
    """
    executor = self.executor
    if executor is None:
        return self.daemon.pid
    metadata = executor.get_job_metadata(self.name)
    return metadata.get('daemon', {}).get('pid', None)

Return the PID of the job's daemon.

restart: bool
@property
def restart(self) -> bool:
    """
    Return whether a stopped job should be restarted.
    """
    executor = self.executor
    if executor is None:
        return self.daemon.properties.get('restart', False)
    return executor.get_job_metadata(self.name).get('restart', False)

Return whether to restart a stopped job.

result: Tuple[bool, str]
@property
def result(self) -> SuccessTuple:
    """
    Return the `SuccessTuple` when the job has terminated.
    """
    ### A running job has no final result yet.
    if self.is_running():
        return True, f"{self} is running."

    ### An injected hook (e.g. for testing) takes precedence.
    if '_result_hook' in self.__dict__:
        return self._result_hook()

    executor = self.executor
    if executor is not None:
        metadata = executor.get_job_metadata(self.name)
        return metadata.get('result', (False, "No result available."))

    _result = self.daemon.properties.get('result', None)
    if _result is None:
        return False, "No result available."

    return tuple(_result)

Return the SuccessTuple when the job has terminated.

sysargs: List[str]
@property
def sysargs(self) -> List[str]:
    """
    Return the sysargs to use for the Daemon.
    """
    ### Return the cached value when already determined.
    if self._sysargs:
        return self._sysargs

    executor = self.executor
    if executor is not None:
        return executor.get_job_metadata(self.name).get('sysargs', [])

    target_args = self.daemon.target_args
    if target_args is None:
        return []

    ### The first positional target arg holds the sysargs list; cache it.
    self._sysargs = target_args[0] if len(target_args) > 0 else []
    return self._sysargs

Return the sysargs to use for the Daemon.

daemon: "'Daemon'"
@property
def daemon(self) -> 'Daemon':
    """
    Return the daemon which this job manages.

    The daemon is constructed lazily and cached on `self._daemon`;
    the cached instance is reused only for local jobs with known sysargs.
    """
    from meerschaum.utils.daemon import Daemon
    if self._daemon is not None and self.executor is None and self._sysargs:
        return self._daemon

    ### For remote jobs, pull the daemon's properties from the executor
    ### and overlay any locally patched properties.
    remote_properties = (
        {}
        if self.executor is None
        else self.executor.get_job_properties(self.name)
    )
    properties = {**remote_properties, **self._properties_patch}

    self._daemon = Daemon(
        target=entry,
        target_args=[self._sysargs],
        target_kw={},
        daemon_id=self.name,
        label=shlex.join(self._sysargs),
        properties=properties,
    )
    ### Propagate injected log/stdin handles onto the new daemon, if present.
    if '_rotating_log' in self.__dict__:
        self._daemon._rotating_log = self._rotating_log

    if '_stdin_file' in self.__dict__:
        self._daemon._stdin_file = self._stdin_file
        self._daemon._blocking_stdin_file_path = self._stdin_file.blocking_file_path

    return self._daemon

Return the daemon which this job manages.

began: Optional[datetime.datetime]
@property
def began(self) -> Union[datetime, None]:
    """
    The datetime when the job began running.
    """
    if self.executor is not None:
        began_str = self.executor.get_job_began(self.name)
        if began_str is None:
            return None
        ### Normalize remote timestamps to naive UTC.
        remote_began = datetime.fromisoformat(began_str)
        return remote_began.astimezone(timezone.utc).replace(tzinfo=None)

    began_str = self.daemon.properties.get('process', {}).get('began', None)
    if began_str is None:
        return None

    return datetime.fromisoformat(began_str)

The datetime when the job began running.

ended: Optional[datetime.datetime]
@property
def ended(self) -> Union[datetime, None]:
    """
    The datetime when the job stopped running.
    """
    if self.executor is not None:
        ended_str = self.executor.get_job_ended(self.name)
        if ended_str is None:
            return None
        ### Normalize remote timestamps to naive UTC.
        remote_ended = datetime.fromisoformat(ended_str)
        return remote_ended.astimezone(timezone.utc).replace(tzinfo=None)

    ended_str = self.daemon.properties.get('process', {}).get('ended', None)
    if ended_str is None:
        return None

    return datetime.fromisoformat(ended_str)

The datetime when the job stopped running.

paused: Optional[datetime.datetime]
@property
def paused(self) -> Union[datetime, None]:
    """
    The datetime when the job was suspended while running.
    """
    if self.executor is not None:
        paused_str = self.executor.get_job_paused(self.name)
        if paused_str is None:
            return None
        ### Normalize remote timestamps to naive UTC.
        remote_paused = datetime.fromisoformat(paused_str)
        return remote_paused.astimezone(timezone.utc).replace(tzinfo=None)

    paused_str = self.daemon.properties.get('process', {}).get('paused', None)
    if paused_str is None:
        return None

    return datetime.fromisoformat(paused_str)

The datetime when the job was suspended while running.

stop_time: Optional[datetime.datetime]
@property
def stop_time(self) -> Union[datetime, None]:
    """
    Return the timestamp when the job was manually stopped.
    """
    executor = self.executor
    if executor is not None:
        return executor.get_job_stop_time(self.name)

    ### No stop file means the job was never manually stopped.
    if not self.daemon.stop_path.exists():
        return None

    stop_data = self.daemon._read_stop_file()
    if not stop_data:
        return None

    stop_time_str = stop_data.get('stop_time', None)
    if not stop_time_str:
        warn(f"Could not read stop time for {self}.")
        return None

    return datetime.fromisoformat(stop_time_str)

Return the timestamp when the job was manually stopped.

hidden: bool
@property
def hidden(self) -> bool:
    """
    Return a bool indicating whether this job should be displayed.
    """
    ### Names prefixed with '_' or '.' are hidden, as are
    ### externally managed jobs.
    return self.name.startswith(('_', '.')) or self._is_externally_managed

Return a bool indicating whether this job should be displayed.

def check_restart(self) -> Tuple[bool, str]:
def check_restart(self) -> SuccessTuple:
    """
    If `restart` is `True` and the daemon is not running,
    restart the job.
    Do not restart if the job was manually stopped.
    """
    skip_reason = None
    if self.is_running():
        skip_reason = f"{self} is running."
    elif not self.restart:
        skip_reason = f"{self} does not need to be restarted."
    elif self.stop_time is not None:
        skip_reason = f"{self} was manually stopped."

    if skip_reason is not None:
        return True, skip_reason

    return self.start()

If restart is True and the daemon is not running, restart the job. Do not restart if the job was manually stopped.

label: str
@property
def label(self) -> str:
    """
    Return the job's Daemon label (joined sysargs).
    """
    from meerschaum._internal.arguments import compress_pipeline_sysargs
    compressed_sysargs = compress_pipeline_sysargs(self.sysargs)
    joined = shlex.join(compressed_sysargs)
    ### Break pipeline separators onto their own lines.
    return joined.replace(' + ', '\n+ ').replace(' : ', '\n: ').strip()

Return the job's Daemon label (joined sysargs).

env: Dict[str, str]
@property
def env(self) -> Dict[str, str]:
    """
    Return the environment variables to set for the job's process.
    """
    ### Return the cached environment when already built.
    if '_env' in self.__dict__:
        return self.__dict__['_env']

    job_env = self.daemon.properties.get('env', {})
    default_env = {
        'PYTHONUNBUFFERED': '1',
        'LINES': str(get_config('jobs', 'terminal', 'lines')),
        'COLUMNS': str(get_config('jobs', 'terminal', 'columns')),
        STATIC_CONFIG['environment']['noninteractive']: 'true',
    }
    ### Job-specific variables override the defaults.
    self._env = {**default_env, **job_env}
    return self._env

Return the environment variables to set for the job's process.

delete_after_completion: bool
@property
def delete_after_completion(self) -> bool:
    """
    Return whether this job is configured to delete itself after completion.
    """
    ### Cache the flag from the daemon's properties on first access.
    if '_delete_after_completion' not in self.__dict__:
        self._delete_after_completion = self.daemon.properties.get(
            'delete_after_completion', False
        )
    return self.__dict__.get('_delete_after_completion', False)

Return whether this job is configured to delete itself after completion.

class StopMonitoringLogs(builtins.Exception):
class StopMonitoringLogs(Exception):
    """
    Raise this exception from within a logs callback to stop the monitoring
    loop (see `Job.monitor_logs()` and `Job.monitor_logs_async()`).
    """

Raise this exception to stop the logs monitoring.

def get_jobs( executor_keys: Optional[str] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
 37def get_jobs(
 38    executor_keys: Optional[str] = None,
 39    include_hidden: bool = False,
 40    combine_local_and_systemd: bool = True,
 41    debug: bool = False,
 42) -> Dict[str, Job]:
 43    """
 44    Return a dictionary of the existing jobs.
 45
 46    Parameters
 47    ----------
 48    executor_keys: Optional[str], default None
 49        If provided, return remote jobs on the given API instance.
 50        Otherwise return local jobs.
 51
 52    include_hidden: bool, default False
 53        If `True`, include jobs with the `hidden` attribute.
 54
 55    Returns
 56    -------
 57    A dictionary mapping job names to jobs.
 58    """
 59    from meerschaum.connectors.parse import parse_executor_keys
 60    executor_keys = executor_keys or get_executor_keys_from_context()
 61
 62    include_local_and_system = (
 63        combine_local_and_systemd
 64        and str(executor_keys).split(':', maxsplit=1)[0] in ('None', 'local', 'systemd')
 65        and get_executor_keys_from_context() == 'systemd'
 66    )
 67
 68    def _get_local_jobs():
 69        from meerschaum.utils.daemon import get_daemons
 70        daemons = get_daemons()
 71        jobs = {
 72            daemon.daemon_id: Job(name=daemon.daemon_id, executor_keys='local')
 73            for daemon in daemons
 74        }
 75        return {
 76            name: job
 77            for name, job in jobs.items()
 78            if (include_hidden or not job.hidden) and not job._is_externally_managed
 79
 80        }
 81
 82    def _get_systemd_jobs():
 83        conn = mrsm.get_connector('systemd')
 84        jobs = conn.get_jobs(debug=debug)
 85        return {
 86            name: job
 87            for name, job in jobs.items()
 88            if include_hidden or not job.hidden
 89        }
 90
 91    if include_local_and_system:
 92        local_jobs = _get_local_jobs()
 93        systemd_jobs = _get_systemd_jobs()
 94        shared_jobs = set(local_jobs) & set(systemd_jobs)
 95        if shared_jobs:
 96            from meerschaum.utils.misc import items_str
 97            from meerschaum.utils.warnings import warn
 98            warn(
 99                "Job"
100                + ('s' if len(shared_jobs) != 1 else '')
101                + f" {items_str(list(shared_jobs))} "
102                + "exist"
103                + ('s' if len(shared_jobs) == 1 else '')
104                + " in both `local` and `systemd`.",
105                stack=False,
106            )
107        return {**local_jobs, **systemd_jobs}
108
109    if executor_keys == 'local':
110        return _get_local_jobs()
111
112    if executor_keys == 'systemd':
113        return _get_systemd_jobs()
114
115    try:
116        _ = parse_executor_keys(executor_keys, construct=False)
117        conn = parse_executor_keys(executor_keys)
118        jobs = conn.get_jobs(debug=debug)
119        return {
120            name: job
121            for name, job in jobs.items()
122            if include_hidden or not job.hidden
123        }
124    except Exception:
125        return {}

Return a dictionary of the existing jobs.

Parameters
  • executor_keys (Optional[str], default None): If provided, return remote jobs on the given API instance. Otherwise return local jobs.
  • include_hidden (bool, default False): If True, include jobs with the hidden attribute.
Returns
  • A dictionary mapping job names to jobs.
def get_filtered_jobs( executor_keys: Optional[str] = None, filter_list: Optional[List[str]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, warn: bool = False, debug: bool = False) -> Dict[str, Job]:
def get_filtered_jobs(
    executor_keys: Optional[str] = None,
    filter_list: Optional[List[str]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    warn: bool = False,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of jobs filtered by the user.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, filter jobs on the given API instance.

    filter_list: Optional[List[str]], default None
        Job names to select. Names prefixed with an underscore
        act as exclusions when no named job matches.

    include_hidden: bool, default False
        If `True`, include hidden jobs when no filter is given.

    warn: bool, default False
        If `True`, emit a warning for filter names that match no job.

    Returns
    -------
    A dictionary mapping job names to jobs.
    """
    from meerschaum.utils.warnings import warn as _warn
    jobs = get_jobs(
        executor_keys,
        include_hidden=True,
        combine_local_and_systemd=combine_local_and_systemd,
        debug=debug,
    )
    if not filter_list:
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    ### NOTE: Removed a dead `if ...: pass` conditional which partitioned
    ### the filter list but had no effect.
    jobs_to_return = {}
    filter_list_with_underscores = [name for name in filter_list if name.startswith('_')]
    for name in filter_list:
        job = jobs.get(name, None)
        if job is None:
            if warn:
                _warn(
                    f"Job '{name}' does not exist.",
                    stack=False,
                )
            continue
        jobs_to_return[name] = job

    ### If nothing matched and underscore-prefixed names were given,
    ### treat them as exclusions against all jobs.
    if not jobs_to_return and filter_list_with_underscores:
        names_to_exclude = [name.lstrip('_') for name in filter_list_with_underscores]
        return {
            name: job
            for name, job in jobs.items()
            if name not in names_to_exclude
        }

    return jobs_to_return

Return a list of jobs filtered by the user.

def get_restart_jobs( executor_keys: Optional[str] = None, jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
def get_restart_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return jobs which were created with `--restart` or `--loop`.
    """
    candidates = (
        jobs
        if jobs is not None
        else get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )
    )
    return {name: job for name, job in candidates.items() if job.restart}

Return jobs which were created with --restart or --loop.

def get_running_jobs( executor_keys: Optional[str] = None, jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
def get_running_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of running jobs.
    """
    candidates = (
        jobs
        if jobs is not None
        else get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )
    )
    return {name: job for name, job in candidates.items() if job.status == 'running'}

Return a dictionary of running jobs.

def get_stopped_jobs( executor_keys: Optional[str] = None, jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
def get_stopped_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of stopped jobs.
    """
    candidates = (
        jobs
        if jobs is not None
        else get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )
    )
    return {name: job for name, job in candidates.items() if job.status == 'stopped'}

Return a dictionary of stopped jobs.

def get_paused_jobs( executor_keys: Optional[str] = None, jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
def get_paused_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of paused jobs.
    """
    candidates = (
        jobs
        if jobs is not None
        else get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )
    )
    return {name: job for name, job in candidates.items() if job.status == 'paused'}

Return a dictionary of paused jobs.

def make_executor(cls):
def make_executor(cls):
    """
    Register a class as an `Executor`.
    """
    import re
    from meerschaum.connectors import make_connector
    ### Derive the executor type by stripping the 'executor' suffix
    ### from the lowercased class name.
    typ = re.sub(r'executor$', '', cls.__name__.lower())
    if typ not in executor_types:
        executor_types.append(typ)
    return make_connector(cls, _is_executor=True)

Register a class as an Executor.

class Executor(Connector):
    """
    Abstract base class defining the interface for managing jobs.
    """

    @abstractmethod
    def get_job_exists(self, name: str, debug: bool = False) -> bool:
        """
        Return `True` if a job with the given name exists, else `False`.
        """

    @abstractmethod
    def get_jobs(self) -> Dict[str, Job]:
        """
        Return a dictionary mapping job names to `Job` objects.
        """

    @abstractmethod
    def create_job(
        self,
        name: str,
        sysargs: List[str],
        properties: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ) -> SuccessTuple:
        """
        Create a new job from the given name, arguments, and properties.
        """

    @abstractmethod
    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Start the job with the given name.
        """

    @abstractmethod
    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Stop the job with the given name.
        """

    @abstractmethod
    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Pause the job with the given name.
        """

    @abstractmethod
    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Delete the job with the given name.
        """

    @abstractmethod
    def get_logs(self, name: str, debug: bool = False) -> str:
        """
        Return the log output of the job with the given name.
        """

Define the methods for managing jobs.

@abstractmethod
def get_job_exists(self, name: str, debug: bool = False) -> bool:
24    @abstractmethod
25    def get_job_exists(self, name: str, debug: bool = False) -> bool:
26        """
27        Return whether a job exists.
28        """

Return whether a job exists.

@abstractmethod
def get_jobs(self) -> Dict[str, Job]:
30    @abstractmethod
31    def get_jobs(self) -> Dict[str, Job]:
32        """
33        Return a dictionary of names -> Jobs.
34        """

Return a dictionary of names -> Jobs.

@abstractmethod
def create_job( self, name: str, sysargs: List[str], properties: Optional[Dict[str, Any]] = None, debug: bool = False) -> Tuple[bool, str]:
36    @abstractmethod
37    def create_job(
38        self,
39        name: str,
40        sysargs: List[str],
41        properties: Optional[Dict[str, Any]] = None,
42        debug: bool = False,
43    ) -> SuccessTuple:
44        """
45        Create a new job.
46        """

Create a new job.

@abstractmethod
def start_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
48    @abstractmethod
49    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
50        """
51        Start a job.
52        """

Start a job.

@abstractmethod
def stop_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
54    @abstractmethod
55    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
56        """
57        Stop a job.
58        """

Stop a job.

@abstractmethod
def pause_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
60    @abstractmethod
61    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
62        """
63        Pause a job.
64        """

Pause a job.

@abstractmethod
def delete_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
66    @abstractmethod
67    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
68        """
69        Delete a job.
70        """

Delete a job.

@abstractmethod
def get_logs(self, name: str, debug: bool = False) -> str:
72    @abstractmethod
73    def get_logs(self, name: str, debug: bool = False) -> str:
74        """
75        Return a job's log output.
76        """

Return a job's log output.

def check_restart_jobs(
    executor_keys: Optional[str] = 'local',
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = True,
    silent: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Restart any stopped jobs which were created with `--restart`.

    Parameters
    ----------
    executor_keys: Optional[str], default 'local'
        If provided, check jobs on the given remote API instance.
        Otherwise check local jobs.

    jobs: Optional[Dict[str, Job]], default None
        If provided, check these jobs instead of fetching them via `get_jobs()`.

    include_hidden: bool, default True
        If `True`, include hidden jobs in the check.

    silent: bool, default False
        If `True`, do not print the restart success message.

    Returns
    -------
    A `SuccessTuple` where success indicates every checked job restarted cleanly.
    """
    from meerschaum.utils.misc import items_str

    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=False,
            debug=debug,
        )

    if not jobs:
        return True, "No jobs to restart."

    results = {}
    for name, job in jobs.items():
        check_success, check_msg = job.check_restart()
        results[job.name] = (check_success, check_msg)
        if not silent:
            mrsm.pprint((check_success, check_msg))

    success_names = [name for name, (check_success, _) in results.items() if check_success]
    fail_names = [name for name, (check_success, _) in results.items() if not check_success]
    success = len(success_names) == len(jobs)
    msg = (
        (
            "Successfully restarted job"
            + ('s' if len(success_names) != 1 else '')
            + ' ' + items_str(success_names) + '.'
        )
        if success
        else (
            "Failed to restart job"
            # Pluralize by the number of FAILED jobs, not successes.
            + ('s' if len(fail_names) != 1 else '')
            + ' ' + items_str(fail_names) + '.'
        )
    )
    return success, msg

Restart any stopped jobs which were created with --restart.

Parameters
  • executor_keys (Optional[str], default 'local'): If provided, check jobs on the given remote API instance. Otherwise check local jobs.
  • include_hidden (bool, default True): If True, include hidden jobs in the check.
  • silent (bool, default False): If True, do not print the restart success message.
def start_check_jobs_thread():
    """
    Start a background thread which periodically checks for jobs to restart.
    """
    import atexit
    from meerschaum.utils.threading import RepeatTimer
    from meerschaum.config.static import STATIC_CONFIG

    global _check_loop_stop_thread

    interval_seconds = STATIC_CONFIG['jobs']['check_restart_seconds']
    _check_loop_stop_thread = RepeatTimer(
        interval_seconds,
        lambda: _check_restart_jobs_against_lock(silent=True),
    )
    _check_loop_stop_thread.daemon = True

    # Ensure the monitor thread is cancelled (and its lock file removed) on exit.
    atexit.register(stop_check_jobs_thread)

    _check_loop_stop_thread.start()
    return _check_loop_stop_thread

Start a thread to regularly monitor jobs.

def stop_check_jobs_thread():
    """
    Stop the job monitoring thread.
    """
    from meerschaum.config.paths import CHECK_JOBS_LOCK_PATH
    from meerschaum.utils.warnings import warn
    # No-op if the monitoring thread was never started.
    if _check_loop_stop_thread is None:
        return

    _check_loop_stop_thread.cancel()

    # Best-effort cleanup of the lock file; a failure here is non-fatal,
    # so warn instead of raising.
    try:
        if CHECK_JOBS_LOCK_PATH.exists():
            CHECK_JOBS_LOCK_PATH.unlink()
    except Exception as e:
        warn(f"Failed to remove check jobs lock file:\n{e}")

Stop the job monitoring thread.