Coverage for src / qdrant_loader / utils / logging.py: 38%

245 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 09:40 +0000

1"""Centralized logging configuration for the application.""" 

2 

3import logging 

4import re 

5 

6import structlog 

7 

8from qdrant_loader.utils.sensitive import redact_sensitive_data 

9 

10 

class QdrantVersionFilter(logging.Filter):
    """Drop log records whose message mentions a Qdrant "version check"."""

    def filter(self, record):
        """Return False for records containing "version check" (any case)."""
        text = record.getMessage().lower()
        if "version check" in text:
            return False
        return True

16 

17 

class ApplicationFilter(logging.Filter):
    """Hide records emitted by third-party HTTP client loggers.

    Records whose logger name begins with ``httpx``, ``httpcore`` or
    ``urllib3`` are rejected; every other record is let through.
    """

    def filter(self, record):
        """Return False when the record's logger name has a blocked prefix."""
        blocked_prefixes = ("httpx", "httpcore", "urllib3")
        from_http_library = record.name.startswith(blocked_prefixes)
        return not from_http_library

24 

25 

class SQLiteFilter(logging.Filter):
    """Reject records that contain known noisy SQLite driver fragments."""

    def filter(self, record):
        """Return False when the message contains any listed fragment.

        Matching is a plain, case-sensitive substring test.
        """
        text = record.getMessage()
        noisy_fragments = (
            "executing functools.partial(<built-in method",
            "operation functools.partial(<built-in method",
            "executing <function connect.",
            "operation <function connect.",
            "returning exception",
        )
        for fragment in noisy_fragments:
            if fragment in text:
                return False
        return True

40 

41 

class VerbosityFilter(logging.Filter):
    """Trim chatty DEBUG records while leaving every other level untouched."""

    def filter(self, record):
        """Return False for DEBUG records containing a known noisy fragment.

        Records at any level other than DEBUG always pass. Matching is a
        plain, case-sensitive substring test against the rendered message.
        """
        if record.levelno == logging.DEBUG:
            text = record.getMessage()
            noisy_fragments = (
                "HTTP Request:",
                "Response status:",
                "Request headers:",
                "Response headers:",
                # PDF parsing debug messages
                "seek:",
                "nexttoken:",
                "do_keyword:",
                "nextobject:",
                "add_results:",
                "register:",
                "getobj:",
                "get_unichr:",
                "exec:",
                # Character encoding detection
                "confidence =",
                "prober hit error",
                "not active",
                # File processing verbosity
                "checking if file should be processed",
                "current configuration",
                "checking file extension",
                "file type detection",
                "file supported via",
                "starting metadata extraction",
                "completed metadata extraction",
                "document metadata:",
                # HTTP client debug
                "connect_tcp.started",
                "connect_tcp.complete",
                "send_request_headers",
                "send_request_body",
                "receive_response_headers",
                "receive_response_body",
                "response_closed",
            )
            for fragment in noisy_fragments:
                if fragment in text:
                    return False
        return True

90 

91 

class WindowsSafeConsoleHandler(logging.StreamHandler):
    """Console handler that degrades gracefully when the stream cannot encode a message.

    On Windows the known emoji are always replaced with bracketed ASCII
    labels before writing. On every platform, if the stream rejects the
    text with ``UnicodeEncodeError``, the emoji are replaced and any
    remaining non-ASCII characters become ``?`` so the record is still
    written instead of being lost.
    """

    # ANSI escape sequence pattern (public class attribute; not used by emit)
    ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")

    # Emoji that appear in log messages, paired with their ASCII stand-ins.
    # Applied in this order.
    _EMOJI_REPLACEMENTS = (
        ("🚀", "[ROCKET]"),
        ("📄", "[DOCUMENT]"),
        ("✅", "[CHECK]"),
        ("❌", "[CROSS]"),
        ("⚠️", "[WARNING]"),
        ("🔍", "[SEARCH]"),
        ("💾", "[SAVE]"),
        ("🔧", "[TOOL]"),
        ("📊", "[CHART]"),
        ("🎯", "[TARGET]"),
        ("⚙️", "[GEAR]"),
        ("⏱️", "[TIMER]"),
        ("⏭️", "[NEXT]"),
        ("🏗️", "[CONSTRUCTION]"),
        ("1️⃣", "[1]"),
        ("2️⃣", "[2]"),
        ("3️⃣", "[3]"),
        ("4️⃣", "[4]"),
    )

    @classmethod
    def _replace_emoji(cls, text):
        """Return *text* with every known emoji swapped for its ASCII label."""
        for emoji, label in cls._EMOJI_REPLACEMENTS:
            text = text.replace(emoji, label)
        return text

    def emit(self, record):
        """Emit a record, falling back to ASCII if the stream cannot encode it.

        Closed or unavailable streams are skipped silently; any other
        unexpected failure is routed to ``handleError``.
        """
        import sys

        try:
            # Check if stream is still open before writing
            if hasattr(self.stream, "closed") and self.stream.closed:
                return  # Skip logging if stream is closed

            msg = self.format(record)

            # For Windows, replace problematic emoji proactively
            if sys.platform == "win32":
                msg = self._replace_emoji(msg)

            try:
                self.stream.write(msg + self.terminator)
            except UnicodeEncodeError:
                # UnicodeEncodeError is a ValueError subclass: it must be
                # handled here, otherwise the outer handler below would
                # swallow it and the record would be dropped silently.
                ascii_msg = (
                    self._replace_emoji(msg)
                    .encode("ascii", errors="replace")
                    .decode("ascii")
                )
                self.stream.write(ascii_msg + self.terminator)

            self.flush()
        except (ValueError, OSError):
            # Stream is closed or unavailable, skip silently
            pass
        except Exception:
            self.handleError(record)

151 

152 

class CleanFileHandler(logging.FileHandler):
    """File handler that strips ANSI color codes from log messages.

    If the file's encoding cannot represent the message, the handler
    retries with progressively safer text (emoji labels on Windows, then
    ASCII with ``?`` for anything left) so the record is still written.
    """

    # ANSI escape sequence pattern
    ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")

    # Emoji that appear in log messages, paired with their ASCII stand-ins.
    # Applied in this order.
    _EMOJI_REPLACEMENTS = (
        ("🚀", "[ROCKET]"),
        ("📄", "[DOCUMENT]"),
        ("✅", "[CHECK]"),
        ("❌", "[CROSS]"),
        ("⚠️", "[WARNING]"),
        ("🔍", "[SEARCH]"),
        ("💾", "[SAVE]"),
        ("🔧", "[TOOL]"),
        ("📊", "[CHART]"),
        ("🎯", "[TARGET]"),
        ("⚙️", "[GEAR]"),
        ("⏱️", "[TIMER]"),
        ("⏭️", "[NEXT]"),
        ("🏗️", "[CONSTRUCTION]"),
        ("1️⃣", "[1]"),
        ("2️⃣", "[2]"),
        ("3️⃣", "[3]"),
        ("4️⃣", "[4]"),
    )

    @classmethod
    def _replace_emoji(cls, text):
        """Return *text* with every known emoji swapped for its ASCII label."""
        for emoji, label in cls._EMOJI_REPLACEMENTS:
            text = text.replace(emoji, label)
        return text

    def emit(self, record):
        """Emit a record, stripping ANSI codes from the message.

        Closed or unavailable streams are skipped silently; any other
        unexpected failure is routed to ``handleError``.
        """
        import sys

        try:
            if self.stream is None:
                # No stream yet (handler created with delay=True) or the
                # handler was closed. Open lazily in the first case, skip
                # in the second.
                if getattr(self, "_closed", False):
                    return
                self.stream = self._open()

            # Check if stream is still open before writing
            if hasattr(self.stream, "closed") and self.stream.closed:
                return  # Skip logging if stream is closed

            msg = self.format(record)
            # Strip ANSI escape sequences
            clean_msg = self.ANSI_ESCAPE.sub("", msg)

            stream = self.stream
            try:
                stream.write(clean_msg + self.terminator)
            except UnicodeEncodeError:
                if sys.platform == "win32":
                    # Replace common emoji with ASCII equivalents
                    safe_msg = self._replace_emoji(clean_msg)
                else:
                    # Round-trip through UTF-8 to neutralise characters
                    # UTF-8 itself cannot encode (e.g. lone surrogates)
                    safe_msg = clean_msg.encode("utf-8", errors="replace").decode(
                        "utf-8", errors="replace"
                    )
                try:
                    stream.write(safe_msg + self.terminator)
                except UnicodeEncodeError:
                    # Final fallback on every platform. Without it the error
                    # (a ValueError subclass) would reach the outer handler
                    # below and the record would be dropped silently.
                    ascii_msg = safe_msg.encode("ascii", errors="replace").decode(
                        "ascii"
                    )
                    stream.write(ascii_msg + self.terminator)
            self.flush()
        except (ValueError, OSError):
            # Stream is closed or unavailable, skip silently
            pass
        except Exception:
            self.handleError(record)

222 

223 

class CleanFormatter(logging.Formatter):
    """Formatter that shows only the message for INFO level logs.

    For other levels it places a ``[LEVEL]`` tag after a leading
    ``HH:MM:SS`` timestamp (or at the very start when there is none),
    unless the message already carries such a tag.
    """

    # Leading "HH:MM:SS" optionally wrapped in ANSI colour codes. DOTALL lets
    # the remainder group span line breaks, so multi-line messages (for
    # example tracebacks) are kept whole instead of being cut at the first
    # newline.
    _TIME_PREFIX = re.compile(
        r"^(?:\x1b\[[0-9;]*m)?(\d{2}:\d{2}:\d{2})(?:\x1b\[[0-9;]*m)?\s+(.*)",
        re.DOTALL,
    )

    # A level tag at the start of the text, optionally wrapped in ANSI colour
    # codes, so that a coloured tag is also recognised as already present.
    _LEVEL_TAG = re.compile(
        r"^(?:\x1b\[[0-9;]*m)?\[(?:DEBUG|INFO|WARNING|ERROR|CRITICAL)\](?:\x1b\[[0-9;]*m)?\s"
    )

    def __init__(self, use_custom_renderer=False):
        """Create the formatter.

        Args:
            use_custom_renderer: When true, ``format`` returns the message
                unchanged for every level.
        """
        super().__init__()
        self.use_custom_renderer = use_custom_renderer

    def format(self, record):
        """Return the display text for *record*."""
        message = record.getMessage()

        # Pass through untouched when the renderer already formatted the
        # message, and for INFO, which is shown bare.
        if self.use_custom_renderer or record.levelno == logging.INFO:
            return message

        match = self._TIME_PREFIX.match(message)
        if match:
            timestamp, rest_of_message = match.groups()
            if self._LEVEL_TAG.match(rest_of_message):
                # Level already present, don't add another one
                return f"{timestamp} {rest_of_message}"
            return f"{timestamp} [{record.levelname}] {rest_of_message}"

        # No timestamp found
        if self._LEVEL_TAG.match(message):
            # Level already present, don't add another one
            return message
        return f"[{record.levelname}] {message}"

278 

279 

class FileRenderer:
    """Render a structlog event as plain text without a timestamp."""

    def __call__(self, logger, method_name, event_dict):
        """Return the event text followed by any remaining ``key=value`` pairs.

        The ``event`` entry is removed from *event_dict* in the process.
        """
        message = event_dict.pop("event", "")
        if not event_dict:
            return message

        pairs = [f"{key}={value}" for key, value in event_dict.items()]
        joined = " ".join(pairs)
        return f"{message} {joined}".strip()

293 

294 

class FileFormatter(logging.Formatter):
    """Formatter for log files: ``<date time> | <message>`` with ANSI codes removed."""

    # ANSI escape sequence pattern
    ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")

    def format(self, record):
        """Return ``YYYY-MM-DD HH:MM:SS | text`` for *record*.

        A leading ``HH:MM:SS`` in the message is dropped because this
        formatter supplies its own timestamp. A ``[LEVEL]`` tag is inserted
        for non-INFO records unless the text already starts with one.
        """
        stamp = self.formatTime(record, "%Y-%m-%d %H:%M:%S")
        text = self.ANSI_ESCAPE.sub("", record.getMessage())

        # The pattern is anchored at the start, so only a leading time is removed.
        text = re.sub(r"^\d{2}:\d{2}:\d{2}\s+", "", text)

        already_tagged = (
            re.match(r"^\[(?:DEBUG|INFO|WARNING|ERROR|CRITICAL)\]\s", text) is not None
        )
        if record.levelno == logging.INFO or already_tagged:
            return f"{stamp} | {text}"
        return f"{stamp} | [{record.levelname}] {text}"

335 

336 

class CustomConsoleRenderer:
    """Console renderer producing ``HH:MM:SS [LEVEL] message`` lines."""

    def __init__(self, colors=True):
        """Create the renderer.

        Args:
            colors: When false, every colour attribute is the empty string
                and the wrapped structlog renderer is created without colours.
        """
        self.colors = colors
        self._console_renderer = structlog.dev.ConsoleRenderer(colors=colors)

        def ansi(code):
            # Colour codes collapse to "" when colours are disabled.
            return code if colors else ""

        self.gray = ansi("\033[90m")
        self.green = ansi("\033[92m")  # INFO
        self.yellow = ansi("\033[93m")  # WARNING
        self.red = ansi("\033[91m")  # ERROR
        self.magenta = ansi("\033[95m")  # CRITICAL
        self.cyan = ansi("\033[96m")  # DEBUG
        self.reset = ansi("\033[0m")

    def _get_level_color(self, level):
        """Return the colour code for *level*, or "" for an unknown level."""
        if level == "DEBUG":
            return self.cyan
        if level == "INFO":
            return self.green
        if level == "WARNING":
            return self.yellow
        if level == "ERROR":
            return self.red
        if level == "CRITICAL":
            return self.magenta
        return ""

    def __call__(self, logger, method_name, event_dict):
        """Render *event_dict* with the timestamp first and the level tag second.

        The ``timestamp`` entry is removed from *event_dict* before the
        wrapped renderer formats the rest. The level is the upper-cased
        *method_name*.
        """
        timestamp = event_dict.pop("timestamp", None)
        level = method_name.upper()
        body = self._console_renderer(logger, method_name, event_dict)

        tint = self._get_level_color(level)
        tag = f"[{level}]"
        if tint:
            tag = f"{tint}{tag}{self.reset}"

        # Without a usable string timestamp, emit only the tag and the body.
        if not (timestamp and isinstance(timestamp, str) and len(timestamp) >= 8):
            return f"{tag} {body}"

        clock = timestamp[:8]  # HH:MM:SS part
        if body.startswith(clock):
            # Avoid showing the time twice if the body already begins with it.
            body = body[len(clock) :].lstrip()

        return f"{self.gray}{clock}{self.reset} {tag} {body}"

399 

400 

class LoggingConfig:
    """Centralized logging configuration.

    ``setup`` wires the standard-library root logger and structlog together.
    ``get_logger`` hands out structlog loggers, running ``setup`` with its
    defaults first if it has not been called yet.
    """

    _initialized = False
    _current_config = None

    # Text substituted for any value that is masked.
    _REDACTED = "***REDACTED***"

    # Exact key names whose values are always masked.
    _SENSITIVE_KEYS = frozenset(
        {
            "api_key",
            "llm_api_key",
            "authorization",
            "Authorization",
            "token",
            "access_token",
            "secret",
            "password",
        }
    )

    # Case-insensitive match for sensitive words delimited by the start/end
    # of the key or by "_" / "-". Compiled once instead of per log event.
    _SENSITIVE_KEY_PATTERN = re.compile(
        r"(?i)(?:^|[_-])(token|secret|password|api[_-]?key|access[_-]?key|private[_-]?key|authorization)(?:$|[_-])"
    )

    @classmethod
    def setup(
        cls,
        level: str = "INFO",
        format: str = "console",
        file: str | None = None,
        suppress_qdrant_warnings: bool = True,
        clean_output: bool = True,
    ) -> None:
        """Setup logging configuration.

        Args:
            level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
            format: Log format (json or console)
            file: Path to log file (optional)
            suppress_qdrant_warnings: Whether to suppress Qdrant version check warnings
            clean_output: Whether to use clean, less verbose output

        Raises:
            ValueError: If ``level`` does not name a numeric logging level.
        """
        numeric_level = cls._resolve_level(level)

        # Reset logging configuration
        cls._reset_root_handlers()
        structlog.reset_defaults()

        # Use Windows-safe console handler for all platforms
        console_handler = WindowsSafeConsoleHandler()
        clean_console = clean_output and format == "console"

        if clean_console:
            # The structlog renderer already produced the final text
            console_handler.setFormatter(CleanFormatter(use_custom_renderer=True))
        else:
            console_handler.setFormatter(logging.Formatter("%(message)s"))

        console_handler.addFilter(ApplicationFilter())  # Hide HTTP client library logs
        console_handler.addFilter(SQLiteFilter())  # Suppress verbose SQLite logs

        if clean_output:
            console_handler.addFilter(VerbosityFilter())  # Reduce verbosity

        handlers = [console_handler]

        # Add file handler if file is configured
        if file:
            file_handler = CleanFileHandler(file)
            file_handler.setFormatter(FileFormatter())
            # Suppress verbose SQLite logs in files too; the verbosity filter
            # is deliberately not applied so files keep everything for debugging
            file_handler.addFilter(SQLiteFilter())
            handlers.append(file_handler)

        # Configure standard logging
        logging.basicConfig(
            level=numeric_level,
            format="%(message)s",
            handlers=handlers,
        )

        # Make sure every root handler carries the SQLite filter exactly once
        for handler in logging.getLogger().handlers:
            cls._ensure_filter(handler, SQLiteFilter)

        # Install or remove the Qdrant version-check filter to match the flag
        qdrant_logger = logging.getLogger("qdrant_client")
        if suppress_qdrant_warnings:
            cls._ensure_filter(qdrant_logger, QdrantVersionFilter)
        else:
            cls._drop_filters(qdrant_logger, QdrantVersionFilter)

        # Suppress verbose SQLAlchemy and SQLite logs
        sqlalchemy_loggers = [
            "sqlalchemy.engine",
            "sqlalchemy.dialects",
            "sqlalchemy.pool",
            "aiosqlite",
            "sqlite3",
        ]

        for logger_name in sqlalchemy_loggers:
            logger = logging.getLogger(logger_name)
            cls._ensure_filter(logger, SQLiteFilter)
            logger.setLevel(logging.WARNING)  # Only show warnings and errors

        # Suppress verbose third-party library debug logs
        noisy_loggers = [
            "chardet",  # Character encoding detection
            "chardet.charsetprober",
            "chardet.latin1prober",
            "chardet.mbcharsetprober",
            "chardet.sbcharsetprober",
            "chardet.utf8prober",
            "pdfminer",  # PDF parsing
            "pdfminer.pdfparser",
            "pdfminer.pdfdocument",
            "pdfminer.pdfinterp",
            "pdfminer.converter",
            "pdfplumber",  # PDF processing
            "markitdown",  # File conversion
            "httpcore",  # HTTP client debug logs
            "httpx",  # HTTP client debug logs
            "gensim",  # Topic modeling library
            "gensim.models",  # LDA model training logs
            "gensim.models.ldamodel",  # Specific LDA training logs
            "gensim.corpora",  # Dictionary and corpus logs
        ]

        for logger_name in noisy_loggers:
            logging.getLogger(logger_name).setLevel(logging.WARNING)

        # Configure structlog processors based on format and clean_output
        if clean_console:
            # Minimal processors for clean output
            processors = [
                structlog.stdlib.filter_by_level,
                structlog.processors.TimeStamper(fmt="%H:%M:%S"),
                cls._redact_sensitive,
                CustomConsoleRenderer(colors=True),
            ]
        else:
            # Full processors for detailed output
            processors = [
                structlog.stdlib.filter_by_level,
                structlog.stdlib.add_logger_name,
                structlog.stdlib.add_log_level,
                structlog.processors.TimeStamper(fmt="iso"),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.UnicodeDecoder(),
                structlog.processors.CallsiteParameterAdder(
                    [
                        structlog.processors.CallsiteParameter.FILENAME,
                        structlog.processors.CallsiteParameter.FUNC_NAME,
                        structlog.processors.CallsiteParameter.LINENO,
                    ]
                ),
                cls._redact_sensitive,
            ]

            if format == "json":
                processors.append(structlog.processors.JSONRenderer())
            else:
                processors.append(structlog.dev.ConsoleRenderer(colors=True))

        # Configure structlog
        structlog.configure(
            processors=processors,
            wrapper_class=structlog.make_filtering_bound_logger(numeric_level),
            logger_factory=structlog.stdlib.LoggerFactory(),
            cache_logger_on_first_use=False,  # Disable caching to ensure new configuration is used
        )

        cls._initialized = True
        cls._current_config = (
            level,
            format,
            file,
            suppress_qdrant_warnings,
            clean_output,
        )

    @classmethod
    def get_logger(cls, name: str | None = None) -> structlog.BoundLogger:
        """Get a logger instance.

        Args:
            name: Logger name. If None, will use the calling module's name.

        Returns:
            structlog.BoundLogger: Logger instance
        """
        if not cls._initialized:
            # Initialize with default settings if not already initialized
            cls.setup()
        if name is None:
            # Passing None through as a positional argument would hand it to
            # the logger factory as the logger name; calling without
            # arguments lets the factory discover the caller's name instead.
            return structlog.get_logger()
        return structlog.get_logger(name)

    @staticmethod
    def _resolve_level(level):
        """Translate a level name into its numeric value or raise ValueError."""
        try:
            numeric_level = getattr(logging, level.upper())
        except AttributeError:
            raise ValueError(f"Invalid log level: {level}") from None
        # Some upper-case attributes of ``logging`` are not levels
        # (BASIC_FORMAT is a string); reject anything that is not an integer.
        if not isinstance(numeric_level, int):
            raise ValueError(f"Invalid log level: {level}")
        return numeric_level

    @staticmethod
    def _reset_root_handlers():
        """Detach every root handler, closing the ones this module created."""
        root_logger = logging.getLogger()
        for handler in list(root_logger.handlers):
            root_logger.removeHandler(handler)
            # Only handlers of our own types are closed, so a previously
            # opened log file is released; foreign handlers are just detached.
            if isinstance(handler, (WindowsSafeConsoleHandler, CleanFileHandler)):
                try:
                    handler.close()
                except (OSError, ValueError):
                    # Closing is best-effort; the handler is already detached
                    pass

    @staticmethod
    def _ensure_filter(target, filter_cls):
        """Attach a ``filter_cls`` instance to *target* unless one is already there."""
        if not any(isinstance(existing, filter_cls) for existing in target.filters):
            target.addFilter(filter_cls())

    @staticmethod
    def _drop_filters(target, filter_cls):
        """Remove every ``filter_cls`` instance from *target*."""
        for existing in list(target.filters):
            if isinstance(existing, filter_cls):
                target.removeFilter(existing)

    @classmethod
    def _is_sensitive_key(cls, key: object) -> bool:
        """Return True when *key* is a string naming a sensitive field."""
        if not isinstance(key, str):
            return False
        return key in cls._SENSITIVE_KEYS or bool(
            cls._SENSITIVE_KEY_PATTERN.search(key)
        )

    @classmethod
    def _mask(cls, value) -> str:
        """Mask *value*, keeping two characters at each end of longer strings."""
        try:
            if not isinstance(value, str) or len(value) <= 8:
                return cls._REDACTED
            # Preserve small hint while masking majority
            return value[:2] + cls._REDACTED + value[-2:]
        except Exception:
            return cls._REDACTED

    @classmethod
    def _sanitize_string(cls, value: str) -> str:
        """Run *value* through ``redact_sensitive_data``; mask it entirely on failure."""
        try:
            return redact_sensitive_data(value, mask="***REDACTED***")
        except Exception:
            return cls._REDACTED

    @classmethod
    def _deep_redact(cls, obj):
        """Return *obj* with sensitive content masked, recursing into dicts and lists."""
        try:
            if isinstance(obj, dict):
                redacted = {}
                for key, value in obj.items():
                    if cls._is_sensitive_key(key):
                        redacted[key] = (
                            cls._mask(value) if isinstance(value, str) else cls._REDACTED
                        )
                    else:
                        redacted[key] = cls._deep_redact(value)
                return redacted
            if isinstance(obj, list):
                return [cls._deep_redact(item) for item in obj]
            if isinstance(obj, str):
                return cls._sanitize_string(obj)
            return obj
        except Exception:
            # NOTE(review): on an unexpected error the object is returned
            # unredacted (unchanged from the previous behaviour); confirm
            # this fail-open choice is intended.
            return obj

    @classmethod
    def _redact_sensitive(cls, logger, method_name, event_dict):
        """structlog processor that masks sensitive fields in *event_dict*."""
        return cls._deep_redact(event_dict)

649 

650 

# Prefer the shared core logging configuration when it can be imported;
# otherwise the LoggingConfig defined in this module stays in effect.
try:
    from qdrant_loader_core.logging import (
        LoggingConfig as _CoreLoggingConfig,  # type: ignore
    )
except Exception:
    # Core is unavailable: keep the local implementation
    pass
else:
    LoggingConfig = _CoreLoggingConfig  # type: ignore[assignment]