Coverage for src / qdrant_loader_mcp_server / search / hybrid / pipeline.py: 94%
35 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:51 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:51 +0000
1"""
2Hybrid search pipeline.
3"""
5from __future__ import annotations
7import asyncio
8from dataclasses import dataclass
10from ..components.search_result_models import HybridSearchResult
11from .components.boosting import ResultBooster
12from .components.deduplication import ResultDeduplicator
13from .components.normalization import ScoreNormalizer
14from .interfaces import KeywordSearcher, Reranker, ResultCombinerLike, VectorSearcher
@dataclass
class HybridPipeline:
    """Compose vector search, keyword search and result fusion into one call.

    The three required collaborators always take part in ``run``; the four
    optional ones are post-processing hooks that are skipped while left as
    ``None``.

    Args:
        vector_searcher: Object whose awaitable ``search`` yields vector hits.
        keyword_searcher: Object whose awaitable ``search`` yields keyword hits.
        result_combiner: Object whose awaitable ``combine_results`` fuses both
            hit lists into a single result list.
        reranker: Optional final stage; its ``rerank`` return value becomes
            the return value of ``run``.
        booster: Optional stage whose ``apply`` replaces the result list.
        normalizer: Optional stage whose ``scale`` maps the current scores to
            new scores, one per result.
        deduplicator: Optional stage whose ``deduplicate`` replaces the
            result list.
    """

    vector_searcher: VectorSearcher
    keyword_searcher: KeywordSearcher
    result_combiner: ResultCombinerLike
    reranker: Reranker | None = None
    booster: ResultBooster | None = None
    normalizer: ScoreNormalizer | None = None
    deduplicator: ResultDeduplicator | None = None

    async def run(
        self,
        query: str,
        limit: int,
        query_context: dict,
        source_types: list[str] | None,
        project_ids: list[str] | None,
        *,
        vector_query: str | None = None,
        keyword_query: str | None = None,
    ) -> list[HybridSearchResult]:
        """Execute both searches concurrently, fuse them, then post-process.

        Args:
            query: Base query text. Used for either searcher when its
                dedicated override is ``None``, and always handed to the
                reranker.
            limit: Result limit forwarded to the combiner; each searcher is
                asked for three times this many candidates.
            query_context: Forwarded unchanged to the combiner.
            source_types: Forwarded to the combiner only.
            project_ids: Forwarded to both searchers and to the combiner.
            vector_query: Optional override of ``query`` for vector search.
            keyword_query: Optional override of ``query`` for keyword search.

        Returns:
            Whatever the reranker returns when one is configured, otherwise
            the combined results after the enabled hooks have run.

        Raises:
            ValueError: If the normalizer returns a different number of
                scores than there are results.
        """
        # Each searcher is asked for more candidates than the final limit.
        candidate_count = limit * 3
        text_for_vectors = query if vector_query is None else vector_query
        text_for_keywords = query if keyword_query is None else keyword_query

        # The two searches are independent, so they are awaited together.
        vector_hits, keyword_hits = await asyncio.gather(
            self.vector_searcher.search(
                text_for_vectors, candidate_count, project_ids
            ),
            self.keyword_searcher.search(
                text_for_keywords, candidate_count, project_ids
            ),
        )

        combined = await self.result_combiner.combine_results(
            vector_hits,
            keyword_hits,
            query_context,
            limit,
            source_types,
            project_ids,
        )

        # Optional hooks below are all None by default, so a pipeline built
        # with only the required collaborators skips straight to the return.
        if self.booster is not None:
            combined = self.booster.apply(combined)

        if self.normalizer is not None:
            # Scores are overwritten on the existing result objects.
            scaled_scores = self.normalizer.scale([hit.score for hit in combined])
            expected, received = len(combined), len(scaled_scores)
            if received != expected:
                raise ValueError(
                    f"Normalizer returned {received} values for {expected} results"
                )
            for hit, new_score in zip(combined, scaled_scores):
                hit.score = new_score

        if self.deduplicator is not None:
            combined = self.deduplicator.deduplicate(combined)

        if self.reranker is None:
            return combined
        # The reranker sees the original query, not the per-searcher overrides.
        return self.reranker.rerank(results=combined, query=query)