Coverage for src/qdrant_loader_mcp_server/search/hybrid/pipeline.py: 94%

35 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 04:51 +0000

1""" 

2Hybrid search pipeline. 

3""" 

4 

5from __future__ import annotations 

6 

7import asyncio 

8from dataclasses import dataclass 

9 

10from ..components.search_result_models import HybridSearchResult 

11from .components.boosting import ResultBooster 

12from .components.deduplication import ResultDeduplicator 

13from .components.normalization import ScoreNormalizer 

14from .interfaces import KeywordSearcher, Reranker, ResultCombinerLike, VectorSearcher 

15 

16 

@dataclass
class HybridPipeline:
    """Compose vector search, keyword search, and optional post-processing.

    The three required collaborators produce the combined result list; the
    four optional ones are hooks that are skipped when left as ``None``.

    Attributes:
        vector_searcher: Awaited with the vector query text.
        keyword_searcher: Awaited with the keyword query text.
        result_combiner: Awaited to merge both candidate lists.
        reranker: Optional final stage; its return value is returned as-is.
        booster: Optional hook applied to the combined results.
        normalizer: Optional hook that rescales the ``score`` of each result.
        deduplicator: Optional hook applied after normalization.
    """

    vector_searcher: VectorSearcher
    keyword_searcher: KeywordSearcher
    result_combiner: ResultCombinerLike
    reranker: Reranker | None = None
    booster: ResultBooster | None = None
    normalizer: ScoreNormalizer | None = None
    deduplicator: ResultDeduplicator | None = None

    async def run(
        self,
        query: str,
        limit: int,
        query_context: dict,
        source_types: list[str] | None,
        project_ids: list[str] | None,
        *,
        vector_query: str | None = None,
        keyword_query: str | None = None,
    ) -> list[HybridSearchResult]:
        """Execute both searches, combine them, then apply the optional hooks.

        Args:
            query: Base query text; also the text handed to the reranker.
            limit: Result count passed to the combiner. Each searcher is
                asked for ``limit * 3`` candidates.
            query_context: Forwarded unchanged to the combiner.
            source_types: Forwarded to the combiner only (the searchers do
                not receive it).
            project_ids: Forwarded to both searchers and to the combiner.
            vector_query: Text for the vector searcher; ``query`` is used
                when this is ``None``.
            keyword_query: Text for the keyword searcher; ``query`` is used
                when this is ``None``.

        Returns:
            The combined results after the configured hooks have run, or
            whatever the reranker returns when one is configured.

        Raises:
            ValueError: If the normalizer returns a different number of
                values than there are results.
        """
        # Per-searcher overrides win only when explicitly supplied.
        vector_text = query
        if vector_query is not None:
            vector_text = vector_query
        keyword_text = query
        if keyword_query is not None:
            keyword_text = keyword_query

        # Over-fetch so the combiner has more candidates than the final limit.
        candidate_count = limit * 3
        vector_lookup = self.vector_searcher.search(
            vector_text, candidate_count, project_ids
        )
        keyword_lookup = self.keyword_searcher.search(
            keyword_text, candidate_count, project_ids
        )
        # Both searches are awaited concurrently.
        vector_hits, keyword_hits = await asyncio.gather(vector_lookup, keyword_lookup)

        results = await self.result_combiner.combine_results(
            vector_hits,
            keyword_hits,
            query_context,
            limit,
            source_types,
            project_ids,
        )

        # Post-processing hooks: every one defaults to None, in which case the
        # combined results pass through untouched.
        if self.booster is not None:
            results = self.booster.apply(results)

        if self.normalizer is not None:
            current_scores = [item.score for item in results]
            normalized = self.normalizer.scale(current_scores)
            if len(normalized) != len(results):
                raise ValueError(
                    f"Normalizer returned {len(normalized)} values for {len(results)} results"
                )
            # Lengths were verified above, so a plain zip pairs every result.
            for item, new_score in zip(results, normalized):
                item.score = new_score

        if self.deduplicator is not None:
            results = self.deduplicator.deduplicate(results)

        if self.reranker is None:
            return results
        # The reranker always sees the base query, not the per-searcher overrides.
        return self.reranker.rerank(results=results, query=query)