meerschaum.utils.daemon.StdinFile

Create a file manager to pass STDIN to the Daemon.

  1#! /usr/bin/env python3
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Create a file manager to pass STDIN to the Daemon.
  7"""
  8
  9import io
 10import pathlib
 11import time
 12import os
 13import selectors
 14import traceback
 15
 16import meerschaum as mrsm
 17from meerschaum.utils.typing import Optional, Union
 18from meerschaum.utils.warnings import warn
 19
 20
 21class StdinFile(io.TextIOBase):
 22    """
 23    Redirect user input into a Daemon's context.
 24    """
 25    def __init__(
 26        self,
 27        file_path: Union[pathlib.Path, str],
 28        lock_file_path: Optional[pathlib.Path] = None,
 29        decode: bool = True,
 30        refresh_seconds: Union[int, float, None] = None,
 31    ):
 32        if isinstance(file_path, str):
 33            file_path = pathlib.Path(file_path)
 34
 35        self.file_path = file_path
 36        self.blocking_file_path = (
 37            lock_file_path
 38            if lock_file_path is not None
 39            else (file_path.parent / (file_path.name + '.block'))
 40        )
 41        self._file_handler = None
 42        self._fd = None
 43        self.sel = selectors.DefaultSelector()
 44        self.decode = decode
 45        self._write_fp = None
 46        self._refresh_seconds = refresh_seconds
 47
 48    @property
 49    def encoding(self):
 50        return 'utf-8'
 51
 52    @property
 53    def file_handler(self):
 54        """
 55        Return the read file handler to the provided file path.
 56        """
 57        if self._file_handler is not None:
 58            return self._file_handler
 59
 60        if not self.file_path.exists():
 61            self.file_path.parent.mkdir(parents=True, exist_ok=True)
 62            os.mkfifo(self.file_path.as_posix(), mode=0o600)
 63
 64        self._fd = os.open(self.file_path, os.O_RDONLY | os.O_NONBLOCK)
 65        self._file_handler = os.fdopen(self._fd, 'rb', buffering=0)
 66        self.sel.register(self._file_handler, selectors.EVENT_READ)
 67        return self._file_handler
 68
 69    def write(self, data):
 70        if self._write_fp is None:
 71            self.file_path.parent.mkdir(parents=True, exist_ok=True)
 72            if not self.file_path.exists():
 73                os.mkfifo(self.file_path.as_posix(), mode=0o600)
 74            self._write_fp = open(self.file_path, 'wb')
 75
 76        if isinstance(data, str):
 77            data = data.encode('utf-8')
 78        try:
 79            self._write_fp.write(data)
 80            self._write_fp.flush()
 81        except BrokenPipeError:
 82            pass
 83
 84    def fileno(self):
 85        fileno = self.file_handler.fileno()
 86        return fileno
 87
 88    def read(self, size=-1):
 89        """
 90        Read from the FIFO pipe, blocking on EOFError.
 91        """
 92        _ = self.file_handler
 93        while True:
 94            try:
 95                data = self._file_handler.read(size)
 96                if data:
 97                    try:
 98                        if self.blocking_file_path.exists():
 99                            self.blocking_file_path.unlink()
100                    except Exception:
101                        warn(traceback.format_exc())
102                    return data.decode('utf-8') if self.decode else data
103            except (OSError, EOFError):
104                pass
105
106            if not self.blocking_file_path.exists():
107                self.blocking_file_path.touch()
108            time.sleep(self.refresh_seconds)
109
110    def readline(self, size=-1):
111        line = '' if self.decode else b''
112        while True:
113            data = self.read(1)
114            if not data or ((data == '\n') if self.decode else (data == b'\n')):
115                break
116            line += data
117
118        return line
119
120    def close(self):
121        if self._file_handler is not None:
122            self.sel.unregister(self._file_handler)
123            self._file_handler.close()
124            try:
125                os.close(self._fd)
126            except OSError:
127                pass
128            self._file_handler = None
129            self._fd = None
130
131        if self._write_fp is not None:
132            try:
133                self._write_fp.close()
134            except BrokenPipeError:
135                pass
136            self._write_fp = None
137
138        try:
139            if self.blocking_file_path.exists():
140                self.blocking_file_path.unlink()
141        except Exception:
142            pass
143        super().close()
144
145    def is_open(self):
146        return self._file_handler is not None
147
148    def isatty(self) -> bool:
149        return False
150
151    @property
152    def refresh_seconds(self) -> Union[int, float]:
153        """
154        How many seconds between checking for blocking functions.
155        """
156        if not self._refresh_seconds:
157            self._refresh_seconds = mrsm.get_config('system', 'cli', 'refresh_seconds')
158        return self._refresh_seconds
159
160    def __str__(self) -> str:
161        return f"StdinFile('{self.file_path}')"
162
163    def __repr__(self) -> str:
164        return str(self)
class StdinFile(io.TextIOBase):
 22class StdinFile(io.TextIOBase):
 23    """
 24    Redirect user input into a Daemon's context.
 25    """
 26    def __init__(
 27        self,
 28        file_path: Union[pathlib.Path, str],
 29        lock_file_path: Optional[pathlib.Path] = None,
 30        decode: bool = True,
 31        refresh_seconds: Union[int, float, None] = None,
 32    ):
 33        if isinstance(file_path, str):
 34            file_path = pathlib.Path(file_path)
 35
 36        self.file_path = file_path
 37        self.blocking_file_path = (
 38            lock_file_path
 39            if lock_file_path is not None
 40            else (file_path.parent / (file_path.name + '.block'))
 41        )
 42        self._file_handler = None
 43        self._fd = None
 44        self.sel = selectors.DefaultSelector()
 45        self.decode = decode
 46        self._write_fp = None
 47        self._refresh_seconds = refresh_seconds
 48
 49    @property
 50    def encoding(self):
 51        return 'utf-8'
 52
 53    @property
 54    def file_handler(self):
 55        """
 56        Return the read file handler to the provided file path.
 57        """
 58        if self._file_handler is not None:
 59            return self._file_handler
 60
 61        if not self.file_path.exists():
 62            self.file_path.parent.mkdir(parents=True, exist_ok=True)
 63            os.mkfifo(self.file_path.as_posix(), mode=0o600)
 64
 65        self._fd = os.open(self.file_path, os.O_RDONLY | os.O_NONBLOCK)
 66        self._file_handler = os.fdopen(self._fd, 'rb', buffering=0)
 67        self.sel.register(self._file_handler, selectors.EVENT_READ)
 68        return self._file_handler
 69
 70    def write(self, data):
 71        if self._write_fp is None:
 72            self.file_path.parent.mkdir(parents=True, exist_ok=True)
 73            if not self.file_path.exists():
 74                os.mkfifo(self.file_path.as_posix(), mode=0o600)
 75            self._write_fp = open(self.file_path, 'wb')
 76
 77        if isinstance(data, str):
 78            data = data.encode('utf-8')
 79        try:
 80            self._write_fp.write(data)
 81            self._write_fp.flush()
 82        except BrokenPipeError:
 83            pass
 84
 85    def fileno(self):
 86        fileno = self.file_handler.fileno()
 87        return fileno
 88
 89    def read(self, size=-1):
 90        """
 91        Read from the FIFO pipe, blocking on EOFError.
 92        """
 93        _ = self.file_handler
 94        while True:
 95            try:
 96                data = self._file_handler.read(size)
 97                if data:
 98                    try:
 99                        if self.blocking_file_path.exists():
100                            self.blocking_file_path.unlink()
101                    except Exception:
102                        warn(traceback.format_exc())
103                    return data.decode('utf-8') if self.decode else data
104            except (OSError, EOFError):
105                pass
106
107            if not self.blocking_file_path.exists():
108                self.blocking_file_path.touch()
109            time.sleep(self.refresh_seconds)
110
111    def readline(self, size=-1):
112        line = '' if self.decode else b''
113        while True:
114            data = self.read(1)
115            if not data or ((data == '\n') if self.decode else (data == b'\n')):
116                break
117            line += data
118
119        return line
120
121    def close(self):
122        if self._file_handler is not None:
123            self.sel.unregister(self._file_handler)
124            self._file_handler.close()
125            try:
126                os.close(self._fd)
127            except OSError:
128                pass
129            self._file_handler = None
130            self._fd = None
131
132        if self._write_fp is not None:
133            try:
134                self._write_fp.close()
135            except BrokenPipeError:
136                pass
137            self._write_fp = None
138
139        try:
140            if self.blocking_file_path.exists():
141                self.blocking_file_path.unlink()
142        except Exception:
143            pass
144        super().close()
145
146    def is_open(self):
147        return self._file_handler is not None
148
149    def isatty(self) -> bool:
150        return False
151
152    @property
153    def refresh_seconds(self) -> Union[int, float]:
154        """
155        How many seconds between checking for blocking functions.
156        """
157        if not self._refresh_seconds:
158            self._refresh_seconds = mrsm.get_config('system', 'cli', 'refresh_seconds')
159        return self._refresh_seconds
160
161    def __str__(self) -> str:
162        return f"StdinFile('{self.file_path}')"
163
164    def __repr__(self) -> str:
165        return str(self)

Redirect user input into a Daemon's context.

StdinFile( file_path: Union[pathlib.Path, str], lock_file_path: Optional[pathlib.Path] = None, decode: bool = True, refresh_seconds: Union[int, float, NoneType] = None)
26    def __init__(
27        self,
28        file_path: Union[pathlib.Path, str],
29        lock_file_path: Optional[pathlib.Path] = None,
30        decode: bool = True,
31        refresh_seconds: Union[int, float, None] = None,
32    ):
33        if isinstance(file_path, str):
34            file_path = pathlib.Path(file_path)
35
36        self.file_path = file_path
37        self.blocking_file_path = (
38            lock_file_path
39            if lock_file_path is not None
40            else (file_path.parent / (file_path.name + '.block'))
41        )
42        self._file_handler = None
43        self._fd = None
44        self.sel = selectors.DefaultSelector()
45        self.decode = decode
46        self._write_fp = None
47        self._refresh_seconds = refresh_seconds
file_path
blocking_file_path
sel
decode
encoding
49    @property
50    def encoding(self):
51        return 'utf-8'

Encoding of the text stream.

Subclasses should override.

file_handler
53    @property
54    def file_handler(self):
55        """
56        Return the read file handler to the provided file path.
57        """
58        if self._file_handler is not None:
59            return self._file_handler
60
61        if not self.file_path.exists():
62            self.file_path.parent.mkdir(parents=True, exist_ok=True)
63            os.mkfifo(self.file_path.as_posix(), mode=0o600)
64
65        self._fd = os.open(self.file_path, os.O_RDONLY | os.O_NONBLOCK)
66        self._file_handler = os.fdopen(self._fd, 'rb', buffering=0)
67        self.sel.register(self._file_handler, selectors.EVENT_READ)
68        return self._file_handler

Return the read file handler to the provided file path.

def write(self, data):
70    def write(self, data):
71        if self._write_fp is None:
72            self.file_path.parent.mkdir(parents=True, exist_ok=True)
73            if not self.file_path.exists():
74                os.mkfifo(self.file_path.as_posix(), mode=0o600)
75            self._write_fp = open(self.file_path, 'wb')
76
77        if isinstance(data, str):
78            data = data.encode('utf-8')
79        try:
80            self._write_fp.write(data)
81            self._write_fp.flush()
82        except BrokenPipeError:
83            pass

Write string s to stream.

Return the number of characters written (which is always equal to the length of the string).

def fileno(self):
85    def fileno(self):
86        fileno = self.file_handler.fileno()
87        return fileno

Return underlying file descriptor if one exists.

Raise OSError if the IO object does not use a file descriptor.

def read(self, size=-1):
 89    def read(self, size=-1):
 90        """
 91        Read from the FIFO pipe, blocking on EOFError.
 92        """
 93        _ = self.file_handler
 94        while True:
 95            try:
 96                data = self._file_handler.read(size)
 97                if data:
 98                    try:
 99                        if self.blocking_file_path.exists():
100                            self.blocking_file_path.unlink()
101                    except Exception:
102                        warn(traceback.format_exc())
103                    return data.decode('utf-8') if self.decode else data
104            except (OSError, EOFError):
105                pass
106
107            if not self.blocking_file_path.exists():
108                self.blocking_file_path.touch()
109            time.sleep(self.refresh_seconds)

Read from the FIFO pipe, polling (and sleeping between attempts) until data is available.

def readline(self, size=-1):
111    def readline(self, size=-1):
112        line = '' if self.decode else b''
113        while True:
114            data = self.read(1)
115            if not data or ((data == '\n') if self.decode else (data == b'\n')):
116                break
117            line += data
118
119        return line

Read until newline or EOF.

Return an empty string if EOF is hit immediately. If size is specified, at most size characters will be read.

def close(self):
121    def close(self):
122        if self._file_handler is not None:
123            self.sel.unregister(self._file_handler)
124            self._file_handler.close()
125            try:
126                os.close(self._fd)
127            except OSError:
128                pass
129            self._file_handler = None
130            self._fd = None
131
132        if self._write_fp is not None:
133            try:
134                self._write_fp.close()
135            except BrokenPipeError:
136                pass
137            self._write_fp = None
138
139        try:
140            if self.blocking_file_path.exists():
141                self.blocking_file_path.unlink()
142        except Exception:
143            pass
144        super().close()

Flush and close the IO object.

This method has no effect if the file is already closed.

def is_open(self):
146    def is_open(self):
147        return self._file_handler is not None
def isatty(self) -> bool:
149    def isatty(self) -> bool:
150        return False

Return whether this is an 'interactive' stream.

Return False if it can't be determined.

refresh_seconds: Union[int, float]
152    @property
153    def refresh_seconds(self) -> Union[int, float]:
154        """
155        How many seconds between checking for blocking functions.
156        """
157        if not self._refresh_seconds:
158            self._refresh_seconds = mrsm.get_config('system', 'cli', 'refresh_seconds')
159        return self._refresh_seconds

How many seconds between checking for blocking functions.