meerschaum.utils.daemon.Daemon
Manage running daemons via the Daemon class.
1#! /usr/bin/env python3 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Manage running daemons via the Daemon class. 7""" 8 9from __future__ import annotations 10import os 11import importlib 12import pathlib 13import json 14import shutil 15import signal 16import time 17import traceback 18from functools import partial 19from datetime import datetime, timezone 20 21import meerschaum as mrsm 22from meerschaum.utils.typing import ( 23 Optional, Dict, Any, SuccessTuple, Callable, List, Union, 24 is_success_tuple, Tuple, 25) 26from meerschaum.config import get_config 27from meerschaum._internal.static import STATIC_CONFIG 28from meerschaum.config._patch import apply_patch_to_config 29from meerschaum.utils.warnings import warn, error 30from meerschaum.utils.packages import attempt_import 31from meerschaum.utils.venv import venv_exec 32from meerschaum.utils.daemon._names import get_new_daemon_name 33from meerschaum.utils.daemon.RotatingFile import RotatingFile 34from meerschaum.utils.daemon.StdinFile import StdinFile 35from meerschaum.utils.threading import RepeatTimer 36from meerschaum.__main__ import _close_pools 37 38_daemons = [] 39_results = {} 40 41class Daemon: 42 """ 43 Daemonize Python functions into background processes. 44 45 Examples 46 -------- 47 >>> import meerschaum as mrsm 48 >>> from meerschaum.utils.daemons import Daemon 49 >>> daemon = Daemon(print, ('hi',)) 50 >>> success, msg = daemon.run() 51 >>> print(daemon.log_text) 52 53 2024-07-29 18:03 | hi 54 2024-07-29 18:03 | 55 >>> daemon.run(allow_dirty_run=True) 56 >>> print(daemon.log_text) 57 58 2024-07-29 18:03 | hi 59 2024-07-29 18:03 | 60 2024-07-29 18:05 | hi 61 2024-07-29 18:05 | 62 >>> mrsm.pprint(daemon.properties) 63 { 64 'label': 'print', 65 'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}}, 66 'result': None, 67 'process': {'ended': '2024-07-29T18:03:33.752806'} 68 } 69 70 """ 71 72 def __new__( 73 cls, 74 *args, 75 daemon_id: Optional[str] = None, 76 **kw 77 ): 78 """ 79 If a daemon_id is provided and already exists, read from its pickle file. 80 """ 81 instance = super(Daemon, cls).__new__(cls) 82 if daemon_id is not None: 83 instance.daemon_id = daemon_id 84 if instance.pickle_path.exists(): 85 instance = instance.read_pickle() 86 return instance 87 88 @classmethod 89 def from_properties_file(cls, daemon_id: str) -> Daemon: 90 """ 91 Return a Daemon from a properties dictionary. 92 """ 93 properties_path = cls._get_properties_path_from_daemon_id(daemon_id) 94 if not properties_path.exists(): 95 raise OSError(f"Properties file '{properties_path}' does not exist.") 96 97 try: 98 with open(properties_path, 'r', encoding='utf-8') as f: 99 properties = json.load(f) 100 except Exception: 101 properties = {} 102 103 if not properties: 104 raise ValueError(f"No properties could be read for daemon '{daemon_id}'.") 105 106 daemon_id = properties_path.parent.name 107 target_cf = properties.get('target', {}) 108 target_module_name = target_cf.get('module', None) 109 target_function_name = target_cf.get('name', None) 110 target_args = target_cf.get('args', None) 111 target_kw = target_cf.get('kw', None) 112 label = properties.get('label', None) 113 114 if None in [ 115 target_module_name, 116 target_function_name, 117 target_args, 118 target_kw, 119 ]: 120 raise ValueError("Missing target function information.") 121 122 target_module = importlib.import_module(target_module_name) 123 target_function = getattr(target_module, target_function_name) 124 125 return Daemon( 126 daemon_id=daemon_id, 127 target=target_function, 128 target_args=target_args, 129 target_kw=target_kw, 130 properties=properties, 131 label=label, 132 ) 133 134 135 def __init__( 136 self, 137 target: Optional[Callable[[Any], Any]] = None, 138 target_args: Union[List[Any], Tuple[Any], None] = None, 139 target_kw: Optional[Dict[str, Any]] = None, 140 env: Optional[Dict[str, str]] = None, 141 daemon_id: Optional[str] = None, 142 label: Optional[str] = None, 143 properties: Optional[Dict[str, Any]] = None, 144 pickle: bool = True, 145 ): 146 """ 147 Parameters 148 ---------- 149 target: Optional[Callable[[Any], Any]], default None, 150 The function to execute in a child process. 151 152 target_args: Union[List[Any], Tuple[Any], None], default None 153 Positional arguments to pass to the target function. 154 155 target_kw: Optional[Dict[str, Any]], default None 156 Keyword arguments to pass to the target function. 157 158 env: Optional[Dict[str, str]], default None 159 If provided, set these environment variables in the daemon process. 160 161 daemon_id: Optional[str], default None 162 Build a `Daemon` from an existing `daemon_id`. 163 If `daemon_id` is provided, other arguments are ignored and are derived 164 from the existing pickled `Daemon`. 165 166 label: Optional[str], default None 167 Label string to help identifiy a daemon. 168 If `None`, use the function name instead. 169 170 properties: Optional[Dict[str, Any]], default None 171 Override reading from the properties JSON by providing an existing dictionary. 172 """ 173 _pickle = self.__dict__.get('_pickle', False) 174 if daemon_id is not None: 175 self.daemon_id = daemon_id 176 if not self.pickle_path.exists() and not target and ('target' not in self.__dict__): 177 178 if not self.properties_path.exists(): 179 raise Exception( 180 f"Daemon '{self.daemon_id}' does not exist. " 181 + "Pass a target to create a new Daemon." 182 ) 183 184 try: 185 new_daemon = self.from_properties_file(daemon_id) 186 except Exception: 187 new_daemon = None 188 189 if new_daemon is not None: 190 new_daemon.write_pickle() 191 target = new_daemon.target 192 target_args = new_daemon.target_args 193 target_kw = new_daemon.target_kw 194 label = new_daemon.label 195 self._properties = new_daemon.properties 196 else: 197 try: 198 self.properties_path.unlink() 199 except Exception: 200 pass 201 202 raise Exception( 203 f"Could not recover daemon '{self.daemon_id}' " 204 + "from its properties file." 205 ) 206 207 if 'target' not in self.__dict__: 208 if target is None: 209 error("Cannot create a Daemon without a target.") 210 self.target = target 211 212 self.pickle = pickle 213 214 ### NOTE: We have to check self.__dict__ in case we un-pickling. 215 if '_target_args' not in self.__dict__: 216 self._target_args = target_args 217 if '_target_kw' not in self.__dict__: 218 self._target_kw = target_kw 219 220 if 'label' not in self.__dict__: 221 if label is None: 222 label = ( 223 self.target.__name__ if '__name__' in self.target.__dir__() 224 else str(self.target) 225 ) 226 self.label = label 227 elif label is not None: 228 self.label = label 229 230 if 'daemon_id' not in self.__dict__: 231 self.daemon_id = get_new_daemon_name() 232 if '_properties' not in self.__dict__: 233 self._properties = properties 234 elif properties: 235 if self._properties is None: 236 self._properties = {} 237 self._properties.update(properties) 238 if self._properties is None: 239 self._properties = {} 240 241 self._properties.update({'label': self.label}) 242 if env: 243 self._properties.update({'env': env}) 244 245 ### Instantiate the process and if it doesn't exist, make sure the PID is removed. 246 _ = self.process 247 248 249 def _run_exit( 250 self, 251 keep_daemon_output: bool = True, 252 allow_dirty_run: bool = False, 253 ) -> Any: 254 """Run the daemon's target function. 255 NOTE: This WILL EXIT the parent process! 256 257 Parameters 258 ---------- 259 keep_daemon_output: bool, default True 260 If `False`, delete the daemon's output directory upon exiting. 261 262 allow_dirty_run, bool, default False: 263 If `True`, run the daemon, even if the `daemon_id` directory exists. 264 This option is dangerous because if the same `daemon_id` runs twice, 265 the last to finish will overwrite the output of the first. 266 267 Returns 268 ------- 269 Nothing — this will exit the parent process. 270 """ 271 import platform 272 import sys 273 import os 274 import traceback 275 from meerschaum.utils.warnings import warn 276 from meerschaum.config import get_config 277 daemon = attempt_import('daemon') 278 lines = get_config('jobs', 'terminal', 'lines') 279 columns = get_config('jobs', 'terminal', 'columns') 280 281 if platform.system() == 'Windows': 282 return False, "Windows is no longer supported." 283 284 self._setup(allow_dirty_run) 285 286 _daemons.append(self) 287 288 logs_cf = self.properties.get('logs', {}) 289 log_refresh_seconds = logs_cf.get('refresh_files_seconds', None) 290 if log_refresh_seconds is None: 291 log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds') 292 write_timestamps = logs_cf.get('write_timestamps', None) 293 if write_timestamps is None: 294 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 295 296 self._log_refresh_timer = RepeatTimer( 297 log_refresh_seconds, 298 partial(self.rotating_log.refresh_files, start_interception=write_timestamps), 299 ) 300 301 capture_stdin = logs_cf.get('stdin', True) 302 cwd = self.properties.get('cwd', os.getcwd()) 303 304 ### NOTE: The SIGINT handler has been removed so that child processes may handle 305 ### KeyboardInterrupts themselves. 306 ### The previous aggressive approach was redundant because of the SIGTERM handler. 307 self._daemon_context = daemon.DaemonContext( 308 pidfile=self.pid_lock, 309 stdout=self.rotating_log, 310 stderr=self.rotating_log, 311 stdin=(self.stdin_file if capture_stdin else None), 312 working_directory=cwd, 313 detach_process=True, 314 files_preserve=list(self.rotating_log.subfile_objects.values()), 315 signal_map={ 316 signal.SIGTERM: self._handle_sigterm, 317 }, 318 ) 319 320 if capture_stdin and sys.stdin is None: 321 raise OSError("Cannot daemonize without stdin.") 322 323 try: 324 os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns)) 325 with self._daemon_context: 326 if capture_stdin: 327 sys.stdin = self.stdin_file 328 _ = os.environ.pop(STATIC_CONFIG['environment']['systemd_stdin_path'], None) 329 os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id 330 os.environ['PYTHONUNBUFFERED'] = '1' 331 332 ### Allow the user to override environment variables. 333 env = self.properties.get('env', {}) 334 if env and isinstance(env, dict): 335 os.environ.update({str(k): str(v) for k, v in env.items()}) 336 337 self.rotating_log.refresh_files(start_interception=True) 338 result = None 339 try: 340 with open(self.pid_path, 'w+', encoding='utf-8') as f: 341 f.write(str(os.getpid())) 342 343 ### NOTE: The timer fails to start for remote actions to localhost. 344 try: 345 if not self._log_refresh_timer.is_running(): 346 self._log_refresh_timer.start() 347 except Exception: 348 pass 349 350 self.properties['result'] = None 351 self._capture_process_timestamp('began') 352 result = self.target(*self.target_args, **self.target_kw) 353 self.properties['result'] = result 354 except (BrokenPipeError, KeyboardInterrupt, SystemExit): 355 result = False, traceback.format_exc() 356 except Exception as e: 357 warn( 358 f"Exception in daemon target function: {traceback.format_exc()}", 359 ) 360 result = False, str(e) 361 finally: 362 _results[self.daemon_id] = result 363 self.properties['result'] = result 364 365 if keep_daemon_output: 366 self._capture_process_timestamp('ended') 367 else: 368 self.cleanup() 369 370 self._log_refresh_timer.cancel() 371 try: 372 if self.pid is None and self.pid_path.exists(): 373 self.pid_path.unlink() 374 except Exception: 375 pass 376 377 if is_success_tuple(result): 378 try: 379 mrsm.pprint(result) 380 except BrokenPipeError: 381 pass 382 383 except Exception: 384 daemon_error = traceback.format_exc() 385 import meerschaum.config.paths as paths 386 with open(paths.DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f: 387 f.write( 388 f"Error in Daemon '{self}':\n\n" 389 f"{sys.stdin=}\n" 390 f"{self.stdin_file_path=}\n" 391 f"{self.stdin_file_path.exists()=}\n\n" 392 f"{daemon_error}\n\n" 393 ) 394 warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}") 395 396 def _capture_process_timestamp( 397 self, 398 process_key: str, 399 write_properties: bool = True, 400 ) -> None: 401 """ 402 Record the current timestamp to the parameters `process:<process_key>`. 403 404 Parameters 405 ---------- 406 process_key: str 407 Under which key to store the timestamp. 408 409 write_properties: bool, default True 410 If `True` persist the properties to disk immediately after capturing the timestamp. 411 """ 412 if 'process' not in self.properties: 413 self.properties['process'] = {} 414 415 if process_key not in ('began', 'ended', 'paused', 'stopped'): 416 raise ValueError(f"Invalid key '{process_key}'.") 417 418 self.properties['process'][process_key] = ( 419 datetime.now(timezone.utc).replace(tzinfo=None).isoformat() 420 ) 421 if write_properties: 422 self.write_properties() 423 424 def run( 425 self, 426 keep_daemon_output: bool = True, 427 allow_dirty_run: bool = False, 428 wait: bool = False, 429 timeout: Union[int, float] = 4, 430 debug: bool = False, 431 ) -> SuccessTuple: 432 """Run the daemon as a child process and continue executing the parent. 433 434 Parameters 435 ---------- 436 keep_daemon_output: bool, default True 437 If `False`, delete the daemon's output directory upon exiting. 438 439 allow_dirty_run: bool, default False 440 If `True`, run the daemon, even if the `daemon_id` directory exists. 441 This option is dangerous because if the same `daemon_id` runs concurrently, 442 the last to finish will overwrite the output of the first. 443 444 wait: bool, default True 445 If `True`, block until `Daemon.status` is running (or the timeout expires). 446 447 timeout: Union[int, float], default 4 448 If `wait` is `True`, block for up to `timeout` seconds before returning a failure. 449 450 Returns 451 ------- 452 A SuccessTuple indicating success. 453 454 """ 455 import platform 456 if platform.system() == 'Windows': 457 return False, "Cannot run background jobs on Windows." 458 459 ### The daemon might exist and be paused. 460 if self.status == 'paused': 461 return self.resume() 462 463 self._remove_stop_file() 464 if self.status == 'running': 465 return True, f"Daemon '{self}' is already running." 466 467 self.mkdir_if_not_exists(allow_dirty_run) 468 _write_pickle_success_tuple = self.write_pickle() 469 if not _write_pickle_success_tuple[0]: 470 return _write_pickle_success_tuple 471 472 _launch_daemon_code = ( 473 "from meerschaum.utils.daemon import Daemon, _daemons; " 474 f"daemon = Daemon(daemon_id='{self.daemon_id}'); " 475 f"_daemons['{self.daemon_id}'] = daemon; " 476 f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, " 477 "allow_dirty_run=True)" 478 ) 479 env = dict(os.environ) 480 _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env) 481 msg = ( 482 "Success" 483 if _launch_success_bool 484 else f"Failed to start daemon '{self.daemon_id}'." 485 ) 486 if not wait or not _launch_success_bool: 487 return _launch_success_bool, msg 488 489 timeout = self.get_timeout_seconds(timeout) 490 check_timeout_interval = self.get_check_timeout_interval_seconds() 491 492 if not timeout: 493 success = self.status == 'running' 494 msg = "Success" if success else f"Failed to run daemon '{self.daemon_id}'." 495 if success: 496 self._capture_process_timestamp('began') 497 return success, msg 498 499 begin = time.perf_counter() 500 while (time.perf_counter() - begin) < timeout: 501 if self.status == 'running': 502 self._capture_process_timestamp('began') 503 return True, "Success" 504 time.sleep(check_timeout_interval) 505 506 return False, ( 507 f"Failed to start daemon '{self.daemon_id}' within {timeout} second" 508 + ('s' if timeout != 1 else '') + '.' 509 ) 510 511 512 def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple: 513 """ 514 Forcibly terminate a running daemon. 515 Sends a SIGTERM signal to the process. 516 517 Parameters 518 ---------- 519 timeout: Optional[int], default 3 520 How many seconds to wait for the process to terminate. 521 522 Returns 523 ------- 524 A SuccessTuple indicating success. 525 """ 526 if self.status != 'paused': 527 success, msg = self._send_signal(signal.SIGTERM, timeout=timeout) 528 if success: 529 self._write_stop_file('kill') 530 self.stdin_file.close() 531 self._remove_blocking_stdin_file() 532 return success, msg 533 534 if self.status == 'stopped': 535 self._write_stop_file('kill') 536 self.stdin_file.close() 537 self._remove_blocking_stdin_file() 538 return True, "Process has already stopped." 539 540 psutil = attempt_import('psutil') 541 process = self.process 542 try: 543 process.terminate() 544 process.kill() 545 process.wait(timeout=timeout) 546 except Exception as e: 547 return False, f"Failed to kill job {self} ({process}) with exception: {e}" 548 549 try: 550 if process.status(): 551 return False, "Failed to stop daemon '{self}' ({process})." 552 except psutil.NoSuchProcess: 553 pass 554 555 if self.pid_path.exists(): 556 try: 557 self.pid_path.unlink() 558 except Exception: 559 pass 560 561 self._write_stop_file('kill') 562 self.stdin_file.close() 563 self._remove_blocking_stdin_file() 564 return True, "Success" 565 566 def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple: 567 """Gracefully quit a running daemon.""" 568 if self.status == 'paused': 569 return self.kill(timeout) 570 571 signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout) 572 if signal_success: 573 self._write_stop_file('quit') 574 self.stdin_file.close() 575 self._remove_blocking_stdin_file() 576 return signal_success, signal_msg 577 578 def pause( 579 self, 580 timeout: Union[int, float, None] = None, 581 check_timeout_interval: Union[float, int, None] = None, 582 ) -> SuccessTuple: 583 """ 584 Pause the daemon if it is running. 585 586 Parameters 587 ---------- 588 timeout: Union[float, int, None], default None 589 The maximum number of seconds to wait for a process to suspend. 590 591 check_timeout_interval: Union[float, int, None], default None 592 The number of seconds to wait between checking if the process is still running. 593 594 Returns 595 ------- 596 A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended. 597 """ 598 self._remove_blocking_stdin_file() 599 600 if self.process is None: 601 return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused." 602 603 if self.status == 'paused': 604 return True, f"Daemon '{self.daemon_id}' is already paused." 605 606 self._write_stop_file('pause') 607 self.stdin_file.close() 608 self._remove_blocking_stdin_file() 609 try: 610 self.process.suspend() 611 except Exception as e: 612 return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}" 613 614 timeout = self.get_timeout_seconds(timeout) 615 check_timeout_interval = self.get_check_timeout_interval_seconds( 616 check_timeout_interval 617 ) 618 619 psutil = attempt_import('psutil') 620 621 if not timeout: 622 try: 623 success = self.process.status() == 'stopped' 624 except psutil.NoSuchProcess: 625 success = True 626 msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'." 627 if success: 628 self._capture_process_timestamp('paused') 629 return success, msg 630 631 begin = time.perf_counter() 632 while (time.perf_counter() - begin) < timeout: 633 try: 634 if self.process.status() == 'stopped': 635 self._capture_process_timestamp('paused') 636 return True, "Success" 637 except psutil.NoSuchProcess as e: 638 return False, f"Process exited unexpectedly. Was it killed?\n{e}" 639 time.sleep(check_timeout_interval) 640 641 return False, ( 642 f"Failed to pause daemon '{self.daemon_id}' within {timeout} second" 643 + ('s' if timeout != 1 else '') + '.' 644 ) 645 646 def resume( 647 self, 648 timeout: Union[int, float, None] = None, 649 check_timeout_interval: Union[float, int, None] = None, 650 ) -> SuccessTuple: 651 """ 652 Resume the daemon if it is paused. 653 654 Parameters 655 ---------- 656 timeout: Union[float, int, None], default None 657 The maximum number of seconds to wait for a process to resume. 658 659 check_timeout_interval: Union[float, int, None], default None 660 The number of seconds to wait between checking if the process is still stopped. 661 662 Returns 663 ------- 664 A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed. 665 """ 666 if self.status == 'running': 667 return True, f"Daemon '{self.daemon_id}' is already running." 668 669 if self.status == 'stopped': 670 return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed." 671 672 self._remove_stop_file() 673 try: 674 if self.process is None: 675 return False, f"Cannot resume daemon '{self.daemon_id}'." 676 677 self.process.resume() 678 except Exception as e: 679 return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}" 680 681 timeout = self.get_timeout_seconds(timeout) 682 check_timeout_interval = self.get_check_timeout_interval_seconds( 683 check_timeout_interval 684 ) 685 686 if not timeout: 687 success = self.status == 'running' 688 msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'." 689 if success: 690 self._capture_process_timestamp('began') 691 return success, msg 692 693 begin = time.perf_counter() 694 while (time.perf_counter() - begin) < timeout: 695 if self.status == 'running': 696 self._capture_process_timestamp('began') 697 return True, "Success" 698 time.sleep(check_timeout_interval) 699 700 return False, ( 701 f"Failed to resume daemon '{self.daemon_id}' within {timeout} second" 702 + ('s' if timeout != 1 else '') + '.' 703 ) 704 705 def _write_stop_file(self, action: str) -> SuccessTuple: 706 """Write the stop file timestamp and action.""" 707 if action not in ('quit', 'kill', 'pause'): 708 return False, f"Unsupported action '{action}'." 709 710 if not self.stop_path.parent.exists(): 711 self.stop_path.parent.mkdir(parents=True, exist_ok=True) 712 713 with open(self.stop_path, 'w+', encoding='utf-8') as f: 714 json.dump( 715 { 716 'stop_time': datetime.now(timezone.utc).isoformat(), 717 'action': action, 718 }, 719 f 720 ) 721 722 return True, "Success" 723 724 def _remove_stop_file(self) -> SuccessTuple: 725 """Remove the stop file""" 726 if not self.stop_path.exists(): 727 return True, "Stop file does not exist." 728 729 try: 730 self.stop_path.unlink() 731 except Exception as e: 732 return False, f"Failed to remove stop file:\n{e}" 733 734 return True, "Success" 735 736 def _read_stop_file(self) -> Dict[str, Any]: 737 """ 738 Read the stop file if it exists. 739 """ 740 if not self.stop_path.exists(): 741 return {} 742 743 try: 744 with open(self.stop_path, 'r', encoding='utf-8') as f: 745 data = json.load(f) 746 return data 747 except Exception: 748 return {} 749 750 def _remove_blocking_stdin_file(self) -> mrsm.SuccessTuple: 751 """ 752 Remove the blocking STDIN file if it exists. 753 """ 754 try: 755 if self.blocking_stdin_file_path.exists(): 756 self.blocking_stdin_file_path.unlink() 757 except Exception as e: 758 return False, str(e) 759 760 return True, "Success" 761 762 def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None: 763 """ 764 Handle `SIGTERM` within the `Daemon` context. 765 This method is injected into the `DaemonContext`. 766 """ 767 from meerschaum.utils.process import signal_handler 768 from meerschaum.utils.threading import request_stop, interrupt_threads 769 signal_handler(signal_number, stack_frame) 770 771 ### Tell cooperative loops (e.g. `sync pipes`) to stop, then actively unwind 772 ### any worker threads so they cannot keep the process alive as a zombie. 773 request_stop() 774 interrupt_threads(SystemExit) 775 776 timer = self.__dict__.get('_log_refresh_timer', None) 777 if timer is not None: 778 timer.cancel() 779 780 daemon_context = self.__dict__.get('_daemon_context', None) 781 if daemon_context is not None: 782 daemon_context.close() 783 784 _close_pools() 785 raise SystemExit(0) 786 787 def _send_signal( 788 self, 789 signal_to_send, 790 timeout: Union[float, int, None] = None, 791 check_timeout_interval: Union[float, int, None] = None, 792 ) -> SuccessTuple: 793 """Send a signal to the daemon process. 794 795 Parameters 796 ---------- 797 signal_to_send: 798 The signal the send to the daemon, e.g. `signals.SIGINT`. 799 800 timeout: Union[float, int, None], default None 801 The maximum number of seconds to wait for a process to terminate. 802 803 check_timeout_interval: Union[float, int, None], default None 804 The number of seconds to wait between checking if the process is still running. 805 806 Returns 807 ------- 808 A SuccessTuple indicating success. 809 """ 810 try: 811 pid = self.pid 812 if pid is None: 813 return ( 814 False, 815 f"Daemon '{self.daemon_id}' is not running, " 816 + f"cannot send signal '{signal_to_send}'." 817 ) 818 819 os.kill(pid, signal_to_send) 820 except Exception: 821 return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}" 822 823 timeout = self.get_timeout_seconds(timeout) 824 check_timeout_interval = self.get_check_timeout_interval_seconds( 825 check_timeout_interval 826 ) 827 828 if not timeout: 829 return True, f"Successfully sent '{signal_to_send}' to daemon '{self.daemon_id}'." 830 831 begin = time.perf_counter() 832 while (time.perf_counter() - begin) < timeout: 833 if not self.status == 'running': 834 return True, "Success" 835 time.sleep(check_timeout_interval) 836 837 return False, ( 838 f"Failed to stop daemon '{self.daemon_id}' (PID: {pid}) within {timeout} second" 839 + ('s' if timeout != 1 else '') + '.' 840 ) 841 842 def mkdir_if_not_exists(self, allow_dirty_run: bool = False): 843 """Create the Daemon's directory. 844 If `allow_dirty_run` is `False` and the directory already exists, 845 raise a `FileExistsError`. 846 """ 847 try: 848 self.path.mkdir(parents=True, exist_ok=True) 849 _already_exists = any(os.scandir(self.path)) 850 except FileExistsError: 851 _already_exists = True 852 853 if _already_exists and not allow_dirty_run: 854 error( 855 f"Daemon '{self.daemon_id}' already exists. " + 856 "To allow this daemon to run, do one of the following:\n" 857 + " - Execute `daemon.cleanup()`.\n" 858 + f" - Delete the directory '{self.path}'.\n" 859 + " - Pass `allow_dirty_run=True` to `daemon.run()`.\n", 860 FileExistsError, 861 ) 862 863 @property 864 def process(self) -> Union['psutil.Process', None]: 865 """ 866 Return the psutil process for the Daemon. 867 """ 868 psutil = attempt_import('psutil') 869 pid = self.pid 870 if pid is None: 871 return None 872 if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid): 873 try: 874 self._process = psutil.Process(int(pid)) 875 process_exists = True 876 except Exception: 877 process_exists = False 878 if not process_exists: 879 _ = self.__dict__.pop('_process', None) 880 try: 881 if self.pid_path.exists(): 882 self.pid_path.unlink() 883 except Exception: 884 pass 885 return None 886 return self._process 887 888 @property 889 def status(self) -> str: 890 """ 891 Return the running status of this Daemon. 892 """ 893 if self.process is None: 894 return 'stopped' 895 896 psutil = attempt_import('psutil', lazy=False) 897 try: 898 if self.process.status() == 'stopped': 899 return 'paused' 900 if self.process.status() == 'zombie': 901 raise psutil.NoSuchProcess(self.process.pid) 902 except (psutil.NoSuchProcess, AttributeError): 903 if self.pid_path.exists(): 904 try: 905 self.pid_path.unlink() 906 except Exception: 907 pass 908 return 'stopped' 909 910 return 'running' 911 912 @classmethod 913 def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path: 914 """ 915 Return a Daemon's path from its `daemon_id`. 916 """ 917 import meerschaum.config.paths as paths 918 return paths.DAEMON_RESOURCES_PATH / daemon_id 919 920 @property 921 def path(self) -> pathlib.Path: 922 """ 923 Return the path for this Daemon's directory. 924 """ 925 return self._get_path_from_daemon_id(self.daemon_id) 926 927 @classmethod 928 def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path: 929 """ 930 Return the `properties.json` path for a given `daemon_id`. 931 """ 932 return cls._get_path_from_daemon_id(daemon_id) / 'properties.json' 933 934 @property 935 def properties_path(self) -> pathlib.Path: 936 """ 937 Return the `propterties.json` path for this Daemon. 938 """ 939 return self._get_properties_path_from_daemon_id(self.daemon_id) 940 941 @property 942 def stop_path(self) -> pathlib.Path: 943 """ 944 Return the path for the stop file (created when manually stopped). 945 """ 946 return self.path / '.stop.json' 947 948 @property 949 def log_path(self) -> pathlib.Path: 950 """ 951 Return the log path. 952 """ 953 logs_cf = self.properties.get('logs', None) or {} 954 if 'path' not in logs_cf: 955 import meerschaum.config.paths as paths 956 return paths.LOGS_RESOURCES_PATH / (self.daemon_id + '.log') 957 958 return pathlib.Path(logs_cf['path']) 959 960 @property 961 def stdin_file_path(self) -> pathlib.Path: 962 """ 963 Return the stdin file path. 964 """ 965 return self.path / 'input.stdin' 966 967 @property 968 def blocking_stdin_file_path(self) -> pathlib.Path: 969 """ 970 Return the stdin file path. 971 """ 972 if '_blocking_stdin_file_path' in self.__dict__: 973 return self._blocking_stdin_file_path 974 975 return self.path / 'input.stdin.block' 976 977 @property 978 def prompt_kwargs_file_path(self) -> pathlib.Path: 979 """ 980 Return the file path to the kwargs for the invoking `prompt()`. 981 """ 982 return self.path / 'prompt_kwargs.json' 983 984 @property 985 def log_offset_path(self) -> pathlib.Path: 986 """ 987 Return the log offset file path. 988 """ 989 import meerschaum.config.paths as paths 990 return paths.LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset') 991 992 @property 993 def log_offset_lock(self) -> 'fasteners.InterProcessLock': 994 """ 995 Return the process lock context manager. 996 """ 997 if '_log_offset_lock' in self.__dict__: 998 return self._log_offset_lock 999 1000 fasteners = attempt_import('fasteners') 1001 self._log_offset_lock = fasteners.InterProcessLock(self.log_offset_path) 1002 return self._log_offset_lock 1003 1004 @property 1005 def rotating_log(self) -> RotatingFile: 1006 """ 1007 The rotating log file for the daemon's output. 1008 """ 1009 if '_rotating_log' in self.__dict__: 1010 return self._rotating_log 1011 1012 logs_cf = self.properties.get('logs', None) or {} 1013 write_timestamps = logs_cf.get('write_timestamps', None) 1014 if write_timestamps is None: 1015 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 1016 1017 timestamp_format = logs_cf.get('timestamp_format', None) 1018 if timestamp_format is None: 1019 timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format') 1020 1021 num_files_to_keep = logs_cf.get('num_files_to_keep', None) 1022 if num_files_to_keep is None: 1023 num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep') 1024 1025 max_file_size = logs_cf.get('max_file_size', None) 1026 if max_file_size is None: 1027 max_file_size = get_config('jobs', 'logs', 'max_file_size') 1028 1029 redirect_streams = logs_cf.get('redirect_streams', True) 1030 1031 self._rotating_log = RotatingFile( 1032 self.log_path, 1033 redirect_streams=redirect_streams, 1034 write_timestamps=write_timestamps, 1035 timestamp_format=timestamp_format, 1036 num_files_to_keep=num_files_to_keep, 1037 max_file_size=max_file_size, 1038 ) 1039 return self._rotating_log 1040 1041 @property 1042 def stdin_file(self): 1043 """ 1044 Return the file handler for the stdin file. 1045 """ 1046 if (_stdin_file := self.__dict__.get('_stdin_file', None)): 1047 return _stdin_file 1048 1049 self._stdin_file = StdinFile( 1050 self.stdin_file_path, 1051 lock_file_path=self.blocking_stdin_file_path, 1052 ) 1053 return self._stdin_file 1054 1055 @property 1056 def log_text(self) -> Union[str, None]: 1057 """ 1058 Read the log files and return their contents. 1059 Returns `None` if the log file does not exist. 1060 """ 1061 logs_cf = self.properties.get('logs', None) or {} 1062 write_timestamps = logs_cf.get('write_timestamps', None) 1063 if write_timestamps is None: 1064 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 1065 1066 timestamp_format = logs_cf.get('timestamp_format', None) 1067 if timestamp_format is None: 1068 timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format') 1069 1070 num_files_to_keep = logs_cf.get('num_files_to_keep', None) 1071 if num_files_to_keep is None: 1072 num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep') 1073 1074 max_file_size = logs_cf.get('max_file_size', None) 1075 if max_file_size is None: 1076 max_file_size = get_config('jobs', 'logs', 'max_file_size') 1077 1078 new_rotating_log = RotatingFile( 1079 self.rotating_log.file_path, 1080 num_files_to_keep=num_files_to_keep, 1081 max_file_size=max_file_size, 1082 write_timestamps=write_timestamps, 1083 timestamp_format=timestamp_format, 1084 ) 1085 return new_rotating_log.read() 1086 1087 def readlines(self) -> List[str]: 1088 """ 1089 Read the next log lines, persisting the cursor for later use. 1090 Note this will alter the cursor of `self.rotating_log`. 1091 """ 1092 self.rotating_log._cursor = self._read_log_offset() 1093 lines = self.rotating_log.readlines() 1094 self._write_log_offset() 1095 return lines 1096 1097 def _read_log_offset(self) -> Tuple[int, int]: 1098 """ 1099 Return the current log offset cursor. 1100 1101 Returns 1102 ------- 1103 A tuple of the form (`subfile_index`, `position`). 1104 """ 1105 if not self.log_offset_path.exists(): 1106 return 0, 0 1107 1108 try: 1109 with open(self.log_offset_path, 'r', encoding='utf-8') as f: 1110 cursor_text = f.read() 1111 cursor_parts = cursor_text.split(' ') 1112 subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1]) 1113 return subfile_index, subfile_position 1114 except Exception as e: 1115 warn(f"Failed to read cursor:\n{e}") 1116 return 0, 0 1117 1118 def _write_log_offset(self) -> None: 1119 """ 1120 Write the current log offset file. 1121 """ 1122 with self.log_offset_lock: 1123 with open(self.log_offset_path, 'w+', encoding='utf-8') as f: 1124 subfile_index = self.rotating_log._cursor[0] 1125 subfile_position = self.rotating_log._cursor[1] 1126 f.write(f"{subfile_index} {subfile_position}") 1127 1128 @property 1129 def pid(self) -> Union[int, None]: 1130 """ 1131 Read the PID file and return its contents. 1132 Returns `None` if the PID file does not exist. 1133 """ 1134 if not self.pid_path.exists(): 1135 return None 1136 try: 1137 with open(self.pid_path, 'r', encoding='utf-8') as f: 1138 text = f.read() 1139 if len(text) == 0: 1140 return None 1141 pid = int(text.rstrip()) 1142 except Exception as e: 1143 warn(e) 1144 text = None 1145 pid = None 1146 return pid 1147 1148 @property 1149 def pid_path(self) -> pathlib.Path: 1150 """ 1151 Return the path to a file containing the PID for this Daemon. 1152 """ 1153 return self.path / 'process.pid' 1154 1155 @property 1156 def pid_lock(self) -> 'fasteners.InterProcessLock': 1157 """ 1158 Return the process lock context manager. 1159 """ 1160 if '_pid_lock' in self.__dict__: 1161 return self._pid_lock 1162 1163 fasteners = attempt_import('fasteners') 1164 self._pid_lock = fasteners.InterProcessLock(self.pid_path) 1165 return self._pid_lock 1166 1167 @property 1168 def pickle_path(self) -> pathlib.Path: 1169 """ 1170 Return the path for the pickle file. 1171 """ 1172 return self.path / 'pickle.pkl' 1173 1174 def read_properties(self) -> Optional[Dict[str, Any]]: 1175 """Read the properties JSON file and return the dictionary.""" 1176 if not self.properties_path.exists(): 1177 return None 1178 try: 1179 with open(self.properties_path, 'r', encoding='utf-8') as file: 1180 properties = json.load(file) 1181 except Exception: 1182 properties = {} 1183 1184 return properties or {} 1185 1186 def read_pickle(self) -> Daemon: 1187 """Read a Daemon's pickle file and return the `Daemon`.""" 1188 import pickle 1189 import traceback 1190 if not self.pickle_path.exists(): 1191 error(f"Pickle file does not exist for daemon '{self.daemon_id}'.") 1192 1193 if self.pickle_path.stat().st_size == 0: 1194 error(f"Pickle was empty for daemon '{self.daemon_id}'.") 1195 1196 try: 1197 with open(self.pickle_path, 'rb') as pickle_file: 1198 daemon = pickle.load(pickle_file) 1199 success, msg = True, 'Success' 1200 except Exception as e: 1201 success, msg = False, str(e) 1202 daemon = None 1203 traceback.print_exception(type(e), e, e.__traceback__) 1204 if not success: 1205 error(msg) 1206 return daemon 1207 1208 @property 1209 def properties(self) -> Dict[str, Any]: 1210 """ 1211 Return the contents of the properties JSON file. 1212 """ 1213 try: 1214 _file_properties = self.read_properties() or {} 1215 except Exception: 1216 traceback.print_exc() 1217 _file_properties = {} 1218 1219 if not self._properties: 1220 self._properties = _file_properties 1221 1222 if self._properties is None: 1223 self._properties = {} 1224 1225 if ( 1226 self._properties.get('result', None) is None 1227 and _file_properties.get('result', None) is not None 1228 ): 1229 _ = self._properties.pop('result', None) 1230 1231 if _file_properties is not None: 1232 self._properties = apply_patch_to_config( 1233 _file_properties, 1234 self._properties, 1235 ) 1236 1237 return self._properties 1238 1239 @property 1240 def hidden(self) -> bool: 1241 """ 1242 Return a bool indicating whether this Daemon should be displayed. 1243 """ 1244 return self.daemon_id.startswith('_') or self.daemon_id.startswith('.') 1245 1246 def write_properties(self) -> SuccessTuple: 1247 """Write the properties dictionary to the properties JSON file 1248 (only if self.properties exists). 1249 """ 1250 from meerschaum.utils.misc import generate_password 1251 success, msg = ( 1252 False, 1253 f"No properties to write for daemon '{self.daemon_id}'." 1254 ) 1255 backup_path = self.properties_path.parent / (generate_password(8) + '.json') 1256 props = self.properties 1257 if props is not None: 1258 try: 1259 self.path.mkdir(parents=True, exist_ok=True) 1260 if self.properties_path.exists(): 1261 self.properties_path.rename(backup_path) 1262 with open(self.properties_path, 'w+', encoding='utf-8') as properties_file: 1263 json.dump(props, properties_file) 1264 success, msg = True, 'Success' 1265 except Exception as e: 1266 success, msg = False, str(e) 1267 1268 try: 1269 if backup_path.exists(): 1270 if not success: 1271 backup_path.rename(self.properties_path) 1272 else: 1273 backup_path.unlink() 1274 except Exception as e: 1275 success, msg = False, str(e) 1276 1277 return success, msg 1278 1279 def write_pickle(self) -> SuccessTuple: 1280 """Write the pickle file for the daemon.""" 1281 import pickle 1282 import traceback 1283 from meerschaum.utils.misc import generate_password 1284 1285 if not self.pickle: 1286 return True, "Success" 1287 1288 from meerschaum._internal.entry import _shells 1289 if _shells: 1290 from meerschaum._internal.shell.Shell import revert_input 1291 revert_input() 1292 1293 backup_path = self.pickle_path.parent / (generate_password(7) + '.pkl') 1294 try: 1295 self.path.mkdir(parents=True, exist_ok=True) 1296 if self.pickle_path.exists(): 1297 self.pickle_path.rename(backup_path) 1298 with open(self.pickle_path, 'wb+') as pickle_file: 1299 pickle.dump(self, pickle_file) 1300 success, msg = True, "Success" 1301 except Exception as e: 1302 success, msg = False, str(e) 1303 traceback.print_exception(type(e), e, e.__traceback__) 1304 try: 1305 if backup_path.exists(): 1306 if not success: 1307 backup_path.rename(self.pickle_path) 1308 else: 1309 backup_path.unlink() 1310 except Exception as e: 1311 success, msg = False, str(e) 1312 return success, msg 1313 1314 1315 def _setup( 1316 self, 1317 allow_dirty_run: bool = False, 1318 ) -> None: 1319 """ 1320 Update properties before starting the Daemon. 1321 """ 1322 if self.properties is None: 1323 self._properties = {} 1324 1325 self._properties.update({ 1326 'target': { 1327 'name': self.target.__name__, 1328 'module': self.target.__module__, 1329 'args': self.target_args, 1330 'kw': self.target_kw, 1331 }, 1332 }) 1333 self.mkdir_if_not_exists(allow_dirty_run) 1334 _write_properties_success_tuple = self.write_properties() 1335 if not _write_properties_success_tuple[0]: 1336 error(_write_properties_success_tuple[1]) 1337 1338 _write_pickle_success_tuple = self.write_pickle() 1339 if not _write_pickle_success_tuple[0]: 1340 error(_write_pickle_success_tuple[1]) 1341 1342 def cleanup(self, keep_logs: bool = False) -> SuccessTuple: 1343 """ 1344 Remove a daemon's directory after execution. 1345 1346 Parameters 1347 ---------- 1348 keep_logs: bool, default False 1349 If `True`, skip deleting the daemon's log files. 1350 1351 Returns 1352 ------- 1353 A `SuccessTuple` indicating success. 1354 """ 1355 if self.path.exists(): 1356 try: 1357 shutil.rmtree(self.path) 1358 except Exception as e: 1359 msg = f"Failed to clean up '{self.daemon_id}':\n{e}" 1360 warn(msg) 1361 return False, msg 1362 if not keep_logs: 1363 self.rotating_log.delete() 1364 try: 1365 if self.log_offset_path.exists(): 1366 self.log_offset_path.unlink() 1367 except Exception as e: 1368 msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}" 1369 warn(msg) 1370 return False, msg 1371 return True, "Success" 1372 1373 1374 def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]: 1375 """ 1376 Return the timeout value to use. Use `--timeout-seconds` if provided, 1377 else the configured default (8). 1378 """ 1379 if isinstance(timeout, (int, float)): 1380 return timeout 1381 return get_config('jobs', 'timeout_seconds') 1382 1383 1384 def get_check_timeout_interval_seconds( 1385 self, 1386 check_timeout_interval: Union[int, float, None] = None, 1387 ) -> Union[int, float]: 1388 """ 1389 Return the interval value to check the status of timeouts. 1390 """ 1391 if isinstance(check_timeout_interval, (int, float)): 1392 return check_timeout_interval 1393 return get_config('jobs', 'check_timeout_interval_seconds') 1394 1395 @property 1396 def target_args(self) -> Union[Tuple[Any], None]: 1397 """ 1398 Return the positional arguments to pass to the target function. 1399 """ 1400 target_args = ( 1401 self.__dict__.get('_target_args', None) 1402 or self.properties.get('target', {}).get('args', None) 1403 ) 1404 if target_args is None: 1405 return tuple([]) 1406 1407 return tuple(target_args) 1408 1409 @property 1410 def target_kw(self) -> Union[Dict[str, Any], None]: 1411 """ 1412 Return the keyword arguments to pass to the target function. 1413 """ 1414 target_kw = ( 1415 self.__dict__.get('_target_kw', None) 1416 or self.properties.get('target', {}).get('kw', None) 1417 ) 1418 if target_kw is None: 1419 return {} 1420 1421 return {key: val for key, val in target_kw.items()} 1422 1423 @staticmethod 1424 def _get_target_reference(target) -> Union[Dict[str, str], None]: 1425 """ 1426 If `target` is an importable, top-level function (e.g. `entry`), return a 1427 ``{'module': ..., 'qualname': ...}`` reference so it can be re-imported in the 1428 daemon process instead of pickled by value. 1429 1430 Pickling such a target by value serializes its entire global graph, which can 1431 reach unpicklable live state (e.g. a connector caching an `_asyncio.Task`) and 1432 raise `TypeError: cannot pickle '_asyncio.Task' object`. dill also falls back to 1433 by-value pickling whenever it cannot match the function by identity — which 1434 happens when two copies of a package are importable (a stale install shadowing 1435 a venv), so `byref=True` alone is not enough. Returns `None` for closures, 1436 lambdas, and anything not importable, which fall back to dill. 1437 """ 1438 module = getattr(target, '__module__', None) 1439 qualname = getattr(target, '__qualname__', None) 1440 if not module or not qualname: 1441 return None 1442 if '<locals>' in qualname or '<lambda>' in qualname: 1443 return None 1444 return {'module': module, 'qualname': qualname} 1445 1446 @staticmethod 1447 def _load_target_reference(target_ref: Dict[str, str]): 1448 """ 1449 Re-import a target from a `{'module': ..., 'qualname': ...}` reference. 1450 """ 1451 import importlib 1452 obj = importlib.import_module(target_ref['module']) 1453 for part in target_ref['qualname'].split('.'): 1454 obj = getattr(obj, part) 1455 return obj 1456 1457 def __getstate__(self): 1458 """ 1459 Pickle this Daemon. 1460 """ 1461 dill = attempt_import('dill') 1462 state = { 1463 'target_args': self.target_args, 1464 'target_kw': self.target_kw, 1465 'daemon_id': self.daemon_id, 1466 'label': self.label, 1467 'properties': self.properties, 1468 } 1469 target_ref = self._get_target_reference(self.target) 1470 if target_ref is not None: 1471 ### Store by reference (re-imported in the daemon process), not by value. 1472 state['target'] = None 1473 state['target_ref'] = target_ref 1474 else: 1475 state['target'] = dill.dumps(self.target, byref=True) 1476 return state 1477 1478 def __setstate__(self, _state: Dict[str, Any]): 1479 """ 1480 Restore this Daemon from a pickled state. 1481 If the properties file exists, skip the old pickled version. 1482 """ 1483 dill = attempt_import('dill') 1484 target_ref = _state.pop('target_ref', None) 1485 if target_ref is not None and _state.get('target', None) is None: 1486 _state['target'] = self._load_target_reference(target_ref) 1487 else: 1488 _state['target'] = dill.loads(_state['target']) 1489 self._pickle = True 1490 daemon_id = _state.get('daemon_id', None) 1491 if not daemon_id: 1492 raise ValueError("Need a daemon_id to un-pickle a Daemon.") 1493 1494 properties_path = self._get_properties_path_from_daemon_id(daemon_id) 1495 ignore_properties = properties_path.exists() 1496 if ignore_properties: 1497 _state = { 1498 key: val 1499 for key, val in _state.items() 1500 if key != 'properties' 1501 } 1502 self.__init__(**_state) 1503 1504 1505 def __repr__(self): 1506 return str(self) 1507 1508 def __str__(self): 1509 return self.daemon_id 1510 1511 def __eq__(self, other): 1512 if not isinstance(other, Daemon): 1513 return False 1514 return self.daemon_id == other.daemon_id 1515 1516 def __hash__(self): 1517 return hash(self.daemon_id)
42class Daemon: 43 """ 44 Daemonize Python functions into background processes. 45 46 Examples 47 -------- 48 >>> import meerschaum as mrsm 49 >>> from meerschaum.utils.daemons import Daemon 50 >>> daemon = Daemon(print, ('hi',)) 51 >>> success, msg = daemon.run() 52 >>> print(daemon.log_text) 53 54 2024-07-29 18:03 | hi 55 2024-07-29 18:03 | 56 >>> daemon.run(allow_dirty_run=True) 57 >>> print(daemon.log_text) 58 59 2024-07-29 18:03 | hi 60 2024-07-29 18:03 | 61 2024-07-29 18:05 | hi 62 2024-07-29 18:05 | 63 >>> mrsm.pprint(daemon.properties) 64 { 65 'label': 'print', 66 'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}}, 67 'result': None, 68 'process': {'ended': '2024-07-29T18:03:33.752806'} 69 } 70 71 """ 72 73 def __new__( 74 cls, 75 *args, 76 daemon_id: Optional[str] = None, 77 **kw 78 ): 79 """ 80 If a daemon_id is provided and already exists, read from its pickle file. 81 """ 82 instance = super(Daemon, cls).__new__(cls) 83 if daemon_id is not None: 84 instance.daemon_id = daemon_id 85 if instance.pickle_path.exists(): 86 instance = instance.read_pickle() 87 return instance 88 89 @classmethod 90 def from_properties_file(cls, daemon_id: str) -> Daemon: 91 """ 92 Return a Daemon from a properties dictionary. 93 """ 94 properties_path = cls._get_properties_path_from_daemon_id(daemon_id) 95 if not properties_path.exists(): 96 raise OSError(f"Properties file '{properties_path}' does not exist.") 97 98 try: 99 with open(properties_path, 'r', encoding='utf-8') as f: 100 properties = json.load(f) 101 except Exception: 102 properties = {} 103 104 if not properties: 105 raise ValueError(f"No properties could be read for daemon '{daemon_id}'.") 106 107 daemon_id = properties_path.parent.name 108 target_cf = properties.get('target', {}) 109 target_module_name = target_cf.get('module', None) 110 target_function_name = target_cf.get('name', None) 111 target_args = target_cf.get('args', None) 112 target_kw = target_cf.get('kw', None) 113 label = properties.get('label', None) 114 115 if None in [ 116 target_module_name, 117 target_function_name, 118 target_args, 119 target_kw, 120 ]: 121 raise ValueError("Missing target function information.") 122 123 target_module = importlib.import_module(target_module_name) 124 target_function = getattr(target_module, target_function_name) 125 126 return Daemon( 127 daemon_id=daemon_id, 128 target=target_function, 129 target_args=target_args, 130 target_kw=target_kw, 131 properties=properties, 132 label=label, 133 ) 134 135 136 def __init__( 137 self, 138 target: Optional[Callable[[Any], Any]] = None, 139 target_args: Union[List[Any], Tuple[Any], None] = None, 140 target_kw: Optional[Dict[str, Any]] = None, 141 env: Optional[Dict[str, str]] = None, 142 daemon_id: Optional[str] = None, 143 label: Optional[str] = None, 144 properties: Optional[Dict[str, Any]] = None, 145 pickle: bool = True, 146 ): 147 """ 148 Parameters 149 ---------- 150 target: Optional[Callable[[Any], Any]], default None, 151 The function to execute in a child process. 152 153 target_args: Union[List[Any], Tuple[Any], None], default None 154 Positional arguments to pass to the target function. 155 156 target_kw: Optional[Dict[str, Any]], default None 157 Keyword arguments to pass to the target function. 158 159 env: Optional[Dict[str, str]], default None 160 If provided, set these environment variables in the daemon process. 161 162 daemon_id: Optional[str], default None 163 Build a `Daemon` from an existing `daemon_id`. 164 If `daemon_id` is provided, other arguments are ignored and are derived 165 from the existing pickled `Daemon`. 166 167 label: Optional[str], default None 168 Label string to help identifiy a daemon. 169 If `None`, use the function name instead. 170 171 properties: Optional[Dict[str, Any]], default None 172 Override reading from the properties JSON by providing an existing dictionary. 173 """ 174 _pickle = self.__dict__.get('_pickle', False) 175 if daemon_id is not None: 176 self.daemon_id = daemon_id 177 if not self.pickle_path.exists() and not target and ('target' not in self.__dict__): 178 179 if not self.properties_path.exists(): 180 raise Exception( 181 f"Daemon '{self.daemon_id}' does not exist. " 182 + "Pass a target to create a new Daemon." 183 ) 184 185 try: 186 new_daemon = self.from_properties_file(daemon_id) 187 except Exception: 188 new_daemon = None 189 190 if new_daemon is not None: 191 new_daemon.write_pickle() 192 target = new_daemon.target 193 target_args = new_daemon.target_args 194 target_kw = new_daemon.target_kw 195 label = new_daemon.label 196 self._properties = new_daemon.properties 197 else: 198 try: 199 self.properties_path.unlink() 200 except Exception: 201 pass 202 203 raise Exception( 204 f"Could not recover daemon '{self.daemon_id}' " 205 + "from its properties file." 206 ) 207 208 if 'target' not in self.__dict__: 209 if target is None: 210 error("Cannot create a Daemon without a target.") 211 self.target = target 212 213 self.pickle = pickle 214 215 ### NOTE: We have to check self.__dict__ in case we un-pickling. 216 if '_target_args' not in self.__dict__: 217 self._target_args = target_args 218 if '_target_kw' not in self.__dict__: 219 self._target_kw = target_kw 220 221 if 'label' not in self.__dict__: 222 if label is None: 223 label = ( 224 self.target.__name__ if '__name__' in self.target.__dir__() 225 else str(self.target) 226 ) 227 self.label = label 228 elif label is not None: 229 self.label = label 230 231 if 'daemon_id' not in self.__dict__: 232 self.daemon_id = get_new_daemon_name() 233 if '_properties' not in self.__dict__: 234 self._properties = properties 235 elif properties: 236 if self._properties is None: 237 self._properties = {} 238 self._properties.update(properties) 239 if self._properties is None: 240 self._properties = {} 241 242 self._properties.update({'label': self.label}) 243 if env: 244 self._properties.update({'env': env}) 245 246 ### Instantiate the process and if it doesn't exist, make sure the PID is removed. 247 _ = self.process 248 249 250 def _run_exit( 251 self, 252 keep_daemon_output: bool = True, 253 allow_dirty_run: bool = False, 254 ) -> Any: 255 """Run the daemon's target function. 256 NOTE: This WILL EXIT the parent process! 257 258 Parameters 259 ---------- 260 keep_daemon_output: bool, default True 261 If `False`, delete the daemon's output directory upon exiting. 262 263 allow_dirty_run, bool, default False: 264 If `True`, run the daemon, even if the `daemon_id` directory exists. 265 This option is dangerous because if the same `daemon_id` runs twice, 266 the last to finish will overwrite the output of the first. 267 268 Returns 269 ------- 270 Nothing — this will exit the parent process. 271 """ 272 import platform 273 import sys 274 import os 275 import traceback 276 from meerschaum.utils.warnings import warn 277 from meerschaum.config import get_config 278 daemon = attempt_import('daemon') 279 lines = get_config('jobs', 'terminal', 'lines') 280 columns = get_config('jobs', 'terminal', 'columns') 281 282 if platform.system() == 'Windows': 283 return False, "Windows is no longer supported." 284 285 self._setup(allow_dirty_run) 286 287 _daemons.append(self) 288 289 logs_cf = self.properties.get('logs', {}) 290 log_refresh_seconds = logs_cf.get('refresh_files_seconds', None) 291 if log_refresh_seconds is None: 292 log_refresh_seconds = get_config('jobs', 'logs', 'refresh_files_seconds') 293 write_timestamps = logs_cf.get('write_timestamps', None) 294 if write_timestamps is None: 295 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 296 297 self._log_refresh_timer = RepeatTimer( 298 log_refresh_seconds, 299 partial(self.rotating_log.refresh_files, start_interception=write_timestamps), 300 ) 301 302 capture_stdin = logs_cf.get('stdin', True) 303 cwd = self.properties.get('cwd', os.getcwd()) 304 305 ### NOTE: The SIGINT handler has been removed so that child processes may handle 306 ### KeyboardInterrupts themselves. 307 ### The previous aggressive approach was redundant because of the SIGTERM handler. 308 self._daemon_context = daemon.DaemonContext( 309 pidfile=self.pid_lock, 310 stdout=self.rotating_log, 311 stderr=self.rotating_log, 312 stdin=(self.stdin_file if capture_stdin else None), 313 working_directory=cwd, 314 detach_process=True, 315 files_preserve=list(self.rotating_log.subfile_objects.values()), 316 signal_map={ 317 signal.SIGTERM: self._handle_sigterm, 318 }, 319 ) 320 321 if capture_stdin and sys.stdin is None: 322 raise OSError("Cannot daemonize without stdin.") 323 324 try: 325 os.environ['LINES'], os.environ['COLUMNS'] = str(int(lines)), str(int(columns)) 326 with self._daemon_context: 327 if capture_stdin: 328 sys.stdin = self.stdin_file 329 _ = os.environ.pop(STATIC_CONFIG['environment']['systemd_stdin_path'], None) 330 os.environ[STATIC_CONFIG['environment']['daemon_id']] = self.daemon_id 331 os.environ['PYTHONUNBUFFERED'] = '1' 332 333 ### Allow the user to override environment variables. 334 env = self.properties.get('env', {}) 335 if env and isinstance(env, dict): 336 os.environ.update({str(k): str(v) for k, v in env.items()}) 337 338 self.rotating_log.refresh_files(start_interception=True) 339 result = None 340 try: 341 with open(self.pid_path, 'w+', encoding='utf-8') as f: 342 f.write(str(os.getpid())) 343 344 ### NOTE: The timer fails to start for remote actions to localhost. 345 try: 346 if not self._log_refresh_timer.is_running(): 347 self._log_refresh_timer.start() 348 except Exception: 349 pass 350 351 self.properties['result'] = None 352 self._capture_process_timestamp('began') 353 result = self.target(*self.target_args, **self.target_kw) 354 self.properties['result'] = result 355 except (BrokenPipeError, KeyboardInterrupt, SystemExit): 356 result = False, traceback.format_exc() 357 except Exception as e: 358 warn( 359 f"Exception in daemon target function: {traceback.format_exc()}", 360 ) 361 result = False, str(e) 362 finally: 363 _results[self.daemon_id] = result 364 self.properties['result'] = result 365 366 if keep_daemon_output: 367 self._capture_process_timestamp('ended') 368 else: 369 self.cleanup() 370 371 self._log_refresh_timer.cancel() 372 try: 373 if self.pid is None and self.pid_path.exists(): 374 self.pid_path.unlink() 375 except Exception: 376 pass 377 378 if is_success_tuple(result): 379 try: 380 mrsm.pprint(result) 381 except BrokenPipeError: 382 pass 383 384 except Exception: 385 daemon_error = traceback.format_exc() 386 import meerschaum.config.paths as paths 387 with open(paths.DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f: 388 f.write( 389 f"Error in Daemon '{self}':\n\n" 390 f"{sys.stdin=}\n" 391 f"{self.stdin_file_path=}\n" 392 f"{self.stdin_file_path.exists()=}\n\n" 393 f"{daemon_error}\n\n" 394 ) 395 warn(f"Encountered an error while running the daemon '{self}':\n{daemon_error}") 396 397 def _capture_process_timestamp( 398 self, 399 process_key: str, 400 write_properties: bool = True, 401 ) -> None: 402 """ 403 Record the current timestamp to the parameters `process:<process_key>`. 404 405 Parameters 406 ---------- 407 process_key: str 408 Under which key to store the timestamp. 409 410 write_properties: bool, default True 411 If `True` persist the properties to disk immediately after capturing the timestamp. 412 """ 413 if 'process' not in self.properties: 414 self.properties['process'] = {} 415 416 if process_key not in ('began', 'ended', 'paused', 'stopped'): 417 raise ValueError(f"Invalid key '{process_key}'.") 418 419 self.properties['process'][process_key] = ( 420 datetime.now(timezone.utc).replace(tzinfo=None).isoformat() 421 ) 422 if write_properties: 423 self.write_properties() 424 425 def run( 426 self, 427 keep_daemon_output: bool = True, 428 allow_dirty_run: bool = False, 429 wait: bool = False, 430 timeout: Union[int, float] = 4, 431 debug: bool = False, 432 ) -> SuccessTuple: 433 """Run the daemon as a child process and continue executing the parent. 434 435 Parameters 436 ---------- 437 keep_daemon_output: bool, default True 438 If `False`, delete the daemon's output directory upon exiting. 439 440 allow_dirty_run: bool, default False 441 If `True`, run the daemon, even if the `daemon_id` directory exists. 442 This option is dangerous because if the same `daemon_id` runs concurrently, 443 the last to finish will overwrite the output of the first. 444 445 wait: bool, default True 446 If `True`, block until `Daemon.status` is running (or the timeout expires). 447 448 timeout: Union[int, float], default 4 449 If `wait` is `True`, block for up to `timeout` seconds before returning a failure. 450 451 Returns 452 ------- 453 A SuccessTuple indicating success. 454 455 """ 456 import platform 457 if platform.system() == 'Windows': 458 return False, "Cannot run background jobs on Windows." 459 460 ### The daemon might exist and be paused. 461 if self.status == 'paused': 462 return self.resume() 463 464 self._remove_stop_file() 465 if self.status == 'running': 466 return True, f"Daemon '{self}' is already running." 467 468 self.mkdir_if_not_exists(allow_dirty_run) 469 _write_pickle_success_tuple = self.write_pickle() 470 if not _write_pickle_success_tuple[0]: 471 return _write_pickle_success_tuple 472 473 _launch_daemon_code = ( 474 "from meerschaum.utils.daemon import Daemon, _daemons; " 475 f"daemon = Daemon(daemon_id='{self.daemon_id}'); " 476 f"_daemons['{self.daemon_id}'] = daemon; " 477 f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, " 478 "allow_dirty_run=True)" 479 ) 480 env = dict(os.environ) 481 _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env) 482 msg = ( 483 "Success" 484 if _launch_success_bool 485 else f"Failed to start daemon '{self.daemon_id}'." 486 ) 487 if not wait or not _launch_success_bool: 488 return _launch_success_bool, msg 489 490 timeout = self.get_timeout_seconds(timeout) 491 check_timeout_interval = self.get_check_timeout_interval_seconds() 492 493 if not timeout: 494 success = self.status == 'running' 495 msg = "Success" if success else f"Failed to run daemon '{self.daemon_id}'." 496 if success: 497 self._capture_process_timestamp('began') 498 return success, msg 499 500 begin = time.perf_counter() 501 while (time.perf_counter() - begin) < timeout: 502 if self.status == 'running': 503 self._capture_process_timestamp('began') 504 return True, "Success" 505 time.sleep(check_timeout_interval) 506 507 return False, ( 508 f"Failed to start daemon '{self.daemon_id}' within {timeout} second" 509 + ('s' if timeout != 1 else '') + '.' 510 ) 511 512 513 def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple: 514 """ 515 Forcibly terminate a running daemon. 516 Sends a SIGTERM signal to the process. 517 518 Parameters 519 ---------- 520 timeout: Optional[int], default 3 521 How many seconds to wait for the process to terminate. 522 523 Returns 524 ------- 525 A SuccessTuple indicating success. 526 """ 527 if self.status != 'paused': 528 success, msg = self._send_signal(signal.SIGTERM, timeout=timeout) 529 if success: 530 self._write_stop_file('kill') 531 self.stdin_file.close() 532 self._remove_blocking_stdin_file() 533 return success, msg 534 535 if self.status == 'stopped': 536 self._write_stop_file('kill') 537 self.stdin_file.close() 538 self._remove_blocking_stdin_file() 539 return True, "Process has already stopped." 540 541 psutil = attempt_import('psutil') 542 process = self.process 543 try: 544 process.terminate() 545 process.kill() 546 process.wait(timeout=timeout) 547 except Exception as e: 548 return False, f"Failed to kill job {self} ({process}) with exception: {e}" 549 550 try: 551 if process.status(): 552 return False, "Failed to stop daemon '{self}' ({process})." 553 except psutil.NoSuchProcess: 554 pass 555 556 if self.pid_path.exists(): 557 try: 558 self.pid_path.unlink() 559 except Exception: 560 pass 561 562 self._write_stop_file('kill') 563 self.stdin_file.close() 564 self._remove_blocking_stdin_file() 565 return True, "Success" 566 567 def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple: 568 """Gracefully quit a running daemon.""" 569 if self.status == 'paused': 570 return self.kill(timeout) 571 572 signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout) 573 if signal_success: 574 self._write_stop_file('quit') 575 self.stdin_file.close() 576 self._remove_blocking_stdin_file() 577 return signal_success, signal_msg 578 579 def pause( 580 self, 581 timeout: Union[int, float, None] = None, 582 check_timeout_interval: Union[float, int, None] = None, 583 ) -> SuccessTuple: 584 """ 585 Pause the daemon if it is running. 586 587 Parameters 588 ---------- 589 timeout: Union[float, int, None], default None 590 The maximum number of seconds to wait for a process to suspend. 591 592 check_timeout_interval: Union[float, int, None], default None 593 The number of seconds to wait between checking if the process is still running. 594 595 Returns 596 ------- 597 A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended. 598 """ 599 self._remove_blocking_stdin_file() 600 601 if self.process is None: 602 return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused." 603 604 if self.status == 'paused': 605 return True, f"Daemon '{self.daemon_id}' is already paused." 606 607 self._write_stop_file('pause') 608 self.stdin_file.close() 609 self._remove_blocking_stdin_file() 610 try: 611 self.process.suspend() 612 except Exception as e: 613 return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}" 614 615 timeout = self.get_timeout_seconds(timeout) 616 check_timeout_interval = self.get_check_timeout_interval_seconds( 617 check_timeout_interval 618 ) 619 620 psutil = attempt_import('psutil') 621 622 if not timeout: 623 try: 624 success = self.process.status() == 'stopped' 625 except psutil.NoSuchProcess: 626 success = True 627 msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'." 628 if success: 629 self._capture_process_timestamp('paused') 630 return success, msg 631 632 begin = time.perf_counter() 633 while (time.perf_counter() - begin) < timeout: 634 try: 635 if self.process.status() == 'stopped': 636 self._capture_process_timestamp('paused') 637 return True, "Success" 638 except psutil.NoSuchProcess as e: 639 return False, f"Process exited unexpectedly. Was it killed?\n{e}" 640 time.sleep(check_timeout_interval) 641 642 return False, ( 643 f"Failed to pause daemon '{self.daemon_id}' within {timeout} second" 644 + ('s' if timeout != 1 else '') + '.' 645 ) 646 647 def resume( 648 self, 649 timeout: Union[int, float, None] = None, 650 check_timeout_interval: Union[float, int, None] = None, 651 ) -> SuccessTuple: 652 """ 653 Resume the daemon if it is paused. 654 655 Parameters 656 ---------- 657 timeout: Union[float, int, None], default None 658 The maximum number of seconds to wait for a process to resume. 659 660 check_timeout_interval: Union[float, int, None], default None 661 The number of seconds to wait between checking if the process is still stopped. 662 663 Returns 664 ------- 665 A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed. 666 """ 667 if self.status == 'running': 668 return True, f"Daemon '{self.daemon_id}' is already running." 669 670 if self.status == 'stopped': 671 return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed." 672 673 self._remove_stop_file() 674 try: 675 if self.process is None: 676 return False, f"Cannot resume daemon '{self.daemon_id}'." 677 678 self.process.resume() 679 except Exception as e: 680 return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}" 681 682 timeout = self.get_timeout_seconds(timeout) 683 check_timeout_interval = self.get_check_timeout_interval_seconds( 684 check_timeout_interval 685 ) 686 687 if not timeout: 688 success = self.status == 'running' 689 msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'." 690 if success: 691 self._capture_process_timestamp('began') 692 return success, msg 693 694 begin = time.perf_counter() 695 while (time.perf_counter() - begin) < timeout: 696 if self.status == 'running': 697 self._capture_process_timestamp('began') 698 return True, "Success" 699 time.sleep(check_timeout_interval) 700 701 return False, ( 702 f"Failed to resume daemon '{self.daemon_id}' within {timeout} second" 703 + ('s' if timeout != 1 else '') + '.' 704 ) 705 706 def _write_stop_file(self, action: str) -> SuccessTuple: 707 """Write the stop file timestamp and action.""" 708 if action not in ('quit', 'kill', 'pause'): 709 return False, f"Unsupported action '{action}'." 710 711 if not self.stop_path.parent.exists(): 712 self.stop_path.parent.mkdir(parents=True, exist_ok=True) 713 714 with open(self.stop_path, 'w+', encoding='utf-8') as f: 715 json.dump( 716 { 717 'stop_time': datetime.now(timezone.utc).isoformat(), 718 'action': action, 719 }, 720 f 721 ) 722 723 return True, "Success" 724 725 def _remove_stop_file(self) -> SuccessTuple: 726 """Remove the stop file""" 727 if not self.stop_path.exists(): 728 return True, "Stop file does not exist." 729 730 try: 731 self.stop_path.unlink() 732 except Exception as e: 733 return False, f"Failed to remove stop file:\n{e}" 734 735 return True, "Success" 736 737 def _read_stop_file(self) -> Dict[str, Any]: 738 """ 739 Read the stop file if it exists. 740 """ 741 if not self.stop_path.exists(): 742 return {} 743 744 try: 745 with open(self.stop_path, 'r', encoding='utf-8') as f: 746 data = json.load(f) 747 return data 748 except Exception: 749 return {} 750 751 def _remove_blocking_stdin_file(self) -> mrsm.SuccessTuple: 752 """ 753 Remove the blocking STDIN file if it exists. 754 """ 755 try: 756 if self.blocking_stdin_file_path.exists(): 757 self.blocking_stdin_file_path.unlink() 758 except Exception as e: 759 return False, str(e) 760 761 return True, "Success" 762 763 def _handle_sigterm(self, signal_number: int, stack_frame: 'frame') -> None: 764 """ 765 Handle `SIGTERM` within the `Daemon` context. 766 This method is injected into the `DaemonContext`. 767 """ 768 from meerschaum.utils.process import signal_handler 769 from meerschaum.utils.threading import request_stop, interrupt_threads 770 signal_handler(signal_number, stack_frame) 771 772 ### Tell cooperative loops (e.g. `sync pipes`) to stop, then actively unwind 773 ### any worker threads so they cannot keep the process alive as a zombie. 774 request_stop() 775 interrupt_threads(SystemExit) 776 777 timer = self.__dict__.get('_log_refresh_timer', None) 778 if timer is not None: 779 timer.cancel() 780 781 daemon_context = self.__dict__.get('_daemon_context', None) 782 if daemon_context is not None: 783 daemon_context.close() 784 785 _close_pools() 786 raise SystemExit(0) 787 788 def _send_signal( 789 self, 790 signal_to_send, 791 timeout: Union[float, int, None] = None, 792 check_timeout_interval: Union[float, int, None] = None, 793 ) -> SuccessTuple: 794 """Send a signal to the daemon process. 795 796 Parameters 797 ---------- 798 signal_to_send: 799 The signal the send to the daemon, e.g. `signals.SIGINT`. 800 801 timeout: Union[float, int, None], default None 802 The maximum number of seconds to wait for a process to terminate. 803 804 check_timeout_interval: Union[float, int, None], default None 805 The number of seconds to wait between checking if the process is still running. 806 807 Returns 808 ------- 809 A SuccessTuple indicating success. 810 """ 811 try: 812 pid = self.pid 813 if pid is None: 814 return ( 815 False, 816 f"Daemon '{self.daemon_id}' is not running, " 817 + f"cannot send signal '{signal_to_send}'." 818 ) 819 820 os.kill(pid, signal_to_send) 821 except Exception: 822 return False, f"Failed to send signal {signal_to_send}:\n{traceback.format_exc()}" 823 824 timeout = self.get_timeout_seconds(timeout) 825 check_timeout_interval = self.get_check_timeout_interval_seconds( 826 check_timeout_interval 827 ) 828 829 if not timeout: 830 return True, f"Successfully sent '{signal_to_send}' to daemon '{self.daemon_id}'." 831 832 begin = time.perf_counter() 833 while (time.perf_counter() - begin) < timeout: 834 if not self.status == 'running': 835 return True, "Success" 836 time.sleep(check_timeout_interval) 837 838 return False, ( 839 f"Failed to stop daemon '{self.daemon_id}' (PID: {pid}) within {timeout} second" 840 + ('s' if timeout != 1 else '') + '.' 841 ) 842 843 def mkdir_if_not_exists(self, allow_dirty_run: bool = False): 844 """Create the Daemon's directory. 845 If `allow_dirty_run` is `False` and the directory already exists, 846 raise a `FileExistsError`. 847 """ 848 try: 849 self.path.mkdir(parents=True, exist_ok=True) 850 _already_exists = any(os.scandir(self.path)) 851 except FileExistsError: 852 _already_exists = True 853 854 if _already_exists and not allow_dirty_run: 855 error( 856 f"Daemon '{self.daemon_id}' already exists. " + 857 "To allow this daemon to run, do one of the following:\n" 858 + " - Execute `daemon.cleanup()`.\n" 859 + f" - Delete the directory '{self.path}'.\n" 860 + " - Pass `allow_dirty_run=True` to `daemon.run()`.\n", 861 FileExistsError, 862 ) 863 864 @property 865 def process(self) -> Union['psutil.Process', None]: 866 """ 867 Return the psutil process for the Daemon. 868 """ 869 psutil = attempt_import('psutil') 870 pid = self.pid 871 if pid is None: 872 return None 873 if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid): 874 try: 875 self._process = psutil.Process(int(pid)) 876 process_exists = True 877 except Exception: 878 process_exists = False 879 if not process_exists: 880 _ = self.__dict__.pop('_process', None) 881 try: 882 if self.pid_path.exists(): 883 self.pid_path.unlink() 884 except Exception: 885 pass 886 return None 887 return self._process 888 889 @property 890 def status(self) -> str: 891 """ 892 Return the running status of this Daemon. 893 """ 894 if self.process is None: 895 return 'stopped' 896 897 psutil = attempt_import('psutil', lazy=False) 898 try: 899 if self.process.status() == 'stopped': 900 return 'paused' 901 if self.process.status() == 'zombie': 902 raise psutil.NoSuchProcess(self.process.pid) 903 except (psutil.NoSuchProcess, AttributeError): 904 if self.pid_path.exists(): 905 try: 906 self.pid_path.unlink() 907 except Exception: 908 pass 909 return 'stopped' 910 911 return 'running' 912 913 @classmethod 914 def _get_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path: 915 """ 916 Return a Daemon's path from its `daemon_id`. 917 """ 918 import meerschaum.config.paths as paths 919 return paths.DAEMON_RESOURCES_PATH / daemon_id 920 921 @property 922 def path(self) -> pathlib.Path: 923 """ 924 Return the path for this Daemon's directory. 925 """ 926 return self._get_path_from_daemon_id(self.daemon_id) 927 928 @classmethod 929 def _get_properties_path_from_daemon_id(cls, daemon_id: str) -> pathlib.Path: 930 """ 931 Return the `properties.json` path for a given `daemon_id`. 932 """ 933 return cls._get_path_from_daemon_id(daemon_id) / 'properties.json' 934 935 @property 936 def properties_path(self) -> pathlib.Path: 937 """ 938 Return the `propterties.json` path for this Daemon. 939 """ 940 return self._get_properties_path_from_daemon_id(self.daemon_id) 941 942 @property 943 def stop_path(self) -> pathlib.Path: 944 """ 945 Return the path for the stop file (created when manually stopped). 946 """ 947 return self.path / '.stop.json' 948 949 @property 950 def log_path(self) -> pathlib.Path: 951 """ 952 Return the log path. 953 """ 954 logs_cf = self.properties.get('logs', None) or {} 955 if 'path' not in logs_cf: 956 import meerschaum.config.paths as paths 957 return paths.LOGS_RESOURCES_PATH / (self.daemon_id + '.log') 958 959 return pathlib.Path(logs_cf['path']) 960 961 @property 962 def stdin_file_path(self) -> pathlib.Path: 963 """ 964 Return the stdin file path. 965 """ 966 return self.path / 'input.stdin' 967 968 @property 969 def blocking_stdin_file_path(self) -> pathlib.Path: 970 """ 971 Return the stdin file path. 972 """ 973 if '_blocking_stdin_file_path' in self.__dict__: 974 return self._blocking_stdin_file_path 975 976 return self.path / 'input.stdin.block' 977 978 @property 979 def prompt_kwargs_file_path(self) -> pathlib.Path: 980 """ 981 Return the file path to the kwargs for the invoking `prompt()`. 982 """ 983 return self.path / 'prompt_kwargs.json' 984 985 @property 986 def log_offset_path(self) -> pathlib.Path: 987 """ 988 Return the log offset file path. 989 """ 990 import meerschaum.config.paths as paths 991 return paths.LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset') 992 993 @property 994 def log_offset_lock(self) -> 'fasteners.InterProcessLock': 995 """ 996 Return the process lock context manager. 997 """ 998 if '_log_offset_lock' in self.__dict__: 999 return self._log_offset_lock 1000 1001 fasteners = attempt_import('fasteners') 1002 self._log_offset_lock = fasteners.InterProcessLock(self.log_offset_path) 1003 return self._log_offset_lock 1004 1005 @property 1006 def rotating_log(self) -> RotatingFile: 1007 """ 1008 The rotating log file for the daemon's output. 1009 """ 1010 if '_rotating_log' in self.__dict__: 1011 return self._rotating_log 1012 1013 logs_cf = self.properties.get('logs', None) or {} 1014 write_timestamps = logs_cf.get('write_timestamps', None) 1015 if write_timestamps is None: 1016 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 1017 1018 timestamp_format = logs_cf.get('timestamp_format', None) 1019 if timestamp_format is None: 1020 timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format') 1021 1022 num_files_to_keep = logs_cf.get('num_files_to_keep', None) 1023 if num_files_to_keep is None: 1024 num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep') 1025 1026 max_file_size = logs_cf.get('max_file_size', None) 1027 if max_file_size is None: 1028 max_file_size = get_config('jobs', 'logs', 'max_file_size') 1029 1030 redirect_streams = logs_cf.get('redirect_streams', True) 1031 1032 self._rotating_log = RotatingFile( 1033 self.log_path, 1034 redirect_streams=redirect_streams, 1035 write_timestamps=write_timestamps, 1036 timestamp_format=timestamp_format, 1037 num_files_to_keep=num_files_to_keep, 1038 max_file_size=max_file_size, 1039 ) 1040 return self._rotating_log 1041 1042 @property 1043 def stdin_file(self): 1044 """ 1045 Return the file handler for the stdin file. 1046 """ 1047 if (_stdin_file := self.__dict__.get('_stdin_file', None)): 1048 return _stdin_file 1049 1050 self._stdin_file = StdinFile( 1051 self.stdin_file_path, 1052 lock_file_path=self.blocking_stdin_file_path, 1053 ) 1054 return self._stdin_file 1055 1056 @property 1057 def log_text(self) -> Union[str, None]: 1058 """ 1059 Read the log files and return their contents. 1060 Returns `None` if the log file does not exist. 1061 """ 1062 logs_cf = self.properties.get('logs', None) or {} 1063 write_timestamps = logs_cf.get('write_timestamps', None) 1064 if write_timestamps is None: 1065 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 1066 1067 timestamp_format = logs_cf.get('timestamp_format', None) 1068 if timestamp_format is None: 1069 timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format') 1070 1071 num_files_to_keep = logs_cf.get('num_files_to_keep', None) 1072 if num_files_to_keep is None: 1073 num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep') 1074 1075 max_file_size = logs_cf.get('max_file_size', None) 1076 if max_file_size is None: 1077 max_file_size = get_config('jobs', 'logs', 'max_file_size') 1078 1079 new_rotating_log = RotatingFile( 1080 self.rotating_log.file_path, 1081 num_files_to_keep=num_files_to_keep, 1082 max_file_size=max_file_size, 1083 write_timestamps=write_timestamps, 1084 timestamp_format=timestamp_format, 1085 ) 1086 return new_rotating_log.read() 1087 1088 def readlines(self) -> List[str]: 1089 """ 1090 Read the next log lines, persisting the cursor for later use. 1091 Note this will alter the cursor of `self.rotating_log`. 1092 """ 1093 self.rotating_log._cursor = self._read_log_offset() 1094 lines = self.rotating_log.readlines() 1095 self._write_log_offset() 1096 return lines 1097 1098 def _read_log_offset(self) -> Tuple[int, int]: 1099 """ 1100 Return the current log offset cursor. 1101 1102 Returns 1103 ------- 1104 A tuple of the form (`subfile_index`, `position`). 1105 """ 1106 if not self.log_offset_path.exists(): 1107 return 0, 0 1108 1109 try: 1110 with open(self.log_offset_path, 'r', encoding='utf-8') as f: 1111 cursor_text = f.read() 1112 cursor_parts = cursor_text.split(' ') 1113 subfile_index, subfile_position = int(cursor_parts[0]), int(cursor_parts[1]) 1114 return subfile_index, subfile_position 1115 except Exception as e: 1116 warn(f"Failed to read cursor:\n{e}") 1117 return 0, 0 1118 1119 def _write_log_offset(self) -> None: 1120 """ 1121 Write the current log offset file. 1122 """ 1123 with self.log_offset_lock: 1124 with open(self.log_offset_path, 'w+', encoding='utf-8') as f: 1125 subfile_index = self.rotating_log._cursor[0] 1126 subfile_position = self.rotating_log._cursor[1] 1127 f.write(f"{subfile_index} {subfile_position}") 1128 1129 @property 1130 def pid(self) -> Union[int, None]: 1131 """ 1132 Read the PID file and return its contents. 1133 Returns `None` if the PID file does not exist. 1134 """ 1135 if not self.pid_path.exists(): 1136 return None 1137 try: 1138 with open(self.pid_path, 'r', encoding='utf-8') as f: 1139 text = f.read() 1140 if len(text) == 0: 1141 return None 1142 pid = int(text.rstrip()) 1143 except Exception as e: 1144 warn(e) 1145 text = None 1146 pid = None 1147 return pid 1148 1149 @property 1150 def pid_path(self) -> pathlib.Path: 1151 """ 1152 Return the path to a file containing the PID for this Daemon. 1153 """ 1154 return self.path / 'process.pid' 1155 1156 @property 1157 def pid_lock(self) -> 'fasteners.InterProcessLock': 1158 """ 1159 Return the process lock context manager. 1160 """ 1161 if '_pid_lock' in self.__dict__: 1162 return self._pid_lock 1163 1164 fasteners = attempt_import('fasteners') 1165 self._pid_lock = fasteners.InterProcessLock(self.pid_path) 1166 return self._pid_lock 1167 1168 @property 1169 def pickle_path(self) -> pathlib.Path: 1170 """ 1171 Return the path for the pickle file. 1172 """ 1173 return self.path / 'pickle.pkl' 1174 1175 def read_properties(self) -> Optional[Dict[str, Any]]: 1176 """Read the properties JSON file and return the dictionary.""" 1177 if not self.properties_path.exists(): 1178 return None 1179 try: 1180 with open(self.properties_path, 'r', encoding='utf-8') as file: 1181 properties = json.load(file) 1182 except Exception: 1183 properties = {} 1184 1185 return properties or {} 1186 1187 def read_pickle(self) -> Daemon: 1188 """Read a Daemon's pickle file and return the `Daemon`.""" 1189 import pickle 1190 import traceback 1191 if not self.pickle_path.exists(): 1192 error(f"Pickle file does not exist for daemon '{self.daemon_id}'.") 1193 1194 if self.pickle_path.stat().st_size == 0: 1195 error(f"Pickle was empty for daemon '{self.daemon_id}'.") 1196 1197 try: 1198 with open(self.pickle_path, 'rb') as pickle_file: 1199 daemon = pickle.load(pickle_file) 1200 success, msg = True, 'Success' 1201 except Exception as e: 1202 success, msg = False, str(e) 1203 daemon = None 1204 traceback.print_exception(type(e), e, e.__traceback__) 1205 if not success: 1206 error(msg) 1207 return daemon 1208 1209 @property 1210 def properties(self) -> Dict[str, Any]: 1211 """ 1212 Return the contents of the properties JSON file. 1213 """ 1214 try: 1215 _file_properties = self.read_properties() or {} 1216 except Exception: 1217 traceback.print_exc() 1218 _file_properties = {} 1219 1220 if not self._properties: 1221 self._properties = _file_properties 1222 1223 if self._properties is None: 1224 self._properties = {} 1225 1226 if ( 1227 self._properties.get('result', None) is None 1228 and _file_properties.get('result', None) is not None 1229 ): 1230 _ = self._properties.pop('result', None) 1231 1232 if _file_properties is not None: 1233 self._properties = apply_patch_to_config( 1234 _file_properties, 1235 self._properties, 1236 ) 1237 1238 return self._properties 1239 1240 @property 1241 def hidden(self) -> bool: 1242 """ 1243 Return a bool indicating whether this Daemon should be displayed. 1244 """ 1245 return self.daemon_id.startswith('_') or self.daemon_id.startswith('.') 1246 1247 def write_properties(self) -> SuccessTuple: 1248 """Write the properties dictionary to the properties JSON file 1249 (only if self.properties exists). 1250 """ 1251 from meerschaum.utils.misc import generate_password 1252 success, msg = ( 1253 False, 1254 f"No properties to write for daemon '{self.daemon_id}'." 1255 ) 1256 backup_path = self.properties_path.parent / (generate_password(8) + '.json') 1257 props = self.properties 1258 if props is not None: 1259 try: 1260 self.path.mkdir(parents=True, exist_ok=True) 1261 if self.properties_path.exists(): 1262 self.properties_path.rename(backup_path) 1263 with open(self.properties_path, 'w+', encoding='utf-8') as properties_file: 1264 json.dump(props, properties_file) 1265 success, msg = True, 'Success' 1266 except Exception as e: 1267 success, msg = False, str(e) 1268 1269 try: 1270 if backup_path.exists(): 1271 if not success: 1272 backup_path.rename(self.properties_path) 1273 else: 1274 backup_path.unlink() 1275 except Exception as e: 1276 success, msg = False, str(e) 1277 1278 return success, msg 1279 1280 def write_pickle(self) -> SuccessTuple: 1281 """Write the pickle file for the daemon.""" 1282 import pickle 1283 import traceback 1284 from meerschaum.utils.misc import generate_password 1285 1286 if not self.pickle: 1287 return True, "Success" 1288 1289 from meerschaum._internal.entry import _shells 1290 if _shells: 1291 from meerschaum._internal.shell.Shell import revert_input 1292 revert_input() 1293 1294 backup_path = self.pickle_path.parent / (generate_password(7) + '.pkl') 1295 try: 1296 self.path.mkdir(parents=True, exist_ok=True) 1297 if self.pickle_path.exists(): 1298 self.pickle_path.rename(backup_path) 1299 with open(self.pickle_path, 'wb+') as pickle_file: 1300 pickle.dump(self, pickle_file) 1301 success, msg = True, "Success" 1302 except Exception as e: 1303 success, msg = False, str(e) 1304 traceback.print_exception(type(e), e, e.__traceback__) 1305 try: 1306 if backup_path.exists(): 1307 if not success: 1308 backup_path.rename(self.pickle_path) 1309 else: 1310 backup_path.unlink() 1311 except Exception as e: 1312 success, msg = False, str(e) 1313 return success, msg 1314 1315 1316 def _setup( 1317 self, 1318 allow_dirty_run: bool = False, 1319 ) -> None: 1320 """ 1321 Update properties before starting the Daemon. 1322 """ 1323 if self.properties is None: 1324 self._properties = {} 1325 1326 self._properties.update({ 1327 'target': { 1328 'name': self.target.__name__, 1329 'module': self.target.__module__, 1330 'args': self.target_args, 1331 'kw': self.target_kw, 1332 }, 1333 }) 1334 self.mkdir_if_not_exists(allow_dirty_run) 1335 _write_properties_success_tuple = self.write_properties() 1336 if not _write_properties_success_tuple[0]: 1337 error(_write_properties_success_tuple[1]) 1338 1339 _write_pickle_success_tuple = self.write_pickle() 1340 if not _write_pickle_success_tuple[0]: 1341 error(_write_pickle_success_tuple[1]) 1342 1343 def cleanup(self, keep_logs: bool = False) -> SuccessTuple: 1344 """ 1345 Remove a daemon's directory after execution. 1346 1347 Parameters 1348 ---------- 1349 keep_logs: bool, default False 1350 If `True`, skip deleting the daemon's log files. 1351 1352 Returns 1353 ------- 1354 A `SuccessTuple` indicating success. 1355 """ 1356 if self.path.exists(): 1357 try: 1358 shutil.rmtree(self.path) 1359 except Exception as e: 1360 msg = f"Failed to clean up '{self.daemon_id}':\n{e}" 1361 warn(msg) 1362 return False, msg 1363 if not keep_logs: 1364 self.rotating_log.delete() 1365 try: 1366 if self.log_offset_path.exists(): 1367 self.log_offset_path.unlink() 1368 except Exception as e: 1369 msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}" 1370 warn(msg) 1371 return False, msg 1372 return True, "Success" 1373 1374 1375 def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]: 1376 """ 1377 Return the timeout value to use. Use `--timeout-seconds` if provided, 1378 else the configured default (8). 1379 """ 1380 if isinstance(timeout, (int, float)): 1381 return timeout 1382 return get_config('jobs', 'timeout_seconds') 1383 1384 1385 def get_check_timeout_interval_seconds( 1386 self, 1387 check_timeout_interval: Union[int, float, None] = None, 1388 ) -> Union[int, float]: 1389 """ 1390 Return the interval value to check the status of timeouts. 1391 """ 1392 if isinstance(check_timeout_interval, (int, float)): 1393 return check_timeout_interval 1394 return get_config('jobs', 'check_timeout_interval_seconds') 1395 1396 @property 1397 def target_args(self) -> Union[Tuple[Any], None]: 1398 """ 1399 Return the positional arguments to pass to the target function. 1400 """ 1401 target_args = ( 1402 self.__dict__.get('_target_args', None) 1403 or self.properties.get('target', {}).get('args', None) 1404 ) 1405 if target_args is None: 1406 return tuple([]) 1407 1408 return tuple(target_args) 1409 1410 @property 1411 def target_kw(self) -> Union[Dict[str, Any], None]: 1412 """ 1413 Return the keyword arguments to pass to the target function. 1414 """ 1415 target_kw = ( 1416 self.__dict__.get('_target_kw', None) 1417 or self.properties.get('target', {}).get('kw', None) 1418 ) 1419 if target_kw is None: 1420 return {} 1421 1422 return {key: val for key, val in target_kw.items()} 1423 1424 @staticmethod 1425 def _get_target_reference(target) -> Union[Dict[str, str], None]: 1426 """ 1427 If `target` is an importable, top-level function (e.g. `entry`), return a 1428 ``{'module': ..., 'qualname': ...}`` reference so it can be re-imported in the 1429 daemon process instead of pickled by value. 1430 1431 Pickling such a target by value serializes its entire global graph, which can 1432 reach unpicklable live state (e.g. a connector caching an `_asyncio.Task`) and 1433 raise `TypeError: cannot pickle '_asyncio.Task' object`. dill also falls back to 1434 by-value pickling whenever it cannot match the function by identity — which 1435 happens when two copies of a package are importable (a stale install shadowing 1436 a venv), so `byref=True` alone is not enough. Returns `None` for closures, 1437 lambdas, and anything not importable, which fall back to dill. 1438 """ 1439 module = getattr(target, '__module__', None) 1440 qualname = getattr(target, '__qualname__', None) 1441 if not module or not qualname: 1442 return None 1443 if '<locals>' in qualname or '<lambda>' in qualname: 1444 return None 1445 return {'module': module, 'qualname': qualname} 1446 1447 @staticmethod 1448 def _load_target_reference(target_ref: Dict[str, str]): 1449 """ 1450 Re-import a target from a `{'module': ..., 'qualname': ...}` reference. 1451 """ 1452 import importlib 1453 obj = importlib.import_module(target_ref['module']) 1454 for part in target_ref['qualname'].split('.'): 1455 obj = getattr(obj, part) 1456 return obj 1457 1458 def __getstate__(self): 1459 """ 1460 Pickle this Daemon. 1461 """ 1462 dill = attempt_import('dill') 1463 state = { 1464 'target_args': self.target_args, 1465 'target_kw': self.target_kw, 1466 'daemon_id': self.daemon_id, 1467 'label': self.label, 1468 'properties': self.properties, 1469 } 1470 target_ref = self._get_target_reference(self.target) 1471 if target_ref is not None: 1472 ### Store by reference (re-imported in the daemon process), not by value. 1473 state['target'] = None 1474 state['target_ref'] = target_ref 1475 else: 1476 state['target'] = dill.dumps(self.target, byref=True) 1477 return state 1478 1479 def __setstate__(self, _state: Dict[str, Any]): 1480 """ 1481 Restore this Daemon from a pickled state. 1482 If the properties file exists, skip the old pickled version. 1483 """ 1484 dill = attempt_import('dill') 1485 target_ref = _state.pop('target_ref', None) 1486 if target_ref is not None and _state.get('target', None) is None: 1487 _state['target'] = self._load_target_reference(target_ref) 1488 else: 1489 _state['target'] = dill.loads(_state['target']) 1490 self._pickle = True 1491 daemon_id = _state.get('daemon_id', None) 1492 if not daemon_id: 1493 raise ValueError("Need a daemon_id to un-pickle a Daemon.") 1494 1495 properties_path = self._get_properties_path_from_daemon_id(daemon_id) 1496 ignore_properties = properties_path.exists() 1497 if ignore_properties: 1498 _state = { 1499 key: val 1500 for key, val in _state.items() 1501 if key != 'properties' 1502 } 1503 self.__init__(**_state) 1504 1505 1506 def __repr__(self): 1507 return str(self) 1508 1509 def __str__(self): 1510 return self.daemon_id 1511 1512 def __eq__(self, other): 1513 if not isinstance(other, Daemon): 1514 return False 1515 return self.daemon_id == other.daemon_id 1516 1517 def __hash__(self): 1518 return hash(self.daemon_id)
Daemonize Python functions into background processes.
Examples
>>> import meerschaum as mrsm
>>> from meerschaum.utils.daemons import Daemon
>>> daemon = Daemon(print, ('hi',))
>>> success, msg = daemon.run()
>>> print(daemon.log_text)
2024-07-29 18:03 | hi 2024-07-29 18:03 |
>>> daemon.run(allow_dirty_run=True)
>>> print(daemon.log_text)
2024-07-29 18:03 | hi 2024-07-29 18:03 | 2024-07-29 18:05 | hi 2024-07-29 18:05 |
>>> mrsm.pprint(daemon.properties)
{
'label': 'print',
'target': {'name': 'print', 'module': 'builtins', 'args': ['hi'], 'kw': {}},
'result': None,
'process': {'ended': '2024-07-29T18:03:33.752806'}
}
136 def __init__( 137 self, 138 target: Optional[Callable[[Any], Any]] = None, 139 target_args: Union[List[Any], Tuple[Any], None] = None, 140 target_kw: Optional[Dict[str, Any]] = None, 141 env: Optional[Dict[str, str]] = None, 142 daemon_id: Optional[str] = None, 143 label: Optional[str] = None, 144 properties: Optional[Dict[str, Any]] = None, 145 pickle: bool = True, 146 ): 147 """ 148 Parameters 149 ---------- 150 target: Optional[Callable[[Any], Any]], default None, 151 The function to execute in a child process. 152 153 target_args: Union[List[Any], Tuple[Any], None], default None 154 Positional arguments to pass to the target function. 155 156 target_kw: Optional[Dict[str, Any]], default None 157 Keyword arguments to pass to the target function. 158 159 env: Optional[Dict[str, str]], default None 160 If provided, set these environment variables in the daemon process. 161 162 daemon_id: Optional[str], default None 163 Build a `Daemon` from an existing `daemon_id`. 164 If `daemon_id` is provided, other arguments are ignored and are derived 165 from the existing pickled `Daemon`. 166 167 label: Optional[str], default None 168 Label string to help identifiy a daemon. 169 If `None`, use the function name instead. 170 171 properties: Optional[Dict[str, Any]], default None 172 Override reading from the properties JSON by providing an existing dictionary. 173 """ 174 _pickle = self.__dict__.get('_pickle', False) 175 if daemon_id is not None: 176 self.daemon_id = daemon_id 177 if not self.pickle_path.exists() and not target and ('target' not in self.__dict__): 178 179 if not self.properties_path.exists(): 180 raise Exception( 181 f"Daemon '{self.daemon_id}' does not exist. " 182 + "Pass a target to create a new Daemon." 183 ) 184 185 try: 186 new_daemon = self.from_properties_file(daemon_id) 187 except Exception: 188 new_daemon = None 189 190 if new_daemon is not None: 191 new_daemon.write_pickle() 192 target = new_daemon.target 193 target_args = new_daemon.target_args 194 target_kw = new_daemon.target_kw 195 label = new_daemon.label 196 self._properties = new_daemon.properties 197 else: 198 try: 199 self.properties_path.unlink() 200 except Exception: 201 pass 202 203 raise Exception( 204 f"Could not recover daemon '{self.daemon_id}' " 205 + "from its properties file." 206 ) 207 208 if 'target' not in self.__dict__: 209 if target is None: 210 error("Cannot create a Daemon without a target.") 211 self.target = target 212 213 self.pickle = pickle 214 215 ### NOTE: We have to check self.__dict__ in case we un-pickling. 216 if '_target_args' not in self.__dict__: 217 self._target_args = target_args 218 if '_target_kw' not in self.__dict__: 219 self._target_kw = target_kw 220 221 if 'label' not in self.__dict__: 222 if label is None: 223 label = ( 224 self.target.__name__ if '__name__' in self.target.__dir__() 225 else str(self.target) 226 ) 227 self.label = label 228 elif label is not None: 229 self.label = label 230 231 if 'daemon_id' not in self.__dict__: 232 self.daemon_id = get_new_daemon_name() 233 if '_properties' not in self.__dict__: 234 self._properties = properties 235 elif properties: 236 if self._properties is None: 237 self._properties = {} 238 self._properties.update(properties) 239 if self._properties is None: 240 self._properties = {} 241 242 self._properties.update({'label': self.label}) 243 if env: 244 self._properties.update({'env': env}) 245 246 ### Instantiate the process and if it doesn't exist, make sure the PID is removed. 247 _ = self.process
Parameters
- target (Optional[Callable[[Any], Any]], default None,): The function to execute in a child process.
- target_args (Union[List[Any], Tuple[Any], None], default None): Positional arguments to pass to the target function.
- target_kw (Optional[Dict[str, Any]], default None): Keyword arguments to pass to the target function.
- env (Optional[Dict[str, str]], default None): If provided, set these environment variables in the daemon process.
- daemon_id (Optional[str], default None):
Build a
Daemonfrom an existingdaemon_id. Ifdaemon_idis provided, other arguments are ignored and are derived from the existing pickledDaemon. - label (Optional[str], default None):
Label string to help identifiy a daemon.
If
None, use the function name instead. - properties (Optional[Dict[str, Any]], default None): Override reading from the properties JSON by providing an existing dictionary.
89 @classmethod 90 def from_properties_file(cls, daemon_id: str) -> Daemon: 91 """ 92 Return a Daemon from a properties dictionary. 93 """ 94 properties_path = cls._get_properties_path_from_daemon_id(daemon_id) 95 if not properties_path.exists(): 96 raise OSError(f"Properties file '{properties_path}' does not exist.") 97 98 try: 99 with open(properties_path, 'r', encoding='utf-8') as f: 100 properties = json.load(f) 101 except Exception: 102 properties = {} 103 104 if not properties: 105 raise ValueError(f"No properties could be read for daemon '{daemon_id}'.") 106 107 daemon_id = properties_path.parent.name 108 target_cf = properties.get('target', {}) 109 target_module_name = target_cf.get('module', None) 110 target_function_name = target_cf.get('name', None) 111 target_args = target_cf.get('args', None) 112 target_kw = target_cf.get('kw', None) 113 label = properties.get('label', None) 114 115 if None in [ 116 target_module_name, 117 target_function_name, 118 target_args, 119 target_kw, 120 ]: 121 raise ValueError("Missing target function information.") 122 123 target_module = importlib.import_module(target_module_name) 124 target_function = getattr(target_module, target_function_name) 125 126 return Daemon( 127 daemon_id=daemon_id, 128 target=target_function, 129 target_args=target_args, 130 target_kw=target_kw, 131 properties=properties, 132 label=label, 133 )
Return a Daemon from a properties dictionary.
425 def run( 426 self, 427 keep_daemon_output: bool = True, 428 allow_dirty_run: bool = False, 429 wait: bool = False, 430 timeout: Union[int, float] = 4, 431 debug: bool = False, 432 ) -> SuccessTuple: 433 """Run the daemon as a child process and continue executing the parent. 434 435 Parameters 436 ---------- 437 keep_daemon_output: bool, default True 438 If `False`, delete the daemon's output directory upon exiting. 439 440 allow_dirty_run: bool, default False 441 If `True`, run the daemon, even if the `daemon_id` directory exists. 442 This option is dangerous because if the same `daemon_id` runs concurrently, 443 the last to finish will overwrite the output of the first. 444 445 wait: bool, default True 446 If `True`, block until `Daemon.status` is running (or the timeout expires). 447 448 timeout: Union[int, float], default 4 449 If `wait` is `True`, block for up to `timeout` seconds before returning a failure. 450 451 Returns 452 ------- 453 A SuccessTuple indicating success. 454 455 """ 456 import platform 457 if platform.system() == 'Windows': 458 return False, "Cannot run background jobs on Windows." 459 460 ### The daemon might exist and be paused. 461 if self.status == 'paused': 462 return self.resume() 463 464 self._remove_stop_file() 465 if self.status == 'running': 466 return True, f"Daemon '{self}' is already running." 467 468 self.mkdir_if_not_exists(allow_dirty_run) 469 _write_pickle_success_tuple = self.write_pickle() 470 if not _write_pickle_success_tuple[0]: 471 return _write_pickle_success_tuple 472 473 _launch_daemon_code = ( 474 "from meerschaum.utils.daemon import Daemon, _daemons; " 475 f"daemon = Daemon(daemon_id='{self.daemon_id}'); " 476 f"_daemons['{self.daemon_id}'] = daemon; " 477 f"daemon._run_exit(keep_daemon_output={keep_daemon_output}, " 478 "allow_dirty_run=True)" 479 ) 480 env = dict(os.environ) 481 _launch_success_bool = venv_exec(_launch_daemon_code, debug=debug, venv=None, env=env) 482 msg = ( 483 "Success" 484 if _launch_success_bool 485 else f"Failed to start daemon '{self.daemon_id}'." 486 ) 487 if not wait or not _launch_success_bool: 488 return _launch_success_bool, msg 489 490 timeout = self.get_timeout_seconds(timeout) 491 check_timeout_interval = self.get_check_timeout_interval_seconds() 492 493 if not timeout: 494 success = self.status == 'running' 495 msg = "Success" if success else f"Failed to run daemon '{self.daemon_id}'." 496 if success: 497 self._capture_process_timestamp('began') 498 return success, msg 499 500 begin = time.perf_counter() 501 while (time.perf_counter() - begin) < timeout: 502 if self.status == 'running': 503 self._capture_process_timestamp('began') 504 return True, "Success" 505 time.sleep(check_timeout_interval) 506 507 return False, ( 508 f"Failed to start daemon '{self.daemon_id}' within {timeout} second" 509 + ('s' if timeout != 1 else '') + '.' 510 )
Run the daemon as a child process and continue executing the parent.
Parameters
- keep_daemon_output (bool, default True):
If
False, delete the daemon's output directory upon exiting. - allow_dirty_run (bool, default False):
If
True, run the daemon, even if thedaemon_iddirectory exists. This option is dangerous because if the samedaemon_idruns concurrently, the last to finish will overwrite the output of the first. - wait (bool, default True):
If
True, block untilDaemon.statusis running (or the timeout expires). - timeout (Union[int, float], default 4):
If
waitisTrue, block for up totimeoutseconds before returning a failure.
Returns
- A SuccessTuple indicating success.
513 def kill(self, timeout: Union[int, float, None] = 8) -> SuccessTuple: 514 """ 515 Forcibly terminate a running daemon. 516 Sends a SIGTERM signal to the process. 517 518 Parameters 519 ---------- 520 timeout: Optional[int], default 3 521 How many seconds to wait for the process to terminate. 522 523 Returns 524 ------- 525 A SuccessTuple indicating success. 526 """ 527 if self.status != 'paused': 528 success, msg = self._send_signal(signal.SIGTERM, timeout=timeout) 529 if success: 530 self._write_stop_file('kill') 531 self.stdin_file.close() 532 self._remove_blocking_stdin_file() 533 return success, msg 534 535 if self.status == 'stopped': 536 self._write_stop_file('kill') 537 self.stdin_file.close() 538 self._remove_blocking_stdin_file() 539 return True, "Process has already stopped." 540 541 psutil = attempt_import('psutil') 542 process = self.process 543 try: 544 process.terminate() 545 process.kill() 546 process.wait(timeout=timeout) 547 except Exception as e: 548 return False, f"Failed to kill job {self} ({process}) with exception: {e}" 549 550 try: 551 if process.status(): 552 return False, "Failed to stop daemon '{self}' ({process})." 553 except psutil.NoSuchProcess: 554 pass 555 556 if self.pid_path.exists(): 557 try: 558 self.pid_path.unlink() 559 except Exception: 560 pass 561 562 self._write_stop_file('kill') 563 self.stdin_file.close() 564 self._remove_blocking_stdin_file() 565 return True, "Success"
Forcibly terminate a running daemon. Sends a SIGTERM signal to the process.
Parameters
- timeout (Optional[int], default 3): How many seconds to wait for the process to terminate.
Returns
- A SuccessTuple indicating success.
567 def quit(self, timeout: Union[int, float, None] = None) -> SuccessTuple: 568 """Gracefully quit a running daemon.""" 569 if self.status == 'paused': 570 return self.kill(timeout) 571 572 signal_success, signal_msg = self._send_signal(signal.SIGINT, timeout=timeout) 573 if signal_success: 574 self._write_stop_file('quit') 575 self.stdin_file.close() 576 self._remove_blocking_stdin_file() 577 return signal_success, signal_msg
Gracefully quit a running daemon.
579 def pause( 580 self, 581 timeout: Union[int, float, None] = None, 582 check_timeout_interval: Union[float, int, None] = None, 583 ) -> SuccessTuple: 584 """ 585 Pause the daemon if it is running. 586 587 Parameters 588 ---------- 589 timeout: Union[float, int, None], default None 590 The maximum number of seconds to wait for a process to suspend. 591 592 check_timeout_interval: Union[float, int, None], default None 593 The number of seconds to wait between checking if the process is still running. 594 595 Returns 596 ------- 597 A `SuccessTuple` indicating whether the `Daemon` process was successfully suspended. 598 """ 599 self._remove_blocking_stdin_file() 600 601 if self.process is None: 602 return False, f"Daemon '{self.daemon_id}' is not running and cannot be paused." 603 604 if self.status == 'paused': 605 return True, f"Daemon '{self.daemon_id}' is already paused." 606 607 self._write_stop_file('pause') 608 self.stdin_file.close() 609 self._remove_blocking_stdin_file() 610 try: 611 self.process.suspend() 612 except Exception as e: 613 return False, f"Failed to pause daemon '{self.daemon_id}':\n{e}" 614 615 timeout = self.get_timeout_seconds(timeout) 616 check_timeout_interval = self.get_check_timeout_interval_seconds( 617 check_timeout_interval 618 ) 619 620 psutil = attempt_import('psutil') 621 622 if not timeout: 623 try: 624 success = self.process.status() == 'stopped' 625 except psutil.NoSuchProcess: 626 success = True 627 msg = "Success" if success else f"Failed to suspend daemon '{self.daemon_id}'." 628 if success: 629 self._capture_process_timestamp('paused') 630 return success, msg 631 632 begin = time.perf_counter() 633 while (time.perf_counter() - begin) < timeout: 634 try: 635 if self.process.status() == 'stopped': 636 self._capture_process_timestamp('paused') 637 return True, "Success" 638 except psutil.NoSuchProcess as e: 639 return False, f"Process exited unexpectedly. Was it killed?\n{e}" 640 time.sleep(check_timeout_interval) 641 642 return False, ( 643 f"Failed to pause daemon '{self.daemon_id}' within {timeout} second" 644 + ('s' if timeout != 1 else '') + '.' 645 )
Pause the daemon if it is running.
Parameters
- timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to suspend.
- check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still running.
Returns
- A
SuccessTupleindicating whether theDaemonprocess was successfully suspended.
647 def resume( 648 self, 649 timeout: Union[int, float, None] = None, 650 check_timeout_interval: Union[float, int, None] = None, 651 ) -> SuccessTuple: 652 """ 653 Resume the daemon if it is paused. 654 655 Parameters 656 ---------- 657 timeout: Union[float, int, None], default None 658 The maximum number of seconds to wait for a process to resume. 659 660 check_timeout_interval: Union[float, int, None], default None 661 The number of seconds to wait between checking if the process is still stopped. 662 663 Returns 664 ------- 665 A `SuccessTuple` indicating whether the `Daemon` process was successfully resumed. 666 """ 667 if self.status == 'running': 668 return True, f"Daemon '{self.daemon_id}' is already running." 669 670 if self.status == 'stopped': 671 return False, f"Daemon '{self.daemon_id}' is stopped and cannot be resumed." 672 673 self._remove_stop_file() 674 try: 675 if self.process is None: 676 return False, f"Cannot resume daemon '{self.daemon_id}'." 677 678 self.process.resume() 679 except Exception as e: 680 return False, f"Failed to resume daemon '{self.daemon_id}':\n{e}" 681 682 timeout = self.get_timeout_seconds(timeout) 683 check_timeout_interval = self.get_check_timeout_interval_seconds( 684 check_timeout_interval 685 ) 686 687 if not timeout: 688 success = self.status == 'running' 689 msg = "Success" if success else f"Failed to resume daemon '{self.daemon_id}'." 690 if success: 691 self._capture_process_timestamp('began') 692 return success, msg 693 694 begin = time.perf_counter() 695 while (time.perf_counter() - begin) < timeout: 696 if self.status == 'running': 697 self._capture_process_timestamp('began') 698 return True, "Success" 699 time.sleep(check_timeout_interval) 700 701 return False, ( 702 f"Failed to resume daemon '{self.daemon_id}' within {timeout} second" 703 + ('s' if timeout != 1 else '') + '.' 704 )
Resume the daemon if it is paused.
Parameters
- timeout (Union[float, int, None], default None): The maximum number of seconds to wait for a process to resume.
- check_timeout_interval (Union[float, int, None], default None): The number of seconds to wait between checking if the process is still stopped.
Returns
- A
SuccessTupleindicating whether theDaemonprocess was successfully resumed.
843 def mkdir_if_not_exists(self, allow_dirty_run: bool = False): 844 """Create the Daemon's directory. 845 If `allow_dirty_run` is `False` and the directory already exists, 846 raise a `FileExistsError`. 847 """ 848 try: 849 self.path.mkdir(parents=True, exist_ok=True) 850 _already_exists = any(os.scandir(self.path)) 851 except FileExistsError: 852 _already_exists = True 853 854 if _already_exists and not allow_dirty_run: 855 error( 856 f"Daemon '{self.daemon_id}' already exists. " + 857 "To allow this daemon to run, do one of the following:\n" 858 + " - Execute `daemon.cleanup()`.\n" 859 + f" - Delete the directory '{self.path}'.\n" 860 + " - Pass `allow_dirty_run=True` to `daemon.run()`.\n", 861 FileExistsError, 862 )
Create the Daemon's directory.
If allow_dirty_run is False and the directory already exists,
raise a FileExistsError.
864 @property 865 def process(self) -> Union['psutil.Process', None]: 866 """ 867 Return the psutil process for the Daemon. 868 """ 869 psutil = attempt_import('psutil') 870 pid = self.pid 871 if pid is None: 872 return None 873 if '_process' not in self.__dict__ or self.__dict__['_process'].pid != int(pid): 874 try: 875 self._process = psutil.Process(int(pid)) 876 process_exists = True 877 except Exception: 878 process_exists = False 879 if not process_exists: 880 _ = self.__dict__.pop('_process', None) 881 try: 882 if self.pid_path.exists(): 883 self.pid_path.unlink() 884 except Exception: 885 pass 886 return None 887 return self._process
Return the psutil process for the Daemon.
889 @property 890 def status(self) -> str: 891 """ 892 Return the running status of this Daemon. 893 """ 894 if self.process is None: 895 return 'stopped' 896 897 psutil = attempt_import('psutil', lazy=False) 898 try: 899 if self.process.status() == 'stopped': 900 return 'paused' 901 if self.process.status() == 'zombie': 902 raise psutil.NoSuchProcess(self.process.pid) 903 except (psutil.NoSuchProcess, AttributeError): 904 if self.pid_path.exists(): 905 try: 906 self.pid_path.unlink() 907 except Exception: 908 pass 909 return 'stopped' 910 911 return 'running'
Return the running status of this Daemon.
921 @property 922 def path(self) -> pathlib.Path: 923 """ 924 Return the path for this Daemon's directory. 925 """ 926 return self._get_path_from_daemon_id(self.daemon_id)
Return the path for this Daemon's directory.
935 @property 936 def properties_path(self) -> pathlib.Path: 937 """ 938 Return the `propterties.json` path for this Daemon. 939 """ 940 return self._get_properties_path_from_daemon_id(self.daemon_id)
Return the propterties.json path for this Daemon.
942 @property 943 def stop_path(self) -> pathlib.Path: 944 """ 945 Return the path for the stop file (created when manually stopped). 946 """ 947 return self.path / '.stop.json'
Return the path for the stop file (created when manually stopped).
949 @property 950 def log_path(self) -> pathlib.Path: 951 """ 952 Return the log path. 953 """ 954 logs_cf = self.properties.get('logs', None) or {} 955 if 'path' not in logs_cf: 956 import meerschaum.config.paths as paths 957 return paths.LOGS_RESOURCES_PATH / (self.daemon_id + '.log') 958 959 return pathlib.Path(logs_cf['path'])
Return the log path.
961 @property 962 def stdin_file_path(self) -> pathlib.Path: 963 """ 964 Return the stdin file path. 965 """ 966 return self.path / 'input.stdin'
Return the stdin file path.
968 @property 969 def blocking_stdin_file_path(self) -> pathlib.Path: 970 """ 971 Return the stdin file path. 972 """ 973 if '_blocking_stdin_file_path' in self.__dict__: 974 return self._blocking_stdin_file_path 975 976 return self.path / 'input.stdin.block'
Return the stdin file path.
978 @property 979 def prompt_kwargs_file_path(self) -> pathlib.Path: 980 """ 981 Return the file path to the kwargs for the invoking `prompt()`. 982 """ 983 return self.path / 'prompt_kwargs.json'
Return the file path to the kwargs for the invoking prompt().
985 @property 986 def log_offset_path(self) -> pathlib.Path: 987 """ 988 Return the log offset file path. 989 """ 990 import meerschaum.config.paths as paths 991 return paths.LOGS_RESOURCES_PATH / ('.' + self.daemon_id + '.log.offset')
Return the log offset file path.
993 @property 994 def log_offset_lock(self) -> 'fasteners.InterProcessLock': 995 """ 996 Return the process lock context manager. 997 """ 998 if '_log_offset_lock' in self.__dict__: 999 return self._log_offset_lock 1000 1001 fasteners = attempt_import('fasteners') 1002 self._log_offset_lock = fasteners.InterProcessLock(self.log_offset_path) 1003 return self._log_offset_lock
Return the process lock context manager.
1005 @property 1006 def rotating_log(self) -> RotatingFile: 1007 """ 1008 The rotating log file for the daemon's output. 1009 """ 1010 if '_rotating_log' in self.__dict__: 1011 return self._rotating_log 1012 1013 logs_cf = self.properties.get('logs', None) or {} 1014 write_timestamps = logs_cf.get('write_timestamps', None) 1015 if write_timestamps is None: 1016 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 1017 1018 timestamp_format = logs_cf.get('timestamp_format', None) 1019 if timestamp_format is None: 1020 timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format') 1021 1022 num_files_to_keep = logs_cf.get('num_files_to_keep', None) 1023 if num_files_to_keep is None: 1024 num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep') 1025 1026 max_file_size = logs_cf.get('max_file_size', None) 1027 if max_file_size is None: 1028 max_file_size = get_config('jobs', 'logs', 'max_file_size') 1029 1030 redirect_streams = logs_cf.get('redirect_streams', True) 1031 1032 self._rotating_log = RotatingFile( 1033 self.log_path, 1034 redirect_streams=redirect_streams, 1035 write_timestamps=write_timestamps, 1036 timestamp_format=timestamp_format, 1037 num_files_to_keep=num_files_to_keep, 1038 max_file_size=max_file_size, 1039 ) 1040 return self._rotating_log
The rotating log file for the daemon's output.
1042 @property 1043 def stdin_file(self): 1044 """ 1045 Return the file handler for the stdin file. 1046 """ 1047 if (_stdin_file := self.__dict__.get('_stdin_file', None)): 1048 return _stdin_file 1049 1050 self._stdin_file = StdinFile( 1051 self.stdin_file_path, 1052 lock_file_path=self.blocking_stdin_file_path, 1053 ) 1054 return self._stdin_file
Return the file handler for the stdin file.
1056 @property 1057 def log_text(self) -> Union[str, None]: 1058 """ 1059 Read the log files and return their contents. 1060 Returns `None` if the log file does not exist. 1061 """ 1062 logs_cf = self.properties.get('logs', None) or {} 1063 write_timestamps = logs_cf.get('write_timestamps', None) 1064 if write_timestamps is None: 1065 write_timestamps = get_config('jobs', 'logs', 'timestamps', 'enabled') 1066 1067 timestamp_format = logs_cf.get('timestamp_format', None) 1068 if timestamp_format is None: 1069 timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format') 1070 1071 num_files_to_keep = logs_cf.get('num_files_to_keep', None) 1072 if num_files_to_keep is None: 1073 num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep') 1074 1075 max_file_size = logs_cf.get('max_file_size', None) 1076 if max_file_size is None: 1077 max_file_size = get_config('jobs', 'logs', 'max_file_size') 1078 1079 new_rotating_log = RotatingFile( 1080 self.rotating_log.file_path, 1081 num_files_to_keep=num_files_to_keep, 1082 max_file_size=max_file_size, 1083 write_timestamps=write_timestamps, 1084 timestamp_format=timestamp_format, 1085 ) 1086 return new_rotating_log.read()
Read the log files and return their contents.
Returns None if the log file does not exist.
1088 def readlines(self) -> List[str]: 1089 """ 1090 Read the next log lines, persisting the cursor for later use. 1091 Note this will alter the cursor of `self.rotating_log`. 1092 """ 1093 self.rotating_log._cursor = self._read_log_offset() 1094 lines = self.rotating_log.readlines() 1095 self._write_log_offset() 1096 return lines
Read the next log lines, persisting the cursor for later use.
Note this will alter the cursor of self.rotating_log.
1129 @property 1130 def pid(self) -> Union[int, None]: 1131 """ 1132 Read the PID file and return its contents. 1133 Returns `None` if the PID file does not exist. 1134 """ 1135 if not self.pid_path.exists(): 1136 return None 1137 try: 1138 with open(self.pid_path, 'r', encoding='utf-8') as f: 1139 text = f.read() 1140 if len(text) == 0: 1141 return None 1142 pid = int(text.rstrip()) 1143 except Exception as e: 1144 warn(e) 1145 text = None 1146 pid = None 1147 return pid
Read the PID file and return its contents.
Returns None if the PID file does not exist.
1149 @property 1150 def pid_path(self) -> pathlib.Path: 1151 """ 1152 Return the path to a file containing the PID for this Daemon. 1153 """ 1154 return self.path / 'process.pid'
Return the path to a file containing the PID for this Daemon.
1156 @property 1157 def pid_lock(self) -> 'fasteners.InterProcessLock': 1158 """ 1159 Return the process lock context manager. 1160 """ 1161 if '_pid_lock' in self.__dict__: 1162 return self._pid_lock 1163 1164 fasteners = attempt_import('fasteners') 1165 self._pid_lock = fasteners.InterProcessLock(self.pid_path) 1166 return self._pid_lock
Return the process lock context manager.
1168 @property 1169 def pickle_path(self) -> pathlib.Path: 1170 """ 1171 Return the path for the pickle file. 1172 """ 1173 return self.path / 'pickle.pkl'
Return the path for the pickle file.
1175 def read_properties(self) -> Optional[Dict[str, Any]]: 1176 """Read the properties JSON file and return the dictionary.""" 1177 if not self.properties_path.exists(): 1178 return None 1179 try: 1180 with open(self.properties_path, 'r', encoding='utf-8') as file: 1181 properties = json.load(file) 1182 except Exception: 1183 properties = {} 1184 1185 return properties or {}
Read the properties JSON file and return the dictionary.
1187 def read_pickle(self) -> Daemon: 1188 """Read a Daemon's pickle file and return the `Daemon`.""" 1189 import pickle 1190 import traceback 1191 if not self.pickle_path.exists(): 1192 error(f"Pickle file does not exist for daemon '{self.daemon_id}'.") 1193 1194 if self.pickle_path.stat().st_size == 0: 1195 error(f"Pickle was empty for daemon '{self.daemon_id}'.") 1196 1197 try: 1198 with open(self.pickle_path, 'rb') as pickle_file: 1199 daemon = pickle.load(pickle_file) 1200 success, msg = True, 'Success' 1201 except Exception as e: 1202 success, msg = False, str(e) 1203 daemon = None 1204 traceback.print_exception(type(e), e, e.__traceback__) 1205 if not success: 1206 error(msg) 1207 return daemon
Read a Daemon's pickle file and return the Daemon.
1209 @property 1210 def properties(self) -> Dict[str, Any]: 1211 """ 1212 Return the contents of the properties JSON file. 1213 """ 1214 try: 1215 _file_properties = self.read_properties() or {} 1216 except Exception: 1217 traceback.print_exc() 1218 _file_properties = {} 1219 1220 if not self._properties: 1221 self._properties = _file_properties 1222 1223 if self._properties is None: 1224 self._properties = {} 1225 1226 if ( 1227 self._properties.get('result', None) is None 1228 and _file_properties.get('result', None) is not None 1229 ): 1230 _ = self._properties.pop('result', None) 1231 1232 if _file_properties is not None: 1233 self._properties = apply_patch_to_config( 1234 _file_properties, 1235 self._properties, 1236 ) 1237 1238 return self._properties
Return the contents of the properties JSON file.
1247 def write_properties(self) -> SuccessTuple: 1248 """Write the properties dictionary to the properties JSON file 1249 (only if self.properties exists). 1250 """ 1251 from meerschaum.utils.misc import generate_password 1252 success, msg = ( 1253 False, 1254 f"No properties to write for daemon '{self.daemon_id}'." 1255 ) 1256 backup_path = self.properties_path.parent / (generate_password(8) + '.json') 1257 props = self.properties 1258 if props is not None: 1259 try: 1260 self.path.mkdir(parents=True, exist_ok=True) 1261 if self.properties_path.exists(): 1262 self.properties_path.rename(backup_path) 1263 with open(self.properties_path, 'w+', encoding='utf-8') as properties_file: 1264 json.dump(props, properties_file) 1265 success, msg = True, 'Success' 1266 except Exception as e: 1267 success, msg = False, str(e) 1268 1269 try: 1270 if backup_path.exists(): 1271 if not success: 1272 backup_path.rename(self.properties_path) 1273 else: 1274 backup_path.unlink() 1275 except Exception as e: 1276 success, msg = False, str(e) 1277 1278 return success, msg
Write the properties dictionary to the properties JSON file (only if self.properties exists).
1280 def write_pickle(self) -> SuccessTuple: 1281 """Write the pickle file for the daemon.""" 1282 import pickle 1283 import traceback 1284 from meerschaum.utils.misc import generate_password 1285 1286 if not self.pickle: 1287 return True, "Success" 1288 1289 from meerschaum._internal.entry import _shells 1290 if _shells: 1291 from meerschaum._internal.shell.Shell import revert_input 1292 revert_input() 1293 1294 backup_path = self.pickle_path.parent / (generate_password(7) + '.pkl') 1295 try: 1296 self.path.mkdir(parents=True, exist_ok=True) 1297 if self.pickle_path.exists(): 1298 self.pickle_path.rename(backup_path) 1299 with open(self.pickle_path, 'wb+') as pickle_file: 1300 pickle.dump(self, pickle_file) 1301 success, msg = True, "Success" 1302 except Exception as e: 1303 success, msg = False, str(e) 1304 traceback.print_exception(type(e), e, e.__traceback__) 1305 try: 1306 if backup_path.exists(): 1307 if not success: 1308 backup_path.rename(self.pickle_path) 1309 else: 1310 backup_path.unlink() 1311 except Exception as e: 1312 success, msg = False, str(e) 1313 return success, msg
Write the pickle file for the daemon.
1343 def cleanup(self, keep_logs: bool = False) -> SuccessTuple: 1344 """ 1345 Remove a daemon's directory after execution. 1346 1347 Parameters 1348 ---------- 1349 keep_logs: bool, default False 1350 If `True`, skip deleting the daemon's log files. 1351 1352 Returns 1353 ------- 1354 A `SuccessTuple` indicating success. 1355 """ 1356 if self.path.exists(): 1357 try: 1358 shutil.rmtree(self.path) 1359 except Exception as e: 1360 msg = f"Failed to clean up '{self.daemon_id}':\n{e}" 1361 warn(msg) 1362 return False, msg 1363 if not keep_logs: 1364 self.rotating_log.delete() 1365 try: 1366 if self.log_offset_path.exists(): 1367 self.log_offset_path.unlink() 1368 except Exception as e: 1369 msg = f"Failed to remove offset file for '{self.daemon_id}':\n{e}" 1370 warn(msg) 1371 return False, msg 1372 return True, "Success"
Remove a daemon's directory after execution.
Parameters
- keep_logs (bool, default False):
If
True, skip deleting the daemon's log files.
Returns
- A
SuccessTupleindicating success.
1375 def get_timeout_seconds(self, timeout: Union[int, float, None] = None) -> Union[int, float]: 1376 """ 1377 Return the timeout value to use. Use `--timeout-seconds` if provided, 1378 else the configured default (8). 1379 """ 1380 if isinstance(timeout, (int, float)): 1381 return timeout 1382 return get_config('jobs', 'timeout_seconds')
Return the timeout value to use. Use --timeout-seconds if provided,
else the configured default (8).
1385 def get_check_timeout_interval_seconds( 1386 self, 1387 check_timeout_interval: Union[int, float, None] = None, 1388 ) -> Union[int, float]: 1389 """ 1390 Return the interval value to check the status of timeouts. 1391 """ 1392 if isinstance(check_timeout_interval, (int, float)): 1393 return check_timeout_interval 1394 return get_config('jobs', 'check_timeout_interval_seconds')
Return the interval value to check the status of timeouts.
1396 @property 1397 def target_args(self) -> Union[Tuple[Any], None]: 1398 """ 1399 Return the positional arguments to pass to the target function. 1400 """ 1401 target_args = ( 1402 self.__dict__.get('_target_args', None) 1403 or self.properties.get('target', {}).get('args', None) 1404 ) 1405 if target_args is None: 1406 return tuple([]) 1407 1408 return tuple(target_args)
Return the positional arguments to pass to the target function.
1410 @property 1411 def target_kw(self) -> Union[Dict[str, Any], None]: 1412 """ 1413 Return the keyword arguments to pass to the target function. 1414 """ 1415 target_kw = ( 1416 self.__dict__.get('_target_kw', None) 1417 or self.properties.get('target', {}).get('kw', None) 1418 ) 1419 if target_kw is None: 1420 return {} 1421 1422 return {key: val for key, val in target_kw.items()}
Return the keyword arguments to pass to the target function.