Coverage for src / qdrant_loader_mcp_server / search / components / result_combiner.py: 89%
175 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:41 +0000
1"""Result combination and ranking logic for hybrid search."""
3from typing import Any
5from ...utils.logging import LoggingConfig
6from ..hybrid.components.scoring import HybridScorer
7from ..nlp.spacy_analyzer import SpaCyQueryAnalyzer
8from .combining import (
9 boost_score_with_metadata,
10 flatten_metadata_components,
11 should_skip_result,
12)
13from .metadata_extractor import MetadataExtractor
14from .search_result_models import HybridSearchResult, create_hybrid_search_result
# Smoothing constant k for (weighted) Reciprocal Rank Fusion; 60 is the
# conventional default from the original RRF literature (Cormack et al., 2009).
WRRF_CONSTANT = 60
class ResultCombiner:
    """Combines and ranks search results from multiple sources."""

    def __init__(
        self,
        vector_weight: float = 0.6,
        keyword_weight: float = 0.3,
        metadata_weight: float = 0.1,
        min_score: float = 0.3,
        spacy_analyzer: SpaCyQueryAnalyzer | None = None,
    ):
        """Initialize the result combiner.

        Args:
            vector_weight: Weight for vector search scores (0-1)
            keyword_weight: Weight for keyword search scores (0-1)
            metadata_weight: Weight for metadata-based scoring (0-1)
            min_score: Minimum combined score threshold
            spacy_analyzer: Optional spaCy analyzer for semantic boosting
        """
        self.vector_weight = vector_weight
        self.keyword_weight = keyword_weight
        self.metadata_weight = metadata_weight
        self.min_score = min_score
        self.spacy_analyzer = spacy_analyzer
        self.logger = LoggingConfig.get_logger(__name__)
        self.metadata_extractor = MetadataExtractor()
        # Internal scorer centralizes the weighting logic so the weights live
        # in one place (behavior-preserving delegation).
        self._scorer = HybridScorer(
            vector_weight=vector_weight,
            keyword_weight=keyword_weight,
            metadata_weight=metadata_weight,
        )
54 def merge_results_with_wrrf(
55 self,
56 vector_results: list[dict[str, Any]],
57 keyword_results: list[dict[str, Any]],
58 ) -> dict:
59 """
60 Merge and rerank results using Weighted Recipocal Rerank Fusion from vector (dense) and keyword (sparse) search.
61 """
62 combined_dict = {}
63 # Process vector results
64 for rank, result in enumerate(vector_results, 1):
65 text = result["text"]
66 if text not in combined_dict:
67 metadata = result["metadata"]
68 combined_dict[text] = {
69 "text": text,
70 "metadata": metadata,
71 "source_type": result["source_type"],
72 "vector_score": result["score"],
73 "keyword_score": 0.0,
74 # 🔧 CRITICAL FIX: Include all root-level fields from search services
75 "title": result.get("title", ""),
76 "url": result.get("url", ""),
77 "document_id": result.get("document_id", ""),
78 "source": result.get("source", ""),
79 "created_at": result.get("created_at", ""),
80 "updated_at": result.get("updated_at", ""),
81 "contextual_content": result.get("contextual_content", ""),
82 "wrrf_score": self._scorer.vector_weight
83 * (1 / (rank + WRRF_CONSTANT)),
84 }
86 # Process keyword results
87 for rank, result in enumerate(keyword_results, 1):
88 text = result["text"]
89 if text in combined_dict:
90 combined_dict[text]["keyword_score"] = result["score"]
91 # Backfill contextual_content if vector entry was empty
92 if not combined_dict[text].get("contextual_content") and result.get(
93 "contextual_content"
94 ):
95 combined_dict[text]["contextual_content"] = result[
96 "contextual_content"
97 ]
98 # Sum
99 combined_dict[text]["wrrf_score"] += self._scorer.keyword_weight * (
100 1 / (rank + WRRF_CONSTANT)
101 )
102 else:
103 metadata = result["metadata"]
104 combined_dict[text] = {
105 "text": text,
106 "metadata": metadata,
107 "source_type": result["source_type"],
108 "vector_score": 0.0,
109 "keyword_score": result["score"],
110 "title": result.get("title", ""),
111 "url": result.get("url", ""),
112 "document_id": result.get("document_id", ""),
113 "source": result.get("source", ""),
114 "created_at": result.get("created_at", ""),
115 "updated_at": result.get("updated_at", ""),
116 "contextual_content": result.get("contextual_content", ""),
117 "wrrf_score": self._scorer.keyword_weight
118 * (1 / (rank + WRRF_CONSTANT)),
119 }
120 return combined_dict
122 def extract_chunk_title(
123 self, info: dict, metadata: dict, chunk_index: int, total_chunks: int
124 ) -> str:
125 # Extract fields from both direct payload fields and nested metadata
126 # Use direct fields from Qdrant payload when available, fallback to metadata
127 title = info.get("title", "") or metadata.get("title", "")
129 # Extract rich metadata from nested metadata object
130 file_name = metadata.get("file_name", "")
131 metadata.get("file_type", "")
133 # Enhanced title generation using actual Qdrant structure
134 # Priority: root title > nested section_title > file_name + chunk info > source
135 root_title = info.get(
136 "title", ""
137 ) # e.g., "Stratégie commerciale MYA.pdf - Chunk 2"
138 nested_title = metadata.get("title", "") # e.g., "Preamble (Part 2)"
139 section_title = metadata.get("section_title", "")
141 if root_title:
142 title = root_title
143 elif nested_title:
144 title = nested_title
145 elif section_title:
146 title = section_title
147 elif file_name:
148 title = file_name
149 # Add chunk info if available from nested metadata
150 sub_chunk_index = metadata.get("sub_chunk_index")
151 total_sub_chunks = metadata.get("total_sub_chunks")
152 if sub_chunk_index is not None and total_sub_chunks is not None:
153 title += f" - Chunk {int(sub_chunk_index) + 1}/{total_sub_chunks}"
154 elif chunk_index is not None and total_chunks is not None:
155 title += f" - Chunk {int(chunk_index) + 1}/{total_chunks}"
156 else:
157 source = info.get("source", "") or metadata.get("source", "")
158 if source:
159 # Extract filename from path-like sources
160 import os
162 title = (
163 os.path.basename(source)
164 if "/" in source or "\\" in source
165 else source
166 )
167 else:
168 title = "Untitled"
169 return title
171 def merge_rich_and_enhanced_metadata(
172 self,
173 info: dict,
174 metadata: dict,
175 metadata_components: dict,
176 chunk_index: int,
177 total_chunks: int,
178 ) -> dict:
179 # Create enhanced metadata dict with rich Qdrant fields
180 enhanced_metadata = {
181 # Core fields from root level of Qdrant payload
182 "source_url": info.get("url", ""),
183 "document_id": info.get("document_id", ""),
184 "created_at": info.get("created_at", ""),
185 "last_modified": info.get("updated_at", ""),
186 "repo_name": info.get("source", ""),
187 # Project scoping is stored at the root as 'source'
188 "project_id": info.get("source", ""),
189 # Construct file path from nested metadata
190 "file_path": (
191 metadata.get("file_directory", "").rstrip("/")
192 + "/"
193 + metadata.get("file_name", "")
194 if metadata.get("file_name") and metadata.get("file_directory")
195 else metadata.get("file_name", "")
196 ),
197 }
199 # Add rich metadata from nested metadata object (confirmed structure)
200 rich_metadata_fields = {
201 "original_filename": metadata.get("file_name"),
202 "file_size": metadata.get("file_size"),
203 "original_file_type": metadata.get("file_type")
204 or metadata.get("original_file_type"),
205 "word_count": metadata.get("word_count"),
206 "char_count": metadata.get("character_count")
207 or metadata.get("char_count")
208 or metadata.get("line_count"),
209 "chunk_index": metadata.get("sub_chunk_index", chunk_index),
210 "total_chunks": metadata.get("total_sub_chunks", total_chunks),
211 "chunking_strategy": metadata.get("chunking_strategy")
212 or metadata.get("conversion_method"),
213 # Project fields now come from root payload; avoid overriding with nested metadata
214 "collection_name": metadata.get("collection_name"),
215 # Additional rich fields from actual Qdrant structure
216 "section_title": metadata.get("section_title"),
217 "parent_section": metadata.get("parent_section"),
218 "file_encoding": metadata.get("file_encoding"),
219 "conversion_failed": metadata.get("conversion_failed", False),
220 "is_excel_sheet": metadata.get("is_excel_sheet", False),
221 }
223 # Only add non-None values to avoid conflicts
224 for key, value in rich_metadata_fields.items():
225 if value is not None:
226 enhanced_metadata[key] = value
228 # Merge with flattened metadata components (flattened takes precedence for conflicts)
229 flattened_components = flatten_metadata_components(metadata_components)
230 enhanced_metadata.update(flattened_components)
232 return enhanced_metadata
234 def is_result_filtered(self, use_wrrf: bool, wrrf_score: float, chunk_score: float):
235 # Scale minimum threshold
236 wrrf_min_score = self.min_score * (
237 (self._scorer.vector_weight + self._scorer.keyword_weight)
238 / (WRRF_CONSTANT + 1)
239 )
240 # Filter low wrrf
241 if use_wrrf and wrrf_score <= wrrf_min_score:
242 return True
244 # Fallback to standard filter
245 if not use_wrrf and chunk_score <= self.min_score:
246 return True
247 return False
    async def combine_results(
        self,
        vector_results: list[dict[str, Any]],
        keyword_results: list[dict[str, Any]],
        query_context: dict[str, Any],
        limit: int,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> list[HybridSearchResult]:
        """Combine and rerank results using Weighted Reciprocal Rank Fusion from vector (dense) and keyword (sparse) search.

        Args:
            vector_results: Results from vector search
            keyword_results: Results from keyword search
            query_context: Query analysis context
            limit: Maximum number of results to return
            source_types: Optional source type filters
            project_ids: Optional project ID filters (NOTE(review): not read
                anywhere in this method body — confirm whether project
                filtering happens upstream)

        Returns:
            List of combined and ranked HybridSearchResult objects
        """
        combined_dict = self.merge_results_with_wrrf(
            vector_results=vector_results, keyword_results=keyword_results
        )

        # Calculate combined scores and create results
        combined_results = []

        # Extract intent-specific filtering configuration
        search_intent = query_context.get("search_intent")
        adaptive_config = query_context.get("adaptive_config")
        result_filters = adaptive_config.result_filters if adaptive_config else {}

        # Naive WRRF trigger: fused scores are used only when at least 10
        # merged candidates exist; otherwise standard weighting applies.
        use_wrrf = len(combined_dict.keys()) >= 10

        for text, info in combined_dict.items():
            # Skip if source type doesn't match filter
            if source_types and info["source_type"] not in source_types:
                continue
            # Apply intent-specific result filtering
            metadata = info["metadata"]
            if search_intent and result_filters:
                if should_skip_result(metadata, result_filters, query_context):
                    continue

            wrrf_score = info["wrrf_score"]
            # Fallback to standard weighting scoring
            chunk_score = (info["keyword_score"] * self._scorer.keyword_weight) + (
                info["vector_score"] * self._scorer.vector_weight
            )

            # Filter based on WRRF or standard scores and weighting
            if self.is_result_filtered(use_wrrf, wrrf_score, chunk_score):
                continue

            score = wrrf_score if use_wrrf else chunk_score

            # Extract all metadata components
            metadata_components = self.metadata_extractor.extract_all_metadata(metadata)

            # TODO: Evaluate metadata score boosting with WRRF and in general - Boost score with metadata
            boosted_score = boost_score_with_metadata(
                score,
                metadata,
                query_context,
                spacy_analyzer=self.spacy_analyzer,
            )
            chunk_index = metadata.get("chunk_index")
            total_chunks = metadata.get("total_chunks")

            # Human-readable title resolved from payload/metadata priority.
            title = self.extract_chunk_title(
                info=info,
                metadata=metadata,
                chunk_index=chunk_index,
                total_chunks=total_chunks,
            )
            enhanced_metadata = self.merge_rich_and_enhanced_metadata(
                info=info,
                metadata=metadata,
                metadata_components=metadata_components,
                chunk_index=chunk_index,
                total_chunks=total_chunks,
            )
            contextual_content = info.get("contextual_content")
            if contextual_content:
                enhanced_metadata["contextual_content"] = contextual_content
            # NOTE: No additional fallback; root payload project_id is authoritative

            # Create HybridSearchResult using factory function
            hybrid_result = create_hybrid_search_result(
                score=boosted_score,
                text=text,
                source_type=info["source_type"],
                source_title=title,
                vector_score=info["vector_score"],
                keyword_score=info["keyword_score"],
                **enhanced_metadata,
            )

            combined_results.append(hybrid_result)

        # Sort by combined score
        combined_results.sort(key=lambda x: x.score, reverse=True)
        # Apply diversity filtering for exploratory intents
        if adaptive_config and adaptive_config.diversity_factor > 0.0:
            try:
                from ..hybrid.components.diversity import apply_diversity_filtering

                diverse_results = apply_diversity_filtering(
                    combined_results, adaptive_config.diversity_factor, limit
                )
                self.logger.debug(
                    "Applied diversity filtering",
                    original_count=len(combined_results),
                    diverse_count=len(diverse_results),
                    diversity_factor=adaptive_config.diversity_factor,
                )
                return diverse_results
            except Exception:
                # Fallback to original top-N behavior if import or filtering fails
                pass

        return combined_results[:limit]
375 # The following methods are thin wrappers delegating to combining/* modules
376 # to preserve backward-compatible tests that call private methods directly.
378 def _should_skip_result(
379 self, metadata: dict, result_filters: dict, query_context: dict
380 ) -> bool:
381 return should_skip_result(metadata, result_filters, query_context)
383 def _count_business_indicators(self, metadata: dict) -> int:
384 return __import__(
385 f"{__package__}.combining.filters", fromlist=["count_business_indicators"]
386 ).count_business_indicators(metadata)
388 def _boost_score_with_metadata(
389 self, base_score: float, metadata: dict, query_context: dict
390 ) -> float:
391 return boost_score_with_metadata(
392 base_score, metadata, query_context, spacy_analyzer=self.spacy_analyzer
393 )
395 def _apply_content_type_boosting(
396 self, metadata: dict, query_context: dict
397 ) -> float:
398 from .combining import apply_content_type_boosting
400 return apply_content_type_boosting(metadata, query_context)
402 def _apply_section_level_boosting(self, metadata: dict) -> float:
403 from .combining import apply_section_level_boosting
405 return apply_section_level_boosting(metadata)
407 def _apply_content_quality_boosting(self, metadata: dict) -> float:
408 from .combining import apply_content_quality_boosting
410 return apply_content_quality_boosting(metadata)
412 def _apply_conversion_boosting(self, metadata: dict, query_context: dict) -> float:
413 from .combining import apply_conversion_boosting
415 return apply_conversion_boosting(metadata, query_context)
417 def _apply_semantic_boosting(self, metadata: dict, query_context: dict) -> float:
418 from .combining import apply_semantic_boosting
420 return apply_semantic_boosting(metadata, query_context, self.spacy_analyzer)
422 def _apply_fallback_semantic_boosting(
423 self, metadata: dict, query_context: dict
424 ) -> float:
425 from .combining import apply_fallback_semantic_boosting
427 return apply_fallback_semantic_boosting(metadata, query_context)
429 def _apply_diversity_filtering(
430 self, results: list[HybridSearchResult], diversity_factor: float, limit: int
431 ) -> list[HybridSearchResult]:
432 if diversity_factor <= 0.0 or len(results) <= limit:
433 return results[:limit]
435 diverse_results = []
436 used_source_types = set()
437 used_section_types = set()
438 used_sources = set()
440 for result in results:
441 if len(diverse_results) >= limit:
442 break
444 diversity_score = 1.0
445 source_type = result.source_type
446 if source_type in used_source_types:
447 diversity_score *= 1.0 - diversity_factor * 0.3
449 section_type = result.section_type or "unknown"
450 if section_type in used_section_types:
451 diversity_score *= 1.0 - diversity_factor * 0.2
453 source_key = f"{result.source_type}:{result.source_title}"
454 if source_key in used_sources:
455 diversity_score *= 1.0 - diversity_factor * 0.4
457 adjusted_score = result.score * diversity_score
459 if (
460 len(diverse_results) < limit * 0.7
461 or adjusted_score >= result.score * 0.6
462 ):
463 diverse_results.append(result)
464 used_source_types.add(source_type)
465 used_section_types.add(section_type)
466 used_sources.add(source_key)
468 remaining_slots = limit - len(diverse_results)
469 if remaining_slots > 0:
470 remaining_results = [r for r in results if r not in diverse_results]
471 diverse_results.extend(remaining_results[:remaining_slots])
473 return diverse_results[:limit]
475 def _flatten_metadata_components(
476 self, metadata_components: dict[str, Any]
477 ) -> dict[str, Any]:
478 return flatten_metadata_components(metadata_components)