Coverage for src / qdrant_loader / utils / sensitive.py: 93%

83 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 09:40 +0000

1"""Utilities for redacting sensitive values from logs and error messages.""" 

2 

3from __future__ import annotations 

4 

5import re 

6 

# Matches ``key<sep>value`` pairs whose key contains a secret-looking word.
# Adjacent raw-string pieces are concatenated at compile time; the resulting
# pattern is a single string.
_SENSITIVE_FIELD_RE = re.compile(
    # Case-insensitive matching for the whole pattern.
    r"(?i)"
    # Optional opening quote around the key (must be closed by the same quote).
    r"(?P<quote>['\"]?)"
    # Key: any identifier-like run that contains one of the sensitive words.
    r"(?P<key>[a-z0-9_\-]*"
    r"(?:token|api[_-]?key|password|secret|access[_-]?key|private[_-]?key|authorization)"
    r"[a-z0-9_\-]*)"
    r"(?P=quote)"
    # Separator, with optional surrounding whitespace.
    r"\s*(?P<sep>[:=])\s*"
    # Value: single-quoted, double-quoted, or a bare run up to , whitespace } ].
    r"(?P<value>'[^']*'|\"[^\"]*\"|[^,\s}\]]+)"
)

# Matches the ``input_value=`` prefix (any letter case, optional whitespace
# around the equals sign); the value itself is consumed separately.
_INPUT_VALUE_PREFIX_RE = re.compile(r"input_value\s*=\s*", flags=re.IGNORECASE)

# Matches ``authorization:`` / ``authorization=`` followed by an optional
# scheme word and a quoted or bare credential. Group 1 is the prefix to keep.
_AUTHORIZATION_RE = re.compile(
    r'(?i)'
    r'(authorization\s*[:=]\s*)'
    r'(?:(?:bearer|basic|token)\s+)?'
    r'(?:"[^"]*"|\'[^\']*\'|[^\s,]+)'
)

# Matches ``sk-`` followed by at least 12 key characters.
_OPENAI_KEY_RE = re.compile(r"\bsk-[a-zA-Z0-9\-_]{12,}\b")

15 

16 

def _consume_input_value(text: str, start: int) -> int:
    """Return the index just past the value that begins at ``text[start]``.

    Three value shapes are recognised:

    * a quoted scalar (single or double quotes, backslash escapes honoured),
    * a bracketed list/dict, tracked with a stack of expected closers so that
      nested containers and brackets inside quotes are skipped correctly,
    * anything else, which runs up to the next comma or newline.

    Unterminated quotes or brackets consume the rest of ``text``. When
    ``start`` is at or beyond the end of ``text`` it is returned unchanged.
    """
    length = len(text)
    if start >= length:
        return start

    opener = text[start]

    # Quoted scalar: scan for the matching, unescaped closing quote.
    if opener in ("'", '"'):
        skip_next = False
        for pos in range(start + 1, length):
            char = text[pos]
            if skip_next:
                skip_next = False
            elif char == "\\":
                skip_next = True
            elif char == opener:
                return pos + 1
        return length

    # Nested list/dict: keep a stack of the closers still owed.
    closers = {"[": "]", "{": "}"}
    if opener in closers:
        pending = [closers[opener]]
        active_quote = ""
        skip_next = False
        for pos in range(start + 1, length):
            char = text[pos]
            if active_quote:
                # Inside a quoted string brackets are plain characters.
                if skip_next:
                    skip_next = False
                elif char == "\\":
                    skip_next = True
                elif char == active_quote:
                    active_quote = ""
            elif char in ("'", '"'):
                active_quote = char
            elif char in closers:
                pending.append(closers[char])
            elif char == pending[-1]:
                # Only the expected closer pops; mismatched closers are ignored.
                pending.pop()
                if not pending:
                    return pos + 1
        return length

    # Bare scalar: stop at the first comma or newline.
    for pos in range(start, length):
        if text[pos] in (",", "\n"):
            return pos
    return length

73 

74 

def _mask_input_value_segments(text: str, mask: str) -> str:
    """Mask all input_value=... occurrences, including deeply nested values.

    Args:
        text: Text that may contain ``input_value=<value>`` fragments.
        mask: Replacement written in place of each value.

    Returns:
        ``text`` with every top-level ``input_value=`` value replaced by
        ``mask``. The ``input_value=`` prefix itself is kept as written.
    """
    # Cheap pre-check so text without the marker is returned untouched.
    if "input_value" not in text.lower():
        return text

    out_parts: list[str] = []
    cursor = 0  # index of the first character not yet emitted or masked

    for match in _INPUT_VALUE_PREFIX_RE.finditer(text):
        if match.start() < cursor:
            # This prefix starts inside a value that has already been masked
            # (e.g. ``input_value={'a': 'input_value=x', ...}``). Emitting it
            # again would duplicate the mask and rewind the cursor, leaking
            # the tail of the enclosing value.
            if match.end() > cursor:
                # The prefix straddles the end of the masked span: treat its
                # value as part of the same masked region.
                cursor = _consume_input_value(text, match.end())
            continue

        value_end = _consume_input_value(text, match.end())

        out_parts.append(text[cursor : match.start()])
        out_parts.append(f"{match.group(0)}{mask}")
        cursor = value_end

    out_parts.append(text[cursor:])
    return "".join(out_parts)

93 

94 

def redact_sensitive_data(text: str, mask: str = "**") -> str:
    """Redact common secret patterns in free-form text.

    The passes run in this order: ``input_value=...`` segments,
    ``authorization`` credentials, sensitive ``key=value`` / ``key: value``
    fields, and finally bare ``sk-...`` keys.

    Args:
        text: Input text that may contain secrets.
        mask: Replacement value for sensitive data. It is always inserted
            literally, so backslashes or leading digits in the mask are safe.

    Returns:
        Redacted text safe for logs and terminal output. Empty input is
        returned unchanged.
    """
    if not text:
        return text

    # Callable replacements are used throughout: a string template would
    # interpret backslashes/digits in ``mask`` as escapes or group references
    # (e.g. ``\1`` followed by a mask of "9" becomes group 19).
    def _replace_field(match: re.Match[str]) -> str:
        quote = match.group("quote") or ""
        key = match.group("key")
        sep = match.group("sep")
        return f"{quote}{key}{quote}{sep}{mask}"

    def _replace_authorization(match: re.Match[str]) -> str:
        # Group 1 is the ``authorization:`` / ``authorization=`` prefix.
        return f"{match.group(1)}{mask}"

    def _replace_with_mask(_match: re.Match[str]) -> str:
        return mask

    redacted = _mask_input_value_segments(text, mask)
    redacted = _AUTHORIZATION_RE.sub(_replace_authorization, redacted)
    redacted = _SENSITIVE_FIELD_RE.sub(_replace_field, redacted)
    redacted = _OPENAI_KEY_RE.sub(_replace_with_mask, redacted)
    return redacted

119 

120 

def sanitize_exception_message(error: Exception | str, mask: str = "**") -> str:
    """Return a redacted, never-blank message for an exception or string.

    The text of ``error`` is passed through ``redact_sensitive_data``. If the
    result is empty or whitespace-only, the exception's class name is returned
    for exceptions and the literal ``"<redacted>"`` for plain strings.
    """
    safe_text = redact_sensitive_data(str(error), mask=mask)

    # Fall back only when nothing printable is left.
    has_content = bool(safe_text) and bool(safe_text.strip())
    if not has_content:
        if isinstance(error, Exception):
            return type(error).__name__
        return "<redacted>"

    return safe_text