meerschaum.utils.process

Custom process-handling functions. See meerschaum.utils.pool for multiprocessing and meerschaum.utils.threading for threads.

  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Custom process-handling functions.
  7See `meerschaum.utils.pool` for multiprocessing and
  8`meerschaum.utils.threading` for threads.
  9"""
 10
 11from __future__ import annotations
 12
 13import os
 14import signal
 15import subprocess
 16import sys
 17import platform
 18
 19import meerschaum as mrsm
 20from meerschaum.utils.typing import Union, Optional, Any, Callable, Dict, Tuple
 21from meerschaum.config.static import STATIC_CONFIG
 22
 23_child_processes = []
 24def signal_handler(sig, frame):
 25    for child in _child_processes:
 26        try:
 27            child.send_signal(sig)
 28            child.wait()
 29        except Exception:
 30            pass
 31
 32def run_process(
 33    *args,
 34    foreground: bool = False,
 35    as_proc: bool = False,
 36    line_callback: Optional[Callable[[bytes], Any]] = None,
 37    store_proc_dict: Optional[Dict[str, Any]] = None,
 38    store_proc_key: str = 'child_process',
 39    capture_output: bool = False,
 40    **kw: Any
 41) -> Union[int, subprocess.Popen]:
 42    """Original foreground solution found here:
 43    https://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess
 44
 45    Parameters
 46    ----------
 47    *args:
 48        The sysargs to execute.
 49
 50    foreground: bool, default False
 51        If `True`, execute the process as a foreground process that passes Ctrl-C to children.
 52        From the original post:
 53        The "correct" way of spawning a new subprocess:
 54        signals like C-c must only go
 55        to the child process, and not to this python.
 56        
 57        Some side-info about "how ctrl-c works":
 58        https://unix.stackexchange.com/a/149756/1321
 59
 60    as_proc: bool, default False
 61        If `True`, return the `subprocess.Popen` object.
 62
 63    line_callback: Optional[Callable[[str], Any]], default None
 64        If provided, poll the process and execute the callback when `readline()` gets new text.
 65
 66    store_proc_dict: Optional[Dict[str, Any]], default None
 67        If provided, store the `subprocess.Popen` object under the key `store_proc_key`.
 68        Useful for accessing the process while it is polling in another thread.
 69
 70    store_proc_key: str, default 'child_process'
 71        If `store_proc_dict` is provided, store the process in the dictionary under this key.
 72
 73    kw: Any
 74        Additional keyword arguments to pass to `subprocess.Popen`.
 75
 76    Returns
 77    -------
 78    Either an int for the return code or a `subprocess.Popen` object.
 79    """
 80    try:
 81        import termios
 82    except ImportError:
 83        termios = None
 84
 85    if platform.system() == 'Windows':
 86        foreground = False
 87
 88    def print_line(line):
 89        sys.stdout.write(line.decode('utf-8'))
 90        sys.stdout.flush()
 91
 92  
 93    if capture_output or line_callback is not None:
 94        kw['stdout'] = subprocess.PIPE
 95        kw['stderr'] = subprocess.STDOUT
 96    elif os.environ.get(STATIC_CONFIG['environment']['daemon_id']):
 97        kw['stdout'] = subprocess.PIPE
 98        kw['stderr'] = subprocess.STDOUT
 99        if line_callback is None:
100            line_callback = print_line
101
102    if 'env' not in kw:
103        kw['env'] = os.environ
104
105    user_preexec_fn = kw.get("preexec_fn", None)
106
107    if foreground:
108        try:
109            old_pgrp = os.tcgetpgrp(sys.stdin.fileno())
110        except Exception as e:
111            termios = None
112        if termios:
113            try:
114                old_attr = termios.tcgetattr(sys.stdin.fileno())
115            except Exception as e:
116                termios = None
117
118    def new_pgid():
119        if user_preexec_fn:
120            user_preexec_fn()
121
122        # set a new process group id
123        os.setpgid(os.getpid(), os.getpid())
124
125        # generally, the child process should stop itself
126        # before exec so the parent can set its new pgid.
127        # (setting pgid has to be done before the child execs).
128        # however, Python 'guarantee' that `preexec_fn`
129        # is run before `Popen` returns.
130        # this is because `Popen` waits for the closure of
131        # the error relay pipe '`errpipe_write`',
132        # which happens at child's exec.
133        # this is also the reason the child can't stop itself
134        # in Python's `Popen`, since the `Popen` call would never
135        # terminate then.
136        # `os.kill(os.getpid(), signal.SIGSTOP)`
137
138    if foreground:
139        kw['preexec_fn'] = new_pgid
140
141    try:
142        child = subprocess.Popen(*args, **kw)
143        _child_processes.append(child)
144
145        # we can't set the process group id from the parent since the child
146        # will already have exec'd. and we can't SIGSTOP it before exec,
147        # see above.
148        # `os.setpgid(child.pid, child.pid)`
149
150        if foreground:
151            # set the child's process group as new foreground
152            try:
153                os.tcsetpgrp(sys.stdin.fileno(), child.pid)
154            except Exception as e:
155                pass
156            # revive the child,
157            # because it may have been stopped due to SIGTTOU or
158            # SIGTTIN when it tried using stdout/stdin
159            # after setpgid was called, and before we made it
160            # forward process by tcsetpgrp.
161            os.kill(child.pid, signal.SIGCONT)
162
163        # wait for the child to terminate
164        if store_proc_dict is not None:
165            store_proc_dict[store_proc_key] = child
166        _ret = poll_process(child, line_callback) if line_callback is not None else child.wait()
167        ret = _ret if not as_proc else child
168    except KeyboardInterrupt:
169        child.send_signal(signal.SIGINT)
170        ret = child.wait() if not as_proc else child
171    finally:
172        if foreground:
173            # we have to mask SIGTTOU because tcsetpgrp
174            # raises SIGTTOU to all current background
175            # process group members (i.e. us) when switching tty's pgrp
176            # it we didn't do that, we'd get SIGSTOP'd
177            hdlr = signal.signal(signal.SIGTTOU, signal.SIG_IGN)
178            # make us tty's foreground again
179            try:
180                os.tcsetpgrp(sys.stdin.fileno(), old_pgrp)
181            except Exception as e:
182                pass
183            # now restore the handler
184            signal.signal(signal.SIGTTOU, hdlr)
185            # restore terminal attributes
186            if termios:
187                termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, old_attr)
188
189    return ret
190
191
def poll_process(
    proc: subprocess.Popen,
    line_callback: Callable[[bytes], Any],
    timeout_seconds: Union[int, float, None] = None,
    timeout_callback: Optional[Callable[[Any], Any]] = None,
    timeout_callback_args: Optional[Tuple[Any]] = None,
    timeout_callback_kwargs: Optional[Dict[str, Any]] = None,
) -> int:
    """
    Poll a process and execute a callback function for each line printed to the process's `stdout`.

    Parameters
    ----------
    proc: subprocess.Popen
        The process to poll. Its `stdout` is read line-by-line until end-of-file.

    line_callback: Callable[[bytes], Any]
        Called with each non-empty line read from `proc.stdout`.

    timeout_seconds: Union[int, float, None], default None
        If provided, kill the process once this many seconds have elapsed.

    timeout_callback: Optional[Callable[[Any], Any]], default None
        If provided, called after the process has been killed due to the timeout.

    timeout_callback_args: Optional[Tuple[Any]], default None
        Positional arguments to pass to `timeout_callback`.

    timeout_callback_kwargs: Optional[Dict[str, Any]], default None
        Keyword arguments to pass to `timeout_callback`.

    Returns
    -------
    The process's return code.
    """

    def timeout_handler():
        try:
            if platform.system() != 'Windows':
                ### The process being killed may have children.
                child_pgid = os.getpgid(proc.pid)
                if child_pgid != os.getpgrp():
                    os.killpg(child_pgid, signal.SIGKILL)
                else:
                    ### The child shares our process group:
                    ### killing the group would kill this process too.
                    proc.kill()
            else:
                proc.send_signal(signal.CTRL_BREAK_EVENT)
            proc.terminate()
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(f"Failed to kill process:\n{e}")
        if timeout_callback is not None:
            callback_args = timeout_callback_args or ()
            callback_kwargs = timeout_callback_kwargs or {}
            timeout_callback(*callback_args, **callback_kwargs)

    watchdog_thread = None
    if timeout_seconds is not None:
        from meerschaum.utils.threading import Timer
        watchdog_thread = Timer(timeout_seconds, timeout_handler)
        watchdog_thread.daemon = True
        watchdog_thread.start()

    try:
        stdout = proc.stdout
        if stdout is not None:
            ### Read until end-of-file (an empty read) rather than until the
            ### process exits, so output still buffered in the pipe when the
            ### process ends is delivered and EOF never causes a busy loop.
            while True:
                line = stdout.readline()
                if not line:
                    break
                line_callback(line)
        return_code = proc.wait()
    finally:
        ### Always stop the watchdog, even if the callback raised.
        if watchdog_thread is not None:
            watchdog_thread.cancel()

    return return_code
236
237
def _stop_process(
    proc: subprocess.Popen,
    timeout_seconds: int = 8,
):
    """
    Stop a `subprocess.Popen` object.

    Ask the process to terminate and, if it has not exited within
    `timeout_seconds`, kill it.

    Parameters
    ----------
    proc: subprocess.Popen
        The process to stop.

    timeout_seconds: int, default 8
        How long to wait after `terminate()` before resorting to `kill()`.
    """
    proc.terminate()
    try:
        proc.wait(timeout=timeout_seconds)
    except subprocess.TimeoutExpired:
        proc.kill()
        # Reap the killed process so it does not linger as a zombie
        # and so `proc.returncode` is populated for the caller.
        proc.wait()
def signal_handler(sig, frame):
    """
    Relay the signal `sig` to every tracked child process and wait for each to exit.

    Parameters
    ----------
    sig:
        The signal number to forward to the children.

    frame:
        The current stack frame (unused; present to match the `signal.signal` handler signature).
    """
    # Iterate over a snapshot: mutating a list while looping over it
    # silently skips elements, which would leave some children unsignalled.
    for child in list(_child_processes):
        try:
            child.send_signal(sig)
            child.wait()
        except Exception:
            # Best-effort relay: a failure for one child must not prevent
            # the remaining children from receiving the signal.
            pass
def run_process(
    *args,
    foreground: bool = False,
    as_proc: bool = False,
    line_callback: Optional[Callable[[bytes], Any]] = None,
    store_proc_dict: Optional[Dict[str, Any]] = None,
    store_proc_key: str = 'child_process',
    capture_output: bool = False,
    **kw: Any
) -> Union[int, subprocess.Popen]:
    """Run a subprocess and wait for it to finish.

    Original foreground solution found here:
    https://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess

    Parameters
    ----------
    *args:
        The sysargs to execute.

    foreground: bool, default False
        If `True`, execute the process as a foreground process that passes Ctrl-C to children.
        Always treated as `False` on Windows.
        From the original post:
        The "correct" way of spawning a new subprocess:
        signals like C-c must only go
        to the child process, and not to this python.

        Some side-info about "how ctrl-c works":
        https://unix.stackexchange.com/a/149756/1321

    as_proc: bool, default False
        If `True`, return the `subprocess.Popen` object.

    line_callback: Optional[Callable[[bytes], Any]], default None
        If provided, poll the process and execute the callback when `readline()` gets new text.
        The child's `stdout` is piped and `stderr` is merged into it.

    store_proc_dict: Optional[Dict[str, Any]], default None
        If provided, store the `subprocess.Popen` object under the key `store_proc_key`.
        Useful for accessing the process while it is polling in another thread.

    store_proc_key: str, default 'child_process'
        If `store_proc_dict` is provided, store the process in the dictionary under this key.

    capture_output: bool, default False
        If `True`, pipe the child's `stdout` and merge `stderr` into it.

    kw: Any
        Additional keyword arguments to pass to `subprocess.Popen`.

    Returns
    -------
    Either an int for the return code or a `subprocess.Popen` object.
    """
    try:
        import termios
    except ImportError:
        termios = None

    # The process-group / TTY handling below is POSIX-only.
    if platform.system() == 'Windows':
        foreground = False

    def print_line(line):
        sys.stdout.write(line.decode('utf-8'))
        sys.stdout.flush()

    if capture_output or line_callback is not None:
        # NOTE(review): with `capture_output` and no `line_callback`, the child is
        # waited on before anything reads the pipe, so a child that fills the
        # pipe buffer may block. Confirm callers only capture small outputs.
        kw['stdout'] = subprocess.PIPE
        kw['stderr'] = subprocess.STDOUT
    elif os.environ.get(STATIC_CONFIG['environment']['daemon_id']):
        # The daemon ID environment variable is set: relay the child's
        # output through this process's `sys.stdout`.
        kw['stdout'] = subprocess.PIPE
        kw['stderr'] = subprocess.STDOUT
        line_callback = print_line

    kw.setdefault('env', os.environ)

    user_preexec_fn = kw.get("preexec_fn", None)

    # Remember the terminal's state so it can be restored after the child exits.
    old_pgrp = None
    old_attr = None
    if foreground:
        try:
            old_pgrp = os.tcgetpgrp(sys.stdin.fileno())
        except Exception:
            # No usable controlling terminal; skip attribute handling as well.
            termios = None
        if termios:
            try:
                old_attr = termios.tcgetattr(sys.stdin.fileno())
            except Exception:
                termios = None

    def new_pgid():
        if user_preexec_fn:
            user_preexec_fn()

        # set a new process group id
        os.setpgid(os.getpid(), os.getpid())

        # generally, the child process should stop itself
        # before exec so the parent can set its new pgid.
        # (setting pgid has to be done before the child execs).
        # however, Python 'guarantee' that `preexec_fn`
        # is run before `Popen` returns.
        # this is because `Popen` waits for the closure of
        # the error relay pipe '`errpipe_write`',
        # which happens at child's exec.
        # this is also the reason the child can't stop itself
        # in Python's `Popen`, since the `Popen` call would never
        # terminate then.
        # `os.kill(os.getpid(), signal.SIGSTOP)`

    if foreground:
        kw['preexec_fn'] = new_pgid

    # Bound before the `try` so the handlers below can tell whether
    # `Popen` ever returned.
    child = None
    try:
        child = subprocess.Popen(*args, **kw)
        _child_processes.append(child)

        # we can't set the process group id from the parent since the child
        # will already have exec'd. and we can't SIGSTOP it before exec,
        # see above.
        # `os.setpgid(child.pid, child.pid)`

        if foreground:
            # set the child's process group as new foreground
            try:
                os.tcsetpgrp(sys.stdin.fileno(), child.pid)
            except Exception:
                pass
            # revive the child,
            # because it may have been stopped due to SIGTTOU or
            # SIGTTIN when it tried using stdout/stdin
            # after setpgid was called, and before we made it
            # forward process by tcsetpgrp.
            os.kill(child.pid, signal.SIGCONT)

        if store_proc_dict is not None:
            store_proc_dict[store_proc_key] = child

        # wait for the child to terminate
        if line_callback is not None:
            exit_code = poll_process(child, line_callback)
        else:
            exit_code = child.wait()
        ret = child if as_proc else exit_code
    except KeyboardInterrupt:
        if child is None:
            # Interrupted before the child was spawned: nothing to signal.
            raise
        child.send_signal(signal.SIGINT)
        ret = child if as_proc else child.wait()
    finally:
        # Stop tracking children that have exited so the registry does not
        # grow without bound; children still running stay registered.
        if child is not None and child.poll() is not None:
            try:
                _child_processes.remove(child)
            except ValueError:
                pass

        if foreground:
            # we have to mask SIGTTOU because tcsetpgrp
            # raises SIGTTOU to all current background
            # process group members (i.e. us) when switching tty's pgrp
            # if we didn't do that, we'd get SIGSTOP'd
            hdlr = signal.signal(signal.SIGTTOU, signal.SIG_IGN)
            # make us tty's foreground again
            if old_pgrp is not None:
                try:
                    os.tcsetpgrp(sys.stdin.fileno(), old_pgrp)
                except Exception:
                    pass
            # now restore the handler
            signal.signal(signal.SIGTTOU, hdlr)
            # restore terminal attributes
            if termios:
                termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, old_attr)

    return ret

Original foreground solution found here: https://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess

Parameters
  • *args: The sysargs to execute.
  • foreground (bool, default False): If True, execute the process as a foreground process that passes Ctrl-C to children. From the original post: The "correct" way of spawning a new subprocess: signals like C-c must only go to the child process, and not to this python.

    Some side-info about "how ctrl-c works": https://unix.stackexchange.com/a/149756/1321

  • as_proc (bool, default False): If True, return the subprocess.Popen object.
  • line_callback (Optional[Callable[[bytes], Any]], default None): If provided, poll the process and execute the callback when readline() gets new text.
  • store_proc_dict (Optional[Dict[str, Any]], default None): If provided, store the subprocess.Popen object under the key store_proc_key. Useful for accessing the process while it is polling in another thread.
  • store_proc_key (str, default 'child_process'): If store_proc_dict is provided, store the process in the dictionary under this key.
  • kw (Any): Additional keyword arguments to pass to subprocess.Popen.
Returns
  • Either an int for the return code or a subprocess.Popen object.
def poll_process(
    proc: subprocess.Popen,
    line_callback: Callable[[bytes], Any],
    timeout_seconds: Union[int, float, None] = None,
    timeout_callback: Optional[Callable[[Any], Any]] = None,
    timeout_callback_args: Optional[Tuple[Any]] = None,
    timeout_callback_kwargs: Optional[Dict[str, Any]] = None,
) -> int:
    """
    Poll a process and execute a callback function for each line printed to the process's `stdout`.

    Parameters
    ----------
    proc: subprocess.Popen
        The process to poll. Its `stdout` is read line-by-line until end-of-file.

    line_callback: Callable[[bytes], Any]
        Called with each non-empty line read from `proc.stdout`.

    timeout_seconds: Union[int, float, None], default None
        If provided, kill the process once this many seconds have elapsed.

    timeout_callback: Optional[Callable[[Any], Any]], default None
        If provided, called after the process has been killed due to the timeout.

    timeout_callback_args: Optional[Tuple[Any]], default None
        Positional arguments to pass to `timeout_callback`.

    timeout_callback_kwargs: Optional[Dict[str, Any]], default None
        Keyword arguments to pass to `timeout_callback`.

    Returns
    -------
    The process's return code.
    """

    def timeout_handler():
        try:
            if platform.system() != 'Windows':
                ### The process being killed may have children.
                child_pgid = os.getpgid(proc.pid)
                if child_pgid != os.getpgrp():
                    os.killpg(child_pgid, signal.SIGKILL)
                else:
                    ### The child shares our process group:
                    ### killing the group would kill this process too.
                    proc.kill()
            else:
                proc.send_signal(signal.CTRL_BREAK_EVENT)
            proc.terminate()
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(f"Failed to kill process:\n{e}")
        if timeout_callback is not None:
            callback_args = timeout_callback_args or ()
            callback_kwargs = timeout_callback_kwargs or {}
            timeout_callback(*callback_args, **callback_kwargs)

    watchdog_thread = None
    if timeout_seconds is not None:
        from meerschaum.utils.threading import Timer
        watchdog_thread = Timer(timeout_seconds, timeout_handler)
        watchdog_thread.daemon = True
        watchdog_thread.start()

    try:
        stdout = proc.stdout
        if stdout is not None:
            ### Read until end-of-file (an empty read) rather than until the
            ### process exits, so output still buffered in the pipe when the
            ### process ends is delivered and EOF never causes a busy loop.
            while True:
                line = stdout.readline()
                if not line:
                    break
                line_callback(line)
        return_code = proc.wait()
    finally:
        ### Always stop the watchdog, even if the callback raised.
        if watchdog_thread is not None:
            watchdog_thread.cancel()

    return return_code

Poll a process and execute a callback function for each line printed to the process's stdout.