meerschaum.utils.daemon.Daemon

Manage running daemons via the Daemon class.

   1#! /usr/bin/env python3
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4
   5"""
   6Manage running daemons via the Daemon class.
   7"""
   8
   9from __future__ import annotations
  10import os
  11import importlib
  12import pathlib
  13import json
  14import shutil
  15import signal
  16import time
  17import traceback
  18from functools import partial
  19from datetime import datetime, timezone
  20
  21import meerschaum as mrsm
  22from meerschaum.utils.typing import (
  23    Optional, Dict, Any, SuccessTuple, Callable, List, Union,
  24    is_success_tuple, Tuple,
  25)
  26from meerschaum.config import get_config
  27from meerschaum.config.static import STATIC_CONFIG
  28from meerschaum.config._paths import (
  29    DAEMON_RESOURCES_PATH, LOGS_RESOURCES_PATH, DAEMON_ERROR_LOG_PATH,
  30)
  31from meerschaum.config._patch import apply_patch_to_config
  32from meerschaum.utils.warnings import warn, error
  33from meerschaum.utils.packages import attempt_import
  34from meerschaum.utils.venv import venv_exec
  35from meerschaum.utils.daemon._names import get_new_daemon_name
  36from meerschaum.utils.daemon.RotatingFile import RotatingFile
  37from meerschaum.utils.daemon.StdinFile import StdinFile
  38from meerschaum.utils.threading import RepeatTimer
  39from meerschaum.__main__ import _close_pools
  40
  41_daemons = []
  42_results = {}
  43
  44class Daemon:
  45    """
  46    Daemonize Python functions into background processes.
  47
  48    Examples
  49    --------
  50    >>> import meerschaum as mrsm
  51    >>> from meerschaum.utils.daemons import Daemon
  52    >>> daemon = Daemon(print, ('hi',))
  53    >>> success, msg = daemon.run()
  54    >>> print(daemon.log_text)
  55
  56    2024-07-29 18:03 | hi
  57    2024-07-29 18:03 |
  58    >>> daemon.run(allow_dirty_run=True)
  59    >>> print(daemon.log_text)
  60
  61    2024-07-29 18:03 | hi
  62    2024-07-29 18:03 |
  63    2024-07-29 18:05 | hi
  64    2024-07-29 18:05 |
  65    >>> mrsm.pprint(daemon.properties)
  66    {
  67        'label': 'print',
  68        'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
  69        'result': None,
  70        'process': {'ended': '2024-07-29T18:03:33.752806'}
  71    }
  72
  73    """
  74
  75    def __new__(
  76        cls,
  77        *args,
  78        daemon_id: Optional[str] = None,
  79        **kw
  80    ):
  81        """
  82        If a daemon_id is provided and already exists, read from its pickle file.
  83        """
  84        instance = super(Daemon, cls).__new__(cls)
  85        if daemon_id is not None:
  86            instance.daemon_id = daemon_id
  87            if instance.pickle_path.exists():
  88                instance = instance.read_pickle()
  89        return instance
  90
  91    @classmethod
  92    def from_properties_file(cls, daemon_id: str) -> Daemon:
  93        """
  94        Return a Daemon from a properties dictionary.
  95        """
  96        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
  97        if not properties_path.exists():
  98            raise OSError(f"Properties file '{properties_path}' does not exist.")
  99
 100        try:
 101            with open(properties_path, 'r', encoding='utf-8') as f:
 102                properties = json.load(f)
 103        except Exception:
 104            properties = {}
 105
 106        if not properties:
 107            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")
 108
 109        daemon_id = properties_path.parent.name
 110        target_cf = properties.get('target', {})
 111        target_module_name = target_cf.get('module', None)
 112        target_function_name = target_cf.get('name', None)
 113        target_args = target_cf.get('args', None)
 114        target_kw = target_cf.get('kw', None)
 115        label = properties.get('label', None)
 116
 117        if None in [
 118            target_module_name,
 119            target_function_name,
 120            target_args,
 121            target_kw,
 122        ]:
 123            raise ValueError("Missing target function information.")
 124
 125        target_module = importlib.import_module(target_module_name)
 126        target_function = getattr(target_module, target_function_name)
 127
 128        return Daemon(
 129            daemon_id=daemon_id,
 130            target=target_function,
 131            target_args=target_args,
 132            target_kw=target_kw,
 133            properties=properties,
 134            label=label,
 135        )
 136
 137
 138    def __init__(
 139        self,
 140        target: Optional[Callable[[Any], Any]] = None,
 141        target_args: Union[List[Any], Tuple[Any], None] = None,
 142        target_kw: Optional[Dict[str, Any]] = None,
 143        env: Optional[Dict[str, str]] = None,
 144        daemon_id: Optional[str] = None,
 145        label: Optional[str] = None,
 146        properties: Optional[Dict[str, Any]] = None,
 147    ):
 148        """
 149        Parameters
 150        ----------
 151        target: Optional[Callable[[Any], Any]], default None,
 152            The function to execute in a child process.
 153
 154        target_args: Union[List[Any], Tuple[Any], None], default None
 155            Positional arguments to pass to the target function.
 156
 157        target_kw: Optional[Dict[str, Any]], default None
 158            Keyword arguments to pass to the target function.
 159
 160        env: Optional[Dict[str, str]], default None
 161            If provided, set these environment variables in the daemon process.
 162
 163        daemon_id: Optional[str], default None
 164            Build a `Daemon` from an existing `daemon_id`.
 165            If `daemon_id` is provided, other arguments are ignored and are derived
 166            from the existing pickled `Daemon`.
 167
 168        label: Optional[str], default None
 169            Label string to help identifiy a daemon.
 170            If `None`, use the function name instead.
 171
 172        properties: Optional[Dict[str, Any]], default None
 173            Override reading from the properties JSON by providing an existing dictionary.
 174        """
 175        _pickle = self.__dict__.get('_pickle', False)
 176        if daemon_id is not None:
 177            self.daemon_id = daemon_id
 178            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):
 179
 180                if not self.properties_path.exists():
 181                    raise Exception(
 182                        f"Daemon '{self.daemon_id}' does not exist. "
 183                        + "Pass a target to create a new Daemon."
 184                    )
 185
 186                try:
 187                    new_daemon = self.from_properties_file(daemon_id)
 188                except Exception:
 189                    new_daemon = None
 190
 191                if new_daemon is not None:
 192                    new_daemon.write_pickle()
 193                    target = new_daemon.target
 194                    target_args = new_daemon.target_args
 195                    target_kw = new_daemon.target_kw
 196                    label = new_daemon.label
 197                    self._properties = new_daemon.properties
 198                else:
 199                    try:
 200                        self.properties_path.unlink()
 201                    except Exception:
 202                        pass
 203
 204                    raise Exception(
 205                        f"Could not recover daemon '{self.daemon_id}' "
 206                        + "from its properties file."
 207                    )
 208
 209        if 'target' not in self.__dict__:
 210            if target is None:
 211                error("Cannot create a Daemon without a target.")
 212            self.target = target
 213
 214        ### NOTE: We have to check self.__dict__ in case we un-pickling.
 215        if '_target_args' not in self.__dict__:
 216            self._target_args = target_args
 217        if '_target_kw' not in self.__dict__:
 218            self._target_kw = target_kw
 219
 220        if 'label' not in self.__dict__:
 221            if label is None:
 222                label = (
 223                    self.target.__name__ if '__name__' in self.target.__dir__()
 224                        else str(self.target)
 225                )
 226            self.label = label
 227        if 'daemon_id' not in self.__dict__:
 228            self.daemon_id = get_new_daemon_name()
 229        if '_properties' not in self.__dict__:
 230            self._properties = properties
 231        if self._properties is None:
 232            self._properties = {}
 233
 234        self._properties.update({'label': self.label})
 235        if env:
 236            self._properties.update({'env': env})
 237
 238        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
 239        _ = self.process
 240
 241
 242    def _run_exit(
 243        self,
 244        keep_daemon_output: bool = True,
 245        allow_dirty_run: bool = False,
 246    ) -> Any:
 247        """Run the daemon's target function.
 248        NOTE: This WILL EXIT the parent process!
 249
 250        Parameters
 251        ----------
 252        keep_daemon_output: bool, default True
 253            If `False`, delete the daemon's output directory upon exiting.
 254
 255        allow_dirty_run, bool, default False:
 256            If `True`, run the daemon, even if the `daemon_id` directory exists.
 257            This option is dangerous because if the same `daemon_id` runs twice,
 258            the last to finish will overwrite the output of the first.
 259
 260        Returns
 261        -------
 262        Nothing — this will exit the parent process.
 263        """
 264        import platform
 265        import sys
 266        import os
 267        import traceback
 268        from meerschaum.utils.warnings import warn
 269        from meerschaum.config import get_config
 270        daemon = attempt_import('daemon')
 271        lines = get_config('jobs', 'terminal', 'lines')
 272        columns = get_config('jobs', 'terminal', 'columns')
 273
 274        if platform.system() == 'Windows':
 275            return False, "Windows is no longer supported."
 276
 277        self._setup(allow_dirty_run)
 278
 279        ### NOTE: The SIGINT handler has been removed so that child processes may handle
 280        ###       KeyboardInterrupts themselves.
 281        ###       The previous aggressive approach was redundant because of the SIGTERM handler.
 282        self._daemon_context = daemon.DaemonContext(
 283            pidfile=self.pid_lock,
 284            stdout=self.rotating_log,
 285            stderr=self.rotating_log,
 286            working_directory=os.getcwd(),
 287            detach_process=True,
 288            files_preserve=list(self.rotating_log.subfile_objects.values()),
 289            signal_map={
 290                signal.SIGTERM: self._handle_sigterm,
 291            },
 292        )
 293
 294        _daemons.append(self)
 295
 296        log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds')
 297        self._log_refresh_timer = RepeatTimer(
 298            log_refresh_seconds,
 299            partial(self.rotating_log.refresh_files, start_interception=True),
 300        )
 301
 302        try:
 303            os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns))
 304            with self._daemon_context:
 305                sys.stdin = self.stdin_file
 306                _ = os.environ.pop(STATIC_CONFIG['environment']['systemd_stdin_path'], None)
 307                os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id
 308                os.environ['PYTHONUNBUFFERED'] = '1'
 309
 310                ### Allow the user to override environment variables.
 311                env = self.properties.get('env', {})
 312                if env and isinstance(env, dict):
 313                    os.environ.update({str(k): str(v) for k, v in env.items()})
 314
 315                self.rotating_log.refresh_files(start_interception=True)
 316                result = None
 317                try:
 318                    with open(self.pid_path, 'w+', encoding='utf-8') as f:
 319                        f.write(str(os.getpid()))
 320
 321                    ### NOTE: The timer fails to start for remote actions to localhost.
 322                    try:
 323                        if not self._log_refresh_timer.is_running():
 324                            self._log_refresh_timer.start()
 325                    except Exception:
 326                        pass
 327
 328                    self.properties['result'] = None
 329                    self._capture_process_timestamp('began')
 330                    result = self.target(*self.target_args, **self.target_kw)
 331                    self.properties['result'] = result
 332                except (BrokenPipeError, KeyboardInterrupt, SystemExit):
 333                    result = False, traceback.format_exc()
 334                except Exception as e:
 335                    warn(
 336                        f"Exception in daemon target function: {traceback.format_exc()}",
 337                    )
 338                    result = False, str(e)
 339                finally:
 340                    _results[self.daemon_id] = result
 341
 342                    if keep_daemon_output:
 343                        self._capture_process_timestamp('ended')
 344                    else:
 345                        self.cleanup()
 346
 347                    self._log_refresh_timer.cancel()
 348                    try:
 349                        if self.pid is None and self.pid_path.exists():
 350                            self.pid_path.unlink()
 351                    except Exception:
 352                        pass
 353
 354                    if is_success_tuple(result):
 355                        try:
 356                            mrsm.pprint(result)
 357                        except BrokenPipeError:
 358                            pass
 359
 360        except Exception:
 361            daemon_error = traceback.format_exc()
 362            with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
 363                f.write(daemon_error)
 364            warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}")
 365
 366    def _capture_process_timestamp(
 367        self,
 368        process_key: str,
 369        write_properties: bool = True,
 370    ) -> None:
 371        """
 372        Record the current timestamp to the parameters `process:<process_key>`.
 373
 374        Parameters
 375        ----------
 376        process_key: str
 377            Under which key to store the timestamp.
 378
 379        write_properties: bool, default True
 380            If `True` persist the properties to disk immediately after capturing the timestamp.
 381        """
 382        if 'process' not in self.properties:
 383            self.properties['process'] = {}
 384
 385        if process_key not in ('began', 'ended', 'paused', 'stopped'):
 386            raise ValueError(f"Invalid key '{process_key}'.")
 387
 388        self.properties['process'][process_key] = (
 389            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
 390        )
 391        if write_properties:
 392            self.write_properties()
 393
 394    def run(
 395        self,
 396        keep_daemon_output: bool = True,
 397        allow_dirty_run: bool = False,
 398        debug: bool = False,
 399    ) -> SuccessTuple:
 400        """Run the daemon as a child process and continue executing the parent.
 401
 402        Parameters
 403        ----------
 404        keep_daemon_output: bool, default True
 405            If `False`, delete the daemon's output directory upon exiting.
 406
 407        allow_dirty_run: bool, default False
 408            If `True`, run the daemon, even if the `daemon_id` directory exists.
 409            This option is dangerous because if the same `daemon_id` runs concurrently,
 410            the last to finish will overwrite the output of the first.
 411
 412        Returns
 413        -------
 414        A SuccessTuple indicating success.
 415
 416        """
 417        import platform
 418        if platform.system() == 'Windows':
 419            return False, "Cannot run background jobs on Windows."
 420
 421        ### The daemon might exist and be paused.
 422        if self.status == 'paused':
 423            return self.resume()
 424
 425        self._remove_stop_file()
 426        if self.status == 'running':
 427            return True, f"Daemon '{self}' is already running."
 428
 429        self.mkdir_if_not_exists(allow_dirty_run)
 430        _write_pickle_success_tuple = self.write_pickle()
 431        if not _write_pickle_success_tuple[0]:
 432            return _write_pickle_success_tuple
 433
 434        _launch_daemon_code = (
 435            "from meerschaum.utils.daemon import Daemon; "
 436            + f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
 437            + f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
 438            + "allow_dirty_run=True)"
 439        )
 440        env = dict(os.environ)
 441        env[STATIC_CONFIG['environment']['noninteractive']] = 'true'
 442        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
 443        msg = (
 444            "Success"
 445            if _launch_success_bool
 446            else f"Failed to start daemon '{self.daemon_id}'."
 447        )
 448        return _launch_success_bool, msg
 449
 450    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
 451        """
 452        Forcibly terminate a running daemon.
 453        Sends a SIGTERM signal to the process.
 454
 455        Parameters
 456        ----------
 457        timeout: Optional[int], default 3
 458            How many seconds to wait for the process to terminate.
 459
 460        Returns
 461        -------
 462        A SuccessTuple indicating success.
 463        """
 464        if self.status != 'paused':
 465            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
 466            if success:
 467                self._write_stop_file('kill')
 468                return success, msg
 469
 470        if self.status == 'stopped':
 471            self._write_stop_file('kill')
 472            return True, "Process has already stopped."
 473
 474        psutil = attempt_import('psutil')
 475        process = self.process
 476        try:
 477            process.terminate()
 478            process.kill()
 479            process.wait(timeout=timeout)
 480        except Exception as e:
 481            return False, f"Failed to kill job {self} ({process}) with exception: {e}"
 482
 483        try:
 484            if process.status():
 485                return False, "Failed to stop daemon '{self}' ({process})."
 486        except psutil.NoSuchProcess:
 487            pass
 488
 489        if self.pid_path.exists():
 490            try:
 491                self.pid_path.unlink()
 492            except Exception:
 493                pass
 494
 495        self._write_stop_file('kill')
 496        return True, "Success"
 497
 498    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
 499        """Gracefully quit a running daemon."""
 500        if self.status == 'paused':
 501            return self.kill(timeout)
 502
 503        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
 504        if signal_success:
 505            self._write_stop_file('quit')
 506        return signal_success, signal_msg
 507
 508    def pause(
 509        self,
 510        timeout: Union[int, float, None] = None,
 511        check_timeout_interval: Union[float, int, None] = None,
 512    ) -> SuccessTuple:
 513        """
 514        Pause the daemon if it is running.
 515
 516        Parameters
 517        ----------
 518        timeout: Union[float, int, None], default None
 519            The maximum number of seconds to wait for a process to suspend.
 520
 521        check_timeout_interval: Union[float, int, None], default None
 522            The number of seconds to wait between checking if the process is still running.
 523
 524        Returns
 525        -------
 526        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
 527        """
 528        if self.process is None:
 529            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."
 530
 531        if self.status == 'paused':
 532            return True, f"Daemon '{self.daemon_id}' is already paused."
 533
 534        self._write_stop_file('pause')
 535        try:
 536            self.process.suspend()
 537        except Exception as e:
 538            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"
 539
 540        timeout = self.get_timeout_seconds(timeout)
 541        check_timeout_interval = self.get_check_timeout_interval_seconds(
 542            check_timeout_interval
 543        )
 544
 545        psutil = attempt_import('psutil')
 546
 547        if not timeout:
 548            try:
 549                success = self.process.status() == 'stopped'
 550            except psutil.NoSuchProcess:
 551                success = True
 552            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
 553            if success:
 554                self._capture_process_timestamp('paused')
 555            return success, msg
 556
 557        begin = time.perf_counter()
 558        while (time.perf_counter() - begin) < timeout:
 559            try:
 560                if self.process.status() == 'stopped':
 561                    self._capture_process_timestamp('paused')
 562                    return True, "Success"
 563            except psutil.NoSuchProcess as e:
 564                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
 565            time.sleep(check_timeout_interval)
 566
 567        return False, (
 568            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
 569            + ('s' if timeout != 1 else '') + '.'
 570        )
 571
 572    def resume(
 573        self,
 574        timeout: Union[int, float, None] = None,
 575        check_timeout_interval: Union[float, int, None] = None,
 576    ) -> SuccessTuple:
 577        """
 578        Resume the daemon if it is paused.
 579
 580        Parameters
 581        ----------
 582        timeout: Union[float, int, None], default None
 583            The maximum number of seconds to wait for a process to resume.
 584
 585        check_timeout_interval: Union[float, int, None], default None
 586            The number of seconds to wait between checking if the process is still stopped.
 587
 588        Returns
 589        -------
 590        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
 591        """
 592        if self.status == 'running':
 593            return True, f"Daemon '{self.daemon_id}' is already running."
 594
 595        if self.status == 'stopped':
 596            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."
 597
 598        self._remove_stop_file()
 599        try:
 600            self.process.resume()
 601        except Exception as e:
 602            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"
 603
 604        timeout = self.get_timeout_seconds(timeout)
 605        check_timeout_interval = self.get_check_timeout_interval_seconds(
 606            check_timeout_interval
 607        )
 608
 609        if not timeout:
 610            success = self.status == 'running'
 611            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
 612            if success:
 613                self._capture_process_timestamp('began')
 614            return success, msg
 615
 616        begin = time.perf_counter()
 617        while (time.perf_counter() - begin) < timeout:
 618            if self.status == 'running':
 619                self._capture_process_timestamp('began')
 620                return True, "Success"
 621            time.sleep(check_timeout_interval)
 622
 623        return False, (
 624            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
 625            + ('s' if timeout != 1 else '') + '.'
 626        )
 627
 628    def _write_stop_file(self, action: str) -> SuccessTuple:
 629        """Write the stop file timestamp and action."""
 630        if action not in ('quit', 'kill', 'pause'):
 631            return False, f"Unsupported action '{action}'."
 632
 633        if not self.stop_path.parent.exists():
 634            self.stop_path.parent.mkdir(parents=True, exist_ok=True)
 635
 636        with open(self.stop_path, 'w+', encoding='utf-8') as f:
 637            json.dump(
 638                {
 639                    'stop_time': datetime.now(timezone.utc).isoformat(),
 640                    'action': action,
 641                },
 642                f
 643            )
 644
 645        return True, "Success"
 646
 647    def _remove_stop_file(self) -> SuccessTuple:
 648        """Remove the stop file"""
 649        if not self.stop_path.exists():
 650            return True, "Stop file does not exist."
 651
 652        try:
 653            self.stop_path.unlink()
 654        except Exception as e:
 655            return False, f"Failed to remove stop file:\n{e}"
 656
 657        return True, "Success"
 658
 659    def _read_stop_file(self) -> Dict[str, Any]:
 660        """
 661        Read the stop file if it exists.
 662        """
 663        if not self.stop_path.exists():
 664            return {}
 665
 666        try:
 667            with open(self.stop_path, 'r', encoding='utf-8') as f:
 668                data = json.load(f)
 669            return data
 670        except Exception:
 671            return {}
 672
 673    def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None:
 674        """
 675        Handle `SIGTERM` within the `Daemon` context.
 676        This method is injected into the `DaemonContext`.
 677        """
 678        from meerschaum.utils.process import signal_handler
 679        signal_handler(signal_number, stack_frame)
 680
 681        timer = self.__dict__.get('_log_refresh_timer', None)
 682        if timer is not None:
 683            timer.cancel()
 684
 685        daemon_context = self.__dict__.get('_daemon_context', None)
 686        if daemon_context is not None:
 687            daemon_context.close()
 688
 689        _close_pools()
 690        raise SystemExit(0)
 691
 692    def _send_signal(
 693        self,
 694        signal_to_send,
 695        timeout: Union[float, int, None] = None,
 696        check_timeout_interval: Union[float, int, None] = None,
 697    ) -> SuccessTuple:
 698        """Send a signal to the daemon process.
 699
 700        Parameters
 701        ----------
 702        signal_to_send:
 703            The signal the send to the daemon, e.g. `signals.SIGINT`.
 704
 705        timeout: Union[float, int, None], default None
 706            The maximum number of seconds to wait for a process to terminate.
 707
 708        check_timeout_interval: Union[float, int, None], default None
 709            The number of seconds to wait between checking if the process is still running.
 710
 711        Returns
 712        -------
 713        A SuccessTuple indicating success.
 714        """
 715        try:
 716            pid = self.pid
 717            if pid is None:
 718                return (
 719                    False,
 720                    f"Daemon '{self.daemon_id}' is not running, "
 721                    + f"cannot send signal '{signal_to_send}'."
 722                )
 723            
 724            os.kill(pid, signal_to_send)
 725        except Exception:
 726            return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}"
 727
 728        timeout = self.get_timeout_seconds(timeout)
 729        check_timeout_interval = self.get_check_timeout_interval_seconds(
 730            check_timeout_interval
 731        )
 732
 733        if not timeout:
 734            return True, f"Successfully sent '{signal}' to daemon '{self.daemon_id}'."
 735
 736        begin = time.perf_counter()
 737        while (time.perf_counter() - begin) < timeout:
 738            if not self.status == 'running':
 739                return True, "Success"
 740            time.sleep(check_timeout_interval)
 741
 742        return False, (
 743            f"Failed to stop daemon '{self.daemon_id}' (PID: {pid}) within {timeout} second"
 744            + ('s' if timeout != 1 else '') + '.'
 745        )
 746
 747    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
 748        """Create the Daemon's directory.
 749        If `allow_dirty_run` is `False` and the directory already exists,
 750        raise a `FileExistsError`.
 751        """
 752        try:
 753            self.path.mkdir(parents=True, exist_ok=True)
 754            _already_exists = any(os.scandir(self.path))
 755        except FileExistsError:
 756            _already_exists = True
 757
 758        if _already_exists and not allow_dirty_run:
 759            error(
 760                f"Daemon '{self.daemon_id}' already exists. " +
 761                "To allow this daemon to run, do one of the following:\n"
 762                + "  - Execute `daemon.cleanup()`.\n"
 763                + f"  - Delete the directory '{self.path}'.\n"
 764                + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
 765                FileExistsError,
 766            )
 767
 768    @property
 769    def process(self) -> Union['psutil.Process', None]:
 770        """
 771        Return the psutil process for the Daemon.
 772        """
 773        psutil = attempt_import('psutil')
 774        pid = self.pid
 775        if pid is None:
 776            return None
 777        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
 778            try:
 779                self._process = psutil.Process(int(pid))
 780                process_exists = True
 781            except Exception:
 782                process_exists = False
 783            if not process_exists:
 784                _ = self.__dict__.pop('_process', None)
 785                try:
 786                    if self.pid_path.exists():
 787                        self.pid_path.unlink()
 788                except Exception:
 789                    pass
 790                return None
 791        return self._process
 792
 793    @property
 794    def status(self) -> str:
 795        """
 796        Return the running status of this Daemon.
 797        """
 798        if self.process is None:
 799            return 'stopped'
 800
 801        psutil = attempt_import('psutil')
 802        try:
 803            if self.process.status() == 'stopped':
 804                return 'paused'
 805            if self.process.status() == 'zombie':
 806                raise psutil.NoSuchProcess(self.process.pid)
 807        except (psutil.NoSuchProcess, AttributeError):
 808            if self.pid_path.exists():
 809                try:
 810                    self.pid_path.unlink()
 811                except Exception:
 812                    pass
 813            return 'stopped'
 814
 815        return 'running'
 816
 817    @classmethod
 818    def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 819        """
 820        Return a Daemon's path from its `daemon_id`.
 821        """
 822        return DAEMON_RESOURCES_PATH / daemon_id
 823
 824    @property
 825    def path(self) -> pathlib.Path:
 826        """
 827        Return the path for this Daemon's directory.
 828        """
 829        return self._get_path_from_daemon_id(self.daemon_id)
 830
 831    @classmethod
 832    def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 833        """
 834        Return the `properties.json` path for a given `daemon_id`.
 835        """
 836        return cls._get_path_from_daemon_id(daemon_id) / 'properties.json'
 837
 838    @property
 839    def properties_path(self) -> pathlib.Path:
 840        """
 841        Return the `propterties.json` path for this Daemon.
 842        """
 843        return self._get_properties_path_from_daemon_id(self.daemon_id)
 844
 845    @property
 846    def stop_path(self) -> pathlib.Path:
 847        """
 848        Return the path for the stop file (created when manually stopped).
 849        """
 850        return self.path / '.stop.json'
 851
 852    @property
 853    def log_path(self) -> pathlib.Path:
 854        """
 855        Return the log path.
 856        """
 857        return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')
 858
 859    @property
 860    def stdin_file_path(self) -> pathlib.Path:
 861        """
 862        Return the stdin file path.
 863        """
 864        return self.path / 'input.stdin'
 865
 866    @property
 867    def blocking_stdin_file_path(self) -> pathlib.Path:
 868        """
 869        Return the stdin file path.
 870        """
 871        if '_blocking_stdin_file_path' in self.__dict__:
 872            return self._blocking_stdin_file_path
 873
 874        return self.path / 'input.stdin.block'
 875
 876    @property
 877    def log_offset_path(self) -> pathlib.Path:
 878        """
 879        Return the log offset file path.
 880        """
 881        return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')
 882
 883    @property
 884    def rotating_log(self) -> RotatingFile:
 885        """
 886        The rotating log file for the daemon's output.
 887        """
 888        if '_rotating_log' in self.__dict__:
 889            return self._rotating_log
 890
 891        write_timestamps = (
 892            self.properties.get('logs', {}).get('write_timestamps', None)
 893        )
 894        if write_timestamps is None:
 895            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
 896
 897        self._rotating_log = RotatingFile(
 898            self.log_path,
 899            redirect_streams=True,
 900            write_timestamps=write_timestamps,
 901            timestamp_format=get_config('jobs', 'logs', 'timestamps', 'format'),
 902        )
 903        return self._rotating_log
 904
 905    @property
 906    def stdin_file(self):
 907        """
 908        Return the file handler for the stdin file.
 909        """
 910        if (stdin_file := self.__dict__.get('_stdin_file', None)):
 911            return stdin_file
 912
 913        self._stdin_file = StdinFile(
 914            self.stdin_file_path,
 915            lock_file_path=self.blocking_stdin_file_path,
 916        )
 917        return self._stdin_file
 918
 919    @property
 920    def log_text(self) -> Optional[str]:
 921        """
 922        Read the log files and return their contents.
 923        Returns `None` if the log file does not exist.
 924        """
 925        new_rotating_log = RotatingFile(
 926            self.rotating_log.file_path,
 927            num_files_to_keep = self.rotating_log.num_files_to_keep,
 928            max_file_size = self.rotating_log.max_file_size,
 929            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled'),
 930            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format'),
 931        )
 932        return new_rotating_log.read()
 933
 934    def readlines(self) -> List[str]:
 935        """
 936        Read the next log lines, persisting the cursor for later use.
 937        Note this will alter the cursor of `self.rotating_log`.
 938        """
 939        self.rotating_log._cursor = self._read_log_offset()
 940        lines = self.rotating_log.readlines()
 941        self._write_log_offset()
 942        return lines
 943
 944    def _read_log_offset(self) -> Tuple[int, int]:
 945        """
 946        Return the current log offset cursor.
 947
 948        Returns
 949        -------
 950        A tuple of the form (`subfile_index`, `position`).
 951        """
 952        if not self.log_offset_path.exists():
 953            return 0, 0
 954
 955        with open(self.log_offset_path, 'r', encoding='utf-8') as f:
 956            cursor_text = f.read()
 957        cursor_parts = cursor_text.split(' ')
 958        subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1])
 959        return subfile_index, subfile_position
 960
 961    def _write_log_offset(self) -> None:
 962        """
 963        Write the current log offset file.
 964        """
 965        with open(self.log_offset_path, 'w+', encoding='utf-8') as f:
 966            subfile_index = self.rotating_log._cursor[0]
 967            subfile_position = self.rotating_log._cursor[1]
 968            f.write(f"{subfile_index} {subfile_position}")
 969
 970    @property
 971    def pid(self) -> Union[int, None]:
 972        """
 973        Read the PID file and return its contents.
 974        Returns `None` if the PID file does not exist.
 975        """
 976        if not self.pid_path.exists():
 977            return None
 978        try:
 979            with open(self.pid_path, 'r', encoding='utf-8') as f:
 980                text = f.read()
 981            if len(text) == 0:
 982                return None
 983            pid = int(text.rstrip())
 984        except Exception as e:
 985            warn(e)
 986            text = None
 987            pid = None
 988        return pid
 989
 990    @property
 991    def pid_path(self) -> pathlib.Path:
 992        """
 993        Return the path to a file containing the PID for this Daemon.
 994        """
 995        return self.path / 'process.pid'
 996
 997    @property
 998    def pid_lock(self) -> 'fasteners.InterProcessLock':
 999        """
1000        Return the process lock context manager.
1001        """
1002        if '_pid_lock' in self.__dict__:
1003            return self._pid_lock
1004
1005        fasteners = attempt_import('fasteners')
1006        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
1007        return self._pid_lock
1008
1009    @property
1010    def pickle_path(self) -> pathlib.Path:
1011        """
1012        Return the path for the pickle file.
1013        """
1014        return self.path / 'pickle.pkl'
1015
1016    def read_properties(self) -> Optional[Dict[str, Any]]:
1017        """Read the properties JSON file and return the dictionary."""
1018        if not self.properties_path.exists():
1019            return None
1020        try:
1021            with open(self.properties_path, 'r', encoding='utf-8') as file:
1022                properties = json.load(file)
1023        except Exception:
1024            properties = {}
1025        
1026        return properties or {}
1027
1028    def read_pickle(self) -> Daemon:
1029        """Read a Daemon's pickle file and return the `Daemon`."""
1030        import pickle
1031        import traceback
1032        if not self.pickle_path.exists():
1033            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")
1034
1035        if self.pickle_path.stat().st_size == 0:
1036            error(f"Pickle was empty for daemon '{self.daemon_id}'.")
1037
1038        try:
1039            with open(self.pickle_path, 'rb') as pickle_file:
1040                daemon = pickle.load(pickle_file)
1041            success, msg = True, 'Success'
1042        except Exception as e:
1043            success, msg = False, str(e)
1044            daemon = None
1045            traceback.print_exception(type(e), e, e.__traceback__)
1046        if not success:
1047            error(msg)
1048        return daemon
1049
1050    @property
1051    def properties(self) -> Dict[str, Any]:
1052        """
1053        Return the contents of the properties JSON file.
1054        """
1055        try:
1056            _file_properties = self.read_properties() or {}
1057        except Exception:
1058            traceback.print_exc()
1059            _file_properties = {}
1060
1061        if not self._properties:
1062            self._properties = _file_properties
1063
1064        if self._properties is None:
1065            self._properties = {}
1066
1067        if (
1068            self._properties.get('result', None) is None
1069            and _file_properties.get('result', None) is not None
1070        ):
1071            _ = self._properties.pop('result', None)
1072
1073        if _file_properties is not None:
1074            self._properties = apply_patch_to_config(
1075                _file_properties,
1076                self._properties,
1077            )
1078
1079        return self._properties
1080
1081    @property
1082    def hidden(self) -> bool:
1083        """
1084        Return a bool indicating whether this Daemon should be displayed.
1085        """
1086        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')
1087
1088    def write_properties(self) -> SuccessTuple:
1089        """Write the properties dictionary to the properties JSON file
1090        (only if self.properties exists).
1091        """
1092        success, msg = (
1093            False,
1094            f"No properties to write for daemon '{self.daemon_id}'."
1095        )
1096        if self.properties is not None:
1097            try:
1098                self.path.mkdir(parents=True, exist_ok=True)
1099                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
1100                    json.dump(self.properties, properties_file)
1101                success, msg = True, 'Success'
1102            except Exception as e:
1103                success, msg = False, str(e)
1104        return success, msg
1105
1106    def write_pickle(self) -> SuccessTuple:
1107        """Write the pickle file for the daemon."""
1108        import pickle
1109        import traceback
1110        try:
1111            self.path.mkdir(parents=True, exist_ok=True)
1112            with open(self.pickle_path, 'wb+') as pickle_file:
1113                pickle.dump(self, pickle_file)
1114            success, msg = True, "Success"
1115        except Exception as e:
1116            success, msg = False, str(e)
1117            traceback.print_exception(type(e), e, e.__traceback__)
1118        return success, msg
1119
1120
1121    def _setup(
1122        self,
1123        allow_dirty_run: bool = False,
1124    ) -> None:
1125        """
1126        Update properties before starting the Daemon.
1127        """
1128        if self.properties is None:
1129            self._properties = {}
1130
1131        self._properties.update({
1132            'target': {
1133                'name': self.target.__name__,
1134                'module': self.target.__module__,
1135                'args': self.target_args,
1136                'kw': self.target_kw,
1137            },
1138        })
1139        self.mkdir_if_not_exists(allow_dirty_run)
1140        _write_properties_success_tuple = self.write_properties()
1141        if not _write_properties_success_tuple[0]:
1142            error(_write_properties_success_tuple[1])
1143
1144        _write_pickle_success_tuple = self.write_pickle()
1145        if not _write_pickle_success_tuple[0]:
1146            error(_write_pickle_success_tuple[1])
1147
1148    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
1149        """
1150        Remove a daemon's directory after execution.
1151
1152        Parameters
1153        ----------
1154        keep_logs: bool, default False
1155            If `True`, skip deleting the daemon's log files.
1156
1157        Returns
1158        -------
1159        A `SuccessTuple` indicating success.
1160        """
1161        if self.path.exists():
1162            try:
1163                shutil.rmtree(self.path)
1164            except Exception as e:
1165                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
1166                warn(msg)
1167                return False, msg
1168        if not keep_logs:
1169            self.rotating_log.delete()
1170            try:
1171                if self.log_offset_path.exists():
1172                    self.log_offset_path.unlink()
1173            except Exception as e:
1174                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
1175                warn(msg)
1176                return False, msg
1177        return True, "Success"
1178
1179
1180    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
1181        """
1182        Return the timeout value to use. Use `--timeout-seconds` if provided,
1183        else the configured default (8).
1184        """
1185        if isinstance(timeout, (int, float)):
1186            return timeout
1187        return get_config('jobs', 'timeout_seconds')
1188
1189
1190    def get_check_timeout_interval_seconds(
1191        self,
1192        check_timeout_interval: Union[int, float, None] = None,
1193    ) -> Union[int, float]:
1194        """
1195        Return the interval value to check the status of timeouts.
1196        """
1197        if isinstance(check_timeout_interval, (int, float)):
1198            return check_timeout_interval
1199        return get_config('jobs', 'check_timeout_interval_seconds')
1200
1201    @property
1202    def target_args(self) -> Union[Tuple[Any], None]:
1203        """
1204        Return the positional arguments to pass to the target function.
1205        """
1206        target_args = (
1207            self.__dict__.get('_target_args', None)
1208            or self.properties.get('target', {}).get('args', None)
1209        )
1210        if target_args is None:
1211            return tuple([])
1212
1213        return tuple(target_args)
1214
1215    @property
1216    def target_kw(self) -> Union[Dict[str, Any], None]:
1217        """
1218        Return the keyword arguments to pass to the target function.
1219        """
1220        target_kw = (
1221            self.__dict__.get('_target_kw', None)
1222            or self.properties.get('target', {}).get('kw', None)
1223        )
1224        if target_kw is None:
1225            return {}
1226
1227        return {key: val for key, val in target_kw.items()}
1228
1229    def __getstate__(self):
1230        """
1231        Pickle this Daemon.
1232        """
1233        dill = attempt_import('dill')
1234        return {
1235            'target': dill.dumps(self.target),
1236            'target_args': self.target_args,
1237            'target_kw': self.target_kw,
1238            'daemon_id': self.daemon_id,
1239            'label': self.label,
1240            'properties': self.properties,
1241        }
1242
1243    def __setstate__(self, _state: Dict[str, Any]):
1244        """
1245        Restore this Daemon from a pickled state.
1246        If the properties file exists, skip the old pickled version.
1247        """
1248        dill = attempt_import('dill')
1249        _state['target'] = dill.loads(_state['target'])
1250        self._pickle = True
1251        daemon_id = _state.get('daemon_id', None)
1252        if not daemon_id:
1253            raise ValueError("Need a daemon_id to un-pickle a Daemon.")
1254
1255        properties_path = self._get_properties_path_from_daemon_id(daemon_id)
1256        ignore_properties = properties_path.exists()
1257        if ignore_properties:
1258            _state = {
1259                key: val
1260                for key, val in _state.items()
1261                if key != 'properties'
1262            }
1263        self.__init__(**_state)
1264
1265
1266    def __repr__(self):
1267        return str(self)
1268
1269    def __str__(self):
1270        return self.daemon_id
1271
1272    def __eq__(self, other):
1273        if not isinstance(other, Daemon):
1274            return False
1275        return self.daemon_id == other.daemon_id
1276
1277    def __hash__(self):
1278        return hash(self.daemon_id)
class Daemon:
  45class Daemon:
  46    """
  47    Daemonize Python functions into background processes.
  48
  49    Examples
  50    --------
  51    >>> import meerschaum as mrsm
  52    >>> from meerschaum.utils.daemons import Daemon
  53    >>> daemon = Daemon(print, ('hi',))
  54    >>> success, msg = daemon.run()
  55    >>> print(daemon.log_text)
  56
  57    2024-07-29 18:03 | hi
  58    2024-07-29 18:03 |
  59    >>> daemon.run(allow_dirty_run=True)
  60    >>> print(daemon.log_text)
  61
  62    2024-07-29 18:03 | hi
  63    2024-07-29 18:03 |
  64    2024-07-29 18:05 | hi
  65    2024-07-29 18:05 |
  66    >>> mrsm.pprint(daemon.properties)
  67    {
  68        'label': 'print',
  69        'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
  70        'result': None,
  71        'process': {'ended': '2024-07-29T18:03:33.752806'}
  72    }
  73
  74    """
  75
  76    def __new__(
  77        cls,
  78        *args,
  79        daemon_id: Optional[str] = None,
  80        **kw
  81    ):
  82        """
  83        If a daemon_id is provided and already exists, read from its pickle file.
  84        """
  85        instance = super(Daemon, cls).__new__(cls)
  86        if daemon_id is not None:
  87            instance.daemon_id = daemon_id
  88            if instance.pickle_path.exists():
  89                instance = instance.read_pickle()
  90        return instance
  91
  92    @classmethod
  93    def from_properties_file(cls, daemon_id: str) -> Daemon:
  94        """
  95        Return a Daemon from a properties dictionary.
  96        """
  97        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
  98        if not properties_path.exists():
  99            raise OSError(f"Properties file '{properties_path}' does not exist.")
 100
 101        try:
 102            with open(properties_path, 'r', encoding='utf-8') as f:
 103                properties = json.load(f)
 104        except Exception:
 105            properties = {}
 106
 107        if not properties:
 108            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")
 109
 110        daemon_id = properties_path.parent.name
 111        target_cf = properties.get('target', {})
 112        target_module_name = target_cf.get('module', None)
 113        target_function_name = target_cf.get('name', None)
 114        target_args = target_cf.get('args', None)
 115        target_kw = target_cf.get('kw', None)
 116        label = properties.get('label', None)
 117
 118        if None in [
 119            target_module_name,
 120            target_function_name,
 121            target_args,
 122            target_kw,
 123        ]:
 124            raise ValueError("Missing target function information.")
 125
 126        target_module = importlib.import_module(target_module_name)
 127        target_function = getattr(target_module, target_function_name)
 128
 129        return Daemon(
 130            daemon_id=daemon_id,
 131            target=target_function,
 132            target_args=target_args,
 133            target_kw=target_kw,
 134            properties=properties,
 135            label=label,
 136        )
 137
 138
 139    def __init__(
 140        self,
 141        target: Optional[Callable[[Any], Any]] = None,
 142        target_args: Union[List[Any], Tuple[Any], None] = None,
 143        target_kw: Optional[Dict[str, Any]] = None,
 144        env: Optional[Dict[str, str]] = None,
 145        daemon_id: Optional[str] = None,
 146        label: Optional[str] = None,
 147        properties: Optional[Dict[str, Any]] = None,
 148    ):
 149        """
 150        Parameters
 151        ----------
 152        target: Optional[Callable[[Any], Any]], default None,
 153            The function to execute in a child process.
 154
 155        target_args: Union[List[Any], Tuple[Any], None], default None
 156            Positional arguments to pass to the target function.
 157
 158        target_kw: Optional[Dict[str, Any]], default None
 159            Keyword arguments to pass to the target function.
 160
 161        env: Optional[Dict[str, str]], default None
 162            If provided, set these environment variables in the daemon process.
 163
 164        daemon_id: Optional[str], default None
 165            Build a `Daemon` from an existing `daemon_id`.
 166            If `daemon_id` is provided, other arguments are ignored and are derived
 167            from the existing pickled `Daemon`.
 168
 169        label: Optional[str], default None
 170            Label string to help identifiy a daemon.
 171            If `None`, use the function name instead.
 172
 173        properties: Optional[Dict[str, Any]], default None
 174            Override reading from the properties JSON by providing an existing dictionary.
 175        """
 176        _pickle = self.__dict__.get('_pickle', False)
 177        if daemon_id is not None:
 178            self.daemon_id = daemon_id
 179            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):
 180
 181                if not self.properties_path.exists():
 182                    raise Exception(
 183                        f"Daemon '{self.daemon_id}' does not exist. "
 184                        + "Pass a target to create a new Daemon."
 185                    )
 186
 187                try:
 188                    new_daemon = self.from_properties_file(daemon_id)
 189                except Exception:
 190                    new_daemon = None
 191
 192                if new_daemon is not None:
 193                    new_daemon.write_pickle()
 194                    target = new_daemon.target
 195                    target_args = new_daemon.target_args
 196                    target_kw = new_daemon.target_kw
 197                    label = new_daemon.label
 198                    self._properties = new_daemon.properties
 199                else:
 200                    try:
 201                        self.properties_path.unlink()
 202                    except Exception:
 203                        pass
 204
 205                    raise Exception(
 206                        f"Could not recover daemon '{self.daemon_id}' "
 207                        + "from its properties file."
 208                    )
 209
 210        if 'target' not in self.__dict__:
 211            if target is None:
 212                error("Cannot create a Daemon without a target.")
 213            self.target = target
 214
 215        ### NOTE: We have to check self.__dict__ in case we un-pickling.
 216        if '_target_args' not in self.__dict__:
 217            self._target_args = target_args
 218        if '_target_kw' not in self.__dict__:
 219            self._target_kw = target_kw
 220
 221        if 'label' not in self.__dict__:
 222            if label is None:
 223                label = (
 224                    self.target.__name__ if '__name__' in self.target.__dir__()
 225                        else str(self.target)
 226                )
 227            self.label = label
 228        if 'daemon_id' not in self.__dict__:
 229            self.daemon_id = get_new_daemon_name()
 230        if '_properties' not in self.__dict__:
 231            self._properties = properties
 232        if self._properties is None:
 233            self._properties = {}
 234
 235        self._properties.update({'label': self.label})
 236        if env:
 237            self._properties.update({'env': env})
 238
 239        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
 240        _ = self.process
 241
 242
 243    def _run_exit(
 244        self,
 245        keep_daemon_output: bool = True,
 246        allow_dirty_run: bool = False,
 247    ) -> Any:
 248        """Run the daemon's target function.
 249        NOTE: This WILL EXIT the parent process!
 250
 251        Parameters
 252        ----------
 253        keep_daemon_output: bool, default True
 254            If `False`, delete the daemon's output directory upon exiting.
 255
 256        allow_dirty_run, bool, default False:
 257            If `True`, run the daemon, even if the `daemon_id` directory exists.
 258            This option is dangerous because if the same `daemon_id` runs twice,
 259            the last to finish will overwrite the output of the first.
 260
 261        Returns
 262        -------
 263        Nothing — this will exit the parent process.
 264        """
 265        import platform
 266        import sys
 267        import os
 268        import traceback
 269        from meerschaum.utils.warnings import warn
 270        from meerschaum.config import get_config
 271        daemon = attempt_import('daemon')
 272        lines = get_config('jobs', 'terminal', 'lines')
 273        columns = get_config('jobs', 'terminal', 'columns')
 274
 275        if platform.system() == 'Windows':
 276            return False, "Windows is no longer supported."
 277
 278        self._setup(allow_dirty_run)
 279
 280        ### NOTE: The SIGINT handler has been removed so that child processes may handle
 281        ###       KeyboardInterrupts themselves.
 282        ###       The previous aggressive approach was redundant because of the SIGTERM handler.
 283        self._daemon_context = daemon.DaemonContext(
 284            pidfile=self.pid_lock,
 285            stdout=self.rotating_log,
 286            stderr=self.rotating_log,
 287            working_directory=os.getcwd(),
 288            detach_process=True,
 289            files_preserve=list(self.rotating_log.subfile_objects.values()),
 290            signal_map={
 291                signal.SIGTERM: self._handle_sigterm,
 292            },
 293        )
 294
 295        _daemons.append(self)
 296
 297        log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds')
 298        self._log_refresh_timer = RepeatTimer(
 299            log_refresh_seconds,
 300            partial(self.rotating_log.refresh_files, start_interception=True),
 301        )
 302
 303        try:
 304            os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns))
 305            with self._daemon_context:
 306                sys.stdin = self.stdin_file
 307                _ = os.environ.pop(STATIC_CONFIG['environment']['systemd_stdin_path'], None)
 308                os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id
 309                os.environ['PYTHONUNBUFFERED'] = '1'
 310
 311                ### Allow the user to override environment variables.
 312                env = self.properties.get('env', {})
 313                if env and isinstance(env, dict):
 314                    os.environ.update({str(k): str(v) for k, v in env.items()})
 315
 316                self.rotating_log.refresh_files(start_interception=True)
 317                result = None
 318                try:
 319                    with open(self.pid_path, 'w+', encoding='utf-8') as f:
 320                        f.write(str(os.getpid()))
 321
 322                    ### NOTE: The timer fails to start for remote actions to localhost.
 323                    try:
 324                        if not self._log_refresh_timer.is_running():
 325                            self._log_refresh_timer.start()
 326                    except Exception:
 327                        pass
 328
 329                    self.properties['result'] = None
 330                    self._capture_process_timestamp('began')
 331                    result = self.target(*self.target_args, **self.target_kw)
 332                    self.properties['result'] = result
 333                except (BrokenPipeError, KeyboardInterrupt, SystemExit):
 334                    result = False, traceback.format_exc()
 335                except Exception as e:
 336                    warn(
 337                        f"Exception in daemon target function: {traceback.format_exc()}",
 338                    )
 339                    result = False, str(e)
 340                finally:
 341                    _results[self.daemon_id] = result
 342
 343                    if keep_daemon_output:
 344                        self._capture_process_timestamp('ended')
 345                    else:
 346                        self.cleanup()
 347
 348                    self._log_refresh_timer.cancel()
 349                    try:
 350                        if self.pid is None and self.pid_path.exists():
 351                            self.pid_path.unlink()
 352                    except Exception:
 353                        pass
 354
 355                    if is_success_tuple(result):
 356                        try:
 357                            mrsm.pprint(result)
 358                        except BrokenPipeError:
 359                            pass
 360
 361        except Exception:
 362            daemon_error = traceback.format_exc()
 363            with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
 364                f.write(daemon_error)
 365            warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}")
 366
 367    def _capture_process_timestamp(
 368        self,
 369        process_key: str,
 370        write_properties: bool = True,
 371    ) -> None:
 372        """
 373        Record the current timestamp to the parameters `process:<process_key>`.
 374
 375        Parameters
 376        ----------
 377        process_key: str
 378            Under which key to store the timestamp.
 379
 380        write_properties: bool, default True
 381            If `True` persist the properties to disk immediately after capturing the timestamp.
 382        """
 383        if 'process' not in self.properties:
 384            self.properties['process'] = {}
 385
 386        if process_key not in ('began', 'ended', 'paused', 'stopped'):
 387            raise ValueError(f"Invalid key '{process_key}'.")
 388
 389        self.properties['process'][process_key] = (
 390            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
 391        )
 392        if write_properties:
 393            self.write_properties()
 394
 395    def run(
 396        self,
 397        keep_daemon_output: bool = True,
 398        allow_dirty_run: bool = False,
 399        debug: bool = False,
 400    ) -> SuccessTuple:
 401        """Run the daemon as a child process and continue executing the parent.
 402
 403        Parameters
 404        ----------
 405        keep_daemon_output: bool, default True
 406            If `False`, delete the daemon's output directory upon exiting.
 407
 408        allow_dirty_run: bool, default False
 409            If `True`, run the daemon, even if the `daemon_id` directory exists.
 410            This option is dangerous because if the same `daemon_id` runs concurrently,
 411            the last to finish will overwrite the output of the first.
 412
 413        Returns
 414        -------
 415        A SuccessTuple indicating success.
 416
 417        """
 418        import platform
 419        if platform.system() == 'Windows':
 420            return False, "Cannot run background jobs on Windows."
 421
 422        ### The daemon might exist and be paused.
 423        if self.status == 'paused':
 424            return self.resume()
 425
 426        self._remove_stop_file()
 427        if self.status == 'running':
 428            return True, f"Daemon '{self}' is already running."
 429
 430        self.mkdir_if_not_exists(allow_dirty_run)
 431        _write_pickle_success_tuple = self.write_pickle()
 432        if not _write_pickle_success_tuple[0]:
 433            return _write_pickle_success_tuple
 434
 435        _launch_daemon_code = (
 436            "from meerschaum.utils.daemon import Daemon; "
 437            + f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
 438            + f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
 439            + "allow_dirty_run=True)"
 440        )
 441        env = dict(os.environ)
 442        env[STATIC_CONFIG['environment']['noninteractive']] = 'true'
 443        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
 444        msg = (
 445            "Success"
 446            if _launch_success_bool
 447            else f"Failed to start daemon '{self.daemon_id}'."
 448        )
 449        return _launch_success_bool, msg
 450
 451    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
 452        """
 453        Forcibly terminate a running daemon.
 454        Sends a SIGTERM signal to the process.
 455
 456        Parameters
 457        ----------
 458        timeout: Optional[int], default 3
 459            How many seconds to wait for the process to terminate.
 460
 461        Returns
 462        -------
 463        A SuccessTuple indicating success.
 464        """
 465        if self.status != 'paused':
 466            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
 467            if success:
 468                self._write_stop_file('kill')
 469                return success, msg
 470
 471        if self.status == 'stopped':
 472            self._write_stop_file('kill')
 473            return True, "Process has already stopped."
 474
 475        psutil = attempt_import('psutil')
 476        process = self.process
 477        try:
 478            process.terminate()
 479            process.kill()
 480            process.wait(timeout=timeout)
 481        except Exception as e:
 482            return False, f"Failed to kill job {self} ({process}) with exception: {e}"
 483
 484        try:
 485            if process.status():
 486                return False, "Failed to stop daemon '{self}' ({process})."
 487        except psutil.NoSuchProcess:
 488            pass
 489
 490        if self.pid_path.exists():
 491            try:
 492                self.pid_path.unlink()
 493            except Exception:
 494                pass
 495
 496        self._write_stop_file('kill')
 497        return True, "Success"
 498
 499    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
 500        """Gracefully quit a running daemon."""
 501        if self.status == 'paused':
 502            return self.kill(timeout)
 503
 504        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
 505        if signal_success:
 506            self._write_stop_file('quit')
 507        return signal_success, signal_msg
 508
 509    def pause(
 510        self,
 511        timeout: Union[int, float, None] = None,
 512        check_timeout_interval: Union[float, int, None] = None,
 513    ) -> SuccessTuple:
 514        """
 515        Pause the daemon if it is running.
 516
 517        Parameters
 518        ----------
 519        timeout: Union[float, int, None], default None
 520            The maximum number of seconds to wait for a process to suspend.
 521
 522        check_timeout_interval: Union[float, int, None], default None
 523            The number of seconds to wait between checking if the process is still running.
 524
 525        Returns
 526        -------
 527        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
 528        """
 529        if self.process is None:
 530            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."
 531
 532        if self.status == 'paused':
 533            return True, f"Daemon '{self.daemon_id}' is already paused."
 534
 535        self._write_stop_file('pause')
 536        try:
 537            self.process.suspend()
 538        except Exception as e:
 539            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"
 540
 541        timeout = self.get_timeout_seconds(timeout)
 542        check_timeout_interval = self.get_check_timeout_interval_seconds(
 543            check_timeout_interval
 544        )
 545
 546        psutil = attempt_import('psutil')
 547
 548        if not timeout:
 549            try:
 550                success = self.process.status() == 'stopped'
 551            except psutil.NoSuchProcess:
 552                success = True
 553            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
 554            if success:
 555                self._capture_process_timestamp('paused')
 556            return success, msg
 557
 558        begin = time.perf_counter()
 559        while (time.perf_counter() - begin) < timeout:
 560            try:
 561                if self.process.status() == 'stopped':
 562                    self._capture_process_timestamp('paused')
 563                    return True, "Success"
 564            except psutil.NoSuchProcess as e:
 565                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
 566            time.sleep(check_timeout_interval)
 567
 568        return False, (
 569            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
 570            + ('s' if timeout != 1 else '') + '.'
 571        )
 572
 573    def resume(
 574        self,
 575        timeout: Union[int, float, None] = None,
 576        check_timeout_interval: Union[float, int, None] = None,
 577    ) -> SuccessTuple:
 578        """
 579        Resume the daemon if it is paused.
 580
 581        Parameters
 582        ----------
 583        timeout: Union[float, int, None], default None
 584            The maximum number of seconds to wait for a process to resume.
 585
 586        check_timeout_interval: Union[float, int, None], default None
 587            The number of seconds to wait between checking if the process is still stopped.
 588
 589        Returns
 590        -------
 591        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
 592        """
 593        if self.status == 'running':
 594            return True, f"Daemon '{self.daemon_id}' is already running."
 595
 596        if self.status == 'stopped':
 597            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."
 598
 599        self._remove_stop_file()
 600        try:
 601            self.process.resume()
 602        except Exception as e:
 603            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"
 604
 605        timeout = self.get_timeout_seconds(timeout)
 606        check_timeout_interval = self.get_check_timeout_interval_seconds(
 607            check_timeout_interval
 608        )
 609
 610        if not timeout:
 611            success = self.status == 'running'
 612            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
 613            if success:
 614                self._capture_process_timestamp('began')
 615            return success, msg
 616
 617        begin = time.perf_counter()
 618        while (time.perf_counter() - begin) < timeout:
 619            if self.status == 'running':
 620                self._capture_process_timestamp('began')
 621                return True, "Success"
 622            time.sleep(check_timeout_interval)
 623
 624        return False, (
 625            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
 626            + ('s' if timeout != 1 else '') + '.'
 627        )
 628
 629    def _write_stop_file(self, action: str) -> SuccessTuple:
 630        """Write the stop file timestamp and action."""
 631        if action not in ('quit', 'kill', 'pause'):
 632            return False, f"Unsupported action '{action}'."
 633
 634        if not self.stop_path.parent.exists():
 635            self.stop_path.parent.mkdir(parents=True, exist_ok=True)
 636
 637        with open(self.stop_path, 'w+', encoding='utf-8') as f:
 638            json.dump(
 639                {
 640                    'stop_time': datetime.now(timezone.utc).isoformat(),
 641                    'action': action,
 642                },
 643                f
 644            )
 645
 646        return True, "Success"
 647
 648    def _remove_stop_file(self) -> SuccessTuple:
 649        """Remove the stop file"""
 650        if not self.stop_path.exists():
 651            return True, "Stop file does not exist."
 652
 653        try:
 654            self.stop_path.unlink()
 655        except Exception as e:
 656            return False, f"Failed to remove stop file:\n{e}"
 657
 658        return True, "Success"
 659
 660    def _read_stop_file(self) -> Dict[str, Any]:
 661        """
 662        Read the stop file if it exists.
 663        """
 664        if not self.stop_path.exists():
 665            return {}
 666
 667        try:
 668            with open(self.stop_path, 'r', encoding='utf-8') as f:
 669                data = json.load(f)
 670            return data
 671        except Exception:
 672            return {}
 673
 674    def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None:
 675        """
 676        Handle `SIGTERM` within the `Daemon` context.
 677        This method is injected into the `DaemonContext`.
 678        """
 679        from meerschaum.utils.process import signal_handler
 680        signal_handler(signal_number, stack_frame)
 681
 682        timer = self.__dict__.get('_log_refresh_timer', None)
 683        if timer is not None:
 684            timer.cancel()
 685
 686        daemon_context = self.__dict__.get('_daemon_context', None)
 687        if daemon_context is not None:
 688            daemon_context.close()
 689
 690        _close_pools()
 691        raise SystemExit(0)
 692
 693    def _send_signal(
 694        self,
 695        signal_to_send,
 696        timeout: Union[float, int, None] = None,
 697        check_timeout_interval: Union[float, int, None] = None,
 698    ) -> SuccessTuple:
 699        """Send a signal to the daemon process.
 700
 701        Parameters
 702        ----------
 703        signal_to_send:
 704            The signal the send to the daemon, e.g. `signals.SIGINT`.
 705
 706        timeout: Union[float, int, None], default None
 707            The maximum number of seconds to wait for a process to terminate.
 708
 709        check_timeout_interval: Union[float, int, None], default None
 710            The number of seconds to wait between checking if the process is still running.
 711
 712        Returns
 713        -------
 714        A SuccessTuple indicating success.
 715        """
 716        try:
 717            pid = self.pid
 718            if pid is None:
 719                return (
 720                    False,
 721                    f"Daemon '{self.daemon_id}' is not running, "
 722                    + f"cannot send signal '{signal_to_send}'."
 723                )
 724            
 725            os.kill(pid, signal_to_send)
 726        except Exception:
 727            return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}"
 728
 729        timeout = self.get_timeout_seconds(timeout)
 730        check_timeout_interval = self.get_check_timeout_interval_seconds(
 731            check_timeout_interval
 732        )
 733
 734        if not timeout:
 735            return True, f"Successfully sent '{signal}' to daemon '{self.daemon_id}'."
 736
 737        begin = time.perf_counter()
 738        while (time.perf_counter() - begin) < timeout:
 739            if not self.status == 'running':
 740                return True, "Success"
 741            time.sleep(check_timeout_interval)
 742
 743        return False, (
 744            f"Failed to stop daemon '{self.daemon_id}' (PID: {pid}) within {timeout} second"
 745            + ('s' if timeout != 1 else '') + '.'
 746        )
 747
 748    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
 749        """Create the Daemon's directory.
 750        If `allow_dirty_run` is `False` and the directory already exists,
 751        raise a `FileExistsError`.
 752        """
 753        try:
 754            self.path.mkdir(parents=True, exist_ok=True)
 755            _already_exists = any(os.scandir(self.path))
 756        except FileExistsError:
 757            _already_exists = True
 758
 759        if _already_exists and not allow_dirty_run:
 760            error(
 761                f"Daemon '{self.daemon_id}' already exists. " +
 762                "To allow this daemon to run, do one of the following:\n"
 763                + "  - Execute `daemon.cleanup()`.\n"
 764                + f"  - Delete the directory '{self.path}'.\n"
 765                + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
 766                FileExistsError,
 767            )
 768
 769    @property
 770    def process(self) -> Union['psutil.Process', None]:
 771        """
 772        Return the psutil process for the Daemon.
 773        """
 774        psutil = attempt_import('psutil')
 775        pid = self.pid
 776        if pid is None:
 777            return None
 778        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
 779            try:
 780                self._process = psutil.Process(int(pid))
 781                process_exists = True
 782            except Exception:
 783                process_exists = False
 784            if not process_exists:
 785                _ = self.__dict__.pop('_process', None)
 786                try:
 787                    if self.pid_path.exists():
 788                        self.pid_path.unlink()
 789                except Exception:
 790                    pass
 791                return None
 792        return self._process
 793
 794    @property
 795    def status(self) -> str:
 796        """
 797        Return the running status of this Daemon.
 798        """
 799        if self.process is None:
 800            return 'stopped'
 801
 802        psutil = attempt_import('psutil')
 803        try:
 804            if self.process.status() == 'stopped':
 805                return 'paused'
 806            if self.process.status() == 'zombie':
 807                raise psutil.NoSuchProcess(self.process.pid)
 808        except (psutil.NoSuchProcess, AttributeError):
 809            if self.pid_path.exists():
 810                try:
 811                    self.pid_path.unlink()
 812                except Exception:
 813                    pass
 814            return 'stopped'
 815
 816        return 'running'
 817
 818    @classmethod
 819    def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 820        """
 821        Return a Daemon's path from its `daemon_id`.
 822        """
 823        return DAEMON_RESOURCES_PATH / daemon_id
 824
 825    @property
 826    def path(self) -> pathlib.Path:
 827        """
 828        Return the path for this Daemon's directory.
 829        """
 830        return self._get_path_from_daemon_id(self.daemon_id)
 831
 832    @classmethod
 833    def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 834        """
 835        Return the `properties.json` path for a given `daemon_id`.
 836        """
 837        return cls._get_path_from_daemon_id(daemon_id) / 'properties.json'
 838
 839    @property
 840    def properties_path(self) -> pathlib.Path:
 841        """
 842        Return the `propterties.json` path for this Daemon.
 843        """
 844        return self._get_properties_path_from_daemon_id(self.daemon_id)
 845
 846    @property
 847    def stop_path(self) -> pathlib.Path:
 848        """
 849        Return the path for the stop file (created when manually stopped).
 850        """
 851        return self.path / '.stop.json'
 852
 853    @property
 854    def log_path(self) -> pathlib.Path:
 855        """
 856        Return the log path.
 857        """
 858        return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')
 859
 860    @property
 861    def stdin_file_path(self) -> pathlib.Path:
 862        """
 863        Return the stdin file path.
 864        """
 865        return self.path / 'input.stdin'
 866
 867    @property
 868    def blocking_stdin_file_path(self) -> pathlib.Path:
 869        """
 870        Return the stdin file path.
 871        """
 872        if '_blocking_stdin_file_path' in self.__dict__:
 873            return self._blocking_stdin_file_path
 874
 875        return self.path / 'input.stdin.block'
 876
 877    @property
 878    def log_offset_path(self) -> pathlib.Path:
 879        """
 880        Return the log offset file path.
 881        """
 882        return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')
 883
 884    @property
 885    def rotating_log(self) -> RotatingFile:
 886        """
 887        The rotating log file for the daemon's output.
 888        """
 889        if '_rotating_log' in self.__dict__:
 890            return self._rotating_log
 891
 892        write_timestamps = (
 893            self.properties.get('logs', {}).get('write_timestamps', None)
 894        )
 895        if write_timestamps is None:
 896            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
 897
 898        self._rotating_log = RotatingFile(
 899            self.log_path,
 900            redirect_streams=True,
 901            write_timestamps=write_timestamps,
 902            timestamp_format=get_config('jobs', 'logs', 'timestamps', 'format'),
 903        )
 904        return self._rotating_log
 905
 906    @property
 907    def stdin_file(self):
 908        """
 909        Return the file handler for the stdin file.
 910        """
 911        if (stdin_file := self.__dict__.get('_stdin_file', None)):
 912            return stdin_file
 913
 914        self._stdin_file = StdinFile(
 915            self.stdin_file_path,
 916            lock_file_path=self.blocking_stdin_file_path,
 917        )
 918        return self._stdin_file
 919
 920    @property
 921    def log_text(self) -> Optional[str]:
 922        """
 923        Read the log files and return their contents.
 924        Returns `None` if the log file does not exist.
 925        """
 926        new_rotating_log = RotatingFile(
 927            self.rotating_log.file_path,
 928            num_files_to_keep = self.rotating_log.num_files_to_keep,
 929            max_file_size = self.rotating_log.max_file_size,
 930            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled'),
 931            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format'),
 932        )
 933        return new_rotating_log.read()
 934
 935    def readlines(self) -> List[str]:
 936        """
 937        Read the next log lines, persisting the cursor for later use.
 938        Note this will alter the cursor of `self.rotating_log`.
 939        """
 940        self.rotating_log._cursor = self._read_log_offset()
 941        lines = self.rotating_log.readlines()
 942        self._write_log_offset()
 943        return lines
 944
 945    def _read_log_offset(self) -> Tuple[int, int]:
 946        """
 947        Return the current log offset cursor.
 948
 949        Returns
 950        -------
 951        A tuple of the form (`subfile_index`, `position`).
 952        """
 953        if not self.log_offset_path.exists():
 954            return 0, 0
 955
 956        with open(self.log_offset_path, 'r', encoding='utf-8') as f:
 957            cursor_text = f.read()
 958        cursor_parts = cursor_text.split(' ')
 959        subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1])
 960        return subfile_index, subfile_position
 961
 962    def _write_log_offset(self) -> None:
 963        """
 964        Write the current log offset file.
 965        """
 966        with open(self.log_offset_path, 'w+', encoding='utf-8') as f:
 967            subfile_index = self.rotating_log._cursor[0]
 968            subfile_position = self.rotating_log._cursor[1]
 969            f.write(f"{subfile_index} {subfile_position}")
 970
 971    @property
 972    def pid(self) -> Union[int, None]:
 973        """
 974        Read the PID file and return its contents.
 975        Returns `None` if the PID file does not exist.
 976        """
 977        if not self.pid_path.exists():
 978            return None
 979        try:
 980            with open(self.pid_path, 'r', encoding='utf-8') as f:
 981                text = f.read()
 982            if len(text) == 0:
 983                return None
 984            pid = int(text.rstrip())
 985        except Exception as e:
 986            warn(e)
 987            text = None
 988            pid = None
 989        return pid
 990
 991    @property
 992    def pid_path(self) -> pathlib.Path:
 993        """
 994        Return the path to a file containing the PID for this Daemon.
 995        """
 996        return self.path / 'process.pid'
 997
 998    @property
 999    def pid_lock(self) -> 'fasteners.InterProcessLock':
1000        """
1001        Return the process lock context manager.
1002        """
1003        if '_pid_lock' in self.__dict__:
1004            return self._pid_lock
1005
1006        fasteners = attempt_import('fasteners')
1007        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
1008        return self._pid_lock
1009
1010    @property
1011    def pickle_path(self) -> pathlib.Path:
1012        """
1013        Return the path for the pickle file.
1014        """
1015        return self.path / 'pickle.pkl'
1016
1017    def read_properties(self) -> Optional[Dict[str, Any]]:
1018        """Read the properties JSON file and return the dictionary."""
1019        if not self.properties_path.exists():
1020            return None
1021        try:
1022            with open(self.properties_path, 'r', encoding='utf-8') as file:
1023                properties = json.load(file)
1024        except Exception:
1025            properties = {}
1026        
1027        return properties or {}
1028
1029    def read_pickle(self) -> Daemon:
1030        """Read a Daemon's pickle file and return the `Daemon`."""
1031        import pickle
1032        import traceback
1033        if not self.pickle_path.exists():
1034            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")
1035
1036        if self.pickle_path.stat().st_size == 0:
1037            error(f"Pickle was empty for daemon '{self.daemon_id}'.")
1038
1039        try:
1040            with open(self.pickle_path, 'rb') as pickle_file:
1041                daemon = pickle.load(pickle_file)
1042            success, msg = True, 'Success'
1043        except Exception as e:
1044            success, msg = False, str(e)
1045            daemon = None
1046            traceback.print_exception(type(e), e, e.__traceback__)
1047        if not success:
1048            error(msg)
1049        return daemon
1050
1051    @property
1052    def properties(self) -> Dict[str, Any]:
1053        """
1054        Return the contents of the properties JSON file.
1055        """
1056        try:
1057            _file_properties = self.read_properties() or {}
1058        except Exception:
1059            traceback.print_exc()
1060            _file_properties = {}
1061
1062        if not self._properties:
1063            self._properties = _file_properties
1064
1065        if self._properties is None:
1066            self._properties = {}
1067
1068        if (
1069            self._properties.get('result', None) is None
1070            and _file_properties.get('result', None) is not None
1071        ):
1072            _ = self._properties.pop('result', None)
1073
1074        if _file_properties is not None:
1075            self._properties = apply_patch_to_config(
1076                _file_properties,
1077                self._properties,
1078            )
1079
1080        return self._properties
1081
1082    @property
1083    def hidden(self) -> bool:
1084        """
1085        Return a bool indicating whether this Daemon should be displayed.
1086        """
1087        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')
1088
1089    def write_properties(self) -> SuccessTuple:
1090        """Write the properties dictionary to the properties JSON file
1091        (only if self.properties exists).
1092        """
1093        success, msg = (
1094            False,
1095            f"No properties to write for daemon '{self.daemon_id}'."
1096        )
1097        if self.properties is not None:
1098            try:
1099                self.path.mkdir(parents=True, exist_ok=True)
1100                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
1101                    json.dump(self.properties, properties_file)
1102                success, msg = True, 'Success'
1103            except Exception as e:
1104                success, msg = False, str(e)
1105        return success, msg
1106
1107    def write_pickle(self) -> SuccessTuple:
1108        """Write the pickle file for the daemon."""
1109        import pickle
1110        import traceback
1111        try:
1112            self.path.mkdir(parents=True, exist_ok=True)
1113            with open(self.pickle_path, 'wb+') as pickle_file:
1114                pickle.dump(self, pickle_file)
1115            success, msg = True, "Success"
1116        except Exception as e:
1117            success, msg = False, str(e)
1118            traceback.print_exception(type(e), e, e.__traceback__)
1119        return success, msg
1120
1121
1122    def _setup(
1123        self,
1124        allow_dirty_run: bool = False,
1125    ) -> None:
1126        """
1127        Update properties before starting the Daemon.
1128        """
1129        if self.properties is None:
1130            self._properties = {}
1131
1132        self._properties.update({
1133            'target': {
1134                'name': self.target.__name__,
1135                'module': self.target.__module__,
1136                'args': self.target_args,
1137                'kw': self.target_kw,
1138            },
1139        })
1140        self.mkdir_if_not_exists(allow_dirty_run)
1141        _write_properties_success_tuple = self.write_properties()
1142        if not _write_properties_success_tuple[0]:
1143            error(_write_properties_success_tuple[1])
1144
1145        _write_pickle_success_tuple = self.write_pickle()
1146        if not _write_pickle_success_tuple[0]:
1147            error(_write_pickle_success_tuple[1])
1148
1149    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
1150        """
1151        Remove a daemon's directory after execution.
1152
1153        Parameters
1154        ----------
1155        keep_logs: bool, default False
1156            If `True`, skip deleting the daemon's log files.
1157
1158        Returns
1159        -------
1160        A `SuccessTuple` indicating success.
1161        """
1162        if self.path.exists():
1163            try:
1164                shutil.rmtree(self.path)
1165            except Exception as e:
1166                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
1167                warn(msg)
1168                return False, msg
1169        if not keep_logs:
1170            self.rotating_log.delete()
1171            try:
1172                if self.log_offset_path.exists():
1173                    self.log_offset_path.unlink()
1174            except Exception as e:
1175                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
1176                warn(msg)
1177                return False, msg
1178        return True, "Success"
1179
1180
1181    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
1182        """
1183        Return the timeout value to use. Use `--timeout-seconds` if provided,
1184        else the configured default (8).
1185        """
1186        if isinstance(timeout, (int, float)):
1187            return timeout
1188        return get_config('jobs', 'timeout_seconds')
1189
1190
1191    def get_check_timeout_interval_seconds(
1192        self,
1193        check_timeout_interval: Union[int, float, None] = None,
1194    ) -> Union[int, float]:
1195        """
1196        Return the interval value to check the status of timeouts.
1197        """
1198        if isinstance(check_timeout_interval, (int, float)):
1199            return check_timeout_interval
1200        return get_config('jobs', 'check_timeout_interval_seconds')
1201
1202    @property
1203    def target_args(self) -> Union[Tuple[Any], None]:
1204        """
1205        Return the positional arguments to pass to the target function.
1206        """
1207        target_args = (
1208            self.__dict__.get('_target_args', None)
1209            or self.properties.get('target', {}).get('args', None)
1210        )
1211        if target_args is None:
1212            return tuple([])
1213
1214        return tuple(target_args)
1215
1216    @property
1217    def target_kw(self) -> Union[Dict[str, Any], None]:
1218        """
1219        Return the keyword arguments to pass to the target function.
1220        """
1221        target_kw = (
1222            self.__dict__.get('_target_kw', None)
1223            or self.properties.get('target', {}).get('kw', None)
1224        )
1225        if target_kw is None:
1226            return {}
1227
1228        return {key: val for key, val in target_kw.items()}
1229
1230    def __getstate__(self):
1231        """
1232        Pickle this Daemon.
1233        """
1234        dill = attempt_import('dill')
1235        return {
1236            'target': dill.dumps(self.target),
1237            'target_args': self.target_args,
1238            'target_kw': self.target_kw,
1239            'daemon_id': self.daemon_id,
1240            'label': self.label,
1241            'properties': self.properties,
1242        }
1243
1244    def __setstate__(self, _state: Dict[str, Any]):
1245        """
1246        Restore this Daemon from a pickled state.
1247        If the properties file exists, skip the old pickled version.
1248        """
1249        dill = attempt_import('dill')
1250        _state['target'] = dill.loads(_state['target'])
1251        self._pickle = True
1252        daemon_id = _state.get('daemon_id', None)
1253        if not daemon_id:
1254            raise ValueError("Need a daemon_id to un-pickle a Daemon.")
1255
1256        properties_path = self._get_properties_path_from_daemon_id(daemon_id)
1257        ignore_properties = properties_path.exists()
1258        if ignore_properties:
1259            _state = {
1260                key: val
1261                for key, val in _state.items()
1262                if key != 'properties'
1263            }
1264        self.__init__(**_state)
1265
1266
1267    def __repr__(self):
1268        return str(self)
1269
1270    def __str__(self):
1271        return self.daemon_id
1272
1273    def __eq__(self, other):
1274        if not isinstance(other, Daemon):
1275            return False
1276        return self.daemon_id == other.daemon_id
1277
1278    def __hash__(self):
1279        return hash(self.daemon_id)

Daemonize Python functions into background processes.

Examples
>>> import meerschaum as mrsm
>>> from meerschaum.utils.daemons import Daemon
>>> daemon = Daemon(print, ('hi',))
>>> success, msg = daemon.run()
>>> print(daemon.log_text)

2024-07-29 18:03 | hi 2024-07-29 18:03 |

>>> daemon.run(allow_dirty_run=True)
>>> print(daemon.log_text)

2024-07-29 18:03 | hi 2024-07-29 18:03 | 2024-07-29 18:05 | hi 2024-07-29 18:05 |

>>> mrsm.pprint(daemon.properties)
{
    'label': 'print',
    'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
    'result': None,
    'process': {'ended': '2024-07-29T18:03:33.752806'}
}
Daemon( target: Optional[Callable[[Any], Any]] = None, target_args: Union[List[Any], Tuple[Any], NoneType] = None, target_kw: Optional[Dict[str, Any]] = None, env: Optional[Dict[str, str]] = None, daemon_id: Optional[str] = None, label: Optional[str] = None, properties: Optional[Dict[str, Any]] = None)
139    def __init__(
140        self,
141        target: Optional[Callable[[Any], Any]] = None,
142        target_args: Union[List[Any], Tuple[Any], None] = None,
143        target_kw: Optional[Dict[str, Any]] = None,
144        env: Optional[Dict[str, str]] = None,
145        daemon_id: Optional[str] = None,
146        label: Optional[str] = None,
147        properties: Optional[Dict[str, Any]] = None,
148    ):
149        """
150        Parameters
151        ----------
152        target: Optional[Callable[[Any], Any]], default None,
153            The function to execute in a child process.
154
155        target_args: Union[List[Any], Tuple[Any], None], default None
156            Positional arguments to pass to the target function.
157
158        target_kw: Optional[Dict[str, Any]], default None
159            Keyword arguments to pass to the target function.
160
161        env: Optional[Dict[str, str]], default None
162            If provided, set these environment variables in the daemon process.
163
164        daemon_id: Optional[str], default None
165            Build a `Daemon` from an existing `daemon_id`.
166            If `daemon_id` is provided, other arguments are ignored and are derived
167            from the existing pickled `Daemon`.
168
169        label: Optional[str], default None
170            Label string to help identifiy a daemon.
171            If `None`, use the function name instead.
172
173        properties: Optional[Dict[str, Any]], default None
174            Override reading from the properties JSON by providing an existing dictionary.
175        """
176        _pickle = self.__dict__.get('_pickle', False)
177        if daemon_id is not None:
178            self.daemon_id = daemon_id
179            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):
180
181                if not self.properties_path.exists():
182                    raise Exception(
183                        f"Daemon '{self.daemon_id}' does not exist. "
184                        + "Pass a target to create a new Daemon."
185                    )
186
187                try:
188                    new_daemon = self.from_properties_file(daemon_id)
189                except Exception:
190                    new_daemon = None
191
192                if new_daemon is not None:
193                    new_daemon.write_pickle()
194                    target = new_daemon.target
195                    target_args = new_daemon.target_args
196                    target_kw = new_daemon.target_kw
197                    label = new_daemon.label
198                    self._properties = new_daemon.properties
199                else:
200                    try:
201                        self.properties_path.unlink()
202                    except Exception:
203                        pass
204
205                    raise Exception(
206                        f"Could not recover daemon '{self.daemon_id}' "
207                        + "from its properties file."
208                    )
209
210        if 'target' not in self.__dict__:
211            if target is None:
212                error("Cannot create a Daemon without a target.")
213            self.target = target
214
215        ### NOTE: We have to check self.__dict__ in case we un-pickling.
216        if '_target_args' not in self.__dict__:
217            self._target_args = target_args
218        if '_target_kw' not in self.__dict__:
219            self._target_kw = target_kw
220
221        if 'label' not in self.__dict__:
222            if label is None:
223                label = (
224                    self.target.__name__ if '__name__' in self.target.__dir__()
225                        else str(self.target)
226                )
227            self.label = label
228        if 'daemon_id' not in self.__dict__:
229            self.daemon_id = get_new_daemon_name()
230        if '_properties' not in self.__dict__:
231            self._properties = properties
232        if self._properties is None:
233            self._properties = {}
234
235        self._properties.update({'label': self.label})
236        if env:
237            self._properties.update({'env': env})
238
239        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
240        _ = self.process
Parameters
  • target (Optional[Callable[[Any], Any]], default None,): The function to execute in a child process.
  • target_args (Union[List[Any], Tuple[Any], None], default None): Positional arguments to pass to the target function.
  • target_kw (Optional[Dict[str, Any]], default None): Keyword arguments to pass to the target function.
  • env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the daemon process.
  • daemon_id (Optional[str], default None): Build a Daemon from an existing daemon_id. If daemon_id is provided, other arguments are ignored and are derived from the existing pickled Daemon.
  • label (Optional[str], default None): Label string to help identifiy a daemon. If None, use the function name instead.
  • properties (Optional[Dict[str, Any]], default None): Override reading from the properties JSON by providing an existing dictionary.
@classmethod
def from_properties_file(cls, daemon_id: str) -> Daemon:
 92    @classmethod
 93    def from_properties_file(cls, daemon_id: str) -> Daemon:
 94        """
 95        Return a Daemon from a properties dictionary.
 96        """
 97        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
 98        if not properties_path.exists():
 99            raise OSError(f"Properties file '{properties_path}' does not exist.")
100
101        try:
102            with open(properties_path, 'r', encoding='utf-8') as f:
103                properties = json.load(f)
104        except Exception:
105            properties = {}
106
107        if not properties:
108            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")
109
110        daemon_id = properties_path.parent.name
111        target_cf = properties.get('target', {})
112        target_module_name = target_cf.get('module', None)
113        target_function_name = target_cf.get('name', None)
114        target_args = target_cf.get('args', None)
115        target_kw = target_cf.get('kw', None)
116        label = properties.get('label', None)
117
118        if None in [
119            target_module_name,
120            target_function_name,
121            target_args,
122            target_kw,
123        ]:
124            raise ValueError("Missing target function information.")
125
126        target_module = importlib.import_module(target_module_name)
127        target_function = getattr(target_module, target_function_name)
128
129        return Daemon(
130            daemon_id=daemon_id,
131            target=target_function,
132            target_args=target_args,
133            target_kw=target_kw,
134            properties=properties,
135            label=label,
136        )

Return a Daemon from a properties dictionary.

def run( self, keep_daemon_output: bool = True, allow_dirty_run: bool = False, debug: bool = False) -> Tuple[bool, str]:
395    def run(
396        self,
397        keep_daemon_output: bool = True,
398        allow_dirty_run: bool = False,
399        debug: bool = False,
400    ) -> SuccessTuple:
401        """Run the daemon as a child process and continue executing the parent.
402
403        Parameters
404        ----------
405        keep_daemon_output: bool, default True
406            If `False`, delete the daemon's output directory upon exiting.
407
408        allow_dirty_run: bool, default False
409            If `True`, run the daemon, even if the `daemon_id` directory exists.
410            This option is dangerous because if the same `daemon_id` runs concurrently,
411            the last to finish will overwrite the output of the first.
412
413        Returns
414        -------
415        A SuccessTuple indicating success.
416
417        """
418        import platform
419        if platform.system() == 'Windows':
420            return False, "Cannot run background jobs on Windows."
421
422        ### The daemon might exist and be paused.
423        if self.status == 'paused':
424            return self.resume()
425
426        self._remove_stop_file()
427        if self.status == 'running':
428            return True, f"Daemon '{self}' is already running."
429
430        self.mkdir_if_not_exists(allow_dirty_run)
431        _write_pickle_success_tuple = self.write_pickle()
432        if not _write_pickle_success_tuple[0]:
433            return _write_pickle_success_tuple
434
435        _launch_daemon_code = (
436            "from meerschaum.utils.daemon import Daemon; "
437            + f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
438            + f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
439            + "allow_dirty_run=True)"
440        )
441        env = dict(os.environ)
442        env[STATIC_CONFIG['environment']['noninteractive']] = 'true'
443        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
444        msg = (
445            "Success"
446            if _launch_success_bool
447            else f"Failed to start daemon '{self.daemon_id}'."
448        )
449        return _launch_success_bool, msg

Run the daemon as a child process and continue executing the parent.

Parameters
  • keep_daemon_output (bool, default True): If False, delete the daemon's output directory upon exiting.
  • allow_dirty_run (bool, default False): If True, run the daemon, even if the daemon_id directory exists. This option is dangerous because if the same daemon_id runs concurrently, the last to finish will overwrite the output of the first.
Returns
  • A SuccessTuple indicating success.
def kill(self, timeout: Union[int, float, NoneType] = 8) -> Tuple[bool, str]:
451    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
452        """
453        Forcibly terminate a running daemon.
454        Sends a SIGTERM signal to the process.
455
456        Parameters
457        ----------
458        timeout: Optional[int], default 3
459            How many seconds to wait for the process to terminate.
460
461        Returns
462        -------
463        A SuccessTuple indicating success.
464        """
465        if self.status != 'paused':
466            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
467            if success:
468                self._write_stop_file('kill')
469                return success, msg
470
471        if self.status == 'stopped':
472            self._write_stop_file('kill')
473            return True, "Process has already stopped."
474
475        psutil = attempt_import('psutil')
476        process = self.process
477        try:
478            process.terminate()
479            process.kill()
480            process.wait(timeout=timeout)
481        except Exception as e:
482            return False, f"Failed to kill job {self} ({process}) with exception: {e}"
483
484        try:
485            if process.status():
486                return False, "Failed to stop daemon '{self}' ({process})."
487        except psutil.NoSuchProcess:
488            pass
489
490        if self.pid_path.exists():
491            try:
492                self.pid_path.unlink()
493            except Exception:
494                pass
495
496        self._write_stop_file('kill')
497        return True, "Success"

Forcibly terminate a running daemon. Sends a SIGTERM signal to the process.

Parameters
  • timeout (Optional[int], default 3): How many seconds to wait for the process to terminate.
Returns
  • A SuccessTuple indicating success.
def quit(self, timeout: Union[int, float, NoneType] = None) -> Tuple[bool, str]:
499    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
500        """Gracefully quit a running daemon."""
501        if self.status == 'paused':
502            return self.kill(timeout)
503
504        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
505        if signal_success:
506            self._write_stop_file('quit')
507        return signal_success, signal_msg

Gracefully quit a running daemon.

def pause( self, timeout: Union[int, float, NoneType] = None, check_timeout_interval: Union[float, int, NoneType] = None) -> Tuple[bool, str]:
509    def pause(
510        self,
511        timeout: Union[int, float, None] = None,
512        check_timeout_interval: Union[float, int, None] = None,
513    ) -> SuccessTuple:
514        """
515        Pause the daemon if it is running.
516
517        Parameters
518        ----------
519        timeout: Union[float, int, None], default None
520            The maximum number of seconds to wait for a process to suspend.
521
522        check_timeout_interval: Union[float, int, None], default None
523            The number of seconds to wait between checking if the process is still running.
524
525        Returns
526        -------
527        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
528        """
529        if self.process is None:
530            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."
531
532        if self.status == 'paused':
533            return True, f"Daemon '{self.daemon_id}' is already paused."
534
535        self._write_stop_file('pause')
536        try:
537            self.process.suspend()
538        except Exception as e:
539            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"
540
541        timeout = self.get_timeout_seconds(timeout)
542        check_timeout_interval = self.get_check_timeout_interval_seconds(
543            check_timeout_interval
544        )
545
546        psutil = attempt_import('psutil')
547
548        if not timeout:
549            try:
550                success = self.process.status() == 'stopped'
551            except psutil.NoSuchProcess:
552                success = True
553            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
554            if success:
555                self._capture_process_timestamp('paused')
556            return success, msg
557
558        begin = time.perf_counter()
559        while (time.perf_counter() - begin) < timeout:
560            try:
561                if self.process.status() == 'stopped':
562                    self._capture_process_timestamp('paused')
563                    return True, "Success"
564            except psutil.NoSuchProcess as e:
565                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
566            time.sleep(check_timeout_interval)
567
568        return False, (
569            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
570            + ('s' if timeout != 1 else '') + '.'
571        )

Pause the daemon if it is running.

Parameters
  • timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to suspend.
  • check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still running.
Returns
  • A SuccessTuple indicating whether the Daemon process was successfully suspended.
def resume( self, timeout: Union[int, float, NoneType] = None, check_timeout_interval: Union[float, int, NoneType] = None) -> Tuple[bool, str]:
573    def resume(
574        self,
575        timeout: Union[int, float, None] = None,
576        check_timeout_interval: Union[float, int, None] = None,
577    ) -> SuccessTuple:
578        """
579        Resume the daemon if it is paused.
580
581        Parameters
582        ----------
583        timeout: Union[float, int, None], default None
584            The maximum number of seconds to wait for a process to resume.
585
586        check_timeout_interval: Union[float, int, None], default None
587            The number of seconds to wait between checking if the process is still stopped.
588
589        Returns
590        -------
591        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
592        """
593        if self.status == 'running':
594            return True, f"Daemon '{self.daemon_id}' is already running."
595
596        if self.status == 'stopped':
597            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."
598
599        self._remove_stop_file()
600        try:
601            self.process.resume()
602        except Exception as e:
603            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"
604
605        timeout = self.get_timeout_seconds(timeout)
606        check_timeout_interval = self.get_check_timeout_interval_seconds(
607            check_timeout_interval
608        )
609
610        if not timeout:
611            success = self.status == 'running'
612            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
613            if success:
614                self._capture_process_timestamp('began')
615            return success, msg
616
617        begin = time.perf_counter()
618        while (time.perf_counter() - begin) < timeout:
619            if self.status == 'running':
620                self._capture_process_timestamp('began')
621                return True, "Success"
622            time.sleep(check_timeout_interval)
623
624        return False, (
625            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
626            + ('s' if timeout != 1 else '') + '.'
627        )

Resume the daemon if it is paused.

Parameters
  • timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to resume.
  • check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still stopped.
Returns
  • A SuccessTuple indicating whether the Daemon process was successfully resumed.
def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
748    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
749        """Create the Daemon's directory.
750        If `allow_dirty_run` is `False` and the directory already exists,
751        raise a `FileExistsError`.
752        """
753        try:
754            self.path.mkdir(parents=True, exist_ok=True)
755            _already_exists = any(os.scandir(self.path))
756        except FileExistsError:
757            _already_exists = True
758
759        if _already_exists and not allow_dirty_run:
760            error(
761                f"Daemon '{self.daemon_id}' already exists. " +
762                "To allow this daemon to run, do one of the following:\n"
763                + "  - Execute `daemon.cleanup()`.\n"
764                + f"  - Delete the directory '{self.path}'.\n"
765                + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
766                FileExistsError,
767            )

Create the Daemon's directory. If allow_dirty_run is False and the directory already exists, raise a FileExistsError.

process: "Union['psutil.Process', None]"
769    @property
770    def process(self) -> Union['psutil.Process', None]:
771        """
772        Return the psutil process for the Daemon.
773        """
774        psutil = attempt_import('psutil')
775        pid = self.pid
776        if pid is None:
777            return None
778        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
779            try:
780                self._process = psutil.Process(int(pid))
781                process_exists = True
782            except Exception:
783                process_exists = False
784            if not process_exists:
785                _ = self.__dict__.pop('_process', None)
786                try:
787                    if self.pid_path.exists():
788                        self.pid_path.unlink()
789                except Exception:
790                    pass
791                return None
792        return self._process

Return the psutil process for the Daemon.

status: str
794    @property
795    def status(self) -> str:
796        """
797        Return the running status of this Daemon.
798        """
799        if self.process is None:
800            return 'stopped'
801
802        psutil = attempt_import('psutil')
803        try:
804            if self.process.status() == 'stopped':
805                return 'paused'
806            if self.process.status() == 'zombie':
807                raise psutil.NoSuchProcess(self.process.pid)
808        except (psutil.NoSuchProcess, AttributeError):
809            if self.pid_path.exists():
810                try:
811                    self.pid_path.unlink()
812                except Exception:
813                    pass
814            return 'stopped'
815
816        return 'running'

Return the running status of this Daemon.

path: pathlib.Path
825    @property
826    def path(self) -> pathlib.Path:
827        """
828        Return the path for this Daemon's directory.
829        """
830        return self._get_path_from_daemon_id(self.daemon_id)

Return the path for this Daemon's directory.

properties_path: pathlib.Path
839    @property
840    def properties_path(self) -> pathlib.Path:
841        """
842        Return the `propterties.json` path for this Daemon.
843        """
844        return self._get_properties_path_from_daemon_id(self.daemon_id)

Return the propterties.json path for this Daemon.

stop_path: pathlib.Path
846    @property
847    def stop_path(self) -> pathlib.Path:
848        """
849        Return the path for the stop file (created when manually stopped).
850        """
851        return self.path / '.stop.json'

Return the path for the stop file (created when manually stopped).

log_path: pathlib.Path
853    @property
854    def log_path(self) -> pathlib.Path:
855        """
856        Return the log path.
857        """
858        return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')

Return the log path.

stdin_file_path: pathlib.Path
860    @property
861    def stdin_file_path(self) -> pathlib.Path:
862        """
863        Return the stdin file path.
864        """
865        return self.path / 'input.stdin'

Return the stdin file path.

blocking_stdin_file_path: pathlib.Path
867    @property
868    def blocking_stdin_file_path(self) -> pathlib.Path:
869        """
870        Return the stdin file path.
871        """
872        if '_blocking_stdin_file_path' in self.__dict__:
873            return self._blocking_stdin_file_path
874
875        return self.path / 'input.stdin.block'

Return the stdin file path.

log_offset_path: pathlib.Path
877    @property
878    def log_offset_path(self) -> pathlib.Path:
879        """
880        Return the log offset file path.
881        """
882        return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')

Return the log offset file path.

884    @property
885    def rotating_log(self) -> RotatingFile:
886        """
887        The rotating log file for the daemon's output.
888        """
889        if '_rotating_log' in self.__dict__:
890            return self._rotating_log
891
892        write_timestamps = (
893            self.properties.get('logs', {}).get('write_timestamps', None)
894        )
895        if write_timestamps is None:
896            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
897
898        self._rotating_log = RotatingFile(
899            self.log_path,
900            redirect_streams=True,
901            write_timestamps=write_timestamps,
902            timestamp_format=get_config('jobs', 'logs', 'timestamps', 'format'),
903        )
904        return self._rotating_log

The rotating log file for the daemon's output.

stdin_file
906    @property
907    def stdin_file(self):
908        """
909        Return the file handler for the stdin file.
910        """
911        if (stdin_file := self.__dict__.get('_stdin_file', None)):
912            return stdin_file
913
914        self._stdin_file = StdinFile(
915            self.stdin_file_path,
916            lock_file_path=self.blocking_stdin_file_path,
917        )
918        return self._stdin_file

Return the file handler for the stdin file.

log_text: Optional[str]
920    @property
921    def log_text(self) -> Optional[str]:
922        """
923        Read the log files and return their contents.
924        Returns `None` if the log file does not exist.
925        """
926        new_rotating_log = RotatingFile(
927            self.rotating_log.file_path,
928            num_files_to_keep = self.rotating_log.num_files_to_keep,
929            max_file_size = self.rotating_log.max_file_size,
930            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled'),
931            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format'),
932        )
933        return new_rotating_log.read()

Read the log files and return their contents. Returns None if the log file does not exist.

def readlines(self) -> List[str]:
935    def readlines(self) -> List[str]:
936        """
937        Read the next log lines, persisting the cursor for later use.
938        Note this will alter the cursor of `self.rotating_log`.
939        """
940        self.rotating_log._cursor = self._read_log_offset()
941        lines = self.rotating_log.readlines()
942        self._write_log_offset()
943        return lines

Read the next log lines, persisting the cursor for later use. Note this will alter the cursor of self.rotating_log.

pid: Optional[int]
971    @property
972    def pid(self) -> Union[int, None]:
973        """
974        Read the PID file and return its contents.
975        Returns `None` if the PID file does not exist.
976        """
977        if not self.pid_path.exists():
978            return None
979        try:
980            with open(self.pid_path, 'r', encoding='utf-8') as f:
981                text = f.read()
982            if len(text) == 0:
983                return None
984            pid = int(text.rstrip())
985        except Exception as e:
986            warn(e)
987            text = None
988            pid = None
989        return pid

Read the PID file and return its contents. Returns None if the PID file does not exist.

pid_path: pathlib.Path
991    @property
992    def pid_path(self) -> pathlib.Path:
993        """
994        Return the path to a file containing the PID for this Daemon.
995        """
996        return self.path / 'process.pid'

Return the path to a file containing the PID for this Daemon.

pid_lock: "'fasteners.InterProcessLock'"
 998    @property
 999    def pid_lock(self) -> 'fasteners.InterProcessLock':
1000        """
1001        Return the process lock context manager.
1002        """
1003        if '_pid_lock' in self.__dict__:
1004            return self._pid_lock
1005
1006        fasteners = attempt_import('fasteners')
1007        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
1008        return self._pid_lock

Return the process lock context manager.

pickle_path: pathlib.Path
1010    @property
1011    def pickle_path(self) -> pathlib.Path:
1012        """
1013        Return the path for the pickle file.
1014        """
1015        return self.path / 'pickle.pkl'

Return the path for the pickle file.

def read_properties(self) -> Optional[Dict[str, Any]]:
1017    def read_properties(self) -> Optional[Dict[str, Any]]:
1018        """Read the properties JSON file and return the dictionary."""
1019        if not self.properties_path.exists():
1020            return None
1021        try:
1022            with open(self.properties_path, 'r', encoding='utf-8') as file:
1023                properties = json.load(file)
1024        except Exception:
1025            properties = {}
1026        
1027        return properties or {}

Read the properties JSON file and return the dictionary.

def read_pickle(self) -> Daemon:
1029    def read_pickle(self) -> Daemon:
1030        """Read a Daemon's pickle file and return the `Daemon`."""
1031        import pickle
1032        import traceback
1033        if not self.pickle_path.exists():
1034            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")
1035
1036        if self.pickle_path.stat().st_size == 0:
1037            error(f"Pickle was empty for daemon '{self.daemon_id}'.")
1038
1039        try:
1040            with open(self.pickle_path, 'rb') as pickle_file:
1041                daemon = pickle.load(pickle_file)
1042            success, msg = True, 'Success'
1043        except Exception as e:
1044            success, msg = False, str(e)
1045            daemon = None
1046            traceback.print_exception(type(e), e, e.__traceback__)
1047        if not success:
1048            error(msg)
1049        return daemon

Read a Daemon's pickle file and return the Daemon.

properties: Dict[str, Any]
1051    @property
1052    def properties(self) -> Dict[str, Any]:
1053        """
1054        Return the contents of the properties JSON file.
1055        """
1056        try:
1057            _file_properties = self.read_properties() or {}
1058        except Exception:
1059            traceback.print_exc()
1060            _file_properties = {}
1061
1062        if not self._properties:
1063            self._properties = _file_properties
1064
1065        if self._properties is None:
1066            self._properties = {}
1067
1068        if (
1069            self._properties.get('result', None) is None
1070            and _file_properties.get('result', None) is not None
1071        ):
1072            _ = self._properties.pop('result', None)
1073
1074        if _file_properties is not None:
1075            self._properties = apply_patch_to_config(
1076                _file_properties,
1077                self._properties,
1078            )
1079
1080        return self._properties

Return the contents of the properties JSON file.

hidden: bool
1082    @property
1083    def hidden(self) -> bool:
1084        """
1085        Return a bool indicating whether this Daemon should be displayed.
1086        """
1087        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')

Return a bool indicating whether this Daemon should be displayed.

def write_properties(self) -> Tuple[bool, str]:
1089    def write_properties(self) -> SuccessTuple:
1090        """Write the properties dictionary to the properties JSON file
1091        (only if self.properties exists).
1092        """
1093        success, msg = (
1094            False,
1095            f"No properties to write for daemon '{self.daemon_id}'."
1096        )
1097        if self.properties is not None:
1098            try:
1099                self.path.mkdir(parents=True, exist_ok=True)
1100                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
1101                    json.dump(self.properties, properties_file)
1102                success, msg = True, 'Success'
1103            except Exception as e:
1104                success, msg = False, str(e)
1105        return success, msg

Write the properties dictionary to the properties JSON file (only if self.properties exists).

def write_pickle(self) -> Tuple[bool, str]:
1107    def write_pickle(self) -> SuccessTuple:
1108        """Write the pickle file for the daemon."""
1109        import pickle
1110        import traceback
1111        try:
1112            self.path.mkdir(parents=True, exist_ok=True)
1113            with open(self.pickle_path, 'wb+') as pickle_file:
1114                pickle.dump(self, pickle_file)
1115            success, msg = True, "Success"
1116        except Exception as e:
1117            success, msg = False, str(e)
1118            traceback.print_exception(type(e), e, e.__traceback__)
1119        return success, msg

Write the pickle file for the daemon.

def cleanup(self, keep_logs: bool = False) -> Tuple[bool, str]:
1149    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
1150        """
1151        Remove a daemon's directory after execution.
1152
1153        Parameters
1154        ----------
1155        keep_logs: bool, default False
1156            If `True`, skip deleting the daemon's log files.
1157
1158        Returns
1159        -------
1160        A `SuccessTuple` indicating success.
1161        """
1162        if self.path.exists():
1163            try:
1164                shutil.rmtree(self.path)
1165            except Exception as e:
1166                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
1167                warn(msg)
1168                return False, msg
1169        if not keep_logs:
1170            self.rotating_log.delete()
1171            try:
1172                if self.log_offset_path.exists():
1173                    self.log_offset_path.unlink()
1174            except Exception as e:
1175                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
1176                warn(msg)
1177                return False, msg
1178        return True, "Success"

Remove a daemon's directory after execution.

Parameters
  • keep_logs (bool, default False): If True, skip deleting the daemon's log files.
Returns
  • A SuccessTuple indicating success.
def get_timeout_seconds(self, timeout: Union[int, float, NoneType] = None) -> Union[int, float]:
1181    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
1182        """
1183        Return the timeout value to use. Use `--timeout-seconds` if provided,
1184        else the configured default (8).
1185        """
1186        if isinstance(timeout, (int, float)):
1187            return timeout
1188        return get_config('jobs', 'timeout_seconds')

Return the timeout value to use. Use --timeout-seconds if provided, else the configured default (8).

def get_check_timeout_interval_seconds( self, check_timeout_interval: Union[int, float, NoneType] = None) -> Union[int, float]:
1191    def get_check_timeout_interval_seconds(
1192        self,
1193        check_timeout_interval: Union[int, float, None] = None,
1194    ) -> Union[int, float]:
1195        """
1196        Return the interval value to check the status of timeouts.
1197        """
1198        if isinstance(check_timeout_interval, (int, float)):
1199            return check_timeout_interval
1200        return get_config('jobs', 'check_timeout_interval_seconds')

Return the interval value to check the status of timeouts.

target_args: Optional[Tuple[Any]]
1202    @property
1203    def target_args(self) -> Union[Tuple[Any], None]:
1204        """
1205        Return the positional arguments to pass to the target function.
1206        """
1207        target_args = (
1208            self.__dict__.get('_target_args', None)
1209            or self.properties.get('target', {}).get('args', None)
1210        )
1211        if target_args is None:
1212            return tuple([])
1213
1214        return tuple(target_args)

Return the positional arguments to pass to the target function.

target_kw: Optional[Dict[str, Any]]
1216    @property
1217    def target_kw(self) -> Union[Dict[str, Any], None]:
1218        """
1219        Return the keyword arguments to pass to the target function.
1220        """
1221        target_kw = (
1222            self.__dict__.get('_target_kw', None)
1223            or self.properties.get('target', {}).get('kw', None)
1224        )
1225        if target_kw is None:
1226            return {}
1227
1228        return {key: val for key, val in target_kw.items()}

Return the keyword arguments to pass to the target function.