meerschaum.utils.process
Custom process-handling functions.
See meerschaum.utils.pool
for multiprocessing and
meerschaum.utils.threading
for threads.
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Custom process-handling functions. 7See `meerschaum.utils.pool` for multiprocessing and 8`meerschaum.utils.threading` for threads. 9""" 10 11from __future__ import annotations 12import os, signal, subprocess, sys, platform, traceback 13from meerschaum.utils.typing import Union, Optional, Any, Callable, Dict, Tuple 14from meerschaum.config.static import STATIC_CONFIG 15 16_child_processes = [] 17def signal_handler(sig, frame): 18 for child in _child_processes: 19 child.send_signal(sig) 20 child.wait() 21 22def run_process( 23 *args, 24 foreground: bool = False, 25 as_proc: bool = False, 26 line_callback: Optional[Callable[[bytes], Any]] = None, 27 store_proc_dict: Optional[Dict[str, Any]] = None, 28 store_proc_key: str = 'child_process', 29 capture_output: bool = False, 30 **kw: Any 31) -> Union[int, subprocess.Popen]: 32 """Original foreground solution found here: 33 https://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess 34 35 Parameters 36 ---------- 37 *args: 38 The sysargs to execute. 39 40 foreground: bool, default False 41 If `True`, execute the process as a foreground process that passes Ctrl-C to children. 42 From the original post: 43 The "correct" way of spawning a new subprocess: 44 signals like C-c must only go 45 to the child process, and not to this python. 46 47 Some side-info about "how ctrl-c works": 48 https://unix.stackexchange.com/a/149756/1321 49 50 as_proc: bool, default False 51 If `True`, return the `subprocess.Popen` object. 52 53 line_callback: Optional[Callable[[str], Any]], default None 54 If provided, poll the process and execute the callback when `readline()` gets new text. 55 56 store_proc_dict: Optional[Dict[str, Any]], default None 57 If provided, store the `subprocess.Popen` object under the key `store_proc_key`. 58 Useful for accessing the process while it is polling in another thread. 
59 60 store_proc_key: str, default 'child_process' 61 If `store_proc_dict` is provided, store the process in the dictionary under this key. 62 63 kw: Any 64 Additional keyword arguments to pass to `subprocess.Popen`. 65 66 Returns 67 ------- 68 Either an int for the return code or a `subprocess.Popen` object. 69 """ 70 try: 71 import termios 72 except ImportError: 73 termios = None 74 75 if platform.system() == 'Windows': 76 foreground = False 77 78 def print_line(line): 79 sys.stdout.write(line.decode('utf-8')) 80 sys.stdout.flush() 81 82 83 if capture_output or line_callback is not None: 84 kw['stdout'] = subprocess.PIPE 85 kw['stderr'] = subprocess.STDOUT 86 elif os.environ.get(STATIC_CONFIG['environment']['daemon_id']): 87 kw['stdout'] = subprocess.PIPE 88 kw['stderr'] = subprocess.STDOUT 89 if line_callback is None: 90 line_callback = print_line 91 92 if 'env' not in kw: 93 kw['env'] = os.environ 94 95 user_preexec_fn = kw.get("preexec_fn", None) 96 97 if foreground: 98 try: 99 old_pgrp = os.tcgetpgrp(sys.stdin.fileno()) 100 except Exception as e: 101 termios = None 102 if termios: 103 try: 104 old_attr = termios.tcgetattr(sys.stdin.fileno()) 105 except Exception as e: 106 termios = None 107 108 def new_pgid(): 109 if user_preexec_fn: 110 user_preexec_fn() 111 112 # set a new process group id 113 os.setpgid(os.getpid(), os.getpid()) 114 115 # generally, the child process should stop itself 116 # before exec so the parent can set its new pgid. 117 # (setting pgid has to be done before the child execs). 118 # however, Python 'guarantee' that `preexec_fn` 119 # is run before `Popen` returns. 120 # this is because `Popen` waits for the closure of 121 # the error relay pipe '`errpipe_write`', 122 # which happens at child's exec. 123 # this is also the reason the child can't stop itself 124 # in Python's `Popen`, since the `Popen` call would never 125 # terminate then. 
126 # `os.kill(os.getpid(), signal.SIGSTOP)` 127 128 if foreground: 129 kw['preexec_fn'] = new_pgid 130 131 try: 132 child = subprocess.Popen(*args, **kw) 133 _child_processes.append(child) 134 135 # we can't set the process group id from the parent since the child 136 # will already have exec'd. and we can't SIGSTOP it before exec, 137 # see above. 138 # `os.setpgid(child.pid, child.pid)` 139 140 if foreground: 141 # set the child's process group as new foreground 142 try: 143 os.tcsetpgrp(sys.stdin.fileno(), child.pid) 144 except Exception as e: 145 pass 146 # revive the child, 147 # because it may have been stopped due to SIGTTOU or 148 # SIGTTIN when it tried using stdout/stdin 149 # after setpgid was called, and before we made it 150 # forward process by tcsetpgrp. 151 os.kill(child.pid, signal.SIGCONT) 152 153 # wait for the child to terminate 154 if store_proc_dict is not None: 155 store_proc_dict[store_proc_key] = child 156 _ret = poll_process(child, line_callback) if line_callback is not None else child.wait() 157 ret = _ret if not as_proc else child 158 except KeyboardInterrupt: 159 child.send_signal(signal.SIGINT) 160 ret = child.wait() if not as_proc else child 161 finally: 162 if foreground: 163 # we have to mask SIGTTOU because tcsetpgrp 164 # raises SIGTTOU to all current background 165 # process group members (i.e. 
us) when switching tty's pgrp 166 # it we didn't do that, we'd get SIGSTOP'd 167 hdlr = signal.signal(signal.SIGTTOU, signal.SIG_IGN) 168 # make us tty's foreground again 169 try: 170 os.tcsetpgrp(sys.stdin.fileno(), old_pgrp) 171 except Exception as e: 172 pass 173 # now restore the handler 174 signal.signal(signal.SIGTTOU, hdlr) 175 # restore terminal attributes 176 if termios: 177 termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, old_attr) 178 179 return ret 180 181 182def poll_process( 183 proc: subprocess.Popen, 184 line_callback: Callable[[bytes], Any], 185 timeout_seconds: Union[int, float, None] = None, 186 timeout_callback: Optional[Callable[[Any], Any]] = None, 187 timeout_callback_args: Optional[Tuple[Any]] = None, 188 timeout_callback_kwargs: Optional[Dict[str, Any]] = None, 189) -> int: 190 """ 191 Poll a process and execute a callback function for each line printed to the process's `stdout`. 192 """ 193 from meerschaum.utils.threading import Timer 194 from meerschaum.utils.warnings import warn 195 196 def timeout_handler(): 197 nonlocal timeout_callback_args, timeout_callback_kwargs 198 try: 199 if platform.system() != 'Windows': 200 ### The process being killed may have children. 201 os.killpg(os.getpgid(proc.pid), signal.SIGKILL) 202 else: 203 proc.send_signal(signal.CTRL_BREAK_EVENT) 204 proc.terminate() 205 except Exception as e: 206 warn(f"Failed to kill process:\n{e}") 207 if timeout_callback_args is None: 208 timeout_callback_args = [] 209 if timeout_callback_kwargs is None: 210 timeout_callback_kwargs = {} 211 timeout_callback(*timeout_callback_args, **timeout_callback_kwargs) 212 213 if timeout_seconds is not None: 214 watchdog_thread = Timer(timeout_seconds, timeout_handler) 215 watchdog_thread.daemon = True 216 watchdog_thread.start() 217 218 while proc.poll() is None: 219 line = proc.stdout.readline() 220 line_callback(line) 221 222 if timeout_seconds is not None: 223 watchdog_thread.cancel() 224 225 return proc.poll()
def
signal_handler(sig, frame):
def
run_process( *args, foreground: bool = False, as_proc: bool = False, line_callback: Optional[Callable[[bytes], Any]] = None, store_proc_dict: Optional[Dict[str, Any]] = None, store_proc_key: str = 'child_process', capture_output: bool = False, **kw: Any) -> Union[int, subprocess.Popen]:
def run_process(
    *args,
    foreground: bool = False,
    as_proc: bool = False,
    line_callback: Optional[Callable[[bytes], Any]] = None,
    store_proc_dict: Optional[Dict[str, Any]] = None,
    store_proc_key: str = 'child_process',
    capture_output: bool = False,
    **kw: Any
) -> Union[int, subprocess.Popen]:
    """Original foreground solution found here:
    https://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess

    Parameters
    ----------
    *args:
        The sysargs to execute.

    foreground: bool, default False
        If `True`, execute the process as a foreground process that passes Ctrl-C to children.
        Always treated as `False` on Windows.
        From the original post:
        The "correct" way of spawning a new subprocess:
        signals like C-c must only go
        to the child process, and not to this python.

        Some side-info about "how ctrl-c works":
        https://unix.stackexchange.com/a/149756/1321

    as_proc: bool, default False
        If `True`, return the `subprocess.Popen` object.

    line_callback: Optional[Callable[[bytes], Any]], default None
        If provided, poll the process and execute the callback when `readline()` gets new text.
        The callback receives the raw line read from the child's `stdout` pipe.

    store_proc_dict: Optional[Dict[str, Any]], default None
        If provided, store the `subprocess.Popen` object under the key `store_proc_key`.
        Useful for accessing the process while it is polling in another thread.

    store_proc_key: str, default 'child_process'
        If `store_proc_dict` is provided, store the process in the dictionary under this key.

    capture_output: bool, default False
        If `True`, set the child's `stdout` to a pipe and merge `stderr` into it.
        The same redirection is applied whenever `line_callback` is provided.

    kw: Any
        Additional keyword arguments to pass to `subprocess.Popen`.

    Returns
    -------
    Either an int for the return code or a `subprocess.Popen` object.
    """
    try:
        import termios
    except ImportError:
        termios = None

    if platform.system() == 'Windows':
        foreground = False

    def print_line(line):
        ### Replace undecodable bytes rather than crashing on non-UTF-8 output.
        sys.stdout.write(line.decode('utf-8', errors='replace'))
        sys.stdout.flush()

    if capture_output or line_callback is not None:
        kw['stdout'] = subprocess.PIPE
        kw['stderr'] = subprocess.STDOUT
    elif os.environ.get(STATIC_CONFIG['environment']['daemon_id']):
        ### The daemon ID environment variable is set and no callback was given:
        ### pipe the child's output and echo each line to our own stdout.
        kw['stdout'] = subprocess.PIPE
        kw['stderr'] = subprocess.STDOUT
        line_callback = print_line

    if 'env' not in kw:
        kw['env'] = os.environ

    user_preexec_fn = kw.get("preexec_fn", None)

    ### Terminal state to restore once the child is done (`None` means nothing to restore).
    old_pgrp = None
    old_attr = None
    if foreground:
        try:
            old_pgrp = os.tcgetpgrp(sys.stdin.fileno())
        except Exception:
            ### stdin is not a usable terminal, so skip the terminal attributes as well.
            termios = None
        if termios:
            try:
                old_attr = termios.tcgetattr(sys.stdin.fileno())
            except Exception:
                termios = None

    def new_pgid():
        if user_preexec_fn:
            user_preexec_fn()

        # set a new process group id
        os.setpgid(os.getpid(), os.getpid())

        # generally, the child process should stop itself
        # before exec so the parent can set its new pgid.
        # (setting pgid has to be done before the child execs).
        # however, Python 'guarantees' that `preexec_fn`
        # is run before `Popen` returns.
        # this is because `Popen` waits for the closure of
        # the error relay pipe '`errpipe_write`',
        # which happens at child's exec.
        # this is also the reason the child can't stop itself
        # in Python's `Popen`, since the `Popen` call would never
        # terminate then.
        # `os.kill(os.getpid(), signal.SIGSTOP)`

    if foreground:
        kw['preexec_fn'] = new_pgid

    child = None
    try:
        child = subprocess.Popen(*args, **kw)
        _child_processes.append(child)

        # we can't set the process group id from the parent since the child
        # will already have exec'd. and we can't SIGSTOP it before exec,
        # see above.
        # `os.setpgid(child.pid, child.pid)`

        if foreground:
            # set the child's process group as new foreground
            try:
                os.tcsetpgrp(sys.stdin.fileno(), child.pid)
            except Exception:
                # best effort: there may be no controlling terminal.
                pass
            # revive the child,
            # because it may have been stopped due to SIGTTOU or
            # SIGTTIN when it tried using stdout/stdin
            # after setpgid was called, and before we made it
            # forward process by tcsetpgrp.
            os.kill(child.pid, signal.SIGCONT)

        # wait for the child to terminate
        if store_proc_dict is not None:
            store_proc_dict[store_proc_key] = child
        # NOTE(review): with `capture_output=True` and no `line_callback`, nothing reads
        # the pipe while we wait; the `subprocess` docs warn that `wait()` can deadlock
        # if the child fills the pipe buffer. Left as-is because callers may read
        # `child.stdout` afterwards.
        _ret = poll_process(child, line_callback) if line_callback is not None else child.wait()
        ret = _ret if not as_proc else child
    except KeyboardInterrupt:
        if child is None:
            # interrupted before the child was created: nothing to forward the signal to.
            raise
        child.send_signal(signal.SIGINT)
        ret = child.wait() if not as_proc else child
    finally:
        if foreground:
            if old_pgrp is not None:
                # we have to mask SIGTTOU because tcsetpgrp
                # raises SIGTTOU to all current background
                # process group members (i.e. us) when switching tty's pgrp
                # if we didn't do that, we'd get SIGSTOP'd
                hdlr = signal.signal(signal.SIGTTOU, signal.SIG_IGN)
                # make us tty's foreground again
                try:
                    os.tcsetpgrp(sys.stdin.fileno(), old_pgrp)
                except Exception:
                    pass
                # now restore the handler
                signal.signal(signal.SIGTTOU, hdlr)
            # restore terminal attributes
            if termios and old_attr is not None:
                termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, old_attr)

        # stop tracking the child once it has exited so the list doesn't grow forever.
        if child is not None and child.poll() is not None:
            try:
                _child_processes.remove(child)
            except ValueError:
                pass

    return ret
Original foreground solution found here: https://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess
Parameters
- *args:: The sysargs to execute.
foreground (bool, default False): If
True
, execute the process as a foreground process that passes Ctrl-C to children. From the original post: The "correct" way of spawning a new subprocess: signals like C-c must only go to the child process, and not to this python. Some side-info about "how ctrl-c works": https://unix.stackexchange.com/a/149756/1321
- as_proc (bool, default False):
If
True
, return the subprocess.Popen
object. - line_callback (Optional[Callable[[bytes], Any]], default None):
If provided, poll the process and execute the callback when
readline()
gets new text. - store_proc_dict (Optional[Dict[str, Any]], default None):
If provided, store the
subprocess.Popen
object under the key store_proc_key
. Useful for accessing the process while it is polling in another thread. - store_proc_key (str, default 'child_process'):
If
store_proc_dict
is provided, store the process in the dictionary under this key. - kw (Any):
Additional keyword arguments to pass to
subprocess.Popen
.
Returns
- Either an int for the return code or a
subprocess.Popen
object.
def
poll_process( proc: subprocess.Popen, line_callback: Callable[[bytes], Any], timeout_seconds: Union[int, float, NoneType] = None, timeout_callback: Optional[Callable[[Any], Any]] = None, timeout_callback_args: Optional[Tuple[Any]] = None, timeout_callback_kwargs: Optional[Dict[str, Any]] = None) -> int:
def poll_process(
    proc: subprocess.Popen,
    line_callback: Callable[[bytes], Any],
    timeout_seconds: Union[int, float, None] = None,
    timeout_callback: Optional[Callable[[Any], Any]] = None,
    timeout_callback_args: Optional[Tuple[Any]] = None,
    timeout_callback_kwargs: Optional[Dict[str, Any]] = None,
) -> int:
    """
    Poll a process and execute a callback function for each line printed to the process's `stdout`.

    Parameters
    ----------
    proc: subprocess.Popen
        The process to poll. Its `stdout` must be a pipe.

    line_callback: Callable[[bytes], Any]
        Called with each value returned by `proc.stdout.readline()`
        (an empty value is passed once the pipe reaches end-of-file).

    timeout_seconds: Union[int, float, None], default None
        If provided, kill the process after this many seconds.

    timeout_callback: Optional[Callable[[Any], Any]], default None
        If provided, call this function after the process has been killed by the timeout.

    timeout_callback_args: Optional[Tuple[Any]], default None
        Positional arguments to pass to `timeout_callback`.

    timeout_callback_kwargs: Optional[Dict[str, Any]], default None
        Keyword arguments to pass to `timeout_callback`.

    Returns
    -------
    The return code of the process.
    """
    watchdog_thread = None
    if timeout_seconds is not None:
        ### Only needed when a timeout is requested.
        from meerschaum.utils.threading import Timer
        from meerschaum.utils.warnings import warn

        def timeout_handler():
            try:
                if platform.system() != 'Windows':
                    pgid = os.getpgid(proc.pid)
                    if pgid != os.getpgrp():
                        ### The process being killed may have children.
                        os.killpg(pgid, signal.SIGKILL)
                    else:
                        ### The child shares our process group:
                        ### killing the whole group would kill this process too.
                        proc.kill()
                else:
                    proc.send_signal(signal.CTRL_BREAK_EVENT)
                    proc.terminate()
            except Exception as e:
                warn(f"Failed to kill process:\n{e}")
            if timeout_callback is None:
                return
            timeout_callback(
                *(timeout_callback_args or ()),
                **(timeout_callback_kwargs or {})
            )

        watchdog_thread = Timer(timeout_seconds, timeout_handler)
        watchdog_thread.daemon = True
        watchdog_thread.start()

    try:
        ### NOTE(review): lines still buffered in the pipe when the process exits
        ### are not delivered; draining until EOF could block if a grandchild
        ### keeps the pipe open, so the original loop is kept.
        while proc.poll() is None:
            line = proc.stdout.readline()
            line_callback(line)
    finally:
        ### Always disarm the watchdog, even if `line_callback` raised.
        if watchdog_thread is not None:
            watchdog_thread.cancel()

    return proc.poll()
Poll a process and execute a callback function for each line printed to the process's stdout
.