meerschaum.utils.daemon.FileDescriptorInterceptor

Intercept OS-level file descriptors.

  1#! /usr/bin/env python3
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Intercept OS-level file descriptors.
  7"""
  8
  9import os
 10import select
 11import traceback
 12import errno
 13from threading import Event
 14from datetime import datetime
 15from meerschaum.utils.typing import Callable
 16from meerschaum.utils.warnings import warn
 17from meerschaum.config.paths import DAEMON_ERROR_LOG_PATH
 18
### The `OSError.errno` value which `stop_interception()` ignores when closing descriptors
### (9 corresponds to `errno.EBADF`, i.e. the descriptor is already closed).
FD_CLOSED: int = 9
### NOTE(review): not referenced by the code in this module; presumably kept for importers — confirm.
STOP_READING_FD_EVENT: Event = Event()
 21
 22class FileDescriptorInterceptor:
 23    """
 24    A management class to intercept data written to a file descriptor.
 25    """
 26    def __init__(
 27        self,
 28        file_descriptor: int,
 29        injection_hook: Callable[[], str],
 30    ):
 31        """
 32        Parameters
 33        ----------
 34        file_descriptor: int
 35            The OS file descriptor from which to read.
 36
 37        injection_hook: Callable[[], str]
 38            A callable which returns a string to be injected into the written data.
 39        """
 40        self.stop_event = Event()
 41        self.injection_hook = injection_hook
 42        self.original_file_descriptor = file_descriptor
 43        self.new_file_descriptor = os.dup(file_descriptor)
 44        self.read_pipe, self.write_pipe = os.pipe()
 45        self.signal_read_pipe, self.signal_write_pipe = os.pipe()
 46        os.dup2(self.write_pipe, file_descriptor)
 47
 48    def start_interception(self):
 49        """
 50        Read from the file descriptor and write the modified data after injection.
 51
 52        NOTE: This is blocking and is meant to be run in a thread.
 53        """
 54        os.set_blocking(self.read_pipe, False)
 55        os.set_blocking(self.signal_read_pipe, False)
 56        is_first_read = True
 57        while not self.stop_event.is_set():
 58            try:
 59                rlist, _, _ = select.select([self.read_pipe, self.signal_read_pipe], [], [], 0.1)
 60                if self.signal_read_pipe in rlist:
 61                    break
 62                if not rlist:
 63                    continue
 64                data = os.read(self.read_pipe, 1024)
 65                if not data:
 66                    break
 67            except BlockingIOError:
 68                continue
 69            except OSError as e:
 70                if e.errno == errno.EBADF:
 71                    ### File descriptor is closed.
 72                    pass
 73                elif e.errno == errno.EINTR:
 74                    continue  # Interrupted system call, just try again
 75                else:
 76                    warn(f"OSError in FileDescriptorInterceptor: {e}")
 77                break
 78
 79            try:
 80                first_char_is_newline = data[0] == b'\n'
 81                last_char_is_newline = data[-1] == b'\n'
 82
 83                injected_str = self.injection_hook()
 84                injected_bytes = injected_str.encode('utf-8')
 85
 86                if is_first_read:
 87                    data = b'\n' + data
 88                    is_first_read = False
 89
 90                modified_data = (
 91                    (data[:-1].replace(b'\n', b'\n' + injected_bytes) + b'\n')
 92                    if last_char_is_newline
 93                    else data.replace(b'\n', b'\n' + injected_bytes)
 94                )
 95                os.write(self.new_file_descriptor, modified_data)
 96            except (BrokenPipeError, OSError):
 97                break
 98            except Exception:
 99                with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
100                    f.write(traceback.format_exc())
101                break
102
103
104    def stop_interception(self):
105        """
106        Close the new file descriptors.
107        """
108        self.stop_event.set()
109        os.write(self.signal_write_pipe, b'\0')
110        try:
111            os.close(self.new_file_descriptor)
112        except OSError as e:
113            if e.errno != FD_CLOSED:
114                warn(
115                    "Error while trying to close the duplicated file descriptor:\n"
116                    + f"{traceback.format_exc()}"
117                )
118
119        try:
120            os.close(self.write_pipe)
121        except OSError as e:
122            if e.errno != FD_CLOSED:
123                warn(
124                    "Error while trying to close the write-pipe "
125                    + "to the intercepted file descriptor:\n"
126                    + f"{traceback.format_exc()}"
127                )
128        try:
129            os.close(self.read_pipe)
130        except OSError as e:
131            if e.errno != FD_CLOSED:
132                warn(
133                    "Error while trying to close the read-pipe "
134                    + "to the intercepted file descriptor:\n"
135                    + f"{traceback.format_exc()}"
136                )
137
138        try:
139            os.close(self.signal_read_pipe)
140        except OSError as e:
141            if e.errno != FD_CLOSED:
142                warn(
143                    "Error while trying to close the signal-read-pipe "
144                    + "to the intercepted file descriptor:\n"
145                    + f"{traceback.format_exc()}"
146                )
147
148        try:
149            os.close(self.signal_write_pipe)
150        except OSError as e:
151            if e.errno != FD_CLOSED:
152                warn(
153                    "Error while trying to close the signal-write-pipe "
154                    + "to the intercepted file descriptor:\n"
155                    + f"{traceback.format_exc()}"
156                )
FD_CLOSED: int = 9
STOP_READING_FD_EVENT: threading.Event = <threading.Event object>
class FileDescriptorInterceptor:
 23class FileDescriptorInterceptor:
 24    """
 25    A management class to intercept data written to a file descriptor.
 26    """
 27    def __init__(
 28        self,
 29        file_descriptor: int,
 30        injection_hook: Callable[[], str],
 31    ):
 32        """
 33        Parameters
 34        ----------
 35        file_descriptor: int
 36            The OS file descriptor from which to read.
 37
 38        injection_hook: Callable[[], str]
 39            A callable which returns a string to be injected into the written data.
 40        """
 41        self.stop_event = Event()
 42        self.injection_hook = injection_hook
 43        self.original_file_descriptor = file_descriptor
 44        self.new_file_descriptor = os.dup(file_descriptor)
 45        self.read_pipe, self.write_pipe = os.pipe()
 46        self.signal_read_pipe, self.signal_write_pipe = os.pipe()
 47        os.dup2(self.write_pipe, file_descriptor)
 48
 49    def start_interception(self):
 50        """
 51        Read from the file descriptor and write the modified data after injection.
 52
 53        NOTE: This is blocking and is meant to be run in a thread.
 54        """
 55        os.set_blocking(self.read_pipe, False)
 56        os.set_blocking(self.signal_read_pipe, False)
 57        is_first_read = True
 58        while not self.stop_event.is_set():
 59            try:
 60                rlist, _, _ = select.select([self.read_pipe, self.signal_read_pipe], [], [], 0.1)
 61                if self.signal_read_pipe in rlist:
 62                    break
 63                if not rlist:
 64                    continue
 65                data = os.read(self.read_pipe, 1024)
 66                if not data:
 67                    break
 68            except BlockingIOError:
 69                continue
 70            except OSError as e:
 71                if e.errno == errno.EBADF:
 72                    ### File descriptor is closed.
 73                    pass
 74                elif e.errno == errno.EINTR:
 75                    continue  # Interrupted system call, just try again
 76                else:
 77                    warn(f"OSError in FileDescriptorInterceptor: {e}")
 78                break
 79
 80            try:
 81                first_char_is_newline = data[0] == b'\n'
 82                last_char_is_newline = data[-1] == b'\n'
 83
 84                injected_str = self.injection_hook()
 85                injected_bytes = injected_str.encode('utf-8')
 86
 87                if is_first_read:
 88                    data = b'\n' + data
 89                    is_first_read = False
 90
 91                modified_data = (
 92                    (data[:-1].replace(b'\n', b'\n' + injected_bytes) + b'\n')
 93                    if last_char_is_newline
 94                    else data.replace(b'\n', b'\n' + injected_bytes)
 95                )
 96                os.write(self.new_file_descriptor, modified_data)
 97            except (BrokenPipeError, OSError):
 98                break
 99            except Exception:
100                with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
101                    f.write(traceback.format_exc())
102                break
103
104
105    def stop_interception(self):
106        """
107        Close the new file descriptors.
108        """
109        self.stop_event.set()
110        os.write(self.signal_write_pipe, b'\0')
111        try:
112            os.close(self.new_file_descriptor)
113        except OSError as e:
114            if e.errno != FD_CLOSED:
115                warn(
116                    "Error while trying to close the duplicated file descriptor:\n"
117                    + f"{traceback.format_exc()}"
118                )
119
120        try:
121            os.close(self.write_pipe)
122        except OSError as e:
123            if e.errno != FD_CLOSED:
124                warn(
125                    "Error while trying to close the write-pipe "
126                    + "to the intercepted file descriptor:\n"
127                    + f"{traceback.format_exc()}"
128                )
129        try:
130            os.close(self.read_pipe)
131        except OSError as e:
132            if e.errno != FD_CLOSED:
133                warn(
134                    "Error while trying to close the read-pipe "
135                    + "to the intercepted file descriptor:\n"
136                    + f"{traceback.format_exc()}"
137                )
138
139        try:
140            os.close(self.signal_read_pipe)
141        except OSError as e:
142            if e.errno != FD_CLOSED:
143                warn(
144                    "Error while trying to close the signal-read-pipe "
145                    + "to the intercepted file descriptor:\n"
146                    + f"{traceback.format_exc()}"
147                )
148
149        try:
150            os.close(self.signal_write_pipe)
151        except OSError as e:
152            if e.errno != FD_CLOSED:
153                warn(
154                    "Error while trying to close the signal-write-pipe "
155                    + "to the intercepted file descriptor:\n"
156                    + f"{traceback.format_exc()}"
157                )

A management class to intercept data written to a file descriptor.

FileDescriptorInterceptor(file_descriptor: int, injection_hook: Callable[[], str])
27    def __init__(
28        self,
29        file_descriptor: int,
30        injection_hook: Callable[[], str],
31    ):
32        """
33        Parameters
34        ----------
35        file_descriptor: int
36            The OS file descriptor from which to read.
37
38        injection_hook: Callable[[], str]
39            A callable which returns a string to be injected into the written data.
40        """
41        self.stop_event = Event()
42        self.injection_hook = injection_hook
43        self.original_file_descriptor = file_descriptor
44        self.new_file_descriptor = os.dup(file_descriptor)
45        self.read_pipe, self.write_pipe = os.pipe()
46        self.signal_read_pipe, self.signal_write_pipe = os.pipe()
47        os.dup2(self.write_pipe, file_descriptor)
Parameters
  • file_descriptor (int): The OS file descriptor from which to read.
  • injection_hook (Callable[[], str]): A callable which returns a string to be injected into the written data.
stop_event
injection_hook
original_file_descriptor
new_file_descriptor
def start_interception(self):
 49    def start_interception(self):
 50        """
 51        Read from the file descriptor and write the modified data after injection.
 52
 53        NOTE: This is blocking and is meant to be run in a thread.
 54        """
 55        os.set_blocking(self.read_pipe, False)
 56        os.set_blocking(self.signal_read_pipe, False)
 57        is_first_read = True
 58        while not self.stop_event.is_set():
 59            try:
 60                rlist, _, _ = select.select([self.read_pipe, self.signal_read_pipe], [], [], 0.1)
 61                if self.signal_read_pipe in rlist:
 62                    break
 63                if not rlist:
 64                    continue
 65                data = os.read(self.read_pipe, 1024)
 66                if not data:
 67                    break
 68            except BlockingIOError:
 69                continue
 70            except OSError as e:
 71                if e.errno == errno.EBADF:
 72                    ### File descriptor is closed.
 73                    pass
 74                elif e.errno == errno.EINTR:
 75                    continue  # Interrupted system call, just try again
 76                else:
 77                    warn(f"OSError in FileDescriptorInterceptor: {e}")
 78                break
 79
 80            try:
 81                first_char_is_newline = data[0] == b'\n'
 82                last_char_is_newline = data[-1] == b'\n'
 83
 84                injected_str = self.injection_hook()
 85                injected_bytes = injected_str.encode('utf-8')
 86
 87                if is_first_read:
 88                    data = b'\n' + data
 89                    is_first_read = False
 90
 91                modified_data = (
 92                    (data[:-1].replace(b'\n', b'\n' + injected_bytes) + b'\n')
 93                    if last_char_is_newline
 94                    else data.replace(b'\n', b'\n' + injected_bytes)
 95                )
 96                os.write(self.new_file_descriptor, modified_data)
 97            except (BrokenPipeError, OSError):
 98                break
 99            except Exception:
100                with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
101                    f.write(traceback.format_exc())
102                break

Read from the file descriptor and write the modified data after injection.

NOTE: This is blocking and is meant to be run in a thread.

def stop_interception(self):
105    def stop_interception(self):
106        """
107        Close the new file descriptors.
108        """
109        self.stop_event.set()
110        os.write(self.signal_write_pipe, b'\0')
111        try:
112            os.close(self.new_file_descriptor)
113        except OSError as e:
114            if e.errno != FD_CLOSED:
115                warn(
116                    "Error while trying to close the duplicated file descriptor:\n"
117                    + f"{traceback.format_exc()}"
118                )
119
120        try:
121            os.close(self.write_pipe)
122        except OSError as e:
123            if e.errno != FD_CLOSED:
124                warn(
125                    "Error while trying to close the write-pipe "
126                    + "to the intercepted file descriptor:\n"
127                    + f"{traceback.format_exc()}"
128                )
129        try:
130            os.close(self.read_pipe)
131        except OSError as e:
132            if e.errno != FD_CLOSED:
133                warn(
134                    "Error while trying to close the read-pipe "
135                    + "to the intercepted file descriptor:\n"
136                    + f"{traceback.format_exc()}"
137                )
138
139        try:
140            os.close(self.signal_read_pipe)
141        except OSError as e:
142            if e.errno != FD_CLOSED:
143                warn(
144                    "Error while trying to close the signal-read-pipe "
145                    + "to the intercepted file descriptor:\n"
146                    + f"{traceback.format_exc()}"
147                )
148
149        try:
150            os.close(self.signal_write_pipe)
151        except OSError as e:
152            if e.errno != FD_CLOSED:
153                warn(
154                    "Error while trying to close the signal-write-pipe "
155                    + "to the intercepted file descriptor:\n"
156                    + f"{traceback.format_exc()}"
157                )

Signal the interception loop to stop, then close the new file descriptors.