Coverage for src/qdrant_loader_mcp_server/search/enhanced/cdi/engine.py: 55%
201 statements
coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1"""
2Cross-Document Intelligence Engine.
4This module implements the main orchestration engine that coordinates all
5cross-document intelligence analysis including similarity calculation,
6clustering, citation networks, complementary content discovery, and conflict detection.
7"""
from __future__ import annotations

import time
from typing import TYPE_CHECKING, Any

# Soft-import async clients to avoid hard dependency at import time
if TYPE_CHECKING:
    pass

from ....utils.logging import LoggingConfig
from ...models import SearchResult
from ...nlp.spacy_analyzer import SpaCyQueryAnalyzer
from .analyzers import DocumentClusterAnalyzer
from .calculators import DocumentSimilarityCalculator
from .citations import CitationNetworkAnalyzer
from .detectors import ConflictDetector
from .extractors.clustering import DefaultClusterer
from .extractors.graph import DefaultGraphBuilder
from .extractors.similarity import DefaultSimilarityComputer
from .finders import ComplementaryContentFinder
from .models import ClusteringStrategy, RelationshipType
from .pipeline import CrossDocumentPipeline
from .rankers.default import DefaultRanker

logger = LoggingConfig.get_logger(__name__)
class CrossDocumentIntelligenceEngine:
    """Main engine that orchestrates cross-document intelligence analysis."""

    def __init__(
        self,
        spacy_analyzer: SpaCyQueryAnalyzer,
        knowledge_graph=None,
        qdrant_client=None,
        openai_client=None,
        collection_name: str = "documents",
        conflict_settings: dict | None = None,
    ):
        """Initialize the cross-document intelligence engine."""
        self.spacy_analyzer = spacy_analyzer
        self.knowledge_graph = knowledge_graph
        self.qdrant_client = qdrant_client
        self.openai_client = openai_client
        self.collection_name = collection_name
        self.logger = LoggingConfig.get_logger(__name__)

        # Initialize pipeline-based composition (behavior preserved via adapters)
        self._pipeline = CrossDocumentPipeline(
            similarity_computer=DefaultSimilarityComputer(spacy_analyzer),
            clusterer=DefaultClusterer(
                similarity_calculator=DocumentSimilarityCalculator(spacy_analyzer)
            ),
            graph_builder=DefaultGraphBuilder(),
            ranker=DefaultRanker(),
        )

        # Keep legacy attributes for backward compatibility across the codebase
        self.similarity_calculator = DocumentSimilarityCalculator(spacy_analyzer)
        self.cluster_analyzer = DocumentClusterAnalyzer(self.similarity_calculator)
        self.citation_analyzer = CitationNetworkAnalyzer()
        self.complementary_finder = ComplementaryContentFinder(
            self.similarity_calculator, knowledge_graph
        )
        self.conflict_detector = ConflictDetector(
            spacy_analyzer, qdrant_client, openai_client, collection_name
        )
        # Optional: LLM provider from core (set by builder)
        self.llm_provider = None
        if conflict_settings is not None:
            validated = self._validate_and_normalize_conflict_settings(
                conflict_settings
            )
            if validated is not None:
                # Apply configuration knobs where supported (respect OpenAI client availability)
                self.conflict_detector.llm_enabled = bool(
                    validated.get("conflict_use_llm", True)
                ) and (openai_client is not None)
                # Store normalized runtime settings for detector use
                self.conflict_detector._settings = validated
            else:
                # Validation already logged a clear warning; keep detector defaults
                pass
    def _validate_and_normalize_conflict_settings(
        self, settings: object
    ) -> dict[str, Any] | None:
        """Validate and normalize conflict detection settings.

        Returns a sanitized settings dict or None when invalid. On any validation
        problem, logs a clear warning and falls back to detector defaults.
        """
        from collections.abc import Mapping

        if not isinstance(settings, Mapping):
            self.logger.warning(
                f"Invalid conflict_settings: expected mapping, got {type(settings).__name__}; using defaults"
            )
            return None

        errors: list[str] = []

        def coerce_bool(value: object, default: bool, key: str) -> bool:
            try:
                if isinstance(value, bool):
                    return value
                if isinstance(value, int | float):
                    return value != 0
                if isinstance(value, str):
                    v = value.strip().lower()
                    if v in {"true", "1", "yes", "y", "on"}:
                        return True
                    if v in {"false", "0", "no", "n", "off"}:
                        return False
                raise ValueError("unrecognized boolean value")
            except Exception as e:
                errors.append(f"{key}: {e}")
                return default
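        # Behavior sketch (derived from the helper above): coerce_bool("on", False, "k")
        # returns True, while coerce_bool("maybe", True, "k") records
        # "k: unrecognized boolean value" in `errors` and returns the default True.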
        def coerce_int_non_negative(value: object, default: int, key: str) -> int:
            try:
                if isinstance(value, bool):
                    # Avoid treating booleans as ints
                    raise ValueError("boolean not allowed for integer field")
                if isinstance(value, int | float):
                    v = int(value)
                elif isinstance(value, str):
                    v = int(value.strip())
                else:
                    raise ValueError("unsupported type")
                return v if v >= 0 else default
            except Exception as e:
                errors.append(f"{key}: {e}")
                return default

        def coerce_float_positive(value: object, default: float, key: str) -> float:
            try:
                if isinstance(value, bool):
                    raise ValueError("boolean not allowed for float field")
                if isinstance(value, int | float):
                    v = float(value)
                elif isinstance(value, str):
                    v = float(value.strip())
                else:
                    raise ValueError("unsupported type")
                return v if v > 0 else default
            except Exception as e:
                errors.append(f"{key}: {e}")
                return default

        # Safe defaults align with ConflictDetector fallbacks
        defaults: dict[str, Any] = {
            "conflict_use_llm": True,
            "conflict_max_llm_pairs": 2,
            "conflict_llm_model": "gpt-4o-mini",
            "conflict_llm_timeout_s": 12.0,
            "conflict_overall_timeout_s": 9.0,
            "conflict_text_window_chars": 2000,
            "conflict_max_pairs_total": 24,
            "conflict_embeddings_timeout_s": 5.0,
            "conflict_embeddings_max_concurrency": 5,
            # Optional/unused in detector but supported upstream
            "conflict_limit_default": 10,
            "conflict_tier_caps": {
                "primary": 50,
                "secondary": 30,
                "tertiary": 20,
                "fallback": 10,
            },
        }
        # Start with defaults and override with sanitized values
        normalized: dict[str, Any] = dict(defaults)

        # Booleans
        normalized["conflict_use_llm"] = coerce_bool(
            settings.get("conflict_use_llm", defaults["conflict_use_llm"]),
            defaults["conflict_use_llm"],
            "conflict_use_llm",
        )

        # Integers (non-negative)
        normalized["conflict_max_llm_pairs"] = coerce_int_non_negative(
            settings.get("conflict_max_llm_pairs", defaults["conflict_max_llm_pairs"]),
            defaults["conflict_max_llm_pairs"],
            "conflict_max_llm_pairs",
        )
        normalized["conflict_max_pairs_total"] = coerce_int_non_negative(
            settings.get(
                "conflict_max_pairs_total", defaults["conflict_max_pairs_total"]
            ),
            defaults["conflict_max_pairs_total"],
            "conflict_max_pairs_total",
        )
        normalized["conflict_text_window_chars"] = coerce_int_non_negative(
            settings.get(
                "conflict_text_window_chars", defaults["conflict_text_window_chars"]
            ),
            defaults["conflict_text_window_chars"],
            "conflict_text_window_chars",
        )
        normalized["conflict_embeddings_max_concurrency"] = coerce_int_non_negative(
            settings.get(
                "conflict_embeddings_max_concurrency",
                defaults["conflict_embeddings_max_concurrency"],
            ),
            defaults["conflict_embeddings_max_concurrency"],
            "conflict_embeddings_max_concurrency",
        )
        normalized["conflict_limit_default"] = coerce_int_non_negative(
            settings.get("conflict_limit_default", defaults["conflict_limit_default"]),
            defaults["conflict_limit_default"],
            "conflict_limit_default",
        )

        # Floats (positive)
        normalized["conflict_llm_timeout_s"] = coerce_float_positive(
            settings.get("conflict_llm_timeout_s", defaults["conflict_llm_timeout_s"]),
            defaults["conflict_llm_timeout_s"],
            "conflict_llm_timeout_s",
        )
        normalized["conflict_overall_timeout_s"] = coerce_float_positive(
            settings.get(
                "conflict_overall_timeout_s", defaults["conflict_overall_timeout_s"]
            ),
            defaults["conflict_overall_timeout_s"],
            "conflict_overall_timeout_s",
        )
        normalized["conflict_embeddings_timeout_s"] = coerce_float_positive(
            settings.get(
                "conflict_embeddings_timeout_s",
                defaults["conflict_embeddings_timeout_s"],
            ),
            defaults["conflict_embeddings_timeout_s"],
            "conflict_embeddings_timeout_s",
        )

        # Strings
        llm_model = settings.get("conflict_llm_model", defaults["conflict_llm_model"])
        if isinstance(llm_model, str) and llm_model.strip():
            normalized["conflict_llm_model"] = llm_model.strip()
        else:
            if "conflict_llm_model" in settings:
                errors.append("conflict_llm_model: expected non-empty string")
            normalized["conflict_llm_model"] = defaults["conflict_llm_model"]

        # Nested mapping: conflict_tier_caps
        tier_caps_default = defaults["conflict_tier_caps"]
        tier_caps_value = settings.get("conflict_tier_caps", tier_caps_default)
        if isinstance(tier_caps_value, Mapping):
            tier_caps_normalized: dict[str, int] = dict(tier_caps_default)
            for k in ("primary", "secondary", "tertiary", "fallback"):
                tier_caps_normalized[k] = coerce_int_non_negative(
                    tier_caps_value.get(k, tier_caps_default[k]),
                    tier_caps_default[k],
                    f"conflict_tier_caps.{k}",
                )
            normalized["conflict_tier_caps"] = tier_caps_normalized
        else:
            if "conflict_tier_caps" in settings:
                errors.append("conflict_tier_caps: expected mapping with integer caps")
            normalized["conflict_tier_caps"] = dict(tier_caps_default)

        if errors:
            self.logger.warning(
                "Invalid values in conflict_settings; using defaults for invalid fields: "
                + "; ".join(errors)
            )

        return normalized
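    # Normalization example (illustrative input): {"conflict_use_llm": "off",
    # "conflict_max_llm_pairs": "3", "conflict_llm_timeout_s": -1} yields
    # conflict_use_llm=False and conflict_max_llm_pairs=3, while the negative
    # timeout silently falls back to the 12.0 default (only type/parse failures
    # are reported in the aggregated warning).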
    def analyze_document_relationships(
        self, documents: list[SearchResult]
    ) -> dict[str, Any]:
        """Lightweight relationship analysis focusing on essential relationships."""
        start_time = time.time()

        self.logger.info(
            f"Starting lightweight cross-document analysis for {len(documents)} documents"
        )

        # Skip heavy analysis components for performance:
        # - Skip similarity matrix computation (O(n²) operation)
        # - Skip citation network analysis
        # - Skip complementary content analysis
        # - Skip conflict detection

        # Keep only: document clustering for essential relationships
        clusters = self.cluster_analyzer.create_clusters(
            documents,
            strategy=ClusteringStrategy.MIXED_FEATURES,
            max_clusters=5,  # Reduced from 10 to 5 for faster processing
            min_cluster_size=2,
        )

        processing_time = (time.time() - start_time) * 1000

        # Build lightweight response focused on cluster relationships.
        # Documents must be included in clusters for relationship extraction.
        cluster_data = []
        doc_id_to_doc = {}

        # Create document lookup for mapping cluster document IDs to actual documents
        for doc in documents:
            doc_id = f"{doc.source_type}:{doc.source_title}"
            doc_id_to_doc[doc_id] = doc

        for cluster in clusters:
            cluster_summary = cluster.get_cluster_summary()

            # Add actual document objects for relationship extraction
            cluster_documents = []
            for doc_id in cluster.documents:
                if doc_id in doc_id_to_doc:
                    doc = doc_id_to_doc[doc_id]
                    cluster_documents.append(
                        {
                            "document_id": doc.document_id,
                            "title": doc.source_title,
                            "source_title": doc.source_title,
                            "source_type": doc.source_type,
                        }
                    )

            cluster_summary["documents"] = cluster_documents
            cluster_data.append(cluster_summary)

        analysis_results = {
            "summary": {
                "total_documents": len(documents),
                "processing_time_ms": processing_time,
                "clusters_found": len(clusters),
                "analysis_mode": "lightweight",
            },
            "document_clusters": cluster_data,
            "relationships_count": sum(
                len(cluster.documents) * (len(cluster.documents) - 1) // 2
                for cluster in clusters
                if len(cluster.documents) > 1
            ),
        }

        self.logger.info(
            f"Lightweight cross-document analysis completed in {processing_time:.2f}ms"
        )

        return analysis_results
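    # Result shape (illustrative): {"summary": {..., "analysis_mode": "lightweight"},
    # "document_clusters": [...], "relationships_count": ...}. Each cluster of n
    # documents contributes n * (n - 1) // 2 pairwise relationships to the count,
    # e.g. a 3-document cluster contributes 3.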
    def find_document_relationships(
        self,
        target_doc_id: str,
        documents: list[SearchResult],
        relationship_types: list[RelationshipType] | None = None,
    ) -> dict[str, list[dict[str, Any]]]:
        """Find specific relationships for a target document."""
        if relationship_types is None:
            relationship_types = [
                RelationshipType.SEMANTIC_SIMILARITY,
                RelationshipType.COMPLEMENTARY,
                RelationshipType.HIERARCHICAL,
            ]

        # Find target document
        target_doc = None
        for doc in documents:
            if f"{doc.source_type}:{doc.source_title}" == target_doc_id:
                target_doc = doc
                break

        if not target_doc:
            return {"error": "Target document not found"}

        relationships = {rel_type.value: [] for rel_type in relationship_types}

        for rel_type in relationship_types:
            if rel_type == RelationshipType.SEMANTIC_SIMILARITY:
                # Find similar documents
                for doc in documents:
                    if doc != target_doc:
                        similarity = self.similarity_calculator.calculate_similarity(
                            target_doc, doc
                        )
                        if similarity.similarity_score > 0.5:
                            relationships[rel_type.value].append(
                                {
                                    "document_id": f"{doc.source_type}:{doc.source_title}",
                                    "score": similarity.similarity_score,
                                    "explanation": similarity.get_display_explanation(),
                                }
                            )

            elif rel_type == RelationshipType.COMPLEMENTARY:
                # Find complementary content
                complementary = self.complementary_finder.find_complementary_content(
                    target_doc, documents
                )
                relationships[rel_type.value] = complementary.get_top_recommendations(5)

            elif rel_type == RelationshipType.HIERARCHICAL:
                # Find hierarchical relationships
                for doc in documents:
                    if doc != target_doc:
                        if (
                            doc.parent_id == target_doc_id
                            or target_doc.parent_id
                            == f"{doc.source_type}:{doc.source_title}"
                        ):
                            relationships[rel_type.value].append(
                                {
                                    "document_id": f"{doc.source_type}:{doc.source_title}",
                                    "relationship": (
                                        "parent"
                                        if doc.parent_id == target_doc_id
                                        else "child"
                                    ),
                                    "explanation": "Direct hierarchical relationship",
                                }
                            )

        # Sort each relationship type by score/relevance, keeping the top 5 per type
        for rel_type in relationships:
            if relationships[rel_type]:
                relationships[rel_type] = sorted(
                    relationships[rel_type],
                    key=lambda x: x.get("score", x.get("relevance_score", 0)),
                    reverse=True,
                )[:5]

        return relationships
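    # Call sketch (hypothetical IDs; target_doc_id must use the same
    # f"{source_type}:{source_title}" format the lookup loop above matches):
    #     engine.find_document_relationships("confluence:Getting Started", documents)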
    def _build_similarity_matrix(
        self, documents: list[SearchResult]
    ) -> dict[str, dict[str, float]]:
        """Build similarity matrix for all document pairs."""
        matrix = {}

        for i, doc1 in enumerate(documents):
            doc1_id = f"{doc1.source_type}:{doc1.source_title}"
            matrix[doc1_id] = {}

            for j, doc2 in enumerate(documents):
                doc2_id = f"{doc2.source_type}:{doc2.source_title}"

                if i == j:
                    matrix[doc1_id][doc2_id] = 1.0
                else:
                    # Ensure rows exist before accessing for symmetry checks
                    if doc2_id not in matrix:
                        matrix[doc2_id] = {}
                    # If the symmetric value was already computed, reuse it
                    if doc1_id in matrix.get(doc2_id, {}):
                        matrix[doc1_id][doc2_id] = matrix[doc2_id][doc1_id]
                        continue
                    # Otherwise compute and store symmetrically
                    similarity = self.similarity_calculator.calculate_similarity(
                        doc1, doc2
                    )
                    score = similarity.similarity_score
                    matrix[doc1_id][doc2_id] = score
                    matrix[doc2_id][doc1_id] = score

        return matrix
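    # Each unordered pair is computed at most once and stored symmetrically, so
    # the matrix has a unit diagonal and matrix[a][b] == matrix[b][a] for all pairs.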
    def _extract_similarity_insights(
        self, similarity_matrix: dict[str, dict[str, float]]
    ) -> dict[str, Any]:
        """Extract insights from the similarity matrix."""
        if not similarity_matrix:
            return {}

        all_scores = []
        for doc1_scores in similarity_matrix.values():
            for _doc2_id, score in doc1_scores.items():
                if score < 1.0:  # Exclude self-similarity
                    all_scores.append(score)

        if not all_scores:
            return {}

        insights = {
            "average_similarity": sum(all_scores) / len(all_scores),
            "max_similarity": max(all_scores),
            "min_similarity": min(all_scores),
            "high_similarity_pairs": sum(1 for score in all_scores if score > 0.7),
            "total_pairs_analyzed": len(all_scores),
        }
        # Alias for tests expecting 'highly_similar_pairs'
        insights["highly_similar_pairs"] = insights["high_similarity_pairs"]

        return insights
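    # Reading note: when fed the symmetric matrix built above, each unordered
    # pair contributes two entries to all_scores, so "total_pairs_analyzed" and
    # "high_similarity_pairs" count ordered pairs (twice the unordered count).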