Coverage for src/qdrant_loader_mcp_server/search/engine/strategies.py: 80%
79 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1"""
2Search Strategy Selection.
4This module implements intelligent strategy selection for document clustering
5and analysis based on document characteristics and content patterns.
6"""
8from typing import TYPE_CHECKING
10if TYPE_CHECKING:
11 from .core import SearchEngine
13from ...utils.logging import LoggingConfig
14from ..enhanced.cross_document_intelligence import ClusteringStrategy
class StrategySelector:
    """Handles intelligent strategy selection for search operations.

    The selector derives five normalized characteristics (each in ``[0, 1]``)
    from a batch of documents and turns them into per-strategy scores; the
    strategy with the highest score is chosen.
    """

    # Normalization ceilings: the value at which a characteristic saturates at 1.0.
    _RICH_ENTITY_COUNT = 5.0  # average of 5+ entities per document is "rich"
    _CLEAR_TOPIC_COUNT = 3.0  # average of 3+ topics per document is "clear"
    _DEEP_BREADCRUMB_LEVELS = 3.0  # depth 1 -> 0.0, depth 4+ -> 1.0
    _DIVERSE_SOURCE_COUNT = 4.0  # 4+ distinct source types is max diversity

    def __init__(self, engine: "SearchEngine"):
        """Initialize with search engine reference."""
        self.engine = engine
        self.logger = LoggingConfig.get_logger(__name__)

    def select_optimal_strategy(self, documents: list) -> ClusteringStrategy:
        """Analyze document characteristics and select optimal clustering strategy.

        Args:
            documents: Documents to inspect. Attributes are read with
                ``getattr`` defaults, so objects lacking a field are tolerated.

        Returns:
            The highest-scoring strategy. ``MIXED_FEATURES`` is returned
            directly when ``documents`` is empty. On a score tie the strategy
            listed first in the score table wins.
        """
        if not documents:
            return ClusteringStrategy.MIXED_FEATURES

        analysis = self.analyze_document_characteristics(documents)

        # Insertion order matters: max() returns the first maximal key.
        strategy_scores: dict[ClusteringStrategy, int] = {
            ClusteringStrategy.ENTITY_BASED: 0,
            ClusteringStrategy.TOPIC_BASED: 0,
            ClusteringStrategy.PROJECT_BASED: 0,
            ClusteringStrategy.HIERARCHICAL: 0,
            ClusteringStrategy.MIXED_FEATURES: 0,
        }

        # Score based on entity richness
        entity_richness = analysis["entity_richness"]
        if entity_richness > 0.7:
            strategy_scores[ClusteringStrategy.ENTITY_BASED] += 3
            strategy_scores[ClusteringStrategy.MIXED_FEATURES] += 1
        elif entity_richness > 0.4:
            strategy_scores[ClusteringStrategy.ENTITY_BASED] += 1
            strategy_scores[ClusteringStrategy.MIXED_FEATURES] += 2

        # Score based on topic clarity
        topic_clarity = analysis["topic_clarity"]
        if topic_clarity > 0.7:
            strategy_scores[ClusteringStrategy.TOPIC_BASED] += 3
            strategy_scores[ClusteringStrategy.MIXED_FEATURES] += 1
        elif topic_clarity > 0.4:
            strategy_scores[ClusteringStrategy.TOPIC_BASED] += 1
            strategy_scores[ClusteringStrategy.MIXED_FEATURES] += 2

        # Score based on project distribution
        project_distribution = analysis["project_distribution"]
        if project_distribution > 0.6:
            strategy_scores[ClusteringStrategy.PROJECT_BASED] += 3
        elif project_distribution > 0.3:
            strategy_scores[ClusteringStrategy.PROJECT_BASED] += 1
            strategy_scores[ClusteringStrategy.MIXED_FEATURES] += 1

        # Score based on hierarchical structure
        hierarchical_structure = analysis["hierarchical_structure"]
        if hierarchical_structure > 0.6:
            strategy_scores[ClusteringStrategy.HIERARCHICAL] += 3
        elif hierarchical_structure > 0.3:
            strategy_scores[ClusteringStrategy.HIERARCHICAL] += 1
            strategy_scores[ClusteringStrategy.MIXED_FEATURES] += 1

        # Score based on source diversity
        if analysis["source_diversity"] > 0.7:
            strategy_scores[ClusteringStrategy.MIXED_FEATURES] += 2

        # Bonus for mixed_features as a safe default
        strategy_scores[ClusteringStrategy.MIXED_FEATURES] += 1

        # Select strategy with highest score
        best_strategy = max(strategy_scores, key=strategy_scores.__getitem__)

        self.logger.info(f"Strategy analysis: {analysis}")
        self.logger.info(f"Strategy scores: {strategy_scores}")
        self.logger.info(
            f"Selected strategy: {getattr(best_strategy, 'value', str(best_strategy))}"
        )
        return best_strategy

    def analyze_document_characteristics(self, documents: list) -> dict[str, float]:
        """Analyze characteristics of documents to inform strategy selection.

        Args:
            documents: Objects that may expose ``entities``, ``topics``,
                ``project_id``, ``breadcrumb_text`` and ``source_type``.
                Missing or falsy attributes are treated as "no data".

        Returns:
            A dict with the keys ``entity_richness``, ``topic_clarity``,
            ``project_distribution``, ``hierarchical_structure`` and
            ``source_diversity``, each a float in ``[0.0, 1.0]``. All values
            are ``0.0`` for empty input.
        """
        if not documents:
            return {
                "entity_richness": 0.0,
                "topic_clarity": 0.0,
                "project_distribution": 0.0,
                "hierarchical_structure": 0.0,
                "source_diversity": 0.0,
            }

        # documents is non-empty from here on, so the divisions below are safe.
        total_docs = len(documents)

        # Entity analysis
        entity_richness = min(
            1.0,
            self._mean_attribute_length(documents, "entities")
            / self._RICH_ENTITY_COUNT,
        )

        # Topic analysis
        topic_clarity = min(
            1.0,
            self._mean_attribute_length(documents, "topics")
            / self._CLEAR_TOPIC_COUNT,
        )

        # Project distribution: distinct non-empty project ids per document
        unique_projects = {
            project_id
            for doc in documents
            if (project_id := getattr(doc, "project_id", None))
        }
        project_distribution = min(1.0, len(unique_projects) / total_docs)

        # Hierarchical structure: average breadcrumb depth of documents that
        # actually carry a breadcrumb
        depths = [
            len(breadcrumb.split(" > "))
            for doc in documents
            if (breadcrumb := getattr(doc, "breadcrumb_text", ""))
        ]
        if depths:
            avg_depth = sum(depths) / len(depths)
            hierarchical_structure = min(
                1.0, (avg_depth - 1) / self._DEEP_BREADCRUMB_LEVELS
            )
        else:
            hierarchical_structure = 0.0

        # Source diversity: a missing/empty source type is not a source type,
        # so it is skipped (same rule as for project ids above) instead of
        # being counted as one more distinct source.
        unique_sources = {
            source_type
            for doc in documents
            if (source_type := getattr(doc, "source_type", ""))
        }
        source_diversity = min(1.0, len(unique_sources) / self._DIVERSE_SOURCE_COUNT)

        return {
            "entity_richness": entity_richness,
            "topic_clarity": topic_clarity,
            "project_distribution": project_distribution,
            "hierarchical_structure": hierarchical_structure,
            "source_diversity": source_diversity,
        }

    @staticmethod
    def _mean_attribute_length(documents: list, attribute: str) -> float:
        """Return the mean ``len()`` of a collection attribute over documents.

        A missing or falsy attribute counts as an empty collection.
        ``documents`` must be non-empty.
        """
        total = sum(len(getattr(doc, attribute, None) or []) for doc in documents)
        return total / len(documents)