meerschaum.jobs.systemd

Manage meerschaum.jobs.Job via systemd.

  1#! /usr/bin/env python3
  2# vim:fenc=utf-8
  3
  4"""
  5Manage `meerschaum.jobs.Job` via `systemd`.
  6"""
  7
  8import os
  9import pathlib
 10import shlex
 11import sys
 12import asyncio
 13import json
 14import time
 15import shutil
 16from datetime import datetime, timezone
 17from functools import partial
 18
 19import meerschaum as mrsm
 20from meerschaum.jobs import Job, Executor, make_executor
 21from meerschaum.utils.typing import Dict, Any, List, SuccessTuple, Union, Optional, Callable
 22from meerschaum.config import get_config
 23from meerschaum._internal.static import STATIC_CONFIG
 24from meerschaum.utils.warnings import warn, dprint
 25
### How long (in seconds) `SystemdExecutor.get_job_metadata()` may serve a cached
### metadata dictionary before querying `systemctl` again.
JOB_METADATA_CACHE_SECONDS: int = STATIC_CONFIG['api']['jobs']['metadata_cache_seconds']
 27
 28
 29@make_executor
 30class SystemdExecutor(Executor):
 31    """
 32    Execute Meerschaum jobs via `systemd`.
 33    """
 34
 35    def get_job_names(self, debug: bool = False) -> List[str]:
 36        """
 37        Return a list of existing jobs, including hidden ones.
 38        """
 39        from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH
 40        return [
 41            service_name[len('mrsm-'):(-1 * len('.service'))]
 42            for service_name in os.listdir(SYSTEMD_USER_RESOURCES_PATH)
 43            if (
 44                service_name.startswith('mrsm-')
 45                and service_name.endswith('.service')
 46                ### Check for broken symlinks.
 47                and (SYSTEMD_USER_RESOURCES_PATH / service_name).exists()
 48            )
 49        ]
 50
 51    def get_job_exists(self, name: str, debug: bool = False) -> bool:
 52        """
 53        Return whether a job exists.
 54        """
 55        user_services = self.get_job_names(debug=debug)
 56        if debug:
 57            dprint(f'Existing services: {user_services}')
 58        return name in user_services
 59
 60    def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
 61        """
 62        Return a dictionary of `systemd` Jobs (including hidden jobs).
 63        """
 64        user_services = self.get_job_names(debug=debug)
 65        jobs = {
 66            name: Job(name, executor_keys=str(self))
 67            for name in user_services
 68        }
 69        return {
 70            name: job
 71            for name, job in jobs.items()
 72        }
 73
 74    def get_service_name(self, name: str, debug: bool = False) -> str:
 75        """
 76        Return a job's service name.
 77        """
 78        return f"mrsm-{name.replace(' ', '-')}.service"
 79
 80    def get_service_job_path(self, name: str, debug: bool = False) -> pathlib.Path:
 81        """
 82        Return the path for the job's files under the root directory.
 83        """
 84        from meerschaum.config.paths import SYSTEMD_JOBS_RESOURCES_PATH
 85        return SYSTEMD_JOBS_RESOURCES_PATH / name
 86
 87    def get_service_symlink_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
 88        """
 89        Return the path to where to create the service symlink.
 90        """
 91        from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH
 92        return SYSTEMD_USER_RESOURCES_PATH / self.get_service_name(name, debug=debug)
 93
 94    def get_service_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
 95        """
 96        Return the path to a Job's service file.
 97        """
 98        return (
 99            self.get_service_job_path(name, debug=debug)
100            / self.get_service_name(name, debug=debug)
101        )
102
103    def get_service_logs_path(self, name: str, debug: bool = False) -> pathlib.Path:
104        """
105        Return the path to direct service logs to.
106        """
107        from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
108        return SYSTEMD_LOGS_RESOURCES_PATH / (self.get_service_name(name, debug=debug) + '.log')
109
110    def get_socket_path(self, name: str, debug: bool = False) -> pathlib.Path:
111        """
112        Return the path to the FIFO file.
113        """
114        return (
115            self.get_service_job_path(name, debug=debug)
116            / (self.get_service_name(name, debug=debug) + '.stdin')
117        )
118
119    def get_result_path(self, name: str, debug: bool = False) -> pathlib.Path:
120        """
121        Return the path to the result file.
122        """
123        return (
124            self.get_service_job_path(name, debug=debug)
125            / (self.get_service_name(name, debug=debug) + '.result.json')
126        )
127
128    def get_service_file_text(self, name: str, sysargs: List[str], debug: bool = False) -> str:
129        """
130        Return the contents of the unit file.
131        """
132        service_logs_path = self.get_service_logs_path(name, debug=debug)
133        socket_path = self.get_socket_path(name, debug=debug)
134        result_path = self.get_result_path(name, debug=debug)
135        job = self.get_hidden_job(name, debug=debug)
136
137        sysargs_str = shlex.join(sysargs)
138        exec_str = f'{sys.executable} -m meerschaum {sysargs_str}'
139        mrsm_env_var_names = set([var for var in STATIC_CONFIG['environment'].values()])
140        mrsm_env_vars = {
141            key: val
142            for key, val in os.environ.items()
143            if key in mrsm_env_var_names
144        }
145
146        ### Add new environment variables for the service process.
147        mrsm_env_vars.update({
148            STATIC_CONFIG['environment']['daemon_id']: name,
149            STATIC_CONFIG['environment']['systemd_log_path']: service_logs_path.as_posix(),
150            STATIC_CONFIG['environment']['systemd_result_path']: result_path.as_posix(),
151            STATIC_CONFIG['environment']['systemd_stdin_path']: socket_path.as_posix(),
152            STATIC_CONFIG['environment']['systemd_delete_job']: (
153                '1'
154                if job.delete_after_completion
155                else '0'
156            ),
157        })
158
159        ### Allow for user-defined environment variables.
160        mrsm_env_vars.update(job.env)
161
162        environment_lines = [
163            f"Environment={key}={shlex.quote(str(val))}"
164            for key, val in mrsm_env_vars.items()
165        ]
166        environment_str = '\n'.join(environment_lines)
167        service_name = self.get_service_name(name, debug=debug)
168
169        service_text = (
170            "[Unit]\n"
171            f"Description=Run the job '{name}'\n"
172            "\n"
173            "[Service]\n"
174            f"ExecStart={exec_str}\n"
175            "KillSignal=SIGTERM\n"
176            "Restart=always\n"
177            "RestartPreventExitStatus=0\n"
178            f"SyslogIdentifier={service_name}\n"
179            f"{environment_str}\n"
180            "\n"
181            "[Install]\n"
182            "WantedBy=default.target\n"
183        )
184        return service_text
185
186    def get_socket_file_text(self, name: str, debug: bool = False) -> str:
187        """
188        Return the contents of the socket file.
189        """
190        service_name = self.get_service_name(name, debug=debug)
191        socket_path = self.get_socket_path(name, debug=debug)
192        socket_text = (
193            "[Unit]\n"
194            f"BindsTo={service_name}\n"
195            "\n"
196            "[Socket]\n"
197            f"ListenFIFO={socket_path.as_posix()}\n"
198            "FileDescriptorName=stdin\n"
199            "RemoveOnStop=true\n"
200            "SocketMode=0660\n"
201        )
202        return socket_text
203
204    def get_hidden_job(
205        self,
206        name: str,
207        sysargs: Optional[List[str]] = None,
208        properties: Optional[Dict[str, Any]] = None,
209        debug: bool = False,
210    ):
211        """
212        Return the hidden "sister" job to store a job's parameters.
213        """
214        job = Job(
215            name,
216            sysargs,
217            executor_keys='local',
218            _properties=properties,
219            _rotating_log=self.get_job_rotating_file(name, debug=debug),
220            _stdin_file=self.get_job_stdin_file(name, debug=debug),
221            _status_hook=partial(self.get_job_status, name),
222            _result_hook=partial(self.get_job_result, name),
223            _externally_managed=True,
224        )
225        return job
226
227
228    def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
229        """
230        Return metadata about a job.
231        """
232        now = time.perf_counter()
233
234        if '_jobs_metadata' not in self.__dict__:
235            self._jobs_metadata: Dict[str, Any] = {}
236
237        if name in self._jobs_metadata:
238            ts = self._jobs_metadata[name].get('timestamp', None)
239
240            if ts is not None and (now - ts) <= JOB_METADATA_CACHE_SECONDS:
241                if debug:
242                    dprint(f"Retuning cached metadata for job '{name}'.")
243                return self._jobs_metadata[name]['metadata']
244
245        metadata = {
246            'sysargs': self.get_job_sysargs(name, debug=debug),
247            'result': self.get_job_result(name, debug=debug),
248            'restart': self.get_job_restart(name, debug=debug),
249            'daemon': {
250                'status': self.get_job_status(name, debug=debug),
251                'pid': self.get_job_pid(name, debug=debug),
252                'properties': self.get_job_properties(name, debug=debug),
253            },
254        }
255        self._jobs_metadata[name] = {
256            'timestamp': now,
257            'metadata': metadata,
258        }
259        return metadata
260
261    def get_job_restart(self, name: str, debug: bool = False) -> bool:
262        """
263        Return whether a job restarts.
264        """
265        from meerschaum.jobs._Job import RESTART_FLAGS
266        sysargs = self.get_job_sysargs(name, debug=debug)
267        if not sysargs:
268            return False
269
270        for flag in RESTART_FLAGS:
271            if flag in sysargs:
272                return True
273
274        return False
275
276    def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
277        """
278        Return the properties for a job.
279        """
280        job = self.get_hidden_job(name, debug=debug)
281        return {
282            k: v for k, v in job.daemon.properties.items()
283            if k != 'externally_managed'
284        }
285
286    def get_job_process(self, name: str, debug: bool = False):
287        """
288        Return a `psutil.Process` for the job's PID.
289        """
290        pid = self.get_job_pid(name, debug=debug)
291        if pid is None:
292            return None
293
294        psutil = mrsm.attempt_import('psutil')
295        try:
296            return psutil.Process(pid)
297        except Exception:
298            return None
299
300    def get_job_status(self, name: str, debug: bool = False) -> str:
301        """
302        Return the job's service status.
303        """
304        output = self.run_command(
305            ['is-active', self.get_service_name(name, debug=debug)],
306            as_output=True,
307            debug=debug,
308        )
309
310        if output == 'activating':
311            return 'running'
312
313        if output == 'active':
314            process = self.get_job_process(name, debug=debug)
315            if process is None:
316                return 'stopped'
317
318            try:
319                if process.status() == 'stopped':
320                    return 'paused'
321            except Exception:
322                return 'stopped'
323
324            return 'running'
325
326        return 'stopped'
327
328    def get_job_pid(self, name: str, debug: bool = False) -> Union[int, None]:
329        """
330        Return the job's service PID.
331        """
332        from meerschaum.utils.misc import is_int
333
334        output = self.run_command(
335            [
336                'show',
337                self.get_service_name(name, debug=debug),
338                '--property=MainPID',
339            ],
340            as_output=True,
341            debug=debug,
342        )
343        if not output.startswith('MainPID='):
344            return None
345
346        pid_str = output[len('MainPID='):]
347        if pid_str == '0':
348            return None
349
350        if is_int(pid_str):
351            return int(pid_str)
352        
353        return None
354
355    def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
356        """
357        Return when a job began running.
358        """
359        output = self.run_command(
360            [
361                'show',
362                self.get_service_name(name, debug=debug),
363                '--property=ActiveEnterTimestamp'
364            ],
365            as_output=True,
366            debug=debug,
367        )
368        if not output.startswith('ActiveEnterTimestamp'):
369            return None
370
371        dt_str = output.split('=')[-1]
372        if not dt_str:
373            return None
374
375        dateutil_parser = mrsm.attempt_import('dateutil.parser')
376        try:
377            dt = dateutil_parser.parse(dt_str)
378        except Exception as e:
379            warn(f"Cannot parse '{output}' as a datetime:\n{e}")
380            return None
381
382        return dt.astimezone(timezone.utc).isoformat()
383
384    def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
385        """
386        Return when a job began running.
387        """
388        output = self.run_command(
389            [
390                'show',
391                self.get_service_name(name, debug=debug),
392                '--property=InactiveEnterTimestamp'
393            ],
394            as_output=True,
395            debug=debug,
396        )
397        if not output.startswith('InactiveEnterTimestamp'):
398            return None
399
400        dt_str = output.split('=')[-1]
401        if not dt_str:
402            return None
403
404        dateutil_parser = mrsm.attempt_import('dateutil.parser')
405
406        try:
407            dt = dateutil_parser.parse(dt_str)
408        except Exception as e:
409            warn(f"Cannot parse '{output}' as a datetime:\n{e}")
410            return None
411        return dt.astimezone(timezone.utc).isoformat()
412
413    def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
414        """
415        Return when a job was paused.
416        """
417        job = self.get_hidden_job(name, debug=debug)
418        if self.get_job_status(name, debug=debug) != 'paused':
419            return None
420
421        stop_time = job.stop_time
422        if stop_time is None:
423            return None
424
425        return stop_time.isoformat()
426
427    def get_job_result(self, name: str, debug: bool = False) -> SuccessTuple:
428        """
429        Return the job's result SuccessTuple.
430        """
431        result_path = self.get_result_path(name, debug=debug)
432        if not result_path.exists():
433            return False, "No result available."
434
435        try:
436            with open(result_path, 'r', encoding='utf-8') as f:
437                result = json.load(f)
438        except Exception:
439            return False, f"Could not read result for Job '{name}'."
440
441        return tuple(result)
442
443    def get_job_sysargs(self, name: str, debug: bool = False) -> Union[List[str], None]:
444        """
445        Return the sysargs from the service file.
446        """
447        service_file_path = self.get_service_file_path(name, debug=debug)
448        if not service_file_path.exists():
449            return []
450
451        with open(service_file_path, 'r', encoding='utf-8') as f:
452            service_lines = f.readlines()
453
454        for line in service_lines:
455            if line.startswith('ExecStart='):
456                sysargs_str = line.split(' -m meerschaum ')[-1].split('<')[0]
457                return shlex.split(sysargs_str)
458
459        return []
460
461    def run_command(
462        self,
463        command_args: List[str],
464        as_output: bool = False,
465        debug: bool = False,
466    ) -> Union[SuccessTuple, str]:
467        """
468        Run a `systemd` command and return success.
469
470        Parameters
471        ----------
472        command_args: List[str]
473            The command to pass to `systemctl --user`.
474
475        as_output: bool, default False
476            If `True`, return the process stdout output.
477            Defaults to a `SuccessTuple`.
478
479        Returns
480        -------
481        A `SuccessTuple` indicating success or a str for the process output.
482        """
483        from meerschaum.utils.process import run_process
484
485        if command_args[:2] != ['systemctl', '--user']:
486            command_args = ['systemctl', '--user'] + command_args
487
488        if debug:
489            dprint(shlex.join(command_args))
490
491        proc = run_process(
492            command_args,
493            foreground=False,
494            as_proc=True,
495            capture_output=True,
496            text=True,
497        )
498        stdout, stderr = proc.communicate()
499        if debug:
500            dprint(f"{stdout}")
501
502        if as_output:
503            return stdout.strip()
504            
505        command_success = proc.wait() == 0
506        command_msg = (
507            "Success"
508            if command_success
509            else f"Failed to execute command `{shlex.join(command_args)}`."
510        )
511        return command_success, command_msg
512
513    def get_job_stdin_file(self, name: str, debug: bool = False):
514        """
515        Return a `StdinFile` for the job.
516        """
517        from meerschaum.utils.daemon import StdinFile
518        if '_stdin_files' not in self.__dict__:
519            self._stdin_files: Dict[str, StdinFile] = {}
520
521        if name not in self._stdin_files:
522            socket_path = self.get_socket_path(name, debug=debug)
523            socket_path.parent.mkdir(parents=True, exist_ok=True)
524            self._stdin_files[name] = StdinFile(socket_path)
525
526        return self._stdin_files[name]
527
528    def create_job(
529        self,
530        name: str,
531        sysargs: List[str],
532        properties: Optional[Dict[str, Any]] = None,
533        debug: bool = False,
534    ) -> SuccessTuple:
535        """
536        Create a job as a service to be run by `systemd`.
537        """
538        from meerschaum.utils.misc import make_symlink
539        service_name = self.get_service_name(name, debug=debug)
540        service_file_path = self.get_service_file_path(name, debug=debug)
541        service_symlink_file_path = self.get_service_symlink_file_path(name, debug=debug)
542        socket_stdin = self.get_job_stdin_file(name, debug=debug)
543        _ = socket_stdin.file_handler
544
545        ### Init the externally_managed file.
546        ### NOTE: We must write the pickle file in addition to the properties file.
547        job = self.get_hidden_job(name, sysargs=sysargs, properties=properties, debug=debug)
548        job._set_externally_managed()
549        pickle_success, pickle_msg = job.daemon.write_pickle()
550        if not pickle_success:
551            return pickle_success, pickle_msg
552        properties_success, properties_msg = job.daemon.write_properties()
553        if not properties_success:
554            return properties_success, properties_msg
555
556        service_file_path.parent.mkdir(parents=True, exist_ok=True)
557
558        with open(service_file_path, 'w+', encoding='utf-8') as f:
559            f.write(self.get_service_file_text(name, sysargs, debug=debug))
560
561        symlink_success, symlink_msg = make_symlink(service_file_path, service_symlink_file_path)
562        if not symlink_success:
563            return symlink_success, symlink_msg
564
565        commands = [
566            ['daemon-reload'],
567            ['enable', service_name],
568            ['start', service_name],
569        ]
570
571        fails = 0
572        for command_list in commands:
573            command_success, command_msg = self.run_command(command_list, debug=debug)
574            if not command_success:
575                fails += 1
576
577        if fails > 1:
578            return False, "Failed to reload systemd."
579
580        return True, f"Started job '{name}' via systemd."
581
582    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
583        """
584        Stop a job's service.
585        """
586        job = self.get_hidden_job(name, debug=debug)
587        job.daemon._remove_stop_file()
588
589        status = self.get_job_status(name, debug=debug)
590        if status == 'paused':
591            return self.run_command(
592                ['kill', '-s', 'SIGCONT', self.get_service_name(name, debug=debug)],
593                debug=debug,
594            )
595
596        return self.run_command(
597            ['start', self.get_service_name(name, debug=debug)],
598            debug=debug,
599        )
600
601    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
602        """
603        Stop a job's service.
604        """
605        job = self.get_hidden_job(name, debug=debug)
606        job.daemon._write_stop_file('quit')
607        sigint_success, sigint_msg = self.run_command(
608            ['kill', '-s', 'SIGINT', self.get_service_name(name, debug=debug)],
609            debug=debug,
610        )
611
612        check_timeout_interval = get_config('jobs', 'check_timeout_interval_seconds')
613        loop_start = time.perf_counter()
614        timeout_seconds = get_config('jobs', 'timeout_seconds')
615        while (time.perf_counter() - loop_start) < timeout_seconds:
616            if self.get_job_status(name, debug=debug) == 'stopped':
617                return True, 'Success'
618
619            time.sleep(check_timeout_interval)
620
621        return self.run_command(
622            ['stop', self.get_service_name(name, debug=debug)],
623            debug=debug,
624        )
625
626    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
627        """
628        Pause a job's service.
629        """
630        job = self.get_hidden_job(name, debug=debug)
631        job.daemon._write_stop_file('pause')
632        return self.run_command(
633            ['kill', '-s', 'SIGSTOP', self.get_service_name(name, debug=debug)],
634            debug=debug,
635        )
636
637    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
638        """
639        Delete a job's service.
640        """
641        from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
642        job = self.get_hidden_job(name, debug=debug)
643
644        if not job.delete_after_completion:
645            _ = self.stop_job(name, debug=debug)
646            _ = self.run_command(
647                ['disable', self.get_service_name(name, debug=debug)],
648                debug=debug,
649            )
650
651        service_job_path = self.get_service_job_path(name, debug=debug)
652        try:
653            if service_job_path.exists():
654                shutil.rmtree(service_job_path)
655        except Exception as e:
656            warn(e)
657            return False, str(e)
658
659        service_logs_path = self.get_service_logs_path(name, debug=debug)
660        logs_paths = [
661            (SYSTEMD_LOGS_RESOURCES_PATH / name)
662            for name in os.listdir(SYSTEMD_LOGS_RESOURCES_PATH)
663            if name.startswith(service_logs_path.name + '.')
664        ]
665        paths = [
666            self.get_service_file_path(name, debug=debug),
667            self.get_service_symlink_file_path(name, debug=debug),
668            self.get_socket_path(name, debug=debug),
669            self.get_result_path(name, debug=debug),
670        ] + logs_paths
671
672        for path in paths:
673            if path.exists():
674                try:
675                    path.unlink()
676                except Exception as e:
677                    warn(e)
678                    return False, str(e)
679
680        _ = job.delete()
681
682        return self.run_command(['daemon-reload'], debug=debug)
683
684    def get_logs(self, name: str, debug: bool = False) -> str:
685        """
686        Return a job's journal logs.
687        """
688        rotating_file = self.get_job_rotating_file(name, debug=debug)
689        return rotating_file.read()
690
691    def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
692        """
693        Return a job's stop time.
694        """
695        job = self.get_hidden_job(name, debug=debug)
696        return job.stop_time
697
698    def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
699        """
700        Return whether a job is blocking on stdin.
701        """
702        socket_path = self.get_socket_path(name, debug=debug)
703        blocking_path = socket_path.parent / (socket_path.name + '.block')
704        return blocking_path.exists()
705
706    def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]:
707        """
708        Return the kwargs to the blocking prompt.
709        """
710        job = self.get_hidden_job(name, debug=debug)
711        return job.get_prompt_kwargs(debug=debug)
712
713    def get_job_rotating_file(self, name: str, debug: bool = False):
714        """
715        Return a `RotatingFile` for the job's log output.
716        """
717        from meerschaum.utils.daemon import RotatingFile
718        service_logs_path = self.get_service_logs_path(name, debug=debug)
719        return RotatingFile(service_logs_path)
720
721    async def monitor_logs_async(
722        self,
723        name: str,
724        *args,
725        debug: bool = False,
726        **kwargs
727    ):
728        """
729        Monitor a job's output.
730        """
731        from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
732        job = self.get_hidden_job(name, debug=debug)
733        kwargs.update({
734            '_logs_path': SYSTEMD_LOGS_RESOURCES_PATH,
735            '_log': self.get_job_rotating_file(name, debug=debug),
736            '_stdin_file': self.get_job_stdin_file(name, debug=debug),
737            'debug': debug,
738        })
739        await job.monitor_logs_async(*args, **kwargs)
740
741    def monitor_logs(self, *args, **kwargs):
742        """
743        Monitor a job's output.
744        """
745        asyncio.run(self.monitor_logs_async(*args, **kwargs))
JOB_METADATA_CACHE_SECONDS: int = 5
@make_executor
class SystemdExecutor(Executor):
    """
    Execute Meerschaum jobs via `systemd`.

    Every job is installed as a `systemctl --user` service named
    `mrsm-<name>.service`. The job's parameters are persisted through a hidden
    "sister" job which uses the `local` executor (see `get_hidden_job`).
    """

    def get_job_names(self, debug: bool = False) -> List[str]:
        """
        Return a list of existing jobs, including hidden ones.

        A job exists when `mrsm-<name>.service` is present in the systemd user
        directory and is not a broken symlink. If that directory does not exist
        yet, there are no jobs and an empty list is returned.
        """
        from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH
        prefix, suffix = 'mrsm-', '.service'

        try:
            service_names = os.listdir(SYSTEMD_USER_RESOURCES_PATH)
        except FileNotFoundError:
            ### No user services have been installed yet.
            return []

        return [
            service_name[len(prefix):-len(suffix)]
            for service_name in service_names
            if (
                service_name.startswith(prefix)
                and service_name.endswith(suffix)
                ### Check for broken symlinks.
                and (SYSTEMD_USER_RESOURCES_PATH / service_name).exists()
            )
        ]

    def get_job_exists(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job exists.
        """
        user_services = self.get_job_names(debug=debug)
        if debug:
            dprint(f'Existing services: {user_services}')
        return name in user_services

    def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
        """
        Return a dictionary of `systemd` Jobs (including hidden jobs), keyed by name.
        """
        return {
            name: Job(name, executor_keys=str(self))
            for name in self.get_job_names(debug=debug)
        }

    def get_service_name(self, name: str, debug: bool = False) -> str:
        """
        Return a job's service name (`mrsm-<name>.service`, spaces become hyphens).
        """
        return f"mrsm-{name.replace(' ', '-')}.service"

    def get_service_job_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path for the job's files under the root directory.
        """
        from meerschaum.config.paths import SYSTEMD_JOBS_RESOURCES_PATH
        return SYSTEMD_JOBS_RESOURCES_PATH / name

    def get_service_symlink_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to where to create the service symlink.
        """
        from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH
        return SYSTEMD_USER_RESOURCES_PATH / self.get_service_name(name, debug=debug)

    def get_service_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to a Job's service file.
        """
        return (
            self.get_service_job_path(name, debug=debug)
            / self.get_service_name(name, debug=debug)
        )

    def get_service_logs_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to which the service's logs are written.
        """
        from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
        return SYSTEMD_LOGS_RESOURCES_PATH / (self.get_service_name(name, debug=debug) + '.log')

    def get_socket_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to the FIFO file used as the job's stdin.
        """
        return (
            self.get_service_job_path(name, debug=debug)
            / (self.get_service_name(name, debug=debug) + '.stdin')
        )

    def get_result_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to the result file.
        """
        return (
            self.get_service_job_path(name, debug=debug)
            / (self.get_service_name(name, debug=debug) + '.result.json')
        )

    def get_service_file_text(self, name: str, sysargs: List[str], debug: bool = False) -> str:
        """
        Return the contents of the unit file.

        The unit runs `<current python> -m meerschaum <sysargs>`. It restarts
        on exit unless the exit status is 0. It passes along any Meerschaum
        environment variables set in the current process, the job-specific
        paths, and finally the job's own `env` (which takes precedence).
        """
        service_logs_path = self.get_service_logs_path(name, debug=debug)
        socket_path = self.get_socket_path(name, debug=debug)
        result_path = self.get_result_path(name, debug=debug)
        job = self.get_hidden_job(name, debug=debug)

        sysargs_str = shlex.join(sysargs)
        exec_str = f'{sys.executable} -m meerschaum {sysargs_str}'

        ### Forward only the Meerschaum-specific variables from the current environment.
        mrsm_env_var_names = set(STATIC_CONFIG['environment'].values())
        mrsm_env_vars = {
            key: val
            for key, val in os.environ.items()
            if key in mrsm_env_var_names
        }

        ### Add new environment variables for the service process.
        mrsm_env_vars.update({
            STATIC_CONFIG['environment']['daemon_id']: name,
            STATIC_CONFIG['environment']['systemd_log_path']: service_logs_path.as_posix(),
            STATIC_CONFIG['environment']['systemd_result_path']: result_path.as_posix(),
            STATIC_CONFIG['environment']['systemd_stdin_path']: socket_path.as_posix(),
            STATIC_CONFIG['environment']['systemd_delete_job']: (
                '1'
                if job.delete_after_completion
                else '0'
            ),
        })

        ### Allow for user-defined environment variables.
        mrsm_env_vars.update(job.env)

        environment_lines = [
            f"Environment={key}={shlex.quote(str(val))}"
            for key, val in mrsm_env_vars.items()
        ]
        environment_str = '\n'.join(environment_lines)
        service_name = self.get_service_name(name, debug=debug)

        service_text = (
            "[Unit]\n"
            f"Description=Run the job '{name}'\n"
            "\n"
            "[Service]\n"
            f"ExecStart={exec_str}\n"
            "KillSignal=SIGTERM\n"
            "Restart=always\n"
            "RestartPreventExitStatus=0\n"
            f"SyslogIdentifier={service_name}\n"
            f"{environment_str}\n"
            "\n"
            "[Install]\n"
            "WantedBy=default.target\n"
        )
        return service_text

    def get_socket_file_text(self, name: str, debug: bool = False) -> str:
        """
        Return the contents of the socket file (a FIFO bound to the job's service).
        """
        service_name = self.get_service_name(name, debug=debug)
        socket_path = self.get_socket_path(name, debug=debug)
        socket_text = (
            "[Unit]\n"
            f"BindsTo={service_name}\n"
            "\n"
            "[Socket]\n"
            f"ListenFIFO={socket_path.as_posix()}\n"
            "FileDescriptorName=stdin\n"
            "RemoveOnStop=true\n"
            "SocketMode=0660\n"
        )
        return socket_text

    def get_hidden_job(
        self,
        name: str,
        sysargs: Optional[List[str]] = None,
        properties: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ):
        """
        Return the hidden "sister" job to store a job's parameters.

        The sister job uses the `local` executor but is marked as externally
        managed. Its status and result are read back from this executor through
        the hooks below.
        """
        job = Job(
            name,
            sysargs,
            executor_keys='local',
            _properties=properties,
            _rotating_log=self.get_job_rotating_file(name, debug=debug),
            _stdin_file=self.get_job_stdin_file(name, debug=debug),
            _status_hook=partial(self.get_job_status, name),
            _result_hook=partial(self.get_job_result, name),
            _externally_managed=True,
        )
        return job

    def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return metadata about a job.

        Results are cached per job name on this executor instance and reused
        for up to `JOB_METADATA_CACHE_SECONDS` seconds.
        """
        now = time.perf_counter()

        if '_jobs_metadata' not in self.__dict__:
            self._jobs_metadata: Dict[str, Any] = {}

        if name in self._jobs_metadata:
            ts = self._jobs_metadata[name].get('timestamp', None)

            if ts is not None and (now - ts) <= JOB_METADATA_CACHE_SECONDS:
                if debug:
                    dprint(f"Returning cached metadata for job '{name}'.")
                return self._jobs_metadata[name]['metadata']

        metadata = {
            'sysargs': self.get_job_sysargs(name, debug=debug),
            'result': self.get_job_result(name, debug=debug),
            'restart': self.get_job_restart(name, debug=debug),
            'daemon': {
                'status': self.get_job_status(name, debug=debug),
                'pid': self.get_job_pid(name, debug=debug),
                'properties': self.get_job_properties(name, debug=debug),
            },
        }
        self._jobs_metadata[name] = {
            'timestamp': now,
            'metadata': metadata,
        }
        return metadata

    def get_job_restart(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job restarts (i.e. its sysargs contain a restart flag).
        """
        from meerschaum.jobs._Job import RESTART_FLAGS
        sysargs = self.get_job_sysargs(name, debug=debug)
        if not sysargs:
            return False

        return any(flag in sysargs for flag in RESTART_FLAGS)

    def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return the properties for a job (excluding the internal `externally_managed` key).
        """
        job = self.get_hidden_job(name, debug=debug)
        return {
            k: v for k, v in job.daemon.properties.items()
            if k != 'externally_managed'
        }

    def get_job_process(self, name: str, debug: bool = False):
        """
        Return a `psutil.Process` for the job's PID, or `None` if unavailable.
        """
        pid = self.get_job_pid(name, debug=debug)
        if pid is None:
            return None

        psutil = mrsm.attempt_import('psutil')
        try:
            return psutil.Process(pid)
        except Exception:
            return None

    def get_job_status(self, name: str, debug: bool = False) -> str:
        """
        Return the job's service status: `'running'`, `'paused'`, or `'stopped'`.

        `activating` maps to `'running'`. An `active` service whose process is
        in the `stopped` state (e.g. after SIGSTOP) maps to `'paused'`.
        """
        output = self.run_command(
            ['is-active', self.get_service_name(name, debug=debug)],
            as_output=True,
            debug=debug,
        )

        if output == 'activating':
            return 'running'

        if output == 'active':
            process = self.get_job_process(name, debug=debug)
            if process is None:
                return 'stopped'

            try:
                if process.status() == 'stopped':
                    return 'paused'
            except Exception:
                return 'stopped'

            return 'running'

        return 'stopped'

    def get_job_pid(self, name: str, debug: bool = False) -> Union[int, None]:
        """
        Return the job's service PID, or `None` if it has no main process.
        """
        from meerschaum.utils.misc import is_int

        output = self.run_command(
            [
                'show',
                self.get_service_name(name, debug=debug),
                '--property=MainPID',
            ],
            as_output=True,
            debug=debug,
        )
        if not output.startswith('MainPID='):
            return None

        pid_str = output[len('MainPID='):]
        ### systemd reports `MainPID=0` when there is no running main process.
        if pid_str == '0':
            return None

        if is_int(pid_str):
            return int(pid_str)

        return None

    def _get_service_timestamp(
        self,
        name: str,
        property_name: str,
        debug: bool = False,
    ) -> Union[str, None]:
        """
        Return a timestamp property of the job's service as a UTC ISO-8601 string.

        Returns `None` if the property is missing, empty, or cannot be parsed.
        """
        output = self.run_command(
            [
                'show',
                self.get_service_name(name, debug=debug),
                f'--property={property_name}',
            ],
            as_output=True,
            debug=debug,
        )
        if not output.startswith(property_name):
            return None

        dt_str = output.split('=')[-1]
        if not dt_str:
            return None

        dateutil_parser = mrsm.attempt_import('dateutil.parser')
        try:
            dt = dateutil_parser.parse(dt_str)
        except Exception as e:
            warn(f"Cannot parse '{output}' as a datetime:\n{e}")
            return None

        return dt.astimezone(timezone.utc).isoformat()

    def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job began running (UTC ISO-8601), or `None`.
        """
        return self._get_service_timestamp(name, 'ActiveEnterTimestamp', debug=debug)

    def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job stopped running (UTC ISO-8601), or `None`.
        """
        return self._get_service_timestamp(name, 'InactiveEnterTimestamp', debug=debug)

    def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job was paused (ISO-8601), or `None` if it is not paused.
        """
        job = self.get_hidden_job(name, debug=debug)
        if self.get_job_status(name, debug=debug) != 'paused':
            return None

        stop_time = job.stop_time
        if stop_time is None:
            return None

        return stop_time.isoformat()

    def get_job_result(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Return the job's result SuccessTuple, read from its result JSON file.
        """
        result_path = self.get_result_path(name, debug=debug)
        if not result_path.exists():
            return False, "No result available."

        try:
            with open(result_path, 'r', encoding='utf-8') as f:
                result = json.load(f)
        except Exception:
            return False, f"Could not read result for Job '{name}'."

        return tuple(result)

    def get_job_sysargs(self, name: str, debug: bool = False) -> Union[List[str], None]:
        """
        Return the sysargs parsed from the `ExecStart=` line of the service file.

        Returns an empty list if the service file or its `ExecStart=` line is missing.
        """
        service_file_path = self.get_service_file_path(name, debug=debug)
        if not service_file_path.exists():
            return []

        with open(service_file_path, 'r', encoding='utf-8') as f:
            service_lines = f.readlines()

        for line in service_lines:
            if line.startswith('ExecStart='):
                ### NOTE(review): the split on '<' presumably drops a stdin
                ### redirection written by older unit files — TODO confirm.
                sysargs_str = line.split(' -m meerschaum ')[-1].split('<')[0]
                return shlex.split(sysargs_str)

        return []

    def run_command(
        self,
        command_args: List[str],
        as_output: bool = False,
        debug: bool = False,
    ) -> Union[SuccessTuple, str]:
        """
        Run a `systemd` command and return success.

        Parameters
        ----------
        command_args: List[str]
            The command to pass to `systemctl --user`.

        as_output: bool, default False
            If `True`, return the process stdout output.
            Defaults to a `SuccessTuple`.

        Returns
        -------
        A `SuccessTuple` indicating success or a str for the process output.
        """
        from meerschaum.utils.process import run_process

        if command_args[:2] != ['systemctl', '--user']:
            command_args = ['systemctl', '--user'] + command_args

        if debug:
            dprint(shlex.join(command_args))

        proc = run_process(
            command_args,
            foreground=False,
            as_proc=True,
            capture_output=True,
            text=True,
        )
        stdout, stderr = proc.communicate()
        if debug:
            dprint(f"{stdout}")

        if as_output:
            return stdout.strip()

        command_success = proc.wait() == 0
        command_msg = (
            "Success"
            if command_success
            else f"Failed to execute command `{shlex.join(command_args)}`."
        )
        return command_success, command_msg

    def get_job_stdin_file(self, name: str, debug: bool = False):
        """
        Return a `StdinFile` for the job, cached per job name on this executor.

        The parent directory of the stdin FIFO is created if needed.
        """
        from meerschaum.utils.daemon import StdinFile
        if '_stdin_files' not in self.__dict__:
            self._stdin_files: Dict[str, StdinFile] = {}

        if name not in self._stdin_files:
            socket_path = self.get_socket_path(name, debug=debug)
            socket_path.parent.mkdir(parents=True, exist_ok=True)
            self._stdin_files[name] = StdinFile(socket_path)

        return self._stdin_files[name]

    def create_job(
        self,
        name: str,
        sysargs: List[str],
        properties: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ) -> SuccessTuple:
        """
        Create a job as a service to be run by `systemd`, then enable and start it.

        Returns a failure if writing the job's files fails, or if any of the
        `daemon-reload`, `enable`, or `start` steps fail.
        """
        from meerschaum.utils.misc import make_symlink
        service_name = self.get_service_name(name, debug=debug)
        service_file_path = self.get_service_file_path(name, debug=debug)
        service_symlink_file_path = self.get_service_symlink_file_path(name, debug=debug)
        socket_stdin = self.get_job_stdin_file(name, debug=debug)
        ### NOTE(review): accessing `file_handler` presumably opens/creates the
        ### stdin FIFO before the service starts — TODO confirm.
        _ = socket_stdin.file_handler

        ### Init the externally_managed file.
        ### NOTE: We must write the pickle file in addition to the properties file.
        job = self.get_hidden_job(name, sysargs=sysargs, properties=properties, debug=debug)
        job._set_externally_managed()
        pickle_success, pickle_msg = job.daemon.write_pickle()
        if not pickle_success:
            return pickle_success, pickle_msg
        properties_success, properties_msg = job.daemon.write_properties()
        if not properties_success:
            return properties_success, properties_msg

        service_file_path.parent.mkdir(parents=True, exist_ok=True)

        with open(service_file_path, 'w+', encoding='utf-8') as f:
            f.write(self.get_service_file_text(name, sysargs, debug=debug))

        symlink_success, symlink_msg = make_symlink(service_file_path, service_symlink_file_path)
        if not symlink_success:
            return symlink_success, symlink_msg

        commands = [
            ['daemon-reload'],
            ['enable', service_name],
            ['start', service_name],
        ]

        ### Run every step, but report failure if any of them failed
        ### (previously a single failure, e.g. of `start`, was reported as success).
        failure_msgs = []
        for command_list in commands:
            command_success, command_msg = self.run_command(command_list, debug=debug)
            if not command_success:
                failure_msgs.append(command_msg)

        if failure_msgs:
            return False, (
                f"Failed to start job '{name}' via systemd:\n"
                + '\n'.join(failure_msgs)
            )

        return True, f"Started job '{name}' via systemd."

    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Start a job's service, or resume it (SIGCONT) if it is paused.
        """
        job = self.get_hidden_job(name, debug=debug)
        job.daemon._remove_stop_file()

        status = self.get_job_status(name, debug=debug)
        if status == 'paused':
            return self.run_command(
                ['kill', '-s', 'SIGCONT', self.get_service_name(name, debug=debug)],
                debug=debug,
            )

        return self.run_command(
            ['start', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Stop a job's service.

        SIGINT is sent first. The status is then polled until the job stops or
        the configured timeout elapses, after which `systemctl stop` is used.
        """
        job = self.get_hidden_job(name, debug=debug)
        job.daemon._write_stop_file('quit')
        _ = self.run_command(
            ['kill', '-s', 'SIGINT', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

        check_timeout_interval = get_config('jobs', 'check_timeout_interval_seconds')
        loop_start = time.perf_counter()
        timeout_seconds = get_config('jobs', 'timeout_seconds')
        while (time.perf_counter() - loop_start) < timeout_seconds:
            if self.get_job_status(name, debug=debug) == 'stopped':
                return True, 'Success'

            time.sleep(check_timeout_interval)

        return self.run_command(
            ['stop', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Pause a job's service by sending it SIGSTOP.
        """
        job = self.get_hidden_job(name, debug=debug)
        job.daemon._write_stop_file('pause')
        return self.run_command(
            ['kill', '-s', 'SIGSTOP', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Delete a job's service and its files, then reload `systemd`.

        The job directory, unit file and symlink, stdin FIFO, result file, and
        rotated log files are removed.
        """
        from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
        job = self.get_hidden_job(name, debug=debug)

        ### NOTE(review): jobs flagged `delete_after_completion` skip stop/disable,
        ### presumably because their service is already tearing itself down — TODO confirm.
        if not job.delete_after_completion:
            _ = self.stop_job(name, debug=debug)
            _ = self.run_command(
                ['disable', self.get_service_name(name, debug=debug)],
                debug=debug,
            )

        service_job_path = self.get_service_job_path(name, debug=debug)
        try:
            if service_job_path.exists():
                shutil.rmtree(service_job_path)
        except Exception as e:
            warn(e)
            return False, str(e)

        service_logs_path = self.get_service_logs_path(name, debug=debug)
        logs_prefix = service_logs_path.name + '.'
        try:
            log_filenames = os.listdir(SYSTEMD_LOGS_RESOURCES_PATH)
        except FileNotFoundError:
            log_filenames = []
        logs_paths = [
            SYSTEMD_LOGS_RESOURCES_PATH / filename
            for filename in log_filenames
            if filename.startswith(logs_prefix)
        ]
        paths = [
            self.get_service_file_path(name, debug=debug),
            self.get_service_symlink_file_path(name, debug=debug),
            self.get_socket_path(name, debug=debug),
            self.get_result_path(name, debug=debug),
        ] + logs_paths

        for path in paths:
            if path.exists():
                try:
                    path.unlink()
                except Exception as e:
                    warn(e)
                    return False, str(e)

        _ = job.delete()

        return self.run_command(['daemon-reload'], debug=debug)

    def get_logs(self, name: str, debug: bool = False) -> str:
        """
        Return a job's logs, read from its rotating log file.
        """
        rotating_file = self.get_job_rotating_file(name, debug=debug)
        return rotating_file.read()

    def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
        """
        Return a job's stop time.
        """
        job = self.get_hidden_job(name, debug=debug)
        return job.stop_time

    def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job is blocking on stdin (a `.block` file sits next to its FIFO).
        """
        socket_path = self.get_socket_path(name, debug=debug)
        blocking_path = socket_path.parent / (socket_path.name + '.block')
        return blocking_path.exists()

    def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return the kwargs to the blocking prompt.
        """
        job = self.get_hidden_job(name, debug=debug)
        return job.get_prompt_kwargs(debug=debug)

    def get_job_rotating_file(self, name: str, debug: bool = False):
        """
        Return a `RotatingFile` for the job's log output.
        """
        from meerschaum.utils.daemon import RotatingFile
        service_logs_path = self.get_service_logs_path(name, debug=debug)
        return RotatingFile(service_logs_path)

    async def monitor_logs_async(
        self,
        name: str,
        *args,
        debug: bool = False,
        **kwargs
    ):
        """
        Monitor a job's output, delegating to the hidden sister job.
        """
        from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
        job = self.get_hidden_job(name, debug=debug)
        kwargs.update({
            '_logs_path': SYSTEMD_LOGS_RESOURCES_PATH,
            '_log': self.get_job_rotating_file(name, debug=debug),
            '_stdin_file': self.get_job_stdin_file(name, debug=debug),
            'debug': debug,
        })
        await job.monitor_logs_async(*args, **kwargs)

    def monitor_logs(self, *args, **kwargs):
        """
        Monitor a job's output (synchronous wrapper around `monitor_logs_async`).
        """
        asyncio.run(self.monitor_logs_async(*args, **kwargs))

Execute Meerschaum jobs via systemd.

def get_job_names(self, debug: bool = False) -> List[str]:
    """
    Return a list of existing jobs, including hidden ones.

    A job exists when `mrsm-<name>.service` is present in the systemd user
    directory and is not a broken symlink. If that directory does not exist
    yet, there are no jobs and an empty list is returned instead of raising
    `FileNotFoundError`.
    """
    from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH
    prefix, suffix = 'mrsm-', '.service'

    try:
        service_names = os.listdir(SYSTEMD_USER_RESOURCES_PATH)
    except FileNotFoundError:
        ### No user services have been installed yet.
        return []

    return [
        service_name[len(prefix):-len(suffix)]
        for service_name in service_names
        if (
            service_name.startswith(prefix)
            and service_name.endswith(suffix)
            ### Check for broken symlinks.
            and (SYSTEMD_USER_RESOURCES_PATH / service_name).exists()
        )
    ]

Return a list of existing jobs, including hidden ones.

def get_job_exists(self, name: str, debug: bool = False) -> bool:
    """
    Check whether a job with the given name is installed.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        If `True`, print the discovered service names.

    Returns
    -------
    `True` if `name` is one of the names returned by `get_job_names()`.
    """
    known_jobs = self.get_job_names(debug=debug)
    if debug:
        dprint(f'Existing services: {known_jobs}')
    found = name in known_jobs
    return found

Return whether a job exists.

def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
    """
    Return a dictionary of `systemd` Jobs (including hidden jobs), keyed by name.

    Each job is constructed with this executor's keys (`str(self)`).
    """
    return {
        name: Job(name, executor_keys=str(self))
        for name in self.get_job_names(debug=debug)
    }

Return a dictionary of systemd Jobs (including hidden jobs).

def get_service_name(self, name: str, debug: bool = False) -> str:
    """
    Build the systemd unit name for a job.

    Every space in `name` becomes a hyphen, and the result is wrapped as
    `mrsm-<name>.service`.
    """
    hyphenated = '-'.join(name.split(' '))
    return 'mrsm-' + hyphenated + '.service'

Return a job's service name.

def get_service_job_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Locate the per-job directory under the systemd jobs root.

    The directory is named after the job itself (not its service name).
    """
    from meerschaum.config.paths import SYSTEMD_JOBS_RESOURCES_PATH as jobs_root
    job_dir = jobs_root / name
    return job_dir

Return the path for the job's files under the root directory.

def get_service_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Locate a job's unit file.

    The unit file sits inside the job's directory and is named after the
    job's service name.
    """
    job_dir = self.get_service_job_path(name, debug=debug)
    unit_filename = self.get_service_name(name, debug=debug)
    return job_dir / unit_filename

Return the path to a Job's service file.

def get_service_logs_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Locate the log file for a job's service.

    The file lives in the systemd logs directory and is named
    `<service name>.log`.
    """
    from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
    log_filename = f"{self.get_service_name(name, debug=debug)}.log"
    return SYSTEMD_LOGS_RESOURCES_PATH / log_filename

Return the path to which the service's logs are written.

def get_socket_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Locate the FIFO that serves as a job's stdin.

    It is stored in the job's directory as `<service name>.stdin`.
    """
    job_dir = self.get_service_job_path(name, debug=debug)
    fifo_filename = f"{self.get_service_name(name, debug=debug)}.stdin"
    return job_dir / fifo_filename

Return the path to the FIFO file.

def get_result_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Locate the JSON file holding a job's result.

    It is stored in the job's directory as `<service name>.result.json`.
    """
    job_dir = self.get_service_job_path(name, debug=debug)
    result_filename = f"{self.get_service_name(name, debug=debug)}.result.json"
    return job_dir / result_filename

Return the path to the result file.

def get_service_file_text(self, name: str, sysargs: List[str], debug: bool = False) -> str:
129    def get_service_file_text(self, name: str, sysargs: List[str], debug: bool = False) -> str:
130        """
131        Return the contents of the unit file.
132        """
133        service_logs_path = self.get_service_logs_path(name, debug=debug)
134        socket_path = self.get_socket_path(name, debug=debug)
135        result_path = self.get_result_path(name, debug=debug)
136        job = self.get_hidden_job(name, debug=debug)
137
138        sysargs_str = shlex.join(sysargs)
139        exec_str = f'{sys.executable} -m meerschaum {sysargs_str}'
140        mrsm_env_var_names = set([var for var in STATIC_CONFIG['environment'].values()])
141        mrsm_env_vars = {
142            key: val
143            for key, val in os.environ.items()
144            if key in mrsm_env_var_names
145        }
146
147        ### Add new environment variables for the service process.
148        mrsm_env_vars.update({
149            STATIC_CONFIG['environment']['daemon_id']: name,
150            STATIC_CONFIG['environment']['systemd_log_path']: service_logs_path.as_posix(),
151            STATIC_CONFIG['environment']['systemd_result_path']: result_path.as_posix(),
152            STATIC_CONFIG['environment']['systemd_stdin_path']: socket_path.as_posix(),
153            STATIC_CONFIG['environment']['systemd_delete_job']: (
154                '1'
155                if job.delete_after_completion
156                else '0'
157            ),
158        })
159
160        ### Allow for user-defined environment variables.
161        mrsm_env_vars.update(job.env)
162
163        environment_lines = [
164            f"Environment={key}={shlex.quote(str(val))}"
165            for key, val in mrsm_env_vars.items()
166        ]
167        environment_str = '\n'.join(environment_lines)
168        service_name = self.get_service_name(name, debug=debug)
169
170        service_text = (
171            "[Unit]\n"
172            f"Description=Run the job '{name}'\n"
173            "\n"
174            "[Service]\n"
175            f"ExecStart={exec_str}\n"
176            "KillSignal=SIGTERM\n"
177            "Restart=always\n"
178            "RestartPreventExitStatus=0\n"
179            f"SyslogIdentifier={service_name}\n"
180            f"{environment_str}\n"
181            "\n"
182            "[Install]\n"
183            "WantedBy=default.target\n"
184        )
185        return service_text

Return the contents of the unit file.

def get_socket_file_text(self, name: str, debug: bool = False) -> str:
187    def get_socket_file_text(self, name: str, debug: bool = False) -> str:
188        """
189        Return the contents of the socket file.
190        """
191        service_name = self.get_service_name(name, debug=debug)
192        socket_path = self.get_socket_path(name, debug=debug)
193        socket_text = (
194            "[Unit]\n"
195            f"BindsTo={service_name}\n"
196            "\n"
197            "[Socket]\n"
198            f"ListenFIFO={socket_path.as_posix()}\n"
199            "FileDescriptorName=stdin\n"
200            "RemoveOnStop=true\n"
201            "SocketMode=0660\n"
202        )
203        return socket_text

Return the contents of the socket file.

def get_hidden_job( self, name: str, sysargs: Optional[List[str]] = None, properties: Optional[Dict[str, Any]] = None, debug: bool = False):
205    def get_hidden_job(
206        self,
207        name: str,
208        sysargs: Optional[List[str]] = None,
209        properties: Optional[Dict[str, Any]] = None,
210        debug: bool = False,
211    ):
212        """
213        Return the hidden "sister" job to store a job's parameters.
214        """
215        job = Job(
216            name,
217            sysargs,
218            executor_keys='local',
219            _properties=properties,
220            _rotating_log=self.get_job_rotating_file(name, debug=debug),
221            _stdin_file=self.get_job_stdin_file(name, debug=debug),
222            _status_hook=partial(self.get_job_status, name),
223            _result_hook=partial(self.get_job_result, name),
224            _externally_managed=True,
225        )
226        return job

Return the hidden "sister" job to store a job's parameters.

def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
229    def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
230        """
231        Return metadata about a job.
232        """
233        now = time.perf_counter()
234
235        if '_jobs_metadata' not in self.__dict__:
236            self._jobs_metadata: Dict[str, Any] = {}
237
238        if name in self._jobs_metadata:
239            ts = self._jobs_metadata[name].get('timestamp', None)
240
241            if ts is not None and (now - ts) <= JOB_METADATA_CACHE_SECONDS:
242                if debug:
243                    dprint(f"Retuning cached metadata for job '{name}'.")
244                return self._jobs_metadata[name]['metadata']
245
246        metadata = {
247            'sysargs': self.get_job_sysargs(name, debug=debug),
248            'result': self.get_job_result(name, debug=debug),
249            'restart': self.get_job_restart(name, debug=debug),
250            'daemon': {
251                'status': self.get_job_status(name, debug=debug),
252                'pid': self.get_job_pid(name, debug=debug),
253                'properties': self.get_job_properties(name, debug=debug),
254            },
255        }
256        self._jobs_metadata[name] = {
257            'timestamp': now,
258            'metadata': metadata,
259        }
260        return metadata

Return metadata about a job.

def get_job_restart(self, name: str, debug: bool = False) -> bool:
262    def get_job_restart(self, name: str, debug: bool = False) -> bool:
263        """
264        Return whether a job restarts.
265        """
266        from meerschaum.jobs._Job import RESTART_FLAGS
267        sysargs = self.get_job_sysargs(name, debug=debug)
268        if not sysargs:
269            return False
270
271        for flag in RESTART_FLAGS:
272            if flag in sysargs:
273                return True
274
275        return False

Return whether a job restarts.

def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
277    def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
278        """
279        Return the properties for a job.
280        """
281        job = self.get_hidden_job(name, debug=debug)
282        return {
283            k: v for k, v in job.daemon.properties.items()
284            if k != 'externally_managed'
285        }

Return the properties for a job.

def get_job_process(self, name: str, debug: bool = False):
287    def get_job_process(self, name: str, debug: bool = False):
288        """
289        Return a `psutil.Process` for the job's PID.
290        """
291        pid = self.get_job_pid(name, debug=debug)
292        if pid is None:
293            return None
294
295        psutil = mrsm.attempt_import('psutil')
296        try:
297            return psutil.Process(pid)
298        except Exception:
299            return None

Return a psutil.Process for the job's PID.

def get_job_status(self, name: str, debug: bool = False) -> str:
301    def get_job_status(self, name: str, debug: bool = False) -> str:
302        """
303        Return the job's service status.
304        """
305        output = self.run_command(
306            ['is-active', self.get_service_name(name, debug=debug)],
307            as_output=True,
308            debug=debug,
309        )
310
311        if output == 'activating':
312            return 'running'
313
314        if output == 'active':
315            process = self.get_job_process(name, debug=debug)
316            if process is None:
317                return 'stopped'
318
319            try:
320                if process.status() == 'stopped':
321                    return 'paused'
322            except Exception:
323                return 'stopped'
324
325            return 'running'
326
327        return 'stopped'

Return the job's service status.

def get_job_pid(self, name: str, debug: bool = False) -> Optional[int]:
329    def get_job_pid(self, name: str, debug: bool = False) -> Union[int, None]:
330        """
331        Return the job's service PID.
332        """
333        from meerschaum.utils.misc import is_int
334
335        output = self.run_command(
336            [
337                'show',
338                self.get_service_name(name, debug=debug),
339                '--property=MainPID',
340            ],
341            as_output=True,
342            debug=debug,
343        )
344        if not output.startswith('MainPID='):
345            return None
346
347        pid_str = output[len('MainPID='):]
348        if pid_str == '0':
349            return None
350
351        if is_int(pid_str):
352            return int(pid_str)
353        
354        return None

Return the job's service PID.

def get_job_began(self, name: str, debug: bool = False) -> Optional[str]:
356    def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
357        """
358        Return when a job began running.
359        """
360        output = self.run_command(
361            [
362                'show',
363                self.get_service_name(name, debug=debug),
364                '--property=ActiveEnterTimestamp'
365            ],
366            as_output=True,
367            debug=debug,
368        )
369        if not output.startswith('ActiveEnterTimestamp'):
370            return None
371
372        dt_str = output.split('=')[-1]
373        if not dt_str:
374            return None
375
376        dateutil_parser = mrsm.attempt_import('dateutil.parser')
377        try:
378            dt = dateutil_parser.parse(dt_str)
379        except Exception as e:
380            warn(f"Cannot parse '{output}' as a datetime:\n{e}")
381            return None
382
383        return dt.astimezone(timezone.utc).isoformat()

Return when a job began running.

def get_job_ended(self, name: str, debug: bool = False) -> Optional[str]:
385    def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
386        """
387        Return when a job began running.
388        """
389        output = self.run_command(
390            [
391                'show',
392                self.get_service_name(name, debug=debug),
393                '--property=InactiveEnterTimestamp'
394            ],
395            as_output=True,
396            debug=debug,
397        )
398        if not output.startswith('InactiveEnterTimestamp'):
399            return None
400
401        dt_str = output.split('=')[-1]
402        if not dt_str:
403            return None
404
405        dateutil_parser = mrsm.attempt_import('dateutil.parser')
406
407        try:
408            dt = dateutil_parser.parse(dt_str)
409        except Exception as e:
410            warn(f"Cannot parse '{output}' as a datetime:\n{e}")
411            return None
412        return dt.astimezone(timezone.utc).isoformat()

Return when a job stopped running.

def get_job_paused(self, name: str, debug: bool = False) -> Optional[str]:
414    def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
415        """
416        Return when a job was paused.
417        """
418        job = self.get_hidden_job(name, debug=debug)
419        if self.get_job_status(name, debug=debug) != 'paused':
420            return None
421
422        stop_time = job.stop_time
423        if stop_time is None:
424            return None
425
426        return stop_time.isoformat()

Return when a job was paused.

def get_job_result(self, name: str, debug: bool = False) -> Tuple[bool, str]:
428    def get_job_result(self, name: str, debug: bool = False) -> SuccessTuple:
429        """
430        Return the job's result SuccessTuple.
431        """
432        result_path = self.get_result_path(name, debug=debug)
433        if not result_path.exists():
434            return False, "No result available."
435
436        try:
437            with open(result_path, 'r', encoding='utf-8') as f:
438                result = json.load(f)
439        except Exception:
440            return False, f"Could not read result for Job '{name}'."
441
442        return tuple(result)

Return the job's result SuccessTuple.

def get_job_sysargs(self, name: str, debug: bool = False) -> Optional[List[str]]:
444    def get_job_sysargs(self, name: str, debug: bool = False) -> Union[List[str], None]:
445        """
446        Return the sysargs from the service file.
447        """
448        service_file_path = self.get_service_file_path(name, debug=debug)
449        if not service_file_path.exists():
450            return []
451
452        with open(service_file_path, 'r', encoding='utf-8') as f:
453            service_lines = f.readlines()
454
455        for line in service_lines:
456            if line.startswith('ExecStart='):
457                sysargs_str = line.split(' -m meerschaum ')[-1].split('<')[0]
458                return shlex.split(sysargs_str)
459
460        return []

Return the sysargs from the service file.

def run_command( self, command_args: List[str], as_output: bool = False, debug: bool = False) -> Union[Tuple[bool, str], str]:
462    def run_command(
463        self,
464        command_args: List[str],
465        as_output: bool = False,
466        debug: bool = False,
467    ) -> Union[SuccessTuple, str]:
468        """
469        Run a `systemd` command and return success.
470
471        Parameters
472        ----------
473        command_args: List[str]
474            The command to pass to `systemctl --user`.
475
476        as_output: bool, default False
477            If `True`, return the process stdout output.
478            Defaults to a `SuccessTuple`.
479
480        Returns
481        -------
482        A `SuccessTuple` indicating success or a str for the process output.
483        """
484        from meerschaum.utils.process import run_process
485
486        if command_args[:2] != ['systemctl', '--user']:
487            command_args = ['systemctl', '--user'] + command_args
488
489        if debug:
490            dprint(shlex.join(command_args))
491
492        proc = run_process(
493            command_args,
494            foreground=False,
495            as_proc=True,
496            capture_output=True,
497            text=True,
498        )
499        stdout, stderr = proc.communicate()
500        if debug:
501            dprint(f"{stdout}")
502
503        if as_output:
504            return stdout.strip()
505            
506        command_success = proc.wait() == 0
507        command_msg = (
508            "Success"
509            if command_success
510            else f"Failed to execute command `{shlex.join(command_args)}`."
511        )
512        return command_success, command_msg

Run a systemd command and return success.

Parameters
  • command_args (List[str]): The command to pass to systemctl --user.
  • as_output (bool, default False): If True, return the process stdout output. Defaults to a SuccessTuple.
Returns
  • A SuccessTuple indicating success or a str for the process output.
def get_job_stdin_file(self, name: str, debug: bool = False):
514    def get_job_stdin_file(self, name: str, debug: bool = False):
515        """
516        Return a `StdinFile` for the job.
517        """
518        from meerschaum.utils.daemon import StdinFile
519        if '_stdin_files' not in self.__dict__:
520            self._stdin_files: Dict[str, StdinFile] = {}
521
522        if name not in self._stdin_files:
523            socket_path = self.get_socket_path(name, debug=debug)
524            socket_path.parent.mkdir(parents=True, exist_ok=True)
525            self._stdin_files[name] = StdinFile(socket_path)
526
527        return self._stdin_files[name]

Return a StdinFile for the job.

def create_job( self, name: str, sysargs: List[str], properties: Optional[Dict[str, Any]] = None, debug: bool = False) -> Tuple[bool, str]:
529    def create_job(
530        self,
531        name: str,
532        sysargs: List[str],
533        properties: Optional[Dict[str, Any]] = None,
534        debug: bool = False,
535    ) -> SuccessTuple:
536        """
537        Create a job as a service to be run by `systemd`.
538        """
539        from meerschaum.utils.misc import make_symlink
540        service_name = self.get_service_name(name, debug=debug)
541        service_file_path = self.get_service_file_path(name, debug=debug)
542        service_symlink_file_path = self.get_service_symlink_file_path(name, debug=debug)
543        socket_stdin = self.get_job_stdin_file(name, debug=debug)
544        _ = socket_stdin.file_handler
545
546        ### Init the externally_managed file.
547        ### NOTE: We must write the pickle file in addition to the properties file.
548        job = self.get_hidden_job(name, sysargs=sysargs, properties=properties, debug=debug)
549        job._set_externally_managed()
550        pickle_success, pickle_msg = job.daemon.write_pickle()
551        if not pickle_success:
552            return pickle_success, pickle_msg
553        properties_success, properties_msg = job.daemon.write_properties()
554        if not properties_success:
555            return properties_success, properties_msg
556
557        service_file_path.parent.mkdir(parents=True, exist_ok=True)
558
559        with open(service_file_path, 'w+', encoding='utf-8') as f:
560            f.write(self.get_service_file_text(name, sysargs, debug=debug))
561
562        symlink_success, symlink_msg = make_symlink(service_file_path, service_symlink_file_path)
563        if not symlink_success:
564            return symlink_success, symlink_msg
565
566        commands = [
567            ['daemon-reload'],
568            ['enable', service_name],
569            ['start', service_name],
570        ]
571
572        fails = 0
573        for command_list in commands:
574            command_success, command_msg = self.run_command(command_list, debug=debug)
575            if not command_success:
576                fails += 1
577
578        if fails > 1:
579            return False, "Failed to reload systemd."
580
581        return True, f"Started job '{name}' via systemd."

Create a job as a service to be run by systemd.

def start_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
583    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
584        """
585        Stop a job's service.
586        """
587        job = self.get_hidden_job(name, debug=debug)
588        job.daemon._remove_stop_file()
589
590        status = self.get_job_status(name, debug=debug)
591        if status == 'paused':
592            return self.run_command(
593                ['kill', '-s', 'SIGCONT', self.get_service_name(name, debug=debug)],
594                debug=debug,
595            )
596
597        return self.run_command(
598            ['start', self.get_service_name(name, debug=debug)],
599            debug=debug,
600        )

Start (or resume) a job's service.

def stop_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
602    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
603        """
604        Stop a job's service.
605        """
606        job = self.get_hidden_job(name, debug=debug)
607        job.daemon._write_stop_file('quit')
608        sigint_success, sigint_msg = self.run_command(
609            ['kill', '-s', 'SIGINT', self.get_service_name(name, debug=debug)],
610            debug=debug,
611        )
612
613        check_timeout_interval = get_config('jobs', 'check_timeout_interval_seconds')
614        loop_start = time.perf_counter()
615        timeout_seconds = get_config('jobs', 'timeout_seconds')
616        while (time.perf_counter() - loop_start) < timeout_seconds:
617            if self.get_job_status(name, debug=debug) == 'stopped':
618                return True, 'Success'
619
620            time.sleep(check_timeout_interval)
621
622        return self.run_command(
623            ['stop', self.get_service_name(name, debug=debug)],
624            debug=debug,
625        )

Stop a job's service.

def pause_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
627    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
628        """
629        Pause a job's service.
630        """
631        job = self.get_hidden_job(name, debug=debug)
632        job.daemon._write_stop_file('pause')
633        return self.run_command(
634            ['kill', '-s', 'SIGSTOP', self.get_service_name(name, debug=debug)],
635            debug=debug,
636        )

Pause a job's service.

def delete_job(self, name: str, debug: bool = False) -> Tuple[bool, str]:
638    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
639        """
640        Delete a job's service.
641        """
642        from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
643        job = self.get_hidden_job(name, debug=debug)
644
645        if not job.delete_after_completion:
646            _ = self.stop_job(name, debug=debug)
647            _ = self.run_command(
648                ['disable', self.get_service_name(name, debug=debug)],
649                debug=debug,
650            )
651
652        service_job_path = self.get_service_job_path(name, debug=debug)
653        try:
654            if service_job_path.exists():
655                shutil.rmtree(service_job_path)
656        except Exception as e:
657            warn(e)
658            return False, str(e)
659
660        service_logs_path = self.get_service_logs_path(name, debug=debug)
661        logs_paths = [
662            (SYSTEMD_LOGS_RESOURCES_PATH / name)
663            for name in os.listdir(SYSTEMD_LOGS_RESOURCES_PATH)
664            if name.startswith(service_logs_path.name + '.')
665        ]
666        paths = [
667            self.get_service_file_path(name, debug=debug),
668            self.get_service_symlink_file_path(name, debug=debug),
669            self.get_socket_path(name, debug=debug),
670            self.get_result_path(name, debug=debug),
671        ] + logs_paths
672
673        for path in paths:
674            if path.exists():
675                try:
676                    path.unlink()
677                except Exception as e:
678                    warn(e)
679                    return False, str(e)
680
681        _ = job.delete()
682
683        return self.run_command(['daemon-reload'], debug=debug)

Delete a job's service.

def get_logs(self, name: str, debug: bool = False) -> str:
685    def get_logs(self, name: str, debug: bool = False) -> str:
686        """
687        Return a job's journal logs.
688        """
689        rotating_file = self.get_job_rotating_file(name, debug=debug)
690        return rotating_file.read()

Return a job's logs, as read from its rotating log file.

def get_job_stop_time(self, name: str, debug: bool = False) -> Optional[datetime.datetime]:
692    def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
693        """
694        Return a job's stop time.
695        """
696        job = self.get_hidden_job(name, debug=debug)
697        return job.stop_time

Return a job's stop time.

def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
699    def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
700        """
701        Return whether a job is blocking on stdin.
702        """
703        socket_path = self.get_socket_path(name, debug=debug)
704        blocking_path = socket_path.parent / (socket_path.name + '.block')
705        return blocking_path.exists()

Return whether a job is blocking on stdin.

def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]:
707    def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]:
708        """
709        Return the kwargs to the blocking prompt.
710        """
711        job = self.get_hidden_job(name, debug=debug)
712        return job.get_prompt_kwargs(debug=debug)

Return the kwargs to the blocking prompt.

def get_job_rotating_file(self, name: str, debug: bool = False):
714    def get_job_rotating_file(self, name: str, debug: bool = False):
715        """
716        Return a `RotatingFile` for the job's log output.
717        """
718        from meerschaum.utils.daemon import RotatingFile
719        service_logs_path = self.get_service_logs_path(name, debug=debug)
720        return RotatingFile(service_logs_path)

Return a RotatingFile for the job's log output.

async def monitor_logs_async(self, name: str, *args, debug: bool = False, **kwargs):
722    async def monitor_logs_async(
723        self,
724        name: str,
725        *args,
726        debug: bool = False,
727        **kwargs
728    ):
729        """
730        Monitor a job's output.
731        """
732        from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
733        job = self.get_hidden_job(name, debug=debug)
734        kwargs.update({
735            '_logs_path': SYSTEMD_LOGS_RESOURCES_PATH,
736            '_log': self.get_job_rotating_file(name, debug=debug),
737            '_stdin_file': self.get_job_stdin_file(name, debug=debug),
738            'debug': debug,
739        })
740        await job.monitor_logs_async(*args, **kwargs)

Monitor a job's output.

def monitor_logs(self, *args, **kwargs):
742    def monitor_logs(self, *args, **kwargs):
743        """
744        Monitor a job's output.
745        """
746        asyncio.run(self.monitor_logs_async(*args, **kwargs))

Monitor a job's output.