meerschaum.jobs.systemd
Manage `meerschaum.jobs.Job` via `systemd`.
#! /usr/bin/env python3
# vim:fenc=utf-8

"""
Manage `meerschaum.jobs.Job` via `systemd`.
"""

import os
import pathlib
import shlex
import sys
import asyncio
import json
import time
import traceback
import shutil
from datetime import datetime, timezone
from functools import partial

import meerschaum as mrsm
from meerschaum.jobs import Job, Executor, make_executor
from meerschaum.utils.typing import Dict, Any, List, SuccessTuple, Union, Optional, Callable
from meerschaum.config import get_config
from meerschaum.config.static import STATIC_CONFIG
from meerschaum.utils.warnings import warn, dprint
from meerschaum._internal.arguments._parse_arguments import parse_arguments

JOB_METADATA_CACHE_SECONDS: int = STATIC_CONFIG['api']['jobs']['metadata_cache_seconds']


@make_executor
class SystemdExecutor(Executor):
    """
    Execute Meerschaum jobs via `systemd`.
    """

    def get_job_names(self, debug: bool = False) -> List[str]:
        """
        Return a list of existing jobs, including hidden ones.

        Names are derived from the `mrsm-<name>.service` entries in the systemd
        user resources directory. Broken symlinks are skipped, and a missing
        directory yields an empty list.
        """
        from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH
        prefix, suffix = 'mrsm-', '.service'
        try:
            service_names = os.listdir(SYSTEMD_USER_RESOURCES_PATH)
        except FileNotFoundError:
            return []

        return [
            service_name[len(prefix):-len(suffix)]
            for service_name in service_names
            if (
                service_name.startswith(prefix)
                and service_name.endswith(suffix)
                ### Check for broken symlinks.
                and (SYSTEMD_USER_RESOURCES_PATH / service_name).exists()
            )
        ]

    def get_job_exists(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job exists.
        """
        user_services = self.get_job_names(debug=debug)
        if debug:
            dprint(f'Existing services: {user_services}')
        return name in user_services

    def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
        """
        Return a dictionary of `systemd` Jobs (including hidden jobs).
        """
        return {
            name: Job(name, executor_keys=str(self))
            for name in self.get_job_names(debug=debug)
        }

    def get_service_name(self, name: str, debug: bool = False) -> str:
        """
        Return a job's service name (spaces are replaced with dashes).
        """
        return f"mrsm-{name.replace(' ', '-')}.service"

    def get_service_job_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path for the job's files under the root directory.
        """
        from meerschaum.config.paths import SYSTEMD_JOBS_RESOURCES_PATH
        return SYSTEMD_JOBS_RESOURCES_PATH / name

    def get_service_symlink_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to where to create the service symlink.
        """
        from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH
        return SYSTEMD_USER_RESOURCES_PATH / self.get_service_name(name, debug=debug)

    def get_service_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to a Job's service file.
        """
        return (
            self.get_service_job_path(name, debug=debug)
            / self.get_service_name(name, debug=debug)
        )

    def get_service_logs_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to direct service logs to.
        """
        from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
        return SYSTEMD_LOGS_RESOURCES_PATH / (self.get_service_name(name, debug=debug) + '.log')

    def get_socket_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to the FIFO file.
        """
        return (
            self.get_service_job_path(name, debug=debug)
            / (self.get_service_name(name, debug=debug) + '.stdin')
        )

    def get_result_path(self, name: str, debug: bool = False) -> pathlib.Path:
        """
        Return the path to the result file.
        """
        return (
            self.get_service_job_path(name, debug=debug)
            / (self.get_service_name(name, debug=debug) + '.result.json')
        )

    def get_service_file_text(self, name: str, sysargs: List[str], debug: bool = False) -> str:
        """
        Return the contents of the unit file.

        Parameters
        ----------
        name: str
            The job's name.

        sysargs: List[str]
            The Meerschaum arguments to run via `python -m meerschaum`.

        Returns
        -------
        The text of the systemd unit file, including `Environment=` lines for
        the Meerschaum environment variables present in the current process,
        the service-specific paths, and the job's own `env`.
        """
        service_logs_path = self.get_service_logs_path(name, debug=debug)
        socket_path = self.get_socket_path(name, debug=debug)
        result_path = self.get_result_path(name, debug=debug)
        job = self.get_hidden_job(name, debug=debug)

        sysargs_str = shlex.join(sysargs)
        exec_str = f'{sys.executable} -m meerschaum {sysargs_str}'
        mrsm_env_var_names = set(STATIC_CONFIG['environment'].values())
        mrsm_env_vars = {
            key: val
            for key, val in os.environ.items()
            if key in mrsm_env_var_names
        }

        ### Add new environment variables for the service process.
        mrsm_env_vars.update({
            STATIC_CONFIG['environment']['daemon_id']: name,
            STATIC_CONFIG['environment']['systemd_log_path']: service_logs_path.as_posix(),
            STATIC_CONFIG['environment']['systemd_result_path']: result_path.as_posix(),
            STATIC_CONFIG['environment']['systemd_stdin_path']: socket_path.as_posix(),
            ### NOTE: A plain string, not a tuple: `str(val)` below would otherwise
            ### render `('1',)` into the unit file.
            STATIC_CONFIG['environment']['systemd_delete_job']: (
                '1'
                if job.delete_after_completion
                else '0'
            ),
        })

        ### Allow for user-defined environment variables.
        mrsm_env_vars.update(job.env)

        environment_lines = [
            f"Environment={key}={shlex.quote(str(val))}"
            for key, val in mrsm_env_vars.items()
        ]
        environment_str = '\n'.join(environment_lines)
        service_name = self.get_service_name(name, debug=debug)

        service_text = (
            "[Unit]\n"
            f"Description=Run the job '{name}'\n"
            "\n"
            "[Service]\n"
            f"ExecStart={exec_str}\n"
            "KillSignal=SIGTERM\n"
            "Restart=always\n"
            "RestartPreventExitStatus=0\n"
            f"SyslogIdentifier={service_name}\n"
            f"{environment_str}\n"
            "\n"
            "[Install]\n"
            "WantedBy=default.target\n"
        )
        return service_text

    def get_socket_file_text(self, name: str, debug: bool = False) -> str:
        """
        Return the contents of the socket file.
        """
        service_name = self.get_service_name(name, debug=debug)
        socket_path = self.get_socket_path(name, debug=debug)
        socket_text = (
            "[Unit]\n"
            f"BindsTo={service_name}\n"
            "\n"
            "[Socket]\n"
            f"ListenFIFO={socket_path.as_posix()}\n"
            "FileDescriptorName=stdin\n"
            "RemoveOnStop=true\n"
            "SocketMode=0660\n"
        )
        return socket_text

    def get_hidden_job(
        self,
        name: str,
        sysargs: Optional[List[str]] = None,
        properties: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ):
        """
        Return the hidden "sister" job to store a job's parameters.

        The sister job uses the `local` executor but is marked as externally
        managed, with its status and result hooks delegating back to this executor.
        """
        job = Job(
            name,
            sysargs,
            executor_keys='local',
            _properties=properties,
            _rotating_log=self.get_job_rotating_file(name, debug=debug),
            _stdin_file=self.get_job_stdin_file(name, debug=debug),
            _status_hook=partial(self.get_job_status, name),
            _result_hook=partial(self.get_job_result, name),
            _externally_managed=True,
        )
        return job

    def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return metadata about a job.

        Results are cached per job for `JOB_METADATA_CACHE_SECONDS`
        (measured with `time.perf_counter()`).
        """
        now = time.perf_counter()

        if '_jobs_metadata' not in self.__dict__:
            self._jobs_metadata: Dict[str, Any] = {}

        if name in self._jobs_metadata:
            ts = self._jobs_metadata[name].get('timestamp', None)

            if ts is not None and (now - ts) <= JOB_METADATA_CACHE_SECONDS:
                if debug:
                    dprint(f"Returning cached metadata for job '{name}'.")
                return self._jobs_metadata[name]['metadata']

        metadata = {
            'sysargs': self.get_job_sysargs(name, debug=debug),
            'result': self.get_job_result(name, debug=debug),
            'restart': self.get_job_restart(name, debug=debug),
            'daemon': {
                'status': self.get_job_status(name, debug=debug),
                'pid': self.get_job_pid(name, debug=debug),
                'properties': self.get_job_properties(name, debug=debug),
            },
        }
        self._jobs_metadata[name] = {
            'timestamp': now,
            'metadata': metadata,
        }
        return metadata

    def get_job_restart(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job restarts (i.e. its sysargs contain a restart flag).
        """
        from meerschaum.jobs._Job import RESTART_FLAGS
        sysargs = self.get_job_sysargs(name, debug=debug)
        if not sysargs:
            return False

        return any(flag in sysargs for flag in RESTART_FLAGS)

    def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
        """
        Return the properties for a job (excluding the `externally_managed` key).
        """
        job = self.get_hidden_job(name, debug=debug)
        return {
            k: v for k, v in job.daemon.properties.items()
            if k != 'externally_managed'
        }

    def get_job_process(self, name: str, debug: bool = False):
        """
        Return a `psutil.Process` for the job's PID, or `None` if unavailable.
        """
        pid = self.get_job_pid(name, debug=debug)
        if pid is None:
            return None

        psutil = mrsm.attempt_import('psutil')
        try:
            return psutil.Process(pid)
        except Exception:
            return None

    def get_job_status(self, name: str, debug: bool = False) -> str:
        """
        Return the job's service status: `'running'`, `'paused'`, or `'stopped'`.
        """
        output = self.run_command(
            ['is-active', self.get_service_name(name, debug=debug)],
            as_output=True,
            debug=debug,
        )

        if output == 'activating':
            return 'running'

        if output == 'active':
            process = self.get_job_process(name, debug=debug)
            if process is None:
                return 'stopped'

            try:
                ### A SIGSTOP'd process (see `pause_job`) reports as 'stopped'.
                if process.status() == 'stopped':
                    return 'paused'
            except Exception:
                return 'stopped'

            return 'running'

        return 'stopped'

    def get_job_pid(self, name: str, debug: bool = False) -> Union[int, None]:
        """
        Return the job's service PID, or `None` if the service has no main process.
        """
        from meerschaum.utils.misc import is_int

        output = self.run_command(
            [
                'show',
                self.get_service_name(name, debug=debug),
                '--property=MainPID',
            ],
            as_output=True,
            debug=debug,
        )
        if not output.startswith('MainPID='):
            return None

        pid_str = output[len('MainPID='):]
        if pid_str == '0':
            return None

        if is_int(pid_str):
            return int(pid_str)

        return None

    def _get_service_timestamp(
        self,
        name: str,
        property_name: str,
        debug: bool = False,
    ) -> Union[str, None]:
        """
        Return a timestamp property of the job's service as a UTC ISO-8601 string.

        Parameters
        ----------
        name: str
            The job's name.

        property_name: str
            The `systemctl show` property to read, e.g. `ActiveEnterTimestamp`.

        Returns
        -------
        The parsed timestamp converted to UTC in ISO format, or `None` if the
        property is missing, empty, or cannot be parsed.
        """
        output = self.run_command(
            [
                'show',
                self.get_service_name(name, debug=debug),
                f'--property={property_name}',
            ],
            as_output=True,
            debug=debug,
        )
        prefix = f'{property_name}='
        if not output.startswith(prefix):
            return None

        dt_str = output[len(prefix):]
        if not dt_str:
            return None

        dateutil_parser = mrsm.attempt_import('dateutil.parser')
        try:
            dt = dateutil_parser.parse(dt_str)
        except Exception as e:
            warn(f"Cannot parse '{output}' as a datetime:\n{e}")
            return None

        return dt.astimezone(timezone.utc).isoformat()

    def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job began running.
        """
        return self._get_service_timestamp(name, 'ActiveEnterTimestamp', debug=debug)

    def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job stopped running.
        """
        return self._get_service_timestamp(name, 'InactiveEnterTimestamp', debug=debug)

    def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
        """
        Return when a job was paused, or `None` if it is not currently paused.
        """
        job = self.get_hidden_job(name, debug=debug)
        if self.get_job_status(name, debug=debug) != 'paused':
            return None

        stop_time = job.stop_time
        if stop_time is None:
            return None

        return stop_time.isoformat()

    def get_job_result(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Return the job's result SuccessTuple, read from the job's result JSON file.
        """
        result_path = self.get_result_path(name, debug=debug)
        if not result_path.exists():
            return False, "No result available."

        try:
            with open(result_path, 'r', encoding='utf-8') as f:
                result = json.load(f)
        except Exception:
            return False, f"Could not read result for Job '{name}'."

        return tuple(result)

    def get_job_sysargs(self, name: str, debug: bool = False) -> Union[List[str], None]:
        """
        Return the sysargs from the service file's `ExecStart=` line.
        """
        service_file_path = self.get_service_file_path(name, debug=debug)
        if not service_file_path.exists():
            return []

        with open(service_file_path, 'r', encoding='utf-8') as f:
            service_lines = f.readlines()

        for line in service_lines:
            if line.startswith('ExecStart='):
                sysargs_str = line.split(' -m meerschaum ')[-1].split('<')[0]
                return shlex.split(sysargs_str)

        return []

    def run_command(
        self,
        command_args: List[str],
        as_output: bool = False,
        debug: bool = False,
    ) -> Union[SuccessTuple, str]:
        """
        Run a `systemd` command and return success.

        Parameters
        ----------
        command_args: List[str]
            The command to pass to `systemctl --user`.

        as_output: bool, default False
            If `True`, return the process stdout output.
            Defaults to a `SuccessTuple`.

        Returns
        -------
        A `SuccessTuple` indicating success or a str for the process output.
        """
        from meerschaum.utils.process import run_process

        if command_args[:2] != ['systemctl', '--user']:
            command_args = ['systemctl', '--user'] + command_args

        if debug:
            dprint(shlex.join(command_args))

        proc = run_process(
            command_args,
            foreground=False,
            as_proc=True,
            capture_output=True,
            text=True,
        )
        stdout, stderr = proc.communicate()
        if debug:
            dprint(f"{stdout}")

        if as_output:
            return stdout.strip()

        command_success = proc.wait() == 0
        command_msg = (
            "Success"
            if command_success
            else f"Failed to execute command `{shlex.join(command_args)}`."
        )
        return command_success, command_msg

    def get_job_stdin_file(self, name: str, debug: bool = False):
        """
        Return a `StdinFile` for the job (cached per job name on this executor).
        """
        from meerschaum.utils.daemon import StdinFile
        if '_stdin_files' not in self.__dict__:
            self._stdin_files: Dict[str, StdinFile] = {}

        if name not in self._stdin_files:
            socket_path = self.get_socket_path(name, debug=debug)
            socket_path.parent.mkdir(parents=True, exist_ok=True)
            self._stdin_files[name] = StdinFile(socket_path)

        return self._stdin_files[name]

    def create_job(
        self,
        name: str,
        sysargs: List[str],
        properties: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ) -> SuccessTuple:
        """
        Create a job as a service to be run by `systemd`.

        Writes the hidden job's pickle and properties files, writes the unit
        file, symlinks it into the user resources directory, then runs
        `daemon-reload`, `enable`, and `start`.
        """
        from meerschaum.utils.misc import make_symlink
        service_name = self.get_service_name(name, debug=debug)
        service_file_path = self.get_service_file_path(name, debug=debug)
        service_symlink_file_path = self.get_service_symlink_file_path(name, debug=debug)
        socket_stdin = self.get_job_stdin_file(name, debug=debug)
        _ = socket_stdin.file_handler

        ### Init the externally_managed file.
        ### NOTE: We must write the pickle file in addition to the properties file.
        job = self.get_hidden_job(name, sysargs=sysargs, properties=properties, debug=debug)
        job._set_externally_managed()
        pickle_success, pickle_msg = job.daemon.write_pickle()
        if not pickle_success:
            return pickle_success, pickle_msg
        properties_success, properties_msg = job.daemon.write_properties()
        if not properties_success:
            return properties_success, properties_msg

        service_file_path.parent.mkdir(parents=True, exist_ok=True)

        with open(service_file_path, 'w+', encoding='utf-8') as f:
            f.write(self.get_service_file_text(name, sysargs, debug=debug))

        symlink_success, symlink_msg = make_symlink(service_file_path, service_symlink_file_path)
        if not symlink_success:
            return symlink_success, symlink_msg

        commands = [
            ['daemon-reload'],
            ['enable', service_name],
            ['start', service_name],
        ]

        fails = 0
        for command_list in commands:
            command_success, command_msg = self.run_command(command_list, debug=debug)
            if not command_success:
                fails += 1

        ### NOTE(review): A single failed command is tolerated here; confirm
        ### whether this threshold is intentional.
        if fails > 1:
            return False, "Failed to reload systemd."

        return True, f"Started job '{name}' via systemd."

    def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Start a job's service, or resume it with SIGCONT if it is paused.
        """
        job = self.get_hidden_job(name, debug=debug)
        job.daemon._remove_stop_file()

        status = self.get_job_status(name, debug=debug)
        if status == 'paused':
            return self.run_command(
                ['kill', '-s', 'SIGCONT', self.get_service_name(name, debug=debug)],
                debug=debug,
            )

        return self.run_command(
            ['start', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

    def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Stop a job's service.

        Sends SIGINT and polls the status until the configured timeout elapses,
        then falls back to `systemctl stop`.
        """
        job = self.get_hidden_job(name, debug=debug)
        job.daemon._write_stop_file('quit')
        sigint_success, sigint_msg = self.run_command(
            ['kill', '-s', 'SIGINT', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

        check_timeout_interval = get_config('jobs', 'check_timeout_interval_seconds')
        loop_start = time.perf_counter()
        timeout_seconds = get_config('jobs', 'timeout_seconds')
        while (time.perf_counter() - loop_start) < timeout_seconds:
            if self.get_job_status(name, debug=debug) == 'stopped':
                return True, 'Success'

            time.sleep(check_timeout_interval)

        return self.run_command(
            ['stop', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

    def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Pause a job's service by sending it SIGSTOP.
        """
        job = self.get_hidden_job(name, debug=debug)
        job.daemon._write_stop_file('pause')
        return self.run_command(
            ['kill', '-s', 'SIGSTOP', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

    def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
        """
        Delete a job's service.

        Stops and disables the service (unless the job deletes itself after
        completion), removes the job's directory, unit symlink, and rotated log
        files, deletes the hidden job, and reloads systemd.
        """
        from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
        job = self.get_hidden_job(name, debug=debug)

        if not job.delete_after_completion:
            _ = self.stop_job(name, debug=debug)
            _ = self.run_command(
                ['disable', self.get_service_name(name, debug=debug)],
                debug=debug,
            )

        service_job_path = self.get_service_job_path(name, debug=debug)
        try:
            if service_job_path.exists():
                shutil.rmtree(service_job_path)
        except Exception as e:
            warn(e)
            return False, str(e)

        service_logs_path = self.get_service_logs_path(name, debug=debug)
        logs_prefix = service_logs_path.name + '.'
        try:
            logs_filenames = os.listdir(SYSTEMD_LOGS_RESOURCES_PATH)
        except FileNotFoundError:
            logs_filenames = []
        logs_paths = [
            SYSTEMD_LOGS_RESOURCES_PATH / filename
            for filename in logs_filenames
            if filename.startswith(logs_prefix)
        ]
        paths = [
            self.get_service_file_path(name, debug=debug),
            self.get_service_symlink_file_path(name, debug=debug),
            self.get_socket_path(name, debug=debug),
            self.get_result_path(name, debug=debug),
        ] + logs_paths

        for path in paths:
            if path.exists():
                try:
                    path.unlink()
                except Exception as e:
                    warn(e)
                    return False, str(e)

        _ = job.delete()

        return self.run_command(['daemon-reload'], debug=debug)

    def get_logs(self, name: str, debug: bool = False) -> str:
        """
        Return a job's logs as read from its rotating log file.
        """
        rotating_file = self.get_job_rotating_file(name, debug=debug)
        return rotating_file.read()

    def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
        """
        Return a job's stop time.
        """
        job = self.get_hidden_job(name, debug=debug)
        return job.stop_time

    def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
        """
        Return whether a job is blocking on stdin (i.e. its `.block` file exists).
        """
        socket_path = self.get_socket_path(name, debug=debug)
        blocking_path = socket_path.parent / (socket_path.name + '.block')
        return blocking_path.exists()

    def get_job_rotating_file(self, name: str, debug: bool = False):
        """
        Return a `RotatingFile` for the job's log output.
        """
        from meerschaum.utils.daemon import RotatingFile
        service_logs_path = self.get_service_logs_path(name, debug=debug)
        return RotatingFile(service_logs_path)

    async def monitor_logs_async(
        self,
        name: str,
        *args,
        debug: bool = False,
        **kwargs
    ):
        """
        Monitor a job's output.
        """
        from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
        job = self.get_hidden_job(name, debug=debug)
        kwargs.update({
            '_logs_path': SYSTEMD_LOGS_RESOURCES_PATH,
            '_log': self.get_job_rotating_file(name, debug=debug),
            '_stdin_file': self.get_job_stdin_file(name, debug=debug),
            'debug': debug,
        })
        await job.monitor_logs_async(*args, **kwargs)

    def monitor_logs(self, *args, **kwargs):
        """
        Monitor a job's output (blocking wrapper around `monitor_logs_async`).
        """
        asyncio.run(self.monitor_logs_async(*args, **kwargs))
32@make_executor 33class SystemdExecutor(Executor): 34 """ 35 Execute Meerschaum jobs via `systemd`. 36 """ 37 38 def get_job_names(self, debug: bool = False) -> List[str]: 39 """ 40 Return a list of existing jobs, including hidden ones. 41 """ 42 from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH 43 return [ 44 service_name[len('mrsm-'):(-1 * len('.service'))] 45 for service_name in os.listdir(SYSTEMD_USER_RESOURCES_PATH) 46 if ( 47 service_name.startswith('mrsm-') 48 and service_name.endswith('.service') 49 ### Check for broken symlinks. 50 and (SYSTEMD_USER_RESOURCES_PATH / service_name).exists() 51 ) 52 ] 53 54 def get_job_exists(self, name: str, debug: bool = False) -> bool: 55 """ 56 Return whether a job exists. 57 """ 58 user_services = self.get_job_names(debug=debug) 59 if debug: 60 dprint(f'Existing services: {user_services}') 61 return name in user_services 62 63 def get_jobs(self, debug: bool = False) -> Dict[str, Job]: 64 """ 65 Return a dictionary of `systemd` Jobs (including hidden jobs). 66 """ 67 user_services = self.get_job_names(debug=debug) 68 jobs = { 69 name: Job(name, executor_keys=str(self)) 70 for name in user_services 71 } 72 return { 73 name: job 74 for name, job in jobs.items() 75 } 76 77 def get_service_name(self, name: str, debug: bool = False) -> str: 78 """ 79 Return a job's service name. 80 """ 81 return f"mrsm-{name.replace(' ', '-')}.service" 82 83 def get_service_job_path(self, name: str, debug: bool = False) -> pathlib.Path: 84 """ 85 Return the path for the job's files under the root directory. 86 """ 87 from meerschaum.config.paths import SYSTEMD_JOBS_RESOURCES_PATH 88 return SYSTEMD_JOBS_RESOURCES_PATH / name 89 90 def get_service_symlink_file_path(self, name: str, debug: bool = False) -> pathlib.Path: 91 """ 92 Return the path to where to create the service symlink. 
93 """ 94 from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH 95 return SYSTEMD_USER_RESOURCES_PATH / self.get_service_name(name, debug=debug) 96 97 def get_service_file_path(self, name: str, debug: bool = False) -> pathlib.Path: 98 """ 99 Return the path to a Job's service file. 100 """ 101 return ( 102 self.get_service_job_path(name, debug=debug) 103 / self.get_service_name(name, debug=debug) 104 ) 105 106 def get_service_logs_path(self, name: str, debug: bool = False) -> pathlib.Path: 107 """ 108 Return the path to direct service logs to. 109 """ 110 from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH 111 return SYSTEMD_LOGS_RESOURCES_PATH / (self.get_service_name(name, debug=debug) + '.log') 112 113 def get_socket_path(self, name: str, debug: bool = False) -> pathlib.Path: 114 """ 115 Return the path to the FIFO file. 116 """ 117 return ( 118 self.get_service_job_path(name, debug=debug) 119 / (self.get_service_name(name, debug=debug) + '.stdin') 120 ) 121 122 def get_result_path(self, name: str, debug: bool = False) -> pathlib.Path: 123 """ 124 Return the path to the result file. 125 """ 126 return ( 127 self.get_service_job_path(name, debug=debug) 128 / (self.get_service_name(name, debug=debug) + '.result.json') 129 ) 130 131 def get_service_file_text(self, name: str, sysargs: List[str], debug: bool = False) -> str: 132 """ 133 Return the contents of the unit file. 
134 """ 135 service_logs_path = self.get_service_logs_path(name, debug=debug) 136 socket_path = self.get_socket_path(name, debug=debug) 137 result_path = self.get_result_path(name, debug=debug) 138 job = self.get_hidden_job(name, debug=debug) 139 140 sysargs_str = shlex.join(sysargs) 141 exec_str = f'{sys.executable} -m meerschaum {sysargs_str}' 142 mrsm_env_var_names = set([var for var in STATIC_CONFIG['environment'].values()]) 143 mrsm_env_vars = { 144 key: val 145 for key, val in os.environ.items() 146 if key in mrsm_env_var_names 147 } 148 149 ### Add new environment variables for the service process. 150 mrsm_env_vars.update({ 151 STATIC_CONFIG['environment']['daemon_id']: name, 152 STATIC_CONFIG['environment']['systemd_log_path']: service_logs_path.as_posix(), 153 STATIC_CONFIG['environment']['systemd_result_path']: result_path.as_posix(), 154 STATIC_CONFIG['environment']['systemd_stdin_path']: socket_path.as_posix(), 155 STATIC_CONFIG['environment']['systemd_delete_job']: ( 156 '1' 157 if job.delete_after_completion 158 else '0', 159 ), 160 }) 161 162 ### Allow for user-defined environment variables. 163 mrsm_env_vars.update(job.env) 164 165 environment_lines = [ 166 f"Environment={key}={shlex.quote(str(val))}" 167 for key, val in mrsm_env_vars.items() 168 ] 169 environment_str = '\n'.join(environment_lines) 170 service_name = self.get_service_name(name, debug=debug) 171 172 service_text = ( 173 "[Unit]\n" 174 f"Description=Run the job '{name}'\n" 175 "\n" 176 "[Service]\n" 177 f"ExecStart={exec_str}\n" 178 "KillSignal=SIGTERM\n" 179 "Restart=always\n" 180 "RestartPreventExitStatus=0\n" 181 f"SyslogIdentifier={service_name}\n" 182 f"{environment_str}\n" 183 "\n" 184 "[Install]\n" 185 "WantedBy=default.target\n" 186 ) 187 return service_text 188 189 def get_socket_file_text(self, name: str, debug: bool = False) -> str: 190 """ 191 Return the contents of the socket file. 
192 """ 193 service_name = self.get_service_name(name, debug=debug) 194 socket_path = self.get_socket_path(name, debug=debug) 195 socket_text = ( 196 "[Unit]\n" 197 f"BindsTo={service_name}\n" 198 "\n" 199 "[Socket]\n" 200 f"ListenFIFO={socket_path.as_posix()}\n" 201 "FileDescriptorName=stdin\n" 202 "RemoveOnStop=true\n" 203 "SocketMode=0660\n" 204 ) 205 return socket_text 206 207 def get_hidden_job( 208 self, 209 name: str, 210 sysargs: Optional[List[str]] = None, 211 properties: Optional[Dict[str, Any]] = None, 212 debug: bool = False, 213 ): 214 """ 215 Return the hidden "sister" job to store a job's parameters. 216 """ 217 job = Job( 218 name, 219 sysargs, 220 executor_keys='local', 221 _properties=properties, 222 _rotating_log=self.get_job_rotating_file(name, debug=debug), 223 _stdin_file=self.get_job_stdin_file(name, debug=debug), 224 _status_hook=partial(self.get_job_status, name), 225 _result_hook=partial(self.get_job_result, name), 226 _externally_managed=True, 227 ) 228 return job 229 230 231 def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]: 232 """ 233 Return metadata about a job. 
234 """ 235 now = time.perf_counter() 236 237 if '_jobs_metadata' not in self.__dict__: 238 self._jobs_metadata: Dict[str, Any] = {} 239 240 if name in self._jobs_metadata: 241 ts = self._jobs_metadata[name].get('timestamp', None) 242 243 if ts is not None and (now - ts) <= JOB_METADATA_CACHE_SECONDS: 244 if debug: 245 dprint(f"Retuning cached metadata for job '{name}'.") 246 return self._jobs_metadata[name]['metadata'] 247 248 metadata = { 249 'sysargs': self.get_job_sysargs(name, debug=debug), 250 'result': self.get_job_result(name, debug=debug), 251 'restart': self.get_job_restart(name, debug=debug), 252 'daemon': { 253 'status': self.get_job_status(name, debug=debug), 254 'pid': self.get_job_pid(name, debug=debug), 255 'properties': self.get_job_properties(name, debug=debug), 256 }, 257 } 258 self._jobs_metadata[name] = { 259 'timestamp': now, 260 'metadata': metadata, 261 } 262 return metadata 263 264 def get_job_restart(self, name: str, debug: bool = False) -> bool: 265 """ 266 Return whether a job restarts. 267 """ 268 from meerschaum.jobs._Job import RESTART_FLAGS 269 sysargs = self.get_job_sysargs(name, debug=debug) 270 if not sysargs: 271 return False 272 273 for flag in RESTART_FLAGS: 274 if flag in sysargs: 275 return True 276 277 return False 278 279 def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]: 280 """ 281 Return the properties for a job. 282 """ 283 job = self.get_hidden_job(name, debug=debug) 284 return { 285 k: v for k, v in job.daemon.properties.items() 286 if k != 'externally_managed' 287 } 288 289 def get_job_process(self, name: str, debug: bool = False): 290 """ 291 Return a `psutil.Process` for the job's PID. 
292 """ 293 pid = self.get_job_pid(name, debug=debug) 294 if pid is None: 295 return None 296 297 psutil = mrsm.attempt_import('psutil') 298 try: 299 return psutil.Process(pid) 300 except Exception: 301 return None 302 303 def get_job_status(self, name: str, debug: bool = False) -> str: 304 """ 305 Return the job's service status. 306 """ 307 output = self.run_command( 308 ['is-active', self.get_service_name(name, debug=debug)], 309 as_output=True, 310 debug=debug, 311 ) 312 313 if output == 'activating': 314 return 'running' 315 316 if output == 'active': 317 process = self.get_job_process(name, debug=debug) 318 if process is None: 319 return 'stopped' 320 321 try: 322 if process.status() == 'stopped': 323 return 'paused' 324 except Exception: 325 return 'stopped' 326 327 return 'running' 328 329 return 'stopped' 330 331 def get_job_pid(self, name: str, debug: bool = False) -> Union[int, None]: 332 """ 333 Return the job's service PID. 334 """ 335 from meerschaum.utils.misc import is_int 336 337 output = self.run_command( 338 [ 339 'show', 340 self.get_service_name(name, debug=debug), 341 '--property=MainPID', 342 ], 343 as_output=True, 344 debug=debug, 345 ) 346 if not output.startswith('MainPID='): 347 return None 348 349 pid_str = output[len('MainPID='):] 350 if pid_str == '0': 351 return None 352 353 if is_int(pid_str): 354 return int(pid_str) 355 356 return None 357 358 def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]: 359 """ 360 Return when a job began running. 
361 """ 362 output = self.run_command( 363 [ 364 'show', 365 self.get_service_name(name, debug=debug), 366 '--property=ActiveEnterTimestamp' 367 ], 368 as_output=True, 369 debug=debug, 370 ) 371 if not output.startswith('ActiveEnterTimestamp'): 372 return None 373 374 dt_str = output.split('=')[-1] 375 if not dt_str: 376 return None 377 378 dateutil_parser = mrsm.attempt_import('dateutil.parser') 379 try: 380 dt = dateutil_parser.parse(dt_str) 381 except Exception as e: 382 warn(f"Cannot parse '{output}' as a datetime:\n{e}") 383 return None 384 385 return dt.astimezone(timezone.utc).isoformat() 386 387 def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]: 388 """ 389 Return when a job began running. 390 """ 391 output = self.run_command( 392 [ 393 'show', 394 self.get_service_name(name, debug=debug), 395 '--property=InactiveEnterTimestamp' 396 ], 397 as_output=True, 398 debug=debug, 399 ) 400 if not output.startswith('InactiveEnterTimestamp'): 401 return None 402 403 dt_str = output.split('=')[-1] 404 if not dt_str: 405 return None 406 407 dateutil_parser = mrsm.attempt_import('dateutil.parser') 408 409 try: 410 dt = dateutil_parser.parse(dt_str) 411 except Exception as e: 412 warn(f"Cannot parse '{output}' as a datetime:\n{e}") 413 return None 414 return dt.astimezone(timezone.utc).isoformat() 415 416 def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]: 417 """ 418 Return when a job was paused. 419 """ 420 job = self.get_hidden_job(name, debug=debug) 421 if self.get_job_status(name, debug=debug) != 'paused': 422 return None 423 424 stop_time = job.stop_time 425 if stop_time is None: 426 return None 427 428 return stop_time.isoformat() 429 430 def get_job_result(self, name: str, debug: bool = False) -> SuccessTuple: 431 """ 432 Return the job's result SuccessTuple. 433 """ 434 result_path = self.get_result_path(name, debug=debug) 435 if not result_path.exists(): 436 return False, "No result available." 
437 438 try: 439 with open(result_path, 'r', encoding='utf-8') as f: 440 result = json.load(f) 441 except Exception: 442 return False, f"Could not read result for Job '{name}'." 443 444 return tuple(result) 445 446 def get_job_sysargs(self, name: str, debug: bool = False) -> Union[List[str], None]: 447 """ 448 Return the sysargs from the service file. 449 """ 450 service_file_path = self.get_service_file_path(name, debug=debug) 451 if not service_file_path.exists(): 452 return [] 453 454 with open(service_file_path, 'r', encoding='utf-8') as f: 455 service_lines = f.readlines() 456 457 for line in service_lines: 458 if line.startswith('ExecStart='): 459 sysargs_str = line.split(' -m meerschaum ')[-1].split('<')[0] 460 return shlex.split(sysargs_str) 461 462 return [] 463 464 def run_command( 465 self, 466 command_args: List[str], 467 as_output: bool = False, 468 debug: bool = False, 469 ) -> Union[SuccessTuple, str]: 470 """ 471 Run a `systemd` command and return success. 472 473 Parameters 474 ---------- 475 command_args: List[str] 476 The command to pass to `systemctl --user`. 477 478 as_output: bool, default False 479 If `True`, return the process stdout output. 480 Defaults to a `SuccessTuple`. 481 482 Returns 483 ------- 484 A `SuccessTuple` indicating success or a str for the process output. 485 """ 486 from meerschaum.utils.process import run_process 487 488 if command_args[:2] != ['systemctl', '--user']: 489 command_args = ['systemctl', '--user'] + command_args 490 491 if debug: 492 dprint(shlex.join(command_args)) 493 494 proc = run_process( 495 command_args, 496 foreground=False, 497 as_proc=True, 498 capture_output=True, 499 text=True, 500 ) 501 stdout, stderr = proc.communicate() 502 if debug: 503 dprint(f"{stdout}") 504 505 if as_output: 506 return stdout.strip() 507 508 command_success = proc.wait() == 0 509 command_msg = ( 510 "Success" 511 if command_success 512 else f"Failed to execute command `{shlex.join(command_args)}`." 
513 ) 514 return command_success, command_msg 515 516 def get_job_stdin_file(self, name: str, debug: bool = False): 517 """ 518 Return a `StdinFile` for the job. 519 """ 520 from meerschaum.utils.daemon import StdinFile 521 if '_stdin_files' not in self.__dict__: 522 self._stdin_files: Dict[str, StdinFile] = {} 523 524 if name not in self._stdin_files: 525 socket_path = self.get_socket_path(name, debug=debug) 526 socket_path.parent.mkdir(parents=True, exist_ok=True) 527 self._stdin_files[name] = StdinFile(socket_path) 528 529 return self._stdin_files[name] 530 531 def create_job( 532 self, 533 name: str, 534 sysargs: List[str], 535 properties: Optional[Dict[str, Any]] = None, 536 debug: bool = False, 537 ) -> SuccessTuple: 538 """ 539 Create a job as a service to be run by `systemd`. 540 """ 541 from meerschaum.utils.misc import make_symlink 542 service_name = self.get_service_name(name, debug=debug) 543 service_file_path = self.get_service_file_path(name, debug=debug) 544 service_symlink_file_path = self.get_service_symlink_file_path(name, debug=debug) 545 socket_stdin = self.get_job_stdin_file(name, debug=debug) 546 _ = socket_stdin.file_handler 547 548 ### Init the externally_managed file. 549 ### NOTE: We must write the pickle file in addition to the properties file. 
550 job = self.get_hidden_job(name, sysargs=sysargs, properties=properties, debug=debug) 551 job._set_externally_managed() 552 pickle_success, pickle_msg = job.daemon.write_pickle() 553 if not pickle_success: 554 return pickle_success, pickle_msg 555 properties_success, properties_msg = job.daemon.write_properties() 556 if not properties_success: 557 return properties_success, properties_msg 558 559 service_file_path.parent.mkdir(parents=True, exist_ok=True) 560 561 with open(service_file_path, 'w+', encoding='utf-8') as f: 562 f.write(self.get_service_file_text(name, sysargs, debug=debug)) 563 564 symlink_success, symlink_msg = make_symlink(service_file_path, service_symlink_file_path) 565 if not symlink_success: 566 return symlink_success, symlink_msg 567 568 commands = [ 569 ['daemon-reload'], 570 ['enable', service_name], 571 ['start', service_name], 572 ] 573 574 fails = 0 575 for command_list in commands: 576 command_success, command_msg = self.run_command(command_list, debug=debug) 577 if not command_success: 578 fails += 1 579 580 if fails > 1: 581 return False, "Failed to reload systemd." 582 583 return True, f"Started job '{name}' via systemd." 584 585 def start_job(self, name: str, debug: bool = False) -> SuccessTuple: 586 """ 587 Stop a job's service. 588 """ 589 job = self.get_hidden_job(name, debug=debug) 590 job.daemon._remove_stop_file() 591 592 status = self.get_job_status(name, debug=debug) 593 if status == 'paused': 594 return self.run_command( 595 ['kill', '-s', 'SIGCONT', self.get_service_name(name, debug=debug)], 596 debug=debug, 597 ) 598 599 return self.run_command( 600 ['start', self.get_service_name(name, debug=debug)], 601 debug=debug, 602 ) 603 604 def stop_job(self, name: str, debug: bool = False) -> SuccessTuple: 605 """ 606 Stop a job's service. 
607 """ 608 job = self.get_hidden_job(name, debug=debug) 609 job.daemon._write_stop_file('quit') 610 sigint_success, sigint_msg = self.run_command( 611 ['kill', '-s', 'SIGINT', self.get_service_name(name, debug=debug)], 612 debug=debug, 613 ) 614 615 check_timeout_interval = get_config('jobs', 'check_timeout_interval_seconds') 616 loop_start = time.perf_counter() 617 timeout_seconds = get_config('jobs', 'timeout_seconds') 618 while (time.perf_counter() - loop_start) < timeout_seconds: 619 if self.get_job_status(name, debug=debug) == 'stopped': 620 return True, 'Success' 621 622 time.sleep(check_timeout_interval) 623 624 return self.run_command( 625 ['stop', self.get_service_name(name, debug=debug)], 626 debug=debug, 627 ) 628 629 def pause_job(self, name: str, debug: bool = False) -> SuccessTuple: 630 """ 631 Pause a job's service. 632 """ 633 job = self.get_hidden_job(name, debug=debug) 634 job.daemon._write_stop_file('pause') 635 return self.run_command( 636 ['kill', '-s', 'SIGSTOP', self.get_service_name(name, debug=debug)], 637 debug=debug, 638 ) 639 640 def delete_job(self, name: str, debug: bool = False) -> SuccessTuple: 641 """ 642 Delete a job's service. 
643 """ 644 from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH 645 job = self.get_hidden_job(name, debug=debug) 646 647 if not job.delete_after_completion: 648 _ = self.stop_job(name, debug=debug) 649 _ = self.run_command( 650 ['disable', self.get_service_name(name, debug=debug)], 651 debug=debug, 652 ) 653 654 service_job_path = self.get_service_job_path(name, debug=debug) 655 try: 656 if service_job_path.exists(): 657 shutil.rmtree(service_job_path) 658 except Exception as e: 659 warn(e) 660 return False, str(e) 661 662 service_logs_path = self.get_service_logs_path(name, debug=debug) 663 logs_paths = [ 664 (SYSTEMD_LOGS_RESOURCES_PATH / name) 665 for name in os.listdir(SYSTEMD_LOGS_RESOURCES_PATH) 666 if name.startswith(service_logs_path.name + '.') 667 ] 668 paths = [ 669 self.get_service_file_path(name, debug=debug), 670 self.get_service_symlink_file_path(name, debug=debug), 671 self.get_socket_path(name, debug=debug), 672 self.get_result_path(name, debug=debug), 673 ] + logs_paths 674 675 for path in paths: 676 if path.exists(): 677 try: 678 path.unlink() 679 except Exception as e: 680 warn(e) 681 return False, str(e) 682 683 _ = job.delete() 684 685 return self.run_command(['daemon-reload'], debug=debug) 686 687 def get_logs(self, name: str, debug: bool = False) -> str: 688 """ 689 Return a job's journal logs. 690 """ 691 rotating_file = self.get_job_rotating_file(name, debug=debug) 692 return rotating_file.read() 693 694 def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]: 695 """ 696 Return a job's stop time. 697 """ 698 job = self.get_hidden_job(name, debug=debug) 699 return job.stop_time 700 701 def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool: 702 """ 703 Return whether a job is blocking on stdin. 
704 """ 705 socket_path = self.get_socket_path(name, debug=debug) 706 blocking_path = socket_path.parent / (socket_path.name + '.block') 707 return blocking_path.exists() 708 709 def get_job_rotating_file(self, name: str, debug: bool = False): 710 """ 711 Return a `RotatingFile` for the job's log output. 712 """ 713 from meerschaum.utils.daemon import RotatingFile 714 service_logs_path = self.get_service_logs_path(name, debug=debug) 715 return RotatingFile(service_logs_path) 716 717 async def monitor_logs_async( 718 self, 719 name: str, 720 *args, 721 debug: bool = False, 722 **kwargs 723 ): 724 """ 725 Monitor a job's output. 726 """ 727 from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH 728 job = self.get_hidden_job(name, debug=debug) 729 kwargs.update({ 730 '_logs_path': SYSTEMD_LOGS_RESOURCES_PATH, 731 '_log': self.get_job_rotating_file(name, debug=debug), 732 '_stdin_file': self.get_job_stdin_file(name, debug=debug), 733 'debug': debug, 734 }) 735 await job.monitor_logs_async(*args, **kwargs) 736 737 def monitor_logs(self, *args, **kwargs): 738 """ 739 Monitor a job's output. 740 """ 741 asyncio.run(self.monitor_logs_async(*args, **kwargs))
Execute Meerschaum jobs via `systemd`.
def get_job_names(self, debug: bool = False) -> List[str]:
    """
    Return a list of existing jobs, including hidden ones.

    Job names are taken from the `mrsm-<name>.service` entries in the systemd
    user resources directory. Broken symlinks are skipped, and a directory
    which does not exist yet yields an empty list instead of raising.
    """
    from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH
    prefix, suffix = 'mrsm-', '.service'
    try:
        service_names = os.listdir(SYSTEMD_USER_RESOURCES_PATH)
    except FileNotFoundError:
        ### No user services have been created yet.
        return []

    return [
        service_name[len(prefix):-len(suffix)]
        for service_name in service_names
        if (
            service_name.startswith(prefix)
            and service_name.endswith(suffix)
            ### Check for broken symlinks.
            and (SYSTEMD_USER_RESOURCES_PATH / service_name).exists()
        )
    ]
Return a list of existing jobs, including hidden ones.
def get_job_exists(self, name: str, debug: bool = False) -> bool:
    """
    Check whether `name` is among the job names registered with `systemd`.
    """
    registered_names = self.get_job_names(debug=debug)
    if debug:
        dprint(f'Existing services: {registered_names}')
    found = name in registered_names
    return found
Return whether a job exists.
def get_jobs(self, debug: bool = False) -> Dict[str, Job]:
    """
    Return a dictionary of `systemd` Jobs (including hidden jobs), keyed by job name.
    """
    ### The executor keys are the same for every job, so compute them once.
    executor_keys = str(self)
    return {
        name: Job(name, executor_keys=executor_keys)
        for name in self.get_job_names(debug=debug)
    }
Return a dictionary of `systemd` Jobs (including hidden jobs).
def get_service_name(self, name: str, debug: bool = False) -> str:
    """
    Build the systemd unit name for a job; every space in the name becomes a dash.
    """
    dashed_name = '-'.join(name.split(' '))
    return 'mrsm-' + dashed_name + '.service'
Return a job's service name.
def get_service_job_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Return the per-job directory under the systemd jobs resources root.
    """
    from meerschaum.config.paths import SYSTEMD_JOBS_RESOURCES_PATH as jobs_root
    job_directory = jobs_root / name
    return job_directory
Return the path for the job's files under the root directory.
def get_service_symlink_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Return the path of the symlink (in the systemd user resources directory)
    which points at the job's unit file.
    """
    from meerschaum.config.paths import SYSTEMD_USER_RESOURCES_PATH as user_units_dir
    unit_name = self.get_service_name(name, debug=debug)
    return user_units_dir / unit_name
Return the path to where to create the service symlink.
def get_service_file_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Return the location of the job's unit file inside its job directory.
    """
    job_directory = self.get_service_job_path(name, debug=debug)
    unit_name = self.get_service_name(name, debug=debug)
    return job_directory / unit_name
Return the path to a Job's service file.
def get_service_logs_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Return the `<service name>.log` path (in the systemd logs directory)
    that the job's output is directed to.
    """
    from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH as logs_dir
    log_file_name = f"{self.get_service_name(name, debug=debug)}.log"
    return logs_dir / log_file_name
Return the path to direct service logs to.
def get_socket_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Return the path of the job's stdin FIFO (`<service name>.stdin` in its job directory).
    """
    job_directory = self.get_service_job_path(name, debug=debug)
    fifo_name = f"{self.get_service_name(name, debug=debug)}.stdin"
    return job_directory / fifo_name
Return the path to the FIFO file.
def get_result_path(self, name: str, debug: bool = False) -> pathlib.Path:
    """
    Return the path of the JSON file holding the job's result (`<service name>.result.json`).
    """
    job_directory = self.get_service_job_path(name, debug=debug)
    result_file_name = f"{self.get_service_name(name, debug=debug)}.result.json"
    return job_directory / result_file_name
Return the path to the result file.
def get_service_file_text(self, name: str, sysargs: List[str], debug: bool = False) -> str:
    """
    Return the contents of the unit file.

    Parameters
    ----------
    name: str
        The job's name.

    sysargs: List[str]
        The arguments appended to `python -m meerschaum` in the `ExecStart=` line.

    Returns
    -------
    The text of the `.service` unit. Its `Environment=` lines forward the
    current Meerschaum environment variables, the job's log/result/stdin paths,
    the delete-after-completion flag, and the job's own environment.
    """
    service_logs_path = self.get_service_logs_path(name, debug=debug)
    socket_path = self.get_socket_path(name, debug=debug)
    result_path = self.get_result_path(name, debug=debug)
    job = self.get_hidden_job(name, debug=debug)

    sysargs_str = shlex.join(sysargs)
    exec_str = f'{sys.executable} -m meerschaum {sysargs_str}'

    ### Forward any Meerschaum environment variables set in the current process.
    env_keys = STATIC_CONFIG['environment']
    mrsm_env_var_names = set(env_keys.values())
    mrsm_env_vars = {
        key: val
        for key, val in os.environ.items()
        if key in mrsm_env_var_names
    }

    ### Add new environment variables for the service process.
    ### NOTE: The delete flag must be the plain string '1' or '0' (not a tuple),
    ### otherwise the service would receive the text "('1',)".
    mrsm_env_vars.update({
        env_keys['daemon_id']: name,
        env_keys['systemd_log_path']: service_logs_path.as_posix(),
        env_keys['systemd_result_path']: result_path.as_posix(),
        env_keys['systemd_stdin_path']: socket_path.as_posix(),
        env_keys['systemd_delete_job']: '1' if job.delete_after_completion else '0',
    })

    ### Allow for user-defined environment variables.
    mrsm_env_vars.update(job.env)

    environment_str = '\n'.join(
        f"Environment={key}={shlex.quote(str(val))}"
        for key, val in mrsm_env_vars.items()
    )
    service_name = self.get_service_name(name, debug=debug)

    service_text = (
        "[Unit]\n"
        f"Description=Run the job '{name}'\n"
        "\n"
        "[Service]\n"
        f"ExecStart={exec_str}\n"
        "KillSignal=SIGTERM\n"
        "Restart=always\n"
        "RestartPreventExitStatus=0\n"
        f"SyslogIdentifier={service_name}\n"
        f"{environment_str}\n"
        "\n"
        "[Install]\n"
        "WantedBy=default.target\n"
    )
    return service_text
Return the contents of the unit file.
def get_socket_file_text(self, name: str, debug: bool = False) -> str:
    """
    Render a `.socket` unit, bound to the job's service, which listens on the job's stdin FIFO.
    """
    unit_name = self.get_service_name(name, debug=debug)
    fifo_path = self.get_socket_path(name, debug=debug).as_posix()
    unit_lines = [
        "[Unit]",
        f"BindsTo={unit_name}",
        "",
        "[Socket]",
        f"ListenFIFO={fifo_path}",
        "FileDescriptorName=stdin",
        "RemoveOnStop=true",
        "SocketMode=0660",
    ]
    return '\n'.join(unit_lines) + '\n'
Return the contents of the socket file.
def get_job_metadata(self, name: str, debug: bool = False) -> Dict[str, Any]:
    """
    Return metadata about a job.

    The result is cached per job name on the executor (in `self._jobs_metadata`).
    A cached entry is reused while it is at most `JOB_METADATA_CACHE_SECONDS` old,
    measured with `time.perf_counter()`.

    Returns
    -------
    A dictionary with the keys 'sysargs', 'result', 'restart', and 'daemon'.
    'daemon' holds 'status', 'pid', and 'properties'.
    """
    now = time.perf_counter()

    if '_jobs_metadata' not in self.__dict__:
        self._jobs_metadata: Dict[str, Any] = {}

    cached = self._jobs_metadata.get(name, None)
    if cached is not None:
        ts = cached.get('timestamp', None)
        if ts is not None and (now - ts) <= JOB_METADATA_CACHE_SECONDS:
            if debug:
                dprint(f"Returning cached metadata for job '{name}'.")
            return cached['metadata']

    metadata = {
        'sysargs': self.get_job_sysargs(name, debug=debug),
        'result': self.get_job_result(name, debug=debug),
        'restart': self.get_job_restart(name, debug=debug),
        'daemon': {
            'status': self.get_job_status(name, debug=debug),
            'pid': self.get_job_pid(name, debug=debug),
            'properties': self.get_job_properties(name, debug=debug),
        },
    }
    self._jobs_metadata[name] = {
        'timestamp': now,
        'metadata': metadata,
    }
    return metadata
Return metadata about a job.
def get_job_restart(self, name: str, debug: bool = False) -> bool:
    """
    Report whether any of the `RESTART_FLAGS` appear in the job's sysargs.
    """
    from meerschaum.jobs._Job import RESTART_FLAGS
    job_sysargs = self.get_job_sysargs(name, debug=debug)
    return bool(job_sysargs) and any(flag in job_sysargs for flag in RESTART_FLAGS)
Return whether a job restarts.
def get_job_properties(self, name: str, debug: bool = False) -> Dict[str, Any]:
    """
    Return the job daemon's properties without the internal 'externally_managed' key.
    """
    daemon_properties = self.get_hidden_job(name, debug=debug).daemon.properties
    visible_properties = {}
    for key, value in daemon_properties.items():
        if key == 'externally_managed':
            continue
        visible_properties[key] = value
    return visible_properties
Return the properties for a job.
def get_job_process(self, name: str, debug: bool = False):
    """
    Look up the `psutil.Process` for the job's main PID.
    Return `None` when there is no PID or `psutil` cannot attach to it.
    """
    pid = self.get_job_pid(name, debug=debug)
    if pid is not None:
        psutil = mrsm.attempt_import('psutil')
        try:
            return psutil.Process(pid)
        except Exception:
            pass
    return None
Return a `psutil.Process` for the job's PID.
def get_job_status(self, name: str, debug: bool = False) -> str:
    """
    Map the unit's `systemctl is-active` output onto 'running', 'paused', or 'stopped'.
    """
    unit_state = self.run_command(
        ['is-active', self.get_service_name(name, debug=debug)],
        as_output=True,
        debug=debug,
    )
    if unit_state == 'activating':
        return 'running'
    if unit_state != 'active':
        return 'stopped'

    ### The unit is active: inspect its main process to tell running from paused.
    process = self.get_job_process(name, debug=debug)
    if process is None:
        return 'stopped'
    try:
        process_state = process.status()
    except Exception:
        return 'stopped'
    return 'paused' if process_state == 'stopped' else 'running'
Return the job's service status.
def get_job_pid(self, name: str, debug: bool = False) -> Union[int, None]:
    """
    Read the unit's `MainPID` property.
    Return `None` when the property is missing, is '0', or is not an integer.
    """
    from meerschaum.utils.misc import is_int

    prefix = 'MainPID='
    property_line = self.run_command(
        ['show', self.get_service_name(name, debug=debug), '--property=MainPID'],
        as_output=True,
        debug=debug,
    )
    if not property_line.startswith(prefix):
        return None

    pid_str = property_line[len(prefix):]
    if pid_str != '0' and is_int(pid_str):
        return int(pid_str)
    return None
Return the job's service PID.
def get_job_began(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return the unit's `ActiveEnterTimestamp` as an ISO-8601 UTC string,
    or `None` if it is missing or cannot be parsed.
    """
    property_line = self.run_command(
        ['show', self.get_service_name(name, debug=debug), '--property=ActiveEnterTimestamp'],
        as_output=True,
        debug=debug,
    )
    if not property_line.startswith('ActiveEnterTimestamp'):
        return None

    timestamp_text = property_line.split('=')[-1]
    if not timestamp_text:
        return None

    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    try:
        parsed = dateutil_parser.parse(timestamp_text)
    except Exception as e:
        warn(f"Cannot parse '{property_line}' as a datetime:\n{e}")
        return None
    return parsed.astimezone(timezone.utc).isoformat()
Return when a job began running.
def get_job_ended(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return when a job stopped running.

    Reads the unit's `InactiveEnterTimestamp` property.

    Returns
    -------
    The timestamp as an ISO-8601 string in UTC, or `None` if the property
    is missing, empty, or cannot be parsed.
    """
    prefix = 'InactiveEnterTimestamp='
    output = self.run_command(
        [
            'show',
            self.get_service_name(name, debug=debug),
            '--property=InactiveEnterTimestamp'
        ],
        as_output=True,
        debug=debug,
    )
    if not output.startswith(prefix):
        return None

    ### Everything after the first '=' is the timestamp text.
    dt_str = output.partition('=')[2]
    if not dt_str:
        return None

    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    try:
        dt = dateutil_parser.parse(dt_str)
    except Exception as e:
        warn(f"Cannot parse '{output}' as a datetime:\n{e}")
        return None
    return dt.astimezone(timezone.utc).isoformat()
Return when a job stopped running.
def get_job_paused(self, name: str, debug: bool = False) -> Union[str, None]:
    """
    Return the ISO-formatted stop time of a paused job.
    Return `None` if the job is not paused or has no stop time.
    """
    job = self.get_hidden_job(name, debug=debug)
    is_paused = self.get_job_status(name, debug=debug) == 'paused'
    stop_time = job.stop_time if is_paused else None
    return None if stop_time is None else stop_time.isoformat()
Return when a job was paused.
def get_job_result(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Return the job's result SuccessTuple.

    The result is read from the job's JSON result file. A missing file,
    unreadable or invalid JSON, or JSON that is not a two-item list
    all produce a failure tuple instead of raising.
    """
    result_path = self.get_result_path(name, debug=debug)
    if not result_path.exists():
        return False, "No result available."

    read_error = (False, f"Could not read result for Job '{name}'.")
    try:
        with open(result_path, 'r', encoding='utf-8') as f:
            result = json.load(f)
    except Exception:
        return read_error

    ### Guard against `null`, objects, or lists of the wrong length.
    if not isinstance(result, (list, tuple)) or len(result) != 2:
        return read_error

    return tuple(result)
Return the job's result SuccessTuple.
def get_job_sysargs(self, name: str, debug: bool = False) -> Union[List[str], None]:
    """
    Return the sysargs from the service file.

    The arguments are parsed from the `ExecStart=` line: everything after the
    first ` -m meerschaum ` separator, tokenized with `shlex.split()`.

    Returns
    -------
    The list of sysargs. An empty list is returned if the service file,
    its `ExecStart=` line, or the separator is missing.
    """
    service_file_path = self.get_service_file_path(name, debug=debug)
    if not service_file_path.exists():
        return []

    with open(service_file_path, 'r', encoding='utf-8') as f:
        service_lines = f.readlines()

    for line in service_lines:
        if not line.startswith('ExecStart='):
            continue

        ### Split on the first separator only: a later occurrence
        ### belongs to one of the sysargs.
        _, sep, sysargs_str = line.partition(' -m meerschaum ')
        if not sep:
            return []

        ### Tokenize the whole string. Truncating at '<' would corrupt quoted
        ### arguments which contain '<' (e.g. SQL comparisons).
        sysargs = shlex.split(sysargs_str)

        ### Still tolerate a trailing `< <stdin FIFO>` redirect in case an older
        ### unit file contains one.
        socket_path_str = self.get_socket_path(name, debug=debug).as_posix()
        if sysargs[-2:] == ['<', socket_path_str]:
            sysargs = sysargs[:-2]
        return sysargs

    return []
Return the sysargs from the service file.
def run_command(
    self,
    command_args: List[str],
    as_output: bool = False,
    debug: bool = False,
) -> Union[SuccessTuple, str]:
    """
    Execute a `systemctl --user` command.

    Parameters
    ----------
    command_args: List[str]
        Arguments for `systemctl --user`; the prefix is added if not already present.

    as_output: bool, default False
        If `True`, return the stripped stdout text instead of a `SuccessTuple`.

    Returns
    -------
    The stripped stdout when `as_output` is set, otherwise a `SuccessTuple`
    based on whether the process exited with code 0.
    """
    from meerschaum.utils.process import run_process

    systemctl_prefix = ['systemctl', '--user']
    full_args = (
        command_args
        if command_args[:2] == systemctl_prefix
        else systemctl_prefix + command_args
    )
    if debug:
        dprint(shlex.join(full_args))

    proc = run_process(
        full_args,
        foreground=False,
        as_proc=True,
        capture_output=True,
        text=True,
    )
    stdout, _ = proc.communicate()
    if debug:
        dprint(f"{stdout}")

    if as_output:
        return stdout.strip()

    if proc.wait() == 0:
        return True, "Success"
    return False, f"Failed to execute command `{shlex.join(full_args)}`."
Run a `systemd` command and return success.
Parameters
- command_args (List[str]):
The command to pass to `systemctl --user`.
- as_output (bool, default False):
If `True`, return the process stdout output. Defaults to a `SuccessTuple`.
Returns
- A `SuccessTuple` indicating success or a str for the process output.
def get_job_stdin_file(self, name: str, debug: bool = False):
    """
    Return the job's `StdinFile`, creating it and caching it on the executor on first use.
    """
    from meerschaum.utils.daemon import StdinFile
    if '_stdin_files' not in self.__dict__:
        self._stdin_files: Dict[str, StdinFile] = {}

    stdin_files = self._stdin_files
    if name not in stdin_files:
        fifo_path = self.get_socket_path(name, debug=debug)
        ### Ensure the job directory exists before handing the path to `StdinFile`.
        fifo_path.parent.mkdir(parents=True, exist_ok=True)
        stdin_files[name] = StdinFile(fifo_path)
    return stdin_files[name]
Return a `StdinFile` for the job.
def create_job(
    self,
    name: str,
    sysargs: List[str],
    properties: Optional[Dict[str, Any]] = None,
    debug: bool = False,
) -> SuccessTuple:
    """
    Create a job as a service to be run by `systemd`.

    Parameters
    ----------
    name: str
        The job's name.

    sysargs: List[str]
        The Meerschaum arguments written to the unit's `ExecStart=` line.

    properties: Optional[Dict[str, Any]], default None
        Properties passed to the job's daemon.

    Returns
    -------
    A `SuccessTuple`. It fails on the first step that fails: writing the
    daemon's pickle/properties, symlinking the unit, or any of
    `daemon-reload`, `enable`, `start`.
    """
    from meerschaum.utils.misc import make_symlink
    service_name = self.get_service_name(name, debug=debug)
    service_file_path = self.get_service_file_path(name, debug=debug)
    service_symlink_file_path = self.get_service_symlink_file_path(name, debug=debug)
    socket_stdin = self.get_job_stdin_file(name, debug=debug)
    ### NOTE(review): presumably accessing `file_handler` creates the stdin FIFO
    ### before the service starts — confirm in `StdinFile`.
    _ = socket_stdin.file_handler

    ### Init the externally_managed file.
    ### NOTE: We must write the pickle file in addition to the properties file.
    job = self.get_hidden_job(name, sysargs=sysargs, properties=properties, debug=debug)
    job._set_externally_managed()
    pickle_success, pickle_msg = job.daemon.write_pickle()
    if not pickle_success:
        return pickle_success, pickle_msg
    properties_success, properties_msg = job.daemon.write_properties()
    if not properties_success:
        return properties_success, properties_msg

    service_file_path.parent.mkdir(parents=True, exist_ok=True)

    with open(service_file_path, 'w', encoding='utf-8') as f:
        f.write(self.get_service_file_text(name, sysargs, debug=debug))

    symlink_success, symlink_msg = make_symlink(service_file_path, service_symlink_file_path)
    if not symlink_success:
        return symlink_success, symlink_msg

    commands = [
        ['daemon-reload'],
        ['enable', service_name],
        ['start', service_name],
    ]

    ### Every step is required. Stop at the first failure and report it,
    ### rather than claiming the job started when a step (e.g. `start`) failed.
    for command_list in commands:
        command_success, command_msg = self.run_command(command_list, debug=debug)
        if not command_success:
            return False, command_msg

    return True, f"Started job '{name}' via systemd."
Create a job as a service to be run by `systemd`.
def start_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Start a job's service.

    A paused job is resumed by sending `SIGCONT` to its unit; any other job
    is started with `systemctl --user start`.

    Returns
    -------
    The `SuccessTuple` from the `systemctl` command.
    """
    job = self.get_hidden_job(name, debug=debug)
    ### Clear the stop file written by `stop_job()` ('quit') or `pause_job()` ('pause').
    job.daemon._remove_stop_file()

    status = self.get_job_status(name, debug=debug)
    if status == 'paused':
        return self.run_command(
            ['kill', '-s', 'SIGCONT', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

    return self.run_command(
        ['start', self.get_service_name(name, debug=debug)],
        debug=debug,
    )
Start a job's service.
def stop_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Stop a job's service.

    Writes a 'quit' stop file and sends `SIGINT`, then polls the status until
    the job reports 'stopped'. If the configured timeout elapses first, the
    unit is stopped with `systemctl --user stop`.
    """
    job = self.get_hidden_job(name, debug=debug)
    job.daemon._write_stop_file('quit')
    service_name = self.get_service_name(name, debug=debug)

    ### Ask the process to exit gracefully first.
    _ = self.run_command(['kill', '-s', 'SIGINT', service_name], debug=debug)

    poll_interval = get_config('jobs', 'check_timeout_interval_seconds')
    started_at = time.perf_counter()
    timeout_seconds = get_config('jobs', 'timeout_seconds')
    while time.perf_counter() - started_at < timeout_seconds:
        if self.get_job_status(name, debug=debug) == 'stopped':
            return True, 'Success'
        time.sleep(poll_interval)

    ### The job did not exit in time: have systemd stop the unit.
    return self.run_command(['stop', service_name], debug=debug)
Stop a job's service.
def pause_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Pause a job's service: record a 'pause' stop file, then send `SIGSTOP` to the unit.
    """
    self.get_hidden_job(name, debug=debug).daemon._write_stop_file('pause')
    signal_args = ['kill', '-s', 'SIGSTOP', self.get_service_name(name, debug=debug)]
    return self.run_command(signal_args, debug=debug)
Pause a job's service.
def delete_job(self, name: str, debug: bool = False) -> SuccessTuple:
    """
    Delete a job's service.

    Unless the job is marked `delete_after_completion`, the unit is first
    stopped and disabled. Then the job's directory is removed, along with its
    unit file, unit symlink, stdin FIFO, result file, and any log files named
    `<logs file name>.<suffix>`. Finally the job is deleted and systemd reloaded.

    Returns
    -------
    A failure tuple if a file or directory cannot be removed,
    otherwise the `SuccessTuple` from `systemctl --user daemon-reload`.
    """
    from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
    job = self.get_hidden_job(name, debug=debug)

    ### NOTE(review): presumably a self-deleting job runs this from within its
    ### own service, so it must not stop/disable itself here — confirm.
    if not job.delete_after_completion:
        _ = self.stop_job(name, debug=debug)
        _ = self.run_command(
            ['disable', self.get_service_name(name, debug=debug)],
            debug=debug,
        )

    service_job_path = self.get_service_job_path(name, debug=debug)
    try:
        if service_job_path.exists():
            shutil.rmtree(service_job_path)
    except Exception as e:
        warn(e)
        return False, str(e)

    ### Use a distinct loop variable so the job's `name` is never shadowed,
    ### and tolerate a logs directory which does not exist.
    service_logs_path = self.get_service_logs_path(name, debug=debug)
    logs_prefix = service_logs_path.name + '.'
    try:
        log_file_names = os.listdir(SYSTEMD_LOGS_RESOURCES_PATH)
    except FileNotFoundError:
        log_file_names = []
    logs_paths = [
        SYSTEMD_LOGS_RESOURCES_PATH / log_file_name
        for log_file_name in log_file_names
        if log_file_name.startswith(logs_prefix)
    ]
    paths = [
        self.get_service_file_path(name, debug=debug),
        self.get_service_symlink_file_path(name, debug=debug),
        self.get_socket_path(name, debug=debug),
        self.get_result_path(name, debug=debug),
    ] + logs_paths

    for path in paths:
        if path.exists():
            try:
                path.unlink()
            except Exception as e:
                warn(e)
                return False, str(e)

    _ = job.delete()

    return self.run_command(['daemon-reload'], debug=debug)
Delete a job's service.
def get_logs(self, name: str, debug: bool = False) -> str:
    """
    Return the text read from the job's `RotatingFile` log output.
    """
    return self.get_job_rotating_file(name, debug=debug).read()
Return a job's logs (read from its rotating log file).
def get_job_stop_time(self, name: str, debug: bool = False) -> Union[datetime, None]:
    """
    Return the `stop_time` recorded on the job, which may be `None`.
    """
    return self.get_hidden_job(name, debug=debug).stop_time
Return a job's stop time.
def get_job_is_blocking_on_stdin(self, name: str, debug: bool = False) -> bool:
    """
    Report whether a `<stdin FIFO name>.block` file exists next to the job's stdin FIFO.
    """
    fifo_path = self.get_socket_path(name, debug=debug)
    return fifo_path.with_name(f"{fifo_path.name}.block").exists()
Return whether a job is blocking on stdin.
def get_job_rotating_file(self, name: str, debug: bool = False):
    """
    Build a `RotatingFile` over the job's service logs path.
    """
    from meerschaum.utils.daemon import RotatingFile
    return RotatingFile(self.get_service_logs_path(name, debug=debug))
Return a `RotatingFile` for the job's log output.
async def monitor_logs_async(
    self,
    name: str,
    *args,
    debug: bool = False,
    **kwargs
):
    """
    Monitor a job's output.

    Delegates to the job's own `monitor_logs_async()`, passing this executor's
    logs directory, rotating log file, and stdin file.
    """
    from meerschaum.config.paths import SYSTEMD_LOGS_RESOURCES_PATH
    job = self.get_hidden_job(name, debug=debug)
    kwargs['_logs_path'] = SYSTEMD_LOGS_RESOURCES_PATH
    kwargs['_log'] = self.get_job_rotating_file(name, debug=debug)
    kwargs['_stdin_file'] = self.get_job_stdin_file(name, debug=debug)
    kwargs['debug'] = debug
    await job.monitor_logs_async(*args, **kwargs)
Monitor a job's output.