meerschaum.utils.daemon.RotatingFile
Create a file-like object that manages sub-files under the hood.
```python
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Create a file-like object that manages sub-files under the hood.
"""

import os
import io
import re
import pathlib
import traceback
import sys
import atexit
from datetime import datetime, timezone, timedelta
from typing import List, Union, Optional, Tuple
from meerschaum.config import get_config
from meerschaum.utils.warnings import warn
from meerschaum.utils.misc import round_time
from meerschaum.utils.daemon.FileDescriptorInterceptor import FileDescriptorInterceptor
from meerschaum.utils.threading import Thread
import meerschaum as mrsm
daemon = mrsm.attempt_import('daemon')


class RotatingFile(io.IOBase):
    """
    A `RotatingFile` may be treated like a normal file-like object.
    Under the hood, however, it will create new sub-files and delete old ones.
    """

    SEEK_BACK_ATTEMPTS: int = 5

    def __init__(
        self,
        file_path: pathlib.Path,
        num_files_to_keep: Optional[int] = None,
        max_file_size: Optional[int] = None,
        redirect_streams: bool = False,
        write_timestamps: bool = False,
        timestamp_format: Optional[str] = None,
    ):
        """
        Create a file-like object which manages other files.

        Parameters
        ----------
        num_files_to_keep: int, default None
            How many sub-files to keep at any given time.
            Defaults to the configured value (5).

        max_file_size: int, default None
            How large in bytes each sub-file can grow before another file is created.
            Note that this is not a hard limit but rather a threshold
            which may be slightly exceeded.
            Defaults to the configured value (100_000).

        redirect_streams: bool, default False
            If `True`, redirect previous file streams when opening a new file descriptor.

            NOTE: Only set this to `True` if you are entering into a daemon context.
            Doing so will redirect `sys.stdout` and `sys.stderr` into the log files.

        write_timestamps: bool, default False
            If `True`, prepend the current UTC timestamp to each line of the file.

        timestamp_format: str, default None
            If `write_timestamps` is `True`, use this format for the timestamps.
            Defaults to `'%Y-%m-%d %H:%M'`.
        """
        self.file_path = pathlib.Path(file_path)
        if num_files_to_keep is None:
            num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep')
        if max_file_size is None:
            max_file_size = get_config('jobs', 'logs', 'max_file_size')
        if timestamp_format is None:
            timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format')
        if num_files_to_keep < 2:
            raise ValueError("At least 2 files must be kept.")
        if max_file_size < 1:
            raise ValueError("Subfiles must contain at least one byte.")

        self.num_files_to_keep = num_files_to_keep
        self.max_file_size = max_file_size
        self.redirect_streams = redirect_streams
        self.write_timestamps = write_timestamps
        self.timestamp_format = timestamp_format
        self.subfile_regex_pattern = re.compile(r'(.*)\.log(?:\.\d+)?$')

        ### When subfiles are opened, map from their index to the file objects.
        self.subfile_objects = {}
        self._redirected_subfile_objects = {}
        self._current_file_obj = None
        self._previous_file_obj = None

        ### When reading, keep track of the file index and position.
        self._cursor: Tuple[int, int] = (0, 0)

        ### Don't forget to close any stray files.
        atexit.register(self.close)


    def fileno(self):
        """
        Return the file descriptor for the latest subfile.
        """
        self.refresh_files(start_interception=False)
        return self._current_file_obj.fileno()


    def get_latest_subfile_path(self) -> pathlib.Path:
        """
        Return the path of the latest subfile to write into.
        """
        return self.get_subfile_path_from_index(
            self.get_latest_subfile_index()
        )


    def get_remaining_subfile_size(self, subfile_index: int) -> int:
        """
        Return the remaining buffer size for a subfile.

        Parameters
        ----------
        subfile_index: int
            The index of the subfile to be checked.

        Returns
        -------
        The remaining size in bytes.
        """
        subfile_path = self.get_subfile_path_from_index(subfile_index)
        if not subfile_path.exists():
            return self.max_file_size

        return self.max_file_size - os.path.getsize(subfile_path)


    def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool:
        """
        Return whether a given subfile is too large.

        Parameters
        ----------
        subfile_index: int
            The index of the subfile to be checked.

        potential_new_len: int, default 0
            The length of a potential write of new data.

        Returns
        -------
        A bool indicating the subfile is or will be too large.
        """
        subfile_path = self.get_subfile_path_from_index(subfile_index)
        if not subfile_path.exists():
            return False

        self.flush()

        return (
            (os.path.getsize(subfile_path) + potential_new_len)
            >=
            self.max_file_size
        )


    def get_latest_subfile_index(self) -> int:
        """
        Return the latest existing subfile index.
        If no index may be found, return -1.
        """
        existing_subfile_paths = self.get_existing_subfile_paths()
        latest_index = (
            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
            if existing_subfile_paths
            else 0
        )
        return latest_index


    def get_index_from_subfile_name(self, subfile_name: str) -> int:
        """
        Return the index from a given subfile name.
        If the file name cannot be parsed, return -1.
        """
        try:
            return int(subfile_name.replace(self.file_path.name + '.', ''))
        except Exception as e:
            return -1


    def get_subfile_name_from_index(self, subfile_index: int) -> str:
        """
        Return the subfile name from the given index.
        """
        return f'{self.file_path.name}.{subfile_index}'


    def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path:
        """
        Return the subfile's path from its index.
        """
        return self.file_path.parent / self.get_subfile_name_from_index(subfile_index)


    def get_existing_subfile_indices(self) -> List[int]:
        """
        Return a list of subfile indices which exist on disk.
        """
        existing_subfile_paths = self.get_existing_subfile_paths()
        return [self.get_index_from_subfile_name(path.name) for path in existing_subfile_paths]


    def get_existing_subfile_paths(self) -> List[pathlib.Path]:
        """
        Return a list of file paths that match the input filename pattern.
        """
        if not self.file_path.parent.exists():
            return []

        subfile_names_indices = sorted(
            [
                (file_name, self.get_index_from_subfile_name(file_name))
                for file_name in os.listdir(self.file_path.parent)
                if (
                    file_name.startswith(self.file_path.name)
                    and re.match(self.subfile_regex_pattern, file_name)
                )
            ],
            key=lambda x: x[1],
        )
        return [
            (self.file_path.parent / file_name)
            for file_name, _ in subfile_names_indices
        ]


    def refresh_files(
        self,
        potential_new_len: int = 0,
        start_interception: bool = False,
    ) -> '_io.TextIOWrapper':
        """
        Check the state of the subfiles.
        If the latest subfile is too large, create a new file and delete old ones.

        Parameters
        ----------
        potential_new_len: int, default 0
            The length of a potential write of new data.

        start_interception: bool, default False
            If `True`, kick off the file interception threads.
        """
        self.flush()

        latest_subfile_index = self.get_latest_subfile_index()
        latest_subfile_path = self.get_subfile_path_from_index(latest_subfile_index)

        ### First run with existing log files: open the most recent log file.
        is_first_run_with_logs = ((latest_subfile_index > -1) and self._current_file_obj is None)

        ### Sometimes a new file is created but output doesn't switch over.
        lost_latest_handle = (
            self._current_file_obj is not None
            and
            self.get_index_from_subfile_name(self._current_file_obj.name) == -1
        )
        if is_first_run_with_logs or lost_latest_handle:
            self._current_file_obj = open(latest_subfile_path, 'a+', encoding='utf-8')
            if self.redirect_streams:
                try:
                    daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
                    daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
                except OSError as e:
                    warn(
                        f"Encountered an issue when redirecting streams:\n{traceback.format_exc()}"
                    )
                if start_interception and self.write_timestamps:
                    self.start_log_fd_interception()

        create_new_file = (
            (latest_subfile_index == -1)
            or
            self._current_file_obj is None
            or
            self.is_subfile_too_large(latest_subfile_index, potential_new_len)
        )
        if create_new_file:
            old_subfile_index = latest_subfile_index
            new_subfile_index = old_subfile_index + 1
            new_file_path = self.get_subfile_path_from_index(new_subfile_index)
            self._previous_file_obj = self._current_file_obj
            self._current_file_obj = open(new_file_path, 'a+', encoding='utf-8')
            self.subfile_objects[new_subfile_index] = self._current_file_obj
            self.flush()

            if self._previous_file_obj is not None:
                if self.redirect_streams:
                    self._redirected_subfile_objects[old_subfile_index] = self._previous_file_obj
                    daemon.daemon.redirect_stream(self._previous_file_obj, self._current_file_obj)
                    daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj)
                    daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj)
                self.close(unused_only=True)

            ### Sanity check in case writing somehow fails.
            if self._previous_file_obj is self._current_file_obj:
                self._previous_file_obj = None

            self.delete(unused_only=True)

        return self._current_file_obj


    def close(self, unused_only: bool = False) -> None:
        """
        Close any open file descriptors.

        Parameters
        ----------
        unused_only: bool, default False
            If `True`, only close file descriptors not currently in use.
        """
        self.stop_log_fd_interception(unused_only=unused_only)
        subfile_indices = sorted(self.subfile_objects.keys())
        for subfile_index in subfile_indices:
            subfile_object = self.subfile_objects[subfile_index]
            if unused_only and subfile_object in (self._previous_file_obj, self._current_file_obj):
                continue
            try:
                if not subfile_object.closed:
                    subfile_object.close()
            except Exception as e:
                warn(f"Failed to close an open subfile:\n{traceback.format_exc()}")

            _ = self.subfile_objects.pop(subfile_index, None)
            if self.redirect_streams:
                _ = self._redirected_subfile_objects.pop(subfile_index, None)

        if not unused_only:
            self._previous_file_obj = None
            self._current_file_obj = None


    def get_timestamp_prefix_str(self) -> str:
        """
        Return the current minute prefix string.
        """
        return datetime.now(timezone.utc).strftime(self.timestamp_format) + ' | '


    def write(self, data: str) -> None:
        """
        Write the given text into the latest subfile.
        If the subfile will be too large, create a new subfile.
        If too many subfiles exist at once, the oldest one will be deleted.

        NOTE: This will not split data across multiple files.
        As such, if data is larger than max_file_size, then the corresponding subfile
        may exceed this limit.
        """
        try:
            self.file_path.parent.mkdir(exist_ok=True, parents=True)
            if isinstance(data, bytes):
                data = data.decode('utf-8')

            prefix_str = self.get_timestamp_prefix_str() if self.write_timestamps else ""
            suffix_str = "\n" if self.write_timestamps else ""
            self.refresh_files(
                potential_new_len = len(prefix_str + data + suffix_str),
                start_interception = self.write_timestamps,
            )
            try:
                if prefix_str:
                    self._current_file_obj.write(prefix_str)
                self._current_file_obj.write(data)
                if suffix_str:
                    self._current_file_obj.write(suffix_str)
            except BrokenPipeError:
                warn("BrokenPipeError encountered. The daemon may have been terminated.")
                return
            except Exception as e:
                warn(f"Failed to write to subfile:\n{traceback.format_exc()}")
            self.flush()
            self.delete(unused_only=True)
        except Exception as e:
            warn(f"Unexpected error in RotatingFile.write: {e}")


    def delete(self, unused_only: bool = False) -> None:
        """
        Delete old subfiles.

        Parameters
        ----------
        unused_only: bool, default False
            If `True`, only delete subfiles which are no longer needed.
        """
        existing_subfile_paths = self.get_existing_subfile_paths()
        if unused_only and len(existing_subfile_paths) <= self.num_files_to_keep:
            return

        self.flush()
        self.close(unused_only=unused_only)

        end_ix = (
            (-1 * self.num_files_to_keep)
            if unused_only
            else len(existing_subfile_paths)
        )
        for subfile_path_to_delete in existing_subfile_paths[0:end_ix]:
            subfile_index = self.get_index_from_subfile_name(subfile_path_to_delete.name)
            subfile_object = self.subfile_objects.get(subfile_index, None)

            try:
                subfile_path_to_delete.unlink()
            except Exception as e:
                warn(
                    f"Unable to delete subfile '{subfile_path_to_delete}':\n"
                    + f"{traceback.format_exc()}"
                )


    def read(self, *args, **kwargs) -> str:
        """
        Read the contents of the existing subfiles.
        """
        existing_subfile_indices = [
            self.get_index_from_subfile_name(subfile_path.name)
            for subfile_path in self.get_existing_subfile_paths()
        ]
        paths_to_read = [
            self.get_subfile_path_from_index(subfile_index)
            for subfile_index in existing_subfile_indices
            if subfile_index >= self._cursor[0]
        ]
        buffer = ''
        refresh_cursor = True
        for subfile_path in paths_to_read:
            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
            seek_ix = (
                self._cursor[1]
                if subfile_index == self._cursor[0]
                else 0
            )

            if (
                subfile_index in self.subfile_objects
                and
                subfile_index not in self._redirected_subfile_objects
            ):
                subfile_object = self.subfile_objects[subfile_index]
                for i in range(self.SEEK_BACK_ATTEMPTS):
                    try:
                        subfile_object.seek(max(seek_ix - i, 0))
                        buffer += subfile_object.read()
                    except UnicodeDecodeError:
                        continue
                    break
            else:
                with open(subfile_path, 'r', encoding='utf-8') as f:
                    for i in range(self.SEEK_BACK_ATTEMPTS):
                        try:
                            f.seek(max(seek_ix - i, 0))
                            buffer += f.read()
                        except UnicodeDecodeError:
                            continue
                        break

                    ### Handle the case when no files have yet been opened.
                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
                        self._cursor = (subfile_index, f.tell())
                        refresh_cursor = False

        if refresh_cursor:
            self.refresh_cursor()
        return buffer


    def refresh_cursor(self) -> None:
        """
        Update the cursor to the latest subfile index and file.tell() value.
        """
        self.flush()
        existing_subfile_paths = self.get_existing_subfile_paths()
        current_ix = (
            self.get_index_from_subfile_name(existing_subfile_paths[-1].name)
            if existing_subfile_paths
            else 0
        )
        position = self._current_file_obj.tell() if self._current_file_obj is not None else 0
        self._cursor = (current_ix, position)


    def readlines(self) -> List[str]:
        """
        Return a list of lines of text.
        """
        existing_subfile_indices = [
            self.get_index_from_subfile_name(subfile_path.name)
            for subfile_path in self.get_existing_subfile_paths()
        ]
        paths_to_read = [
            self.get_subfile_path_from_index(subfile_index)
            for subfile_index in existing_subfile_indices
            if subfile_index >= self._cursor[0]
        ]

        lines = []
        refresh_cursor = True
        for subfile_path in paths_to_read:
            subfile_index = self.get_index_from_subfile_name(subfile_path.name)
            seek_ix = (
                self._cursor[1]
                if subfile_index == self._cursor[0]
                else 0
            )

            if (
                subfile_index in self.subfile_objects
                and
                subfile_index not in self._redirected_subfile_objects
            ):
                subfile_object = self.subfile_objects[subfile_index]
                for i in range(self.SEEK_BACK_ATTEMPTS):
                    try:
                        subfile_object.seek(max((seek_ix - i), 0))
                        subfile_lines = subfile_object.readlines()
                    except UnicodeDecodeError:
                        continue
                    break
            else:
                with open(subfile_path, 'r', encoding='utf-8') as f:
                    for i in range(self.SEEK_BACK_ATTEMPTS):
                        try:
                            f.seek(max(seek_ix - i, 0))
                            subfile_lines = f.readlines()
                        except UnicodeDecodeError:
                            continue
                        break

                    ### Handle the case when no files have yet been opened.
                    if not self.subfile_objects and subfile_path == paths_to_read[-1]:
                        self._cursor = (subfile_index, f.tell())
                        refresh_cursor = False

            ### Sometimes a line may span multiple files.
            if lines and subfile_lines and not lines[-1].endswith('\n'):
                lines[-1] += subfile_lines[0]
                new_lines = subfile_lines[1:]
            else:
                new_lines = subfile_lines
            lines.extend(new_lines)

        if refresh_cursor:
            self.refresh_cursor()
        return lines


    def seekable(self) -> bool:
        return True


    def seek(self, position: int) -> None:
        """
        Seek to the beginning of the logs stream.
        """
        existing_subfile_indices = self.get_existing_subfile_indices()
        min_ix = existing_subfile_indices[0] if existing_subfile_indices else 0
        max_ix = existing_subfile_indices[-1] if existing_subfile_indices else 0
        if position == 0:
            self._cursor = (min_ix, 0)
            return

        self._cursor = (max_ix, position)
        if self._current_file_obj is not None:
            self._current_file_obj.seek(position)


    def flush(self) -> None:
        """
        Flush any open subfiles.
        """
        for subfile_index, subfile_object in self.subfile_objects.items():
            if not subfile_object.closed:
                try:
                    subfile_object.flush()
                except Exception as e:
                    warn(f"Failed to flush subfile {subfile_index}:\n{traceback.format_exc()}")
        if self.redirect_streams:
            try:
                sys.stdout.flush()
            except BrokenPipeError:
                pass
            except Exception as e:
                warn(f"Failed to flush STDOUT:\n{traceback.format_exc()}")
            try:
                sys.stderr.flush()
            except BrokenPipeError:
                pass
            except Exception as e:
                warn(f"Failed to flush STDERR:\n{traceback.format_exc()}")


    def start_log_fd_interception(self):
        """
        Start the file descriptor monitoring threads.
        """
        if not self.write_timestamps:
            return

        self._stdout_interceptor = FileDescriptorInterceptor(
            sys.stdout.fileno(),
            self.get_timestamp_prefix_str,
        )
        self._stderr_interceptor = FileDescriptorInterceptor(
            sys.stderr.fileno(),
            self.get_timestamp_prefix_str,
        )

        self._stdout_interceptor_thread = Thread(
            target = self._stdout_interceptor.start_interception,
            daemon = True,
        )
        self._stderr_interceptor_thread = Thread(
            target = self._stderr_interceptor.start_interception,
            daemon = True,
        )
        self._stdout_interceptor_thread.start()
        self._stderr_interceptor_thread.start()
        self._intercepting = True

        if '_interceptor_threads' not in self.__dict__:
            self._interceptor_threads = []
        if '_interceptors' not in self.__dict__:
            self._interceptors = []
        self._interceptor_threads.extend([
            self._stdout_interceptor_thread,
            self._stderr_interceptor_thread,
        ])
        self._interceptors.extend([
            self._stdout_interceptor,
            self._stderr_interceptor,
        ])
        self.stop_log_fd_interception(unused_only=True)


    def stop_log_fd_interception(self, unused_only: bool = False):
        """
        Stop the file descriptor monitoring threads.
        """
        if not self.write_timestamps:
            return

        interceptors = self.__dict__.get('_interceptors', [])
        interceptor_threads = self.__dict__.get('_interceptor_threads', [])

        end_ix = len(interceptors) if not unused_only else -2

        for interceptor in interceptors[:end_ix]:
            interceptor.stop_interception()
        del interceptors[:end_ix]

        for thread in interceptor_threads[:end_ix]:
            try:
                thread.join()
            except Exception as e:
                warn(f"Failed to join interceptor threads:\n{traceback.format_exc()}")
        del interceptor_threads[:end_ix]


    def __repr__(self) -> str:
        """
        Return basic info for this `RotatingFile`.
        """
        return (
            "RotatingFile("
            + f"'{self.file_path.as_posix()}', "
            + f"num_files_to_keep={self.num_files_to_keep}, "
            + f"max_file_size={self.max_file_size})"
        )
```
```python
class RotatingFile(io.IOBase):
```
A `RotatingFile` may be treated like a normal file-like object. Under the hood, however, it will create new sub-files and delete old ones.
```python
def __init__(
    self,
    file_path: pathlib.Path,
    num_files_to_keep: Optional[int] = None,
    max_file_size: Optional[int] = None,
    redirect_streams: bool = False,
    write_timestamps: bool = False,
    timestamp_format: Optional[str] = None,
):
```
Create a file-like object which manages other files.

Parameters
- num_files_to_keep (int, default None): How many sub-files to keep at any given time. Defaults to the configured value (5).
- max_file_size (int, default None): How large in bytes each sub-file can grow before another file is created. Note that this is not a hard limit but rather a threshold which may be slightly exceeded. Defaults to the configured value (100_000).
- redirect_streams (bool, default False): If `True`, redirect previous file streams when opening a new file descriptor. NOTE: Only set this to `True` if you are entering into a daemon context; doing so will redirect `sys.stdout` and `sys.stderr` into the log files.
- write_timestamps (bool, default False): If `True`, prepend the current UTC timestamp to each line of the file.
- timestamp_format (str, default None): If `write_timestamps` is `True`, use this format for the timestamps. Defaults to `'%Y-%m-%d %H:%M'`.
```python
def fileno(self):
```
Return the file descriptor for the latest subfile.
```python
def get_latest_subfile_path(self) -> pathlib.Path:
```
Return the path of the latest subfile to write into.
```python
def get_remaining_subfile_size(self, subfile_index: int) -> int:
```
Return the remaining buffer size for a subfile.
Parameters
- subfile_index (int): The index of the subfile to be checked.
Returns
- The remaining size in bytes.
```python
def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool:
```
Return whether a given subfile is too large.
Parameters
- subfile_index (int): The index of the subfile to be checked.
- potential_new_len (int, default 0): The length of a potential write of new data.
Returns
- A bool indicating whether the subfile is or will be too large.
```python
def get_latest_subfile_index(self) -> int:
```
Return the latest existing subfile index. If no index may be found, return -1.
```python
def get_index_from_subfile_name(self, subfile_name: str) -> int:
```
Return the index from a given subfile name. If the file name cannot be parsed, return -1.
```python
def get_subfile_name_from_index(self, subfile_index: int) -> str:
```
Return the subfile name from the given index.
```python
def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path:
```
Return the subfile's path from its index.
```python
def get_existing_subfile_indices(self) -> List[int]:
```
Return a list of subfile indices which exist on disk.
```python
def get_existing_subfile_paths(self) -> List[pathlib.Path]:
```
Return a list of file paths that match the input filename pattern.
```python
def refresh_files(
    self,
    potential_new_len: int = 0,
    start_interception: bool = False,
) -> '_io.TextIOWrapper':
```
Check the state of the subfiles. If the latest subfile is too large, create a new file and delete old ones.
Parameters
- potential_new_len (int, default 0): The length of a potential write of new data, used to decide whether the latest subfile is (or will become) too large.
- start_interception (bool, default False): If `True`, kick off the file interception threads.
```python
def close(self, unused_only: bool = False) -> None:
```
Close any open file descriptors.
Parameters
- unused_only (bool, default False): If `True`, only close file descriptors not currently in use.
```python
def get_timestamp_prefix_str(self) -> str:
```
Return the current minute prefix string.
```python
def write(self, data: str) -> None:
```
Write the given text into the latest subfile. If the subfile will be too large, create a new subfile. If too many subfiles exist at once, the oldest one will be deleted.
NOTE: This will not split data across multiple files. As such, if data is larger than max_file_size, then the corresponding subfile may exceed this limit.
```python
def delete(self, unused_only: bool = False) -> None:
```
Delete old subfiles.
Parameters
- unused_only (bool, default False): If `True`, only delete subfiles which are no longer needed.
```python
def read(self, *args, **kwargs) -> str:
```
Read the contents of the existing subfiles.
```python
def refresh_cursor(self) -> None:
```
Update the cursor to the latest subfile index and file.tell() value.
```python
def readlines(self) -> List[str]:
```
Return a list of lines of text.
`seekable()`: Return whether the object supports random access. A `RotatingFile` always returns `True`; if this were `False`, `seek()`, `tell()`, and `truncate()` would raise `OSError`.
```python
def seek(self, position: int) -> None:
```
Seek to the given position in the logs stream. Seeking to position 0 rewinds the cursor to the oldest subfile.
```python
def flush(self) -> None:
```
Flush any open subfiles.
```python
def start_log_fd_interception(self):
```
Start the file descriptor monitoring threads.
```python
def stop_log_fd_interception(self, unused_only: bool = False):
```
Stop the file descriptor monitoring threads.