meerschaum.jobs.systemd

Manage meerschaum.jobs.Job via systemd.

  1#! /usr/bin/env python3
  2# vim:fenc=utf-8
  3
  4"""
  5Manage `meerschaum.jobs.Job` via `systemd`.
  6"""
  7
  8import os
  9import pathlib
 10import shlex
 11import sys
 12import asyncio
 13import json
 14import time
 15import traceback
 16import shutil
 17from datetime import datetime, timezone
 18from functools import partial
 19
 20import meerschaum as mrsm
 21from meerschaum.jobs import Job, Executor, make_executor
 22from meerschaum.utils.typing import Dict, Any, List, SuccessTuple, Union, Optional, Callable
 23from meerschaum.config import get_config
 24from meerschaum.config.static import STATIC_CONFIG
 25from meerschaum.utils.warnings import warn, dprint
 26from meerschaum._internal.arguments._parse_arguments import parse_arguments
 27
### How many seconds `SystemdExecutor.get_job_metadata()` reuses a job's cached metadata
### before querying `systemd` and the job files again.
JOB_METADATA_CACHE_SECONDS: int = STATIC_CONFIG['api']['jobs']['metadata_cache_seconds']
 29
 30
 31@make_executor
 32class SystemdExecutor(Executor):
 33    """
 34    Execute Meerschaum jobs via `systemd`.
 35    """
 36
 37    def get_job_names(self, debug: bool = False) -> List[str]:
 38        """
 39        Return a list of existing jobs, including hidden ones.
 40        """
 41        from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH
 42        return [
 43            service_name[len('mrsm-'):(-1 * len('.service'))]
 44            for service_name in os.listdir(SYSTEMD_USER_RESOURCES_PATH)
 45            if (
 46                service_name.startswith('mrsm-')
 47                and service_name.endswith('.service')
 48                ### Check for broken symlinks.
 49                and (SYSTEMD_USER_RESOURCES_PATH / service_name).exists()
 50            )
 51        ]
 52
 53    def get_job_exists(self, name: str, debug: bool = False) -> bool:
 54        """
 55        Return whether a job exists.
 56        """
 57        user_services = self.get_job_names(debug=debug)
 58        if debug:
 59            dprint(f'Existing services: {user_services}')
 60        return name in user_services
 61
 62    def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
 63        """
 64        Return a dictionary of `systemd` Jobs (including hidden jobs).
 65        """
 66        user_services = self.get_job_names(debug=debug)
 67        jobs = {
 68            name: Job(name, executor_keys=str(self))
 69            for name in user_services
 70        }
 71        return {
 72            name: job
 73            for name, job in jobs.items()
 74        }
 75
 76    def get_service_name(self, name: str, debug: bool = False) -> str:
 77        """
 78        Return a job's service name.
 79        """
 80        return f"mrsm-{name.replace(' ', '-')}.service"
 81
 82    def get_service_job_path(self, name: str, debug: bool = False) -> pathlib.Path:
 83        """
 84        Return the path for the job's files under the root directory.
 85        """
 86        from meerschaum.config.paths import SYSTEMD_JOBS_RESOURCES_PATH
 87        return SYSTEMD_JOBS_RESOURCES_PATH / name
 88
 89    def get_service_symlink_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
 90        """
 91        Return the path to where to create the service symlink.
 92        """
 93        from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH
 94        return SYSTEMD_USER_RESOURCES_PATH / self.get_service_name(name, debug=debug)
 95
 96    def get_service_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
 97        """
 98        Return the path to a Job's service file.
 99        """
100        return (
101            self.get_service_job_path(name, debug=debug)
102            / self.get_service_name(name, debug=debug)
103        )
104
105    def get_service_logs_path(self, name: str, debug: bool = False) -> pathlib.Path:
106        """
107        Return the path to direct service logs to.
108        """
109        from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
110        return SYSTEMD_LOGS_RESOURCES_PATH / (self.get_service_name(name, debug=debug) + '.log')
111
112    def get_socket_path(self, name: str, debug: bool = False) -> pathlib.Path:
113        """
114        Return the path to the FIFO file.
115        """
116        return (
117            self.get_service_job_path(name, debug=debug)
118            / (self.get_service_name(name, debug=debug) + '.stdin')
119        )
120
121    def get_result_path(self, name: str, debug: bool = False) -> pathlib.Path:
122        """
123        Return the path to the result file.
124        """
125        return (
126            self.get_service_job_path(name, debug=debug)
127            / (self.get_service_name(name, debug=debug) + '.result.json')
128        )
129
130    def get_service_file_text(self, name: str, sysargs: List[str], debug: bool = False) -> str:
131        """
132        Return the contents of the unit file.
133        """
134        service_logs_path = self.get_service_logs_path(name, debug=debug)
135        socket_path = self.get_socket_path(name, debug=debug)
136        result_path = self.get_result_path(name, debug=debug)
137        job = self.get_hidden_job(name, debug=debug)
138
139        sysargs_str = shlex.join(sysargs)
140        exec_str = f'{sys.executable} -m meerschaum {sysargs_str}'
141        mrsm_env_var_names = set([var for var in STATIC_CONFIG['environment'].values()])
142        mrsm_env_vars = {
143            key: val
144            for key, val in os.environ.items()
145            if key in mrsm_env_var_names
146        }
147
148        ### Add new environment variables for the service process.
149        mrsm_env_vars.update({
150            STATIC_CONFIG['environment']['daemon_id']: name,
151            STATIC_CONFIG['environment']['systemd_log_path']: service_logs_path.as_posix(),
152            STATIC_CONFIG['environment']['systemd_result_path']: result_path.as_posix(),
153            STATIC_CONFIG['environment']['systemd_stdin_path']: socket_path.as_posix(),
154            STATIC_CONFIG['environment']['systemd_delete_job']: (
155                '1'
156                if job.delete_after_completion
157                else '0',
158            ),
159        })
160
161        ### Allow for user-defined environment variables.
162        mrsm_env_vars.update(job.env)
163
164        environment_lines = [
165            f"Environment={key}={shlex.quote(str(val))}"
166            for key, val in mrsm_env_vars.items()
167        ]
168        environment_str = '\n'.join(environment_lines)
169        service_name = self.get_service_name(name, debug=debug)
170
171        service_text = (
172            "[Unit]\n"
173            f"Description=Run the job '{name}'\n"
174            "\n"
175            "[Service]\n"
176            f"ExecStart={exec_str}\n"
177            "KillSignal=SIGTERM\n"
178            "Restart=always\n"
179            "RestartPreventExitStatus=0\n"
180            f"SyslogIdentifier={service_name}\n"
181            f"{environment_str}\n"
182            "\n"
183            "[Install]\n"
184            "WantedBy=default.target\n"
185        )
186        return service_text
187
188    def get_socket_file_text(self, name: str, debug: bool = False) -> str:
189        """
190        Return the contents of the socket file.
191        """
192        service_name = self.get_service_name(name, debug=debug)
193        socket_path = self.get_socket_path(name, debug=debug)
194        socket_text = (
195            "[Unit]\n"
196            f"BindsTo={service_name}\n"
197            "\n"
198            "[Socket]\n"
199            f"ListenFIFO={socket_path.as_posix()}\n"
200            "FileDescriptorName=stdin\n"
201            "RemoveOnStop=true\n"
202            "SocketMode=0660\n"
203        )
204        return socket_text
205
206    def get_hidden_job(
207        self,
208        name: str,
209        sysargs: Optional[List[str]] = None,
210        properties: Optional[Dict[str, Any]] = None,
211        debug: bool = False,
212    ):
213        """
214        Return the hidden "sister" job to store a job's parameters.
215        """
216        job = Job(
217            name,
218            sysargs,
219            executor_keys='local',
220            _properties=properties,
221            _rotating_log=self.get_job_rotating_file(name, debug=debug),
222            _stdin_file=self.get_job_stdin_file(name, debug=debug),
223            _status_hook=partial(self.get_job_status, name),
224            _result_hook=partial(self.get_job_result, name),
225            _externally_managed=True,
226        )
227        return job
228
229
230    def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
231        """
232        Return metadata about a job.
233        """
234        now = time.perf_counter()
235
236        if '_jobs_metadata' not in self.__dict__:
237            self._jobs_metadata: Dict[str, Any] = {}
238
239        if name in self._jobs_metadata:
240            ts = self._jobs_metadata[name].get('timestamp', None)
241
242            if ts is not None and (now - ts) <= JOB_METADATA_CACHE_SECONDS:
243                if debug:
244                    dprint(f"Retuning cached metadata for job '{name}'.")
245                return self._jobs_metadata[name]['metadata']
246
247        metadata = {
248            'sysargs': self.get_job_sysargs(name, debug=debug),
249            'result': self.get_job_result(name, debug=debug),
250            'restart': self.get_job_restart(name, debug=debug),
251            'daemon': {
252                'status': self.get_job_status(name, debug=debug),
253                'pid': self.get_job_pid(name, debug=debug),
254                'properties': self.get_job_properties(name, debug=debug),
255            },
256        }
257        self._jobs_metadata[name] = {
258            'timestamp': now,
259            'metadata': metadata,
260        }
261        return metadata
262
263    def get_job_restart(self, name: str, debug: bool = False) -> bool:
264        """
265        Return whether a job restarts.
266        """
267        from meerschaum.jobs._Job import RESTART_FLAGS
268        sysargs = self.get_job_sysargs(name, debug=debug)
269        if not sysargs:
270            return False
271
272        for flag in RESTART_FLAGS:
273            if flag in sysargs:
274                return True
275
276        return False
277
278    def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
279        """
280        Return the properties for a job.
281        """
282        job = self.get_hidden_job(name, debug=debug)
283        return {
284            k: v for k, v in job.daemon.properties.items()
285            if k != 'externally_managed'
286        }
287
288    def get_job_process(self, name: str, debug: bool = False):
289        """
290        Return a `psutil.Process` for the job's PID.
291        """
292        pid = self.get_job_pid(name, debug=debug)
293        if pid is None:
294            return None
295
296        psutil = mrsm.attempt_import('psutil')
297        try:
298            return psutil.Process(pid)
299        except Exception:
300            return None
301
302    def get_job_status(self, name: str, debug: bool = False) -> str:
303        """
304        Return the job's service status.
305        """
306        output = self.run_command(
307            ['is-active', self.get_service_name(name, debug=debug)],
308            as_output=True,
309            debug=debug,
310        )
311
312        if output == 'activating':
313            return 'running'
314
315        if output == 'active':
316            process = self.get_job_process(name, debug=debug)
317            if process is None:
318                return 'stopped'
319
320            try:
321                if process.status() == 'stopped':
322                    return 'paused'
323            except Exception:
324                return 'stopped'
325
326            return 'running'
327
328        return 'stopped'
329
330    def get_job_pid(self, name: str, debug: bool = False) -> Union[int, None]:
331        """
332        Return the job's service PID.
333        """
334        from meerschaum.utils.misc import is_int
335
336        output = self.run_command(
337            [
338                'show',
339                self.get_service_name(name, debug=debug),
340                '--property=MainPID',
341            ],
342            as_output=True,
343            debug=debug,
344        )
345        if not output.startswith('MainPID='):
346            return None
347
348        pid_str = output[len('MainPID='):]
349        if pid_str == '0':
350            return None
351
352        if is_int(pid_str):
353            return int(pid_str)
354        
355        return None
356
357    def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
358        """
359        Return when a job began running.
360        """
361        output = self.run_command(
362            [
363                'show',
364                self.get_service_name(name, debug=debug),
365                '--property=ActiveEnterTimestamp'
366            ],
367            as_output=True,
368            debug=debug,
369        )
370        if not output.startswith('ActiveEnterTimestamp'):
371            return None
372
373        dt_str = output.split('=')[-1]
374        if not dt_str:
375            return None
376
377        dateutil_parser = mrsm.attempt_import('dateutil.parser')
378        try:
379            dt = dateutil_parser.parse(dt_str)
380        except Exception as e:
381            warn(f"Cannot parse '{output}' as a datetime:\n{e}")
382            return None
383
384        return dt.astimezone(timezone.utc).isoformat()
385
386    def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
387        """
388        Return when a job began running.
389        """
390        output = self.run_command(
391            [
392                'show',
393                self.get_service_name(name, debug=debug),
394                '--property=InactiveEnterTimestamp'
395            ],
396            as_output=True,
397            debug=debug,
398        )
399        if not output.startswith('InactiveEnterTimestamp'):
400            return None
401
402        dt_str = output.split('=')[-1]
403        if not dt_str:
404            return None
405
406        dateutil_parser = mrsm.attempt_import('dateutil.parser')
407
408        try:
409            dt = dateutil_parser.parse(dt_str)
410        except Exception as e:
411            warn(f"Cannot parse '{output}' as a datetime:\n{e}")
412            return None
413        return dt.astimezone(timezone.utc).isoformat()
414
415    def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
416        """
417        Return when a job was paused.
418        """
419        job = self.get_hidden_job(name, debug=debug)
420        if self.get_job_status(name, debug=debug) != 'paused':
421            return None
422
423        stop_time = job.stop_time
424        if stop_time is None:
425            return None
426
427        return stop_time.isoformat()
428
429    def get_job_result(self, name: str, debug: bool = False) -> SuccessTuple:
430        """
431        Return the job's result SuccessTuple.
432        """
433        result_path = self.get_result_path(name, debug=debug)
434        if not result_path.exists():
435            return False, "No result available."
436
437        try:
438            with open(result_path, 'r', encoding='utf-8') as f:
439                result = json.load(f)
440        except Exception:
441            return False, f"Could not read result for Job '{name}'."
442
443        return tuple(result)
444
445    def get_job_sysargs(self, name: str, debug: bool = False) -> Union[List[str], None]:
446        """
447        Return the sysargs from the service file.
448        """
449        service_file_path = self.get_service_file_path(name, debug=debug)
450        if not service_file_path.exists():
451            return []
452
453        with open(service_file_path, 'r', encoding='utf-8') as f:
454            service_lines = f.readlines()
455
456        for line in service_lines:
457            if line.startswith('ExecStart='):
458                sysargs_str = line.split(' -m meerschaum ')[-1].split('<')[0]
459                return shlex.split(sysargs_str)
460
461        return []
462
463    def run_command(
464        self,
465        command_args: List[str],
466        as_output: bool = False,
467        debug: bool = False,
468    ) -> Union[SuccessTuple, str]:
469        """
470        Run a `systemd` command and return success.
471
472        Parameters
473        ----------
474        command_args: List[str]
475            The command to pass to `systemctl --user`.
476
477        as_output: bool, default False
478            If `True`, return the process stdout output.
479            Defaults to a `SuccessTuple`.
480
481        Returns
482        -------
483        A `SuccessTuple` indicating success or a str for the process output.
484        """
485        from meerschaum.utils.process import run_process
486
487        if command_args[:2] != ['systemctl', '--user']:
488            command_args = ['systemctl', '--user'] + command_args
489
490        if debug:
491            dprint(shlex.join(command_args))
492
493        proc = run_process(
494            command_args,
495            foreground=False,
496            as_proc=True,
497            capture_output=True,
498            text=True,
499        )
500        stdout, stderr = proc.communicate()
501        if debug:
502            dprint(f"{stdout}")
503
504        if as_output:
505            return stdout.strip()
506            
507        command_success = proc.wait() == 0
508        command_msg = (
509            "Success"
510            if command_success
511            else f"Failed to execute command `{shlex.join(command_args)}`."
512        )
513        return command_success, command_msg
514
515    def get_job_stdin_file(self, name: str, debug: bool = False):
516        """
517        Return a `StdinFile` for the job.
518        """
519        from meerschaum.utils.daemon import StdinFile
520        if '_stdin_files' not in self.__dict__:
521            self._stdin_files: Dict[str, StdinFile] = {}
522
523        if name not in self._stdin_files:
524            socket_path = self.get_socket_path(name, debug=debug)
525            socket_path.parent.mkdir(parents=True, exist_ok=True)
526            self._stdin_files[name] = StdinFile(socket_path)
527
528        return self._stdin_files[name]
529
530    def create_job(
531        self,
532        name: str,
533        sysargs: List[str],
534        properties: Optional[Dict[str, Any]] = None,
535        debug: bool = False,
536    ) -> SuccessTuple:
537        """
538        Create a job as a service to be run by `systemd`.
539        """
540        from meerschaum.utils.misc import make_symlink
541        service_name = self.get_service_name(name, debug=debug)
542        service_file_path = self.get_service_file_path(name, debug=debug)
543        service_symlink_file_path = self.get_service_symlink_file_path(name, debug=debug)
544        socket_stdin = self.get_job_stdin_file(name, debug=debug)
545        _ = socket_stdin.file_handler
546
547        ### Init the externally_managed file.
548        ### NOTE: We must write the pickle file in addition to the properties file.
549        job = self.get_hidden_job(name, sysargs=sysargs, properties=properties, debug=debug)
550        job._set_externally_managed()
551        pickle_success, pickle_msg = job.daemon.write_pickle()
552        if not pickle_success:
553            return pickle_success, pickle_msg
554        properties_success, properties_msg = job.daemon.write_properties()
555        if not properties_success:
556            return properties_success, properties_msg
557
558        service_file_path.parent.mkdir(parents=True, exist_ok=True)
559
560        with open(service_file_path, 'w+', encoding='utf-8') as f:
561            f.write(self.get_service_file_text(name, sysargs, debug=debug))
562
563        symlink_success, symlink_msg = make_symlink(service_file_path, service_symlink_file_path)
564        if not symlink_success:
565            return symlink_success, symlink_msg
566
567        commands = [
568            ['daemon-reload'],
569            ['enable', service_name],
570            ['start', service_name],
571        ]
572
573        fails = 0
574        for command_list in commands:
575            command_success, command_msg = self.run_command(command_list, debug=debug)
576            if not command_success:
577                fails += 1
578
579        if fails > 1:
580            return False, "Failed to reload systemd."
581
582        return True, f"Started job '{name}' via systemd."
583
584    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
585        """
586        Stop a job's service.
587        """
588        job = self.get_hidden_job(name, debug=debug)
589        job.daemon._remove_stop_file()
590
591        status = self.get_job_status(name, debug=debug)
592        if status == 'paused':
593            return self.run_command(
594                ['kill', '-s', 'SIGCONT', self.get_service_name(name, debug=debug)],
595                debug=debug,
596            )
597
598        return self.run_command(
599            ['start', self.get_service_name(name, debug=debug)],
600            debug=debug,
601        )
602
603    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
604        """
605        Stop a job's service.
606        """
607        job = self.get_hidden_job(name, debug=debug)
608        job.daemon._write_stop_file('quit')
609        sigint_success, sigint_msg = self.run_command(
610            ['kill', '-s', 'SIGINT', self.get_service_name(name, debug=debug)],
611            debug=debug,
612        )
613
614        check_timeout_interval = get_config('jobs', 'check_timeout_interval_seconds')
615        loop_start = time.perf_counter()
616        timeout_seconds = get_config('jobs', 'timeout_seconds')
617        while (time.perf_counter() - loop_start) < timeout_seconds:
618            if self.get_job_status(name, debug=debug) == 'stopped':
619                return True, 'Success'
620
621            time.sleep(check_timeout_interval)
622
623        return self.run_command(
624            ['stop', self.get_service_name(name, debug=debug)],
625            debug=debug,
626        )
627
628    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
629        """
630        Pause a job's service.
631        """
632        job = self.get_hidden_job(name, debug=debug)
633        job.daemon._write_stop_file('pause')
634        return self.run_command(
635            ['kill', '-s', 'SIGSTOP', self.get_service_name(name, debug=debug)],
636            debug=debug,
637        )
638
639    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
640        """
641        Delete a job's service.
642        """
643        from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
644        job = self.get_hidden_job(name, debug=debug)
645
646        if not job.delete_after_completion:
647            _ = self.stop_job(name, debug=debug)
648            _ = self.run_command(
649                ['disable', self.get_service_name(name, debug=debug)],
650                debug=debug,
651            )
652
653        service_job_path = self.get_service_job_path(name, debug=debug)
654        try:
655            if service_job_path.exists():
656                shutil.rmtree(service_job_path)
657        except Exception as e:
658            warn(e)
659            return False, str(e)
660
661        service_logs_path = self.get_service_logs_path(name, debug=debug)
662        logs_paths = [
663            (SYSTEMD_LOGS_RESOURCES_PATH / name)
664            for name in os.listdir(SYSTEMD_LOGS_RESOURCES_PATH)
665            if name.startswith(service_logs_path.name + '.')
666        ]
667        paths = [
668            self.get_service_file_path(name, debug=debug),
669            self.get_service_symlink_file_path(name, debug=debug),
670            self.get_socket_path(name, debug=debug),
671            self.get_result_path(name, debug=debug),
672        ] + logs_paths
673
674        for path in paths:
675            if path.exists():
676                try:
677                    path.unlink()
678                except Exception as e:
679                    warn(e)
680                    return False, str(e)
681
682        _ = job.delete()
683
684        return self.run_command(['daemon-reload'], debug=debug)
685
686    def get_logs(self, name: str, debug: bool = False) -> str:
687        """
688        Return a job's journal logs.
689        """
690        rotating_file = self.get_job_rotating_file(name, debug=debug)
691        return rotating_file.read()
692
693    def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
694        """
695        Return a job's stop time.
696        """
697        job = self.get_hidden_job(name, debug=debug)
698        return job.stop_time
699
700    def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
701        """
702        Return whether a job is blocking on stdin.
703        """
704        socket_path = self.get_socket_path(name, debug=debug)
705        blocking_path = socket_path.parent / (socket_path.name + '.block')
706        return blocking_path.exists()
707
708    def get_job_rotating_file(self, name: str, debug: bool = False):
709        """
710        Return a `RotatingFile` for the job's log output.
711        """
712        from meerschaum.utils.daemon import RotatingFile
713        service_logs_path = self.get_service_logs_path(name, debug=debug)
714        return RotatingFile(service_logs_path)
715
716    async def monitor_logs_async(
717        self,
718        name: str,
719        *args,
720        debug: bool = False,
721        **kwargs
722    ):
723        """
724        Monitor a job's output.
725        """
726        from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
727        job = self.get_hidden_job(name, debug=debug)
728        kwargs.update({
729            '_logs_path': SYSTEMD_LOGS_RESOURCES_PATH,
730            '_log': self.get_job_rotating_file(name, debug=debug),
731            '_stdin_file': self.get_job_stdin_file(name, debug=debug),
732            'debug': debug,
733        })
734        await job.monitor_logs_async(*args, **kwargs)
735
736    def monitor_logs(self, *args, **kwargs):
737        """
738        Monitor a job's output.
739        """
740        asyncio.run(self.monitor_logs_async(*args, **kwargs))
JOB_METADATA_CACHE_SECONDS: int = 5
@make_executor
class SystemdExecutor(meerschaum.jobs._Executor.Executor):
 32@make_executor
 33class SystemdExecutor(Executor):
 34    """
 35    Execute Meerschaum jobs via `systemd`.
 36    """
 37
 38    def get_job_names(self, debug: bool = False) -> List[str]:
 39        """
 40        Return a list of existing jobs, including hidden ones.
 41        """
 42        from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH
 43        return [
 44            service_name[len('mrsm-'):(-1 * len('.service'))]
 45            for service_name in os.listdir(SYSTEMD_USER_RESOURCES_PATH)
 46            if (
 47                service_name.startswith('mrsm-')
 48                and service_name.endswith('.service')
 49                ### Check for broken symlinks.
 50                and (SYSTEMD_USER_RESOURCES_PATH / service_name).exists()
 51            )
 52        ]
 53
 54    def get_job_exists(self, name: str, debug: bool = False) -> bool:
 55        """
 56        Return whether a job exists.
 57        """
 58        user_services = self.get_job_names(debug=debug)
 59        if debug:
 60            dprint(f'Existing services: {user_services}')
 61        return name in user_services
 62
 63    def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
 64        """
 65        Return a dictionary of `systemd` Jobs (including hidden jobs).
 66        """
 67        user_services = self.get_job_names(debug=debug)
 68        jobs = {
 69            name: Job(name, executor_keys=str(self))
 70            for name in user_services
 71        }
 72        return {
 73            name: job
 74            for name, job in jobs.items()
 75        }
 76
 77    def get_service_name(self, name: str, debug: bool = False) -> str:
 78        """
 79        Return a job's service name.
 80        """
 81        return f"mrsm-{name.replace(' ', '-')}.service"
 82
 83    def get_service_job_path(self, name: str, debug: bool = False) -> pathlib.Path:
 84        """
 85        Return the path for the job's files under the root directory.
 86        """
 87        from meerschaum.config.paths import SYSTEMD_JOBS_RESOURCES_PATH
 88        return SYSTEMD_JOBS_RESOURCES_PATH / name
 89
 90    def get_service_symlink_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
 91        """
 92        Return the path to where to create the service symlink.
 93        """
 94        from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH
 95        return SYSTEMD_USER_RESOURCES_PATH / self.get_service_name(name, debug=debug)
 96
 97    def get_service_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
 98        """
 99        Return the path to a Job's service file.
100        """
101        return (
102            self.get_service_job_path(name, debug=debug)
103            / self.get_service_name(name, debug=debug)
104        )
105
106    def get_service_logs_path(self, name: str, debug: bool = False) -> pathlib.Path:
107        """
108        Return the path to direct service logs to.
109        """
110        from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
111        return SYSTEMD_LOGS_RESOURCES_PATH / (self.get_service_name(name, debug=debug) + '.log')
112
113    def get_socket_path(self, name: str, debug: bool = False) -> pathlib.Path:
114        """
115        Return the path to the FIFO file.
116        """
117        return (
118            self.get_service_job_path(name, debug=debug)
119            / (self.get_service_name(name, debug=debug) + '.stdin')
120        )
121
122    def get_result_path(self, name: str, debug: bool = False) -> pathlib.Path:
123        """
124        Return the path to the result file.
125        """
126        return (
127            self.get_service_job_path(name, debug=debug)
128            / (self.get_service_name(name, debug=debug) + '.result.json')
129        )
130
131    def get_service_file_text(self, name: str, sysargs: List[str], debug: bool = False) -> str:
132        """
133        Return the contents of the unit file.
134        """
135        service_logs_path = self.get_service_logs_path(name, debug=debug)
136        socket_path = self.get_socket_path(name, debug=debug)
137        result_path = self.get_result_path(name, debug=debug)
138        job = self.get_hidden_job(name, debug=debug)
139
140        sysargs_str = shlex.join(sysargs)
141        exec_str = f'{sys.executable} -m meerschaum {sysargs_str}'
142        mrsm_env_var_names = set([var for var in STATIC_CONFIG['environment'].values()])
143        mrsm_env_vars = {
144            key: val
145            for key, val in os.environ.items()
146            if key in mrsm_env_var_names
147        }
148
149        ### Add new environment variables for the service process.
150        mrsm_env_vars.update({
151            STATIC_CONFIG['environment']['daemon_id']: name,
152            STATIC_CONFIG['environment']['systemd_log_path']: service_logs_path.as_posix(),
153            STATIC_CONFIG['environment']['systemd_result_path']: result_path.as_posix(),
154            STATIC_CONFIG['environment']['systemd_stdin_path']: socket_path.as_posix(),
155            STATIC_CONFIG['environment']['systemd_delete_job']: (
156                '1'
157                if job.delete_after_completion
158                else '0',
159            ),
160        })
161
162        ### Allow for user-defined environment variables.
163        mrsm_env_vars.update(job.env)
164
165        environment_lines = [
166            f"Environment={key}={shlex.quote(str(val))}"
167            for key, val in mrsm_env_vars.items()
168        ]
169        environment_str = '\n'.join(environment_lines)
170        service_name = self.get_service_name(name, debug=debug)
171
172        service_text = (
173            "[Unit]\n"
174            f"Description=Run the job '{name}'\n"
175            "\n"
176            "[Service]\n"
177            f"ExecStart={exec_str}\n"
178            "KillSignal=SIGTERM\n"
179            "Restart=always\n"
180            "RestartPreventExitStatus=0\n"
181            f"SyslogIdentifier={service_name}\n"
182            f"{environment_str}\n"
183            "\n"
184            "[Install]\n"
185            "WantedBy=default.target\n"
186        )
187        return service_text
188
189    def get_socket_file_text(self, name: str, debug: bool = False) -> str:
190        """
191        Return the contents of the socket file.
192        """
193        service_name = self.get_service_name(name, debug=debug)
194        socket_path = self.get_socket_path(name, debug=debug)
195        socket_text = (
196            "[Unit]\n"
197            f"BindsTo={service_name}\n"
198            "\n"
199            "[Socket]\n"
200            f"ListenFIFO={socket_path.as_posix()}\n"
201            "FileDescriptorName=stdin\n"
202            "RemoveOnStop=true\n"
203            "SocketMode=0660\n"
204        )
205        return socket_text
206
207    def get_hidden_job(
208        self,
209        name: str,
210        sysargs: Optional[List[str]] = None,
211        properties: Optional[Dict[str, Any]] = None,
212        debug: bool = False,
213    ):
214        """
215        Return the hidden "sister" job to store a job's parameters.
216        """
217        job = Job(
218            name,
219            sysargs,
220            executor_keys='local',
221            _properties=properties,
222            _rotating_log=self.get_job_rotating_file(name, debug=debug),
223            _stdin_file=self.get_job_stdin_file(name, debug=debug),
224            _status_hook=partial(self.get_job_status, name),
225            _result_hook=partial(self.get_job_result, name),
226            _externally_managed=True,
227        )
228        return job
229
230
231    def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
232        """
233        Return metadata about a job.
234        """
235        now = time.perf_counter()
236
237        if '_jobs_metadata' not in self.__dict__:
238            self._jobs_metadata: Dict[str, Any] = {}
239
240        if name in self._jobs_metadata:
241            ts = self._jobs_metadata[name].get('timestamp', None)
242
243            if ts is not None and (now - ts) <= JOB_METADATA_CACHE_SECONDS:
244                if debug:
245                    dprint(f"Retuning cached metadata for job '{name}'.")
246                return self._jobs_metadata[name]['metadata']
247
248        metadata = {
249            'sysargs': self.get_job_sysargs(name, debug=debug),
250            'result': self.get_job_result(name, debug=debug),
251            'restart': self.get_job_restart(name, debug=debug),
252            'daemon': {
253                'status': self.get_job_status(name, debug=debug),
254                'pid': self.get_job_pid(name, debug=debug),
255                'properties': self.get_job_properties(name, debug=debug),
256            },
257        }
258        self._jobs_metadata[name] = {
259            'timestamp': now,
260            'metadata': metadata,
261        }
262        return metadata
263
264    def get_job_restart(self, name: str, debug: bool = False) -> bool:
265        """
266        Return whether a job restarts.
267        """
268        from meerschaum.jobs._Job import RESTART_FLAGS
269        sysargs = self.get_job_sysargs(name, debug=debug)
270        if not sysargs:
271            return False
272
273        for flag in RESTART_FLAGS:
274            if flag in sysargs:
275                return True
276
277        return False
278
279    def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
280        """
281        Return the properties for a job.
282        """
283        job = self.get_hidden_job(name, debug=debug)
284        return {
285            k: v for k, v in job.daemon.properties.items()
286            if k != 'externally_managed'
287        }
288
289    def get_job_process(self, name: str, debug: bool = False):
290        """
291        Return a `psutil.Process` for the job's PID.
292        """
293        pid = self.get_job_pid(name, debug=debug)
294        if pid is None:
295            return None
296
297        psutil = mrsm.attempt_import('psutil')
298        try:
299            return psutil.Process(pid)
300        except Exception:
301            return None
302
303    def get_job_status(self, name: str, debug: bool = False) -> str:
304        """
305        Return the job's service status.
306        """
307        output = self.run_command(
308            ['is-active', self.get_service_name(name, debug=debug)],
309            as_output=True,
310            debug=debug,
311        )
312
313        if output == 'activating':
314            return 'running'
315
316        if output == 'active':
317            process = self.get_job_process(name, debug=debug)
318            if process is None:
319                return 'stopped'
320
321            try:
322                if process.status() == 'stopped':
323                    return 'paused'
324            except Exception:
325                return 'stopped'
326
327            return 'running'
328
329        return 'stopped'
330
331    def get_job_pid(self, name: str, debug: bool = False) -> Union[int, None]:
332        """
333        Return the job's service PID.
334        """
335        from meerschaum.utils.misc import is_int
336
337        output = self.run_command(
338            [
339                'show',
340                self.get_service_name(name, debug=debug),
341                '--property=MainPID',
342            ],
343            as_output=True,
344            debug=debug,
345        )
346        if not output.startswith('MainPID='):
347            return None
348
349        pid_str = output[len('MainPID='):]
350        if pid_str == '0':
351            return None
352
353        if is_int(pid_str):
354            return int(pid_str)
355        
356        return None
357
358    def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
359        """
360        Return when a job began running.
361        """
362        output = self.run_command(
363            [
364                'show',
365                self.get_service_name(name, debug=debug),
366                '--property=ActiveEnterTimestamp'
367            ],
368            as_output=True,
369            debug=debug,
370        )
371        if not output.startswith('ActiveEnterTimestamp'):
372            return None
373
374        dt_str = output.split('=')[-1]
375        if not dt_str:
376            return None
377
378        dateutil_parser = mrsm.attempt_import('dateutil.parser')
379        try:
380            dt = dateutil_parser.parse(dt_str)
381        except Exception as e:
382            warn(f"Cannot parse '{output}' as a datetime:\n{e}")
383            return None
384
385        return dt.astimezone(timezone.utc).isoformat()
386
387    def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
388        """
389        Return when a job ended (stopped running).
390        """
391        output = self.run_command(
392            [
393                'show',
394                self.get_service_name(name, debug=debug),
395                '--property=InactiveEnterTimestamp'
396            ],
397            as_output=True,
398            debug=debug,
399        )
400        if not output.startswith('InactiveEnterTimestamp'):
401            return None
402
403        dt_str = output.split('=')[-1]
404        if not dt_str:
405            return None
406
407        dateutil_parser = mrsm.attempt_import('dateutil.parser')
408
409        try:
410            dt = dateutil_parser.parse(dt_str)
411        except Exception as e:
412            warn(f"Cannot parse '{output}' as a datetime:\n{e}")
413            return None
414        return dt.astimezone(timezone.utc).isoformat()
415
416    def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
417        """
418        Return when a job was paused.
419        """
420        job = self.get_hidden_job(name, debug=debug)
421        if self.get_job_status(name, debug=debug) != 'paused':
422            return None
423
424        stop_time = job.stop_time
425        if stop_time is None:
426            return None
427
428        return stop_time.isoformat()
429
430    def get_job_result(self, name: str, debug: bool = False) -> SuccessTuple:
431        """
432        Return the job's result SuccessTuple.
433        """
434        result_path = self.get_result_path(name, debug=debug)
435        if not result_path.exists():
436            return False, "No result available."
437
438        try:
439            with open(result_path, 'r', encoding='utf-8') as f:
440                result = json.load(f)
441        except Exception:
442            return False, f"Could not read result for Job '{name}'."
443
444        return tuple(result)
445
446    def get_job_sysargs(self, name: str, debug: bool = False) -> Union[List[str], None]:
447        """
448        Return the sysargs from the service file.
449        """
450        service_file_path = self.get_service_file_path(name, debug=debug)
451        if not service_file_path.exists():
452            return []
453
454        with open(service_file_path, 'r', encoding='utf-8') as f:
455            service_lines = f.readlines()
456
457        for line in service_lines:
458            if line.startswith('ExecStart='):
459                sysargs_str = line.split(' -m meerschaum ')[-1].split('<')[0]
460                return shlex.split(sysargs_str)
461
462        return []
463
464    def run_command(
465        self,
466        command_args: List[str],
467        as_output: bool = False,
468        debug: bool = False,
469    ) -> Union[SuccessTuple, str]:
470        """
471        Run a `systemd` command and return success.
472
473        Parameters
474        ----------
475        command_args: List[str]
476            The command to pass to `systemctl --user`.
477
478        as_output: bool, default False
479            If `True`, return the process stdout output.
480            Defaults to a `SuccessTuple`.
481
482        Returns
483        -------
484        A `SuccessTuple` indicating success or a str for the process output.
485        """
486        from meerschaum.utils.process import run_process
487
488        if command_args[:2] != ['systemctl', '--user']:
489            command_args = ['systemctl', '--user'] + command_args
490
491        if debug:
492            dprint(shlex.join(command_args))
493
494        proc = run_process(
495            command_args,
496            foreground=False,
497            as_proc=True,
498            capture_output=True,
499            text=True,
500        )
501        stdout, stderr = proc.communicate()
502        if debug:
503            dprint(f"{stdout}")
504
505        if as_output:
506            return stdout.strip()
507            
508        command_success = proc.wait() == 0
509        command_msg = (
510            "Success"
511            if command_success
512            else f"Failed to execute command `{shlex.join(command_args)}`."
513        )
514        return command_success, command_msg
515
516    def get_job_stdin_file(self, name: str, debug: bool = False):
517        """
518        Return a `StdinFile` for the job.
519        """
520        from meerschaum.utils.daemon import StdinFile
521        if '_stdin_files' not in self.__dict__:
522            self._stdin_files: Dict[str, StdinFile] = {}
523
524        if name not in self._stdin_files:
525            socket_path = self.get_socket_path(name, debug=debug)
526            socket_path.parent.mkdir(parents=True, exist_ok=True)
527            self._stdin_files[name] = StdinFile(socket_path)
528
529        return self._stdin_files[name]
530
531    def create_job(
532        self,
533        name: str,
534        sysargs: List[str],
535        properties: Optional[Dict[str, Any]] = None,
536        debug: bool = False,
537    ) -> SuccessTuple:
538        """
539        Create a job as a service to be run by `systemd`.
540        """
541        from meerschaum.utils.misc import make_symlink
542        service_name = self.get_service_name(name, debug=debug)
543        service_file_path = self.get_service_file_path(name, debug=debug)
544        service_symlink_file_path = self.get_service_symlink_file_path(name, debug=debug)
545        socket_stdin = self.get_job_stdin_file(name, debug=debug)
546        _ = socket_stdin.file_handler
547
548        ### Init the externally_managed file.
549        ### NOTE: We must write the pickle file in addition to the properties file.
550        job = self.get_hidden_job(name, sysargs=sysargs, properties=properties, debug=debug)
551        job._set_externally_managed()
552        pickle_success, pickle_msg = job.daemon.write_pickle()
553        if not pickle_success:
554            return pickle_success, pickle_msg
555        properties_success, properties_msg = job.daemon.write_properties()
556        if not properties_success:
557            return properties_success, properties_msg
558
559        service_file_path.parent.mkdir(parents=True, exist_ok=True)
560
561        with open(service_file_path, 'w+', encoding='utf-8') as f:
562            f.write(self.get_service_file_text(name, sysargs, debug=debug))
563
564        symlink_success, symlink_msg = make_symlink(service_file_path, service_symlink_file_path)
565        if not symlink_success:
566            return symlink_success, symlink_msg
567
568        commands = [
569            ['daemon-reload'],
570            ['enable', service_name],
571            ['start', service_name],
572        ]
573
574        fails = 0
575        for command_list in commands:
576            command_success, command_msg = self.run_command(command_list, debug=debug)
577            if not command_success:
578                fails += 1
579
580        if fails > 1:
581            return False, "Failed to reload systemd."
582
583        return True, f"Started job '{name}' via systemd."
584
585    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
586        """
587        Start (or resume) a job's service.
588        """
589        job = self.get_hidden_job(name, debug=debug)
590        job.daemon._remove_stop_file()
591
592        status = self.get_job_status(name, debug=debug)
593        if status == 'paused':
594            return self.run_command(
595                ['kill', '-s', 'SIGCONT', self.get_service_name(name, debug=debug)],
596                debug=debug,
597            )
598
599        return self.run_command(
600            ['start', self.get_service_name(name, debug=debug)],
601            debug=debug,
602        )
603
604    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
605        """
606        Stop a job's service.
607        """
608        job = self.get_hidden_job(name, debug=debug)
609        job.daemon._write_stop_file('quit')
610        sigint_success, sigint_msg = self.run_command(
611            ['kill', '-s', 'SIGINT', self.get_service_name(name, debug=debug)],
612            debug=debug,
613        )
614
615        check_timeout_interval = get_config('jobs', 'check_timeout_interval_seconds')
616        loop_start = time.perf_counter()
617        timeout_seconds = get_config('jobs', 'timeout_seconds')
618        while (time.perf_counter() - loop_start) < timeout_seconds:
619            if self.get_job_status(name, debug=debug) == 'stopped':
620                return True, 'Success'
621
622            time.sleep(check_timeout_interval)
623
624        return self.run_command(
625            ['stop', self.get_service_name(name, debug=debug)],
626            debug=debug,
627        )
628
629    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
630        """
631        Pause a job's service.
632        """
633        job = self.get_hidden_job(name, debug=debug)
634        job.daemon._write_stop_file('pause')
635        return self.run_command(
636            ['kill', '-s', 'SIGSTOP', self.get_service_name(name, debug=debug)],
637            debug=debug,
638        )
639
640    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
641        """
642        Delete a job's service.
643        """
644        from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
645        job = self.get_hidden_job(name, debug=debug)
646
647        if not job.delete_after_completion:
648            _ = self.stop_job(name, debug=debug)
649            _ = self.run_command(
650                ['disable', self.get_service_name(name, debug=debug)],
651                debug=debug,
652            )
653
654        service_job_path = self.get_service_job_path(name, debug=debug)
655        try:
656            if service_job_path.exists():
657                shutil.rmtree(service_job_path)
658        except Exception as e:
659            warn(e)
660            return False, str(e)
661
662        service_logs_path = self.get_service_logs_path(name, debug=debug)
663        logs_paths = [
664            (SYSTEMD_LOGS_RESOURCES_PATH / name)
665            for name in os.listdir(SYSTEMD_LOGS_RESOURCES_PATH)
666            if name.startswith(service_logs_path.name + '.')
667        ]
668        paths = [
669            self.get_service_file_path(name, debug=debug),
670            self.get_service_symlink_file_path(name, debug=debug),
671            self.get_socket_path(name, debug=debug),
672            self.get_result_path(name, debug=debug),
673        ] + logs_paths
674
675        for path in paths:
676            if path.exists():
677                try:
678                    path.unlink()
679                except Exception as e:
680                    warn(e)
681                    return False, str(e)
682
683        _ = job.delete()
684
685        return self.run_command(['daemon-reload'], debug=debug)
686
687    def get_logs(self, name: str, debug: bool = False) -> str:
688        """
689        Return a job's log output (read from its rotating log file).
690        """
691        rotating_file = self.get_job_rotating_file(name, debug=debug)
692        return rotating_file.read()
693
694    def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
695        """
696        Return a job's stop time.
697        """
698        job = self.get_hidden_job(name, debug=debug)
699        return job.stop_time
700
701    def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
702        """
703        Return whether a job is blocking on stdin.
704        """
705        socket_path = self.get_socket_path(name, debug=debug)
706        blocking_path = socket_path.parent / (socket_path.name + '.block')
707        return blocking_path.exists()
708
709    def get_job_rotating_file(self, name: str, debug: bool = False):
710        """
711        Return a `RotatingFile` for the job's log output.
712        """
713        from meerschaum.utils.daemon import RotatingFile
714        service_logs_path = self.get_service_logs_path(name, debug=debug)
715        return RotatingFile(service_logs_path)
716
717    async def monitor_logs_async(
718        self,
719        name: str,
720        *args,
721        debug: bool = False,
722        **kwargs
723    ):
724        """
725        Monitor a job's output.
726        """
727        from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
728        job = self.get_hidden_job(name, debug=debug)
729        kwargs.update({
730            '_logs_path': SYSTEMD_LOGS_RESOURCES_PATH,
731            '_log': self.get_job_rotating_file(name, debug=debug),
732            '_stdin_file': self.get_job_stdin_file(name, debug=debug),
733            'debug': debug,
734        })
735        await job.monitor_logs_async(*args, **kwargs)
736
737    def monitor_logs(self, *args, **kwargs):
738        """
739        Monitor a job's output.
740        """
741        asyncio.run(self.monitor_logs_async(*args, **kwargs))

Execute Meerschaum jobs via systemd.

def get_job_names(self, debug: bool = False) -> List[str]:
38    def get_job_names(self, debug: bool = False) -> List[str]:
39        """
40        Return a list of existing jobs, including hidden ones.
41        """
42        from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH
43        return [
44            service_name[len('mrsm-'):(-1 * len('.service'))]
45            for service_name in os.listdir(SYSTEMD_USER_RESOURCES_PATH)
46            if (
47                service_name.startswith('mrsm-')
48                and service_name.endswith('.service')
49                ### Check for broken symlinks.
50                and (SYSTEMD_USER_RESOURCES_PATH / service_name).exists()
51            )
52        ]

Return a list of existing jobs, including hidden ones.

def get_job_exists(self, name: str, debug: bool = False) -> bool:
54    def get_job_exists(self, name: str, debug: bool = False) -> bool:
55        """
56        Return whether a job exists.
57        """
58        user_services = self.get_job_names(debug=debug)
59        if debug:
60            dprint(f'Existing services: {user_services}')
61        return name in user_services

Return whether a job exists.

def get_jobs(self, debug: bool = False) -> Dict[str, meerschaum.Job]:
63    def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
64        """
65        Return a dictionary of `systemd` Jobs (including hidden jobs).
66        """
67        user_services = self.get_job_names(debug=debug)
68        jobs = {
69            name: Job(name, executor_keys=str(self))
70            for name in user_services
71        }
72        return {
73            name: job
74            for name, job in jobs.items()
75        }

Return a dictionary of systemd Jobs (including hidden jobs).

def get_service_name(self, name: str, debug: bool = False) -> str:
77    def get_service_name(self, name: str, debug: bool = False) -> str:
78        """
79        Return a job's service name.
80        """
81        return f"mrsm-{name.replace(' ', '-')}.service"

Return a job's service name.

def get_service_job_path(self, name: str, debug: bool = False) -> pathlib.Path:
83    def get_service_job_path(self, name: str, debug: bool = False) -> pathlib.Path:
84        """
85        Return the path for the job's files under the root directory.
86        """
87        from meerschaum.config.paths import SYSTEMD_JOBS_RESOURCES_PATH
88        return SYSTEMD_JOBS_RESOURCES_PATH / name

Return the path for the job's files under the root directory.

def get_service_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
 97    def get_service_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
 98        """
 99        Return the path to a Job's service file.
100        """
101        return (
102            self.get_service_job_path(name, debug=debug)
103            / self.get_service_name(name, debug=debug)
104        )

Return the path to a Job's service file.

def get_service_logs_path(self, name: str, debug: bool = False) -> pathlib.Path:
106    def get_service_logs_path(self, name: str, debug: bool = False) -> pathlib.Path:
107        """
108        Return the path to direct service logs to.
109        """
110        from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
111        return SYSTEMD_LOGS_RESOURCES_PATH / (self.get_service_name(name, debug=debug) + '.log')

Return the path to direct service logs to.

def get_socket_path(self, name: str, debug: bool = False) -> pathlib.Path:
113    def get_socket_path(self, name: str, debug: bool = False) -> pathlib.Path:
114        """
115        Return the path to the FIFO file.
116        """
117        return (
118            self.get_service_job_path(name, debug=debug)
119            / (self.get_service_name(name, debug=debug) + '.stdin')
120        )

Return the path to the FIFO file.

def get_result_path(self, name: str, debug: bool = False) -> pathlib.Path:
122    def get_result_path(self, name: str, debug: bool = False) -> pathlib.Path:
123        """
124        Return the path to the result file.
125        """
126        return (
127            self.get_service_job_path(name, debug=debug)
128            / (self.get_service_name(name, debug=debug) + '.result.json')
129        )

Return the path to the result file.

def get_service_file_text(self, name: str, sysargs: List[str], debug: bool = False) -> str:
131    def get_service_file_text(self, name: str, sysargs: List[str], debug: bool = False) -> str:
132        """
133        Return the contents of the unit file.
134        """
135        service_logs_path = self.get_service_logs_path(name, debug=debug)
136        socket_path = self.get_socket_path(name, debug=debug)
137        result_path = self.get_result_path(name, debug=debug)
138        job = self.get_hidden_job(name, debug=debug)
139
140        sysargs_str = shlex.join(sysargs)
141        exec_str = f'{sys.executable} -m meerschaum {sysargs_str}'
142        mrsm_env_var_names = set([var for var in STATIC_CONFIG['environment'].values()])
143        mrsm_env_vars = {
144            key: val
145            for key, val in os.environ.items()
146            if key in mrsm_env_var_names
147        }
148
149        ### Add new environment variables for the service process.
150        mrsm_env_vars.update({
151            STATIC_CONFIG['environment']['daemon_id']: name,
152            STATIC_CONFIG['environment']['systemd_log_path']: service_logs_path.as_posix(),
153            STATIC_CONFIG['environment']['systemd_result_path']: result_path.as_posix(),
154            STATIC_CONFIG['environment']['systemd_stdin_path']: socket_path.as_posix(),
155            STATIC_CONFIG['environment']['systemd_delete_job']: (
156                '1'
157                if job.delete_after_completion
158                else '0',
159            ),
160        })
161
162        ### Allow for user-defined environment variables.
163        mrsm_env_vars.update(job.env)
164
165        environment_lines = [
166            f"Environment={key}={shlex.quote(str(val))}"
167            for key, val in mrsm_env_vars.items()
168        ]
169        environment_str = '\n'.join(environment_lines)
170        service_name = self.get_service_name(name, debug=debug)
171
172        service_text = (
173            "[Unit]\n"
174            f"Description=Run the job '{name}'\n"
175            "\n"
176            "[Service]\n"
177            f"ExecStart={exec_str}\n"
178            "KillSignal=SIGTERM\n"
179            "Restart=always\n"
180            "RestartPreventExitStatus=0\n"
181            f"SyslogIdentifier={service_name}\n"
182            f"{environment_str}\n"
183            "\n"
184            "[Install]\n"
185            "WantedBy=default.target\n"
186        )
187        return service_text

Return the contents of the unit file.

def get_socket_file_text(self, name: str, debug: bool = False) -> str:
    """
    Build the `systemd` socket unit text which exposes the job's stdin FIFO.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Passed through to the service-name and socket-path helpers.

    Returns
    -------
    The socket unit file contents, newline-terminated.
    """
    bound_service = self.get_service_name(name, debug=debug)
    fifo_path = self.get_socket_path(name, debug=debug).as_posix()
    unit_lines = [
        "[Unit]",
        f"BindsTo={bound_service}",
        "",
        "[Socket]",
        f"ListenFIFO={fifo_path}",
        "FileDescriptorName=stdin",
        "RemoveOnStop=true",
        "SocketMode=0660",
    ]
    return '\n'.join(unit_lines) + '\n'

Return the contents of the socket file.

def get_hidden_job(
    self,
    name: str,
    sysargs: Optional[List[str]] = None,
    properties: Optional[Dict[str, Any]] = None,
    debug: bool = False,
):
    """
    Build the hidden "sister" `Job` which stores this job's parameters.

    The hidden job uses the `local` executor keys and is flagged as externally managed.
    Its log file, stdin file, status hook, and result hook all point back at this executor.

    Parameters
    ----------
    name: str
        The job's name.

    sysargs: Optional[List[str]], default None
        The job's command-line arguments, if known.

    properties: Optional[Dict[str, Any]], default None
        Extra properties to attach to the hidden job.

    debug: bool, default False
        Passed through to the file helpers.

    Returns
    -------
    A `Job` instance.
    """
    rotating_log = self.get_job_rotating_file(name, debug=debug)
    stdin_file = self.get_job_stdin_file(name, debug=debug)
    status_hook = partial(self.get_job_status, name)
    result_hook = partial(self.get_job_result, name)
    return Job(
        name,
        sysargs,
        executor_keys='local',
        _properties=properties,
        _rotating_log=rotating_log,
        _stdin_file=stdin_file,
        _status_hook=status_hook,
        _result_hook=result_hook,
        _externally_managed=True,
    )

Return the hidden "sister" job to store a job's parameters.

def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
    """
    Return metadata about a job, cached per job for `JOB_METADATA_CACHE_SECONDS`.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Print a debug message when a cached entry is returned.

    Returns
    -------
    A dictionary with the keys `sysargs`, `result`, `restart`, and `daemon`
    (the latter holding `status`, `pid`, and `properties`).
    """
    ### `perf_counter` is only used for measuring elapsed time against the cache TTL.
    now = time.perf_counter()

    ### Lazily create the per-executor cache on first use.
    if '_jobs_metadata' not in self.__dict__:
        self._jobs_metadata: Dict[str, Any] = {}

    cached = self._jobs_metadata.get(name, None)
    if cached is not None:
        ts = cached.get('timestamp', None)
        if ts is not None and (now - ts) <= JOB_METADATA_CACHE_SECONDS:
            if debug:
                dprint(f"Returning cached metadata for job '{name}'.")
            return cached['metadata']

    metadata = {
        'sysargs': self.get_job_sysargs(name, debug=debug),
        'result': self.get_job_result(name, debug=debug),
        'restart': self.get_job_restart(name, debug=debug),
        'daemon': {
            'status': self.get_job_status(name, debug=debug),
            'pid': self.get_job_pid(name, debug=debug),
            'properties': self.get_job_properties(name, debug=debug),
        },
    }
    self._jobs_metadata[name] = {
        'timestamp': now,
        'metadata': metadata,
    }
    return metadata

Return metadata about a job.

def get_job_restart(self, name: str, debug: bool = False) -> bool:
    """
    Return whether the job's sysargs contain any of the restart flags.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Passed through to `get_job_sysargs`.

    Returns
    -------
    `True` if any flag from `RESTART_FLAGS` appears in the sysargs, else `False`
    (including when there are no sysargs).
    """
    from meerschaum.jobs._Job import RESTART_FLAGS
    sysargs = self.get_job_sysargs(name, debug=debug)
    return bool(sysargs) and any(flag in sysargs for flag in RESTART_FLAGS)

Return whether a job restarts.

def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
    """
    Return the hidden job's daemon properties without the internal
    `externally_managed` key.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Passed through to `get_hidden_job`.

    Returns
    -------
    A new dictionary. The daemon's own properties dictionary is left untouched.
    """
    hidden_job = self.get_hidden_job(name, debug=debug)
    visible_properties = dict(hidden_job.daemon.properties)
    visible_properties.pop('externally_managed', None)
    return visible_properties

Return the properties for a job.

def get_job_process(self, name: str, debug: bool = False):
    """
    Return a `psutil.Process` for the job's main PID.

    Returns `None` when the job has no PID or `psutil.Process` raises.
    """
    pid = self.get_job_pid(name, debug=debug)
    if pid is None:
        return None

    psutil = mrsm.attempt_import('psutil')
    try:
        process = psutil.Process(pid)
    except Exception:
        ### Presumably the process may have exited since the PID was read.
        process = None
    return process

Return a psutil.Process for the job's PID.

def get_job_status(self, name: str, debug: bool = False) -> str:
    """
    Map the service's `systemctl is-active` state onto a job status.

    Returns
    -------
    - `'running'` when the unit is `activating`, or `active` with a process that is not stopped.
    - `'paused'` when the unit is `active` but its process reports `stopped` (as after `SIGSTOP`).
    - `'stopped'` in every other case, including when the process cannot be inspected.
    """
    active_state = self.run_command(
        ['is-active', self.get_service_name(name, debug=debug)],
        as_output=True,
        debug=debug,
    )
    if active_state == 'activating':
        return 'running'
    if active_state != 'active':
        return 'stopped'

    process = self.get_job_process(name, debug=debug)
    if process is None:
        return 'stopped'

    try:
        process_state = process.status()
    except Exception:
        return 'stopped'
    return 'paused' if process_state == 'stopped' else 'running'

Return the job's service status.

def get_job_pid(self, name: str, debug: bool = False) -> Union[int, None]:
    """
    Return the service's `MainPID` from `systemctl show`.

    Returns
    -------
    The PID as an `int`, or `None` when the property is missing, is `0`,
    or is not an integer.
    """
    from meerschaum.utils.misc import is_int

    prefix = 'MainPID='
    output = self.run_command(
        ['show', self.get_service_name(name, debug=debug), '--property=MainPID'],
        as_output=True,
        debug=debug,
    )
    if not output.startswith(prefix):
        return None

    pid_str = output[len(prefix):]
    ### A `MainPID` of 0 means the unit currently has no main process.
    if pid_str == '0' or not is_int(pid_str):
        return None
    return int(pid_str)

Return the job's service PID.

def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return when the job's service last became active, as a UTC ISO-8601 string.

    Returns `None` if `ActiveEnterTimestamp` is missing or empty, or if it cannot be parsed.
    A parse failure also emits a warning.
    """
    prop = 'ActiveEnterTimestamp'
    output = self.run_command(
        ['show', self.get_service_name(name, debug=debug), f'--property={prop}'],
        as_output=True,
        debug=debug,
    )
    if not output.startswith(prop):
        return None

    timestamp_str = output.split('=')[-1]
    if not timestamp_str:
        return None

    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    try:
        began = dateutil_parser.parse(timestamp_str)
    except Exception as e:
        warn(f"Cannot parse '{output}' as a datetime:\n{e}")
        return None

    return began.astimezone(timezone.utc).isoformat()

Return when a job began running.

def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return when the job's service last became inactive (i.e. when the job stopped),
    as a UTC ISO-8601 string.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Passed through to `run_command`.

    Returns
    -------
    The `InactiveEnterTimestamp` converted to UTC and formatted with `isoformat()`.
    Returns `None` if the property is missing or empty, or if it cannot be parsed.
    A parse failure also emits a warning.
    """
    prop = 'InactiveEnterTimestamp'
    output = self.run_command(
        ['show', self.get_service_name(name, debug=debug), f'--property={prop}'],
        as_output=True,
        debug=debug,
    )
    if not output.startswith(prop):
        return None

    dt_str = output.split('=')[-1]
    if not dt_str:
        return None

    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    try:
        dt = dateutil_parser.parse(dt_str)
    except Exception as e:
        warn(f"Cannot parse '{output}' as a datetime:\n{e}")
        return None

    return dt.astimezone(timezone.utc).isoformat()

Return when a job stopped running.

def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return when a job was paused, as an ISO-8601 string.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Passed through to the status and hidden-job helpers.

    Returns
    -------
    The hidden job's `stop_time` in ISO-8601 format. Returns `None` if the job
    is not currently paused or has no recorded stop time.
    """
    ### Check the status first so the hidden `Job` is only built when it is needed.
    if self.get_job_status(name, debug=debug) != 'paused':
        return None

    stop_time = self.get_hidden_job(name, debug=debug).stop_time
    if stop_time is None:
        return None

    return stop_time.isoformat()

Return when a job was paused.

def get_job_result(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Read the job's result `SuccessTuple` from its JSON result file.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Passed through to `get_result_path`.

    Returns
    -------
    The decoded JSON sequence converted to a tuple. Returns `(False, <message>)`
    when the file is missing or cannot be read or decoded.
    """
    result_path = self.get_result_path(name, debug=debug)
    if not result_path.exists():
        return False, "No result available."

    try:
        decoded = json.loads(result_path.read_text(encoding='utf-8'))
    except Exception:
        return False, f"Could not read result for Job '{name}'."

    return tuple(decoded)

Return the job's result SuccessTuple.

def get_job_sysargs(self, name: str, debug: bool = False) -> Union[List[str], None]:
    """
    Parse the job's sysargs back out of the `ExecStart=` line of its service file.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Passed through to `get_service_file_path`.

    Returns
    -------
    The list of arguments following `-m meerschaum`. Returns an empty list if the
    service file, its `ExecStart=` line, or the `-m meerschaum` marker is missing.
    """
    service_file_path = self.get_service_file_path(name, debug=debug)
    if not service_file_path.exists():
        return []

    with open(service_file_path, 'r', encoding='utf-8') as f:
        service_lines = f.readlines()

    for line in service_lines:
        if not line.startswith('ExecStart='):
            continue

        ### Use the first marker so a quoted argument containing it cannot confuse parsing.
        _, sep, sysargs_str = line.partition(' -m meerschaum ')
        if not sep:
            return []

        ### Tokenize before looking for `<` so a `<` inside a quoted argument
        ### (e.g. a SQL comparison) stays intact. Only a bare `<` token ends the
        ### arguments (presumably a legacy stdin redirect; TODO confirm).
        sysargs = shlex.split(sysargs_str)
        if '<' in sysargs:
            sysargs = sysargs[:sysargs.index('<')]
        return sysargs

    return []

Return the sysargs from the service file.

def run_command(
    self,
    command_args: List[str],
    as_output: bool = False,
    debug: bool = False,
) -> Union[SuccessTuple, str]:
    """
    Run a `systemd` command and return success.

    Parameters
    ----------
    command_args: List[str]
        The command to pass to `systemctl --user`.
        The prefix is added if it is not already present.

    as_output: bool, default False
        If `True`, return the process stdout output (stripped).
        Defaults to a `SuccessTuple`.

    debug: bool, default False
        Print the command and its output.

    Returns
    -------
    A `SuccessTuple` indicating success or a str for the process output.
    On failure, the message includes the command's stderr when any was captured.
    """
    from meerschaum.utils.process import run_process

    if command_args[:2] != ['systemctl', '--user']:
        command_args = ['systemctl', '--user'] + command_args

    if debug:
        dprint(shlex.join(command_args))

    proc = run_process(
        command_args,
        foreground=False,
        as_proc=True,
        capture_output=True,
        text=True,
    )
    stdout, stderr = proc.communicate()
    if debug:
        dprint(f"{stdout}")
        if stderr:
            dprint(f"{stderr}")

    if as_output:
        return stdout.strip()

    if proc.wait() == 0:
        return True, "Success"

    command_msg = f"Failed to execute command `{shlex.join(command_args)}`."
    ### Surface systemctl's own explanation instead of discarding it.
    stderr_text = (stderr or '').strip()
    if stderr_text:
        command_msg += f"\n{stderr_text}"
    return False, command_msg

Run a systemd command and return success.

Parameters
  • command_args (List[str]): The command to pass to systemctl --user.
  • as_output (bool, default False): If True, return the process stdout output. Defaults to a SuccessTuple.
Returns
  • A SuccessTuple indicating success or a str for the process output.
def get_job_stdin_file(self, name: str, debug: bool = False):
    """
    Return the job's `StdinFile`, creating and caching it on first access.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Passed through to `get_socket_path`.

    Returns
    -------
    The cached `StdinFile` built over the job's socket path.
    """
    from meerschaum.utils.daemon import StdinFile

    if '_stdin_files' not in self.__dict__:
        self._stdin_files: Dict[str, StdinFile] = {}

    if name in self._stdin_files:
        return self._stdin_files[name]

    socket_path = self.get_socket_path(name, debug=debug)
    ### Ensure the parent directory exists before `StdinFile` uses the path.
    socket_path.parent.mkdir(parents=True, exist_ok=True)
    stdin_file = StdinFile(socket_path)
    self._stdin_files[name] = stdin_file
    return stdin_file

Return a StdinFile for the job.

def create_job(
    self,
    name: str,
    sysargs: List[str],
    properties: Optional[Dict[str, Any]] = None,
    debug: bool = False,
) -> SuccessTuple:
    """
    Create a job as a service to be run by `systemd`.

    This method:
    - writes the hidden job's pickle and properties files;
    - writes the service unit file and symlinks it into place;
    - runs `daemon-reload`, `enable`, and `start` for the service.

    Parameters
    ----------
    name: str
        The job's name.

    sysargs: List[str]
        The Meerschaum command-line arguments the service will run.

    properties: Optional[Dict[str, Any]], default None
        Extra properties stored on the hidden job.

    debug: bool, default False
        Verbosity toggle passed through to the helpers.

    Returns
    -------
    A `SuccessTuple`. Failures from writing the job files or the symlink are
    returned immediately. Failures from the `systemctl` commands are collected
    and reported together.
    """
    from meerschaum.utils.misc import make_symlink
    service_name = self.get_service_name(name, debug=debug)
    service_file_path = self.get_service_file_path(name, debug=debug)
    service_symlink_file_path = self.get_service_symlink_file_path(name, debug=debug)
    socket_stdin = self.get_job_stdin_file(name, debug=debug)
    ### Accessed only for its side effect; presumably this opens/creates the stdin
    ### file up front. TODO confirm against `StdinFile.file_handler`.
    _ = socket_stdin.file_handler

    ### Init the externally_managed file.
    ### NOTE: We must write the pickle file in addition to the properties file.
    job = self.get_hidden_job(name, sysargs=sysargs, properties=properties, debug=debug)
    job._set_externally_managed()
    pickle_success, pickle_msg = job.daemon.write_pickle()
    if not pickle_success:
        return pickle_success, pickle_msg
    properties_success, properties_msg = job.daemon.write_properties()
    if not properties_success:
        return properties_success, properties_msg

    service_file_path.parent.mkdir(parents=True, exist_ok=True)

    with open(service_file_path, 'w+', encoding='utf-8') as f:
        f.write(self.get_service_file_text(name, sysargs, debug=debug))

    symlink_success, symlink_msg = make_symlink(service_file_path, service_symlink_file_path)
    if not symlink_success:
        return symlink_success, symlink_msg

    commands = [
        ['daemon-reload'],
        ['enable', service_name],
        ['start', service_name],
    ]

    ### Run every command even if an earlier one fails, keeping each failure message.
    failure_msgs = []
    for command_list in commands:
        command_success, command_msg = self.run_command(command_list, debug=debug)
        if not command_success:
            failure_msgs.append(command_msg)

    ### NOTE(review): as before, a single failed command is still reported as success;
    ### confirm whether that tolerance is intentional.
    if len(failure_msgs) > 1:
        return False, (
            f"Failed to set up the systemd service for job '{name}':\n"
            + '\n'.join(failure_msgs)
        )

    return True, f"Started job '{name}' via systemd."

Create a job as a service to be run by systemd.

def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Start a job's service, or resume it with `SIGCONT` if it is paused.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Passed through to the helpers.

    Returns
    -------
    The `SuccessTuple` from the `systemctl --user` command that was run.
    """
    job = self.get_hidden_job(name, debug=debug)
    ### Clear any stop file left behind by a previous stop or pause.
    job.daemon._remove_stop_file()

    status = self.get_job_status(name, debug=debug)
    if status == 'paused':
        return self.run_command(
            ['kill', '-s', 'SIGCONT', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

    return self.run_command(
        ['start', self.get_service_name(name, debug=debug)],
        debug=debug,
    )

Start a job's service (or resume it if it is paused).

def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Stop a job's service.

    A `quit` stop file is written and `SIGINT` is sent first. The status is then
    polled every `jobs:check_timeout_interval_seconds` until it reports `stopped`.
    If that does not happen within `jobs:timeout_seconds`, the method falls back
    to `systemctl --user stop`.

    Returns
    -------
    `(True, 'Success')` if the job stopped within the timeout, otherwise the
    result of the `stop` command.
    """
    hidden_job = self.get_hidden_job(name, debug=debug)
    hidden_job.daemon._write_stop_file('quit')

    ### Request a graceful shutdown first; its result is not inspected.
    _ = self.run_command(
        ['kill', '-s', 'SIGINT', self.get_service_name(name, debug=debug)],
        debug=debug,
    )

    poll_seconds = get_config('jobs', 'check_timeout_interval_seconds')
    started_at = time.perf_counter()
    timeout_seconds = get_config('jobs', 'timeout_seconds')

    def _elapsed() -> float:
        return time.perf_counter() - started_at

    while _elapsed() < timeout_seconds:
        if self.get_job_status(name, debug=debug) == 'stopped':
            return True, 'Success'
        time.sleep(poll_seconds)

    ### The graceful shutdown did not finish in time: ask systemd to stop the unit.
    return self.run_command(
        ['stop', self.get_service_name(name, debug=debug)],
        debug=debug,
    )

Stop a job's service.

def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Pause a job's service by writing a `pause` stop file and sending `SIGSTOP`.

    Returns
    -------
    The `SuccessTuple` from the `systemctl --user kill` command.
    """
    hidden_job = self.get_hidden_job(name, debug=debug)
    hidden_job.daemon._write_stop_file('pause')
    kill_args = ['kill', '-s', 'SIGSTOP', self.get_service_name(name, debug=debug)]
    return self.run_command(kill_args, debug=debug)

Pause a job's service.

def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Delete a job's service and its files.

    Unless the job is flagged `delete_after_completion`, the service is first stopped
    and disabled. Then this method removes:
    - the job's directory;
    - the unit file and its symlink;
    - the stdin socket, the result file, and the rotated log files.

    Finally it deletes the hidden job and runs `daemon-reload`.

    Returns
    -------
    `(False, <error>)` if a file or directory cannot be removed, otherwise the
    result of `daemon-reload`.
    """
    from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
    job = self.get_hidden_job(name, debug=debug)

    ### Presumably a self-deleting job is already shutting itself down, so it is
    ### not stopped or disabled here. TODO confirm.
    if not job.delete_after_completion:
        _ = self.stop_job(name, debug=debug)
        _ = self.run_command(
            ['disable', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

    service_job_path = self.get_service_job_path(name, debug=debug)
    try:
        if service_job_path.exists():
            shutil.rmtree(service_job_path)
    except Exception as e:
        warn(e)
        return False, str(e)

    ### Rotated log files are named `<logs file name>.<suffix>`.
    ### The logs directory may not exist yet.
    service_logs_path = self.get_service_logs_path(name, debug=debug)
    logs_prefix = service_logs_path.name + '.'
    try:
        log_filenames = os.listdir(SYSTEMD_LOGS_RESOURCES_PATH)
    except FileNotFoundError:
        log_filenames = []
    logs_paths = [
        SYSTEMD_LOGS_RESOURCES_PATH / filename
        for filename in log_filenames
        if filename.startswith(logs_prefix)
    ]
    paths = [
        self.get_service_file_path(name, debug=debug),
        self.get_service_symlink_file_path(name, debug=debug),
        self.get_socket_path(name, debug=debug),
        self.get_result_path(name, debug=debug),
    ] + logs_paths

    for path in paths:
        if path.exists():
            try:
                path.unlink()
            except Exception as e:
                warn(e)
                return False, str(e)

    _ = job.delete()

    return self.run_command(['daemon-reload'], debug=debug)

Delete a job's service.

def get_logs(self, name: str, debug: bool = False) -> str:
    """
    Return a job's captured output, read from its rotating log file.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Passed through to `get_job_rotating_file`.

    Returns
    -------
    The contents returned by the `RotatingFile`'s `read()`.
    """
    rotating_file = self.get_job_rotating_file(name, debug=debug)
    return rotating_file.read()

Return a job's logs, read from its rotating log file.

def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
    """
    Return the hidden job's recorded `stop_time`, or `None` if it has none.
    """
    return self.get_hidden_job(name, debug=debug).stop_time

Return a job's stop time.

def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
    """
    Return whether a job is blocking on stdin.

    This is signalled by a `<socket file name>.block` marker file placed next to
    the job's stdin socket.
    """
    stdin_socket = self.get_socket_path(name, debug=debug)
    block_marker = stdin_socket.parent / f"{stdin_socket.name}.block"
    return block_marker.exists()

Return whether a job is blocking on stdin.

def get_job_rotating_file(self, name: str, debug: bool = False):
    """
    Return a new `RotatingFile` over the job's service log path.

    A fresh instance is constructed on every call.
    """
    from meerschaum.utils.daemon import RotatingFile
    return RotatingFile(self.get_service_logs_path(name, debug=debug))

Return a RotatingFile for the job's log output.

async def monitor_logs_async(
    self,
    name: str,
    *args,
    debug: bool = False,
    **kwargs
):
    """
    Monitor a job's output asynchronously.

    Delegates to the hidden job's `monitor_logs_async`, pointing it at this
    executor's logs directory, rotating log file, and stdin file. Extra positional
    and keyword arguments are forwarded. The internal keys set here override any
    caller-supplied keys with the same names.
    """
    from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
    hidden_job = self.get_hidden_job(name, debug=debug)
    overrides = {
        '_logs_path': SYSTEMD_LOGS_RESOURCES_PATH,
        '_log': self.get_job_rotating_file(name, debug=debug),
        '_stdin_file': self.get_job_stdin_file(name, debug=debug),
        'debug': debug,
    }
    await hidden_job.monitor_logs_async(*args, **{**kwargs, **overrides})

Monitor a job's output.

def monitor_logs(self, *args, **kwargs):
    """
    Synchronous wrapper which runs `monitor_logs_async` to completion via `asyncio.run`.

    All arguments are forwarded unchanged.
    """
    coroutine = self.monitor_logs_async(*args, **kwargs)
    asyncio.run(coroutine)

Monitor a job's output.