Coverage for src/qdrant_loader_mcp_server/search/hybrid/orchestration/search.py: 85%
65 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1from __future__ import annotations
3import logging
4from typing import Any
6from ...components.result_combiner import ResultCombiner
7from ...components.search_result_models import HybridSearchResult
8from ..components.helpers import combine_results as _combine_results_helper
9from ..pipeline import HybridPipeline
11logger = logging.getLogger(__name__)
async def run_search(
    engine: Any,
    query: str,
    limit: int,
    source_types: list[str] | None,
    project_ids: list[str] | None,
    session_context: dict[str, Any] | None,
    behavioral_context: list[str] | None,
) -> list[HybridSearchResult]:
    """Execute one hybrid (vector + keyword) search request against *engine*.

    Flow:
      1. Clone the engine's ``ResultCombiner`` into a request-local copy so
         per-request weight/threshold adjustments never mutate shared state.
      2. If intent adaptation is enabled, classify the query's intent and let
         the adaptive strategy adjust the local combiner's weights, the
         minimum score threshold, and the effective ``limit``.
      3. Expand the query (aggressively when the adaptive config asks for it),
         analyze it into a query context, and build a search plan.
      4. Run either the engine's ``HybridPipeline`` (cloned with the local
         combiner) or the fallback vector + keyword search path, combining
         results via the shared helper.
      5. In ``finally``, best-effort restore the engine combiner's original
         weights/threshold, never raising from the restoration itself.

    Args:
        engine: Hybrid search engine facade. Must expose ``result_combiner``,
            ``_expand_query``, ``_analyze_query``, ``_planner``,
            ``_orchestrator``, ``hybrid_pipeline``, ``_vector_search`` and
            ``_keyword_search``; optionally ``min_score``,
            ``enable_intent_adaptation``, ``intent_classifier`` and
            ``adaptive_strategy``. (Duck-typed; not validated here.)
        query: Raw user query string.
        limit: Maximum number of results requested by the caller. May be
            raised/capped by the adaptive config (``min(max_results, limit*2)``).
        source_types: Optional filter on result source types.
        project_ids: Optional filter restricting search to given projects.
        session_context: Optional session metadata passed to intent
            classification.
        behavioral_context: Optional behavioral signals passed to intent
            classification.

    Returns:
        The combined, filtered list of ``HybridSearchResult`` objects.

    Raises:
        Whatever the underlying search/orchestration calls raise; restoration
        failures in ``finally`` are logged and never mask the original error.
    """
    # Save original combiner values up front for safe restoration
    original_vector_weight = engine.result_combiner.vector_weight
    original_keyword_weight = engine.result_combiner.keyword_weight
    original_min_score = engine.result_combiner.min_score

    combined_results: list[HybridSearchResult]

    try:
        # Build a request-scoped combiner clone to avoid mutating shared engine state
        base_combiner = engine.result_combiner
        # Defaults mirror ResultCombiner's standard configuration in case the
        # base combiner lacks an attribute (e.g. a partial mock in tests).
        local_combiner = ResultCombiner(
            vector_weight=getattr(base_combiner, "vector_weight", 0.6),
            keyword_weight=getattr(base_combiner, "keyword_weight", 0.3),
            metadata_weight=getattr(base_combiner, "metadata_weight", 0.1),
            min_score=getattr(base_combiner, "min_score", 0.3),
            spacy_analyzer=getattr(base_combiner, "spacy_analyzer", None),
        )

        # Intent classification and adaptive adjustments (applied to local combiner only)
        search_intent = None
        adaptive_config = None
        if engine.enable_intent_adaptation and engine.intent_classifier:
            search_intent = engine.intent_classifier.classify_intent(
                query, session_context, behavioral_context
            )
            adaptive_config = engine.adaptive_strategy.adapt_search(
                search_intent, query
            )
            if adaptive_config:
                local_combiner.vector_weight = adaptive_config.vector_weight
                local_combiner.keyword_weight = adaptive_config.keyword_weight
                local_combiner.min_score = adaptive_config.min_score_threshold
                # Fetch up to double the requested amount, capped by the
                # strategy's max_results, to give the combiner more candidates.
                limit = min(adaptive_config.max_results, limit * 2)

        expanded_query = await engine._expand_query(query)
        if adaptive_config and getattr(adaptive_config, "expand_query", False):
            # Only escalate to aggressive expansion for a clearly high
            # aggressiveness signal (> 0.5); otherwise keep the normal expansion.
            aggressiveness = getattr(adaptive_config, "expansion_aggressiveness", None)
            if isinstance(aggressiveness, int | float) and aggressiveness > 0.5:
                expanded_query = await engine._expand_query_aggressive(query)

        query_context = engine._analyze_query(query)
        if search_intent:
            # Expose intent data downstream (combiner/reranker can use it).
            query_context["search_intent"] = search_intent
            query_context["adaptive_config"] = adaptive_config

        plan = engine._planner.make_plan(
            has_pipeline=engine.hybrid_pipeline is not None,
            expanded_query=expanded_query,
        )

        # Ensure combiner threshold honors engine-level minimum when applicable
        engine_min_score = getattr(engine, "min_score", None)
        if engine_min_score is not None and (
            getattr(local_combiner, "min_score", None) is None
            or local_combiner.min_score < engine_min_score
        ):
            # Use the stricter (higher) engine threshold
            local_combiner.min_score = engine_min_score

        if plan.use_pipeline and engine.hybrid_pipeline is not None:
            p = engine.hybrid_pipeline
            if isinstance(p, HybridPipeline):
                # Clone pipeline for this request with the local combiner to avoid shared mutation
                local_pipeline = HybridPipeline(
                    vector_searcher=p.vector_searcher,
                    keyword_searcher=p.keyword_searcher,
                    result_combiner=local_combiner,
                    reranker=p.reranker,
                    booster=p.booster,
                    normalizer=p.normalizer,
                    deduplicator=p.deduplicator,
                )
                # Vector leg gets the (possibly aggressive) expanded query;
                # keyword leg keeps the original user query.
                combined_results = await engine._orchestrator.run_pipeline(
                    local_pipeline,
                    query=query,
                    limit=limit,
                    query_context=query_context,
                    source_types=source_types,
                    project_ids=project_ids,
                    vector_query=plan.expanded_query,
                    keyword_query=query,
                )
            else:
                # Custom or mocked pipeline: honor its run override without cloning
                combined_results = await engine._orchestrator.run_pipeline(
                    p,
                    query=query,
                    limit=limit,
                    query_context=query_context,
                    source_types=source_types,
                    project_ids=project_ids,
                    vector_query=plan.expanded_query,
                    keyword_query=query,
                )
        else:
            # No pipeline path: over-fetch (limit * 3) from each leg so the
            # combiner has enough candidates after filtering/deduplication.
            vector_results = await engine._vector_search(
                expanded_query, limit * 3, project_ids
            )
            keyword_results = await engine._keyword_search(
                query, limit * 3, project_ids
            )
            combined_results = await _combine_results_helper(
                local_combiner,
                getattr(engine, "min_score", 0.0),
                vector_results,
                keyword_results,
                query_context,
                limit,
                source_types,
                project_ids,
            )
    finally:
        # Always attempt to restore engine combiner settings to their original values.
        # Log any restoration failures with context, without masking original exceptions.
        try:
            engine_rc = getattr(engine, "result_combiner", None)
            if engine_rc is not None:
                restorations = [
                    ("vector_weight", original_vector_weight),
                    ("keyword_weight", original_keyword_weight),
                    ("min_score", original_min_score),
                ]
                # Restore each attribute independently so one failure does not
                # prevent restoring the others.
                for attr_name, original_value in restorations:
                    try:
                        setattr(engine_rc, attr_name, original_value)
                    except Exception as e:  # noqa: F841 - keep behavior
                        try:
                            logger.error(
                                "Failed to restore result_combiner.%s to %r on %s: %s",
                                attr_name,
                                original_value,
                                type(engine_rc).__name__,
                                e,
                                exc_info=True,
                            )
                        except Exception:
                            # Never allow logging to raise
                            pass
        except Exception:
            # Never raise from restoration; preserve original exception flow
            try:
                logger.error(
                    "Unexpected error during result_combiner restoration on %s",
                    type(getattr(engine, "result_combiner", object())).__name__,
                    exc_info=True,
                )
            except Exception:
                pass

    return combined_results