Coverage for src/qdrant_loader_mcp_server/search/components/result_combiner.py: 86%

306 statements  

coverage.py v7.10.3, created at 2025-08-13 09:20 +0000

1"""Result combination and ranking logic for hybrid search.""" 

2 

3from typing import Any 

4 

5from ...utils.logging import LoggingConfig 

6from ..nlp.spacy_analyzer import SpaCyQueryAnalyzer 

7from .metadata_extractor import MetadataExtractor 

8from .search_result_models import HybridSearchResult, create_hybrid_search_result 

9 

10 

11class ResultCombiner: 

12 """Combines and ranks search results from multiple sources.""" 

13 

14 def __init__( 

15 self, 

16 vector_weight: float = 0.6, 

17 keyword_weight: float = 0.3, 

18 metadata_weight: float = 0.1, 

19 min_score: float = 0.3, 

20 spacy_analyzer: SpaCyQueryAnalyzer | None = None, 

21 ): 

22 """Initialize the result combiner. 

23 

24 Args: 

25 vector_weight: Weight for vector search scores (0-1) 

26 keyword_weight: Weight for keyword search scores (0-1) 

27 metadata_weight: Weight for metadata-based scoring (0-1) 

28 min_score: Minimum combined score threshold 

29 spacy_analyzer: Optional spaCy analyzer for semantic boosting 

30 """ 

31 self.vector_weight = vector_weight 

32 self.keyword_weight = keyword_weight 

33 self.metadata_weight = metadata_weight 

34 self.min_score = min_score 

35 self.spacy_analyzer = spacy_analyzer 

36 self.logger = LoggingConfig.get_logger(__name__) 

37 

38 self.metadata_extractor = MetadataExtractor() 

39 
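
    # Note: metadata_weight is stored for configurability, but the base score in
    # combine_results blends only vector_weight and keyword_weight; metadata signals
    # are applied afterwards as a multiplicative boost (see _boost_score_with_metadata).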

    async def combine_results(
        self,
        vector_results: list[dict[str, Any]],
        keyword_results: list[dict[str, Any]],
        query_context: dict[str, Any],
        limit: int,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> list[HybridSearchResult]:
        """Combine and rerank results from vector and keyword search.

        Args:
            vector_results: Results from vector search
            keyword_results: Results from keyword search
            query_context: Query analysis context
            limit: Maximum number of results to return
            source_types: Optional source type filters
            project_ids: Optional project ID filters

        Returns:
            List of combined and ranked HybridSearchResult objects
        """
        combined_dict = {}

        # Process vector results
        for result in vector_results:
            text = result["text"]
            if text not in combined_dict:
                metadata = result["metadata"]
                combined_dict[text] = {
                    "text": text,
                    "metadata": metadata,
                    "source_type": result["source_type"],
                    "vector_score": result["score"],
                    "keyword_score": 0.0,
                    # Include all root-level fields from the search services
                    "title": result.get("title", ""),
                    "url": result.get("url", ""),
                    "document_id": result.get("document_id", ""),
                    "source": result.get("source", ""),
                    "created_at": result.get("created_at", ""),
                    "updated_at": result.get("updated_at", ""),
                }

        # Process keyword results
        for result in keyword_results:
            text = result["text"]
            if text in combined_dict:
                combined_dict[text]["keyword_score"] = result["score"]
            else:
                metadata = result["metadata"]
                combined_dict[text] = {
                    "text": text,
                    "metadata": metadata,
                    "source_type": result["source_type"],
                    "vector_score": 0.0,
                    "keyword_score": result["score"],
                    "title": result.get("title", ""),
                    "url": result.get("url", ""),
                    "document_id": result.get("document_id", ""),
                    "source": result.get("source", ""),
                    "created_at": result.get("created_at", ""),
                    "updated_at": result.get("updated_at", ""),
                }

        # Calculate combined scores and create results
        combined_results = []

        # Extract intent-specific filtering configuration
        search_intent = query_context.get("search_intent")
        adaptive_config = query_context.get("adaptive_config")
        result_filters = adaptive_config.result_filters if adaptive_config else {}

        for text, info in combined_dict.items():
            # Skip if source type doesn't match filter
            if source_types and info["source_type"] not in source_types:
                continue

            metadata = info["metadata"]

            # Apply intent-specific result filtering
            if search_intent and result_filters:
                if self._should_skip_result(metadata, result_filters, query_context):
                    continue

            combined_score = (
                self.vector_weight * info["vector_score"]
                + self.keyword_weight * info["keyword_score"]
            )
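            # Illustrative arithmetic with the default weights (hypothetical scores):
            # a vector_score of 0.80 and a keyword_score of 0.50 combine to
            # 0.6 * 0.80 + 0.3 * 0.50 = 0.63, which clears the default min_score of 0.3.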

            if combined_score >= self.min_score:
                # Extract all metadata components
                metadata_components = self.metadata_extractor.extract_all_metadata(
                    metadata
                )

                # Boost score with metadata
                boosted_score = self._boost_score_with_metadata(
                    combined_score, metadata, query_context
                )

                # Extract fields from both direct payload fields and nested metadata
                # Use direct fields from the Qdrant payload when available, falling back to metadata
                title = info.get("title", "") or metadata.get("title", "")

                # Extract rich metadata from the nested metadata object
                file_name = metadata.get("file_name", "")
                chunk_index = metadata.get("chunk_index")
                total_chunks = metadata.get("total_chunks")

                # Enhanced title generation using the actual Qdrant structure
                # Priority: root title > nested title > section_title > file_name + chunk info > source
                root_title = info.get(
                    "title", ""
                )  # e.g., "Stratégie commerciale MYA.pdf - Chunk 2"
                nested_title = metadata.get("title", "")  # e.g., "Preamble (Part 2)"
                section_title = metadata.get("section_title", "")

                if root_title:
                    title = root_title
                elif nested_title:
                    title = nested_title
                elif section_title:
                    title = section_title
                elif file_name:
                    title = file_name
                    # Add chunk info if available from nested metadata
                    sub_chunk_index = metadata.get("sub_chunk_index")
                    total_sub_chunks = metadata.get("total_sub_chunks")
                    if sub_chunk_index is not None and total_sub_chunks is not None:
                        title += (
                            f" - Chunk {int(sub_chunk_index) + 1}/{total_sub_chunks}"
                        )
                    elif chunk_index is not None and total_chunks is not None:
                        title += f" - Chunk {int(chunk_index) + 1}/{total_chunks}"
                else:
                    source = info.get("source", "") or metadata.get("source", "")
                    if source:
                        # Extract the filename from path-like sources
                        import os

                        title = (
                            os.path.basename(source)
                            if "/" in source or "\\" in source
                            else source
                        )
                    else:
                        title = "Untitled"
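
                # Example of the title fallback (hypothetical values): file_name
                # "report.pdf" with sub_chunk_index 1 of total_sub_chunks 3 yields
                # "report.pdf - Chunk 2/3"; with no usable fields, the title is "Untitled".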

                # Create enhanced metadata dict with rich Qdrant fields
                enhanced_metadata = {
                    # Core fields from root level of Qdrant payload
                    "source_url": info.get("url", ""),
                    "document_id": info.get("document_id", ""),
                    "created_at": info.get("created_at", ""),
                    "last_modified": info.get("updated_at", ""),
                    "repo_name": info.get("source", ""),
                    # Construct file path from nested metadata
                    "file_path": (
                        metadata.get("file_directory", "").rstrip("/")
                        + "/"
                        + metadata.get("file_name", "")
                        if metadata.get("file_name") and metadata.get("file_directory")
                        else metadata.get("file_name", "")
                    ),
                }

                # Add rich metadata from nested metadata object (confirmed structure)
                rich_metadata_fields = {
                    "original_filename": metadata.get("file_name"),
                    "file_size": metadata.get("file_size"),
                    "original_file_type": metadata.get("file_type")
                    or metadata.get("original_file_type"),
                    "word_count": metadata.get("word_count"),
                    "char_count": metadata.get("character_count")
                    or metadata.get("char_count")
                    or metadata.get("line_count"),
                    "chunk_index": metadata.get("sub_chunk_index", chunk_index),
                    "total_chunks": metadata.get("total_sub_chunks", total_chunks),
                    "chunking_strategy": metadata.get("chunking_strategy")
                    or metadata.get("conversion_method"),
                    "project_id": metadata.get("project_id"),
                    "project_name": metadata.get("project_name"),
                    "project_description": metadata.get("project_description"),
                    "collection_name": metadata.get("collection_name"),
                    # Additional rich fields from actual Qdrant structure
                    "section_title": metadata.get("section_title"),
                    "parent_section": metadata.get("parent_section"),
                    "file_encoding": metadata.get("file_encoding"),
                    "conversion_failed": metadata.get("conversion_failed", False),
                    "is_excel_sheet": metadata.get("is_excel_sheet", False),
                }

                # Only add non-None values to avoid conflicts
                for key, value in rich_metadata_fields.items():
                    if value is not None:
                        enhanced_metadata[key] = value

                # Merge with flattened metadata components (flattened takes precedence for conflicts)
                flattened_components = self._flatten_metadata_components(
                    metadata_components
                )
                enhanced_metadata.update(flattened_components)

                # Create HybridSearchResult using factory function
                hybrid_result = create_hybrid_search_result(
                    score=boosted_score,
                    text=text,
                    source_type=info["source_type"],
                    source_title=title,
                    vector_score=info["vector_score"],
                    keyword_score=info["keyword_score"],
                    **enhanced_metadata,
                )

                combined_results.append(hybrid_result)

        # Sort by combined score
        combined_results.sort(key=lambda x: x.score, reverse=True)

        # Apply diversity filtering for exploratory intents
        if adaptive_config and adaptive_config.diversity_factor > 0.0:
            diverse_results = self._apply_diversity_filtering(
                combined_results, adaptive_config.diversity_factor, limit
            )
            self.logger.debug(
                "Applied diversity filtering",
                original_count=len(combined_results),
                diverse_count=len(diverse_results),
                diversity_factor=adaptive_config.diversity_factor,
            )
            return diverse_results

        return combined_results[:limit]
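
    # Usage sketch (hypothetical inputs): vector_results and keyword_results are lists
    # of dicts carrying "text", "metadata", "source_type", and "score" keys, e.g.
    #   results = await combiner.combine_results(
    #       vector_results, keyword_results, query_context={"keywords": ["pricing"]}, limit=5
    #   )
    # which returns at most five HybridSearchResult objects ordered by boosted score.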

    def _should_skip_result(
        self, metadata: dict, result_filters: dict, query_context: dict
    ) -> bool:
        """Check if a result should be skipped based on intent-specific filters."""
        # Content type filtering
        if "content_type" in result_filters:
            allowed_content_types = result_filters["content_type"]
            content_analysis = metadata.get("content_type_analysis", {})

            # Check if any content type indicators match
            has_matching_content = False

            for content_type in allowed_content_types:
                if content_type == "code" and content_analysis.get("has_code_blocks"):
                    has_matching_content = True
                    break
                elif content_type == "documentation" and not content_analysis.get(
                    "has_code_blocks"
                ):
                    has_matching_content = True
                    break
                elif content_type == "technical" and query_context.get("is_technical"):
                    has_matching_content = True
                    break
                elif content_type in ["requirements", "business", "strategy"]:
                    # Check if content mentions business terms
                    business_indicators = self._count_business_indicators(metadata)
                    if business_indicators > 0:
                        has_matching_content = True
                        break
                elif content_type in ["guide", "tutorial", "procedure"]:
                    # Check for procedural content
                    section_type = metadata.get("section_type", "").lower()
                    if any(
                        proc_word in section_type
                        for proc_word in ["step", "guide", "procedure", "tutorial"]
                    ):
                        has_matching_content = True
                        break

            if not has_matching_content:
                return True

        return False
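
    # Illustrative filter shape (hypothetical): result_filters = {"content_type": ["code"]}
    # keeps only results whose content_type_analysis reports has_code_blocks.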

    def _count_business_indicators(self, metadata: dict) -> int:
        """Count business-related indicators in metadata."""
        # Simple heuristic for business content
        business_terms = [
            "requirement",
            "business",
            "strategy",
            "goal",
            "objective",
            "process",
        ]
        title = metadata.get("title", "").lower()
        content = metadata.get("content", "").lower()

        count = 0
        for term in business_terms:
            if term in title or term in content:
                count += 1

        return count
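
    # Example (hypothetical): a title of "Business strategy requirements" matches
    # "business", "strategy", and "requirement", so the count is 3.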

    def _boost_score_with_metadata(
        self, base_score: float, metadata: dict, query_context: dict
    ) -> float:
        """Boost search scores using metadata context and spaCy semantic analysis."""
        boosted_score = base_score
        boost_factor = 0.0

        # Intent-aware boosting
        search_intent = query_context.get("search_intent")
        adaptive_config = query_context.get("adaptive_config")

        if search_intent and adaptive_config:
            boost_factor += self._apply_intent_boosting(
                metadata, search_intent, adaptive_config, query_context
            )

        # Content type relevance boosting
        boost_factor += self._apply_content_type_boosting(metadata, query_context)

        # Section level relevance boosting
        boost_factor += self._apply_section_level_boosting(metadata)

        # Content quality indicators boosting
        boost_factor += self._apply_content_quality_boosting(metadata)

        # File conversion boosting
        boost_factor += self._apply_conversion_boosting(metadata, query_context)

        # Semantic analysis boosting
        if self.spacy_analyzer:
            boost_factor += self._apply_semantic_boosting(metadata, query_context)
        else:
            boost_factor += self._apply_fallback_semantic_boosting(
                metadata, query_context
            )

        # Apply boost (cap at reasonable maximum)
        boost_factor = min(boost_factor, 0.5)  # Maximum 50% boost
        return boosted_score * (1 + boost_factor)
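
    # Illustrative arithmetic (hypothetical values): a base score of 0.63 with an
    # accumulated boost_factor of 0.25 becomes 0.63 * 1.25 = 0.7875; since boost_factor
    # is capped at 0.5, the maximum attainable here is 0.63 * 1.5 = 0.945.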

    def _apply_intent_boosting(
        self,
        metadata: dict,
        search_intent: Any,
        adaptive_config: Any,
        query_context: dict,
    ) -> float:
        """Apply intent-specific ranking boosts."""
        boost_factor = 0.0

        ranking_boosts = adaptive_config.ranking_boosts
        source_type_preferences = adaptive_config.source_type_preferences

        # Source type preference boosting
        source_type = metadata.get("source_type", "")
        if source_type in source_type_preferences:
            source_boost = (source_type_preferences[source_type] - 1.0) * 0.2
            boost_factor += source_boost

        # Content type boosting from ranking_boosts
        for boost_key, boost_value in ranking_boosts.items():
            if boost_key == "section_type" and isinstance(boost_value, dict):
                section_type = metadata.get("section_type", "")
                if section_type in boost_value:
                    section_boost = (boost_value[section_type] - 1.0) * 0.15
                    boost_factor += section_boost
            elif boost_key == "source_type" and isinstance(boost_value, dict):
                if source_type in boost_value:
                    source_boost = (boost_value[source_type] - 1.0) * 0.15
                    boost_factor += source_boost
            elif boost_key in metadata and metadata[boost_key]:
                # Boolean metadata boosting
                if isinstance(boost_value, int | float):
                    bool_boost = (boost_value - 1.0) * 0.1
                    boost_factor += bool_boost

        # Intent-specific confidence boosting
        confidence_boost = (
            search_intent.confidence * 0.05
        )  # Up to 5% boost for high confidence
        boost_factor += confidence_boost

        return boost_factor
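
    # Example (hypothetical): a source_type preference of 1.5 contributes
    # (1.5 - 1.0) * 0.2 = 0.10, and a search_intent.confidence of 0.8 adds
    # 0.8 * 0.05 = 0.04, for an intent boost of 0.14.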

    def _apply_content_type_boosting(
        self, metadata: dict, query_context: dict
    ) -> float:
        """Apply content type relevance boosting."""
        boost_factor = 0.0
        content_analysis = metadata.get("content_type_analysis", {})

        if query_context.get("prefers_code") and content_analysis.get(
            "has_code_blocks"
        ):
            boost_factor += 0.15

        if query_context.get("prefers_tables") and content_analysis.get("has_tables"):
            boost_factor += 0.12

        if query_context.get("prefers_images") and content_analysis.get("has_images"):
            boost_factor += 0.10

        if query_context.get("prefers_docs") and not content_analysis.get(
            "has_code_blocks"
        ):
            boost_factor += 0.08

        return boost_factor

    def _apply_section_level_boosting(self, metadata: dict) -> float:
        """Apply section level relevance boosting."""
        boost_factor = 0.0
        section_level = metadata.get("section_level")

        if section_level is not None:
            if section_level <= 2:  # H1, H2 are more important
                boost_factor += 0.10
            elif section_level <= 3:  # H3 moderately important
                boost_factor += 0.05

        return boost_factor

    def _apply_content_quality_boosting(self, metadata: dict) -> float:
        """Apply content quality indicators boosting."""
        boost_factor = 0.0
        content_analysis = metadata.get("content_type_analysis", {})
        word_count = content_analysis.get("word_count") or 0

        if word_count > 100:  # Substantial content
            boost_factor += 0.05
        if word_count > 500:  # Very detailed content
            boost_factor += 0.05

        return boost_factor
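
    # Example (hypothetical): a chunk with word_count 600 receives both increments,
    # for a total quality boost of 0.05 + 0.05 = 0.10.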

    def _apply_conversion_boosting(self, metadata: dict, query_context: dict) -> float:
        """Apply file conversion boosting."""
        boost_factor = 0.0

        # Converted file boosting (often contains rich content)
        if metadata.get("is_converted") and metadata.get("original_file_type") in [
            "docx",
            "xlsx",
            "pdf",
        ]:
            boost_factor += 0.08

        # Excel sheet specific boosting for data queries
        if metadata.get("is_excel_sheet") and any(
            term in " ".join(query_context.get("keywords", []))
            for term in ["data", "table", "sheet", "excel", "csv"]
        ):
            boost_factor += 0.12

        return boost_factor

    def _apply_semantic_boosting(self, metadata: dict, query_context: dict) -> float:
        """Apply semantic analysis boosting using spaCy."""
        boost_factor = 0.0

        if "spacy_analysis" not in query_context:
            return boost_factor

        spacy_analysis = query_context["spacy_analysis"]

        # Enhanced entity matching using spaCy similarity
        entities = metadata.get("entities", [])
        if entities and spacy_analysis.entities:
            max_entity_similarity = 0.0
            for entity in entities:
                entity_text = (
                    entity
                    if isinstance(entity, str)
                    else entity.get("text", str(entity))
                )
                similarity = self.spacy_analyzer.semantic_similarity_matching(
                    spacy_analysis, entity_text
                )
                max_entity_similarity = max(max_entity_similarity, similarity)

            # Apply semantic entity boost based on similarity
            if max_entity_similarity > 0.6:  # High similarity
                boost_factor += 0.15
            elif max_entity_similarity > 0.4:  # Medium similarity
                boost_factor += 0.10
            elif max_entity_similarity > 0.2:  # Low similarity
                boost_factor += 0.05

        # Enhanced topic relevance using spaCy
        topics = metadata.get("topics", [])
        if topics and spacy_analysis.main_concepts:
            max_topic_similarity = 0.0
            for topic in topics:
                topic_text = (
                    topic if isinstance(topic, str) else topic.get("text", str(topic))
                )
                for concept in spacy_analysis.main_concepts:
                    similarity = self.spacy_analyzer.semantic_similarity_matching(
                        spacy_analysis, f"{topic_text} {concept}"
                    )
                    max_topic_similarity = max(max_topic_similarity, similarity)

            # Apply semantic topic boost
            if max_topic_similarity > 0.5:
                boost_factor += 0.12
            elif max_topic_similarity > 0.3:
                boost_factor += 0.08

        return boost_factor
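
    # Example (hypothetical): a best entity similarity of 0.65 and a best topic
    # similarity of 0.35 yield a semantic boost of 0.15 + 0.08 = 0.23.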

    def _apply_fallback_semantic_boosting(
        self, metadata: dict, query_context: dict
    ) -> float:
        """Apply fallback semantic boosting without spaCy."""
        boost_factor = 0.0

        # Fallback to original entity/topic matching
        entities = metadata.get("entities", [])
        if entities:
            query_keywords = set(query_context.get("keywords", []))
            entity_texts = set()
            for entity in entities:
                if isinstance(entity, str):
                    entity_texts.add(entity.lower())
                elif isinstance(entity, dict):
                    if "text" in entity:
                        entity_texts.add(str(entity["text"]).lower())
                    elif "entity" in entity:
                        entity_texts.add(str(entity["entity"]).lower())
                    else:
                        entity_texts.add(str(entity).lower())

            if query_keywords.intersection(entity_texts):
                boost_factor += 0.10

        # Original topic relevance
        topics = metadata.get("topics", [])
        if topics:
            query_keywords = set(query_context.get("keywords", []))
            topic_texts = set()
            for topic in topics:
                if isinstance(topic, str):
                    topic_texts.add(topic.lower())
                elif isinstance(topic, dict):
                    if "text" in topic:
                        topic_texts.add(str(topic["text"]).lower())
                    elif "topic" in topic:
                        topic_texts.add(str(topic["topic"]).lower())
                    else:
                        topic_texts.add(str(topic).lower())

            if query_keywords.intersection(topic_texts):
                boost_factor += 0.08

        return boost_factor
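
    # Example (hypothetical): a query keyword "billing" appearing in both the entity
    # texts and the topic texts yields 0.10 + 0.08 = 0.18.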

    def _apply_diversity_filtering(
        self, results: list[HybridSearchResult], diversity_factor: float, limit: int
    ) -> list[HybridSearchResult]:
        """Apply diversity filtering to promote varied result types."""
        if diversity_factor <= 0.0 or len(results) <= limit:
            return results[:limit]

        diverse_results = []
        used_source_types = set()
        used_section_types = set()
        used_sources = set()

        # First pass: Take top results while ensuring diversity
        for result in results:
            if len(diverse_results) >= limit:
                break

            # Calculate diversity score
            diversity_score = 1.0

            # Penalize duplicate source types (less diversity)
            source_type = result.source_type
            if source_type in used_source_types:
                diversity_score *= 1.0 - diversity_factor * 0.3

            # Penalize duplicate section types
            section_type = result.section_type or "unknown"
            if section_type in used_section_types:
                diversity_score *= 1.0 - diversity_factor * 0.2

            # Penalize duplicate sources (same document/file)
            source_key = f"{result.source_type}:{result.source_title}"
            if source_key in used_sources:
                diversity_score *= 1.0 - diversity_factor * 0.4

            # Apply diversity penalty to score
            adjusted_score = result.score * diversity_score

            # Use original score to determine if we should include this result
            if (
                len(diverse_results) < limit * 0.7
                or adjusted_score >= result.score * 0.6
            ):
                diverse_results.append(result)
                used_source_types.add(source_type)
                used_section_types.add(section_type)
                used_sources.add(source_key)

        # Second pass: Fill remaining slots with best remaining results
        remaining_slots = limit - len(diverse_results)
        if remaining_slots > 0:
            remaining_results = [r for r in results if r not in diverse_results]
            diverse_results.extend(remaining_results[:remaining_slots])

        return diverse_results[:limit]
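
    # Example (hypothetical): with diversity_factor 0.5, a repeated source type scales
    # the score by (1 - 0.5 * 0.3) = 0.85 and a repeated source document by a further
    # (1 - 0.5 * 0.4) = 0.80, i.e. 0.85 * 0.80 = 0.68 of the original score.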

    def _flatten_metadata_components(
        self, metadata_components: dict[str, Any]
    ) -> dict[str, Any]:
        """Flatten metadata components for backward compatibility."""
        flattened = {}

        for _component_name, component in metadata_components.items():
            if component is None:
                continue

            if hasattr(component, "__dict__"):
                # Convert dataclass to dict and flatten
                component_dict = component.__dict__
                for key, value in component_dict.items():
                    flattened[key] = value
            elif isinstance(component, dict):
                flattened.update(component)

        return flattened