meerschaum.jobs

Higher-level utilities for managing meerschaum.utils.daemon.Daemon.

  1#! /usr/bin/env python3
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Higher-level utilities for managing `meerschaum.utils.daemon.Daemon`.
  7"""
  8
  9import pathlib
 10
 11import meerschaum as mrsm
 12from meerschaum.utils.typing import Dict, Optional, List, SuccessTuple
 13
 14from meerschaum.jobs._Job import Job
 15from meerschaum.jobs._Executor import Executor
 16
 17__all__ = (
 18    'Job',
 19    'systemd',
 20    'get_jobs',
 21    'get_filtered_jobs',
 22    'get_restart_jobs',
 23    'get_running_jobs',
 24    'get_stopped_jobs',
 25    'get_paused_jobs',
 26    'get_restart_jobs',
 27    'make_executor',
 28    'Executor',
 29    'check_restart_jobs',
 30    'start_check_jobs_thread',
 31    'stop_check_jobs_thread',
 32)
 33
 34executor_types: List[str] = ['api', 'local']
 35
 36
def get_jobs(
    executor_keys: Optional[str] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of the existing jobs.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, return remote jobs on the given API instance.
        Otherwise return local jobs.

    include_hidden: bool, default False
        If `True`, include jobs with the `hidden` attribute.

    combine_local_and_systemd: bool, default True
        If `True` and this context defaults to `'systemd'`, merge jobs from
        both the `local` and `systemd` executors into one dictionary.

    debug: bool, default False
        Verbosity toggle passed through to the executor connectors.

    Returns
    -------
    A dictionary mapping job names to jobs.
    """
    from meerschaum.connectors.parse import parse_executor_keys
    executor_keys = executor_keys or get_executor_keys_from_context()

    ### Only merge when the requested keys are the default local-ish ones
    ### and this host would itself default to systemd.
    include_local_and_system = (
        combine_local_and_systemd
        and str(executor_keys).split(':', maxsplit=1)[0] in ('None', 'local', 'systemd')
        and get_executor_keys_from_context() == 'systemd'
    )

    def _get_local_jobs():
        """Return daemon-backed local jobs, filtering hidden and externally managed ones."""
        from meerschaum.utils.daemon import get_daemons
        daemons = get_daemons()
        jobs = {
            daemon.daemon_id: Job(name=daemon.daemon_id, executor_keys='local')
            for daemon in daemons
        }
        return {
            name: job
            for name, job in jobs.items()
            if (include_hidden or not job.hidden) and not job._is_externally_managed

        }

    def _get_systemd_jobs():
        """Return jobs managed by the systemd connector."""
        conn = mrsm.get_connector('systemd')
        jobs = conn.get_jobs(debug=debug)
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    if include_local_and_system:
        local_jobs = _get_local_jobs()
        systemd_jobs = _get_systemd_jobs()
        ### Warn on name collisions; the dict merge below lets systemd win.
        shared_jobs = set(local_jobs) & set(systemd_jobs)
        if shared_jobs:
            from meerschaum.utils.misc import items_str
            from meerschaum.utils.warnings import warn
            warn(
                "Job"
                + ('s' if len(shared_jobs) != 1 else '')
                + f" {items_str(list(shared_jobs))} "
                + "exist"
                + ('s' if len(shared_jobs) == 1 else '')
                + " in both `local` and `systemd`.",
                stack=False,
            )
        return {**local_jobs, **systemd_jobs}

    if executor_keys == 'local':
        return _get_local_jobs()

    if executor_keys == 'systemd':
        return _get_systemd_jobs()

    ### Anything else is treated as remote API executor keys.
    try:
        _ = parse_executor_keys(executor_keys, construct=False)
        conn = parse_executor_keys(executor_keys)
        jobs = conn.get_jobs(debug=debug)
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }
    except Exception:
        ### Best-effort: unparseable or unreachable executors yield no jobs.
        return {}
126
127
def get_filtered_jobs(
    executor_keys: Optional[str] = None,
    filter_list: Optional[List[str]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    warn: bool = False,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a list of jobs filtered by the user.
    """
    from meerschaum.utils.warnings import warn as _warn
    jobs = get_jobs(
        executor_keys,
        include_hidden=True,
        combine_local_and_systemd=combine_local_and_systemd,
        debug=debug,
    )

    ### Without an explicit filter, only apply the hidden-jobs policy.
    if not filter_list:
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    matching_jobs = {}
    for job_name in filter_list:
        matching_job = jobs.get(job_name, None)
        if matching_job is not None:
            matching_jobs[job_name] = matching_job
        elif warn:
            _warn(
                f"Job '{job_name}' does not exist.",
                stack=False,
            )

    return matching_jobs
166
167
def get_restart_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return jobs which were created with `--restart` or `--loop`.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    ### Keep only the jobs flagged as restartable.
    restart_jobs = {}
    for name, job in jobs.items():
        if job.restart:
            restart_jobs[name] = job
    return restart_jobs
191
192
def get_running_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of running jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    ### Keep only jobs currently reporting a 'running' status.
    running_jobs = {}
    for name, job in jobs.items():
        if job.status == 'running':
            running_jobs[name] = job
    return running_jobs
216
217
def get_paused_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of paused jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    ### Keep only jobs currently reporting a 'paused' status.
    paused_jobs = {}
    for name, job in jobs.items():
        if job.status == 'paused':
            paused_jobs[name] = job
    return paused_jobs
241
242
def get_stopped_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of stopped jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    ### Keep only jobs currently reporting a 'stopped' status.
    stopped_jobs = {}
    for name, job in jobs.items():
        if job.status == 'stopped':
            stopped_jobs[name] = job
    return stopped_jobs
266
267
def make_executor(cls):
    """
    Register a class as an `Executor`.

    Strip the 'executor' suffix from the class name to derive the executor
    type, record it in `executor_types`, and register the class as a connector.
    """
    import re
    from meerschaum.connectors import make_connector
    executor_type = re.sub(r'executor$', '', cls.__name__.lower())
    if executor_type not in executor_types:
        executor_types.append(executor_type)
    return make_connector(cls, _is_executor=True)
279
280
def check_restart_jobs(
    executor_keys: Optional[str] = 'local',
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = True,
    silent: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Restart any stopped jobs which were created with `--restart`.

    Parameters
    ----------
    executor_keys: Optional[str], default 'local'
        If provided, check jobs on the given remote API instance.
        Otherwise check local jobs.

    jobs: Optional[Dict[str, Job]], default None
        If provided, check these jobs rather than querying the executor.

    include_hidden: bool, default True
        If `True`, include hidden jobs in the check.

    silent: bool, default False
        If `True`, do not print the restart success message.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` which succeeds only if every job's check succeeded.
    """
    from meerschaum.utils.misc import items_str

    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=False,
            debug=debug,
        )

    if not jobs:
        return True, "No jobs to restart."

    results = {}
    for name, job in jobs.items():
        check_success, check_msg = job.check_restart()
        results[job.name] = (check_success, check_msg)
        if not silent:
            mrsm.pprint((check_success, check_msg))

    success_names = [name for name, (check_success, _) in results.items() if check_success]
    fail_names = [name for name, (check_success, _) in results.items() if not check_success]
    success = len(success_names) == len(jobs)
    msg = (
        (
            "Successfully restarted job"
            + ('s' if len(success_names) != 1 else '')
            + ' ' + items_str(success_names) + '.'
        )
        if success
        else (
            "Failed to restart job"
            ### BUGFIX: pluralize by the number of *failed* jobs
            ### (previously keyed off `success_names`).
            + ('s' if len(fail_names) != 1 else '')
            + ' ' + items_str(fail_names) + '.'
        )
    )
    return success, msg
340
341
def _check_restart_jobs_against_lock(*args, **kwargs):
    """
    Run `check_restart_jobs()` while holding an inter-process lock,
    so only one process performs the check at a time.
    """
    from meerschaum.config.paths import CHECK_JOBS_LOCK_PATH
    fasteners = mrsm.attempt_import('fasteners')
    check_jobs_lock = fasteners.InterProcessLock(CHECK_JOBS_LOCK_PATH)
    with check_jobs_lock:
        check_restart_jobs(*args, **kwargs)
348
349
### Module-level handle to the repeating check-jobs timer (None until started).
_check_loop_stop_thread = None
def start_check_jobs_thread():
    """
    Start a thread to regularly monitor jobs.

    The timer repeatedly runs `_check_restart_jobs_against_lock(silent=True)`
    and is cancelled automatically at interpreter exit.
    """
    import atexit
    from functools import partial
    from meerschaum.utils.threading import RepeatTimer
    from meerschaum.config.static import STATIC_CONFIG

    global _check_loop_stop_thread
    sleep_seconds = STATIC_CONFIG['jobs']['check_restart_seconds']

    check_timer = RepeatTimer(
        sleep_seconds,
        partial(
            _check_restart_jobs_against_lock,
            silent=True,
        )
    )
    check_timer.daemon = True
    atexit.register(stop_check_jobs_thread)

    _check_loop_stop_thread = check_timer
    check_timer.start()
    return check_timer
375
376
def stop_check_jobs_thread():
    """
    Stop the job monitoring thread.

    Cancel the repeating check-jobs timer (if one was started) and remove
    the inter-process lock file the checks leave behind.
    """
    from meerschaum.config.paths import CHECK_JOBS_LOCK_PATH
    from meerschaum.utils.warnings import warn
    if _check_loop_stop_thread is None:
        return

    _check_loop_stop_thread.cancel()

    ### Unlink directly and treat a missing file as success, avoiding the
    ### race between a prior `exists()` check and the `unlink()` call.
    try:
        CHECK_JOBS_LOCK_PATH.unlink()
    except FileNotFoundError:
        pass
    except Exception as e:
        warn(f"Failed to remove check jobs lock file:\n{e}")
393
394
### Cache for the context-derived executor keys (None until first computed).
_context_keys = None
def get_executor_keys_from_context() -> str:
    """
    If we are running on the host with the default root, default to `'systemd'`.
    Otherwise return `'local'`.

    Returns
    -------
    The executor keys string: a configured value, `'systemd'`, or `'local'`.
    """
    global _context_keys

    ### Return the cached context determination when available.
    if _context_keys is not None:
        return _context_keys

    from meerschaum.config import get_config
    from meerschaum.config.paths import ROOT_DIR_PATH, DEFAULT_ROOT_DIR_PATH
    from meerschaum.utils.misc import is_systemd_available

    ### An explicitly configured executor always wins.
    ### NOTE(review): this value is not stored in `_context_keys`, so the
    ### config is re-read on every call — confirm this is intentional.
    configured_executor = get_config('meerschaum', 'executor', warn=False)
    if configured_executor is not None:
        return configured_executor

    _context_keys = (
        'systemd'
        if is_systemd_available() and ROOT_DIR_PATH == DEFAULT_ROOT_DIR_PATH
        else 'local'
    )
    return _context_keys
420
421
def _install_healthcheck_job() -> SuccessTuple:
    """
    Install the systemd job which checks local jobs.

    The healthcheck job loops `restart jobs -e local` every 60 seconds.
    """
    from meerschaum.config import get_config

    if not get_config('system', 'experimental', 'systemd_healthcheck'):
        return False, "Local healthcheck is disabled."

    if get_executor_keys_from_context() != 'systemd':
        return False, "Not running systemd."

    healthcheck_job = Job(
        '.local-healthcheck',
        ['restart', 'jobs', '-e', 'local', '--loop', '--min-seconds', '60'],
        executor_keys='systemd',
    )
    return healthcheck_job.start()
class Job:
 50class Job:
 51    """
 52    Manage a `meerschaum.utils.daemon.Daemon`, locally or remotely via the API.
 53    """
 54
    def __init__(
        self,
        name: str,
        sysargs: Union[List[str], str, None] = None,
        env: Optional[Dict[str, str]] = None,
        executor_keys: Optional[str] = None,
        delete_after_completion: bool = False,
        _properties: Optional[Dict[str, Any]] = None,
        _rotating_log=None,
        _stdin_file=None,
        _status_hook: Optional[Callable[[], str]] = None,
        _result_hook: Optional[Callable[[], SuccessTuple]] = None,
        _externally_managed: bool = False,
    ):
        """
        Create a new job to manage a `meerschaum.utils.daemon.Daemon`.

        Parameters
        ----------
        name: str
            The name of the job to be created.
            This will also be used as the Daemon ID.

        sysargs: Union[List[str], str, None], default None
            The sysargs of the command to be executed, e.g. 'start api'.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the job's process.

        executor_keys: Optional[str], default None
            If provided, execute the job remotely on an API instance, e.g. 'api:main'.

        delete_after_completion: bool, default False
            If `True`, delete this job when it has finished executing.

        _properties: Optional[Dict[str, Any]], default None
            If provided, use this to patch the daemon's properties.

        The remaining underscore-prefixed parameters are internal dependency
        injection hooks (log handle, stdin file, status/result callbacks,
        external-management flag) and are not part of the public interface.
        """
        from meerschaum.utils.daemon import Daemon
        ### Job names double as daemon IDs, so reject characters
        ### which would be unsafe in paths or service names.
        for char in BANNED_CHARS:
            if char in name:
                raise ValueError(f"Invalid name: ({char}) is not allowed.")

        ### Accept a single command string and split it like a shell would.
        if isinstance(sysargs, str):
            sysargs = shlex.split(sysargs)

        ### Unescape the "and" separator used to chain actions.
        and_key = STATIC_CONFIG['system']['arguments']['and_key']
        escaped_and_key = STATIC_CONFIG['system']['arguments']['escaped_and_key']
        if sysargs:
            sysargs = [
                (arg if arg != escaped_and_key else and_key)
                for arg in sysargs
            ]

        ### NOTE: 'local' and 'systemd' executors are being coalesced.
        if executor_keys is None:
            from meerschaum.jobs import get_executor_keys_from_context
            executor_keys = get_executor_keys_from_context()

        self.executor_keys = executor_keys
        self.name = name
        ### Only local jobs are backed by an in-process Daemon;
        ### a failed Daemon load degrades to None rather than raising.
        try:
            self._daemon = (
                Daemon(daemon_id=name)
                if executor_keys == 'local'
                else None
            )
        except Exception:
            self._daemon = None

        ### Handle any injected dependencies.
        if _rotating_log is not None:
            self._rotating_log = _rotating_log
            if self._daemon is not None:
                self._daemon._rotating_log = _rotating_log

        if _stdin_file is not None:
            self._stdin_file = _stdin_file
            if self._daemon is not None:
                self._daemon._stdin_file = _stdin_file
                self._daemon._blocking_stdin_file_path = _stdin_file.blocking_file_path

        if _status_hook is not None:
            self._status_hook = _status_hook

        if _result_hook is not None:
            self._result_hook = _result_hook

        self._externally_managed = _externally_managed
        self._properties_patch = _properties or {}
        if _externally_managed:
            self._properties_patch.update({'externally_managed': _externally_managed})

        if env:
            self._properties_patch.update({'env': env})

        if delete_after_completion:
            self._properties_patch.update({'delete_after_completion': delete_after_completion})

        ### Prefer the sysargs persisted in the daemon's properties
        ### over the ones passed to this constructor.
        daemon_sysargs = (
            self._daemon.properties.get('target', {}).get('args', [None])[0]
            if self._daemon is not None
            else None
        )

        if daemon_sysargs and sysargs and daemon_sysargs != sysargs:
            warn("Given sysargs differ from existing sysargs.")

        ### Strip daemonization flags; the Job layer handles daemonizing.
        self._sysargs = [
            arg
            for arg in (daemon_sysargs or sysargs or [])
            if arg not in ('-d', '--daemon')
        ]
        ### Flag the job as restartable if any restart flag is present.
        for restart_flag in RESTART_FLAGS:
            if restart_flag in self._sysargs:
                self._properties_patch.update({'restart': True})
                break
172
    @staticmethod
    def from_pid(pid: int, executor_keys: Optional[str] = None) -> Job:
        """
        Build a `Job` from the PID of a running Meerschaum process.

        Parameters
        ----------
        pid: int
            The PID of the process.

        executor_keys: Optional[str], default None
            The executor keys to assign to the job.

        Raises
        ------
        `psutil.NoSuchProcess` if no process with `pid` exists, and
        `EnvironmentError` if the daemon's PID file is missing or
        disagrees with `pid`.
        """
        from meerschaum.config.paths import DAEMON_RESOURCES_PATH

        psutil = mrsm.attempt_import('psutil')
        try:
            process = psutil.Process(pid)
        except psutil.NoSuchProcess as e:
            warn(f"Process with PID {pid} does not exist.", stack=False)
            raise e

        ### Daemonized jobs are launched as `python -c '...'`.
        ### NOTE(review): assumes `cmdline()` has at least two entries — confirm.
        command_args = process.cmdline()
        is_daemon = command_args[1] == '-c'

        if is_daemon:
            ### Extract the daemon ID from the `-c` payload,
            ### e.g. "...daemon_id='foo')..." -> "foo".
            daemon_id = command_args[-1].split('daemon_id=')[-1].split(')')[0].replace("'", '')
            root_dir = process.environ().get(STATIC_CONFIG['environment']['root'], None)
            if root_dir is None:
                from meerschaum.config.paths import ROOT_DIR_PATH
                root_dir = ROOT_DIR_PATH
            ### NOTE(review): when read from the environment, `root_dir` is a
            ### string, yet it is used with pathlib's `/` below — confirm.
            jobs_dir = root_dir / DAEMON_RESOURCES_PATH.name
            daemon_dir = jobs_dir / daemon_id
            pid_file = daemon_dir / 'process.pid'

            ### Sanity-check the given PID against the daemon's PID file.
            if pid_file.exists():
                with open(pid_file, 'r', encoding='utf-8') as f:
                    daemon_pid = int(f.read())

                if pid != daemon_pid:
                    raise EnvironmentError(f"Differing PIDs: {pid=}, {daemon_pid=}")
            else:
                raise EnvironmentError(f"Is job '{daemon_id}' running?")

            return Job(daemon_id, executor_keys=executor_keys)

        from meerschaum._internal.arguments._parse_arguments import parse_arguments
        from meerschaum.utils.daemon import get_new_daemon_name

        ### Non-daemon process: find the `mrsm` entry point in the command
        ### line and parse everything after it as sysargs.
        mrsm_ix = 0
        for i, arg in enumerate(command_args):
            if 'mrsm' in arg or 'meerschaum' in arg.lower():
                mrsm_ix = i
                break

        sysargs = command_args[mrsm_ix+1:]
        kwargs = parse_arguments(sysargs)
        name = kwargs.get('name', get_new_daemon_name())
        return Job(name, sysargs, executor_keys=executor_keys)
232
233    def start(self, debug: bool = False) -> SuccessTuple:
234        """
235        Start the job's daemon.
236        """
237        if self.executor is not None:
238            if not self.exists(debug=debug):
239                return self.executor.create_job(
240                    self.name,
241                    self.sysargs,
242                    properties=self.daemon.properties,
243                    debug=debug,
244                )
245            return self.executor.start_job(self.name, debug=debug)
246
247        if self.is_running():
248            return True, f"{self} is already running."
249
250        success, msg = self.daemon.run(
251            keep_daemon_output=(not self.delete_after_completion),
252            allow_dirty_run=True,
253        )
254        if not success:
255            return success, msg
256
257        return success, f"Started {self}."
258
259    def stop(self, timeout_seconds: Optional[int] = None, debug: bool = False) -> SuccessTuple:
260        """
261        Stop the job's daemon.
262        """
263        if self.executor is not None:
264            return self.executor.stop_job(self.name, debug=debug)
265
266        if self.daemon.status == 'stopped':
267            if not self.restart:
268                return True, f"{self} is not running."
269            elif self.stop_time is not None:
270                return True, f"{self} will not restart until manually started."
271
272        quit_success, quit_msg = self.daemon.quit(timeout=timeout_seconds)
273        if quit_success:
274            return quit_success, f"Stopped {self}."
275
276        warn(
277            f"Failed to gracefully quit {self}.",
278            stack=False,
279        )
280        kill_success, kill_msg = self.daemon.kill(timeout=timeout_seconds)
281        if not kill_success:
282            return kill_success, kill_msg
283
284        return kill_success, f"Killed {self}."
285
286    def pause(self, timeout_seconds: Optional[int] = None, debug: bool = False) -> SuccessTuple:
287        """
288        Pause the job's daemon.
289        """
290        if self.executor is not None:
291            return self.executor.pause_job(self.name, debug=debug)
292
293        pause_success, pause_msg = self.daemon.pause(timeout=timeout_seconds)
294        if not pause_success:
295            return pause_success, pause_msg
296
297        return pause_success, f"Paused {self}."
298
299    def delete(self, debug: bool = False) -> SuccessTuple:
300        """
301        Delete the job and its daemon.
302        """
303        if self.executor is not None:
304            return self.executor.delete_job(self.name, debug=debug)
305
306        if self.is_running():
307            stop_success, stop_msg = self.stop()
308            if not stop_success:
309                return stop_success, stop_msg
310
311        cleanup_success, cleanup_msg = self.daemon.cleanup()
312        if not cleanup_success:
313            return cleanup_success, cleanup_msg
314
315        return cleanup_success, f"Deleted {self}."
316
317    def is_running(self) -> bool:
318        """
319        Determine whether the job's daemon is running.
320        """
321        return self.status == 'running'
322
323    def exists(self, debug: bool = False) -> bool:
324        """
325        Determine whether the job exists.
326        """
327        if self.executor is not None:
328            return self.executor.get_job_exists(self.name, debug=debug)
329
330        return self.daemon.path.exists()
331
332    def get_logs(self) -> Union[str, None]:
333        """
334        Return the output text of the job's daemon.
335        """
336        if self.executor is not None:
337            return self.executor.get_logs(self.name)
338
339        return self.daemon.log_text
340
    def monitor_logs(
        self,
        callback_function: Callable[[str], None] = partial(print, end=''),
        input_callback_function: Optional[Callable[[], str]] = None,
        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
        stop_event: Optional[asyncio.Event] = None,
        stop_on_exit: bool = False,
        strip_timestamps: bool = False,
        accept_input: bool = True,
        debug: bool = False,
    ):
        """
        Monitor the job's log files and execute a callback on new lines.

        Parameters
        ----------
        callback_function: Callable[[str], None], default partial(print, end='')
            The callback to execute as new data comes in.
            Defaults to printing the output directly to `stdout`.

        input_callback_function: Optional[Callable[[], str]], default None
            If provided, execute this callback when the daemon is blocking on stdin.
            Defaults to `sys.stdin.readline()`.

        stop_callback_function: Optional[Callable[[SuccessTuple], None]], default None
            If provided, execute this callback when the daemon stops.
            The job's SuccessTuple will be passed to the callback.

        stop_event: Optional[asyncio.Event], default None
            If provided, stop monitoring when this event is set.
            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
            from within `callback_function` to stop monitoring.
            NOTE(review): not forwarded to remote executors below — confirm intended.

        stop_on_exit: bool, default False
            If `True`, stop monitoring when the job stops.

        strip_timestamps: bool, default False
            If `True`, remove leading timestamps from lines.

        accept_input: bool, default True
            If `True`, accept input when the daemon blocks on stdin.

        debug: bool, default False
            Verbosity toggle.
        """
        def default_input_callback_function():
            ### Fall back to blocking reads from this process's stdin.
            return sys.stdin.readline()

        if input_callback_function is None:
            input_callback_function = default_input_callback_function

        ### Remote jobs delegate monitoring to the executor.
        if self.executor is not None:
            self.executor.monitor_logs(
                self.name,
                callback_function,
                input_callback_function=input_callback_function,
                stop_callback_function=stop_callback_function,
                stop_on_exit=stop_on_exit,
                accept_input=accept_input,
                strip_timestamps=strip_timestamps,
                debug=debug,
            )
            return

        ### Local jobs run the async implementation to completion.
        monitor_logs_coroutine = self.monitor_logs_async(
            callback_function=callback_function,
            input_callback_function=input_callback_function,
            stop_callback_function=stop_callback_function,
            stop_event=stop_event,
            stop_on_exit=stop_on_exit,
            strip_timestamps=strip_timestamps,
            accept_input=accept_input,
        )
        return asyncio.run(monitor_logs_coroutine)
412
413    async def monitor_logs_async(
414        self,
415        callback_function: Callable[[str], None] = partial(print, end='', flush=True),
416        input_callback_function: Optional[Callable[[], str]] = None,
417        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
418        stop_event: Optional[asyncio.Event] = None,
419        stop_on_exit: bool = False,
420        strip_timestamps: bool = False,
421        accept_input: bool = True,
422        _logs_path: Optional[pathlib.Path] = None,
423        _log=None,
424        _stdin_file=None,
425        debug: bool = False,
426    ):
427        """
428        Monitor the job's log files and await a callback on new lines.
429
430        Parameters
431        ----------
432        callback_function: Callable[[str], None], default partial(print, end='')
433            The callback to execute as new data comes in.
434            Defaults to printing the output directly to `stdout`.
435
436        input_callback_function: Optional[Callable[[], str]], default None
437            If provided, execute this callback when the daemon is blocking on stdin.
438            Defaults to `sys.stdin.readline()`.
439
440        stop_callback_function: Optional[Callable[[SuccessTuple]], str], default None
441            If provided, execute this callback when the daemon stops.
442            The job's SuccessTuple will be passed to the callback.
443
444        stop_event: Optional[asyncio.Event], default None
445            If provided, stop monitoring when this event is set.
446            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
447            from within `callback_function` to stop monitoring.
448
449        stop_on_exit: bool, default False
450            If `True`, stop monitoring when the job stops.
451
452        strip_timestamps: bool, default False
453            If `True`, remove leading timestamps from lines.
454
455        accept_input: bool, default True
456            If `True`, accept input when the daemon blocks on stdin.
457        """
458        def default_input_callback_function():
459            return sys.stdin.readline()
460
461        if input_callback_function is None:
462            input_callback_function = default_input_callback_function
463
464        if self.executor is not None:
465            await self.executor.monitor_logs_async(
466                self.name,
467                callback_function,
468                input_callback_function=input_callback_function,
469                stop_callback_function=stop_callback_function,
470                stop_on_exit=stop_on_exit,
471                strip_timestamps=strip_timestamps,
472                accept_input=accept_input,
473                debug=debug,
474            )
475            return
476
477        from meerschaum.utils.formatting._jobs import strip_timestamp_from_line
478
479        events = {
480            'user': stop_event,
481            'stopped': asyncio.Event(),
482        }
483        combined_event = asyncio.Event()
484        emitted_text = False
485        stdin_file = _stdin_file if _stdin_file is not None else self.daemon.stdin_file
486
487        async def check_job_status():
488            nonlocal emitted_text
489            stopped_event = events.get('stopped', None)
490            if stopped_event is None:
491                return
492
493            sleep_time = 0.1
494            while sleep_time < 60:
495                if self.status == 'stopped':
496                    if not emitted_text:
497                        await asyncio.sleep(sleep_time)
498                        sleep_time = round(sleep_time * 1.1, 2)
499                        continue
500
501                    if stop_callback_function is not None:
502                        try:
503                            if asyncio.iscoroutinefunction(stop_callback_function):
504                                await stop_callback_function(self.result)
505                            else:
506                                stop_callback_function(self.result)
507                        except asyncio.exceptions.CancelledError:
508                            break
509                        except Exception:
510                            warn(traceback.format_exc())
511
512                    if stop_on_exit:
513                        events['stopped'].set()
514
515                    break
516                await asyncio.sleep(0.1)
517
518        async def check_blocking_on_input():
519            while True:
520                if not emitted_text or not self.is_blocking_on_stdin():
521                    try:
522                        await asyncio.sleep(0.1)
523                    except asyncio.exceptions.CancelledError:
524                        break
525                    continue
526
527                if not self.is_running():
528                    break
529
530                await emit_latest_lines()
531
532                try:
533                    print('', end='', flush=True)
534                    if asyncio.iscoroutinefunction(input_callback_function):
535                        data = await input_callback_function()
536                    else:
537                        data = input_callback_function()
538                except KeyboardInterrupt:
539                    break
540                if not data.endswith('\n'):
541                    data += '\n'
542
543                stdin_file.write(data)
544                await asyncio.sleep(0.1)
545
546        async def combine_events():
547            event_tasks = [
548                asyncio.create_task(event.wait())
549                for event in events.values()
550                if event is not None
551            ]
552            if not event_tasks:
553                return
554
555            try:
556                done, pending = await asyncio.wait(
557                    event_tasks,
558                    return_when=asyncio.FIRST_COMPLETED,
559                )
560                for task in pending:
561                    task.cancel()
562            except asyncio.exceptions.CancelledError:
563                pass
564            finally:
565                combined_event.set()
566
567        check_job_status_task = asyncio.create_task(check_job_status())
568        check_blocking_on_input_task = asyncio.create_task(check_blocking_on_input())
569        combine_events_task = asyncio.create_task(combine_events())
570
571        log = _log if _log is not None else self.daemon.rotating_log
572        lines_to_show = get_config('jobs', 'logs', 'lines_to_show')
573
574        async def emit_latest_lines():
575            nonlocal emitted_text
576            lines = log.readlines()
577            for line in lines[(-1 * lines_to_show):]:
578                if stop_event is not None and stop_event.is_set():
579                    return
580
581                if strip_timestamps:
582                    line = strip_timestamp_from_line(line)
583
584                try:
585                    if asyncio.iscoroutinefunction(callback_function):
586                        await callback_function(line)
587                    else:
588                        callback_function(line)
589                    emitted_text = True
590                except StopMonitoringLogs:
591                    return
592                except Exception:
593                    warn(f"Error in logs callback:\n{traceback.format_exc()}")
594
595        await emit_latest_lines()
596
597        tasks = (
598            [check_job_status_task]
599            + ([check_blocking_on_input_task] if accept_input else [])
600            + [combine_events_task]
601        )
602        try:
603            _ = asyncio.gather(*tasks, return_exceptions=True)
604        except asyncio.exceptions.CancelledError:
605            raise
606        except Exception:
607            warn(f"Failed to run async checks:\n{traceback.format_exc()}")
608
609        watchfiles = mrsm.attempt_import('watchfiles')
610        async for changes in watchfiles.awatch(
611            _logs_path or LOGS_RESOURCES_PATH,
612            stop_event=combined_event,
613        ):
614            for change in changes:
615                file_path_str = change[1]
616                file_path = pathlib.Path(file_path_str)
617                latest_subfile_path = log.get_latest_subfile_path()
618                if latest_subfile_path != file_path:
619                    continue
620
621                await emit_latest_lines()
622
623        await emit_latest_lines()
624
625    def is_blocking_on_stdin(self, debug: bool = False) -> bool:
626        """
627        Return whether a job's daemon is blocking on stdin.
628        """
629        if self.executor is not None:
630            return self.executor.get_job_is_blocking_on_stdin(self.name, debug=debug)
631
632        return self.is_running() and self.daemon.blocking_stdin_file_path.exists()
633
634    def write_stdin(self, data):
635        """
636        Write to a job's daemon's `stdin`.
637        """
638        self.daemon.stdin_file.write(data)
639
640    @property
641    def executor(self) -> Union[Executor, None]:
642        """
643        If the job is remote, return the connector to the remote API instance.
644        """
645        return (
646            mrsm.get_connector(self.executor_keys)
647            if self.executor_keys != 'local'
648            else None
649        )
650
651    @property
652    def status(self) -> str:
653        """
654        Return the running status of the job's daemon.
655        """
656        if '_status_hook' in self.__dict__:
657            return self._status_hook()
658
659        if self.executor is not None:
660            return self.executor.get_job_status(self.name)
661
662        return self.daemon.status
663
664    @property
665    def pid(self) -> Union[int, None]:
666        """
667        Return the PID of the job's dameon.
668        """
669        if self.executor is not None:
670            return self.executor.get_job_metadata(self.name).get('daemon', {}).get('pid', None)
671
672        return self.daemon.pid
673
674    @property
675    def restart(self) -> bool:
676        """
677        Return whether to restart a stopped job.
678        """
679        if self.executor is not None:
680            return self.executor.get_job_metadata(self.name).get('restart', False)
681
682        return self.daemon.properties.get('restart', False)
683
684    @property
685    def result(self) -> SuccessTuple:
686        """
687        Return the `SuccessTuple` when the job has terminated.
688        """
689        if self.is_running():
690            return True, f"{self} is running."
691
692        if '_result_hook' in self.__dict__:
693            return self._result_hook()
694
695        if self.executor is not None:
696            return (
697                self.executor.get_job_metadata(self.name)
698                .get('result', (False, "No result available."))
699            )
700
701        _result = self.daemon.properties.get('result', None)
702        if _result is None:
703            return False, "No result available."
704
705        return tuple(_result)
706
707    @property
708    def sysargs(self) -> List[str]:
709        """
710        Return the sysargs to use for the Daemon.
711        """
712        if self._sysargs:
713            return self._sysargs
714
715        if self.executor is not None:
716            return self.executor.get_job_metadata(self.name).get('sysargs', [])
717
718        target_args = self.daemon.target_args
719        if target_args is None:
720            return []
721        self._sysargs = target_args[0] if len(target_args) > 0 else []
722        return self._sysargs
723
    @property
    def daemon(self) -> 'Daemon':
        """
        Return the daemon which this job manages.

        The cached daemon is reused only for local jobs whose sysargs are
        already known; otherwise a fresh `Daemon` is constructed, pulling
        properties from the remote executor when applicable.
        """
        from meerschaum.utils.daemon import Daemon
        # Reuse the cached daemon only for local jobs with known sysargs.
        if self._daemon is not None and self.executor is None and self._sysargs:
            return self._daemon

        # For remote jobs, fetch the remote properties and apply the local patch on top.
        remote_properties = (
            {}
            if self.executor is None
            else self.executor.get_job_properties(self.name)
        )
        properties = {**remote_properties, **self._properties_patch}

        self._daemon = Daemon(
            target=entry,
            target_args=[self._sysargs],
            target_kw={},
            daemon_id=self.name,
            label=shlex.join(self._sysargs),
            properties=properties,
        )
        # Re-attach dependencies injected via `__init__` (`_rotating_log`, `_stdin_file`).
        if '_rotating_log' in self.__dict__:
            self._daemon._rotating_log = self._rotating_log

        if '_stdin_file' in self.__dict__:
            self._daemon._stdin_file = self._stdin_file
            self._daemon._blocking_stdin_file_path = self._stdin_file.blocking_file_path

        return self._daemon
756
757    @property
758    def began(self) -> Union[datetime, None]:
759        """
760        The datetime when the job began running.
761        """
762        if self.executor is not None:
763            began_str = self.executor.get_job_began(self.name)
764            if began_str is None:
765                return None
766            return (
767                datetime.fromisoformat(began_str)
768                .astimezone(timezone.utc)
769                .replace(tzinfo=None)
770            )
771
772        began_str = self.daemon.properties.get('process', {}).get('began', None)
773        if began_str is None:
774            return None
775
776        return datetime.fromisoformat(began_str)
777
778    @property
779    def ended(self) -> Union[datetime, None]:
780        """
781        The datetime when the job stopped running.
782        """
783        if self.executor is not None:
784            ended_str = self.executor.get_job_ended(self.name)
785            if ended_str is None:
786                return None
787            return (
788                datetime.fromisoformat(ended_str)
789                .astimezone(timezone.utc)
790                .replace(tzinfo=None)
791            )
792
793        ended_str = self.daemon.properties.get('process', {}).get('ended', None)
794        if ended_str is None:
795            return None
796
797        return datetime.fromisoformat(ended_str)
798
799    @property
800    def paused(self) -> Union[datetime, None]:
801        """
802        The datetime when the job was suspended while running.
803        """
804        if self.executor is not None:
805            paused_str = self.executor.get_job_paused(self.name)
806            if paused_str is None:
807                return None
808            return (
809                datetime.fromisoformat(paused_str)
810                .astimezone(timezone.utc)
811                .replace(tzinfo=None)
812            )
813
814        paused_str = self.daemon.properties.get('process', {}).get('paused', None)
815        if paused_str is None:
816            return None
817
818        return datetime.fromisoformat(paused_str)
819
820    @property
821    def stop_time(self) -> Union[datetime, None]:
822        """
823        Return the timestamp when the job was manually stopped.
824        """
825        if self.executor is not None:
826            return self.executor.get_job_stop_time(self.name)
827
828        if not self.daemon.stop_path.exists():
829            return None
830
831        stop_data = self.daemon._read_stop_file()
832        if not stop_data:
833            return None
834
835        stop_time_str = stop_data.get('stop_time', None)
836        if not stop_time_str:
837            warn(f"Could not read stop time for {self}.")
838            return None
839
840        return datetime.fromisoformat(stop_time_str)
841
842    @property
843    def hidden(self) -> bool:
844        """
845        Return a bool indicating whether this job should be displayed.
846        """
847        return (
848            self.name.startswith('_')
849            or self.name.startswith('.')
850            or self._is_externally_managed
851        )
852
    def check_restart(self) -> SuccessTuple:
        """
        If `restart` is `True` and the daemon is not running,
        restart the job.
        Do not restart if the job was manually stopped.

        Returns
        -------
        A `SuccessTuple`: either a no-op explanation or the result of `start()`.
        """
        if self.is_running():
            return True, f"{self} is running."

        if not self.restart:
            return True, f"{self} does not need to be restarted."

        # A recorded stop time means the user stopped the job on purpose.
        if self.stop_time is not None:
            return True, f"{self} was manually stopped."

        return self.start()
869
870    @property
871    def label(self) -> str:
872        """
873        Return the job's Daemon label (joined sysargs).
874        """
875        from meerschaum._internal.arguments import compress_pipeline_sysargs
876        sysargs = compress_pipeline_sysargs(self.sysargs)
877        return shlex.join(sysargs).replace(' + ', '\n+ ')
878
879    @property
880    def _externally_managed_file(self) -> pathlib.Path:
881        """
882        Return the path to the externally managed file.
883        """
884        return self.daemon.path / '.externally-managed'
885
886    def _set_externally_managed(self):
887        """
888        Set this job as externally managed.
889        """
890        self._externally_managed = True
891        try:
892            self._externally_managed_file.parent.mkdir(exist_ok=True, parents=True)
893            self._externally_managed_file.touch()
894        except Exception as e:
895            warn(e)
896
897    @property
898    def _is_externally_managed(self) -> bool:
899        """
900        Return whether this job is externally managed.
901        """
902        return self.executor_keys in (None, 'local') and (
903            self._externally_managed or self._externally_managed_file.exists()
904        )
905
906    @property
907    def env(self) -> Dict[str, str]:
908        """
909        Return the environment variables to set for the job's process.
910        """
911        if '_env' in self.__dict__:
912            return self.__dict__['_env']
913
914        _env = self.daemon.properties.get('env', {})
915        default_env = {
916            'PYTHONUNBUFFERED': '1',
917            'LINES': str(get_config('jobs', 'terminal', 'lines')),
918            'COLUMNS': str(get_config('jobs', 'terminal', 'columns')),
919        }
920        self._env = {**default_env, **_env}
921        return self._env
922
923    @property
924    def delete_after_completion(self) -> bool:
925        """
926        Return whether this job is configured to delete itself after completion.
927        """
928        if '_delete_after_completion' in self.__dict__:
929            return self.__dict__.get('_delete_after_completion', False)
930
931        self._delete_after_completion = self.daemon.properties.get('delete_after_completion', False)
932        return self._delete_after_completion
933
934    def __str__(self) -> str:
935        sysargs = self.sysargs
936        sysargs_str = shlex.join(sysargs) if sysargs else ''
937        job_str = f'Job("{self.name}"'
938        if sysargs_str:
939            job_str += f', "{sysargs_str}"'
940
941        job_str += ')'
942        return job_str
943
944    def __repr__(self) -> str:
945        return str(self)
946
    def __hash__(self) -> int:
        # Jobs are identified by name, so hash on the name alone.
        return hash(self.name)

Manage a meerschaum.utils.daemon.Daemon, locally or remotely via the API.

Job( name: str, sysargs: Union[List[str], str, NoneType] = None, env: Optional[Dict[str, str]] = None, executor_keys: Optional[str] = None, delete_after_completion: bool = False, _properties: Optional[Dict[str, Any]] = None, _rotating_log=None, _stdin_file=None, _status_hook: Optional[Callable[[], str]] = None, _result_hook: Optional[Callable[[], Tuple[bool, str]]] = None, _externally_managed: bool = False)
    def __init__(
        self,
        name: str,
        sysargs: Union[List[str], str, None] = None,
        env: Optional[Dict[str, str]] = None,
        executor_keys: Optional[str] = None,
        delete_after_completion: bool = False,
        _properties: Optional[Dict[str, Any]] = None,
        _rotating_log=None,
        _stdin_file=None,
        _status_hook: Optional[Callable[[], str]] = None,
        _result_hook: Optional[Callable[[], SuccessTuple]] = None,
        _externally_managed: bool = False,
    ):
        """
        Create a new job to manage a `meerschaum.utils.daemon.Daemon`.

        Parameters
        ----------
        name: str
            The name of the job to be created.
            This will also be used as the Daemon ID.

        sysargs: Union[List[str], str, None], default None
            The sysargs of the command to be executed, e.g. 'start api'.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the job's process.

        executor_keys: Optional[str], default None
            If provided, execute the job remotely on an API instance, e.g. 'api:main'.

        delete_after_completion: bool, default False
            If `True`, delete this job when it has finished executing.

        _properties: Optional[Dict[str, Any]], default None
            If provided, use this to patch the daemon's properties.

        _rotating_log: default None
            If provided, attach this rotating log to the daemon (dependency injection).

        _stdin_file: default None
            If provided, attach this stdin file (and its blocking-file path) to the daemon.

        _status_hook: Optional[Callable[[], str]], default None
            If provided, call this instead of the daemon to determine the job's status.

        _result_hook: Optional[Callable[[], SuccessTuple]], default None
            If provided, call this instead of the daemon to determine the job's result.

        _externally_managed: bool, default False
            If `True`, mark this job as managed outside of Meerschaum.
        """
        from meerschaum.utils.daemon import Daemon
        # Validate the name before any filesystem interaction.
        for char in BANNED_CHARS:
            if char in name:
                raise ValueError(f"Invalid name: ({char}) is not allowed.")

        if isinstance(sysargs, str):
            sysargs = shlex.split(sysargs)

        # Unescape the pipeline 'and' separator in the given sysargs.
        and_key = STATIC_CONFIG['system']['arguments']['and_key']
        escaped_and_key = STATIC_CONFIG['system']['arguments']['escaped_and_key']
        if sysargs:
            sysargs = [
                (arg if arg != escaped_and_key else and_key)
                for arg in sysargs
            ]

        ### NOTE: 'local' and 'systemd' executors are being coalesced.
        if executor_keys is None:
            from meerschaum.jobs import get_executor_keys_from_context
            executor_keys = get_executor_keys_from_context()

        self.executor_keys = executor_keys
        self.name = name
        # Only local jobs get an eager daemon; failures leave it unset for lazy creation.
        try:
            self._daemon = (
                Daemon(daemon_id=name)
                if executor_keys == 'local'
                else None
            )
        except Exception:
            self._daemon = None

        ### Handle any injected dependencies.
        if _rotating_log is not None:
            self._rotating_log = _rotating_log
            if self._daemon is not None:
                self._daemon._rotating_log = _rotating_log

        if _stdin_file is not None:
            self._stdin_file = _stdin_file
            if self._daemon is not None:
                self._daemon._stdin_file = _stdin_file
                self._daemon._blocking_stdin_file_path = _stdin_file.blocking_file_path

        if _status_hook is not None:
            self._status_hook = _status_hook

        if _result_hook is not None:
            self._result_hook = _result_hook

        # Accumulate property overrides to apply when the daemon is (re)built.
        self._externally_managed = _externally_managed
        self._properties_patch = _properties or {}
        if _externally_managed:
            self._properties_patch.update({'externally_managed': _externally_managed})

        if env:
            self._properties_patch.update({'env': env})

        if delete_after_completion:
            self._properties_patch.update({'delete_after_completion': delete_after_completion})

        # Prefer the sysargs already recorded on an existing daemon.
        daemon_sysargs = (
            self._daemon.properties.get('target', {}).get('args', [None])[0]
            if self._daemon is not None
            else None
        )

        if daemon_sysargs and sysargs and daemon_sysargs != sysargs:
            warn("Given sysargs differ from existing sysargs.")

        # Strip daemonization flags; they are implied by running as a job.
        self._sysargs = [
            arg
            for arg in (daemon_sysargs or sysargs or [])
            if arg not in ('-d', '--daemon')
        ]
        for restart_flag in RESTART_FLAGS:
            if restart_flag in self._sysargs:
                self._properties_patch.update({'restart': True})
                break

Create a new job to manage a meerschaum.utils.daemon.Daemon.

Parameters
  • name (str): The name of the job to be created. This will also be used as the Daemon ID.
  • sysargs (Union[List[str], str, None], default None): The sysargs of the command to be executed, e.g. 'start api'.
  • env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the job's process.
  • executor_keys (Optional[str], default None): If provided, execute the job remotely on an API instance, e.g. 'api:main'.
  • delete_after_completion (bool, default False): If True, delete this job when it has finished executing.
  • _properties (Optional[Dict[str, Any]], default None): If provided, use this to patch the daemon's properties.
executor_keys
name
@staticmethod
def from_pid( pid: int, executor_keys: Optional[str] = None) -> Job:
173    @staticmethod
174    def from_pid(pid: int, executor_keys: Optional[str] = None) -> Job:
175        """
176        Build a `Job` from the PID of a running Meerschaum process.
177
178        Parameters
179        ----------
180        pid: int
181            The PID of the process.
182
183        executor_keys: Optional[str], default None
184            The executor keys to assign to the job.
185        """
186        from meerschaum.config.paths import DAEMON_RESOURCES_PATH
187
188        psutil = mrsm.attempt_import('psutil')
189        try:
190            process = psutil.Process(pid)
191        except psutil.NoSuchProcess as e:
192            warn(f"Process with PID {pid} does not exist.", stack=False)
193            raise e
194
195        command_args = process.cmdline()
196        is_daemon = command_args[1] == '-c'
197
198        if is_daemon:
199            daemon_id = command_args[-1].split('daemon_id=')[-1].split(')')[0].replace("'", '')
200            root_dir = process.environ().get(STATIC_CONFIG['environment']['root'], None)
201            if root_dir is None:
202                from meerschaum.config.paths import ROOT_DIR_PATH
203                root_dir = ROOT_DIR_PATH
204            jobs_dir = root_dir / DAEMON_RESOURCES_PATH.name
205            daemon_dir = jobs_dir / daemon_id
206            pid_file = daemon_dir / 'process.pid'
207
208            if pid_file.exists():
209                with open(pid_file, 'r', encoding='utf-8') as f:
210                    daemon_pid = int(f.read())
211
212                if pid != daemon_pid:
213                    raise EnvironmentError(f"Differing PIDs: {pid=}, {daemon_pid=}")
214            else:
215                raise EnvironmentError(f"Is job '{daemon_id}' running?")
216
217            return Job(daemon_id, executor_keys=executor_keys)
218
219        from meerschaum._internal.arguments._parse_arguments import parse_arguments
220        from meerschaum.utils.daemon import get_new_daemon_name
221
222        mrsm_ix = 0
223        for i, arg in enumerate(command_args):
224            if 'mrsm' in arg or 'meerschaum' in arg.lower():
225                mrsm_ix = i
226                break
227
228        sysargs = command_args[mrsm_ix+1:]
229        kwargs = parse_arguments(sysargs)
230        name = kwargs.get('name', get_new_daemon_name())
231        return Job(name, sysargs, executor_keys=executor_keys)

Build a Job from the PID of a running Meerschaum process.

Parameters
  • pid (int): The PID of the process.
  • executor_keys (Optional[str], default None): The executor keys to assign to the job.
def start(self, debug: bool = False) -> Tuple[bool, str]:
233    def start(self, debug: bool = False) -> SuccessTuple:
234        """
235        Start the job's daemon.
236        """
237        if self.executor is not None:
238            if not self.exists(debug=debug):
239                return self.executor.create_job(
240                    self.name,
241                    self.sysargs,
242                    properties=self.daemon.properties,
243                    debug=debug,
244                )
245            return self.executor.start_job(self.name, debug=debug)
246
247        if self.is_running():
248            return True, f"{self} is already running."
249
250        success, msg = self.daemon.run(
251            keep_daemon_output=(not self.delete_after_completion),
252            allow_dirty_run=True,
253        )
254        if not success:
255            return success, msg
256
257        return success, f"Started {self}."

Start the job's daemon.

def stop( self, timeout_seconds: Optional[int] = None, debug: bool = False) -> Tuple[bool, str]:
259    def stop(self, timeout_seconds: Optional[int] = None, debug: bool = False) -> SuccessTuple:
260        """
261        Stop the job's daemon.
262        """
263        if self.executor is not None:
264            return self.executor.stop_job(self.name, debug=debug)
265
266        if self.daemon.status == 'stopped':
267            if not self.restart:
268                return True, f"{self} is not running."
269            elif self.stop_time is not None:
270                return True, f"{self} will not restart until manually started."
271
272        quit_success, quit_msg = self.daemon.quit(timeout=timeout_seconds)
273        if quit_success:
274            return quit_success, f"Stopped {self}."
275
276        warn(
277            f"Failed to gracefully quit {self}.",
278            stack=False,
279        )
280        kill_success, kill_msg = self.daemon.kill(timeout=timeout_seconds)
281        if not kill_success:
282            return kill_success, kill_msg
283
284        return kill_success, f"Killed {self}."

Stop the job's daemon.

def pause( self, timeout_seconds: Optional[int] = None, debug: bool = False) -> Tuple[bool, str]:
286    def pause(self, timeout_seconds: Optional[int] = None, debug: bool = False) -> SuccessTuple:
287        """
288        Pause the job's daemon.
289        """
290        if self.executor is not None:
291            return self.executor.pause_job(self.name, debug=debug)
292
293        pause_success, pause_msg = self.daemon.pause(timeout=timeout_seconds)
294        if not pause_success:
295            return pause_success, pause_msg
296
297        return pause_success, f"Paused {self}."

Pause the job's daemon.

def delete(self, debug: bool = False) -> Tuple[bool, str]:
299    def delete(self, debug: bool = False) -> SuccessTuple:
300        """
301        Delete the job and its daemon.
302        """
303        if self.executor is not None:
304            return self.executor.delete_job(self.name, debug=debug)
305
306        if self.is_running():
307            stop_success, stop_msg = self.stop()
308            if not stop_success:
309                return stop_success, stop_msg
310
311        cleanup_success, cleanup_msg = self.daemon.cleanup()
312        if not cleanup_success:
313            return cleanup_success, cleanup_msg
314
315        return cleanup_success, f"Deleted {self}."

Delete the job and its daemon.

def is_running(self) -> bool:
317    def is_running(self) -> bool:
318        """
319        Determine whether the job's daemon is running.
320        """
321        return self.status == 'running'

Determine whether the job's daemon is running.

def exists(self, debug: bool = False) -> bool:
323    def exists(self, debug: bool = False) -> bool:
324        """
325        Determine whether the job exists.
326        """
327        if self.executor is not None:
328            return self.executor.get_job_exists(self.name, debug=debug)
329
330        return self.daemon.path.exists()

Determine whether the job exists.

def get_logs(self) -> Optional[str]:
332    def get_logs(self) -> Union[str, None]:
333        """
334        Return the output text of the job's daemon.
335        """
336        if self.executor is not None:
337            return self.executor.get_logs(self.name)
338
339        return self.daemon.log_text

Return the output text of the job's daemon.

def monitor_logs( self, callback_function: Callable[[str], NoneType] = functools.partial(<built-in function print>, end=''), input_callback_function: Optional[Callable[[], str]] = None, stop_callback_function: Optional[Callable[[Tuple[bool, str]], NoneType]] = None, stop_event: Optional[asyncio.locks.Event] = None, stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False):
341    def monitor_logs(
342        self,
343        callback_function: Callable[[str], None] = partial(print, end=''),
344        input_callback_function: Optional[Callable[[], str]] = None,
345        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
346        stop_event: Optional[asyncio.Event] = None,
347        stop_on_exit: bool = False,
348        strip_timestamps: bool = False,
349        accept_input: bool = True,
350        debug: bool = False,
351    ):
352        """
353        Monitor the job's log files and execute a callback on new lines.
354
355        Parameters
356        ----------
357        callback_function: Callable[[str], None], default partial(print, end='')
358            The callback to execute as new data comes in.
359            Defaults to printing the output directly to `stdout`.
360
361        input_callback_function: Optional[Callable[[], str]], default None
362            If provided, execute this callback when the daemon is blocking on stdin.
363            Defaults to `sys.stdin.readline()`.
364
365        stop_callback_function: Optional[Callable[[SuccessTuple]], str], default None
366            If provided, execute this callback when the daemon stops.
367            The job's SuccessTuple will be passed to the callback.
368
369        stop_event: Optional[asyncio.Event], default None
370            If provided, stop monitoring when this event is set.
371            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
372            from within `callback_function` to stop monitoring.
373
374        stop_on_exit: bool, default False
375            If `True`, stop monitoring when the job stops.
376
377        strip_timestamps: bool, default False
378            If `True`, remove leading timestamps from lines.
379
380        accept_input: bool, default True
381            If `True`, accept input when the daemon blocks on stdin.
382        """
383        def default_input_callback_function():
384            return sys.stdin.readline()
385
386        if input_callback_function is None:
387            input_callback_function = default_input_callback_function
388
389        if self.executor is not None:
390            self.executor.monitor_logs(
391                self.name,
392                callback_function,
393                input_callback_function=input_callback_function,
394                stop_callback_function=stop_callback_function,
395                stop_on_exit=stop_on_exit,
396                accept_input=accept_input,
397                strip_timestamps=strip_timestamps,
398                debug=debug,
399            )
400            return
401
402        monitor_logs_coroutine = self.monitor_logs_async(
403            callback_function=callback_function,
404            input_callback_function=input_callback_function,
405            stop_callback_function=stop_callback_function,
406            stop_event=stop_event,
407            stop_on_exit=stop_on_exit,
408            strip_timestamps=strip_timestamps,
409            accept_input=accept_input,
410        )
411        return asyncio.run(monitor_logs_coroutine)

Monitor the job's log files and execute a callback on new lines.

Parameters
  • callback_function (Callable[[str], None], default partial(print, end='')): The callback to execute as new data comes in. Defaults to printing the output directly to stdout.
  • input_callback_function (Optional[Callable[[], str]], default None): If provided, execute this callback when the daemon is blocking on stdin. Defaults to sys.stdin.readline().
  • stop_callback_function (Optional[Callable[[SuccessTuple], None]], default None): If provided, execute this callback when the daemon stops. The job's SuccessTuple will be passed to the callback.
  • stop_event (Optional[asyncio.Event], default None): If provided, stop monitoring when this event is set. You may instead raise meerschaum.jobs.StopMonitoringLogs from within callback_function to stop monitoring.
  • stop_on_exit (bool, default False): If True, stop monitoring when the job stops.
  • strip_timestamps (bool, default False): If True, remove leading timestamps from lines.
  • accept_input (bool, default True): If True, accept input when the daemon blocks on stdin.
async def monitor_logs_async( self, callback_function: Callable[[str], NoneType] = functools.partial(<built-in function print>, end='', flush=True), input_callback_function: Optional[Callable[[], str]] = None, stop_callback_function: Optional[Callable[[Tuple[bool, str]], NoneType]] = None, stop_event: Optional[asyncio.locks.Event] = None, stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, _logs_path: Optional[pathlib.Path] = None, _log=None, _stdin_file=None, debug: bool = False):
async def monitor_logs_async(
    self,
    callback_function: Callable[[str], None] = partial(print, end='', flush=True),
    input_callback_function: Optional[Callable[[], str]] = None,
    stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
    stop_event: Optional[asyncio.Event] = None,
    stop_on_exit: bool = False,
    strip_timestamps: bool = False,
    accept_input: bool = True,
    _logs_path: Optional[pathlib.Path] = None,
    _log=None,
    _stdin_file=None,
    debug: bool = False,
):
    """
    Monitor the job's log files and await a callback on new lines.

    Parameters
    ----------
    callback_function: Callable[[str], None], default partial(print, end='')
        The callback to execute as new data comes in.
        Defaults to printing the output directly to `stdout`.

    input_callback_function: Optional[Callable[[], str]], default None
        If provided, execute this callback when the daemon is blocking on stdin.
        Defaults to `sys.stdin.readline()`.

    stop_callback_function: Optional[Callable[[SuccessTuple], None]], default None
        If provided, execute this callback when the daemon stops.
        The job's `SuccessTuple` will be passed to the callback.

    stop_event: Optional[asyncio.Event], default None
        If provided, stop monitoring when this event is set.
        You may instead raise `meerschaum.jobs.StopMonitoringLogs`
        from within `callback_function` to stop monitoring.

    stop_on_exit: bool, default False
        If `True`, stop monitoring when the job stops.

    strip_timestamps: bool, default False
        If `True`, remove leading timestamps from lines.

    accept_input: bool, default True
        If `True`, accept input when the daemon blocks on stdin.
    """
    def _read_stdin_line():
        return sys.stdin.readline()

    if input_callback_function is None:
        input_callback_function = _read_stdin_line

    if self.executor is not None:
        # Remote job: delegate the monitoring loop to the executor.
        await self.executor.monitor_logs_async(
            self.name,
            callback_function,
            input_callback_function=input_callback_function,
            stop_callback_function=stop_callback_function,
            stop_on_exit=stop_on_exit,
            strip_timestamps=strip_timestamps,
            accept_input=accept_input,
            debug=debug,
        )
        return

    from meerschaum.utils.formatting._jobs import strip_timestamp_from_line

    monitor_events = {
        'user': stop_event,
        'stopped': asyncio.Event(),
    }
    combined_event = asyncio.Event()
    has_emitted_text = False
    stdin_file = _stdin_file if _stdin_file is not None else self.daemon.stdin_file

    async def watch_job_status():
        # Poll the job's status and fire the stop callback / stop event once
        # the job has stopped (after at least one line has been emitted).
        nonlocal has_emitted_text
        stopped_event = monitor_events.get('stopped', None)
        if stopped_event is None:
            return

        backoff_seconds = 0.1
        while backoff_seconds < 60:
            if self.status == 'stopped':
                if not has_emitted_text:
                    # Back off gradually until some output has been shown.
                    await asyncio.sleep(backoff_seconds)
                    backoff_seconds = round(backoff_seconds * 1.1, 2)
                    continue

                if stop_callback_function is not None:
                    try:
                        if asyncio.iscoroutinefunction(stop_callback_function):
                            await stop_callback_function(self.result)
                        else:
                            stop_callback_function(self.result)
                    except asyncio.exceptions.CancelledError:
                        break
                    except Exception:
                        warn(traceback.format_exc())

                if stop_on_exit:
                    monitor_events['stopped'].set()

                break
            await asyncio.sleep(0.1)

    async def watch_stdin_blocking():
        # Feed user input to the daemon whenever it blocks on stdin.
        while True:
            if not has_emitted_text or not self.is_blocking_on_stdin():
                try:
                    await asyncio.sleep(0.1)
                except asyncio.exceptions.CancelledError:
                    break
                continue

            if not self.is_running():
                break

            await emit_latest_lines()

            try:
                print('', end='', flush=True)
                if asyncio.iscoroutinefunction(input_callback_function):
                    stdin_data = await input_callback_function()
                else:
                    stdin_data = input_callback_function()
            except KeyboardInterrupt:
                break
            if not stdin_data.endswith('\n'):
                stdin_data += '\n'

            stdin_file.write(stdin_data)
            await asyncio.sleep(0.1)

    async def combine_events():
        # Collapse the user-supplied and internal stop events into one.
        waiters = [
            asyncio.create_task(event.wait())
            for event in monitor_events.values()
            if event is not None
        ]
        if not waiters:
            return

        try:
            _done, pending = await asyncio.wait(
                waiters,
                return_when=asyncio.FIRST_COMPLETED,
            )
            for pending_task in pending:
                pending_task.cancel()
        except asyncio.exceptions.CancelledError:
            pass
        finally:
            combined_event.set()

    watch_job_status_task = asyncio.create_task(watch_job_status())
    watch_stdin_blocking_task = asyncio.create_task(watch_stdin_blocking())
    combine_events_task = asyncio.create_task(combine_events())

    log = _log if _log is not None else self.daemon.rotating_log
    lines_to_show = get_config('jobs', 'logs', 'lines_to_show')

    async def emit_latest_lines():
        # Emit the tail of the log through the callback.
        nonlocal has_emitted_text
        latest_lines = log.readlines()
        for line in latest_lines[-lines_to_show:]:
            if stop_event is not None and stop_event.is_set():
                return

            if strip_timestamps:
                line = strip_timestamp_from_line(line)

            try:
                if asyncio.iscoroutinefunction(callback_function):
                    await callback_function(line)
                else:
                    callback_function(line)
                has_emitted_text = True
            except StopMonitoringLogs:
                return
            except Exception:
                warn(f"Error in logs callback:\n{traceback.format_exc()}")

    await emit_latest_lines()

    monitor_tasks = (
        [watch_job_status_task]
        + ([watch_stdin_blocking_task] if accept_input else [])
        + [combine_events_task]
    )
    try:
        # NOTE: the gather is intentionally not awaited so the watcher tasks
        # run concurrently with the file-watching loop below.
        _ = asyncio.gather(*monitor_tasks, return_exceptions=True)
    except asyncio.exceptions.CancelledError:
        raise
    except Exception:
        warn(f"Failed to run async checks:\n{traceback.format_exc()}")

    watchfiles = mrsm.attempt_import('watchfiles')
    async for changes in watchfiles.awatch(
        _logs_path or LOGS_RESOURCES_PATH,
        stop_event=combined_event,
    ):
        for change in changes:
            changed_path = pathlib.Path(change[1])
            # Only react to changes in the log's most recent subfile.
            if log.get_latest_subfile_path() != changed_path:
                continue

            await emit_latest_lines()

    await emit_latest_lines()

Monitor the job's log files and await a callback on new lines.

Parameters
  • callback_function (Callable[[str], None], default partial(print, end='')): The callback to execute as new data comes in. Defaults to printing the output directly to stdout.
  • input_callback_function (Optional[Callable[[], str]], default None): If provided, execute this callback when the daemon is blocking on stdin. Defaults to sys.stdin.readline().
  • stop_callback_function (Optional[Callable[[SuccessTuple], None]], default None): If provided, execute this callback when the daemon stops. The job's SuccessTuple will be passed to the callback.
  • stop_event (Optional[asyncio.Event], default None): If provided, stop monitoring when this event is set. You may instead raise meerschaum.jobs.StopMonitoringLogs from within callback_function to stop monitoring.
  • stop_on_exit (bool, default False): If True, stop monitoring when the job stops.
  • strip_timestamps (bool, default False): If True, remove leading timestamps from lines.
  • accept_input (bool, default True): If True, accept input when the daemon blocks on stdin.
def is_blocking_on_stdin(self, debug: bool = False) -> bool:
def is_blocking_on_stdin(self, debug: bool = False) -> bool:
    """
    Return whether a job's daemon is blocking on stdin.
    """
    if self.executor is None:
        # Local job: the daemon drops a marker file while blocking on stdin.
        return self.is_running() and self.daemon.blocking_stdin_file_path.exists()

    return self.executor.get_job_is_blocking_on_stdin(self.name, debug=debug)

Return whether a job's daemon is blocking on stdin.

def write_stdin(self, data):
def write_stdin(self, data):
    """
    Write to a job's daemon's `stdin`.
    """
    stdin_file = self.daemon.stdin_file
    stdin_file.write(data)

Write to a job's daemon's stdin.

executor: Optional[Executor]
@property
def executor(self) -> Union[Executor, None]:
    """
    If the job is remote, return the connector to the remote API instance.
    Local jobs return `None`.
    """
    if self.executor_keys == 'local':
        return None
    return mrsm.get_connector(self.executor_keys)

If the job is remote, return the connector to the remote API instance.

status: str
@property
def status(self) -> str:
    """
    Return the running status of the job's daemon.
    """
    if '_status_hook' in self.__dict__:
        # An injected hook takes precedence over normal status resolution.
        return self._status_hook()

    if self.executor is None:
        return self.daemon.status

    return self.executor.get_job_status(self.name)

Return the running status of the job's daemon.

pid: Optional[int]
@property
def pid(self) -> Union[int, None]:
    """
    Return the PID of the job's daemon, or `None` if unavailable.
    """
    if self.executor is None:
        return self.daemon.pid

    metadata = self.executor.get_job_metadata(self.name)
    return metadata.get('daemon', {}).get('pid', None)

Return the PID of the job's daemon.

restart: bool
@property
def restart(self) -> bool:
    """
    Return whether a stopped job should be restarted.
    """
    if self.executor is None:
        return self.daemon.properties.get('restart', False)

    return self.executor.get_job_metadata(self.name).get('restart', False)

Return whether to restart a stopped job.

result: Tuple[bool, str]
@property
def result(self) -> SuccessTuple:
    """
    Return the `SuccessTuple` when the job has terminated.
    While the job is still running, return a success tuple saying so.
    """
    if self.is_running():
        return True, f"{self} is running."

    if '_result_hook' in self.__dict__:
        # An injected hook takes precedence over stored results.
        return self._result_hook()

    if self.executor is not None:
        metadata = self.executor.get_job_metadata(self.name)
        return metadata.get('result', (False, "No result available."))

    stored_result = self.daemon.properties.get('result', None)
    if stored_result is None:
        return False, "No result available."

    return tuple(stored_result)

Return the SuccessTuple when the job has terminated.

sysargs: List[str]
@property
def sysargs(self) -> List[str]:
    """
    Return the sysargs to use for the Daemon.
    """
    if self._sysargs:
        return self._sysargs

    if self.executor is not None:
        return self.executor.get_job_metadata(self.name).get('sysargs', [])

    daemon_target_args = self.daemon.target_args
    if daemon_target_args is None:
        return []

    # Cache the first positional target arg (the sysargs list) if present.
    self._sysargs = daemon_target_args[0] if daemon_target_args else []
    return self._sysargs

Return the sysargs to use for the Daemon.

daemon: "'Daemon'"
@property
def daemon(self) -> 'Daemon':
    """
    Return (constructing and caching if needed) the daemon which this job manages.
    """
    from meerschaum.utils.daemon import Daemon
    if self._daemon is not None and self.executor is None and self._sysargs:
        return self._daemon

    remote_properties = (
        {}
        if self.executor is None
        else self.executor.get_job_properties(self.name)
    )
    merged_properties = {**remote_properties, **self._properties_patch}

    self._daemon = Daemon(
        target=entry,
        target_args=[self._sysargs],
        target_kw={},
        daemon_id=self.name,
        label=shlex.join(self._sysargs),
        properties=merged_properties,
    )

    # Propagate any injected log / stdin handles onto the new daemon.
    if '_rotating_log' in self.__dict__:
        self._daemon._rotating_log = self._rotating_log

    if '_stdin_file' in self.__dict__:
        self._daemon._stdin_file = self._stdin_file
        self._daemon._blocking_stdin_file_path = self._stdin_file.blocking_file_path

    return self._daemon

Return the daemon which this job manages.

began: Optional[datetime.datetime]
@property
def began(self) -> Union[datetime, None]:
    """
    The datetime when the job began running, or `None` if unknown.
    """
    if self.executor is not None:
        began_iso = self.executor.get_job_began(self.name)
        if began_iso is None:
            return None
        # Normalize remote timestamps to naive UTC.
        remote_began = datetime.fromisoformat(began_iso)
        return remote_began.astimezone(timezone.utc).replace(tzinfo=None)

    local_began_iso = self.daemon.properties.get('process', {}).get('began', None)
    if local_began_iso is None:
        return None

    return datetime.fromisoformat(local_began_iso)

The datetime when the job began running.

ended: Optional[datetime.datetime]
@property
def ended(self) -> Union[datetime, None]:
    """
    The datetime when the job stopped running, or `None` if unknown.
    """
    if self.executor is not None:
        ended_iso = self.executor.get_job_ended(self.name)
        if ended_iso is None:
            return None
        # Normalize remote timestamps to naive UTC.
        remote_ended = datetime.fromisoformat(ended_iso)
        return remote_ended.astimezone(timezone.utc).replace(tzinfo=None)

    local_ended_iso = self.daemon.properties.get('process', {}).get('ended', None)
    if local_ended_iso is None:
        return None

    return datetime.fromisoformat(local_ended_iso)

The datetime when the job stopped running.

paused: Optional[datetime.datetime]
@property
def paused(self) -> Union[datetime, None]:
    """
    The datetime when the job was suspended while running, or `None` if unknown.
    """
    if self.executor is not None:
        paused_iso = self.executor.get_job_paused(self.name)
        if paused_iso is None:
            return None
        # Normalize remote timestamps to naive UTC.
        remote_paused = datetime.fromisoformat(paused_iso)
        return remote_paused.astimezone(timezone.utc).replace(tzinfo=None)

    local_paused_iso = self.daemon.properties.get('process', {}).get('paused', None)
    if local_paused_iso is None:
        return None

    return datetime.fromisoformat(local_paused_iso)

The datetime when the job was suspended while running.

stop_time: Optional[datetime.datetime]
@property
def stop_time(self) -> Union[datetime, None]:
    """
    Return the timestamp when the job was manually stopped,
    or `None` if it was not.
    """
    if self.executor is not None:
        return self.executor.get_job_stop_time(self.name)

    if not self.daemon.stop_path.exists():
        return None

    stop_file_data = self.daemon._read_stop_file()
    if not stop_file_data:
        return None

    timestamp_str = stop_file_data.get('stop_time', None)
    if not timestamp_str:
        warn(f"Could not read stop time for {self}.")
        return None

    return datetime.fromisoformat(timestamp_str)

Return the timestamp when the job was manually stopped.

hidden: bool
@property
def hidden(self) -> bool:
    """
    Return a bool indicating whether this job should be hidden from display.
    Names beginning with '_' or '.' are hidden, as are externally managed jobs.
    """
    return (
        self.name.startswith(('_', '.'))
        or self._is_externally_managed
    )

Return a bool indicating whether this job should be displayed.

def check_restart(self) -> Tuple[bool, str]:
def check_restart(self) -> SuccessTuple:
    """
    Restart the job when `restart` is `True` and the daemon is not running.
    Do not restart if the job was manually stopped.
    """
    if self.is_running():
        return True, f"{self} is running."
    if not self.restart:
        return True, f"{self} does not need to be restarted."
    if self.stop_time is not None:
        # A stop file exists, meaning a user explicitly stopped the job.
        return True, f"{self} was manually stopped."
    return self.start()

If restart is True and the daemon is not running, restart the job. Do not restart if the job was manually stopped.

label: str
@property
def label(self) -> str:
    """
    Return the job's Daemon label (joined sysargs).
    """
    from meerschaum._internal.arguments import compress_pipeline_sysargs
    compressed_sysargs = compress_pipeline_sysargs(self.sysargs)
    return shlex.join(compressed_sysargs).replace(' + ', '\n+ ')

Return the job's Daemon label (joined sysargs).

env: Dict[str, str]
@property
def env(self) -> Dict[str, str]:
    """
    Return the environment variables to set for the job's process.
    """
    if '_env' in self.__dict__:
        # Return the cached environment if it was already computed.
        return self.__dict__['_env']

    job_env = self.daemon.properties.get('env', {})
    base_env = {
        'PYTHONUNBUFFERED': '1',
        'LINES': str(get_config('jobs', 'terminal', 'lines')),
        'COLUMNS': str(get_config('jobs', 'terminal', 'columns')),
    }
    self._env = {**base_env, **job_env}
    return self._env

Return the environment variables to set for the job's process.

delete_after_completion: bool
@property
def delete_after_completion(self) -> bool:
    """
    Return whether this job is configured to delete itself after completion.
    """
    if '_delete_after_completion' in self.__dict__:
        return self.__dict__.get('_delete_after_completion', False)

    self._delete_after_completion = self.daemon.properties.get(
        'delete_after_completion', False
    )
    return self._delete_after_completion

Return whether this job is configured to delete itself after completion.

def get_jobs( executor_keys: Optional[str] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
 38def get_jobs(
 39    executor_keys: Optional[str] = None,
 40    include_hidden: bool = False,
 41    combine_local_and_systemd: bool = True,
 42    debug: bool = False,
 43) -> Dict[str, Job]:
 44    """
 45    Return a dictionary of the existing jobs.
 46
 47    Parameters
 48    ----------
 49    executor_keys: Optional[str], default None
 50        If provided, return remote jobs on the given API instance.
 51        Otherwise return local jobs.
 52
 53    include_hidden: bool, default False
 54        If `True`, include jobs with the `hidden` attribute.
 55
 56    Returns
 57    -------
 58    A dictionary mapping job names to jobs.
 59    """
 60    from meerschaum.connectors.parse import parse_executor_keys
 61    executor_keys = executor_keys or get_executor_keys_from_context()
 62
 63    include_local_and_system = (
 64        combine_local_and_systemd
 65        and str(executor_keys).split(':', maxsplit=1)[0] in ('None', 'local', 'systemd')
 66        and get_executor_keys_from_context() == 'systemd'
 67    )
 68
 69    def _get_local_jobs():
 70        from meerschaum.utils.daemon import get_daemons
 71        daemons = get_daemons()
 72        jobs = {
 73            daemon.daemon_id: Job(name=daemon.daemon_id, executor_keys='local')
 74            for daemon in daemons
 75        }
 76        return {
 77            name: job
 78            for name, job in jobs.items()
 79            if (include_hidden or not job.hidden) and not job._is_externally_managed
 80
 81        }
 82
 83    def _get_systemd_jobs():
 84        conn = mrsm.get_connector('systemd')
 85        jobs = conn.get_jobs(debug=debug)
 86        return {
 87            name: job
 88            for name, job in jobs.items()
 89            if include_hidden or not job.hidden
 90        }
 91
 92    if include_local_and_system:
 93        local_jobs = _get_local_jobs()
 94        systemd_jobs = _get_systemd_jobs()
 95        shared_jobs = set(local_jobs) & set(systemd_jobs)
 96        if shared_jobs:
 97            from meerschaum.utils.misc import items_str
 98            from meerschaum.utils.warnings import warn
 99            warn(
100                "Job"
101                + ('s' if len(shared_jobs) != 1 else '')
102                + f" {items_str(list(shared_jobs))} "
103                + "exist"
104                + ('s' if len(shared_jobs) == 1 else '')
105                + " in both `local` and `systemd`.",
106                stack=False,
107            )
108        return {**local_jobs, **systemd_jobs}
109
110    if executor_keys == 'local':
111        return _get_local_jobs()
112
113    if executor_keys == 'systemd':
114        return _get_systemd_jobs()
115
116    try:
117        _ = parse_executor_keys(executor_keys, construct=False)
118        conn = parse_executor_keys(executor_keys)
119        jobs = conn.get_jobs(debug=debug)
120        return {
121            name: job
122            for name, job in jobs.items()
123            if include_hidden or not job.hidden
124        }
125    except Exception:
126        return {}

Return a dictionary of the existing jobs.

Parameters
  • executor_keys (Optional[str], default None): If provided, return remote jobs on the given API instance. Otherwise return local jobs.
  • include_hidden (bool, default False): If True, include jobs with the hidden attribute.
Returns
  • A dictionary mapping job names to jobs.
def get_filtered_jobs( executor_keys: Optional[str] = None, filter_list: Optional[List[str]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, warn: bool = False, debug: bool = False) -> Dict[str, Job]:
def get_filtered_jobs(
    executor_keys: Optional[str] = None,
    filter_list: Optional[List[str]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    warn: bool = False,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of jobs filtered by the user-provided names.
    """
    from meerschaum.utils.warnings import warn as _warn
    jobs = get_jobs(
        executor_keys,
        include_hidden=True,
        combine_local_and_systemd=combine_local_and_systemd,
        debug=debug,
    )
    if not filter_list:
        # No explicit filter: fall back to the hidden-jobs policy.
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    filtered_jobs = {}
    for job_name in filter_list:
        matched_job = jobs.get(job_name, None)
        if matched_job is None:
            if warn:
                _warn(
                    f"Job '{job_name}' does not exist.",
                    stack=False,
                )
            continue
        filtered_jobs[job_name] = matched_job

    return filtered_jobs

Return a list of jobs filtered by the user.

def get_restart_jobs( executor_keys: Optional[str] = None, jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
def get_restart_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return jobs which were created with `--restart` or `--loop`.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    return {name: job for name, job in jobs.items() if job.restart}

Return jobs which were created with --restart or --loop.

def get_running_jobs( executor_keys: Optional[str] = None, jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
def get_running_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of running jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    return {name: job for name, job in jobs.items() if job.status == 'running'}

Return a dictionary of running jobs.

def get_stopped_jobs( executor_keys: Optional[str] = None, jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
def get_stopped_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of stopped jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    return {name: job for name, job in jobs.items() if job.status == 'stopped'}

Return a dictionary of stopped jobs.

def get_paused_jobs( executor_keys: Optional[str] = None, jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
def get_paused_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of paused jobs.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    return {name: job for name, job in jobs.items() if job.status == 'paused'}

Return a dictionary of paused jobs.

def make_executor(cls):
    """
    Register a class as an `Executor` connector.
    The executor type is the class name, lowercased, minus the 'executor' suffix.
    """
    import re
    from meerschaum.connectors import make_connector
    executor_type = re.sub(r'executor$', '', cls.__name__.lower())
    if executor_type not in executor_types:
        executor_types.append(executor_type)
    return make_connector(cls, _is_executor=True)

Register a class as an Executor.

class Executor(meerschaum.connectors._Connector.Connector):
class Executor(Connector):
    """
    Define the abstract interface which job-managing connectors must implement.
    """

    @abstractmethod
    def get_job_exists(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job with the given name exists.
        """

    @abstractmethod
    def get_jobs(self) -> Dict[str, Job]:
        """
        Return a dictionary mapping job names to `Job` objects.
        """

    @abstractmethod
    def create_job(
        self,
        name: str,
        sysargs: List[str],
        properties: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ) -> SuccessTuple:
        """
        Create a new job from the given sysargs and properties.
        """

    @abstractmethod
    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Start the job with the given name.
        """

    @abstractmethod
    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Stop the job with the given name.
        """

    @abstractmethod
    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Pause the job with the given name.
        """

    @abstractmethod
    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Delete the job with the given name.
        """

    @abstractmethod
    def get_logs(self, name: str, debug: bool = False) -> str:
        """
        Return a job's log output.
        """

Define the methods for managing jobs.

@abstractmethod
def get_job_exists(self, name: str, debug: bool = False) -> bool:
    @abstractmethod
    def get_job_exists(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job exists.
        """

Return whether a job exists.

@abstractmethod
def get_jobs(self) -> Dict[str, Job]:
    @abstractmethod
    def get_jobs(self) -> Dict[str, Job]:
        """
        Return a dictionary of names -> Jobs.
        """

Return a dictionary of names -> Jobs.

@abstractmethod
def create_job( self, name: str, sysargs: List[str], properties: Optional[Dict[str, Any]] = None, debug: bool = False) -> Tuple[bool, str]:
    @abstractmethod
    def create_job(
        self,
        name: str,
        sysargs: List[str],
        properties: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ) -> SuccessTuple:
        """
        Create a new job.
        """

Create a new job.

@abstractmethod
def start_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
    @abstractmethod
    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Start a job.
        """

Start a job.

@abstractmethod
def stop_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
    @abstractmethod
    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Stop a job.
        """

Stop a job.

@abstractmethod
def pause_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
    @abstractmethod
    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Pause a job.
        """

Pause a job.

@abstractmethod
def delete_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
    @abstractmethod
    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Delete a job.
        """

Delete a job.

@abstractmethod
def get_logs(self, name: str, debug: bool = False) -> str:
    @abstractmethod
    def get_logs(self, name: str, debug: bool = False) -> str:
        """
        Return a job's log output.
        """

Return a job's log output.

Inherited Members
meerschaum.connectors._Connector.Connector
Connector
verify_attributes
meta
type
label
def check_restart_jobs( executor_keys: Optional[str] = 'local', jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = True, silent: bool = False, debug: bool = False) -> Tuple[bool, str]:
def check_restart_jobs(
    executor_keys: Optional[str] = 'local',
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = True,
    silent: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Restart any stopped jobs which were created with `--restart`.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        If provided, check jobs on the given remote API instance.
        Otherwise check local jobs.

    include_hidden: bool, default True
        If `True`, include hidden jobs in the check.

    silent: bool, default False
        If `True`, do not print the restart success message.
    """
    from meerschaum.utils.misc import items_str

    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=False,
            debug=debug,
        )

    if not jobs:
        return True, "No jobs to restart."

    results = {}
    for name, job in jobs.items():
        check_success, check_msg = job.check_restart()
        results[job.name] = (check_success, check_msg)
        if not silent:
            mrsm.pprint((check_success, check_msg))

    success_names = [name for name, (check_success, check_msg) in results.items() if check_success]
    fail_names = [name for name, (check_success, check_msg) in results.items() if not check_success]
    success = len(success_names) == len(jobs)
    msg = (
        (
            "Successfully restarted job"
            + ('s' if len(success_names) != 1 else '')
            + ' ' + items_str(success_names) + '.'
        )
        if success
        else (
            "Failed to restart job"
            + ('s' if len(success_names) != 1 else '')
            + ' ' + items_str(fail_names) + '.'
        )
    )
    return success, msg

Restart any stopped jobs which were created with --restart.

Parameters
  • executor_keys (Optional[str], default None): If provided, check jobs on the given remote API instance. Otherwise check local jobs.
  • include_hidden (bool, default True): If True, include hidden jobs in the check.
  • silent (bool, default False): If True, do not print the restart success message.
def start_check_jobs_thread():
def start_check_jobs_thread():
    """
    Start a thread to regularly monitor jobs.
    """
    import atexit
    from functools import partial
    from meerschaum.utils.threading import RepeatTimer
    from meerschaum.config.static import STATIC_CONFIG

    global _check_loop_stop_thread
    sleep_seconds = STATIC_CONFIG['jobs']['check_restart_seconds']

    _check_loop_stop_thread = RepeatTimer(
        sleep_seconds,
        partial(
            _check_restart_jobs_against_lock,
            silent=True,
        )
    )
    _check_loop_stop_thread.daemon = True
    atexit.register(stop_check_jobs_thread)

    _check_loop_stop_thread.start()
    return _check_loop_stop_thread

Start a thread to regularly monitor jobs.

def stop_check_jobs_thread():
def stop_check_jobs_thread():
    """
    Stop the job monitoring thread.
    """
    from meerschaum.config.paths import CHECK_JOBS_LOCK_PATH
    from meerschaum.utils.warnings import warn
    if _check_loop_stop_thread is None:
        return

    _check_loop_stop_thread.cancel()

    try:
        if CHECK_JOBS_LOCK_PATH.exists():
            CHECK_JOBS_LOCK_PATH.unlink()
    except Exception as e:
        warn(f"Failed to remove check jobs lock file:\n{e}")

Stop the job monitoring thread.