meerschaum.utils.daemon.StdinFile

Create a file manager to pass STDIN to the Daemon.

  1#! /usr/bin/env python3
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Create a file manager to pass STDIN to the Daemon.
  7"""
  8
  9import io
 10import pathlib
 11import time
 12import os
 13import selectors
 14import traceback
 15
 16from meerschaum.utils.typing import Optional, Union
 17from meerschaum.utils.warnings import warn
 18
 19
 20class StdinFile(io.TextIOBase):
 21    """
 22    Redirect user input into a Daemon's context.
 23    """
 24    def __init__(
 25        self,
 26        file_path: Union[pathlib.Path, str],
 27        lock_file_path: Optional[pathlib.Path] = None,
 28    ):
 29        if isinstance(file_path, str):
 30            file_path = pathlib.Path(file_path)
 31
 32        self.file_path = file_path
 33        self.blocking_file_path = (
 34            lock_file_path
 35            if lock_file_path is not None
 36            else (file_path.parent / (file_path.name + '.block'))
 37        )
 38        self._file_handler = None
 39        self._fd = None
 40        self.sel = selectors.DefaultSelector()
 41
 42    @property
 43    def file_handler(self):
 44        """
 45        Return the read file handler to the provided file path.
 46        """
 47        if self._file_handler is not None:
 48            return self._file_handler
 49
 50        if self.file_path.exists():
 51            self.file_path.unlink()
 52
 53        self.file_path.parent.mkdir(parents=True, exist_ok=True)
 54        os.mkfifo(self.file_path.as_posix(), mode=0o600)
 55
 56        self._fd = os.open(self.file_path, os.O_RDONLY | os.O_NONBLOCK)
 57        self._file_handler = os.fdopen(self._fd, 'rb', buffering=0)
 58        self.sel.register(self._file_handler, selectors.EVENT_READ)
 59        return self._file_handler
 60
 61    def write(self, data):
 62        if isinstance(data, str):
 63            data = data.encode('utf-8')
 64
 65        with open(self.file_path, 'wb') as f:
 66            f.write(data)
 67
 68    def fileno(self):
 69        fileno = self.file_handler.fileno()
 70        return fileno
 71
 72    def read(self, size=-1):
 73        """
 74        Read from the FIFO pipe, blocking on EOFError.
 75        """
 76        _ = self.file_handler
 77        while True:
 78            try:
 79                data = self._file_handler.read(size)
 80                if data:
 81                    try:
 82                        if self.blocking_file_path.exists():
 83                            self.blocking_file_path.unlink()
 84                    except Exception:
 85                        warn(traceback.format_exc())
 86                    return data.decode('utf-8')
 87            except (OSError, EOFError):
 88                pass
 89
 90            self.blocking_file_path.touch()
 91            time.sleep(0.1)
 92
 93    def readline(self, size=-1):
 94        line = ''
 95        while True:
 96            data = self.read(1)
 97            if not data or data == '\n':
 98                break
 99            line += data
100
101        return line
102
103    def close(self):
104        if self._file_handler is not None:
105            self.sel.unregister(self._file_handler)
106            self._file_handler.close()
107            os.close(self._fd)
108            self._file_handler = None
109            self._fd = None
110
111        super().close()
112
113    def is_open(self):
114        return self._file_handler is not None
115
116
117    def __str__(self) -> str:
118        return f"StdinFile('{self.file_path}')"
119
120    def __repr__(self) -> str:
121        return str(self)
class StdinFile(io.TextIOBase):
 21class StdinFile(io.TextIOBase):
 22    """
 23    Redirect user input into a Daemon's context.
 24    """
 25    def __init__(
 26        self,
 27        file_path: Union[pathlib.Path, str],
 28        lock_file_path: Optional[pathlib.Path] = None,
 29    ):
 30        if isinstance(file_path, str):
 31            file_path = pathlib.Path(file_path)
 32
 33        self.file_path = file_path
 34        self.blocking_file_path = (
 35            lock_file_path
 36            if lock_file_path is not None
 37            else (file_path.parent / (file_path.name + '.block'))
 38        )
 39        self._file_handler = None
 40        self._fd = None
 41        self.sel = selectors.DefaultSelector()
 42
 43    @property
 44    def file_handler(self):
 45        """
 46        Return the read file handler to the provided file path.
 47        """
 48        if self._file_handler is not None:
 49            return self._file_handler
 50
 51        if self.file_path.exists():
 52            self.file_path.unlink()
 53
 54        self.file_path.parent.mkdir(parents=True, exist_ok=True)
 55        os.mkfifo(self.file_path.as_posix(), mode=0o600)
 56
 57        self._fd = os.open(self.file_path, os.O_RDONLY | os.O_NONBLOCK)
 58        self._file_handler = os.fdopen(self._fd, 'rb', buffering=0)
 59        self.sel.register(self._file_handler, selectors.EVENT_READ)
 60        return self._file_handler
 61
 62    def write(self, data):
 63        if isinstance(data, str):
 64            data = data.encode('utf-8')
 65
 66        with open(self.file_path, 'wb') as f:
 67            f.write(data)
 68
 69    def fileno(self):
 70        fileno = self.file_handler.fileno()
 71        return fileno
 72
 73    def read(self, size=-1):
 74        """
 75        Read from the FIFO pipe, blocking on EOFError.
 76        """
 77        _ = self.file_handler
 78        while True:
 79            try:
 80                data = self._file_handler.read(size)
 81                if data:
 82                    try:
 83                        if self.blocking_file_path.exists():
 84                            self.blocking_file_path.unlink()
 85                    except Exception:
 86                        warn(traceback.format_exc())
 87                    return data.decode('utf-8')
 88            except (OSError, EOFError):
 89                pass
 90
 91            self.blocking_file_path.touch()
 92            time.sleep(0.1)
 93
 94    def readline(self, size=-1):
 95        line = ''
 96        while True:
 97            data = self.read(1)
 98            if not data or data == '\n':
 99                break
100            line += data
101
102        return line
103
104    def close(self):
105        if self._file_handler is not None:
106            self.sel.unregister(self._file_handler)
107            self._file_handler.close()
108            os.close(self._fd)
109            self._file_handler = None
110            self._fd = None
111
112        super().close()
113
114    def is_open(self):
115        return self._file_handler is not None
116
117
118    def __str__(self) -> str:
119        return f"StdinFile('{self.file_path}')"
120
121    def __repr__(self) -> str:
122        return str(self)

Redirect user input into a Daemon's context.

StdinFile( file_path: Union[pathlib.Path, str], lock_file_path: Optional[pathlib.Path] = None)
25    def __init__(
26        self,
27        file_path: Union[pathlib.Path, str],
28        lock_file_path: Optional[pathlib.Path] = None,
29    ):
30        if isinstance(file_path, str):
31            file_path = pathlib.Path(file_path)
32
33        self.file_path = file_path
34        self.blocking_file_path = (
35            lock_file_path
36            if lock_file_path is not None
37            else (file_path.parent / (file_path.name + '.block'))
38        )
39        self._file_handler = None
40        self._fd = None
41        self.sel = selectors.DefaultSelector()
file_path
blocking_file_path
sel
file_handler
43    @property
44    def file_handler(self):
45        """
46        Return the read file handler to the provided file path.
47        """
48        if self._file_handler is not None:
49            return self._file_handler
50
51        if self.file_path.exists():
52            self.file_path.unlink()
53
54        self.file_path.parent.mkdir(parents=True, exist_ok=True)
55        os.mkfifo(self.file_path.as_posix(), mode=0o600)
56
57        self._fd = os.open(self.file_path, os.O_RDONLY | os.O_NONBLOCK)
58        self._file_handler = os.fdopen(self._fd, 'rb', buffering=0)
59        self.sel.register(self._file_handler, selectors.EVENT_READ)
60        return self._file_handler

Return the read file handler to the provided file path.

def write(self, data):
62    def write(self, data):
63        if isinstance(data, str):
64            data = data.encode('utf-8')
65
66        with open(self.file_path, 'wb') as f:
67            f.write(data)

Write string s to stream.

Return the number of characters written (which is always equal to the length of the string).

def fileno(self):
69    def fileno(self):
70        fileno = self.file_handler.fileno()
71        return fileno

Return underlying file descriptor if one exists.

Raise OSError if the IO object does not use a file descriptor.

def read(self, size=-1):
73    def read(self, size=-1):
74        """
75        Read from the FIFO pipe, blocking on EOFError.
76        """
77        _ = self.file_handler
78        while True:
79            try:
80                data = self._file_handler.read(size)
81                if data:
82                    try:
83                        if self.blocking_file_path.exists():
84                            self.blocking_file_path.unlink()
85                    except Exception:
86                        warn(traceback.format_exc())
87                    return data.decode('utf-8')
88            except (OSError, EOFError):
89                pass
90
91            self.blocking_file_path.touch()
92            time.sleep(0.1)

Read from the FIFO pipe, blocking on EOFError.

def readline(self, size=-1):
 94    def readline(self, size=-1):
 95        line = ''
 96        while True:
 97            data = self.read(1)
 98            if not data or data == '\n':
 99                break
100            line += data
101
102        return line

Read until newline or EOF.

Return an empty string if EOF is hit immediately. If size is specified, at most size characters will be read.

def close(self):
104    def close(self):
105        if self._file_handler is not None:
106            self.sel.unregister(self._file_handler)
107            self._file_handler.close()
108            os.close(self._fd)
109            self._file_handler = None
110            self._fd = None
111
112        super().close()

Flush and close the IO object.

This method has no effect if the file is already closed.

def is_open(self):
114    def is_open(self):
115        return self._file_handler is not None