meerschaum.utils.daemon.FileDescriptorInterceptor
Intercept OS-level file descriptors.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Intercept OS-level file descriptors.
"""

import os
import select
import traceback
import errno
from threading import Event
from datetime import datetime
from meerschaum.utils.typing import Callable
from meerschaum.utils.warnings import warn
from meerschaum.config.paths import DAEMON_ERROR_LOG_PATH

### Kept as a module-level constant for existing importers.
### NOTE(review): 9 is the usual numeric value of `errno.EBADF` -- confirm per platform.
FD_CLOSED: int = 9
STOP_READING_FD_EVENT: Event = Event()


class FileDescriptorInterceptor:
    """
    A management class to intercept data written to a file descriptor.

    On construction the target descriptor is duplicated and then replaced
    (via `os.dup2`) by the write end of an internal pipe.
    `start_interception()` forwards everything read from that pipe to the
    duplicate, inserting the string returned by `injection_hook` after
    every newline.
    """
    def __init__(
        self,
        file_descriptor: int,
        injection_hook: Callable[[], str],
    ):
        """
        Parameters
        ----------
        file_descriptor: int
            The OS file descriptor to intercept.
            Data written to it is rerouted through this interceptor.

        injection_hook: Callable[[], str]
            A callable which returns a string to be injected into the written data.
        """
        self.stop_event = Event()
        self.injection_hook = injection_hook
        self.original_file_descriptor = file_descriptor
        ### Keep a handle on the original destination before it is replaced.
        self.new_file_descriptor = os.dup(file_descriptor)
        self.read_pipe, self.write_pipe = os.pipe()
        ### Written to by `stop_interception()` to wake the `select()` call.
        self.signal_read_pipe, self.signal_write_pipe = os.pipe()
        ### From here on, writes to `file_descriptor` arrive on `self.read_pipe`.
        os.dup2(self.write_pipe, file_descriptor)

    def start_interception(self):
        """
        Read from the file descriptor and write the modified data after injection.

        The first chunk is prefixed with a newline, and every newline in a chunk
        (including a trailing one) is followed by the injected string.

        NOTE: This is blocking and is meant to be run in a thread.
        """
        os.set_blocking(self.read_pipe, False)
        os.set_blocking(self.signal_read_pipe, False)
        is_first_read = True
        while not self.stop_event.is_set():
            try:
                rlist, _, _ = select.select(
                    [self.read_pipe, self.signal_read_pipe], [], [], 0.1
                )
                if self.signal_read_pipe in rlist:
                    break
                if not rlist:
                    continue
                data = os.read(self.read_pipe, 1024)
                if not data:
                    break
            except BlockingIOError:
                continue
            except OSError as e:
                if e.errno == errno.EINTR:
                    ### Interrupted system call, just try again.
                    continue
                if e.errno != errno.EBADF:
                    warn(f"OSError in FileDescriptorInterceptor: {e}")
                ### EBADF means a descriptor was closed: nothing left to read.
                break

            try:
                injected_bytes = self.injection_hook().encode('utf-8')

                if is_first_read:
                    data = b'\n' + data
                    is_first_read = False

                ### A trailing newline also receives the injection, so the first
                ### line of the next chunk is already prefixed when it arrives.
                modified_data = data.replace(b'\n', b'\n' + injected_bytes)
                os.write(self.new_file_descriptor, modified_data)
            except OSError:
                ### Includes `BrokenPipeError`: the destination is gone.
                break
            except Exception:
                with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
                    f.write(traceback.format_exc())
                break

    def stop_interception(self):
        """
        Signal the interception loop to stop and close the descriptors
        opened by this interceptor.

        Failures other than "bad file descriptor" are reported with `warn()`
        and do not prevent the remaining descriptors from being closed.
        """
        self.stop_event.set()
        try:
            ### Wake the `select()` call in `start_interception()`.
            os.write(self.signal_write_pipe, b'\0')
        except OSError as e:
            if e.errno != errno.EBADF:
                warn(
                    "Error while trying to signal the interception loop to stop:\n"
                    + f"{traceback.format_exc()}"
                )

        descriptors_to_close = (
            ('new_file_descriptor', "the duplicated file descriptor"),
            ('write_pipe', "the write-pipe to the intercepted file descriptor"),
            ('read_pipe', "the read-pipe to the intercepted file descriptor"),
            ('signal_read_pipe', "the signal-read-pipe to the intercepted file descriptor"),
            ('signal_write_pipe', "the signal-write-pipe to the intercepted file descriptor"),
        )
        for attribute, label in descriptors_to_close:
            try:
                os.close(getattr(self, attribute))
            except OSError as e:
                ### An already-closed descriptor is not worth reporting.
                if e.errno != errno.EBADF:
                    warn(
                        f"Error while trying to close {label}:\n"
                        + f"{traceback.format_exc()}"
                    )
FD_CLOSED: int =
9
STOP_READING_FD_EVENT: threading.Event =
<threading.Event at 0x7513cf3d1250: unset>
class
FileDescriptorInterceptor:
class FileDescriptorInterceptor:
    """
    A management class to intercept data written to a file descriptor.

    On construction the target descriptor is duplicated and then replaced
    (via `os.dup2`) by the write end of an internal pipe.
    `start_interception()` forwards everything read from that pipe to the
    duplicate, inserting the string returned by `injection_hook` after
    every newline.
    """
    def __init__(
        self,
        file_descriptor: int,
        injection_hook: Callable[[], str],
    ):
        """
        Parameters
        ----------
        file_descriptor: int
            The OS file descriptor to intercept.
            Data written to it is rerouted through this interceptor.

        injection_hook: Callable[[], str]
            A callable which returns a string to be injected into the written data.
        """
        self.stop_event = Event()
        self.injection_hook = injection_hook
        self.original_file_descriptor = file_descriptor
        ### Keep a handle on the original destination before it is replaced.
        self.new_file_descriptor = os.dup(file_descriptor)
        self.read_pipe, self.write_pipe = os.pipe()
        ### Written to by `stop_interception()` to wake the `select()` call.
        self.signal_read_pipe, self.signal_write_pipe = os.pipe()
        ### From here on, writes to `file_descriptor` arrive on `self.read_pipe`.
        os.dup2(self.write_pipe, file_descriptor)

    def start_interception(self):
        """
        Read from the file descriptor and write the modified data after injection.

        The first chunk is prefixed with a newline, and every newline in a chunk
        (including a trailing one) is followed by the injected string.

        NOTE: This is blocking and is meant to be run in a thread.
        """
        os.set_blocking(self.read_pipe, False)
        os.set_blocking(self.signal_read_pipe, False)
        is_first_read = True
        while not self.stop_event.is_set():
            try:
                rlist, _, _ = select.select(
                    [self.read_pipe, self.signal_read_pipe], [], [], 0.1
                )
                if self.signal_read_pipe in rlist:
                    break
                if not rlist:
                    continue
                data = os.read(self.read_pipe, 1024)
                if not data:
                    break
            except BlockingIOError:
                continue
            except OSError as e:
                if e.errno == errno.EINTR:
                    ### Interrupted system call, just try again.
                    continue
                if e.errno != errno.EBADF:
                    warn(f"OSError in FileDescriptorInterceptor: {e}")
                ### EBADF means a descriptor was closed: nothing left to read.
                break

            try:
                injected_bytes = self.injection_hook().encode('utf-8')

                if is_first_read:
                    data = b'\n' + data
                    is_first_read = False

                ### A trailing newline also receives the injection, so the first
                ### line of the next chunk is already prefixed when it arrives.
                modified_data = data.replace(b'\n', b'\n' + injected_bytes)
                os.write(self.new_file_descriptor, modified_data)
            except OSError:
                ### Includes `BrokenPipeError`: the destination is gone.
                break
            except Exception:
                with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
                    f.write(traceback.format_exc())
                break

    def stop_interception(self):
        """
        Signal the interception loop to stop and close the descriptors
        opened by this interceptor.

        Failures other than "bad file descriptor" are reported with `warn()`
        and do not prevent the remaining descriptors from being closed.
        """
        self.stop_event.set()
        try:
            ### Wake the `select()` call in `start_interception()`.
            os.write(self.signal_write_pipe, b'\0')
        except OSError as e:
            if e.errno != errno.EBADF:
                warn(
                    "Error while trying to signal the interception loop to stop:\n"
                    + f"{traceback.format_exc()}"
                )

        descriptors_to_close = (
            ('new_file_descriptor', "the duplicated file descriptor"),
            ('write_pipe', "the write-pipe to the intercepted file descriptor"),
            ('read_pipe', "the read-pipe to the intercepted file descriptor"),
            ('signal_read_pipe', "the signal-read-pipe to the intercepted file descriptor"),
            ('signal_write_pipe', "the signal-write-pipe to the intercepted file descriptor"),
        )
        for attribute, label in descriptors_to_close:
            try:
                os.close(getattr(self, attribute))
            except OSError as e:
                ### An already-closed descriptor is not worth reporting.
                if e.errno != errno.EBADF:
                    warn(
                        f"Error while trying to close {label}:\n"
                        + f"{traceback.format_exc()}"
                    )
A management class to intercept data written to a file descriptor.
FileDescriptorInterceptor(file_descriptor: int, injection_hook: Callable[[], str])
def __init__(
    self,
    file_descriptor: int,
    injection_hook: Callable[[], str],
):
    """
    Set up the pipes used to intercept writes to `file_descriptor`.

    Parameters
    ----------
    file_descriptor: int
        The OS file descriptor to intercept.
        Data written to it is rerouted through this interceptor.

    injection_hook: Callable[[], str]
        A callable which returns a string to be injected into the written data.
    """
    self.stop_event = Event()
    self.injection_hook = injection_hook
    self.original_file_descriptor = file_descriptor

    ### Duplicate first so the original destination stays reachable
    ### after `file_descriptor` is replaced below.
    self.new_file_descriptor = os.dup(file_descriptor)

    data_pipe = os.pipe()
    self.read_pipe, self.write_pipe = data_pipe

    signal_pipe = os.pipe()
    self.signal_read_pipe, self.signal_write_pipe = signal_pipe

    ### Point `file_descriptor` at the write end of the data pipe.
    os.dup2(self.write_pipe, file_descriptor)
Parameters
- file_descriptor (int): The OS file descriptor to intercept; data written to it is rerouted through this interceptor.
- injection_hook (Callable[[], str]): A callable which returns a string to be injected into the written data.
def
start_interception(self):
def start_interception(self):
    """
    Read from the file descriptor and write the modified data after injection.

    Chunks of up to 1024 bytes are read from `self.read_pipe` and written to
    `self.new_file_descriptor`. The first chunk is prefixed with a newline,
    and every newline in a chunk (including a trailing one) is followed by
    the UTF-8 encoded string returned by `self.injection_hook()`.

    The loop ends when `self.stop_event` is set, when `self.signal_read_pipe`
    becomes readable, on end-of-file, or on an unrecoverable error.

    NOTE: This is blocking and is meant to be run in a thread.
    """
    os.set_blocking(self.read_pipe, False)
    os.set_blocking(self.signal_read_pipe, False)
    is_first_read = True
    while not self.stop_event.is_set():
        try:
            rlist, _, _ = select.select(
                [self.read_pipe, self.signal_read_pipe], [], [], 0.1
            )
            if self.signal_read_pipe in rlist:
                break
            if not rlist:
                continue
            data = os.read(self.read_pipe, 1024)
            if not data:
                break
        except BlockingIOError:
            continue
        except OSError as e:
            if e.errno == errno.EINTR:
                ### Interrupted system call, just try again.
                continue
            if e.errno != errno.EBADF:
                warn(f"OSError in FileDescriptorInterceptor: {e}")
            ### EBADF means a descriptor was closed: nothing left to read.
            break

        try:
            injected_bytes = self.injection_hook().encode('utf-8')

            if is_first_read:
                data = b'\n' + data
                is_first_read = False

            ### A trailing newline also receives the injection, so the first
            ### line of the next chunk is already prefixed when it arrives.
            modified_data = data.replace(b'\n', b'\n' + injected_bytes)
            os.write(self.new_file_descriptor, modified_data)
        except OSError:
            ### Includes `BrokenPipeError`: the destination is gone.
            break
        except Exception:
            with open(DAEMON_ERROR_LOG_PATH, 'a+', encoding='utf-8') as f:
                f.write(traceback.format_exc())
            break
Read from the file descriptor and write the modified data after injection.
NOTE: This is blocking and is meant to be run in a thread.
def
stop_interception(self):
def stop_interception(self):
    """
    Signal the interception loop to stop and close the descriptors
    opened by this interceptor.

    Sets `self.stop_event`, writes one byte to `self.signal_write_pipe`,
    then closes the duplicated descriptor and both ends of both pipes.
    Failures other than "bad file descriptor" are reported with `warn()`
    and do not prevent the remaining descriptors from being closed.
    """
    self.stop_event.set()
    try:
        ### Make the signal pipe readable so a waiting `select()` returns.
        os.write(self.signal_write_pipe, b'\0')
    except OSError as e:
        if e.errno != errno.EBADF:
            warn(
                "Error while trying to signal the interception loop to stop:\n"
                + f"{traceback.format_exc()}"
            )

    descriptors_to_close = (
        ('new_file_descriptor', "the duplicated file descriptor"),
        ('write_pipe', "the write-pipe to the intercepted file descriptor"),
        ('read_pipe', "the read-pipe to the intercepted file descriptor"),
        ('signal_read_pipe', "the signal-read-pipe to the intercepted file descriptor"),
        ('signal_write_pipe', "the signal-write-pipe to the intercepted file descriptor"),
    )
    for attribute, label in descriptors_to_close:
        try:
            os.close(getattr(self, attribute))
        except OSError as e:
            ### An already-closed descriptor is not worth reporting.
            if e.errno != errno.EBADF:
                warn(
                    f"Error while trying to close {label}:\n"
                    + f"{traceback.format_exc()}"
                )
Signal the interception loop to stop and close the file descriptors opened by this interceptor.