meerschaum.utils.process

Custom process-handling functions. See meerschaum.utils.pool for multiprocessing and meerschaum.utils.threading for threads.

  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Custom process-handling functions.
  7See `meerschaum.utils.pool` for multiprocessing and
  8`meerschaum.utils.threading` for threads.
  9"""
 10
 11from __future__ import annotations
 12
 13import os
 14import signal
 15import subprocess
 16import sys
 17import platform
 18
 19import meerschaum as mrsm
 20from meerschaum.utils.typing import Union, Optional, Any, Callable, Dict, Tuple
 21from meerschaum._internal.static import STATIC_CONFIG
 22
 23_child_processes = []
 24def signal_handler(sig, frame):
 25    for child in _child_processes:
 26        try:
 27            child.send_signal(sig)
 28            child.wait()
 29        except Exception:
 30            pass
 31
 32def run_process(
 33    *args,
 34    foreground: bool = False,
 35    as_proc: bool = False,
 36    line_callback: Optional[Callable[[bytes], Any]] = None,
 37    store_proc_dict: Optional[Dict[str, Any]] = None,
 38    store_proc_key: str = 'child_process',
 39    capture_output: bool = False,
 40    **kw: Any
 41) -> Union[int, subprocess.Popen]:
 42    """Original foreground solution found here:
 43    https://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess
 44
 45    Parameters
 46    ----------
 47    *args:
 48        The sysargs to execute.
 49
 50    foreground: bool, default False
 51        If `True`, execute the process as a foreground process that passes Ctrl-C to children.
 52        From the original post:
 53        The "correct" way of spawning a new subprocess:
 54        signals like C-c must only go
 55        to the child process, and not to this python.
 56        
 57        Some side-info about "how ctrl-c works":
 58        https://unix.stackexchange.com/a/149756/1321
 59
 60    as_proc: bool, default False
 61        If `True`, return the `subprocess.Popen` object.
 62
 63    line_callback: Optional[Callable[[str], Any]], default None
 64        If provided, poll the process and execute the callback when `readline()` gets new text.
 65
 66    store_proc_dict: Optional[Dict[str, Any]], default None
 67        If provided, store the `subprocess.Popen` object under the key `store_proc_key`.
 68        Useful for accessing the process while it is polling in another thread.
 69
 70    store_proc_key: str, default 'child_process'
 71        If `store_proc_dict` is provided, store the process in the dictionary under this key.
 72
 73    kw: Any
 74        Additional keyword arguments to pass to `subprocess.Popen`.
 75
 76    Returns
 77    -------
 78    Either an int for the return code or a `subprocess.Popen` object.
 79    """
 80    try:
 81        import termios
 82    except ImportError:
 83        termios = None
 84
 85    if platform.system() == 'Windows':
 86        foreground = False
 87
 88    def print_line(line):
 89        sys.stdout.write(line.decode('utf-8'))
 90        sys.stdout.flush()
 91
 92  
 93    if capture_output or line_callback is not None:
 94        kw['stdout'] = subprocess.PIPE
 95        kw['stderr'] = subprocess.STDOUT
 96    elif os.environ.get(STATIC_CONFIG['environment']['daemon_id']):
 97        kw['stdout'] = subprocess.PIPE
 98        kw['stderr'] = subprocess.STDOUT
 99        if line_callback is None:
100            line_callback = print_line
101
102    if 'env' not in kw:
103        kw['env'] = os.environ
104
105    user_preexec_fn = kw.get("preexec_fn", None)
106
107    if foreground:
108        try:
109            old_pgrp = os.tcgetpgrp(sys.stdin.fileno())
110        except Exception as e:
111            termios = None
112        if termios:
113            try:
114                old_attr = termios.tcgetattr(sys.stdin.fileno())
115            except Exception as e:
116                termios = None
117
118    def new_pgid():
119        if user_preexec_fn:
120            user_preexec_fn()
121
122        # set a new process group id
123        os.setpgid(os.getpid(), os.getpid())
124
125        # generally, the child process should stop itself
126        # before exec so the parent can set its new pgid.
127        # (setting pgid has to be done before the child execs).
128        # however, Python 'guarantee' that `preexec_fn`
129        # is run before `Popen` returns.
130        # this is because `Popen` waits for the closure of
131        # the error relay pipe '`errpipe_write`',
132        # which happens at child's exec.
133        # this is also the reason the child can't stop itself
134        # in Python's `Popen`, since the `Popen` call would never
135        # terminate then.
136        # `os.kill(os.getpid(), signal.SIGSTOP)`
137
138    if foreground:
139        kw['preexec_fn'] = new_pgid
140
141    try:
142        child = subprocess.Popen(*args, **kw)
143        _child_processes.append(child)
144
145        # we can't set the process group id from the parent since the child
146        # will already have exec'd. and we can't SIGSTOP it before exec,
147        # see above.
148        # `os.setpgid(child.pid, child.pid)`
149
150        if foreground:
151            # set the child's process group as new foreground
152            try:
153                os.tcsetpgrp(sys.stdin.fileno(), child.pid)
154            except Exception as e:
155                pass
156            # revive the child,
157            # because it may have been stopped due to SIGTTOU or
158            # SIGTTIN when it tried using stdout/stdin
159            # after setpgid was called, and before we made it
160            # forward process by tcsetpgrp.
161            os.kill(child.pid, signal.SIGCONT)
162
163        # wait for the child to terminate
164        if store_proc_dict is not None:
165            store_proc_dict[store_proc_key] = child
166        _ret = poll_process(child, line_callback) if line_callback is not None else child.wait()
167        ret = _ret if not as_proc else child
168    except KeyboardInterrupt:
169        child.send_signal(signal.SIGINT)
170        ret = child.wait() if not as_proc else child
171    finally:
172        if foreground:
173            # we have to mask SIGTTOU because tcsetpgrp
174            # raises SIGTTOU to all current background
175            # process group members (i.e. us) when switching tty's pgrp
176            # it we didn't do that, we'd get SIGSTOP'd
177            hdlr = signal.signal(signal.SIGTTOU, signal.SIG_IGN)
178            # make us tty's foreground again
179            try:
180                os.tcsetpgrp(sys.stdin.fileno(), old_pgrp)
181            except Exception:
182                pass
183            # now restore the handler
184            signal.signal(signal.SIGTTOU, hdlr)
185            # restore terminal attributes
186            if termios:
187                termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, old_attr)
188
189    return ret
190
191
192def poll_process(
193    proc: subprocess.Popen,
194    line_callback: Callable[[bytes], Any],
195    timeout_seconds: Union[int, float, None] = None,
196    timeout_callback: Optional[Callable[[Any], Any]] = None,
197    timeout_callback_args: Optional[Tuple[Any]] = None,
198    timeout_callback_kwargs: Optional[Dict[str, Any]] = None,
199) -> int:
200    """
201    Poll a process and execute a callback function for each line printed to the process's `stdout`.
202    """
203    from meerschaum.utils.threading import Timer
204    from meerschaum.utils.warnings import warn
205
206    def timeout_handler():
207        nonlocal timeout_callback_args, timeout_callback_kwargs
208        try:
209            if platform.system() != 'Windows':
210                ### The process being killed may have children.
211                os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
212            else:
213                proc.send_signal(signal.CTRL_BREAK_EVENT)
214            proc.terminate()
215        except Exception as e:
216            warn(f"Failed to kill process:\n{e}")
217        if timeout_callback_args is None:
218            timeout_callback_args = []
219        if timeout_callback_kwargs is None:
220            timeout_callback_kwargs = {}
221        timeout_callback(*timeout_callback_args, **timeout_callback_kwargs)
222
223    if timeout_seconds is not None:
224        watchdog_thread = Timer(timeout_seconds, timeout_handler)
225        watchdog_thread.daemon = True
226        watchdog_thread.start()
227
228    while proc.poll() is None:
229        line = proc.stdout.readline()
230        line_callback(line)
231
232    if timeout_seconds is not None:
233        watchdog_thread.cancel()
234
235    return proc.poll()
236
237
238def _stop_process(
239    proc: subprocess.Popen,
240    timeout_seconds: int = 8,
241):
242    """
243    Stop a `subproccess.Popen` object.
244    """
245    proc.terminate()
246    try:
247        proc.wait(timeout=timeout_seconds)
248    except subprocess.TimeoutExpired:
249        proc.kill()
def signal_handler(sig, frame):
25def signal_handler(sig, frame):
26    for child in _child_processes:
27        try:
28            child.send_signal(sig)
29            child.wait()
30        except Exception:
31            pass
def run_process( *args, foreground: bool = False, as_proc: bool = False, line_callback: Optional[Callable[[bytes], Any]] = None, store_proc_dict: Optional[Dict[str, Any]] = None, store_proc_key: str = 'child_process', capture_output: bool = False, **kw: Any) -> Union[int, subprocess.Popen]:
 33def run_process(
 34    *args,
 35    foreground: bool = False,
 36    as_proc: bool = False,
 37    line_callback: Optional[Callable[[bytes], Any]] = None,
 38    store_proc_dict: Optional[Dict[str, Any]] = None,
 39    store_proc_key: str = 'child_process',
 40    capture_output: bool = False,
 41    **kw: Any
 42) -> Union[int, subprocess.Popen]:
 43    """Original foreground solution found here:
 44    https://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess
 45
 46    Parameters
 47    ----------
 48    *args:
 49        The sysargs to execute.
 50
 51    foreground: bool, default False
 52        If `True`, execute the process as a foreground process that passes Ctrl-C to children.
 53        From the original post:
 54        The "correct" way of spawning a new subprocess:
 55        signals like C-c must only go
 56        to the child process, and not to this python.
 57        
 58        Some side-info about "how ctrl-c works":
 59        https://unix.stackexchange.com/a/149756/1321
 60
 61    as_proc: bool, default False
 62        If `True`, return the `subprocess.Popen` object.
 63
 64    line_callback: Optional[Callable[[str], Any]], default None
 65        If provided, poll the process and execute the callback when `readline()` gets new text.
 66
 67    store_proc_dict: Optional[Dict[str, Any]], default None
 68        If provided, store the `subprocess.Popen` object under the key `store_proc_key`.
 69        Useful for accessing the process while it is polling in another thread.
 70
 71    store_proc_key: str, default 'child_process'
 72        If `store_proc_dict` is provided, store the process in the dictionary under this key.
 73
 74    kw: Any
 75        Additional keyword arguments to pass to `subprocess.Popen`.
 76
 77    Returns
 78    -------
 79    Either an int for the return code or a `subprocess.Popen` object.
 80    """
 81    try:
 82        import termios
 83    except ImportError:
 84        termios = None
 85
 86    if platform.system() == 'Windows':
 87        foreground = False
 88
 89    def print_line(line):
 90        sys.stdout.write(line.decode('utf-8'))
 91        sys.stdout.flush()
 92
 93  
 94    if capture_output or line_callback is not None:
 95        kw['stdout'] = subprocess.PIPE
 96        kw['stderr'] = subprocess.STDOUT
 97    elif os.environ.get(STATIC_CONFIG['environment']['daemon_id']):
 98        kw['stdout'] = subprocess.PIPE
 99        kw['stderr'] = subprocess.STDOUT
100        if line_callback is None:
101            line_callback = print_line
102
103    if 'env' not in kw:
104        kw['env'] = os.environ
105
106    user_preexec_fn = kw.get("preexec_fn", None)
107
108    if foreground:
109        try:
110            old_pgrp = os.tcgetpgrp(sys.stdin.fileno())
111        except Exception as e:
112            termios = None
113        if termios:
114            try:
115                old_attr = termios.tcgetattr(sys.stdin.fileno())
116            except Exception as e:
117                termios = None
118
119    def new_pgid():
120        if user_preexec_fn:
121            user_preexec_fn()
122
123        # set a new process group id
124        os.setpgid(os.getpid(), os.getpid())
125
126        # generally, the child process should stop itself
127        # before exec so the parent can set its new pgid.
128        # (setting pgid has to be done before the child execs).
129        # however, Python 'guarantee' that `preexec_fn`
130        # is run before `Popen` returns.
131        # this is because `Popen` waits for the closure of
132        # the error relay pipe '`errpipe_write`',
133        # which happens at child's exec.
134        # this is also the reason the child can't stop itself
135        # in Python's `Popen`, since the `Popen` call would never
136        # terminate then.
137        # `os.kill(os.getpid(), signal.SIGSTOP)`
138
139    if foreground:
140        kw['preexec_fn'] = new_pgid
141
142    try:
143        child = subprocess.Popen(*args, **kw)
144        _child_processes.append(child)
145
146        # we can't set the process group id from the parent since the child
147        # will already have exec'd. and we can't SIGSTOP it before exec,
148        # see above.
149        # `os.setpgid(child.pid, child.pid)`
150
151        if foreground:
152            # set the child's process group as new foreground
153            try:
154                os.tcsetpgrp(sys.stdin.fileno(), child.pid)
155            except Exception as e:
156                pass
157            # revive the child,
158            # because it may have been stopped due to SIGTTOU or
159            # SIGTTIN when it tried using stdout/stdin
160            # after setpgid was called, and before we made it
161            # forward process by tcsetpgrp.
162            os.kill(child.pid, signal.SIGCONT)
163
164        # wait for the child to terminate
165        if store_proc_dict is not None:
166            store_proc_dict[store_proc_key] = child
167        _ret = poll_process(child, line_callback) if line_callback is not None else child.wait()
168        ret = _ret if not as_proc else child
169    except KeyboardInterrupt:
170        child.send_signal(signal.SIGINT)
171        ret = child.wait() if not as_proc else child
172    finally:
173        if foreground:
174            # we have to mask SIGTTOU because tcsetpgrp
175            # raises SIGTTOU to all current background
176            # process group members (i.e. us) when switching tty's pgrp
177            # it we didn't do that, we'd get SIGSTOP'd
178            hdlr = signal.signal(signal.SIGTTOU, signal.SIG_IGN)
179            # make us tty's foreground again
180            try:
181                os.tcsetpgrp(sys.stdin.fileno(), old_pgrp)
182            except Exception:
183                pass
184            # now restore the handler
185            signal.signal(signal.SIGTTOU, hdlr)
186            # restore terminal attributes
187            if termios:
188                termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, old_attr)
189
190    return ret

Original foreground solution found here: https://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess

Parameters
  • *args:: The sysargs to execute.
  • foreground (bool, default False): If True, execute the process as a foreground process that passes Ctrl-C to children. From the original post: The "correct" way of spawning a new subprocess: signals like C-c must only go to the child process, and not to this python.

    Some side-info about "how ctrl-c works": https://unix.stackexchange.com/a/149756/1321

  • as_proc (bool, default False): If True, return the subprocess.Popen object.
  • line_callback (Optional[Callable[[str], Any]], default None): If provided, poll the process and execute the callback when readline() gets new text.
  • store_proc_dict (Optional[Dict[str, Any]], default None): If provided, store the subprocess.Popen object under the key store_proc_key. Useful for accessing the process while it is polling in another thread.
  • store_proc_key (str, default 'child_process'): If store_proc_dict is provided, store the process in the dictionary under this key.
  • kw (Any): Additional keyword arguments to pass to subprocess.Popen.
Returns
  • Either an int for the return code or a subprocess.Popen object.
def poll_process( proc: subprocess.Popen, line_callback: Callable[[bytes], Any], timeout_seconds: Union[int, float, NoneType] = None, timeout_callback: Optional[Callable[[Any], Any]] = None, timeout_callback_args: Optional[Tuple[Any]] = None, timeout_callback_kwargs: Optional[Dict[str, Any]] = None) -> int:
193def poll_process(
194    proc: subprocess.Popen,
195    line_callback: Callable[[bytes], Any],
196    timeout_seconds: Union[int, float, None] = None,
197    timeout_callback: Optional[Callable[[Any], Any]] = None,
198    timeout_callback_args: Optional[Tuple[Any]] = None,
199    timeout_callback_kwargs: Optional[Dict[str, Any]] = None,
200) -> int:
201    """
202    Poll a process and execute a callback function for each line printed to the process's `stdout`.
203    """
204    from meerschaum.utils.threading import Timer
205    from meerschaum.utils.warnings import warn
206
207    def timeout_handler():
208        nonlocal timeout_callback_args, timeout_callback_kwargs
209        try:
210            if platform.system() != 'Windows':
211                ### The process being killed may have children.
212                os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
213            else:
214                proc.send_signal(signal.CTRL_BREAK_EVENT)
215            proc.terminate()
216        except Exception as e:
217            warn(f"Failed to kill process:\n{e}")
218        if timeout_callback_args is None:
219            timeout_callback_args = []
220        if timeout_callback_kwargs is None:
221            timeout_callback_kwargs = {}
222        timeout_callback(*timeout_callback_args, **timeout_callback_kwargs)
223
224    if timeout_seconds is not None:
225        watchdog_thread = Timer(timeout_seconds, timeout_handler)
226        watchdog_thread.daemon = True
227        watchdog_thread.start()
228
229    while proc.poll() is None:
230        line = proc.stdout.readline()
231        line_callback(line)
232
233    if timeout_seconds is not None:
234        watchdog_thread.cancel()
235
236    return proc.poll()

Poll a process and execute a callback function for each line printed to the process's stdout.