Coverage for src/qdrant_loader_mcp_server/search/engine.py: 55%
197 statements
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 11:38 +0000
1"""Search engine service for the MCP server."""
3from typing import Any
5from openai import AsyncOpenAI
6from qdrant_client import AsyncQdrantClient
7from qdrant_client.http import models
9from ..config import OpenAIConfig, QdrantConfig
10from ..utils.logging import LoggingConfig
11from .hybrid_search import HybridSearchEngine
12from .models import SearchResult
13# 🔥 NEW: Import Phase 1.2 topic chaining components
14from .enhanced.topic_search_chain import TopicSearchChain, ChainStrategy
15# 🔥 NEW: Import Phase 2.3 cross-document intelligence components
16from .enhanced.cross_document_intelligence import SimilarityMetric, ClusteringStrategy
18logger = LoggingConfig.get_logger(__name__)
class SearchEngine:
    """Main search engine that orchestrates query processing and search."""

    def __init__(self):
        """Create an uninitialized engine; call ``initialize`` before use."""
        # All collaborators are created lazily in initialize(); only the
        # logger is available immediately.
        self.logger = LoggingConfig.get_logger(__name__)
        self.client: AsyncQdrantClient | None = None
        self.config: QdrantConfig | None = None
        self.openai_client: AsyncOpenAI | None = None
        self.hybrid_search: HybridSearchEngine | None = None
32 async def initialize(
33 self, config: QdrantConfig, openai_config: OpenAIConfig
34 ) -> None:
35 """Initialize the search engine with configuration."""
36 self.config = config
37 try:
38 self.client = AsyncQdrantClient(url=config.url, api_key=config.api_key)
39 self.openai_client = AsyncOpenAI(api_key=openai_config.api_key)
41 # Ensure collection exists
42 if self.client is None:
43 raise RuntimeError("Failed to initialize Qdrant client")
45 collections = await self.client.get_collections()
46 if not any(c.name == config.collection_name for c in collections.collections):
47 await self.client.create_collection(
48 collection_name=config.collection_name,
49 vectors_config=models.VectorParams(
50 size=1536, # Default size for OpenAI embeddings
51 distance=models.Distance.COSINE,
52 ),
53 )
55 # Initialize hybrid search
56 if self.client and self.openai_client:
57 self.hybrid_search = HybridSearchEngine(
58 qdrant_client=self.client,
59 openai_client=self.openai_client,
60 collection_name=config.collection_name,
61 )
63 self.logger.info("Successfully connected to Qdrant", url=config.url)
64 except Exception as e:
65 self.logger.error(
66 "Failed to connect to Qdrant server",
67 error=str(e),
68 url=config.url,
69 hint="Make sure Qdrant is running and accessible at the configured URL",
70 )
71 raise RuntimeError(
72 f"Failed to connect to Qdrant server at {config.url}. "
73 "Please ensure Qdrant is running and accessible."
74 ) from None # Suppress the original exception
76 async def cleanup(self) -> None:
77 """Cleanup resources."""
78 if self.client:
79 await self.client.close()
80 self.client = None
82 async def search(
83 self,
84 query: str,
85 source_types: list[str] | None = None,
86 limit: int = 5,
87 project_ids: list[str] | None = None,
88 ) -> list[SearchResult]:
89 """Search for documents using hybrid search.
91 Args:
92 query: Search query text
93 source_types: Optional list of source types to filter by
94 limit: Maximum number of results to return
95 project_ids: Optional list of project IDs to filter by
96 """
97 if not self.hybrid_search:
98 raise RuntimeError("Search engine not initialized")
100 self.logger.debug(
101 "Performing search",
102 query=query,
103 source_types=source_types,
104 limit=limit,
105 project_ids=project_ids,
106 )
108 try:
109 results = await self.hybrid_search.search(
110 query=query,
111 source_types=source_types,
112 limit=limit,
113 project_ids=project_ids,
114 )
116 self.logger.info(
117 "Search completed",
118 query=query,
119 result_count=len(results),
120 project_ids=project_ids,
121 )
123 return results
124 except Exception as e:
125 self.logger.error("Search failed", error=str(e), query=query)
126 raise
128 async def generate_topic_chain(
129 self,
130 query: str,
131 strategy: str = "mixed_exploration",
132 max_links: int = 5
133 ) -> TopicSearchChain:
134 """🔥 NEW: Generate a topic-driven search chain for progressive discovery.
136 Args:
137 query: Original search query
138 strategy: Chain generation strategy (breadth_first, depth_first, relevance_ranked, mixed_exploration)
139 max_links: Maximum number of chain links to generate
141 Returns:
142 TopicSearchChain with progressive exploration queries
143 """
144 if not self.hybrid_search:
145 raise RuntimeError("Search engine not initialized")
147 # Convert string strategy to enum
148 try:
149 chain_strategy = ChainStrategy(strategy)
150 except ValueError:
151 self.logger.warning(f"Unknown strategy '{strategy}', using mixed_exploration")
152 chain_strategy = ChainStrategy.MIXED_EXPLORATION
154 self.logger.debug(
155 "Generating topic search chain",
156 query=query,
157 strategy=strategy,
158 max_links=max_links
159 )
161 try:
162 topic_chain = await self.hybrid_search.generate_topic_search_chain(
163 query=query,
164 strategy=chain_strategy,
165 max_links=max_links
166 )
168 self.logger.info(
169 "Topic chain generation completed",
170 query=query,
171 chain_length=len(topic_chain.chain_links),
172 topics_covered=topic_chain.total_topics_covered,
173 discovery_potential=f"{topic_chain.estimated_discovery_potential:.2f}"
174 )
176 return topic_chain
177 except Exception as e:
178 self.logger.error("Topic chain generation failed", error=str(e), query=query)
179 raise
181 async def execute_topic_chain(
182 self,
183 topic_chain: TopicSearchChain,
184 results_per_link: int = 3,
185 source_types: list[str] | None = None,
186 project_ids: list[str] | None = None
187 ) -> dict[str, list[SearchResult]]:
188 """🔥 NEW: Execute searches for all links in a topic chain.
190 Args:
191 topic_chain: The topic search chain to execute
192 results_per_link: Number of results per chain link
193 source_types: Optional source type filters
194 project_ids: Optional project ID filters
196 Returns:
197 Dictionary mapping queries to search results
198 """
199 if not self.hybrid_search:
200 raise RuntimeError("Search engine not initialized")
202 self.logger.debug(
203 "Executing topic chain search",
204 original_query=topic_chain.original_query,
205 chain_length=len(topic_chain.chain_links),
206 results_per_link=results_per_link
207 )
209 try:
210 chain_results = await self.hybrid_search.execute_topic_chain_search(
211 topic_chain=topic_chain,
212 results_per_link=results_per_link,
213 source_types=source_types,
214 project_ids=project_ids
215 )
217 total_results = sum(len(results) for results in chain_results.values())
218 self.logger.info(
219 "Topic chain execution completed",
220 original_query=topic_chain.original_query,
221 total_queries=len(chain_results),
222 total_results=total_results
223 )
225 return chain_results
226 except Exception as e:
227 self.logger.error("Topic chain execution failed", error=str(e))
228 raise
230 async def search_with_topic_chain(
231 self,
232 query: str,
233 strategy: str = "mixed_exploration",
234 max_links: int = 5,
235 results_per_link: int = 3,
236 source_types: list[str] | None = None,
237 project_ids: list[str] | None = None
238 ) -> dict[str, list[SearchResult]]:
239 """🔥 NEW: Combined method to generate and execute a topic search chain.
241 Args:
242 query: Original search query
243 strategy: Chain generation strategy
244 max_links: Maximum chain links
245 results_per_link: Results per link
246 source_types: Optional source filters
247 project_ids: Optional project filters
249 Returns:
250 Dictionary mapping chain queries to their results
251 """
252 self.logger.debug(
253 "Starting topic chain search workflow",
254 query=query,
255 strategy=strategy,
256 max_links=max_links,
257 results_per_link=results_per_link
258 )
260 try:
261 # Generate topic chain
262 topic_chain = await self.generate_topic_chain(
263 query=query,
264 strategy=strategy,
265 max_links=max_links
266 )
268 # Execute the chain
269 chain_results = await self.execute_topic_chain(
270 topic_chain=topic_chain,
271 results_per_link=results_per_link,
272 source_types=source_types,
273 project_ids=project_ids
274 )
276 self.logger.info(
277 "Topic chain search workflow completed",
278 query=query,
279 total_queries=len(chain_results),
280 total_results=sum(len(results) for results in chain_results.values()),
281 discovery_potential=f"{topic_chain.estimated_discovery_potential:.2f}"
282 )
284 return chain_results
285 except Exception as e:
286 self.logger.error("Topic chain search workflow failed", error=str(e), query=query)
287 raise
289 # ============================================================================
290 # 🔥 Phase 1.3: Dynamic Faceted Search Interface Methods
291 # ============================================================================
293 async def search_with_facets(
294 self,
295 query: str,
296 limit: int = 5,
297 source_types: list[str] | None = None,
298 project_ids: list[str] | None = None,
299 facet_filters: list[dict] | None = None,
300 ) -> dict:
301 """
302 🔥 Phase 1.3: Perform faceted search with dynamic facet generation.
304 Returns search results with generated facets for interactive filtering.
306 Args:
307 query: Search query
308 limit: Maximum number of results to return
309 source_types: Optional list of source types to filter by
310 project_ids: Optional list of project IDs to filter by
311 facet_filters: Optional list of facet filters to apply
313 Returns:
314 Dictionary containing:
315 - results: List of search results
316 - facets: List of generated facets with counts
317 - total_results: Total results before facet filtering
318 - filtered_count: Results after facet filtering
319 - applied_filters: Currently applied facet filters
320 """
321 if not self.hybrid_search:
322 raise RuntimeError("Search engine not initialized")
324 try:
325 # Convert facet filter dictionaries to FacetFilter objects if provided
326 filter_objects = []
327 if facet_filters:
328 from .enhanced.faceted_search import FacetFilter, FacetType
329 for filter_dict in facet_filters:
330 facet_type = FacetType(filter_dict["facet_type"])
331 filter_objects.append(FacetFilter(
332 facet_type=facet_type,
333 values=filter_dict["values"],
334 operator=filter_dict.get("operator", "OR")
335 ))
337 faceted_results = await self.hybrid_search.search_with_facets(
338 query=query,
339 limit=limit,
340 source_types=source_types,
341 project_ids=project_ids,
342 facet_filters=filter_objects,
343 generate_facets=True
344 )
346 # Convert to MCP-friendly format
347 return {
348 "results": faceted_results.results,
349 "facets": [
350 {
351 "type": facet.facet_type.value,
352 "name": facet.name,
353 "display_name": facet.display_name,
354 "description": facet.description,
355 "values": [
356 {
357 "value": fv.value,
358 "count": fv.count,
359 "display_name": fv.display_name,
360 "description": fv.description
361 }
362 for fv in facet.get_top_values(10)
363 ]
364 }
365 for facet in faceted_results.facets
366 ],
367 "total_results": faceted_results.total_results,
368 "filtered_count": faceted_results.filtered_count,
369 "applied_filters": [
370 {
371 "facet_type": f.facet_type.value,
372 "values": f.values,
373 "operator": f.operator
374 }
375 for f in faceted_results.applied_filters
376 ],
377 "generation_time_ms": faceted_results.generation_time_ms
378 }
380 except Exception as e:
381 self.logger.error("Faceted search failed", error=str(e), query=query)
382 raise
384 async def get_facet_suggestions(
385 self,
386 query: str,
387 current_filters: list[dict] | None = None,
388 limit: int = 20
389 ) -> list[dict]:
390 """
391 🔥 Phase 1.3: Get facet refinement suggestions based on current search.
393 Args:
394 query: Current search query
395 current_filters: Currently applied facet filters
396 limit: Number of results to analyze for suggestions
398 Returns:
399 List of facet refinement suggestions with impact estimates
400 """
401 if not self.hybrid_search:
402 raise RuntimeError("Search engine not initialized")
404 try:
405 # First get current search results
406 current_results = await self.hybrid_search.search(
407 query=query,
408 limit=limit,
409 source_types=None,
410 project_ids=None
411 )
413 # Convert filter dictionaries to FacetFilter objects
414 filter_objects = []
415 if current_filters:
416 from .enhanced.faceted_search import FacetFilter, FacetType
417 for filter_dict in current_filters:
418 facet_type = FacetType(filter_dict["facet_type"])
419 filter_objects.append(FacetFilter(
420 facet_type=facet_type,
421 values=filter_dict["values"],
422 operator=filter_dict.get("operator", "OR")
423 ))
425 suggestions = self.hybrid_search.suggest_facet_refinements(
426 current_results=current_results,
427 current_filters=filter_objects
428 )
430 return suggestions
432 except Exception as e:
433 self.logger.error("Facet suggestions failed", error=str(e), query=query)
434 raise
436 # 🔥 Phase 2.3: Cross-Document Intelligence MCP Interface
438 async def analyze_document_relationships(
439 self,
440 query: str,
441 limit: int = 20,
442 source_types: list[str] | None = None,
443 project_ids: list[str] | None = None
444 ) -> dict[str, Any]:
445 """
446 🔥 Phase 2.3: Analyze relationships between documents from search results.
448 Args:
449 query: Search query to get documents for analysis
450 limit: Maximum number of documents to analyze
451 source_types: Optional list of source types to filter by
452 project_ids: Optional list of project IDs to filter by
454 Returns:
455 Comprehensive cross-document relationship analysis
456 """
457 if not self.hybrid_search:
458 raise RuntimeError("Search engine not initialized")
460 try:
461 # Get documents for analysis
462 documents = await self.hybrid_search.search(
463 query=query,
464 limit=limit,
465 source_types=source_types,
466 project_ids=project_ids
467 )
469 if len(documents) < 2:
470 return {
471 "error": "Need at least 2 documents for relationship analysis",
472 "document_count": len(documents)
473 }
475 # Perform cross-document analysis
476 analysis = await self.hybrid_search.analyze_document_relationships(documents)
478 # Add query metadata
479 analysis["query_metadata"] = {
480 "original_query": query,
481 "document_count": len(documents),
482 "source_types": source_types,
483 "project_ids": project_ids
484 }
486 return analysis
488 except Exception as e:
489 self.logger.error("Document relationship analysis failed", error=str(e), query=query)
490 raise
492 async def find_similar_documents(
493 self,
494 target_query: str,
495 comparison_query: str,
496 similarity_metrics: list[str] | None = None,
497 max_similar: int = 5,
498 source_types: list[str] | None = None,
499 project_ids: list[str] | None = None
500 ) -> list[dict[str, Any]]:
501 """
502 🔥 Phase 2.3: Find documents similar to a target document.
504 Args:
505 target_query: Query to find the target document
506 comparison_query: Query to get documents to compare against
507 similarity_metrics: Similarity metrics to use
508 max_similar: Maximum number of similar documents to return
509 source_types: Optional list of source types to filter by
510 project_ids: Optional list of project IDs to filter by
512 Returns:
513 List of similar documents with similarity scores
514 """
515 if not self.hybrid_search:
516 raise RuntimeError("Search engine not initialized")
518 try:
519 # Get target document (first result from target query)
520 target_results = await self.hybrid_search.search(
521 query=target_query,
522 limit=1,
523 source_types=source_types,
524 project_ids=project_ids
525 )
527 if not target_results:
528 return []
530 target_document = target_results[0]
532 # Get comparison documents
533 comparison_documents = await self.hybrid_search.search(
534 query=comparison_query,
535 limit=20,
536 source_types=source_types,
537 project_ids=project_ids
538 )
540 # Convert string metrics to SimilarityMetric enums
541 metrics = None
542 if similarity_metrics:
543 metrics = [SimilarityMetric(metric) for metric in similarity_metrics]
545 # Find similar documents
546 similar_docs = await self.hybrid_search.find_similar_documents(
547 target_document=target_document,
548 documents=comparison_documents,
549 similarity_metrics=metrics,
550 max_similar=max_similar
551 )
553 return similar_docs
555 except Exception as e:
556 self.logger.error("Similar documents search failed", error=str(e))
557 raise
559 async def detect_document_conflicts(
560 self,
561 query: str,
562 limit: int = 15,
563 source_types: list[str] | None = None,
564 project_ids: list[str] | None = None
565 ) -> dict[str, Any]:
566 """
567 🔥 Phase 2.3: Detect conflicts between documents.
569 Args:
570 query: Search query to get documents for conflict analysis
571 limit: Maximum number of documents to analyze
572 source_types: Optional list of source types to filter by
573 project_ids: Optional list of project IDs to filter by
575 Returns:
576 Conflict analysis with detected conflicts and resolution suggestions
577 """
578 if not self.hybrid_search:
579 raise RuntimeError("Search engine not initialized")
581 try:
582 # Get documents for conflict analysis
583 documents = await self.hybrid_search.search(
584 query=query,
585 limit=limit,
586 source_types=source_types,
587 project_ids=project_ids
588 )
590 if len(documents) < 2:
591 return {
592 "conflicts": [],
593 "resolution_suggestions": [],
594 "message": "Need at least 2 documents for conflict detection",
595 "document_count": len(documents)
596 }
598 # Detect conflicts
599 conflicts = await self.hybrid_search.detect_document_conflicts(documents)
601 # Add query metadata
602 conflicts["query_metadata"] = {
603 "original_query": query,
604 "document_count": len(documents),
605 "source_types": source_types,
606 "project_ids": project_ids
607 }
609 return conflicts
611 except Exception as e:
612 self.logger.error("Conflict detection failed", error=str(e), query=query)
613 raise
615 async def find_complementary_content(
616 self,
617 target_query: str,
618 context_query: str,
619 max_recommendations: int = 5,
620 source_types: list[str] | None = None,
621 project_ids: list[str] | None = None
622 ) -> list[dict[str, Any]]:
623 """
624 🔥 Phase 2.3: Find content that complements a target document.
626 Args:
627 target_query: Query to find the target document
628 context_query: Query to get contextual documents
629 max_recommendations: Maximum number of recommendations
630 source_types: Optional list of source types to filter by
631 project_ids: Optional list of project IDs to filter by
633 Returns:
634 List of complementary documents with recommendation reasons
635 """
636 if not self.hybrid_search:
637 raise RuntimeError("Search engine not initialized")
639 try:
640 self.logger.info(f"🔍 Step 1: Searching for target document with query: '{target_query}'")
641 # Get target document
642 target_results = await self.hybrid_search.search(
643 query=target_query,
644 limit=1,
645 source_types=source_types,
646 project_ids=project_ids
647 )
649 self.logger.info(f"🎯 Target search returned {len(target_results)} results")
650 if not target_results:
651 self.logger.warning("No target document found!")
652 return []
654 target_document = target_results[0]
655 self.logger.info(f"🎯 Target document: {target_document.source_title}")
657 self.logger.info(f"🔍 Step 2: Searching for context documents with query: '{context_query}'")
658 # Get context documents
659 context_documents = await self.hybrid_search.search(
660 query=context_query,
661 limit=20,
662 source_types=source_types,
663 project_ids=project_ids
664 )
666 self.logger.info(f"📚 Context search returned {len(context_documents)} documents")
668 self.logger.info(f"🔍 Step 3: Finding complementary content...")
669 # Find complementary content
670 complementary = await self.hybrid_search.find_complementary_content(
671 target_document=target_document,
672 documents=context_documents,
673 max_recommendations=max_recommendations
674 )
676 self.logger.info(f"✅ Found {len(complementary)} complementary recommendations")
677 return complementary
679 except Exception as e:
680 self.logger.error("Complementary content search failed", error=str(e))
681 raise
683 async def cluster_documents(
684 self,
685 query: str,
686 strategy: str = "mixed_features",
687 max_clusters: int = 10,
688 min_cluster_size: int = 2,
689 limit: int = 25,
690 source_types: list[str] | None = None,
691 project_ids: list[str] | None = None
692 ) -> dict[str, Any]:
693 """
694 🔥 Phase 2.3: Cluster documents based on similarity and relationships.
696 Args:
697 query: Search query to get documents for clustering
698 strategy: Clustering strategy (mixed_features, entity_based, topic_based, project_based)
699 max_clusters: Maximum number of clusters to create
700 min_cluster_size: Minimum size for a cluster
701 limit: Maximum number of documents to cluster
702 source_types: Optional list of source types to filter by
703 project_ids: Optional list of project IDs to filter by
705 Returns:
706 Document clusters with metadata and relationships
707 """
708 if not self.hybrid_search:
709 raise RuntimeError("Search engine not initialized")
711 try:
712 # Get documents for clustering
713 documents = await self.hybrid_search.search(
714 query=query,
715 limit=limit,
716 source_types=source_types,
717 project_ids=project_ids
718 )
720 if len(documents) < min_cluster_size:
721 return {
722 "clusters": [],
723 "clustering_metadata": {
724 "message": f"Need at least {min_cluster_size} documents for clustering",
725 "document_count": len(documents)
726 }
727 }
729 # Convert strategy string to enum
730 clustering_strategy = ClusteringStrategy(strategy)
732 # Cluster documents
733 cluster_results = await self.hybrid_search.cluster_documents(
734 documents=documents,
735 strategy=clustering_strategy,
736 max_clusters=max_clusters,
737 min_cluster_size=min_cluster_size
738 )
740 # Add query metadata
741 cluster_results["clustering_metadata"].update({
742 "original_query": query,
743 "source_types": source_types,
744 "project_ids": project_ids
745 })
747 return cluster_results
749 except Exception as e:
750 self.logger.error("Document clustering failed", error=str(e), query=query)
751 raise