meerschaum.jobs.systemd
Manage `meerschaum.jobs.Job` via `systemd`.
#! /usr/bin/env python3
# vim:fenc=utf-8

"""
Manage `meerschaum.jobs.Job` via `systemd`.
"""

import os
import pathlib
import shlex
import sys
import asyncio
import json
import time
import shutil
from datetime import datetime, timezone
from functools import partial

import meerschaum as mrsm
from meerschaum.jobs import Job, Executor, make_executor
from meerschaum.utils.typing import Dict, Any, List, SuccessTuple, Union, Optional, Callable
from meerschaum.config import get_config
from meerschaum._internal.static import STATIC_CONFIG
from meerschaum.utils.warnings import warn, dprint

### Maximum age (in seconds) of a cached `get_job_metadata()` result before it is rebuilt.
JOB_METADATA_CACHE_SECONDS: int = STATIC_CONFIG['api']['jobs']['metadata_cache_seconds']


@make_executor
class SystemdExecutor(Executor):
    """
    Execute Meerschaum jobs via `systemd`.

    Each job is written as a `mrsm-<name>.service` unit file which is symlinked
    into the user resources directory and driven through `systemctl --user`.
    """

    def get_job_names(self, debug: bool = False) -> List[str]:
        """
        Return a list of existing jobs, including hidden ones.

        Job names are derived from the `mrsm-*.service` entries in the user
        resources directory; entries whose target no longer exists are skipped.
        """
        from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH
        prefix, suffix = 'mrsm-', '.service'
        return [
            service_name[len(prefix):(-1 * len(suffix))]
            for service_name in os.listdir(SYSTEMD_USER_RESOURCES_PATH)
            if (
                service_name.startswith(prefix)
                and service_name.endswith(suffix)
                ### Check for broken symlinks.
                and (SYSTEMD_USER_RESOURCES_PATH / service_name).exists()
            )
        ]

    def get_job_exists(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job exists.
        """
        user_services = self.get_job_names(debug=debug)
        if debug:
            dprint(f'Existing services: {user_services}')
        return name in user_services

    def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
        """
        Return a dictionary of `systemd` Jobs (including hidden jobs).

        Keys are job names and values are `Job` objects bound to this executor.
        """
        return {
            name: Job(name, executor_keys=str(self))
            for name in self.get_job_names(debug=debug)
        }

    def get_service_name(self, name: str, debug: bool = False) -> str:
        """
        Return a job's service name (spaces in the job name become dashes).
        """
        return f"mrsm-{name.replace(' ', '-')}.service"

    def get_service_job_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path for the job's files under the root directory.
        """
        from meerschaum.config.paths import SYSTEMD_JOBS_RESOURCES_PATH
        return SYSTEMD_JOBS_RESOURCES_PATH / name

    def get_service_symlink_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to where to create the service symlink.
        """
        from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH
        return SYSTEMD_USER_RESOURCES_PATH / self.get_service_name(name, debug=debug)

    def get_service_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to a Job's service file.
        """
        return (
            self.get_service_job_path(name, debug=debug)
            / self.get_service_name(name, debug=debug)
        )

    def get_service_logs_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to direct service logs to.
        """
        from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
        return SYSTEMD_LOGS_RESOURCES_PATH / (self.get_service_name(name, debug=debug) + '.log')

    def get_socket_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to the FIFO file.
        """
        return (
            self.get_service_job_path(name, debug=debug)
            / (self.get_service_name(name, debug=debug) + '.stdin')
        )

    def get_result_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to the result file.
        """
        return (
            self.get_service_job_path(name, debug=debug)
            / (self.get_service_name(name, debug=debug) + '.result.json')
        )

    def get_service_file_text(self, name: str, sysargs: List[str], debug: bool = False) -> str:
        """
        Return the contents of the unit file.

        Parameters
        ----------
        name: str
            The name of the job.

        sysargs: List[str]
            The arguments passed to `python -m meerschaum` in `ExecStart`.

        Returns
        -------
        The text of the `.service` unit, including one `Environment=` line
        per exported variable.
        """
        service_logs_path = self.get_service_logs_path(name, debug=debug)
        socket_path = self.get_socket_path(name, debug=debug)
        result_path = self.get_result_path(name, debug=debug)
        job = self.get_hidden_job(name, debug=debug)

        sysargs_str = shlex.join(sysargs)
        exec_str = f'{sys.executable} -m meerschaum {sysargs_str}'

        ### Forward only the Meerschaum-specific variables from the current environment.
        mrsm_env_var_names = set(STATIC_CONFIG['environment'].values())
        mrsm_env_vars = {
            key: val
            for key, val in os.environ.items()
            if key in mrsm_env_var_names
        }

        ### Add new environment variables for the service process.
        mrsm_env_vars.update({
            STATIC_CONFIG['environment']['daemon_id']: name,
            STATIC_CONFIG['environment']['systemd_log_path']: service_logs_path.as_posix(),
            STATIC_CONFIG['environment']['systemd_result_path']: result_path.as_posix(),
            STATIC_CONFIG['environment']['systemd_stdin_path']: socket_path.as_posix(),
            STATIC_CONFIG['environment']['systemd_delete_job']: (
                '1'
                if job.delete_after_completion
                else '0'
            ),
        })

        ### Allow for user-defined environment variables.
        mrsm_env_vars.update(job.env)

        environment_lines = [
            f"Environment={key}={shlex.quote(str(val))}"
            for key, val in mrsm_env_vars.items()
        ]
        environment_str = '\n'.join(environment_lines)
        service_name = self.get_service_name(name, debug=debug)

        service_text = (
            "[Unit]\n"
            f"Description=Run the job '{name}'\n"
            "\n"
            "[Service]\n"
            f"ExecStart={exec_str}\n"
            "KillSignal=SIGTERM\n"
            "Restart=always\n"
            "RestartPreventExitStatus=0\n"
            f"SyslogIdentifier={service_name}\n"
            f"{environment_str}\n"
            "\n"
            "[Install]\n"
            "WantedBy=default.target\n"
        )
        return service_text

    def get_socket_file_text(self, name: str, debug: bool = False) -> str:
        """
        Return the contents of the socket file.
        """
        service_name = self.get_service_name(name, debug=debug)
        socket_path = self.get_socket_path(name, debug=debug)
        socket_text = (
            "[Unit]\n"
            f"BindsTo={service_name}\n"
            "\n"
            "[Socket]\n"
            f"ListenFIFO={socket_path.as_posix()}\n"
            "FileDescriptorName=stdin\n"
            "RemoveOnStop=true\n"
            "SocketMode=0660\n"
        )
        return socket_text

    def get_hidden_job(
        self,
        name: str,
        sysargs: Optional[List[str]] = None,
        properties: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ):
        """
        Return the hidden "sister" job to store a job's parameters.

        The returned `Job` uses the `'local'` executor and is marked as
        externally managed; its log, stdin file, status and result are all
        delegated back to this executor.
        """
        job = Job(
            name,
            sysargs,
            executor_keys='local',
            _properties=properties,
            _rotating_log=self.get_job_rotating_file(name, debug=debug),
            _stdin_file=self.get_job_stdin_file(name, debug=debug),
            _status_hook=partial(self.get_job_status, name),
            _result_hook=partial(self.get_job_result, name),
            _externally_managed=True,
        )
        return job

    def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return metadata about a job.

        Results are cached per job name on this executor and reused while they
        are no older than `JOB_METADATA_CACHE_SECONDS`.

        Returns
        -------
        A dictionary with the keys `sysargs`, `result`, `restart`, and `daemon`
        (which contains `status`, `pid`, and `properties`).
        """
        now = time.perf_counter()

        if '_jobs_metadata' not in self.__dict__:
            self._jobs_metadata: Dict[str, Any] = {}

        if name in self._jobs_metadata:
            ts = self._jobs_metadata[name].get('timestamp', None)

            if ts is not None and (now - ts) <= JOB_METADATA_CACHE_SECONDS:
                if debug:
                    dprint(f"Returning cached metadata for job '{name}'.")
                return self._jobs_metadata[name]['metadata']

        metadata = {
            'sysargs': self.get_job_sysargs(name, debug=debug),
            'result': self.get_job_result(name, debug=debug),
            'restart': self.get_job_restart(name, debug=debug),
            'daemon': {
                'status': self.get_job_status(name, debug=debug),
                'pid': self.get_job_pid(name, debug=debug),
                'properties': self.get_job_properties(name, debug=debug),
            },
        }
        self._jobs_metadata[name] = {
            'timestamp': now,
            'metadata': metadata,
        }
        return metadata

    def get_job_restart(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job restarts, i.e. whether any of `RESTART_FLAGS`
        appears in the job's sysargs.
        """
        from meerschaum.jobs._Job import RESTART_FLAGS
        sysargs = self.get_job_sysargs(name, debug=debug)
        if not sysargs:
            return False

        return any(flag in sysargs for flag in RESTART_FLAGS)

    def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return the properties for a job (omitting the `externally_managed` key).
        """
        job = self.get_hidden_job(name, debug=debug)
        return {
            k: v for k, v in job.daemon.properties.items()
            if k != 'externally_managed'
        }

    def get_job_process(self, name: str, debug: bool = False):
        """
        Return a `psutil.Process` for the job's PID.

        Returns `None` if the job has no PID or the process cannot be inspected.
        """
        pid = self.get_job_pid(name, debug=debug)
        if pid is None:
            return None

        psutil = mrsm.attempt_import('psutil')
        try:
            return psutil.Process(pid)
        except Exception:
            return None

    def get_job_status(self, name: str, debug: bool = False) -> str:
        """
        Return the job's service status.

        Returns
        -------
        One of `'running'`, `'paused'`, or `'stopped'`, derived from
        `systemctl --user is-active` and the state of the main process.
        """
        output = self.run_command(
            ['is-active', self.get_service_name(name, debug=debug)],
            as_output=True,
            debug=debug,
        )

        if output == 'activating':
            return 'running'

        if output != 'active':
            return 'stopped'

        ### The unit is active, so inspect the process itself:
        ### a process status of 'stopped' means the job is paused.
        process = self.get_job_process(name, debug=debug)
        if process is None:
            return 'stopped'

        try:
            if process.status() == 'stopped':
                return 'paused'
        except Exception:
            return 'stopped'

        return 'running'

    def get_job_pid(self, name: str, debug: bool = False) -> Union[int, None]:
        """
        Return the job's service PID.

        Returns `None` when `systemctl` reports a `MainPID` of `0`
        or when the output cannot be parsed.
        """
        from meerschaum.utils.misc import is_int

        output = self.run_command(
            [
                'show',
                self.get_service_name(name, debug=debug),
                '--property=MainPID',
            ],
            as_output=True,
            debug=debug,
        )
        if not output.startswith('MainPID='):
            return None

        pid_str = output[len('MainPID='):]
        if pid_str == '0':
            return None

        if is_int(pid_str):
            return int(pid_str)

        return None

    def _get_job_timestamp(
        self,
        name: str,
        property_name: str,
        debug: bool = False,
    ) -> Union[str, None]:
        """
        Return a timestamp property of the job's service as a UTC ISO-8601 string.

        Parameters
        ----------
        name: str
            The name of the job.

        property_name: str
            The unit property to read via `systemctl --user show --property=...`.

        Returns
        -------
        The ISO-formatted UTC timestamp, or `None` if the property is missing,
        empty, or cannot be parsed (a warning is emitted in the last case).
        """
        output = self.run_command(
            [
                'show',
                self.get_service_name(name, debug=debug),
                f'--property={property_name}',
            ],
            as_output=True,
            debug=debug,
        )
        if not output.startswith(property_name):
            return None

        dt_str = output.split('=')[-1]
        if not dt_str:
            return None

        dateutil_parser = mrsm.attempt_import('dateutil.parser')
        try:
            dt = dateutil_parser.parse(dt_str)
        except Exception as e:
            warn(f"Cannot parse '{output}' as a datetime:\n{e}")
            return None

        return dt.astimezone(timezone.utc).isoformat()

    def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job began running (the unit's `ActiveEnterTimestamp`).
        """
        return self._get_job_timestamp(name, 'ActiveEnterTimestamp', debug=debug)

    def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job stopped running (the unit's `InactiveEnterTimestamp`).
        """
        return self._get_job_timestamp(name, 'InactiveEnterTimestamp', debug=debug)

    def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job was paused.

        Returns `None` if the job is not currently paused or has no stop time.
        """
        job = self.get_hidden_job(name, debug=debug)
        if self.get_job_status(name, debug=debug) != 'paused':
            return None

        stop_time = job.stop_time
        if stop_time is None:
            return None

        return stop_time.isoformat()

    def get_job_result(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Return the job's result SuccessTuple.

        The result is read from the job's JSON result file. A failure tuple is
        returned if the file does not exist or cannot be read.
        """
        result_path = self.get_result_path(name, debug=debug)
        if not result_path.exists():
            return False, "No result available."

        try:
            with open(result_path, 'r', encoding='utf-8') as f:
                result = json.load(f)
        except Exception:
            return False, f"Could not read result for Job '{name}'."

        return tuple(result)

    def get_job_sysargs(self, name: str, debug: bool = False) -> Union[List[str], None]:
        """
        Return the sysargs from the service file.

        The arguments are parsed from the `ExecStart=` line (everything after
        ` -m meerschaum `). An empty list is returned if the service file or
        the `ExecStart=` line is missing.
        """
        service_file_path = self.get_service_file_path(name, debug=debug)
        if not service_file_path.exists():
            return []

        with open(service_file_path, 'r', encoding='utf-8') as f:
            for line in f:
                if line.startswith('ExecStart='):
                    sysargs_str = line.split(' -m meerschaum ')[-1].split('<')[0]
                    return shlex.split(sysargs_str)

        return []

    def run_command(
        self,
        command_args: List[str],
        as_output: bool = False,
        debug: bool = False,
    ) -> Union[SuccessTuple, str]:
        """
        Run a `systemd` command and return success.

        Parameters
        ----------
        command_args: List[str]
            The command to pass to `systemctl --user`.

        as_output: bool, default False
            If `True`, return the process stdout output.
            Defaults to a `SuccessTuple`.

        Returns
        -------
        A `SuccessTuple` indicating success or a str for the process output.
        """
        from meerschaum.utils.process import run_process

        ### Prepend `systemctl --user` unless the caller already did.
        if command_args[:2] != ['systemctl', '--user']:
            command_args = ['systemctl', '--user'] + command_args

        if debug:
            dprint(shlex.join(command_args))

        proc = run_process(
            command_args,
            foreground=False,
            as_proc=True,
            capture_output=True,
            text=True,
        )
        stdout, _ = proc.communicate()
        if debug:
            dprint(f"{stdout}")

        if as_output:
            return stdout.strip()

        command_success = proc.wait() == 0
        command_msg = (
            "Success"
            if command_success
            else f"Failed to execute command `{shlex.join(command_args)}`."
        )
        return command_success, command_msg

    def get_job_stdin_file(self, name: str, debug: bool = False):
        """
        Return a `StdinFile` for the job.

        The `StdinFile` is created on first use (along with the parent
        directory of the job's FIFO path) and cached on this executor.
        """
        from meerschaum.utils.daemon import StdinFile
        if '_stdin_files' not in self.__dict__:
            self._stdin_files: Dict[str, StdinFile] = {}

        if name not in self._stdin_files:
            socket_path = self.get_socket_path(name, debug=debug)
            socket_path.parent.mkdir(parents=True, exist_ok=True)
            self._stdin_files[name] = StdinFile(socket_path)

        return self._stdin_files[name]

    def create_job(
        self,
        name: str,
        sysargs: List[str],
        properties: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ) -> SuccessTuple:
        """
        Create a job as a service to be run by `systemd`.

        Parameters
        ----------
        name: str
            The name of the job.

        sysargs: List[str]
            The arguments to run with `python -m meerschaum`.

        properties: Optional[Dict[str, Any]], default None
            Properties to store on the hidden job.

        Returns
        -------
        A `SuccessTuple` indicating whether the service was written and started.
        """
        from meerschaum.utils.misc import make_symlink
        service_name = self.get_service_name(name, debug=debug)
        service_file_path = self.get_service_file_path(name, debug=debug)
        service_symlink_file_path = self.get_service_symlink_file_path(name, debug=debug)

        ### Access the handler for its side effect before the service is written.
        socket_stdin = self.get_job_stdin_file(name, debug=debug)
        _ = socket_stdin.file_handler

        ### Init the externally_managed file.
        ### NOTE: We must write the pickle file in addition to the properties file.
        job = self.get_hidden_job(name, sysargs=sysargs, properties=properties, debug=debug)
        job._set_externally_managed()
        pickle_success, pickle_msg = job.daemon.write_pickle()
        if not pickle_success:
            return pickle_success, pickle_msg
        properties_success, properties_msg = job.daemon.write_properties()
        if not properties_success:
            return properties_success, properties_msg

        service_file_path.parent.mkdir(parents=True, exist_ok=True)

        with open(service_file_path, 'w+', encoding='utf-8') as f:
            f.write(self.get_service_file_text(name, sysargs, debug=debug))

        symlink_success, symlink_msg = make_symlink(service_file_path, service_symlink_file_path)
        if not symlink_success:
            return symlink_success, symlink_msg

        commands = [
            ['daemon-reload'],
            ['enable', service_name],
            ['start', service_name],
        ]

        fails = 0
        for command_list in commands:
            command_success, _ = self.run_command(command_list, debug=debug)
            if not command_success:
                fails += 1

        ### NOTE(review): a single failed command is tolerated here;
        ### confirm whether this threshold is intentional.
        if fails > 1:
            return False, "Failed to reload systemd."

        return True, f"Started job '{name}' via systemd."

    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Start a job's service.

        A paused job is resumed by sending `SIGCONT`;
        otherwise the service is started with `systemctl --user start`.
        """
        job = self.get_hidden_job(name, debug=debug)
        job.daemon._remove_stop_file()

        service_name = self.get_service_name(name, debug=debug)
        if self.get_job_status(name, debug=debug) == 'paused':
            return self.run_command(
                ['kill', '-s', 'SIGCONT', service_name],
                debug=debug,
            )

        return self.run_command(['start', service_name], debug=debug)

    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Stop a job's service.

        A `SIGINT` is sent first and the job's status is polled until the
        configured `jobs:timeout_seconds` elapses. If the job has still not
        stopped, fall back to `systemctl --user stop`.
        """
        job = self.get_hidden_job(name, debug=debug)
        job.daemon._write_stop_file('quit')
        _ = self.run_command(
            ['kill', '-s', 'SIGINT', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

        check_timeout_interval = get_config('jobs', 'check_timeout_interval_seconds')
        loop_start = time.perf_counter()
        timeout_seconds = get_config('jobs', 'timeout_seconds')
        while (time.perf_counter() - loop_start) < timeout_seconds:
            if self.get_job_status(name, debug=debug) == 'stopped':
                return True, 'Success'

            time.sleep(check_timeout_interval)

        return self.run_command(
            ['stop', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Pause a job's service by writing a `'pause'` stop file and sending `SIGSTOP`.
        """
        job = self.get_hidden_job(name, debug=debug)
        job.daemon._write_stop_file('pause')
        return self.run_command(
            ['kill', '-s', 'SIGSTOP', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Delete a job's service.

        Unless the job is flagged to delete itself after completion, the
        service is first stopped and disabled. The job's directory, service
        files, symlink, FIFO, result file and log files are then removed, the
        hidden job is deleted, and `systemd` is reloaded.

        Returns
        -------
        A failure `SuccessTuple` if a file cannot be removed,
        otherwise the result of `systemctl --user daemon-reload`.
        """
        from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
        job = self.get_hidden_job(name, debug=debug)

        if not job.delete_after_completion:
            _ = self.stop_job(name, debug=debug)
            _ = self.run_command(
                ['disable', self.get_service_name(name, debug=debug)],
                debug=debug,
            )

        service_job_path = self.get_service_job_path(name, debug=debug)
        try:
            if service_job_path.exists():
                shutil.rmtree(service_job_path)
        except Exception as e:
            warn(e)
            return False, str(e)

        ### Collect the log files whose names extend the base log file name.
        service_logs_path = self.get_service_logs_path(name, debug=debug)
        logs_prefix = service_logs_path.name + '.'
        logs_paths = [
            (SYSTEMD_LOGS_RESOURCES_PATH / log_file_name)
            for log_file_name in os.listdir(SYSTEMD_LOGS_RESOURCES_PATH)
            if log_file_name.startswith(logs_prefix)
        ]
        paths = [
            self.get_service_file_path(name, debug=debug),
            self.get_service_symlink_file_path(name, debug=debug),
            self.get_socket_path(name, debug=debug),
            self.get_result_path(name, debug=debug),
        ] + logs_paths

        for path in paths:
            if not path.exists():
                continue
            try:
                path.unlink()
            except Exception as e:
                warn(e)
                return False, str(e)

        _ = job.delete()

        return self.run_command(['daemon-reload'], debug=debug)

    def get_logs(self, name: str, debug: bool = False) -> str:
        """
        Return a job's logs, read from its rotating log file.
        """
        rotating_file = self.get_job_rotating_file(name, debug=debug)
        return rotating_file.read()

    def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
        """
        Return a job's stop time.
        """
        job = self.get_hidden_job(name, debug=debug)
        return job.stop_time

    def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job is blocking on stdin,
        i.e. whether a `.block` file exists next to the job's FIFO.
        """
        socket_path = self.get_socket_path(name, debug=debug)
        blocking_path = socket_path.parent / (socket_path.name + '.block')
        return blocking_path.exists()

    def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return the kwargs to the blocking prompt.
        """
        job = self.get_hidden_job(name, debug=debug)
        return job.get_prompt_kwargs(debug=debug)

    def get_job_rotating_file(self, name: str, debug: bool = False):
        """
        Return a `RotatingFile` for the job's log output.
        """
        from meerschaum.utils.daemon import RotatingFile
        service_logs_path = self.get_service_logs_path(name, debug=debug)
        return RotatingFile(service_logs_path)

    async def monitor_logs_async(
        self,
        name: str,
        *args,
        debug: bool = False,
        **kwargs
    ):
        """
        Monitor a job's output.

        Positional and keyword arguments are forwarded to the hidden job's
        `monitor_logs_async()`, with the logs path, rotating log, and stdin
        file overridden to point at this executor's files.
        """
        from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
        job = self.get_hidden_job(name, debug=debug)
        kwargs.update({
            '_logs_path': SYSTEMD_LOGS_RESOURCES_PATH,
            '_log': self.get_job_rotating_file(name, debug=debug),
            '_stdin_file': self.get_job_stdin_file(name, debug=debug),
            'debug': debug,
        })
        await job.monitor_logs_async(*args, **kwargs)

    def monitor_logs(self, *args, **kwargs):
        """
        Monitor a job's output (blocking wrapper around `monitor_logs_async()`).
        """
        asyncio.run(self.monitor_logs_async(*args, **kwargs))
30@make_executor 31class SystemdExecutor(Executor): 32 """ 33 Execute Meerschaum jobs via `systemd`. 34 """ 35 36 def get_job_names(self, debug: bool = False) -> List[str]: 37 """ 38 Return a list of existing jobs, including hidden ones. 39 """ 40 from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH 41 return [ 42 service_name[len('mrsm-'):(-1 * len('.service'))] 43 for service_name in os.listdir(SYSTEMD_USER_RESOURCES_PATH) 44 if ( 45 service_name.startswith('mrsm-') 46 and service_name.endswith('.service') 47 ### Check for broken symlinks. 48 and (SYSTEMD_USER_RESOURCES_PATH / service_name).exists() 49 ) 50 ] 51 52 def get_job_exists(self, name: str, debug: bool = False) -> bool: 53 """ 54 Return whether a job exists. 55 """ 56 user_services = self.get_job_names(debug=debug) 57 if debug: 58 dprint(f'Existing services: {user_services}') 59 return name in user_services 60 61 def get_jobs(self, debug: bool = False) -> Dict[str, Job]: 62 """ 63 Return a dictionary of `systemd` Jobs (including hidden jobs). 64 """ 65 user_services = self.get_job_names(debug=debug) 66 jobs = { 67 name: Job(name, executor_keys=str(self)) 68 for name in user_services 69 } 70 return { 71 name: job 72 for name, job in jobs.items() 73 } 74 75 def get_service_name(self, name: str, debug: bool = False) -> str: 76 """ 77 Return a job's service name. 78 """ 79 return f"mrsm-{name.replace(' ', '-')}.service" 80 81 def get_service_job_path(self, name: str, debug: bool = False) -> pathlib.Path: 82 """ 83 Return the path for the job's files under the root directory. 84 """ 85 from meerschaum.config.paths import SYSTEMD_JOBS_RESOURCES_PATH 86 return SYSTEMD_JOBS_RESOURCES_PATH / name 87 88 def get_service_symlink_file_path(self, name: str, debug: bool = False) -> pathlib.Path: 89 """ 90 Return the path to where to create the service symlink. 
91 """ 92 from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH 93 return SYSTEMD_USER_RESOURCES_PATH / self.get_service_name(name, debug=debug) 94 95 def get_service_file_path(self, name: str, debug: bool = False) -> pathlib.Path: 96 """ 97 Return the path to a Job's service file. 98 """ 99 return ( 100 self.get_service_job_path(name, debug=debug) 101 / self.get_service_name(name, debug=debug) 102 ) 103 104 def get_service_logs_path(self, name: str, debug: bool = False) -> pathlib.Path: 105 """ 106 Return the path to direct service logs to. 107 """ 108 from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH 109 return SYSTEMD_LOGS_RESOURCES_PATH / (self.get_service_name(name, debug=debug) + '.log') 110 111 def get_socket_path(self, name: str, debug: bool = False) -> pathlib.Path: 112 """ 113 Return the path to the FIFO file. 114 """ 115 return ( 116 self.get_service_job_path(name, debug=debug) 117 / (self.get_service_name(name, debug=debug) + '.stdin') 118 ) 119 120 def get_result_path(self, name: str, debug: bool = False) -> pathlib.Path: 121 """ 122 Return the path to the result file. 123 """ 124 return ( 125 self.get_service_job_path(name, debug=debug) 126 / (self.get_service_name(name, debug=debug) + '.result.json') 127 ) 128 129 def get_service_file_text(self, name: str, sysargs: List[str], debug: bool = False) -> str: 130 """ 131 Return the contents of the unit file. 
132 """ 133 service_logs_path = self.get_service_logs_path(name, debug=debug) 134 socket_path = self.get_socket_path(name, debug=debug) 135 result_path = self.get_result_path(name, debug=debug) 136 job = self.get_hidden_job(name, debug=debug) 137 138 sysargs_str = shlex.join(sysargs) 139 exec_str = f'{sys.executable} -m meerschaum {sysargs_str}' 140 mrsm_env_var_names = set([var for var in STATIC_CONFIG['environment'].values()]) 141 mrsm_env_vars = { 142 key: val 143 for key, val in os.environ.items() 144 if key in mrsm_env_var_names 145 } 146 147 ### Add new environment variables for the service process. 148 mrsm_env_vars.update({ 149 STATIC_CONFIG['environment']['daemon_id']: name, 150 STATIC_CONFIG['environment']['systemd_log_path']: service_logs_path.as_posix(), 151 STATIC_CONFIG['environment']['systemd_result_path']: result_path.as_posix(), 152 STATIC_CONFIG['environment']['systemd_stdin_path']: socket_path.as_posix(), 153 STATIC_CONFIG['environment']['systemd_delete_job']: ( 154 '1' 155 if job.delete_after_completion 156 else '0' 157 ), 158 }) 159 160 ### Allow for user-defined environment variables. 161 mrsm_env_vars.update(job.env) 162 163 environment_lines = [ 164 f"Environment={key}={shlex.quote(str(val))}" 165 for key, val in mrsm_env_vars.items() 166 ] 167 environment_str = '\n'.join(environment_lines) 168 service_name = self.get_service_name(name, debug=debug) 169 170 service_text = ( 171 "[Unit]\n" 172 f"Description=Run the job '{name}'\n" 173 "\n" 174 "[Service]\n" 175 f"ExecStart={exec_str}\n" 176 "KillSignal=SIGTERM\n" 177 "Restart=always\n" 178 "RestartPreventExitStatus=0\n" 179 f"SyslogIdentifier={service_name}\n" 180 f"{environment_str}\n" 181 "\n" 182 "[Install]\n" 183 "WantedBy=default.target\n" 184 ) 185 return service_text 186 187 def get_socket_file_text(self, name: str, debug: bool = False) -> str: 188 """ 189 Return the contents of the socket file. 
190 """ 191 service_name = self.get_service_name(name, debug=debug) 192 socket_path = self.get_socket_path(name, debug=debug) 193 socket_text = ( 194 "[Unit]\n" 195 f"BindsTo={service_name}\n" 196 "\n" 197 "[Socket]\n" 198 f"ListenFIFO={socket_path.as_posix()}\n" 199 "FileDescriptorName=stdin\n" 200 "RemoveOnStop=true\n" 201 "SocketMode=0660\n" 202 ) 203 return socket_text 204 205 def get_hidden_job( 206 self, 207 name: str, 208 sysargs: Optional[List[str]] = None, 209 properties: Optional[Dict[str, Any]] = None, 210 debug: bool = False, 211 ): 212 """ 213 Return the hidden "sister" job to store a job's parameters. 214 """ 215 job = Job( 216 name, 217 sysargs, 218 executor_keys='local', 219 _properties=properties, 220 _rotating_log=self.get_job_rotating_file(name, debug=debug), 221 _stdin_file=self.get_job_stdin_file(name, debug=debug), 222 _status_hook=partial(self.get_job_status, name), 223 _result_hook=partial(self.get_job_result, name), 224 _externally_managed=True, 225 ) 226 return job 227 228 229 def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]: 230 """ 231 Return metadata about a job. 
232 """ 233 now = time.perf_counter() 234 235 if '_jobs_metadata' not in self.__dict__: 236 self._jobs_metadata: Dict[str, Any] = {} 237 238 if name in self._jobs_metadata: 239 ts = self._jobs_metadata[name].get('timestamp', None) 240 241 if ts is not None and (now - ts) <= JOB_METADATA_CACHE_SECONDS: 242 if debug: 243 dprint(f"Retuning cached metadata for job '{name}'.") 244 return self._jobs_metadata[name]['metadata'] 245 246 metadata = { 247 'sysargs': self.get_job_sysargs(name, debug=debug), 248 'result': self.get_job_result(name, debug=debug), 249 'restart': self.get_job_restart(name, debug=debug), 250 'daemon': { 251 'status': self.get_job_status(name, debug=debug), 252 'pid': self.get_job_pid(name, debug=debug), 253 'properties': self.get_job_properties(name, debug=debug), 254 }, 255 } 256 self._jobs_metadata[name] = { 257 'timestamp': now, 258 'metadata': metadata, 259 } 260 return metadata 261 262 def get_job_restart(self, name: str, debug: bool = False) -> bool: 263 """ 264 Return whether a job restarts. 265 """ 266 from meerschaum.jobs._Job import RESTART_FLAGS 267 sysargs = self.get_job_sysargs(name, debug=debug) 268 if not sysargs: 269 return False 270 271 for flag in RESTART_FLAGS: 272 if flag in sysargs: 273 return True 274 275 return False 276 277 def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]: 278 """ 279 Return the properties for a job. 280 """ 281 job = self.get_hidden_job(name, debug=debug) 282 return { 283 k: v for k, v in job.daemon.properties.items() 284 if k != 'externally_managed' 285 } 286 287 def get_job_process(self, name: str, debug: bool = False): 288 """ 289 Return a `psutil.Process` for the job's PID. 
290 """ 291 pid = self.get_job_pid(name, debug=debug) 292 if pid is None: 293 return None 294 295 psutil = mrsm.attempt_import('psutil') 296 try: 297 return psutil.Process(pid) 298 except Exception: 299 return None 300 301 def get_job_status(self, name: str, debug: bool = False) -> str: 302 """ 303 Return the job's service status. 304 """ 305 output = self.run_command( 306 ['is-active', self.get_service_name(name, debug=debug)], 307 as_output=True, 308 debug=debug, 309 ) 310 311 if output == 'activating': 312 return 'running' 313 314 if output == 'active': 315 process = self.get_job_process(name, debug=debug) 316 if process is None: 317 return 'stopped' 318 319 try: 320 if process.status() == 'stopped': 321 return 'paused' 322 except Exception: 323 return 'stopped' 324 325 return 'running' 326 327 return 'stopped' 328 329 def get_job_pid(self, name: str, debug: bool = False) -> Union[int, None]: 330 """ 331 Return the job's service PID. 332 """ 333 from meerschaum.utils.misc import is_int 334 335 output = self.run_command( 336 [ 337 'show', 338 self.get_service_name(name, debug=debug), 339 '--property=MainPID', 340 ], 341 as_output=True, 342 debug=debug, 343 ) 344 if not output.startswith('MainPID='): 345 return None 346 347 pid_str = output[len('MainPID='):] 348 if pid_str == '0': 349 return None 350 351 if is_int(pid_str): 352 return int(pid_str) 353 354 return None 355 356 def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]: 357 """ 358 Return when a job began running. 
359 """ 360 output = self.run_command( 361 [ 362 'show', 363 self.get_service_name(name, debug=debug), 364 '--property=ActiveEnterTimestamp' 365 ], 366 as_output=True, 367 debug=debug, 368 ) 369 if not output.startswith('ActiveEnterTimestamp'): 370 return None 371 372 dt_str = output.split('=')[-1] 373 if not dt_str: 374 return None 375 376 dateutil_parser = mrsm.attempt_import('dateutil.parser') 377 try: 378 dt = dateutil_parser.parse(dt_str) 379 except Exception as e: 380 warn(f"Cannot parse '{output}' as a datetime:\n{e}") 381 return None 382 383 return dt.astimezone(timezone.utc).isoformat() 384 385 def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]: 386 """ 387 Return when a job began running. 388 """ 389 output = self.run_command( 390 [ 391 'show', 392 self.get_service_name(name, debug=debug), 393 '--property=InactiveEnterTimestamp' 394 ], 395 as_output=True, 396 debug=debug, 397 ) 398 if not output.startswith('InactiveEnterTimestamp'): 399 return None 400 401 dt_str = output.split('=')[-1] 402 if not dt_str: 403 return None 404 405 dateutil_parser = mrsm.attempt_import('dateutil.parser') 406 407 try: 408 dt = dateutil_parser.parse(dt_str) 409 except Exception as e: 410 warn(f"Cannot parse '{output}' as a datetime:\n{e}") 411 return None 412 return dt.astimezone(timezone.utc).isoformat() 413 414 def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]: 415 """ 416 Return when a job was paused. 417 """ 418 job = self.get_hidden_job(name, debug=debug) 419 if self.get_job_status(name, debug=debug) != 'paused': 420 return None 421 422 stop_time = job.stop_time 423 if stop_time is None: 424 return None 425 426 return stop_time.isoformat() 427 428 def get_job_result(self, name: str, debug: bool = False) -> SuccessTuple: 429 """ 430 Return the job's result SuccessTuple. 431 """ 432 result_path = self.get_result_path(name, debug=debug) 433 if not result_path.exists(): 434 return False, "No result available." 
435 436 try: 437 with open(result_path, 'r', encoding='utf-8') as f: 438 result = json.load(f) 439 except Exception: 440 return False, f"Could not read result for Job '{name}'." 441 442 return tuple(result) 443 444 def get_job_sysargs(self, name: str, debug: bool = False) -> Union[List[str], None]: 445 """ 446 Return the sysargs from the service file. 447 """ 448 service_file_path = self.get_service_file_path(name, debug=debug) 449 if not service_file_path.exists(): 450 return [] 451 452 with open(service_file_path, 'r', encoding='utf-8') as f: 453 service_lines = f.readlines() 454 455 for line in service_lines: 456 if line.startswith('ExecStart='): 457 sysargs_str = line.split(' -m meerschaum ')[-1].split('<')[0] 458 return shlex.split(sysargs_str) 459 460 return [] 461 462 def run_command( 463 self, 464 command_args: List[str], 465 as_output: bool = False, 466 debug: bool = False, 467 ) -> Union[SuccessTuple, str]: 468 """ 469 Run a `systemd` command and return success. 470 471 Parameters 472 ---------- 473 command_args: List[str] 474 The command to pass to `systemctl --user`. 475 476 as_output: bool, default False 477 If `True`, return the process stdout output. 478 Defaults to a `SuccessTuple`. 479 480 Returns 481 ------- 482 A `SuccessTuple` indicating success or a str for the process output. 483 """ 484 from meerschaum.utils.process import run_process 485 486 if command_args[:2] != ['systemctl', '--user']: 487 command_args = ['systemctl', '--user'] + command_args 488 489 if debug: 490 dprint(shlex.join(command_args)) 491 492 proc = run_process( 493 command_args, 494 foreground=False, 495 as_proc=True, 496 capture_output=True, 497 text=True, 498 ) 499 stdout, stderr = proc.communicate() 500 if debug: 501 dprint(f"{stdout}") 502 503 if as_output: 504 return stdout.strip() 505 506 command_success = proc.wait() == 0 507 command_msg = ( 508 "Success" 509 if command_success 510 else f"Failed to execute command `{shlex.join(command_args)}`." 
511 ) 512 return command_success, command_msg 513 514 def get_job_stdin_file(self, name: str, debug: bool = False): 515 """ 516 Return a `StdinFile` for the job. 517 """ 518 from meerschaum.utils.daemon import StdinFile 519 if '_stdin_files' not in self.__dict__: 520 self._stdin_files: Dict[str, StdinFile] = {} 521 522 if name not in self._stdin_files: 523 socket_path = self.get_socket_path(name, debug=debug) 524 socket_path.parent.mkdir(parents=True, exist_ok=True) 525 self._stdin_files[name] = StdinFile(socket_path) 526 527 return self._stdin_files[name] 528 529 def create_job( 530 self, 531 name: str, 532 sysargs: List[str], 533 properties: Optional[Dict[str, Any]] = None, 534 debug: bool = False, 535 ) -> SuccessTuple: 536 """ 537 Create a job as a service to be run by `systemd`. 538 """ 539 from meerschaum.utils.misc import make_symlink 540 service_name = self.get_service_name(name, debug=debug) 541 service_file_path = self.get_service_file_path(name, debug=debug) 542 service_symlink_file_path = self.get_service_symlink_file_path(name, debug=debug) 543 socket_stdin = self.get_job_stdin_file(name, debug=debug) 544 _ = socket_stdin.file_handler 545 546 ### Init the externally_managed file. 547 ### NOTE: We must write the pickle file in addition to the properties file. 
548 job = self.get_hidden_job(name, sysargs=sysargs, properties=properties, debug=debug) 549 job._set_externally_managed() 550 pickle_success, pickle_msg = job.daemon.write_pickle() 551 if not pickle_success: 552 return pickle_success, pickle_msg 553 properties_success, properties_msg = job.daemon.write_properties() 554 if not properties_success: 555 return properties_success, properties_msg 556 557 service_file_path.parent.mkdir(parents=True, exist_ok=True) 558 559 with open(service_file_path, 'w+', encoding='utf-8') as f: 560 f.write(self.get_service_file_text(name, sysargs, debug=debug)) 561 562 symlink_success, symlink_msg = make_symlink(service_file_path, service_symlink_file_path) 563 if not symlink_success: 564 return symlink_success, symlink_msg 565 566 commands = [ 567 ['daemon-reload'], 568 ['enable', service_name], 569 ['start', service_name], 570 ] 571 572 fails = 0 573 for command_list in commands: 574 command_success, command_msg = self.run_command(command_list, debug=debug) 575 if not command_success: 576 fails += 1 577 578 if fails > 1: 579 return False, "Failed to reload systemd." 580 581 return True, f"Started job '{name}' via systemd." 582 583 def start_job(self, name: str, debug: bool = False) -> SuccessTuple: 584 """ 585 Stop a job's service. 586 """ 587 job = self.get_hidden_job(name, debug=debug) 588 job.daemon._remove_stop_file() 589 590 status = self.get_job_status(name, debug=debug) 591 if status == 'paused': 592 return self.run_command( 593 ['kill', '-s', 'SIGCONT', self.get_service_name(name, debug=debug)], 594 debug=debug, 595 ) 596 597 return self.run_command( 598 ['start', self.get_service_name(name, debug=debug)], 599 debug=debug, 600 ) 601 602 def stop_job(self, name: str, debug: bool = False) -> SuccessTuple: 603 """ 604 Stop a job's service. 
605 """ 606 job = self.get_hidden_job(name, debug=debug) 607 job.daemon._write_stop_file('quit') 608 sigint_success, sigint_msg = self.run_command( 609 ['kill', '-s', 'SIGINT', self.get_service_name(name, debug=debug)], 610 debug=debug, 611 ) 612 613 check_timeout_interval = get_config('jobs', 'check_timeout_interval_seconds') 614 loop_start = time.perf_counter() 615 timeout_seconds = get_config('jobs', 'timeout_seconds') 616 while (time.perf_counter() - loop_start) < timeout_seconds: 617 if self.get_job_status(name, debug=debug) == 'stopped': 618 return True, 'Success' 619 620 time.sleep(check_timeout_interval) 621 622 return self.run_command( 623 ['stop', self.get_service_name(name, debug=debug)], 624 debug=debug, 625 ) 626 627 def pause_job(self, name: str, debug: bool = False) -> SuccessTuple: 628 """ 629 Pause a job's service. 630 """ 631 job = self.get_hidden_job(name, debug=debug) 632 job.daemon._write_stop_file('pause') 633 return self.run_command( 634 ['kill', '-s', 'SIGSTOP', self.get_service_name(name, debug=debug)], 635 debug=debug, 636 ) 637 638 def delete_job(self, name: str, debug: bool = False) -> SuccessTuple: 639 """ 640 Delete a job's service. 
641 """ 642 from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH 643 job = self.get_hidden_job(name, debug=debug) 644 645 if not job.delete_after_completion: 646 _ = self.stop_job(name, debug=debug) 647 _ = self.run_command( 648 ['disable', self.get_service_name(name, debug=debug)], 649 debug=debug, 650 ) 651 652 service_job_path = self.get_service_job_path(name, debug=debug) 653 try: 654 if service_job_path.exists(): 655 shutil.rmtree(service_job_path) 656 except Exception as e: 657 warn(e) 658 return False, str(e) 659 660 service_logs_path = self.get_service_logs_path(name, debug=debug) 661 logs_paths = [ 662 (SYSTEMD_LOGS_RESOURCES_PATH / name) 663 for name in os.listdir(SYSTEMD_LOGS_RESOURCES_PATH) 664 if name.startswith(service_logs_path.name + '.') 665 ] 666 paths = [ 667 self.get_service_file_path(name, debug=debug), 668 self.get_service_symlink_file_path(name, debug=debug), 669 self.get_socket_path(name, debug=debug), 670 self.get_result_path(name, debug=debug), 671 ] + logs_paths 672 673 for path in paths: 674 if path.exists(): 675 try: 676 path.unlink() 677 except Exception as e: 678 warn(e) 679 return False, str(e) 680 681 _ = job.delete() 682 683 return self.run_command(['daemon-reload'], debug=debug) 684 685 def get_logs(self, name: str, debug: bool = False) -> str: 686 """ 687 Return a job's journal logs. 688 """ 689 rotating_file = self.get_job_rotating_file(name, debug=debug) 690 return rotating_file.read() 691 692 def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]: 693 """ 694 Return a job's stop time. 695 """ 696 job = self.get_hidden_job(name, debug=debug) 697 return job.stop_time 698 699 def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool: 700 """ 701 Return whether a job is blocking on stdin. 
702 """ 703 socket_path = self.get_socket_path(name, debug=debug) 704 blocking_path = socket_path.parent / (socket_path.name + '.block') 705 return blocking_path.exists() 706 707 def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]: 708 """ 709 Return the kwargs to the blocking prompt. 710 """ 711 job = self.get_hidden_job(name, debug=debug) 712 return job.get_prompt_kwargs(debug=debug) 713 714 def get_job_rotating_file(self, name: str, debug: bool = False): 715 """ 716 Return a `RotatingFile` for the job's log output. 717 """ 718 from meerschaum.utils.daemon import RotatingFile 719 service_logs_path = self.get_service_logs_path(name, debug=debug) 720 return RotatingFile(service_logs_path) 721 722 async def monitor_logs_async( 723 self, 724 name: str, 725 *args, 726 debug: bool = False, 727 **kwargs 728 ): 729 """ 730 Monitor a job's output. 731 """ 732 from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH 733 job = self.get_hidden_job(name, debug=debug) 734 kwargs.update({ 735 '_logs_path': SYSTEMD_LOGS_RESOURCES_PATH, 736 '_log': self.get_job_rotating_file(name, debug=debug), 737 '_stdin_file': self.get_job_stdin_file(name, debug=debug), 738 'debug': debug, 739 }) 740 await job.monitor_logs_async(*args, **kwargs) 741 742 def monitor_logs(self, *args, **kwargs): 743 """ 744 Monitor a job's output. 745 """ 746 asyncio.run(self.monitor_logs_async(*args, **kwargs))
Execute Meerschaum jobs via `systemd`.
def get_job_names(self, debug: bool = False) -> List[str]:
    """
    Return a list of existing jobs, including hidden ones.

    Names are taken from the `mrsm-<name>.service` entries found in
    `SYSTEMD_USER_RESOURCES_PATH`.
    """
    from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH

    prefix, suffix = 'mrsm-', '.service'
    job_names = []
    for entry in os.listdir(SYSTEMD_USER_RESOURCES_PATH):
        if not entry.startswith(prefix) or not entry.endswith(suffix):
            continue
        ### Skip entries which do not resolve (e.g. broken symlinks).
        if not (SYSTEMD_USER_RESOURCES_PATH / entry).exists():
            continue
        job_names.append(entry[len(prefix):-len(suffix)])
    return job_names
Return a list of existing jobs, including hidden ones.
def get_job_exists(self, name: str, debug: bool = False) -> bool:
    """
    Return whether `name` is among the names returned by `get_job_names()`.
    """
    existing_names = self.get_job_names(debug=debug)
    if debug:
        dprint(f'Existing services: {existing_names}')
    job_exists = name in existing_names
    return job_exists
Return whether a job exists.
def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
    """
    Return a dictionary mapping job names to `systemd` Jobs
    (including hidden jobs).
    """
    return {
        job_name: Job(job_name, executor_keys=str(self))
        for job_name in self.get_job_names(debug=debug)
    }
Return a dictionary of `systemd` Jobs (including hidden jobs).
def get_service_name(self, name: str, debug: bool = False) -> str:
    """
    Return a job's service name.

    Spaces in `name` are replaced with hyphens, and the result is wrapped
    as `mrsm-<name>.service`.
    """
    unit_stem = '-'.join(name.split(' '))
    return 'mrsm-' + unit_stem + '.service'
Return a job's service name.
def get_service_job_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Return the directory for the job's files, i.e. `name` under
    `SYSTEMD_JOBS_RESOURCES_PATH`.
    """
    from meerschaum.config.paths import SYSTEMD_JOBS_RESOURCES_PATH as jobs_root

    job_dir = jobs_root / name
    return job_dir
Return the path for the job's files under the root directory.
def get_service_symlink_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Return the path at which the job's service symlink is created
    (the service name under `SYSTEMD_USER_RESOURCES_PATH`).
    """
    from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH

    service_name = self.get_service_name(name, debug=debug)
    return SYSTEMD_USER_RESOURCES_PATH / service_name
Return the path to where to create the service symlink.
def get_service_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Return the path to a Job's service file, located inside the job's directory.
    """
    job_dir = self.get_service_job_path(name, debug=debug)
    service_name = self.get_service_name(name, debug=debug)
    return job_dir / service_name
Return the path to a Job's service file.
def get_service_logs_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Return the path to direct service logs to
    (`<service name>.log` under `SYSTEMD_LOGS_RESOURCES_PATH`).
    """
    from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH

    log_file_name = self.get_service_name(name, debug=debug) + '.log'
    return SYSTEMD_LOGS_RESOURCES_PATH / log_file_name
Return the path to direct service logs to.
def get_socket_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Return the path to the job's FIFO file
    (`<service name>.stdin` inside the job's directory).
    """
    job_dir = self.get_service_job_path(name, debug=debug)
    fifo_name = self.get_service_name(name, debug=debug) + '.stdin'
    return job_dir / fifo_name
Return the path to the FIFO file.
def get_result_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Return the path to the job's result file
    (`<service name>.result.json` inside the job's directory).
    """
    job_dir = self.get_service_job_path(name, debug=debug)
    result_file_name = self.get_service_name(name, debug=debug) + '.result.json'
    return job_dir / result_file_name
Return the path to the result file.
def get_service_file_text(self, name: str, sysargs: List[str], debug: bool = False) -> str:
    """
    Return the contents of the unit file.

    Parameters
    ----------
    name: str
        The name of the job.

    sysargs: List[str]
        The arguments appended to `<python> -m meerschaum` in `ExecStart`.

    Returns
    -------
    The text of the `[Unit]`, `[Service]`, and `[Install]` sections.
    """
    logs_path = self.get_service_logs_path(name, debug=debug)
    stdin_path = self.get_socket_path(name, debug=debug)
    result_path = self.get_result_path(name, debug=debug)
    job = self.get_hidden_job(name, debug=debug)

    env_names = STATIC_CONFIG['environment']
    exec_str = f'{sys.executable} -m meerschaum {shlex.join(sysargs)}'

    ### Forward the known Meerschaum variables set in the current environment.
    known_var_names = set(env_names.values())
    service_env = {
        key: val
        for key, val in os.environ.items()
        if key in known_var_names
    }

    ### Add new environment variables for the service process.
    service_env[env_names['daemon_id']] = name
    service_env[env_names['systemd_log_path']] = logs_path.as_posix()
    service_env[env_names['systemd_result_path']] = result_path.as_posix()
    service_env[env_names['systemd_stdin_path']] = stdin_path.as_posix()
    service_env[env_names['systemd_delete_job']] = (
        '1' if job.delete_after_completion else '0'
    )

    ### Allow for user-defined environment variables (these take precedence).
    service_env.update(job.env)

    environment_str = '\n'.join(
        f"Environment={key}={shlex.quote(str(val))}"
        for key, val in service_env.items()
    )
    service_name = self.get_service_name(name, debug=debug)

    unit_parts = [
        "[Unit]\n",
        f"Description=Run the job '{name}'\n",
        "\n",
        "[Service]\n",
        f"ExecStart={exec_str}\n",
        "KillSignal=SIGTERM\n",
        "Restart=always\n",
        "RestartPreventExitStatus=0\n",
        f"SyslogIdentifier={service_name}\n",
        f"{environment_str}\n",
        "\n",
        "[Install]\n",
        "WantedBy=default.target\n",
    ]
    return ''.join(unit_parts)
Return the contents of the unit file.
def get_socket_file_text(self, name: str, debug: bool = False) -> str:
    """
    Return the contents of the socket file.

    The unit binds to the job's service and listens on the job's FIFO path.
    """
    service_name = self.get_service_name(name, debug=debug)
    fifo_path = self.get_socket_path(name, debug=debug)
    socket_parts = [
        "[Unit]\n",
        f"BindsTo={service_name}\n",
        "\n",
        "[Socket]\n",
        f"ListenFIFO={fifo_path.as_posix()}\n",
        "FileDescriptorName=stdin\n",
        "RemoveOnStop=true\n",
        "SocketMode=0660\n",
    ]
    return ''.join(socket_parts)
Return the contents of the socket file.
def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
    """
    Return metadata about a job.

    Metadata is cached per job name on the executor and reused while the
    cached entry is no older than `JOB_METADATA_CACHE_SECONDS`.

    Returns
    -------
    A dictionary with the keys `sysargs`, `result`, `restart`, and `daemon`
    (the latter holding `status`, `pid`, and `properties`).
    """
    now = time.perf_counter()

    if '_jobs_metadata' not in self.__dict__:
        self._jobs_metadata = {}

    cached = self._jobs_metadata.get(name)
    if cached is not None:
        ts = cached.get('timestamp', None)
        if ts is not None and (now - ts) <= JOB_METADATA_CACHE_SECONDS:
            if debug:
                dprint(f"Returning cached metadata for job '{name}'.")
            return cached['metadata']

    metadata = {
        'sysargs': self.get_job_sysargs(name, debug=debug),
        'result': self.get_job_result(name, debug=debug),
        'restart': self.get_job_restart(name, debug=debug),
        'daemon': {
            'status': self.get_job_status(name, debug=debug),
            'pid': self.get_job_pid(name, debug=debug),
            'properties': self.get_job_properties(name, debug=debug),
        },
    }
    ### NOTE: The timestamp is taken before the metadata is gathered.
    self._jobs_metadata[name] = {
        'timestamp': now,
        'metadata': metadata,
    }
    return metadata
Return metadata about a job.
def get_job_restart(self, name: str, debug: bool = False) -> bool:
    """
    Return whether a job restarts, i.e. whether any of `RESTART_FLAGS`
    is present in the job's sysargs.
    """
    from meerschaum.jobs._Job import RESTART_FLAGS

    sysargs = self.get_job_sysargs(name, debug=debug)
    if not sysargs:
        return False

    return any(flag in sysargs for flag in RESTART_FLAGS)
Return whether a job restarts.
def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
    """
    Return the properties for a job, omitting the `externally_managed` key.
    """
    daemon_properties = self.get_hidden_job(name, debug=debug).daemon.properties
    public_properties = {}
    for key, value in daemon_properties.items():
        if key == 'externally_managed':
            continue
        public_properties[key] = value
    return public_properties
Return the properties for a job.
def get_job_process(self, name: str, debug: bool = False):
    """
    Return a `psutil.Process` for the job's PID.

    Returns `None` if the job has no PID or the process cannot be constructed.
    """
    pid = self.get_job_pid(name, debug=debug)
    if pid is not None:
        psutil = mrsm.attempt_import('psutil')
        try:
            return psutil.Process(pid)
        except Exception:
            pass

    return None
Return a `psutil.Process` for the job's PID.
def get_job_status(self, name: str, debug: bool = False) -> str:
    """
    Return the job's service status.

    Returns
    -------
    One of `'running'`, `'paused'`, or `'stopped'`, derived from the output
    of `systemctl --user is-active` and the status of the job's process.
    """
    service_state = self.run_command(
        ['is-active', self.get_service_name(name, debug=debug)],
        as_output=True,
        debug=debug,
    )

    if service_state == 'activating':
        return 'running'

    if service_state != 'active':
        return 'stopped'

    process = self.get_job_process(name, debug=debug)
    if process is None:
        return 'stopped'

    try:
        process_state = process.status()
    except Exception:
        return 'stopped'

    ### A process status of 'stopped' is reported as a paused job.
    return 'paused' if process_state == 'stopped' else 'running'
Return the job's service status.
def get_job_pid(self, name: str, debug: bool = False) -> Union[int, None]:
    """
    Return the job's service PID.

    The PID is read from the `MainPID` property of the service.
    `None` is returned if the property is missing, `0`, or not an integer.
    """
    from meerschaum.utils.misc import is_int

    prefix = 'MainPID='
    service_name = self.get_service_name(name, debug=debug)
    output = self.run_command(
        ['show', service_name, '--property=MainPID'],
        as_output=True,
        debug=debug,
    )
    if not output.startswith(prefix):
        return None

    pid_str = output[len(prefix):]
    if pid_str != '0' and is_int(pid_str):
        return int(pid_str)

    return None
Return the job's service PID.
def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return when a job began running.

    Returns
    -------
    The service's `ActiveEnterTimestamp` as a UTC ISO 8601 string,
    or `None` if it is unavailable or cannot be parsed.
    """
    property_name = 'ActiveEnterTimestamp'
    service_name = self.get_service_name(name, debug=debug)
    output = self.run_command(
        ['show', service_name, '--property=' + property_name],
        as_output=True,
        debug=debug,
    )
    if not output.startswith(property_name):
        return None

    ### Keep everything after the last '='.
    dt_str = output.rpartition('=')[2]
    if not dt_str:
        return None

    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    try:
        began_dt = dateutil_parser.parse(dt_str)
    except Exception as e:
        warn(f"Cannot parse '{output}' as a datetime:\n{e}")
        return None

    return began_dt.astimezone(timezone.utc).isoformat()
Return when a job began running.
def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return when a job ended (when its service last became inactive).

    Returns
    -------
    The service's `InactiveEnterTimestamp` as a UTC ISO 8601 string,
    or `None` if it is unavailable or cannot be parsed.
    """
    output = self.run_command(
        [
            'show',
            self.get_service_name(name, debug=debug),
            '--property=InactiveEnterTimestamp'
        ],
        as_output=True,
        debug=debug,
    )
    if not output.startswith('InactiveEnterTimestamp'):
        return None

    ### Keep everything after the last '='; empty if no timestamp is given.
    dt_str = output.split('=')[-1]
    if not dt_str:
        return None

    dateutil_parser = mrsm.attempt_import('dateutil.parser')

    try:
        dt = dateutil_parser.parse(dt_str)
    except Exception as e:
        warn(f"Cannot parse '{output}' as a datetime:\n{e}")
        return None
    return dt.astimezone(timezone.utc).isoformat()
Return when a job ended (when its service last became inactive).
def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return when a job was paused.

    Returns
    -------
    The job's `stop_time` in ISO format if the job is currently paused
    and has a stop time, otherwise `None`.
    """
    job = self.get_hidden_job(name, debug=debug)
    is_paused = self.get_job_status(name, debug=debug) == 'paused'
    if not is_paused:
        return None

    paused_at = job.stop_time
    return paused_at.isoformat() if paused_at is not None else None
Return when a job was paused.
def get_job_result(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Return the job's result SuccessTuple.

    The result is read as JSON from the job's result file. A failure tuple
    is returned if the file does not exist or cannot be read.
    """
    result_path = self.get_result_path(name, debug=debug)
    if not result_path.exists():
        return False, "No result available."

    try:
        result = json.loads(result_path.read_text(encoding='utf-8'))
    except Exception:
        return False, f"Could not read result for Job '{name}'."

    return tuple(result)
Return the job's result SuccessTuple.
def get_job_sysargs(self, name: str, debug: bool = False) -> Union[List[str], None]:
    """
    Return the sysargs from the service file.

    The arguments are parsed from the first `ExecStart=` line: everything
    after ` -m meerschaum ` and before any `<` character.
    An empty list is returned if the file or the line is missing.
    """
    service_file_path = self.get_service_file_path(name, debug=debug)
    if not service_file_path.exists():
        return []

    with open(service_file_path, 'r', encoding='utf-8') as f:
        service_lines = list(f)

    exec_line = next(
        (line for line in service_lines if line.startswith('ExecStart=')),
        None,
    )
    if exec_line is None:
        return []

    after_module = exec_line.split(' -m meerschaum ')[-1]
    sysargs_str = after_module.split('<')[0]
    return shlex.split(sysargs_str)
Return the sysargs from the service file.
def run_command(
    self,
    command_args: List[str],
    as_output: bool = False,
    debug: bool = False,
) -> Union[SuccessTuple, str]:
    """
    Run a `systemd` command and return success.

    Parameters
    ----------
    command_args: List[str]
        The command to pass to `systemctl --user`.
        The `systemctl --user` prefix is prepended if not already present.

    as_output: bool, default False
        If `True`, return the process stdout output (stripped).
        Defaults to a `SuccessTuple`.

    Returns
    -------
    A `SuccessTuple` indicating success or a str for the process output.
    """
    from meerschaum.utils.process import run_process

    systemctl_prefix = ['systemctl', '--user']
    full_command = (
        command_args
        if command_args[:2] == systemctl_prefix
        else systemctl_prefix + command_args
    )

    if debug:
        dprint(shlex.join(full_command))

    proc = run_process(
        full_command,
        foreground=False,
        as_proc=True,
        capture_output=True,
        text=True,
    )
    stdout, _stderr = proc.communicate()
    if debug:
        dprint(f"{stdout}")

    if as_output:
        return stdout.strip()

    ### A zero exit code indicates success.
    if proc.wait() == 0:
        return True, "Success"

    return False, f"Failed to execute command `{shlex.join(full_command)}`."
Run a `systemd` command and return success.
Parameters
- command_args (List[str]): The command to pass to `systemctl --user`.
- as_output (bool, default False): If `True`, return the process stdout output. Defaults to a `SuccessTuple`.
Returns
- A `SuccessTuple` indicating success or a str for the process output.
def get_job_stdin_file(self, name: str, debug: bool = False):
    """
    Return a `StdinFile` for the job.

    Instances are created once per job name and kept on the executor.
    The parent directory of the job's socket path is created if needed.
    """
    from meerschaum.utils.daemon import StdinFile

    if '_stdin_files' not in self.__dict__:
        self._stdin_files = {}

    stdin_files = self._stdin_files
    if name in stdin_files:
        return stdin_files[name]

    fifo_path = self.get_socket_path(name, debug=debug)
    fifo_path.parent.mkdir(parents=True, exist_ok=True)
    stdin_files[name] = StdinFile(fifo_path)
    return stdin_files[name]
Return a `StdinFile` for the job.
def create_job(
    self,
    name: str,
    sysargs: List[str],
    properties: Optional[Dict[str, Any]] = None,
    debug: bool = False,
) -> SuccessTuple:
    """
    Create a job as a service to be run by `systemd`.

    Parameters
    ----------
    name: str
        The name of the job.

    sysargs: List[str]
        The arguments written to the unit file's `ExecStart`.

    properties: Optional[Dict[str, Any]], default None
        Properties passed along when building the job.

    Returns
    -------
    A `SuccessTuple` indicating whether the service was created and started.
    """
    from meerschaum.utils.misc import make_symlink

    service_name = self.get_service_name(name, debug=debug)
    unit_path = self.get_service_file_path(name, debug=debug)
    symlink_path = self.get_service_symlink_file_path(name, debug=debug)
    stdin_file = self.get_job_stdin_file(name, debug=debug)
    ### Access the handler for its side effects; the value itself is unused.
    _ = stdin_file.file_handler

    ### Init the externally_managed file.
    ### NOTE: We must write the pickle file in addition to the properties file.
    job = self.get_hidden_job(name, sysargs=sysargs, properties=properties, debug=debug)
    job._set_externally_managed()
    for write in (job.daemon.write_pickle, job.daemon.write_properties):
        write_success, write_msg = write()
        if not write_success:
            return write_success, write_msg

    unit_path.parent.mkdir(parents=True, exist_ok=True)
    unit_text = self.get_service_file_text(name, sysargs, debug=debug)
    with open(unit_path, 'w+', encoding='utf-8') as f:
        f.write(unit_text)

    symlink_success, symlink_msg = make_symlink(unit_path, symlink_path)
    if not symlink_success:
        return symlink_success, symlink_msg

    commands = [
        ['daemon-reload'],
        ['enable', service_name],
        ['start', service_name],
    ]
    ### Every command is attempted, even if an earlier one fails.
    fails = sum(
        1
        for command_list in commands
        if not self.run_command(command_list, debug=debug)[0]
    )

    ### NOTE(review): a single failed command is tolerated here; confirm this is intended.
    if fails > 1:
        return False, "Failed to reload systemd."

    return True, f"Started job '{name}' via systemd."
Create a job as a service to be run by `systemd`.
def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Start a job's service.

    The job's stop file is removed first. A paused job is resumed by sending
    `SIGCONT` to the service; otherwise the service is started.

    Returns
    -------
    The `SuccessTuple` of the `systemctl --user` command which was run.
    """
    job = self.get_hidden_job(name, debug=debug)
    job.daemon._remove_stop_file()

    status = self.get_job_status(name, debug=debug)
    if status == 'paused':
        return self.run_command(
            ['kill', '-s', 'SIGCONT', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

    return self.run_command(
        ['start', self.get_service_name(name, debug=debug)],
        debug=debug,
    )
Start a job's service.
def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Stop a job's service.

    A `'quit'` stop file is written and `SIGINT` is sent to the service.
    The job's status is then polled until it is `'stopped'` or the
    configured timeout elapses, after which `systemctl --user stop` is run.
    """
    service_name = self.get_service_name(name, debug=debug)
    job = self.get_hidden_job(name, debug=debug)
    job.daemon._write_stop_file('quit')

    ### The outcome of the signal is not inspected; the polling below decides.
    _sigint_success, _sigint_msg = self.run_command(
        ['kill', '-s', 'SIGINT', service_name],
        debug=debug,
    )

    poll_interval = get_config('jobs', 'check_timeout_interval_seconds')
    poll_start = time.perf_counter()
    timeout_seconds = get_config('jobs', 'timeout_seconds')
    while True:
        elapsed = time.perf_counter() - poll_start
        if not elapsed < timeout_seconds:
            break

        if self.get_job_status(name, debug=debug) == 'stopped':
            return True, 'Success'

        time.sleep(poll_interval)

    ### Fall back to stopping the service via systemd.
    return self.run_command(
        ['stop', self.get_service_name(name, debug=debug)],
        debug=debug,
    )
Stop a job's service.
def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Pause a job's service.

    A `'pause'` stop file is written and `SIGSTOP` is sent to the service.
    """
    job = self.get_hidden_job(name, debug=debug)
    job.daemon._write_stop_file('pause')

    service_name = self.get_service_name(name, debug=debug)
    pause_command = ['kill', '-s', 'SIGSTOP', service_name]
    return self.run_command(pause_command, debug=debug)
Pause a job's service.
def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Delete a job's service.

    Unless the job is flagged `delete_after_completion`, the service is first
    stopped and disabled. The job's directory, unit file, symlink, FIFO,
    result file, and log files are then removed before reloading systemd.

    Returns
    -------
    A `SuccessTuple`: a failure tuple if a file could not be removed,
    otherwise the result of `systemctl --user daemon-reload`.
    """
    from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
    job = self.get_hidden_job(name, debug=debug)

    if not job.delete_after_completion:
        _ = self.stop_job(name, debug=debug)
        _ = self.run_command(
            ['disable', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

    service_job_path = self.get_service_job_path(name, debug=debug)
    try:
        if service_job_path.exists():
            shutil.rmtree(service_job_path)
    except Exception as e:
        warn(e)
        return False, str(e)

    service_logs_path = self.get_service_logs_path(name, debug=debug)
    logs_prefix = service_logs_path.name + '.'
    logs_paths = [
        (SYSTEMD_LOGS_RESOURCES_PATH / file_name)
        for file_name in os.listdir(SYSTEMD_LOGS_RESOURCES_PATH)
        if file_name.startswith(logs_prefix)
    ]
    paths = [
        self.get_service_file_path(name, debug=debug),
        self.get_service_symlink_file_path(name, debug=debug),
        self.get_socket_path(name, debug=debug),
        self.get_result_path(name, debug=debug),
    ] + logs_paths

    for path in paths:
        ### NOTE: `exists()` follows symlinks, so a symlink whose target was
        ###       removed above reports `False`; check `is_symlink()` as well
        ###       so the dangling service symlink is not left behind.
        if not (path.exists() or path.is_symlink()):
            continue

        try:
            path.unlink()
        except Exception as e:
            warn(e)
            return False, str(e)

    _ = job.delete()

    return self.run_command(['daemon-reload'], debug=debug)
Delete a job's service.
def get_logs(self, name: str, debug: bool = False) -> str:
    """
    Return a job's logs, read from the job's rotating log file.
    """
    logs_text = self.get_job_rotating_file(name, debug=debug).read()
    return logs_text
Return a job's logs, read from the job's rotating log file.
def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
    """
    Return a job's stop time (the `stop_time` attribute of the job).
    """
    return self.get_hidden_job(name, debug=debug).stop_time
Return a job's stop time.
def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
    """
    Return whether a job is blocking on stdin.

    This checks for a `.block` file alongside the job's socket path.
    """
    socket_path = self.get_socket_path(name, debug=debug)
    block_file_name = socket_path.name + '.block'
    return (socket_path.parent / block_file_name).exists()
Return whether a job is blocking on stdin.
def get_job_prompt_kwargs(self, name: str, debug: bool = False) -> Dict[str, Any]:
    """
    Return the kwargs to the blocking prompt, as reported by the job.
    """
    return self.get_hidden_job(name, debug=debug).get_prompt_kwargs(debug=debug)
Return the kwargs to the blocking prompt.
def get_job_rotating_file(self, name: str, debug: bool = False):
    """
    Return a `RotatingFile` for the job's log output,
    built from the job's service logs path.
    """
    from meerschaum.utils.daemon import RotatingFile

    return RotatingFile(self.get_service_logs_path(name, debug=debug))
Return a `RotatingFile` for the job's log output.
async def monitor_logs_async(
    self,
    name: str,
    *args,
    debug: bool = False,
    **kwargs
):
    """
    Monitor a job's output.

    The systemd logs directory, the job's rotating log file, and the job's
    stdin file are injected into `kwargs` (overriding any existing values)
    before delegating to the job's `monitor_logs_async()`.
    """
    from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH

    job = self.get_hidden_job(name, debug=debug)
    rotating_file = self.get_job_rotating_file(name, debug=debug)
    stdin_file = self.get_job_stdin_file(name, debug=debug)

    kwargs['_logs_path'] = SYSTEMD_LOGS_RESOURCES_PATH
    kwargs['_log'] = rotating_file
    kwargs['_stdin_file'] = stdin_file
    kwargs['debug'] = debug

    await job.monitor_logs_async(*args, **kwargs)
Monitor a job's output.