meerschaum.jobs

Higher-level utilities for managing meerschaum.utils.daemon.Daemon.

  1#! /usr/bin/env python3
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Higher-level utilities for managing `meerschaum.utils.daemon.Daemon`.
  7"""
  8
  9import meerschaum as mrsm
 10from meerschaum.utils.typing import Dict, Optional, List, SuccessTuple
 11
 12from meerschaum.jobs._Job import Job, StopMonitoringLogs
 13from meerschaum.jobs._Executor import Executor
 14
 15__all__ = (
 16    'Job',
 17    'StopMonitoringLogs',
 18    'systemd',
 19    'get_jobs',
 20    'get_filtered_jobs',
 21    'get_restart_jobs',
 22    'get_running_jobs',
 23    'get_stopped_jobs',
 24    'get_paused_jobs',
 25    'get_restart_jobs',
 26    'make_executor',
 27    'Executor',
 28    'check_restart_jobs',
 29    'start_check_jobs_thread',
 30    'stop_check_jobs_thread',
 31)
 32
 33executor_types: List[str] = ['api', 'local']
 34
 35
 36def get_jobs(
 37    executor_keys: Optional[str] = None,
 38    include_hidden: bool = False,
 39    combine_local_and_systemd: bool = True,
 40    debug: bool = False,
 41) -> Dict[str, Job]:
 42    """
 43    Return a dictionary of the existing jobs.
 44
 45    Parameters
 46    ----------
 47    executor_keys: Optional[str], default None
 48        If provided, return remote jobs on the given API instance.
 49        Otherwise return local jobs.
 50
 51    include_hidden: bool, default False
 52        If `True`, include jobs with the `hidden` attribute.
 53
 54    Returns
 55    -------
 56    A dictionary mapping job names to jobs.
 57    """
 58    from meerschaum.connectors.parse import parse_executor_keys
 59    executor_keys = executor_keys or get_executor_keys_from_context()
 60
 61    include_local_and_system = (
 62        combine_local_and_systemd
 63        and str(executor_keys).split(':', maxsplit=1)[0] in ('None', 'local', 'systemd')
 64        and get_executor_keys_from_context() == 'systemd'
 65    )
 66
 67    def _get_local_jobs():
 68        from meerschaum.utils.daemon import get_daemons
 69        daemons = get_daemons()
 70        jobs = {
 71            daemon.daemon_id: Job(name=daemon.daemon_id, executor_keys='local')
 72            for daemon in daemons
 73        }
 74        return {
 75            name: job
 76            for name, job in jobs.items()
 77            if (include_hidden or not job.hidden) and not job._is_externally_managed
 78
 79        }
 80
 81    def _get_systemd_jobs():
 82        from meerschaum.jobs.systemd import SystemdExecutor
 83        conn = SystemdExecutor('systemd')
 84        jobs = conn.get_jobs(debug=debug)
 85        return {
 86            name: job
 87            for name, job in jobs.items()
 88            if include_hidden or not job.hidden
 89        }
 90
 91    if include_local_and_system:
 92        local_jobs = _get_local_jobs()
 93        systemd_jobs = _get_systemd_jobs()
 94        shared_jobs = set(local_jobs) & set(systemd_jobs)
 95        if shared_jobs:
 96            from meerschaum.utils.misc import items_str
 97            from meerschaum.utils.warnings import warn
 98            warn(
 99                "Job"
100                + ('s' if len(shared_jobs) != 1 else '')
101                + f" {items_str(list(shared_jobs))} "
102                + "exist"
103                + ('s' if len(shared_jobs) == 1 else '')
104                + " in both `local` and `systemd`.",
105                stack=False,
106            )
107        return {**local_jobs, **systemd_jobs}
108
109    if executor_keys == 'local':
110        return _get_local_jobs()
111
112    if executor_keys == 'systemd':
113        return _get_systemd_jobs()
114
115    try:
116        _ = parse_executor_keys(executor_keys, construct=False)
117        conn = parse_executor_keys(executor_keys)
118        jobs = conn.get_jobs(debug=debug)
119        return {
120            name: job
121            for name, job in jobs.items()
122            if include_hidden or not job.hidden
123        }
124    except Exception:
125        return {}
126
127
def get_filtered_jobs(
    executor_keys: Optional[str] = None,
    filter_list: Optional[List[str]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    warn: bool = False,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of jobs filtered by the names in `filter_list`.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        Passed to `get_jobs()` to choose the executor.

    filter_list: Optional[List[str]], default None
        Names of the jobs to return.
        If empty or `None`, return every job (subject to `include_hidden`).
        If none of the names match an existing job and at least one name starts
        with an underscore, the underscore-prefixed names (with leading underscores
        stripped) are treated as names to exclude and all other jobs are returned.

    include_hidden: bool, default False
        If `True`, include hidden jobs when `filter_list` is empty.
        Jobs named explicitly in `filter_list` are returned regardless of this flag.

    combine_local_and_systemd: bool, default True
        Passed to `get_jobs()`.

    warn: bool, default False
        If `True`, emit a warning for each name in `filter_list` which does not exist.

    debug: bool, default False
        Passed to `get_jobs()`.

    Returns
    -------
    A dictionary mapping job names to jobs.
    """
    from meerschaum.utils.warnings import warn as _warn

    ### Always fetch hidden jobs so that they may be requested by name.
    jobs = get_jobs(
        executor_keys,
        include_hidden=True,
        combine_local_and_systemd=combine_local_and_systemd,
        debug=debug,
    )
    if not filter_list:
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    jobs_to_return = {}
    for name in filter_list:
        job = jobs.get(name, None)
        if job is None:
            if warn:
                _warn(
                    f"Job '{name}' does not exist.",
                    stack=False,
                )
            continue
        jobs_to_return[name] = job

    if jobs_to_return:
        return jobs_to_return

    ### Nothing matched: interpret underscore-prefixed names as exclusions.
    names_to_exclude = {
        name.lstrip('_')
        for name in filter_list
        if name.startswith('_')
    }
    if not names_to_exclude:
        return jobs_to_return

    ### NOTE(review): `include_hidden` is not applied here, so hidden jobs are returned
    ### by the exclusion branch — confirm whether that is intended.
    return {
        name: job
        for name, job in jobs.items()
        if name not in names_to_exclude
    }
181
182
def get_restart_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Select the jobs whose `restart` attribute is truthy
    (i.e. jobs created with `--restart` or `--loop`).

    If `jobs` is not provided, the candidates are read with `get_jobs()`
    using the remaining arguments.
    """
    candidates = jobs
    if candidates is None:
        candidates = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    restart_jobs = {}
    for job_name, candidate in candidates.items():
        if candidate.restart:
            restart_jobs[job_name] = candidate
    return restart_jobs
206
207
def get_running_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Select the jobs whose `status` is `'running'`.

    If `jobs` is not provided, the candidates are read with `get_jobs()`
    using the remaining arguments.
    """
    candidates = jobs
    if candidates is None:
        candidates = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    running_jobs = {}
    for job_name, candidate in candidates.items():
        if candidate.status == 'running':
            running_jobs[job_name] = candidate
    return running_jobs
231
232
def get_paused_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Select the jobs whose `status` is `'paused'`.

    If `jobs` is not provided, the candidates are read with `get_jobs()`
    using the remaining arguments.
    """
    candidates = jobs
    if candidates is None:
        candidates = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    paused_jobs = {}
    for job_name, candidate in candidates.items():
        if candidate.status == 'paused':
            paused_jobs[job_name] = candidate
    return paused_jobs
256
257
def get_stopped_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Select the jobs whose `status` is `'stopped'`.

    If `jobs` is not provided, the candidates are read with `get_jobs()`
    using the remaining arguments.
    """
    candidates = jobs
    if candidates is None:
        candidates = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    stopped_jobs = {}
    for job_name, candidate in candidates.items():
        if candidate.status == 'stopped':
            stopped_jobs[job_name] = candidate
    return stopped_jobs
281
282
def make_executor(cls):
    """
    Register `cls` as an `Executor`.

    The executor type is the lowercased class name without a trailing `executor`;
    it is added to `executor_types` if not already present.
    Returns the result of `make_connector(cls, _is_executor=True)`.
    """
    import re
    from meerschaum.connectors import make_connector

    lowered_name = cls.__name__.lower()
    executor_type = re.sub(r'executor$', '', lowered_name)
    already_registered = executor_type in executor_types
    if not already_registered:
        executor_types.append(executor_type)

    return make_connector(cls, _is_executor=True)
294
295
def check_restart_jobs(
    executor_keys: Optional[str] = 'local',
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = True,
    silent: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Restart any stopped jobs which were created with `--restart`.

    Parameters
    ----------
    executor_keys: Optional[str], default 'local'
        The executor whose jobs should be checked when `jobs` is not provided.
        The jobs are read with `get_jobs(..., combine_local_and_systemd=False)`.

    jobs: Optional[Dict[str, Job]], default None
        If provided, check these jobs instead of reading them from the executor.

    include_hidden: bool, default True
        If `True`, include hidden jobs in the check.

    silent: bool, default False
        If `True`, do not print each job's check result.

    debug: bool, default False
        Passed to `get_jobs()`.

    Returns
    -------
    A `SuccessTuple`: success is `True` only if every job's `check_restart()` succeeded.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=False,
            debug=debug,
        )

    if not jobs:
        return True, "No jobs to restart."

    ### Only needed to build the summary message below.
    from meerschaum.utils.misc import items_str

    results = {}
    for job in jobs.values():
        check_success, check_msg = job.check_restart()
        results[job.name] = (check_success, check_msg)
        if not silent:
            mrsm.pprint((check_success, check_msg))

    success_names = [name for name, (check_success, _) in results.items() if check_success]
    fail_names = [name for name, (check_success, _) in results.items() if not check_success]
    success = len(success_names) == len(jobs)

    if success:
        msg = (
            "Successfully restarted job"
            + ('s' if len(success_names) != 1 else '')
            + ' ' + items_str(success_names) + '.'
        )
    else:
        ### Pluralize on the number of failures (not successes).
        msg = (
            "Failed to restart job"
            + ('s' if len(fail_names) != 1 else '')
            + ' ' + items_str(fail_names) + '.'
        )
    return success, msg
355
356
def _check_restart_jobs_against_lock(*args, **kwargs):
    """
    Call `check_restart_jobs()` with the given arguments
    while holding the inter-process lock at `CHECK_JOBS_LOCK_PATH`.
    """
    from meerschaum.config.paths import CHECK_JOBS_LOCK_PATH
    fasteners = mrsm.attempt_import('fasteners', lazy=False)
    with fasteners.InterProcessLock(CHECK_JOBS_LOCK_PATH):
        check_restart_jobs(*args, **kwargs)
363
364
### The most recently started `RepeatTimer` (or `None` if never started).
_check_loop_stop_thread = None
def start_check_jobs_thread():
    """
    Create and start a daemon `RepeatTimer` which periodically checks jobs for restarts.

    The timer runs `_check_restart_jobs_against_lock(silent=True)` every
    `STATIC_CONFIG['jobs']['check_restart_seconds']` seconds,
    and `stop_check_jobs_thread()` is registered to run at interpreter exit.
    Returns the started timer.
    """
    global _check_loop_stop_thread

    import atexit
    from functools import partial
    from meerschaum.utils.threading import RepeatTimer
    from meerschaum._internal.static import STATIC_CONFIG

    interval_seconds = STATIC_CONFIG['jobs']['check_restart_seconds']
    silent_check = partial(
        _check_restart_jobs_against_lock,
        silent=True,
    )

    _check_loop_stop_thread = timer = RepeatTimer(interval_seconds, silent_check)
    timer.daemon = True
    atexit.register(stop_check_jobs_thread)

    timer.start()
    return timer
390
391
def stop_check_jobs_thread():
    """
    Cancel the job monitoring timer (if one was started)
    and remove the check-jobs lock file, warning if the removal fails.
    """
    from meerschaum.config.paths import CHECK_JOBS_LOCK_PATH
    from meerschaum.utils.warnings import warn

    timer = _check_loop_stop_thread
    if timer is None:
        return

    timer.cancel()

    try:
        lock_file_exists = CHECK_JOBS_LOCK_PATH.exists()
        if lock_file_exists:
            CHECK_JOBS_LOCK_PATH.unlink()
    except Exception as e:
        warn(f"Failed to remove check jobs lock file:\n{e}")
408
409
### Cached result of the systemd/local detection (set on first detection).
_context_keys = None
def get_executor_keys_from_context() -> str:
    """
    Determine the default executor keys for the current context.

    A configured `meerschaum.executor` value takes priority (and is not cached).
    Otherwise return `'systemd'` if systemd is available and the root directory
    is the default root, else `'local'`; this detected value is cached.
    """
    global _context_keys

    cached_keys = _context_keys
    if cached_keys is not None:
        return cached_keys

    from meerschaum.config import get_config
    from meerschaum.config.paths import ROOT_DIR_PATH, DEFAULT_ROOT_DIR_PATH
    from meerschaum.utils.misc import is_systemd_available

    configured_executor = get_config('meerschaum', 'executor', warn=False)
    if configured_executor is not None:
        return configured_executor

    if is_systemd_available() and ROOT_DIR_PATH == DEFAULT_ROOT_DIR_PATH:
        _context_keys = 'systemd'
    else:
        _context_keys = 'local'
    return _context_keys
435
436
def _install_healthcheck_job() -> SuccessTuple:
    """
    Start the hidden `systemd` job `.local-healthcheck`, which loops `restart jobs -e local`.

    Returns a failure tuple without starting anything if the experimental
    `systemd_healthcheck` setting is off or the context executor is not `'systemd'`.
    """
    from meerschaum.config import get_config

    if not get_config('system', 'experimental', 'systemd_healthcheck'):
        return False, "Local healthcheck is disabled."

    if get_executor_keys_from_context() != 'systemd':
        return False, "Not running systemd."

    healthcheck_sysargs = ['restart', 'jobs', '-e', 'local', '--loop', '--min-seconds', '60']
    healthcheck_job = Job(
        '.local-healthcheck',
        healthcheck_sysargs,
        executor_keys='systemd',
    )
    return healthcheck_job.start()
class Job:
  70class Job:
  71    """
  72    Manage a `meerschaum.utils.daemon.Daemon`, locally or remotely via the API.
  73    """
  74
  75    def __init__(
  76        self,
  77        name: str,
  78        sysargs: Union[List[str], str, None] = None,
  79        env: Optional[Dict[str, str]] = None,
  80        executor_keys: Optional[str] = None,
  81        delete_after_completion: bool = False,
  82        refresh_seconds: Union[int, float, None] = None,
  83        _properties: Optional[Dict[str, Any]] = None,
  84        _rotating_log=None,
  85        _stdin_file=None,
  86        _status_hook: Optional[Callable[[], str]] = None,
  87        _result_hook: Optional[Callable[[], SuccessTuple]] = None,
  88        _externally_managed: bool = False,
  89    ):
  90        """
  91        Create a new job to manage a `meerschaum.utils.daemon.Daemon`.
  92
  93        Parameters
  94        ----------
  95        name: str
  96            The name of the job to be created.
  97            This will also be used as the Daemon ID.
  98
  99        sysargs: Union[List[str], str, None], default None
 100            The sysargs of the command to be executed, e.g. 'start api'.
 101
 102        env: Optional[Dict[str, str]], default None
 103            If provided, set these environment variables in the job's process.
 104
 105        executor_keys: Optional[str], default None
 106            If provided, execute the job remotely on an API instance, e.g. 'api:main'.
 107
 108        delete_after_completion: bool, default False
 109            If `True`, delete this job when it has finished executing.
 110
 111        refresh_seconds: Union[int, float, None], default None
 112            The number of seconds to sleep between refreshes.
 113            Defaults to the configured value `system.cli.refresh_seconds`.
 114
 115        _properties: Optional[Dict[str, Any]], default None
 116            If provided, use this to patch the daemon's properties.
 117        """
 118        from meerschaum.utils.daemon import Daemon
 119        for char in BANNED_CHARS:
 120            if char in name:
 121                raise ValueError(f"Invalid name: ({char}) is not allowed.")
 122
 123        if isinstance(sysargs, str):
 124            sysargs = shlex.split(sysargs)
 125
 126        and_key = STATIC_CONFIG['system']['arguments']['and_key']
 127        escaped_and_key = STATIC_CONFIG['system']['arguments']['escaped_and_key']
 128        if sysargs:
 129            sysargs = [
 130                (arg if arg != escaped_and_key else and_key)
 131                for arg in sysargs
 132            ]
 133
 134        ### NOTE: 'local' and 'systemd' executors are being coalesced.
 135        if executor_keys is None:
 136            from meerschaum.jobs import get_executor_keys_from_context
 137            executor_keys = get_executor_keys_from_context()
 138
 139        self.executor_keys = executor_keys
 140        self.name = name
 141        self.refresh_seconds = (
 142            refresh_seconds
 143            if refresh_seconds is not None
 144            else mrsm.get_config('system', 'cli', 'refresh_seconds')
 145        )
 146        try:
 147            self._daemon = (
 148                Daemon(daemon_id=name)
 149                if executor_keys == 'local'
 150                else None
 151            )
 152        except Exception:
 153            self._daemon = None
 154
 155        ### Handle any injected dependencies.
 156        if _rotating_log is not None:
 157            self._rotating_log = _rotating_log
 158            if self._daemon is not None:
 159                self._daemon._rotating_log = _rotating_log
 160
 161        if _stdin_file is not None:
 162            self._stdin_file = _stdin_file
 163            if self._daemon is not None:
 164                self._daemon._stdin_file = _stdin_file
 165                self._daemon._blocking_stdin_file_path = _stdin_file.blocking_file_path
 166
 167        if _status_hook is not None:
 168            self._status_hook = _status_hook
 169
 170        if _result_hook is not None:
 171            self._result_hook = _result_hook
 172
 173        self._externally_managed = _externally_managed
 174        self._properties_patch = _properties or {}
 175        if _externally_managed:
 176            self._properties_patch.update({'externally_managed': _externally_managed})
 177
 178        if env:
 179            self._properties_patch.update({'env': env})
 180
 181        if delete_after_completion:
 182            self._properties_patch.update({'delete_after_completion': delete_after_completion})
 183
 184        daemon_sysargs = (
 185            self._daemon.properties.get('target', {}).get('args', [None])[0]
 186            if self._daemon is not None
 187            else None
 188        )
 189
 190        if daemon_sysargs and sysargs and daemon_sysargs != sysargs:
 191            warn("Given sysargs differ from existing sysargs.")
 192
 193        self._sysargs = [
 194            arg
 195            for arg in (daemon_sysargs or sysargs or [])
 196            if arg not in ('-d', '--daemon')
 197        ]
 198        for restart_flag in RESTART_FLAGS:
 199            if restart_flag in self._sysargs:
 200                self._properties_patch.update({'restart': True})
 201                break
 202
 203    @staticmethod
 204    def from_pid(pid: int, executor_keys: Optional[str] = None) -> Job:
 205        """
 206        Build a `Job` from the PID of a running Meerschaum process.
 207
 208        Parameters
 209        ----------
 210        pid: int
 211            The PID of the process.
 212
 213        executor_keys: Optional[str], default None
 214            The executor keys to assign to the job.
 215        """
 216        from meerschaum.config.paths import DAEMON_RESOURCES_PATH
 217
 218        psutil = mrsm.attempt_import('psutil')
 219        try:
 220            process = psutil.Process(pid)
 221        except psutil.NoSuchProcess as e:
 222            warn(f"Process with PID {pid} does not exist.", stack=False)
 223            raise e
 224
 225        command_args = process.cmdline()
 226        is_daemon = command_args[1] == '-c'
 227
 228        if is_daemon:
 229            daemon_id = command_args[-1].split('daemon_id=')[-1].split(')')[0].replace("'", '')
 230            root_dir = process.environ().get(STATIC_CONFIG['environment']['root'], None)
 231            if root_dir is None:
 232                from meerschaum.config.paths import ROOT_DIR_PATH
 233                root_dir = ROOT_DIR_PATH
 234            else:
 235                root_dir = pathlib.Path(root_dir)
 236            jobs_dir = root_dir / DAEMON_RESOURCES_PATH.name
 237            daemon_dir = jobs_dir / daemon_id
 238            pid_file = daemon_dir / 'process.pid'
 239
 240            if pid_file.exists():
 241                with open(pid_file, 'r', encoding='utf-8') as f:
 242                    daemon_pid = int(f.read())
 243
 244                if pid != daemon_pid:
 245                    raise EnvironmentError(f"Differing PIDs: {pid=}, {daemon_pid=}")
 246            else:
 247                raise EnvironmentError(f"Is job '{daemon_id}' running?")
 248
 249            return Job(daemon_id, executor_keys=executor_keys)
 250
 251        from meerschaum._internal.arguments._parse_arguments import parse_arguments
 252        from meerschaum.utils.daemon import get_new_daemon_name
 253
 254        mrsm_ix = 0
 255        for i, arg in enumerate(command_args):
 256            if 'mrsm' in arg or 'meerschaum' in arg.lower():
 257                mrsm_ix = i
 258                break
 259
 260        sysargs = command_args[mrsm_ix+1:]
 261        kwargs = parse_arguments(sysargs)
 262        name = kwargs.get('name', get_new_daemon_name())
 263        return Job(name, sysargs, executor_keys=executor_keys)
 264
 265    def start(self, debug: bool = False) -> SuccessTuple:
 266        """
 267        Start the job's daemon.
 268        """
 269        if self.executor is not None:
 270            if not self.exists(debug=debug):
 271                return self.executor.create_job(
 272                    self.name,
 273                    self.sysargs,
 274                    properties=self.daemon.properties,
 275                    debug=debug,
 276                )
 277            return self.executor.start_job(self.name, debug=debug)
 278
 279        if self.is_running():
 280            return True, f"{self} is already running."
 281
 282        success, msg = self.daemon.run(
 283            keep_daemon_output=(not self.delete_after_completion),
 284            allow_dirty_run=True,
 285        )
 286        if not success:
 287            return success, msg
 288
 289        return success, f"Started {self}."
 290
 291    def stop(
 292        self,
 293        timeout_seconds: Union[int, float, None] = None,
 294        debug: bool = False,
 295    ) -> SuccessTuple:
 296        """
 297        Stop the job's daemon.
 298        """
 299        if self.executor is not None:
 300            return self.executor.stop_job(self.name, debug=debug)
 301
 302        if self.daemon.status == 'stopped':
 303            if not self.restart:
 304                return True, f"{self} is not running."
 305            elif self.stop_time is not None:
 306                return True, f"{self} will not restart until manually started."
 307
 308        quit_success, quit_msg = self.daemon.quit(timeout=timeout_seconds)
 309        if quit_success:
 310            return quit_success, f"Stopped {self}."
 311
 312        warn(
 313            f"Failed to gracefully quit {self}.",
 314            stack=False,
 315        )
 316        kill_success, kill_msg = self.daemon.kill(timeout=timeout_seconds)
 317        if not kill_success:
 318            return kill_success, kill_msg
 319
 320        return kill_success, f"Killed {self}."
 321
 322    def pause(
 323        self,
 324        timeout_seconds: Union[int, float, None] = None,
 325        debug: bool = False,
 326    ) -> SuccessTuple:
 327        """
 328        Pause the job's daemon.
 329        """
 330        if self.executor is not None:
 331            return self.executor.pause_job(self.name, debug=debug)
 332
 333        pause_success, pause_msg = self.daemon.pause(timeout=timeout_seconds)
 334        if not pause_success:
 335            return pause_success, pause_msg
 336
 337        return pause_success, f"Paused {self}."
 338
 339    def delete(self, debug: bool = False) -> SuccessTuple:
 340        """
 341        Delete the job and its daemon.
 342        """
 343        if self.executor is not None:
 344            return self.executor.delete_job(self.name, debug=debug)
 345
 346        if self.is_running():
 347            stop_success, stop_msg = self.stop()
 348            if not stop_success:
 349                return stop_success, stop_msg
 350
 351        cleanup_success, cleanup_msg = self.daemon.cleanup()
 352        if not cleanup_success:
 353            return cleanup_success, cleanup_msg
 354
 355        _ = self.daemon._properties.pop('result', None)
 356        return cleanup_success, f"Deleted {self}."
 357
 358    def is_running(self) -> bool:
 359        """
 360        Determine whether the job's daemon is running.
 361        """
 362        return self.status == 'running'
 363
 364    def exists(self, debug: bool = False) -> bool:
 365        """
 366        Determine whether the job exists.
 367        """
 368        if self.executor is not None:
 369            return self.executor.get_job_exists(self.name, debug=debug)
 370
 371        return self.daemon.path.exists()
 372
 373    def get_logs(self) -> Union[str, None]:
 374        """
 375        Return the output text of the job's daemon.
 376        """
 377        if self.executor is not None:
 378            return self.executor.get_logs(self.name)
 379
 380        return self.daemon.log_text
 381
 382    def monitor_logs(
 383        self,
 384        callback_function: Callable[[str], None] = _default_stdout_callback,
 385        input_callback_function: Optional[Callable[[], str]] = None,
 386        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
 387        stop_event: Optional[asyncio.Event] = None,
 388        stop_on_exit: bool = False,
 389        strip_timestamps: bool = False,
 390        accept_input: bool = True,
 391        debug: bool = False,
 392        _logs_path: Optional[pathlib.Path] = None,
 393        _log=None,
 394        _stdin_file=None,
 395        _wait_if_stopped: bool = True,
 396    ):
 397        """
 398        Monitor the job's log files and execute a callback on new lines.
 399
 400        Parameters
 401        ----------
 402        callback_function: Callable[[str], None], default partial(print, end='')
 403            The callback to execute as new data comes in.
 404            Defaults to printing the output directly to `stdout`.
 405
 406        input_callback_function: Optional[Callable[[], str]], default None
 407            If provided, execute this callback when the daemon is blocking on stdin.
 408            Defaults to `sys.stdin.readline()`.
 409
 410        stop_callback_function: Optional[Callable[[SuccessTuple]], str], default None
 411            If provided, execute this callback when the daemon stops.
 412            The job's SuccessTuple will be passed to the callback.
 413
 414        stop_event: Optional[asyncio.Event], default None
 415            If provided, stop monitoring when this event is set.
 416            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
 417            from within `callback_function` to stop monitoring.
 418
 419        stop_on_exit: bool, default False
 420            If `True`, stop monitoring when the job stops.
 421
 422        strip_timestamps: bool, default False
 423            If `True`, remove leading timestamps from lines.
 424
 425        accept_input: bool, default True
 426            If `True`, accept input when the daemon blocks on stdin.
 427        """
 428        if self.executor is not None:
 429            self.executor.monitor_logs(
 430                self.name,
 431                callback_function,
 432                input_callback_function=input_callback_function,
 433                stop_callback_function=stop_callback_function,
 434                stop_on_exit=stop_on_exit,
 435                accept_input=accept_input,
 436                strip_timestamps=strip_timestamps,
 437                debug=debug,
 438            )
 439            return
 440
 441        monitor_logs_coroutine = self.monitor_logs_async(
 442            callback_function=callback_function,
 443            input_callback_function=input_callback_function,
 444            stop_callback_function=stop_callback_function,
 445            stop_event=stop_event,
 446            stop_on_exit=stop_on_exit,
 447            strip_timestamps=strip_timestamps,
 448            accept_input=accept_input,
 449            debug=debug,
 450            _logs_path=_logs_path,
 451            _log=_log,
 452            _stdin_file=_stdin_file,
 453            _wait_if_stopped=_wait_if_stopped,
 454        )
 455        return asyncio.run(monitor_logs_coroutine)
 456
 457    async def monitor_logs_async(
 458        self,
 459        callback_function: Callable[[str], None] = _default_stdout_callback,
 460        input_callback_function: Optional[Callable[[], str]] = None,
 461        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
 462        stop_event: Optional[asyncio.Event] = None,
 463        stop_on_exit: bool = False,
 464        strip_timestamps: bool = False,
 465        accept_input: bool = True,
 466        debug: bool = False,
 467        _logs_path: Optional[pathlib.Path] = None,
 468        _log=None,
 469        _stdin_file=None,
 470        _wait_if_stopped: bool = True,
 471    ):
 472        """
 473        Monitor the job's log files and await a callback on new lines.
 474
 475        Parameters
 476        ----------
 477        callback_function: Callable[[str], None], default _default_stdout_callback
 478            The callback to execute as new data comes in.
 479            Defaults to printing the output directly to `stdout`.
 480
 481        input_callback_function: Optional[Callable[[], str]], default None
 482            If provided, execute this callback when the daemon is blocking on stdin.
 483            Defaults to `sys.stdin.readline()`.
 484
 485        stop_callback_function: Optional[Callable[[SuccessTuple]], str], default None
 486            If provided, execute this callback when the daemon stops.
 487            The job's SuccessTuple will be passed to the callback.
 488
 489        stop_event: Optional[asyncio.Event], default None
 490            If provided, stop monitoring when this event is set.
 491            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
 492            from within `callback_function` to stop monitoring.
 493
 494        stop_on_exit: bool, default False
 495            If `True`, stop monitoring when the job stops.
 496
 497        strip_timestamps: bool, default False
 498            If `True`, remove leading timestamps from lines.
 499
 500        accept_input: bool, default True
 501            If `True`, accept input when the daemon blocks on stdin.
 502        """
 503        from meerschaum.utils.prompt import prompt
 504
 505        def default_input_callback_function():
 506            prompt_kwargs = self.get_prompt_kwargs(debug=debug)
 507            if prompt_kwargs:
 508                answer = prompt(**prompt_kwargs)
 509                return answer + '\n'
 510            return sys.stdin.readline()
 511
 512        if input_callback_function is None:
 513            input_callback_function = default_input_callback_function
 514
 515        if self.executor is not None:
 516            await self.executor.monitor_logs_async(
 517                self.name,
 518                callback_function,
 519                input_callback_function=input_callback_function,
 520                stop_callback_function=stop_callback_function,
 521                stop_on_exit=stop_on_exit,
 522                strip_timestamps=strip_timestamps,
 523                accept_input=accept_input,
 524                debug=debug,
 525            )
 526            return
 527
 528        from meerschaum.utils.formatting._jobs import strip_timestamp_from_line
 529
 530        events = {
 531            'user': stop_event,
 532            'stopped': asyncio.Event(),
 533            'stop_token': asyncio.Event(),
 534            'stop_exception': asyncio.Event(),
 535            'stopped_timeout': asyncio.Event(),
 536        }
 537        combined_event = asyncio.Event()
 538        emitted_text = False
 539        stdin_file = _stdin_file if _stdin_file is not None else self.daemon.stdin_file
 540
 541        async def check_job_status():
 542            if not stop_on_exit:
 543                return
 544
 545            nonlocal emitted_text
 546
 547            sleep_time = 0.1
 548            while sleep_time < 0.2:
 549                if self.status == 'stopped':
 550                    if not emitted_text and _wait_if_stopped:
 551                        await asyncio.sleep(sleep_time)
 552                        sleep_time = round(sleep_time * 1.1, 3)
 553                        continue
 554
 555                    if stop_callback_function is not None:
 556                        try:
 557                            if asyncio.iscoroutinefunction(stop_callback_function):
 558                                await stop_callback_function(self.result)
 559                            else:
 560                                stop_callback_function(self.result)
 561                        except asyncio.exceptions.CancelledError:
 562                            break
 563                        except Exception:
 564                            warn(traceback.format_exc())
 565
 566                    if stop_on_exit:
 567                        events['stopped'].set()
 568
 569                    break
 570                await asyncio.sleep(0.1)
 571
 572            events['stopped_timeout'].set()
 573
 574        async def check_blocking_on_input():
 575            while True:
 576                if not emitted_text or not self.is_blocking_on_stdin():
 577                    try:
 578                        await asyncio.sleep(self.refresh_seconds)
 579                    except asyncio.exceptions.CancelledError:
 580                        break
 581                    continue
 582
 583                if not self.is_running():
 584                    break
 585
 586                await emit_latest_lines()
 587
 588                try:
 589                    print('', end='', flush=True)
 590                    if asyncio.iscoroutinefunction(input_callback_function):
 591                        data = await input_callback_function()
 592                    else:
 593                        loop = asyncio.get_running_loop()
 594                        data = await loop.run_in_executor(None, input_callback_function)
 595                except KeyboardInterrupt:
 596                    break
 597                #  if not data.endswith('\n'):
 598                    #  data += '\n'
 599
 600                stdin_file.write(data)
 601                await asyncio.sleep(self.refresh_seconds)
 602
 603        async def combine_events():
 604            event_tasks = [
 605                asyncio.create_task(event.wait())
 606                for event in events.values()
 607                if event is not None
 608            ]
 609            if not event_tasks:
 610                return
 611
 612            try:
 613                done, pending = await asyncio.wait(
 614                    event_tasks,
 615                    return_when=asyncio.FIRST_COMPLETED,
 616                )
 617                for task in pending:
 618                    task.cancel()
 619            except asyncio.exceptions.CancelledError:
 620                pass
 621            finally:
 622                combined_event.set()
 623
 624        check_job_status_task = asyncio.create_task(check_job_status())
 625        check_blocking_on_input_task = asyncio.create_task(check_blocking_on_input())
 626        combine_events_task = asyncio.create_task(combine_events())
 627
 628        log = _log if _log is not None else self.daemon.rotating_log
 629        lines_to_show = (
 630            self.daemon.properties.get(
 631                'logs', {}
 632            ).get(
 633                'lines_to_show', get_config('jobs', 'logs', 'lines_to_show')
 634            )
 635        )
 636
 637        async def emit_latest_lines():
 638            nonlocal emitted_text
 639            nonlocal stop_event
 640            lines = log.readlines()
 641            for line in lines[(-1 * lines_to_show):]:
 642                if stop_event is not None and stop_event.is_set():
 643                    return
 644
 645                line_stripped_extra = strip_timestamp_from_line(line.strip())
 646                line_stripped = strip_timestamp_from_line(line)
 647
 648                if line_stripped_extra == STOP_TOKEN:
 649                    events['stop_token'].set()
 650                    return
 651
 652                if line_stripped_extra == CLEAR_TOKEN:
 653                    clear_screen(debug=debug)
 654                    continue
 655
 656                if line_stripped_extra == FLUSH_TOKEN.strip():
 657                    line_stripped = ''
 658                    line = ''
 659
 660                if strip_timestamps:
 661                    line = line_stripped
 662
 663                try:
 664                    if asyncio.iscoroutinefunction(callback_function):
 665                        await callback_function(line)
 666                    else:
 667                        callback_function(line)
 668                    emitted_text = True
 669                except StopMonitoringLogs:
 670                    events['stop_exception'].set()
 671                    return
 672                except Exception:
 673                    warn(f"Error in logs callback:\n{traceback.format_exc()}")
 674
 675        await emit_latest_lines()
 676
 677        tasks = (
 678            [check_job_status_task]
 679            + ([check_blocking_on_input_task] if accept_input else [])
 680            + [combine_events_task]
 681        )
 682        try:
 683            _ = asyncio.gather(*tasks, return_exceptions=True)
 684        except asyncio.exceptions.CancelledError:
 685            raise
 686        except Exception:
 687            warn(f"Failed to run async checks:\n{traceback.format_exc()}")
 688
 689        watchfiles = mrsm.attempt_import('watchfiles', lazy=False)
 690        dir_path_to_monitor = (
 691            _logs_path
 692            or (log.file_path.parent if log else None)
 693            or LOGS_RESOURCES_PATH
 694        )
 695        async for changes in watchfiles.awatch(
 696            dir_path_to_monitor,
 697            stop_event=combined_event,
 698        ):
 699            for change in changes:
 700                file_path_str = change[1]
 701                file_path = pathlib.Path(file_path_str)
 702                latest_subfile_path = log.get_latest_subfile_path()
 703                if latest_subfile_path != file_path:
 704                    continue
 705
 706                await emit_latest_lines()
 707
 708        await emit_latest_lines()
 709
 710    def is_blocking_on_stdin(self, debug: bool = False) -> bool:
 711        """
 712        Return whether a job's daemon is blocking on stdin.
 713        """
 714        if self.executor is not None:
 715            return self.executor.get_job_is_blocking_on_stdin(self.name, debug=debug)
 716
 717        return self.is_running() and self.daemon.blocking_stdin_file_path.exists()
 718
 719    def get_prompt_kwargs(self, debug: bool = False) -> Dict[str, Any]:
 720        """
 721        Return the kwargs to the blocking `prompt()`, if available.
 722        """
 723        if self.executor is not None:
 724            return self.executor.get_job_prompt_kwargs(self.name, debug=debug)
 725
 726        if not self.daemon.prompt_kwargs_file_path.exists():
 727            return {}
 728
 729        try:
 730            with open(self.daemon.prompt_kwargs_file_path, 'r', encoding='utf-8') as f:
 731                prompt_kwargs = json.load(f)
 732
 733            return prompt_kwargs
 734        
 735        except Exception:
 736            import traceback
 737            traceback.print_exc()
 738            return {}
 739
 740    def write_stdin(self, data):
 741        """
 742        Write to a job's daemon's `stdin`.
 743        """
 744        self.daemon.stdin_file.write(data)
 745
 746    @property
 747    def executor(self) -> Union[Executor, None]:
 748        """
 749        If the job is remote, return the connector to the remote API instance.
 750        """
 751        return (
 752            mrsm.get_connector(self.executor_keys)
 753            if self.executor_keys != 'local'
 754            else None
 755        )
 756
 757    @property
 758    def status(self) -> str:
 759        """
 760        Return the running status of the job's daemon.
 761        """
 762        if '_status_hook' in self.__dict__:
 763            return self._status_hook()
 764
 765        if self.executor is not None:
 766            return self.executor.get_job_status(self.name)
 767
 768        return self.daemon.status
 769
 770    @property
 771    def pid(self) -> Union[int, None]:
 772        """
 773        Return the PID of the job's dameon.
 774        """
 775        if self.executor is not None:
 776            return self.executor.get_job_metadata(self.name).get('daemon', {}).get('pid', None)
 777
 778        return self.daemon.pid
 779
 780    @property
 781    def restart(self) -> bool:
 782        """
 783        Return whether to restart a stopped job.
 784        """
 785        if self.executor is not None:
 786            return self.executor.get_job_metadata(self.name).get('restart', False)
 787
 788        return self.daemon.properties.get('restart', False)
 789
 790    @property
 791    def result(self) -> SuccessTuple:
 792        """
 793        Return the `SuccessTuple` when the job has terminated.
 794        """
 795        if self.is_running():
 796            return True, f"{self} is running."
 797
 798        if '_result_hook' in self.__dict__:
 799            return self._result_hook()
 800
 801        if self.executor is not None:
 802            return (
 803                self.executor.get_job_metadata(self.name)
 804                .get('result', (False, "No result available."))
 805            )
 806
 807        _result = self.daemon.properties.get('result', None)
 808        if _result is None:
 809            from meerschaum.utils.daemon.Daemon import _results
 810            return _results.get(self.daemon.daemon_id, (False, "No result available."))
 811
 812        return tuple(_result)
 813
 814    @property
 815    def sysargs(self) -> List[str]:
 816        """
 817        Return the sysargs to use for the Daemon.
 818        """
 819        if self._sysargs:
 820            return self._sysargs
 821
 822        if self.executor is not None:
 823            return self.executor.get_job_metadata(self.name).get('sysargs', [])
 824
 825        target_args = self.daemon.target_args
 826        if target_args is None:
 827            return []
 828        self._sysargs = target_args[0] if len(target_args) > 0 else []
 829        return self._sysargs
 830
 831    def get_daemon_properties(self) -> Dict[str, Any]:
 832        """
 833        Return the `properties` dictionary for the job's daemon.
 834        """
 835        remote_properties = (
 836            {}
 837            if self.executor is None
 838            else self.executor.get_job_properties(self.name)
 839        )
 840        return {
 841            **remote_properties,
 842            **self._properties_patch
 843        }
 844
 845    @property
 846    def daemon(self) -> 'Daemon':
 847        """
 848        Return the daemon which this job manages.
 849        """
 850        from meerschaum.utils.daemon import Daemon
 851        if self._daemon is not None and self.executor is None and self._sysargs:
 852            return self._daemon
 853
 854        self._daemon = Daemon(
 855            target=entry,
 856            target_args=[self._sysargs],
 857            target_kw={},
 858            daemon_id=self.name,
 859            label=shlex.join(self._sysargs),
 860            properties=self.get_daemon_properties(),
 861        )
 862        if '_rotating_log' in self.__dict__:
 863            self._daemon._rotating_log = self._rotating_log
 864
 865        if '_stdin_file' in self.__dict__:
 866            self._daemon._stdin_file = self._stdin_file
 867            self._daemon._blocking_stdin_file_path = self._stdin_file.blocking_file_path
 868
 869        return self._daemon
 870
 871    @property
 872    def began(self) -> Union[datetime, None]:
 873        """
 874        The datetime when the job began running.
 875        """
 876        if self.executor is not None:
 877            began_str = self.executor.get_job_began(self.name)
 878            if began_str is None:
 879                return None
 880            return (
 881                datetime.fromisoformat(began_str)
 882                .astimezone(timezone.utc)
 883                .replace(tzinfo=None)
 884            )
 885
 886        began_str = self.daemon.properties.get('process', {}).get('began', None)
 887        if began_str is None:
 888            return None
 889
 890        return datetime.fromisoformat(began_str)
 891
 892    @property
 893    def ended(self) -> Union[datetime, None]:
 894        """
 895        The datetime when the job stopped running.
 896        """
 897        if self.executor is not None:
 898            ended_str = self.executor.get_job_ended(self.name)
 899            if ended_str is None:
 900                return None
 901            return (
 902                datetime.fromisoformat(ended_str)
 903                .astimezone(timezone.utc)
 904                .replace(tzinfo=None)
 905            )
 906
 907        ended_str = self.daemon.properties.get('process', {}).get('ended', None)
 908        if ended_str is None:
 909            return None
 910
 911        return datetime.fromisoformat(ended_str)
 912
 913    @property
 914    def paused(self) -> Union[datetime, None]:
 915        """
 916        The datetime when the job was suspended while running.
 917        """
 918        if self.executor is not None:
 919            paused_str = self.executor.get_job_paused(self.name)
 920            if paused_str is None:
 921                return None
 922            return (
 923                datetime.fromisoformat(paused_str)
 924                .astimezone(timezone.utc)
 925                .replace(tzinfo=None)
 926            )
 927
 928        paused_str = self.daemon.properties.get('process', {}).get('paused', None)
 929        if paused_str is None:
 930            return None
 931
 932        return datetime.fromisoformat(paused_str)
 933
 934    @property
 935    def stop_time(self) -> Union[datetime, None]:
 936        """
 937        Return the timestamp when the job was manually stopped.
 938        """
 939        if self.executor is not None:
 940            return self.executor.get_job_stop_time(self.name)
 941
 942        if not self.daemon.stop_path.exists():
 943            return None
 944
 945        stop_data = self.daemon._read_stop_file()
 946        if not stop_data:
 947            return None
 948
 949        stop_time_str = stop_data.get('stop_time', None)
 950        if not stop_time_str:
 951            warn(f"Could not read stop time for {self}.")
 952            return None
 953
 954        return datetime.fromisoformat(stop_time_str)
 955
 956    @property
 957    def hidden(self) -> bool:
 958        """
 959        Return a bool indicating whether this job should be displayed.
 960        """
 961        return (
 962            self.name.startswith('_')
 963            or self.name.startswith('.')
 964            or self._is_externally_managed
 965        )
 966
 967    def check_restart(self) -> SuccessTuple:
 968        """
 969        If `restart` is `True` and the daemon is not running,
 970        restart the job.
 971        Do not restart if the job was manually stopped.
 972        """
 973        if self.is_running():
 974            return True, f"{self} is running."
 975
 976        if not self.restart:
 977            return True, f"{self} does not need to be restarted."
 978
 979        if self.stop_time is not None:
 980            return True, f"{self} was manually stopped."
 981
 982        return self.start()
 983
 984    @property
 985    def label(self) -> str:
 986        """
 987        Return the job's Daemon label (joined sysargs).
 988        """
 989        from meerschaum._internal.arguments import compress_pipeline_sysargs
 990        sysargs = compress_pipeline_sysargs(self.sysargs)
 991        return shlex.join(sysargs).replace(' + ', '\n+ ').replace(' : ', '\n: ').lstrip().rstrip()
 992
 993    @property
 994    def _externally_managed_file(self) -> pathlib.Path:
 995        """
 996        Return the path to the externally managed file.
 997        """
 998        return self.daemon.path / '.externally-managed'
 999
1000    def _set_externally_managed(self):
1001        """
1002        Set this job as externally managed.
1003        """
1004        self._externally_managed = True
1005        try:
1006            self._externally_managed_file.parent.mkdir(exist_ok=True, parents=True)
1007            self._externally_managed_file.touch()
1008        except Exception as e:
1009            warn(e)
1010
1011    @property
1012    def _is_externally_managed(self) -> bool:
1013        """
1014        Return whether this job is externally managed.
1015        """
1016        return self.executor_keys in (None, 'local') and (
1017            self._externally_managed or self._externally_managed_file.exists()
1018        )
1019
1020    @property
1021    def env(self) -> Dict[str, str]:
1022        """
1023        Return the environment variables to set for the job's process.
1024        """
1025        if '_env' in self.__dict__:
1026            return self.__dict__['_env']
1027
1028        _env = self.daemon.properties.get('env', {})
1029        default_env = {
1030            'PYTHONUNBUFFERED': '1',
1031            'LINES': str(get_config('jobs', 'terminal', 'lines')),
1032            'COLUMNS': str(get_config('jobs', 'terminal', 'columns')),
1033            STATIC_CONFIG['environment']['noninteractive']: 'true',
1034        }
1035        self._env = {**default_env, **_env}
1036        return self._env
1037
1038    @property
1039    def delete_after_completion(self) -> bool:
1040        """
1041        Return whether this job is configured to delete itself after completion.
1042        """
1043        if '_delete_after_completion' in self.__dict__:
1044            return self.__dict__.get('_delete_after_completion', False)
1045
1046        self._delete_after_completion = self.daemon.properties.get('delete_after_completion', False)
1047        return self._delete_after_completion
1048
1049    def __str__(self) -> str:
1050        sysargs = self.sysargs
1051        sysargs_str = shlex.join(sysargs) if sysargs else ''
1052        job_str = f'Job("{self.name}"'
1053        if sysargs_str:
1054            job_str += f', "{sysargs_str}"'
1055
1056        job_str += ')'
1057        return job_str
1058
1059    def __repr__(self) -> str:
1060        return str(self)
1061
1062    def __hash__(self) -> int:
1063        return hash(self.name)

Manage a meerschaum.utils.daemon.Daemon, locally or remotely via the API.

Job( name: str, sysargs: Union[List[str], str, NoneType] = None, env: Optional[Dict[str, str]] = None, executor_keys: Optional[str] = None, delete_after_completion: bool = False, refresh_seconds: Union[int, float, NoneType] = None, _properties: Optional[Dict[str, Any]] = None, _rotating_log=None, _stdin_file=None, _status_hook: Optional[Callable[[], str]] = None, _result_hook: Optional[Callable[[], Tuple[bool, str]]] = None, _externally_managed: bool = False)
 75    def __init__(
 76        self,
 77        name: str,
 78        sysargs: Union[List[str], str, None] = None,
 79        env: Optional[Dict[str, str]] = None,
 80        executor_keys: Optional[str] = None,
 81        delete_after_completion: bool = False,
 82        refresh_seconds: Union[int, float, None] = None,
 83        _properties: Optional[Dict[str, Any]] = None,
 84        _rotating_log=None,
 85        _stdin_file=None,
 86        _status_hook: Optional[Callable[[], str]] = None,
 87        _result_hook: Optional[Callable[[], SuccessTuple]] = None,
 88        _externally_managed: bool = False,
 89    ):
 90        """
 91        Create a new job to manage a `meerschaum.utils.daemon.Daemon`.
 92
 93        Parameters
 94        ----------
 95        name: str
 96            The name of the job to be created.
 97            This will also be used as the Daemon ID.
 98
 99        sysargs: Union[List[str], str, None], default None
100            The sysargs of the command to be executed, e.g. 'start api'.
101
102        env: Optional[Dict[str, str]], default None
103            If provided, set these environment variables in the job's process.
104
105        executor_keys: Optional[str], default None
106            If provided, execute the job remotely on an API instance, e.g. 'api:main'.
107
108        delete_after_completion: bool, default False
109            If `True`, delete this job when it has finished executing.
110
111        refresh_seconds: Union[int, float, None], default None
112            The number of seconds to sleep between refreshes.
113            Defaults to the configured value `system.cli.refresh_seconds`.
114
115        _properties: Optional[Dict[str, Any]], default None
116            If provided, use this to patch the daemon's properties.
117        """
118        from meerschaum.utils.daemon import Daemon
119        for char in BANNED_CHARS:
120            if char in name:
121                raise ValueError(f"Invalid name: ({char}) is not allowed.")
122
123        if isinstance(sysargs, str):
124            sysargs = shlex.split(sysargs)
125
126        and_key = STATIC_CONFIG['system']['arguments']['and_key']
127        escaped_and_key = STATIC_CONFIG['system']['arguments']['escaped_and_key']
128        if sysargs:
129            sysargs = [
130                (arg if arg != escaped_and_key else and_key)
131                for arg in sysargs
132            ]
133
134        ### NOTE: 'local' and 'systemd' executors are being coalesced.
135        if executor_keys is None:
136            from meerschaum.jobs import get_executor_keys_from_context
137            executor_keys = get_executor_keys_from_context()
138
139        self.executor_keys = executor_keys
140        self.name = name
141        self.refresh_seconds = (
142            refresh_seconds
143            if refresh_seconds is not None
144            else mrsm.get_config('system', 'cli', 'refresh_seconds')
145        )
146        try:
147            self._daemon = (
148                Daemon(daemon_id=name)
149                if executor_keys == 'local'
150                else None
151            )
152        except Exception:
153            self._daemon = None
154
155        ### Handle any injected dependencies.
156        if _rotating_log is not None:
157            self._rotating_log = _rotating_log
158            if self._daemon is not None:
159                self._daemon._rotating_log = _rotating_log
160
161        if _stdin_file is not None:
162            self._stdin_file = _stdin_file
163            if self._daemon is not None:
164                self._daemon._stdin_file = _stdin_file
165                self._daemon._blocking_stdin_file_path = _stdin_file.blocking_file_path
166
167        if _status_hook is not None:
168            self._status_hook = _status_hook
169
170        if _result_hook is not None:
171            self._result_hook = _result_hook
172
173        self._externally_managed = _externally_managed
174        self._properties_patch = _properties or {}
175        if _externally_managed:
176            self._properties_patch.update({'externally_managed': _externally_managed})
177
178        if env:
179            self._properties_patch.update({'env': env})
180
181        if delete_after_completion:
182            self._properties_patch.update({'delete_after_completion': delete_after_completion})
183
184        daemon_sysargs = (
185            self._daemon.properties.get('target', {}).get('args', [None])[0]
186            if self._daemon is not None
187            else None
188        )
189
190        if daemon_sysargs and sysargs and daemon_sysargs != sysargs:
191            warn("Given sysargs differ from existing sysargs.")
192
193        self._sysargs = [
194            arg
195            for arg in (daemon_sysargs or sysargs or [])
196            if arg not in ('-d', '--daemon')
197        ]
198        for restart_flag in RESTART_FLAGS:
199            if restart_flag in self._sysargs:
200                self._properties_patch.update({'restart': True})
201                break

Create a new job to manage a meerschaum.utils.daemon.Daemon.

Parameters
  • name (str): The name of the job to be created. This will also be used as the Daemon ID.
  • sysargs (Union[List[str], str, None], default None): The sysargs of the command to be executed, e.g. 'start api'.
  • env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the job's process.
  • executor_keys (Optional[str], default None): If provided, execute the job remotely on an API instance, e.g. 'api:main'.
  • delete_after_completion (bool, default False): If True, delete this job when it has finished executing.
  • refresh_seconds (Union[int, float, None], default None): The number of seconds to sleep between refreshes. Defaults to the configured value system.cli.refresh_seconds.
  • _properties (Optional[Dict[str, Any]], default None): If provided, use this to patch the daemon's properties.
executor_keys
name
refresh_seconds
@staticmethod
def from_pid( pid: int, executor_keys: Optional[str] = None) -> Job:
203    @staticmethod
204    def from_pid(pid: int, executor_keys: Optional[str] = None) -> Job:
205        """
206        Build a `Job` from the PID of a running Meerschaum process.
207
208        Parameters
209        ----------
210        pid: int
211            The PID of the process.
212
213        executor_keys: Optional[str], default None
214            The executor keys to assign to the job.
215        """
216        from meerschaum.config.paths import DAEMON_RESOURCES_PATH
217
218        psutil = mrsm.attempt_import('psutil')
219        try:
220            process = psutil.Process(pid)
221        except psutil.NoSuchProcess as e:
222            warn(f"Process with PID {pid} does not exist.", stack=False)
223            raise e
224
225        command_args = process.cmdline()
226        is_daemon = command_args[1] == '-c'
227
228        if is_daemon:
229            daemon_id = command_args[-1].split('daemon_id=')[-1].split(')')[0].replace("'", '')
230            root_dir = process.environ().get(STATIC_CONFIG['environment']['root'], None)
231            if root_dir is None:
232                from meerschaum.config.paths import ROOT_DIR_PATH
233                root_dir = ROOT_DIR_PATH
234            else:
235                root_dir = pathlib.Path(root_dir)
236            jobs_dir = root_dir / DAEMON_RESOURCES_PATH.name
237            daemon_dir = jobs_dir / daemon_id
238            pid_file = daemon_dir / 'process.pid'
239
240            if pid_file.exists():
241                with open(pid_file, 'r', encoding='utf-8') as f:
242                    daemon_pid = int(f.read())
243
244                if pid != daemon_pid:
245                    raise EnvironmentError(f"Differing PIDs: {pid=}, {daemon_pid=}")
246            else:
247                raise EnvironmentError(f"Is job '{daemon_id}' running?")
248
249            return Job(daemon_id, executor_keys=executor_keys)
250
251        from meerschaum._internal.arguments._parse_arguments import parse_arguments
252        from meerschaum.utils.daemon import get_new_daemon_name
253
254        mrsm_ix = 0
255        for i, arg in enumerate(command_args):
256            if 'mrsm' in arg or 'meerschaum' in arg.lower():
257                mrsm_ix = i
258                break
259
260        sysargs = command_args[mrsm_ix+1:]
261        kwargs = parse_arguments(sysargs)
262        name = kwargs.get('name', get_new_daemon_name())
263        return Job(name, sysargs, executor_keys=executor_keys)

Build a Job from the PID of a running Meerschaum process.

Parameters
  • pid (int): The PID of the process.
  • executor_keys (Optional[str], default None): The executor keys to assign to the job.
def start(self, debug: bool = False) -> Tuple[bool, str]:
265    def start(self, debug: bool = False) -> SuccessTuple:
266        """
267        Start the job's daemon.
268        """
269        if self.executor is not None:
270            if not self.exists(debug=debug):
271                return self.executor.create_job(
272                    self.name,
273                    self.sysargs,
274                    properties=self.daemon.properties,
275                    debug=debug,
276                )
277            return self.executor.start_job(self.name, debug=debug)
278
279        if self.is_running():
280            return True, f"{self} is already running."
281
282        success, msg = self.daemon.run(
283            keep_daemon_output=(not self.delete_after_completion),
284            allow_dirty_run=True,
285        )
286        if not success:
287            return success, msg
288
289        return success, f"Started {self}."

Start the job's daemon.

def stop( self, timeout_seconds: Union[int, float, NoneType] = None, debug: bool = False) -> Tuple[bool, str]:
291    def stop(
292        self,
293        timeout_seconds: Union[int, float, None] = None,
294        debug: bool = False,
295    ) -> SuccessTuple:
296        """
297        Stop the job's daemon.
298        """
299        if self.executor is not None:
300            return self.executor.stop_job(self.name, debug=debug)
301
302        if self.daemon.status == 'stopped':
303            if not self.restart:
304                return True, f"{self} is not running."
305            elif self.stop_time is not None:
306                return True, f"{self} will not restart until manually started."
307
308        quit_success, quit_msg = self.daemon.quit(timeout=timeout_seconds)
309        if quit_success:
310            return quit_success, f"Stopped {self}."
311
312        warn(
313            f"Failed to gracefully quit {self}.",
314            stack=False,
315        )
316        kill_success, kill_msg = self.daemon.kill(timeout=timeout_seconds)
317        if not kill_success:
318            return kill_success, kill_msg
319
320        return kill_success, f"Killed {self}."

Stop the job's daemon.

def pause( self, timeout_seconds: Union[int, float, NoneType] = None, debug: bool = False) -> Tuple[bool, str]:
322    def pause(
323        self,
324        timeout_seconds: Union[int, float, None] = None,
325        debug: bool = False,
326    ) -> SuccessTuple:
327        """
328        Pause the job's daemon.
329        """
330        if self.executor is not None:
331            return self.executor.pause_job(self.name, debug=debug)
332
333        pause_success, pause_msg = self.daemon.pause(timeout=timeout_seconds)
334        if not pause_success:
335            return pause_success, pause_msg
336
337        return pause_success, f"Paused {self}."

Pause the job's daemon.

def delete(self, debug: bool = False) -> Tuple[bool, str]:
339    def delete(self, debug: bool = False) -> SuccessTuple:
340        """
341        Delete the job and its daemon.
342        """
343        if self.executor is not None:
344            return self.executor.delete_job(self.name, debug=debug)
345
346        if self.is_running():
347            stop_success, stop_msg = self.stop()
348            if not stop_success:
349                return stop_success, stop_msg
350
351        cleanup_success, cleanup_msg = self.daemon.cleanup()
352        if not cleanup_success:
353            return cleanup_success, cleanup_msg
354
355        _ = self.daemon._properties.pop('result', None)
356        return cleanup_success, f"Deleted {self}."

Delete the job and its daemon.

def is_running(self) -> bool:
358    def is_running(self) -> bool:
359        """
360        Determine whether the job's daemon is running.
361        """
362        return self.status == 'running'

Determine whether the job's daemon is running.

def exists(self, debug: bool = False) -> bool:
364    def exists(self, debug: bool = False) -> bool:
365        """
366        Determine whether the job exists.
367        """
368        if self.executor is not None:
369            return self.executor.get_job_exists(self.name, debug=debug)
370
371        return self.daemon.path.exists()

Determine whether the job exists.

def get_logs(self) -> Optional[str]:
373    def get_logs(self) -> Union[str, None]:
374        """
375        Return the output text of the job's daemon.
376        """
377        if self.executor is not None:
378            return self.executor.get_logs(self.name)
379
380        return self.daemon.log_text

Return the output text of the job's daemon.

def monitor_logs( self, callback_function: Callable[[str], NoneType] = <function _default_stdout_callback>, input_callback_function: Optional[Callable[[], str]] = None, stop_callback_function: Optional[Callable[[Tuple[bool, str]], NoneType]] = None, stop_event: Optional[asyncio.locks.Event] = None, stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False, _logs_path: Optional[pathlib.Path] = None, _log=None, _stdin_file=None, _wait_if_stopped: bool = True):
382    def monitor_logs(
383        self,
384        callback_function: Callable[[str], None] = _default_stdout_callback,
385        input_callback_function: Optional[Callable[[], str]] = None,
386        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
387        stop_event: Optional[asyncio.Event] = None,
388        stop_on_exit: bool = False,
389        strip_timestamps: bool = False,
390        accept_input: bool = True,
391        debug: bool = False,
392        _logs_path: Optional[pathlib.Path] = None,
393        _log=None,
394        _stdin_file=None,
395        _wait_if_stopped: bool = True,
396    ):
397        """
398        Monitor the job's log files and execute a callback on new lines.
399
400        Parameters
401        ----------
402        callback_function: Callable[[str], None], default partial(print, end='')
403            The callback to execute as new data comes in.
404            Defaults to printing the output directly to `stdout`.
405
406        input_callback_function: Optional[Callable[[], str]], default None
407            If provided, execute this callback when the daemon is blocking on stdin.
408            Defaults to `sys.stdin.readline()`.
409
410        stop_callback_function: Optional[Callable[[SuccessTuple]], str], default None
411            If provided, execute this callback when the daemon stops.
412            The job's SuccessTuple will be passed to the callback.
413
414        stop_event: Optional[asyncio.Event], default None
415            If provided, stop monitoring when this event is set.
416            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
417            from within `callback_function` to stop monitoring.
418
419        stop_on_exit: bool, default False
420            If `True`, stop monitoring when the job stops.
421
422        strip_timestamps: bool, default False
423            If `True`, remove leading timestamps from lines.
424
425        accept_input: bool, default True
426            If `True`, accept input when the daemon blocks on stdin.
427        """
428        if self.executor is not None:
429            self.executor.monitor_logs(
430                self.name,
431                callback_function,
432                input_callback_function=input_callback_function,
433                stop_callback_function=stop_callback_function,
434                stop_on_exit=stop_on_exit,
435                accept_input=accept_input,
436                strip_timestamps=strip_timestamps,
437                debug=debug,
438            )
439            return
440
441        monitor_logs_coroutine = self.monitor_logs_async(
442            callback_function=callback_function,
443            input_callback_function=input_callback_function,
444            stop_callback_function=stop_callback_function,
445            stop_event=stop_event,
446            stop_on_exit=stop_on_exit,
447            strip_timestamps=strip_timestamps,
448            accept_input=accept_input,
449            debug=debug,
450            _logs_path=_logs_path,
451            _log=_log,
452            _stdin_file=_stdin_file,
453            _wait_if_stopped=_wait_if_stopped,
454        )
455        return asyncio.run(monitor_logs_coroutine)

Monitor the job's log files and execute a callback on new lines.

Parameters
  • callback_function (Callable[[str], None], default _default_stdout_callback): The callback to execute as new data comes in. Defaults to printing the output directly to stdout.
  • input_callback_function (Optional[Callable[[], str]], default None): If provided, execute this callback when the daemon is blocking on stdin. Defaults to sys.stdin.readline().
  • stop_callback_function (Optional[Callable[[SuccessTuple], None]], default None): If provided, execute this callback when the daemon stops. The job's SuccessTuple will be passed to the callback.
  • stop_event (Optional[asyncio.Event], default None): If provided, stop monitoring when this event is set. You may instead raise meerschaum.jobs.StopMonitoringLogs from within callback_function to stop monitoring.
  • stop_on_exit (bool, default False): If True, stop monitoring when the job stops.
  • strip_timestamps (bool, default False): If True, remove leading timestamps from lines.
  • accept_input (bool, default True): If True, accept input when the daemon blocks on stdin.
async def monitor_logs_async( self, callback_function: Callable[[str], NoneType] = <function _default_stdout_callback>, input_callback_function: Optional[Callable[[], str]] = None, stop_callback_function: Optional[Callable[[Tuple[bool, str]], NoneType]] = None, stop_event: Optional[asyncio.locks.Event] = None, stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False, _logs_path: Optional[pathlib.Path] = None, _log=None, _stdin_file=None, _wait_if_stopped: bool = True):
457    async def monitor_logs_async(
458        self,
459        callback_function: Callable[[str], None] = _default_stdout_callback,
460        input_callback_function: Optional[Callable[[], str]] = None,
461        stop_callback_function: Optional[Callable[[SuccessTuple], None]] = None,
462        stop_event: Optional[asyncio.Event] = None,
463        stop_on_exit: bool = False,
464        strip_timestamps: bool = False,
465        accept_input: bool = True,
466        debug: bool = False,
467        _logs_path: Optional[pathlib.Path] = None,
468        _log=None,
469        _stdin_file=None,
470        _wait_if_stopped: bool = True,
471    ):
472        """
473        Monitor the job's log files and await a callback on new lines.
474
475        Parameters
476        ----------
477        callback_function: Callable[[str], None], default _default_stdout_callback
478            The callback to execute as new data comes in.
479            Defaults to printing the output directly to `stdout`.
480
481        input_callback_function: Optional[Callable[[], str]], default None
482            If provided, execute this callback when the daemon is blocking on stdin.
483            Defaults to `sys.stdin.readline()`.
484
485        stop_callback_function: Optional[Callable[[SuccessTuple]], str], default None
486            If provided, execute this callback when the daemon stops.
487            The job's SuccessTuple will be passed to the callback.
488
489        stop_event: Optional[asyncio.Event], default None
490            If provided, stop monitoring when this event is set.
491            You may instead raise `meerschaum.jobs.StopMonitoringLogs`
492            from within `callback_function` to stop monitoring.
493
494        stop_on_exit: bool, default False
495            If `True`, stop monitoring when the job stops.
496
497        strip_timestamps: bool, default False
498            If `True`, remove leading timestamps from lines.
499
500        accept_input: bool, default True
501            If `True`, accept input when the daemon blocks on stdin.
502        """
503        from meerschaum.utils.prompt import prompt
504
505        def default_input_callback_function():
506            prompt_kwargs = self.get_prompt_kwargs(debug=debug)
507            if prompt_kwargs:
508                answer = prompt(**prompt_kwargs)
509                return answer + '\n'
510            return sys.stdin.readline()
511
512        if input_callback_function is None:
513            input_callback_function = default_input_callback_function
514
515        if self.executor is not None:
516            await self.executor.monitor_logs_async(
517                self.name,
518                callback_function,
519                input_callback_function=input_callback_function,
520                stop_callback_function=stop_callback_function,
521                stop_on_exit=stop_on_exit,
522                strip_timestamps=strip_timestamps,
523                accept_input=accept_input,
524                debug=debug,
525            )
526            return
527
528        from meerschaum.utils.formatting._jobs import strip_timestamp_from_line
529
530        events = {
531            'user': stop_event,
532            'stopped': asyncio.Event(),
533            'stop_token': asyncio.Event(),
534            'stop_exception': asyncio.Event(),
535            'stopped_timeout': asyncio.Event(),
536        }
537        combined_event = asyncio.Event()
538        emitted_text = False
539        stdin_file = _stdin_file if _stdin_file is not None else self.daemon.stdin_file
540
541        async def check_job_status():
542            if not stop_on_exit:
543                return
544
545            nonlocal emitted_text
546
547            sleep_time = 0.1
548            while sleep_time < 0.2:
549                if self.status == 'stopped':
550                    if not emitted_text and _wait_if_stopped:
551                        await asyncio.sleep(sleep_time)
552                        sleep_time = round(sleep_time * 1.1, 3)
553                        continue
554
555                    if stop_callback_function is not None:
556                        try:
557                            if asyncio.iscoroutinefunction(stop_callback_function):
558                                await stop_callback_function(self.result)
559                            else:
560                                stop_callback_function(self.result)
561                        except asyncio.exceptions.CancelledError:
562                            break
563                        except Exception:
564                            warn(traceback.format_exc())
565
566                    if stop_on_exit:
567                        events['stopped'].set()
568
569                    break
570                await asyncio.sleep(0.1)
571
572            events['stopped_timeout'].set()
573
574        async def check_blocking_on_input():
575            while True:
576                if not emitted_text or not self.is_blocking_on_stdin():
577                    try:
578                        await asyncio.sleep(self.refresh_seconds)
579                    except asyncio.exceptions.CancelledError:
580                        break
581                    continue
582
583                if not self.is_running():
584                    break
585
586                await emit_latest_lines()
587
588                try:
589                    print('', end='', flush=True)
590                    if asyncio.iscoroutinefunction(input_callback_function):
591                        data = await input_callback_function()
592                    else:
593                        loop = asyncio.get_running_loop()
594                        data = await loop.run_in_executor(None, input_callback_function)
595                except KeyboardInterrupt:
596                    break
597                #  if not data.endswith('\n'):
598                    #  data += '\n'
599
600                stdin_file.write(data)
601                await asyncio.sleep(self.refresh_seconds)
602
603        async def combine_events():
604            event_tasks = [
605                asyncio.create_task(event.wait())
606                for event in events.values()
607                if event is not None
608            ]
609            if not event_tasks:
610                return
611
612            try:
613                done, pending = await asyncio.wait(
614                    event_tasks,
615                    return_when=asyncio.FIRST_COMPLETED,
616                )
617                for task in pending:
618                    task.cancel()
619            except asyncio.exceptions.CancelledError:
620                pass
621            finally:
622                combined_event.set()
623
624        check_job_status_task = asyncio.create_task(check_job_status())
625        check_blocking_on_input_task = asyncio.create_task(check_blocking_on_input())
626        combine_events_task = asyncio.create_task(combine_events())
627
628        log = _log if _log is not None else self.daemon.rotating_log
629        lines_to_show = (
630            self.daemon.properties.get(
631                'logs', {}
632            ).get(
633                'lines_to_show', get_config('jobs', 'logs', 'lines_to_show')
634            )
635        )
636
637        async def emit_latest_lines():
638            nonlocal emitted_text
639            nonlocal stop_event
640            lines = log.readlines()
641            for line in lines[(-1 * lines_to_show):]:
642                if stop_event is not None and stop_event.is_set():
643                    return
644
645                line_stripped_extra = strip_timestamp_from_line(line.strip())
646                line_stripped = strip_timestamp_from_line(line)
647
648                if line_stripped_extra == STOP_TOKEN:
649                    events['stop_token'].set()
650                    return
651
652                if line_stripped_extra == CLEAR_TOKEN:
653                    clear_screen(debug=debug)
654                    continue
655
656                if line_stripped_extra == FLUSH_TOKEN.strip():
657                    line_stripped = ''
658                    line = ''
659
660                if strip_timestamps:
661                    line = line_stripped
662
663                try:
664                    if asyncio.iscoroutinefunction(callback_function):
665                        await callback_function(line)
666                    else:
667                        callback_function(line)
668                    emitted_text = True
669                except StopMonitoringLogs:
670                    events['stop_exception'].set()
671                    return
672                except Exception:
673                    warn(f"Error in logs callback:\n{traceback.format_exc()}")
674
675        await emit_latest_lines()
676
677        tasks = (
678            [check_job_status_task]
679            + ([check_blocking_on_input_task] if accept_input else [])
680            + [combine_events_task]
681        )
682        try:
683            _ = asyncio.gather(*tasks, return_exceptions=True)
684        except asyncio.exceptions.CancelledError:
685            raise
686        except Exception:
687            warn(f"Failed to run async checks:\n{traceback.format_exc()}")
688
689        watchfiles = mrsm.attempt_import('watchfiles', lazy=False)
690        dir_path_to_monitor = (
691            _logs_path
692            or (log.file_path.parent if log else None)
693            or LOGS_RESOURCES_PATH
694        )
695        async for changes in watchfiles.awatch(
696            dir_path_to_monitor,
697            stop_event=combined_event,
698        ):
699            for change in changes:
700                file_path_str = change[1]
701                file_path = pathlib.Path(file_path_str)
702                latest_subfile_path = log.get_latest_subfile_path()
703                if latest_subfile_path != file_path:
704                    continue
705
706                await emit_latest_lines()
707
708        await emit_latest_lines()

Monitor the job's log files and await a callback on new lines.

Parameters
  • callback_function (Callable[[str], None], default _default_stdout_callback): The callback to execute as new data comes in. Defaults to printing the output directly to stdout.
  • input_callback_function (Optional[Callable[[], str]], default None): If provided, execute this callback when the daemon is blocking on stdin. Defaults to sys.stdin.readline().
  • stop_callback_function (Optional[Callable[[SuccessTuple], None]], default None): If provided, execute this callback when the daemon stops. The job's SuccessTuple will be passed to the callback.
  • stop_event (Optional[asyncio.Event], default None): If provided, stop monitoring when this event is set. You may instead raise meerschaum.jobs.StopMonitoringLogs from within callback_function to stop monitoring.
  • stop_on_exit (bool, default False): If True, stop monitoring when the job stops.
  • strip_timestamps (bool, default False): If True, remove leading timestamps from lines.
  • accept_input (bool, default True): If True, accept input when the daemon blocks on stdin.
def is_blocking_on_stdin(self, debug: bool = False) -> bool:
710    def is_blocking_on_stdin(self, debug: bool = False) -> bool:
711        """
712        Return whether a job's daemon is blocking on stdin.
713        """
714        if self.executor is not None:
715            return self.executor.get_job_is_blocking_on_stdin(self.name, debug=debug)
716
717        return self.is_running() and self.daemon.blocking_stdin_file_path.exists()

Return whether a job's daemon is blocking on stdin.

def get_prompt_kwargs(self, debug: bool = False) -> Dict[str, Any]:
719    def get_prompt_kwargs(self, debug: bool = False) -> Dict[str, Any]:
720        """
721        Return the kwargs to the blocking `prompt()`, if available.
722        """
723        if self.executor is not None:
724            return self.executor.get_job_prompt_kwargs(self.name, debug=debug)
725
726        if not self.daemon.prompt_kwargs_file_path.exists():
727            return {}
728
729        try:
730            with open(self.daemon.prompt_kwargs_file_path, 'r', encoding='utf-8') as f:
731                prompt_kwargs = json.load(f)
732
733            return prompt_kwargs
734        
735        except Exception:
736            import traceback
737            traceback.print_exc()
738            return {}

Return the kwargs to the blocking prompt(), if available.

def write_stdin(self, data):
740    def write_stdin(self, data):
741        """
742        Write to a job's daemon's `stdin`.
743        """
744        self.daemon.stdin_file.write(data)

Write to a job's daemon's stdin.

executor: Optional[Executor]
746    @property
747    def executor(self) -> Union[Executor, None]:
748        """
749        If the job is remote, return the connector to the remote API instance.
750        """
751        return (
752            mrsm.get_connector(self.executor_keys)
753            if self.executor_keys != 'local'
754            else None
755        )

If the job is remote, return the connector to the remote API instance.

status: str
757    @property
758    def status(self) -> str:
759        """
760        Return the running status of the job's daemon.
761        """
762        if '_status_hook' in self.__dict__:
763            return self._status_hook()
764
765        if self.executor is not None:
766            return self.executor.get_job_status(self.name)
767
768        return self.daemon.status

Return the running status of the job's daemon.

pid: Optional[int]
770    @property
771    def pid(self) -> Union[int, None]:
772        """
773        Return the PID of the job's dameon.
774        """
775        if self.executor is not None:
776            return self.executor.get_job_metadata(self.name).get('daemon', {}).get('pid', None)
777
778        return self.daemon.pid

Return the PID of the job's daemon.

restart: bool
780    @property
781    def restart(self) -> bool:
782        """
783        Return whether to restart a stopped job.
784        """
785        if self.executor is not None:
786            return self.executor.get_job_metadata(self.name).get('restart', False)
787
788        return self.daemon.properties.get('restart', False)

Return whether to restart a stopped job.

result: Tuple[bool, str]
790    @property
791    def result(self) -> SuccessTuple:
792        """
793        Return the `SuccessTuple` when the job has terminated.
794        """
795        if self.is_running():
796            return True, f"{self} is running."
797
798        if '_result_hook' in self.__dict__:
799            return self._result_hook()
800
801        if self.executor is not None:
802            return (
803                self.executor.get_job_metadata(self.name)
804                .get('result', (False, "No result available."))
805            )
806
807        _result = self.daemon.properties.get('result', None)
808        if _result is None:
809            from meerschaum.utils.daemon.Daemon import _results
810            return _results.get(self.daemon.daemon_id, (False, "No result available."))
811
812        return tuple(_result)

Return the SuccessTuple when the job has terminated.

sysargs: List[str]
814    @property
815    def sysargs(self) -> List[str]:
816        """
817        Return the sysargs to use for the Daemon.
818        """
819        if self._sysargs:
820            return self._sysargs
821
822        if self.executor is not None:
823            return self.executor.get_job_metadata(self.name).get('sysargs', [])
824
825        target_args = self.daemon.target_args
826        if target_args is None:
827            return []
828        self._sysargs = target_args[0] if len(target_args) > 0 else []
829        return self._sysargs

Return the sysargs to use for the Daemon.

def get_daemon_properties(self) -> Dict[str, Any]:
831    def get_daemon_properties(self) -> Dict[str, Any]:
832        """
833        Return the `properties` dictionary for the job's daemon.
834        """
835        remote_properties = (
836            {}
837            if self.executor is None
838            else self.executor.get_job_properties(self.name)
839        )
840        return {
841            **remote_properties,
842            **self._properties_patch
843        }

Return the properties dictionary for the job's daemon.

daemon: "'Daemon'"
845    @property
846    def daemon(self) -> 'Daemon':
847        """
848        Return the daemon which this job manages.
849        """
850        from meerschaum.utils.daemon import Daemon
851        if self._daemon is not None and self.executor is None and self._sysargs:
852            return self._daemon
853
854        self._daemon = Daemon(
855            target=entry,
856            target_args=[self._sysargs],
857            target_kw={},
858            daemon_id=self.name,
859            label=shlex.join(self._sysargs),
860            properties=self.get_daemon_properties(),
861        )
862        if '_rotating_log' in self.__dict__:
863            self._daemon._rotating_log = self._rotating_log
864
865        if '_stdin_file' in self.__dict__:
866            self._daemon._stdin_file = self._stdin_file
867            self._daemon._blocking_stdin_file_path = self._stdin_file.blocking_file_path
868
869        return self._daemon

Return the daemon which this job manages.

began: Optional[datetime.datetime]
871    @property
872    def began(self) -> Union[datetime, None]:
873        """
874        The datetime when the job began running.
875        """
876        if self.executor is not None:
877            began_str = self.executor.get_job_began(self.name)
878            if began_str is None:
879                return None
880            return (
881                datetime.fromisoformat(began_str)
882                .astimezone(timezone.utc)
883                .replace(tzinfo=None)
884            )
885
886        began_str = self.daemon.properties.get('process', {}).get('began', None)
887        if began_str is None:
888            return None
889
890        return datetime.fromisoformat(began_str)

The datetime when the job began running.

ended: Optional[datetime.datetime]
892    @property
893    def ended(self) -> Union[datetime, None]:
894        """
895        The datetime when the job stopped running.
896        """
897        if self.executor is not None:
898            ended_str = self.executor.get_job_ended(self.name)
899            if ended_str is None:
900                return None
901            return (
902                datetime.fromisoformat(ended_str)
903                .astimezone(timezone.utc)
904                .replace(tzinfo=None)
905            )
906
907        ended_str = self.daemon.properties.get('process', {}).get('ended', None)
908        if ended_str is None:
909            return None
910
911        return datetime.fromisoformat(ended_str)

The datetime when the job stopped running.

paused: Optional[datetime.datetime]
913    @property
914    def paused(self) -> Union[datetime, None]:
915        """
916        The datetime when the job was suspended while running.
917        """
918        if self.executor is not None:
919            paused_str = self.executor.get_job_paused(self.name)
920            if paused_str is None:
921                return None
922            return (
923                datetime.fromisoformat(paused_str)
924                .astimezone(timezone.utc)
925                .replace(tzinfo=None)
926            )
927
928        paused_str = self.daemon.properties.get('process', {}).get('paused', None)
929        if paused_str is None:
930            return None
931
932        return datetime.fromisoformat(paused_str)

The datetime when the job was suspended while running.

stop_time: Optional[datetime.datetime]
934    @property
935    def stop_time(self) -> Union[datetime, None]:
936        """
937        Return the timestamp when the job was manually stopped.
938        """
939        if self.executor is not None:
940            return self.executor.get_job_stop_time(self.name)
941
942        if not self.daemon.stop_path.exists():
943            return None
944
945        stop_data = self.daemon._read_stop_file()
946        if not stop_data:
947            return None
948
949        stop_time_str = stop_data.get('stop_time', None)
950        if not stop_time_str:
951            warn(f"Could not read stop time for {self}.")
952            return None
953
954        return datetime.fromisoformat(stop_time_str)

Return the timestamp when the job was manually stopped.

hidden: bool
956    @property
957    def hidden(self) -> bool:
958        """
959        Return a bool indicating whether this job should be displayed.
960        """
961        return (
962            self.name.startswith('_')
963            or self.name.startswith('.')
964            or self._is_externally_managed
965        )

Return a bool indicating whether this job should be displayed.

def check_restart(self) -> Tuple[bool, str]:
967    def check_restart(self) -> SuccessTuple:
968        """
969        If `restart` is `True` and the daemon is not running,
970        restart the job.
971        Do not restart if the job was manually stopped.
972        """
973        if self.is_running():
974            return True, f"{self} is running."
975
976        if not self.restart:
977            return True, f"{self} does not need to be restarted."
978
979        if self.stop_time is not None:
980            return True, f"{self} was manually stopped."
981
982        return self.start()

If restart is True and the daemon is not running, restart the job. Do not restart if the job was manually stopped.

label: str
984    @property
985    def label(self) -> str:
986        """
987        Return the job's Daemon label (joined sysargs).
988        """
989        from meerschaum._internal.arguments import compress_pipeline_sysargs
990        sysargs = compress_pipeline_sysargs(self.sysargs)
991        return shlex.join(sysargs).replace(' + ', '\n+ ').replace(' : ', '\n: ').lstrip().rstrip()

Return the job's Daemon label (joined sysargs).

env: Dict[str, str]
1020    @property
1021    def env(self) -> Dict[str, str]:
1022        """
1023        Return the environment variables to set for the job's process.
1024        """
1025        if '_env' in self.__dict__:
1026            return self.__dict__['_env']
1027
1028        _env = self.daemon.properties.get('env', {})
1029        default_env = {
1030            'PYTHONUNBUFFERED': '1',
1031            'LINES': str(get_config('jobs', 'terminal', 'lines')),
1032            'COLUMNS': str(get_config('jobs', 'terminal', 'columns')),
1033            STATIC_CONFIG['environment']['noninteractive']: 'true',
1034        }
1035        self._env = {**default_env, **_env}
1036        return self._env

Return the environment variables to set for the job's process.

delete_after_completion: bool
1038    @property
1039    def delete_after_completion(self) -> bool:
1040        """
1041        Return whether this job is configured to delete itself after completion.
1042        """
1043        if '_delete_after_completion' in self.__dict__:
1044            return self.__dict__.get('_delete_after_completion', False)
1045
1046        self._delete_after_completion = self.daemon.properties.get('delete_after_completion', False)
1047        return self._delete_after_completion

Return whether this job is configured to delete itself after completion.

class StopMonitoringLogs(builtins.Exception):
class StopMonitoringLogs(Exception):
    """
    Signal that log monitoring should stop.

    Raise this from within a logs callback to end the monitoring.
    """

Raise this exception to stop the logs monitoring.

def get_jobs( executor_keys: Optional[str] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
 37def get_jobs(
 38    executor_keys: Optional[str] = None,
 39    include_hidden: bool = False,
 40    combine_local_and_systemd: bool = True,
 41    debug: bool = False,
 42) -> Dict[str, Job]:
 43    """
 44    Return a dictionary of the existing jobs.
 45
 46    Parameters
 47    ----------
 48    executor_keys: Optional[str], default None
 49        If provided, return remote jobs on the given API instance.
 50        Otherwise return local jobs.
 51
 52    include_hidden: bool, default False
 53        If `True`, include jobs with the `hidden` attribute.
 54
 55    Returns
 56    -------
 57    A dictionary mapping job names to jobs.
 58    """
 59    from meerschaum.connectors.parse import parse_executor_keys
 60    executor_keys = executor_keys or get_executor_keys_from_context()
 61
 62    include_local_and_system = (
 63        combine_local_and_systemd
 64        and str(executor_keys).split(':', maxsplit=1)[0] in ('None', 'local', 'systemd')
 65        and get_executor_keys_from_context() == 'systemd'
 66    )
 67
 68    def _get_local_jobs():
 69        from meerschaum.utils.daemon import get_daemons
 70        daemons = get_daemons()
 71        jobs = {
 72            daemon.daemon_id: Job(name=daemon.daemon_id, executor_keys='local')
 73            for daemon in daemons
 74        }
 75        return {
 76            name: job
 77            for name, job in jobs.items()
 78            if (include_hidden or not job.hidden) and not job._is_externally_managed
 79
 80        }
 81
 82    def _get_systemd_jobs():
 83        from meerschaum.jobs.systemd import SystemdExecutor
 84        conn = SystemdExecutor('systemd')
 85        jobs = conn.get_jobs(debug=debug)
 86        return {
 87            name: job
 88            for name, job in jobs.items()
 89            if include_hidden or not job.hidden
 90        }
 91
 92    if include_local_and_system:
 93        local_jobs = _get_local_jobs()
 94        systemd_jobs = _get_systemd_jobs()
 95        shared_jobs = set(local_jobs) & set(systemd_jobs)
 96        if shared_jobs:
 97            from meerschaum.utils.misc import items_str
 98            from meerschaum.utils.warnings import warn
 99            warn(
100                "Job"
101                + ('s' if len(shared_jobs) != 1 else '')
102                + f" {items_str(list(shared_jobs))} "
103                + "exist"
104                + ('s' if len(shared_jobs) == 1 else '')
105                + " in both `local` and `systemd`.",
106                stack=False,
107            )
108        return {**local_jobs, **systemd_jobs}
109
110    if executor_keys == 'local':
111        return _get_local_jobs()
112
113    if executor_keys == 'systemd':
114        return _get_systemd_jobs()
115
116    try:
117        _ = parse_executor_keys(executor_keys, construct=False)
118        conn = parse_executor_keys(executor_keys)
119        jobs = conn.get_jobs(debug=debug)
120        return {
121            name: job
122            for name, job in jobs.items()
123            if include_hidden or not job.hidden
124        }
125    except Exception:
126        return {}

Return a dictionary of the existing jobs.

Parameters
  • executor_keys (Optional[str], default None): If provided, return remote jobs on the given API instance. Otherwise return local jobs.
  • include_hidden (bool, default False): If True, include jobs with the hidden attribute.
Returns
  • A dictionary mapping job names to jobs.
def get_filtered_jobs( executor_keys: Optional[str] = None, filter_list: Optional[List[str]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, warn: bool = False, debug: bool = False) -> Dict[str, Job]:
def get_filtered_jobs(
    executor_keys: Optional[str] = None,
    filter_list: Optional[List[str]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    warn: bool = False,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of jobs filtered by the user.

    Parameters
    ----------
    executor_keys: Optional[str], default None
        Passed to `get_jobs()` to choose the executor.

    filter_list: Optional[List[str]], default None
        Names of the jobs to return. If empty or `None`, all jobs are returned
        (subject to `include_hidden`). Names beginning with an underscore are
        first looked up verbatim; if no name in the list matches an existing job,
        the underscore-prefixed names are treated as exclusions
        (leading underscores stripped) and every other job is returned.

    include_hidden: bool, default False
        Only applied when `filter_list` is empty: if `True`, include hidden jobs.

    combine_local_and_systemd: bool, default True
        Passed to `get_jobs()`.

    warn: bool, default False
        If `True`, warn about each name in `filter_list` which does not exist.

    debug: bool, default False
        Passed to `get_jobs()`.

    Returns
    -------
    A dictionary mapping job names to jobs.
    """
    from meerschaum.utils.warnings import warn as _warn

    # Always fetch hidden jobs so that they may be requested explicitly by name.
    jobs = get_jobs(
        executor_keys,
        include_hidden=True,
        combine_local_and_systemd=combine_local_and_systemd,
        debug=debug,
    )
    if not filter_list:
        return {
            name: job
            for name, job in jobs.items()
            if include_hidden or not job.hidden
        }

    jobs_to_return = {}
    for name in filter_list:
        job = jobs.get(name, None)
        if job is None:
            if warn:
                _warn(
                    f"Job '{name}' does not exist.",
                    stack=False,
                )
            continue
        jobs_to_return[name] = job

    filter_list_with_underscores = [name for name in filter_list if name.startswith('_')]
    if not jobs_to_return and filter_list_with_underscores:
        # Nothing matched verbatim: interpret `_name` entries as "everything except `name`".
        names_to_exclude = {name.lstrip('_') for name in filter_list_with_underscores}
        return {
            name: job
            for name, job in jobs.items()
            if name not in names_to_exclude
        }

    return jobs_to_return

Return a dictionary of jobs filtered by the user.

def get_restart_jobs( executor_keys: Optional[str] = None, jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
def get_restart_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return jobs which were created with `--restart` or `--loop`.

    If `jobs` is `None`, the candidates are fetched with `get_jobs()` using the
    remaining arguments; otherwise only the provided dictionary is inspected.
    A job is kept when its `restart` attribute is truthy.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    restart_jobs = {}
    for job_name, candidate in jobs.items():
        if candidate.restart:
            restart_jobs[job_name] = candidate
    return restart_jobs

Return jobs which were created with --restart or --loop.

def get_running_jobs( executor_keys: Optional[str] = None, jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
def get_running_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of running jobs.

    If `jobs` is `None`, the candidates are fetched with `get_jobs()` using the
    remaining arguments. A job is kept when its `status` equals `'running'`.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    return dict(
        (job_name, candidate)
        for job_name, candidate in jobs.items()
        if candidate.status == 'running'
    )

Return a dictionary of running jobs.

def get_stopped_jobs( executor_keys: Optional[str] = None, jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
def get_stopped_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of stopped jobs.

    If `jobs` is `None`, the candidates are fetched with `get_jobs()` using the
    remaining arguments. A job is kept when its `status` equals `'stopped'`.
    """
    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )

    stopped_jobs = {}
    for job_name, candidate in jobs.items():
        if candidate.status != 'stopped':
            continue
        stopped_jobs[job_name] = candidate
    return stopped_jobs

Return a dictionary of stopped jobs.

def get_paused_jobs( executor_keys: Optional[str] = None, jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = False, combine_local_and_systemd: bool = True, debug: bool = False) -> Dict[str, Job]:
def get_paused_jobs(
    executor_keys: Optional[str] = None,
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = False,
    combine_local_and_systemd: bool = True,
    debug: bool = False,
) -> Dict[str, Job]:
    """
    Return a dictionary of paused jobs.

    If `jobs` is `None`, the candidates are fetched with `get_jobs()` using the
    remaining arguments. A job is kept when its `status` equals `'paused'`.
    """
    candidates = (
        jobs
        if jobs is not None
        else get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=combine_local_and_systemd,
            debug=debug,
        )
    )
    return {
        job_name: candidate
        for job_name, candidate in candidates.items()
        if candidate.status == 'paused'
    }

Return a dictionary of paused jobs.

def make_executor(cls):
def make_executor(cls):
    """
    Register a class as an `Executor`.

    The executor type is the lowercased class name without a trailing
    `executor` suffix; it is appended to the module-level `executor_types`
    list if not already present. The class is then passed to `make_connector`
    with `_is_executor=True`, and that call's result is returned.
    """
    import re
    from meerschaum.connectors import make_connector

    lowercase_name = cls.__name__.lower()
    executor_type = re.compile(r'executor$').sub('', lowercase_name)
    already_registered = executor_type in executor_types
    if not already_registered:
        executor_types.append(executor_type)
    return make_connector(cls, _is_executor=True)

Register a class as an Executor.

class Executor(meerschaum.connectors._Connector.Connector):
class Executor(Connector):
    """
    Define the methods for managing jobs.

    Every method in this class is decorated with `@abstractmethod` and consists
    only of a docstring, so the behavior is supplied by subclasses.
    Most methods take the job's `name` and a `debug` flag
    (presumably a verbosity toggle; confirm against the implementations).
    """

    @abstractmethod
    def get_job_names(self, debug: bool = False) -> List[str]:
        """
        Return a list of existing jobs, including hidden ones.

        Annotated to return the names as a list of strings.
        """

    @abstractmethod
    def get_job_exists(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job exists.

        `name` is the job to look up.
        """

    @abstractmethod
    def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
        """
        Return a dictionary of existing jobs.

        Annotated as a mapping of strings (presumably job names) to `Job` objects.
        """

    @abstractmethod
    def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return a job's metadata.

        Annotated to return a dictionary with string keys.
        """

    @abstractmethod
    def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return the underlying daemon's properties.

        Annotated to return a dictionary with string keys.
        """

    @abstractmethod
    def get_job_status(self, name: str, debug: bool = False) -> str:
        """
        Return the job's status.

        Annotated to return a string.
        """

    @abstractmethod
    def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job began running.

        Annotated to return a string or `None`.
        """

    @abstractmethod
    def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job stopped running.

        Annotated to return a string or `None`.
        """

    @abstractmethod
    def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return a job's `paused` timestamp, if it exists.

        Annotated to return a string or `None`.
        """
    
    @abstractmethod
    def create_job(
        self,
        name: str,
        sysargs: List[str],
        properties: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ) -> SuccessTuple:
        """
        Create a new job.

        Parameters
        ----------
        name: str
            The name of the job to create.

        sysargs: List[str]
            The job's arguments as a list of strings.

        properties: Optional[Dict[str, Any]], default None
            Optional properties for the job
            (presumably the underlying daemon's properties; confirm in implementations).

        Returns
        -------
        A `SuccessTuple`.
        """

    @abstractmethod
    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Start a job.

        Annotated to return a `SuccessTuple`.
        """

    @abstractmethod
    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Stop a job.

        Annotated to return a `SuccessTuple`.
        """

    @abstractmethod
    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Pause a job.

        Annotated to return a `SuccessTuple`.
        """

    @abstractmethod
    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Delete a job.

        Annotated to return a `SuccessTuple`.
        """

    @abstractmethod
    def get_logs(self, name: str, debug: bool = False) -> str:
        """
        Return a job's log output.

        Annotated to return a single string.
        """

    @abstractmethod
    def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
        """
        Return the job's manual stop time.

        Annotated to return a `datetime` or `None`.
        """

    @abstractmethod
    async def monitor_logs_async(
        self,
        name: str,
        callback_function: Callable[[Any], Any],
        input_callback_function: Callable[[], str],
        stop_callback_function: Callable[[SuccessTuple], str],
        stop_on_exit: bool = False,
        strip_timestamps: bool = False,
        accept_input: bool = True,
        debug: bool = False,
    ):
        """
        Monitor a job's log files and await a callback with the changes.

        Parameters
        ----------
        name: str
            The name of the job to monitor.

        callback_function: Callable[[Any], Any]
            The callback which receives the changes.

        input_callback_function: Callable[[], str]
            A zero-argument callable returning a string
            (presumably the input to send to the job; confirm in implementations).

        stop_callback_function: Callable[[SuccessTuple], str]
            A callable which takes a `SuccessTuple`
            (presumably invoked when the job stops; confirm in implementations).

        stop_on_exit: bool, default False
        strip_timestamps: bool, default False
        accept_input: bool, default True
            Behavior flags; their exact semantics are defined by the implementations.
        """

    @abstractmethod
    def monitor_logs(self, *args, **kwargs):
        """
        Monitor a job's log files.

        Accepts arbitrary positional and keyword arguments.
        """

    @abstractmethod
    def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job is blocking on stdin.

        `name` is the job to check.
        """

    @abstractmethod
    def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return the kwargs to the blocking prompt.

        Annotated to return a dictionary with string keys.
        """

Define the methods for managing jobs.

@abstractmethod
def get_job_names(self, debug: bool = False) -> List[str]:
    @abstractmethod
    def get_job_names(self, debug: bool = False) -> List[str]:
        """
        Return a list of existing jobs, including hidden ones.

        Abstract: no implementation is provided here.
        Annotated to return the names as a list of strings.
        """

Return a list of existing jobs, including hidden ones.

@abstractmethod
def get_job_exists(self, name: str, debug: bool = False) -> bool:
    @abstractmethod
    def get_job_exists(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job exists.

        Abstract: no implementation is provided here.
        `name` is the job to look up.
        """

Return whether a job exists.

@abstractmethod
def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
    @abstractmethod
    def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
        """
        Return a dictionary of existing jobs.

        Abstract: no implementation is provided here.
        Annotated as a mapping of strings (presumably job names) to `Job` objects.
        """

Return a dictionary of existing jobs.

@abstractmethod
def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
    @abstractmethod
    def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return a job's metadata.

        Abstract: no implementation is provided here.
        Annotated to return a dictionary with string keys.
        """

Return a job's metadata.

@abstractmethod
def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
    @abstractmethod
    def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return the underlying daemon's properties.

        Abstract: no implementation is provided here.
        Annotated to return a dictionary with string keys.
        """

Return the underlying daemon's properties.

@abstractmethod
def get_job_status(self, name: str, debug: bool = False) -> str:
    @abstractmethod
    def get_job_status(self, name: str, debug: bool = False) -> str:
        """
        Return the job's status.

        Abstract: no implementation is provided here.
        Annotated to return a string.
        """

Return the job's status.

@abstractmethod
def get_job_began(self, name: str, debug: bool = False) -> Optional[str]:
    @abstractmethod
    def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job began running.

        Abstract: no implementation is provided here.
        Annotated to return a string or `None`.
        """

Return when a job began running.

@abstractmethod
def get_job_ended(self, name: str, debug: bool = False) -> Optional[str]:
    @abstractmethod
    def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job stopped running.

        Abstract: no implementation is provided here.
        Annotated to return a string or `None`.
        """

Return when a job stopped running.

@abstractmethod
def get_job_paused(self, name: str, debug: bool = False) -> Optional[str]:
    @abstractmethod
    def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return a job's `paused` timestamp, if it exists.

        Abstract: no implementation is provided here.
        Annotated to return a string or `None`.
        """

Return a job's paused timestamp, if it exists.

@abstractmethod
def create_job( self, name: str, sysargs: List[str], properties: Optional[Dict[str, Any]] = None, debug: bool = False) -> Tuple[bool, str]:
    @abstractmethod
    def create_job(
        self,
        name: str,
        sysargs: List[str],
        properties: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ) -> SuccessTuple:
        """
        Create a new job.

        Abstract: no implementation is provided here.

        Parameters
        ----------
        name: str
            The name of the job to create.

        sysargs: List[str]
            The job's arguments as a list of strings.

        properties: Optional[Dict[str, Any]], default None
            Optional properties for the job
            (presumably the underlying daemon's properties; confirm in implementations).

        Returns
        -------
        A `SuccessTuple`.
        """

Create a new job.

@abstractmethod
def start_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
    @abstractmethod
    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Start a job.

        Abstract: no implementation is provided here.
        Annotated to return a `SuccessTuple`.
        """

Start a job.

@abstractmethod
def stop_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
    @abstractmethod
    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Stop a job.

        Abstract: no implementation is provided here.
        Annotated to return a `SuccessTuple`.
        """

Stop a job.

@abstractmethod
def pause_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
    @abstractmethod
    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Pause a job.

        Abstract: no implementation is provided here.
        Annotated to return a `SuccessTuple`.
        """

Pause a job.

@abstractmethod
def delete_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
    @abstractmethod
    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Delete a job.

        Abstract: no implementation is provided here.
        Annotated to return a `SuccessTuple`.
        """

Delete a job.

@abstractmethod
def get_logs(self, name: str, debug: bool = False) -> str:
    @abstractmethod
    def get_logs(self, name: str, debug: bool = False) -> str:
        """
        Return a job's log output.

        Abstract: no implementation is provided here.
        Annotated to return a single string.
        """

Return a job's log output.

@abstractmethod
def get_job_stop_time(self, name: str, debug: bool = False) -> Optional[datetime.datetime]:
    @abstractmethod
    def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
        """
        Return the job's manual stop time.

        Abstract: no implementation is provided here.
        Annotated to return a `datetime` or `None`.
        """

Return the job's manual stop time.

@abstractmethod
async def monitor_logs_async( self, name: str, callback_function: Callable[[Any], Any], input_callback_function: Callable[[], str], stop_callback_function: Callable[[Tuple[bool, str]], str], stop_on_exit: bool = False, strip_timestamps: bool = False, accept_input: bool = True, debug: bool = False):
    @abstractmethod
    async def monitor_logs_async(
        self,
        name: str,
        callback_function: Callable[[Any], Any],
        input_callback_function: Callable[[], str],
        stop_callback_function: Callable[[SuccessTuple], str],
        stop_on_exit: bool = False,
        strip_timestamps: bool = False,
        accept_input: bool = True,
        debug: bool = False,
    ):
        """
        Monitor a job's log files and await a callback with the changes.

        Abstract: no implementation is provided here.

        Parameters
        ----------
        name: str
            The name of the job to monitor.

        callback_function: Callable[[Any], Any]
            The callback which receives the changes.

        input_callback_function: Callable[[], str]
            A zero-argument callable returning a string
            (presumably the input to send to the job; confirm in implementations).

        stop_callback_function: Callable[[SuccessTuple], str]
            A callable which takes a `SuccessTuple`
            (presumably invoked when the job stops; confirm in implementations).

        stop_on_exit: bool, default False
        strip_timestamps: bool, default False
        accept_input: bool, default True
            Behavior flags; their exact semantics are defined by the implementations.
        """

Monitor a job's log files and await a callback with the changes.

@abstractmethod
def monitor_logs(self, *args, **kwargs):
    @abstractmethod
    def monitor_logs(self, *args, **kwargs):
        """
        Monitor a job's log files.

        Abstract: no implementation is provided here.
        Accepts arbitrary positional and keyword arguments.
        """

Monitor a job's log files.

@abstractmethod
def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
    @abstractmethod
    def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job is blocking on stdin.

        Abstract: no implementation is provided here.
        `name` is the job to check.
        """

Return whether a job is blocking on stdin.

@abstractmethod
def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]:
    @abstractmethod
    def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return the kwargs to the blocking prompt.

        Abstract: no implementation is provided here.
        Annotated to return a dictionary with string keys.
        """

Return the kwargs to the blocking prompt.

def check_restart_jobs( executor_keys: Optional[str] = 'local', jobs: Optional[Dict[str, Job]] = None, include_hidden: bool = True, silent: bool = False, debug: bool = False) -> Tuple[bool, str]:
def check_restart_jobs(
    executor_keys: Optional[str] = 'local',
    jobs: Optional[Dict[str, Job]] = None,
    include_hidden: bool = True,
    silent: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Restart any stopped jobs which were created with `--restart`.

    Parameters
    ----------
    executor_keys: Optional[str], default 'local'
        Keys of the executor whose jobs are checked.
        Only used when `jobs` is not provided.

    jobs: Optional[Dict[str, Job]], default None
        The jobs to check. If `None`, they are fetched with `get_jobs()`
        (without combining local and systemd jobs).

    include_hidden: bool, default True
        If `True`, include hidden jobs in the check.
        Only used when `jobs` is not provided.

    silent: bool, default False
        If `True`, do not print the restart success message.

    debug: bool, default False
        Passed to `get_jobs()`.

    Returns
    -------
    A `SuccessTuple`: success is `True` only if every job's `check_restart()` succeeded.
    """
    from meerschaum.utils.misc import items_str

    if jobs is None:
        jobs = get_jobs(
            executor_keys,
            include_hidden=include_hidden,
            combine_local_and_systemd=False,
            debug=debug,
        )

    if not jobs:
        return True, "No jobs to restart."

    # Results are keyed by each job's own `name` attribute.
    results = {}
    for job in jobs.values():
        check_success, check_msg = job.check_restart()
        results[job.name] = (check_success, check_msg)
        if not silent:
            mrsm.pprint((check_success, check_msg))

    success_names = [name for name, (check_success, _) in results.items() if check_success]
    fail_names = [name for name, (check_success, _) in results.items() if not check_success]
    success = len(success_names) == len(jobs)
    if success:
        msg = (
            "Successfully restarted job"
            + ('s' if len(success_names) != 1 else '')
            + ' ' + items_str(success_names) + '.'
        )
    else:
        # Pluralize according to the failed jobs, which are the ones being listed.
        msg = (
            "Failed to restart job"
            + ('s' if len(fail_names) != 1 else '')
            + ' ' + items_str(fail_names) + '.'
        )
    return success, msg

Restart any stopped jobs which were created with --restart.

Parameters
  • executor_keys (Optional[str], default 'local'): If provided, check jobs on the given executor (e.g. a remote API instance). By default, check local jobs.
  • include_hidden (bool, default True): If True, include hidden jobs in the check.
  • silent (bool, default False): If True, do not print the restart success message.
def start_check_jobs_thread():
def start_check_jobs_thread():
    """
    Start a thread to regularly monitor jobs.

    A `RepeatTimer` is created which calls `_check_restart_jobs_against_lock(silent=True)`
    on the interval `STATIC_CONFIG['jobs']['check_restart_seconds']`.
    The timer is stored in the module-level `_check_loop_stop_thread`, marked as a daemon
    thread, and `stop_check_jobs_thread` is registered with `atexit` before it starts.

    Returns
    -------
    The started `RepeatTimer`.
    """
    import atexit
    from functools import partial
    from meerschaum.utils.threading import RepeatTimer
    from meerschaum._internal.static import STATIC_CONFIG

    global _check_loop_stop_thread
    interval_seconds = STATIC_CONFIG['jobs']['check_restart_seconds']
    silent_check = partial(_check_restart_jobs_against_lock, silent=True)

    _check_loop_stop_thread = RepeatTimer(interval_seconds, silent_check)
    _check_loop_stop_thread.daemon = True
    atexit.register(stop_check_jobs_thread)

    _check_loop_stop_thread.start()
    return _check_loop_stop_thread

Start a thread to regularly monitor jobs.

def stop_check_jobs_thread():
def stop_check_jobs_thread():
    """
    Stop the job monitoring thread.

    Does nothing if no thread is stored in `_check_loop_stop_thread`.
    Otherwise the timer is cancelled and the check-jobs lock file is removed if present;
    a failure to remove the lock file is reported as a warning rather than raised.
    """
    from meerschaum.config.paths import CHECK_JOBS_LOCK_PATH
    from meerschaum.utils.warnings import warn
    if _check_loop_stop_thread is None:
        return

    _check_loop_stop_thread.cancel()

    try:
        lock_file_exists = CHECK_JOBS_LOCK_PATH.exists()
        if lock_file_exists:
            CHECK_JOBS_LOCK_PATH.unlink()
    except Exception as e:
        warn(f"Failed to remove check jobs lock file:\n{e}")

Stop the job monitoring thread.