meerschaum.utils.process

Custom process-handling functions. See meerschaum.utils.pool for multiprocessing and meerschaum.utils.threading for threads.

  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Custom process-handling functions.
  7See `meerschaum.utils.pool` for multiprocessing and
  8`meerschaum.utils.threading` for threads.
  9"""
 10
 11from __future__ import annotations
 12import os, signal, subprocess, sys, platform
 13from meerschaum.utils.typing import Union, Optional, Any, Callable, Dict, Tuple
 14
 15def run_process(
 16        *args,
 17        foreground: bool = False,
 18        as_proc: bool = False,
 19        line_callback: Optional[Callable[[bytes], Any]] = None,
 20        store_proc_dict: Optional[Dict[str, Any]] = None,
 21        store_proc_key: str = 'child_process',
 22        capture_output: bool = False,
 23        **kw: Any
 24    ) -> Union[int, subprocess.Popen]:
 25    """Original foreground solution found here:
 26    https://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess
 27
 28    Parameters
 29    ----------
 30    *args:
 31        The sysargs to execute.
 32
 33    foreground: bool, default False
 34        If `True`, execute the process as a foreground process that passes Ctrl-C to children.
 35        From the original post:
 36        The "correct" way of spawning a new subprocess:
 37        signals like C-c must only go
 38        to the child process, and not to this python.
 39        
 40        Some side-info about "how ctrl-c works":
 41        https://unix.stackexchange.com/a/149756/1321
 42
 43    as_proc: bool, default False
 44        If `True`, return the `subprocess.Popen` object.
 45
 46    line_callback: Optional[Callable[[str], Any]], default None
 47        If provided, poll the process and execute the callback when `readline()` gets new text.
 48
 49    store_proc_dict: Optional[Dict[str, Any]], default None
 50        If provided, store the `subprocess.Popen` object under the key `store_proc_key`.
 51        Useful for accessing the process while it is polling in another thread.
 52
 53    store_proc_key: str, default 'child_process'
 54        If `store_proc_dict` is provided, store the process in the dictionary under this key.
 55
 56    kw: Any
 57        Additional keyword arguments to pass to `subprocess.Popen`.
 58
 59    Returns
 60    -------
 61    Either an int for the return code or a `subprocess.Popen` object.
 62    """
 63    try:
 64        import termios
 65    except ImportError:
 66        termios = None
 67
 68    if platform.system() == 'Windows':
 69        foreground = False
 70
 71    if line_callback is not None:
 72        kw['stdout'] = subprocess.PIPE
 73        kw['stderr'] = subprocess.STDOUT
 74
 75    if 'env' not in kw:
 76        kw['env'] = os.environ
 77
 78    user_preexec_fn = kw.get("preexec_fn", None)
 79
 80    if foreground:
 81        try:
 82            old_pgrp = os.tcgetpgrp(sys.stdin.fileno())
 83        except Exception as e:
 84            termios = None
 85        if termios:
 86            try:
 87                old_attr = termios.tcgetattr(sys.stdin.fileno())
 88            except Exception as e:
 89                termios = None
 90
 91    def new_pgid():
 92        if user_preexec_fn:
 93            user_preexec_fn()
 94
 95        # set a new process group id
 96        os.setpgid(os.getpid(), os.getpid())
 97
 98        # generally, the child process should stop itself
 99        # before exec so the parent can set its new pgid.
100        # (setting pgid has to be done before the child execs).
101        # however, Python 'guarantee' that `preexec_fn`
102        # is run before `Popen` returns.
103        # this is because `Popen` waits for the closure of
104        # the error relay pipe '`errpipe_write`',
105        # which happens at child's exec.
106        # this is also the reason the child can't stop itself
107        # in Python's `Popen`, since the `Popen` call would never
108        # terminate then.
109        # `os.kill(os.getpid(), signal.SIGSTOP)`
110
111    if foreground:
112        kw['preexec_fn'] = new_pgid
113
114    try:
115        # fork the child
116        #  stdout, stderr = (
117            #  (sys.stdout, sys.stderr) if not capture_output
118            #  else (subprocess.PIPE, subprocess.PIPE)
119        #  )
120        if capture_output:
121            kw['stdout'] = subprocess.PIPE
122            kw['stderr'] = subprocess.PIPE
123
124        child = subprocess.Popen(*args, **kw)
125
126        # we can't set the process group id from the parent since the child
127        # will already have exec'd. and we can't SIGSTOP it before exec,
128        # see above.
129        # `os.setpgid(child.pid, child.pid)`
130
131        if foreground:
132            # set the child's process group as new foreground
133            try:
134                os.tcsetpgrp(sys.stdin.fileno(), child.pid)
135            except Exception as e:
136                pass
137            # revive the child,
138            # because it may have been stopped due to SIGTTOU or
139            # SIGTTIN when it tried using stdout/stdin
140            # after setpgid was called, and before we made it
141            # forward process by tcsetpgrp.
142            os.kill(child.pid, signal.SIGCONT)
143
144        # wait for the child to terminate
145        if store_proc_dict is not None:
146            store_proc_dict[store_proc_key] = child
147        _ret = poll_process(child, line_callback) if line_callback is not None else child.wait()
148        ret = _ret if not as_proc else child
149
150    finally:
151        if foreground:
152            # we have to mask SIGTTOU because tcsetpgrp
153            # raises SIGTTOU to all current background
154            # process group members (i.e. us) when switching tty's pgrp
155            # it we didn't do that, we'd get SIGSTOP'd
156            hdlr = signal.signal(signal.SIGTTOU, signal.SIG_IGN)
157            # make us tty's foreground again
158            try:
159                os.tcsetpgrp(sys.stdin.fileno(), old_pgrp)
160            except Exception as e:
161                pass
162            # now restore the handler
163            signal.signal(signal.SIGTTOU, hdlr)
164            # restore terminal attributes
165            if termios:
166                termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, old_attr)
167
168    return ret
169
def poll_process(
        proc: subprocess.Popen,
        line_callback: Callable[[bytes], Any],
        timeout_seconds: Union[int, float, None] = None,
        timeout_callback: Optional[Callable[[Any], Any]] = None,
        timeout_callback_args: Optional[Tuple[Any]] = None,
        timeout_callback_kwargs: Optional[Dict[str, Any]] = None,
    ) -> int:
    """
    Poll a process and execute a callback function for each line printed to the process's `stdout`.

    Parameters
    ----------
    proc: subprocess.Popen
        The process to poll. Lines are read from `proc.stdout`;
        if `proc.stdout` is `None`, the process is only waited on.

    line_callback: Callable[[bytes], Any]
        Called once for every non-empty line read from `proc.stdout`
        (including output still buffered after the process has exited).

    timeout_seconds: Union[int, float, None], default None
        If provided, terminate the process after this many seconds.

    timeout_callback: Optional[Callable[[Any], Any]], default None
        If provided, called after the process is terminated due to the timeout.

    timeout_callback_args: Optional[Tuple[Any]], default None
        Positional arguments to pass to `timeout_callback`.

    timeout_callback_kwargs: Optional[Dict[str, Any]], default None
        Keyword arguments to pass to `timeout_callback`.

    Returns
    -------
    The return code of the process.
    """
    def timeout_handler():
        proc.terminate()
        # A timeout without a callback just terminates the process.
        if timeout_callback is None:
            return
        callback_args = timeout_callback_args if timeout_callback_args is not None else []
        callback_kwargs = timeout_callback_kwargs if timeout_callback_kwargs is not None else {}
        timeout_callback(*callback_args, **callback_kwargs)

    watchdog_thread = None
    if timeout_seconds is not None:
        # Imported lazily so the timer machinery is only needed when a timeout is requested.
        from meerschaum.utils.threading import Timer
        watchdog_thread = Timer(timeout_seconds, timeout_handler)
        watchdog_thread.daemon = True
        watchdog_thread.start()

    try:
        stream = proc.stdout
        if stream is not None:
            # Read until EOF rather than until the process exits so that output
            # still sitting in the pipe when the child terminates is not dropped.
            # An empty read (`b''` or `''` in text mode) signals EOF.
            while True:
                line = stream.readline()
                if not line:
                    break
                line_callback(line)
    finally:
        # Always stop the watchdog, even if `line_callback` raised.
        if watchdog_thread is not None:
            watchdog_thread.cancel()

    # The pipe may close before the process exits, so block until it is done.
    return proc.wait()
def run_process( *args, foreground: bool = False, as_proc: bool = False, line_callback: Optional[Callable[[bytes], Any]] = None, store_proc_dict: Optional[Dict[str, Any]] = None, store_proc_key: str = 'child_process', capture_output: bool = False, **kw: Any) -> Union[int, subprocess.Popen]:
 16def run_process(
 17        *args,
 18        foreground: bool = False,
 19        as_proc: bool = False,
 20        line_callback: Optional[Callable[[bytes], Any]] = None,
 21        store_proc_dict: Optional[Dict[str, Any]] = None,
 22        store_proc_key: str = 'child_process',
 23        capture_output: bool = False,
 24        **kw: Any
 25    ) -> Union[int, subprocess.Popen]:
 26    """Original foreground solution found here:
 27    https://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess
 28
 29    Parameters
 30    ----------
 31    *args:
 32        The sysargs to execute.
 33
 34    foreground: bool, default False
 35        If `True`, execute the process as a foreground process that passes Ctrl-C to children.
 36        From the original post:
 37        The "correct" way of spawning a new subprocess:
 38        signals like C-c must only go
 39        to the child process, and not to this python.
 40        
 41        Some side-info about "how ctrl-c works":
 42        https://unix.stackexchange.com/a/149756/1321
 43
 44    as_proc: bool, default False
 45        If `True`, return the `subprocess.Popen` object.
 46
 47    line_callback: Optional[Callable[[str], Any]], default None
 48        If provided, poll the process and execute the callback when `readline()` gets new text.
 49
 50    store_proc_dict: Optional[Dict[str, Any]], default None
 51        If provided, store the `subprocess.Popen` object under the key `store_proc_key`.
 52        Useful for accessing the process while it is polling in another thread.
 53
 54    store_proc_key: str, default 'child_process'
 55        If `store_proc_dict` is provided, store the process in the dictionary under this key.
 56
 57    kw: Any
 58        Additional keyword arguments to pass to `subprocess.Popen`.
 59
 60    Returns
 61    -------
 62    Either an int for the return code or a `subprocess.Popen` object.
 63    """
 64    try:
 65        import termios
 66    except ImportError:
 67        termios = None
 68
 69    if platform.system() == 'Windows':
 70        foreground = False
 71
 72    if line_callback is not None:
 73        kw['stdout'] = subprocess.PIPE
 74        kw['stderr'] = subprocess.STDOUT
 75
 76    if 'env' not in kw:
 77        kw['env'] = os.environ
 78
 79    user_preexec_fn = kw.get("preexec_fn", None)
 80
 81    if foreground:
 82        try:
 83            old_pgrp = os.tcgetpgrp(sys.stdin.fileno())
 84        except Exception as e:
 85            termios = None
 86        if termios:
 87            try:
 88                old_attr = termios.tcgetattr(sys.stdin.fileno())
 89            except Exception as e:
 90                termios = None
 91
 92    def new_pgid():
 93        if user_preexec_fn:
 94            user_preexec_fn()
 95
 96        # set a new process group id
 97        os.setpgid(os.getpid(), os.getpid())
 98
 99        # generally, the child process should stop itself
100        # before exec so the parent can set its new pgid.
101        # (setting pgid has to be done before the child execs).
102        # however, Python 'guarantee' that `preexec_fn`
103        # is run before `Popen` returns.
104        # this is because `Popen` waits for the closure of
105        # the error relay pipe '`errpipe_write`',
106        # which happens at child's exec.
107        # this is also the reason the child can't stop itself
108        # in Python's `Popen`, since the `Popen` call would never
109        # terminate then.
110        # `os.kill(os.getpid(), signal.SIGSTOP)`
111
112    if foreground:
113        kw['preexec_fn'] = new_pgid
114
115    try:
116        # fork the child
117        #  stdout, stderr = (
118            #  (sys.stdout, sys.stderr) if not capture_output
119            #  else (subprocess.PIPE, subprocess.PIPE)
120        #  )
121        if capture_output:
122            kw['stdout'] = subprocess.PIPE
123            kw['stderr'] = subprocess.PIPE
124
125        child = subprocess.Popen(*args, **kw)
126
127        # we can't set the process group id from the parent since the child
128        # will already have exec'd. and we can't SIGSTOP it before exec,
129        # see above.
130        # `os.setpgid(child.pid, child.pid)`
131
132        if foreground:
133            # set the child's process group as new foreground
134            try:
135                os.tcsetpgrp(sys.stdin.fileno(), child.pid)
136            except Exception as e:
137                pass
138            # revive the child,
139            # because it may have been stopped due to SIGTTOU or
140            # SIGTTIN when it tried using stdout/stdin
141            # after setpgid was called, and before we made it
142            # forward process by tcsetpgrp.
143            os.kill(child.pid, signal.SIGCONT)
144
145        # wait for the child to terminate
146        if store_proc_dict is not None:
147            store_proc_dict[store_proc_key] = child
148        _ret = poll_process(child, line_callback) if line_callback is not None else child.wait()
149        ret = _ret if not as_proc else child
150
151    finally:
152        if foreground:
153            # we have to mask SIGTTOU because tcsetpgrp
154            # raises SIGTTOU to all current background
155            # process group members (i.e. us) when switching tty's pgrp
156            # it we didn't do that, we'd get SIGSTOP'd
157            hdlr = signal.signal(signal.SIGTTOU, signal.SIG_IGN)
158            # make us tty's foreground again
159            try:
160                os.tcsetpgrp(sys.stdin.fileno(), old_pgrp)
161            except Exception as e:
162                pass
163            # now restore the handler
164            signal.signal(signal.SIGTTOU, hdlr)
165            # restore terminal attributes
166            if termios:
167                termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, old_attr)
168
169    return ret

Original foreground solution found here: https://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess

Parameters
  • *args: The sysargs to execute.
  • foreground (bool, default False): If True, execute the process as a foreground process that passes Ctrl-C to children. From the original post: The "correct" way of spawning a new subprocess: signals like C-c must only go to the child process, and not to this python.

    Some side-info about "how ctrl-c works": https://unix.stackexchange.com/a/149756/1321

  • as_proc (bool, default False): If True, return the subprocess.Popen object.
  • line_callback (Optional[Callable[[bytes], Any]], default None): If provided, poll the process and execute the callback when readline() gets new text.
  • store_proc_dict (Optional[Dict[str, Any]], default None): If provided, store the subprocess.Popen object under the key store_proc_key. Useful for accessing the process while it is polling in another thread.
  • store_proc_key (str, default 'child_process'): If store_proc_dict is provided, store the process in the dictionary under this key.
  • kw (Any): Additional keyword arguments to pass to subprocess.Popen.
Returns
  • Either an int for the return code or a subprocess.Popen object.
def poll_process( proc: subprocess.Popen, line_callback: Callable[[bytes], Any], timeout_seconds: Union[int, float, NoneType] = None, timeout_callback: Optional[Callable[[Any], Any]] = None, timeout_callback_args: Optional[Tuple[Any]] = None, timeout_callback_kwargs: Optional[Dict[str, Any]] = None) -> int:
def poll_process(
        proc: subprocess.Popen,
        line_callback: Callable[[bytes], Any],
        timeout_seconds: Union[int, float, None] = None,
        timeout_callback: Optional[Callable[[Any], Any]] = None,
        timeout_callback_args: Optional[Tuple[Any]] = None,
        timeout_callback_kwargs: Optional[Dict[str, Any]] = None,
    ) -> int:
    """
    Poll a process and execute a callback function for each line printed to the process's `stdout`.

    Parameters
    ----------
    proc: subprocess.Popen
        The process to poll. Lines are read from `proc.stdout`;
        if `proc.stdout` is `None`, the process is only waited on.

    line_callback: Callable[[bytes], Any]
        Called once for every non-empty line read from `proc.stdout`
        (including output still buffered after the process has exited).

    timeout_seconds: Union[int, float, None], default None
        If provided, terminate the process after this many seconds.

    timeout_callback: Optional[Callable[[Any], Any]], default None
        If provided, called after the process is terminated due to the timeout.

    timeout_callback_args: Optional[Tuple[Any]], default None
        Positional arguments to pass to `timeout_callback`.

    timeout_callback_kwargs: Optional[Dict[str, Any]], default None
        Keyword arguments to pass to `timeout_callback`.

    Returns
    -------
    The return code of the process.
    """
    def timeout_handler():
        proc.terminate()
        # A timeout without a callback just terminates the process.
        if timeout_callback is None:
            return
        callback_args = timeout_callback_args if timeout_callback_args is not None else []
        callback_kwargs = timeout_callback_kwargs if timeout_callback_kwargs is not None else {}
        timeout_callback(*callback_args, **callback_kwargs)

    watchdog_thread = None
    if timeout_seconds is not None:
        # Imported lazily so the timer machinery is only needed when a timeout is requested.
        from meerschaum.utils.threading import Timer
        watchdog_thread = Timer(timeout_seconds, timeout_handler)
        watchdog_thread.daemon = True
        watchdog_thread.start()

    try:
        stream = proc.stdout
        if stream is not None:
            # Read until EOF rather than until the process exits so that output
            # still sitting in the pipe when the child terminates is not dropped.
            # An empty read (`b''` or `''` in text mode) signals EOF.
            while True:
                line = stream.readline()
                if not line:
                    break
                line_callback(line)
    finally:
        # Always stop the watchdog, even if `line_callback` raised.
        if watchdog_thread is not None:
            watchdog_thread.cancel()

    # The pipe may close before the process exits, so block until it is done.
    return proc.wait()

Poll a process and execute a callback function for each line printed to the process's stdout.