meerschaum.utils.daemon.RotatingFile

Create a file-like object that manages sub-files under the hood.
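
A `RotatingFile` can stand in for an ordinary file-like object. The snippet below is a minimal usage sketch; the base path `example.log`, the tiny `max_file_size`, and the `from meerschaum.utils.daemon import RotatingFile` import path are illustrative assumptions, not defaults:

    from meerschaum.utils.daemon import RotatingFile

    log = RotatingFile(
        'example.log',           # base path; subfiles become example.log.0, example.log.1, ...
        num_files_to_keep=2,     # keep only the two most recent subfiles
        max_file_size=1000,      # rotate once a subfile reaches ~1 KB (a threshold, not a hard limit)
    )
    log.write('hello, world\n')  # written into the latest subfile
    print(log.read())            # reads back across all existing subfiles
    log.close()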

  1#! /usr/bin/env python3
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Create a file-like object that manages sub-files under the hood.
  7"""
  8
  9import os
 10import io
 11import re
 12import pathlib
 13import traceback
 14import sys
 15import atexit
 16from datetime import datetime, timezone
 17from typing import List, Optional, Tuple, Callable
 18from meerschaum.config import get_config
 19from meerschaum.utils.warnings import warn
 20from meerschaum.utils.daemon.FileDescriptorInterceptor import FileDescriptorInterceptor
 21from meerschaum.utils.threading import Thread
 22import meerschaum as mrsm
 23import threading
 24daemon = mrsm.attempt_import('daemon')
 25
 26class RotatingFile(io.IOBase):
 27    """
 28    A `RotatingFile` may be treated like a normal file-like object.
 29    Under the hood, however, it will create new sub-files and delete old ones.
 30    """
 31
 32    SEEK_BACK_ATTEMPTS: int = 5
 33
 34    def __init__(
 35        self,
 36        file_path: pathlib.Path,
 37        num_files_to_keep: Optional[int] = None,
 38        max_file_size: Optional[int] = None,
 39        redirect_streams: bool = False,
 40        write_timestamps: bool = False,
 41        timestamp_format: Optional[str] = None,
 42        write_callback: Optional[Callable[[str], None]] = None,
 43    ):
 44        """
 45        Create a file-like object which manages other files.
 46
 47        Parameters
 48        ----------
 49        num_files_to_keep: int, default None
 50            How many sub-files to keep at any given time.
 51            Defaults to the configured value (5).
 52
 53        max_file_size: int, default None
 54            How large in bytes each sub-file can grow before another file is created.
 55            Note that this is not a hard limit but rather a threshold
 56            which may be slightly exceeded.
 57            Defaults to the configured value (100_000).
 58
 59        redirect_streams: bool, default False
 60            If `True`, redirect previous file streams when opening a new file descriptor.
 61            
 62            NOTE: Only set this to `True` if you are entering into a daemon context.
 63            Doing so will redirect `sys.stdout` and `sys.stderr` into the log files.
 64
 65        write_timestamps: bool, default False
 66            If `True`, prepend the current UTC timestamp to each line of the file.
 67
 68        timestamp_format: str, default None
 69            If `write_timestamps` is `True`, use this format for the timestamps.
 70            Defaults to `'%Y-%m-%d %H:%M'`.
 71
 72        write_callback: Optional[Callable[[str], None]], default None
 73            If provided, execute this callback with the data to be written.
 74        """
 75        self.file_path = pathlib.Path(file_path)
 76        if num_files_to_keep is None:
 77            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
 78        if max_file_size is None:
 79            max_file_size = get_config('jobs', 'logs', 'max_file_size')
 80        if timestamp_format is None:
 81            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
 82        if num_files_to_keep < 1:
 83            raise ValueError("At least 1 file must be kept.")
 84        if max_file_size < 100:
 85            raise ValueError("Subfiles must contain at least 100 bytes.")
 86
 87        self.num_files_to_keep = num_files_to_keep
 88        self.max_file_size = max_file_size
 89        self.redirect_streams = redirect_streams
 90        self.write_timestamps = write_timestamps
 91        self.timestamp_format = timestamp_format
 92        self.write_callback = write_callback
 93        self.subfile_regex_pattern = re.compile(r'(.*)\.log(?:\.\d+)?')
 94
 95        ### When subfiles are opened, map from their index to the file objects.
 96        self.subfile_objects = {}
 97        self._redirected_subfile_objects = {}
 98        self._current_file_obj = None
 99        self._previous_file_obj = None
100
101        ### When reading, keep track of the file index and position.
102        self._cursor: Tuple[int, int] = (0, 0)
103
104        ### Don't forget to close any stray files.
105        atexit.register(self.close)
106
107
108    def fileno(self):
109        """
110        Return the file descriptor for the latest subfile.
111        """
112        self.refresh_files(start_interception=False)
113        return self._current_file_obj.fileno()
114
115
116    def get_latest_subfile_path(self) -> pathlib.Path:
117        """
118        Return the path of the latest subfile to write into.
119        """
120        return self.get_subfile_path_from_index(
121            self.get_latest_subfile_index()
122        )
123
124
125    def get_remaining_subfile_size(self, subfile_index: int) -> int:
126        """
127        Return the remaining buffer size for a subfile.
128
129        Parameters
130        ----------
131        subfile_index: int
132            The index of the subfile to be checked.
133
134        Returns
135        -------
136        The remaining size in bytes.
137        """
138        subfile_path = self.get_subfile_path_from_index(subfile_index)
139        if not subfile_path.exists():
140            return self.max_file_size
141
142        return self.max_file_size - os.path.getsize(subfile_path)
143
144
145    def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool:
146        """
147        Return whether a given subfile is too large.
148
149        Parameters
150        ----------
151        subfile_index: int
152            The index of the subfile to be checked.
153
154        potential_new_len: int, default 0
155            The length of a potential write of new data.
156
157        Returns
158        -------
159        A bool indicating the subfile is or will be too large.
160        """
161        subfile_path = self.get_subfile_path_from_index(subfile_index)
162        if not subfile_path.exists():
163            return False
164
165        self.flush()
166
167        return (
168            (os.path.getsize(subfile_path) + potential_new_len)
169            >=
170            self.max_file_size
171        )
172
173
174    def get_latest_subfile_index(self) -> int:
175        """
176        Return the latest existing subfile index.
177        If no subfiles exist, return 0; if the latest file name cannot be parsed, return -1.
178        """
179        existing_subfile_paths = self.get_existing_subfile_paths()
180        latest_index = (
181            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
182            if existing_subfile_paths
183            else 0
184        )
185        return latest_index
186
187
188    def get_index_from_subfile_name(self, subfile_name: str) -> int:
189        """
190        Return the index from a given subfile name.
191        If the file name cannot be parsed, return -1.
192        """
193        try:
194            return int(subfile_name.replace(self.file_path.name + '.', ''))
195        except Exception:
196            return -1
197
198
199    def get_subfile_name_from_index(self, subfile_index: int) -> str:
200        """
201        Return the subfile name from the given index.
202        """
203        return f'{self.file_path.name}.{subfile_index}'
204
205
206    def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path:
207        """
208        Return the subfile's path from its index.
209        """
210        return self.file_path.parent / self.get_subfile_name_from_index(subfile_index)
211
212
213    def get_existing_subfile_indices(self) -> List[int]:
214        """
215        Return a list of subfile indices which exist on disk.
216        """
217        existing_subfile_paths = self.get_existing_subfile_paths()
218        return [self.get_index_from_subfile_name(path.name) for path in existing_subfile_paths]
219
220
221    def get_existing_subfile_paths(self) -> List[pathlib.Path]:
222        """
223        Return a list of file paths that match the input filename pattern.
224        """
225        if not self.file_path.parent.exists():
226            return []
227
228        subfile_names_indices = sorted(
229            [
230                (file_name, self.get_index_from_subfile_name(file_name))
231                for file_name in os.listdir(self.file_path.parent)
232                if (
233                    file_name.startswith(self.file_path.name)
234                    and re.match(self.subfile_regex_pattern, file_name)
235                )
236            ],
237            key=lambda x: x[1],
238        )
239        return [
240            (self.file_path.parent / file_name)
241            for file_name, _ in subfile_names_indices
242        ]
243
244
245    def refresh_files(
246        self,
247        potential_new_len: int = 0,
248        start_interception: bool = False,
249    ) -> '_io.TextIOWrapper':
250        """
251        Check the state of the subfiles.
252        If the latest subfile is too large, create a new file and delete old ones.
253
254        Parameters
255        ----------
256        potential_new_len: int, default 0
257
258        start_interception: bool, default False
259            If `True`, kick off the file interception threads.
260        """
261        self.flush()
262
263        latest_subfile_index = self.get_latest_subfile_index()
264        latest_subfile_path = self.get_subfile_path_from_index(latest_subfile_index)
265
266        ### First run with existing log files: open the most recent log file.
267        is_first_run_with_logs = ((latest_subfile_index > -1) and self._current_file_obj is None)
268
269        ### Sometimes a new file is created but output doesn't switch over.
270        lost_latest_handle = (
271            self._current_file_obj is not None
272            and
273            self.get_index_from_subfile_name(self._current_file_obj.name) == -1
274        )
275        if is_first_run_with_logs or lost_latest_handle:
276            self._current_file_obj = open(latest_subfile_path, 'a+', encoding='utf-8')
277            if self.redirect_streams:
278                try:
279                    daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
280                    daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
281                except OSError:
282                    warn(
283                        f"Encountered an issue when redirecting streams:\n{traceback.format_exc()}"
284                    )
285                if start_interception and self.write_timestamps:
286                    self.start_log_fd_interception()
287
288        create_new_file = (
289            (latest_subfile_index == -1)
290            or
291            self._current_file_obj is None
292            or
293            self.is_subfile_too_large(latest_subfile_index, potential_new_len)
294        )
295        if create_new_file:
296            self.increment_subfiles()
297
298        return self._current_file_obj
299
300    def increment_subfiles(self, increment_by: int = 1):
301        """
302        Create a new subfile and switch the file pointer over.
303        """
304        latest_subfile_index = self.get_latest_subfile_index()
305        old_subfile_index = latest_subfile_index
306        new_subfile_index = old_subfile_index + increment_by
307        new_file_path = self.get_subfile_path_from_index(new_subfile_index)
308        self._previous_file_obj = self._current_file_obj
309        self._current_file_obj = open(new_file_path, 'a+', encoding='utf-8')
310        self.subfile_objects[new_subfile_index] = self._current_file_obj
311        self.flush()
312
313        if self.redirect_streams:
314            if self._previous_file_obj is not None:
315                self._redirected_subfile_objects[old_subfile_index] = self._previous_file_obj
316                daemon.daemon.redirect_stream(self._previous_file_obj, self._current_file_obj)
317            daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
318            daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
319        self.close(unused_only=True)
320
321        ### Sanity check in case writing somehow fails.
322        if self._previous_file_obj is self._current_file_obj:
323            self._previous_file_obj = None
324
325        self.delete(unused_only=True)
326
327    def close(self, unused_only: bool = False) -> None:
328        """
329        Close any open file descriptors.
330
331        Parameters
332        ----------
333        unused_only: bool, default False
334            If `True`, only close file descriptors not currently in use.
335        """
336        self.stop_log_fd_interception(unused_only=unused_only)
337        subfile_indices = sorted(self.subfile_objects.keys())
338        for subfile_index in subfile_indices:
339            subfile_object = self.subfile_objects[subfile_index]
340            if unused_only and subfile_object in (self._previous_file_obj, self._current_file_obj):
341                continue
342            try:
343                if not subfile_object.closed:
344                    subfile_object.close()
345            except Exception:
346                warn(f"Failed to close an open subfile:\n{traceback.format_exc()}")
347
348            _ = self.subfile_objects.pop(subfile_index, None)
349            if self.redirect_streams:
350                _ = self._redirected_subfile_objects.pop(subfile_index, None)
351
352        if not unused_only:
353            self._previous_file_obj = None
354            self._current_file_obj = None
355
356
357    def get_timestamp_prefix_str(self) -> str:
358        """
359        Return the current timestamp prefix string (UTC).
360        """
361        return datetime.now(timezone.utc).strftime(self.timestamp_format) + ' | '
362
363
364    def write(self, data: str) -> None:
365        """
366        Write the given text into the latest subfile.
367        If the subfile will be too large, create a new subfile.
368        If too many subfiles exist at once, the oldest one will be deleted.
369
370        NOTE: This will not split data across multiple files.
371        As such, if data is larger than max_file_size, then the corresponding subfile
372        may exceed this limit.
373        """
374        try:
375            if callable(self.write_callback):
376                self.write_callback(data)
377        except Exception:
378            warn(f"Failed to execute write callback:\n{traceback.format_exc()}")
379
380        try:
381            self.file_path.parent.mkdir(exist_ok=True, parents=True)
382            if isinstance(data, bytes):
383                data = data.decode('utf-8')
384
385            prefix_str = self.get_timestamp_prefix_str() if self.write_timestamps else ""
386            suffix_str = "\n" if self.write_timestamps else ""
387            self.refresh_files(
388                potential_new_len = len(prefix_str + data + suffix_str),
389                start_interception = self.write_timestamps,
390            )
391            try:
392                if prefix_str:
393                    self._current_file_obj.write(prefix_str)
394                self._current_file_obj.write(data)
395                if suffix_str:
396                    self._current_file_obj.write(suffix_str)
397            except BrokenPipeError:
398                warn("BrokenPipeError encountered. The daemon may have been terminated.")
399                return
400            except Exception:
401                warn(f"Failed to write to subfile:\n{traceback.format_exc()}")
402            self.flush()
403            self.delete(unused_only=True)
404        except Exception as e:
405            warn(f"Unexpected error in RotatingFile.write: {e}")
406
407
408    def delete(self, unused_only: bool = False) -> None:
409        """
410        Delete old subfiles.
411
412        Parameters
413        ----------
414        unused_only: bool, default False
415            If `True`, only delete subfiles which are no longer needed.
416        """
417        existing_subfile_paths = self.get_existing_subfile_paths()
418        if unused_only and len(existing_subfile_paths) <= self.num_files_to_keep:
419            return
420
421        self.flush()
422        self.close(unused_only=unused_only)
423
424        end_ix = (
425            (-1 * self.num_files_to_keep)
426            if unused_only
427            else len(existing_subfile_paths)
428        )
429        for subfile_path_to_delete in existing_subfile_paths[0:end_ix]:
430            subfile_index = self.get_index_from_subfile_name(subfile_path_to_delete.name)
431
432            try:
433                subfile_path_to_delete.unlink()
434            except Exception:
435                warn(
436                    f"Unable to delete subfile '{subfile_path_to_delete}':\n"
437                    + f"{traceback.format_exc()}"
438                )
439
440
441    def read(self, *args, **kwargs) -> str:
442        """
443        Read the contents of the existing subfiles.
444        """
445        existing_subfile_indices = [
446            self.get_index_from_subfile_name(subfile_path.name)
447            for subfile_path in self.get_existing_subfile_paths()
448        ]
449        paths_to_read = [
450            self.get_subfile_path_from_index(subfile_index)
451            for subfile_index in existing_subfile_indices
452            if subfile_index >= self._cursor[0]
453        ]
454        buffer = ''
455        refresh_cursor = True
456        for subfile_path in paths_to_read:
457            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
458            seek_ix = (
459                self._cursor[1]
460                if subfile_index == self._cursor[0]
461                else 0
462            )
463
464            if (
465                subfile_index in self.subfile_objects
466                and
467                subfile_index not in self._redirected_subfile_objects
468            ):
469                subfile_object = self.subfile_objects[subfile_index]
470                for i in range(self.SEEK_BACK_ATTEMPTS):
471                    try:
472                        subfile_object.seek(max(seek_ix - i, 0))
473                        buffer += subfile_object.read()
474                    except UnicodeDecodeError:
475                        continue
476                    break
477            else:
478                with open(subfile_path, 'r', encoding='utf-8') as f:
479                    for i in range(self.SEEK_BACK_ATTEMPTS):
480                        try:
481                            f.seek(max(seek_ix - i, 0))
482                            buffer += f.read()
483                        except UnicodeDecodeError:
484                            continue
485                        break
486
487                    ### Handle the case when no files have yet been opened.
488                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
489                        self._cursor = (subfile_index, f.tell())
490                        refresh_cursor = False
491
492        if refresh_cursor:
493            self.refresh_cursor()
494        return buffer
495
496
497    def refresh_cursor(self) -> None:
498        """
499        Update the cursor to the latest subfile index and file.tell() value.
500        """
501        self.flush()
502        existing_subfile_paths = self.get_existing_subfile_paths()
503        current_ix = (
504            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
505            if existing_subfile_paths
506            else 0
507        )
508        position = self._current_file_obj.tell() if self._current_file_obj is not None else 0
509        self._cursor = (current_ix, position)
510
511
512    def readlines(self) -> List[str]:
513        """
514        Return a list of lines of text.
515        """
516        existing_subfile_indices = [
517            self.get_index_from_subfile_name(subfile_path.name)
518            for subfile_path in self.get_existing_subfile_paths()
519        ]
520        paths_to_read = [
521            self.get_subfile_path_from_index(subfile_index)
522            for subfile_index in existing_subfile_indices
523            if subfile_index >= self._cursor[0]
524        ]
525
526        lines = []
527        refresh_cursor = True
528        for subfile_path in paths_to_read:
529            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
530            seek_ix = (
531                self._cursor[1]
532                if subfile_index == self._cursor[0]
533                else 0
534            )
535
536            subfile_lines = []
537            if (
538                subfile_index in self.subfile_objects
539                and
540                subfile_index not in self._redirected_subfile_objects
541            ):
542                subfile_object = self.subfile_objects[subfile_index]
543                for i in range(self.SEEK_BACK_ATTEMPTS):
544                    try:
545                        subfile_object.seek(max((seek_ix - i), 0))
546                        subfile_lines = subfile_object.readlines()
547                    except UnicodeDecodeError:
548                        continue
549                    break
550            else:
551                with open(subfile_path, 'r', encoding='utf-8') as f:
552                    for i in range(self.SEEK_BACK_ATTEMPTS):
553                        try:
554                            f.seek(max(seek_ix - i, 0))
555                            subfile_lines = f.readlines()
556                        except UnicodeDecodeError:
557                            continue
558                        break
559
560                    ### Handle the case when no files have yet been opened.
561                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
562                        self._cursor = (subfile_index, f.tell())
563                        refresh_cursor = False
564
565            ### Sometimes a line may span multiple files.
566            if lines and subfile_lines and not lines[-1].endswith('\n'):
567                lines[-1] += subfile_lines[0]
568                new_lines = subfile_lines[1:]
569            else:
570                new_lines = subfile_lines
571            lines.extend(new_lines)
572
573        if refresh_cursor:
574            self.refresh_cursor()
575        return lines
576
577
578    def seekable(self) -> bool:
579        return True
580
581
582    def seek(self, position: int) -> None:
583        """
584        Seek to the given position in the logs stream (0 rewinds to the earliest subfile).
585        """
586        existing_subfile_indices = self.get_existing_subfile_indices()
587        min_ix = existing_subfile_indices[0] if existing_subfile_indices else 0
588        max_ix = existing_subfile_indices[-1] if existing_subfile_indices else 0
589        if position == 0:
590            self._cursor = (min_ix, 0)
591            return
592
593        self._cursor = (max_ix, position)
594        if self._current_file_obj is not None:
595            self._current_file_obj.seek(position)
596
597    
598    def flush(self) -> None:
599        """
600        Flush any open subfiles.
601        """
602        for subfile_index, subfile_object in self.subfile_objects.items():
603            if not subfile_object.closed:
604                try:
605                    subfile_object.flush()
606                except Exception:
607                    warn(f"Failed to flush subfile {subfile_index}:\n{traceback.format_exc()}")
608
609        if self.redirect_streams:
610            try:
611                sys.stdout.flush()
612            except BrokenPipeError:
613                pass
614            except Exception:
615                warn(f"Failed to flush STDOUT:\n{traceback.format_exc()}")
616            try:
617                sys.stderr.flush()
618            except BrokenPipeError:
619                pass
620            except Exception:
621                warn(f"Failed to flush STDERR:\n{traceback.format_exc()}")
622
623
624    def start_log_fd_interception(self):
625        """
626        Start the file descriptor monitoring threads.
627        """
628        if not self.write_timestamps:
629            return
630
631        self._stdout_interceptor = FileDescriptorInterceptor(
632            sys.stdout.fileno(),
633            self.get_timestamp_prefix_str,
634        )
635        self._stderr_interceptor = FileDescriptorInterceptor(
636            sys.stderr.fileno(),
637            self.get_timestamp_prefix_str,
638        )
639
640        self._stdout_interceptor_thread = Thread(
641            target = self._stdout_interceptor.start_interception,
642            daemon = True,
643        )
644        self._stderr_interceptor_thread = Thread(
645            target = self._stderr_interceptor.start_interception,
646            daemon = True,
647        )
648        self._stdout_interceptor_thread.start()
649        self._stderr_interceptor_thread.start()
650        self._intercepting = True
651
652        if '_interceptor_threads' not in self.__dict__:
653            self._interceptor_threads = []
654        if '_interceptors' not in self.__dict__:
655            self._interceptors = []
656        self._interceptor_threads.extend([
657            self._stdout_interceptor_thread,
658            self._stderr_interceptor_thread,
659        ])
660        self._interceptors.extend([
661            self._stdout_interceptor,
662            self._stderr_interceptor,
663        ])
664        self.stop_log_fd_interception(unused_only=True)
665
666
667    def stop_log_fd_interception(self, unused_only: bool = False):
668        """
669        Stop the file descriptor monitoring threads.
670        """
671        if not self.write_timestamps:
672            return
673
674        interceptors = self.__dict__.get('_interceptors', [])
675        interceptor_threads = self.__dict__.get('_interceptor_threads', [])
676
677        end_ix = len(interceptors) if not unused_only else -2
678
679        for interceptor in interceptors[:end_ix]:
680            interceptor.stop_interception()
681        del interceptors[:end_ix]
682
683        for thread in interceptor_threads[:end_ix]:
684            try:
685                thread.join()
686            except Exception:
687                warn(f"Failed to join interceptor threads:\n{traceback.format_exc()}")
688        del interceptor_threads[:end_ix]
689
690    def touch(self):
691        """
692        Touch the latest subfile.
693        """
694        subfile_path = self.get_latest_subfile_path()
695        subfile_path.touch()
696
697    def isatty(self) -> bool:
698        return True
699
700    def __repr__(self) -> str:
701        """
702        Return basic info for this `RotatingFile`.
703        """
704        return (
705            "RotatingFile("
706            + f"'{self.file_path.as_posix()}', "
707            + f"num_files_to_keep={self.num_files_to_keep}, "
708            + f"max_file_size={self.max_file_size})"
709        )
class RotatingFile(io.IOBase):

A RotatingFile may be treated like a normal file-like object. Under the hood, however, it will create new sub-files and delete old ones.

RotatingFile(file_path: pathlib.Path, num_files_to_keep: Optional[int] = None, max_file_size: Optional[int] = None, redirect_streams: bool = False, write_timestamps: bool = False, timestamp_format: Optional[str] = None, write_callback: Optional[Callable[[str], None]] = None)

Create a file-like object which manages other files.

Parameters
  • num_files_to_keep (int, default None): How many sub-files to keep at any given time. Defaults to the configured value (5).
  • max_file_size (int, default None): How large in bytes each sub-file can grow before another file is created. Note that this is not a hard limit but rather a threshold which may be slightly exceeded. Defaults to the configured value (100_000).
  • redirect_streams (bool, default False): If True, redirect previous file streams when opening a new file descriptor.

    NOTE: Only set this to True if you are entering into a daemon context. Doing so will redirect sys.stdout and sys.stderr into the log files.

  • write_timestamps (bool, default False): If True, prepend the current UTC timestamp to each line of the file.
  • timestamp_format (str, default None): If write_timestamps is True, use this format for the timestamps. Defaults to '%Y-%m-%d %H:%M'.
  • write_callback (Optional[Callable[[str], None]], default None): If provided, execute this callback with the data to be written.
SEEK_BACK_ATTEMPTS: int = 5
file_path
num_files_to_keep
max_file_size
redirect_streams
write_timestamps
timestamp_format
write_callback
subfile_regex_pattern
subfile_objects
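
To make the rotation behavior concrete, here is a minimal usage sketch. It assumes `RotatingFile` is importable from `meerschaum.utils.daemon` and a standard Meerschaum configuration; the temporary directory and the explicit `num_files_to_keep` / `max_file_size` values are illustrative, not defaults:

    import pathlib
    import tempfile
    from meerschaum.utils.daemon import RotatingFile

    log_dir = pathlib.Path(tempfile.mkdtemp())
    log = RotatingFile(
        log_dir / 'job.log',
        num_files_to_keep=3,
        max_file_size=1_000,
    )
    for i in range(500):
        log.write(f'line {i}\n')   # rotation happens transparently past ~1 KB
    print(log.read())              # reads across the surviving subfiles
    log.close()
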
def fileno(self):
109    def fileno(self):
110        """
111        Return the file descriptor for the latest subfile.
112        """
113        self.refresh_files(start_interception=False)
114        return self._current_file_obj.fileno()

Return the file descriptor for the latest subfile.

def get_latest_subfile_path(self) -> pathlib.Path:
117    def get_latest_subfile_path(self) -> pathlib.Path:
118        """
119        Return the path of the latest subfile to write into.
120        """
121        return self.get_subfile_path_from_index(
122            self.get_latest_subfile_index()
123        )

Return the path of the latest subfile to write into.

def get_remaining_subfile_size(self, subfile_index: int) -> int:
126    def get_remaining_subfile_size(self, subfile_index: int) -> int:
127        """
128        Return the remaining buffer size for a subfile.
129
130        Parameters
131        ----------
132        subfile_index: int
133            The index of the subfile to be checked.
134
135        Returns
136        -------
137        The remaining size in bytes.
138        """
139        subfile_path = self.get_subfile_path_from_index(subfile_index)
140        if not subfile_path.exists():
141            return self.max_file_size
142
143        return self.max_file_size - os.path.getsize(subfile_path)

Return the remaining buffer size for a subfile.

Parameters
  • subfile_index (int): The index of the subfile to be checked.
Returns
  • The remaining size in bytes.
def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool:
146    def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool:
147        """
148        Return whether a given subfile is too large.
149
150        Parameters
151        ----------
152        subfile_index: int
153            The index of the subfile to be checked.
154
155        potential_new_len: int, default 0
156            The length of a potential write of new data.
157
158        Returns
159        -------
160        A bool indicating the subfile is or will be too large.
161        """
162        subfile_path = self.get_subfile_path_from_index(subfile_index)
163        if not subfile_path.exists():
164            return False
165
166        self.flush()
167
168        return (
169            (os.path.getsize(subfile_path) + potential_new_len)
170            >=
171            self.max_file_size
172        )

Return whether a given subfile is too large.

Parameters
  • subfile_index (int): The index of the subfile to be checked.
  • potential_new_len (int, default 0): The length of a potential write of new data.
Returns
  • A bool indicating the subfile is or will be too large.
def get_latest_subfile_index(self) -> int:
175    def get_latest_subfile_index(self) -> int:
176        """
177        Return the latest existing subfile index.
178        If no subfiles exist, return 0; if the name cannot be parsed, return -1.
179        """
180        existing_subfile_paths = self.get_existing_subfile_paths()
181        latest_index = (
182            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
183            if existing_subfile_paths
184            else 0
185        )
186        return latest_index

Return the latest existing subfile index. If no subfiles exist, return 0; if the name cannot be parsed, return -1.

def get_index_from_subfile_name(self, subfile_name: str) -> int:
189    def get_index_from_subfile_name(self, subfile_name: str) -> int:
190        """
191        Return the index from a given subfile name.
192        If the file name cannot be parsed, return -1.
193        """
194        try:
195            return int(subfile_name.replace(self.file_path.name + '.', ''))
196        except Exception:
197            return -1

Return the index from a given subfile name. If the file name cannot be parsed, return -1.

def get_subfile_name_from_index(self, subfile_index: int) -> str:
200    def get_subfile_name_from_index(self, subfile_index: int) -> str:
201        """
202        Return the subfile name from the given index.
203        """
204        return f'{self.file_path.name}.{subfile_index}'

Return the subfile name from the given index.
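
The naming scheme is simply the base file name plus a numeric suffix. A small sketch of the round trip with `get_index_from_subfile_name()`, assuming a standard Meerschaum configuration (the '/tmp' path and argument values are illustrative):

    import pathlib
    from meerschaum.utils.daemon import RotatingFile

    rf = RotatingFile(
        pathlib.Path('/tmp/job.log'),
        num_files_to_keep=2,
        max_file_size=100,
    )
    rf.get_subfile_name_from_index(3)             # 'job.log.3'
    rf.get_index_from_subfile_name('job.log.3')   # 3
    rf.get_index_from_subfile_name('job.log')     # -1 (no parsable index)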

def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path:
207    def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path:
208        """
209        Return the subfile's path from its index.
210        """
211        return self.file_path.parent / self.get_subfile_name_from_index(subfile_index)

Return the subfile's path from its index.

def get_existing_subfile_indices(self) -> List[int]:
214    def get_existing_subfile_indices(self) -> List[int]:
215        """
216        Return a list of subfile indices which exist on disk.
217        """
218        existing_subfile_paths = self.get_existing_subfile_paths()
219        return [self.get_index_from_subfile_name(path.name) for path in existing_subfile_paths]

Return a list of subfile indices which exist on disk.

def get_existing_subfile_paths(self) -> List[pathlib.Path]:
222    def get_existing_subfile_paths(self) -> List[pathlib.Path]:
223        """
224        Return a list of file paths that match the input filename pattern.
225        """
226        if not self.file_path.parent.exists():
227            return []
228
229        subfile_names_indices = sorted(
230            [
231                (file_name, self.get_index_from_subfile_name(file_name))
232                for file_name in os.listdir(self.file_path.parent)
233                if (
234                    file_name.startswith(self.file_path.name)
235                    and re.match(self.subfile_regex_pattern, file_name)
236                )
237            ],
238            key=lambda x: x[1],
239        )
240        return [
241            (self.file_path.parent / file_name)
242            for file_name, _ in subfile_names_indices
243        ]

Return a list of file paths that match the input filename pattern.

def refresh_files( self, potential_new_len: int = 0, start_interception: bool = False) -> '_io.TextIOWrapper':
246    def refresh_files(
247        self,
248        potential_new_len: int = 0,
249        start_interception: bool = False,
250    ) -> '_io.TextIOWrapper':
251        """
252        Check the state of the subfiles.
253        If the latest subfile is too large, create a new file and delete old ones.
254
255        Parameters
256        ----------
257        potential_new_len: int, default 0
258            The length of a potential write of new data.
259        start_interception: bool, default False
260            If `True`, kick off the file interception threads.
261        """
262        self.flush()
263
264        latest_subfile_index = self.get_latest_subfile_index()
265        latest_subfile_path = self.get_subfile_path_from_index(latest_subfile_index)
266
267        ### First run with existing log files: open the most recent log file.
268        is_first_run_with_logs = ((latest_subfile_index > -1) and self._current_file_obj is None)
269
270        ### Sometimes a new file is created but output doesn't switch over.
271        lost_latest_handle = (
272            self._current_file_obj is not None
273            and
274            self.get_index_from_subfile_name(self._current_file_obj.name) == -1
275        )
276        if is_first_run_with_logs or lost_latest_handle:
277            self._current_file_obj = open(latest_subfile_path, 'a+', encoding='utf-8')
278            if self.redirect_streams:
279                try:
280                    daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
281                    daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
282                except OSError:
283                    warn(
284                        f"Encountered an issue when redirecting streams:\n{traceback.format_exc()}"
285                    )
286                if start_interception and self.write_timestamps:
287                    self.start_log_fd_interception()
288
289        create_new_file = (
290            (latest_subfile_index == -1)
291            or
292            self._current_file_obj is None
293            or
294            self.is_subfile_too_large(latest_subfile_index, potential_new_len)
295        )
296        if create_new_file:
297            self.increment_subfiles()
298
299        return self._current_file_obj

Check the state of the subfiles. If the latest subfile is too large, create a new file and delete old ones.

Parameters
  • potential_new_len (int, default 0): The length of a potential write of new data.
  • start_interception (bool, default False): If True, kick off the file interception threads.
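
A sketch of the rotation this method drives: once a pending write would push the latest subfile to or past max_file_size, the next subfile is created. The values below are illustrative, assuming a standard Meerschaum configuration:

    import pathlib
    import tempfile
    from meerschaum.utils.daemon import RotatingFile

    log_dir = pathlib.Path(tempfile.mkdtemp())
    log = RotatingFile(log_dir / 'rotate.log', num_files_to_keep=2, max_file_size=200)
    log.write('x' * 150)   # fits within subfile 0
    log.write('y' * 150)   # 150 + 150 >= 200, so subfile 1 is created
    [p.name for p in log.get_existing_subfile_paths()]
    # ['rotate.log.0', 'rotate.log.1']
    log.close()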

def increment_subfiles(self, increment_by: int = 1):
301    def increment_subfiles(self, increment_by: int = 1):
302        """
303        Create a new subfile and switch the file pointer over.
304        """
305        latest_subfile_index = self.get_latest_subfile_index()
306        old_subfile_index = latest_subfile_index
307        new_subfile_index = old_subfile_index + increment_by
308        new_file_path = self.get_subfile_path_from_index(new_subfile_index)
309        self._previous_file_obj = self._current_file_obj
310        self._current_file_obj = open(new_file_path, 'a+', encoding='utf-8')
311        self.subfile_objects[new_subfile_index] = self._current_file_obj
312        self.flush()
313
314        if self.redirect_streams:
315            if self._previous_file_obj is not None:
316                self._redirected_subfile_objects[old_subfile_index] = self._previous_file_obj
317                daemon.daemon.redirect_stream(self._previous_file_obj, self._current_file_obj)
318            daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
319            daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
320        self.close(unused_only=True)
321
322        ### Sanity check in case writing somehow fails.
323        if self._previous_file_obj is self._current_file_obj:
324            self._previous_file_obj = None
325
326        self.delete(unused_only=True)

Create a new subfile and switch the file pointer over.

def close(self, unused_only: bool = False) -> None:
328    def close(self, unused_only: bool = False) -> None:
329        """
330        Close any open file descriptors.
331
332        Parameters
333        ----------
334        unused_only: bool, default False
335            If `True`, only close file descriptors not currently in use.
336        """
337        self.stop_log_fd_interception(unused_only=unused_only)
338        subfile_indices = sorted(self.subfile_objects.keys())
339        for subfile_index in subfile_indices:
340            subfile_object = self.subfile_objects[subfile_index]
341            if unused_only and subfile_object in (self._previous_file_obj, self._current_file_obj):
342                continue
343            try:
344                if not subfile_object.closed:
345                    subfile_object.close()
346            except Exception:
347                warn(f"Failed to close an open subfile:\n{traceback.format_exc()}")
348
349            _ = self.subfile_objects.pop(subfile_index, None)
350            if self.redirect_streams:
351                _ = self._redirected_subfile_objects.pop(subfile_index, None)
352
353        if not unused_only:
354            self._previous_file_obj = None
355            self._current_file_obj = None

Close any open file descriptors.

Parameters
  • unused_only (bool, default False): If True, only close file descriptors not currently in use.
def get_timestamp_prefix_str(self) -> str:
358    def get_timestamp_prefix_str(self) -> str:
359        """
360        Return the current UTC timestamp prefix string.
361        """
362        return datetime.now(timezone.utc).strftime(self.timestamp_format) + ' | '

Return the current UTC timestamp prefix string.
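
With the default format '%Y-%m-%d %H:%M', the prefix resolves to the current UTC minute followed by a separator; a minimal standalone illustration:

    from datetime import datetime, timezone

    datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M') + ' | '
    # e.g. '2024-05-01 13:37 | '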

def write(self, data: str) -> None:
365    def write(self, data: str) -> None:
366        """
367        Write the given text into the latest subfile.
368        If the subfile will be too large, create a new subfile.
369        If too many subfiles exist at once, the oldest one will be deleted.
370
371        NOTE: This will not split data across multiple files.
372        As such, if data is larger than max_file_size, then the corresponding subfile
373        may exceed this limit.
374        """
375        try:
376            if callable(self.write_callback):
377                self.write_callback(data)
378        except Exception:
379            warn(f"Failed to execute write callback:\n{traceback.format_exc()}")
380
381        try:
382            self.file_path.parent.mkdir(exist_ok=True, parents=True)
383            if isinstance(data, bytes):
384                data = data.decode('utf-8')
385
386            prefix_str = self.get_timestamp_prefix_str() if self.write_timestamps else ""
387            suffix_str = "\n" if self.write_timestamps else ""
388            self.refresh_files(
389                potential_new_len = len(prefix_str + data + suffix_str),
390                start_interception = self.write_timestamps,
391            )
392            try:
393                if prefix_str:
394                    self._current_file_obj.write(prefix_str)
395                self._current_file_obj.write(data)
396                if suffix_str:
397                    self._current_file_obj.write(suffix_str)
398            except BrokenPipeError:
399                warn("BrokenPipeError encountered. The daemon may have been terminated.")
400                return
401            except Exception:
402                warn(f"Failed to write to subfile:\n{traceback.format_exc()}")
403            self.flush()
404            self.delete(unused_only=True)
405        except Exception as e:
406            warn(f"Unexpected error in RotatingFile.write: {e}")

Write the given text into the latest subfile. If the subfile will be too large, create a new subfile. If too many subfiles exist at once, the oldest one will be deleted.

NOTE: This will not split data across multiple files. As such, if data is larger than max_file_size, then the corresponding subfile may exceed this limit.
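
One use of write_callback is mirroring each write to a second sink; a minimal sketch, where the list-append callback is purely illustrative:

    import pathlib
    import tempfile
    from meerschaum.utils.daemon import RotatingFile

    seen = []
    log = RotatingFile(
        pathlib.Path(tempfile.mkdtemp()) / 'cb.log',
        num_files_to_keep=2,
        max_file_size=1_000,
        write_callback=seen.append,
    )
    log.write('hello\n')
    seen   # ['hello\n'] -- the callback fires before the data hits disk
    log.close()
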

def delete(self, unused_only: bool = False) -> None:
409    def delete(self, unused_only: bool = False) -> None:
410        """
411        Delete old subfiles.
412
413        Parameters
414        ----------
415        unused_only: bool, default False
416            If `True`, only delete subfiles which are no longer needed.
417        """
418        existing_subfile_paths = self.get_existing_subfile_paths()
419        if unused_only and len(existing_subfile_paths) <= self.num_files_to_keep:
420            return
421
422        self.flush()
423        self.close(unused_only=unused_only)
424
425        end_ix = (
426            (-1 * self.num_files_to_keep)
427            if unused_only
428            else len(existing_subfile_paths)
429        )
430        for subfile_path_to_delete in existing_subfile_paths[0:end_ix]:
431            subfile_index = self.get_index_from_subfile_name(subfile_path_to_delete.name)
432
433            try:
434                subfile_path_to_delete.unlink()
435            except Exception:
436                warn(
437                    f"Unable to delete subfile '{subfile_path_to_delete}':\n"
438                    + f"{traceback.format_exc()}"
439                )

Delete old subfiles.

Parameters
  • unused_only (bool, default False): If True, only delete subfiles which are no longer needed.
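
A sketch of the pruning behavior under stated assumptions: with num_files_to_keep=2, only the two newest subfiles survive, since each oversized write below forces a rotation and write() already prunes as it goes:

    import pathlib
    import tempfile
    from meerschaum.utils.daemon import RotatingFile

    log = RotatingFile(
        pathlib.Path(tempfile.mkdtemp()) / 'prune.log',
        num_files_to_keep=2,
        max_file_size=100,
    )
    for _ in range(10):
        log.write('z' * 80)       # each write rolls over to a new subfile
    log.delete(unused_only=True)  # a no-op here: write() already pruned
    len(log.get_existing_subfile_paths())   # 2
    log.close()
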
def read(self, *args, **kwargs) -> str:
442    def read(self, *args, **kwargs) -> str:
443        """
444        Read the contents of the existing subfiles.
445        """
446        existing_subfile_indices = [
447            self.get_index_from_subfile_name(subfile_path.name)
448            for subfile_path in self.get_existing_subfile_paths()
449        ]
450        paths_to_read = [
451            self.get_subfile_path_from_index(subfile_index)
452            for subfile_index in existing_subfile_indices
453            if subfile_index >= self._cursor[0]
454        ]
455        buffer = ''
456        refresh_cursor = True
457        for subfile_path in paths_to_read:
458            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
459            seek_ix = (
460                self._cursor[1]
461                if subfile_index == self._cursor[0]
462                else 0
463            )
464
465            if (
466                subfile_index in self.subfile_objects
467                and
468                subfile_index not in self._redirected_subfile_objects
469            ):
470                subfile_object = self.subfile_objects[subfile_index]
471                for i in range(self.SEEK_BACK_ATTEMPTS):
472                    try:
473                        subfile_object.seek(max(seek_ix - i, 0))
474                        buffer += subfile_object.read()
475                    except UnicodeDecodeError:
476                        continue
477                    break
478            else:
479                with open(subfile_path, 'r', encoding='utf-8') as f:
480                    for i in range(self.SEEK_BACK_ATTEMPTS):
481                        try:
482                            f.seek(max(seek_ix - i, 0))
483                            buffer += f.read()
484                        except UnicodeDecodeError:
485                            continue
486                        break
487
488                    ### Handle the case when no files have yet been opened.
489                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
490                        self._cursor = (subfile_index, f.tell())
491                        refresh_cursor = False
492
493        if refresh_cursor:
494            self.refresh_cursor()
495        return buffer

Read the contents of the existing subfiles.
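
Successive calls resume from the internal cursor, so a second read() only returns data written since the first; a minimal sketch assuming a standard Meerschaum configuration:

    import pathlib
    import tempfile
    from meerschaum.utils.daemon import RotatingFile

    log = RotatingFile(
        pathlib.Path(tempfile.mkdtemp()) / 'read.log',
        num_files_to_keep=2,
        max_file_size=1_000,
    )
    log.write('first\n')
    log.read()    # 'first\n'
    log.write('second\n')
    log.read()    # 'second\n' -- the cursor advanced past the first read
    log.close()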

def refresh_cursor(self) -> None:
498    def refresh_cursor(self) -> None:
499        """
500        Update the cursor to the latest subfile index and file.tell() value.
501        """
502        self.flush()
503        existing_subfile_paths = self.get_existing_subfile_paths()
504        current_ix = (
505            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
506            if existing_subfile_paths
507            else 0
508        )
509        position = self._current_file_obj.tell() if self._current_file_obj is not None else 0
510        self._cursor = (current_ix, position)

Update the cursor to the latest subfile index and file.tell() value.

def readlines(self) -> List[str]:
513    def readlines(self) -> List[str]:
514        """
515        Return a list of lines of text.
516        """
517        existing_subfile_indices = [
518            self.get_index_from_subfile_name(subfile_path.name)
519            for subfile_path in self.get_existing_subfile_paths()
520        ]
521        paths_to_read = [
522            self.get_subfile_path_from_index(subfile_index)
523            for subfile_index in existing_subfile_indices
524            if subfile_index >= self._cursor[0]
525        ]
526
527        lines = []
528        refresh_cursor = True
529        for subfile_path in paths_to_read:
530            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
531            seek_ix = (
532                self._cursor[1]
533                if subfile_index == self._cursor[0]
534                else 0
535            )
536
537            subfile_lines = []
538            if (
539                subfile_index in self.subfile_objects
540                and
541                subfile_index not in self._redirected_subfile_objects
542            ):
543                subfile_object = self.subfile_objects[subfile_index]
544                for i in range(self.SEEK_BACK_ATTEMPTS):
545                    try:
546                        subfile_object.seek(max((seek_ix - i), 0))
547                        subfile_lines = subfile_object.readlines()
548                    except UnicodeDecodeError:
549                        continue
550                    break
551            else:
552                with open(subfile_path, 'r', encoding='utf-8') as f:
553                    for i in range(self.SEEK_BACK_ATTEMPTS):
554                        try:
555                            f.seek(max(seek_ix - i, 0))
556                            subfile_lines = f.readlines()
557                        except UnicodeDecodeError:
558                            continue
559                        break
560
561                    ### Handle the case when no files have yet been opened.
562                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
563                        self._cursor = (subfile_index, f.tell())
564                        refresh_cursor = False
565
566            ### Sometimes a line may span multiple files.
567            if lines and subfile_lines and not lines[-1].endswith('\n'):
568                lines[-1] += subfile_lines[0]
569                new_lines = subfile_lines[1:]
570            else:
571                new_lines = subfile_lines
572            lines.extend(new_lines)
573
574        if refresh_cursor:
575            self.refresh_cursor()
576        return lines

Return a list of lines of text.
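
Note the stitching step above: a line that spans a rotation boundary is reassembled into a single entry. A hedged sketch of that behavior (illustrative sizes):

    import pathlib
    import tempfile
    from meerschaum.utils.daemon import RotatingFile

    log = RotatingFile(
        pathlib.Path(tempfile.mkdtemp()) / 'span.log',
        num_files_to_keep=2,
        max_file_size=100,
    )
    log.write('a' * 90)          # no newline; fills subfile 0
    log.write('b' * 20 + '\n')   # rolls over to subfile 1 mid-line
    lines = log.readlines()
    (len(lines), len(lines[0]))  # (1, 111) -- one stitched line
    log.close()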

def seekable(self) -> bool:
579    def seekable(self) -> bool:
580        return True

Return whether object supports random access.

If False, seek(), tell() and truncate() will raise OSError. This method may need to do a test seek().

def seek(self, position: int) -> None:
583    def seek(self, position: int) -> None:
584        """
585        Seek to the given position in the logs stream (0 rewinds to the beginning).
586        """
587        existing_subfile_indices = self.get_existing_subfile_indices()
588        min_ix = existing_subfile_indices[0] if existing_subfile_indices else 0
589        max_ix = existing_subfile_indices[-1] if existing_subfile_indices else 0
590        if position == 0:
591            self._cursor = (min_ix, 0)
592            return
593
594        self._cursor = (max_ix, position)
595        if self._current_file_obj is not None:
596            self._current_file_obj.seek(position)

Seek to the given position in the logs stream. A position of 0 rewinds to the beginning.
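
Seeking back to position 0 rewinds the internal cursor so that a subsequent read() returns everything still on disk; a brief sketch (assumptions as in the read() example above):

    import pathlib
    import tempfile
    from meerschaum.utils.daemon import RotatingFile

    log = RotatingFile(
        pathlib.Path(tempfile.mkdtemp()) / 'seek.log',
        num_files_to_keep=2,
        max_file_size=1_000,
    )
    log.write('hello\n')
    log.read()     # 'hello\n'
    log.seek(0)    # rewind the cursor to the beginning
    log.read()     # 'hello\n' again
    log.close()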

def flush(self) -> None:
599    def flush(self) -> None:
600        """
601        Flush any open subfiles.
602        """
603        for subfile_index, subfile_object in self.subfile_objects.items():
604            if not subfile_object.closed:
605                try:
606                    subfile_object.flush()
607                except Exception:
608                    warn(f"Failed to flush subfile {subfile_index}:\n{traceback.format_exc()}")
609
610        if self.redirect_streams:
611            try:
612                sys.stdout.flush()
613            except BrokenPipeError:
614                pass
615            except Exception:
616                warn(f"Failed to flush STDOUT:\n{traceback.format_exc()}")
617            try:
618                sys.stderr.flush()
619            except BrokenPipeError:
620                pass
621            except Exception:
622                warn(f"Failed to flush STDERR:\n{traceback.format_exc()}")

Flush any open subfiles.

def start_log_fd_interception(self):
625    def start_log_fd_interception(self):
626        """
627        Start the file descriptor monitoring threads.
628        """
629        if not self.write_timestamps:
630            return
631
632        self._stdout_interceptor = FileDescriptorInterceptor(
633            sys.stdout.fileno(),
634            self.get_timestamp_prefix_str,
635        )
636        self._stderr_interceptor = FileDescriptorInterceptor(
637            sys.stderr.fileno(),
638            self.get_timestamp_prefix_str,
639        )
640
641        self._stdout_interceptor_thread = Thread(
642            target = self._stdout_interceptor.start_interception,
643            daemon = True,
644        )
645        self._stderr_interceptor_thread = Thread(
646            target = self._stderr_interceptor.start_interception,
647            daemon = True,
648        )
649        self._stdout_interceptor_thread.start()
650        self._stderr_interceptor_thread.start()
651        self._intercepting = True
652
653        if '_interceptor_threads' not in self.__dict__:
654            self._interceptor_threads = []
655        if '_interceptors' not in self.__dict__:
656            self._interceptors = []
657        self._interceptor_threads.extend([
658            self._stdout_interceptor_thread,
659            self._stderr_interceptor_thread,
660        ])
661        self._interceptors.extend([
662            self._stdout_interceptor,
663            self._stderr_interceptor,
664        ])
665        self.stop_log_fd_interception(unused_only=True)

Start the file descriptor monitoring threads.

def stop_log_fd_interception(self, unused_only: bool = False):
668    def stop_log_fd_interception(self, unused_only: bool = False):
669        """
670        Stop the file descriptor monitoring threads.
671        """
672        if not self.write_timestamps:
673            return
674
675        interceptors = self.__dict__.get('_interceptors', [])
676        interceptor_threads = self.__dict__.get('_interceptor_threads', [])
677
678        end_ix = len(interceptors) if not unused_only else -2
679
680        for interceptor in interceptors[:end_ix]:
681            interceptor.stop_interception()
682        del interceptors[:end_ix]
683
684        for thread in interceptor_threads[:end_ix]:
685            try:
686                thread.join()
687            except Exception:
688                warn(f"Failed to join interceptor threads:\n{traceback.format_exc()}")
689        del interceptor_threads[:end_ix]

Stop the file descriptor monitoring threads.

def touch(self):
691    def touch(self):
692        """
693        Touch the latest subfile.
694        """
695        subfile_path = self.get_latest_subfile_path()
696        subfile_path.touch()

Touch the latest subfile.

def isatty(self) -> bool:
698    def isatty(self) -> bool:
699        return True

Return whether this is an 'interactive' stream.

Return False if it can't be determined.