meerschaum.utils.daemon.RotatingFile
Create a file-like object that manages sub-files under the hood.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Create a file-like object that manages sub-files under the hood.
"""

import os
import io
import re
import pathlib
import traceback
import sys
import atexit
from datetime import datetime, timezone
from typing import List, Optional, Tuple, Callable
from meerschaum.config import get_config
from meerschaum.utils.warnings import warn
from meerschaum.utils.daemon.FileDescriptorInterceptor import FileDescriptorInterceptor
from meerschaum.utils.threading import Thread
import meerschaum as mrsm
import threading
daemon = mrsm.attempt_import('daemon')


class RotatingFile(io.IOBase):
    """
    A `RotatingFile` may be treated like a normal file-like object.
    Under the hood, however, it will create new sub-files and delete old ones.
    """

    SEEK_BACK_ATTEMPTS: int = 5

    def __init__(
        self,
        file_path: pathlib.Path,
        num_files_to_keep: Optional[int] = None,
        max_file_size: Optional[int] = None,
        redirect_streams: bool = False,
        write_timestamps: bool = False,
        timestamp_format: Optional[str] = None,
        write_callback: Optional[Callable[[str], None]] = None,
    ):
        """
        Create a file-like object which manages other files.

        Parameters
        ----------
        num_files_to_keep: int, default None
            How many sub-files to keep at any given time.
            Defaults to the configured value (5).

        max_file_size: int, default None
            How large in bytes each sub-file can grow before another file is created.
            Note that this is not a hard limit but rather a threshold
            which may be slightly exceeded.
            Defaults to the configured value (100_000).

        redirect_streams: bool, default False
            If `True`, redirect previous file streams when opening a new file descriptor.

            NOTE: Only set this to `True` if you are entering into a daemon context.
            Doing so will redirect `sys.stdout` and `sys.stderr` into the log files.

        write_timestamps: bool, default False
            If `True`, prepend the current UTC timestamp to each line of the file.

        timestamp_format: str, default None
            If `write_timestamps` is `True`, use this format for the timestamps.
            Defaults to `'%Y-%m-%d %H:%M'`.

        write_callback: Optional[Callable[[str], None]], default None
            If provided, execute this callback with the data to be written.
        """
        self.file_path = pathlib.Path(file_path)
        if num_files_to_keep is None:
            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
        if max_file_size is None:
            max_file_size = get_config('jobs', 'logs', 'max_file_size')
        if timestamp_format is None:
            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
        if num_files_to_keep < 1:
            raise ValueError("At least 1 file must be kept.")
        if max_file_size < 100:
            raise ValueError("Subfiles must contain at least 100 bytes.")

        self.num_files_to_keep = num_files_to_keep
        self.max_file_size = max_file_size
        self.redirect_streams = redirect_streams
        self.write_timestamps = write_timestamps
        self.timestamp_format = timestamp_format
        self.write_callback = write_callback
        self.subfile_regex_pattern = re.compile(r'(.*)\.log(?:\.\d+)?')

        ### When subfiles are opened, map from their index to the file objects.
        self.subfile_objects = {}
        self._redirected_subfile_objects = {}
        self._current_file_obj = None
        self._previous_file_obj = None

        ### When reading, keep track of the file index and position.
        self._cursor: Tuple[int, int] = (0, 0)

        ### Don't forget to close any stray files.
        atexit.register(self.close)

    def fileno(self):
        """
        Return the file descriptor for the latest subfile.
        """
        self.refresh_files(start_interception=False)
        return self._current_file_obj.fileno()

    def get_latest_subfile_path(self) -> pathlib.Path:
        """
        Return the path for the latest subfile to which to write.
        """
        return self.get_subfile_path_from_index(
            self.get_latest_subfile_index()
        )

    def get_remaining_subfile_size(self, subfile_index: int) -> int:
        """
        Return the remaining buffer size for a subfile.

        Parameters
        ----------
        subfile_index: int
            The index of the subfile to be checked.

        Returns
        -------
        The remaining size in bytes.
        """
        subfile_path = self.get_subfile_path_from_index(subfile_index)
        if not subfile_path.exists():
            return self.max_file_size

        return self.max_file_size - os.path.getsize(subfile_path)

    def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool:
        """
        Return whether a given subfile is too large.

        Parameters
        ----------
        subfile_index: int
            The index of the subfile to be checked.

        potential_new_len: int, default 0
            The length of a potential write of new data.

        Returns
        -------
        A bool indicating the subfile is or will be too large.
        """
        subfile_path = self.get_subfile_path_from_index(subfile_index)
        if not subfile_path.exists():
            return False

        self.flush()

        return (
            (os.path.getsize(subfile_path) + potential_new_len)
            >=
            self.max_file_size
        )

    def get_latest_subfile_index(self) -> int:
        """
        Return the latest existing subfile index.
        If no index may be found, return -1.
        """
        existing_subfile_paths = self.get_existing_subfile_paths()
        latest_index = (
            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
            if existing_subfile_paths
            else 0
        )
        return latest_index

    def get_index_from_subfile_name(self, subfile_name: str) -> int:
        """
        Return the index from a given subfile name.
        If the file name cannot be parsed, return -1.
        """
        try:
            return int(subfile_name.replace(self.file_path.name + '.', ''))
        except Exception:
            return -1

    def get_subfile_name_from_index(self, subfile_index: int) -> str:
        """
        Return the subfile name from the given index.
        """
        return f'{self.file_path.name}.{subfile_index}'

    def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path:
        """
        Return the subfile's path from its index.
        """
        return self.file_path.parent / self.get_subfile_name_from_index(subfile_index)

    def get_existing_subfile_indices(self) -> List[int]:
        """
        Return a list of subfile indices which exist on disk.
        """
        existing_subfile_paths = self.get_existing_subfile_paths()
        return [self.get_index_from_subfile_name(path.name) for path in existing_subfile_paths]

    def get_existing_subfile_paths(self) -> List[pathlib.Path]:
        """
        Return a list of file paths that match the input filename pattern.
        """
        if not self.file_path.parent.exists():
            return []

        subfile_names_indices = sorted(
            [
                (file_name, self.get_index_from_subfile_name(file_name))
                for file_name in os.listdir(self.file_path.parent)
                if (
                    file_name.startswith(self.file_path.name)
                    and re.match(self.subfile_regex_pattern, file_name)
                )
            ],
            key=lambda x: x[1],
        )
        return [
            (self.file_path.parent / file_name)
            for file_name, _ in subfile_names_indices
        ]

    def refresh_files(
        self,
        potential_new_len: int = 0,
        start_interception: bool = False,
    ) -> '_io.TextIOWrapper':
        """
        Check the state of the subfiles.
        If the latest subfile is too large, create a new file and delete old ones.

        Parameters
        ----------
        potential_new_len: int, default 0
            The length of a potential write of new data.

        start_interception: bool, default False
            If `True`, kick off the file interception threads.
        """
        self.flush()

        latest_subfile_index = self.get_latest_subfile_index()
        latest_subfile_path = self.get_subfile_path_from_index(latest_subfile_index)

        ### First run with existing log files: open the most recent log file.
        is_first_run_with_logs = ((latest_subfile_index > -1) and self._current_file_obj is None)

        ### Sometimes a new file is created but output doesn't switch over.
        lost_latest_handle = (
            self._current_file_obj is not None
            and
            self.get_index_from_subfile_name(self._current_file_obj.name) == -1
        )
        if is_first_run_with_logs or lost_latest_handle:
            self._current_file_obj = open(latest_subfile_path, 'a+', encoding='utf-8')
            if self.redirect_streams:
                try:
                    daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
                    daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
                except OSError:
                    warn(
                        f"Encountered an issue when redirecting streams:\n{traceback.format_exc()}"
                    )
                if start_interception and self.write_timestamps:
                    self.start_log_fd_interception()

        create_new_file = (
            (latest_subfile_index == -1)
            or
            self._current_file_obj is None
            or
            self.is_subfile_too_large(latest_subfile_index, potential_new_len)
        )
        if create_new_file:
            self.increment_subfiles()

        return self._current_file_obj

    def increment_subfiles(self, increment_by: int = 1):
        """
        Create a new subfile and switch the file pointer over.
        """
        latest_subfile_index = self.get_latest_subfile_index()
        old_subfile_index = latest_subfile_index
        new_subfile_index = old_subfile_index + increment_by
        new_file_path = self.get_subfile_path_from_index(new_subfile_index)
        self._previous_file_obj = self._current_file_obj
        self._current_file_obj = open(new_file_path, 'a+', encoding='utf-8')
        self.subfile_objects[new_subfile_index] = self._current_file_obj
        self.flush()

        if self.redirect_streams:
            if self._previous_file_obj is not None:
                self._redirected_subfile_objects[old_subfile_index] = self._previous_file_obj
                daemon.daemon.redirect_stream(self._previous_file_obj, self._current_file_obj)
            daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
            daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
            self.close(unused_only=True)

        ### Sanity check in case writing somehow fails.
        if self._previous_file_obj is self._current_file_obj:
            self._previous_file_obj = None

        self.delete(unused_only=True)

    def close(self, unused_only: bool = False) -> None:
        """
        Close any open file descriptors.

        Parameters
        ----------
        unused_only: bool, default False
            If `True`, only close file descriptors not currently in use.
        """
        self.stop_log_fd_interception(unused_only=unused_only)
        subfile_indices = sorted(self.subfile_objects.keys())
        for subfile_index in subfile_indices:
            subfile_object = self.subfile_objects[subfile_index]
            if unused_only and subfile_object in (self._previous_file_obj, self._current_file_obj):
                continue
            try:
                if not subfile_object.closed:
                    subfile_object.close()
            except Exception:
                warn(f"Failed to close an open subfile:\n{traceback.format_exc()}")

            _ = self.subfile_objects.pop(subfile_index, None)
            if self.redirect_streams:
                _ = self._redirected_subfile_objects.pop(subfile_index, None)

        if not unused_only:
            self._previous_file_obj = None
            self._current_file_obj = None

    def get_timestamp_prefix_str(self) -> str:
        """
        Return the current minute prefix string.
        """
        return datetime.now(timezone.utc).strftime(self.timestamp_format) + ' | '

    def write(self, data: str) -> None:
        """
        Write the given text into the latest subfile.
        If the subfile will be too large, create a new subfile.
        If too many subfiles exist at once, the oldest one will be deleted.

        NOTE: This will not split data across multiple files.
        As such, if data is larger than max_file_size, then the corresponding subfile
        may exceed this limit.
        """
        try:
            if callable(self.write_callback):
                self.write_callback(data)
        except Exception:
            warn(f"Failed to execute write callback:\n{traceback.format_exc()}")

        try:
            self.file_path.parent.mkdir(exist_ok=True, parents=True)
            if isinstance(data, bytes):
                data = data.decode('utf-8')

            prefix_str = self.get_timestamp_prefix_str() if self.write_timestamps else ""
            suffix_str = "\n" if self.write_timestamps else ""
            self.refresh_files(
                potential_new_len = len(prefix_str + data + suffix_str),
                start_interception = self.write_timestamps,
            )
            try:
                if prefix_str:
                    self._current_file_obj.write(prefix_str)
                self._current_file_obj.write(data)
                if suffix_str:
                    self._current_file_obj.write(suffix_str)
            except BrokenPipeError:
                warn("BrokenPipeError encountered. The daemon may have been terminated.")
                return
            except Exception:
                warn(f"Failed to write to subfile:\n{traceback.format_exc()}")
            self.flush()
            self.delete(unused_only=True)
        except Exception as e:
            warn(f"Unexpected error in RotatingFile.write: {e}")

    def delete(self, unused_only: bool = False) -> None:
        """
        Delete old subfiles.

        Parameters
        ----------
        unused_only: bool, default False
            If `True`, only delete subfiles which are no longer needed.
        """
        existing_subfile_paths = self.get_existing_subfile_paths()
        if unused_only and len(existing_subfile_paths) <= self.num_files_to_keep:
            return

        self.flush()
        self.close(unused_only=unused_only)

        end_ix = (
            (-1 * self.num_files_to_keep)
            if unused_only
            else len(existing_subfile_paths)
        )
        for subfile_path_to_delete in existing_subfile_paths[0:end_ix]:
            subfile_index = self.get_index_from_subfile_name(subfile_path_to_delete.name)

            try:
                subfile_path_to_delete.unlink()
            except Exception:
                warn(
                    f"Unable to delete subfile '{subfile_path_to_delete}':\n"
                    + f"{traceback.format_exc()}"
                )

    def read(self, *args, **kwargs) -> str:
        """
        Read the contents of the existing subfiles.
        """
        existing_subfile_indices = [
            self.get_index_from_subfile_name(subfile_path.name)
            for subfile_path in self.get_existing_subfile_paths()
        ]
        paths_to_read = [
            self.get_subfile_path_from_index(subfile_index)
            for subfile_index in existing_subfile_indices
            if subfile_index >= self._cursor[0]
        ]
        buffer = ''
        refresh_cursor = True
        for subfile_path in paths_to_read:
            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
            seek_ix = (
                self._cursor[1]
                if subfile_index == self._cursor[0]
                else 0
            )

            if (
                subfile_index in self.subfile_objects
                and
                subfile_index not in self._redirected_subfile_objects
            ):
                subfile_object = self.subfile_objects[subfile_index]
                for i in range(self.SEEK_BACK_ATTEMPTS):
                    try:
                        subfile_object.seek(max(seek_ix - i, 0))
                        buffer += subfile_object.read()
                    except UnicodeDecodeError:
                        continue
                    break
            else:
                with open(subfile_path, 'r', encoding='utf-8') as f:
                    for i in range(self.SEEK_BACK_ATTEMPTS):
                        try:
                            f.seek(max(seek_ix - i, 0))
                            buffer += f.read()
                        except UnicodeDecodeError:
                            continue
                        break

                    ### Handle the case when no files have yet been opened.
                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
                        self._cursor = (subfile_index, f.tell())
                        refresh_cursor = False

        if refresh_cursor:
            self.refresh_cursor()
        return buffer

    def refresh_cursor(self) -> None:
        """
        Update the cursor to the latest subfile index and file.tell() value.
        """
        self.flush()
        existing_subfile_paths = self.get_existing_subfile_paths()
        current_ix = (
            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
            if existing_subfile_paths
            else 0
        )
        position = self._current_file_obj.tell() if self._current_file_obj is not None else 0
        self._cursor = (current_ix, position)

    def readlines(self) -> List[str]:
        """
        Return a list of lines of text.
        """
        existing_subfile_indices = [
            self.get_index_from_subfile_name(subfile_path.name)
            for subfile_path in self.get_existing_subfile_paths()
        ]
        paths_to_read = [
            self.get_subfile_path_from_index(subfile_index)
            for subfile_index in existing_subfile_indices
            if subfile_index >= self._cursor[0]
        ]

        lines = []
        refresh_cursor = True
        for subfile_path in paths_to_read:
            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
            seek_ix = (
                self._cursor[1]
                if subfile_index == self._cursor[0]
                else 0
            )

            subfile_lines = []
            if (
                subfile_index in self.subfile_objects
                and
                subfile_index not in self._redirected_subfile_objects
            ):
                subfile_object = self.subfile_objects[subfile_index]
                for i in range(self.SEEK_BACK_ATTEMPTS):
                    try:
                        subfile_object.seek(max((seek_ix - i), 0))
                        subfile_lines = subfile_object.readlines()
                    except UnicodeDecodeError:
                        continue
                    break
            else:
                with open(subfile_path, 'r', encoding='utf-8') as f:
                    for i in range(self.SEEK_BACK_ATTEMPTS):
                        try:
                            f.seek(max(seek_ix - i, 0))
                            subfile_lines = f.readlines()
                        except UnicodeDecodeError:
                            continue
                        break

                    ### Handle the case when no files have yet been opened.
                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
                        self._cursor = (subfile_index, f.tell())
                        refresh_cursor = False

            ### Sometimes a line may span multiple files.
            if lines and subfile_lines and not lines[-1].endswith('\n'):
                lines[-1] += subfile_lines[0]
                new_lines = subfile_lines[1:]
            else:
                new_lines = subfile_lines
            lines.extend(new_lines)

        if refresh_cursor:
            self.refresh_cursor()
        return lines

    def seekable(self) -> bool:
        return True

    def seek(self, position: int) -> None:
        """
        Seek to the beginning of the logs stream.
        """
        existing_subfile_indices = self.get_existing_subfile_indices()
        min_ix = existing_subfile_indices[0] if existing_subfile_indices else 0
        max_ix = existing_subfile_indices[-1] if existing_subfile_indices else 0
        if position == 0:
            self._cursor = (min_ix, 0)
            return

        self._cursor = (max_ix, position)
        if self._current_file_obj is not None:
            self._current_file_obj.seek(position)

    def flush(self) -> None:
        """
        Flush any open subfiles.
        """
        for subfile_index, subfile_object in self.subfile_objects.items():
            if not subfile_object.closed:
                try:
                    subfile_object.flush()
                except Exception:
                    warn(f"Failed to flush subfile {subfile_index}:\n{traceback.format_exc()}")

        if self.redirect_streams:
            try:
                sys.stdout.flush()
            except BrokenPipeError:
                pass
            except Exception:
                warn(f"Failed to flush STDOUT:\n{traceback.format_exc()}")
            try:
                sys.stderr.flush()
            except BrokenPipeError:
                pass
            except Exception:
                warn(f"Failed to flush STDERR:\n{traceback.format_exc()}")

    def start_log_fd_interception(self):
        """
        Start the file descriptor monitoring threads.
        """
        if not self.write_timestamps:
            return

        self._stdout_interceptor = FileDescriptorInterceptor(
            sys.stdout.fileno(),
            self.get_timestamp_prefix_str,
        )
        self._stderr_interceptor = FileDescriptorInterceptor(
            sys.stderr.fileno(),
            self.get_timestamp_prefix_str,
        )

        self._stdout_interceptor_thread = Thread(
            target = self._stdout_interceptor.start_interception,
            daemon = True,
        )
        self._stderr_interceptor_thread = Thread(
            target = self._stderr_interceptor.start_interception,
            daemon = True,
        )
        self._stdout_interceptor_thread.start()
        self._stderr_interceptor_thread.start()
        self._intercepting = True

        if '_interceptor_threads' not in self.__dict__:
            self._interceptor_threads = []
        if '_interceptors' not in self.__dict__:
            self._interceptors = []
        self._interceptor_threads.extend([
            self._stdout_interceptor_thread,
            self._stderr_interceptor_thread,
        ])
        self._interceptors.extend([
            self._stdout_interceptor,
            self._stderr_interceptor,
        ])
        self.stop_log_fd_interception(unused_only=True)

    def stop_log_fd_interception(self, unused_only: bool = False):
        """
        Stop the file descriptor monitoring threads.
        """
        if not self.write_timestamps:
            return

        interceptors = self.__dict__.get('_interceptors', [])
        interceptor_threads = self.__dict__.get('_interceptor_threads', [])

        end_ix = len(interceptors) if not unused_only else -2

        for interceptor in interceptors[:end_ix]:
            interceptor.stop_interception()
        del interceptors[:end_ix]

        for thread in interceptor_threads[:end_ix]:
            try:
                thread.join()
            except Exception:
                warn(f"Failed to join interceptor threads:\n{traceback.format_exc()}")
        del interceptor_threads[:end_ix]

    def touch(self):
        """
        Touch the latest subfile.
        """
        subfile_path = self.get_latest_subfile_path()
        subfile_path.touch()

    def isatty(self) -> bool:
        return True

    def __repr__(self) -> str:
        """
        Return basic info for this `RotatingFile`.
        """
        return (
            "RotatingFile("
            + f"'{self.file_path.as_posix()}', "
            + f"num_files_to_keep={self.num_files_to_keep}, "
            + f"max_file_size={self.max_file_size})"
        )
A `RotatingFile` may be treated like a normal file-like object. Under the hood, however, it will create new sub-files and delete old ones.
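For example, a minimal usage sketch (the path and size limits here are hypothetical, chosen small so that rotation is easy to observe):

    import pathlib
    from meerschaum.utils.daemon.RotatingFile import RotatingFile

    log_path = pathlib.Path('/tmp/example.log')   # hypothetical location
    rf = RotatingFile(log_path, num_files_to_keep=3, max_file_size=1024)

    rf.write('hello world\n')   # appends to the newest subfile, e.g. example.log.0
    rf.flush()
    print(rf.read())            # reads across all existing subfiles in order
    rf.close()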
def __init__(
    self,
    file_path: pathlib.Path,
    num_files_to_keep: Optional[int] = None,
    max_file_size: Optional[int] = None,
    redirect_streams: bool = False,
    write_timestamps: bool = False,
    timestamp_format: Optional[str] = None,
    write_callback: Optional[Callable[[str], None]] = None,
)
Create a file-like object which manages other files.
Parameters
- num_files_to_keep (int, default None): How many sub-files to keep at any given time. Defaults to the configured value (5).
- max_file_size (int, default None): How large in bytes each sub-file can grow before another file is created. Note that this is not a hard limit but rather a threshold which may be slightly exceeded. Defaults to the configured value (100_000).
- redirect_streams (bool, default False): If `True`, redirect previous file streams when opening a new file descriptor. NOTE: Only set this to `True` if you are entering into a daemon context. Doing so will redirect `sys.stdout` and `sys.stderr` into the log files.
- write_timestamps (bool, default False): If `True`, prepend the current UTC timestamp to each line of the file.
- timestamp_format (str, default None): If `write_timestamps` is `True`, use this format for the timestamps. Defaults to `'%Y-%m-%d %H:%M'`.
- write_callback (Optional[Callable[[str], None]], default None): If provided, execute this callback with the data to be written.
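As a sketch of how these options combine (imports as in the first sketch; the path and callback are illustrative, not part of the API):

    rf = RotatingFile(
        pathlib.Path('/tmp/timestamped.log'),
        write_timestamps=True,
        timestamp_format='%Y-%m-%d %H:%M',
        write_callback=lambda data: print(f'about to write {len(data)} characters'),
    )
    rf.write('a line')   # lands on disk as e.g. '2024-01-01 12:00 | a line\n'
    rf.close()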
def fileno(self)
Return the file descriptor for the latest subfile.
def get_latest_subfile_path(self) -> pathlib.Path
Return the path for the latest subfile to which to write.
def get_remaining_subfile_size(self, subfile_index: int) -> int
Return the remaining buffer size for a subfile.
Parameters
- subfile_index (int): The index of the subfile to be checked.
Returns
- The remaining size in bytes.
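For instance, a sketch with a hypothetical 1,000-byte limit (imports as in the first sketch):

    rf = RotatingFile(pathlib.Path('/tmp/size.log'), max_file_size=1000)
    rf.get_remaining_subfile_size(0)   # 1000 while size.log.0 does not yet exist
    rf.write('x' * 600)
    rf.flush()
    rf.get_remaining_subfile_size(0)   # 400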
def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool
Return whether a given subfile is too large.
Parameters
- subfile_index (int): The index of the subfile to be checked.
- potential_new_len (int, default 0): The length of a potential write of new data.
Returns
- A bool indicating the subfile is or will be too large.
def get_latest_subfile_index(self) -> int
Return the latest existing subfile index. If no index may be found, return -1.
def get_index_from_subfile_name(self, subfile_name: str) -> int
Return the index from a given subfile name. If the file name cannot be parsed, return -1.
def get_subfile_name_from_index(self, subfile_index: int) -> str
Return the subfile name from the given index.
def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path
Return the subfile's path from its index.
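These naming helpers round-trip between indices, names, and paths. For example, assuming a hypothetical base file '/tmp/example.log':

    rf = RotatingFile(pathlib.Path('/tmp/example.log'))
    rf.get_subfile_name_from_index(3)                  # 'example.log.3'
    rf.get_subfile_path_from_index(3)                  # PosixPath('/tmp/example.log.3')
    rf.get_index_from_subfile_name('example.log.3')    # 3
    rf.get_index_from_subfile_name('unrelated.txt')    # -1 (cannot be parsed)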
def get_existing_subfile_indices(self) -> List[int]
Return a list of subfile indices which exist on disk.
def get_existing_subfile_paths(self) -> List[pathlib.Path]
Return a list of file paths that match the input filename pattern.
def refresh_files(self, potential_new_len: int = 0, start_interception: bool = False) -> '_io.TextIOWrapper'
Check the state of the subfiles. If the latest subfile is too large, create a new file and delete old ones.
Parameters
- potential_new_len (int, default 0): The length of a potential write of new data.
- start_interception (bool, default False): If `True`, kick off the file interception threads.
def increment_subfiles(self, increment_by: int = 1)
Create a new subfile and switch the file pointer over.
def close(self, unused_only: bool = False) -> None
Close any open file descriptors.
Parameters
- unused_only (bool, default False): If `True`, only close file descriptors not currently in use.
def get_timestamp_prefix_str(self) -> str
Return the current minute prefix string.
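A quick sketch of the prefix it produces, assuming the default format:

    rf = RotatingFile(pathlib.Path('/tmp/ts.log'), write_timestamps=True)
    rf.get_timestamp_prefix_str()   # e.g. '2024-01-01 12:00 | '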
def write(self, data: str) -> None
Write the given text into the latest subfile. If the subfile will be too large, create a new subfile. If too many subfiles exist at once, the oldest one will be deleted.
NOTE: This will not split data across multiple files. As such, if data is larger than max_file_size, then the corresponding subfile may exceed this limit.
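A sketch of `write()` driving rotation and the optional callback. The paths and sizes are again illustrative (`log_dir` comes from the first sketch); the small `max_file_size` merely makes rotation happen quickly:

captured = []
rf = RotatingFile(
    log_dir / 'callback.log',
    num_files_to_keep=2,
    max_file_size=150,               # must be at least 100 bytes
    timestamp_format='%Y-%m-%d %H:%M',
    write_callback=captured.append,  # each payload is also handed to the callback
)
for i in range(20):
    rf.write(f'line {i:04d} ' + 'x' * 40 + '\n')   # rotation and pruning happen behind the scenes
rf.close()
print(len(captured))                 # 20: the callback saw every write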
def delete(self, unused_only: bool = False) -> None:
    """
    Delete old subfiles.

    Parameters
    ----------
    unused_only: bool, default False
        If `True`, only delete subfiles which are no longer needed.
    """
    existing_subfile_paths = self.get_existing_subfile_paths()
    if unused_only and len(existing_subfile_paths) <= self.num_files_to_keep:
        return

    self.flush()
    self.close(unused_only=unused_only)

    end_ix = (
        (-1 * self.num_files_to_keep)
        if unused_only
        else len(existing_subfile_paths)
    )
    for subfile_path_to_delete in existing_subfile_paths[0:end_ix]:
        subfile_index = self.get_index_from_subfile_name(subfile_path_to_delete.name)

        try:
            subfile_path_to_delete.unlink()
        except Exception:
            warn(
                f"Unable to delete subfile '{subfile_path_to_delete}':\n"
                + f"{traceback.format_exc()}"
            )
Delete old subfiles.
Parameters
- unused_only (bool, default False):
If `True`, only delete subfiles which are no longer needed.
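The retention arithmetic reduces to a list slice: with `unused_only=True`, everything except the newest `num_files_to_keep` paths is unlinked. Illustrated with plain lists and hypothetical file names:

existing = ['job.log', 'job.log.1', 'job.log.2', 'job.log.3', 'job.log.4']
num_files_to_keep = 3
end_ix = -1 * num_files_to_keep
print(existing[0:end_ix])   # ['job.log', 'job.log.1']: the two oldest are deleted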
def read(self, *args, **kwargs) -> str:
    """
    Read the contents of the existing subfiles.
    """
    existing_subfile_indices = [
        self.get_index_from_subfile_name(subfile_path.name)
        for subfile_path in self.get_existing_subfile_paths()
    ]
    paths_to_read = [
        self.get_subfile_path_from_index(subfile_index)
        for subfile_index in existing_subfile_indices
        if subfile_index >= self._cursor[0]
    ]
    buffer = ''
    refresh_cursor = True
    for subfile_path in paths_to_read:
        subfile_index = self.get_index_from_subfile_name(subfile_path.name)
        seek_ix = (
            self._cursor[1]
            if subfile_index == self._cursor[0]
            else 0
        )

        if (
            subfile_index in self.subfile_objects
            and subfile_index not in self._redirected_subfile_objects
        ):
            subfile_object = self.subfile_objects[subfile_index]
            for i in range(self.SEEK_BACK_ATTEMPTS):
                try:
                    subfile_object.seek(max(seek_ix - i, 0))
                    buffer += subfile_object.read()
                except UnicodeDecodeError:
                    continue
                break
        else:
            with open(subfile_path, 'r', encoding='utf-8') as f:
                for i in range(self.SEEK_BACK_ATTEMPTS):
                    try:
                        f.seek(max(seek_ix - i, 0))
                        buffer += f.read()
                    except UnicodeDecodeError:
                        continue
                    break

                ### Handle the case when no files have yet been opened.
                if not self.subfile_objects and subfile_path == paths_to_read[-1]:
                    self._cursor = (subfile_index, f.tell())
                    refresh_cursor = False

    if refresh_cursor:
        self.refresh_cursor()
    return buffer
Read the contents of the existing subfiles.
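Because the cursor is refreshed after each call, repeated reads behave like tailing a stream. Continuing with the illustrative `rf` instance from the first sketch:

rf.write('hello\n')
print(rf.read())   # everything from the cursor onward, across subfile boundaries
print(rf.read())   # likely empty: the cursor advanced past content already read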
def refresh_cursor(self) -> None:
    """
    Update the cursor to the latest subfile index and file.tell() value.
    """
    self.flush()
    existing_subfile_paths = self.get_existing_subfile_paths()
    current_ix = (
        self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
        if existing_subfile_paths
        else 0
    )
    position = self._current_file_obj.tell() if self._current_file_obj is not None else 0
    self._cursor = (current_ix, position)
Update the cursor to the latest subfile index and file.tell() value.
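The cursor itself is a plain two-tuple. A small sketch of inspecting it (`_cursor` is a private attribute, shown only for illustration):

rf.refresh_cursor()
subfile_index, position = rf._cursor   # (latest subfile index, file.tell() of its handle)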
def readlines(self) -> List[str]:
    """
    Return a list of lines of text.
    """
    existing_subfile_indices = [
        self.get_index_from_subfile_name(subfile_path.name)
        for subfile_path in self.get_existing_subfile_paths()
    ]
    paths_to_read = [
        self.get_subfile_path_from_index(subfile_index)
        for subfile_index in existing_subfile_indices
        if subfile_index >= self._cursor[0]
    ]

    lines = []
    refresh_cursor = True
    for subfile_path in paths_to_read:
        subfile_index = self.get_index_from_subfile_name(subfile_path.name)
        seek_ix = (
            self._cursor[1]
            if subfile_index == self._cursor[0]
            else 0
        )

        subfile_lines = []
        if (
            subfile_index in self.subfile_objects
            and subfile_index not in self._redirected_subfile_objects
        ):
            subfile_object = self.subfile_objects[subfile_index]
            for i in range(self.SEEK_BACK_ATTEMPTS):
                try:
                    subfile_object.seek(max(seek_ix - i, 0))
                    subfile_lines = subfile_object.readlines()
                except UnicodeDecodeError:
                    continue
                break
        else:
            with open(subfile_path, 'r', encoding='utf-8') as f:
                for i in range(self.SEEK_BACK_ATTEMPTS):
                    try:
                        f.seek(max(seek_ix - i, 0))
                        subfile_lines = f.readlines()
                    except UnicodeDecodeError:
                        continue
                    break

                ### Handle the case when no files have yet been opened.
                if not self.subfile_objects and subfile_path == paths_to_read[-1]:
                    self._cursor = (subfile_index, f.tell())
                    refresh_cursor = False

        ### Sometimes a line may span multiple files.
        if lines and subfile_lines and not lines[-1].endswith('\n'):
            lines[-1] += subfile_lines[0]
            new_lines = subfile_lines[1:]
        else:
            new_lines = subfile_lines
        lines.extend(new_lines)

    if refresh_cursor:
        self.refresh_cursor()
    return lines
Return a list of lines of text.
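The interesting branch is the stitching of a line that straddles a rotation boundary; the same logic, reduced to plain lists:

lines = ['complete line\n', 'partial line']           # tail of one subfile
subfile_lines = [' continues here\n', 'next line\n']  # head of the next subfile
if lines and subfile_lines and not lines[-1].endswith('\n'):
    lines[-1] += subfile_lines[0]                     # merge the split line
    subfile_lines = subfile_lines[1:]
lines.extend(subfile_lines)
print(lines)   # ['complete line\n', 'partial line continues here\n', 'next line\n']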
seekable(): Return whether the object supports random access. If `False`, then `seek()`, `tell()`, and `truncate()` will raise `OSError`. This method may need to do a test seek(). (Docstring inherited from `io.IOBase`.)
def seek(self, position: int) -> None:
    """
    Seek to the beginning of the logs stream.
    """
    existing_subfile_indices = self.get_existing_subfile_indices()
    min_ix = existing_subfile_indices[0] if existing_subfile_indices else 0
    max_ix = existing_subfile_indices[-1] if existing_subfile_indices else 0
    if position == 0:
        self._cursor = (min_ix, 0)
        return

    self._cursor = (max_ix, position)
    if self._current_file_obj is not None:
        self._current_file_obj.seek(position)
Seek to the beginning of the logs stream.
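Note that `seek(0)` rewinds to the oldest existing subfile, while any other position is interpreted against the newest one. For example, to re-read all retained output:

rf.seek(0)         # cursor moves to (oldest subfile index, 0)
print(rf.read())   # all retained content, oldest subfile first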
def flush(self) -> None:
    """
    Flush any open subfiles.
    """
    for subfile_index, subfile_object in self.subfile_objects.items():
        if not subfile_object.closed:
            try:
                subfile_object.flush()
            except Exception:
                warn(f"Failed to flush subfile {subfile_index}:\n{traceback.format_exc()}")

    if self.redirect_streams:
        try:
            sys.stdout.flush()
        except BrokenPipeError:
            pass
        except Exception:
            warn(f"Failed to flush STDOUT:\n{traceback.format_exc()}")
        try:
            sys.stderr.flush()
        except BrokenPipeError:
            pass
        except Exception:
            warn(f"Failed to flush STDERR:\n{traceback.format_exc()}")
Flush any open subfiles.
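A short illustration, continuing with the running example; note that this flushes Python's userspace buffers but does not fsync to disk:

rf.write('buffered\n')
rf.flush()   # push pending writes out of every open subfile's buffer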
def start_log_fd_interception(self):
    """
    Start the file descriptor monitoring threads.
    """
    if not self.write_timestamps:
        return

    self._stdout_interceptor = FileDescriptorInterceptor(
        sys.stdout.fileno(),
        self.get_timestamp_prefix_str,
    )
    self._stderr_interceptor = FileDescriptorInterceptor(
        sys.stderr.fileno(),
        self.get_timestamp_prefix_str,
    )

    self._stdout_interceptor_thread = Thread(
        target=self._stdout_interceptor.start_interception,
        daemon=True,
    )
    self._stderr_interceptor_thread = Thread(
        target=self._stderr_interceptor.start_interception,
        daemon=True,
    )
    self._stdout_interceptor_thread.start()
    self._stderr_interceptor_thread.start()
    self._intercepting = True

    if '_interceptor_threads' not in self.__dict__:
        self._interceptor_threads = []
    if '_interceptors' not in self.__dict__:
        self._interceptors = []
    self._interceptor_threads.extend([
        self._stdout_interceptor_thread,
        self._stderr_interceptor_thread,
    ])
    self._interceptors.extend([
        self._stdout_interceptor,
        self._stderr_interceptor,
    ])
    self.stop_log_fd_interception(unused_only=True)
Start the file descriptor monitoring threads.
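A sketch of the intended call pattern, under the assumption of a daemon context: interception wraps the live stdout and stderr file descriptors with background threads, so it is not suited to an interactive shell, and it is a no-op unless `write_timestamps=True`:

rf = RotatingFile(
    log_dir / 'timed.log',
    num_files_to_keep=2,
    max_file_size=500,
    write_timestamps=True,             # required, or the call returns immediately
    timestamp_format='%Y-%m-%d %H:%M',
)
rf.start_log_fd_interception()         # daemon threads now prefix intercepted fd output
rf.stop_log_fd_interception()          # stop the interceptors and join their threads
rf.close()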
def stop_log_fd_interception(self, unused_only: bool = False):
    """
    Stop the file descriptor monitoring threads.
    """
    if not self.write_timestamps:
        return

    interceptors = self.__dict__.get('_interceptors', [])
    interceptor_threads = self.__dict__.get('_interceptor_threads', [])

    end_ix = len(interceptors) if not unused_only else -2

    for interceptor in interceptors[:end_ix]:
        interceptor.stop_interception()
    del interceptors[:end_ix]

    for thread in interceptor_threads[:end_ix]:
        try:
            thread.join()
        except Exception:
            warn(f"Failed to join interceptor threads:\n{traceback.format_exc()}")
    del interceptor_threads[:end_ix]
Stop the file descriptor monitoring threads.
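With `unused_only=True`, the slice with `end_ix = -2` stops every interceptor except the most recent stdout/stderr pair. The same trimming, with placeholder strings standing in for the interceptor objects:

interceptors = ['old_stdout', 'old_stderr', 'new_stdout', 'new_stderr']
end_ix = -2
print(interceptors[:end_ix])   # ['old_stdout', 'old_stderr']: these get stopped
del interceptors[:end_ix]
print(interceptors)            # ['new_stdout', 'new_stderr']: the active pair survives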