meerschaum.utils.process

Custom process-handling functions. See meerschaum.utils.pool for multiprocessing and meerschaum.utils.threading for threads.

  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Custom process-handling functions.
  7See `meerschaum.utils.pool` for multiprocessing and
  8`meerschaum.utils.threading` for threads.
  9"""
 10
 11from __future__ import annotations
 12import os, signal, subprocess, sys, platform, traceback
 13from meerschaum.utils.typing import Union, Optional, Any, Callable, Dict, Tuple
 14from meerschaum.config.static import STATIC_CONFIG
 15
 16_child_processes = []
 17def signal_handler(sig, frame):
 18    for child in _child_processes:
 19        child.send_signal(sig)
 20        child.wait()
 21
 22def run_process(
 23    *args,
 24    foreground: bool = False,
 25    as_proc: bool = False,
 26    line_callback: Optional[Callable[[bytes], Any]] = None,
 27    store_proc_dict: Optional[Dict[str, Any]] = None,
 28    store_proc_key: str = 'child_process',
 29    capture_output: bool = False,
 30    **kw: Any
 31) -> Union[int, subprocess.Popen]:
 32    """Original foreground solution found here:
 33    https://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess
 34
 35    Parameters
 36    ----------
 37    *args:
 38        The sysargs to execute.
 39
 40    foreground: bool, default False
 41        If `True`, execute the process as a foreground process that passes Ctrl-C to children.
 42        From the original post:
 43        The "correct" way of spawning a new subprocess:
 44        signals like C-c must only go
 45        to the child process, and not to this python.
 46        
 47        Some side-info about "how ctrl-c works":
 48        https://unix.stackexchange.com/a/149756/1321
 49
 50    as_proc: bool, default False
 51        If `True`, return the `subprocess.Popen` object.
 52
 53    line_callback: Optional[Callable[[str], Any]], default None
 54        If provided, poll the process and execute the callback when `readline()` gets new text.
 55
 56    store_proc_dict: Optional[Dict[str, Any]], default None
 57        If provided, store the `subprocess.Popen` object under the key `store_proc_key`.
 58        Useful for accessing the process while it is polling in another thread.
 59
 60    store_proc_key: str, default 'child_process'
 61        If `store_proc_dict` is provided, store the process in the dictionary under this key.
 62
 63    kw: Any
 64        Additional keyword arguments to pass to `subprocess.Popen`.
 65
 66    Returns
 67    -------
 68    Either an int for the return code or a `subprocess.Popen` object.
 69    """
 70    try:
 71        import termios
 72    except ImportError:
 73        termios = None
 74
 75    if platform.system() == 'Windows':
 76        foreground = False
 77
 78    def print_line(line):
 79        sys.stdout.write(line.decode('utf-8'))
 80        sys.stdout.flush()
 81
 82  
 83    if capture_output or line_callback is not None:
 84        kw['stdout'] = subprocess.PIPE
 85        kw['stderr'] = subprocess.STDOUT
 86    elif os.environ.get(STATIC_CONFIG['environment']['daemon_id']):
 87        kw['stdout'] = subprocess.PIPE
 88        kw['stderr'] = subprocess.STDOUT
 89        if line_callback is None:
 90            line_callback = print_line
 91
 92    if 'env' not in kw:
 93        kw['env'] = os.environ
 94
 95    user_preexec_fn = kw.get("preexec_fn", None)
 96
 97    if foreground:
 98        try:
 99            old_pgrp = os.tcgetpgrp(sys.stdin.fileno())
100        except Exception as e:
101            termios = None
102        if termios:
103            try:
104                old_attr = termios.tcgetattr(sys.stdin.fileno())
105            except Exception as e:
106                termios = None
107
108    def new_pgid():
109        if user_preexec_fn:
110            user_preexec_fn()
111
112        # set a new process group id
113        os.setpgid(os.getpid(), os.getpid())
114
115        # generally, the child process should stop itself
116        # before exec so the parent can set its new pgid.
117        # (setting pgid has to be done before the child execs).
118        # however, Python 'guarantee' that `preexec_fn`
119        # is run before `Popen` returns.
120        # this is because `Popen` waits for the closure of
121        # the error relay pipe '`errpipe_write`',
122        # which happens at child's exec.
123        # this is also the reason the child can't stop itself
124        # in Python's `Popen`, since the `Popen` call would never
125        # terminate then.
126        # `os.kill(os.getpid(), signal.SIGSTOP)`
127
128    if foreground:
129        kw['preexec_fn'] = new_pgid
130
131    try:
132        child = subprocess.Popen(*args, **kw)
133        _child_processes.append(child)
134
135        # we can't set the process group id from the parent since the child
136        # will already have exec'd. and we can't SIGSTOP it before exec,
137        # see above.
138        # `os.setpgid(child.pid, child.pid)`
139
140        if foreground:
141            # set the child's process group as new foreground
142            try:
143                os.tcsetpgrp(sys.stdin.fileno(), child.pid)
144            except Exception as e:
145                pass
146            # revive the child,
147            # because it may have been stopped due to SIGTTOU or
148            # SIGTTIN when it tried using stdout/stdin
149            # after setpgid was called, and before we made it
150            # forward process by tcsetpgrp.
151            os.kill(child.pid, signal.SIGCONT)
152
153        # wait for the child to terminate
154        if store_proc_dict is not None:
155            store_proc_dict[store_proc_key] = child
156        _ret = poll_process(child, line_callback) if line_callback is not None else child.wait()
157        ret = _ret if not as_proc else child
158    except KeyboardInterrupt:
159        child.send_signal(signal.SIGINT)
160        ret = child.wait() if not as_proc else child
161    finally:
162        if foreground:
163            # we have to mask SIGTTOU because tcsetpgrp
164            # raises SIGTTOU to all current background
165            # process group members (i.e. us) when switching tty's pgrp
166            # it we didn't do that, we'd get SIGSTOP'd
167            hdlr = signal.signal(signal.SIGTTOU, signal.SIG_IGN)
168            # make us tty's foreground again
169            try:
170                os.tcsetpgrp(sys.stdin.fileno(), old_pgrp)
171            except Exception as e:
172                pass
173            # now restore the handler
174            signal.signal(signal.SIGTTOU, hdlr)
175            # restore terminal attributes
176            if termios:
177                termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, old_attr)
178
179    return ret
180
181
def poll_process(
    proc: subprocess.Popen,
    line_callback: Callable[[bytes], Any],
    timeout_seconds: Union[int, float, None] = None,
    timeout_callback: Optional[Callable[[Any], Any]] = None,
    timeout_callback_args: Optional[Tuple[Any]] = None,
    timeout_callback_kwargs: Optional[Dict[str, Any]] = None,
) -> int:
    """
    Poll a process and execute a callback function for each line printed to the process's `stdout`.

    Parameters
    ----------
    proc: subprocess.Popen
        The process to poll. Lines are read from `proc.stdout`;
        if `proc.stdout` is `None`, the process is only waited on.

    line_callback: Callable[[bytes], Any]
        Called with each non-empty line read from `proc.stdout`.

    timeout_seconds: Union[int, float, None], default None
        If provided, kill the process after this many seconds.

    timeout_callback: Optional[Callable[[Any], Any]], default None
        If provided, called after the process is killed due to the timeout.

    timeout_callback_args: Optional[Tuple[Any]], default None
        Positional arguments to pass to `timeout_callback`.

    timeout_callback_kwargs: Optional[Dict[str, Any]], default None
        Keyword arguments to pass to `timeout_callback`.

    Returns
    -------
    The return code of the process.
    """
    def timeout_handler():
        from meerschaum.utils.warnings import warn
        try:
            if platform.system() != 'Windows':
                child_pgid = os.getpgid(proc.pid)
                if child_pgid != os.getpgrp():
                    ### The process being killed may have children.
                    os.killpg(child_pgid, signal.SIGKILL)
                else:
                    ### The child shares our process group:
                    ### killing the group would kill this process too.
                    proc.kill()
            else:
                proc.send_signal(signal.CTRL_BREAK_EVENT)
            proc.terminate()
        except Exception as e:
            warn(f"Failed to kill process:\n{e}")
        if timeout_callback is not None:
            timeout_callback(
                *(timeout_callback_args or ()),
                **(timeout_callback_kwargs or {})
            )

    watchdog_thread = None
    if timeout_seconds is not None:
        from meerschaum.utils.threading import Timer
        watchdog_thread = Timer(timeout_seconds, timeout_handler)
        watchdog_thread.daemon = True
        watchdog_thread.start()

    try:
        if proc.stdout is not None:
            ### Read until EOF (not just until the process exits) so output
            ### still buffered in the pipe when the process ends is not dropped.
            while True:
                line = proc.stdout.readline()
                if not line:
                    break
                line_callback(line)
        return proc.wait()
    finally:
        ### Always cancel the watchdog, even if `line_callback` raised.
        if watchdog_thread is not None:
            watchdog_thread.cancel()
def signal_handler(sig, frame):
def signal_handler(sig, frame):
    """
    Forward a signal to every tracked child process and wait for each one to exit.

    Parameters
    ----------
    sig:
        The signal number to send to each process in `_child_processes`.

    frame:
        The current stack frame. Unused; accepted so the function matches the
        `(signum, frame)` signature of a `signal.signal` handler.
    """
    ### Iterate over a snapshot so the list may change while we block in `wait()`.
    for child in list(_child_processes):
        ### Nothing to signal or wait for if the child has already exited.
        if child.poll() is not None:
            continue
        try:
            child.send_signal(sig)
        except OSError:
            ### The child vanished (or cannot be signalled).
            ### Keep going so the remaining children still receive the signal.
            continue
        child.wait()
def run_process( *args, foreground: bool = False, as_proc: bool = False, line_callback: Optional[Callable[[bytes], Any]] = None, store_proc_dict: Optional[Dict[str, Any]] = None, store_proc_key: str = 'child_process', capture_output: bool = False, **kw: Any) -> Union[int, subprocess.Popen]:
 23def run_process(
 24    *args,
 25    foreground: bool = False,
 26    as_proc: bool = False,
 27    line_callback: Optional[Callable[[bytes], Any]] = None,
 28    store_proc_dict: Optional[Dict[str, Any]] = None,
 29    store_proc_key: str = 'child_process',
 30    capture_output: bool = False,
 31    **kw: Any
 32) -> Union[int, subprocess.Popen]:
 33    """Original foreground solution found here:
 34    https://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess
 35
 36    Parameters
 37    ----------
 38    *args:
 39        The sysargs to execute.
 40
 41    foreground: bool, default False
 42        If `True`, execute the process as a foreground process that passes Ctrl-C to children.
 43        From the original post:
 44        The "correct" way of spawning a new subprocess:
 45        signals like C-c must only go
 46        to the child process, and not to this python.
 47        
 48        Some side-info about "how ctrl-c works":
 49        https://unix.stackexchange.com/a/149756/1321
 50
 51    as_proc: bool, default False
 52        If `True`, return the `subprocess.Popen` object.
 53
 54    line_callback: Optional[Callable[[str], Any]], default None
 55        If provided, poll the process and execute the callback when `readline()` gets new text.
 56
 57    store_proc_dict: Optional[Dict[str, Any]], default None
 58        If provided, store the `subprocess.Popen` object under the key `store_proc_key`.
 59        Useful for accessing the process while it is polling in another thread.
 60
 61    store_proc_key: str, default 'child_process'
 62        If `store_proc_dict` is provided, store the process in the dictionary under this key.
 63
 64    kw: Any
 65        Additional keyword arguments to pass to `subprocess.Popen`.
 66
 67    Returns
 68    -------
 69    Either an int for the return code or a `subprocess.Popen` object.
 70    """
 71    try:
 72        import termios
 73    except ImportError:
 74        termios = None
 75
 76    if platform.system() == 'Windows':
 77        foreground = False
 78
 79    def print_line(line):
 80        sys.stdout.write(line.decode('utf-8'))
 81        sys.stdout.flush()
 82
 83  
 84    if capture_output or line_callback is not None:
 85        kw['stdout'] = subprocess.PIPE
 86        kw['stderr'] = subprocess.STDOUT
 87    elif os.environ.get(STATIC_CONFIG['environment']['daemon_id']):
 88        kw['stdout'] = subprocess.PIPE
 89        kw['stderr'] = subprocess.STDOUT
 90        if line_callback is None:
 91            line_callback = print_line
 92
 93    if 'env' not in kw:
 94        kw['env'] = os.environ
 95
 96    user_preexec_fn = kw.get("preexec_fn", None)
 97
 98    if foreground:
 99        try:
100            old_pgrp = os.tcgetpgrp(sys.stdin.fileno())
101        except Exception as e:
102            termios = None
103        if termios:
104            try:
105                old_attr = termios.tcgetattr(sys.stdin.fileno())
106            except Exception as e:
107                termios = None
108
109    def new_pgid():
110        if user_preexec_fn:
111            user_preexec_fn()
112
113        # set a new process group id
114        os.setpgid(os.getpid(), os.getpid())
115
116        # generally, the child process should stop itself
117        # before exec so the parent can set its new pgid.
118        # (setting pgid has to be done before the child execs).
119        # however, Python 'guarantee' that `preexec_fn`
120        # is run before `Popen` returns.
121        # this is because `Popen` waits for the closure of
122        # the error relay pipe '`errpipe_write`',
123        # which happens at child's exec.
124        # this is also the reason the child can't stop itself
125        # in Python's `Popen`, since the `Popen` call would never
126        # terminate then.
127        # `os.kill(os.getpid(), signal.SIGSTOP)`
128
129    if foreground:
130        kw['preexec_fn'] = new_pgid
131
132    try:
133        child = subprocess.Popen(*args, **kw)
134        _child_processes.append(child)
135
136        # we can't set the process group id from the parent since the child
137        # will already have exec'd. and we can't SIGSTOP it before exec,
138        # see above.
139        # `os.setpgid(child.pid, child.pid)`
140
141        if foreground:
142            # set the child's process group as new foreground
143            try:
144                os.tcsetpgrp(sys.stdin.fileno(), child.pid)
145            except Exception as e:
146                pass
147            # revive the child,
148            # because it may have been stopped due to SIGTTOU or
149            # SIGTTIN when it tried using stdout/stdin
150            # after setpgid was called, and before we made it
151            # forward process by tcsetpgrp.
152            os.kill(child.pid, signal.SIGCONT)
153
154        # wait for the child to terminate
155        if store_proc_dict is not None:
156            store_proc_dict[store_proc_key] = child
157        _ret = poll_process(child, line_callback) if line_callback is not None else child.wait()
158        ret = _ret if not as_proc else child
159    except KeyboardInterrupt:
160        child.send_signal(signal.SIGINT)
161        ret = child.wait() if not as_proc else child
162    finally:
163        if foreground:
164            # we have to mask SIGTTOU because tcsetpgrp
165            # raises SIGTTOU to all current background
166            # process group members (i.e. us) when switching tty's pgrp
167            # it we didn't do that, we'd get SIGSTOP'd
168            hdlr = signal.signal(signal.SIGTTOU, signal.SIG_IGN)
169            # make us tty's foreground again
170            try:
171                os.tcsetpgrp(sys.stdin.fileno(), old_pgrp)
172            except Exception as e:
173                pass
174            # now restore the handler
175            signal.signal(signal.SIGTTOU, hdlr)
176            # restore terminal attributes
177            if termios:
178                termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, old_attr)
179
180    return ret

Original foreground solution found here: https://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess

Parameters
  • *args: The sysargs to execute.
  • foreground (bool, default False): If True, execute the process as a foreground process that passes Ctrl-C to children. From the original post: The "correct" way of spawning a new subprocess: signals like C-c must only go to the child process, and not to this python.

    Some side-info about "how ctrl-c works": https://unix.stackexchange.com/a/149756/1321

  • as_proc (bool, default False): If True, return the subprocess.Popen object.
  • line_callback (Optional[Callable[[bytes], Any]], default None): If provided, poll the process and execute the callback when readline() gets new text.
  • store_proc_dict (Optional[Dict[str, Any]], default None): If provided, store the subprocess.Popen object under the key store_proc_key. Useful for accessing the process while it is polling in another thread.
  • store_proc_key (str, default 'child_process'): If store_proc_dict is provided, store the process in the dictionary under this key.
  • kw (Any): Additional keyword arguments to pass to subprocess.Popen.
Returns
  • Either an int for the return code or a subprocess.Popen object.
def poll_process( proc: subprocess.Popen, line_callback: Callable[[bytes], Any], timeout_seconds: Union[int, float, NoneType] = None, timeout_callback: Optional[Callable[[Any], Any]] = None, timeout_callback_args: Optional[Tuple[Any]] = None, timeout_callback_kwargs: Optional[Dict[str, Any]] = None) -> int:
def poll_process(
    proc: subprocess.Popen,
    line_callback: Callable[[bytes], Any],
    timeout_seconds: Union[int, float, None] = None,
    timeout_callback: Optional[Callable[[Any], Any]] = None,
    timeout_callback_args: Optional[Tuple[Any]] = None,
    timeout_callback_kwargs: Optional[Dict[str, Any]] = None,
) -> int:
    """
    Poll a process and execute a callback function for each line printed to the process's `stdout`.

    Parameters
    ----------
    proc: subprocess.Popen
        The process to poll. Lines are read from `proc.stdout`;
        if `proc.stdout` is `None`, the process is only waited on.

    line_callback: Callable[[bytes], Any]
        Called with each non-empty line read from `proc.stdout`.

    timeout_seconds: Union[int, float, None], default None
        If provided, kill the process after this many seconds.

    timeout_callback: Optional[Callable[[Any], Any]], default None
        If provided, called after the process is killed due to the timeout.

    timeout_callback_args: Optional[Tuple[Any]], default None
        Positional arguments to pass to `timeout_callback`.

    timeout_callback_kwargs: Optional[Dict[str, Any]], default None
        Keyword arguments to pass to `timeout_callback`.

    Returns
    -------
    The return code of the process.
    """
    def timeout_handler():
        from meerschaum.utils.warnings import warn
        try:
            if platform.system() != 'Windows':
                child_pgid = os.getpgid(proc.pid)
                if child_pgid != os.getpgrp():
                    ### The process being killed may have children.
                    os.killpg(child_pgid, signal.SIGKILL)
                else:
                    ### The child shares our process group:
                    ### killing the group would kill this process too.
                    proc.kill()
            else:
                proc.send_signal(signal.CTRL_BREAK_EVENT)
            proc.terminate()
        except Exception as e:
            warn(f"Failed to kill process:\n{e}")
        if timeout_callback is not None:
            timeout_callback(
                *(timeout_callback_args or ()),
                **(timeout_callback_kwargs or {})
            )

    watchdog_thread = None
    if timeout_seconds is not None:
        from meerschaum.utils.threading import Timer
        watchdog_thread = Timer(timeout_seconds, timeout_handler)
        watchdog_thread.daemon = True
        watchdog_thread.start()

    try:
        if proc.stdout is not None:
            ### Read until EOF (not just until the process exits) so output
            ### still buffered in the pipe when the process ends is not dropped.
            while True:
                line = proc.stdout.readline()
                if not line:
                    break
                line_callback(line)
        return proc.wait()
    finally:
        ### Always cancel the watchdog, even if `line_callback` raised.
        if watchdog_thread is not None:
            watchdog_thread.cancel()

Poll a process and execute a callback function for each line printed to the process's stdout.