Coverage for src/qdrant_loader_mcp_server/search/enhanced/cdi/conflict_scoring.py: 64%
86 statements
from __future__ import annotations

from typing import Any


def analyze_text_conflicts(
    detector: Any, doc1: Any, doc2: Any
) -> tuple[bool, str, float]:
    """spaCy-driven textual conflict heuristics (extracted)."""
    try:
        doc1_analysis = detector.spacy_analyzer.analyze_query_semantic(doc1.content)
        doc2_analysis = detector.spacy_analyzer.analyze_query_semantic(doc2.content)

        doc1_entities = {ent[0].lower() for ent in doc1_analysis.entities}
        doc2_entities = {ent[0].lower() for ent in doc2_analysis.entities}
        doc1_keywords = {kw.lower() for kw in doc1_analysis.semantic_keywords}
        doc2_keywords = {kw.lower() for kw in doc2_analysis.semantic_keywords}

        entity_overlap = len(doc1_entities & doc2_entities) / max(
            len(doc1_entities | doc2_entities), 1
        )
        _keyword_overlap = len(doc1_keywords & doc2_keywords) / max(
            len(doc1_keywords | doc2_keywords), 1
        )

        conflict_indicators = [
            "should not",
            "avoid",
            "deprecated",
            "recommended",
            "best practice",
            "anti-pattern",
            "wrong",
            "correct",
            "instead",
            "better",
            "worse",
        ]

        doc1_indicators = sum(
            1 for indicator in conflict_indicators if indicator in doc1.content.lower()
        )
        doc2_indicators = sum(
            1 for indicator in conflict_indicators if indicator in doc2.content.lower()
        )

        if entity_overlap > 0.3 and (doc1_indicators > 0 or doc2_indicators > 0):
            confidence = min(
                entity_overlap * (doc1_indicators + doc2_indicators) / 10, 1.0
            )
            explanation = f"Similar topics with conflicting recommendations (overlap: {entity_overlap:.2f})"
            return True, explanation, confidence

        return False, "No textual conflicts detected", 0.0
    except Exception as e:  # pragma: no cover
        detector.logger.error(f"Error in text conflict analysis: {e}")
        return False, f"Text analysis error: {str(e)}", 0.0
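
# Illustrative note (not part of the original source): two documents that share
# half of their named entities (entity_overlap = 0.5) and contain four of the
# indicator phrases between them pass the 0.3 overlap threshold and score
# min(0.5 * 4 / 10, 1.0) = 0.2, so confidence grows with both topical overlap
# and the amount of prescriptive wording.
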
def analyze_metadata_conflicts(
    detector: Any, doc1: Any, doc2: Any
) -> tuple[bool, str, float]:
    """Metadata-driven conflict heuristics (extracted)."""
    try:
        conflicts: list[tuple[str, float, str]] = []
        total_weight = 0.0

        doc1_date = getattr(doc1, "created_at", None)
        doc2_date = getattr(doc2, "created_at", None)
        if doc1_date and doc2_date:
            date_diff = abs((doc1_date - doc2_date).days)
            if date_diff > 365:
                conflicts.append(
                    ("date_conflict", 0.3, f"Documents created {date_diff} days apart")
                )
                total_weight += 0.3

        if doc1.source_type != doc2.source_type:
            source_conflicts = {("confluence", "git"): 0.2, ("jira", "confluence"): 0.1}
            conflict_key = tuple(sorted([doc1.source_type, doc2.source_type]))
            if conflict_key in source_conflicts:
                w = source_conflicts[conflict_key]
                conflicts.append(
                    (
                        "source_type_conflict",
                        w,
                        f"Different source types: {conflict_key}",
                    )
                )
                total_weight += w

        if (
            hasattr(doc1, "project_id")
            and hasattr(doc2, "project_id")
            and doc1.project_id != doc2.project_id
        ):
            conflicts.append(
                (
                    "project_conflict",
                    0.1,
                    f"Different projects: {doc1.project_id} vs {doc2.project_id}",
                )
            )
            total_weight += 0.1

        if conflicts and total_weight > 0.2:
            explanation = "; ".join([c[2] for c in conflicts])
            return True, explanation, min(total_weight, 1.0)

        return False, "No metadata conflicts detected", 0.0
    except Exception as e:  # pragma: no cover
        detector.logger.error(f"Error in metadata conflict analysis: {e}")
        return False, f"Metadata analysis error: {str(e)}", 0.0
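
# Illustrative note (not part of the original source): the weights above are
# additive. Two documents from the same source type, created more than a year
# apart and tagged with different project_ids, accumulate 0.3 + 0.1 = 0.4,
# which exceeds the 0.2 threshold, so the pair is reported as a metadata
# conflict with confidence 0.4.
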
def categorize_conflict(_detector: Any, patterns) -> str:
    """Map detected conflict patterns to a coarse category label."""
    if not patterns:
        return "unknown"
    for item in patterns:
        if isinstance(item, dict):
            pattern_text = item.get("type", "").lower()
        elif isinstance(item, tuple) and len(item) > 0:
            pattern_text = str(item[0]).lower()
        elif isinstance(item, str):
            pattern_text = item.lower()
        else:
            pattern_text = str(item).lower()

        if any(keyword in pattern_text for keyword in ["version", "deprecated"]):
            return "version"
        elif any(
            keyword in pattern_text
            for keyword in [
                "procedure",
                "process",
                "steps",
                "should",
                "must",
                "never",
                "always",
            ]
        ):
            return "procedural"
        elif any(
            keyword in pattern_text
            for keyword in [
                "data",
                "value",
                "number",
                "different values",
                "conflicting data",
            ]
        ):
            return "data"

    return "general"

def calculate_conflict_confidence(
    _detector: Any, patterns, doc1_score: float = 1.0, doc2_score: float = 1.0
) -> float:
    """Average per-pattern confidences and scale by the two documents' scores."""
    if not patterns:
        return 0.0
    confidences: list[float] = []
    for pattern in patterns:
        if isinstance(pattern, dict):
            confidences.append(pattern.get("confidence", 0.5))
        elif isinstance(pattern, tuple) and len(pattern) >= 2:
            try:
                confidences.append(float(pattern[1]))
            except (ValueError, IndexError):
                confidences.append(0.5)
        else:
            pattern_text = str(pattern).lower()
            if any(
                ind in pattern_text
                for ind in [
                    "conflict",
                    "incompatible",
                    "contradicts",
                    "different values",
                ]
            ):
                confidences.append(0.8)
            elif any(
                ind in pattern_text
                for ind in ["different approach", "alternative method"]
            ):
                confidences.append(0.6)
            elif any(ind in pattern_text for ind in ["unclear", "possibly different"]):
                confidences.append(0.3)
            else:
                confidences.append(0.5)
    pattern_strength = sum(confidences) / len(confidences) if confidences else 0.5
    doc_score_avg = (doc1_score + doc2_score) / 2
    return min(1.0, pattern_strength * doc_score_avg)
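

if __name__ == "__main__":
    # Minimal smoke-test sketch (illustrative, not part of the original module).
    # Both helpers below ignore their detector argument, so None is passed; the
    # sample patterns are hypothetical.
    sample_patterns = [
        ("version_conflict", 0.7),
        "API v1 is deprecated, use v2 instead",
    ]
    print(categorize_conflict(None, sample_patterns))  # -> "version"
    print(calculate_conflict_confidence(None, sample_patterns, 0.9, 0.8))  # -> ~0.51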