meerschaum.utils.daemon.StdinFile
Create a file manager to pass STDIN to the Daemon.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Create a file manager to pass STDIN to the Daemon.
"""

import codecs
import io
import pathlib
import time
import os
import selectors
import traceback

import meerschaum as mrsm
from meerschaum.utils.typing import Optional, Union
from meerschaum.utils.warnings import warn


class StdinFile(io.TextIOBase):
    """
    Redirect user input into a Daemon's context.

    Input travels through a named pipe (FIFO) at `file_path`: `write()` feeds
    the pipe, `read()` / `readline()` drain it. While a reader is waiting for
    input, a marker file (`blocking_file_path`) is kept on disk and removed
    again as soon as data arrives.
    """
    def __init__(
        self,
        file_path: Union[pathlib.Path, str],
        lock_file_path: Union[pathlib.Path, str, None] = None,
        decode: bool = True,
        refresh_seconds: Union[int, float, None] = None,
    ):
        """
        Parameters
        ----------
        file_path: Union[pathlib.Path, str]
            Location of the FIFO. It is created lazily on first read or write.

        lock_file_path: Union[pathlib.Path, str, None], default None
            Location of the marker file touched while `read()` is waiting.
            Defaults to `<file_path>.block` next to the FIFO.

        decode: bool, default True
            If `True`, `read()` and `readline()` return `str` (UTF-8);
            otherwise they return raw `bytes`.

        refresh_seconds: Union[int, float, None], default None
            Polling interval used by `read()`. Falls back to the configured
            `system:cli:refresh_seconds` value when unset.
        """
        if not isinstance(file_path, pathlib.Path):
            file_path = pathlib.Path(file_path)

        # Coerce string lock paths too: the rest of the class calls
        # `.exists()`, `.touch()` and `.unlink()` on this attribute.
        if lock_file_path is not None and not isinstance(lock_file_path, pathlib.Path):
            lock_file_path = pathlib.Path(lock_file_path)

        self.file_path = file_path
        self.blocking_file_path = (
            lock_file_path
            if lock_file_path is not None
            else (file_path.parent / (file_path.name + '.block'))
        )
        self._file_handler = None
        self._fd = None
        self.sel = selectors.DefaultSelector()
        self.decode = decode
        self._write_fp = None
        self._refresh_seconds = refresh_seconds
        # Buffers incomplete UTF-8 sequences between `read()` calls.
        self._decoder = None

    @property
    def encoding(self):
        """
        The text encoding used for reads and writes (always UTF-8).
        """
        return 'utf-8'

    @property
    def file_handler(self):
        """
        Return the read file handler to the provided file path.

        The FIFO (and its parent directory) is created if missing, then opened
        read-only and non-blocking, and registered with the selector.
        """
        if self._file_handler is not None:
            return self._file_handler

        self.file_path.parent.mkdir(parents=True, exist_ok=True)
        try:
            # EAFP: another process may create the FIFO between a check and
            # the call, so tolerate it already existing.
            os.mkfifo(self.file_path.as_posix(), mode=0o600)
        except FileExistsError:
            pass

        self._fd = os.open(self.file_path, os.O_RDONLY | os.O_NONBLOCK)
        self._file_handler = os.fdopen(self._fd, 'rb', buffering=0)
        self.sel.register(self._file_handler, selectors.EVENT_READ)
        return self._file_handler

    def write(self, data):
        """
        Write `data` (`str` or `bytes`) into the FIFO and flush immediately.

        Returns the number of characters (for `str`) or bytes (for `bytes`)
        written, or 0 if the pipe was broken and the data was dropped.
        """
        if self._write_fp is None:
            self.file_path.parent.mkdir(parents=True, exist_ok=True)
            try:
                os.mkfifo(self.file_path.as_posix(), mode=0o600)
            except FileExistsError:
                pass
            self._write_fp = open(self.file_path, 'wb')

        payload = data.encode('utf-8') if isinstance(data, str) else data
        try:
            self._write_fp.write(payload)
            self._write_fp.flush()
        except BrokenPipeError:
            # Deliberate best-effort: a broken pipe drops the input.
            return 0
        return len(data)

    def fileno(self):
        """
        Return the file descriptor of the read handler (opening it if needed).
        """
        return self.file_handler.fileno()

    def read(self, size=-1):
        """
        Read from the FIFO pipe, polling until data is available.

        Read errors (`OSError`, `EOFError`) and empty reads are retried after
        sleeping `refresh_seconds`; while waiting, the blocking marker file is
        kept on disk. Returns `str` when `decode` is true, else `bytes`.
        """
        _ = self.file_handler
        while True:
            try:
                data = self._file_handler.read(size)
            except (OSError, EOFError):
                data = None

            if data:
                try:
                    if self.blocking_file_path.exists():
                        self.blocking_file_path.unlink()
                except Exception:
                    warn(traceback.format_exc())

                if not self.decode:
                    return data

                # Decode incrementally so a multi-byte character split across
                # reads (e.g. `read(1)`) is buffered instead of raising.
                if self._decoder is None:
                    self._decoder = codecs.getincrementaldecoder('utf-8')()
                text = self._decoder.decode(data)
                if text:
                    return text
                # Only a partial character so far: fetch the rest right away.
                continue

            if not self.blocking_file_path.exists():
                self.blocking_file_path.touch()
            time.sleep(self.refresh_seconds)

    def readline(self, size=-1):
        """
        Read one line from the FIFO, one unit at a time.

        The terminating newline is consumed but NOT included in the result.
        If `size` is non-negative, at most `size` units are returned.
        """
        newline = '\n' if self.decode else b'\n'
        limit = -1 if size is None else size
        pieces = []
        while limit < 0 or len(pieces) < limit:
            unit = self.read(1)
            if not unit or unit == newline:
                break
            pieces.append(unit)

        return ('' if self.decode else b'').join(pieces)

    def close(self):
        """
        Close the read and write handles and remove the blocking marker file.
        """
        if self._file_handler is not None:
            try:
                self.sel.unregister(self._file_handler)
            except (KeyError, ValueError):
                # Not registered (or already invalid): nothing to undo.
                pass
            # The handler owns `_fd` (created via `os.fdopen`), so closing it
            # closes the descriptor. A second `os.close()` could hit an
            # unrelated descriptor that reused the same number.
            self._file_handler.close()
            self._file_handler = None
            self._fd = None

        if self._write_fp is not None:
            try:
                self._write_fp.close()
            except BrokenPipeError:
                pass
            self._write_fp = None

        try:
            if self.blocking_file_path.exists():
                self.blocking_file_path.unlink()
        except Exception:
            # Best-effort cleanup; never fail `close()` over the marker file.
            pass
        super().close()

    def is_open(self):
        """
        Return whether the read handler is currently open.
        """
        return self._file_handler is not None

    def isatty(self) -> bool:
        """
        A FIFO-backed stream is never an interactive terminal.
        """
        return False

    @property
    def refresh_seconds(self) -> Union[int, float]:
        """
        How many seconds between checking for blocking functions.
        """
        if not self._refresh_seconds:
            self._refresh_seconds = mrsm.get_config('system', 'cli', 'refresh_seconds')
        return self._refresh_seconds

    def __str__(self) -> str:
        return f"StdinFile('{self.file_path}')"

    def __repr__(self) -> str:
        return str(self)
class StdinFile(io.TextIOBase):
    """
    Redirect user input into a Daemon's context.

    Input travels through a named pipe (FIFO) at `file_path`: `write()` feeds
    the pipe, `read()` / `readline()` drain it. While a reader is waiting for
    input, a marker file (`blocking_file_path`) is kept on disk and removed
    again as soon as data arrives.
    """
    def __init__(
        self,
        file_path: Union[pathlib.Path, str],
        lock_file_path: Union[pathlib.Path, str, None] = None,
        decode: bool = True,
        refresh_seconds: Union[int, float, None] = None,
    ):
        """
        Parameters
        ----------
        file_path: Union[pathlib.Path, str]
            Location of the FIFO. It is created lazily on first read or write.

        lock_file_path: Union[pathlib.Path, str, None], default None
            Location of the marker file touched while `read()` is waiting.
            Defaults to `<file_path>.block` next to the FIFO.

        decode: bool, default True
            If `True`, `read()` and `readline()` return `str` (UTF-8);
            otherwise they return raw `bytes`.

        refresh_seconds: Union[int, float, None], default None
            Polling interval used by `read()`. Falls back to the configured
            `system:cli:refresh_seconds` value when unset.
        """
        if not isinstance(file_path, pathlib.Path):
            file_path = pathlib.Path(file_path)

        # Coerce string lock paths too: the rest of the class calls
        # `.exists()`, `.touch()` and `.unlink()` on this attribute.
        if lock_file_path is not None and not isinstance(lock_file_path, pathlib.Path):
            lock_file_path = pathlib.Path(lock_file_path)

        self.file_path = file_path
        self.blocking_file_path = (
            lock_file_path
            if lock_file_path is not None
            else (file_path.parent / (file_path.name + '.block'))
        )
        self._file_handler = None
        self._fd = None
        self.sel = selectors.DefaultSelector()
        self.decode = decode
        self._write_fp = None
        self._refresh_seconds = refresh_seconds
        # Buffers incomplete UTF-8 sequences between `read()` calls.
        self._decoder = None

    @property
    def encoding(self):
        """
        The text encoding used for reads and writes (always UTF-8).
        """
        return 'utf-8'

    @property
    def file_handler(self):
        """
        Return the read file handler to the provided file path.

        The FIFO (and its parent directory) is created if missing, then opened
        read-only and non-blocking, and registered with the selector.
        """
        if self._file_handler is not None:
            return self._file_handler

        self.file_path.parent.mkdir(parents=True, exist_ok=True)
        try:
            # EAFP: another process may create the FIFO between a check and
            # the call, so tolerate it already existing.
            os.mkfifo(self.file_path.as_posix(), mode=0o600)
        except FileExistsError:
            pass

        self._fd = os.open(self.file_path, os.O_RDONLY | os.O_NONBLOCK)
        self._file_handler = os.fdopen(self._fd, 'rb', buffering=0)
        self.sel.register(self._file_handler, selectors.EVENT_READ)
        return self._file_handler

    def write(self, data):
        """
        Write `data` (`str` or `bytes`) into the FIFO and flush immediately.

        Returns the number of characters (for `str`) or bytes (for `bytes`)
        written, or 0 if the pipe was broken and the data was dropped.
        """
        if self._write_fp is None:
            self.file_path.parent.mkdir(parents=True, exist_ok=True)
            try:
                os.mkfifo(self.file_path.as_posix(), mode=0o600)
            except FileExistsError:
                pass
            self._write_fp = open(self.file_path, 'wb')

        payload = data.encode('utf-8') if isinstance(data, str) else data
        try:
            self._write_fp.write(payload)
            self._write_fp.flush()
        except BrokenPipeError:
            # Deliberate best-effort: a broken pipe drops the input.
            return 0
        return len(data)

    def fileno(self):
        """
        Return the file descriptor of the read handler (opening it if needed).
        """
        return self.file_handler.fileno()

    def read(self, size=-1):
        """
        Read from the FIFO pipe, polling until data is available.

        Read errors (`OSError`, `EOFError`) and empty reads are retried after
        sleeping `refresh_seconds`; while waiting, the blocking marker file is
        kept on disk. Returns `str` when `decode` is true, else `bytes`.
        """
        import codecs

        _ = self.file_handler
        while True:
            try:
                data = self._file_handler.read(size)
            except (OSError, EOFError):
                data = None

            if data:
                try:
                    if self.blocking_file_path.exists():
                        self.blocking_file_path.unlink()
                except Exception:
                    warn(traceback.format_exc())

                if not self.decode:
                    return data

                # Decode incrementally so a multi-byte character split across
                # reads (e.g. `read(1)`) is buffered instead of raising.
                if self._decoder is None:
                    self._decoder = codecs.getincrementaldecoder('utf-8')()
                text = self._decoder.decode(data)
                if text:
                    return text
                # Only a partial character so far: fetch the rest right away.
                continue

            if not self.blocking_file_path.exists():
                self.blocking_file_path.touch()
            time.sleep(self.refresh_seconds)

    def readline(self, size=-1):
        """
        Read one line from the FIFO, one unit at a time.

        The terminating newline is consumed but NOT included in the result.
        If `size` is non-negative, at most `size` units are returned.
        """
        newline = '\n' if self.decode else b'\n'
        limit = -1 if size is None else size
        pieces = []
        while limit < 0 or len(pieces) < limit:
            unit = self.read(1)
            if not unit or unit == newline:
                break
            pieces.append(unit)

        return ('' if self.decode else b'').join(pieces)

    def close(self):
        """
        Close the read and write handles and remove the blocking marker file.
        """
        if self._file_handler is not None:
            try:
                self.sel.unregister(self._file_handler)
            except (KeyError, ValueError):
                # Not registered (or already invalid): nothing to undo.
                pass
            # The handler owns `_fd` (created via `os.fdopen`), so closing it
            # closes the descriptor. A second `os.close()` could hit an
            # unrelated descriptor that reused the same number.
            self._file_handler.close()
            self._file_handler = None
            self._fd = None

        if self._write_fp is not None:
            try:
                self._write_fp.close()
            except BrokenPipeError:
                pass
            self._write_fp = None

        try:
            if self.blocking_file_path.exists():
                self.blocking_file_path.unlink()
        except Exception:
            # Best-effort cleanup; never fail `close()` over the marker file.
            pass
        super().close()

    def is_open(self):
        """
        Return whether the read handler is currently open.
        """
        return self._file_handler is not None

    def isatty(self) -> bool:
        """
        A FIFO-backed stream is never an interactive terminal.
        """
        return False

    @property
    def refresh_seconds(self) -> Union[int, float]:
        """
        How many seconds between checking for blocking functions.
        """
        if not self._refresh_seconds:
            self._refresh_seconds = mrsm.get_config('system', 'cli', 'refresh_seconds')
        return self._refresh_seconds

    def __str__(self) -> str:
        return f"StdinFile('{self.file_path}')"

    def __repr__(self) -> str:
        return str(self)
Redirect user input into a Daemon's context.
26 def __init__( 27 self, 28 file_path: Union[pathlib.Path, str], 29 lock_file_path: Optional[pathlib.Path] = None, 30 decode: bool = True, 31 refresh_seconds: Union[int, float, None] = None, 32 ): 33 if isinstance(file_path, str): 34 file_path = pathlib.Path(file_path) 35 36 self.file_path = file_path 37 self.blocking_file_path = ( 38 lock_file_path 39 if lock_file_path is not None 40 else (file_path.parent / (file_path.name + '.block')) 41 ) 42 self._file_handler = None 43 self._fd = None 44 self.sel = selectors.DefaultSelector() 45 self.decode = decode 46 self._write_fp = None 47 self._refresh_seconds = refresh_seconds
53 @property 54 def file_handler(self): 55 """ 56 Return the read file handler to the provided file path. 57 """ 58 if self._file_handler is not None: 59 return self._file_handler 60 61 if not self.file_path.exists(): 62 self.file_path.parent.mkdir(parents=True, exist_ok=True) 63 os.mkfifo(self.file_path.as_posix(), mode=0o600) 64 65 self._fd = os.open(self.file_path, os.O_RDONLY | os.O_NONBLOCK) 66 self._file_handler = os.fdopen(self._fd, 'rb', buffering=0) 67 self.sel.register(self._file_handler, selectors.EVENT_READ) 68 return self._file_handler
Return the read file handler to the provided file path.
def write(self, data):
    """
    Write `data` (`str` or `bytes`) into the FIFO and flush immediately.

    The FIFO and its parent directory are created on first use if missing.

    Returns
    -------
    The number of characters (for `str`) or bytes (for `bytes`) written,
    or 0 if the pipe was broken and the data was dropped.
    """
    if self._write_fp is None:
        self.file_path.parent.mkdir(parents=True, exist_ok=True)
        try:
            # EAFP instead of `exists()` + `mkfifo()`: avoids failing when
            # another process creates the FIFO between the two calls.
            os.mkfifo(self.file_path.as_posix(), mode=0o600)
        except FileExistsError:
            pass
        self._write_fp = open(self.file_path, 'wb')

    payload = data.encode('utf-8') if isinstance(data, str) else data
    try:
        self._write_fp.write(payload)
        # Flush right away so the data reaches the pipe immediately.
        self._write_fp.flush()
    except BrokenPipeError:
        # Deliberate best-effort: a broken pipe drops the input.
        return 0
    return len(data)
Write string s to stream.
Return the number of characters written (which is always equal to the length of the string).
Return underlying file descriptor if one exists.
Raise OSError if the IO object does not use a file descriptor.
def read(self, size=-1):
    """
    Read from the FIFO pipe, polling until data is available.

    Read errors (`OSError`, `EOFError`) and empty reads are retried after
    sleeping `refresh_seconds`. While waiting, the blocking marker file is
    kept on disk; it is removed as soon as data arrives.

    Returns `str` when `self.decode` is true, otherwise raw `bytes`.
    """
    import codecs

    _ = self.file_handler
    while True:
        try:
            data = self._file_handler.read(size)
        except (OSError, EOFError):
            data = None

        if data:
            try:
                if self.blocking_file_path.exists():
                    self.blocking_file_path.unlink()
            except Exception:
                warn(traceback.format_exc())

            if not self.decode:
                return data

            # Decode incrementally: a multi-byte UTF-8 character split across
            # reads (always the case for `read(1)`) is buffered here instead
            # of raising UnicodeDecodeError.
            decoder = getattr(self, '_decoder', None)
            if decoder is None:
                decoder = codecs.getincrementaldecoder('utf-8')()
                self._decoder = decoder
            text = decoder.decode(data)
            if text:
                return text
            # Only a partial character so far: fetch the rest right away.
            continue

        if not self.blocking_file_path.exists():
            self.blocking_file_path.touch()
        time.sleep(self.refresh_seconds)
Read from the FIFO pipe, polling until data arrives (read errors and EOF are retried rather than raised).
def readline(self, size=-1):
    """
    Read one line from the FIFO, one unit at a time via `read(1)`.

    The terminating newline is consumed but NOT included in the result.
    If `size` is non-negative, at most `size` units are returned and reading
    stops early; a negative `size` or `None` means no limit.

    Returns `str` when `self.decode` is true, otherwise `bytes`.
    """
    newline = '\n' if self.decode else b'\n'
    limit = -1 if size is None else size
    # Collect pieces and join once rather than growing the line with `+=`.
    pieces = []
    while limit < 0 or len(pieces) < limit:
        unit = self.read(1)
        if not unit or unit == newline:
            break
        pieces.append(unit)

    return ('' if self.decode else b'').join(pieces)
Read until newline or EOF.
Return an empty string if EOF is hit immediately. If size is specified, at most size characters will be read.
121 def close(self): 122 if self._file_handler is not None: 123 self.sel.unregister(self._file_handler) 124 self._file_handler.close() 125 try: 126 os.close(self._fd) 127 except OSError: 128 pass 129 self._file_handler = None 130 self._fd = None 131 132 if self._write_fp is not None: 133 try: 134 self._write_fp.close() 135 except BrokenPipeError: 136 pass 137 self._write_fp = None 138 139 try: 140 if self.blocking_file_path.exists(): 141 self.blocking_file_path.unlink() 142 except Exception: 143 pass 144 super().close()
Flush and close the IO object.
This method has no effect if the file is already closed.
Return whether this is an 'interactive' stream.
Return False if it can't be determined.
152 @property 153 def refresh_seconds(self) -> Union[int, float]: 154 """ 155 How many seconds between checking for blocking functions. 156 """ 157 if not self._refresh_seconds: 158 self._refresh_seconds = mrsm.get_config('system', 'cli', 'refresh_seconds') 159 return self._refresh_seconds
How many seconds between checking for blocking functions.