meerschaum.jobs.systemd
Manage meerschaum.jobs.Job via systemd.
#! /usr/bin/env python3
# vim:fenc=utf-8

"""
Manage `meerschaum.jobs.Job` via `systemd`.
"""

import os
import pathlib
import shlex
import sys
import asyncio
import json
import time
import shutil
from datetime import datetime, timezone
from functools import partial

import meerschaum as mrsm
from meerschaum.jobs import Job, Executor, make_executor
from meerschaum.utils.typing import Dict, Any, List, SuccessTuple, Union, Optional
from meerschaum.config import get_config
from meerschaum._internal.static import STATIC_CONFIG
from meerschaum.utils.warnings import warn, dprint

### How long (in seconds) `get_job_metadata()` may serve a cached result.
JOB_METADATA_CACHE_SECONDS: int = STATIC_CONFIG['api']['jobs']['metadata_cache_seconds']


@make_executor
class SystemdExecutor(Executor):
    """
    Execute Meerschaum jobs via `systemd`.

    Every job is backed by a user-level unit named `mrsm-<name>.service`
    which is controlled through `systemctl --user` (see `run_command()`).
    """

    def get_job_names(self, debug: bool = False) -> List[str]:
        """
        Return a list of existing jobs, including hidden ones.

        A job is considered to exist when a `mrsm-*.service` entry in the
        systemd user resources directory resolves to an existing file.
        """
        import meerschaum.config.paths as paths
        prefix, suffix = 'mrsm-', '.service'
        user_resources_path = paths.SYSTEMD_USER_RESOURCES_PATH
        return [
            service_name[len(prefix):-len(suffix)]
            for service_name in os.listdir(user_resources_path)
            if (
                service_name.startswith(prefix)
                and service_name.endswith(suffix)
                ### Check for broken symlinks.
                and (user_resources_path / service_name).exists()
            )
        ]

    def get_job_exists(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job exists.
        """
        user_services = self.get_job_names(debug=debug)
        if debug:
            dprint(f'Existing services: {user_services}')
        return name in user_services

    def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
        """
        Return a dictionary of `systemd` Jobs (including hidden jobs).

        Keys are job names and values are `Job` objects bound to this executor.
        """
        return {
            name: Job(name, executor_keys=str(self))
            for name in self.get_job_names(debug=debug)
        }

    def get_service_name(self, name: str, debug: bool = False) -> str:
        """
        Return a job's service name (spaces in `name` are replaced with dashes).
        """
        return f"mrsm-{name.replace(' ', '-')}.service"

    def get_service_job_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path for the job's files under the root directory.
        """
        import meerschaum.config.paths as paths
        return paths.SYSTEMD_JOBS_RESOURCES_PATH / name

    def get_service_symlink_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to where to create the service symlink.
        """
        import meerschaum.config.paths as paths
        return paths.SYSTEMD_USER_RESOURCES_PATH / self.get_service_name(name, debug=debug)

    def get_service_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to a Job's service file.
        """
        return (
            self.get_service_job_path(name, debug=debug)
            / self.get_service_name(name, debug=debug)
        )

    def get_service_logs_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to direct service logs to.
        """
        import meerschaum.config.paths as paths
        return (
            paths.SYSTEMD_LOGS_RESOURCES_PATH
            / (self.get_service_name(name, debug=debug) + '.log')
        )

    def get_socket_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to the FIFO file.
        """
        return (
            self.get_service_job_path(name, debug=debug)
            / (self.get_service_name(name, debug=debug) + '.stdin')
        )

    def get_result_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to the result file.
        """
        return (
            self.get_service_job_path(name, debug=debug)
            / (self.get_service_name(name, debug=debug) + '.result.json')
        )

    def get_service_file_text(self, name: str, sysargs: List[str], debug: bool = False) -> str:
        """
        Return the contents of the unit file.

        Parameters
        ----------
        name: str
            The name of the job.

        sysargs: List[str]
            The arguments to pass to `python -m meerschaum`.

        Returns
        -------
        The text of the `.service` unit. Its environment is built in three layers
        (later layers win): the Meerschaum variables currently set in `os.environ`,
        the per-service paths and flags, and finally the job's own `env`.
        """
        service_logs_path = self.get_service_logs_path(name, debug=debug)
        socket_path = self.get_socket_path(name, debug=debug)
        result_path = self.get_result_path(name, debug=debug)
        job = self.get_hidden_job(name, debug=debug)

        sysargs_str = shlex.join(sysargs)
        exec_str = f'{sys.executable} -m meerschaum {sysargs_str}'
        environment_names = STATIC_CONFIG['environment']
        mrsm_env_var_names = set(environment_names.values())
        mrsm_env_vars = {
            key: val
            for key, val in os.environ.items()
            if key in mrsm_env_var_names
        }

        ### Add new environment variables for the service process.
        mrsm_env_vars.update({
            environment_names['daemon_id']: name,
            environment_names['systemd_log_path']: service_logs_path.as_posix(),
            environment_names['systemd_result_path']: result_path.as_posix(),
            environment_names['systemd_stdin_path']: socket_path.as_posix(),
            environment_names['systemd_delete_job']: (
                '1'
                if job.delete_after_completion
                else '0'
            ),
        })

        ### Allow for user-defined environment variables.
        mrsm_env_vars.update(job.env)

        environment_str = '\n'.join(
            f"Environment={key}={shlex.quote(str(val))}"
            for key, val in mrsm_env_vars.items()
        )
        service_name = self.get_service_name(name, debug=debug)

        return (
            "[Unit]\n"
            f"Description=Run the job '{name}'\n"
            "\n"
            "[Service]\n"
            f"ExecStart={exec_str}\n"
            "KillSignal=SIGTERM\n"
            "Restart=always\n"
            "RestartPreventExitStatus=0\n"
            f"SyslogIdentifier={service_name}\n"
            f"{environment_str}\n"
            "\n"
            "[Install]\n"
            "WantedBy=default.target\n"
        )

    def get_socket_file_text(self, name: str, debug: bool = False) -> str:
        """
        Return the contents of the socket file.
        """
        service_name = self.get_service_name(name, debug=debug)
        socket_path = self.get_socket_path(name, debug=debug)
        return (
            "[Unit]\n"
            f"BindsTo={service_name}\n"
            "\n"
            "[Socket]\n"
            f"ListenFIFO={socket_path.as_posix()}\n"
            "FileDescriptorName=stdin\n"
            "RemoveOnStop=true\n"
            "SocketMode=0660\n"
        )

    def get_hidden_job(
        self,
        name: str,
        sysargs: Optional[List[str]] = None,
        properties: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ) -> Job:
        """
        Return the hidden "sister" job to store a job's parameters.

        The returned `Job` uses the `'local'` executor but is flagged as externally
        managed: its log, stdin file, status and result are all supplied by this
        executor's own accessors.
        """
        return Job(
            name,
            sysargs,
            executor_keys='local',
            _properties=properties,
            _rotating_log=self.get_job_rotating_file(name, debug=debug),
            _stdin_file=self.get_job_stdin_file(name, debug=debug),
            _status_hook=partial(self.get_job_status, name),
            _result_hook=partial(self.get_job_result, name),
            _externally_managed=True,
        )

    def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return metadata about a job.

        Results are cached per job name on this executor and reused for up to
        `JOB_METADATA_CACHE_SECONDS` seconds.

        Returns
        -------
        A dictionary with the keys `sysargs`, `result`, `restart`, and `daemon`
        (itself a dictionary of `status`, `pid`, and `properties`).
        """
        now = time.perf_counter()

        if '_jobs_metadata' not in self.__dict__:
            self._jobs_metadata: Dict[str, Any] = {}

        if name in self._jobs_metadata:
            ts = self._jobs_metadata[name].get('timestamp', None)

            if ts is not None and (now - ts) <= JOB_METADATA_CACHE_SECONDS:
                if debug:
                    dprint(f"Returning cached metadata for job '{name}'.")
                return self._jobs_metadata[name]['metadata']

        metadata = {
            'sysargs': self.get_job_sysargs(name, debug=debug),
            'result': self.get_job_result(name, debug=debug),
            'restart': self.get_job_restart(name, debug=debug),
            'daemon': {
                'status': self.get_job_status(name, debug=debug),
                'pid': self.get_job_pid(name, debug=debug),
                'properties': self.get_job_properties(name, debug=debug),
            },
        }
        self._jobs_metadata[name] = {
            'timestamp': now,
            'metadata': metadata,
        }
        return metadata

    def get_job_restart(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job restarts, i.e. whether its sysargs contain
        any of the `RESTART_FLAGS`.
        """
        from meerschaum.jobs._Job import RESTART_FLAGS
        sysargs = self.get_job_sysargs(name, debug=debug)
        if not sysargs:
            return False

        return any(flag in sysargs for flag in RESTART_FLAGS)

    def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return the properties for a job (omitting the `externally_managed` key).
        """
        job = self.get_hidden_job(name, debug=debug)
        return {
            k: v for k, v in job.daemon.properties.items()
            if k != 'externally_managed'
        }

    def get_job_process(self, name: str, debug: bool = False):
        """
        Return a `psutil.Process` for the job's PID.

        Returns `None` if the service has no PID or the process cannot be opened.
        """
        pid = self.get_job_pid(name, debug=debug)
        if pid is None:
            return None

        psutil = mrsm.attempt_import('psutil')
        try:
            return psutil.Process(pid)
        except Exception:
            return None

    def get_job_status(self, name: str, debug: bool = False) -> str:
        """
        Return the job's service status.

        Returns
        -------
        One of `'running'`, `'paused'`, or `'stopped'`, derived from
        `systemctl --user is-active` and the state of the service's main process.
        """
        output = self.run_command(
            ['is-active', self.get_service_name(name, debug=debug)],
            as_output=True,
            debug=debug,
        )

        if output == 'activating':
            return 'running'

        if output != 'active':
            return 'stopped'

        process = self.get_job_process(name, debug=debug)
        if process is None:
            return 'stopped'

        try:
            ### `psutil` reports a process which received SIGSTOP as 'stopped'.
            if process.status() == 'stopped':
                return 'paused'
        except Exception:
            return 'stopped'

        return 'running'

    def get_job_pid(self, name: str, debug: bool = False) -> Union[int, None]:
        """
        Return the job's service PID.

        Returns `None` when `systemctl show` reports no `MainPID` or a PID of `0`.
        """
        from meerschaum.utils.misc import is_int

        output = self.run_command(
            [
                'show',
                self.get_service_name(name, debug=debug),
                '--property=MainPID',
            ],
            as_output=True,
            debug=debug,
        )
        if not output.startswith('MainPID='):
            return None

        pid_str = output[len('MainPID='):]
        if pid_str == '0':
            return None

        if is_int(pid_str):
            return int(pid_str)

        return None

    def _get_job_timestamp(
        self,
        name: str,
        property_name: str,
        debug: bool = False,
    ) -> Union[str, None]:
        """
        Return a timestamp property of the job's unit as a UTC ISO 8601 string.

        Parameters
        ----------
        name: str
            The name of the job.

        property_name: str
            The `systemctl show` property to read (e.g. `ActiveEnterTimestamp`).

        Returns
        -------
        The ISO-formatted timestamp, or `None` if the property is missing,
        empty, or cannot be parsed.
        """
        output = self.run_command(
            [
                'show',
                self.get_service_name(name, debug=debug),
                f'--property={property_name}',
            ],
            as_output=True,
            debug=debug,
        )
        if not output.startswith(property_name):
            return None

        dt_str = output.split('=')[-1]
        if not dt_str:
            return None

        dateutil_parser = mrsm.attempt_import('dateutil.parser')
        try:
            dt = dateutil_parser.parse(dt_str)
        except Exception as e:
            warn(f"Cannot parse '{output}' as a datetime:\n{e}")
            return None

        return dt.astimezone(timezone.utc).isoformat()

    def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job began running (the unit's `ActiveEnterTimestamp`).
        """
        return self._get_job_timestamp(name, 'ActiveEnterTimestamp', debug=debug)

    def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job stopped running (the unit's `InactiveEnterTimestamp`).
        """
        return self._get_job_timestamp(name, 'InactiveEnterTimestamp', debug=debug)

    def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job was paused.

        Returns `None` unless the job's current status is `'paused'`
        and the hidden job has a stop time.
        """
        job = self.get_hidden_job(name, debug=debug)
        if self.get_job_status(name, debug=debug) != 'paused':
            return None

        stop_time = job.stop_time
        if stop_time is None:
            return None

        return stop_time.isoformat()

    def get_job_result(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Return the job's result SuccessTuple.

        The result is read from the job's `.result.json` file; a failure tuple
        is returned when that file is missing or cannot be read as JSON.
        """
        result_path = self.get_result_path(name, debug=debug)
        if not result_path.exists():
            return False, "No result available."

        try:
            with open(result_path, 'r', encoding='utf-8') as f:
                result = json.load(f)
        except Exception:
            return False, f"Could not read result for Job '{name}'."

        return tuple(result)

    def get_job_sysargs(self, name: str, debug: bool = False) -> Union[List[str], None]:
        """
        Return the sysargs from the service file.

        The arguments are parsed back out of the unit's `ExecStart=` line.
        An empty list is returned when the service file or that line is missing.
        """
        service_file_path = self.get_service_file_path(name, debug=debug)
        if not service_file_path.exists():
            return []

        with open(service_file_path, 'r', encoding='utf-8') as f:
            for line in f:
                if line.startswith('ExecStart='):
                    ### Keep what follows the interpreter invocation (and precedes any '<').
                    sysargs_str = line.split(' -m meerschaum ')[-1].split('<')[0]
                    return shlex.split(sysargs_str)

        return []

    def run_command(
        self,
        command_args: List[str],
        as_output: bool = False,
        debug: bool = False,
    ) -> Union[SuccessTuple, str]:
        """
        Run a `systemd` command and return success.

        Parameters
        ----------
        command_args: List[str]
            The command to pass to `systemctl --user`.

        as_output: bool, default False
            If `True`, return the process stdout output.
            Defaults to a `SuccessTuple`.

        Returns
        -------
        A `SuccessTuple` indicating success or a str for the process output.
        """
        from meerschaum.utils.process import run_process

        if command_args[:2] != ['systemctl', '--user']:
            command_args = ['systemctl', '--user'] + command_args

        if debug:
            dprint(shlex.join(command_args))

        proc = run_process(
            command_args,
            foreground=False,
            as_proc=True,
            capture_output=True,
            text=True,
        )
        stdout, _ = proc.communicate()
        if debug:
            dprint(f"{stdout}")

        if as_output:
            return stdout.strip()

        command_success = proc.wait() == 0
        command_msg = (
            "Success"
            if command_success
            else f"Failed to execute command `{shlex.join(command_args)}`."
        )
        return command_success, command_msg

    def get_job_stdin_file(self, name: str, debug: bool = False):
        """
        Return a `StdinFile` for the job.

        Instances are created once per job name and cached on this executor;
        the parent directory of the FIFO path is created if necessary.
        """
        from meerschaum.utils.daemon import StdinFile
        if '_stdin_files' not in self.__dict__:
            self._stdin_files: Dict[str, StdinFile] = {}

        if name not in self._stdin_files:
            socket_path = self.get_socket_path(name, debug=debug)
            socket_path.parent.mkdir(parents=True, exist_ok=True)
            self._stdin_files[name] = StdinFile(socket_path)

        return self._stdin_files[name]

    def create_job(
        self,
        name: str,
        sysargs: List[str],
        properties: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ) -> SuccessTuple:
        """
        Create a job as a service to be run by `systemd`.

        Parameters
        ----------
        name: str
            The name of the job.

        sysargs: List[str]
            The arguments to pass to `python -m meerschaum`.

        properties: Optional[Dict[str, Any]], default None
            Properties to store on the hidden job.

        Returns
        -------
        A `SuccessTuple` indicating whether the service was written and started.
        """
        from meerschaum.utils.misc import make_symlink
        service_name = self.get_service_name(name, debug=debug)
        service_file_path = self.get_service_file_path(name, debug=debug)
        service_symlink_file_path = self.get_service_symlink_file_path(name, debug=debug)
        socket_stdin = self.get_job_stdin_file(name, debug=debug)
        ### Accessed for its side effect.
        ### NOTE(review): presumably this creates / opens the FIFO; confirm in `StdinFile`.
        _ = socket_stdin.file_handler

        ### Init the externally_managed file.
        ### NOTE: We must write the pickle file in addition to the properties file.
        job = self.get_hidden_job(name, sysargs=sysargs, properties=properties, debug=debug)
        job._set_externally_managed()
        pickle_success, pickle_msg = job.daemon.write_pickle()
        if not pickle_success:
            return pickle_success, pickle_msg
        properties_success, properties_msg = job.daemon.write_properties()
        if not properties_success:
            return properties_success, properties_msg

        service_file_path.parent.mkdir(parents=True, exist_ok=True)

        with open(service_file_path, 'w+', encoding='utf-8') as f:
            f.write(self.get_service_file_text(name, sysargs, debug=debug))

        symlink_success, symlink_msg = make_symlink(service_file_path, service_symlink_file_path)
        if not symlink_success:
            return symlink_success, symlink_msg

        commands = [
            ['daemon-reload'],
            ['enable', service_name],
            ['start', service_name],
        ]

        fails = 0
        for command_list in commands:
            command_success, _ = self.run_command(command_list, debug=debug)
            if not command_success:
                fails += 1

        ### NOTE(review): a single failed command is tolerated here (`> 1`, not `> 0`).
        ### Confirm whether that leniency is intentional before tightening it.
        if fails > 1:
            return False, "Failed to reload systemd."

        return True, f"Started job '{name}' via systemd."

    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Start a job's service, or resume it (via `SIGCONT`) if it is paused.
        """
        job = self.get_hidden_job(name, debug=debug)
        job.daemon._remove_stop_file()

        service_name = self.get_service_name(name, debug=debug)
        if self.get_job_status(name, debug=debug) == 'paused':
            return self.run_command(
                ['kill', '-s', 'SIGCONT', service_name],
                debug=debug,
            )

        return self.run_command(['start', service_name], debug=debug)

    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Stop a job's service.

        A `'quit'` stop file is written and `SIGINT` is sent first. The job's status
        is then polled until it reports `'stopped'` or the configured timeout elapses,
        at which point `systemctl --user stop` is issued.
        """
        job = self.get_hidden_job(name, debug=debug)
        job.daemon._write_stop_file('quit')
        ### Best-effort: the outcome is determined by the polling loop below.
        _ = self.run_command(
            ['kill', '-s', 'SIGINT', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

        check_timeout_interval = get_config('jobs', 'check_timeout_interval_seconds')
        loop_start = time.perf_counter()
        timeout_seconds = get_config('jobs', 'timeout_seconds')
        while (time.perf_counter() - loop_start) < timeout_seconds:
            if self.get_job_status(name, debug=debug) == 'stopped':
                return True, 'Success'

            time.sleep(check_timeout_interval)

        return self.run_command(
            ['stop', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Pause a job's service by writing a `'pause'` stop file and sending `SIGSTOP`.
        """
        job = self.get_hidden_job(name, debug=debug)
        job.daemon._write_stop_file('pause')
        return self.run_command(
            ['kill', '-s', 'SIGSTOP', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Delete a job's service.

        Unless the job deletes itself after completion, the service is first stopped
        and disabled. The job's directory, unit symlink, and log files are then
        removed, the hidden job is deleted, and `systemd` is reloaded.

        Returns
        -------
        A `SuccessTuple`: a failure if any file could not be removed,
        otherwise the result of `systemctl --user daemon-reload`.
        """
        import meerschaum.config.paths as paths
        job = self.get_hidden_job(name, debug=debug)

        if not job.delete_after_completion:
            _ = self.stop_job(name, debug=debug)
            _ = self.run_command(
                ['disable', self.get_service_name(name, debug=debug)],
                debug=debug,
            )

        service_job_path = self.get_service_job_path(name, debug=debug)
        try:
            if service_job_path.exists():
                shutil.rmtree(service_job_path)
        except Exception as e:
            warn(e)
            return False, str(e)

        logs_root_path = paths.SYSTEMD_LOGS_RESOURCES_PATH
        logs_prefix = self.get_service_logs_path(name, debug=debug).name + '.'
        logs_paths = [
            (logs_root_path / file_name)
            for file_name in os.listdir(logs_root_path)
            if file_name.startswith(logs_prefix)
        ]
        file_paths = [
            self.get_service_file_path(name, debug=debug),
            self.get_service_symlink_file_path(name, debug=debug),
            self.get_socket_path(name, debug=debug),
            self.get_result_path(name, debug=debug),
        ] + logs_paths

        for path in file_paths:
            ### `exists()` follows symlinks, so it is False for the unit symlink whose
            ### target was just removed with the job directory. Check `is_symlink()`
            ### as well so the dangling link is cleaned up rather than left behind.
            if not (path.exists() or path.is_symlink()):
                continue

            try:
                path.unlink()
            except Exception as e:
                warn(e)
                return False, str(e)

        _ = job.delete()

        return self.run_command(['daemon-reload'], debug=debug)

    def get_logs(self, name: str, debug: bool = False) -> str:
        """
        Return a job's logs, read from its rotating log file.
        """
        rotating_file = self.get_job_rotating_file(name, debug=debug)
        return rotating_file.read()

    def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
        """
        Return a job's stop time.
        """
        job = self.get_hidden_job(name, debug=debug)
        return job.stop_time

    def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job is blocking on stdin.

        This is signalled by the presence of a `.block` file next to the FIFO.
        """
        socket_path = self.get_socket_path(name, debug=debug)
        blocking_path = socket_path.parent / (socket_path.name + '.block')
        return blocking_path.exists()

    def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return the kwargs to the blocking prompt.
        """
        job = self.get_hidden_job(name, debug=debug)
        return job.get_prompt_kwargs(debug=debug)

    def get_job_rotating_file(self, name: str, debug: bool = False):
        """
        Return a `RotatingFile` for the job's log output.
        """
        from meerschaum.utils.daemon import RotatingFile
        service_logs_path = self.get_service_logs_path(name, debug=debug)
        return RotatingFile(service_logs_path)

    async def monitor_logs_async(
        self,
        name: str,
        *args,
        debug: bool = False,
        **kwargs
    ):
        """
        Monitor a job's output.

        Positional and keyword arguments are forwarded to the hidden job's
        `monitor_logs_async()`, with the log and stdin locations overridden
        to point at this executor's files.
        """
        import meerschaum.config.paths as paths
        job = self.get_hidden_job(name, debug=debug)
        kwargs.update({
            '_logs_path': paths.SYSTEMD_LOGS_RESOURCES_PATH,
            '_log': self.get_job_rotating_file(name, debug=debug),
            '_stdin_file': self.get_job_stdin_file(name, debug=debug),
            'debug': debug,
        })
        await job.monitor_logs_async(*args, **kwargs)

    def monitor_logs(self, *args, **kwargs):
        """
        Monitor a job's output (blocking wrapper around `monitor_logs_async()`).
        """
        asyncio.run(self.monitor_logs_async(*args, **kwargs))
30@make_executor 31class SystemdExecutor(Executor): 32 """ 33 Execute Meerschaum jobs via `systemd`. 34 """ 35 36 def get_job_names(self, debug: bool = False) -> List[str]: 37 """ 38 Return a list of existing jobs, including hidden ones. 39 """ 40 import meerschaum.config.paths as paths 41 return [ 42 service_name[len('mrsm-'):(-1 * len('.service'))] 43 for service_name in os.listdir(paths.SYSTEMD_USER_RESOURCES_PATH) 44 if ( 45 service_name.startswith('mrsm-') 46 and service_name.endswith('.service') 47 ### Check for broken symlinks. 48 and (paths.SYSTEMD_USER_RESOURCES_PATH / service_name).exists() 49 ) 50 ] 51 52 def get_job_exists(self, name: str, debug: bool = False) -> bool: 53 """ 54 Return whether a job exists. 55 """ 56 user_services = self.get_job_names(debug=debug) 57 if debug: 58 dprint(f'Existing services: {user_services}') 59 return name in user_services 60 61 def get_jobs(self, debug: bool = False) -> Dict[str, Job]: 62 """ 63 Return a dictionary of `systemd` Jobs (including hidden jobs). 64 """ 65 user_services = self.get_job_names(debug=debug) 66 jobs = { 67 name: Job(name, executor_keys=str(self)) 68 for name in user_services 69 } 70 return { 71 name: job 72 for name, job in jobs.items() 73 } 74 75 def get_service_name(self, name: str, debug: bool = False) -> str: 76 """ 77 Return a job's service name. 78 """ 79 return f"mrsm-{name.replace(' ', '-')}.service" 80 81 def get_service_job_path(self, name: str, debug: bool = False) -> pathlib.Path: 82 """ 83 Return the path for the job's files under the root directory. 84 """ 85 import meerschaum.config.paths as paths 86 return paths.SYSTEMD_JOBS_RESOURCES_PATH / name 87 88 def get_service_symlink_file_path(self, name: str, debug: bool = False) -> pathlib.Path: 89 """ 90 Return the path to where to create the service symlink. 
91 """ 92 import meerschaum.config.paths as paths 93 return paths.SYSTEMD_USER_RESOURCES_PATH / self.get_service_name(name, debug=debug) 94 95 def get_service_file_path(self, name: str, debug: bool = False) -> pathlib.Path: 96 """ 97 Return the path to a Job's service file. 98 """ 99 return ( 100 self.get_service_job_path(name, debug=debug) 101 / self.get_service_name(name, debug=debug) 102 ) 103 104 def get_service_logs_path(self, name: str, debug: bool = False) -> pathlib.Path: 105 """ 106 Return the path to direct service logs to. 107 """ 108 import meerschaum.config.paths as paths 109 return ( 110 paths.SYSTEMD_LOGS_RESOURCES_PATH 111 / (self.get_service_name(name, debug=debug) + '.log') 112 ) 113 114 def get_socket_path(self, name: str, debug: bool = False) -> pathlib.Path: 115 """ 116 Return the path to the FIFO file. 117 """ 118 return ( 119 self.get_service_job_path(name, debug=debug) 120 / (self.get_service_name(name, debug=debug) + '.stdin') 121 ) 122 123 def get_result_path(self, name: str, debug: bool = False) -> pathlib.Path: 124 """ 125 Return the path to the result file. 126 """ 127 return ( 128 self.get_service_job_path(name, debug=debug) 129 / (self.get_service_name(name, debug=debug) + '.result.json') 130 ) 131 132 def get_service_file_text(self, name: str, sysargs: List[str], debug: bool = False) -> str: 133 """ 134 Return the contents of the unit file. 
135 """ 136 service_logs_path = self.get_service_logs_path(name, debug=debug) 137 socket_path = self.get_socket_path(name, debug=debug) 138 result_path = self.get_result_path(name, debug=debug) 139 job = self.get_hidden_job(name, debug=debug) 140 141 sysargs_str = shlex.join(sysargs) 142 exec_str = f'{sys.executable} -m meerschaum {sysargs_str}' 143 mrsm_env_var_names = set([var for var in STATIC_CONFIG['environment'].values()]) 144 mrsm_env_vars = { 145 key: val 146 for key, val in os.environ.items() 147 if key in mrsm_env_var_names 148 } 149 150 ### Add new environment variables for the service process. 151 mrsm_env_vars.update({ 152 STATIC_CONFIG['environment']['daemon_id']: name, 153 STATIC_CONFIG['environment']['systemd_log_path']: service_logs_path.as_posix(), 154 STATIC_CONFIG['environment']['systemd_result_path']: result_path.as_posix(), 155 STATIC_CONFIG['environment']['systemd_stdin_path']: socket_path.as_posix(), 156 STATIC_CONFIG['environment']['systemd_delete_job']: ( 157 '1' 158 if job.delete_after_completion 159 else '0' 160 ), 161 }) 162 163 ### Allow for user-defined environment variables. 164 mrsm_env_vars.update(job.env) 165 166 environment_lines = [ 167 f"Environment={key}={shlex.quote(str(val))}" 168 for key, val in mrsm_env_vars.items() 169 ] 170 environment_str = '\n'.join(environment_lines) 171 service_name = self.get_service_name(name, debug=debug) 172 173 service_text = ( 174 "[Unit]\n" 175 f"Description=Run the job '{name}'\n" 176 "\n" 177 "[Service]\n" 178 f"ExecStart={exec_str}\n" 179 "KillSignal=SIGTERM\n" 180 "Restart=always\n" 181 "RestartPreventExitStatus=0\n" 182 f"SyslogIdentifier={service_name}\n" 183 f"{environment_str}\n" 184 "\n" 185 "[Install]\n" 186 "WantedBy=default.target\n" 187 ) 188 return service_text 189 190 def get_socket_file_text(self, name: str, debug: bool = False) -> str: 191 """ 192 Return the contents of the socket file. 
193 """ 194 service_name = self.get_service_name(name, debug=debug) 195 socket_path = self.get_socket_path(name, debug=debug) 196 socket_text = ( 197 "[Unit]\n" 198 f"BindsTo={service_name}\n" 199 "\n" 200 "[Socket]\n" 201 f"ListenFIFO={socket_path.as_posix()}\n" 202 "FileDescriptorName=stdin\n" 203 "RemoveOnStop=true\n" 204 "SocketMode=0660\n" 205 ) 206 return socket_text 207 208 def get_hidden_job( 209 self, 210 name: str, 211 sysargs: Optional[List[str]] = None, 212 properties: Optional[Dict[str, Any]] = None, 213 debug: bool = False, 214 ): 215 """ 216 Return the hidden "sister" job to store a job's parameters. 217 """ 218 job = Job( 219 name, 220 sysargs, 221 executor_keys='local', 222 _properties=properties, 223 _rotating_log=self.get_job_rotating_file(name, debug=debug), 224 _stdin_file=self.get_job_stdin_file(name, debug=debug), 225 _status_hook=partial(self.get_job_status, name), 226 _result_hook=partial(self.get_job_result, name), 227 _externally_managed=True, 228 ) 229 return job 230 231 232 def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]: 233 """ 234 Return metadata about a job. 
235 """ 236 now = time.perf_counter() 237 238 if '_jobs_metadata' not in self.__dict__: 239 self._jobs_metadata: Dict[str, Any] = {} 240 241 if name in self._jobs_metadata: 242 ts = self._jobs_metadata[name].get('timestamp', None) 243 244 if ts is not None and (now - ts) <= JOB_METADATA_CACHE_SECONDS: 245 if debug: 246 dprint(f"Retuning cached metadata for job '{name}'.") 247 return self._jobs_metadata[name]['metadata'] 248 249 metadata = { 250 'sysargs': self.get_job_sysargs(name, debug=debug), 251 'result': self.get_job_result(name, debug=debug), 252 'restart': self.get_job_restart(name, debug=debug), 253 'daemon': { 254 'status': self.get_job_status(name, debug=debug), 255 'pid': self.get_job_pid(name, debug=debug), 256 'properties': self.get_job_properties(name, debug=debug), 257 }, 258 } 259 self._jobs_metadata[name] = { 260 'timestamp': now, 261 'metadata': metadata, 262 } 263 return metadata 264 265 def get_job_restart(self, name: str, debug: bool = False) -> bool: 266 """ 267 Return whether a job restarts. 268 """ 269 from meerschaum.jobs._Job import RESTART_FLAGS 270 sysargs = self.get_job_sysargs(name, debug=debug) 271 if not sysargs: 272 return False 273 274 for flag in RESTART_FLAGS: 275 if flag in sysargs: 276 return True 277 278 return False 279 280 def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]: 281 """ 282 Return the properties for a job. 283 """ 284 job = self.get_hidden_job(name, debug=debug) 285 return { 286 k: v for k, v in job.daemon.properties.items() 287 if k != 'externally_managed' 288 } 289 290 def get_job_process(self, name: str, debug: bool = False): 291 """ 292 Return a `psutil.Process` for the job's PID. 
293 """ 294 pid = self.get_job_pid(name, debug=debug) 295 if pid is None: 296 return None 297 298 psutil = mrsm.attempt_import('psutil') 299 try: 300 return psutil.Process(pid) 301 except Exception: 302 return None 303 304 def get_job_status(self, name: str, debug: bool = False) -> str: 305 """ 306 Return the job's service status. 307 """ 308 output = self.run_command( 309 ['is-active', self.get_service_name(name, debug=debug)], 310 as_output=True, 311 debug=debug, 312 ) 313 314 if output == 'activating': 315 return 'running' 316 317 if output == 'active': 318 process = self.get_job_process(name, debug=debug) 319 if process is None: 320 return 'stopped' 321 322 try: 323 if process.status() == 'stopped': 324 return 'paused' 325 except Exception: 326 return 'stopped' 327 328 return 'running' 329 330 return 'stopped' 331 332 def get_job_pid(self, name: str, debug: bool = False) -> Union[int, None]: 333 """ 334 Return the job's service PID. 335 """ 336 from meerschaum.utils.misc import is_int 337 338 output = self.run_command( 339 [ 340 'show', 341 self.get_service_name(name, debug=debug), 342 '--property=MainPID', 343 ], 344 as_output=True, 345 debug=debug, 346 ) 347 if not output.startswith('MainPID='): 348 return None 349 350 pid_str = output[len('MainPID='):] 351 if pid_str == '0': 352 return None 353 354 if is_int(pid_str): 355 return int(pid_str) 356 357 return None 358 359 def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]: 360 """ 361 Return when a job began running. 
362 """ 363 output = self.run_command( 364 [ 365 'show', 366 self.get_service_name(name, debug=debug), 367 '--property=ActiveEnterTimestamp' 368 ], 369 as_output=True, 370 debug=debug, 371 ) 372 if not output.startswith('ActiveEnterTimestamp'): 373 return None 374 375 dt_str = output.split('=')[-1] 376 if not dt_str: 377 return None 378 379 dateutil_parser = mrsm.attempt_import('dateutil.parser') 380 try: 381 dt = dateutil_parser.parse(dt_str) 382 except Exception as e: 383 warn(f"Cannot parse '{output}' as a datetime:\n{e}") 384 return None 385 386 return dt.astimezone(timezone.utc).isoformat() 387 388 def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]: 389 """ 390 Return when a job began running. 391 """ 392 output = self.run_command( 393 [ 394 'show', 395 self.get_service_name(name, debug=debug), 396 '--property=InactiveEnterTimestamp' 397 ], 398 as_output=True, 399 debug=debug, 400 ) 401 if not output.startswith('InactiveEnterTimestamp'): 402 return None 403 404 dt_str = output.split('=')[-1] 405 if not dt_str: 406 return None 407 408 dateutil_parser = mrsm.attempt_import('dateutil.parser') 409 410 try: 411 dt = dateutil_parser.parse(dt_str) 412 except Exception as e: 413 warn(f"Cannot parse '{output}' as a datetime:\n{e}") 414 return None 415 return dt.astimezone(timezone.utc).isoformat() 416 417 def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]: 418 """ 419 Return when a job was paused. 420 """ 421 job = self.get_hidden_job(name, debug=debug) 422 if self.get_job_status(name, debug=debug) != 'paused': 423 return None 424 425 stop_time = job.stop_time 426 if stop_time is None: 427 return None 428 429 return stop_time.isoformat() 430 431 def get_job_result(self, name: str, debug: bool = False) -> SuccessTuple: 432 """ 433 Return the job's result SuccessTuple. 434 """ 435 result_path = self.get_result_path(name, debug=debug) 436 if not result_path.exists(): 437 return False, "No result available." 
438 439 try: 440 with open(result_path, 'r', encoding='utf-8') as f: 441 result = json.load(f) 442 except Exception: 443 return False, f"Could not read result for Job '{name}'." 444 445 return tuple(result) 446 447 def get_job_sysargs(self, name: str, debug: bool = False) -> Union[List[str], None]: 448 """ 449 Return the sysargs from the service file. 450 """ 451 service_file_path = self.get_service_file_path(name, debug=debug) 452 if not service_file_path.exists(): 453 return [] 454 455 with open(service_file_path, 'r', encoding='utf-8') as f: 456 service_lines = f.readlines() 457 458 for line in service_lines: 459 if line.startswith('ExecStart='): 460 sysargs_str = line.split(' -m meerschaum ')[-1].split('<')[0] 461 return shlex.split(sysargs_str) 462 463 return [] 464 465 def run_command( 466 self, 467 command_args: List[str], 468 as_output: bool = False, 469 debug: bool = False, 470 ) -> Union[SuccessTuple, str]: 471 """ 472 Run a `systemd` command and return success. 473 474 Parameters 475 ---------- 476 command_args: List[str] 477 The command to pass to `systemctl --user`. 478 479 as_output: bool, default False 480 If `True`, return the process stdout output. 481 Defaults to a `SuccessTuple`. 482 483 Returns 484 ------- 485 A `SuccessTuple` indicating success or a str for the process output. 486 """ 487 from meerschaum.utils.process import run_process 488 489 if command_args[:2] != ['systemctl', '--user']: 490 command_args = ['systemctl', '--user'] + command_args 491 492 if debug: 493 dprint(shlex.join(command_args)) 494 495 proc = run_process( 496 command_args, 497 foreground=False, 498 as_proc=True, 499 capture_output=True, 500 text=True, 501 ) 502 stdout, stderr = proc.communicate() 503 if debug: 504 dprint(f"{stdout}") 505 506 if as_output: 507 return stdout.strip() 508 509 command_success = proc.wait() == 0 510 command_msg = ( 511 "Success" 512 if command_success 513 else f"Failed to execute command `{shlex.join(command_args)}`." 
514 ) 515 return command_success, command_msg 516 517 def get_job_stdin_file(self, name: str, debug: bool = False): 518 """ 519 Return a `StdinFile` for the job. 520 """ 521 from meerschaum.utils.daemon import StdinFile 522 if '_stdin_files' not in self.__dict__: 523 self._stdin_files: Dict[str, StdinFile] = {} 524 525 if name not in self._stdin_files: 526 socket_path = self.get_socket_path(name, debug=debug) 527 socket_path.parent.mkdir(parents=True, exist_ok=True) 528 self._stdin_files[name] = StdinFile(socket_path) 529 530 return self._stdin_files[name] 531 532 def create_job( 533 self, 534 name: str, 535 sysargs: List[str], 536 properties: Optional[Dict[str, Any]] = None, 537 debug: bool = False, 538 ) -> SuccessTuple: 539 """ 540 Create a job as a service to be run by `systemd`. 541 """ 542 from meerschaum.utils.misc import make_symlink 543 service_name = self.get_service_name(name, debug=debug) 544 service_file_path = self.get_service_file_path(name, debug=debug) 545 service_symlink_file_path = self.get_service_symlink_file_path(name, debug=debug) 546 socket_stdin = self.get_job_stdin_file(name, debug=debug) 547 _ = socket_stdin.file_handler 548 549 ### Init the externally_managed file. 550 ### NOTE: We must write the pickle file in addition to the properties file. 
551 job = self.get_hidden_job(name, sysargs=sysargs, properties=properties, debug=debug) 552 job._set_externally_managed() 553 pickle_success, pickle_msg = job.daemon.write_pickle() 554 if not pickle_success: 555 return pickle_success, pickle_msg 556 properties_success, properties_msg = job.daemon.write_properties() 557 if not properties_success: 558 return properties_success, properties_msg 559 560 service_file_path.parent.mkdir(parents=True, exist_ok=True) 561 562 with open(service_file_path, 'w+', encoding='utf-8') as f: 563 f.write(self.get_service_file_text(name, sysargs, debug=debug)) 564 565 symlink_success, symlink_msg = make_symlink(service_file_path, service_symlink_file_path) 566 if not symlink_success: 567 return symlink_success, symlink_msg 568 569 commands = [ 570 ['daemon-reload'], 571 ['enable', service_name], 572 ['start', service_name], 573 ] 574 575 fails = 0 576 for command_list in commands: 577 command_success, command_msg = self.run_command(command_list, debug=debug) 578 if not command_success: 579 fails += 1 580 581 if fails > 1: 582 return False, "Failed to reload systemd." 583 584 return True, f"Started job '{name}' via systemd." 585 586 def start_job(self, name: str, debug: bool = False) -> SuccessTuple: 587 """ 588 Stop a job's service. 589 """ 590 job = self.get_hidden_job(name, debug=debug) 591 job.daemon._remove_stop_file() 592 593 status = self.get_job_status(name, debug=debug) 594 if status == 'paused': 595 return self.run_command( 596 ['kill', '-s', 'SIGCONT', self.get_service_name(name, debug=debug)], 597 debug=debug, 598 ) 599 600 return self.run_command( 601 ['start', self.get_service_name(name, debug=debug)], 602 debug=debug, 603 ) 604 605 def stop_job(self, name: str, debug: bool = False) -> SuccessTuple: 606 """ 607 Stop a job's service. 
608 """ 609 job = self.get_hidden_job(name, debug=debug) 610 job.daemon._write_stop_file('quit') 611 sigint_success, sigint_msg = self.run_command( 612 ['kill', '-s', 'SIGINT', self.get_service_name(name, debug=debug)], 613 debug=debug, 614 ) 615 616 check_timeout_interval = get_config('jobs', 'check_timeout_interval_seconds') 617 loop_start = time.perf_counter() 618 timeout_seconds = get_config('jobs', 'timeout_seconds') 619 while (time.perf_counter() - loop_start) < timeout_seconds: 620 if self.get_job_status(name, debug=debug) == 'stopped': 621 return True, 'Success' 622 623 time.sleep(check_timeout_interval) 624 625 return self.run_command( 626 ['stop', self.get_service_name(name, debug=debug)], 627 debug=debug, 628 ) 629 630 def pause_job(self, name: str, debug: bool = False) -> SuccessTuple: 631 """ 632 Pause a job's service. 633 """ 634 job = self.get_hidden_job(name, debug=debug) 635 job.daemon._write_stop_file('pause') 636 return self.run_command( 637 ['kill', '-s', 'SIGSTOP', self.get_service_name(name, debug=debug)], 638 debug=debug, 639 ) 640 641 def delete_job(self, name: str, debug: bool = False) -> SuccessTuple: 642 """ 643 Delete a job's service. 
644 """ 645 import meerschaum.config.paths as paths 646 job = self.get_hidden_job(name, debug=debug) 647 648 if not job.delete_after_completion: 649 _ = self.stop_job(name, debug=debug) 650 _ = self.run_command( 651 ['disable', self.get_service_name(name, debug=debug)], 652 debug=debug, 653 ) 654 655 service_job_path = self.get_service_job_path(name, debug=debug) 656 try: 657 if service_job_path.exists(): 658 shutil.rmtree(service_job_path) 659 except Exception as e: 660 warn(e) 661 return False, str(e) 662 663 service_logs_path = self.get_service_logs_path(name, debug=debug) 664 logs_paths = [ 665 (paths.SYSTEMD_LOGS_RESOURCES_PATH / name) 666 for name in os.listdir(paths.SYSTEMD_LOGS_RESOURCES_PATH) 667 if name.startswith(service_logs_path.name + '.') 668 ] 669 file_paths = [ 670 self.get_service_file_path(name, debug=debug), 671 self.get_service_symlink_file_path(name, debug=debug), 672 self.get_socket_path(name, debug=debug), 673 self.get_result_path(name, debug=debug), 674 ] + logs_paths 675 676 for path in file_paths: 677 if path.exists(): 678 try: 679 path.unlink() 680 except Exception as e: 681 warn(e) 682 return False, str(e) 683 684 _ = job.delete() 685 686 return self.run_command(['daemon-reload'], debug=debug) 687 688 def get_logs(self, name: str, debug: bool = False) -> str: 689 """ 690 Return a job's journal logs. 691 """ 692 rotating_file = self.get_job_rotating_file(name, debug=debug) 693 return rotating_file.read() 694 695 def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]: 696 """ 697 Return a job's stop time. 698 """ 699 job = self.get_hidden_job(name, debug=debug) 700 return job.stop_time 701 702 def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool: 703 """ 704 Return whether a job is blocking on stdin. 
705 """ 706 socket_path = self.get_socket_path(name, debug=debug) 707 blocking_path = socket_path.parent / (socket_path.name + '.block') 708 return blocking_path.exists() 709 710 def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]: 711 """ 712 Return the kwargs to the blocking prompt. 713 """ 714 job = self.get_hidden_job(name, debug=debug) 715 return job.get_prompt_kwargs(debug=debug) 716 717 def get_job_rotating_file(self, name: str, debug: bool = False): 718 """ 719 Return a `RotatingFile` for the job's log output. 720 """ 721 from meerschaum.utils.daemon import RotatingFile 722 service_logs_path = self.get_service_logs_path(name, debug=debug) 723 return RotatingFile(service_logs_path) 724 725 async def monitor_logs_async( 726 self, 727 name: str, 728 *args, 729 debug: bool = False, 730 **kwargs 731 ): 732 """ 733 Monitor a job's output. 734 """ 735 import meerschaum.config.paths as paths 736 job = self.get_hidden_job(name, debug=debug) 737 kwargs.update({ 738 '_logs_path': paths.SYSTEMD_LOGS_RESOURCES_PATH, 739 '_log': self.get_job_rotating_file(name, debug=debug), 740 '_stdin_file': self.get_job_stdin_file(name, debug=debug), 741 'debug': debug, 742 }) 743 await job.monitor_logs_async(*args, **kwargs) 744 745 def monitor_logs(self, *args, **kwargs): 746 """ 747 Monitor a job's output. 748 """ 749 asyncio.run(self.monitor_logs_async(*args, **kwargs))
Execute Meerschaum jobs via systemd.
def get_job_names(self, debug: bool = False) -> List[str]:
    """
    List the names of the jobs which have a service file, including hidden jobs.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle (not used by this method).

    Returns
    -------
    The job names parsed from the `mrsm-<name>.service` entries
    of the `SYSTEMD_USER_RESOURCES_PATH` directory.
    """
    import meerschaum.config.paths as paths

    prefix, suffix = 'mrsm-', '.service'
    job_names = []
    for file_name in os.listdir(paths.SYSTEMD_USER_RESOURCES_PATH):
        if not file_name.startswith(prefix):
            continue
        if not file_name.endswith(suffix):
            continue
        ### Broken symlinks do not "exist" and are skipped.
        if not (paths.SYSTEMD_USER_RESOURCES_PATH / file_name).exists():
            continue
        job_names.append(file_name[len(prefix):(-1 * len(suffix))])
    return job_names
Return a list of existing jobs, including hidden ones.
def get_job_exists(self, name: str, debug: bool = False) -> bool:
    """
    Check whether a job with the given name has a service.

    Parameters
    ----------
    name: str
        The name of the job to look for.

    debug: bool, default False
        If `True`, print the names of the existing services.

    Returns
    -------
    `True` if `name` is among the names returned by `get_job_names()`.
    """
    existing_names = self.get_job_names(debug=debug)
    if debug:
        dprint(f'Existing services: {existing_names}')
    job_exists = name in existing_names
    return job_exists
Return whether a job exists.
def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
    """
    Build a `Job` for every existing `systemd` service (hidden jobs included).

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle, forwarded to `get_job_names()`.

    Returns
    -------
    A dictionary mapping job names to `Job` objects bound to this executor.
    """
    jobs = {}
    for job_name in self.get_job_names(debug=debug):
        jobs[job_name] = Job(job_name, executor_keys=str(self))
    return jobs
Return a dictionary of systemd Jobs (including hidden jobs).
def get_service_name(self, name: str, debug: bool = False) -> str:
    """
    Build the `systemd` unit name for a job.

    Parameters
    ----------
    name: str
        The job's name. Spaces are replaced with dashes.

    debug: bool, default False
        Verbosity toggle (not used by this method).

    Returns
    -------
    The unit name in the form `mrsm-<name>.service`.
    """
    dashed_name = name.replace(' ', '-')
    return 'mrsm-' + dashed_name + '.service'
Return a job's service name.
def get_service_job_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Locate the directory which holds a job's files.

    Parameters
    ----------
    name: str
        The job's name, used as the directory name.

    debug: bool, default False
        Verbosity toggle (not used by this method).

    Returns
    -------
    The path `SYSTEMD_JOBS_RESOURCES_PATH / name`.
    """
    import meerschaum.config.paths as paths

    jobs_root_path = paths.SYSTEMD_JOBS_RESOURCES_PATH
    return jobs_root_path / name
Return the path for the job's files under the root directory.
def get_service_symlink_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Locate where the symlink to a job's service file is created.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to `get_service_name()`.

    Returns
    -------
    The path `SYSTEMD_USER_RESOURCES_PATH / <service name>`.
    """
    import meerschaum.config.paths as paths

    user_resources_path = paths.SYSTEMD_USER_RESOURCES_PATH
    service_name = self.get_service_name(name, debug=debug)
    return user_resources_path / service_name
Return the path to where to create the service symlink.
def get_service_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Locate a job's service (unit) file.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to the path helpers.

    Returns
    -------
    The service file's path: the service name inside the job's directory.
    """
    job_dir_path = self.get_service_job_path(name, debug=debug)
    service_name = self.get_service_name(name, debug=debug)
    return job_dir_path / service_name
Return the path to a Job's service file.
def get_service_logs_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Locate the file to which a job's service logs are directed.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to `get_service_name()`.

    Returns
    -------
    The path `SYSTEMD_LOGS_RESOURCES_PATH / '<service name>.log'`.
    """
    import meerschaum.config.paths as paths

    logs_root_path = paths.SYSTEMD_LOGS_RESOURCES_PATH
    log_file_name = self.get_service_name(name, debug=debug) + '.log'
    return logs_root_path / log_file_name
Return the path to direct service logs to.
def get_socket_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Locate the FIFO file used for a job's stdin.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to the path helpers.

    Returns
    -------
    The path `<job directory> / '<service name>.stdin'`.
    """
    job_dir_path = self.get_service_job_path(name, debug=debug)
    fifo_file_name = self.get_service_name(name, debug=debug) + '.stdin'
    return job_dir_path / fifo_file_name
Return the path to the FIFO file.
def get_result_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Locate the JSON file which stores a job's result.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to the path helpers.

    Returns
    -------
    The path `<job directory> / '<service name>.result.json'`.
    """
    job_dir_path = self.get_service_job_path(name, debug=debug)
    result_file_name = self.get_service_name(name, debug=debug) + '.result.json'
    return job_dir_path / result_file_name
Return the path to the result file.
def get_service_file_text(self, name: str, sysargs: List[str], debug: bool = False) -> str:
    """
    Render the contents of a job's `systemd` unit file.

    Parameters
    ----------
    name: str
        The job's name.

    sysargs: List[str]
        The arguments to pass to `python -m meerschaum`.

    debug: bool, default False
        Verbosity toggle, forwarded to the helper methods.

    Returns
    -------
    The text of the unit file (`[Unit]`, `[Service]`, and `[Install]` sections).
    """
    logs_path = self.get_service_logs_path(name, debug=debug)
    stdin_path = self.get_socket_path(name, debug=debug)
    result_path = self.get_result_path(name, debug=debug)
    job = self.get_hidden_job(name, debug=debug)

    exec_str = f'{sys.executable} -m meerschaum {shlex.join(sysargs)}'

    ### Carry over the Meerschaum environment variables which are currently set.
    env_names = STATIC_CONFIG['environment']
    known_env_var_names = set(env_names.values())
    env_vars = {}
    for key, val in os.environ.items():
        if key in known_env_var_names:
            env_vars[key] = val

    ### Add new environment variables for the service process.
    env_vars[env_names['daemon_id']] = name
    env_vars[env_names['systemd_log_path']] = logs_path.as_posix()
    env_vars[env_names['systemd_result_path']] = result_path.as_posix()
    env_vars[env_names['systemd_stdin_path']] = stdin_path.as_posix()
    env_vars[env_names['systemd_delete_job']] = '1' if job.delete_after_completion else '0'

    ### User-defined environment variables take precedence.
    env_vars.update(job.env)

    environment_str = '\n'.join(
        f"Environment={key}={shlex.quote(str(val))}"
        for key, val in env_vars.items()
    )
    service_name = self.get_service_name(name, debug=debug)

    unit_lines = [
        "[Unit]",
        f"Description=Run the job '{name}'",
        "",
        "[Service]",
        f"ExecStart={exec_str}",
        "KillSignal=SIGTERM",
        "Restart=always",
        "RestartPreventExitStatus=0",
        f"SyslogIdentifier={service_name}",
        f"{environment_str}",
        "",
        "[Install]",
        "WantedBy=default.target",
    ]
    return '\n'.join(unit_lines) + '\n'
Return the contents of the unit file.
def get_socket_file_text(self, name: str, debug: bool = False) -> str:
    """
    Render the contents of a job's `systemd` socket file.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to the helper methods.

    Returns
    -------
    The text of the socket unit, which binds to the job's service
    and listens on the job's FIFO path.
    """
    service_name = self.get_service_name(name, debug=debug)
    fifo_path = self.get_socket_path(name, debug=debug)
    socket_lines = [
        "[Unit]",
        f"BindsTo={service_name}",
        "",
        "[Socket]",
        f"ListenFIFO={fifo_path.as_posix()}",
        "FileDescriptorName=stdin",
        "RemoveOnStop=true",
        "SocketMode=0660",
    ]
    return '\n'.join(socket_lines) + '\n'
Return the contents of the socket file.
def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
    """
    Return metadata about a job, reusing a recently built copy when available.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        If `True`, announce when cached metadata are returned.

    Returns
    -------
    A dictionary with the keys `sysargs`, `result`, `restart`, and `daemon`
    (the latter holding `status`, `pid`, and `properties`).
    """
    now = time.perf_counter()

    ### The cache is created lazily on the instance.
    if '_jobs_metadata' not in self.__dict__:
        self._jobs_metadata: Dict[str, Any] = {}

    cached = self._jobs_metadata.get(name)
    if cached is not None:
        ts = cached.get('timestamp', None)
        ### Serve the cached copy while it is younger than the cache window.
        if ts is not None and (now - ts) <= JOB_METADATA_CACHE_SECONDS:
            if debug:
                dprint(f"Returning cached metadata for job '{name}'.")
            return cached['metadata']

    metadata = {
        'sysargs': self.get_job_sysargs(name, debug=debug),
        'result': self.get_job_result(name, debug=debug),
        'restart': self.get_job_restart(name, debug=debug),
        'daemon': {
            'status': self.get_job_status(name, debug=debug),
            'pid': self.get_job_pid(name, debug=debug),
            'properties': self.get_job_properties(name, debug=debug),
        },
    }
    self._jobs_metadata[name] = {
        'timestamp': now,
        'metadata': metadata,
    }
    return metadata
Return metadata about a job.
def get_job_restart(self, name: str, debug: bool = False) -> bool:
    """
    Determine whether a job was created with a restart flag.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to `get_job_sysargs()`.

    Returns
    -------
    `True` if any of `RESTART_FLAGS` is present in the job's sysargs.
    """
    from meerschaum.jobs._Job import RESTART_FLAGS

    sysargs = self.get_job_sysargs(name, debug=debug)
    if not sysargs:
        return False

    return any(flag in sysargs for flag in RESTART_FLAGS)
Return whether a job restarts.
def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
    """
    Return a job's daemon properties without the `externally_managed` key.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to `get_hidden_job()`.

    Returns
    -------
    A new dictionary of the daemon's properties, excluding `externally_managed`.
    """
    job = self.get_hidden_job(name, debug=debug)
    properties = {}
    for key, val in job.daemon.properties.items():
        if key == 'externally_managed':
            continue
        properties[key] = val
    return properties
Return the properties for a job.
def get_job_process(self, name: str, debug: bool = False):
    """
    Return a `psutil.Process` for the job's PID.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to `get_job_pid()`.

    Returns
    -------
    A `psutil.Process`, or `None` if the job has no PID
    or the process could not be built.
    """
    pid = self.get_job_pid(name, debug=debug)
    if pid is None:
        return None

    psutil = mrsm.attempt_import('psutil')
    try:
        process = psutil.Process(pid)
    except Exception:
        process = None
    return process
Return a psutil.Process for the job's PID.
def get_job_status(self, name: str, debug: bool = False) -> str:
    """
    Translate the service's `systemctl is-active` state into a job status.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to the helper methods.

    Returns
    -------
    One of `'running'`, `'paused'`, or `'stopped'`.
    """
    service_name = self.get_service_name(name, debug=debug)
    active_state = self.run_command(
        ['is-active', service_name],
        as_output=True,
        debug=debug,
    )

    if active_state == 'activating':
        return 'running'

    if active_state != 'active':
        return 'stopped'

    ### An active service without a usable process is treated as stopped.
    process = self.get_job_process(name, debug=debug)
    if process is None:
        return 'stopped'

    try:
        process_status = process.status()
    except Exception:
        return 'stopped'

    ### A process status of 'stopped' is reported as a paused job.
    return 'paused' if process_status == 'stopped' else 'running'
Return the job's service status.
def get_job_pid(self, name: str, debug: bool = False) -> Union[int, None]:
    """
    Read the service's `MainPID` property.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to the helper methods.

    Returns
    -------
    The PID as an integer, or `None` if the property is missing,
    is `0`, or is not an integer.
    """
    from meerschaum.utils.misc import is_int

    service_name = self.get_service_name(name, debug=debug)
    output = self.run_command(
        ['show', service_name, '--property=MainPID'],
        as_output=True,
        debug=debug,
    )

    prefix = 'MainPID='
    if not output.startswith(prefix):
        return None

    pid_str = output[len(prefix):]
    if pid_str == '0' or not is_int(pid_str):
        return None

    return int(pid_str)
Return the job's service PID.
def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return when a job began running (the service's `ActiveEnterTimestamp`).

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to the helper methods.

    Returns
    -------
    The timestamp as an ISO-formatted UTC string,
    or `None` if it is missing, empty, or cannot be parsed.
    """
    service_name = self.get_service_name(name, debug=debug)
    output = self.run_command(
        ['show', service_name, '--property=ActiveEnterTimestamp'],
        as_output=True,
        debug=debug,
    )
    if not output.startswith('ActiveEnterTimestamp'):
        return None

    ### Keep only the text after the final '='.
    dt_str = output.rpartition('=')[2]
    if not dt_str:
        return None

    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    try:
        began_dt = dateutil_parser.parse(dt_str)
    except Exception as e:
        warn(f"Cannot parse '{output}' as a datetime:\n{e}")
        return None

    return began_dt.astimezone(timezone.utc).isoformat()
Return when a job began running.
def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return when a job ended (the service's `InactiveEnterTimestamp`).

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to the helper methods.

    Returns
    -------
    The timestamp as an ISO-formatted UTC string,
    or `None` if it is missing, empty, or cannot be parsed.
    """
    service_name = self.get_service_name(name, debug=debug)
    output = self.run_command(
        ['show', service_name, '--property=InactiveEnterTimestamp'],
        as_output=True,
        debug=debug,
    )
    if not output.startswith('InactiveEnterTimestamp'):
        return None

    ### Keep only the text after the final '='.
    dt_str = output.rpartition('=')[2]
    if not dt_str:
        return None

    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    try:
        ended_dt = dateutil_parser.parse(dt_str)
    except Exception as e:
        warn(f"Cannot parse '{output}' as a datetime:\n{e}")
        return None

    return ended_dt.astimezone(timezone.utc).isoformat()
Return when a job stopped running.
def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return when a job was paused.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to the helper methods.

    Returns
    -------
    The job's stop time as an ISO-formatted string,
    or `None` if the job is not paused or has no stop time.
    """
    job = self.get_hidden_job(name, debug=debug)
    is_paused = self.get_job_status(name, debug=debug) == 'paused'
    if not is_paused:
        return None

    paused_at = job.stop_time
    return paused_at.isoformat() if paused_at is not None else None
Return when a job was paused.
def get_job_result(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Load the job's result `SuccessTuple` from its result file.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to `get_result_path()`.

    Returns
    -------
    The contents of the result file as a tuple,
    or a failure tuple if the file is missing or cannot be read.
    """
    result_path = self.get_result_path(name, debug=debug)
    if not result_path.exists():
        return False, "No result available."

    try:
        with open(result_path, 'r', encoding='utf-8') as result_file:
            loaded_result = json.load(result_file)
    except Exception:
        return False, f"Could not read result for Job '{name}'."

    return tuple(loaded_result)
Return the job's result SuccessTuple.
def get_job_sysargs(self, name: str, debug: bool = False) -> Union[List[str], None]:
    """
    Parse a job's sysargs from the `ExecStart=` line of its service file.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to `get_service_file_path()`.

    Returns
    -------
    The arguments which follow ` -m meerschaum ` (up to any `<`),
    or an empty list if the file or the `ExecStart=` line is missing.
    """
    service_file_path = self.get_service_file_path(name, debug=debug)
    if not service_file_path.exists():
        return []

    with open(service_file_path, 'r', encoding='utf-8') as service_file:
        service_lines = service_file.readlines()

    exec_lines = [line for line in service_lines if line.startswith('ExecStart=')]
    if not exec_lines:
        return []

    ### Only the first `ExecStart=` line is considered.
    after_module = exec_lines[0].split(' -m meerschaum ')[-1]
    sysargs_str = after_module.split('<')[0]
    return shlex.split(sysargs_str)
Return the sysargs from the service file.
def run_command(
    self,
    command_args: List[str],
    as_output: bool = False,
    debug: bool = False,
) -> Union[SuccessTuple, str]:
    """
    Run a `systemctl --user` command.

    Parameters
    ----------
    command_args: List[str]
        The command to pass to `systemctl --user`.
        The `systemctl --user` prefix is added if it is not already present.

    as_output: bool, default False
        If `True`, return the process stdout output.
        Defaults to a `SuccessTuple`.

    debug: bool, default False
        If `True`, print the command and its stdout.

    Returns
    -------
    A `SuccessTuple` indicating success or a str for the process output.
    """
    from meerschaum.utils.process import run_process

    systemctl_prefix = ['systemctl', '--user']
    if command_args[:2] == systemctl_prefix:
        full_args = command_args
    else:
        full_args = systemctl_prefix + command_args

    if debug:
        dprint(shlex.join(full_args))

    proc = run_process(
        full_args,
        foreground=False,
        as_proc=True,
        capture_output=True,
        text=True,
    )
    stdout, _ = proc.communicate()
    if debug:
        dprint(f"{stdout}")

    if as_output:
        return stdout.strip()

    if proc.wait() == 0:
        return True, "Success"

    return False, f"Failed to execute command `{shlex.join(full_args)}`."
Run a systemd command and return success.
Parameters
- command_args (List[str]): The command to pass to `systemctl --user`.
- as_output (bool, default False): If `True`, return the process stdout output. Defaults to a `SuccessTuple`.

Returns
- A `SuccessTuple` indicating success or a str for the process output.
def get_job_stdin_file(self, name: str, debug: bool = False):
    """
    Return the `StdinFile` for a job, creating and caching it on first use.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to `get_socket_path()`.

    Returns
    -------
    The `StdinFile` built from the job's socket path.
    """
    from meerschaum.utils.daemon import StdinFile

    ### The per-job cache is created lazily on the instance.
    if '_stdin_files' not in self.__dict__:
        self._stdin_files: Dict[str, StdinFile] = {}

    stdin_files = self._stdin_files
    if name not in stdin_files:
        fifo_path = self.get_socket_path(name, debug=debug)
        fifo_path.parent.mkdir(parents=True, exist_ok=True)
        stdin_files[name] = StdinFile(fifo_path)

    return stdin_files[name]
Return a StdinFile for the job.
def create_job(
    self,
    name: str,
    sysargs: List[str],
    properties: Optional[Dict[str, Any]] = None,
    debug: bool = False,
) -> SuccessTuple:
    """
    Create a job as a service to be run by `systemd`.

    Parameters
    ----------
    name: str
        The job's name.

    sysargs: List[str]
        The arguments to pass to `python -m meerschaum`.

    properties: Optional[Dict[str, Any]], default None
        Properties to pass to the job.

    debug: bool, default False
        Verbosity toggle, forwarded to the helper methods.

    Returns
    -------
    A `SuccessTuple` indicating whether the service was created and started.
    """
    from meerschaum.utils.misc import make_symlink

    service_name = self.get_service_name(name, debug=debug)
    unit_path = self.get_service_file_path(name, debug=debug)
    symlink_path = self.get_service_symlink_file_path(name, debug=debug)

    ### Accessing `file_handler` is kept from the original for its side effects.
    stdin_file = self.get_job_stdin_file(name, debug=debug)
    _ = stdin_file.file_handler

    ### Init the externally_managed file.
    ### NOTE: We must write the pickle file in addition to the properties file.
    job = self.get_hidden_job(name, sysargs=sysargs, properties=properties, debug=debug)
    job._set_externally_managed()
    for write_daemon_file in (job.daemon.write_pickle, job.daemon.write_properties):
        write_success, write_msg = write_daemon_file()
        if not write_success:
            return write_success, write_msg

    unit_path.parent.mkdir(parents=True, exist_ok=True)
    with open(unit_path, 'w+', encoding='utf-8') as unit_file:
        unit_file.write(self.get_service_file_text(name, sysargs, debug=debug))

    symlink_success, symlink_msg = make_symlink(unit_path, symlink_path)
    if not symlink_success:
        return symlink_success, symlink_msg

    systemctl_commands = (
        ['daemon-reload'],
        ['enable', service_name],
        ['start', service_name],
    )
    ### Every command is attempted, even after an earlier failure.
    fails = 0
    for systemctl_command in systemctl_commands:
        command_success, _ = self.run_command(systemctl_command, debug=debug)
        fails += 0 if command_success else 1

    ### NOTE(review): a single failed command is tolerated here; confirm this is intended.
    if fails > 1:
        return False, "Failed to reload systemd."

    return True, f"Started job '{name}' via systemd."
Create a job as a service to be run by systemd.
def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Start a job's service, or resume it if it is paused.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to the helper methods.

    Returns
    -------
    The `SuccessTuple` of the `systemctl` command which was run.
    """
    job = self.get_hidden_job(name, debug=debug)
    job.daemon._remove_stop_file()

    ### A paused job is resumed with SIGCONT rather than started again.
    is_paused = self.get_job_status(name, debug=debug) == 'paused'
    service_name = self.get_service_name(name, debug=debug)
    command_args = (
        ['kill', '-s', 'SIGCONT', service_name]
        if is_paused
        else ['start', service_name]
    )
    return self.run_command(command_args, debug=debug)
Start a job's service.
def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Stop a job's service.

    The service is first sent SIGINT; if the job is not reported as stopped
    within the configured timeout, `systemctl stop` is run.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to the helper methods.

    Returns
    -------
    A `SuccessTuple` indicating whether the job was stopped.
    """
    job = self.get_hidden_job(name, debug=debug)
    job.daemon._write_stop_file('quit')

    ### The outcome of the SIGINT is not inspected; the status is polled instead.
    _ = self.run_command(
        ['kill', '-s', 'SIGINT', self.get_service_name(name, debug=debug)],
        debug=debug,
    )

    poll_interval_seconds = get_config('jobs', 'check_timeout_interval_seconds')
    polling_began = time.perf_counter()
    timeout_seconds = get_config('jobs', 'timeout_seconds')

    while True:
        elapsed_seconds = time.perf_counter() - polling_began
        if elapsed_seconds >= timeout_seconds:
            break

        if self.get_job_status(name, debug=debug) == 'stopped':
            return True, 'Success'

        time.sleep(poll_interval_seconds)

    stop_args = ['stop', self.get_service_name(name, debug=debug)]
    return self.run_command(stop_args, debug=debug)
Stop a job's service.
def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Pause a job's service by sending it SIGSTOP.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to the helper methods.

    Returns
    -------
    The `SuccessTuple` of the `systemctl kill` command.
    """
    job = self.get_hidden_job(name, debug=debug)
    job.daemon._write_stop_file('pause')

    service_name = self.get_service_name(name, debug=debug)
    sigstop_args = ['kill', '-s', 'SIGSTOP', service_name]
    return self.run_command(sigstop_args, debug=debug)
Pause a job's service.
def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Delete a job's service and the files which belong to it.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to the helper methods.

    Returns
    -------
    A failure `SuccessTuple` if a file or directory could not be removed,
    otherwise the `SuccessTuple` of the final `daemon-reload`.
    """
    import meerschaum.config.paths as paths
    job = self.get_hidden_job(name, debug=debug)

    ### NOTE(review): self-deleting jobs skip the stop / disable step,
    ### presumably because they are already exiting; confirm against the caller.
    if not job.delete_after_completion:
        _ = self.stop_job(name, debug=debug)
        _ = self.run_command(
            ['disable', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

    service_job_path = self.get_service_job_path(name, debug=debug)
    try:
        if service_job_path.exists():
            shutil.rmtree(service_job_path)
    except Exception as e:
        warn(e)
        return False, str(e)

    service_logs_path = self.get_service_logs_path(name, debug=debug)
    logs_prefix = service_logs_path.name + '.'
    logs_paths = [
        (paths.SYSTEMD_LOGS_RESOURCES_PATH / file_name)
        for file_name in os.listdir(paths.SYSTEMD_LOGS_RESOURCES_PATH)
        if file_name.startswith(logs_prefix)
    ]
    file_paths = [
        self.get_service_file_path(name, debug=debug),
        self.get_service_symlink_file_path(name, debug=debug),
        self.get_socket_path(name, debug=debug),
        self.get_result_path(name, debug=debug),
    ] + logs_paths

    for path in file_paths:
        ### `exists()` follows symlinks, so the service symlink (whose target
        ### was removed with the job directory above) must be detected
        ### with `is_symlink()` or it would be left behind.
        if not (path.exists() or path.is_symlink()):
            continue

        try:
            path.unlink()
        except Exception as e:
            warn(e)
            return False, str(e)

    _ = job.delete()

    return self.run_command(['daemon-reload'], debug=debug)
Delete a job's service.
def get_logs(self, name: str, debug: bool = False) -> str:
    """
    Return a job's logs, as read from its rotating log file.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to `get_job_rotating_file()`.

    Returns
    -------
    The text returned by the rotating file's `read()`.
    """
    logs_text = self.get_job_rotating_file(name, debug=debug).read()
    return logs_text
Return a job's journal logs.
def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
    """
    Return a job's stop time.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to `get_hidden_job()`.

    Returns
    -------
    The `stop_time` attribute of the job.
    """
    return self.get_hidden_job(name, debug=debug).stop_time
Return a job's stop time.
def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
    """
    Return whether a job is blocking on stdin.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to `get_socket_path()`.

    Returns
    -------
    `True` if a `.block` file exists alongside the job's socket path.
    """
    socket_path = self.get_socket_path(name, debug=debug)
    block_file_name = socket_path.name + '.block'
    return (socket_path.parent / block_file_name).exists()
Return whether a job is blocking on stdin.
def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]:
    """
    Return the kwargs to the blocking prompt.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to the job.

    Returns
    -------
    The dictionary returned by the job's `get_prompt_kwargs()`.
    """
    hidden_job = self.get_hidden_job(name, debug=debug)
    prompt_kwargs = hidden_job.get_prompt_kwargs(debug=debug)
    return prompt_kwargs
Return the kwargs to the blocking prompt.
def get_job_rotating_file(self, name: str, debug: bool = False):
    """
    Return a `RotatingFile` for the job's log output.

    Parameters
    ----------
    name: str
        The job's name.

    debug: bool, default False
        Verbosity toggle, forwarded to `get_service_logs_path()`.

    Returns
    -------
    A `RotatingFile` built from the job's service logs path.
    """
    from meerschaum.utils.daemon import RotatingFile

    return RotatingFile(self.get_service_logs_path(name, debug=debug))
Return a RotatingFile for the job's log output.
async def monitor_logs_async(
    self,
    name: str,
    *args,
    debug: bool = False,
    **kwargs
):
    """
    Monitor a job's output.

    Parameters
    ----------
    name: str
        The job's name.

    *args:
        Positional arguments forwarded to the job's `monitor_logs_async()`.

    debug: bool, default False
        Verbosity toggle, forwarded to the job.

    **kwargs:
        Keyword arguments forwarded to the job's `monitor_logs_async()`.
        The keys `_logs_path`, `_log`, `_stdin_file`, and `debug` are overwritten.
    """
    import meerschaum.config.paths as paths

    job = self.get_hidden_job(name, debug=debug)

    ### Point the job at the logs and stdin files managed by this executor.
    kwargs['_logs_path'] = paths.SYSTEMD_LOGS_RESOURCES_PATH
    kwargs['_log'] = self.get_job_rotating_file(name, debug=debug)
    kwargs['_stdin_file'] = self.get_job_stdin_file(name, debug=debug)
    kwargs['debug'] = debug

    await job.monitor_logs_async(*args, **kwargs)
Monitor a job's output.