meerschaum.utils.daemon.RotatingFile

Create a file-like object that manages sub-files under the hood.
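
Below is a minimal usage sketch (the log path and size limits are illustrative, not defaults): write text through the rotating file, then seek back and read whatever survived rotation.

import pathlib
from meerschaum.utils.daemon.RotatingFile import RotatingFile

log_path = pathlib.Path('/tmp/example.log')   # illustrative path
rf = RotatingFile(log_path, num_files_to_keep=3, max_file_size=1024)
rf.write("hello, world\n")
rf.seek(0)           # move the cursor back to the earliest subfile
print(rf.read())     # read across all existing subfiles
rf.close()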

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Create a file-like object that manages sub-files under the hood.
"""

import os
import io
import re
import pathlib
import traceback
import sys
import atexit
from datetime import datetime, timezone
from typing import List, Optional, Tuple
from meerschaum.config import get_config
from meerschaum.utils.warnings import warn
from meerschaum.utils.daemon.FileDescriptorInterceptor import FileDescriptorInterceptor
from meerschaum.utils.threading import Thread
import meerschaum as mrsm
daemon = mrsm.attempt_import('daemon')

class RotatingFile(io.IOBase):
    """
    A `RotatingFile` may be treated like a normal file-like object.
    Under the hood, however, it will create new sub-files and delete old ones.
    """

    SEEK_BACK_ATTEMPTS: int = 5

    def __init__(
        self,
        file_path: pathlib.Path,
        num_files_to_keep: Optional[int] = None,
        max_file_size: Optional[int] = None,
        redirect_streams: bool = False,
        write_timestamps: bool = False,
        timestamp_format: Optional[str] = None,
    ):
        """
        Create a file-like object which manages other files.

        Parameters
        ----------
        num_files_to_keep: int, default None
            How many sub-files to keep at any given time.
            Defaults to the configured value (5).

        max_file_size: int, default None
            How large in bytes each sub-file can grow before another file is created.
            Note that this is not a hard limit but rather a threshold
            which may be slightly exceeded.
            Defaults to the configured value (100_000).

        redirect_streams: bool, default False
            If `True`, redirect previous file streams when opening a new file descriptor.

            NOTE: Only set this to `True` if you are entering into a daemon context.
            Doing so will redirect `sys.stdout` and `sys.stderr` into the log files.

        write_timestamps: bool, default False
            If `True`, prepend the current UTC timestamp to each line of the file.

        timestamp_format: str, default None
            If `write_timestamps` is `True`, use this format for the timestamps.
            Defaults to `'%Y-%m-%d %H:%M'`.
        """
        self.file_path = pathlib.Path(file_path)
        if num_files_to_keep is None:
            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
        if max_file_size is None:
            max_file_size = get_config('jobs', 'logs', 'max_file_size')
        if timestamp_format is None:
            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
        if num_files_to_keep < 2:
            raise ValueError("At least 2 files must be kept.")
        if max_file_size < 1:
            raise ValueError("Subfiles must contain at least one byte.")

        self.num_files_to_keep = num_files_to_keep
        self.max_file_size = max_file_size
        self.redirect_streams = redirect_streams
        self.write_timestamps = write_timestamps
        self.timestamp_format = timestamp_format
        self.subfile_regex_pattern = re.compile(r'(.*)\.log(?:\.\d+)?$')

        ### When subfiles are opened, map from their index to the file objects.
        self.subfile_objects = {}
        self._redirected_subfile_objects = {}
        self._current_file_obj = None
        self._previous_file_obj = None

        ### When reading, keep track of the file index and position.
        self._cursor: Tuple[int, int] = (0, 0)

        ### Don't forget to close any stray files.
        atexit.register(self.close)


    def fileno(self):
        """
        Return the file descriptor for the latest subfile.
        """
        self.refresh_files(start_interception=False)
        return self._current_file_obj.fileno()


    def get_latest_subfile_path(self) -> pathlib.Path:
        """
        Return the path of the latest subfile to write into.
        """
        return self.get_subfile_path_from_index(
            self.get_latest_subfile_index()
        )


    def get_remaining_subfile_size(self, subfile_index: int) -> int:
        """
        Return the remaining buffer size for a subfile.

        Parameters
        ----------
        subfile_index: int
            The index of the subfile to be checked.

        Returns
        -------
        The remaining size in bytes.
        """
        subfile_path = self.get_subfile_path_from_index(subfile_index)
        if not subfile_path.exists():
            return self.max_file_size

        return self.max_file_size - os.path.getsize(subfile_path)


    def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool:
        """
        Return whether a given subfile is too large.

        Parameters
        ----------
        subfile_index: int
            The index of the subfile to be checked.

        potential_new_len: int, default 0
            The length of a potential write of new data.

        Returns
        -------
        A bool indicating the subfile is or will be too large.
        """
        subfile_path = self.get_subfile_path_from_index(subfile_index)
        if not subfile_path.exists():
            return False

        self.flush()

        return (
            (os.path.getsize(subfile_path) + potential_new_len)
            >=
            self.max_file_size
        )


    def get_latest_subfile_index(self) -> int:
        """
        Return the latest existing subfile index.
        If no subfiles exist, return 0;
        if the latest file name cannot be parsed, return -1.
        """
        existing_subfile_paths = self.get_existing_subfile_paths()
        latest_index = (
            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
            if existing_subfile_paths
            else 0
        )
        return latest_index


    def get_index_from_subfile_name(self, subfile_name: str) -> int:
        """
        Return the index from a given subfile name.
        If the file name cannot be parsed, return -1.
        """
        try:
            return int(subfile_name.replace(self.file_path.name + '.', ''))
        except Exception:
            return -1


    def get_subfile_name_from_index(self, subfile_index: int) -> str:
        """
        Return the subfile name from the given index.
        """
        return f'{self.file_path.name}.{subfile_index}'


    def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path:
        """
        Return the subfile's path from its index.
        """
        return self.file_path.parent / self.get_subfile_name_from_index(subfile_index)


    def get_existing_subfile_indices(self) -> List[int]:
        """
        Return a list of subfile indices which exist on disk.
        """
        existing_subfile_paths = self.get_existing_subfile_paths()
        return [self.get_index_from_subfile_name(path.name) for path in existing_subfile_paths]


    def get_existing_subfile_paths(self) -> List[pathlib.Path]:
        """
        Return a list of file paths that match the input filename pattern.
        """
        if not self.file_path.parent.exists():
            return []

        subfile_names_indices = sorted(
            [
                (file_name, self.get_index_from_subfile_name(file_name))
                for file_name in os.listdir(self.file_path.parent)
                if (
                    file_name.startswith(self.file_path.name)
                    and re.match(self.subfile_regex_pattern, file_name)
                )
            ],
            key=lambda x: x[1],
        )
        return [
            (self.file_path.parent / file_name)
            for file_name, _ in subfile_names_indices
        ]


    def refresh_files(
        self,
        potential_new_len: int = 0,
        start_interception: bool = False,
    ) -> '_io.TextIOWrapper':
        """
        Check the state of the subfiles.
        If the latest subfile is too large, create a new file and delete old ones.

        Parameters
        ----------
        potential_new_len: int, default 0
            The length of a potential write of new data.

        start_interception: bool, default False
            If `True`, kick off the file interception threads.
        """
        self.flush()

        latest_subfile_index = self.get_latest_subfile_index()
        latest_subfile_path = self.get_subfile_path_from_index(latest_subfile_index)

        ### First run with existing log files: open the most recent log file.
        is_first_run_with_logs = ((latest_subfile_index > -1) and self._current_file_obj is None)

        ### Sometimes a new file is created but output doesn't switch over.
        lost_latest_handle = (
            self._current_file_obj is not None
            and
            self.get_index_from_subfile_name(self._current_file_obj.name) == -1
        )
        if is_first_run_with_logs or lost_latest_handle:
            self._current_file_obj = open(latest_subfile_path, 'a+', encoding='utf-8')
            if self.redirect_streams:
                try:
                    daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
                    daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
                except OSError:
                    warn(
                        f"Encountered an issue when redirecting streams:\n{traceback.format_exc()}"
                    )
                if start_interception and self.write_timestamps:
                    self.start_log_fd_interception()

        create_new_file = (
            (latest_subfile_index == -1)
            or
            self._current_file_obj is None
            or
            self.is_subfile_too_large(latest_subfile_index, potential_new_len)
        )
        if create_new_file:
            old_subfile_index = latest_subfile_index
            new_subfile_index = old_subfile_index + 1
            new_file_path = self.get_subfile_path_from_index(new_subfile_index)
            self._previous_file_obj = self._current_file_obj
            self._current_file_obj = open(new_file_path, 'a+', encoding='utf-8')
            self.subfile_objects[new_subfile_index] = self._current_file_obj
            self.flush()

            if self._previous_file_obj is not None:
                if self.redirect_streams:
                    self._redirected_subfile_objects[old_subfile_index] = self._previous_file_obj
                    daemon.daemon.redirect_stream(self._previous_file_obj, self._current_file_obj)
                    daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
                    daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
                self.close(unused_only=True)

            ### Sanity check in case writing somehow fails.
            if self._previous_file_obj is self._current_file_obj:
                self._previous_file_obj = None

            self.delete(unused_only=True)

        return self._current_file_obj


    def close(self, unused_only: bool = False) -> None:
        """
        Close any open file descriptors.

        Parameters
        ----------
        unused_only: bool, default False
            If `True`, only close file descriptors not currently in use.
        """
        self.stop_log_fd_interception(unused_only=unused_only)
        subfile_indices = sorted(self.subfile_objects.keys())
        for subfile_index in subfile_indices:
            subfile_object = self.subfile_objects[subfile_index]
            if unused_only and subfile_object in (self._previous_file_obj, self._current_file_obj):
                continue
            try:
                if not subfile_object.closed:
                    subfile_object.close()
            except Exception:
                warn(f"Failed to close an open subfile:\n{traceback.format_exc()}")

            _ = self.subfile_objects.pop(subfile_index, None)
            if self.redirect_streams:
                _ = self._redirected_subfile_objects.pop(subfile_index, None)

        if not unused_only:
            self._previous_file_obj = None
            self._current_file_obj = None


    def get_timestamp_prefix_str(self) -> str:
        """
        Return the current timestamp prefix string.
        """
        return datetime.now(timezone.utc).strftime(self.timestamp_format) + ' | '


    def write(self, data: str) -> None:
        """
        Write the given text into the latest subfile.
        If the subfile will be too large, create a new subfile.
        If too many subfiles exist at once, the oldest one will be deleted.

        NOTE: This will not split data across multiple files.
        As such, if data is larger than max_file_size, then the corresponding subfile
        may exceed this limit.
        """
        try:
            self.file_path.parent.mkdir(exist_ok=True, parents=True)
            if isinstance(data, bytes):
                data = data.decode('utf-8')

            prefix_str = self.get_timestamp_prefix_str() if self.write_timestamps else ""
            suffix_str = "\n" if self.write_timestamps else ""
            self.refresh_files(
                potential_new_len = len(prefix_str + data + suffix_str),
                start_interception = self.write_timestamps,
            )
            try:
                if prefix_str:
                    self._current_file_obj.write(prefix_str)
                self._current_file_obj.write(data)
                if suffix_str:
                    self._current_file_obj.write(suffix_str)
            except BrokenPipeError:
                warn("BrokenPipeError encountered. The daemon may have been terminated.")
                return
            except Exception:
                warn(f"Failed to write to subfile:\n{traceback.format_exc()}")
            self.flush()
            self.delete(unused_only=True)
        except Exception as e:
            warn(f"Unexpected error in RotatingFile.write: {e}")


    def delete(self, unused_only: bool = False) -> None:
        """
        Delete old subfiles.

        Parameters
        ----------
        unused_only: bool, default False
            If `True`, only delete subfiles which are no longer needed.
        """
        existing_subfile_paths = self.get_existing_subfile_paths()
        if unused_only and len(existing_subfile_paths) <= self.num_files_to_keep:
            return

        self.flush()
        self.close(unused_only=unused_only)

        end_ix = (
            (-1 * self.num_files_to_keep)
            if unused_only
            else len(existing_subfile_paths)
        )
        for subfile_path_to_delete in existing_subfile_paths[0:end_ix]:
            try:
                subfile_path_to_delete.unlink()
            except Exception:
                warn(
                    f"Unable to delete subfile '{subfile_path_to_delete}':\n"
                    + f"{traceback.format_exc()}"
                )


    def read(self, *args, **kwargs) -> str:
        """
        Read the contents of the existing subfiles.
        """
        existing_subfile_indices = [
            self.get_index_from_subfile_name(subfile_path.name)
            for subfile_path in self.get_existing_subfile_paths()
        ]
        paths_to_read = [
            self.get_subfile_path_from_index(subfile_index)
            for subfile_index in existing_subfile_indices
            if subfile_index >= self._cursor[0]
        ]
        buffer = ''
        refresh_cursor = True
        for subfile_path in paths_to_read:
            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
            seek_ix = (
                self._cursor[1]
                if subfile_index == self._cursor[0]
                else 0
            )

            if (
                subfile_index in self.subfile_objects
                and
                subfile_index not in self._redirected_subfile_objects
            ):
                subfile_object = self.subfile_objects[subfile_index]
                for i in range(self.SEEK_BACK_ATTEMPTS):
                    try:
                        subfile_object.seek(max(seek_ix - i, 0))
                        buffer += subfile_object.read()
                    except UnicodeDecodeError:
                        continue
                    break
            else:
                with open(subfile_path, 'r', encoding='utf-8') as f:
                    for i in range(self.SEEK_BACK_ATTEMPTS):
                        try:
                            f.seek(max(seek_ix - i, 0))
                            buffer += f.read()
                        except UnicodeDecodeError:
                            continue
                        break

                    ### Handle the case when no files have yet been opened.
                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
                        self._cursor = (subfile_index, f.tell())
                        refresh_cursor = False

        if refresh_cursor:
            self.refresh_cursor()
        return buffer


    def refresh_cursor(self) -> None:
        """
        Update the cursor to the latest subfile index and file.tell() value.
        """
        self.flush()
        existing_subfile_paths = self.get_existing_subfile_paths()
        current_ix = (
            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
            if existing_subfile_paths
            else 0
        )
        position = self._current_file_obj.tell() if self._current_file_obj is not None else 0
        self._cursor = (current_ix, position)


    def readlines(self) -> List[str]:
        """
        Return a list of lines of text.
        """
        existing_subfile_indices = [
            self.get_index_from_subfile_name(subfile_path.name)
            for subfile_path in self.get_existing_subfile_paths()
        ]
        paths_to_read = [
            self.get_subfile_path_from_index(subfile_index)
            for subfile_index in existing_subfile_indices
            if subfile_index >= self._cursor[0]
        ]

        lines = []
        refresh_cursor = True
        for subfile_path in paths_to_read:
            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
            seek_ix = (
                self._cursor[1]
                if subfile_index == self._cursor[0]
                else 0
            )

            subfile_lines = []
            if (
                subfile_index in self.subfile_objects
                and
                subfile_index not in self._redirected_subfile_objects
            ):
                subfile_object = self.subfile_objects[subfile_index]
                for i in range(self.SEEK_BACK_ATTEMPTS):
                    try:
                        subfile_object.seek(max((seek_ix - i), 0))
                        subfile_lines = subfile_object.readlines()
                    except UnicodeDecodeError:
                        continue
                    break
            else:
                with open(subfile_path, 'r', encoding='utf-8') as f:
                    for i in range(self.SEEK_BACK_ATTEMPTS):
                        try:
                            f.seek(max(seek_ix - i, 0))
                            subfile_lines = f.readlines()
                        except UnicodeDecodeError:
                            continue
                        break

                    ### Handle the case when no files have yet been opened.
                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
                        self._cursor = (subfile_index, f.tell())
                        refresh_cursor = False

            ### Sometimes a line may span multiple files.
            if lines and subfile_lines and not lines[-1].endswith('\n'):
                lines[-1] += subfile_lines[0]
                new_lines = subfile_lines[1:]
            else:
                new_lines = subfile_lines
            lines.extend(new_lines)

        if refresh_cursor:
            self.refresh_cursor()
        return lines


    def seekable(self) -> bool:
        return True


    def seek(self, position: int) -> None:
        """
        Seek to the given position in the logs stream.
        A position of 0 moves the cursor to the beginning of the earliest subfile;
        any other position is applied to the latest subfile.
        """
        existing_subfile_indices = self.get_existing_subfile_indices()
        min_ix = existing_subfile_indices[0] if existing_subfile_indices else 0
        max_ix = existing_subfile_indices[-1] if existing_subfile_indices else 0
        if position == 0:
            self._cursor = (min_ix, 0)
            return

        self._cursor = (max_ix, position)
        if self._current_file_obj is not None:
            self._current_file_obj.seek(position)


    def flush(self) -> None:
        """
        Flush any open subfiles.
        """
        for subfile_index, subfile_object in self.subfile_objects.items():
            if not subfile_object.closed:
                try:
                    subfile_object.flush()
                except Exception:
                    warn(f"Failed to flush subfile {subfile_index}:\n{traceback.format_exc()}")
        if self.redirect_streams:
            try:
                sys.stdout.flush()
            except BrokenPipeError:
                pass
            except Exception:
                warn(f"Failed to flush STDOUT:\n{traceback.format_exc()}")
            try:
                sys.stderr.flush()
            except BrokenPipeError:
                pass
            except Exception:
                warn(f"Failed to flush STDERR:\n{traceback.format_exc()}")


    def start_log_fd_interception(self):
        """
        Start the file descriptor monitoring threads.
        """
        if not self.write_timestamps:
            return

        self._stdout_interceptor = FileDescriptorInterceptor(
            sys.stdout.fileno(),
            self.get_timestamp_prefix_str,
        )
        self._stderr_interceptor = FileDescriptorInterceptor(
            sys.stderr.fileno(),
            self.get_timestamp_prefix_str,
        )

        self._stdout_interceptor_thread = Thread(
            target = self._stdout_interceptor.start_interception,
            daemon = True,
        )
        self._stderr_interceptor_thread = Thread(
            target = self._stderr_interceptor.start_interception,
            daemon = True,
        )
        self._stdout_interceptor_thread.start()
        self._stderr_interceptor_thread.start()
        self._intercepting = True

        if '_interceptor_threads' not in self.__dict__:
            self._interceptor_threads = []
        if '_interceptors' not in self.__dict__:
            self._interceptors = []
        self._interceptor_threads.extend([
            self._stdout_interceptor_thread,
            self._stderr_interceptor_thread,
        ])
        self._interceptors.extend([
            self._stdout_interceptor,
            self._stderr_interceptor,
        ])
        self.stop_log_fd_interception(unused_only=True)


    def stop_log_fd_interception(self, unused_only: bool = False):
        """
        Stop the file descriptor monitoring threads.
        """
        if not self.write_timestamps:
            return

        interceptors = self.__dict__.get('_interceptors', [])
        interceptor_threads = self.__dict__.get('_interceptor_threads', [])

        end_ix = len(interceptors) if not unused_only else -2

        for interceptor in interceptors[:end_ix]:
            interceptor.stop_interception()
        del interceptors[:end_ix]

        for thread in interceptor_threads[:end_ix]:
            try:
                thread.join()
            except Exception:
                warn(f"Failed to join interceptor threads:\n{traceback.format_exc()}")
        del interceptor_threads[:end_ix]


    def __repr__(self) -> str:
        """
        Return basic info for this `RotatingFile`.
        """
        return (
            "RotatingFile("
            + f"'{self.file_path.as_posix()}', "
            + f"num_files_to_keep={self.num_files_to_keep}, "
            + f"max_file_size={self.max_file_size})"
        )

class RotatingFile(io.IOBase):

A RotatingFile may be treated like a normal file-like object. Under the hood, however, it will create new sub-files and delete old ones.
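
A hedged sketch of that behavior (the path and sizes are illustrative): the object accepts ordinary `write()` calls, including from `print()`, and silently rotates and prunes subfiles as it grows.

import pathlib
from meerschaum.utils.daemon.RotatingFile import RotatingFile

rf = RotatingFile(pathlib.Path('/tmp/rotate.log'), num_files_to_keep=2, max_file_size=100)
print("Treat it like any writable stream.", file=rf)  # print() calls rf.write()
for i in range(50):
    rf.write(f"line {i}\n")

# Only the two newest subfiles remain on disk, e.g. ['rotate.log.3', 'rotate.log.4'].
print([p.name for p in rf.get_existing_subfile_paths()])
rf.close()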

RotatingFile(file_path: pathlib.Path, num_files_to_keep: Optional[int] = None, max_file_size: Optional[int] = None, redirect_streams: bool = False, write_timestamps: bool = False, timestamp_format: Optional[str] = None)

Create a file-like object which manages other files.

Parameters
  • num_files_to_keep (int, default None): How many sub-files to keep at any given time. Defaults to the configured value (5).
  • max_file_size (int, default None): How large in bytes each sub-file can grow before another file is created. Note that this is not a hard limit but rather a threshold which may be slightly exceeded. Defaults to the configured value (100_000).
  • redirect_streams (bool, default False): If True, redirect previous file streams when opening a new file descriptor.

    NOTE: Only set this to True if you are entering into a daemon context. Doing so will redirect sys.stdout and sys.stderr into the log files.

  • write_timestamps (bool, default False): If True, prepend the current UTC timestamp to each line of the file.
  • timestamp_format (str, default None): If write_timestamps is True, use this format for the timestamps. Defaults to '%Y-%m-%d %H:%M'.
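
For instance, timestamped logging might be configured as in the sketch below (the path is illustrative, and the format string overrides the configured default):

import pathlib
from meerschaum.utils.daemon.RotatingFile import RotatingFile

rf = RotatingFile(
    pathlib.Path('/tmp/timestamped.log'),
    write_timestamps=True,
    timestamp_format='%Y-%m-%d %H:%M:%S',
)
rf.write("started job")  # stored as e.g. '2024-01-01 12:00:00 | started job'
rf.close()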
SEEK_BACK_ATTEMPTS: int = 5
file_path
num_files_to_keep
max_file_size
redirect_streams
write_timestamps
timestamp_format
subfile_regex_pattern
subfile_objects
def fileno(self):

Return the file descriptor for the latest subfile.

def get_latest_subfile_path(self) -> pathlib.Path:
111    def get_latest_subfile_path(self) -> pathlib.Path:
112        """
113        Return the path of the latest subfile to write into.
114        """
115        return self.get_subfile_path_from_index(
116            self.get_latest_subfile_index()
117        )

Return the path of the latest subfile to write into.

def get_remaining_subfile_size(self, subfile_index: int) -> int:
120    def get_remaining_subfile_size(self, subfile_index: int) -> int:
121        """
122        Return the remaining buffer size for a subfile.
123
124        Parameters
125        ----------
126        subfile_index: int
127            The index of the subfile to be checked.
128
129        Returns
130        -------
131        The remaining size in bytes.
132        """
133        subfile_path = self.get_subfile_path_from_index(subfile_index)
134        if not subfile_path.exists():
135            return self.max_file_size
136
137        return self.max_file_size - os.path.getsize(subfile_path)

Return the remaining buffer size for a subfile.

Parameters
  • subfile_index (int): The index of the subfile to be checked.
Returns
  • The remaining size in bytes.
def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool:
140    def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool:
141        """
142        Return whether a given subfile is too large.
143
144        Parameters
145        ----------
146        subfile_index: int
147            The index of the subfile to be checked.
148
149        potential_new_len: int, default 0
150            The length of a potential write of new data.
151
152        Returns
153        -------
154        A bool indicating whether the subfile is or will be too large.
155        """
156        subfile_path = self.get_subfile_path_from_index(subfile_index)
157        if not subfile_path.exists():
158            return False
159
160        self.flush()
161
162        return (
163            (os.path.getsize(subfile_path) + potential_new_len)
164            >=
165            self.max_file_size
166        )

Return whether a given subfile is too large.

Parameters
  • subfile_index (int): The index of the subfile to be checked.
  • potential_new_len (int, default 0): The length of a potential write of new data.
Returns
  • A bool indicating whether the subfile is or will be too large.
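
A worked sketch of the threshold arithmetic, assuming subfile index 0 currently holds 95_000 bytes on disk and the default max_file_size of 100_000:

    # Assumed on-disk size of 'my_job.log.0': 95_000 bytes.
    log.get_remaining_subfile_size(0)    # 100_000 - 95_000 == 5_000
    log.is_subfile_too_large(0, 10_000)  # (95_000 + 10_000) >= 100_000 -> True
    log.is_subfile_too_large(0, 1_000)   # (95_000 + 1_000)  >= 100_000 -> False
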
def get_latest_subfile_index(self) -> int:
169    def get_latest_subfile_index(self) -> int:
170        """
171        Return the latest existing subfile index.
172        If no subfiles exist, return 0; if the latest file name cannot be parsed, return -1.
173        """
174        existing_subfile_paths = self.get_existing_subfile_paths()
175        latest_index = (
176            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
177            if existing_subfile_paths
178            else 0
179        )
180        return latest_index

Return the latest existing subfile index. If no subfiles exist, return 0; if the latest file name cannot be parsed, return -1.

def get_index_from_subfile_name(self, subfile_name: str) -> int:
183    def get_index_from_subfile_name(self, subfile_name: str) -> int:
184        """
185        Return the index from a given subfile name.
186        If the file name cannot be parsed, return -1.
187        """
188        try:
189            return int(subfile_name.replace(self.file_path.name + '.', ''))
190        except Exception as e:
191            return -1

Return the index from a given subfile name. If the file name cannot be parsed, return -1.

def get_subfile_name_from_index(self, subfile_index: int) -> str:
194    def get_subfile_name_from_index(self, subfile_index: int) -> str:
195        """
196        Return the subfile name from the given index.
197        """
198        return f'{self.file_path.name}.{subfile_index}'

Return the subfile name from the given index.

def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path:
201    def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path:
202        """
203        Return the subfile's path from its index.
204        """
205        return self.file_path.parent / self.get_subfile_name_from_index(subfile_index)

Return the subfile's path from its index.
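
The naming helpers simply append or strip a numeric suffix. A quick sketch, assuming a RotatingFile built around the hypothetical path /tmp/my_job.log:

    log.get_subfile_name_from_index(2)                # 'my_job.log.2'
    log.get_subfile_path_from_index(2)                # PosixPath('/tmp/my_job.log.2')
    log.get_index_from_subfile_name('my_job.log.2')   # 2
    log.get_index_from_subfile_name('unrelated.txt')  # -1 (cannot be parsed)
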

def get_existing_subfile_indices(self) -> List[int]:
208    def get_existing_subfile_indices(self) -> List[int]:
209        """
210        Return a list of subfile indices which exist on disk.
211        """
212        existing_subfile_paths = self.get_existing_subfile_paths()
213        return [self.get_index_from_subfile_name(path.name) for path in existing_subfile_paths]

Return a list of subfile indices which exist on disk.

def get_existing_subfile_paths(self) -> List[pathlib.Path]:
216    def get_existing_subfile_paths(self) -> List[pathlib.Path]:
217        """
218        Return a list of file paths that match the input filename pattern.
219        """
220        if not self.file_path.parent.exists():
221            return []
222
223        subfile_names_indices = sorted(
224            [
225                (file_name, self.get_index_from_subfile_name(file_name))
226                for file_name in os.listdir(self.file_path.parent)
227                if (
228                    file_name.startswith(self.file_path.name)
229                    and re.match(self.subfile_regex_pattern, file_name)
230                )
231            ],
232            key=lambda x: x[1],
233        )
234        return [
235            (self.file_path.parent / file_name)
236            for file_name, _ in subfile_names_indices
237        ]

Return a list of file paths that match the input filename pattern.
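
For illustration, the subfile pattern accepts both the base name and numbered suffixes:

    import re

    pattern = re.compile(r'(.*)\.log(?:\.\d+)?$')
    bool(pattern.match('my_job.log'))    # True  (the base file name)
    bool(pattern.match('my_job.log.3'))  # True  (a numbered subfile)
    bool(pattern.match('my_job.txt'))    # False (filtered out)
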

def refresh_files(self, potential_new_len: int = 0, start_interception: bool = False) -> '_io.TextIOWrapper':
240    def refresh_files(
241        self,
242        potential_new_len: int = 0,
243        start_interception: bool = False,
244    ) -> '_io.TextIOWrapper':
245        """
246        Check the state of the subfiles.
247        If the latest subfile is too large, create a new file and delete old ones.
248
249        Parameters
250        ----------
251        potential_new_len: int, default 0
252            The length of a potential write of new data.
253        start_interception: bool, default False
254            If `True`, kick off the file interception threads.
255        """
256        self.flush()
257
258        latest_subfile_index = self.get_latest_subfile_index()
259        latest_subfile_path = self.get_subfile_path_from_index(latest_subfile_index)
260
261        ### First run with existing log files: open the most recent log file.
262        is_first_run_with_logs = ((latest_subfile_index > -1) and self._current_file_obj is None)
263
264        ### Sometimes a new file is created but output doesn't switch over.
265        lost_latest_handle = (
266            self._current_file_obj is not None
267            and
268            self.get_index_from_subfile_name(self._current_file_obj.name) == -1
269        )
270        if is_first_run_with_logs or lost_latest_handle:
271            self._current_file_obj = open(latest_subfile_path, 'a+', encoding='utf-8')
272            if self.redirect_streams:
273                try:
274                    daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
275                    daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
276                except OSError as e:
277                    warn(
278                        f"Encountered an issue when redirecting streams:\n{traceback.format_exc()}"
279                    )
280                if start_interception and self.write_timestamps:
281                    self.start_log_fd_interception()
282
283        create_new_file = (
284            (latest_subfile_index == -1)
285            or
286            self._current_file_obj is None
287            or
288            self.is_subfile_too_large(latest_subfile_index, potential_new_len)
289        )
290        if create_new_file:
291            old_subfile_index = latest_subfile_index
292            new_subfile_index = old_subfile_index + 1
293            new_file_path = self.get_subfile_path_from_index(new_subfile_index)
294            self._previous_file_obj = self._current_file_obj
295            self._current_file_obj = open(new_file_path, 'a+', encoding='utf-8')
296            self.subfile_objects[new_subfile_index] = self._current_file_obj
297            self.flush()
298
299            if self._previous_file_obj is not None:
300                if self.redirect_streams:
301                    self._redirected_subfile_objects[old_subfile_index] = self._previous_file_obj
302                    daemon.daemon.redirect_stream(self._previous_file_obj, self._current_file_obj)
303                    daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
304                    daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
305                self.close(unused_only=True)
306
307            ### Sanity check in case writing somehow fails.
308            if self._previous_file_obj is self._current_file_obj:
309                self._previous_file_obj = None
310
311            self.delete(unused_only=True)
312
313        return self._current_file_obj

Check the state of the subfiles. If the latest subfile is too large, create a new file and delete old ones.

Parameters
  • potential_new_len (int, default 0): The length of a potential write of new data.

  • start_interception (bool, default False): If True, kick off the file interception threads.
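
A rotation sketch (hypothetical path; max_file_size is deliberately tiny so a handful of writes forces several rollovers):

    log = RotatingFile(
        pathlib.Path('/tmp/rotate_demo.log'),
        num_files_to_keep=2,
        max_file_size=64,
    )
    for i in range(10):
        log.write(f'line {i}: padding to fill the subfile quickly\n')
    [path.name for path in log.get_existing_subfile_paths()]
    # e.g. ['rotate_demo.log.8', 'rotate_demo.log.9'] (exact indices vary)
    log.close()
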

def close(self, unused_only: bool = False) -> None:
316    def close(self, unused_only: bool = False) -> None:
317        """
318        Close any open file descriptors.
319
320        Parameters
321        ----------
322        unused_only: bool, default False
323            If `True`, only close file descriptors not currently in use.
324        """
325        self.stop_log_fd_interception(unused_only=unused_only)
326        subfile_indices = sorted(self.subfile_objects.keys())
327        for subfile_index in subfile_indices:
328            subfile_object = self.subfile_objects[subfile_index]
329            if unused_only and subfile_object in (self._previous_file_obj, self._current_file_obj):
330                continue
331            try:
332                if not subfile_object.closed:
333                    subfile_object.close()
334            except Exception as e:
335                warn(f"Failed to close an open subfile:\n{traceback.format_exc()}")
336
337            _ = self.subfile_objects.pop(subfile_index, None)
338            if self.redirect_streams:
339                _ = self._redirected_subfile_objects.pop(subfile_index, None)
340
341        if not unused_only:
342            self._previous_file_obj = None
343            self._current_file_obj = None

Close any open file descriptors.

Parameters
  • unused_only (bool, default False): If True, only close file descriptors not currently in use.
def get_timestamp_prefix_str(self) -> str:
346    def get_timestamp_prefix_str(self) -> str:
347        """
348        Return the current UTC timestamp prefix string.
349        """
350        return datetime.now(timezone.utc).strftime(self.timestamp_format) + ' | '

Return the current UTC timestamp prefix string.
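
Equivalent to the following, shown with the default format (actual output depends on the clock):

    from datetime import datetime, timezone

    datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M') + ' | '
    # e.g. '2024-05-01 13:37 | '
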

def write(self, data: str) -> None:
353    def write(self, data: str) -> None:
354        """
355        Write the given text into the latest subfile.
356        If the subfile will be too large, create a new subfile.
357        If too many subfiles exist at once, the oldest one will be deleted.
358
359        NOTE: This will not split data across multiple files.
360        As such, if data is larger than max_file_size, then the corresponding subfile
361        may exceed this limit.
362        """
363        try:
364            self.file_path.parent.mkdir(exist_ok=True, parents=True)
365            if isinstance(data, bytes):
366                data = data.decode('utf-8')
367
368            prefix_str = self.get_timestamp_prefix_str() if self.write_timestamps else ""
369            suffix_str = "\n" if self.write_timestamps else ""
370            self.refresh_files(
371                potential_new_len = len(prefix_str + data + suffix_str),
372                start_interception = self.write_timestamps,
373            )
374            try:
375                if prefix_str:
376                    self._current_file_obj.write(prefix_str)
377                self._current_file_obj.write(data)
378                if suffix_str:
379                    self._current_file_obj.write(suffix_str)
380            except BrokenPipeError:
381                warn("BrokenPipeError encountered. The daemon may have been terminated.")
382                return
383            except Exception as e:
384                warn(f"Failed to write to subfile:\n{traceback.format_exc()}")
385            self.flush()
386            self.delete(unused_only=True)
387        except Exception as e:
388            warn(f"Unexpected error in RotatingFile.write: {e}")

Write the given text into the latest subfile. If the subfile will be too large, create a new subfile. If too many subfiles exist at once, the oldest one will be deleted.

NOTE: This will not split data across multiple files. As such, if data is larger than max_file_size, then the corresponding subfile may exceed this limit.
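
A short sketch of a timestamped write (hypothetical path):

    log = RotatingFile(pathlib.Path('/tmp/ts_demo.log'), write_timestamps=True)
    log.write('job started')
    log.seek(0)
    print(log.read())
    # e.g.:
    # 2024-05-01 13:37 | job started
    log.close()
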

def delete(self, unused_only: bool = False) -> None:
391    def delete(self, unused_only: bool = False) -> None:
392        """
393        Delete old subfiles.
394
395        Parameters
396        ----------
397        unused_only: bool, default False
398            If `True`, only delete subfiles which are no longer needed.
399        """
400        existing_subfile_paths = self.get_existing_subfile_paths()
401        if unused_only and len(existing_subfile_paths) <= self.num_files_to_keep:
402            return
403
404        self.flush()
405        self.close(unused_only=unused_only)
406
407        end_ix = (
408            (-1 * self.num_files_to_keep)
409            if unused_only
410            else len(existing_subfile_paths)
411        )
412        for subfile_path_to_delete in existing_subfile_paths[0:end_ix]:
413            subfile_index = self.get_index_from_subfile_name(subfile_path_to_delete.name)
414            subfile_object = self.subfile_objects.get(subfile_index, None)
415
416            try:
417                subfile_path_to_delete.unlink()
418            except Exception as e:
419                warn(
420                    f"Unable to delete subfile '{subfile_path_to_delete}':\n"
421                    + f"{traceback.format_exc()}"
422                )

Delete old subfiles.

Parameters
  • unused_only (bool, default False): If True, only delete subfiles which are no longer needed.
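
A worked sketch of the retention slice: with five existing subfiles and num_files_to_keep=3, unused_only=True removes everything but the last three.

    existing = ['a.log.0', 'a.log.1', 'a.log.2', 'a.log.3', 'a.log.4']
    end_ix = -1 * 3       # -1 * num_files_to_keep
    existing[0:end_ix]    # ['a.log.0', 'a.log.1'] -> deleted
    existing[end_ix:]     # ['a.log.2', 'a.log.3', 'a.log.4'] -> kept
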
def read(self, *args, **kwargs) -> str:
425    def read(self, *args, **kwargs) -> str:
426        """
427        Read the contents of the existing subfiles.
428        """
429        existing_subfile_indices = [
430            self.get_index_from_subfile_name(subfile_path.name)
431            for subfile_path in self.get_existing_subfile_paths()
432        ]
433        paths_to_read = [
434            self.get_subfile_path_from_index(subfile_index)
435            for subfile_index in existing_subfile_indices
436            if subfile_index >= self._cursor[0]
437        ]
438        buffer = ''
439        refresh_cursor = True
440        for subfile_path in paths_to_read:
441            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
442            seek_ix = (
443                self._cursor[1]
444                if subfile_index == self._cursor[0]
445                else 0
446            )
447
448            if (
449                subfile_index in self.subfile_objects
450                and
451                subfile_index not in self._redirected_subfile_objects
452            ):
453                subfile_object = self.subfile_objects[subfile_index]
454                for i in range(self.SEEK_BACK_ATTEMPTS):
455                    try:
456                        subfile_object.seek(max(seek_ix - i, 0))
457                        buffer += subfile_object.read()
458                    except UnicodeDecodeError:
459                        continue
460                    break
461            else:
462                with open(subfile_path, 'r', encoding='utf-8') as f:
463                    for i in range(self.SEEK_BACK_ATTEMPTS):
464                        try:
465                            f.seek(max(seek_ix - i, 0))
466                            buffer += f.read()
467                        except UnicodeDecodeError:
468                            continue
469                        break
470
471                    ### Handle the case when no files have yet been opened.
472                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
473                        self._cursor = (subfile_index, f.tell())
474                        refresh_cursor = False
475
476        if refresh_cursor:
477            self.refresh_cursor()
478        return buffer

Read the contents of the existing subfiles.
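
The internal cursor advances as you read, so consecutive reads behave like tailing a stream; seek(0) rewinds to the oldest subfile:

    log.write('first entry\n')
    log.read()   # everything from the cursor onward, oldest subfile first
    log.read()   # '' -- nothing new since the last read
    log.seek(0)
    log.read()   # the full contents again
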

def refresh_cursor(self) -> None:
481    def refresh_cursor(self) -> None:
482        """
483        Update the cursor to the latest subfile index and file.tell() value.
484        """
485        self.flush()
486        existing_subfile_paths = self.get_existing_subfile_paths()
487        current_ix = (
488            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
489            if existing_subfile_paths
490            else 0
491        )
492        position = self._current_file_obj.tell() if self._current_file_obj is not None else 0
493        self._cursor = (current_ix, position)

Update the cursor to the latest subfile index and file.tell() value.

def readlines(self) -> List[str]:
496    def readlines(self) -> List[str]:
497        """
498        Return a list of lines of text.
499        """
500        existing_subfile_indices = [
501            self.get_index_from_subfile_name(subfile_path.name)
502            for subfile_path in self.get_existing_subfile_paths()
503        ]
504        paths_to_read = [
505            self.get_subfile_path_from_index(subfile_index)
506            for subfile_index in existing_subfile_indices
507            if subfile_index >= self._cursor[0]
508        ]
509
510        lines = []
511        refresh_cursor = True
512        for subfile_path in paths_to_read:
513            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
514            seek_ix = (
515                self._cursor[1]
516                if subfile_index == self._cursor[0]
517                else 0
518            )
519
520            subfile_lines = []
521            if (
522                subfile_index in self.subfile_objects
523                and
524                subfile_index not in self._redirected_subfile_objects
525            ):
526                subfile_object = self.subfile_objects[subfile_index]
527                for i in range(self.SEEK_BACK_ATTEMPTS):
528                    try:
529                        subfile_object.seek(max((seek_ix - i), 0))
530                        subfile_lines = subfile_object.readlines()
531                    except UnicodeDecodeError:
532                        continue
533                    break
534            else:
535                with open(subfile_path, 'r', encoding='utf-8') as f:
536                    for i in range(self.SEEK_BACK_ATTEMPTS):
537                        try:
538                            f.seek(max(seek_ix - i, 0))
539                            subfile_lines = f.readlines()
540                        except UnicodeDecodeError:
541                            continue
542                        break
543
544                    ### Handle the case when no files have yet been opened.
545                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
546                        self._cursor = (subfile_index, f.tell())
547                        refresh_cursor = False
548
549            ### Sometimes a line may span multiple files.
550            if lines and subfile_lines and not lines[-1].endswith('\n'):
551                lines[-1] += subfile_lines[0]
552                new_lines = subfile_lines[1:]
553            else:
554                new_lines = subfile_lines
555            lines.extend(new_lines)
556
557        if refresh_cursor:
558            self.refresh_cursor()
559        return lines

Return a list of lines of text.
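
A pure-Python sketch of the cross-file merge step: when a subfile ends mid-line, the first chunk read from the next subfile is glued onto it.

    lines = ['complete line\n', 'partial li']     # read from subfile N
    subfile_lines = ['ne continued\n', 'next\n']  # read from subfile N + 1
    if lines and subfile_lines and not lines[-1].endswith('\n'):
        lines[-1] += subfile_lines[0]
        new_lines = subfile_lines[1:]
    else:
        new_lines = subfile_lines
    lines.extend(new_lines)
    # lines == ['complete line\n', 'partial line continued\n', 'next\n']
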

def seekable(self) -> bool:
562    def seekable(self) -> bool:
563        return True

Return whether object supports random access.

If False, seek(), tell() and truncate() will raise OSError. This method may need to do a test seek().

def seek(self, position: int) -> None:
566    def seek(self, position: int) -> None:
567        """
568        Seek to a position in the logs stream (0 rewinds to the oldest subfile).
569        """
570        existing_subfile_indices = self.get_existing_subfile_indices()
571        min_ix = existing_subfile_indices[0] if existing_subfile_indices else 0
572        max_ix = existing_subfile_indices[-1] if existing_subfile_indices else 0
573        if position == 0:
574            self._cursor = (min_ix, 0)
575            return
576
577        self._cursor = (max_ix, position)
578        if self._current_file_obj is not None:
579            self._current_file_obj.seek(position)

Seek to a position in the logs stream (0 rewinds to the oldest subfile).

def flush(self) -> None:
582    def flush(self) -> None:
583        """
584        Flush any open subfiles.
585        """
586        for subfile_index, subfile_object in self.subfile_objects.items():
587            if not subfile_object.closed:
588                try:
589                    subfile_object.flush()
590                except Exception as e:
591                    warn(f"Failed to flush subfile {subfile_index}:\n{traceback.format_exc()}")
592        if self.redirect_streams:
593            try:
594                sys.stdout.flush()
595            except BrokenPipeError:
596                pass
597            except Exception as e:
598                warn(f"Failed to flush STDOUT:\n{traceback.format_exc()}")
599            try:
600                sys.stderr.flush()
601            except BrokenPipeError:
602                pass
603            except Exception as e:
604                warn(f"Failed to flush STDERR:\n{traceback.format_exc()}")

Flush any open subfiles.

def start_log_fd_interception(self):
607    def start_log_fd_interception(self):
608        """
609        Start the file descriptor monitoring threads.
610        """
611        if not self.write_timestamps:
612            return
613
614        self._stdout_interceptor = FileDescriptorInterceptor(
615            sys.stdout.fileno(),
616            self.get_timestamp_prefix_str,
617        )
618        self._stderr_interceptor = FileDescriptorInterceptor(
619            sys.stderr.fileno(),
620            self.get_timestamp_prefix_str,
621        )
622
623        self._stdout_interceptor_thread = Thread(
624            target = self._stdout_interceptor.start_interception,
625            daemon = True,
626        )
627        self._stderr_interceptor_thread = Thread(
628            target = self._stderr_interceptor.start_interception,
629            daemon = True,
630        )
631        self._stdout_interceptor_thread.start()
632        self._stderr_interceptor_thread.start()
633        self._intercepting = True
634
635        if '_interceptor_threads' not in self.__dict__:
636            self._interceptor_threads = []
637        if '_interceptors' not in self.__dict__:
638            self._interceptors = []
639        self._interceptor_threads.extend([
640            self._stdout_interceptor_thread,
641            self._stderr_interceptor_thread,
642        ])
643        self._interceptors.extend([
644            self._stdout_interceptor,
645            self._stderr_interceptor,
646        ])
647        self.stop_log_fd_interception(unused_only=True)

Start the file descriptor monitoring threads.

def stop_log_fd_interception(self, unused_only: bool = False):
650    def stop_log_fd_interception(self, unused_only: bool = False):
651        """
652        Stop the file descriptor monitoring threads.
653        """
654        if not self.write_timestamps:
655            return
656
657        interceptors = self.__dict__.get('_interceptors', [])
658        interceptor_threads = self.__dict__.get('_interceptor_threads', [])
659
660        end_ix = len(interceptors) if not unused_only else -2
661
662        for interceptor in interceptors[:end_ix]:
663            interceptor.stop_interception()
664        del interceptors[:end_ix]
665
666        for thread in interceptor_threads[:end_ix]:
667            try:
668                thread.join()
669            except Exception as e:
670                warn(f"Failed to join interceptor threads:\n{traceback.format_exc()}")
671        del interceptor_threads[:end_ix]

Stop the file descriptor monitoring threads.
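
A sketch of the unused_only slice: the newest stdout/stderr pair always occupies the last two slots, so slicing with end_ix = -2 touches only the older interceptors.

    interceptors = ['stdout_0', 'stderr_0', 'stdout_1', 'stderr_1']
    end_ix = -2
    interceptors[:end_ix]  # ['stdout_0', 'stderr_0'] -> stopped and removed
    interceptors[end_ix:]  # ['stdout_1', 'stderr_1'] -> left running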