meerschaum.utils.daemon.Daemon
Manage running daemons via the Daemon class.
1#! /usr/bin/env python3 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Manage running daemons via the Daemon class. 7""" 8 9from __future__ import annotations 10import os 11import importlib 12import pathlib 13import json 14import shutil 15import signal 16import time 17import traceback 18from functools import partial 19from datetime import datetime, timezone 20 21import meerschaum as mrsm 22from meerschaum.utils.typing import ( 23 Optional, Dict, Any, SuccessTuple, Callable, List, Union, 24 is_success_tuple, Tuple, 25) 26from meerschaum.config import get_config 27from meerschaum.config.static import STATIC_CONFIG 28from meerschaum.config._paths import ( 29 DAEMON_RESOURCES_PATH, LOGS_RESOURCES_PATH, DAEMON_ERROR_LOG_PATH, 30) 31from meerschaum.config._patch import apply_patch_to_config 32from meerschaum.utils.warnings import warn, error 33from meerschaum.utils.packages import attempt_import 34from meerschaum.utils.venv import venv_exec 35from meerschaum.utils.daemon._names import get_new_daemon_name 36from meerschaum.utils.daemon.RotatingFile import RotatingFile 37from meerschaum.utils.daemon.StdinFile import StdinFile 38from meerschaum.utils.threading import RepeatTimer 39from meerschaum.__main__ import _close_pools 40 41_daemons = [] 42_results = {} 43 44class Daemon: 45 """ 46 Daemonize Python functions into background processes. 
    def __new__(
        cls,
        *args,
        daemon_id: Optional[str] = None,
        **kw
    ):
        """
        If a `daemon_id` is provided and already exists, read from its pickle file.

        Positional and keyword arguments are accepted (and ignored here) so that
        `__init__` may receive them unchanged after construction.
        """
        instance = super(Daemon, cls).__new__(cls)
        if daemon_id is not None:
            instance.daemon_id = daemon_id
            ### NOTE: When a pickle exists, the freshly-allocated instance is discarded
            ### and the unpickled Daemon is returned instead; `__init__` still runs
            ### afterwards on the unpickled object (it guards against re-initialization
            ### by checking `self.__dict__`).
            if instance.pickle_path.exists():
                instance = instance.read_pickle()
        return instance
    def __init__(
        self,
        target: Optional[Callable[[Any], Any]] = None,
        target_args: Union[List[Any], Tuple[Any], None] = None,
        target_kw: Optional[Dict[str, Any]] = None,
        env: Optional[Dict[str, str]] = None,
        daemon_id: Optional[str] = None,
        label: Optional[str] = None,
        properties: Optional[Dict[str, Any]] = None,
    ):
        """
        Parameters
        ----------
        target: Optional[Callable[[Any], Any]], default None,
            The function to execute in a child process.

        target_args: Union[List[Any], Tuple[Any], None], default None
            Positional arguments to pass to the target function.

        target_kw: Optional[Dict[str, Any]], default None
            Keyword arguments to pass to the target function.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the daemon process.

        daemon_id: Optional[str], default None
            Build a `Daemon` from an existing `daemon_id`.
            If `daemon_id` is provided, other arguments are ignored and are derived
            from the existing pickled `Daemon`.

        label: Optional[str], default None
            Label string to help identify a daemon.
            If `None`, use the function name instead.

        properties: Optional[Dict[str, Any]], default None
            Override reading from the properties JSON by providing an existing dictionary.
        """
        ### NOTE(review): `_pickle` is read but never used below — presumably a
        ### legacy flag; confirm before removing.
        _pickle = self.__dict__.get('_pickle', False)
        if daemon_id is not None:
            self.daemon_id = daemon_id
            ### The pickle is missing but a daemon_id was given without a target:
            ### attempt to rebuild the Daemon from its properties file.
            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):

                if not self.properties_path.exists():
                    raise Exception(
                        f"Daemon '{self.daemon_id}' does not exist. "
                        + "Pass a target to create a new Daemon."
                    )

                try:
                    new_daemon = self.from_properties_file(daemon_id)
                except Exception:
                    new_daemon = None

                if new_daemon is not None:
                    ### Re-create the pickle so future lookups skip this recovery path.
                    new_daemon.write_pickle()
                    target = new_daemon.target
                    target_args = new_daemon.target_args
                    target_kw = new_daemon.target_kw
                    label = new_daemon.label
                    self._properties = new_daemon.properties
                else:
                    ### The properties file is unusable — remove it (best-effort)
                    ### so the broken state is not left behind.
                    try:
                        self.properties_path.unlink()
                    except Exception:
                        pass

                    raise Exception(
                        f"Could not recover daemon '{self.daemon_id}' "
                        + "from its properties file."
                    )

        ### NOTE: We have to check `self.__dict__` in case we are un-pickling
        ### (attributes may already be set by `__new__` / `read_pickle`).
        if 'target' not in self.__dict__:
            if target is None:
                error("Cannot create a Daemon without a target.")
            self.target = target

        if '_target_args' not in self.__dict__:
            self._target_args = target_args
        if '_target_kw' not in self.__dict__:
            self._target_kw = target_kw

        if 'label' not in self.__dict__:
            if label is None:
                label = (
                    self.target.__name__ if '__name__' in self.target.__dir__()
                    else str(self.target)
                )
            self.label = label
        if 'daemon_id' not in self.__dict__:
            self.daemon_id = get_new_daemon_name()
        if '_properties' not in self.__dict__:
            self._properties = properties
        if self._properties is None:
            self._properties = {}

        self._properties.update({'label': self.label})
        if env:
            self._properties.update({'env': env})

        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
        _ = self.process
276 277 self._setup(allow_dirty_run) 278 279 ### NOTE: The SIGINT handler has been removed so that child processes may handle 280 ### KeyboardInterrupts themselves. 281 ### The previous aggressive approach was redundant because of the SIGTERM handler. 282 self._daemon_context = daemon.DaemonContext( 283 pidfile=self.pid_lock, 284 stdout=self.rotating_log, 285 stderr=self.rotating_log, 286 working_directory=os.getcwd(), 287 detach_process=True, 288 files_preserve=list(self.rotating_log.subfile_objects.values()), 289 signal_map={ 290 signal.SIGTERM: self._handle_sigterm, 291 }, 292 ) 293 294 _daemons.append(self) 295 296 log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds') 297 self._log_refresh_timer = RepeatTimer( 298 log_refresh_seconds, 299 partial(self.rotating_log.refresh_files, start_interception=True), 300 ) 301 302 try: 303 os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns)) 304 with self._daemon_context: 305 sys.stdin = self.stdin_file 306 _ = os.environ.pop(STATIC_CONFIG['environment']['systemd_stdin_path'], None) 307 os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id 308 os.environ['PYTHONUNBUFFERED'] = '1' 309 310 ### Allow the user to override environment variables. 311 env = self.properties.get('env', {}) 312 if env and isinstance(env, dict): 313 os.environ.update({str(k): str(v) for k, v in env.items()}) 314 315 self.rotating_log.refresh_files(start_interception=True) 316 result = None 317 try: 318 with open(self.pid_path, 'w+', encoding='utf-8') as f: 319 f.write(str(os.getpid())) 320 321 ### NOTE: The timer fails to start for remote actions to localhost. 
    def _capture_process_timestamp(
        self,
        process_key: str,
        write_properties: bool = True,
    ) -> None:
        """
        Record the current timestamp to the parameters `process:<process_key>`.

        Parameters
        ----------
        process_key: str
            Under which key to store the timestamp.
            Must be one of `'began'`, `'ended'`, `'paused'`, `'stopped'`.

        write_properties: bool, default True
            If `True` persist the properties to disk immediately after capturing the timestamp.

        Raises
        ------
        ValueError
            If `process_key` is not one of the accepted keys.
        """
        ### NOTE: `self.properties` re-merges the on-disk properties on access,
        ### so it is deliberately accessed fresh for the check and the assignment.
        if 'process' not in self.properties:
            self.properties['process'] = {}

        if process_key not in ('began', 'ended', 'paused', 'stopped'):
            raise ValueError(f"Invalid key '{process_key}'.")

        ### Timestamps are stored as naive ISO-8601 strings in UTC.
        self.properties['process'][process_key] = (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        )
        if write_properties:
            self.write_properties()
428 429 self.mkdir_if_not_exists(allow_dirty_run) 430 _write_pickle_success_tuple = self.write_pickle() 431 if not _write_pickle_success_tuple[0]: 432 return _write_pickle_success_tuple 433 434 _launch_daemon_code = ( 435 "from meerschaum.utils.daemon import Daemon; " 436 + f"daemon = Daemon(daemon_id='{self.daemon_id}'); " 437 + f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, " 438 + "allow_dirty_run=True)" 439 ) 440 env = dict(os.environ) 441 env[STATIC_CONFIG['environment']['noninteractive']] = 'true' 442 _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env) 443 msg = ( 444 "Success" 445 if _launch_success_bool 446 else f"Failed to start daemon '{self.daemon_id}'." 447 ) 448 return _launch_success_bool, msg 449 450 def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple: 451 """ 452 Forcibly terminate a running daemon. 453 Sends a SIGTERM signal to the process. 454 455 Parameters 456 ---------- 457 timeout: Optional[int], default 3 458 How many seconds to wait for the process to terminate. 459 460 Returns 461 ------- 462 A SuccessTuple indicating success. 463 """ 464 if self.status != 'paused': 465 success, msg = self._send_signal(signal.SIGTERM, timeout=timeout) 466 if success: 467 self._write_stop_file('kill') 468 return success, msg 469 470 if self.status == 'stopped': 471 self._write_stop_file('kill') 472 return True, "Process has already stopped." 473 474 psutil = attempt_import('psutil') 475 process = self.process 476 try: 477 process.terminate() 478 process.kill() 479 process.wait(timeout=timeout) 480 except Exception as e: 481 return False, f"Failed to kill job {self} ({process}) with exception: {e}" 482 483 try: 484 if process.status(): 485 return False, "Failed to stop daemon '{self}' ({process})." 
486 except psutil.NoSuchProcess: 487 pass 488 489 if self.pid_path.exists(): 490 try: 491 self.pid_path.unlink() 492 except Exception: 493 pass 494 495 self._write_stop_file('kill') 496 return True, "Success" 497 498 def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple: 499 """Gracefully quit a running daemon.""" 500 if self.status == 'paused': 501 return self.kill(timeout) 502 503 signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout) 504 if signal_success: 505 self._write_stop_file('quit') 506 return signal_success, signal_msg 507 508 def pause( 509 self, 510 timeout: Union[int, float, None] = None, 511 check_timeout_interval: Union[float, int, None] = None, 512 ) -> SuccessTuple: 513 """ 514 Pause the daemon if it is running. 515 516 Parameters 517 ---------- 518 timeout: Union[float, int, None], default None 519 The maximum number of seconds to wait for a process to suspend. 520 521 check_timeout_interval: Union[float, int, None], default None 522 The number of seconds to wait between checking if the process is still running. 523 524 Returns 525 ------- 526 A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended. 527 """ 528 if self.process is None: 529 return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused." 530 531 if self.status == 'paused': 532 return True, f"Daemon '{self.daemon_id}' is already paused." 
533 534 self._write_stop_file('pause') 535 try: 536 self.process.suspend() 537 except Exception as e: 538 return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}" 539 540 timeout = self.get_timeout_seconds(timeout) 541 check_timeout_interval = self.get_check_timeout_interval_seconds( 542 check_timeout_interval 543 ) 544 545 psutil = attempt_import('psutil') 546 547 if not timeout: 548 try: 549 success = self.process.status() == 'stopped' 550 except psutil.NoSuchProcess: 551 success = True 552 msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'." 553 if success: 554 self._capture_process_timestamp('paused') 555 return success, msg 556 557 begin = time.perf_counter() 558 while (time.perf_counter() - begin) < timeout: 559 try: 560 if self.process.status() == 'stopped': 561 self._capture_process_timestamp('paused') 562 return True, "Success" 563 except psutil.NoSuchProcess as e: 564 return False, f"Process exited unexpectedly. Was it killed?\n{e}" 565 time.sleep(check_timeout_interval) 566 567 return False, ( 568 f"Failed to pause daemon '{self.daemon_id}' within {timeout} second" 569 + ('s' if timeout != 1 else '') + '.' 570 ) 571 572 def resume( 573 self, 574 timeout: Union[int, float, None] = None, 575 check_timeout_interval: Union[float, int, None] = None, 576 ) -> SuccessTuple: 577 """ 578 Resume the daemon if it is paused. 579 580 Parameters 581 ---------- 582 timeout: Union[float, int, None], default None 583 The maximum number of seconds to wait for a process to resume. 584 585 check_timeout_interval: Union[float, int, None], default None 586 The number of seconds to wait between checking if the process is still stopped. 587 588 Returns 589 ------- 590 A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed. 591 """ 592 if self.status == 'running': 593 return True, f"Daemon '{self.daemon_id}' is already running." 
594 595 if self.status == 'stopped': 596 return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed." 597 598 self._remove_stop_file() 599 try: 600 self.process.resume() 601 except Exception as e: 602 return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}" 603 604 timeout = self.get_timeout_seconds(timeout) 605 check_timeout_interval = self.get_check_timeout_interval_seconds( 606 check_timeout_interval 607 ) 608 609 if not timeout: 610 success = self.status == 'running' 611 msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'." 612 if success: 613 self._capture_process_timestamp('began') 614 return success, msg 615 616 begin = time.perf_counter() 617 while (time.perf_counter() - begin) < timeout: 618 if self.status == 'running': 619 self._capture_process_timestamp('began') 620 return True, "Success" 621 time.sleep(check_timeout_interval) 622 623 return False, ( 624 f"Failed to resume daemon '{self.daemon_id}' within {timeout} second" 625 + ('s' if timeout != 1 else '') + '.' 626 ) 627 628 def _write_stop_file(self, action: str) -> SuccessTuple: 629 """Write the stop file timestamp and action.""" 630 if action not in ('quit', 'kill', 'pause'): 631 return False, f"Unsupported action '{action}'." 632 633 if not self.stop_path.parent.exists(): 634 self.stop_path.parent.mkdir(parents=True, exist_ok=True) 635 636 with open(self.stop_path, 'w+', encoding='utf-8') as f: 637 json.dump( 638 { 639 'stop_time': datetime.now(timezone.utc).isoformat(), 640 'action': action, 641 }, 642 f 643 ) 644 645 return True, "Success" 646 647 def _remove_stop_file(self) -> SuccessTuple: 648 """Remove the stop file""" 649 if not self.stop_path.exists(): 650 return True, "Stop file does not exist." 
651 652 try: 653 self.stop_path.unlink() 654 except Exception as e: 655 return False, f"Failed to remove stop file:\n{e}" 656 657 return True, "Success" 658 659 def _read_stop_file(self) -> Dict[str, Any]: 660 """ 661 Read the stop file if it exists. 662 """ 663 if not self.stop_path.exists(): 664 return {} 665 666 try: 667 with open(self.stop_path, 'r', encoding='utf-8') as f: 668 data = json.load(f) 669 return data 670 except Exception: 671 return {} 672 673 def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None: 674 """ 675 Handle `SIGTERM` within the `Daemon` context. 676 This method is injected into the `DaemonContext`. 677 """ 678 from meerschaum.utils.process import signal_handler 679 signal_handler(signal_number, stack_frame) 680 681 timer = self.__dict__.get('_log_refresh_timer', None) 682 if timer is not None: 683 timer.cancel() 684 685 daemon_context = self.__dict__.get('_daemon_context', None) 686 if daemon_context is not None: 687 daemon_context.close() 688 689 _close_pools() 690 raise SystemExit(0) 691 692 def _send_signal( 693 self, 694 signal_to_send, 695 timeout: Union[float, int, None] = None, 696 check_timeout_interval: Union[float, int, None] = None, 697 ) -> SuccessTuple: 698 """Send a signal to the daemon process. 699 700 Parameters 701 ---------- 702 signal_to_send: 703 The signal the send to the daemon, e.g. `signals.SIGINT`. 704 705 timeout: Union[float, int, None], default None 706 The maximum number of seconds to wait for a process to terminate. 707 708 check_timeout_interval: Union[float, int, None], default None 709 The number of seconds to wait between checking if the process is still running. 710 711 Returns 712 ------- 713 A SuccessTuple indicating success. 714 """ 715 try: 716 pid = self.pid 717 if pid is None: 718 return ( 719 False, 720 f"Daemon '{self.daemon_id}' is not running, " 721 + f"cannot send signal '{signal_to_send}'." 
722 ) 723 724 os.kill(pid, signal_to_send) 725 except Exception: 726 return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}" 727 728 timeout = self.get_timeout_seconds(timeout) 729 check_timeout_interval = self.get_check_timeout_interval_seconds( 730 check_timeout_interval 731 ) 732 733 if not timeout: 734 return True, f"Successfully sent '{signal}' to daemon '{self.daemon_id}'." 735 736 begin = time.perf_counter() 737 while (time.perf_counter() - begin) < timeout: 738 if not self.status == 'running': 739 return True, "Success" 740 time.sleep(check_timeout_interval) 741 742 return False, ( 743 f"Failed to stop daemon '{self.daemon_id}' (PID: {pid}) within {timeout} second" 744 + ('s' if timeout != 1 else '') + '.' 745 ) 746 747 def mkdir_if_not_exists(self, allow_dirty_run: bool = False): 748 """Create the Daemon's directory. 749 If `allow_dirty_run` is `False` and the directory already exists, 750 raise a `FileExistsError`. 751 """ 752 try: 753 self.path.mkdir(parents=True, exist_ok=True) 754 _already_exists = any(os.scandir(self.path)) 755 except FileExistsError: 756 _already_exists = True 757 758 if _already_exists and not allow_dirty_run: 759 error( 760 f"Daemon '{self.daemon_id}' already exists. " + 761 "To allow this daemon to run, do one of the following:\n" 762 + " - Execute `daemon.cleanup()`.\n" 763 + f" - Delete the directory '{self.path}'.\n" 764 + " - Pass `allow_dirty_run=True` to `daemon.run()`.\n", 765 FileExistsError, 766 ) 767 768 @property 769 def process(self) -> Union['psutil.Process', None]: 770 """ 771 Return the psutil process for the Daemon. 
    @property
    def status(self) -> str:
        """
        Return the running status of this Daemon:
        one of `'running'`, `'paused'`, or `'stopped'`.
        """
        if self.process is None:
            return 'stopped'

        psutil = attempt_import('psutil')
        try:
            ### A suspended (SIGSTOP'd) process reports 'stopped' in psutil,
            ### which this class surfaces as 'paused'.
            if self.process.status() == 'stopped':
                return 'paused'
            if self.process.status() == 'zombie':
                raise psutil.NoSuchProcess(self.process.pid)
        except (psutil.NoSuchProcess, AttributeError):
            ### The process is gone — remove the stale PID file (best-effort).
            if self.pid_path.exists():
                try:
                    self.pid_path.unlink()
                except Exception:
                    pass
            return 'stopped'

        return 'running'

    @classmethod
    def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
        """
        Return a Daemon's path from its `daemon_id`.
        """
        return DAEMON_RESOURCES_PATH / daemon_id

    @property
    def path(self) -> pathlib.Path:
        """
        Return the path for this Daemon's directory.
        """
        return self._get_path_from_daemon_id(self.daemon_id)

    @classmethod
    def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
        """
        Return the `properties.json` path for a given `daemon_id`.
        """
        return cls._get_path_from_daemon_id(daemon_id) / 'properties.json'

    @property
    def properties_path(self) -> pathlib.Path:
        """
        Return the `properties.json` path for this Daemon.
        """
        return self._get_properties_path_from_daemon_id(self.daemon_id)

    @property
    def stop_path(self) -> pathlib.Path:
        """
        Return the path for the stop file (created when manually stopped).
        """
        return self.path / '.stop.json'

    @property
    def log_path(self) -> pathlib.Path:
        """
        Return the log path.
        """
        return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')

    @property
    def stdin_file_path(self) -> pathlib.Path:
        """
        Return the stdin file path.
        """
        return self.path / 'input.stdin'

    @property
    def blocking_stdin_file_path(self) -> pathlib.Path:
        """
        Return the blocking stdin file path
        (used as the `lock_file_path` for this Daemon's `StdinFile`).
        """
        if '_blocking_stdin_file_path' in self.__dict__:
            return self._blocking_stdin_file_path

        return self.path / 'input.stdin.block'

    @property
    def log_offset_path(self) -> pathlib.Path:
        """
        Return the log offset file path.
        """
        return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')

    @property
    def rotating_log(self) -> RotatingFile:
        """
        The rotating log file for the daemon's output (cached after first access).
        """
        if '_rotating_log' in self.__dict__:
            return self._rotating_log

        ### Per-daemon properties override the configured timestamps setting.
        write_timestamps = (
            self.properties.get('logs', {}).get('write_timestamps', None)
        )
        if write_timestamps is None:
            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')

        self._rotating_log = RotatingFile(
            self.log_path,
            redirect_streams=True,
            write_timestamps=write_timestamps,
            timestamp_format=get_config('jobs', 'logs', 'timestamps', 'format'),
        )
        return self._rotating_log
    @property
    def log_text(self) -> Optional[str]:
        """
        Read the log files and return their contents.
        Returns `None` if the log file does not exist.
        """
        ### Build a fresh RotatingFile so reading does not disturb the
        ### cached `self.rotating_log` (which may be redirecting streams).
        new_rotating_log = RotatingFile(
            self.rotating_log.file_path,
            num_files_to_keep = self.rotating_log.num_files_to_keep,
            max_file_size = self.rotating_log.max_file_size,
            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled'),
            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format'),
        )
        return new_rotating_log.read()

    def readlines(self) -> List[str]:
        """
        Read the next log lines, persisting the cursor for later use.
        Note this will alter the cursor of `self.rotating_log`.
        """
        self.rotating_log._cursor = self._read_log_offset()
        lines = self.rotating_log.readlines()
        self._write_log_offset()
        return lines

    def _read_log_offset(self) -> Tuple[int, int]:
        """
        Return the current log offset cursor.

        Returns
        -------
        A tuple of the form (`subfile_index`, `position`).
        Defaults to `(0, 0)` when no offset file exists.
        """
        if not self.log_offset_path.exists():
            return 0, 0

        ### The offset file contains "<subfile_index> <position>".
        with open(self.log_offset_path, 'r', encoding='utf-8') as f:
            cursor_text = f.read()
        cursor_parts = cursor_text.split(' ')
        subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1])
        return subfile_index, subfile_position
964 """ 965 with open(self.log_offset_path, 'w+', encoding='utf-8') as f: 966 subfile_index = self.rotating_log._cursor[0] 967 subfile_position = self.rotating_log._cursor[1] 968 f.write(f"{subfile_index} {subfile_position}") 969 970 @property 971 def pid(self) -> Union[int, None]: 972 """ 973 Read the PID file and return its contents. 974 Returns `None` if the PID file does not exist. 975 """ 976 if not self.pid_path.exists(): 977 return None 978 try: 979 with open(self.pid_path, 'r', encoding='utf-8') as f: 980 text = f.read() 981 if len(text) == 0: 982 return None 983 pid = int(text.rstrip()) 984 except Exception as e: 985 warn(e) 986 text = None 987 pid = None 988 return pid 989 990 @property 991 def pid_path(self) -> pathlib.Path: 992 """ 993 Return the path to a file containing the PID for this Daemon. 994 """ 995 return self.path / 'process.pid' 996 997 @property 998 def pid_lock(self) -> 'fasteners.InterProcessLock': 999 """ 1000 Return the process lock context manager. 1001 """ 1002 if '_pid_lock' in self.__dict__: 1003 return self._pid_lock 1004 1005 fasteners = attempt_import('fasteners') 1006 self._pid_lock = fasteners.InterProcessLock(self.pid_path) 1007 return self._pid_lock 1008 1009 @property 1010 def pickle_path(self) -> pathlib.Path: 1011 """ 1012 Return the path for the pickle file. 
    def read_properties(self) -> Optional[Dict[str, Any]]:
        """Read the properties JSON file and return the dictionary.

        Returns
        -------
        The parsed properties dictionary, an empty dictionary if the file
        exists but cannot be parsed, or `None` if the file does not exist.
        """
        if not self.properties_path.exists():
            return None
        try:
            with open(self.properties_path, 'r', encoding='utf-8') as file:
                properties = json.load(file)
        except Exception:
            ### Corrupt or partially-written JSON falls back to an empty dict.
            properties = {}

        return properties or {}

    def read_pickle(self) -> Daemon:
        """Read a Daemon's pickle file and return the `Daemon`.

        Calls `error()` (which raises) if the pickle file is missing, empty,
        or cannot be unpickled.
        """
        import pickle
        import traceback
        if not self.pickle_path.exists():
            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")

        if self.pickle_path.stat().st_size == 0:
            error(f"Pickle was empty for daemon '{self.daemon_id}'.")

        try:
            with open(self.pickle_path, 'rb') as pickle_file:
                daemon = pickle.load(pickle_file)
            success, msg = True, 'Success'
        except Exception as e:
            success, msg = False, str(e)
            daemon = None
            traceback.print_exception(type(e), e, e.__traceback__)
        if not success:
            error(msg)
        return daemon
1054 """ 1055 try: 1056 _file_properties = self.read_properties() or {} 1057 except Exception: 1058 traceback.print_exc() 1059 _file_properties = {} 1060 1061 if not self._properties: 1062 self._properties = _file_properties 1063 1064 if self._properties is None: 1065 self._properties = {} 1066 1067 if ( 1068 self._properties.get('result', None) is None 1069 and _file_properties.get('result', None) is not None 1070 ): 1071 _ = self._properties.pop('result', None) 1072 1073 if _file_properties is not None: 1074 self._properties = apply_patch_to_config( 1075 _file_properties, 1076 self._properties, 1077 ) 1078 1079 return self._properties 1080 1081 @property 1082 def hidden(self) -> bool: 1083 """ 1084 Return a bool indicating whether this Daemon should be displayed. 1085 """ 1086 return self.daemon_id.startswith('_') or self.daemon_id.startswith('.') 1087 1088 def write_properties(self) -> SuccessTuple: 1089 """Write the properties dictionary to the properties JSON file 1090 (only if self.properties exists). 1091 """ 1092 success, msg = ( 1093 False, 1094 f"No properties to write for daemon '{self.daemon_id}'." 
1095 ) 1096 if self.properties is not None: 1097 try: 1098 self.path.mkdir(parents=True, exist_ok=True) 1099 with open(self.properties_path, 'w+', encoding='utf-8') as properties_file: 1100 json.dump(self.properties, properties_file) 1101 success, msg = True, 'Success' 1102 except Exception as e: 1103 success, msg = False, str(e) 1104 return success, msg 1105 1106 def write_pickle(self) -> SuccessTuple: 1107 """Write the pickle file for the daemon.""" 1108 import pickle 1109 import traceback 1110 try: 1111 self.path.mkdir(parents=True, exist_ok=True) 1112 with open(self.pickle_path, 'wb+') as pickle_file: 1113 pickle.dump(self, pickle_file) 1114 success, msg = True, "Success" 1115 except Exception as e: 1116 success, msg = False, str(e) 1117 traceback.print_exception(type(e), e, e.__traceback__) 1118 return success, msg 1119 1120 1121 def _setup( 1122 self, 1123 allow_dirty_run: bool = False, 1124 ) -> None: 1125 """ 1126 Update properties before starting the Daemon. 1127 """ 1128 if self.properties is None: 1129 self._properties = {} 1130 1131 self._properties.update({ 1132 'target': { 1133 'name': self.target.__name__, 1134 'module': self.target.__module__, 1135 'args': self.target_args, 1136 'kw': self.target_kw, 1137 }, 1138 }) 1139 self.mkdir_if_not_exists(allow_dirty_run) 1140 _write_properties_success_tuple = self.write_properties() 1141 if not _write_properties_success_tuple[0]: 1142 error(_write_properties_success_tuple[1]) 1143 1144 _write_pickle_success_tuple = self.write_pickle() 1145 if not _write_pickle_success_tuple[0]: 1146 error(_write_pickle_success_tuple[1]) 1147 1148 def cleanup(self, keep_logs: bool = False) -> SuccessTuple: 1149 """ 1150 Remove a daemon's directory after execution. 1151 1152 Parameters 1153 ---------- 1154 keep_logs: bool, default False 1155 If `True`, skip deleting the daemon's log files. 1156 1157 Returns 1158 ------- 1159 A `SuccessTuple` indicating success. 
1160 """ 1161 if self.path.exists(): 1162 try: 1163 shutil.rmtree(self.path) 1164 except Exception as e: 1165 msg = f"Failed to clean up '{self.daemon_id}':\n{e}" 1166 warn(msg) 1167 return False, msg 1168 if not keep_logs: 1169 self.rotating_log.delete() 1170 try: 1171 if self.log_offset_path.exists(): 1172 self.log_offset_path.unlink() 1173 except Exception as e: 1174 msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}" 1175 warn(msg) 1176 return False, msg 1177 return True, "Success" 1178 1179 1180 def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]: 1181 """ 1182 Return the timeout value to use. Use `--timeout-seconds` if provided, 1183 else the configured default (8). 1184 """ 1185 if isinstance(timeout, (int, float)): 1186 return timeout 1187 return get_config('jobs', 'timeout_seconds') 1188 1189 1190 def get_check_timeout_interval_seconds( 1191 self, 1192 check_timeout_interval: Union[int, float, None] = None, 1193 ) -> Union[int, float]: 1194 """ 1195 Return the interval value to check the status of timeouts. 1196 """ 1197 if isinstance(check_timeout_interval, (int, float)): 1198 return check_timeout_interval 1199 return get_config('jobs', 'check_timeout_interval_seconds') 1200 1201 @property 1202 def target_args(self) -> Union[Tuple[Any], None]: 1203 """ 1204 Return the positional arguments to pass to the target function. 1205 """ 1206 target_args = ( 1207 self.__dict__.get('_target_args', None) 1208 or self.properties.get('target', {}).get('args', None) 1209 ) 1210 if target_args is None: 1211 return tuple([]) 1212 1213 return tuple(target_args) 1214 1215 @property 1216 def target_kw(self) -> Union[Dict[str, Any], None]: 1217 """ 1218 Return the keyword arguments to pass to the target function. 
1219 """ 1220 target_kw = ( 1221 self.__dict__.get('_target_kw', None) 1222 or self.properties.get('target', {}).get('kw', None) 1223 ) 1224 if target_kw is None: 1225 return {} 1226 1227 return {key: val for key, val in target_kw.items()} 1228 1229 def __getstate__(self): 1230 """ 1231 Pickle this Daemon. 1232 """ 1233 dill = attempt_import('dill') 1234 return { 1235 'target': dill.dumps(self.target), 1236 'target_args': self.target_args, 1237 'target_kw': self.target_kw, 1238 'daemon_id': self.daemon_id, 1239 'label': self.label, 1240 'properties': self.properties, 1241 } 1242 1243 def __setstate__(self, _state: Dict[str, Any]): 1244 """ 1245 Restore this Daemon from a pickled state. 1246 If the properties file exists, skip the old pickled version. 1247 """ 1248 dill = attempt_import('dill') 1249 _state['target'] = dill.loads(_state['target']) 1250 self._pickle = True 1251 daemon_id = _state.get('daemon_id', None) 1252 if not daemon_id: 1253 raise ValueError("Need a daemon_id to un-pickle a Daemon.") 1254 1255 properties_path = self._get_properties_path_from_daemon_id(daemon_id) 1256 ignore_properties = properties_path.exists() 1257 if ignore_properties: 1258 _state = { 1259 key: val 1260 for key, val in _state.items() 1261 if key != 'properties' 1262 } 1263 self.__init__(**_state) 1264 1265 1266 def __repr__(self): 1267 return str(self) 1268 1269 def __str__(self): 1270 return self.daemon_id 1271 1272 def __eq__(self, other): 1273 if not isinstance(other, Daemon): 1274 return False 1275 return self.daemon_id == other.daemon_id 1276 1277 def __hash__(self): 1278 return hash(self.daemon_id)
class Daemon:
    """
    Daemonize Python functions into background processes.

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.utils.daemons import Daemon
    >>> daemon = Daemon(print, ('hi',))
    >>> success, msg = daemon.run()
    >>> print(daemon.log_text)

    2024-07-29 18:03 | hi
    2024-07-29 18:03 |
    >>> daemon.run(allow_dirty_run=True)
    >>> print(daemon.log_text)

    2024-07-29 18:03 | hi
    2024-07-29 18:03 |
    2024-07-29 18:05 | hi
    2024-07-29 18:05 |
    >>> mrsm.pprint(daemon.properties)
    {
        'label': 'print',
        'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
        'result': None,
        'process': {'ended': '2024-07-29T18:03:33.752806'}
    }

    """

    def __new__(
        cls,
        *args,
        daemon_id: Optional[str] = None,
        **kw
    ):
        """
        If a daemon_id is provided and already exists, read from its pickle file.
        """
        instance = super(Daemon, cls).__new__(cls)
        if daemon_id is not None:
            instance.daemon_id = daemon_id
            ### Swap the freshly allocated instance for the un-pickled Daemon
            ### when one already exists on disk.
            if instance.pickle_path.exists():
                instance = instance.read_pickle()
        return instance

    @classmethod
    def from_properties_file(cls, daemon_id: str) -> Daemon:
        """
        Return a Daemon from a properties dictionary.

        Parameters
        ----------
        daemon_id: str
            The ID of the daemon whose `properties.json` should be read.

        Raises
        ------
        `OSError` if the properties file does not exist, `ValueError` if it
        cannot be parsed or is missing the target metadata.
        """
        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
        if not properties_path.exists():
            raise OSError(f"Properties file '{properties_path}' does not exist.")

        try:
            with open(properties_path, 'r', encoding='utf-8') as f:
                properties = json.load(f)
        except Exception:
            properties = {}

        if not properties:
            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")

        ### The directory name is the authoritative daemon_id.
        daemon_id = properties_path.parent.name
        target_cf = properties.get('target', {})
        target_module_name = target_cf.get('module', None)
        target_function_name = target_cf.get('name', None)
        target_args = target_cf.get('args', None)
        target_kw = target_cf.get('kw', None)
        label = properties.get('label', None)

        if None in [
            target_module_name,
            target_function_name,
            target_args,
            target_kw,
        ]:
            raise ValueError("Missing target function information.")

        ### Re-import the target function from its recorded module path
        ### (written by `_setup()`).
        target_module = importlib.import_module(target_module_name)
        target_function = getattr(target_module, target_function_name)

        return Daemon(
            daemon_id=daemon_id,
            target=target_function,
            target_args=target_args,
            target_kw=target_kw,
            properties=properties,
            label=label,
        )


    def __init__(
        self,
        target: Optional[Callable[[Any], Any]] = None,
        target_args: Union[List[Any], Tuple[Any], None] = None,
        target_kw: Optional[Dict[str, Any]] = None,
        env: Optional[Dict[str, str]] = None,
        daemon_id: Optional[str] = None,
        label: Optional[str] = None,
        properties: Optional[Dict[str, Any]] = None,
    ):
        """
        Parameters
        ----------
        target: Optional[Callable[[Any], Any]], default None,
            The function to execute in a child process.

        target_args: Union[List[Any], Tuple[Any], None], default None
            Positional arguments to pass to the target function.

        target_kw: Optional[Dict[str, Any]], default None
            Keyword arguments to pass to the target function.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the daemon process.

        daemon_id: Optional[str], default None
            Build a `Daemon` from an existing `daemon_id`.
            If `daemon_id` is provided, other arguments are ignored and are derived
            from the existing pickled `Daemon`.

        label: Optional[str], default None
            Label string to help identifiy a daemon.
            If `None`, use the function name instead.

        properties: Optional[Dict[str, Any]], default None
            Override reading from the properties JSON by providing an existing dictionary.
        """
        ### Set by `__setstate__` when this call comes from un-pickling.
        _pickle = self.__dict__.get('_pickle', False)
        if daemon_id is not None:
            self.daemon_id = daemon_id
            ### No pickle and no target: attempt to rebuild from properties.json.
            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):

                if not self.properties_path.exists():
                    raise Exception(
                        f"Daemon '{self.daemon_id}' does not exist. "
                        + "Pass a target to create a new Daemon."
                    )

                try:
                    new_daemon = self.from_properties_file(daemon_id)
                except Exception:
                    new_daemon = None

                if new_daemon is not None:
                    new_daemon.write_pickle()
                    target = new_daemon.target
                    target_args = new_daemon.target_args
                    target_kw = new_daemon.target_kw
                    label = new_daemon.label
                    self._properties = new_daemon.properties
                else:
                    ### The properties file is unusable: remove it so the next
                    ### attempt fails fast instead of retrying a broken file.
                    try:
                        self.properties_path.unlink()
                    except Exception:
                        pass

                    raise Exception(
                        f"Could not recover daemon '{self.daemon_id}' "
                        + "from its properties file."
                    )

        if 'target' not in self.__dict__:
            if target is None:
                error("Cannot create a Daemon without a target.")
            self.target = target

        ### NOTE: We have to check self.__dict__ in case we un-pickling.
        if '_target_args' not in self.__dict__:
            self._target_args = target_args
        if '_target_kw' not in self.__dict__:
            self._target_kw = target_kw

        if 'label' not in self.__dict__:
            if label is None:
                label = (
                    self.target.__name__ if '__name__' in self.target.__dir__()
                    else str(self.target)
                )
            self.label = label
        if 'daemon_id' not in self.__dict__:
            self.daemon_id = get_new_daemon_name()
        if '_properties' not in self.__dict__:
            self._properties = properties
        if self._properties is None:
            self._properties = {}

        self._properties.update({'label': self.label})
        if env:
            self._properties.update({'env': env})

        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
        _ = self.process


    def _run_exit(
        self,
        keep_daemon_output: bool = True,
        allow_dirty_run: bool = False,
    ) -> Any:
        """Run the daemon's target function.
        NOTE: This WILL EXIT the parent process!

        Parameters
        ----------
        keep_daemon_output: bool, default True
            If `False`, delete the daemon's output directory upon exiting.

        allow_dirty_run, bool, default False:
            If `True`, run the daemon, even if the `daemon_id` directory exists.
            This option is dangerous because if the same `daemon_id` runs twice,
            the last to finish will overwrite the output of the first.

        Returns
        -------
        Nothing — this will exit the parent process.
        """
        import platform
        import sys
        import os
        import traceback
        from meerschaum.utils.warnings import warn
        from meerschaum.config import get_config
        daemon = attempt_import('daemon')
        lines = get_config('jobs', 'terminal', 'lines')
        columns = get_config('jobs', 'terminal', 'columns')

        if platform.system() == 'Windows':
            return False, "Windows is no longer supported."
        self._setup(allow_dirty_run)

        ### NOTE: The SIGINT handler has been removed so that child processes may handle
        ### KeyboardInterrupts themselves.
        ### The previous aggressive approach was redundant because of the SIGTERM handler.
        self._daemon_context = daemon.DaemonContext(
            pidfile=self.pid_lock,
            stdout=self.rotating_log,
            stderr=self.rotating_log,
            working_directory=os.getcwd(),
            detach_process=True,
            files_preserve=list(self.rotating_log.subfile_objects.values()),
            signal_map={
                signal.SIGTERM: self._handle_sigterm,
            },
        )

        _daemons.append(self)

        log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds')
        self._log_refresh_timer = RepeatTimer(
            log_refresh_seconds,
            partial(self.rotating_log.refresh_files, start_interception=True),
        )

        try:
            ### Propagate the terminal dimensions into the detached process.
            os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns))
            with self._daemon_context:
                sys.stdin = self.stdin_file
                _ = os.environ.pop(STATIC_CONFIG['environment']['systemd_stdin_path'], None)
                os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id
                os.environ['PYTHONUNBUFFERED'] = '1'

                ### Allow the user to override environment variables.
                env = self.properties.get('env', {})
                if env and isinstance(env, dict):
                    os.environ.update({str(k): str(v) for k, v in env.items()})

                self.rotating_log.refresh_files(start_interception=True)
                result = None
                try:
                    ### Record the detached child's PID (the parent has exited).
                    with open(self.pid_path, 'w+', encoding='utf-8') as f:
                        f.write(str(os.getpid()))

                    ### NOTE: The timer fails to start for remote actions to localhost.
                    try:
                        if not self._log_refresh_timer.is_running():
                            self._log_refresh_timer.start()
                    except Exception:
                        pass

                    self.properties['result'] = None
                    self._capture_process_timestamp('began')
                    result = self.target(*self.target_args, **self.target_kw)
                    self.properties['result'] = result
                except (BrokenPipeError, KeyboardInterrupt, SystemExit):
                    result = False, traceback.format_exc()
                except Exception as e:
                    warn(
                        f"Exception in daemon target function: {traceback.format_exc()}",
                    )
                    result = False, str(e)
                finally:
                    _results[self.daemon_id] = result

                    if keep_daemon_output:
                        self._capture_process_timestamp('ended')
                    else:
                        self.cleanup()

                    self._log_refresh_timer.cancel()
                    try:
                        ### Remove the stale PID file if the process is gone.
                        if self.pid is None and self.pid_path.exists():
                            self.pid_path.unlink()
                    except Exception:
                        pass

                    if is_success_tuple(result):
                        try:
                            mrsm.pprint(result)
                        except BrokenPipeError:
                            pass

        except Exception:
            ### Daemonization itself failed: append to the shared error log.
            daemon_error = traceback.format_exc()
            with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
                f.write(daemon_error)
            warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}")

    def _capture_process_timestamp(
        self,
        process_key: str,
        write_properties: bool = True,
    ) -> None:
        """
        Record the current timestamp to the parameters `process:<process_key>`.

        Parameters
        ----------
        process_key: str
            Under which key to store the timestamp.
            Must be one of 'began', 'ended', 'paused', or 'stopped'.

        write_properties: bool, default True
            If `True` persist the properties to disk immediately after capturing the timestamp.

        Raises
        ------
        A `ValueError` if `process_key` is not a supported key.
        """
        if 'process' not in self.properties:
            self.properties['process'] = {}

        if process_key not in ('began', 'ended', 'paused', 'stopped'):
            raise ValueError(f"Invalid key '{process_key}'.")

        ### Store a naive UTC ISO-8601 timestamp (tzinfo stripped after conversion).
        self.properties['process'][process_key] = (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        )
        if write_properties:
            self.write_properties()

    def run(
        self,
        keep_daemon_output: bool = True,
        allow_dirty_run: bool = False,
        debug: bool = False,
    ) -> SuccessTuple:
        """Run the daemon as a child process and continue executing the parent.

        Parameters
        ----------
        keep_daemon_output: bool, default True
            If `False`, delete the daemon's output directory upon exiting.

        allow_dirty_run: bool, default False
            If `True`, run the daemon, even if the `daemon_id` directory exists.
            This option is dangerous because if the same `daemon_id` runs concurrently,
            the last to finish will overwrite the output of the first.

        Returns
        -------
        A SuccessTuple indicating success.

        """
        import platform
        if platform.system() == 'Windows':
            return False, "Cannot run background jobs on Windows."

        ### The daemon might exist and be paused.
        if self.status == 'paused':
            return self.resume()

        self._remove_stop_file()
        if self.status == 'running':
            return True, f"Daemon '{self}' is already running."
429 430 self.mkdir_if_not_exists(allow_dirty_run) 431 _write_pickle_success_tuple = self.write_pickle() 432 if not _write_pickle_success_tuple[0]: 433 return _write_pickle_success_tuple 434 435 _launch_daemon_code = ( 436 "from meerschaum.utils.daemon import Daemon; " 437 + f"daemon = Daemon(daemon_id='{self.daemon_id}'); " 438 + f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, " 439 + "allow_dirty_run=True)" 440 ) 441 env = dict(os.environ) 442 env[STATIC_CONFIG['environment']['noninteractive']] = 'true' 443 _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env) 444 msg = ( 445 "Success" 446 if _launch_success_bool 447 else f"Failed to start daemon '{self.daemon_id}'." 448 ) 449 return _launch_success_bool, msg 450 451 def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple: 452 """ 453 Forcibly terminate a running daemon. 454 Sends a SIGTERM signal to the process. 455 456 Parameters 457 ---------- 458 timeout: Optional[int], default 3 459 How many seconds to wait for the process to terminate. 460 461 Returns 462 ------- 463 A SuccessTuple indicating success. 464 """ 465 if self.status != 'paused': 466 success, msg = self._send_signal(signal.SIGTERM, timeout=timeout) 467 if success: 468 self._write_stop_file('kill') 469 return success, msg 470 471 if self.status == 'stopped': 472 self._write_stop_file('kill') 473 return True, "Process has already stopped." 474 475 psutil = attempt_import('psutil') 476 process = self.process 477 try: 478 process.terminate() 479 process.kill() 480 process.wait(timeout=timeout) 481 except Exception as e: 482 return False, f"Failed to kill job {self} ({process}) with exception: {e}" 483 484 try: 485 if process.status(): 486 return False, "Failed to stop daemon '{self}' ({process})." 
487 except psutil.NoSuchProcess: 488 pass 489 490 if self.pid_path.exists(): 491 try: 492 self.pid_path.unlink() 493 except Exception: 494 pass 495 496 self._write_stop_file('kill') 497 return True, "Success" 498 499 def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple: 500 """Gracefully quit a running daemon.""" 501 if self.status == 'paused': 502 return self.kill(timeout) 503 504 signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout) 505 if signal_success: 506 self._write_stop_file('quit') 507 return signal_success, signal_msg 508 509 def pause( 510 self, 511 timeout: Union[int, float, None] = None, 512 check_timeout_interval: Union[float, int, None] = None, 513 ) -> SuccessTuple: 514 """ 515 Pause the daemon if it is running. 516 517 Parameters 518 ---------- 519 timeout: Union[float, int, None], default None 520 The maximum number of seconds to wait for a process to suspend. 521 522 check_timeout_interval: Union[float, int, None], default None 523 The number of seconds to wait between checking if the process is still running. 524 525 Returns 526 ------- 527 A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended. 528 """ 529 if self.process is None: 530 return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused." 531 532 if self.status == 'paused': 533 return True, f"Daemon '{self.daemon_id}' is already paused." 
        ### Write the stop file first so the daemon knows the suspension
        ### was requested deliberately.
        self._write_stop_file('pause')
        try:
            self.process.suspend()
        except Exception as e:
            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"

        timeout = self.get_timeout_seconds(timeout)
        check_timeout_interval = self.get_check_timeout_interval_seconds(
            check_timeout_interval
        )

        psutil = attempt_import('psutil')

        if not timeout:
            ### No timeout: check once and return immediately.
            try:
                success = self.process.status() == 'stopped'
            except psutil.NoSuchProcess:
                success = True
            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
            if success:
                self._capture_process_timestamp('paused')
            return success, msg

        ### Poll until the process reports 'stopped' or the timeout elapses.
        begin = time.perf_counter()
        while (time.perf_counter() - begin) < timeout:
            try:
                if self.process.status() == 'stopped':
                    self._capture_process_timestamp('paused')
                    return True, "Success"
            except psutil.NoSuchProcess as e:
                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
            time.sleep(check_timeout_interval)

        return False, (
            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
            + ('s' if timeout != 1 else '') + '.'
        )

    def resume(
        self,
        timeout: Union[int, float, None] = None,
        check_timeout_interval: Union[float, int, None] = None,
    ) -> SuccessTuple:
        """
        Resume the daemon if it is paused.

        Parameters
        ----------
        timeout: Union[float, int, None], default None
            The maximum number of seconds to wait for a process to resume.

        check_timeout_interval: Union[float, int, None], default None
            The number of seconds to wait between checking if the process is still stopped.

        Returns
        -------
        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
        """
        if self.status == 'running':
            return True, f"Daemon '{self.daemon_id}' is already running."

        if self.status == 'stopped':
            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."

        self._remove_stop_file()
        try:
            self.process.resume()
        except Exception as e:
            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"

        timeout = self.get_timeout_seconds(timeout)
        check_timeout_interval = self.get_check_timeout_interval_seconds(
            check_timeout_interval
        )

        if not timeout:
            ### No timeout: check once and return immediately.
            success = self.status == 'running'
            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
            if success:
                self._capture_process_timestamp('began')
            return success, msg

        ### Poll until the process reports 'running' or the timeout elapses.
        begin = time.perf_counter()
        while (time.perf_counter() - begin) < timeout:
            if self.status == 'running':
                self._capture_process_timestamp('began')
                return True, "Success"
            time.sleep(check_timeout_interval)

        return False, (
            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
            + ('s' if timeout != 1 else '') + '.'
        )

    def _write_stop_file(self, action: str) -> SuccessTuple:
        """Write the stop file timestamp and action.

        Parameters
        ----------
        action: str
            One of 'quit', 'kill', or 'pause'.

        Returns
        -------
        A `SuccessTuple` indicating success.
        """
        if action not in ('quit', 'kill', 'pause'):
            return False, f"Unsupported action '{action}'."

        if not self.stop_path.parent.exists():
            self.stop_path.parent.mkdir(parents=True, exist_ok=True)

        with open(self.stop_path, 'w+', encoding='utf-8') as f:
            json.dump(
                {
                    'stop_time': datetime.now(timezone.utc).isoformat(),
                    'action': action,
                },
                f
            )

        return True, "Success"

    def _remove_stop_file(self) -> SuccessTuple:
        """Remove the stop file"""
        if not self.stop_path.exists():
            return True, "Stop file does not exist."
652 653 try: 654 self.stop_path.unlink() 655 except Exception as e: 656 return False, f"Failed to remove stop file:\n{e}" 657 658 return True, "Success" 659 660 def _read_stop_file(self) -> Dict[str, Any]: 661 """ 662 Read the stop file if it exists. 663 """ 664 if not self.stop_path.exists(): 665 return {} 666 667 try: 668 with open(self.stop_path, 'r', encoding='utf-8') as f: 669 data = json.load(f) 670 return data 671 except Exception: 672 return {} 673 674 def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None: 675 """ 676 Handle `SIGTERM` within the `Daemon` context. 677 This method is injected into the `DaemonContext`. 678 """ 679 from meerschaum.utils.process import signal_handler 680 signal_handler(signal_number, stack_frame) 681 682 timer = self.__dict__.get('_log_refresh_timer', None) 683 if timer is not None: 684 timer.cancel() 685 686 daemon_context = self.__dict__.get('_daemon_context', None) 687 if daemon_context is not None: 688 daemon_context.close() 689 690 _close_pools() 691 raise SystemExit(0) 692 693 def _send_signal( 694 self, 695 signal_to_send, 696 timeout: Union[float, int, None] = None, 697 check_timeout_interval: Union[float, int, None] = None, 698 ) -> SuccessTuple: 699 """Send a signal to the daemon process. 700 701 Parameters 702 ---------- 703 signal_to_send: 704 The signal the send to the daemon, e.g. `signals.SIGINT`. 705 706 timeout: Union[float, int, None], default None 707 The maximum number of seconds to wait for a process to terminate. 708 709 check_timeout_interval: Union[float, int, None], default None 710 The number of seconds to wait between checking if the process is still running. 711 712 Returns 713 ------- 714 A SuccessTuple indicating success. 715 """ 716 try: 717 pid = self.pid 718 if pid is None: 719 return ( 720 False, 721 f"Daemon '{self.daemon_id}' is not running, " 722 + f"cannot send signal '{signal_to_send}'." 
723 ) 724 725 os.kill(pid, signal_to_send) 726 except Exception: 727 return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}" 728 729 timeout = self.get_timeout_seconds(timeout) 730 check_timeout_interval = self.get_check_timeout_interval_seconds( 731 check_timeout_interval 732 ) 733 734 if not timeout: 735 return True, f"Successfully sent '{signal}' to daemon '{self.daemon_id}'." 736 737 begin = time.perf_counter() 738 while (time.perf_counter() - begin) < timeout: 739 if not self.status == 'running': 740 return True, "Success" 741 time.sleep(check_timeout_interval) 742 743 return False, ( 744 f"Failed to stop daemon '{self.daemon_id}' (PID: {pid}) within {timeout} second" 745 + ('s' if timeout != 1 else '') + '.' 746 ) 747 748 def mkdir_if_not_exists(self, allow_dirty_run: bool = False): 749 """Create the Daemon's directory. 750 If `allow_dirty_run` is `False` and the directory already exists, 751 raise a `FileExistsError`. 752 """ 753 try: 754 self.path.mkdir(parents=True, exist_ok=True) 755 _already_exists = any(os.scandir(self.path)) 756 except FileExistsError: 757 _already_exists = True 758 759 if _already_exists and not allow_dirty_run: 760 error( 761 f"Daemon '{self.daemon_id}' already exists. " + 762 "To allow this daemon to run, do one of the following:\n" 763 + " - Execute `daemon.cleanup()`.\n" 764 + f" - Delete the directory '{self.path}'.\n" 765 + " - Pass `allow_dirty_run=True` to `daemon.run()`.\n", 766 FileExistsError, 767 ) 768 769 @property 770 def process(self) -> Union['psutil.Process', None]: 771 """ 772 Return the psutil process for the Daemon. 
773 """ 774 psutil = attempt_import('psutil') 775 pid = self.pid 776 if pid is None: 777 return None 778 if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid): 779 try: 780 self._process = psutil.Process(int(pid)) 781 process_exists = True 782 except Exception: 783 process_exists = False 784 if not process_exists: 785 _ = self.__dict__.pop('_process', None) 786 try: 787 if self.pid_path.exists(): 788 self.pid_path.unlink() 789 except Exception: 790 pass 791 return None 792 return self._process 793 794 @property 795 def status(self) -> str: 796 """ 797 Return the running status of this Daemon. 798 """ 799 if self.process is None: 800 return 'stopped' 801 802 psutil = attempt_import('psutil') 803 try: 804 if self.process.status() == 'stopped': 805 return 'paused' 806 if self.process.status() == 'zombie': 807 raise psutil.NoSuchProcess(self.process.pid) 808 except (psutil.NoSuchProcess, AttributeError): 809 if self.pid_path.exists(): 810 try: 811 self.pid_path.unlink() 812 except Exception: 813 pass 814 return 'stopped' 815 816 return 'running' 817 818 @classmethod 819 def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path: 820 """ 821 Return a Daemon's path from its `daemon_id`. 822 """ 823 return DAEMON_RESOURCES_PATH / daemon_id 824 825 @property 826 def path(self) -> pathlib.Path: 827 """ 828 Return the path for this Daemon's directory. 829 """ 830 return self._get_path_from_daemon_id(self.daemon_id) 831 832 @classmethod 833 def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path: 834 """ 835 Return the `properties.json` path for a given `daemon_id`. 836 """ 837 return cls._get_path_from_daemon_id(daemon_id) / 'properties.json' 838 839 @property 840 def properties_path(self) -> pathlib.Path: 841 """ 842 Return the `propterties.json` path for this Daemon. 
843 """ 844 return self._get_properties_path_from_daemon_id(self.daemon_id) 845 846 @property 847 def stop_path(self) -> pathlib.Path: 848 """ 849 Return the path for the stop file (created when manually stopped). 850 """ 851 return self.path / '.stop.json' 852 853 @property 854 def log_path(self) -> pathlib.Path: 855 """ 856 Return the log path. 857 """ 858 return LOGS_RESOURCES_PATH / (self.daemon_id + '.log') 859 860 @property 861 def stdin_file_path(self) -> pathlib.Path: 862 """ 863 Return the stdin file path. 864 """ 865 return self.path / 'input.stdin' 866 867 @property 868 def blocking_stdin_file_path(self) -> pathlib.Path: 869 """ 870 Return the stdin file path. 871 """ 872 if '_blocking_stdin_file_path' in self.__dict__: 873 return self._blocking_stdin_file_path 874 875 return self.path / 'input.stdin.block' 876 877 @property 878 def log_offset_path(self) -> pathlib.Path: 879 """ 880 Return the log offset file path. 881 """ 882 return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset') 883 884 @property 885 def rotating_log(self) -> RotatingFile: 886 """ 887 The rotating log file for the daemon's output. 888 """ 889 if '_rotating_log' in self.__dict__: 890 return self._rotating_log 891 892 write_timestamps = ( 893 self.properties.get('logs', {}).get('write_timestamps', None) 894 ) 895 if write_timestamps is None: 896 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 897 898 self._rotating_log = RotatingFile( 899 self.log_path, 900 redirect_streams=True, 901 write_timestamps=write_timestamps, 902 timestamp_format=get_config('jobs', 'logs', 'timestamps', 'format'), 903 ) 904 return self._rotating_log 905 906 @property 907 def stdin_file(self): 908 """ 909 Return the file handler for the stdin file. 
910 """ 911 if (stdin_file := self.__dict__.get('_stdin_file', None)): 912 return stdin_file 913 914 self._stdin_file = StdinFile( 915 self.stdin_file_path, 916 lock_file_path=self.blocking_stdin_file_path, 917 ) 918 return self._stdin_file 919 920 @property 921 def log_text(self) -> Optional[str]: 922 """ 923 Read the log files and return their contents. 924 Returns `None` if the log file does not exist. 925 """ 926 new_rotating_log = RotatingFile( 927 self.rotating_log.file_path, 928 num_files_to_keep = self.rotating_log.num_files_to_keep, 929 max_file_size = self.rotating_log.max_file_size, 930 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled'), 931 timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format'), 932 ) 933 return new_rotating_log.read() 934 935 def readlines(self) -> List[str]: 936 """ 937 Read the next log lines, persisting the cursor for later use. 938 Note this will alter the cursor of `self.rotating_log`. 939 """ 940 self.rotating_log._cursor = self._read_log_offset() 941 lines = self.rotating_log.readlines() 942 self._write_log_offset() 943 return lines 944 945 def _read_log_offset(self) -> Tuple[int, int]: 946 """ 947 Return the current log offset cursor. 948 949 Returns 950 ------- 951 A tuple of the form (`subfile_index`, `position`). 952 """ 953 if not self.log_offset_path.exists(): 954 return 0, 0 955 956 with open(self.log_offset_path, 'r', encoding='utf-8') as f: 957 cursor_text = f.read() 958 cursor_parts = cursor_text.split(' ') 959 subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1]) 960 return subfile_index, subfile_position 961 962 def _write_log_offset(self) -> None: 963 """ 964 Write the current log offset file. 
965 """ 966 with open(self.log_offset_path, 'w+', encoding='utf-8') as f: 967 subfile_index = self.rotating_log._cursor[0] 968 subfile_position = self.rotating_log._cursor[1] 969 f.write(f"{subfile_index} {subfile_position}") 970 971 @property 972 def pid(self) -> Union[int, None]: 973 """ 974 Read the PID file and return its contents. 975 Returns `None` if the PID file does not exist. 976 """ 977 if not self.pid_path.exists(): 978 return None 979 try: 980 with open(self.pid_path, 'r', encoding='utf-8') as f: 981 text = f.read() 982 if len(text) == 0: 983 return None 984 pid = int(text.rstrip()) 985 except Exception as e: 986 warn(e) 987 text = None 988 pid = None 989 return pid 990 991 @property 992 def pid_path(self) -> pathlib.Path: 993 """ 994 Return the path to a file containing the PID for this Daemon. 995 """ 996 return self.path / 'process.pid' 997 998 @property 999 def pid_lock(self) -> 'fasteners.InterProcessLock': 1000 """ 1001 Return the process lock context manager. 1002 """ 1003 if '_pid_lock' in self.__dict__: 1004 return self._pid_lock 1005 1006 fasteners = attempt_import('fasteners') 1007 self._pid_lock = fasteners.InterProcessLock(self.pid_path) 1008 return self._pid_lock 1009 1010 @property 1011 def pickle_path(self) -> pathlib.Path: 1012 """ 1013 Return the path for the pickle file. 
1014 """ 1015 return self.path / 'pickle.pkl' 1016 1017 def read_properties(self) -> Optional[Dict[str, Any]]: 1018 """Read the properties JSON file and return the dictionary.""" 1019 if not self.properties_path.exists(): 1020 return None 1021 try: 1022 with open(self.properties_path, 'r', encoding='utf-8') as file: 1023 properties = json.load(file) 1024 except Exception: 1025 properties = {} 1026 1027 return properties or {} 1028 1029 def read_pickle(self) -> Daemon: 1030 """Read a Daemon's pickle file and return the `Daemon`.""" 1031 import pickle 1032 import traceback 1033 if not self.pickle_path.exists(): 1034 error(f"Pickle file does not exist for daemon '{self.daemon_id}'.") 1035 1036 if self.pickle_path.stat().st_size == 0: 1037 error(f"Pickle was empty for daemon '{self.daemon_id}'.") 1038 1039 try: 1040 with open(self.pickle_path, 'rb') as pickle_file: 1041 daemon = pickle.load(pickle_file) 1042 success, msg = True, 'Success' 1043 except Exception as e: 1044 success, msg = False, str(e) 1045 daemon = None 1046 traceback.print_exception(type(e), e, e.__traceback__) 1047 if not success: 1048 error(msg) 1049 return daemon 1050 1051 @property 1052 def properties(self) -> Dict[str, Any]: 1053 """ 1054 Return the contents of the properties JSON file. 
1055 """ 1056 try: 1057 _file_properties = self.read_properties() or {} 1058 except Exception: 1059 traceback.print_exc() 1060 _file_properties = {} 1061 1062 if not self._properties: 1063 self._properties = _file_properties 1064 1065 if self._properties is None: 1066 self._properties = {} 1067 1068 if ( 1069 self._properties.get('result', None) is None 1070 and _file_properties.get('result', None) is not None 1071 ): 1072 _ = self._properties.pop('result', None) 1073 1074 if _file_properties is not None: 1075 self._properties = apply_patch_to_config( 1076 _file_properties, 1077 self._properties, 1078 ) 1079 1080 return self._properties 1081 1082 @property 1083 def hidden(self) -> bool: 1084 """ 1085 Return a bool indicating whether this Daemon should be displayed. 1086 """ 1087 return self.daemon_id.startswith('_') or self.daemon_id.startswith('.') 1088 1089 def write_properties(self) -> SuccessTuple: 1090 """Write the properties dictionary to the properties JSON file 1091 (only if self.properties exists). 1092 """ 1093 success, msg = ( 1094 False, 1095 f"No properties to write for daemon '{self.daemon_id}'." 
1096 ) 1097 if self.properties is not None: 1098 try: 1099 self.path.mkdir(parents=True, exist_ok=True) 1100 with open(self.properties_path, 'w+', encoding='utf-8') as properties_file: 1101 json.dump(self.properties, properties_file) 1102 success, msg = True, 'Success' 1103 except Exception as e: 1104 success, msg = False, str(e) 1105 return success, msg 1106 1107 def write_pickle(self) -> SuccessTuple: 1108 """Write the pickle file for the daemon.""" 1109 import pickle 1110 import traceback 1111 try: 1112 self.path.mkdir(parents=True, exist_ok=True) 1113 with open(self.pickle_path, 'wb+') as pickle_file: 1114 pickle.dump(self, pickle_file) 1115 success, msg = True, "Success" 1116 except Exception as e: 1117 success, msg = False, str(e) 1118 traceback.print_exception(type(e), e, e.__traceback__) 1119 return success, msg 1120 1121 1122 def _setup( 1123 self, 1124 allow_dirty_run: bool = False, 1125 ) -> None: 1126 """ 1127 Update properties before starting the Daemon. 1128 """ 1129 if self.properties is None: 1130 self._properties = {} 1131 1132 self._properties.update({ 1133 'target': { 1134 'name': self.target.__name__, 1135 'module': self.target.__module__, 1136 'args': self.target_args, 1137 'kw': self.target_kw, 1138 }, 1139 }) 1140 self.mkdir_if_not_exists(allow_dirty_run) 1141 _write_properties_success_tuple = self.write_properties() 1142 if not _write_properties_success_tuple[0]: 1143 error(_write_properties_success_tuple[1]) 1144 1145 _write_pickle_success_tuple = self.write_pickle() 1146 if not _write_pickle_success_tuple[0]: 1147 error(_write_pickle_success_tuple[1]) 1148 1149 def cleanup(self, keep_logs: bool = False) -> SuccessTuple: 1150 """ 1151 Remove a daemon's directory after execution. 1152 1153 Parameters 1154 ---------- 1155 keep_logs: bool, default False 1156 If `True`, skip deleting the daemon's log files. 1157 1158 Returns 1159 ------- 1160 A `SuccessTuple` indicating success. 
1161 """ 1162 if self.path.exists(): 1163 try: 1164 shutil.rmtree(self.path) 1165 except Exception as e: 1166 msg = f"Failed to clean up '{self.daemon_id}':\n{e}" 1167 warn(msg) 1168 return False, msg 1169 if not keep_logs: 1170 self.rotating_log.delete() 1171 try: 1172 if self.log_offset_path.exists(): 1173 self.log_offset_path.unlink() 1174 except Exception as e: 1175 msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}" 1176 warn(msg) 1177 return False, msg 1178 return True, "Success" 1179 1180 1181 def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]: 1182 """ 1183 Return the timeout value to use. Use `--timeout-seconds` if provided, 1184 else the configured default (8). 1185 """ 1186 if isinstance(timeout, (int, float)): 1187 return timeout 1188 return get_config('jobs', 'timeout_seconds') 1189 1190 1191 def get_check_timeout_interval_seconds( 1192 self, 1193 check_timeout_interval: Union[int, float, None] = None, 1194 ) -> Union[int, float]: 1195 """ 1196 Return the interval value to check the status of timeouts. 1197 """ 1198 if isinstance(check_timeout_interval, (int, float)): 1199 return check_timeout_interval 1200 return get_config('jobs', 'check_timeout_interval_seconds') 1201 1202 @property 1203 def target_args(self) -> Union[Tuple[Any], None]: 1204 """ 1205 Return the positional arguments to pass to the target function. 1206 """ 1207 target_args = ( 1208 self.__dict__.get('_target_args', None) 1209 or self.properties.get('target', {}).get('args', None) 1210 ) 1211 if target_args is None: 1212 return tuple([]) 1213 1214 return tuple(target_args) 1215 1216 @property 1217 def target_kw(self) -> Union[Dict[str, Any], None]: 1218 """ 1219 Return the keyword arguments to pass to the target function. 
1220 """ 1221 target_kw = ( 1222 self.__dict__.get('_target_kw', None) 1223 or self.properties.get('target', {}).get('kw', None) 1224 ) 1225 if target_kw is None: 1226 return {} 1227 1228 return {key: val for key, val in target_kw.items()} 1229 1230 def __getstate__(self): 1231 """ 1232 Pickle this Daemon. 1233 """ 1234 dill = attempt_import('dill') 1235 return { 1236 'target': dill.dumps(self.target), 1237 'target_args': self.target_args, 1238 'target_kw': self.target_kw, 1239 'daemon_id': self.daemon_id, 1240 'label': self.label, 1241 'properties': self.properties, 1242 } 1243 1244 def __setstate__(self, _state: Dict[str, Any]): 1245 """ 1246 Restore this Daemon from a pickled state. 1247 If the properties file exists, skip the old pickled version. 1248 """ 1249 dill = attempt_import('dill') 1250 _state['target'] = dill.loads(_state['target']) 1251 self._pickle = True 1252 daemon_id = _state.get('daemon_id', None) 1253 if not daemon_id: 1254 raise ValueError("Need a daemon_id to un-pickle a Daemon.") 1255 1256 properties_path = self._get_properties_path_from_daemon_id(daemon_id) 1257 ignore_properties = properties_path.exists() 1258 if ignore_properties: 1259 _state = { 1260 key: val 1261 for key, val in _state.items() 1262 if key != 'properties' 1263 } 1264 self.__init__(**_state) 1265 1266 1267 def __repr__(self): 1268 return str(self) 1269 1270 def __str__(self): 1271 return self.daemon_id 1272 1273 def __eq__(self, other): 1274 if not isinstance(other, Daemon): 1275 return False 1276 return self.daemon_id == other.daemon_id 1277 1278 def __hash__(self): 1279 return hash(self.daemon_id)
Daemonize Python functions into background processes.
Examples
>>> import meerschaum as mrsm
>>> from meerschaum.utils.daemon import Daemon
>>> daemon = Daemon(print, ('hi',))
>>> success, msg = daemon.run()
>>> print(daemon.log_text)
2024-07-29 18:03 | hi 2024-07-29 18:03 |
>>> daemon.run(allow_dirty_run=True)
>>> print(daemon.log_text)
2024-07-29 18:03 | hi 2024-07-29 18:03 | 2024-07-29 18:05 | hi 2024-07-29 18:05 |
>>> mrsm.pprint(daemon.properties)
{
'label': 'print',
'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
'result': None,
'process': {'ended': '2024-07-29T18:03:33.752806'}
}
    def __init__(
        self,
        target: Optional[Callable[[Any], Any]] = None,
        target_args: Union[List[Any], Tuple[Any], None] = None,
        target_kw: Optional[Dict[str, Any]] = None,
        env: Optional[Dict[str, str]] = None,
        daemon_id: Optional[str] = None,
        label: Optional[str] = None,
        properties: Optional[Dict[str, Any]] = None,
    ):
        """
        Parameters
        ----------
        target: Optional[Callable[[Any], Any]], default None,
            The function to execute in a child process.

        target_args: Union[List[Any], Tuple[Any], None], default None
            Positional arguments to pass to the target function.

        target_kw: Optional[Dict[str, Any]], default None
            Keyword arguments to pass to the target function.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the daemon process.

        daemon_id: Optional[str], default None
            Build a `Daemon` from an existing `daemon_id`.
            If `daemon_id` is provided, other arguments are ignored and are derived
            from the existing pickled `Daemon`.

        label: Optional[str], default None
            Label string to help identify a daemon.
            If `None`, use the function name instead.

        properties: Optional[Dict[str, Any]], default None
            Override reading from the properties JSON by providing an existing dictionary.
        """
        _pickle = self.__dict__.get('_pickle', False)
        if daemon_id is not None:
            self.daemon_id = daemon_id
            ### No pickle and no target: attempt to recover this daemon
            ### from its properties JSON file.
            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):

                if not self.properties_path.exists():
                    raise Exception(
                        f"Daemon '{self.daemon_id}' does not exist. "
                        + "Pass a target to create a new Daemon."
                    )

                try:
                    new_daemon = self.from_properties_file(daemon_id)
                except Exception:
                    new_daemon = None

                if new_daemon is not None:
                    ### Re-create the pickle and adopt the recovered attributes.
                    new_daemon.write_pickle()
                    target = new_daemon.target
                    target_args = new_daemon.target_args
                    target_kw = new_daemon.target_kw
                    label = new_daemon.label
                    self._properties = new_daemon.properties
                else:
                    ### Unrecoverable: remove the broken properties file and fail.
                    try:
                        self.properties_path.unlink()
                    except Exception:
                        pass

                    raise Exception(
                        f"Could not recover daemon '{self.daemon_id}' "
                        + "from its properties file."
                    )

        if 'target' not in self.__dict__:
            if target is None:
                error("Cannot create a Daemon without a target.")
            self.target = target

        ### NOTE: We have to check self.__dict__ in case we are un-pickling.
        if '_target_args' not in self.__dict__:
            self._target_args = target_args
        if '_target_kw' not in self.__dict__:
            self._target_kw = target_kw

        if 'label' not in self.__dict__:
            if label is None:
                label = (
                    self.target.__name__ if '__name__' in self.target.__dir__()
                    else str(self.target)
                )
            self.label = label
        if 'daemon_id' not in self.__dict__:
            self.daemon_id = get_new_daemon_name()
        if '_properties' not in self.__dict__:
            self._properties = properties
        if self._properties is None:
            self._properties = {}

        self._properties.update({'label': self.label})
        if env:
            self._properties.update({'env': env})

        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
        _ = self.process
Parameters
- target (Optional[Callable[[Any], Any]], default None,): The function to execute in a child process.
- target_args (Union[List[Any], Tuple[Any], None], default None): Positional arguments to pass to the target function.
- target_kw (Optional[Dict[str, Any]], default None): Keyword arguments to pass to the target function.
- env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the daemon process.
- daemon_id (Optional[str], default None): Build a `Daemon` from an existing `daemon_id`. If `daemon_id` is provided, other arguments are ignored and are derived from the existing pickled `Daemon`.
- label (Optional[str], default None): Label string to help identify a daemon. If `None`, use the function name instead.
- properties (Optional[Dict[str, Any]], default None): Override reading from the properties JSON by providing an existing dictionary.
92 @classmethod 93 def from_properties_file(cls, daemon_id: str) -> Daemon: 94 """ 95 Return a Daemon from a properties dictionary. 96 """ 97 properties_path = cls._get_properties_path_from_daemon_id(daemon_id) 98 if not properties_path.exists(): 99 raise OSError(f"Properties file '{properties_path}' does not exist.") 100 101 try: 102 with open(properties_path, 'r', encoding='utf-8') as f: 103 properties = json.load(f) 104 except Exception: 105 properties = {} 106 107 if not properties: 108 raise ValueError(f"No properties could be read for daemon '{daemon_id}'.") 109 110 daemon_id = properties_path.parent.name 111 target_cf = properties.get('target', {}) 112 target_module_name = target_cf.get('module', None) 113 target_function_name = target_cf.get('name', None) 114 target_args = target_cf.get('args', None) 115 target_kw = target_cf.get('kw', None) 116 label = properties.get('label', None) 117 118 if None in [ 119 target_module_name, 120 target_function_name, 121 target_args, 122 target_kw, 123 ]: 124 raise ValueError("Missing target function information.") 125 126 target_module = importlib.import_module(target_module_name) 127 target_function = getattr(target_module, target_function_name) 128 129 return Daemon( 130 daemon_id=daemon_id, 131 target=target_function, 132 target_args=target_args, 133 target_kw=target_kw, 134 properties=properties, 135 label=label, 136 )
Return a Daemon from a properties dictionary.
    def run(
        self,
        keep_daemon_output: bool = True,
        allow_dirty_run: bool = False,
        debug: bool = False,
    ) -> SuccessTuple:
        """Run the daemon as a child process and continue executing the parent.

        Parameters
        ----------
        keep_daemon_output: bool, default True
            If `False`, delete the daemon's output directory upon exiting.

        allow_dirty_run: bool, default False
            If `True`, run the daemon, even if the `daemon_id` directory exists.
            This option is dangerous because if the same `daemon_id` runs concurrently,
            the last to finish will overwrite the output of the first.

        debug: bool, default False
            Verbosity toggle passed through to `venv_exec`.

        Returns
        -------
        A SuccessTuple indicating success.

        """
        import platform
        if platform.system() == 'Windows':
            return False, "Cannot run background jobs on Windows."

        ### The daemon might exist and be paused.
        if self.status == 'paused':
            return self.resume()

        self._remove_stop_file()
        if self.status == 'running':
            return True, f"Daemon '{self}' is already running."

        ### Persist this daemon so the child interpreter can rebuild it from disk.
        self.mkdir_if_not_exists(allow_dirty_run)
        _write_pickle_success_tuple = self.write_pickle()
        if not _write_pickle_success_tuple[0]:
            return _write_pickle_success_tuple

        ### NOTE(review): daemon_id is interpolated unescaped into this code
        ### string; an ID containing a single quote would break the launch —
        ### confirm daemon IDs are sanitized upstream.
        _launch_daemon_code = (
            "from meerschaum.utils.daemon import Daemon; "
            + f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
            + f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
            + "allow_dirty_run=True)"
        )
        env = dict(os.environ)
        ### Flag the child process as non-interactive.
        env[STATIC_CONFIG['environment']['noninteractive']] = 'true'
        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
        msg = (
            "Success"
            if _launch_success_bool
            else f"Failed to start daemon '{self.daemon_id}'."
        )
        return _launch_success_bool, msg
Run the daemon as a child process and continue executing the parent.
Parameters
- keep_daemon_output (bool, default True): If `False`, delete the daemon's output directory upon exiting.
- allow_dirty_run (bool, default False): If `True`, run the daemon, even if the `daemon_id` directory exists. This option is dangerous because if the same `daemon_id` runs concurrently, the last to finish will overwrite the output of the first.
Returns
- A SuccessTuple indicating success.
451 def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple: 452 """ 453 Forcibly terminate a running daemon. 454 Sends a SIGTERM signal to the process. 455 456 Parameters 457 ---------- 458 timeout: Optional[int], default 3 459 How many seconds to wait for the process to terminate. 460 461 Returns 462 ------- 463 A SuccessTuple indicating success. 464 """ 465 if self.status != 'paused': 466 success, msg = self._send_signal(signal.SIGTERM, timeout=timeout) 467 if success: 468 self._write_stop_file('kill') 469 return success, msg 470 471 if self.status == 'stopped': 472 self._write_stop_file('kill') 473 return True, "Process has already stopped." 474 475 psutil = attempt_import('psutil') 476 process = self.process 477 try: 478 process.terminate() 479 process.kill() 480 process.wait(timeout=timeout) 481 except Exception as e: 482 return False, f"Failed to kill job {self} ({process}) with exception: {e}" 483 484 try: 485 if process.status(): 486 return False, "Failed to stop daemon '{self}' ({process})." 487 except psutil.NoSuchProcess: 488 pass 489 490 if self.pid_path.exists(): 491 try: 492 self.pid_path.unlink() 493 except Exception: 494 pass 495 496 self._write_stop_file('kill') 497 return True, "Success"
Forcibly terminate a running daemon. Sends a SIGTERM signal to the process.
Parameters
- timeout (Union[int, float, None], default 8): How many seconds to wait for the process to terminate.
Returns
- A SuccessTuple indicating success.
499 def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple: 500 """Gracefully quit a running daemon.""" 501 if self.status == 'paused': 502 return self.kill(timeout) 503 504 signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout) 505 if signal_success: 506 self._write_stop_file('quit') 507 return signal_success, signal_msg
Gracefully quit a running daemon.
    def pause(
        self,
        timeout: Union[int, float, None] = None,
        check_timeout_interval: Union[float, int, None] = None,
    ) -> SuccessTuple:
        """
        Pause the daemon if it is running.

        Parameters
        ----------
        timeout: Union[float, int, None], default None
            The maximum number of seconds to wait for a process to suspend.

        check_timeout_interval: Union[float, int, None], default None
            The number of seconds to wait between checking if the process is still running.

        Returns
        -------
        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
        """
        if self.process is None:
            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."

        if self.status == 'paused':
            return True, f"Daemon '{self.daemon_id}' is already paused."

        self._write_stop_file('pause')
        try:
            self.process.suspend()
        except Exception as e:
            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"

        timeout = self.get_timeout_seconds(timeout)
        check_timeout_interval = self.get_check_timeout_interval_seconds(
            check_timeout_interval
        )

        psutil = attempt_import('psutil')

        ### With no timeout, check the status once and return immediately.
        if not timeout:
            try:
                success = self.process.status() == 'stopped'
            except psutil.NoSuchProcess:
                ### The process vanishing counts as successfully stopped.
                success = True
            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
            if success:
                self._capture_process_timestamp('paused')
            return success, msg

        ### Otherwise poll until the process reports 'stopped' or we time out.
        begin = time.perf_counter()
        while (time.perf_counter() - begin) < timeout:
            try:
                if self.process.status() == 'stopped':
                    self._capture_process_timestamp('paused')
                    return True, "Success"
            except psutil.NoSuchProcess as e:
                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
            time.sleep(check_timeout_interval)

        return False, (
            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
            + ('s' if timeout != 1 else '') + '.'
        )
Pause the daemon if it is running.
Parameters
- timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to suspend.
- check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still running.
Returns
- A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
573 def resume( 574 self, 575 timeout: Union[int, float, None] = None, 576 check_timeout_interval: Union[float, int, None] = None, 577 ) -> SuccessTuple: 578 """ 579 Resume the daemon if it is paused. 580 581 Parameters 582 ---------- 583 timeout: Union[float, int, None], default None 584 The maximum number of seconds to wait for a process to resume. 585 586 check_timeout_interval: Union[float, int, None], default None 587 The number of seconds to wait between checking if the process is still stopped. 588 589 Returns 590 ------- 591 A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed. 592 """ 593 if self.status == 'running': 594 return True, f"Daemon '{self.daemon_id}' is already running." 595 596 if self.status == 'stopped': 597 return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed." 598 599 self._remove_stop_file() 600 try: 601 self.process.resume() 602 except Exception as e: 603 return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}" 604 605 timeout = self.get_timeout_seconds(timeout) 606 check_timeout_interval = self.get_check_timeout_interval_seconds( 607 check_timeout_interval 608 ) 609 610 if not timeout: 611 success = self.status == 'running' 612 msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'." 613 if success: 614 self._capture_process_timestamp('began') 615 return success, msg 616 617 begin = time.perf_counter() 618 while (time.perf_counter() - begin) < timeout: 619 if self.status == 'running': 620 self._capture_process_timestamp('began') 621 return True, "Success" 622 time.sleep(check_timeout_interval) 623 624 return False, ( 625 f"Failed to resume daemon '{self.daemon_id}' within {timeout} second" 626 + ('s' if timeout != 1 else '') + '.' 627 )
Resume the daemon if it is paused.
Parameters
- timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to resume.
- check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still stopped.
Returns
- A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
748 def mkdir_if_not_exists(self, allow_dirty_run: bool = False): 749 """Create the Daemon's directory. 750 If `allow_dirty_run` is `False` and the directory already exists, 751 raise a `FileExistsError`. 752 """ 753 try: 754 self.path.mkdir(parents=True, exist_ok=True) 755 _already_exists = any(os.scandir(self.path)) 756 except FileExistsError: 757 _already_exists = True 758 759 if _already_exists and not allow_dirty_run: 760 error( 761 f"Daemon '{self.daemon_id}' already exists. " + 762 "To allow this daemon to run, do one of the following:\n" 763 + " - Execute `daemon.cleanup()`.\n" 764 + f" - Delete the directory '{self.path}'.\n" 765 + " - Pass `allow_dirty_run=True` to `daemon.run()`.\n", 766 FileExistsError, 767 )
Create the Daemon's directory.
If `allow_dirty_run` is `False` and the directory already exists, raise a `FileExistsError`.
    @property
    def process(self) -> Union['psutil.Process', None]:
        """
        Return the `psutil.Process` for this Daemon's PID,
        or `None` if no live process can be found.
        """
        psutil = attempt_import('psutil')
        pid = self.pid
        if pid is None:
            return None
        ### (Re)build the cached Process when missing or when the PID file
        ### now points at a different process.
        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
            try:
                self._process = psutil.Process(int(pid))
                process_exists = True
            except Exception:
                process_exists = False
            if not process_exists:
                ### Stale PID: drop the cache and remove the orphaned PID file.
                _ = self.__dict__.pop('_process', None)
                try:
                    if self.pid_path.exists():
                        self.pid_path.unlink()
                except Exception:
                    pass
                return None
        return self._process
Return the psutil process for the Daemon.
794 @property 795 def status(self) -> str: 796 """ 797 Return the running status of this Daemon. 798 """ 799 if self.process is None: 800 return 'stopped' 801 802 psutil = attempt_import('psutil') 803 try: 804 if self.process.status() == 'stopped': 805 return 'paused' 806 if self.process.status() == 'zombie': 807 raise psutil.NoSuchProcess(self.process.pid) 808 except (psutil.NoSuchProcess, AttributeError): 809 if self.pid_path.exists(): 810 try: 811 self.pid_path.unlink() 812 except Exception: 813 pass 814 return 'stopped' 815 816 return 'running'
Return the running status of this Daemon.
825 @property 826 def path(self) -> pathlib.Path: 827 """ 828 Return the path for this Daemon's directory. 829 """ 830 return self._get_path_from_daemon_id(self.daemon_id)
Return the path for this Daemon's directory.
    @property
    def properties_path(self) -> pathlib.Path:
        """
        Return the `properties.json` path for this Daemon.
        """
        return self._get_properties_path_from_daemon_id(self.daemon_id)
Return the `properties.json` path for this Daemon.
846 @property 847 def stop_path(self) -> pathlib.Path: 848 """ 849 Return the path for the stop file (created when manually stopped). 850 """ 851 return self.path / '.stop.json'
Return the path for the stop file (created when manually stopped).
853 @property 854 def log_path(self) -> pathlib.Path: 855 """ 856 Return the log path. 857 """ 858 return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')
Return the log path.
860 @property 861 def stdin_file_path(self) -> pathlib.Path: 862 """ 863 Return the stdin file path. 864 """ 865 return self.path / 'input.stdin'
Return the stdin file path.
    @property
    def blocking_stdin_file_path(self) -> pathlib.Path:
        """
        Return the blocking stdin (lock) file path.
        """
        ### An explicitly-set path (e.g. restored onto the instance) wins.
        if '_blocking_stdin_file_path' in self.__dict__:
            return self._blocking_stdin_file_path

        return self.path / 'input.stdin.block'
Return the blocking stdin (lock) file path.
877 @property 878 def log_offset_path(self) -> pathlib.Path: 879 """ 880 Return the log offset file path. 881 """ 882 return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')
Return the log offset file path.
884 @property 885 def rotating_log(self) -> RotatingFile: 886 """ 887 The rotating log file for the daemon's output. 888 """ 889 if '_rotating_log' in self.__dict__: 890 return self._rotating_log 891 892 write_timestamps = ( 893 self.properties.get('logs', {}).get('write_timestamps', None) 894 ) 895 if write_timestamps is None: 896 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 897 898 self._rotating_log = RotatingFile( 899 self.log_path, 900 redirect_streams=True, 901 write_timestamps=write_timestamps, 902 timestamp_format=get_config('jobs', 'logs', 'timestamps', 'format'), 903 ) 904 return self._rotating_log
The rotating log file for the daemon's output.
906 @property 907 def stdin_file(self): 908 """ 909 Return the file handler for the stdin file. 910 """ 911 if (stdin_file := self.__dict__.get('_stdin_file', None)): 912 return stdin_file 913 914 self._stdin_file = StdinFile( 915 self.stdin_file_path, 916 lock_file_path=self.blocking_stdin_file_path, 917 ) 918 return self._stdin_file
Return the file handler for the stdin file.
920 @property 921 def log_text(self) -> Optional[str]: 922 """ 923 Read the log files and return their contents. 924 Returns `None` if the log file does not exist. 925 """ 926 new_rotating_log = RotatingFile( 927 self.rotating_log.file_path, 928 num_files_to_keep = self.rotating_log.num_files_to_keep, 929 max_file_size = self.rotating_log.max_file_size, 930 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled'), 931 timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format'), 932 ) 933 return new_rotating_log.read()
Read the log files and return their contents.
Returns `None` if the log file does not exist.
    def readlines(self) -> List[str]:
        """
        Read the next log lines, persisting the cursor for later use.
        Note this will alter the cursor of `self.rotating_log`.
        """
        ### Restore the persisted cursor, read from there, then persist the
        ### advanced cursor so the next call resumes where this one stopped.
        self.rotating_log._cursor = self._read_log_offset()
        lines = self.rotating_log.readlines()
        self._write_log_offset()
        return lines
Read the next log lines, persisting the cursor for later use.
Note this will alter the cursor of `self.rotating_log`.
971 @property 972 def pid(self) -> Union[int, None]: 973 """ 974 Read the PID file and return its contents. 975 Returns `None` if the PID file does not exist. 976 """ 977 if not self.pid_path.exists(): 978 return None 979 try: 980 with open(self.pid_path, 'r', encoding='utf-8') as f: 981 text = f.read() 982 if len(text) == 0: 983 return None 984 pid = int(text.rstrip()) 985 except Exception as e: 986 warn(e) 987 text = None 988 pid = None 989 return pid
Read the PID file and return its contents.
Returns `None` if the PID file does not exist.
991 @property 992 def pid_path(self) -> pathlib.Path: 993 """ 994 Return the path to a file containing the PID for this Daemon. 995 """ 996 return self.path / 'process.pid'
Return the path to a file containing the PID for this Daemon.
998 @property 999 def pid_lock(self) -> 'fasteners.InterProcessLock': 1000 """ 1001 Return the process lock context manager. 1002 """ 1003 if '_pid_lock' in self.__dict__: 1004 return self._pid_lock 1005 1006 fasteners = attempt_import('fasteners') 1007 self._pid_lock = fasteners.InterProcessLock(self.pid_path) 1008 return self._pid_lock
Return the process lock context manager.
1010 @property 1011 def pickle_path(self) -> pathlib.Path: 1012 """ 1013 Return the path for the pickle file. 1014 """ 1015 return self.path / 'pickle.pkl'
Return the path for the pickle file.
1017 def read_properties(self) -> Optional[Dict[str, Any]]: 1018 """Read the properties JSON file and return the dictionary.""" 1019 if not self.properties_path.exists(): 1020 return None 1021 try: 1022 with open(self.properties_path, 'r', encoding='utf-8') as file: 1023 properties = json.load(file) 1024 except Exception: 1025 properties = {} 1026 1027 return properties or {}
Read the properties JSON file and return the dictionary.
1029 def read_pickle(self) -> Daemon: 1030 """Read a Daemon's pickle file and return the `Daemon`.""" 1031 import pickle 1032 import traceback 1033 if not self.pickle_path.exists(): 1034 error(f"Pickle file does not exist for daemon '{self.daemon_id}'.") 1035 1036 if self.pickle_path.stat().st_size == 0: 1037 error(f"Pickle was empty for daemon '{self.daemon_id}'.") 1038 1039 try: 1040 with open(self.pickle_path, 'rb') as pickle_file: 1041 daemon = pickle.load(pickle_file) 1042 success, msg = True, 'Success' 1043 except Exception as e: 1044 success, msg = False, str(e) 1045 daemon = None 1046 traceback.print_exception(type(e), e, e.__traceback__) 1047 if not success: 1048 error(msg) 1049 return daemon
Read a Daemon's pickle file and return the `Daemon`.
@property
def properties(self) -> Dict[str, Any]:
    """
    Return the contents of the properties JSON file, merged with the
    in-memory `_properties` dictionary.

    Reading the file never raises: failures are printed and treated
    as an empty file. The merged result is cached back onto
    `self._properties` on every access.
    """
    try:
        _file_properties = self.read_properties() or {}
    except Exception:
        # Best-effort read: log the traceback and fall back to empty.
        traceback.print_exc()
        _file_properties = {}

    # Seed the in-memory cache from disk when it's empty or missing.
    if not self._properties:
        self._properties = _file_properties

    if self._properties is None:
        self._properties = {}

    # If the file has a result but memory does not, drop the stale
    # in-memory `None` so the on-disk result survives the merge below.
    if (
        self._properties.get('result', None) is None
        and _file_properties.get('result', None) is not None
    ):
        _ = self._properties.pop('result', None)

    # Merge file and in-memory properties.
    # NOTE(review): presumably the file dict is the base and in-memory
    # keys patch over it — confirm against `apply_patch_to_config`.
    if _file_properties is not None:
        self._properties = apply_patch_to_config(
            _file_properties,
            self._properties,
        )

    return self._properties
Return the contents of the properties JSON file.
def write_properties(self) -> SuccessTuple:
    """
    Write the properties dictionary to the properties JSON file
    (only if `self.properties` exists).

    Returns
    -------
    A `SuccessTuple` indicating whether the file was written.
    """
    if self.properties is None:
        return False, f"No properties to write for daemon '{self.daemon_id}'."

    try:
        self.path.mkdir(parents=True, exist_ok=True)
        with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
            json.dump(self.properties, properties_file)
    except Exception as e:
        return False, str(e)

    return True, 'Success'
Write the properties dictionary to the properties JSON file (only if self.properties exists).
1107 def write_pickle(self) -> SuccessTuple: 1108 """Write the pickle file for the daemon.""" 1109 import pickle 1110 import traceback 1111 try: 1112 self.path.mkdir(parents=True, exist_ok=True) 1113 with open(self.pickle_path, 'wb+') as pickle_file: 1114 pickle.dump(self, pickle_file) 1115 success, msg = True, "Success" 1116 except Exception as e: 1117 success, msg = False, str(e) 1118 traceback.print_exception(type(e), e, e.__traceback__) 1119 return success, msg
Write the pickle file for the daemon.
1149 def cleanup(self, keep_logs: bool = False) -> SuccessTuple: 1150 """ 1151 Remove a daemon's directory after execution. 1152 1153 Parameters 1154 ---------- 1155 keep_logs: bool, default False 1156 If `True`, skip deleting the daemon's log files. 1157 1158 Returns 1159 ------- 1160 A `SuccessTuple` indicating success. 1161 """ 1162 if self.path.exists(): 1163 try: 1164 shutil.rmtree(self.path) 1165 except Exception as e: 1166 msg = f"Failed to clean up '{self.daemon_id}':\n{e}" 1167 warn(msg) 1168 return False, msg 1169 if not keep_logs: 1170 self.rotating_log.delete() 1171 try: 1172 if self.log_offset_path.exists(): 1173 self.log_offset_path.unlink() 1174 except Exception as e: 1175 msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}" 1176 warn(msg) 1177 return False, msg 1178 return True, "Success"
Remove a daemon's directory after execution.
Parameters
- keep_logs (bool, default False):
If `True`, skip deleting the daemon's log files.
Returns
- A `SuccessTuple` indicating success.
1181 def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]: 1182 """ 1183 Return the timeout value to use. Use `--timeout-seconds` if provided, 1184 else the configured default (8). 1185 """ 1186 if isinstance(timeout, (int, float)): 1187 return timeout 1188 return get_config('jobs', 'timeout_seconds')
Return the timeout value to use. Use `--timeout-seconds` if provided,
else the configured default (8).
1191 def get_check_timeout_interval_seconds( 1192 self, 1193 check_timeout_interval: Union[int, float, None] = None, 1194 ) -> Union[int, float]: 1195 """ 1196 Return the interval value to check the status of timeouts. 1197 """ 1198 if isinstance(check_timeout_interval, (int, float)): 1199 return check_timeout_interval 1200 return get_config('jobs', 'check_timeout_interval_seconds')
Return the interval value to check the status of timeouts.
1202 @property 1203 def target_args(self) -> Union[Tuple[Any], None]: 1204 """ 1205 Return the positional arguments to pass to the target function. 1206 """ 1207 target_args = ( 1208 self.__dict__.get('_target_args', None) 1209 or self.properties.get('target', {}).get('args', None) 1210 ) 1211 if target_args is None: 1212 return tuple([]) 1213 1214 return tuple(target_args)
Return the positional arguments to pass to the target function.
1216 @property 1217 def target_kw(self) -> Union[Dict[str, Any], None]: 1218 """ 1219 Return the keyword arguments to pass to the target function. 1220 """ 1221 target_kw = ( 1222 self.__dict__.get('_target_kw', None) 1223 or self.properties.get('target', {}).get('kw', None) 1224 ) 1225 if target_kw is None: 1226 return {} 1227 1228 return {key: val for key, val in target_kw.items()}
Return the keyword arguments to pass to the target function.