Coverage for src/qdrant_loader_mcp_server/search/hybrid/pipeline.py: 94%
35 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1from __future__ import annotations
3import asyncio
4from dataclasses import dataclass
6from ..components.search_result_models import HybridSearchResult
7from .components.boosting import ResultBooster
8from .components.deduplication import ResultDeduplicator
9from .components.normalization import ScoreNormalizer
10from .interfaces import KeywordSearcher, Reranker, ResultCombinerLike, VectorSearcher
13@dataclass
14class HybridPipeline:
15 vector_searcher: VectorSearcher
16 keyword_searcher: KeywordSearcher
17 result_combiner: ResultCombinerLike
18 reranker: Reranker | None = None
19 booster: ResultBooster | None = None
20 normalizer: ScoreNormalizer | None = None
21 deduplicator: ResultDeduplicator | None = None
23 async def run(
24 self,
25 query: str,
26 limit: int,
27 query_context: dict,
28 source_types: list[str] | None,
29 project_ids: list[str] | None,
30 *,
31 vector_query: str | None = None,
32 keyword_query: str | None = None,
33 ) -> list[HybridSearchResult]:
34 effective_vector_query = vector_query if vector_query is not None else query
35 effective_keyword_query = keyword_query if keyword_query is not None else query
36 vector_results, keyword_results = await asyncio.gather(
37 self.vector_searcher.search(effective_vector_query, limit * 3, project_ids),
38 self.keyword_searcher.search(
39 effective_keyword_query, limit * 3, project_ids
40 ),
41 )
42 results = await self.result_combiner.combine_results(
43 vector_results,
44 keyword_results,
45 query_context,
46 limit,
47 source_types,
48 project_ids,
49 )
50 # Optional post-processing hooks (disabled by default; no behavior change)
51 if self.booster is not None:
52 results = self.booster.apply(results)
53 if self.normalizer is not None:
54 # Normalize score values in-place using current scores
55 normalized = self.normalizer.scale([r.score for r in results])
56 if len(normalized) != len(results):
57 raise ValueError(
58 f"Normalizer returned {len(normalized)} values for {len(results)} results"
59 )
60 for r, v in zip(results, normalized, strict=False):
61 r.score = v
62 if self.deduplicator is not None:
63 results = self.deduplicator.deduplicate(results)
64 if self.reranker is not None:
65 return self.reranker.rerank(results)
66 return results