Coverage for src/qdrant_loader_core/logging.py: 67%
199 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 07:16 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-11 07:16 +0000
1"""Unified logging configuration for qdrant-loader ecosystem.
3Provides:
4- structlog setup (console/json/file) with redaction
5- stdlib logging bridge with redaction filter
6- optional suppression of noisy third-party logs
7"""
9from __future__ import annotations
11import logging
12import os
13import re
14from typing import Any
16import structlog
17from structlog.stdlib import LoggerFactory
19try:
20 # ExtraAdder is available in structlog >= 20
21 from structlog.stdlib import ExtraAdder # type: ignore
22except Exception: # pragma: no cover - fallback when absent
23 ExtraAdder = None # type: ignore
class QdrantVersionFilter(logging.Filter):
    """Drop log records whose rendered message mentions "version check"."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Return False for version-check messages and True for everything else.

        The comparison is case-insensitive. A record whose message cannot be
        rendered (for example because of mismatched %-args) is let through
        rather than dropped.
        """
        try:
            rendered = record.getMessage()
            lowered = rendered.lower()
        except Exception:
            # Never lose a record just because it could not be inspected.
            return True
        return lowered.find("version check") == -1
class ApplicationFilter(logging.Filter):
    """Pass-through filter kept as an attachment point for application rules."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Accept every record.

        Nothing is rejected here; application packages may add their own,
        stricter filters alongside this one.
        """
        accept_record = True
        return accept_record
class RedactionFilter(logging.Filter):
    """Redacts obvious secrets from stdlib log records.

    The filter never rejects a record; it only rewrites it in place:

    * string values in ``record.args`` (positional tuple *or* mapping used
      with ``%(name)s`` placeholders) are scanned with ``TOKEN_PATTERNS``;
    * ``record.msg`` is scanned only when it contains neither ``%`` nor
      ``{``, so format templates are never altered;
    * a truthy record attribute named like one of ``SENSITIVE_KEYS`` only
      triggers the marker below -- the attribute value itself is left as is.

    Whenever something was redacted or flagged, ``***REDACTED***`` is
    appended to ``record.msg`` unless the marker is already present.
    """

    # Heuristics for tokens/keys in plain strings
    TOKEN_PATTERNS = [
        re.compile(r"sk-[A-Za-z0-9_\-]{6,}"),
        re.compile(r"tok-[A-Za-z0-9_\-]{6,}"),
        re.compile(
            r"(?i)(api_key|authorization|token|access_token|secret|password)\s*[:=]\s*([^\s]+)"
        ),
        re.compile(r"Bearer\s+[A-Za-z0-9_\-\.]+"),
    ]

    # Keys commonly used for secrets in structlog event dictionaries
    SENSITIVE_KEYS = {
        "api_key",
        "llm_api_key",
        "authorization",
        "Authorization",
        "token",
        "access_token",
        "secret",
        "password",
    }

    @staticmethod
    def _mask_match(m: re.Match[str]) -> str:
        """Replace one regex match, keeping two characters at each end of long matches."""
        s = m.group(0)
        if len(s) <= 8:
            return "***REDACTED***"
        return s[:2] + "***REDACTED***" + s[-2:]

    def _redact_text(self, text: str) -> str:
        """Return *text* with every ``TOKEN_PATTERNS`` match masked."""
        redacted = text
        for pat in self.TOKEN_PATTERNS:
            redacted = pat.sub(self._mask_match, redacted)
        return redacted

    def _redact_arg(self, value: Any) -> tuple[Any, bool]:
        """Redact a single log argument.

        Returns ``(new_value, changed)``; non-string values pass through
        untouched with ``changed`` set to False.
        """
        if not isinstance(value, str):
            return value, False
        cleaned = self._redact_text(value)
        return cleaned, cleaned != value

    def filter(self, record: logging.LogRecord) -> bool:
        """Redact secrets in *record* in place and always return True."""
        try:
            redaction_detected = False

            # Args may contain secrets; best-effort mask strings and detect changes
            args = record.args
            if isinstance(args, tuple):
                pairs = [self._redact_arg(a) for a in args]
                record.args = tuple(value for value, _ in pairs)
                redaction_detected = any(changed for _, changed in pairs)
            elif isinstance(args, dict):
                # LogRecord stores a lone mapping argument as ``args`` itself
                # (the ``"%(key)s"`` style); its values need the same scan.
                named = {k: self._redact_arg(v) for k, v in args.items()}
                if any(changed for _, changed in named.values()):
                    record.args = {k: value for k, (value, _) in named.items()}
                    redaction_detected = True

            # Redact raw message only when it contains no formatting placeholders
            # to avoid interfering with %-style or {}-style formatting
            if isinstance(record.msg, str):
                has_placeholders = "%" in record.msg or "{" in record.msg
                if not has_placeholders:
                    red_msg = self._redact_text(record.msg)
                    if red_msg != record.msg:
                        record.msg = red_msg
                        redaction_detected = True

            # If extras contain sensitive keys, mark as redacted. Truth-testing
            # an arbitrary attribute value may raise, hence the guard.
            try:
                if any(
                    key in self.SENSITIVE_KEYS and bool(value)
                    for key, value in record.__dict__.items()
                ):
                    redaction_detected = True
            except Exception:
                pass

            # Ensure a visible redaction marker appears in the captured message
            if (
                redaction_detected
                and isinstance(record.msg, str)
                and "***REDACTED***" not in record.msg
            ):
                # The marker contains no "%" or "{", so it cannot interfere
                # with later formatting of the message template.
                record.msg = f"{record.msg} ***REDACTED***"
        except Exception:
            # Redaction is best-effort: a failure here must never drop or
            # break the log record itself.
            pass
        return True
class CleanFormatter(logging.Formatter):
    """Formatter that removes ANSI escape sequences for clean file output."""

    # Compiled once at class creation instead of on every format() call.
    # Matches two-character escapes (ESC + one byte) and CSI sequences
    # (ESC [ parameters intermediates final-byte), e.g. colour codes.
    _ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")

    def format(self, record: logging.LogRecord) -> str:
        """Format *record* with the base formatter, then strip ANSI escapes."""
        return self._ANSI_ESCAPE.sub("", super().format(record))
def _redact_processor(
    logger: Any, method_name: str, event_dict: dict[str, Any]
) -> dict[str, Any]:
    """Structlog processor to redact sensitive fields in event_dict.

    Walks dicts, lists and plain tuples recursively and returns a redacted
    copy; the incoming ``event_dict`` is not modified. A value stored under
    a sensitive key is handled as follows:

    * ``None`` is kept (there is nothing to hide);
    * dicts, lists and tuples are traversed like any other value;
    * strings longer than 8 characters keep their first and last two
      characters around ``***REDACTED***``;
    * every other value (short strings, bytes, numbers, ...) is replaced by
      ``***REDACTED***`` entirely.

    ``logger`` and ``method_name`` are part of the structlog processor
    signature and are not used.
    """
    sensitive_keys = {
        "api_key",
        "llm_api_key",
        "authorization",
        "Authorization",
        "token",
        "access_token",
        "secret",
        "password",
    }

    def mask(value: Any) -> str:
        # Only long strings keep a hint of their content; anything else
        # (including non-string secrets) is hidden completely.
        if isinstance(value, str) and len(value) > 8:
            return value[:2] + "***REDACTED***" + value[-2:]
        return "***REDACTED***"

    def redact_entry(key: Any, value: Any) -> Any:
        if key not in sensitive_keys:
            return deep_redact(value)
        if value is None or isinstance(value, (dict, list, tuple)):
            # Containers are searched for nested secrets instead of being
            # flattened into a marker string.
            return deep_redact(value)
        return mask(value)

    def deep_redact(obj: Any) -> Any:
        try:
            if isinstance(obj, dict):
                return {k: redact_entry(k, v) for k, v in obj.items()}
            if isinstance(obj, list):
                return [deep_redact(i) for i in obj]
            # Exact-type check: rebuilding a namedtuple or other subclass
            # from a generator would change or break its type.
            if type(obj) is tuple:
                return tuple(deep_redact(i) for i in obj)
            return obj
        except Exception:
            # Best-effort: a processor must not break logging.
            return obj

    return deep_redact(event_dict)
class LoggingConfig:
    """Core logging setup with structlog + stdlib redaction and filters.

    All state lives on the class: ``setup`` installs handlers on the root
    logger and configures structlog, ``reconfigure`` swaps only the file
    handler, and ``get_logger`` lazily runs ``setup`` with defaults.
    """

    _initialized = False
    # Handlers this class attached to the root logger (detached on re-setup).
    _installed_handlers: list[logging.Handler] = []
    # The file handler currently installed by setup()/reconfigure(), if any.
    _file_handler: logging.FileHandler | None = None
    # Last applied configuration, used to short-circuit identical setup() calls.
    _current_config: (
        tuple[
            str,  # level
            str,  # format
            str | None,  # file
            bool,  # clean_output
            bool,  # suppress_qdrant_warnings
            bool,  # disable_console
        ]
        | None
    ) = None

    @staticmethod
    def _detach_handlers(
        root_logger: logging.Logger, handlers: list[logging.Handler]
    ) -> None:
        """Remove *handlers* from *root_logger*, closing file handlers (best-effort)."""
        for h in list(handlers):
            try:
                root_logger.removeHandler(h)
                if isinstance(h, logging.FileHandler):
                    h.close()
            except Exception:
                # Tear-down must not prevent the new configuration from applying.
                pass

    @staticmethod
    def _build_file_handler(file: str) -> logging.FileHandler:
        """Create a file handler with ANSI stripping and redaction attached."""
        file_handler = logging.FileHandler(file)
        # Use CleanFormatter to strip ANSI sequences from structlog console renderer output
        file_handler.setFormatter(CleanFormatter("%(message)s"))
        file_handler.addFilter(ApplicationFilter())
        file_handler.addFilter(RedactionFilter())
        return file_handler

    @classmethod
    def setup(
        cls,
        *,
        level: str = "INFO",
        format: str = "console",  # "console" | "json"
        file: str | None = None,
        clean_output: bool = True,
        suppress_qdrant_warnings: bool = True,
        disable_console: bool | None = None,
    ) -> None:
        """Configure stdlib logging and structlog.

        Args:
            level: Name of a stdlib logging level (case-insensitive).
            format: ``"json"`` selects the JSON renderer; any other value
                selects the coloured console renderer.
            file: Optional path of a log file to write in addition to the console.
            clean_output: With ``format="console"``, use short ``%H:%M:%S``
                timestamps instead of ISO timestamps.
            suppress_qdrant_warnings: Attach ``QdrantVersionFilter`` to the
                ``qdrant_client`` logger (and detach it again when False).
            disable_console: Skip the console handler. When None, the
                ``MCP_DISABLE_CONSOLE_LOGGING`` environment variable decides
                (enabled only by the value ``true``, case-insensitive).

        Raises:
            ValueError: If ``level`` does not name an integer logging level.

        Calling ``setup`` again with an identical configuration is a no-op.
        Otherwise every handler on the root logger is removed and replaced.
        """
        # Env override for console toggling (e.g., MCP server)
        if disable_console is None:
            disable_console = (
                os.getenv("MCP_DISABLE_CONSOLE_LOGGING", "").lower() == "true"
            )

        try:
            numeric_level = getattr(logging, level.upper())
        except AttributeError:
            raise ValueError(f"Invalid log level: {level}") from None
        if not isinstance(numeric_level, int):
            # e.g. "basic_format" resolves to logging.BASIC_FORMAT, a string.
            # Reject it here, before any existing handler is torn down.
            raise ValueError(f"Invalid log level: {level}")

        # Short-circuit when configuration is unchanged
        current_tuple = (
            level.upper(),
            format,
            file,
            bool(clean_output),
            bool(suppress_qdrant_warnings),
            bool(disable_console),
        )
        if cls._initialized and cls._current_config == current_tuple:
            return

        structlog.reset_defaults()

        # Detach the handlers installed by a previous setup(), then every
        # handler still left on the root logger (earlier setups, third
        # parties) so nothing is emitted twice. Only the root logger is touched.
        root_logger = logging.getLogger()
        cls._detach_handlers(root_logger, cls._installed_handlers)
        cls._installed_handlers.clear()
        cls._detach_handlers(root_logger, root_logger.handlers)

        # Choose timestamp format and final renderer for structlog messages
        ts_fmt = "%H:%M:%S" if clean_output and format == "console" else "iso"
        if format == "json":
            final_renderer = structlog.processors.JSONRenderer()
        else:
            final_renderer = structlog.dev.ConsoleRenderer(colors=True)

        handlers: list[logging.Handler] = []

        if not disable_console:
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(logging.Formatter("%(message)s"))
            console_handler.addFilter(ApplicationFilter())
            console_handler.addFilter(RedactionFilter())
            handlers.append(console_handler)

        if file:
            handlers.append(cls._build_file_handler(file))

        root_logger.setLevel(numeric_level)
        for h in handlers:
            root_logger.addHandler(h)
        # Track handlers we installed to avoid duplicates on re-setup
        cls._installed_handlers.extend(handlers)
        # Track file handler for lightweight reconfiguration
        cls._file_handler = next(
            (h for h in handlers if isinstance(h, logging.FileHandler)), None
        )

        # Add root-level filters once, even if setup() is called multiple times.
        # NOTE: stdlib logger filters are applied only to records logged on
        # that logger, not to records propagated from descendant loggers;
        # the per-handler filters above are what cover child loggers.
        if not any(isinstance(f, RedactionFilter) for f in root_logger.filters):
            root_logger.addFilter(RedactionFilter())
        if not any(isinstance(f, ApplicationFilter) for f in root_logger.filters):
            root_logger.addFilter(ApplicationFilter())

        # Optional suppressions: keep at most one version filter attached and
        # detach it again when suppression is switched off.
        qdrant_logger = logging.getLogger("qdrant_client")
        version_filters = [
            f for f in qdrant_logger.filters if isinstance(f, QdrantVersionFilter)
        ]
        if suppress_qdrant_warnings:
            if not version_filters:
                qdrant_logger.addFilter(QdrantVersionFilter())
        else:
            for f in version_filters:
                qdrant_logger.removeFilter(f)

        # Quiet noisy libs a bit
        for name in ("httpx", "httpcore", "urllib3", "gensim"):
            logging.getLogger(name).setLevel(logging.WARNING)

        # structlog processors – render to a final string directly
        structlog.configure(
            processors=[
                structlog.stdlib.filter_by_level,
                structlog.stdlib.add_logger_name,
                structlog.stdlib.add_log_level,
                structlog.processors.TimeStamper(fmt=ts_fmt),
                _redact_processor,
                final_renderer,
            ],
            wrapper_class=structlog.make_filtering_bound_logger(numeric_level),
            logger_factory=LoggerFactory(),
            cache_logger_on_first_use=False,
        )

        cls._initialized = True
        cls._current_config = current_tuple

    @classmethod
    def get_logger(cls, name: str | None = None) -> structlog.BoundLogger:
        """Return a structlog logger, running ``setup()`` with defaults if needed."""
        if not cls._initialized:
            cls.setup()
        return structlog.get_logger(name)

    @classmethod
    def reconfigure(cls, *, file: str | None = None) -> None:
        """Lightweight reconfiguration for file destination.

        Replaces only the file handler while keeping console handlers and
        structlog processors intact. Passing ``file=None`` (or an empty
        string) just removes the current file handler.

        The new file is opened before the old handler is removed, so if the
        path cannot be opened the exception propagates and the existing
        file logging stays in place.
        """
        root_logger = logging.getLogger()
        new_handler = cls._build_file_handler(file) if file else None

        # Remove existing file handler if present
        old_handler = cls._file_handler
        if old_handler is not None:
            try:
                root_logger.removeHandler(old_handler)
                old_handler.close()
            except Exception:
                pass
            cls._installed_handlers = [
                h for h in cls._installed_handlers if h is not old_handler
            ]
            cls._file_handler = None

        # Add new file handler if requested
        if new_handler is not None:
            root_logger.addHandler(new_handler)
            cls._installed_handlers.append(new_handler)
            cls._file_handler = new_handler

        # Keep the recorded configuration in sync so a later setup() with the
        # same arguments is still recognised as unchanged.
        if cls._current_config is not None:
            (
                level,
                fmt,
                _,
                clean_output,
                suppress_qdrant_warnings,
                disable_console,
            ) = cls._current_config
            cls._current_config = (
                level,
                fmt,
                file,
                clean_output,
                suppress_qdrant_warnings,
                disable_console,
            )