Coverage for src/qdrant_loader_mcp_server/search/engine/intelligence.py: 54%

216 statements  

coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1""" 

2Cross-Document Intelligence Operations. 

3 

4This module implements cross-document intelligence functionality including 

5document relationship analysis, similarity detection, conflict detection, 

6complementary content discovery, and document clustering. 

7""" 

8 

9from contextlib import contextmanager 

10from typing import TYPE_CHECKING, Any 

11 

12if TYPE_CHECKING: 

13 from .core import SearchEngine 

14 

15from ...utils.logging import LoggingConfig 

16from ..enhanced.cross_document_intelligence import ClusteringStrategy, SimilarityMetric 

17 

18logger = LoggingConfig.get_logger(__name__) 

19 

20 

21class IntelligenceOperations: 

22 """Handles cross-document intelligence operations.""" 

23 

24 def __init__(self, engine: "SearchEngine"): 

25 """Initialize with search engine reference.""" 

26 self.engine = engine 

27 self.logger = LoggingConfig.get_logger(__name__) 

28 

    async def analyze_document_relationships(
        self,
        query: str,
        limit: int | None = None,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict[str, Any]:
        """
        Analyze relationships between documents from search results.

        Args:
            query: Search query to get documents for analysis
            limit: Maximum number of documents to analyze
            source_types: Optional list of source types to filter by
            project_ids: Optional list of project IDs to filter by

        Returns:
            Comprehensive cross-document relationship analysis
        """
        if not self.engine.hybrid_search:
            raise RuntimeError("Search engine not initialized")

        try:
            # Get documents for analysis
            # Honor default conflict limit from config if caller didn't override
            effective_limit = limit
            config = getattr(self.engine, "config", None)
            if limit is None:
                if config is not None:
                    default_limit = getattr(config, "conflict_limit_default", None)
                    if isinstance(default_limit, int):
                        effective_limit = default_limit
                    else:
                        effective_limit = 20
                else:
                    effective_limit = 20

            documents = await self.engine.hybrid_search.search(
                query=query,
                limit=effective_limit,
                source_types=source_types,
                project_ids=project_ids,
            )

            if len(documents) < 2:
                return {
                    "error": "Need at least 2 documents for relationship analysis",
                    "document_count": len(documents),
                }

            # Perform cross-document analysis
            analysis = await self.engine.hybrid_search.analyze_document_relationships(
                documents
            )

            # Add query metadata
            analysis["query_metadata"] = {
                "original_query": query,
                "document_count": len(documents),
                "source_types": source_types,
                "project_ids": project_ids,
            }

            return analysis

        except Exception as e:
            self.logger.error(
                "Document relationship analysis failed", error=str(e), query=query
            )
            raise

    async def find_similar_documents(
        self,
        target_query: str,
        comparison_query: str,
        similarity_metrics: list[str] | None = None,
        max_similar: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict[str, Any]:
        """
        Find documents similar to a target document.

        Args:
            target_query: Query to find the target document
            comparison_query: Query to get documents to compare against
            similarity_metrics: Similarity metrics to use
            max_similar: Maximum number of similar documents to return
            source_types: Optional list of source types to filter by
            project_ids: Optional list of project IDs to filter by

        Returns:
            Dict containing the target document, similar documents with
            similarity scores, and comparison metadata
        """
        if not self.engine.hybrid_search:
            raise RuntimeError("Search engine not initialized")

        try:
            # Get target document (first result from target query)
            target_results = await self.engine.hybrid_search.search(
                query=target_query,
                limit=1,
                source_types=source_types,
                project_ids=project_ids,
            )

            if not target_results:
                return {
                    "error": "No target document found",
                    "target_query": target_query,
                }

            target_doc = target_results[0]

            # Get comparison documents
            comparison_results = await self.engine.hybrid_search.search(
                query=comparison_query,
                limit=50,  # Get more candidates for comparison
                source_types=source_types,
                project_ids=project_ids,
            )

            if len(comparison_results) < 2:
                return {
                    "error": "Need at least 1 comparison document",
                    "comparison_count": len(comparison_results),
                }

            # Parse similarity metrics
            metric_enums = []
            if similarity_metrics:
                for metric_str in similarity_metrics:
                    try:
                        metric_enums.append(SimilarityMetric(metric_str))
                    except ValueError:
                        self.logger.warning(f"Unknown similarity metric: {metric_str}")

            # Find similar documents
            similar = await self.engine.hybrid_search.find_similar_documents(
                target_doc, comparison_results, metric_enums or None, max_similar
            )

            return {
                "target_document": {
                    "document_id": target_doc.document_id,
                    "title": target_doc.get_display_title(),
                    "source_type": target_doc.source_type,
                },
                "similar_documents": similar,
                "similarity_metrics_used": (
                    [m.value for m in metric_enums] if metric_enums else "default"
                ),
                "comparison_documents_analyzed": len(comparison_results),
            }

        except Exception as e:
            self.logger.error("Similarity search failed", error=str(e))
            raise

    async def detect_document_conflicts(
        self,
        query: str,
        limit: int | None = None,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
        *,
        use_llm: bool | None = None,
        max_llm_pairs: int | None = None,
        overall_timeout_s: float | None = None,
        max_pairs_total: int | None = None,
        text_window_chars: int | None = None,
    ) -> dict[str, Any]:
        """
        Detect conflicts between documents.

        Args:
            query: Search query to get documents for conflict analysis
            limit: Maximum number of documents to analyze
            source_types: Optional list of source types to filter by
            project_ids: Optional list of project IDs to filter by
            use_llm: Per-call override for the detector's "conflict_use_llm" setting
            max_llm_pairs: Per-call override for "conflict_max_llm_pairs"
            overall_timeout_s: Per-call override for "conflict_overall_timeout_s"
            max_pairs_total: Per-call override for "conflict_max_pairs_total"
            text_window_chars: Per-call override for "conflict_text_window_chars"

        Returns:
            Conflict analysis with detected conflicts and resolution suggestions
        """
        if not self.engine.hybrid_search:
            raise RuntimeError("Search engine not initialized")

        try:
            # Get documents for conflict analysis
            effective_limit = limit
            config = getattr(self.engine, "config", None)
            if limit is None and config is not None:
                default_limit = getattr(config, "conflict_limit_default", None)
                if isinstance(default_limit, int):
                    effective_limit = default_limit

            documents = await self.engine.hybrid_search.search(
                query=query,
                limit=effective_limit,
                source_types=source_types,
                project_ids=project_ids,
            )

            if len(documents) < 2:
                return {
                    "conflicts": [],
                    "resolution_suggestions": [],
                    "message": "Need at least 2 documents for conflict detection",
                    "document_count": len(documents),
                }

            # Detect conflicts with optional per-call overrides applied
            detector = self.engine.hybrid_search.cross_document_engine.conflict_detector
            call_overrides: dict[str, Any] = {}
            if use_llm is not None:
                call_overrides["conflict_use_llm"] = bool(use_llm)
            if isinstance(max_llm_pairs, int):
                call_overrides["conflict_max_llm_pairs"] = max_llm_pairs
            if isinstance(overall_timeout_s, int | float):
                call_overrides["conflict_overall_timeout_s"] = float(overall_timeout_s)
            if isinstance(max_pairs_total, int):
                call_overrides["conflict_max_pairs_total"] = max_pairs_total
            if isinstance(text_window_chars, int):
                call_overrides["conflict_text_window_chars"] = text_window_chars

            @contextmanager
            def temporary_detector_settings(det: Any, overrides: dict[str, Any] | None):
                """Temporarily apply merged detector settings and restore afterwards."""
                previous = (
                    getattr(det, "_settings", {}) if hasattr(det, "_settings") else {}
                )
                if not overrides:
                    # No overrides to apply; simply yield control
                    yield
                    return
                merged_settings = dict(previous)
                merged_settings.update(overrides)
                try:
                    det._settings = merged_settings  # type: ignore[attr-defined]
                except Exception:
                    # If settings assignment fails, proceed without overriding
                    pass
                try:
                    yield
                finally:
                    # Always attempt to restore previous settings
                    try:
                        det._settings = previous  # type: ignore[attr-defined]
                    except Exception:
                        pass

            with temporary_detector_settings(detector, call_overrides):
                conflicts = await self.engine.hybrid_search.detect_document_conflicts(
                    documents
                )

            # Add query metadata and original documents for formatting
            conflicts["query_metadata"] = {
                "original_query": query,
                "document_count": len(documents),
                "source_types": source_types,
                "project_ids": project_ids,
            }

            # Inject detector runtime stats via public accessor for structured output
            try:
                detector = (
                    self.engine.hybrid_search.cross_document_engine.conflict_detector
                )
                get_stats = getattr(detector, "get_stats", None) or getattr(
                    detector, "get_last_stats", None
                )
                raw_stats = {}
                if callable(get_stats):
                    raw_stats = get_stats() or {}

                if isinstance(raw_stats, dict) and raw_stats:
                    # Filter to JSON-safe scalar values only
                    safe_stats = {}
                    for key, value in raw_stats.items():
                        if isinstance(value, str | int | float | bool) and not str(
                            key
                        ).startswith("partial_"):
                            safe_stats[key] = value
                    if safe_stats:
                        conflicts["query_metadata"]["detector_stats"] = safe_stats
            except Exception as e:
                self.logger.debug("Failed to access detector stats", error=str(e))

            # Store lightweight, JSON-serializable representations of documents
            # to keep payload minimal and avoid non-serializable objects
            safe_documents: list[dict[str, Any]] = []
            for doc in documents:
                try:
                    document_id = getattr(doc, "document_id", None)
                    # Support either attribute or mapping style access
                    if document_id is None and isinstance(doc, dict):
                        document_id = doc.get("document_id") or doc.get("id")

                    title = None
                    if hasattr(doc, "get_display_title") and callable(
                        doc.get_display_title
                    ):
                        try:
                            title = doc.get_display_title()
                        except Exception:
                            title = None
                    if not title:
                        title = getattr(doc, "source_title", None)
                    if not title and isinstance(doc, dict):
                        title = doc.get("source_title") or doc.get("title")

                    source_type = getattr(doc, "source_type", None)
                    if source_type is None and isinstance(doc, dict):
                        source_type = doc.get("source_type")

                    safe_documents.append(
                        {
                            "document_id": document_id or "",
                            "title": title or "Untitled",
                            "source_type": source_type or "unknown",
                        }
                    )
                except Exception:
                    # Skip malformed entries
                    continue

            conflicts["original_documents"] = safe_documents

            return conflicts

        except Exception as e:
            self.logger.error("Conflict detection failed", error=str(e), query=query)
            raise

    async def find_complementary_content(
        self,
        target_query: str,
        context_query: str,
        max_recommendations: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict[str, Any]:
        """
        Find content that complements a target document.

        Args:
            target_query: Query to find the target document
            context_query: Query to get contextual documents
            max_recommendations: Maximum number of recommendations
            source_types: Optional list of source types to filter by
            project_ids: Optional list of project IDs to filter by

        Returns:
            Dict containing complementary recommendations and target document info
        """
        if not self.engine.hybrid_search:
            raise RuntimeError("Search engine not initialized")

        try:
            self.logger.info(
                f"🔍 Step 1: Searching for target document with query: '{target_query}'"
            )
            # Get target document
            target_results = await self.engine.hybrid_search.search(
                query=target_query,
                limit=1,
                source_types=(source_types or None),
                project_ids=project_ids,
            )

            self.logger.info(f"🎯 Target search returned {len(target_results)} results")
            if not target_results:
                self.logger.warning("No target document found!")
                # Retry with a relaxed/sanitized query (drop stopwords and shorten)
                import re

                tokens = re.findall(r"\w+", target_query)
                stop = {
                    "the",
                    "and",
                    "or",
                    "of",
                    "for",
                    "to",
                    "a",
                    "an",
                    "phase",
                    "kickoff",
                }
                relaxed_tokens = [t for t in tokens if t.lower() not in stop]
                relaxed_query = (
                    " ".join(relaxed_tokens[:4]) if relaxed_tokens else target_query
                )

                if relaxed_query and relaxed_query != target_query:
                    self.logger.info(
                        f"🔁 Retrying target search with relaxed query: '{relaxed_query}'"
                    )
                    target_results = await self.engine.hybrid_search.search(
                        query=relaxed_query,
                        limit=1,
                        source_types=(source_types or None),
                        project_ids=project_ids,
                    )

                # Final fallback: use project anchor terms
                if not target_results:
                    fallback_query = "Mya Health " + " ".join(relaxed_tokens[:2])
                    self.logger.info(
                        f"🔁 Final fallback target search with query: '{fallback_query}'"
                    )
                    target_results = await self.engine.hybrid_search.search(
                        query=fallback_query,
                        limit=1,
                        source_types=(source_types or None),
                        project_ids=project_ids,
                    )

                if not target_results:
                    # Absolute last resort: generic project query
                    generic_query = "Mya Health"
                    self.logger.info(
                        f"🔁 Generic fallback target search with query: '{generic_query}'"
                    )
                    target_results = await self.engine.hybrid_search.search(
                        query=generic_query,
                        limit=1,
                        source_types=(source_types or None),
                        project_ids=project_ids,
                    )

                if not target_results:
                    return {
                        "complementary_recommendations": [],
                        "target_document": None,
                        "context_documents_analyzed": 0,
                    }

            target_doc = target_results[0]
            self.logger.info(f"📄 Target document: {target_doc.get_display_title()}")

            self.logger.info(
                f"🔍 Step 2: Searching for context documents with query: '{context_query}'"
            )
            # Get context documents for comparison - adaptive limit based on max_recommendations
            # Use factor 4 with a minimum of 20 to balance recall and efficiency
            adaptive_limit = max(max_recommendations * 4, 20)
            context_results = await self.engine.hybrid_search.search(
                query=context_query,
                limit=adaptive_limit,
                source_types=(source_types or None),
                project_ids=project_ids,
            )

            self.logger.info(
                f"📚 Context search returned {len(context_results)} documents"
            )
            if not context_results:
                self.logger.warning("No context documents found!")
                # Retry with a broad project-level context query
                broad_context = "Mya Health documentation architecture project"
                self.logger.info(
                    f"🔁 Retrying context search with broad query: '{broad_context}'"
                )
                context_results = await self.engine.hybrid_search.search(
                    query=broad_context,
                    limit=adaptive_limit,
                    source_types=(source_types or None),
                    project_ids=project_ids,
                )

            if not context_results:
                return {
                    "complementary_recommendations": [],
                    "target_document": {
                        "document_id": target_doc.document_id,
                        "title": target_doc.get_display_title(),
                        "source_type": target_doc.source_type,
                    },
                    "context_documents_analyzed": 0,
                }

            # Find complementary content
            self.logger.info("🔍 Step 3: Finding complementary content...")
            complementary = await self.engine.hybrid_search.find_complementary_content(
                target_doc, context_results, max_recommendations
            )

            self.logger.info(f"✅ Found {len(complementary)} recommendations")

            # Transform recommendations to expected format
            transformed_recommendations = []
            for rec in complementary:
                if isinstance(rec, dict):
                    # Get document info
                    doc = rec.get("document")
                    if doc:
                        transformed_rec = {
                            "document_id": (
                                doc.document_id
                                if hasattr(doc, "document_id")
                                else rec.get("document_id", "unknown")
                            ),
                            "title": (
                                doc.get_display_title()
                                if hasattr(doc, "get_display_title")
                                else (
                                    doc.source_title
                                    if hasattr(doc, "source_title")
                                    else rec.get("title", "Untitled")
                                )
                            ),
                            "relevance_score": rec.get(
                                "complementary_score", rec.get("relevance_score", 0.0)
                            ),
                            "reason": rec.get("explanation", rec.get("reason", "")),
                            "strategy": rec.get(
                                "relationship_type", rec.get("strategy", "related")
                            ),
                            # Preserve essential metadata for downstream formatters
                            "source_type": getattr(
                                doc, "source_type", rec.get("source_type", "unknown")
                            ),
                            "project_id": getattr(
                                doc, "project_id", rec.get("project_id")
                            ),
                        }
                        transformed_recommendations.append(transformed_rec)
                else:
                    # Handle non-dict recommendations
                    transformed_recommendations.append(rec)

            return {
                "complementary_recommendations": transformed_recommendations,
                "target_document": {
                    "document_id": target_doc.document_id,
                    "title": target_doc.get_display_title(),
                    "source_type": target_doc.source_type,
                },
                "context_documents_analyzed": len(context_results),
            }

        except Exception as e:
            self.logger.error("Complementary content search failed", error=str(e))
            raise

    async def cluster_documents(
        self,
        query: str,
        strategy: ClusteringStrategy = ClusteringStrategy.MIXED_FEATURES,
        max_clusters: int = 10,
        min_cluster_size: int = 2,
        limit: int = 30,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict[str, Any]:
        """
        Cluster documents using the specified strategy.

        Args:
            query: Search query to get documents for clustering
            strategy: Clustering strategy to use
            max_clusters: Maximum number of clusters to create
            min_cluster_size: Minimum documents per cluster
            limit: Maximum documents to analyze
            source_types: Optional list of source types to filter by
            project_ids: Optional list of project IDs to filter by

        Returns:
            Dictionary containing clusters and metadata
        """
        if not self.engine.hybrid_search:
            raise RuntimeError("Search engine not initialized")

        try:
            # Get documents for clustering
            documents = await self.engine.hybrid_search.search(
                query=query,
                limit=limit,
                source_types=source_types,
                project_ids=project_ids,
            )

            if len(documents) < min_cluster_size:
                return {
                    "clusters": [],
                    "clustering_metadata": {
                        "message": f"Need at least {min_cluster_size} documents for clustering",
                        "document_count": len(documents),
                        "original_query": query,
                        "source_types": source_types,
                        "project_ids": project_ids,
                        "strategy": strategy.value,
                        "max_clusters": max_clusters,
                        "min_cluster_size": min_cluster_size,
                    },
                }

            # Perform clustering
            clusters = await self.engine.hybrid_search.cluster_documents(
                documents=documents,
                strategy=strategy,
                max_clusters=max_clusters,
                min_cluster_size=min_cluster_size,
            )

            # Add query metadata - merge into clustering_metadata if it exists
            result = {**clusters}
            if "clustering_metadata" in result:
                result["clustering_metadata"]["original_query"] = query
                result["clustering_metadata"]["document_count"] = len(documents)
                result["clustering_metadata"]["source_types"] = source_types
                result["clustering_metadata"]["project_ids"] = project_ids
            else:
                result["query_metadata"] = {
                    "original_query": query,
                    "document_count": len(documents),
                    "source_types": source_types,
                    "project_ids": project_ids,
                    "strategy": strategy.value,
                    "max_clusters": max_clusters,
                    "min_cluster_size": min_cluster_size,
                }

            return result

        except Exception as e:
            self.logger.error("Document clustering failed", error=str(e), query=query)
            raise
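

# Illustrative usage sketch (commented out; not part of the module). It assumes a
# SearchEngine that has already been constructed and initialized so that
# engine.hybrid_search is available; the exact setup calls are hypothetical and
# may differ from the real entry points.
#
#     engine = ...  # an initialized SearchEngine with hybrid_search ready
#     ops = IntelligenceOperations(engine)
#     analysis = await ops.analyze_document_relationships(
#         query="authentication flow", limit=10
#     )
#     conflicts = await ops.detect_document_conflicts(
#         query="deployment policy", use_llm=False, overall_timeout_s=30.0
#     )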