Coverage for src / qdrant_loader_core / logging_filters.py: 76%
62 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-12 09:42 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-12 09:42 +0000
1"""Logging filters for redaction and noise suppression."""
3from __future__ import annotations
5import logging
6import re
class QdrantVersionFilter(logging.Filter):
    """Drop records whose formatted message contains "version check"."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Return False for "version check" messages, True otherwise.

        The match is case-insensitive and is made against the fully
        formatted message (``record.getMessage()``). If the message cannot
        be rendered, the record is allowed through.
        """
        try:
            rendered = record.getMessage()
            is_version_noise = "version check" in rendered.lower()
        except Exception:
            # Best-effort: a record that fails to format is never suppressed.
            return True
        return not is_version_noise
class ApplicationFilter(logging.Filter):
    """Pass-through filter that accepts every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Always return True so that no record is dropped."""
        # Nothing is filtered here by default; application packages may
        # attach their own filters on top of this one.
        allow_record = True
        return allow_record
class RedactionFilter(logging.Filter):
    """Redacts obvious secrets from stdlib log records.

    The filter never drops a record (``filter`` always returns True). It
    masks secret-looking substrings in string arguments and in placeholder-
    free messages, and appends a ``***REDACTED***`` marker to the message
    whenever something sensitive was detected.
    """

    # Heuristics for tokens/keys in plain strings.
    # Order matters: patterns are applied one after another on the output of
    # the previous one. The ``Bearer`` pattern must run before the key/value
    # pattern; otherwise "Authorization: Bearer <token>" has the word
    # "Bearer" consumed as the value and the real token stays in clear text.
    TOKEN_PATTERNS = [
        re.compile(r"sk-[A-Za-z0-9_\-]{6,}"),
        re.compile(r"tok-[A-Za-z0-9_\-]{6,}"),
        re.compile(r"Bearer\s+[A-Za-z0-9_\-\.]+"),
        re.compile(
            r"(?i)(api_key|authorization|token|access_token|secret|password)\s*[:=]\s*([^\s]+)"
        ),
    ]

    # Keys commonly used for secrets in structlog event dictionaries
    SENSITIVE_KEYS = {
        "api_key",
        "llm_api_key",
        "authorization",
        "Authorization",
        "token",
        "access_token",
        "secret",
        "password",
    }

    def _redact_text(self, text: str) -> str:
        """Return *text* with every ``TOKEN_PATTERNS`` match masked."""

        def mask(m: "re.Match[str]") -> str:
            matched = m.group(0)
            if m.lastindex:
                # Key/value pattern: keep the key and separator for context
                # and hide the value completely. Keeping a tail of the whole
                # match would expose short values (e.g. "password=ab").
                value_offset = m.start(m.lastindex) - m.start()
                return matched[:value_offset] + "***REDACTED***"
            if len(matched) <= 8:
                return "***REDACTED***"
            return matched[:2] + "***REDACTED***" + matched[-2:]

        redacted = text
        for pat in self.TOKEN_PATTERNS:
            redacted = pat.sub(mask, redacted)
        return redacted

    def _redact_args(self, record: logging.LogRecord) -> bool:
        """Mask string values in ``record.args``; return True if any changed.

        Handles both positional (tuple) arguments and the single-dict form
        used with ``%(name)s`` placeholders. A caller-supplied dict is never
        mutated; a redacted copy is stored on the record instead.
        """
        args = record.args
        changed = False
        if isinstance(args, tuple):
            new_args = []
            for a in args:
                red_a = self._redact_text(a) if isinstance(a, str) else a
                if red_a is not a and red_a != a:
                    changed = True
                new_args.append(red_a)
            if changed:
                record.args = tuple(new_args)
        elif isinstance(args, dict):
            new_map = {}
            for key, value in args.items():
                red_v = self._redact_text(value) if isinstance(value, str) else value
                if red_v is not value and red_v != value:
                    changed = True
                new_map[key] = red_v
            if changed:
                record.args = new_map
        return changed

    def filter(self, record: logging.LogRecord) -> bool:
        """Redact secrets on *record* in place; always returns True.

        Redaction is best-effort: any unexpected error is swallowed so that
        logging itself never fails because of this filter.
        """
        try:
            # Args may contain secrets; best-effort mask strings and detect changes
            redaction_detected = self._redact_args(record)

            # Redact raw message only when it contains no formatting placeholders
            # to avoid interfering with %-style or {}-style formatting
            msg = record.msg
            if isinstance(msg, str) and "%" not in msg and "{" not in msg:
                red_msg = self._redact_text(msg)
                if red_msg != msg:
                    record.msg = red_msg
                    redaction_detected = True

            # If structlog extras contain sensitive keys, mark as redacted.
            # NOTE(review): the attribute values themselves are left untouched
            # here; only the message is flagged.
            try:
                extras = record.__dict__
                if any(extras.get(k) for k in self.SENSITIVE_KEYS):
                    redaction_detected = True
            except Exception:
                pass

            # Ensure a visible redaction marker appears in the captured message.
            # The marker contains no "%" so it cannot break %-formatting.
            if (
                redaction_detected
                and isinstance(record.msg, str)
                and "***REDACTED***" not in record.msg
            ):
                record.msg = f"{record.msg} ***REDACTED***"
        except Exception:
            # Deliberate best-effort: never let redaction break logging.
            pass
        return True