meerschaum.utils.daemon.FileDescriptorInterceptor
Intercept OS-level file descriptors.
1#! /usr/bin/env python3 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Intercept OS-level file descriptors. 7""" 8 9import os 10import select 11import traceback 12import errno 13from threading import Event 14from datetime import datetime 15from meerschaum.utils.typing import Callable 16from meerschaum.utils.warnings import warn 17from meerschaum.config.paths import DAEMON_ERROR_LOG_PATH 18 19FD_CLOSED: int = 9 20STOP_READING_FD_EVENT: Event = Event() 21 22class FileDescriptorInterceptor: 23 """ 24 A management class to intercept data written to a file descriptor. 25 """ 26 def __init__( 27 self, 28 file_descriptor: int, 29 injection_hook: Callable[[], str], 30 ): 31 """ 32 Parameters 33 ---------- 34 file_descriptor: int 35 The OS file descriptor from which to read. 36 37 injection_hook: Callable[[], str] 38 A callable which returns a string to be injected into the written data. 39 """ 40 self.stop_event = Event() 41 self.injection_hook = injection_hook 42 self.original_file_descriptor = file_descriptor 43 self.new_file_descriptor = os.dup(file_descriptor) 44 self.read_pipe, self.write_pipe = os.pipe() 45 self.signal_read_pipe, self.signal_write_pipe = os.pipe() 46 os.dup2(self.write_pipe, file_descriptor) 47 48 def start_interception(self): 49 """ 50 Read from the file descriptor and write the modified data after injection. 51 52 NOTE: This is blocking and is meant to be run in a thread. 53 """ 54 os.set_blocking(self.read_pipe, False) 55 os.set_blocking(self.signal_read_pipe, False) 56 is_first_read = True 57 while not self.stop_event.is_set(): 58 try: 59 rlist, _, _ = select.select([self.read_pipe, self.signal_read_pipe], [], [], 0.1) 60 if self.signal_read_pipe in rlist: 61 break 62 if not rlist: 63 continue 64 data = os.read(self.read_pipe, 1024) 65 if not data: 66 break 67 except BlockingIOError: 68 continue 69 except OSError as e: 70 if e.errno == errno.EBADF: 71 ### File descriptor is closed. 72 pass 73 elif e.errno == errno.EINTR: 74 continue # Interrupted system call, just try again 75 else: 76 warn(f"OSError in FileDescriptorInterceptor: {e}") 77 break 78 79 try: 80 first_char_is_newline = data[0] == b'\n' 81 last_char_is_newline = data[-1] == b'\n' 82 83 injected_str = self.injection_hook() 84 injected_bytes = injected_str.encode('utf-8') 85 86 if is_first_read: 87 data = b'\n' + data 88 is_first_read = False 89 90 modified_data = ( 91 (data[:-1].replace(b'\n', b'\n' + injected_bytes) + b'\n') 92 if last_char_is_newline 93 else data.replace(b'\n', b'\n' + injected_bytes) 94 ) 95 os.write(self.new_file_descriptor, modified_data) 96 except (BrokenPipeError, OSError): 97 break 98 except Exception: 99 with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f: 100 f.write(traceback.format_exc()) 101 break 102 103 104 def stop_interception(self): 105 """ 106 Close the new file descriptors. 107 """ 108 self.stop_event.set() 109 os.write(self.signal_write_pipe, b'\0') 110 try: 111 os.close(self.new_file_descriptor) 112 except OSError as e: 113 if e.errno != FD_CLOSED: 114 warn( 115 "Error while trying to close the duplicated file descriptor:\n" 116 + f"{traceback.format_exc()}" 117 ) 118 119 try: 120 os.close(self.write_pipe) 121 except OSError as e: 122 if e.errno != FD_CLOSED: 123 warn( 124 "Error while trying to close the write-pipe " 125 + "to the intercepted file descriptor:\n" 126 + f"{traceback.format_exc()}" 127 ) 128 try: 129 os.close(self.read_pipe) 130 except OSError as e: 131 if e.errno != FD_CLOSED: 132 warn( 133 "Error while trying to close the read-pipe " 134 + "to the intercepted file descriptor:\n" 135 + f"{traceback.format_exc()}" 136 ) 137 138 try: 139 os.close(self.signal_read_pipe) 140 except OSError as e: 141 if e.errno != FD_CLOSED: 142 warn( 143 "Error while trying to close the signal-read-pipe " 144 + "to the intercepted file descriptor:\n" 145 + f"{traceback.format_exc()}" 146 ) 147 148 try: 149 os.close(self.signal_write_pipe) 150 except OSError as e: 151 if e.errno != FD_CLOSED: 152 warn( 153 "Error while trying to close the signal-write-pipe " 154 + "to the intercepted file descriptor:\n" 155 + f"{traceback.format_exc()}" 156 )
FD_CLOSED: int =
9
STOP_READING_FD_EVENT: threading.Event =
<threading.Event at 0x73d1e464a060: unset>
class
FileDescriptorInterceptor:
23class FileDescriptorInterceptor: 24 """ 25 A management class to intercept data written to a file descriptor. 26 """ 27 def __init__( 28 self, 29 file_descriptor: int, 30 injection_hook: Callable[[], str], 31 ): 32 """ 33 Parameters 34 ---------- 35 file_descriptor: int 36 The OS file descriptor from which to read. 37 38 injection_hook: Callable[[], str] 39 A callable which returns a string to be injected into the written data. 40 """ 41 self.stop_event = Event() 42 self.injection_hook = injection_hook 43 self.original_file_descriptor = file_descriptor 44 self.new_file_descriptor = os.dup(file_descriptor) 45 self.read_pipe, self.write_pipe = os.pipe() 46 self.signal_read_pipe, self.signal_write_pipe = os.pipe() 47 os.dup2(self.write_pipe, file_descriptor) 48 49 def start_interception(self): 50 """ 51 Read from the file descriptor and write the modified data after injection. 52 53 NOTE: This is blocking and is meant to be run in a thread. 54 """ 55 os.set_blocking(self.read_pipe, False) 56 os.set_blocking(self.signal_read_pipe, False) 57 is_first_read = True 58 while not self.stop_event.is_set(): 59 try: 60 rlist, _, _ = select.select([self.read_pipe, self.signal_read_pipe], [], [], 0.1) 61 if self.signal_read_pipe in rlist: 62 break 63 if not rlist: 64 continue 65 data = os.read(self.read_pipe, 1024) 66 if not data: 67 break 68 except BlockingIOError: 69 continue 70 except OSError as e: 71 if e.errno == errno.EBADF: 72 ### File descriptor is closed. 73 pass 74 elif e.errno == errno.EINTR: 75 continue # Interrupted system call, just try again 76 else: 77 warn(f"OSError in FileDescriptorInterceptor: {e}") 78 break 79 80 try: 81 first_char_is_newline = data[0] == b'\n' 82 last_char_is_newline = data[-1] == b'\n' 83 84 injected_str = self.injection_hook() 85 injected_bytes = injected_str.encode('utf-8') 86 87 if is_first_read: 88 data = b'\n' + data 89 is_first_read = False 90 91 modified_data = ( 92 (data[:-1].replace(b'\n', b'\n' + injected_bytes) + b'\n') 93 if last_char_is_newline 94 else data.replace(b'\n', b'\n' + injected_bytes) 95 ) 96 os.write(self.new_file_descriptor, modified_data) 97 except (BrokenPipeError, OSError): 98 break 99 except Exception: 100 with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f: 101 f.write(traceback.format_exc()) 102 break 103 104 105 def stop_interception(self): 106 """ 107 Close the new file descriptors. 108 """ 109 self.stop_event.set() 110 os.write(self.signal_write_pipe, b'\0') 111 try: 112 os.close(self.new_file_descriptor) 113 except OSError as e: 114 if e.errno != FD_CLOSED: 115 warn( 116 "Error while trying to close the duplicated file descriptor:\n" 117 + f"{traceback.format_exc()}" 118 ) 119 120 try: 121 os.close(self.write_pipe) 122 except OSError as e: 123 if e.errno != FD_CLOSED: 124 warn( 125 "Error while trying to close the write-pipe " 126 + "to the intercepted file descriptor:\n" 127 + f"{traceback.format_exc()}" 128 ) 129 try: 130 os.close(self.read_pipe) 131 except OSError as e: 132 if e.errno != FD_CLOSED: 133 warn( 134 "Error while trying to close the read-pipe " 135 + "to the intercepted file descriptor:\n" 136 + f"{traceback.format_exc()}" 137 ) 138 139 try: 140 os.close(self.signal_read_pipe) 141 except OSError as e: 142 if e.errno != FD_CLOSED: 143 warn( 144 "Error while trying to close the signal-read-pipe " 145 + "to the intercepted file descriptor:\n" 146 + f"{traceback.format_exc()}" 147 ) 148 149 try: 150 os.close(self.signal_write_pipe) 151 except OSError as e: 152 if e.errno != FD_CLOSED: 153 warn( 154 "Error while trying to close the signal-write-pipe " 155 + "to the intercepted file descriptor:\n" 156 + f"{traceback.format_exc()}" 157 )
A management class to intercept data written to a file descriptor.
FileDescriptorInterceptor(file_descriptor: int, injection_hook: Callable[[], str])
27 def __init__( 28 self, 29 file_descriptor: int, 30 injection_hook: Callable[[], str], 31 ): 32 """ 33 Parameters 34 ---------- 35 file_descriptor: int 36 The OS file descriptor from which to read. 37 38 injection_hook: Callable[[], str] 39 A callable which returns a string to be injected into the written data. 40 """ 41 self.stop_event = Event() 42 self.injection_hook = injection_hook 43 self.original_file_descriptor = file_descriptor 44 self.new_file_descriptor = os.dup(file_descriptor) 45 self.read_pipe, self.write_pipe = os.pipe() 46 self.signal_read_pipe, self.signal_write_pipe = os.pipe() 47 os.dup2(self.write_pipe, file_descriptor)
Parameters
- file_descriptor (int): The OS file descriptor from which to read.
- injection_hook (Callable[[], str]): A callable which returns a string to be injected into the written data.
def
start_interception(self):
49 def start_interception(self): 50 """ 51 Read from the file descriptor and write the modified data after injection. 52 53 NOTE: This is blocking and is meant to be run in a thread. 54 """ 55 os.set_blocking(self.read_pipe, False) 56 os.set_blocking(self.signal_read_pipe, False) 57 is_first_read = True 58 while not self.stop_event.is_set(): 59 try: 60 rlist, _, _ = select.select([self.read_pipe, self.signal_read_pipe], [], [], 0.1) 61 if self.signal_read_pipe in rlist: 62 break 63 if not rlist: 64 continue 65 data = os.read(self.read_pipe, 1024) 66 if not data: 67 break 68 except BlockingIOError: 69 continue 70 except OSError as e: 71 if e.errno == errno.EBADF: 72 ### File descriptor is closed. 73 pass 74 elif e.errno == errno.EINTR: 75 continue # Interrupted system call, just try again 76 else: 77 warn(f"OSError in FileDescriptorInterceptor: {e}") 78 break 79 80 try: 81 first_char_is_newline = data[0] == b'\n' 82 last_char_is_newline = data[-1] == b'\n' 83 84 injected_str = self.injection_hook() 85 injected_bytes = injected_str.encode('utf-8') 86 87 if is_first_read: 88 data = b'\n' + data 89 is_first_read = False 90 91 modified_data = ( 92 (data[:-1].replace(b'\n', b'\n' + injected_bytes) + b'\n') 93 if last_char_is_newline 94 else data.replace(b'\n', b'\n' + injected_bytes) 95 ) 96 os.write(self.new_file_descriptor, modified_data) 97 except (BrokenPipeError, OSError): 98 break 99 except Exception: 100 with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f: 101 f.write(traceback.format_exc()) 102 break
Read from the file descriptor and write the modified data after injection.
NOTE: This is blocking and is meant to be run in a thread.
def
stop_interception(self):
105 def stop_interception(self): 106 """ 107 Close the new file descriptors. 108 """ 109 self.stop_event.set() 110 os.write(self.signal_write_pipe, b'\0') 111 try: 112 os.close(self.new_file_descriptor) 113 except OSError as e: 114 if e.errno != FD_CLOSED: 115 warn( 116 "Error while trying to close the duplicated file descriptor:\n" 117 + f"{traceback.format_exc()}" 118 ) 119 120 try: 121 os.close(self.write_pipe) 122 except OSError as e: 123 if e.errno != FD_CLOSED: 124 warn( 125 "Error while trying to close the write-pipe " 126 + "to the intercepted file descriptor:\n" 127 + f"{traceback.format_exc()}" 128 ) 129 try: 130 os.close(self.read_pipe) 131 except OSError as e: 132 if e.errno != FD_CLOSED: 133 warn( 134 "Error while trying to close the read-pipe " 135 + "to the intercepted file descriptor:\n" 136 + f"{traceback.format_exc()}" 137 ) 138 139 try: 140 os.close(self.signal_read_pipe) 141 except OSError as e: 142 if e.errno != FD_CLOSED: 143 warn( 144 "Error while trying to close the signal-read-pipe " 145 + "to the intercepted file descriptor:\n" 146 + f"{traceback.format_exc()}" 147 ) 148 149 try: 150 os.close(self.signal_write_pipe) 151 except OSError as e: 152 if e.errno != FD_CLOSED: 153 warn( 154 "Error while trying to close the signal-write-pipe " 155 + "to the intercepted file descriptor:\n" 156 + f"{traceback.format_exc()}" 157 )
Close the new file descriptors.