meerschaum.jobs

Higher-level utilities for managing `meerschaum.utils.daemon.Daemon`.

  1#! /usr/bin/env python3
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Higher-level utilities for managing `meerschaum.utils.daemon.Daemon`.
  7"""
  8
  9import pathlib
 10
 11import meerschaum as mrsm
 12from meerschaum.utils.typing import Dict, Optional, List, SuccessTuple
 13
 14from meerschaum.jobs._Job import Job, StopMonitoringLogs
 15from meerschaum.jobs._Executor import Executor
 16
 17__all__ = (
 18    'Job',
 19    'StopMonitoringLogs',
 20    'systemd',
 21    'get_jobs',
 22    'get_filtered_jobs',
 23    'get_restart_jobs',
 24    'get_running_jobs',
 25    'get_stopped_jobs',
 26    'get_paused_jobs',
 27    'get_restart_jobs',
 28    'make_executor',
 29    'Executor',
 30    'check_restart_jobs',
 31    'start_check_jobs_thread',
 32    'stop_check_jobs_thread',
 33)
 34
 35executor_types: List[str] = ['api', 'local']
 36
 37
 38def get_jobs(
 39    executor_keys: Optional[str] = None,
 40    include_hidden: bool = False,
 41    combine_local_and_systemd: bool = True,
 42    debug: bool = False,
 43) -> Dict[str, Job]:
 44    """
 45    Return a dictionary of the existing jobs.
 46
 47    Parameters
 48    ----------
 49    executor_keys: Optional[str], default None
 50        If provided, return remote jobs on the given API instance.
 51        Otherwise return local jobs.
 52
 53    include_hidden: bool, default False
 54        If `True`, include jobs with the `hidden` attribute.
 55
 56    Returns
 57    -------
 58    A dictionary mapping job names to jobs.
 59    """
 60    from meerschaum.connectors.parse import parse_executor_keys
 61    executor_keys = executor_keys or get_executor_keys_from_context()
 62
 63    include_local_and_system = (
 64        combine_local_and_systemd
 65        and str(executor_keys).split(':', maxsplit=1)[0] in ('None', 'local', 'systemd')
 66        and get_executor_keys_from_context() == 'systemd'
 67    )
 68
 69    def _get_local_jobs():
 70        from meerschaum.utils.daemon import get_daemons
 71        daemons = get_daemons()
 72        jobs = {
 73            daemon.daemon_id: Job(name=daemon.daemon_id, executor_keys='local')
 74            for daemon in daemons
 75        }
 76        return {
 77            name: job
 78            for name, job in jobs.items()
 79            if (include_hidden or not job.hidden) and not job._is_externally_managed
 80
 81        }
 82
 83    def _get_systemd_jobs():
 84        conn = mrsm.get_connector('systemd')
 85        jobs = conn.get_jobs(debug=debug)
 86        return {
 87            name: job
 88            for name, job in jobs.items()
 89            if include_hidden or not job.hidden
 90        }
 91
 92    if include_local_and_system:
 93        local_jobs = _get_local_jobs()
 94        systemd_jobs = _get_systemd_jobs()
 95        shared_jobs = set(local_jobs) & set(systemd_jobs)
 96        if shared_jobs:
 97            from meerschaum.utils.misc import items_str
 98            from meerschaum.utils.warnings import warn
 99            warn(
100                "Job"
101                + ('s' if len(shared_jobs) != 1 else '')
102                + f" {items_str(list(shared_jobs))} "
103                + "exist"
104                + ('s' if len(shared_jobs) == 1 else '')
105                + " in both `local` and `systemd`.",
106                stack=False,
107            )
108        return {**local_jobs, **systemd_jobs}
109
110    if executor_keys == 'local':
111        return _get_local_jobs()
112
113    if executor_keys == 'systemd':
114        return _get_systemd_jobs()
115
116    try:
117        _ = parse_executor_keys(executor_keys, construct=False)
118        conn = parse_executor_keys(executor_keys)
119        jobs = conn.get_jobs(debug=debug)
120        return {
121            name: job
122            for name, job in jobs.items()
123            if include_hidden or not job.hidden
124        }
125    except Exception:
126        return {}
127
128
def get_filtered_jobs(
    executor_keys: Optional[str] = None,
    filter_list: Optional[List[str]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    warn: bool = False,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of jobs filtered by the user.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, return jobs from the given executor (e.g. an API instance).

    filter_list: Optional[List[str]], default None
        If provided, only return jobs with these names.
        Explicitly requested jobs are returned even if hidden.

    include_hidden: bool, default False
        If `True`, include hidden jobs when no `filter_list` is given.

    warn: bool, default False
        If `True`, emit a warning for each name in `filter_list`
        which does not match an existing job.

    Returns
    -------
    A dictionary mapping job names to `Job` objects.
    """
    from meerschaum.utils.warnings import warn as _warn
    ### Fetch hidden jobs too so explicitly named hidden jobs may be returned below.
    jobs = get_jobs(
        executor_keys,
        include_hidden=True,
        combine_local_and_systemd=combine_local_and_systemd,
        debug=debug,
    )
    if not filter_list:
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    jobs_to_return = {}
    for name in filter_list:
        job = jobs.get(name, None)
        if job is None:
            if warn:
                _warn(
                    f"Job '{name}' does not exist.",
                    stack=False,
                )
            continue
        jobs_to_return[name] = job

    return jobs_to_return
167
168
def get_restart_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return jobs which were created with `--restart` or `--loop`.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    restart_jobs = {}
    for name, job in jobs.items():
        if job.restart:
            restart_jobs[name] = job
    return restart_jobs
192
193
def get_running_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of running jobs.
    """
    candidates = (
        jobs
        if jobs is not None
        else get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )
    )
    return {
        name: job
        for name, job in candidates.items()
        if job.status == 'running'
    }
217
218
def get_paused_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of paused jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    paused_jobs = {}
    for name, job in jobs.items():
        if job.status != 'paused':
            continue
        paused_jobs[name] = job
    return paused_jobs
242
243
def get_stopped_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of stopped jobs.
    """
    candidates = (
        get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )
        if jobs is None
        else jobs
    )
    stopped = {}
    for name, job in candidates.items():
        if job.status == 'stopped':
            stopped[name] = job
    return stopped
267
268
def make_executor(cls):
    """
    Register a class as an `Executor`.
    """
    import re
    from meerschaum.connectors import make_connector

    ### Derive the executor type from the class name (e.g. `FooExecutor` -> 'foo').
    lowered_name = cls.__name__.lower()
    typ = re.sub(r'executor$', '', lowered_name)
    if typ not in executor_types:
        executor_types.append(typ)
    return make_connector(cls, _is_executor=True)
280
281
def check_restart_jobs(
    executor_keys: Optional[str] = 'local',
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = True,
    silent: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Restart any stopped jobs which were created with `--restart`.

    Parameters
    ----------
    executor_keys: Optional[str], default 'local'
        If provided, check jobs on the given remote API instance.
        Otherwise check local jobs.

    jobs: Optional[Dict[str, Job]], default None
        If provided, check these jobs instead of loading them from the executor.

    include_hidden: bool, default True
        If `True`, include hidden jobs in the check.

    silent: bool, default False
        If `True`, do not print the restart success message.

    Returns
    -------
    A `SuccessTuple` which succeeds only if all checked jobs succeeded.
    """
    from meerschaum.utils.misc import items_str

    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=False,
            debug=debug,
        )

    if not jobs:
        return True, "No jobs to restart."

    results = {}
    for name, job in jobs.items():
        check_success, check_msg = job.check_restart()
        results[job.name] = (check_success, check_msg)
        if not silent:
            mrsm.pprint((check_success, check_msg))

    success_names = [name for name, (check_success, _) in results.items() if check_success]
    fail_names = [name for name, (check_success, _) in results.items() if not check_success]
    success = len(success_names) == len(jobs)
    if success:
        msg = (
            "Successfully restarted job"
            + ('s' if len(success_names) != 1 else '')
            + ' ' + items_str(success_names) + '.'
        )
    else:
        ### Pluralize by the number of FAILED jobs (previously keyed on successes).
        msg = (
            "Failed to restart job"
            + ('s' if len(fail_names) != 1 else '')
            + ' ' + items_str(fail_names) + '.'
        )
    return success, msg
341
342
def _check_restart_jobs_against_lock(*args, **kwargs):
    """
    Run `check_restart_jobs()` while holding the inter-process check-jobs lock.

    The file lock keeps multiple Meerschaum processes from
    running the restart check concurrently.
    """
    from meerschaum.config.paths import CHECK_JOBS_LOCK_PATH
    fasteners = mrsm.attempt_import('fasteners')
    lock = fasteners.InterProcessLock(CHECK_JOBS_LOCK_PATH)
    with lock:
        check_restart_jobs(*args, **kwargs)
349
350
### Module-level handle on the monitoring timer (set by `start_check_jobs_thread()`).
_check_loop_stop_thread = None
def start_check_jobs_thread():
    """
    Start a thread to regularly monitor jobs.
    """
    import atexit
    from functools import partial
    from meerschaum.utils.threading import RepeatTimer
    from meerschaum.config.static import STATIC_CONFIG

    global _check_loop_stop_thread

    timer = RepeatTimer(
        STATIC_CONFIG['jobs']['check_restart_seconds'],
        partial(_check_restart_jobs_against_lock, silent=True),
    )
    timer.daemon = True
    _check_loop_stop_thread = timer

    ### Cancel the timer (and remove the lock file) on interpreter exit.
    atexit.register(stop_check_jobs_thread)

    timer.start()
    return timer
376
377
def stop_check_jobs_thread():
    """
    Stop the job monitoring thread and remove its lock file.
    """
    from meerschaum.config.paths import CHECK_JOBS_LOCK_PATH
    from meerschaum.utils.warnings import warn

    thread = _check_loop_stop_thread
    if thread is None:
        return

    thread.cancel()

    ### Best-effort cleanup of the inter-process lock file.
    try:
        if CHECK_JOBS_LOCK_PATH.exists():
            CHECK_JOBS_LOCK_PATH.unlink()
    except Exception as e:
        warn(f"Failed to remove check jobs lock file:\n{e}")
394
395
### Cached context executor keys (computed once per process).
_context_keys = None
def get_executor_keys_from_context() -> str:
    """
    If we are running on the host with the default root, default to `'systemd'`.
    Otherwise return `'local'`.
    An explicitly configured `meerschaum:executor` value takes precedence.
    """
    global _context_keys

    if _context_keys is not None:
        return _context_keys

    from meerschaum.config import get_config
    from meerschaum.config.paths import ROOT_DIR_PATH, DEFAULT_ROOT_DIR_PATH
    from meerschaum.utils.misc import is_systemd_available

    ### A configured executor bypasses (and is not stored in) the cache.
    configured_executor = get_config('meerschaum', 'executor', warn=False)
    if configured_executor is not None:
        return configured_executor

    use_systemd = (
        is_systemd_available()
        and ROOT_DIR_PATH == DEFAULT_ROOT_DIR_PATH
    )
    _context_keys = 'systemd' if use_systemd else 'local'
    return _context_keys
421
422
def _install_healthcheck_job() -> SuccessTuple:
    """
    Install the systemd job which checks local jobs.
    """
    from meerschaum.config import get_config

    if not get_config('system', 'experimental', 'systemd_healthcheck'):
        return False, "Local healthcheck is disabled."

    if get_executor_keys_from_context() != 'systemd':
        return False, "Not running systemd."

    healthcheck_job = Job(
        '.local-healthcheck',
        ['restart', 'jobs', '-e', 'local', '--loop', '--min-seconds', '60'],
        executor_keys='systemd',
    )
    return healthcheck_job.start()
class Job:
 50class Job:
 51    """
 52    Manage a `meerschaum.utils.daemon.Daemon`, locally or remotely via the API.
 53    """
 54
    def __init__(
        self,
        name: str,
        sysargs: Union[List[str], str, None] = None,
        env: Optional[Dict[str, str]] = None,
        executor_keys: Optional[str] = None,
        delete_after_completion: bool = False,
        _properties: Optional[Dict[str, Any]] = None,
        _rotating_log=None,
        _stdin_file=None,
        _status_hook: Optional[Callable[[], str]] = None,
        _result_hook: Optional[Callable[[], SuccessTuple]] = None,
        _externally_managed: bool = False,
    ):
        """
        Create a new job to manage a `meerschaum.utils.daemon.Daemon`.

        Parameters
        ----------
        name: str
            The name of the job to be created.
            This will also be used as the Daemon ID.

        sysargs: Union[List[str], str, None], default None
            The sysargs of the command to be executed, e.g. 'start api'.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the job's process.

        executor_keys: Optional[str], default None
            If provided, execute the job remotely on an API instance, e.g. 'api:main'.

        delete_after_completion: bool, default False
            If `True`, delete this job when it has finished executing.

        _properties: Optional[Dict[str, Any]], default None
            If provided, use this to patch the daemon's properties.

        _rotating_log: optional, default None
            Internal: inject a rotating log object into this job (and its daemon, if any).

        _stdin_file: optional, default None
            Internal: inject a stdin file object into this job (and its daemon, if any).

        _status_hook: Optional[Callable[[], str]], default None
            Internal: stored on the job — presumably consulted when reporting
            the job's status (used outside this method).

        _result_hook: Optional[Callable[[], SuccessTuple]], default None
            Internal: stored on the job — presumably consulted when reporting
            the job's result (used outside this method).

        _externally_managed: bool, default False
            Internal: if `True`, mark the job as externally managed
            (recorded in the properties patch as `externally_managed`).
        """
        from meerschaum.utils.daemon import Daemon
        ### Job names double as daemon IDs, so reject characters which would break them.
        for char in BANNED_CHARS:
            if char in name:
                raise ValueError(f"Invalid name: ({char}) is not allowed.")

        if isinstance(sysargs, str):
            sysargs = shlex.split(sysargs)

        ### Translate escaped `and` separators back into the real `and` key.
        and_key = STATIC_CONFIG['system']['arguments']['and_key']
        escaped_and_key = STATIC_CONFIG['system']['arguments']['escaped_and_key']
        if sysargs:
            sysargs = [
                (arg if arg != escaped_and_key else and_key)
                for arg in sysargs
            ]

        ### NOTE: 'local' and 'systemd' executors are being coalesced.
        if executor_keys is None:
            from meerschaum.jobs import get_executor_keys_from_context
            executor_keys = get_executor_keys_from_context()

        self.executor_keys = executor_keys
        self.name = name
        ### Only local jobs get a daemon; remote jobs are managed via the executor.
        try:
            self._daemon = (
                Daemon(daemon_id=name)
                if executor_keys == 'local'
                else None
            )
        except Exception:
            self._daemon = None

        ### Handle any injected dependencies.
        if _rotating_log is not None:
            self._rotating_log = _rotating_log
            if self._daemon is not None:
                self._daemon._rotating_log = _rotating_log

        if _stdin_file is not None:
            self._stdin_file = _stdin_file
            if self._daemon is not None:
                self._daemon._stdin_file = _stdin_file
                self._daemon._blocking_stdin_file_path = _stdin_file.blocking_file_path

        if _status_hook is not None:
            self._status_hook = _status_hook

        if _result_hook is not None:
            self._result_hook = _result_hook

        self._externally_managed = _externally_managed
        self._properties_patch = _properties or {}
        if _externally_managed:
            self._properties_patch.update({'externally_managed': _externally_managed})

        if env:
            self._properties_patch.update({'env': env})

        if delete_after_completion:
            self._properties_patch.update({'delete_after_completion': delete_after_completion})

        ### Prefer the sysargs recorded on an existing daemon over the given ones.
        daemon_sysargs = (
            self._daemon.properties.get('target', {}).get('args', [None])[0]
            if self._daemon is not None
            else None
        )

        if daemon_sysargs and sysargs and daemon_sysargs != sysargs:
            warn("Given sysargs differ from existing sysargs.")

        ### Strip daemonization flags; the Job wrapper handles daemonizing itself.
        self._sysargs = [
            arg
            for arg in (daemon_sysargs or sysargs or [])
            if arg not in ('-d', '--daemon')
        ]
        ### A restart flag in the sysargs marks the job as restartable.
        for restart_flag in RESTART_FLAGS:
            if restart_flag in self._sysargs:
                self._properties_patch.update({'restart': True})
                break
172
173    @staticmethod
174    def from_pid(pid: int, executor_keys: Optional[str] = None) -> Job:
175        """
176        Build a `Job` from the PID of a running Meerschaum process.
177
178        Parameters
179        ----------
180        pid: int
181            The PID of the process.
182
183        executor_keys: Optional[str], default None
184            The executor keys to assign to the job.
185        """
186        from meerschaum.config.paths import DAEMON_RESOURCES_PATH
187
188        psutil = mrsm.attempt_import('psutil')
189        try:
190            process = psutil.Process(pid)
191        except psutil.NoSuchProcess as e:
192            warn(f"Process with PID {pid} does not exist.", stack=False)
193            raise e
194
195        command_args = process.cmdline()
196        is_daemon = command_args[1] == '-c'
197
198        if is_daemon:
199            daemon_id = command_args[-1].split('daemon_id=')[-1].split(')')[0].replace("'", '')
200            root_dir = process.environ().get(STATIC_CONFIG['environment']['root'], None)
201            if root_dir is None:
202                from meerschaum.config.paths import ROOT_DIR_PATH
203                root_dir = ROOT_DIR_PATH
204            jobs_dir = root_dir / DAEMON_RESOURCES_PATH.name
205            daemon_dir = jobs_dir / daemon_id
206            pid_file = daemon_dir / 'process.pid'
207
208            if pid_file.exists():
209                with open(pid_file, 'r', encoding='utf-8') as f:
210                    daemon_pid = int(f.read())
211
212                if pid != daemon_pid:
213                    raise EnvironmentError(f"Differing PIDs: {pid=}, {daemon_pid=}")
214            else:
215                raise EnvironmentError(f"Is job '{daemon_id}' running?")
216
217            return Job(daemon_id, executor_keys=executor_keys)
218
219        from meerschaum._internal.arguments._parse_arguments import parse_arguments
220        from meerschaum.utils.daemon import get_new_daemon_name
221
222        mrsm_ix = 0
223        for i, arg in enumerate(command_args):
224            if 'mrsm' in arg or 'meerschaum' in arg.lower():
225                mrsm_ix = i
226                break
227
228        sysargs = command_args[mrsm_ix+1:]
229        kwargs = parse_arguments(sysargs)
230        name = kwargs.get('name', get_new_daemon_name())
231        return Job(name, sysargs, executor_keys=executor_keys)
232
233    def start(self, debug: bool = False) -> SuccessTuple:
234        """
235        Start the job's daemon.
236        """
237        if self.executor is not None:
238            if not self.exists(debug=debug):
239                return self.executor.create_job(
240                    self.name,
241                    self.sysargs,
242                    properties=self.daemon.properties,
243                    debug=debug,
244                )
245            return self.executor.start_job(self.name, debug=debug)
246
247        if self.is_running():
248            return True, f"{self} is already running."
249
250        success, msg = self.daemon.run(
251            keep_daemon_output=(not self.delete_after_completion),
252            allow_dirty_run=True,
253        )
254        if not success:
255            return success, msg
256
257        return success, f"Started {self}."
258
259    def stop(self, timeout_seconds: Optional[int] = None, debug: bool = False) -> SuccessTuple:
260        """
261        Stop the job's daemon.
262        """
263        if self.executor is not None:
264            return self.executor.stop_job(self.name, debug=debug)
265
266        if self.daemon.status == 'stopped':
267            if not self.restart:
268                return True, f"{self} is not running."
269            elif self.stop_time is not None:
270                return True, f"{self} will not restart until manually started."
271
272        quit_success, quit_msg = self.daemon.quit(timeout=timeout_seconds)
273        if quit_success:
274            return quit_success, f"Stopped {self}."
275
276        warn(
277            f"Failed to gracefully quit {self}.",
278            stack=False,
279        )
280        kill_success, kill_msg = self.daemon.kill(timeout=timeout_seconds)
281        if not kill_success:
282            return kill_success, kill_msg
283
284        return kill_success, f"Killed {self}."
285
286    def pause(self, timeout_seconds: Optional[int] = None, debug: bool = False) -> SuccessTuple:
287        """
288        Pause the job's daemon.
289        """
290        if self.executor is not None:
291            return self.executor.pause_job(self.name, debug=debug)
292
293        pause_success, pause_msg = self.daemon.pause(timeout=timeout_seconds)
294        if not pause_success:
295            return pause_success, pause_msg
296
297        return pause_success, f"Paused {self}."
298
299    def delete(self, debug: bool = False) -> SuccessTuple:
300        """
301        Delete the job and its daemon.
302        """
303        if self.executor is not None:
304            return self.executor.delete_job(self.name, debug=debug)
305
306        if self.is_running():
307            stop_success, stop_msg = self.stop()
308            if not stop_success:
309                return stop_success, stop_msg
310
311        cleanup_success, cleanup_msg = self.daemon.cleanup()
312        if not cleanup_success:
313            return cleanup_success, cleanup_msg
314
315        return cleanup_success, f"Deleted {self}."
316
317    def is_running(self) -> bool:
318        """
319        Determine whether the job's daemon is running.
320        """
321        return self.status == 'running'
322
323    def exists(self, debug: bool = False) -> bool:
324        """
325        Determine whether the job exists.
326        """
327        if self.executor is not None:
328            return self.executor.get_job_exists(self.name, debug=debug)
329
330        return self.daemon.path.exists()
331
332    def get_logs(self) -> Union[str, None]:
333        """
334        Return the output text of the job's daemon.
335        """
336        if self.executor is not None:
337            return self.executor.get_logs(self.name)
338
339        return self.daemon.log_text
340
    def monitor_logs(
        self,
        callback_function: Callable[[str], None] = partial(print, end=''),
        input_callback_function: Optional[Callable[[], str]] = None,
        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
        stop_event: Optional[asyncio.Event] = None,
        stop_on_exit: bool = False,
        strip_timestamps: bool = False,
        accept_input: bool = True,
        debug: bool = False,
    ):
        """
        Monitor the job's log files and execute a callback on new lines.

        Parameters
        ----------
        callback_function: Callable[[str], None], default partial(print, end='')
            The callback to execute as new data comes in.
            Defaults to printing the output directly to `stdout`.

        input_callback_function: Optional[Callable[[], str]], default None
            If provided, execute this callback when the daemon is blocking on stdin.
            Defaults to `sys.stdin.readline()`.

        stop_callback_function: Optional[Callable[[SuccessTuple], None]], default None
            If provided, execute this callback when the daemon stops.
            The job's SuccessTuple will be passed to the callback.

        stop_event: Optional[asyncio.Event], default None
            If provided, stop monitoring when this event is set.
            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
            from within `callback_function` to stop monitoring.

        stop_on_exit: bool, default False
            If `True`, stop monitoring when the job stops.

        strip_timestamps: bool, default False
            If `True`, remove leading timestamps from lines.

        accept_input: bool, default True
            If `True`, accept input when the daemon blocks on stdin.
        """
        def default_input_callback_function():
            return sys.stdin.readline()

        if input_callback_function is None:
            input_callback_function = default_input_callback_function

        ### Remote jobs delegate monitoring to the executor.
        if self.executor is not None:
            self.executor.monitor_logs(
                self.name,
                callback_function,
                input_callback_function=input_callback_function,
                stop_callback_function=stop_callback_function,
                stop_on_exit=stop_on_exit,
                accept_input=accept_input,
                strip_timestamps=strip_timestamps,
                debug=debug,
            )
            return

        ### Local jobs run the async monitor to completion on a fresh event loop.
        monitor_logs_coroutine = self.monitor_logs_async(
            callback_function=callback_function,
            input_callback_function=input_callback_function,
            stop_callback_function=stop_callback_function,
            stop_event=stop_event,
            stop_on_exit=stop_on_exit,
            strip_timestamps=strip_timestamps,
            accept_input=accept_input,
        )
        return asyncio.run(monitor_logs_coroutine)
412
413    async def monitor_logs_async(
414        self,
415        callback_function: Callable[[str], None] = partial(print, end='', flush=True),
416        input_callback_function: Optional[Callable[[], str]] = None,
417        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
418        stop_event: Optional[asyncio.Event] = None,
419        stop_on_exit: bool = False,
420        strip_timestamps: bool = False,
421        accept_input: bool = True,
422        _logs_path: Optional[pathlib.Path] = None,
423        _log=None,
424        _stdin_file=None,
425        debug: bool = False,
426    ):
427        """
428        Monitor the job's log files and await a callback on new lines.
429
430        Parameters
431        ----------
432        callback_function: Callable[[str], None], default partial(print, end='')
433            The callback to execute as new data comes in.
434            Defaults to printing the output directly to `stdout`.
435
436        input_callback_function: Optional[Callable[[], str]], default None
437            If provided, execute this callback when the daemon is blocking on stdin.
438            Defaults to `sys.stdin.readline()`.
439
440        stop_callback_function: Optional[Callable[[SuccessTuple]], str], default None
441            If provided, execute this callback when the daemon stops.
442            The job's SuccessTuple will be passed to the callback.
443
444        stop_event: Optional[asyncio.Event], default None
445            If provided, stop monitoring when this event is set.
446            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
447            from within `callback_function` to stop monitoring.
448
449        stop_on_exit: bool, default False
450            If `True`, stop monitoring when the job stops.
451
452        strip_timestamps: bool, default False
453            If `True`, remove leading timestamps from lines.
454
455        accept_input: bool, default True
456            If `True`, accept input when the daemon blocks on stdin.
457        """
458        def default_input_callback_function():
459            return sys.stdin.readline()
460
461        if input_callback_function is None:
462            input_callback_function = default_input_callback_function
463
464        if self.executor is not None:
465            await self.executor.monitor_logs_async(
466                self.name,
467                callback_function,
468                input_callback_function=input_callback_function,
469                stop_callback_function=stop_callback_function,
470                stop_on_exit=stop_on_exit,
471                strip_timestamps=strip_timestamps,
472                accept_input=accept_input,
473                debug=debug,
474            )
475            return
476
477        from meerschaum.utils.formatting._jobs import strip_timestamp_from_line
478
479        events = {
480            'user': stop_event,
481            'stopped': asyncio.Event(),
482        }
483        combined_event = asyncio.Event()
484        emitted_text = False
485        stdin_file = _stdin_file if _stdin_file is not None else self.daemon.stdin_file
486
487        async def check_job_status():
488            nonlocal emitted_text
489            stopped_event = events.get('stopped', None)
490            if stopped_event is None:
491                return
492
493            sleep_time = 0.1
494            while sleep_time < 60:
495                if self.status == 'stopped':
496                    if not emitted_text:
497                        await asyncio.sleep(sleep_time)
498                        sleep_time = round(sleep_time * 1.1, 2)
499                        continue
500
501                    if stop_callback_function is not None:
502                        try:
503                            if asyncio.iscoroutinefunction(stop_callback_function):
504                                await stop_callback_function(self.result)
505                            else:
506                                stop_callback_function(self.result)
507                        except asyncio.exceptions.CancelledError:
508                            break
509                        except Exception:
510                            warn(traceback.format_exc())
511
512                    if stop_on_exit:
513                        events['stopped'].set()
514
515                    break
516                await asyncio.sleep(0.1)
517
518        async def check_blocking_on_input():
519            while True:
520                if not emitted_text or not self.is_blocking_on_stdin():
521                    try:
522                        await asyncio.sleep(0.1)
523                    except asyncio.exceptions.CancelledError:
524                        break
525                    continue
526
527                if not self.is_running():
528                    break
529
530                await emit_latest_lines()
531
532                try:
533                    print('', end='', flush=True)
534                    if asyncio.iscoroutinefunction(input_callback_function):
535                        data = await input_callback_function()
536                    else:
537                        data = input_callback_function()
538                except KeyboardInterrupt:
539                    break
540                if not data.endswith('\n'):
541                    data += '\n'
542
543                stdin_file.write(data)
544                await asyncio.sleep(0.1)
545
546        async def combine_events():
547            event_tasks = [
548                asyncio.create_task(event.wait())
549                for event in events.values()
550                if event is not None
551            ]
552            if not event_tasks:
553                return
554
555            try:
556                done, pending = await asyncio.wait(
557                    event_tasks,
558                    return_when=asyncio.FIRST_COMPLETED,
559                )
560                for task in pending:
561                    task.cancel()
562            except asyncio.exceptions.CancelledError:
563                pass
564            finally:
565                combined_event.set()
566
567        check_job_status_task = asyncio.create_task(check_job_status())
568        check_blocking_on_input_task = asyncio.create_task(check_blocking_on_input())
569        combine_events_task = asyncio.create_task(combine_events())
570
571        log = _log if _log is not None else self.daemon.rotating_log
572        lines_to_show = get_config('jobs', 'logs', 'lines_to_show')
573
574        async def emit_latest_lines():
575            nonlocal emitted_text
576            lines = log.readlines()
577            for line in lines[(-1 * lines_to_show):]:
578                if stop_event is not None and stop_event.is_set():
579                    return
580
581                if strip_timestamps:
582                    line = strip_timestamp_from_line(line)
583
584                try:
585                    if asyncio.iscoroutinefunction(callback_function):
586                        await callback_function(line)
587                    else:
588                        callback_function(line)
589                    emitted_text = True
590                except StopMonitoringLogs:
591                    return
592                except Exception:
593                    warn(f"Error in logs callback:\n{traceback.format_exc()}")
594
595        await emit_latest_lines()
596
597        tasks = (
598            [check_job_status_task]
599            + ([check_blocking_on_input_task] if accept_input else [])
600            + [combine_events_task]
601        )
602        try:
603            _ = asyncio.gather(*tasks, return_exceptions=True)
604        except asyncio.exceptions.CancelledError:
605            raise
606        except Exception:
607            warn(f"Failed to run async checks:\n{traceback.format_exc()}")
608
609        watchfiles = mrsm.attempt_import('watchfiles')
610        async for changes in watchfiles.awatch(
611            _logs_path or LOGS_RESOURCES_PATH,
612            stop_event=combined_event,
613        ):
614            for change in changes:
615                file_path_str = change[1]
616                file_path = pathlib.Path(file_path_str)
617                latest_subfile_path = log.get_latest_subfile_path()
618                if latest_subfile_path != file_path:
619                    continue
620
621                await emit_latest_lines()
622
623        await emit_latest_lines()
624
625    def is_blocking_on_stdin(self, debug: bool = False) -> bool:
626        """
627        Return whether a job's daemon is blocking on stdin.
628        """
629        if self.executor is not None:
630            return self.executor.get_job_is_blocking_on_stdin(self.name, debug=debug)
631
632        return self.is_running() and self.daemon.blocking_stdin_file_path.exists()
633
    def write_stdin(self, data):
        """
        Write to a job's daemon's `stdin`.
        """
        # Delegate to the daemon's stdin file-like wrapper.
        # NOTE(review): unlike other methods, this does not branch on
        # `self.executor` — presumably remote stdin is handled elsewhere; verify.
        self.daemon.stdin_file.write(data)
639
640    @property
641    def executor(self) -> Union[Executor, None]:
642        """
643        If the job is remote, return the connector to the remote API instance.
644        """
645        return (
646            mrsm.get_connector(self.executor_keys)
647            if self.executor_keys != 'local'
648            else None
649        )
650
651    @property
652    def status(self) -> str:
653        """
654        Return the running status of the job's daemon.
655        """
656        if '_status_hook' in self.__dict__:
657            return self._status_hook()
658
659        if self.executor is not None:
660            return self.executor.get_job_status(self.name)
661
662        return self.daemon.status
663
    @property
    def pid(self) -> Union[int, None]:
        """
        Return the PID of the job's daemon.
        """
        if self.executor is not None:
            # Remote jobs expose the PID inside the executor's metadata payload.
            return self.executor.get_job_metadata(self.name).get('daemon', {}).get('pid', None)

        return self.daemon.pid
673
674    @property
675    def restart(self) -> bool:
676        """
677        Return whether to restart a stopped job.
678        """
679        if self.executor is not None:
680            return self.executor.get_job_metadata(self.name).get('restart', False)
681
682        return self.daemon.properties.get('restart', False)
683
684    @property
685    def result(self) -> SuccessTuple:
686        """
687        Return the `SuccessTuple` when the job has terminated.
688        """
689        if self.is_running():
690            return True, f"{self} is running."
691
692        if '_result_hook' in self.__dict__:
693            return self._result_hook()
694
695        if self.executor is not None:
696            return (
697                self.executor.get_job_metadata(self.name)
698                .get('result', (False, "No result available."))
699            )
700
701        _result = self.daemon.properties.get('result', None)
702        if _result is None:
703            return False, "No result available."
704
705        return tuple(_result)
706
707    @property
708    def sysargs(self) -> List[str]:
709        """
710        Return the sysargs to use for the Daemon.
711        """
712        if self._sysargs:
713            return self._sysargs
714
715        if self.executor is not None:
716            return self.executor.get_job_metadata(self.name).get('sysargs', [])
717
718        target_args = self.daemon.target_args
719        if target_args is None:
720            return []
721        self._sysargs = target_args[0] if len(target_args) > 0 else []
722        return self._sysargs
723
    @property
    def daemon(self) -> 'Daemon':
        """
        Return the daemon which this job manages.
        """
        from meerschaum.utils.daemon import Daemon
        # Reuse the cached daemon only for fully-resolved local jobs;
        # remote jobs are rebuilt so properties stay fresh.
        if self._daemon is not None and self.executor is None and self._sysargs:
            return self._daemon

        # Remote properties come first so the local patch wins on conflicts.
        remote_properties = (
            {}
            if self.executor is None
            else self.executor.get_job_properties(self.name)
        )
        properties = {**remote_properties, **self._properties_patch}

        self._daemon = Daemon(
            target=entry,
            target_args=[self._sysargs],
            target_kw={},
            daemon_id=self.name,
            label=shlex.join(self._sysargs),
            properties=properties,
        )
        # Re-apply any dependencies injected via the constructor
        # onto the freshly rebuilt daemon.
        if '_rotating_log' in self.__dict__:
            self._daemon._rotating_log = self._rotating_log

        if '_stdin_file' in self.__dict__:
            self._daemon._stdin_file = self._stdin_file
            self._daemon._blocking_stdin_file_path = self._stdin_file.blocking_file_path

        return self._daemon
756
757    @property
758    def began(self) -> Union[datetime, None]:
759        """
760        The datetime when the job began running.
761        """
762        if self.executor is not None:
763            began_str = self.executor.get_job_began(self.name)
764            if began_str is None:
765                return None
766            return (
767                datetime.fromisoformat(began_str)
768                .astimezone(timezone.utc)
769                .replace(tzinfo=None)
770            )
771
772        began_str = self.daemon.properties.get('process', {}).get('began', None)
773        if began_str is None:
774            return None
775
776        return datetime.fromisoformat(began_str)
777
778    @property
779    def ended(self) -> Union[datetime, None]:
780        """
781        The datetime when the job stopped running.
782        """
783        if self.executor is not None:
784            ended_str = self.executor.get_job_ended(self.name)
785            if ended_str is None:
786                return None
787            return (
788                datetime.fromisoformat(ended_str)
789                .astimezone(timezone.utc)
790                .replace(tzinfo=None)
791            )
792
793        ended_str = self.daemon.properties.get('process', {}).get('ended', None)
794        if ended_str is None:
795            return None
796
797        return datetime.fromisoformat(ended_str)
798
799    @property
800    def paused(self) -> Union[datetime, None]:
801        """
802        The datetime when the job was suspended while running.
803        """
804        if self.executor is not None:
805            paused_str = self.executor.get_job_paused(self.name)
806            if paused_str is None:
807                return None
808            return (
809                datetime.fromisoformat(paused_str)
810                .astimezone(timezone.utc)
811                .replace(tzinfo=None)
812            )
813
814        paused_str = self.daemon.properties.get('process', {}).get('paused', None)
815        if paused_str is None:
816            return None
817
818        return datetime.fromisoformat(paused_str)
819
    @property
    def stop_time(self) -> Union[datetime, None]:
        """
        Return the timestamp when the job was manually stopped.
        """
        if self.executor is not None:
            return self.executor.get_job_stop_time(self.name)

        # No stop file means the job was never manually stopped.
        if not self.daemon.stop_path.exists():
            return None

        stop_data = self.daemon._read_stop_file()
        if not stop_data:
            return None

        stop_time_str = stop_data.get('stop_time', None)
        if not stop_time_str:
            # A stop file without a timestamp is unexpected; warn but don't raise.
            warn(f"Could not read stop time for {self}.")
            return None

        return datetime.fromisoformat(stop_time_str)
841
842    @property
843    def hidden(self) -> bool:
844        """
845        Return a bool indicating whether this job should be displayed.
846        """
847        return (
848            self.name.startswith('_')
849            or self.name.startswith('.')
850            or self._is_externally_managed
851        )
852
    def check_restart(self) -> SuccessTuple:
        """
        If `restart` is `True` and the daemon is not running,
        restart the job.
        Do not restart if the job was manually stopped.
        """
        # Guards are ordered cheapest-first and short-circuit:
        # `restart` and `stop_time` may hit the executor or disk.
        if self.is_running():
            return True, f"{self} is running."

        if not self.restart:
            return True, f"{self} does not need to be restarted."

        # A recorded stop time means the user stopped it on purpose.
        if self.stop_time is not None:
            return True, f"{self} was manually stopped."

        return self.start()
869
870    @property
871    def label(self) -> str:
872        """
873        Return the job's Daemon label (joined sysargs).
874        """
875        from meerschaum._internal.arguments import compress_pipeline_sysargs
876        sysargs = compress_pipeline_sysargs(self.sysargs)
877        return shlex.join(sysargs).replace(' + ', '\n+ ')
878
    @property
    def _externally_managed_file(self) -> pathlib.Path:
        """
        Return the path to the externally managed file.
        """
        # Marker file inside the daemon's directory; its existence flags
        # the job as managed outside of Meerschaum.
        return self.daemon.path / '.externally-managed'
885
    def _set_externally_managed(self):
        """
        Set this job as externally managed.
        """
        self._externally_managed = True
        # Best-effort: create the on-disk marker but never fail the caller.
        try:
            self._externally_managed_file.parent.mkdir(exist_ok=True, parents=True)
            self._externally_managed_file.touch()
        except Exception as e:
            warn(e)
896
897    @property
898    def _is_externally_managed(self) -> bool:
899        """
900        Return whether this job is externally managed.
901        """
902        return self.executor_keys in (None, 'local') and (
903            self._externally_managed or self._externally_managed_file.exists()
904        )
905
906    @property
907    def env(self) -> Dict[str, str]:
908        """
909        Return the environment variables to set for the job's process.
910        """
911        if '_env' in self.__dict__:
912            return self.__dict__['_env']
913
914        _env = self.daemon.properties.get('env', {})
915        default_env = {
916            'PYTHONUNBUFFERED': '1',
917            'LINES': str(get_config('jobs', 'terminal', 'lines')),
918            'COLUMNS': str(get_config('jobs', 'terminal', 'columns')),
919        }
920        self._env = {**default_env, **_env}
921        return self._env
922
923    @property
924    def delete_after_completion(self) -> bool:
925        """
926        Return whether this job is configured to delete itself after completion.
927        """
928        if '_delete_after_completion' in self.__dict__:
929            return self.__dict__.get('_delete_after_completion', False)
930
931        self._delete_after_completion = self.daemon.properties.get('delete_after_completion', False)
932        return self._delete_after_completion
933
934    def __str__(self) -> str:
935        sysargs = self.sysargs
936        sysargs_str = shlex.join(sysargs) if sysargs else ''
937        job_str = f'Job("{self.name}"'
938        if sysargs_str:
939            job_str += f', "{sysargs_str}"'
940
941        job_str += ')'
942        return job_str
943
    def __repr__(self) -> str:
        # Mirror `__str__` so jobs render the same in REPLs and containers.
        return str(self)
946
    def __hash__(self) -> int:
        # Jobs are identified by name alone.
        return hash(self.name)

Manage a meerschaum.utils.daemon.Daemon, locally or remotely via the API.

Job( name: str, sysargs: Union[List[str], str, NoneType] = None, env: Optional[Dict[str, str]] = None, executor_keys: Optional[str] = None, delete_after_completion: bool = False, _properties: Optional[Dict[str, Any]] = None, _rotating_log=None, _stdin_file=None, _status_hook: Optional[Callable[[], str]] = None, _result_hook: Optional[Callable[[], Tuple[bool, str]]] = None, _externally_managed: bool = False)
    def __init__(
        self,
        name: str,
        sysargs: Union[List[str], str, None] = None,
        env: Optional[Dict[str, str]] = None,
        executor_keys: Optional[str] = None,
        delete_after_completion: bool = False,
        _properties: Optional[Dict[str, Any]] = None,
        _rotating_log=None,
        _stdin_file=None,
        _status_hook: Optional[Callable[[], str]] = None,
        _result_hook: Optional[Callable[[], SuccessTuple]] = None,
        _externally_managed: bool = False,
    ):
        """
        Create a new job to manage a `meerschaum.utils.daemon.Daemon`.

        Parameters
        ----------
        name: str
            The name of the job to be created.
            This will also be used as the Daemon ID.

        sysargs: Union[List[str], str, None], default None
            The sysargs of the command to be executed, e.g. 'start api'.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the job's process.

        executor_keys: Optional[str], default None
            If provided, execute the job remotely on an API instance, e.g. 'api:main'.

        delete_after_completion: bool, default False
            If `True`, delete this job when it has finished executing.

        _properties: Optional[Dict[str, Any]], default None
            If provided, use this to patch the daemon's properties.
        """
        from meerschaum.utils.daemon import Daemon
        # Reject names which cannot be used as daemon IDs.
        for char in BANNED_CHARS:
            if char in name:
                raise ValueError(f"Invalid name: ({char}) is not allowed.")

        if isinstance(sysargs, str):
            sysargs = shlex.split(sysargs)

        # Unescape the chained-command separator token.
        and_key = STATIC_CONFIG['system']['arguments']['and_key']
        escaped_and_key = STATIC_CONFIG['system']['arguments']['escaped_and_key']
        if sysargs:
            sysargs = [
                (arg if arg != escaped_and_key else and_key)
                for arg in sysargs
            ]

        ### NOTE: 'local' and 'systemd' executors are being coalesced.
        if executor_keys is None:
            from meerschaum.jobs import get_executor_keys_from_context
            executor_keys = get_executor_keys_from_context()

        self.executor_keys = executor_keys
        self.name = name
        # Only local jobs get an eager daemon; failures fall back to lazy build.
        try:
            self._daemon = (
                Daemon(daemon_id=name)
                if executor_keys == 'local'
                else None
            )
        except Exception:
            self._daemon = None

        ### Handle any injected dependencies.
        if _rotating_log is not None:
            self._rotating_log = _rotating_log
            if self._daemon is not None:
                self._daemon._rotating_log = _rotating_log

        if _stdin_file is not None:
            self._stdin_file = _stdin_file
            if self._daemon is not None:
                self._daemon._stdin_file = _stdin_file
                self._daemon._blocking_stdin_file_path = _stdin_file.blocking_file_path

        if _status_hook is not None:
            self._status_hook = _status_hook

        if _result_hook is not None:
            self._result_hook = _result_hook

        # Accumulate property overrides to apply when the daemon is (re)built.
        self._externally_managed = _externally_managed
        self._properties_patch = _properties or {}
        if _externally_managed:
            self._properties_patch.update({'externally_managed': _externally_managed})

        if env:
            self._properties_patch.update({'env': env})

        if delete_after_completion:
            self._properties_patch.update({'delete_after_completion': delete_after_completion})

        # Existing daemon sysargs win over the ones passed in.
        daemon_sysargs = (
            self._daemon.properties.get('target', {}).get('args', [None])[0]
            if self._daemon is not None
            else None
        )

        if daemon_sysargs and sysargs and daemon_sysargs != sysargs:
            warn("Given sysargs differ from existing sysargs.")

        # Strip daemonization flags; any restart flag implies `restart=True`.
        self._sysargs = [
            arg
            for arg in (daemon_sysargs or sysargs or [])
            if arg not in ('-d', '--daemon')
        ]
        for restart_flag in RESTART_FLAGS:
            if restart_flag in self._sysargs:
                self._properties_patch.update({'restart': True})
                break

Create a new job to manage a meerschaum.utils.daemon.Daemon.

Parameters
  • name (str): The name of the job to be created. This will also be used as the Daemon ID.
  • sysargs (Union[List[str], str, None], default None): The sysargs of the command to be executed, e.g. 'start api'.
  • env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the job's process.
  • executor_keys (Optional[str], default None): If provided, execute the job remotely on an API instance, e.g. 'api:main'.
  • delete_after_completion (bool, default False): If True, delete this job when it has finished executing.
  • _properties (Optional[Dict[str, Any]], default None): If provided, use this to patch the daemon's properties.
executor_keys
name
@staticmethod
def from_pid( pid: int, executor_keys: Optional[str] = None) -> Job:
173    @staticmethod
174    def from_pid(pid: int, executor_keys: Optional[str] = None) -> Job:
175        """
176        Build a `Job` from the PID of a running Meerschaum process.
177
178        Parameters
179        ----------
180        pid: int
181            The PID of the process.
182
183        executor_keys: Optional[str], default None
184            The executor keys to assign to the job.
185        """
186        from meerschaum.config.paths import DAEMON_RESOURCES_PATH
187
188        psutil = mrsm.attempt_import('psutil')
189        try:
190            process = psutil.Process(pid)
191        except psutil.NoSuchProcess as e:
192            warn(f"Process with PID {pid} does not exist.", stack=False)
193            raise e
194
195        command_args = process.cmdline()
196        is_daemon = command_args[1] == '-c'
197
198        if is_daemon:
199            daemon_id = command_args[-1].split('daemon_id=')[-1].split(')')[0].replace("'", '')
200            root_dir = process.environ().get(STATIC_CONFIG['environment']['root'], None)
201            if root_dir is None:
202                from meerschaum.config.paths import ROOT_DIR_PATH
203                root_dir = ROOT_DIR_PATH
204            jobs_dir = root_dir / DAEMON_RESOURCES_PATH.name
205            daemon_dir = jobs_dir / daemon_id
206            pid_file = daemon_dir / 'process.pid'
207
208            if pid_file.exists():
209                with open(pid_file, 'r', encoding='utf-8') as f:
210                    daemon_pid = int(f.read())
211
212                if pid != daemon_pid:
213                    raise EnvironmentError(f"Differing PIDs: {pid=}, {daemon_pid=}")
214            else:
215                raise EnvironmentError(f"Is job '{daemon_id}' running?")
216
217            return Job(daemon_id, executor_keys=executor_keys)
218
219        from meerschaum._internal.arguments._parse_arguments import parse_arguments
220        from meerschaum.utils.daemon import get_new_daemon_name
221
222        mrsm_ix = 0
223        for i, arg in enumerate(command_args):
224            if 'mrsm' in arg or 'meerschaum' in arg.lower():
225                mrsm_ix = i
226                break
227
228        sysargs = command_args[mrsm_ix+1:]
229        kwargs = parse_arguments(sysargs)
230        name = kwargs.get('name', get_new_daemon_name())
231        return Job(name, sysargs, executor_keys=executor_keys)

Build a Job from the PID of a running Meerschaum process.

Parameters
  • pid (int): The PID of the process.
  • executor_keys (Optional[str], default None): The executor keys to assign to the job.
def start(self, debug: bool = False) -> Tuple[bool, str]:
233    def start(self, debug: bool = False) -> SuccessTuple:
234        """
235        Start the job's daemon.
236        """
237        if self.executor is not None:
238            if not self.exists(debug=debug):
239                return self.executor.create_job(
240                    self.name,
241                    self.sysargs,
242                    properties=self.daemon.properties,
243                    debug=debug,
244                )
245            return self.executor.start_job(self.name, debug=debug)
246
247        if self.is_running():
248            return True, f"{self} is already running."
249
250        success, msg = self.daemon.run(
251            keep_daemon_output=(not self.delete_after_completion),
252            allow_dirty_run=True,
253        )
254        if not success:
255            return success, msg
256
257        return success, f"Started {self}."

Start the job's daemon.

def stop( self, timeout_seconds: Optional[int] = None, debug: bool = False) -> Tuple[bool, str]:
    def stop(self, timeout_seconds: Optional[int] = None, debug: bool = False) -> SuccessTuple:
        """
        Stop the job's daemon.
        """
        if self.executor is not None:
            return self.executor.stop_job(self.name, debug=debug)

        if self.daemon.status == 'stopped':
            # Already stopped and not restarting: nothing to do.
            if not self.restart:
                return True, f"{self} is not running."
            elif self.stop_time is not None:
                return True, f"{self} will not restart until manually started."

        # Prefer a graceful quit; escalate to a kill only if that fails.
        quit_success, quit_msg = self.daemon.quit(timeout=timeout_seconds)
        if quit_success:
            return quit_success, f"Stopped {self}."

        warn(
            f"Failed to gracefully quit {self}.",
            stack=False,
        )
        kill_success, kill_msg = self.daemon.kill(timeout=timeout_seconds)
        if not kill_success:
            return kill_success, kill_msg

        return kill_success, f"Killed {self}."

Stop the job's daemon.

def pause( self, timeout_seconds: Optional[int] = None, debug: bool = False) -> Tuple[bool, str]:
286    def pause(self, timeout_seconds: Optional[int] = None, debug: bool = False) -> SuccessTuple:
287        """
288        Pause the job's daemon.
289        """
290        if self.executor is not None:
291            return self.executor.pause_job(self.name, debug=debug)
292
293        pause_success, pause_msg = self.daemon.pause(timeout=timeout_seconds)
294        if not pause_success:
295            return pause_success, pause_msg
296
297        return pause_success, f"Paused {self}."

Pause the job's daemon.

def delete(self, debug: bool = False) -> Tuple[bool, str]:
299    def delete(self, debug: bool = False) -> SuccessTuple:
300        """
301        Delete the job and its daemon.
302        """
303        if self.executor is not None:
304            return self.executor.delete_job(self.name, debug=debug)
305
306        if self.is_running():
307            stop_success, stop_msg = self.stop()
308            if not stop_success:
309                return stop_success, stop_msg
310
311        cleanup_success, cleanup_msg = self.daemon.cleanup()
312        if not cleanup_success:
313            return cleanup_success, cleanup_msg
314
315        return cleanup_success, f"Deleted {self}."

Delete the job and its daemon.

def is_running(self) -> bool:
317    def is_running(self) -> bool:
318        """
319        Determine whether the job's daemon is running.
320        """
321        return self.status == 'running'

Determine whether the job's daemon is running.

def exists(self, debug: bool = False) -> bool:
323    def exists(self, debug: bool = False) -> bool:
324        """
325        Determine whether the job exists.
326        """
327        if self.executor is not None:
328            return self.executor.get_job_exists(self.name, debug=debug)
329
330        return self.daemon.path.exists()

Determine whether the job exists.

def get_logs(self) -> Optional[str]:
332    def get_logs(self) -> Union[str, None]:
333        """
334        Return the output text of the job's daemon.
335        """
336        if self.executor is not None:
337            return self.executor.get_logs(self.name)
338
339        return self.daemon.log_text

Return the output text of the job's daemon.

def monitor_logs( self, callback_function: Callable[[str], NoneType] = functools.partial(<built-in function print>, end=''), input_callback_function: Optional[Callable[[], str]] = None, stop_callback_function: Optional[Callable[[Tuple[bool, str]], NoneType]] = None, stop_event: Optional[asyncio.locks.Event] = None, stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False):
    def monitor_logs(
        self,
        callback_function: Callable[[str], None] = partial(print, end=''),
        input_callback_function: Optional[Callable[[], str]] = None,
        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
        stop_event: Optional[asyncio.Event] = None,
        stop_on_exit: bool = False,
        strip_timestamps: bool = False,
        accept_input: bool = True,
        debug: bool = False,
    ):
        """
        Monitor the job's log files and execute a callback on new lines.

        Parameters
        ----------
        callback_function: Callable[[str], None], default partial(print, end='')
            The callback to execute as new data comes in.
            Defaults to printing the output directly to `stdout`.

        input_callback_function: Optional[Callable[[], str]], default None
            If provided, execute this callback when the daemon is blocking on stdin.
            Defaults to `sys.stdin.readline()`.

        stop_callback_function: Optional[Callable[[SuccessTuple], None]], default None
            If provided, execute this callback when the daemon stops.
            The job's SuccessTuple will be passed to the callback.

        stop_event: Optional[asyncio.Event], default None
            If provided, stop monitoring when this event is set.
            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
            from within `callback_function` to stop monitoring.

        stop_on_exit: bool, default False
            If `True`, stop monitoring when the job stops.

        strip_timestamps: bool, default False
            If `True`, remove leading timestamps from lines.

        accept_input: bool, default True
            If `True`, accept input when the daemon blocks on stdin.
        """
        # Fall back to reading stdin when no input callback is given.
        def default_input_callback_function():
            return sys.stdin.readline()

        if input_callback_function is None:
            input_callback_function = default_input_callback_function

        # Remote jobs delegate monitoring to the executor.
        if self.executor is not None:
            self.executor.monitor_logs(
                self.name,
                callback_function,
                input_callback_function=input_callback_function,
                stop_callback_function=stop_callback_function,
                stop_on_exit=stop_on_exit,
                accept_input=accept_input,
                strip_timestamps=strip_timestamps,
                debug=debug,
            )
            return

        # Local jobs run the async monitor to completion on a fresh event loop.
        monitor_logs_coroutine = self.monitor_logs_async(
            callback_function=callback_function,
            input_callback_function=input_callback_function,
            stop_callback_function=stop_callback_function,
            stop_event=stop_event,
            stop_on_exit=stop_on_exit,
            strip_timestamps=strip_timestamps,
            accept_input=accept_input,
        )
        return asyncio.run(monitor_logs_coroutine)

Monitor the job's log files and execute a callback on new lines.

Parameters
  • callback_function (Callable[[str], None], default partial(print, end='')): The callback to execute as new data comes in. Defaults to printing the output directly to stdout.
  • input_callback_function (Optional[Callable[[], str]], default None): If provided, execute this callback when the daemon is blocking on stdin. Defaults to sys.stdin.readline().
  • stop_callback_function (Optional[Callable[[SuccessTuple], None]], default None): If provided, execute this callback when the daemon stops. The job's SuccessTuple will be passed to the callback.
  • stop_event (Optional[asyncio.Event], default None): If provided, stop monitoring when this event is set. You may instead raise meerschaum.jobs.StopMonitoringLogs from within callback_function to stop monitoring.
  • stop_on_exit (bool, default False): If True, stop monitoring when the job stops.
  • strip_timestamps (bool, default False): If True, remove leading timestamps from lines.
  • accept_input (bool, default True): If True, accept input when the daemon blocks on stdin.
async def monitor_logs_async( self, callback_function: Callable[[str], NoneType] = functools.partial(<built-in function print>, end='', flush=True), input_callback_function: Optional[Callable[[], str]] = None, stop_callback_function: Optional[Callable[[Tuple[bool, str]], NoneType]] = None, stop_event: Optional[asyncio.locks.Event] = None, stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, _logs_path: Optional[pathlib.Path] = None, _log=None, _stdin_file=None, debug: bool = False):
    async def monitor_logs_async(
        self,
        callback_function: Callable[[str], None] = partial(print, end='', flush=True),
        input_callback_function: Optional[Callable[[], str]] = None,
        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
        stop_event: Optional[asyncio.Event] = None,
        stop_on_exit: bool = False,
        strip_timestamps: bool = False,
        accept_input: bool = True,
        _logs_path: Optional[pathlib.Path] = None,
        _log=None,
        _stdin_file=None,
        debug: bool = False,
    ):
        """
        Monitor the job's log files and await a callback on new lines.

        Parameters
        ----------
        callback_function: Callable[[str], None], default partial(print, end='', flush=True)
            The callback to execute as new data comes in.
            Defaults to printing the output directly to `stdout`.

        input_callback_function: Optional[Callable[[], str]], default None
            If provided, execute this callback when the daemon is blocking on stdin.
            Defaults to `sys.stdin.readline()`.

        stop_callback_function: Optional[Callable[[SuccessTuple], None]], default None
            If provided, execute this callback when the daemon stops.
            The job's SuccessTuple will be passed to the callback.

        stop_event: Optional[asyncio.Event], default None
            If provided, stop monitoring when this event is set.
            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
            from within `callback_function` to stop monitoring.

        stop_on_exit: bool, default False
            If `True`, stop monitoring when the job stops.

        strip_timestamps: bool, default False
            If `True`, remove leading timestamps from lines.

        accept_input: bool, default True
            If `True`, accept input when the daemon blocks on stdin.

        _logs_path: Optional[pathlib.Path], default None
            Internal: directory to watch for log file changes
            (defaults to `LOGS_RESOURCES_PATH`).

        _log: default None
            Internal: rotating log object to read lines from
            (defaults to `self.daemon.rotating_log`).

        _stdin_file: default None
            Internal: file-like object written to when forwarding input
            (defaults to `self.daemon.stdin_file`).

        debug: bool, default False
            Verbosity toggle, forwarded to remote executors.
        """
        def default_input_callback_function():
            return sys.stdin.readline()

        if input_callback_function is None:
            input_callback_function = default_input_callback_function

        # Remote jobs: delegate monitoring to the executor.
        # NOTE(review): `stop_event` is not forwarded here — confirm whether
        # remote monitoring is expected to honor a user-supplied stop event.
        if self.executor is not None:
            await self.executor.monitor_logs_async(
                self.name,
                callback_function,
                input_callback_function=input_callback_function,
                stop_callback_function=stop_callback_function,
                stop_on_exit=stop_on_exit,
                strip_timestamps=strip_timestamps,
                accept_input=accept_input,
                debug=debug,
            )
            return

        from meerschaum.utils.formatting._jobs import strip_timestamp_from_line

        # Monitoring ends when either the user's stop_event fires or the
        # internal 'stopped' event is set (job exited with stop_on_exit=True).
        events = {
            'user': stop_event,
            'stopped': asyncio.Event(),
        }
        combined_event = asyncio.Event()
        emitted_text = False
        stdin_file = _stdin_file if _stdin_file is not None else self.daemon.stdin_file

        async def check_job_status():
            # Poll the job's status; when it stops, fire the stop callback and
            # (when stop_on_exit) set the 'stopped' event to end monitoring.
            nonlocal emitted_text
            stopped_event = events.get('stopped', None)
            if stopped_event is None:
                return

            # Back off geometrically while nothing has been emitted yet,
            # giving the first log lines a chance to be flushed to the callback.
            sleep_time = 0.1
            while sleep_time < 60:
                if self.status == 'stopped':
                    if not emitted_text:
                        await asyncio.sleep(sleep_time)
                        sleep_time = round(sleep_time * 1.1, 2)
                        continue

                    if stop_callback_function is not None:
                        try:
                            # Support both sync and async stop callbacks.
                            if asyncio.iscoroutinefunction(stop_callback_function):
                                await stop_callback_function(self.result)
                            else:
                                stop_callback_function(self.result)
                        except asyncio.exceptions.CancelledError:
                            break
                        except Exception:
                            warn(traceback.format_exc())

                    if stop_on_exit:
                        events['stopped'].set()

                    break
                await asyncio.sleep(0.1)

        async def check_blocking_on_input():
            # Forward user-supplied input to the daemon's stdin file
            # whenever the daemon is blocking on a read.
            while True:
                if not emitted_text or not self.is_blocking_on_stdin():
                    try:
                        await asyncio.sleep(0.1)
                    except asyncio.exceptions.CancelledError:
                        break
                    continue

                if not self.is_running():
                    break

                # Flush pending output before prompting for input.
                await emit_latest_lines()

                try:
                    print('', end='', flush=True)
                    # Support both sync and async input callbacks.
                    if asyncio.iscoroutinefunction(input_callback_function):
                        data = await input_callback_function()
                    else:
                        data = input_callback_function()
                except KeyboardInterrupt:
                    break
                if not data.endswith('\n'):
                    data += '\n'

                stdin_file.write(data)
                await asyncio.sleep(0.1)

        async def combine_events():
            # Set combined_event as soon as ANY of the configured events fires.
            event_tasks = [
                asyncio.create_task(event.wait())
                for event in events.values()
                if event is not None
            ]
            if not event_tasks:
                return

            try:
                done, pending = await asyncio.wait(
                    event_tasks,
                    return_when=asyncio.FIRST_COMPLETED,
                )
                for task in pending:
                    task.cancel()
            except asyncio.exceptions.CancelledError:
                pass
            finally:
                combined_event.set()

        # Schedule the background watchers; they run concurrently with the
        # file-watching loop below.
        check_job_status_task = asyncio.create_task(check_job_status())
        check_blocking_on_input_task = asyncio.create_task(check_blocking_on_input())
        combine_events_task = asyncio.create_task(combine_events())

        log = _log if _log is not None else self.daemon.rotating_log
        lines_to_show = get_config('jobs', 'logs', 'lines_to_show')

        async def emit_latest_lines():
            # Read the newest lines from the log and pass each one through
            # the callback, honoring stop_event and StopMonitoringLogs.
            nonlocal emitted_text
            lines = log.readlines()
            for line in lines[(-1 * lines_to_show):]:
                if stop_event is not None and stop_event.is_set():
                    return

                if strip_timestamps:
                    line = strip_timestamp_from_line(line)

                try:
                    if asyncio.iscoroutinefunction(callback_function):
                        await callback_function(line)
                    else:
                        callback_function(line)
                    emitted_text = True
                except StopMonitoringLogs:
                    return
                except Exception:
                    warn(f"Error in logs callback:\n{traceback.format_exc()}")

        await emit_latest_lines()

        tasks = (
            [check_job_status_task]
            + ([check_blocking_on_input_task] if accept_input else [])
            + [combine_events_task]
        )
        try:
            # NOTE(review): this gather is never awaited — the tasks were
            # already scheduled via create_task, so this call only wraps them,
            # and the except branches below cannot fire from it.
            # Confirm whether an `await` was intended here.
            _ = asyncio.gather(*tasks, return_exceptions=True)
        except asyncio.exceptions.CancelledError:
            raise
        except Exception:
            warn(f"Failed to run async checks:\n{traceback.format_exc()}")

        # Watch the logs directory; re-emit whenever the log's active subfile
        # changes. The loop exits once combined_event is set.
        watchfiles = mrsm.attempt_import('watchfiles')
        async for changes in watchfiles.awatch(
            _logs_path or LOGS_RESOURCES_PATH,
            stop_event=combined_event,
        ):
            for change in changes:
                file_path_str = change[1]
                file_path = pathlib.Path(file_path_str)
                latest_subfile_path = log.get_latest_subfile_path()
                if latest_subfile_path != file_path:
                    continue

                await emit_latest_lines()

        # Final flush after the watcher exits.
        await emit_latest_lines()

Monitor the job's log files and await a callback on new lines.

Parameters
  • callback_function (Callable[[str], None], default partial(print, end='')): The callback to execute as new data comes in. Defaults to printing the output directly to stdout.
  • input_callback_function (Optional[Callable[[], str]], default None): If provided, execute this callback when the daemon is blocking on stdin. Defaults to sys.stdin.readline().
  • stop_callback_function (Optional[Callable[[SuccessTuple], None]], default None): If provided, execute this callback when the daemon stops. The job's SuccessTuple will be passed to the callback.
  • stop_event (Optional[asyncio.Event], default None): If provided, stop monitoring when this event is set. You may instead raise meerschaum.jobs.StopMonitoringLogs from within callback_function to stop monitoring.
  • stop_on_exit (bool, default False): If True, stop monitoring when the job stops.
  • strip_timestamps (bool, default False): If True, remove leading timestamps from lines.
  • accept_input (bool, default True): If True, accept input when the daemon blocks on stdin.
def is_blocking_on_stdin(self, debug: bool = False) -> bool:
625    def is_blocking_on_stdin(self, debug: bool = False) -> bool:
626        """
627        Return whether a job's daemon is blocking on stdin.
628        """
629        if self.executor is not None:
630            return self.executor.get_job_is_blocking_on_stdin(self.name, debug=debug)
631
632        return self.is_running() and self.daemon.blocking_stdin_file_path.exists()

Return whether a job's daemon is blocking on stdin.

def write_stdin(self, data):
634    def write_stdin(self, data):
635        """
636        Write to a job's daemon's `stdin`.
637        """
638        self.daemon.stdin_file.write(data)

Write to a job's daemon's stdin.

executor: Optional[Executor]
640    @property
641    def executor(self) -> Union[Executor, None]:
642        """
643        If the job is remote, return the connector to the remote API instance.
644        """
645        return (
646            mrsm.get_connector(self.executor_keys)
647            if self.executor_keys != 'local'
648            else None
649        )

If the job is remote, return the connector to the remote API instance.

status: str
651    @property
652    def status(self) -> str:
653        """
654        Return the running status of the job's daemon.
655        """
656        if '_status_hook' in self.__dict__:
657            return self._status_hook()
658
659        if self.executor is not None:
660            return self.executor.get_job_status(self.name)
661
662        return self.daemon.status

Return the running status of the job's daemon.

pid: Optional[int]
664    @property
665    def pid(self) -> Union[int, None]:
666        """
667        Return the PID of the job's dameon.
668        """
669        if self.executor is not None:
670            return self.executor.get_job_metadata(self.name).get('daemon', {}).get('pid', None)
671
672        return self.daemon.pid

Return the PID of the job's daemon.

restart: bool
674    @property
675    def restart(self) -> bool:
676        """
677        Return whether to restart a stopped job.
678        """
679        if self.executor is not None:
680            return self.executor.get_job_metadata(self.name).get('restart', False)
681
682        return self.daemon.properties.get('restart', False)

Return whether to restart a stopped job.

result: Tuple[bool, str]
684    @property
685    def result(self) -> SuccessTuple:
686        """
687        Return the `SuccessTuple` when the job has terminated.
688        """
689        if self.is_running():
690            return True, f"{self} is running."
691
692        if '_result_hook' in self.__dict__:
693            return self._result_hook()
694
695        if self.executor is not None:
696            return (
697                self.executor.get_job_metadata(self.name)
698                .get('result', (False, "No result available."))
699            )
700
701        _result = self.daemon.properties.get('result', None)
702        if _result is None:
703            return False, "No result available."
704
705        return tuple(_result)

Return the SuccessTuple when the job has terminated.

sysargs: List[str]
707    @property
708    def sysargs(self) -> List[str]:
709        """
710        Return the sysargs to use for the Daemon.
711        """
712        if self._sysargs:
713            return self._sysargs
714
715        if self.executor is not None:
716            return self.executor.get_job_metadata(self.name).get('sysargs', [])
717
718        target_args = self.daemon.target_args
719        if target_args is None:
720            return []
721        self._sysargs = target_args[0] if len(target_args) > 0 else []
722        return self._sysargs

Return the sysargs to use for the Daemon.

daemon: "'Daemon'"
    @property
    def daemon(self) -> 'Daemon':
        """
        Return the daemon which this job manages.

        Builds (and caches) a `Daemon` around `entry` with this job's sysargs.
        The cached daemon is only reused for local jobs with known sysargs.
        """
        from meerschaum.utils.daemon import Daemon
        # Reuse the cached daemon only for local jobs with resolved sysargs.
        if self._daemon is not None and self.executor is None and self._sysargs:
            return self._daemon

        # For remote jobs, fetch properties from the executor first;
        # local patches in `_properties_patch` override remote values.
        remote_properties = (
            {}
            if self.executor is None
            else self.executor.get_job_properties(self.name)
        )
        properties = {**remote_properties, **self._properties_patch}

        self._daemon = Daemon(
            target=entry,
            target_args=[self._sysargs],
            target_kw={},
            daemon_id=self.name,
            label=shlex.join(self._sysargs),
            properties=properties,
        )
        # Propagate pre-injected log / stdin handles onto the new daemon
        # (presumably set by an executor or test harness — confirm callers).
        if '_rotating_log' in self.__dict__:
            self._daemon._rotating_log = self._rotating_log

        if '_stdin_file' in self.__dict__:
            self._daemon._stdin_file = self._stdin_file
            self._daemon._blocking_stdin_file_path = self._stdin_file.blocking_file_path

        return self._daemon

Return the daemon which this job manages.

began: Optional[datetime.datetime]
757    @property
758    def began(self) -> Union[datetime, None]:
759        """
760        The datetime when the job began running.
761        """
762        if self.executor is not None:
763            began_str = self.executor.get_job_began(self.name)
764            if began_str is None:
765                return None
766            return (
767                datetime.fromisoformat(began_str)
768                .astimezone(timezone.utc)
769                .replace(tzinfo=None)
770            )
771
772        began_str = self.daemon.properties.get('process', {}).get('began', None)
773        if began_str is None:
774            return None
775
776        return datetime.fromisoformat(began_str)

The datetime when the job began running.

ended: Optional[datetime.datetime]
778    @property
779    def ended(self) -> Union[datetime, None]:
780        """
781        The datetime when the job stopped running.
782        """
783        if self.executor is not None:
784            ended_str = self.executor.get_job_ended(self.name)
785            if ended_str is None:
786                return None
787            return (
788                datetime.fromisoformat(ended_str)
789                .astimezone(timezone.utc)
790                .replace(tzinfo=None)
791            )
792
793        ended_str = self.daemon.properties.get('process', {}).get('ended', None)
794        if ended_str is None:
795            return None
796
797        return datetime.fromisoformat(ended_str)

The datetime when the job stopped running.

paused: Optional[datetime.datetime]
799    @property
800    def paused(self) -> Union[datetime, None]:
801        """
802        The datetime when the job was suspended while running.
803        """
804        if self.executor is not None:
805            paused_str = self.executor.get_job_paused(self.name)
806            if paused_str is None:
807                return None
808            return (
809                datetime.fromisoformat(paused_str)
810                .astimezone(timezone.utc)
811                .replace(tzinfo=None)
812            )
813
814        paused_str = self.daemon.properties.get('process', {}).get('paused', None)
815        if paused_str is None:
816            return None
817
818        return datetime.fromisoformat(paused_str)

The datetime when the job was suspended while running.

stop_time: Optional[datetime.datetime]
820    @property
821    def stop_time(self) -> Union[datetime, None]:
822        """
823        Return the timestamp when the job was manually stopped.
824        """
825        if self.executor is not None:
826            return self.executor.get_job_stop_time(self.name)
827
828        if not self.daemon.stop_path.exists():
829            return None
830
831        stop_data = self.daemon._read_stop_file()
832        if not stop_data:
833            return None
834
835        stop_time_str = stop_data.get('stop_time', None)
836        if not stop_time_str:
837            warn(f"Could not read stop time for {self}.")
838            return None
839
840        return datetime.fromisoformat(stop_time_str)

Return the timestamp when the job was manually stopped.

hidden: bool
842    @property
843    def hidden(self) -> bool:
844        """
845        Return a bool indicating whether this job should be displayed.
846        """
847        return (
848            self.name.startswith('_')
849            or self.name.startswith('.')
850            or self._is_externally_managed
851        )

Return a bool indicating whether this job should be displayed.

def check_restart(self) -> Tuple[bool, str]:
853    def check_restart(self) -> SuccessTuple:
854        """
855        If `restart` is `True` and the daemon is not running,
856        restart the job.
857        Do not restart if the job was manually stopped.
858        """
859        if self.is_running():
860            return True, f"{self} is running."
861
862        if not self.restart:
863            return True, f"{self} does not need to be restarted."
864
865        if self.stop_time is not None:
866            return True, f"{self} was manually stopped."
867
868        return self.start()

If restart is True and the daemon is not running, restart the job. Do not restart if the job was manually stopped.

label: str
870    @property
871    def label(self) -> str:
872        """
873        Return the job's Daemon label (joined sysargs).
874        """
875        from meerschaum._internal.arguments import compress_pipeline_sysargs
876        sysargs = compress_pipeline_sysargs(self.sysargs)
877        return shlex.join(sysargs).replace(' + ', '\n+ ')

Return the job's Daemon label (joined sysargs).

env: Dict[str, str]
906    @property
907    def env(self) -> Dict[str, str]:
908        """
909        Return the environment variables to set for the job's process.
910        """
911        if '_env' in self.__dict__:
912            return self.__dict__['_env']
913
914        _env = self.daemon.properties.get('env', {})
915        default_env = {
916            'PYTHONUNBUFFERED': '1',
917            'LINES': str(get_config('jobs', 'terminal', 'lines')),
918            'COLUMNS': str(get_config('jobs', 'terminal', 'columns')),
919        }
920        self._env = {**default_env, **_env}
921        return self._env

Return the environment variables to set for the job's process.

delete_after_completion: bool
923    @property
924    def delete_after_completion(self) -> bool:
925        """
926        Return whether this job is configured to delete itself after completion.
927        """
928        if '_delete_after_completion' in self.__dict__:
929            return self.__dict__.get('_delete_after_completion', False)
930
931        self._delete_after_completion = self.daemon.properties.get('delete_after_completion', False)
932        return self._delete_after_completion

Return whether this job is configured to delete itself after completion.

class StopMonitoringLogs(builtins.Exception):
class StopMonitoringLogs(Exception):
    """
    Raise from within a logs callback to stop the logs monitoring.
    """

Raise this exception to stop the logs monitoring.

def get_jobs( executor_keys: Optional[str] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
 39def get_jobs(
 40    executor_keys: Optional[str] = None,
 41    include_hidden: bool = False,
 42    combine_local_and_systemd: bool = True,
 43    debug: bool = False,
 44) -> Dict[str, Job]:
 45    """
 46    Return a dictionary of the existing jobs.
 47
 48    Parameters
 49    ----------
 50    executor_keys: Optional[str], default None
 51        If provided, return remote jobs on the given API instance.
 52        Otherwise return local jobs.
 53
 54    include_hidden: bool, default False
 55        If `True`, include jobs with the `hidden` attribute.
 56
 57    Returns
 58    -------
 59    A dictionary mapping job names to jobs.
 60    """
 61    from meerschaum.connectors.parse import parse_executor_keys
 62    executor_keys = executor_keys or get_executor_keys_from_context()
 63
 64    include_local_and_system = (
 65        combine_local_and_systemd
 66        and str(executor_keys).split(':', maxsplit=1)[0] in ('None', 'local', 'systemd')
 67        and get_executor_keys_from_context() == 'systemd'
 68    )
 69
 70    def _get_local_jobs():
 71        from meerschaum.utils.daemon import get_daemons
 72        daemons = get_daemons()
 73        jobs = {
 74            daemon.daemon_id: Job(name=daemon.daemon_id, executor_keys='local')
 75            for daemon in daemons
 76        }
 77        return {
 78            name: job
 79            for name, job in jobs.items()
 80            if (include_hidden or not job.hidden) and not job._is_externally_managed
 81
 82        }
 83
 84    def _get_systemd_jobs():
 85        conn = mrsm.get_connector('systemd')
 86        jobs = conn.get_jobs(debug=debug)
 87        return {
 88            name: job
 89            for name, job in jobs.items()
 90            if include_hidden or not job.hidden
 91        }
 92
 93    if include_local_and_system:
 94        local_jobs = _get_local_jobs()
 95        systemd_jobs = _get_systemd_jobs()
 96        shared_jobs = set(local_jobs) & set(systemd_jobs)
 97        if shared_jobs:
 98            from meerschaum.utils.misc import items_str
 99            from meerschaum.utils.warnings import warn
100            warn(
101                "Job"
102                + ('s' if len(shared_jobs) != 1 else '')
103                + f" {items_str(list(shared_jobs))} "
104                + "exist"
105                + ('s' if len(shared_jobs) == 1 else '')
106                + " in both `local` and `systemd`.",
107                stack=False,
108            )
109        return {**local_jobs, **systemd_jobs}
110
111    if executor_keys == 'local':
112        return _get_local_jobs()
113
114    if executor_keys == 'systemd':
115        return _get_systemd_jobs()
116
117    try:
118        _ = parse_executor_keys(executor_keys, construct=False)
119        conn = parse_executor_keys(executor_keys)
120        jobs = conn.get_jobs(debug=debug)
121        return {
122            name: job
123            for name, job in jobs.items()
124            if include_hidden or not job.hidden
125        }
126    except Exception:
127        return {}

Return a dictionary of the existing jobs.

Parameters
  • executor_keys (Optional[str], default None): If provided, return remote jobs on the given API instance. Otherwise return local jobs.
  • include_hidden (bool, default False): If True, include jobs with the hidden attribute.
Returns
  • A dictionary mapping job names to jobs.
def get_filtered_jobs( executor_keys: Optional[str] = None, filter_list: Optional[List[str]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, warn: bool = False, debug: bool = False) -> Dict[str, Job]:
def get_filtered_jobs(
    executor_keys: Optional[str] = None,
    filter_list: Optional[List[str]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    warn: bool = False,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a list of jobs filtered by the user.
    """
    from meerschaum.utils.warnings import warn as _warn
    jobs = get_jobs(
        executor_keys,
        include_hidden=True,
        combine_local_and_systemd=combine_local_and_systemd,
        debug=debug,
    )

    # No filter: apply only the hidden-jobs policy.
    if not filter_list:
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    filtered_jobs = {}
    for name in filter_list:
        if name not in jobs:
            if warn:
                _warn(
                    f"Job '{name}' does not exist.",
                    stack=False,
                )
            continue
        filtered_jobs[name] = jobs[name]

    return filtered_jobs

Return a list of jobs filtered by the user.

def get_restart_jobs( executor_keys: Optional[str] = None, jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
def get_restart_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return jobs which were created with `--restart` or `--loop`.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    restart_jobs = {}
    for name, job in jobs.items():
        if job.restart:
            restart_jobs[name] = job

    return restart_jobs

Return jobs which were created with --restart or --loop.

def get_running_jobs( executor_keys: Optional[str] = None, jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
def get_running_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of running jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    running_jobs = {}
    for name, job in jobs.items():
        if job.status == 'running':
            running_jobs[name] = job

    return running_jobs

Return a dictionary of running jobs.

def get_stopped_jobs( executor_keys: Optional[str] = None, jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
def get_stopped_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of stopped jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    stopped_names = [
        name
        for name, job in jobs.items()
        if job.status == 'stopped'
    ]
    return {name: jobs[name] for name in stopped_names}

Return a dictionary of stopped jobs.

def get_paused_jobs( executor_keys: Optional[str] = None, jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
def get_paused_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of paused jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    def _is_paused(job) -> bool:
        return job.status == 'paused'

    return {name: job for name, job in jobs.items() if _is_paused(job)}

Return a dictionary of paused jobs.

def make_executor(cls):
def make_executor(cls):
    """
    Register a class as an `Executor`.
    """
    import re
    from meerschaum.connectors import make_connector
    # Derive the executor type by stripping a trailing 'executor'
    # from the lowercased class name (e.g. 'SystemdExecutor' -> 'systemd').
    executor_type = re.sub(r'executor$', '', cls.__name__.lower())
    if executor_type not in executor_types:
        executor_types.append(executor_type)
    return make_connector(cls, _is_executor=True)

Register a class as an Executor.

class Executor(meerschaum.connectors._Connector.Connector):
class Executor(Connector):
    """
    Define the methods for managing jobs.

    Concrete executors implement this interface to create, control,
    and inspect jobs.
    """

    @abstractmethod
    def get_job_exists(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job exists.
        """

    @abstractmethod
    def get_jobs(self) -> Dict[str, Job]:
        """
        Return a dictionary of names -> Jobs.
        """

    @abstractmethod
    def create_job(
        self,
        name: str,
        sysargs: List[str],
        properties: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ) -> SuccessTuple:
        """
        Create a new job.

        Returns a `SuccessTuple` indicating success.
        """

    @abstractmethod
    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Start a job.

        Returns a `SuccessTuple` indicating success.
        """

    @abstractmethod
    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Stop a job.

        Returns a `SuccessTuple` indicating success.
        """

    @abstractmethod
    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Pause a job.

        Returns a `SuccessTuple` indicating success.
        """

    @abstractmethod
    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Delete a job.

        Returns a `SuccessTuple` indicating success.
        """

    @abstractmethod
    def get_logs(self, name: str, debug: bool = False) -> str:
        """
        Return a job's log output.
        """

Define the methods for managing jobs.

@abstractmethod
def get_job_exists(self, name: str, debug: bool = False) -> bool:
24    @abstractmethod
25    def get_job_exists(self, name: str, debug: bool = False) -> bool:
26        """
27        Return whether a job exists.
28        """

Return whether a job exists.

@abstractmethod
def get_jobs(self) -> Dict[str, Job]:
30    @abstractmethod
31    def get_jobs(self) -> Dict[str, Job]:
32        """
33        Return a dictionary of names -> Jobs.
34        """

Return a dictionary of names -> Jobs.

@abstractmethod
def create_job( self, name: str, sysargs: List[str], properties: Optional[Dict[str, Any]] = None, debug: bool = False) -> Tuple[bool, str]:
36    @abstractmethod
37    def create_job(
38        self,
39        name: str,
40        sysargs: List[str],
41        properties: Optional[Dict[str, Any]] = None,
42        debug: bool = False,
43    ) -> SuccessTuple:
44        """
45        Create a new job.
46        """

Create a new job.

@abstractmethod
def start_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
48    @abstractmethod
49    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
50        """
51        Start a job.
52        """

Start a job.

@abstractmethod
def stop_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
54    @abstractmethod
55    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
56        """
57        Stop a job.
58        """

Stop a job.

@abstractmethod
def pause_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
60    @abstractmethod
61    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
62        """
63        Pause a job.
64        """

Pause a job.

@abstractmethod
def delete_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
66    @abstractmethod
67    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
68        """
69        Delete a job.
70        """

Delete a job.

@abstractmethod
def get_logs(self, name: str, debug: bool = False) -> str:
72    @abstractmethod
73    def get_logs(self, name: str, debug: bool = False) -> str:
74        """
75        Return a job's log output.
76        """

Return a job's log output.

def check_restart_jobs(
    executor_keys: Optional[str] = 'local',
    jobs: Optional[Dict[str, 'Job']] = None,
    include_hidden: bool = True,
    silent: bool = False,
    debug: bool = False,
) -> 'SuccessTuple':
    """
    Restart any stopped jobs which were created with `--restart`.

    Parameters
    ----------
    executor_keys: Optional[str], default 'local'
        If provided, check jobs on the given remote API instance.
        Otherwise check local jobs.

    jobs: Optional[Dict[str, Job]], default None
        If provided, check these jobs instead of fetching them
        from the executor.

    include_hidden: bool, default True
        If `True`, include hidden jobs in the check.

    silent: bool, default False
        If `True`, do not print the restart success message.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating whether all of the restart checks succeeded.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=False,
            debug=debug,
        )

    if not jobs:
        return True, "No jobs to restart."

    ### Deferred until needed so the empty-jobs early return stays cheap.
    from meerschaum.utils.misc import items_str

    results = {}
    for name, job in jobs.items():
        check_success, check_msg = job.check_restart()
        results[job.name] = (check_success, check_msg)
        if not silent:
            mrsm.pprint((check_success, check_msg))

    success_names = [name for name, (check_success, check_msg) in results.items() if check_success]
    fail_names = [name for name, (check_success, check_msg) in results.items() if not check_success]
    success = len(success_names) == len(jobs)
    msg = (
        (
            "Successfully restarted job"
            + ('s' if len(success_names) != 1 else '')
            + ' ' + items_str(success_names) + '.'
        )
        if success
        else (
            ### NOTE: Pluralize on the failed jobs, which are the ones listed.
            "Failed to restart job"
            + ('s' if len(fail_names) != 1 else '')
            + ' ' + items_str(fail_names) + '.'
        )
    )
    return success, msg

Restart any stopped jobs which were created with --restart.

Parameters
  • executor_keys (Optional[str], default 'local'): If provided, check jobs on the given remote API instance. Otherwise check local jobs.
  • include_hidden (bool, default True): If True, include hidden jobs in the check.
  • silent (bool, default False): If True, do not print the restart success message.
def start_check_jobs_thread():
    """
    Start a thread to regularly monitor jobs.

    Returns
    -------
    The started `RepeatTimer` thread (also stored in the module-level
    `_check_loop_stop_thread`).
    """
    import atexit
    from functools import partial
    from meerschaum.utils.threading import RepeatTimer
    from meerschaum.config.static import STATIC_CONFIG

    global _check_loop_stop_thread

    ### How often (in seconds) to run the restart check.
    interval_seconds = STATIC_CONFIG['jobs']['check_restart_seconds']
    monitor_callback = partial(_check_restart_jobs_against_lock, silent=True)

    timer = RepeatTimer(interval_seconds, monitor_callback)
    timer.daemon = True

    ### Ensure the timer is cancelled (and the lock file cleaned up) on exit.
    atexit.register(stop_check_jobs_thread)

    _check_loop_stop_thread = timer
    _check_loop_stop_thread.start()
    return _check_loop_stop_thread

Start a thread to regularly monitor jobs.

def stop_check_jobs_thread():
    """
    Stop the job monitoring thread.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.config.paths import CHECK_JOBS_LOCK_PATH

    ### Nothing to do if the monitoring thread was never started.
    if _check_loop_stop_thread is None:
        return

    _check_loop_stop_thread.cancel()

    ### Best-effort cleanup of the lock file; warn rather than raise on failure.
    try:
        if CHECK_JOBS_LOCK_PATH.exists():
            CHECK_JOBS_LOCK_PATH.unlink()
    except Exception as e:
        warn(f"Failed to remove check jobs lock file:\n{e}")

Stop the job monitoring thread.