Coverage for src / qdrant_loader_mcp_server / search / hybrid / pipeline.py: 92%
51 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1"""
2Hybrid search pipeline.
3"""
5from __future__ import annotations
7import asyncio
8import inspect
9from dataclasses import dataclass
11from ..components.search_result_models import HybridSearchResult
12from .components.boosting import ResultBooster
13from .components.deduplication import ResultDeduplicator
14from .components.normalization import ScoreNormalizer
15from .interfaces import KeywordSearcher, Reranker, ResultCombinerLike, VectorSearcher
async def _supports_qdrant_hybrid(vector_searcher: VectorSearcher) -> bool:
    """Best-effort probe for whether the vector searcher will fuse dense+sparse server-side.

    Looks up an optional ``supports_qdrant_hybrid`` attribute on the searcher
    and calls it with no arguments. Tolerant of both sync and async
    implementations and of searchers that don't expose the hook at all
    (legacy / test doubles).

    Args:
        vector_searcher: Searcher to probe.

    Returns:
        The truthiness of the probe's result (awaited first when it is
        awaitable), or ``False`` when the hook is missing, is not callable,
        or raises.
    """
    import logging  # function-scope import keeps this helper self-contained

    probe = getattr(vector_searcher, "supports_qdrant_hybrid", None)
    if not callable(probe):
        return False
    try:
        result = probe()
        if inspect.isawaitable(result):
            result = await result
        # NOTE(review): any truthy value counts as "supported", not just
        # ``True`` -- this includes objects returned by auto-mocking doubles.
        return bool(result)
    except Exception:
        # Deliberate best-effort: a failing probe must never break a search,
        # so fall back to "no server-side fusion". Leave a debug trace so the
        # silent change of retrieval path can still be diagnosed.
        logging.getLogger(__name__).debug(
            "supports_qdrant_hybrid probe failed; assuming no server-side fusion",
            exc_info=True,
        )
        return False
@dataclass
class HybridPipeline:
    """Hybrid search pipeline: retrieve candidates, combine them, post-process.

    Args:
        vector_searcher: Vector searcher; always queried.
        keyword_searcher: Keyword searcher; queried only when the vector
            searcher does not report server-side hybrid support.
        result_combiner: Result combiner that merges both candidate lists.
        reranker: Optional reranker, applied as the last step.
        booster: Optional result booster, applied first after combining.
        normalizer: Optional score normalizer, applied after the booster.
        deduplicator: Optional result deduplicator, applied after the
            normalizer.
    """

    vector_searcher: VectorSearcher
    keyword_searcher: KeywordSearcher
    result_combiner: ResultCombinerLike
    reranker: Reranker | None = None
    booster: ResultBooster | None = None
    normalizer: ScoreNormalizer | None = None
    deduplicator: ResultDeduplicator | None = None

    # Each searcher is asked for ``limit * _OVERFETCH_FACTOR`` candidates while
    # the combiner is given ``limit`` (presumably headroom for combining --
    # confirm). Deliberately un-annotated so ``@dataclass`` keeps it a plain
    # class attribute and the generated ``__init__`` is unchanged.
    _OVERFETCH_FACTOR = 3

    async def run(
        self,
        query: str,
        limit: int,
        query_context: dict,
        source_types: list[str] | None,
        project_ids: list[str] | None,
        *,
        vector_query: str | None = None,
        keyword_query: str | None = None,
    ) -> list[HybridSearchResult]:
        """Run the hybrid search pipeline.

        Args:
            query: Original query; used for reranking and as the fallback for
                both per-branch queries.
            limit: Result limit handed to the combiner; the searchers are
                asked for ``limit * 3`` candidates each.
            query_context: Passed through to the result combiner.
            source_types: Passed through to the result combiner.
            project_ids: Passed to both searchers and to the result combiner.
            vector_query: Optional override for the vector branch; ``None``
                means use ``query``.
            keyword_query: Optional override for the keyword branch; ``None``
                means use ``query``.

        Returns:
            The combined results after the optional post-processing hooks.

        Raises:
            ValueError: If the normalizer returns a different number of
                scores than there are results.
        """
        effective_vector_query = vector_query if vector_query is not None else query
        effective_keyword_query = (
            keyword_query if keyword_query is not None else query
        )

        vector_results, keyword_results = await self._retrieve(
            effective_vector_query,
            effective_keyword_query,
            limit * self._OVERFETCH_FACTOR,
            project_ids,
        )
        results = await self.result_combiner.combine_results(
            vector_results,
            keyword_results,
            query_context,
            limit,
            source_types,
            project_ids,
        )
        return self._postprocess(results, query)

    async def _retrieve(
        self,
        vector_query: str,
        keyword_query: str,
        fetch_limit: int,
        project_ids: list[str] | None,
    ) -> tuple[list, list]:
        """Fetch raw ``(vector_results, keyword_results)`` from the searchers."""
        # Pre-flight: if the vector searcher will use Qdrant fusion (dense+sparse
        # already covered server-side), skip the keyword path entirely. Otherwise
        # run both branches in parallel -- issuing them sequentially would double
        # query latency in the dense-only path.
        if await _supports_qdrant_hybrid(self.vector_searcher):
            vector_results = await self.vector_searcher.search(
                vector_query, fetch_limit, project_ids
            )
            return vector_results, []

        vector_results, keyword_results = await asyncio.gather(
            self.vector_searcher.search(vector_query, fetch_limit, project_ids),
            self.keyword_searcher.search(keyword_query, fetch_limit, project_ids),
        )
        return vector_results, keyword_results

    def _postprocess(self, results: list, query: str) -> list:
        """Apply the optional boost, normalize, deduplicate and rerank hooks in order."""
        # Optional post-processing hooks (disabled by default; no behavior change)
        if self.booster is not None:
            results = self.booster.apply(results)

        if self.normalizer is not None:
            # Normalize score values in-place using current scores
            normalized = self.normalizer.scale([r.score for r in results])
            if len(normalized) != len(results):
                raise ValueError(
                    f"Normalizer returned {len(normalized)} values for {len(results)} results"
                )
            # Lengths were verified above, so a non-strict zip cannot truncate.
            for item, score in zip(results, normalized, strict=False):
                item.score = score

        if self.deduplicator is not None:
            results = self.deduplicator.deduplicate(results)

        if self.reranker is not None:
            # The reranker receives the original query, not a per-branch override.
            return self.reranker.rerank(results=results, query=query)
        return results