Coverage for src / qdrant_loader / utils / sensitive.py: 93%
83 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:40 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:40 +0000
1"""Utilities for redacting sensitive values from logs and error messages."""
3from __future__ import annotations
5import re
# Matches ``key<sep>value`` pairs whose key contains a sensitive word.
# Named groups: ``quote`` (optional quote around the key), ``key``, ``sep``
# (``:`` or ``=``) and ``value`` (single-quoted, double-quoted, or a bare run
# that stops at a comma, whitespace, ``}`` or ``]``).
_SENSITIVE_FIELD_RE = re.compile(
    # Case-insensitive; optional opening quote around the key.
    r"(?i)(?P<quote>['\"]?)"
    # Key: any identifier-like prefix ...
    r"(?P<key>[a-z0-9_\-]*"
    # ... containing one of the sensitive words ...
    r"(?:token|api[_-]?key|password|secret|access[_-]?key|private[_-]?key|authorization)"
    # ... followed by any identifier-like suffix.
    r"[a-z0-9_\-]*)"
    # Closing quote must equal the opening one; then the separator.
    r"(?P=quote)\s*(?P<sep>[:=])\s*"
    # Value: quoted string or bare token.
    r"(?P<value>'[^']*'|\"[^\"]*\"|[^,\s}\]]+)"
)

# Matches the literal prefix ``input_value=`` (any case, optional whitespace
# around ``=``); the value that follows is consumed by a hand-written scanner.
_INPUT_VALUE_PREFIX_RE = re.compile(r"input_value\s*=\s*", flags=re.IGNORECASE)

# Matches ``authorization:`` / ``authorization=`` headers. Group 1 is the
# header name plus separator; the optional scheme word and the credential
# itself are matched but not captured, so they can be replaced as a whole.
_AUTHORIZATION_RE = re.compile(
    r"(?i)(authorization\s*[:=]\s*)"
    r"(?:(?:bearer|basic|token)\s+)?"
    r'(?:"[^"]*"|\'[^\']*\'|[^\s,]+)'
)

# Matches bare ``sk-`` prefixed keys with at least 12 further characters
# (the constant's name suggests OpenAI-style API keys).
_OPENAI_KEY_RE = re.compile(r"\bsk-[a-zA-Z0-9\-_]{12,}\b")
def _consume_input_value(text: str, start: int) -> int:
    """Return the index just past the value that begins at ``text[start]``.

    Three value shapes are recognised, chosen by the first character:

    * a quoted scalar (``'`` or ``"``): consumed through the matching closing
      quote, honouring backslash escapes;
    * a bracketed structure (``[``, ``{`` or ``(``): consumed through the
      bracket that balances the opening one, ignoring brackets that appear
      inside quoted strings;
    * anything else: consumed up to (not including) the next ``,`` or newline.

    Args:
        text: The full text being scanned.
        start: Index of the first character of the value.

    Returns:
        The exclusive end index of the value. If ``start`` is at or beyond
        the end of ``text`` it is returned unchanged; an unterminated quote
        or bracket consumes to the end of ``text``.
    """
    # Parentheses are balanced too, so tuple-style values such as
    # ``('a', 'b')`` are consumed whole instead of stopping at the first comma.
    closers = {"[": "]", "{": "}", "(": ")"}
    length = len(text)
    if start >= length:
        return start

    first = text[start]

    # Quoted scalar.
    if first in "'\"":
        i = start + 1
        escaped = False
        while i < length:
            cur = text[i]
            if escaped:
                # The previous character was a backslash: take this one literally.
                escaped = False
            elif cur == "\\":
                escaped = True
            elif cur == first:
                return i + 1
            i += 1
        return i

    # Nested dict/list/tuple structures.
    if first in closers:
        stack = [closers[first]]
        i = start + 1
        in_quote = ""
        escaped = False

        while i < length and stack:
            cur = text[i]
            if in_quote:
                # Inside a string only escapes and the closing quote matter.
                if escaped:
                    escaped = False
                elif cur == "\\":
                    escaped = True
                elif cur == in_quote:
                    in_quote = ""
            elif cur in "'\"":
                in_quote = cur
            elif cur in closers:
                stack.append(closers[cur])
            elif cur == stack[-1]:
                # Only the expected closer pops; mismatched closers are ignored.
                stack.pop()
            i += 1

        return i

    # Unquoted scalar value: consume until delimiter.
    i = start
    while i < length and text[i] not in ",\n":
        i += 1
    return i
def _mask_input_value_segments(text: str, mask: str) -> str:
    """Mask every ``input_value=...`` occurrence, including nested values.

    Each ``input_value=`` prefix found by ``_INPUT_VALUE_PREFIX_RE`` is kept
    verbatim and the value that follows it (as delimited by
    ``_consume_input_value``) is replaced with ``mask``.

    Args:
        text: Text that may contain ``input_value=`` segments.
        mask: Replacement written in place of each value.

    Returns:
        ``text`` with every top-level ``input_value`` value replaced; the
        input is returned unchanged when it contains no ``input_value``.
    """
    # Cheap pre-check so the common case skips the regex scan entirely.
    if "input_value" not in text.lower():
        return text

    out_parts: list[str] = []
    cursor = 0

    for match in _INPUT_VALUE_PREFIX_RE.finditer(text):
        if match.start() < cursor:
            # This prefix lies inside a value that has already been masked.
            # Processing it would move ``cursor`` backwards and re-emit the
            # tail of that value, leaking what was just redacted.
            continue

        value_end = _consume_input_value(text, match.end())

        out_parts.append(text[cursor : match.start()])
        out_parts.append(f"{match.group(0)}{mask}")
        cursor = value_end

    out_parts.append(text[cursor:])
    return "".join(out_parts)
def redact_sensitive_data(text: str, mask: str = "**") -> str:
    """Redact common secret patterns in free-form text.

    Redaction runs in four passes, in this order: ``input_value=...``
    segments, ``authorization`` headers, ``key=value`` / ``key: value`` pairs
    with a sensitive key name, and bare ``sk-`` prefixed keys.

    Args:
        text: Input text that may contain secrets.
        mask: Replacement value for sensitive data. It is inserted literally;
            backslashes and digits in it have no special meaning.

    Returns:
        Redacted text safe for logs and terminal output. Empty input is
        returned unchanged.
    """
    if not text:
        return text

    # All substitutions use callables rather than replacement templates.
    # ``re.sub`` processes backslash escapes and group references in a string
    # template, so a mask such as ``"0000"`` or one containing ``\`` would be
    # misread (``\1`` followed by digits becomes another group or an octal
    # escape) and could raise ``re.error`` or emit the wrong text.

    def _replace_field(match: re.Match[str]) -> str:
        # Keep the key, its quoting, and the separator; drop only the value.
        quote = match.group("quote") or ""
        key = match.group("key")
        sep = match.group("sep")
        return f"{quote}{key}{quote}{sep}{mask}"

    def _replace_authorization(match: re.Match[str]) -> str:
        # Group 1 is the header name plus separator.
        return f"{match.group(1)}{mask}"

    def _replace_whole(_match: re.Match[str]) -> str:
        return mask

    redacted = _mask_input_value_segments(text, mask)
    redacted = _AUTHORIZATION_RE.sub(_replace_authorization, redacted)
    redacted = _SENSITIVE_FIELD_RE.sub(_replace_field, redacted)
    redacted = _OPENAI_KEY_RE.sub(_replace_whole, redacted)
    return redacted
def sanitize_exception_message(error: Exception | str, mask: str = "**") -> str:
    """Return a redacted, never-blank message for an exception or string.

    Args:
        error: The exception (or plain message) to render.
        mask: Replacement value passed through to ``redact_sensitive_data``.

    Returns:
        The redacted ``str(error)`` when it contains non-whitespace text.
        Otherwise the exception's class name, or ``"<redacted>"`` when
        ``error`` is not an exception.
    """
    safe_message = redact_sensitive_data(str(error), mask=mask)

    # Blank or whitespace-only output: fall back to a placeholder.
    if not (safe_message and safe_message.strip()):
        if isinstance(error, Exception):
            return error.__class__.__name__
        return "<redacted>"

    return safe_message