Coverage for src/qdrant_loader_core/logging.py: 79%
147 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:01 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:01 +0000
1"""Unified logging configuration for qdrant-loader ecosystem.
3Provides:
4- structlog setup (console/json/file) with redaction
5- stdlib logging bridge with redaction filter
6- optional suppression of noisy third-party logs
7"""
9from __future__ import annotations
11import logging
12import os
13import re
14from typing import Any
16import structlog
17from structlog.stdlib import LoggerFactory
19try:
20 # ExtraAdder is available in structlog >= 20
21 from structlog.stdlib import ExtraAdder # type: ignore
22except Exception: # pragma: no cover - fallback when absent
23 ExtraAdder = None # type: ignore
class QdrantVersionFilter(logging.Filter):
    """Drop log records whose rendered message mentions "version check"."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Return False for "version check" messages, True for everything else.

        The comparison is case-insensitive. A record whose message cannot be
        rendered (e.g. mismatched %-format arguments) is let through.
        """
        try:
            rendered = record.getMessage().lower()
        except Exception:
            # Never lose a record just because it could not be rendered here.
            return True
        return rendered.find("version check") == -1
class ApplicationFilter(logging.Filter):
    """Pass-through filter used as an attachment point on handlers and loggers."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Accept every record.

        Nothing is rejected here; application packages may add their own,
        stricter filters.
        """
        accepted = True
        return accepted
class RedactionFilter(logging.Filter):
    """Redacts obvious secrets from stdlib log records.

    The filter never rejects a record; it only rewrites it in place:

    * string entries of ``record.args`` (tuple or mapping style) are scanned
      with ``TOKEN_PATTERNS`` and masked;
    * ``record.msg`` is scanned only when it contains no ``%``/``{``
      placeholders, so deferred formatting is never disturbed;
    * string values stored on the record under one of ``SENSITIVE_KEYS``
      (i.e. passed through ``extra=``) are masked;
    * when anything was redacted, a ``***REDACTED***`` marker is guaranteed to
      be present in ``record.msg``.
    """

    # Marker inserted wherever a secret was masked.
    _MARKER = "***REDACTED***"

    # Heuristics for tokens/keys in plain strings
    TOKEN_PATTERNS = [
        re.compile(r"sk-[A-Za-z0-9_\-]{6,}"),
        re.compile(r"tok-[A-Za-z0-9_\-]{6,}"),
        # key[:=]value pairs. The value may be prefixed by an auth scheme
        # ("Authorization: Bearer <token>"); without the optional scheme group
        # only the word "Bearer" would be consumed and the token itself would
        # survive, because the standalone Bearer pattern below no longer
        # matches once "Bearer" has been rewritten.
        re.compile(
            r"(?i)(api_key|authorization|token|access_token|secret|password)"
            r"\s*[:=]\s*((?:(?:bearer|basic)\s+)?[^\s]+)"
        ),
        re.compile(r"Bearer\s+[A-Za-z0-9_\-\.]+"),
    ]

    # Keys commonly used for secrets in structlog event dictionaries
    SENSITIVE_KEYS = {
        "api_key",
        "llm_api_key",
        "authorization",
        "Authorization",
        "token",
        "access_token",
        "secret",
        "password",
    }

    def _mask_value(self, value: str) -> str:
        """Mask *value*, keeping two characters at each end when it is long enough."""
        if len(value) <= 8:
            return self._MARKER
        return value[:2] + self._MARKER + value[-2:]

    def _redact_text(self, text: str) -> str:
        """Return *text* with every ``TOKEN_PATTERNS`` match masked."""

        def mask(m: re.Match[str]) -> str:
            return self._mask_value(m.group(0))

        redacted = text
        for pat in self.TOKEN_PATTERNS:
            redacted = pat.sub(mask, redacted)
        return redacted

    def filter(self, record: logging.LogRecord) -> bool:
        """Redact secrets in *record* in place; always returns True."""
        try:
            redaction_detected = False

            # Args may contain secrets; best-effort mask strings and detect changes.
            args = record.args
            if isinstance(args, tuple):
                cleaned = []
                for arg in args:
                    if isinstance(arg, str):
                        safe = self._redact_text(arg)
                        if safe != arg:
                            redaction_detected = True
                        arg = safe
                    cleaned.append(arg)
                record.args = tuple(cleaned)
            elif isinstance(args, dict):
                # Mapping-style args back "%(name)s" placeholders.
                cleaned_map = {}
                for key, value in args.items():
                    if isinstance(value, str):
                        safe = self._redact_text(value)
                        if safe != value:
                            redaction_detected = True
                        value = safe
                    cleaned_map[key] = value
                record.args = cleaned_map

            # Redact the raw message only when it contains no formatting
            # placeholders, to avoid interfering with %-style or {}-style
            # formatting.
            msg = record.msg
            if isinstance(msg, str) and "%" not in msg and "{" not in msg:
                safe_msg = self._redact_text(msg)
                if safe_msg != msg:
                    record.msg = safe_msg
                    redaction_detected = True

            # Extras stored under sensitive keys: mask string values so that
            # formatters which emit record attributes cannot leak them.
            for key in self.SENSITIVE_KEYS:
                value = record.__dict__.get(key)
                if not value:
                    continue
                redaction_detected = True
                # Skip values that already carry the marker: the same record
                # may pass through several RedactionFilter instances.
                if isinstance(value, str) and self._MARKER not in value:
                    record.__dict__[key] = self._mask_value(value)

            # Ensure a visible redaction marker appears in the captured message.
            # The marker has no "%" or "{" so it cannot break later formatting.
            if (
                redaction_detected
                and isinstance(record.msg, str)
                and self._MARKER not in record.msg
            ):
                record.msg = f"{record.msg} {self._MARKER}"
        except Exception:
            # Deliberate best-effort: a logging filter must never raise or
            # drop the record because redaction failed.
            pass
        return True
class CleanFormatter(logging.Formatter):
    """Formatter that removes ANSI color codes for clean file output."""

    # Compiled once at class creation instead of on every format() call.
    # Matches two-character ESC sequences and CSI sequences (ESC [ ... final).
    _ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")

    def format(self, record: logging.LogRecord) -> str:
        """Format *record* with the base formatter, then strip ANSI escapes."""
        return self._ANSI_ESCAPE.sub("", super().format(record))
# Lower-cased names of event-dict keys whose values are treated as secrets.
_SENSITIVE_EVENT_KEYS = frozenset(
    {
        "api_key",
        "llm_api_key",
        "authorization",
        "token",
        "access_token",
        "secret",
        "password",
    }
)


def _redact_processor(
    logger: Any, method_name: str, event_dict: dict[str, Any]
) -> dict[str, Any]:
    """Structlog processor to redact sensitive fields in event_dict.

    Walks dicts, lists and plain tuples recursively and returns a redacted
    copy; the input is not mutated. A value is masked when its key matches a
    sensitive name (case-insensitively) and the value is a string or bytes.
    Long strings keep their first and last two characters; everything else
    becomes ``***REDACTED***``.

    ``logger`` and ``method_name`` are part of the structlog processor
    signature and are not used.
    """

    def mask(value: Any) -> str:
        if isinstance(value, str) and len(value) > 8:
            return value[:2] + "***REDACTED***" + value[-2:]
        # Short or empty strings and bytes are hidden entirely.
        return "***REDACTED***"

    def is_sensitive(key: Any) -> bool:
        return isinstance(key, str) and key.lower() in _SENSITIVE_EVENT_KEYS

    def deep_redact(obj: Any) -> Any:
        try:
            if isinstance(obj, dict):
                return {
                    k: (
                        mask(v)
                        if is_sensitive(k) and isinstance(v, (str, bytes, bytearray))
                        else deep_redact(v)
                    )
                    for k, v in obj.items()
                }
            if isinstance(obj, list):
                return [deep_redact(i) for i in obj]
            # Exact tuples only: namedtuples cannot be rebuilt from an iterable.
            if type(obj) is tuple:
                return tuple(deep_redact(i) for i in obj)
            return obj
        except Exception:
            # Best effort: a logging processor must not raise (e.g. on
            # pathological or self-referential structures).
            return obj

    return deep_redact(event_dict)
class LoggingConfig:
    """Core logging setup with structlog + stdlib redaction and filters."""

    _initialized = False

    # Handlers installed on the root logger by the previous setup() call.
    # They are removed on reconfiguration so repeated setup() calls do not
    # stack handlers and emit every record several times.
    _installed_handlers: list[logging.Handler] = []

    @classmethod
    def setup(
        cls,
        *,
        level: str = "INFO",
        format: str = "console",  # "console" | "json"
        file: str | None = None,
        clean_output: bool = True,
        suppress_qdrant_warnings: bool = True,
        disable_console: bool | None = None,
    ) -> None:
        """Configure stdlib logging and structlog.

        Args:
            level: Name of a stdlib log level (case-insensitive).
            format: ``"json"`` selects the JSON renderer; any other value
                uses the console renderer.
            file: Optional path of a log file to write to in addition to
                the console.
            clean_output: With ``format="console"``, use short ``%H:%M:%S``
                timestamps instead of ISO timestamps.
            suppress_qdrant_warnings: Attach ``QdrantVersionFilter`` to the
                ``qdrant_client`` logger.
            disable_console: Skip the console handler. When ``None``, the
                ``MCP_DISABLE_CONSOLE_LOGGING`` environment variable decides
                (``"true"``, case-insensitive, disables the console).

        Raises:
            ValueError: If ``level`` does not name a stdlib log level.
        """
        # Env override for console toggling (e.g., MCP server)
        if disable_console is None:
            disable_console = (
                os.getenv("MCP_DISABLE_CONSOLE_LOGGING", "").lower() == "true"
            )

        try:
            numeric_level = getattr(logging, level.upper())
        except AttributeError:
            raise ValueError(f"Invalid log level: {level}") from None
        # getattr() also resolves non-level attributes (e.g. "BASIC_FORMAT").
        if not isinstance(numeric_level, int):
            raise ValueError(f"Invalid log level: {level}")

        # Reset structlog defaults but preserve existing stdlib handlers (e.g., pytest caplog)
        structlog.reset_defaults()

        handlers: list[logging.Handler] = []

        # Choose timestamp format and final renderer for structlog messages
        if clean_output and format == "console":
            ts_fmt = "%H:%M:%S"
            final_renderer = structlog.dev.ConsoleRenderer(colors=True)
        else:
            ts_fmt = "iso"
            final_renderer = (
                structlog.processors.JSONRenderer()
                if format == "json"
                else structlog.dev.ConsoleRenderer(colors=True)
            )

        if not disable_console:
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(logging.Formatter("%(message)s"))
            console_handler.addFilter(ApplicationFilter())
            console_handler.addFilter(RedactionFilter())
            handlers.append(console_handler)

        if file:
            # Explicit encoding so non-ASCII log content does not depend on
            # the platform's default locale.
            file_handler = logging.FileHandler(file, encoding="utf-8")
            # Use CleanFormatter to strip ANSI sequences from structlog console renderer output
            file_handler.setFormatter(CleanFormatter("%(message)s"))
            file_handler.addFilter(ApplicationFilter())
            file_handler.addFilter(RedactionFilter())
            handlers.append(file_handler)

        root_logger = logging.getLogger()
        root_logger.setLevel(numeric_level)

        # Drop only the handlers a previous setup() installed; handlers owned
        # by others (so pytest caplog keeps working) are left in place.
        for stale in cls._installed_handlers:
            root_logger.removeHandler(stale)
            stale.close()
        for h in handlers:
            root_logger.addHandler(h)
        cls._installed_handlers = handlers

        # Add global filters so captured logs (e.g., pytest caplog) are also redacted
        # Avoid duplicate filters if setup() is called multiple times
        has_redaction = any(isinstance(f, RedactionFilter) for f in root_logger.filters)
        if not has_redaction:
            root_logger.addFilter(RedactionFilter())
        has_app_filter = any(
            isinstance(f, ApplicationFilter) for f in root_logger.filters
        )
        if not has_app_filter:
            root_logger.addFilter(ApplicationFilter())

        # Optional suppressions (added once, even across repeated setup() calls)
        if suppress_qdrant_warnings:
            qdrant_logger = logging.getLogger("qdrant_client")
            if not any(
                isinstance(f, QdrantVersionFilter) for f in qdrant_logger.filters
            ):
                qdrant_logger.addFilter(QdrantVersionFilter())

        # Quiet noisy libs a bit
        for name in ("httpx", "httpcore", "urllib3", "gensim"):
            logging.getLogger(name).setLevel(logging.WARNING)

        # structlog processors – render to a final string directly
        structlog.configure(
            processors=[
                structlog.stdlib.filter_by_level,
                structlog.stdlib.add_logger_name,
                structlog.stdlib.add_log_level,
                structlog.processors.TimeStamper(fmt=ts_fmt),
                _redact_processor,
                final_renderer,
            ],
            wrapper_class=structlog.make_filtering_bound_logger(numeric_level),
            logger_factory=LoggerFactory(),
            cache_logger_on_first_use=False,
        )

        cls._initialized = True

    @classmethod
    def get_logger(cls, name: str | None = None) -> structlog.BoundLogger:
        """Return a structlog logger, running ``setup()`` with defaults first if needed."""
        if not cls._initialized:
            cls.setup()
        return structlog.get_logger(name)