Coverage for src/qdrant_loader_mcp_server/search/components/result_combiner.py: 88%
148 statements
1"""Result combination and ranking logic for hybrid search."""
3from typing import Any
5from ...utils.logging import LoggingConfig
6from ..hybrid.components.scoring import HybridScorer, ScoreComponents
7from ..nlp.spacy_analyzer import SpaCyQueryAnalyzer
8from .combining import (
9 boost_score_with_metadata,
10 flatten_metadata_components,
11 should_skip_result,
12)
13from .metadata_extractor import MetadataExtractor
14from .search_result_models import HybridSearchResult, create_hybrid_search_result


class ResultCombiner:
    """Combines and ranks search results from multiple sources."""

    def __init__(
        self,
        vector_weight: float = 0.6,
        keyword_weight: float = 0.3,
        metadata_weight: float = 0.1,
        min_score: float = 0.3,
        spacy_analyzer: SpaCyQueryAnalyzer | None = None,
    ):
        """Initialize the result combiner.

        Args:
            vector_weight: Weight for vector search scores (0-1)
            keyword_weight: Weight for keyword search scores (0-1)
            metadata_weight: Weight for metadata-based scoring (0-1)
            min_score: Minimum combined score threshold
            spacy_analyzer: Optional spaCy analyzer for semantic boosting
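
        Example (illustrative sketch; the weight values here are hypothetical
        overrides of the defaults above)::

            combiner = ResultCombiner(
                vector_weight=0.7, keyword_weight=0.2, metadata_weight=0.1
            )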
36 """
        self.vector_weight = vector_weight
        self.keyword_weight = keyword_weight
        self.metadata_weight = metadata_weight
        self.min_score = min_score
        self.spacy_analyzer = spacy_analyzer
        self.logger = LoggingConfig.get_logger(__name__)

        self.metadata_extractor = MetadataExtractor()
        # Internal scorer to centralize weighting logic (behavior-preserving)
        self._scorer = HybridScorer(
            vector_weight=self.vector_weight,
            keyword_weight=self.keyword_weight,
            metadata_weight=self.metadata_weight,
        )
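        # NOTE: HybridScorer.compute() is assumed here to apply the usual
        # linear blend (vector_weight * vector_score + keyword_weight *
        # keyword_score + metadata_weight * metadata_score); only its call
        # site in combine_results() is visible in this module.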

    async def combine_results(
        self,
        vector_results: list[dict[str, Any]],
        keyword_results: list[dict[str, Any]],
        query_context: dict[str, Any],
        limit: int,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> list[HybridSearchResult]:
        """Combine and rerank results from vector and keyword search.

        Args:
            vector_results: Results from vector search
            keyword_results: Results from keyword search
            query_context: Query analysis context
            limit: Maximum number of results to return
            source_types: Optional source type filters
            project_ids: Optional project ID filters

        Returns:
            List of combined and ranked HybridSearchResult objects
        """
        combined_dict = {}

        # Process vector results
        for result in vector_results:
            text = result["text"]
            if text not in combined_dict:
                metadata = result["metadata"]
                combined_dict[text] = {
                    "text": text,
                    "metadata": metadata,
                    "source_type": result["source_type"],
                    "vector_score": result["score"],
                    "keyword_score": 0.0,
                    # 🔧 CRITICAL FIX: Include all root-level fields from search services
                    "title": result.get("title", ""),
                    "url": result.get("url", ""),
                    "document_id": result.get("document_id", ""),
                    "source": result.get("source", ""),
                    "created_at": result.get("created_at", ""),
                    "updated_at": result.get("updated_at", ""),
                }
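
        # De-duplication is keyed on exact chunk text: a keyword hit whose
        # text was already seen from vector search only fills in its
        # keyword_score on the existing entry (handled below).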
        # Process keyword results
        for result in keyword_results:
            text = result["text"]
            if text in combined_dict:
                combined_dict[text]["keyword_score"] = result["score"]
            else:
                metadata = result["metadata"]
                combined_dict[text] = {
                    "text": text,
                    "metadata": metadata,
                    "source_type": result["source_type"],
                    "vector_score": 0.0,
                    "keyword_score": result["score"],
                    "title": result.get("title", ""),
                    "url": result.get("url", ""),
                    "document_id": result.get("document_id", ""),
                    "source": result.get("source", ""),
                    "created_at": result.get("created_at", ""),
                    "updated_at": result.get("updated_at", ""),
                }

        # Calculate combined scores and create results
        combined_results = []

        # Extract intent-specific filtering configuration
        search_intent = query_context.get("search_intent")
        adaptive_config = query_context.get("adaptive_config")
        result_filters = adaptive_config.result_filters if adaptive_config else {}

        for text, info in combined_dict.items():
            # Skip if source type doesn't match filter
            if source_types and info["source_type"] not in source_types:
                continue

            metadata = info["metadata"]

            # Apply intent-specific result filtering
            if search_intent and result_filters:
                if should_skip_result(metadata, result_filters, query_context):
                    continue

            combined_score = self._scorer.compute(
                ScoreComponents(
                    vector_score=info["vector_score"],
                    keyword_score=info["keyword_score"],
                    metadata_score=0.0,  # Preserve legacy behavior (no metadata in base score)
                )
            )
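            # Illustrative arithmetic under the assumed linear blend: with the
            # default weights, vector_score=0.8 and keyword_score=0.5 give a
            # base score of 0.6 * 0.8 + 0.3 * 0.5 = 0.63.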

            if combined_score >= self.min_score:
                # Extract all metadata components
                metadata_components = self.metadata_extractor.extract_all_metadata(
                    metadata
                )

                # Boost score with metadata
                boosted_score = boost_score_with_metadata(
                    combined_score,
                    metadata,
                    query_context,
                    spacy_analyzer=self.spacy_analyzer,
                )

                # Extract fields from both direct payload fields and nested
                # metadata, preferring direct Qdrant payload fields when
                # available (the title priority chain below covers every case)

                # Extract rich metadata from nested metadata object
                file_name = metadata.get("file_name", "")
                chunk_index = metadata.get("chunk_index")
                total_chunks = metadata.get("total_chunks")

                # Enhanced title generation using actual Qdrant structure
                # Priority: root title > nested title > section_title > file_name + chunk info > source
                root_title = info.get(
                    "title", ""
                )  # e.g., "Stratégie commerciale MYA.pdf - Chunk 2"
                nested_title = metadata.get("title", "")  # e.g., "Preamble (Part 2)"
                section_title = metadata.get("section_title", "")

                if root_title:
                    title = root_title
                elif nested_title:
                    title = nested_title
                elif section_title:
                    title = section_title
                elif file_name:
                    title = file_name
                    # Add chunk info if available from nested metadata
                    sub_chunk_index = metadata.get("sub_chunk_index")
                    total_sub_chunks = metadata.get("total_sub_chunks")
                    if sub_chunk_index is not None and total_sub_chunks is not None:
                        title += (
                            f" - Chunk {int(sub_chunk_index) + 1}/{total_sub_chunks}"
                        )
                    elif chunk_index is not None and total_chunks is not None:
                        title += f" - Chunk {int(chunk_index) + 1}/{total_chunks}"
                else:
                    source = info.get("source", "") or metadata.get("source", "")
                    if source:
                        # Extract filename from path-like sources
                        title = (
                            os.path.basename(source)
                            if "/" in source or "\\" in source
                            else source
                        )
                    else:
                        title = "Untitled"

                # Create enhanced metadata dict with rich Qdrant fields
                enhanced_metadata = {
                    # Core fields from root level of Qdrant payload
                    "source_url": info.get("url", ""),
                    "document_id": info.get("document_id", ""),
                    "created_at": info.get("created_at", ""),
                    "last_modified": info.get("updated_at", ""),
                    "repo_name": info.get("source", ""),
                    # Project scoping is stored at the root as 'source'
                    "project_id": info.get("source", ""),
                    # Construct file path from nested metadata
                    "file_path": (
                        metadata.get("file_directory", "").rstrip("/")
                        + "/"
                        + metadata.get("file_name", "")
                        if metadata.get("file_name") and metadata.get("file_directory")
                        else metadata.get("file_name", "")
                    ),
                }
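
                # For example, file_directory "docs/guides/" with file_name
                # "setup.md" yields file_path "docs/guides/setup.md"; without
                # a directory, file_path falls back to the bare file name.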

                # Add rich metadata from nested metadata object (confirmed structure)
                rich_metadata_fields = {
                    "original_filename": metadata.get("file_name"),
                    "file_size": metadata.get("file_size"),
                    "original_file_type": metadata.get("file_type")
                    or metadata.get("original_file_type"),
                    "word_count": metadata.get("word_count"),
                    "char_count": metadata.get("character_count")
                    or metadata.get("char_count")
                    or metadata.get("line_count"),
                    "chunk_index": metadata.get("sub_chunk_index", chunk_index),
                    "total_chunks": metadata.get("total_sub_chunks", total_chunks),
                    "chunking_strategy": metadata.get("chunking_strategy")
                    or metadata.get("conversion_method"),
                    # Project fields now come from root payload; avoid overriding with nested metadata
                    "collection_name": metadata.get("collection_name"),
                    # Additional rich fields from actual Qdrant structure
                    "section_title": metadata.get("section_title"),
                    "parent_section": metadata.get("parent_section"),
                    "file_encoding": metadata.get("file_encoding"),
                    "conversion_failed": metadata.get("conversion_failed", False),
                    "is_excel_sheet": metadata.get("is_excel_sheet", False),
                }

                # Only add non-None values to avoid conflicts
                for key, value in rich_metadata_fields.items():
                    if value is not None:
                        enhanced_metadata[key] = value

                # Merge with flattened metadata components (flattened takes precedence on conflicts)
                flattened_components = flatten_metadata_components(metadata_components)
                enhanced_metadata.update(flattened_components)

                # NOTE: No additional fallback; root payload project_id is authoritative

                # Create HybridSearchResult using factory function
                hybrid_result = create_hybrid_search_result(
                    score=boosted_score,
                    text=text,
                    source_type=info["source_type"],
                    source_title=title,
                    vector_score=info["vector_score"],
                    keyword_score=info["keyword_score"],
                    **enhanced_metadata,
                )

                combined_results.append(hybrid_result)

        # Sort by combined score
        combined_results.sort(key=lambda x: x.score, reverse=True)

        # Apply diversity filtering for exploratory intents
        if adaptive_config and adaptive_config.diversity_factor > 0.0:
            try:
                from ..hybrid.components.diversity import apply_diversity_filtering

                diverse_results = apply_diversity_filtering(
                    combined_results, adaptive_config.diversity_factor, limit
                )
                self.logger.debug(
                    "Applied diversity filtering",
                    original_count=len(combined_results),
                    diverse_count=len(diverse_results),
                    diversity_factor=adaptive_config.diversity_factor,
                )
                return diverse_results
            except Exception:
                # Fall back to plain top-N behavior if import or filtering fails
                pass

        return combined_results[:limit]
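
    # Usage sketch (illustrative; the result dicts are hypothetical and only
    # show the keys combine_results() reads):
    #
    #     combiner = ResultCombiner()
    #     results = await combiner.combine_results(
    #         vector_results=[
    #             {"text": "...", "metadata": {}, "source_type": "git", "score": 0.83}
    #         ],
    #         keyword_results=[],
    #         query_context={},
    #         limit=5,
    #     )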

    # The following methods are thin wrappers delegating to combining/* modules
    # to preserve backward-compatible tests that call private methods directly.

    def _should_skip_result(
        self, metadata: dict, result_filters: dict, query_context: dict
    ) -> bool:
        return should_skip_result(metadata, result_filters, query_context)

    def _count_business_indicators(self, metadata: dict) -> int:
        from .combining.filters import count_business_indicators

        return count_business_indicators(metadata)

    def _boost_score_with_metadata(
        self, base_score: float, metadata: dict, query_context: dict
    ) -> float:
        return boost_score_with_metadata(
            base_score, metadata, query_context, spacy_analyzer=self.spacy_analyzer
        )

    def _apply_content_type_boosting(
        self, metadata: dict, query_context: dict
    ) -> float:
        from .combining import apply_content_type_boosting

        return apply_content_type_boosting(metadata, query_context)

    def _apply_section_level_boosting(self, metadata: dict) -> float:
        from .combining import apply_section_level_boosting

        return apply_section_level_boosting(metadata)

    def _apply_content_quality_boosting(self, metadata: dict) -> float:
        from .combining import apply_content_quality_boosting

        return apply_content_quality_boosting(metadata)

    def _apply_conversion_boosting(self, metadata: dict, query_context: dict) -> float:
        from .combining import apply_conversion_boosting

        return apply_conversion_boosting(metadata, query_context)

    def _apply_semantic_boosting(self, metadata: dict, query_context: dict) -> float:
        from .combining import apply_semantic_boosting

        return apply_semantic_boosting(metadata, query_context, self.spacy_analyzer)

    def _apply_fallback_semantic_boosting(
        self, metadata: dict, query_context: dict
    ) -> float:
        from .combining import apply_fallback_semantic_boosting

        return apply_fallback_semantic_boosting(metadata, query_context)

    def _apply_diversity_filtering(
        self, results: list[HybridSearchResult], diversity_factor: float, limit: int
    ) -> list[HybridSearchResult]:
        if diversity_factor <= 0.0 or len(results) <= limit:
            return results[:limit]

        diverse_results = []
        used_source_types = set()
        used_section_types = set()
        used_sources = set()

        for result in results:
            if len(diverse_results) >= limit:
                break

            diversity_score = 1.0
            source_type = result.source_type
            if source_type in used_source_types:
                diversity_score *= 1.0 - diversity_factor * 0.3

            section_type = result.section_type or "unknown"
            if section_type in used_section_types:
                diversity_score *= 1.0 - diversity_factor * 0.2

            source_key = f"{result.source_type}:{result.source_title}"
            if source_key in used_sources:
                diversity_score *= 1.0 - diversity_factor * 0.4
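
            # Each repeat multiplies the score down: e.g. with
            # diversity_factor=0.5, a repeated source_type scales the score by
            # 1 - 0.5 * 0.3 = 0.85 and a repeated source document by
            # 1 - 0.5 * 0.4 = 0.80.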

            adjusted_score = result.score * diversity_score

            if (
                len(diverse_results) < limit * 0.7
                or adjusted_score >= result.score * 0.6
            ):
                diverse_results.append(result)
                used_source_types.add(source_type)
                used_section_types.add(section_type)
                used_sources.add(source_key)

        remaining_slots = limit - len(diverse_results)
        if remaining_slots > 0:
            remaining_results = [r for r in results if r not in diverse_results]
            diverse_results.extend(remaining_results[:remaining_slots])

        return diverse_results[:limit]

    def _flatten_metadata_components(
        self, metadata_components: dict[str, Any]
    ) -> dict[str, Any]:
        return flatten_metadata_components(metadata_components)