Coverage for src/qdrant_loader_mcp_server/search/components/result_combiner.py: 89%

175 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 09:41 +0000

1"""Result combination and ranking logic for hybrid search.""" 

2 

3from typing import Any 

4 

5from ...utils.logging import LoggingConfig 

6from ..hybrid.components.scoring import HybridScorer 

7from ..nlp.spacy_analyzer import SpaCyQueryAnalyzer 

8from .combining import ( 

9 boost_score_with_metadata, 

10 flatten_metadata_components, 

11 should_skip_result, 

12) 

13from .metadata_extractor import MetadataExtractor 

14from .search_result_models import HybridSearchResult, create_hybrid_search_result 

15 

WRRF_CONSTANT = 60  # RRF smoothing constant "k": dampens the dominance of top ranks


class ResultCombiner:
    """Combines and ranks search results from multiple sources."""

    def __init__(
        self,
        vector_weight: float = 0.6,
        keyword_weight: float = 0.3,
        metadata_weight: float = 0.1,
        min_score: float = 0.3,
        spacy_analyzer: SpaCyQueryAnalyzer | None = None,
    ):
        """Initialize the result combiner.

        Args:
            vector_weight: Weight for vector search scores (0-1)
            keyword_weight: Weight for keyword search scores (0-1)
            metadata_weight: Weight for metadata-based scoring (0-1)
            min_score: Minimum combined score threshold
            spacy_analyzer: Optional spaCy analyzer for semantic boosting
        """
        self.vector_weight = vector_weight
        self.keyword_weight = keyword_weight
        self.metadata_weight = metadata_weight
        self.min_score = min_score
        self.spacy_analyzer = spacy_analyzer
        self.logger = LoggingConfig.get_logger(__name__)

        self.metadata_extractor = MetadataExtractor()
        # Internal scorer to centralize weighting logic (behavior-preserving)
        self._scorer = HybridScorer(
            vector_weight=self.vector_weight,
            keyword_weight=self.keyword_weight,
            metadata_weight=self.metadata_weight,
        )

    def _build_wrrf_entry(
        self,
        result: dict[str, Any],
        rank: int,
        weight: float,
        is_vector: bool,
    ) -> dict[str, Any]:
        """Build a combined-dict entry for a result first seen at ``rank``.

        Args:
            result: Raw result payload from a search service.
            rank: 1-based rank of the result within its source list.
            weight: Fusion weight of the originating search source.
            is_vector: True when the result came from vector (dense) search,
                False for keyword (sparse) search.

        Returns:
            Entry dict seeded with the weighted reciprocal-rank score and all
            root-level fields forwarded from the search services.
        """
        return {
            "text": result["text"],
            "metadata": result["metadata"],
            "source_type": result["source_type"],
            "vector_score": result["score"] if is_vector else 0.0,
            "keyword_score": 0.0 if is_vector else result["score"],
            # Include all root-level fields from search services
            "title": result.get("title", ""),
            "url": result.get("url", ""),
            "document_id": result.get("document_id", ""),
            "source": result.get("source", ""),
            "created_at": result.get("created_at", ""),
            "updated_at": result.get("updated_at", ""),
            "contextual_content": result.get("contextual_content", ""),
            "wrrf_score": weight * (1 / (rank + WRRF_CONSTANT)),
        }

    def merge_results_with_wrrf(
        self,
        vector_results: list[dict[str, Any]],
        keyword_results: list[dict[str, Any]],
    ) -> dict:
        """Merge and rerank results using Weighted Reciprocal Rank Fusion.

        Fuses vector (dense) and keyword (sparse) search results, keyed by
        result text. Each source contributes ``weight * 1/(rank + k)`` to the
        entry's ``wrrf_score``.

        Args:
            vector_results: Results from vector search, best first.
            keyword_results: Results from keyword search, best first.

        Returns:
            Mapping of result text to its combined entry dict.
        """
        combined_dict: dict[str, dict[str, Any]] = {}

        # Process vector (dense) results; rank is 1-based for the RRF formula
        for rank, result in enumerate(vector_results, 1):
            text = result["text"]
            if text not in combined_dict:
                combined_dict[text] = self._build_wrrf_entry(
                    result, rank, self._scorer.vector_weight, is_vector=True
                )

        # Fold in keyword (sparse) results
        for rank, result in enumerate(keyword_results, 1):
            text = result["text"]
            if text in combined_dict:
                entry = combined_dict[text]
                entry["keyword_score"] = result["score"]
                # Backfill contextual_content if vector entry was empty
                if not entry.get("contextual_content") and result.get(
                    "contextual_content"
                ):
                    entry["contextual_content"] = result["contextual_content"]
                # Sum the weighted reciprocal-rank contributions
                entry["wrrf_score"] += self._scorer.keyword_weight * (
                    1 / (rank + WRRF_CONSTANT)
                )
            else:
                combined_dict[text] = self._build_wrrf_entry(
                    result, rank, self._scorer.keyword_weight, is_vector=False
                )
        return combined_dict

    def extract_chunk_title(
        self,
        info: dict,
        metadata: dict,
        chunk_index: int | None,
        total_chunks: int | None,
    ) -> str:
        """Derive a human-readable title for a result chunk.

        Priority (based on the actual Qdrant payload structure): root-level
        title > nested metadata title > section_title > file name (plus chunk
        info when available) > basename of the source > "Untitled".

        Args:
            info: Root-level Qdrant payload fields.
            metadata: Nested metadata object from the payload.
            chunk_index: Chunk position fallback, may be None.
            total_chunks: Total chunk count fallback, may be None.

        Returns:
            The best available title string.
        """
        file_name = metadata.get("file_name", "")
        root_title = info.get(
            "title", ""
        )  # e.g., "Stratégie commerciale MYA.pdf - Chunk 2"
        nested_title = metadata.get("title", "")  # e.g., "Preamble (Part 2)"
        section_title = metadata.get("section_title", "")

        if root_title:
            title = root_title
        elif nested_title:
            title = nested_title
        elif section_title:
            title = section_title
        elif file_name:
            title = file_name
            # Add chunk info if available from nested metadata
            sub_chunk_index = metadata.get("sub_chunk_index")
            total_sub_chunks = metadata.get("total_sub_chunks")
            if sub_chunk_index is not None and total_sub_chunks is not None:
                title += f" - Chunk {int(sub_chunk_index) + 1}/{total_sub_chunks}"
            elif chunk_index is not None and total_chunks is not None:
                title += f" - Chunk {int(chunk_index) + 1}/{total_chunks}"
        else:
            source = info.get("source", "") or metadata.get("source", "")
            if source:
                # Extract filename from path-like sources
                import os

                title = (
                    os.path.basename(source)
                    if "/" in source or "\\" in source
                    else source
                )
            else:
                title = "Untitled"
        return title

    def merge_rich_and_enhanced_metadata(
        self,
        info: dict,
        metadata: dict,
        metadata_components: dict,
        chunk_index: int | None,
        total_chunks: int | None,
    ) -> dict:
        """Merge root payload fields, nested metadata, and extracted components.

        Args:
            info: Root-level Qdrant payload fields.
            metadata: Nested metadata object from the payload.
            metadata_components: Components produced by the metadata extractor.
            chunk_index: Fallback chunk position when not in nested metadata.
            total_chunks: Fallback total chunk count.

        Returns:
            Flat dict of enhanced metadata; flattened components take
            precedence on key conflicts.
        """
        # Create enhanced metadata dict with rich Qdrant fields
        enhanced_metadata = {
            # Core fields from root level of Qdrant payload
            "source_url": info.get("url", ""),
            "document_id": info.get("document_id", ""),
            "created_at": info.get("created_at", ""),
            "last_modified": info.get("updated_at", ""),
            "repo_name": info.get("source", ""),
            # Project scoping is stored at the root as 'source'
            "project_id": info.get("source", ""),
            # Construct file path from nested metadata
            "file_path": (
                metadata.get("file_directory", "").rstrip("/")
                + "/"
                + metadata.get("file_name", "")
                if metadata.get("file_name") and metadata.get("file_directory")
                else metadata.get("file_name", "")
            ),
        }

        # Add rich metadata from nested metadata object (confirmed structure)
        rich_metadata_fields = {
            "original_filename": metadata.get("file_name"),
            "file_size": metadata.get("file_size"),
            "original_file_type": metadata.get("file_type")
            or metadata.get("original_file_type"),
            "word_count": metadata.get("word_count"),
            "char_count": metadata.get("character_count")
            or metadata.get("char_count")
            or metadata.get("line_count"),
            "chunk_index": metadata.get("sub_chunk_index", chunk_index),
            "total_chunks": metadata.get("total_sub_chunks", total_chunks),
            "chunking_strategy": metadata.get("chunking_strategy")
            or metadata.get("conversion_method"),
            # Project fields now come from root payload; avoid overriding with nested metadata
            "collection_name": metadata.get("collection_name"),
            # Additional rich fields from actual Qdrant structure
            "section_title": metadata.get("section_title"),
            "parent_section": metadata.get("parent_section"),
            "file_encoding": metadata.get("file_encoding"),
            "conversion_failed": metadata.get("conversion_failed", False),
            "is_excel_sheet": metadata.get("is_excel_sheet", False),
        }

        # Only add non-None values to avoid conflicts
        for key, value in rich_metadata_fields.items():
            if value is not None:
                enhanced_metadata[key] = value

        # Merge with flattened metadata components (flattened takes precedence for conflicts)
        flattened_components = flatten_metadata_components(metadata_components)
        enhanced_metadata.update(flattened_components)

        return enhanced_metadata

    def is_result_filtered(
        self, use_wrrf: bool, wrrf_score: float, chunk_score: float
    ) -> bool:
        """Return True when a result falls below the active score threshold.

        WRRF scores live on a much smaller scale than weighted raw scores, so
        ``min_score`` is rescaled by (vector_weight + keyword_weight)/(k + 1)
        before comparing against ``wrrf_score``.

        Args:
            use_wrrf: Whether WRRF fusion scoring is in effect.
            wrrf_score: The result's fused reciprocal-rank score.
            chunk_score: The result's standard weighted score.

        Returns:
            True when the result should be dropped.
        """
        # Scale minimum threshold to the WRRF score range
        wrrf_min_score = self.min_score * (
            (self._scorer.vector_weight + self._scorer.keyword_weight)
            / (WRRF_CONSTANT + 1)
        )
        # Filter low wrrf
        if use_wrrf and wrrf_score <= wrrf_min_score:
            return True

        # Fallback to standard filter
        if not use_wrrf and chunk_score <= self.min_score:
            return True
        return False

    async def combine_results(
        self,
        vector_results: list[dict[str, Any]],
        keyword_results: list[dict[str, Any]],
        query_context: dict[str, Any],
        limit: int,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> list[HybridSearchResult]:
        """Combine and rerank results using Weighted Reciprocal Rank Fusion from vector (dense) and keyword (sparse) search.

        Args:
            vector_results: Results from vector search
            keyword_results: Results from keyword search
            query_context: Query analysis context
            limit: Maximum number of results to return
            source_types: Optional source type filters
            project_ids: Optional project ID filters
                NOTE(review): currently unused here — project scoping
                presumably happens upstream; confirm with callers.

        Returns:
            List of combined and ranked HybridSearchResult objects
        """
        combined_dict = self.merge_results_with_wrrf(
            vector_results=vector_results, keyword_results=keyword_results
        )

        # Calculate combined scores and create results
        combined_results = []

        # Extract intent-specific filtering configuration
        search_intent = query_context.get("search_intent")
        adaptive_config = query_context.get("adaptive_config")
        result_filters = adaptive_config.result_filters if adaptive_config else {}

        # Naive WRRF trigger: only use rank fusion when enough candidates exist
        use_wrrf = len(combined_dict) >= 10

        for text, info in combined_dict.items():
            # Skip if source type doesn't match filter
            if source_types and info["source_type"] not in source_types:
                continue
            # Apply intent-specific result filtering
            metadata = info["metadata"]
            if search_intent and result_filters:
                if should_skip_result(metadata, result_filters, query_context):
                    continue

            wrrf_score = info["wrrf_score"]
            # Fallback to standard weighting scoring
            chunk_score = (info["keyword_score"] * self._scorer.keyword_weight) + (
                info["vector_score"] * self._scorer.vector_weight
            )

            # Filter based on WRRF or standard scores and weighting
            if self.is_result_filtered(use_wrrf, wrrf_score, chunk_score):
                continue

            score = wrrf_score if use_wrrf else chunk_score

            # Extract all metadata components
            metadata_components = self.metadata_extractor.extract_all_metadata(metadata)

            # TODO: Evaluate metadata score boosting with WRRF and in general - Boost score with metadata
            boosted_score = boost_score_with_metadata(
                score,
                metadata,
                query_context,
                spacy_analyzer=self.spacy_analyzer,
            )
            chunk_index = metadata.get("chunk_index")
            total_chunks = metadata.get("total_chunks")

            title = self.extract_chunk_title(
                info=info,
                metadata=metadata,
                chunk_index=chunk_index,
                total_chunks=total_chunks,
            )
            enhanced_metadata = self.merge_rich_and_enhanced_metadata(
                info=info,
                metadata=metadata,
                metadata_components=metadata_components,
                chunk_index=chunk_index,
                total_chunks=total_chunks,
            )
            contextual_content = info.get("contextual_content")
            if contextual_content:
                enhanced_metadata["contextual_content"] = contextual_content
            # NOTE: No additional fallback; root payload project_id is authoritative

            # Create HybridSearchResult using factory function
            hybrid_result = create_hybrid_search_result(
                score=boosted_score,
                text=text,
                source_type=info["source_type"],
                source_title=title,
                vector_score=info["vector_score"],
                keyword_score=info["keyword_score"],
                **enhanced_metadata,
            )

            combined_results.append(hybrid_result)

        # Sort by combined score
        combined_results.sort(key=lambda x: x.score, reverse=True)
        # Apply diversity filtering for exploratory intents
        if adaptive_config and adaptive_config.diversity_factor > 0.0:
            try:
                from ..hybrid.components.diversity import apply_diversity_filtering

                diverse_results = apply_diversity_filtering(
                    combined_results, adaptive_config.diversity_factor, limit
                )
                self.logger.debug(
                    "Applied diversity filtering",
                    original_count=len(combined_results),
                    diverse_count=len(diverse_results),
                    diversity_factor=adaptive_config.diversity_factor,
                )
                return diverse_results
            except Exception:
                # Fallback to original top-N behavior if import or filtering fails
                pass

        return combined_results[:limit]

    # The following methods are thin wrappers delegating to combining/* modules
    # to preserve backward-compatible tests that call private methods directly.

    def _should_skip_result(
        self, metadata: dict, result_filters: dict, query_context: dict
    ) -> bool:
        return should_skip_result(metadata, result_filters, query_context)

    def _count_business_indicators(self, metadata: dict) -> int:
        return __import__(
            f"{__package__}.combining.filters", fromlist=["count_business_indicators"]
        ).count_business_indicators(metadata)

    def _boost_score_with_metadata(
        self, base_score: float, metadata: dict, query_context: dict
    ) -> float:
        return boost_score_with_metadata(
            base_score, metadata, query_context, spacy_analyzer=self.spacy_analyzer
        )

    def _apply_content_type_boosting(
        self, metadata: dict, query_context: dict
    ) -> float:
        from .combining import apply_content_type_boosting

        return apply_content_type_boosting(metadata, query_context)

    def _apply_section_level_boosting(self, metadata: dict) -> float:
        from .combining import apply_section_level_boosting

        return apply_section_level_boosting(metadata)

    def _apply_content_quality_boosting(self, metadata: dict) -> float:
        from .combining import apply_content_quality_boosting

        return apply_content_quality_boosting(metadata)

    def _apply_conversion_boosting(self, metadata: dict, query_context: dict) -> float:
        from .combining import apply_conversion_boosting

        return apply_conversion_boosting(metadata, query_context)

    def _apply_semantic_boosting(self, metadata: dict, query_context: dict) -> float:
        from .combining import apply_semantic_boosting

        return apply_semantic_boosting(metadata, query_context, self.spacy_analyzer)

    def _apply_fallback_semantic_boosting(
        self, metadata: dict, query_context: dict
    ) -> float:
        from .combining import apply_fallback_semantic_boosting

        return apply_fallback_semantic_boosting(metadata, query_context)

    def _apply_diversity_filtering(
        self, results: list[HybridSearchResult], diversity_factor: float, limit: int
    ) -> list[HybridSearchResult]:
        """Greedily promote result diversity across source/section/title.

        Penalizes a candidate's score for each diversity dimension already
        represented in the picked set; a candidate is accepted while the
        picked set is small (< 70% of limit) or when its adjusted score stays
        within 60% of its original score. Remaining slots are back-filled in
        original order.
        """
        if diversity_factor <= 0.0 or len(results) <= limit:
            return results[:limit]

        diverse_results = []
        used_source_types = set()
        used_section_types = set()
        used_sources = set()

        for result in results:
            if len(diverse_results) >= limit:
                break

            diversity_score = 1.0
            source_type = result.source_type
            if source_type in used_source_types:
                diversity_score *= 1.0 - diversity_factor * 0.3

            section_type = result.section_type or "unknown"
            if section_type in used_section_types:
                diversity_score *= 1.0 - diversity_factor * 0.2

            source_key = f"{result.source_type}:{result.source_title}"
            if source_key in used_sources:
                diversity_score *= 1.0 - diversity_factor * 0.4

            adjusted_score = result.score * diversity_score

            if (
                len(diverse_results) < limit * 0.7
                or adjusted_score >= result.score * 0.6
            ):
                diverse_results.append(result)
                used_source_types.add(source_type)
                used_section_types.add(section_type)
                used_sources.add(source_key)

        remaining_slots = limit - len(diverse_results)
        if remaining_slots > 0:
            remaining_results = [r for r in results if r not in diverse_results]
            diverse_results.extend(remaining_results[:remaining_slots])

        return diverse_results[:limit]

    def _flatten_metadata_components(
        self, metadata_components: dict[str, Any]
    ) -> dict[str, Any]:
        return flatten_metadata_components(metadata_components)