meerschaum.utils.daemon.Daemon
Manage running daemons via the Daemon class.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Manage running daemons via the Daemon class.
"""

from __future__ import annotations
import os
import importlib
import pathlib
import json
import shutil
import signal
import sys
import time
import traceback
from functools import partial
from datetime import datetime, timezone

import meerschaum as mrsm
from meerschaum.utils.typing import (
    Optional, Dict, Any, SuccessTuple, Callable, List, Union,
    is_success_tuple, Tuple,
)
from meerschaum.config import get_config
from meerschaum.config.static import STATIC_CONFIG
from meerschaum.config._paths import (
    DAEMON_RESOURCES_PATH, LOGS_RESOURCES_PATH, DAEMON_ERROR_LOG_PATH,
)
from meerschaum.config._patch import apply_patch_to_config
from meerschaum.utils.warnings import warn, error
from meerschaum.utils.packages import attempt_import
from meerschaum.utils.venv import venv_exec
from meerschaum.utils.daemon._names import get_new_daemon_name
from meerschaum.utils.daemon.RotatingFile import RotatingFile
from meerschaum.utils.daemon.StdinFile import StdinFile
from meerschaum.utils.threading import RepeatTimer
from meerschaum.__main__ import _close_pools

### Module-level registries: daemons started in this process and their results.
_daemons = []
_results = {}


class Daemon:
    """
    Daemonize Python functions into background processes.

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.utils.daemons import Daemon
    >>> daemon = Daemon(print, ('hi',))
    >>> success, msg = daemon.run()
    >>> print(daemon.log_text)

    2024-07-29 18:03 | hi
    2024-07-29 18:03 |
    >>> daemon.run(allow_dirty_run=True)
    >>> print(daemon.log_text)

    2024-07-29 18:03 | hi
    2024-07-29 18:03 |
    2024-07-29 18:05 | hi
    2024-07-29 18:05 |
    >>> mrsm.pprint(daemon.properties)
    {
        'label': 'print',
        'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
        'result': None,
        'process': {'ended': '2024-07-29T18:03:33.752806'}
    }

    """

    def __new__(
        cls,
        *args,
        daemon_id: Optional[str] = None,
        **kw
    ):
        """
        If a daemon_id is provided and already exists, read from its pickle file.
        """
        instance = super(Daemon, cls).__new__(cls)
        if daemon_id is not None:
            instance.daemon_id = daemon_id
            if instance.pickle_path.exists():
                instance = instance.read_pickle()
        return instance

    @classmethod
    def from_properties_file(cls, daemon_id: str) -> Daemon:
        """
        Return a Daemon from a properties dictionary.
        """
        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
        if not properties_path.exists():
            raise OSError(f"Properties file '{properties_path}' does not exist.")

        try:
            with open(properties_path, 'r', encoding='utf-8') as f:
                properties = json.load(f)
        except Exception:
            properties = {}

        if not properties:
            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")

        daemon_id = properties_path.parent.name
        target_cf = properties.get('target', {})
        target_module_name = target_cf.get('module', None)
        target_function_name = target_cf.get('name', None)
        target_args = target_cf.get('args', None)
        target_kw = target_cf.get('kw', None)
        label = properties.get('label', None)

        if None in [
            target_module_name,
            target_function_name,
            target_args,
            target_kw,
        ]:
            raise ValueError("Missing target function information.")

        target_module = importlib.import_module(target_module_name)
        target_function = getattr(target_module, target_function_name)

        return Daemon(
            daemon_id=daemon_id,
            target=target_function,
            target_args=target_args,
            target_kw=target_kw,
            properties=properties,
            label=label,
        )


    def __init__(
        self,
        target: Optional[Callable[[Any], Any]] = None,
        target_args: Union[List[Any], Tuple[Any], None] = None,
        target_kw: Optional[Dict[str, Any]] = None,
        env: Optional[Dict[str, str]] = None,
        daemon_id: Optional[str] = None,
        label: Optional[str] = None,
        properties: Optional[Dict[str, Any]] = None,
    ):
        """
        Parameters
        ----------
        target: Optional[Callable[[Any], Any]], default None,
            The function to execute in a child process.

        target_args: Union[List[Any], Tuple[Any], None], default None
            Positional arguments to pass to the target function.

        target_kw: Optional[Dict[str, Any]], default None
            Keyword arguments to pass to the target function.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the daemon process.

        daemon_id: Optional[str], default None
            Build a `Daemon` from an existing `daemon_id`.
            If `daemon_id` is provided, other arguments are ignored and are derived
            from the existing pickled `Daemon`.

        label: Optional[str], default None
            Label string to help identify a daemon.
            If `None`, use the function name instead.

        properties: Optional[Dict[str, Any]], default None
            Override reading from the properties JSON by providing an existing dictionary.
        """
        _pickle = self.__dict__.get('_pickle', False)
        if daemon_id is not None:
            self.daemon_id = daemon_id
            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):

                if not self.properties_path.exists():
                    raise Exception(
                        f"Daemon '{self.daemon_id}' does not exist. "
                        + "Pass a target to create a new Daemon."
                    )

                ### Attempt to recover the daemon from its properties file.
                try:
                    new_daemon = self.from_properties_file(daemon_id)
                except Exception:
                    new_daemon = None

                if new_daemon is not None:
                    new_daemon.write_pickle()
                    target = new_daemon.target
                    target_args = new_daemon.target_args
                    target_kw = new_daemon.target_kw
                    label = new_daemon.label
                    self._properties = new_daemon.properties
                else:
                    ### Recovery failed: drop the stale properties file before raising.
                    try:
                        self.properties_path.unlink()
                    except Exception:
                        pass

                    raise Exception(
                        f"Could not recover daemon '{self.daemon_id}' "
                        + "from its properties file."
                    )

        if 'target' not in self.__dict__:
            if target is None:
                error("Cannot create a Daemon without a target.")
            self.target = target

        ### NOTE: We have to check self.__dict__ in case we are un-pickling.
        if '_target_args' not in self.__dict__:
            self._target_args = target_args
        if '_target_kw' not in self.__dict__:
            self._target_kw = target_kw

        if 'label' not in self.__dict__:
            if label is None:
                label = (
                    self.target.__name__ if '__name__' in self.target.__dir__()
                    else str(self.target)
                )
            self.label = label
        if 'daemon_id' not in self.__dict__:
            self.daemon_id = get_new_daemon_name()
        if '_properties' not in self.__dict__:
            self._properties = properties
        if self._properties is None:
            self._properties = {}

        self._properties.update({'label': self.label})
        if env:
            self._properties.update({'env': env})

        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
        _ = self.process


    def _run_exit(
        self,
        keep_daemon_output: bool = True,
        allow_dirty_run: bool = False,
    ) -> Any:
        """Run the daemon's target function.
        NOTE: This WILL EXIT the parent process!

        Parameters
        ----------
        keep_daemon_output: bool, default True
            If `False`, delete the daemon's output directory upon exiting.

        allow_dirty_run, bool, default False:
            If `True`, run the daemon, even if the `daemon_id` directory exists.
            This option is dangerous because if the same `daemon_id` runs twice,
            the last to finish will overwrite the output of the first.

        Returns
        -------
        Nothing — this will exit the parent process.
        """
        import platform
        import sys
        import os
        import traceback
        from meerschaum.utils.warnings import warn
        from meerschaum.config import get_config
        daemon = attempt_import('daemon')
        lines = get_config('jobs', 'terminal', 'lines')
        columns = get_config('jobs', 'terminal', 'columns')

        if platform.system() == 'Windows':
            return False, "Windows is no longer supported."

        self._setup(allow_dirty_run)

        ### NOTE: The SIGINT handler has been removed so that child processes may handle
        ### KeyboardInterrupts themselves.
        ### The previous aggressive approach was redundant because of the SIGTERM handler.
        self._daemon_context = daemon.DaemonContext(
            pidfile=self.pid_lock,
            stdout=self.rotating_log,
            stderr=self.rotating_log,
            working_directory=os.getcwd(),
            detach_process=True,
            files_preserve=list(self.rotating_log.subfile_objects.values()),
            signal_map={
                signal.SIGTERM: self._handle_sigterm,
            },
        )

        _daemons.append(self)

        log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds')
        self._log_refresh_timer = RepeatTimer(
            log_refresh_seconds,
            partial(self.rotating_log.refresh_files, start_interception=True),
        )

        try:
            os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns))
            with self._daemon_context:
                sys.stdin = self.stdin_file
                os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id
                os.environ['PYTHONUNBUFFERED'] = '1'

                ### Allow the user to override environment variables.
                env = self.properties.get('env', {})
                if env and isinstance(env, dict):
                    os.environ.update({str(k): str(v) for k, v in env.items()})

                self.rotating_log.refresh_files(start_interception=True)
                result = None
                try:
                    with open(self.pid_path, 'w+', encoding='utf-8') as f:
                        f.write(str(os.getpid()))

                    self._log_refresh_timer.start()
                    self.properties['result'] = None
                    self._capture_process_timestamp('began')
                    result = self.target(*self.target_args, **self.target_kw)
                    self.properties['result'] = result
                except (BrokenPipeError, KeyboardInterrupt, SystemExit):
                    pass
                except Exception as e:
                    warn(
                        f"Exception in daemon target function: {traceback.format_exc()}",
                    )
                    result = e
                finally:
                    _results[self.daemon_id] = result

                    if keep_daemon_output:
                        self._capture_process_timestamp('ended')
                    else:
                        self.cleanup()

                    self._log_refresh_timer.cancel()
                    if self.pid is None and self.pid_path.exists():
                        self.pid_path.unlink()

                    if is_success_tuple(result):
                        try:
                            mrsm.pprint(result)
                        except BrokenPipeError:
                            pass

        except Exception as e:
            daemon_error = traceback.format_exc()
            with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
                f.write(daemon_error)
            warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}")

    def _capture_process_timestamp(
        self,
        process_key: str,
        write_properties: bool = True,
    ) -> None:
        """
        Record the current timestamp to the parameters `process:<process_key>`.

        Parameters
        ----------
        process_key: str
            Under which key to store the timestamp.

        write_properties: bool, default True
            If `True` persist the properties to disk immediately after capturing the timestamp.
        """
        if 'process' not in self.properties:
            self.properties['process'] = {}

        if process_key not in ('began', 'ended', 'paused', 'stopped'):
            raise ValueError(f"Invalid key '{process_key}'.")

        self.properties['process'][process_key] = (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        )
        if write_properties:
            self.write_properties()

    def run(
        self,
        keep_daemon_output: bool = True,
        allow_dirty_run: bool = False,
        debug: bool = False,
    ) -> SuccessTuple:
        """Run the daemon as a child process and continue executing the parent.

        Parameters
        ----------
        keep_daemon_output: bool, default True
            If `False`, delete the daemon's output directory upon exiting.

        allow_dirty_run: bool, default False
            If `True`, run the daemon, even if the `daemon_id` directory exists.
            This option is dangerous because if the same `daemon_id` runs concurrently,
            the last to finish will overwrite the output of the first.

        Returns
        -------
        A SuccessTuple indicating success.

        """
        import platform
        if platform.system() == 'Windows':
            return False, "Cannot run background jobs on Windows."

        ### The daemon might exist and be paused.
        if self.status == 'paused':
            return self.resume()

        self._remove_stop_file()
        if self.status == 'running':
            return True, f"Daemon '{self}' is already running."

        self.mkdir_if_not_exists(allow_dirty_run)
        _write_pickle_success_tuple = self.write_pickle()
        if not _write_pickle_success_tuple[0]:
            return _write_pickle_success_tuple

        ### Launch the daemon in a fresh interpreter so it can detach cleanly.
        _launch_daemon_code = (
            "from meerschaum.utils.daemon import Daemon; "
            + f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
            + f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
            + "allow_dirty_run=True)"
        )
        env = dict(os.environ)
        env['MRSM_NOASK'] = 'true'
        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
        msg = (
            "Success"
            if _launch_success_bool
            else f"Failed to start daemon '{self.daemon_id}'."
        )
        return _launch_success_bool, msg

    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
        """
        Forcibly terminate a running daemon.
        Sends a SIGTERM signal to the process.

        Parameters
        ----------
        timeout: Union[int, float, None], default 8
            How many seconds to wait for the process to terminate.

        Returns
        -------
        A SuccessTuple indicating success.
        """
        if self.status != 'paused':
            success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
            if success:
                self._write_stop_file('kill')
                return success, msg

        if self.status == 'stopped':
            self._write_stop_file('kill')
            return True, "Process has already stopped."

        ### SIGTERM failed or the process is paused: force-kill via psutil.
        process = self.process
        try:
            process.terminate()
            process.kill()
            process.wait(timeout=timeout)
        except Exception as e:
            return False, f"Failed to kill job {self} with exception: {e}"

        if self.pid_path.exists():
            try:
                self.pid_path.unlink()
            except Exception as e:
                pass

        self._write_stop_file('kill')
        return True, "Success"

    def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple:
        """Gracefully quit a running daemon."""
        if self.status == 'paused':
            return self.kill(timeout)

        signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout)
        if signal_success:
            self._write_stop_file('quit')
        return signal_success, signal_msg

    def pause(
        self,
        timeout: Union[int, float, None] = None,
        check_timeout_interval: Union[float, int, None] = None,
    ) -> SuccessTuple:
        """
        Pause the daemon if it is running.

        Parameters
        ----------
        timeout: Union[float, int, None], default None
            The maximum number of seconds to wait for a process to suspend.

        check_timeout_interval: Union[float, int, None], default None
            The number of seconds to wait between checking if the process is still running.

        Returns
        -------
        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
        """
        if self.process is None:
            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."

        if self.status == 'paused':
            return True, f"Daemon '{self.daemon_id}' is already paused."

        self._write_stop_file('pause')
        try:
            self.process.suspend()
        except Exception as e:
            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"

        timeout = self.get_timeout_seconds(timeout)
        check_timeout_interval = self.get_check_timeout_interval_seconds(
            check_timeout_interval
        )

        psutil = attempt_import('psutil')

        if not timeout:
            ### No timeout: report the current status once without polling.
            try:
                success = self.process.status() == 'stopped'
            except psutil.NoSuchProcess as e:
                success = True
            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
            if success:
                self._capture_process_timestamp('paused')
            return success, msg

        begin = time.perf_counter()
        while (time.perf_counter() - begin) < timeout:
            try:
                if self.process.status() == 'stopped':
                    self._capture_process_timestamp('paused')
                    return True, "Success"
            except psutil.NoSuchProcess as e:
                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
            time.sleep(check_timeout_interval)

        return False, (
            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
            + ('s' if timeout != 1 else '') + '.'
        )

    def resume(
        self,
        timeout: Union[int, float, None] = None,
        check_timeout_interval: Union[float, int, None] = None,
    ) -> SuccessTuple:
        """
        Resume the daemon if it is paused.

        Parameters
        ----------
        timeout: Union[float, int, None], default None
            The maximum number of seconds to wait for a process to resume.

        check_timeout_interval: Union[float, int, None], default None
            The number of seconds to wait between checking if the process is still stopped.

        Returns
        -------
        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
        """
        if self.status == 'running':
            return True, f"Daemon '{self.daemon_id}' is already running."

        if self.status == 'stopped':
            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."

        self._remove_stop_file()
        try:
            self.process.resume()
        except Exception as e:
            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"

        timeout = self.get_timeout_seconds(timeout)
        check_timeout_interval = self.get_check_timeout_interval_seconds(
            check_timeout_interval
        )

        if not timeout:
            success = self.status == 'running'
            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
            if success:
                self._capture_process_timestamp('began')
            return success, msg

        begin = time.perf_counter()
        while (time.perf_counter() - begin) < timeout:
            if self.status == 'running':
                self._capture_process_timestamp('began')
                return True, "Success"
            time.sleep(check_timeout_interval)

        return False, (
            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
            + ('s' if timeout != 1 else '') + '.'
        )

    def _write_stop_file(self, action: str) -> SuccessTuple:
        """Write the stop file timestamp and action."""
        if action not in ('quit', 'kill', 'pause'):
            return False, f"Unsupported action '{action}'."

        if not self.stop_path.parent.exists():
            self.stop_path.parent.mkdir(parents=True, exist_ok=True)

        with open(self.stop_path, 'w+', encoding='utf-8') as f:
            json.dump(
                {
                    'stop_time': datetime.now(timezone.utc).isoformat(),
                    'action': action,
                },
                f
            )

        return True, "Success"

    def _remove_stop_file(self) -> SuccessTuple:
        """Remove the stop file"""
        if not self.stop_path.exists():
            return True, "Stop file does not exist."

        try:
            self.stop_path.unlink()
        except Exception as e:
            return False, f"Failed to remove stop file:\n{e}"

        return True, "Success"

    def _read_stop_file(self) -> Dict[str, Any]:
        """
        Read the stop file if it exists.
        """
        if not self.stop_path.exists():
            return {}

        try:
            with open(self.stop_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return data
        except Exception:
            return {}

    def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None:
        """
        Handle `SIGTERM` within the `Daemon` context.
        This method is injected into the `DaemonContext`.
        """
        from meerschaum.utils.process import signal_handler
        signal_handler(signal_number, stack_frame)

        timer = self.__dict__.get('_log_refresh_timer', None)
        if timer is not None:
            timer.cancel()

        daemon_context = self.__dict__.get('_daemon_context', None)
        if daemon_context is not None:
            daemon_context.close()

        _close_pools()
        raise SystemExit(0)

    def _send_signal(
        self,
        signal_to_send,
        timeout: Union[float, int, None] = None,
        check_timeout_interval: Union[float, int, None] = None,
    ) -> SuccessTuple:
        """Send a signal to the daemon process.

        Parameters
        ----------
        signal_to_send:
            The signal the send to the daemon, e.g. `signals.SIGINT`.

        timeout: Union[float, int, None], default None
            The maximum number of seconds to wait for a process to terminate.

        check_timeout_interval: Union[float, int, None], default None
            The number of seconds to wait between checking if the process is still running.

        Returns
        -------
        A SuccessTuple indicating success.
        """
        try:
            pid = self.pid
            if pid is None:
                return (
                    False,
                    f"Daemon '{self.daemon_id}' is not running, "
                    + f"cannot send signal '{signal_to_send}'."
                )

            os.kill(pid, signal_to_send)
        except Exception as e:
            return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}"

        timeout = self.get_timeout_seconds(timeout)
        check_timeout_interval = self.get_check_timeout_interval_seconds(
            check_timeout_interval
        )

        if not timeout:
            ### BUGFIX: previously interpolated the `signal` module instead of the
            ### `signal_to_send` argument, printing `<module 'signal' ...>` in the message.
            return True, f"Successfully sent '{signal_to_send}' to daemon '{self.daemon_id}'."

        begin = time.perf_counter()
        while (time.perf_counter() - begin) < timeout:
            if not self.status == 'running':
                return True, "Success"
            time.sleep(check_timeout_interval)

        return False, (
            f"Failed to stop daemon '{self.daemon_id}' within {timeout} second"
            + ('s' if timeout != 1 else '') + '.'
        )

    def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
        """Create the Daemon's directory.
        If `allow_dirty_run` is `False` and the directory already exists,
        raise a `FileExistsError`.
        """
        try:
            self.path.mkdir(parents=True, exist_ok=True)
            _already_exists = any(os.scandir(self.path))
        except FileExistsError:
            _already_exists = True

        if _already_exists and not allow_dirty_run:
            error(
                f"Daemon '{self.daemon_id}' already exists. " +
                "To allow this daemon to run, do one of the following:\n"
                + " - Execute `daemon.cleanup()`.\n"
                + f" - Delete the directory '{self.path}'.\n"
                + " - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
                FileExistsError,
            )

    @property
    def process(self) -> Union['psutil.Process', None]:
        """
        Return the psutil process for the Daemon.
        """
        psutil = attempt_import('psutil')
        pid = self.pid
        if pid is None:
            return None
        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
            try:
                self._process = psutil.Process(int(pid))
            except Exception as e:
                ### Stale PID file: the process no longer exists.
                if self.pid_path.exists():
                    self.pid_path.unlink()
                return None
        return self._process

    @property
    def status(self) -> str:
        """
        Return the running status of this Daemon.
        """
        if self.process is None:
            return 'stopped'

        psutil = attempt_import('psutil')
        try:
            if self.process.status() == 'stopped':
                return 'paused'
            if self.process.status() == 'zombie':
                raise psutil.NoSuchProcess(self.process.pid)
        except (psutil.NoSuchProcess, AttributeError):
            if self.pid_path.exists():
                try:
                    self.pid_path.unlink()
                except Exception as e:
                    pass
            return 'stopped'

        return 'running'

    @classmethod
    def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
        """
        Return a Daemon's path from its `daemon_id`.
        """
        return DAEMON_RESOURCES_PATH / daemon_id

    @property
    def path(self) -> pathlib.Path:
        """
        Return the path for this Daemon's directory.
        """
        return self._get_path_from_daemon_id(self.daemon_id)

    @classmethod
    def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
        """
        Return the `properties.json` path for a given `daemon_id`.
        """
        return cls._get_path_from_daemon_id(daemon_id) / 'properties.json'

    @property
    def properties_path(self) -> pathlib.Path:
        """
        Return the `properties.json` path for this Daemon.
        """
        return self._get_properties_path_from_daemon_id(self.daemon_id)

    @property
    def stop_path(self) -> pathlib.Path:
        """
        Return the path for the stop file (created when manually stopped).
        """
        return self.path / '.stop.json'

    @property
    def log_path(self) -> pathlib.Path:
        """
        Return the log path.
        """
        return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')

    @property
    def stdin_file_path(self) -> pathlib.Path:
        """
        Return the stdin file path.
        """
        return self.path / 'input.stdin'

    @property
    def blocking_stdin_file_path(self) -> pathlib.Path:
        """
        Return the stdin file path.
        """
        if '_blocking_stdin_file_path' in self.__dict__:
            return self._blocking_stdin_file_path

        return self.path / 'input.stdin.block'

    @property
    def log_offset_path(self) -> pathlib.Path:
        """
        Return the log offset file path.
        """
        return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')

    @property
    def rotating_log(self) -> RotatingFile:
        """
        The rotating log file for the daemon's output.
        """
        if '_rotating_log' in self.__dict__:
            return self._rotating_log

        write_timestamps = (
            self.properties.get('logs', {}).get('write_timestamps', None)
        )
        if write_timestamps is None:
            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')

        self._rotating_log = RotatingFile(
            self.log_path,
            redirect_streams=True,
            write_timestamps=write_timestamps,
            timestamp_format=get_config('jobs', 'logs', 'timestamps', 'format'),
        )
        return self._rotating_log

    @property
    def stdin_file(self):
        """
        Return the file handler for the stdin file.
        """
        if '_stdin_file' in self.__dict__:
            return self._stdin_file

        self._stdin_file = StdinFile(
            self.stdin_file_path,
            lock_file_path=self.blocking_stdin_file_path,
        )
        return self._stdin_file

    @property
    def log_text(self) -> Optional[str]:
        """
        Read the log files and return their contents.
        Returns `None` if the log file does not exist.
        """
        new_rotating_log = RotatingFile(
            self.rotating_log.file_path,
            num_files_to_keep = self.rotating_log.num_files_to_keep,
            max_file_size = self.rotating_log.max_file_size,
            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled'),
            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format'),
        )
        return new_rotating_log.read()

    def readlines(self) -> List[str]:
        """
        Read the next log lines, persisting the cursor for later use.
        Note this will alter the cursor of `self.rotating_log`.
        """
        self.rotating_log._cursor = self._read_log_offset()
        lines = self.rotating_log.readlines()
        self._write_log_offset()
        return lines

    def _read_log_offset(self) -> Tuple[int, int]:
        """
        Return the current log offset cursor.

        Returns
        -------
        A tuple of the form (`subfile_index`, `position`).
        """
        if not self.log_offset_path.exists():
            return 0, 0

        with open(self.log_offset_path, 'r', encoding='utf-8') as f:
            cursor_text = f.read()
        cursor_parts = cursor_text.split(' ')
        subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1])
        return subfile_index, subfile_position

    def _write_log_offset(self) -> None:
        """
        Write the current log offset file.
        """
        with open(self.log_offset_path, 'w+', encoding='utf-8') as f:
            subfile_index = self.rotating_log._cursor[0]
            subfile_position = self.rotating_log._cursor[1]
            f.write(f"{subfile_index} {subfile_position}")

    @property
    def pid(self) -> Union[int, None]:
        """
        Read the PID file and return its contents.
        Returns `None` if the PID file does not exist.
        """
        if not self.pid_path.exists():
            return None
        try:
            with open(self.pid_path, 'r', encoding='utf-8') as f:
                text = f.read()
            if len(text) == 0:
                return None
            pid = int(text.rstrip())
        except Exception as e:
            warn(e)
            text = None
            pid = None
        return pid

    @property
    def pid_path(self) -> pathlib.Path:
        """
        Return the path to a file containing the PID for this Daemon.
        """
        return self.path / 'process.pid'

    @property
    def pid_lock(self) -> 'fasteners.InterProcessLock':
        """
        Return the process lock context manager.
        """
        if '_pid_lock' in self.__dict__:
            return self._pid_lock

        fasteners = attempt_import('fasteners')
        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
        return self._pid_lock

    @property
    def pickle_path(self) -> pathlib.Path:
        """
        Return the path for the pickle file.
        """
        return self.path / 'pickle.pkl'

    def read_properties(self) -> Optional[Dict[str, Any]]:
        """Read the properties JSON file and return the dictionary."""
        if not self.properties_path.exists():
            return None
        try:
            with open(self.properties_path, 'r', encoding='utf-8') as file:
                properties = json.load(file)
        except Exception as e:
            properties = {}

        return properties

    def read_pickle(self) -> Daemon:
        """Read a Daemon's pickle file and return the `Daemon`."""
        import pickle
        import traceback
        if not self.pickle_path.exists():
            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")

        if self.pickle_path.stat().st_size == 0:
            error(f"Pickle was empty for daemon '{self.daemon_id}'.")

        try:
            with open(self.pickle_path, 'rb') as pickle_file:
                daemon = pickle.load(pickle_file)
            success, msg = True, 'Success'
        except Exception as e:
            success, msg = False, str(e)
            daemon = None
            traceback.print_exception(type(e), e, e.__traceback__)
        if not success:
            error(msg)
        return daemon

    @property
    def properties(self) -> Dict[str, Any]:
        """
        Return the contents of the properties JSON file.
        """
        try:
            _file_properties = self.read_properties()
        except Exception:
            traceback.print_exc()
            _file_properties = {}

        if not self._properties:
            self._properties = _file_properties

        if self._properties is None:
            self._properties = {}

        ### Merge on-disk properties beneath the in-memory ones.
        if _file_properties is not None:
            self._properties = apply_patch_to_config(
                _file_properties,
                self._properties,
            )

        return self._properties

    @property
    def hidden(self) -> bool:
        """
        Return a bool indicating whether this Daemon should be displayed.
        """
        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')

    def write_properties(self) -> SuccessTuple:
        """Write the properties dictionary to the properties JSON file
        (only if self.properties exists).
        """
        success, msg = (
            False,
            f"No properties to write for daemon '{self.daemon_id}'."
        )
        if self.properties is not None:
            try:
                self.path.mkdir(parents=True, exist_ok=True)
                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
                    json.dump(self.properties, properties_file)
                success, msg = True, 'Success'
            except Exception as e:
                success, msg = False, str(e)
        return success, msg

    def write_pickle(self) -> SuccessTuple:
        """Write the pickle file for the daemon."""
        import pickle
        import traceback
        try:
            self.path.mkdir(parents=True, exist_ok=True)
            with open(self.pickle_path, 'wb+') as pickle_file:
                pickle.dump(self, pickle_file)
            success, msg = True, "Success"
        except Exception as e:
            success, msg = False, str(e)
            traceback.print_exception(type(e), e, e.__traceback__)
        return success, msg


    def _setup(
        self,
        allow_dirty_run: bool = False,
    ) -> None:
        """
        Update properties before starting the Daemon.
        """
        if self.properties is None:
            self._properties = {}

        self._properties.update({
            'target': {
                'name': self.target.__name__,
                'module': self.target.__module__,
                'args': self.target_args,
                'kw': self.target_kw,
            },
        })
        self.mkdir_if_not_exists(allow_dirty_run)
        _write_properties_success_tuple = self.write_properties()
        if not _write_properties_success_tuple[0]:
            error(_write_properties_success_tuple[1])

        _write_pickle_success_tuple = self.write_pickle()
        if not _write_pickle_success_tuple[0]:
            error(_write_pickle_success_tuple[1])

    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
        """
        Remove a daemon's directory after execution.

        Parameters
        ----------
        keep_logs: bool, default False
            If `True`, skip deleting the daemon's log files.

        Returns
        -------
        A `SuccessTuple` indicating success.
        """
        if self.path.exists():
            try:
                shutil.rmtree(self.path)
            except Exception as e:
                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
                warn(msg)
                return False, msg
        if not keep_logs:
            self.rotating_log.delete()
            try:
                if self.log_offset_path.exists():
                    self.log_offset_path.unlink()
            except Exception as e:
                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
                warn(msg)
                return False, msg
        return True, "Success"


    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
        """
        Return the timeout value to use. Use `--timeout-seconds` if provided,
        else the configured default (8).
        """
        if isinstance(timeout, (int, float)):
            return timeout
        return get_config('jobs', 'timeout_seconds')


    def get_check_timeout_interval_seconds(
        self,
        check_timeout_interval: Union[int, float, None] = None,
    ) -> Union[int, float]:
        """
        Return the interval value to check the status of timeouts.
        """
        if isinstance(check_timeout_interval, (int, float)):
            return check_timeout_interval
        return get_config('jobs', 'check_timeout_interval_seconds')

    @property
    def target_args(self) -> Union[Tuple[Any], None]:
        """
        Return the positional arguments to pass to the target function.
        """
        target_args = (
            self.__dict__.get('_target_args', None)
            or self.properties.get('target', {}).get('args', None)
        )
        if target_args is None:
            return tuple([])

        return tuple(target_args)

    @property
    def target_kw(self) -> Union[Dict[str, Any], None]:
        """
        Return the keyword arguments to pass to the target function.
        """
        target_kw = (
            self.__dict__.get('_target_kw', None)
            or self.properties.get('target', {}).get('kw', None)
        )
        if target_kw is None:
            return {}

        return {key: val for key, val in target_kw.items()}

    def __getstate__(self):
        """
        Pickle this Daemon.
        """
        dill = attempt_import('dill')
        return {
            'target': dill.dumps(self.target),
            'target_args': self.target_args,
            'target_kw': self.target_kw,
            'daemon_id': self.daemon_id,
            'label': self.label,
            'properties': self.properties,
        }

    def __setstate__(self, _state: Dict[str, Any]):
        """
        Restore this Daemon from a pickled state.
        If the properties file exists, skip the old pickled version.
        """
        dill = attempt_import('dill')
        _state['target'] = dill.loads(_state['target'])
        self._pickle = True
        daemon_id = _state.get('daemon_id', None)
        if not daemon_id:
            raise ValueError("Need a daemon_id to un-pickle a Daemon.")

        properties_path = self._get_properties_path_from_daemon_id(daemon_id)
        ignore_properties = properties_path.exists()
        if ignore_properties:
            _state = {
                key: val
                for key, val in _state.items()
                if key != 'properties'
            }
        self.__init__(**_state)


    def __repr__(self):
        return str(self)

    def __str__(self):
        return self.daemon_id

    def __eq__(self, other):
        if not isinstance(other, Daemon):
            return False
        return self.daemon_id == other.daemon_id

    def __hash__(self):
        return hash(self.daemon_id)
46class Daemon: 47 """ 48 Daemonize Python functions into background processes. 49 50 Examples 51 -------- 52 >>> import meerschaum as mrsm 53 >>> from meerschaum.utils.daemons import Daemon 54 >>> daemon = Daemon(print, ('hi',)) 55 >>> success, msg = daemon.run() 56 >>> print(daemon.log_text) 57 58 2024-07-29 18:03 | hi 59 2024-07-29 18:03 | 60 >>> daemon.run(allow_dirty_run=True) 61 >>> print(daemon.log_text) 62 63 2024-07-29 18:03 | hi 64 2024-07-29 18:03 | 65 2024-07-29 18:05 | hi 66 2024-07-29 18:05 | 67 >>> mrsm.pprint(daemon.properties) 68 { 69 'label': 'print', 70 'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}}, 71 'result': None, 72 'process': {'ended': '2024-07-29T18:03:33.752806'} 73 } 74 75 """ 76 77 def __new__( 78 cls, 79 *args, 80 daemon_id: Optional[str] = None, 81 **kw 82 ): 83 """ 84 If a daemon_id is provided and already exists, read from its pickle file. 85 """ 86 instance = super(Daemon, cls).__new__(cls) 87 if daemon_id is not None: 88 instance.daemon_id = daemon_id 89 if instance.pickle_path.exists(): 90 instance = instance.read_pickle() 91 return instance 92 93 @classmethod 94 def from_properties_file(cls, daemon_id: str) -> Daemon: 95 """ 96 Return a Daemon from a properties dictionary. 
97 """ 98 properties_path = cls._get_properties_path_from_daemon_id(daemon_id) 99 if not properties_path.exists(): 100 raise OSError(f"Properties file '{properties_path}' does not exist.") 101 102 try: 103 with open(properties_path, 'r', encoding='utf-8') as f: 104 properties = json.load(f) 105 except Exception: 106 properties = {} 107 108 if not properties: 109 raise ValueError(f"No properties could be read for daemon '{daemon_id}'.") 110 111 daemon_id = properties_path.parent.name 112 target_cf = properties.get('target', {}) 113 target_module_name = target_cf.get('module', None) 114 target_function_name = target_cf.get('name', None) 115 target_args = target_cf.get('args', None) 116 target_kw = target_cf.get('kw', None) 117 label = properties.get('label', None) 118 119 if None in [ 120 target_module_name, 121 target_function_name, 122 target_args, 123 target_kw, 124 ]: 125 raise ValueError("Missing target function information.") 126 127 target_module = importlib.import_module(target_module_name) 128 target_function = getattr(target_module, target_function_name) 129 130 return Daemon( 131 daemon_id=daemon_id, 132 target=target_function, 133 target_args=target_args, 134 target_kw=target_kw, 135 properties=properties, 136 label=label, 137 ) 138 139 140 def __init__( 141 self, 142 target: Optional[Callable[[Any], Any]] = None, 143 target_args: Union[List[Any], Tuple[Any], None] = None, 144 target_kw: Optional[Dict[str, Any]] = None, 145 env: Optional[Dict[str, str]] = None, 146 daemon_id: Optional[str] = None, 147 label: Optional[str] = None, 148 properties: Optional[Dict[str, Any]] = None, 149 ): 150 """ 151 Parameters 152 ---------- 153 target: Optional[Callable[[Any], Any]], default None, 154 The function to execute in a child process. 155 156 target_args: Union[List[Any], Tuple[Any], None], default None 157 Positional arguments to pass to the target function. 
158 159 target_kw: Optional[Dict[str, Any]], default None 160 Keyword arguments to pass to the target function. 161 162 env: Optional[Dict[str, str]], default None 163 If provided, set these environment variables in the daemon process. 164 165 daemon_id: Optional[str], default None 166 Build a `Daemon` from an existing `daemon_id`. 167 If `daemon_id` is provided, other arguments are ignored and are derived 168 from the existing pickled `Daemon`. 169 170 label: Optional[str], default None 171 Label string to help identifiy a daemon. 172 If `None`, use the function name instead. 173 174 properties: Optional[Dict[str, Any]], default None 175 Override reading from the properties JSON by providing an existing dictionary. 176 """ 177 _pickle = self.__dict__.get('_pickle', False) 178 if daemon_id is not None: 179 self.daemon_id = daemon_id 180 if not self.pickle_path.exists() and not target and ('target' not in self.__dict__): 181 182 if not self.properties_path.exists(): 183 raise Exception( 184 f"Daemon '{self.daemon_id}' does not exist. " 185 + "Pass a target to create a new Daemon." 186 ) 187 188 try: 189 new_daemon = self.from_properties_file(daemon_id) 190 except Exception: 191 new_daemon = None 192 193 if new_daemon is not None: 194 new_daemon.write_pickle() 195 target = new_daemon.target 196 target_args = new_daemon.target_args 197 target_kw = new_daemon.target_kw 198 label = new_daemon.label 199 self._properties = new_daemon.properties 200 else: 201 try: 202 self.properties_path.unlink() 203 except Exception: 204 pass 205 206 raise Exception( 207 f"Could not recover daemon '{self.daemon_id}' " 208 + "from its properties file." 209 ) 210 211 if 'target' not in self.__dict__: 212 if target is None: 213 error("Cannot create a Daemon without a target.") 214 self.target = target 215 216 ### NOTE: We have to check self.__dict__ in case we un-pickling. 
217 if '_target_args' not in self.__dict__: 218 self._target_args = target_args 219 if '_target_kw' not in self.__dict__: 220 self._target_kw = target_kw 221 222 if 'label' not in self.__dict__: 223 if label is None: 224 label = ( 225 self.target.__name__ if '__name__' in self.target.__dir__() 226 else str(self.target) 227 ) 228 self.label = label 229 if 'daemon_id' not in self.__dict__: 230 self.daemon_id = get_new_daemon_name() 231 if '_properties' not in self.__dict__: 232 self._properties = properties 233 if self._properties is None: 234 self._properties = {} 235 236 self._properties.update({'label': self.label}) 237 if env: 238 self._properties.update({'env': env}) 239 240 ### Instantiate the process and if it doesn't exist, make sure the PID is removed. 241 _ = self.process 242 243 244 def _run_exit( 245 self, 246 keep_daemon_output: bool = True, 247 allow_dirty_run: bool = False, 248 ) -> Any: 249 """Run the daemon's target function. 250 NOTE: This WILL EXIT the parent process! 251 252 Parameters 253 ---------- 254 keep_daemon_output: bool, default True 255 If `False`, delete the daemon's output directory upon exiting. 256 257 allow_dirty_run, bool, default False: 258 If `True`, run the daemon, even if the `daemon_id` directory exists. 259 This option is dangerous because if the same `daemon_id` runs twice, 260 the last to finish will overwrite the output of the first. 261 262 Returns 263 ------- 264 Nothing — this will exit the parent process. 265 """ 266 import platform, sys, os, traceback 267 from meerschaum.utils.warnings import warn 268 from meerschaum.config import get_config 269 daemon = attempt_import('daemon') 270 lines = get_config('jobs', 'terminal', 'lines') 271 columns = get_config('jobs', 'terminal', 'columns') 272 273 if platform.system() == 'Windows': 274 return False, "Windows is no longer supported." 
275 276 self._setup(allow_dirty_run) 277 278 ### NOTE: The SIGINT handler has been removed so that child processes may handle 279 ### KeyboardInterrupts themselves. 280 ### The previous aggressive approach was redundant because of the SIGTERM handler. 281 self._daemon_context = daemon.DaemonContext( 282 pidfile=self.pid_lock, 283 stdout=self.rotating_log, 284 stderr=self.rotating_log, 285 working_directory=os.getcwd(), 286 detach_process=True, 287 files_preserve=list(self.rotating_log.subfile_objects.values()), 288 signal_map={ 289 signal.SIGTERM: self._handle_sigterm, 290 }, 291 ) 292 293 _daemons.append(self) 294 295 log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds') 296 self._log_refresh_timer = RepeatTimer( 297 log_refresh_seconds, 298 partial(self.rotating_log.refresh_files, start_interception=True), 299 ) 300 301 try: 302 os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns)) 303 with self._daemon_context: 304 sys.stdin = self.stdin_file 305 os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id 306 os.environ['PYTHONUNBUFFERED'] = '1' 307 308 ### Allow the user to override environment variables. 
309 env = self.properties.get('env', {}) 310 if env and isinstance(env, dict): 311 os.environ.update({str(k): str(v) for k, v in env.items()}) 312 313 self.rotating_log.refresh_files(start_interception=True) 314 result = None 315 try: 316 with open(self.pid_path, 'w+', encoding='utf-8') as f: 317 f.write(str(os.getpid())) 318 319 self._log_refresh_timer.start() 320 self.properties['result'] = None 321 self._capture_process_timestamp('began') 322 result = self.target(*self.target_args, **self.target_kw) 323 self.properties['result'] = result 324 except (BrokenPipeError, KeyboardInterrupt, SystemExit): 325 pass 326 except Exception as e: 327 warn( 328 f"Exception in daemon target function: {traceback.format_exc()}", 329 ) 330 result = e 331 finally: 332 _results[self.daemon_id] = result 333 334 if keep_daemon_output: 335 self._capture_process_timestamp('ended') 336 else: 337 self.cleanup() 338 339 self._log_refresh_timer.cancel() 340 if self.pid is None and self.pid_path.exists(): 341 self.pid_path.unlink() 342 343 if is_success_tuple(result): 344 try: 345 mrsm.pprint(result) 346 except BrokenPipeError: 347 pass 348 349 except Exception as e: 350 daemon_error = traceback.format_exc() 351 with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f: 352 f.write(daemon_error) 353 warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}") 354 355 def _capture_process_timestamp( 356 self, 357 process_key: str, 358 write_properties: bool = True, 359 ) -> None: 360 """ 361 Record the current timestamp to the parameters `process:<process_key>`. 362 363 Parameters 364 ---------- 365 process_key: str 366 Under which key to store the timestamp. 367 368 write_properties: bool, default True 369 If `True` persist the properties to disk immediately after capturing the timestamp. 
370 """ 371 if 'process' not in self.properties: 372 self.properties['process'] = {} 373 374 if process_key not in ('began', 'ended', 'paused', 'stopped'): 375 raise ValueError(f"Invalid key '{process_key}'.") 376 377 self.properties['process'][process_key] = ( 378 datetime.now(timezone.utc).replace(tzinfo=None).isoformat() 379 ) 380 if write_properties: 381 self.write_properties() 382 383 def run( 384 self, 385 keep_daemon_output: bool = True, 386 allow_dirty_run: bool = False, 387 debug: bool = False, 388 ) -> SuccessTuple: 389 """Run the daemon as a child process and continue executing the parent. 390 391 Parameters 392 ---------- 393 keep_daemon_output: bool, default True 394 If `False`, delete the daemon's output directory upon exiting. 395 396 allow_dirty_run: bool, default False 397 If `True`, run the daemon, even if the `daemon_id` directory exists. 398 This option is dangerous because if the same `daemon_id` runs concurrently, 399 the last to finish will overwrite the output of the first. 400 401 Returns 402 ------- 403 A SuccessTuple indicating success. 404 405 """ 406 import platform 407 if platform.system() == 'Windows': 408 return False, "Cannot run background jobs on Windows." 409 410 ### The daemon might exist and be paused. 411 if self.status == 'paused': 412 return self.resume() 413 414 self._remove_stop_file() 415 if self.status == 'running': 416 return True, f"Daemon '{self}' is already running." 
417 418 self.mkdir_if_not_exists(allow_dirty_run) 419 _write_pickle_success_tuple = self.write_pickle() 420 if not _write_pickle_success_tuple[0]: 421 return _write_pickle_success_tuple 422 423 _launch_daemon_code = ( 424 "from meerschaum.utils.daemon import Daemon; " 425 + f"daemon = Daemon(daemon_id='{self.daemon_id}'); " 426 + f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, " 427 + "allow_dirty_run=True)" 428 ) 429 env = dict(os.environ) 430 env['MRSM_NOASK'] = 'true' 431 _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env) 432 msg = ( 433 "Success" 434 if _launch_success_bool 435 else f"Failed to start daemon '{self.daemon_id}'." 436 ) 437 return _launch_success_bool, msg 438 439 def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple: 440 """ 441 Forcibly terminate a running daemon. 442 Sends a SIGTERM signal to the process. 443 444 Parameters 445 ---------- 446 timeout: Optional[int], default 3 447 How many seconds to wait for the process to terminate. 448 449 Returns 450 ------- 451 A SuccessTuple indicating success. 452 """ 453 if self.status != 'paused': 454 success, msg = self._send_signal(signal.SIGTERM, timeout=timeout) 455 if success: 456 self._write_stop_file('kill') 457 return success, msg 458 459 if self.status == 'stopped': 460 self._write_stop_file('kill') 461 return True, "Process has already stopped." 
462 463 process = self.process 464 try: 465 process.terminate() 466 process.kill() 467 process.wait(timeout=timeout) 468 except Exception as e: 469 return False, f"Failed to kill job {self} with exception: {e}" 470 471 if self.pid_path.exists(): 472 try: 473 self.pid_path.unlink() 474 except Exception as e: 475 pass 476 477 self._write_stop_file('kill') 478 return True, "Success" 479 480 def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple: 481 """Gracefully quit a running daemon.""" 482 if self.status == 'paused': 483 return self.kill(timeout) 484 485 signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout) 486 if signal_success: 487 self._write_stop_file('quit') 488 return signal_success, signal_msg 489 490 def pause( 491 self, 492 timeout: Union[int, float, None] = None, 493 check_timeout_interval: Union[float, int, None] = None, 494 ) -> SuccessTuple: 495 """ 496 Pause the daemon if it is running. 497 498 Parameters 499 ---------- 500 timeout: Union[float, int, None], default None 501 The maximum number of seconds to wait for a process to suspend. 502 503 check_timeout_interval: Union[float, int, None], default None 504 The number of seconds to wait between checking if the process is still running. 505 506 Returns 507 ------- 508 A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended. 509 """ 510 if self.process is None: 511 return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused." 512 513 if self.status == 'paused': 514 return True, f"Daemon '{self.daemon_id}' is already paused." 
515 516 self._write_stop_file('pause') 517 try: 518 self.process.suspend() 519 except Exception as e: 520 return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}" 521 522 timeout = self.get_timeout_seconds(timeout) 523 check_timeout_interval = self.get_check_timeout_interval_seconds( 524 check_timeout_interval 525 ) 526 527 psutil = attempt_import('psutil') 528 529 if not timeout: 530 try: 531 success = self.process.status() == 'stopped' 532 except psutil.NoSuchProcess as e: 533 success = True 534 msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'." 535 if success: 536 self._capture_process_timestamp('paused') 537 return success, msg 538 539 begin = time.perf_counter() 540 while (time.perf_counter() - begin) < timeout: 541 try: 542 if self.process.status() == 'stopped': 543 self._capture_process_timestamp('paused') 544 return True, "Success" 545 except psutil.NoSuchProcess as e: 546 return False, f"Process exited unexpectedly. Was it killed?\n{e}" 547 time.sleep(check_timeout_interval) 548 549 return False, ( 550 f"Failed to pause daemon '{self.daemon_id}' within {timeout} second" 551 + ('s' if timeout != 1 else '') + '.' 552 ) 553 554 def resume( 555 self, 556 timeout: Union[int, float, None] = None, 557 check_timeout_interval: Union[float, int, None] = None, 558 ) -> SuccessTuple: 559 """ 560 Resume the daemon if it is paused. 561 562 Parameters 563 ---------- 564 timeout: Union[float, int, None], default None 565 The maximum number of seconds to wait for a process to resume. 566 567 check_timeout_interval: Union[float, int, None], default None 568 The number of seconds to wait between checking if the process is still stopped. 569 570 Returns 571 ------- 572 A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed. 573 """ 574 if self.status == 'running': 575 return True, f"Daemon '{self.daemon_id}' is already running." 
576 577 if self.status == 'stopped': 578 return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed." 579 580 self._remove_stop_file() 581 try: 582 self.process.resume() 583 except Exception as e: 584 return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}" 585 586 timeout = self.get_timeout_seconds(timeout) 587 check_timeout_interval = self.get_check_timeout_interval_seconds( 588 check_timeout_interval 589 ) 590 591 if not timeout: 592 success = self.status == 'running' 593 msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'." 594 if success: 595 self._capture_process_timestamp('began') 596 return success, msg 597 598 begin = time.perf_counter() 599 while (time.perf_counter() - begin) < timeout: 600 if self.status == 'running': 601 self._capture_process_timestamp('began') 602 return True, "Success" 603 time.sleep(check_timeout_interval) 604 605 return False, ( 606 f"Failed to resume daemon '{self.daemon_id}' within {timeout} second" 607 + ('s' if timeout != 1 else '') + '.' 608 ) 609 610 def _write_stop_file(self, action: str) -> SuccessTuple: 611 """Write the stop file timestamp and action.""" 612 if action not in ('quit', 'kill', 'pause'): 613 return False, f"Unsupported action '{action}'." 614 615 if not self.stop_path.parent.exists(): 616 self.stop_path.parent.mkdir(parents=True, exist_ok=True) 617 618 with open(self.stop_path, 'w+', encoding='utf-8') as f: 619 json.dump( 620 { 621 'stop_time': datetime.now(timezone.utc).isoformat(), 622 'action': action, 623 }, 624 f 625 ) 626 627 return True, "Success" 628 629 def _remove_stop_file(self) -> SuccessTuple: 630 """Remove the stop file""" 631 if not self.stop_path.exists(): 632 return True, "Stop file does not exist." 
633 634 try: 635 self.stop_path.unlink() 636 except Exception as e: 637 return False, f"Failed to remove stop file:\n{e}" 638 639 return True, "Success" 640 641 def _read_stop_file(self) -> Dict[str, Any]: 642 """ 643 Read the stop file if it exists. 644 """ 645 if not self.stop_path.exists(): 646 return {} 647 648 try: 649 with open(self.stop_path, 'r', encoding='utf-8') as f: 650 data = json.load(f) 651 return data 652 except Exception: 653 return {} 654 655 def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None: 656 """ 657 Handle `SIGTERM` within the `Daemon` context. 658 This method is injected into the `DaemonContext`. 659 """ 660 from meerschaum.utils.process import signal_handler 661 signal_handler(signal_number, stack_frame) 662 663 timer = self.__dict__.get('_log_refresh_timer', None) 664 if timer is not None: 665 timer.cancel() 666 667 daemon_context = self.__dict__.get('_daemon_context', None) 668 if daemon_context is not None: 669 daemon_context.close() 670 671 _close_pools() 672 raise SystemExit(0) 673 674 def _send_signal( 675 self, 676 signal_to_send, 677 timeout: Union[float, int, None] = None, 678 check_timeout_interval: Union[float, int, None] = None, 679 ) -> SuccessTuple: 680 """Send a signal to the daemon process. 681 682 Parameters 683 ---------- 684 signal_to_send: 685 The signal the send to the daemon, e.g. `signals.SIGINT`. 686 687 timeout: Union[float, int, None], default None 688 The maximum number of seconds to wait for a process to terminate. 689 690 check_timeout_interval: Union[float, int, None], default None 691 The number of seconds to wait between checking if the process is still running. 692 693 Returns 694 ------- 695 A SuccessTuple indicating success. 696 """ 697 try: 698 pid = self.pid 699 if pid is None: 700 return ( 701 False, 702 f"Daemon '{self.daemon_id}' is not running, " 703 + f"cannot send signal '{signal_to_send}'." 
704 ) 705 706 os.kill(pid, signal_to_send) 707 except Exception as e: 708 return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}" 709 710 timeout = self.get_timeout_seconds(timeout) 711 check_timeout_interval = self.get_check_timeout_interval_seconds( 712 check_timeout_interval 713 ) 714 715 if not timeout: 716 return True, f"Successfully sent '{signal}' to daemon '{self.daemon_id}'." 717 718 begin = time.perf_counter() 719 while (time.perf_counter() - begin) < timeout: 720 if not self.status == 'running': 721 return True, "Success" 722 time.sleep(check_timeout_interval) 723 724 return False, ( 725 f"Failed to stop daemon '{self.daemon_id}' within {timeout} second" 726 + ('s' if timeout != 1 else '') + '.' 727 ) 728 729 def mkdir_if_not_exists(self, allow_dirty_run: bool = False): 730 """Create the Daemon's directory. 731 If `allow_dirty_run` is `False` and the directory already exists, 732 raise a `FileExistsError`. 733 """ 734 try: 735 self.path.mkdir(parents=True, exist_ok=True) 736 _already_exists = any(os.scandir(self.path)) 737 except FileExistsError: 738 _already_exists = True 739 740 if _already_exists and not allow_dirty_run: 741 error( 742 f"Daemon '{self.daemon_id}' already exists. " + 743 f"To allow this daemon to run, do one of the following:\n" 744 + " - Execute `daemon.cleanup()`.\n" 745 + f" - Delete the directory '{self.path}'.\n" 746 + " - Pass `allow_dirty_run=True` to `daemon.run()`.\n", 747 FileExistsError, 748 ) 749 750 @property 751 def process(self) -> Union['psutil.Process', None]: 752 """ 753 Return the psutil process for the Daemon. 
754 """ 755 psutil = attempt_import('psutil') 756 pid = self.pid 757 if pid is None: 758 return None 759 if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid): 760 try: 761 self._process = psutil.Process(int(pid)) 762 except Exception as e: 763 if self.pid_path.exists(): 764 self.pid_path.unlink() 765 return None 766 return self._process 767 768 @property 769 def status(self) -> str: 770 """ 771 Return the running status of this Daemon. 772 """ 773 if self.process is None: 774 return 'stopped' 775 776 psutil = attempt_import('psutil') 777 try: 778 if self.process.status() == 'stopped': 779 return 'paused' 780 if self.process.status() == 'zombie': 781 raise psutil.NoSuchProcess(self.process.pid) 782 except (psutil.NoSuchProcess, AttributeError): 783 if self.pid_path.exists(): 784 try: 785 self.pid_path.unlink() 786 except Exception as e: 787 pass 788 return 'stopped' 789 790 return 'running' 791 792 @classmethod 793 def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path: 794 """ 795 Return a Daemon's path from its `daemon_id`. 796 """ 797 return DAEMON_RESOURCES_PATH / daemon_id 798 799 @property 800 def path(self) -> pathlib.Path: 801 """ 802 Return the path for this Daemon's directory. 803 """ 804 return self._get_path_from_daemon_id(self.daemon_id) 805 806 @classmethod 807 def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path: 808 """ 809 Return the `properties.json` path for a given `daemon_id`. 810 """ 811 return cls._get_path_from_daemon_id(daemon_id) / 'properties.json' 812 813 @property 814 def properties_path(self) -> pathlib.Path: 815 """ 816 Return the `propterties.json` path for this Daemon. 817 """ 818 return self._get_properties_path_from_daemon_id(self.daemon_id) 819 820 @property 821 def stop_path(self) -> pathlib.Path: 822 """ 823 Return the path for the stop file (created when manually stopped). 
824 """ 825 return self.path / '.stop.json' 826 827 @property 828 def log_path(self) -> pathlib.Path: 829 """ 830 Return the log path. 831 """ 832 return LOGS_RESOURCES_PATH / (self.daemon_id + '.log') 833 834 @property 835 def stdin_file_path(self) -> pathlib.Path: 836 """ 837 Return the stdin file path. 838 """ 839 return self.path / 'input.stdin' 840 841 @property 842 def blocking_stdin_file_path(self) -> pathlib.Path: 843 """ 844 Return the stdin file path. 845 """ 846 if '_blocking_stdin_file_path' in self.__dict__: 847 return self._blocking_stdin_file_path 848 849 return self.path / 'input.stdin.block' 850 851 @property 852 def log_offset_path(self) -> pathlib.Path: 853 """ 854 Return the log offset file path. 855 """ 856 return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset') 857 858 @property 859 def rotating_log(self) -> RotatingFile: 860 """ 861 The rotating log file for the daemon's output. 862 """ 863 if '_rotating_log' in self.__dict__: 864 return self._rotating_log 865 866 write_timestamps = ( 867 self.properties.get('logs', {}).get('write_timestamps', None) 868 ) 869 if write_timestamps is None: 870 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 871 872 self._rotating_log = RotatingFile( 873 self.log_path, 874 redirect_streams=True, 875 write_timestamps=write_timestamps, 876 timestamp_format=get_config('jobs', 'logs', 'timestamps', 'format'), 877 ) 878 return self._rotating_log 879 880 @property 881 def stdin_file(self): 882 """ 883 Return the file handler for the stdin file. 884 """ 885 if '_stdin_file' in self.__dict__: 886 return self._stdin_file 887 888 self._stdin_file = StdinFile( 889 self.stdin_file_path, 890 lock_file_path=self.blocking_stdin_file_path, 891 ) 892 return self._stdin_file 893 894 @property 895 def log_text(self) -> Optional[str]: 896 """ 897 Read the log files and return their contents. 898 Returns `None` if the log file does not exist. 
899 """ 900 new_rotating_log = RotatingFile( 901 self.rotating_log.file_path, 902 num_files_to_keep = self.rotating_log.num_files_to_keep, 903 max_file_size = self.rotating_log.max_file_size, 904 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled'), 905 timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format'), 906 ) 907 return new_rotating_log.read() 908 909 def readlines(self) -> List[str]: 910 """ 911 Read the next log lines, persisting the cursor for later use. 912 Note this will alter the cursor of `self.rotating_log`. 913 """ 914 self.rotating_log._cursor = self._read_log_offset() 915 lines = self.rotating_log.readlines() 916 self._write_log_offset() 917 return lines 918 919 def _read_log_offset(self) -> Tuple[int, int]: 920 """ 921 Return the current log offset cursor. 922 923 Returns 924 ------- 925 A tuple of the form (`subfile_index`, `position`). 926 """ 927 if not self.log_offset_path.exists(): 928 return 0, 0 929 930 with open(self.log_offset_path, 'r', encoding='utf-8') as f: 931 cursor_text = f.read() 932 cursor_parts = cursor_text.split(' ') 933 subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1]) 934 return subfile_index, subfile_position 935 936 def _write_log_offset(self) -> None: 937 """ 938 Write the current log offset file. 939 """ 940 with open(self.log_offset_path, 'w+', encoding='utf-8') as f: 941 subfile_index = self.rotating_log._cursor[0] 942 subfile_position = self.rotating_log._cursor[1] 943 f.write(f"{subfile_index} {subfile_position}") 944 945 @property 946 def pid(self) -> Union[int, None]: 947 """ 948 Read the PID file and return its contents. 949 Returns `None` if the PID file does not exist. 
950 """ 951 if not self.pid_path.exists(): 952 return None 953 try: 954 with open(self.pid_path, 'r', encoding='utf-8') as f: 955 text = f.read() 956 if len(text) == 0: 957 return None 958 pid = int(text.rstrip()) 959 except Exception as e: 960 warn(e) 961 text = None 962 pid = None 963 return pid 964 965 @property 966 def pid_path(self) -> pathlib.Path: 967 """ 968 Return the path to a file containing the PID for this Daemon. 969 """ 970 return self.path / 'process.pid' 971 972 @property 973 def pid_lock(self) -> 'fasteners.InterProcessLock': 974 """ 975 Return the process lock context manager. 976 """ 977 if '_pid_lock' in self.__dict__: 978 return self._pid_lock 979 980 fasteners = attempt_import('fasteners') 981 self._pid_lock = fasteners.InterProcessLock(self.pid_path) 982 return self._pid_lock 983 984 @property 985 def pickle_path(self) -> pathlib.Path: 986 """ 987 Return the path for the pickle file. 988 """ 989 return self.path / 'pickle.pkl' 990 991 def read_properties(self) -> Optional[Dict[str, Any]]: 992 """Read the properties JSON file and return the dictionary.""" 993 if not self.properties_path.exists(): 994 return None 995 try: 996 with open(self.properties_path, 'r', encoding='utf-8') as file: 997 properties = json.load(file) 998 except Exception as e: 999 properties = {} 1000 1001 return properties 1002 1003 def read_pickle(self) -> Daemon: 1004 """Read a Daemon's pickle file and return the `Daemon`.""" 1005 import pickle, traceback 1006 if not self.pickle_path.exists(): 1007 error(f"Pickle file does not exist for daemon '{self.daemon_id}'.") 1008 1009 if self.pickle_path.stat().st_size == 0: 1010 error(f"Pickle was empty for daemon '{self.daemon_id}'.") 1011 1012 try: 1013 with open(self.pickle_path, 'rb') as pickle_file: 1014 daemon = pickle.load(pickle_file) 1015 success, msg = True, 'Success' 1016 except Exception as e: 1017 success, msg = False, str(e) 1018 daemon = None 1019 traceback.print_exception(type(e), e, e.__traceback__) 1020 if not 
success: 1021 error(msg) 1022 return daemon 1023 1024 @property 1025 def properties(self) -> Dict[str, Any]: 1026 """ 1027 Return the contents of the properties JSON file. 1028 """ 1029 try: 1030 _file_properties = self.read_properties() 1031 except Exception: 1032 traceback.print_exc() 1033 _file_properties = {} 1034 1035 if not self._properties: 1036 self._properties = _file_properties 1037 1038 if self._properties is None: 1039 self._properties = {} 1040 1041 if _file_properties is not None: 1042 self._properties = apply_patch_to_config( 1043 _file_properties, 1044 self._properties, 1045 ) 1046 1047 return self._properties 1048 1049 @property 1050 def hidden(self) -> bool: 1051 """ 1052 Return a bool indicating whether this Daemon should be displayed. 1053 """ 1054 return self.daemon_id.startswith('_') or self.daemon_id.startswith('.') 1055 1056 def write_properties(self) -> SuccessTuple: 1057 """Write the properties dictionary to the properties JSON file 1058 (only if self.properties exists). 1059 """ 1060 success, msg = ( 1061 False, 1062 f"No properties to write for daemon '{self.daemon_id}'." 
1063 ) 1064 if self.properties is not None: 1065 try: 1066 self.path.mkdir(parents=True, exist_ok=True) 1067 with open(self.properties_path, 'w+', encoding='utf-8') as properties_file: 1068 json.dump(self.properties, properties_file) 1069 success, msg = True, 'Success' 1070 except Exception as e: 1071 success, msg = False, str(e) 1072 return success, msg 1073 1074 def write_pickle(self) -> SuccessTuple: 1075 """Write the pickle file for the daemon.""" 1076 import pickle, traceback 1077 try: 1078 self.path.mkdir(parents=True, exist_ok=True) 1079 with open(self.pickle_path, 'wb+') as pickle_file: 1080 pickle.dump(self, pickle_file) 1081 success, msg = True, "Success" 1082 except Exception as e: 1083 success, msg = False, str(e) 1084 traceback.print_exception(type(e), e, e.__traceback__) 1085 return success, msg 1086 1087 1088 def _setup( 1089 self, 1090 allow_dirty_run: bool = False, 1091 ) -> None: 1092 """ 1093 Update properties before starting the Daemon. 1094 """ 1095 if self.properties is None: 1096 self._properties = {} 1097 1098 self._properties.update({ 1099 'target': { 1100 'name': self.target.__name__, 1101 'module': self.target.__module__, 1102 'args': self.target_args, 1103 'kw': self.target_kw, 1104 }, 1105 }) 1106 self.mkdir_if_not_exists(allow_dirty_run) 1107 _write_properties_success_tuple = self.write_properties() 1108 if not _write_properties_success_tuple[0]: 1109 error(_write_properties_success_tuple[1]) 1110 1111 _write_pickle_success_tuple = self.write_pickle() 1112 if not _write_pickle_success_tuple[0]: 1113 error(_write_pickle_success_tuple[1]) 1114 1115 def cleanup(self, keep_logs: bool = False) -> SuccessTuple: 1116 """ 1117 Remove a daemon's directory after execution. 1118 1119 Parameters 1120 ---------- 1121 keep_logs: bool, default False 1122 If `True`, skip deleting the daemon's log files. 1123 1124 Returns 1125 ------- 1126 A `SuccessTuple` indicating success. 
1127 """ 1128 if self.path.exists(): 1129 try: 1130 shutil.rmtree(self.path) 1131 except Exception as e: 1132 msg = f"Failed to clean up '{self.daemon_id}':\n{e}" 1133 warn(msg) 1134 return False, msg 1135 if not keep_logs: 1136 self.rotating_log.delete() 1137 try: 1138 if self.log_offset_path.exists(): 1139 self.log_offset_path.unlink() 1140 except Exception as e: 1141 msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}" 1142 warn(msg) 1143 return False, msg 1144 return True, "Success" 1145 1146 1147 def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]: 1148 """ 1149 Return the timeout value to use. Use `--timeout-seconds` if provided, 1150 else the configured default (8). 1151 """ 1152 if isinstance(timeout, (int, float)): 1153 return timeout 1154 return get_config('jobs', 'timeout_seconds') 1155 1156 1157 def get_check_timeout_interval_seconds( 1158 self, 1159 check_timeout_interval: Union[int, float, None] = None, 1160 ) -> Union[int, float]: 1161 """ 1162 Return the interval value to check the status of timeouts. 1163 """ 1164 if isinstance(check_timeout_interval, (int, float)): 1165 return check_timeout_interval 1166 return get_config('jobs', 'check_timeout_interval_seconds') 1167 1168 @property 1169 def target_args(self) -> Union[Tuple[Any], None]: 1170 """ 1171 Return the positional arguments to pass to the target function. 1172 """ 1173 target_args = ( 1174 self.__dict__.get('_target_args', None) 1175 or self.properties.get('target', {}).get('args', None) 1176 ) 1177 if target_args is None: 1178 return tuple([]) 1179 1180 return tuple(target_args) 1181 1182 @property 1183 def target_kw(self) -> Union[Dict[str, Any], None]: 1184 """ 1185 Return the keyword arguments to pass to the target function. 
1186 """ 1187 target_kw = ( 1188 self.__dict__.get('_target_kw', None) 1189 or self.properties.get('target', {}).get('kw', None) 1190 ) 1191 if target_kw is None: 1192 return {} 1193 1194 return {key: val for key, val in target_kw.items()} 1195 1196 def __getstate__(self): 1197 """ 1198 Pickle this Daemon. 1199 """ 1200 dill = attempt_import('dill') 1201 return { 1202 'target': dill.dumps(self.target), 1203 'target_args': self.target_args, 1204 'target_kw': self.target_kw, 1205 'daemon_id': self.daemon_id, 1206 'label': self.label, 1207 'properties': self.properties, 1208 } 1209 1210 def __setstate__(self, _state: Dict[str, Any]): 1211 """ 1212 Restore this Daemon from a pickled state. 1213 If the properties file exists, skip the old pickled version. 1214 """ 1215 dill = attempt_import('dill') 1216 _state['target'] = dill.loads(_state['target']) 1217 self._pickle = True 1218 daemon_id = _state.get('daemon_id', None) 1219 if not daemon_id: 1220 raise ValueError("Need a daemon_id to un-pickle a Daemon.") 1221 1222 properties_path = self._get_properties_path_from_daemon_id(daemon_id) 1223 ignore_properties = properties_path.exists() 1224 if ignore_properties: 1225 _state = { 1226 key: val 1227 for key, val in _state.items() 1228 if key != 'properties' 1229 } 1230 self.__init__(**_state) 1231 1232 1233 def __repr__(self): 1234 return str(self) 1235 1236 def __str__(self): 1237 return self.daemon_id 1238 1239 def __eq__(self, other): 1240 if not isinstance(other, Daemon): 1241 return False 1242 return self.daemon_id == other.daemon_id 1243 1244 def __hash__(self): 1245 return hash(self.daemon_id)
Daemonize Python functions into background processes.
Examples
>>> import meerschaum as mrsm
>>> from meerschaum.utils.daemons import Daemon
>>> daemon = Daemon(print, ('hi',))
>>> success, msg = daemon.run()
>>> print(daemon.log_text)
2024-07-29 18:03 | hi 2024-07-29 18:03 |
>>> daemon.run(allow_dirty_run=True)
>>> print(daemon.log_text)
2024-07-29 18:03 | hi 2024-07-29 18:03 | 2024-07-29 18:05 | hi 2024-07-29 18:05 |
>>> mrsm.pprint(daemon.properties)
{
'label': 'print',
'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
'result': None,
'process': {'ended': '2024-07-29T18:03:33.752806'}
}
def __init__(
        self,
        target: Optional[Callable[[Any], Any]] = None,
        target_args: Union[List[Any], Tuple[Any], None] = None,
        target_kw: Optional[Dict[str, Any]] = None,
        env: Optional[Dict[str, str]] = None,
        daemon_id: Optional[str] = None,
        label: Optional[str] = None,
        properties: Optional[Dict[str, Any]] = None,
    ):
        """
        Parameters
        ----------
        target: Optional[Callable[[Any], Any]], default None,
            The function to execute in a child process.

        target_args: Union[List[Any], Tuple[Any], None], default None
            Positional arguments to pass to the target function.

        target_kw: Optional[Dict[str, Any]], default None
            Keyword arguments to pass to the target function.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the daemon process.

        daemon_id: Optional[str], default None
            Build a `Daemon` from an existing `daemon_id`.
            If `daemon_id` is provided, other arguments are ignored and are derived
            from the existing pickled `Daemon`.

        label: Optional[str], default None
            Label string to help identify a daemon.
            If `None`, use the function name instead.

        properties: Optional[Dict[str, Any]], default None
            Override reading from the properties JSON by providing an existing dictionary.
        """
        # NOTE(review): `_pickle` is read but not used below; __setstate__ sets it
        # before delegating here. Kept as-is.
        _pickle = self.__dict__.get('_pickle', False)
        if daemon_id is not None:
            self.daemon_id = daemon_id
            # No pickle and no target: try to recover the daemon from its
            # properties file (e.g. after the pickle was lost or corrupted).
            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):

                if not self.properties_path.exists():
                    raise Exception(
                        f"Daemon '{self.daemon_id}' does not exist. "
                        + "Pass a target to create a new Daemon."
                    )

                try:
                    new_daemon = self.from_properties_file(daemon_id)
                except Exception:
                    new_daemon = None

                if new_daemon is not None:
                    # Recovery succeeded: re-persist the pickle and adopt its state.
                    new_daemon.write_pickle()
                    target = new_daemon.target
                    target_args = new_daemon.target_args
                    target_kw = new_daemon.target_kw
                    label = new_daemon.label
                    self._properties = new_daemon.properties
                else:
                    # Recovery failed: remove the unusable properties file.
                    try:
                        self.properties_path.unlink()
                    except Exception:
                        pass

                    raise Exception(
                        f"Could not recover daemon '{self.daemon_id}' "
                        + "from its properties file."
                    )

        if 'target' not in self.__dict__:
            if target is None:
                error("Cannot create a Daemon without a target.")
            self.target = target

        ### NOTE: We have to check self.__dict__ in case we un-pickling.
        if '_target_args' not in self.__dict__:
            self._target_args = target_args
        if '_target_kw' not in self.__dict__:
            self._target_kw = target_kw

        if 'label' not in self.__dict__:
            if label is None:
                label = (
                    self.target.__name__ if '__name__' in self.target.__dir__()
                    else str(self.target)
                )
            self.label = label
        if 'daemon_id' not in self.__dict__:
            self.daemon_id = get_new_daemon_name()
        if '_properties' not in self.__dict__:
            self._properties = properties
        if self._properties is None:
            self._properties = {}

        self._properties.update({'label': self.label})
        if env:
            self._properties.update({'env': env})

        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
        _ = self.process
Parameters
- target (Optional[Callable[[Any], Any]], default None): The function to execute in a child process.
- target_args (Union[List[Any], Tuple[Any], None], default None): Positional arguments to pass to the target function.
- target_kw (Optional[Dict[str, Any]], default None): Keyword arguments to pass to the target function.
- env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the daemon process.
- daemon_id (Optional[str], default None):
Build a
Daemon
from an existingdaemon_id
. Ifdaemon_id
is provided, other arguments are ignored and are derived from the existing pickledDaemon
. - label (Optional[str], default None):
Label string to help identify a daemon.
If
None
, use the function name instead. - properties (Optional[Dict[str, Any]], default None): Override reading from the properties JSON by providing an existing dictionary.
93 @classmethod 94 def from_properties_file(cls, daemon_id: str) -> Daemon: 95 """ 96 Return a Daemon from a properties dictionary. 97 """ 98 properties_path = cls._get_properties_path_from_daemon_id(daemon_id) 99 if not properties_path.exists(): 100 raise OSError(f"Properties file '{properties_path}' does not exist.") 101 102 try: 103 with open(properties_path, 'r', encoding='utf-8') as f: 104 properties = json.load(f) 105 except Exception: 106 properties = {} 107 108 if not properties: 109 raise ValueError(f"No properties could be read for daemon '{daemon_id}'.") 110 111 daemon_id = properties_path.parent.name 112 target_cf = properties.get('target', {}) 113 target_module_name = target_cf.get('module', None) 114 target_function_name = target_cf.get('name', None) 115 target_args = target_cf.get('args', None) 116 target_kw = target_cf.get('kw', None) 117 label = properties.get('label', None) 118 119 if None in [ 120 target_module_name, 121 target_function_name, 122 target_args, 123 target_kw, 124 ]: 125 raise ValueError("Missing target function information.") 126 127 target_module = importlib.import_module(target_module_name) 128 target_function = getattr(target_module, target_function_name) 129 130 return Daemon( 131 daemon_id=daemon_id, 132 target=target_function, 133 target_args=target_args, 134 target_kw=target_kw, 135 properties=properties, 136 label=label, 137 )
Return a Daemon from a properties dictionary.
383 def run( 384 self, 385 keep_daemon_output: bool = True, 386 allow_dirty_run: bool = False, 387 debug: bool = False, 388 ) -> SuccessTuple: 389 """Run the daemon as a child process and continue executing the parent. 390 391 Parameters 392 ---------- 393 keep_daemon_output: bool, default True 394 If `False`, delete the daemon's output directory upon exiting. 395 396 allow_dirty_run: bool, default False 397 If `True`, run the daemon, even if the `daemon_id` directory exists. 398 This option is dangerous because if the same `daemon_id` runs concurrently, 399 the last to finish will overwrite the output of the first. 400 401 Returns 402 ------- 403 A SuccessTuple indicating success. 404 405 """ 406 import platform 407 if platform.system() == 'Windows': 408 return False, "Cannot run background jobs on Windows." 409 410 ### The daemon might exist and be paused. 411 if self.status == 'paused': 412 return self.resume() 413 414 self._remove_stop_file() 415 if self.status == 'running': 416 return True, f"Daemon '{self}' is already running." 417 418 self.mkdir_if_not_exists(allow_dirty_run) 419 _write_pickle_success_tuple = self.write_pickle() 420 if not _write_pickle_success_tuple[0]: 421 return _write_pickle_success_tuple 422 423 _launch_daemon_code = ( 424 "from meerschaum.utils.daemon import Daemon; " 425 + f"daemon = Daemon(daemon_id='{self.daemon_id}'); " 426 + f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, " 427 + "allow_dirty_run=True)" 428 ) 429 env = dict(os.environ) 430 env['MRSM_NOASK'] = 'true' 431 _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env) 432 msg = ( 433 "Success" 434 if _launch_success_bool 435 else f"Failed to start daemon '{self.daemon_id}'." 436 ) 437 return _launch_success_bool, msg
Run the daemon as a child process and continue executing the parent.
Parameters
- keep_daemon_output (bool, default True):
If
False
, delete the daemon's output directory upon exiting. - allow_dirty_run (bool, default False):
If
True
, run the daemon, even if thedaemon_id
directory exists. This option is dangerous because if the samedaemon_id
runs concurrently, the last to finish will overwrite the output of the first.
Returns
- A SuccessTuple indicating success.
439 def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple: 440 """ 441 Forcibly terminate a running daemon. 442 Sends a SIGTERM signal to the process. 443 444 Parameters 445 ---------- 446 timeout: Optional[int], default 3 447 How many seconds to wait for the process to terminate. 448 449 Returns 450 ------- 451 A SuccessTuple indicating success. 452 """ 453 if self.status != 'paused': 454 success, msg = self._send_signal(signal.SIGTERM, timeout=timeout) 455 if success: 456 self._write_stop_file('kill') 457 return success, msg 458 459 if self.status == 'stopped': 460 self._write_stop_file('kill') 461 return True, "Process has already stopped." 462 463 process = self.process 464 try: 465 process.terminate() 466 process.kill() 467 process.wait(timeout=timeout) 468 except Exception as e: 469 return False, f"Failed to kill job {self} with exception: {e}" 470 471 if self.pid_path.exists(): 472 try: 473 self.pid_path.unlink() 474 except Exception as e: 475 pass 476 477 self._write_stop_file('kill') 478 return True, "Success"
Forcibly terminate a running daemon. Sends a SIGTERM signal to the process.
Parameters
- timeout (Optional[int], default 3): How many seconds to wait for the process to terminate.
Returns
- A SuccessTuple indicating success.
480 def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple: 481 """Gracefully quit a running daemon.""" 482 if self.status == 'paused': 483 return self.kill(timeout) 484 485 signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout) 486 if signal_success: 487 self._write_stop_file('quit') 488 return signal_success, signal_msg
Gracefully quit a running daemon.
def pause(
        self,
        timeout: Union[int, float, None] = None,
        check_timeout_interval: Union[float, int, None] = None,
    ) -> SuccessTuple:
        """
        Pause the daemon if it is running.

        Parameters
        ----------
        timeout: Union[float, int, None], default None
            The maximum number of seconds to wait for a process to suspend.

        check_timeout_interval: Union[float, int, None], default None
            The number of seconds to wait between checking if the process is still running.

        Returns
        -------
        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
        """
        if self.process is None:
            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."

        if self.status == 'paused':
            return True, f"Daemon '{self.daemon_id}' is already paused."

        # Record the stop intent first so the child knows it was paused deliberately.
        self._write_stop_file('pause')
        try:
            self.process.suspend()
        except Exception as e:
            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"

        timeout = self.get_timeout_seconds(timeout)
        check_timeout_interval = self.get_check_timeout_interval_seconds(
            check_timeout_interval
        )

        psutil = attempt_import('psutil')

        # With no timeout, check the status exactly once.
        if not timeout:
            try:
                success = self.process.status() == 'stopped'
            except psutil.NoSuchProcess as e:
                success = True
            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
            if success:
                self._capture_process_timestamp('paused')
            return success, msg

        # Poll until the process reports 'stopped' or the timeout elapses.
        begin = time.perf_counter()
        while (time.perf_counter() - begin) < timeout:
            try:
                if self.process.status() == 'stopped':
                    self._capture_process_timestamp('paused')
                    return True, "Success"
            except psutil.NoSuchProcess as e:
                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
            time.sleep(check_timeout_interval)

        return False, (
            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
            + ('s' if timeout != 1 else '') + '.'
        )
Pause the daemon if it is running.
Parameters
- timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to suspend.
- check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still running.
Returns
- A
SuccessTuple
indicating whether theDaemon
process was successfully suspended.
554 def resume( 555 self, 556 timeout: Union[int, float, None] = None, 557 check_timeout_interval: Union[float, int, None] = None, 558 ) -> SuccessTuple: 559 """ 560 Resume the daemon if it is paused. 561 562 Parameters 563 ---------- 564 timeout: Union[float, int, None], default None 565 The maximum number of seconds to wait for a process to resume. 566 567 check_timeout_interval: Union[float, int, None], default None 568 The number of seconds to wait between checking if the process is still stopped. 569 570 Returns 571 ------- 572 A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed. 573 """ 574 if self.status == 'running': 575 return True, f"Daemon '{self.daemon_id}' is already running." 576 577 if self.status == 'stopped': 578 return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed." 579 580 self._remove_stop_file() 581 try: 582 self.process.resume() 583 except Exception as e: 584 return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}" 585 586 timeout = self.get_timeout_seconds(timeout) 587 check_timeout_interval = self.get_check_timeout_interval_seconds( 588 check_timeout_interval 589 ) 590 591 if not timeout: 592 success = self.status == 'running' 593 msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'." 594 if success: 595 self._capture_process_timestamp('began') 596 return success, msg 597 598 begin = time.perf_counter() 599 while (time.perf_counter() - begin) < timeout: 600 if self.status == 'running': 601 self._capture_process_timestamp('began') 602 return True, "Success" 603 time.sleep(check_timeout_interval) 604 605 return False, ( 606 f"Failed to resume daemon '{self.daemon_id}' within {timeout} second" 607 + ('s' if timeout != 1 else '') + '.' 608 )
Resume the daemon if it is paused.
Parameters
- timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to resume.
- check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still stopped.
Returns
- A
SuccessTuple
indicating whether theDaemon
process was successfully resumed.
729 def mkdir_if_not_exists(self, allow_dirty_run: bool = False): 730 """Create the Daemon's directory. 731 If `allow_dirty_run` is `False` and the directory already exists, 732 raise a `FileExistsError`. 733 """ 734 try: 735 self.path.mkdir(parents=True, exist_ok=True) 736 _already_exists = any(os.scandir(self.path)) 737 except FileExistsError: 738 _already_exists = True 739 740 if _already_exists and not allow_dirty_run: 741 error( 742 f"Daemon '{self.daemon_id}' already exists. " + 743 f"To allow this daemon to run, do one of the following:\n" 744 + " - Execute `daemon.cleanup()`.\n" 745 + f" - Delete the directory '{self.path}'.\n" 746 + " - Pass `allow_dirty_run=True` to `daemon.run()`.\n", 747 FileExistsError, 748 )
Create the Daemon's directory.
If allow_dirty_run
is False
and the directory already exists,
raise a FileExistsError
.
750 @property 751 def process(self) -> Union['psutil.Process', None]: 752 """ 753 Return the psutil process for the Daemon. 754 """ 755 psutil = attempt_import('psutil') 756 pid = self.pid 757 if pid is None: 758 return None 759 if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid): 760 try: 761 self._process = psutil.Process(int(pid)) 762 except Exception as e: 763 if self.pid_path.exists(): 764 self.pid_path.unlink() 765 return None 766 return self._process
Return the psutil process for the Daemon.
768 @property 769 def status(self) -> str: 770 """ 771 Return the running status of this Daemon. 772 """ 773 if self.process is None: 774 return 'stopped' 775 776 psutil = attempt_import('psutil') 777 try: 778 if self.process.status() == 'stopped': 779 return 'paused' 780 if self.process.status() == 'zombie': 781 raise psutil.NoSuchProcess(self.process.pid) 782 except (psutil.NoSuchProcess, AttributeError): 783 if self.pid_path.exists(): 784 try: 785 self.pid_path.unlink() 786 except Exception as e: 787 pass 788 return 'stopped' 789 790 return 'running'
Return the running status of this Daemon.
799 @property 800 def path(self) -> pathlib.Path: 801 """ 802 Return the path for this Daemon's directory. 803 """ 804 return self._get_path_from_daemon_id(self.daemon_id)
Return the path for this Daemon's directory.
813 @property 814 def properties_path(self) -> pathlib.Path: 815 """ 816 Return the `propterties.json` path for this Daemon. 817 """ 818 return self._get_properties_path_from_daemon_id(self.daemon_id)
Return the properties.json
path for this Daemon.
820 @property 821 def stop_path(self) -> pathlib.Path: 822 """ 823 Return the path for the stop file (created when manually stopped). 824 """ 825 return self.path / '.stop.json'
Return the path for the stop file (created when manually stopped).
827 @property 828 def log_path(self) -> pathlib.Path: 829 """ 830 Return the log path. 831 """ 832 return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')
Return the log path.
834 @property 835 def stdin_file_path(self) -> pathlib.Path: 836 """ 837 Return the stdin file path. 838 """ 839 return self.path / 'input.stdin'
Return the stdin file path.
841 @property 842 def blocking_stdin_file_path(self) -> pathlib.Path: 843 """ 844 Return the stdin file path. 845 """ 846 if '_blocking_stdin_file_path' in self.__dict__: 847 return self._blocking_stdin_file_path 848 849 return self.path / 'input.stdin.block'
Return the stdin file path.
851 @property 852 def log_offset_path(self) -> pathlib.Path: 853 """ 854 Return the log offset file path. 855 """ 856 return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')
Return the log offset file path.
858 @property 859 def rotating_log(self) -> RotatingFile: 860 """ 861 The rotating log file for the daemon's output. 862 """ 863 if '_rotating_log' in self.__dict__: 864 return self._rotating_log 865 866 write_timestamps = ( 867 self.properties.get('logs', {}).get('write_timestamps', None) 868 ) 869 if write_timestamps is None: 870 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 871 872 self._rotating_log = RotatingFile( 873 self.log_path, 874 redirect_streams=True, 875 write_timestamps=write_timestamps, 876 timestamp_format=get_config('jobs', 'logs', 'timestamps', 'format'), 877 ) 878 return self._rotating_log
The rotating log file for the daemon's output.
880 @property 881 def stdin_file(self): 882 """ 883 Return the file handler for the stdin file. 884 """ 885 if '_stdin_file' in self.__dict__: 886 return self._stdin_file 887 888 self._stdin_file = StdinFile( 889 self.stdin_file_path, 890 lock_file_path=self.blocking_stdin_file_path, 891 ) 892 return self._stdin_file
Return the file handler for the stdin file.
894 @property 895 def log_text(self) -> Optional[str]: 896 """ 897 Read the log files and return their contents. 898 Returns `None` if the log file does not exist. 899 """ 900 new_rotating_log = RotatingFile( 901 self.rotating_log.file_path, 902 num_files_to_keep = self.rotating_log.num_files_to_keep, 903 max_file_size = self.rotating_log.max_file_size, 904 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled'), 905 timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format'), 906 ) 907 return new_rotating_log.read()
Read the log files and return their contents.
Returns None
if the log file does not exist.
909 def readlines(self) -> List[str]: 910 """ 911 Read the next log lines, persisting the cursor for later use. 912 Note this will alter the cursor of `self.rotating_log`. 913 """ 914 self.rotating_log._cursor = self._read_log_offset() 915 lines = self.rotating_log.readlines() 916 self._write_log_offset() 917 return lines
Read the next log lines, persisting the cursor for later use.
Note this will alter the cursor of self.rotating_log
.
945 @property 946 def pid(self) -> Union[int, None]: 947 """ 948 Read the PID file and return its contents. 949 Returns `None` if the PID file does not exist. 950 """ 951 if not self.pid_path.exists(): 952 return None 953 try: 954 with open(self.pid_path, 'r', encoding='utf-8') as f: 955 text = f.read() 956 if len(text) == 0: 957 return None 958 pid = int(text.rstrip()) 959 except Exception as e: 960 warn(e) 961 text = None 962 pid = None 963 return pid
Read the PID file and return its contents.
Returns None
if the PID file does not exist.
965 @property 966 def pid_path(self) -> pathlib.Path: 967 """ 968 Return the path to a file containing the PID for this Daemon. 969 """ 970 return self.path / 'process.pid'
Return the path to a file containing the PID for this Daemon.
972 @property 973 def pid_lock(self) -> 'fasteners.InterProcessLock': 974 """ 975 Return the process lock context manager. 976 """ 977 if '_pid_lock' in self.__dict__: 978 return self._pid_lock 979 980 fasteners = attempt_import('fasteners') 981 self._pid_lock = fasteners.InterProcessLock(self.pid_path) 982 return self._pid_lock
Return the process lock context manager.
984 @property 985 def pickle_path(self) -> pathlib.Path: 986 """ 987 Return the path for the pickle file. 988 """ 989 return self.path / 'pickle.pkl'
Return the path for the pickle file.
991 def read_properties(self) -> Optional[Dict[str, Any]]: 992 """Read the properties JSON file and return the dictionary.""" 993 if not self.properties_path.exists(): 994 return None 995 try: 996 with open(self.properties_path, 'r', encoding='utf-8') as file: 997 properties = json.load(file) 998 except Exception as e: 999 properties = {} 1000 1001 return properties
Read the properties JSON file and return the dictionary.
1003 def read_pickle(self) -> Daemon: 1004 """Read a Daemon's pickle file and return the `Daemon`.""" 1005 import pickle, traceback 1006 if not self.pickle_path.exists(): 1007 error(f"Pickle file does not exist for daemon '{self.daemon_id}'.") 1008 1009 if self.pickle_path.stat().st_size == 0: 1010 error(f"Pickle was empty for daemon '{self.daemon_id}'.") 1011 1012 try: 1013 with open(self.pickle_path, 'rb') as pickle_file: 1014 daemon = pickle.load(pickle_file) 1015 success, msg = True, 'Success' 1016 except Exception as e: 1017 success, msg = False, str(e) 1018 daemon = None 1019 traceback.print_exception(type(e), e, e.__traceback__) 1020 if not success: 1021 error(msg) 1022 return daemon
Read a Daemon's pickle file and return the Daemon
.
1024 @property 1025 def properties(self) -> Dict[str, Any]: 1026 """ 1027 Return the contents of the properties JSON file. 1028 """ 1029 try: 1030 _file_properties = self.read_properties() 1031 except Exception: 1032 traceback.print_exc() 1033 _file_properties = {} 1034 1035 if not self._properties: 1036 self._properties = _file_properties 1037 1038 if self._properties is None: 1039 self._properties = {} 1040 1041 if _file_properties is not None: 1042 self._properties = apply_patch_to_config( 1043 _file_properties, 1044 self._properties, 1045 ) 1046 1047 return self._properties
Return the contents of the properties JSON file.
1056 def write_properties(self) -> SuccessTuple: 1057 """Write the properties dictionary to the properties JSON file 1058 (only if self.properties exists). 1059 """ 1060 success, msg = ( 1061 False, 1062 f"No properties to write for daemon '{self.daemon_id}'." 1063 ) 1064 if self.properties is not None: 1065 try: 1066 self.path.mkdir(parents=True, exist_ok=True) 1067 with open(self.properties_path, 'w+', encoding='utf-8') as properties_file: 1068 json.dump(self.properties, properties_file) 1069 success, msg = True, 'Success' 1070 except Exception as e: 1071 success, msg = False, str(e) 1072 return success, msg
Write the properties dictionary to the properties JSON file (only if self.properties exists).
1074 def write_pickle(self) -> SuccessTuple: 1075 """Write the pickle file for the daemon.""" 1076 import pickle, traceback 1077 try: 1078 self.path.mkdir(parents=True, exist_ok=True) 1079 with open(self.pickle_path, 'wb+') as pickle_file: 1080 pickle.dump(self, pickle_file) 1081 success, msg = True, "Success" 1082 except Exception as e: 1083 success, msg = False, str(e) 1084 traceback.print_exception(type(e), e, e.__traceback__) 1085 return success, msg
Write the pickle file for the daemon.
1115 def cleanup(self, keep_logs: bool = False) -> SuccessTuple: 1116 """ 1117 Remove a daemon's directory after execution. 1118 1119 Parameters 1120 ---------- 1121 keep_logs: bool, default False 1122 If `True`, skip deleting the daemon's log files. 1123 1124 Returns 1125 ------- 1126 A `SuccessTuple` indicating success. 1127 """ 1128 if self.path.exists(): 1129 try: 1130 shutil.rmtree(self.path) 1131 except Exception as e: 1132 msg = f"Failed to clean up '{self.daemon_id}':\n{e}" 1133 warn(msg) 1134 return False, msg 1135 if not keep_logs: 1136 self.rotating_log.delete() 1137 try: 1138 if self.log_offset_path.exists(): 1139 self.log_offset_path.unlink() 1140 except Exception as e: 1141 msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}" 1142 warn(msg) 1143 return False, msg 1144 return True, "Success"
Remove a daemon's directory after execution.
Parameters
- keep_logs (bool, default False):
If
True
, skip deleting the daemon's log files.
Returns
- A
SuccessTuple
indicating success.
1147 def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]: 1148 """ 1149 Return the timeout value to use. Use `--timeout-seconds` if provided, 1150 else the configured default (8). 1151 """ 1152 if isinstance(timeout, (int, float)): 1153 return timeout 1154 return get_config('jobs', 'timeout_seconds')
Return the timeout value to use. Use --timeout-seconds
if provided,
else the configured default (8).
1157 def get_check_timeout_interval_seconds( 1158 self, 1159 check_timeout_interval: Union[int, float, None] = None, 1160 ) -> Union[int, float]: 1161 """ 1162 Return the interval value to check the status of timeouts. 1163 """ 1164 if isinstance(check_timeout_interval, (int, float)): 1165 return check_timeout_interval 1166 return get_config('jobs', 'check_timeout_interval_seconds')
Return the interval value to check the status of timeouts.
1168 @property 1169 def target_args(self) -> Union[Tuple[Any], None]: 1170 """ 1171 Return the positional arguments to pass to the target function. 1172 """ 1173 target_args = ( 1174 self.__dict__.get('_target_args', None) 1175 or self.properties.get('target', {}).get('args', None) 1176 ) 1177 if target_args is None: 1178 return tuple([]) 1179 1180 return tuple(target_args)
Return the positional arguments to pass to the target function.
1182 @property 1183 def target_kw(self) -> Union[Dict[str, Any], None]: 1184 """ 1185 Return the keyword arguments to pass to the target function. 1186 """ 1187 target_kw = ( 1188 self.__dict__.get('_target_kw', None) 1189 or self.properties.get('target', {}).get('kw', None) 1190 ) 1191 if target_kw is None: 1192 return {} 1193 1194 return {key: val for key, val in target_kw.items()}
Return the keyword arguments to pass to the target function.