Coverage for src/qdrant_loader_mcp_server/search/components/result_combiner.py: 86%
306 statements
1"""Result combination and ranking logic for hybrid search."""
3from typing import Any
5from ...utils.logging import LoggingConfig
6from ..nlp.spacy_analyzer import SpaCyQueryAnalyzer
7from .metadata_extractor import MetadataExtractor
8from .search_result_models import HybridSearchResult, create_hybrid_search_result


class ResultCombiner:
    """Combines and ranks search results from multiple sources."""

    def __init__(
        self,
        vector_weight: float = 0.6,
        keyword_weight: float = 0.3,
        metadata_weight: float = 0.1,
        min_score: float = 0.3,
        spacy_analyzer: SpaCyQueryAnalyzer | None = None,
    ):
        """Initialize the result combiner.

        Args:
            vector_weight: Weight for vector search scores (0-1)
            keyword_weight: Weight for keyword search scores (0-1)
            metadata_weight: Weight for metadata-based scoring (0-1)
            min_score: Minimum combined score threshold
            spacy_analyzer: Optional spaCy analyzer for semantic boosting
        """
        self.vector_weight = vector_weight
        self.keyword_weight = keyword_weight
        self.metadata_weight = metadata_weight
        self.min_score = min_score
        self.spacy_analyzer = spacy_analyzer
        self.logger = LoggingConfig.get_logger(__name__)

        self.metadata_extractor = MetadataExtractor()

    async def combine_results(
        self,
        vector_results: list[dict[str, Any]],
        keyword_results: list[dict[str, Any]],
        query_context: dict[str, Any],
        limit: int,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> list[HybridSearchResult]:
        """Combine and rerank results from vector and keyword search.

        Args:
            vector_results: Results from vector search
            keyword_results: Results from keyword search
            query_context: Query analysis context
            limit: Maximum number of results to return
            source_types: Optional source type filters
            project_ids: Optional project ID filters

        Returns:
            List of combined and ranked HybridSearchResult objects
        """
        combined_dict = {}

        # Process vector results
        for result in vector_results:
            text = result["text"]
            if text not in combined_dict:
                metadata = result["metadata"]
                combined_dict[text] = {
                    "text": text,
                    "metadata": metadata,
                    "source_type": result["source_type"],
                    "vector_score": result["score"],
                    "keyword_score": 0.0,
                    # 🔧 CRITICAL FIX: Include all root-level fields from search services
                    "title": result.get("title", ""),
                    "url": result.get("url", ""),
                    "document_id": result.get("document_id", ""),
                    "source": result.get("source", ""),
                    "created_at": result.get("created_at", ""),
                    "updated_at": result.get("updated_at", ""),
                }

        # Process keyword results
        for result in keyword_results:
            text = result["text"]
            if text in combined_dict:
                combined_dict[text]["keyword_score"] = result["score"]
            else:
                metadata = result["metadata"]
                combined_dict[text] = {
                    "text": text,
                    "metadata": metadata,
                    "source_type": result["source_type"],
                    "vector_score": 0.0,
                    "keyword_score": result["score"],
                    "title": result.get("title", ""),
                    "url": result.get("url", ""),
                    "document_id": result.get("document_id", ""),
                    "source": result.get("source", ""),
                    "created_at": result.get("created_at", ""),
                    "updated_at": result.get("updated_at", ""),
                }

        # Calculate combined scores and create results
        combined_results = []

        # Extract intent-specific filtering configuration
        search_intent = query_context.get("search_intent")
        adaptive_config = query_context.get("adaptive_config")
        result_filters = adaptive_config.result_filters if adaptive_config else {}

        for text, info in combined_dict.items():
            # Skip if source type doesn't match filter
            if source_types and info["source_type"] not in source_types:
                continue

            metadata = info["metadata"]

            # Apply intent-specific result filtering
            if search_intent and result_filters:
                if self._should_skip_result(metadata, result_filters, query_context):
                    continue
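
            # Linear fusion of the two retrieval signals. metadata_weight is
            # not part of this sum: metadata instead shapes the final score
            # multiplicatively via _boost_score_with_metadata below.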
            combined_score = (
                self.vector_weight * info["vector_score"]
                + self.keyword_weight * info["keyword_score"]
            )

            if combined_score >= self.min_score:
                # Extract all metadata components
                metadata_components = self.metadata_extractor.extract_all_metadata(
                    metadata
                )

                # Boost score with metadata
                boosted_score = self._boost_score_with_metadata(
                    combined_score, metadata, query_context
                )

                # Extract fields from both direct payload fields and nested metadata
                # Use direct fields from Qdrant payload when available, fall back to metadata
                title = info.get("title", "") or metadata.get("title", "")

                # Extract rich metadata from nested metadata object
                file_name = metadata.get("file_name", "")
                chunk_index = metadata.get("chunk_index")
                total_chunks = metadata.get("total_chunks")

                # Enhanced title generation using actual Qdrant structure
                # Priority: root title > nested title > section_title > file_name + chunk info > source
                root_title = info.get(
                    "title", ""
                )  # e.g., "Stratégie commerciale MYA.pdf - Chunk 2"
                nested_title = metadata.get("title", "")  # e.g., "Preamble (Part 2)"
                section_title = metadata.get("section_title", "")

                if root_title:
                    title = root_title
                elif nested_title:
                    title = nested_title
                elif section_title:
                    title = section_title
                elif file_name:
                    title = file_name
                    # Add chunk info if available from nested metadata
                    sub_chunk_index = metadata.get("sub_chunk_index")
                    total_sub_chunks = metadata.get("total_sub_chunks")
                    if sub_chunk_index is not None and total_sub_chunks is not None:
                        title += (
                            f" - Chunk {int(sub_chunk_index) + 1}/{total_sub_chunks}"
                        )
                    elif chunk_index is not None and total_chunks is not None:
                        title += f" - Chunk {int(chunk_index) + 1}/{total_chunks}"
                else:
                    source = info.get("source", "") or metadata.get("source", "")
                    if source:
                        # Extract filename from path-like sources
                        title = (
                            os.path.basename(source)
                            if "/" in source or "\\" in source
                            else source
                        )
                    else:
                        title = "Untitled"

                # Create enhanced metadata dict with rich Qdrant fields
                enhanced_metadata = {
                    # Core fields from root level of Qdrant payload
                    "source_url": info.get("url", ""),
                    "document_id": info.get("document_id", ""),
                    "created_at": info.get("created_at", ""),
                    "last_modified": info.get("updated_at", ""),
                    "repo_name": info.get("source", ""),
                    # Construct file path from nested metadata
                    "file_path": (
                        metadata.get("file_directory", "").rstrip("/")
                        + "/"
                        + metadata.get("file_name", "")
                        if metadata.get("file_name") and metadata.get("file_directory")
                        else metadata.get("file_name", "")
                    ),
                }

                # Add rich metadata from nested metadata object (confirmed structure)
                rich_metadata_fields = {
                    "original_filename": metadata.get("file_name"),
                    "file_size": metadata.get("file_size"),
                    "original_file_type": metadata.get("file_type")
                    or metadata.get("original_file_type"),
                    "word_count": metadata.get("word_count"),
                    "char_count": metadata.get("character_count")
                    or metadata.get("char_count")
                    or metadata.get("line_count"),
                    "chunk_index": metadata.get("sub_chunk_index", chunk_index),
                    "total_chunks": metadata.get("total_sub_chunks", total_chunks),
                    "chunking_strategy": metadata.get("chunking_strategy")
                    or metadata.get("conversion_method"),
                    "project_id": metadata.get("project_id"),
                    "project_name": metadata.get("project_name"),
                    "project_description": metadata.get("project_description"),
                    "collection_name": metadata.get("collection_name"),
                    # Additional rich fields from actual Qdrant structure
                    "section_title": metadata.get("section_title"),
                    "parent_section": metadata.get("parent_section"),
                    "file_encoding": metadata.get("file_encoding"),
                    "conversion_failed": metadata.get("conversion_failed", False),
                    "is_excel_sheet": metadata.get("is_excel_sheet", False),
                }

                # Only add non-None values to avoid conflicts
                for key, value in rich_metadata_fields.items():
                    if value is not None:
                        enhanced_metadata[key] = value

                # Merge with flattened metadata components (flattened takes precedence for conflicts)
                flattened_components = self._flatten_metadata_components(
                    metadata_components
                )
                enhanced_metadata.update(flattened_components)

                # Create HybridSearchResult using factory function
                hybrid_result = create_hybrid_search_result(
                    score=boosted_score,
                    text=text,
                    source_type=info["source_type"],
                    source_title=title,
                    vector_score=info["vector_score"],
                    keyword_score=info["keyword_score"],
                    **enhanced_metadata,
                )

                combined_results.append(hybrid_result)

        # Sort by combined score
        combined_results.sort(key=lambda x: x.score, reverse=True)

        # Apply diversity filtering for exploratory intents
        if adaptive_config and adaptive_config.diversity_factor > 0.0:
            diverse_results = self._apply_diversity_filtering(
                combined_results, adaptive_config.diversity_factor, limit
            )
            self.logger.debug(
                "Applied diversity filtering",
                original_count=len(combined_results),
                diverse_count=len(diverse_results),
                diversity_factor=adaptive_config.diversity_factor,
            )
            return diverse_results

        return combined_results[:limit]
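
    # A minimal usage sketch (illustrative only; the payload values shown are
    # assumptions, not part of this module):
    #
    #     combiner = ResultCombiner()
    #     results = await combiner.combine_results(
    #         vector_results=[{"text": "...", "metadata": {}, "source_type": "git", "score": 0.82}],
    #         keyword_results=[{"text": "...", "metadata": {}, "source_type": "git", "score": 0.41}],
    #         query_context={"keywords": ["auth"]},
    #         limit=10,
    #     )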

    def _should_skip_result(
        self, metadata: dict, result_filters: dict, query_context: dict
    ) -> bool:
        """Check if a result should be skipped based on intent-specific filters."""
        # Content type filtering
        if "content_type" in result_filters:
            allowed_content_types = result_filters["content_type"]
            content_analysis = metadata.get("content_type_analysis", {})

            # Check if any content type indicators match
            has_matching_content = False

            for content_type in allowed_content_types:
                if content_type == "code" and content_analysis.get("has_code_blocks"):
                    has_matching_content = True
                    break
                elif content_type == "documentation" and not content_analysis.get(
                    "has_code_blocks"
                ):
                    has_matching_content = True
                    break
                elif content_type == "technical" and query_context.get("is_technical"):
                    has_matching_content = True
                    break
                elif content_type in ["requirements", "business", "strategy"]:
                    # Check if content mentions business terms
                    business_indicators = self._count_business_indicators(metadata)
                    if business_indicators > 0:
                        has_matching_content = True
                        break
                elif content_type in ["guide", "tutorial", "procedure"]:
                    # Check for procedural content
                    section_type = metadata.get("section_type", "").lower()
                    if any(
                        proc_word in section_type
                        for proc_word in ["step", "guide", "procedure", "tutorial"]
                    ):
                        has_matching_content = True
                        break

            if not has_matching_content:
                return True

        return False

    def _count_business_indicators(self, metadata: dict) -> int:
        """Count business-related indicators in metadata."""
        # Simple heuristic for business content
        business_terms = [
            "requirement",
            "business",
            "strategy",
            "goal",
            "objective",
            "process",
        ]
        title = metadata.get("title", "").lower()
        content = metadata.get("content", "").lower()

        count = 0
        for term in business_terms:
            if term in title or term in content:
                count += 1

        return count

    def _boost_score_with_metadata(
        self, base_score: float, metadata: dict, query_context: dict
    ) -> float:
        """Boost search scores using metadata context and spaCy semantic analysis."""
        boosted_score = base_score
        boost_factor = 0.0

        # Intent-aware boosting
        search_intent = query_context.get("search_intent")
        adaptive_config = query_context.get("adaptive_config")

        if search_intent and adaptive_config:
            boost_factor += self._apply_intent_boosting(
                metadata, search_intent, adaptive_config, query_context
            )

        # Content type relevance boosting
        boost_factor += self._apply_content_type_boosting(metadata, query_context)

        # Section level relevance boosting
        boost_factor += self._apply_section_level_boosting(metadata)

        # Content quality indicators boosting
        boost_factor += self._apply_content_quality_boosting(metadata)

        # File conversion boosting
        boost_factor += self._apply_conversion_boosting(metadata, query_context)

        # Semantic analysis boosting
        if self.spacy_analyzer:
            boost_factor += self._apply_semantic_boosting(metadata, query_context)
        else:
            boost_factor += self._apply_fallback_semantic_boosting(
                metadata, query_context
            )

        # Apply boost (cap at reasonable maximum)
        boost_factor = min(boost_factor, 0.5)  # Maximum 50% boost
        return boosted_score * (1 + boost_factor)
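
    # Worked example: a base score of 0.50 with accumulated boosts of 0.30
    # returns 0.50 * 1.30 = 0.65; boosts summing past the 0.5 cap (e.g. 0.7)
    # are clamped, giving at most 0.50 * 1.50 = 0.75.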

    def _apply_intent_boosting(
        self,
        metadata: dict,
        search_intent: Any,
        adaptive_config: Any,
        query_context: dict,
    ) -> float:
        """Apply intent-specific ranking boosts."""
        boost_factor = 0.0

        ranking_boosts = adaptive_config.ranking_boosts
        source_type_preferences = adaptive_config.source_type_preferences

        # Source type preference boosting
        source_type = metadata.get("source_type", "")
        if source_type in source_type_preferences:
            source_boost = (source_type_preferences[source_type] - 1.0) * 0.2
            boost_factor += source_boost

        # Content type boosting from ranking_boosts
        for boost_key, boost_value in ranking_boosts.items():
            if boost_key == "section_type" and isinstance(boost_value, dict):
                section_type = metadata.get("section_type", "")
                if section_type in boost_value:
                    section_boost = (boost_value[section_type] - 1.0) * 0.15
                    boost_factor += section_boost
            elif boost_key == "source_type" and isinstance(boost_value, dict):
                if source_type in boost_value:
                    source_boost = (boost_value[source_type] - 1.0) * 0.15
                    boost_factor += source_boost
            elif boost_key in metadata and metadata[boost_key]:
                # Boolean metadata boosting
                if isinstance(boost_value, int | float):
                    bool_boost = (boost_value - 1.0) * 0.1
                    boost_factor += bool_boost

        # Intent-specific confidence boosting
        confidence_boost = (
            search_intent.confidence * 0.05
        )  # Up to 5% boost for high confidence
        boost_factor += confidence_boost

        return boost_factor
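
    # ranking_boosts shape inferred from the branches above, e.g.:
    #     {"section_type": {"code": 1.5}, "source_type": {"git": 1.3},
    #      "has_code_blocks": 1.2}
    # where each multiplier above 1.0 becomes a small additive boost.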

    def _apply_content_type_boosting(
        self, metadata: dict, query_context: dict
    ) -> float:
        """Apply content type relevance boosting."""
        boost_factor = 0.0
        content_analysis = metadata.get("content_type_analysis", {})

        if query_context.get("prefers_code") and content_analysis.get(
            "has_code_blocks"
        ):
            boost_factor += 0.15

        if query_context.get("prefers_tables") and content_analysis.get("has_tables"):
            boost_factor += 0.12

        if query_context.get("prefers_images") and content_analysis.get("has_images"):
            boost_factor += 0.10

        if query_context.get("prefers_docs") and not content_analysis.get(
            "has_code_blocks"
        ):
            boost_factor += 0.08

        return boost_factor

    def _apply_section_level_boosting(self, metadata: dict) -> float:
        """Apply section level relevance boosting."""
        boost_factor = 0.0
        section_level = metadata.get("section_level")

        if section_level is not None:
            if section_level <= 2:  # H1, H2 are more important
                boost_factor += 0.10
            elif section_level <= 3:  # H3 moderately important
                boost_factor += 0.05

        return boost_factor

    def _apply_content_quality_boosting(self, metadata: dict) -> float:
        """Apply content quality indicators boosting."""
        boost_factor = 0.0
        content_analysis = metadata.get("content_type_analysis", {})
        word_count = content_analysis.get("word_count") or 0

        if word_count > 100:  # Substantial content
            boost_factor += 0.05
        if word_count > 500:  # Very detailed content
            boost_factor += 0.05

        return boost_factor

    def _apply_conversion_boosting(self, metadata: dict, query_context: dict) -> float:
        """Apply file conversion boosting."""
        boost_factor = 0.0

        # Converted file boosting (often contains rich content)
        if metadata.get("is_converted") and metadata.get("original_file_type") in [
            "docx",
            "xlsx",
            "pdf",
        ]:
            boost_factor += 0.08

        # Excel sheet specific boosting for data queries
        if metadata.get("is_excel_sheet") and any(
            term in " ".join(query_context.get("keywords", []))
            for term in ["data", "table", "sheet", "excel", "csv"]
        ):
            boost_factor += 0.12

        return boost_factor

    def _apply_semantic_boosting(self, metadata: dict, query_context: dict) -> float:
        """Apply semantic analysis boosting using spaCy."""
        boost_factor = 0.0

        if "spacy_analysis" not in query_context:
            return boost_factor

        spacy_analysis = query_context["spacy_analysis"]

        # Enhanced entity matching using spaCy similarity
        entities = metadata.get("entities", [])
        if entities and spacy_analysis.entities:
            max_entity_similarity = 0.0
            for entity in entities:
                entity_text = (
                    entity
                    if isinstance(entity, str)
                    else entity.get("text", str(entity))
                )
                similarity = self.spacy_analyzer.semantic_similarity_matching(
                    spacy_analysis, entity_text
                )
                max_entity_similarity = max(max_entity_similarity, similarity)

            # Apply semantic entity boost based on similarity
            if max_entity_similarity > 0.6:  # High similarity
                boost_factor += 0.15
            elif max_entity_similarity > 0.4:  # Medium similarity
                boost_factor += 0.10
            elif max_entity_similarity > 0.2:  # Low similarity
                boost_factor += 0.05

        # Enhanced topic relevance using spaCy
        topics = metadata.get("topics", [])
        if topics and spacy_analysis.main_concepts:
            max_topic_similarity = 0.0
            for topic in topics:
                topic_text = (
                    topic if isinstance(topic, str) else topic.get("text", str(topic))
                )
                for concept in spacy_analysis.main_concepts:
                    similarity = self.spacy_analyzer.semantic_similarity_matching(
                        spacy_analysis, f"{topic_text} {concept}"
                    )
                    max_topic_similarity = max(max_topic_similarity, similarity)

            # Apply semantic topic boost
            if max_topic_similarity > 0.5:
                boost_factor += 0.12
            elif max_topic_similarity > 0.3:
                boost_factor += 0.08

        return boost_factor
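
    # The similarity cutoffs above (0.6/0.4/0.2 for entities, 0.5/0.3 for
    # topics) are heuristic bands mapping spaCy similarity onto small boosts.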

    def _apply_fallback_semantic_boosting(
        self, metadata: dict, query_context: dict
    ) -> float:
        """Apply fallback semantic boosting without spaCy."""
        boost_factor = 0.0

        # Fallback to original entity/topic matching
        entities = metadata.get("entities", [])
        if entities:
            query_keywords = set(query_context.get("keywords", []))
            entity_texts = set()
            for entity in entities:
                if isinstance(entity, str):
                    entity_texts.add(entity.lower())
                elif isinstance(entity, dict):
                    if "text" in entity:
                        entity_texts.add(str(entity["text"]).lower())
                    elif "entity" in entity:
                        entity_texts.add(str(entity["entity"]).lower())
                    else:
                        entity_texts.add(str(entity).lower())

            if query_keywords.intersection(entity_texts):
                boost_factor += 0.10

        # Original topic relevance
        topics = metadata.get("topics", [])
        if topics:
            query_keywords = set(query_context.get("keywords", []))
            topic_texts = set()
            for topic in topics:
                if isinstance(topic, str):
                    topic_texts.add(topic.lower())
                elif isinstance(topic, dict):
                    if "text" in topic:
                        topic_texts.add(str(topic["text"]).lower())
                    elif "topic" in topic:
                        topic_texts.add(str(topic["topic"]).lower())
                    else:
                        topic_texts.add(str(topic).lower())

            if query_keywords.intersection(topic_texts):
                boost_factor += 0.08

        return boost_factor

    def _apply_diversity_filtering(
        self, results: list[HybridSearchResult], diversity_factor: float, limit: int
    ) -> list[HybridSearchResult]:
        """Apply diversity filtering to promote varied result types."""
        if diversity_factor <= 0.0 or len(results) <= limit:
            return results[:limit]

        diverse_results = []
        used_source_types = set()
        used_section_types = set()
        used_sources = set()

        # First pass: Take top results while ensuring diversity
        for result in results:
            if len(diverse_results) >= limit:
                break

            # Calculate diversity score
            diversity_score = 1.0

            # Penalize duplicate source types (less diversity)
            source_type = result.source_type
            if source_type in used_source_types:
                diversity_score *= 1.0 - diversity_factor * 0.3

            # Penalize duplicate section types
            section_type = result.section_type or "unknown"
            if section_type in used_section_types:
                diversity_score *= 1.0 - diversity_factor * 0.2

            # Penalize duplicate sources (same document/file)
            source_key = f"{result.source_type}:{result.source_title}"
            if source_key in used_sources:
                diversity_score *= 1.0 - diversity_factor * 0.4

            # Apply diversity penalty to score
            adjusted_score = result.score * diversity_score

            # Accept while the first 70% of slots are filling, or when the
            # diversity penalty leaves at least 60% of the original score
            if (
                len(diverse_results) < limit * 0.7
                or adjusted_score >= result.score * 0.6
            ):
                diverse_results.append(result)
                used_source_types.add(source_type)
                used_section_types.add(section_type)
                used_sources.add(source_key)

        # Second pass: Fill remaining slots with best remaining results
        remaining_slots = limit - len(diverse_results)
        if remaining_slots > 0:
            remaining_results = [r for r in results if r not in diverse_results]
            diverse_results.extend(remaining_results[:remaining_slots])

        return diverse_results[:limit]
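
    # Worked example with diversity_factor=0.5: a result repeating an already
    # seen source type, section type, and source key keeps
    # (1 - 0.15) * (1 - 0.10) * (1 - 0.20) = 0.612 of its score, still above
    # the 0.6 acceptance floor; at diversity_factor=1.0 the multiplier drops
    # to 0.7 * 0.8 * 0.6 = 0.336 and the result waits for the second pass
    # once 70% of the slots are filled.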

    def _flatten_metadata_components(
        self, metadata_components: dict[str, Any]
    ) -> dict[str, Any]:
        """Flatten metadata components for backward compatibility."""
        flattened = {}

        for _component_name, component in metadata_components.items():
            if component is None:
                continue

            if hasattr(component, "__dict__"):
                # Convert dataclass to dict and flatten
                component_dict = component.__dict__
                for key, value in component_dict.items():
                    flattened[key] = value
            elif isinstance(component, dict):
                flattened.update(component)

        return flattened