Coverage for src / qdrant_loader_mcp_server / search / hybrid / pipeline.py: 92%

51 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1""" 

2Hybrid search pipeline. 

3""" 

4 

5from __future__ import annotations 

6 

7import asyncio 

8import inspect 

9from dataclasses import dataclass 

10 

11from ..components.search_result_models import HybridSearchResult 

12from .components.boosting import ResultBooster 

13from .components.deduplication import ResultDeduplicator 

14from .components.normalization import ScoreNormalizer 

15from .interfaces import KeywordSearcher, Reranker, ResultCombinerLike, VectorSearcher 

16 

17 

async def _supports_qdrant_hybrid(vector_searcher: VectorSearcher) -> bool:
    """Report whether ``vector_searcher`` claims server-side dense+sparse fusion.

    The searcher may expose an optional ``supports_qdrant_hybrid`` hook that is
    either a plain callable or one returning an awaitable (e.g. a coroutine
    function). These all count as "no fusion", so the caller falls back to
    running the keyword search itself:

    - searchers without the hook (legacy implementations, test doubles);
    - a hook attribute that is not callable;
    - a hook that raises, whether during the call or while awaiting it.
    """
    hook = getattr(vector_searcher, "supports_qdrant_hybrid", None)
    if not callable(hook):
        return False

    try:
        outcome = hook()
        if inspect.isawaitable(outcome):
            outcome = await outcome
        supported = bool(outcome)
    except Exception:
        # Best-effort probe: any failure means "assume no server-side fusion".
        supported = False
    return supported

34 

35 

@dataclass
class HybridPipeline:
    """Hybrid search pipeline combining vector and keyword retrieval.

    Candidates come from the vector searcher. They also come from the keyword
    searcher, unless the vector searcher reports that it fuses dense+sparse
    results server-side. ``result_combiner`` merges the candidates. The
    optional post-processing stages then run in this order: boosting, score
    normalization, deduplication, reranking.

    Args:
        vector_searcher: Vector searcher; its ``search`` coroutine is awaited.
        keyword_searcher: Keyword searcher; skipped when the vector searcher
            reports Qdrant hybrid support.
        result_combiner: Merges vector and keyword results; its
            ``combine_results`` coroutine is awaited.
        reranker: Optional reranker, applied last.
        booster: Optional result booster, applied first after combining.
        normalizer: Optional score normalizer; overwrites ``score`` on each
            result in place.
        deduplicator: Optional result deduplicator.
    """

    vector_searcher: VectorSearcher
    keyword_searcher: KeywordSearcher
    result_combiner: ResultCombinerLike
    reranker: Reranker | None = None
    booster: ResultBooster | None = None
    normalizer: ScoreNormalizer | None = None
    deduplicator: ResultDeduplicator | None = None

    @staticmethod
    async def _gather_cancel_on_error(*aws):
        """Await ``aws`` concurrently; cancel the unfinished ones if any fails.

        ``asyncio.gather`` with the default ``return_exceptions=False``
        propagates the first exception but leaves the remaining awaitables
        running. This wrapper cancels those leftovers before re-raising, so a
        failed search does not leave the other search running as an orphaned
        background task.

        Returns:
            A list of results in the same order as ``aws``.
        """
        tasks = [asyncio.ensure_future(aw) for aw in aws]
        try:
            return await asyncio.gather(*tasks)
        except BaseException:
            for task in tasks:
                if not task.done():
                    task.cancel()
            raise

    async def run(
        self,
        query: str,
        limit: int,
        query_context: dict,
        source_types: list[str] | None,
        project_ids: list[str] | None,
        *,
        vector_query: str | None = None,
        keyword_query: str | None = None,
    ) -> list[HybridSearchResult]:
        """Run the hybrid search pipeline.

        Args:
            query: The user query. Used for both searches unless overridden,
                and always passed to the reranker.
            limit: Result limit passed to the combiner. Each searcher is asked
                for ``limit * 3`` candidates so the combiner has headroom.
            query_context: Passed through to ``combine_results``.
            source_types: Optional source-type filter, passed to the combiner.
            project_ids: Optional project filter, passed to both searchers and
                to the combiner.
            vector_query: Replaces ``query`` for the vector search when not None.
            keyword_query: Replaces ``query`` for the keyword search when not
                None.

        Returns:
            The combined results after any configured post-processing.

        Raises:
            ValueError: If the normalizer returns a different number of scores
                than there are results.
        """
        effective_vector_query = vector_query if vector_query is not None else query
        effective_keyword_query = keyword_query if keyword_query is not None else query
        candidate_limit = limit * 3

        # Pre-flight: if the vector searcher will use Qdrant fusion (dense+sparse
        # already covered server-side), skip the keyword path entirely. Otherwise
        # run both branches in parallel, since issuing them sequentially would
        # double query latency in the dense-only path.
        will_use_qdrant_hybrid = await _supports_qdrant_hybrid(self.vector_searcher)

        if will_use_qdrant_hybrid:
            vector_results = await self.vector_searcher.search(
                effective_vector_query, candidate_limit, project_ids
            )
            keyword_results = []
        else:
            vector_results, keyword_results = await self._gather_cancel_on_error(
                self.vector_searcher.search(
                    effective_vector_query, candidate_limit, project_ids
                ),
                self.keyword_searcher.search(
                    effective_keyword_query, candidate_limit, project_ids
                ),
            )

        results = await self.result_combiner.combine_results(
            vector_results,
            keyword_results,
            query_context,
            limit,
            source_types,
            project_ids,
        )

        # Optional post-processing hooks (disabled by default; no behavior change).
        if self.booster is not None:
            results = self.booster.apply(results)
        if self.normalizer is not None:
            # Normalize score values in place, using the current scores.
            normalized = self.normalizer.scale([r.score for r in results])
            if len(normalized) != len(results):
                raise ValueError(
                    f"Normalizer returned {len(normalized)} values for {len(results)} results"
                )
            for r, v in zip(results, normalized, strict=False):
                r.score = v
        if self.deduplicator is not None:
            results = self.deduplicator.deduplicate(results)
        if self.reranker is not None:
            return self.reranker.rerank(results=results, query=query)
        return results