meerschaum.utils.process
Custom process-handling functions.
See meerschaum.utils.pool
for multiprocessing and
meerschaum.utils.threading
for threads.
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Custom process-handling functions. 7See `meerschaum.utils.pool` for multiprocessing and 8`meerschaum.utils.threading` for threads. 9""" 10 11from __future__ import annotations 12 13import os 14import signal 15import subprocess 16import sys 17import platform 18 19import meerschaum as mrsm 20from meerschaum.utils.typing import Union, Optional, Any, Callable, Dict, Tuple 21from meerschaum._internal.static import STATIC_CONFIG 22 23_child_processes = [] 24def signal_handler(sig, frame): 25 for child in _child_processes: 26 try: 27 child.send_signal(sig) 28 child.wait() 29 except Exception: 30 pass 31 32def run_process( 33 *args, 34 foreground: bool = False, 35 as_proc: bool = False, 36 line_callback: Optional[Callable[[bytes], Any]] = None, 37 store_proc_dict: Optional[Dict[str, Any]] = None, 38 store_proc_key: str = 'child_process', 39 capture_output: bool = False, 40 **kw: Any 41) -> Union[int, subprocess.Popen]: 42 """Original foreground solution found here: 43 https://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess 44 45 Parameters 46 ---------- 47 *args: 48 The sysargs to execute. 49 50 foreground: bool, default False 51 If `True`, execute the process as a foreground process that passes Ctrl-C to children. 52 From the original post: 53 The "correct" way of spawning a new subprocess: 54 signals like C-c must only go 55 to the child process, and not to this python. 56 57 Some side-info about "how ctrl-c works": 58 https://unix.stackexchange.com/a/149756/1321 59 60 as_proc: bool, default False 61 If `True`, return the `subprocess.Popen` object. 62 63 line_callback: Optional[Callable[[str], Any]], default None 64 If provided, poll the process and execute the callback when `readline()` gets new text. 65 66 store_proc_dict: Optional[Dict[str, Any]], default None 67 If provided, store the `subprocess.Popen` object under the key `store_proc_key`. 68 Useful for accessing the process while it is polling in another thread. 69 70 store_proc_key: str, default 'child_process' 71 If `store_proc_dict` is provided, store the process in the dictionary under this key. 72 73 kw: Any 74 Additional keyword arguments to pass to `subprocess.Popen`. 75 76 Returns 77 ------- 78 Either an int for the return code or a `subprocess.Popen` object. 79 """ 80 try: 81 import termios 82 except ImportError: 83 termios = None 84 85 if platform.system() == 'Windows': 86 foreground = False 87 88 def print_line(line): 89 sys.stdout.write(line.decode('utf-8')) 90 sys.stdout.flush() 91 92 93 if capture_output or line_callback is not None: 94 kw['stdout'] = subprocess.PIPE 95 kw['stderr'] = subprocess.STDOUT 96 elif os.environ.get(STATIC_CONFIG['environment']['daemon_id']): 97 kw['stdout'] = subprocess.PIPE 98 kw['stderr'] = subprocess.STDOUT 99 if line_callback is None: 100 line_callback = print_line 101 102 if 'env' not in kw: 103 kw['env'] = os.environ 104 105 user_preexec_fn = kw.get("preexec_fn", None) 106 107 if foreground: 108 try: 109 old_pgrp = os.tcgetpgrp(sys.stdin.fileno()) 110 except Exception as e: 111 termios = None 112 if termios: 113 try: 114 old_attr = termios.tcgetattr(sys.stdin.fileno()) 115 except Exception as e: 116 termios = None 117 118 def new_pgid(): 119 if user_preexec_fn: 120 user_preexec_fn() 121 122 # set a new process group id 123 os.setpgid(os.getpid(), os.getpid()) 124 125 # generally, the child process should stop itself 126 # before exec so the parent can set its new pgid. 127 # (setting pgid has to be done before the child execs). 128 # however, Python 'guarantee' that `preexec_fn` 129 # is run before `Popen` returns. 130 # this is because `Popen` waits for the closure of 131 # the error relay pipe '`errpipe_write`', 132 # which happens at child's exec. 133 # this is also the reason the child can't stop itself 134 # in Python's `Popen`, since the `Popen` call would never 135 # terminate then. 136 # `os.kill(os.getpid(), signal.SIGSTOP)` 137 138 if foreground: 139 kw['preexec_fn'] = new_pgid 140 141 try: 142 child = subprocess.Popen(*args, **kw) 143 _child_processes.append(child) 144 145 # we can't set the process group id from the parent since the child 146 # will already have exec'd. and we can't SIGSTOP it before exec, 147 # see above. 148 # `os.setpgid(child.pid, child.pid)` 149 150 if foreground: 151 # set the child's process group as new foreground 152 try: 153 os.tcsetpgrp(sys.stdin.fileno(), child.pid) 154 except Exception as e: 155 pass 156 # revive the child, 157 # because it may have been stopped due to SIGTTOU or 158 # SIGTTIN when it tried using stdout/stdin 159 # after setpgid was called, and before we made it 160 # forward process by tcsetpgrp. 161 os.kill(child.pid, signal.SIGCONT) 162 163 # wait for the child to terminate 164 if store_proc_dict is not None: 165 store_proc_dict[store_proc_key] = child 166 _ret = poll_process(child, line_callback) if line_callback is not None else child.wait() 167 ret = _ret if not as_proc else child 168 except KeyboardInterrupt: 169 child.send_signal(signal.SIGINT) 170 ret = child.wait() if not as_proc else child 171 finally: 172 if foreground: 173 # we have to mask SIGTTOU because tcsetpgrp 174 # raises SIGTTOU to all current background 175 # process group members (i.e. us) when switching tty's pgrp 176 # it we didn't do that, we'd get SIGSTOP'd 177 hdlr = signal.signal(signal.SIGTTOU, signal.SIG_IGN) 178 # make us tty's foreground again 179 try: 180 os.tcsetpgrp(sys.stdin.fileno(), old_pgrp) 181 except Exception: 182 pass 183 # now restore the handler 184 signal.signal(signal.SIGTTOU, hdlr) 185 # restore terminal attributes 186 if termios: 187 termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, old_attr) 188 189 return ret 190 191 192def poll_process( 193 proc: subprocess.Popen, 194 line_callback: Callable[[bytes], Any], 195 timeout_seconds: Union[int, float, None] = None, 196 timeout_callback: Optional[Callable[[Any], Any]] = None, 197 timeout_callback_args: Optional[Tuple[Any]] = None, 198 timeout_callback_kwargs: Optional[Dict[str, Any]] = None, 199) -> int: 200 """ 201 Poll a process and execute a callback function for each line printed to the process's `stdout`. 202 """ 203 from meerschaum.utils.threading import Timer 204 from meerschaum.utils.warnings import warn 205 206 def timeout_handler(): 207 nonlocal timeout_callback_args, timeout_callback_kwargs 208 try: 209 if platform.system() != 'Windows': 210 ### The process being killed may have children. 211 os.killpg(os.getpgid(proc.pid), signal.SIGKILL) 212 else: 213 proc.send_signal(signal.CTRL_BREAK_EVENT) 214 proc.terminate() 215 except Exception as e: 216 warn(f"Failed to kill process:\n{e}") 217 if timeout_callback_args is None: 218 timeout_callback_args = [] 219 if timeout_callback_kwargs is None: 220 timeout_callback_kwargs = {} 221 timeout_callback(*timeout_callback_args, **timeout_callback_kwargs) 222 223 if timeout_seconds is not None: 224 watchdog_thread = Timer(timeout_seconds, timeout_handler) 225 watchdog_thread.daemon = True 226 watchdog_thread.start() 227 228 while proc.poll() is None: 229 line = proc.stdout.readline() 230 line_callback(line) 231 232 if timeout_seconds is not None: 233 watchdog_thread.cancel() 234 235 return proc.poll() 236 237 238def _stop_process( 239 proc: subprocess.Popen, 240 timeout_seconds: int = 8, 241): 242 """ 243 Stop a `subproccess.Popen` object. 244 """ 245 proc.terminate() 246 try: 247 proc.wait(timeout=timeout_seconds) 248 except subprocess.TimeoutExpired: 249 proc.kill()
def
signal_handler(sig, frame):
def
run_process( *args, foreground: bool = False, as_proc: bool = False, line_callback: Optional[Callable[[bytes], Any]] = None, store_proc_dict: Optional[Dict[str, Any]] = None, store_proc_key: str = 'child_process', capture_output: bool = False, **kw: Any) -> Union[int, subprocess.Popen]:
33def run_process( 34 *args, 35 foreground: bool = False, 36 as_proc: bool = False, 37 line_callback: Optional[Callable[[bytes], Any]] = None, 38 store_proc_dict: Optional[Dict[str, Any]] = None, 39 store_proc_key: str = 'child_process', 40 capture_output: bool = False, 41 **kw: Any 42) -> Union[int, subprocess.Popen]: 43 """Original foreground solution found here: 44 https://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess 45 46 Parameters 47 ---------- 48 *args: 49 The sysargs to execute. 50 51 foreground: bool, default False 52 If `True`, execute the process as a foreground process that passes Ctrl-C to children. 53 From the original post: 54 The "correct" way of spawning a new subprocess: 55 signals like C-c must only go 56 to the child process, and not to this python. 57 58 Some side-info about "how ctrl-c works": 59 https://unix.stackexchange.com/a/149756/1321 60 61 as_proc: bool, default False 62 If `True`, return the `subprocess.Popen` object. 63 64 line_callback: Optional[Callable[[str], Any]], default None 65 If provided, poll the process and execute the callback when `readline()` gets new text. 66 67 store_proc_dict: Optional[Dict[str, Any]], default None 68 If provided, store the `subprocess.Popen` object under the key `store_proc_key`. 69 Useful for accessing the process while it is polling in another thread. 70 71 store_proc_key: str, default 'child_process' 72 If `store_proc_dict` is provided, store the process in the dictionary under this key. 73 74 kw: Any 75 Additional keyword arguments to pass to `subprocess.Popen`. 76 77 Returns 78 ------- 79 Either an int for the return code or a `subprocess.Popen` object. 80 """ 81 try: 82 import termios 83 except ImportError: 84 termios = None 85 86 if platform.system() == 'Windows': 87 foreground = False 88 89 def print_line(line): 90 sys.stdout.write(line.decode('utf-8')) 91 sys.stdout.flush() 92 93 94 if capture_output or line_callback is not None: 95 kw['stdout'] = subprocess.PIPE 96 kw['stderr'] = subprocess.STDOUT 97 elif os.environ.get(STATIC_CONFIG['environment']['daemon_id']): 98 kw['stdout'] = subprocess.PIPE 99 kw['stderr'] = subprocess.STDOUT 100 if line_callback is None: 101 line_callback = print_line 102 103 if 'env' not in kw: 104 kw['env'] = os.environ 105 106 user_preexec_fn = kw.get("preexec_fn", None) 107 108 if foreground: 109 try: 110 old_pgrp = os.tcgetpgrp(sys.stdin.fileno()) 111 except Exception as e: 112 termios = None 113 if termios: 114 try: 115 old_attr = termios.tcgetattr(sys.stdin.fileno()) 116 except Exception as e: 117 termios = None 118 119 def new_pgid(): 120 if user_preexec_fn: 121 user_preexec_fn() 122 123 # set a new process group id 124 os.setpgid(os.getpid(), os.getpid()) 125 126 # generally, the child process should stop itself 127 # before exec so the parent can set its new pgid. 128 # (setting pgid has to be done before the child execs). 129 # however, Python 'guarantee' that `preexec_fn` 130 # is run before `Popen` returns. 131 # this is because `Popen` waits for the closure of 132 # the error relay pipe '`errpipe_write`', 133 # which happens at child's exec. 134 # this is also the reason the child can't stop itself 135 # in Python's `Popen`, since the `Popen` call would never 136 # terminate then. 137 # `os.kill(os.getpid(), signal.SIGSTOP)` 138 139 if foreground: 140 kw['preexec_fn'] = new_pgid 141 142 try: 143 child = subprocess.Popen(*args, **kw) 144 _child_processes.append(child) 145 146 # we can't set the process group id from the parent since the child 147 # will already have exec'd. and we can't SIGSTOP it before exec, 148 # see above. 149 # `os.setpgid(child.pid, child.pid)` 150 151 if foreground: 152 # set the child's process group as new foreground 153 try: 154 os.tcsetpgrp(sys.stdin.fileno(), child.pid) 155 except Exception as e: 156 pass 157 # revive the child, 158 # because it may have been stopped due to SIGTTOU or 159 # SIGTTIN when it tried using stdout/stdin 160 # after setpgid was called, and before we made it 161 # forward process by tcsetpgrp. 162 os.kill(child.pid, signal.SIGCONT) 163 164 # wait for the child to terminate 165 if store_proc_dict is not None: 166 store_proc_dict[store_proc_key] = child 167 _ret = poll_process(child, line_callback) if line_callback is not None else child.wait() 168 ret = _ret if not as_proc else child 169 except KeyboardInterrupt: 170 child.send_signal(signal.SIGINT) 171 ret = child.wait() if not as_proc else child 172 finally: 173 if foreground: 174 # we have to mask SIGTTOU because tcsetpgrp 175 # raises SIGTTOU to all current background 176 # process group members (i.e. us) when switching tty's pgrp 177 # it we didn't do that, we'd get SIGSTOP'd 178 hdlr = signal.signal(signal.SIGTTOU, signal.SIG_IGN) 179 # make us tty's foreground again 180 try: 181 os.tcsetpgrp(sys.stdin.fileno(), old_pgrp) 182 except Exception: 183 pass 184 # now restore the handler 185 signal.signal(signal.SIGTTOU, hdlr) 186 # restore terminal attributes 187 if termios: 188 termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, old_attr) 189 190 return ret
Original foreground solution found here: https://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess
Parameters
- *args:: The sysargs to execute.
foreground (bool, default False): If
True
, execute the process as a foreground process that passes Ctrl-C to children. From the original post: The "correct" way of spawning a new subprocess: signals like C-c must only go to the child process, and not to this python.Some side-info about "how ctrl-c works": https://unix.stackexchange.com/a/149756/1321
- as_proc (bool, default False):
If
True
, return thesubprocess.Popen
object. - line_callback (Optional[Callable[[str], Any]], default None):
If provided, poll the process and execute the callback when
readline()
gets new text. - store_proc_dict (Optional[Dict[str, Any]], default None):
If provided, store the
subprocess.Popen
object under the keystore_proc_key
. Useful for accessing the process while it is polling in another thread. - store_proc_key (str, default 'child_process'):
If
store_proc_dict
is provided, store the process in the dictionary under this key. - kw (Any):
Additional keyword arguments to pass to
subprocess.Popen
.
Returns
- Either an int for the return code or a
subprocess.Popen
object.
def
poll_process( proc: subprocess.Popen, line_callback: Callable[[bytes], Any], timeout_seconds: Union[int, float, NoneType] = None, timeout_callback: Optional[Callable[[Any], Any]] = None, timeout_callback_args: Optional[Tuple[Any]] = None, timeout_callback_kwargs: Optional[Dict[str, Any]] = None) -> int:
193def poll_process( 194 proc: subprocess.Popen, 195 line_callback: Callable[[bytes], Any], 196 timeout_seconds: Union[int, float, None] = None, 197 timeout_callback: Optional[Callable[[Any], Any]] = None, 198 timeout_callback_args: Optional[Tuple[Any]] = None, 199 timeout_callback_kwargs: Optional[Dict[str, Any]] = None, 200) -> int: 201 """ 202 Poll a process and execute a callback function for each line printed to the process's `stdout`. 203 """ 204 from meerschaum.utils.threading import Timer 205 from meerschaum.utils.warnings import warn 206 207 def timeout_handler(): 208 nonlocal timeout_callback_args, timeout_callback_kwargs 209 try: 210 if platform.system() != 'Windows': 211 ### The process being killed may have children. 212 os.killpg(os.getpgid(proc.pid), signal.SIGKILL) 213 else: 214 proc.send_signal(signal.CTRL_BREAK_EVENT) 215 proc.terminate() 216 except Exception as e: 217 warn(f"Failed to kill process:\n{e}") 218 if timeout_callback_args is None: 219 timeout_callback_args = [] 220 if timeout_callback_kwargs is None: 221 timeout_callback_kwargs = {} 222 timeout_callback(*timeout_callback_args, **timeout_callback_kwargs) 223 224 if timeout_seconds is not None: 225 watchdog_thread = Timer(timeout_seconds, timeout_handler) 226 watchdog_thread.daemon = True 227 watchdog_thread.start() 228 229 while proc.poll() is None: 230 line = proc.stdout.readline() 231 line_callback(line) 232 233 if timeout_seconds is not None: 234 watchdog_thread.cancel() 235 236 return proc.poll()
Poll a process and execute a callback function for each line printed to the process's stdout
.