meerschaum.utils.daemon.StdinFile
Create a file manager to pass STDIN to the Daemon.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Create a file manager to pass STDIN to the Daemon.
"""

import codecs
import io
import pathlib
import time
import os
import selectors
import traceback

from meerschaum.utils.typing import Optional, Union
from meerschaum.utils.warnings import warn


class StdinFile(io.TextIOBase):
    """
    Redirect user input into a Daemon's context.

    Input travels over a FIFO created at `file_path`. While `read()` is waiting
    for input it touches `blocking_file_path`; the marker is removed again once
    text has been read.
    """
    def __init__(
        self,
        file_path: Union[pathlib.Path, str],
        lock_file_path: Optional[Union[pathlib.Path, str]] = None,
    ):
        """
        Store the paths; the FIFO itself is only created on first use.

        `file_path` is where the FIFO is created. `lock_file_path` is the marker
        file touched while `read()` waits; it defaults to `<file_path>.block`
        in the same directory. Both may be given as strings.
        """
        super().__init__()
        if not isinstance(file_path, pathlib.Path):
            file_path = pathlib.Path(file_path)

        # A plain string would break the later `.exists()` / `.touch()` calls.
        if lock_file_path is not None and not isinstance(lock_file_path, pathlib.Path):
            lock_file_path = pathlib.Path(lock_file_path)

        self.file_path = file_path
        self.blocking_file_path = (
            lock_file_path
            if lock_file_path is not None
            else (file_path.parent / (file_path.name + '.block'))
        )
        self._file_handler = None
        self._fd = None
        self._decoder = None
        self.sel = selectors.DefaultSelector()

    @property
    def file_handler(self):
        """
        Return the read file handler to the provided file path.

        On first access (and again after `close()`), whatever exists at
        `file_path` is replaced by a fresh FIFO, which is opened read-only in
        non-blocking mode and registered with `self.sel`. Later accesses return
        the cached handler.
        """
        if self._file_handler is not None:
            return self._file_handler

        # EAFP instead of `exists()`: no check-then-act race, and dangling
        # symlinks (which `exists()` reports as missing) are removed as well.
        try:
            self.file_path.unlink()
        except FileNotFoundError:
            pass

        self.file_path.parent.mkdir(parents=True, exist_ok=True)
        os.mkfifo(self.file_path.as_posix(), mode=0o600)

        fd = os.open(self.file_path, os.O_RDONLY | os.O_NONBLOCK)
        try:
            handler = os.fdopen(fd, 'rb', buffering=0)
        except Exception:
            # Nothing owns the descriptor yet, so release it ourselves.
            os.close(fd)
            raise

        self.sel.register(handler, selectors.EVENT_READ)
        self._fd = fd
        self._file_handler = handler
        return handler

    def write(self, data):
        """
        Write `data` to `file_path` and return how much was written.

        Strings are encoded as UTF-8. The return value is the number of
        characters for a string, or the number of bytes for bytes-like input.
        """
        payload = data.encode('utf-8') if isinstance(data, str) else data

        with open(self.file_path, 'wb') as f:
            f.write(payload)

        return len(data)

    def fileno(self):
        """
        Return the file descriptor of the read handler (opening it if needed).
        """
        return self.file_handler.fileno()

    def read(self, size=-1):
        """
        Read from the FIFO pipe, blocking until data is available.

        `size` is passed through to the raw handler's `read()`. While nothing
        can be read, `blocking_file_path` is touched and the pipe is polled
        again after 0.1 seconds. Returns the decoded UTF-8 text, which is
        never empty.
        """
        handler = self.file_handler
        decoder = getattr(self, '_decoder', None)
        if decoder is None:
            # Incremental decoding keeps multi-byte characters intact even
            # when their bytes arrive in separate reads (e.g. `read(1)`).
            decoder = self._decoder = codecs.getincrementaldecoder('utf-8')()

        while True:
            try:
                data = handler.read(size)
            except (OSError, EOFError):
                data = None

            if data:
                text = decoder.decode(data)
                if not text:
                    # Only part of a character has arrived; fetch the rest.
                    continue

                try:
                    self.blocking_file_path.unlink()
                except FileNotFoundError:
                    pass
                except Exception:
                    warn(traceback.format_exc())
                return text

            self.blocking_file_path.touch()
            time.sleep(0.1)

    def readline(self, size=-1):
        """
        Read characters until a newline and return them without the newline.

        The newline itself is consumed. If `size` is non-negative, at most
        `size` characters are read.
        """
        limit = None if size is None or size < 0 else size
        chars = []
        while limit is None or len(chars) < limit:
            char = self.read(1)
            if not char or char == '\n':
                break
            chars.append(char)

        return ''.join(chars)

    def close(self):
        """
        Unregister and close the read handler, then mark the stream closed.

        Safe to call more than once.
        """
        handler = self._file_handler
        if handler is not None:
            # Reset state first so a failure below cannot leave a stale handler.
            self._file_handler = None
            self._fd = None
            self._decoder = None
            try:
                self.sel.unregister(handler)
            except (KeyError, ValueError):
                # The handler was not (or is no longer) registered.
                pass
            # Closing the handler also closes its descriptor; a second
            # `os.close()` on the same number would raise EBADF.
            handler.close()

        super().close()

    def is_open(self):
        """
        Return whether the read handler is currently open.
        """
        return self._file_handler is not None

    def __str__(self) -> str:
        return f"StdinFile('{self.file_path}')"

    def __repr__(self) -> str:
        return str(self)
class
StdinFile(io.TextIOBase):
class StdinFile(io.TextIOBase):
    """
    Redirect user input into a Daemon's context.

    Input travels over a FIFO created at `file_path`. While `read()` is waiting
    for input it touches `blocking_file_path`; the marker is removed again once
    text has been read.
    """
    def __init__(
        self,
        file_path: Union[pathlib.Path, str],
        lock_file_path: Optional[Union[pathlib.Path, str]] = None,
    ):
        """
        Store the paths; the FIFO itself is only created on first use.

        `file_path` is where the FIFO is created. `lock_file_path` is the marker
        file touched while `read()` waits; it defaults to `<file_path>.block`
        in the same directory. Both may be given as strings.
        """
        super().__init__()
        if not isinstance(file_path, pathlib.Path):
            file_path = pathlib.Path(file_path)

        # A plain string would break the later `.exists()` / `.touch()` calls.
        if lock_file_path is not None and not isinstance(lock_file_path, pathlib.Path):
            lock_file_path = pathlib.Path(lock_file_path)

        self.file_path = file_path
        self.blocking_file_path = (
            lock_file_path
            if lock_file_path is not None
            else (file_path.parent / (file_path.name + '.block'))
        )
        self._file_handler = None
        self._fd = None
        self._decoder = None
        self.sel = selectors.DefaultSelector()

    @property
    def file_handler(self):
        """
        Return the read file handler to the provided file path.

        On first access (and again after `close()`), whatever exists at
        `file_path` is replaced by a fresh FIFO, which is opened read-only in
        non-blocking mode and registered with `self.sel`. Later accesses return
        the cached handler.
        """
        if self._file_handler is not None:
            return self._file_handler

        # EAFP instead of `exists()`: no check-then-act race, and dangling
        # symlinks (which `exists()` reports as missing) are removed as well.
        try:
            self.file_path.unlink()
        except FileNotFoundError:
            pass

        self.file_path.parent.mkdir(parents=True, exist_ok=True)
        os.mkfifo(self.file_path.as_posix(), mode=0o600)

        fd = os.open(self.file_path, os.O_RDONLY | os.O_NONBLOCK)
        try:
            handler = os.fdopen(fd, 'rb', buffering=0)
        except Exception:
            # Nothing owns the descriptor yet, so release it ourselves.
            os.close(fd)
            raise

        self.sel.register(handler, selectors.EVENT_READ)
        self._fd = fd
        self._file_handler = handler
        return handler

    def write(self, data):
        """
        Write `data` to `file_path` and return how much was written.

        Strings are encoded as UTF-8. The return value is the number of
        characters for a string, or the number of bytes for bytes-like input.
        """
        payload = data.encode('utf-8') if isinstance(data, str) else data

        with open(self.file_path, 'wb') as f:
            f.write(payload)

        return len(data)

    def fileno(self):
        """
        Return the file descriptor of the read handler (opening it if needed).
        """
        return self.file_handler.fileno()

    def read(self, size=-1):
        """
        Read from the FIFO pipe, blocking until data is available.

        `size` is passed through to the raw handler's `read()`. While nothing
        can be read, `blocking_file_path` is touched and the pipe is polled
        again after 0.1 seconds. Returns the decoded UTF-8 text, which is
        never empty.
        """
        # Local import: `codecs` is not among the module's existing imports.
        import codecs

        handler = self.file_handler
        decoder = getattr(self, '_decoder', None)
        if decoder is None:
            # Incremental decoding keeps multi-byte characters intact even
            # when their bytes arrive in separate reads (e.g. `read(1)`).
            decoder = self._decoder = codecs.getincrementaldecoder('utf-8')()

        while True:
            try:
                data = handler.read(size)
            except (OSError, EOFError):
                data = None

            if data:
                text = decoder.decode(data)
                if not text:
                    # Only part of a character has arrived; fetch the rest.
                    continue

                try:
                    self.blocking_file_path.unlink()
                except FileNotFoundError:
                    pass
                except Exception:
                    warn(traceback.format_exc())
                return text

            self.blocking_file_path.touch()
            time.sleep(0.1)

    def readline(self, size=-1):
        """
        Read characters until a newline and return them without the newline.

        The newline itself is consumed. If `size` is non-negative, at most
        `size` characters are read.
        """
        limit = None if size is None or size < 0 else size
        chars = []
        while limit is None or len(chars) < limit:
            char = self.read(1)
            if not char or char == '\n':
                break
            chars.append(char)

        return ''.join(chars)

    def close(self):
        """
        Unregister and close the read handler, then mark the stream closed.

        Safe to call more than once.
        """
        handler = self._file_handler
        if handler is not None:
            # Reset state first so a failure below cannot leave a stale handler.
            self._file_handler = None
            self._fd = None
            self._decoder = None
            try:
                self.sel.unregister(handler)
            except (KeyError, ValueError):
                # The handler was not (or is no longer) registered.
                pass
            # Closing the handler also closes its descriptor; a second
            # `os.close()` on the same number would raise EBADF.
            handler.close()

        super().close()

    def is_open(self):
        """
        Return whether the read handler is currently open.
        """
        return self._file_handler is not None

    def __str__(self) -> str:
        return f"StdinFile('{self.file_path}')"

    def __repr__(self) -> str:
        return str(self)
Redirect user input into a Daemon's context.
StdinFile( file_path: Union[pathlib.Path, str], lock_file_path: Optional[pathlib.Path] = None)
25 def __init__( 26 self, 27 file_path: Union[pathlib.Path, str], 28 lock_file_path: Optional[pathlib.Path] = None, 29 ): 30 if isinstance(file_path, str): 31 file_path = pathlib.Path(file_path) 32 33 self.file_path = file_path 34 self.blocking_file_path = ( 35 lock_file_path 36 if lock_file_path is not None 37 else (file_path.parent / (file_path.name + '.block')) 38 ) 39 self._file_handler = None 40 self._fd = None 41 self.sel = selectors.DefaultSelector()
file_handler
@property
def file_handler(self):
    """
    Return the read file handler to the provided file path.

    On first access (and again after the cached handler has been reset to
    `None`), whatever exists at `file_path` is replaced by a fresh FIFO, which
    is opened read-only in non-blocking mode and registered with `self.sel`.
    Later accesses return the cached handler.
    """
    if self._file_handler is not None:
        return self._file_handler

    # EAFP instead of `exists()`: no check-then-act race, and dangling
    # symlinks (which `exists()` reports as missing) are removed as well.
    try:
        self.file_path.unlink()
    except FileNotFoundError:
        pass

    self.file_path.parent.mkdir(parents=True, exist_ok=True)
    os.mkfifo(self.file_path.as_posix(), mode=0o600)

    fd = os.open(self.file_path, os.O_RDONLY | os.O_NONBLOCK)
    try:
        handler = os.fdopen(fd, 'rb', buffering=0)
    except Exception:
        # Nothing owns the descriptor yet, so release it ourselves.
        os.close(fd)
        raise

    self.sel.register(handler, selectors.EVENT_READ)
    self._fd = fd
    self._file_handler = handler
    return handler
Return the read file handler to the provided file path.
def
write(self, data):
def write(self, data):
    """
    Write `data` to `file_path` and return how much was written.

    Strings are encoded as UTF-8. The return value is the number of
    characters for a string, or the number of bytes for bytes-like input.
    """
    payload = data.encode('utf-8') if isinstance(data, str) else data

    with open(self.file_path, 'wb') as f:
        f.write(payload)

    return len(data)
Write string s to stream.
Return the number of characters written (which is always equal to the length of the string).
def
fileno(self):
Return underlying file descriptor if one exists.
Raise OSError if the IO object does not use a file descriptor.
def
read(self, size=-1):
def read(self, size=-1):
    """
    Read from the FIFO pipe, blocking until data is available.

    `size` is passed through to the raw handler's `read()`. While nothing
    can be read, `blocking_file_path` is touched and the pipe is polled
    again after 0.1 seconds. Returns the decoded UTF-8 text, which is
    never empty.
    """
    # Local import: `codecs` is not among the module's existing imports.
    import codecs

    handler = self.file_handler
    decoder = getattr(self, '_decoder', None)
    if decoder is None:
        # Incremental decoding keeps multi-byte characters intact even
        # when their bytes arrive in separate reads (e.g. `read(1)`).
        decoder = self._decoder = codecs.getincrementaldecoder('utf-8')()

    while True:
        try:
            data = handler.read(size)
        except (OSError, EOFError):
            data = None

        if data:
            text = decoder.decode(data)
            if not text:
                # Only part of a character has arrived; fetch the rest.
                continue

            try:
                self.blocking_file_path.unlink()
            except FileNotFoundError:
                pass
            except Exception:
                warn(traceback.format_exc())
            return text

        self.blocking_file_path.touch()
        time.sleep(0.1)
Read from the FIFO pipe, blocking (by polling) until data is available.
def
readline(self, size=-1):
def readline(self, size=-1):
    """
    Read characters until a newline and return them without the newline.

    The newline itself is consumed. If `size` is non-negative, at most
    `size` characters are read.
    """
    limit = None if size is None or size < 0 else size
    chars = []
    while limit is None or len(chars) < limit:
        char = self.read(1)
        if not char or char == '\n':
            break
        chars.append(char)

    return ''.join(chars)
Read until newline or EOF.
Return an empty string if EOF is hit immediately. If size is specified, at most size characters will be read.
def
close(self):
104 def close(self): 105 if self._file_handler is not None: 106 self.sel.unregister(self._file_handler) 107 self._file_handler.close() 108 os.close(self._fd) 109 self._file_handler = None 110 self._fd = None 111 112 super().close()
Flush and close the IO object.
This method has no effect if the file is already closed.