meerschaum.utils.process
Custom process-handling functions.
See meerschaum.utils.pool
for multiprocessing and
meerschaum.utils.threading
for threads.
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Custom process-handling functions. 7See `meerschaum.utils.pool` for multiprocessing and 8`meerschaum.utils.threading` for threads. 9""" 10 11from __future__ import annotations 12 13import os 14import signal 15import subprocess 16import sys 17import platform 18 19import meerschaum as mrsm 20from meerschaum.utils.typing import Union, Optional, Any, Callable, Dict, Tuple 21from meerschaum.config.static import STATIC_CONFIG 22 23_child_processes = [] 24def signal_handler(sig, frame): 25 for child in _child_processes: 26 try: 27 child.send_signal(sig) 28 child.wait() 29 except Exception: 30 pass 31 32def run_process( 33 *args, 34 foreground: bool = False, 35 as_proc: bool = False, 36 line_callback: Optional[Callable[[bytes], Any]] = None, 37 store_proc_dict: Optional[Dict[str, Any]] = None, 38 store_proc_key: str = 'child_process', 39 capture_output: bool = False, 40 **kw: Any 41) -> Union[int, subprocess.Popen]: 42 """Original foreground solution found here: 43 https://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess 44 45 Parameters 46 ---------- 47 *args: 48 The sysargs to execute. 49 50 foreground: bool, default False 51 If `True`, execute the process as a foreground process that passes Ctrl-C to children. 52 From the original post: 53 The "correct" way of spawning a new subprocess: 54 signals like C-c must only go 55 to the child process, and not to this python. 56 57 Some side-info about "how ctrl-c works": 58 https://unix.stackexchange.com/a/149756/1321 59 60 as_proc: bool, default False 61 If `True`, return the `subprocess.Popen` object. 62 63 line_callback: Optional[Callable[[str], Any]], default None 64 If provided, poll the process and execute the callback when `readline()` gets new text. 65 66 store_proc_dict: Optional[Dict[str, Any]], default None 67 If provided, store the `subprocess.Popen` object under the key `store_proc_key`. 68 Useful for accessing the process while it is polling in another thread. 69 70 store_proc_key: str, default 'child_process' 71 If `store_proc_dict` is provided, store the process in the dictionary under this key. 72 73 kw: Any 74 Additional keyword arguments to pass to `subprocess.Popen`. 75 76 Returns 77 ------- 78 Either an int for the return code or a `subprocess.Popen` object. 79 """ 80 try: 81 import termios 82 except ImportError: 83 termios = None 84 85 if platform.system() == 'Windows': 86 foreground = False 87 88 def print_line(line): 89 sys.stdout.write(line.decode('utf-8')) 90 sys.stdout.flush() 91 92 93 if capture_output or line_callback is not None: 94 kw['stdout'] = subprocess.PIPE 95 kw['stderr'] = subprocess.STDOUT 96 elif os.environ.get(STATIC_CONFIG['environment']['daemon_id']): 97 kw['stdout'] = subprocess.PIPE 98 kw['stderr'] = subprocess.STDOUT 99 if line_callback is None: 100 line_callback = print_line 101 102 if 'env' not in kw: 103 kw['env'] = os.environ 104 105 user_preexec_fn = kw.get("preexec_fn", None) 106 107 if foreground: 108 try: 109 old_pgrp = os.tcgetpgrp(sys.stdin.fileno()) 110 except Exception as e: 111 termios = None 112 if termios: 113 try: 114 old_attr = termios.tcgetattr(sys.stdin.fileno()) 115 except Exception as e: 116 termios = None 117 118 def new_pgid(): 119 if user_preexec_fn: 120 user_preexec_fn() 121 122 # set a new process group id 123 os.setpgid(os.getpid(), os.getpid()) 124 125 # generally, the child process should stop itself 126 # before exec so the parent can set its new pgid. 127 # (setting pgid has to be done before the child execs). 128 # however, Python 'guarantee' that `preexec_fn` 129 # is run before `Popen` returns. 130 # this is because `Popen` waits for the closure of 131 # the error relay pipe '`errpipe_write`', 132 # which happens at child's exec. 133 # this is also the reason the child can't stop itself 134 # in Python's `Popen`, since the `Popen` call would never 135 # terminate then. 136 # `os.kill(os.getpid(), signal.SIGSTOP)` 137 138 if foreground: 139 kw['preexec_fn'] = new_pgid 140 141 try: 142 child = subprocess.Popen(*args, **kw) 143 _child_processes.append(child) 144 145 # we can't set the process group id from the parent since the child 146 # will already have exec'd. and we can't SIGSTOP it before exec, 147 # see above. 148 # `os.setpgid(child.pid, child.pid)` 149 150 if foreground: 151 # set the child's process group as new foreground 152 try: 153 os.tcsetpgrp(sys.stdin.fileno(), child.pid) 154 except Exception as e: 155 pass 156 # revive the child, 157 # because it may have been stopped due to SIGTTOU or 158 # SIGTTIN when it tried using stdout/stdin 159 # after setpgid was called, and before we made it 160 # forward process by tcsetpgrp. 161 os.kill(child.pid, signal.SIGCONT) 162 163 # wait for the child to terminate 164 if store_proc_dict is not None: 165 store_proc_dict[store_proc_key] = child 166 _ret = poll_process(child, line_callback) if line_callback is not None else child.wait() 167 ret = _ret if not as_proc else child 168 except KeyboardInterrupt: 169 child.send_signal(signal.SIGINT) 170 ret = child.wait() if not as_proc else child 171 finally: 172 if foreground: 173 # we have to mask SIGTTOU because tcsetpgrp 174 # raises SIGTTOU to all current background 175 # process group members (i.e. us) when switching tty's pgrp 176 # it we didn't do that, we'd get SIGSTOP'd 177 hdlr = signal.signal(signal.SIGTTOU, signal.SIG_IGN) 178 # make us tty's foreground again 179 try: 180 os.tcsetpgrp(sys.stdin.fileno(), old_pgrp) 181 except Exception as e: 182 pass 183 # now restore the handler 184 signal.signal(signal.SIGTTOU, hdlr) 185 # restore terminal attributes 186 if termios: 187 termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, old_attr) 188 189 return ret 190 191 192def poll_process( 193 proc: subprocess.Popen, 194 line_callback: Callable[[bytes], Any], 195 timeout_seconds: Union[int, float, None] = None, 196 timeout_callback: Optional[Callable[[Any], Any]] = None, 197 timeout_callback_args: Optional[Tuple[Any]] = None, 198 timeout_callback_kwargs: Optional[Dict[str, Any]] = None, 199) -> int: 200 """ 201 Poll a process and execute a callback function for each line printed to the process's `stdout`. 202 """ 203 from meerschaum.utils.threading import Timer 204 from meerschaum.utils.warnings import warn 205 206 def timeout_handler(): 207 nonlocal timeout_callback_args, timeout_callback_kwargs 208 try: 209 if platform.system() != 'Windows': 210 ### The process being killed may have children. 211 os.killpg(os.getpgid(proc.pid), signal.SIGKILL) 212 else: 213 proc.send_signal(signal.CTRL_BREAK_EVENT) 214 proc.terminate() 215 except Exception as e: 216 warn(f"Failed to kill process:\n{e}") 217 if timeout_callback_args is None: 218 timeout_callback_args = [] 219 if timeout_callback_kwargs is None: 220 timeout_callback_kwargs = {} 221 timeout_callback(*timeout_callback_args, **timeout_callback_kwargs) 222 223 if timeout_seconds is not None: 224 watchdog_thread = Timer(timeout_seconds, timeout_handler) 225 watchdog_thread.daemon = True 226 watchdog_thread.start() 227 228 while proc.poll() is None: 229 line = proc.stdout.readline() 230 line_callback(line) 231 232 if timeout_seconds is not None: 233 watchdog_thread.cancel() 234 235 return proc.poll() 236 237 238def _stop_process( 239 proc: subprocess.Popen, 240 timeout_seconds: int = 8, 241): 242 """ 243 Stop a `subproccess.Popen` object. 244 """ 245 proc.terminate() 246 try: 247 proc.wait(timeout=timeout_seconds) 248 except subprocess.TimeoutExpired: 249 proc.kill()
def
signal_handler(sig, frame):
def
run_process( *args, foreground: bool = False, as_proc: bool = False, line_callback: Optional[Callable[[bytes], Any]] = None, store_proc_dict: Optional[Dict[str, Any]] = None, store_proc_key: str = 'child_process', capture_output: bool = False, **kw: Any) -> Union[int, subprocess.Popen]:
33def run_process( 34 *args, 35 foreground: bool = False, 36 as_proc: bool = False, 37 line_callback: Optional[Callable[[bytes], Any]] = None, 38 store_proc_dict: Optional[Dict[str, Any]] = None, 39 store_proc_key: str = 'child_process', 40 capture_output: bool = False, 41 **kw: Any 42) -> Union[int, subprocess.Popen]: 43 """Original foreground solution found here: 44 https://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess 45 46 Parameters 47 ---------- 48 *args: 49 The sysargs to execute. 50 51 foreground: bool, default False 52 If `True`, execute the process as a foreground process that passes Ctrl-C to children. 53 From the original post: 54 The "correct" way of spawning a new subprocess: 55 signals like C-c must only go 56 to the child process, and not to this python. 57 58 Some side-info about "how ctrl-c works": 59 https://unix.stackexchange.com/a/149756/1321 60 61 as_proc: bool, default False 62 If `True`, return the `subprocess.Popen` object. 63 64 line_callback: Optional[Callable[[str], Any]], default None 65 If provided, poll the process and execute the callback when `readline()` gets new text. 66 67 store_proc_dict: Optional[Dict[str, Any]], default None 68 If provided, store the `subprocess.Popen` object under the key `store_proc_key`. 69 Useful for accessing the process while it is polling in another thread. 70 71 store_proc_key: str, default 'child_process' 72 If `store_proc_dict` is provided, store the process in the dictionary under this key. 73 74 kw: Any 75 Additional keyword arguments to pass to `subprocess.Popen`. 76 77 Returns 78 ------- 79 Either an int for the return code or a `subprocess.Popen` object. 80 """ 81 try: 82 import termios 83 except ImportError: 84 termios = None 85 86 if platform.system() == 'Windows': 87 foreground = False 88 89 def print_line(line): 90 sys.stdout.write(line.decode('utf-8')) 91 sys.stdout.flush() 92 93 94 if capture_output or line_callback is not None: 95 kw['stdout'] = subprocess.PIPE 96 kw['stderr'] = subprocess.STDOUT 97 elif os.environ.get(STATIC_CONFIG['environment']['daemon_id']): 98 kw['stdout'] = subprocess.PIPE 99 kw['stderr'] = subprocess.STDOUT 100 if line_callback is None: 101 line_callback = print_line 102 103 if 'env' not in kw: 104 kw['env'] = os.environ 105 106 user_preexec_fn = kw.get("preexec_fn", None) 107 108 if foreground: 109 try: 110 old_pgrp = os.tcgetpgrp(sys.stdin.fileno()) 111 except Exception as e: 112 termios = None 113 if termios: 114 try: 115 old_attr = termios.tcgetattr(sys.stdin.fileno()) 116 except Exception as e: 117 termios = None 118 119 def new_pgid(): 120 if user_preexec_fn: 121 user_preexec_fn() 122 123 # set a new process group id 124 os.setpgid(os.getpid(), os.getpid()) 125 126 # generally, the child process should stop itself 127 # before exec so the parent can set its new pgid. 128 # (setting pgid has to be done before the child execs). 129 # however, Python 'guarantee' that `preexec_fn` 130 # is run before `Popen` returns. 131 # this is because `Popen` waits for the closure of 132 # the error relay pipe '`errpipe_write`', 133 # which happens at child's exec. 134 # this is also the reason the child can't stop itself 135 # in Python's `Popen`, since the `Popen` call would never 136 # terminate then. 137 # `os.kill(os.getpid(), signal.SIGSTOP)` 138 139 if foreground: 140 kw['preexec_fn'] = new_pgid 141 142 try: 143 child = subprocess.Popen(*args, **kw) 144 _child_processes.append(child) 145 146 # we can't set the process group id from the parent since the child 147 # will already have exec'd. and we can't SIGSTOP it before exec, 148 # see above. 149 # `os.setpgid(child.pid, child.pid)` 150 151 if foreground: 152 # set the child's process group as new foreground 153 try: 154 os.tcsetpgrp(sys.stdin.fileno(), child.pid) 155 except Exception as e: 156 pass 157 # revive the child, 158 # because it may have been stopped due to SIGTTOU or 159 # SIGTTIN when it tried using stdout/stdin 160 # after setpgid was called, and before we made it 161 # forward process by tcsetpgrp. 162 os.kill(child.pid, signal.SIGCONT) 163 164 # wait for the child to terminate 165 if store_proc_dict is not None: 166 store_proc_dict[store_proc_key] = child 167 _ret = poll_process(child, line_callback) if line_callback is not None else child.wait() 168 ret = _ret if not as_proc else child 169 except KeyboardInterrupt: 170 child.send_signal(signal.SIGINT) 171 ret = child.wait() if not as_proc else child 172 finally: 173 if foreground: 174 # we have to mask SIGTTOU because tcsetpgrp 175 # raises SIGTTOU to all current background 176 # process group members (i.e. us) when switching tty's pgrp 177 # it we didn't do that, we'd get SIGSTOP'd 178 hdlr = signal.signal(signal.SIGTTOU, signal.SIG_IGN) 179 # make us tty's foreground again 180 try: 181 os.tcsetpgrp(sys.stdin.fileno(), old_pgrp) 182 except Exception as e: 183 pass 184 # now restore the handler 185 signal.signal(signal.SIGTTOU, hdlr) 186 # restore terminal attributes 187 if termios: 188 termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, old_attr) 189 190 return ret
Original foreground solution found here: https://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess
Parameters
- *args:: The sysargs to execute.
foreground (bool, default False): If
True
, execute the process as a foreground process that passes Ctrl-C to children. From the original post: The "correct" way of spawning a new subprocess: signals like C-c must only go to the child process, and not to this python.Some side-info about "how ctrl-c works": https://unix.stackexchange.com/a/149756/1321
- as_proc (bool, default False):
If
True
, return thesubprocess.Popen
object. - line_callback (Optional[Callable[[str], Any]], default None):
If provided, poll the process and execute the callback when
readline()
gets new text. - store_proc_dict (Optional[Dict[str, Any]], default None):
If provided, store the
subprocess.Popen
object under the keystore_proc_key
. Useful for accessing the process while it is polling in another thread. - store_proc_key (str, default 'child_process'):
If
store_proc_dict
is provided, store the process in the dictionary under this key. - kw (Any):
Additional keyword arguments to pass to
subprocess.Popen
.
Returns
- Either an int for the return code or a
subprocess.Popen
object.
def
poll_process( proc: subprocess.Popen, line_callback: Callable[[bytes], Any], timeout_seconds: Union[int, float, NoneType] = None, timeout_callback: Optional[Callable[[Any], Any]] = None, timeout_callback_args: Optional[Tuple[Any]] = None, timeout_callback_kwargs: Optional[Dict[str, Any]] = None) -> int:
193def poll_process( 194 proc: subprocess.Popen, 195 line_callback: Callable[[bytes], Any], 196 timeout_seconds: Union[int, float, None] = None, 197 timeout_callback: Optional[Callable[[Any], Any]] = None, 198 timeout_callback_args: Optional[Tuple[Any]] = None, 199 timeout_callback_kwargs: Optional[Dict[str, Any]] = None, 200) -> int: 201 """ 202 Poll a process and execute a callback function for each line printed to the process's `stdout`. 203 """ 204 from meerschaum.utils.threading import Timer 205 from meerschaum.utils.warnings import warn 206 207 def timeout_handler(): 208 nonlocal timeout_callback_args, timeout_callback_kwargs 209 try: 210 if platform.system() != 'Windows': 211 ### The process being killed may have children. 212 os.killpg(os.getpgid(proc.pid), signal.SIGKILL) 213 else: 214 proc.send_signal(signal.CTRL_BREAK_EVENT) 215 proc.terminate() 216 except Exception as e: 217 warn(f"Failed to kill process:\n{e}") 218 if timeout_callback_args is None: 219 timeout_callback_args = [] 220 if timeout_callback_kwargs is None: 221 timeout_callback_kwargs = {} 222 timeout_callback(*timeout_callback_args, **timeout_callback_kwargs) 223 224 if timeout_seconds is not None: 225 watchdog_thread = Timer(timeout_seconds, timeout_handler) 226 watchdog_thread.daemon = True 227 watchdog_thread.start() 228 229 while proc.poll() is None: 230 line = proc.stdout.readline() 231 line_callback(line) 232 233 if timeout_seconds is not None: 234 watchdog_thread.cancel() 235 236 return proc.poll()
Poll a process and execute a callback function for each line printed to the process's stdout
.