Coverage for src/qdrant_loader_mcp_server/search/hybrid/orchestration/search.py: 85%

65 statements  

coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

from __future__ import annotations

import logging
from typing import Any

from ...components.result_combiner import ResultCombiner
from ...components.search_result_models import HybridSearchResult
from ..components.helpers import combine_results as _combine_results_helper
from ..pipeline import HybridPipeline

logger = logging.getLogger(__name__)


async def run_search(
    engine: Any,
    query: str,
    limit: int,
    source_types: list[str] | None,
    project_ids: list[str] | None,
    session_context: dict[str, Any] | None,
    behavioral_context: list[str] | None,
) -> list[HybridSearchResult]:
    # Save original combiner values up front for safe restoration
    original_vector_weight = engine.result_combiner.vector_weight
    original_keyword_weight = engine.result_combiner.keyword_weight
    original_min_score = engine.result_combiner.min_score

    combined_results: list[HybridSearchResult]

    try:
        # Build a request-scoped combiner clone to avoid mutating shared engine state
        base_combiner = engine.result_combiner
        local_combiner = ResultCombiner(
            vector_weight=getattr(base_combiner, "vector_weight", 0.6),
            keyword_weight=getattr(base_combiner, "keyword_weight", 0.3),
            metadata_weight=getattr(base_combiner, "metadata_weight", 0.1),
            min_score=getattr(base_combiner, "min_score", 0.3),
            spacy_analyzer=getattr(base_combiner, "spacy_analyzer", None),
        )

        # Intent classification and adaptive adjustments (applied to the local combiner only)
        search_intent = None
        adaptive_config = None
        if engine.enable_intent_adaptation and engine.intent_classifier:
            search_intent = engine.intent_classifier.classify_intent(
                query, session_context, behavioral_context
            )
            adaptive_config = engine.adaptive_strategy.adapt_search(
                search_intent, query
            )
            if adaptive_config:
                local_combiner.vector_weight = adaptive_config.vector_weight
                local_combiner.keyword_weight = adaptive_config.keyword_weight
                local_combiner.min_score = adaptive_config.min_score_threshold
                limit = min(adaptive_config.max_results, limit * 2)

        expanded_query = await engine._expand_query(query)
        if adaptive_config and getattr(adaptive_config, "expand_query", False):
            aggressiveness = getattr(adaptive_config, "expansion_aggressiveness", None)
            if isinstance(aggressiveness, int | float) and aggressiveness > 0.5:
                expanded_query = await engine._expand_query_aggressive(query)

        query_context = engine._analyze_query(query)
        if search_intent:
            query_context["search_intent"] = search_intent
            query_context["adaptive_config"] = adaptive_config

        plan = engine._planner.make_plan(
            has_pipeline=engine.hybrid_pipeline is not None,
            expanded_query=expanded_query,
        )

        # Ensure the combiner threshold honors the engine-level minimum when applicable
        engine_min_score = getattr(engine, "min_score", None)
        if engine_min_score is not None and (
            getattr(local_combiner, "min_score", None) is None
            or local_combiner.min_score < engine_min_score
        ):
            # Use the stricter (higher) engine threshold
            local_combiner.min_score = engine_min_score

        if plan.use_pipeline and engine.hybrid_pipeline is not None:
            p = engine.hybrid_pipeline
            if isinstance(p, HybridPipeline):
                # Clone the pipeline for this request with the local combiner to avoid shared mutation
                local_pipeline = HybridPipeline(
                    vector_searcher=p.vector_searcher,
                    keyword_searcher=p.keyword_searcher,
                    result_combiner=local_combiner,
                    reranker=p.reranker,
                    booster=p.booster,
                    normalizer=p.normalizer,
                    deduplicator=p.deduplicator,
                )
                combined_results = await engine._orchestrator.run_pipeline(
                    local_pipeline,
                    query=query,
                    limit=limit,
                    query_context=query_context,
                    source_types=source_types,
                    project_ids=project_ids,
                    vector_query=plan.expanded_query,
                    keyword_query=query,
                )
            else:
                # Custom or mocked pipeline: honor its run override without cloning
                combined_results = await engine._orchestrator.run_pipeline(
                    p,
                    query=query,
                    limit=limit,
                    query_context=query_context,
                    source_types=source_types,
                    project_ids=project_ids,
                    vector_query=plan.expanded_query,
                    keyword_query=query,
                )
        else:
            vector_results = await engine._vector_search(
                expanded_query, limit * 3, project_ids
            )
            keyword_results = await engine._keyword_search(
                query, limit * 3, project_ids
            )
            combined_results = await _combine_results_helper(
                local_combiner,
                getattr(engine, "min_score", 0.0),
                vector_results,
                keyword_results,
                query_context,
                limit,
                source_types,
                project_ids,
            )
    finally:
        # Always attempt to restore the engine combiner settings to their original values.
        # Log any restoration failure with context, without masking the original exception.
        try:
            engine_rc = getattr(engine, "result_combiner", None)
            if engine_rc is not None:
                restorations = [
                    ("vector_weight", original_vector_weight),
                    ("keyword_weight", original_keyword_weight),
                    ("min_score", original_min_score),
                ]
                for attr_name, original_value in restorations:
                    try:
                        setattr(engine_rc, attr_name, original_value)
                    except Exception as e:  # noqa: F841 - keep behavior
                        try:
                            logger.error(
                                "Failed to restore result_combiner.%s to %r on %s: %s",
                                attr_name,
                                original_value,
                                type(engine_rc).__name__,
                                e,
                                exc_info=True,
                            )
                        except Exception:
                            # Never allow logging to raise
                            pass
        except Exception:
            # Never raise from restoration; preserve the original exception flow
            try:
                logger.error(
                    "Unexpected error during result_combiner restoration on %s",
                    type(getattr(engine, "result_combiner", object())).__name__,
                    exc_info=True,
                )
            except Exception:
                pass

    return combined_results
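
Not part of the report: a minimal sketch of how run_search could be driven end to end, for example from a test targeting the "custom or mocked pipeline" branch shown above. The engine object, its attribute values, and the expected results below are hypothetical test doubles, not APIs documented elsewhere in the project; the only assumption about ResultCombiner is that it accepts the same keyword arguments the listing itself passes.

import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock

from qdrant_loader_mcp_server.search.hybrid.orchestration.search import run_search


async def main() -> None:
    engine = MagicMock()
    # Shared combiner state that run_search snapshots and restores in its finally block.
    engine.result_combiner = SimpleNamespace(
        vector_weight=0.6,
        keyword_weight=0.3,
        metadata_weight=0.1,
        min_score=0.3,
        spacy_analyzer=None,
    )
    engine.enable_intent_adaptation = False          # skip intent classification
    engine.min_score = 0.25                          # engine-level threshold
    engine._expand_query = AsyncMock(return_value="expanded query")
    engine._analyze_query = MagicMock(return_value={})
    engine._planner.make_plan.return_value = SimpleNamespace(
        use_pipeline=True, expanded_query="expanded query"
    )
    engine.hybrid_pipeline = object()                # not a HybridPipeline -> "custom" branch
    engine._orchestrator.run_pipeline = AsyncMock(return_value=[])

    results = await run_search(
        engine,
        query="hybrid search",
        limit=5,
        source_types=None,
        project_ids=None,
        session_context=None,
        behavioral_context=None,
    )
    assert results == []
    # The shared combiner is untouched because a request-scoped clone was used.
    assert engine.result_combiner.vector_weight == 0.6


asyncio.run(main())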