Coverage for src/qdrant_loader_mcp_server/search/engine/topic_chain.py: 79%
104 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1"""
2Topic Chain Search Operations.
4This module implements topic-driven search chain functionality
5for progressive discovery and exploration of related content.
6"""
8from __future__ import annotations
10from typing import TYPE_CHECKING
12if TYPE_CHECKING:
13 from .core import SearchEngine
15from ...utils.logging import LoggingConfig
16from ..components.search_result_models import HybridSearchResult
17from ..enhanced.topic_search_chain import ChainStrategy, TopicSearchChain
19logger = LoggingConfig.get_logger(__name__)
class TopicChainResult(dict):
    """Query -> results mapping that also carries topic-chain metadata.

    The dict storage is initialised from ``chain_results``, so the object can
    be used like the legacy mapping of query to results. On top of that, the
    reserved names ``'chain_results'``, ``'organized_results'`` and ``'stats'``
    are answered by subscripting, by ``in`` and by ``get()``, and are exposed
    as attributes. The reserved names are not stored as dict entries, so
    iteration only yields the query keys.
    """

    # Reserved names that are resolved from attributes, not from dict storage.
    _METADATA_KEYS = ("chain_results", "organized_results", "stats")
    _METADATA_KEY_SET = frozenset(_METADATA_KEYS)

    def __init__(self, chain_results: dict, organized_results: dict, stats: dict):
        """Populate the mapping from ``chain_results`` and attach the metadata."""
        super().__init__(chain_results)
        self.chain_results = chain_results
        self.organized_results = organized_results
        self.stats = stats

    def __getitem__(self, key):  # type: ignore[override]
        """Return metadata for a reserved name, otherwise the stored entry."""
        for reserved in self._METADATA_KEYS:
            if key == reserved:
                return getattr(self, reserved)
        return super().__getitem__(key)

    def __contains__(self, key):  # type: ignore[override]
        """Report reserved names as present in addition to the stored keys."""
        return key in self._METADATA_KEY_SET or super().__contains__(key)

    def get(self, key, default=None):  # type: ignore[override]
        """Like ``dict.get`` but routed through ``__getitem__``."""
        try:
            value = self[key]
        except KeyError:
            return default
        return value

    def __eq__(self, other):  # type: ignore[override]
        """Compare against dicts using only the stored query -> results entries."""
        if not isinstance(other, dict):
            return super().__eq__(other)
        # Only the legacy mapping portion takes part in equality with dicts.
        legacy_mapping = dict(super().items())
        return legacy_mapping == other
class TopicChainOperations:
    """Topic chain search operations built on an engine's hybrid search.

    Every public coroutine requires ``engine.hybrid_search`` to be truthy and
    raises ``RuntimeError`` otherwise. Failures of the underlying calls are
    logged and then re-raised unchanged.
    """

    def __init__(self, engine: SearchEngine):
        """Keep a reference to the search engine and create a logger."""
        self.engine = engine
        self.logger = LoggingConfig.get_logger(__name__)

    async def generate_topic_chain(
        self, query: str, strategy: str = "mixed_exploration", max_links: int = 5
    ) -> TopicSearchChain:
        """Generate a topic-driven search chain for progressive discovery.

        Args:
            query: Original search query.
            strategy: Chain generation strategy name (breadth_first,
                depth_first, relevance_ranked, mixed_exploration). A value
                that ``ChainStrategy`` rejects is logged as a warning and
                replaced by ``ChainStrategy.MIXED_EXPLORATION``.
            max_links: Maximum number of chain links to generate.

        Returns:
            The chain produced by ``hybrid_search.generate_topic_search_chain``.

        Raises:
            RuntimeError: If ``engine.hybrid_search`` is not set.
            Exception: Any error raised by the hybrid search call (re-raised
                after logging).
        """
        if not self.engine.hybrid_search:
            raise RuntimeError("Search engine not initialized")

        # Convert the string strategy to its enum, falling back on bad input.
        try:
            chain_strategy = ChainStrategy(strategy)
        except ValueError:
            self.logger.warning(
                f"Unknown strategy '{strategy}', using mixed_exploration"
            )
            chain_strategy = ChainStrategy.MIXED_EXPLORATION

        self.logger.debug(
            "Generating topic search chain",
            query=query,
            strategy=strategy,
            max_links=max_links,
        )

        try:
            topic_chain = await self.engine.hybrid_search.generate_topic_search_chain(
                query=query, strategy=chain_strategy, max_links=max_links
            )

            self.logger.info(
                "Topic chain generation completed",
                query=query,
                chain_length=len(topic_chain.chain_links),
                topics_covered=topic_chain.total_topics_covered,
                discovery_potential=f"{topic_chain.estimated_discovery_potential:.2f}",
            )

            return topic_chain
        except Exception as e:
            self.logger.error(
                "Topic chain generation failed", error=str(e), query=query
            )
            raise

    async def execute_topic_chain(
        self,
        topic_chain: TopicSearchChain,
        results_per_link: int = 3,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict[str, list[HybridSearchResult]]:
        """Execute searches for all links in a topic chain.

        Args:
            topic_chain: The topic search chain to execute.
            results_per_link: Number of results per chain link.
            source_types: Optional source type filters.
            project_ids: Optional project ID filters.

        Returns:
            Dictionary mapping queries to search results, as returned by
            ``hybrid_search.execute_topic_chain_search``.

        Raises:
            RuntimeError: If ``engine.hybrid_search`` is not set.
            Exception: Any error raised while executing or summarising the
                chain search (re-raised after logging).
        """
        if not self.engine.hybrid_search:
            raise RuntimeError("Search engine not initialized")

        self.logger.debug(
            "Executing topic chain search",
            original_query=topic_chain.original_query,
            chain_length=len(topic_chain.chain_links),
            results_per_link=results_per_link,
        )

        try:
            chain_results = await self.engine.hybrid_search.execute_topic_chain_search(
                topic_chain=topic_chain,
                results_per_link=results_per_link,
                source_types=source_types,
                project_ids=project_ids,
            )

            total_results = sum(len(results) for results in chain_results.values())
            self.logger.info(
                "Topic chain execution completed",
                original_query=topic_chain.original_query,
                total_queries=len(chain_results),
                total_results=total_results,
            )

            return chain_results
        except Exception as e:
            self.logger.error("Topic chain execution failed", error=str(e))
            raise

    async def search_with_topic_chain(
        self,
        query: str,
        chain_strategy: str = "mixed_exploration",
        results_per_link: int = 3,
        max_links: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> TopicChainResult:
        """Generate a topic chain for ``query`` and execute it in one call.

        Args:
            query: Original search query.
            chain_strategy: Strategy name for topic chain generation.
            results_per_link: Results per chain link.
            max_links: Maximum chain links.
            source_types: Optional source type filters.
            project_ids: Optional project ID filters.

        Returns:
            A ``TopicChainResult`` built from the raw query -> results
            mapping, the results organized by chain position, and the
            exploration statistics.

        Raises:
            RuntimeError: If ``engine.hybrid_search`` is not set.
            Exception: Any error from generation, execution or
                post-processing (re-raised after logging).
        """
        if not self.engine.hybrid_search:
            raise RuntimeError("Search engine not initialized")

        self.logger.info(
            "Starting topic chain search workflow",
            query=query,
            strategy=chain_strategy,
            max_links=max_links,
        )

        try:
            # Generate the topic chain
            topic_chain = await self.generate_topic_chain(
                query=query, strategy=chain_strategy, max_links=max_links
            )

            # Execute searches for each link
            chain_results = await self.execute_topic_chain(
                topic_chain=topic_chain,
                results_per_link=results_per_link,
                source_types=source_types,
                project_ids=project_ids,
            )

            # Organize results by exploration depth
            organized_results = self._organize_chain_results(topic_chain, chain_results)

            # Calculate exploration statistics
            stats = self._calculate_exploration_stats(topic_chain, chain_results)

            self.logger.info(
                "Topic chain search completed",
                query=query,
                total_results=sum(len(results) for results in chain_results.values()),
                topics_explored=topic_chain.total_topics_covered,
            )

            # The result object behaves like the raw query -> results mapping
            # (backward compatible) and also exposes the structured metadata.
            return TopicChainResult(
                chain_results=chain_results,
                organized_results=organized_results,
                stats=stats,
            )

        except Exception as e:
            self.logger.error("Topic chain search failed", error=str(e), query=query)
            raise

    def _organize_chain_results(
        self, topic_chain: TopicSearchChain, chain_results: dict | None
    ) -> dict:
        """Group chain results by each link's ``chain_position``.

        Args:
            topic_chain: Chain whose ``chain_links`` are walked in order.
            chain_results: Mapping of link query -> results; ``None`` or an
                empty mapping yields an empty dict.

        Returns:
            Dict keyed by chain position. Each value has ``"queries"`` (one
            summary per link that has results), ``"results"`` (all results at
            that position) and ``"total_results"``. A position whose links
            have no entry in ``chain_results`` still gets an empty bucket.
        """
        organized: dict = {}

        # Defensive: handle None or empty chain_results
        if not chain_results:
            return organized

        for link in topic_chain.chain_links:
            bucket = organized.setdefault(
                link.chain_position,
                {"queries": [], "results": [], "total_results": 0},
            )

            results = chain_results.get(link.query)
            if results is None:
                continue

            bucket["queries"].append(
                {
                    "query": link.query,
                    "topics": [link.topic_focus, *link.related_topics],
                    "relevance_score": link.relevance_score,
                    "result_count": len(results),
                }
            )
            bucket["results"].extend(results)
            bucket["total_results"] += len(results)

        return organized

    def _calculate_exploration_stats(
        self, topic_chain: TopicSearchChain, chain_results: dict | None
    ) -> dict:
        """Calculate summary statistics for an executed topic chain.

        Args:
            topic_chain: Chain whose ``chain_links`` supply topics, positions
                and relevance scores.
            chain_results: Mapping of query -> results. ``None`` is treated
                as an empty mapping, and a ``None`` result list counts as
                zero results, matching how ``_organize_chain_results``
                tolerates missing data.

        Returns:
            Dict with ``total_chain_links``, ``unique_topics_discovered``,
            ``depth_distribution`` (links per chain position),
            ``average_relevance_score`` and ``results_per_query_average``;
            both averages are ``0`` when there is nothing to average.
        """
        # Previously a None mapping raised AttributeError on .values().
        chain_results = chain_results or {}
        total_results = sum(len(results or ()) for results in chain_results.values())

        links = topic_chain.chain_links
        unique_topics = set()
        depth_distribution: dict = {}
        for link in links:
            unique_topics.add(link.topic_focus)
            unique_topics.update(link.related_topics)
            depth = link.chain_position
            depth_distribution[depth] = depth_distribution.get(depth, 0) + 1

        average_relevance = (
            sum(link.relevance_score for link in links) / len(links) if links else 0
        )
        results_per_query = (
            total_results / len(chain_results) if chain_results else 0
        )

        return {
            "total_chain_links": len(links),
            "unique_topics_discovered": len(unique_topics),
            "depth_distribution": depth_distribution,
            "average_relevance_score": average_relevance,
            "results_per_query_average": results_per_query,
        }