meerschaum.utils.daemon.RotatingFile

Create a file-like object that manages sub-files under the hood.
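
A minimal usage sketch (the path and limits below are illustrative, not the configured defaults):

    import pathlib
    from meerschaum.utils.daemon.RotatingFile import RotatingFile

    # Small illustrative limits so rotation is easy to observe.
    log = RotatingFile(
        pathlib.Path('/tmp/example.log'),
        num_files_to_keep=2,
        max_file_size=1024,
    )
    log.write('hello, world\n')
    log.seek(0)
    print(log.read())
    log.close()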

  1#! /usr/bin/env python3
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Create a file-like object that manages sub-files under the hood.
  7"""
  8
  9import os
 10import io
 11import re
 12import pathlib
 13import traceback
 14import sys
 15import atexit
 16from datetime import datetime, timezone, timedelta
 17from typing import List, Union, Optional, Tuple
 18from meerschaum.config import get_config
 19from meerschaum.utils.warnings import warn
 20from meerschaum.utils.misc import round_time
 21from meerschaum.utils.daemon.FileDescriptorInterceptor import FileDescriptorInterceptor
 22from meerschaum.utils.threading import Thread
 23import meerschaum as mrsm
 24daemon = mrsm.attempt_import('daemon')
 25
 26class RotatingFile(io.IOBase):
 27    """
 28    A `RotatingFile` may be treated like a normal file-like object.
 29    Under the hood, however, it will create new sub-files and delete old ones.
 30    """
 31
 32    SEEK_BACK_ATTEMPTS: int = 5
 33
 34    def __init__(
 35        self,
 36        file_path: pathlib.Path,
 37        num_files_to_keep: Optional[int] = None,
 38        max_file_size: Optional[int] = None,
 39        redirect_streams: bool = False,
 40        write_timestamps: bool = False,
 41        timestamp_format: Optional[str] = None,
 42    ):
 43        """
 44        Create a file-like object which manages other files.
 45
 46        Parameters
 47        ----------
 48        num_files_to_keep: int, default None
 49            How many sub-files to keep at any given time.
 50            Defaults to the configured value (5).
 51
 52        max_file_size: int, default None
 53            How large in bytes each sub-file can grow before another file is created.
 54            Note that this is not a hard limit but rather a threshold
 55            which may be slightly exceeded.
 56            Defaults to the configured value (100_000).
 57
 58        redirect_streams: bool, default False
 59            If `True`, redirect previous file streams when opening a new file descriptor.
 60            
 61            NOTE: Only set this to `True` if you are entering into a daemon context.
 62            Doing so will redirect `sys.stdout` and `sys.stderr` into the log files.
 63
 64        write_timestamps: bool, default False
 65            If `True`, prepend the current UTC timestamp to each line of the file.
 66
 67        timestamp_format: str, default None
 68            If `write_timestamps` is `True`, use this format for the timestamps.
 69            Defaults to `'%Y-%m-%d %H:%M'`.
 70        """
 71        self.file_path = pathlib.Path(file_path)
 72        if num_files_to_keep is None:
 73            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
 74        if max_file_size is None:
 75            max_file_size = get_config('jobs', 'logs', 'max_file_size')
 76        if timestamp_format is None:
 77            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
 78        if num_files_to_keep < 2:
 79            raise ValueError("At least 2 files must be kept.")
 80        if max_file_size < 1:
 81            raise ValueError("Subfiles must contain at least one byte.")
 82
 83        self.num_files_to_keep = num_files_to_keep
 84        self.max_file_size = max_file_size
 85        self.redirect_streams = redirect_streams
 86        self.write_timestamps = write_timestamps
 87        self.timestamp_format = timestamp_format
 88        self.subfile_regex_pattern = re.compile(r'(.*)\.log(?:\.\d+)?$')
 89
 90        ### When subfiles are opened, map from their index to the file objects.
 91        self.subfile_objects = {}
 92        self._redirected_subfile_objects = {}
 93        self._current_file_obj = None
 94        self._previous_file_obj = None
 95
 96        ### When reading, keep track of the file index and position.
 97        self._cursor: Tuple[int, int] = (0, 0)
 98
 99        ### Don't forget to close any stray files.
100        atexit.register(self.close)
101
102
103    def fileno(self):
104        """
105        Return the file descriptor for the latest subfile.
106        """
107        self.refresh_files(start_interception=False)
108        return self._current_file_obj.fileno()
109
110
111    def get_latest_subfile_path(self) -> pathlib.Path:
112        """
113        Return the path of the latest subfile to write to.
114        """
115        return self.get_subfile_path_from_index(
116            self.get_latest_subfile_index()
117        )
118
119
120    def get_remaining_subfile_size(self, subfile_index: int) -> int:
121        """
122        Return the remaining buffer size for a subfile.
123
124        Parameters
125        ----------
126        subfile_index: int
127            The index of the subfile to be checked.
128
129        Returns
130        -------
131        The remaining size in bytes.
132        """
133        subfile_path = self.get_subfile_path_from_index(subfile_index)
134        if not subfile_path.exists():
135            return self.max_file_size
136
137        return self.max_file_size - os.path.getsize(subfile_path)
138
139
140    def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool:
141        """
142        Return whether a given subfile is too large.
143
144        Parameters
145        ----------
146        subfile_index: int
147            The index of the subfile to be checked.
148
149        potential_new_len: int, default 0
150            The length of a potential write of new data.
151
152        Returns
153        -------
154        A bool indicating the subfile is or will be too large.
155        """
156        subfile_path = self.get_subfile_path_from_index(subfile_index)
157        if not subfile_path.exists():
158            return False
159
160        self.flush()
161
162        return (
163            (os.path.getsize(subfile_path) + potential_new_len)
164            >=
165            self.max_file_size
166        )
167
168
169    def get_latest_subfile_index(self) -> int:
170        """
171        Return the latest existing subfile index.
172        If no subfiles exist, return 0; if the latest name cannot be parsed, return -1.
173        """
174        existing_subfile_paths = self.get_existing_subfile_paths()
175        latest_index = (
176            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
177            if existing_subfile_paths
178            else 0
179        )
180        return latest_index
181
182
183    def get_index_from_subfile_name(self, subfile_name: str) -> int:
184        """
185        Return the index from a given subfile name.
186        If the file name cannot be parsed, return -1.
187        """
188        try:
189            return int(subfile_name.replace(self.file_path.name + '.', ''))
190        except Exception as e:
191            return -1
192
193
194    def get_subfile_name_from_index(self, subfile_index: int) -> str:
195        """
196        Return the subfile name from the given index.
197        """
198        return f'{self.file_path.name}.{subfile_index}'
199
200
201    def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path:
202        """
203        Return the subfile's path from its index.
204        """
205        return self.file_path.parent / self.get_subfile_name_from_index(subfile_index)
206
207
208    def get_existing_subfile_indices(self) -> List[int]:
209        """
210        Return a list of subfile indices which exist on disk.
211        """
212        existing_subfile_paths = self.get_existing_subfile_paths()
213        return [self.get_index_from_subfile_name(path.name) for path in existing_subfile_paths]
214
215
216    def get_existing_subfile_paths(self) -> List[pathlib.Path]:
217        """
218        Return a list of file paths that match the input filename pattern.
219        """
220        if not self.file_path.parent.exists():
221            return []
222
223        subfile_names_indices = sorted(
224            [
225                (file_name, self.get_index_from_subfile_name(file_name))
226                for file_name in os.listdir(self.file_path.parent)
227                if (
228                    file_name.startswith(self.file_path.name)
229                    and re.match(self.subfile_regex_pattern, file_name)
230                )
231            ],
232            key=lambda x: x[1],
233        )
234        return [
235            (self.file_path.parent / file_name)
236            for file_name, _ in subfile_names_indices
237        ]
238
239
240    def refresh_files(
241        self,
242        potential_new_len: int = 0,
243        start_interception: bool = False,
244    ) -> '_io.TextUIWrapper':
245        """
246        Check the state of the subfiles.
247        If the latest subfile is too large, create a new file and delete old ones.
248
249        Parameters
250        ----------
251        potential_new_len: int, default 0
252            The expected length of an incoming write, used to decide whether a new subfile is needed.
253        start_interception: bool, default False
254            If `True`, kick off the file interception threads.
255        """
256        self.flush()
257
258        latest_subfile_index = self.get_latest_subfile_index()
259        latest_subfile_path = self.get_subfile_path_from_index(latest_subfile_index)
260
261        ### First run with existing log files: open the most recent log file.
262        is_first_run_with_logs = ((latest_subfile_index > -1) and self._current_file_obj is None)
263
264        ### Sometimes a new file is created but output doesn't switch over.
265        lost_latest_handle = (
266            self._current_file_obj is not None
267            and
268            self.get_index_from_subfile_name(self._current_file_obj.name) == -1
269        )
270        if is_first_run_with_logs or lost_latest_handle:
271            self._current_file_obj = open(latest_subfile_path, 'a+', encoding='utf-8')
272            if self.redirect_streams:
273                try:
274                    daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
275                    daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
276                except OSError as e:
277                    warn(
278                        f"Encountered an issue when redirecting streams:\n{traceback.format_exc()}"
279                    )
280                if start_interception and self.write_timestamps:
281                    self.start_log_fd_interception()
282
283        create_new_file = (
284            (latest_subfile_index == -1)
285            or
286            self._current_file_obj is None
287            or
288            self.is_subfile_too_large(latest_subfile_index, potential_new_len)
289        )
290        if create_new_file:
291            old_subfile_index = latest_subfile_index
292            new_subfile_index = old_subfile_index + 1
293            new_file_path = self.get_subfile_path_from_index(new_subfile_index)
294            self._previous_file_obj = self._current_file_obj
295            self._current_file_obj = open(new_file_path, 'a+', encoding='utf-8')
296            self.subfile_objects[new_subfile_index] = self._current_file_obj
297            self.flush()
298
299            if self._previous_file_obj is not None:
300                if self.redirect_streams:
301                    self._redirected_subfile_objects[old_subfile_index] = self._previous_file_obj
302                    daemon.daemon.redirect_stream(self._previous_file_obj, self._current_file_obj)
303                    daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
304                    daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
305                self.close(unused_only=True)
306
307            ### Sanity check in case writing somehow fails.
308            if self._previous_file_obj is self._current_file_obj:
309                self._previous_file_obj = None
310
311            self.delete(unused_only=True)
312
313        return self._current_file_obj
314
315
316    def close(self, unused_only: bool = False) -> None:
317        """
318        Close any open file descriptors.
319
320        Parameters
321        ----------
322        unused_only: bool, default False
323            If `True`, only close file descriptors not currently in use.
324        """
325        self.stop_log_fd_interception(unused_only=unused_only)
326        subfile_indices = sorted(self.subfile_objects.keys())
327        for subfile_index in subfile_indices:
328            subfile_object = self.subfile_objects[subfile_index]
329            if unused_only and subfile_object in (self._previous_file_obj, self._current_file_obj):
330                continue
331            try:
332                if not subfile_object.closed:
333                    subfile_object.close()
334            except Exception as e:
335                warn(f"Failed to close an open subfile:\n{traceback.format_exc()}")
336
337            _ = self.subfile_objects.pop(subfile_index, None)
338            if self.redirect_streams:
339                _ = self._redirected_subfile_objects.pop(subfile_index, None)
340
341        if not unused_only:
342            self._previous_file_obj = None
343            self._current_file_obj = None
344
345
346    def get_timestamp_prefix_str(self) -> str:
347        """
348        Return the current UTC timestamp prefix string.
349        """
350        return datetime.now(timezone.utc).strftime(self.timestamp_format) + ' | '
351
352
353    def write(self, data: str) -> None:
354        """
355        Write the given text into the latest subfile.
356        If the subfile will be too large, create a new subfile.
357        If too many subfiles exist at once, the oldest one will be deleted.
358
359        NOTE: This will not split data across multiple files.
360        As such, if data is larger than max_file_size, then the corresponding subfile
361        may exceed this limit.
362        """
363        try:
364            self.file_path.parent.mkdir(exist_ok=True, parents=True)
365            if isinstance(data, bytes):
366                data = data.decode('utf-8')
367
368            prefix_str = self.get_timestamp_prefix_str() if self.write_timestamps else ""
369            suffix_str = "\n" if self.write_timestamps else ""
370            self.refresh_files(
371                potential_new_len = len(prefix_str + data + suffix_str),
372                start_interception = self.write_timestamps,
373            )
374            try:
375                if prefix_str:
376                    self._current_file_obj.write(prefix_str)
377                self._current_file_obj.write(data)
378                if suffix_str:
379                    self._current_file_obj.write(suffix_str)
380            except BrokenPipeError:
381                warn("BrokenPipeError encountered. The daemon may have been terminated.")
382                return
383            except Exception as e:
384                warn(f"Failed to write to subfile:\n{traceback.format_exc()}")
385            self.flush()
386            self.delete(unused_only=True)
387        except Exception as e:
388            warn(f"Unexpected error in RotatingFile.write: {e}")
389
390
391    def delete(self, unused_only: bool = False) -> None:
392        """
393        Delete old subfiles.
394
395        Parameters
396        ----------
397        unused_only: bool, default False
398            If `True`, only delete subfiles which are no longer needed.
399        """
400        existing_subfile_paths = self.get_existing_subfile_paths()
401        if unused_only and len(existing_subfile_paths) <= self.num_files_to_keep:
402            return
403
404        self.flush()
405        self.close(unused_only=unused_only)
406
407        end_ix = (
408            (-1 * self.num_files_to_keep)
409            if unused_only
410            else len(existing_subfile_paths)
411        )
412        for subfile_path_to_delete in existing_subfile_paths[0:end_ix]:
413            subfile_index = self.get_index_from_subfile_name(subfile_path_to_delete.name)
414            subfile_object = self.subfile_objects.get(subfile_index, None)
415
416            try:
417                subfile_path_to_delete.unlink()
418            except Exception as e:
419                warn(
420                    f"Unable to delete subfile '{subfile_path_to_delete}':\n"
421                    + f"{traceback.format_exc()}"
422                )
423
424
425    def read(self, *args, **kwargs) -> str:
426        """
427        Read the contents of the existing subfiles.
428        """
429        existing_subfile_indices = [
430            self.get_index_from_subfile_name(subfile_path.name)
431            for subfile_path in self.get_existing_subfile_paths()
432        ]
433        paths_to_read = [
434            self.get_subfile_path_from_index(subfile_index)
435            for subfile_index in existing_subfile_indices
436            if subfile_index >= self._cursor[0]
437        ]
438        buffer = ''
439        refresh_cursor = True
440        for subfile_path in paths_to_read:
441            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
442            seek_ix = (
443                self._cursor[1]
444                if subfile_index == self._cursor[0]
445                else 0
446            )
447
448            if (
449                subfile_index in self.subfile_objects
450                and
451                subfile_index not in self._redirected_subfile_objects
452            ):
453                subfile_object = self.subfile_objects[subfile_index]
454                for i in range(self.SEEK_BACK_ATTEMPTS):
455                    try:
456                        subfile_object.seek(max(seek_ix - i, 0))
457                        buffer += subfile_object.read()
458                    except UnicodeDecodeError:
459                        continue
460                    break
461            else:
462                with open(subfile_path, 'r', encoding='utf-8') as f:
463                    for i in range(self.SEEK_BACK_ATTEMPTS):
464                        try:
465                            f.seek(max(seek_ix - i, 0))
466                            buffer += f.read()
467                        except UnicodeDecodeError:
468                            continue
469                        break
470
471                    ### Handle the case when no files have yet been opened.
472                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
473                        self._cursor = (subfile_index, f.tell())
474                        refresh_cursor = False
475
476        if refresh_cursor:
477            self.refresh_cursor()
478        return buffer
479
480
481    def refresh_cursor(self) -> None:
482        """
483        Update the cursor to the latest subfile index and file.tell() value.
484        """
485        self.flush()
486        existing_subfile_paths = self.get_existing_subfile_paths()
487        current_ix = (
488            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
489            if existing_subfile_paths
490            else 0
491        )
492        position = self._current_file_obj.tell() if self._current_file_obj is not None else 0
493        self._cursor = (current_ix, position)
494
495
496    def readlines(self) -> List[str]:
497        """
498        Return a list of lines of text.
499        """
500        existing_subfile_indices = [
501            self.get_index_from_subfile_name(subfile_path.name)
502            for subfile_path in self.get_existing_subfile_paths()
503        ]
504        paths_to_read = [
505            self.get_subfile_path_from_index(subfile_index)
506            for subfile_index in existing_subfile_indices
507            if subfile_index >= self._cursor[0]
508        ]
509
510        lines = []
511        refresh_cursor = True
512        for subfile_path in paths_to_read:
513            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
514            seek_ix = (
515                self._cursor[1]
516                if subfile_index == self._cursor[0]
517                else 0
518            )
519
520            if (
521                subfile_index in self.subfile_objects
522                and
523                subfile_index not in self._redirected_subfile_objects
524            ):
525                subfile_object = self.subfile_objects[subfile_index]
526                for i in range(self.SEEK_BACK_ATTEMPTS):
527                    try:
528                        subfile_object.seek(max((seek_ix - i), 0))
529                        subfile_lines = subfile_object.readlines()
530                    except UnicodeDecodeError:
531                        continue
532                    break
533            else:
534                with open(subfile_path, 'r', encoding='utf-8') as f:
535                    for i in range(self.SEEK_BACK_ATTEMPTS):
536                        try:
537                            f.seek(max(seek_ix - i, 0))
538                            subfile_lines = f.readlines()
539                        except UnicodeDecodeError:
540                            continue
541                        break
542
543                    ### Handle the case when no files have yet been opened.
544                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
545                        self._cursor = (subfile_index, f.tell())
546                        refresh_cursor = False
547
548            ### Sometimes a line may span multiple files.
549            if lines and subfile_lines and not lines[-1].endswith('\n'):
550                lines[-1] += subfile_lines[0]
551                new_lines = subfile_lines[1:]
552            else:
553                new_lines = subfile_lines
554            lines.extend(new_lines)
555
556        if refresh_cursor:
557            self.refresh_cursor()
558        return lines
559
560
561    def seekable(self) -> bool:
562        return True
563
564
565    def seek(self, position: int) -> None:
566        """
567        Seek within the logs stream; position 0 resets the cursor to the earliest subfile.
568        """
569        existing_subfile_indices = self.get_existing_subfile_indices()
570        min_ix = existing_subfile_indices[0] if existing_subfile_indices else 0
571        max_ix = existing_subfile_indices[-1] if existing_subfile_indices else 0
572        if position == 0:
573            self._cursor = (min_ix, 0)
574            return
575
576        self._cursor = (max_ix, position)
577        if self._current_file_obj is not None:
578            self._current_file_obj.seek(position)
579
580    
581    def flush(self) -> None:
582        """
583        Flush any open subfiles.
584        """
585        for subfile_index, subfile_object in self.subfile_objects.items():
586            if not subfile_object.closed:
587                try:
588                    subfile_object.flush()
589                except Exception as e:
590                    warn(f"Failed to flush subfile {subfile_index}:\n{traceback.format_exc()}")
591        if self.redirect_streams:
592            try:
593                sys.stdout.flush()
594            except BrokenPipeError:
595                pass
596            except Exception as e:
597                warn(f"Failed to flush STDOUT:\n{traceback.format_exc()}")
598            try:
599                sys.stderr.flush()
600            except BrokenPipeError:
601                pass
602            except Exception as e:
603                warn(f"Failed to flush STDERR:\n{traceback.format_exc()}")
604
605
606    def start_log_fd_interception(self):
607        """
608        Start the file descriptor monitoring threads.
609        """
610        if not self.write_timestamps:
611            return
612
613        self._stdout_interceptor = FileDescriptorInterceptor(
614            sys.stdout.fileno(),
615            self.get_timestamp_prefix_str,
616        )
617        self._stderr_interceptor = FileDescriptorInterceptor(
618            sys.stderr.fileno(),
619            self.get_timestamp_prefix_str,
620        )
621
622        self._stdout_interceptor_thread = Thread(
623            target = self._stdout_interceptor.start_interception,
624            daemon = True,
625        )
626        self._stderr_interceptor_thread = Thread(
627            target = self._stderr_interceptor.start_interception,
628            daemon = True,
629        )
630        self._stdout_interceptor_thread.start()
631        self._stderr_interceptor_thread.start()
632        self._intercepting = True
633
634        if '_interceptor_threads' not in self.__dict__:
635            self._interceptor_threads = []
636        if '_interceptors' not in self.__dict__:
637            self._interceptors = []
638        self._interceptor_threads.extend([
639            self._stdout_interceptor_thread,
640            self._stderr_interceptor_thread,
641        ])
642        self._interceptors.extend([
643            self._stdout_interceptor,
644            self._stderr_interceptor,
645        ])
646        self.stop_log_fd_interception(unused_only=True)
647
648
649    def stop_log_fd_interception(self, unused_only: bool = False):
650        """
651        Stop the file descriptor monitoring threads.
652        """
653        if not self.write_timestamps:
654            return
655
656        interceptors = self.__dict__.get('_interceptors', [])
657        interceptor_threads = self.__dict__.get('_interceptor_threads', [])
658
659        end_ix = len(interceptors) if not unused_only else -2
660
661        for interceptor in interceptors[:end_ix]:
662            interceptor.stop_interception()
663        del interceptors[:end_ix]
664
665        for thread in interceptor_threads[:end_ix]:
666            try:
667                thread.join()
668            except Exception as e:
669                warn(f"Failed to join interceptor threads:\n{traceback.format_exc()}")
670        del interceptor_threads[:end_ix]
671
672
673    def __repr__(self) -> str:
674        """
675        Return basic info for this `RotatingFile`.
676        """
677        return (
678            "RotatingFile("
679            + f"'{self.file_path.as_posix()}', "
680            + f"num_files_to_keep={self.num_files_to_keep}, "
681            + f"max_file_size={self.max_file_size})"
682        )
class RotatingFile(io.IOBase):

A RotatingFile may be treated like a normal file-like object. Under the hood, however, it will create new sub-files and delete old ones.
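
For instance, a rotation sketch (names and sizes are illustrative): once a subfile passes max_file_size, writes roll over to a new numbered subfile, and only the num_files_to_keep most recent subfiles survive.

    import pathlib
    from meerschaum.utils.daemon.RotatingFile import RotatingFile

    log = RotatingFile(
        pathlib.Path('/tmp/rotate_demo.log'),
        num_files_to_keep=2,
        max_file_size=100,
    )
    for i in range(10):
        log.write(f'line {i}: ' + 'x' * 50 + '\n')

    # Only the two most recent subfiles remain on disk,
    # e.g. ['rotate_demo.log.8', 'rotate_demo.log.9'].
    print([p.name for p in log.get_existing_subfile_paths()])
    log.close()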

RotatingFile( file_path: pathlib.Path, num_files_to_keep: Optional[int] = None, max_file_size: Optional[int] = None, redirect_streams: bool = False, write_timestamps: bool = False, timestamp_format: Optional[str] = None)

Create a file-like object which manages other files.

Parameters
  • num_files_to_keep (int, default None): How many sub-files to keep at any given time. Defaults to the configured value (5).
  • max_file_size (int, default None): How large in bytes each sub-file can grow before another file is created. Note that this is not a hard limit but rather a threshold which may be slightly exceeded. Defaults to the configured value (100_000).
  • redirect_streams (bool, default False): If True, redirect previous file streams when opening a new file descriptor.

    NOTE: Only set this to True if you are entering into a daemon context. Doing so will redirect sys.stdout and sys.stderr into the log files.

  • write_timestamps (bool, default False): If True, prepend the current UTC timestamp to each line of the file.
  • timestamp_format (str, default None): If write_timestamps is True, use this format for the timestamps. Defaults to '%Y-%m-%d %H:%M'.
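
A sketch of overriding the configured defaults at construction (the values shown are illustrative):

    import pathlib
    from meerschaum.utils.daemon.RotatingFile import RotatingFile

    log = RotatingFile(
        pathlib.Path('/tmp/timestamped.log'),
        num_files_to_keep=5,
        max_file_size=100_000,
        write_timestamps=True,
        timestamp_format='%Y-%m-%d %H:%M',
    )
    log.write('started')   # stored as e.g. '2024-01-01 00:00 | started'
    log.close()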
SEEK_BACK_ATTEMPTS: int = 5

Instance attributes:
  • file_path
  • num_files_to_keep
  • max_file_size
  • redirect_streams
  • write_timestamps
  • timestamp_format
  • subfile_regex_pattern
  • subfile_objects
def fileno(self):

Return the file descriptor for the latest subfile.
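
Because fileno() hands back the descriptor of the newest subfile, low-level writers can bypass write(); a brief sketch, reusing the hypothetical log_file from above:

    import os

    fd = log_file.fileno()    # descriptor of the latest subfile
    os.write(fd, b'written at the descriptor level\n')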

def get_latest_subfile_path(self) -> pathlib.Path:
112    def get_latest_subfile_path(self) -> pathlib.Path:
113        """
114        Return the path of the latest subfile to write into.
115        """
116        return self.get_subfile_path_from_index(
117            self.get_latest_subfile_index()
118        )

Return the path of the latest subfile to write into.

def get_remaining_subfile_size(self, subfile_index: int) -> int:
121    def get_remaining_subfile_size(self, subfile_index: int) -> int:
122        """
123        Return the remaining buffer size for a subfile.
124
125        Parameters
126        ----------
127        subfile_index: int
128            The index of the subfile to be checked.
129
130        Returns
131        -------
132        The remaining size in bytes.
133        """
134        subfile_path = self.get_subfile_path_from_index(subfile_index)
135        if not subfile_path.exists():
136            return self.max_file_size
137
138        return self.max_file_size - os.path.getsize(subfile_path)

Return the remaining buffer size for a subfile.

Parameters
  • subfile_index (int): The index of the subfile to be checked.
Returns
  • The remaining size in bytes.

def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool:
141    def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool:
142        """
143        Return whether a given subfile is too large.
144
145        Parameters
146        ----------
147        subfile_index: int
148            The index of the subfile to be checked.
149
150        potential_new_len: int, default 0
151            The length of a potential write of new data.
152
153        Returns
154        -------
155        A bool indicating the subfile is or will be too large.
156        """
157        subfile_path = self.get_subfile_path_from_index(subfile_index)
158        if not subfile_path.exists():
159            return False
160
161        self.flush()
162
163        return (
164            (os.path.getsize(subfile_path) + potential_new_len)
165            >=
166            self.max_file_size
167        )

Return whether a given subfile is too large.

Parameters
  • subfile_index (int): The index of the subfile to be checked.
  • potential_new_len (int, default 0): The length of a potential write of new data.
Returns
  • A bool indicating the subfile is or will be too large.
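
Both checks drive the rotation decision in refresh_files; a sketch of the arithmetic, assuming max_file_size=100_000 and subfile 0 currently holding 99,990 bytes:

    log_file.get_remaining_subfile_size(0)       # 10 bytes left
    log_file.is_subfile_too_large(0)             # False: 99_990 < 100_000
    log_file.is_subfile_too_large(0, potential_new_len=10)
    # True: 99_990 + 10 >= 100_000, so the next write triggers a rollover
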
def get_latest_subfile_index(self) -> int:
170    def get_latest_subfile_index(self) -> int:
171        """
172        Return the latest existing subfile index.
173        If no subfiles exist, return 0; if the name cannot be parsed, return -1.
174        """
175        existing_subfile_paths = self.get_existing_subfile_paths()
176        latest_index = (
177            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
178            if existing_subfile_paths
179            else 0
180        )
181        return latest_index

Return the latest existing subfile index. If no subfiles exist, return 0; if the latest subfile's name cannot be parsed, return -1.

def get_index_from_subfile_name(self, subfile_name: str) -> int:
184    def get_index_from_subfile_name(self, subfile_name: str) -> int:
185        """
186        Return the index from a given subfile name.
187        If the file name cannot be parsed, return -1.
188        """
189        try:
190            return int(subfile_name.replace(self.file_path.name + '.', ''))
191        except Exception as e:
192            return -1

Return the index from a given subfile name. If the file name cannot be parsed, return -1.

def get_subfile_name_from_index(self, subfile_index: int) -> str:
195    def get_subfile_name_from_index(self, subfile_index: int) -> str:
196        """
197        Return the subfile name from the given index.
198        """
199        return f'{self.file_path.name}.{subfile_index}'

Return the subfile name from the given index.

def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path:
202    def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path:
203        """
204        Return the subfile's path from its index.
205        """
206        return self.file_path.parent / self.get_subfile_name_from_index(subfile_index)

Return the subfile's path from its index.
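
The naming helpers round-trip between indices and '<name>.log.<index>' paths; a sketch with the hypothetical /tmp/example.log from above:

    log_file.get_subfile_name_from_index(0)                  # 'example.log.0'
    log_file.get_subfile_path_from_index(1)                  # PosixPath('/tmp/example.log.1')
    log_file.get_index_from_subfile_name('example.log.2')    # 2
    log_file.get_index_from_subfile_name('unrelated.txt')    # -1 (unparseable)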

def get_existing_subfile_indices(self) -> List[int]:
209    def get_existing_subfile_indices(self) -> List[int]:
210        """
211        Return a list of subfile indices which exist on disk.
212        """
213        existing_subfile_paths = self.get_existing_subfile_paths()
214        return [self.get_index_from_subfile_name(path.name) for path in existing_subfile_paths]

Return a list of subfile indices which exist on disk.

def get_existing_subfile_paths(self) -> List[pathlib.Path]:
217    def get_existing_subfile_paths(self) -> List[pathlib.Path]:
218        """
219        Return a list of file paths that match the input filename pattern.
220        """
221        if not self.file_path.parent.exists():
222            return []
223
224        subfile_names_indices = sorted(
225            [
226                (file_name, self.get_index_from_subfile_name(file_name))
227                for file_name in os.listdir(self.file_path.parent)
228                if (
229                    file_name.startswith(self.file_path.name)
230                    and re.match(self.subfile_regex_pattern, file_name)
231                )
232            ],
233            key=lambda x: x[1],
234        )
235        return [
236            (self.file_path.parent / file_name)
237            for file_name, _ in subfile_names_indices
238        ]

Return a list of file paths that match the input filename pattern.
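
After a few rotations, only the newest num_files_to_keep subfiles survive on disk; a sketch of what the listing helpers might return (indices illustrative):

    log_file.get_existing_subfile_indices()
    # e.g. [3, 4, 5, 6, 7]

    log_file.get_existing_subfile_paths()
    # e.g. [PosixPath('/tmp/example.log.3'), ..., PosixPath('/tmp/example.log.7')]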

def refresh_files(self, potential_new_len: int = 0, start_interception: bool = False) -> '_io.TextIOWrapper':
241    def refresh_files(
242        self,
243        potential_new_len: int = 0,
244        start_interception: bool = False,
245    ) -> '_io.TextIOWrapper':
246        """
247        Check the state of the subfiles.
248        If the latest subfile is too large, create a new file and delete old ones.
249
250        Parameters
251        ----------
252        potential_new_len: int, default 0
253            The length of a potential write of new data.
254        start_interception: bool, default False
255            If `True`, kick off the file interception threads.
256        """
257        self.flush()
258
259        latest_subfile_index = self.get_latest_subfile_index()
260        latest_subfile_path = self.get_subfile_path_from_index(latest_subfile_index)
261
262        ### First run with existing log files: open the most recent log file.
263        is_first_run_with_logs = ((latest_subfile_index > -1) and self._current_file_obj is None)
264
265        ### Sometimes a new file is created but output doesn't switch over.
266        lost_latest_handle = (
267            self._current_file_obj is not None
268            and
269            self.get_index_from_subfile_name(self._current_file_obj.name) == -1
270        )
271        if is_first_run_with_logs or lost_latest_handle:
272            self._current_file_obj = open(latest_subfile_path, 'a+', encoding='utf-8')
273            if self.redirect_streams:
274                try:
275                    daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
276                    daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
277                except OSError as e:
278                    warn(
279                        f"Encountered an issue when redirecting streams:\n{traceback.format_exc()}"
280                    )
281                if start_interception and self.write_timestamps:
282                    self.start_log_fd_interception()
283
284        create_new_file = (
285            (latest_subfile_index == -1)
286            or
287            self._current_file_obj is None
288            or
289            self.is_subfile_too_large(latest_subfile_index, potential_new_len)
290        )
291        if create_new_file:
292            old_subfile_index = latest_subfile_index
293            new_subfile_index = old_subfile_index + 1
294            new_file_path = self.get_subfile_path_from_index(new_subfile_index)
295            self._previous_file_obj = self._current_file_obj
296            self._current_file_obj = open(new_file_path, 'a+', encoding='utf-8')
297            self.subfile_objects[new_subfile_index] = self._current_file_obj
298            self.flush()
299
300            if self._previous_file_obj is not None:
301                if self.redirect_streams:
302                    self._redirected_subfile_objects[old_subfile_index] = self._previous_file_obj
303                    daemon.daemon.redirect_stream(self._previous_file_obj, self._current_file_obj)
304                    daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
305                    daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
306                self.close(unused_only=True)
307
308            ### Sanity check in case writing somehow fails.
309            if self._previous_file_obj is self._current_file_obj:
310                self._previous_file_obj = None
311
312            self.delete(unused_only=True)
313
314        return self._current_file_obj

Check the state of the subfiles. If the latest subfile is too large, create a new file and delete old ones.

Parameters
  • potential_new_len (int, default 0): The length of a potential write of new data.
  • start_interception (bool, default False): If True, kick off the file interception threads.
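
A self-contained sketch of the rotation behavior this method implements, using deliberately tiny limits (the path is illustrative):

    small_log = RotatingFile('/tmp/tiny.log', num_files_to_keep=2, max_file_size=32)
    for i in range(10):
        small_log.write(f'line {i}\n')    # rollovers happen behind the scenes

    small_log.get_existing_subfile_indices()
    # e.g. [2, 3]: older subfiles have already been deleted
    small_log.close()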

def close(self, unused_only: bool = False) -> None:
317    def close(self, unused_only: bool = False) -> None:
318        """
319        Close any open file descriptors.
320
321        Parameters
322        ----------
323        unused_only: bool, default False
324            If `True`, only close file descriptors not currently in use.
325        """
326        self.stop_log_fd_interception(unused_only=unused_only)
327        subfile_indices = sorted(self.subfile_objects.keys())
328        for subfile_index in subfile_indices:
329            subfile_object = self.subfile_objects[subfile_index]
330            if unused_only and subfile_object in (self._previous_file_obj, self._current_file_obj):
331                continue
332            try:
333                if not subfile_object.closed:
334                    subfile_object.close()
335            except Exception as e:
336                warn(f"Failed to close an open subfile:\n{traceback.format_exc()}")
337
338            _ = self.subfile_objects.pop(subfile_index, None)
339            if self.redirect_streams:
340                _ = self._redirected_subfile_objects.pop(subfile_index, None)
341
342        if not unused_only:
343            self._previous_file_obj = None
344            self._current_file_obj = None

Close any open file descriptors.

Parameters
  • unused_only (bool, default False): If True, only close file descriptors not currently in use.

def get_timestamp_prefix_str(self) -> str:
347    def get_timestamp_prefix_str(self) -> str:
348        """
349        Return the current minute prefix string.
350        """
351        return datetime.now(timezone.utc).strftime(self.timestamp_format) + ' | '

Return the current minute prefix string.
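
A sketch of the prefix shape under the default '%Y-%m-%d %H:%M' format (the timestamp shown is illustrative):

    log_file.get_timestamp_prefix_str()
    # '2024-01-01 12:34 | '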

def write(self, data: str) -> None:
354    def write(self, data: str) -> None:
355        """
356        Write the given text into the latest subfile.
357        If the subfile will be too large, create a new subfile.
358        If too many subfiles exist at once, the oldest one will be deleted.
359
360        NOTE: This will not split data across multiple files.
361        As such, if data is larger than max_file_size, then the corresponding subfile
362        may exceed this limit.
363        """
364        try:
365            self.file_path.parent.mkdir(exist_ok=True, parents=True)
366            if isinstance(data, bytes):
367                data = data.decode('utf-8')
368
369            prefix_str = self.get_timestamp_prefix_str() if self.write_timestamps else ""
370            suffix_str = "\n" if self.write_timestamps else ""
371            self.refresh_files(
372                potential_new_len = len(prefix_str + data + suffix_str),
373                start_interception = self.write_timestamps,
374            )
375            try:
376                if prefix_str:
377                    self._current_file_obj.write(prefix_str)
378                self._current_file_obj.write(data)
379                if suffix_str:
380                    self._current_file_obj.write(suffix_str)
381            except BrokenPipeError:
382                warn("BrokenPipeError encountered. The daemon may have been terminated.")
383                return
384            except Exception as e:
385                warn(f"Failed to write to subfile:\n{traceback.format_exc()}")
386            self.flush()
387            self.delete(unused_only=True)
388        except Exception as e:
389            warn(f"Unexpected error in RotatingFile.write: {e}")

Write the given text into the latest subfile. If the subfile will be too large, create a new subfile. If too many subfiles exist at once, the oldest one will be deleted.

NOTE: This will not split data across multiple files. As such, if data is larger than max_file_size, then the corresponding subfile may exceed this limit.
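
A sketch of timestamped writing; note that a payload larger than max_file_size still lands whole in a single subfile:

    ts_log = RotatingFile('/tmp/ts.log', write_timestamps=True)   # illustrative path
    ts_log.write('job started')
    # On disk: '2024-01-01 12:34 | job started\n'  (timestamp illustrative)

    ts_log.write('x' * (ts_log.max_file_size * 2))    # overshoots the threshold by design
    ts_log.close()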

def delete(self, unused_only: bool = False) -> None:
392    def delete(self, unused_only: bool = False) -> None:
393        """
394        Delete old subfiles.
395
396        Parameters
397        ----------
398        unused_only: bool, default False
399            If `True`, only delete subfiles which are no longer needed.
400        """
401        existing_subfile_paths = self.get_existing_subfile_paths()
402        if unused_only and len(existing_subfile_paths) <= self.num_files_to_keep:
403            return
404
405        self.flush()
406        self.close(unused_only=unused_only)
407
408        end_ix = (
409            (-1 * self.num_files_to_keep)
410            if unused_only
411            else len(existing_subfile_paths)
412        )
413        for subfile_path_to_delete in existing_subfile_paths[0:end_ix]:
414            subfile_index = self.get_index_from_subfile_name(subfile_path_to_delete.name)
415            subfile_object = self.subfile_objects.get(subfile_index, None)
416
417            try:
418                subfile_path_to_delete.unlink()
419            except Exception as e:
420                warn(
421                    f"Unable to delete subfile '{subfile_path_to_delete}':\n"
422                    + f"{traceback.format_exc()}"
423                )

Delete old subfiles.

Parameters
  • unused_only (bool, default False): If True, only delete subfiles which are no longer needed.

def read(self, *args, **kwargs) -> str:
426    def read(self, *args, **kwargs) -> str:
427        """
428        Read the contents of the existing subfiles.
429        """
430        existing_subfile_indices = [
431            self.get_index_from_subfile_name(subfile_path.name)
432            for subfile_path in self.get_existing_subfile_paths()
433        ]
434        paths_to_read = [
435            self.get_subfile_path_from_index(subfile_index)
436            for subfile_index in existing_subfile_indices
437            if subfile_index >= self._cursor[0]
438        ]
439        buffer = ''
440        refresh_cursor = True
441        for subfile_path in paths_to_read:
442            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
443            seek_ix = (
444                self._cursor[1]
445                if subfile_index == self._cursor[0]
446                else 0
447            )
448
449            if (
450                subfile_index in self.subfile_objects
451                and
452                subfile_index not in self._redirected_subfile_objects
453            ):
454                subfile_object = self.subfile_objects[subfile_index]
455                for i in range(self.SEEK_BACK_ATTEMPTS):
456                    try:
457                        subfile_object.seek(max(seek_ix - i, 0))
458                        buffer += subfile_object.read()
459                    except UnicodeDecodeError:
460                        continue
461                    break
462            else:
463                with open(subfile_path, 'r', encoding='utf-8') as f:
464                    for i in range(self.SEEK_BACK_ATTEMPTS):
465                        try:
466                            f.seek(max(seek_ix - i, 0))
467                            buffer += f.read()
468                        except UnicodeDecodeError:
469                            continue
470                        break
471
472                    ### Handle the case when no files have yet been opened.
473                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
474                        self._cursor = (subfile_index, f.tell())
475                        refresh_cursor = False
476
477        if refresh_cursor:
478            self.refresh_cursor()
479        return buffer

Read the contents of the existing subfiles.
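
read() walks the subfiles from the internal cursor onward and then advances the cursor, so repeated calls behave incrementally; a sketch:

    log_file.write('first')
    log_file.read()     # everything from the cursor onward; the cursor then advances
    log_file.write('second')
    log_file.read()     # only the output written since the previous read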

def refresh_cursor(self) -> None:
482    def refresh_cursor(self) -> None:
483        """
484        Update the cursor to the latest subfile index and file.tell() value.
485        """
486        self.flush()
487        existing_subfile_paths = self.get_existing_subfile_paths()
488        current_ix = (
489            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
490            if existing_subfile_paths
491            else 0
492        )
493        position = self._current_file_obj.tell() if self._current_file_obj is not None else 0
494        self._cursor = (current_ix, position)

Update the cursor to the latest subfile index and file.tell() value.

def readlines(self) -> List[str]:
497    def readlines(self) -> List[str]:
498        """
499        Return a list of lines of text.
500        """
501        existing_subfile_indices = [
502            self.get_index_from_subfile_name(subfile_path.name)
503            for subfile_path in self.get_existing_subfile_paths()
504        ]
505        paths_to_read = [
506            self.get_subfile_path_from_index(subfile_index)
507            for subfile_index in existing_subfile_indices
508            if subfile_index >= self._cursor[0]
509        ]
510
511        lines = []
512        refresh_cursor = True
513        for subfile_path in paths_to_read:
514            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
515            seek_ix = (
516                self._cursor[1]
517                if subfile_index == self._cursor[0]
518                else 0
519            )
520
521            if (
522                subfile_index in self.subfile_objects
523                and
524                subfile_index not in self._redirected_subfile_objects
525            ):
526                subfile_object = self.subfile_objects[subfile_index]
527                for i in range(self.SEEK_BACK_ATTEMPTS):
528                    try:
529                        subfile_object.seek(max((seek_ix - i), 0))
530                        subfile_lines = subfile_object.readlines()
531                    except UnicodeDecodeError:
532                        continue
533                    break
534            else:
535                with open(subfile_path, 'r', encoding='utf-8') as f:
536                    for i in range(self.SEEK_BACK_ATTEMPTS):
537                        try:
538                            f.seek(max(seek_ix - i, 0))
539                            subfile_lines = f.readlines()
540                        except UnicodeDecodeError:
541                            continue
542                        break
543
544                    ### Handle the case when no files have yet been opened.
545                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
546                        self._cursor = (subfile_index, f.tell())
547                        refresh_cursor = False
548
549            ### Sometimes a line may span multiple files.
550            if lines and subfile_lines and not lines[-1].endswith('\n'):
551                lines[-1] += subfile_lines[0]
552                new_lines = subfile_lines[1:]
553            else:
554                new_lines = subfile_lines
555            lines.extend(new_lines)
556
557        if refresh_cursor:
558            self.refresh_cursor()
559        return lines

Return a list of lines of text.
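
One noteworthy detail: a line that began at the end of one subfile and spilled into the next is stitched back together. A brief sketch:

    log_file.seek(0)                  # rewind to the oldest kept subfile
    lines = log_file.readlines()
    # A line spanning a rotation boundary comes back whole, not as two fragments.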

def seekable(self) -> bool:
562    def seekable(self) -> bool:
563        return True

Return whether object supports random access.

If False, seek(), tell() and truncate() will raise OSError. This method may need to do a test seek().

def seek(self, position: int) -> None:
566    def seek(self, position: int) -> None:
567        """
568        Seek to a position in the logs stream (0 rewinds to the oldest subfile).
569        """
570        existing_subfile_indices = self.get_existing_subfile_indices()
571        min_ix = existing_subfile_indices[0] if existing_subfile_indices else 0
572        max_ix = existing_subfile_indices[-1] if existing_subfile_indices else 0
573        if position == 0:
574            self._cursor = (min_ix, 0)
575            return
576
577        self._cursor = (max_ix, position)
578        if self._current_file_obj is not None:
579            self._current_file_obj.seek(position)

Seek to a position in the logs stream. A position of 0 rewinds to the oldest kept subfile.
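
A sketch of rewinding the stream to replay the kept history:

    log_file.seek(0)    # cursor -> (oldest subfile index, byte 0)
    log_file.read()     # contents of every kept subfile, concatenated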

def flush(self) -> None:
582    def flush(self) -> None:
583        """
584        Flush any open subfiles.
585        """
586        for subfile_index, subfile_object in self.subfile_objects.items():
587            if not subfile_object.closed:
588                try:
589                    subfile_object.flush()
590                except Exception as e:
591                    warn(f"Failed to flush subfile {subfile_index}:\n{traceback.format_exc()}")
592        if self.redirect_streams:
593            try:
594                sys.stdout.flush()
595            except BrokenPipeError:
596                pass
597            except Exception as e:
598                warn(f"Failed to flush STDOUT:\n{traceback.format_exc()}")
599            try:
600                sys.stderr.flush()
601            except BrokenPipeError:
602                pass
603            except Exception as e:
604                warn(f"Failed to flush STDERR:\n{traceback.format_exc()}")

Flush any open subfiles.

def start_log_fd_interception(self):
607    def start_log_fd_interception(self):
608        """
609        Start the file descriptor monitoring threads.
610        """
611        if not self.write_timestamps:
612            return
613
614        self._stdout_interceptor = FileDescriptorInterceptor(
615            sys.stdout.fileno(),
616            self.get_timestamp_prefix_str,
617        )
618        self._stderr_interceptor = FileDescriptorInterceptor(
619            sys.stderr.fileno(),
620            self.get_timestamp_prefix_str,
621        )
622
623        self._stdout_interceptor_thread = Thread(
624            target = self._stdout_interceptor.start_interception,
625            daemon = True,
626        )
627        self._stderr_interceptor_thread = Thread(
628            target = self._stderr_interceptor.start_interception,
629            daemon = True,
630        )
631        self._stdout_interceptor_thread.start()
632        self._stderr_interceptor_thread.start()
633        self._intercepting = True
634
635        if '_interceptor_threads' not in self.__dict__:
636            self._interceptor_threads = []
637        if '_interceptors' not in self.__dict__:
638            self._interceptors = []
639        self._interceptor_threads.extend([
640            self._stdout_interceptor_thread,
641            self._stderr_interceptor_thread,
642        ])
643        self._interceptors.extend([
644            self._stdout_interceptor,
645            self._stderr_interceptor,
646        ])
647        self.stop_log_fd_interception(unused_only=True)

Start the file descriptor monitoring threads.
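
These threads are normally started indirectly by refresh_files() when both redirect_streams and write_timestamps are enabled inside a daemon context; a sketch of that setup (avoid running it in an interactive session, since it redirects sys.stdout):

    daemon_log = RotatingFile(
        '/tmp/daemon.log',         # illustrative path
        redirect_streams=True,     # only safe inside a daemon context
        write_timestamps=True,
    )
    daemon_log.refresh_files(start_interception=True)
    print('captured, timestamped, and written into the log files')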

def stop_log_fd_interception(self, unused_only: bool = False):
650    def stop_log_fd_interception(self, unused_only: bool = False):
651        """
652        Stop the file descriptor monitoring threads.
653        """
654        if not self.write_timestamps:
655            return
656
657        interceptors = self.__dict__.get('_interceptors', [])
658        interceptor_threads = self.__dict__.get('_interceptor_threads', [])
659
660        end_ix = len(interceptors) if not unused_only else -2
661
662        for interceptor in interceptors[:end_ix]:
663            interceptor.stop_interception()
664        del interceptors[:end_ix]
665
666        for thread in interceptor_threads[:end_ix]:
667            try:
668                thread.join()
669            except Exception as e:
670                warn(f"Failed to join interceptor threads:\n{traceback.format_exc()}")
671        del interceptor_threads[:end_ix]

Stop the file descriptor monitoring threads.