meerschaum.utils.daemon.Daemon

Manage running daemons via the Daemon class.

   1#! /usr/bin/env python3
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4
   5"""
   6Manage running daemons via the Daemon class.
   7"""
   8
   9from __future__ import annotations
  10import os
  11import importlib
  12import pathlib
  13import json
  14import shutil
  15import signal
  16import sys
  17import time
  18import traceback
  19from functools import partial
  20from datetime import datetime, timezone
  21
  22import meerschaum as mrsm
  23from meerschaum.utils.typing import (
  24    Optional, Dict, Any, SuccessTuple, Callable, List, Union,
  25    is_success_tuple, Tuple,
  26)
  27from meerschaum.config import get_config
  28from meerschaum.config.static import STATIC_CONFIG
  29from meerschaum.config._paths import (
  30    DAEMON_RESOURCES_PATH, LOGS_RESOURCES_PATH, DAEMON_ERROR_LOG_PATH,
  31)
  32from meerschaum.config._patch import apply_patch_to_config
  33from meerschaum.utils.warnings import warn, error
  34from meerschaum.utils.packages import attempt_import
  35from meerschaum.utils.venv import venv_exec
  36from meerschaum.utils.daemon._names import get_new_daemon_name
  37from meerschaum.utils.daemon.RotatingFile import RotatingFile
  38from meerschaum.utils.daemon.StdinFile import StdinFile
  39from meerschaum.utils.threading import RepeatTimer
  40from meerschaum.__main__ import _close_pools
  41
  42_daemons = []
  43_results = {}
  44
  45class Daemon:
  46    """
  47    Daemonize Python functions into background processes.
  48
  49    Examples
  50    --------
  51    >>> import meerschaum as mrsm
  52    >>> from meerschaum.utils.daemons import Daemon
  53    >>> daemon = Daemon(print, ('hi',))
  54    >>> success, msg = daemon.run()
  55    >>> print(daemon.log_text)
  56
  57    2024-07-29 18:03 | hi
  58    2024-07-29 18:03 |
  59    >>> daemon.run(allow_dirty_run=True)
  60    >>> print(daemon.log_text)
  61
  62    2024-07-29 18:03 | hi
  63    2024-07-29 18:03 |
  64    2024-07-29 18:05 | hi
  65    2024-07-29 18:05 |
  66    >>> mrsm.pprint(daemon.properties)
  67    {
  68        'label': 'print',
  69        'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
  70        'result': None,
  71        'process': {'ended': '2024-07-29T18:03:33.752806'}
  72    }
  73
  74    """
  75
  76    def __new__(
  77        cls,
  78        *args,
  79        daemon_id: Optional[str] = None,
  80        **kw
  81    ):
  82        """
  83        If a daemon_id is provided and already exists, read from its pickle file.
  84        """
  85        instance = super(Daemon, cls).__new__(cls)
  86        if daemon_id is not None:
  87            instance.daemon_id = daemon_id
  88            if instance.pickle_path.exists():
  89                instance = instance.read_pickle()
  90        return instance
  91
  92    @classmethod
  93    def from_properties_file(cls, daemon_id: str) -> Daemon:
  94        """
  95        Return a Daemon from a properties dictionary.
  96        """
  97        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
  98        if not properties_path.exists():
  99            raise OSError(f"Properties file '{properties_path}' does not exist.")
 100
 101        try:
 102            with open(properties_path, 'r', encoding='utf-8') as f:
 103                properties = json.load(f)
 104        except Exception:
 105            properties = {}
 106
 107        if not properties:
 108            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")
 109
 110        daemon_id = properties_path.parent.name
 111        target_cf = properties.get('target', {})
 112        target_module_name = target_cf.get('module', None)
 113        target_function_name = target_cf.get('name', None)
 114        target_args = target_cf.get('args', None)
 115        target_kw = target_cf.get('kw', None)
 116        label = properties.get('label', None)
 117
 118        if None in [
 119            target_module_name,
 120            target_function_name,
 121            target_args,
 122            target_kw,
 123        ]:
 124            raise ValueError("Missing target function information.")
 125
 126        target_module = importlib.import_module(target_module_name)
 127        target_function = getattr(target_module, target_function_name)
 128
 129        return Daemon(
 130            daemon_id=daemon_id,
 131            target=target_function,
 132            target_args=target_args,
 133            target_kw=target_kw,
 134            properties=properties,
 135            label=label,
 136        )
 137
 138
 139    def __init__(
 140        self,
 141        target: Optional[Callable[[Any], Any]] = None,
 142        target_args: Union[List[Any], Tuple[Any], None] = None,
 143        target_kw: Optional[Dict[str, Any]] = None,
 144        env: Optional[Dict[str, str]] = None,
 145        daemon_id: Optional[str] = None,
 146        label: Optional[str] = None,
 147        properties: Optional[Dict[str, Any]] = None,
 148    ):
 149        """
 150        Parameters
 151        ----------
 152        target: Optional[Callable[[Any], Any]], default None,
 153            The function to execute in a child process.
 154
 155        target_args: Union[List[Any], Tuple[Any], None], default None
 156            Positional arguments to pass to the target function.
 157
 158        target_kw: Optional[Dict[str, Any]], default None
 159            Keyword arguments to pass to the target function.
 160
 161        env: Optional[Dict[str, str]], default None
 162            If provided, set these environment variables in the daemon process.
 163
 164        daemon_id: Optional[str], default None
 165            Build a `Daemon` from an existing `daemon_id`.
 166            If `daemon_id` is provided, other arguments are ignored and are derived
 167            from the existing pickled `Daemon`.
 168
 169        label: Optional[str], default None
 170            Label string to help identifiy a daemon.
 171            If `None`, use the function name instead.
 172
 173        properties: Optional[Dict[str, Any]], default None
 174            Override reading from the properties JSON by providing an existing dictionary.
 175        """
 176        _pickle = self.__dict__.get('_pickle', False)
 177        if daemon_id is not None:
 178            self.daemon_id = daemon_id
 179            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):
 180
 181                if not self.properties_path.exists():
 182                    raise Exception(
 183                        f"Daemon '{self.daemon_id}' does not exist. "
 184                        + "Pass a target to create a new Daemon."
 185                    )
 186
 187                try:
 188                    new_daemon = self.from_properties_file(daemon_id)
 189                except Exception:
 190                    new_daemon = None
 191
 192                if new_daemon is not None:
 193                    new_daemon.write_pickle()
 194                    target = new_daemon.target
 195                    target_args = new_daemon.target_args
 196                    target_kw = new_daemon.target_kw
 197                    label = new_daemon.label
 198                    self._properties = new_daemon.properties
 199                else:
 200                    try:
 201                        self.properties_path.unlink()
 202                    except Exception:
 203                        pass
 204
 205                    raise Exception(
 206                        f"Could not recover daemon '{self.daemon_id}' "
 207                        + "from its properties file."
 208                    )
 209
 210        if 'target' not in self.__dict__:
 211            if target is None:
 212                error("Cannot create a Daemon without a target.")
 213            self.target = target
 214
 215        ### NOTE: We have to check self.__dict__ in case we un-pickling.
 216        if '_target_args' not in self.__dict__:
 217            self._target_args = target_args
 218        if '_target_kw' not in self.__dict__:
 219            self._target_kw = target_kw
 220
 221        if 'label' not in self.__dict__:
 222            if label is None:
 223                label = (
 224                    self.target.__name__ if '__name__' in self.target.__dir__()
 225                        else str(self.target)
 226                )
 227            self.label = label
 228        if 'daemon_id' not in self.__dict__:
 229            self.daemon_id = get_new_daemon_name()
 230        if '_properties' not in self.__dict__:
 231            self._properties = properties
 232        if self._properties is None:
 233            self._properties = {}
 234
 235        self._properties.update({'label': self.label})
 236        if env:
 237            self._properties.update({'env': env})
 238
 239        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
 240        _ = self.process
 241
 242
 243    def _run_exit(
 244        self,
 245        keep_daemon_output: bool = True,
 246        allow_dirty_run: bool = False,
 247    ) -> Any:
 248        """Run the daemon's target function.
 249        NOTE: This WILL EXIT the parent process!
 250
 251        Parameters
 252        ----------
 253        keep_daemon_output: bool, default True
 254            If `False`, delete the daemon's output directory upon exiting.
 255
 256        allow_dirty_run, bool, default False:
 257            If `True`, run the daemon, even if the `daemon_id` directory exists.
 258            This option is dangerous because if the same `daemon_id` runs twice,
 259            the last to finish will overwrite the output of the first.
 260
 261        Returns
 262        -------
 263        Nothing — this will exit the parent process.
 264        """
 265        import platform, sys, os, traceback
 266        from meerschaum.utils.warnings import warn
 267        from meerschaum.config import get_config
 268        daemon = attempt_import('daemon')
 269        lines = get_config('jobs', 'terminal', 'lines')
 270        columns = get_config('jobs', 'terminal', 'columns')
 271
 272        if platform.system() == 'Windows':
 273            return False, "Windows is no longer supported."
 274
 275        self._setup(allow_dirty_run)
 276
 277        ### NOTE: The SIGINT handler has been removed so that child processes may handle
 278        ###       KeyboardInterrupts themselves.
 279        ###       The previous aggressive approach was redundant because of the SIGTERM handler.
 280        self._daemon_context = daemon.DaemonContext(
 281            pidfile=self.pid_lock,
 282            stdout=self.rotating_log,
 283            stderr=self.rotating_log,
 284            working_directory=os.getcwd(),
 285            detach_process=True,
 286            files_preserve=list(self.rotating_log.subfile_objects.values()),
 287            signal_map={
 288                signal.SIGTERM: self._handle_sigterm,
 289            },
 290        )
 291
 292        _daemons.append(self)
 293
 294        log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds')
 295        self._log_refresh_timer = RepeatTimer(
 296            log_refresh_seconds,
 297            partial(self.rotating_log.refresh_files, start_interception=True),
 298        )
 299
 300        try:
 301            os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns))
 302            with self._daemon_context:
 303                sys.stdin = self.stdin_file
 304                os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id
 305                os.environ['PYTHONUNBUFFERED'] = '1'
 306
 307                ### Allow the user to override environment variables.
 308                env = self.properties.get('env', {})
 309                if env and isinstance(env, dict):
 310                    os.environ.update({str(k): str(v) for k, v in env.items()})
 311
 312                self.rotating_log.refresh_files(start_interception=True)
 313                result = None
 314                try:
 315                    with open(self.pid_path, 'w+', encoding='utf-8') as f:
 316                        f.write(str(os.getpid()))
 317
 318                    self._log_refresh_timer.start()
 319                    self.properties['result'] = None
 320                    self._capture_process_timestamp('began')
 321                    result = self.target(*self.target_args, **self.target_kw)
 322                    self.properties['result'] = result
 323                except (BrokenPipeError, KeyboardInterrupt, SystemExit):
 324                    pass
 325                except Exception as e:
 326                    warn(
 327                        f"Exception in daemon target function: {traceback.format_exc()}",
 328                    )
 329                    result = e
 330                finally:
 331                    _results[self.daemon_id] = result
 332
 333                    if keep_daemon_output:
 334                        self._capture_process_timestamp('ended')
 335                    else:
 336                        self.cleanup()
 337
 338                    self._log_refresh_timer.cancel()
 339                    if self.pid is None and self.pid_path.exists():
 340                        self.pid_path.unlink()
 341
 342                    if is_success_tuple(result):
 343                        try:
 344                            mrsm.pprint(result)
 345                        except BrokenPipeError:
 346                            pass
 347
 348        except Exception as e:
 349            daemon_error = traceback.format_exc()
 350            with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
 351                f.write(daemon_error)
 352            warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}")
 353
 354    def _capture_process_timestamp(
 355        self,
 356        process_key: str,
 357        write_properties: bool = True,
 358    ) -> None:
 359        """
 360        Record the current timestamp to the parameters `process:<process_key>`.
 361
 362        Parameters
 363        ----------
 364        process_key: str
 365            Under which key to store the timestamp.
 366
 367        write_properties: bool, default True
 368            If `True` persist the properties to disk immediately after capturing the timestamp.
 369        """
 370        if 'process' not in self.properties:
 371            self.properties['process'] = {}
 372
 373        if process_key not in ('began', 'ended', 'paused', 'stopped'):
 374            raise ValueError(f"Invalid key '{process_key}'.")
 375
 376        self.properties['process'][process_key] = (
 377            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
 378        )
 379        if write_properties:
 380            self.write_properties()
 381
 382    def run(
 383        self,
 384        keep_daemon_output: bool = True,
 385        allow_dirty_run: bool = False,
 386        debug: bool = False,
 387    ) -> SuccessTuple:
 388        """Run the daemon as a child process and continue executing the parent.
 389
 390        Parameters
 391        ----------
 392        keep_daemon_output: bool, default True
 393            If `False`, delete the daemon's output directory upon exiting.
 394
 395        allow_dirty_run: bool, default False
 396            If `True`, run the daemon, even if the `daemon_id` directory exists.
 397            This option is dangerous because if the same `daemon_id` runs concurrently,
 398            the last to finish will overwrite the output of the first.
 399
 400        Returns
 401        -------
 402        A SuccessTuple indicating success.
 403
 404        """
 405        import platform
 406        if platform.system() == 'Windows':
 407            return False, "Cannot run background jobs on Windows."
 408
 409        ### The daemon might exist and be paused.
 410        if self.status == 'paused':
 411            return self.resume()
 412
 413        self._remove_stop_file()
 414        if self.status == 'running':
 415            return True, f"Daemon '{self}' is already running."
 416
 417        self.mkdir_if_not_exists(allow_dirty_run)
 418        _write_pickle_success_tuple = self.write_pickle()
 419        if not _write_pickle_success_tuple[0]:
 420            return _write_pickle_success_tuple
 421
 422        _launch_daemon_code = (
 423            "from meerschaum.utils.daemon import Daemon; "
 424            + f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
 425            + f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
 426            + "allow_dirty_run=True)"
 427        )
 428        env = dict(os.environ)
 429        env['MRSM_NOASK'] = 'true'
 430        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
 431        msg = (
 432            "Success"
 433            if _launch_success_bool
 434            else f"Failed to start daemon '{self.daemon_id}'."
 435        )
 436        return _launch_success_bool, msg
 437
 438    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
 439        """
 440        Forcibly terminate a running daemon.
 441        Sends a SIGTERM signal to the process.
 442
 443        Parameters
 444        ----------
 445        timeout: Optional[int], default 3
 446            How many seconds to wait for the process to terminate.
 447
 448        Returns
 449        -------
 450        A SuccessTuple indicating success.
 451        """
 452        if self.status != 'paused':
 453            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
 454            if success:
 455                self._write_stop_file('kill')
 456                return success, msg
 457
 458        if self.status == 'stopped':
 459            self._write_stop_file('kill')
 460            return True, "Process has already stopped."
 461
 462        process = self.process
 463        try:
 464            process.terminate()
 465            process.kill()
 466            process.wait(timeout=timeout)
 467        except Exception as e:
 468            return False, f"Failed to kill job {self} with exception: {e}"
 469
 470        if self.pid_path.exists():
 471            try:
 472                self.pid_path.unlink()
 473            except Exception as e:
 474                pass
 475
 476        self._write_stop_file('kill')
 477        return True, "Success"
 478
 479    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
 480        """Gracefully quit a running daemon."""
 481        if self.status == 'paused':
 482            return self.kill(timeout)
 483
 484        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
 485        if signal_success:
 486            self._write_stop_file('quit')
 487        return signal_success, signal_msg
 488
 489    def pause(
 490        self,
 491        timeout: Union[int, float, None] = None,
 492        check_timeout_interval: Union[float, int, None] = None,
 493    ) -> SuccessTuple:
 494        """
 495        Pause the daemon if it is running.
 496
 497        Parameters
 498        ----------
 499        timeout: Union[float, int, None], default None
 500            The maximum number of seconds to wait for a process to suspend.
 501
 502        check_timeout_interval: Union[float, int, None], default None
 503            The number of seconds to wait between checking if the process is still running.
 504
 505        Returns
 506        -------
 507        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
 508        """
 509        if self.process is None:
 510            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."
 511
 512        if self.status == 'paused':
 513            return True, f"Daemon '{self.daemon_id}' is already paused."
 514
 515        self._write_stop_file('pause')
 516        try:
 517            self.process.suspend()
 518        except Exception as e:
 519            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"
 520
 521        timeout = self.get_timeout_seconds(timeout)
 522        check_timeout_interval = self.get_check_timeout_interval_seconds(
 523            check_timeout_interval
 524        )
 525
 526        psutil = attempt_import('psutil')
 527
 528        if not timeout:
 529            try:
 530                success = self.process.status() == 'stopped'
 531            except psutil.NoSuchProcess as e:
 532                success = True
 533            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
 534            if success:
 535                self._capture_process_timestamp('paused')
 536            return success, msg
 537
 538        begin = time.perf_counter()
 539        while (time.perf_counter() - begin) < timeout:
 540            try:
 541                if self.process.status() == 'stopped':
 542                    self._capture_process_timestamp('paused')
 543                    return True, "Success"
 544            except psutil.NoSuchProcess as e:
 545                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
 546            time.sleep(check_timeout_interval)
 547
 548        return False, (
 549            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
 550            + ('s' if timeout != 1 else '') + '.'
 551        )
 552
 553    def resume(
 554        self,
 555        timeout: Union[int, float, None] = None,
 556        check_timeout_interval: Union[float, int, None] = None,
 557    ) -> SuccessTuple:
 558        """
 559        Resume the daemon if it is paused.
 560
 561        Parameters
 562        ----------
 563        timeout: Union[float, int, None], default None
 564            The maximum number of seconds to wait for a process to resume.
 565
 566        check_timeout_interval: Union[float, int, None], default None
 567            The number of seconds to wait between checking if the process is still stopped.
 568
 569        Returns
 570        -------
 571        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
 572        """
 573        if self.status == 'running':
 574            return True, f"Daemon '{self.daemon_id}' is already running."
 575
 576        if self.status == 'stopped':
 577            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."
 578
 579        self._remove_stop_file()
 580        try:
 581            self.process.resume()
 582        except Exception as e:
 583            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"
 584
 585        timeout = self.get_timeout_seconds(timeout)
 586        check_timeout_interval = self.get_check_timeout_interval_seconds(
 587            check_timeout_interval
 588        )
 589
 590        if not timeout:
 591            success = self.status == 'running'
 592            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
 593            if success:
 594                self._capture_process_timestamp('began')
 595            return success, msg
 596
 597        begin = time.perf_counter()
 598        while (time.perf_counter() - begin) < timeout:
 599            if self.status == 'running':
 600                self._capture_process_timestamp('began')
 601                return True, "Success"
 602            time.sleep(check_timeout_interval)
 603
 604        return False, (
 605            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
 606            + ('s' if timeout != 1 else '') + '.'
 607        )
 608
 609    def _write_stop_file(self, action: str) -> SuccessTuple:
 610        """Write the stop file timestamp and action."""
 611        if action not in ('quit', 'kill', 'pause'):
 612            return False, f"Unsupported action '{action}'."
 613
 614        if not self.stop_path.parent.exists():
 615            self.stop_path.parent.mkdir(parents=True, exist_ok=True)
 616
 617        with open(self.stop_path, 'w+', encoding='utf-8') as f:
 618            json.dump(
 619                {
 620                    'stop_time': datetime.now(timezone.utc).isoformat(),
 621                    'action': action,
 622                },
 623                f
 624            )
 625
 626        return True, "Success"
 627
 628    def _remove_stop_file(self) -> SuccessTuple:
 629        """Remove the stop file"""
 630        if not self.stop_path.exists():
 631            return True, "Stop file does not exist."
 632
 633        try:
 634            self.stop_path.unlink()
 635        except Exception as e:
 636            return False, f"Failed to remove stop file:\n{e}"
 637
 638        return True, "Success"
 639
 640    def _read_stop_file(self) -> Dict[str, Any]:
 641        """
 642        Read the stop file if it exists.
 643        """
 644        if not self.stop_path.exists():
 645            return {}
 646
 647        try:
 648            with open(self.stop_path, 'r', encoding='utf-8') as f:
 649                data = json.load(f)
 650            return data
 651        except Exception:
 652            return {}
 653
 654    def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None:
 655        """
 656        Handle `SIGTERM` within the `Daemon` context.
 657        This method is injected into the `DaemonContext`.
 658        """
 659        from meerschaum.utils.process import signal_handler
 660        signal_handler(signal_number, stack_frame)
 661
 662        timer = self.__dict__.get('_log_refresh_timer', None)
 663        if timer is not None:
 664            timer.cancel()
 665
 666        daemon_context = self.__dict__.get('_daemon_context', None)
 667        if daemon_context is not None:
 668            daemon_context.close()
 669
 670        _close_pools()
 671        raise SystemExit(0)
 672
 673    def _send_signal(
 674            self,
 675            signal_to_send,
 676            timeout: Union[float, int, None] = None,
 677            check_timeout_interval: Union[float, int, None] = None,
 678        ) -> SuccessTuple:
 679        """Send a signal to the daemon process.
 680
 681        Parameters
 682        ----------
 683        signal_to_send:
 684            The signal the send to the daemon, e.g. `signals.SIGINT`.
 685
 686        timeout: Union[float, int, None], default None
 687            The maximum number of seconds to wait for a process to terminate.
 688
 689        check_timeout_interval: Union[float, int, None], default None
 690            The number of seconds to wait between checking if the process is still running.
 691
 692        Returns
 693        -------
 694        A SuccessTuple indicating success.
 695        """
 696        try:
 697            pid = self.pid
 698            if pid is None:
 699                return (
 700                    False,
 701                    f"Daemon '{self.daemon_id}' is not running, "
 702                    + f"cannot send signal '{signal_to_send}'."
 703                )
 704            
 705            os.kill(pid, signal_to_send)
 706        except Exception as e:
 707            return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}"
 708
 709        timeout = self.get_timeout_seconds(timeout)
 710        check_timeout_interval = self.get_check_timeout_interval_seconds(
 711            check_timeout_interval
 712        )
 713
 714        if not timeout:
 715            return True, f"Successfully sent '{signal}' to daemon '{self.daemon_id}'."
 716
 717        begin = time.perf_counter()
 718        while (time.perf_counter() - begin) < timeout:
 719            if not self.status == 'running':
 720                return True, "Success"
 721            time.sleep(check_timeout_interval)
 722
 723        return False, (
 724            f"Failed to stop daemon '{self.daemon_id}' within {timeout} second"
 725            + ('s' if timeout != 1 else '') + '.'
 726        )
 727
 728    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
 729        """Create the Daemon's directory.
 730        If `allow_dirty_run` is `False` and the directory already exists,
 731        raise a `FileExistsError`.
 732        """
 733        try:
 734            self.path.mkdir(parents=True, exist_ok=True)
 735            _already_exists = any(os.scandir(self.path))
 736        except FileExistsError:
 737            _already_exists = True
 738
 739        if _already_exists and not allow_dirty_run:
 740            error(
 741                f"Daemon '{self.daemon_id}' already exists. " +
 742                f"To allow this daemon to run, do one of the following:\n"
 743                + "  - Execute `daemon.cleanup()`.\n"
 744                + f"  - Delete the directory '{self.path}'.\n"
 745                + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
 746                FileExistsError,
 747            )
 748
 749    @property
 750    def process(self) -> Union['psutil.Process', None]:
 751        """
 752        Return the psutil process for the Daemon.
 753        """
 754        psutil = attempt_import('psutil')
 755        pid = self.pid
 756        if pid is None:
 757            return None
 758        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
 759            try:
 760                self._process = psutil.Process(int(pid))
 761            except Exception as e:
 762                if self.pid_path.exists():
 763                    self.pid_path.unlink()
 764                return None
 765        return self._process
 766
 767    @property
 768    def status(self) -> str:
 769        """
 770        Return the running status of this Daemon.
 771        """
 772        if self.process is None:
 773            return 'stopped'
 774
 775        psutil = attempt_import('psutil')
 776        try:
 777            if self.process.status() == 'stopped':
 778                return 'paused'
 779            if self.process.status() == 'zombie':
 780                raise psutil.NoSuchProcess(self.process.pid)
 781        except (psutil.NoSuchProcess, AttributeError):
 782            if self.pid_path.exists():
 783                try:
 784                    self.pid_path.unlink()
 785                except Exception as e:
 786                    pass
 787            return 'stopped'
 788
 789        return 'running'
 790
 791    @classmethod
 792    def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 793        """
 794        Return a Daemon's path from its `daemon_id`.
 795        """
 796        return DAEMON_RESOURCES_PATH / daemon_id
 797
 798    @property
 799    def path(self) -> pathlib.Path:
 800        """
 801        Return the path for this Daemon's directory.
 802        """
 803        return self._get_path_from_daemon_id(self.daemon_id)
 804
 805    @classmethod
 806    def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 807        """
 808        Return the `properties.json` path for a given `daemon_id`.
 809        """
 810        return cls._get_path_from_daemon_id(daemon_id) / 'properties.json'
 811
 812    @property
 813    def properties_path(self) -> pathlib.Path:
 814        """
 815        Return the `propterties.json` path for this Daemon.
 816        """
 817        return self._get_properties_path_from_daemon_id(self.daemon_id)
 818
 819    @property
 820    def stop_path(self) -> pathlib.Path:
 821        """
 822        Return the path for the stop file (created when manually stopped).
 823        """
 824        return self.path / '.stop.json'
 825
 826    @property
 827    def log_path(self) -> pathlib.Path:
 828        """
 829        Return the log path.
 830        """
 831        return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')
 832
 833    @property
 834    def stdin_file_path(self) -> pathlib.Path:
 835        """
 836        Return the stdin file path.
 837        """
 838        return self.path / 'input.stdin'
 839
 840    @property
 841    def blocking_stdin_file_path(self) -> pathlib.Path:
 842        """
 843        Return the stdin file path.
 844        """
 845        if '_blocking_stdin_file_path' in self.__dict__:
 846            return self._blocking_stdin_file_path
 847
 848        return self.path / 'input.stdin.block'
 849
 850    @property
 851    def log_offset_path(self) -> pathlib.Path:
 852        """
 853        Return the log offset file path.
 854        """
 855        return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')
 856
 857    @property
 858    def rotating_log(self) -> RotatingFile:
 859        """
 860        The rotating log file for the daemon's output.
 861        """
 862        if '_rotating_log' in self.__dict__:
 863            return self._rotating_log
 864
 865        write_timestamps = (
 866            self.properties.get('logs', {}).get('write_timestamps', None)
 867        )
 868        if write_timestamps is None:
 869            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
 870
 871        self._rotating_log = RotatingFile(
 872            self.log_path,
 873            redirect_streams=True,
 874            write_timestamps=write_timestamps,
 875            timestamp_format=get_config('jobs', 'logs', 'timestamps', 'format'),
 876        )
 877        return self._rotating_log
 878
 879    @property
 880    def stdin_file(self):
 881        """
 882        Return the file handler for the stdin file.
 883        """
 884        if '_stdin_file' in self.__dict__:
 885            return self._stdin_file
 886
 887        self._stdin_file = StdinFile(
 888            self.stdin_file_path,
 889            lock_file_path=self.blocking_stdin_file_path,
 890        )
 891        return self._stdin_file
 892
 893    @property
 894    def log_text(self) -> Optional[str]:
 895        """
 896        Read the log files and return their contents.
 897        Returns `None` if the log file does not exist.
 898        """
 899        new_rotating_log = RotatingFile(
 900            self.rotating_log.file_path,
 901            num_files_to_keep = self.rotating_log.num_files_to_keep,
 902            max_file_size = self.rotating_log.max_file_size,
 903            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled'),
 904            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format'),
 905        )
 906        return new_rotating_log.read()
 907
 908    def readlines(self) -> List[str]:
 909        """
 910        Read the next log lines, persisting the cursor for later use.
 911        Note this will alter the cursor of `self.rotating_log`.
 912        """
 913        self.rotating_log._cursor = self._read_log_offset()
 914        lines = self.rotating_log.readlines()
 915        self._write_log_offset()
 916        return lines
 917
 918    def _read_log_offset(self) -> Tuple[int, int]:
 919        """
 920        Return the current log offset cursor.
 921
 922        Returns
 923        -------
 924        A tuple of the form (`subfile_index`, `position`).
 925        """
 926        if not self.log_offset_path.exists():
 927            return 0, 0
 928
 929        with open(self.log_offset_path, 'r', encoding='utf-8') as f:
 930            cursor_text = f.read()
 931        cursor_parts = cursor_text.split(' ')
 932        subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1])
 933        return subfile_index, subfile_position
 934
 935    def _write_log_offset(self) -> None:
 936        """
 937        Write the current log offset file.
 938        """
 939        with open(self.log_offset_path, 'w+', encoding='utf-8') as f:
 940            subfile_index = self.rotating_log._cursor[0]
 941            subfile_position = self.rotating_log._cursor[1]
 942            f.write(f"{subfile_index} {subfile_position}")
 943
 944    @property
 945    def pid(self) -> Union[int, None]:
 946        """
 947        Read the PID file and return its contents.
 948        Returns `None` if the PID file does not exist.
 949        """
 950        if not self.pid_path.exists():
 951            return None
 952        try:
 953            with open(self.pid_path, 'r', encoding='utf-8') as f:
 954                text = f.read()
 955            if len(text) == 0:
 956                return None
 957            pid = int(text.rstrip())
 958        except Exception as e:
 959            warn(e)
 960            text = None
 961            pid = None
 962        return pid
 963
 964    @property
 965    def pid_path(self) -> pathlib.Path:
 966        """
 967        Return the path to a file containing the PID for this Daemon.
 968        """
 969        return self.path / 'process.pid'
 970
 971    @property
 972    def pid_lock(self) -> 'fasteners.InterProcessLock':
 973        """
 974        Return the process lock context manager.
 975        """
 976        if '_pid_lock' in self.__dict__:
 977            return self._pid_lock
 978
 979        fasteners = attempt_import('fasteners')
 980        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
 981        return self._pid_lock
 982
 983    @property
 984    def pickle_path(self) -> pathlib.Path:
 985        """
 986        Return the path for the pickle file.
 987        """
 988        return self.path / 'pickle.pkl'
 989
 990    def read_properties(self) -> Optional[Dict[str, Any]]:
 991        """Read the properties JSON file and return the dictionary."""
 992        if not self.properties_path.exists():
 993            return None
 994        try:
 995            with open(self.properties_path, 'r', encoding='utf-8') as file:
 996                properties = json.load(file)
 997        except Exception as e:
 998            properties = {}
 999        
1000        return properties
1001
1002    def read_pickle(self) -> Daemon:
1003        """Read a Daemon's pickle file and return the `Daemon`."""
1004        import pickle, traceback
1005        if not self.pickle_path.exists():
1006            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")
1007
1008        if self.pickle_path.stat().st_size == 0:
1009            error(f"Pickle was empty for daemon '{self.daemon_id}'.")
1010
1011        try:
1012            with open(self.pickle_path, 'rb') as pickle_file:
1013                daemon = pickle.load(pickle_file)
1014            success, msg = True, 'Success'
1015        except Exception as e:
1016            success, msg = False, str(e)
1017            daemon = None
1018            traceback.print_exception(type(e), e, e.__traceback__)
1019        if not success:
1020            error(msg)
1021        return daemon
1022
1023    @property
1024    def properties(self) -> Dict[str, Any]:
1025        """
1026        Return the contents of the properties JSON file.
1027        """
1028        try:
1029            _file_properties = self.read_properties()
1030        except Exception:
1031            traceback.print_exc()
1032            _file_properties = {}
1033
1034        if not self._properties:
1035            self._properties = _file_properties
1036
1037        if self._properties is None:
1038            self._properties = {}
1039
1040        if _file_properties is not None:
1041            self._properties = apply_patch_to_config(
1042                _file_properties,
1043                self._properties,
1044            )
1045
1046        return self._properties
1047
1048    @property
1049    def hidden(self) -> bool:
1050        """
1051        Return a bool indicating whether this Daemon should be displayed.
1052        """
1053        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')
1054
1055    def write_properties(self) -> SuccessTuple:
1056        """Write the properties dictionary to the properties JSON file
1057        (only if self.properties exists).
1058        """
1059        success, msg = (
1060            False,
1061            f"No properties to write for daemon '{self.daemon_id}'."
1062        )
1063        if self.properties is not None:
1064            try:
1065                self.path.mkdir(parents=True, exist_ok=True)
1066                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
1067                    json.dump(self.properties, properties_file)
1068                success, msg = True, 'Success'
1069            except Exception as e:
1070                success, msg = False, str(e)
1071        return success, msg
1072
1073    def write_pickle(self) -> SuccessTuple:
1074        """Write the pickle file for the daemon."""
1075        import pickle, traceback
1076        try:
1077            self.path.mkdir(parents=True, exist_ok=True)
1078            with open(self.pickle_path, 'wb+') as pickle_file:
1079                pickle.dump(self, pickle_file)
1080            success, msg = True, "Success"
1081        except Exception as e:
1082            success, msg = False, str(e)
1083            traceback.print_exception(type(e), e, e.__traceback__)
1084        return success, msg
1085
1086
1087    def _setup(
1088        self,
1089        allow_dirty_run: bool = False,
1090    ) -> None:
1091        """
1092        Update properties before starting the Daemon.
1093        """
1094        if self.properties is None:
1095            self._properties = {}
1096
1097        self._properties.update({
1098            'target': {
1099                'name': self.target.__name__,
1100                'module': self.target.__module__,
1101                'args': self.target_args,
1102                'kw': self.target_kw,
1103            },
1104        })
1105        self.mkdir_if_not_exists(allow_dirty_run)
1106        _write_properties_success_tuple = self.write_properties()
1107        if not _write_properties_success_tuple[0]:
1108            error(_write_properties_success_tuple[1])
1109
1110        _write_pickle_success_tuple = self.write_pickle()
1111        if not _write_pickle_success_tuple[0]:
1112            error(_write_pickle_success_tuple[1])
1113
1114    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
1115        """
1116        Remove a daemon's directory after execution.
1117
1118        Parameters
1119        ----------
1120        keep_logs: bool, default False
1121            If `True`, skip deleting the daemon's log files.
1122
1123        Returns
1124        -------
1125        A `SuccessTuple` indicating success.
1126        """
1127        if self.path.exists():
1128            try:
1129                shutil.rmtree(self.path)
1130            except Exception as e:
1131                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
1132                warn(msg)
1133                return False, msg
1134        if not keep_logs:
1135            self.rotating_log.delete()
1136            try:
1137                if self.log_offset_path.exists():
1138                    self.log_offset_path.unlink()
1139            except Exception as e:
1140                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
1141                warn(msg)
1142                return False, msg
1143        return True, "Success"
1144
1145
1146    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
1147        """
1148        Return the timeout value to use. Use `--timeout-seconds` if provided,
1149        else the configured default (8).
1150        """
1151        if isinstance(timeout, (int, float)):
1152            return timeout
1153        return get_config('jobs', 'timeout_seconds')
1154
1155
1156    def get_check_timeout_interval_seconds(
1157        self,
1158        check_timeout_interval: Union[int, float, None] = None,
1159    ) -> Union[int, float]:
1160        """
1161        Return the interval value to check the status of timeouts.
1162        """
1163        if isinstance(check_timeout_interval, (int, float)):
1164            return check_timeout_interval
1165        return get_config('jobs', 'check_timeout_interval_seconds')
1166
1167    @property
1168    def target_args(self) -> Union[Tuple[Any], None]:
1169        """
1170        Return the positional arguments to pass to the target function.
1171        """
1172        target_args = (
1173            self.__dict__.get('_target_args', None)
1174            or self.properties.get('target', {}).get('args', None)
1175        )
1176        if target_args is None:
1177            return tuple([])
1178
1179        return tuple(target_args)
1180
1181    @property
1182    def target_kw(self) -> Union[Dict[str, Any], None]:
1183        """
1184        Return the keyword arguments to pass to the target function.
1185        """
1186        target_kw = (
1187            self.__dict__.get('_target_kw', None)
1188            or self.properties.get('target', {}).get('kw', None)
1189        )
1190        if target_kw is None:
1191            return {}
1192
1193        return {key: val for key, val in target_kw.items()}
1194
1195    def __getstate__(self):
1196        """
1197        Pickle this Daemon.
1198        """
1199        dill = attempt_import('dill')
1200        return {
1201            'target': dill.dumps(self.target),
1202            'target_args': self.target_args,
1203            'target_kw': self.target_kw,
1204            'daemon_id': self.daemon_id,
1205            'label': self.label,
1206            'properties': self.properties,
1207        }
1208
1209    def __setstate__(self, _state: Dict[str, Any]):
1210        """
1211        Restore this Daemon from a pickled state.
1212        If the properties file exists, skip the old pickled version.
1213        """
1214        dill = attempt_import('dill')
1215        _state['target'] = dill.loads(_state['target'])
1216        self._pickle = True
1217        daemon_id = _state.get('daemon_id', None)
1218        if not daemon_id:
1219            raise ValueError("Need a daemon_id to un-pickle a Daemon.")
1220
1221        properties_path = self._get_properties_path_from_daemon_id(daemon_id)
1222        ignore_properties = properties_path.exists()
1223        if ignore_properties:
1224            _state = {
1225                key: val
1226                for key, val in _state.items()
1227                if key != 'properties'
1228            }
1229        self.__init__(**_state)
1230
1231
1232    def __repr__(self):
1233        return str(self)
1234
1235    def __str__(self):
1236        return self.daemon_id
1237
1238    def __eq__(self, other):
1239        if not isinstance(other, Daemon):
1240            return False
1241        return self.daemon_id == other.daemon_id
1242
1243    def __hash__(self):
1244        return hash(self.daemon_id)
class Daemon:
  46class Daemon:
  47    """
  48    Daemonize Python functions into background processes.
  49
  50    Examples
  51    --------
  52    >>> import meerschaum as mrsm
  53    >>> from meerschaum.utils.daemons import Daemon
  54    >>> daemon = Daemon(print, ('hi',))
  55    >>> success, msg = daemon.run()
  56    >>> print(daemon.log_text)
  57
  58    2024-07-29 18:03 | hi
  59    2024-07-29 18:03 |
  60    >>> daemon.run(allow_dirty_run=True)
  61    >>> print(daemon.log_text)
  62
  63    2024-07-29 18:03 | hi
  64    2024-07-29 18:03 |
  65    2024-07-29 18:05 | hi
  66    2024-07-29 18:05 |
  67    >>> mrsm.pprint(daemon.properties)
  68    {
  69        'label': 'print',
  70        'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
  71        'result': None,
  72        'process': {'ended': '2024-07-29T18:03:33.752806'}
  73    }
  74
  75    """
  76
  77    def __new__(
  78        cls,
  79        *args,
  80        daemon_id: Optional[str] = None,
  81        **kw
  82    ):
  83        """
  84        If a daemon_id is provided and already exists, read from its pickle file.
  85        """
  86        instance = super(Daemon, cls).__new__(cls)
  87        if daemon_id is not None:
  88            instance.daemon_id = daemon_id
  89            if instance.pickle_path.exists():
  90                instance = instance.read_pickle()
  91        return instance
  92
  93    @classmethod
  94    def from_properties_file(cls, daemon_id: str) -> Daemon:
  95        """
  96        Return a Daemon from a properties dictionary.
  97        """
  98        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
  99        if not properties_path.exists():
 100            raise OSError(f"Properties file '{properties_path}' does not exist.")
 101
 102        try:
 103            with open(properties_path, 'r', encoding='utf-8') as f:
 104                properties = json.load(f)
 105        except Exception:
 106            properties = {}
 107
 108        if not properties:
 109            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")
 110
 111        daemon_id = properties_path.parent.name
 112        target_cf = properties.get('target', {})
 113        target_module_name = target_cf.get('module', None)
 114        target_function_name = target_cf.get('name', None)
 115        target_args = target_cf.get('args', None)
 116        target_kw = target_cf.get('kw', None)
 117        label = properties.get('label', None)
 118
 119        if None in [
 120            target_module_name,
 121            target_function_name,
 122            target_args,
 123            target_kw,
 124        ]:
 125            raise ValueError("Missing target function information.")
 126
 127        target_module = importlib.import_module(target_module_name)
 128        target_function = getattr(target_module, target_function_name)
 129
 130        return Daemon(
 131            daemon_id=daemon_id,
 132            target=target_function,
 133            target_args=target_args,
 134            target_kw=target_kw,
 135            properties=properties,
 136            label=label,
 137        )
 138
 139
 140    def __init__(
 141        self,
 142        target: Optional[Callable[[Any], Any]] = None,
 143        target_args: Union[List[Any], Tuple[Any], None] = None,
 144        target_kw: Optional[Dict[str, Any]] = None,
 145        env: Optional[Dict[str, str]] = None,
 146        daemon_id: Optional[str] = None,
 147        label: Optional[str] = None,
 148        properties: Optional[Dict[str, Any]] = None,
 149    ):
 150        """
 151        Parameters
 152        ----------
 153        target: Optional[Callable[[Any], Any]], default None,
 154            The function to execute in a child process.
 155
 156        target_args: Union[List[Any], Tuple[Any], None], default None
 157            Positional arguments to pass to the target function.
 158
 159        target_kw: Optional[Dict[str, Any]], default None
 160            Keyword arguments to pass to the target function.
 161
 162        env: Optional[Dict[str, str]], default None
 163            If provided, set these environment variables in the daemon process.
 164
 165        daemon_id: Optional[str], default None
 166            Build a `Daemon` from an existing `daemon_id`.
 167            If `daemon_id` is provided, other arguments are ignored and are derived
 168            from the existing pickled `Daemon`.
 169
 170        label: Optional[str], default None
 171            Label string to help identifiy a daemon.
 172            If `None`, use the function name instead.
 173
 174        properties: Optional[Dict[str, Any]], default None
 175            Override reading from the properties JSON by providing an existing dictionary.
 176        """
 177        _pickle = self.__dict__.get('_pickle', False)
 178        if daemon_id is not None:
 179            self.daemon_id = daemon_id
 180            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):
 181
 182                if not self.properties_path.exists():
 183                    raise Exception(
 184                        f"Daemon '{self.daemon_id}' does not exist. "
 185                        + "Pass a target to create a new Daemon."
 186                    )
 187
 188                try:
 189                    new_daemon = self.from_properties_file(daemon_id)
 190                except Exception:
 191                    new_daemon = None
 192
 193                if new_daemon is not None:
 194                    new_daemon.write_pickle()
 195                    target = new_daemon.target
 196                    target_args = new_daemon.target_args
 197                    target_kw = new_daemon.target_kw
 198                    label = new_daemon.label
 199                    self._properties = new_daemon.properties
 200                else:
 201                    try:
 202                        self.properties_path.unlink()
 203                    except Exception:
 204                        pass
 205
 206                    raise Exception(
 207                        f"Could not recover daemon '{self.daemon_id}' "
 208                        + "from its properties file."
 209                    )
 210
 211        if 'target' not in self.__dict__:
 212            if target is None:
 213                error("Cannot create a Daemon without a target.")
 214            self.target = target
 215
 216        ### NOTE: We have to check self.__dict__ in case we un-pickling.
 217        if '_target_args' not in self.__dict__:
 218            self._target_args = target_args
 219        if '_target_kw' not in self.__dict__:
 220            self._target_kw = target_kw
 221
 222        if 'label' not in self.__dict__:
 223            if label is None:
 224                label = (
 225                    self.target.__name__ if '__name__' in self.target.__dir__()
 226                        else str(self.target)
 227                )
 228            self.label = label
 229        if 'daemon_id' not in self.__dict__:
 230            self.daemon_id = get_new_daemon_name()
 231        if '_properties' not in self.__dict__:
 232            self._properties = properties
 233        if self._properties is None:
 234            self._properties = {}
 235
 236        self._properties.update({'label': self.label})
 237        if env:
 238            self._properties.update({'env': env})
 239
 240        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
 241        _ = self.process
 242
 243
 244    def _run_exit(
 245        self,
 246        keep_daemon_output: bool = True,
 247        allow_dirty_run: bool = False,
 248    ) -> Any:
 249        """Run the daemon's target function.
 250        NOTE: This WILL EXIT the parent process!
 251
 252        Parameters
 253        ----------
 254        keep_daemon_output: bool, default True
 255            If `False`, delete the daemon's output directory upon exiting.
 256
 257        allow_dirty_run, bool, default False:
 258            If `True`, run the daemon, even if the `daemon_id` directory exists.
 259            This option is dangerous because if the same `daemon_id` runs twice,
 260            the last to finish will overwrite the output of the first.
 261
 262        Returns
 263        -------
 264        Nothing — this will exit the parent process.
 265        """
 266        import platform, sys, os, traceback
 267        from meerschaum.utils.warnings import warn
 268        from meerschaum.config import get_config
 269        daemon = attempt_import('daemon')
 270        lines = get_config('jobs', 'terminal', 'lines')
 271        columns = get_config('jobs', 'terminal', 'columns')
 272
 273        if platform.system() == 'Windows':
 274            return False, "Windows is no longer supported."
 275
 276        self._setup(allow_dirty_run)
 277
 278        ### NOTE: The SIGINT handler has been removed so that child processes may handle
 279        ###       KeyboardInterrupts themselves.
 280        ###       The previous aggressive approach was redundant because of the SIGTERM handler.
 281        self._daemon_context = daemon.DaemonContext(
 282            pidfile=self.pid_lock,
 283            stdout=self.rotating_log,
 284            stderr=self.rotating_log,
 285            working_directory=os.getcwd(),
 286            detach_process=True,
 287            files_preserve=list(self.rotating_log.subfile_objects.values()),
 288            signal_map={
 289                signal.SIGTERM: self._handle_sigterm,
 290            },
 291        )
 292
 293        _daemons.append(self)
 294
 295        log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds')
 296        self._log_refresh_timer = RepeatTimer(
 297            log_refresh_seconds,
 298            partial(self.rotating_log.refresh_files, start_interception=True),
 299        )
 300
 301        try:
 302            os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns))
 303            with self._daemon_context:
 304                sys.stdin = self.stdin_file
 305                os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id
 306                os.environ['PYTHONUNBUFFERED'] = '1'
 307
 308                ### Allow the user to override environment variables.
 309                env = self.properties.get('env', {})
 310                if env and isinstance(env, dict):
 311                    os.environ.update({str(k): str(v) for k, v in env.items()})
 312
 313                self.rotating_log.refresh_files(start_interception=True)
 314                result = None
 315                try:
 316                    with open(self.pid_path, 'w+', encoding='utf-8') as f:
 317                        f.write(str(os.getpid()))
 318
 319                    self._log_refresh_timer.start()
 320                    self.properties['result'] = None
 321                    self._capture_process_timestamp('began')
 322                    result = self.target(*self.target_args, **self.target_kw)
 323                    self.properties['result'] = result
 324                except (BrokenPipeError, KeyboardInterrupt, SystemExit):
 325                    pass
 326                except Exception as e:
 327                    warn(
 328                        f"Exception in daemon target function: {traceback.format_exc()}",
 329                    )
 330                    result = e
 331                finally:
 332                    _results[self.daemon_id] = result
 333
 334                    if keep_daemon_output:
 335                        self._capture_process_timestamp('ended')
 336                    else:
 337                        self.cleanup()
 338
 339                    self._log_refresh_timer.cancel()
 340                    if self.pid is None and self.pid_path.exists():
 341                        self.pid_path.unlink()
 342
 343                    if is_success_tuple(result):
 344                        try:
 345                            mrsm.pprint(result)
 346                        except BrokenPipeError:
 347                            pass
 348
 349        except Exception as e:
 350            daemon_error = traceback.format_exc()
 351            with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
 352                f.write(daemon_error)
 353            warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}")
 354
 355    def _capture_process_timestamp(
 356        self,
 357        process_key: str,
 358        write_properties: bool = True,
 359    ) -> None:
 360        """
 361        Record the current timestamp to the parameters `process:<process_key>`.
 362
 363        Parameters
 364        ----------
 365        process_key: str
 366            Under which key to store the timestamp.
 367
 368        write_properties: bool, default True
 369            If `True` persist the properties to disk immediately after capturing the timestamp.
 370        """
 371        if 'process' not in self.properties:
 372            self.properties['process'] = {}
 373
 374        if process_key not in ('began', 'ended', 'paused', 'stopped'):
 375            raise ValueError(f"Invalid key '{process_key}'.")
 376
 377        self.properties['process'][process_key] = (
 378            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
 379        )
 380        if write_properties:
 381            self.write_properties()
 382
 383    def run(
 384        self,
 385        keep_daemon_output: bool = True,
 386        allow_dirty_run: bool = False,
 387        debug: bool = False,
 388    ) -> SuccessTuple:
 389        """Run the daemon as a child process and continue executing the parent.
 390
 391        Parameters
 392        ----------
 393        keep_daemon_output: bool, default True
 394            If `False`, delete the daemon's output directory upon exiting.
 395
 396        allow_dirty_run: bool, default False
 397            If `True`, run the daemon, even if the `daemon_id` directory exists.
 398            This option is dangerous because if the same `daemon_id` runs concurrently,
 399            the last to finish will overwrite the output of the first.
 400
 401        Returns
 402        -------
 403        A SuccessTuple indicating success.
 404
 405        """
 406        import platform
 407        if platform.system() == 'Windows':
 408            return False, "Cannot run background jobs on Windows."
 409
 410        ### The daemon might exist and be paused.
 411        if self.status == 'paused':
 412            return self.resume()
 413
 414        self._remove_stop_file()
 415        if self.status == 'running':
 416            return True, f"Daemon '{self}' is already running."
 417
 418        self.mkdir_if_not_exists(allow_dirty_run)
 419        _write_pickle_success_tuple = self.write_pickle()
 420        if not _write_pickle_success_tuple[0]:
 421            return _write_pickle_success_tuple
 422
 423        _launch_daemon_code = (
 424            "from meerschaum.utils.daemon import Daemon; "
 425            + f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
 426            + f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
 427            + "allow_dirty_run=True)"
 428        )
 429        env = dict(os.environ)
 430        env['MRSM_NOASK'] = 'true'
 431        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
 432        msg = (
 433            "Success"
 434            if _launch_success_bool
 435            else f"Failed to start daemon '{self.daemon_id}'."
 436        )
 437        return _launch_success_bool, msg
 438
 439    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
 440        """
 441        Forcibly terminate a running daemon.
 442        Sends a SIGTERM signal to the process.
 443
 444        Parameters
 445        ----------
 446        timeout: Optional[int], default 3
 447            How many seconds to wait for the process to terminate.
 448
 449        Returns
 450        -------
 451        A SuccessTuple indicating success.
 452        """
 453        if self.status != 'paused':
 454            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
 455            if success:
 456                self._write_stop_file('kill')
 457                return success, msg
 458
 459        if self.status == 'stopped':
 460            self._write_stop_file('kill')
 461            return True, "Process has already stopped."
 462
 463        process = self.process
 464        try:
 465            process.terminate()
 466            process.kill()
 467            process.wait(timeout=timeout)
 468        except Exception as e:
 469            return False, f"Failed to kill job {self} with exception: {e}"
 470
 471        if self.pid_path.exists():
 472            try:
 473                self.pid_path.unlink()
 474            except Exception as e:
 475                pass
 476
 477        self._write_stop_file('kill')
 478        return True, "Success"
 479
 480    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
 481        """Gracefully quit a running daemon."""
 482        if self.status == 'paused':
 483            return self.kill(timeout)
 484
 485        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
 486        if signal_success:
 487            self._write_stop_file('quit')
 488        return signal_success, signal_msg
 489
 490    def pause(
 491        self,
 492        timeout: Union[int, float, None] = None,
 493        check_timeout_interval: Union[float, int, None] = None,
 494    ) -> SuccessTuple:
 495        """
 496        Pause the daemon if it is running.
 497
 498        Parameters
 499        ----------
 500        timeout: Union[float, int, None], default None
 501            The maximum number of seconds to wait for a process to suspend.
 502
 503        check_timeout_interval: Union[float, int, None], default None
 504            The number of seconds to wait between checking if the process is still running.
 505
 506        Returns
 507        -------
 508        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
 509        """
 510        if self.process is None:
 511            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."
 512
 513        if self.status == 'paused':
 514            return True, f"Daemon '{self.daemon_id}' is already paused."
 515
 516        self._write_stop_file('pause')
 517        try:
 518            self.process.suspend()
 519        except Exception as e:
 520            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"
 521
 522        timeout = self.get_timeout_seconds(timeout)
 523        check_timeout_interval = self.get_check_timeout_interval_seconds(
 524            check_timeout_interval
 525        )
 526
 527        psutil = attempt_import('psutil')
 528
 529        if not timeout:
 530            try:
 531                success = self.process.status() == 'stopped'
 532            except psutil.NoSuchProcess as e:
 533                success = True
 534            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
 535            if success:
 536                self._capture_process_timestamp('paused')
 537            return success, msg
 538
 539        begin = time.perf_counter()
 540        while (time.perf_counter() - begin) < timeout:
 541            try:
 542                if self.process.status() == 'stopped':
 543                    self._capture_process_timestamp('paused')
 544                    return True, "Success"
 545            except psutil.NoSuchProcess as e:
 546                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
 547            time.sleep(check_timeout_interval)
 548
 549        return False, (
 550            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
 551            + ('s' if timeout != 1 else '') + '.'
 552        )
 553
 554    def resume(
 555        self,
 556        timeout: Union[int, float, None] = None,
 557        check_timeout_interval: Union[float, int, None] = None,
 558    ) -> SuccessTuple:
 559        """
 560        Resume the daemon if it is paused.
 561
 562        Parameters
 563        ----------
 564        timeout: Union[float, int, None], default None
 565            The maximum number of seconds to wait for a process to resume.
 566
 567        check_timeout_interval: Union[float, int, None], default None
 568            The number of seconds to wait between checking if the process is still stopped.
 569
 570        Returns
 571        -------
 572        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
 573        """
 574        if self.status == 'running':
 575            return True, f"Daemon '{self.daemon_id}' is already running."
 576
 577        if self.status == 'stopped':
 578            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."
 579
 580        self._remove_stop_file()
 581        try:
 582            self.process.resume()
 583        except Exception as e:
 584            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"
 585
 586        timeout = self.get_timeout_seconds(timeout)
 587        check_timeout_interval = self.get_check_timeout_interval_seconds(
 588            check_timeout_interval
 589        )
 590
 591        if not timeout:
 592            success = self.status == 'running'
 593            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
 594            if success:
 595                self._capture_process_timestamp('began')
 596            return success, msg
 597
 598        begin = time.perf_counter()
 599        while (time.perf_counter() - begin) < timeout:
 600            if self.status == 'running':
 601                self._capture_process_timestamp('began')
 602                return True, "Success"
 603            time.sleep(check_timeout_interval)
 604
 605        return False, (
 606            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
 607            + ('s' if timeout != 1 else '') + '.'
 608        )
 609
 610    def _write_stop_file(self, action: str) -> SuccessTuple:
 611        """Write the stop file timestamp and action."""
 612        if action not in ('quit', 'kill', 'pause'):
 613            return False, f"Unsupported action '{action}'."
 614
 615        if not self.stop_path.parent.exists():
 616            self.stop_path.parent.mkdir(parents=True, exist_ok=True)
 617
 618        with open(self.stop_path, 'w+', encoding='utf-8') as f:
 619            json.dump(
 620                {
 621                    'stop_time': datetime.now(timezone.utc).isoformat(),
 622                    'action': action,
 623                },
 624                f
 625            )
 626
 627        return True, "Success"
 628
 629    def _remove_stop_file(self) -> SuccessTuple:
 630        """Remove the stop file"""
 631        if not self.stop_path.exists():
 632            return True, "Stop file does not exist."
 633
 634        try:
 635            self.stop_path.unlink()
 636        except Exception as e:
 637            return False, f"Failed to remove stop file:\n{e}"
 638
 639        return True, "Success"
 640
 641    def _read_stop_file(self) -> Dict[str, Any]:
 642        """
 643        Read the stop file if it exists.
 644        """
 645        if not self.stop_path.exists():
 646            return {}
 647
 648        try:
 649            with open(self.stop_path, 'r', encoding='utf-8') as f:
 650                data = json.load(f)
 651            return data
 652        except Exception:
 653            return {}
 654
 655    def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None:
 656        """
 657        Handle `SIGTERM` within the `Daemon` context.
 658        This method is injected into the `DaemonContext`.
 659        """
 660        from meerschaum.utils.process import signal_handler
 661        signal_handler(signal_number, stack_frame)
 662
 663        timer = self.__dict__.get('_log_refresh_timer', None)
 664        if timer is not None:
 665            timer.cancel()
 666
 667        daemon_context = self.__dict__.get('_daemon_context', None)
 668        if daemon_context is not None:
 669            daemon_context.close()
 670
 671        _close_pools()
 672        raise SystemExit(0)
 673
 674    def _send_signal(
 675            self,
 676            signal_to_send,
 677            timeout: Union[float, int, None] = None,
 678            check_timeout_interval: Union[float, int, None] = None,
 679        ) -> SuccessTuple:
 680        """Send a signal to the daemon process.
 681
 682        Parameters
 683        ----------
 684        signal_to_send:
 685            The signal the send to the daemon, e.g. `signals.SIGINT`.
 686
 687        timeout: Union[float, int, None], default None
 688            The maximum number of seconds to wait for a process to terminate.
 689
 690        check_timeout_interval: Union[float, int, None], default None
 691            The number of seconds to wait between checking if the process is still running.
 692
 693        Returns
 694        -------
 695        A SuccessTuple indicating success.
 696        """
 697        try:
 698            pid = self.pid
 699            if pid is None:
 700                return (
 701                    False,
 702                    f"Daemon '{self.daemon_id}' is not running, "
 703                    + f"cannot send signal '{signal_to_send}'."
 704                )
 705            
 706            os.kill(pid, signal_to_send)
 707        except Exception as e:
 708            return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}"
 709
 710        timeout = self.get_timeout_seconds(timeout)
 711        check_timeout_interval = self.get_check_timeout_interval_seconds(
 712            check_timeout_interval
 713        )
 714
 715        if not timeout:
 716            return True, f"Successfully sent '{signal}' to daemon '{self.daemon_id}'."
 717
 718        begin = time.perf_counter()
 719        while (time.perf_counter() - begin) < timeout:
 720            if not self.status == 'running':
 721                return True, "Success"
 722            time.sleep(check_timeout_interval)
 723
 724        return False, (
 725            f"Failed to stop daemon '{self.daemon_id}' within {timeout} second"
 726            + ('s' if timeout != 1 else '') + '.'
 727        )
 728
 729    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
 730        """Create the Daemon's directory.
 731        If `allow_dirty_run` is `False` and the directory already exists,
 732        raise a `FileExistsError`.
 733        """
 734        try:
 735            self.path.mkdir(parents=True, exist_ok=True)
 736            _already_exists = any(os.scandir(self.path))
 737        except FileExistsError:
 738            _already_exists = True
 739
 740        if _already_exists and not allow_dirty_run:
 741            error(
 742                f"Daemon '{self.daemon_id}' already exists. " +
 743                f"To allow this daemon to run, do one of the following:\n"
 744                + "  - Execute `daemon.cleanup()`.\n"
 745                + f"  - Delete the directory '{self.path}'.\n"
 746                + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
 747                FileExistsError,
 748            )
 749
 750    @property
 751    def process(self) -> Union['psutil.Process', None]:
 752        """
 753        Return the psutil process for the Daemon.
 754        """
 755        psutil = attempt_import('psutil')
 756        pid = self.pid
 757        if pid is None:
 758            return None
 759        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
 760            try:
 761                self._process = psutil.Process(int(pid))
 762            except Exception as e:
 763                if self.pid_path.exists():
 764                    self.pid_path.unlink()
 765                return None
 766        return self._process
 767
 768    @property
 769    def status(self) -> str:
 770        """
 771        Return the running status of this Daemon.
 772        """
 773        if self.process is None:
 774            return 'stopped'
 775
 776        psutil = attempt_import('psutil')
 777        try:
 778            if self.process.status() == 'stopped':
 779                return 'paused'
 780            if self.process.status() == 'zombie':
 781                raise psutil.NoSuchProcess(self.process.pid)
 782        except (psutil.NoSuchProcess, AttributeError):
 783            if self.pid_path.exists():
 784                try:
 785                    self.pid_path.unlink()
 786                except Exception as e:
 787                    pass
 788            return 'stopped'
 789
 790        return 'running'
 791
 792    @classmethod
 793    def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 794        """
 795        Return a Daemon's path from its `daemon_id`.
 796        """
 797        return DAEMON_RESOURCES_PATH / daemon_id
 798
 799    @property
 800    def path(self) -> pathlib.Path:
 801        """
 802        Return the path for this Daemon's directory.
 803        """
 804        return self._get_path_from_daemon_id(self.daemon_id)
 805
 806    @classmethod
 807    def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 808        """
 809        Return the `properties.json` path for a given `daemon_id`.
 810        """
 811        return cls._get_path_from_daemon_id(daemon_id) / 'properties.json'
 812
 813    @property
 814    def properties_path(self) -> pathlib.Path:
 815        """
 816        Return the `propterties.json` path for this Daemon.
 817        """
 818        return self._get_properties_path_from_daemon_id(self.daemon_id)
 819
 820    @property
 821    def stop_path(self) -> pathlib.Path:
 822        """
 823        Return the path for the stop file (created when manually stopped).
 824        """
 825        return self.path / '.stop.json'
 826
 827    @property
 828    def log_path(self) -> pathlib.Path:
 829        """
 830        Return the log path.
 831        """
 832        return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')
 833
 834    @property
 835    def stdin_file_path(self) -> pathlib.Path:
 836        """
 837        Return the stdin file path.
 838        """
 839        return self.path / 'input.stdin'
 840
 841    @property
 842    def blocking_stdin_file_path(self) -> pathlib.Path:
 843        """
 844        Return the stdin file path.
 845        """
 846        if '_blocking_stdin_file_path' in self.__dict__:
 847            return self._blocking_stdin_file_path
 848
 849        return self.path / 'input.stdin.block'
 850
 851    @property
 852    def log_offset_path(self) -> pathlib.Path:
 853        """
 854        Return the log offset file path.
 855        """
 856        return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')
 857
 858    @property
 859    def rotating_log(self) -> RotatingFile:
 860        """
 861        The rotating log file for the daemon's output.
 862        """
 863        if '_rotating_log' in self.__dict__:
 864            return self._rotating_log
 865
 866        write_timestamps = (
 867            self.properties.get('logs', {}).get('write_timestamps', None)
 868        )
 869        if write_timestamps is None:
 870            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
 871
 872        self._rotating_log = RotatingFile(
 873            self.log_path,
 874            redirect_streams=True,
 875            write_timestamps=write_timestamps,
 876            timestamp_format=get_config('jobs', 'logs', 'timestamps', 'format'),
 877        )
 878        return self._rotating_log
 879
 880    @property
 881    def stdin_file(self):
 882        """
 883        Return the file handler for the stdin file.
 884        """
 885        if '_stdin_file' in self.__dict__:
 886            return self._stdin_file
 887
 888        self._stdin_file = StdinFile(
 889            self.stdin_file_path,
 890            lock_file_path=self.blocking_stdin_file_path,
 891        )
 892        return self._stdin_file
 893
 894    @property
 895    def log_text(self) -> Optional[str]:
 896        """
 897        Read the log files and return their contents.
 898        Returns `None` if the log file does not exist.
 899        """
 900        new_rotating_log = RotatingFile(
 901            self.rotating_log.file_path,
 902            num_files_to_keep = self.rotating_log.num_files_to_keep,
 903            max_file_size = self.rotating_log.max_file_size,
 904            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled'),
 905            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format'),
 906        )
 907        return new_rotating_log.read()
 908
 909    def readlines(self) -> List[str]:
 910        """
 911        Read the next log lines, persisting the cursor for later use.
 912        Note this will alter the cursor of `self.rotating_log`.
 913        """
 914        self.rotating_log._cursor = self._read_log_offset()
 915        lines = self.rotating_log.readlines()
 916        self._write_log_offset()
 917        return lines
 918
 919    def _read_log_offset(self) -> Tuple[int, int]:
 920        """
 921        Return the current log offset cursor.
 922
 923        Returns
 924        -------
 925        A tuple of the form (`subfile_index`, `position`).
 926        """
 927        if not self.log_offset_path.exists():
 928            return 0, 0
 929
 930        with open(self.log_offset_path, 'r', encoding='utf-8') as f:
 931            cursor_text = f.read()
 932        cursor_parts = cursor_text.split(' ')
 933        subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1])
 934        return subfile_index, subfile_position
 935
 936    def _write_log_offset(self) -> None:
 937        """
 938        Write the current log offset file.
 939        """
 940        with open(self.log_offset_path, 'w+', encoding='utf-8') as f:
 941            subfile_index = self.rotating_log._cursor[0]
 942            subfile_position = self.rotating_log._cursor[1]
 943            f.write(f"{subfile_index} {subfile_position}")
 944
 945    @property
 946    def pid(self) -> Union[int, None]:
 947        """
 948        Read the PID file and return its contents.
 949        Returns `None` if the PID file does not exist.
 950        """
 951        if not self.pid_path.exists():
 952            return None
 953        try:
 954            with open(self.pid_path, 'r', encoding='utf-8') as f:
 955                text = f.read()
 956            if len(text) == 0:
 957                return None
 958            pid = int(text.rstrip())
 959        except Exception as e:
 960            warn(e)
 961            text = None
 962            pid = None
 963        return pid
 964
 965    @property
 966    def pid_path(self) -> pathlib.Path:
 967        """
 968        Return the path to a file containing the PID for this Daemon.
 969        """
 970        return self.path / 'process.pid'
 971
 972    @property
 973    def pid_lock(self) -> 'fasteners.InterProcessLock':
 974        """
 975        Return the process lock context manager.
 976        """
 977        if '_pid_lock' in self.__dict__:
 978            return self._pid_lock
 979
 980        fasteners = attempt_import('fasteners')
 981        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
 982        return self._pid_lock
 983
 984    @property
 985    def pickle_path(self) -> pathlib.Path:
 986        """
 987        Return the path for the pickle file.
 988        """
 989        return self.path / 'pickle.pkl'
 990
 991    def read_properties(self) -> Optional[Dict[str, Any]]:
 992        """Read the properties JSON file and return the dictionary."""
 993        if not self.properties_path.exists():
 994            return None
 995        try:
 996            with open(self.properties_path, 'r', encoding='utf-8') as file:
 997                properties = json.load(file)
 998        except Exception as e:
 999            properties = {}
1000        
1001        return properties
1002
1003    def read_pickle(self) -> Daemon:
1004        """Read a Daemon's pickle file and return the `Daemon`."""
1005        import pickle, traceback
1006        if not self.pickle_path.exists():
1007            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")
1008
1009        if self.pickle_path.stat().st_size == 0:
1010            error(f"Pickle was empty for daemon '{self.daemon_id}'.")
1011
1012        try:
1013            with open(self.pickle_path, 'rb') as pickle_file:
1014                daemon = pickle.load(pickle_file)
1015            success, msg = True, 'Success'
1016        except Exception as e:
1017            success, msg = False, str(e)
1018            daemon = None
1019            traceback.print_exception(type(e), e, e.__traceback__)
1020        if not success:
1021            error(msg)
1022        return daemon
1023
1024    @property
1025    def properties(self) -> Dict[str, Any]:
1026        """
1027        Return the contents of the properties JSON file.
1028        """
1029        try:
1030            _file_properties = self.read_properties()
1031        except Exception:
1032            traceback.print_exc()
1033            _file_properties = {}
1034
1035        if not self._properties:
1036            self._properties = _file_properties
1037
1038        if self._properties is None:
1039            self._properties = {}
1040
1041        if _file_properties is not None:
1042            self._properties = apply_patch_to_config(
1043                _file_properties,
1044                self._properties,
1045            )
1046
1047        return self._properties
1048
1049    @property
1050    def hidden(self) -> bool:
1051        """
1052        Return a bool indicating whether this Daemon should be displayed.
1053        """
1054        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')
1055
1056    def write_properties(self) -> SuccessTuple:
1057        """Write the properties dictionary to the properties JSON file
1058        (only if self.properties exists).
1059        """
1060        success, msg = (
1061            False,
1062            f"No properties to write for daemon '{self.daemon_id}'."
1063        )
1064        if self.properties is not None:
1065            try:
1066                self.path.mkdir(parents=True, exist_ok=True)
1067                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
1068                    json.dump(self.properties, properties_file)
1069                success, msg = True, 'Success'
1070            except Exception as e:
1071                success, msg = False, str(e)
1072        return success, msg
1073
1074    def write_pickle(self) -> SuccessTuple:
1075        """Write the pickle file for the daemon."""
1076        import pickle, traceback
1077        try:
1078            self.path.mkdir(parents=True, exist_ok=True)
1079            with open(self.pickle_path, 'wb+') as pickle_file:
1080                pickle.dump(self, pickle_file)
1081            success, msg = True, "Success"
1082        except Exception as e:
1083            success, msg = False, str(e)
1084            traceback.print_exception(type(e), e, e.__traceback__)
1085        return success, msg
1086
1087
1088    def _setup(
1089        self,
1090        allow_dirty_run: bool = False,
1091    ) -> None:
1092        """
1093        Update properties before starting the Daemon.
1094        """
1095        if self.properties is None:
1096            self._properties = {}
1097
1098        self._properties.update({
1099            'target': {
1100                'name': self.target.__name__,
1101                'module': self.target.__module__,
1102                'args': self.target_args,
1103                'kw': self.target_kw,
1104            },
1105        })
1106        self.mkdir_if_not_exists(allow_dirty_run)
1107        _write_properties_success_tuple = self.write_properties()
1108        if not _write_properties_success_tuple[0]:
1109            error(_write_properties_success_tuple[1])
1110
1111        _write_pickle_success_tuple = self.write_pickle()
1112        if not _write_pickle_success_tuple[0]:
1113            error(_write_pickle_success_tuple[1])
1114
1115    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
1116        """
1117        Remove a daemon's directory after execution.
1118
1119        Parameters
1120        ----------
1121        keep_logs: bool, default False
1122            If `True`, skip deleting the daemon's log files.
1123
1124        Returns
1125        -------
1126        A `SuccessTuple` indicating success.
1127        """
1128        if self.path.exists():
1129            try:
1130                shutil.rmtree(self.path)
1131            except Exception as e:
1132                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
1133                warn(msg)
1134                return False, msg
1135        if not keep_logs:
1136            self.rotating_log.delete()
1137            try:
1138                if self.log_offset_path.exists():
1139                    self.log_offset_path.unlink()
1140            except Exception as e:
1141                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
1142                warn(msg)
1143                return False, msg
1144        return True, "Success"
1145
1146
1147    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
1148        """
1149        Return the timeout value to use. Use `--timeout-seconds` if provided,
1150        else the configured default (8).
1151        """
1152        if isinstance(timeout, (int, float)):
1153            return timeout
1154        return get_config('jobs', 'timeout_seconds')
1155
1156
1157    def get_check_timeout_interval_seconds(
1158        self,
1159        check_timeout_interval: Union[int, float, None] = None,
1160    ) -> Union[int, float]:
1161        """
1162        Return the interval value to check the status of timeouts.
1163        """
1164        if isinstance(check_timeout_interval, (int, float)):
1165            return check_timeout_interval
1166        return get_config('jobs', 'check_timeout_interval_seconds')
1167
1168    @property
1169    def target_args(self) -> Union[Tuple[Any], None]:
1170        """
1171        Return the positional arguments to pass to the target function.
1172        """
1173        target_args = (
1174            self.__dict__.get('_target_args', None)
1175            or self.properties.get('target', {}).get('args', None)
1176        )
1177        if target_args is None:
1178            return tuple([])
1179
1180        return tuple(target_args)
1181
1182    @property
1183    def target_kw(self) -> Union[Dict[str, Any], None]:
1184        """
1185        Return the keyword arguments to pass to the target function.
1186        """
1187        target_kw = (
1188            self.__dict__.get('_target_kw', None)
1189            or self.properties.get('target', {}).get('kw', None)
1190        )
1191        if target_kw is None:
1192            return {}
1193
1194        return {key: val for key, val in target_kw.items()}
1195
1196    def __getstate__(self):
1197        """
1198        Pickle this Daemon.
1199        """
1200        dill = attempt_import('dill')
1201        return {
1202            'target': dill.dumps(self.target),
1203            'target_args': self.target_args,
1204            'target_kw': self.target_kw,
1205            'daemon_id': self.daemon_id,
1206            'label': self.label,
1207            'properties': self.properties,
1208        }
1209
1210    def __setstate__(self, _state: Dict[str, Any]):
1211        """
1212        Restore this Daemon from a pickled state.
1213        If the properties file exists, skip the old pickled version.
1214        """
1215        dill = attempt_import('dill')
1216        _state['target'] = dill.loads(_state['target'])
1217        self._pickle = True
1218        daemon_id = _state.get('daemon_id', None)
1219        if not daemon_id:
1220            raise ValueError("Need a daemon_id to un-pickle a Daemon.")
1221
1222        properties_path = self._get_properties_path_from_daemon_id(daemon_id)
1223        ignore_properties = properties_path.exists()
1224        if ignore_properties:
1225            _state = {
1226                key: val
1227                for key, val in _state.items()
1228                if key != 'properties'
1229            }
1230        self.__init__(**_state)
1231
1232
1233    def __repr__(self):
1234        return str(self)
1235
1236    def __str__(self):
1237        return self.daemon_id
1238
1239    def __eq__(self, other):
1240        if not isinstance(other, Daemon):
1241            return False
1242        return self.daemon_id == other.daemon_id
1243
1244    def __hash__(self):
1245        return hash(self.daemon_id)

Daemonize Python functions into background processes.

Examples
>>> import meerschaum as mrsm
>>> from meerschaum.utils.daemons import Daemon
>>> daemon = Daemon(print, ('hi',))
>>> success, msg = daemon.run()
>>> print(daemon.log_text)

2024-07-29 18:03 | hi 2024-07-29 18:03 |

>>> daemon.run(allow_dirty_run=True)
>>> print(daemon.log_text)

2024-07-29 18:03 | hi 2024-07-29 18:03 | 2024-07-29 18:05 | hi 2024-07-29 18:05 |

>>> mrsm.pprint(daemon.properties)
{
    'label': 'print',
    'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
    'result': None,
    'process': {'ended': '2024-07-29T18:03:33.752806'}
}
Daemon( target: Optional[Callable[[Any], Any]] = None, target_args: Union[List[Any], Tuple[Any], NoneType] = None, target_kw: Optional[Dict[str, Any]] = None, env: Optional[Dict[str, str]] = None, daemon_id: Optional[str] = None, label: Optional[str] = None, properties: Optional[Dict[str, Any]] = None)
140    def __init__(
141        self,
142        target: Optional[Callable[[Any], Any]] = None,
143        target_args: Union[List[Any], Tuple[Any], None] = None,
144        target_kw: Optional[Dict[str, Any]] = None,
145        env: Optional[Dict[str, str]] = None,
146        daemon_id: Optional[str] = None,
147        label: Optional[str] = None,
148        properties: Optional[Dict[str, Any]] = None,
149    ):
150        """
151        Parameters
152        ----------
153        target: Optional[Callable[[Any], Any]], default None,
154            The function to execute in a child process.
155
156        target_args: Union[List[Any], Tuple[Any], None], default None
157            Positional arguments to pass to the target function.
158
159        target_kw: Optional[Dict[str, Any]], default None
160            Keyword arguments to pass to the target function.
161
162        env: Optional[Dict[str, str]], default None
163            If provided, set these environment variables in the daemon process.
164
165        daemon_id: Optional[str], default None
166            Build a `Daemon` from an existing `daemon_id`.
167            If `daemon_id` is provided, other arguments are ignored and are derived
168            from the existing pickled `Daemon`.
169
170        label: Optional[str], default None
171            Label string to help identifiy a daemon.
172            If `None`, use the function name instead.
173
174        properties: Optional[Dict[str, Any]], default None
175            Override reading from the properties JSON by providing an existing dictionary.
176        """
177        _pickle = self.__dict__.get('_pickle', False)
178        if daemon_id is not None:
179            self.daemon_id = daemon_id
180            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):
181
182                if not self.properties_path.exists():
183                    raise Exception(
184                        f"Daemon '{self.daemon_id}' does not exist. "
185                        + "Pass a target to create a new Daemon."
186                    )
187
188                try:
189                    new_daemon = self.from_properties_file(daemon_id)
190                except Exception:
191                    new_daemon = None
192
193                if new_daemon is not None:
194                    new_daemon.write_pickle()
195                    target = new_daemon.target
196                    target_args = new_daemon.target_args
197                    target_kw = new_daemon.target_kw
198                    label = new_daemon.label
199                    self._properties = new_daemon.properties
200                else:
201                    try:
202                        self.properties_path.unlink()
203                    except Exception:
204                        pass
205
206                    raise Exception(
207                        f"Could not recover daemon '{self.daemon_id}' "
208                        + "from its properties file."
209                    )
210
211        if 'target' not in self.__dict__:
212            if target is None:
213                error("Cannot create a Daemon without a target.")
214            self.target = target
215
216        ### NOTE: We have to check self.__dict__ in case we un-pickling.
217        if '_target_args' not in self.__dict__:
218            self._target_args = target_args
219        if '_target_kw' not in self.__dict__:
220            self._target_kw = target_kw
221
222        if 'label' not in self.__dict__:
223            if label is None:
224                label = (
225                    self.target.__name__ if '__name__' in self.target.__dir__()
226                        else str(self.target)
227                )
228            self.label = label
229        if 'daemon_id' not in self.__dict__:
230            self.daemon_id = get_new_daemon_name()
231        if '_properties' not in self.__dict__:
232            self._properties = properties
233        if self._properties is None:
234            self._properties = {}
235
236        self._properties.update({'label': self.label})
237        if env:
238            self._properties.update({'env': env})
239
240        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
241        _ = self.process
Parameters
  • target (Optional[Callable[[Any], Any]], default None,): The function to execute in a child process.
  • target_args (Union[List[Any], Tuple[Any], None], default None): Positional arguments to pass to the target function.
  • target_kw (Optional[Dict[str, Any]], default None): Keyword arguments to pass to the target function.
  • env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the daemon process.
  • daemon_id (Optional[str], default None): Build a Daemon from an existing daemon_id. If daemon_id is provided, other arguments are ignored and are derived from the existing pickled Daemon.
  • label (Optional[str], default None): Label string to help identifiy a daemon. If None, use the function name instead.
  • properties (Optional[Dict[str, Any]], default None): Override reading from the properties JSON by providing an existing dictionary.
@classmethod
def from_properties_file(cls, daemon_id: str) -> Daemon:
 93    @classmethod
 94    def from_properties_file(cls, daemon_id: str) -> Daemon:
 95        """
 96        Return a Daemon from a properties dictionary.
 97        """
 98        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
 99        if not properties_path.exists():
100            raise OSError(f"Properties file '{properties_path}' does not exist.")
101
102        try:
103            with open(properties_path, 'r', encoding='utf-8') as f:
104                properties = json.load(f)
105        except Exception:
106            properties = {}
107
108        if not properties:
109            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")
110
111        daemon_id = properties_path.parent.name
112        target_cf = properties.get('target', {})
113        target_module_name = target_cf.get('module', None)
114        target_function_name = target_cf.get('name', None)
115        target_args = target_cf.get('args', None)
116        target_kw = target_cf.get('kw', None)
117        label = properties.get('label', None)
118
119        if None in [
120            target_module_name,
121            target_function_name,
122            target_args,
123            target_kw,
124        ]:
125            raise ValueError("Missing target function information.")
126
127        target_module = importlib.import_module(target_module_name)
128        target_function = getattr(target_module, target_function_name)
129
130        return Daemon(
131            daemon_id=daemon_id,
132            target=target_function,
133            target_args=target_args,
134            target_kw=target_kw,
135            properties=properties,
136            label=label,
137        )

Return a Daemon from a properties dictionary.

def run( self, keep_daemon_output: bool = True, allow_dirty_run: bool = False, debug: bool = False) -> Tuple[bool, str]:
383    def run(
384        self,
385        keep_daemon_output: bool = True,
386        allow_dirty_run: bool = False,
387        debug: bool = False,
388    ) -> SuccessTuple:
389        """Run the daemon as a child process and continue executing the parent.
390
391        Parameters
392        ----------
393        keep_daemon_output: bool, default True
394            If `False`, delete the daemon's output directory upon exiting.
395
396        allow_dirty_run: bool, default False
397            If `True`, run the daemon, even if the `daemon_id` directory exists.
398            This option is dangerous because if the same `daemon_id` runs concurrently,
399            the last to finish will overwrite the output of the first.
400
401        Returns
402        -------
403        A SuccessTuple indicating success.
404
405        """
406        import platform
407        if platform.system() == 'Windows':
408            return False, "Cannot run background jobs on Windows."
409
410        ### The daemon might exist and be paused.
411        if self.status == 'paused':
412            return self.resume()
413
414        self._remove_stop_file()
415        if self.status == 'running':
416            return True, f"Daemon '{self}' is already running."
417
418        self.mkdir_if_not_exists(allow_dirty_run)
419        _write_pickle_success_tuple = self.write_pickle()
420        if not _write_pickle_success_tuple[0]:
421            return _write_pickle_success_tuple
422
423        _launch_daemon_code = (
424            "from meerschaum.utils.daemon import Daemon; "
425            + f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
426            + f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
427            + "allow_dirty_run=True)"
428        )
429        env = dict(os.environ)
430        env['MRSM_NOASK'] = 'true'
431        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
432        msg = (
433            "Success"
434            if _launch_success_bool
435            else f"Failed to start daemon '{self.daemon_id}'."
436        )
437        return _launch_success_bool, msg

Run the daemon as a child process and continue executing the parent.

Parameters
  • keep_daemon_output (bool, default True): If False, delete the daemon's output directory upon exiting.
  • allow_dirty_run (bool, default False): If True, run the daemon, even if the daemon_id directory exists. This option is dangerous because if the same daemon_id runs concurrently, the last to finish will overwrite the output of the first.
Returns
  • A SuccessTuple indicating success.
def kill(self, timeout: Union[int, float, NoneType] = 8) -> Tuple[bool, str]:
439    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
440        """
441        Forcibly terminate a running daemon.
442        Sends a SIGTERM signal to the process.
443
444        Parameters
445        ----------
446        timeout: Optional[int], default 3
447            How many seconds to wait for the process to terminate.
448
449        Returns
450        -------
451        A SuccessTuple indicating success.
452        """
453        if self.status != 'paused':
454            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
455            if success:
456                self._write_stop_file('kill')
457                return success, msg
458
459        if self.status == 'stopped':
460            self._write_stop_file('kill')
461            return True, "Process has already stopped."
462
463        process = self.process
464        try:
465            process.terminate()
466            process.kill()
467            process.wait(timeout=timeout)
468        except Exception as e:
469            return False, f"Failed to kill job {self} with exception: {e}"
470
471        if self.pid_path.exists():
472            try:
473                self.pid_path.unlink()
474            except Exception as e:
475                pass
476
477        self._write_stop_file('kill')
478        return True, "Success"

Forcibly terminate a running daemon. Sends a SIGTERM signal to the process.

Parameters
  • timeout (Optional[int], default 3): How many seconds to wait for the process to terminate.
Returns
  • A SuccessTuple indicating success.
def quit(self, timeout: Union[int, float, NoneType] = None) -> Tuple[bool, str]:
480    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
481        """Gracefully quit a running daemon."""
482        if self.status == 'paused':
483            return self.kill(timeout)
484
485        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
486        if signal_success:
487            self._write_stop_file('quit')
488        return signal_success, signal_msg

Gracefully quit a running daemon.

def pause( self, timeout: Union[int, float, NoneType] = None, check_timeout_interval: Union[float, int, NoneType] = None) -> Tuple[bool, str]:
490    def pause(
491        self,
492        timeout: Union[int, float, None] = None,
493        check_timeout_interval: Union[float, int, None] = None,
494    ) -> SuccessTuple:
495        """
496        Pause the daemon if it is running.
497
498        Parameters
499        ----------
500        timeout: Union[float, int, None], default None
501            The maximum number of seconds to wait for a process to suspend.
502
503        check_timeout_interval: Union[float, int, None], default None
504            The number of seconds to wait between checking if the process is still running.
505
506        Returns
507        -------
508        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
509        """
510        if self.process is None:
511            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."
512
513        if self.status == 'paused':
514            return True, f"Daemon '{self.daemon_id}' is already paused."
515
516        self._write_stop_file('pause')
517        try:
518            self.process.suspend()
519        except Exception as e:
520            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"
521
522        timeout = self.get_timeout_seconds(timeout)
523        check_timeout_interval = self.get_check_timeout_interval_seconds(
524            check_timeout_interval
525        )
526
527        psutil = attempt_import('psutil')
528
529        if not timeout:
530            try:
531                success = self.process.status() == 'stopped'
532            except psutil.NoSuchProcess as e:
533                success = True
534            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
535            if success:
536                self._capture_process_timestamp('paused')
537            return success, msg
538
539        begin = time.perf_counter()
540        while (time.perf_counter() - begin) < timeout:
541            try:
542                if self.process.status() == 'stopped':
543                    self._capture_process_timestamp('paused')
544                    return True, "Success"
545            except psutil.NoSuchProcess as e:
546                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
547            time.sleep(check_timeout_interval)
548
549        return False, (
550            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
551            + ('s' if timeout != 1 else '') + '.'
552        )

Pause the daemon if it is running.

Parameters
  • timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to suspend.
  • check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still running.
Returns
  • A SuccessTuple indicating whether the Daemon process was successfully suspended.
def resume( self, timeout: Union[int, float, NoneType] = None, check_timeout_interval: Union[float, int, NoneType] = None) -> Tuple[bool, str]:
554    def resume(
555        self,
556        timeout: Union[int, float, None] = None,
557        check_timeout_interval: Union[float, int, None] = None,
558    ) -> SuccessTuple:
559        """
560        Resume the daemon if it is paused.
561
562        Parameters
563        ----------
564        timeout: Union[float, int, None], default None
565            The maximum number of seconds to wait for a process to resume.
566
567        check_timeout_interval: Union[float, int, None], default None
568            The number of seconds to wait between checking if the process is still stopped.
569
570        Returns
571        -------
572        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
573        """
574        if self.status == 'running':
575            return True, f"Daemon '{self.daemon_id}' is already running."
576
577        if self.status == 'stopped':
578            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."
579
580        self._remove_stop_file()
581        try:
582            self.process.resume()
583        except Exception as e:
584            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"
585
586        timeout = self.get_timeout_seconds(timeout)
587        check_timeout_interval = self.get_check_timeout_interval_seconds(
588            check_timeout_interval
589        )
590
591        if not timeout:
592            success = self.status == 'running'
593            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
594            if success:
595                self._capture_process_timestamp('began')
596            return success, msg
597
598        begin = time.perf_counter()
599        while (time.perf_counter() - begin) < timeout:
600            if self.status == 'running':
601                self._capture_process_timestamp('began')
602                return True, "Success"
603            time.sleep(check_timeout_interval)
604
605        return False, (
606            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
607            + ('s' if timeout != 1 else '') + '.'
608        )

Resume the daemon if it is paused.

Parameters
  • timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to resume.
  • check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still stopped.
Returns
  • A SuccessTuple indicating whether the Daemon process was successfully resumed.
def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
729    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
730        """Create the Daemon's directory.
731        If `allow_dirty_run` is `False` and the directory already exists,
732        raise a `FileExistsError`.
733        """
734        try:
735            self.path.mkdir(parents=True, exist_ok=True)
736            _already_exists = any(os.scandir(self.path))
737        except FileExistsError:
738            _already_exists = True
739
740        if _already_exists and not allow_dirty_run:
741            error(
742                f"Daemon '{self.daemon_id}' already exists. " +
743                f"To allow this daemon to run, do one of the following:\n"
744                + "  - Execute `daemon.cleanup()`.\n"
745                + f"  - Delete the directory '{self.path}'.\n"
746                + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
747                FileExistsError,
748            )

Create the Daemon's directory. If allow_dirty_run is False and the directory already exists, raise a FileExistsError.

process: Optional[psutil.Process]
750    @property
751    def process(self) -> Union['psutil.Process', None]:
752        """
753        Return the psutil process for the Daemon.
754        """
755        psutil = attempt_import('psutil')
756        pid = self.pid
757        if pid is None:
758            return None
759        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
760            try:
761                self._process = psutil.Process(int(pid))
762            except Exception as e:
763                if self.pid_path.exists():
764                    self.pid_path.unlink()
765                return None
766        return self._process

Return the psutil process for the Daemon.

status: str
768    @property
769    def status(self) -> str:
770        """
771        Return the running status of this Daemon.
772        """
773        if self.process is None:
774            return 'stopped'
775
776        psutil = attempt_import('psutil')
777        try:
778            if self.process.status() == 'stopped':
779                return 'paused'
780            if self.process.status() == 'zombie':
781                raise psutil.NoSuchProcess(self.process.pid)
782        except (psutil.NoSuchProcess, AttributeError):
783            if self.pid_path.exists():
784                try:
785                    self.pid_path.unlink()
786                except Exception as e:
787                    pass
788            return 'stopped'
789
790        return 'running'

Return the running status of this Daemon.

path: pathlib.Path
799    @property
800    def path(self) -> pathlib.Path:
801        """
802        Return the path for this Daemon's directory.
803        """
804        return self._get_path_from_daemon_id(self.daemon_id)

Return the path for this Daemon's directory.

properties_path: pathlib.Path
813    @property
814    def properties_path(self) -> pathlib.Path:
815        """
816        Return the `propterties.json` path for this Daemon.
817        """
818        return self._get_properties_path_from_daemon_id(self.daemon_id)

Return the propterties.json path for this Daemon.

stop_path: pathlib.Path
820    @property
821    def stop_path(self) -> pathlib.Path:
822        """
823        Return the path for the stop file (created when manually stopped).
824        """
825        return self.path / '.stop.json'

Return the path for the stop file (created when manually stopped).

log_path: pathlib.Path
827    @property
828    def log_path(self) -> pathlib.Path:
829        """
830        Return the log path.
831        """
832        return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')

Return the log path.

stdin_file_path: pathlib.Path
834    @property
835    def stdin_file_path(self) -> pathlib.Path:
836        """
837        Return the stdin file path.
838        """
839        return self.path / 'input.stdin'

Return the stdin file path.

blocking_stdin_file_path: pathlib.Path
841    @property
842    def blocking_stdin_file_path(self) -> pathlib.Path:
843        """
844        Return the stdin file path.
845        """
846        if '_blocking_stdin_file_path' in self.__dict__:
847            return self._blocking_stdin_file_path
848
849        return self.path / 'input.stdin.block'

Return the stdin file path.

log_offset_path: pathlib.Path
851    @property
852    def log_offset_path(self) -> pathlib.Path:
853        """
854        Return the log offset file path.
855        """
856        return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')

Return the log offset file path.

858    @property
859    def rotating_log(self) -> RotatingFile:
860        """
861        The rotating log file for the daemon's output.
862        """
863        if '_rotating_log' in self.__dict__:
864            return self._rotating_log
865
866        write_timestamps = (
867            self.properties.get('logs', {}).get('write_timestamps', None)
868        )
869        if write_timestamps is None:
870            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
871
872        self._rotating_log = RotatingFile(
873            self.log_path,
874            redirect_streams=True,
875            write_timestamps=write_timestamps,
876            timestamp_format=get_config('jobs', 'logs', 'timestamps', 'format'),
877        )
878        return self._rotating_log

The rotating log file for the daemon's output.

stdin_file
880    @property
881    def stdin_file(self):
882        """
883        Return the file handler for the stdin file.
884        """
885        if '_stdin_file' in self.__dict__:
886            return self._stdin_file
887
888        self._stdin_file = StdinFile(
889            self.stdin_file_path,
890            lock_file_path=self.blocking_stdin_file_path,
891        )
892        return self._stdin_file

Return the file handler for the stdin file.

log_text: Optional[str]
894    @property
895    def log_text(self) -> Optional[str]:
896        """
897        Read the log files and return their contents.
898        Returns `None` if the log file does not exist.
899        """
900        new_rotating_log = RotatingFile(
901            self.rotating_log.file_path,
902            num_files_to_keep = self.rotating_log.num_files_to_keep,
903            max_file_size = self.rotating_log.max_file_size,
904            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled'),
905            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format'),
906        )
907        return new_rotating_log.read()

Read the log files and return their contents. Returns None if the log file does not exist.

def readlines(self) -> List[str]:
909    def readlines(self) -> List[str]:
910        """
911        Read the next log lines, persisting the cursor for later use.
912        Note this will alter the cursor of `self.rotating_log`.
913        """
914        self.rotating_log._cursor = self._read_log_offset()
915        lines = self.rotating_log.readlines()
916        self._write_log_offset()
917        return lines

Read the next log lines, persisting the cursor for later use. Note this will alter the cursor of self.rotating_log.

pid: Optional[int]
945    @property
946    def pid(self) -> Union[int, None]:
947        """
948        Read the PID file and return its contents.
949        Returns `None` if the PID file does not exist.
950        """
951        if not self.pid_path.exists():
952            return None
953        try:
954            with open(self.pid_path, 'r', encoding='utf-8') as f:
955                text = f.read()
956            if len(text) == 0:
957                return None
958            pid = int(text.rstrip())
959        except Exception as e:
960            warn(e)
961            text = None
962            pid = None
963        return pid

Read the PID file and return its contents. Returns None if the PID file does not exist.

pid_path: pathlib.Path
965    @property
966    def pid_path(self) -> pathlib.Path:
967        """
968        Return the path to a file containing the PID for this Daemon.
969        """
970        return self.path / 'process.pid'

Return the path to a file containing the PID for this Daemon.

pid_lock: "'fasteners.InterProcessLock'"
972    @property
973    def pid_lock(self) -> 'fasteners.InterProcessLock':
974        """
975        Return the process lock context manager.
976        """
977        if '_pid_lock' in self.__dict__:
978            return self._pid_lock
979
980        fasteners = attempt_import('fasteners')
981        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
982        return self._pid_lock

Return the process lock context manager.

pickle_path: pathlib.Path
984    @property
985    def pickle_path(self) -> pathlib.Path:
986        """
987        Return the path for the pickle file.
988        """
989        return self.path / 'pickle.pkl'

Return the path for the pickle file.

def read_properties(self) -> Optional[Dict[str, Any]]:
 991    def read_properties(self) -> Optional[Dict[str, Any]]:
 992        """Read the properties JSON file and return the dictionary."""
 993        if not self.properties_path.exists():
 994            return None
 995        try:
 996            with open(self.properties_path, 'r', encoding='utf-8') as file:
 997                properties = json.load(file)
 998        except Exception as e:
 999            properties = {}
1000        
1001        return properties

Read the properties JSON file and return the dictionary.

def read_pickle(self) -> Daemon:
1003    def read_pickle(self) -> Daemon:
1004        """Read a Daemon's pickle file and return the `Daemon`."""
1005        import pickle, traceback
1006        if not self.pickle_path.exists():
1007            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")
1008
1009        if self.pickle_path.stat().st_size == 0:
1010            error(f"Pickle was empty for daemon '{self.daemon_id}'.")
1011
1012        try:
1013            with open(self.pickle_path, 'rb') as pickle_file:
1014                daemon = pickle.load(pickle_file)
1015            success, msg = True, 'Success'
1016        except Exception as e:
1017            success, msg = False, str(e)
1018            daemon = None
1019            traceback.print_exception(type(e), e, e.__traceback__)
1020        if not success:
1021            error(msg)
1022        return daemon

Read a Daemon's pickle file and return the Daemon.

properties: Dict[str, Any]
1024    @property
1025    def properties(self) -> Dict[str, Any]:
1026        """
1027        Return the contents of the properties JSON file.
1028        """
1029        try:
1030            _file_properties = self.read_properties()
1031        except Exception:
1032            traceback.print_exc()
1033            _file_properties = {}
1034
1035        if not self._properties:
1036            self._properties = _file_properties
1037
1038        if self._properties is None:
1039            self._properties = {}
1040
1041        if _file_properties is not None:
1042            self._properties = apply_patch_to_config(
1043                _file_properties,
1044                self._properties,
1045            )
1046
1047        return self._properties

Return the contents of the properties JSON file.

hidden: bool
1049    @property
1050    def hidden(self) -> bool:
1051        """
1052        Return a bool indicating whether this Daemon should be displayed.
1053        """
1054        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')

Return a bool indicating whether this Daemon should be displayed.

def write_properties(self) -> Tuple[bool, str]:
1056    def write_properties(self) -> SuccessTuple:
1057        """Write the properties dictionary to the properties JSON file
1058        (only if self.properties exists).
1059        """
1060        success, msg = (
1061            False,
1062            f"No properties to write for daemon '{self.daemon_id}'."
1063        )
1064        if self.properties is not None:
1065            try:
1066                self.path.mkdir(parents=True, exist_ok=True)
1067                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
1068                    json.dump(self.properties, properties_file)
1069                success, msg = True, 'Success'
1070            except Exception as e:
1071                success, msg = False, str(e)
1072        return success, msg

Write the properties dictionary to the properties JSON file (only if self.properties exists).

def write_pickle(self) -> Tuple[bool, str]:
1074    def write_pickle(self) -> SuccessTuple:
1075        """Write the pickle file for the daemon."""
1076        import pickle, traceback
1077        try:
1078            self.path.mkdir(parents=True, exist_ok=True)
1079            with open(self.pickle_path, 'wb+') as pickle_file:
1080                pickle.dump(self, pickle_file)
1081            success, msg = True, "Success"
1082        except Exception as e:
1083            success, msg = False, str(e)
1084            traceback.print_exception(type(e), e, e.__traceback__)
1085        return success, msg

Write the pickle file for the daemon.

def cleanup(self, keep_logs: bool = False) -> Tuple[bool, str]:
1115    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
1116        """
1117        Remove a daemon's directory after execution.
1118
1119        Parameters
1120        ----------
1121        keep_logs: bool, default False
1122            If `True`, skip deleting the daemon's log files.
1123
1124        Returns
1125        -------
1126        A `SuccessTuple` indicating success.
1127        """
1128        if self.path.exists():
1129            try:
1130                shutil.rmtree(self.path)
1131            except Exception as e:
1132                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
1133                warn(msg)
1134                return False, msg
1135        if not keep_logs:
1136            self.rotating_log.delete()
1137            try:
1138                if self.log_offset_path.exists():
1139                    self.log_offset_path.unlink()
1140            except Exception as e:
1141                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
1142                warn(msg)
1143                return False, msg
1144        return True, "Success"

Remove a daemon's directory after execution.

Parameters
  • keep_logs (bool, default False): If True, skip deleting the daemon's log files.
Returns
  • A SuccessTuple indicating success.
def get_timeout_seconds(self, timeout: Union[int, float, NoneType] = None) -> Union[int, float]:
1147    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
1148        """
1149        Return the timeout value to use. Use `--timeout-seconds` if provided,
1150        else the configured default (8).
1151        """
1152        if isinstance(timeout, (int, float)):
1153            return timeout
1154        return get_config('jobs', 'timeout_seconds')

Return the timeout value to use. Use --timeout-seconds if provided, else the configured default (8).

def get_check_timeout_interval_seconds( self, check_timeout_interval: Union[int, float, NoneType] = None) -> Union[int, float]:
1157    def get_check_timeout_interval_seconds(
1158        self,
1159        check_timeout_interval: Union[int, float, None] = None,
1160    ) -> Union[int, float]:
1161        """
1162        Return the interval value to check the status of timeouts.
1163        """
1164        if isinstance(check_timeout_interval, (int, float)):
1165            return check_timeout_interval
1166        return get_config('jobs', 'check_timeout_interval_seconds')

Return the interval value to check the status of timeouts.

target_args: Optional[Tuple[Any]]
1168    @property
1169    def target_args(self) -> Union[Tuple[Any], None]:
1170        """
1171        Return the positional arguments to pass to the target function.
1172        """
1173        target_args = (
1174            self.__dict__.get('_target_args', None)
1175            or self.properties.get('target', {}).get('args', None)
1176        )
1177        if target_args is None:
1178            return tuple([])
1179
1180        return tuple(target_args)

Return the positional arguments to pass to the target function.

target_kw: Optional[Dict[str, Any]]
1182    @property
1183    def target_kw(self) -> Union[Dict[str, Any], None]:
1184        """
1185        Return the keyword arguments to pass to the target function.
1186        """
1187        target_kw = (
1188            self.__dict__.get('_target_kw', None)
1189            or self.properties.get('target', {}).get('kw', None)
1190        )
1191        if target_kw is None:
1192            return {}
1193
1194        return {key: val for key, val in target_kw.items()}

Return the keyword arguments to pass to the target function.