Coverage for src/qdrant_loader_mcp_server/search/hybrid/components/diversity.py: 84%
38 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1from __future__ import annotations
3from ...components.search_result_models import HybridSearchResult
def apply_diversity_filtering(
    results: list[HybridSearchResult], diversity_factor: float, limit: int
) -> list[HybridSearchResult]:
    """Promote variety in the top-N results based on a diversity factor.

    Results are visited in their incoming order. Each candidate gets a
    multiplicative penalty for repeating a source type, a section type, or a
    ``source_type:source_title`` pair that has already been selected. Slots
    left over after this pass are filled with the skipped results, again in
    incoming order, so the output always holds ``limit`` items when enough
    results are available.

    Args:
        results: Candidate results, in the order they should be considered.
        diversity_factor: Strength of the penalties, within [0.0, 1.0].
            ``0.0`` disables filtering entirely.
        limit: Maximum number of results to return. Values ``<= 0`` yield an
            empty list.

    Returns:
        A new list with at most ``limit`` results. The input list is not
        modified.

    Raises:
        ValueError: If ``diversity_factor`` is outside [0.0, 1.0].
    """
    # Validate inputs (a NaN factor also fails this range check).
    if not (0.0 <= diversity_factor <= 1.0):
        raise ValueError(
            f"diversity_factor must be within [0.0, 1.0], got {diversity_factor}"
        )

    # A non-positive limit can never select anything. Handling it here keeps
    # the shortcut below from slicing with a negative index.
    if limit <= 0:
        return []

    # Nothing to diversify: filtering disabled, or every result fits anyway.
    if diversity_factor == 0.0 or len(results) <= limit:
        return results[:limit]

    selected: list[HybridSearchResult] = []
    # Positions (indices into `results`) chosen in the first pass. Tracking
    # positions instead of attribute tuples keeps distinct results that share
    # document/section metadata from being mistaken for one another later.
    selected_positions: set[int] = set()
    used_source_types: set[str] = set()
    used_section_types: set[str] = set()
    used_sources: set[str] = set()

    # First pass: take results in order while favouring unseen sources.
    for position, result in enumerate(results):
        if len(selected) >= limit:
            break

        diversity_score = 1.0

        # Penalize a source type that is already represented.
        source_type = result.source_type
        if source_type in used_source_types:
            diversity_score *= 1.0 - diversity_factor * 0.3

        # Penalize a section type that is already represented.
        section_type = result.section_type or "unknown"
        if section_type in used_section_types:
            diversity_score *= 1.0 - diversity_factor * 0.2

        # Penalize the same source (source type + title) appearing again.
        source_key = f"{result.source_type}:{result.source_title}"
        if source_key in used_sources:
            diversity_score *= 1.0 - diversity_factor * 0.4

        adjusted_score = result.score * diversity_score

        # While fewer than 70% of the slots are filled, every result is
        # accepted. After that, a result is kept only if its penalized score
        # is still at least 60% of its raw score.
        if len(selected) < limit * 0.7 or adjusted_score >= result.score * 0.6:
            selected.append(result)
            selected_positions.add(position)
            used_source_types.add(source_type)
            used_section_types.add(section_type)
            used_sources.add(source_key)

    # Second pass: fill any remaining slots with the skipped results, in
    # their original order.
    remaining_slots = limit - len(selected)
    if remaining_slots > 0:
        skipped = [
            result
            for position, result in enumerate(results)
            if position not in selected_positions
        ]
        selected.extend(skipped[:remaining_slots])

    return selected