meerschaum.utils.daemon.Daemon

Manage running daemons via the Daemon class.

   1#! /usr/bin/env python3
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4
   5"""
   6Manage running daemons via the Daemon class.
   7"""
   8
   9from __future__ import annotations
  10import os
  11import importlib
  12import pathlib
  13import json
  14import shutil
  15import signal
  16import sys
  17import time
  18import traceback
  19from functools import partial
  20from datetime import datetime, timezone
  21
  22import meerschaum as mrsm
  23from meerschaum.utils.typing import (
  24    Optional, Dict, Any, SuccessTuple, Callable, List, Union,
  25    is_success_tuple, Tuple,
  26)
  27from meerschaum.config import get_config
  28from meerschaum.config.static import STATIC_CONFIG
  29from meerschaum.config._paths import (
  30    DAEMON_RESOURCES_PATH, LOGS_RESOURCES_PATH, DAEMON_ERROR_LOG_PATH,
  31)
  32from meerschaum.config._patch import apply_patch_to_config
  33from meerschaum.utils.warnings import warn, error
  34from meerschaum.utils.packages import attempt_import
  35from meerschaum.utils.venv import venv_exec
  36from meerschaum.utils.daemon._names import get_new_daemon_name
  37from meerschaum.utils.daemon.RotatingFile import RotatingFile
  38from meerschaum.utils.daemon.StdinFile import StdinFile
  39from meerschaum.utils.threading import RepeatTimer
  40from meerschaum.__main__ import _close_pools
  41
# Every `Daemon` that enters `_run_exit()` in this process is appended here.
_daemons = []
# Target-function results (or the raised exception), keyed by `daemon_id`;
# written in the `finally` clause of `_run_exit()`.
_results = {}
  44
class Daemon:
    """
    Daemonize Python functions into background processes.

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.utils.daemon import Daemon
    >>> daemon = Daemon(print, ('hi',))
    >>> success, msg = daemon.run()
    >>> print(daemon.log_text)

    2024-07-29 18:03 | hi
    2024-07-29 18:03 |
    >>> daemon.run(allow_dirty_run=True)
    >>> print(daemon.log_text)

    2024-07-29 18:03 | hi
    2024-07-29 18:03 |
    2024-07-29 18:05 | hi
    2024-07-29 18:05 |
    >>> mrsm.pprint(daemon.properties)
    {
        'label': 'print',
        'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
        'result': None,
        'process': {'ended': '2024-07-29T18:03:33.752806'}
    }

    """
  75
  76    def __new__(
  77        cls,
  78        *args,
  79        daemon_id: Optional[str] = None,
  80        **kw
  81    ):
  82        """
  83        If a daemon_id is provided and already exists, read from its pickle file.
  84        """
  85        instance = super(Daemon, cls).__new__(cls)
  86        if daemon_id is not None:
  87            instance.daemon_id = daemon_id
  88            if instance.pickle_path.exists():
  89                instance = instance.read_pickle()
  90        return instance
  91
  92    @classmethod
  93    def from_properties_file(cls, daemon_id: str) -> Daemon:
  94        """
  95        Return a Daemon from a properties dictionary.
  96        """
  97        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
  98        if not properties_path.exists():
  99            raise OSError(f"Properties file '{properties_path}' does not exist.")
 100
 101        try:
 102            with open(properties_path, 'r', encoding='utf-8') as f:
 103                properties = json.load(f)
 104        except Exception:
 105            properties = {}
 106
 107        if not properties:
 108            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")
 109
 110        daemon_id = properties_path.parent.name
 111        target_cf = properties.get('target', {})
 112        target_module_name = target_cf.get('module', None)
 113        target_function_name = target_cf.get('name', None)
 114        target_args = target_cf.get('args', None)
 115        target_kw = target_cf.get('kw', None)
 116        label = properties.get('label', None)
 117
 118        if None in [
 119            target_module_name,
 120            target_function_name,
 121            target_args,
 122            target_kw,
 123        ]:
 124            raise ValueError("Missing target function information.")
 125
 126        target_module = importlib.import_module(target_module_name)
 127        target_function = getattr(target_module, target_function_name)
 128
 129        return Daemon(
 130            daemon_id=daemon_id,
 131            target=target_function,
 132            target_args=target_args,
 133            target_kw=target_kw,
 134            properties=properties,
 135            label=label,
 136        )
 137
 138
    def __init__(
        self,
        target: Optional[Callable[[Any], Any]] = None,
        target_args: Union[List[Any], Tuple[Any], None] = None,
        target_kw: Optional[Dict[str, Any]] = None,
        env: Optional[Dict[str, str]] = None,
        daemon_id: Optional[str] = None,
        label: Optional[str] = None,
        properties: Optional[Dict[str, Any]] = None,
    ):
        """
        Parameters
        ----------
        target: Optional[Callable[[Any], Any]], default None,
            The function to execute in a child process.

        target_args: Union[List[Any], Tuple[Any], None], default None
            Positional arguments to pass to the target function.

        target_kw: Optional[Dict[str, Any]], default None
            Keyword arguments to pass to the target function.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the daemon process.

        daemon_id: Optional[str], default None
            Build a `Daemon` from an existing `daemon_id`.
            If `daemon_id` is provided, other arguments are ignored and are derived
            from the existing pickled `Daemon`.

        label: Optional[str], default None
            Label string to help identify a daemon.
            If `None`, use the function name instead.

        properties: Optional[Dict[str, Any]], default None
            Override reading from the properties JSON by providing an existing dictionary.

        Raises
        ------
        Exception
            If `daemon_id` is given without a `target`, no pickle file exists,
            and the daemon cannot be recovered from its properties file
            (either the file is missing or it could not be loaded).
        """
        ### NOTE(review): `_pickle` is read here but not used in this method.
        _pickle = self.__dict__.get('_pickle', False)
        if daemon_id is not None:
            self.daemon_id = daemon_id
            ### No pickle and no target (passed in or already on the instance):
            ### try to recover the daemon from its properties file.
            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):

                if not self.properties_path.exists():
                    raise Exception(
                        f"Daemon '{self.daemon_id}' does not exist. "
                        + "Pass a target to create a new Daemon."
                    )

                try:
                    new_daemon = self.from_properties_file(daemon_id)
                except Exception:
                    new_daemon = None

                if new_daemon is not None:
                    ### Write the recovered daemon's pickle and adopt its attributes.
                    new_daemon.write_pickle()
                    target = new_daemon.target
                    target_args = new_daemon.target_args
                    target_kw = new_daemon.target_kw
                    label = new_daemon.label
                    self._properties = new_daemon.properties
                else:
                    ### The properties file could not be used; remove it (best effort).
                    try:
                        self.properties_path.unlink()
                    except Exception:
                        pass

                    raise Exception(
                        f"Could not recover daemon '{self.daemon_id}' "
                        + "from its properties file."
                    )

        if 'target' not in self.__dict__:
            if target is None:
                error("Cannot create a Daemon without a target.")
            self.target = target

        ### NOTE: We have to check self.__dict__ in case we are un-pickling.
        if '_target_args' not in self.__dict__:
            self._target_args = target_args
        if '_target_kw' not in self.__dict__:
            self._target_kw = target_kw

        if 'label' not in self.__dict__:
            if label is None:
                ### Fall back to the target's `__name__`, or its string form if it has none.
                label = (
                    self.target.__name__ if '__name__' in self.target.__dir__()
                        else str(self.target)
                )
            self.label = label
        if 'daemon_id' not in self.__dict__:
            self.daemon_id = get_new_daemon_name()
        if '_properties' not in self.__dict__:
            self._properties = properties
        if self._properties is None:
            self._properties = {}

        ### NOTE: This updates the dictionary in place, including one passed as `properties`.
        self._properties.update({'label': self.label})
        if env:
            self._properties.update({'env': env})

        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
        _ = self.process
 241
 242
    def _run_exit(
        self,
        keep_daemon_output: bool = True,
        allow_dirty_run: bool = False,
    ) -> Any:
        """Run the daemon's target function.
        NOTE: This WILL EXIT the parent process!

        Parameters
        ----------
        keep_daemon_output: bool, default True
            If `True`, record the `ended` timestamp when the target finishes.
            If `False`, call `self.cleanup()` instead.

        allow_dirty_run: bool, default False
            If `True`, run the daemon, even if the `daemon_id` directory exists.
            This option is dangerous because if the same `daemon_id` runs twice,
            the last to finish will overwrite the output of the first.

        Returns
        -------
        On Windows, the tuple `(False, "Windows is no longer supported.")`.
        Otherwise there is no explicit return value (this will exit the parent process).
        """
        import platform, sys, os, traceback
        from meerschaum.utils.warnings import warn
        from meerschaum.config import get_config
        daemon = attempt_import('daemon')
        lines = get_config('jobs', 'terminal', 'lines')
        columns = get_config('jobs', 'terminal', 'columns')

        if platform.system() == 'Windows':
            return False, "Windows is no longer supported."

        self._setup(allow_dirty_run)

        ### NOTE: The SIGINT handler has been removed so that child processes may handle
        ###       KeyboardInterrupts themselves.
        ###       The previous aggressive approach was redundant because of the SIGTERM handler.
        self._daemon_context = daemon.DaemonContext(
            pidfile=self.pid_lock,
            stdout=self.rotating_log,
            stderr=self.rotating_log,
            working_directory=os.getcwd(),
            detach_process=True,
            files_preserve=list(self.rotating_log.subfile_objects.values()),
            signal_map={
                signal.SIGTERM: self._handle_sigterm,
            },
        )

        _daemons.append(self)

        ### Periodically call `rotating_log.refresh_files()` while the target runs.
        log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds')
        self._log_refresh_timer = RepeatTimer(
            log_refresh_seconds,
            partial(self.rotating_log.refresh_files, start_interception=True),
        )

        try:
            ### Export the configured terminal size through the environment.
            os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns))
            with self._daemon_context:
                sys.stdin = self.stdin_file
                os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id
                os.environ['PYTHONUNBUFFERED'] = '1'

                ### Allow the user to override environment variables.
                env = self.properties.get('env', {})
                if env and isinstance(env, dict):
                    os.environ.update({str(k): str(v) for k, v in env.items()})

                self.rotating_log.refresh_files(start_interception=True)
                result = None
                try:
                    ### Record the PID of the current process.
                    with open(self.pid_path, 'w+', encoding='utf-8') as f:
                        f.write(str(os.getpid()))

                    ### NOTE: The timer fails to start for remote actions to localhost.
                    try:
                        if not self._log_refresh_timer.is_running():
                            self._log_refresh_timer.start()
                    except Exception:
                        pass

                    self.properties['result'] = None
                    self._capture_process_timestamp('began')
                    result = self.target(*self.target_args, **self.target_kw)
                    self.properties['result'] = result
                except (BrokenPipeError, KeyboardInterrupt, SystemExit):
                    ### These are swallowed; `result` keeps its value at the time of the interrupt.
                    pass
                except Exception as e:
                    warn(
                        f"Exception in daemon target function: {traceback.format_exc()}",
                    )
                    ### The exception object itself becomes the recorded result.
                    result = e
                finally:
                    _results[self.daemon_id] = result

                    if keep_daemon_output:
                        self._capture_process_timestamp('ended')
                    else:
                        self.cleanup()

                    self._log_refresh_timer.cancel()
                    if self.pid is None and self.pid_path.exists():
                        self.pid_path.unlink()

                    if is_success_tuple(result):
                        try:
                            mrsm.pprint(result)
                        except BrokenPipeError:
                            pass

        except Exception:
            ### Errors raised outside the inner `try` are appended to the daemon error log
            ### and surfaced as a warning.
            daemon_error = traceback.format_exc()
            with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
                f.write(daemon_error)
            warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}")
 359
 360    def _capture_process_timestamp(
 361        self,
 362        process_key: str,
 363        write_properties: bool = True,
 364    ) -> None:
 365        """
 366        Record the current timestamp to the parameters `process:<process_key>`.
 367
 368        Parameters
 369        ----------
 370        process_key: str
 371            Under which key to store the timestamp.
 372
 373        write_properties: bool, default True
 374            If `True` persist the properties to disk immediately after capturing the timestamp.
 375        """
 376        if 'process' not in self.properties:
 377            self.properties['process'] = {}
 378
 379        if process_key not in ('began', 'ended', 'paused', 'stopped'):
 380            raise ValueError(f"Invalid key '{process_key}'.")
 381
 382        self.properties['process'][process_key] = (
 383            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
 384        )
 385        if write_properties:
 386            self.write_properties()
 387
 388    def run(
 389        self,
 390        keep_daemon_output: bool = True,
 391        allow_dirty_run: bool = False,
 392        debug: bool = False,
 393    ) -> SuccessTuple:
 394        """Run the daemon as a child process and continue executing the parent.
 395
 396        Parameters
 397        ----------
 398        keep_daemon_output: bool, default True
 399            If `False`, delete the daemon's output directory upon exiting.
 400
 401        allow_dirty_run: bool, default False
 402            If `True`, run the daemon, even if the `daemon_id` directory exists.
 403            This option is dangerous because if the same `daemon_id` runs concurrently,
 404            the last to finish will overwrite the output of the first.
 405
 406        Returns
 407        -------
 408        A SuccessTuple indicating success.
 409
 410        """
 411        import platform
 412        if platform.system() == 'Windows':
 413            return False, "Cannot run background jobs on Windows."
 414
 415        ### The daemon might exist and be paused.
 416        if self.status == 'paused':
 417            return self.resume()
 418
 419        self._remove_stop_file()
 420        if self.status == 'running':
 421            return True, f"Daemon '{self}' is already running."
 422
 423        self.mkdir_if_not_exists(allow_dirty_run)
 424        _write_pickle_success_tuple = self.write_pickle()
 425        if not _write_pickle_success_tuple[0]:
 426            return _write_pickle_success_tuple
 427
 428        _launch_daemon_code = (
 429            "from meerschaum.utils.daemon import Daemon; "
 430            + f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
 431            + f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
 432            + "allow_dirty_run=True)"
 433        )
 434        env = dict(os.environ)
 435        env['MRSM_NOASK'] = 'true'
 436        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
 437        msg = (
 438            "Success"
 439            if _launch_success_bool
 440            else f"Failed to start daemon '{self.daemon_id}'."
 441        )
 442        return _launch_success_bool, msg
 443
 444    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
 445        """
 446        Forcibly terminate a running daemon.
 447        Sends a SIGTERM signal to the process.
 448
 449        Parameters
 450        ----------
 451        timeout: Optional[int], default 3
 452            How many seconds to wait for the process to terminate.
 453
 454        Returns
 455        -------
 456        A SuccessTuple indicating success.
 457        """
 458        if self.status != 'paused':
 459            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
 460            if success:
 461                self._write_stop_file('kill')
 462                return success, msg
 463
 464        if self.status == 'stopped':
 465            self._write_stop_file('kill')
 466            return True, "Process has already stopped."
 467
 468        process = self.process
 469        try:
 470            process.terminate()
 471            process.kill()
 472            process.wait(timeout=timeout)
 473        except Exception as e:
 474            return False, f"Failed to kill job {self} with exception: {e}"
 475
 476        if self.pid_path.exists():
 477            try:
 478                self.pid_path.unlink()
 479            except Exception as e:
 480                pass
 481
 482        self._write_stop_file('kill')
 483        return True, "Success"
 484
 485    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
 486        """Gracefully quit a running daemon."""
 487        if self.status == 'paused':
 488            return self.kill(timeout)
 489
 490        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
 491        if signal_success:
 492            self._write_stop_file('quit')
 493        return signal_success, signal_msg
 494
 495    def pause(
 496        self,
 497        timeout: Union[int, float, None] = None,
 498        check_timeout_interval: Union[float, int, None] = None,
 499    ) -> SuccessTuple:
 500        """
 501        Pause the daemon if it is running.
 502
 503        Parameters
 504        ----------
 505        timeout: Union[float, int, None], default None
 506            The maximum number of seconds to wait for a process to suspend.
 507
 508        check_timeout_interval: Union[float, int, None], default None
 509            The number of seconds to wait between checking if the process is still running.
 510
 511        Returns
 512        -------
 513        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
 514        """
 515        if self.process is None:
 516            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."
 517
 518        if self.status == 'paused':
 519            return True, f"Daemon '{self.daemon_id}' is already paused."
 520
 521        self._write_stop_file('pause')
 522        try:
 523            self.process.suspend()
 524        except Exception as e:
 525            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"
 526
 527        timeout = self.get_timeout_seconds(timeout)
 528        check_timeout_interval = self.get_check_timeout_interval_seconds(
 529            check_timeout_interval
 530        )
 531
 532        psutil = attempt_import('psutil')
 533
 534        if not timeout:
 535            try:
 536                success = self.process.status() == 'stopped'
 537            except psutil.NoSuchProcess as e:
 538                success = True
 539            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
 540            if success:
 541                self._capture_process_timestamp('paused')
 542            return success, msg
 543
 544        begin = time.perf_counter()
 545        while (time.perf_counter() - begin) < timeout:
 546            try:
 547                if self.process.status() == 'stopped':
 548                    self._capture_process_timestamp('paused')
 549                    return True, "Success"
 550            except psutil.NoSuchProcess as e:
 551                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
 552            time.sleep(check_timeout_interval)
 553
 554        return False, (
 555            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
 556            + ('s' if timeout != 1 else '') + '.'
 557        )
 558
 559    def resume(
 560        self,
 561        timeout: Union[int, float, None] = None,
 562        check_timeout_interval: Union[float, int, None] = None,
 563    ) -> SuccessTuple:
 564        """
 565        Resume the daemon if it is paused.
 566
 567        Parameters
 568        ----------
 569        timeout: Union[float, int, None], default None
 570            The maximum number of seconds to wait for a process to resume.
 571
 572        check_timeout_interval: Union[float, int, None], default None
 573            The number of seconds to wait between checking if the process is still stopped.
 574
 575        Returns
 576        -------
 577        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
 578        """
 579        if self.status == 'running':
 580            return True, f"Daemon '{self.daemon_id}' is already running."
 581
 582        if self.status == 'stopped':
 583            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."
 584
 585        self._remove_stop_file()
 586        try:
 587            self.process.resume()
 588        except Exception as e:
 589            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"
 590
 591        timeout = self.get_timeout_seconds(timeout)
 592        check_timeout_interval = self.get_check_timeout_interval_seconds(
 593            check_timeout_interval
 594        )
 595
 596        if not timeout:
 597            success = self.status == 'running'
 598            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
 599            if success:
 600                self._capture_process_timestamp('began')
 601            return success, msg
 602
 603        begin = time.perf_counter()
 604        while (time.perf_counter() - begin) < timeout:
 605            if self.status == 'running':
 606                self._capture_process_timestamp('began')
 607                return True, "Success"
 608            time.sleep(check_timeout_interval)
 609
 610        return False, (
 611            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
 612            + ('s' if timeout != 1 else '') + '.'
 613        )
 614
 615    def _write_stop_file(self, action: str) -> SuccessTuple:
 616        """Write the stop file timestamp and action."""
 617        if action not in ('quit', 'kill', 'pause'):
 618            return False, f"Unsupported action '{action}'."
 619
 620        if not self.stop_path.parent.exists():
 621            self.stop_path.parent.mkdir(parents=True, exist_ok=True)
 622
 623        with open(self.stop_path, 'w+', encoding='utf-8') as f:
 624            json.dump(
 625                {
 626                    'stop_time': datetime.now(timezone.utc).isoformat(),
 627                    'action': action,
 628                },
 629                f
 630            )
 631
 632        return True, "Success"
 633
 634    def _remove_stop_file(self) -> SuccessTuple:
 635        """Remove the stop file"""
 636        if not self.stop_path.exists():
 637            return True, "Stop file does not exist."
 638
 639        try:
 640            self.stop_path.unlink()
 641        except Exception as e:
 642            return False, f"Failed to remove stop file:\n{e}"
 643
 644        return True, "Success"
 645
 646    def _read_stop_file(self) -> Dict[str, Any]:
 647        """
 648        Read the stop file if it exists.
 649        """
 650        if not self.stop_path.exists():
 651            return {}
 652
 653        try:
 654            with open(self.stop_path, 'r', encoding='utf-8') as f:
 655                data = json.load(f)
 656            return data
 657        except Exception:
 658            return {}
 659
 660    def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None:
 661        """
 662        Handle `SIGTERM` within the `Daemon` context.
 663        This method is injected into the `DaemonContext`.
 664        """
 665        from meerschaum.utils.process import signal_handler
 666        signal_handler(signal_number, stack_frame)
 667
 668        timer = self.__dict__.get('_log_refresh_timer', None)
 669        if timer is not None:
 670            timer.cancel()
 671
 672        daemon_context = self.__dict__.get('_daemon_context', None)
 673        if daemon_context is not None:
 674            daemon_context.close()
 675
 676        _close_pools()
 677        raise SystemExit(0)
 678
 679    def _send_signal(
 680            self,
 681            signal_to_send,
 682            timeout: Union[float, int, None] = None,
 683            check_timeout_interval: Union[float, int, None] = None,
 684        ) -> SuccessTuple:
 685        """Send a signal to the daemon process.
 686
 687        Parameters
 688        ----------
 689        signal_to_send:
 690            The signal the send to the daemon, e.g. `signals.SIGINT`.
 691
 692        timeout: Union[float, int, None], default None
 693            The maximum number of seconds to wait for a process to terminate.
 694
 695        check_timeout_interval: Union[float, int, None], default None
 696            The number of seconds to wait between checking if the process is still running.
 697
 698        Returns
 699        -------
 700        A SuccessTuple indicating success.
 701        """
 702        try:
 703            pid = self.pid
 704            if pid is None:
 705                return (
 706                    False,
 707                    f"Daemon '{self.daemon_id}' is not running, "
 708                    + f"cannot send signal '{signal_to_send}'."
 709                )
 710            
 711            os.kill(pid, signal_to_send)
 712        except Exception as e:
 713            return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}"
 714
 715        timeout = self.get_timeout_seconds(timeout)
 716        check_timeout_interval = self.get_check_timeout_interval_seconds(
 717            check_timeout_interval
 718        )
 719
 720        if not timeout:
 721            return True, f"Successfully sent '{signal}' to daemon '{self.daemon_id}'."
 722
 723        begin = time.perf_counter()
 724        while (time.perf_counter() - begin) < timeout:
 725            if not self.status == 'running':
 726                return True, "Success"
 727            time.sleep(check_timeout_interval)
 728
 729        return False, (
 730            f"Failed to stop daemon '{self.daemon_id}' within {timeout} second"
 731            + ('s' if timeout != 1 else '') + '.'
 732        )
 733
 734    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
 735        """Create the Daemon's directory.
 736        If `allow_dirty_run` is `False` and the directory already exists,
 737        raise a `FileExistsError`.
 738        """
 739        try:
 740            self.path.mkdir(parents=True, exist_ok=True)
 741            _already_exists = any(os.scandir(self.path))
 742        except FileExistsError:
 743            _already_exists = True
 744
 745        if _already_exists and not allow_dirty_run:
 746            error(
 747                f"Daemon '{self.daemon_id}' already exists. " +
 748                f"To allow this daemon to run, do one of the following:\n"
 749                + "  - Execute `daemon.cleanup()`.\n"
 750                + f"  - Delete the directory '{self.path}'.\n"
 751                + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
 752                FileExistsError,
 753            )
 754
 755    @property
 756    def process(self) -> Union['psutil.Process', None]:
 757        """
 758        Return the psutil process for the Daemon.
 759        """
 760        psutil = attempt_import('psutil')
 761        pid = self.pid
 762        if pid is None:
 763            return None
 764        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
 765            try:
 766                self._process = psutil.Process(int(pid))
 767            except Exception as e:
 768                if self.pid_path.exists():
 769                    self.pid_path.unlink()
 770                return None
 771        return self._process
 772
    @property
    def status(self) -> str:
        """
        Return the running status of this Daemon.

        Returns
        -------
        One of `'running'`, `'paused'`, or `'stopped'`.
        """
        if self.process is None:
            return 'stopped'

        psutil = attempt_import('psutil')
        try:
            ### A process status of 'stopped' is reported as a 'paused' daemon.
            if self.process.status() == 'stopped':
                return 'paused'
            ### Treat zombies like processes which no longer exist.
            if self.process.status() == 'zombie':
                raise psutil.NoSuchProcess(self.process.pid)
        except (psutil.NoSuchProcess, AttributeError):
            ### NOTE(review): `AttributeError` presumably covers `self.process`
            ###               becoming `None` between accesses; confirm.
            ### Removing the stale PID file is best-effort.
            if self.pid_path.exists():
                try:
                    self.pid_path.unlink()
                except Exception as e:
                    pass
            return 'stopped'

        return 'running'
 796
 797    @classmethod
 798    def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 799        """
 800        Return a Daemon's path from its `daemon_id`.
 801        """
 802        return DAEMON_RESOURCES_PATH / daemon_id
 803
 804    @property
 805    def path(self) -> pathlib.Path:
 806        """
 807        Return the path for this Daemon's directory.
 808        """
 809        return self._get_path_from_daemon_id(self.daemon_id)
 810
 811    @classmethod
 812    def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 813        """
 814        Return the `properties.json` path for a given `daemon_id`.
 815        """
 816        return cls._get_path_from_daemon_id(daemon_id) / 'properties.json'
 817
 818    @property
 819    def properties_path(self) -> pathlib.Path:
 820        """
 821        Return the `propterties.json` path for this Daemon.
 822        """
 823        return self._get_properties_path_from_daemon_id(self.daemon_id)
 824
 825    @property
 826    def stop_path(self) -> pathlib.Path:
 827        """
 828        Return the path for the stop file (created when manually stopped).
 829        """
 830        return self.path / '.stop.json'
 831
 832    @property
 833    def log_path(self) -> pathlib.Path:
 834        """
 835        Return the log path.
 836        """
 837        return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')
 838
 839    @property
 840    def stdin_file_path(self) -> pathlib.Path:
 841        """
 842        Return the stdin file path.
 843        """
 844        return self.path / 'input.stdin'
 845
 846    @property
 847    def blocking_stdin_file_path(self) -> pathlib.Path:
 848        """
 849        Return the stdin file path.
 850        """
 851        if '_blocking_stdin_file_path' in self.__dict__:
 852            return self._blocking_stdin_file_path
 853
 854        return self.path / 'input.stdin.block'
 855
 856    @property
 857    def log_offset_path(self) -> pathlib.Path:
 858        """
 859        Return the log offset file path.
 860        """
 861        return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')
 862
 863    @property
 864    def rotating_log(self) -> RotatingFile:
 865        """
 866        The rotating log file for the daemon's output.
 867        """
 868        if '_rotating_log' in self.__dict__:
 869            return self._rotating_log
 870
 871        write_timestamps = (
 872            self.properties.get('logs', {}).get('write_timestamps', None)
 873        )
 874        if write_timestamps is None:
 875            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
 876
 877        self._rotating_log = RotatingFile(
 878            self.log_path,
 879            redirect_streams=True,
 880            write_timestamps=write_timestamps,
 881            timestamp_format=get_config('jobs', 'logs', 'timestamps', 'format'),
 882        )
 883        return self._rotating_log
 884
 885    @property
 886    def stdin_file(self):
 887        """
 888        Return the file handler for the stdin file.
 889        """
 890        if '_stdin_file' in self.__dict__:
 891            return self._stdin_file
 892
 893        self._stdin_file = StdinFile(
 894            self.stdin_file_path,
 895            lock_file_path=self.blocking_stdin_file_path,
 896        )
 897        return self._stdin_file
 898
 899    @property
 900    def log_text(self) -> Optional[str]:
 901        """
 902        Read the log files and return their contents.
 903        Returns `None` if the log file does not exist.
 904        """
 905        new_rotating_log = RotatingFile(
 906            self.rotating_log.file_path,
 907            num_files_to_keep = self.rotating_log.num_files_to_keep,
 908            max_file_size = self.rotating_log.max_file_size,
 909            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled'),
 910            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format'),
 911        )
 912        return new_rotating_log.read()
 913
 914    def readlines(self) -> List[str]:
 915        """
 916        Read the next log lines, persisting the cursor for later use.
 917        Note this will alter the cursor of `self.rotating_log`.
 918        """
 919        self.rotating_log._cursor = self._read_log_offset()
 920        lines = self.rotating_log.readlines()
 921        self._write_log_offset()
 922        return lines
 923
 924    def _read_log_offset(self) -> Tuple[int, int]:
 925        """
 926        Return the current log offset cursor.
 927
 928        Returns
 929        -------
 930        A tuple of the form (`subfile_index`, `position`).
 931        """
 932        if not self.log_offset_path.exists():
 933            return 0, 0
 934
 935        with open(self.log_offset_path, 'r', encoding='utf-8') as f:
 936            cursor_text = f.read()
 937        cursor_parts = cursor_text.split(' ')
 938        subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1])
 939        return subfile_index, subfile_position
 940
 941    def _write_log_offset(self) -> None:
 942        """
 943        Write the current log offset file.
 944        """
 945        with open(self.log_offset_path, 'w+', encoding='utf-8') as f:
 946            subfile_index = self.rotating_log._cursor[0]
 947            subfile_position = self.rotating_log._cursor[1]
 948            f.write(f"{subfile_index} {subfile_position}")
 949
 950    @property
 951    def pid(self) -> Union[int, None]:
 952        """
 953        Read the PID file and return its contents.
 954        Returns `None` if the PID file does not exist.
 955        """
 956        if not self.pid_path.exists():
 957            return None
 958        try:
 959            with open(self.pid_path, 'r', encoding='utf-8') as f:
 960                text = f.read()
 961            if len(text) == 0:
 962                return None
 963            pid = int(text.rstrip())
 964        except Exception as e:
 965            warn(e)
 966            text = None
 967            pid = None
 968        return pid
 969
 970    @property
 971    def pid_path(self) -> pathlib.Path:
 972        """
 973        Return the path to a file containing the PID for this Daemon.
 974        """
 975        return self.path / 'process.pid'
 976
 977    @property
 978    def pid_lock(self) -> 'fasteners.InterProcessLock':
 979        """
 980        Return the process lock context manager.
 981        """
 982        if '_pid_lock' in self.__dict__:
 983            return self._pid_lock
 984
 985        fasteners = attempt_import('fasteners')
 986        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
 987        return self._pid_lock
 988
 989    @property
 990    def pickle_path(self) -> pathlib.Path:
 991        """
 992        Return the path for the pickle file.
 993        """
 994        return self.path / 'pickle.pkl'
 995
 996    def read_properties(self) -> Optional[Dict[str, Any]]:
 997        """Read the properties JSON file and return the dictionary."""
 998        if not self.properties_path.exists():
 999            return None
1000        try:
1001            with open(self.properties_path, 'r', encoding='utf-8') as file:
1002                properties = json.load(file)
1003        except Exception as e:
1004            properties = {}
1005        
1006        return properties
1007
1008    def read_pickle(self) -> Daemon:
1009        """Read a Daemon's pickle file and return the `Daemon`."""
1010        import pickle, traceback
1011        if not self.pickle_path.exists():
1012            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")
1013
1014        if self.pickle_path.stat().st_size == 0:
1015            error(f"Pickle was empty for daemon '{self.daemon_id}'.")
1016
1017        try:
1018            with open(self.pickle_path, 'rb') as pickle_file:
1019                daemon = pickle.load(pickle_file)
1020            success, msg = True, 'Success'
1021        except Exception as e:
1022            success, msg = False, str(e)
1023            daemon = None
1024            traceback.print_exception(type(e), e, e.__traceback__)
1025        if not success:
1026            error(msg)
1027        return daemon
1028
    @property
    def properties(self) -> Dict[str, Any]:
        """
        Return this Daemon's properties, combined with the properties JSON file.

        Every access re-reads the file through `read_properties()` and merges it
        with the in-memory `_properties` via `apply_patch_to_config`.
        The merged dictionary is stored back on `self._properties`.

        Returns
        -------
        The properties dictionary (never `None`).
        """
        ### A failure while reading is printed and treated as an empty file.
        try:
            _file_properties = self.read_properties()
        except Exception:
            traceback.print_exc()
            _file_properties = {}

        ### With nothing in memory (`None` or empty), adopt whatever the file holds.
        if not self._properties:
            self._properties = _file_properties

        ### `read_properties()` returns `None` when the file does not exist.
        if self._properties is None:
            self._properties = {}

        ### NOTE(review): the file contents are passed first and the in-memory values second,
        ###               so presumably in-memory values win on conflict —
        ###               confirm against `apply_patch_to_config`.
        if _file_properties is not None:
            self._properties = apply_patch_to_config(
                _file_properties,
                self._properties,
            )

        return self._properties
1053
1054    @property
1055    def hidden(self) -> bool:
1056        """
1057        Return a bool indicating whether this Daemon should be displayed.
1058        """
1059        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')
1060
1061    def write_properties(self) -> SuccessTuple:
1062        """Write the properties dictionary to the properties JSON file
1063        (only if self.properties exists).
1064        """
1065        success, msg = (
1066            False,
1067            f"No properties to write for daemon '{self.daemon_id}'."
1068        )
1069        if self.properties is not None:
1070            try:
1071                self.path.mkdir(parents=True, exist_ok=True)
1072                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
1073                    json.dump(self.properties, properties_file)
1074                success, msg = True, 'Success'
1075            except Exception as e:
1076                success, msg = False, str(e)
1077        return success, msg
1078
1079    def write_pickle(self) -> SuccessTuple:
1080        """Write the pickle file for the daemon."""
1081        import pickle, traceback
1082        try:
1083            self.path.mkdir(parents=True, exist_ok=True)
1084            with open(self.pickle_path, 'wb+') as pickle_file:
1085                pickle.dump(self, pickle_file)
1086            success, msg = True, "Success"
1087        except Exception as e:
1088            success, msg = False, str(e)
1089            traceback.print_exception(type(e), e, e.__traceback__)
1090        return success, msg
1091
1092
1093    def _setup(
1094        self,
1095        allow_dirty_run: bool = False,
1096    ) -> None:
1097        """
1098        Update properties before starting the Daemon.
1099        """
1100        if self.properties is None:
1101            self._properties = {}
1102
1103        self._properties.update({
1104            'target': {
1105                'name': self.target.__name__,
1106                'module': self.target.__module__,
1107                'args': self.target_args,
1108                'kw': self.target_kw,
1109            },
1110        })
1111        self.mkdir_if_not_exists(allow_dirty_run)
1112        _write_properties_success_tuple = self.write_properties()
1113        if not _write_properties_success_tuple[0]:
1114            error(_write_properties_success_tuple[1])
1115
1116        _write_pickle_success_tuple = self.write_pickle()
1117        if not _write_pickle_success_tuple[0]:
1118            error(_write_pickle_success_tuple[1])
1119
1120    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
1121        """
1122        Remove a daemon's directory after execution.
1123
1124        Parameters
1125        ----------
1126        keep_logs: bool, default False
1127            If `True`, skip deleting the daemon's log files.
1128
1129        Returns
1130        -------
1131        A `SuccessTuple` indicating success.
1132        """
1133        if self.path.exists():
1134            try:
1135                shutil.rmtree(self.path)
1136            except Exception as e:
1137                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
1138                warn(msg)
1139                return False, msg
1140        if not keep_logs:
1141            self.rotating_log.delete()
1142            try:
1143                if self.log_offset_path.exists():
1144                    self.log_offset_path.unlink()
1145            except Exception as e:
1146                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
1147                warn(msg)
1148                return False, msg
1149        return True, "Success"
1150
1151
1152    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
1153        """
1154        Return the timeout value to use. Use `--timeout-seconds` if provided,
1155        else the configured default (8).
1156        """
1157        if isinstance(timeout, (int, float)):
1158            return timeout
1159        return get_config('jobs', 'timeout_seconds')
1160
1161
1162    def get_check_timeout_interval_seconds(
1163        self,
1164        check_timeout_interval: Union[int, float, None] = None,
1165    ) -> Union[int, float]:
1166        """
1167        Return the interval value to check the status of timeouts.
1168        """
1169        if isinstance(check_timeout_interval, (int, float)):
1170            return check_timeout_interval
1171        return get_config('jobs', 'check_timeout_interval_seconds')
1172
1173    @property
1174    def target_args(self) -> Union[Tuple[Any], None]:
1175        """
1176        Return the positional arguments to pass to the target function.
1177        """
1178        target_args = (
1179            self.__dict__.get('_target_args', None)
1180            or self.properties.get('target', {}).get('args', None)
1181        )
1182        if target_args is None:
1183            return tuple([])
1184
1185        return tuple(target_args)
1186
1187    @property
1188    def target_kw(self) -> Union[Dict[str, Any], None]:
1189        """
1190        Return the keyword arguments to pass to the target function.
1191        """
1192        target_kw = (
1193            self.__dict__.get('_target_kw', None)
1194            or self.properties.get('target', {}).get('kw', None)
1195        )
1196        if target_kw is None:
1197            return {}
1198
1199        return {key: val for key, val in target_kw.items()}
1200
1201    def __getstate__(self):
1202        """
1203        Pickle this Daemon.
1204        """
1205        dill = attempt_import('dill')
1206        return {
1207            'target': dill.dumps(self.target),
1208            'target_args': self.target_args,
1209            'target_kw': self.target_kw,
1210            'daemon_id': self.daemon_id,
1211            'label': self.label,
1212            'properties': self.properties,
1213        }
1214
1215    def __setstate__(self, _state: Dict[str, Any]):
1216        """
1217        Restore this Daemon from a pickled state.
1218        If the properties file exists, skip the old pickled version.
1219        """
1220        dill = attempt_import('dill')
1221        _state['target'] = dill.loads(_state['target'])
1222        self._pickle = True
1223        daemon_id = _state.get('daemon_id', None)
1224        if not daemon_id:
1225            raise ValueError("Need a daemon_id to un-pickle a Daemon.")
1226
1227        properties_path = self._get_properties_path_from_daemon_id(daemon_id)
1228        ignore_properties = properties_path.exists()
1229        if ignore_properties:
1230            _state = {
1231                key: val
1232                for key, val in _state.items()
1233                if key != 'properties'
1234            }
1235        self.__init__(**_state)
1236
1237
1238    def __repr__(self):
1239        return str(self)
1240
1241    def __str__(self):
1242        return self.daemon_id
1243
1244    def __eq__(self, other):
1245        if not isinstance(other, Daemon):
1246            return False
1247        return self.daemon_id == other.daemon_id
1248
1249    def __hash__(self):
1250        return hash(self.daemon_id)
class Daemon:
  46class Daemon:
  47    """
  48    Daemonize Python functions into background processes.
  49
  50    Examples
  51    --------
  52    >>> import meerschaum as mrsm
  53    >>> from meerschaum.utils.daemon import Daemon
  54    >>> daemon = Daemon(print, ('hi',))
  55    >>> success, msg = daemon.run()
  56    >>> print(daemon.log_text)
  57
  58    2024-07-29 18:03 | hi
  59    2024-07-29 18:03 |
  60    >>> daemon.run(allow_dirty_run=True)
  61    >>> print(daemon.log_text)
  62
  63    2024-07-29 18:03 | hi
  64    2024-07-29 18:03 |
  65    2024-07-29 18:05 | hi
  66    2024-07-29 18:05 |
  67    >>> mrsm.pprint(daemon.properties)
  68    {
  69        'label': 'print',
  70        'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
  71        'result': None,
  72        'process': {'ended': '2024-07-29T18:03:33.752806'}
  73    }
  74
  75    """
  76
  77    def __new__(
  78        cls,
  79        *args,
  80        daemon_id: Optional[str] = None,
  81        **kw
  82    ):
  83        """
  84        If a daemon_id is provided and already exists, read from its pickle file.
  85        """
  86        instance = super(Daemon, cls).__new__(cls)
  87        if daemon_id is not None:
  88            instance.daemon_id = daemon_id
  89            if instance.pickle_path.exists():
  90                instance = instance.read_pickle()
  91        return instance
  92
  93    @classmethod
  94    def from_properties_file(cls, daemon_id: str) -> Daemon:
  95        """
  96        Return a Daemon from a properties dictionary.
  97        """
  98        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
  99        if not properties_path.exists():
 100            raise OSError(f"Properties file '{properties_path}' does not exist.")
 101
 102        try:
 103            with open(properties_path, 'r', encoding='utf-8') as f:
 104                properties = json.load(f)
 105        except Exception:
 106            properties = {}
 107
 108        if not properties:
 109            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")
 110
 111        daemon_id = properties_path.parent.name
 112        target_cf = properties.get('target', {})
 113        target_module_name = target_cf.get('module', None)
 114        target_function_name = target_cf.get('name', None)
 115        target_args = target_cf.get('args', None)
 116        target_kw = target_cf.get('kw', None)
 117        label = properties.get('label', None)
 118
 119        if None in [
 120            target_module_name,
 121            target_function_name,
 122            target_args,
 123            target_kw,
 124        ]:
 125            raise ValueError("Missing target function information.")
 126
 127        target_module = importlib.import_module(target_module_name)
 128        target_function = getattr(target_module, target_function_name)
 129
 130        return Daemon(
 131            daemon_id=daemon_id,
 132            target=target_function,
 133            target_args=target_args,
 134            target_kw=target_kw,
 135            properties=properties,
 136            label=label,
 137        )
 138
 139
    def __init__(
        self,
        target: Optional[Callable[[Any], Any]] = None,
        target_args: Union[List[Any], Tuple[Any], None] = None,
        target_kw: Optional[Dict[str, Any]] = None,
        env: Optional[Dict[str, str]] = None,
        daemon_id: Optional[str] = None,
        label: Optional[str] = None,
        properties: Optional[Dict[str, Any]] = None,
    ):
        """
        Initialize the Daemon, keeping any attributes already present on the instance
        (e.g. when the instance was restored from its pickle file).

        Parameters
        ----------
        target: Optional[Callable[[Any], Any]], default None,
            The function to execute in a child process.

        target_args: Union[List[Any], Tuple[Any], None], default None
            Positional arguments to pass to the target function.

        target_kw: Optional[Dict[str, Any]], default None
            Keyword arguments to pass to the target function.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the daemon process.

        daemon_id: Optional[str], default None
            Build a `Daemon` from an existing `daemon_id`.
            If `daemon_id` is provided, other arguments are ignored and are derived
            from the existing pickled `Daemon`.
            If there is no pickle file and no `target`, the Daemon is recovered
            from its properties file instead.

        label: Optional[str], default None
            Label string to help identify a daemon.
            If `None`, use the function name instead.

        properties: Optional[Dict[str, Any]], default None
            Override reading from the properties JSON by providing an existing dictionary.

        Raises
        ------
        `Exception` if `daemon_id` is given without a target and the Daemon
        has neither a pickle file nor a recoverable properties file.
        """
        ### NOTE: `_pickle` is read here but not used below.
        _pickle = self.__dict__.get('_pickle', False)
        if daemon_id is not None:
            self.daemon_id = daemon_id
            ### No pickle and no target: try to recover from the properties file.
            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):

                if not self.properties_path.exists():
                    raise Exception(
                        f"Daemon '{self.daemon_id}' does not exist. "
                        + "Pass a target to create a new Daemon."
                    )

                try:
                    new_daemon = self.from_properties_file(daemon_id)
                except Exception:
                    new_daemon = None

                if new_daemon is not None:
                    ### Persist the recovered Daemon and adopt its values as the arguments.
                    new_daemon.write_pickle()
                    target = new_daemon.target
                    target_args = new_daemon.target_args
                    target_kw = new_daemon.target_kw
                    label = new_daemon.label
                    self._properties = new_daemon.properties
                else:
                    ### The properties file is unusable, so remove it (best-effort) before failing.
                    try:
                        self.properties_path.unlink()
                    except Exception:
                        pass

                    raise Exception(
                        f"Could not recover daemon '{self.daemon_id}' "
                        + "from its properties file."
                    )

        if 'target' not in self.__dict__:
            if target is None:
                error("Cannot create a Daemon without a target.")
            self.target = target

        ### NOTE: We have to check self.__dict__ in case we are un-pickling.
        if '_target_args' not in self.__dict__:
            self._target_args = target_args
        if '_target_kw' not in self.__dict__:
            self._target_kw = target_kw

        if 'label' not in self.__dict__:
            if label is None:
                ### Fall back to the target's name, or its string form if it has no `__name__`.
                label = (
                    self.target.__name__ if '__name__' in self.target.__dir__()
                        else str(self.target)
                )
            self.label = label
        if 'daemon_id' not in self.__dict__:
            self.daemon_id = get_new_daemon_name()
        if '_properties' not in self.__dict__:
            self._properties = properties
        if self._properties is None:
            self._properties = {}

        self._properties.update({'label': self.label})
        if env:
            self._properties.update({'env': env})

        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
        _ = self.process
 242
 243
    def _run_exit(
        self,
        keep_daemon_output: bool = True,
        allow_dirty_run: bool = False,
    ) -> Any:
        """Run the daemon's target function.
        NOTE: This WILL EXIT the parent process!

        The target runs inside a `daemon.DaemonContext` (with `detach_process=True`)
        whose stdout and stderr are this Daemon's rotating log.
        The result of the target (or the exception it raised) is stored in `_results`.

        Parameters
        ----------
        keep_daemon_output: bool, default True
            If `False`, delete the daemon's output directory upon exiting.

        allow_dirty_run: bool, default False
            If `True`, run the daemon, even if the `daemon_id` directory exists.
            This option is dangerous because if the same `daemon_id` runs twice,
            the last to finish will overwrite the output of the first.

        Returns
        -------
        Nothing — this will exit the parent process.
        On Windows, a failure tuple `(False, msg)` is returned before anything is run.
        """
        import platform, sys, os, traceback
        from meerschaum.utils.warnings import warn
        from meerschaum.config import get_config
        daemon = attempt_import('daemon')
        lines = get_config('jobs', 'terminal', 'lines')
        columns = get_config('jobs', 'terminal', 'columns')

        if platform.system() == 'Windows':
            return False, "Windows is no longer supported."

        ### Write the properties and pickle files before daemonizing.
        self._setup(allow_dirty_run)

        ### NOTE: The SIGINT handler has been removed so that child processes may handle
        ###       KeyboardInterrupts themselves.
        ###       The previous aggressive approach was redundant because of the SIGTERM handler.
        self._daemon_context = daemon.DaemonContext(
            pidfile=self.pid_lock,
            stdout=self.rotating_log,
            stderr=self.rotating_log,
            working_directory=os.getcwd(),
            detach_process=True,
            files_preserve=list(self.rotating_log.subfile_objects.values()),
            signal_map={
                signal.SIGTERM: self._handle_sigterm,
            },
        )

        _daemons.append(self)

        ### Periodically refresh the log files while the target is running.
        log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds')
        self._log_refresh_timer = RepeatTimer(
            log_refresh_seconds,
            partial(self.rotating_log.refresh_files, start_interception=True),
        )

        try:
            ### Expose the configured terminal size through the environment.
            os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns))
            with self._daemon_context:
                sys.stdin = self.stdin_file
                os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id
                os.environ['PYTHONUNBUFFERED'] = '1'

                ### Allow the user to override environment variables.
                env = self.properties.get('env', {})
                if env and isinstance(env, dict):
                    os.environ.update({str(k): str(v) for k, v in env.items()})

                self.rotating_log.refresh_files(start_interception=True)
                result = None
                try:
                    ### Record the PID of the current process.
                    with open(self.pid_path, 'w+', encoding='utf-8') as f:
                        f.write(str(os.getpid()))

                    ### NOTE: The timer fails to start for remote actions to localhost.
                    try:
                        if not self._log_refresh_timer.is_running():
                            self._log_refresh_timer.start()
                    except Exception:
                        pass

                    self.properties['result'] = None
                    self._capture_process_timestamp('began')
                    result = self.target(*self.target_args, **self.target_kw)
                    self.properties['result'] = result
                except (BrokenPipeError, KeyboardInterrupt, SystemExit):
                    ### These are not reported as failures; `result` keeps its current value.
                    pass
                except Exception as e:
                    warn(
                        f"Exception in daemon target function: {traceback.format_exc()}",
                    )
                    ### The exception itself becomes the result.
                    result = e
                finally:
                    _results[self.daemon_id] = result

                    if keep_daemon_output:
                        self._capture_process_timestamp('ended')
                    else:
                        self.cleanup()

                    self._log_refresh_timer.cancel()
                    ### Remove a PID file from which no PID could be read.
                    if self.pid is None and self.pid_path.exists():
                        self.pid_path.unlink()

                    if is_success_tuple(result):
                        try:
                            mrsm.pprint(result)
                        except BrokenPipeError:
                            pass

        except Exception:
            ### Failures outside of the target are appended to the daemon error log.
            daemon_error = traceback.format_exc()
            with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
                f.write(daemon_error)
            warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}")
 360
 361    def _capture_process_timestamp(
 362        self,
 363        process_key: str,
 364        write_properties: bool = True,
 365    ) -> None:
 366        """
 367        Record the current timestamp to the parameters `process:<process_key>`.
 368
 369        Parameters
 370        ----------
 371        process_key: str
 372            Under which key to store the timestamp.
 373
 374        write_properties: bool, default True
 375            If `True` persist the properties to disk immediately after capturing the timestamp.
 376        """
 377        if 'process' not in self.properties:
 378            self.properties['process'] = {}
 379
 380        if process_key not in ('began', 'ended', 'paused', 'stopped'):
 381            raise ValueError(f"Invalid key '{process_key}'.")
 382
 383        self.properties['process'][process_key] = (
 384            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
 385        )
 386        if write_properties:
 387            self.write_properties()
 388
 389    def run(
 390        self,
 391        keep_daemon_output: bool = True,
 392        allow_dirty_run: bool = False,
 393        debug: bool = False,
 394    ) -> SuccessTuple:
 395        """Run the daemon as a child process and continue executing the parent.
 396
 397        Parameters
 398        ----------
 399        keep_daemon_output: bool, default True
 400            If `False`, delete the daemon's output directory upon exiting.
 401
 402        allow_dirty_run: bool, default False
 403            If `True`, run the daemon, even if the `daemon_id` directory exists.
 404            This option is dangerous because if the same `daemon_id` runs concurrently,
 405            the last to finish will overwrite the output of the first.
 406
 407        Returns
 408        -------
 409        A SuccessTuple indicating success.
 410
 411        """
 412        import platform
 413        if platform.system() == 'Windows':
 414            return False, "Cannot run background jobs on Windows."
 415
 416        ### The daemon might exist and be paused.
 417        if self.status == 'paused':
 418            return self.resume()
 419
 420        self._remove_stop_file()
 421        if self.status == 'running':
 422            return True, f"Daemon '{self}' is already running."
 423
 424        self.mkdir_if_not_exists(allow_dirty_run)
 425        _write_pickle_success_tuple = self.write_pickle()
 426        if not _write_pickle_success_tuple[0]:
 427            return _write_pickle_success_tuple
 428
 429        _launch_daemon_code = (
 430            "from meerschaum.utils.daemon import Daemon; "
 431            + f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
 432            + f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
 433            + "allow_dirty_run=True)"
 434        )
 435        env = dict(os.environ)
 436        env['MRSM_NOASK'] = 'true'
 437        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
 438        msg = (
 439            "Success"
 440            if _launch_success_bool
 441            else f"Failed to start daemon '{self.daemon_id}'."
 442        )
 443        return _launch_success_bool, msg
 444
    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
        """
        Forcibly terminate a running daemon.
        Sends a SIGTERM signal to the process (unless it is paused)
        and falls back to terminating and killing the process directly.

        Parameters
        ----------
        timeout: Union[int, float, None], default 8
            How many seconds to wait for the process to terminate.

        Returns
        -------
        A SuccessTuple indicating success.
        """
        ### A paused process skips the SIGTERM attempt and goes straight to the fallback below.
        if self.status != 'paused':
            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
            if success:
                self._write_stop_file('kill')
                return success, msg

        if self.status == 'stopped':
            self._write_stop_file('kill')
            return True, "Process has already stopped."

        ### Fallback: terminate and kill the process directly, then wait for it to exit.
        process = self.process
        try:
            process.terminate()
            process.kill()
            process.wait(timeout=timeout)
        except Exception as e:
            return False, f"Failed to kill job {self} with exception: {e}"

        ### Best-effort removal of the PID file.
        if self.pid_path.exists():
            try:
                self.pid_path.unlink()
            except Exception as e:
                pass

        self._write_stop_file('kill')
        return True, "Success"
 485
 486    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
 487        """Gracefully quit a running daemon."""
 488        if self.status == 'paused':
 489            return self.kill(timeout)
 490
 491        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
 492        if signal_success:
 493            self._write_stop_file('quit')
 494        return signal_success, signal_msg
 495
 496    def pause(
 497        self,
 498        timeout: Union[int, float, None] = None,
 499        check_timeout_interval: Union[float, int, None] = None,
 500    ) -> SuccessTuple:
 501        """
 502        Pause the daemon if it is running.
 503
 504        Parameters
 505        ----------
 506        timeout: Union[float, int, None], default None
 507            The maximum number of seconds to wait for a process to suspend.
 508
 509        check_timeout_interval: Union[float, int, None], default None
 510            The number of seconds to wait between checking if the process is still running.
 511
 512        Returns
 513        -------
 514        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
 515        """
 516        if self.process is None:
 517            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."
 518
 519        if self.status == 'paused':
 520            return True, f"Daemon '{self.daemon_id}' is already paused."
 521
 522        self._write_stop_file('pause')
 523        try:
 524            self.process.suspend()
 525        except Exception as e:
 526            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"
 527
 528        timeout = self.get_timeout_seconds(timeout)
 529        check_timeout_interval = self.get_check_timeout_interval_seconds(
 530            check_timeout_interval
 531        )
 532
 533        psutil = attempt_import('psutil')
 534
 535        if not timeout:
 536            try:
 537                success = self.process.status() == 'stopped'
 538            except psutil.NoSuchProcess as e:
 539                success = True
 540            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
 541            if success:
 542                self._capture_process_timestamp('paused')
 543            return success, msg
 544
 545        begin = time.perf_counter()
 546        while (time.perf_counter() - begin) < timeout:
 547            try:
 548                if self.process.status() == 'stopped':
 549                    self._capture_process_timestamp('paused')
 550                    return True, "Success"
 551            except psutil.NoSuchProcess as e:
 552                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
 553            time.sleep(check_timeout_interval)
 554
 555        return False, (
 556            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
 557            + ('s' if timeout != 1 else '') + '.'
 558        )
 559
 560    def resume(
 561        self,
 562        timeout: Union[int, float, None] = None,
 563        check_timeout_interval: Union[float, int, None] = None,
 564    ) -> SuccessTuple:
 565        """
 566        Resume the daemon if it is paused.
 567
 568        Parameters
 569        ----------
 570        timeout: Union[float, int, None], default None
 571            The maximum number of seconds to wait for a process to resume.
 572
 573        check_timeout_interval: Union[float, int, None], default None
 574            The number of seconds to wait between checking if the process is still stopped.
 575
 576        Returns
 577        -------
 578        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
 579        """
 580        if self.status == 'running':
 581            return True, f"Daemon '{self.daemon_id}' is already running."
 582
 583        if self.status == 'stopped':
 584            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."
 585
 586        self._remove_stop_file()
 587        try:
 588            self.process.resume()
 589        except Exception as e:
 590            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"
 591
 592        timeout = self.get_timeout_seconds(timeout)
 593        check_timeout_interval = self.get_check_timeout_interval_seconds(
 594            check_timeout_interval
 595        )
 596
 597        if not timeout:
 598            success = self.status == 'running'
 599            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
 600            if success:
 601                self._capture_process_timestamp('began')
 602            return success, msg
 603
 604        begin = time.perf_counter()
 605        while (time.perf_counter() - begin) < timeout:
 606            if self.status == 'running':
 607                self._capture_process_timestamp('began')
 608                return True, "Success"
 609            time.sleep(check_timeout_interval)
 610
 611        return False, (
 612            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
 613            + ('s' if timeout != 1 else '') + '.'
 614        )
 615
 616    def _write_stop_file(self, action: str) -> SuccessTuple:
 617        """Write the stop file timestamp and action."""
 618        if action not in ('quit', 'kill', 'pause'):
 619            return False, f"Unsupported action '{action}'."
 620
 621        if not self.stop_path.parent.exists():
 622            self.stop_path.parent.mkdir(parents=True, exist_ok=True)
 623
 624        with open(self.stop_path, 'w+', encoding='utf-8') as f:
 625            json.dump(
 626                {
 627                    'stop_time': datetime.now(timezone.utc).isoformat(),
 628                    'action': action,
 629                },
 630                f
 631            )
 632
 633        return True, "Success"
 634
 635    def _remove_stop_file(self) -> SuccessTuple:
 636        """Remove the stop file"""
 637        if not self.stop_path.exists():
 638            return True, "Stop file does not exist."
 639
 640        try:
 641            self.stop_path.unlink()
 642        except Exception as e:
 643            return False, f"Failed to remove stop file:\n{e}"
 644
 645        return True, "Success"
 646
 647    def _read_stop_file(self) -> Dict[str, Any]:
 648        """
 649        Read the stop file if it exists.
 650        """
 651        if not self.stop_path.exists():
 652            return {}
 653
 654        try:
 655            with open(self.stop_path, 'r', encoding='utf-8') as f:
 656                data = json.load(f)
 657            return data
 658        except Exception:
 659            return {}
 660
    def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None:
        """
        Handle `SIGTERM` within the `Daemon` context.
        This method is injected into the `DaemonContext`.

        Parameters
        ----------
        signal_number: int
            The number of the received signal; forwarded to `signal_handler`.

        stack_frame: 'frame'
            The stack frame at the time of the signal; forwarded to `signal_handler`.

        Raises
        ------
        `SystemExit(0)` after the cleanup steps below have run.
        """
        from meerschaum.utils.process import signal_handler
        signal_handler(signal_number, stack_frame)

        ### Read from `__dict__` so that a missing attribute yields `None` instead of raising.
        timer = self.__dict__.get('_log_refresh_timer', None)
        if timer is not None:
            timer.cancel()

        daemon_context = self.__dict__.get('_daemon_context', None)
        if daemon_context is not None:
            daemon_context.close()

        _close_pools()
        ### Exit with status 0 once cleanup is done.
        raise SystemExit(0)
 679
 680    def _send_signal(
 681            self,
 682            signal_to_send,
 683            timeout: Union[float, int, None] = None,
 684            check_timeout_interval: Union[float, int, None] = None,
 685        ) -> SuccessTuple:
 686        """Send a signal to the daemon process.
 687
 688        Parameters
 689        ----------
 690        signal_to_send:
 691            The signal the send to the daemon, e.g. `signals.SIGINT`.
 692
 693        timeout: Union[float, int, None], default None
 694            The maximum number of seconds to wait for a process to terminate.
 695
 696        check_timeout_interval: Union[float, int, None], default None
 697            The number of seconds to wait between checking if the process is still running.
 698
 699        Returns
 700        -------
 701        A SuccessTuple indicating success.
 702        """
 703        try:
 704            pid = self.pid
 705            if pid is None:
 706                return (
 707                    False,
 708                    f"Daemon '{self.daemon_id}' is not running, "
 709                    + f"cannot send signal '{signal_to_send}'."
 710                )
 711            
 712            os.kill(pid, signal_to_send)
 713        except Exception as e:
 714            return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}"
 715
 716        timeout = self.get_timeout_seconds(timeout)
 717        check_timeout_interval = self.get_check_timeout_interval_seconds(
 718            check_timeout_interval
 719        )
 720
 721        if not timeout:
 722            return True, f"Successfully sent '{signal}' to daemon '{self.daemon_id}'."
 723
 724        begin = time.perf_counter()
 725        while (time.perf_counter() - begin) < timeout:
 726            if not self.status == 'running':
 727                return True, "Success"
 728            time.sleep(check_timeout_interval)
 729
 730        return False, (
 731            f"Failed to stop daemon '{self.daemon_id}' within {timeout} second"
 732            + ('s' if timeout != 1 else '') + '.'
 733        )
 734
 735    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
 736        """Create the Daemon's directory.
 737        If `allow_dirty_run` is `False` and the directory already exists,
 738        raise a `FileExistsError`.
 739        """
 740        try:
 741            self.path.mkdir(parents=True, exist_ok=True)
 742            _already_exists = any(os.scandir(self.path))
 743        except FileExistsError:
 744            _already_exists = True
 745
 746        if _already_exists and not allow_dirty_run:
 747            error(
 748                f"Daemon '{self.daemon_id}' already exists. " +
 749                f"To allow this daemon to run, do one of the following:\n"
 750                + "  - Execute `daemon.cleanup()`.\n"
 751                + f"  - Delete the directory '{self.path}'.\n"
 752                + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
 753                FileExistsError,
 754            )
 755
 756    @property
 757    def process(self) -> Union['psutil.Process', None]:
 758        """
 759        Return the psutil process for the Daemon.
 760        """
 761        psutil = attempt_import('psutil')
 762        pid = self.pid
 763        if pid is None:
 764            return None
 765        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
 766            try:
 767                self._process = psutil.Process(int(pid))
 768            except Exception as e:
 769                if self.pid_path.exists():
 770                    self.pid_path.unlink()
 771                return None
 772        return self._process
 773
 774    @property
 775    def status(self) -> str:
 776        """
 777        Return the running status of this Daemon.
 778        """
 779        if self.process is None:
 780            return 'stopped'
 781
 782        psutil = attempt_import('psutil')
 783        try:
 784            if self.process.status() == 'stopped':
 785                return 'paused'
 786            if self.process.status() == 'zombie':
 787                raise psutil.NoSuchProcess(self.process.pid)
 788        except (psutil.NoSuchProcess, AttributeError):
 789            if self.pid_path.exists():
 790                try:
 791                    self.pid_path.unlink()
 792                except Exception as e:
 793                    pass
 794            return 'stopped'
 795
 796        return 'running'
 797
 798    @classmethod
 799    def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 800        """
 801        Return a Daemon's path from its `daemon_id`.
 802        """
 803        return DAEMON_RESOURCES_PATH / daemon_id
 804
 805    @property
 806    def path(self) -> pathlib.Path:
 807        """
 808        Return the path for this Daemon's directory.
 809        """
 810        return self._get_path_from_daemon_id(self.daemon_id)
 811
 812    @classmethod
 813    def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 814        """
 815        Return the `properties.json` path for a given `daemon_id`.
 816        """
 817        return cls._get_path_from_daemon_id(daemon_id) / 'properties.json'
 818
 819    @property
 820    def properties_path(self) -> pathlib.Path:
 821        """
 822        Return the `propterties.json` path for this Daemon.
 823        """
 824        return self._get_properties_path_from_daemon_id(self.daemon_id)
 825
 826    @property
 827    def stop_path(self) -> pathlib.Path:
 828        """
 829        Return the path for the stop file (created when manually stopped).
 830        """
 831        return self.path / '.stop.json'
 832
 833    @property
 834    def log_path(self) -> pathlib.Path:
 835        """
 836        Return the log path.
 837        """
 838        return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')
 839
 840    @property
 841    def stdin_file_path(self) -> pathlib.Path:
 842        """
 843        Return the stdin file path.
 844        """
 845        return self.path / 'input.stdin'
 846
 847    @property
 848    def blocking_stdin_file_path(self) -> pathlib.Path:
 849        """
 850        Return the stdin file path.
 851        """
 852        if '_blocking_stdin_file_path' in self.__dict__:
 853            return self._blocking_stdin_file_path
 854
 855        return self.path / 'input.stdin.block'
 856
 857    @property
 858    def log_offset_path(self) -> pathlib.Path:
 859        """
 860        Return the log offset file path.
 861        """
 862        return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')
 863
 864    @property
 865    def rotating_log(self) -> RotatingFile:
 866        """
 867        The rotating log file for the daemon's output.
 868        """
 869        if '_rotating_log' in self.__dict__:
 870            return self._rotating_log
 871
 872        write_timestamps = (
 873            self.properties.get('logs', {}).get('write_timestamps', None)
 874        )
 875        if write_timestamps is None:
 876            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
 877
 878        self._rotating_log = RotatingFile(
 879            self.log_path,
 880            redirect_streams=True,
 881            write_timestamps=write_timestamps,
 882            timestamp_format=get_config('jobs', 'logs', 'timestamps', 'format'),
 883        )
 884        return self._rotating_log
 885
 886    @property
 887    def stdin_file(self):
 888        """
 889        Return the file handler for the stdin file.
 890        """
 891        if '_stdin_file' in self.__dict__:
 892            return self._stdin_file
 893
 894        self._stdin_file = StdinFile(
 895            self.stdin_file_path,
 896            lock_file_path=self.blocking_stdin_file_path,
 897        )
 898        return self._stdin_file
 899
 900    @property
 901    def log_text(self) -> Optional[str]:
 902        """
 903        Read the log files and return their contents.
 904        Returns `None` if the log file does not exist.
 905        """
 906        new_rotating_log = RotatingFile(
 907            self.rotating_log.file_path,
 908            num_files_to_keep = self.rotating_log.num_files_to_keep,
 909            max_file_size = self.rotating_log.max_file_size,
 910            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled'),
 911            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format'),
 912        )
 913        return new_rotating_log.read()
 914
 915    def readlines(self) -> List[str]:
 916        """
 917        Read the next log lines, persisting the cursor for later use.
 918        Note this will alter the cursor of `self.rotating_log`.
 919        """
 920        self.rotating_log._cursor = self._read_log_offset()
 921        lines = self.rotating_log.readlines()
 922        self._write_log_offset()
 923        return lines
 924
 925    def _read_log_offset(self) -> Tuple[int, int]:
 926        """
 927        Return the current log offset cursor.
 928
 929        Returns
 930        -------
 931        A tuple of the form (`subfile_index`, `position`).
 932        """
 933        if not self.log_offset_path.exists():
 934            return 0, 0
 935
 936        with open(self.log_offset_path, 'r', encoding='utf-8') as f:
 937            cursor_text = f.read()
 938        cursor_parts = cursor_text.split(' ')
 939        subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1])
 940        return subfile_index, subfile_position
 941
 942    def _write_log_offset(self) -> None:
 943        """
 944        Write the current log offset file.
 945        """
 946        with open(self.log_offset_path, 'w+', encoding='utf-8') as f:
 947            subfile_index = self.rotating_log._cursor[0]
 948            subfile_position = self.rotating_log._cursor[1]
 949            f.write(f"{subfile_index} {subfile_position}")
 950
 951    @property
 952    def pid(self) -> Union[int, None]:
 953        """
 954        Read the PID file and return its contents.
 955        Returns `None` if the PID file does not exist.
 956        """
 957        if not self.pid_path.exists():
 958            return None
 959        try:
 960            with open(self.pid_path, 'r', encoding='utf-8') as f:
 961                text = f.read()
 962            if len(text) == 0:
 963                return None
 964            pid = int(text.rstrip())
 965        except Exception as e:
 966            warn(e)
 967            text = None
 968            pid = None
 969        return pid
 970
 971    @property
 972    def pid_path(self) -> pathlib.Path:
 973        """
 974        Return the path to a file containing the PID for this Daemon.
 975        """
 976        return self.path / 'process.pid'
 977
 978    @property
 979    def pid_lock(self) -> 'fasteners.InterProcessLock':
 980        """
 981        Return the process lock context manager.
 982        """
 983        if '_pid_lock' in self.__dict__:
 984            return self._pid_lock
 985
 986        fasteners = attempt_import('fasteners')
 987        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
 988        return self._pid_lock
 989
 990    @property
 991    def pickle_path(self) -> pathlib.Path:
 992        """
 993        Return the path for the pickle file.
 994        """
 995        return self.path / 'pickle.pkl'
 996
 997    def read_properties(self) -> Optional[Dict[str, Any]]:
 998        """Read the properties JSON file and return the dictionary."""
 999        if not self.properties_path.exists():
1000            return None
1001        try:
1002            with open(self.properties_path, 'r', encoding='utf-8') as file:
1003                properties = json.load(file)
1004        except Exception as e:
1005            properties = {}
1006        
1007        return properties
1008
1009    def read_pickle(self) -> Daemon:
1010        """Read a Daemon's pickle file and return the `Daemon`."""
1011        import pickle, traceback
1012        if not self.pickle_path.exists():
1013            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")
1014
1015        if self.pickle_path.stat().st_size == 0:
1016            error(f"Pickle was empty for daemon '{self.daemon_id}'.")
1017
1018        try:
1019            with open(self.pickle_path, 'rb') as pickle_file:
1020                daemon = pickle.load(pickle_file)
1021            success, msg = True, 'Success'
1022        except Exception as e:
1023            success, msg = False, str(e)
1024            daemon = None
1025            traceback.print_exception(type(e), e, e.__traceback__)
1026        if not success:
1027            error(msg)
1028        return daemon
1029
    @property
    def properties(self) -> Dict[str, Any]:
        """
        Return the contents of the properties JSON file,
        combined with the properties held in memory on `self._properties`.
        The combined result is stored back on `self._properties`.
        """
        try:
            _file_properties = self.read_properties()
        except Exception:
            traceback.print_exc()
            _file_properties = {}

        ### Nothing in memory yet (`None` or empty): adopt what was read from the file.
        if not self._properties:
            self._properties = _file_properties

        ### `read_properties()` returns `None` when the file does not exist.
        if self._properties is None:
            self._properties = {}

        ### NOTE(review): the argument order suggests the in-memory values are patched
        ### onto the file's values; confirm against `apply_patch_to_config`.
        if _file_properties is not None:
            self._properties = apply_patch_to_config(
                _file_properties,
                self._properties,
            )

        return self._properties
1054
1055    @property
1056    def hidden(self) -> bool:
1057        """
1058        Return a bool indicating whether this Daemon should be displayed.
1059        """
1060        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')
1061
1062    def write_properties(self) -> SuccessTuple:
1063        """Write the properties dictionary to the properties JSON file
1064        (only if self.properties exists).
1065        """
1066        success, msg = (
1067            False,
1068            f"No properties to write for daemon '{self.daemon_id}'."
1069        )
1070        if self.properties is not None:
1071            try:
1072                self.path.mkdir(parents=True, exist_ok=True)
1073                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
1074                    json.dump(self.properties, properties_file)
1075                success, msg = True, 'Success'
1076            except Exception as e:
1077                success, msg = False, str(e)
1078        return success, msg
1079
1080    def write_pickle(self) -> SuccessTuple:
1081        """Write the pickle file for the daemon."""
1082        import pickle, traceback
1083        try:
1084            self.path.mkdir(parents=True, exist_ok=True)
1085            with open(self.pickle_path, 'wb+') as pickle_file:
1086                pickle.dump(self, pickle_file)
1087            success, msg = True, "Success"
1088        except Exception as e:
1089            success, msg = False, str(e)
1090            traceback.print_exception(type(e), e, e.__traceback__)
1091        return success, msg
1092
1093
1094    def _setup(
1095        self,
1096        allow_dirty_run: bool = False,
1097    ) -> None:
1098        """
1099        Update properties before starting the Daemon.
1100        """
1101        if self.properties is None:
1102            self._properties = {}
1103
1104        self._properties.update({
1105            'target': {
1106                'name': self.target.__name__,
1107                'module': self.target.__module__,
1108                'args': self.target_args,
1109                'kw': self.target_kw,
1110            },
1111        })
1112        self.mkdir_if_not_exists(allow_dirty_run)
1113        _write_properties_success_tuple = self.write_properties()
1114        if not _write_properties_success_tuple[0]:
1115            error(_write_properties_success_tuple[1])
1116
1117        _write_pickle_success_tuple = self.write_pickle()
1118        if not _write_pickle_success_tuple[0]:
1119            error(_write_pickle_success_tuple[1])
1120
1121    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
1122        """
1123        Remove a daemon's directory after execution.
1124
1125        Parameters
1126        ----------
1127        keep_logs: bool, default False
1128            If `True`, skip deleting the daemon's log files.
1129
1130        Returns
1131        -------
1132        A `SuccessTuple` indicating success.
1133        """
1134        if self.path.exists():
1135            try:
1136                shutil.rmtree(self.path)
1137            except Exception as e:
1138                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
1139                warn(msg)
1140                return False, msg
1141        if not keep_logs:
1142            self.rotating_log.delete()
1143            try:
1144                if self.log_offset_path.exists():
1145                    self.log_offset_path.unlink()
1146            except Exception as e:
1147                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
1148                warn(msg)
1149                return False, msg
1150        return True, "Success"
1151
1152
1153    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
1154        """
1155        Return the timeout value to use. Use `--timeout-seconds` if provided,
1156        else the configured default (8).
1157        """
1158        if isinstance(timeout, (int, float)):
1159            return timeout
1160        return get_config('jobs', 'timeout_seconds')
1161
1162
1163    def get_check_timeout_interval_seconds(
1164        self,
1165        check_timeout_interval: Union[int, float, None] = None,
1166    ) -> Union[int, float]:
1167        """
1168        Return the interval value to check the status of timeouts.
1169        """
1170        if isinstance(check_timeout_interval, (int, float)):
1171            return check_timeout_interval
1172        return get_config('jobs', 'check_timeout_interval_seconds')
1173
1174    @property
1175    def target_args(self) -> Union[Tuple[Any], None]:
1176        """
1177        Return the positional arguments to pass to the target function.
1178        """
1179        target_args = (
1180            self.__dict__.get('_target_args', None)
1181            or self.properties.get('target', {}).get('args', None)
1182        )
1183        if target_args is None:
1184            return tuple([])
1185
1186        return tuple(target_args)
1187
1188    @property
1189    def target_kw(self) -> Union[Dict[str, Any], None]:
1190        """
1191        Return the keyword arguments to pass to the target function.
1192        """
1193        target_kw = (
1194            self.__dict__.get('_target_kw', None)
1195            or self.properties.get('target', {}).get('kw', None)
1196        )
1197        if target_kw is None:
1198            return {}
1199
1200        return {key: val for key, val in target_kw.items()}
1201
1202    def __getstate__(self):
1203        """
1204        Pickle this Daemon.
1205        """
1206        dill = attempt_import('dill')
1207        return {
1208            'target': dill.dumps(self.target),
1209            'target_args': self.target_args,
1210            'target_kw': self.target_kw,
1211            'daemon_id': self.daemon_id,
1212            'label': self.label,
1213            'properties': self.properties,
1214        }
1215
1216    def __setstate__(self, _state: Dict[str, Any]):
1217        """
1218        Restore this Daemon from a pickled state.
1219        If the properties file exists, skip the old pickled version.
1220        """
1221        dill = attempt_import('dill')
1222        _state['target'] = dill.loads(_state['target'])
1223        self._pickle = True
1224        daemon_id = _state.get('daemon_id', None)
1225        if not daemon_id:
1226            raise ValueError("Need a daemon_id to un-pickle a Daemon.")
1227
1228        properties_path = self._get_properties_path_from_daemon_id(daemon_id)
1229        ignore_properties = properties_path.exists()
1230        if ignore_properties:
1231            _state = {
1232                key: val
1233                for key, val in _state.items()
1234                if key != 'properties'
1235            }
1236        self.__init__(**_state)
1237
1238
1239    def __repr__(self):
1240        return str(self)
1241
1242    def __str__(self):
1243        return self.daemon_id
1244
1245    def __eq__(self, other):
1246        if not isinstance(other, Daemon):
1247            return False
1248        return self.daemon_id == other.daemon_id
1249
1250    def __hash__(self):
1251        return hash(self.daemon_id)

Daemonize Python functions into background processes.

Examples
>>> import meerschaum as mrsm
>>> from meerschaum.utils.daemon import Daemon
>>> daemon = Daemon(print, ('hi',))
>>> success, msg = daemon.run()
>>> print(daemon.log_text)

2024-07-29 18:03 | hi 2024-07-29 18:03 |

>>> daemon.run(allow_dirty_run=True)
>>> print(daemon.log_text)

2024-07-29 18:03 | hi 2024-07-29 18:03 | 2024-07-29 18:05 | hi 2024-07-29 18:05 |

>>> mrsm.pprint(daemon.properties)
{
    'label': 'print',
    'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
    'result': None,
    'process': {'ended': '2024-07-29T18:03:33.752806'}
}
Daemon( target: Optional[Callable[[Any], Any]] = None, target_args: Union[List[Any], Tuple[Any], NoneType] = None, target_kw: Optional[Dict[str, Any]] = None, env: Optional[Dict[str, str]] = None, daemon_id: Optional[str] = None, label: Optional[str] = None, properties: Optional[Dict[str, Any]] = None)
    def __init__(
        self,
        target: Optional[Callable[[Any], Any]] = None,
        target_args: Union[List[Any], Tuple[Any], None] = None,
        target_kw: Optional[Dict[str, Any]] = None,
        env: Optional[Dict[str, str]] = None,
        daemon_id: Optional[str] = None,
        label: Optional[str] = None,
        properties: Optional[Dict[str, Any]] = None,
    ):
        """
        Parameters
        ----------
        target: Optional[Callable[[Any], Any]], default None,
            The function to execute in a child process.

        target_args: Union[List[Any], Tuple[Any], None], default None
            Positional arguments to pass to the target function.

        target_kw: Optional[Dict[str, Any]], default None
            Keyword arguments to pass to the target function.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the daemon process.

        daemon_id: Optional[str], default None
            Build a `Daemon` from an existing `daemon_id`.
            If `daemon_id` is provided, other arguments are ignored and are derived
            from the existing pickled `Daemon`.

        label: Optional[str], default None
            Label string to help identify a daemon.
            If `None`, use the function name instead.

        properties: Optional[Dict[str, Any]], default None
            Override reading from the properties JSON by providing an existing dictionary.

        Raises
        ------
        `Exception` if `daemon_id` is given without a target and the daemon
        has neither a pickle file nor a recoverable properties file.
        """
        ### NOTE(review): `_pickle` is read here but not used again in this method.
        _pickle = self.__dict__.get('_pickle', False)
        if daemon_id is not None:
            self.daemon_id = daemon_id
            ### No pickle file and no target: try to rebuild from the properties file.
            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):

                if not self.properties_path.exists():
                    raise Exception(
                        f"Daemon '{self.daemon_id}' does not exist. "
                        + "Pass a target to create a new Daemon."
                    )

                try:
                    new_daemon = self.from_properties_file(daemon_id)
                except Exception:
                    new_daemon = None

                if new_daemon is not None:
                    ### Recovery succeeded: write the pickle and adopt the recovered values.
                    new_daemon.write_pickle()
                    target = new_daemon.target
                    target_args = new_daemon.target_args
                    target_kw = new_daemon.target_kw
                    label = new_daemon.label
                    self._properties = new_daemon.properties
                else:
                    ### Recovery failed: best-effort removal of the unusable properties file.
                    try:
                        self.properties_path.unlink()
                    except Exception:
                        pass

                    raise Exception(
                        f"Could not recover daemon '{self.daemon_id}' "
                        + "from its properties file."
                    )

        if 'target' not in self.__dict__:
            if target is None:
                error("Cannot create a Daemon without a target.")
            self.target = target

        ### NOTE: We have to check self.__dict__ in case we are un-pickling.
        if '_target_args' not in self.__dict__:
            self._target_args = target_args
        if '_target_kw' not in self.__dict__:
            self._target_kw = target_kw

        if 'label' not in self.__dict__:
            if label is None:
                ### Fall back to `str(target)` for targets without a `__name__`.
                label = (
                    self.target.__name__ if '__name__' in self.target.__dir__()
                        else str(self.target)
                )
            self.label = label
        if 'daemon_id' not in self.__dict__:
            self.daemon_id = get_new_daemon_name()
        if '_properties' not in self.__dict__:
            self._properties = properties
        if self._properties is None:
            self._properties = {}

        self._properties.update({'label': self.label})
        if env:
            self._properties.update({'env': env})

        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
        _ = self.process
Parameters
  • target (Optional[Callable[[Any], Any]], default None,): The function to execute in a child process.
  • target_args (Union[List[Any], Tuple[Any], None], default None): Positional arguments to pass to the target function.
  • target_kw (Optional[Dict[str, Any]], default None): Keyword arguments to pass to the target function.
  • env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the daemon process.
  • daemon_id (Optional[str], default None): Build a Daemon from an existing daemon_id. If daemon_id is provided, other arguments are ignored and are derived from the existing pickled Daemon.
  • label (Optional[str], default None): Label string to help identify a daemon. If None, use the function name instead.
  • properties (Optional[Dict[str, Any]], default None): Override reading from the properties JSON by providing an existing dictionary.
@classmethod
def from_properties_file(cls, daemon_id: str) -> Daemon:
 93    @classmethod
 94    def from_properties_file(cls, daemon_id: str) -> Daemon:
 95        """
 96        Return a Daemon from a properties dictionary.
 97        """
 98        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
 99        if not properties_path.exists():
100            raise OSError(f"Properties file '{properties_path}' does not exist.")
101
102        try:
103            with open(properties_path, 'r', encoding='utf-8') as f:
104                properties = json.load(f)
105        except Exception:
106            properties = {}
107
108        if not properties:
109            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")
110
111        daemon_id = properties_path.parent.name
112        target_cf = properties.get('target', {})
113        target_module_name = target_cf.get('module', None)
114        target_function_name = target_cf.get('name', None)
115        target_args = target_cf.get('args', None)
116        target_kw = target_cf.get('kw', None)
117        label = properties.get('label', None)
118
119        if None in [
120            target_module_name,
121            target_function_name,
122            target_args,
123            target_kw,
124        ]:
125            raise ValueError("Missing target function information.")
126
127        target_module = importlib.import_module(target_module_name)
128        target_function = getattr(target_module, target_function_name)
129
130        return Daemon(
131            daemon_id=daemon_id,
132            target=target_function,
133            target_args=target_args,
134            target_kw=target_kw,
135            properties=properties,
136            label=label,
137        )

Return a Daemon built from the properties JSON file of an existing daemon_id.

def run( self, keep_daemon_output: bool = True, allow_dirty_run: bool = False, debug: bool = False) -> Tuple[bool, str]:
389    def run(
390        self,
391        keep_daemon_output: bool = True,
392        allow_dirty_run: bool = False,
393        debug: bool = False,
394    ) -> SuccessTuple:
395        """Run the daemon as a child process and continue executing the parent.
396
397        Parameters
398        ----------
399        keep_daemon_output: bool, default True
400            If `False`, delete the daemon's output directory upon exiting.
401
402        allow_dirty_run: bool, default False
403            If `True`, run the daemon, even if the `daemon_id` directory exists.
404            This option is dangerous because if the same `daemon_id` runs concurrently,
405            the last to finish will overwrite the output of the first.
406
407        Returns
408        -------
409        A SuccessTuple indicating success.
410
411        """
412        import platform
413        if platform.system() == 'Windows':
414            return False, "Cannot run background jobs on Windows."
415
416        ### The daemon might exist and be paused.
417        if self.status == 'paused':
418            return self.resume()
419
420        self._remove_stop_file()
421        if self.status == 'running':
422            return True, f"Daemon '{self}' is already running."
423
424        self.mkdir_if_not_exists(allow_dirty_run)
425        _write_pickle_success_tuple = self.write_pickle()
426        if not _write_pickle_success_tuple[0]:
427            return _write_pickle_success_tuple
428
429        _launch_daemon_code = (
430            "from meerschaum.utils.daemon import Daemon; "
431            + f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
432            + f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
433            + "allow_dirty_run=True)"
434        )
435        env = dict(os.environ)
436        env['MRSM_NOASK'] = 'true'
437        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
438        msg = (
439            "Success"
440            if _launch_success_bool
441            else f"Failed to start daemon '{self.daemon_id}'."
442        )
443        return _launch_success_bool, msg

Run the daemon as a child process and continue executing the parent.

Parameters
  • keep_daemon_output (bool, default True): If False, delete the daemon's output directory upon exiting.
  • allow_dirty_run (bool, default False): If True, run the daemon, even if the daemon_id directory exists. This option is dangerous because if the same daemon_id runs concurrently, the last to finish will overwrite the output of the first.
Returns
  • A SuccessTuple indicating success.
def kill(self, timeout: Union[int, float, NoneType] = 8) -> Tuple[bool, str]:
445    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
446        """
447        Forcibly terminate a running daemon.
448        Sends a SIGTERM signal to the process.
449
450        Parameters
451        ----------
452        timeout: Optional[int], default 3
453            How many seconds to wait for the process to terminate.
454
455        Returns
456        -------
457        A SuccessTuple indicating success.
458        """
459        if self.status != 'paused':
460            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
461            if success:
462                self._write_stop_file('kill')
463                return success, msg
464
465        if self.status == 'stopped':
466            self._write_stop_file('kill')
467            return True, "Process has already stopped."
468
469        process = self.process
470        try:
471            process.terminate()
472            process.kill()
473            process.wait(timeout=timeout)
474        except Exception as e:
475            return False, f"Failed to kill job {self} with exception: {e}"
476
477        if self.pid_path.exists():
478            try:
479                self.pid_path.unlink()
480            except Exception as e:
481                pass
482
483        self._write_stop_file('kill')
484        return True, "Success"

Forcibly terminate a running daemon. Sends a SIGTERM signal to the process.

Parameters
  • timeout (Union[int, float, None], default 8): How many seconds to wait for the process to terminate.
Returns
  • A SuccessTuple indicating success.
def quit(self, timeout: Union[int, float, NoneType] = None) -> Tuple[bool, str]:
486    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
487        """Gracefully quit a running daemon."""
488        if self.status == 'paused':
489            return self.kill(timeout)
490
491        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
492        if signal_success:
493            self._write_stop_file('quit')
494        return signal_success, signal_msg

Gracefully quit a running daemon.

def pause( self, timeout: Union[int, float, NoneType] = None, check_timeout_interval: Union[float, int, NoneType] = None) -> Tuple[bool, str]:
496    def pause(
497        self,
498        timeout: Union[int, float, None] = None,
499        check_timeout_interval: Union[float, int, None] = None,
500    ) -> SuccessTuple:
501        """
502        Pause the daemon if it is running.
503
504        Parameters
505        ----------
506        timeout: Union[float, int, None], default None
507            The maximum number of seconds to wait for a process to suspend.
508
509        check_timeout_interval: Union[float, int, None], default None
510            The number of seconds to wait between checking if the process is still running.
511
512        Returns
513        -------
514        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
515        """
516        if self.process is None:
517            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."
518
519        if self.status == 'paused':
520            return True, f"Daemon '{self.daemon_id}' is already paused."
521
522        self._write_stop_file('pause')
523        try:
524            self.process.suspend()
525        except Exception as e:
526            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"
527
528        timeout = self.get_timeout_seconds(timeout)
529        check_timeout_interval = self.get_check_timeout_interval_seconds(
530            check_timeout_interval
531        )
532
533        psutil = attempt_import('psutil')
534
535        if not timeout:
536            try:
537                success = self.process.status() == 'stopped'
538            except psutil.NoSuchProcess as e:
539                success = True
540            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
541            if success:
542                self._capture_process_timestamp('paused')
543            return success, msg
544
545        begin = time.perf_counter()
546        while (time.perf_counter() - begin) < timeout:
547            try:
548                if self.process.status() == 'stopped':
549                    self._capture_process_timestamp('paused')
550                    return True, "Success"
551            except psutil.NoSuchProcess as e:
552                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
553            time.sleep(check_timeout_interval)
554
555        return False, (
556            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
557            + ('s' if timeout != 1 else '') + '.'
558        )

Pause the daemon if it is running.

Parameters
  • timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to suspend.
  • check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still running.
Returns
  • A SuccessTuple indicating whether the Daemon process was successfully suspended.
def resume( self, timeout: Union[int, float, NoneType] = None, check_timeout_interval: Union[float, int, NoneType] = None) -> Tuple[bool, str]:
560    def resume(
561        self,
562        timeout: Union[int, float, None] = None,
563        check_timeout_interval: Union[float, int, None] = None,
564    ) -> SuccessTuple:
565        """
566        Resume the daemon if it is paused.
567
568        Parameters
569        ----------
570        timeout: Union[float, int, None], default None
571            The maximum number of seconds to wait for a process to resume.
572
573        check_timeout_interval: Union[float, int, None], default None
574            The number of seconds to wait between checking if the process is still stopped.
575
576        Returns
577        -------
578        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
579        """
580        if self.status == 'running':
581            return True, f"Daemon '{self.daemon_id}' is already running."
582
583        if self.status == 'stopped':
584            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."
585
586        self._remove_stop_file()
587        try:
588            self.process.resume()
589        except Exception as e:
590            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"
591
592        timeout = self.get_timeout_seconds(timeout)
593        check_timeout_interval = self.get_check_timeout_interval_seconds(
594            check_timeout_interval
595        )
596
597        if not timeout:
598            success = self.status == 'running'
599            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
600            if success:
601                self._capture_process_timestamp('began')
602            return success, msg
603
604        begin = time.perf_counter()
605        while (time.perf_counter() - begin) < timeout:
606            if self.status == 'running':
607                self._capture_process_timestamp('began')
608                return True, "Success"
609            time.sleep(check_timeout_interval)
610
611        return False, (
612            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
613            + ('s' if timeout != 1 else '') + '.'
614        )

Resume the daemon if it is paused.

Parameters
  • timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to resume.
  • check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still stopped.
Returns
  • A SuccessTuple indicating whether the Daemon process was successfully resumed.
def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
735    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
736        """Create the Daemon's directory.
737        If `allow_dirty_run` is `False` and the directory already exists,
738        raise a `FileExistsError`.
739        """
740        try:
741            self.path.mkdir(parents=True, exist_ok=True)
742            _already_exists = any(os.scandir(self.path))
743        except FileExistsError:
744            _already_exists = True
745
746        if _already_exists and not allow_dirty_run:
747            error(
748                f"Daemon '{self.daemon_id}' already exists. " +
749                f"To allow this daemon to run, do one of the following:\n"
750                + "  - Execute `daemon.cleanup()`.\n"
751                + f"  - Delete the directory '{self.path}'.\n"
752                + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
753                FileExistsError,
754            )

Create the Daemon's directory. If allow_dirty_run is False and the directory already exists, raise a FileExistsError.

process: "Union['psutil.Process', None]"
756    @property
757    def process(self) -> Union['psutil.Process', None]:
758        """
759        Return the psutil process for the Daemon.
760        """
761        psutil = attempt_import('psutil')
762        pid = self.pid
763        if pid is None:
764            return None
765        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
766            try:
767                self._process = psutil.Process(int(pid))
768            except Exception as e:
769                if self.pid_path.exists():
770                    self.pid_path.unlink()
771                return None
772        return self._process

Return the psutil process for the Daemon.

status: str
774    @property
775    def status(self) -> str:
776        """
777        Return the running status of this Daemon.
778        """
779        if self.process is None:
780            return 'stopped'
781
782        psutil = attempt_import('psutil')
783        try:
784            if self.process.status() == 'stopped':
785                return 'paused'
786            if self.process.status() == 'zombie':
787                raise psutil.NoSuchProcess(self.process.pid)
788        except (psutil.NoSuchProcess, AttributeError):
789            if self.pid_path.exists():
790                try:
791                    self.pid_path.unlink()
792                except Exception as e:
793                    pass
794            return 'stopped'
795
796        return 'running'

Return the running status of this Daemon.

path: pathlib.Path
805    @property
806    def path(self) -> pathlib.Path:
807        """
808        Return the path for this Daemon's directory.
809        """
810        return self._get_path_from_daemon_id(self.daemon_id)

Return the path for this Daemon's directory.

properties_path: pathlib.Path
819    @property
820    def properties_path(self) -> pathlib.Path:
821        """
822        Return the `propterties.json` path for this Daemon.
823        """
824        return self._get_properties_path_from_daemon_id(self.daemon_id)

Return the properties.json path for this Daemon.

stop_path: pathlib.Path
826    @property
827    def stop_path(self) -> pathlib.Path:
828        """
829        Return the path for the stop file (created when manually stopped).
830        """
831        return self.path / '.stop.json'

Return the path for the stop file (created when manually stopped).

log_path: pathlib.Path
833    @property
834    def log_path(self) -> pathlib.Path:
835        """
836        Return the log path.
837        """
838        return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')

Return the log path.

stdin_file_path: pathlib.Path
840    @property
841    def stdin_file_path(self) -> pathlib.Path:
842        """
843        Return the stdin file path.
844        """
845        return self.path / 'input.stdin'

Return the stdin file path.

blocking_stdin_file_path: pathlib.Path
847    @property
848    def blocking_stdin_file_path(self) -> pathlib.Path:
849        """
850        Return the stdin file path.
851        """
852        if '_blocking_stdin_file_path' in self.__dict__:
853            return self._blocking_stdin_file_path
854
855        return self.path / 'input.stdin.block'

Return the blocking stdin file path (input.stdin.block), or the instance's override if one has been set.

log_offset_path: pathlib.Path
857    @property
858    def log_offset_path(self) -> pathlib.Path:
859        """
860        Return the log offset file path.
861        """
862        return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')

Return the log offset file path.

864    @property
865    def rotating_log(self) -> RotatingFile:
866        """
867        The rotating log file for the daemon's output.
868        """
869        if '_rotating_log' in self.__dict__:
870            return self._rotating_log
871
872        write_timestamps = (
873            self.properties.get('logs', {}).get('write_timestamps', None)
874        )
875        if write_timestamps is None:
876            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
877
878        self._rotating_log = RotatingFile(
879            self.log_path,
880            redirect_streams=True,
881            write_timestamps=write_timestamps,
882            timestamp_format=get_config('jobs', 'logs', 'timestamps', 'format'),
883        )
884        return self._rotating_log

The rotating log file for the daemon's output.

stdin_file
886    @property
887    def stdin_file(self):
888        """
889        Return the file handler for the stdin file.
890        """
891        if '_stdin_file' in self.__dict__:
892            return self._stdin_file
893
894        self._stdin_file = StdinFile(
895            self.stdin_file_path,
896            lock_file_path=self.blocking_stdin_file_path,
897        )
898        return self._stdin_file

Return the file handler for the stdin file.

log_text: Optional[str]
900    @property
901    def log_text(self) -> Optional[str]:
902        """
903        Read the log files and return their contents.
904        Returns `None` if the log file does not exist.
905        """
906        new_rotating_log = RotatingFile(
907            self.rotating_log.file_path,
908            num_files_to_keep = self.rotating_log.num_files_to_keep,
909            max_file_size = self.rotating_log.max_file_size,
910            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled'),
911            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format'),
912        )
913        return new_rotating_log.read()

Read the log files and return their contents. Returns None if the log file does not exist.

def readlines(self) -> List[str]:
915    def readlines(self) -> List[str]:
916        """
917        Read the next log lines, persisting the cursor for later use.
918        Note this will alter the cursor of `self.rotating_log`.
919        """
920        self.rotating_log._cursor = self._read_log_offset()
921        lines = self.rotating_log.readlines()
922        self._write_log_offset()
923        return lines

Read the next log lines, persisting the cursor for later use. Note this will alter the cursor of self.rotating_log.

pid: Optional[int]
951    @property
952    def pid(self) -> Union[int, None]:
953        """
954        Read the PID file and return its contents.
955        Returns `None` if the PID file does not exist.
956        """
957        if not self.pid_path.exists():
958            return None
959        try:
960            with open(self.pid_path, 'r', encoding='utf-8') as f:
961                text = f.read()
962            if len(text) == 0:
963                return None
964            pid = int(text.rstrip())
965        except Exception as e:
966            warn(e)
967            text = None
968            pid = None
969        return pid

Read the PID file and return its contents. Returns None if the PID file does not exist.

pid_path: pathlib.Path
971    @property
972    def pid_path(self) -> pathlib.Path:
973        """
974        Return the path to a file containing the PID for this Daemon.
975        """
976        return self.path / 'process.pid'

Return the path to a file containing the PID for this Daemon.

pid_lock: "'fasteners.InterProcessLock'"
978    @property
979    def pid_lock(self) -> 'fasteners.InterProcessLock':
980        """
981        Return the process lock context manager.
982        """
983        if '_pid_lock' in self.__dict__:
984            return self._pid_lock
985
986        fasteners = attempt_import('fasteners')
987        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
988        return self._pid_lock

Return the process lock context manager.

pickle_path: pathlib.Path
990    @property
991    def pickle_path(self) -> pathlib.Path:
992        """
993        Return the path for the pickle file.
994        """
995        return self.path / 'pickle.pkl'

Return the path for the pickle file.

def read_properties(self) -> Optional[Dict[str, Any]]:
 997    def read_properties(self) -> Optional[Dict[str, Any]]:
 998        """Read the properties JSON file and return the dictionary."""
 999        if not self.properties_path.exists():
1000            return None
1001        try:
1002            with open(self.properties_path, 'r', encoding='utf-8') as file:
1003                properties = json.load(file)
1004        except Exception as e:
1005            properties = {}
1006        
1007        return properties

Read the properties JSON file and return the dictionary.

def read_pickle(self) -> Daemon:
1009    def read_pickle(self) -> Daemon:
1010        """Read a Daemon's pickle file and return the `Daemon`."""
1011        import pickle, traceback
1012        if not self.pickle_path.exists():
1013            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")
1014
1015        if self.pickle_path.stat().st_size == 0:
1016            error(f"Pickle was empty for daemon '{self.daemon_id}'.")
1017
1018        try:
1019            with open(self.pickle_path, 'rb') as pickle_file:
1020                daemon = pickle.load(pickle_file)
1021            success, msg = True, 'Success'
1022        except Exception as e:
1023            success, msg = False, str(e)
1024            daemon = None
1025            traceback.print_exception(type(e), e, e.__traceback__)
1026        if not success:
1027            error(msg)
1028        return daemon

Read a Daemon's pickle file and return the Daemon.

properties: Dict[str, Any]
1030    @property
1031    def properties(self) -> Dict[str, Any]:
1032        """
1033        Return the contents of the properties JSON file.
1034        """
1035        try:
1036            _file_properties = self.read_properties()
1037        except Exception:
1038            traceback.print_exc()
1039            _file_properties = {}
1040
1041        if not self._properties:
1042            self._properties = _file_properties
1043
1044        if self._properties is None:
1045            self._properties = {}
1046
1047        if _file_properties is not None:
1048            self._properties = apply_patch_to_config(
1049                _file_properties,
1050                self._properties,
1051            )
1052
1053        return self._properties

Return the contents of the properties JSON file.

hidden: bool
1055    @property
1056    def hidden(self) -> bool:
1057        """
1058        Return a bool indicating whether this Daemon should be displayed.
1059        """
1060        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')

Return a bool indicating whether this Daemon should be displayed.

def write_properties(self) -> Tuple[bool, str]:
1062    def write_properties(self) -> SuccessTuple:
1063        """Write the properties dictionary to the properties JSON file
1064        (only if self.properties exists).
1065        """
1066        success, msg = (
1067            False,
1068            f"No properties to write for daemon '{self.daemon_id}'."
1069        )
1070        if self.properties is not None:
1071            try:
1072                self.path.mkdir(parents=True, exist_ok=True)
1073                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
1074                    json.dump(self.properties, properties_file)
1075                success, msg = True, 'Success'
1076            except Exception as e:
1077                success, msg = False, str(e)
1078        return success, msg

Write the properties dictionary to the properties JSON file (only if self.properties exists).

def write_pickle(self) -> Tuple[bool, str]:
1080    def write_pickle(self) -> SuccessTuple:
1081        """Write the pickle file for the daemon."""
1082        import pickle, traceback
1083        try:
1084            self.path.mkdir(parents=True, exist_ok=True)
1085            with open(self.pickle_path, 'wb+') as pickle_file:
1086                pickle.dump(self, pickle_file)
1087            success, msg = True, "Success"
1088        except Exception as e:
1089            success, msg = False, str(e)
1090            traceback.print_exception(type(e), e, e.__traceback__)
1091        return success, msg

Write the pickle file for the daemon.

def cleanup(self, keep_logs: bool = False) -> Tuple[bool, str]:
1121    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
1122        """
1123        Remove a daemon's directory after execution.
1124
1125        Parameters
1126        ----------
1127        keep_logs: bool, default False
1128            If `True`, skip deleting the daemon's log files.
1129
1130        Returns
1131        -------
1132        A `SuccessTuple` indicating success.
1133        """
1134        if self.path.exists():
1135            try:
1136                shutil.rmtree(self.path)
1137            except Exception as e:
1138                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
1139                warn(msg)
1140                return False, msg
1141        if not keep_logs:
1142            self.rotating_log.delete()
1143            try:
1144                if self.log_offset_path.exists():
1145                    self.log_offset_path.unlink()
1146            except Exception as e:
1147                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
1148                warn(msg)
1149                return False, msg
1150        return True, "Success"

Remove a daemon's directory after execution.

Parameters
  • keep_logs (bool, default False): If True, skip deleting the daemon's log files.
Returns
  • A SuccessTuple indicating success.
def get_timeout_seconds(self, timeout: Union[int, float, NoneType] = None) -> Union[int, float]:
1153    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
1154        """
1155        Return the timeout value to use. Use `--timeout-seconds` if provided,
1156        else the configured default (8).
1157        """
1158        if isinstance(timeout, (int, float)):
1159            return timeout
1160        return get_config('jobs', 'timeout_seconds')

Return the timeout value to use. Use --timeout-seconds if provided, else the configured default (8).

def get_check_timeout_interval_seconds( self, check_timeout_interval: Union[int, float, NoneType] = None) -> Union[int, float]:
1163    def get_check_timeout_interval_seconds(
1164        self,
1165        check_timeout_interval: Union[int, float, None] = None,
1166    ) -> Union[int, float]:
1167        """
1168        Return the interval value to check the status of timeouts.
1169        """
1170        if isinstance(check_timeout_interval, (int, float)):
1171            return check_timeout_interval
1172        return get_config('jobs', 'check_timeout_interval_seconds')

Return the interval value to check the status of timeouts.

target_args: Optional[Tuple[Any]]
1174    @property
1175    def target_args(self) -> Union[Tuple[Any], None]:
1176        """
1177        Return the positional arguments to pass to the target function.
1178        """
1179        target_args = (
1180            self.__dict__.get('_target_args', None)
1181            or self.properties.get('target', {}).get('args', None)
1182        )
1183        if target_args is None:
1184            return tuple([])
1185
1186        return tuple(target_args)

Return the positional arguments to pass to the target function.

target_kw: Optional[Dict[str, Any]]
1188    @property
1189    def target_kw(self) -> Union[Dict[str, Any], None]:
1190        """
1191        Return the keyword arguments to pass to the target function.
1192        """
1193        target_kw = (
1194            self.__dict__.get('_target_kw', None)
1195            or self.properties.get('target', {}).get('kw', None)
1196        )
1197        if target_kw is None:
1198            return {}
1199
1200        return {key: val for key, val in target_kw.items()}

Return the keyword arguments to pass to the target function.