Coverage for src / qdrant_loader_core / logging_filters.py: 76%

62 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-12 09:42 +0000

1"""Logging filters for redaction and noise suppression.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6import re 

7 

8 

class QdrantVersionFilter(logging.Filter):
    """Suppress log records whose rendered message mentions "version check"."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Return False for records containing "version check" (case-insensitive).

        If the message cannot be rendered (for example because the format
        arguments do not match the message), the record is allowed through.
        """
        try:
            lowered = record.getMessage().lower()
            keep = "version check" not in lowered
        except Exception:
            # Rendering failed; never drop a record we could not inspect.
            keep = True
        return keep

15 

16 

class ApplicationFilter(logging.Filter):
    """Pass-through filter: every record is accepted."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Accept *record* unconditionally.

        Nothing is filtered here; application packages may add their own
        filters on top of this one.
        """
        return True

21 

22 

class RedactionFilter(logging.Filter):
    """Redacts obvious secrets from stdlib log records.

    The filter never drops a record (``filter`` always returns True); it
    mutates the record in place:

    * string positional args and string values of mapping args are scanned
      with ``TOKEN_PATTERNS``;
    * the raw message is scanned when no %-formatting will be applied to it;
    * truthy record attributes (and mapping-arg values) whose key is in
      ``SENSITIVE_KEYS`` are replaced by the redaction marker.

    Whenever something was redacted, the marker ``***REDACTED***`` is made
    visible in ``record.msg``.
    """

    # Marker substituted for, or appended next to, redacted content.
    _MARKER = "***REDACTED***"

    # Heuristics for tokens/keys in plain strings
    TOKEN_PATTERNS = [
        re.compile(r"sk-[A-Za-z0-9_\-]{6,}"),
        re.compile(r"tok-[A-Za-z0-9_\-]{6,}"),
        # key[:=]value pairs. The value may start with an auth scheme word
        # ("Bearer"/"Basic"); without that, "Authorization: Bearer <token>"
        # would match only up to the word "Bearer" and leave the credential
        # itself in the text.
        re.compile(
            r"(?i)(api_key|authorization|token|access_token|secret|password)\s*[:=]\s*"
            r"((?:(?:bearer|basic)\s+)?[^\s]+)"
        ),
        re.compile(r"Bearer\s+[A-Za-z0-9_\-\.]+"),
    ]

    # Keys commonly used for secrets in structlog event dictionaries
    SENSITIVE_KEYS = {
        "api_key",
        "llm_api_key",
        "authorization",
        "Authorization",
        "token",
        "access_token",
        "secret",
        "password",
    }

    def _redact_text(self, text: str) -> str:
        """Return *text* with every ``TOKEN_PATTERNS`` match masked.

        Matches of 8 characters or fewer are replaced entirely by the marker;
        longer matches keep their first and last two characters around it.
        NOTE(review): keeping the trailing two characters can expose the last
        two characters of the secret itself; kept as-is for compatibility.
        """
        marker = self._MARKER

        def mask(m: "re.Match[str]") -> str:
            s = m.group(0)
            if len(s) <= 8:
                return marker
            return s[:2] + marker + s[-2:]

        redacted = text
        for pat in self.TOKEN_PATTERNS:
            redacted = pat.sub(mask, redacted)
        return redacted

    def _redact_args(self, record: logging.LogRecord) -> bool:
        """Mask secrets in ``record.args``; return True if anything changed.

        Handles both the usual tuple of positional args and the dict form
        that ``LogRecord`` produces when a single mapping is passed for
        ``%(name)s``-style messages.
        """
        args = record.args
        changed = False

        if isinstance(args, tuple):
            new_args = []
            for a in args:
                if isinstance(a, str):
                    red_a = self._redact_text(a)
                    if red_a != a:
                        changed = True
                    new_args.append(red_a)
                else:
                    new_args.append(a)
            if changed:
                record.args = tuple(new_args)
        elif isinstance(args, dict):
            new_map = {}
            for key, value in args.items():
                if key in self.SENSITIVE_KEYS and value:
                    # The key name alone marks the value as secret.
                    new_map[key] = self._MARKER
                    changed = True
                elif isinstance(value, str):
                    red_v = self._redact_text(value)
                    if red_v != value:
                        changed = True
                    new_map[key] = red_v
                else:
                    new_map[key] = value
            if changed:
                # Assign a copy so the caller's own dict is left untouched.
                record.args = new_map

        return changed

    def _redact_message(self, record: logging.LogRecord) -> bool:
        """Mask secrets in ``record.msg``; return True if it was changed.

        The message is left alone when it is not a string, or when it has
        args *and* contains ``%``/``{``: rewriting it could damage a format
        placeholder. Without args the stdlib applies no formatting, so the
        text is final and safe to scan even if it contains those characters.
        """
        msg = record.msg
        if not isinstance(msg, str):
            return False
        if record.args and ("%" in msg or "{" in msg):
            return False

        red_msg = self._redact_text(msg)
        if red_msg == msg:
            return False
        record.msg = red_msg
        return True

    def _redact_extras(self, record: logging.LogRecord) -> bool:
        """Replace truthy record attributes named in ``SENSITIVE_KEYS``.

        Returns True if at least one such attribute was present.
        """
        attrs = record.__dict__
        found = False
        for key in self.SENSITIVE_KEYS:
            if attrs.get(key):
                attrs[key] = self._MARKER
                found = True
        return found

    def filter(self, record: logging.LogRecord) -> bool:
        """Redact secrets on *record* in place and always return True.

        Redaction is best-effort: a logging filter must never raise (and
        cannot safely log its own failures), so each stage is guarded
        independently and a failing stage does not prevent the others.
        """
        redaction_detected = False
        for stage in (self._redact_args, self._redact_message, self._redact_extras):
            try:
                if stage(record):
                    redaction_detected = True
            except Exception:
                # Deliberate best-effort; see the docstring.
                continue

        # Ensure a visible redaction marker appears in the captured message
        if redaction_detected:
            try:
                if isinstance(record.msg, str) and self._MARKER not in record.msg:
                    # The marker contains no '%', so appending it cannot
                    # interfere with later %-formatting of the message.
                    record.msg = f"{record.msg} {self._MARKER}"
            except Exception:
                pass
        return True