Coverage for src / qdrant_loader_mcp_server / search / hybrid / orchestration / search.py: 100%

52 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 09:41 +0000

1from __future__ import annotations 

2 

3import logging 

4from typing import Any 

5 

6from ...components.result_combiner import ResultCombiner 

7from ...components.search_result_models import HybridSearchResult 

8from ..components.helpers import combine_results as _combine_results_helper 

9from ..pipeline import HybridPipeline 

10 

11logger = logging.getLogger(__name__) 

12 

13 

async def run_search(
    engine: Any,
    query: str,
    limit: int,
    source_types: list[str] | None,
    project_ids: list[str] | None,
    session_context: dict[str, Any] | None,
    behavioral_context: list[str] | None,
) -> list[HybridSearchResult]:
    """
    Run one hybrid search request against `engine` and return ranked results.

    All per-request tuning (intent-adaptive weights, query expansion, fetch
    limits) is applied to a combiner built for this request only; the engine's
    shared combiner and pipeline are never mutated.

    Parameters:
        engine: Search engine exposing hybrid search, planner, expansion,
            and orchestration components.
        query (str): User query text.
        limit (int): Maximum number of results to return.
        source_types (list[str] | None): Optional source-type filter.
        project_ids (list[str] | None): Optional project-ID restriction.
        session_context (dict[str, Any] | None): Session context fed to the
            intent classifier.
        behavioral_context (list[str] | None): Behavioral signals fed to the
            intent classifier.

    Returns:
        list[HybridSearchResult]: Ranked results, at most `limit` entries.
    """
    fetch_limit = limit

    # Snapshot the shared combiner's settings first so the request-scoped
    # combiner is constructed with its final weight values.
    shared_combiner = engine.result_combiner
    weights = {
        "vector_weight": getattr(shared_combiner, "vector_weight", 0.6),
        "keyword_weight": getattr(shared_combiner, "keyword_weight", 0.3),
        "metadata_weight": getattr(shared_combiner, "metadata_weight", 0.1),
        "min_score": getattr(shared_combiner, "min_score", 0.3),
    }
    analyzer = getattr(shared_combiner, "spacy_analyzer", None)

    # Intent classification; adaptive overrides touch only this request's
    # local weight snapshot, never the engine's combiner.
    search_intent = None
    adaptive_config = None
    if engine.enable_intent_adaptation and engine.intent_classifier:
        search_intent = engine.intent_classifier.classify_intent(
            query, session_context, behavioral_context
        )
        adaptive_config = engine.adaptive_strategy.adapt_search(search_intent, query)
        if adaptive_config:
            weights["vector_weight"] = adaptive_config.vector_weight
            weights["keyword_weight"] = adaptive_config.keyword_weight
            weights["min_score"] = adaptive_config.min_score_threshold
            fetch_limit = min(adaptive_config.max_results, limit * 2)

    request_combiner = ResultCombiner(spacy_analyzer=analyzer, **weights)

    # TODO: Evaluate the expanded_query logic to see it's impacts on vector and keyword searches
    expanded_query = await engine._expand_query(query)
    if adaptive_config and getattr(adaptive_config, "expand_query", False):
        aggressiveness = getattr(adaptive_config, "expansion_aggressiveness", None)
        if isinstance(aggressiveness, (int, float)) and aggressiveness > 0.5:
            # Sufficiently aggressive expansion replaces the plain expansion.
            expanded_query = await engine._expand_query_aggressive(query)

    query_context = engine._analyze_query(query)
    if search_intent:
        query_context["search_intent"] = search_intent
        query_context["adaptive_config"] = adaptive_config

    plan = engine._planner.make_plan(
        has_pipeline=engine.hybrid_pipeline is not None,
        expanded_query=expanded_query,
    )

    # Never search with a looser score threshold than the engine-wide floor.
    engine_floor = getattr(engine, "min_score", None)
    if engine_floor is not None:
        current_floor = getattr(request_combiner, "min_score", None)
        if current_floor is None or current_floor < engine_floor:
            request_combiner.min_score = engine_floor

    pipeline = engine.hybrid_pipeline
    if plan.use_pipeline and pipeline is not None:
        if isinstance(pipeline, HybridPipeline):
            # Rebuild the pipeline around the request-scoped combiner so the
            # shared instance is never mutated; a custom/mocked pipeline is
            # passed through untouched to honor its run() override.
            pipeline = HybridPipeline(
                vector_searcher=pipeline.vector_searcher,
                keyword_searcher=pipeline.keyword_searcher,
                result_combiner=request_combiner,
                reranker=pipeline.reranker,
                booster=pipeline.booster,
                normalizer=pipeline.normalizer,
                deduplicator=pipeline.deduplicator,
            )
        combined_results = await engine._orchestrator.run_pipeline(
            pipeline,
            query=query,
            limit=fetch_limit,
            query_context=query_context,
            source_types=source_types,
            project_ids=project_ids,
            vector_query=plan.expanded_query,
            keyword_query=query,
        )
    else:
        # Legacy path: run vector and keyword searches directly, over-fetching
        # 3x to give the combiner enough candidates.
        vector_results = await engine._vector_search(
            expanded_query, fetch_limit * 3, project_ids
        )
        keyword_results = await engine._keyword_search(
            query, fetch_limit * 3, project_ids
        )
        combined_results = await _combine_results_helper(
            request_combiner,
            getattr(engine, "min_score", 0.0),
            vector_results,
            keyword_results,
            query_context,
            fetch_limit,
            source_types,
            project_ids,
        )

    return combined_results[:limit]