meerschaum.jobs.systemd

Manage meerschaum.jobs.Job via systemd.

  1#! /usr/bin/env python3
  2# vim:fenc=utf-8
  3
  4"""
  5Manage `meerschaum.jobs.Job` via `systemd`.
  6"""
  7
  8import os
  9import pathlib
 10import shlex
 11import sys
 12import asyncio
 13import json
 14import time
 15import shutil
 16from datetime import datetime, timezone
 17from functools import partial
 18
 19import meerschaum as mrsm
 20from meerschaum.jobs import Job, Executor, make_executor
 21from meerschaum.utils.typing import Dict, Any, List, SuccessTuple, Union, Optional
 22from meerschaum.config import get_config
 23from meerschaum._internal.static import STATIC_CONFIG
 24from meerschaum.utils.warnings import warn, dprint
 25
### How long (in seconds) `SystemdExecutor.get_job_metadata` reuses a job's cached metadata
### before querying `systemctl` and the job's files again.
JOB_METADATA_CACHE_SECONDS: int = STATIC_CONFIG['api']['jobs']['metadata_cache_seconds']
 27
 28
 29@make_executor
 30class SystemdExecutor(Executor):
 31    """
 32    Execute Meerschaum jobs via `systemd`.
 33    """
 34
 35    def get_job_names(self, debug: bool = False) -> List[str]:
 36        """
 37        Return a list of existing jobs, including hidden ones.
 38        """
 39        import meerschaum.config.paths as paths
 40        return [
 41            service_name[len('mrsm-'):(-1 * len('.service'))]
 42            for service_name in os.listdir(paths.SYSTEMD_USER_RESOURCES_PATH)
 43            if (
 44                service_name.startswith('mrsm-')
 45                and service_name.endswith('.service')
 46                ### Check for broken symlinks.
 47                and (paths.SYSTEMD_USER_RESOURCES_PATH / service_name).exists()
 48            )
 49        ]
 50
 51    def get_job_exists(self, name: str, debug: bool = False) -> bool:
 52        """
 53        Return whether a job exists.
 54        """
 55        user_services = self.get_job_names(debug=debug)
 56        if debug:
 57            dprint(f'Existing services: {user_services}')
 58        return name in user_services
 59
 60    def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
 61        """
 62        Return a dictionary of `systemd` Jobs (including hidden jobs).
 63        """
 64        user_services = self.get_job_names(debug=debug)
 65        jobs = {
 66            name: Job(name, executor_keys=str(self))
 67            for name in user_services
 68        }
 69        return {
 70            name: job
 71            for name, job in jobs.items()
 72        }
 73
 74    def get_service_name(self, name: str, debug: bool = False) -> str:
 75        """
 76        Return a job's service name.
 77        """
 78        return f"mrsm-{name.replace(' ', '-')}.service"
 79
 80    def get_service_job_path(self, name: str, debug: bool = False) -> pathlib.Path:
 81        """
 82        Return the path for the job's files under the root directory.
 83        """
 84        import meerschaum.config.paths as paths
 85        return paths.SYSTEMD_JOBS_RESOURCES_PATH / name
 86
 87    def get_service_symlink_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
 88        """
 89        Return the path to where to create the service symlink.
 90        """
 91        import meerschaum.config.paths as paths
 92        return paths.SYSTEMD_USER_RESOURCES_PATH / self.get_service_name(name, debug=debug)
 93
 94    def get_service_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
 95        """
 96        Return the path to a Job's service file.
 97        """
 98        return (
 99            self.get_service_job_path(name, debug=debug)
100            / self.get_service_name(name, debug=debug)
101        )
102
103    def get_service_logs_path(self, name: str, debug: bool = False) -> pathlib.Path:
104        """
105        Return the path to direct service logs to.
106        """
107        import meerschaum.config.paths as paths
108        return (
109            paths.SYSTEMD_LOGS_RESOURCES_PATH
110            / (self.get_service_name(name, debug=debug) + '.log')
111        )
112
113    def get_socket_path(self, name: str, debug: bool = False) -> pathlib.Path:
114        """
115        Return the path to the FIFO file.
116        """
117        return (
118            self.get_service_job_path(name, debug=debug)
119            / (self.get_service_name(name, debug=debug) + '.stdin')
120        )
121
122    def get_result_path(self, name: str, debug: bool = False) -> pathlib.Path:
123        """
124        Return the path to the result file.
125        """
126        return (
127            self.get_service_job_path(name, debug=debug)
128            / (self.get_service_name(name, debug=debug) + '.result.json')
129        )
130
131    def get_service_file_text(self, name: str, sysargs: List[str], debug: bool = False) -> str:
132        """
133        Return the contents of the unit file.
134        """
135        service_logs_path = self.get_service_logs_path(name, debug=debug)
136        socket_path = self.get_socket_path(name, debug=debug)
137        result_path = self.get_result_path(name, debug=debug)
138        job = self.get_hidden_job(name, debug=debug)
139
140        sysargs_str = shlex.join(sysargs)
141        exec_str = f'{sys.executable} -m meerschaum {sysargs_str}'
142        mrsm_env_var_names = set([var for var in STATIC_CONFIG['environment'].values()])
143        mrsm_env_vars = {
144            key: val
145            for key, val in os.environ.items()
146            if key in mrsm_env_var_names
147        }
148
149        ### Add new environment variables for the service process.
150        mrsm_env_vars.update({
151            STATIC_CONFIG['environment']['daemon_id']: name,
152            STATIC_CONFIG['environment']['systemd_log_path']: service_logs_path.as_posix(),
153            STATIC_CONFIG['environment']['systemd_result_path']: result_path.as_posix(),
154            STATIC_CONFIG['environment']['systemd_stdin_path']: socket_path.as_posix(),
155            STATIC_CONFIG['environment']['systemd_delete_job']: (
156                '1'
157                if job.delete_after_completion
158                else '0'
159            ),
160        })
161
162        ### Allow for user-defined environment variables.
163        mrsm_env_vars.update(job.env)
164
165        environment_lines = [
166            f"Environment={key}={shlex.quote(str(val))}"
167            for key, val in mrsm_env_vars.items()
168        ]
169        environment_str = '\n'.join(environment_lines)
170        service_name = self.get_service_name(name, debug=debug)
171
172        service_text = (
173            "[Unit]\n"
174            f"Description=Run the job '{name}'\n"
175            "\n"
176            "[Service]\n"
177            f"ExecStart={exec_str}\n"
178            "KillSignal=SIGTERM\n"
179            "Restart=always\n"
180            "RestartPreventExitStatus=0\n"
181            f"SyslogIdentifier={service_name}\n"
182            f"{environment_str}\n"
183            "\n"
184            "[Install]\n"
185            "WantedBy=default.target\n"
186        )
187        return service_text
188
189    def get_socket_file_text(self, name: str, debug: bool = False) -> str:
190        """
191        Return the contents of the socket file.
192        """
193        service_name = self.get_service_name(name, debug=debug)
194        socket_path = self.get_socket_path(name, debug=debug)
195        socket_text = (
196            "[Unit]\n"
197            f"BindsTo={service_name}\n"
198            "\n"
199            "[Socket]\n"
200            f"ListenFIFO={socket_path.as_posix()}\n"
201            "FileDescriptorName=stdin\n"
202            "RemoveOnStop=true\n"
203            "SocketMode=0660\n"
204        )
205        return socket_text
206
207    def get_hidden_job(
208        self,
209        name: str,
210        sysargs: Optional[List[str]] = None,
211        properties: Optional[Dict[str, Any]] = None,
212        debug: bool = False,
213    ):
214        """
215        Return the hidden "sister" job to store a job's parameters.
216        """
217        job = Job(
218            name,
219            sysargs,
220            executor_keys='local',
221            _properties=properties,
222            _rotating_log=self.get_job_rotating_file(name, debug=debug),
223            _stdin_file=self.get_job_stdin_file(name, debug=debug),
224            _status_hook=partial(self.get_job_status, name),
225            _result_hook=partial(self.get_job_result, name),
226            _externally_managed=True,
227        )
228        return job
229
230
231    def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
232        """
233        Return metadata about a job.
234        """
235        now = time.perf_counter()
236
237        if '_jobs_metadata' not in self.__dict__:
238            self._jobs_metadata: Dict[str, Any] = {}
239
240        if name in self._jobs_metadata:
241            ts = self._jobs_metadata[name].get('timestamp', None)
242
243            if ts is not None and (now - ts) <= JOB_METADATA_CACHE_SECONDS:
244                if debug:
245                    dprint(f"Retuning cached metadata for job '{name}'.")
246                return self._jobs_metadata[name]['metadata']
247
248        metadata = {
249            'sysargs': self.get_job_sysargs(name, debug=debug),
250            'result': self.get_job_result(name, debug=debug),
251            'restart': self.get_job_restart(name, debug=debug),
252            'daemon': {
253                'status': self.get_job_status(name, debug=debug),
254                'pid': self.get_job_pid(name, debug=debug),
255                'properties': self.get_job_properties(name, debug=debug),
256            },
257        }
258        self._jobs_metadata[name] = {
259            'timestamp': now,
260            'metadata': metadata,
261        }
262        return metadata
263
264    def get_job_restart(self, name: str, debug: bool = False) -> bool:
265        """
266        Return whether a job restarts.
267        """
268        from meerschaum.jobs._Job import RESTART_FLAGS
269        sysargs = self.get_job_sysargs(name, debug=debug)
270        if not sysargs:
271            return False
272
273        for flag in RESTART_FLAGS:
274            if flag in sysargs:
275                return True
276
277        return False
278
279    def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
280        """
281        Return the properties for a job.
282        """
283        job = self.get_hidden_job(name, debug=debug)
284        return {
285            k: v for k, v in job.daemon.properties.items()
286            if k != 'externally_managed'
287        }
288
289    def get_job_process(self, name: str, debug: bool = False):
290        """
291        Return a `psutil.Process` for the job's PID.
292        """
293        pid = self.get_job_pid(name, debug=debug)
294        if pid is None:
295            return None
296
297        psutil = mrsm.attempt_import('psutil')
298        try:
299            return psutil.Process(pid)
300        except Exception:
301            return None
302
303    def get_job_status(self, name: str, debug: bool = False) -> str:
304        """
305        Return the job's service status.
306        """
307        output = self.run_command(
308            ['is-active', self.get_service_name(name, debug=debug)],
309            as_output=True,
310            debug=debug,
311        )
312
313        if output == 'activating':
314            return 'running'
315
316        if output == 'active':
317            process = self.get_job_process(name, debug=debug)
318            if process is None:
319                return 'stopped'
320
321            try:
322                if process.status() == 'stopped':
323                    return 'paused'
324            except Exception:
325                return 'stopped'
326
327            return 'running'
328
329        return 'stopped'
330
331    def get_job_pid(self, name: str, debug: bool = False) -> Union[int, None]:
332        """
333        Return the job's service PID.
334        """
335        from meerschaum.utils.misc import is_int
336
337        output = self.run_command(
338            [
339                'show',
340                self.get_service_name(name, debug=debug),
341                '--property=MainPID',
342            ],
343            as_output=True,
344            debug=debug,
345        )
346        if not output.startswith('MainPID='):
347            return None
348
349        pid_str = output[len('MainPID='):]
350        if pid_str == '0':
351            return None
352
353        if is_int(pid_str):
354            return int(pid_str)
355        
356        return None
357
358    def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
359        """
360        Return when a job began running.
361        """
362        output = self.run_command(
363            [
364                'show',
365                self.get_service_name(name, debug=debug),
366                '--property=ActiveEnterTimestamp'
367            ],
368            as_output=True,
369            debug=debug,
370        )
371        if not output.startswith('ActiveEnterTimestamp'):
372            return None
373
374        dt_str = output.split('=')[-1]
375        if not dt_str:
376            return None
377
378        dateutil_parser = mrsm.attempt_import('dateutil.parser')
379        try:
380            dt = dateutil_parser.parse(dt_str)
381        except Exception as e:
382            warn(f"Cannot parse '{output}' as a datetime:\n{e}")
383            return None
384
385        return dt.astimezone(timezone.utc).isoformat()
386
387    def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
388        """
389        Return when a job began running.
390        """
391        output = self.run_command(
392            [
393                'show',
394                self.get_service_name(name, debug=debug),
395                '--property=InactiveEnterTimestamp'
396            ],
397            as_output=True,
398            debug=debug,
399        )
400        if not output.startswith('InactiveEnterTimestamp'):
401            return None
402
403        dt_str = output.split('=')[-1]
404        if not dt_str:
405            return None
406
407        dateutil_parser = mrsm.attempt_import('dateutil.parser')
408
409        try:
410            dt = dateutil_parser.parse(dt_str)
411        except Exception as e:
412            warn(f"Cannot parse '{output}' as a datetime:\n{e}")
413            return None
414        return dt.astimezone(timezone.utc).isoformat()
415
416    def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
417        """
418        Return when a job was paused.
419        """
420        job = self.get_hidden_job(name, debug=debug)
421        if self.get_job_status(name, debug=debug) != 'paused':
422            return None
423
424        stop_time = job.stop_time
425        if stop_time is None:
426            return None
427
428        return stop_time.isoformat()
429
430    def get_job_result(self, name: str, debug: bool = False) -> SuccessTuple:
431        """
432        Return the job's result SuccessTuple.
433        """
434        result_path = self.get_result_path(name, debug=debug)
435        if not result_path.exists():
436            return False, "No result available."
437
438        try:
439            with open(result_path, 'r', encoding='utf-8') as f:
440                result = json.load(f)
441        except Exception:
442            return False, f"Could not read result for Job '{name}'."
443
444        return tuple(result)
445
446    def get_job_sysargs(self, name: str, debug: bool = False) -> Union[List[str], None]:
447        """
448        Return the sysargs from the service file.
449        """
450        service_file_path = self.get_service_file_path(name, debug=debug)
451        if not service_file_path.exists():
452            return []
453
454        with open(service_file_path, 'r', encoding='utf-8') as f:
455            service_lines = f.readlines()
456
457        for line in service_lines:
458            if line.startswith('ExecStart='):
459                sysargs_str = line.split(' -m meerschaum ')[-1].split('<')[0]
460                return shlex.split(sysargs_str)
461
462        return []
463
464    def run_command(
465        self,
466        command_args: List[str],
467        as_output: bool = False,
468        debug: bool = False,
469    ) -> Union[SuccessTuple, str]:
470        """
471        Run a `systemd` command and return success.
472
473        Parameters
474        ----------
475        command_args: List[str]
476            The command to pass to `systemctl --user`.
477
478        as_output: bool, default False
479            If `True`, return the process stdout output.
480            Defaults to a `SuccessTuple`.
481
482        Returns
483        -------
484        A `SuccessTuple` indicating success or a str for the process output.
485        """
486        from meerschaum.utils.process import run_process
487
488        if command_args[:2] != ['systemctl', '--user']:
489            command_args = ['systemctl', '--user'] + command_args
490
491        if debug:
492            dprint(shlex.join(command_args))
493
494        proc = run_process(
495            command_args,
496            foreground=False,
497            as_proc=True,
498            capture_output=True,
499            text=True,
500        )
501        stdout, stderr = proc.communicate()
502        if debug:
503            dprint(f"{stdout}")
504
505        if as_output:
506            return stdout.strip()
507            
508        command_success = proc.wait() == 0
509        command_msg = (
510            "Success"
511            if command_success
512            else f"Failed to execute command `{shlex.join(command_args)}`."
513        )
514        return command_success, command_msg
515
516    def get_job_stdin_file(self, name: str, debug: bool = False):
517        """
518        Return a `StdinFile` for the job.
519        """
520        from meerschaum.utils.daemon import StdinFile
521        if '_stdin_files' not in self.__dict__:
522            self._stdin_files: Dict[str, StdinFile] = {}
523
524        if name not in self._stdin_files:
525            socket_path = self.get_socket_path(name, debug=debug)
526            socket_path.parent.mkdir(parents=True, exist_ok=True)
527            self._stdin_files[name] = StdinFile(socket_path)
528
529        return self._stdin_files[name]
530
531    def create_job(
532        self,
533        name: str,
534        sysargs: List[str],
535        properties: Optional[Dict[str, Any]] = None,
536        debug: bool = False,
537    ) -> SuccessTuple:
538        """
539        Create a job as a service to be run by `systemd`.
540        """
541        from meerschaum.utils.misc import make_symlink
542        service_name = self.get_service_name(name, debug=debug)
543        service_file_path = self.get_service_file_path(name, debug=debug)
544        service_symlink_file_path = self.get_service_symlink_file_path(name, debug=debug)
545        socket_stdin = self.get_job_stdin_file(name, debug=debug)
546        _ = socket_stdin.file_handler
547
548        ### Init the externally_managed file.
549        ### NOTE: We must write the pickle file in addition to the properties file.
550        job = self.get_hidden_job(name, sysargs=sysargs, properties=properties, debug=debug)
551        job._set_externally_managed()
552        pickle_success, pickle_msg = job.daemon.write_pickle()
553        if not pickle_success:
554            return pickle_success, pickle_msg
555        properties_success, properties_msg = job.daemon.write_properties()
556        if not properties_success:
557            return properties_success, properties_msg
558
559        service_file_path.parent.mkdir(parents=True, exist_ok=True)
560
561        with open(service_file_path, 'w+', encoding='utf-8') as f:
562            f.write(self.get_service_file_text(name, sysargs, debug=debug))
563
564        symlink_success, symlink_msg = make_symlink(service_file_path, service_symlink_file_path)
565        if not symlink_success:
566            return symlink_success, symlink_msg
567
568        commands = [
569            ['daemon-reload'],
570            ['enable', service_name],
571            ['start', service_name],
572        ]
573
574        fails = 0
575        for command_list in commands:
576            command_success, command_msg = self.run_command(command_list, debug=debug)
577            if not command_success:
578                fails += 1
579
580        if fails > 1:
581            return False, "Failed to reload systemd."
582
583        return True, f"Started job '{name}' via systemd."
584
585    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
586        """
587        Stop a job's service.
588        """
589        job = self.get_hidden_job(name, debug=debug)
590        job.daemon._remove_stop_file()
591
592        status = self.get_job_status(name, debug=debug)
593        if status == 'paused':
594            return self.run_command(
595                ['kill', '-s', 'SIGCONT', self.get_service_name(name, debug=debug)],
596                debug=debug,
597            )
598
599        return self.run_command(
600            ['start', self.get_service_name(name, debug=debug)],
601            debug=debug,
602        )
603
604    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
605        """
606        Stop a job's service.
607        """
608        job = self.get_hidden_job(name, debug=debug)
609        job.daemon._write_stop_file('quit')
610        sigint_success, sigint_msg = self.run_command(
611            ['kill', '-s', 'SIGINT', self.get_service_name(name, debug=debug)],
612            debug=debug,
613        )
614
615        check_timeout_interval = get_config('jobs', 'check_timeout_interval_seconds')
616        loop_start = time.perf_counter()
617        timeout_seconds = get_config('jobs', 'timeout_seconds')
618        while (time.perf_counter() - loop_start) < timeout_seconds:
619            if self.get_job_status(name, debug=debug) == 'stopped':
620                return True, 'Success'
621
622            time.sleep(check_timeout_interval)
623
624        return self.run_command(
625            ['stop', self.get_service_name(name, debug=debug)],
626            debug=debug,
627        )
628
629    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
630        """
631        Pause a job's service.
632        """
633        job = self.get_hidden_job(name, debug=debug)
634        job.daemon._write_stop_file('pause')
635        return self.run_command(
636            ['kill', '-s', 'SIGSTOP', self.get_service_name(name, debug=debug)],
637            debug=debug,
638        )
639
640    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
641        """
642        Delete a job's service.
643        """
644        import meerschaum.config.paths as paths
645        job = self.get_hidden_job(name, debug=debug)
646
647        if not job.delete_after_completion:
648            _ = self.stop_job(name, debug=debug)
649            _ = self.run_command(
650                ['disable', self.get_service_name(name, debug=debug)],
651                debug=debug,
652            )
653
654        service_job_path = self.get_service_job_path(name, debug=debug)
655        try:
656            if service_job_path.exists():
657                shutil.rmtree(service_job_path)
658        except Exception as e:
659            warn(e)
660            return False, str(e)
661
662        service_logs_path = self.get_service_logs_path(name, debug=debug)
663        logs_paths = [
664            (paths.SYSTEMD_LOGS_RESOURCES_PATH / name)
665            for name in os.listdir(paths.SYSTEMD_LOGS_RESOURCES_PATH)
666            if name.startswith(service_logs_path.name + '.')
667        ]
668        file_paths = [
669            self.get_service_file_path(name, debug=debug),
670            self.get_service_symlink_file_path(name, debug=debug),
671            self.get_socket_path(name, debug=debug),
672            self.get_result_path(name, debug=debug),
673        ] + logs_paths
674
675        for path in file_paths:
676            if path.exists():
677                try:
678                    path.unlink()
679                except Exception as e:
680                    warn(e)
681                    return False, str(e)
682
683        _ = job.delete()
684
685        return self.run_command(['daemon-reload'], debug=debug)
686
687    def get_logs(self, name: str, debug: bool = False) -> str:
688        """
689        Return a job's journal logs.
690        """
691        rotating_file = self.get_job_rotating_file(name, debug=debug)
692        return rotating_file.read()
693
694    def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
695        """
696        Return a job's stop time.
697        """
698        job = self.get_hidden_job(name, debug=debug)
699        return job.stop_time
700
701    def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
702        """
703        Return whether a job is blocking on stdin.
704        """
705        socket_path = self.get_socket_path(name, debug=debug)
706        blocking_path = socket_path.parent / (socket_path.name + '.block')
707        return blocking_path.exists()
708
709    def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]:
710        """
711        Return the kwargs to the blocking prompt.
712        """
713        job = self.get_hidden_job(name, debug=debug)
714        return job.get_prompt_kwargs(debug=debug)
715
716    def get_job_rotating_file(self, name: str, debug: bool = False):
717        """
718        Return a `RotatingFile` for the job's log output.
719        """
720        from meerschaum.utils.daemon import RotatingFile
721        service_logs_path = self.get_service_logs_path(name, debug=debug)
722        return RotatingFile(service_logs_path)
723
724    async def monitor_logs_async(
725        self,
726        name: str,
727        *args,
728        debug: bool = False,
729        **kwargs
730    ):
731        """
732        Monitor a job's output.
733        """
734        import meerschaum.config.paths as paths
735        job = self.get_hidden_job(name, debug=debug)
736        kwargs.update({
737            '_logs_path': paths.SYSTEMD_LOGS_RESOURCES_PATH,
738            '_log': self.get_job_rotating_file(name, debug=debug),
739            '_stdin_file': self.get_job_stdin_file(name, debug=debug),
740            'debug': debug,
741        })
742        await job.monitor_logs_async(*args, **kwargs)
743
744    def monitor_logs(self, *args, **kwargs):
745        """
746        Monitor a job's output.
747        """
748        asyncio.run(self.monitor_logs_async(*args, **kwargs))
JOB_METADATA_CACHE_SECONDS: int = 5
@make_executor
class SystemdExecutor(Executor):
    """
    Execute Meerschaum jobs via `systemd`.

    Each job is backed by a user service named `mrsm-<name>.service`. Its unit
    file, stdin FIFO and result file live under
    `paths.SYSTEMD_JOBS_RESOURCES_PATH / <name>`. The unit file is symlinked
    into `paths.SYSTEMD_USER_RESOURCES_PATH` so `systemctl --user` can manage it.
    A hidden "sister" job on the `local` executor stores the job's parameters
    (sysargs, properties, stop/pause markers).
    """

    def get_job_names(self, debug: bool = False) -> List[str]:
        """
        Return a list of existing jobs, including hidden ones.

        Names are derived from the `mrsm-<name>.service` entries in the systemd
        user directory; entries that are broken symlinks are skipped.
        """
        import meerschaum.config.paths as paths
        return [
            service_name[len('mrsm-'):(-1 * len('.service'))]
            for service_name in os.listdir(paths.SYSTEMD_USER_RESOURCES_PATH)
            if (
                service_name.startswith('mrsm-')
                and service_name.endswith('.service')
                ### Check for broken symlinks.
                and (paths.SYSTEMD_USER_RESOURCES_PATH / service_name).exists()
            )
        ]

    def get_job_exists(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job exists (i.e. whether its service is registered).
        """
        user_services = self.get_job_names(debug=debug)
        if debug:
            dprint(f'Existing services: {user_services}')
        return name in user_services

    def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
        """
        Return a dictionary of `systemd` Jobs (including hidden jobs), keyed by name.
        """
        return {
            name: Job(name, executor_keys=str(self))
            for name in self.get_job_names(debug=debug)
        }

    def get_service_name(self, name: str, debug: bool = False) -> str:
        """
        Return a job's service name, `mrsm-<name>.service` with spaces replaced by hyphens.
        """
        return f"mrsm-{name.replace(' ', '-')}.service"

    def get_service_job_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path for the job's files under the root directory.
        """
        import meerschaum.config.paths as paths
        return paths.SYSTEMD_JOBS_RESOURCES_PATH / name

    def get_service_symlink_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to where to create the service symlink.
        """
        import meerschaum.config.paths as paths
        return paths.SYSTEMD_USER_RESOURCES_PATH / self.get_service_name(name, debug=debug)

    def get_service_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to a Job's service (unit) file inside its job directory.
        """
        return (
            self.get_service_job_path(name, debug=debug)
            / self.get_service_name(name, debug=debug)
        )

    def get_service_logs_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to direct service logs to (`<service name>.log`).
        """
        import meerschaum.config.paths as paths
        return (
            paths.SYSTEMD_LOGS_RESOURCES_PATH
            / (self.get_service_name(name, debug=debug) + '.log')
        )

    def get_socket_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to the job's stdin FIFO file (`<service name>.stdin`).
        """
        return (
            self.get_service_job_path(name, debug=debug)
            / (self.get_service_name(name, debug=debug) + '.stdin')
        )

    def get_result_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to the job's result file (`<service name>.result.json`).
        """
        return (
            self.get_service_job_path(name, debug=debug)
            / (self.get_service_name(name, debug=debug) + '.result.json')
        )

    def get_service_file_text(self, name: str, sysargs: List[str], debug: bool = False) -> str:
        """
        Return the contents of the unit file.

        Parameters
        ----------
        name: str
            The job's name.

        sysargs: List[str]
            The Meerschaum arguments to run; they are appended after
            `<python> -m meerschaum` in `ExecStart=`.

        Returns
        -------
        The text of the `.service` unit file.
        """
        service_logs_path = self.get_service_logs_path(name, debug=debug)
        socket_path = self.get_socket_path(name, debug=debug)
        result_path = self.get_result_path(name, debug=debug)
        job = self.get_hidden_job(name, debug=debug)

        sysargs_str = shlex.join(sysargs)
        exec_str = f'{sys.executable} -m meerschaum {sysargs_str}'

        ### Propagate any Meerschaum environment variables set in the current process.
        mrsm_env_var_names = set(STATIC_CONFIG['environment'].values())
        mrsm_env_vars = {
            key: val
            for key, val in os.environ.items()
            if key in mrsm_env_var_names
        }

        ### Add new environment variables for the service process.
        mrsm_env_vars.update({
            STATIC_CONFIG['environment']['daemon_id']: name,
            STATIC_CONFIG['environment']['systemd_log_path']: service_logs_path.as_posix(),
            STATIC_CONFIG['environment']['systemd_result_path']: result_path.as_posix(),
            STATIC_CONFIG['environment']['systemd_stdin_path']: socket_path.as_posix(),
            STATIC_CONFIG['environment']['systemd_delete_job']: (
                '1'
                if job.delete_after_completion
                else '0'
            ),
        })

        ### Allow for user-defined environment variables.
        mrsm_env_vars.update(job.env)

        environment_lines = [
            f"Environment={key}={shlex.quote(str(val))}"
            for key, val in mrsm_env_vars.items()
        ]
        environment_str = '\n'.join(environment_lines)
        service_name = self.get_service_name(name, debug=debug)

        service_text = (
            "[Unit]\n"
            f"Description=Run the job '{name}'\n"
            "\n"
            "[Service]\n"
            f"ExecStart={exec_str}\n"
            "KillSignal=SIGTERM\n"
            "Restart=always\n"
            "RestartPreventExitStatus=0\n"
            f"SyslogIdentifier={service_name}\n"
            f"{environment_str}\n"
            "\n"
            "[Install]\n"
            "WantedBy=default.target\n"
        )
        return service_text

    def get_socket_file_text(self, name: str, debug: bool = False) -> str:
        """
        Return the contents of the socket unit file, which binds the job's stdin FIFO.
        """
        service_name = self.get_service_name(name, debug=debug)
        socket_path = self.get_socket_path(name, debug=debug)
        socket_text = (
            "[Unit]\n"
            f"BindsTo={service_name}\n"
            "\n"
            "[Socket]\n"
            f"ListenFIFO={socket_path.as_posix()}\n"
            "FileDescriptorName=stdin\n"
            "RemoveOnStop=true\n"
            "SocketMode=0660\n"
        )
        return socket_text

    def get_hidden_job(
        self,
        name: str,
        sysargs: Optional[List[str]] = None,
        properties: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ):
        """
        Return the hidden "sister" job to store a job's parameters.

        The sister job runs on the `local` executor, is marked as externally
        managed, and reports status/results through this executor's
        `get_job_status` / `get_job_result`.
        """
        job = Job(
            name,
            sysargs,
            executor_keys='local',
            _properties=properties,
            _rotating_log=self.get_job_rotating_file(name, debug=debug),
            _stdin_file=self.get_job_stdin_file(name, debug=debug),
            _status_hook=partial(self.get_job_status, name),
            _result_hook=partial(self.get_job_result, name),
            _externally_managed=True,
        )
        return job

    def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return metadata about a job.

        Results are cached per job name on the executor instance and reused
        for `JOB_METADATA_CACHE_SECONDS` (measured with `time.perf_counter`).
        """
        now = time.perf_counter()

        if '_jobs_metadata' not in self.__dict__:
            self._jobs_metadata: Dict[str, Any] = {}

        if name in self._jobs_metadata:
            ts = self._jobs_metadata[name].get('timestamp', None)

            if ts is not None and (now - ts) <= JOB_METADATA_CACHE_SECONDS:
                if debug:
                    dprint(f"Returning cached metadata for job '{name}'.")
                return self._jobs_metadata[name]['metadata']

        metadata = {
            'sysargs': self.get_job_sysargs(name, debug=debug),
            'result': self.get_job_result(name, debug=debug),
            'restart': self.get_job_restart(name, debug=debug),
            'daemon': {
                'status': self.get_job_status(name, debug=debug),
                'pid': self.get_job_pid(name, debug=debug),
                'properties': self.get_job_properties(name, debug=debug),
            },
        }
        self._jobs_metadata[name] = {
            'timestamp': now,
            'metadata': metadata,
        }
        return metadata

    def get_job_restart(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job restarts (i.e. its sysargs contain a restart flag).
        """
        from meerschaum.jobs._Job import RESTART_FLAGS
        sysargs = self.get_job_sysargs(name, debug=debug)
        if not sysargs:
            return False

        return any(flag in sysargs for flag in RESTART_FLAGS)

    def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return the properties for a job, omitting the internal `externally_managed` key.
        """
        job = self.get_hidden_job(name, debug=debug)
        return {
            k: v for k, v in job.daemon.properties.items()
            if k != 'externally_managed'
        }

    def get_job_process(self, name: str, debug: bool = False):
        """
        Return a `psutil.Process` for the job's PID, or `None` if unavailable.
        """
        pid = self.get_job_pid(name, debug=debug)
        if pid is None:
            return None

        psutil = mrsm.attempt_import('psutil')
        try:
            return psutil.Process(pid)
        except Exception:
            return None

    def get_job_status(self, name: str, debug: bool = False) -> str:
        """
        Return the job's service status: `'running'`, `'paused'`, or `'stopped'`.
        """
        output = self.run_command(
            ['is-active', self.get_service_name(name, debug=debug)],
            as_output=True,
            debug=debug,
        )

        if output == 'activating':
            return 'running'

        if output == 'active':
            process = self.get_job_process(name, debug=debug)
            if process is None:
                return 'stopped'

            ### A process suspended via SIGSTOP (see `pause_job`) reports 'stopped'.
            try:
                if process.status() == 'stopped':
                    return 'paused'
            except Exception:
                return 'stopped'

            return 'running'

        return 'stopped'

    def get_job_pid(self, name: str, debug: bool = False) -> Union[int, None]:
        """
        Return the job's service PID, or `None` if the service has no main process.
        """
        from meerschaum.utils.misc import is_int

        output = self.run_command(
            [
                'show',
                self.get_service_name(name, debug=debug),
                '--property=MainPID',
            ],
            as_output=True,
            debug=debug,
        )
        if not output.startswith('MainPID='):
            return None

        pid_str = output[len('MainPID='):]
        ### systemd reports `MainPID=0` when no process is running.
        if pid_str == '0':
            return None

        if is_int(pid_str):
            return int(pid_str)

        return None

    def _get_service_timestamp(
        self,
        name: str,
        property_name: str,
        debug: bool = False,
    ) -> Union[str, None]:
        """
        Return a `systemctl show` timestamp property as a UTC ISO-8601 string.

        Parameters
        ----------
        name: str
            The job's name.

        property_name: str
            The unit property to query, e.g. `'ActiveEnterTimestamp'`.

        Returns
        -------
        The parsed timestamp in UTC as an ISO-formatted string,
        or `None` if the property is missing, empty, or unparseable.
        """
        output = self.run_command(
            [
                'show',
                self.get_service_name(name, debug=debug),
                f'--property={property_name}',
            ],
            as_output=True,
            debug=debug,
        )
        if not output.startswith(property_name):
            return None

        ### Output looks like `<Property>=<value>`; keep everything after the first '='.
        dt_str = output.partition('=')[2]
        if not dt_str:
            return None

        dateutil_parser = mrsm.attempt_import('dateutil.parser')
        try:
            dt = dateutil_parser.parse(dt_str)
        except Exception as e:
            warn(f"Cannot parse '{output}' as a datetime:\n{e}")
            return None

        return dt.astimezone(timezone.utc).isoformat()

    def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job began running (the service's `ActiveEnterTimestamp`, UTC ISO string).
        """
        return self._get_service_timestamp(name, 'ActiveEnterTimestamp', debug=debug)

    def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job stopped running (the service's `InactiveEnterTimestamp`, UTC ISO string).
        """
        return self._get_service_timestamp(name, 'InactiveEnterTimestamp', debug=debug)

    def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job was paused (ISO string), or `None` if it is not paused.
        """
        job = self.get_hidden_job(name, debug=debug)
        if self.get_job_status(name, debug=debug) != 'paused':
            return None

        stop_time = job.stop_time
        if stop_time is None:
            return None

        return stop_time.isoformat()

    def get_job_result(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Return the job's result SuccessTuple, read from its JSON result file.
        """
        result_path = self.get_result_path(name, debug=debug)
        if not result_path.exists():
            return False, "No result available."

        try:
            with open(result_path, 'r', encoding='utf-8') as f:
                result = json.load(f)
        except Exception:
            return False, f"Could not read result for Job '{name}'."

        return tuple(result)

    def get_job_sysargs(self, name: str, debug: bool = False) -> Union[List[str], None]:
        """
        Return the sysargs parsed from the `ExecStart=` line of the service file.

        Returns an empty list if the service file or `ExecStart=` line is missing.
        """
        service_file_path = self.get_service_file_path(name, debug=debug)
        if not service_file_path.exists():
            return []

        with open(service_file_path, 'r', encoding='utf-8') as f:
            service_lines = f.readlines()

        for line in service_lines:
            if line.startswith('ExecStart='):
                ### NOTE(review): the split on '<' presumably drops a stdin redirection
                ### from older unit files; it would also truncate a quoted '<' argument.
                sysargs_str = line.split(' -m meerschaum ')[-1].split('<')[0]
                return shlex.split(sysargs_str)

        return []

    def run_command(
        self,
        command_args: List[str],
        as_output: bool = False,
        debug: bool = False,
    ) -> Union[SuccessTuple, str]:
        """
        Run a `systemd` command and return success.

        Parameters
        ----------
        command_args: List[str]
            The command to pass to `systemctl --user`.

        as_output: bool, default False
            If `True`, return the process stdout output.
            Defaults to a `SuccessTuple`.

        Returns
        -------
        A `SuccessTuple` indicating success or a str for the process output.
        """
        from meerschaum.utils.process import run_process

        if command_args[:2] != ['systemctl', '--user']:
            command_args = ['systemctl', '--user'] + command_args

        if debug:
            dprint(shlex.join(command_args))

        proc = run_process(
            command_args,
            foreground=False,
            as_proc=True,
            capture_output=True,
            text=True,
        )
        stdout, stderr = proc.communicate()
        if debug:
            dprint(f"{stdout}")

        if as_output:
            return stdout.strip()

        command_success = proc.wait() == 0
        command_msg = (
            "Success"
            if command_success
            else f"Failed to execute command `{shlex.join(command_args)}`."
        )
        return command_success, command_msg

    def get_job_stdin_file(self, name: str, debug: bool = False):
        """
        Return a `StdinFile` for the job, cached per job name on this executor.

        The job directory is created if needed.
        """
        from meerschaum.utils.daemon import StdinFile
        if '_stdin_files' not in self.__dict__:
            self._stdin_files: Dict[str, StdinFile] = {}

        if name not in self._stdin_files:
            socket_path = self.get_socket_path(name, debug=debug)
            socket_path.parent.mkdir(parents=True, exist_ok=True)
            self._stdin_files[name] = StdinFile(socket_path)

        return self._stdin_files[name]

    def create_job(
        self,
        name: str,
        sysargs: List[str],
        properties: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ) -> SuccessTuple:
        """
        Create a job as a service to be run by `systemd`.

        Writes the hidden job's pickle and properties, the unit file, and the
        symlink into the systemd user directory. It then runs `daemon-reload`,
        `enable`, and `start`.

        Returns
        -------
        A `SuccessTuple`. It is `False` if any step fails, and the failure message
        lists every `systemctl` command that failed.
        """
        from meerschaum.utils.misc import make_symlink
        service_name = self.get_service_name(name, debug=debug)
        service_file_path = self.get_service_file_path(name, debug=debug)
        service_symlink_file_path = self.get_service_symlink_file_path(name, debug=debug)
        socket_stdin = self.get_job_stdin_file(name, debug=debug)
        ### Touch the file handler so the stdin FIFO exists before the service starts.
        _ = socket_stdin.file_handler

        ### Init the externally_managed file.
        ### NOTE: We must write the pickle file in addition to the properties file.
        job = self.get_hidden_job(name, sysargs=sysargs, properties=properties, debug=debug)
        job._set_externally_managed()
        pickle_success, pickle_msg = job.daemon.write_pickle()
        if not pickle_success:
            return pickle_success, pickle_msg
        properties_success, properties_msg = job.daemon.write_properties()
        if not properties_success:
            return properties_success, properties_msg

        service_file_path.parent.mkdir(parents=True, exist_ok=True)

        with open(service_file_path, 'w+', encoding='utf-8') as f:
            f.write(self.get_service_file_text(name, sysargs, debug=debug))

        symlink_success, symlink_msg = make_symlink(service_file_path, service_symlink_file_path)
        if not symlink_success:
            return symlink_success, symlink_msg

        commands = [
            ['daemon-reload'],
            ['enable', service_name],
            ['start', service_name],
        ]

        ### Run every command, but report failure if any of them failed
        ### (previously a single failure, e.g. of `start`, was reported as success).
        failure_msgs = []
        for command_list in commands:
            command_success, command_msg = self.run_command(command_list, debug=debug)
            if not command_success:
                failure_msgs.append(command_msg)

        if failure_msgs:
            return False, (
                f"Failed to start job '{name}' via systemd:\n"
                + '\n'.join(failure_msgs)
            )

        return True, f"Started job '{name}' via systemd."

    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Start a job's service, or resume it with SIGCONT if it is paused.
        """
        job = self.get_hidden_job(name, debug=debug)
        job.daemon._remove_stop_file()

        status = self.get_job_status(name, debug=debug)
        if status == 'paused':
            return self.run_command(
                ['kill', '-s', 'SIGCONT', self.get_service_name(name, debug=debug)],
                debug=debug,
            )

        return self.run_command(
            ['start', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Stop a job's service.

        Sends SIGINT first and polls the status until `jobs:timeout_seconds`
        elapses; if the job is still not stopped, falls back to `systemctl stop`.
        """
        job = self.get_hidden_job(name, debug=debug)
        job.daemon._write_stop_file('quit')
        self.run_command(
            ['kill', '-s', 'SIGINT', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

        check_timeout_interval = get_config('jobs', 'check_timeout_interval_seconds')
        loop_start = time.perf_counter()
        timeout_seconds = get_config('jobs', 'timeout_seconds')
        while (time.perf_counter() - loop_start) < timeout_seconds:
            if self.get_job_status(name, debug=debug) == 'stopped':
                return True, 'Success'

            time.sleep(check_timeout_interval)

        return self.run_command(
            ['stop', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Pause a job's service by sending it SIGSTOP.
        """
        job = self.get_hidden_job(name, debug=debug)
        job.daemon._write_stop_file('pause')
        return self.run_command(
            ['kill', '-s', 'SIGSTOP', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Delete a job's service.

        Stops and disables the service (unless the job deletes itself after
        completion), removes the job directory, unit/symlink/stdin/result files
        and rotated logs, deletes the hidden job, and reloads systemd.
        """
        import meerschaum.config.paths as paths
        job = self.get_hidden_job(name, debug=debug)

        if not job.delete_after_completion:
            _ = self.stop_job(name, debug=debug)
            _ = self.run_command(
                ['disable', self.get_service_name(name, debug=debug)],
                debug=debug,
            )

        service_job_path = self.get_service_job_path(name, debug=debug)
        try:
            if service_job_path.exists():
                shutil.rmtree(service_job_path)
        except Exception as e:
            warn(e)
            return False, str(e)

        service_logs_path = self.get_service_logs_path(name, debug=debug)
        logs_paths = [
            (paths.SYSTEMD_LOGS_RESOURCES_PATH / file_name)
            for file_name in os.listdir(paths.SYSTEMD_LOGS_RESOURCES_PATH)
            if file_name.startswith(service_logs_path.name + '.')
        ]
        file_paths = [
            self.get_service_file_path(name, debug=debug),
            self.get_service_symlink_file_path(name, debug=debug),
            self.get_socket_path(name, debug=debug),
            self.get_result_path(name, debug=debug),
        ] + logs_paths

        for path in file_paths:
            if path.exists():
                try:
                    path.unlink()
                except Exception as e:
                    warn(e)
                    return False, str(e)

        _ = job.delete()

        return self.run_command(['daemon-reload'], debug=debug)

    def get_logs(self, name: str, debug: bool = False) -> str:
        """
        Return a job's logs, read from its rotating log file.
        """
        rotating_file = self.get_job_rotating_file(name, debug=debug)
        return rotating_file.read()

    def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
        """
        Return a job's stop time (as recorded by the hidden job).
        """
        job = self.get_hidden_job(name, debug=debug)
        return job.stop_time

    def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job is blocking on stdin (a `.block` marker exists beside its FIFO).
        """
        socket_path = self.get_socket_path(name, debug=debug)
        blocking_path = socket_path.parent / (socket_path.name + '.block')
        return blocking_path.exists()

    def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return the kwargs to the blocking prompt.
        """
        job = self.get_hidden_job(name, debug=debug)
        return job.get_prompt_kwargs(debug=debug)

    def get_job_rotating_file(self, name: str, debug: bool = False):
        """
        Return a `RotatingFile` for the job's log output.
        """
        from meerschaum.utils.daemon import RotatingFile
        service_logs_path = self.get_service_logs_path(name, debug=debug)
        return RotatingFile(service_logs_path)

    async def monitor_logs_async(
        self,
        name: str,
        *args,
        debug: bool = False,
        **kwargs
    ):
        """
        Monitor a job's output via the hidden job, pointing it at the systemd log files.
        """
        import meerschaum.config.paths as paths
        job = self.get_hidden_job(name, debug=debug)
        kwargs.update({
            '_logs_path': paths.SYSTEMD_LOGS_RESOURCES_PATH,
            '_log': self.get_job_rotating_file(name, debug=debug),
            '_stdin_file': self.get_job_stdin_file(name, debug=debug),
            'debug': debug,
        })
        await job.monitor_logs_async(*args, **kwargs)

    def monitor_logs(self, *args, **kwargs):
        """
        Monitor a job's output (blocking wrapper around `monitor_logs_async`).
        """
        asyncio.run(self.monitor_logs_async(*args, **kwargs))

Execute Meerschaum jobs via systemd.

def get_job_names(self, debug: bool = False) -> List[str]:
    """
    List every job known to systemd, hidden jobs included.

    A job corresponds to a `mrsm-<name>.service` entry in the systemd user
    directory; entries that no longer resolve (broken symlinks) are ignored.
    """
    import meerschaum.config.paths as paths
    user_dir = paths.SYSTEMD_USER_RESOURCES_PATH
    prefix, suffix = 'mrsm-', '.service'
    job_names = []
    for entry in os.listdir(user_dir):
        if not (entry.startswith(prefix) and entry.endswith(suffix)):
            continue
        ### Skip broken symlinks.
        if not (user_dir / entry).exists():
            continue
        job_names.append(entry[len(prefix):-len(suffix)])
    return job_names

Return a list of existing jobs, including hidden ones.

def get_job_exists(self, name: str, debug: bool = False) -> bool:
    """
    Check whether `name` is among the jobs registered with systemd.
    """
    known_names = self.get_job_names(debug=debug)
    if not debug:
        return name in known_names
    dprint(f'Existing services: {known_names}')
    return name in known_names

Return whether a job exists.

def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
    """
    Return a dictionary of `systemd` Jobs (including hidden jobs).

    Keys are the job names from `get_job_names`; each value is a `Job`
    bound to this executor's keys. Built in a single pass (the previous
    version rebuilt an identical dictionary a second time).
    """
    return {
        name: Job(name, executor_keys=str(self))
        for name in self.get_job_names(debug=debug)
    }

Return a dictionary of systemd Jobs (including hidden jobs).

def get_service_name(self, name: str, debug: bool = False) -> str:
    """
    Build the systemd unit name for a job: `mrsm-<name>.service`,
    with every space in the job name turned into a hyphen.
    """
    slug = name.replace(' ', '-')
    return "mrsm-" + slug + ".service"

Return a job's service name.

def get_service_job_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Locate the directory that holds a job's files: `<systemd jobs root>/<name>`.
    """
    import meerschaum.config.paths as paths
    jobs_root = paths.SYSTEMD_JOBS_RESOURCES_PATH
    job_dir = jobs_root / name
    return job_dir

Return the path for the job's files under the root directory.

def get_service_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Locate a job's unit file: `<job directory>/<service name>`.
    """
    job_dir = self.get_service_job_path(name, debug=debug)
    unit_file_name = self.get_service_name(name, debug=debug)
    return job_dir / unit_file_name

Return the path to a Job's service file.

def get_service_logs_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Locate the file the job's service output is logged to:
    `<systemd logs directory>/<service name>.log`.
    """
    import meerschaum.config.paths as paths
    logs_dir = paths.SYSTEMD_LOGS_RESOURCES_PATH
    log_file_name = f"{self.get_service_name(name, debug=debug)}.log"
    return logs_dir / log_file_name

Return the path to direct service logs to.

def get_socket_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Locate the job's stdin FIFO: `<job directory>/<service name>.stdin`.
    """
    job_dir = self.get_service_job_path(name, debug=debug)
    fifo_name = f"{self.get_service_name(name, debug=debug)}.stdin"
    return job_dir / fifo_name

Return the path to the FIFO file.

def get_result_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Locate the JSON file holding the job's result:
    `<job directory>/<service name>.result.json`.
    """
    job_dir = self.get_service_job_path(name, debug=debug)
    result_file_name = self.get_service_name(name, debug=debug) + '.result.json'
    return job_dir / result_file_name

Return the path to the result file.

def get_service_file_text(self, name: str, sysargs: List[str], debug: bool = False) -> str:
    """
    Return the contents of the systemd unit (`.service`) file for a job.

    Parameters
    ----------
    name: str
        The job's name. It is also exported to the service as the daemon ID.

    sysargs: List[str]
        The Meerschaum arguments to run. They are shell-joined after
        `<python> -m meerschaum` in `ExecStart=`.

    Returns
    -------
    The unit file text: a `[Unit]`, `[Service]`, and `[Install]` section.
    The service section sets `ExecStart=`, `Restart=always` (except on exit
    status 0), and one `Environment=` line per variable.
    """
    service_logs_path = self.get_service_logs_path(name, debug=debug)
    socket_path = self.get_socket_path(name, debug=debug)
    result_path = self.get_result_path(name, debug=debug)
    job = self.get_hidden_job(name, debug=debug)

    ### Quote the interpreter so a path containing spaces stays a single word.
    ### NOTE: `%` inside sysargs is intentionally left unescaped here because
    ### `get_job_sysargs` parses the `ExecStart=` line back verbatim.
    sysargs_str = shlex.join(sysargs)
    exec_str = f'{shlex.quote(sys.executable)} -m meerschaum {sysargs_str}'

    ### Forward the Meerschaum environment variables set in the current process.
    mrsm_env_var_names = set(STATIC_CONFIG['environment'].values())
    mrsm_env_vars = {
        key: val
        for key, val in os.environ.items()
        if key in mrsm_env_var_names
    }

    ### Add new environment variables for the service process.
    mrsm_env_vars.update({
        STATIC_CONFIG['environment']['daemon_id']: name,
        STATIC_CONFIG['environment']['systemd_log_path']: service_logs_path.as_posix(),
        STATIC_CONFIG['environment']['systemd_result_path']: result_path.as_posix(),
        STATIC_CONFIG['environment']['systemd_stdin_path']: socket_path.as_posix(),
        STATIC_CONFIG['environment']['systemd_delete_job']: (
            '1'
            if job.delete_after_completion
            else '0'
        ),
    })

    ### Allow for user-defined environment variables.
    mrsm_env_vars.update(job.env)

    ### systemd expands `%` specifiers inside `Environment=` values. A literal
    ### `%` (e.g. in a percent-encoded connection URI) must be written as `%%`,
    ### or the value is mangled or the line is rejected.
    environment_lines = [
        f"Environment={key}={shlex.quote(str(val)).replace('%', '%%')}"
        for key, val in mrsm_env_vars.items()
    ]
    environment_str = '\n'.join(environment_lines)
    service_name = self.get_service_name(name, debug=debug)

    service_text = (
        "[Unit]\n"
        f"Description=Run the job '{name}'\n"
        "\n"
        "[Service]\n"
        f"ExecStart={exec_str}\n"
        "KillSignal=SIGTERM\n"
        "Restart=always\n"
        "RestartPreventExitStatus=0\n"
        f"SyslogIdentifier={service_name}\n"
        f"{environment_str}\n"
        "\n"
        "[Install]\n"
        "WantedBy=default.target\n"
    )
    return service_text

def get_socket_file_text(self, name: str, debug: bool = False) -> str:
    """
    Return the contents of the `.socket` unit that provides the job's stdin FIFO.

    The socket is bound to the job's service (`BindsTo=`) and listens on the
    FIFO from `get_socket_path`, exposed as the file descriptor named `stdin`.
    The FIFO is removed when the socket stops.
    """
    service_name = self.get_service_name(name, debug=debug)
    fifo_path = self.get_socket_path(name, debug=debug).as_posix()
    lines = [
        "[Unit]",
        f"BindsTo={service_name}",
        "",
        "[Socket]",
        f"ListenFIFO={fifo_path}",
        "FileDescriptorName=stdin",
        "RemoveOnStop=true",
        "SocketMode=0660",
    ]
    return "\n".join(lines) + "\n"

def get_hidden_job(
    self,
    name: str,
    sysargs: Optional[List[str]] = None,
    properties: Optional[Dict[str, Any]] = None,
    debug: bool = False,
):
    """
    Build the hidden "sister" `Job` that stores a job's parameters.

    The sister job:
    - uses the `local` executor;
    - shares this service's `RotatingFile` log and `StdinFile`;
    - reports status and results through this executor's `get_job_status`
      and `get_job_result`;
    - is flagged as externally managed.
    """
    rotating_log = self.get_job_rotating_file(name, debug=debug)
    stdin_file = self.get_job_stdin_file(name, debug=debug)
    return Job(
        name,
        sysargs,
        executor_keys='local',
        _properties=properties,
        _rotating_log=rotating_log,
        _stdin_file=stdin_file,
        _status_hook=partial(self.get_job_status, name),
        _result_hook=partial(self.get_job_result, name),
        _externally_managed=True,
    )

def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
    """
    Return metadata about a job, cached per job for `JOB_METADATA_CACHE_SECONDS`.

    Parameters
    ----------
    name: str
        The job's name.

    Returns
    -------
    A dictionary with the keys `sysargs`, `result`, and `restart`, plus a
    `daemon` dictionary containing `status`, `pid`, and `properties`.
    """
    now = time.perf_counter()

    ### Lazily create the per-executor cache.
    if '_jobs_metadata' not in self.__dict__:
        self._jobs_metadata: Dict[str, Any] = {}

    cached = self._jobs_metadata.get(name, None)
    if cached is not None:
        ts = cached.get('timestamp', None)
        if ts is not None and (now - ts) <= JOB_METADATA_CACHE_SECONDS:
            if debug:
                dprint(f"Returning cached metadata for job '{name}'.")
            return cached['metadata']

    metadata = {
        'sysargs': self.get_job_sysargs(name, debug=debug),
        'result': self.get_job_result(name, debug=debug),
        'restart': self.get_job_restart(name, debug=debug),
        'daemon': {
            'status': self.get_job_status(name, debug=debug),
            'pid': self.get_job_pid(name, debug=debug),
            'properties': self.get_job_properties(name, debug=debug),
        },
    }
    self._jobs_metadata[name] = {
        'timestamp': now,
        'metadata': metadata,
    }
    return metadata

def get_job_restart(self, name: str, debug: bool = False) -> bool:
    """
    Return whether a job restarts.

    A job restarts when its sysargs contain any of the `RESTART_FLAGS`.
    A job with no sysargs never restarts.
    """
    from meerschaum.jobs._Job import RESTART_FLAGS
    sysargs = self.get_job_sysargs(name, debug=debug)
    return bool(sysargs) and any(flag in sysargs for flag in RESTART_FLAGS)

def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
    """
    Return a copy of the hidden job's daemon properties.

    The internal `externally_managed` key is left out of the copy.
    """
    properties = self.get_hidden_job(name, debug=debug).daemon.properties
    filtered = dict(properties)
    filtered.pop('externally_managed', None)
    return filtered

def get_job_process(self, name: str, debug: bool = False):
    """
    Return a `psutil.Process` for the job's main PID.

    Returns `None` if the job has no PID or the process cannot be accessed.
    """
    pid = self.get_job_pid(name, debug=debug)
    if pid is None:
        return None

    psutil = mrsm.attempt_import('psutil')
    try:
        process = psutil.Process(pid)
    except Exception:
        ### Best-effort: a vanished or inaccessible process counts as no process.
        process = None
    return process

def get_job_status(self, name: str, debug: bool = False) -> str:
    """
    Return the job's status: `'running'`, `'paused'`, or `'stopped'`.

    The status is derived from `systemctl --user is-active`:
    - `activating` -> `'running'`.
    - `active` -> inspect the main process. The result is `'paused'` if
      psutil reports it as `stopped`, `'stopped'` if it cannot be found or
      inspected, and `'running'` otherwise.
    - Anything else -> `'stopped'`.
    """
    state = self.run_command(
        ['is-active', self.get_service_name(name, debug=debug)],
        as_output=True,
        debug=debug,
    )
    if state == 'activating':
        return 'running'
    if state != 'active':
        return 'stopped'

    process = self.get_job_process(name, debug=debug)
    if process is None:
        return 'stopped'

    try:
        is_paused = process.status() == 'stopped'
    except Exception:
        return 'stopped'
    return 'paused' if is_paused else 'running'

def get_job_pid(self, name: str, debug: bool = False) -> Union[int, None]:
    """
    Return the main PID of the job's service, or `None` if there is none.

    The PID comes from the `MainPID` property of `systemctl --user show`.
    A value of `0`, a missing property, or a non-integer value yields `None`.
    """
    prefix = 'MainPID='
    output = self.run_command(
        ['show', self.get_service_name(name, debug=debug), '--property=MainPID'],
        as_output=True,
        debug=debug,
    )
    if not output.startswith(prefix):
        return None

    pid_str = output[len(prefix):]
    if pid_str == '0':
        return None

    from meerschaum.utils.misc import is_int
    return int(pid_str) if is_int(pid_str) else None

def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return when the job's service last became active.

    The value comes from the `ActiveEnterTimestamp` property of
    `systemctl --user show` and is returned as a UTC ISO-8601 string.
    Returns `None` if the property is missing, empty, or unparseable;
    a warning is emitted in the unparseable case.
    """
    prop = 'ActiveEnterTimestamp'
    output = self.run_command(
        ['show', self.get_service_name(name, debug=debug), f'--property={prop}'],
        as_output=True,
        debug=debug,
    )
    if not output.startswith(prop):
        return None

    *_, dt_str = output.split('=')
    if not dt_str:
        return None

    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    try:
        began = dateutil_parser.parse(dt_str)
    except Exception as e:
        warn(f"Cannot parse '{output}' as a datetime:\n{e}")
        return None
    return began.astimezone(timezone.utc).isoformat()

def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return when the job's service last became inactive (i.e. when it ended).

    The value comes from the `InactiveEnterTimestamp` property of
    `systemctl --user show` and is returned as a UTC ISO-8601 string.
    Returns `None` if the property is missing, empty, or unparseable;
    a warning is emitted in the unparseable case.
    """
    prop = 'InactiveEnterTimestamp'
    output = self.run_command(
        ['show', self.get_service_name(name, debug=debug), f'--property={prop}'],
        as_output=True,
        debug=debug,
    )
    if not output.startswith(prop):
        return None

    dt_str = output.split('=')[-1]
    if not dt_str:
        return None

    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    try:
        ended = dateutil_parser.parse(dt_str)
    except Exception as e:
        warn(f"Cannot parse '{output}' as a datetime:\n{e}")
        return None
    return ended.astimezone(timezone.utc).isoformat()

def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return when a job was paused, as an ISO-8601 string.

    Returns `None` if the job is not currently paused or the hidden job has
    no recorded stop time.
    """
    ### Check the status first: building the hidden job creates its stdin
    ### file (and its parent directory), which is wasted work for jobs that
    ### are not paused.
    if self.get_job_status(name, debug=debug) != 'paused':
        return None

    stop_time = self.get_hidden_job(name, debug=debug).stop_time
    if stop_time is None:
        return None
    return stop_time.isoformat()

def get_job_result(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Return the job's result `SuccessTuple` from its JSON result file.

    Returns
    -------
    One of:
    - the `(success, message)` pair stored in the result file;
    - `(False, "No result available.")` if the file does not exist;
    - `(False, "Could not read result for Job '<name>'.")` if the file cannot
      be read or parsed, or does not hold a two-element JSON array.
    """
    result_path = self.get_result_path(name, debug=debug)
    if not result_path.exists():
        return False, "No result available."

    read_failure = (False, f"Could not read result for Job '{name}'.")
    try:
        with open(result_path, 'r', encoding='utf-8') as f:
            result = json.load(f)
    except Exception:
        return read_failure

    ### Only a two-element JSON array is a valid SuccessTuple. Anything else
    ### (a scalar, an object, truncated content) would otherwise raise in
    ### `tuple()` or break callers that unpack `success, msg`.
    if not isinstance(result, list) or len(result) != 2:
        return read_failure

    return tuple(result)

def get_job_sysargs(self, name: str, debug: bool = False) -> Union[List[str], None]:
    """
    Return the sysargs parsed back from the job's service file.

    The `ExecStart=` line is expected to look like
    `ExecStart=<python> -m meerschaum <shell-joined sysargs>`. An unquoted
    `<` (a stdin redirection) and everything after it is ignored. A `<`
    inside a quoted argument is kept.

    Returns
    -------
    The list of sysargs. The list is empty if the service file, its
    `ExecStart=` line, or the `-m meerschaum` marker is missing.
    """
    marker = ' -m meerschaum '

    def _strip_unquoted_redirect(text: str) -> str:
        """Return `text` up to (not including) its first unquoted, unescaped `<`."""
        quote = None
        escaped = False
        for i, char in enumerate(text):
            if escaped:
                escaped = False
            elif quote == "'":
                ### POSIX single quotes: everything is literal until the closing quote.
                if char == "'":
                    quote = None
            elif char == '\\':
                escaped = True
            elif quote == '"':
                if char == '"':
                    quote = None
            elif char in ('"', "'"):
                quote = char
            elif char == '<':
                return text[:i]
        return text

    service_file_path = self.get_service_file_path(name, debug=debug)
    if not service_file_path.exists():
        return []

    with open(service_file_path, 'r', encoding='utf-8') as f:
        for line in f:
            if not line.startswith('ExecStart='):
                continue
            ### The interpreter path comes first, so split at the FIRST marker;
            ### an argument may itself contain the marker text.
            _, found, sysargs_str = line.partition(marker)
            if not found:
                return []
            return shlex.split(_strip_unquoted_redirect(sysargs_str))

    return []

def run_command(
    self,
    command_args: List[str],
    as_output: bool = False,
    debug: bool = False,
) -> Union[SuccessTuple, str]:
    """
    Run a `systemctl --user` command.

    Parameters
    ----------
    command_args: List[str]
        The arguments to pass to `systemctl --user`. If they already start with
        `['systemctl', '--user']`, they are used unchanged.

    as_output: bool, default False
        If `True`, return the command's stdout (stripped) instead of a `SuccessTuple`.

    Returns
    -------
    The stripped stdout when `as_output` is `True`. Otherwise, a `SuccessTuple`
    whose success flag reflects a zero exit code.
    """
    from meerschaum.utils.process import run_process

    prefix = ['systemctl', '--user']
    full_args = command_args if command_args[:2] == prefix else prefix + command_args

    if debug:
        dprint(shlex.join(full_args))

    proc = run_process(
        full_args,
        foreground=False,
        as_proc=True,
        capture_output=True,
        text=True,
    )
    stdout, _ = proc.communicate()
    if debug:
        dprint(f"{stdout}")

    if as_output:
        return stdout.strip()

    if proc.wait() != 0:
        return False, f"Failed to execute command `{shlex.join(full_args)}`."
    return True, "Success"
def get_job_stdin_file(self, name: str, debug: bool = False):
    """
    Return the cached `StdinFile` for the job's stdin FIFO.

    On first use for a job, the FIFO's parent directory is created and a new
    `StdinFile` is cached on this executor.
    """
    from meerschaum.utils.daemon import StdinFile
    if '_stdin_files' not in self.__dict__:
        self._stdin_files: Dict[str, StdinFile] = {}
    stdin_files = self._stdin_files

    if name not in stdin_files:
        fifo_path = self.get_socket_path(name, debug=debug)
        fifo_path.parent.mkdir(parents=True, exist_ok=True)
        stdin_files[name] = StdinFile(fifo_path)

    return stdin_files[name]

def create_job(
    self,
    name: str,
    sysargs: List[str],
    properties: Optional[Dict[str, Any]] = None,
    debug: bool = False,
) -> SuccessTuple:
    """
    Create a job as a `systemd` user service, then enable and start it.

    Steps:
    1. Touch the stdin `StdinFile` (via its `file_handler`).
    2. Write the hidden job's pickle and properties files, marked as
       externally managed.
    3. Write the unit file and symlink it into place.
    4. Run `daemon-reload`, `enable`, and `start`.

    Parameters
    ----------
    name: str
        The job's name.

    sysargs: List[str]
        The Meerschaum arguments the service runs.

    properties: Optional[Dict[str, Any]], default None
        Properties stored on the hidden job.

    Returns
    -------
    A `SuccessTuple`. Failure is returned if a setup step fails, if the
    `start` command fails, or if more than one `systemctl` command fails.
    """
    from meerschaum.utils.misc import make_symlink
    service_name = self.get_service_name(name, debug=debug)
    service_file_path = self.get_service_file_path(name, debug=debug)
    service_symlink_file_path = self.get_service_symlink_file_path(name, debug=debug)
    socket_stdin = self.get_job_stdin_file(name, debug=debug)
    ### NOTE(review): accessing `file_handler` is presumably what creates/opens
    ### the stdin FIFO ahead of time — confirm against `StdinFile`.
    _ = socket_stdin.file_handler

    ### Init the externally_managed file.
    ### NOTE: We must write the pickle file in addition to the properties file.
    job = self.get_hidden_job(name, sysargs=sysargs, properties=properties, debug=debug)
    job._set_externally_managed()
    pickle_success, pickle_msg = job.daemon.write_pickle()
    if not pickle_success:
        return pickle_success, pickle_msg
    properties_success, properties_msg = job.daemon.write_properties()
    if not properties_success:
        return properties_success, properties_msg

    service_file_path.parent.mkdir(parents=True, exist_ok=True)

    with open(service_file_path, 'w+', encoding='utf-8') as f:
        f.write(self.get_service_file_text(name, sysargs, debug=debug))

    symlink_success, symlink_msg = make_symlink(service_file_path, service_symlink_file_path)
    if not symlink_success:
        return symlink_success, symlink_msg

    commands = [
        ['daemon-reload'],
        ['enable', service_name],
        ['start', service_name],
    ]

    ### Run every command and remember which ones failed.
    failed_messages = []
    start_failed = False
    for command_list in commands:
        command_success, command_msg = self.run_command(command_list, debug=debug)
        if command_success:
            continue
        failed_messages.append(command_msg)
        if command_list[0] == 'start':
            start_failed = True

    ### A single failed command (e.g. `enable`) is still tolerated as before,
    ### but if `start` itself failed the job is not running and must not be
    ### reported as started.
    if start_failed or len(failed_messages) > 1:
        return False, (
            f"Failed to start job '{name}' via systemd:\n"
            + '\n'.join(failed_messages)
        )

    return True, f"Started job '{name}' via systemd."

def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Start, or resume, a job's service.

    The hidden job's stop file is removed first. If the job is paused, its
    service is sent `SIGCONT` to resume it in place. Otherwise,
    `systemctl --user start` is run.

    Returns
    -------
    The `SuccessTuple` from the `systemctl` command that was run.
    """
    job = self.get_hidden_job(name, debug=debug)
    job.daemon._remove_stop_file()

    if self.get_job_status(name, debug=debug) == 'paused':
        ### `pause_job` suspends the service with SIGSTOP; SIGCONT resumes it.
        command = ['kill', '-s', 'SIGCONT', self.get_service_name(name, debug=debug)]
    else:
        command = ['start', self.get_service_name(name, debug=debug)]

    return self.run_command(command, debug=debug)

def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Stop a job's service.

    The steps are:
    1. Write a `'quit'` stop file for the hidden job and send the service
       `SIGINT`.
    2. Poll the job's status every `jobs:check_timeout_interval_seconds`,
       for up to `jobs:timeout_seconds`.
    3. If the job still has not stopped, fall back to `systemctl --user stop`.
    """
    hidden_job = self.get_hidden_job(name, debug=debug)
    hidden_job.daemon._write_stop_file('quit')

    ### Ask for a graceful exit first; success is judged by polling below.
    _ = self.run_command(
        ['kill', '-s', 'SIGINT', self.get_service_name(name, debug=debug)],
        debug=debug,
    )

    poll_interval = get_config('jobs', 'check_timeout_interval_seconds')
    poll_start = time.perf_counter()
    timeout_seconds = get_config('jobs', 'timeout_seconds')
    while (time.perf_counter() - poll_start) < timeout_seconds:
        if self.get_job_status(name, debug=debug) == 'stopped':
            return True, 'Success'
        time.sleep(poll_interval)

    ### The job did not stop within the timeout: have systemd stop the unit.
    return self.run_command(
        ['stop', self.get_service_name(name, debug=debug)],
        debug=debug,
    )

def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Pause a job's service.

    A `'pause'` stop file is written for the hidden job, then the service is
    sent `SIGSTOP`.
    """
    hidden_job = self.get_hidden_job(name, debug=debug)
    hidden_job.daemon._write_stop_file('pause')
    service_name = self.get_service_name(name, debug=debug)
    return self.run_command(['kill', '-s', 'SIGSTOP', service_name], debug=debug)

def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Delete a job's service and all of its files.

    The steps are:
    1. Unless the job deletes itself after completion, stop and disable the
       service.
    2. Remove the job's resources directory, unit file, unit symlink, stdin
       FIFO, result file, and rotated log files.
    3. Delete the hidden job and reload systemd.

    Returns
    -------
    A `SuccessTuple`: either the first filesystem error encountered, or the
    result of `systemctl --user daemon-reload`.
    """
    import meerschaum.config.paths as paths
    job = self.get_hidden_job(name, debug=debug)

    if not job.delete_after_completion:
        _ = self.stop_job(name, debug=debug)
        _ = self.run_command(
            ['disable', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

    service_job_path = self.get_service_job_path(name, debug=debug)
    try:
        if service_job_path.exists():
            shutil.rmtree(service_job_path)
    except Exception as e:
        warn(e)
        return False, str(e)

    ### Rotated log files are named `<service log file name>.<suffix>`.
    ### A missing logs directory simply means there is nothing to remove.
    service_logs_path = self.get_service_logs_path(name, debug=debug)
    logs_dir = paths.SYSTEMD_LOGS_RESOURCES_PATH
    logs_prefix = service_logs_path.name + '.'
    try:
        logs_filenames = os.listdir(logs_dir)
    except FileNotFoundError:
        logs_filenames = []
    logs_paths = [
        (logs_dir / filename)
        for filename in logs_filenames
        if filename.startswith(logs_prefix)
    ]
    file_paths = [
        self.get_service_file_path(name, debug=debug),
        self.get_service_symlink_file_path(name, debug=debug),
        self.get_socket_path(name, debug=debug),
        self.get_result_path(name, debug=debug),
    ] + logs_paths

    for path in file_paths:
        ### `exists()` follows symlinks. The unit-file symlink points into the
        ### directory removed above, so it now dangles and `exists()` is False;
        ### check `is_symlink()` too so the dangling link is still removed.
        if not (path.exists() or path.is_symlink()):
            continue
        try:
            path.unlink()
        except Exception as e:
            warn(e)
            return False, str(e)

    _ = job.delete()

    return self.run_command(['daemon-reload'], debug=debug)

def get_logs(self, name: str, debug: bool = False) -> str:
    """
    Return a job's log output.

    The text is read from the job's `RotatingFile` (`get_job_rotating_file`),
    not from the systemd journal.
    """
    return self.get_job_rotating_file(name, debug=debug).read()

def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
    """
    Return the hidden job's `stop_time`, which may be `None`.
    """
    return self.get_hidden_job(name, debug=debug).stop_time

def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
    """
    Return whether a job is blocking on stdin.

    A job is blocking when a `<stdin FIFO name>.block` marker file exists
    next to its stdin FIFO.
    """
    fifo_path = self.get_socket_path(name, debug=debug)
    marker_path = fifo_path.parent / f"{fifo_path.name}.block"
    return marker_path.exists()

def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]:
    """
    Return the keyword arguments of the prompt the job is blocking on.

    The arguments are taken from the hidden job's `get_prompt_kwargs`.
    """
    hidden_job = self.get_hidden_job(name, debug=debug)
    prompt_kwargs = hidden_job.get_prompt_kwargs(debug=debug)
    return prompt_kwargs

def get_job_rotating_file(self, name: str, debug: bool = False):
    """
    Return a `RotatingFile` for the job's log output.

    The file is opened at the service log path from `get_service_logs_path`.
    """
    from meerschaum.utils.daemon import RotatingFile
    return RotatingFile(self.get_service_logs_path(name, debug=debug))

async def monitor_logs_async(
    self,
    name: str,
    *args,
    debug: bool = False,
    **kwargs
):
    """
    Asynchronously monitor a job's output through its hidden job.

    Extra positional and keyword arguments are forwarded to the hidden job's
    `monitor_logs_async`. The following keys are always injected, overriding
    any caller-supplied values:
    - `_logs_path`: the systemd logs directory;
    - `_log`: the job's `RotatingFile`;
    - `_stdin_file`: the job's `StdinFile`;
    - `debug`.
    """
    import meerschaum.config.paths as paths
    hidden_job = self.get_hidden_job(name, debug=debug)
    forwarded_kwargs = {
        **kwargs,
        '_logs_path': paths.SYSTEMD_LOGS_RESOURCES_PATH,
        '_log': self.get_job_rotating_file(name, debug=debug),
        '_stdin_file': self.get_job_stdin_file(name, debug=debug),
        'debug': debug,
    }
    await hidden_job.monitor_logs_async(*args, **forwarded_kwargs)

def monitor_logs(self, *args, **kwargs):
    """
    Monitor a job's output synchronously.

    Runs `monitor_logs_async` to completion with `asyncio.run`, forwarding
    all arguments.
    """
    coroutine = self.monitor_logs_async(*args, **kwargs)
    asyncio.run(coroutine)