Coverage for src / qdrant_loader_mcp_server / search / hybrid / orchestration / search.py: 85%

68 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 04:51 +0000

1from __future__ import annotations 

2 

3import logging 

4from typing import Any 

5 

6from ...components.result_combiner import ResultCombiner 

7from ...components.search_result_models import HybridSearchResult 

8from ..components.helpers import combine_results as _combine_results_helper 

9from ..pipeline import HybridPipeline 

10 

11logger = logging.getLogger(__name__) 

12 

13 

async def run_search(
    engine: Any,
    query: str,
    limit: int,
    source_types: list[str] | None,
    project_ids: list[str] | None,
    session_context: dict[str, Any] | None,
    behavioral_context: list[str] | None,
) -> list[HybridSearchResult]:
    """
    Execute a hybrid search for the given query using the provided engine and return ranked results.

    Per-request adjustments (query expansion, intent-adaptive combiner weights, and fetch limits) are applied to a cloned combiner and do not mutate the engine's shared combiner state; the function attempts to restore the engine's result_combiner attributes to their original values and logs any restoration failures without raising.

    Parameters:
        engine: Search engine instance providing hybrid search, planners, expansion, and orchestration.
        query (str): The user query to search for.
        limit (int): Maximum number of results to return.
        source_types (list[str] | None): Optional list of source types to filter results.
        project_ids (list[str] | None): Optional list of project IDs to restrict the search.
        session_context (dict[str, Any] | None): Optional session-level context used for intent classification and adaptations.
        behavioral_context (list[str] | None): Optional behavioral signals used for intent classification and adaptations.

    Returns:
        list[HybridSearchResult]: Ranked hybrid search results; length will be at most `limit`.
    """
    # Save original combiner values up front for safe restoration.
    original_vector_weight = engine.result_combiner.vector_weight
    original_keyword_weight = engine.result_combiner.keyword_weight
    original_min_score = engine.result_combiner.min_score

    combined_results: list[HybridSearchResult]

    try:
        # Request-scoped combiner clone: avoids mutating shared engine state.
        local_combiner = _clone_combiner(engine.result_combiner)

        # Intent classification and adaptive adjustments (local combiner only).
        search_intent, adaptive_config, fetch_limit = _apply_intent_adaptation(
            engine, query, limit, session_context, behavioral_context, local_combiner
        )

        expanded_query = await _resolve_expanded_query(engine, query, adaptive_config)

        query_context = engine._analyze_query(query)
        if search_intent:
            query_context["search_intent"] = search_intent
            query_context["adaptive_config"] = adaptive_config

        plan = engine._planner.make_plan(
            has_pipeline=engine.hybrid_pipeline is not None,
            expanded_query=expanded_query,
        )

        # Ensure combiner threshold honors engine-level minimum when applicable.
        _enforce_engine_min_score(engine, local_combiner)

        if plan.use_pipeline and engine.hybrid_pipeline is not None:
            combined_results = await _run_pipeline_path(
                engine,
                local_combiner,
                plan,
                query,
                fetch_limit,
                query_context,
                source_types,
                project_ids,
            )
        else:
            combined_results = await _run_direct_path(
                engine,
                local_combiner,
                expanded_query,
                query,
                fetch_limit,
                query_context,
                source_types,
                project_ids,
            )
    finally:
        # Always attempt to restore engine combiner settings to their original values.
        _restore_engine_combiner(
            engine, original_vector_weight, original_keyword_weight, original_min_score
        )

    return combined_results[:limit]


def _clone_combiner(base_combiner: Any) -> ResultCombiner:
    """Build a request-scoped combiner clone to avoid mutating shared engine state."""
    return ResultCombiner(
        vector_weight=getattr(base_combiner, "vector_weight", 0.6),
        keyword_weight=getattr(base_combiner, "keyword_weight", 0.3),
        metadata_weight=getattr(base_combiner, "metadata_weight", 0.1),
        min_score=getattr(base_combiner, "min_score", 0.3),
        spacy_analyzer=getattr(base_combiner, "spacy_analyzer", None),
    )


def _apply_intent_adaptation(
    engine: Any,
    query: str,
    limit: int,
    session_context: dict[str, Any] | None,
    behavioral_context: list[str] | None,
    local_combiner: ResultCombiner,
) -> tuple[Any, Any, int]:
    """Classify search intent and apply adaptive weights/thresholds to the local combiner.

    Returns:
        tuple: (search_intent, adaptive_config, fetch_limit). Intent and config are
        None when intent adaptation is disabled or unavailable; fetch_limit defaults
        to `limit` and may be raised (bounded by ``limit * 2``) by the adaptive config.
    """
    search_intent = None
    adaptive_config = None
    fetch_limit = limit
    if engine.enable_intent_adaptation and engine.intent_classifier:
        search_intent = engine.intent_classifier.classify_intent(
            query, session_context, behavioral_context
        )
        adaptive_config = engine.adaptive_strategy.adapt_search(search_intent, query)
        if adaptive_config:
            local_combiner.vector_weight = adaptive_config.vector_weight
            local_combiner.keyword_weight = adaptive_config.keyword_weight
            local_combiner.min_score = adaptive_config.min_score_threshold
            # Over-fetch (bounded) so downstream ranking has candidates to work with.
            fetch_limit = min(adaptive_config.max_results, limit * 2)
    return search_intent, adaptive_config, fetch_limit


async def _resolve_expanded_query(engine: Any, query: str, adaptive_config: Any) -> Any:
    """Expand the query, escalating to aggressive expansion when the adaptive config requests it."""
    # TODO: Evaluate the expanded_query logic to see its impacts on vector and keyword searches
    expanded_query = await engine._expand_query(query)
    if adaptive_config and getattr(adaptive_config, "expand_query", False):
        aggressiveness = getattr(adaptive_config, "expansion_aggressiveness", None)
        if isinstance(aggressiveness, int | float) and aggressiveness > 0.5:
            expanded_query = await engine._expand_query_aggressive(query)
    return expanded_query


def _enforce_engine_min_score(engine: Any, local_combiner: ResultCombiner) -> None:
    """Raise the local combiner's threshold to the engine-level minimum when the engine's is stricter."""
    engine_min_score = getattr(engine, "min_score", None)
    if engine_min_score is not None and (
        getattr(local_combiner, "min_score", None) is None
        or local_combiner.min_score < engine_min_score
    ):
        # Use the stricter (higher) engine threshold
        local_combiner.min_score = engine_min_score


async def _run_pipeline_path(
    engine: Any,
    local_combiner: ResultCombiner,
    plan: Any,
    query: str,
    fetch_limit: int,
    query_context: dict[str, Any],
    source_types: list[str] | None,
    project_ids: list[str] | None,
) -> list[HybridSearchResult]:
    """Execute the search through the engine's pipeline orchestrator.

    A real HybridPipeline is cloned per-request with the local combiner so shared
    pipeline state is never mutated; a custom/mocked pipeline (different type) is
    passed through as-is to honor its run override.
    """
    hybrid_pipeline: HybridPipeline = engine.hybrid_pipeline
    if isinstance(hybrid_pipeline, HybridPipeline):
        # Clone pipeline for this request with the local combiner to avoid shared mutation
        pipeline_to_run: Any = HybridPipeline(
            vector_searcher=hybrid_pipeline.vector_searcher,
            keyword_searcher=hybrid_pipeline.keyword_searcher,
            result_combiner=local_combiner,
            reranker=hybrid_pipeline.reranker,
            booster=hybrid_pipeline.booster,
            normalizer=hybrid_pipeline.normalizer,
            deduplicator=hybrid_pipeline.deduplicator,
        )
    else:
        # Custom or mocked pipeline: honor its run override without cloning
        pipeline_to_run = hybrid_pipeline
    # Single orchestrator call for both cases (the original duplicated this verbatim).
    return await engine._orchestrator.run_pipeline(
        pipeline_to_run,
        query=query,
        limit=fetch_limit,
        query_context=query_context,
        source_types=source_types,
        project_ids=project_ids,
        vector_query=plan.expanded_query,
        keyword_query=query,
    )


async def _run_direct_path(
    engine: Any,
    local_combiner: ResultCombiner,
    expanded_query: Any,
    query: str,
    fetch_limit: int,
    query_context: dict[str, Any],
    source_types: list[str] | None,
    project_ids: list[str] | None,
) -> list[HybridSearchResult]:
    """Run vector and keyword searches directly and merge them with the local combiner."""
    # Over-fetch each leg (3x) so the combiner has enough candidates after filtering.
    vector_results = await engine._vector_search(
        expanded_query, fetch_limit * 3, project_ids
    )
    keyword_results = await engine._keyword_search(query, fetch_limit * 3, project_ids)
    return await _combine_results_helper(
        local_combiner,
        getattr(engine, "min_score", 0.0),
        vector_results,
        keyword_results,
        query_context,
        fetch_limit,
        source_types,
        project_ids,
    )


def _restore_engine_combiner(
    engine: Any,
    original_vector_weight: Any,
    original_keyword_weight: Any,
    original_min_score: Any,
) -> None:
    """Best-effort restoration of the engine combiner's original settings.

    Logs any restoration failures with context and never raises, so it is safe to
    call from a ``finally`` block without masking an in-flight exception.
    """
    try:
        engine_rc = getattr(engine, "result_combiner", None)
        if engine_rc is None:
            return
        restorations = [
            ("vector_weight", original_vector_weight),
            ("keyword_weight", original_keyword_weight),
            ("min_score", original_min_score),
        ]
        for attr_name, original_value in restorations:
            try:
                setattr(engine_rc, attr_name, original_value)
            except Exception as exc:
                try:
                    logger.error(
                        "Failed to restore result_combiner.%s to %r on %s: %s",
                        attr_name,
                        original_value,
                        type(engine_rc).__name__,
                        exc,
                        exc_info=True,
                    )
                except Exception:
                    # Never allow logging to raise
                    pass
    except Exception:
        # Never raise from restoration; preserve original exception flow
        try:
            logger.error(
                "Unexpected error during result_combiner restoration on %s",
                type(getattr(engine, "result_combiner", object())).__name__,
                exc_info=True,
            )
        except Exception:
            pass