meerschaum.utils.daemon.Daemon

Manage running daemons via the Daemon class.

   1#! /usr/bin/env python3
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4
   5"""
   6Manage running daemons via the Daemon class.
   7"""
   8
   9from __future__ import annotations
  10import os
  11import importlib
  12import pathlib
  13import json
  14import shutil
  15import signal
  16import time
  17import traceback
  18from functools import partial
  19from datetime import datetime, timezone
  20
  21import meerschaum as mrsm
  22from meerschaum.utils.typing import (
  23    Optional, Dict, Any, SuccessTuple, Callable, List, Union,
  24    is_success_tuple, Tuple,
  25)
  26from meerschaum.config import get_config
  27from meerschaum._internal.static import STATIC_CONFIG
  28from meerschaum.config._patch import apply_patch_to_config
  29from meerschaum.utils.warnings import warn, error
  30from meerschaum.utils.packages import attempt_import
  31from meerschaum.utils.venv import venv_exec
  32from meerschaum.utils.daemon._names import get_new_daemon_name
  33from meerschaum.utils.daemon.RotatingFile import RotatingFile
  34from meerschaum.utils.daemon.StdinFile import StdinFile
  35from meerschaum.utils.threading import RepeatTimer
  36from meerschaum.__main__ import _close_pools
  37
  38_daemons = []
  39_results = {}
  40
  41class Daemon:
  42    """
  43    Daemonize Python functions into background processes.
  44
  45    Examples
  46    --------
  47    >>> import meerschaum as mrsm
  48    >>> from meerschaum.utils.daemons import Daemon
  49    >>> daemon = Daemon(print, ('hi',))
  50    >>> success, msg = daemon.run()
  51    >>> print(daemon.log_text)
  52
  53    2024-07-29 18:03 | hi
  54    2024-07-29 18:03 |
  55    >>> daemon.run(allow_dirty_run=True)
  56    >>> print(daemon.log_text)
  57
  58    2024-07-29 18:03 | hi
  59    2024-07-29 18:03 |
  60    2024-07-29 18:05 | hi
  61    2024-07-29 18:05 |
  62    >>> mrsm.pprint(daemon.properties)
  63    {
  64        'label': 'print',
  65        'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
  66        'result': None,
  67        'process': {'ended': '2024-07-29T18:03:33.752806'}
  68    }
  69
  70    """
  71
  72    def __new__(
  73        cls,
  74        *args,
  75        daemon_id: Optional[str] = None,
  76        **kw
  77    ):
  78        """
  79        If a daemon_id is provided and already exists, read from its pickle file.
  80        """
  81        instance = super(Daemon, cls).__new__(cls)
  82        if daemon_id is not None:
  83            instance.daemon_id = daemon_id
  84            if instance.pickle_path.exists():
  85                instance = instance.read_pickle()
  86        return instance
  87
  88    @classmethod
  89    def from_properties_file(cls, daemon_id: str) -> Daemon:
  90        """
  91        Return a Daemon from a properties dictionary.
  92        """
  93        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
  94        if not properties_path.exists():
  95            raise OSError(f"Properties file '{properties_path}' does not exist.")
  96
  97        try:
  98            with open(properties_path, 'r', encoding='utf-8') as f:
  99                properties = json.load(f)
 100        except Exception:
 101            properties = {}
 102
 103        if not properties:
 104            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")
 105
 106        daemon_id = properties_path.parent.name
 107        target_cf = properties.get('target', {})
 108        target_module_name = target_cf.get('module', None)
 109        target_function_name = target_cf.get('name', None)
 110        target_args = target_cf.get('args', None)
 111        target_kw = target_cf.get('kw', None)
 112        label = properties.get('label', None)
 113
 114        if None in [
 115            target_module_name,
 116            target_function_name,
 117            target_args,
 118            target_kw,
 119        ]:
 120            raise ValueError("Missing target function information.")
 121
 122        target_module = importlib.import_module(target_module_name)
 123        target_function = getattr(target_module, target_function_name)
 124
 125        return Daemon(
 126            daemon_id=daemon_id,
 127            target=target_function,
 128            target_args=target_args,
 129            target_kw=target_kw,
 130            properties=properties,
 131            label=label,
 132        )
 133
 134
 135    def __init__(
 136        self,
 137        target: Optional[Callable[[Any], Any]] = None,
 138        target_args: Union[List[Any], Tuple[Any], None] = None,
 139        target_kw: Optional[Dict[str, Any]] = None,
 140        env: Optional[Dict[str, str]] = None,
 141        daemon_id: Optional[str] = None,
 142        label: Optional[str] = None,
 143        properties: Optional[Dict[str, Any]] = None,
 144        pickle: bool = True,
 145    ):
 146        """
 147        Parameters
 148        ----------
 149        target: Optional[Callable[[Any], Any]], default None,
 150            The function to execute in a child process.
 151
 152        target_args: Union[List[Any], Tuple[Any], None], default None
 153            Positional arguments to pass to the target function.
 154
 155        target_kw: Optional[Dict[str, Any]], default None
 156            Keyword arguments to pass to the target function.
 157
 158        env: Optional[Dict[str, str]], default None
 159            If provided, set these environment variables in the daemon process.
 160
 161        daemon_id: Optional[str], default None
 162            Build a `Daemon` from an existing `daemon_id`.
 163            If `daemon_id` is provided, other arguments are ignored and are derived
 164            from the existing pickled `Daemon`.
 165
 166        label: Optional[str], default None
 167            Label string to help identifiy a daemon.
 168            If `None`, use the function name instead.
 169
 170        properties: Optional[Dict[str, Any]], default None
 171            Override reading from the properties JSON by providing an existing dictionary.
 172        """
 173        _pickle = self.__dict__.get('_pickle', False)
 174        if daemon_id is not None:
 175            self.daemon_id = daemon_id
 176            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):
 177
 178                if not self.properties_path.exists():
 179                    raise Exception(
 180                        f"Daemon '{self.daemon_id}' does not exist. "
 181                        + "Pass a target to create a new Daemon."
 182                    )
 183
 184                try:
 185                    new_daemon = self.from_properties_file(daemon_id)
 186                except Exception:
 187                    new_daemon = None
 188
 189                if new_daemon is not None:
 190                    new_daemon.write_pickle()
 191                    target = new_daemon.target
 192                    target_args = new_daemon.target_args
 193                    target_kw = new_daemon.target_kw
 194                    label = new_daemon.label
 195                    self._properties = new_daemon.properties
 196                else:
 197                    try:
 198                        self.properties_path.unlink()
 199                    except Exception:
 200                        pass
 201
 202                    raise Exception(
 203                        f"Could not recover daemon '{self.daemon_id}' "
 204                        + "from its properties file."
 205                    )
 206
 207        if 'target' not in self.__dict__:
 208            if target is None:
 209                error("Cannot create a Daemon without a target.")
 210            self.target = target
 211
 212        self.pickle = pickle
 213
 214        ### NOTE: We have to check self.__dict__ in case we un-pickling.
 215        if '_target_args' not in self.__dict__:
 216            self._target_args = target_args
 217        if '_target_kw' not in self.__dict__:
 218            self._target_kw = target_kw
 219
 220        if 'label' not in self.__dict__:
 221            if label is None:
 222                label = (
 223                    self.target.__name__ if '__name__' in self.target.__dir__()
 224                        else str(self.target)
 225                )
 226            self.label = label
 227        elif label is not None:
 228            self.label = label
 229
 230        if 'daemon_id' not in self.__dict__:
 231            self.daemon_id = get_new_daemon_name()
 232        if '_properties' not in self.__dict__:
 233            self._properties = properties
 234        elif properties:
 235            if self._properties is None:
 236                self._properties = {}
 237            self._properties.update(properties)
 238        if self._properties is None:
 239            self._properties = {}
 240
 241        self._properties.update({'label': self.label})
 242        if env:
 243            self._properties.update({'env': env})
 244
 245        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
 246        _ = self.process
 247
 248
 249    def _run_exit(
 250        self,
 251        keep_daemon_output: bool = True,
 252        allow_dirty_run: bool = False,
 253    ) -> Any:
 254        """Run the daemon's target function.
 255        NOTE: This WILL EXIT the parent process!
 256
 257        Parameters
 258        ----------
 259        keep_daemon_output: bool, default True
 260            If `False`, delete the daemon's output directory upon exiting.
 261
 262        allow_dirty_run, bool, default False:
 263            If `True`, run the daemon, even if the `daemon_id` directory exists.
 264            This option is dangerous because if the same `daemon_id` runs twice,
 265            the last to finish will overwrite the output of the first.
 266
 267        Returns
 268        -------
 269        Nothing — this will exit the parent process.
 270        """
 271        import platform
 272        import sys
 273        import os
 274        import traceback
 275        from meerschaum.utils.warnings import warn
 276        from meerschaum.config import get_config
 277        daemon = attempt_import('daemon')
 278        lines = get_config('jobs', 'terminal', 'lines')
 279        columns = get_config('jobs', 'terminal', 'columns')
 280
 281        if platform.system() == 'Windows':
 282            return False, "Windows is no longer supported."
 283
 284        self._setup(allow_dirty_run)
 285
 286        _daemons.append(self)
 287
 288        logs_cf = self.properties.get('logs', {})
 289        log_refresh_seconds = logs_cf.get('refresh_files_seconds', None)
 290        if log_refresh_seconds is None:
 291            log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds')
 292        write_timestamps = logs_cf.get('write_timestamps', None)
 293        if write_timestamps is None:
 294            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
 295
 296        self._log_refresh_timer = RepeatTimer(
 297            log_refresh_seconds,
 298            partial(self.rotating_log.refresh_files, start_interception=write_timestamps),
 299        )
 300
 301        capture_stdin = logs_cf.get('stdin', True)
 302        cwd = self.properties.get('cwd', os.getcwd())
 303
 304        ### NOTE: The SIGINT handler has been removed so that child processes may handle
 305        ###       KeyboardInterrupts themselves.
 306        ###       The previous aggressive approach was redundant because of the SIGTERM handler.
 307        self._daemon_context = daemon.DaemonContext(
 308            pidfile=self.pid_lock,
 309            stdout=self.rotating_log,
 310            stderr=self.rotating_log,
 311            stdin=(self.stdin_file if capture_stdin else None),
 312            working_directory=cwd,
 313            detach_process=True,
 314            files_preserve=list(self.rotating_log.subfile_objects.values()),
 315            signal_map={
 316                signal.SIGTERM: self._handle_sigterm,
 317            },
 318        )
 319
 320        if capture_stdin and sys.stdin is None:
 321            raise OSError("Cannot daemonize without stdin.")
 322
 323        try:
 324            os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns))
 325            with self._daemon_context:
 326                if capture_stdin:
 327                    sys.stdin = self.stdin_file
 328                _ = os.environ.pop(STATIC_CONFIG['environment']['systemd_stdin_path'], None)
 329                os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id
 330                os.environ['PYTHONUNBUFFERED'] = '1'
 331
 332                ### Allow the user to override environment variables.
 333                env = self.properties.get('env', {})
 334                if env and isinstance(env, dict):
 335                    os.environ.update({str(k): str(v) for k, v in env.items()})
 336
 337                self.rotating_log.refresh_files(start_interception=True)
 338                result = None
 339                try:
 340                    with open(self.pid_path, 'w+', encoding='utf-8') as f:
 341                        f.write(str(os.getpid()))
 342
 343                    ### NOTE: The timer fails to start for remote actions to localhost.
 344                    try:
 345                        if not self._log_refresh_timer.is_running():
 346                            self._log_refresh_timer.start()
 347                    except Exception:
 348                        pass
 349
 350                    self.properties['result'] = None
 351                    self._capture_process_timestamp('began')
 352                    result = self.target(*self.target_args, **self.target_kw)
 353                    self.properties['result'] = result
 354                except (BrokenPipeError, KeyboardInterrupt, SystemExit):
 355                    result = False, traceback.format_exc()
 356                except Exception as e:
 357                    warn(
 358                        f"Exception in daemon target function: {traceback.format_exc()}",
 359                    )
 360                    result = False, str(e)
 361                finally:
 362                    _results[self.daemon_id] = result
 363                    self.properties['result'] = result
 364
 365                    if keep_daemon_output:
 366                        self._capture_process_timestamp('ended')
 367                    else:
 368                        self.cleanup()
 369
 370                    self._log_refresh_timer.cancel()
 371                    try:
 372                        if self.pid is None and self.pid_path.exists():
 373                            self.pid_path.unlink()
 374                    except Exception:
 375                        pass
 376
 377                    if is_success_tuple(result):
 378                        try:
 379                            mrsm.pprint(result)
 380                        except BrokenPipeError:
 381                            pass
 382
 383        except Exception:
 384            daemon_error = traceback.format_exc()
 385            from meerschaum.config.paths import DAEMON_ERROR_LOG_PATH
 386            with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
 387                f.write(
 388                    f"Error in Daemon '{self}':\n\n"
 389                    f"{sys.stdin=}\n"
 390                    f"{self.stdin_file_path=}\n"
 391                    f"{self.stdin_file_path.exists()=}\n\n"
 392                    f"{daemon_error}\n\n"
 393                )
 394            warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}")
 395
 396    def _capture_process_timestamp(
 397        self,
 398        process_key: str,
 399        write_properties: bool = True,
 400    ) -> None:
 401        """
 402        Record the current timestamp to the parameters `process:<process_key>`.
 403
 404        Parameters
 405        ----------
 406        process_key: str
 407            Under which key to store the timestamp.
 408
 409        write_properties: bool, default True
 410            If `True` persist the properties to disk immediately after capturing the timestamp.
 411        """
 412        if 'process' not in self.properties:
 413            self.properties['process'] = {}
 414
 415        if process_key not in ('began', 'ended', 'paused', 'stopped'):
 416            raise ValueError(f"Invalid key '{process_key}'.")
 417
 418        self.properties['process'][process_key] = (
 419            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
 420        )
 421        if write_properties:
 422            self.write_properties()
 423
 424    def run(
 425        self,
 426        keep_daemon_output: bool = True,
 427        allow_dirty_run: bool = False,
 428        wait: bool = False,
 429        timeout: Union[int, float] = 4,
 430        debug: bool = False,
 431    ) -> SuccessTuple:
 432        """Run the daemon as a child process and continue executing the parent.
 433
 434        Parameters
 435        ----------
 436        keep_daemon_output: bool, default True
 437            If `False`, delete the daemon's output directory upon exiting.
 438
 439        allow_dirty_run: bool, default False
 440            If `True`, run the daemon, even if the `daemon_id` directory exists.
 441            This option is dangerous because if the same `daemon_id` runs concurrently,
 442            the last to finish will overwrite the output of the first.
 443
 444        wait: bool, default True
 445            If `True`, block until `Daemon.status` is running (or the timeout expires).
 446
 447        timeout: Union[int, float], default 4
 448            If `wait` is `True`, block for up to `timeout` seconds before returning a failure.
 449
 450        Returns
 451        -------
 452        A SuccessTuple indicating success.
 453
 454        """
 455        import platform
 456        if platform.system() == 'Windows':
 457            return False, "Cannot run background jobs on Windows."
 458
 459        ### The daemon might exist and be paused.
 460        if self.status == 'paused':
 461            return self.resume()
 462
 463        self._remove_stop_file()
 464        if self.status == 'running':
 465            return True, f"Daemon '{self}' is already running."
 466
 467        self.mkdir_if_not_exists(allow_dirty_run)
 468        _write_pickle_success_tuple = self.write_pickle()
 469        if not _write_pickle_success_tuple[0]:
 470            return _write_pickle_success_tuple
 471
 472        _launch_daemon_code = (
 473            "from meerschaum.utils.daemon import Daemon, _daemons; "
 474            f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
 475            f"_daemons['{self.daemon_id}'] = daemon; "
 476            f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
 477            "allow_dirty_run=True)"
 478        )
 479        env = dict(os.environ)
 480        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
 481        msg = (
 482            "Success"
 483            if _launch_success_bool
 484            else f"Failed to start daemon '{self.daemon_id}'."
 485        )
 486        if not wait or not _launch_success_bool:
 487            return _launch_success_bool, msg
 488
 489        timeout = self.get_timeout_seconds(timeout)
 490        check_timeout_interval = self.get_check_timeout_interval_seconds()
 491
 492        if not timeout:
 493            success = self.status == 'running'
 494            msg = "Success" if success else f"Failed to run daemon '{self.daemon_id}'."
 495            if success:
 496                self._capture_process_timestamp('began')
 497            return success, msg
 498
 499        begin = time.perf_counter()
 500        while (time.perf_counter() - begin) < timeout:
 501            if self.status == 'running':
 502                self._capture_process_timestamp('began')
 503                return True, "Success"
 504            time.sleep(check_timeout_interval)
 505
 506        return False, (
 507            f"Failed to start daemon '{self.daemon_id}' within {timeout} second"
 508            + ('s' if timeout != 1 else '') + '.'
 509        )
 510
 511
 512    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
 513        """
 514        Forcibly terminate a running daemon.
 515        Sends a SIGTERM signal to the process.
 516
 517        Parameters
 518        ----------
 519        timeout: Optional[int], default 3
 520            How many seconds to wait for the process to terminate.
 521
 522        Returns
 523        -------
 524        A SuccessTuple indicating success.
 525        """
 526        if self.status != 'paused':
 527            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
 528            if success:
 529                self._write_stop_file('kill')
 530                self.stdin_file.close()
 531                self._remove_blocking_stdin_file()
 532                return success, msg
 533
 534        if self.status == 'stopped':
 535            self._write_stop_file('kill')
 536            self.stdin_file.close()
 537            self._remove_blocking_stdin_file()
 538            return True, "Process has already stopped."
 539
 540        psutil = attempt_import('psutil')
 541        process = self.process
 542        try:
 543            process.terminate()
 544            process.kill()
 545            process.wait(timeout=timeout)
 546        except Exception as e:
 547            return False, f"Failed to kill job {self} ({process}) with exception: {e}"
 548
 549        try:
 550            if process.status():
 551                return False, "Failed to stop daemon '{self}' ({process})."
 552        except psutil.NoSuchProcess:
 553            pass
 554
 555        if self.pid_path.exists():
 556            try:
 557                self.pid_path.unlink()
 558            except Exception:
 559                pass
 560
 561        self._write_stop_file('kill')
 562        self.stdin_file.close()
 563        self._remove_blocking_stdin_file()
 564        return True, "Success"
 565
 566    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
 567        """Gracefully quit a running daemon."""
 568        if self.status == 'paused':
 569            return self.kill(timeout)
 570
 571        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
 572        if signal_success:
 573            self._write_stop_file('quit')
 574            self.stdin_file.close()
 575            self._remove_blocking_stdin_file()
 576        return signal_success, signal_msg
 577
 578    def pause(
 579        self,
 580        timeout: Union[int, float, None] = None,
 581        check_timeout_interval: Union[float, int, None] = None,
 582    ) -> SuccessTuple:
 583        """
 584        Pause the daemon if it is running.
 585
 586        Parameters
 587        ----------
 588        timeout: Union[float, int, None], default None
 589            The maximum number of seconds to wait for a process to suspend.
 590
 591        check_timeout_interval: Union[float, int, None], default None
 592            The number of seconds to wait between checking if the process is still running.
 593
 594        Returns
 595        -------
 596        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
 597        """
 598        self._remove_blocking_stdin_file()
 599
 600        if self.process is None:
 601            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."
 602
 603        if self.status == 'paused':
 604            return True, f"Daemon '{self.daemon_id}' is already paused."
 605
 606        self._write_stop_file('pause')
 607        self.stdin_file.close()
 608        self._remove_blocking_stdin_file()
 609        try:
 610            self.process.suspend()
 611        except Exception as e:
 612            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"
 613
 614        timeout = self.get_timeout_seconds(timeout)
 615        check_timeout_interval = self.get_check_timeout_interval_seconds(
 616            check_timeout_interval
 617        )
 618
 619        psutil = attempt_import('psutil')
 620
 621        if not timeout:
 622            try:
 623                success = self.process.status() == 'stopped'
 624            except psutil.NoSuchProcess:
 625                success = True
 626            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
 627            if success:
 628                self._capture_process_timestamp('paused')
 629            return success, msg
 630
 631        begin = time.perf_counter()
 632        while (time.perf_counter() - begin) < timeout:
 633            try:
 634                if self.process.status() == 'stopped':
 635                    self._capture_process_timestamp('paused')
 636                    return True, "Success"
 637            except psutil.NoSuchProcess as e:
 638                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
 639            time.sleep(check_timeout_interval)
 640
 641        return False, (
 642            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
 643            + ('s' if timeout != 1 else '') + '.'
 644        )
 645
 646    def resume(
 647        self,
 648        timeout: Union[int, float, None] = None,
 649        check_timeout_interval: Union[float, int, None] = None,
 650    ) -> SuccessTuple:
 651        """
 652        Resume the daemon if it is paused.
 653
 654        Parameters
 655        ----------
 656        timeout: Union[float, int, None], default None
 657            The maximum number of seconds to wait for a process to resume.
 658
 659        check_timeout_interval: Union[float, int, None], default None
 660            The number of seconds to wait between checking if the process is still stopped.
 661
 662        Returns
 663        -------
 664        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
 665        """
 666        if self.status == 'running':
 667            return True, f"Daemon '{self.daemon_id}' is already running."
 668
 669        if self.status == 'stopped':
 670            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."
 671
 672        self._remove_stop_file()
 673        try:
 674            if self.process is None:
 675                return False, f"Cannot resume daemon '{self.daemon_id}'."
 676
 677            self.process.resume()
 678        except Exception as e:
 679            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"
 680
 681        timeout = self.get_timeout_seconds(timeout)
 682        check_timeout_interval = self.get_check_timeout_interval_seconds(
 683            check_timeout_interval
 684        )
 685
 686        if not timeout:
 687            success = self.status == 'running'
 688            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
 689            if success:
 690                self._capture_process_timestamp('began')
 691            return success, msg
 692
 693        begin = time.perf_counter()
 694        while (time.perf_counter() - begin) < timeout:
 695            if self.status == 'running':
 696                self._capture_process_timestamp('began')
 697                return True, "Success"
 698            time.sleep(check_timeout_interval)
 699
 700        return False, (
 701            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
 702            + ('s' if timeout != 1 else '') + '.'
 703        )
 704
 705    def _write_stop_file(self, action: str) -> SuccessTuple:
 706        """Write the stop file timestamp and action."""
 707        if action not in ('quit', 'kill', 'pause'):
 708            return False, f"Unsupported action '{action}'."
 709
 710        if not self.stop_path.parent.exists():
 711            self.stop_path.parent.mkdir(parents=True, exist_ok=True)
 712
 713        with open(self.stop_path, 'w+', encoding='utf-8') as f:
 714            json.dump(
 715                {
 716                    'stop_time': datetime.now(timezone.utc).isoformat(),
 717                    'action': action,
 718                },
 719                f
 720            )
 721
 722        return True, "Success"
 723
 724    def _remove_stop_file(self) -> SuccessTuple:
 725        """Remove the stop file"""
 726        if not self.stop_path.exists():
 727            return True, "Stop file does not exist."
 728
 729        try:
 730            self.stop_path.unlink()
 731        except Exception as e:
 732            return False, f"Failed to remove stop file:\n{e}"
 733
 734        return True, "Success"
 735
 736    def _read_stop_file(self) -> Dict[str, Any]:
 737        """
 738        Read the stop file if it exists.
 739        """
 740        if not self.stop_path.exists():
 741            return {}
 742
 743        try:
 744            with open(self.stop_path, 'r', encoding='utf-8') as f:
 745                data = json.load(f)
 746            return data
 747        except Exception:
 748            return {}
 749
 750    def _remove_blocking_stdin_file(self) -> mrsm.SuccessTuple:
 751        """
 752        Remove the blocking STDIN file if it exists.
 753        """
 754        try:
 755            if self.blocking_stdin_file_path.exists():
 756                self.blocking_stdin_file_path.unlink()
 757        except Exception as e:
 758            return False, str(e)
 759
 760        return True, "Success"
 761
 762    def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None:
 763        """
 764        Handle `SIGTERM` within the `Daemon` context.
 765        This method is injected into the `DaemonContext`.
 766        """
 767        from meerschaum.utils.process import signal_handler
 768        signal_handler(signal_number, stack_frame)
 769
 770        timer = self.__dict__.get('_log_refresh_timer', None)
 771        if timer is not None:
 772            timer.cancel()
 773
 774        daemon_context = self.__dict__.get('_daemon_context', None)
 775        if daemon_context is not None:
 776            daemon_context.close()
 777
 778        _close_pools()
 779        raise SystemExit(0)
 780
 781    def _send_signal(
 782        self,
 783        signal_to_send,
 784        timeout: Union[float, int, None] = None,
 785        check_timeout_interval: Union[float, int, None] = None,
 786    ) -> SuccessTuple:
 787        """Send a signal to the daemon process.
 788
 789        Parameters
 790        ----------
 791        signal_to_send:
 792            The signal the send to the daemon, e.g. `signals.SIGINT`.
 793
 794        timeout: Union[float, int, None], default None
 795            The maximum number of seconds to wait for a process to terminate.
 796
 797        check_timeout_interval: Union[float, int, None], default None
 798            The number of seconds to wait between checking if the process is still running.
 799
 800        Returns
 801        -------
 802        A SuccessTuple indicating success.
 803        """
 804        try:
 805            pid = self.pid
 806            if pid is None:
 807                return (
 808                    False,
 809                    f"Daemon '{self.daemon_id}' is not running, "
 810                    + f"cannot send signal '{signal_to_send}'."
 811                )
 812            
 813            os.kill(pid, signal_to_send)
 814        except Exception:
 815            return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}"
 816
 817        timeout = self.get_timeout_seconds(timeout)
 818        check_timeout_interval = self.get_check_timeout_interval_seconds(
 819            check_timeout_interval
 820        )
 821
 822        if not timeout:
 823            return True, f"Successfully sent '{signal}' to daemon '{self.daemon_id}'."
 824
 825        begin = time.perf_counter()
 826        while (time.perf_counter() - begin) < timeout:
 827            if not self.status == 'running':
 828                return True, "Success"
 829            time.sleep(check_timeout_interval)
 830
 831        return False, (
 832            f"Failed to stop daemon '{self.daemon_id}' (PID: {pid}) within {timeout} second"
 833            + ('s' if timeout != 1 else '') + '.'
 834        )
 835
 836    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
 837        """Create the Daemon's directory.
 838        If `allow_dirty_run` is `False` and the directory already exists,
 839        raise a `FileExistsError`.
 840        """
 841        try:
 842            self.path.mkdir(parents=True, exist_ok=True)
 843            _already_exists = any(os.scandir(self.path))
 844        except FileExistsError:
 845            _already_exists = True
 846
 847        if _already_exists and not allow_dirty_run:
 848            error(
 849                f"Daemon '{self.daemon_id}' already exists. " +
 850                "To allow this daemon to run, do one of the following:\n"
 851                + "  - Execute `daemon.cleanup()`.\n"
 852                + f"  - Delete the directory '{self.path}'.\n"
 853                + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
 854                FileExistsError,
 855            )
 856
 857    @property
 858    def process(self) -> Union['psutil.Process', None]:
 859        """
 860        Return the psutil process for the Daemon.
 861        """
 862        psutil = attempt_import('psutil')
 863        pid = self.pid
 864        if pid is None:
 865            return None
 866        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
 867            try:
 868                self._process = psutil.Process(int(pid))
 869                process_exists = True
 870            except Exception:
 871                process_exists = False
 872            if not process_exists:
 873                _ = self.__dict__.pop('_process', None)
 874                try:
 875                    if self.pid_path.exists():
 876                        self.pid_path.unlink()
 877                except Exception:
 878                    pass
 879                return None
 880        return self._process
 881
 882    @property
 883    def status(self) -> str:
 884        """
 885        Return the running status of this Daemon.
 886        """
 887        if self.process is None:
 888            return 'stopped'
 889
 890        psutil = attempt_import('psutil', lazy=False)
 891        try:
 892            if self.process.status() == 'stopped':
 893                return 'paused'
 894            if self.process.status() == 'zombie':
 895                raise psutil.NoSuchProcess(self.process.pid)
 896        except (psutil.NoSuchProcess, AttributeError):
 897            if self.pid_path.exists():
 898                try:
 899                    self.pid_path.unlink()
 900                except Exception:
 901                    pass
 902            return 'stopped'
 903
 904        return 'running'
 905
 906    @classmethod
 907    def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 908        """
 909        Return a Daemon's path from its `daemon_id`.
 910        """
 911        from meerschaum.config.paths import DAEMON_RESOURCES_PATH
 912        return DAEMON_RESOURCES_PATH / daemon_id
 913
 914    @property
 915    def path(self) -> pathlib.Path:
 916        """
 917        Return the path for this Daemon's directory.
 918        """
 919        return self._get_path_from_daemon_id(self.daemon_id)
 920
 921    @classmethod
 922    def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 923        """
 924        Return the `properties.json` path for a given `daemon_id`.
 925        """
 926        return cls._get_path_from_daemon_id(daemon_id) / 'properties.json'
 927
 928    @property
 929    def properties_path(self) -> pathlib.Path:
 930        """
 931        Return the `propterties.json` path for this Daemon.
 932        """
 933        return self._get_properties_path_from_daemon_id(self.daemon_id)
 934
 935    @property
 936    def stop_path(self) -> pathlib.Path:
 937        """
 938        Return the path for the stop file (created when manually stopped).
 939        """
 940        return self.path / '.stop.json'
 941
 942    @property
 943    def log_path(self) -> pathlib.Path:
 944        """
 945        Return the log path.
 946        """
 947        logs_cf = self.properties.get('logs', None) or {}
 948        if 'path' not in logs_cf:
 949            from meerschaum.config.paths import LOGS_RESOURCES_PATH
 950            return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')
 951
 952        return pathlib.Path(logs_cf['path'])
 953
 954    @property
 955    def stdin_file_path(self) -> pathlib.Path:
 956        """
 957        Return the stdin file path.
 958        """
 959        return self.path / 'input.stdin'
 960
 961    @property
 962    def blocking_stdin_file_path(self) -> pathlib.Path:
 963        """
 964        Return the stdin file path.
 965        """
 966        if '_blocking_stdin_file_path' in self.__dict__:
 967            return self._blocking_stdin_file_path
 968
 969        return self.path / 'input.stdin.block'
 970
 971    @property
 972    def prompt_kwargs_file_path(self) -> pathlib.Path:
 973        """
 974        Return the file path to the kwargs for the invoking `prompt()`.
 975        """
 976        return self.path / 'prompt_kwargs.json'
 977
 978    @property
 979    def log_offset_path(self) -> pathlib.Path:
 980        """
 981        Return the log offset file path.
 982        """
 983        from meerschaum.config.paths import LOGS_RESOURCES_PATH
 984        return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')
 985
 986    @property
 987    def log_offset_lock(self) -> 'fasteners.InterProcessLock':
 988        """
 989        Return the process lock context manager.
 990        """
 991        if '_log_offset_lock' in self.__dict__:
 992            return self._log_offset_lock
 993
 994        fasteners = attempt_import('fasteners')
 995        self._log_offset_lock = fasteners.InterProcessLock(self.log_offset_path)
 996        return self._log_offset_lock
 997
 998    @property
 999    def rotating_log(self) -> RotatingFile:
1000        """
1001        The rotating log file for the daemon's output.
1002        """
1003        if '_rotating_log' in self.__dict__:
1004            return self._rotating_log
1005
1006        logs_cf = self.properties.get('logs', None) or {}
1007        write_timestamps = logs_cf.get('write_timestamps', None)
1008        if write_timestamps is None:
1009            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
1010
1011        timestamp_format = logs_cf.get('timestamp_format', None)
1012        if timestamp_format is None:
1013            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
1014
1015        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
1016        if num_files_to_keep is None:
1017            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
1018
1019        max_file_size = logs_cf.get('max_file_size', None)
1020        if max_file_size is None:
1021            max_file_size = get_config('jobs', 'logs', 'max_file_size')
1022
1023        redirect_streams = logs_cf.get('redirect_streams', True)
1024
1025        self._rotating_log = RotatingFile(
1026            self.log_path,
1027            redirect_streams=redirect_streams,
1028            write_timestamps=write_timestamps,
1029            timestamp_format=timestamp_format,
1030            num_files_to_keep=num_files_to_keep,
1031            max_file_size=max_file_size,
1032        )
1033        return self._rotating_log
1034
1035    @property
1036    def stdin_file(self):
1037        """
1038        Return the file handler for the stdin file.
1039        """
1040        if (_stdin_file := self.__dict__.get('_stdin_file', None)):
1041            return _stdin_file
1042
1043        self._stdin_file = StdinFile(
1044            self.stdin_file_path,
1045            lock_file_path=self.blocking_stdin_file_path,
1046        )
1047        return self._stdin_file
1048
1049    @property
1050    def log_text(self) -> Union[str, None]:
1051        """
1052        Read the log files and return their contents.
1053        Returns `None` if the log file does not exist.
1054        """
1055        logs_cf = self.properties.get('logs', None) or {}
1056        write_timestamps = logs_cf.get('write_timestamps', None)
1057        if write_timestamps is None:
1058            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
1059
1060        timestamp_format = logs_cf.get('timestamp_format', None)
1061        if timestamp_format is None:
1062            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
1063
1064        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
1065        if num_files_to_keep is None:
1066            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
1067
1068        max_file_size = logs_cf.get('max_file_size', None)
1069        if max_file_size is None:
1070            max_file_size = get_config('jobs', 'logs', 'max_file_size')
1071
1072        new_rotating_log = RotatingFile(
1073            self.rotating_log.file_path,
1074            num_files_to_keep=num_files_to_keep,
1075            max_file_size=max_file_size,
1076            write_timestamps=write_timestamps,
1077            timestamp_format=timestamp_format,
1078        )
1079        return new_rotating_log.read()
1080
1081    def readlines(self) -> List[str]:
1082        """
1083        Read the next log lines, persisting the cursor for later use.
1084        Note this will alter the cursor of `self.rotating_log`.
1085        """
1086        self.rotating_log._cursor = self._read_log_offset()
1087        lines = self.rotating_log.readlines()
1088        self._write_log_offset()
1089        return lines
1090
1091    def _read_log_offset(self) -> Tuple[int, int]:
1092        """
1093        Return the current log offset cursor.
1094
1095        Returns
1096        -------
1097        A tuple of the form (`subfile_index`, `position`).
1098        """
1099        if not self.log_offset_path.exists():
1100            return 0, 0
1101
1102        try:
1103            with open(self.log_offset_path, 'r', encoding='utf-8') as f:
1104                cursor_text = f.read()
1105            cursor_parts = cursor_text.split(' ')
1106            subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1])
1107            return subfile_index, subfile_position
1108        except Exception as e:
1109            warn(f"Failed to read cursor:\n{e}")
1110        return 0, 0
1111
1112    def _write_log_offset(self) -> None:
1113        """
1114        Write the current log offset file.
1115        """
1116        with self.log_offset_lock:
1117            with open(self.log_offset_path, 'w+', encoding='utf-8') as f:
1118                subfile_index = self.rotating_log._cursor[0]
1119                subfile_position = self.rotating_log._cursor[1]
1120                f.write(f"{subfile_index} {subfile_position}")
1121
1122    @property
1123    def pid(self) -> Union[int, None]:
1124        """
1125        Read the PID file and return its contents.
1126        Returns `None` if the PID file does not exist.
1127        """
1128        if not self.pid_path.exists():
1129            return None
1130        try:
1131            with open(self.pid_path, 'r', encoding='utf-8') as f:
1132                text = f.read()
1133            if len(text) == 0:
1134                return None
1135            pid = int(text.rstrip())
1136        except Exception as e:
1137            warn(e)
1138            text = None
1139            pid = None
1140        return pid
1141
1142    @property
1143    def pid_path(self) -> pathlib.Path:
1144        """
1145        Return the path to a file containing the PID for this Daemon.
1146        """
1147        return self.path / 'process.pid'
1148
1149    @property
1150    def pid_lock(self) -> 'fasteners.InterProcessLock':
1151        """
1152        Return the process lock context manager.
1153        """
1154        if '_pid_lock' in self.__dict__:
1155            return self._pid_lock
1156
1157        fasteners = attempt_import('fasteners')
1158        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
1159        return self._pid_lock
1160
1161    @property
1162    def pickle_path(self) -> pathlib.Path:
1163        """
1164        Return the path for the pickle file.
1165        """
1166        return self.path / 'pickle.pkl'
1167
1168    def read_properties(self) -> Optional[Dict[str, Any]]:
1169        """Read the properties JSON file and return the dictionary."""
1170        if not self.properties_path.exists():
1171            return None
1172        try:
1173            with open(self.properties_path, 'r', encoding='utf-8') as file:
1174                properties = json.load(file)
1175        except Exception:
1176            properties = {}
1177        
1178        return properties or {}
1179
1180    def read_pickle(self) -> Daemon:
1181        """Read a Daemon's pickle file and return the `Daemon`."""
1182        import pickle
1183        import traceback
1184        if not self.pickle_path.exists():
1185            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")
1186
1187        if self.pickle_path.stat().st_size == 0:
1188            error(f"Pickle was empty for daemon '{self.daemon_id}'.")
1189
1190        try:
1191            with open(self.pickle_path, 'rb') as pickle_file:
1192                daemon = pickle.load(pickle_file)
1193            success, msg = True, 'Success'
1194        except Exception as e:
1195            success, msg = False, str(e)
1196            daemon = None
1197            traceback.print_exception(type(e), e, e.__traceback__)
1198        if not success:
1199            error(msg)
1200        return daemon
1201
1202    @property
1203    def properties(self) -> Dict[str, Any]:
1204        """
1205        Return the contents of the properties JSON file.
1206        """
1207        try:
1208            _file_properties = self.read_properties() or {}
1209        except Exception:
1210            traceback.print_exc()
1211            _file_properties = {}
1212
1213        if not self._properties:
1214            self._properties = _file_properties
1215
1216        if self._properties is None:
1217            self._properties = {}
1218
1219        if (
1220            self._properties.get('result', None) is None
1221            and _file_properties.get('result', None) is not None
1222        ):
1223            _ = self._properties.pop('result', None)
1224
1225        if _file_properties is not None:
1226            self._properties = apply_patch_to_config(
1227                _file_properties,
1228                self._properties,
1229            )
1230
1231        return self._properties
1232
1233    @property
1234    def hidden(self) -> bool:
1235        """
1236        Return a bool indicating whether this Daemon should be displayed.
1237        """
1238        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')
1239
1240    def write_properties(self) -> SuccessTuple:
1241        """Write the properties dictionary to the properties JSON file
1242        (only if self.properties exists).
1243        """
1244        from meerschaum.utils.misc import generate_password
1245        success, msg = (
1246            False,
1247            f"No properties to write for daemon '{self.daemon_id}'."
1248        )
1249        backup_path = self.properties_path.parent / (generate_password(8) + '.json')
1250        props = self.properties
1251        if props is not None:
1252            try:
1253                self.path.mkdir(parents=True, exist_ok=True)
1254                if self.properties_path.exists():
1255                    self.properties_path.rename(backup_path)
1256                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
1257                    json.dump(props, properties_file)
1258                success, msg = True, 'Success'
1259            except Exception as e:
1260                success, msg = False, str(e)
1261
1262        try:
1263            if backup_path.exists():
1264                if not success:
1265                    backup_path.rename(self.properties_path)
1266                else:
1267                    backup_path.unlink()
1268        except Exception as e:
1269            success, msg = False, str(e)
1270
1271        return success, msg
1272
1273    def write_pickle(self) -> SuccessTuple:
1274        """Write the pickle file for the daemon."""
1275        import pickle
1276        import traceback
1277        from meerschaum.utils.misc import generate_password
1278
1279        if not self.pickle:
1280            return True, "Success"
1281
1282        backup_path = self.pickle_path.parent / (generate_password(7) + '.pkl')
1283        try:
1284            self.path.mkdir(parents=True, exist_ok=True)
1285            if self.pickle_path.exists():
1286                self.pickle_path.rename(backup_path)
1287            with open(self.pickle_path, 'wb+') as pickle_file:
1288                pickle.dump(self, pickle_file)
1289            success, msg = True, "Success"
1290        except Exception as e:
1291            success, msg = False, str(e)
1292            traceback.print_exception(type(e), e, e.__traceback__)
1293        try:
1294            if backup_path.exists():
1295                if not success:
1296                    backup_path.rename(self.pickle_path)
1297                else:
1298                    backup_path.unlink()
1299        except Exception as e:
1300            success, msg = False, str(e)
1301        return success, msg
1302
1303
1304    def _setup(
1305        self,
1306        allow_dirty_run: bool = False,
1307    ) -> None:
1308        """
1309        Update properties before starting the Daemon.
1310        """
1311        if self.properties is None:
1312            self._properties = {}
1313
1314        self._properties.update({
1315            'target': {
1316                'name': self.target.__name__,
1317                'module': self.target.__module__,
1318                'args': self.target_args,
1319                'kw': self.target_kw,
1320            },
1321        })
1322        self.mkdir_if_not_exists(allow_dirty_run)
1323        _write_properties_success_tuple = self.write_properties()
1324        if not _write_properties_success_tuple[0]:
1325            error(_write_properties_success_tuple[1])
1326
1327        _write_pickle_success_tuple = self.write_pickle()
1328        if not _write_pickle_success_tuple[0]:
1329            error(_write_pickle_success_tuple[1])
1330
1331    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
1332        """
1333        Remove a daemon's directory after execution.
1334
1335        Parameters
1336        ----------
1337        keep_logs: bool, default False
1338            If `True`, skip deleting the daemon's log files.
1339
1340        Returns
1341        -------
1342        A `SuccessTuple` indicating success.
1343        """
1344        if self.path.exists():
1345            try:
1346                shutil.rmtree(self.path)
1347            except Exception as e:
1348                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
1349                warn(msg)
1350                return False, msg
1351        if not keep_logs:
1352            self.rotating_log.delete()
1353            try:
1354                if self.log_offset_path.exists():
1355                    self.log_offset_path.unlink()
1356            except Exception as e:
1357                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
1358                warn(msg)
1359                return False, msg
1360        return True, "Success"
1361
1362
1363    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
1364        """
1365        Return the timeout value to use. Use `--timeout-seconds` if provided,
1366        else the configured default (8).
1367        """
1368        if isinstance(timeout, (int, float)):
1369            return timeout
1370        return get_config('jobs', 'timeout_seconds')
1371
1372
1373    def get_check_timeout_interval_seconds(
1374        self,
1375        check_timeout_interval: Union[int, float, None] = None,
1376    ) -> Union[int, float]:
1377        """
1378        Return the interval value to check the status of timeouts.
1379        """
1380        if isinstance(check_timeout_interval, (int, float)):
1381            return check_timeout_interval
1382        return get_config('jobs', 'check_timeout_interval_seconds')
1383
1384    @property
1385    def target_args(self) -> Union[Tuple[Any], None]:
1386        """
1387        Return the positional arguments to pass to the target function.
1388        """
1389        target_args = (
1390            self.__dict__.get('_target_args', None)
1391            or self.properties.get('target', {}).get('args', None)
1392        )
1393        if target_args is None:
1394            return tuple([])
1395
1396        return tuple(target_args)
1397
1398    @property
1399    def target_kw(self) -> Union[Dict[str, Any], None]:
1400        """
1401        Return the keyword arguments to pass to the target function.
1402        """
1403        target_kw = (
1404            self.__dict__.get('_target_kw', None)
1405            or self.properties.get('target', {}).get('kw', None)
1406        )
1407        if target_kw is None:
1408            return {}
1409
1410        return {key: val for key, val in target_kw.items()}
1411
1412    def __getstate__(self):
1413        """
1414        Pickle this Daemon.
1415        """
1416        dill = attempt_import('dill')
1417        return {
1418            'target': dill.dumps(self.target),
1419            'target_args': self.target_args,
1420            'target_kw': self.target_kw,
1421            'daemon_id': self.daemon_id,
1422            'label': self.label,
1423            'properties': self.properties,
1424        }
1425
1426    def __setstate__(self, _state: Dict[str, Any]):
1427        """
1428        Restore this Daemon from a pickled state.
1429        If the properties file exists, skip the old pickled version.
1430        """
1431        dill = attempt_import('dill')
1432        _state['target'] = dill.loads(_state['target'])
1433        self._pickle = True
1434        daemon_id = _state.get('daemon_id', None)
1435        if not daemon_id:
1436            raise ValueError("Need a daemon_id to un-pickle a Daemon.")
1437
1438        properties_path = self._get_properties_path_from_daemon_id(daemon_id)
1439        ignore_properties = properties_path.exists()
1440        if ignore_properties:
1441            _state = {
1442                key: val
1443                for key, val in _state.items()
1444                if key != 'properties'
1445            }
1446        self.__init__(**_state)
1447
1448
1449    def __repr__(self):
1450        return str(self)
1451
1452    def __str__(self):
1453        return self.daemon_id
1454
1455    def __eq__(self, other):
1456        if not isinstance(other, Daemon):
1457            return False
1458        return self.daemon_id == other.daemon_id
1459
1460    def __hash__(self):
1461        return hash(self.daemon_id)
class Daemon:
  42class Daemon:
  43    """
  44    Daemonize Python functions into background processes.
  45
  46    Examples
  47    --------
  48    >>> import meerschaum as mrsm
  49    >>> from meerschaum.utils.daemons import Daemon
  50    >>> daemon = Daemon(print, ('hi',))
  51    >>> success, msg = daemon.run()
  52    >>> print(daemon.log_text)
  53
  54    2024-07-29 18:03 | hi
  55    2024-07-29 18:03 |
  56    >>> daemon.run(allow_dirty_run=True)
  57    >>> print(daemon.log_text)
  58
  59    2024-07-29 18:03 | hi
  60    2024-07-29 18:03 |
  61    2024-07-29 18:05 | hi
  62    2024-07-29 18:05 |
  63    >>> mrsm.pprint(daemon.properties)
  64    {
  65        'label': 'print',
  66        'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
  67        'result': None,
  68        'process': {'ended': '2024-07-29T18:03:33.752806'}
  69    }
  70
  71    """
  72
  73    def __new__(
  74        cls,
  75        *args,
  76        daemon_id: Optional[str] = None,
  77        **kw
  78    ):
  79        """
  80        If a daemon_id is provided and already exists, read from its pickle file.
  81        """
  82        instance = super(Daemon, cls).__new__(cls)
  83        if daemon_id is not None:
  84            instance.daemon_id = daemon_id
  85            if instance.pickle_path.exists():
  86                instance = instance.read_pickle()
  87        return instance
  88
  89    @classmethod
  90    def from_properties_file(cls, daemon_id: str) -> Daemon:
  91        """
  92        Return a Daemon from a properties dictionary.
  93        """
  94        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
  95        if not properties_path.exists():
  96            raise OSError(f"Properties file '{properties_path}' does not exist.")
  97
  98        try:
  99            with open(properties_path, 'r', encoding='utf-8') as f:
 100                properties = json.load(f)
 101        except Exception:
 102            properties = {}
 103
 104        if not properties:
 105            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")
 106
 107        daemon_id = properties_path.parent.name
 108        target_cf = properties.get('target', {})
 109        target_module_name = target_cf.get('module', None)
 110        target_function_name = target_cf.get('name', None)
 111        target_args = target_cf.get('args', None)
 112        target_kw = target_cf.get('kw', None)
 113        label = properties.get('label', None)
 114
 115        if None in [
 116            target_module_name,
 117            target_function_name,
 118            target_args,
 119            target_kw,
 120        ]:
 121            raise ValueError("Missing target function information.")
 122
 123        target_module = importlib.import_module(target_module_name)
 124        target_function = getattr(target_module, target_function_name)
 125
 126        return Daemon(
 127            daemon_id=daemon_id,
 128            target=target_function,
 129            target_args=target_args,
 130            target_kw=target_kw,
 131            properties=properties,
 132            label=label,
 133        )
 134
 135
 136    def __init__(
 137        self,
 138        target: Optional[Callable[[Any], Any]] = None,
 139        target_args: Union[List[Any], Tuple[Any], None] = None,
 140        target_kw: Optional[Dict[str, Any]] = None,
 141        env: Optional[Dict[str, str]] = None,
 142        daemon_id: Optional[str] = None,
 143        label: Optional[str] = None,
 144        properties: Optional[Dict[str, Any]] = None,
 145        pickle: bool = True,
 146    ):
 147        """
 148        Parameters
 149        ----------
 150        target: Optional[Callable[[Any], Any]], default None,
 151            The function to execute in a child process.
 152
 153        target_args: Union[List[Any], Tuple[Any], None], default None
 154            Positional arguments to pass to the target function.
 155
 156        target_kw: Optional[Dict[str, Any]], default None
 157            Keyword arguments to pass to the target function.
 158
 159        env: Optional[Dict[str, str]], default None
 160            If provided, set these environment variables in the daemon process.
 161
 162        daemon_id: Optional[str], default None
 163            Build a `Daemon` from an existing `daemon_id`.
 164            If `daemon_id` is provided, other arguments are ignored and are derived
 165            from the existing pickled `Daemon`.
 166
 167        label: Optional[str], default None
 168            Label string to help identifiy a daemon.
 169            If `None`, use the function name instead.
 170
 171        properties: Optional[Dict[str, Any]], default None
 172            Override reading from the properties JSON by providing an existing dictionary.
 173        """
 174        _pickle = self.__dict__.get('_pickle', False)
 175        if daemon_id is not None:
 176            self.daemon_id = daemon_id
 177            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):
 178
 179                if not self.properties_path.exists():
 180                    raise Exception(
 181                        f"Daemon '{self.daemon_id}' does not exist. "
 182                        + "Pass a target to create a new Daemon."
 183                    )
 184
 185                try:
 186                    new_daemon = self.from_properties_file(daemon_id)
 187                except Exception:
 188                    new_daemon = None
 189
 190                if new_daemon is not None:
 191                    new_daemon.write_pickle()
 192                    target = new_daemon.target
 193                    target_args = new_daemon.target_args
 194                    target_kw = new_daemon.target_kw
 195                    label = new_daemon.label
 196                    self._properties = new_daemon.properties
 197                else:
 198                    try:
 199                        self.properties_path.unlink()
 200                    except Exception:
 201                        pass
 202
 203                    raise Exception(
 204                        f"Could not recover daemon '{self.daemon_id}' "
 205                        + "from its properties file."
 206                    )
 207
 208        if 'target' not in self.__dict__:
 209            if target is None:
 210                error("Cannot create a Daemon without a target.")
 211            self.target = target
 212
 213        self.pickle = pickle
 214
 215        ### NOTE: We have to check self.__dict__ in case we un-pickling.
 216        if '_target_args' not in self.__dict__:
 217            self._target_args = target_args
 218        if '_target_kw' not in self.__dict__:
 219            self._target_kw = target_kw
 220
 221        if 'label' not in self.__dict__:
 222            if label is None:
 223                label = (
 224                    self.target.__name__ if '__name__' in self.target.__dir__()
 225                        else str(self.target)
 226                )
 227            self.label = label
 228        elif label is not None:
 229            self.label = label
 230
 231        if 'daemon_id' not in self.__dict__:
 232            self.daemon_id = get_new_daemon_name()
 233        if '_properties' not in self.__dict__:
 234            self._properties = properties
 235        elif properties:
 236            if self._properties is None:
 237                self._properties = {}
 238            self._properties.update(properties)
 239        if self._properties is None:
 240            self._properties = {}
 241
 242        self._properties.update({'label': self.label})
 243        if env:
 244            self._properties.update({'env': env})
 245
 246        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
 247        _ = self.process
 248
 249
 250    def _run_exit(
 251        self,
 252        keep_daemon_output: bool = True,
 253        allow_dirty_run: bool = False,
 254    ) -> Any:
 255        """Run the daemon's target function.
 256        NOTE: This WILL EXIT the parent process!
 257
 258        Parameters
 259        ----------
 260        keep_daemon_output: bool, default True
 261            If `False`, delete the daemon's output directory upon exiting.
 262
 263        allow_dirty_run, bool, default False:
 264            If `True`, run the daemon, even if the `daemon_id` directory exists.
 265            This option is dangerous because if the same `daemon_id` runs twice,
 266            the last to finish will overwrite the output of the first.
 267
 268        Returns
 269        -------
 270        Nothing — this will exit the parent process.
 271        """
 272        import platform
 273        import sys
 274        import os
 275        import traceback
 276        from meerschaum.utils.warnings import warn
 277        from meerschaum.config import get_config
 278        daemon = attempt_import('daemon')
 279        lines = get_config('jobs', 'terminal', 'lines')
 280        columns = get_config('jobs', 'terminal', 'columns')
 281
 282        if platform.system() == 'Windows':
 283            return False, "Windows is no longer supported."
 284
 285        self._setup(allow_dirty_run)
 286
 287        _daemons.append(self)
 288
 289        logs_cf = self.properties.get('logs', {})
 290        log_refresh_seconds = logs_cf.get('refresh_files_seconds', None)
 291        if log_refresh_seconds is None:
 292            log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds')
 293        write_timestamps = logs_cf.get('write_timestamps', None)
 294        if write_timestamps is None:
 295            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
 296
 297        self._log_refresh_timer = RepeatTimer(
 298            log_refresh_seconds,
 299            partial(self.rotating_log.refresh_files, start_interception=write_timestamps),
 300        )
 301
 302        capture_stdin = logs_cf.get('stdin', True)
 303        cwd = self.properties.get('cwd', os.getcwd())
 304
 305        ### NOTE: The SIGINT handler has been removed so that child processes may handle
 306        ###       KeyboardInterrupts themselves.
 307        ###       The previous aggressive approach was redundant because of the SIGTERM handler.
 308        self._daemon_context = daemon.DaemonContext(
 309            pidfile=self.pid_lock,
 310            stdout=self.rotating_log,
 311            stderr=self.rotating_log,
 312            stdin=(self.stdin_file if capture_stdin else None),
 313            working_directory=cwd,
 314            detach_process=True,
 315            files_preserve=list(self.rotating_log.subfile_objects.values()),
 316            signal_map={
 317                signal.SIGTERM: self._handle_sigterm,
 318            },
 319        )
 320
 321        if capture_stdin and sys.stdin is None:
 322            raise OSError("Cannot daemonize without stdin.")
 323
 324        try:
 325            os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns))
 326            with self._daemon_context:
 327                if capture_stdin:
 328                    sys.stdin = self.stdin_file
 329                _ = os.environ.pop(STATIC_CONFIG['environment']['systemd_stdin_path'], None)
 330                os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id
 331                os.environ['PYTHONUNBUFFERED'] = '1'
 332
 333                ### Allow the user to override environment variables.
 334                env = self.properties.get('env', {})
 335                if env and isinstance(env, dict):
 336                    os.environ.update({str(k): str(v) for k, v in env.items()})
 337
 338                self.rotating_log.refresh_files(start_interception=True)
 339                result = None
 340                try:
 341                    with open(self.pid_path, 'w+', encoding='utf-8') as f:
 342                        f.write(str(os.getpid()))
 343
 344                    ### NOTE: The timer fails to start for remote actions to localhost.
 345                    try:
 346                        if not self._log_refresh_timer.is_running():
 347                            self._log_refresh_timer.start()
 348                    except Exception:
 349                        pass
 350
 351                    self.properties['result'] = None
 352                    self._capture_process_timestamp('began')
 353                    result = self.target(*self.target_args, **self.target_kw)
 354                    self.properties['result'] = result
 355                except (BrokenPipeError, KeyboardInterrupt, SystemExit):
 356                    result = False, traceback.format_exc()
 357                except Exception as e:
 358                    warn(
 359                        f"Exception in daemon target function: {traceback.format_exc()}",
 360                    )
 361                    result = False, str(e)
 362                finally:
 363                    _results[self.daemon_id] = result
 364                    self.properties['result'] = result
 365
 366                    if keep_daemon_output:
 367                        self._capture_process_timestamp('ended')
 368                    else:
 369                        self.cleanup()
 370
 371                    self._log_refresh_timer.cancel()
 372                    try:
 373                        if self.pid is None and self.pid_path.exists():
 374                            self.pid_path.unlink()
 375                    except Exception:
 376                        pass
 377
 378                    if is_success_tuple(result):
 379                        try:
 380                            mrsm.pprint(result)
 381                        except BrokenPipeError:
 382                            pass
 383
 384        except Exception:
 385            daemon_error = traceback.format_exc()
 386            from meerschaum.config.paths import DAEMON_ERROR_LOG_PATH
 387            with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
 388                f.write(
 389                    f"Error in Daemon '{self}':\n\n"
 390                    f"{sys.stdin=}\n"
 391                    f"{self.stdin_file_path=}\n"
 392                    f"{self.stdin_file_path.exists()=}\n\n"
 393                    f"{daemon_error}\n\n"
 394                )
 395            warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}")
 396
 397    def _capture_process_timestamp(
 398        self,
 399        process_key: str,
 400        write_properties: bool = True,
 401    ) -> None:
 402        """
 403        Record the current timestamp to the parameters `process:<process_key>`.
 404
 405        Parameters
 406        ----------
 407        process_key: str
 408            Under which key to store the timestamp.
 409
 410        write_properties: bool, default True
 411            If `True` persist the properties to disk immediately after capturing the timestamp.
 412        """
 413        if 'process' not in self.properties:
 414            self.properties['process'] = {}
 415
 416        if process_key not in ('began', 'ended', 'paused', 'stopped'):
 417            raise ValueError(f"Invalid key '{process_key}'.")
 418
 419        self.properties['process'][process_key] = (
 420            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
 421        )
 422        if write_properties:
 423            self.write_properties()
 424
 425    def run(
 426        self,
 427        keep_daemon_output: bool = True,
 428        allow_dirty_run: bool = False,
 429        wait: bool = False,
 430        timeout: Union[int, float] = 4,
 431        debug: bool = False,
 432    ) -> SuccessTuple:
 433        """Run the daemon as a child process and continue executing the parent.
 434
 435        Parameters
 436        ----------
 437        keep_daemon_output: bool, default True
 438            If `False`, delete the daemon's output directory upon exiting.
 439
 440        allow_dirty_run: bool, default False
 441            If `True`, run the daemon, even if the `daemon_id` directory exists.
 442            This option is dangerous because if the same `daemon_id` runs concurrently,
 443            the last to finish will overwrite the output of the first.
 444
 445        wait: bool, default True
 446            If `True`, block until `Daemon.status` is running (or the timeout expires).
 447
 448        timeout: Union[int, float], default 4
 449            If `wait` is `True`, block for up to `timeout` seconds before returning a failure.
 450
 451        Returns
 452        -------
 453        A SuccessTuple indicating success.
 454
 455        """
 456        import platform
 457        if platform.system() == 'Windows':
 458            return False, "Cannot run background jobs on Windows."
 459
 460        ### The daemon might exist and be paused.
 461        if self.status == 'paused':
 462            return self.resume()
 463
 464        self._remove_stop_file()
 465        if self.status == 'running':
 466            return True, f"Daemon '{self}' is already running."
 467
 468        self.mkdir_if_not_exists(allow_dirty_run)
 469        _write_pickle_success_tuple = self.write_pickle()
 470        if not _write_pickle_success_tuple[0]:
 471            return _write_pickle_success_tuple
 472
 473        _launch_daemon_code = (
 474            "from meerschaum.utils.daemon import Daemon, _daemons; "
 475            f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
 476            f"_daemons['{self.daemon_id}'] = daemon; "
 477            f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
 478            "allow_dirty_run=True)"
 479        )
 480        env = dict(os.environ)
 481        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
 482        msg = (
 483            "Success"
 484            if _launch_success_bool
 485            else f"Failed to start daemon '{self.daemon_id}'."
 486        )
 487        if not wait or not _launch_success_bool:
 488            return _launch_success_bool, msg
 489
 490        timeout = self.get_timeout_seconds(timeout)
 491        check_timeout_interval = self.get_check_timeout_interval_seconds()
 492
 493        if not timeout:
 494            success = self.status == 'running'
 495            msg = "Success" if success else f"Failed to run daemon '{self.daemon_id}'."
 496            if success:
 497                self._capture_process_timestamp('began')
 498            return success, msg
 499
 500        begin = time.perf_counter()
 501        while (time.perf_counter() - begin) < timeout:
 502            if self.status == 'running':
 503                self._capture_process_timestamp('began')
 504                return True, "Success"
 505            time.sleep(check_timeout_interval)
 506
 507        return False, (
 508            f"Failed to start daemon '{self.daemon_id}' within {timeout} second"
 509            + ('s' if timeout != 1 else '') + '.'
 510        )
 511
 512
 513    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
 514        """
 515        Forcibly terminate a running daemon.
 516        Sends a SIGTERM signal to the process.
 517
 518        Parameters
 519        ----------
 520        timeout: Optional[int], default 3
 521            How many seconds to wait for the process to terminate.
 522
 523        Returns
 524        -------
 525        A SuccessTuple indicating success.
 526        """
 527        if self.status != 'paused':
 528            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
 529            if success:
 530                self._write_stop_file('kill')
 531                self.stdin_file.close()
 532                self._remove_blocking_stdin_file()
 533                return success, msg
 534
 535        if self.status == 'stopped':
 536            self._write_stop_file('kill')
 537            self.stdin_file.close()
 538            self._remove_blocking_stdin_file()
 539            return True, "Process has already stopped."
 540
 541        psutil = attempt_import('psutil')
 542        process = self.process
 543        try:
 544            process.terminate()
 545            process.kill()
 546            process.wait(timeout=timeout)
 547        except Exception as e:
 548            return False, f"Failed to kill job {self} ({process}) with exception: {e}"
 549
 550        try:
 551            if process.status():
 552                return False, "Failed to stop daemon '{self}' ({process})."
 553        except psutil.NoSuchProcess:
 554            pass
 555
 556        if self.pid_path.exists():
 557            try:
 558                self.pid_path.unlink()
 559            except Exception:
 560                pass
 561
 562        self._write_stop_file('kill')
 563        self.stdin_file.close()
 564        self._remove_blocking_stdin_file()
 565        return True, "Success"
 566
 567    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
 568        """Gracefully quit a running daemon."""
 569        if self.status == 'paused':
 570            return self.kill(timeout)
 571
 572        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
 573        if signal_success:
 574            self._write_stop_file('quit')
 575            self.stdin_file.close()
 576            self._remove_blocking_stdin_file()
 577        return signal_success, signal_msg
 578
 579    def pause(
 580        self,
 581        timeout: Union[int, float, None] = None,
 582        check_timeout_interval: Union[float, int, None] = None,
 583    ) -> SuccessTuple:
 584        """
 585        Pause the daemon if it is running.
 586
 587        Parameters
 588        ----------
 589        timeout: Union[float, int, None], default None
 590            The maximum number of seconds to wait for a process to suspend.
 591
 592        check_timeout_interval: Union[float, int, None], default None
 593            The number of seconds to wait between checking if the process is still running.
 594
 595        Returns
 596        -------
 597        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
 598        """
 599        self._remove_blocking_stdin_file()
 600
 601        if self.process is None:
 602            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."
 603
 604        if self.status == 'paused':
 605            return True, f"Daemon '{self.daemon_id}' is already paused."
 606
 607        self._write_stop_file('pause')
 608        self.stdin_file.close()
 609        self._remove_blocking_stdin_file()
 610        try:
 611            self.process.suspend()
 612        except Exception as e:
 613            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"
 614
 615        timeout = self.get_timeout_seconds(timeout)
 616        check_timeout_interval = self.get_check_timeout_interval_seconds(
 617            check_timeout_interval
 618        )
 619
 620        psutil = attempt_import('psutil')
 621
 622        if not timeout:
 623            try:
 624                success = self.process.status() == 'stopped'
 625            except psutil.NoSuchProcess:
 626                success = True
 627            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
 628            if success:
 629                self._capture_process_timestamp('paused')
 630            return success, msg
 631
 632        begin = time.perf_counter()
 633        while (time.perf_counter() - begin) < timeout:
 634            try:
 635                if self.process.status() == 'stopped':
 636                    self._capture_process_timestamp('paused')
 637                    return True, "Success"
 638            except psutil.NoSuchProcess as e:
 639                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
 640            time.sleep(check_timeout_interval)
 641
 642        return False, (
 643            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
 644            + ('s' if timeout != 1 else '') + '.'
 645        )
 646
 647    def resume(
 648        self,
 649        timeout: Union[int, float, None] = None,
 650        check_timeout_interval: Union[float, int, None] = None,
 651    ) -> SuccessTuple:
 652        """
 653        Resume the daemon if it is paused.
 654
 655        Parameters
 656        ----------
 657        timeout: Union[float, int, None], default None
 658            The maximum number of seconds to wait for a process to resume.
 659
 660        check_timeout_interval: Union[float, int, None], default None
 661            The number of seconds to wait between checking if the process is still stopped.
 662
 663        Returns
 664        -------
 665        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
 666        """
 667        if self.status == 'running':
 668            return True, f"Daemon '{self.daemon_id}' is already running."
 669
 670        if self.status == 'stopped':
 671            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."
 672
 673        self._remove_stop_file()
 674        try:
 675            if self.process is None:
 676                return False, f"Cannot resume daemon '{self.daemon_id}'."
 677
 678            self.process.resume()
 679        except Exception as e:
 680            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"
 681
 682        timeout = self.get_timeout_seconds(timeout)
 683        check_timeout_interval = self.get_check_timeout_interval_seconds(
 684            check_timeout_interval
 685        )
 686
 687        if not timeout:
 688            success = self.status == 'running'
 689            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
 690            if success:
 691                self._capture_process_timestamp('began')
 692            return success, msg
 693
 694        begin = time.perf_counter()
 695        while (time.perf_counter() - begin) < timeout:
 696            if self.status == 'running':
 697                self._capture_process_timestamp('began')
 698                return True, "Success"
 699            time.sleep(check_timeout_interval)
 700
 701        return False, (
 702            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
 703            + ('s' if timeout != 1 else '') + '.'
 704        )
 705
 706    def _write_stop_file(self, action: str) -> SuccessTuple:
 707        """Write the stop file timestamp and action."""
 708        if action not in ('quit', 'kill', 'pause'):
 709            return False, f"Unsupported action '{action}'."
 710
 711        if not self.stop_path.parent.exists():
 712            self.stop_path.parent.mkdir(parents=True, exist_ok=True)
 713
 714        with open(self.stop_path, 'w+', encoding='utf-8') as f:
 715            json.dump(
 716                {
 717                    'stop_time': datetime.now(timezone.utc).isoformat(),
 718                    'action': action,
 719                },
 720                f
 721            )
 722
 723        return True, "Success"
 724
 725    def _remove_stop_file(self) -> SuccessTuple:
 726        """Remove the stop file"""
 727        if not self.stop_path.exists():
 728            return True, "Stop file does not exist."
 729
 730        try:
 731            self.stop_path.unlink()
 732        except Exception as e:
 733            return False, f"Failed to remove stop file:\n{e}"
 734
 735        return True, "Success"
 736
 737    def _read_stop_file(self) -> Dict[str, Any]:
 738        """
 739        Read the stop file if it exists.
 740        """
 741        if not self.stop_path.exists():
 742            return {}
 743
 744        try:
 745            with open(self.stop_path, 'r', encoding='utf-8') as f:
 746                data = json.load(f)
 747            return data
 748        except Exception:
 749            return {}
 750
 751    def _remove_blocking_stdin_file(self) -> mrsm.SuccessTuple:
 752        """
 753        Remove the blocking STDIN file if it exists.
 754        """
 755        try:
 756            if self.blocking_stdin_file_path.exists():
 757                self.blocking_stdin_file_path.unlink()
 758        except Exception as e:
 759            return False, str(e)
 760
 761        return True, "Success"
 762
 763    def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None:
 764        """
 765        Handle `SIGTERM` within the `Daemon` context.
 766        This method is injected into the `DaemonContext`.
 767        """
 768        from meerschaum.utils.process import signal_handler
 769        signal_handler(signal_number, stack_frame)
 770
 771        timer = self.__dict__.get('_log_refresh_timer', None)
 772        if timer is not None:
 773            timer.cancel()
 774
 775        daemon_context = self.__dict__.get('_daemon_context', None)
 776        if daemon_context is not None:
 777            daemon_context.close()
 778
 779        _close_pools()
 780        raise SystemExit(0)
 781
 782    def _send_signal(
 783        self,
 784        signal_to_send,
 785        timeout: Union[float, int, None] = None,
 786        check_timeout_interval: Union[float, int, None] = None,
 787    ) -> SuccessTuple:
 788        """Send a signal to the daemon process.
 789
 790        Parameters
 791        ----------
 792        signal_to_send:
 793            The signal the send to the daemon, e.g. `signals.SIGINT`.
 794
 795        timeout: Union[float, int, None], default None
 796            The maximum number of seconds to wait for a process to terminate.
 797
 798        check_timeout_interval: Union[float, int, None], default None
 799            The number of seconds to wait between checking if the process is still running.
 800
 801        Returns
 802        -------
 803        A SuccessTuple indicating success.
 804        """
 805        try:
 806            pid = self.pid
 807            if pid is None:
 808                return (
 809                    False,
 810                    f"Daemon '{self.daemon_id}' is not running, "
 811                    + f"cannot send signal '{signal_to_send}'."
 812                )
 813            
 814            os.kill(pid, signal_to_send)
 815        except Exception:
 816            return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}"
 817
 818        timeout = self.get_timeout_seconds(timeout)
 819        check_timeout_interval = self.get_check_timeout_interval_seconds(
 820            check_timeout_interval
 821        )
 822
 823        if not timeout:
 824            return True, f"Successfully sent '{signal}' to daemon '{self.daemon_id}'."
 825
 826        begin = time.perf_counter()
 827        while (time.perf_counter() - begin) < timeout:
 828            if not self.status == 'running':
 829                return True, "Success"
 830            time.sleep(check_timeout_interval)
 831
 832        return False, (
 833            f"Failed to stop daemon '{self.daemon_id}' (PID: {pid}) within {timeout} second"
 834            + ('s' if timeout != 1 else '') + '.'
 835        )
 836
 837    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
 838        """Create the Daemon's directory.
 839        If `allow_dirty_run` is `False` and the directory already exists,
 840        raise a `FileExistsError`.
 841        """
 842        try:
 843            self.path.mkdir(parents=True, exist_ok=True)
 844            _already_exists = any(os.scandir(self.path))
 845        except FileExistsError:
 846            _already_exists = True
 847
 848        if _already_exists and not allow_dirty_run:
 849            error(
 850                f"Daemon '{self.daemon_id}' already exists. " +
 851                "To allow this daemon to run, do one of the following:\n"
 852                + "  - Execute `daemon.cleanup()`.\n"
 853                + f"  - Delete the directory '{self.path}'.\n"
 854                + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
 855                FileExistsError,
 856            )
 857
 858    @property
 859    def process(self) -> Union['psutil.Process', None]:
 860        """
 861        Return the psutil process for the Daemon.
 862        """
 863        psutil = attempt_import('psutil')
 864        pid = self.pid
 865        if pid is None:
 866            return None
 867        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
 868            try:
 869                self._process = psutil.Process(int(pid))
 870                process_exists = True
 871            except Exception:
 872                process_exists = False
 873            if not process_exists:
 874                _ = self.__dict__.pop('_process', None)
 875                try:
 876                    if self.pid_path.exists():
 877                        self.pid_path.unlink()
 878                except Exception:
 879                    pass
 880                return None
 881        return self._process
 882
 883    @property
 884    def status(self) -> str:
 885        """
 886        Return the running status of this Daemon.
 887        """
 888        if self.process is None:
 889            return 'stopped'
 890
 891        psutil = attempt_import('psutil', lazy=False)
 892        try:
 893            if self.process.status() == 'stopped':
 894                return 'paused'
 895            if self.process.status() == 'zombie':
 896                raise psutil.NoSuchProcess(self.process.pid)
 897        except (psutil.NoSuchProcess, AttributeError):
 898            if self.pid_path.exists():
 899                try:
 900                    self.pid_path.unlink()
 901                except Exception:
 902                    pass
 903            return 'stopped'
 904
 905        return 'running'
 906
 907    @classmethod
 908    def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 909        """
 910        Return a Daemon's path from its `daemon_id`.
 911        """
 912        from meerschaum.config.paths import DAEMON_RESOURCES_PATH
 913        return DAEMON_RESOURCES_PATH / daemon_id
 914
 915    @property
 916    def path(self) -> pathlib.Path:
 917        """
 918        Return the path for this Daemon's directory.
 919        """
 920        return self._get_path_from_daemon_id(self.daemon_id)
 921
 922    @classmethod
 923    def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 924        """
 925        Return the `properties.json` path for a given `daemon_id`.
 926        """
 927        return cls._get_path_from_daemon_id(daemon_id) / 'properties.json'
 928
 929    @property
 930    def properties_path(self) -> pathlib.Path:
 931        """
 932        Return the `propterties.json` path for this Daemon.
 933        """
 934        return self._get_properties_path_from_daemon_id(self.daemon_id)
 935
 936    @property
 937    def stop_path(self) -> pathlib.Path:
 938        """
 939        Return the path for the stop file (created when manually stopped).
 940        """
 941        return self.path / '.stop.json'
 942
 943    @property
 944    def log_path(self) -> pathlib.Path:
 945        """
 946        Return the log path.
 947        """
 948        logs_cf = self.properties.get('logs', None) or {}
 949        if 'path' not in logs_cf:
 950            from meerschaum.config.paths import LOGS_RESOURCES_PATH
 951            return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')
 952
 953        return pathlib.Path(logs_cf['path'])
 954
 955    @property
 956    def stdin_file_path(self) -> pathlib.Path:
 957        """
 958        Return the stdin file path.
 959        """
 960        return self.path / 'input.stdin'
 961
 962    @property
 963    def blocking_stdin_file_path(self) -> pathlib.Path:
 964        """
 965        Return the stdin file path.
 966        """
 967        if '_blocking_stdin_file_path' in self.__dict__:
 968            return self._blocking_stdin_file_path
 969
 970        return self.path / 'input.stdin.block'
 971
 972    @property
 973    def prompt_kwargs_file_path(self) -> pathlib.Path:
 974        """
 975        Return the file path to the kwargs for the invoking `prompt()`.
 976        """
 977        return self.path / 'prompt_kwargs.json'
 978
 979    @property
 980    def log_offset_path(self) -> pathlib.Path:
 981        """
 982        Return the log offset file path.
 983        """
 984        from meerschaum.config.paths import LOGS_RESOURCES_PATH
 985        return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')
 986
 987    @property
 988    def log_offset_lock(self) -> 'fasteners.InterProcessLock':
 989        """
 990        Return the process lock context manager.
 991        """
 992        if '_log_offset_lock' in self.__dict__:
 993            return self._log_offset_lock
 994
 995        fasteners = attempt_import('fasteners')
 996        self._log_offset_lock = fasteners.InterProcessLock(self.log_offset_path)
 997        return self._log_offset_lock
 998
 999    @property
1000    def rotating_log(self) -> RotatingFile:
1001        """
1002        The rotating log file for the daemon's output.
1003        """
1004        if '_rotating_log' in self.__dict__:
1005            return self._rotating_log
1006
1007        logs_cf = self.properties.get('logs', None) or {}
1008        write_timestamps = logs_cf.get('write_timestamps', None)
1009        if write_timestamps is None:
1010            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
1011
1012        timestamp_format = logs_cf.get('timestamp_format', None)
1013        if timestamp_format is None:
1014            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
1015
1016        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
1017        if num_files_to_keep is None:
1018            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
1019
1020        max_file_size = logs_cf.get('max_file_size', None)
1021        if max_file_size is None:
1022            max_file_size = get_config('jobs', 'logs', 'max_file_size')
1023
1024        redirect_streams = logs_cf.get('redirect_streams', True)
1025
1026        self._rotating_log = RotatingFile(
1027            self.log_path,
1028            redirect_streams=redirect_streams,
1029            write_timestamps=write_timestamps,
1030            timestamp_format=timestamp_format,
1031            num_files_to_keep=num_files_to_keep,
1032            max_file_size=max_file_size,
1033        )
1034        return self._rotating_log
1035
1036    @property
1037    def stdin_file(self):
1038        """
1039        Return the file handler for the stdin file.
1040        """
1041        if (_stdin_file := self.__dict__.get('_stdin_file', None)):
1042            return _stdin_file
1043
1044        self._stdin_file = StdinFile(
1045            self.stdin_file_path,
1046            lock_file_path=self.blocking_stdin_file_path,
1047        )
1048        return self._stdin_file
1049
1050    @property
1051    def log_text(self) -> Union[str, None]:
1052        """
1053        Read the log files and return their contents.
1054        Returns `None` if the log file does not exist.
1055        """
1056        logs_cf = self.properties.get('logs', None) or {}
1057        write_timestamps = logs_cf.get('write_timestamps', None)
1058        if write_timestamps is None:
1059            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
1060
1061        timestamp_format = logs_cf.get('timestamp_format', None)
1062        if timestamp_format is None:
1063            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
1064
1065        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
1066        if num_files_to_keep is None:
1067            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
1068
1069        max_file_size = logs_cf.get('max_file_size', None)
1070        if max_file_size is None:
1071            max_file_size = get_config('jobs', 'logs', 'max_file_size')
1072
1073        new_rotating_log = RotatingFile(
1074            self.rotating_log.file_path,
1075            num_files_to_keep=num_files_to_keep,
1076            max_file_size=max_file_size,
1077            write_timestamps=write_timestamps,
1078            timestamp_format=timestamp_format,
1079        )
1080        return new_rotating_log.read()
1081
1082    def readlines(self) -> List[str]:
1083        """
1084        Read the next log lines, persisting the cursor for later use.
1085        Note this will alter the cursor of `self.rotating_log`.
1086        """
1087        self.rotating_log._cursor = self._read_log_offset()
1088        lines = self.rotating_log.readlines()
1089        self._write_log_offset()
1090        return lines
1091
1092    def _read_log_offset(self) -> Tuple[int, int]:
1093        """
1094        Return the current log offset cursor.
1095
1096        Returns
1097        -------
1098        A tuple of the form (`subfile_index`, `position`).
1099        """
1100        if not self.log_offset_path.exists():
1101            return 0, 0
1102
1103        try:
1104            with open(self.log_offset_path, 'r', encoding='utf-8') as f:
1105                cursor_text = f.read()
1106            cursor_parts = cursor_text.split(' ')
1107            subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1])
1108            return subfile_index, subfile_position
1109        except Exception as e:
1110            warn(f"Failed to read cursor:\n{e}")
1111        return 0, 0
1112
1113    def _write_log_offset(self) -> None:
1114        """
1115        Write the current log offset file.
1116        """
1117        with self.log_offset_lock:
1118            with open(self.log_offset_path, 'w+', encoding='utf-8') as f:
1119                subfile_index = self.rotating_log._cursor[0]
1120                subfile_position = self.rotating_log._cursor[1]
1121                f.write(f"{subfile_index} {subfile_position}")
1122
1123    @property
1124    def pid(self) -> Union[int, None]:
1125        """
1126        Read the PID file and return its contents.
1127        Returns `None` if the PID file does not exist.
1128        """
1129        if not self.pid_path.exists():
1130            return None
1131        try:
1132            with open(self.pid_path, 'r', encoding='utf-8') as f:
1133                text = f.read()
1134            if len(text) == 0:
1135                return None
1136            pid = int(text.rstrip())
1137        except Exception as e:
1138            warn(e)
1139            text = None
1140            pid = None
1141        return pid
1142
1143    @property
1144    def pid_path(self) -> pathlib.Path:
1145        """
1146        Return the path to a file containing the PID for this Daemon.
1147        """
1148        return self.path / 'process.pid'
1149
1150    @property
1151    def pid_lock(self) -> 'fasteners.InterProcessLock':
1152        """
1153        Return the process lock context manager.
1154        """
1155        if '_pid_lock' in self.__dict__:
1156            return self._pid_lock
1157
1158        fasteners = attempt_import('fasteners')
1159        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
1160        return self._pid_lock
1161
1162    @property
1163    def pickle_path(self) -> pathlib.Path:
1164        """
1165        Return the path for the pickle file.
1166        """
1167        return self.path / 'pickle.pkl'
1168
1169    def read_properties(self) -> Optional[Dict[str, Any]]:
1170        """Read the properties JSON file and return the dictionary."""
1171        if not self.properties_path.exists():
1172            return None
1173        try:
1174            with open(self.properties_path, 'r', encoding='utf-8') as file:
1175                properties = json.load(file)
1176        except Exception:
1177            properties = {}
1178        
1179        return properties or {}
1180
1181    def read_pickle(self) -> Daemon:
1182        """Read a Daemon's pickle file and return the `Daemon`."""
1183        import pickle
1184        import traceback
1185        if not self.pickle_path.exists():
1186            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")
1187
1188        if self.pickle_path.stat().st_size == 0:
1189            error(f"Pickle was empty for daemon '{self.daemon_id}'.")
1190
1191        try:
1192            with open(self.pickle_path, 'rb') as pickle_file:
1193                daemon = pickle.load(pickle_file)
1194            success, msg = True, 'Success'
1195        except Exception as e:
1196            success, msg = False, str(e)
1197            daemon = None
1198            traceback.print_exception(type(e), e, e.__traceback__)
1199        if not success:
1200            error(msg)
1201        return daemon
1202
1203    @property
1204    def properties(self) -> Dict[str, Any]:
1205        """
1206        Return the contents of the properties JSON file.
1207        """
1208        try:
1209            _file_properties = self.read_properties() or {}
1210        except Exception:
1211            traceback.print_exc()
1212            _file_properties = {}
1213
1214        if not self._properties:
1215            self._properties = _file_properties
1216
1217        if self._properties is None:
1218            self._properties = {}
1219
1220        if (
1221            self._properties.get('result', None) is None
1222            and _file_properties.get('result', None) is not None
1223        ):
1224            _ = self._properties.pop('result', None)
1225
1226        if _file_properties is not None:
1227            self._properties = apply_patch_to_config(
1228                _file_properties,
1229                self._properties,
1230            )
1231
1232        return self._properties
1233
1234    @property
1235    def hidden(self) -> bool:
1236        """
1237        Return a bool indicating whether this Daemon should be displayed.
1238        """
1239        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')
1240
1241    def write_properties(self) -> SuccessTuple:
1242        """Write the properties dictionary to the properties JSON file
1243        (only if self.properties exists).
1244        """
1245        from meerschaum.utils.misc import generate_password
1246        success, msg = (
1247            False,
1248            f"No properties to write for daemon '{self.daemon_id}'."
1249        )
1250        backup_path = self.properties_path.parent / (generate_password(8) + '.json')
1251        props = self.properties
1252        if props is not None:
1253            try:
1254                self.path.mkdir(parents=True, exist_ok=True)
1255                if self.properties_path.exists():
1256                    self.properties_path.rename(backup_path)
1257                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
1258                    json.dump(props, properties_file)
1259                success, msg = True, 'Success'
1260            except Exception as e:
1261                success, msg = False, str(e)
1262
1263        try:
1264            if backup_path.exists():
1265                if not success:
1266                    backup_path.rename(self.properties_path)
1267                else:
1268                    backup_path.unlink()
1269        except Exception as e:
1270            success, msg = False, str(e)
1271
1272        return success, msg
1273
1274    def write_pickle(self) -> SuccessTuple:
1275        """Write the pickle file for the daemon."""
1276        import pickle
1277        import traceback
1278        from meerschaum.utils.misc import generate_password
1279
1280        if not self.pickle:
1281            return True, "Success"
1282
1283        backup_path = self.pickle_path.parent / (generate_password(7) + '.pkl')
1284        try:
1285            self.path.mkdir(parents=True, exist_ok=True)
1286            if self.pickle_path.exists():
1287                self.pickle_path.rename(backup_path)
1288            with open(self.pickle_path, 'wb+') as pickle_file:
1289                pickle.dump(self, pickle_file)
1290            success, msg = True, "Success"
1291        except Exception as e:
1292            success, msg = False, str(e)
1293            traceback.print_exception(type(e), e, e.__traceback__)
1294        try:
1295            if backup_path.exists():
1296                if not success:
1297                    backup_path.rename(self.pickle_path)
1298                else:
1299                    backup_path.unlink()
1300        except Exception as e:
1301            success, msg = False, str(e)
1302        return success, msg
1303
1304
1305    def _setup(
1306        self,
1307        allow_dirty_run: bool = False,
1308    ) -> None:
1309        """
1310        Update properties before starting the Daemon.
1311        """
1312        if self.properties is None:
1313            self._properties = {}
1314
1315        self._properties.update({
1316            'target': {
1317                'name': self.target.__name__,
1318                'module': self.target.__module__,
1319                'args': self.target_args,
1320                'kw': self.target_kw,
1321            },
1322        })
1323        self.mkdir_if_not_exists(allow_dirty_run)
1324        _write_properties_success_tuple = self.write_properties()
1325        if not _write_properties_success_tuple[0]:
1326            error(_write_properties_success_tuple[1])
1327
1328        _write_pickle_success_tuple = self.write_pickle()
1329        if not _write_pickle_success_tuple[0]:
1330            error(_write_pickle_success_tuple[1])
1331
1332    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
1333        """
1334        Remove a daemon's directory after execution.
1335
1336        Parameters
1337        ----------
1338        keep_logs: bool, default False
1339            If `True`, skip deleting the daemon's log files.
1340
1341        Returns
1342        -------
1343        A `SuccessTuple` indicating success.
1344        """
1345        if self.path.exists():
1346            try:
1347                shutil.rmtree(self.path)
1348            except Exception as e:
1349                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
1350                warn(msg)
1351                return False, msg
1352        if not keep_logs:
1353            self.rotating_log.delete()
1354            try:
1355                if self.log_offset_path.exists():
1356                    self.log_offset_path.unlink()
1357            except Exception as e:
1358                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
1359                warn(msg)
1360                return False, msg
1361        return True, "Success"
1362
1363
1364    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
1365        """
1366        Return the timeout value to use. Use `--timeout-seconds` if provided,
1367        else the configured default (8).
1368        """
1369        if isinstance(timeout, (int, float)):
1370            return timeout
1371        return get_config('jobs', 'timeout_seconds')
1372
1373
1374    def get_check_timeout_interval_seconds(
1375        self,
1376        check_timeout_interval: Union[int, float, None] = None,
1377    ) -> Union[int, float]:
1378        """
1379        Return the interval value to check the status of timeouts.
1380        """
1381        if isinstance(check_timeout_interval, (int, float)):
1382            return check_timeout_interval
1383        return get_config('jobs', 'check_timeout_interval_seconds')
1384
1385    @property
1386    def target_args(self) -> Union[Tuple[Any], None]:
1387        """
1388        Return the positional arguments to pass to the target function.
1389        """
1390        target_args = (
1391            self.__dict__.get('_target_args', None)
1392            or self.properties.get('target', {}).get('args', None)
1393        )
1394        if target_args is None:
1395            return tuple([])
1396
1397        return tuple(target_args)
1398
1399    @property
1400    def target_kw(self) -> Union[Dict[str, Any], None]:
1401        """
1402        Return the keyword arguments to pass to the target function.
1403        """
1404        target_kw = (
1405            self.__dict__.get('_target_kw', None)
1406            or self.properties.get('target', {}).get('kw', None)
1407        )
1408        if target_kw is None:
1409            return {}
1410
1411        return {key: val for key, val in target_kw.items()}
1412
1413    def __getstate__(self):
1414        """
1415        Pickle this Daemon.
1416        """
1417        dill = attempt_import('dill')
1418        return {
1419            'target': dill.dumps(self.target),
1420            'target_args': self.target_args,
1421            'target_kw': self.target_kw,
1422            'daemon_id': self.daemon_id,
1423            'label': self.label,
1424            'properties': self.properties,
1425        }
1426
1427    def __setstate__(self, _state: Dict[str, Any]):
1428        """
1429        Restore this Daemon from a pickled state.
1430        If the properties file exists, skip the old pickled version.
1431        """
1432        dill = attempt_import('dill')
1433        _state['target'] = dill.loads(_state['target'])
1434        self._pickle = True
1435        daemon_id = _state.get('daemon_id', None)
1436        if not daemon_id:
1437            raise ValueError("Need a daemon_id to un-pickle a Daemon.")
1438
1439        properties_path = self._get_properties_path_from_daemon_id(daemon_id)
1440        ignore_properties = properties_path.exists()
1441        if ignore_properties:
1442            _state = {
1443                key: val
1444                for key, val in _state.items()
1445                if key != 'properties'
1446            }
1447        self.__init__(**_state)
1448
1449
1450    def __repr__(self):
1451        return str(self)
1452
1453    def __str__(self):
1454        return self.daemon_id
1455
1456    def __eq__(self, other):
1457        if not isinstance(other, Daemon):
1458            return False
1459        return self.daemon_id == other.daemon_id
1460
1461    def __hash__(self):
1462        return hash(self.daemon_id)

Daemonize Python functions into background processes.

Examples
>>> import meerschaum as mrsm
>>> from meerschaum.utils.daemons import Daemon
>>> daemon = Daemon(print, ('hi',))
>>> success, msg = daemon.run()
>>> print(daemon.log_text)

2024-07-29 18:03 | hi 2024-07-29 18:03 |

>>> daemon.run(allow_dirty_run=True)
>>> print(daemon.log_text)

2024-07-29 18:03 | hi 2024-07-29 18:03 | 2024-07-29 18:05 | hi 2024-07-29 18:05 |

>>> mrsm.pprint(daemon.properties)
{
    'label': 'print',
    'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
    'result': None,
    'process': {'ended': '2024-07-29T18:03:33.752806'}
}
Daemon( target: Optional[Callable[[Any], Any]] = None, target_args: Union[List[Any], Tuple[Any], NoneType] = None, target_kw: Optional[Dict[str, Any]] = None, env: Optional[Dict[str, str]] = None, daemon_id: Optional[str] = None, label: Optional[str] = None, properties: Optional[Dict[str, Any]] = None, pickle: bool = True)
136    def __init__(
137        self,
138        target: Optional[Callable[[Any], Any]] = None,
139        target_args: Union[List[Any], Tuple[Any], None] = None,
140        target_kw: Optional[Dict[str, Any]] = None,
141        env: Optional[Dict[str, str]] = None,
142        daemon_id: Optional[str] = None,
143        label: Optional[str] = None,
144        properties: Optional[Dict[str, Any]] = None,
145        pickle: bool = True,
146    ):
147        """
148        Parameters
149        ----------
150        target: Optional[Callable[[Any], Any]], default None,
151            The function to execute in a child process.
152
153        target_args: Union[List[Any], Tuple[Any], None], default None
154            Positional arguments to pass to the target function.
155
156        target_kw: Optional[Dict[str, Any]], default None
157            Keyword arguments to pass to the target function.
158
159        env: Optional[Dict[str, str]], default None
160            If provided, set these environment variables in the daemon process.
161
162        daemon_id: Optional[str], default None
163            Build a `Daemon` from an existing `daemon_id`.
164            If `daemon_id` is provided, other arguments are ignored and are derived
165            from the existing pickled `Daemon`.
166
167        label: Optional[str], default None
168            Label string to help identifiy a daemon.
169            If `None`, use the function name instead.
170
171        properties: Optional[Dict[str, Any]], default None
172            Override reading from the properties JSON by providing an existing dictionary.
173        """
174        _pickle = self.__dict__.get('_pickle', False)
175        if daemon_id is not None:
176            self.daemon_id = daemon_id
177            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):
178
179                if not self.properties_path.exists():
180                    raise Exception(
181                        f"Daemon '{self.daemon_id}' does not exist. "
182                        + "Pass a target to create a new Daemon."
183                    )
184
185                try:
186                    new_daemon = self.from_properties_file(daemon_id)
187                except Exception:
188                    new_daemon = None
189
190                if new_daemon is not None:
191                    new_daemon.write_pickle()
192                    target = new_daemon.target
193                    target_args = new_daemon.target_args
194                    target_kw = new_daemon.target_kw
195                    label = new_daemon.label
196                    self._properties = new_daemon.properties
197                else:
198                    try:
199                        self.properties_path.unlink()
200                    except Exception:
201                        pass
202
203                    raise Exception(
204                        f"Could not recover daemon '{self.daemon_id}' "
205                        + "from its properties file."
206                    )
207
208        if 'target' not in self.__dict__:
209            if target is None:
210                error("Cannot create a Daemon without a target.")
211            self.target = target
212
213        self.pickle = pickle
214
215        ### NOTE: We have to check self.__dict__ in case we un-pickling.
216        if '_target_args' not in self.__dict__:
217            self._target_args = target_args
218        if '_target_kw' not in self.__dict__:
219            self._target_kw = target_kw
220
221        if 'label' not in self.__dict__:
222            if label is None:
223                label = (
224                    self.target.__name__ if '__name__' in self.target.__dir__()
225                        else str(self.target)
226                )
227            self.label = label
228        elif label is not None:
229            self.label = label
230
231        if 'daemon_id' not in self.__dict__:
232            self.daemon_id = get_new_daemon_name()
233        if '_properties' not in self.__dict__:
234            self._properties = properties
235        elif properties:
236            if self._properties is None:
237                self._properties = {}
238            self._properties.update(properties)
239        if self._properties is None:
240            self._properties = {}
241
242        self._properties.update({'label': self.label})
243        if env:
244            self._properties.update({'env': env})
245
246        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
247        _ = self.process
Parameters
  • target (Optional[Callable[[Any], Any]], default None,): The function to execute in a child process.
  • target_args (Union[List[Any], Tuple[Any], None], default None): Positional arguments to pass to the target function.
  • target_kw (Optional[Dict[str, Any]], default None): Keyword arguments to pass to the target function.
  • env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the daemon process.
  • daemon_id (Optional[str], default None): Build a Daemon from an existing daemon_id. If daemon_id is provided, other arguments are ignored and are derived from the existing pickled Daemon.
  • label (Optional[str], default None): Label string to help identifiy a daemon. If None, use the function name instead.
  • properties (Optional[Dict[str, Any]], default None): Override reading from the properties JSON by providing an existing dictionary.
@classmethod
def from_properties_file(cls, daemon_id: str) -> Daemon:
 89    @classmethod
 90    def from_properties_file(cls, daemon_id: str) -> Daemon:
 91        """
 92        Return a Daemon from a properties dictionary.
 93        """
 94        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
 95        if not properties_path.exists():
 96            raise OSError(f"Properties file '{properties_path}' does not exist.")
 97
 98        try:
 99            with open(properties_path, 'r', encoding='utf-8') as f:
100                properties = json.load(f)
101        except Exception:
102            properties = {}
103
104        if not properties:
105            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")
106
107        daemon_id = properties_path.parent.name
108        target_cf = properties.get('target', {})
109        target_module_name = target_cf.get('module', None)
110        target_function_name = target_cf.get('name', None)
111        target_args = target_cf.get('args', None)
112        target_kw = target_cf.get('kw', None)
113        label = properties.get('label', None)
114
115        if None in [
116            target_module_name,
117            target_function_name,
118            target_args,
119            target_kw,
120        ]:
121            raise ValueError("Missing target function information.")
122
123        target_module = importlib.import_module(target_module_name)
124        target_function = getattr(target_module, target_function_name)
125
126        return Daemon(
127            daemon_id=daemon_id,
128            target=target_function,
129            target_args=target_args,
130            target_kw=target_kw,
131            properties=properties,
132            label=label,
133        )

Return a Daemon from a properties dictionary.

pickle
def run( self, keep_daemon_output: bool = True, allow_dirty_run: bool = False, wait: bool = False, timeout: Union[int, float] = 4, debug: bool = False) -> Tuple[bool, str]:
425    def run(
426        self,
427        keep_daemon_output: bool = True,
428        allow_dirty_run: bool = False,
429        wait: bool = False,
430        timeout: Union[int, float] = 4,
431        debug: bool = False,
432    ) -> SuccessTuple:
433        """Run the daemon as a child process and continue executing the parent.
434
435        Parameters
436        ----------
437        keep_daemon_output: bool, default True
438            If `False`, delete the daemon's output directory upon exiting.
439
440        allow_dirty_run: bool, default False
441            If `True`, run the daemon, even if the `daemon_id` directory exists.
442            This option is dangerous because if the same `daemon_id` runs concurrently,
443            the last to finish will overwrite the output of the first.
444
445        wait: bool, default True
446            If `True`, block until `Daemon.status` is running (or the timeout expires).
447
448        timeout: Union[int, float], default 4
449            If `wait` is `True`, block for up to `timeout` seconds before returning a failure.
450
451        Returns
452        -------
453        A SuccessTuple indicating success.
454
455        """
456        import platform
457        if platform.system() == 'Windows':
458            return False, "Cannot run background jobs on Windows."
459
460        ### The daemon might exist and be paused.
461        if self.status == 'paused':
462            return self.resume()
463
464        self._remove_stop_file()
465        if self.status == 'running':
466            return True, f"Daemon '{self}' is already running."
467
468        self.mkdir_if_not_exists(allow_dirty_run)
469        _write_pickle_success_tuple = self.write_pickle()
470        if not _write_pickle_success_tuple[0]:
471            return _write_pickle_success_tuple
472
473        _launch_daemon_code = (
474            "from meerschaum.utils.daemon import Daemon, _daemons; "
475            f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
476            f"_daemons['{self.daemon_id}'] = daemon; "
477            f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
478            "allow_dirty_run=True)"
479        )
480        env = dict(os.environ)
481        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
482        msg = (
483            "Success"
484            if _launch_success_bool
485            else f"Failed to start daemon '{self.daemon_id}'."
486        )
487        if not wait or not _launch_success_bool:
488            return _launch_success_bool, msg
489
490        timeout = self.get_timeout_seconds(timeout)
491        check_timeout_interval = self.get_check_timeout_interval_seconds()
492
493        if not timeout:
494            success = self.status == 'running'
495            msg = "Success" if success else f"Failed to run daemon '{self.daemon_id}'."
496            if success:
497                self._capture_process_timestamp('began')
498            return success, msg
499
500        begin = time.perf_counter()
501        while (time.perf_counter() - begin) < timeout:
502            if self.status == 'running':
503                self._capture_process_timestamp('began')
504                return True, "Success"
505            time.sleep(check_timeout_interval)
506
507        return False, (
508            f"Failed to start daemon '{self.daemon_id}' within {timeout} second"
509            + ('s' if timeout != 1 else '') + '.'
510        )

Run the daemon as a child process and continue executing the parent.

Parameters
  • keep_daemon_output (bool, default True): If False, delete the daemon's output directory upon exiting.
  • allow_dirty_run (bool, default False): If True, run the daemon, even if the daemon_id directory exists. This option is dangerous because if the same daemon_id runs concurrently, the last to finish will overwrite the output of the first.
  • wait (bool, default True): If True, block until Daemon.status is running (or the timeout expires).
  • timeout (Union[int, float], default 4): If wait is True, block for up to timeout seconds before returning a failure.
Returns
  • A SuccessTuple indicating success.
def kill(self, timeout: Union[int, float, NoneType] = 8) -> Tuple[bool, str]:
513    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
514        """
515        Forcibly terminate a running daemon.
516        Sends a SIGTERM signal to the process.
517
518        Parameters
519        ----------
520        timeout: Optional[int], default 3
521            How many seconds to wait for the process to terminate.
522
523        Returns
524        -------
525        A SuccessTuple indicating success.
526        """
527        if self.status != 'paused':
528            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
529            if success:
530                self._write_stop_file('kill')
531                self.stdin_file.close()
532                self._remove_blocking_stdin_file()
533                return success, msg
534
535        if self.status == 'stopped':
536            self._write_stop_file('kill')
537            self.stdin_file.close()
538            self._remove_blocking_stdin_file()
539            return True, "Process has already stopped."
540
541        psutil = attempt_import('psutil')
542        process = self.process
543        try:
544            process.terminate()
545            process.kill()
546            process.wait(timeout=timeout)
547        except Exception as e:
548            return False, f"Failed to kill job {self} ({process}) with exception: {e}"
549
550        try:
551            if process.status():
552                return False, "Failed to stop daemon '{self}' ({process})."
553        except psutil.NoSuchProcess:
554            pass
555
556        if self.pid_path.exists():
557            try:
558                self.pid_path.unlink()
559            except Exception:
560                pass
561
562        self._write_stop_file('kill')
563        self.stdin_file.close()
564        self._remove_blocking_stdin_file()
565        return True, "Success"

Forcibly terminate a running daemon. Sends a SIGTERM signal to the process.

Parameters
  • timeout (Optional[int], default 3): How many seconds to wait for the process to terminate.
Returns
  • A SuccessTuple indicating success.
def quit(self, timeout: Union[int, float, NoneType] = None) -> Tuple[bool, str]:
567    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
568        """Gracefully quit a running daemon."""
569        if self.status == 'paused':
570            return self.kill(timeout)
571
572        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
573        if signal_success:
574            self._write_stop_file('quit')
575            self.stdin_file.close()
576            self._remove_blocking_stdin_file()
577        return signal_success, signal_msg

Gracefully quit a running daemon.

def pause( self, timeout: Union[int, float, NoneType] = None, check_timeout_interval: Union[float, int, NoneType] = None) -> Tuple[bool, str]:
579    def pause(
580        self,
581        timeout: Union[int, float, None] = None,
582        check_timeout_interval: Union[float, int, None] = None,
583    ) -> SuccessTuple:
584        """
585        Pause the daemon if it is running.
586
587        Parameters
588        ----------
589        timeout: Union[float, int, None], default None
590            The maximum number of seconds to wait for a process to suspend.
591
592        check_timeout_interval: Union[float, int, None], default None
593            The number of seconds to wait between checking if the process is still running.
594
595        Returns
596        -------
597        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
598        """
599        self._remove_blocking_stdin_file()
600
601        if self.process is None:
602            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."
603
604        if self.status == 'paused':
605            return True, f"Daemon '{self.daemon_id}' is already paused."
606
607        self._write_stop_file('pause')
608        self.stdin_file.close()
609        self._remove_blocking_stdin_file()
610        try:
611            self.process.suspend()
612        except Exception as e:
613            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"
614
615        timeout = self.get_timeout_seconds(timeout)
616        check_timeout_interval = self.get_check_timeout_interval_seconds(
617            check_timeout_interval
618        )
619
620        psutil = attempt_import('psutil')
621
622        if not timeout:
623            try:
624                success = self.process.status() == 'stopped'
625            except psutil.NoSuchProcess:
626                success = True
627            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
628            if success:
629                self._capture_process_timestamp('paused')
630            return success, msg
631
632        begin = time.perf_counter()
633        while (time.perf_counter() - begin) < timeout:
634            try:
635                if self.process.status() == 'stopped':
636                    self._capture_process_timestamp('paused')
637                    return True, "Success"
638            except psutil.NoSuchProcess as e:
639                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
640            time.sleep(check_timeout_interval)
641
642        return False, (
643            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
644            + ('s' if timeout != 1 else '') + '.'
645        )

Pause the daemon if it is running.

Parameters
  • timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to suspend.
  • check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still running.
Returns
  • A SuccessTuple indicating whether the Daemon process was successfully suspended.
def resume( self, timeout: Union[int, float, NoneType] = None, check_timeout_interval: Union[float, int, NoneType] = None) -> Tuple[bool, str]:
647    def resume(
648        self,
649        timeout: Union[int, float, None] = None,
650        check_timeout_interval: Union[float, int, None] = None,
651    ) -> SuccessTuple:
652        """
653        Resume the daemon if it is paused.
654
655        Parameters
656        ----------
657        timeout: Union[float, int, None], default None
658            The maximum number of seconds to wait for a process to resume.
659
660        check_timeout_interval: Union[float, int, None], default None
661            The number of seconds to wait between checking if the process is still stopped.
662
663        Returns
664        -------
665        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
666        """
667        if self.status == 'running':
668            return True, f"Daemon '{self.daemon_id}' is already running."
669
670        if self.status == 'stopped':
671            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."
672
673        self._remove_stop_file()
674        try:
675            if self.process is None:
676                return False, f"Cannot resume daemon '{self.daemon_id}'."
677
678            self.process.resume()
679        except Exception as e:
680            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"
681
682        timeout = self.get_timeout_seconds(timeout)
683        check_timeout_interval = self.get_check_timeout_interval_seconds(
684            check_timeout_interval
685        )
686
687        if not timeout:
688            success = self.status == 'running'
689            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
690            if success:
691                self._capture_process_timestamp('began')
692            return success, msg
693
694        begin = time.perf_counter()
695        while (time.perf_counter() - begin) < timeout:
696            if self.status == 'running':
697                self._capture_process_timestamp('began')
698                return True, "Success"
699            time.sleep(check_timeout_interval)
700
701        return False, (
702            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
703            + ('s' if timeout != 1 else '') + '.'
704        )

Resume the daemon if it is paused.

Parameters
  • timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to resume.
  • check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still stopped.
Returns
  • A SuccessTuple indicating whether the Daemon process was successfully resumed.
def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
837    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
838        """Create the Daemon's directory.
839        If `allow_dirty_run` is `False` and the directory already exists,
840        raise a `FileExistsError`.
841        """
842        try:
843            self.path.mkdir(parents=True, exist_ok=True)
844            _already_exists = any(os.scandir(self.path))
845        except FileExistsError:
846            _already_exists = True
847
848        if _already_exists and not allow_dirty_run:
849            error(
850                f"Daemon '{self.daemon_id}' already exists. " +
851                "To allow this daemon to run, do one of the following:\n"
852                + "  - Execute `daemon.cleanup()`.\n"
853                + f"  - Delete the directory '{self.path}'.\n"
854                + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
855                FileExistsError,
856            )

Create the Daemon's directory. If allow_dirty_run is False and the directory already exists, raise a FileExistsError.

process: "Union['psutil.Process', None]"
858    @property
859    def process(self) -> Union['psutil.Process', None]:
860        """
861        Return the psutil process for the Daemon.
862        """
863        psutil = attempt_import('psutil')
864        pid = self.pid
865        if pid is None:
866            return None
867        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
868            try:
869                self._process = psutil.Process(int(pid))
870                process_exists = True
871            except Exception:
872                process_exists = False
873            if not process_exists:
874                _ = self.__dict__.pop('_process', None)
875                try:
876                    if self.pid_path.exists():
877                        self.pid_path.unlink()
878                except Exception:
879                    pass
880                return None
881        return self._process

Return the psutil process for the Daemon.

status: str
883    @property
884    def status(self) -> str:
885        """
886        Return the running status of this Daemon.
887        """
888        if self.process is None:
889            return 'stopped'
890
891        psutil = attempt_import('psutil', lazy=False)
892        try:
893            if self.process.status() == 'stopped':
894                return 'paused'
895            if self.process.status() == 'zombie':
896                raise psutil.NoSuchProcess(self.process.pid)
897        except (psutil.NoSuchProcess, AttributeError):
898            if self.pid_path.exists():
899                try:
900                    self.pid_path.unlink()
901                except Exception:
902                    pass
903            return 'stopped'
904
905        return 'running'

Return the running status of this Daemon.

path: pathlib.Path
915    @property
916    def path(self) -> pathlib.Path:
917        """
918        Return the path for this Daemon's directory.
919        """
920        return self._get_path_from_daemon_id(self.daemon_id)

Return the path for this Daemon's directory.

properties_path: pathlib.Path
929    @property
930    def properties_path(self) -> pathlib.Path:
931        """
932        Return the `propterties.json` path for this Daemon.
933        """
934        return self._get_properties_path_from_daemon_id(self.daemon_id)

Return the propterties.json path for this Daemon.

stop_path: pathlib.Path
936    @property
937    def stop_path(self) -> pathlib.Path:
938        """
939        Return the path for the stop file (created when manually stopped).
940        """
941        return self.path / '.stop.json'

Return the path for the stop file (created when manually stopped).

log_path: pathlib.Path
943    @property
944    def log_path(self) -> pathlib.Path:
945        """
946        Return the log path.
947        """
948        logs_cf = self.properties.get('logs', None) or {}
949        if 'path' not in logs_cf:
950            from meerschaum.config.paths import LOGS_RESOURCES_PATH
951            return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')
952
953        return pathlib.Path(logs_cf['path'])

Return the log path.

stdin_file_path: pathlib.Path
955    @property
956    def stdin_file_path(self) -> pathlib.Path:
957        """
958        Return the stdin file path.
959        """
960        return self.path / 'input.stdin'

Return the stdin file path.

blocking_stdin_file_path: pathlib.Path
962    @property
963    def blocking_stdin_file_path(self) -> pathlib.Path:
964        """
965        Return the stdin file path.
966        """
967        if '_blocking_stdin_file_path' in self.__dict__:
968            return self._blocking_stdin_file_path
969
970        return self.path / 'input.stdin.block'

Return the stdin file path.

prompt_kwargs_file_path: pathlib.Path
972    @property
973    def prompt_kwargs_file_path(self) -> pathlib.Path:
974        """
975        Return the file path to the kwargs for the invoking `prompt()`.
976        """
977        return self.path / 'prompt_kwargs.json'

Return the file path to the kwargs for the invoking prompt().

log_offset_path: pathlib.Path
979    @property
980    def log_offset_path(self) -> pathlib.Path:
981        """
982        Return the log offset file path.
983        """
984        from meerschaum.config.paths import LOGS_RESOURCES_PATH
985        return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')

Return the log offset file path.

log_offset_lock: "'fasteners.InterProcessLock'"
987    @property
988    def log_offset_lock(self) -> 'fasteners.InterProcessLock':
989        """
990        Return the process lock context manager.
991        """
992        if '_log_offset_lock' in self.__dict__:
993            return self._log_offset_lock
994
995        fasteners = attempt_import('fasteners')
996        self._log_offset_lock = fasteners.InterProcessLock(self.log_offset_path)
997        return self._log_offset_lock

Return the process lock context manager.

rotating_log: meerschaum.utils.daemon.RotatingFile
 999    @property
1000    def rotating_log(self) -> RotatingFile:
1001        """
1002        The rotating log file for the daemon's output.
1003        """
1004        if '_rotating_log' in self.__dict__:
1005            return self._rotating_log
1006
1007        logs_cf = self.properties.get('logs', None) or {}
1008        write_timestamps = logs_cf.get('write_timestamps', None)
1009        if write_timestamps is None:
1010            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
1011
1012        timestamp_format = logs_cf.get('timestamp_format', None)
1013        if timestamp_format is None:
1014            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
1015
1016        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
1017        if num_files_to_keep is None:
1018            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
1019
1020        max_file_size = logs_cf.get('max_file_size', None)
1021        if max_file_size is None:
1022            max_file_size = get_config('jobs', 'logs', 'max_file_size')
1023
1024        redirect_streams = logs_cf.get('redirect_streams', True)
1025
1026        self._rotating_log = RotatingFile(
1027            self.log_path,
1028            redirect_streams=redirect_streams,
1029            write_timestamps=write_timestamps,
1030            timestamp_format=timestamp_format,
1031            num_files_to_keep=num_files_to_keep,
1032            max_file_size=max_file_size,
1033        )
1034        return self._rotating_log

The rotating log file for the daemon's output.

stdin_file
1036    @property
1037    def stdin_file(self):
1038        """
1039        Return the file handler for the stdin file.
1040        """
1041        if (_stdin_file := self.__dict__.get('_stdin_file', None)):
1042            return _stdin_file
1043
1044        self._stdin_file = StdinFile(
1045            self.stdin_file_path,
1046            lock_file_path=self.blocking_stdin_file_path,
1047        )
1048        return self._stdin_file

Return the file handler for the stdin file.

log_text: Optional[str]
1050    @property
1051    def log_text(self) -> Union[str, None]:
1052        """
1053        Read the log files and return their contents.
1054        Returns `None` if the log file does not exist.
1055        """
1056        logs_cf = self.properties.get('logs', None) or {}
1057        write_timestamps = logs_cf.get('write_timestamps', None)
1058        if write_timestamps is None:
1059            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
1060
1061        timestamp_format = logs_cf.get('timestamp_format', None)
1062        if timestamp_format is None:
1063            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
1064
1065        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
1066        if num_files_to_keep is None:
1067            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
1068
1069        max_file_size = logs_cf.get('max_file_size', None)
1070        if max_file_size is None:
1071            max_file_size = get_config('jobs', 'logs', 'max_file_size')
1072
1073        new_rotating_log = RotatingFile(
1074            self.rotating_log.file_path,
1075            num_files_to_keep=num_files_to_keep,
1076            max_file_size=max_file_size,
1077            write_timestamps=write_timestamps,
1078            timestamp_format=timestamp_format,
1079        )
1080        return new_rotating_log.read()

Read the log files and return their contents. Returns None if the log file does not exist.

def readlines(self) -> List[str]:
1082    def readlines(self) -> List[str]:
1083        """
1084        Read the next log lines, persisting the cursor for later use.
1085        Note this will alter the cursor of `self.rotating_log`.
1086        """
1087        self.rotating_log._cursor = self._read_log_offset()
1088        lines = self.rotating_log.readlines()
1089        self._write_log_offset()
1090        return lines

Read the next log lines, persisting the cursor for later use. Note this will alter the cursor of self.rotating_log.

pid: Optional[int]
1123    @property
1124    def pid(self) -> Union[int, None]:
1125        """
1126        Read the PID file and return its contents.
1127        Returns `None` if the PID file does not exist.
1128        """
1129        if not self.pid_path.exists():
1130            return None
1131        try:
1132            with open(self.pid_path, 'r', encoding='utf-8') as f:
1133                text = f.read()
1134            if len(text) == 0:
1135                return None
1136            pid = int(text.rstrip())
1137        except Exception as e:
1138            warn(e)
1139            text = None
1140            pid = None
1141        return pid

Read the PID file and return its contents. Returns None if the PID file does not exist.

pid_path: pathlib.Path
1143    @property
1144    def pid_path(self) -> pathlib.Path:
1145        """
1146        Return the path to a file containing the PID for this Daemon.
1147        """
1148        return self.path / 'process.pid'

Return the path to a file containing the PID for this Daemon.

pid_lock: "'fasteners.InterProcessLock'"
1150    @property
1151    def pid_lock(self) -> 'fasteners.InterProcessLock':
1152        """
1153        Return the process lock context manager.
1154        """
1155        if '_pid_lock' in self.__dict__:
1156            return self._pid_lock
1157
1158        fasteners = attempt_import('fasteners')
1159        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
1160        return self._pid_lock

Return the process lock context manager.

pickle_path: pathlib.Path
1162    @property
1163    def pickle_path(self) -> pathlib.Path:
1164        """
1165        Return the path for the pickle file.
1166        """
1167        return self.path / 'pickle.pkl'

Return the path for the pickle file.

def read_properties(self) -> Optional[Dict[str, Any]]:
1169    def read_properties(self) -> Optional[Dict[str, Any]]:
1170        """Read the properties JSON file and return the dictionary."""
1171        if not self.properties_path.exists():
1172            return None
1173        try:
1174            with open(self.properties_path, 'r', encoding='utf-8') as file:
1175                properties = json.load(file)
1176        except Exception:
1177            properties = {}
1178        
1179        return properties or {}

Read the properties JSON file and return the dictionary.

def read_pickle(self) -> Daemon:
1181    def read_pickle(self) -> Daemon:
1182        """Read a Daemon's pickle file and return the `Daemon`."""
1183        import pickle
1184        import traceback
1185        if not self.pickle_path.exists():
1186            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")
1187
1188        if self.pickle_path.stat().st_size == 0:
1189            error(f"Pickle was empty for daemon '{self.daemon_id}'.")
1190
1191        try:
1192            with open(self.pickle_path, 'rb') as pickle_file:
1193                daemon = pickle.load(pickle_file)
1194            success, msg = True, 'Success'
1195        except Exception as e:
1196            success, msg = False, str(e)
1197            daemon = None
1198            traceback.print_exception(type(e), e, e.__traceback__)
1199        if not success:
1200            error(msg)
1201        return daemon

Read a Daemon's pickle file and return the Daemon.

properties: Dict[str, Any]
1203    @property
1204    def properties(self) -> Dict[str, Any]:
1205        """
1206        Return the contents of the properties JSON file.
1207        """
1208        try:
1209            _file_properties = self.read_properties() or {}
1210        except Exception:
1211            traceback.print_exc()
1212            _file_properties = {}
1213
1214        if not self._properties:
1215            self._properties = _file_properties
1216
1217        if self._properties is None:
1218            self._properties = {}
1219
1220        if (
1221            self._properties.get('result', None) is None
1222            and _file_properties.get('result', None) is not None
1223        ):
1224            _ = self._properties.pop('result', None)
1225
1226        if _file_properties is not None:
1227            self._properties = apply_patch_to_config(
1228                _file_properties,
1229                self._properties,
1230            )
1231
1232        return self._properties

Return the contents of the properties JSON file.

hidden: bool
1234    @property
1235    def hidden(self) -> bool:
1236        """
1237        Return a bool indicating whether this Daemon should be displayed.
1238        """
1239        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')

Return a bool indicating whether this Daemon should be displayed.

def write_properties(self) -> Tuple[bool, str]:
1241    def write_properties(self) -> SuccessTuple:
1242        """Write the properties dictionary to the properties JSON file
1243        (only if self.properties exists).
1244        """
1245        from meerschaum.utils.misc import generate_password
1246        success, msg = (
1247            False,
1248            f"No properties to write for daemon '{self.daemon_id}'."
1249        )
1250        backup_path = self.properties_path.parent / (generate_password(8) + '.json')
1251        props = self.properties
1252        if props is not None:
1253            try:
1254                self.path.mkdir(parents=True, exist_ok=True)
1255                if self.properties_path.exists():
1256                    self.properties_path.rename(backup_path)
1257                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
1258                    json.dump(props, properties_file)
1259                success, msg = True, 'Success'
1260            except Exception as e:
1261                success, msg = False, str(e)
1262
1263        try:
1264            if backup_path.exists():
1265                if not success:
1266                    backup_path.rename(self.properties_path)
1267                else:
1268                    backup_path.unlink()
1269        except Exception as e:
1270            success, msg = False, str(e)
1271
1272        return success, msg

Write the properties dictionary to the properties JSON file (only if self.properties exists).

def write_pickle(self) -> Tuple[bool, str]:
1274    def write_pickle(self) -> SuccessTuple:
1275        """Write the pickle file for the daemon."""
1276        import pickle
1277        import traceback
1278        from meerschaum.utils.misc import generate_password
1279
1280        if not self.pickle:
1281            return True, "Success"
1282
1283        backup_path = self.pickle_path.parent / (generate_password(7) + '.pkl')
1284        try:
1285            self.path.mkdir(parents=True, exist_ok=True)
1286            if self.pickle_path.exists():
1287                self.pickle_path.rename(backup_path)
1288            with open(self.pickle_path, 'wb+') as pickle_file:
1289                pickle.dump(self, pickle_file)
1290            success, msg = True, "Success"
1291        except Exception as e:
1292            success, msg = False, str(e)
1293            traceback.print_exception(type(e), e, e.__traceback__)
1294        try:
1295            if backup_path.exists():
1296                if not success:
1297                    backup_path.rename(self.pickle_path)
1298                else:
1299                    backup_path.unlink()
1300        except Exception as e:
1301            success, msg = False, str(e)
1302        return success, msg

Write the pickle file for the daemon.

def cleanup(self, keep_logs: bool = False) -> Tuple[bool, str]:
1332    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
1333        """
1334        Remove a daemon's directory after execution.
1335
1336        Parameters
1337        ----------
1338        keep_logs: bool, default False
1339            If `True`, skip deleting the daemon's log files.
1340
1341        Returns
1342        -------
1343        A `SuccessTuple` indicating success.
1344        """
1345        if self.path.exists():
1346            try:
1347                shutil.rmtree(self.path)
1348            except Exception as e:
1349                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
1350                warn(msg)
1351                return False, msg
1352        if not keep_logs:
1353            self.rotating_log.delete()
1354            try:
1355                if self.log_offset_path.exists():
1356                    self.log_offset_path.unlink()
1357            except Exception as e:
1358                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
1359                warn(msg)
1360                return False, msg
1361        return True, "Success"

Remove a daemon's directory after execution.

Parameters
  • keep_logs (bool, default False): If True, skip deleting the daemon's log files.
Returns
  • A SuccessTuple indicating success.
def get_timeout_seconds(self, timeout: Union[int, float, NoneType] = None) -> Union[int, float]:
1364    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
1365        """
1366        Return the timeout value to use. Use `--timeout-seconds` if provided,
1367        else the configured default (8).
1368        """
1369        if isinstance(timeout, (int, float)):
1370            return timeout
1371        return get_config('jobs', 'timeout_seconds')

Return the timeout value to use. Use --timeout-seconds if provided, else the configured default (8).

def get_check_timeout_interval_seconds( self, check_timeout_interval: Union[int, float, NoneType] = None) -> Union[int, float]:
1374    def get_check_timeout_interval_seconds(
1375        self,
1376        check_timeout_interval: Union[int, float, None] = None,
1377    ) -> Union[int, float]:
1378        """
1379        Return the interval value to check the status of timeouts.
1380        """
1381        if isinstance(check_timeout_interval, (int, float)):
1382            return check_timeout_interval
1383        return get_config('jobs', 'check_timeout_interval_seconds')

Return the interval value to check the status of timeouts.

target_args: Optional[Tuple[Any]]
1385    @property
1386    def target_args(self) -> Union[Tuple[Any], None]:
1387        """
1388        Return the positional arguments to pass to the target function.
1389        """
1390        target_args = (
1391            self.__dict__.get('_target_args', None)
1392            or self.properties.get('target', {}).get('args', None)
1393        )
1394        if target_args is None:
1395            return tuple([])
1396
1397        return tuple(target_args)

Return the positional arguments to pass to the target function.

target_kw: Optional[Dict[str, Any]]
1399    @property
1400    def target_kw(self) -> Union[Dict[str, Any], None]:
1401        """
1402        Return the keyword arguments to pass to the target function.
1403        """
1404        target_kw = (
1405            self.__dict__.get('_target_kw', None)
1406            or self.properties.get('target', {}).get('kw', None)
1407        )
1408        if target_kw is None:
1409            return {}
1410
1411        return {key: val for key, val in target_kw.items()}

Return the keyword arguments to pass to the target function.