meerschaum.utils.daemon.Daemon

Manage running daemons via the Daemon class.

   1#! /usr/bin/env python3
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4
   5"""
   6Manage running daemons via the Daemon class.
   7"""
   8
   9from __future__ import annotations
  10import os
  11import importlib
  12import pathlib
  13import json
  14import shutil
  15import signal
  16import time
  17import traceback
  18from functools import partial
  19from datetime import datetime, timezone
  20
  21import meerschaum as mrsm
  22from meerschaum.utils.typing import (
  23    Optional, Dict, Any, SuccessTuple, Callable, List, Union,
  24    is_success_tuple, Tuple,
  25)
  26from meerschaum.config import get_config
  27from meerschaum._internal.static import STATIC_CONFIG
  28from meerschaum.config._patch import apply_patch_to_config
  29from meerschaum.utils.warnings import warn, error
  30from meerschaum.utils.packages import attempt_import
  31from meerschaum.utils.venv import venv_exec
  32from meerschaum.utils.daemon._names import get_new_daemon_name
  33from meerschaum.utils.daemon.RotatingFile import RotatingFile
  34from meerschaum.utils.daemon.StdinFile import StdinFile
  35from meerschaum.utils.threading import RepeatTimer
  36from meerschaum.__main__ import _close_pools
  37
### Daemons that have started running in this process
### (`Daemon._run_exit()` appends `self` here).
_daemons = []
### Results of executed daemon targets, keyed by `daemon_id`
### (`Daemon._run_exit()` stores the target's result here when it finishes).
_results = {}
  40
  41class Daemon:
  42    """
  43    Daemonize Python functions into background processes.
  44
  45    Examples
  46    --------
  47    >>> import meerschaum as mrsm
  48    >>> from meerschaum.utils.daemon import Daemon
  49    >>> daemon = Daemon(print, ('hi',))
  50    >>> success, msg = daemon.run()
  51    >>> print(daemon.log_text)
  52
  53    2024-07-29 18:03 | hi
  54    2024-07-29 18:03 |
  55    >>> daemon.run(allow_dirty_run=True)
  56    >>> print(daemon.log_text)
  57
  58    2024-07-29 18:03 | hi
  59    2024-07-29 18:03 |
  60    2024-07-29 18:05 | hi
  61    2024-07-29 18:05 |
  62    >>> mrsm.pprint(daemon.properties)
  63    {
  64        'label': 'print',
  65        'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
  66        'result': None,
  67        'process': {'ended': '2024-07-29T18:03:33.752806'}
  68    }
  69
  70    """
  71
  72    def __new__(
  73        cls,
  74        *args,
  75        daemon_id: Optional[str] = None,
  76        **kw
  77    ):
  78        """
  79        If a daemon_id is provided and already exists, read from its pickle file.
  80        """
  81        instance = super(Daemon, cls).__new__(cls)
  82        if daemon_id is not None:
  83            instance.daemon_id = daemon_id
  84            if instance.pickle_path.exists():
  85                instance = instance.read_pickle()
  86        return instance
  87
  88    @classmethod
  89    def from_properties_file(cls, daemon_id: str) -> Daemon:
  90        """
  91        Return a Daemon from a properties dictionary.
  92        """
  93        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
  94        if not properties_path.exists():
  95            raise OSError(f"Properties file '{properties_path}' does not exist.")
  96
  97        try:
  98            with open(properties_path, 'r', encoding='utf-8') as f:
  99                properties = json.load(f)
 100        except Exception:
 101            properties = {}
 102
 103        if not properties:
 104            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")
 105
 106        daemon_id = properties_path.parent.name
 107        target_cf = properties.get('target', {})
 108        target_module_name = target_cf.get('module', None)
 109        target_function_name = target_cf.get('name', None)
 110        target_args = target_cf.get('args', None)
 111        target_kw = target_cf.get('kw', None)
 112        label = properties.get('label', None)
 113
 114        if None in [
 115            target_module_name,
 116            target_function_name,
 117            target_args,
 118            target_kw,
 119        ]:
 120            raise ValueError("Missing target function information.")
 121
 122        target_module = importlib.import_module(target_module_name)
 123        target_function = getattr(target_module, target_function_name)
 124
 125        return Daemon(
 126            daemon_id=daemon_id,
 127            target=target_function,
 128            target_args=target_args,
 129            target_kw=target_kw,
 130            properties=properties,
 131            label=label,
 132        )
 133
 134
    def __init__(
        self,
        target: Optional[Callable[[Any], Any]] = None,
        target_args: Union[List[Any], Tuple[Any], None] = None,
        target_kw: Optional[Dict[str, Any]] = None,
        env: Optional[Dict[str, str]] = None,
        daemon_id: Optional[str] = None,
        label: Optional[str] = None,
        properties: Optional[Dict[str, Any]] = None,
        pickle: bool = True,
    ):
        """
        Initialize a new `Daemon` or finish restoring an existing one.

        Attributes already present in `self.__dict__` (e.g. restored from a pickle
        in `__new__`) are kept rather than overwritten by the arguments below,
        except where noted.

        Parameters
        ----------
        target: Optional[Callable[[Any], Any]], default None,
            The function to execute in a child process.
            Required unless the instance already has a `target`.

        target_args: Union[List[Any], Tuple[Any], None], default None
            Positional arguments to pass to the target function.

        target_kw: Optional[Dict[str, Any]], default None
            Keyword arguments to pass to the target function.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the daemon process.
            Stored under the `env` key of the properties.

        daemon_id: Optional[str], default None
            Build a `Daemon` from an existing `daemon_id`.
            If neither a pickle file nor a `target` is available,
            the daemon is recovered from its properties file.
            If omitted (and not already set), a new name is generated.

        label: Optional[str], default None
            Label string to help identify a daemon.
            If `None`, use the function name instead.
            A non-`None` label replaces an existing label.

        properties: Optional[Dict[str, Any]], default None
            Override reading from the properties JSON by providing an existing dictionary.
            If the instance already has properties, these are merged into them.

        pickle: bool, default True
            Stored on the instance as `self.pickle`.
            NOTE(review): presumably controls whether the daemon is persisted to its
            pickle file; confirm against `write_pickle()`.

        Raises
        ------
        Exception
            If `daemon_id` is given without a target and the daemon has no pickle or
            properties file, or cannot be recovered from its properties file.
        """
        ### NOTE(review): `_pickle` is not referenced again within this method.
        _pickle = self.__dict__.get('_pickle', False)
        if daemon_id is not None:
            self.daemon_id = daemon_id
            ### Only attempt recovery when there is no pickle, no target was passed,
            ### and no target was restored by un-pickling.
            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):

                if not self.properties_path.exists():
                    raise Exception(
                        f"Daemon '{self.daemon_id}' does not exist. "
                        + "Pass a target to create a new Daemon."
                    )

                try:
                    new_daemon = self.from_properties_file(daemon_id)
                except Exception:
                    new_daemon = None

                if new_daemon is not None:
                    ### Persist the recovered daemon and adopt its attributes.
                    new_daemon.write_pickle()
                    target = new_daemon.target
                    target_args = new_daemon.target_args
                    target_kw = new_daemon.target_kw
                    label = new_daemon.label
                    self._properties = new_daemon.properties
                else:
                    ### The properties file is unusable, so remove it (best-effort).
                    try:
                        self.properties_path.unlink()
                    except Exception:
                        pass

                    raise Exception(
                        f"Could not recover daemon '{self.daemon_id}' "
                        + "from its properties file."
                    )

        if 'target' not in self.__dict__:
            if target is None:
                error("Cannot create a Daemon without a target.")
            self.target = target

        self.pickle = pickle

        ### NOTE: We have to check self.__dict__ in case we are un-pickling.
        if '_target_args' not in self.__dict__:
            self._target_args = target_args
        if '_target_kw' not in self.__dict__:
            self._target_kw = target_kw

        if 'label' not in self.__dict__:
            if label is None:
                ### Fall back to the target's name, or its string form if it has none.
                label = (
                    self.target.__name__ if '__name__' in self.target.__dir__()
                        else str(self.target)
                )
            self.label = label
        elif label is not None:
            self.label = label

        if 'daemon_id' not in self.__dict__:
            self.daemon_id = get_new_daemon_name()
        if '_properties' not in self.__dict__:
            self._properties = properties
        elif properties:
            ### Merge the provided properties into the existing ones.
            if self._properties is None:
                self._properties = {}
            self._properties.update(properties)
        if self._properties is None:
            self._properties = {}

        self._properties.update({'label': self.label})
        if env:
            self._properties.update({'env': env})

        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
        _ = self.process
 247
 248
    def _run_exit(
        self,
        keep_daemon_output: bool = True,
        allow_dirty_run: bool = False,
    ) -> Any:
        """Run the daemon's target function.
        NOTE: This WILL EXIT the parent process!

        Parameters
        ----------
        keep_daemon_output: bool, default True
            If `False`, call `cleanup()` once the target finishes
            instead of recording the `ended` timestamp.

        allow_dirty_run: bool, default False
            If `True`, run the daemon, even if the `daemon_id` directory exists.
            This option is dangerous because if the same `daemon_id` runs twice,
            the last to finish will overwrite the output of the first.

        Returns
        -------
        On Windows, the tuple `(False, "Windows is no longer supported.")`.
        Otherwise nothing is explicitly returned: the target runs inside the
        daemon context and its result is stored in `_results` and
        `self.properties['result']`.

        Raises
        ------
        OSError
            If stdin should be captured but `sys.stdin` is `None`.
        """
        import platform
        import sys
        import os
        import traceback
        from meerschaum.utils.warnings import warn
        from meerschaum.config import get_config
        daemon = attempt_import('daemon')
        lines = get_config('jobs', 'terminal', 'lines')
        columns = get_config('jobs', 'terminal', 'columns')

        if platform.system() == 'Windows':
            return False, "Windows is no longer supported."

        self._setup(allow_dirty_run)

        _daemons.append(self)

        ### Per-daemon log settings fall back to the global `jobs:logs` configuration.
        logs_cf = self.properties.get('logs', {})
        log_refresh_seconds = logs_cf.get('refresh_files_seconds', None)
        if log_refresh_seconds is None:
            log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds')
        write_timestamps = logs_cf.get('write_timestamps', None)
        if write_timestamps is None:
            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')

        ### Periodically call `refresh_files()` on the rotating log while the target runs.
        self._log_refresh_timer = RepeatTimer(
            log_refresh_seconds,
            partial(self.rotating_log.refresh_files, start_interception=write_timestamps),
        )

        capture_stdin = logs_cf.get('stdin', True)
        cwd = self.properties.get('cwd', os.getcwd())

        ### NOTE: The SIGINT handler has been removed so that child processes may handle
        ###       KeyboardInterrupts themselves.
        ###       The previous aggressive approach was redundant because of the SIGTERM handler.
        self._daemon_context = daemon.DaemonContext(
            pidfile=self.pid_lock,
            stdout=self.rotating_log,
            stderr=self.rotating_log,
            stdin=(self.stdin_file if capture_stdin else None),
            working_directory=cwd,
            detach_process=True,
            files_preserve=list(self.rotating_log.subfile_objects.values()),
            signal_map={
                signal.SIGTERM: self._handle_sigterm,
            },
        )

        if capture_stdin and sys.stdin is None:
            raise OSError("Cannot daemonize without stdin.")

        try:
            ### Export the configured terminal size to the daemon's environment.
            os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns))
            with self._daemon_context:
                if capture_stdin:
                    sys.stdin = self.stdin_file
                _ = os.environ.pop(STATIC_CONFIG['environment']['systemd_stdin_path'], None)
                os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id
                os.environ['PYTHONUNBUFFERED'] = '1'

                ### Allow the user to override environment variables.
                env = self.properties.get('env', {})
                if env and isinstance(env, dict):
                    os.environ.update({str(k): str(v) for k, v in env.items()})

                self.rotating_log.refresh_files(start_interception=True)
                result = None
                try:
                    ### Record the PID of the daemonized process.
                    with open(self.pid_path, 'w+', encoding='utf-8') as f:
                        f.write(str(os.getpid()))

                    ### NOTE: The timer fails to start for remote actions to localhost.
                    try:
                        if not self._log_refresh_timer.is_running():
                            self._log_refresh_timer.start()
                    except Exception:
                        pass

                    self.properties['result'] = None
                    self._capture_process_timestamp('began')
                    result = self.target(*self.target_args, **self.target_kw)
                    self.properties['result'] = result
                except (BrokenPipeError, KeyboardInterrupt, SystemExit):
                    ### Interruptions and exits are recorded as failures with the traceback.
                    result = False, traceback.format_exc()
                except Exception as e:
                    warn(
                        f"Exception in daemon target function: {traceback.format_exc()}",
                    )
                    result = False, str(e)
                finally:
                    _results[self.daemon_id] = result
                    self.properties['result'] = result

                    if keep_daemon_output:
                        self._capture_process_timestamp('ended')
                    else:
                        self.cleanup()

                    self._log_refresh_timer.cancel()
                    ### Best-effort removal of a leftover PID file.
                    try:
                        if self.pid is None and self.pid_path.exists():
                            self.pid_path.unlink()
                    except Exception:
                        pass

                    if is_success_tuple(result):
                        try:
                            mrsm.pprint(result)
                        except BrokenPipeError:
                            pass

        except Exception:
            ### Append the failure to the daemon error log and warn.
            daemon_error = traceback.format_exc()
            import meerschaum.config.paths as paths
            with open(paths.DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
                f.write(
                    f"Error in Daemon '{self}':\n\n"
                    f"{sys.stdin=}\n"
                    f"{self.stdin_file_path=}\n"
                    f"{self.stdin_file_path.exists()=}\n\n"
                    f"{daemon_error}\n\n"
                )
            warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}")
 395
 396    def _capture_process_timestamp(
 397        self,
 398        process_key: str,
 399        write_properties: bool = True,
 400    ) -> None:
 401        """
 402        Record the current timestamp to the parameters `process:<process_key>`.
 403
 404        Parameters
 405        ----------
 406        process_key: str
 407            Under which key to store the timestamp.
 408
 409        write_properties: bool, default True
 410            If `True` persist the properties to disk immediately after capturing the timestamp.
 411        """
 412        if 'process' not in self.properties:
 413            self.properties['process'] = {}
 414
 415        if process_key not in ('began', 'ended', 'paused', 'stopped'):
 416            raise ValueError(f"Invalid key '{process_key}'.")
 417
 418        self.properties['process'][process_key] = (
 419            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
 420        )
 421        if write_properties:
 422            self.write_properties()
 423
 424    def run(
 425        self,
 426        keep_daemon_output: bool = True,
 427        allow_dirty_run: bool = False,
 428        wait: bool = False,
 429        timeout: Union[int, float] = 4,
 430        debug: bool = False,
 431    ) -> SuccessTuple:
 432        """Run the daemon as a child process and continue executing the parent.
 433
 434        Parameters
 435        ----------
 436        keep_daemon_output: bool, default True
 437            If `False`, delete the daemon's output directory upon exiting.
 438
 439        allow_dirty_run: bool, default False
 440            If `True`, run the daemon, even if the `daemon_id` directory exists.
 441            This option is dangerous because if the same `daemon_id` runs concurrently,
 442            the last to finish will overwrite the output of the first.
 443
 444        wait: bool, default True
 445            If `True`, block until `Daemon.status` is running (or the timeout expires).
 446
 447        timeout: Union[int, float], default 4
 448            If `wait` is `True`, block for up to `timeout` seconds before returning a failure.
 449
 450        Returns
 451        -------
 452        A SuccessTuple indicating success.
 453
 454        """
 455        import platform
 456        if platform.system() == 'Windows':
 457            return False, "Cannot run background jobs on Windows."
 458
 459        ### The daemon might exist and be paused.
 460        if self.status == 'paused':
 461            return self.resume()
 462
 463        self._remove_stop_file()
 464        if self.status == 'running':
 465            return True, f"Daemon '{self}' is already running."
 466
 467        self.mkdir_if_not_exists(allow_dirty_run)
 468        _write_pickle_success_tuple = self.write_pickle()
 469        if not _write_pickle_success_tuple[0]:
 470            return _write_pickle_success_tuple
 471
 472        _launch_daemon_code = (
 473            "from meerschaum.utils.daemon import Daemon, _daemons; "
 474            f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
 475            f"_daemons['{self.daemon_id}'] = daemon; "
 476            f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
 477            "allow_dirty_run=True)"
 478        )
 479        env = dict(os.environ)
 480        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
 481        msg = (
 482            "Success"
 483            if _launch_success_bool
 484            else f"Failed to start daemon '{self.daemon_id}'."
 485        )
 486        if not wait or not _launch_success_bool:
 487            return _launch_success_bool, msg
 488
 489        timeout = self.get_timeout_seconds(timeout)
 490        check_timeout_interval = self.get_check_timeout_interval_seconds()
 491
 492        if not timeout:
 493            success = self.status == 'running'
 494            msg = "Success" if success else f"Failed to run daemon '{self.daemon_id}'."
 495            if success:
 496                self._capture_process_timestamp('began')
 497            return success, msg
 498
 499        begin = time.perf_counter()
 500        while (time.perf_counter() - begin) < timeout:
 501            if self.status == 'running':
 502                self._capture_process_timestamp('began')
 503                return True, "Success"
 504            time.sleep(check_timeout_interval)
 505
 506        return False, (
 507            f"Failed to start daemon '{self.daemon_id}' within {timeout} second"
 508            + ('s' if timeout != 1 else '') + '.'
 509        )
 510
 511
 512    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
 513        """
 514        Forcibly terminate a running daemon.
 515        Sends a SIGTERM signal to the process.
 516
 517        Parameters
 518        ----------
 519        timeout: Optional[int], default 3
 520            How many seconds to wait for the process to terminate.
 521
 522        Returns
 523        -------
 524        A SuccessTuple indicating success.
 525        """
 526        if self.status != 'paused':
 527            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
 528            if success:
 529                self._write_stop_file('kill')
 530                self.stdin_file.close()
 531                self._remove_blocking_stdin_file()
 532                return success, msg
 533
 534        if self.status == 'stopped':
 535            self._write_stop_file('kill')
 536            self.stdin_file.close()
 537            self._remove_blocking_stdin_file()
 538            return True, "Process has already stopped."
 539
 540        psutil = attempt_import('psutil')
 541        process = self.process
 542        try:
 543            process.terminate()
 544            process.kill()
 545            process.wait(timeout=timeout)
 546        except Exception as e:
 547            return False, f"Failed to kill job {self} ({process}) with exception: {e}"
 548
 549        try:
 550            if process.status():
 551                return False, "Failed to stop daemon '{self}' ({process})."
 552        except psutil.NoSuchProcess:
 553            pass
 554
 555        if self.pid_path.exists():
 556            try:
 557                self.pid_path.unlink()
 558            except Exception:
 559                pass
 560
 561        self._write_stop_file('kill')
 562        self.stdin_file.close()
 563        self._remove_blocking_stdin_file()
 564        return True, "Success"
 565
 566    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
 567        """Gracefully quit a running daemon."""
 568        if self.status == 'paused':
 569            return self.kill(timeout)
 570
 571        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
 572        if signal_success:
 573            self._write_stop_file('quit')
 574            self.stdin_file.close()
 575            self._remove_blocking_stdin_file()
 576        return signal_success, signal_msg
 577
 578    def pause(
 579        self,
 580        timeout: Union[int, float, None] = None,
 581        check_timeout_interval: Union[float, int, None] = None,
 582    ) -> SuccessTuple:
 583        """
 584        Pause the daemon if it is running.
 585
 586        Parameters
 587        ----------
 588        timeout: Union[float, int, None], default None
 589            The maximum number of seconds to wait for a process to suspend.
 590
 591        check_timeout_interval: Union[float, int, None], default None
 592            The number of seconds to wait between checking if the process is still running.
 593
 594        Returns
 595        -------
 596        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
 597        """
 598        self._remove_blocking_stdin_file()
 599
 600        if self.process is None:
 601            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."
 602
 603        if self.status == 'paused':
 604            return True, f"Daemon '{self.daemon_id}' is already paused."
 605
 606        self._write_stop_file('pause')
 607        self.stdin_file.close()
 608        self._remove_blocking_stdin_file()
 609        try:
 610            self.process.suspend()
 611        except Exception as e:
 612            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"
 613
 614        timeout = self.get_timeout_seconds(timeout)
 615        check_timeout_interval = self.get_check_timeout_interval_seconds(
 616            check_timeout_interval
 617        )
 618
 619        psutil = attempt_import('psutil')
 620
 621        if not timeout:
 622            try:
 623                success = self.process.status() == 'stopped'
 624            except psutil.NoSuchProcess:
 625                success = True
 626            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
 627            if success:
 628                self._capture_process_timestamp('paused')
 629            return success, msg
 630
 631        begin = time.perf_counter()
 632        while (time.perf_counter() - begin) < timeout:
 633            try:
 634                if self.process.status() == 'stopped':
 635                    self._capture_process_timestamp('paused')
 636                    return True, "Success"
 637            except psutil.NoSuchProcess as e:
 638                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
 639            time.sleep(check_timeout_interval)
 640
 641        return False, (
 642            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
 643            + ('s' if timeout != 1 else '') + '.'
 644        )
 645
 646    def resume(
 647        self,
 648        timeout: Union[int, float, None] = None,
 649        check_timeout_interval: Union[float, int, None] = None,
 650    ) -> SuccessTuple:
 651        """
 652        Resume the daemon if it is paused.
 653
 654        Parameters
 655        ----------
 656        timeout: Union[float, int, None], default None
 657            The maximum number of seconds to wait for a process to resume.
 658
 659        check_timeout_interval: Union[float, int, None], default None
 660            The number of seconds to wait between checking if the process is still stopped.
 661
 662        Returns
 663        -------
 664        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
 665        """
 666        if self.status == 'running':
 667            return True, f"Daemon '{self.daemon_id}' is already running."
 668
 669        if self.status == 'stopped':
 670            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."
 671
 672        self._remove_stop_file()
 673        try:
 674            if self.process is None:
 675                return False, f"Cannot resume daemon '{self.daemon_id}'."
 676
 677            self.process.resume()
 678        except Exception as e:
 679            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"
 680
 681        timeout = self.get_timeout_seconds(timeout)
 682        check_timeout_interval = self.get_check_timeout_interval_seconds(
 683            check_timeout_interval
 684        )
 685
 686        if not timeout:
 687            success = self.status == 'running'
 688            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
 689            if success:
 690                self._capture_process_timestamp('began')
 691            return success, msg
 692
 693        begin = time.perf_counter()
 694        while (time.perf_counter() - begin) < timeout:
 695            if self.status == 'running':
 696                self._capture_process_timestamp('began')
 697                return True, "Success"
 698            time.sleep(check_timeout_interval)
 699
 700        return False, (
 701            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
 702            + ('s' if timeout != 1 else '') + '.'
 703        )
 704
 705    def _write_stop_file(self, action: str) -> SuccessTuple:
 706        """Write the stop file timestamp and action."""
 707        if action not in ('quit', 'kill', 'pause'):
 708            return False, f"Unsupported action '{action}'."
 709
 710        if not self.stop_path.parent.exists():
 711            self.stop_path.parent.mkdir(parents=True, exist_ok=True)
 712
 713        with open(self.stop_path, 'w+', encoding='utf-8') as f:
 714            json.dump(
 715                {
 716                    'stop_time': datetime.now(timezone.utc).isoformat(),
 717                    'action': action,
 718                },
 719                f
 720            )
 721
 722        return True, "Success"
 723
 724    def _remove_stop_file(self) -> SuccessTuple:
 725        """Remove the stop file"""
 726        if not self.stop_path.exists():
 727            return True, "Stop file does not exist."
 728
 729        try:
 730            self.stop_path.unlink()
 731        except Exception as e:
 732            return False, f"Failed to remove stop file:\n{e}"
 733
 734        return True, "Success"
 735
 736    def _read_stop_file(self) -> Dict[str, Any]:
 737        """
 738        Read the stop file if it exists.
 739        """
 740        if not self.stop_path.exists():
 741            return {}
 742
 743        try:
 744            with open(self.stop_path, 'r', encoding='utf-8') as f:
 745                data = json.load(f)
 746            return data
 747        except Exception:
 748            return {}
 749
 750    def _remove_blocking_stdin_file(self) -> mrsm.SuccessTuple:
 751        """
 752        Remove the blocking STDIN file if it exists.
 753        """
 754        try:
 755            if self.blocking_stdin_file_path.exists():
 756                self.blocking_stdin_file_path.unlink()
 757        except Exception as e:
 758            return False, str(e)
 759
 760        return True, "Success"
 761
    def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None:
        """
        Handle `SIGTERM` within the `Daemon` context.
        This method is injected into the `DaemonContext`.

        Parameters
        ----------
        signal_number: int
            The number of the received signal (forwarded to `signal_handler`).

        stack_frame: 'frame'
            The stack frame at the time of the signal (forwarded to `signal_handler`).

        Raises
        ------
        SystemExit
            Always raised (with code 0) once teardown has finished.
        """
        from meerschaum.utils.process import signal_handler
        from meerschaum.utils.threading import request_stop, interrupt_threads
        signal_handler(signal_number, stack_frame)

        ### Tell cooperative loops (e.g. `sync pipes`) to stop, then actively unwind
        ### any worker threads so they cannot keep the process alive as a zombie.
        request_stop()
        interrupt_threads(SystemExit)

        ### Read from `__dict__` because these attributes are only set in `_run_exit()`.
        timer = self.__dict__.get('_log_refresh_timer', None)
        if timer is not None:
            timer.cancel()

        daemon_context = self.__dict__.get('_daemon_context', None)
        if daemon_context is not None:
            daemon_context.close()

        _close_pools()
        raise SystemExit(0)
 786
 787    def _send_signal(
 788        self,
 789        signal_to_send,
 790        timeout: Union[float, int, None] = None,
 791        check_timeout_interval: Union[float, int, None] = None,
 792    ) -> SuccessTuple:
 793        """Send a signal to the daemon process.
 794
 795        Parameters
 796        ----------
 797        signal_to_send:
 798            The signal the send to the daemon, e.g. `signals.SIGINT`.
 799
 800        timeout: Union[float, int, None], default None
 801            The maximum number of seconds to wait for a process to terminate.
 802
 803        check_timeout_interval: Union[float, int, None], default None
 804            The number of seconds to wait between checking if the process is still running.
 805
 806        Returns
 807        -------
 808        A SuccessTuple indicating success.
 809        """
 810        try:
 811            pid = self.pid
 812            if pid is None:
 813                return (
 814                    False,
 815                    f"Daemon '{self.daemon_id}' is not running, "
 816                    + f"cannot send signal '{signal_to_send}'."
 817                )
 818            
 819            os.kill(pid, signal_to_send)
 820        except Exception:
 821            return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}"
 822
 823        timeout = self.get_timeout_seconds(timeout)
 824        check_timeout_interval = self.get_check_timeout_interval_seconds(
 825            check_timeout_interval
 826        )
 827
 828        if not timeout:
 829            return True, f"Successfully sent '{signal_to_send}' to daemon '{self.daemon_id}'."
 830
 831        begin = time.perf_counter()
 832        while (time.perf_counter() - begin) < timeout:
 833            if not self.status == 'running':
 834                return True, "Success"
 835            time.sleep(check_timeout_interval)
 836
 837        return False, (
 838            f"Failed to stop daemon '{self.daemon_id}' (PID: {pid}) within {timeout} second"
 839            + ('s' if timeout != 1 else '') + '.'
 840        )
 841
 842    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
 843        """Create the Daemon's directory.
 844        If `allow_dirty_run` is `False` and the directory already exists,
 845        raise a `FileExistsError`.
 846        """
 847        try:
 848            self.path.mkdir(parents=True, exist_ok=True)
 849            _already_exists = any(os.scandir(self.path))
 850        except FileExistsError:
 851            _already_exists = True
 852
 853        if _already_exists and not allow_dirty_run:
 854            error(
 855                f"Daemon '{self.daemon_id}' already exists. " +
 856                "To allow this daemon to run, do one of the following:\n"
 857                + "  - Execute `daemon.cleanup()`.\n"
 858                + f"  - Delete the directory '{self.path}'.\n"
 859                + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
 860                FileExistsError,
 861            )
 862
 863    @property
 864    def process(self) -> Union['psutil.Process', None]:
 865        """
 866        Return the psutil process for the Daemon.
 867        """
 868        psutil = attempt_import('psutil')
 869        pid = self.pid
 870        if pid is None:
 871            return None
 872        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
 873            try:
 874                self._process = psutil.Process(int(pid))
 875                process_exists = True
 876            except Exception:
 877                process_exists = False
 878            if not process_exists:
 879                _ = self.__dict__.pop('_process', None)
 880                try:
 881                    if self.pid_path.exists():
 882                        self.pid_path.unlink()
 883                except Exception:
 884                    pass
 885                return None
 886        return self._process
 887
 888    @property
 889    def status(self) -> str:
 890        """
 891        Return the running status of this Daemon.
 892        """
 893        if self.process is None:
 894            return 'stopped'
 895
 896        psutil = attempt_import('psutil', lazy=False)
 897        try:
 898            if self.process.status() == 'stopped':
 899                return 'paused'
 900            if self.process.status() == 'zombie':
 901                raise psutil.NoSuchProcess(self.process.pid)
 902        except (psutil.NoSuchProcess, AttributeError):
 903            if self.pid_path.exists():
 904                try:
 905                    self.pid_path.unlink()
 906                except Exception:
 907                    pass
 908            return 'stopped'
 909
 910        return 'running'
 911
 912    @classmethod
 913    def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 914        """
 915        Return a Daemon's path from its `daemon_id`.
 916        """
 917        import meerschaum.config.paths as paths
 918        return paths.DAEMON_RESOURCES_PATH / daemon_id
 919
 920    @property
 921    def path(self) -> pathlib.Path:
 922        """
 923        Return the path for this Daemon's directory.
 924        """
 925        return self._get_path_from_daemon_id(self.daemon_id)
 926
 927    @classmethod
 928    def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 929        """
 930        Return the `properties.json` path for a given `daemon_id`.
 931        """
 932        return cls._get_path_from_daemon_id(daemon_id) / 'properties.json'
 933
 934    @property
 935    def properties_path(self) -> pathlib.Path:
 936        """
 937        Return the `propterties.json` path for this Daemon.
 938        """
 939        return self._get_properties_path_from_daemon_id(self.daemon_id)
 940
 941    @property
 942    def stop_path(self) -> pathlib.Path:
 943        """
 944        Return the path for the stop file (created when manually stopped).
 945        """
 946        return self.path / '.stop.json'
 947
 948    @property
 949    def log_path(self) -> pathlib.Path:
 950        """
 951        Return the log path.
 952        """
 953        logs_cf = self.properties.get('logs', None) or {}
 954        if 'path' not in logs_cf:
 955            import meerschaum.config.paths as paths
 956            return paths.LOGS_RESOURCES_PATH / (self.daemon_id + '.log')
 957
 958        return pathlib.Path(logs_cf['path'])
 959
 960    @property
 961    def stdin_file_path(self) -> pathlib.Path:
 962        """
 963        Return the stdin file path.
 964        """
 965        return self.path / 'input.stdin'
 966
 967    @property
 968    def blocking_stdin_file_path(self) -> pathlib.Path:
 969        """
 970        Return the stdin file path.
 971        """
 972        if '_blocking_stdin_file_path' in self.__dict__:
 973            return self._blocking_stdin_file_path
 974
 975        return self.path / 'input.stdin.block'
 976
 977    @property
 978    def prompt_kwargs_file_path(self) -> pathlib.Path:
 979        """
 980        Return the file path to the kwargs for the invoking `prompt()`.
 981        """
 982        return self.path / 'prompt_kwargs.json'
 983
 984    @property
 985    def log_offset_path(self) -> pathlib.Path:
 986        """
 987        Return the log offset file path.
 988        """
 989        import meerschaum.config.paths as paths
 990        return paths.LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')
 991
 992    @property
 993    def log_offset_lock(self) -> 'fasteners.InterProcessLock':
 994        """
 995        Return the process lock context manager.
 996        """
 997        if '_log_offset_lock' in self.__dict__:
 998            return self._log_offset_lock
 999
1000        fasteners = attempt_import('fasteners')
1001        self._log_offset_lock = fasteners.InterProcessLock(self.log_offset_path)
1002        return self._log_offset_lock
1003
1004    @property
1005    def rotating_log(self) -> RotatingFile:
1006        """
1007        The rotating log file for the daemon's output.
1008        """
1009        if '_rotating_log' in self.__dict__:
1010            return self._rotating_log
1011
1012        logs_cf = self.properties.get('logs', None) or {}
1013        write_timestamps = logs_cf.get('write_timestamps', None)
1014        if write_timestamps is None:
1015            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
1016
1017        timestamp_format = logs_cf.get('timestamp_format', None)
1018        if timestamp_format is None:
1019            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
1020
1021        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
1022        if num_files_to_keep is None:
1023            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
1024
1025        max_file_size = logs_cf.get('max_file_size', None)
1026        if max_file_size is None:
1027            max_file_size = get_config('jobs', 'logs', 'max_file_size')
1028
1029        redirect_streams = logs_cf.get('redirect_streams', True)
1030
1031        self._rotating_log = RotatingFile(
1032            self.log_path,
1033            redirect_streams=redirect_streams,
1034            write_timestamps=write_timestamps,
1035            timestamp_format=timestamp_format,
1036            num_files_to_keep=num_files_to_keep,
1037            max_file_size=max_file_size,
1038        )
1039        return self._rotating_log
1040
1041    @property
1042    def stdin_file(self):
1043        """
1044        Return the file handler for the stdin file.
1045        """
1046        if (_stdin_file := self.__dict__.get('_stdin_file', None)):
1047            return _stdin_file
1048
1049        self._stdin_file = StdinFile(
1050            self.stdin_file_path,
1051            lock_file_path=self.blocking_stdin_file_path,
1052        )
1053        return self._stdin_file
1054
1055    @property
1056    def log_text(self) -> Union[str, None]:
1057        """
1058        Read the log files and return their contents.
1059        Returns `None` if the log file does not exist.
1060        """
1061        logs_cf = self.properties.get('logs', None) or {}
1062        write_timestamps = logs_cf.get('write_timestamps', None)
1063        if write_timestamps is None:
1064            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
1065
1066        timestamp_format = logs_cf.get('timestamp_format', None)
1067        if timestamp_format is None:
1068            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
1069
1070        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
1071        if num_files_to_keep is None:
1072            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
1073
1074        max_file_size = logs_cf.get('max_file_size', None)
1075        if max_file_size is None:
1076            max_file_size = get_config('jobs', 'logs', 'max_file_size')
1077
1078        new_rotating_log = RotatingFile(
1079            self.rotating_log.file_path,
1080            num_files_to_keep=num_files_to_keep,
1081            max_file_size=max_file_size,
1082            write_timestamps=write_timestamps,
1083            timestamp_format=timestamp_format,
1084        )
1085        return new_rotating_log.read()
1086
1087    def readlines(self) -> List[str]:
1088        """
1089        Read the next log lines, persisting the cursor for later use.
1090        Note this will alter the cursor of `self.rotating_log`.
1091        """
1092        self.rotating_log._cursor = self._read_log_offset()
1093        lines = self.rotating_log.readlines()
1094        self._write_log_offset()
1095        return lines
1096
1097    def _read_log_offset(self) -> Tuple[int, int]:
1098        """
1099        Return the current log offset cursor.
1100
1101        Returns
1102        -------
1103        A tuple of the form (`subfile_index`, `position`).
1104        """
1105        if not self.log_offset_path.exists():
1106            return 0, 0
1107
1108        try:
1109            with open(self.log_offset_path, 'r', encoding='utf-8') as f:
1110                cursor_text = f.read()
1111            cursor_parts = cursor_text.split(' ')
1112            subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1])
1113            return subfile_index, subfile_position
1114        except Exception as e:
1115            warn(f"Failed to read cursor:\n{e}")
1116        return 0, 0
1117
1118    def _write_log_offset(self) -> None:
1119        """
1120        Write the current log offset file.
1121        """
1122        with self.log_offset_lock:
1123            with open(self.log_offset_path, 'w+', encoding='utf-8') as f:
1124                subfile_index = self.rotating_log._cursor[0]
1125                subfile_position = self.rotating_log._cursor[1]
1126                f.write(f"{subfile_index} {subfile_position}")
1127
1128    @property
1129    def pid(self) -> Union[int, None]:
1130        """
1131        Read the PID file and return its contents.
1132        Returns `None` if the PID file does not exist.
1133        """
1134        if not self.pid_path.exists():
1135            return None
1136        try:
1137            with open(self.pid_path, 'r', encoding='utf-8') as f:
1138                text = f.read()
1139            if len(text) == 0:
1140                return None
1141            pid = int(text.rstrip())
1142        except Exception as e:
1143            warn(e)
1144            text = None
1145            pid = None
1146        return pid
1147
1148    @property
1149    def pid_path(self) -> pathlib.Path:
1150        """
1151        Return the path to a file containing the PID for this Daemon.
1152        """
1153        return self.path / 'process.pid'
1154
1155    @property
1156    def pid_lock(self) -> 'fasteners.InterProcessLock':
1157        """
1158        Return the process lock context manager.
1159        """
1160        if '_pid_lock' in self.__dict__:
1161            return self._pid_lock
1162
1163        fasteners = attempt_import('fasteners')
1164        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
1165        return self._pid_lock
1166
1167    @property
1168    def pickle_path(self) -> pathlib.Path:
1169        """
1170        Return the path for the pickle file.
1171        """
1172        return self.path / 'pickle.pkl'
1173
1174    def read_properties(self) -> Optional[Dict[str, Any]]:
1175        """Read the properties JSON file and return the dictionary."""
1176        if not self.properties_path.exists():
1177            return None
1178        try:
1179            with open(self.properties_path, 'r', encoding='utf-8') as file:
1180                properties = json.load(file)
1181        except Exception:
1182            properties = {}
1183        
1184        return properties or {}
1185
1186    def read_pickle(self) -> Daemon:
1187        """Read a Daemon's pickle file and return the `Daemon`."""
1188        import pickle
1189        import traceback
1190        if not self.pickle_path.exists():
1191            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")
1192
1193        if self.pickle_path.stat().st_size == 0:
1194            error(f"Pickle was empty for daemon '{self.daemon_id}'.")
1195
1196        try:
1197            with open(self.pickle_path, 'rb') as pickle_file:
1198                daemon = pickle.load(pickle_file)
1199            success, msg = True, 'Success'
1200        except Exception as e:
1201            success, msg = False, str(e)
1202            daemon = None
1203            traceback.print_exception(type(e), e, e.__traceback__)
1204        if not success:
1205            error(msg)
1206        return daemon
1207
    @property
    def properties(self) -> Dict[str, Any]:
        """
        Return this Daemon's properties, combining the in-memory values
        with the contents of the properties JSON file.

        The file is re-read on every access, and the result of
        `apply_patch_to_config()` is stored back on `self._properties`.
        """
        try:
            _file_properties = self.read_properties() or {}
        except Exception:
            traceback.print_exc()
            _file_properties = {}

        ### With nothing held in memory, start from whatever was read from the file.
        if not self._properties:
            self._properties = _file_properties

        if self._properties is None:
            self._properties = {}

        ### Drop an in-memory `None` result when the file holds an actual result.
        ### NOTE(review): presumably so the patch below cannot overwrite the
        ### file's result with `None` — confirm against `apply_patch_to_config()`.
        if (
            self._properties.get('result', None) is None
            and _file_properties.get('result', None) is not None
        ):
            _ = self._properties.pop('result', None)

        ### The file contents are passed as the base and the in-memory values as the patch.
        ### NOTE(review): presumably in-memory values win on conflict — confirm.
        if _file_properties is not None:
            self._properties = apply_patch_to_config(
                _file_properties,
                self._properties,
            )

        return self._properties
1238
1239    @property
1240    def hidden(self) -> bool:
1241        """
1242        Return a bool indicating whether this Daemon should be displayed.
1243        """
1244        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')
1245
1246    def write_properties(self) -> SuccessTuple:
1247        """Write the properties dictionary to the properties JSON file
1248        (only if self.properties exists).
1249        """
1250        from meerschaum.utils.misc import generate_password
1251        success, msg = (
1252            False,
1253            f"No properties to write for daemon '{self.daemon_id}'."
1254        )
1255        backup_path = self.properties_path.parent / (generate_password(8) + '.json')
1256        props = self.properties
1257        if props is not None:
1258            try:
1259                self.path.mkdir(parents=True, exist_ok=True)
1260                if self.properties_path.exists():
1261                    self.properties_path.rename(backup_path)
1262                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
1263                    json.dump(props, properties_file)
1264                success, msg = True, 'Success'
1265            except Exception as e:
1266                success, msg = False, str(e)
1267
1268        try:
1269            if backup_path.exists():
1270                if not success:
1271                    backup_path.rename(self.properties_path)
1272                else:
1273                    backup_path.unlink()
1274        except Exception as e:
1275            success, msg = False, str(e)
1276
1277        return success, msg
1278
1279    def write_pickle(self) -> SuccessTuple:
1280        """Write the pickle file for the daemon."""
1281        import pickle
1282        import traceback
1283        from meerschaum.utils.misc import generate_password
1284
1285        if not self.pickle:
1286            return True, "Success"
1287
1288        from meerschaum._internal.entry import _shells
1289        if _shells:
1290            from meerschaum._internal.shell.Shell import revert_input
1291            revert_input()
1292
1293        backup_path = self.pickle_path.parent / (generate_password(7) + '.pkl')
1294        try:
1295            self.path.mkdir(parents=True, exist_ok=True)
1296            if self.pickle_path.exists():
1297                self.pickle_path.rename(backup_path)
1298            with open(self.pickle_path, 'wb+') as pickle_file:
1299                pickle.dump(self, pickle_file)
1300            success, msg = True, "Success"
1301        except Exception as e:
1302            success, msg = False, str(e)
1303            traceback.print_exception(type(e), e, e.__traceback__)
1304        try:
1305            if backup_path.exists():
1306                if not success:
1307                    backup_path.rename(self.pickle_path)
1308                else:
1309                    backup_path.unlink()
1310        except Exception as e:
1311            success, msg = False, str(e)
1312        return success, msg
1313
1314
1315    def _setup(
1316        self,
1317        allow_dirty_run: bool = False,
1318    ) -> None:
1319        """
1320        Update properties before starting the Daemon.
1321        """
1322        if self.properties is None:
1323            self._properties = {}
1324
1325        self._properties.update({
1326            'target': {
1327                'name': self.target.__name__,
1328                'module': self.target.__module__,
1329                'args': self.target_args,
1330                'kw': self.target_kw,
1331            },
1332        })
1333        self.mkdir_if_not_exists(allow_dirty_run)
1334        _write_properties_success_tuple = self.write_properties()
1335        if not _write_properties_success_tuple[0]:
1336            error(_write_properties_success_tuple[1])
1337
1338        _write_pickle_success_tuple = self.write_pickle()
1339        if not _write_pickle_success_tuple[0]:
1340            error(_write_pickle_success_tuple[1])
1341
1342    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
1343        """
1344        Remove a daemon's directory after execution.
1345
1346        Parameters
1347        ----------
1348        keep_logs: bool, default False
1349            If `True`, skip deleting the daemon's log files.
1350
1351        Returns
1352        -------
1353        A `SuccessTuple` indicating success.
1354        """
1355        if self.path.exists():
1356            try:
1357                shutil.rmtree(self.path)
1358            except Exception as e:
1359                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
1360                warn(msg)
1361                return False, msg
1362        if not keep_logs:
1363            self.rotating_log.delete()
1364            try:
1365                if self.log_offset_path.exists():
1366                    self.log_offset_path.unlink()
1367            except Exception as e:
1368                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
1369                warn(msg)
1370                return False, msg
1371        return True, "Success"
1372
1373
1374    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
1375        """
1376        Return the timeout value to use. Use `--timeout-seconds` if provided,
1377        else the configured default (8).
1378        """
1379        if isinstance(timeout, (int, float)):
1380            return timeout
1381        return get_config('jobs', 'timeout_seconds')
1382
1383
1384    def get_check_timeout_interval_seconds(
1385        self,
1386        check_timeout_interval: Union[int, float, None] = None,
1387    ) -> Union[int, float]:
1388        """
1389        Return the interval value to check the status of timeouts.
1390        """
1391        if isinstance(check_timeout_interval, (int, float)):
1392            return check_timeout_interval
1393        return get_config('jobs', 'check_timeout_interval_seconds')
1394
1395    @property
1396    def target_args(self) -> Union[Tuple[Any], None]:
1397        """
1398        Return the positional arguments to pass to the target function.
1399        """
1400        target_args = (
1401            self.__dict__.get('_target_args', None)
1402            or self.properties.get('target', {}).get('args', None)
1403        )
1404        if target_args is None:
1405            return tuple([])
1406
1407        return tuple(target_args)
1408
1409    @property
1410    def target_kw(self) -> Union[Dict[str, Any], None]:
1411        """
1412        Return the keyword arguments to pass to the target function.
1413        """
1414        target_kw = (
1415            self.__dict__.get('_target_kw', None)
1416            or self.properties.get('target', {}).get('kw', None)
1417        )
1418        if target_kw is None:
1419            return {}
1420
1421        return {key: val for key, val in target_kw.items()}
1422
1423    @staticmethod
1424    def _get_target_reference(target) -> Union[Dict[str, str], None]:
1425        """
1426        If `target` is an importable, top-level function (e.g. `entry`), return a
1427        ``{'module': ..., 'qualname': ...}`` reference so it can be re-imported in the
1428        daemon process instead of pickled by value.
1429
1430        Pickling such a target by value serializes its entire global graph, which can
1431        reach unpicklable live state (e.g. a connector caching an `_asyncio.Task`) and
1432        raise `TypeError: cannot pickle '_asyncio.Task' object`. dill also falls back to
1433        by-value pickling whenever it cannot match the function by identity — which
1434        happens when two copies of a package are importable (a stale install shadowing
1435        a venv), so `byref=True` alone is not enough. Returns `None` for closures,
1436        lambdas, and anything not importable, which fall back to dill.
1437        """
1438        module = getattr(target, '__module__', None)
1439        qualname = getattr(target, '__qualname__', None)
1440        if not module or not qualname:
1441            return None
1442        if '<locals>' in qualname or '<lambda>' in qualname:
1443            return None
1444        return {'module': module, 'qualname': qualname}
1445
1446    @staticmethod
1447    def _load_target_reference(target_ref: Dict[str, str]):
1448        """
1449        Re-import a target from a `{'module': ..., 'qualname': ...}` reference.
1450        """
1451        import importlib
1452        obj = importlib.import_module(target_ref['module'])
1453        for part in target_ref['qualname'].split('.'):
1454            obj = getattr(obj, part)
1455        return obj
1456
1457    def __getstate__(self):
1458        """
1459        Pickle this Daemon.
1460        """
1461        dill = attempt_import('dill')
1462        state = {
1463            'target_args': self.target_args,
1464            'target_kw': self.target_kw,
1465            'daemon_id': self.daemon_id,
1466            'label': self.label,
1467            'properties': self.properties,
1468        }
1469        target_ref = self._get_target_reference(self.target)
1470        if target_ref is not None:
1471            ### Store by reference (re-imported in the daemon process), not by value.
1472            state['target'] = None
1473            state['target_ref'] = target_ref
1474        else:
1475            state['target'] = dill.dumps(self.target, byref=True)
1476        return state
1477
1478    def __setstate__(self, _state: Dict[str, Any]):
1479        """
1480        Restore this Daemon from a pickled state.
1481        If the properties file exists, skip the old pickled version.
1482        """
1483        dill = attempt_import('dill')
1484        target_ref = _state.pop('target_ref', None)
1485        if target_ref is not None and _state.get('target', None) is None:
1486            _state['target'] = self._load_target_reference(target_ref)
1487        else:
1488            _state['target'] = dill.loads(_state['target'])
1489        self._pickle = True
1490        daemon_id = _state.get('daemon_id', None)
1491        if not daemon_id:
1492            raise ValueError("Need a daemon_id to un-pickle a Daemon.")
1493
1494        properties_path = self._get_properties_path_from_daemon_id(daemon_id)
1495        ignore_properties = properties_path.exists()
1496        if ignore_properties:
1497            _state = {
1498                key: val
1499                for key, val in _state.items()
1500                if key != 'properties'
1501            }
1502        self.__init__(**_state)
1503
1504
1505    def __repr__(self):
1506        return str(self)
1507
1508    def __str__(self):
1509        return self.daemon_id
1510
1511    def __eq__(self, other):
1512        if not isinstance(other, Daemon):
1513            return False
1514        return self.daemon_id == other.daemon_id
1515
1516    def __hash__(self):
1517        return hash(self.daemon_id)
class Daemon:
  42class Daemon:
  43    """
  44    Daemonize Python functions into background processes.
  45
  46    Examples
  47    --------
  48    >>> import meerschaum as mrsm
  49    >>> from meerschaum.utils.daemons import Daemon
  50    >>> daemon = Daemon(print, ('hi',))
  51    >>> success, msg = daemon.run()
  52    >>> print(daemon.log_text)
  53
  54    2024-07-29 18:03 | hi
  55    2024-07-29 18:03 |
  56    >>> daemon.run(allow_dirty_run=True)
  57    >>> print(daemon.log_text)
  58
  59    2024-07-29 18:03 | hi
  60    2024-07-29 18:03 |
  61    2024-07-29 18:05 | hi
  62    2024-07-29 18:05 |
  63    >>> mrsm.pprint(daemon.properties)
  64    {
  65        'label': 'print',
  66        'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
  67        'result': None,
  68        'process': {'ended': '2024-07-29T18:03:33.752806'}
  69    }
  70
  71    """
  72
  73    def __new__(
  74        cls,
  75        *args,
  76        daemon_id: Optional[str] = None,
  77        **kw
  78    ):
  79        """
  80        If a daemon_id is provided and already exists, read from its pickle file.
  81        """
  82        instance = super(Daemon, cls).__new__(cls)
  83        if daemon_id is not None:
  84            instance.daemon_id = daemon_id
  85            if instance.pickle_path.exists():
  86                instance = instance.read_pickle()
  87        return instance
  88
  89    @classmethod
  90    def from_properties_file(cls, daemon_id: str) -> Daemon:
  91        """
  92        Return a Daemon from a properties dictionary.
  93        """
  94        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
  95        if not properties_path.exists():
  96            raise OSError(f"Properties file '{properties_path}' does not exist.")
  97
  98        try:
  99            with open(properties_path, 'r', encoding='utf-8') as f:
 100                properties = json.load(f)
 101        except Exception:
 102            properties = {}
 103
 104        if not properties:
 105            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")
 106
 107        daemon_id = properties_path.parent.name
 108        target_cf = properties.get('target', {})
 109        target_module_name = target_cf.get('module', None)
 110        target_function_name = target_cf.get('name', None)
 111        target_args = target_cf.get('args', None)
 112        target_kw = target_cf.get('kw', None)
 113        label = properties.get('label', None)
 114
 115        if None in [
 116            target_module_name,
 117            target_function_name,
 118            target_args,
 119            target_kw,
 120        ]:
 121            raise ValueError("Missing target function information.")
 122
 123        target_module = importlib.import_module(target_module_name)
 124        target_function = getattr(target_module, target_function_name)
 125
 126        return Daemon(
 127            daemon_id=daemon_id,
 128            target=target_function,
 129            target_args=target_args,
 130            target_kw=target_kw,
 131            properties=properties,
 132            label=label,
 133        )
 134
 135
    def __init__(
        self,
        target: Optional[Callable[[Any], Any]] = None,
        target_args: Union[List[Any], Tuple[Any], None] = None,
        target_kw: Optional[Dict[str, Any]] = None,
        env: Optional[Dict[str, str]] = None,
        daemon_id: Optional[str] = None,
        label: Optional[str] = None,
        properties: Optional[Dict[str, Any]] = None,
        pickle: bool = True,
    ):
        """
        Initialize a Daemon, either from a new target or from an existing `daemon_id`.

        Attributes already present in `self.__dict__` (e.g. when un-pickling)
        are kept rather than overwritten by the arguments.

        Parameters
        ----------
        target: Optional[Callable[[Any], Any]], default None
            The function to execute in a child process.

        target_args: Union[List[Any], Tuple[Any], None], default None
            Positional arguments to pass to the target function.

        target_kw: Optional[Dict[str, Any]], default None
            Keyword arguments to pass to the target function.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the daemon process.

        daemon_id: Optional[str], default None
            Build a `Daemon` from an existing `daemon_id`.
            If `daemon_id` is provided, other arguments are ignored and are derived
            from the existing pickled `Daemon`.

        label: Optional[str], default None
            Label string to help identify a daemon.
            If `None`, use the function name instead.

        properties: Optional[Dict[str, Any]], default None
            Override reading from the properties JSON by providing an existing dictionary.

        pickle: bool, default True
            Stored on the instance as `self.pickle`.
            NOTE(review): presumably controls whether the Daemon is persisted
            to its pickle file; confirm against `write_pickle()`.

        Raises
        ------
        An `Exception` if `daemon_id` is given without a target and the daemon has
        neither a pickle file nor a properties file from which it can be recovered.
        """
        ### `_pickle` is set by `__setstate__` before it re-runs `__init__`.
        ### NOTE(review): this local is not read again in this method.
        _pickle = self.__dict__.get('_pickle', False)
        if daemon_id is not None:
            self.daemon_id = daemon_id
            ### No pickle and no target: try to recover the daemon from its properties file.
            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):

                if not self.properties_path.exists():
                    raise Exception(
                        f"Daemon '{self.daemon_id}' does not exist. "
                        + "Pass a target to create a new Daemon."
                    )

                try:
                    new_daemon = self.from_properties_file(daemon_id)
                except Exception:
                    new_daemon = None

                if new_daemon is not None:
                    ### Re-create the pickle file and adopt the recovered daemon's values.
                    new_daemon.write_pickle()
                    target = new_daemon.target
                    target_args = new_daemon.target_args
                    target_kw = new_daemon.target_kw
                    label = new_daemon.label
                    self._properties = new_daemon.properties
                else:
                    ### The properties file could not be used, so remove it (best effort).
                    try:
                        self.properties_path.unlink()
                    except Exception:
                        pass

                    raise Exception(
                        f"Could not recover daemon '{self.daemon_id}' "
                        + "from its properties file."
                    )

        if 'target' not in self.__dict__:
            if target is None:
                error("Cannot create a Daemon without a target.")
            self.target = target

        self.pickle = pickle

        ### NOTE: We have to check self.__dict__ in case we are un-pickling.
        if '_target_args' not in self.__dict__:
            self._target_args = target_args
        if '_target_kw' not in self.__dict__:
            self._target_kw = target_kw

        if 'label' not in self.__dict__:
            if label is None:
                ### Fall back to the target's name, or its string form if it has no `__name__`.
                label = (
                    self.target.__name__ if '__name__' in self.target.__dir__()
                        else str(self.target)
                )
            self.label = label
        elif label is not None:
            self.label = label

        if 'daemon_id' not in self.__dict__:
            self.daemon_id = get_new_daemon_name()
        if '_properties' not in self.__dict__:
            self._properties = properties
        elif properties:
            ### Merge the provided properties into the ones already set.
            if self._properties is None:
                self._properties = {}
            self._properties.update(properties)
        if self._properties is None:
            self._properties = {}

        self._properties.update({'label': self.label})
        if env:
            self._properties.update({'env': env})

        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
        _ = self.process
 248
 249
    def _run_exit(
        self,
        keep_daemon_output: bool = True,
        allow_dirty_run: bool = False,
    ) -> Any:
        """Run the daemon's target function inside a `DaemonContext`.
        NOTE: This WILL EXIT the parent process!

        Parameters
        ----------
        keep_daemon_output: bool, default True
            If `True`, record the `ended` timestamp once the target finishes.
            If `False`, call `self.cleanup()` instead.

        allow_dirty_run: bool, default False
            If `True`, run the daemon, even if the `daemon_id` directory exists.
            This option is dangerous because if the same `daemon_id` runs twice,
            the last to finish will overwrite the output of the first.

        Returns
        -------
        Nothing — this will exit the parent process.
        On Windows, a failure tuple is returned immediately without daemonizing.
        """
        import platform
        import sys
        import os
        import traceback
        from meerschaum.utils.warnings import warn
        from meerschaum.config import get_config
        daemon = attempt_import('daemon')
        lines = get_config('jobs', 'terminal', 'lines')
        columns = get_config('jobs', 'terminal', 'columns')

        if platform.system() == 'Windows':
            return False, "Windows is no longer supported."

        self._setup(allow_dirty_run)

        _daemons.append(self)

        ### Per-daemon log settings take precedence; fall back to the `jobs:logs` configuration.
        logs_cf = self.properties.get('logs', {})
        log_refresh_seconds = logs_cf.get('refresh_files_seconds', None)
        if log_refresh_seconds is None:
            log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds')
        write_timestamps = logs_cf.get('write_timestamps', None)
        if write_timestamps is None:
            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')

        ### Periodically call `rotating_log.refresh_files()` while the target runs.
        self._log_refresh_timer = RepeatTimer(
            log_refresh_seconds,
            partial(self.rotating_log.refresh_files, start_interception=write_timestamps),
        )

        capture_stdin = logs_cf.get('stdin', True)
        cwd = self.properties.get('cwd', os.getcwd())

        ### NOTE: The SIGINT handler has been removed so that child processes may handle
        ###       KeyboardInterrupts themselves.
        ###       The previous aggressive approach was redundant because of the SIGTERM handler.
        self._daemon_context = daemon.DaemonContext(
            pidfile=self.pid_lock,
            stdout=self.rotating_log,
            stderr=self.rotating_log,
            stdin=(self.stdin_file if capture_stdin else None),
            working_directory=cwd,
            detach_process=True,
            files_preserve=list(self.rotating_log.subfile_objects.values()),
            signal_map={
                signal.SIGTERM: self._handle_sigterm,
            },
        )

        if capture_stdin and sys.stdin is None:
            raise OSError("Cannot daemonize without stdin.")

        try:
            ### Expose the configured terminal size to the daemon through the environment.
            os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns))
            with self._daemon_context:
                if capture_stdin:
                    sys.stdin = self.stdin_file
                _ = os.environ.pop(STATIC_CONFIG['environment']['systemd_stdin_path'], None)
                os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id
                os.environ['PYTHONUNBUFFERED'] = '1'

                ### Allow the user to override environment variables.
                env = self.properties.get('env', {})
                if env and isinstance(env, dict):
                    os.environ.update({str(k): str(v) for k, v in env.items()})

                self.rotating_log.refresh_files(start_interception=True)
                result = None
                try:
                    ### Record the PID of the current process in the PID file.
                    with open(self.pid_path, 'w+', encoding='utf-8') as f:
                        f.write(str(os.getpid()))

                    ### NOTE: The timer fails to start for remote actions to localhost.
                    try:
                        if not self._log_refresh_timer.is_running():
                            self._log_refresh_timer.start()
                    except Exception:
                        pass

                    self.properties['result'] = None
                    self._capture_process_timestamp('began')
                    result = self.target(*self.target_args, **self.target_kw)
                    self.properties['result'] = result
                except (BrokenPipeError, KeyboardInterrupt, SystemExit):
                    ### Interruptions are recorded as a failure tuple carrying the traceback.
                    result = False, traceback.format_exc()
                except Exception as e:
                    warn(
                        f"Exception in daemon target function: {traceback.format_exc()}",
                    )
                    result = False, str(e)
                finally:
                    ### Always store the result, even when the target raised.
                    _results[self.daemon_id] = result
                    self.properties['result'] = result

                    if keep_daemon_output:
                        self._capture_process_timestamp('ended')
                    else:
                        self.cleanup()

                    self._log_refresh_timer.cancel()
                    ### Best-effort removal of the PID file when `self.pid` is `None`.
                    try:
                        if self.pid is None and self.pid_path.exists():
                            self.pid_path.unlink()
                    except Exception:
                        pass

                    if is_success_tuple(result):
                        try:
                            mrsm.pprint(result)
                        except BrokenPipeError:
                            pass

        except Exception:
            ### Errors raised around the daemon context are appended to the daemon
            ### error log and surfaced as a warning.
            daemon_error = traceback.format_exc()
            import meerschaum.config.paths as paths
            with open(paths.DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
                f.write(
                    f"Error in Daemon '{self}':\n\n"
                    f"{sys.stdin=}\n"
                    f"{self.stdin_file_path=}\n"
                    f"{self.stdin_file_path.exists()=}\n\n"
                    f"{daemon_error}\n\n"
                )
            warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}")
 396
 397    def _capture_process_timestamp(
 398        self,
 399        process_key: str,
 400        write_properties: bool = True,
 401    ) -> None:
 402        """
 403        Record the current timestamp to the parameters `process:<process_key>`.
 404
 405        Parameters
 406        ----------
 407        process_key: str
 408            Under which key to store the timestamp.
 409
 410        write_properties: bool, default True
 411            If `True` persist the properties to disk immediately after capturing the timestamp.
 412        """
 413        if 'process' not in self.properties:
 414            self.properties['process'] = {}
 415
 416        if process_key not in ('began', 'ended', 'paused', 'stopped'):
 417            raise ValueError(f"Invalid key '{process_key}'.")
 418
 419        self.properties['process'][process_key] = (
 420            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
 421        )
 422        if write_properties:
 423            self.write_properties()
 424
 425    def run(
 426        self,
 427        keep_daemon_output: bool = True,
 428        allow_dirty_run: bool = False,
 429        wait: bool = False,
 430        timeout: Union[int, float] = 4,
 431        debug: bool = False,
 432    ) -> SuccessTuple:
 433        """Run the daemon as a child process and continue executing the parent.
 434
 435        Parameters
 436        ----------
 437        keep_daemon_output: bool, default True
 438            If `False`, delete the daemon's output directory upon exiting.
 439
 440        allow_dirty_run: bool, default False
 441            If `True`, run the daemon, even if the `daemon_id` directory exists.
 442            This option is dangerous because if the same `daemon_id` runs concurrently,
 443            the last to finish will overwrite the output of the first.
 444
 445        wait: bool, default True
 446            If `True`, block until `Daemon.status` is running (or the timeout expires).
 447
 448        timeout: Union[int, float], default 4
 449            If `wait` is `True`, block for up to `timeout` seconds before returning a failure.
 450
 451        Returns
 452        -------
 453        A SuccessTuple indicating success.
 454
 455        """
 456        import platform
 457        if platform.system() == 'Windows':
 458            return False, "Cannot run background jobs on Windows."
 459
 460        ### The daemon might exist and be paused.
 461        if self.status == 'paused':
 462            return self.resume()
 463
 464        self._remove_stop_file()
 465        if self.status == 'running':
 466            return True, f"Daemon '{self}' is already running."
 467
 468        self.mkdir_if_not_exists(allow_dirty_run)
 469        _write_pickle_success_tuple = self.write_pickle()
 470        if not _write_pickle_success_tuple[0]:
 471            return _write_pickle_success_tuple
 472
 473        _launch_daemon_code = (
 474            "from meerschaum.utils.daemon import Daemon, _daemons; "
 475            f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
 476            f"_daemons['{self.daemon_id}'] = daemon; "
 477            f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
 478            "allow_dirty_run=True)"
 479        )
 480        env = dict(os.environ)
 481        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
 482        msg = (
 483            "Success"
 484            if _launch_success_bool
 485            else f"Failed to start daemon '{self.daemon_id}'."
 486        )
 487        if not wait or not _launch_success_bool:
 488            return _launch_success_bool, msg
 489
 490        timeout = self.get_timeout_seconds(timeout)
 491        check_timeout_interval = self.get_check_timeout_interval_seconds()
 492
 493        if not timeout:
 494            success = self.status == 'running'
 495            msg = "Success" if success else f"Failed to run daemon '{self.daemon_id}'."
 496            if success:
 497                self._capture_process_timestamp('began')
 498            return success, msg
 499
 500        begin = time.perf_counter()
 501        while (time.perf_counter() - begin) < timeout:
 502            if self.status == 'running':
 503                self._capture_process_timestamp('began')
 504                return True, "Success"
 505            time.sleep(check_timeout_interval)
 506
 507        return False, (
 508            f"Failed to start daemon '{self.daemon_id}' within {timeout} second"
 509            + ('s' if timeout != 1 else '') + '.'
 510        )
 511
 512
 513    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
 514        """
 515        Forcibly terminate a running daemon.
 516        Sends a SIGTERM signal to the process.
 517
 518        Parameters
 519        ----------
 520        timeout: Optional[int], default 3
 521            How many seconds to wait for the process to terminate.
 522
 523        Returns
 524        -------
 525        A SuccessTuple indicating success.
 526        """
 527        if self.status != 'paused':
 528            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
 529            if success:
 530                self._write_stop_file('kill')
 531                self.stdin_file.close()
 532                self._remove_blocking_stdin_file()
 533                return success, msg
 534
 535        if self.status == 'stopped':
 536            self._write_stop_file('kill')
 537            self.stdin_file.close()
 538            self._remove_blocking_stdin_file()
 539            return True, "Process has already stopped."
 540
 541        psutil = attempt_import('psutil')
 542        process = self.process
 543        try:
 544            process.terminate()
 545            process.kill()
 546            process.wait(timeout=timeout)
 547        except Exception as e:
 548            return False, f"Failed to kill job {self} ({process}) with exception: {e}"
 549
 550        try:
 551            if process.status():
 552                return False, "Failed to stop daemon '{self}' ({process})."
 553        except psutil.NoSuchProcess:
 554            pass
 555
 556        if self.pid_path.exists():
 557            try:
 558                self.pid_path.unlink()
 559            except Exception:
 560                pass
 561
 562        self._write_stop_file('kill')
 563        self.stdin_file.close()
 564        self._remove_blocking_stdin_file()
 565        return True, "Success"
 566
 567    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
 568        """Gracefully quit a running daemon."""
 569        if self.status == 'paused':
 570            return self.kill(timeout)
 571
 572        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
 573        if signal_success:
 574            self._write_stop_file('quit')
 575            self.stdin_file.close()
 576            self._remove_blocking_stdin_file()
 577        return signal_success, signal_msg
 578
 579    def pause(
 580        self,
 581        timeout: Union[int, float, None] = None,
 582        check_timeout_interval: Union[float, int, None] = None,
 583    ) -> SuccessTuple:
 584        """
 585        Pause the daemon if it is running.
 586
 587        Parameters
 588        ----------
 589        timeout: Union[float, int, None], default None
 590            The maximum number of seconds to wait for a process to suspend.
 591
 592        check_timeout_interval: Union[float, int, None], default None
 593            The number of seconds to wait between checking if the process is still running.
 594
 595        Returns
 596        -------
 597        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
 598        """
 599        self._remove_blocking_stdin_file()
 600
 601        if self.process is None:
 602            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."
 603
 604        if self.status == 'paused':
 605            return True, f"Daemon '{self.daemon_id}' is already paused."
 606
 607        self._write_stop_file('pause')
 608        self.stdin_file.close()
 609        self._remove_blocking_stdin_file()
 610        try:
 611            self.process.suspend()
 612        except Exception as e:
 613            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"
 614
 615        timeout = self.get_timeout_seconds(timeout)
 616        check_timeout_interval = self.get_check_timeout_interval_seconds(
 617            check_timeout_interval
 618        )
 619
 620        psutil = attempt_import('psutil')
 621
 622        if not timeout:
 623            try:
 624                success = self.process.status() == 'stopped'
 625            except psutil.NoSuchProcess:
 626                success = True
 627            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
 628            if success:
 629                self._capture_process_timestamp('paused')
 630            return success, msg
 631
 632        begin = time.perf_counter()
 633        while (time.perf_counter() - begin) < timeout:
 634            try:
 635                if self.process.status() == 'stopped':
 636                    self._capture_process_timestamp('paused')
 637                    return True, "Success"
 638            except psutil.NoSuchProcess as e:
 639                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
 640            time.sleep(check_timeout_interval)
 641
 642        return False, (
 643            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
 644            + ('s' if timeout != 1 else '') + '.'
 645        )
 646
 647    def resume(
 648        self,
 649        timeout: Union[int, float, None] = None,
 650        check_timeout_interval: Union[float, int, None] = None,
 651    ) -> SuccessTuple:
 652        """
 653        Resume the daemon if it is paused.
 654
 655        Parameters
 656        ----------
 657        timeout: Union[float, int, None], default None
 658            The maximum number of seconds to wait for a process to resume.
 659
 660        check_timeout_interval: Union[float, int, None], default None
 661            The number of seconds to wait between checking if the process is still stopped.
 662
 663        Returns
 664        -------
 665        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
 666        """
 667        if self.status == 'running':
 668            return True, f"Daemon '{self.daemon_id}' is already running."
 669
 670        if self.status == 'stopped':
 671            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."
 672
 673        self._remove_stop_file()
 674        try:
 675            if self.process is None:
 676                return False, f"Cannot resume daemon '{self.daemon_id}'."
 677
 678            self.process.resume()
 679        except Exception as e:
 680            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"
 681
 682        timeout = self.get_timeout_seconds(timeout)
 683        check_timeout_interval = self.get_check_timeout_interval_seconds(
 684            check_timeout_interval
 685        )
 686
 687        if not timeout:
 688            success = self.status == 'running'
 689            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
 690            if success:
 691                self._capture_process_timestamp('began')
 692            return success, msg
 693
 694        begin = time.perf_counter()
 695        while (time.perf_counter() - begin) < timeout:
 696            if self.status == 'running':
 697                self._capture_process_timestamp('began')
 698                return True, "Success"
 699            time.sleep(check_timeout_interval)
 700
 701        return False, (
 702            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
 703            + ('s' if timeout != 1 else '') + '.'
 704        )
 705
 706    def _write_stop_file(self, action: str) -> SuccessTuple:
 707        """Write the stop file timestamp and action."""
 708        if action not in ('quit', 'kill', 'pause'):
 709            return False, f"Unsupported action '{action}'."
 710
 711        if not self.stop_path.parent.exists():
 712            self.stop_path.parent.mkdir(parents=True, exist_ok=True)
 713
 714        with open(self.stop_path, 'w+', encoding='utf-8') as f:
 715            json.dump(
 716                {
 717                    'stop_time': datetime.now(timezone.utc).isoformat(),
 718                    'action': action,
 719                },
 720                f
 721            )
 722
 723        return True, "Success"
 724
 725    def _remove_stop_file(self) -> SuccessTuple:
 726        """Remove the stop file"""
 727        if not self.stop_path.exists():
 728            return True, "Stop file does not exist."
 729
 730        try:
 731            self.stop_path.unlink()
 732        except Exception as e:
 733            return False, f"Failed to remove stop file:\n{e}"
 734
 735        return True, "Success"
 736
 737    def _read_stop_file(self) -> Dict[str, Any]:
 738        """
 739        Read the stop file if it exists.
 740        """
 741        if not self.stop_path.exists():
 742            return {}
 743
 744        try:
 745            with open(self.stop_path, 'r', encoding='utf-8') as f:
 746                data = json.load(f)
 747            return data
 748        except Exception:
 749            return {}
 750
 751    def _remove_blocking_stdin_file(self) -> mrsm.SuccessTuple:
 752        """
 753        Remove the blocking STDIN file if it exists.
 754        """
 755        try:
 756            if self.blocking_stdin_file_path.exists():
 757                self.blocking_stdin_file_path.unlink()
 758        except Exception as e:
 759            return False, str(e)
 760
 761        return True, "Success"
 762
    def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None:
        """
        Handle `SIGTERM` within the `Daemon` context.
        This method is injected into the `DaemonContext`.

        It forwards the signal to `signal_handler`, asks threads to stop,
        cancels the log refresh timer, closes the daemon context and the pools,
        and finally raises `SystemExit(0)`.

        Parameters
        ----------
        signal_number: int
            The number of the received signal, passed on to `signal_handler`.

        stack_frame: frame
            The stack frame at the time of the signal, passed on to `signal_handler`.

        Raises
        ------
        Always raises `SystemExit(0)` once the cleanup steps have run.
        """
        from meerschaum.utils.process import signal_handler
        from meerschaum.utils.threading import request_stop, interrupt_threads
        signal_handler(signal_number, stack_frame)

        ### Tell cooperative loops (e.g. `sync pipes`) to stop, then actively unwind
        ### any worker threads so they cannot keep the process alive as a zombie.
        request_stop()
        interrupt_threads(SystemExit)

        ### The timer and context are looked up in `__dict__`
        ### because they only exist once `_run_exit()` has created them.
        timer = self.__dict__.get('_log_refresh_timer', None)
        if timer is not None:
            timer.cancel()

        daemon_context = self.__dict__.get('_daemon_context', None)
        if daemon_context is not None:
            daemon_context.close()

        _close_pools()
        raise SystemExit(0)
 787
 788    def _send_signal(
 789        self,
 790        signal_to_send,
 791        timeout: Union[float, int, None] = None,
 792        check_timeout_interval: Union[float, int, None] = None,
 793    ) -> SuccessTuple:
 794        """Send a signal to the daemon process.
 795
 796        Parameters
 797        ----------
 798        signal_to_send:
 799            The signal the send to the daemon, e.g. `signals.SIGINT`.
 800
 801        timeout: Union[float, int, None], default None
 802            The maximum number of seconds to wait for a process to terminate.
 803
 804        check_timeout_interval: Union[float, int, None], default None
 805            The number of seconds to wait between checking if the process is still running.
 806
 807        Returns
 808        -------
 809        A SuccessTuple indicating success.
 810        """
 811        try:
 812            pid = self.pid
 813            if pid is None:
 814                return (
 815                    False,
 816                    f"Daemon '{self.daemon_id}' is not running, "
 817                    + f"cannot send signal '{signal_to_send}'."
 818                )
 819            
 820            os.kill(pid, signal_to_send)
 821        except Exception:
 822            return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}"
 823
 824        timeout = self.get_timeout_seconds(timeout)
 825        check_timeout_interval = self.get_check_timeout_interval_seconds(
 826            check_timeout_interval
 827        )
 828
 829        if not timeout:
 830            return True, f"Successfully sent '{signal_to_send}' to daemon '{self.daemon_id}'."
 831
 832        begin = time.perf_counter()
 833        while (time.perf_counter() - begin) < timeout:
 834            if not self.status == 'running':
 835                return True, "Success"
 836            time.sleep(check_timeout_interval)
 837
 838        return False, (
 839            f"Failed to stop daemon '{self.daemon_id}' (PID: {pid}) within {timeout} second"
 840            + ('s' if timeout != 1 else '') + '.'
 841        )
 842
 843    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
 844        """Create the Daemon's directory.
 845        If `allow_dirty_run` is `False` and the directory already exists,
 846        raise a `FileExistsError`.
 847        """
 848        try:
 849            self.path.mkdir(parents=True, exist_ok=True)
 850            _already_exists = any(os.scandir(self.path))
 851        except FileExistsError:
 852            _already_exists = True
 853
 854        if _already_exists and not allow_dirty_run:
 855            error(
 856                f"Daemon '{self.daemon_id}' already exists. " +
 857                "To allow this daemon to run, do one of the following:\n"
 858                + "  - Execute `daemon.cleanup()`.\n"
 859                + f"  - Delete the directory '{self.path}'.\n"
 860                + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
 861                FileExistsError,
 862            )
 863
 864    @property
 865    def process(self) -> Union['psutil.Process', None]:
 866        """
 867        Return the psutil process for the Daemon.
 868        """
 869        psutil = attempt_import('psutil')
 870        pid = self.pid
 871        if pid is None:
 872            return None
 873        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
 874            try:
 875                self._process = psutil.Process(int(pid))
 876                process_exists = True
 877            except Exception:
 878                process_exists = False
 879            if not process_exists:
 880                _ = self.__dict__.pop('_process', None)
 881                try:
 882                    if self.pid_path.exists():
 883                        self.pid_path.unlink()
 884                except Exception:
 885                    pass
 886                return None
 887        return self._process
 888
 889    @property
 890    def status(self) -> str:
 891        """
 892        Return the running status of this Daemon.
 893        """
 894        if self.process is None:
 895            return 'stopped'
 896
 897        psutil = attempt_import('psutil', lazy=False)
 898        try:
 899            if self.process.status() == 'stopped':
 900                return 'paused'
 901            if self.process.status() == 'zombie':
 902                raise psutil.NoSuchProcess(self.process.pid)
 903        except (psutil.NoSuchProcess, AttributeError):
 904            if self.pid_path.exists():
 905                try:
 906                    self.pid_path.unlink()
 907                except Exception:
 908                    pass
 909            return 'stopped'
 910
 911        return 'running'
 912
 913    @classmethod
 914    def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 915        """
 916        Return a Daemon's path from its `daemon_id`.
 917        """
 918        import meerschaum.config.paths as paths
 919        return paths.DAEMON_RESOURCES_PATH / daemon_id
 920
 921    @property
 922    def path(self) -> pathlib.Path:
 923        """
 924        Return the path for this Daemon's directory.
 925        """
 926        return self._get_path_from_daemon_id(self.daemon_id)
 927
 928    @classmethod
 929    def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
 930        """
 931        Return the `properties.json` path for a given `daemon_id`.
 932        """
 933        return cls._get_path_from_daemon_id(daemon_id) / 'properties.json'
 934
 935    @property
 936    def properties_path(self) -> pathlib.Path:
 937        """
 938        Return the `propterties.json` path for this Daemon.
 939        """
 940        return self._get_properties_path_from_daemon_id(self.daemon_id)
 941
 942    @property
 943    def stop_path(self) -> pathlib.Path:
 944        """
 945        Return the path for the stop file (created when manually stopped).
 946        """
 947        return self.path / '.stop.json'
 948
 949    @property
 950    def log_path(self) -> pathlib.Path:
 951        """
 952        Return the log path.
 953        """
 954        logs_cf = self.properties.get('logs', None) or {}
 955        if 'path' not in logs_cf:
 956            import meerschaum.config.paths as paths
 957            return paths.LOGS_RESOURCES_PATH / (self.daemon_id + '.log')
 958
 959        return pathlib.Path(logs_cf['path'])
 960
 961    @property
 962    def stdin_file_path(self) -> pathlib.Path:
 963        """
 964        Return the stdin file path.
 965        """
 966        return self.path / 'input.stdin'
 967
 968    @property
 969    def blocking_stdin_file_path(self) -> pathlib.Path:
 970        """
 971        Return the stdin file path.
 972        """
 973        if '_blocking_stdin_file_path' in self.__dict__:
 974            return self._blocking_stdin_file_path
 975
 976        return self.path / 'input.stdin.block'
 977
 978    @property
 979    def prompt_kwargs_file_path(self) -> pathlib.Path:
 980        """
 981        Return the file path to the kwargs for the invoking `prompt()`.
 982        """
 983        return self.path / 'prompt_kwargs.json'
 984
 985    @property
 986    def log_offset_path(self) -> pathlib.Path:
 987        """
 988        Return the log offset file path.
 989        """
 990        import meerschaum.config.paths as paths
 991        return paths.LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')
 992
 993    @property
 994    def log_offset_lock(self) -> 'fasteners.InterProcessLock':
 995        """
 996        Return the process lock context manager.
 997        """
 998        if '_log_offset_lock' in self.__dict__:
 999            return self._log_offset_lock
1000
1001        fasteners = attempt_import('fasteners')
1002        self._log_offset_lock = fasteners.InterProcessLock(self.log_offset_path)
1003        return self._log_offset_lock
1004
1005    @property
1006    def rotating_log(self) -> RotatingFile:
1007        """
1008        The rotating log file for the daemon's output.
1009        """
1010        if '_rotating_log' in self.__dict__:
1011            return self._rotating_log
1012
1013        logs_cf = self.properties.get('logs', None) or {}
1014        write_timestamps = logs_cf.get('write_timestamps', None)
1015        if write_timestamps is None:
1016            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
1017
1018        timestamp_format = logs_cf.get('timestamp_format', None)
1019        if timestamp_format is None:
1020            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
1021
1022        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
1023        if num_files_to_keep is None:
1024            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
1025
1026        max_file_size = logs_cf.get('max_file_size', None)
1027        if max_file_size is None:
1028            max_file_size = get_config('jobs', 'logs', 'max_file_size')
1029
1030        redirect_streams = logs_cf.get('redirect_streams', True)
1031
1032        self._rotating_log = RotatingFile(
1033            self.log_path,
1034            redirect_streams=redirect_streams,
1035            write_timestamps=write_timestamps,
1036            timestamp_format=timestamp_format,
1037            num_files_to_keep=num_files_to_keep,
1038            max_file_size=max_file_size,
1039        )
1040        return self._rotating_log
1041
1042    @property
1043    def stdin_file(self):
1044        """
1045        Return the file handler for the stdin file.
1046        """
1047        if (_stdin_file := self.__dict__.get('_stdin_file', None)):
1048            return _stdin_file
1049
1050        self._stdin_file = StdinFile(
1051            self.stdin_file_path,
1052            lock_file_path=self.blocking_stdin_file_path,
1053        )
1054        return self._stdin_file
1055
1056    @property
1057    def log_text(self) -> Union[str, None]:
1058        """
1059        Read the log files and return their contents.
1060        Returns `None` if the log file does not exist.
1061        """
1062        logs_cf = self.properties.get('logs', None) or {}
1063        write_timestamps = logs_cf.get('write_timestamps', None)
1064        if write_timestamps is None:
1065            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
1066
1067        timestamp_format = logs_cf.get('timestamp_format', None)
1068        if timestamp_format is None:
1069            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
1070
1071        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
1072        if num_files_to_keep is None:
1073            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
1074
1075        max_file_size = logs_cf.get('max_file_size', None)
1076        if max_file_size is None:
1077            max_file_size = get_config('jobs', 'logs', 'max_file_size')
1078
1079        new_rotating_log = RotatingFile(
1080            self.rotating_log.file_path,
1081            num_files_to_keep=num_files_to_keep,
1082            max_file_size=max_file_size,
1083            write_timestamps=write_timestamps,
1084            timestamp_format=timestamp_format,
1085        )
1086        return new_rotating_log.read()
1087
1088    def readlines(self) -> List[str]:
1089        """
1090        Read the next log lines, persisting the cursor for later use.
1091        Note this will alter the cursor of `self.rotating_log`.
1092        """
1093        self.rotating_log._cursor = self._read_log_offset()
1094        lines = self.rotating_log.readlines()
1095        self._write_log_offset()
1096        return lines
1097
1098    def _read_log_offset(self) -> Tuple[int, int]:
1099        """
1100        Return the current log offset cursor.
1101
1102        Returns
1103        -------
1104        A tuple of the form (`subfile_index`, `position`).
1105        """
1106        if not self.log_offset_path.exists():
1107            return 0, 0
1108
1109        try:
1110            with open(self.log_offset_path, 'r', encoding='utf-8') as f:
1111                cursor_text = f.read()
1112            cursor_parts = cursor_text.split(' ')
1113            subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1])
1114            return subfile_index, subfile_position
1115        except Exception as e:
1116            warn(f"Failed to read cursor:\n{e}")
1117        return 0, 0
1118
1119    def _write_log_offset(self) -> None:
1120        """
1121        Write the current log offset file.
1122        """
1123        with self.log_offset_lock:
1124            with open(self.log_offset_path, 'w+', encoding='utf-8') as f:
1125                subfile_index = self.rotating_log._cursor[0]
1126                subfile_position = self.rotating_log._cursor[1]
1127                f.write(f"{subfile_index} {subfile_position}")
1128
1129    @property
1130    def pid(self) -> Union[int, None]:
1131        """
1132        Read the PID file and return its contents.
1133        Returns `None` if the PID file does not exist.
1134        """
1135        if not self.pid_path.exists():
1136            return None
1137        try:
1138            with open(self.pid_path, 'r', encoding='utf-8') as f:
1139                text = f.read()
1140            if len(text) == 0:
1141                return None
1142            pid = int(text.rstrip())
1143        except Exception as e:
1144            warn(e)
1145            text = None
1146            pid = None
1147        return pid
1148
1149    @property
1150    def pid_path(self) -> pathlib.Path:
1151        """
1152        Return the path to a file containing the PID for this Daemon.
1153        """
1154        return self.path / 'process.pid'
1155
1156    @property
1157    def pid_lock(self) -> 'fasteners.InterProcessLock':
1158        """
1159        Return the process lock context manager.
1160        """
1161        if '_pid_lock' in self.__dict__:
1162            return self._pid_lock
1163
1164        fasteners = attempt_import('fasteners')
1165        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
1166        return self._pid_lock
1167
1168    @property
1169    def pickle_path(self) -> pathlib.Path:
1170        """
1171        Return the path for the pickle file.
1172        """
1173        return self.path / 'pickle.pkl'
1174
1175    def read_properties(self) -> Optional[Dict[str, Any]]:
1176        """Read the properties JSON file and return the dictionary."""
1177        if not self.properties_path.exists():
1178            return None
1179        try:
1180            with open(self.properties_path, 'r', encoding='utf-8') as file:
1181                properties = json.load(file)
1182        except Exception:
1183            properties = {}
1184        
1185        return properties or {}
1186
1187    def read_pickle(self) -> Daemon:
1188        """Read a Daemon's pickle file and return the `Daemon`."""
1189        import pickle
1190        import traceback
1191        if not self.pickle_path.exists():
1192            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")
1193
1194        if self.pickle_path.stat().st_size == 0:
1195            error(f"Pickle was empty for daemon '{self.daemon_id}'.")
1196
1197        try:
1198            with open(self.pickle_path, 'rb') as pickle_file:
1199                daemon = pickle.load(pickle_file)
1200            success, msg = True, 'Success'
1201        except Exception as e:
1202            success, msg = False, str(e)
1203            daemon = None
1204            traceback.print_exception(type(e), e, e.__traceback__)
1205        if not success:
1206            error(msg)
1207        return daemon
1208
1209    @property
1210    def properties(self) -> Dict[str, Any]:
1211        """
1212        Return the contents of the properties JSON file.
1213        """
1214        try:
1215            _file_properties = self.read_properties() or {}
1216        except Exception:
1217            traceback.print_exc()
1218            _file_properties = {}
1219
1220        if not self._properties:
1221            self._properties = _file_properties
1222
1223        if self._properties is None:
1224            self._properties = {}
1225
1226        if (
1227            self._properties.get('result', None) is None
1228            and _file_properties.get('result', None) is not None
1229        ):
1230            _ = self._properties.pop('result', None)
1231
1232        if _file_properties is not None:
1233            self._properties = apply_patch_to_config(
1234                _file_properties,
1235                self._properties,
1236            )
1237
1238        return self._properties
1239
1240    @property
1241    def hidden(self) -> bool:
1242        """
1243        Return a bool indicating whether this Daemon should be displayed.
1244        """
1245        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')
1246
1247    def write_properties(self) -> SuccessTuple:
1248        """Write the properties dictionary to the properties JSON file
1249        (only if self.properties exists).
1250        """
1251        from meerschaum.utils.misc import generate_password
1252        success, msg = (
1253            False,
1254            f"No properties to write for daemon '{self.daemon_id}'."
1255        )
1256        backup_path = self.properties_path.parent / (generate_password(8) + '.json')
1257        props = self.properties
1258        if props is not None:
1259            try:
1260                self.path.mkdir(parents=True, exist_ok=True)
1261                if self.properties_path.exists():
1262                    self.properties_path.rename(backup_path)
1263                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
1264                    json.dump(props, properties_file)
1265                success, msg = True, 'Success'
1266            except Exception as e:
1267                success, msg = False, str(e)
1268
1269        try:
1270            if backup_path.exists():
1271                if not success:
1272                    backup_path.rename(self.properties_path)
1273                else:
1274                    backup_path.unlink()
1275        except Exception as e:
1276            success, msg = False, str(e)
1277
1278        return success, msg
1279
1280    def write_pickle(self) -> SuccessTuple:
1281        """Write the pickle file for the daemon."""
1282        import pickle
1283        import traceback
1284        from meerschaum.utils.misc import generate_password
1285
1286        if not self.pickle:
1287            return True, "Success"
1288
1289        from meerschaum._internal.entry import _shells
1290        if _shells:
1291            from meerschaum._internal.shell.Shell import revert_input
1292            revert_input()
1293
1294        backup_path = self.pickle_path.parent / (generate_password(7) + '.pkl')
1295        try:
1296            self.path.mkdir(parents=True, exist_ok=True)
1297            if self.pickle_path.exists():
1298                self.pickle_path.rename(backup_path)
1299            with open(self.pickle_path, 'wb+') as pickle_file:
1300                pickle.dump(self, pickle_file)
1301            success, msg = True, "Success"
1302        except Exception as e:
1303            success, msg = False, str(e)
1304            traceback.print_exception(type(e), e, e.__traceback__)
1305        try:
1306            if backup_path.exists():
1307                if not success:
1308                    backup_path.rename(self.pickle_path)
1309                else:
1310                    backup_path.unlink()
1311        except Exception as e:
1312            success, msg = False, str(e)
1313        return success, msg
1314
1315
1316    def _setup(
1317        self,
1318        allow_dirty_run: bool = False,
1319    ) -> None:
1320        """
1321        Update properties before starting the Daemon.
1322        """
1323        if self.properties is None:
1324            self._properties = {}
1325
1326        self._properties.update({
1327            'target': {
1328                'name': self.target.__name__,
1329                'module': self.target.__module__,
1330                'args': self.target_args,
1331                'kw': self.target_kw,
1332            },
1333        })
1334        self.mkdir_if_not_exists(allow_dirty_run)
1335        _write_properties_success_tuple = self.write_properties()
1336        if not _write_properties_success_tuple[0]:
1337            error(_write_properties_success_tuple[1])
1338
1339        _write_pickle_success_tuple = self.write_pickle()
1340        if not _write_pickle_success_tuple[0]:
1341            error(_write_pickle_success_tuple[1])
1342
1343    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
1344        """
1345        Remove a daemon's directory after execution.
1346
1347        Parameters
1348        ----------
1349        keep_logs: bool, default False
1350            If `True`, skip deleting the daemon's log files.
1351
1352        Returns
1353        -------
1354        A `SuccessTuple` indicating success.
1355        """
1356        if self.path.exists():
1357            try:
1358                shutil.rmtree(self.path)
1359            except Exception as e:
1360                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
1361                warn(msg)
1362                return False, msg
1363        if not keep_logs:
1364            self.rotating_log.delete()
1365            try:
1366                if self.log_offset_path.exists():
1367                    self.log_offset_path.unlink()
1368            except Exception as e:
1369                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
1370                warn(msg)
1371                return False, msg
1372        return True, "Success"
1373
1374
1375    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
1376        """
1377        Return the timeout value to use. Use `--timeout-seconds` if provided,
1378        else the configured default (8).
1379        """
1380        if isinstance(timeout, (int, float)):
1381            return timeout
1382        return get_config('jobs', 'timeout_seconds')
1383
1384
1385    def get_check_timeout_interval_seconds(
1386        self,
1387        check_timeout_interval: Union[int, float, None] = None,
1388    ) -> Union[int, float]:
1389        """
1390        Return the interval value to check the status of timeouts.
1391        """
1392        if isinstance(check_timeout_interval, (int, float)):
1393            return check_timeout_interval
1394        return get_config('jobs', 'check_timeout_interval_seconds')
1395
1396    @property
1397    def target_args(self) -> Union[Tuple[Any], None]:
1398        """
1399        Return the positional arguments to pass to the target function.
1400        """
1401        target_args = (
1402            self.__dict__.get('_target_args', None)
1403            or self.properties.get('target', {}).get('args', None)
1404        )
1405        if target_args is None:
1406            return tuple([])
1407
1408        return tuple(target_args)
1409
1410    @property
1411    def target_kw(self) -> Union[Dict[str, Any], None]:
1412        """
1413        Return the keyword arguments to pass to the target function.
1414        """
1415        target_kw = (
1416            self.__dict__.get('_target_kw', None)
1417            or self.properties.get('target', {}).get('kw', None)
1418        )
1419        if target_kw is None:
1420            return {}
1421
1422        return {key: val for key, val in target_kw.items()}
1423
1424    @staticmethod
1425    def _get_target_reference(target) -> Union[Dict[str, str], None]:
1426        """
1427        If `target` is an importable, top-level function (e.g. `entry`), return a
1428        ``{'module': ..., 'qualname': ...}`` reference so it can be re-imported in the
1429        daemon process instead of pickled by value.
1430
1431        Pickling such a target by value serializes its entire global graph, which can
1432        reach unpicklable live state (e.g. a connector caching an `_asyncio.Task`) and
1433        raise `TypeError: cannot pickle '_asyncio.Task' object`. dill also falls back to
1434        by-value pickling whenever it cannot match the function by identity — which
1435        happens when two copies of a package are importable (a stale install shadowing
1436        a venv), so `byref=True` alone is not enough. Returns `None` for closures,
1437        lambdas, and anything not importable, which fall back to dill.
1438        """
1439        module = getattr(target, '__module__', None)
1440        qualname = getattr(target, '__qualname__', None)
1441        if not module or not qualname:
1442            return None
1443        if '<locals>' in qualname or '<lambda>' in qualname:
1444            return None
1445        return {'module': module, 'qualname': qualname}
1446
1447    @staticmethod
1448    def _load_target_reference(target_ref: Dict[str, str]):
1449        """
1450        Re-import a target from a `{'module': ..., 'qualname': ...}` reference.
1451        """
1452        import importlib
1453        obj = importlib.import_module(target_ref['module'])
1454        for part in target_ref['qualname'].split('.'):
1455            obj = getattr(obj, part)
1456        return obj
1457
1458    def __getstate__(self):
1459        """
1460        Pickle this Daemon.
1461        """
1462        dill = attempt_import('dill')
1463        state = {
1464            'target_args': self.target_args,
1465            'target_kw': self.target_kw,
1466            'daemon_id': self.daemon_id,
1467            'label': self.label,
1468            'properties': self.properties,
1469        }
1470        target_ref = self._get_target_reference(self.target)
1471        if target_ref is not None:
1472            ### Store by reference (re-imported in the daemon process), not by value.
1473            state['target'] = None
1474            state['target_ref'] = target_ref
1475        else:
1476            state['target'] = dill.dumps(self.target, byref=True)
1477        return state
1478
1479    def __setstate__(self, _state: Dict[str, Any]):
1480        """
1481        Restore this Daemon from a pickled state.
1482        If the properties file exists, skip the old pickled version.
1483        """
1484        dill = attempt_import('dill')
1485        target_ref = _state.pop('target_ref', None)
1486        if target_ref is not None and _state.get('target', None) is None:
1487            _state['target'] = self._load_target_reference(target_ref)
1488        else:
1489            _state['target'] = dill.loads(_state['target'])
1490        self._pickle = True
1491        daemon_id = _state.get('daemon_id', None)
1492        if not daemon_id:
1493            raise ValueError("Need a daemon_id to un-pickle a Daemon.")
1494
1495        properties_path = self._get_properties_path_from_daemon_id(daemon_id)
1496        ignore_properties = properties_path.exists()
1497        if ignore_properties:
1498            _state = {
1499                key: val
1500                for key, val in _state.items()
1501                if key != 'properties'
1502            }
1503        self.__init__(**_state)
1504
1505
1506    def __repr__(self):
1507        return str(self)
1508
1509    def __str__(self):
1510        return self.daemon_id
1511
1512    def __eq__(self, other):
1513        if not isinstance(other, Daemon):
1514            return False
1515        return self.daemon_id == other.daemon_id
1516
1517    def __hash__(self):
1518        return hash(self.daemon_id)

Daemonize Python functions into background processes.

Examples
>>> import meerschaum as mrsm
>>> from meerschaum.utils.daemon import Daemon
>>> daemon = Daemon(print, ('hi',))
>>> success, msg = daemon.run()
>>> print(daemon.log_text)

2024-07-29 18:03 | hi
2024-07-29 18:03 |

>>> daemon.run(allow_dirty_run=True)
>>> print(daemon.log_text)

2024-07-29 18:03 | hi
2024-07-29 18:03 |
2024-07-29 18:05 | hi
2024-07-29 18:05 |

>>> mrsm.pprint(daemon.properties)
{
    'label': 'print',
    'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
    'result': None,
    'process': {'ended': '2024-07-29T18:03:33.752806'}
}
Daemon( target: Optional[Callable[[Any], Any]] = None, target_args: Union[List[Any], Tuple[Any], NoneType] = None, target_kw: Optional[Dict[str, Any]] = None, env: Optional[Dict[str, str]] = None, daemon_id: Optional[str] = None, label: Optional[str] = None, properties: Optional[Dict[str, Any]] = None, pickle: bool = True)
    def __init__(
        self,
        target: Optional[Callable[[Any], Any]] = None,
        target_args: Union[List[Any], Tuple[Any], None] = None,
        target_kw: Optional[Dict[str, Any]] = None,
        env: Optional[Dict[str, str]] = None,
        daemon_id: Optional[str] = None,
        label: Optional[str] = None,
        properties: Optional[Dict[str, Any]] = None,
        pickle: bool = True,
    ):
        """
        Parameters
        ----------
        target: Optional[Callable[[Any], Any]], default None
            The function to execute in a child process.
            Required unless `target` is already set on the instance or can be
            recovered from an existing daemon's properties file.

        target_args: Union[List[Any], Tuple[Any], None], default None
            Positional arguments to pass to the target function.

        target_kw: Optional[Dict[str, Any]], default None
            Keyword arguments to pass to the target function.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the daemon process.
            Stored under the `'env'` key of the properties.

        daemon_id: Optional[str], default None
            Build a `Daemon` from an existing `daemon_id`.
            If no pickle file exists for it and no `target` is given, the target,
            arguments, label and properties are recovered from its properties file.
            If `None`, a new name is generated.

        label: Optional[str], default None
            Label string to help identify a daemon.
            If `None`, use the function name instead.

        properties: Optional[Dict[str, Any]], default None
            Override reading from the properties JSON by providing an existing dictionary.

        pickle: bool, default True
            Stored as `self.pickle`. When falsy, `write_pickle()` does nothing.
        """
        _pickle = self.__dict__.get('_pickle', False)
        if daemon_id is not None:
            self.daemon_id = daemon_id
            ### No pickle file and no target: try to rebuild from the properties file.
            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):

                if not self.properties_path.exists():
                    raise Exception(
                        f"Daemon '{self.daemon_id}' does not exist. "
                        + "Pass a target to create a new Daemon."
                    )

                try:
                    new_daemon = self.from_properties_file(daemon_id)
                except Exception:
                    new_daemon = None

                if new_daemon is not None:
                    ### Persist the recovered daemon's pickle, then adopt its attributes.
                    new_daemon.write_pickle()
                    target = new_daemon.target
                    target_args = new_daemon.target_args
                    target_kw = new_daemon.target_kw
                    label = new_daemon.label
                    self._properties = new_daemon.properties
                else:
                    ### The properties file could not be used: remove it (best effort) and fail.
                    try:
                        self.properties_path.unlink()
                    except Exception:
                        pass

                    raise Exception(
                        f"Could not recover daemon '{self.daemon_id}' "
                        + "from its properties file."
                    )

        if 'target' not in self.__dict__:
            if target is None:
                error("Cannot create a Daemon without a target.")
            self.target = target

        self.pickle = pickle

        ### NOTE: We have to check self.__dict__ in case we are un-pickling.
        if '_target_args' not in self.__dict__:
            self._target_args = target_args
        if '_target_kw' not in self.__dict__:
            self._target_kw = target_kw

        if 'label' not in self.__dict__:
            if label is None:
                ### Fall back to `str(target)` for targets without a `__name__`.
                label = (
                    self.target.__name__ if '__name__' in self.target.__dir__()
                        else str(self.target)
                )
            self.label = label
        elif label is not None:
            self.label = label

        if 'daemon_id' not in self.__dict__:
            self.daemon_id = get_new_daemon_name()
        ### Merge the given properties into any already set on the instance.
        if '_properties' not in self.__dict__:
            self._properties = properties
        elif properties:
            if self._properties is None:
                self._properties = {}
            self._properties.update(properties)
        if self._properties is None:
            self._properties = {}

        self._properties.update({'label': self.label})
        if env:
            self._properties.update({'env': env})

        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
        _ = self.process
Parameters
  • target (Optional[Callable[[Any], Any]], default None): The function to execute in a child process.
  • target_args (Union[List[Any], Tuple[Any], None], default None): Positional arguments to pass to the target function.
  • target_kw (Optional[Dict[str, Any]], default None): Keyword arguments to pass to the target function.
  • env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the daemon process.
  • daemon_id (Optional[str], default None): Build a Daemon from an existing daemon_id. If daemon_id is provided, other arguments are ignored and are derived from the existing pickled Daemon.
  • label (Optional[str], default None): Label string to help identify a daemon. If None, use the function name instead.
  • properties (Optional[Dict[str, Any]], default None): Override reading from the properties JSON by providing an existing dictionary.
@classmethod
def from_properties_file(cls, daemon_id: str) -> Daemon:
 89    @classmethod
 90    def from_properties_file(cls, daemon_id: str) -> Daemon:
 91        """
 92        Return a Daemon from a properties dictionary.
 93        """
 94        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
 95        if not properties_path.exists():
 96            raise OSError(f"Properties file '{properties_path}' does not exist.")
 97
 98        try:
 99            with open(properties_path, 'r', encoding='utf-8') as f:
100                properties = json.load(f)
101        except Exception:
102            properties = {}
103
104        if not properties:
105            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")
106
107        daemon_id = properties_path.parent.name
108        target_cf = properties.get('target', {})
109        target_module_name = target_cf.get('module', None)
110        target_function_name = target_cf.get('name', None)
111        target_args = target_cf.get('args', None)
112        target_kw = target_cf.get('kw', None)
113        label = properties.get('label', None)
114
115        if None in [
116            target_module_name,
117            target_function_name,
118            target_args,
119            target_kw,
120        ]:
121            raise ValueError("Missing target function information.")
122
123        target_module = importlib.import_module(target_module_name)
124        target_function = getattr(target_module, target_function_name)
125
126        return Daemon(
127            daemon_id=daemon_id,
128            target=target_function,
129            target_args=target_args,
130            target_kw=target_kw,
131            properties=properties,
132            label=label,
133        )

Return a Daemon built from the properties file of an existing daemon_id.

pickle
def run( self, keep_daemon_output: bool = True, allow_dirty_run: bool = False, wait: bool = False, timeout: Union[int, float] = 4, debug: bool = False) -> Tuple[bool, str]:
425    def run(
426        self,
427        keep_daemon_output: bool = True,
428        allow_dirty_run: bool = False,
429        wait: bool = False,
430        timeout: Union[int, float] = 4,
431        debug: bool = False,
432    ) -> SuccessTuple:
433        """Run the daemon as a child process and continue executing the parent.
434
435        Parameters
436        ----------
437        keep_daemon_output: bool, default True
438            If `False`, delete the daemon's output directory upon exiting.
439
440        allow_dirty_run: bool, default False
441            If `True`, run the daemon, even if the `daemon_id` directory exists.
442            This option is dangerous because if the same `daemon_id` runs concurrently,
443            the last to finish will overwrite the output of the first.
444
445        wait: bool, default True
446            If `True`, block until `Daemon.status` is running (or the timeout expires).
447
448        timeout: Union[int, float], default 4
449            If `wait` is `True`, block for up to `timeout` seconds before returning a failure.
450
451        Returns
452        -------
453        A SuccessTuple indicating success.
454
455        """
456        import platform
457        if platform.system() == 'Windows':
458            return False, "Cannot run background jobs on Windows."
459
460        ### The daemon might exist and be paused.
461        if self.status == 'paused':
462            return self.resume()
463
464        self._remove_stop_file()
465        if self.status == 'running':
466            return True, f"Daemon '{self}' is already running."
467
468        self.mkdir_if_not_exists(allow_dirty_run)
469        _write_pickle_success_tuple = self.write_pickle()
470        if not _write_pickle_success_tuple[0]:
471            return _write_pickle_success_tuple
472
473        _launch_daemon_code = (
474            "from meerschaum.utils.daemon import Daemon, _daemons; "
475            f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
476            f"_daemons['{self.daemon_id}'] = daemon; "
477            f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
478            "allow_dirty_run=True)"
479        )
480        env = dict(os.environ)
481        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
482        msg = (
483            "Success"
484            if _launch_success_bool
485            else f"Failed to start daemon '{self.daemon_id}'."
486        )
487        if not wait or not _launch_success_bool:
488            return _launch_success_bool, msg
489
490        timeout = self.get_timeout_seconds(timeout)
491        check_timeout_interval = self.get_check_timeout_interval_seconds()
492
493        if not timeout:
494            success = self.status == 'running'
495            msg = "Success" if success else f"Failed to run daemon '{self.daemon_id}'."
496            if success:
497                self._capture_process_timestamp('began')
498            return success, msg
499
500        begin = time.perf_counter()
501        while (time.perf_counter() - begin) < timeout:
502            if self.status == 'running':
503                self._capture_process_timestamp('began')
504                return True, "Success"
505            time.sleep(check_timeout_interval)
506
507        return False, (
508            f"Failed to start daemon '{self.daemon_id}' within {timeout} second"
509            + ('s' if timeout != 1 else '') + '.'
510        )

Run the daemon as a child process and continue executing the parent.

Parameters
  • keep_daemon_output (bool, default True): If False, delete the daemon's output directory upon exiting.
  • allow_dirty_run (bool, default False): If True, run the daemon, even if the daemon_id directory exists. This option is dangerous because if the same daemon_id runs concurrently, the last to finish will overwrite the output of the first.
  • wait (bool, default False): If True, block until Daemon.status is running (or the timeout expires).
  • timeout (Union[int, float], default 4): If wait is True, block for up to timeout seconds before returning a failure.
Returns
  • A SuccessTuple indicating success.
def kill(self, timeout: Union[int, float, NoneType] = 8) -> Tuple[bool, str]:
513    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
514        """
515        Forcibly terminate a running daemon.
516        Sends a SIGTERM signal to the process.
517
518        Parameters
519        ----------
520        timeout: Optional[int], default 3
521            How many seconds to wait for the process to terminate.
522
523        Returns
524        -------
525        A SuccessTuple indicating success.
526        """
527        if self.status != 'paused':
528            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
529            if success:
530                self._write_stop_file('kill')
531                self.stdin_file.close()
532                self._remove_blocking_stdin_file()
533                return success, msg
534
535        if self.status == 'stopped':
536            self._write_stop_file('kill')
537            self.stdin_file.close()
538            self._remove_blocking_stdin_file()
539            return True, "Process has already stopped."
540
541        psutil = attempt_import('psutil')
542        process = self.process
543        try:
544            process.terminate()
545            process.kill()
546            process.wait(timeout=timeout)
547        except Exception as e:
548            return False, f"Failed to kill job {self} ({process}) with exception: {e}"
549
550        try:
551            if process.status():
552                return False, "Failed to stop daemon '{self}' ({process})."
553        except psutil.NoSuchProcess:
554            pass
555
556        if self.pid_path.exists():
557            try:
558                self.pid_path.unlink()
559            except Exception:
560                pass
561
562        self._write_stop_file('kill')
563        self.stdin_file.close()
564        self._remove_blocking_stdin_file()
565        return True, "Success"

Forcibly terminate a running daemon. Sends a SIGTERM signal to the process.

Parameters
  • timeout (Union[int, float, None], default 8): How many seconds to wait for the process to terminate.
Returns
  • A SuccessTuple indicating success.
def quit(self, timeout: Union[int, float, NoneType] = None) -> Tuple[bool, str]:
567    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
568        """Gracefully quit a running daemon."""
569        if self.status == 'paused':
570            return self.kill(timeout)
571
572        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
573        if signal_success:
574            self._write_stop_file('quit')
575            self.stdin_file.close()
576            self._remove_blocking_stdin_file()
577        return signal_success, signal_msg

Gracefully quit a running daemon.

def pause( self, timeout: Union[int, float, NoneType] = None, check_timeout_interval: Union[float, int, NoneType] = None) -> Tuple[bool, str]:
579    def pause(
580        self,
581        timeout: Union[int, float, None] = None,
582        check_timeout_interval: Union[float, int, None] = None,
583    ) -> SuccessTuple:
584        """
585        Pause the daemon if it is running.
586
587        Parameters
588        ----------
589        timeout: Union[float, int, None], default None
590            The maximum number of seconds to wait for a process to suspend.
591
592        check_timeout_interval: Union[float, int, None], default None
593            The number of seconds to wait between checking if the process is still running.
594
595        Returns
596        -------
597        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
598        """
599        self._remove_blocking_stdin_file()
600
601        if self.process is None:
602            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."
603
604        if self.status == 'paused':
605            return True, f"Daemon '{self.daemon_id}' is already paused."
606
607        self._write_stop_file('pause')
608        self.stdin_file.close()
609        self._remove_blocking_stdin_file()
610        try:
611            self.process.suspend()
612        except Exception as e:
613            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"
614
615        timeout = self.get_timeout_seconds(timeout)
616        check_timeout_interval = self.get_check_timeout_interval_seconds(
617            check_timeout_interval
618        )
619
620        psutil = attempt_import('psutil')
621
622        if not timeout:
623            try:
624                success = self.process.status() == 'stopped'
625            except psutil.NoSuchProcess:
626                success = True
627            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
628            if success:
629                self._capture_process_timestamp('paused')
630            return success, msg
631
632        begin = time.perf_counter()
633        while (time.perf_counter() - begin) < timeout:
634            try:
635                if self.process.status() == 'stopped':
636                    self._capture_process_timestamp('paused')
637                    return True, "Success"
638            except psutil.NoSuchProcess as e:
639                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
640            time.sleep(check_timeout_interval)
641
642        return False, (
643            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
644            + ('s' if timeout != 1 else '') + '.'
645        )

Pause the daemon if it is running.

Parameters
  • timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to suspend.
  • check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still running.
Returns
  • A SuccessTuple indicating whether the Daemon process was successfully suspended.
def resume( self, timeout: Union[int, float, NoneType] = None, check_timeout_interval: Union[float, int, NoneType] = None) -> Tuple[bool, str]:
647    def resume(
648        self,
649        timeout: Union[int, float, None] = None,
650        check_timeout_interval: Union[float, int, None] = None,
651    ) -> SuccessTuple:
652        """
653        Resume the daemon if it is paused.
654
655        Parameters
656        ----------
657        timeout: Union[float, int, None], default None
658            The maximum number of seconds to wait for a process to resume.
659
660        check_timeout_interval: Union[float, int, None], default None
661            The number of seconds to wait between checking if the process is still stopped.
662
663        Returns
664        -------
665        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
666        """
667        if self.status == 'running':
668            return True, f"Daemon '{self.daemon_id}' is already running."
669
670        if self.status == 'stopped':
671            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."
672
673        self._remove_stop_file()
674        try:
675            if self.process is None:
676                return False, f"Cannot resume daemon '{self.daemon_id}'."
677
678            self.process.resume()
679        except Exception as e:
680            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"
681
682        timeout = self.get_timeout_seconds(timeout)
683        check_timeout_interval = self.get_check_timeout_interval_seconds(
684            check_timeout_interval
685        )
686
687        if not timeout:
688            success = self.status == 'running'
689            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
690            if success:
691                self._capture_process_timestamp('began')
692            return success, msg
693
694        begin = time.perf_counter()
695        while (time.perf_counter() - begin) < timeout:
696            if self.status == 'running':
697                self._capture_process_timestamp('began')
698                return True, "Success"
699            time.sleep(check_timeout_interval)
700
701        return False, (
702            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
703            + ('s' if timeout != 1 else '') + '.'
704        )

Resume the daemon if it is paused.

Parameters
  • timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to resume.
  • check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still stopped.
Returns
  • A SuccessTuple indicating whether the Daemon process was successfully resumed.
def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
843    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
844        """Create the Daemon's directory.
845        If `allow_dirty_run` is `False` and the directory already exists,
846        raise a `FileExistsError`.
847        """
848        try:
849            self.path.mkdir(parents=True, exist_ok=True)
850            _already_exists = any(os.scandir(self.path))
851        except FileExistsError:
852            _already_exists = True
853
854        if _already_exists and not allow_dirty_run:
855            error(
856                f"Daemon '{self.daemon_id}' already exists. " +
857                "To allow this daemon to run, do one of the following:\n"
858                + "  - Execute `daemon.cleanup()`.\n"
859                + f"  - Delete the directory '{self.path}'.\n"
860                + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
861                FileExistsError,
862            )

Create the Daemon's directory. If allow_dirty_run is False and the directory already exists, raise a FileExistsError.

process: "Union['psutil.Process', None]"
864    @property
865    def process(self) -> Union['psutil.Process', None]:
866        """
867        Return the psutil process for the Daemon.
868        """
869        psutil = attempt_import('psutil')
870        pid = self.pid
871        if pid is None:
872            return None
873        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
874            try:
875                self._process = psutil.Process(int(pid))
876                process_exists = True
877            except Exception:
878                process_exists = False
879            if not process_exists:
880                _ = self.__dict__.pop('_process', None)
881                try:
882                    if self.pid_path.exists():
883                        self.pid_path.unlink()
884                except Exception:
885                    pass
886                return None
887        return self._process

Return the psutil process for the Daemon.

status: str
889    @property
890    def status(self) -> str:
891        """
892        Return the running status of this Daemon.
893        """
894        if self.process is None:
895            return 'stopped'
896
897        psutil = attempt_import('psutil', lazy=False)
898        try:
899            if self.process.status() == 'stopped':
900                return 'paused'
901            if self.process.status() == 'zombie':
902                raise psutil.NoSuchProcess(self.process.pid)
903        except (psutil.NoSuchProcess, AttributeError):
904            if self.pid_path.exists():
905                try:
906                    self.pid_path.unlink()
907                except Exception:
908                    pass
909            return 'stopped'
910
911        return 'running'

Return the running status of this Daemon.

path: pathlib.Path
921    @property
922    def path(self) -> pathlib.Path:
923        """
924        Return the path for this Daemon's directory.
925        """
926        return self._get_path_from_daemon_id(self.daemon_id)

Return the path for this Daemon's directory.

properties_path: pathlib.Path
935    @property
936    def properties_path(self) -> pathlib.Path:
937        """
938        Return the `propterties.json` path for this Daemon.
939        """
940        return self._get_properties_path_from_daemon_id(self.daemon_id)

Return the properties.json path for this Daemon.

stop_path: pathlib.Path
942    @property
943    def stop_path(self) -> pathlib.Path:
944        """
945        Return the path for the stop file (created when manually stopped).
946        """
947        return self.path / '.stop.json'

Return the path for the stop file (created when manually stopped).

log_path: pathlib.Path
949    @property
950    def log_path(self) -> pathlib.Path:
951        """
952        Return the log path.
953        """
954        logs_cf = self.properties.get('logs', None) or {}
955        if 'path' not in logs_cf:
956            import meerschaum.config.paths as paths
957            return paths.LOGS_RESOURCES_PATH / (self.daemon_id + '.log')
958
959        return pathlib.Path(logs_cf['path'])

Return the log path.

stdin_file_path: pathlib.Path
961    @property
962    def stdin_file_path(self) -> pathlib.Path:
963        """
964        Return the stdin file path.
965        """
966        return self.path / 'input.stdin'

Return the stdin file path.

blocking_stdin_file_path: pathlib.Path
968    @property
969    def blocking_stdin_file_path(self) -> pathlib.Path:
970        """
971        Return the stdin file path.
972        """
973        if '_blocking_stdin_file_path' in self.__dict__:
974            return self._blocking_stdin_file_path
975
976        return self.path / 'input.stdin.block'

Return the path of the blocking (lock) file for the stdin file.

prompt_kwargs_file_path: pathlib.Path
978    @property
979    def prompt_kwargs_file_path(self) -> pathlib.Path:
980        """
981        Return the file path to the kwargs for the invoking `prompt()`.
982        """
983        return self.path / 'prompt_kwargs.json'

Return the file path to the kwargs for the invoking prompt().

log_offset_path: pathlib.Path
985    @property
986    def log_offset_path(self) -> pathlib.Path:
987        """
988        Return the log offset file path.
989        """
990        import meerschaum.config.paths as paths
991        return paths.LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')

Return the log offset file path.

log_offset_lock: "'fasteners.InterProcessLock'"
 993    @property
 994    def log_offset_lock(self) -> 'fasteners.InterProcessLock':
 995        """
 996        Return the process lock context manager.
 997        """
 998        if '_log_offset_lock' in self.__dict__:
 999            return self._log_offset_lock
1000
1001        fasteners = attempt_import('fasteners')
1002        self._log_offset_lock = fasteners.InterProcessLock(self.log_offset_path)
1003        return self._log_offset_lock

Return the process lock context manager.

rotating_log: meerschaum.utils.daemon.RotatingFile
1005    @property
1006    def rotating_log(self) -> RotatingFile:
1007        """
1008        The rotating log file for the daemon's output.
1009        """
1010        if '_rotating_log' in self.__dict__:
1011            return self._rotating_log
1012
1013        logs_cf = self.properties.get('logs', None) or {}
1014        write_timestamps = logs_cf.get('write_timestamps', None)
1015        if write_timestamps is None:
1016            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
1017
1018        timestamp_format = logs_cf.get('timestamp_format', None)
1019        if timestamp_format is None:
1020            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
1021
1022        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
1023        if num_files_to_keep is None:
1024            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
1025
1026        max_file_size = logs_cf.get('max_file_size', None)
1027        if max_file_size is None:
1028            max_file_size = get_config('jobs', 'logs', 'max_file_size')
1029
1030        redirect_streams = logs_cf.get('redirect_streams', True)
1031
1032        self._rotating_log = RotatingFile(
1033            self.log_path,
1034            redirect_streams=redirect_streams,
1035            write_timestamps=write_timestamps,
1036            timestamp_format=timestamp_format,
1037            num_files_to_keep=num_files_to_keep,
1038            max_file_size=max_file_size,
1039        )
1040        return self._rotating_log

The rotating log file for the daemon's output.

stdin_file
1042    @property
1043    def stdin_file(self):
1044        """
1045        Return the file handler for the stdin file.
1046        """
1047        if (_stdin_file := self.__dict__.get('_stdin_file', None)):
1048            return _stdin_file
1049
1050        self._stdin_file = StdinFile(
1051            self.stdin_file_path,
1052            lock_file_path=self.blocking_stdin_file_path,
1053        )
1054        return self._stdin_file

Return the file handler for the stdin file.

log_text: Optional[str]
1056    @property
1057    def log_text(self) -> Union[str, None]:
1058        """
1059        Read the log files and return their contents.
1060        Returns `None` if the log file does not exist.
1061        """
1062        logs_cf = self.properties.get('logs', None) or {}
1063        write_timestamps = logs_cf.get('write_timestamps', None)
1064        if write_timestamps is None:
1065            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')
1066
1067        timestamp_format = logs_cf.get('timestamp_format', None)
1068        if timestamp_format is None:
1069            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
1070
1071        num_files_to_keep = logs_cf.get('num_files_to_keep', None)
1072        if num_files_to_keep is None:
1073            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
1074
1075        max_file_size = logs_cf.get('max_file_size', None)
1076        if max_file_size is None:
1077            max_file_size = get_config('jobs', 'logs', 'max_file_size')
1078
1079        new_rotating_log = RotatingFile(
1080            self.rotating_log.file_path,
1081            num_files_to_keep=num_files_to_keep,
1082            max_file_size=max_file_size,
1083            write_timestamps=write_timestamps,
1084            timestamp_format=timestamp_format,
1085        )
1086        return new_rotating_log.read()

Read the log files and return their contents. Returns None if the log file does not exist.

def readlines(self) -> List[str]:
1088    def readlines(self) -> List[str]:
1089        """
1090        Read the next log lines, persisting the cursor for later use.
1091        Note this will alter the cursor of `self.rotating_log`.
1092        """
1093        self.rotating_log._cursor = self._read_log_offset()
1094        lines = self.rotating_log.readlines()
1095        self._write_log_offset()
1096        return lines

Read the next log lines, persisting the cursor for later use. Note this will alter the cursor of self.rotating_log.

pid: Optional[int]
1129    @property
1130    def pid(self) -> Union[int, None]:
1131        """
1132        Read the PID file and return its contents.
1133        Returns `None` if the PID file does not exist.
1134        """
1135        if not self.pid_path.exists():
1136            return None
1137        try:
1138            with open(self.pid_path, 'r', encoding='utf-8') as f:
1139                text = f.read()
1140            if len(text) == 0:
1141                return None
1142            pid = int(text.rstrip())
1143        except Exception as e:
1144            warn(e)
1145            text = None
1146            pid = None
1147        return pid

Read the PID file and return its contents. Returns None if the PID file does not exist.

pid_path: pathlib.Path
1149    @property
1150    def pid_path(self) -> pathlib.Path:
1151        """
1152        Return the path to a file containing the PID for this Daemon.
1153        """
1154        return self.path / 'process.pid'

Return the path to a file containing the PID for this Daemon.

pid_lock: "'fasteners.InterProcessLock'"
1156    @property
1157    def pid_lock(self) -> 'fasteners.InterProcessLock':
1158        """
1159        Return the process lock context manager.
1160        """
1161        if '_pid_lock' in self.__dict__:
1162            return self._pid_lock
1163
1164        fasteners = attempt_import('fasteners')
1165        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
1166        return self._pid_lock

Return the process lock context manager.

pickle_path: pathlib.Path
1168    @property
1169    def pickle_path(self) -> pathlib.Path:
1170        """
1171        Return the path for the pickle file.
1172        """
1173        return self.path / 'pickle.pkl'

Return the path for the pickle file.

def read_properties(self) -> Optional[Dict[str, Any]]:
1175    def read_properties(self) -> Optional[Dict[str, Any]]:
1176        """Read the properties JSON file and return the dictionary."""
1177        if not self.properties_path.exists():
1178            return None
1179        try:
1180            with open(self.properties_path, 'r', encoding='utf-8') as file:
1181                properties = json.load(file)
1182        except Exception:
1183            properties = {}
1184        
1185        return properties or {}

Read the properties JSON file and return the dictionary.

def read_pickle(self) -> Daemon:
1187    def read_pickle(self) -> Daemon:
1188        """Read a Daemon's pickle file and return the `Daemon`."""
1189        import pickle
1190        import traceback
1191        if not self.pickle_path.exists():
1192            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")
1193
1194        if self.pickle_path.stat().st_size == 0:
1195            error(f"Pickle was empty for daemon '{self.daemon_id}'.")
1196
1197        try:
1198            with open(self.pickle_path, 'rb') as pickle_file:
1199                daemon = pickle.load(pickle_file)
1200            success, msg = True, 'Success'
1201        except Exception as e:
1202            success, msg = False, str(e)
1203            daemon = None
1204            traceback.print_exception(type(e), e, e.__traceback__)
1205        if not success:
1206            error(msg)
1207        return daemon

Read a Daemon's pickle file and return the Daemon.

properties: Dict[str, Any]
1209    @property
1210    def properties(self) -> Dict[str, Any]:
1211        """
1212        Return the contents of the properties JSON file.
1213        """
1214        try:
1215            _file_properties = self.read_properties() or {}
1216        except Exception:
1217            traceback.print_exc()
1218            _file_properties = {}
1219
1220        if not self._properties:
1221            self._properties = _file_properties
1222
1223        if self._properties is None:
1224            self._properties = {}
1225
1226        if (
1227            self._properties.get('result', None) is None
1228            and _file_properties.get('result', None) is not None
1229        ):
1230            _ = self._properties.pop('result', None)
1231
1232        if _file_properties is not None:
1233            self._properties = apply_patch_to_config(
1234                _file_properties,
1235                self._properties,
1236            )
1237
1238        return self._properties

Return the contents of the properties JSON file.

hidden: bool
1240    @property
1241    def hidden(self) -> bool:
1242        """
1243        Return a bool indicating whether this Daemon should be displayed.
1244        """
1245        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')

Return a bool indicating whether this Daemon should be displayed.

def write_properties(self) -> Tuple[bool, str]:
1247    def write_properties(self) -> SuccessTuple:
1248        """Write the properties dictionary to the properties JSON file
1249        (only if self.properties exists).
1250        """
1251        from meerschaum.utils.misc import generate_password
1252        success, msg = (
1253            False,
1254            f"No properties to write for daemon '{self.daemon_id}'."
1255        )
1256        backup_path = self.properties_path.parent / (generate_password(8) + '.json')
1257        props = self.properties
1258        if props is not None:
1259            try:
1260                self.path.mkdir(parents=True, exist_ok=True)
1261                if self.properties_path.exists():
1262                    self.properties_path.rename(backup_path)
1263                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
1264                    json.dump(props, properties_file)
1265                success, msg = True, 'Success'
1266            except Exception as e:
1267                success, msg = False, str(e)
1268
1269        try:
1270            if backup_path.exists():
1271                if not success:
1272                    backup_path.rename(self.properties_path)
1273                else:
1274                    backup_path.unlink()
1275        except Exception as e:
1276            success, msg = False, str(e)
1277
1278        return success, msg

Write the properties dictionary to the properties JSON file (only if self.properties exists).

def write_pickle(self) -> Tuple[bool, str]:
1280    def write_pickle(self) -> SuccessTuple:
1281        """Write the pickle file for the daemon."""
1282        import pickle
1283        import traceback
1284        from meerschaum.utils.misc import generate_password
1285
1286        if not self.pickle:
1287            return True, "Success"
1288
1289        from meerschaum._internal.entry import _shells
1290        if _shells:
1291            from meerschaum._internal.shell.Shell import revert_input
1292            revert_input()
1293
1294        backup_path = self.pickle_path.parent / (generate_password(7) + '.pkl')
1295        try:
1296            self.path.mkdir(parents=True, exist_ok=True)
1297            if self.pickle_path.exists():
1298                self.pickle_path.rename(backup_path)
1299            with open(self.pickle_path, 'wb+') as pickle_file:
1300                pickle.dump(self, pickle_file)
1301            success, msg = True, "Success"
1302        except Exception as e:
1303            success, msg = False, str(e)
1304            traceback.print_exception(type(e), e, e.__traceback__)
1305        try:
1306            if backup_path.exists():
1307                if not success:
1308                    backup_path.rename(self.pickle_path)
1309                else:
1310                    backup_path.unlink()
1311        except Exception as e:
1312            success, msg = False, str(e)
1313        return success, msg

Write the pickle file for the daemon.

def cleanup(self, keep_logs: bool = False) -> Tuple[bool, str]:
1343    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
1344        """
1345        Remove a daemon's directory after execution.
1346
1347        Parameters
1348        ----------
1349        keep_logs: bool, default False
1350            If `True`, skip deleting the daemon's log files.
1351
1352        Returns
1353        -------
1354        A `SuccessTuple` indicating success.
1355        """
1356        if self.path.exists():
1357            try:
1358                shutil.rmtree(self.path)
1359            except Exception as e:
1360                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
1361                warn(msg)
1362                return False, msg
1363        if not keep_logs:
1364            self.rotating_log.delete()
1365            try:
1366                if self.log_offset_path.exists():
1367                    self.log_offset_path.unlink()
1368            except Exception as e:
1369                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
1370                warn(msg)
1371                return False, msg
1372        return True, "Success"

Remove a daemon's directory after execution.

Parameters
  • keep_logs (bool, default False): If True, skip deleting the daemon's log files.
Returns
  • A SuccessTuple indicating success.
def get_timeout_seconds(self, timeout: Union[int, float, NoneType] = None) -> Union[int, float]:
1375    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
1376        """
1377        Return the timeout value to use. Use `--timeout-seconds` if provided,
1378        else the configured default (8).
1379        """
1380        if isinstance(timeout, (int, float)):
1381            return timeout
1382        return get_config('jobs', 'timeout_seconds')

Return the timeout value to use. Use --timeout-seconds if provided, else the configured default (8).

def get_check_timeout_interval_seconds(self, check_timeout_interval: Union[int, float, NoneType] = None) -> Union[int, float]:
1385    def get_check_timeout_interval_seconds(
1386        self,
1387        check_timeout_interval: Union[int, float, None] = None,
1388    ) -> Union[int, float]:
1389        """
1390        Return the interval value to check the status of timeouts.
1391        """
1392        if isinstance(check_timeout_interval, (int, float)):
1393            return check_timeout_interval
1394        return get_config('jobs', 'check_timeout_interval_seconds')

Return the interval value to check the status of timeouts.

target_args: Optional[Tuple[Any]]
1396    @property
1397    def target_args(self) -> Union[Tuple[Any], None]:
1398        """
1399        Return the positional arguments to pass to the target function.
1400        """
1401        target_args = (
1402            self.__dict__.get('_target_args', None)
1403            or self.properties.get('target', {}).get('args', None)
1404        )
1405        if target_args is None:
1406            return tuple([])
1407
1408        return tuple(target_args)

Return the positional arguments to pass to the target function.

target_kw: Optional[Dict[str, Any]]
1410    @property
1411    def target_kw(self) -> Union[Dict[str, Any], None]:
1412        """
1413        Return the keyword arguments to pass to the target function.
1414        """
1415        target_kw = (
1416            self.__dict__.get('_target_kw', None)
1417            or self.properties.get('target', {}).get('kw', None)
1418        )
1419        if target_kw is None:
1420            return {}
1421
1422        return {key: val for key, val in target_kw.items()}

Return the keyword arguments to pass to the target function.