Coverage for src / qdrant_loader_mcp_server / search / engine / intelligence.py: 54%

216 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 04:51 +0000

1""" 

2Cross-Document Intelligence Operations. 

3 

4This module implements cross-document intelligence functionality including 

5document relationship analysis, similarity detection, conflict detection, 

6complementary content discovery, and document clustering. 

7""" 

8 

9from contextlib import contextmanager 

10from typing import TYPE_CHECKING, Any 

11 

12if TYPE_CHECKING: 

13 from .core import SearchEngine 

14 

15from ...utils.logging import LoggingConfig 

16from ..enhanced.cross_document_intelligence import ClusteringStrategy, SimilarityMetric 

17 

18logger = LoggingConfig.get_logger(__name__) 

19 

20 

class IntelligenceOperations:
    """Handles cross-document intelligence operations.

    Wraps a :class:`SearchEngine` and exposes relationship analysis,
    similarity search, conflict detection, complementary-content discovery,
    and document clustering on top of its hybrid search backend.
    """

    def __init__(self, engine: "SearchEngine"):
        """Initialize with search engine reference.

        Args:
            engine: Search engine whose ``hybrid_search`` backend is used
                by every intelligence operation in this class.
        """
        self.engine = engine
        self.logger = LoggingConfig.get_logger(__name__)

28 

29 async def analyze_document_relationships( 

30 self, 

31 query: str, 

32 limit: int | None = None, 

33 source_types: list[str] | None = None, 

34 project_ids: list[str] | None = None, 

35 ) -> dict[str, Any]: 

36 """ 

37 Analyze relationships between documents from search results. 

38 

39 Args: 

40 query: Search query to get documents for analysis 

41 limit: Maximum number of documents to analyze 

42 source_types: Optional list of source types to filter by 

43 project_ids: Optional list of project IDs to filter by 

44 

45 Returns: 

46 Comprehensive cross-document relationship analysis 

47 """ 

48 if not self.engine.hybrid_search: 

49 raise RuntimeError("Search engine not initialized") 

50 

51 try: 

52 # Get documents for analysis 

53 # Honor default conflict limit from config if caller didn't override 

54 effective_limit = limit 

55 config = getattr(self.engine, "config", None) 

56 if limit is None: 

57 if config is not None: 

58 default_limit = getattr(config, "conflict_limit_default", None) 

59 if isinstance(default_limit, int): 

60 effective_limit = default_limit 

61 else: 

62 effective_limit = 20 

63 else: 

64 effective_limit = 20 

65 

66 documents = await self.engine.hybrid_search.search( 

67 query=query, 

68 limit=effective_limit, 

69 source_types=source_types, 

70 project_ids=project_ids, 

71 ) 

72 

73 if len(documents) < 2: 

74 return { 

75 "error": "Need at least 2 documents for relationship analysis", 

76 "document_count": len(documents), 

77 } 

78 

79 # Perform cross-document analysis 

80 analysis = await self.engine.hybrid_search.analyze_document_relationships( 

81 documents 

82 ) 

83 

84 # Add query metadata 

85 analysis["query_metadata"] = { 

86 "original_query": query, 

87 "document_count": len(documents), 

88 "source_types": source_types, 

89 "project_ids": project_ids, 

90 } 

91 

92 return analysis 

93 

94 except Exception as e: 

95 self.logger.error( 

96 "Document relationship analysis failed", error=str(e), query=query 

97 ) 

98 raise 

99 

100 async def find_similar_documents( 

101 self, 

102 target_query: str, 

103 comparison_query: str, 

104 similarity_metrics: list[str] | None = None, 

105 max_similar: int = 5, 

106 source_types: list[str] | None = None, 

107 project_ids: list[str] | None = None, 

108 similarity_threshold: float = 0.7, 

109 ) -> dict[str, Any]: 

110 """ 

111 Find documents most similar to a target document retrieved by a query. 

112 

113 Parameters: 

114 target_query (str): Query used to select the target document (first search result). 

115 comparison_query (str): Query used to retrieve candidate documents to compare against. 

116 similarity_metrics (list[str] | None): Optional list of similarity metric names; unknown names are ignored. 

117 max_similar (int): Maximum number of similar documents to include in results. 

118 source_types (list[str] | None): Optional list of source types to filter both searches. 

119 project_ids (list[str] | None): Optional list of project IDs to filter both searches. 

120 similarity_threshold (float): Minimum similarity score for results to be considered similar. 

121 

122 Returns: 

123 dict: Result object containing either an error or similarity details. 

124 On success, includes: 

125 - target_document (dict): {document_id, title, source_type} for the target. 

126 - similar_documents: Backend-provided list of similar document entries (each includes similarity scores). 

127 - similarity_metrics_used: List of metric names used or the string "default". 

128 - comparison_documents_analyzed (int): Number of comparison documents evaluated. 

129 On failure, includes an "error" key with details and additional context fields (e.g., target_query or comparison_count). 

130 """ 

131 if not self.engine.hybrid_search: 

132 raise RuntimeError("Search engine not initialized") 

133 

134 try: 

135 # Get target document (first result from target query) 

136 target_results = await self.engine.hybrid_search.search( 

137 query=target_query, 

138 limit=1, 

139 source_types=source_types, 

140 project_ids=project_ids, 

141 ) 

142 

143 if not target_results: 

144 return { 

145 "error": "No target document found", 

146 "target_query": target_query, 

147 } 

148 

149 target_doc = target_results[0] 

150 

151 # Get comparison documents 

152 comparison_results = await self.engine.hybrid_search.search( 

153 query=comparison_query, 

154 limit=50, # Get more candidates for comparison 

155 source_types=source_types, 

156 project_ids=project_ids, 

157 ) 

158 

159 if len(comparison_results) < 2: 

160 return { 

161 "error": "Need at least 1 comparison document", 

162 "comparison_count": len(comparison_results), 

163 } 

164 

165 # Parse similarity metrics 

166 metric_enums = [] 

167 if similarity_metrics: 

168 for metric_str in similarity_metrics: 

169 try: 

170 metric_enums.append(SimilarityMetric(metric_str)) 

171 except ValueError: 

172 self.logger.warning(f"Unknown similarity metric: {metric_str}") 

173 

174 # Find similar documents 

175 similar = await self.engine.hybrid_search.find_similar_documents( 

176 target_doc, 

177 comparison_results, 

178 metric_enums or None, 

179 max_similar, 

180 similarity_threshold, 

181 ) 

182 

183 return { 

184 "target_document": { 

185 "document_id": target_doc.document_id, 

186 "title": target_doc.get_display_title(), 

187 "source_type": target_doc.source_type, 

188 }, 

189 "similar_documents": similar, 

190 "similarity_metrics_used": ( 

191 [m.value for m in metric_enums] if metric_enums else "default" 

192 ), 

193 "comparison_documents_analyzed": len(comparison_results), 

194 } 

195 

196 except Exception as e: 

197 self.logger.error("Similarity search failed", error=str(e)) 

198 raise 

199 

200 async def detect_document_conflicts( 

201 self, 

202 query: str, 

203 limit: int | None = None, 

204 source_types: list[str] | None = None, 

205 project_ids: list[str] | None = None, 

206 *, 

207 use_llm: bool | None = None, 

208 max_llm_pairs: int | None = None, 

209 overall_timeout_s: float | None = None, 

210 max_pairs_total: int | None = None, 

211 text_window_chars: int | None = None, 

212 ) -> dict[str, Any]: 

213 """ 

214 Detect conflicts between documents. 

215 

216 Args: 

217 query: Search query to get documents for conflict analysis 

218 limit: Maximum number of documents to analyze 

219 source_types: Optional list of source types to filter by 

220 project_ids: Optional list of project IDs to filter by 

221 

222 Returns: 

223 Conflict analysis with detected conflicts and resolution suggestions 

224 """ 

225 if not self.engine.hybrid_search: 

226 raise RuntimeError("Search engine not initialized") 

227 

228 try: 

229 # Get documents for conflict analysis 

230 effective_limit = limit 

231 config = getattr(self.engine, "config", None) 

232 if limit is None and config is not None: 

233 default_limit = getattr(config, "conflict_limit_default", None) 

234 if isinstance(default_limit, int): 

235 effective_limit = default_limit 

236 

237 documents = await self.engine.hybrid_search.search( 

238 query=query, 

239 limit=effective_limit, 

240 source_types=source_types, 

241 project_ids=project_ids, 

242 ) 

243 

244 if len(documents) < 2: 

245 return { 

246 "conflicts": [], 

247 "resolution_suggestions": [], 

248 "message": "Need at least 2 documents for conflict detection", 

249 "document_count": len(documents), 

250 } 

251 

252 # Detect conflicts with optional per-call overrides applied 

253 detector = self.engine.hybrid_search.cross_document_engine.conflict_detector 

254 call_overrides: dict[str, Any] = {} 

255 if use_llm is not None: 

256 call_overrides["conflict_use_llm"] = bool(use_llm) 

257 if isinstance(max_llm_pairs, int): 

258 call_overrides["conflict_max_llm_pairs"] = max_llm_pairs 

259 if isinstance(overall_timeout_s, int | float): 

260 call_overrides["conflict_overall_timeout_s"] = float(overall_timeout_s) 

261 if isinstance(max_pairs_total, int): 

262 call_overrides["conflict_max_pairs_total"] = max_pairs_total 

263 if isinstance(text_window_chars, int): 

264 call_overrides["conflict_text_window_chars"] = text_window_chars 

265 

266 @contextmanager 

267 def temporary_detector_settings(det: Any, overrides: dict[str, Any] | None): 

268 """Temporarily apply merged detector settings and restore afterwards.""" 

269 previous = ( 

270 getattr(det, "_settings", {}) if hasattr(det, "_settings") else {} 

271 ) 

272 if not overrides: 

273 # No overrides to apply; simply yield control 

274 yield 

275 return 

276 merged_settings = dict(previous) 

277 merged_settings.update(overrides) 

278 try: 

279 det._settings = merged_settings # type: ignore[attr-defined] 

280 except Exception: 

281 # If settings assignment fails, proceed without overriding 

282 pass 

283 try: 

284 yield 

285 finally: 

286 # Always attempt to restore previous settings 

287 try: 

288 det._settings = previous # type: ignore[attr-defined] 

289 except Exception: 

290 pass 

291 

292 with temporary_detector_settings(detector, call_overrides): 

293 conflicts = await self.engine.hybrid_search.detect_document_conflicts( 

294 documents 

295 ) 

296 

297 # Add query metadata and original documents for formatting 

298 conflicts["query_metadata"] = { 

299 "original_query": query, 

300 "document_count": len(documents), 

301 "source_types": source_types, 

302 "project_ids": project_ids, 

303 } 

304 

305 # Inject detector runtime stats via public accessor for structured output 

306 try: 

307 detector = ( 

308 self.engine.hybrid_search.cross_document_engine.conflict_detector 

309 ) 

310 get_stats = getattr(detector, "get_stats", None) or getattr( 

311 detector, "get_last_stats", None 

312 ) 

313 raw_stats = {} 

314 if callable(get_stats): 

315 raw_stats = get_stats() or {} 

316 

317 if isinstance(raw_stats, dict) and raw_stats: 

318 # Filter to JSON-safe scalar values only 

319 safe_stats = {} 

320 for key, value in raw_stats.items(): 

321 if isinstance(value, str | int | float | bool) and not str( 

322 key 

323 ).startswith("partial_"): 

324 safe_stats[key] = value 

325 if safe_stats: 

326 conflicts["query_metadata"]["detector_stats"] = safe_stats 

327 except Exception as e: 

328 self.logger.debug("Failed to access detector stats", error=str(e)) 

329 

330 # Store lightweight, JSON-serializable representations of documents 

331 # to keep payload minimal and avoid non-serializable objects 

332 safe_documents: list[dict[str, Any]] = [] 

333 for doc in documents: 

334 try: 

335 document_id = getattr(doc, "document_id", None) 

336 # Support either attribute or mapping style access 

337 if document_id is None and isinstance(doc, dict): 

338 document_id = doc.get("document_id") or doc.get("id") 

339 

340 title = None 

341 if hasattr(doc, "get_display_title") and callable( 

342 doc.get_display_title 

343 ): 

344 try: 

345 title = doc.get_display_title() 

346 except Exception: 

347 title = None 

348 if not title: 

349 title = getattr(doc, "source_title", None) 

350 if not title and isinstance(doc, dict): 

351 title = doc.get("source_title") or doc.get("title") 

352 

353 source_type = getattr(doc, "source_type", None) 

354 if source_type is None and isinstance(doc, dict): 

355 source_type = doc.get("source_type") 

356 

357 safe_documents.append( 

358 { 

359 "document_id": document_id or "", 

360 "title": title or "Untitled", 

361 "source_type": source_type or "unknown", 

362 } 

363 ) 

364 except Exception: 

365 # Skip malformed entries 

366 continue 

367 

368 conflicts["original_documents"] = safe_documents 

369 

370 return conflicts 

371 

372 except Exception as e: 

373 self.logger.error("Conflict detection failed", error=str(e), query=query) 

374 raise 

375 

    async def find_complementary_content(
        self,
        target_query: str,
        context_query: str,
        max_recommendations: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict[str, Any]:
        """
        Find documents that complement a target document using contextual documents.

        Performs a search for a target document (with several fallback queries if none found), retrieves contextual documents, and returns up to `max_recommendations` complementary recommendations derived from those context documents.

        Parameters:
            target_query (str): Query used to locate the primary target document.
            context_query (str): Query used to retrieve contextual documents for comparison.
            max_recommendations (int): Maximum number of complementary recommendations to return.
            source_types (list[str] | None): Optional list of source types to filter searches.
            project_ids (list[str] | None): Optional list of project IDs to filter searches.

        Returns:
            dict: {
                "complementary_recommendations": list -- Transformed recommendation entries (each is a dict with at least `document_id`, `title`, `relevance_score`, `reason`, `strategy`, and optional `source_type`/`project_id`) or raw recommendation items if not mappable;
                "target_document": dict | None -- `{ "document_id", "title", "source_type" }` for the chosen target document, or `None` if no target was found;
                "context_documents_analyzed": int -- Number of context documents that were analyzed.
            }
        """
        if not self.engine.hybrid_search:
            raise RuntimeError("Search engine not initialized")

        try:
            self.logger.info(
                f"🔍 Step 1: Searching for target document with query: '{target_query}'"
            )
            # Get target document (first result of the target query).
            target_results = await self.engine.hybrid_search.search(
                query=target_query,
                limit=1,
                source_types=(source_types or None),
                project_ids=project_ids,
            )

            self.logger.info(f"🎯 Target search returned {len(target_results)} results")
            if not target_results:
                self.logger.warning("No target document found!")
                # Retry with a relaxed/sanitized query (drop stopwords and shorten)
                import re

                tokens = re.findall(r"\w+", target_query)
                # NOTE(review): "phase"/"kickoff" in the stopword set look
                # project-specific rather than general stopwords — confirm intent.
                stop = {
                    "the",
                    "and",
                    "or",
                    "of",
                    "for",
                    "to",
                    "a",
                    "an",
                    "phase",
                    "kickoff",
                }
                relaxed_tokens = [t for t in tokens if t.lower() not in stop]
                # Keep at most the first 4 meaningful tokens; fall back to the
                # original query when everything was filtered out.
                relaxed_query = (
                    " ".join(relaxed_tokens[:4]) if relaxed_tokens else target_query
                )

                if relaxed_query and relaxed_query != target_query:
                    self.logger.info(
                        f"🔁 Retrying target search with relaxed query: '{relaxed_query}'"
                    )
                    target_results = await self.engine.hybrid_search.search(
                        query=relaxed_query,
                        limit=1,
                        source_types=(source_types or None),
                        project_ids=project_ids,
                    )

                # Final fallback: use project anchor terms
                # NOTE(review): "Mya Health" is a hard-coded project anchor —
                # this should likely come from configuration; verify.
                if not target_results:
                    fallback_query = "Mya Health " + " ".join(relaxed_tokens[:2])
                    self.logger.info(
                        f"🔁 Final fallback target search with query: '{fallback_query}'"
                    )
                    target_results = await self.engine.hybrid_search.search(
                        query=fallback_query,
                        limit=1,
                        source_types=(source_types or None),
                        project_ids=project_ids,
                    )

                if not target_results:
                    # Absolute last resort: generic project query
                    generic_query = "Mya Health"
                    self.logger.info(
                        f"🔁 Generic fallback target search with query: '{generic_query}'"
                    )
                    target_results = await self.engine.hybrid_search.search(
                        query=generic_query,
                        limit=1,
                        source_types=(source_types or None),
                        project_ids=project_ids,
                    )

            # All fallbacks exhausted: report an empty, well-formed result.
            if not target_results:
                return {
                    "complementary_recommendations": [],
                    "target_document": None,
                    "context_documents_analyzed": 0,
                }

            target_doc = target_results[0]
            self.logger.info(f"📄 Target document: {target_doc.get_display_title()}")

            self.logger.info(
                f"🔍 Step 2: Searching for context documents with query: '{context_query}'"
            )
            # Get context documents for comparison - adaptive limit based on max_recommendations
            # Use factor 4 with a minimum of 20 to balance recall and efficiency
            adaptive_limit = max(max_recommendations * 4, 20)
            context_results = await self.engine.hybrid_search.search(
                query=context_query,
                limit=adaptive_limit,
                source_types=(source_types or None),
                project_ids=project_ids,
            )

            self.logger.info(
                f"📚 Context search returned {len(context_results)} documents"
            )
            if not context_results:
                self.logger.warning("No context documents found!")
                # Retry with a broad project-level context query
                # NOTE(review): another hard-coded project query — verify.
                broad_context = "Mya Health documentation architecture project"
                self.logger.info(
                    f"🔁 Retrying context search with broad query: '{broad_context}'"
                )
                context_results = await self.engine.hybrid_search.search(
                    query=broad_context,
                    limit=adaptive_limit,
                    source_types=(source_types or None),
                    project_ids=project_ids,
                )

            # No context at all: return the target with zero recommendations.
            if not context_results:
                return {
                    "complementary_recommendations": [],
                    "target_document": {
                        "document_id": target_doc.document_id,
                        "title": target_doc.get_display_title(),
                        "source_type": target_doc.source_type,
                    },
                    "context_documents_analyzed": 0,
                }

            # Find complementary content
            self.logger.info("🔍 Step 3: Finding complementary content...")
            complementary = await self.engine.hybrid_search.find_complementary_content(
                target_doc, context_results, max_recommendations
            )

            self.logger.info(f"✅ Found {len(complementary)} recommendations")

            # Transform recommendations to expected format
            transformed_recommendations = []
            for rec in complementary:
                if isinstance(rec, dict):
                    # Get document info; the backend may return either a result
                    # object under "document" or a flat dict of fields.
                    doc = rec.get("document")
                    if doc:
                        transformed_rec = {
                            "document_id": (
                                doc.document_id
                                if hasattr(doc, "document_id")
                                else rec.get("document_id", "unknown")
                            ),
                            "title": (
                                doc.get_display_title()
                                if hasattr(doc, "get_display_title")
                                else (
                                    doc.source_title
                                    if hasattr(doc, "source_title")
                                    else rec.get("title", "Untitled")
                                )
                            ),
                            "relevance_score": rec.get(
                                "complementary_score", rec.get("relevance_score", 0.0)
                            ),
                            "reason": rec.get(
                                "recommendation_reason", rec.get("reason", "")
                            ),
                            "strategy": rec.get(
                                "relationship_type", rec.get("strategy", "related")
                            ),
                            # Preserve essential metadata for downstream formatters
                            "source_type": getattr(
                                doc, "source_type", rec.get("source_type", "unknown")
                            ),
                            "project_id": getattr(
                                doc, "project_id", rec.get("project_id")
                            ),
                        }
                        transformed_recommendations.append(transformed_rec)
                else:
                    # Handle non-dict recommendations: pass through unchanged.
                    transformed_recommendations.append(rec)

            return {
                "complementary_recommendations": transformed_recommendations,
                "target_document": {
                    "document_id": target_doc.document_id,
                    "title": target_doc.get_display_title(),
                    "source_type": target_doc.source_type,
                },
                "context_documents_analyzed": len(context_results),
            }

        except Exception as e:
            self.logger.error("Complementary content search failed", error=str(e))
            raise

595 

596 async def cluster_documents( 

597 self, 

598 query: str, 

599 strategy: ClusteringStrategy = ClusteringStrategy.MIXED_FEATURES, 

600 max_clusters: int = 10, 

601 min_cluster_size: int = 2, 

602 limit: int = 30, 

603 source_types: list[str] | None = None, 

604 project_ids: list[str] | None = None, 

605 ) -> dict[str, Any]: 

606 """ 

607 Cluster documents using the specified strategy. 

608 

609 Args: 

610 query: Search query to get documents for clustering 

611 strategy: Clustering strategy to use 

612 max_clusters: Maximum number of clusters to create 

613 min_cluster_size: Minimum documents per cluster 

614 limit: Maximum documents to analyze 

615 source_types: Optional list of source types to filter by 

616 project_ids: Optional list of project IDs to filter by 

617 

618 Returns: 

619 Dictionary containing clusters and metadata 

620 """ 

621 if not self.engine.hybrid_search: 

622 raise RuntimeError("Search engine not initialized") 

623 

624 try: 

625 # Get documents for clustering 

626 documents = await self.engine.hybrid_search.search( 

627 query=query, 

628 limit=limit, 

629 source_types=source_types, 

630 project_ids=project_ids, 

631 ) 

632 

633 if len(documents) < min_cluster_size: 

634 return { 

635 "clusters": [], 

636 "clustering_metadata": { 

637 "message": f"Need at least {min_cluster_size} documents for clustering", 

638 "document_count": len(documents), 

639 "original_query": query, 

640 "source_types": source_types, 

641 "project_ids": project_ids, 

642 "strategy": strategy.value, 

643 "max_clusters": max_clusters, 

644 "min_cluster_size": min_cluster_size, 

645 }, 

646 } 

647 

648 # Perform clustering 

649 clusters = await self.engine.hybrid_search.cluster_documents( 

650 documents=documents, 

651 strategy=strategy, 

652 max_clusters=max_clusters, 

653 min_cluster_size=min_cluster_size, 

654 ) 

655 

656 # Add query metadata - merge into clustering_metadata if it exists 

657 result = {**clusters} 

658 if "clustering_metadata" in result: 

659 result["clustering_metadata"]["original_query"] = query 

660 result["clustering_metadata"]["document_count"] = len(documents) 

661 result["clustering_metadata"]["source_types"] = source_types 

662 result["clustering_metadata"]["project_ids"] = project_ids 

663 else: 

664 result["query_metadata"] = { 

665 "original_query": query, 

666 "document_count": len(documents), 

667 "source_types": source_types, 

668 "project_ids": project_ids, 

669 "strategy": strategy.value, 

670 "max_clusters": max_clusters, 

671 "min_cluster_size": min_cluster_size, 

672 } 

673 

674 return result 

675 

676 except Exception as e: 

677 self.logger.error("Document clustering failed", error=str(e), query=query) 

678 raise