Coverage for src / qdrant_loader_mcp_server / search / hybrid / orchestration / search.py: 85%
68 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:51 +0000
1from __future__ import annotations
3import logging
4from typing import Any
6from ...components.result_combiner import ResultCombiner
7from ...components.search_result_models import HybridSearchResult
8from ..components.helpers import combine_results as _combine_results_helper
9from ..pipeline import HybridPipeline
# Module-level logger namespaced to this module (standard logging convention).
logger = logging.getLogger(__name__)
async def run_search(
    engine: Any,
    query: str,
    limit: int,
    source_types: list[str] | None,
    project_ids: list[str] | None,
    session_context: dict[str, Any] | None,
    behavioral_context: list[str] | None,
) -> list[HybridSearchResult]:
    """
    Execute a hybrid search for the given query using the provided engine and return ranked results.

    Per-request adjustments (query expansion, intent-adaptive combiner weights, and fetch limits) are applied to a cloned combiner and do not mutate the engine's shared combiner state; the function attempts to restore the engine's result_combiner attributes to their original values and logs any restoration failures without raising.

    Parameters:
        engine: Search engine instance providing hybrid search, planners, expansion, and orchestration.
        query (str): The user query to search for.
        limit (int): Maximum number of results to return.
        source_types (list[str] | None): Optional list of source types to filter results.
        project_ids (list[str] | None): Optional list of project IDs to restrict the search.
        session_context (dict[str, Any] | None): Optional session-level context used for intent classification and adaptations.
        behavioral_context (list[str] | None): Optional behavioral signals used for intent classification and adaptations.

    Returns:
        list[HybridSearchResult]: Ranked hybrid search results; length will be at most `limit`.
    """
    # Save original combiner values up front for safe restoration in `finally`.
    original_vector_weight = engine.result_combiner.vector_weight
    original_keyword_weight = engine.result_combiner.keyword_weight
    original_min_score = engine.result_combiner.min_score

    combined_results: list[HybridSearchResult]
    fetch_limit = limit

    try:
        # Build a request-scoped combiner clone to avoid mutating shared engine state
        base_combiner = engine.result_combiner
        local_combiner = ResultCombiner(
            vector_weight=getattr(base_combiner, "vector_weight", 0.6),
            keyword_weight=getattr(base_combiner, "keyword_weight", 0.3),
            metadata_weight=getattr(base_combiner, "metadata_weight", 0.1),
            min_score=getattr(base_combiner, "min_score", 0.3),
            spacy_analyzer=getattr(base_combiner, "spacy_analyzer", None),
        )

        # Intent classification and adaptive adjustments (applied to local combiner only)
        search_intent = None
        adaptive_config = None
        if engine.enable_intent_adaptation and engine.intent_classifier:
            search_intent = engine.intent_classifier.classify_intent(
                query, session_context, behavioral_context
            )
            adaptive_config = engine.adaptive_strategy.adapt_search(
                search_intent, query
            )
            if adaptive_config:
                local_combiner.vector_weight = adaptive_config.vector_weight
                local_combiner.keyword_weight = adaptive_config.keyword_weight
                local_combiner.min_score = adaptive_config.min_score_threshold
                # Over-fetch (up to 2x the requested limit) so post-filtering
                # still leaves enough results; capped by the adaptive config.
                fetch_limit = min(adaptive_config.max_results, limit * 2)

        # TODO: Evaluate the expanded_query logic to see it's impacts on vector and keyword searches
        expanded_query = await engine._expand_query(query)
        if adaptive_config and getattr(adaptive_config, "expand_query", False):
            aggressiveness = getattr(adaptive_config, "expansion_aggressiveness", None)
            if isinstance(aggressiveness, int | float) and aggressiveness > 0.5:
                expanded_query = await engine._expand_query_aggressive(query)

        query_context = engine._analyze_query(query)
        if search_intent:
            query_context["search_intent"] = search_intent
            query_context["adaptive_config"] = adaptive_config

        plan = engine._planner.make_plan(
            has_pipeline=engine.hybrid_pipeline is not None,
            expanded_query=expanded_query,
        )

        # Vector search uses the (possibly expanded) query; keyword search
        # always uses the raw user query.
        resolved_vector_query = plan.expanded_query
        resolved_keyword_query = query

        # Ensure combiner threshold honors engine-level minimum when applicable
        engine_min_score = getattr(engine, "min_score", None)
        if engine_min_score is not None and (
            getattr(local_combiner, "min_score", None) is None
            or local_combiner.min_score < engine_min_score
        ):
            # Use the stricter (higher) engine threshold
            local_combiner.min_score = engine_min_score

        if plan.use_pipeline and engine.hybrid_pipeline is not None:
            hybrid_pipeline: HybridPipeline = engine.hybrid_pipeline
            if isinstance(hybrid_pipeline, HybridPipeline):
                # Clone pipeline for this request with the local combiner to avoid shared mutation
                pipeline_to_run = HybridPipeline(
                    vector_searcher=hybrid_pipeline.vector_searcher,
                    keyword_searcher=hybrid_pipeline.keyword_searcher,
                    result_combiner=local_combiner,
                    reranker=hybrid_pipeline.reranker,
                    booster=hybrid_pipeline.booster,
                    normalizer=hybrid_pipeline.normalizer,
                    deduplicator=hybrid_pipeline.deduplicator,
                )
            else:
                # Custom or mocked pipeline: honor its run override without cloning
                pipeline_to_run = hybrid_pipeline
            combined_results = await engine._orchestrator.run_pipeline(
                pipeline_to_run,
                query=query,
                limit=fetch_limit,
                query_context=query_context,
                source_types=source_types,
                project_ids=project_ids,
                vector_query=resolved_vector_query,
                keyword_query=resolved_keyword_query,
            )
        else:
            # No pipeline: run vector and keyword searches directly, then combine.
            # Fetch 3x the limit from each leg so the combiner has enough candidates.
            vector_results = await engine._vector_search(
                expanded_query, fetch_limit * 3, project_ids
            )
            keyword_results = await engine._keyword_search(
                query, fetch_limit * 3, project_ids
            )
            combined_results = await _combine_results_helper(
                local_combiner,
                getattr(engine, "min_score", 0.0),
                vector_results,
                keyword_results,
                query_context,
                fetch_limit,
                source_types,
                project_ids,
            )
    finally:
        # Always attempt to restore engine combiner settings to their original values.
        # Log any restoration failures with context, without masking original exceptions.
        try:
            engine_rc = getattr(engine, "result_combiner", None)
            if engine_rc is not None:
                restorations = [
                    ("vector_weight", original_vector_weight),
                    ("keyword_weight", original_keyword_weight),
                    ("min_score", original_min_score),
                ]
                for attr_name, original_value in restorations:
                    try:
                        setattr(engine_rc, attr_name, original_value)
                    except Exception as e:
                        try:
                            logger.error(
                                "Failed to restore result_combiner.%s to %r on %s: %s",
                                attr_name,
                                original_value,
                                type(engine_rc).__name__,
                                e,
                                exc_info=True,
                            )
                        except Exception:
                            # Never allow logging to raise
                            pass
        except Exception:
            # Never raise from restoration; preserve original exception flow
            try:
                logger.error(
                    "Unexpected error during result_combiner restoration on %s",
                    type(getattr(engine, "result_combiner", object())).__name__,
                    exc_info=True,
                )
            except Exception:
                pass

    # Trim to the caller-requested limit (fetch_limit may have over-fetched).
    return combined_results[:limit]