Coverage for src/qdrant_loader_core/logging.py: 79%

147 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:01 +0000

1"""Unified logging configuration for qdrant-loader ecosystem. 

2 

3Provides: 

4- structlog setup (console/json/file) with redaction 

5- stdlib logging bridge with redaction filter 

6- optional suppression of noisy third-party logs 

7""" 

8 

9from __future__ import annotations 

10 

11import logging 

12import os 

13import re 

14from typing import Any 

15 

16import structlog 

17from structlog.stdlib import LoggerFactory 

18 

19try: 

20 # ExtraAdder is available in structlog >= 20 

21 from structlog.stdlib import ExtraAdder # type: ignore 

22except Exception: # pragma: no cover - fallback when absent 

23 ExtraAdder = None # type: ignore 

24 

25 

class QdrantVersionFilter(logging.Filter):
    """Drop records whose rendered message mentions "version check"."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Return False for "version check" messages, True for everything else.

        The match is case-insensitive. A record whose message cannot be
        rendered (e.g. mismatched %-args) is let through unchanged.
        """
        try:
            rendered = record.getMessage().lower()
        except Exception:
            # Never let a malformed record break logging; just keep it.
            return True
        is_version_check = "version check" in rendered
        return not is_version_check

32 

33 

class ApplicationFilter(logging.Filter):
    """Pass-through filter used as a hook point on handlers and the root logger."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Accept every record."""
        # Nothing is rejected here; application packages may layer their own
        # filters on top of this one.
        accept_record = True
        return accept_record

38 

39 

class RedactionFilter(logging.Filter):
    """Redacts obvious secrets from stdlib log records."""

    # Heuristics for tokens/keys in plain strings
    TOKEN_PATTERNS = [
        re.compile(r"sk-[A-Za-z0-9_\-]{6,}"),
        re.compile(r"tok-[A-Za-z0-9_\-]{6,}"),
        re.compile(
            r"(?i)(api_key|authorization|token|access_token|secret|password)\s*[:=]\s*([^\s]+)"
        ),
        re.compile(r"Bearer\s+[A-Za-z0-9_\-\.]+"),
    ]

    # Keys commonly used for secrets in structlog event dictionaries
    SENSITIVE_KEYS = {
        "api_key",
        "llm_api_key",
        "authorization",
        "Authorization",
        "token",
        "access_token",
        "secret",
        "password",
    }

    def _redact_text(self, text: str) -> str:
        """Return *text* with every ``TOKEN_PATTERNS`` match masked.

        Matches of 8 characters or fewer are replaced entirely; longer matches
        keep their first and last two characters around the marker.
        """

        def mask(m: re.Match[str]) -> str:
            s = m.group(0)
            if len(s) <= 8:
                return "***REDACTED***"
            return s[:2] + "***REDACTED***" + s[-2:]

        redacted = text
        for pat in self.TOKEN_PATTERNS:
            redacted = pat.sub(mask, redacted)
        return redacted

    def _redact_arg(self, value: object) -> tuple[object, bool]:
        """Redact one log argument; return ``(new_value, was_changed)``.

        Only ``str`` arguments are inspected; anything else is returned as-is.
        """
        if not isinstance(value, str):
            return value, False
        cleaned = self._redact_text(value)
        return cleaned, cleaned != value

    def filter(self, record: logging.LogRecord) -> bool:
        """Mask secrets in ``record.args`` / ``record.msg`` in place.

        Always returns True: this filter rewrites records, it never drops
        them. Any unexpected error is swallowed so that redaction can never
        break logging itself.
        """
        try:
            redaction_detected = False

            # Args may contain secrets; best-effort mask strings and detect changes.
            # Positional args arrive as a tuple, "%(name)s"-style args as a dict.
            if isinstance(record.args, tuple):
                new_args = []
                for a in record.args:
                    red_a, changed = self._redact_arg(a)
                    redaction_detected = redaction_detected or changed
                    new_args.append(red_a)
                record.args = tuple(new_args)
            elif isinstance(record.args, dict):
                new_mapping = {}
                for key, a in record.args.items():
                    red_a, changed = self._redact_arg(a)
                    redaction_detected = redaction_detected or changed
                    new_mapping[key] = red_a
                record.args = new_mapping

            if isinstance(record.msg, str):
                # stdlib logging only interpolates the message when args are
                # present, so a message without args is always safe to rewrite
                # even if it contains "%" or "{" (e.g. JSON payloads). With
                # args present, leave placeholder-bearing messages alone so a
                # rewritten format string cannot break %-formatting.
                has_placeholders = ("%" in record.msg) or ("{" in record.msg)
                will_be_interpolated = bool(record.args) and has_placeholders
                if not will_be_interpolated:
                    red_msg = self._redact_text(record.msg)
                    if red_msg != record.msg:
                        record.msg = red_msg
                        redaction_detected = True

            # If structlog extras contain sensitive keys, mark as redacted
            try:
                if any(
                    (k in self.SENSITIVE_KEYS and bool(v))
                    for k, v in record.__dict__.items()
                ):
                    redaction_detected = True
            except Exception:
                # bool() on exotic extra values may raise; ignore those.
                pass

            # Ensure a visible redaction marker appears in the captured message
            if (
                redaction_detected
                and isinstance(record.msg, str)
                and "***REDACTED***" not in record.msg
            ):
                # A plain suffix contains no "%" and so cannot interfere
                # with %-formatting of the original message.
                record.msg = f"{record.msg} ***REDACTED***"
        except Exception:
            # Deliberate best-effort: a redaction failure must not stop the
            # record from being logged.
            pass
        return True

131 

132 

class CleanFormatter(logging.Formatter):
    """Formatter that removes ANSI color codes for clean file output."""

    # Compiled once for the class instead of on every format() call.
    # Matches two-character ESC sequences and CSI sequences such as "\x1b[31m".
    _ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")

    def format(self, record: logging.LogRecord) -> str:
        """Format *record* with the base formatter, then strip ANSI escapes."""
        return self._ANSI_ESCAPE.sub("", super().format(record))

143 

144 

145def _redact_processor( 

146 logger: Any, method_name: str, event_dict: dict[str, Any] 

147) -> dict[str, Any]: 

148 """Structlog processor to redact sensitive fields in event_dict.""" 

149 sensitive_keys = { 

150 "api_key", 

151 "llm_api_key", 

152 "authorization", 

153 "Authorization", 

154 "token", 

155 "access_token", 

156 "secret", 

157 "password", 

158 } 

159 

160 def mask(value: str) -> str: 

161 try: 

162 if not isinstance(value, str) or not value: 

163 return "***REDACTED***" 

164 if len(value) <= 8: 

165 return "***REDACTED***" 

166 return value[:2] + "***REDACTED***" + value[-2:] 

167 except Exception: 

168 return "***REDACTED***" 

169 

170 def deep_redact(obj: Any) -> Any: 

171 try: 

172 if isinstance(obj, dict): 

173 return { 

174 k: ( 

175 mask(v) 

176 if k in sensitive_keys and isinstance(v, str) 

177 else deep_redact(v) 

178 ) 

179 for k, v in obj.items() 

180 } 

181 if isinstance(obj, list): 

182 return [deep_redact(i) for i in obj] 

183 return obj 

184 except Exception: 

185 return obj 

186 

187 return deep_redact(event_dict) 

188 

189 

class LoggingConfig:
    """Core logging setup with structlog + stdlib redaction and filters."""

    _initialized = False
    # Handlers attached to the root logger by the most recent setup() call.
    # Tracked so that a later setup() replaces them instead of stacking
    # duplicates (which would emit every log line several times).
    _installed_handlers: list[logging.Handler] = []

    @classmethod
    def setup(
        cls,
        *,
        level: str = "INFO",
        format: str = "console",  # "console" | "json"
        file: str | None = None,
        clean_output: bool = True,
        suppress_qdrant_warnings: bool = True,
        disable_console: bool | None = None,
    ) -> None:
        """Configure stdlib logging and structlog.

        Safe to call repeatedly: handlers installed by an earlier call are
        removed and closed, while handlers installed by anyone else (e.g.
        pytest caplog) are left alone.

        Args:
            level: Name of a stdlib logging level (case-insensitive).
            format: "json" selects the JSON renderer; anything else uses the
                console renderer.
            file: Optional path of a log file to write to as well.
            clean_output: With console format, use short "%H:%M:%S" timestamps.
            suppress_qdrant_warnings: Filter "version check" messages from
                the "qdrant_client" logger.
            disable_console: Skip the console handler. When None, the
                MCP_DISABLE_CONSOLE_LOGGING environment variable decides
                ("true" disables it).

        Raises:
            ValueError: If *level* does not name a logging level.
        """
        # Env override for console toggling (e.g., MCP server)
        if disable_console is None:
            disable_console = (
                os.getenv("MCP_DISABLE_CONSOLE_LOGGING", "").lower() == "true"
            )

        # Require an integer so non-level attributes of the logging module
        # (e.g. BASIC_FORMAT) are rejected here with a clear message.
        numeric_level = getattr(logging, level.upper(), None)
        if not isinstance(numeric_level, int):
            raise ValueError(f"Invalid log level: {level}")

        # Reset structlog defaults but preserve existing stdlib handlers (e.g., pytest caplog)
        structlog.reset_defaults()

        # Choose timestamp format and final renderer for structlog messages
        if clean_output and format == "console":
            ts_fmt = "%H:%M:%S"
            final_renderer = structlog.dev.ConsoleRenderer(colors=True)
        else:
            ts_fmt = "iso"
            final_renderer = (
                structlog.processors.JSONRenderer()
                if format == "json"
                else structlog.dev.ConsoleRenderer(colors=True)
            )

        # Build the new handlers first so that a failure (e.g. an unwritable
        # log file) leaves the previous configuration untouched.
        handlers: list[logging.Handler] = []

        if not disable_console:
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(logging.Formatter("%(message)s"))
            console_handler.addFilter(ApplicationFilter())
            console_handler.addFilter(RedactionFilter())
            handlers.append(console_handler)

        if file:
            file_handler = logging.FileHandler(file)
            # Use CleanFormatter to strip ANSI sequences from structlog console renderer output
            file_handler.setFormatter(CleanFormatter("%(message)s"))
            file_handler.addFilter(ApplicationFilter())
            file_handler.addFilter(RedactionFilter())
            handlers.append(file_handler)

        root_logger = logging.getLogger()
        root_logger.setLevel(numeric_level)

        # Drop only the handlers we installed previously; foreign handlers
        # (so pytest caplog keeps working) stay attached.
        for stale in cls._installed_handlers:
            root_logger.removeHandler(stale)
            try:
                stale.close()
            except OSError:
                # A failing flush/close of an old handler must not prevent
                # the new configuration from being applied.
                pass
        for h in handlers:
            root_logger.addHandler(h)
        cls._installed_handlers = handlers

        # Add global filters so captured logs (e.g., pytest caplog) are also redacted
        # Avoid duplicate filters if setup() is called multiple times
        has_redaction = any(isinstance(f, RedactionFilter) for f in root_logger.filters)
        if not has_redaction:
            root_logger.addFilter(RedactionFilter())
        has_app_filter = any(
            isinstance(f, ApplicationFilter) for f in root_logger.filters
        )
        if not has_app_filter:
            root_logger.addFilter(ApplicationFilter())

        # Optional suppressions (added at most once across repeated calls)
        if suppress_qdrant_warnings:
            qdrant_logger = logging.getLogger("qdrant_client")
            if not any(
                isinstance(f, QdrantVersionFilter) for f in qdrant_logger.filters
            ):
                qdrant_logger.addFilter(QdrantVersionFilter())

        # Quiet noisy libs a bit
        for name in ("httpx", "httpcore", "urllib3", "gensim"):
            logging.getLogger(name).setLevel(logging.WARNING)

        # structlog processors – render to a final string directly
        structlog.configure(
            processors=[
                structlog.stdlib.filter_by_level,
                structlog.stdlib.add_logger_name,
                structlog.stdlib.add_log_level,
                structlog.processors.TimeStamper(fmt=ts_fmt),
                _redact_processor,
                final_renderer,
            ],
            wrapper_class=structlog.make_filtering_bound_logger(numeric_level),
            logger_factory=LoggerFactory(),
            cache_logger_on_first_use=False,
        )

        cls._initialized = True

    @classmethod
    def get_logger(cls, name: str | None = None) -> structlog.BoundLogger:
        """Return a structlog logger, running setup() with defaults if needed."""
        if not cls._initialized:
            cls.setup()
        return structlog.get_logger(name)