Coverage for src/qdrant_loader_mcp_server/search/hybrid/pipeline.py: 94%

35 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1from __future__ import annotations 

2 

3import asyncio 

4from dataclasses import dataclass 

5 

6from ..components.search_result_models import HybridSearchResult 

7from .components.boosting import ResultBooster 

8from .components.deduplication import ResultDeduplicator 

9from .components.normalization import ScoreNormalizer 

10from .interfaces import KeywordSearcher, Reranker, ResultCombinerLike, VectorSearcher 

11 

12 

@dataclass
class HybridPipeline:
    """Run a vector search and a keyword search together and merge the hits.

    The two searchers are awaited through a single ``asyncio.gather`` call,
    their outputs are handed to ``result_combiner``, and the merged list is
    then passed through whichever optional stages are configured, in this
    fixed order: booster, normalizer, deduplicator, reranker.
    """

    # Required collaborators: the two searchers and the combiner that merges
    # their outputs.
    vector_searcher: VectorSearcher
    keyword_searcher: KeywordSearcher
    result_combiner: ResultCombinerLike
    # Optional stages; a stage left as None is skipped entirely.
    reranker: Reranker | None = None
    booster: ResultBooster | None = None
    normalizer: ScoreNormalizer | None = None
    deduplicator: ResultDeduplicator | None = None

    async def run(
        self,
        query: str,
        limit: int,
        query_context: dict,
        source_types: list[str] | None,
        project_ids: list[str] | None,
        *,
        vector_query: str | None = None,
        keyword_query: str | None = None,
    ) -> list[HybridSearchResult]:
        """Execute both searches, combine them, and apply the optional stages.

        Args:
            query: Text used for a searcher when its specific override is None.
            limit: Result count handed to the combiner; each searcher is asked
                for ``limit * 3`` candidates.
            query_context: Passed through unchanged to the combiner.
            source_types: Passed to the combiner only (not to the searchers).
            project_ids: Passed to both searchers and to the combiner.
            vector_query: Override for the vector searcher. Only ``None``
                falls back to ``query``; an empty string is used as given.
            keyword_query: Override for the keyword searcher, with the same
                ``None``-only fallback.

        Returns:
            The combined results after the configured stages. When a reranker
            is set, whatever its ``rerank`` call returns is returned as-is.

        Raises:
            ValueError: If the normalizer returns a different number of
                values than there are results.
        """
        vector_text = query if vector_query is None else vector_query
        keyword_text = query if keyword_query is None else keyword_query

        # Both searchers are asked for the same enlarged candidate count.
        fetch_size = limit * 3
        vector_hits, keyword_hits = await asyncio.gather(
            self.vector_searcher.search(vector_text, fetch_size, project_ids),
            self.keyword_searcher.search(keyword_text, fetch_size, project_ids),
        )

        merged = await self.result_combiner.combine_results(
            vector_hits,
            keyword_hits,
            query_context,
            limit,
            source_types,
            project_ids,
        )

        # Optional post-processing hooks (all disabled unless configured).
        if self.booster is not None:
            merged = self.booster.apply(merged)

        if self.normalizer is not None:
            # Scores are rewritten on the existing result objects.
            scaled = self.normalizer.scale([hit.score for hit in merged])
            if len(scaled) != len(merged):
                raise ValueError(
                    f"Normalizer returned {len(scaled)} values for {len(merged)} results"
                )
            for hit, new_score in zip(merged, scaled):
                hit.score = new_score

        if self.deduplicator is not None:
            merged = self.deduplicator.deduplicate(merged)

        if self.reranker is None:
            return merged
        return self.reranker.rerank(merged)