meerschaum.utils.daemon.Daemon
Manage running daemons via the Daemon class.
1#! /usr/bin/env python3 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Manage running daemons via the Daemon class. 7""" 8 9from __future__ import annotations 10import os 11import importlib 12import pathlib 13import json 14import shutil 15import signal 16import time 17import traceback 18from functools import partial 19from datetime import datetime, timezone 20 21import meerschaum as mrsm 22from meerschaum.utils.typing import ( 23 Optional, Dict, Any, SuccessTuple, Callable, List, Union, 24 is_success_tuple, Tuple, 25) 26from meerschaum.config import get_config 27from meerschaum._internal.static import STATIC_CONFIG 28from meerschaum.config._patch import apply_patch_to_config 29from meerschaum.utils.warnings import warn, error 30from meerschaum.utils.packages import attempt_import 31from meerschaum.utils.venv import venv_exec 32from meerschaum.utils.daemon._names import get_new_daemon_name 33from meerschaum.utils.daemon.RotatingFile import RotatingFile 34from meerschaum.utils.daemon.StdinFile import StdinFile 35from meerschaum.utils.threading import RepeatTimer 36from meerschaum.__main__ import _close_pools 37 38_daemons = [] 39_results = {} 40 41class Daemon: 42 """ 43 Daemonize Python functions into background processes. 
44 45 Examples 46 -------- 47 >>> import meerschaum as mrsm 48 >>> from meerschaum.utils.daemons import Daemon 49 >>> daemon = Daemon(print, ('hi',)) 50 >>> success, msg = daemon.run() 51 >>> print(daemon.log_text) 52 53 2024-07-29 18:03 | hi 54 2024-07-29 18:03 | 55 >>> daemon.run(allow_dirty_run=True) 56 >>> print(daemon.log_text) 57 58 2024-07-29 18:03 | hi 59 2024-07-29 18:03 | 60 2024-07-29 18:05 | hi 61 2024-07-29 18:05 | 62 >>> mrsm.pprint(daemon.properties) 63 { 64 'label': 'print', 65 'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}}, 66 'result': None, 67 'process': {'ended': '2024-07-29T18:03:33.752806'} 68 } 69 70 """ 71 72 def __new__( 73 cls, 74 *args, 75 daemon_id: Optional[str] = None, 76 **kw 77 ): 78 """ 79 If a daemon_id is provided and already exists, read from its pickle file. 80 """ 81 instance = super(Daemon, cls).__new__(cls) 82 if daemon_id is not None: 83 instance.daemon_id = daemon_id 84 if instance.pickle_path.exists(): 85 instance = instance.read_pickle() 86 return instance 87 88 @classmethod 89 def from_properties_file(cls, daemon_id: str) -> Daemon: 90 """ 91 Return a Daemon from a properties dictionary. 
92 """ 93 properties_path = cls._get_properties_path_from_daemon_id(daemon_id) 94 if not properties_path.exists(): 95 raise OSError(f"Properties file '{properties_path}' does not exist.") 96 97 try: 98 with open(properties_path, 'r', encoding='utf-8') as f: 99 properties = json.load(f) 100 except Exception: 101 properties = {} 102 103 if not properties: 104 raise ValueError(f"No properties could be read for daemon '{daemon_id}'.") 105 106 daemon_id = properties_path.parent.name 107 target_cf = properties.get('target', {}) 108 target_module_name = target_cf.get('module', None) 109 target_function_name = target_cf.get('name', None) 110 target_args = target_cf.get('args', None) 111 target_kw = target_cf.get('kw', None) 112 label = properties.get('label', None) 113 114 if None in [ 115 target_module_name, 116 target_function_name, 117 target_args, 118 target_kw, 119 ]: 120 raise ValueError("Missing target function information.") 121 122 target_module = importlib.import_module(target_module_name) 123 target_function = getattr(target_module, target_function_name) 124 125 return Daemon( 126 daemon_id=daemon_id, 127 target=target_function, 128 target_args=target_args, 129 target_kw=target_kw, 130 properties=properties, 131 label=label, 132 ) 133 134 135 def __init__( 136 self, 137 target: Optional[Callable[[Any], Any]] = None, 138 target_args: Union[List[Any], Tuple[Any], None] = None, 139 target_kw: Optional[Dict[str, Any]] = None, 140 env: Optional[Dict[str, str]] = None, 141 daemon_id: Optional[str] = None, 142 label: Optional[str] = None, 143 properties: Optional[Dict[str, Any]] = None, 144 pickle: bool = True, 145 ): 146 """ 147 Parameters 148 ---------- 149 target: Optional[Callable[[Any], Any]], default None, 150 The function to execute in a child process. 151 152 target_args: Union[List[Any], Tuple[Any], None], default None 153 Positional arguments to pass to the target function. 

        target_kw: Optional[Dict[str, Any]], default None
            Keyword arguments to pass to the target function.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the daemon process.

        daemon_id: Optional[str], default None
            Build a `Daemon` from an existing `daemon_id`.
            If `daemon_id` is provided, other arguments are ignored and are derived
            from the existing pickled `Daemon`.

        label: Optional[str], default None
            Label string to help identify a daemon.
            If `None`, use the function name instead.

        properties: Optional[Dict[str, Any]], default None
            Override reading from the properties JSON by providing an existing dictionary.
        """
        _pickle = self.__dict__.get('_pickle', False)
        if daemon_id is not None:
            self.daemon_id = daemon_id
            ### No pickle and no target: attempt to rebuild this Daemon
            ### from its properties file on disk.
            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):

                if not self.properties_path.exists():
                    raise Exception(
                        f"Daemon '{self.daemon_id}' does not exist. "
                        + "Pass a target to create a new Daemon."
                    )

                try:
                    new_daemon = self.from_properties_file(daemon_id)
                except Exception:
                    new_daemon = None

                if new_daemon is not None:
                    ### Recovery succeeded: persist a fresh pickle and adopt the
                    ### recovered target + properties for this instance.
                    new_daemon.write_pickle()
                    target = new_daemon.target
                    target_args = new_daemon.target_args
                    target_kw = new_daemon.target_kw
                    label = new_daemon.label
                    self._properties = new_daemon.properties
                else:
                    ### Recovery failed: best-effort removal of the unreadable
                    ### properties file before raising.
                    try:
                        self.properties_path.unlink()
                    except Exception:
                        pass

                    raise Exception(
                        f"Could not recover daemon '{self.daemon_id}' "
                        + "from its properties file."
                    )

        if 'target' not in self.__dict__:
            if target is None:
                error("Cannot create a Daemon without a target.")
            self.target = target

        self.pickle = pickle

        ### NOTE: We have to check self.__dict__ in case we are un-pickling.
215 if '_target_args' not in self.__dict__: 216 self._target_args = target_args 217 if '_target_kw' not in self.__dict__: 218 self._target_kw = target_kw 219 220 if 'label' not in self.__dict__: 221 if label is None: 222 label = ( 223 self.target.__name__ if '__name__' in self.target.__dir__() 224 else str(self.target) 225 ) 226 self.label = label 227 elif label is not None: 228 self.label = label 229 230 if 'daemon_id' not in self.__dict__: 231 self.daemon_id = get_new_daemon_name() 232 if '_properties' not in self.__dict__: 233 self._properties = properties 234 elif properties: 235 if self._properties is None: 236 self._properties = {} 237 self._properties.update(properties) 238 if self._properties is None: 239 self._properties = {} 240 241 self._properties.update({'label': self.label}) 242 if env: 243 self._properties.update({'env': env}) 244 245 ### Instantiate the process and if it doesn't exist, make sure the PID is removed. 246 _ = self.process 247 248 249 def _run_exit( 250 self, 251 keep_daemon_output: bool = True, 252 allow_dirty_run: bool = False, 253 ) -> Any: 254 """Run the daemon's target function. 255 NOTE: This WILL EXIT the parent process! 256 257 Parameters 258 ---------- 259 keep_daemon_output: bool, default True 260 If `False`, delete the daemon's output directory upon exiting. 261 262 allow_dirty_run, bool, default False: 263 If `True`, run the daemon, even if the `daemon_id` directory exists. 264 This option is dangerous because if the same `daemon_id` runs twice, 265 the last to finish will overwrite the output of the first. 266 267 Returns 268 ------- 269 Nothing — this will exit the parent process. 
270 """ 271 import platform 272 import sys 273 import os 274 import traceback 275 from meerschaum.utils.warnings import warn 276 from meerschaum.config import get_config 277 daemon = attempt_import('daemon') 278 lines = get_config('jobs', 'terminal', 'lines') 279 columns = get_config('jobs', 'terminal', 'columns') 280 281 if platform.system() == 'Windows': 282 return False, "Windows is no longer supported." 283 284 self._setup(allow_dirty_run) 285 286 _daemons.append(self) 287 288 logs_cf = self.properties.get('logs', {}) 289 log_refresh_seconds = logs_cf.get('refresh_files_seconds', None) 290 if log_refresh_seconds is None: 291 log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds') 292 write_timestamps = logs_cf.get('write_timestamps', None) 293 if write_timestamps is None: 294 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 295 296 self._log_refresh_timer = RepeatTimer( 297 log_refresh_seconds, 298 partial(self.rotating_log.refresh_files, start_interception=write_timestamps), 299 ) 300 301 capture_stdin = logs_cf.get('stdin', True) 302 cwd = self.properties.get('cwd', os.getcwd()) 303 304 ### NOTE: The SIGINT handler has been removed so that child processes may handle 305 ### KeyboardInterrupts themselves. 306 ### The previous aggressive approach was redundant because of the SIGTERM handler. 
307 self._daemon_context = daemon.DaemonContext( 308 pidfile=self.pid_lock, 309 stdout=self.rotating_log, 310 stderr=self.rotating_log, 311 stdin=(self.stdin_file if capture_stdin else None), 312 working_directory=cwd, 313 detach_process=True, 314 files_preserve=list(self.rotating_log.subfile_objects.values()), 315 signal_map={ 316 signal.SIGTERM: self._handle_sigterm, 317 }, 318 ) 319 320 if capture_stdin and sys.stdin is None: 321 raise OSError("Cannot daemonize without stdin.") 322 323 try: 324 os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns)) 325 with self._daemon_context: 326 if capture_stdin: 327 sys.stdin = self.stdin_file 328 _ = os.environ.pop(STATIC_CONFIG['environment']['systemd_stdin_path'], None) 329 os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id 330 os.environ['PYTHONUNBUFFERED'] = '1' 331 332 ### Allow the user to override environment variables. 333 env = self.properties.get('env', {}) 334 if env and isinstance(env, dict): 335 os.environ.update({str(k): str(v) for k, v in env.items()}) 336 337 self.rotating_log.refresh_files(start_interception=True) 338 result = None 339 try: 340 with open(self.pid_path, 'w+', encoding='utf-8') as f: 341 f.write(str(os.getpid())) 342 343 ### NOTE: The timer fails to start for remote actions to localhost. 
344 try: 345 if not self._log_refresh_timer.is_running(): 346 self._log_refresh_timer.start() 347 except Exception: 348 pass 349 350 self.properties['result'] = None 351 self._capture_process_timestamp('began') 352 result = self.target(*self.target_args, **self.target_kw) 353 self.properties['result'] = result 354 except (BrokenPipeError, KeyboardInterrupt, SystemExit): 355 result = False, traceback.format_exc() 356 except Exception as e: 357 warn( 358 f"Exception in daemon target function: {traceback.format_exc()}", 359 ) 360 result = False, str(e) 361 finally: 362 _results[self.daemon_id] = result 363 self.properties['result'] = result 364 365 if keep_daemon_output: 366 self._capture_process_timestamp('ended') 367 else: 368 self.cleanup() 369 370 self._log_refresh_timer.cancel() 371 try: 372 if self.pid is None and self.pid_path.exists(): 373 self.pid_path.unlink() 374 except Exception: 375 pass 376 377 if is_success_tuple(result): 378 try: 379 mrsm.pprint(result) 380 except BrokenPipeError: 381 pass 382 383 except Exception: 384 daemon_error = traceback.format_exc() 385 from meerschaum.config.paths import DAEMON_ERROR_LOG_PATH 386 with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f: 387 f.write( 388 f"Error in Daemon '{self}':\n\n" 389 f"{sys.stdin=}\n" 390 f"{self.stdin_file_path=}\n" 391 f"{self.stdin_file_path.exists()=}\n\n" 392 f"{daemon_error}\n\n" 393 ) 394 warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}") 395 396 def _capture_process_timestamp( 397 self, 398 process_key: str, 399 write_properties: bool = True, 400 ) -> None: 401 """ 402 Record the current timestamp to the parameters `process:<process_key>`. 403 404 Parameters 405 ---------- 406 process_key: str 407 Under which key to store the timestamp. 408 409 write_properties: bool, default True 410 If `True` persist the properties to disk immediately after capturing the timestamp. 
        """
        ### Lazily initialize the 'process' sub-dictionary.
        if 'process' not in self.properties:
            self.properties['process'] = {}

        if process_key not in ('began', 'ended', 'paused', 'stopped'):
            raise ValueError(f"Invalid key '{process_key}'.")

        ### Timestamps are stored as naive UTC ISO-8601 strings.
        self.properties['process'][process_key] = (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        )
        if write_properties:
            self.write_properties()

    def run(
        self,
        keep_daemon_output: bool = True,
        allow_dirty_run: bool = False,
        wait: bool = False,
        timeout: Union[int, float] = 4,
        debug: bool = False,
    ) -> SuccessTuple:
        """Run the daemon as a child process and continue executing the parent.

        Parameters
        ----------
        keep_daemon_output: bool, default True
            If `False`, delete the daemon's output directory upon exiting.

        allow_dirty_run: bool, default False
            If `True`, run the daemon, even if the `daemon_id` directory exists.
            This option is dangerous because if the same `daemon_id` runs concurrently,
            the last to finish will overwrite the output of the first.

        wait: bool, default False
            If `True`, block until `Daemon.status` is running (or the timeout expires).

        timeout: Union[int, float], default 4
            If `wait` is `True`, block for up to `timeout` seconds before returning a failure.

        debug: bool, default False
            Verbosity toggle passed through to `venv_exec`.

        Returns
        -------
        A SuccessTuple indicating success.

        """
        import platform
        if platform.system() == 'Windows':
            return False, "Cannot run background jobs on Windows."

        ### The daemon might exist and be paused.
        if self.status == 'paused':
            return self.resume()

        self._remove_stop_file()
        if self.status == 'running':
            return True, f"Daemon '{self}' is already running."
466 467 self.mkdir_if_not_exists(allow_dirty_run) 468 _write_pickle_success_tuple = self.write_pickle() 469 if not _write_pickle_success_tuple[0]: 470 return _write_pickle_success_tuple 471 472 _launch_daemon_code = ( 473 "from meerschaum.utils.daemon import Daemon, _daemons; " 474 f"daemon = Daemon(daemon_id='{self.daemon_id}'); " 475 f"_daemons['{self.daemon_id}'] = daemon; " 476 f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, " 477 "allow_dirty_run=True)" 478 ) 479 env = dict(os.environ) 480 _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env) 481 msg = ( 482 "Success" 483 if _launch_success_bool 484 else f"Failed to start daemon '{self.daemon_id}'." 485 ) 486 if not wait or not _launch_success_bool: 487 return _launch_success_bool, msg 488 489 timeout = self.get_timeout_seconds(timeout) 490 check_timeout_interval = self.get_check_timeout_interval_seconds() 491 492 if not timeout: 493 success = self.status == 'running' 494 msg = "Success" if success else f"Failed to run daemon '{self.daemon_id}'." 495 if success: 496 self._capture_process_timestamp('began') 497 return success, msg 498 499 begin = time.perf_counter() 500 while (time.perf_counter() - begin) < timeout: 501 if self.status == 'running': 502 self._capture_process_timestamp('began') 503 return True, "Success" 504 time.sleep(check_timeout_interval) 505 506 return False, ( 507 f"Failed to start daemon '{self.daemon_id}' within {timeout} second" 508 + ('s' if timeout != 1 else '') + '.' 509 ) 510 511 512 def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple: 513 """ 514 Forcibly terminate a running daemon. 515 Sends a SIGTERM signal to the process. 516 517 Parameters 518 ---------- 519 timeout: Optional[int], default 3 520 How many seconds to wait for the process to terminate. 521 522 Returns 523 ------- 524 A SuccessTuple indicating success. 
525 """ 526 if self.status != 'paused': 527 success, msg = self._send_signal(signal.SIGTERM, timeout=timeout) 528 if success: 529 self._write_stop_file('kill') 530 self.stdin_file.close() 531 self._remove_blocking_stdin_file() 532 return success, msg 533 534 if self.status == 'stopped': 535 self._write_stop_file('kill') 536 self.stdin_file.close() 537 self._remove_blocking_stdin_file() 538 return True, "Process has already stopped." 539 540 psutil = attempt_import('psutil') 541 process = self.process 542 try: 543 process.terminate() 544 process.kill() 545 process.wait(timeout=timeout) 546 except Exception as e: 547 return False, f"Failed to kill job {self} ({process}) with exception: {e}" 548 549 try: 550 if process.status(): 551 return False, "Failed to stop daemon '{self}' ({process})." 552 except psutil.NoSuchProcess: 553 pass 554 555 if self.pid_path.exists(): 556 try: 557 self.pid_path.unlink() 558 except Exception: 559 pass 560 561 self._write_stop_file('kill') 562 self.stdin_file.close() 563 self._remove_blocking_stdin_file() 564 return True, "Success" 565 566 def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple: 567 """Gracefully quit a running daemon.""" 568 if self.status == 'paused': 569 return self.kill(timeout) 570 571 signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout) 572 if signal_success: 573 self._write_stop_file('quit') 574 self.stdin_file.close() 575 self._remove_blocking_stdin_file() 576 return signal_success, signal_msg 577 578 def pause( 579 self, 580 timeout: Union[int, float, None] = None, 581 check_timeout_interval: Union[float, int, None] = None, 582 ) -> SuccessTuple: 583 """ 584 Pause the daemon if it is running. 585 586 Parameters 587 ---------- 588 timeout: Union[float, int, None], default None 589 The maximum number of seconds to wait for a process to suspend. 

        check_timeout_interval: Union[float, int, None], default None
            The number of seconds to wait between checking if the process is still running.

        Returns
        -------
        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
        """
        self._remove_blocking_stdin_file()

        if self.process is None:
            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."

        if self.status == 'paused':
            return True, f"Daemon '{self.daemon_id}' is already paused."

        self._write_stop_file('pause')
        self.stdin_file.close()
        self._remove_blocking_stdin_file()
        try:
            ### Suspend the process; psutil then reports its status as 'stopped'.
            self.process.suspend()
        except Exception as e:
            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"

        timeout = self.get_timeout_seconds(timeout)
        check_timeout_interval = self.get_check_timeout_interval_seconds(
            check_timeout_interval
        )

        psutil = attempt_import('psutil')

        if not timeout:
            try:
                success = self.process.status() == 'stopped'
            except psutil.NoSuchProcess:
                ### The process vanished entirely; treat the pause as successful.
                success = True
            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
            if success:
                self._capture_process_timestamp('paused')
            return success, msg

        ### Poll until psutil reports the process as suspended.
        begin = time.perf_counter()
        while (time.perf_counter() - begin) < timeout:
            try:
                if self.process.status() == 'stopped':
                    self._capture_process_timestamp('paused')
                    return True, "Success"
            except psutil.NoSuchProcess as e:
                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
            time.sleep(check_timeout_interval)

        return False, (
            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
            + ('s' if timeout != 1 else '') + '.'
        )

    def resume(
        self,
        timeout: Union[int, float, None] = None,
        check_timeout_interval: Union[float, int, None] = None,
    ) -> SuccessTuple:
        """
        Resume the daemon if it is paused.
653 654 Parameters 655 ---------- 656 timeout: Union[float, int, None], default None 657 The maximum number of seconds to wait for a process to resume. 658 659 check_timeout_interval: Union[float, int, None], default None 660 The number of seconds to wait between checking if the process is still stopped. 661 662 Returns 663 ------- 664 A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed. 665 """ 666 if self.status == 'running': 667 return True, f"Daemon '{self.daemon_id}' is already running." 668 669 if self.status == 'stopped': 670 return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed." 671 672 self._remove_stop_file() 673 try: 674 if self.process is None: 675 return False, f"Cannot resume daemon '{self.daemon_id}'." 676 677 self.process.resume() 678 except Exception as e: 679 return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}" 680 681 timeout = self.get_timeout_seconds(timeout) 682 check_timeout_interval = self.get_check_timeout_interval_seconds( 683 check_timeout_interval 684 ) 685 686 if not timeout: 687 success = self.status == 'running' 688 msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'." 689 if success: 690 self._capture_process_timestamp('began') 691 return success, msg 692 693 begin = time.perf_counter() 694 while (time.perf_counter() - begin) < timeout: 695 if self.status == 'running': 696 self._capture_process_timestamp('began') 697 return True, "Success" 698 time.sleep(check_timeout_interval) 699 700 return False, ( 701 f"Failed to resume daemon '{self.daemon_id}' within {timeout} second" 702 + ('s' if timeout != 1 else '') + '.' 703 ) 704 705 def _write_stop_file(self, action: str) -> SuccessTuple: 706 """Write the stop file timestamp and action.""" 707 if action not in ('quit', 'kill', 'pause'): 708 return False, f"Unsupported action '{action}'." 
709 710 if not self.stop_path.parent.exists(): 711 self.stop_path.parent.mkdir(parents=True, exist_ok=True) 712 713 with open(self.stop_path, 'w+', encoding='utf-8') as f: 714 json.dump( 715 { 716 'stop_time': datetime.now(timezone.utc).isoformat(), 717 'action': action, 718 }, 719 f 720 ) 721 722 return True, "Success" 723 724 def _remove_stop_file(self) -> SuccessTuple: 725 """Remove the stop file""" 726 if not self.stop_path.exists(): 727 return True, "Stop file does not exist." 728 729 try: 730 self.stop_path.unlink() 731 except Exception as e: 732 return False, f"Failed to remove stop file:\n{e}" 733 734 return True, "Success" 735 736 def _read_stop_file(self) -> Dict[str, Any]: 737 """ 738 Read the stop file if it exists. 739 """ 740 if not self.stop_path.exists(): 741 return {} 742 743 try: 744 with open(self.stop_path, 'r', encoding='utf-8') as f: 745 data = json.load(f) 746 return data 747 except Exception: 748 return {} 749 750 def _remove_blocking_stdin_file(self) -> mrsm.SuccessTuple: 751 """ 752 Remove the blocking STDIN file if it exists. 753 """ 754 try: 755 if self.blocking_stdin_file_path.exists(): 756 self.blocking_stdin_file_path.unlink() 757 except Exception as e: 758 return False, str(e) 759 760 return True, "Success" 761 762 def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None: 763 """ 764 Handle `SIGTERM` within the `Daemon` context. 765 This method is injected into the `DaemonContext`. 
766 """ 767 from meerschaum.utils.process import signal_handler 768 signal_handler(signal_number, stack_frame) 769 770 timer = self.__dict__.get('_log_refresh_timer', None) 771 if timer is not None: 772 timer.cancel() 773 774 daemon_context = self.__dict__.get('_daemon_context', None) 775 if daemon_context is not None: 776 daemon_context.close() 777 778 _close_pools() 779 raise SystemExit(0) 780 781 def _send_signal( 782 self, 783 signal_to_send, 784 timeout: Union[float, int, None] = None, 785 check_timeout_interval: Union[float, int, None] = None, 786 ) -> SuccessTuple: 787 """Send a signal to the daemon process. 788 789 Parameters 790 ---------- 791 signal_to_send: 792 The signal the send to the daemon, e.g. `signals.SIGINT`. 793 794 timeout: Union[float, int, None], default None 795 The maximum number of seconds to wait for a process to terminate. 796 797 check_timeout_interval: Union[float, int, None], default None 798 The number of seconds to wait between checking if the process is still running. 799 800 Returns 801 ------- 802 A SuccessTuple indicating success. 803 """ 804 try: 805 pid = self.pid 806 if pid is None: 807 return ( 808 False, 809 f"Daemon '{self.daemon_id}' is not running, " 810 + f"cannot send signal '{signal_to_send}'." 811 ) 812 813 os.kill(pid, signal_to_send) 814 except Exception: 815 return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}" 816 817 timeout = self.get_timeout_seconds(timeout) 818 check_timeout_interval = self.get_check_timeout_interval_seconds( 819 check_timeout_interval 820 ) 821 822 if not timeout: 823 return True, f"Successfully sent '{signal}' to daemon '{self.daemon_id}'." 
824 825 begin = time.perf_counter() 826 while (time.perf_counter() - begin) < timeout: 827 if not self.status == 'running': 828 return True, "Success" 829 time.sleep(check_timeout_interval) 830 831 return False, ( 832 f"Failed to stop daemon '{self.daemon_id}' (PID: {pid}) within {timeout} second" 833 + ('s' if timeout != 1 else '') + '.' 834 ) 835 836 def mkdir_if_not_exists(self, allow_dirty_run: bool = False): 837 """Create the Daemon's directory. 838 If `allow_dirty_run` is `False` and the directory already exists, 839 raise a `FileExistsError`. 840 """ 841 try: 842 self.path.mkdir(parents=True, exist_ok=True) 843 _already_exists = any(os.scandir(self.path)) 844 except FileExistsError: 845 _already_exists = True 846 847 if _already_exists and not allow_dirty_run: 848 error( 849 f"Daemon '{self.daemon_id}' already exists. " + 850 "To allow this daemon to run, do one of the following:\n" 851 + " - Execute `daemon.cleanup()`.\n" 852 + f" - Delete the directory '{self.path}'.\n" 853 + " - Pass `allow_dirty_run=True` to `daemon.run()`.\n", 854 FileExistsError, 855 ) 856 857 @property 858 def process(self) -> Union['psutil.Process', None]: 859 """ 860 Return the psutil process for the Daemon. 861 """ 862 psutil = attempt_import('psutil') 863 pid = self.pid 864 if pid is None: 865 return None 866 if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid): 867 try: 868 self._process = psutil.Process(int(pid)) 869 process_exists = True 870 except Exception: 871 process_exists = False 872 if not process_exists: 873 _ = self.__dict__.pop('_process', None) 874 try: 875 if self.pid_path.exists(): 876 self.pid_path.unlink() 877 except Exception: 878 pass 879 return None 880 return self._process 881 882 @property 883 def status(self) -> str: 884 """ 885 Return the running status of this Daemon. 
        """
        if self.process is None:
            return 'stopped'

        psutil = attempt_import('psutil', lazy=False)
        try:
            if self.process.status() == 'stopped':
                ### psutil reports a suspended process as 'stopped';
                ### Meerschaum calls that state 'paused'.
                return 'paused'
            if self.process.status() == 'zombie':
                raise psutil.NoSuchProcess(self.process.pid)
        except (psutil.NoSuchProcess, AttributeError):
            ### The process is gone; clear any stale PID file.
            if self.pid_path.exists():
                try:
                    self.pid_path.unlink()
                except Exception:
                    pass
            return 'stopped'

        return 'running'

    @classmethod
    def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
        """
        Return a Daemon's path from its `daemon_id`.
        """
        from meerschaum.config.paths import DAEMON_RESOURCES_PATH
        return DAEMON_RESOURCES_PATH / daemon_id

    @property
    def path(self) -> pathlib.Path:
        """
        Return the path for this Daemon's directory.
        """
        return self._get_path_from_daemon_id(self.daemon_id)

    @classmethod
    def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
        """
        Return the `properties.json` path for a given `daemon_id`.
        """
        return cls._get_path_from_daemon_id(daemon_id) / 'properties.json'

    @property
    def properties_path(self) -> pathlib.Path:
        """
        Return the `properties.json` path for this Daemon.
        """
        return self._get_properties_path_from_daemon_id(self.daemon_id)

    @property
    def stop_path(self) -> pathlib.Path:
        """
        Return the path for the stop file (created when manually stopped).
        """
        return self.path / '.stop.json'

    @property
    def log_path(self) -> pathlib.Path:
        """
        Return the log path.
946 """ 947 logs_cf = self.properties.get('logs', None) or {} 948 if 'path' not in logs_cf: 949 from meerschaum.config.paths import LOGS_RESOURCES_PATH 950 return LOGS_RESOURCES_PATH / (self.daemon_id + '.log') 951 952 return pathlib.Path(logs_cf['path']) 953 954 @property 955 def stdin_file_path(self) -> pathlib.Path: 956 """ 957 Return the stdin file path. 958 """ 959 return self.path / 'input.stdin' 960 961 @property 962 def blocking_stdin_file_path(self) -> pathlib.Path: 963 """ 964 Return the stdin file path. 965 """ 966 if '_blocking_stdin_file_path' in self.__dict__: 967 return self._blocking_stdin_file_path 968 969 return self.path / 'input.stdin.block' 970 971 @property 972 def prompt_kwargs_file_path(self) -> pathlib.Path: 973 """ 974 Return the file path to the kwargs for the invoking `prompt()`. 975 """ 976 return self.path / 'prompt_kwargs.json' 977 978 @property 979 def log_offset_path(self) -> pathlib.Path: 980 """ 981 Return the log offset file path. 982 """ 983 from meerschaum.config.paths import LOGS_RESOURCES_PATH 984 return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset') 985 986 @property 987 def log_offset_lock(self) -> 'fasteners.InterProcessLock': 988 """ 989 Return the process lock context manager. 990 """ 991 if '_log_offset_lock' in self.__dict__: 992 return self._log_offset_lock 993 994 fasteners = attempt_import('fasteners') 995 self._log_offset_lock = fasteners.InterProcessLock(self.log_offset_path) 996 return self._log_offset_lock 997 998 @property 999 def rotating_log(self) -> RotatingFile: 1000 """ 1001 The rotating log file for the daemon's output. 
1002 """ 1003 if '_rotating_log' in self.__dict__: 1004 return self._rotating_log 1005 1006 logs_cf = self.properties.get('logs', None) or {} 1007 write_timestamps = logs_cf.get('write_timestamps', None) 1008 if write_timestamps is None: 1009 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 1010 1011 timestamp_format = logs_cf.get('timestamp_format', None) 1012 if timestamp_format is None: 1013 timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format') 1014 1015 num_files_to_keep = logs_cf.get('num_files_to_keep', None) 1016 if num_files_to_keep is None: 1017 num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep') 1018 1019 max_file_size = logs_cf.get('max_file_size', None) 1020 if max_file_size is None: 1021 max_file_size = get_config('jobs', 'logs', 'max_file_size') 1022 1023 redirect_streams = logs_cf.get('redirect_streams', True) 1024 1025 self._rotating_log = RotatingFile( 1026 self.log_path, 1027 redirect_streams=redirect_streams, 1028 write_timestamps=write_timestamps, 1029 timestamp_format=timestamp_format, 1030 num_files_to_keep=num_files_to_keep, 1031 max_file_size=max_file_size, 1032 ) 1033 return self._rotating_log 1034 1035 @property 1036 def stdin_file(self): 1037 """ 1038 Return the file handler for the stdin file. 1039 """ 1040 if (_stdin_file := self.__dict__.get('_stdin_file', None)): 1041 return _stdin_file 1042 1043 self._stdin_file = StdinFile( 1044 self.stdin_file_path, 1045 lock_file_path=self.blocking_stdin_file_path, 1046 ) 1047 return self._stdin_file 1048 1049 @property 1050 def log_text(self) -> Union[str, None]: 1051 """ 1052 Read the log files and return their contents. 1053 Returns `None` if the log file does not exist. 
1054 """ 1055 logs_cf = self.properties.get('logs', None) or {} 1056 write_timestamps = logs_cf.get('write_timestamps', None) 1057 if write_timestamps is None: 1058 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 1059 1060 timestamp_format = logs_cf.get('timestamp_format', None) 1061 if timestamp_format is None: 1062 timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format') 1063 1064 num_files_to_keep = logs_cf.get('num_files_to_keep', None) 1065 if num_files_to_keep is None: 1066 num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep') 1067 1068 max_file_size = logs_cf.get('max_file_size', None) 1069 if max_file_size is None: 1070 max_file_size = get_config('jobs', 'logs', 'max_file_size') 1071 1072 new_rotating_log = RotatingFile( 1073 self.rotating_log.file_path, 1074 num_files_to_keep=num_files_to_keep, 1075 max_file_size=max_file_size, 1076 write_timestamps=write_timestamps, 1077 timestamp_format=timestamp_format, 1078 ) 1079 return new_rotating_log.read() 1080 1081 def readlines(self) -> List[str]: 1082 """ 1083 Read the next log lines, persisting the cursor for later use. 1084 Note this will alter the cursor of `self.rotating_log`. 1085 """ 1086 self.rotating_log._cursor = self._read_log_offset() 1087 lines = self.rotating_log.readlines() 1088 self._write_log_offset() 1089 return lines 1090 1091 def _read_log_offset(self) -> Tuple[int, int]: 1092 """ 1093 Return the current log offset cursor. 1094 1095 Returns 1096 ------- 1097 A tuple of the form (`subfile_index`, `position`). 
1098 """ 1099 if not self.log_offset_path.exists(): 1100 return 0, 0 1101 1102 try: 1103 with open(self.log_offset_path, 'r', encoding='utf-8') as f: 1104 cursor_text = f.read() 1105 cursor_parts = cursor_text.split(' ') 1106 subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1]) 1107 return subfile_index, subfile_position 1108 except Exception as e: 1109 warn(f"Failed to read cursor:\n{e}") 1110 return 0, 0 1111 1112 def _write_log_offset(self) -> None: 1113 """ 1114 Write the current log offset file. 1115 """ 1116 with self.log_offset_lock: 1117 with open(self.log_offset_path, 'w+', encoding='utf-8') as f: 1118 subfile_index = self.rotating_log._cursor[0] 1119 subfile_position = self.rotating_log._cursor[1] 1120 f.write(f"{subfile_index} {subfile_position}") 1121 1122 @property 1123 def pid(self) -> Union[int, None]: 1124 """ 1125 Read the PID file and return its contents. 1126 Returns `None` if the PID file does not exist. 1127 """ 1128 if not self.pid_path.exists(): 1129 return None 1130 try: 1131 with open(self.pid_path, 'r', encoding='utf-8') as f: 1132 text = f.read() 1133 if len(text) == 0: 1134 return None 1135 pid = int(text.rstrip()) 1136 except Exception as e: 1137 warn(e) 1138 text = None 1139 pid = None 1140 return pid 1141 1142 @property 1143 def pid_path(self) -> pathlib.Path: 1144 """ 1145 Return the path to a file containing the PID for this Daemon. 1146 """ 1147 return self.path / 'process.pid' 1148 1149 @property 1150 def pid_lock(self) -> 'fasteners.InterProcessLock': 1151 """ 1152 Return the process lock context manager. 1153 """ 1154 if '_pid_lock' in self.__dict__: 1155 return self._pid_lock 1156 1157 fasteners = attempt_import('fasteners') 1158 self._pid_lock = fasteners.InterProcessLock(self.pid_path) 1159 return self._pid_lock 1160 1161 @property 1162 def pickle_path(self) -> pathlib.Path: 1163 """ 1164 Return the path for the pickle file. 
1165 """ 1166 return self.path / 'pickle.pkl' 1167 1168 def read_properties(self) -> Optional[Dict[str, Any]]: 1169 """Read the properties JSON file and return the dictionary.""" 1170 if not self.properties_path.exists(): 1171 return None 1172 try: 1173 with open(self.properties_path, 'r', encoding='utf-8') as file: 1174 properties = json.load(file) 1175 except Exception: 1176 properties = {} 1177 1178 return properties or {} 1179 1180 def read_pickle(self) -> Daemon: 1181 """Read a Daemon's pickle file and return the `Daemon`.""" 1182 import pickle 1183 import traceback 1184 if not self.pickle_path.exists(): 1185 error(f"Pickle file does not exist for daemon '{self.daemon_id}'.") 1186 1187 if self.pickle_path.stat().st_size == 0: 1188 error(f"Pickle was empty for daemon '{self.daemon_id}'.") 1189 1190 try: 1191 with open(self.pickle_path, 'rb') as pickle_file: 1192 daemon = pickle.load(pickle_file) 1193 success, msg = True, 'Success' 1194 except Exception as e: 1195 success, msg = False, str(e) 1196 daemon = None 1197 traceback.print_exception(type(e), e, e.__traceback__) 1198 if not success: 1199 error(msg) 1200 return daemon 1201 1202 @property 1203 def properties(self) -> Dict[str, Any]: 1204 """ 1205 Return the contents of the properties JSON file. 
1206 """ 1207 try: 1208 _file_properties = self.read_properties() or {} 1209 except Exception: 1210 traceback.print_exc() 1211 _file_properties = {} 1212 1213 if not self._properties: 1214 self._properties = _file_properties 1215 1216 if self._properties is None: 1217 self._properties = {} 1218 1219 if ( 1220 self._properties.get('result', None) is None 1221 and _file_properties.get('result', None) is not None 1222 ): 1223 _ = self._properties.pop('result', None) 1224 1225 if _file_properties is not None: 1226 self._properties = apply_patch_to_config( 1227 _file_properties, 1228 self._properties, 1229 ) 1230 1231 return self._properties 1232 1233 @property 1234 def hidden(self) -> bool: 1235 """ 1236 Return a bool indicating whether this Daemon should be displayed. 1237 """ 1238 return self.daemon_id.startswith('_') or self.daemon_id.startswith('.') 1239 1240 def write_properties(self) -> SuccessTuple: 1241 """Write the properties dictionary to the properties JSON file 1242 (only if self.properties exists). 1243 """ 1244 from meerschaum.utils.misc import generate_password 1245 success, msg = ( 1246 False, 1247 f"No properties to write for daemon '{self.daemon_id}'." 
        )
        ### Write via a randomly-named backup so a failed write can be rolled back.
        backup_path = self.properties_path.parent / (generate_password(8) + '.json')
        props = self.properties
        if props is not None:
            try:
                self.path.mkdir(parents=True, exist_ok=True)
                if self.properties_path.exists():
                    self.properties_path.rename(backup_path)
                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
                    json.dump(props, properties_file)
                success, msg = True, 'Success'
            except Exception as e:
                success, msg = False, str(e)

        try:
            if backup_path.exists():
                if not success:
                    ### Roll back: restore the previous properties file.
                    backup_path.rename(self.properties_path)
                else:
                    backup_path.unlink()
        except Exception as e:
            success, msg = False, str(e)

        return success, msg

    def write_pickle(self) -> SuccessTuple:
        """Write the pickle file for the daemon."""
        import pickle
        import traceback
        from meerschaum.utils.misc import generate_password

        ### Respect the `pickle=False` flag.
        if not self.pickle:
            return True, "Success"

        ### Same backup-then-write-then-rollback dance as `write_properties()`.
        backup_path = self.pickle_path.parent / (generate_password(7) + '.pkl')
        try:
            self.path.mkdir(parents=True, exist_ok=True)
            if self.pickle_path.exists():
                self.pickle_path.rename(backup_path)
            with open(self.pickle_path, 'wb+') as pickle_file:
                pickle.dump(self, pickle_file)
            success, msg = True, "Success"
        except Exception as e:
            success, msg = False, str(e)
            traceback.print_exception(type(e), e, e.__traceback__)
        try:
            if backup_path.exists():
                if not success:
                    backup_path.rename(self.pickle_path)
                else:
                    backup_path.unlink()
        except Exception as e:
            success, msg = False, str(e)
        return success, msg


    def _setup(
        self,
        allow_dirty_run: bool = False,
    ) -> None:
        """
        Update properties before starting the Daemon.
1310 """ 1311 if self.properties is None: 1312 self._properties = {} 1313 1314 self._properties.update({ 1315 'target': { 1316 'name': self.target.__name__, 1317 'module': self.target.__module__, 1318 'args': self.target_args, 1319 'kw': self.target_kw, 1320 }, 1321 }) 1322 self.mkdir_if_not_exists(allow_dirty_run) 1323 _write_properties_success_tuple = self.write_properties() 1324 if not _write_properties_success_tuple[0]: 1325 error(_write_properties_success_tuple[1]) 1326 1327 _write_pickle_success_tuple = self.write_pickle() 1328 if not _write_pickle_success_tuple[0]: 1329 error(_write_pickle_success_tuple[1]) 1330 1331 def cleanup(self, keep_logs: bool = False) -> SuccessTuple: 1332 """ 1333 Remove a daemon's directory after execution. 1334 1335 Parameters 1336 ---------- 1337 keep_logs: bool, default False 1338 If `True`, skip deleting the daemon's log files. 1339 1340 Returns 1341 ------- 1342 A `SuccessTuple` indicating success. 1343 """ 1344 if self.path.exists(): 1345 try: 1346 shutil.rmtree(self.path) 1347 except Exception as e: 1348 msg = f"Failed to clean up '{self.daemon_id}':\n{e}" 1349 warn(msg) 1350 return False, msg 1351 if not keep_logs: 1352 self.rotating_log.delete() 1353 try: 1354 if self.log_offset_path.exists(): 1355 self.log_offset_path.unlink() 1356 except Exception as e: 1357 msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}" 1358 warn(msg) 1359 return False, msg 1360 return True, "Success" 1361 1362 1363 def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]: 1364 """ 1365 Return the timeout value to use. Use `--timeout-seconds` if provided, 1366 else the configured default (8). 
1367 """ 1368 if isinstance(timeout, (int, float)): 1369 return timeout 1370 return get_config('jobs', 'timeout_seconds') 1371 1372 1373 def get_check_timeout_interval_seconds( 1374 self, 1375 check_timeout_interval: Union[int, float, None] = None, 1376 ) -> Union[int, float]: 1377 """ 1378 Return the interval value to check the status of timeouts. 1379 """ 1380 if isinstance(check_timeout_interval, (int, float)): 1381 return check_timeout_interval 1382 return get_config('jobs', 'check_timeout_interval_seconds') 1383 1384 @property 1385 def target_args(self) -> Union[Tuple[Any], None]: 1386 """ 1387 Return the positional arguments to pass to the target function. 1388 """ 1389 target_args = ( 1390 self.__dict__.get('_target_args', None) 1391 or self.properties.get('target', {}).get('args', None) 1392 ) 1393 if target_args is None: 1394 return tuple([]) 1395 1396 return tuple(target_args) 1397 1398 @property 1399 def target_kw(self) -> Union[Dict[str, Any], None]: 1400 """ 1401 Return the keyword arguments to pass to the target function. 1402 """ 1403 target_kw = ( 1404 self.__dict__.get('_target_kw', None) 1405 or self.properties.get('target', {}).get('kw', None) 1406 ) 1407 if target_kw is None: 1408 return {} 1409 1410 return {key: val for key, val in target_kw.items()} 1411 1412 def __getstate__(self): 1413 """ 1414 Pickle this Daemon. 1415 """ 1416 dill = attempt_import('dill') 1417 return { 1418 'target': dill.dumps(self.target), 1419 'target_args': self.target_args, 1420 'target_kw': self.target_kw, 1421 'daemon_id': self.daemon_id, 1422 'label': self.label, 1423 'properties': self.properties, 1424 } 1425 1426 def __setstate__(self, _state: Dict[str, Any]): 1427 """ 1428 Restore this Daemon from a pickled state. 1429 If the properties file exists, skip the old pickled version. 
1430 """ 1431 dill = attempt_import('dill') 1432 _state['target'] = dill.loads(_state['target']) 1433 self._pickle = True 1434 daemon_id = _state.get('daemon_id', None) 1435 if not daemon_id: 1436 raise ValueError("Need a daemon_id to un-pickle a Daemon.") 1437 1438 properties_path = self._get_properties_path_from_daemon_id(daemon_id) 1439 ignore_properties = properties_path.exists() 1440 if ignore_properties: 1441 _state = { 1442 key: val 1443 for key, val in _state.items() 1444 if key != 'properties' 1445 } 1446 self.__init__(**_state) 1447 1448 1449 def __repr__(self): 1450 return str(self) 1451 1452 def __str__(self): 1453 return self.daemon_id 1454 1455 def __eq__(self, other): 1456 if not isinstance(other, Daemon): 1457 return False 1458 return self.daemon_id == other.daemon_id 1459 1460 def __hash__(self): 1461 return hash(self.daemon_id)
class Daemon:
    """
    Daemonize Python functions into background processes.

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.utils.daemons import Daemon
    >>> daemon = Daemon(print, ('hi',))
    >>> success, msg = daemon.run()
    >>> print(daemon.log_text)

    2024-07-29 18:03 | hi
    2024-07-29 18:03 |
    >>> daemon.run(allow_dirty_run=True)
    >>> print(daemon.log_text)

    2024-07-29 18:03 | hi
    2024-07-29 18:03 |
    2024-07-29 18:05 | hi
    2024-07-29 18:05 |
    >>> mrsm.pprint(daemon.properties)
    {
        'label': 'print',
        'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
        'result': None,
        'process': {'ended': '2024-07-29T18:03:33.752806'}
    }

    """

    def __new__(
        cls,
        *args,
        daemon_id: Optional[str] = None,
        **kw
    ):
        """
        If a daemon_id is provided and already exists, read from its pickle file.
        """
        instance = super(Daemon, cls).__new__(cls)
        if daemon_id is not None:
            instance.daemon_id = daemon_id
            if instance.pickle_path.exists():
                ### Replace the fresh instance with the unpickled daemon.
                instance = instance.read_pickle()
        return instance

    @classmethod
    def from_properties_file(cls, daemon_id: str) -> Daemon:
        """
        Return a Daemon from a properties dictionary.
93 """ 94 properties_path = cls._get_properties_path_from_daemon_id(daemon_id) 95 if not properties_path.exists(): 96 raise OSError(f"Properties file '{properties_path}' does not exist.") 97 98 try: 99 with open(properties_path, 'r', encoding='utf-8') as f: 100 properties = json.load(f) 101 except Exception: 102 properties = {} 103 104 if not properties: 105 raise ValueError(f"No properties could be read for daemon '{daemon_id}'.") 106 107 daemon_id = properties_path.parent.name 108 target_cf = properties.get('target', {}) 109 target_module_name = target_cf.get('module', None) 110 target_function_name = target_cf.get('name', None) 111 target_args = target_cf.get('args', None) 112 target_kw = target_cf.get('kw', None) 113 label = properties.get('label', None) 114 115 if None in [ 116 target_module_name, 117 target_function_name, 118 target_args, 119 target_kw, 120 ]: 121 raise ValueError("Missing target function information.") 122 123 target_module = importlib.import_module(target_module_name) 124 target_function = getattr(target_module, target_function_name) 125 126 return Daemon( 127 daemon_id=daemon_id, 128 target=target_function, 129 target_args=target_args, 130 target_kw=target_kw, 131 properties=properties, 132 label=label, 133 ) 134 135 136 def __init__( 137 self, 138 target: Optional[Callable[[Any], Any]] = None, 139 target_args: Union[List[Any], Tuple[Any], None] = None, 140 target_kw: Optional[Dict[str, Any]] = None, 141 env: Optional[Dict[str, str]] = None, 142 daemon_id: Optional[str] = None, 143 label: Optional[str] = None, 144 properties: Optional[Dict[str, Any]] = None, 145 pickle: bool = True, 146 ): 147 """ 148 Parameters 149 ---------- 150 target: Optional[Callable[[Any], Any]], default None, 151 The function to execute in a child process. 152 153 target_args: Union[List[Any], Tuple[Any], None], default None 154 Positional arguments to pass to the target function. 

        target_kw: Optional[Dict[str, Any]], default None
            Keyword arguments to pass to the target function.

        env: Optional[Dict[str, str]], default None
            If provided, set these environment variables in the daemon process.

        daemon_id: Optional[str], default None
            Build a `Daemon` from an existing `daemon_id`.
            If `daemon_id` is provided, other arguments are ignored and are derived
            from the existing pickled `Daemon`.

        label: Optional[str], default None
            Label string to help identify a daemon.
            If `None`, use the function name instead.

        properties: Optional[Dict[str, Any]], default None
            Override reading from the properties JSON by providing an existing dictionary.
        """
        _pickle = self.__dict__.get('_pickle', False)
        if daemon_id is not None:
            self.daemon_id = daemon_id
            if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):

                if not self.properties_path.exists():
                    raise Exception(
                        f"Daemon '{self.daemon_id}' does not exist. "
                        + "Pass a target to create a new Daemon."
                    )

                ### No pickle but a properties file exists:
                ### attempt to rebuild the daemon from the properties file.
                try:
                    new_daemon = self.from_properties_file(daemon_id)
                except Exception:
                    new_daemon = None

                if new_daemon is not None:
                    new_daemon.write_pickle()
                    target = new_daemon.target
                    target_args = new_daemon.target_args
                    target_kw = new_daemon.target_kw
                    label = new_daemon.label
                    self._properties = new_daemon.properties
                else:
                    ### The properties file is unusable; remove it.
                    try:
                        self.properties_path.unlink()
                    except Exception:
                        pass

                    raise Exception(
                        f"Could not recover daemon '{self.daemon_id}' "
                        + "from its properties file."
                    )

        if 'target' not in self.__dict__:
            if target is None:
                error("Cannot create a Daemon without a target.")
            self.target = target

        self.pickle = pickle

        ### NOTE: We have to check self.__dict__ in case we're un-pickling.
        if '_target_args' not in self.__dict__:
            self._target_args = target_args
        if '_target_kw' not in self.__dict__:
            self._target_kw = target_kw

        if 'label' not in self.__dict__:
            if label is None:
                ### Fall back to the target's name (or its repr) for the label.
                label = (
                    self.target.__name__ if '__name__' in self.target.__dir__()
                    else str(self.target)
                )
            self.label = label
        elif label is not None:
            self.label = label

        if 'daemon_id' not in self.__dict__:
            self.daemon_id = get_new_daemon_name()
        if '_properties' not in self.__dict__:
            self._properties = properties
        elif properties:
            if self._properties is None:
                self._properties = {}
            self._properties.update(properties)
        if self._properties is None:
            self._properties = {}

        self._properties.update({'label': self.label})
        if env:
            self._properties.update({'env': env})

        ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
        _ = self.process


    def _run_exit(
        self,
        keep_daemon_output: bool = True,
        allow_dirty_run: bool = False,
    ) -> Any:
        """Run the daemon's target function.
        NOTE: This WILL EXIT the parent process!

        Parameters
        ----------
        keep_daemon_output: bool, default True
            If `False`, delete the daemon's output directory upon exiting.

        allow_dirty_run, bool, default False:
            If `True`, run the daemon, even if the `daemon_id` directory exists.
            This option is dangerous because if the same `daemon_id` runs twice,
            the last to finish will overwrite the output of the first.

        Returns
        -------
        Nothing — this will exit the parent process.
271 """ 272 import platform 273 import sys 274 import os 275 import traceback 276 from meerschaum.utils.warnings import warn 277 from meerschaum.config import get_config 278 daemon = attempt_import('daemon') 279 lines = get_config('jobs', 'terminal', 'lines') 280 columns = get_config('jobs', 'terminal', 'columns') 281 282 if platform.system() == 'Windows': 283 return False, "Windows is no longer supported." 284 285 self._setup(allow_dirty_run) 286 287 _daemons.append(self) 288 289 logs_cf = self.properties.get('logs', {}) 290 log_refresh_seconds = logs_cf.get('refresh_files_seconds', None) 291 if log_refresh_seconds is None: 292 log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds') 293 write_timestamps = logs_cf.get('write_timestamps', None) 294 if write_timestamps is None: 295 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 296 297 self._log_refresh_timer = RepeatTimer( 298 log_refresh_seconds, 299 partial(self.rotating_log.refresh_files, start_interception=write_timestamps), 300 ) 301 302 capture_stdin = logs_cf.get('stdin', True) 303 cwd = self.properties.get('cwd', os.getcwd()) 304 305 ### NOTE: The SIGINT handler has been removed so that child processes may handle 306 ### KeyboardInterrupts themselves. 307 ### The previous aggressive approach was redundant because of the SIGTERM handler. 
        self._daemon_context = daemon.DaemonContext(
            pidfile=self.pid_lock,
            stdout=self.rotating_log,
            stderr=self.rotating_log,
            stdin=(self.stdin_file if capture_stdin else None),
            working_directory=cwd,
            detach_process=True,
            files_preserve=list(self.rotating_log.subfile_objects.values()),
            signal_map={
                signal.SIGTERM: self._handle_sigterm,
            },
        )

        if capture_stdin and sys.stdin is None:
            raise OSError("Cannot daemonize without stdin.")

        try:
            ### Propagate the configured terminal size to the daemonized process.
            os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns))
            with self._daemon_context:
                if capture_stdin:
                    sys.stdin = self.stdin_file
                _ = os.environ.pop(STATIC_CONFIG['environment']['systemd_stdin_path'], None)
                os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id
                os.environ['PYTHONUNBUFFERED'] = '1'

                ### Allow the user to override environment variables.
                env = self.properties.get('env', {})
                if env and isinstance(env, dict):
                    os.environ.update({str(k): str(v) for k, v in env.items()})

                self.rotating_log.refresh_files(start_interception=True)
                result = None
                try:
                    with open(self.pid_path, 'w+', encoding='utf-8') as f:
                        f.write(str(os.getpid()))

                    ### NOTE: The timer fails to start for remote actions to localhost.
                    try:
                        if not self._log_refresh_timer.is_running():
                            self._log_refresh_timer.start()
                    except Exception:
                        pass

                    ### Run the target and capture its result into the properties.
                    self.properties['result'] = None
                    self._capture_process_timestamp('began')
                    result = self.target(*self.target_args, **self.target_kw)
                    self.properties['result'] = result
                except (BrokenPipeError, KeyboardInterrupt, SystemExit):
                    result = False, traceback.format_exc()
                except Exception as e:
                    warn(
                        f"Exception in daemon target function: {traceback.format_exc()}",
                    )
                    result = False, str(e)
                finally:
                    _results[self.daemon_id] = result
                    self.properties['result'] = result

                    if keep_daemon_output:
                        self._capture_process_timestamp('ended')
                    else:
                        self.cleanup()

                    self._log_refresh_timer.cancel()
                    try:
                        if self.pid is None and self.pid_path.exists():
                            self.pid_path.unlink()
                    except Exception:
                        pass

                    if is_success_tuple(result):
                        try:
                            mrsm.pprint(result)
                        except BrokenPipeError:
                            pass

        except Exception:
            ### Record daemonization failures to the error log for post-mortem debugging.
            daemon_error = traceback.format_exc()
            from meerschaum.config.paths import DAEMON_ERROR_LOG_PATH
            with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
                f.write(
                    f"Error in Daemon '{self}':\n\n"
                    f"{sys.stdin=}\n"
                    f"{self.stdin_file_path=}\n"
                    f"{self.stdin_file_path.exists()=}\n\n"
                    f"{daemon_error}\n\n"
                )
            warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}")

    def _capture_process_timestamp(
        self,
        process_key: str,
        write_properties: bool = True,
    ) -> None:
        """
        Record the current timestamp to the parameters `process:<process_key>`.

        Parameters
        ----------
        process_key: str
            Under which key to store the timestamp.

        write_properties: bool, default True
            If `True` persist the properties to disk immediately after capturing the timestamp.
412 """ 413 if 'process' not in self.properties: 414 self.properties['process'] = {} 415 416 if process_key not in ('began', 'ended', 'paused', 'stopped'): 417 raise ValueError(f"Invalid key '{process_key}'.") 418 419 self.properties['process'][process_key] = ( 420 datetime.now(timezone.utc).replace(tzinfo=None).isoformat() 421 ) 422 if write_properties: 423 self.write_properties() 424 425 def run( 426 self, 427 keep_daemon_output: bool = True, 428 allow_dirty_run: bool = False, 429 wait: bool = False, 430 timeout: Union[int, float] = 4, 431 debug: bool = False, 432 ) -> SuccessTuple: 433 """Run the daemon as a child process and continue executing the parent. 434 435 Parameters 436 ---------- 437 keep_daemon_output: bool, default True 438 If `False`, delete the daemon's output directory upon exiting. 439 440 allow_dirty_run: bool, default False 441 If `True`, run the daemon, even if the `daemon_id` directory exists. 442 This option is dangerous because if the same `daemon_id` runs concurrently, 443 the last to finish will overwrite the output of the first. 444 445 wait: bool, default True 446 If `True`, block until `Daemon.status` is running (or the timeout expires). 447 448 timeout: Union[int, float], default 4 449 If `wait` is `True`, block for up to `timeout` seconds before returning a failure. 450 451 Returns 452 ------- 453 A SuccessTuple indicating success. 454 455 """ 456 import platform 457 if platform.system() == 'Windows': 458 return False, "Cannot run background jobs on Windows." 459 460 ### The daemon might exist and be paused. 461 if self.status == 'paused': 462 return self.resume() 463 464 self._remove_stop_file() 465 if self.status == 'running': 466 return True, f"Daemon '{self}' is already running." 

        self.mkdir_if_not_exists(allow_dirty_run)
        _write_pickle_success_tuple = self.write_pickle()
        if not _write_pickle_success_tuple[0]:
            return _write_pickle_success_tuple

        ### Launch the daemon in a fresh interpreter so it may detach cleanly.
        _launch_daemon_code = (
            "from meerschaum.utils.daemon import Daemon, _daemons; "
            f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
            f"_daemons['{self.daemon_id}'] = daemon; "
            f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
            "allow_dirty_run=True)"
        )
        env = dict(os.environ)
        _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
        msg = (
            "Success"
            if _launch_success_bool
            else f"Failed to start daemon '{self.daemon_id}'."
        )
        if not wait or not _launch_success_bool:
            return _launch_success_bool, msg

        timeout = self.get_timeout_seconds(timeout)
        check_timeout_interval = self.get_check_timeout_interval_seconds()

        if not timeout:
            success = self.status == 'running'
            msg = "Success" if success else f"Failed to run daemon '{self.daemon_id}'."
            if success:
                self._capture_process_timestamp('began')
            return success, msg

        ### Poll the status until it is running or the timeout expires.
        begin = time.perf_counter()
        while (time.perf_counter() - begin) < timeout:
            if self.status == 'running':
                self._capture_process_timestamp('began')
                return True, "Success"
            time.sleep(check_timeout_interval)

        return False, (
            f"Failed to start daemon '{self.daemon_id}' within {timeout} second"
            + ('s' if timeout != 1 else '') + '.'
        )


    def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
        """
        Forcibly terminate a running daemon.
        Sends a SIGTERM signal to the process.

        Parameters
        ----------
        timeout: Union[int, float, None], default 8
            How many seconds to wait for the process to terminate.

        Returns
        -------
        A SuccessTuple indicating success.
526 """ 527 if self.status != 'paused': 528 success, msg = self._send_signal(signal.SIGTERM, timeout=timeout) 529 if success: 530 self._write_stop_file('kill') 531 self.stdin_file.close() 532 self._remove_blocking_stdin_file() 533 return success, msg 534 535 if self.status == 'stopped': 536 self._write_stop_file('kill') 537 self.stdin_file.close() 538 self._remove_blocking_stdin_file() 539 return True, "Process has already stopped." 540 541 psutil = attempt_import('psutil') 542 process = self.process 543 try: 544 process.terminate() 545 process.kill() 546 process.wait(timeout=timeout) 547 except Exception as e: 548 return False, f"Failed to kill job {self} ({process}) with exception: {e}" 549 550 try: 551 if process.status(): 552 return False, "Failed to stop daemon '{self}' ({process})." 553 except psutil.NoSuchProcess: 554 pass 555 556 if self.pid_path.exists(): 557 try: 558 self.pid_path.unlink() 559 except Exception: 560 pass 561 562 self._write_stop_file('kill') 563 self.stdin_file.close() 564 self._remove_blocking_stdin_file() 565 return True, "Success" 566 567 def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple: 568 """Gracefully quit a running daemon.""" 569 if self.status == 'paused': 570 return self.kill(timeout) 571 572 signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout) 573 if signal_success: 574 self._write_stop_file('quit') 575 self.stdin_file.close() 576 self._remove_blocking_stdin_file() 577 return signal_success, signal_msg 578 579 def pause( 580 self, 581 timeout: Union[int, float, None] = None, 582 check_timeout_interval: Union[float, int, None] = None, 583 ) -> SuccessTuple: 584 """ 585 Pause the daemon if it is running. 586 587 Parameters 588 ---------- 589 timeout: Union[float, int, None], default None 590 The maximum number of seconds to wait for a process to suspend. 

        check_timeout_interval: Union[float, int, None], default None
            The number of seconds to wait between checking if the process is still running.

        Returns
        -------
        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
        """
        self._remove_blocking_stdin_file()

        if self.process is None:
            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."

        if self.status == 'paused':
            return True, f"Daemon '{self.daemon_id}' is already paused."

        self._write_stop_file('pause')
        self.stdin_file.close()
        self._remove_blocking_stdin_file()
        try:
            ### Suspend the process via psutil (SIGSTOP on POSIX).
            self.process.suspend()
        except Exception as e:
            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"

        timeout = self.get_timeout_seconds(timeout)
        check_timeout_interval = self.get_check_timeout_interval_seconds(
            check_timeout_interval
        )

        psutil = attempt_import('psutil')

        if not timeout:
            try:
                ### psutil reports a suspended process with status 'stopped'.
                success = self.process.status() == 'stopped'
            except psutil.NoSuchProcess:
                success = True
            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
            if success:
                self._capture_process_timestamp('paused')
            return success, msg

        begin = time.perf_counter()
        while (time.perf_counter() - begin) < timeout:
            try:
                if self.process.status() == 'stopped':
                    self._capture_process_timestamp('paused')
                    return True, "Success"
            except psutil.NoSuchProcess as e:
                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
            time.sleep(check_timeout_interval)

        return False, (
            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
            + ('s' if timeout != 1 else '') + '.'
        )

    def resume(
        self,
        timeout: Union[int, float, None] = None,
        check_timeout_interval: Union[float, int, None] = None,
    ) -> SuccessTuple:
        """
        Resume the daemon if it is paused.

        Parameters
        ----------
        timeout: Union[float, int, None], default None
            The maximum number of seconds to wait for a process to resume.

        check_timeout_interval: Union[float, int, None], default None
            The number of seconds to wait between checking if the process is still stopped.

        Returns
        -------
        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
        """
        if self.status == 'running':
            return True, f"Daemon '{self.daemon_id}' is already running."

        if self.status == 'stopped':
            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."

        self._remove_stop_file()
        try:
            if self.process is None:
                return False, f"Cannot resume daemon '{self.daemon_id}'."

            self.process.resume()
        except Exception as e:
            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"

        timeout = self.get_timeout_seconds(timeout)
        check_timeout_interval = self.get_check_timeout_interval_seconds(
            check_timeout_interval
        )

        if not timeout:
            success = self.status == 'running'
            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
            if success:
                self._capture_process_timestamp('began')
            return success, msg

        ### Poll until the process reports running or the timeout elapses.
        begin = time.perf_counter()
        while (time.perf_counter() - begin) < timeout:
            if self.status == 'running':
                self._capture_process_timestamp('began')
                return True, "Success"
            time.sleep(check_timeout_interval)

        return False, (
            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
            + ('s' if timeout != 1 else '') + '.'
        )

    def _write_stop_file(self, action: str) -> SuccessTuple:
        """Write the stop file timestamp and action."""
        if action not in ('quit', 'kill', 'pause'):
            return False, f"Unsupported action '{action}'."

        if not self.stop_path.parent.exists():
            self.stop_path.parent.mkdir(parents=True, exist_ok=True)

        with open(self.stop_path, 'w+', encoding='utf-8') as f:
            json.dump(
                {
                    'stop_time': datetime.now(timezone.utc).isoformat(),
                    'action': action,
                },
                f
            )

        return True, "Success"

    def _remove_stop_file(self) -> SuccessTuple:
        """Remove the stop file."""
        if not self.stop_path.exists():
            return True, "Stop file does not exist."

        try:
            self.stop_path.unlink()
        except Exception as e:
            return False, f"Failed to remove stop file:\n{e}"

        return True, "Success"

    def _read_stop_file(self) -> Dict[str, Any]:
        """
        Read the stop file if it exists.
        """
        if not self.stop_path.exists():
            return {}

        try:
            with open(self.stop_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return data
        except Exception:
            ### Treat a corrupt or unreadable stop file as no stop request.
            return {}

    def _remove_blocking_stdin_file(self) -> mrsm.SuccessTuple:
        """
        Remove the blocking STDIN file if it exists.
        """
        try:
            if self.blocking_stdin_file_path.exists():
                self.blocking_stdin_file_path.unlink()
        except Exception as e:
            return False, str(e)

        return True, "Success"

    def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None:
        """
        Handle `SIGTERM` within the `Daemon` context.
        This method is injected into the `DaemonContext`.
def _send_signal(
    self,
    signal_to_send,
    timeout: Union[float, int, None] = None,
    check_timeout_interval: Union[float, int, None] = None,
) -> SuccessTuple:
    """Send a signal to the daemon process.

    Parameters
    ----------
    signal_to_send:
        The signal to send to the daemon, e.g. `signal.SIGINT`.

    timeout: Union[float, int, None], default None
        The maximum number of seconds to wait for a process to terminate.

    check_timeout_interval: Union[float, int, None], default None
        The number of seconds to wait between checking if the process is still running.

    Returns
    -------
    A SuccessTuple indicating success.
    """
    try:
        pid = self.pid
        if pid is None:
            return (
                False,
                f"Daemon '{self.daemon_id}' is not running, "
                + f"cannot send signal '{signal_to_send}'."
            )

        os.kill(pid, signal_to_send)
    except Exception:
        return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}"

    timeout = self.get_timeout_seconds(timeout)
    check_timeout_interval = self.get_check_timeout_interval_seconds(
        check_timeout_interval
    )

    if not timeout:
        ### BUG FIX: previously interpolated the `signal` module object
        ### (f"'{signal}'") instead of the signal that was actually sent.
        return True, f"Successfully sent '{signal_to_send}' to daemon '{self.daemon_id}'."

    begin = time.perf_counter()
    while (time.perf_counter() - begin) < timeout:
        if not self.status == 'running':
            return True, "Success"
        time.sleep(check_timeout_interval)

    return False, (
        f"Failed to stop daemon '{self.daemon_id}' (PID: {pid}) within {timeout} second"
        + ('s' if timeout != 1 else '') + '.'
    )

def mkdir_if_not_exists(self, allow_dirty_run: bool = False):
    """Create the Daemon's directory.
    If `allow_dirty_run` is `False` and the directory already exists,
    raise a `FileExistsError`.
    """
    try:
        self.path.mkdir(parents=True, exist_ok=True)
        ### The directory counts as "dirty" if it contains anything.
        _already_exists = any(os.scandir(self.path))
    except FileExistsError:
        _already_exists = True

    if _already_exists and not allow_dirty_run:
        error(
            f"Daemon '{self.daemon_id}' already exists. " +
            "To allow this daemon to run, do one of the following:\n"
            + "  - Execute `daemon.cleanup()`.\n"
            + f"  - Delete the directory '{self.path}'.\n"
            + "  - Pass `allow_dirty_run=True` to `daemon.run()`.\n",
            FileExistsError,
        )

@property
def process(self) -> Union['psutil.Process', None]:
    """
    Return the psutil process for the Daemon,
    or `None` if no PID file exists or the process has died.
    Clears a stale cached process (and its PID file) as a side effect.
    """
    psutil = attempt_import('psutil')
    pid = self.pid
    if pid is None:
        return None
    if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
        try:
            self._process = psutil.Process(int(pid))
            process_exists = True
        except Exception:
            process_exists = False
        if not process_exists:
            _ = self.__dict__.pop('_process', None)
            try:
                ### Remove the stale PID file so future lookups short-circuit.
                if self.pid_path.exists():
                    self.pid_path.unlink()
            except Exception:
                pass
            return None
    return self._process

@property
def status(self) -> str:
    """
    Return the running status of this Daemon:
    one of 'stopped', 'paused', or 'running'.
    """
    if self.process is None:
        return 'stopped'

    psutil = attempt_import('psutil', lazy=False)
    try:
        ### Query the process status once (was queried twice, which is racy).
        process_status = self.process.status()
        if process_status == 'stopped':
            return 'paused'
        if process_status == 'zombie':
            raise psutil.NoSuchProcess(self.process.pid)
    except (psutil.NoSuchProcess, AttributeError):
        if self.pid_path.exists():
            try:
                self.pid_path.unlink()
            except Exception:
                pass
        return 'stopped'

    return 'running'
@classmethod
def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
    """
    Return a Daemon's path from its `daemon_id`.
    """
    from meerschaum.config.paths import DAEMON_RESOURCES_PATH
    return DAEMON_RESOURCES_PATH / daemon_id

@property
def path(self) -> pathlib.Path:
    """
    Return the path for this Daemon's directory.
    """
    return self._get_path_from_daemon_id(self.daemon_id)

@classmethod
def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path:
    """
    Return the `properties.json` path for a given `daemon_id`.
    """
    return cls._get_path_from_daemon_id(daemon_id) / 'properties.json'

@property
def properties_path(self) -> pathlib.Path:
    """
    Return the `properties.json` path for this Daemon.
    """
    return self._get_properties_path_from_daemon_id(self.daemon_id)

@property
def stop_path(self) -> pathlib.Path:
    """
    Return the path for the stop file (created when manually stopped).
    """
    return self.path / '.stop.json'

@property
def log_path(self) -> pathlib.Path:
    """
    Return the log path.
    Prefer an explicit `logs:path` override from the properties file;
    otherwise default to `LOGS_RESOURCES_PATH / '{daemon_id}.log'`.
    """
    logs_cf = self.properties.get('logs', None) or {}
    if 'path' not in logs_cf:
        from meerschaum.config.paths import LOGS_RESOURCES_PATH
        return LOGS_RESOURCES_PATH / (self.daemon_id + '.log')

    return pathlib.Path(logs_cf['path'])

@property
def stdin_file_path(self) -> pathlib.Path:
    """
    Return the stdin file path.
    """
    return self.path / 'input.stdin'

@property
def blocking_stdin_file_path(self) -> pathlib.Path:
    """
    Return the blocking stdin file path
    (honors an explicitly-set `_blocking_stdin_file_path` override).
    """
    if '_blocking_stdin_file_path' in self.__dict__:
        return self._blocking_stdin_file_path

    return self.path / 'input.stdin.block'

@property
def prompt_kwargs_file_path(self) -> pathlib.Path:
    """
    Return the file path to the kwargs for the invoking `prompt()`.
    """
    return self.path / 'prompt_kwargs.json'

@property
def log_offset_path(self) -> pathlib.Path:
    """
    Return the log offset file path (hidden dotfile beside the logs).
    """
    from meerschaum.config.paths import LOGS_RESOURCES_PATH
    return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')

@property
def log_offset_lock(self) -> 'fasteners.InterProcessLock':
    """
    Return the inter-process lock guarding the log offset file.
    The lock is created lazily and cached on the instance.
    """
    if '_log_offset_lock' in self.__dict__:
        return self._log_offset_lock

    fasteners = attempt_import('fasteners')
    self._log_offset_lock = fasteners.InterProcessLock(self.log_offset_path)
    return self._log_offset_lock

@property
def rotating_log(self) -> RotatingFile:
    """
    The rotating log file for the daemon's output.
    Settings come from the `logs` section of the properties file,
    falling back to the global `jobs:logs` configuration; the instance
    is created lazily and cached.
    """
    if '_rotating_log' in self.__dict__:
        return self._rotating_log

    logs_cf = self.properties.get('logs', None) or {}
    write_timestamps = logs_cf.get('write_timestamps', None)
    if write_timestamps is None:
        write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled')

    timestamp_format = logs_cf.get('timestamp_format', None)
    if timestamp_format is None:
        timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')

    num_files_to_keep = logs_cf.get('num_files_to_keep', None)
    if num_files_to_keep is None:
        num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')

    max_file_size = logs_cf.get('max_file_size', None)
    if max_file_size is None:
        max_file_size = get_config('jobs', 'logs', 'max_file_size')

    redirect_streams = logs_cf.get('redirect_streams', True)

    self._rotating_log = RotatingFile(
        self.log_path,
        redirect_streams=redirect_streams,
        write_timestamps=write_timestamps,
        timestamp_format=timestamp_format,
        num_files_to_keep=num_files_to_keep,
        max_file_size=max_file_size,
    )
    return self._rotating_log
@property
def stdin_file(self):
    """
    Return the file handler for the stdin file, creating it on first access.
    """
    cached = self.__dict__.get('_stdin_file', None)
    if cached:
        return cached

    self._stdin_file = StdinFile(
        self.stdin_file_path,
        lock_file_path=self.blocking_stdin_file_path,
    )
    return self._stdin_file

@property
def log_text(self) -> Union[str, None]:
    """
    Read the log files and return their contents.
    Returns `None` if the log file does not exist.
    """
    logs_cf = self.properties.get('logs', None) or {}

    def _resolve(key, *config_keys):
        """Prefer the job's own log setting; fall back to the global config."""
        value = logs_cf.get(key, None)
        return get_config(*config_keys) if value is None else value

    reader = RotatingFile(
        self.rotating_log.file_path,
        num_files_to_keep=_resolve('num_files_to_keep', 'jobs', 'logs', 'num_files_to_keep'),
        max_file_size=_resolve('max_file_size', 'jobs', 'logs', 'max_file_size'),
        write_timestamps=_resolve('write_timestamps', 'jobs', 'logs', 'timestamps', 'enabled'),
        timestamp_format=_resolve('timestamp_format', 'jobs', 'logs', 'timestamps', 'format'),
    )
    return reader.read()

def readlines(self) -> List[str]:
    """
    Read the next log lines, persisting the cursor for later use.
    Note this will alter the cursor of `self.rotating_log`.
    """
    log = self.rotating_log
    log._cursor = self._read_log_offset()
    new_lines = log.readlines()
    self._write_log_offset()
    return new_lines

def _read_log_offset(self) -> Tuple[int, int]:
    """
    Return the current log offset cursor as (`subfile_index`, `position`).
    Falls back to (0, 0) when the offset file is missing or unreadable.
    """
    if not self.log_offset_path.exists():
        return 0, 0

    try:
        with open(self.log_offset_path, 'r', encoding='utf-8') as f:
            parts = f.read().split(' ')
        return int(parts[0]), int(parts[1])
    except Exception as e:
        warn(f"Failed to read cursor:\n{e}")
        return 0, 0
def _write_log_offset(self) -> None:
    """
    Persist the rotating log's current cursor (subfile index and position)
    to the offset file, guarded by the inter-process lock.
    """
    with self.log_offset_lock:
        with open(self.log_offset_path, 'w+', encoding='utf-8') as f:
            subfile_index = self.rotating_log._cursor[0]
            subfile_position = self.rotating_log._cursor[1]
            f.write(f"{subfile_index} {subfile_position}")

@property
def pid(self) -> Union[int, None]:
    """
    Read the PID file and return its contents.
    Returns `None` if the PID file does not exist or cannot be parsed.
    """
    if not self.pid_path.exists():
        return None
    try:
        with open(self.pid_path, 'r', encoding='utf-8') as f:
            text = f.read()
        if len(text) == 0:
            return None
        pid = int(text.rstrip())
    except Exception as e:
        warn(e)
        ### Removed a dead `text = None` assignment here.
        pid = None
    return pid

@property
def pid_path(self) -> pathlib.Path:
    """
    Return the path to a file containing the PID for this Daemon.
    """
    return self.path / 'process.pid'

@property
def pid_lock(self) -> 'fasteners.InterProcessLock':
    """
    Return the inter-process lock for the PID file, created lazily and cached.
    """
    if '_pid_lock' in self.__dict__:
        return self._pid_lock

    fasteners = attempt_import('fasteners')
    self._pid_lock = fasteners.InterProcessLock(self.pid_path)
    return self._pid_lock

@property
def pickle_path(self) -> pathlib.Path:
    """
    Return the path for the pickle file.
    """
    return self.path / 'pickle.pkl'

def read_properties(self) -> Optional[Dict[str, Any]]:
    """Read the properties JSON file and return the dictionary.

    Returns `None` when the file does not exist; an unreadable
    or empty file yields `{}`.
    """
    if not self.properties_path.exists():
        return None
    try:
        with open(self.properties_path, 'r', encoding='utf-8') as file:
            properties = json.load(file)
    except Exception:
        properties = {}

    return properties or {}

def read_pickle(self) -> Daemon:
    """Read a Daemon's pickle file and return the `Daemon`.

    Calls `error()` (raises) if the pickle file is missing, empty,
    or cannot be unpickled.
    """
    import pickle
    if not self.pickle_path.exists():
        error(f"Pickle file does not exist for daemon '{self.daemon_id}'.")

    if self.pickle_path.stat().st_size == 0:
        error(f"Pickle was empty for daemon '{self.daemon_id}'.")

    try:
        with open(self.pickle_path, 'rb') as pickle_file:
            daemon = pickle.load(pickle_file)
        success, msg = True, 'Success'
    except Exception as e:
        success, msg = False, str(e)
        daemon = None
        ### Uses the module-level `traceback` import
        ### (removed a redundant local `import traceback`).
        traceback.print_exception(type(e), e, e.__traceback__)
    if not success:
        error(msg)
    return daemon

@property
def properties(self) -> Dict[str, Any]:
    """
    Return the contents of the properties JSON file,
    patched over the in-memory `_properties` dictionary
    (on-disk values take precedence).
    """
    try:
        _file_properties = self.read_properties() or {}
    except Exception:
        traceback.print_exc()
        _file_properties = {}

    if not self._properties:
        self._properties = _file_properties

    ### Defensive: ensure `_properties` is always a dict from here on.
    if self._properties is None:
        self._properties = {}

    ### Prefer the on-disk result when the in-memory one is unset.
    if (
        self._properties.get('result', None) is None
        and _file_properties.get('result', None) is not None
    ):
        _ = self._properties.pop('result', None)

    if _file_properties is not None:
        self._properties = apply_patch_to_config(
            _file_properties,
            self._properties,
        )

    return self._properties
@property
def hidden(self) -> bool:
    """
    Return a bool indicating whether this Daemon should be displayed.
    """
    ### IDs beginning with an underscore or dot are considered hidden.
    return self.daemon_id.startswith(('_', '.'))

def write_properties(self) -> SuccessTuple:
    """Write the properties dictionary to the properties JSON file
    (only if self.properties exists).

    The existing file is renamed to a random backup first and restored
    if the write fails.
    """
    from meerschaum.utils.misc import generate_password
    success, msg = (
        False,
        f"No properties to write for daemon '{self.daemon_id}'."
    )
    backup_path = self.properties_path.parent / (generate_password(8) + '.json')
    daemon_props = self.properties
    if daemon_props is not None:
        try:
            self.path.mkdir(parents=True, exist_ok=True)
            if self.properties_path.exists():
                self.properties_path.rename(backup_path)
            with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
                json.dump(daemon_props, properties_file)
            success, msg = True, 'Success'
        except Exception as e:
            success, msg = False, str(e)

    try:
        if backup_path.exists():
            if success:
                backup_path.unlink()
            else:
                backup_path.rename(self.properties_path)
    except Exception as e:
        success, msg = False, str(e)

    return success, msg

def write_pickle(self) -> SuccessTuple:
    """Write the pickle file for the daemon
    (a no-op when pickling is disabled via `self.pickle`).
    """
    import pickle
    import traceback
    from meerschaum.utils.misc import generate_password

    if not self.pickle:
        return True, "Success"

    backup_path = self.pickle_path.parent / (generate_password(7) + '.pkl')
    try:
        self.path.mkdir(parents=True, exist_ok=True)
        if self.pickle_path.exists():
            self.pickle_path.rename(backup_path)
        with open(self.pickle_path, 'wb+') as pickle_file:
            pickle.dump(self, pickle_file)
        success, msg = True, "Success"
    except Exception as e:
        success, msg = False, str(e)
        traceback.print_exception(type(e), e, e.__traceback__)
    try:
        if backup_path.exists():
            if success:
                backup_path.unlink()
            else:
                backup_path.rename(self.pickle_path)
    except Exception as e:
        success, msg = False, str(e)
    return success, msg


def _setup(
    self,
    allow_dirty_run: bool = False,
) -> None:
    """
    Update properties before starting the Daemon,
    then persist both the properties file and the pickle.
    """
    if self.properties is None:
        self._properties = {}

    self._properties.update({
        'target': {
            'name': self.target.__name__,
            'module': self.target.__module__,
            'args': self.target_args,
            'kw': self.target_kw,
        },
    })
    self.mkdir_if_not_exists(allow_dirty_run)

    write_success, write_msg = self.write_properties()
    if not write_success:
        error(write_msg)

    pickle_success, pickle_msg = self.write_pickle()
    if not pickle_success:
        error(pickle_msg)
def cleanup(self, keep_logs: bool = False) -> SuccessTuple:
    """
    Remove a daemon's directory after execution.

    Parameters
    ----------
    keep_logs: bool, default False
        If `True`, skip deleting the daemon's log files.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    if self.path.exists():
        try:
            shutil.rmtree(self.path)
        except Exception as e:
            msg = f"Failed to clean up '{self.daemon_id}':\n{e}"
            warn(msg)
            return False, msg

    if keep_logs:
        return True, "Success"

    self.rotating_log.delete()
    try:
        if self.log_offset_path.exists():
            self.log_offset_path.unlink()
    except Exception as e:
        msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}"
        warn(msg)
        return False, msg
    return True, "Success"


def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]:
    """
    Return the timeout value to use. Use `--timeout-seconds` if provided,
    else the configured default (8).
    """
    if not isinstance(timeout, (int, float)):
        return get_config('jobs', 'timeout_seconds')
    return timeout


def get_check_timeout_interval_seconds(
    self,
    check_timeout_interval: Union[int, float, None] = None,
) -> Union[int, float]:
    """
    Return the interval value to check the status of timeouts.
    """
    if not isinstance(check_timeout_interval, (int, float)):
        return get_config('jobs', 'check_timeout_interval_seconds')
    return check_timeout_interval

@property
def target_args(self) -> Union[Tuple[Any], None]:
    """
    Return the positional arguments to pass to the target function.
    """
    configured = (
        self.__dict__.get('_target_args', None)
        or self.properties.get('target', {}).get('args', None)
    )
    return () if configured is None else tuple(configured)

@property
def target_kw(self) -> Union[Dict[str, Any], None]:
    """
    Return the keyword arguments to pass to the target function.
    """
    configured = (
        self.__dict__.get('_target_kw', None)
        or self.properties.get('target', {}).get('kw', None)
    )
    return {} if configured is None else dict(configured)

def __getstate__(self):
    """
    Pickle this Daemon.
    """
    dill = attempt_import('dill')
    state = {
        'target': dill.dumps(self.target),
        'target_args': self.target_args,
        'target_kw': self.target_kw,
        'daemon_id': self.daemon_id,
        'label': self.label,
        'properties': self.properties,
    }
    return state

def __setstate__(self, _state: Dict[str, Any]):
    """
    Restore this Daemon from a pickled state.
    If the properties file exists, skip the old pickled version.
    """
    dill = attempt_import('dill')
    _state['target'] = dill.loads(_state['target'])
    self._pickle = True
    daemon_id = _state.get('daemon_id', None)
    if not daemon_id:
        raise ValueError("Need a daemon_id to un-pickle a Daemon.")

    properties_path = self._get_properties_path_from_daemon_id(daemon_id)
    if properties_path.exists():
        ### Drop the stale pickled properties in favor of the on-disk file.
        _state = {
            key: val
            for key, val in _state.items()
            if key != 'properties'
        }
    self.__init__(**_state)
def __repr__(self):
    """Represent the daemon by its ID."""
    return str(self)

def __str__(self):
    """Return the daemon's ID."""
    return self.daemon_id

def __eq__(self, other):
    """Daemons are equal when their IDs match."""
    return isinstance(other, Daemon) and self.daemon_id == other.daemon_id

def __hash__(self):
    """Hash on the daemon ID so equal daemons hash equally."""
    return hash(self.daemon_id)
Daemonize Python functions into background processes.
Examples
>>> import meerschaum as mrsm
>>> from meerschaum.utils.daemon import Daemon
>>> daemon = Daemon(print, ('hi',))
>>> success, msg = daemon.run()
>>> print(daemon.log_text)
2024-07-29 18:03 | hi 2024-07-29 18:03 |
>>> daemon.run(allow_dirty_run=True)
>>> print(daemon.log_text)
2024-07-29 18:03 | hi 2024-07-29 18:03 | 2024-07-29 18:05 | hi 2024-07-29 18:05 |
>>> mrsm.pprint(daemon.properties)
{
'label': 'print',
'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
'result': None,
'process': {'ended': '2024-07-29T18:03:33.752806'}
}
def __init__(
    self,
    target: Optional[Callable[[Any], Any]] = None,
    target_args: Union[List[Any], Tuple[Any], None] = None,
    target_kw: Optional[Dict[str, Any]] = None,
    env: Optional[Dict[str, str]] = None,
    daemon_id: Optional[str] = None,
    label: Optional[str] = None,
    properties: Optional[Dict[str, Any]] = None,
    pickle: bool = True,
):
    """
    Parameters
    ----------
    target: Optional[Callable[[Any], Any]], default None,
        The function to execute in a child process.

    target_args: Union[List[Any], Tuple[Any], None], default None
        Positional arguments to pass to the target function.

    target_kw: Optional[Dict[str, Any]], default None
        Keyword arguments to pass to the target function.

    env: Optional[Dict[str, str]], default None
        If provided, set these environment variables in the daemon process.

    daemon_id: Optional[str], default None
        Build a `Daemon` from an existing `daemon_id`.
        If `daemon_id` is provided, other arguments are ignored and are derived
        from the existing pickled `Daemon`.

    label: Optional[str], default None
        Label string to help identify a daemon.
        If `None`, use the function name instead.

    properties: Optional[Dict[str, Any]], default None
        Override reading from the properties JSON by providing an existing dictionary.

    pickle: bool, default True
        If `False`, skip writing the pickle file (see `write_pickle()`).
    """
    ### NOTE(review): `_pickle` is read but never used below — looks vestigial.
    _pickle = self.__dict__.get('_pickle', False)
    if daemon_id is not None:
        self.daemon_id = daemon_id
        ### No pickle, no target given, and no target restored by __setstate__:
        ### try to rebuild the daemon from its properties file.
        if not self.pickle_path.exists() and not target and ('target' not in self.__dict__):

            if not self.properties_path.exists():
                raise Exception(
                    f"Daemon '{self.daemon_id}' does not exist. "
                    + "Pass a target to create a new Daemon."
                )

            try:
                new_daemon = self.from_properties_file(daemon_id)
            except Exception:
                new_daemon = None

            if new_daemon is not None:
                ### Re-create the pickle and adopt the recovered attributes.
                new_daemon.write_pickle()
                target = new_daemon.target
                target_args = new_daemon.target_args
                target_kw = new_daemon.target_kw
                label = new_daemon.label
                self._properties = new_daemon.properties
            else:
                ### Recovery failed: discard the unusable properties file.
                try:
                    self.properties_path.unlink()
                except Exception:
                    pass

                raise Exception(
                    f"Could not recover daemon '{self.daemon_id}' "
                    + "from its properties file."
                )

    if 'target' not in self.__dict__:
        if target is None:
            error("Cannot create a Daemon without a target.")
        self.target = target

    self.pickle = pickle

    ### NOTE: We have to check self.__dict__ in case we're un-pickling.
    if '_target_args' not in self.__dict__:
        self._target_args = target_args
    if '_target_kw' not in self.__dict__:
        self._target_kw = target_kw

    if 'label' not in self.__dict__:
        if label is None:
            ### Fall back to the function name (builtins lack __dict__-style names).
            label = (
                self.target.__name__ if '__name__' in self.target.__dir__()
                else str(self.target)
            )
        self.label = label
    elif label is not None:
        self.label = label

    if 'daemon_id' not in self.__dict__:
        self.daemon_id = get_new_daemon_name()
    if '_properties' not in self.__dict__:
        self._properties = properties
    elif properties:
        if self._properties is None:
            self._properties = {}
        self._properties.update(properties)
    if self._properties is None:
        self._properties = {}

    self._properties.update({'label': self.label})
    if env:
        self._properties.update({'env': env})

    ### Instantiate the process and if it doesn't exist, make sure the PID is removed.
    _ = self.process
Parameters
- target (Optional[Callable[[Any], Any]], default None,): The function to execute in a child process.
- target_args (Union[List[Any], Tuple[Any], None], default None): Positional arguments to pass to the target function.
- target_kw (Optional[Dict[str, Any]], default None): Keyword arguments to pass to the target function.
- env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the daemon process.
- daemon_id (Optional[str], default None): Build a `Daemon` from an existing `daemon_id`. If `daemon_id` is provided, other arguments are ignored and are derived from the existing pickled `Daemon`.
- label (Optional[str], default None): Label string to help identify a daemon. If `None`, use the function name instead.
- properties (Optional[Dict[str, Any]], default None): Override reading from the properties JSON by providing an existing dictionary.
@classmethod
def from_properties_file(cls, daemon_id: str) -> Daemon:
    """
    Build and return a `Daemon` from its on-disk properties file.

    Parameters
    ----------
    daemon_id: str
        The ID of the daemon whose properties file should be read.

    Returns
    -------
    A new `Daemon` constructed from the stored target metadata.

    Raises
    ------
    `OSError` if the properties file does not exist;
    `ValueError` if it is empty, unreadable, or missing target information.
    """
    properties_path = cls._get_properties_path_from_daemon_id(daemon_id)
    if not properties_path.exists():
        raise OSError(f"Properties file '{properties_path}' does not exist.")

    try:
        with open(properties_path, 'r', encoding='utf-8') as f:
            properties = json.load(f)
    except Exception:
        properties = {}

    if not properties:
        raise ValueError(f"No properties could be read for daemon '{daemon_id}'.")

    ### The directory name is authoritative for the daemon's ID.
    daemon_id = properties_path.parent.name
    target_cf = properties.get('target', {})
    target_module_name = target_cf.get('module', None)
    target_function_name = target_cf.get('name', None)
    target_args = target_cf.get('args', None)
    target_kw = target_cf.get('kw', None)
    label = properties.get('label', None)

    if None in [
        target_module_name,
        target_function_name,
        target_args,
        target_kw,
    ]:
        raise ValueError("Missing target function information.")

    target_module = importlib.import_module(target_module_name)
    target_function = getattr(target_module, target_function_name)

    ### Use `cls` (was hard-coded `Daemon`) so subclasses build subclass instances.
    return cls(
        daemon_id=daemon_id,
        target=target_function,
        target_args=target_args,
        target_kw=target_kw,
        properties=properties,
        label=label,
    )
Return a Daemon built from its on-disk properties file.
def run(
    self,
    keep_daemon_output: bool = True,
    allow_dirty_run: bool = False,
    wait: bool = False,
    timeout: Union[int, float] = 4,
    debug: bool = False,
) -> SuccessTuple:
    """Run the daemon as a child process and continue executing the parent.

    Parameters
    ----------
    keep_daemon_output: bool, default True
        If `False`, delete the daemon's output directory upon exiting.

    allow_dirty_run: bool, default False
        If `True`, run the daemon, even if the `daemon_id` directory exists.
        This option is dangerous because if the same `daemon_id` runs concurrently,
        the last to finish will overwrite the output of the first.

    wait: bool, default False
        If `True`, block until `Daemon.status` is running (or the timeout expires).

    timeout: Union[int, float], default 4
        If `wait` is `True`, block for up to `timeout` seconds before returning a failure.

    Returns
    -------
    A SuccessTuple indicating success.

    """
    import platform
    if platform.system() == 'Windows':
        return False, "Cannot run background jobs on Windows."

    ### The daemon might exist and be paused.
    if self.status == 'paused':
        return self.resume()

    self._remove_stop_file()
    if self.status == 'running':
        return True, f"Daemon '{self}' is already running."

    self.mkdir_if_not_exists(allow_dirty_run)
    _write_pickle_success_tuple = self.write_pickle()
    if not _write_pickle_success_tuple[0]:
        return _write_pickle_success_tuple

    ### Launch a fresh interpreter which re-reads this daemon from disk
    ### and daemonizes itself via `_run_exit()`.
    _launch_daemon_code = (
        "from meerschaum.utils.daemon import Daemon, _daemons; "
        f"daemon = Daemon(daemon_id='{self.daemon_id}'); "
        f"_daemons['{self.daemon_id}'] = daemon; "
        f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, "
        "allow_dirty_run=True)"
    )
    env = dict(os.environ)
    _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env)
    msg = (
        "Success"
        if _launch_success_bool
        else f"Failed to start daemon '{self.daemon_id}'."
    )
    if not wait or not _launch_success_bool:
        return _launch_success_bool, msg

    timeout = self.get_timeout_seconds(timeout)
    check_timeout_interval = self.get_check_timeout_interval_seconds()

    if not timeout:
        ### No timeout configured: report the instantaneous status.
        success = self.status == 'running'
        msg = "Success" if success else f"Failed to run daemon '{self.daemon_id}'."
        if success:
            self._capture_process_timestamp('began')
        return success, msg

    begin = time.perf_counter()
    while (time.perf_counter() - begin) < timeout:
        if self.status == 'running':
            self._capture_process_timestamp('began')
            return True, "Success"
        time.sleep(check_timeout_interval)

    return False, (
        f"Failed to start daemon '{self.daemon_id}' within {timeout} second"
        + ('s' if timeout != 1 else '') + '.'
    )
Run the daemon as a child process and continue executing the parent.
Parameters
- keep_daemon_output (bool, default True): If `False`, delete the daemon's output directory upon exiting.
- allow_dirty_run (bool, default False): If `True`, run the daemon, even if the `daemon_id` directory exists. This option is dangerous because if the same `daemon_id` runs concurrently, the last to finish will overwrite the output of the first.
- wait (bool, default False): If `True`, block until `Daemon.status` is running (or the timeout expires).
- timeout (Union[int, float], default 4): If `wait` is `True`, block for up to `timeout` seconds before returning a failure.
Returns
- A SuccessTuple indicating success.
def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple:
    """
    Forcibly terminate a running daemon.
    Sends a SIGTERM signal to the process, escalating to SIGKILL if needed.

    Parameters
    ----------
    timeout: Union[int, float, None], default 8
        How many seconds to wait for the process to terminate.

    Returns
    -------
    A SuccessTuple indicating success.
    """
    if self.status != 'paused':
        success, msg = self._send_signal(signal.SIGTERM, timeout=timeout)
        if success:
            self._write_stop_file('kill')
            self.stdin_file.close()
            self._remove_blocking_stdin_file()
            return success, msg

    if self.status == 'stopped':
        self._write_stop_file('kill')
        self.stdin_file.close()
        self._remove_blocking_stdin_file()
        return True, "Process has already stopped."

    ### SIGTERM didn't work (or the process is paused): escalate via psutil.
    psutil = attempt_import('psutil')
    process = self.process
    try:
        process.terminate()
        process.kill()
        process.wait(timeout=timeout)
    except Exception as e:
        return False, f"Failed to kill job {self} ({process}) with exception: {e}"

    try:
        if process.status():
            ### BUG FIX: this message was a plain string missing its `f` prefix,
            ### so '{self}' and '{process}' were emitted literally.
            return False, f"Failed to stop daemon '{self}' ({process})."
    except psutil.NoSuchProcess:
        pass

    if self.pid_path.exists():
        try:
            self.pid_path.unlink()
        except Exception:
            pass

    self._write_stop_file('kill')
    self.stdin_file.close()
    self._remove_blocking_stdin_file()
    return True, "Success"
Forcibly terminate a running daemon. Sends a SIGTERM signal to the process.
Parameters
- timeout (Optional[int], default 8): How many seconds to wait for the process to terminate.
Returns
- A SuccessTuple indicating success.
567 def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple: 568 """Gracefully quit a running daemon.""" 569 if self.status == 'paused': 570 return self.kill(timeout) 571 572 signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout) 573 if signal_success: 574 self._write_stop_file('quit') 575 self.stdin_file.close() 576 self._remove_blocking_stdin_file() 577 return signal_success, signal_msg
Gracefully quit a running daemon.
    def pause(
        self,
        timeout: Union[int, float, None] = None,
        check_timeout_interval: Union[float, int, None] = None,
    ) -> SuccessTuple:
        """
        Pause the daemon if it is running.

        Parameters
        ----------
        timeout: Union[float, int, None], default None
            The maximum number of seconds to wait for a process to suspend.

        check_timeout_interval: Union[float, int, None], default None
            The number of seconds to wait between checking if the process is still running.

        Returns
        -------
        A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended.
        """
        self._remove_blocking_stdin_file()

        if self.process is None:
            return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused."

        if self.status == 'paused':
            return True, f"Daemon '{self.daemon_id}' is already paused."

        # Record the 'pause' action and release stdin before suspending.
        self._write_stop_file('pause')
        self.stdin_file.close()
        self._remove_blocking_stdin_file()
        try:
            self.process.suspend()
        except Exception as e:
            return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}"

        timeout = self.get_timeout_seconds(timeout)
        check_timeout_interval = self.get_check_timeout_interval_seconds(
            check_timeout_interval
        )

        psutil = attempt_import('psutil')

        if not timeout:
            # No timeout: check once. A suspended process reports 'stopped' to psutil.
            try:
                success = self.process.status() == 'stopped'
            except psutil.NoSuchProcess:
                # The process vanished entirely — treat as successfully stopped.
                success = True
            msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'."
            if success:
                self._capture_process_timestamp('paused')
            return success, msg

        # Poll until the process reports 'stopped' or the timeout expires.
        begin = time.perf_counter()
        while (time.perf_counter() - begin) < timeout:
            try:
                if self.process.status() == 'stopped':
                    self._capture_process_timestamp('paused')
                    return True, "Success"
            except psutil.NoSuchProcess as e:
                return False, f"Process exited unexpectedly. Was it killed?\n{e}"
            time.sleep(check_timeout_interval)

        return False, (
            f"Failed to pause daemon '{self.daemon_id}' within {timeout} second"
            + ('s' if timeout != 1 else '') + '.'
        )
Pause the daemon if it is running.
Parameters
- timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to suspend.
- check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still running.
Returns
- A SuccessTuple indicating whether the Daemon process was successfully suspended.
    def resume(
        self,
        timeout: Union[int, float, None] = None,
        check_timeout_interval: Union[float, int, None] = None,
    ) -> SuccessTuple:
        """
        Resume the daemon if it is paused.

        Parameters
        ----------
        timeout: Union[float, int, None], default None
            The maximum number of seconds to wait for a process to resume.

        check_timeout_interval: Union[float, int, None], default None
            The number of seconds to wait between checking if the process is still stopped.

        Returns
        -------
        A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed.
        """
        if self.status == 'running':
            return True, f"Daemon '{self.daemon_id}' is already running."

        if self.status == 'stopped':
            return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed."

        # Clear the stop file (written by `pause()`) before resuming.
        self._remove_stop_file()
        try:
            if self.process is None:
                return False, f"Cannot resume daemon '{self.daemon_id}'."

            self.process.resume()
        except Exception as e:
            return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}"

        timeout = self.get_timeout_seconds(timeout)
        check_timeout_interval = self.get_check_timeout_interval_seconds(
            check_timeout_interval
        )

        if not timeout:
            # No timeout: check the status once.
            success = self.status == 'running'
            msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'."
            if success:
                self._capture_process_timestamp('began')
            return success, msg

        # Poll until the daemon reports 'running' or the timeout expires.
        begin = time.perf_counter()
        while (time.perf_counter() - begin) < timeout:
            if self.status == 'running':
                self._capture_process_timestamp('began')
                return True, "Success"
            time.sleep(check_timeout_interval)

        return False, (
            f"Failed to resume daemon '{self.daemon_id}' within {timeout} second"
            + ('s' if timeout != 1 else '') + '.'
        )
Resume the daemon if it is paused.
Parameters
- timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to resume.
- check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still stopped.
Returns
- A SuccessTuple indicating whether the Daemon process was successfully resumed.
837 def mkdir_if_not_exists(self, allow_dirty_run: bool = False): 838 """Create the Daemon's directory. 839 If `allow_dirty_run` is `False` and the directory already exists, 840 raise a `FileExistsError`. 841 """ 842 try: 843 self.path.mkdir(parents=True, exist_ok=True) 844 _already_exists = any(os.scandir(self.path)) 845 except FileExistsError: 846 _already_exists = True 847 848 if _already_exists and not allow_dirty_run: 849 error( 850 f"Daemon '{self.daemon_id}' already exists. " + 851 "To allow this daemon to run, do one of the following:\n" 852 + " - Execute `daemon.cleanup()`.\n" 853 + f" - Delete the directory '{self.path}'.\n" 854 + " - Pass `allow_dirty_run=True` to `daemon.run()`.\n", 855 FileExistsError, 856 )
Create the Daemon's directory.
If allow_dirty_run is False and the directory already exists,
raise a FileExistsError.
    @property
    def process(self) -> Union['psutil.Process', None]:
        """
        Return the `psutil.Process` for this Daemon's PID,
        or `None` if there is no PID file or the process no longer exists.
        """
        psutil = attempt_import('psutil')
        pid = self.pid
        if pid is None:
            return None
        # Rebuild the cached handle when it is missing or stale
        # (the PID file may now point at a different process).
        if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid):
            try:
                self._process = psutil.Process(int(pid))
                process_exists = True
            except Exception:
                process_exists = False
            if not process_exists:
                # Drop the stale cache and best-effort remove the stale PID file.
                _ = self.__dict__.pop('_process', None)
                try:
                    if self.pid_path.exists():
                        self.pid_path.unlink()
                except Exception:
                    pass
                return None
        return self._process
Return the psutil process for the Daemon.
883 @property 884 def status(self) -> str: 885 """ 886 Return the running status of this Daemon. 887 """ 888 if self.process is None: 889 return 'stopped' 890 891 psutil = attempt_import('psutil', lazy=False) 892 try: 893 if self.process.status() == 'stopped': 894 return 'paused' 895 if self.process.status() == 'zombie': 896 raise psutil.NoSuchProcess(self.process.pid) 897 except (psutil.NoSuchProcess, AttributeError): 898 if self.pid_path.exists(): 899 try: 900 self.pid_path.unlink() 901 except Exception: 902 pass 903 return 'stopped' 904 905 return 'running'
Return the running status of this Daemon.
915 @property 916 def path(self) -> pathlib.Path: 917 """ 918 Return the path for this Daemon's directory. 919 """ 920 return self._get_path_from_daemon_id(self.daemon_id)
Return the path for this Daemon's directory.
    @property
    def properties_path(self) -> pathlib.Path:
        """
        Return the `properties.json` path for this Daemon.
        """
        return self._get_properties_path_from_daemon_id(self.daemon_id)
Return the properties.json path for this Daemon.
936 @property 937 def stop_path(self) -> pathlib.Path: 938 """ 939 Return the path for the stop file (created when manually stopped). 940 """ 941 return self.path / '.stop.json'
Return the path for the stop file (created when manually stopped).
943 @property 944 def log_path(self) -> pathlib.Path: 945 """ 946 Return the log path. 947 """ 948 logs_cf = self.properties.get('logs', None) or {} 949 if 'path' not in logs_cf: 950 from meerschaum.config.paths import LOGS_RESOURCES_PATH 951 return LOGS_RESOURCES_PATH / (self.daemon_id + '.log') 952 953 return pathlib.Path(logs_cf['path'])
Return the log path.
955 @property 956 def stdin_file_path(self) -> pathlib.Path: 957 """ 958 Return the stdin file path. 959 """ 960 return self.path / 'input.stdin'
Return the stdin file path.
    @property
    def blocking_stdin_file_path(self) -> pathlib.Path:
        """
        Return the blocking stdin file path.

        An explicitly set `_blocking_stdin_file_path` attribute takes
        precedence over the default path under the daemon's directory.
        """
        if '_blocking_stdin_file_path' in self.__dict__:
            return self._blocking_stdin_file_path

        return self.path / 'input.stdin.block'
Return the blocking stdin file path.
972 @property 973 def prompt_kwargs_file_path(self) -> pathlib.Path: 974 """ 975 Return the file path to the kwargs for the invoking `prompt()`. 976 """ 977 return self.path / 'prompt_kwargs.json'
Return the file path to the kwargs for the invoking prompt().
979 @property 980 def log_offset_path(self) -> pathlib.Path: 981 """ 982 Return the log offset file path. 983 """ 984 from meerschaum.config.paths import LOGS_RESOURCES_PATH 985 return LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')
Return the log offset file path.
987 @property 988 def log_offset_lock(self) -> 'fasteners.InterProcessLock': 989 """ 990 Return the process lock context manager. 991 """ 992 if '_log_offset_lock' in self.__dict__: 993 return self._log_offset_lock 994 995 fasteners = attempt_import('fasteners') 996 self._log_offset_lock = fasteners.InterProcessLock(self.log_offset_path) 997 return self._log_offset_lock
Return the process lock context manager.
999 @property 1000 def rotating_log(self) -> RotatingFile: 1001 """ 1002 The rotating log file for the daemon's output. 1003 """ 1004 if '_rotating_log' in self.__dict__: 1005 return self._rotating_log 1006 1007 logs_cf = self.properties.get('logs', None) or {} 1008 write_timestamps = logs_cf.get('write_timestamps', None) 1009 if write_timestamps is None: 1010 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 1011 1012 timestamp_format = logs_cf.get('timestamp_format', None) 1013 if timestamp_format is None: 1014 timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format') 1015 1016 num_files_to_keep = logs_cf.get('num_files_to_keep', None) 1017 if num_files_to_keep is None: 1018 num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep') 1019 1020 max_file_size = logs_cf.get('max_file_size', None) 1021 if max_file_size is None: 1022 max_file_size = get_config('jobs', 'logs', 'max_file_size') 1023 1024 redirect_streams = logs_cf.get('redirect_streams', True) 1025 1026 self._rotating_log = RotatingFile( 1027 self.log_path, 1028 redirect_streams=redirect_streams, 1029 write_timestamps=write_timestamps, 1030 timestamp_format=timestamp_format, 1031 num_files_to_keep=num_files_to_keep, 1032 max_file_size=max_file_size, 1033 ) 1034 return self._rotating_log
The rotating log file for the daemon's output.
1036 @property 1037 def stdin_file(self): 1038 """ 1039 Return the file handler for the stdin file. 1040 """ 1041 if (_stdin_file := self.__dict__.get('_stdin_file', None)): 1042 return _stdin_file 1043 1044 self._stdin_file = StdinFile( 1045 self.stdin_file_path, 1046 lock_file_path=self.blocking_stdin_file_path, 1047 ) 1048 return self._stdin_file
Return the file handler for the stdin file.
1050 @property 1051 def log_text(self) -> Union[str, None]: 1052 """ 1053 Read the log files and return their contents. 1054 Returns `None` if the log file does not exist. 1055 """ 1056 logs_cf = self.properties.get('logs', None) or {} 1057 write_timestamps = logs_cf.get('write_timestamps', None) 1058 if write_timestamps is None: 1059 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 1060 1061 timestamp_format = logs_cf.get('timestamp_format', None) 1062 if timestamp_format is None: 1063 timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format') 1064 1065 num_files_to_keep = logs_cf.get('num_files_to_keep', None) 1066 if num_files_to_keep is None: 1067 num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep') 1068 1069 max_file_size = logs_cf.get('max_file_size', None) 1070 if max_file_size is None: 1071 max_file_size = get_config('jobs', 'logs', 'max_file_size') 1072 1073 new_rotating_log = RotatingFile( 1074 self.rotating_log.file_path, 1075 num_files_to_keep=num_files_to_keep, 1076 max_file_size=max_file_size, 1077 write_timestamps=write_timestamps, 1078 timestamp_format=timestamp_format, 1079 ) 1080 return new_rotating_log.read()
Read the log files and return their contents.
Returns None if the log file does not exist.
1082 def readlines(self) -> List[str]: 1083 """ 1084 Read the next log lines, persisting the cursor for later use. 1085 Note this will alter the cursor of `self.rotating_log`. 1086 """ 1087 self.rotating_log._cursor = self._read_log_offset() 1088 lines = self.rotating_log.readlines() 1089 self._write_log_offset() 1090 return lines
Read the next log lines, persisting the cursor for later use.
Note this will alter the cursor of self.rotating_log
.
1123 @property 1124 def pid(self) -> Union[int, None]: 1125 """ 1126 Read the PID file and return its contents. 1127 Returns `None` if the PID file does not exist. 1128 """ 1129 if not self.pid_path.exists(): 1130 return None 1131 try: 1132 with open(self.pid_path, 'r', encoding='utf-8') as f: 1133 text = f.read() 1134 if len(text) == 0: 1135 return None 1136 pid = int(text.rstrip()) 1137 except Exception as e: 1138 warn(e) 1139 text = None 1140 pid = None 1141 return pid
Read the PID file and return its contents.
Returns None if the PID file does not exist.
1143 @property 1144 def pid_path(self) -> pathlib.Path: 1145 """ 1146 Return the path to a file containing the PID for this Daemon. 1147 """ 1148 return self.path / 'process.pid'
Return the path to a file containing the PID for this Daemon.
1150 @property 1151 def pid_lock(self) -> 'fasteners.InterProcessLock': 1152 """ 1153 Return the process lock context manager. 1154 """ 1155 if '_pid_lock' in self.__dict__: 1156 return self._pid_lock 1157 1158 fasteners = attempt_import('fasteners') 1159 self._pid_lock = fasteners.InterProcessLock(self.pid_path) 1160 return self._pid_lock
Return the process lock context manager.
1162 @property 1163 def pickle_path(self) -> pathlib.Path: 1164 """ 1165 Return the path for the pickle file. 1166 """ 1167 return self.path / 'pickle.pkl'
Return the path for the pickle file.
1169 def read_properties(self) -> Optional[Dict[str, Any]]: 1170 """Read the properties JSON file and return the dictionary.""" 1171 if not self.properties_path.exists(): 1172 return None 1173 try: 1174 with open(self.properties_path, 'r', encoding='utf-8') as file: 1175 properties = json.load(file) 1176 except Exception: 1177 properties = {} 1178 1179 return properties or {}
Read the properties JSON file and return the dictionary.
1181 def read_pickle(self) -> Daemon: 1182 """Read a Daemon's pickle file and return the `Daemon`.""" 1183 import pickle 1184 import traceback 1185 if not self.pickle_path.exists(): 1186 error(f"Pickle file does not exist for daemon '{self.daemon_id}'.") 1187 1188 if self.pickle_path.stat().st_size == 0: 1189 error(f"Pickle was empty for daemon '{self.daemon_id}'.") 1190 1191 try: 1192 with open(self.pickle_path, 'rb') as pickle_file: 1193 daemon = pickle.load(pickle_file) 1194 success, msg = True, 'Success' 1195 except Exception as e: 1196 success, msg = False, str(e) 1197 daemon = None 1198 traceback.print_exception(type(e), e, e.__traceback__) 1199 if not success: 1200 error(msg) 1201 return daemon
Read a Daemon's pickle file and return the Daemon.
    @property
    def properties(self) -> Dict[str, Any]:
        """
        Return the contents of the properties JSON file,
        merged with the in-memory properties dictionary.
        """
        try:
            _file_properties = self.read_properties() or {}
        except Exception:
            traceback.print_exc()
            _file_properties = {}

        # Seed the in-memory dict from the file when empty.
        if not self._properties:
            self._properties = _file_properties

        if self._properties is None:
            self._properties = {}

        # If the file has a result but the in-memory copy does not,
        # drop the in-memory key so the file's value survives the patch below.
        if (
            self._properties.get('result', None) is None
            and _file_properties.get('result', None) is not None
        ):
            _ = self._properties.pop('result', None)

        if _file_properties is not None:
            # Patch the in-memory properties over the file's contents.
            self._properties = apply_patch_to_config(
                _file_properties,
                self._properties,
            )

        return self._properties
Return the contents of the properties JSON file.
    def write_properties(self) -> SuccessTuple:
        """
        Write the properties dictionary to the properties JSON file
        (only if self.properties exists).

        Returns
        -------
        A `SuccessTuple` indicating success.
        """
        from meerschaum.utils.misc import generate_password
        success, msg = (
            False,
            f"No properties to write for daemon '{self.daemon_id}'."
        )
        # Rename the existing file to a randomly named backup so a
        # failed write can be rolled back.
        backup_path = self.properties_path.parent / (generate_password(8) + '.json')
        props = self.properties
        if props is not None:
            try:
                self.path.mkdir(parents=True, exist_ok=True)
                if self.properties_path.exists():
                    self.properties_path.rename(backup_path)
                with open(self.properties_path, 'w+', encoding='utf-8') as properties_file:
                    json.dump(props, properties_file)
                success, msg = True, 'Success'
            except Exception as e:
                success, msg = False, str(e)

        # Restore the backup on failure; discard it on success.
        try:
            if backup_path.exists():
                if not success:
                    backup_path.rename(self.properties_path)
                else:
                    backup_path.unlink()
        except Exception as e:
            success, msg = False, str(e)

        return success, msg
Write the properties dictionary to the properties JSON file (only if self.properties exists).
    def write_pickle(self) -> SuccessTuple:
        """
        Write the pickle file for the daemon.

        Returns
        -------
        A `SuccessTuple` indicating success.
        """
        import pickle
        import traceback
        from meerschaum.utils.misc import generate_password

        # NOTE(review): `self.pickle` appears to be a flag defined elsewhere;
        # when falsy, pickling is skipped entirely — confirm its semantics.
        if not self.pickle:
            return True, "Success"

        # Rename the existing pickle to a randomly named backup so a
        # failed write can be rolled back.
        backup_path = self.pickle_path.parent / (generate_password(7) + '.pkl')
        try:
            self.path.mkdir(parents=True, exist_ok=True)
            if self.pickle_path.exists():
                self.pickle_path.rename(backup_path)
            with open(self.pickle_path, 'wb+') as pickle_file:
                pickle.dump(self, pickle_file)
            success, msg = True, "Success"
        except Exception as e:
            success, msg = False, str(e)
            traceback.print_exception(type(e), e, e.__traceback__)
        # Restore the backup on failure; discard it on success.
        try:
            if backup_path.exists():
                if not success:
                    backup_path.rename(self.pickle_path)
                else:
                    backup_path.unlink()
        except Exception as e:
            success, msg = False, str(e)
        return success, msg
Write the pickle file for the daemon.
1332 def cleanup(self, keep_logs: bool = False) -> SuccessTuple: 1333 """ 1334 Remove a daemon's directory after execution. 1335 1336 Parameters 1337 ---------- 1338 keep_logs: bool, default False 1339 If `True`, skip deleting the daemon's log files. 1340 1341 Returns 1342 ------- 1343 A `SuccessTuple` indicating success. 1344 """ 1345 if self.path.exists(): 1346 try: 1347 shutil.rmtree(self.path) 1348 except Exception as e: 1349 msg = f"Failed to clean up '{self.daemon_id}':\n{e}" 1350 warn(msg) 1351 return False, msg 1352 if not keep_logs: 1353 self.rotating_log.delete() 1354 try: 1355 if self.log_offset_path.exists(): 1356 self.log_offset_path.unlink() 1357 except Exception as e: 1358 msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}" 1359 warn(msg) 1360 return False, msg 1361 return True, "Success"
Remove a daemon's directory after execution.
Parameters
- keep_logs (bool, default False):
If
True
, skip deleting the daemon's log files.
Returns
- A
SuccessTuple
indicating success.
1364 def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]: 1365 """ 1366 Return the timeout value to use. Use `--timeout-seconds` if provided, 1367 else the configured default (8). 1368 """ 1369 if isinstance(timeout, (int, float)): 1370 return timeout 1371 return get_config('jobs', 'timeout_seconds')
Return the timeout value to use. Use --timeout-seconds
if provided,
else the configured default (8).
1374 def get_check_timeout_interval_seconds( 1375 self, 1376 check_timeout_interval: Union[int, float, None] = None, 1377 ) -> Union[int, float]: 1378 """ 1379 Return the interval value to check the status of timeouts. 1380 """ 1381 if isinstance(check_timeout_interval, (int, float)): 1382 return check_timeout_interval 1383 return get_config('jobs', 'check_timeout_interval_seconds')
Return the interval value to check the status of timeouts.
1385 @property 1386 def target_args(self) -> Union[Tuple[Any], None]: 1387 """ 1388 Return the positional arguments to pass to the target function. 1389 """ 1390 target_args = ( 1391 self.__dict__.get('_target_args', None) 1392 or self.properties.get('target', {}).get('args', None) 1393 ) 1394 if target_args is None: 1395 return tuple([]) 1396 1397 return tuple(target_args)
Return the positional arguments to pass to the target function.
1399 @property 1400 def target_kw(self) -> Union[Dict[str, Any], None]: 1401 """ 1402 Return the keyword arguments to pass to the target function. 1403 """ 1404 target_kw = ( 1405 self.__dict__.get('_target_kw', None) 1406 or self.properties.get('target', {}).get('kw', None) 1407 ) 1408 if target_kw is None: 1409 return {} 1410 1411 return {key: val for key, val in target_kw.items()}
Return the keyword arguments to pass to the target function.