Coverage for src/qdrant_loader_mcp_server/search/engine.py: 55%

197 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2025-07-25 11:38 +0000

1"""Search engine service for the MCP server.""" 

2 

3from typing import Any 

4 

5from openai import AsyncOpenAI 

6from qdrant_client import AsyncQdrantClient 

7from qdrant_client.http import models 

8 

9from ..config import OpenAIConfig, QdrantConfig 

10from ..utils.logging import LoggingConfig 

11from .hybrid_search import HybridSearchEngine 

12from .models import SearchResult 

13# 🔥 NEW: Import Phase 1.2 topic chaining components 

14from .enhanced.topic_search_chain import TopicSearchChain, ChainStrategy 

15# 🔥 NEW: Import Phase 2.3 cross-document intelligence components 

16from .enhanced.cross_document_intelligence import SimilarityMetric, ClusteringStrategy 

17 

18logger = LoggingConfig.get_logger(__name__) 

19 

20 

class SearchEngine:
    """Orchestrates query processing and document search for the MCP server.

    Owns the Qdrant and OpenAI clients and delegates actual retrieval to a
    HybridSearchEngine once :meth:`initialize` has been called.
    """

    def __init__(self):
        """Create an uninitialized engine; call :meth:`initialize` before use."""
        # All external dependencies start unset; initialize() wires them up.
        self.config: QdrantConfig | None = None
        self.client: AsyncQdrantClient | None = None
        self.openai_client: AsyncOpenAI | None = None
        self.hybrid_search: HybridSearchEngine | None = None
        self.logger = LoggingConfig.get_logger(__name__)

31 

32 async def initialize( 

33 self, config: QdrantConfig, openai_config: OpenAIConfig 

34 ) -> None: 

35 """Initialize the search engine with configuration.""" 

36 self.config = config 

37 try: 

38 self.client = AsyncQdrantClient(url=config.url, api_key=config.api_key) 

39 self.openai_client = AsyncOpenAI(api_key=openai_config.api_key) 

40 

41 # Ensure collection exists 

42 if self.client is None: 

43 raise RuntimeError("Failed to initialize Qdrant client") 

44 

45 collections = await self.client.get_collections() 

46 if not any(c.name == config.collection_name for c in collections.collections): 

47 await self.client.create_collection( 

48 collection_name=config.collection_name, 

49 vectors_config=models.VectorParams( 

50 size=1536, # Default size for OpenAI embeddings 

51 distance=models.Distance.COSINE, 

52 ), 

53 ) 

54 

55 # Initialize hybrid search 

56 if self.client and self.openai_client: 

57 self.hybrid_search = HybridSearchEngine( 

58 qdrant_client=self.client, 

59 openai_client=self.openai_client, 

60 collection_name=config.collection_name, 

61 ) 

62 

63 self.logger.info("Successfully connected to Qdrant", url=config.url) 

64 except Exception as e: 

65 self.logger.error( 

66 "Failed to connect to Qdrant server", 

67 error=str(e), 

68 url=config.url, 

69 hint="Make sure Qdrant is running and accessible at the configured URL", 

70 ) 

71 raise RuntimeError( 

72 f"Failed to connect to Qdrant server at {config.url}. " 

73 "Please ensure Qdrant is running and accessible." 

74 ) from None # Suppress the original exception 

75 

76 async def cleanup(self) -> None: 

77 """Cleanup resources.""" 

78 if self.client: 

79 await self.client.close() 

80 self.client = None 

81 

82 async def search( 

83 self, 

84 query: str, 

85 source_types: list[str] | None = None, 

86 limit: int = 5, 

87 project_ids: list[str] | None = None, 

88 ) -> list[SearchResult]: 

89 """Search for documents using hybrid search. 

90 

91 Args: 

92 query: Search query text 

93 source_types: Optional list of source types to filter by 

94 limit: Maximum number of results to return 

95 project_ids: Optional list of project IDs to filter by 

96 """ 

97 if not self.hybrid_search: 

98 raise RuntimeError("Search engine not initialized") 

99 

100 self.logger.debug( 

101 "Performing search", 

102 query=query, 

103 source_types=source_types, 

104 limit=limit, 

105 project_ids=project_ids, 

106 ) 

107 

108 try: 

109 results = await self.hybrid_search.search( 

110 query=query, 

111 source_types=source_types, 

112 limit=limit, 

113 project_ids=project_ids, 

114 ) 

115 

116 self.logger.info( 

117 "Search completed", 

118 query=query, 

119 result_count=len(results), 

120 project_ids=project_ids, 

121 ) 

122 

123 return results 

124 except Exception as e: 

125 self.logger.error("Search failed", error=str(e), query=query) 

126 raise 

127 

128 async def generate_topic_chain( 

129 self, 

130 query: str, 

131 strategy: str = "mixed_exploration", 

132 max_links: int = 5 

133 ) -> TopicSearchChain: 

134 """🔥 NEW: Generate a topic-driven search chain for progressive discovery. 

135  

136 Args: 

137 query: Original search query 

138 strategy: Chain generation strategy (breadth_first, depth_first, relevance_ranked, mixed_exploration) 

139 max_links: Maximum number of chain links to generate 

140  

141 Returns: 

142 TopicSearchChain with progressive exploration queries 

143 """ 

144 if not self.hybrid_search: 

145 raise RuntimeError("Search engine not initialized") 

146 

147 # Convert string strategy to enum 

148 try: 

149 chain_strategy = ChainStrategy(strategy) 

150 except ValueError: 

151 self.logger.warning(f"Unknown strategy '{strategy}', using mixed_exploration") 

152 chain_strategy = ChainStrategy.MIXED_EXPLORATION 

153 

154 self.logger.debug( 

155 "Generating topic search chain", 

156 query=query, 

157 strategy=strategy, 

158 max_links=max_links 

159 ) 

160 

161 try: 

162 topic_chain = await self.hybrid_search.generate_topic_search_chain( 

163 query=query, 

164 strategy=chain_strategy, 

165 max_links=max_links 

166 ) 

167 

168 self.logger.info( 

169 "Topic chain generation completed", 

170 query=query, 

171 chain_length=len(topic_chain.chain_links), 

172 topics_covered=topic_chain.total_topics_covered, 

173 discovery_potential=f"{topic_chain.estimated_discovery_potential:.2f}" 

174 ) 

175 

176 return topic_chain 

177 except Exception as e: 

178 self.logger.error("Topic chain generation failed", error=str(e), query=query) 

179 raise 

180 

181 async def execute_topic_chain( 

182 self, 

183 topic_chain: TopicSearchChain, 

184 results_per_link: int = 3, 

185 source_types: list[str] | None = None, 

186 project_ids: list[str] | None = None 

187 ) -> dict[str, list[SearchResult]]: 

188 """🔥 NEW: Execute searches for all links in a topic chain. 

189  

190 Args: 

191 topic_chain: The topic search chain to execute 

192 results_per_link: Number of results per chain link 

193 source_types: Optional source type filters 

194 project_ids: Optional project ID filters 

195  

196 Returns: 

197 Dictionary mapping queries to search results 

198 """ 

199 if not self.hybrid_search: 

200 raise RuntimeError("Search engine not initialized") 

201 

202 self.logger.debug( 

203 "Executing topic chain search", 

204 original_query=topic_chain.original_query, 

205 chain_length=len(topic_chain.chain_links), 

206 results_per_link=results_per_link 

207 ) 

208 

209 try: 

210 chain_results = await self.hybrid_search.execute_topic_chain_search( 

211 topic_chain=topic_chain, 

212 results_per_link=results_per_link, 

213 source_types=source_types, 

214 project_ids=project_ids 

215 ) 

216 

217 total_results = sum(len(results) for results in chain_results.values()) 

218 self.logger.info( 

219 "Topic chain execution completed", 

220 original_query=topic_chain.original_query, 

221 total_queries=len(chain_results), 

222 total_results=total_results 

223 ) 

224 

225 return chain_results 

226 except Exception as e: 

227 self.logger.error("Topic chain execution failed", error=str(e)) 

228 raise 

229 

230 async def search_with_topic_chain( 

231 self, 

232 query: str, 

233 strategy: str = "mixed_exploration", 

234 max_links: int = 5, 

235 results_per_link: int = 3, 

236 source_types: list[str] | None = None, 

237 project_ids: list[str] | None = None 

238 ) -> dict[str, list[SearchResult]]: 

239 """🔥 NEW: Combined method to generate and execute a topic search chain. 

240  

241 Args: 

242 query: Original search query 

243 strategy: Chain generation strategy 

244 max_links: Maximum chain links 

245 results_per_link: Results per link 

246 source_types: Optional source filters 

247 project_ids: Optional project filters 

248  

249 Returns: 

250 Dictionary mapping chain queries to their results 

251 """ 

252 self.logger.debug( 

253 "Starting topic chain search workflow", 

254 query=query, 

255 strategy=strategy, 

256 max_links=max_links, 

257 results_per_link=results_per_link 

258 ) 

259 

260 try: 

261 # Generate topic chain 

262 topic_chain = await self.generate_topic_chain( 

263 query=query, 

264 strategy=strategy, 

265 max_links=max_links 

266 ) 

267 

268 # Execute the chain 

269 chain_results = await self.execute_topic_chain( 

270 topic_chain=topic_chain, 

271 results_per_link=results_per_link, 

272 source_types=source_types, 

273 project_ids=project_ids 

274 ) 

275 

276 self.logger.info( 

277 "Topic chain search workflow completed", 

278 query=query, 

279 total_queries=len(chain_results), 

280 total_results=sum(len(results) for results in chain_results.values()), 

281 discovery_potential=f"{topic_chain.estimated_discovery_potential:.2f}" 

282 ) 

283 

284 return chain_results 

285 except Exception as e: 

286 self.logger.error("Topic chain search workflow failed", error=str(e), query=query) 

287 raise 

288 

289 # ============================================================================ 

290 # 🔥 Phase 1.3: Dynamic Faceted Search Interface Methods 

291 # ============================================================================ 

292 

293 async def search_with_facets( 

294 self, 

295 query: str, 

296 limit: int = 5, 

297 source_types: list[str] | None = None, 

298 project_ids: list[str] | None = None, 

299 facet_filters: list[dict] | None = None, 

300 ) -> dict: 

301 """ 

302 🔥 Phase 1.3: Perform faceted search with dynamic facet generation. 

303  

304 Returns search results with generated facets for interactive filtering. 

305  

306 Args: 

307 query: Search query 

308 limit: Maximum number of results to return 

309 source_types: Optional list of source types to filter by 

310 project_ids: Optional list of project IDs to filter by 

311 facet_filters: Optional list of facet filters to apply 

312  

313 Returns: 

314 Dictionary containing: 

315 - results: List of search results 

316 - facets: List of generated facets with counts 

317 - total_results: Total results before facet filtering 

318 - filtered_count: Results after facet filtering 

319 - applied_filters: Currently applied facet filters 

320 """ 

321 if not self.hybrid_search: 

322 raise RuntimeError("Search engine not initialized") 

323 

324 try: 

325 # Convert facet filter dictionaries to FacetFilter objects if provided 

326 filter_objects = [] 

327 if facet_filters: 

328 from .enhanced.faceted_search import FacetFilter, FacetType 

329 for filter_dict in facet_filters: 

330 facet_type = FacetType(filter_dict["facet_type"]) 

331 filter_objects.append(FacetFilter( 

332 facet_type=facet_type, 

333 values=filter_dict["values"], 

334 operator=filter_dict.get("operator", "OR") 

335 )) 

336 

337 faceted_results = await self.hybrid_search.search_with_facets( 

338 query=query, 

339 limit=limit, 

340 source_types=source_types, 

341 project_ids=project_ids, 

342 facet_filters=filter_objects, 

343 generate_facets=True 

344 ) 

345 

346 # Convert to MCP-friendly format 

347 return { 

348 "results": faceted_results.results, 

349 "facets": [ 

350 { 

351 "type": facet.facet_type.value, 

352 "name": facet.name, 

353 "display_name": facet.display_name, 

354 "description": facet.description, 

355 "values": [ 

356 { 

357 "value": fv.value, 

358 "count": fv.count, 

359 "display_name": fv.display_name, 

360 "description": fv.description 

361 } 

362 for fv in facet.get_top_values(10) 

363 ] 

364 } 

365 for facet in faceted_results.facets 

366 ], 

367 "total_results": faceted_results.total_results, 

368 "filtered_count": faceted_results.filtered_count, 

369 "applied_filters": [ 

370 { 

371 "facet_type": f.facet_type.value, 

372 "values": f.values, 

373 "operator": f.operator 

374 } 

375 for f in faceted_results.applied_filters 

376 ], 

377 "generation_time_ms": faceted_results.generation_time_ms 

378 } 

379 

380 except Exception as e: 

381 self.logger.error("Faceted search failed", error=str(e), query=query) 

382 raise 

383 

384 async def get_facet_suggestions( 

385 self, 

386 query: str, 

387 current_filters: list[dict] | None = None, 

388 limit: int = 20 

389 ) -> list[dict]: 

390 """ 

391 🔥 Phase 1.3: Get facet refinement suggestions based on current search. 

392  

393 Args: 

394 query: Current search query 

395 current_filters: Currently applied facet filters 

396 limit: Number of results to analyze for suggestions 

397  

398 Returns: 

399 List of facet refinement suggestions with impact estimates 

400 """ 

401 if not self.hybrid_search: 

402 raise RuntimeError("Search engine not initialized") 

403 

404 try: 

405 # First get current search results 

406 current_results = await self.hybrid_search.search( 

407 query=query, 

408 limit=limit, 

409 source_types=None, 

410 project_ids=None 

411 ) 

412 

413 # Convert filter dictionaries to FacetFilter objects 

414 filter_objects = [] 

415 if current_filters: 

416 from .enhanced.faceted_search import FacetFilter, FacetType 

417 for filter_dict in current_filters: 

418 facet_type = FacetType(filter_dict["facet_type"]) 

419 filter_objects.append(FacetFilter( 

420 facet_type=facet_type, 

421 values=filter_dict["values"], 

422 operator=filter_dict.get("operator", "OR") 

423 )) 

424 

425 suggestions = self.hybrid_search.suggest_facet_refinements( 

426 current_results=current_results, 

427 current_filters=filter_objects 

428 ) 

429 

430 return suggestions 

431 

432 except Exception as e: 

433 self.logger.error("Facet suggestions failed", error=str(e), query=query) 

434 raise 

435 

436 # 🔥 Phase 2.3: Cross-Document Intelligence MCP Interface 

437 

438 async def analyze_document_relationships( 

439 self, 

440 query: str, 

441 limit: int = 20, 

442 source_types: list[str] | None = None, 

443 project_ids: list[str] | None = None 

444 ) -> dict[str, Any]: 

445 """ 

446 🔥 Phase 2.3: Analyze relationships between documents from search results. 

447  

448 Args: 

449 query: Search query to get documents for analysis 

450 limit: Maximum number of documents to analyze 

451 source_types: Optional list of source types to filter by 

452 project_ids: Optional list of project IDs to filter by 

453  

454 Returns: 

455 Comprehensive cross-document relationship analysis 

456 """ 

457 if not self.hybrid_search: 

458 raise RuntimeError("Search engine not initialized") 

459 

460 try: 

461 # Get documents for analysis 

462 documents = await self.hybrid_search.search( 

463 query=query, 

464 limit=limit, 

465 source_types=source_types, 

466 project_ids=project_ids 

467 ) 

468 

469 if len(documents) < 2: 

470 return { 

471 "error": "Need at least 2 documents for relationship analysis", 

472 "document_count": len(documents) 

473 } 

474 

475 # Perform cross-document analysis 

476 analysis = await self.hybrid_search.analyze_document_relationships(documents) 

477 

478 # Add query metadata 

479 analysis["query_metadata"] = { 

480 "original_query": query, 

481 "document_count": len(documents), 

482 "source_types": source_types, 

483 "project_ids": project_ids 

484 } 

485 

486 return analysis 

487 

488 except Exception as e: 

489 self.logger.error("Document relationship analysis failed", error=str(e), query=query) 

490 raise 

491 

492 async def find_similar_documents( 

493 self, 

494 target_query: str, 

495 comparison_query: str, 

496 similarity_metrics: list[str] | None = None, 

497 max_similar: int = 5, 

498 source_types: list[str] | None = None, 

499 project_ids: list[str] | None = None 

500 ) -> list[dict[str, Any]]: 

501 """ 

502 🔥 Phase 2.3: Find documents similar to a target document. 

503  

504 Args: 

505 target_query: Query to find the target document 

506 comparison_query: Query to get documents to compare against 

507 similarity_metrics: Similarity metrics to use 

508 max_similar: Maximum number of similar documents to return 

509 source_types: Optional list of source types to filter by 

510 project_ids: Optional list of project IDs to filter by 

511  

512 Returns: 

513 List of similar documents with similarity scores 

514 """ 

515 if not self.hybrid_search: 

516 raise RuntimeError("Search engine not initialized") 

517 

518 try: 

519 # Get target document (first result from target query) 

520 target_results = await self.hybrid_search.search( 

521 query=target_query, 

522 limit=1, 

523 source_types=source_types, 

524 project_ids=project_ids 

525 ) 

526 

527 if not target_results: 

528 return [] 

529 

530 target_document = target_results[0] 

531 

532 # Get comparison documents 

533 comparison_documents = await self.hybrid_search.search( 

534 query=comparison_query, 

535 limit=20, 

536 source_types=source_types, 

537 project_ids=project_ids 

538 ) 

539 

540 # Convert string metrics to SimilarityMetric enums 

541 metrics = None 

542 if similarity_metrics: 

543 metrics = [SimilarityMetric(metric) for metric in similarity_metrics] 

544 

545 # Find similar documents 

546 similar_docs = await self.hybrid_search.find_similar_documents( 

547 target_document=target_document, 

548 documents=comparison_documents, 

549 similarity_metrics=metrics, 

550 max_similar=max_similar 

551 ) 

552 

553 return similar_docs 

554 

555 except Exception as e: 

556 self.logger.error("Similar documents search failed", error=str(e)) 

557 raise 

558 

559 async def detect_document_conflicts( 

560 self, 

561 query: str, 

562 limit: int = 15, 

563 source_types: list[str] | None = None, 

564 project_ids: list[str] | None = None 

565 ) -> dict[str, Any]: 

566 """ 

567 🔥 Phase 2.3: Detect conflicts between documents. 

568  

569 Args: 

570 query: Search query to get documents for conflict analysis 

571 limit: Maximum number of documents to analyze 

572 source_types: Optional list of source types to filter by 

573 project_ids: Optional list of project IDs to filter by 

574  

575 Returns: 

576 Conflict analysis with detected conflicts and resolution suggestions 

577 """ 

578 if not self.hybrid_search: 

579 raise RuntimeError("Search engine not initialized") 

580 

581 try: 

582 # Get documents for conflict analysis 

583 documents = await self.hybrid_search.search( 

584 query=query, 

585 limit=limit, 

586 source_types=source_types, 

587 project_ids=project_ids 

588 ) 

589 

590 if len(documents) < 2: 

591 return { 

592 "conflicts": [], 

593 "resolution_suggestions": [], 

594 "message": "Need at least 2 documents for conflict detection", 

595 "document_count": len(documents) 

596 } 

597 

598 # Detect conflicts 

599 conflicts = await self.hybrid_search.detect_document_conflicts(documents) 

600 

601 # Add query metadata 

602 conflicts["query_metadata"] = { 

603 "original_query": query, 

604 "document_count": len(documents), 

605 "source_types": source_types, 

606 "project_ids": project_ids 

607 } 

608 

609 return conflicts 

610 

611 except Exception as e: 

612 self.logger.error("Conflict detection failed", error=str(e), query=query) 

613 raise 

614 

615 async def find_complementary_content( 

616 self, 

617 target_query: str, 

618 context_query: str, 

619 max_recommendations: int = 5, 

620 source_types: list[str] | None = None, 

621 project_ids: list[str] | None = None 

622 ) -> list[dict[str, Any]]: 

623 """ 

624 🔥 Phase 2.3: Find content that complements a target document. 

625  

626 Args: 

627 target_query: Query to find the target document 

628 context_query: Query to get contextual documents 

629 max_recommendations: Maximum number of recommendations 

630 source_types: Optional list of source types to filter by 

631 project_ids: Optional list of project IDs to filter by 

632  

633 Returns: 

634 List of complementary documents with recommendation reasons 

635 """ 

636 if not self.hybrid_search: 

637 raise RuntimeError("Search engine not initialized") 

638 

639 try: 

640 self.logger.info(f"🔍 Step 1: Searching for target document with query: '{target_query}'") 

641 # Get target document 

642 target_results = await self.hybrid_search.search( 

643 query=target_query, 

644 limit=1, 

645 source_types=source_types, 

646 project_ids=project_ids 

647 ) 

648 

649 self.logger.info(f"🎯 Target search returned {len(target_results)} results") 

650 if not target_results: 

651 self.logger.warning("No target document found!") 

652 return [] 

653 

654 target_document = target_results[0] 

655 self.logger.info(f"🎯 Target document: {target_document.source_title}") 

656 

657 self.logger.info(f"🔍 Step 2: Searching for context documents with query: '{context_query}'") 

658 # Get context documents 

659 context_documents = await self.hybrid_search.search( 

660 query=context_query, 

661 limit=20, 

662 source_types=source_types, 

663 project_ids=project_ids 

664 ) 

665 

666 self.logger.info(f"📚 Context search returned {len(context_documents)} documents") 

667 

668 self.logger.info(f"🔍 Step 3: Finding complementary content...") 

669 # Find complementary content 

670 complementary = await self.hybrid_search.find_complementary_content( 

671 target_document=target_document, 

672 documents=context_documents, 

673 max_recommendations=max_recommendations 

674 ) 

675 

676 self.logger.info(f"✅ Found {len(complementary)} complementary recommendations") 

677 return complementary 

678 

679 except Exception as e: 

680 self.logger.error("Complementary content search failed", error=str(e)) 

681 raise 

682 

683 async def cluster_documents( 

684 self, 

685 query: str, 

686 strategy: str = "mixed_features", 

687 max_clusters: int = 10, 

688 min_cluster_size: int = 2, 

689 limit: int = 25, 

690 source_types: list[str] | None = None, 

691 project_ids: list[str] | None = None 

692 ) -> dict[str, Any]: 

693 """ 

694 🔥 Phase 2.3: Cluster documents based on similarity and relationships. 

695  

696 Args: 

697 query: Search query to get documents for clustering 

698 strategy: Clustering strategy (mixed_features, entity_based, topic_based, project_based) 

699 max_clusters: Maximum number of clusters to create 

700 min_cluster_size: Minimum size for a cluster 

701 limit: Maximum number of documents to cluster 

702 source_types: Optional list of source types to filter by 

703 project_ids: Optional list of project IDs to filter by 

704  

705 Returns: 

706 Document clusters with metadata and relationships 

707 """ 

708 if not self.hybrid_search: 

709 raise RuntimeError("Search engine not initialized") 

710 

711 try: 

712 # Get documents for clustering 

713 documents = await self.hybrid_search.search( 

714 query=query, 

715 limit=limit, 

716 source_types=source_types, 

717 project_ids=project_ids 

718 ) 

719 

720 if len(documents) < min_cluster_size: 

721 return { 

722 "clusters": [], 

723 "clustering_metadata": { 

724 "message": f"Need at least {min_cluster_size} documents for clustering", 

725 "document_count": len(documents) 

726 } 

727 } 

728 

729 # Convert strategy string to enum 

730 clustering_strategy = ClusteringStrategy(strategy) 

731 

732 # Cluster documents 

733 cluster_results = await self.hybrid_search.cluster_documents( 

734 documents=documents, 

735 strategy=clustering_strategy, 

736 max_clusters=max_clusters, 

737 min_cluster_size=min_cluster_size 

738 ) 

739 

740 # Add query metadata 

741 cluster_results["clustering_metadata"].update({ 

742 "original_query": query, 

743 "source_types": source_types, 

744 "project_ids": project_ids 

745 }) 

746 

747 return cluster_results 

748 

749 except Exception as e: 

750 self.logger.error("Document clustering failed", error=str(e), query=query) 

751 raise