meerschaum.jobs

Higher-level utilities for managing meerschaum.utils.daemon.Daemon.

  1#! /usr/bin/env python3
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Higher-level utilities for managing `meerschaum.utils.daemon.Daemon`.
  7"""
  8
  9import meerschaum as mrsm
 10from meerschaum.utils.typing import Dict, Optional, List, SuccessTuple
 11
 12from meerschaum.jobs._Job import Job, StopMonitoringLogs
 13from meerschaum.jobs._Executor import Executor
 14
 15__all__ = (
 16    'Job',
 17    'StopMonitoringLogs',
 18    'systemd',
 19    'get_jobs',
 20    'get_filtered_jobs',
 21    'get_restart_jobs',
 22    'get_running_jobs',
 23    'get_stopped_jobs',
 24    'get_paused_jobs',
 25    'get_restart_jobs',
 26    'make_executor',
 27    'Executor',
 28    'check_restart_jobs',
 29    'start_check_jobs_thread',
 30    'stop_check_jobs_thread',
 31)
 32
 33executor_types: List[str] = ['api', 'local']
 34
 35
 36def get_jobs(
 37    executor_keys: Optional[str] = None,
 38    include_hidden: bool = False,
 39    combine_local_and_systemd: bool = True,
 40    debug: bool = False,
 41) -> Dict[str, Job]:
 42    """
 43    Return a dictionary of the existing jobs.
 44
 45    Parameters
 46    ----------
 47    executor_keys: Optional[str], default None
 48        If provided, return remote jobs on the given API instance.
 49        Otherwise return local jobs.
 50
 51    include_hidden: bool, default False
 52        If `True`, include jobs with the `hidden` attribute.
 53
 54    Returns
 55    -------
 56    A dictionary mapping job names to jobs.
 57    """
 58    from meerschaum.connectors.parse import parse_executor_keys
 59    executor_keys = executor_keys or get_executor_keys_from_context()
 60
 61    include_local_and_system = (
 62        combine_local_and_systemd
 63        and str(executor_keys).split(':', maxsplit=1)[0] in ('None', 'local', 'systemd')
 64        and get_executor_keys_from_context() == 'systemd'
 65    )
 66
 67    def _get_local_jobs():
 68        from meerschaum.utils.daemon import get_daemons
 69        daemons = get_daemons()
 70        jobs = {
 71            daemon.daemon_id: Job(name=daemon.daemon_id, executor_keys='local')
 72            for daemon in daemons
 73        }
 74        return {
 75            name: job
 76            for name, job in jobs.items()
 77            if (include_hidden or not job.hidden) and not job._is_externally_managed
 78
 79        }
 80
 81    def _get_systemd_jobs():
 82        from meerschaum.jobs.systemd import SystemdExecutor
 83        conn = SystemdExecutor('systemd')
 84        jobs = conn.get_jobs(debug=debug)
 85        return {
 86            name: job
 87            for name, job in jobs.items()
 88            if include_hidden or not job.hidden
 89        }
 90
 91    if include_local_and_system:
 92        local_jobs = _get_local_jobs()
 93        systemd_jobs = _get_systemd_jobs()
 94        shared_jobs = set(local_jobs) & set(systemd_jobs)
 95        if shared_jobs:
 96            from meerschaum.utils.misc import items_str
 97            from meerschaum.utils.warnings import warn
 98            warn(
 99                "Job"
100                + ('s' if len(shared_jobs) != 1 else '')
101                + f" {items_str(list(shared_jobs))} "
102                + "exist"
103                + ('s' if len(shared_jobs) == 1 else '')
104                + " in both `local` and `systemd`.",
105                stack=False,
106            )
107        return {**local_jobs, **systemd_jobs}
108
109    if executor_keys == 'local':
110        return _get_local_jobs()
111
112    if executor_keys == 'systemd':
113        return _get_systemd_jobs()
114
115    try:
116        _ = parse_executor_keys(executor_keys, construct=False)
117        conn = parse_executor_keys(executor_keys)
118        jobs = conn.get_jobs(debug=debug)
119        return {
120            name: job
121            for name, job in jobs.items()
122            if include_hidden or not job.hidden
123        }
124    except Exception:
125        return {}
126
127
128def get_filtered_jobs(
129    executor_keys: Optional[str] = None,
130    filter_list: Optional[List[str]] = None,
131    include_hidden: bool = False,
132    combine_local_and_systemd: bool = True,
133    warn: bool = False,
134    debug: bool = False,
135) -> Dict[str, Job]:
136    """
137    Return a list of jobs filtered by the user.
138    """
139    from meerschaum.utils.warnings import warn as _warn
140    jobs = get_jobs(
141        executor_keys,
142        include_hidden=True,
143        combine_local_and_systemd=combine_local_and_systemd,
144        debug=debug,
145    )
146    if not filter_list:
147        return {
148            name: job
149            for name, job in jobs.items()
150            if include_hidden or not job.hidden
151        }
152
153    jobs_to_return = {}
154    filter_list_without_underscores = [name for name in filter_list if not name.startswith('_')]
155    filter_list_with_underscores = [name for name in filter_list if name.startswith('_')]
156    if (
157        filter_list_without_underscores and not filter_list_with_underscores
158        or filter_list_with_underscores and not filter_list_without_underscores
159    ):
160        pass
161    for name in filter_list:
162        job = jobs.get(name, None)
163        if job is None:
164            if warn:
165                _warn(
166                    f"Job '{name}' does not exist.",
167                    stack=False,
168                )
169            continue
170        jobs_to_return[name] = job
171
172    if not jobs_to_return and filter_list_with_underscores:
173        names_to_exclude = [name.lstrip('_') for name in filter_list_with_underscores]
174        return {
175            name: job
176            for name, job in jobs.items()
177            if name not in names_to_exclude
178        }
179
180    return jobs_to_return
181
182
183def get_restart_jobs(
184    executor_keys: Optional[str] = None,
185    jobs: Optional[Dict[str, Job]] = None,
186    include_hidden: bool = False,
187    combine_local_and_systemd: bool = True,
188    debug: bool = False,
189) -> Dict[str, Job]:
190    """
191    Return jobs which were created with `--restart` or `--loop`.
192    """
193    if jobs is None:
194        jobs = get_jobs(
195            executor_keys,
196            include_hidden=include_hidden,
197            combine_local_and_systemd=combine_local_and_systemd,
198            debug=debug,
199        )
200
201    return {
202        name: job
203        for name, job in jobs.items()
204        if job.restart
205    }
206
207
208def get_running_jobs(
209    executor_keys: Optional[str] = None,
210    jobs: Optional[Dict[str, Job]] = None,
211    include_hidden: bool = False,
212    combine_local_and_systemd: bool = True,
213    debug: bool = False,
214) -> Dict[str, Job]:
215    """
216    Return a dictionary of running jobs.
217    """
218    if jobs is None:
219        jobs = get_jobs(
220            executor_keys,
221            include_hidden=include_hidden,
222            combine_local_and_systemd=combine_local_and_systemd,
223            debug=debug,
224        )
225
226    return {
227        name: job
228        for name, job in jobs.items()
229        if job.status == 'running'
230    }
231
232
233def get_paused_jobs(
234    executor_keys: Optional[str] = None,
235    jobs: Optional[Dict[str, Job]] = None,
236    include_hidden: bool = False,
237    combine_local_and_systemd: bool = True,
238    debug: bool = False,
239) -> Dict[str, Job]:
240    """
241    Return a dictionary of paused jobs.
242    """
243    if jobs is None:
244        jobs = get_jobs(
245            executor_keys,
246            include_hidden=include_hidden,
247            combine_local_and_systemd=combine_local_and_systemd,
248            debug=debug,
249        )
250
251    return {
252        name: job
253        for name, job in jobs.items()
254        if job.status == 'paused'
255    }
256
257
258def get_stopped_jobs(
259    executor_keys: Optional[str] = None,
260    jobs: Optional[Dict[str, Job]] = None,
261    include_hidden: bool = False,
262    combine_local_and_systemd: bool = True,
263    debug: bool = False,
264) -> Dict[str, Job]:
265    """
266    Return a dictionary of stopped jobs.
267    """
268    if jobs is None:
269        jobs = get_jobs(
270            executor_keys,
271            include_hidden=include_hidden,
272            combine_local_and_systemd=combine_local_and_systemd,
273            debug=debug,
274        )
275
276    return {
277        name: job
278        for name, job in jobs.items()
279        if job.status == 'stopped'
280    }
281
282
283def make_executor(cls):
284    """
285    Register a class as an `Executor`.
286    """
287    import re
288    from meerschaum.connectors import make_connector
289    suffix_regex = r'executor$'
290    typ = re.sub(suffix_regex, '', cls.__name__.lower())
291    if typ not in executor_types:
292        executor_types.append(typ)
293    return make_connector(cls, _is_executor=True)
294
295
296def check_restart_jobs(
297    executor_keys: Optional[str] = 'local',
298    jobs: Optional[Dict[str, Job]] = None,
299    include_hidden: bool = True,
300    silent: bool = False,
301    debug: bool = False,
302) -> SuccessTuple:
303    """
304    Restart any stopped jobs which were created with `--restart`.
305
306    Parameters
307    ----------
308    executor_keys: Optional[str], default None
309        If provided, check jobs on the given remote API instance.
310        Otherwise check local jobs.
311
312    include_hidden: bool, default True
313        If `True`, include hidden jobs in the check.
314
315    silent: bool, default False
316        If `True`, do not print the restart success message.
317    """
318    from meerschaum.utils.misc import items_str
319
320    if jobs is None:
321        jobs = get_jobs(
322            executor_keys,
323            include_hidden=include_hidden,
324            combine_local_and_systemd=False,
325            debug=debug,
326        )
327
328    if not jobs:
329        return True, "No jobs to restart."
330
331    results = {}
332    for name, job in jobs.items():
333        check_success, check_msg = job.check_restart()
334        results[job.name] = (check_success, check_msg)
335        if not silent:
336            mrsm.pprint((check_success, check_msg))
337
338    success_names = [name for name, (check_success, check_msg) in results.items() if check_success]
339    fail_names = [name for name, (check_success, check_msg) in results.items() if not check_success]
340    success = len(success_names) == len(jobs)
341    msg = (
342        (
343            "Successfully restarted job"
344            + ('s' if len(success_names) != 1 else '')
345            + ' ' + items_str(success_names) + '.'
346        )
347        if success
348        else (
349            "Failed to restart job"
350            + ('s' if len(success_names) != 1 else '')
351            + ' ' + items_str(fail_names) + '.'
352        )
353    )
354    return success, msg
355
356
357def _check_restart_jobs_against_lock(*args, **kwargs):
358    import meerschaum.config.paths as paths
359    fasteners = mrsm.attempt_import('fasteners', lazy=False)
360    lock = fasteners.InterProcessLock(paths.CHECK_JOBS_LOCK_PATH)
361    with lock:
362        check_restart_jobs(*args, **kwargs)
363
364
365_check_loop_stop_thread = None
366def start_check_jobs_thread():
367    """
368    Start a thread to regularly monitor jobs.
369    """
370    import atexit
371    from functools import partial
372    from meerschaum.utils.threading import RepeatTimer
373    from meerschaum._internal.static import STATIC_CONFIG
374
375    global _check_loop_stop_thread
376    sleep_seconds = STATIC_CONFIG['jobs']['check_restart_seconds']
377
378    _check_loop_stop_thread = RepeatTimer(
379        sleep_seconds,
380        partial(
381            _check_restart_jobs_against_lock,
382            silent=True,
383        )
384    )
385    _check_loop_stop_thread.daemon = True
386    atexit.register(stop_check_jobs_thread)
387
388    _check_loop_stop_thread.start()
389    return _check_loop_stop_thread
390
391
392def stop_check_jobs_thread():
393    """
394    Stop the job monitoring thread.
395    """
396    import meerschaum.config.paths as paths
397    from meerschaum.utils.warnings import warn
398    if _check_loop_stop_thread is None:
399        return
400
401    _check_loop_stop_thread.cancel()
402
403    try:
404        if paths.CHECK_JOBS_LOCK_PATH.exists():
405            paths.CHECK_JOBS_LOCK_PATH.unlink()
406    except Exception as e:
407        warn(f"Failed to remove check jobs lock file:\n{e}")
408
409
410_context_keys = None
411def get_executor_keys_from_context() -> str:
412    """
413    If we are running on the host with the default root, default to `'systemd'`.
414    Otherwise return `'local'`.
415    """
416    global _context_keys
417
418    if _context_keys is not None:
419        return _context_keys
420
421    import meerschaum.config.paths as paths
422    from meerschaum.config import get_config
423    from meerschaum.utils.misc import is_systemd_available
424    from meerschaum.utils.packages import is_installed
425
426    configured_executor = get_config('meerschaum', 'executor', warn=False)
427    if configured_executor is not None:
428        return configured_executor
429
430    _context_keys = (
431        'systemd'
432        if (
433            is_systemd_available()
434            and paths.ROOT_DIR_PATH == paths.DEFAULT_ROOT_DIR_PATH
435            and is_installed('meerschaum', venv=None, allow_outside_venv=False)
436        )
437        else 'local'
438    )
439    return _context_keys
440
441
442def _install_healthcheck_job() -> SuccessTuple:
443    """
444    Install the systemd job which checks local jobs.
445    """
446    from meerschaum.config import get_config
447
448    enable_healthcheck = get_config('system', 'experimental', 'systemd_healthcheck')
449    if not enable_healthcheck:
450        return False, "Local healthcheck is disabled."
451
452    if get_executor_keys_from_context() != 'systemd':
453        return False, "Not running systemd."
454
455    job = Job(
456        '.local-healthcheck',
457        ['restart', 'jobs', '-e', 'local', '--loop', '--min-seconds', '60'],
458        executor_keys='systemd',
459    )
460    return job.start()
class Job:
  70class Job:
  71    """
  72    Manage a `meerschaum.utils.daemon.Daemon`, locally or remotely via the API.
  73    """
  74
  75    def __init__(
  76        self,
  77        name: str,
  78        sysargs: Union[List[str], str, None] = None,
  79        env: Optional[Dict[str, str]] = None,
  80        executor_keys: Optional[str] = None,
  81        delete_after_completion: bool = False,
  82        refresh_seconds: Union[int, float, None] = None,
  83        _properties: Optional[Dict[str, Any]] = None,
  84        _rotating_log=None,
  85        _stdin_file=None,
  86        _status_hook: Optional[Callable[[], str]] = None,
  87        _result_hook: Optional[Callable[[], SuccessTuple]] = None,
  88        _externally_managed: bool = False,
  89    ):
  90        """
  91        Create a new job to manage a `meerschaum.utils.daemon.Daemon`.
  92
  93        Parameters
  94        ----------
  95        name: str
  96            The name of the job to be created.
  97            This will also be used as the Daemon ID.
  98
  99        sysargs: Union[List[str], str, None], default None
 100            The sysargs of the command to be executed, e.g. 'start api'.
 101
 102        env: Optional[Dict[str, str]], default None
 103            If provided, set these environment variables in the job's process.
 104
 105        executor_keys: Optional[str], default None
 106            If provided, execute the job remotely on an API instance, e.g. 'api:main'.
 107
 108        delete_after_completion: bool, default False
 109            If `True`, delete this job when it has finished executing.
 110
 111        refresh_seconds: Union[int, float, None], default None
 112            The number of seconds to sleep between refreshes.
 113            Defaults to the configured value `system.cli.refresh_seconds`.
 114
 115        _properties: Optional[Dict[str, Any]], default None
 116            If provided, use this to patch the daemon's properties.
 117        """
 118        from meerschaum.utils.daemon import Daemon
 119        for char in BANNED_CHARS:
 120            if char in name:
 121                raise ValueError(f"Invalid name: ({char}) is not allowed.")
 122
 123        if isinstance(sysargs, str):
 124            sysargs = shlex.split(sysargs)
 125
 126        and_key = STATIC_CONFIG['system']['arguments']['and_key']
 127        escaped_and_key = STATIC_CONFIG['system']['arguments']['escaped_and_key']
 128        if sysargs:
 129            sysargs = [
 130                (arg if arg != escaped_and_key else and_key)
 131                for arg in sysargs
 132            ]
 133
 134        ### NOTE: 'local' and 'systemd' executors are being coalesced.
 135        if executor_keys is None:
 136            from meerschaum.jobs import get_executor_keys_from_context
 137            executor_keys = get_executor_keys_from_context()
 138
 139        self.executor_keys = executor_keys
 140        self.name = name
 141        self.refresh_seconds = (
 142            refresh_seconds
 143            if refresh_seconds is not None
 144            else mrsm.get_config('system', 'cli', 'refresh_seconds')
 145        )
 146        try:
 147            self._daemon = (
 148                Daemon(daemon_id=name)
 149                if executor_keys == 'local'
 150                else None
 151            )
 152        except Exception:
 153            self._daemon = None
 154
 155        ### Handle any injected dependencies.
 156        if _rotating_log is not None:
 157            self._rotating_log = _rotating_log
 158            if self._daemon is not None:
 159                self._daemon._rotating_log = _rotating_log
 160
 161        if _stdin_file is not None:
 162            self._stdin_file = _stdin_file
 163            if self._daemon is not None:
 164                self._daemon._stdin_file = _stdin_file
 165                self._daemon._blocking_stdin_file_path = _stdin_file.blocking_file_path
 166
 167        if _status_hook is not None:
 168            self._status_hook = _status_hook
 169
 170        if _result_hook is not None:
 171            self._result_hook = _result_hook
 172
 173        self._externally_managed = _externally_managed
 174        self._properties_patch = _properties or {}
 175        if _externally_managed:
 176            self._properties_patch.update({'externally_managed': _externally_managed})
 177
 178        if env:
 179            self._properties_patch.update({'env': env})
 180
 181        if delete_after_completion:
 182            self._properties_patch.update({'delete_after_completion': delete_after_completion})
 183
 184        daemon_sysargs = (
 185            self._daemon.properties.get('target', {}).get('args', [None])[0]
 186            if self._daemon is not None
 187            else None
 188        )
 189
 190        if daemon_sysargs and sysargs and daemon_sysargs != sysargs:
 191            warn("Given sysargs differ from existing sysargs.")
 192
 193        self._sysargs = [
 194            arg
 195            for arg in (daemon_sysargs or sysargs or [])
 196            if arg not in ('-d', '--daemon')
 197        ]
 198        for restart_flag in RESTART_FLAGS:
 199            if restart_flag in self._sysargs:
 200                self._properties_patch.update({'restart': True})
 201                break
 202
 203    @staticmethod
 204    def from_pid(pid: int, executor_keys: Optional[str] = None) -> Job:
 205        """
 206        Build a `Job` from the PID of a running Meerschaum process.
 207
 208        Parameters
 209        ----------
 210        pid: int
 211            The PID of the process.
 212
 213        executor_keys: Optional[str], default None
 214            The executor keys to assign to the job.
 215        """
 216        psutil = mrsm.attempt_import('psutil')
 217        try:
 218            process = psutil.Process(pid)
 219        except psutil.NoSuchProcess as e:
 220            warn(f"Process with PID {pid} does not exist.", stack=False)
 221            raise e
 222
 223        command_args = process.cmdline()
 224        is_daemon = command_args[1] == '-c'
 225
 226        if is_daemon:
 227            daemon_id = command_args[-1].split('daemon_id=')[-1].split(')')[0].replace("'", '')
 228            root_dir = process.environ().get(STATIC_CONFIG['environment']['root'], None)
 229            if root_dir is None:
 230                root_dir = paths.ROOT_DIR_PATH
 231            else:
 232                root_dir = pathlib.Path(root_dir)
 233            jobs_dir = root_dir / paths.DAEMON_RESOURCES_PATH.name
 234            daemon_dir = jobs_dir / daemon_id
 235            pid_file = daemon_dir / 'process.pid'
 236
 237            if pid_file.exists():
 238                with open(pid_file, 'r', encoding='utf-8') as f:
 239                    daemon_pid = int(f.read())
 240
 241                if pid != daemon_pid:
 242                    raise EnvironmentError(f"Differing PIDs: {pid=}, {daemon_pid=}")
 243            else:
 244                raise EnvironmentError(f"Is job '{daemon_id}' running?")
 245
 246            return Job(daemon_id, executor_keys=executor_keys)
 247
 248        from meerschaum._internal.arguments._parse_arguments import parse_arguments
 249        from meerschaum.utils.daemon import get_new_daemon_name
 250
 251        mrsm_ix = 0
 252        for i, arg in enumerate(command_args):
 253            if 'mrsm' in arg or 'meerschaum' in arg.lower():
 254                mrsm_ix = i
 255                break
 256
 257        sysargs = command_args[mrsm_ix+1:]
 258        kwargs = parse_arguments(sysargs)
 259        name = kwargs.get('name', get_new_daemon_name())
 260        return Job(name, sysargs, executor_keys=executor_keys)
 261
 262    def start(self, debug: bool = False) -> SuccessTuple:
 263        """
 264        Start the job's daemon.
 265        """
 266        if self.executor is not None:
 267            if not self.exists(debug=debug):
 268                return self.executor.create_job(
 269                    self.name,
 270                    self.sysargs,
 271                    properties=self.daemon.properties,
 272                    debug=debug,
 273                )
 274            return self.executor.start_job(self.name, debug=debug)
 275
 276        if self.is_running():
 277            return True, f"{self} is already running."
 278
 279        success, msg = self.daemon.run(
 280            keep_daemon_output=(not self.delete_after_completion),
 281            allow_dirty_run=True,
 282        )
 283        if not success:
 284            return success, msg
 285
 286        return success, f"Started {self}."
 287
 288    def stop(
 289        self,
 290        timeout_seconds: Union[int, float, None] = None,
 291        debug: bool = False,
 292    ) -> SuccessTuple:
 293        """
 294        Stop the job's daemon.
 295        """
 296        if self.executor is not None:
 297            return self.executor.stop_job(self.name, debug=debug)
 298
 299        if self.daemon.status == 'stopped':
 300            if not self.restart:
 301                return True, f"{self} is not running."
 302            elif self.stop_time is not None:
 303                return True, f"{self} will not restart until manually started."
 304
 305        quit_success, quit_msg = self.daemon.quit(timeout=timeout_seconds)
 306        if quit_success:
 307            return quit_success, f"Stopped {self}."
 308
 309        warn(
 310            f"Failed to gracefully quit {self}.",
 311            stack=False,
 312        )
 313        kill_success, kill_msg = self.daemon.kill(timeout=timeout_seconds)
 314        if not kill_success:
 315            return kill_success, kill_msg
 316
 317        return kill_success, f"Killed {self}."
 318
 319    def pause(
 320        self,
 321        timeout_seconds: Union[int, float, None] = None,
 322        debug: bool = False,
 323    ) -> SuccessTuple:
 324        """
 325        Pause the job's daemon.
 326        """
 327        if self.executor is not None:
 328            return self.executor.pause_job(self.name, debug=debug)
 329
 330        pause_success, pause_msg = self.daemon.pause(timeout=timeout_seconds)
 331        if not pause_success:
 332            return pause_success, pause_msg
 333
 334        return pause_success, f"Paused {self}."
 335
 336    def delete(self, debug: bool = False) -> SuccessTuple:
 337        """
 338        Delete the job and its daemon.
 339        """
 340        if self.executor is not None:
 341            return self.executor.delete_job(self.name, debug=debug)
 342
 343        if self.is_running():
 344            stop_success, stop_msg = self.stop()
 345            if not stop_success:
 346                return stop_success, stop_msg
 347
 348        cleanup_success, cleanup_msg = self.daemon.cleanup()
 349        if not cleanup_success:
 350            return cleanup_success, cleanup_msg
 351
 352        _ = self.daemon._properties.pop('result', None)
 353        return cleanup_success, f"Deleted {self}."
 354
 355    def is_running(self) -> bool:
 356        """
 357        Determine whether the job's daemon is running.
 358        """
 359        return self.status == 'running'
 360
 361    def exists(self, debug: bool = False) -> bool:
 362        """
 363        Determine whether the job exists.
 364        """
 365        if self.executor is not None:
 366            return self.executor.get_job_exists(self.name, debug=debug)
 367
 368        return self.daemon.path.exists()
 369
 370    def get_logs(self) -> Union[str, None]:
 371        """
 372        Return the output text of the job's daemon.
 373        """
 374        if self.executor is not None:
 375            return self.executor.get_logs(self.name)
 376
 377        return self.daemon.log_text
 378
 379    def monitor_logs(
 380        self,
 381        callback_function: Callable[[str], None] = _default_stdout_callback,
 382        input_callback_function: Optional[Callable[[], str]] = None,
 383        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
 384        stop_event: Optional[asyncio.Event] = None,
 385        stop_on_exit: bool = False,
 386        strip_timestamps: bool = False,
 387        accept_input: bool = True,
 388        debug: bool = False,
 389        _logs_path: Optional[pathlib.Path] = None,
 390        _log=None,
 391        _stdin_file=None,
 392        _wait_if_stopped: bool = True,
 393    ):
 394        """
 395        Monitor the job's log files and execute a callback on new lines.
 396
 397        Parameters
 398        ----------
 399        callback_function: Callable[[str], None], default partial(print, end='')
 400            The callback to execute as new data comes in.
 401            Defaults to printing the output directly to `stdout`.
 402
 403        input_callback_function: Optional[Callable[[], str]], default None
 404            If provided, execute this callback when the daemon is blocking on stdin.
 405            Defaults to `sys.stdin.readline()`.
 406
 407        stop_callback_function: Optional[Callable[[SuccessTuple]], str], default None
 408            If provided, execute this callback when the daemon stops.
 409            The job's SuccessTuple will be passed to the callback.
 410
 411        stop_event: Optional[asyncio.Event], default None
 412            If provided, stop monitoring when this event is set.
 413            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
 414            from within `callback_function` to stop monitoring.
 415
 416        stop_on_exit: bool, default False
 417            If `True`, stop monitoring when the job stops.
 418
 419        strip_timestamps: bool, default False
 420            If `True`, remove leading timestamps from lines.
 421
 422        accept_input: bool, default True
 423            If `True`, accept input when the daemon blocks on stdin.
 424        """
 425        if self.executor is not None:
 426            self.executor.monitor_logs(
 427                self.name,
 428                callback_function,
 429                input_callback_function=input_callback_function,
 430                stop_callback_function=stop_callback_function,
 431                stop_on_exit=stop_on_exit,
 432                accept_input=accept_input,
 433                strip_timestamps=strip_timestamps,
 434                debug=debug,
 435            )
 436            return
 437
 438        monitor_logs_coroutine = self.monitor_logs_async(
 439            callback_function=callback_function,
 440            input_callback_function=input_callback_function,
 441            stop_callback_function=stop_callback_function,
 442            stop_event=stop_event,
 443            stop_on_exit=stop_on_exit,
 444            strip_timestamps=strip_timestamps,
 445            accept_input=accept_input,
 446            debug=debug,
 447            _logs_path=_logs_path,
 448            _log=_log,
 449            _stdin_file=_stdin_file,
 450            _wait_if_stopped=_wait_if_stopped,
 451        )
 452        return asyncio.run(monitor_logs_coroutine)
 453
 454    async def monitor_logs_async(
 455        self,
 456        callback_function: Callable[[str], None] = _default_stdout_callback,
 457        input_callback_function: Optional[Callable[[], str]] = None,
 458        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
 459        stop_event: Optional[asyncio.Event] = None,
 460        stop_on_exit: bool = False,
 461        strip_timestamps: bool = False,
 462        accept_input: bool = True,
 463        debug: bool = False,
 464        _logs_path: Optional[pathlib.Path] = None,
 465        _log=None,
 466        _stdin_file=None,
 467        _wait_if_stopped: bool = True,
 468    ):
 469        """
 470        Monitor the job's log files and await a callback on new lines.
 471
 472        Parameters
 473        ----------
 474        callback_function: Callable[[str], None], default _default_stdout_callback
 475            The callback to execute as new data comes in.
 476            Defaults to printing the output directly to `stdout`.
 477
 478        input_callback_function: Optional[Callable[[], str]], default None
 479            If provided, execute this callback when the daemon is blocking on stdin.
 480            Defaults to `sys.stdin.readline()`.
 481
 482        stop_callback_function: Optional[Callable[[SuccessTuple]], str], default None
 483            If provided, execute this callback when the daemon stops.
 484            The job's SuccessTuple will be passed to the callback.
 485
 486        stop_event: Optional[asyncio.Event], default None
 487            If provided, stop monitoring when this event is set.
 488            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
 489            from within `callback_function` to stop monitoring.
 490
 491        stop_on_exit: bool, default False
 492            If `True`, stop monitoring when the job stops.
 493
 494        strip_timestamps: bool, default False
 495            If `True`, remove leading timestamps from lines.
 496
 497        accept_input: bool, default True
 498            If `True`, accept input when the daemon blocks on stdin.
 499        """
 500        from meerschaum.utils.prompt import prompt
 501
 502        def default_input_callback_function():
 503            prompt_kwargs = self.get_prompt_kwargs(debug=debug)
 504            if prompt_kwargs:
 505                answer = prompt(**prompt_kwargs)
 506                return answer + '\n'
 507            return sys.stdin.readline()
 508
 509        if input_callback_function is None:
 510            input_callback_function = default_input_callback_function
 511
 512        if self.executor is not None:
 513            await self.executor.monitor_logs_async(
 514                self.name,
 515                callback_function,
 516                input_callback_function=input_callback_function,
 517                stop_callback_function=stop_callback_function,
 518                stop_on_exit=stop_on_exit,
 519                strip_timestamps=strip_timestamps,
 520                accept_input=accept_input,
 521                debug=debug,
 522            )
 523            return
 524
 525        from meerschaum.utils.formatting._jobs import strip_timestamp_from_line
 526
 527        events = {
 528            'user': stop_event,
 529            'stopped': asyncio.Event(),
 530            'stop_token': asyncio.Event(),
 531            'stop_exception': asyncio.Event(),
 532            'stopped_timeout': asyncio.Event(),
 533        }
 534        combined_event = asyncio.Event()
 535        emitted_text = False
 536        stdin_file = _stdin_file if _stdin_file is not None else self.daemon.stdin_file
 537
 538        async def check_job_status():
 539            if not stop_on_exit:
 540                return
 541
 542            nonlocal emitted_text
 543
 544            sleep_time = 0.1
 545            while sleep_time < 0.2:
 546                if self.status == 'stopped':
 547                    if not emitted_text and _wait_if_stopped:
 548                        await asyncio.sleep(sleep_time)
 549                        sleep_time = round(sleep_time * 1.1, 3)
 550                        continue
 551
 552                    if stop_callback_function is not None:
 553                        try:
 554                            if asyncio.iscoroutinefunction(stop_callback_function):
 555                                await stop_callback_function(self.result)
 556                            else:
 557                                stop_callback_function(self.result)
 558                        except asyncio.exceptions.CancelledError:
 559                            break
 560                        except Exception:
 561                            warn(traceback.format_exc())
 562
 563                    if stop_on_exit:
 564                        events['stopped'].set()
 565
 566                    break
 567                await asyncio.sleep(0.1)
 568
 569            events['stopped_timeout'].set()
 570
 571        async def check_blocking_on_input():
 572            while True:
 573                if not emitted_text or not self.is_blocking_on_stdin():
 574                    try:
 575                        await asyncio.sleep(self.refresh_seconds)
 576                    except asyncio.exceptions.CancelledError:
 577                        break
 578                    continue
 579
 580                if not self.is_running():
 581                    break
 582
 583                await emit_latest_lines()
 584
 585                try:
 586                    print('', end='', flush=True)
 587                    if asyncio.iscoroutinefunction(input_callback_function):
 588                        data = await input_callback_function()
 589                    else:
 590                        loop = asyncio.get_running_loop()
 591                        data = await loop.run_in_executor(None, input_callback_function)
 592                except KeyboardInterrupt:
 593                    break
 594                #  if not data.endswith('\n'):
 595                    #  data += '\n'
 596
 597                stdin_file.write(data)
 598                await asyncio.sleep(self.refresh_seconds)
 599
 600        async def combine_events():
 601            event_tasks = [
 602                asyncio.create_task(event.wait())
 603                for event in events.values()
 604                if event is not None
 605            ]
 606            if not event_tasks:
 607                return
 608
 609            try:
 610                done, pending = await asyncio.wait(
 611                    event_tasks,
 612                    return_when=asyncio.FIRST_COMPLETED,
 613                )
 614                for task in pending:
 615                    task.cancel()
 616            except asyncio.exceptions.CancelledError:
 617                pass
 618            finally:
 619                combined_event.set()
 620
 621        check_job_status_task = asyncio.create_task(check_job_status())
 622        check_blocking_on_input_task = asyncio.create_task(check_blocking_on_input())
 623        combine_events_task = asyncio.create_task(combine_events())
 624
 625        log = _log if _log is not None else self.daemon.rotating_log
 626        lines_to_show = (
 627            self.daemon.properties.get(
 628                'logs', {}
 629            ).get(
 630                'lines_to_show', get_config('jobs', 'logs', 'lines_to_show')
 631            )
 632        )
 633
 634        async def emit_latest_lines():
 635            nonlocal emitted_text
 636            nonlocal stop_event
 637            lines = log.readlines()
 638            for line in lines[(-1 * lines_to_show):]:
 639                if stop_event is not None and stop_event.is_set():
 640                    return
 641
 642                line_stripped_extra = strip_timestamp_from_line(line.strip())
 643                line_stripped = strip_timestamp_from_line(line)
 644
 645                if line_stripped_extra == STOP_TOKEN:
 646                    events['stop_token'].set()
 647                    return
 648
 649                if line_stripped_extra == CLEAR_TOKEN:
 650                    clear_screen(debug=debug)
 651                    continue
 652
 653                if line_stripped_extra == FLUSH_TOKEN.strip():
 654                    line_stripped = ''
 655                    line = ''
 656
 657                if strip_timestamps:
 658                    line = line_stripped
 659
 660                try:
 661                    if asyncio.iscoroutinefunction(callback_function):
 662                        await callback_function(line)
 663                    else:
 664                        callback_function(line)
 665                    emitted_text = True
 666                except StopMonitoringLogs:
 667                    events['stop_exception'].set()
 668                    return
 669                except Exception:
 670                    warn(f"Error in logs callback:\n{traceback.format_exc()}")
 671
 672        await emit_latest_lines()
 673
 674        tasks = (
 675            [check_job_status_task]
 676            + ([check_blocking_on_input_task] if accept_input else [])
 677            + [combine_events_task]
 678        )
 679        try:
 680            _ = asyncio.gather(*tasks, return_exceptions=True)
 681        except asyncio.exceptions.CancelledError:
 682            raise
 683        except Exception:
 684            warn(f"Failed to run async checks:\n{traceback.format_exc()}")
 685
 686        watchfiles = mrsm.attempt_import('watchfiles', lazy=False)
 687        dir_path_to_monitor = (
 688            _logs_path
 689            or (log.file_path.parent if log else None)
 690            or paths.LOGS_RESOURCES_PATH
 691        )
 692        async for changes in watchfiles.awatch(
 693            dir_path_to_monitor,
 694            stop_event=combined_event,
 695        ):
 696            for change in changes:
 697                file_path_str = change[1]
 698                file_path = pathlib.Path(file_path_str)
 699                latest_subfile_path = log.get_latest_subfile_path()
 700                if latest_subfile_path != file_path:
 701                    continue
 702
 703                await emit_latest_lines()
 704
 705        await emit_latest_lines()
 706
 707    def is_blocking_on_stdin(self, debug: bool = False) -> bool:
 708        """
 709        Return whether a job's daemon is blocking on stdin.
 710        """
 711        if self.executor is not None:
 712            return self.executor.get_job_is_blocking_on_stdin(self.name, debug=debug)
 713
 714        return self.is_running() and self.daemon.blocking_stdin_file_path.exists()
 715
 716    def get_prompt_kwargs(self, debug: bool = False) -> Dict[str, Any]:
 717        """
 718        Return the kwargs to the blocking `prompt()`, if available.
 719        """
 720        if self.executor is not None:
 721            return self.executor.get_job_prompt_kwargs(self.name, debug=debug)
 722
 723        if not self.daemon.prompt_kwargs_file_path.exists():
 724            return {}
 725
 726        try:
 727            with open(self.daemon.prompt_kwargs_file_path, 'r', encoding='utf-8') as f:
 728                prompt_kwargs = json.load(f)
 729
 730            return prompt_kwargs
 731        
 732        except Exception:
 733            import traceback
 734            traceback.print_exc()
 735            return {}
 736
 737    def write_stdin(self, data):
 738        """
 739        Write to a job's daemon's `stdin`.
 740        """
 741        self.daemon.stdin_file.write(data)
 742
 743    @property
 744    def executor(self) -> Union[Executor, None]:
 745        """
 746        If the job is remote, return the connector to the remote API instance.
 747        """
 748        return (
 749            mrsm.get_connector(self.executor_keys)
 750            if self.executor_keys != 'local'
 751            else None
 752        )
 753
 754    @property
 755    def status(self) -> str:
 756        """
 757        Return the running status of the job's daemon.
 758        """
 759        if '_status_hook' in self.__dict__:
 760            return self._status_hook()
 761
 762        if self.executor is not None:
 763            return self.executor.get_job_status(self.name)
 764
 765        return self.daemon.status
 766
 767    @property
 768    def pid(self) -> Union[int, None]:
 769        """
 770        Return the PID of the job's dameon.
 771        """
 772        if self.executor is not None:
 773            return self.executor.get_job_metadata(self.name).get('daemon', {}).get('pid', None)
 774
 775        return self.daemon.pid
 776
 777    @property
 778    def restart(self) -> bool:
 779        """
 780        Return whether to restart a stopped job.
 781        """
 782        if self.executor is not None:
 783            return self.executor.get_job_metadata(self.name).get('restart', False)
 784
 785        return self.daemon.properties.get('restart', False)
 786
 787    @property
 788    def result(self) -> SuccessTuple:
 789        """
 790        Return the `SuccessTuple` when the job has terminated.
 791        """
 792        if self.is_running():
 793            return True, f"{self} is running."
 794
 795        if '_result_hook' in self.__dict__:
 796            return self._result_hook()
 797
 798        if self.executor is not None:
 799            return (
 800                self.executor.get_job_metadata(self.name)
 801                .get('result', (False, "No result available."))
 802            )
 803
 804        _result = self.daemon.properties.get('result', None)
 805        if _result is None:
 806            from meerschaum.utils.daemon.Daemon import _results
 807            return _results.get(self.daemon.daemon_id, (False, "No result available."))
 808
 809        return tuple(_result)
 810
 811    @property
 812    def sysargs(self) -> List[str]:
 813        """
 814        Return the sysargs to use for the Daemon.
 815        """
 816        if self._sysargs:
 817            return self._sysargs
 818
 819        if self.executor is not None:
 820            return self.executor.get_job_metadata(self.name).get('sysargs', [])
 821
 822        target_args = self.daemon.target_args
 823        if target_args is None:
 824            return []
 825        self._sysargs = target_args[0] if len(target_args) > 0 else []
 826        return self._sysargs
 827
 828    def get_daemon_properties(self) -> Dict[str, Any]:
 829        """
 830        Return the `properties` dictionary for the job's daemon.
 831        """
 832        remote_properties = (
 833            {}
 834            if self.executor is None
 835            else self.executor.get_job_properties(self.name)
 836        )
 837        return {
 838            **remote_properties,
 839            **self._properties_patch
 840        }
 841
 842    @property
 843    def daemon(self) -> 'Daemon':
 844        """
 845        Return the daemon which this job manages.
 846        """
 847        from meerschaum.utils.daemon import Daemon
 848        if self._daemon is not None and self.executor is None and self._sysargs:
 849            return self._daemon
 850
 851        self._daemon = Daemon(
 852            target=entry,
 853            target_args=[self._sysargs],
 854            target_kw={},
 855            daemon_id=self.name,
 856            label=shlex.join(self._sysargs),
 857            properties=self.get_daemon_properties(),
 858        )
 859        if '_rotating_log' in self.__dict__:
 860            self._daemon._rotating_log = self._rotating_log
 861
 862        if '_stdin_file' in self.__dict__:
 863            self._daemon._stdin_file = self._stdin_file
 864            self._daemon._blocking_stdin_file_path = self._stdin_file.blocking_file_path
 865
 866        return self._daemon
 867
 868    @property
 869    def began(self) -> Union[datetime, None]:
 870        """
 871        The datetime when the job began running.
 872        """
 873        if self.executor is not None:
 874            began_str = self.executor.get_job_began(self.name)
 875            if began_str is None:
 876                return None
 877            return (
 878                datetime.fromisoformat(began_str)
 879                .astimezone(timezone.utc)
 880                .replace(tzinfo=None)
 881            )
 882
 883        began_str = self.daemon.properties.get('process', {}).get('began', None)
 884        if began_str is None:
 885            return None
 886
 887        return datetime.fromisoformat(began_str)
 888
 889    @property
 890    def ended(self) -> Union[datetime, None]:
 891        """
 892        The datetime when the job stopped running.
 893        """
 894        if self.executor is not None:
 895            ended_str = self.executor.get_job_ended(self.name)
 896            if ended_str is None:
 897                return None
 898            return (
 899                datetime.fromisoformat(ended_str)
 900                .astimezone(timezone.utc)
 901                .replace(tzinfo=None)
 902            )
 903
 904        ended_str = self.daemon.properties.get('process', {}).get('ended', None)
 905        if ended_str is None:
 906            return None
 907
 908        return datetime.fromisoformat(ended_str)
 909
 910    @property
 911    def paused(self) -> Union[datetime, None]:
 912        """
 913        The datetime when the job was suspended while running.
 914        """
 915        if self.executor is not None:
 916            paused_str = self.executor.get_job_paused(self.name)
 917            if paused_str is None:
 918                return None
 919            return (
 920                datetime.fromisoformat(paused_str)
 921                .astimezone(timezone.utc)
 922                .replace(tzinfo=None)
 923            )
 924
 925        paused_str = self.daemon.properties.get('process', {}).get('paused', None)
 926        if paused_str is None:
 927            return None
 928
 929        return datetime.fromisoformat(paused_str)
 930
 931    @property
 932    def stop_time(self) -> Union[datetime, None]:
 933        """
 934        Return the timestamp when the job was manually stopped.
 935        """
 936        if self.executor is not None:
 937            return self.executor.get_job_stop_time(self.name)
 938
 939        if not self.daemon.stop_path.exists():
 940            return None
 941
 942        stop_data = self.daemon._read_stop_file()
 943        if not stop_data:
 944            return None
 945
 946        stop_time_str = stop_data.get('stop_time', None)
 947        if not stop_time_str:
 948            warn(f"Could not read stop time for {self}.")
 949            return None
 950
 951        return datetime.fromisoformat(stop_time_str)
 952
 953    @property
 954    def hidden(self) -> bool:
 955        """
 956        Return a bool indicating whether this job should be displayed.
 957        """
 958        return (
 959            self.name.startswith('_')
 960            or self.name.startswith('.')
 961            or self._is_externally_managed
 962        )
 963
 964    def check_restart(self) -> SuccessTuple:
 965        """
 966        If `restart` is `True` and the daemon is not running,
 967        restart the job.
 968        Do not restart if the job was manually stopped.
 969        """
 970        if self.is_running():
 971            return True, f"{self} is running."
 972
 973        if not self.restart:
 974            return True, f"{self} does not need to be restarted."
 975
 976        if self.stop_time is not None:
 977            return True, f"{self} was manually stopped."
 978
 979        return self.start()
 980
 981    @property
 982    def label(self) -> str:
 983        """
 984        Return the job's Daemon label (joined sysargs).
 985        """
 986        from meerschaum._internal.arguments import compress_pipeline_sysargs
 987        sysargs = compress_pipeline_sysargs(self.sysargs)
 988        return shlex.join(sysargs).replace(' + ', '\n+ ').replace(' : ', '\n: ').lstrip().rstrip()
 989
 990    @property
 991    def _externally_managed_file(self) -> pathlib.Path:
 992        """
 993        Return the path to the externally managed file.
 994        """
 995        return self.daemon.path / '.externally-managed'
 996
 997    def _set_externally_managed(self):
 998        """
 999        Set this job as externally managed.
1000        """
1001        self._externally_managed = True
1002        try:
1003            self._externally_managed_file.parent.mkdir(exist_ok=True, parents=True)
1004            self._externally_managed_file.touch()
1005        except Exception as e:
1006            warn(e)
1007
1008    @property
1009    def _is_externally_managed(self) -> bool:
1010        """
1011        Return whether this job is externally managed.
1012        """
1013        return self.executor_keys in (None, 'local') and (
1014            self._externally_managed or self._externally_managed_file.exists()
1015        )
1016
1017    @property
1018    def env(self) -> Dict[str, str]:
1019        """
1020        Return the environment variables to set for the job's process.
1021        """
1022        if '_env' in self.__dict__:
1023            return self.__dict__['_env']
1024
1025        _env = self.daemon.properties.get('env', {})
1026        default_env = {
1027            'PYTHONUNBUFFERED': '1',
1028            'LINES': str(get_config('jobs', 'terminal', 'lines')),
1029            'COLUMNS': str(get_config('jobs', 'terminal', 'columns')),
1030            STATIC_CONFIG['environment']['noninteractive']: 'true',
1031        }
1032        self._env = {**default_env, **_env}
1033        return self._env
1034
1035    @property
1036    def delete_after_completion(self) -> bool:
1037        """
1038        Return whether this job is configured to delete itself after completion.
1039        """
1040        if '_delete_after_completion' in self.__dict__:
1041            return self.__dict__.get('_delete_after_completion', False)
1042
1043        self._delete_after_completion = self.daemon.properties.get('delete_after_completion', False)
1044        return self._delete_after_completion
1045
1046    def __str__(self) -> str:
1047        sysargs = self.sysargs
1048        sysargs_str = shlex.join(sysargs) if sysargs else ''
1049        job_str = f'Job("{self.name}"'
1050        if sysargs_str:
1051            job_str += f', "{sysargs_str}"'
1052
1053        job_str += ')'
1054        return job_str
1055
1056    def __repr__(self) -> str:
1057        return str(self)
1058
1059    def __hash__(self) -> int:
1060        return hash(self.name)

Manage a meerschaum.utils.daemon.Daemon, locally or remotely via the API.

Job( name: str, sysargs: Union[List[str], str, NoneType] = None, env: Optional[Dict[str, str]] = None, executor_keys: Optional[str] = None, delete_after_completion: bool = False, refresh_seconds: Union[int, float, NoneType] = None, _properties: Optional[Dict[str, Any]] = None, _rotating_log=None, _stdin_file=None, _status_hook: Optional[Callable[[], str]] = None, _result_hook: Optional[Callable[[], Tuple[bool, str]]] = None, _externally_managed: bool = False)
 75    def __init__(
 76        self,
 77        name: str,
 78        sysargs: Union[List[str], str, None] = None,
 79        env: Optional[Dict[str, str]] = None,
 80        executor_keys: Optional[str] = None,
 81        delete_after_completion: bool = False,
 82        refresh_seconds: Union[int, float, None] = None,
 83        _properties: Optional[Dict[str, Any]] = None,
 84        _rotating_log=None,
 85        _stdin_file=None,
 86        _status_hook: Optional[Callable[[], str]] = None,
 87        _result_hook: Optional[Callable[[], SuccessTuple]] = None,
 88        _externally_managed: bool = False,
 89    ):
 90        """
 91        Create a new job to manage a `meerschaum.utils.daemon.Daemon`.
 92
 93        Parameters
 94        ----------
 95        name: str
 96            The name of the job to be created.
 97            This will also be used as the Daemon ID.
 98
 99        sysargs: Union[List[str], str, None], default None
100            The sysargs of the command to be executed, e.g. 'start api'.
101
102        env: Optional[Dict[str, str]], default None
103            If provided, set these environment variables in the job's process.
104
105        executor_keys: Optional[str], default None
106            If provided, execute the job remotely on an API instance, e.g. 'api:main'.
107
108        delete_after_completion: bool, default False
109            If `True`, delete this job when it has finished executing.
110
111        refresh_seconds: Union[int, float, None], default None
112            The number of seconds to sleep between refreshes.
113            Defaults to the configured value `system.cli.refresh_seconds`.
114
115        _properties: Optional[Dict[str, Any]], default None
116            If provided, use this to patch the daemon's properties.
117        """
118        from meerschaum.utils.daemon import Daemon
119        for char in BANNED_CHARS:
120            if char in name:
121                raise ValueError(f"Invalid name: ({char}) is not allowed.")
122
123        if isinstance(sysargs, str):
124            sysargs = shlex.split(sysargs)
125
126        and_key = STATIC_CONFIG['system']['arguments']['and_key']
127        escaped_and_key = STATIC_CONFIG['system']['arguments']['escaped_and_key']
128        if sysargs:
129            sysargs = [
130                (arg if arg != escaped_and_key else and_key)
131                for arg in sysargs
132            ]
133
134        ### NOTE: 'local' and 'systemd' executors are being coalesced.
135        if executor_keys is None:
136            from meerschaum.jobs import get_executor_keys_from_context
137            executor_keys = get_executor_keys_from_context()
138
139        self.executor_keys = executor_keys
140        self.name = name
141        self.refresh_seconds = (
142            refresh_seconds
143            if refresh_seconds is not None
144            else mrsm.get_config('system', 'cli', 'refresh_seconds')
145        )
146        try:
147            self._daemon = (
148                Daemon(daemon_id=name)
149                if executor_keys == 'local'
150                else None
151            )
152        except Exception:
153            self._daemon = None
154
155        ### Handle any injected dependencies.
156        if _rotating_log is not None:
157            self._rotating_log = _rotating_log
158            if self._daemon is not None:
159                self._daemon._rotating_log = _rotating_log
160
161        if _stdin_file is not None:
162            self._stdin_file = _stdin_file
163            if self._daemon is not None:
164                self._daemon._stdin_file = _stdin_file
165                self._daemon._blocking_stdin_file_path = _stdin_file.blocking_file_path
166
167        if _status_hook is not None:
168            self._status_hook = _status_hook
169
170        if _result_hook is not None:
171            self._result_hook = _result_hook
172
173        self._externally_managed = _externally_managed
174        self._properties_patch = _properties or {}
175        if _externally_managed:
176            self._properties_patch.update({'externally_managed': _externally_managed})
177
178        if env:
179            self._properties_patch.update({'env': env})
180
181        if delete_after_completion:
182            self._properties_patch.update({'delete_after_completion': delete_after_completion})
183
184        daemon_sysargs = (
185            self._daemon.properties.get('target', {}).get('args', [None])[0]
186            if self._daemon is not None
187            else None
188        )
189
190        if daemon_sysargs and sysargs and daemon_sysargs != sysargs:
191            warn("Given sysargs differ from existing sysargs.")
192
193        self._sysargs = [
194            arg
195            for arg in (daemon_sysargs or sysargs or [])
196            if arg not in ('-d', '--daemon')
197        ]
198        for restart_flag in RESTART_FLAGS:
199            if restart_flag in self._sysargs:
200                self._properties_patch.update({'restart': True})
201                break

Create a new job to manage a meerschaum.utils.daemon.Daemon.

Parameters
  • name (str): The name of the job to be created. This will also be used as the Daemon ID.
  • sysargs (Union[List[str], str, None], default None): The sysargs of the command to be executed, e.g. 'start api'.
  • env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the job's process.
  • executor_keys (Optional[str], default None): If provided, execute the job remotely on an API instance, e.g. 'api:main'.
  • delete_after_completion (bool, default False): If True, delete this job when it has finished executing.
  • refresh_seconds (Union[int, float, None], default None): The number of seconds to sleep between refreshes. Defaults to the configured value system.cli.refresh_seconds.
  • _properties (Optional[Dict[str, Any]], default None): If provided, use this to patch the daemon's properties.
executor_keys
name
refresh_seconds
@staticmethod
def from_pid( pid: int, executor_keys: Optional[str] = None) -> Job:
203    @staticmethod
204    def from_pid(pid: int, executor_keys: Optional[str] = None) -> Job:
205        """
206        Build a `Job` from the PID of a running Meerschaum process.
207
208        Parameters
209        ----------
210        pid: int
211            The PID of the process.
212
213        executor_keys: Optional[str], default None
214            The executor keys to assign to the job.
215        """
216        psutil = mrsm.attempt_import('psutil')
217        try:
218            process = psutil.Process(pid)
219        except psutil.NoSuchProcess as e:
220            warn(f"Process with PID {pid} does not exist.", stack=False)
221            raise e
222
223        command_args = process.cmdline()
224        is_daemon = command_args[1] == '-c'
225
226        if is_daemon:
227            daemon_id = command_args[-1].split('daemon_id=')[-1].split(')')[0].replace("'", '')
228            root_dir = process.environ().get(STATIC_CONFIG['environment']['root'], None)
229            if root_dir is None:
230                root_dir = paths.ROOT_DIR_PATH
231            else:
232                root_dir = pathlib.Path(root_dir)
233            jobs_dir = root_dir / paths.DAEMON_RESOURCES_PATH.name
234            daemon_dir = jobs_dir / daemon_id
235            pid_file = daemon_dir / 'process.pid'
236
237            if pid_file.exists():
238                with open(pid_file, 'r', encoding='utf-8') as f:
239                    daemon_pid = int(f.read())
240
241                if pid != daemon_pid:
242                    raise EnvironmentError(f"Differing PIDs: {pid=}, {daemon_pid=}")
243            else:
244                raise EnvironmentError(f"Is job '{daemon_id}' running?")
245
246            return Job(daemon_id, executor_keys=executor_keys)
247
248        from meerschaum._internal.arguments._parse_arguments import parse_arguments
249        from meerschaum.utils.daemon import get_new_daemon_name
250
251        mrsm_ix = 0
252        for i, arg in enumerate(command_args):
253            if 'mrsm' in arg or 'meerschaum' in arg.lower():
254                mrsm_ix = i
255                break
256
257        sysargs = command_args[mrsm_ix+1:]
258        kwargs = parse_arguments(sysargs)
259        name = kwargs.get('name', get_new_daemon_name())
260        return Job(name, sysargs, executor_keys=executor_keys)

Build a Job from the PID of a running Meerschaum process.

Parameters
  • pid (int): The PID of the process.
  • executor_keys (Optional[str], default None): The executor keys to assign to the job.
def start(self, debug: bool = False) -> Tuple[bool, str]:
262    def start(self, debug: bool = False) -> SuccessTuple:
263        """
264        Start the job's daemon.
265        """
266        if self.executor is not None:
267            if not self.exists(debug=debug):
268                return self.executor.create_job(
269                    self.name,
270                    self.sysargs,
271                    properties=self.daemon.properties,
272                    debug=debug,
273                )
274            return self.executor.start_job(self.name, debug=debug)
275
276        if self.is_running():
277            return True, f"{self} is already running."
278
279        success, msg = self.daemon.run(
280            keep_daemon_output=(not self.delete_after_completion),
281            allow_dirty_run=True,
282        )
283        if not success:
284            return success, msg
285
286        return success, f"Started {self}."

Start the job's daemon.

def stop( self, timeout_seconds: Union[int, float, NoneType] = None, debug: bool = False) -> Tuple[bool, str]:
288    def stop(
289        self,
290        timeout_seconds: Union[int, float, None] = None,
291        debug: bool = False,
292    ) -> SuccessTuple:
293        """
294        Stop the job's daemon.
295        """
296        if self.executor is not None:
297            return self.executor.stop_job(self.name, debug=debug)
298
299        if self.daemon.status == 'stopped':
300            if not self.restart:
301                return True, f"{self} is not running."
302            elif self.stop_time is not None:
303                return True, f"{self} will not restart until manually started."
304
305        quit_success, quit_msg = self.daemon.quit(timeout=timeout_seconds)
306        if quit_success:
307            return quit_success, f"Stopped {self}."
308
309        warn(
310            f"Failed to gracefully quit {self}.",
311            stack=False,
312        )
313        kill_success, kill_msg = self.daemon.kill(timeout=timeout_seconds)
314        if not kill_success:
315            return kill_success, kill_msg
316
317        return kill_success, f"Killed {self}."

Stop the job's daemon.

def pause( self, timeout_seconds: Union[int, float, NoneType] = None, debug: bool = False) -> Tuple[bool, str]:
319    def pause(
320        self,
321        timeout_seconds: Union[int, float, None] = None,
322        debug: bool = False,
323    ) -> SuccessTuple:
324        """
325        Pause the job's daemon.
326        """
327        if self.executor is not None:
328            return self.executor.pause_job(self.name, debug=debug)
329
330        pause_success, pause_msg = self.daemon.pause(timeout=timeout_seconds)
331        if not pause_success:
332            return pause_success, pause_msg
333
334        return pause_success, f"Paused {self}."

Pause the job's daemon.

def delete(self, debug: bool = False) -> Tuple[bool, str]:
336    def delete(self, debug: bool = False) -> SuccessTuple:
337        """
338        Delete the job and its daemon.
339        """
340        if self.executor is not None:
341            return self.executor.delete_job(self.name, debug=debug)
342
343        if self.is_running():
344            stop_success, stop_msg = self.stop()
345            if not stop_success:
346                return stop_success, stop_msg
347
348        cleanup_success, cleanup_msg = self.daemon.cleanup()
349        if not cleanup_success:
350            return cleanup_success, cleanup_msg
351
352        _ = self.daemon._properties.pop('result', None)
353        return cleanup_success, f"Deleted {self}."

Delete the job and its daemon.

def is_running(self) -> bool:
355    def is_running(self) -> bool:
356        """
357        Determine whether the job's daemon is running.
358        """
359        return self.status == 'running'

Determine whether the job's daemon is running.

def exists(self, debug: bool = False) -> bool:
361    def exists(self, debug: bool = False) -> bool:
362        """
363        Determine whether the job exists.
364        """
365        if self.executor is not None:
366            return self.executor.get_job_exists(self.name, debug=debug)
367
368        return self.daemon.path.exists()

Determine whether the job exists.

def get_logs(self) -> Optional[str]:
370    def get_logs(self) -> Union[str, None]:
371        """
372        Return the output text of the job's daemon.
373        """
374        if self.executor is not None:
375            return self.executor.get_logs(self.name)
376
377        return self.daemon.log_text

Return the output text of the job's daemon.

def monitor_logs( self, callback_function: Callable[[str], NoneType] = <function _default_stdout_callback>, input_callback_function: Optional[Callable[[], str]] = None, stop_callback_function: Optional[Callable[[Tuple[bool, str]], NoneType]] = None, stop_event: Optional[asyncio.locks.Event] = None, stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False, _logs_path: Optional[pathlib.Path] = None, _log=None, _stdin_file=None, _wait_if_stopped: bool = True):
379    def monitor_logs(
380        self,
381        callback_function: Callable[[str], None] = _default_stdout_callback,
382        input_callback_function: Optional[Callable[[], str]] = None,
383        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
384        stop_event: Optional[asyncio.Event] = None,
385        stop_on_exit: bool = False,
386        strip_timestamps: bool = False,
387        accept_input: bool = True,
388        debug: bool = False,
389        _logs_path: Optional[pathlib.Path] = None,
390        _log=None,
391        _stdin_file=None,
392        _wait_if_stopped: bool = True,
393    ):
394        """
395        Monitor the job's log files and execute a callback on new lines.
396
397        Parameters
398        ----------
399        callback_function: Callable[[str], None], default partial(print, end='')
400            The callback to execute as new data comes in.
401            Defaults to printing the output directly to `stdout`.
402
403        input_callback_function: Optional[Callable[[], str]], default None
404            If provided, execute this callback when the daemon is blocking on stdin.
405            Defaults to `sys.stdin.readline()`.
406
407        stop_callback_function: Optional[Callable[[SuccessTuple]], str], default None
408            If provided, execute this callback when the daemon stops.
409            The job's SuccessTuple will be passed to the callback.
410
411        stop_event: Optional[asyncio.Event], default None
412            If provided, stop monitoring when this event is set.
413            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
414            from within `callback_function` to stop monitoring.
415
416        stop_on_exit: bool, default False
417            If `True`, stop monitoring when the job stops.
418
419        strip_timestamps: bool, default False
420            If `True`, remove leading timestamps from lines.
421
422        accept_input: bool, default True
423            If `True`, accept input when the daemon blocks on stdin.
424        """
425        if self.executor is not None:
426            self.executor.monitor_logs(
427                self.name,
428                callback_function,
429                input_callback_function=input_callback_function,
430                stop_callback_function=stop_callback_function,
431                stop_on_exit=stop_on_exit,
432                accept_input=accept_input,
433                strip_timestamps=strip_timestamps,
434                debug=debug,
435            )
436            return
437
438        monitor_logs_coroutine = self.monitor_logs_async(
439            callback_function=callback_function,
440            input_callback_function=input_callback_function,
441            stop_callback_function=stop_callback_function,
442            stop_event=stop_event,
443            stop_on_exit=stop_on_exit,
444            strip_timestamps=strip_timestamps,
445            accept_input=accept_input,
446            debug=debug,
447            _logs_path=_logs_path,
448            _log=_log,
449            _stdin_file=_stdin_file,
450            _wait_if_stopped=_wait_if_stopped,
451        )
452        return asyncio.run(monitor_logs_coroutine)

Monitor the job's log files and execute a callback on new lines.

Parameters
  • callback_function (Callable[[str], None], default partial(print, end='')): The callback to execute as new data comes in. Defaults to printing the output directly to stdout.
  • input_callback_function (Optional[Callable[[], str]], default None): If provided, execute this callback when the daemon is blocking on stdin. Defaults to sys.stdin.readline().
  • stop_callback_function (Optional[Callable[[SuccessTuple]], str], default None): If provided, execute this callback when the daemon stops. The job's SuccessTuple will be passed to the callback.
  • stop_event (Optional[asyncio.Event], default None): If provided, stop monitoring when this event is set. You may instead raise meerschaum.jobs.StopMonitoringLogs from within callback_function to stop monitoring.
  • stop_on_exit (bool, default False): If True, stop monitoring when the job stops.
  • strip_timestamps (bool, default False): If True, remove leading timestamps from lines.
  • accept_input (bool, default True): If True, accept input when the daemon blocks on stdin.
async def monitor_logs_async( self, callback_function: Callable[[str], NoneType] = <function _default_stdout_callback>, input_callback_function: Optional[Callable[[], str]] = None, stop_callback_function: Optional[Callable[[Tuple[bool, str]], NoneType]] = None, stop_event: Optional[asyncio.locks.Event] = None, stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False, _logs_path: Optional[pathlib.Path] = None, _log=None, _stdin_file=None, _wait_if_stopped: bool = True):
454    async def monitor_logs_async(
455        self,
456        callback_function: Callable[[str], None] = _default_stdout_callback,
457        input_callback_function: Optional[Callable[[], str]] = None,
458        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
459        stop_event: Optional[asyncio.Event] = None,
460        stop_on_exit: bool = False,
461        strip_timestamps: bool = False,
462        accept_input: bool = True,
463        debug: bool = False,
464        _logs_path: Optional[pathlib.Path] = None,
465        _log=None,
466        _stdin_file=None,
467        _wait_if_stopped: bool = True,
468    ):
469        """
470        Monitor the job's log files and await a callback on new lines.
471
472        Parameters
473        ----------
474        callback_function: Callable[[str], None], default _default_stdout_callback
475            The callback to execute as new data comes in.
476            Defaults to printing the output directly to `stdout`.
477
478        input_callback_function: Optional[Callable[[], str]], default None
479            If provided, execute this callback when the daemon is blocking on stdin.
480            Defaults to `sys.stdin.readline()`.
481
482        stop_callback_function: Optional[Callable[[SuccessTuple]], str], default None
483            If provided, execute this callback when the daemon stops.
484            The job's SuccessTuple will be passed to the callback.
485
486        stop_event: Optional[asyncio.Event], default None
487            If provided, stop monitoring when this event is set.
488            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
489            from within `callback_function` to stop monitoring.
490
491        stop_on_exit: bool, default False
492            If `True`, stop monitoring when the job stops.
493
494        strip_timestamps: bool, default False
495            If `True`, remove leading timestamps from lines.
496
497        accept_input: bool, default True
498            If `True`, accept input when the daemon blocks on stdin.
499        """
500        from meerschaum.utils.prompt import prompt
501
502        def default_input_callback_function():
503            prompt_kwargs = self.get_prompt_kwargs(debug=debug)
504            if prompt_kwargs:
505                answer = prompt(**prompt_kwargs)
506                return answer + '\n'
507            return sys.stdin.readline()
508
509        if input_callback_function is None:
510            input_callback_function = default_input_callback_function
511
512        if self.executor is not None:
513            await self.executor.monitor_logs_async(
514                self.name,
515                callback_function,
516                input_callback_function=input_callback_function,
517                stop_callback_function=stop_callback_function,
518                stop_on_exit=stop_on_exit,
519                strip_timestamps=strip_timestamps,
520                accept_input=accept_input,
521                debug=debug,
522            )
523            return
524
525        from meerschaum.utils.formatting._jobs import strip_timestamp_from_line
526
527        events = {
528            'user': stop_event,
529            'stopped': asyncio.Event(),
530            'stop_token': asyncio.Event(),
531            'stop_exception': asyncio.Event(),
532            'stopped_timeout': asyncio.Event(),
533        }
534        combined_event = asyncio.Event()
535        emitted_text = False
536        stdin_file = _stdin_file if _stdin_file is not None else self.daemon.stdin_file
537
538        async def check_job_status():
539            if not stop_on_exit:
540                return
541
542            nonlocal emitted_text
543
544            sleep_time = 0.1
545            while sleep_time < 0.2:
546                if self.status == 'stopped':
547                    if not emitted_text and _wait_if_stopped:
548                        await asyncio.sleep(sleep_time)
549                        sleep_time = round(sleep_time * 1.1, 3)
550                        continue
551
552                    if stop_callback_function is not None:
553                        try:
554                            if asyncio.iscoroutinefunction(stop_callback_function):
555                                await stop_callback_function(self.result)
556                            else:
557                                stop_callback_function(self.result)
558                        except asyncio.exceptions.CancelledError:
559                            break
560                        except Exception:
561                            warn(traceback.format_exc())
562
563                    if stop_on_exit:
564                        events['stopped'].set()
565
566                    break
567                await asyncio.sleep(0.1)
568
569            events['stopped_timeout'].set()
570
571        async def check_blocking_on_input():
572            while True:
573                if not emitted_text or not self.is_blocking_on_stdin():
574                    try:
575                        await asyncio.sleep(self.refresh_seconds)
576                    except asyncio.exceptions.CancelledError:
577                        break
578                    continue
579
580                if not self.is_running():
581                    break
582
583                await emit_latest_lines()
584
585                try:
586                    print('', end='', flush=True)
587                    if asyncio.iscoroutinefunction(input_callback_function):
588                        data = await input_callback_function()
589                    else:
590                        loop = asyncio.get_running_loop()
591                        data = await loop.run_in_executor(None, input_callback_function)
592                except KeyboardInterrupt:
593                    break
594                #  if not data.endswith('\n'):
595                    #  data += '\n'
596
597                stdin_file.write(data)
598                await asyncio.sleep(self.refresh_seconds)
599
600        async def combine_events():
601            event_tasks = [
602                asyncio.create_task(event.wait())
603                for event in events.values()
604                if event is not None
605            ]
606            if not event_tasks:
607                return
608
609            try:
610                done, pending = await asyncio.wait(
611                    event_tasks,
612                    return_when=asyncio.FIRST_COMPLETED,
613                )
614                for task in pending:
615                    task.cancel()
616            except asyncio.exceptions.CancelledError:
617                pass
618            finally:
619                combined_event.set()
620
621        check_job_status_task = asyncio.create_task(check_job_status())
622        check_blocking_on_input_task = asyncio.create_task(check_blocking_on_input())
623        combine_events_task = asyncio.create_task(combine_events())
624
625        log = _log if _log is not None else self.daemon.rotating_log
626        lines_to_show = (
627            self.daemon.properties.get(
628                'logs', {}
629            ).get(
630                'lines_to_show', get_config('jobs', 'logs', 'lines_to_show')
631            )
632        )
633
634        async def emit_latest_lines():
635            nonlocal emitted_text
636            nonlocal stop_event
637            lines = log.readlines()
638            for line in lines[(-1 * lines_to_show):]:
639                if stop_event is not None and stop_event.is_set():
640                    return
641
642                line_stripped_extra = strip_timestamp_from_line(line.strip())
643                line_stripped = strip_timestamp_from_line(line)
644
645                if line_stripped_extra == STOP_TOKEN:
646                    events['stop_token'].set()
647                    return
648
649                if line_stripped_extra == CLEAR_TOKEN:
650                    clear_screen(debug=debug)
651                    continue
652
653                if line_stripped_extra == FLUSH_TOKEN.strip():
654                    line_stripped = ''
655                    line = ''
656
657                if strip_timestamps:
658                    line = line_stripped
659
660                try:
661                    if asyncio.iscoroutinefunction(callback_function):
662                        await callback_function(line)
663                    else:
664                        callback_function(line)
665                    emitted_text = True
666                except StopMonitoringLogs:
667                    events['stop_exception'].set()
668                    return
669                except Exception:
670                    warn(f"Error in logs callback:\n{traceback.format_exc()}")
671
672        await emit_latest_lines()
673
674        tasks = (
675            [check_job_status_task]
676            + ([check_blocking_on_input_task] if accept_input else [])
677            + [combine_events_task]
678        )
679        try:
680            _ = asyncio.gather(*tasks, return_exceptions=True)
681        except asyncio.exceptions.CancelledError:
682            raise
683        except Exception:
684            warn(f"Failed to run async checks:\n{traceback.format_exc()}")
685
686        watchfiles = mrsm.attempt_import('watchfiles', lazy=False)
687        dir_path_to_monitor = (
688            _logs_path
689            or (log.file_path.parent if log else None)
690            or paths.LOGS_RESOURCES_PATH
691        )
692        async for changes in watchfiles.awatch(
693            dir_path_to_monitor,
694            stop_event=combined_event,
695        ):
696            for change in changes:
697                file_path_str = change[1]
698                file_path = pathlib.Path(file_path_str)
699                latest_subfile_path = log.get_latest_subfile_path()
700                if latest_subfile_path != file_path:
701                    continue
702
703                await emit_latest_lines()
704
705        await emit_latest_lines()

Monitor the job's log files and await a callback on new lines.

Parameters
  • callback_function (Callable[[str], None], default _default_stdout_callback): The callback to execute as new data comes in. Defaults to printing the output directly to stdout.
  • input_callback_function (Optional[Callable[[], str]], default None): If provided, execute this callback when the daemon is blocking on stdin. Defaults to sys.stdin.readline().
  • stop_callback_function (Optional[Callable[[SuccessTuple]], str], default None): If provided, execute this callback when the daemon stops. The job's SuccessTuple will be passed to the callback.
  • stop_event (Optional[asyncio.Event], default None): If provided, stop monitoring when this event is set. You may instead raise meerschaum.jobs.StopMonitoringLogs from within callback_function to stop monitoring.
  • stop_on_exit (bool, default False): If True, stop monitoring when the job stops.
  • strip_timestamps (bool, default False): If True, remove leading timestamps from lines.
  • accept_input (bool, default True): If True, accept input when the daemon blocks on stdin.
def is_blocking_on_stdin(self, debug: bool = False) -> bool:
707    def is_blocking_on_stdin(self, debug: bool = False) -> bool:
708        """
709        Return whether a job's daemon is blocking on stdin.
710        """
711        if self.executor is not None:
712            return self.executor.get_job_is_blocking_on_stdin(self.name, debug=debug)
713
714        return self.is_running() and self.daemon.blocking_stdin_file_path.exists()

Return whether a job's daemon is blocking on stdin.

def get_prompt_kwargs(self, debug: bool = False) -> Dict[str, Any]:
716    def get_prompt_kwargs(self, debug: bool = False) -> Dict[str, Any]:
717        """
718        Return the kwargs to the blocking `prompt()`, if available.
719        """
720        if self.executor is not None:
721            return self.executor.get_job_prompt_kwargs(self.name, debug=debug)
722
723        if not self.daemon.prompt_kwargs_file_path.exists():
724            return {}
725
726        try:
727            with open(self.daemon.prompt_kwargs_file_path, 'r', encoding='utf-8') as f:
728                prompt_kwargs = json.load(f)
729
730            return prompt_kwargs
731        
732        except Exception:
733            import traceback
734            traceback.print_exc()
735            return {}

Return the kwargs to the blocking prompt(), if available.

def write_stdin(self, data):
737    def write_stdin(self, data):
738        """
739        Write to a job's daemon's `stdin`.
740        """
741        self.daemon.stdin_file.write(data)

Write to a job's daemon's stdin.

executor: Optional[Executor]
743    @property
744    def executor(self) -> Union[Executor, None]:
745        """
746        If the job is remote, return the connector to the remote API instance.
747        """
748        return (
749            mrsm.get_connector(self.executor_keys)
750            if self.executor_keys != 'local'
751            else None
752        )

If the job is remote, return the connector to the remote API instance.

status: str
754    @property
755    def status(self) -> str:
756        """
757        Return the running status of the job's daemon.
758        """
759        if '_status_hook' in self.__dict__:
760            return self._status_hook()
761
762        if self.executor is not None:
763            return self.executor.get_job_status(self.name)
764
765        return self.daemon.status

Return the running status of the job's daemon.

pid: Optional[int]
767    @property
768    def pid(self) -> Union[int, None]:
769        """
770        Return the PID of the job's dameon.
771        """
772        if self.executor is not None:
773            return self.executor.get_job_metadata(self.name).get('daemon', {}).get('pid', None)
774
775        return self.daemon.pid

Return the PID of the job's dameon.

restart: bool
777    @property
778    def restart(self) -> bool:
779        """
780        Return whether to restart a stopped job.
781        """
782        if self.executor is not None:
783            return self.executor.get_job_metadata(self.name).get('restart', False)
784
785        return self.daemon.properties.get('restart', False)

Return whether to restart a stopped job.

result: Tuple[bool, str]
787    @property
788    def result(self) -> SuccessTuple:
789        """
790        Return the `SuccessTuple` when the job has terminated.
791        """
792        if self.is_running():
793            return True, f"{self} is running."
794
795        if '_result_hook' in self.__dict__:
796            return self._result_hook()
797
798        if self.executor is not None:
799            return (
800                self.executor.get_job_metadata(self.name)
801                .get('result', (False, "No result available."))
802            )
803
804        _result = self.daemon.properties.get('result', None)
805        if _result is None:
806            from meerschaum.utils.daemon.Daemon import _results
807            return _results.get(self.daemon.daemon_id, (False, "No result available."))
808
809        return tuple(_result)

Return the SuccessTuple when the job has terminated.

sysargs: List[str]
811    @property
812    def sysargs(self) -> List[str]:
813        """
814        Return the sysargs to use for the Daemon.
815        """
816        if self._sysargs:
817            return self._sysargs
818
819        if self.executor is not None:
820            return self.executor.get_job_metadata(self.name).get('sysargs', [])
821
822        target_args = self.daemon.target_args
823        if target_args is None:
824            return []
825        self._sysargs = target_args[0] if len(target_args) > 0 else []
826        return self._sysargs

Return the sysargs to use for the Daemon.

def get_daemon_properties(self) -> Dict[str, Any]:
828    def get_daemon_properties(self) -> Dict[str, Any]:
829        """
830        Return the `properties` dictionary for the job's daemon.
831        """
832        remote_properties = (
833            {}
834            if self.executor is None
835            else self.executor.get_job_properties(self.name)
836        )
837        return {
838            **remote_properties,
839            **self._properties_patch
840        }

Return the properties dictionary for the job's daemon.

daemon: "'Daemon'"
842    @property
843    def daemon(self) -> 'Daemon':
844        """
845        Return the daemon which this job manages.
846        """
847        from meerschaum.utils.daemon import Daemon
848        if self._daemon is not None and self.executor is None and self._sysargs:
849            return self._daemon
850
851        self._daemon = Daemon(
852            target=entry,
853            target_args=[self._sysargs],
854            target_kw={},
855            daemon_id=self.name,
856            label=shlex.join(self._sysargs),
857            properties=self.get_daemon_properties(),
858        )
859        if '_rotating_log' in self.__dict__:
860            self._daemon._rotating_log = self._rotating_log
861
862        if '_stdin_file' in self.__dict__:
863            self._daemon._stdin_file = self._stdin_file
864            self._daemon._blocking_stdin_file_path = self._stdin_file.blocking_file_path
865
866        return self._daemon

Return the daemon which this job manages.

began: Optional[datetime.datetime]
868    @property
869    def began(self) -> Union[datetime, None]:
870        """
871        The datetime when the job began running.
872        """
873        if self.executor is not None:
874            began_str = self.executor.get_job_began(self.name)
875            if began_str is None:
876                return None
877            return (
878                datetime.fromisoformat(began_str)
879                .astimezone(timezone.utc)
880                .replace(tzinfo=None)
881            )
882
883        began_str = self.daemon.properties.get('process', {}).get('began', None)
884        if began_str is None:
885            return None
886
887        return datetime.fromisoformat(began_str)

The datetime when the job began running.

ended: Optional[datetime.datetime]
889    @property
890    def ended(self) -> Union[datetime, None]:
891        """
892        The datetime when the job stopped running.
893        """
894        if self.executor is not None:
895            ended_str = self.executor.get_job_ended(self.name)
896            if ended_str is None:
897                return None
898            return (
899                datetime.fromisoformat(ended_str)
900                .astimezone(timezone.utc)
901                .replace(tzinfo=None)
902            )
903
904        ended_str = self.daemon.properties.get('process', {}).get('ended', None)
905        if ended_str is None:
906            return None
907
908        return datetime.fromisoformat(ended_str)

The datetime when the job stopped running.

paused: Optional[datetime.datetime]
910    @property
911    def paused(self) -> Union[datetime, None]:
912        """
913        The datetime when the job was suspended while running.
914        """
915        if self.executor is not None:
916            paused_str = self.executor.get_job_paused(self.name)
917            if paused_str is None:
918                return None
919            return (
920                datetime.fromisoformat(paused_str)
921                .astimezone(timezone.utc)
922                .replace(tzinfo=None)
923            )
924
925        paused_str = self.daemon.properties.get('process', {}).get('paused', None)
926        if paused_str is None:
927            return None
928
929        return datetime.fromisoformat(paused_str)

The datetime when the job was suspended while running.

stop_time: Optional[datetime.datetime]
931    @property
932    def stop_time(self) -> Union[datetime, None]:
933        """
934        Return the timestamp when the job was manually stopped.
935        """
936        if self.executor is not None:
937            return self.executor.get_job_stop_time(self.name)
938
939        if not self.daemon.stop_path.exists():
940            return None
941
942        stop_data = self.daemon._read_stop_file()
943        if not stop_data:
944            return None
945
946        stop_time_str = stop_data.get('stop_time', None)
947        if not stop_time_str:
948            warn(f"Could not read stop time for {self}.")
949            return None
950
951        return datetime.fromisoformat(stop_time_str)

Return the timestamp when the job was manually stopped.

hidden: bool
953    @property
954    def hidden(self) -> bool:
955        """
956        Return a bool indicating whether this job should be displayed.
957        """
958        return (
959            self.name.startswith('_')
960            or self.name.startswith('.')
961            or self._is_externally_managed
962        )

Return a bool indicating whether this job should be displayed.

def check_restart(self) -> Tuple[bool, str]:
964    def check_restart(self) -> SuccessTuple:
965        """
966        If `restart` is `True` and the daemon is not running,
967        restart the job.
968        Do not restart if the job was manually stopped.
969        """
970        if self.is_running():
971            return True, f"{self} is running."
972
973        if not self.restart:
974            return True, f"{self} does not need to be restarted."
975
976        if self.stop_time is not None:
977            return True, f"{self} was manually stopped."
978
979        return self.start()

If restart is True and the daemon is not running, restart the job. Do not restart if the job was manually stopped.

label: str
981    @property
982    def label(self) -> str:
983        """
984        Return the job's Daemon label (joined sysargs).
985        """
986        from meerschaum._internal.arguments import compress_pipeline_sysargs
987        sysargs = compress_pipeline_sysargs(self.sysargs)
988        return shlex.join(sysargs).replace(' + ', '\n+ ').replace(' : ', '\n: ').lstrip().rstrip()

Return the job's Daemon label (joined sysargs).

env: Dict[str, str]
1017    @property
1018    def env(self) -> Dict[str, str]:
1019        """
1020        Return the environment variables to set for the job's process.
1021        """
1022        if '_env' in self.__dict__:
1023            return self.__dict__['_env']
1024
1025        _env = self.daemon.properties.get('env', {})
1026        default_env = {
1027            'PYTHONUNBUFFERED': '1',
1028            'LINES': str(get_config('jobs', 'terminal', 'lines')),
1029            'COLUMNS': str(get_config('jobs', 'terminal', 'columns')),
1030            STATIC_CONFIG['environment']['noninteractive']: 'true',
1031        }
1032        self._env = {**default_env, **_env}
1033        return self._env

Return the environment variables to set for the job's process.

delete_after_completion: bool
1035    @property
1036    def delete_after_completion(self) -> bool:
1037        """
1038        Return whether this job is configured to delete itself after completion.
1039        """
1040        if '_delete_after_completion' in self.__dict__:
1041            return self.__dict__.get('_delete_after_completion', False)
1042
1043        self._delete_after_completion = self.daemon.properties.get('delete_after_completion', False)
1044        return self._delete_after_completion

Return whether this job is configured to delete itself after completion.

class StopMonitoringLogs(builtins.Exception):
49class StopMonitoringLogs(Exception):
50    """
51    Raise this exception to stop the logs monitoring.
52    """

Raise this exception to stop the logs monitoring.

def get_jobs( executor_keys: Optional[str] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
 37def get_jobs(
 38    executor_keys: Optional[str] = None,
 39    include_hidden: bool = False,
 40    combine_local_and_systemd: bool = True,
 41    debug: bool = False,
 42) -> Dict[str, Job]:
 43    """
 44    Return a dictionary of the existing jobs.
 45
 46    Parameters
 47    ----------
 48    executor_keys: Optional[str], default None
 49        If provided, return remote jobs on the given API instance.
 50        Otherwise return local jobs.
 51
 52    include_hidden: bool, default False
 53        If `True`, include jobs with the `hidden` attribute.
 54
 55    Returns
 56    -------
 57    A dictionary mapping job names to jobs.
 58    """
 59    from meerschaum.connectors.parse import parse_executor_keys
 60    executor_keys = executor_keys or get_executor_keys_from_context()
 61
 62    include_local_and_system = (
 63        combine_local_and_systemd
 64        and str(executor_keys).split(':', maxsplit=1)[0] in ('None', 'local', 'systemd')
 65        and get_executor_keys_from_context() == 'systemd'
 66    )
 67
 68    def _get_local_jobs():
 69        from meerschaum.utils.daemon import get_daemons
 70        daemons = get_daemons()
 71        jobs = {
 72            daemon.daemon_id: Job(name=daemon.daemon_id, executor_keys='local')
 73            for daemon in daemons
 74        }
 75        return {
 76            name: job
 77            for name, job in jobs.items()
 78            if (include_hidden or not job.hidden) and not job._is_externally_managed
 79
 80        }
 81
 82    def _get_systemd_jobs():
 83        from meerschaum.jobs.systemd import SystemdExecutor
 84        conn = SystemdExecutor('systemd')
 85        jobs = conn.get_jobs(debug=debug)
 86        return {
 87            name: job
 88            for name, job in jobs.items()
 89            if include_hidden or not job.hidden
 90        }
 91
 92    if include_local_and_system:
 93        local_jobs = _get_local_jobs()
 94        systemd_jobs = _get_systemd_jobs()
 95        shared_jobs = set(local_jobs) & set(systemd_jobs)
 96        if shared_jobs:
 97            from meerschaum.utils.misc import items_str
 98            from meerschaum.utils.warnings import warn
 99            warn(
100                "Job"
101                + ('s' if len(shared_jobs) != 1 else '')
102                + f" {items_str(list(shared_jobs))} "
103                + "exist"
104                + ('s' if len(shared_jobs) == 1 else '')
105                + " in both `local` and `systemd`.",
106                stack=False,
107            )
108        return {**local_jobs, **systemd_jobs}
109
110    if executor_keys == 'local':
111        return _get_local_jobs()
112
113    if executor_keys == 'systemd':
114        return _get_systemd_jobs()
115
116    try:
117        _ = parse_executor_keys(executor_keys, construct=False)
118        conn = parse_executor_keys(executor_keys)
119        jobs = conn.get_jobs(debug=debug)
120        return {
121            name: job
122            for name, job in jobs.items()
123            if include_hidden or not job.hidden
124        }
125    except Exception:
126        return {}

Return a dictionary of the existing jobs.

Parameters
  • executor_keys (Optional[str], default None): If provided, return remote jobs on the given API instance. Otherwise return local jobs.
  • include_hidden (bool, default False): If True, include jobs with the hidden attribute.
Returns
  • A dictionary mapping job names to jobs.
def get_filtered_jobs( executor_keys: Optional[str] = None, filter_list: Optional[List[str]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, warn: bool = False, debug: bool = False) -> Dict[str, Job]:
129def get_filtered_jobs(
130    executor_keys: Optional[str] = None,
131    filter_list: Optional[List[str]] = None,
132    include_hidden: bool = False,
133    combine_local_and_systemd: bool = True,
134    warn: bool = False,
135    debug: bool = False,
136) -> Dict[str, Job]:
137    """
138    Return a list of jobs filtered by the user.
139    """
140    from meerschaum.utils.warnings import warn as _warn
141    jobs = get_jobs(
142        executor_keys,
143        include_hidden=True,
144        combine_local_and_systemd=combine_local_and_systemd,
145        debug=debug,
146    )
147    if not filter_list:
148        return {
149            name: job
150            for name, job in jobs.items()
151            if include_hidden or not job.hidden
152        }
153
154    jobs_to_return = {}
155    filter_list_without_underscores = [name for name in filter_list if not name.startswith('_')]
156    filter_list_with_underscores = [name for name in filter_list if name.startswith('_')]
157    if (
158        filter_list_without_underscores and not filter_list_with_underscores
159        or filter_list_with_underscores and not filter_list_without_underscores
160    ):
161        pass
162    for name in filter_list:
163        job = jobs.get(name, None)
164        if job is None:
165            if warn:
166                _warn(
167                    f"Job '{name}' does not exist.",
168                    stack=False,
169                )
170            continue
171        jobs_to_return[name] = job
172
173    if not jobs_to_return and filter_list_with_underscores:
174        names_to_exclude = [name.lstrip('_') for name in filter_list_with_underscores]
175        return {
176            name: job
177            for name, job in jobs.items()
178            if name not in names_to_exclude
179        }
180
181    return jobs_to_return

Return a list of jobs filtered by the user.

def get_restart_jobs( executor_keys: Optional[str] = None, jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
184def get_restart_jobs(
185    executor_keys: Optional[str] = None,
186    jobs: Optional[Dict[str, Job]] = None,
187    include_hidden: bool = False,
188    combine_local_and_systemd: bool = True,
189    debug: bool = False,
190) -> Dict[str, Job]:
191    """
192    Return jobs which were created with `--restart` or `--loop`.
193    """
194    if jobs is None:
195        jobs = get_jobs(
196            executor_keys,
197            include_hidden=include_hidden,
198            combine_local_and_systemd=combine_local_and_systemd,
199            debug=debug,
200        )
201
202    return {
203        name: job
204        for name, job in jobs.items()
205        if job.restart
206    }

Return jobs which were created with --restart or --loop.

def get_running_jobs( executor_keys: Optional[str] = None, jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
209def get_running_jobs(
210    executor_keys: Optional[str] = None,
211    jobs: Optional[Dict[str, Job]] = None,
212    include_hidden: bool = False,
213    combine_local_and_systemd: bool = True,
214    debug: bool = False,
215) -> Dict[str, Job]:
216    """
217    Return a dictionary of running jobs.
218    """
219    if jobs is None:
220        jobs = get_jobs(
221            executor_keys,
222            include_hidden=include_hidden,
223            combine_local_and_systemd=combine_local_and_systemd,
224            debug=debug,
225        )
226
227    return {
228        name: job
229        for name, job in jobs.items()
230        if job.status == 'running'
231    }

Return a dictionary of running jobs.

def get_stopped_jobs( executor_keys: Optional[str] = None, jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
259def get_stopped_jobs(
260    executor_keys: Optional[str] = None,
261    jobs: Optional[Dict[str, Job]] = None,
262    include_hidden: bool = False,
263    combine_local_and_systemd: bool = True,
264    debug: bool = False,
265) -> Dict[str, Job]:
266    """
267    Return a dictionary of stopped jobs.
268    """
269    if jobs is None:
270        jobs = get_jobs(
271            executor_keys,
272            include_hidden=include_hidden,
273            combine_local_and_systemd=combine_local_and_systemd,
274            debug=debug,
275        )
276
277    return {
278        name: job
279        for name, job in jobs.items()
280        if job.status == 'stopped'
281    }

Return a dictionary of stopped jobs.

def get_paused_jobs( executor_keys: Optional[str] = None, jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
234def get_paused_jobs(
235    executor_keys: Optional[str] = None,
236    jobs: Optional[Dict[str, Job]] = None,
237    include_hidden: bool = False,
238    combine_local_and_systemd: bool = True,
239    debug: bool = False,
240) -> Dict[str, Job]:
241    """
242    Return a dictionary of paused jobs.
243    """
244    if jobs is None:
245        jobs = get_jobs(
246            executor_keys,
247            include_hidden=include_hidden,
248            combine_local_and_systemd=combine_local_and_systemd,
249            debug=debug,
250        )
251
252    return {
253        name: job
254        for name, job in jobs.items()
255        if job.status == 'paused'
256    }

Return a dictionary of paused jobs.

def make_executor(cls):
284def make_executor(cls):
285    """
286    Register a class as an `Executor`.
287    """
288    import re
289    from meerschaum.connectors import make_connector
290    suffix_regex = r'executor$'
291    typ = re.sub(suffix_regex, '', cls.__name__.lower())
292    if typ not in executor_types:
293        executor_types.append(typ)
294    return make_connector(cls, _is_executor=True)

Register a class as an Executor.

class Executor(meerschaum.connectors._Connector.Connector):
 22class Executor(Connector):
 23    """
 24    Define the methods for managing jobs.
 25    """
 26
 27    @abstractmethod
 28    def get_job_names(self, debug: bool = False) -> List[str]:
 29        """
 30        Return a list of existing jobs, including hidden ones.
 31        """
 32
 33    @abstractmethod
 34    def get_job_exists(self, name: str, debug: bool = False) -> bool:
 35        """
 36        Return whether a job exists.
 37        """
 38
 39    @abstractmethod
 40    def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
 41        """
 42        Return a dictionary of existing jobs.
 43        """
 44
 45    @abstractmethod
 46    def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
 47        """
 48        Return a job's metadata.
 49        """
 50
 51    @abstractmethod
 52    def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
 53        """
 54        Return the underlying daemon's properties.
 55        """
 56    @abstractmethod
 57    def get_job_status(self, name: str, debug: bool = False) -> str:
 58        """
 59        Return the job's status.
 60        """
 61
 62    @abstractmethod
 63    def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
 64        """
 65        Return when a job began running.
 66        """
 67
 68    @abstractmethod
 69    def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
 70        """
 71        Return when a job stopped running.
 72        """
 73
 74    @abstractmethod
 75    def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
 76        """
 77        Return a job's `paused` timestamp, if it exists.
 78        """
 79    
 80    @abstractmethod
 81    def create_job(
 82        self,
 83        name: str,
 84        sysargs: List[str],
 85        properties: Optional[Dict[str, Any]] = None,
 86        debug: bool = False,
 87    ) -> SuccessTuple:
 88        """
 89        Create a new job.
 90        """
 91
 92    @abstractmethod
 93    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
 94        """
 95        Start a job.
 96        """
 97
 98    @abstractmethod
 99    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
100        """
101        Stop a job.
102        """
103
104    @abstractmethod
105    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
106        """
107        Pause a job.
108        """
109
110    @abstractmethod
111    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
112        """
113        Delete a job.
114        """
115
116    @abstractmethod
117    def get_logs(self, name: str, debug: bool = False) -> str:
118        """
119        Return a job's log output.
120        """
121
122    @abstractmethod
123    def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
124        """
125        Return the job's manual stop time.
126        """
127
128    @abstractmethod
129    async def monitor_logs_async(
130        self,
131        name: str,
132        callback_function: Callable[[Any], Any],
133        input_callback_function: Callable[[], str],
134        stop_callback_function: Callable[[SuccessTuple], str],
135        stop_on_exit: bool = False,
136        strip_timestamps: bool = False,
137        accept_input: bool = True,
138        debug: bool = False,
139    ):
140        """
141        Monitor a job's log files and await a callback with the changes.
142        """
143
144    @abstractmethod
145    def monitor_logs(self, *args, **kwargs):
146        """
147        Monitor a job's log files.
148        """
149
150    @abstractmethod
151    def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
152        """
153        Return whether a job is blocking on stdin.
154        """
155
156    @abstractmethod
157    def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]:
158        """
159        Return the kwargs to the blocking prompt.
160        """

Define the methods for managing jobs.

@abstractmethod
def get_job_names(self, debug: bool = False) -> List[str]:
27    @abstractmethod
28    def get_job_names(self, debug: bool = False) -> List[str]:
29        """
30        Return a list of existing jobs, including hidden ones.
31        """

Return a list of existing jobs, including hidden ones.

@abstractmethod
def get_job_exists(self, name: str, debug: bool = False) -> bool:
33    @abstractmethod
34    def get_job_exists(self, name: str, debug: bool = False) -> bool:
35        """
36        Return whether a job exists.
37        """

Return whether a job exists.

@abstractmethod
def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
39    @abstractmethod
40    def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
41        """
42        Return a dictionary of existing jobs.
43        """

Return a dictionary of existing jobs.

@abstractmethod
def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
45    @abstractmethod
46    def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
47        """
48        Return a job's metadata.
49        """

Return a job's metadata.

@abstractmethod
def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
51    @abstractmethod
52    def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
53        """
54        Return the underlying daemon's properties.
55        """

Return the underlying daemon's properties.

@abstractmethod
def get_job_status(self, name: str, debug: bool = False) -> str:
56    @abstractmethod
57    def get_job_status(self, name: str, debug: bool = False) -> str:
58        """
59        Return the job's status.
60        """

Return the job's status.

@abstractmethod
def get_job_began(self, name: str, debug: bool = False) -> Optional[str]:
62    @abstractmethod
63    def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
64        """
65        Return when a job began running.
66        """

Return when a job began running.

@abstractmethod
def get_job_ended(self, name: str, debug: bool = False) -> Optional[str]:
68    @abstractmethod
69    def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
70        """
71        Return when a job stopped running.
72        """

Return when a job stopped running.

@abstractmethod
def get_job_paused(self, name: str, debug: bool = False) -> Optional[str]:
74    @abstractmethod
75    def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
76        """
77        Return a job's `paused` timestamp, if it exists.
78        """

Return a job's paused timestamp, if it exists.

@abstractmethod
def create_job( self, name: str, sysargs: List[str], properties: Optional[Dict[str, Any]] = None, debug: bool = False) -> Tuple[bool, str]:
80    @abstractmethod
81    def create_job(
82        self,
83        name: str,
84        sysargs: List[str],
85        properties: Optional[Dict[str, Any]] = None,
86        debug: bool = False,
87    ) -> SuccessTuple:
88        """
89        Create a new job.
90        """

Create a new job.

@abstractmethod
def start_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
92    @abstractmethod
93    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
94        """
95        Start a job.
96        """

Start a job.

@abstractmethod
def stop_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
 98    @abstractmethod
 99    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
100        """
101        Stop a job.
102        """

Stop a job.

@abstractmethod
def pause_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
104    @abstractmethod
105    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
106        """
107        Pause a job.
108        """

Pause a job.

@abstractmethod
def delete_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
110    @abstractmethod
111    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
112        """
113        Delete a job.
114        """

Delete a job.

@abstractmethod
def get_logs(self, name: str, debug: bool = False) -> str:
116    @abstractmethod
117    def get_logs(self, name: str, debug: bool = False) -> str:
118        """
119        Return a job's log output.
120        """

Return a job's log output.

@abstractmethod
def get_job_stop_time(self, name: str, debug: bool = False) -> Optional[datetime.datetime]:
122    @abstractmethod
123    def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
124        """
125        Return the job's manual stop time.
126        """

Return the job's manual stop time.

@abstractmethod
async def monitor_logs_async( self, name: str, callback_function: Callable[[Any], Any], input_callback_function: Callable[[], str], stop_callback_function: Callable[[Tuple[bool, str]], str], stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False):
128    @abstractmethod
129    async def monitor_logs_async(
130        self,
131        name: str,
132        callback_function: Callable[[Any], Any],
133        input_callback_function: Callable[[], str],
134        stop_callback_function: Callable[[SuccessTuple], str],
135        stop_on_exit: bool = False,
136        strip_timestamps: bool = False,
137        accept_input: bool = True,
138        debug: bool = False,
139    ):
140        """
141        Monitor a job's log files and await a callback with the changes.
142        """

Monitor a job's log files and await a callback with the changes.

@abstractmethod
def monitor_logs(self, *args, **kwargs):
144    @abstractmethod
145    def monitor_logs(self, *args, **kwargs):
146        """
147        Monitor a job's log files.
148        """

Monitor a job's log files.

@abstractmethod
def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
150    @abstractmethod
151    def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
152        """
153        Return whether a job is blocking on stdin.
154        """

Return whether a job is blocking on stdin.

@abstractmethod
def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]:
156    @abstractmethod
157    def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]:
158        """
159        Return the kwargs to the blocking prompt.
160        """

Return the kwargs to the blocking prompt.

def check_restart_jobs( executor_keys: Optional[str] = 'local', jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = True, silent: bool = False, debug: bool = False) -> Tuple[bool, str]:
297def check_restart_jobs(
298    executor_keys: Optional[str] = 'local',
299    jobs: Optional[Dict[str, Job]] = None,
300    include_hidden: bool = True,
301    silent: bool = False,
302    debug: bool = False,
303) -> SuccessTuple:
304    """
305    Restart any stopped jobs which were created with `--restart`.
306
307    Parameters
308    ----------
309    executor_keys: Optional[str], default None
310        If provided, check jobs on the given remote API instance.
311        Otherwise check local jobs.
312
313    include_hidden: bool, default True
314        If `True`, include hidden jobs in the check.
315
316    silent: bool, default False
317        If `True`, do not print the restart success message.
318    """
319    from meerschaum.utils.misc import items_str
320
321    if jobs is None:
322        jobs = get_jobs(
323            executor_keys,
324            include_hidden=include_hidden,
325            combine_local_and_systemd=False,
326            debug=debug,
327        )
328
329    if not jobs:
330        return True, "No jobs to restart."
331
332    results = {}
333    for name, job in jobs.items():
334        check_success, check_msg = job.check_restart()
335        results[job.name] = (check_success, check_msg)
336        if not silent:
337            mrsm.pprint((check_success, check_msg))
338
339    success_names = [name for name, (check_success, check_msg) in results.items() if check_success]
340    fail_names = [name for name, (check_success, check_msg) in results.items() if not check_success]
341    success = len(success_names) == len(jobs)
342    msg = (
343        (
344            "Successfully restarted job"
345            + ('s' if len(success_names) != 1 else '')
346            + ' ' + items_str(success_names) + '.'
347        )
348        if success
349        else (
350            "Failed to restart job"
351            + ('s' if len(success_names) != 1 else '')
352            + ' ' + items_str(fail_names) + '.'
353        )
354    )
355    return success, msg

Restart any stopped jobs which were created with --restart.

Parameters
  • executor_keys (Optional[str], default None): If provided, check jobs on the given remote API instance. Otherwise check local jobs.
  • include_hidden (bool, default True): If True, include hidden jobs in the check.
  • silent (bool, default False): If True, do not print the restart success message.
def start_check_jobs_thread():
367def start_check_jobs_thread():
368    """
369    Start a thread to regularly monitor jobs.
370    """
371    import atexit
372    from functools import partial
373    from meerschaum.utils.threading import RepeatTimer
374    from meerschaum._internal.static import STATIC_CONFIG
375
376    global _check_loop_stop_thread
377    sleep_seconds = STATIC_CONFIG['jobs']['check_restart_seconds']
378
379    _check_loop_stop_thread = RepeatTimer(
380        sleep_seconds,
381        partial(
382            _check_restart_jobs_against_lock,
383            silent=True,
384        )
385    )
386    _check_loop_stop_thread.daemon = True
387    atexit.register(stop_check_jobs_thread)
388
389    _check_loop_stop_thread.start()
390    return _check_loop_stop_thread

Start a thread to regularly monitor jobs.

def stop_check_jobs_thread():
393def stop_check_jobs_thread():
394    """
395    Stop the job monitoring thread.
396    """
397    import meerschaum.config.paths as paths
398    from meerschaum.utils.warnings import warn
399    if _check_loop_stop_thread is None:
400        return
401
402    _check_loop_stop_thread.cancel()
403
404    try:
405        if paths.CHECK_JOBS_LOCK_PATH.exists():
406            paths.CHECK_JOBS_LOCK_PATH.unlink()
407    except Exception as e:
408        warn(f"Failed to remove check jobs lock file:\n{e}")

Stop the job monitoring thread.