Coverage for src/qdrant_loader_mcp_server/search/enhanced/cdi/detectors.py: 68%
254 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1"""
2Conflict Detection for Cross-Document Intelligence.
4This module implements advanced conflict detection between documents using
5vector similarity analysis and optional LLM validation for comprehensive
6conflict identification and analysis.
7"""
9from __future__ import annotations
11import time
12from collections import defaultdict
13from datetime import datetime
14from typing import TYPE_CHECKING, Any
16# Soft-import async clients to avoid hard dependency at import time
17if TYPE_CHECKING:
18 from qdrant_client import AsyncQdrantClient
19else:
20 AsyncQdrantClient = None # type: ignore[assignment]
22from ....utils.logging import LoggingConfig
23from ...models import SearchResult
24from ...nlp.spacy_analyzer import SpaCyQueryAnalyzer
25from .conflict_pairing import (
26 calculate_vector_similarity as _calculate_vector_similarity_ext,
27)
28from .conflict_pairing import (
29 filter_by_vector_similarity as _filter_by_vector_similarity_ext,
30)
31from .conflict_pairing import get_document_embeddings as _get_document_embeddings_ext
32from .conflict_resolution import describe_conflict as _describe_conflict_ext
33from .conflict_resolution import extract_context_snippet as _extract_context_snippet_ext
34from .conflict_resolution import (
35 generate_resolution_suggestions as _generate_resolution_suggestions_ext,
36)
37from .conflict_scoring import (
38 analyze_metadata_conflicts as _analyze_metadata_conflicts_ext,
39)
40from .conflict_scoring import analyze_text_conflicts as _analyze_text_conflicts_ext
41from .conflict_scoring import calculate_conflict_confidence as _calculate_confidence_ext
42from .conflict_scoring import categorize_conflict as _categorize_conflict_ext
43from .legacy_adapters import LegacyConflictDetectorAdapter
44from .llm_validation import llm_analyze_conflicts as _llm_analyze_conflicts_ext
45from .llm_validation import (
46 validate_conflict_with_llm as _validate_conflict_with_llm_ext,
47)
48from .models import ConflictAnalysis
50logger = LoggingConfig.get_logger(__name__)
53class ConflictDetector:
54 """Enhanced conflict detector using vector similarity and LLM validation."""
56 def __init__(
57 self,
58 spacy_analyzer: SpaCyQueryAnalyzer,
59 qdrant_client: AsyncQdrantClient | None = None,
60 openai_client: Any | None = None,
61 collection_name: str = "documents",
62 preferred_vector_name: str | None = "dense",
63 ):
64 """Initialize the enhanced conflict detector.
66 Args:
67 spacy_analyzer: SpaCy analyzer for text processing
68 qdrant_client: Qdrant client for vector operations
69 openai_client: OpenAI client for LLM analysis
70 collection_name: Qdrant collection name
71 """
72 self.spacy_analyzer = spacy_analyzer
73 self.qdrant_client = qdrant_client
74 self.openai_client = openai_client
75 self.collection_name = collection_name
76 self.logger = LoggingConfig.get_logger(__name__)
77 self.preferred_vector_name = preferred_vector_name
79 # Vector similarity thresholds
80 self.MIN_VECTOR_SIMILARITY = (
81 0.6 # Minimum similarity to consider for conflict analysis
82 )
83 self.MAX_VECTOR_SIMILARITY = (
84 0.95 # Maximum similarity - too similar suggests same content
85 )
87 # LLM validation settings
88 self.llm_enabled = qdrant_client is not None and openai_client is not None
90 # Link back to engine for provider access if set upstream
91 self.engine: Any | None = None
93 async def _get_document_embeddings(
94 self, document_ids: list[str]
95 ) -> dict[str, list[float]]:
96 return await _get_document_embeddings_ext(self, document_ids)
98 def _calculate_vector_similarity(
99 self, embedding1: list[float], embedding2: list[float]
100 ) -> float:
101 return _calculate_vector_similarity_ext(self, embedding1, embedding2)
103 async def _validate_conflict_with_llm(
104 self, doc1: SearchResult, doc2: SearchResult, similarity_score: float
105 ) -> tuple[bool, str, float]:
106 return await _validate_conflict_with_llm_ext(self, doc1, doc2, similarity_score)
108 def _analyze_text_conflicts(
109 self, doc1: SearchResult, doc2: SearchResult
110 ) -> tuple[bool, str, float]:
111 return _analyze_text_conflicts_ext(self, doc1, doc2)
113 def _analyze_metadata_conflicts(
114 self, doc1: SearchResult, doc2: SearchResult
115 ) -> tuple[bool, str, float]:
116 return _analyze_metadata_conflicts_ext(self, doc1, doc2)
118 async def detect_conflicts(self, documents: list[SearchResult]) -> ConflictAnalysis:
119 """Detect conflicts between documents using multiple analysis methods."""
120 start_time = time.time()
121 conflicts: list[ConflictAnalysis] = []
123 if len(documents) < 2:
124 self.logger.debug("Need at least 2 documents for conflict detection")
125 return ConflictAnalysis()
127 try:
128 # Precompute embeddings once
129 document_ids = [
130 getattr(doc, "document_id", f"{doc.source_type}:{doc.source_title}")
131 for doc in documents
132 ]
133 embeddings = await self._get_document_embeddings(document_ids)
135 def analyze_pair(
136 doc1: SearchResult, doc2: SearchResult, doc1_id: str, doc2_id: str
137 ) -> ConflictAnalysis | None:
138 vector_similarity = 0.0
139 if doc1_id in embeddings and doc2_id in embeddings:
140 vector_similarity = self._calculate_vector_similarity(
141 embeddings[doc1_id], embeddings[doc2_id]
142 )
143 if (
144 vector_similarity > self.MAX_VECTOR_SIMILARITY
145 or vector_similarity < self.MIN_VECTOR_SIMILARITY
146 ):
147 return None
149 text_conflict, text_explanation, text_confidence = (
150 self._analyze_text_conflicts(doc1, doc2)
151 )
152 metadata_conflict, metadata_explanation, metadata_confidence = (
153 self._analyze_metadata_conflicts(doc1, doc2)
154 )
156 llm_conflict = False
157 llm_explanation = ""
158 llm_confidence = 0.0
159 if self.llm_enabled and (
160 text_conflict or metadata_conflict or vector_similarity > 0.7
161 ):
162 # Inlined await is not possible in nested def; handled outside
163 pass
165 if not (text_conflict or metadata_conflict or llm_conflict):
166 return None
168 combined_confidence = max(
169 text_confidence, metadata_confidence, llm_confidence
170 )
171 return ConflictAnalysis(
172 document1_title=doc1.source_title,
173 document1_source=doc1.source_type,
174 document2_title=doc2.source_title,
175 document2_source=doc2.source_type,
176 conflict_type="content_conflict",
177 confidence_score=combined_confidence,
178 vector_similarity=vector_similarity,
179 analysis_method="multi_method",
180 explanation=f"Text: {text_explanation}; Metadata: {metadata_explanation}; LLM: {llm_explanation}",
181 detected_at=datetime.now(),
182 )
184 for i, doc1 in enumerate(documents):
185 for j, doc2 in enumerate(documents[i + 1 :], i + 1):
186 doc1_id = document_ids[i]
187 doc2_id = document_ids[j]
189 # Compute baseline analysis
190 baseline = analyze_pair(doc1, doc2, doc1_id, doc2_id)
192 # If LLM is needed, compute with LLM and merge
193 if baseline is None:
194 # Check if LLM should run even when baseline None (only by similarity gate)
195 vector_similarity = 0.0
196 if doc1_id in embeddings and doc2_id in embeddings:
197 vector_similarity = self._calculate_vector_similarity(
198 embeddings[doc1_id], embeddings[doc2_id]
199 )
200 if self.llm_enabled and vector_similarity > 0.7:
201 llm_conflict, llm_explanation, llm_confidence = (
202 await self._validate_conflict_with_llm(
203 doc1, doc2, vector_similarity
204 )
205 )
206 if llm_conflict:
207 conflicts.append(
208 ConflictAnalysis(
209 document1_title=doc1.source_title,
210 document1_source=doc1.source_type,
211 document2_title=doc2.source_title,
212 document2_source=doc2.source_type,
213 conflict_type="content_conflict",
214 confidence_score=llm_confidence,
215 vector_similarity=vector_similarity,
216 analysis_method="multi_method",
217 explanation=f"LLM: {llm_explanation}",
218 detected_at=datetime.now(),
219 )
220 )
221 continue
223 # If baseline exists and LLM applies, enrich with LLM
224 if self.llm_enabled:
225 vector_similarity = baseline.vector_similarity or 0.0
226 if (
227 vector_similarity <= 0.0
228 and doc1_id in embeddings
229 and doc2_id in embeddings
230 ):
231 vector_similarity = self._calculate_vector_similarity(
232 embeddings[doc1_id], embeddings[doc2_id]
233 )
234 if vector_similarity > 0.7:
235 llm_conflict, llm_explanation, llm_confidence = (
236 await self._validate_conflict_with_llm(
237 doc1, doc2, vector_similarity
238 )
239 )
240 if llm_conflict:
241 baseline.confidence_score = max(
242 baseline.confidence_score, llm_confidence
243 )
244 baseline.explanation = (
245 baseline.explanation + f"; LLM: {llm_explanation}"
246 )
248 conflicts.append(baseline)
250 processing_time = (time.time() - start_time) * 1000
251 self.logger.info(
252 f"Conflict detection completed in {processing_time:.2f}ms",
253 document_count=len(documents),
254 conflicts_detected=len(conflicts),
255 vector_analysis=len(embeddings) > 0,
256 llm_analysis=self.llm_enabled,
257 )
259 # Build merged ConflictAnalysis from collected results
260 merged_conflicting_pairs: list[tuple[str, str, dict[str, Any]]] = []
261 merged_conflict_categories: defaultdict[str, list[tuple[str, str]]] = (
262 defaultdict(list)
263 )
264 merged_resolution_suggestions: dict[str, str] = {}
266 for conflict in conflicts:
267 # Preferred: ConflictAnalysis objects
268 if isinstance(conflict, ConflictAnalysis):
269 if getattr(conflict, "conflicting_pairs", None):
270 merged_conflicting_pairs.extend(conflict.conflicting_pairs)
271 if getattr(conflict, "conflict_categories", None):
272 for category, pairs in conflict.conflict_categories.items():
273 merged_conflict_categories[category].extend(pairs)
274 if getattr(conflict, "resolution_suggestions", None):
275 merged_resolution_suggestions.update(
276 conflict.resolution_suggestions
277 )
278 continue
280 # Backward-compat: tuples like (doc1_id, doc2_id, conflict_info)
281 try:
282 if isinstance(conflict, tuple) and len(conflict) >= 3:
283 doc1_id, doc2_id, conflict_info = (
284 conflict[0],
285 conflict[1],
286 conflict[2],
287 )
288 info_dict = (
289 conflict_info
290 if isinstance(conflict_info, dict)
291 else {"info": conflict_info}
292 )
293 merged_conflicting_pairs.append(
294 (str(doc1_id), str(doc2_id), info_dict)
295 )
296 conflict_type = info_dict.get("type", "unknown")
297 merged_conflict_categories[conflict_type].append(
298 (str(doc1_id), str(doc2_id))
299 )
300 except Exception:
301 # Ignore malformed entries
302 pass
304 return ConflictAnalysis(
305 conflicting_pairs=merged_conflicting_pairs,
306 conflict_categories=dict(merged_conflict_categories),
307 resolution_suggestions=merged_resolution_suggestions,
308 )
310 except Exception as e:
311 processing_time = (time.time() - start_time) * 1000
312 self.logger.error(
313 f"Error in conflict detection after {processing_time:.2f}ms: {e}"
314 )
315 return ConflictAnalysis()
317 # Compatibility methods for tests - delegate via legacy adapter
318 def _find_contradiction_patterns(self, doc1, doc2):
319 try:
320 adapter = getattr(self, "_legacy_adapter", None)
321 if adapter is None:
322 adapter = LegacyConflictDetectorAdapter(self)
323 self._legacy_adapter = adapter
324 return adapter._find_contradiction_patterns(doc1, doc2)
325 except Exception as e:
326 self.logger.warning(
327 f"Error delegating to legacy adapter in _find_contradiction_patterns: {e}"
328 )
329 return []
331 def _detect_version_conflicts(self, doc1, doc2):
332 try:
333 adapter = getattr(self, "_legacy_adapter", None)
334 if adapter is None:
335 adapter = LegacyConflictDetectorAdapter(self)
336 self._legacy_adapter = adapter
337 return adapter._detect_version_conflicts(doc1, doc2)
338 except Exception as e:
339 self.logger.warning(
340 f"Error delegating to legacy adapter in _detect_version_conflicts: {e}"
341 )
342 return []
344 def _detect_procedural_conflicts(self, doc1, doc2):
345 try:
346 adapter = getattr(self, "_legacy_adapter", None)
347 if adapter is None:
348 adapter = LegacyConflictDetectorAdapter(self)
349 self._legacy_adapter = adapter
350 return adapter._detect_procedural_conflicts(doc1, doc2)
351 except Exception as e:
352 self.logger.warning(
353 f"Error delegating to legacy adapter in _detect_procedural_conflicts: {e}"
354 )
355 return []
357 def _extract_context_snippet(
358 self, text, keyword, context_length=100, max_length=None
359 ):
360 return _extract_context_snippet_ext(
361 self, text, keyword, context_length, max_length
362 )
364 def _categorize_conflict(self, patterns):
365 return _categorize_conflict_ext(self, patterns)
367 def _calculate_conflict_confidence(self, patterns, doc1_score=1.0, doc2_score=1.0):
368 return _calculate_confidence_ext(self, patterns, doc1_score, doc2_score)
370 # --- Public stats accessor to avoid leaking private attributes ---
371 def get_stats(self) -> dict:
372 """Return detector runtime statistics as a dictionary.
374 Public, stable accessor that always returns a dictionary and never raises.
375 """
376 try:
377 stats = getattr(self, "_last_stats", None)
378 return stats if isinstance(stats, dict) else {}
379 except Exception:
380 return {}
382 def get_last_stats(self) -> dict:
383 """Return the last computed runtime statistics as a dictionary.
385 Exposes detector runtime metrics via a stable public API. Falls back to an
386 empty dict if stats are not available. Compatible with internal implementations
387 that may store stats on a private attribute.
388 """
389 try:
390 stats = getattr(self, "_last_stats", None)
391 return stats if isinstance(stats, dict) else {}
392 except Exception:
393 return {}
395 # Additional compatibility methods for tests
396 def _have_content_overlap(self, doc1: SearchResult, doc2: SearchResult) -> bool:
397 """Check if two documents have content overlap (compatibility method)."""
398 # Simple content overlap check using token intersection
399 tokens1 = set(doc1.text.lower().split())
400 tokens2 = set(doc2.text.lower().split())
402 # Calculate overlap ratio based on smaller set (Jaccard similarity variant)
403 intersection = tokens1 & tokens2
404 min_tokens = min(len(tokens1), len(tokens2))
406 if min_tokens == 0:
407 return False
409 # Use intersection over minimum set size for better sensitivity
410 overlap_ratio = len(intersection) / min_tokens
411 return overlap_ratio > 0.2 # 20% overlap threshold (more sensitive)
413 def _have_semantic_similarity(self, doc1: SearchResult, doc2: SearchResult) -> bool:
414 """Check if two documents have semantic similarity (compatibility method)."""
415 try:
416 # Get tokens for analysis
417 tokens1 = set(doc1.text.lower().split())
418 tokens2 = set(doc2.text.lower().split())
420 # EXPLICIT checks for very different topics FIRST
421 food_words = {
422 "coffee",
423 "brewing",
424 "recipe",
425 "cooking",
426 "food",
427 "drink",
428 "beverage",
429 "taste",
430 "techniques",
431 }
432 tech_words = {
433 "authentication",
434 "security",
435 "login",
436 "access",
437 "user",
438 "secure",
439 "auth",
440 "password",
441 }
443 doc1_is_food = bool(tokens1 & food_words)
444 doc1_is_tech = bool(tokens1 & tech_words)
445 doc2_is_food = bool(tokens2 & food_words)
446 doc2_is_tech = bool(tokens2 & tech_words)
448 # If one is clearly food-related and the other is tech-related, NOT similar
449 if (doc1_is_food and doc2_is_tech) or (doc1_is_tech and doc2_is_food):
450 return False
452 # Try spaCy if available for similar topics
453 if self.spacy_analyzer and hasattr(self.spacy_analyzer, "nlp"):
454 doc1_processed = self.spacy_analyzer.nlp(
455 doc1.text[:500]
456 ) # Limit text length
457 doc2_processed = self.spacy_analyzer.nlp(doc2.text[:500])
459 similarity = doc1_processed.similarity(doc2_processed)
460 if similarity > 0.5: # Lower threshold for better sensitivity
461 return True
463 # Look for semantic concept overlap (common important words)
464 semantic_keywords = {
465 "authentication",
466 "login",
467 "security",
468 "access",
469 "user",
470 "secure",
471 "auth",
472 "method",
473 }
474 concept1 = tokens1 & semantic_keywords
475 concept2 = tokens2 & semantic_keywords
477 # If both docs have semantic concepts and they overlap significantly
478 if concept1 and concept2:
479 concept_overlap = len(concept1 & concept2) / max(
480 len(concept1), len(concept2)
481 )
482 if concept_overlap > 0.5: # 50% concept overlap
483 return True
485 # Final fallback: use content overlap with strict threshold
486 tokens_intersection = tokens1 & tokens2
487 min_tokens = min(len(tokens1), len(tokens2))
488 if min_tokens > 0:
489 overlap_ratio = len(tokens_intersection) / min_tokens
490 return overlap_ratio > 0.5 # Very high threshold
492 return False
494 except Exception:
495 # Ultimate fallback - be conservative
496 return False
498 def _describe_conflict(self, indicators: list) -> str:
499 return _describe_conflict_ext(self, indicators)
501 def _generate_resolution_suggestions(self, conflicts) -> dict[str, str]:
502 return _generate_resolution_suggestions_ext(self, conflicts)
504 async def _llm_analyze_conflicts(
505 self, doc1: SearchResult, doc2: SearchResult, similarity_score: float
506 ) -> dict | None:
507 return await _llm_analyze_conflicts_ext(self, doc1, doc2, similarity_score)
509 async def _get_tiered_analysis_pairs(
510 self, documents: list[SearchResult]
511 ) -> list[tuple]:
512 """Generate tiered analysis pairs for conflict detection (compatibility method)."""
513 pairs = []
515 if len(documents) < 2:
516 return pairs
518 # Generate pairs with different priority tiers
519 for i, doc1 in enumerate(documents):
520 for _j, doc2 in enumerate(documents[i + 1 :], i + 1):
521 # Calculate a simple priority score based on document attributes
522 score = 1.0 # Base score
524 # Adjust score based on document similarity and importance
525 if hasattr(doc1, "score") and hasattr(doc2, "score"):
526 avg_doc_score = (doc1.score + doc2.score) / 2
527 score = min(1.0, avg_doc_score)
529 # Determine tier based on score and document characteristics
530 if score >= 0.8:
531 tier = "primary"
532 elif score >= 0.5:
533 tier = "secondary"
534 else:
535 tier = "tertiary"
537 pairs.append((doc1, doc2, tier, score))
539 # Sort by score (highest first)
540 pairs.sort(key=lambda x: x[3], reverse=True)
541 return pairs
543 async def _filter_by_vector_similarity(
544 self, documents: list[SearchResult]
545 ) -> list[tuple]:
546 return await _filter_by_vector_similarity_ext(self, documents)
548 def _should_analyze_for_conflicts(
549 self, doc1: SearchResult, doc2: SearchResult
550 ) -> bool:
551 """Determine if two documents should be analyzed for conflicts (compatibility method)."""
552 # Basic checks for document validity
553 if not doc1 or not doc2:
554 return False
556 # Check text length - skip very short texts or None text
557 text1 = doc1.text if doc1.text else ""
558 text2 = doc2.text if doc2.text else ""
560 if len(text1.strip()) < 10 or len(text2.strip()) < 10:
561 return False
563 # Skip identical documents (same ID)
564 if hasattr(doc1, "document_id") and hasattr(doc2, "document_id"):
565 if doc1.document_id == doc2.document_id:
566 return False
568 # Skip if documents are exactly the same text
569 if text1.strip() == text2.strip():
570 return False
572 return True