Coverage for src/qdrant_loader_mcp_server/search/enhanced/cdi/engine.py: 55% (201 statements)

coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1""" 

2Cross-Document Intelligence Engine. 

3 

4This module implements the main orchestration engine that coordinates all 

5cross-document intelligence analysis including similarity calculation, 

6clustering, citation networks, complementary content discovery, and conflict detection. 

7""" 

8 

9from __future__ import annotations 

10 

11import time 

12from typing import TYPE_CHECKING, Any 

13 

14# Soft-import async clients to avoid hard dependency at import time 

15if TYPE_CHECKING: 

16 pass 

17 

18from ....utils.logging import LoggingConfig 

19from ...models import SearchResult 

20from ...nlp.spacy_analyzer import SpaCyQueryAnalyzer 

21from .analyzers import DocumentClusterAnalyzer 

22from .calculators import DocumentSimilarityCalculator 

23from .citations import CitationNetworkAnalyzer 

24from .detectors import ConflictDetector 

25from .extractors.clustering import DefaultClusterer 

26from .extractors.graph import DefaultGraphBuilder 

27from .extractors.similarity import DefaultSimilarityComputer 

28from .finders import ComplementaryContentFinder 

29from .models import ClusteringStrategy, RelationshipType 

30from .pipeline import CrossDocumentPipeline 

31from .rankers.default import DefaultRanker 

32 

33logger = LoggingConfig.get_logger(__name__) 

34 

35 

class CrossDocumentIntelligenceEngine:
    """Main engine that orchestrates cross-document intelligence analysis."""

    def __init__(
        self,
        spacy_analyzer: SpaCyQueryAnalyzer,
        knowledge_graph=None,
        qdrant_client=None,
        openai_client=None,
        collection_name: str = "documents",
        conflict_settings: dict | None = None,
    ):
        """Initialize the cross-document intelligence engine."""
        self.spacy_analyzer = spacy_analyzer
        self.knowledge_graph = knowledge_graph
        self.qdrant_client = qdrant_client
        self.openai_client = openai_client
        self.collection_name = collection_name
        self.logger = LoggingConfig.get_logger(__name__)

        # Initialize pipeline-based composition (behavior preserved via adapters)
        self._pipeline = CrossDocumentPipeline(
            similarity_computer=DefaultSimilarityComputer(spacy_analyzer),
            clusterer=DefaultClusterer(
                similarity_calculator=DocumentSimilarityCalculator(spacy_analyzer)
            ),
            graph_builder=DefaultGraphBuilder(),
            ranker=DefaultRanker(),
        )

        # Keep legacy attributes for backward compatibility across the codebase
        self.similarity_calculator = DocumentSimilarityCalculator(spacy_analyzer)
        self.cluster_analyzer = DocumentClusterAnalyzer(self.similarity_calculator)
        self.citation_analyzer = CitationNetworkAnalyzer()
        self.complementary_finder = ComplementaryContentFinder(
            self.similarity_calculator, knowledge_graph
        )
        self.conflict_detector = ConflictDetector(
            spacy_analyzer, qdrant_client, openai_client, collection_name
        )
        # Optional: LLM provider from core (set by builder)
        self.llm_provider = None
        if conflict_settings is not None:
            validated = self._validate_and_normalize_conflict_settings(
                conflict_settings
            )
            if validated is not None:
                # Apply configuration knobs where supported (respect OpenAI client availability)
                self.conflict_detector.llm_enabled = bool(
                    validated.get("conflict_use_llm", True)
                ) and (openai_client is not None)
                # Store normalized runtime settings for detector use
                self.conflict_detector._settings = validated
            else:
                # Validation already logged a clear warning; keep detector defaults
                pass

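    # Illustrative example (not part of the module): how a caller might pass
    # conflict_settings; `analyzer` and `client` are hypothetical stand-ins for
    # a SpaCyQueryAnalyzer and an OpenAI client. String/number values are fine
    # because _validate_and_normalize_conflict_settings (below) coerces them:
    #
    #     engine = CrossDocumentIntelligenceEngine(
    #         spacy_analyzer=analyzer,
    #         openai_client=client,
    #         conflict_settings={
    #             "conflict_use_llm": "yes",      # coerced to True
    #             "conflict_max_llm_pairs": "4",  # coerced to 4
    #             "conflict_llm_timeout_s": 8,    # coerced to 8.0
    #         },
    #     )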
    def _validate_and_normalize_conflict_settings(
        self, settings: object
    ) -> dict[str, Any] | None:
        """Validate and normalize conflict detection settings.

        Returns a sanitized settings dict or None when invalid. On any validation
        problem, logs a clear warning and falls back to detector defaults.
        """
        from collections.abc import Mapping

        if not isinstance(settings, Mapping):
            self.logger.warning(
                f"Invalid conflict_settings: expected mapping, got {type(settings).__name__}; using defaults"
            )
            return None

        errors: list[str] = []

        def coerce_bool(value: object, default: bool, key: str) -> bool:
            try:
                if isinstance(value, bool):
                    return value
                if isinstance(value, int | float):
                    return value != 0
                if isinstance(value, str):
                    v = value.strip().lower()
                    if v in {"true", "1", "yes", "y", "on"}:
                        return True
                    if v in {"false", "0", "no", "n", "off"}:
                        return False
                raise ValueError("unrecognized boolean value")
            except Exception as e:
                errors.append(f"{key}: {e}")
                return default

        def coerce_int_non_negative(value: object, default: int, key: str) -> int:
            try:
                if isinstance(value, bool):
                    # Avoid treating booleans as ints
                    raise ValueError("boolean not allowed for integer field")
                if isinstance(value, int | float):
                    v = int(value)
                elif isinstance(value, str):
                    v = int(value.strip())
                else:
                    raise ValueError("unsupported type")
                return v if v >= 0 else default
            except Exception as e:
                errors.append(f"{key}: {e}")
                return default

        def coerce_float_positive(value: object, default: float, key: str) -> float:
            try:
                if isinstance(value, bool):
                    raise ValueError("boolean not allowed for float field")
                if isinstance(value, int | float):
                    v = float(value)
                elif isinstance(value, str):
                    v = float(value.strip())
                else:
                    raise ValueError("unsupported type")
                return v if v > 0 else default
            except Exception as e:
                errors.append(f"{key}: {e}")
                return default

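        # Illustrative coercion behavior (not part of the module); per-field
        # failures record an error and fall back to the supplied default:
        #
        #     coerce_bool("on", False, "k")         -> True
        #     coerce_int_non_negative("7", 0, "k")  -> 7
        #     coerce_int_non_negative(True, 0, "k") -> 0, error recorded
        #     coerce_float_positive(0, 1.5, "k")    -> 1.5 (non-positive rejected)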
        # Safe defaults align with ConflictDetector fallbacks
        defaults: dict[str, Any] = {
            "conflict_use_llm": True,
            "conflict_max_llm_pairs": 2,
            "conflict_llm_model": "gpt-4o-mini",
            "conflict_llm_timeout_s": 12.0,
            "conflict_overall_timeout_s": 9.0,
            "conflict_text_window_chars": 2000,
            "conflict_max_pairs_total": 24,
            "conflict_embeddings_timeout_s": 5.0,
            "conflict_embeddings_max_concurrency": 5,
            # Optional/unused in detector but supported upstream
            "conflict_limit_default": 10,
            "conflict_tier_caps": {
                "primary": 50,
                "secondary": 30,
                "tertiary": 20,
                "fallback": 10,
            },
        }

        # Start with defaults and override with sanitized values
        normalized: dict[str, Any] = dict(defaults)

        # Booleans
        normalized["conflict_use_llm"] = coerce_bool(
            settings.get("conflict_use_llm", defaults["conflict_use_llm"]),
            defaults["conflict_use_llm"],
            "conflict_use_llm",
        )

        # Integers (non-negative)
        normalized["conflict_max_llm_pairs"] = coerce_int_non_negative(
            settings.get("conflict_max_llm_pairs", defaults["conflict_max_llm_pairs"]),
            defaults["conflict_max_llm_pairs"],
            "conflict_max_llm_pairs",
        )
        normalized["conflict_max_pairs_total"] = coerce_int_non_negative(
            settings.get(
                "conflict_max_pairs_total", defaults["conflict_max_pairs_total"]
            ),
            defaults["conflict_max_pairs_total"],
            "conflict_max_pairs_total",
        )
        normalized["conflict_text_window_chars"] = coerce_int_non_negative(
            settings.get(
                "conflict_text_window_chars", defaults["conflict_text_window_chars"]
            ),
            defaults["conflict_text_window_chars"],
            "conflict_text_window_chars",
        )
        normalized["conflict_embeddings_max_concurrency"] = coerce_int_non_negative(
            settings.get(
                "conflict_embeddings_max_concurrency",
                defaults["conflict_embeddings_max_concurrency"],
            ),
            defaults["conflict_embeddings_max_concurrency"],
            "conflict_embeddings_max_concurrency",
        )
        normalized["conflict_limit_default"] = coerce_int_non_negative(
            settings.get("conflict_limit_default", defaults["conflict_limit_default"]),
            defaults["conflict_limit_default"],
            "conflict_limit_default",
        )

        # Floats (positive)
        normalized["conflict_llm_timeout_s"] = coerce_float_positive(
            settings.get("conflict_llm_timeout_s", defaults["conflict_llm_timeout_s"]),
            defaults["conflict_llm_timeout_s"],
            "conflict_llm_timeout_s",
        )
        normalized["conflict_overall_timeout_s"] = coerce_float_positive(
            settings.get(
                "conflict_overall_timeout_s", defaults["conflict_overall_timeout_s"]
            ),
            defaults["conflict_overall_timeout_s"],
            "conflict_overall_timeout_s",
        )
        normalized["conflict_embeddings_timeout_s"] = coerce_float_positive(
            settings.get(
                "conflict_embeddings_timeout_s",
                defaults["conflict_embeddings_timeout_s"],
            ),
            defaults["conflict_embeddings_timeout_s"],
            "conflict_embeddings_timeout_s",
        )

        # Strings
        llm_model = settings.get("conflict_llm_model", defaults["conflict_llm_model"])
        if isinstance(llm_model, str) and llm_model.strip():
            normalized["conflict_llm_model"] = llm_model.strip()
        else:
            if "conflict_llm_model" in settings:
                errors.append("conflict_llm_model: expected non-empty string")
            normalized["conflict_llm_model"] = defaults["conflict_llm_model"]

        # Nested mapping: conflict_tier_caps
        tier_caps_default = defaults["conflict_tier_caps"]
        tier_caps_value = settings.get("conflict_tier_caps", tier_caps_default)
        if isinstance(tier_caps_value, Mapping):
            tier_caps_normalized: dict[str, int] = dict(tier_caps_default)
            for k in ("primary", "secondary", "tertiary", "fallback"):
                tier_caps_normalized[k] = coerce_int_non_negative(
                    tier_caps_value.get(k, tier_caps_default[k]),
                    tier_caps_default[k],
                    f"conflict_tier_caps.{k}",
                )
            normalized["conflict_tier_caps"] = tier_caps_normalized
        else:
            if "conflict_tier_caps" in settings:
                errors.append("conflict_tier_caps: expected mapping with integer caps")
            normalized["conflict_tier_caps"] = dict(tier_caps_default)

        if errors:
            self.logger.warning(
                "Invalid values in conflict_settings; using defaults for invalid fields: "
                + "; ".join(errors)
            )

        return normalized

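    # Illustrative sketch (not part of the module): invalid fields fall back to
    # their defaults individually instead of rejecting the whole mapping:
    #
    #     engine._validate_and_normalize_conflict_settings(
    #         {"conflict_max_llm_pairs": -3, "conflict_llm_timeout_s": "6.5"}
    #     )
    #     # -> conflict_max_llm_pairs stays 2 (negative rejected),
    #     #    conflict_llm_timeout_s becomes 6.5, all other keys keep defaults.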
    def analyze_document_relationships(self, documents) -> dict[str, Any]:
        """Lightweight relationship analysis focusing on essential relationships."""
        start_time = time.time()

        self.logger.info(
            f"Starting lightweight cross-document analysis for {len(documents)} documents"
        )

        # Skip heavy analysis components for performance:
        # - Skip similarity matrix computation (O(n²) operation)
        # - Skip citation network analysis
        # - Skip complementary content analysis
        # - Skip conflict detection

        # Keep only: document clustering for essential relationships
        clusters = self.cluster_analyzer.create_clusters(
            documents,
            strategy=ClusteringStrategy.MIXED_FEATURES,
            max_clusters=5,  # Reduced from 10 to 5 for faster processing
            min_cluster_size=2,
        )

        processing_time = (time.time() - start_time) * 1000

        # Build a lightweight response focused on cluster relationships.
        # Documents must be included in clusters for relationship extraction.
        cluster_data = []
        doc_id_to_doc = {}

        # Create a document lookup for mapping cluster document IDs to actual documents
        for doc in documents:
            doc_id = f"{doc.source_type}:{doc.source_title}"
            doc_id_to_doc[doc_id] = doc

        for cluster in clusters:
            cluster_summary = cluster.get_cluster_summary()

            # Add actual document objects for relationship extraction
            cluster_documents = []
            for doc_id in cluster.documents:
                if doc_id in doc_id_to_doc:
                    doc = doc_id_to_doc[doc_id]
                    cluster_documents.append(
                        {
                            "document_id": doc.document_id,
                            "title": doc.source_title,
                            "source_title": doc.source_title,
                            "source_type": doc.source_type,
                        }
                    )

            cluster_summary["documents"] = cluster_documents
            cluster_data.append(cluster_summary)

        analysis_results = {
            "summary": {
                "total_documents": len(documents),
                "processing_time_ms": processing_time,
                "clusters_found": len(clusters),
                "analysis_mode": "lightweight",
            },
            "document_clusters": cluster_data,
            "relationships_count": sum(
                len(cluster.documents) * (len(cluster.documents) - 1) // 2
                for cluster in clusters
                if len(cluster.documents) > 1
            ),
        }

        self.logger.info(
            f"Lightweight cross-document analysis completed in {processing_time:.2f}ms"
        )

        return analysis_results

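    # Illustrative sketch (not part of the module): shape of the returned
    # analysis for, say, clusters of 3 and 2 documents. relationships_count is
    # the sum of n * (n - 1) / 2 intra-cluster pairs:
    #
    #     {
    #         "summary": {
    #             "total_documents": 5,
    #             "processing_time_ms": 42.0,
    #             "clusters_found": 2,
    #             "analysis_mode": "lightweight",
    #         },
    #         "document_clusters": [...],
    #         "relationships_count": 4,  # 3*2/2 + 2*1/2
    #     }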
    def find_document_relationships(
        self,
        target_doc_id: str,
        documents: list[SearchResult],
        relationship_types: list[RelationshipType] | None = None,
    ) -> dict[str, list[dict[str, Any]]]:
        """Find specific relationships for a target document."""
        if relationship_types is None:
            relationship_types = [
                RelationshipType.SEMANTIC_SIMILARITY,
                RelationshipType.COMPLEMENTARY,
                RelationshipType.HIERARCHICAL,
            ]

        # Find the target document
        target_doc = None
        for doc in documents:
            if f"{doc.source_type}:{doc.source_title}" == target_doc_id:
                target_doc = doc
                break

        if not target_doc:
            return {"error": "Target document not found"}

        relationships = {rel_type.value: [] for rel_type in relationship_types}

        for rel_type in relationship_types:
            if rel_type == RelationshipType.SEMANTIC_SIMILARITY:
                # Find similar documents
                for doc in documents:
                    if doc != target_doc:
                        similarity = self.similarity_calculator.calculate_similarity(
                            target_doc, doc
                        )
                        if similarity.similarity_score > 0.5:
                            relationships[rel_type.value].append(
                                {
                                    "document_id": f"{doc.source_type}:{doc.source_title}",
                                    "score": similarity.similarity_score,
                                    "explanation": similarity.get_display_explanation(),
                                }
                            )

            elif rel_type == RelationshipType.COMPLEMENTARY:
                # Find complementary content
                complementary = self.complementary_finder.find_complementary_content(
                    target_doc, documents
                )
                relationships[rel_type.value] = complementary.get_top_recommendations(5)

            elif rel_type == RelationshipType.HIERARCHICAL:
                # Find hierarchical relationships
                for doc in documents:
                    if doc != target_doc:
                        if (
                            doc.parent_id == target_doc_id
                            or target_doc.parent_id
                            == f"{doc.source_type}:{doc.source_title}"
                        ):
                            relationships[rel_type.value].append(
                                {
                                    "document_id": f"{doc.source_type}:{doc.source_title}",
                                    "relationship": (
                                        "parent"
                                        if doc.parent_id == target_doc_id
                                        else "child"
                                    ),
                                    "explanation": "Direct hierarchical relationship",
                                }
                            )

        # Sort each relationship type by score/relevance and keep the top 5
        for rel_type in relationships:
            if relationships[rel_type]:
                relationships[rel_type] = sorted(
                    relationships[rel_type],
                    key=lambda x: x.get("score", x.get("relevance_score", 0)),
                    reverse=True,
                )[:5]

        return relationships

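    # Illustrative call (not part of the module): restrict the analysis to
    # semantic similarity only; "confluence:Deployment Guide" is a hypothetical
    # target ID and `results` a hypothetical list of SearchResult objects:
    #
    #     rels = engine.find_document_relationships(
    #         target_doc_id="confluence:Deployment Guide",
    #         documents=results,
    #         relationship_types=[RelationshipType.SEMANTIC_SIMILARITY],
    #     )
    #     top = rels[RelationshipType.SEMANTIC_SIMILARITY.value]
    #     # -> up to 5 dicts with document_id, score, and explanation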
    def _build_similarity_matrix(
        self, documents: list[SearchResult]
    ) -> dict[str, dict[str, float]]:
        """Build similarity matrix for all document pairs."""
        matrix = {}

        for i, doc1 in enumerate(documents):
            doc1_id = f"{doc1.source_type}:{doc1.source_title}"
            matrix[doc1_id] = {}

            for j, doc2 in enumerate(documents):
                doc2_id = f"{doc2.source_type}:{doc2.source_title}"

                if i == j:
                    matrix[doc1_id][doc2_id] = 1.0
                else:
                    # Ensure rows exist before accessing for symmetry checks
                    if doc2_id not in matrix:
                        matrix[doc2_id] = {}
                    # If the symmetric value was already computed, reuse it
                    if doc1_id in matrix.get(doc2_id, {}):
                        matrix[doc1_id][doc2_id] = matrix[doc2_id][doc1_id]
                        continue
                    # Otherwise compute and store symmetrically
                    similarity = self.similarity_calculator.calculate_similarity(
                        doc1, doc2
                    )
                    score = similarity.similarity_score
                    matrix[doc1_id][doc2_id] = score
                    matrix[doc2_id][doc1_id] = score

        return matrix

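    # Illustrative result (not part of the module): the matrix has a unit
    # diagonal and mirrored off-diagonal scores (0.62 is a made-up value):
    #
    #     {"git:A": {"git:A": 1.0, "git:B": 0.62},
    #      "git:B": {"git:B": 1.0, "git:A": 0.62}}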
    def _extract_similarity_insights(
        self, similarity_matrix: dict[str, dict[str, float]]
    ) -> dict[str, Any]:
        """Extract insights from the similarity matrix."""
        if not similarity_matrix:
            return {}

        all_scores = []
        for doc1_scores in similarity_matrix.values():
            for _doc2_id, score in doc1_scores.items():
                if score < 1.0:  # Exclude self-similarity
                    all_scores.append(score)

        if not all_scores:
            return {}

        insights = {
            "average_similarity": sum(all_scores) / len(all_scores),
            "max_similarity": max(all_scores),
            "min_similarity": min(all_scores),
            "high_similarity_pairs": sum(1 for score in all_scores if score > 0.7),
            "total_pairs_analyzed": len(all_scores),
        }
        # Alias for tests expecting 'highly_similar_pairs'
        insights["highly_similar_pairs"] = insights["high_similarity_pairs"]

        return insights
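
# Illustrative end-to-end sketch (not part of the module). A minimal flow,
# assuming SpaCyQueryAnalyzer() accepts default construction and `results` is a
# list of SearchResult objects from an earlier search:
#
#     analyzer = SpaCyQueryAnalyzer()
#     engine = CrossDocumentIntelligenceEngine(spacy_analyzer=analyzer)
#     overview = engine.analyze_document_relationships(results)
#     matrix = engine._build_similarity_matrix(results)
#     insights = engine._extract_similarity_insights(matrix)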