meerschaum.utils.daemon.Daemon
Manage running daemons via the Daemon class.
1#! /usr/bin/env python3 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Manage running daemons via the Daemon class. 7""" 8 9from __future__ import annotations 10import os 11import importlib 12import pathlib 13import json 14import shutil 15import signal 16import sys 17import time 18import traceback 19from functools import partial 20from datetime import datetime, timezone 21 22import meerschaum as mrsm 23from meerschaum.utils.typing import ( 24 Optional, Dict, Any, SuccessTuple, Callable, List, Union, 25 is_success_tuple, Tuple, 26) 27from meerschaum.config import get_config 28from meerschaum.config.static import STATIC_CONFIG 29from meerschaum.config._paths import ( 30 DAEMON_RESOURCES_PATH, LOGS_RESOURCES_PATH, DAEMON_ERROR_LOG_PATH, 31) 32from meerschaum.config._patch import apply_patch_to_config 33from meerschaum.utils.warnings import warn, error 34from meerschaum.utils.packages import attempt_import 35from meerschaum.utils.venv import venv_exec 36from meerschaum.utils.daemon._names import get_new_daemon_name 37from meerschaum.utils.daemon.RotatingFile import RotatingFile 38from meerschaum.utils.daemon.StdinFile import StdinFile 39from meerschaum.utils.threading import RepeatTimer 40from meerschaum.__main__ import _close_pools 41 42_daemons = [] 43_results = {} 44 45class Daemon: 46 """ 47 Daemonize Python functions into background processes. 
48 49 Examples 50 -------- 51 >>> import meerschaum as mrsm 52 >>> from meerschaum.utils.daemons import Daemon 53 >>> daemon = Daemon(print, ('hi',)) 54 >>> success, msg = daemon.run() 55 >>> print(daemon.log_text) 56 57 2024-07-29 18:03 | hi 58 2024-07-29 18:03 | 59 >>> daemon.run(allow_dirty_run=True) 60 >>> print(daemon.log_text) 61 62 2024-07-29 18:03 | hi 63 2024-07-29 18:03 | 64 2024-07-29 18:05 | hi 65 2024-07-29 18:05 | 66 >>> mrsm.pprint(daemon.properties) 67 { 68 'label': 'print', 69 'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}}, 70 'result': None, 71 'process': {'ended': '2024-07-29T18:03:33.752806'} 72 } 73 74 """ 75 76 def __new__( 77 cls, 78 *args, 79 daemon_id: Optional[str] = None, 80 **kw 81 ): 82 """ 83 If a daemon_id is provided and already exists, read from its pickle file. 84 """ 85 instance = super(Daemon, cls).__new__(cls) 86 if daemon_id is not None: 87 instance.daemon_id = daemon_id 88 if instance.pickle_path.exists(): 89 instance = instance.read_pickle() 90 return instance 91 92 @classmethod 93 def from_properties_file(cls, daemon_id: str) -> Daemon: 94 """ 95 Return a Daemon from a properties dictionary. 
96 """ 97 properties_path = cls._get_properties_path_from_daemon_id(daemon_id) 98 if not properties_path.exists(): 99 raise OSError(f"Properties file '{properties_path}' does not exist.") 100 101 try: 102 with open(properties_path, 'r', encoding='utf-8') as f: 103 properties = json.load(f) 104 except Exception: 105 properties = {} 106 107 if not properties: 108 raise ValueError(f"No properties could be read for daemon '{daemon_id}'.") 109 110 daemon_id = properties_path.parent.name 111 target_cf = properties.get('target', {}) 112 target_module_name = target_cf.get('module', None) 113 target_function_name = target_cf.get('name', None) 114 target_args = target_cf.get('args', None) 115 target_kw = target_cf.get('kw', None) 116 label = properties.get('label', None) 117 118 if None in [ 119 target_module_name, 120 target_function_name, 121 target_args, 122 target_kw, 123 ]: 124 raise ValueError("Missing target function information.") 125 126 target_module = importlib.import_module(target_module_name) 127 target_function = getattr(target_module, target_function_name) 128 129 return Daemon( 130 daemon_id=daemon_id, 131 target=target_function, 132 target_args=target_args, 133 target_kw=target_kw, 134 properties=properties, 135 label=label, 136 ) 137 138 139 def __init__( 140 self, 141 target: Optional[Callable[[Any], Any]] = None, 142 target_args: Union[List[Any], Tuple[Any], None] = None, 143 target_kw: Optional[Dict[str, Any]] = None, 144 env: Optional[Dict[str, str]] = None, 145 daemon_id: Optional[str] = None, 146 label: Optional[str] = None, 147 properties: Optional[Dict[str, Any]] = None, 148 ): 149 """ 150 Parameters 151 ---------- 152 target: Optional[Callable[[Any], Any]], default None, 153 The function to execute in a child process. 154 155 target_args: Union[List[Any], Tuple[Any], None], default None 156 Positional arguments to pass to the target function. 
157 158 target_kw: Optional[Dict[str, Any]], default None 159 Keyword arguments to pass to the target function. 160 161 env: Optional[Dict[str, str]], default None 162 If provided, set these environment variables in the daemon process. 163 164 daemon_id: Optional[str], default None 165 Build a `Daemon` from an existing `daemon_id`. 166 If `daemon_id` is provided, other arguments are ignored and are derived 167 from the existing pickled `Daemon`. 168 169 label: Optional[str], default None 170 Label string to help identifiy a daemon. 171 If `None`, use the function name instead. 172 173 properties: Optional[Dict[str, Any]], default None 174 Override reading from the properties JSON by providing an existing dictionary. 175 """ 176 _pickle = self.__dict__.get('_pickle', False) 177 if daemon_id is not None: 178 self.daemon_id = daemon_id 179 if not self.pickle_path.exists() and not target and ('target' not in self.__dict__): 180 181 if not self.properties_path.exists(): 182 raise Exception( 183 f"Daemon '{self.daemon_id}' does not exist. " 184 + "Pass a target to create a new Daemon." 185 ) 186 187 try: 188 new_daemon = self.from_properties_file(daemon_id) 189 except Exception: 190 new_daemon = None 191 192 if new_daemon is not None: 193 new_daemon.write_pickle() 194 target = new_daemon.target 195 target_args = new_daemon.target_args 196 target_kw = new_daemon.target_kw 197 label = new_daemon.label 198 self._properties = new_daemon.properties 199 else: 200 try: 201 self.properties_path.unlink() 202 except Exception: 203 pass 204 205 raise Exception( 206 f"Could not recover daemon '{self.daemon_id}' " 207 + "from its properties file." 208 ) 209 210 if 'target' not in self.__dict__: 211 if target is None: 212 error("Cannot create a Daemon without a target.") 213 self.target = target 214 215 ### NOTE: We have to check self.__dict__ in case we un-pickling. 
216 if '_target_args' not in self.__dict__: 217 self._target_args = target_args 218 if '_target_kw' not in self.__dict__: 219 self._target_kw = target_kw 220 221 if 'label' not in self.__dict__: 222 if label is None: 223 label = ( 224 self.target.__name__ if '__name__' in self.target.__dir__() 225 else str(self.target) 226 ) 227 self.label = label 228 if 'daemon_id' not in self.__dict__: 229 self.daemon_id = get_new_daemon_name() 230 if '_properties' not in self.__dict__: 231 self._properties = properties 232 if self._properties is None: 233 self._properties = {} 234 235 self._properties.update({'label': self.label}) 236 if env: 237 self._properties.update({'env': env}) 238 239 ### Instantiate the process and if it doesn't exist, make sure the PID is removed. 240 _ = self.process 241 242 243 def _run_exit( 244 self, 245 keep_daemon_output: bool = True, 246 allow_dirty_run: bool = False, 247 ) -> Any: 248 """Run the daemon's target function. 249 NOTE: This WILL EXIT the parent process! 250 251 Parameters 252 ---------- 253 keep_daemon_output: bool, default True 254 If `False`, delete the daemon's output directory upon exiting. 255 256 allow_dirty_run, bool, default False: 257 If `True`, run the daemon, even if the `daemon_id` directory exists. 258 This option is dangerous because if the same `daemon_id` runs twice, 259 the last to finish will overwrite the output of the first. 260 261 Returns 262 ------- 263 Nothing — this will exit the parent process. 264 """ 265 import platform, sys, os, traceback 266 from meerschaum.utils.warnings import warn 267 from meerschaum.config import get_config 268 daemon = attempt_import('daemon') 269 lines = get_config('jobs', 'terminal', 'lines') 270 columns = get_config('jobs', 'terminal', 'columns') 271 272 if platform.system() == 'Windows': 273 return False, "Windows is no longer supported." 
274 275 self._setup(allow_dirty_run) 276 277 ### NOTE: The SIGINT handler has been removed so that child processes may handle 278 ### KeyboardInterrupts themselves. 279 ### The previous aggressive approach was redundant because of the SIGTERM handler. 280 self._daemon_context = daemon.DaemonContext( 281 pidfile=self.pid_lock, 282 stdout=self.rotating_log, 283 stderr=self.rotating_log, 284 working_directory=os.getcwd(), 285 detach_process=True, 286 files_preserve=list(self.rotating_log.subfile_objects.values()), 287 signal_map={ 288 signal.SIGTERM: self._handle_sigterm, 289 }, 290 ) 291 292 _daemons.append(self) 293 294 log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds') 295 self._log_refresh_timer = RepeatTimer( 296 log_refresh_seconds, 297 partial(self.rotating_log.refresh_files, start_interception=True), 298 ) 299 300 try: 301 os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns)) 302 with self._daemon_context: 303 sys.stdin = self.stdin_file 304 os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id 305 os.environ['PYTHONUNBUFFERED'] = '1' 306 307 ### Allow the user to override environment variables. 308 env = self.properties.get('env', {}) 309 if env and isinstance(env, dict): 310 os.environ.update({str(k): str(v) for k, v in env.items()}) 311 312 self.rotating_log.refresh_files(start_interception=True) 313 result = None 314 try: 315 with open(self.pid_path, 'w+', encoding='utf-8') as f: 316 f.write(str(os.getpid())) 317 318 ### NOTE: The timer fails to start for remote actions to localhost. 
319 try: 320 if not self._log_refresh_timer.is_running(): 321 self._log_refresh_timer.start() 322 except Exception: 323 pass 324 325 self.properties['result'] = None 326 self._capture_process_timestamp('began') 327 result = self.target(*self.target_args, **self.target_kw) 328 self.properties['result'] = result 329 except (BrokenPipeError, KeyboardInterrupt, SystemExit): 330 pass 331 except Exception as e: 332 warn( 333 f"Exception in daemon target function: {traceback.format_exc()}", 334 ) 335 result = e 336 finally: 337 _results[self.daemon_id] = result 338 339 if keep_daemon_output: 340 self._capture_process_timestamp('ended') 341 else: 342 self.cleanup() 343 344 self._log_refresh_timer.cancel() 345 if self.pid is None and self.pid_path.exists(): 346 self.pid_path.unlink() 347 348 if is_success_tuple(result): 349 try: 350 mrsm.pprint(result) 351 except BrokenPipeError: 352 pass 353 354 except Exception: 355 daemon_error = traceback.format_exc() 356 with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f: 357 f.write(daemon_error) 358 warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}") 359 360 def _capture_process_timestamp( 361 self, 362 process_key: str, 363 write_properties: bool = True, 364 ) -> None: 365 """ 366 Record the current timestamp to the parameters `process:<process_key>`. 367 368 Parameters 369 ---------- 370 process_key: str 371 Under which key to store the timestamp. 372 373 write_properties: bool, default True 374 If `True` persist the properties to disk immediately after capturing the timestamp. 
375 """ 376 if 'process' not in self.properties: 377 self.properties['process'] = {} 378 379 if process_key not in ('began', 'ended', 'paused', 'stopped'): 380 raise ValueError(f"Invalid key '{process_key}'.") 381 382 self.properties['process'][process_key] = ( 383 datetime.now(timezone.utc).replace(tzinfo=None).isoformat() 384 ) 385 if write_properties: 386 self.write_properties() 387 388 def run( 389 self, 390 keep_daemon_output: bool = True, 391 allow_dirty_run: bool = False, 392 debug: bool = False, 393 ) -> SuccessTuple: 394 """Run the daemon as a child process and continue executing the parent. 395 396 Parameters 397 ---------- 398 keep_daemon_output: bool, default True 399 If `False`, delete the daemon's output directory upon exiting. 400 401 allow_dirty_run: bool, default False 402 If `True`, run the daemon, even if the `daemon_id` directory exists. 403 This option is dangerous because if the same `daemon_id` runs concurrently, 404 the last to finish will overwrite the output of the first. 405 406 Returns 407 ------- 408 A SuccessTuple indicating success. 409 410 """ 411 import platform 412 if platform.system() == 'Windows': 413 return False, "Cannot run background jobs on Windows." 414 415 ### The daemon might exist and be paused. 416 if self.status == 'paused': 417 return self.resume() 418 419 self._remove_stop_file() 420 if self.status == 'running': 421 return True, f"Daemon '{self}' is already running." 
422 423 self.mkdir_if_not_exists(allow_dirty_run) 424 _write_pickle_success_tuple = self.write_pickle() 425 if not _write_pickle_success_tuple[0]: 426 return _write_pickle_success_tuple 427 428 _launch_daemon_code = ( 429 "from meerschaum.utils.daemon import Daemon; " 430 + f"daemon = Daemon(daemon_id='{self.daemon_id}'); " 431 + f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, " 432 + "allow_dirty_run=True)" 433 ) 434 env = dict(os.environ) 435 env['MRSM_NOASK'] = 'true' 436 _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env) 437 msg = ( 438 "Success" 439 if _launch_success_bool 440 else f"Failed to start daemon '{self.daemon_id}'." 441 ) 442 return _launch_success_bool, msg 443 444 def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple: 445 """ 446 Forcibly terminate a running daemon. 447 Sends a SIGTERM signal to the process. 448 449 Parameters 450 ---------- 451 timeout: Optional[int], default 3 452 How many seconds to wait for the process to terminate. 453 454 Returns 455 ------- 456 A SuccessTuple indicating success. 457 """ 458 if self.status != 'paused': 459 success, msg = self._send_signal(signal.SIGTERM, timeout=timeout) 460 if success: 461 self._write_stop_file('kill') 462 return success, msg 463 464 if self.status == 'stopped': 465 self._write_stop_file('kill') 466 return True, "Process has already stopped." 
467 468 process = self.process 469 try: 470 process.terminate() 471 process.kill() 472 process.wait(timeout=timeout) 473 except Exception as e: 474 return False, f"Failed to kill job {self} with exception: {e}" 475 476 if self.pid_path.exists(): 477 try: 478 self.pid_path.unlink() 479 except Exception as e: 480 pass 481 482 self._write_stop_file('kill') 483 return True, "Success" 484 485 def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple: 486 """Gracefully quit a running daemon.""" 487 if self.status == 'paused': 488 return self.kill(timeout) 489 490 signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout) 491 if signal_success: 492 self._write_stop_file('quit') 493 return signal_success, signal_msg 494 495 def pause( 496 self, 497 timeout: Union[int, float, None] = None, 498 check_timeout_interval: Union[float, int, None] = None, 499 ) -> SuccessTuple: 500 """ 501 Pause the daemon if it is running. 502 503 Parameters 504 ---------- 505 timeout: Union[float, int, None], default None 506 The maximum number of seconds to wait for a process to suspend. 507 508 check_timeout_interval: Union[float, int, None], default None 509 The number of seconds to wait between checking if the process is still running. 510 511 Returns 512 ------- 513 A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended. 514 """ 515 if self.process is None: 516 return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused." 517 518 if self.status == 'paused': 519 return True, f"Daemon '{self.daemon_id}' is already paused." 
520 521 self._write_stop_file('pause') 522 try: 523 self.process.suspend() 524 except Exception as e: 525 return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}" 526 527 timeout = self.get_timeout_seconds(timeout) 528 check_timeout_interval = self.get_check_timeout_interval_seconds( 529 check_timeout_interval 530 ) 531 532 psutil = attempt_import('psutil') 533 534 if not timeout: 535 try: 536 success = self.process.status() == 'stopped' 537 except psutil.NoSuchProcess as e: 538 success = True 539 msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'." 540 if success: 541 self._capture_process_timestamp('paused') 542 return success, msg 543 544 begin = time.perf_counter() 545 while (time.perf_counter() - begin) < timeout: 546 try: 547 if self.process.status() == 'stopped': 548 self._capture_process_timestamp('paused') 549 return True, "Success" 550 except psutil.NoSuchProcess as e: 551 return False, f"Process exited unexpectedly. Was it killed?\n{e}" 552 time.sleep(check_timeout_interval) 553 554 return False, ( 555 f"Failed to pause daemon '{self.daemon_id}' within {timeout} second" 556 + ('s' if timeout != 1 else '') + '.' 557 ) 558 559 def resume( 560 self, 561 timeout: Union[int, float, None] = None, 562 check_timeout_interval: Union[float, int, None] = None, 563 ) -> SuccessTuple: 564 """ 565 Resume the daemon if it is paused. 566 567 Parameters 568 ---------- 569 timeout: Union[float, int, None], default None 570 The maximum number of seconds to wait for a process to resume. 571 572 check_timeout_interval: Union[float, int, None], default None 573 The number of seconds to wait between checking if the process is still stopped. 574 575 Returns 576 ------- 577 A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed. 578 """ 579 if self.status == 'running': 580 return True, f"Daemon '{self.daemon_id}' is already running." 
581 582 if self.status == 'stopped': 583 return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed." 584 585 self._remove_stop_file() 586 try: 587 self.process.resume() 588 except Exception as e: 589 return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}" 590 591 timeout = self.get_timeout_seconds(timeout) 592 check_timeout_interval = self.get_check_timeout_interval_seconds( 593 check_timeout_interval 594 ) 595 596 if not timeout: 597 success = self.status == 'running' 598 msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'." 599 if success: 600 self._capture_process_timestamp('began') 601 return success, msg 602 603 begin = time.perf_counter() 604 while (time.perf_counter() - begin) < timeout: 605 if self.status == 'running': 606 self._capture_process_timestamp('began') 607 return True, "Success" 608 time.sleep(check_timeout_interval) 609 610 return False, ( 611 f"Failed to resume daemon '{self.daemon_id}' within {timeout} second" 612 + ('s' if timeout != 1 else '') + '.' 613 ) 614 615 def _write_stop_file(self, action: str) -> SuccessTuple: 616 """Write the stop file timestamp and action.""" 617 if action not in ('quit', 'kill', 'pause'): 618 return False, f"Unsupported action '{action}'." 619 620 if not self.stop_path.parent.exists(): 621 self.stop_path.parent.mkdir(parents=True, exist_ok=True) 622 623 with open(self.stop_path, 'w+', encoding='utf-8') as f: 624 json.dump( 625 { 626 'stop_time': datetime.now(timezone.utc).isoformat(), 627 'action': action, 628 }, 629 f 630 ) 631 632 return True, "Success" 633 634 def _remove_stop_file(self) -> SuccessTuple: 635 """Remove the stop file""" 636 if not self.stop_path.exists(): 637 return True, "Stop file does not exist." 
638 639 try: 640 self.stop_path.unlink() 641 except Exception as e: 642 return False, f"Failed to remove stop file:\n{e}" 643 644 return True, "Success" 645 646 def _read_stop_file(self) -> Dict[str, Any]: 647 """ 648 Read the stop file if it exists. 649 """ 650 if not self.stop_path.exists(): 651 return {} 652 653 try: 654 with open(self.stop_path, 'r', encoding='utf-8') as f: 655 data = json.load(f) 656 return data 657 except Exception: 658 return {} 659 660 def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None: 661 """ 662 Handle `SIGTERM` within the `Daemon` context. 663 This method is injected into the `DaemonContext`. 664 """ 665 from meerschaum.utils.process import signal_handler 666 signal_handler(signal_number, stack_frame) 667 668 timer = self.__dict__.get('_log_refresh_timer', None) 669 if timer is not None: 670 timer.cancel() 671 672 daemon_context = self.__dict__.get('_daemon_context', None) 673 if daemon_context is not None: 674 daemon_context.close() 675 676 _close_pools() 677 raise SystemExit(0) 678 679 def _send_signal( 680 self, 681 signal_to_send, 682 timeout: Union[float, int, None] = None, 683 check_timeout_interval: Union[float, int, None] = None, 684 ) -> SuccessTuple: 685 """Send a signal to the daemon process. 686 687 Parameters 688 ---------- 689 signal_to_send: 690 The signal the send to the daemon, e.g. `signals.SIGINT`. 691 692 timeout: Union[float, int, None], default None 693 The maximum number of seconds to wait for a process to terminate. 694 695 check_timeout_interval: Union[float, int, None], default None 696 The number of seconds to wait between checking if the process is still running. 697 698 Returns 699 ------- 700 A SuccessTuple indicating success. 701 """ 702 try: 703 pid = self.pid 704 if pid is None: 705 return ( 706 False, 707 f"Daemon '{self.daemon_id}' is not running, " 708 + f"cannot send signal '{signal_to_send}'." 
709 ) 710 711 os.kill(pid, signal_to_send) 712 except Exception as e: 713 return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}" 714 715 timeout = self.get_timeout_seconds(timeout) 716 check_timeout_interval = self.get_check_timeout_interval_seconds( 717 check_timeout_interval 718 ) 719 720 if not timeout: 721 return True, f"Successfully sent '{signal}' to daemon '{self.daemon_id}'." 722 723 begin = time.perf_counter() 724 while (time.perf_counter() - begin) < timeout: 725 if not self.status == 'running': 726 return True, "Success" 727 time.sleep(check_timeout_interval) 728 729 return False, ( 730 f"Failed to stop daemon '{self.daemon_id}' within {timeout} second" 731 + ('s' if timeout != 1 else '') + '.' 732 ) 733 734 def mkdir_if_not_exists(self, allow_dirty_run: bool = False): 735 """Create the Daemon's directory. 736 If `allow_dirty_run` is `False` and the directory already exists, 737 raise a `FileExistsError`. 738 """ 739 try: 740 self.path.mkdir(parents=True, exist_ok=True) 741 _already_exists = any(os.scandir(self.path)) 742 except FileExistsError: 743 _already_exists = True 744 745 if _already_exists and not allow_dirty_run: 746 error( 747 f"Daemon '{self.daemon_id}' already exists. " + 748 f"To allow this daemon to run, do one of the following:\n" 749 + " - Execute `daemon.cleanup()`.\n" 750 + f" - Delete the directory '{self.path}'.\n" 751 + " - Pass `allow_dirty_run=True` to `daemon.run()`.\n", 752 FileExistsError, 753 ) 754 755 @property 756 def process(self) -> Union['psutil.Process', None]: 757 """ 758 Return the psutil process for the Daemon. 
759 """ 760 psutil = attempt_import('psutil') 761 pid = self.pid 762 if pid is None: 763 return None 764 if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid): 765 try: 766 self._process = psutil.Process(int(pid)) 767 except Exception as e: 768 if self.pid_path.exists(): 769 self.pid_path.unlink() 770 return None 771 return self._process 772 773 @property 774 def status(self) -> str: 775 """ 776 Return the running status of this Daemon. 777 """ 778 if self.process is None: 779 return 'stopped' 780 781 psutil = attempt_import('psutil') 782 try: 783 if self.process.status() == 'stopped': 784 return 'paused' 785 if self.process.status() == 'zombie': 786 raise psutil.NoSuchProcess(self.process.pid) 787 except (psutil.NoSuchProcess, AttributeError): 788 if self.pid_path.exists(): 789 try: 790 self.pid_path.unlink() 791 except Exception as e: 792 pass 793 return 'stopped' 794 795 return 'running' 796 797 @classmethod 798 def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path: 799 """ 800 Return a Daemon's path from its `daemon_id`. 801 """ 802 return DAEMON_RESOURCES_PATH / daemon_id 803 804 @property 805 def path(self) -> pathlib.Path: 806 """ 807 Return the path for this Daemon's directory. 808 """ 809 return self._get_path_from_daemon_id(self.daemon_id) 810 811 @classmethod 812 def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path: 813 """ 814 Return the `properties.json` path for a given `daemon_id`. 815 """ 816 return cls._get_path_from_daemon_id(daemon_id) / 'properties.json' 817 818 @property 819 def properties_path(self) -> pathlib.Path: 820 """ 821 Return the `propterties.json` path for this Daemon. 822 """ 823 return self._get_properties_path_from_daemon_id(self.daemon_id) 824 825 @property 826 def stop_path(self) -> pathlib.Path: 827 """ 828 Return the path for the stop file (created when manually stopped). 
829 """ 830 return self.path / '.stop.json' 831 832 @property 833 def log_path(self) -> pathlib.Path: 834 """ 835 Return the log path. 836 """ 837 return LOGS_RESOURCES_PATH / (self.daemon_id + '.log') 838 839 @property 840 def stdin_file_path(self) -> pathlib.Path: 841 """ 842 Return the stdin file path. 843 """ 844 return self.path / 'input.stdin' 845 846 @property 847 def blocking_stdin_file_path(self) -> pathlib.Path: 848 """ 849 Return the stdin file path. 850 """ 851 if '_blocking_stdin_file_path' in self.__dict__: 852 return self._blocking_stdin_file_path 853 854 return self.path / 'input.stdin.block' 855 856 @property 857 def log_offset_path(self) -> pathlib.Path: 858 """ 859 Return the log offset file path. 860 """ 861 return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset') 862 863 @property 864 def rotating_log(self) -> RotatingFile: 865 """ 866 The rotating log file for the daemon's output. 867 """ 868 if '_rotating_log' in self.__dict__: 869 return self._rotating_log 870 871 write_timestamps = ( 872 self.properties.get('logs', {}).get('write_timestamps', None) 873 ) 874 if write_timestamps is None: 875 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 876 877 self._rotating_log = RotatingFile( 878 self.log_path, 879 redirect_streams=True, 880 write_timestamps=write_timestamps, 881 timestamp_format=get_config('jobs', 'logs', 'timestamps', 'format'), 882 ) 883 return self._rotating_log 884 885 @property 886 def stdin_file(self): 887 """ 888 Return the file handler for the stdin file. 889 """ 890 if '_stdin_file' in self.__dict__: 891 return self._stdin_file 892 893 self._stdin_file = StdinFile( 894 self.stdin_file_path, 895 lock_file_path=self.blocking_stdin_file_path, 896 ) 897 return self._stdin_file 898 899 @property 900 def log_text(self) -> Optional[str]: 901 """ 902 Read the log files and return their contents. 903 Returns `None` if the log file does not exist. 
904 """ 905 new_rotating_log = RotatingFile( 906 self.rotating_log.file_path, 907 num_files_to_keep = self.rotating_log.num_files_to_keep, 908 max_file_size = self.rotating_log.max_file_size, 909 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled'), 910 timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format'), 911 ) 912 return new_rotating_log.read() 913 914 def readlines(self) -> List[str]: 915 """ 916 Read the next log lines, persisting the cursor for later use. 917 Note this will alter the cursor of `self.rotating_log`. 918 """ 919 self.rotating_log._cursor = self._read_log_offset() 920 lines = self.rotating_log.readlines() 921 self._write_log_offset() 922 return lines 923 924 def _read_log_offset(self) -> Tuple[int, int]: 925 """ 926 Return the current log offset cursor. 927 928 Returns 929 ------- 930 A tuple of the form (`subfile_index`, `position`). 931 """ 932 if not self.log_offset_path.exists(): 933 return 0, 0 934 935 with open(self.log_offset_path, 'r', encoding='utf-8') as f: 936 cursor_text = f.read() 937 cursor_parts = cursor_text.split(' ') 938 subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1]) 939 return subfile_index, subfile_position 940 941 def _write_log_offset(self) -> None: 942 """ 943 Write the current log offset file. 944 """ 945 with open(self.log_offset_path, 'w+', encoding='utf-8') as f: 946 subfile_index = self.rotating_log._cursor[0] 947 subfile_position = self.rotating_log._cursor[1] 948 f.write(f"{subfile_index} {subfile_position}") 949 950 @property 951 def pid(self) -> Union[int, None]: 952 """ 953 Read the PID file and return its contents. 954 Returns `None` if the PID file does not exist. 
955 """ 956 if not self.pid_path.exists(): 957 return None 958 try: 959 with open(self.pid_path, 'r', encoding='utf-8') as f: 960 text = f.read() 961 if len(text) == 0: 962 return None 963 pid = int(text.rstrip()) 964 except Exception as e: 965 warn(e) 966 text = None 967 pid = None 968 return pid 969 970 @property 971 def pid_path(self) -> pathlib.Path: 972 """ 973 Return the path to a file containing the PID for this Daemon. 974 """ 975 return self.path / 'process.pid' 976 977 @property 978 def pid_lock(self) -> 'fasteners.InterProcessLock': 979 """ 980 Return the process lock context manager. 981 """ 982 if '_pid_lock' in self.__dict__: 983 return self._pid_lock 984 985 fasteners = attempt_import('fasteners') 986 self._pid_lock = fasteners.InterProcessLock(self.pid_path) 987 return self._pid_lock 988 989 @property 990 def pickle_path(self) -> pathlib.Path: 991 """ 992 Return the path for the pickle file. 993 """ 994 return self.path / 'pickle.pkl' 995 996 def read_properties(self) -> Optional[Dict[str, Any]]: 997 """Read the properties JSON file and return the dictionary.""" 998 if not self.properties_path.exists(): 999 return None 1000 try: 1001 with open(self.properties_path, 'r', encoding='utf-8') as file: 1002 properties = json.load(file) 1003 except Exception as e: 1004 properties = {} 1005 1006 return properties 1007 1008 def read_pickle(self) -> Daemon: 1009 """Read a Daemon's pickle file and return the `Daemon`.""" 1010 import pickle, traceback 1011 if not self.pickle_path.exists(): 1012 error(f"Pickle file does not exist for daemon '{self.daemon_id}'.") 1013 1014 if self.pickle_path.stat().st_size == 0: 1015 error(f"Pickle was empty for daemon '{self.daemon_id}'.") 1016 1017 try: 1018 with open(self.pickle_path, 'rb') as pickle_file: 1019 daemon = pickle.load(pickle_file) 1020 success, msg = True, 'Success' 1021 except Exception as e: 1022 success, msg = False, str(e) 1023 daemon = None 1024 traceback.print_exception(type(e), e, e.__traceback__) 1025 if 
        # Fail loudly if unpickling did not produce a Daemon.
        if not success:
            error(msg)
        return daemon

    @property
    def properties(self) -> Dict[str, Any]:
        """
        Return the contents of the properties JSON file,
        merged with any in-memory properties.
        """
        try:
            _file_properties = self.read_properties()
        except Exception:
            traceback.print_exc()
            _file_properties = {}

        if not self._properties:
            self._properties = _file_properties

        if self._properties is None:
            self._properties = {}

        # NOTE(review): the on-disk properties are passed first to
        # `apply_patch_to_config` — confirm the intended patch direction.
        if _file_properties is not None:
            self._properties = apply_patch_to_config(
                _file_properties,
                self._properties,
            )

        return self._properties

    @property
    def hidden(self) -> bool:
        """
        Return a bool indicating whether this Daemon should be displayed.
        Daemons whose IDs begin with '_' or '.' are treated as hidden.
        """
        return self.daemon_id.startswith('_') or self.daemon_id.startswith('.')

    def write_properties(self) -> SuccessTuple:
        """Write the properties dictionary to the properties JSON file
        (only if self.properties exists).
        """
        success, msg = (
            False,
            f"No properties to write for daemon '{self.daemon_id}'."
        )
        if self.properties is not None:
            try:
                self.path.mkdir(parents=True, exist_ok=True)
                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
                    json.dump(self.properties, properties_file)
                success, msg = True, 'Success'
            except Exception as e:
                success, msg = False, str(e)
        return success, msg

    def write_pickle(self) -> SuccessTuple:
        """Write the pickle file for the daemon."""
        import pickle, traceback
        try:
            self.path.mkdir(parents=True, exist_ok=True)
            with open(self.pickle_path, 'wb+') as pickle_file:
                pickle.dump(self, pickle_file)
            success, msg = True, "Success"
        except Exception as e:
            success, msg = False, str(e)
            traceback.print_exception(type(e), e, e.__traceback__)
        return success, msg


    def _setup(
        self,
        allow_dirty_run: bool = False,
    ) -> None:
        """
        Update properties before starting the Daemon.
        Persists the target metadata, the properties file, and the pickle.
        """
        if self.properties is None:
            self._properties = {}

        self._properties.update({
            'target': {
                'name': self.target.__name__,
                'module': self.target.__module__,
                'args': self.target_args,
                'kw': self.target_kw,
            },
        })
        self.mkdir_if_not_exists(allow_dirty_run)
        _write_properties_success_tuple = self.write_properties()
        if not _write_properties_success_tuple[0]:
            error(_write_properties_success_tuple[1])

        _write_pickle_success_tuple = self.write_pickle()
        if not _write_pickle_success_tuple[0]:
            error(_write_pickle_success_tuple[1])

    def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
        """
        Remove a daemon's directory after execution.

        Parameters
        ----------
        keep_logs: bool, default False
            If `True`, skip deleting the daemon's log files.

        Returns
        -------
        A `SuccessTuple` indicating success.
        """
        if self.path.exists():
            try:
                shutil.rmtree(self.path)
            except Exception as e:
                msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
                warn(msg)
                return False, msg
        if not keep_logs:
            self.rotating_log.delete()
            try:
                if self.log_offset_path.exists():
                    self.log_offset_path.unlink()
            except Exception as e:
                msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
                warn(msg)
                return False, msg
        return True, "Success"


    def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
        """
        Return the timeout value to use. Use `--timeout-seconds` if provided,
        else the configured default (8).
        """
        if isinstance(timeout, (int, float)):
            return timeout
        return get_config('jobs', 'timeout_seconds')


    def get_check_timeout_interval_seconds(
        self,
        check_timeout_interval: Union[int, float, None] = None,
    ) -> Union[int, float]:
        """
        Return the interval value to check the status of timeouts.
        """
        if isinstance(check_timeout_interval, (int, float)):
            return check_timeout_interval
        return get_config('jobs', 'check_timeout_interval_seconds')

    @property
    def target_args(self) -> Union[Tuple[Any], None]:
        """
        Return the positional arguments to pass to the target function.
        """
        target_args = (
            self.__dict__.get('_target_args', None)
            or self.properties.get('target', {}).get('args', None)
        )
        if target_args is None:
            return tuple([])

        return tuple(target_args)

    @property
    def target_kw(self) -> Union[Dict[str, Any], None]:
        """
        Return the keyword arguments to pass to the target function.
        """
        target_kw = (
            self.__dict__.get('_target_kw', None)
            or self.properties.get('target', {}).get('kw', None)
        )
        if target_kw is None:
            return {}

        # Shallow copy so callers cannot mutate the stored kwargs.
        return {key: val for key, val in target_kw.items()}

    def __getstate__(self):
        """
        Pickle this Daemon.
        The target callable is serialized with `dill`.
        """
        dill = attempt_import('dill')
        return {
            'target': dill.dumps(self.target),
            'target_args': self.target_args,
            'target_kw': self.target_kw,
            'daemon_id': self.daemon_id,
            'label': self.label,
            'properties': self.properties,
        }

    def __setstate__(self, _state: Dict[str, Any]):
        """
        Restore this Daemon from a pickled state.
        If the properties file exists, skip the old pickled version.
        """
        dill = attempt_import('dill')
        _state['target'] = dill.loads(_state['target'])
        self._pickle = True
        daemon_id = _state.get('daemon_id', None)
        if not daemon_id:
            raise ValueError("Need a daemon_id to un-pickle a Daemon.")

        properties_path = self._get_properties_path_from_daemon_id(daemon_id)
        ignore_properties = properties_path.exists()
        if ignore_properties:
            # Prefer the on-disk properties over the pickled snapshot.
            _state = {
                key: val
                for key, val in _state.items()
                if key != 'properties'
            }
        self.__init__(**_state)


    def __repr__(self):
        return str(self)

    def __str__(self):
        return self.daemon_id

    def __eq__(self, other):
        # Daemons are identified solely by their daemon_id.
        if not isinstance(other, Daemon):
            return False
        return self.daemon_id == other.daemon_id

    def __hash__(self):
        return hash(self.daemon_id)
class Daemon:
    """
    Daemonize Python functions into background processes.

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.utils.daemon import Daemon
    >>> daemon = Daemon(print, ('hi',))
    >>> success, msg = daemon.run()
    >>> print(daemon.log_text)

    2024-07-29 18:03 | hi
    2024-07-29 18:03 |
    >>> daemon.run(allow_dirty_run=True)
    >>> print(daemon.log_text)

    2024-07-29 18:03 | hi
    2024-07-29 18:03 |
    2024-07-29 18:05 | hi
    2024-07-29 18:05 |
    >>> mrsm.pprint(daemon.properties)
    {
        'label': 'print',
        'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
        'result': None,
        'process': {'ended': '2024-07-29T18:03:33.752806'}
    }

    """

    def __new__(
        cls,
        *args,
        daemon_id: Optional[str] = None,
        **kw
    ):
        """
        If a daemon_id is provided and already exists, read from its pickle file.
        """
        instance = super(Daemon, cls).__new__(cls)
        if daemon_id is not None:
            instance.daemon_id = daemon_id
            if instance.pickle_path.exists():
                # Replace the fresh instance with the unpickled Daemon on disk.
                instance = instance.read_pickle()
        return instance

    @classmethod
    def from_properties_file(cls, daemon_id: str) -> Daemon:
        """
        Return a Daemon from a properties dictionary.

        Raises an `OSError` if the properties file does not exist,
        or a `ValueError` if it cannot be parsed or lacks target metadata.
        """
        properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
        if not properties_path.exists():
            raise OSError(f"Properties file '{properties_path}' does not exist.")

        try:
            with open(properties_path, 'r', encoding='utf-8') as f:
                properties = json.load(f)
        except Exception:
            properties = {}

        if not properties:
            raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")

        # The directory name is authoritative for the daemon_id.
        daemon_id = properties_path.parent.name
        target_cf = properties.get('target', {})
        target_module_name = target_cf.get('module', None)
        target_function_name = target_cf.get('name', None)
        target_args = target_cf.get('args', None)
        target_kw = target_cf.get('kw', None)
        label = properties.get('label', None)

        if None in [
            target_module_name,
            target_function_name,
            target_args,
            target_kw,
        ]:
            raise ValueError("Missing target function information.")

        target_module = importlib.import_module(target_module_name)
        target_function = getattr(target_module, target_function_name)

        return Daemon(
            daemon_id=daemon_id,
            target=target_function,
            target_args=target_args,
            target_kw=target_kw,
            properties=properties,
            label=label,
        )


    def __init__(
        self,
        target: Optional[Callable[[Any], Any]] = None,
        target_args: Union[List[Any], Tuple[Any], None] = None,
        target_kw: Optional[Dict[str, Any]] = None,
        env: Optional[Dict[str, str]] = None,
        daemon_id: Optional[str] = None,
        label: Optional[str] = None,
        properties: Optional[Dict[str, Any]] = None,
    ):
        """
        Parameters
        ----------
        target: Optional[Callable[[Any], Any]], default None,
            The function to execute in a child process.

        target_args: Union[List[Any], Tuple[Any], None], default None
            Positional arguments to pass to the target function.

        target_kw: Optional[Dict[str, Any]], default None
            Keyword arguments to pass to the target function.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the daemon process.

        daemon_id: Optional[str], default None
            Build a `Daemon` from an existing `daemon_id`.
            If `daemon_id` is provided, other arguments are ignored and are derived
            from the existing pickled `Daemon`.

        label: Optional[str], default None
            Label string to help identify a daemon.
            If `None`, use the function name instead.

        properties: Optional[Dict[str, Any]], default None
            Override reading from the properties JSON by providing an existing dictionary.
        """
        _pickle = self.__dict__.get('_pickle', False)
        if daemon_id is not None:
            self.daemon_id = daemon_id
            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):

                if not self.properties_path.exists():
                    raise Exception(
                        f"Daemon '{self.daemon_id}' does not exist. "
                        + "Pass a target to create a new Daemon."
                    )

                # No pickle but a properties file: attempt recovery from properties.
                try:
                    new_daemon = self.from_properties_file(daemon_id)
                except Exception:
                    new_daemon = None

                if new_daemon is not None:
                    new_daemon.write_pickle()
                    target = new_daemon.target
                    target_args = new_daemon.target_args
                    target_kw = new_daemon.target_kw
                    label = new_daemon.label
                    self._properties = new_daemon.properties
                else:
                    # Unrecoverable: remove the stale properties file.
                    try:
                        self.properties_path.unlink()
                    except Exception:
                        pass

                    raise Exception(
                        f"Could not recover daemon '{self.daemon_id}' "
                        + "from its properties file."
                    )

        if 'target' not in self.__dict__:
            if target is None:
                error("Cannot create a Daemon without a target.")
            self.target = target

        ### NOTE: We have to check self.__dict__ in case we are un-pickling.
        if '_target_args' not in self.__dict__:
            self._target_args = target_args
        if '_target_kw' not in self.__dict__:
            self._target_kw = target_kw

        if 'label' not in self.__dict__:
            if label is None:
                label = (
                    self.target.__name__ if '__name__' in self.target.__dir__()
                    else str(self.target)
                )
            self.label = label
        if 'daemon_id' not in self.__dict__:
            self.daemon_id = get_new_daemon_name()
        if '_properties' not in self.__dict__:
            self._properties = properties
        if self._properties is None:
            self._properties = {}

        self._properties.update({'label': self.label})
        if env:
            self._properties.update({'env': env})

        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
        _ = self.process


    def _run_exit(
        self,
        keep_daemon_output: bool = True,
        allow_dirty_run: bool = False,
    ) -> Any:
        """Run the daemon's target function.
        NOTE: This WILL EXIT the parent process!

        Parameters
        ----------
        keep_daemon_output: bool, default True
            If `False`, delete the daemon's output directory upon exiting.

        allow_dirty_run, bool, default False:
            If `True`, run the daemon, even if the `daemon_id` directory exists.
            This option is dangerous because if the same `daemon_id` runs twice,
            the last to finish will overwrite the output of the first.

        Returns
        -------
        Nothing — this will exit the parent process.
        """
        import platform, sys, os, traceback
        from meerschaum.utils.warnings import warn
        from meerschaum.config import get_config
        daemon = attempt_import('daemon')
        lines = get_config('jobs', 'terminal', 'lines')
        columns = get_config('jobs', 'terminal', 'columns')

        if platform.system() == 'Windows':
            return False, "Windows is no longer supported."

        self._setup(allow_dirty_run)

        ### NOTE: The SIGINT handler has been removed so that child processes may handle
        ### KeyboardInterrupts themselves.
        ### The previous aggressive approach was redundant because of the SIGTERM handler.
        self._daemon_context = daemon.DaemonContext(
            pidfile=self.pid_lock,
            stdout=self.rotating_log,
            stderr=self.rotating_log,
            working_directory=os.getcwd(),
            detach_process=True,
            files_preserve=list(self.rotating_log.subfile_objects.values()),
            signal_map={
                signal.SIGTERM: self._handle_sigterm,
            },
        )

        _daemons.append(self)

        # Periodically refresh the rotating log's file handles.
        log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds')
        self._log_refresh_timer = RepeatTimer(
            log_refresh_seconds,
            partial(self.rotating_log.refresh_files, start_interception=True),
        )

        try:
            os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns))
            with self._daemon_context:
                sys.stdin = self.stdin_file
                os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id
                os.environ['PYTHONUNBUFFERED'] = '1'

                ### Allow the user to override environment variables.
                env = self.properties.get('env', {})
                if env and isinstance(env, dict):
                    os.environ.update({str(k): str(v) for k, v in env.items()})

                self.rotating_log.refresh_files(start_interception=True)
                result = None
                try:
                    with open(self.pid_path, 'w+', encoding='utf-8') as f:
                        f.write(str(os.getpid()))

                    ### NOTE: The timer fails to start for remote actions to localhost.
                    try:
                        if not self._log_refresh_timer.is_running():
                            self._log_refresh_timer.start()
                    except Exception:
                        pass

                    self.properties['result'] = None
                    self._capture_process_timestamp('began')
                    result = self.target(*self.target_args, **self.target_kw)
                    self.properties['result'] = result
                except (BrokenPipeError, KeyboardInterrupt, SystemExit):
                    pass
                except Exception as e:
                    warn(
                        f"Exception in daemon target function: {traceback.format_exc()}",
                    )
                    result = e
                finally:
                    _results[self.daemon_id] = result

                    if keep_daemon_output:
                        self._capture_process_timestamp('ended')
                    else:
                        self.cleanup()

                    self._log_refresh_timer.cancel()
                    if self.pid is None and self.pid_path.exists():
                        self.pid_path.unlink()

                    if is_success_tuple(result):
                        try:
                            mrsm.pprint(result)
                        except BrokenPipeError:
                            pass

        except Exception:
            daemon_error = traceback.format_exc()
            with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
                f.write(daemon_error)
            warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}")

    def _capture_process_timestamp(
        self,
        process_key: str,
        write_properties: bool = True,
    ) -> None:
        """
        Record the current timestamp to the parameters `process:<process_key>`.

        Parameters
        ----------
        process_key: str
            Under which key to store the timestamp
            (one of 'began', 'ended', 'paused', 'stopped').

        write_properties: bool, default True
            If `True` persist the properties to disk immediately after capturing the timestamp.
        """
        if 'process' not in self.properties:
            self.properties['process'] = {}

        if process_key not in ('began', 'ended', 'paused', 'stopped'):
            raise ValueError(f"Invalid key '{process_key}'.")

        # Stored as a naive UTC ISO-8601 string.
        self.properties['process'][process_key] = (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        )
        if write_properties:
            self.write_properties()

    def run(
        self,
        keep_daemon_output: bool = True,
        allow_dirty_run: bool = False,
        debug: bool = False,
    ) -> SuccessTuple:
        """Run the daemon as a child process and continue executing the parent.

        Parameters
        ----------
        keep_daemon_output: bool, default True
            If `False`, delete the daemon's output directory upon exiting.

        allow_dirty_run: bool, default False
            If `True`, run the daemon, even if the `daemon_id` directory exists.
            This option is dangerous because if the same `daemon_id` runs concurrently,
            the last to finish will overwrite the output of the first.

        Returns
        -------
        A SuccessTuple indicating success.

        """
        import platform
        if platform.system() == 'Windows':
            return False, "Cannot run background jobs on Windows."

        ### The daemon might exist and be paused.
        if self.status == 'paused':
            return self.resume()

        self._remove_stop_file()
        if self.status == 'running':
            return True, f"Daemon '{self}' is already running."
423 424 self.mkdir_if_not_exists(allow_dirty_run) 425 _write_pickle_success_tuple = self.write_pickle() 426 if not _write_pickle_success_tuple[0]: 427 return _write_pickle_success_tuple 428 429 _launch_daemon_code = ( 430 "from meerschaum.utils.daemon import Daemon; " 431 + f"daemon = Daemon(daemon_id='{self.daemon_id}'); " 432 + f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, " 433 + "allow_dirty_run=True)" 434 ) 435 env = dict(os.environ) 436 env['MRSM_NOASK'] = 'true' 437 _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env) 438 msg = ( 439 "Success" 440 if _launch_success_bool 441 else f"Failed to start daemon '{self.daemon_id}'." 442 ) 443 return _launch_success_bool, msg 444 445 def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple: 446 """ 447 Forcibly terminate a running daemon. 448 Sends a SIGTERM signal to the process. 449 450 Parameters 451 ---------- 452 timeout: Optional[int], default 3 453 How many seconds to wait for the process to terminate. 454 455 Returns 456 ------- 457 A SuccessTuple indicating success. 458 """ 459 if self.status != 'paused': 460 success, msg = self._send_signal(signal.SIGTERM, timeout=timeout) 461 if success: 462 self._write_stop_file('kill') 463 return success, msg 464 465 if self.status == 'stopped': 466 self._write_stop_file('kill') 467 return True, "Process has already stopped." 
468 469 process = self.process 470 try: 471 process.terminate() 472 process.kill() 473 process.wait(timeout=timeout) 474 except Exception as e: 475 return False, f"Failed to kill job {self} with exception: {e}" 476 477 if self.pid_path.exists(): 478 try: 479 self.pid_path.unlink() 480 except Exception as e: 481 pass 482 483 self._write_stop_file('kill') 484 return True, "Success" 485 486 def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple: 487 """Gracefully quit a running daemon.""" 488 if self.status == 'paused': 489 return self.kill(timeout) 490 491 signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout) 492 if signal_success: 493 self._write_stop_file('quit') 494 return signal_success, signal_msg 495 496 def pause( 497 self, 498 timeout: Union[int, float, None] = None, 499 check_timeout_interval: Union[float, int, None] = None, 500 ) -> SuccessTuple: 501 """ 502 Pause the daemon if it is running. 503 504 Parameters 505 ---------- 506 timeout: Union[float, int, None], default None 507 The maximum number of seconds to wait for a process to suspend. 508 509 check_timeout_interval: Union[float, int, None], default None 510 The number of seconds to wait between checking if the process is still running. 511 512 Returns 513 ------- 514 A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended. 515 """ 516 if self.process is None: 517 return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused." 518 519 if self.status == 'paused': 520 return True, f"Daemon '{self.daemon_id}' is already paused." 
521 522 self._write_stop_file('pause') 523 try: 524 self.process.suspend() 525 except Exception as e: 526 return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}" 527 528 timeout = self.get_timeout_seconds(timeout) 529 check_timeout_interval = self.get_check_timeout_interval_seconds( 530 check_timeout_interval 531 ) 532 533 psutil = attempt_import('psutil') 534 535 if not timeout: 536 try: 537 success = self.process.status() == 'stopped' 538 except psutil.NoSuchProcess as e: 539 success = True 540 msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'." 541 if success: 542 self._capture_process_timestamp('paused') 543 return success, msg 544 545 begin = time.perf_counter() 546 while (time.perf_counter() - begin) < timeout: 547 try: 548 if self.process.status() == 'stopped': 549 self._capture_process_timestamp('paused') 550 return True, "Success" 551 except psutil.NoSuchProcess as e: 552 return False, f"Process exited unexpectedly. Was it killed?\n{e}" 553 time.sleep(check_timeout_interval) 554 555 return False, ( 556 f"Failed to pause daemon '{self.daemon_id}' within {timeout} second" 557 + ('s' if timeout != 1 else '') + '.' 558 ) 559 560 def resume( 561 self, 562 timeout: Union[int, float, None] = None, 563 check_timeout_interval: Union[float, int, None] = None, 564 ) -> SuccessTuple: 565 """ 566 Resume the daemon if it is paused. 567 568 Parameters 569 ---------- 570 timeout: Union[float, int, None], default None 571 The maximum number of seconds to wait for a process to resume. 572 573 check_timeout_interval: Union[float, int, None], default None 574 The number of seconds to wait between checking if the process is still stopped. 575 576 Returns 577 ------- 578 A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed. 579 """ 580 if self.status == 'running': 581 return True, f"Daemon '{self.daemon_id}' is already running." 
582 583 if self.status == 'stopped': 584 return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed." 585 586 self._remove_stop_file() 587 try: 588 self.process.resume() 589 except Exception as e: 590 return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}" 591 592 timeout = self.get_timeout_seconds(timeout) 593 check_timeout_interval = self.get_check_timeout_interval_seconds( 594 check_timeout_interval 595 ) 596 597 if not timeout: 598 success = self.status == 'running' 599 msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'." 600 if success: 601 self._capture_process_timestamp('began') 602 return success, msg 603 604 begin = time.perf_counter() 605 while (time.perf_counter() - begin) < timeout: 606 if self.status == 'running': 607 self._capture_process_timestamp('began') 608 return True, "Success" 609 time.sleep(check_timeout_interval) 610 611 return False, ( 612 f"Failed to resume daemon '{self.daemon_id}' within {timeout} second" 613 + ('s' if timeout != 1 else '') + '.' 614 ) 615 616 def _write_stop_file(self, action: str) -> SuccessTuple: 617 """Write the stop file timestamp and action.""" 618 if action not in ('quit', 'kill', 'pause'): 619 return False, f"Unsupported action '{action}'." 620 621 if not self.stop_path.parent.exists(): 622 self.stop_path.parent.mkdir(parents=True, exist_ok=True) 623 624 with open(self.stop_path, 'w+', encoding='utf-8') as f: 625 json.dump( 626 { 627 'stop_time': datetime.now(timezone.utc).isoformat(), 628 'action': action, 629 }, 630 f 631 ) 632 633 return True, "Success" 634 635 def _remove_stop_file(self) -> SuccessTuple: 636 """Remove the stop file""" 637 if not self.stop_path.exists(): 638 return True, "Stop file does not exist." 
639 640 try: 641 self.stop_path.unlink() 642 except Exception as e: 643 return False, f"Failed to remove stop file:\n{e}" 644 645 return True, "Success" 646 647 def _read_stop_file(self) -> Dict[str, Any]: 648 """ 649 Read the stop file if it exists. 650 """ 651 if not self.stop_path.exists(): 652 return {} 653 654 try: 655 with open(self.stop_path, 'r', encoding='utf-8') as f: 656 data = json.load(f) 657 return data 658 except Exception: 659 return {} 660 661 def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None: 662 """ 663 Handle `SIGTERM` within the `Daemon` context. 664 This method is injected into the `DaemonContext`. 665 """ 666 from meerschaum.utils.process import signal_handler 667 signal_handler(signal_number, stack_frame) 668 669 timer = self.__dict__.get('_log_refresh_timer', None) 670 if timer is not None: 671 timer.cancel() 672 673 daemon_context = self.__dict__.get('_daemon_context', None) 674 if daemon_context is not None: 675 daemon_context.close() 676 677 _close_pools() 678 raise SystemExit(0) 679 680 def _send_signal( 681 self, 682 signal_to_send, 683 timeout: Union[float, int, None] = None, 684 check_timeout_interval: Union[float, int, None] = None, 685 ) -> SuccessTuple: 686 """Send a signal to the daemon process. 687 688 Parameters 689 ---------- 690 signal_to_send: 691 The signal the send to the daemon, e.g. `signals.SIGINT`. 692 693 timeout: Union[float, int, None], default None 694 The maximum number of seconds to wait for a process to terminate. 695 696 check_timeout_interval: Union[float, int, None], default None 697 The number of seconds to wait between checking if the process is still running. 698 699 Returns 700 ------- 701 A SuccessTuple indicating success. 702 """ 703 try: 704 pid = self.pid 705 if pid is None: 706 return ( 707 False, 708 f"Daemon '{self.daemon_id}' is not running, " 709 + f"cannot send signal '{signal_to_send}'." 
710 ) 711 712 os.kill(pid, signal_to_send) 713 except Exception as e: 714 return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}" 715 716 timeout = self.get_timeout_seconds(timeout) 717 check_timeout_interval = self.get_check_timeout_interval_seconds( 718 check_timeout_interval 719 ) 720 721 if not timeout: 722 return True, f"Successfully sent '{signal}' to daemon '{self.daemon_id}'." 723 724 begin = time.perf_counter() 725 while (time.perf_counter() - begin) < timeout: 726 if not self.status == 'running': 727 return True, "Success" 728 time.sleep(check_timeout_interval) 729 730 return False, ( 731 f"Failed to stop daemon '{self.daemon_id}' within {timeout} second" 732 + ('s' if timeout != 1 else '') + '.' 733 ) 734 735 def mkdir_if_not_exists(self, allow_dirty_run: bool = False): 736 """Create the Daemon's directory. 737 If `allow_dirty_run` is `False` and the directory already exists, 738 raise a `FileExistsError`. 739 """ 740 try: 741 self.path.mkdir(parents=True, exist_ok=True) 742 _already_exists = any(os.scandir(self.path)) 743 except FileExistsError: 744 _already_exists = True 745 746 if _already_exists and not allow_dirty_run: 747 error( 748 f"Daemon '{self.daemon_id}' already exists. " + 749 f"To allow this daemon to run, do one of the following:\n" 750 + " - Execute `daemon.cleanup()`.\n" 751 + f" - Delete the directory '{self.path}'.\n" 752 + " - Pass `allow_dirty_run=True` to `daemon.run()`.\n", 753 FileExistsError, 754 ) 755 756 @property 757 def process(self) -> Union['psutil.Process', None]: 758 """ 759 Return the psutil process for the Daemon. 
        """
        psutil = attempt_import('psutil')
        pid = self.pid
        if pid is None:
            return None
        # Rebuild the cached handle if the PID on disk has changed.
        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
            try:
                self._process = psutil.Process(int(pid))
            except Exception as e:
                # Stale PID file: the process no longer exists.
                if self.pid_path.exists():
                    self.pid_path.unlink()
                return None
        return self._process

    @property
    def status(self) -> str:
        """
        Return the running status of this Daemon:
        one of 'running', 'paused', or 'stopped'.
        """
        if self.process is None:
            return 'stopped'

        psutil = attempt_import('psutil')
        try:
            # psutil reports a suspended (SIGSTOP'd) process as 'stopped'.
            if self.process.status() == 'stopped':
                return 'paused'
            if self.process.status() == 'zombie':
                raise psutil.NoSuchProcess(self.process.pid)
        except (psutil.NoSuchProcess, AttributeError):
            if self.pid_path.exists():
                try:
                    self.pid_path.unlink()
                except Exception as e:
                    pass
            return 'stopped'

        return 'running'

    @classmethod
    def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
        """
        Return a Daemon's path from its `daemon_id`.
        """
        return DAEMON_RESOURCES_PATH / daemon_id

    @property
    def path(self) -> pathlib.Path:
        """
        Return the path for this Daemon's directory.
        """
        return self._get_path_from_daemon_id(self.daemon_id)

    @classmethod
    def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
        """
        Return the `properties.json` path for a given `daemon_id`.
        """
        return cls._get_path_from_daemon_id(daemon_id) / 'properties.json'

    @property
    def properties_path(self) -> pathlib.Path:
        """
        Return the `properties.json` path for this Daemon.
        """
        return self._get_properties_path_from_daemon_id(self.daemon_id)

    @property
    def stop_path(self) -> pathlib.Path:
        """
        Return the path for the stop file (created when manually stopped).
        """
        return self.path / '.stop.json'

    @property
    def log_path(self) -> pathlib.Path:
        """
        Return the log path.
        """
        return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')

    @property
    def stdin_file_path(self) -> pathlib.Path:
        """
        Return the stdin file path.
        """
        return self.path / 'input.stdin'

    @property
    def blocking_stdin_file_path(self) -> pathlib.Path:
        """
        Return the blocking stdin (lock) file path.
        """
        if '_blocking_stdin_file_path' in self.__dict__:
            return self._blocking_stdin_file_path

        return self.path / 'input.stdin.block'

    @property
    def log_offset_path(self) -> pathlib.Path:
        """
        Return the log offset file path.
        """
        return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')

    @property
    def rotating_log(self) -> RotatingFile:
        """
        The rotating log file for the daemon's output (cached per instance).
        """
        if '_rotating_log' in self.__dict__:
            return self._rotating_log

        # Per-daemon properties override the configured timestamp setting.
        write_timestamps = (
            self.properties.get('logs', {}).get('write_timestamps', None)
        )
        if write_timestamps is None:
            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')

        self._rotating_log = RotatingFile(
            self.log_path,
            redirect_streams=True,
            write_timestamps=write_timestamps,
            timestamp_format=get_config('jobs', 'logs', 'timestamps', 'format'),
        )
        return self._rotating_log

    @property
    def stdin_file(self):
        """
        Return the file handler for the stdin file (cached per instance).
        """
        if '_stdin_file' in self.__dict__:
            return self._stdin_file

        self._stdin_file = StdinFile(
            self.stdin_file_path,
            lock_file_path=self.blocking_stdin_file_path,
        )
        return self._stdin_file

    @property
    def log_text(self) -> Optional[str]:
        """
        Read the log files and return their contents.
        Returns `None` if the log file does not exist.
        """
        # Use a fresh RotatingFile so reading does not disturb the write cursor.
        new_rotating_log = RotatingFile(
            self.rotating_log.file_path,
            num_files_to_keep = self.rotating_log.num_files_to_keep,
            max_file_size = self.rotating_log.max_file_size,
            write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled'),
            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format'),
        )
        return new_rotating_log.read()

    def readlines(self) -> List[str]:
        """
        Read the next log lines, persisting the cursor for later use.
        Note this will alter the cursor of `self.rotating_log`.
        """
        self.rotating_log._cursor = self._read_log_offset()
        lines = self.rotating_log.readlines()
        self._write_log_offset()
        return lines

    def _read_log_offset(self) -> Tuple[int, int]:
        """
        Return the current log offset cursor.

        Returns
        -------
        A tuple of the form (`subfile_index`, `position`).
        Defaults to (0, 0) when no offset file exists.
        """
        if not self.log_offset_path.exists():
            return 0, 0

        # Offset file format: "<subfile_index> <position>".
        with open(self.log_offset_path, 'r', encoding='utf-8') as f:
            cursor_text = f.read()
        cursor_parts = cursor_text.split(' ')
        subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1])
        return subfile_index, subfile_position

    def _write_log_offset(self) -> None:
        """
        Write the current log offset file.
        """
        with open(self.log_offset_path, 'w+', encoding='utf-8') as f:
            subfile_index = self.rotating_log._cursor[0]
            subfile_position = self.rotating_log._cursor[1]
            f.write(f"{subfile_index} {subfile_position}")

    @property
    def pid(self) -> Union[int, None]:
        """
        Read the PID file and return its contents.
        Returns `None` if the PID file does not exist or is empty.
        """
        if not self.pid_path.exists():
            return None
        try:
            with open(self.pid_path, 'r', encoding='utf-8') as f:
                text = f.read()
            if len(text) == 0:
                return None
            pid = int(text.rstrip())
        except Exception as e:
            warn(e)
            text = None
            pid = None
        return pid

    @property
    def pid_path(self) -> pathlib.Path:
        """
        Return the path to a file containing the PID for this Daemon.
        """
        return self.path / 'process.pid'

    @property
    def pid_lock(self) -> 'fasteners.InterProcessLock':
        """
        Return the process lock context manager (cached per instance).
        """
        if '_pid_lock' in self.__dict__:
            return self._pid_lock

        fasteners = attempt_import('fasteners')
        self._pid_lock = fasteners.InterProcessLock(self.pid_path)
        return self._pid_lock

    @property
    def pickle_path(self) -> pathlib.Path:
        """
        Return the path for the pickle file.
        """
        return self.path / 'pickle.pkl'

    def read_properties(self) -> Optional[Dict[str, Any]]:
        """Read the properties JSON file and return the dictionary.
        Returns `None` if the file does not exist, or `{}` if it cannot be parsed.
        """
        if not self.properties_path.exists():
            return None
        try:
            with open(self.properties_path, 'r', encoding='utf-8') as file:
                properties = json.load(file)
        except Exception as e:
            properties = {}

        return properties

    def read_pickle(self) -> Daemon:
        """Read a Daemon's pickle file and return the `Daemon`."""
        import pickle, traceback
        if not self.pickle_path.exists():
            error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")

        if self.pickle_path.stat().st_size == 0:
            error(f"Pickle was empty for daemon '{self.daemon_id}'.")

        try:
            with open(self.pickle_path, 'rb') as pickle_file:
                daemon = pickle.load(pickle_file)
            success, msg = True, 'Success'
        except Exception as e:
            success, msg = False, str(e)
            daemon = None
            traceback.print_exception(type(e), e, e.__traceback__)
if not success: 1027 error(msg) 1028 return daemon 1029 1030 @property 1031 def properties(self) -> Dict[str, Any]: 1032 """ 1033 Return the contents of the properties JSON file. 1034 """ 1035 try: 1036 _file_properties = self.read_properties() 1037 except Exception: 1038 traceback.print_exc() 1039 _file_properties = {} 1040 1041 if not self._properties: 1042 self._properties = _file_properties 1043 1044 if self._properties is None: 1045 self._properties = {} 1046 1047 if _file_properties is not None: 1048 self._properties = apply_patch_to_config( 1049 _file_properties, 1050 self._properties, 1051 ) 1052 1053 return self._properties 1054 1055 @property 1056 def hidden(self) -> bool: 1057 """ 1058 Return a bool indicating whether this Daemon should be displayed. 1059 """ 1060 return self.daemon_id.startswith('_') or self.daemon_id.startswith('.') 1061 1062 def write_properties(self) -> SuccessTuple: 1063 """Write the properties dictionary to the properties JSON file 1064 (only if self.properties exists). 1065 """ 1066 success, msg = ( 1067 False, 1068 f"No properties to write for daemon '{self.daemon_id}'." 
1069 ) 1070 if self.properties is not None: 1071 try: 1072 self.path.mkdir(parents=True, exist_ok=True) 1073 with open(self.properties_path, 'w+', encoding='utf-8') as properties_file: 1074 json.dump(self.properties, properties_file) 1075 success, msg = True, 'Success' 1076 except Exception as e: 1077 success, msg = False, str(e) 1078 return success, msg 1079 1080 def write_pickle(self) -> SuccessTuple: 1081 """Write the pickle file for the daemon.""" 1082 import pickle, traceback 1083 try: 1084 self.path.mkdir(parents=True, exist_ok=True) 1085 with open(self.pickle_path, 'wb+') as pickle_file: 1086 pickle.dump(self, pickle_file) 1087 success, msg = True, "Success" 1088 except Exception as e: 1089 success, msg = False, str(e) 1090 traceback.print_exception(type(e), e, e.__traceback__) 1091 return success, msg 1092 1093 1094 def _setup( 1095 self, 1096 allow_dirty_run: bool = False, 1097 ) -> None: 1098 """ 1099 Update properties before starting the Daemon. 1100 """ 1101 if self.properties is None: 1102 self._properties = {} 1103 1104 self._properties.update({ 1105 'target': { 1106 'name': self.target.__name__, 1107 'module': self.target.__module__, 1108 'args': self.target_args, 1109 'kw': self.target_kw, 1110 }, 1111 }) 1112 self.mkdir_if_not_exists(allow_dirty_run) 1113 _write_properties_success_tuple = self.write_properties() 1114 if not _write_properties_success_tuple[0]: 1115 error(_write_properties_success_tuple[1]) 1116 1117 _write_pickle_success_tuple = self.write_pickle() 1118 if not _write_pickle_success_tuple[0]: 1119 error(_write_pickle_success_tuple[1]) 1120 1121 def cleanup(self, keep_logs: bool = False) -> SuccessTuple: 1122 """ 1123 Remove a daemon's directory after execution. 1124 1125 Parameters 1126 ---------- 1127 keep_logs: bool, default False 1128 If `True`, skip deleting the daemon's log files. 1129 1130 Returns 1131 ------- 1132 A `SuccessTuple` indicating success. 
1133 """ 1134 if self.path.exists(): 1135 try: 1136 shutil.rmtree(self.path) 1137 except Exception as e: 1138 msg = f"Failed to clean up '{self.daemon_id}':\n{e}" 1139 warn(msg) 1140 return False, msg 1141 if not keep_logs: 1142 self.rotating_log.delete() 1143 try: 1144 if self.log_offset_path.exists(): 1145 self.log_offset_path.unlink() 1146 except Exception as e: 1147 msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}" 1148 warn(msg) 1149 return False, msg 1150 return True, "Success" 1151 1152 1153 def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]: 1154 """ 1155 Return the timeout value to use. Use `--timeout-seconds` if provided, 1156 else the configured default (8). 1157 """ 1158 if isinstance(timeout, (int, float)): 1159 return timeout 1160 return get_config('jobs', 'timeout_seconds') 1161 1162 1163 def get_check_timeout_interval_seconds( 1164 self, 1165 check_timeout_interval: Union[int, float, None] = None, 1166 ) -> Union[int, float]: 1167 """ 1168 Return the interval value to check the status of timeouts. 1169 """ 1170 if isinstance(check_timeout_interval, (int, float)): 1171 return check_timeout_interval 1172 return get_config('jobs', 'check_timeout_interval_seconds') 1173 1174 @property 1175 def target_args(self) -> Union[Tuple[Any], None]: 1176 """ 1177 Return the positional arguments to pass to the target function. 1178 """ 1179 target_args = ( 1180 self.__dict__.get('_target_args', None) 1181 or self.properties.get('target', {}).get('args', None) 1182 ) 1183 if target_args is None: 1184 return tuple([]) 1185 1186 return tuple(target_args) 1187 1188 @property 1189 def target_kw(self) -> Union[Dict[str, Any], None]: 1190 """ 1191 Return the keyword arguments to pass to the target function. 
1192 """ 1193 target_kw = ( 1194 self.__dict__.get('_target_kw', None) 1195 or self.properties.get('target', {}).get('kw', None) 1196 ) 1197 if target_kw is None: 1198 return {} 1199 1200 return {key: val for key, val in target_kw.items()} 1201 1202 def __getstate__(self): 1203 """ 1204 Pickle this Daemon. 1205 """ 1206 dill = attempt_import('dill') 1207 return { 1208 'target': dill.dumps(self.target), 1209 'target_args': self.target_args, 1210 'target_kw': self.target_kw, 1211 'daemon_id': self.daemon_id, 1212 'label': self.label, 1213 'properties': self.properties, 1214 } 1215 1216 def __setstate__(self, _state: Dict[str, Any]): 1217 """ 1218 Restore this Daemon from a pickled state. 1219 If the properties file exists, skip the old pickled version. 1220 """ 1221 dill = attempt_import('dill') 1222 _state['target'] = dill.loads(_state['target']) 1223 self._pickle = True 1224 daemon_id = _state.get('daemon_id', None) 1225 if not daemon_id: 1226 raise ValueError("Need a daemon_id to un-pickle a Daemon.") 1227 1228 properties_path = self._get_properties_path_from_daemon_id(daemon_id) 1229 ignore_properties = properties_path.exists() 1230 if ignore_properties: 1231 _state = { 1232 key: val 1233 for key, val in _state.items() 1234 if key != 'properties' 1235 } 1236 self.__init__(**_state) 1237 1238 1239 def __repr__(self): 1240 return str(self) 1241 1242 def __str__(self): 1243 return self.daemon_id 1244 1245 def __eq__(self, other): 1246 if not isinstance(other, Daemon): 1247 return False 1248 return self.daemon_id == other.daemon_id 1249 1250 def __hash__(self): 1251 return hash(self.daemon_id)
Daemonize Python functions into background processes.
Examples
>>> import meerschaum as mrsm
>>> from meerschaum.utils.daemons import Daemon
>>> daemon = Daemon(print, ('hi',))
>>> success, msg = daemon.run()
>>> print(daemon.log_text)
2024-07-29 18:03 | hi 2024-07-29 18:03 |
>>> daemon.run(allow_dirty_run=True)
>>> print(daemon.log_text)
2024-07-29 18:03 | hi 2024-07-29 18:03 | 2024-07-29 18:05 | hi 2024-07-29 18:05 |
>>> mrsm.pprint(daemon.properties)
{
'label': 'print',
'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
'result': None,
'process': {'ended': '2024-07-29T18:03:33.752806'}
}
    def __init__(
        self,
        target: Optional[Callable[[Any], Any]] = None,
        target_args: Union[List[Any], Tuple[Any], None] = None,
        target_kw: Optional[Dict[str, Any]] = None,
        env: Optional[Dict[str, str]] = None,
        daemon_id: Optional[str] = None,
        label: Optional[str] = None,
        properties: Optional[Dict[str, Any]] = None,
    ):
        """
        Parameters
        ----------
        target: Optional[Callable[[Any], Any]], default None,
            The function to execute in a child process.

        target_args: Union[List[Any], Tuple[Any], None], default None
            Positional arguments to pass to the target function.

        target_kw: Optional[Dict[str, Any]], default None
            Keyword arguments to pass to the target function.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the daemon process.

        daemon_id: Optional[str], default None
            Build a `Daemon` from an existing `daemon_id`.
            If `daemon_id` is provided, other arguments are ignored and are derived
            from the existing pickled `Daemon`.

        label: Optional[str], default None
            Label string to help identify a daemon.
            If `None`, use the function name instead.

        properties: Optional[Dict[str, Any]], default None
            Override reading from the properties JSON by providing an existing dictionary.
        """
        ### `_pickle` is set by `__setstate__` before it re-invokes `__init__`.
        _pickle = self.__dict__.get('_pickle', False)
        if daemon_id is not None:
            self.daemon_id = daemon_id
            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):

                if not self.properties_path.exists():
                    raise Exception(
                        f"Daemon '{self.daemon_id}' does not exist. "
                        + "Pass a target to create a new Daemon."
                    )

                ### No pickle but a properties file exists:
                ### try to rebuild the daemon from the properties file.
                try:
                    new_daemon = self.from_properties_file(daemon_id)
                except Exception:
                    new_daemon = None

                if new_daemon is not None:
                    new_daemon.write_pickle()
                    target = new_daemon.target
                    target_args = new_daemon.target_args
                    target_kw = new_daemon.target_kw
                    label = new_daemon.label
                    self._properties = new_daemon.properties
                else:
                    ### Recovery failed: best-effort removal of the unusable properties file.
                    try:
                        self.properties_path.unlink()
                    except Exception:
                        pass

                    raise Exception(
                        f"Could not recover daemon '{self.daemon_id}' "
                        + "from its properties file."
                    )

        if 'target' not in self.__dict__:
            if target is None:
                error("Cannot create a Daemon without a target.")
            self.target = target

        ### NOTE: We have to check self.__dict__ in case we un-pickling.
        if '_target_args' not in self.__dict__:
            self._target_args = target_args
        if '_target_kw' not in self.__dict__:
            self._target_kw = target_kw

        if 'label' not in self.__dict__:
            if label is None:
                label = (
                    self.target.__name__ if '__name__' in self.target.__dir__()
                    else str(self.target)
                )
            self.label = label
        if 'daemon_id' not in self.__dict__:
            self.daemon_id = get_new_daemon_name()
        if '_properties' not in self.__dict__:
            self._properties = properties
        if self._properties is None:
            self._properties = {}

        self._properties.update({'label': self.label})
        if env:
            self._properties.update({'env': env})

        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
        _ = self.process
Parameters
- target (Optional[Callable[[Any], Any]], default None,): The function to execute in a child process.
- target_args (Union[List[Any], Tuple[Any], None], default None): Positional arguments to pass to the target function.
- target_kw (Optional[Dict[str, Any]], default None): Keyword arguments to pass to the target function.
- env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the daemon process.
- daemon_id (Optional[str], default None): Build a `Daemon` from an existing `daemon_id`. If `daemon_id` is provided, other arguments are ignored and are derived from the existing pickled `Daemon`.
- label (Optional[str], default None): Label string to help identify a daemon. If `None`, use the function name instead.
- properties (Optional[Dict[str, Any]], default None): Override reading from the properties JSON by providing an existing dictionary.
93 @classmethod 94 def from_properties_file(cls, daemon_id: str) -> Daemon: 95 """ 96 Return a Daemon from a properties dictionary. 97 """ 98 properties_path = cls._get_properties_path_from_daemon_id(daemon_id) 99 if not properties_path.exists(): 100 raise OSError(f"Properties file '{properties_path}' does not exist.") 101 102 try: 103 with open(properties_path, 'r', encoding='utf-8') as f: 104 properties = json.load(f) 105 except Exception: 106 properties = {} 107 108 if not properties: 109 raise ValueError(f"No properties could be read for daemon '{daemon_id}'.") 110 111 daemon_id = properties_path.parent.name 112 target_cf = properties.get('target', {}) 113 target_module_name = target_cf.get('module', None) 114 target_function_name = target_cf.get('name', None) 115 target_args = target_cf.get('args', None) 116 target_kw = target_cf.get('kw', None) 117 label = properties.get('label', None) 118 119 if None in [ 120 target_module_name, 121 target_function_name, 122 target_args, 123 target_kw, 124 ]: 125 raise ValueError("Missing target function information.") 126 127 target_module = importlib.import_module(target_module_name) 128 target_function = getattr(target_module, target_function_name) 129 130 return Daemon( 131 daemon_id=daemon_id, 132 target=target_function, 133 target_args=target_args, 134 target_kw=target_kw, 135 properties=properties, 136 label=label, 137 )
Return a Daemon from a properties dictionary.
389 def run( 390 self, 391 keep_daemon_output: bool = True, 392 allow_dirty_run: bool = False, 393 debug: bool = False, 394 ) -> SuccessTuple: 395 """Run the daemon as a child process and continue executing the parent. 396 397 Parameters 398 ---------- 399 keep_daemon_output: bool, default True 400 If `False`, delete the daemon's output directory upon exiting. 401 402 allow_dirty_run: bool, default False 403 If `True`, run the daemon, even if the `daemon_id` directory exists. 404 This option is dangerous because if the same `daemon_id` runs concurrently, 405 the last to finish will overwrite the output of the first. 406 407 Returns 408 ------- 409 A SuccessTuple indicating success. 410 411 """ 412 import platform 413 if platform.system() == 'Windows': 414 return False, "Cannot run background jobs on Windows." 415 416 ### The daemon might exist and be paused. 417 if self.status == 'paused': 418 return self.resume() 419 420 self._remove_stop_file() 421 if self.status == 'running': 422 return True, f"Daemon '{self}' is already running." 423 424 self.mkdir_if_not_exists(allow_dirty_run) 425 _write_pickle_success_tuple = self.write_pickle() 426 if not _write_pickle_success_tuple[0]: 427 return _write_pickle_success_tuple 428 429 _launch_daemon_code = ( 430 "from meerschaum.utils.daemon import Daemon; " 431 + f"daemon = Daemon(daemon_id='{self.daemon_id}'); " 432 + f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, " 433 + "allow_dirty_run=True)" 434 ) 435 env = dict(os.environ) 436 env['MRSM_NOASK'] = 'true' 437 _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env) 438 msg = ( 439 "Success" 440 if _launch_success_bool 441 else f"Failed to start daemon '{self.daemon_id}'." 442 ) 443 return _launch_success_bool, msg
Run the daemon as a child process and continue executing the parent.
Parameters
- keep_daemon_output (bool, default True): If `False`, delete the daemon's output directory upon exiting.
- allow_dirty_run (bool, default False): If `True`, run the daemon, even if the `daemon_id` directory exists. This option is dangerous because if the same `daemon_id` runs concurrently, the last to finish will overwrite the output of the first.
Returns
- A SuccessTuple indicating success.
445 def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple: 446 """ 447 Forcibly terminate a running daemon. 448 Sends a SIGTERM signal to the process. 449 450 Parameters 451 ---------- 452 timeout: Optional[int], default 3 453 How many seconds to wait for the process to terminate. 454 455 Returns 456 ------- 457 A SuccessTuple indicating success. 458 """ 459 if self.status != 'paused': 460 success, msg = self._send_signal(signal.SIGTERM, timeout=timeout) 461 if success: 462 self._write_stop_file('kill') 463 return success, msg 464 465 if self.status == 'stopped': 466 self._write_stop_file('kill') 467 return True, "Process has already stopped." 468 469 process = self.process 470 try: 471 process.terminate() 472 process.kill() 473 process.wait(timeout=timeout) 474 except Exception as e: 475 return False, f"Failed to kill job {self} with exception: {e}" 476 477 if self.pid_path.exists(): 478 try: 479 self.pid_path.unlink() 480 except Exception as e: 481 pass 482 483 self._write_stop_file('kill') 484 return True, "Success"
Forcibly terminate a running daemon. Sends a SIGTERM signal to the process.
Parameters
- timeout (Union[int, float, None], default 8): How many seconds to wait for the process to terminate.
Returns
- A SuccessTuple indicating success.
486 def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple: 487 """Gracefully quit a running daemon.""" 488 if self.status == 'paused': 489 return self.kill(timeout) 490 491 signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout) 492 if signal_success: 493 self._write_stop_file('quit') 494 return signal_success, signal_msg
Gracefully quit a running daemon.
    def pause(
        self,
        timeout: Union[int, float, None] = None,
        check_timeout_interval: Union[float, int, None] = None,
    ) -> SuccessTuple:
        """
        Pause the daemon if it is running.

        Parameters
        ----------
        timeout: Union[float, int, None], default None
            The maximum number of seconds to wait for a process to suspend.

        check_timeout_interval: Union[float, int, None], default None
            The number of seconds to wait between checking if the process is still running.

        Returns
        -------
        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
        """
        if self.process is None:
            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."

        if self.status == 'paused':
            return True, f"Daemon '{self.daemon_id}' is already paused."

        self._write_stop_file('pause')
        try:
            self.process.suspend()
        except Exception as e:
            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"

        timeout = self.get_timeout_seconds(timeout)
        check_timeout_interval = self.get_check_timeout_interval_seconds(
            check_timeout_interval
        )

        psutil = attempt_import('psutil')

        ### No timeout: check once and return immediately.
        if not timeout:
            try:
                success = self.process.status() == 'stopped'
            except psutil.NoSuchProcess as e:
                success = True
            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
            if success:
                self._capture_process_timestamp('paused')
            return success, msg

        ### Poll until the process reports 'stopped' or the timeout elapses.
        begin = time.perf_counter()
        while (time.perf_counter() - begin) < timeout:
            try:
                if self.process.status() == 'stopped':
                    self._capture_process_timestamp('paused')
                    return True, "Success"
            except psutil.NoSuchProcess as e:
                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
            time.sleep(check_timeout_interval)

        return False, (
            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
            + ('s' if timeout != 1 else '') + '.'
        )
Pause the daemon if it is running.
Parameters
- timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to suspend.
- check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still running.
Returns
- A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
560 def resume( 561 self, 562 timeout: Union[int, float, None] = None, 563 check_timeout_interval: Union[float, int, None] = None, 564 ) -> SuccessTuple: 565 """ 566 Resume the daemon if it is paused. 567 568 Parameters 569 ---------- 570 timeout: Union[float, int, None], default None 571 The maximum number of seconds to wait for a process to resume. 572 573 check_timeout_interval: Union[float, int, None], default None 574 The number of seconds to wait between checking if the process is still stopped. 575 576 Returns 577 ------- 578 A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed. 579 """ 580 if self.status == 'running': 581 return True, f"Daemon '{self.daemon_id}' is already running." 582 583 if self.status == 'stopped': 584 return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed." 585 586 self._remove_stop_file() 587 try: 588 self.process.resume() 589 except Exception as e: 590 return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}" 591 592 timeout = self.get_timeout_seconds(timeout) 593 check_timeout_interval = self.get_check_timeout_interval_seconds( 594 check_timeout_interval 595 ) 596 597 if not timeout: 598 success = self.status == 'running' 599 msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'." 600 if success: 601 self._capture_process_timestamp('began') 602 return success, msg 603 604 begin = time.perf_counter() 605 while (time.perf_counter() - begin) < timeout: 606 if self.status == 'running': 607 self._capture_process_timestamp('began') 608 return True, "Success" 609 time.sleep(check_timeout_interval) 610 611 return False, ( 612 f"Failed to resume daemon '{self.daemon_id}' within {timeout} second" 613 + ('s' if timeout != 1 else '') + '.' 614 )
Resume the daemon if it is paused.
Parameters
- timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to resume.
- check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still stopped.
Returns
- A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
735 def mkdir_if_not_exists(self, allow_dirty_run: bool = False): 736 """Create the Daemon's directory. 737 If `allow_dirty_run` is `False` and the directory already exists, 738 raise a `FileExistsError`. 739 """ 740 try: 741 self.path.mkdir(parents=True, exist_ok=True) 742 _already_exists = any(os.scandir(self.path)) 743 except FileExistsError: 744 _already_exists = True 745 746 if _already_exists and not allow_dirty_run: 747 error( 748 f"Daemon '{self.daemon_id}' already exists. " + 749 f"To allow this daemon to run, do one of the following:\n" 750 + " - Execute `daemon.cleanup()`.\n" 751 + f" - Delete the directory '{self.path}'.\n" 752 + " - Pass `allow_dirty_run=True` to `daemon.run()`.\n", 753 FileExistsError, 754 )
Create the Daemon's directory.
If `allow_dirty_run` is `False` and the directory already exists, raise a `FileExistsError`.
756 @property 757 def process(self) -> Union['psutil.Process', None]: 758 """ 759 Return the psutil process for the Daemon. 760 """ 761 psutil = attempt_import('psutil') 762 pid = self.pid 763 if pid is None: 764 return None 765 if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid): 766 try: 767 self._process = psutil.Process(int(pid)) 768 except Exception as e: 769 if self.pid_path.exists(): 770 self.pid_path.unlink() 771 return None 772 return self._process
Return the psutil process for the Daemon.
774 @property 775 def status(self) -> str: 776 """ 777 Return the running status of this Daemon. 778 """ 779 if self.process is None: 780 return 'stopped' 781 782 psutil = attempt_import('psutil') 783 try: 784 if self.process.status() == 'stopped': 785 return 'paused' 786 if self.process.status() == 'zombie': 787 raise psutil.NoSuchProcess(self.process.pid) 788 except (psutil.NoSuchProcess, AttributeError): 789 if self.pid_path.exists(): 790 try: 791 self.pid_path.unlink() 792 except Exception as e: 793 pass 794 return 'stopped' 795 796 return 'running'
Return the running status of this Daemon.
805 @property 806 def path(self) -> pathlib.Path: 807 """ 808 Return the path for this Daemon's directory. 809 """ 810 return self._get_path_from_daemon_id(self.daemon_id)
Return the path for this Daemon's directory.
819 @property 820 def properties_path(self) -> pathlib.Path: 821 """ 822 Return the `propterties.json` path for this Daemon. 823 """ 824 return self._get_properties_path_from_daemon_id(self.daemon_id)
Return the `properties.json` path for this Daemon.
826 @property 827 def stop_path(self) -> pathlib.Path: 828 """ 829 Return the path for the stop file (created when manually stopped). 830 """ 831 return self.path / '.stop.json'
Return the path for the stop file (created when manually stopped).
833 @property 834 def log_path(self) -> pathlib.Path: 835 """ 836 Return the log path. 837 """ 838 return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')
Return the log path.
840 @property 841 def stdin_file_path(self) -> pathlib.Path: 842 """ 843 Return the stdin file path. 844 """ 845 return self.path / 'input.stdin'
Return the stdin file path.
847 @property 848 def blocking_stdin_file_path(self) -> pathlib.Path: 849 """ 850 Return the stdin file path. 851 """ 852 if '_blocking_stdin_file_path' in self.__dict__: 853 return self._blocking_stdin_file_path 854 855 return self.path / 'input.stdin.block'
Return the stdin file path.
857 @property 858 def log_offset_path(self) -> pathlib.Path: 859 """ 860 Return the log offset file path. 861 """ 862 return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')
Return the log offset file path.
864 @property 865 def rotating_log(self) -> RotatingFile: 866 """ 867 The rotating log file for the daemon's output. 868 """ 869 if '_rotating_log' in self.__dict__: 870 return self._rotating_log 871 872 write_timestamps = ( 873 self.properties.get('logs', {}).get('write_timestamps', None) 874 ) 875 if write_timestamps is None: 876 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 877 878 self._rotating_log = RotatingFile( 879 self.log_path, 880 redirect_streams=True, 881 write_timestamps=write_timestamps, 882 timestamp_format=get_config('jobs', 'logs', 'timestamps', 'format'), 883 ) 884 return self._rotating_log
The rotating log file for the daemon's output.
886 @property 887 def stdin_file(self): 888 """ 889 Return the file handler for the stdin file. 890 """ 891 if '_stdin_file' in self.__dict__: 892 return self._stdin_file 893 894 self._stdin_file = StdinFile( 895 self.stdin_file_path, 896 lock_file_path=self.blocking_stdin_file_path, 897 ) 898 return self._stdin_file
Return the file handler for the stdin file.
900 @property 901 def log_text(self) -> Optional[str]: 902 """ 903 Read the log files and return their contents. 904 Returns `None` if the log file does not exist. 905 """ 906 new_rotating_log = RotatingFile( 907 self.rotating_log.file_path, 908 num_files_to_keep = self.rotating_log.num_files_to_keep, 909 max_file_size = self.rotating_log.max_file_size, 910 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled'), 911 timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format'), 912 ) 913 return new_rotating_log.read()
Read the log files and return their contents.
Returns `None` if the log file does not exist.
915 def readlines(self) -> List[str]: 916 """ 917 Read the next log lines, persisting the cursor for later use. 918 Note this will alter the cursor of `self.rotating_log`. 919 """ 920 self.rotating_log._cursor = self._read_log_offset() 921 lines = self.rotating_log.readlines() 922 self._write_log_offset() 923 return lines
Read the next log lines, persisting the cursor for later use.
Note this will alter the cursor of `self.rotating_log`.
951 @property 952 def pid(self) -> Union[int, None]: 953 """ 954 Read the PID file and return its contents. 955 Returns `None` if the PID file does not exist. 956 """ 957 if not self.pid_path.exists(): 958 return None 959 try: 960 with open(self.pid_path, 'r', encoding='utf-8') as f: 961 text = f.read() 962 if len(text) == 0: 963 return None 964 pid = int(text.rstrip()) 965 except Exception as e: 966 warn(e) 967 text = None 968 pid = None 969 return pid
Read the PID file and return its contents.
Returns `None` if the PID file does not exist.
971 @property 972 def pid_path(self) -> pathlib.Path: 973 """ 974 Return the path to a file containing the PID for this Daemon. 975 """ 976 return self.path / 'process.pid'
Return the path to a file containing the PID for this Daemon.
978 @property 979 def pid_lock(self) -> 'fasteners.InterProcessLock': 980 """ 981 Return the process lock context manager. 982 """ 983 if '_pid_lock' in self.__dict__: 984 return self._pid_lock 985 986 fasteners = attempt_import('fasteners') 987 self._pid_lock = fasteners.InterProcessLock(self.pid_path) 988 return self._pid_lock
Return the process lock context manager.
990 @property 991 def pickle_path(self) -> pathlib.Path: 992 """ 993 Return the path for the pickle file. 994 """ 995 return self.path / 'pickle.pkl'
Return the path for the pickle file.
997 def read_properties(self) -> Optional[Dict[str, Any]]: 998 """Read the properties JSON file and return the dictionary.""" 999 if not self.properties_path.exists(): 1000 return None 1001 try: 1002 with open(self.properties_path, 'r', encoding='utf-8') as file: 1003 properties = json.load(file) 1004 except Exception as e: 1005 properties = {} 1006 1007 return properties
Read the properties JSON file and return the dictionary.
1009 def read_pickle(self) -> Daemon: 1010 """Read a Daemon's pickle file and return the `Daemon`.""" 1011 import pickle, traceback 1012 if not self.pickle_path.exists(): 1013 error(f"Pickle file does not exist for daemon '{self.daemon_id}'.") 1014 1015 if self.pickle_path.stat().st_size == 0: 1016 error(f"Pickle was empty for daemon '{self.daemon_id}'.") 1017 1018 try: 1019 with open(self.pickle_path, 'rb') as pickle_file: 1020 daemon = pickle.load(pickle_file) 1021 success, msg = True, 'Success' 1022 except Exception as e: 1023 success, msg = False, str(e) 1024 daemon = None 1025 traceback.print_exception(type(e), e, e.__traceback__) 1026 if not success: 1027 error(msg) 1028 return daemon
Read a Daemon's pickle file and return the `Daemon`.
1030 @property 1031 def properties(self) -> Dict[str, Any]: 1032 """ 1033 Return the contents of the properties JSON file. 1034 """ 1035 try: 1036 _file_properties = self.read_properties() 1037 except Exception: 1038 traceback.print_exc() 1039 _file_properties = {} 1040 1041 if not self._properties: 1042 self._properties = _file_properties 1043 1044 if self._properties is None: 1045 self._properties = {} 1046 1047 if _file_properties is not None: 1048 self._properties = apply_patch_to_config( 1049 _file_properties, 1050 self._properties, 1051 ) 1052 1053 return self._properties
Return the contents of the properties JSON file.
1062 def write_properties(self) -> SuccessTuple: 1063 """Write the properties dictionary to the properties JSON file 1064 (only if self.properties exists). 1065 """ 1066 success, msg = ( 1067 False, 1068 f"No properties to write for daemon '{self.daemon_id}'." 1069 ) 1070 if self.properties is not None: 1071 try: 1072 self.path.mkdir(parents=True, exist_ok=True) 1073 with open(self.properties_path, 'w+', encoding='utf-8') as properties_file: 1074 json.dump(self.properties, properties_file) 1075 success, msg = True, 'Success' 1076 except Exception as e: 1077 success, msg = False, str(e) 1078 return success, msg
Write the properties dictionary to the properties JSON file (only if self.properties exists).
1080 def write_pickle(self) -> SuccessTuple: 1081 """Write the pickle file for the daemon.""" 1082 import pickle, traceback 1083 try: 1084 self.path.mkdir(parents=True, exist_ok=True) 1085 with open(self.pickle_path, 'wb+') as pickle_file: 1086 pickle.dump(self, pickle_file) 1087 success, msg = True, "Success" 1088 except Exception as e: 1089 success, msg = False, str(e) 1090 traceback.print_exception(type(e), e, e.__traceback__) 1091 return success, msg
Write the pickle file for the daemon.
1121 def cleanup(self, keep_logs: bool = False) -> SuccessTuple: 1122 """ 1123 Remove a daemon's directory after execution. 1124 1125 Parameters 1126 ---------- 1127 keep_logs: bool, default False 1128 If `True`, skip deleting the daemon's log files. 1129 1130 Returns 1131 ------- 1132 A `SuccessTuple` indicating success. 1133 """ 1134 if self.path.exists(): 1135 try: 1136 shutil.rmtree(self.path) 1137 except Exception as e: 1138 msg = f"Failed to clean up '{self.daemon_id}':\n{e}" 1139 warn(msg) 1140 return False, msg 1141 if not keep_logs: 1142 self.rotating_log.delete() 1143 try: 1144 if self.log_offset_path.exists(): 1145 self.log_offset_path.unlink() 1146 except Exception as e: 1147 msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}" 1148 warn(msg) 1149 return False, msg 1150 return True, "Success"
Remove a daemon's directory after execution.
Parameters
- keep_logs (bool, default False):
If
True
, skip deleting the daemon's log files.
Returns
- A
SuccessTuple
indicating success.
1153 def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]: 1154 """ 1155 Return the timeout value to use. Use `--timeout-seconds` if provided, 1156 else the configured default (8). 1157 """ 1158 if isinstance(timeout, (int, float)): 1159 return timeout 1160 return get_config('jobs', 'timeout_seconds')
Return the timeout value to use. Use `--timeout-seconds` if provided, else the configured default (8).
1163 def get_check_timeout_interval_seconds( 1164 self, 1165 check_timeout_interval: Union[int, float, None] = None, 1166 ) -> Union[int, float]: 1167 """ 1168 Return the interval value to check the status of timeouts. 1169 """ 1170 if isinstance(check_timeout_interval, (int, float)): 1171 return check_timeout_interval 1172 return get_config('jobs', 'check_timeout_interval_seconds')
Return the interval value to check the status of timeouts.
1174 @property 1175 def target_args(self) -> Union[Tuple[Any], None]: 1176 """ 1177 Return the positional arguments to pass to the target function. 1178 """ 1179 target_args = ( 1180 self.__dict__.get('_target_args', None) 1181 or self.properties.get('target', {}).get('args', None) 1182 ) 1183 if target_args is None: 1184 return tuple([]) 1185 1186 return tuple(target_args)
Return the positional arguments to pass to the target function.
1188 @property 1189 def target_kw(self) -> Union[Dict[str, Any], None]: 1190 """ 1191 Return the keyword arguments to pass to the target function. 1192 """ 1193 target_kw = ( 1194 self.__dict__.get('_target_kw', None) 1195 or self.properties.get('target', {}).get('kw', None) 1196 ) 1197 if target_kw is None: 1198 return {} 1199 1200 return {key: val for key, val in target_kw.items()}
Return the keyword arguments to pass to the target function.