Coverage for src/qdrant_loader_core/logging.py: 67%

199 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-11 07:16 +0000

1"""Unified logging configuration for qdrant-loader ecosystem. 

2 

3Provides: 

4- structlog setup (console/json/file) with redaction 

5- stdlib logging bridge with redaction filter 

6- optional suppression of noisy third-party logs 

7""" 

8 

9from __future__ import annotations 

10 

11import logging 

12import os 

13import re 

14from typing import Any 

15 

16import structlog 

17from structlog.stdlib import LoggerFactory 

18 

19try: 

    # ExtraAdder was added in structlog 21.5.0; older releases do not export it

21 from structlog.stdlib import ExtraAdder # type: ignore 

22except Exception: # pragma: no cover - fallback when absent 

23 ExtraAdder = None # type: ignore 

24 

25 

class QdrantVersionFilter(logging.Filter):
    """Drop log records whose rendered message mentions a "version check"."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Return False for version-check records and True for all others.

        The comparison is case-insensitive. When the message cannot be
        rendered (e.g. mismatched format arguments) the record is kept.
        """
        try:
            rendered = record.getMessage()
            mentions_version_check = "version check" in rendered.lower()
        except Exception:
            # A record we cannot render must not be silently discarded.
            return True
        return not mentions_version_check

32 

33 

class ApplicationFilter(logging.Filter):
    """Pass-through filter; application packages may layer their own on top."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Accept every record unconditionally."""
        accepted = True
        return accepted

38 

39 

class RedactionFilter(logging.Filter):
    """Redacts obvious secrets from stdlib log records.

    The filter never rejects a record; it rewrites ``record.args`` and
    ``record.msg`` in place so that every handler downstream sees the
    redacted text. Redaction is heuristic and best-effort.
    """

    # Text substituted for (part of) every detected secret.
    _MARKER = "***REDACTED***"

    # Heuristics for tokens/keys in plain strings. A pattern with two
    # capture groups is treated as ``key <sep> value``: only the value
    # (group 2) is masked and the key name is kept readable.
    TOKEN_PATTERNS = [
        re.compile(r"sk-[A-Za-z0-9_\-]{6,}"),
        re.compile(r"tok-[A-Za-z0-9_\-]{6,}"),
        re.compile(
            # The optional auth scheme keeps "Authorization: Bearer <token>"
            # together so the credential itself is part of the masked value.
            r"(?i)(api_key|authorization|token|access_token|secret|password)"
            r"\s*[:=]\s*((?:(?:Bearer|Basic)\s+)?[^\s]+)"
        ),
        re.compile(r"Bearer\s+[A-Za-z0-9_\-\.]+"),
    ]

    # Keys commonly used for secrets in structlog event dictionaries
    SENSITIVE_KEYS = {
        "api_key",
        "llm_api_key",
        "authorization",
        "Authorization",
        "token",
        "access_token",
        "secret",
        "password",
    }

    @classmethod
    def _mask_value(cls, value: str) -> str:
        """Mask one secret: short values entirely, long ones keep 2+2 chars."""
        if len(value) <= 8:
            return cls._MARKER
        return value[:2] + cls._MARKER + value[-2:]

    def _redact_text(self, text: str) -> str:
        """Return ``text`` with every ``TOKEN_PATTERNS`` match masked.

        Running the method on already-redacted text leaves it unchanged, so
        the filter can safely be installed on both a logger and its handlers.
        """

        def mask(m: "re.Match[str]") -> str:
            if m.lastindex is not None and m.lastindex >= 2:
                secret = m.group(2)
                if self._MARKER in secret:
                    # Already masked by an earlier pattern or an earlier pass.
                    return m.group(0)
                # Keep "key = " verbatim and mask only the value, so the key
                # name neither counts toward the length threshold nor lets
                # the tail of a short secret leak through.
                key_part = m.group(0)[: m.start(2) - m.start()]
                return key_part + self._mask_value(secret)
            return self._mask_value(m.group(0))

        redacted = text
        for pat in self.TOKEN_PATTERNS:
            redacted = pat.sub(mask, redacted)
        return redacted

    def _redact_args(self, record: logging.LogRecord) -> bool:
        """Mask string arguments in place; return True if anything changed.

        Handles both positional (tuple) arguments and the mapping form used
        with ``%(name)s`` placeholders. Non-string arguments are left alone.
        """
        changed = False

        def scrub(value):
            nonlocal changed
            if not isinstance(value, str):
                return value
            cleaned = self._redact_text(value)
            if cleaned != value:
                changed = True
            return cleaned

        args = record.args
        if isinstance(args, tuple):
            record.args = tuple(scrub(a) for a in args)
        elif isinstance(args, dict):
            record.args = {name: scrub(val) for name, val in args.items()}
        return changed

    def filter(self, record: logging.LogRecord) -> bool:
        """Redact secrets on ``record`` and always return True.

        Steps: mask string args; mask the raw message when it carries no
        formatting placeholders; flag records that have a truthy attribute
        named like a secret; finally make sure a visible ``***REDACTED***``
        marker is present in the message whenever anything was detected.
        """
        try:
            # Args may contain secrets; best-effort mask strings and detect changes
            redaction_detected = self._redact_args(record)

            # Redact raw message only when it contains no formatting placeholders
            # to avoid interfering with %-style or {}-style formatting
            if (
                isinstance(record.msg, str)
                and "%" not in record.msg
                and "{" not in record.msg
            ):
                red_msg = self._redact_text(record.msg)
                if red_msg != record.msg:
                    record.msg = red_msg
                    redaction_detected = True

            # If structlog extras contain sensitive keys, mark as redacted
            if any(
                bool(value)
                for key, value in vars(record).items()
                if key in self.SENSITIVE_KEYS
            ):
                redaction_detected = True

            # Ensure a visible redaction marker appears in the captured message.
            # The marker contains no '%' so it cannot disturb %-formatting.
            if (
                redaction_detected
                and isinstance(record.msg, str)
                and self._MARKER not in record.msg
            ):
                record.msg = f"{record.msg} {self._MARKER}"
        except Exception:
            # Deliberately best-effort: a redaction failure must never stop
            # the record from being emitted, and logging from inside a
            # logging filter could recurse.
            pass
        return True

131 

132 

class CleanFormatter(logging.Formatter):
    """Formatter that removes ANSI color codes for clean file output."""

    # Matches ANSI escape sequences: two-character escapes and CSI sequences
    # such as the colour codes "\x1b[31m" / "\x1b[0m". Built once for the
    # class instead of on every formatted record.
    _ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")

    def format(self, record: logging.LogRecord) -> str:
        """Format ``record`` as usual, then strip ANSI escape sequences."""
        return self._ANSI_ESCAPE.sub("", super().format(record))

143 

144 

145def _redact_processor( 

146 logger: Any, method_name: str, event_dict: dict[str, Any] 

147) -> dict[str, Any]: 

148 """Structlog processor to redact sensitive fields in event_dict.""" 

149 sensitive_keys = { 

150 "api_key", 

151 "llm_api_key", 

152 "authorization", 

153 "Authorization", 

154 "token", 

155 "access_token", 

156 "secret", 

157 "password", 

158 } 

159 

160 def mask(value: str) -> str: 

161 try: 

162 if not isinstance(value, str) or not value: 

163 return "***REDACTED***" 

164 if len(value) <= 8: 

165 return "***REDACTED***" 

166 return value[:2] + "***REDACTED***" + value[-2:] 

167 except Exception: 

168 return "***REDACTED***" 

169 

170 def deep_redact(obj: Any) -> Any: 

171 try: 

172 if isinstance(obj, dict): 

173 return { 

174 k: ( 

175 mask(v) 

176 if k in sensitive_keys and isinstance(v, str) 

177 else deep_redact(v) 

178 ) 

179 for k, v in obj.items() 

180 } 

181 if isinstance(obj, list): 

182 return [deep_redact(i) for i in obj] 

183 return obj 

184 except Exception: 

185 return obj 

186 

187 return deep_redact(event_dict) 

188 

189 

class LoggingConfig:
    """Core logging setup with structlog + stdlib redaction and filters.

    All state is stored on the class, so the configuration is shared by
    every caller in the process. ``setup()`` installs handlers and filters
    on the root logger and configures structlog; ``reconfigure()`` swaps
    only the file handler; ``get_logger()`` returns a structlog logger and
    runs ``setup()`` with defaults if it has not run yet.
    """

    # True once setup() has completed at least once.
    _initialized = False
    # Root-logger handlers installed by this class (console and/or file).
    _installed_handlers: list[logging.Handler] = []
    # The file handler currently installed by this class, if any.
    _file_handler: logging.FileHandler | None = None
    # Arguments of the last completed setup(); used to skip redundant calls.
    _current_config: (
        tuple[
            str,  # level
            str,  # format
            str | None,  # file
            bool,  # clean_output
            bool,  # suppress_qdrant_warnings
            bool,  # disable_console
        ]
        | None
    ) = None

    @staticmethod
    def _detach_handler(root_logger: logging.Logger, handler: logging.Handler) -> None:
        """Remove ``handler`` from ``root_logger``, closing it if file-backed."""
        try:
            root_logger.removeHandler(handler)
            if isinstance(handler, logging.FileHandler):
                handler.close()
        except Exception:
            # Teardown is best-effort; a handler that fails to detach or
            # close must not prevent the new configuration from applying.
            pass

    @staticmethod
    def _build_file_handler(path: str) -> logging.FileHandler:
        """Create a file handler with ANSI stripping and redaction attached."""
        handler = logging.FileHandler(path)
        # Use CleanFormatter to strip ANSI sequences from structlog console renderer output
        handler.setFormatter(CleanFormatter("%(message)s"))
        handler.addFilter(ApplicationFilter())
        handler.addFilter(RedactionFilter())
        return handler

    @classmethod
    def setup(
        cls,
        *,
        level: str = "INFO",
        format: str = "console",  # "console" | "json"
        file: str | None = None,
        clean_output: bool = True,
        suppress_qdrant_warnings: bool = True,
        disable_console: bool | None = None,
    ) -> None:
        """Configure stdlib logging and structlog for the process.

        Args:
            level: Name of a stdlib logging level (case-insensitive).
            format: ``"json"`` selects the JSON renderer; any other value
                uses the console renderer.
            file: Optional path of a log file to write to.
            clean_output: With ``format="console"``, use short ``HH:MM:SS``
                timestamps instead of ISO timestamps.
            suppress_qdrant_warnings: Install ``QdrantVersionFilter`` on the
                ``qdrant_client`` logger (and remove it when False).
            disable_console: Skip the console handler. When None, the
                ``MCP_DISABLE_CONSOLE_LOGGING`` environment variable decides
                (``"true"``, case-insensitive, disables the console).

        Raises:
            ValueError: If ``level`` does not name a logging level.
        """
        # Env override for console toggling (e.g., MCP server)
        if disable_console is None:
            disable_console = (
                os.getenv("MCP_DISABLE_CONSOLE_LOGGING", "").lower() == "true"
            )

        # The isinstance check rejects upper-case module attributes that are
        # not levels (e.g. "basic_format" -> logging.BASIC_FORMAT).
        numeric_level = getattr(logging, level.upper(), None)
        if not isinstance(numeric_level, int):
            raise ValueError(f"Invalid log level: {level}")

        # Short-circuit when configuration is unchanged
        current_tuple = (
            level.upper(),
            format,
            file,
            bool(clean_output),
            bool(suppress_qdrant_warnings),
            bool(disable_console),
        )
        if cls._initialized and cls._current_config == current_tuple:
            return

        structlog.reset_defaults()

        # Only the root logger is touched. First remove the handlers this
        # class installed earlier...
        root_logger = logging.getLogger()
        for h in list(cls._installed_handlers):
            cls._detach_handler(root_logger, h)
        cls._installed_handlers.clear()

        # ...then remove every handler still attached to the root logger
        # (e.g. added by third parties) to avoid duplicate emissions.
        # NOTE: this also drops handlers this class did not create.
        for h in list(root_logger.handlers):
            cls._detach_handler(root_logger, h)

        handlers: list[logging.Handler] = []

        # Choose timestamp format and final renderer for structlog messages
        if clean_output and format == "console":
            ts_fmt = "%H:%M:%S"
            final_renderer = structlog.dev.ConsoleRenderer(colors=True)
        else:
            ts_fmt = "iso"
            final_renderer = (
                structlog.processors.JSONRenderer()
                if format == "json"
                else structlog.dev.ConsoleRenderer(colors=True)
            )

        if not disable_console:
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(logging.Formatter("%(message)s"))
            console_handler.addFilter(ApplicationFilter())
            console_handler.addFilter(RedactionFilter())
            handlers.append(console_handler)

        if file:
            handlers.append(cls._build_file_handler(file))

        # Install the freshly built handlers on the (now empty) root logger
        root_logger.setLevel(numeric_level)
        for h in handlers:
            root_logger.addHandler(h)
        # Track handlers we installed to avoid duplicates on re-setup
        cls._installed_handlers.extend(handlers)
        # Track file handler for lightweight reconfiguration
        cls._file_handler = next(
            (h for h in handlers if isinstance(h, logging.FileHandler)), None
        )

        # Add logger-level filters on the root logger as well.
        # Avoid duplicate filters if setup() is called multiple times
        if not any(isinstance(f, RedactionFilter) for f in root_logger.filters):
            root_logger.addFilter(RedactionFilter())
        if not any(isinstance(f, ApplicationFilter) for f in root_logger.filters):
            root_logger.addFilter(ApplicationFilter())

        # Optional suppressions. Keep at most one QdrantVersionFilter across
        # repeated setup() calls, and remove it when suppression is disabled.
        qdrant_logger = logging.getLogger("qdrant_client")
        existing_version_filters = [
            f for f in qdrant_logger.filters if isinstance(f, QdrantVersionFilter)
        ]
        if suppress_qdrant_warnings:
            if not existing_version_filters:
                qdrant_logger.addFilter(QdrantVersionFilter())
        else:
            for f in existing_version_filters:
                qdrant_logger.removeFilter(f)

        # Quiet noisy libs a bit
        for name in ("httpx", "httpcore", "urllib3", "gensim"):
            logging.getLogger(name).setLevel(logging.WARNING)

        # structlog processors – render to a final string directly
        structlog.configure(
            processors=[
                structlog.stdlib.filter_by_level,
                structlog.stdlib.add_logger_name,
                structlog.stdlib.add_log_level,
                structlog.processors.TimeStamper(fmt=ts_fmt),
                _redact_processor,
                final_renderer,
            ],
            wrapper_class=structlog.make_filtering_bound_logger(numeric_level),
            logger_factory=LoggerFactory(),
            cache_logger_on_first_use=False,
        )

        cls._initialized = True
        cls._current_config = current_tuple

    @classmethod
    def get_logger(cls, name: str | None = None) -> structlog.BoundLogger:
        """Return a structlog logger, running ``setup()`` with defaults first if needed."""
        if not cls._initialized:
            cls.setup()
        return structlog.get_logger(name)

    @classmethod
    def reconfigure(cls, *, file: str | None = None) -> None:
        """Lightweight reconfiguration for file destination.

        Replaces only the file handler while keeping console handlers and
        structlog processors intact. Passing ``file=None`` (or an empty
        string) removes the current file handler without adding a new one.
        """
        root_logger = logging.getLogger()
        # Remove existing file handler if present
        previous = cls._file_handler
        if previous is not None:
            cls._detach_handler(root_logger, previous)
            cls._installed_handlers = [
                h for h in cls._installed_handlers if h is not previous
            ]
            cls._file_handler = None

        # Add new file handler if requested
        if file:
            fh = cls._build_file_handler(file)
            root_logger.addHandler(fh)
            cls._installed_handlers.append(fh)
            cls._file_handler = fh

        # Keep the remembered configuration in sync so a later setup() call
        # with the new file path is recognised as unchanged.
        if cls._current_config is not None:
            (
                level,
                fmt,
                _,
                clean_output,
                suppress_qdrant_warnings,
                disable_console,
            ) = cls._current_config
            cls._current_config = (
                level,
                fmt,
                file,
                clean_output,
                suppress_qdrant_warnings,
                disable_console,
            )