Coverage for src/qdrant_loader_mcp_server/search/enhanced/cdi/conflict_scoring.py: 64% (86 statements)

coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

from __future__ import annotations

from typing import Any


def analyze_text_conflicts(
    detector: Any, doc1: Any, doc2: Any
) -> tuple[bool, str, float]:
    """spaCy-driven textual conflict heuristics (extracted)."""
    try:
        doc1_analysis = detector.spacy_analyzer.analyze_query_semantic(doc1.content)
        doc2_analysis = detector.spacy_analyzer.analyze_query_semantic(doc2.content)

        doc1_entities = {ent[0].lower() for ent in doc1_analysis.entities}
        doc2_entities = {ent[0].lower() for ent in doc2_analysis.entities}
        doc1_keywords = {kw.lower() for kw in doc1_analysis.semantic_keywords}
        doc2_keywords = {kw.lower() for kw in doc2_analysis.semantic_keywords}

        # Jaccard overlap; max(..., 1) guards against empty-set division.
        entity_overlap = len(doc1_entities & doc2_entities) / max(
            len(doc1_entities | doc2_entities), 1
        )
        # Computed for parity but currently unused (hence the underscore).
        _keyword_overlap = len(doc1_keywords & doc2_keywords) / max(
            len(doc1_keywords | doc2_keywords), 1
        )

        conflict_indicators = [
            "should not",
            "avoid",
            "deprecated",
            "recommended",
            "best practice",
            "anti-pattern",
            "wrong",
            "correct",
            "instead",
            "better",
            "worse",
        ]

        doc1_indicators = sum(
            1 for indicator in conflict_indicators if indicator in doc1.content.lower()
        )
        doc2_indicators = sum(
            1 for indicator in conflict_indicators if indicator in doc2.content.lower()
        )

        # A conflict requires both shared topics (entity overlap) and
        # prescriptive language in at least one of the documents.
        if entity_overlap > 0.3 and (doc1_indicators > 0 or doc2_indicators > 0):
            confidence = min(
                entity_overlap * (doc1_indicators + doc2_indicators) / 10, 1.0
            )
            explanation = f"Similar topics with conflicting recommendations (overlap: {entity_overlap:.2f})"
            return True, explanation, confidence

        return False, "No textual conflicts detected", 0.0
    except Exception as e:  # pragma: no cover
        detector.logger.error(f"Error in text conflict analysis: {e}")
        return False, f"Text analysis error: {str(e)}", 0.0


def analyze_metadata_conflicts(
    detector: Any, doc1: Any, doc2: Any
) -> tuple[bool, str, float]:
    """Metadata-driven conflict heuristics (extracted)."""
    try:
        conflicts: list[tuple[str, float, str]] = []
        total_weight = 0.0

        doc1_date = getattr(doc1, "created_at", None)
        doc2_date = getattr(doc2, "created_at", None)
        if doc1_date and doc2_date:
            date_diff = abs((doc1_date - doc2_date).days)
            if date_diff > 365:
                conflicts.append(
                    ("date_conflict", 0.3, f"Documents created {date_diff} days apart")
                )
                total_weight += 0.3

        if doc1.source_type != doc2.source_type:
            # Keys are stored in sorted order so they match the sorted lookup key.
            source_conflicts = {
                ("confluence", "git"): 0.2,
                ("confluence", "jira"): 0.1,
            }
            conflict_key = tuple(sorted([doc1.source_type, doc2.source_type]))
            if conflict_key in source_conflicts:
                w = source_conflicts[conflict_key]
                conflicts.append(
                    (
                        "source_type_conflict",
                        w,
                        f"Different source types: {conflict_key}",
                    )
                )
                total_weight += w

        if (
            hasattr(doc1, "project_id")
            and hasattr(doc2, "project_id")
            and doc1.project_id != doc2.project_id
        ):
            conflicts.append(
                (
                    "project_conflict",
                    0.1,
                    f"Different projects: {doc1.project_id} vs {doc2.project_id}",
                )
            )
            total_weight += 0.1

        if conflicts and total_weight > 0.2:
            explanation = "; ".join([c[2] for c in conflicts])
            return True, explanation, min(total_weight, 1.0)

        return False, "No metadata conflicts detected", 0.0
    except Exception as e:  # pragma: no cover
        detector.logger.error(f"Error in metadata conflict analysis: {e}")
        return False, f"Metadata analysis error: {str(e)}", 0.0


def categorize_conflict(_detector: Any, patterns: Any) -> str:
    """Classify conflict patterns as version, procedural, data, or general."""
    if not patterns:
        return "unknown"
    for item in patterns:
        # Normalize each pattern to a lowercase string, whatever its shape.
        if isinstance(item, dict):
            pattern_text = item.get("type", "").lower()
        elif isinstance(item, tuple) and len(item) > 0:
            pattern_text = str(item[0]).lower()
        elif isinstance(item, str):
            pattern_text = item.lower()
        else:
            pattern_text = str(item).lower()

        if any(keyword in pattern_text for keyword in ["version", "deprecated"]):
            return "version"
        elif any(
            keyword in pattern_text
            for keyword in [
                "procedure",
                "process",
                "steps",
                "should",
                "must",
                "never",
                "always",
            ]
        ):
            return "procedural"
        elif any(
            keyword in pattern_text
            for keyword in [
                "data",
                "value",
                "number",
                "different values",
                "conflicting data",
            ]
        ):
            return "data"

    return "general"


def calculate_conflict_confidence(
    _detector: Any, patterns: Any, doc1_score: float = 1.0, doc2_score: float = 1.0
) -> float:
    """Blend per-pattern confidence estimates with the two documents' scores."""
    if not patterns:
        return 0.0
    confidences: list[float] = []
    for pattern in patterns:
        if isinstance(pattern, dict):
            confidences.append(pattern.get("confidence", 0.5))
        elif isinstance(pattern, tuple) and len(pattern) >= 2:
            try:
                confidences.append(float(pattern[1]))
            except (TypeError, ValueError):
                # Non-numeric second element; fall back to a neutral confidence.
                confidences.append(0.5)
        else:
            # Fall back to keyword heuristics on the pattern's string form.
            pattern_text = str(pattern).lower()
            if any(
                ind in pattern_text
                for ind in [
                    "conflict",
                    "incompatible",
                    "contradicts",
                    "different values",
                ]
            ):
                confidences.append(0.8)
            elif any(
                ind in pattern_text
                for ind in ["different approach", "alternative method"]
            ):
                confidences.append(0.6)
            elif any(ind in pattern_text for ind in ["unclear", "possibly different"]):
                confidences.append(0.3)
            else:
                confidences.append(0.5)
    pattern_strength = sum(confidences) / len(confidences) if confidences else 0.5
    doc_score_avg = (doc1_score + doc2_score) / 2
    return min(1.0, pattern_strength * doc_score_avg)
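
Below is a minimal usage sketch of the four helpers. The stub analyzer, logger, and document objects are hypothetical stand-ins for the real detector and search-result types wired up elsewhere in the cdi package; only the four functions above come from this module.

import logging
from datetime import datetime
from types import SimpleNamespace


class _StubAnalysis:
    """Hypothetical stand-in for the spaCy analyzer's result object."""

    def __init__(self, entities, semantic_keywords):
        self.entities = entities  # list of (text, label) tuples
        self.semantic_keywords = semantic_keywords


class _StubAnalyzer:
    """Hypothetical stand-in for detector.spacy_analyzer."""

    def analyze_query_semantic(self, text):
        # Crude heuristic: capitalized words as entities, all words as keywords.
        words = text.split()
        entities = [(w, "MISC") for w in words if w[:1].isupper()]
        return _StubAnalysis(entities, words)


detector = SimpleNamespace(
    spacy_analyzer=_StubAnalyzer(),
    logger=logging.getLogger("cdi-demo"),
)
doc1 = SimpleNamespace(
    content="Redis caching is the recommended best practice here.",
    source_type="confluence",
    project_id="proj-a",
    created_at=datetime(2023, 1, 1),
)
doc2 = SimpleNamespace(
    content="Avoid Redis caching; it is deprecated in this stack.",
    source_type="git",
    project_id="proj-b",
    created_at=datetime(2024, 6, 1),
)

print(analyze_text_conflicts(detector, doc1, doc2))
# -> (True, "Similar topics with conflicting recommendations (overlap: 0.50)", 0.2)
print(analyze_metadata_conflicts(detector, doc1, doc2))
# -> (True, "Documents created 517 days apart; Different source types: ...", 0.6)

patterns = [{"type": "deprecated API", "confidence": 0.7}, ("different values", 0.9)]
print(categorize_conflict(None, patterns))  # -> "version"
print(calculate_conflict_confidence(None, patterns, 0.9, 0.8))  # ~0.68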