Coverage for src / qdrant_loader_mcp_server / search / hybrid / api.py: 77%
202 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:51 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:51 +0000
1from __future__ import annotations
3import logging
4from typing import TYPE_CHECKING, Any
6if TYPE_CHECKING:
7 from ..components.models.hybrid import HybridSearchResult
8 from ..enhanced.cdi.models import SimilarityMetric
9 from ..enhanced.faceted_search import FacetedSearchResults, FacetFilter
10 from ..enhanced.topic_search_chain import ChainStrategy, TopicSearchChain
# Module-level logger with a NullHandler attached so importing this module
# never emits "No handler found" warnings when the host application's logging
# configuration has not attached any handlers yet.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
18class HybridEngineAPI:
    def __init__(
        self,
        *,
        logger: Any | None = None,
        enable_intent_adaptation: bool = True,
        knowledge_graph: Any | None = None,
        min_score: float = 0.0,
        # Optional components (may be wired by a builder in concrete engines)
        vector_search_service: Any | None = None,
        keyword_search_service: Any | None = None,
        query_processor: Any | None = None,
        result_combiner: Any | None = None,
        metadata_extractor: Any | None = None,
        faceted_search_engine: Any | None = None,
        intent_classifier: Any | None = None,
        adaptive_strategy: Any | None = None,
    ) -> None:
        """Initialize the hybrid engine facade.

        All arguments are keyword-only. Components left as ``None`` may be
        wired later by a builder/concrete engine; the helper wrappers on this
        class raise ``RuntimeError`` if a required component is still missing
        at call time.

        Args:
            logger: Pre-configured logger. When ``None``, the central
                ``LoggingConfig`` logger is used, falling back to the plain
                module logger if that import fails.
            enable_intent_adaptation: Toggle for intent-adaptive search.
            knowledge_graph: Optional knowledge graph instance.
            min_score: Minimum score threshold applied when combining results.
            vector_search_service: Service used for vector/embedding search.
            keyword_search_service: Service used for keyword search.
            query_processor: Component for query analysis/expansion.
            result_combiner: Component that merges vector and keyword results.
            metadata_extractor: Component that parses result metadata.
            faceted_search_engine: Engine for facet generation/refinement.
            intent_classifier: Classifier for query intent.
            adaptive_strategy: Strategy object for adaptive search.
        """
        # Defer logger setup to central LoggingConfig if not provided
        if logger is None:
            try:
                from ...utils.logging import (
                    LoggingConfig,  # Lazy import to avoid cycles
                )

                self.logger = LoggingConfig.get_logger(__name__)
            except Exception:
                # Fallback to module logger so logs are not silently dropped
                self.logger = logging.getLogger(__name__)
        else:
            self.logger = logger

        # Core toggles and context
        self.enable_intent_adaptation = enable_intent_adaptation
        self.knowledge_graph = knowledge_graph
        self.min_score = min_score

        # Optional components used by helper wrappers
        self.vector_search_service = vector_search_service
        self.keyword_search_service = keyword_search_service
        self.query_processor = query_processor
        self.result_combiner = result_combiner
        self.metadata_extractor = metadata_extractor
        self.faceted_search_engine = faceted_search_engine
        self.intent_classifier = intent_classifier
        self.adaptive_strategy = adaptive_strategy
        # Frequently wired later by concrete engines/builders
        self.hybrid_pipeline = None
        self.topic_chain_generator = None
        self.processing_config = None
        self._planner = None
        self._orchestrator = None
71 async def search(
72 self,
73 query: str,
74 limit: int = 5,
75 source_types: list[str] | None = None,
76 project_ids: list[str] | None = None,
77 *,
78 session_context: dict[str, Any] | None = None,
79 behavioral_context: list[str] | None = None,
80 ) -> list[HybridSearchResult]:
81 from .orchestration.search import run_search
83 self.logger.debug(
84 f"Starting hybrid search query={query} limit={limit} source_types={source_types} project_ids={project_ids} intent_adaptation_enabled={self.enable_intent_adaptation}"
85 )
86 return await run_search(
87 self,
88 query=query,
89 limit=limit,
90 source_types=source_types,
91 project_ids=project_ids,
92 session_context=session_context,
93 behavioral_context=behavioral_context,
94 )
96 # Topic Search Chain
97 async def generate_topic_search_chain(
98 self,
99 query: str,
100 strategy: ChainStrategy | None = None,
101 max_links: int = 5,
102 initialize_from_search: bool = True,
103 ) -> TopicSearchChain:
104 from .orchestration.topic_chain import generate_topic_search_chain as _gen
106 if strategy is None:
107 from ..enhanced.topic_search_chain import ChainStrategy as _CS
109 strategy = _CS.MIXED_EXPLORATION
110 return await _gen(
111 self,
112 query=query,
113 strategy=strategy,
114 max_links=max_links,
115 initialize_from_search=initialize_from_search,
116 )
118 async def execute_topic_chain_search(
119 self,
120 topic_chain: TopicSearchChain,
121 results_per_link: int = 3,
122 source_types: list[str] | None = None,
123 project_ids: list[str] | None = None,
124 ) -> dict[str, list[HybridSearchResult]]:
125 from .orchestration.topic_chain import execute_topic_chain_search as _exec
127 return await _exec(
128 self,
129 topic_chain=topic_chain,
130 results_per_link=results_per_link,
131 source_types=source_types,
132 project_ids=project_ids,
133 )
135 async def _initialize_topic_relationships(self, sample_query: str) -> None:
136 from .orchestration.topic_chain import _initialize_topic_relationships as _init
138 await _init(self, sample_query)
140 # Topic chain initialization state accessor to avoid private attribute access
141 @property
142 def is_topic_chains_initialized(self) -> bool:
143 """Public read-only accessor for topic chains initialization state."""
144 return getattr(self, "_topic_chains_initialized", False)
146 def mark_topic_chains_initialized(self) -> None:
147 """Mark topic chain relationships as initialized via public API."""
148 self._topic_chains_initialized = True
150 def set_topic_chains_initialized(self, initialized: bool) -> None:
151 """Explicitly set topic chain initialization state via public API."""
152 self._topic_chains_initialized = bool(initialized)
154 # Faceted Search
155 async def search_with_facets(
156 self,
157 query: str,
158 limit: int = 5,
159 source_types: list[str] | None = None,
160 project_ids: list[str] | None = None,
161 facet_filters: list[FacetFilter] | None = None,
162 generate_facets: bool = True,
163 session_context: dict[str, Any] | None = None,
164 behavioral_context: list[str] | None = None,
165 ) -> FacetedSearchResults:
166 from .orchestration.facets import search_with_facets as _search_with_facets
168 return await _search_with_facets(
169 self,
170 query=query,
171 limit=limit,
172 source_types=source_types,
173 project_ids=project_ids,
174 facet_filters=facet_filters,
175 generate_facets=generate_facets,
176 session_context=session_context,
177 behavioral_context=behavioral_context,
178 )
180 # CDI
181 async def analyze_document_relationships(
182 self, documents: list[HybridSearchResult]
183 ) -> dict[str, Any]:
184 from .orchestration.cdi import analyze_document_relationships as _analyze
186 return await _analyze(self, documents)
188 async def find_similar_documents(
189 self,
190 target_document: HybridSearchResult,
191 documents: list[HybridSearchResult],
192 similarity_metrics: list[SimilarityMetric] | None = None,
193 max_similar: int = 5,
194 similarity_threshold: float = 0.7,
195 ) -> list[dict[str, Any]]:
196 """
197 Identify documents in a collection that are most similar to a target document.
199 Parameters:
200 target_document (HybridSearchResult): The document to compare others against.
201 documents (list[HybridSearchResult]): Candidate documents to evaluate for similarity.
202 similarity_metrics (list[SimilarityMetric] | None): Metrics to use when computing similarity; if omitted, defaults are applied.
203 max_similar (int): Maximum number of similar documents to return.
204 similarity_threshold (float): Minimum similarity score (0.0–1.0) required for a document to be included.
206 Returns:
207 list[dict[str, Any]]: A list of similarity records for matching documents (up to `max_similar`), each containing at least the document reference and its similarity score.
208 """
209 from .orchestration.cdi import find_similar_documents as _find
211 return await _find(
212 self,
213 target_document=target_document,
214 documents=documents,
215 similarity_metrics=similarity_metrics,
216 max_similar=max_similar,
217 similarity_threshold=similarity_threshold,
218 )
220 async def detect_document_conflicts(
221 self, documents: list[HybridSearchResult]
222 ) -> dict[str, Any]:
223 """
224 Detect conflicts among the provided documents.
226 Parameters:
227 documents (list[HybridSearchResult]): Documents to analyze for conflicting content or metadata.
229 Returns:
230 dict[str, Any]: Analysis results mapping conflict categories or identifiers to details such as affected document IDs, conflicting fields, and confidence scores.
231 """
232 from .orchestration.cdi import detect_document_conflicts as _detect
234 return await _detect(self, documents)
236 async def find_complementary_content(
237 self,
238 target_document: HybridSearchResult,
239 documents: list[HybridSearchResult],
240 max_recommendations: int = 5,
241 ) -> list[dict[str, Any]]:
242 from .orchestration.cdi import find_complementary_content as _find_comp
244 return await _find_comp(
245 self,
246 target_document=target_document,
247 documents=documents,
248 max_recommendations=max_recommendations,
249 )
251 # Lookup
252 def _build_document_lookup(
253 self, documents: list[HybridSearchResult], robust: bool = False
254 ) -> dict[str, HybridSearchResult]:
255 from .components.document_lookup import build_document_lookup as _build
257 return _build(documents, robust=robust, logger=self.logger)
259 # Public delegation APIs for clustering helpers
260 def build_document_lookup(
261 self, documents: list[HybridSearchResult], robust: bool = False
262 ) -> dict[str, HybridSearchResult]:
263 """Build a document lookup table using the configured helper.
265 Args:
266 documents: List of search results to index
267 robust: Whether to include additional, sanitized keys for resilience
269 Returns:
270 Mapping from identifier keys to corresponding search results
271 """
272 return self._build_document_lookup(documents, robust=robust)
274 def _find_document_by_id(
275 self, doc_id: str, doc_lookup: dict[str, HybridSearchResult]
276 ) -> HybridSearchResult | None:
277 from .components.document_lookup import find_document_by_id as _find
279 return _find(doc_id, doc_lookup, logger=self.logger)
281 def find_document_by_id(
282 self, doc_id: str, doc_lookup: dict[str, HybridSearchResult]
283 ) -> HybridSearchResult | None:
284 """Find a document by any supported identifier in the lookup map."""
285 return self._find_document_by_id(doc_id, doc_lookup)
287 async def cluster_documents(
288 self,
289 documents: list[HybridSearchResult],
290 strategy: Any | None = None,
291 max_clusters: int = 10,
292 min_cluster_size: int = 2,
293 ) -> dict[str, Any]:
294 from .orchestration.clustering import cluster_documents as _cluster
296 if strategy is None:
297 from ..enhanced.cross_document_intelligence import ClusteringStrategy as _CS
299 strategy = _CS.MIXED_FEATURES
300 return await _cluster(
301 self,
302 documents=documents,
303 strategy=strategy,
304 max_clusters=max_clusters,
305 min_cluster_size=min_cluster_size,
306 )
308 # Cluster quality
309 def _calculate_cluster_quality(
310 self, cluster: Any, cluster_documents: list[HybridSearchResult]
311 ) -> dict[str, Any]:
312 from .components.cluster_quality import calculate_cluster_quality
314 return calculate_cluster_quality(cluster, cluster_documents)
316 def calculate_cluster_quality(
317 self, cluster: Any, cluster_documents: list[HybridSearchResult]
318 ) -> dict[str, Any]:
319 """Calculate quality metrics for a cluster in a stable API."""
320 return self._calculate_cluster_quality(cluster, cluster_documents)
322 def _categorize_cluster_size(self, size: int) -> str:
323 from .components.cluster_quality import categorize_cluster_size
325 return categorize_cluster_size(size)
327 def _estimate_content_similarity(
328 self, documents: list[HybridSearchResult]
329 ) -> float:
330 from .components.cluster_quality import estimate_content_similarity
332 return estimate_content_similarity(documents)
334 def _build_enhanced_metadata(
335 self,
336 clusters: list[Any],
337 documents: list[HybridSearchResult],
338 strategy: Any,
339 processing_time: float,
340 matched_docs: int,
341 requested_docs: int,
342 ) -> dict[str, Any]:
343 from .components.cluster_quality import build_enhanced_metadata
345 return build_enhanced_metadata(
346 clusters, documents, strategy, processing_time, matched_docs, requested_docs
347 )
349 def build_enhanced_metadata(
350 self,
351 clusters: list[Any],
352 documents: list[HybridSearchResult],
353 strategy: Any,
354 processing_time: float,
355 matched_docs: int,
356 requested_docs: int,
357 ) -> dict[str, Any]:
358 """Build comprehensive clustering metadata via public API."""
359 return self._build_enhanced_metadata(
360 clusters,
361 documents,
362 strategy,
363 processing_time,
364 matched_docs,
365 requested_docs,
366 )
368 def _calculate_std(self, values: list[float]) -> float:
369 from .components.cluster_quality import calculate_std
371 return calculate_std(values)
373 def _assess_overall_quality(
374 self, clusters: list[Any], matched_docs: int, requested_docs: int
375 ) -> float:
376 from .components.cluster_quality import assess_overall_quality
378 return assess_overall_quality(clusters, matched_docs, requested_docs)
380 def _generate_clustering_recommendations(
381 self, clusters: list[Any], strategy: Any, matched_docs: int, requested_docs: int
382 ) -> dict[str, Any]:
383 from .components.cluster_quality import generate_clustering_recommendations
385 return generate_clustering_recommendations(
386 clusters, strategy, matched_docs, requested_docs
387 )
389 # Relationships
390 def _analyze_cluster_relationships(
391 self, clusters: list[Any], documents: list[HybridSearchResult]
392 ) -> list[dict[str, Any]]:
393 from .orchestration.relationships import analyze_cluster_relationships as _rel
395 return _rel(self, clusters, documents)
397 def analyze_cluster_relationships(
398 self, clusters: list[Any], documents: list[HybridSearchResult]
399 ) -> list[dict[str, Any]]:
400 """Analyze relationships between clusters in a public API."""
401 return self._analyze_cluster_relationships(clusters, documents)
403 def _analyze_cluster_pair(
404 self, cluster_a: Any, cluster_b: Any, doc_lookup: dict
405 ) -> dict[str, Any] | None:
406 from .orchestration.relationships import analyze_cluster_pair as _pair
408 return _pair(self, cluster_a, cluster_b, doc_lookup)
410 def _analyze_entity_overlap(
411 self, cluster_a: Any, cluster_b: Any
412 ) -> dict[str, Any] | None:
413 from .components.relationships import analyze_entity_overlap
415 return analyze_entity_overlap(cluster_a, cluster_b)
417 def _analyze_topic_overlap(
418 self, cluster_a: Any, cluster_b: Any
419 ) -> dict[str, Any] | None:
420 from .components.relationships import analyze_topic_overlap
422 return analyze_topic_overlap(cluster_a, cluster_b)
424 def _analyze_source_similarity(
425 self, docs_a: list, docs_b: list
426 ) -> dict[str, Any] | None:
427 from .components.relationships import analyze_source_similarity
429 return analyze_source_similarity(docs_a, docs_b)
431 def _analyze_hierarchy_relationship(
432 self, docs_a: list, docs_b: list
433 ) -> dict[str, Any] | None:
434 from .components.relationships import analyze_hierarchy_relationship
436 return analyze_hierarchy_relationship(docs_a, docs_b)
438 def _analyze_content_similarity(
439 self, docs_a: list, docs_b: list
440 ) -> dict[str, Any] | None:
441 from .components.relationships import analyze_content_similarity
443 return analyze_content_similarity(docs_a, docs_b)
445 # Stats and settings
446 def get_adaptive_search_stats(self) -> dict[str, Any]:
447 stats = {
448 "intent_adaptation_enabled": self.enable_intent_adaptation,
449 "has_knowledge_graph": self.knowledge_graph is not None,
450 }
451 if self.enable_intent_adaptation and self.intent_classifier:
452 stats.update(self.intent_classifier.get_cache_stats())
453 if self.adaptive_strategy:
454 stats.update(self.adaptive_strategy.get_strategy_stats())
455 return stats
457 def _build_conflict_settings(
458 self, search_config: Any | None
459 ) -> dict[str, Any] | None:
460 from .components.builder import build_conflict_settings
462 return build_conflict_settings(search_config)
464 # Helper wrappers
465 async def _get_embedding(self, text: str) -> list[float]:
466 if self.vector_search_service is None:
467 raise RuntimeError(
468 "Vector search service is not configured. Provide 'vector_search_service' to HybridEngineAPI or wire it via your engine builder before calling _get_embedding()."
469 )
470 from .components.helpers import get_embedding
472 return await get_embedding(self.vector_search_service, text)
474 async def _expand_query(self, query: str) -> str:
475 if self.query_processor is None:
476 raise RuntimeError(
477 "Query processor is not configured. Provide 'query_processor' to HybridEngineAPI or wire it via your engine builder before calling _expand_query()."
478 )
479 from .components.helpers import expand_query
481 return await expand_query(self.query_processor, query)
483 async def _expand_query_aggressive(self, query: str) -> str:
484 if self.query_processor is None:
485 raise RuntimeError(
486 "Query processor is not configured. Provide 'query_processor' to HybridEngineAPI or wire it via your engine builder before calling _expand_query_aggressive()."
487 )
488 from .components.helpers import expand_query_aggressive
490 return await expand_query_aggressive(self.query_processor, query)
492 def _analyze_query(self, query: str) -> dict[str, Any]:
493 if self.query_processor is None:
494 raise RuntimeError(
495 "Query processor is not configured. Provide 'query_processor' to HybridEngineAPI or wire it via your engine builder before calling _analyze_query()."
496 )
497 from .components.helpers import analyze_query
499 return analyze_query(self.query_processor, query)
501 async def _vector_search(
502 self, query: str, limit: int, project_ids: list[str] | None = None
503 ) -> list[dict[str, Any]]:
504 if self.vector_search_service is None:
505 raise RuntimeError(
506 "Vector search service is not configured. Provide 'vector_search_service' to HybridEngineAPI or wire it via your engine builder before calling _vector_search()."
507 )
508 from .components.helpers import vector_search
510 return await vector_search(
511 self.vector_search_service, query, limit, project_ids
512 )
514 async def _keyword_search(
515 self, query: str, limit: int, project_ids: list[str] | None = None
516 ) -> list[dict[str, Any]]:
517 if self.keyword_search_service is None:
518 raise RuntimeError(
519 "Keyword search service is not configured. Provide 'keyword_search_service' to HybridEngineAPI or wire it via your engine builder before calling _keyword_search()."
520 )
521 from .components.helpers import keyword_search
523 return await keyword_search(
524 self.keyword_search_service, query, limit, project_ids
525 )
527 async def _combine_results(
528 self,
529 vector_results: list[dict[str, Any]],
530 keyword_results: list[dict[str, Any]],
531 query_context: dict[str, Any],
532 limit: int,
533 source_types: list[str] | None = None,
534 project_ids: list[str] | None = None,
535 ) -> list[HybridSearchResult]:
536 if self.result_combiner is None:
537 raise RuntimeError(
538 "Result combiner is not configured. Provide 'result_combiner' to HybridEngineAPI or wire it via your engine builder before calling _combine_results()."
539 )
540 from .components.helpers import combine_results
542 return await combine_results(
543 self.result_combiner,
544 self.min_score,
545 vector_results,
546 keyword_results,
547 query_context,
548 limit,
549 source_types,
550 project_ids,
551 )
553 def _extract_metadata_info(self, metadata: dict) -> dict:
554 if self.metadata_extractor is None:
555 raise RuntimeError(
556 "Metadata extractor is not configured. Provide 'metadata_extractor' to HybridEngineAPI or wire it via your engine builder before calling _extract_metadata_info()."
557 )
558 from .components.metadata import extract_metadata_info
560 return extract_metadata_info(self.metadata_extractor, metadata)
562 def _extract_project_info(self, metadata: dict) -> dict:
563 if self.metadata_extractor is None:
564 raise RuntimeError(
565 "Metadata extractor is not configured. Provide 'metadata_extractor' to HybridEngineAPI or wire it via your engine builder before calling _extract_project_info()."
566 )
567 from .components.metadata import extract_project_info
569 return extract_project_info(self.metadata_extractor, metadata)
571 def _build_filter(self, project_ids: list[str] | None = None) -> Any:
572 if self.vector_search_service is None:
573 raise RuntimeError(
574 "Vector search service is not configured. Provide 'vector_search_service' to HybridEngineAPI or wire it via your engine builder before calling _build_filter()."
575 )
576 from .components.helpers import build_filter
578 return build_filter(self.vector_search_service, project_ids)
580 def suggest_facet_refinements(
581 self,
582 current_results: list[HybridSearchResult],
583 current_filters: list[FacetFilter],
584 ) -> list[dict[str, Any]]:
585 if self.faceted_search_engine is None:
586 raise RuntimeError(
587 "Faceted search engine is not configured. Provide 'faceted_search_engine' to HybridEngineAPI or wire it via your engine builder before calling suggest_facet_refinements()."
588 )
589 from .components.facets import suggest_refinements as _suggest
591 return _suggest(self.faceted_search_engine, current_results, current_filters)
593 def generate_facets(self, results: list[HybridSearchResult]) -> list:
594 if self.faceted_search_engine is None:
595 raise RuntimeError(
596 "Faceted search engine is not configured. Provide 'faceted_search_engine' to HybridEngineAPI or wire it via your engine builder before calling generate_facets()."
597 )
598 from .components.facets import generate_facets as _generate
600 return _generate(self.faceted_search_engine, results)