meerschaum.utils.daemon.RotatingFile
Create a file-like object that manages sub-files under the hood.
1#! /usr/bin/env python3 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Create a file-like object that manages sub-files under the hood. 7""" 8 9import os 10import io 11import re 12import pathlib 13import traceback 14import sys 15import atexit 16from datetime import datetime, timezone 17from typing import List, Optional, Tuple 18from meerschaum.config import get_config 19from meerschaum.utils.warnings import warn 20from meerschaum.utils.daemon.FileDescriptorInterceptor import FileDescriptorInterceptor 21from meerschaum.utils.threading import Thread 22import meerschaum as mrsm 23daemon = mrsm.attempt_import('daemon') 24 25class RotatingFile(io.IOBase): 26 """ 27 A `RotatingFile` may be treated like a normal file-like object. 28 Under the hood, however, it will create new sub-files and delete old ones. 29 """ 30 31 SEEK_BACK_ATTEMPTS: int = 5 32 33 def __init__( 34 self, 35 file_path: pathlib.Path, 36 num_files_to_keep: Optional[int] = None, 37 max_file_size: Optional[int] = None, 38 redirect_streams: bool = False, 39 write_timestamps: bool = False, 40 timestamp_format: Optional[str] = None, 41 ): 42 """ 43 Create a file-like object which manages other files. 44 45 Parameters 46 ---------- 47 num_files_to_keep: int, default None 48 How many sub-files to keep at any given time. 49 Defaults to the configured value (5). 50 51 max_file_size: int, default None 52 How large in bytes each sub-file can grow before another file is created. 53 Note that this is not a hard limit but rather a threshold 54 which may be slightly exceeded. 55 Defaults to the configured value (100_000). 56 57 redirect_streams: bool, default False 58 If `True`, redirect previous file streams when opening a new file descriptor. 59 60 NOTE: Only set this to `True` if you are entering into a daemon context. 61 Doing so will redirect `sys.stdout` and `sys.stderr` into the log files. 62 63 write_timestamps: bool, default False 64 If `True`, prepend the current UTC timestamp to each line of the file. 65 66 timestamp_format: str, default None 67 If `write_timestamps` is `True`, use this format for the timestamps. 68 Defaults to `'%Y-%m-%d %H:%M'`. 69 """ 70 self.file_path = pathlib.Path(file_path) 71 if num_files_to_keep is None: 72 num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep') 73 if max_file_size is None: 74 max_file_size = get_config('jobs', 'logs', 'max_file_size') 75 if timestamp_format is None: 76 timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format') 77 if num_files_to_keep < 2: 78 raise ValueError("At least 2 files must be kept.") 79 if max_file_size < 1: 80 raise ValueError("Subfiles must contain at least one byte.") 81 82 self.num_files_to_keep = num_files_to_keep 83 self.max_file_size = max_file_size 84 self.redirect_streams = redirect_streams 85 self.write_timestamps = write_timestamps 86 self.timestamp_format = timestamp_format 87 self.subfile_regex_pattern = re.compile(r'(.*)\.log(?:\.\d+)?$') 88 89 ### When subfiles are opened, map from their index to the file objects. 90 self.subfile_objects = {} 91 self._redirected_subfile_objects = {} 92 self._current_file_obj = None 93 self._previous_file_obj = None 94 95 ### When reading, keep track of the file index and position. 96 self._cursor: Tuple[int, int] = (0, 0) 97 98 ### Don't forget to close any stray files. 99 atexit.register(self.close) 100 101 102 def fileno(self): 103 """ 104 Return the file descriptor for the latest subfile. 
105 """ 106 self.refresh_files(start_interception=False) 107 return self._current_file_obj.fileno() 108 109 110 def get_latest_subfile_path(self) -> pathlib.Path: 111 """ 112 Return the path for the latest subfile to which to write into. 113 """ 114 return self.get_subfile_path_from_index( 115 self.get_latest_subfile_index() 116 ) 117 118 119 def get_remaining_subfile_size(self, subfile_index: int) -> int: 120 """ 121 Return the remaining buffer size for a subfile. 122 123 Parameters 124 --------- 125 subfile_index: int 126 The index of the subfile to be checked. 127 128 Returns 129 ------- 130 The remaining size in bytes. 131 """ 132 subfile_path = self.get_subfile_path_from_index(subfile_index) 133 if not subfile_path.exists(): 134 return self.max_file_size 135 136 return self.max_file_size - os.path.getsize(subfile_path) 137 138 139 def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool: 140 """ 141 Return whether a given subfile is too large. 142 143 Parameters 144 ---------- 145 subfile_index: int 146 The index of the subfile to be checked. 147 148 potential_new_len: int, default 0 149 The length of a potential write of new data. 150 151 Returns 152 ------- 153 A bool indicating the subfile is or will be too large. 154 """ 155 subfile_path = self.get_subfile_path_from_index(subfile_index) 156 if not subfile_path.exists(): 157 return False 158 159 self.flush() 160 161 return ( 162 (os.path.getsize(subfile_path) + potential_new_len) 163 >= 164 self.max_file_size 165 ) 166 167 168 def get_latest_subfile_index(self) -> int: 169 """ 170 Return the latest existing subfile index. 171 If no index may be found, return -1. 172 """ 173 existing_subfile_paths = self.get_existing_subfile_paths() 174 latest_index = ( 175 self.get_index_from_subfile_name(existing_subfile_paths[-1].name) 176 if existing_subfile_paths 177 else 0 178 ) 179 return latest_index 180 181 182 def get_index_from_subfile_name(self, subfile_name: str) -> int: 183 """ 184 Return the index from a given subfile name. 185 If the file name cannot be parsed, return -1. 186 """ 187 try: 188 return int(subfile_name.replace(self.file_path.name + '.', '')) 189 except Exception as e: 190 return -1 191 192 193 def get_subfile_name_from_index(self, subfile_index: int) -> str: 194 """ 195 Return the subfile name from the given index. 196 """ 197 return f'{self.file_path.name}.{subfile_index}' 198 199 200 def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path: 201 """ 202 Return the subfile's path from its index. 203 """ 204 return self.file_path.parent / self.get_subfile_name_from_index(subfile_index) 205 206 207 def get_existing_subfile_indices(self) -> List[int]: 208 """ 209 Return of list of subfile indices which exist on disk. 210 """ 211 existing_subfile_paths = self.get_existing_subfile_paths() 212 return [self.get_index_from_subfile_name(path.name) for path in existing_subfile_paths] 213 214 215 def get_existing_subfile_paths(self) -> List[pathlib.Path]: 216 """ 217 Return a list of file paths that match the input filename pattern. 
218 """ 219 if not self.file_path.parent.exists(): 220 return [] 221 222 subfile_names_indices = sorted( 223 [ 224 (file_name, self.get_index_from_subfile_name(file_name)) 225 for file_name in os.listdir(self.file_path.parent) 226 if ( 227 file_name.startswith(self.file_path.name) 228 and re.match(self.subfile_regex_pattern, file_name) 229 ) 230 ], 231 key=lambda x: x[1], 232 ) 233 return [ 234 (self.file_path.parent / file_name) 235 for file_name, _ in subfile_names_indices 236 ] 237 238 239 def refresh_files( 240 self, 241 potential_new_len: int = 0, 242 start_interception: bool = False, 243 ) -> '_io.TextUIWrapper': 244 """ 245 Check the state of the subfiles. 246 If the latest subfile is too large, create a new file and delete old ones. 247 248 Parameters 249 ---------- 250 potential_new_len: int, default 0 251 252 start_interception: bool, default False 253 If `True`, kick off the file interception threads. 254 """ 255 self.flush() 256 257 latest_subfile_index = self.get_latest_subfile_index() 258 latest_subfile_path = self.get_subfile_path_from_index(latest_subfile_index) 259 260 ### First run with existing log files: open the most recent log file. 261 is_first_run_with_logs = ((latest_subfile_index > -1) and self._current_file_obj is None) 262 263 ### Sometimes a new file is created but output doesn't switch over. 264 lost_latest_handle = ( 265 self._current_file_obj is not None 266 and 267 self.get_index_from_subfile_name(self._current_file_obj.name) == -1 268 ) 269 if is_first_run_with_logs or lost_latest_handle: 270 self._current_file_obj = open(latest_subfile_path, 'a+', encoding='utf-8') 271 if self.redirect_streams: 272 try: 273 daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj) 274 daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj) 275 except OSError as e: 276 warn( 277 f"Encountered an issue when redirecting streams:\n{traceback.format_exc()}" 278 ) 279 if start_interception and self.write_timestamps: 280 self.start_log_fd_interception() 281 282 create_new_file = ( 283 (latest_subfile_index == -1) 284 or 285 self._current_file_obj is None 286 or 287 self.is_subfile_too_large(latest_subfile_index, potential_new_len) 288 ) 289 if create_new_file: 290 old_subfile_index = latest_subfile_index 291 new_subfile_index = old_subfile_index + 1 292 new_file_path = self.get_subfile_path_from_index(new_subfile_index) 293 self._previous_file_obj = self._current_file_obj 294 self._current_file_obj = open(new_file_path, 'a+', encoding='utf-8') 295 self.subfile_objects[new_subfile_index] = self._current_file_obj 296 self.flush() 297 298 if self._previous_file_obj is not None: 299 if self.redirect_streams: 300 self._redirected_subfile_objects[old_subfile_index] = self._previous_file_obj 301 daemon.daemon.redirect_stream(self._previous_file_obj, self._current_file_obj) 302 daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj) 303 daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj) 304 self.close(unused_only=True) 305 306 ### Sanity check in case writing somehow fails. 307 if self._previous_file_obj is self._current_file_obj: 308 self._previous_file_obj = None 309 310 self.delete(unused_only=True) 311 312 return self._current_file_obj 313 314 315 def close(self, unused_only: bool = False) -> None: 316 """ 317 Close any open file descriptors. 318 319 Parameters 320 ---------- 321 unused_only: bool, default False 322 If `True`, only close file descriptors not currently in use. 
323 """ 324 self.stop_log_fd_interception(unused_only=unused_only) 325 subfile_indices = sorted(self.subfile_objects.keys()) 326 for subfile_index in subfile_indices: 327 subfile_object = self.subfile_objects[subfile_index] 328 if unused_only and subfile_object in (self._previous_file_obj, self._current_file_obj): 329 continue 330 try: 331 if not subfile_object.closed: 332 subfile_object.close() 333 except Exception as e: 334 warn(f"Failed to close an open subfile:\n{traceback.format_exc()}") 335 336 _ = self.subfile_objects.pop(subfile_index, None) 337 if self.redirect_streams: 338 _ = self._redirected_subfile_objects.pop(subfile_index, None) 339 340 if not unused_only: 341 self._previous_file_obj = None 342 self._current_file_obj = None 343 344 345 def get_timestamp_prefix_str(self) -> str: 346 """ 347 Return the current minute prefix string. 348 """ 349 return datetime.now(timezone.utc).strftime(self.timestamp_format) + ' | ' 350 351 352 def write(self, data: str) -> None: 353 """ 354 Write the given text into the latest subfile. 355 If the subfile will be too large, create a new subfile. 356 If too many subfiles exist at once, the oldest one will be deleted. 357 358 NOTE: This will not split data across multiple files. 359 As such, if data is larger than max_file_size, then the corresponding subfile 360 may exceed this limit. 361 """ 362 try: 363 self.file_path.parent.mkdir(exist_ok=True, parents=True) 364 if isinstance(data, bytes): 365 data = data.decode('utf-8') 366 367 prefix_str = self.get_timestamp_prefix_str() if self.write_timestamps else "" 368 suffix_str = "\n" if self.write_timestamps else "" 369 self.refresh_files( 370 potential_new_len = len(prefix_str + data + suffix_str), 371 start_interception = self.write_timestamps, 372 ) 373 try: 374 if prefix_str: 375 self._current_file_obj.write(prefix_str) 376 self._current_file_obj.write(data) 377 if suffix_str: 378 self._current_file_obj.write(suffix_str) 379 except BrokenPipeError: 380 warn("BrokenPipeError encountered. The daemon may have been terminated.") 381 return 382 except Exception as e: 383 warn(f"Failed to write to subfile:\n{traceback.format_exc()}") 384 self.flush() 385 self.delete(unused_only=True) 386 except Exception as e: 387 warn(f"Unexpected error in RotatingFile.write: {e}") 388 389 390 def delete(self, unused_only: bool = False) -> None: 391 """ 392 Delete old subfiles. 393 394 Parameters 395 ---------- 396 unused_only: bool, default False 397 If `True`, only delete subfiles which are no longer needed. 398 """ 399 existing_subfile_paths = self.get_existing_subfile_paths() 400 if unused_only and len(existing_subfile_paths) <= self.num_files_to_keep: 401 return 402 403 self.flush() 404 self.close(unused_only=unused_only) 405 406 end_ix = ( 407 (-1 * self.num_files_to_keep) 408 if unused_only 409 else len(existing_subfile_paths) 410 ) 411 for subfile_path_to_delete in existing_subfile_paths[0:end_ix]: 412 subfile_index = self.get_index_from_subfile_name(subfile_path_to_delete.name) 413 subfile_object = self.subfile_objects.get(subfile_index, None) 414 415 try: 416 subfile_path_to_delete.unlink() 417 except Exception as e: 418 warn( 419 f"Unable to delete subfile '{subfile_path_to_delete}':\n" 420 + f"{traceback.format_exc()}" 421 ) 422 423 424 def read(self, *args, **kwargs) -> str: 425 """ 426 Read the contents of the existing subfiles. 
427 """ 428 existing_subfile_indices = [ 429 self.get_index_from_subfile_name(subfile_path.name) 430 for subfile_path in self.get_existing_subfile_paths() 431 ] 432 paths_to_read = [ 433 self.get_subfile_path_from_index(subfile_index) 434 for subfile_index in existing_subfile_indices 435 if subfile_index >= self._cursor[0] 436 ] 437 buffer = '' 438 refresh_cursor = True 439 for subfile_path in paths_to_read: 440 subfile_index = self.get_index_from_subfile_name(subfile_path.name) 441 seek_ix = ( 442 self._cursor[1] 443 if subfile_index == self._cursor[0] 444 else 0 445 ) 446 447 if ( 448 subfile_index in self.subfile_objects 449 and 450 subfile_index not in self._redirected_subfile_objects 451 ): 452 subfile_object = self.subfile_objects[subfile_index] 453 for i in range(self.SEEK_BACK_ATTEMPTS): 454 try: 455 subfile_object.seek(max(seek_ix - i, 0)) 456 buffer += subfile_object.read() 457 except UnicodeDecodeError: 458 continue 459 break 460 else: 461 with open(subfile_path, 'r', encoding='utf-8') as f: 462 for i in range(self.SEEK_BACK_ATTEMPTS): 463 try: 464 f.seek(max(seek_ix - i, 0)) 465 buffer += f.read() 466 except UnicodeDecodeError: 467 continue 468 break 469 470 ### Handle the case when no files have yet been opened. 471 if not self.subfile_objects and subfile_path == paths_to_read[-1]: 472 self._cursor = (subfile_index, f.tell()) 473 refresh_cursor = False 474 475 if refresh_cursor: 476 self.refresh_cursor() 477 return buffer 478 479 480 def refresh_cursor(self) -> None: 481 """ 482 Update the cursor to the latest subfile index and file.tell() value. 483 """ 484 self.flush() 485 existing_subfile_paths = self.get_existing_subfile_paths() 486 current_ix = ( 487 self.get_index_from_subfile_name(existing_subfile_paths[-1].name) 488 if existing_subfile_paths 489 else 0 490 ) 491 position = self._current_file_obj.tell() if self._current_file_obj is not None else 0 492 self._cursor = (current_ix, position) 493 494 495 def readlines(self) -> List[str]: 496 """ 497 Return a list of lines of text. 498 """ 499 existing_subfile_indices = [ 500 self.get_index_from_subfile_name(subfile_path.name) 501 for subfile_path in self.get_existing_subfile_paths() 502 ] 503 paths_to_read = [ 504 self.get_subfile_path_from_index(subfile_index) 505 for subfile_index in existing_subfile_indices 506 if subfile_index >= self._cursor[0] 507 ] 508 509 lines = [] 510 refresh_cursor = True 511 for subfile_path in paths_to_read: 512 subfile_index = self.get_index_from_subfile_name(subfile_path.name) 513 seek_ix = ( 514 self._cursor[1] 515 if subfile_index == self._cursor[0] 516 else 0 517 ) 518 519 subfile_lines = [] 520 if ( 521 subfile_index in self.subfile_objects 522 and 523 subfile_index not in self._redirected_subfile_objects 524 ): 525 subfile_object = self.subfile_objects[subfile_index] 526 for i in range(self.SEEK_BACK_ATTEMPTS): 527 try: 528 subfile_object.seek(max((seek_ix - i), 0)) 529 subfile_lines = subfile_object.readlines() 530 except UnicodeDecodeError: 531 continue 532 break 533 else: 534 with open(subfile_path, 'r', encoding='utf-8') as f: 535 for i in range(self.SEEK_BACK_ATTEMPTS): 536 try: 537 f.seek(max(seek_ix - i, 0)) 538 subfile_lines = f.readlines() 539 except UnicodeDecodeError: 540 continue 541 break 542 543 ### Handle the case when no files have yet been opened. 544 if not self.subfile_objects and subfile_path == paths_to_read[-1]: 545 self._cursor = (subfile_index, f.tell()) 546 refresh_cursor = False 547 548 ### Sometimes a line may span multiple files. 
549 if lines and subfile_lines and not lines[-1].endswith('\n'): 550 lines[-1] += subfile_lines[0] 551 new_lines = subfile_lines[1:] 552 else: 553 new_lines = subfile_lines 554 lines.extend(new_lines) 555 556 if refresh_cursor: 557 self.refresh_cursor() 558 return lines 559 560 561 def seekable(self) -> bool: 562 return True 563 564 565 def seek(self, position: int) -> None: 566 """ 567 Seek to the beginning of the logs stream. 568 """ 569 existing_subfile_indices = self.get_existing_subfile_indices() 570 min_ix = existing_subfile_indices[0] if existing_subfile_indices else 0 571 max_ix = existing_subfile_indices[-1] if existing_subfile_indices else 0 572 if position == 0: 573 self._cursor = (min_ix, 0) 574 return 575 576 self._cursor = (max_ix, position) 577 if self._current_file_obj is not None: 578 self._current_file_obj.seek(position) 579 580 581 def flush(self) -> None: 582 """ 583 Flush any open subfiles. 584 """ 585 for subfile_index, subfile_object in self.subfile_objects.items(): 586 if not subfile_object.closed: 587 try: 588 subfile_object.flush() 589 except Exception as e: 590 warn(f"Failed to flush subfile {subfile_index}:\n{traceback.format_exc()}") 591 if self.redirect_streams: 592 try: 593 sys.stdout.flush() 594 except BrokenPipeError: 595 pass 596 except Exception as e: 597 warn(f"Failed to flush STDOUT:\n{traceback.format_exc()}") 598 try: 599 sys.stderr.flush() 600 except BrokenPipeError: 601 pass 602 except Exception as e: 603 warn(f"Failed to flush STDERR:\n{traceback.format_exc()}") 604 605 606 def start_log_fd_interception(self): 607 """ 608 Start the file descriptor monitoring threads. 609 """ 610 if not self.write_timestamps: 611 return 612 613 self._stdout_interceptor = FileDescriptorInterceptor( 614 sys.stdout.fileno(), 615 self.get_timestamp_prefix_str, 616 ) 617 self._stderr_interceptor = FileDescriptorInterceptor( 618 sys.stderr.fileno(), 619 self.get_timestamp_prefix_str, 620 ) 621 622 self._stdout_interceptor_thread = Thread( 623 target = self._stdout_interceptor.start_interception, 624 daemon = True, 625 ) 626 self._stderr_interceptor_thread = Thread( 627 target = self._stderr_interceptor.start_interception, 628 daemon = True, 629 ) 630 self._stdout_interceptor_thread.start() 631 self._stderr_interceptor_thread.start() 632 self._intercepting = True 633 634 if '_interceptor_threads' not in self.__dict__: 635 self._interceptor_threads = [] 636 if '_interceptors' not in self.__dict__: 637 self._interceptors = [] 638 self._interceptor_threads.extend([ 639 self._stdout_interceptor_thread, 640 self._stderr_interceptor_thread, 641 ]) 642 self._interceptors.extend([ 643 self._stdout_interceptor, 644 self._stderr_interceptor, 645 ]) 646 self.stop_log_fd_interception(unused_only=True) 647 648 649 def stop_log_fd_interception(self, unused_only: bool = False): 650 """ 651 Stop the file descriptor monitoring threads. 
652 """ 653 if not self.write_timestamps: 654 return 655 656 interceptors = self.__dict__.get('_interceptors', []) 657 interceptor_threads = self.__dict__.get('_interceptor_threads', []) 658 659 end_ix = len(interceptors) if not unused_only else -2 660 661 for interceptor in interceptors[:end_ix]: 662 interceptor.stop_interception() 663 del interceptors[:end_ix] 664 665 for thread in interceptor_threads[:end_ix]: 666 try: 667 thread.join() 668 except Exception as e: 669 warn(f"Failed to join interceptor threads:\n{traceback.format_exc()}") 670 del interceptor_threads[:end_ix] 671 672 673 def __repr__(self) -> str: 674 """ 675 Return basic info for this `RotatingFile`. 676 """ 677 return ( 678 "RotatingFile(" 679 + f"'{self.file_path.as_posix()}', " 680 + f"num_files_to_keep={self.num_files_to_keep}, " 681 + f"max_file_size={self.max_file_size})" 682 )
class RotatingFile(io.IOBase):
A `RotatingFile` may be treated like a normal file-like object. Under the hood, however, it will create new sub-files and delete old ones.
34 def __init__( 35 self, 36 file_path: pathlib.Path, 37 num_files_to_keep: Optional[int] = None, 38 max_file_size: Optional[int] = None, 39 redirect_streams: bool = False, 40 write_timestamps: bool = False, 41 timestamp_format: Optional[str] = None, 42 ): 43 """ 44 Create a file-like object which manages other files. 45 46 Parameters 47 ---------- 48 num_files_to_keep: int, default None 49 How many sub-files to keep at any given time. 50 Defaults to the configured value (5). 51 52 max_file_size: int, default None 53 How large in bytes each sub-file can grow before another file is created. 54 Note that this is not a hard limit but rather a threshold 55 which may be slightly exceeded. 56 Defaults to the configured value (100_000). 57 58 redirect_streams: bool, default False 59 If `True`, redirect previous file streams when opening a new file descriptor. 60 61 NOTE: Only set this to `True` if you are entering into a daemon context. 62 Doing so will redirect `sys.stdout` and `sys.stderr` into the log files. 63 64 write_timestamps: bool, default False 65 If `True`, prepend the current UTC timestamp to each line of the file. 66 67 timestamp_format: str, default None 68 If `write_timestamps` is `True`, use this format for the timestamps. 69 Defaults to `'%Y-%m-%d %H:%M'`. 70 """ 71 self.file_path = pathlib.Path(file_path) 72 if num_files_to_keep is None: 73 num_files_to_keep = get_config('jobs', 'logs', 'num_files_to_keep') 74 if max_file_size is None: 75 max_file_size = get_config('jobs', 'logs', 'max_file_size') 76 if timestamp_format is None: 77 timestamp_format = get_config('jobs', 'logs', 'timestamps', 'format') 78 if num_files_to_keep < 2: 79 raise ValueError("At least 2 files must be kept.") 80 if max_file_size < 1: 81 raise ValueError("Subfiles must contain at least one byte.") 82 83 self.num_files_to_keep = num_files_to_keep 84 self.max_file_size = max_file_size 85 self.redirect_streams = redirect_streams 86 self.write_timestamps = write_timestamps 87 self.timestamp_format = timestamp_format 88 self.subfile_regex_pattern = re.compile(r'(.*)\.log(?:\.\d+)?$') 89 90 ### When subfiles are opened, map from their index to the file objects. 91 self.subfile_objects = {} 92 self._redirected_subfile_objects = {} 93 self._current_file_obj = None 94 self._previous_file_obj = None 95 96 ### When reading, keep track of the file index and position. 97 self._cursor: Tuple[int, int] = (0, 0) 98 99 ### Don't forget to close any stray files. 100 atexit.register(self.close)
Create a file-like object which manages other files.
Parameters
- num_files_to_keep (int, default None): How many sub-files to keep at any given time. Defaults to the configured value (5).
- max_file_size (int, default None): How large in bytes each sub-file can grow before another file is created. Note that this is not a hard limit but rather a threshold which may be slightly exceeded. Defaults to the configured value (100_000).
- redirect_streams (bool, default False): If `True`, redirect previous file streams when opening a new file descriptor. NOTE: Only set this to `True` if you are entering into a daemon context. Doing so will redirect `sys.stdout` and `sys.stderr` into the log files.
- write_timestamps (bool, default False): If `True`, prepend the current UTC timestamp to each line of the file.
- timestamp_format (str, default None): If `write_timestamps` is `True`, use this format for the timestamps. Defaults to `'%Y-%m-%d %H:%M'`.
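A minimal usage sketch (not taken from the source; the import path is assumed from this module's location, and the temporary directory is illustrative). Passing explicit values avoids any configuration lookup:

```python
# Construct a RotatingFile with explicit limits, write a line, and inspect the subfiles.
import pathlib
import tempfile

from meerschaum.utils.daemon.RotatingFile import RotatingFile  # assumed import path

log_dir = pathlib.Path(tempfile.mkdtemp())        # illustrative location
log_file = RotatingFile(
    log_dir / 'job.log',
    num_files_to_keep=3,                   # keep at most 3 subfiles on disk
    max_file_size=10_000,                  # rotate once a subfile reaches ~10 kB
    timestamp_format='%Y-%m-%d %H:%M',     # the documented default
)
log_file.write('Hello, logs!\n')
log_file.flush()
print(log_file.get_existing_subfile_paths())      # e.g. [.../job.log.0]
```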
103 def fileno(self): 104 """ 105 Return the file descriptor for the latest subfile. 106 """ 107 self.refresh_files(start_interception=False) 108 return self._current_file_obj.fileno()
Return the file descriptor for the latest subfile.
111 def get_latest_subfile_path(self) -> pathlib.Path: 112 """ 113 Return the path for the latest subfile to which to write into. 114 """ 115 return self.get_subfile_path_from_index( 116 self.get_latest_subfile_index() 117 )
Return the path of the latest subfile to write into.
120 def get_remaining_subfile_size(self, subfile_index: int) -> int: 121 """ 122 Return the remaining buffer size for a subfile. 123 124 Parameters 125 --------- 126 subfile_index: int 127 The index of the subfile to be checked. 128 129 Returns 130 ------- 131 The remaining size in bytes. 132 """ 133 subfile_path = self.get_subfile_path_from_index(subfile_index) 134 if not subfile_path.exists(): 135 return self.max_file_size 136 137 return self.max_file_size - os.path.getsize(subfile_path)
Return the remaining buffer size for a subfile.
Parameters
- subfile_index (int): The index of the subfile to be checked.
Returns
- The remaining size in bytes.
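For instance, continuing the hypothetical `log_file` from the sketch above, the headroom left in the newest subfile can be checked before a write:

```python
# How many bytes remain before the newest subfile crosses max_file_size.
idx = log_file.get_latest_subfile_index()
remaining = log_file.get_remaining_subfile_size(idx)
print(f"{remaining} bytes left in subfile {idx}")
```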
140 def is_subfile_too_large(self, subfile_index: int, potential_new_len: int = 0) -> bool: 141 """ 142 Return whether a given subfile is too large. 143 144 Parameters 145 ---------- 146 subfile_index: int 147 The index of the subfile to be checked. 148 149 potential_new_len: int, default 0 150 The length of a potential write of new data. 151 152 Returns 153 ------- 154 A bool indicating the subfile is or will be too large. 155 """ 156 subfile_path = self.get_subfile_path_from_index(subfile_index) 157 if not subfile_path.exists(): 158 return False 159 160 self.flush() 161 162 return ( 163 (os.path.getsize(subfile_path) + potential_new_len) 164 >= 165 self.max_file_size 166 )
Return whether a given subfile is too large.
Parameters
- subfile_index (int): The index of the subfile to be checked.
- potential_new_len (int, default 0): The length of a potential write of new data.
Returns
- A bool indicating the subfile is or will be too large.
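A short sketch of the check, again continuing the hypothetical `log_file` from above:

```python
# Ask whether a pending write would push the newest subfile past max_file_size.
payload = 'x' * 4_096
will_rotate = log_file.is_subfile_too_large(
    log_file.get_latest_subfile_index(),
    potential_new_len=len(payload),
)
print(will_rotate)   # True once the subfile is within 4 kB of the limit
log_file.write(payload)
```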
169 def get_latest_subfile_index(self) -> int: 170 """ 171 Return the latest existing subfile index. 172 If no index may be found, return -1. 173 """ 174 existing_subfile_paths = self.get_existing_subfile_paths() 175 latest_index = ( 176 self.get_index_from_subfile_name(existing_subfile_paths[-1].name) 177 if existing_subfile_paths 178 else 0 179 ) 180 return latest_index
Return the latest existing subfile index. If no index may be found, return -1.
183 def get_index_from_subfile_name(self, subfile_name: str) -> int: 184 """ 185 Return the index from a given subfile name. 186 If the file name cannot be parsed, return -1. 187 """ 188 try: 189 return int(subfile_name.replace(self.file_path.name + '.', '')) 190 except Exception as e: 191 return -1
Return the index from a given subfile name. If the file name cannot be parsed, return -1.
194 def get_subfile_name_from_index(self, subfile_index: int) -> str: 195 """ 196 Return the subfile name from the given index. 197 """ 198 return f'{self.file_path.name}.{subfile_index}'
Return the subfile name from the given index.
201 def get_subfile_path_from_index(self, subfile_index: int) -> pathlib.Path: 202 """ 203 Return the subfile's path from its index. 204 """ 205 return self.file_path.parent / self.get_subfile_name_from_index(subfile_index)
Return the subfile's path from its index.
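Together, these helpers define the naming scheme: a base file such as `job.log` is managed as numbered subfiles `job.log.0`, `job.log.1`, and so on. A sketch with a hypothetical index, continuing the `log_file` above:

```python
# Round-trip between a subfile index, its name, and its full path.
name = log_file.get_subfile_name_from_index(2)    # 'job.log.2'
path = log_file.get_subfile_path_from_index(2)    # <log_dir>/job.log.2
assert log_file.get_index_from_subfile_name(name) == 2
```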
208 def get_existing_subfile_indices(self) -> List[int]: 209 """ 210 Return of list of subfile indices which exist on disk. 211 """ 212 existing_subfile_paths = self.get_existing_subfile_paths() 213 return [self.get_index_from_subfile_name(path.name) for path in existing_subfile_paths]
Return a list of subfile indices which exist on disk.
216 def get_existing_subfile_paths(self) -> List[pathlib.Path]: 217 """ 218 Return a list of file paths that match the input filename pattern. 219 """ 220 if not self.file_path.parent.exists(): 221 return [] 222 223 subfile_names_indices = sorted( 224 [ 225 (file_name, self.get_index_from_subfile_name(file_name)) 226 for file_name in os.listdir(self.file_path.parent) 227 if ( 228 file_name.startswith(self.file_path.name) 229 and re.match(self.subfile_regex_pattern, file_name) 230 ) 231 ], 232 key=lambda x: x[1], 233 ) 234 return [ 235 (self.file_path.parent / file_name) 236 for file_name, _ in subfile_names_indices 237 ]
Return a list of file paths that match the input filename pattern.
240 def refresh_files( 241 self, 242 potential_new_len: int = 0, 243 start_interception: bool = False, 244 ) -> '_io.TextUIWrapper': 245 """ 246 Check the state of the subfiles. 247 If the latest subfile is too large, create a new file and delete old ones. 248 249 Parameters 250 ---------- 251 potential_new_len: int, default 0 252 253 start_interception: bool, default False 254 If `True`, kick off the file interception threads. 255 """ 256 self.flush() 257 258 latest_subfile_index = self.get_latest_subfile_index() 259 latest_subfile_path = self.get_subfile_path_from_index(latest_subfile_index) 260 261 ### First run with existing log files: open the most recent log file. 262 is_first_run_with_logs = ((latest_subfile_index > -1) and self._current_file_obj is None) 263 264 ### Sometimes a new file is created but output doesn't switch over. 265 lost_latest_handle = ( 266 self._current_file_obj is not None 267 and 268 self.get_index_from_subfile_name(self._current_file_obj.name) == -1 269 ) 270 if is_first_run_with_logs or lost_latest_handle: 271 self._current_file_obj = open(latest_subfile_path, 'a+', encoding='utf-8') 272 if self.redirect_streams: 273 try: 274 daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj) 275 daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj) 276 except OSError as e: 277 warn( 278 f"Encountered an issue when redirecting streams:\n{traceback.format_exc()}" 279 ) 280 if start_interception and self.write_timestamps: 281 self.start_log_fd_interception() 282 283 create_new_file = ( 284 (latest_subfile_index == -1) 285 or 286 self._current_file_obj is None 287 or 288 self.is_subfile_too_large(latest_subfile_index, potential_new_len) 289 ) 290 if create_new_file: 291 old_subfile_index = latest_subfile_index 292 new_subfile_index = old_subfile_index + 1 293 new_file_path = self.get_subfile_path_from_index(new_subfile_index) 294 self._previous_file_obj = self._current_file_obj 295 self._current_file_obj = open(new_file_path, 'a+', encoding='utf-8') 296 self.subfile_objects[new_subfile_index] = self._current_file_obj 297 self.flush() 298 299 if self._previous_file_obj is not None: 300 if self.redirect_streams: 301 self._redirected_subfile_objects[old_subfile_index] = self._previous_file_obj 302 daemon.daemon.redirect_stream(self._previous_file_obj, self._current_file_obj) 303 daemon.daemon.redirect_stream(sys.stdout, self._current_file_obj) 304 daemon.daemon.redirect_stream(sys.stderr, self._current_file_obj) 305 self.close(unused_only=True) 306 307 ### Sanity check in case writing somehow fails. 308 if self._previous_file_obj is self._current_file_obj: 309 self._previous_file_obj = None 310 311 self.delete(unused_only=True) 312 313 return self._current_file_obj
Check the state of the subfiles. If the latest subfile is too large, create a new file and delete old ones.
Parameters
- potential_new_len (int, default 0): The anticipated length in bytes of data about to be written.
- start_interception (bool, default False): If `True`, kick off the file interception threads.
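`refresh_files()` is normally invoked internally by `write()`, but it may be called directly to make sure the newest subfile is open, for example before handing the handle to code that expects a plain file object (a sketch, continuing the hypothetical `log_file` above):

```python
# Ensure the newest subfile is open and get its underlying file object.
current_file = log_file.refresh_files(potential_new_len=0)
print(current_file.name)    # path of the newest subfile, e.g. .../job.log.0
```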
316 def close(self, unused_only: bool = False) -> None: 317 """ 318 Close any open file descriptors. 319 320 Parameters 321 ---------- 322 unused_only: bool, default False 323 If `True`, only close file descriptors not currently in use. 324 """ 325 self.stop_log_fd_interception(unused_only=unused_only) 326 subfile_indices = sorted(self.subfile_objects.keys()) 327 for subfile_index in subfile_indices: 328 subfile_object = self.subfile_objects[subfile_index] 329 if unused_only and subfile_object in (self._previous_file_obj, self._current_file_obj): 330 continue 331 try: 332 if not subfile_object.closed: 333 subfile_object.close() 334 except Exception as e: 335 warn(f"Failed to close an open subfile:\n{traceback.format_exc()}") 336 337 _ = self.subfile_objects.pop(subfile_index, None) 338 if self.redirect_streams: 339 _ = self._redirected_subfile_objects.pop(subfile_index, None) 340 341 if not unused_only: 342 self._previous_file_obj = None 343 self._current_file_obj = None
Close any open file descriptors.
Parameters
- unused_only (bool, default False): If `True`, only close file descriptors not currently in use.
346 def get_timestamp_prefix_str(self) -> str: 347 """ 348 Return the current minute prefix string. 349 """ 350 return datetime.now(timezone.utc).strftime(self.timestamp_format) + ' | '
Return the current minute prefix string.
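With the default format this yields something like the following (hypothetical timestamp shown in the comment):

```python
# The prefix prepended to each line when write_timestamps=True,
# e.g. '2024-05-01 13:37 | ' with the default '%Y-%m-%d %H:%M' format.
print(log_file.get_timestamp_prefix_str())
```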
353 def write(self, data: str) -> None: 354 """ 355 Write the given text into the latest subfile. 356 If the subfile will be too large, create a new subfile. 357 If too many subfiles exist at once, the oldest one will be deleted. 358 359 NOTE: This will not split data across multiple files. 360 As such, if data is larger than max_file_size, then the corresponding subfile 361 may exceed this limit. 362 """ 363 try: 364 self.file_path.parent.mkdir(exist_ok=True, parents=True) 365 if isinstance(data, bytes): 366 data = data.decode('utf-8') 367 368 prefix_str = self.get_timestamp_prefix_str() if self.write_timestamps else "" 369 suffix_str = "\n" if self.write_timestamps else "" 370 self.refresh_files( 371 potential_new_len = len(prefix_str + data + suffix_str), 372 start_interception = self.write_timestamps, 373 ) 374 try: 375 if prefix_str: 376 self._current_file_obj.write(prefix_str) 377 self._current_file_obj.write(data) 378 if suffix_str: 379 self._current_file_obj.write(suffix_str) 380 except BrokenPipeError: 381 warn("BrokenPipeError encountered. The daemon may have been terminated.") 382 return 383 except Exception as e: 384 warn(f"Failed to write to subfile:\n{traceback.format_exc()}") 385 self.flush() 386 self.delete(unused_only=True) 387 except Exception as e: 388 warn(f"Unexpected error in RotatingFile.write: {e}")
Write the given text into the latest subfile. If the subfile will be too large, create a new subfile. If too many subfiles exist at once, the oldest one will be deleted.
NOTE: This will not split data across multiple files. As such, if data is larger than max_file_size, then the corresponding subfile may exceed this limit.
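A self-contained rotation sketch (not from the source; paths and sizes are illustrative): with a small `max_file_size`, repeated writes spill into new subfiles and the oldest ones are pruned down to `num_files_to_keep`.

```python
import pathlib
import tempfile

from meerschaum.utils.daemon.RotatingFile import RotatingFile  # assumed import path

rotate_dir = pathlib.Path(tempfile.mkdtemp())
small_log = RotatingFile(
    rotate_dir / 'rotate.log',
    num_files_to_keep=2,    # prune down to the 2 newest subfiles
    max_file_size=200,      # rotate roughly every 200 bytes
)
for i in range(100):
    small_log.write(f'line {i}\n')

print([p.name for p in small_log.get_existing_subfile_paths()])
# e.g. ['rotate.log.3', 'rotate.log.4'] -- exact indices depend on data sizes
```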
391 def delete(self, unused_only: bool = False) -> None: 392 """ 393 Delete old subfiles. 394 395 Parameters 396 ---------- 397 unused_only: bool, default False 398 If `True`, only delete subfiles which are no longer needed. 399 """ 400 existing_subfile_paths = self.get_existing_subfile_paths() 401 if unused_only and len(existing_subfile_paths) <= self.num_files_to_keep: 402 return 403 404 self.flush() 405 self.close(unused_only=unused_only) 406 407 end_ix = ( 408 (-1 * self.num_files_to_keep) 409 if unused_only 410 else len(existing_subfile_paths) 411 ) 412 for subfile_path_to_delete in existing_subfile_paths[0:end_ix]: 413 subfile_index = self.get_index_from_subfile_name(subfile_path_to_delete.name) 414 subfile_object = self.subfile_objects.get(subfile_index, None) 415 416 try: 417 subfile_path_to_delete.unlink() 418 except Exception as e: 419 warn( 420 f"Unable to delete subfile '{subfile_path_to_delete}':\n" 421 + f"{traceback.format_exc()}" 422 )
Delete old subfiles.
Parameters
- unused_only (bool, default False): If `True`, only delete subfiles which are no longer needed.
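Continuing the `small_log` sketch above: `delete(unused_only=True)` prunes only the oldest extras (as `write()` already does internally), while `delete()` with no arguments removes every subfile from disk.

```python
small_log.delete(unused_only=True)   # keep the newest num_files_to_keep subfiles
small_log.delete()                   # remove all subfiles on disk
print(small_log.get_existing_subfile_paths())   # []
```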
425 def read(self, *args, **kwargs) -> str: 426 """ 427 Read the contents of the existing subfiles. 428 """ 429 existing_subfile_indices = [ 430 self.get_index_from_subfile_name(subfile_path.name) 431 for subfile_path in self.get_existing_subfile_paths() 432 ] 433 paths_to_read = [ 434 self.get_subfile_path_from_index(subfile_index) 435 for subfile_index in existing_subfile_indices 436 if subfile_index >= self._cursor[0] 437 ] 438 buffer = '' 439 refresh_cursor = True 440 for subfile_path in paths_to_read: 441 subfile_index = self.get_index_from_subfile_name(subfile_path.name) 442 seek_ix = ( 443 self._cursor[1] 444 if subfile_index == self._cursor[0] 445 else 0 446 ) 447 448 if ( 449 subfile_index in self.subfile_objects 450 and 451 subfile_index not in self._redirected_subfile_objects 452 ): 453 subfile_object = self.subfile_objects[subfile_index] 454 for i in range(self.SEEK_BACK_ATTEMPTS): 455 try: 456 subfile_object.seek(max(seek_ix - i, 0)) 457 buffer += subfile_object.read() 458 except UnicodeDecodeError: 459 continue 460 break 461 else: 462 with open(subfile_path, 'r', encoding='utf-8') as f: 463 for i in range(self.SEEK_BACK_ATTEMPTS): 464 try: 465 f.seek(max(seek_ix - i, 0)) 466 buffer += f.read() 467 except UnicodeDecodeError: 468 continue 469 break 470 471 ### Handle the case when no files have yet been opened. 472 if not self.subfile_objects and subfile_path == paths_to_read[-1]: 473 self._cursor = (subfile_index, f.tell()) 474 refresh_cursor = False 475 476 if refresh_cursor: 477 self.refresh_cursor() 478 return buffer
Read the contents of the existing subfiles.
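A reading sketch, continuing the hypothetical `log_file` from above:

```python
# read() concatenates the retained subfiles from the internal cursor onward.
log_file.write('another entry\n')
contents = log_file.read()
print(contents)
```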
481 def refresh_cursor(self) -> None: 482 """ 483 Update the cursor to the latest subfile index and file.tell() value. 484 """ 485 self.flush() 486 existing_subfile_paths = self.get_existing_subfile_paths() 487 current_ix = ( 488 self.get_index_from_subfile_name(existing_subfile_paths[-1].name) 489 if existing_subfile_paths 490 else 0 491 ) 492 position = self._current_file_obj.tell() if self._current_file_obj is not None else 0 493 self._cursor = (current_ix, position)
Update the cursor to the latest subfile index and file.tell() value.
496 def readlines(self) -> List[str]: 497 """ 498 Return a list of lines of text. 499 """ 500 existing_subfile_indices = [ 501 self.get_index_from_subfile_name(subfile_path.name) 502 for subfile_path in self.get_existing_subfile_paths() 503 ] 504 paths_to_read = [ 505 self.get_subfile_path_from_index(subfile_index) 506 for subfile_index in existing_subfile_indices 507 if subfile_index >= self._cursor[0] 508 ] 509 510 lines = [] 511 refresh_cursor = True 512 for subfile_path in paths_to_read: 513 subfile_index = self.get_index_from_subfile_name(subfile_path.name) 514 seek_ix = ( 515 self._cursor[1] 516 if subfile_index == self._cursor[0] 517 else 0 518 ) 519 520 subfile_lines = [] 521 if ( 522 subfile_index in self.subfile_objects 523 and 524 subfile_index not in self._redirected_subfile_objects 525 ): 526 subfile_object = self.subfile_objects[subfile_index] 527 for i in range(self.SEEK_BACK_ATTEMPTS): 528 try: 529 subfile_object.seek(max((seek_ix - i), 0)) 530 subfile_lines = subfile_object.readlines() 531 except UnicodeDecodeError: 532 continue 533 break 534 else: 535 with open(subfile_path, 'r', encoding='utf-8') as f: 536 for i in range(self.SEEK_BACK_ATTEMPTS): 537 try: 538 f.seek(max(seek_ix - i, 0)) 539 subfile_lines = f.readlines() 540 except UnicodeDecodeError: 541 continue 542 break 543 544 ### Handle the case when no files have yet been opened. 545 if not self.subfile_objects and subfile_path == paths_to_read[-1]: 546 self._cursor = (subfile_index, f.tell()) 547 refresh_cursor = False 548 549 ### Sometimes a line may span multiple files. 550 if lines and subfile_lines and not lines[-1].endswith('\n'): 551 lines[-1] += subfile_lines[0] 552 new_lines = subfile_lines[1:] 553 else: 554 new_lines = subfile_lines 555 lines.extend(new_lines) 556 557 if refresh_cursor: 558 self.refresh_cursor() 559 return lines
Return a list of lines of text.
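Because the cursor advances to the end of the newest subfile after each read, consecutive calls behave roughly like tailing the log (a sketch, continuing `log_file` from above):

```python
# The second readlines() only returns lines appended after the first call.
first_batch = log_file.readlines()
log_file.write('a fresh line\n')
new_lines = log_file.readlines()
print(new_lines)    # e.g. ['a fresh line\n']
```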
Return whether object supports random access.
If False, seek(), tell() and truncate() will raise OSError. This method may need to do a test seek().
566 def seek(self, position: int) -> None: 567 """ 568 Seek to the beginning of the logs stream. 569 """ 570 existing_subfile_indices = self.get_existing_subfile_indices() 571 min_ix = existing_subfile_indices[0] if existing_subfile_indices else 0 572 max_ix = existing_subfile_indices[-1] if existing_subfile_indices else 0 573 if position == 0: 574 self._cursor = (min_ix, 0) 575 return 576 577 self._cursor = (max_ix, position) 578 if self._current_file_obj is not None: 579 self._current_file_obj.seek(position)
Seek to a position within the logs stream. A position of 0 rewinds to the beginning of the oldest existing subfile.
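Continuing the sketch: rewinding with `seek(0)` lets `read()` return the full retained history.

```python
# seek(0) rewinds the cursor to the oldest existing subfile.
log_file.seek(0)
print(log_file.read())   # everything currently retained on disk
log_file.close()
```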
582 def flush(self) -> None: 583 """ 584 Flush any open subfiles. 585 """ 586 for subfile_index, subfile_object in self.subfile_objects.items(): 587 if not subfile_object.closed: 588 try: 589 subfile_object.flush() 590 except Exception as e: 591 warn(f"Failed to flush subfile {subfile_index}:\n{traceback.format_exc()}") 592 if self.redirect_streams: 593 try: 594 sys.stdout.flush() 595 except BrokenPipeError: 596 pass 597 except Exception as e: 598 warn(f"Failed to flush STDOUT:\n{traceback.format_exc()}") 599 try: 600 sys.stderr.flush() 601 except BrokenPipeError: 602 pass 603 except Exception as e: 604 warn(f"Failed to flush STDERR:\n{traceback.format_exc()}")
Flush any open subfiles.
607 def start_log_fd_interception(self): 608 """ 609 Start the file descriptor monitoring threads. 610 """ 611 if not self.write_timestamps: 612 return 613 614 self._stdout_interceptor = FileDescriptorInterceptor( 615 sys.stdout.fileno(), 616 self.get_timestamp_prefix_str, 617 ) 618 self._stderr_interceptor = FileDescriptorInterceptor( 619 sys.stderr.fileno(), 620 self.get_timestamp_prefix_str, 621 ) 622 623 self._stdout_interceptor_thread = Thread( 624 target = self._stdout_interceptor.start_interception, 625 daemon = True, 626 ) 627 self._stderr_interceptor_thread = Thread( 628 target = self._stderr_interceptor.start_interception, 629 daemon = True, 630 ) 631 self._stdout_interceptor_thread.start() 632 self._stderr_interceptor_thread.start() 633 self._intercepting = True 634 635 if '_interceptor_threads' not in self.__dict__: 636 self._interceptor_threads = [] 637 if '_interceptors' not in self.__dict__: 638 self._interceptors = [] 639 self._interceptor_threads.extend([ 640 self._stdout_interceptor_thread, 641 self._stderr_interceptor_thread, 642 ]) 643 self._interceptors.extend([ 644 self._stdout_interceptor, 645 self._stderr_interceptor, 646 ]) 647 self.stop_log_fd_interception(unused_only=True)
Start the file descriptor monitoring threads.
650 def stop_log_fd_interception(self, unused_only: bool = False): 651 """ 652 Stop the file descriptor monitoring threads. 653 """ 654 if not self.write_timestamps: 655 return 656 657 interceptors = self.__dict__.get('_interceptors', []) 658 interceptor_threads = self.__dict__.get('_interceptor_threads', []) 659 660 end_ix = len(interceptors) if not unused_only else -2 661 662 for interceptor in interceptors[:end_ix]: 663 interceptor.stop_interception() 664 del interceptors[:end_ix] 665 666 for thread in interceptor_threads[:end_ix]: 667 try: 668 thread.join() 669 except Exception as e: 670 warn(f"Failed to join interceptor threads:\n{traceback.format_exc()}") 671 del interceptor_threads[:end_ix]
Stop the file descriptor monitoring threads.