Coverage for src/qdrant_loader_mcp_server/search/hybrid/api.py: 77%
202 statements
coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from ..components.models.hybrid import HybridSearchResult
    from ..enhanced.cdi.models import SimilarityMetric
    from ..enhanced.faceted_search import FacetedSearchResults, FacetFilter
    from ..enhanced.topic_search_chain import ChainStrategy, TopicSearchChain

# Module-level logger with a NullHandler to avoid "No handler" warnings when
# the application's logging configuration does not attach any handlers.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())


class HybridEngineAPI:
    def __init__(
        self,
        *,
        logger: Any | None = None,
        enable_intent_adaptation: bool = True,
        knowledge_graph: Any | None = None,
        min_score: float = 0.0,
        # Optional components (may be wired by a builder in concrete engines)
        vector_search_service: Any | None = None,
        keyword_search_service: Any | None = None,
        query_processor: Any | None = None,
        result_combiner: Any | None = None,
        metadata_extractor: Any | None = None,
        faceted_search_engine: Any | None = None,
        intent_classifier: Any | None = None,
        adaptive_strategy: Any | None = None,
    ) -> None:
        # Defer logger setup to central LoggingConfig if not provided
        if logger is None:
            try:
                from ...utils.logging import (
                    LoggingConfig,  # Lazy import to avoid cycles
                )

                self.logger = LoggingConfig.get_logger(__name__)
            except Exception:
                # Fall back to the module logger so logs are not silently dropped
                self.logger = logging.getLogger(__name__)
        else:
            self.logger = logger

        # Core toggles and context
        self.enable_intent_adaptation = enable_intent_adaptation
        self.knowledge_graph = knowledge_graph
        self.min_score = min_score

        # Optional components used by helper wrappers
        self.vector_search_service = vector_search_service
        self.keyword_search_service = keyword_search_service
        self.query_processor = query_processor
        self.result_combiner = result_combiner
        self.metadata_extractor = metadata_extractor
        self.faceted_search_engine = faceted_search_engine
        self.intent_classifier = intent_classifier
        self.adaptive_strategy = adaptive_strategy
        # Frequently wired later by concrete engines/builders
        self.hybrid_pipeline = None
        self.topic_chain_generator = None
        self.processing_config = None
        self._planner = None
        self._orchestrator = None
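
    # Construction sketch (illustrative, not part of the original module): in
    # practice a concrete engine or builder supplies the optional components; the
    # `my_*` names below are hypothetical stand-ins for whatever services are used.
    #
    #     api = HybridEngineAPI(
    #         vector_search_service=my_vector_service,
    #         keyword_search_service=my_keyword_service,
    #         result_combiner=my_combiner,
    #         min_score=0.2,
    #     )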

    async def search(
        self,
        query: str,
        limit: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
        *,
        session_context: dict[str, Any] | None = None,
        behavioral_context: list[str] | None = None,
    ) -> list[HybridSearchResult]:
        from .orchestration.search import run_search

        self.logger.debug(
            f"Starting hybrid search query={query} limit={limit} source_types={source_types} project_ids={project_ids} intent_adaptation_enabled={self.enable_intent_adaptation}"
        )
        return await run_search(
            self,
            query=query,
            limit=limit,
            source_types=source_types,
            project_ids=project_ids,
            session_context=session_context,
            behavioral_context=behavioral_context,
        )
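
    # Usage sketch (illustrative): `search()` delegates the actual orchestration to
    # `run_search` and assumes the optional components above have been wired, e.g.
    # by a concrete engine builder. The `engine` name below is hypothetical.
    #
    #     results = await engine.search(
    #         "how are release notes organized?",
    #         limit=10,
    #         source_types=["confluence"],
    #         session_context={"user_id": "u-123"},
    #     )
    #     top = results[0] if results else None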

    # Topic Search Chain
    async def generate_topic_search_chain(
        self,
        query: str,
        strategy: ChainStrategy | None = None,
        max_links: int = 5,
        initialize_from_search: bool = True,
    ) -> TopicSearchChain:
        from .orchestration.topic_chain import generate_topic_search_chain as _gen

        if strategy is None:
            from ..enhanced.topic_search_chain import ChainStrategy as _CS

            strategy = _CS.MIXED_EXPLORATION
        return await _gen(
            self,
            query=query,
            strategy=strategy,
            max_links=max_links,
            initialize_from_search=initialize_from_search,
        )

    async def execute_topic_chain_search(
        self,
        topic_chain: TopicSearchChain,
        results_per_link: int = 3,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict[str, list[HybridSearchResult]]:
        from .orchestration.topic_chain import execute_topic_chain_search as _exec

        return await _exec(
            self,
            topic_chain=topic_chain,
            results_per_link=results_per_link,
            source_types=source_types,
            project_ids=project_ids,
        )
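
    # Usage sketch (illustrative): a topic chain is generated first (defaulting to
    # ChainStrategy.MIXED_EXPLORATION when no strategy is given) and then executed,
    # yielding one result list per chain link. The `engine` name is hypothetical.
    #
    #     chain = await engine.generate_topic_search_chain("vector databases", max_links=3)
    #     per_link_results = await engine.execute_topic_chain_search(chain, results_per_link=2)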

    async def _initialize_topic_relationships(self, sample_query: str) -> None:
        from .orchestration.topic_chain import _initialize_topic_relationships as _init

        await _init(self, sample_query)

    # Topic chain initialization state accessors to avoid private attribute access
    @property
    def is_topic_chains_initialized(self) -> bool:
        """Public read-only accessor for the topic chain initialization state."""
        return getattr(self, "_topic_chains_initialized", False)

    def mark_topic_chains_initialized(self) -> None:
        """Mark topic chain relationships as initialized via the public API."""
        self._topic_chains_initialized = True

    def set_topic_chains_initialized(self, initialized: bool) -> None:
        """Explicitly set the topic chain initialization state via the public API."""
        self._topic_chains_initialized = bool(initialized)
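
    # Usage sketch (illustrative): these accessors exist so callers and tests do
    # not touch the private `_topic_chains_initialized` attribute directly. The
    # `engine` name is hypothetical.
    #
    #     if not engine.is_topic_chains_initialized:
    #         engine.mark_topic_chains_initialized()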

    # Faceted Search
    async def search_with_facets(
        self,
        query: str,
        limit: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
        facet_filters: list[FacetFilter] | None = None,
        generate_facets: bool = True,
        session_context: dict[str, Any] | None = None,
        behavioral_context: list[str] | None = None,
    ) -> FacetedSearchResults:
        from .orchestration.facets import search_with_facets as _search_with_facets

        return await _search_with_facets(
            self,
            query=query,
            limit=limit,
            source_types=source_types,
            project_ids=project_ids,
            facet_filters=facet_filters,
            generate_facets=generate_facets,
            session_context=session_context,
            behavioral_context=behavioral_context,
        )
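
    # Usage sketch (illustrative): returns FacetedSearchResults, which is expected
    # to bundle the ranked results with any generated facets; those facets can feed
    # FacetFilter objects for a narrowed follow-up search. The `engine` and
    # `chosen_filter` names are hypothetical.
    #
    #     faceted = await engine.search_with_facets("deployment guide", limit=10)
    #     refined = await engine.search_with_facets(
    #         "deployment guide",
    #         facet_filters=[chosen_filter],
    #     )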

    # Cross-Document Intelligence (CDI)
    async def analyze_document_relationships(
        self, documents: list[HybridSearchResult]
    ) -> dict[str, Any]:
        from .orchestration.cdi import analyze_document_relationships as _analyze

        return await _analyze(self, documents)

    async def find_similar_documents(
        self,
        target_document: HybridSearchResult,
        documents: list[HybridSearchResult],
        similarity_metrics: list[SimilarityMetric] | None = None,
        max_similar: int = 5,
    ) -> list[dict[str, Any]]:
        from .orchestration.cdi import find_similar_documents as _find

        return await _find(
            self,
            target_document=target_document,
            documents=documents,
            similarity_metrics=similarity_metrics,
            max_similar=max_similar,
        )

    async def detect_document_conflicts(
        self, documents: list[HybridSearchResult]
    ) -> dict[str, Any]:
        from .orchestration.cdi import detect_document_conflicts as _detect

        return await _detect(self, documents)

    async def find_complementary_content(
        self,
        target_document: HybridSearchResult,
        documents: list[HybridSearchResult],
        max_recommendations: int = 5,
    ) -> list[dict[str, Any]]:
        from .orchestration.cdi import find_complementary_content as _find_comp

        return await _find_comp(
            self,
            target_document=target_document,
            documents=documents,
            max_recommendations=max_recommendations,
        )
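
    # Usage sketch (illustrative): the CDI wrappers operate on results from a prior
    # search, e.g. to relate a target document to the rest of the result set. The
    # `engine` and `docs` names are hypothetical.
    #
    #     docs = await engine.search("authentication flow", limit=20)
    #     similar = await engine.find_similar_documents(docs[0], docs, max_similar=3)
    #     conflicts = await engine.detect_document_conflicts(docs)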

    # Lookup
    def _build_document_lookup(
        self, documents: list[HybridSearchResult], robust: bool = False
    ) -> dict[str, HybridSearchResult]:
        from .components.document_lookup import build_document_lookup as _build

        return _build(documents, robust=robust, logger=self.logger)

    # Public delegation APIs for clustering helpers
    def build_document_lookup(
        self, documents: list[HybridSearchResult], robust: bool = False
    ) -> dict[str, HybridSearchResult]:
        """Build a document lookup table using the configured helper.

        Args:
            documents: List of search results to index
            robust: Whether to include additional, sanitized keys for resilience

        Returns:
            Mapping from identifier keys to corresponding search results
        """
        return self._build_document_lookup(documents, robust=robust)

    def _find_document_by_id(
        self, doc_id: str, doc_lookup: dict[str, HybridSearchResult]
    ) -> HybridSearchResult | None:
        from .components.document_lookup import find_document_by_id as _find

        return _find(doc_id, doc_lookup, logger=self.logger)

    def find_document_by_id(
        self, doc_id: str, doc_lookup: dict[str, HybridSearchResult]
    ) -> HybridSearchResult | None:
        """Find a document by any supported identifier in the lookup map."""
        return self._find_document_by_id(doc_id, doc_lookup)
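
    # Usage sketch (illustrative): the lookup maps identifier keys to results so
    # the clustering and relationship helpers can resolve document ids cheaply.
    # The `engine`, `docs`, and `some_id` names are hypothetical.
    #
    #     lookup = engine.build_document_lookup(docs, robust=True)
    #     doc = engine.find_document_by_id(some_id, lookup)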

    async def cluster_documents(
        self,
        documents: list[HybridSearchResult],
        strategy: Any | None = None,
        max_clusters: int = 10,
        min_cluster_size: int = 2,
    ) -> dict[str, Any]:
        from .orchestration.clustering import cluster_documents as _cluster

        if strategy is None:
            from ..enhanced.cross_document_intelligence import ClusteringStrategy as _CS

            strategy = _CS.MIXED_FEATURES
        return await _cluster(
            self,
            documents=documents,
            strategy=strategy,
            max_clusters=max_clusters,
            min_cluster_size=min_cluster_size,
        )
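
    # Usage sketch (illustrative): clustering defaults to
    # ClusteringStrategy.MIXED_FEATURES and returns a dict[str, Any] describing the
    # clusters; the exact result keys depend on the orchestration module. The
    # `engine` and `docs` names are hypothetical.
    #
    #     clustering = await engine.cluster_documents(docs, max_clusters=5)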

    # Cluster quality
    def _calculate_cluster_quality(
        self, cluster: Any, cluster_documents: list[HybridSearchResult]
    ) -> dict[str, Any]:
        from .components.cluster_quality import calculate_cluster_quality

        return calculate_cluster_quality(cluster, cluster_documents)

    def calculate_cluster_quality(
        self, cluster: Any, cluster_documents: list[HybridSearchResult]
    ) -> dict[str, Any]:
        """Calculate quality metrics for a cluster via the stable public API."""
        return self._calculate_cluster_quality(cluster, cluster_documents)

    def _categorize_cluster_size(self, size: int) -> str:
        from .components.cluster_quality import categorize_cluster_size

        return categorize_cluster_size(size)

    def _estimate_content_similarity(
        self, documents: list[HybridSearchResult]
    ) -> float:
        from .components.cluster_quality import estimate_content_similarity

        return estimate_content_similarity(documents)

    def _build_enhanced_metadata(
        self,
        clusters: list[Any],
        documents: list[HybridSearchResult],
        strategy: Any,
        processing_time: float,
        matched_docs: int,
        requested_docs: int,
    ) -> dict[str, Any]:
        from .components.cluster_quality import build_enhanced_metadata

        return build_enhanced_metadata(
            clusters, documents, strategy, processing_time, matched_docs, requested_docs
        )

    def build_enhanced_metadata(
        self,
        clusters: list[Any],
        documents: list[HybridSearchResult],
        strategy: Any,
        processing_time: float,
        matched_docs: int,
        requested_docs: int,
    ) -> dict[str, Any]:
        """Build comprehensive clustering metadata via the public API."""
        return self._build_enhanced_metadata(
            clusters,
            documents,
            strategy,
            processing_time,
            matched_docs,
            requested_docs,
        )

    def _calculate_std(self, values: list[float]) -> float:
        from .components.cluster_quality import calculate_std

        return calculate_std(values)

    def _assess_overall_quality(
        self, clusters: list[Any], matched_docs: int, requested_docs: int
    ) -> float:
        from .components.cluster_quality import assess_overall_quality

        return assess_overall_quality(clusters, matched_docs, requested_docs)

    def _generate_clustering_recommendations(
        self, clusters: list[Any], strategy: Any, matched_docs: int, requested_docs: int
    ) -> dict[str, Any]:
        from .components.cluster_quality import generate_clustering_recommendations

        return generate_clustering_recommendations(
            clusters, strategy, matched_docs, requested_docs
        )

    # Relationships
    def _analyze_cluster_relationships(
        self, clusters: list[Any], documents: list[HybridSearchResult]
    ) -> list[dict[str, Any]]:
        from .orchestration.relationships import analyze_cluster_relationships as _rel

        return _rel(self, clusters, documents)

    def analyze_cluster_relationships(
        self, clusters: list[Any], documents: list[HybridSearchResult]
    ) -> list[dict[str, Any]]:
        """Analyze relationships between clusters via the public API."""
        return self._analyze_cluster_relationships(clusters, documents)

    def _analyze_cluster_pair(
        self, cluster_a: Any, cluster_b: Any, doc_lookup: dict
    ) -> dict[str, Any] | None:
        from .orchestration.relationships import analyze_cluster_pair as _pair

        return _pair(self, cluster_a, cluster_b, doc_lookup)

    def _analyze_entity_overlap(
        self, cluster_a: Any, cluster_b: Any
    ) -> dict[str, Any] | None:
        from .components.relationships import analyze_entity_overlap

        return analyze_entity_overlap(cluster_a, cluster_b)

    def _analyze_topic_overlap(
        self, cluster_a: Any, cluster_b: Any
    ) -> dict[str, Any] | None:
        from .components.relationships import analyze_topic_overlap

        return analyze_topic_overlap(cluster_a, cluster_b)

    def _analyze_source_similarity(
        self, docs_a: list, docs_b: list
    ) -> dict[str, Any] | None:
        from .components.relationships import analyze_source_similarity

        return analyze_source_similarity(docs_a, docs_b)

    def _analyze_hierarchy_relationship(
        self, docs_a: list, docs_b: list
    ) -> dict[str, Any] | None:
        from .components.relationships import analyze_hierarchy_relationship

        return analyze_hierarchy_relationship(docs_a, docs_b)

    def _analyze_content_similarity(
        self, docs_a: list, docs_b: list
    ) -> dict[str, Any] | None:
        from .components.relationships import analyze_content_similarity

        return analyze_content_similarity(docs_a, docs_b)

    # Stats and settings
    def get_adaptive_search_stats(self) -> dict[str, Any]:
        stats = {
            "intent_adaptation_enabled": self.enable_intent_adaptation,
            "has_knowledge_graph": self.knowledge_graph is not None,
        }
        if self.enable_intent_adaptation and self.intent_classifier:
            stats.update(self.intent_classifier.get_cache_stats())
        if self.adaptive_strategy:
            stats.update(self.adaptive_strategy.get_strategy_stats())
        return stats
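
    # Usage sketch (illustrative): the stats dict always carries the two base flags
    # and is extended with classifier cache stats and adaptive-strategy stats when
    # those components are configured. The `engine` name is hypothetical.
    #
    #     stats = engine.get_adaptive_search_stats()
    #     assert "intent_adaptation_enabled" in stats and "has_knowledge_graph" in stats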

    def _build_conflict_settings(
        self, search_config: Any | None
    ) -> dict[str, Any] | None:
        from .components.builder import build_conflict_settings

        return build_conflict_settings(search_config)

    # Helper wrappers
    async def _get_embedding(self, text: str) -> list[float]:
        if self.vector_search_service is None:
            raise RuntimeError(
                "Vector search service is not configured. Provide 'vector_search_service' to HybridEngineAPI or wire it via your engine builder before calling _get_embedding()."
            )
        from .components.helpers import get_embedding

        return await get_embedding(self.vector_search_service, text)

    async def _expand_query(self, query: str) -> str:
        if self.query_processor is None:
            raise RuntimeError(
                "Query processor is not configured. Provide 'query_processor' to HybridEngineAPI or wire it via your engine builder before calling _expand_query()."
            )
        from .components.helpers import expand_query

        return await expand_query(self.query_processor, query)

    async def _expand_query_aggressive(self, query: str) -> str:
        if self.query_processor is None:
            raise RuntimeError(
                "Query processor is not configured. Provide 'query_processor' to HybridEngineAPI or wire it via your engine builder before calling _expand_query_aggressive()."
            )
        from .components.helpers import expand_query_aggressive

        return await expand_query_aggressive(self.query_processor, query)

    def _analyze_query(self, query: str) -> dict[str, Any]:
        if self.query_processor is None:
            raise RuntimeError(
                "Query processor is not configured. Provide 'query_processor' to HybridEngineAPI or wire it via your engine builder before calling _analyze_query()."
            )
        from .components.helpers import analyze_query

        return analyze_query(self.query_processor, query)

    async def _vector_search(
        self, query: str, limit: int, project_ids: list[str] | None = None
    ) -> list[dict[str, Any]]:
        if self.vector_search_service is None:
            raise RuntimeError(
                "Vector search service is not configured. Provide 'vector_search_service' to HybridEngineAPI or wire it via your engine builder before calling _vector_search()."
            )
        from .components.helpers import vector_search

        return await vector_search(
            self.vector_search_service, query, limit, project_ids
        )

    async def _keyword_search(
        self, query: str, limit: int, project_ids: list[str] | None = None
    ) -> list[dict[str, Any]]:
        if self.keyword_search_service is None:
            raise RuntimeError(
                "Keyword search service is not configured. Provide 'keyword_search_service' to HybridEngineAPI or wire it via your engine builder before calling _keyword_search()."
            )
        from .components.helpers import keyword_search

        return await keyword_search(
            self.keyword_search_service, query, limit, project_ids
        )

    async def _combine_results(
        self,
        vector_results: list[dict[str, Any]],
        keyword_results: list[dict[str, Any]],
        query_context: dict[str, Any],
        limit: int,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> list[HybridSearchResult]:
        if self.result_combiner is None:
            raise RuntimeError(
                "Result combiner is not configured. Provide 'result_combiner' to HybridEngineAPI or wire it via your engine builder before calling _combine_results()."
            )
        from .components.helpers import combine_results

        return await combine_results(
            self.result_combiner,
            self.min_score,
            vector_results,
            keyword_results,
            query_context,
            limit,
            source_types,
            project_ids,
        )

    def _extract_metadata_info(self, metadata: dict) -> dict:
        if self.metadata_extractor is None:
            raise RuntimeError(
                "Metadata extractor is not configured. Provide 'metadata_extractor' to HybridEngineAPI or wire it via your engine builder before calling _extract_metadata_info()."
            )
        from .components.metadata import extract_metadata_info

        return extract_metadata_info(self.metadata_extractor, metadata)

    def _extract_project_info(self, metadata: dict) -> dict:
        if self.metadata_extractor is None:
            raise RuntimeError(
                "Metadata extractor is not configured. Provide 'metadata_extractor' to HybridEngineAPI or wire it via your engine builder before calling _extract_project_info()."
            )
        from .components.metadata import extract_project_info

        return extract_project_info(self.metadata_extractor, metadata)

    def _build_filter(self, project_ids: list[str] | None = None) -> Any:
        if self.vector_search_service is None:
            raise RuntimeError(
                "Vector search service is not configured. Provide 'vector_search_service' to HybridEngineAPI or wire it via your engine builder before calling _build_filter()."
            )
        from .components.helpers import build_filter

        return build_filter(self.vector_search_service, project_ids)

    def suggest_facet_refinements(
        self,
        current_results: list[HybridSearchResult],
        current_filters: list[FacetFilter],
    ) -> list[dict[str, Any]]:
        if self.faceted_search_engine is None:
            raise RuntimeError(
                "Faceted search engine is not configured. Provide 'faceted_search_engine' to HybridEngineAPI or wire it via your engine builder before calling suggest_facet_refinements()."
            )
        from .components.facets import suggest_refinements as _suggest

        return _suggest(self.faceted_search_engine, current_results, current_filters)

    def generate_facets(self, results: list[HybridSearchResult]) -> list:
        if self.faceted_search_engine is None:
            raise RuntimeError(
                "Faceted search engine is not configured. Provide 'faceted_search_engine' to HybridEngineAPI or wire it via your engine builder before calling generate_facets()."
            )
        from .components.facets import generate_facets as _generate

        return _generate(self.faceted_search_engine, results)
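
    # Usage sketch (illustrative): like the other guarded wrappers above, the facet
    # helpers raise RuntimeError unless a faceted_search_engine is configured. The
    # `engine` and `docs` names are hypothetical.
    #
    #     facets = engine.generate_facets(docs)
    #     suggestions = engine.suggest_facet_refinements(docs, current_filters=[])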