Coverage for src/qdrant_loader_mcp_server/search/components/result_combiner.py: 88%

148 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1"""Result combination and ranking logic for hybrid search.""" 

2 

3from typing import Any 

4 

5from ...utils.logging import LoggingConfig 

6from ..hybrid.components.scoring import HybridScorer, ScoreComponents 

7from ..nlp.spacy_analyzer import SpaCyQueryAnalyzer 

8from .combining import ( 

9 boost_score_with_metadata, 

10 flatten_metadata_components, 

11 should_skip_result, 

12) 

13from .metadata_extractor import MetadataExtractor 

14from .search_result_models import HybridSearchResult, create_hybrid_search_result 

15 

16 

class ResultCombiner:
    """Combines and ranks search results from multiple sources."""

    def __init__(
        self,
        vector_weight: float = 0.6,
        keyword_weight: float = 0.3,
        metadata_weight: float = 0.1,
        min_score: float = 0.3,
        spacy_analyzer: SpaCyQueryAnalyzer | None = None,
    ):
        """Initialize the result combiner.

        Args:
            vector_weight: Weight applied to vector (dense) scores, 0-1.
            keyword_weight: Weight applied to keyword (sparse) scores, 0-1.
            metadata_weight: Weight applied to metadata-based scoring, 0-1.
            min_score: Minimum combined score a result must reach to be kept.
            spacy_analyzer: Optional spaCy analyzer used for semantic boosting.
        """
        self.vector_weight = vector_weight
        self.keyword_weight = keyword_weight
        self.metadata_weight = metadata_weight
        self.min_score = min_score
        self.spacy_analyzer = spacy_analyzer
        self.logger = LoggingConfig.get_logger(__name__)

        self.metadata_extractor = MetadataExtractor()
        # Internal scorer centralizing the weighting math (behavior-preserving
        # refactor of the legacy inline formula).
        self._scorer = HybridScorer(
            vector_weight=vector_weight,
            keyword_weight=keyword_weight,
            metadata_weight=metadata_weight,
        )

51 

52 async def combine_results( 

53 self, 

54 vector_results: list[dict[str, Any]], 

55 keyword_results: list[dict[str, Any]], 

56 query_context: dict[str, Any], 

57 limit: int, 

58 source_types: list[str] | None = None, 

59 project_ids: list[str] | None = None, 

60 ) -> list[HybridSearchResult]: 

61 """Combine and rerank results from vector and keyword search. 

62 

63 Args: 

64 vector_results: Results from vector search 

65 keyword_results: Results from keyword search 

66 query_context: Query analysis context 

67 limit: Maximum number of results to return 

68 source_types: Optional source type filters 

69 project_ids: Optional project ID filters 

70 

71 Returns: 

72 List of combined and ranked HybridSearchResult objects 

73 """ 

74 combined_dict = {} 

75 

76 # Process vector results 

77 for result in vector_results: 

78 text = result["text"] 

79 if text not in combined_dict: 

80 metadata = result["metadata"] 

81 combined_dict[text] = { 

82 "text": text, 

83 "metadata": metadata, 

84 "source_type": result["source_type"], 

85 "vector_score": result["score"], 

86 "keyword_score": 0.0, 

87 # 🔧 CRITICAL FIX: Include all root-level fields from search services 

88 "title": result.get("title", ""), 

89 "url": result.get("url", ""), 

90 "document_id": result.get("document_id", ""), 

91 "source": result.get("source", ""), 

92 "created_at": result.get("created_at", ""), 

93 "updated_at": result.get("updated_at", ""), 

94 } 

95 

96 # Process keyword results 

97 for result in keyword_results: 

98 text = result["text"] 

99 if text in combined_dict: 

100 combined_dict[text]["keyword_score"] = result["score"] 

101 else: 

102 metadata = result["metadata"] 

103 combined_dict[text] = { 

104 "text": text, 

105 "metadata": metadata, 

106 "source_type": result["source_type"], 

107 "vector_score": 0.0, 

108 "keyword_score": result["score"], 

109 "title": result.get("title", ""), 

110 "url": result.get("url", ""), 

111 "document_id": result.get("document_id", ""), 

112 "source": result.get("source", ""), 

113 "created_at": result.get("created_at", ""), 

114 "updated_at": result.get("updated_at", ""), 

115 } 

116 

117 # Calculate combined scores and create results 

118 combined_results = [] 

119 

120 # Extract intent-specific filtering configuration 

121 search_intent = query_context.get("search_intent") 

122 adaptive_config = query_context.get("adaptive_config") 

123 result_filters = adaptive_config.result_filters if adaptive_config else {} 

124 

125 for text, info in combined_dict.items(): 

126 # Skip if source type doesn't match filter 

127 if source_types and info["source_type"] not in source_types: 

128 continue 

129 

130 metadata = info["metadata"] 

131 

132 # Apply intent-specific result filtering 

133 if search_intent and result_filters: 

134 if should_skip_result(metadata, result_filters, query_context): 

135 continue 

136 

137 combined_score = self._scorer.compute( 

138 ScoreComponents( 

139 vector_score=info["vector_score"], 

140 keyword_score=info["keyword_score"], 

141 metadata_score=0.0, # Preserve legacy behavior (no metadata in base score) 

142 ) 

143 ) 

144 

145 if combined_score >= self.min_score: 

146 # Extract all metadata components 

147 metadata_components = self.metadata_extractor.extract_all_metadata( 

148 metadata 

149 ) 

150 

151 # Boost score with metadata 

152 boosted_score = boost_score_with_metadata( 

153 combined_score, 

154 metadata, 

155 query_context, 

156 spacy_analyzer=self.spacy_analyzer, 

157 ) 

158 

159 # Extract fields from both direct payload fields and nested metadata 

160 # Use direct fields from Qdrant payload when available, fallback to metadata 

161 title = info.get("title", "") or metadata.get("title", "") 

162 

163 # Extract rich metadata from nested metadata object 

164 file_name = metadata.get("file_name", "") 

165 metadata.get("file_type", "") 

166 chunk_index = metadata.get("chunk_index") 

167 total_chunks = metadata.get("total_chunks") 

168 

169 # Enhanced title generation using actual Qdrant structure 

170 # Priority: root title > nested section_title > file_name + chunk info > source 

171 root_title = info.get( 

172 "title", "" 

173 ) # e.g., "Stratégie commerciale MYA.pdf - Chunk 2" 

174 nested_title = metadata.get("title", "") # e.g., "Preamble (Part 2)" 

175 section_title = metadata.get("section_title", "") 

176 

177 if root_title: 

178 title = root_title 

179 elif nested_title: 

180 title = nested_title 

181 elif section_title: 

182 title = section_title 

183 elif file_name: 

184 title = file_name 

185 # Add chunk info if available from nested metadata 

186 sub_chunk_index = metadata.get("sub_chunk_index") 

187 total_sub_chunks = metadata.get("total_sub_chunks") 

188 if sub_chunk_index is not None and total_sub_chunks is not None: 

189 title += ( 

190 f" - Chunk {int(sub_chunk_index) + 1}/{total_sub_chunks}" 

191 ) 

192 elif chunk_index is not None and total_chunks is not None: 

193 title += f" - Chunk {int(chunk_index) + 1}/{total_chunks}" 

194 else: 

195 source = info.get("source", "") or metadata.get("source", "") 

196 if source: 

197 # Extract filename from path-like sources 

198 import os 

199 

200 title = ( 

201 os.path.basename(source) 

202 if "/" in source or "\\" in source 

203 else source 

204 ) 

205 else: 

206 title = "Untitled" 

207 

208 # Create enhanced metadata dict with rich Qdrant fields 

209 enhanced_metadata = { 

210 # Core fields from root level of Qdrant payload 

211 "source_url": info.get("url", ""), 

212 "document_id": info.get("document_id", ""), 

213 "created_at": info.get("created_at", ""), 

214 "last_modified": info.get("updated_at", ""), 

215 "repo_name": info.get("source", ""), 

216 # Project scoping is stored at the root as 'source' 

217 "project_id": info.get("source", ""), 

218 # Construct file path from nested metadata 

219 "file_path": ( 

220 metadata.get("file_directory", "").rstrip("/") 

221 + "/" 

222 + metadata.get("file_name", "") 

223 if metadata.get("file_name") and metadata.get("file_directory") 

224 else metadata.get("file_name", "") 

225 ), 

226 } 

227 

228 # Add rich metadata from nested metadata object (confirmed structure) 

229 rich_metadata_fields = { 

230 "original_filename": metadata.get("file_name"), 

231 "file_size": metadata.get("file_size"), 

232 "original_file_type": metadata.get("file_type") 

233 or metadata.get("original_file_type"), 

234 "word_count": metadata.get("word_count"), 

235 "char_count": metadata.get("character_count") 

236 or metadata.get("char_count") 

237 or metadata.get("line_count"), 

238 "chunk_index": metadata.get("sub_chunk_index", chunk_index), 

239 "total_chunks": metadata.get("total_sub_chunks", total_chunks), 

240 "chunking_strategy": metadata.get("chunking_strategy") 

241 or metadata.get("conversion_method"), 

242 # Project fields now come from root payload; avoid overriding with nested metadata 

243 "collection_name": metadata.get("collection_name"), 

244 # Additional rich fields from actual Qdrant structure 

245 "section_title": metadata.get("section_title"), 

246 "parent_section": metadata.get("parent_section"), 

247 "file_encoding": metadata.get("file_encoding"), 

248 "conversion_failed": metadata.get("conversion_failed", False), 

249 "is_excel_sheet": metadata.get("is_excel_sheet", False), 

250 } 

251 

252 # Only add non-None values to avoid conflicts 

253 for key, value in rich_metadata_fields.items(): 

254 if value is not None: 

255 enhanced_metadata[key] = value 

256 

257 # Merge with flattened metadata components (flattened takes precedence for conflicts) 

258 flattened_components = flatten_metadata_components(metadata_components) 

259 enhanced_metadata.update(flattened_components) 

260 

261 # NOTE: No additional fallback; root payload project_id is authoritative 

262 

263 # Create HybridSearchResult using factory function 

264 hybrid_result = create_hybrid_search_result( 

265 score=boosted_score, 

266 text=text, 

267 source_type=info["source_type"], 

268 source_title=title, 

269 vector_score=info["vector_score"], 

270 keyword_score=info["keyword_score"], 

271 **enhanced_metadata, 

272 ) 

273 

274 combined_results.append(hybrid_result) 

275 

276 # Sort by combined score 

277 combined_results.sort(key=lambda x: x.score, reverse=True) 

278 

279 # Apply diversity filtering for exploratory intents 

280 if adaptive_config and adaptive_config.diversity_factor > 0.0: 

281 try: 

282 from ..hybrid.components.diversity import apply_diversity_filtering 

283 

284 diverse_results = apply_diversity_filtering( 

285 combined_results, adaptive_config.diversity_factor, limit 

286 ) 

287 self.logger.debug( 

288 "Applied diversity filtering", 

289 original_count=len(combined_results), 

290 diverse_count=len(diverse_results), 

291 diversity_factor=adaptive_config.diversity_factor, 

292 ) 

293 return diverse_results 

294 except Exception: 

295 # Fallback to original top-N behavior if import or filtering fails 

296 pass 

297 

298 return combined_results[:limit] 

299 

300 # The following methods are thin wrappers delegating to combining/* modules 

301 # to preserve backward-compatible tests that call private methods directly. 

302 

303 def _should_skip_result( 

304 self, metadata: dict, result_filters: dict, query_context: dict 

305 ) -> bool: 

306 return should_skip_result(metadata, result_filters, query_context) 

307 

308 def _count_business_indicators(self, metadata: dict) -> int: 

309 return __import__( 

310 f"{__package__}.combining.filters", fromlist=["count_business_indicators"] 

311 ).count_business_indicators(metadata) 

312 

313 def _boost_score_with_metadata( 

314 self, base_score: float, metadata: dict, query_context: dict 

315 ) -> float: 

316 return boost_score_with_metadata( 

317 base_score, metadata, query_context, spacy_analyzer=self.spacy_analyzer 

318 ) 

319 

320 def _apply_content_type_boosting( 

321 self, metadata: dict, query_context: dict 

322 ) -> float: 

323 from .combining import apply_content_type_boosting 

324 

325 return apply_content_type_boosting(metadata, query_context) 

326 

327 def _apply_section_level_boosting(self, metadata: dict) -> float: 

328 from .combining import apply_section_level_boosting 

329 

330 return apply_section_level_boosting(metadata) 

331 

332 def _apply_content_quality_boosting(self, metadata: dict) -> float: 

333 from .combining import apply_content_quality_boosting 

334 

335 return apply_content_quality_boosting(metadata) 

336 

337 def _apply_conversion_boosting(self, metadata: dict, query_context: dict) -> float: 

338 from .combining import apply_conversion_boosting 

339 

340 return apply_conversion_boosting(metadata, query_context) 

341 

342 def _apply_semantic_boosting(self, metadata: dict, query_context: dict) -> float: 

343 from .combining import apply_semantic_boosting 

344 

345 return apply_semantic_boosting(metadata, query_context, self.spacy_analyzer) 

346 

347 def _apply_fallback_semantic_boosting( 

348 self, metadata: dict, query_context: dict 

349 ) -> float: 

350 from .combining import apply_fallback_semantic_boosting 

351 

352 return apply_fallback_semantic_boosting(metadata, query_context) 

353 

354 def _apply_diversity_filtering( 

355 self, results: list[HybridSearchResult], diversity_factor: float, limit: int 

356 ) -> list[HybridSearchResult]: 

357 if diversity_factor <= 0.0 or len(results) <= limit: 

358 return results[:limit] 

359 

360 diverse_results = [] 

361 used_source_types = set() 

362 used_section_types = set() 

363 used_sources = set() 

364 

365 for result in results: 

366 if len(diverse_results) >= limit: 

367 break 

368 

369 diversity_score = 1.0 

370 source_type = result.source_type 

371 if source_type in used_source_types: 

372 diversity_score *= 1.0 - diversity_factor * 0.3 

373 

374 section_type = result.section_type or "unknown" 

375 if section_type in used_section_types: 

376 diversity_score *= 1.0 - diversity_factor * 0.2 

377 

378 source_key = f"{result.source_type}:{result.source_title}" 

379 if source_key in used_sources: 

380 diversity_score *= 1.0 - diversity_factor * 0.4 

381 

382 adjusted_score = result.score * diversity_score 

383 

384 if ( 

385 len(diverse_results) < limit * 0.7 

386 or adjusted_score >= result.score * 0.6 

387 ): 

388 diverse_results.append(result) 

389 used_source_types.add(source_type) 

390 used_section_types.add(section_type) 

391 used_sources.add(source_key) 

392 

393 remaining_slots = limit - len(diverse_results) 

394 if remaining_slots > 0: 

395 remaining_results = [r for r in results if r not in diverse_results] 

396 diverse_results.extend(remaining_results[:remaining_slots]) 

397 

398 return diverse_results[:limit] 

399 

400 def _flatten_metadata_components( 

401 self, metadata_components: dict[str, Any] 

402 ) -> dict[str, Any]: 

403 return flatten_metadata_components(metadata_components)