Coverage for src/qdrant_loader_mcp_server/search/engine/topic_chain.py: 79%

104 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1""" 

2Topic Chain Search Operations. 

3 

4This module implements topic-driven search chain functionality 

5for progressive discovery and exploration of related content. 

6""" 

7 

8from __future__ import annotations 

9 

10from typing import TYPE_CHECKING 

11 

12if TYPE_CHECKING: 

13 from .core import SearchEngine 

14 

15from ...utils.logging import LoggingConfig 

16from ..components.search_result_models import HybridSearchResult 

17from ..enhanced.topic_search_chain import ChainStrategy, TopicSearchChain 

18 

19logger = LoggingConfig.get_logger(__name__) 

20 

21 

class TopicChainResult(dict):
    """Dict-like result for topic chain searches.

    - Behaves like the legacy mapping of query->results (backward compatible):
      iteration, ``len()``, ``items()`` and equality only see the query entries.
    - Additionally exposes metadata via special keys and attributes:
      'chain_results', 'organized_results', and 'stats'.

    Note: the three metadata keys take precedence in ``[]``, ``in`` and
    ``get()``. A query whose text is exactly one of those names is therefore
    only reachable through ``items()``/``values()`` or the ``chain_results``
    attribute.
    """

    # Keys answered from instance attributes instead of the stored mapping.
    _METADATA_KEYS = frozenset({"chain_results", "organized_results", "stats"})

    def __init__(self, chain_results: dict, organized_results: dict, stats: dict):
        """Store the query->results mapping and attach the metadata.

        Args:
            chain_results: Mapping of query -> results; copied into the dict
                itself. ``None`` is treated as an empty mapping.
            organized_results: Results grouped by the caller (kept as given).
            stats: Statistics produced by the caller (kept as given).
        """
        # dict(None) raises TypeError; normalise so a missing mapping yields
        # an empty result instead of failing during construction.
        if chain_results is None:
            chain_results = {}
        super().__init__(chain_results)
        # Expose as attributes
        self.chain_results = chain_results
        self.organized_results = organized_results
        self.stats = stats

    @classmethod
    def _is_metadata_key(cls, key) -> bool:
        """Return True when *key* names one of the metadata attributes."""
        # The isinstance check keeps unhashable keys away from the set lookup
        # so they fall through to the regular dict behaviour.
        return isinstance(key, str) and key in cls._METADATA_KEYS

    def __getitem__(self, key):  # type: ignore[override]
        """Return metadata for a metadata key, otherwise the stored entry."""
        if self._is_metadata_key(key):
            # Read the live attribute so later reassignment is reflected.
            return getattr(self, key)
        return super().__getitem__(key)

    def __contains__(self, key):  # type: ignore[override]
        """Report metadata keys as present in addition to stored queries."""
        if self._is_metadata_key(key):
            return True
        return super().__contains__(key)

    def get(self, key, default=None):  # type: ignore[override]
        """Like ``dict.get`` but routed through ``__getitem__``.

        ``dict.get`` does not call an overridden ``__getitem__``, so the
        metadata keys would otherwise be invisible here.
        """
        try:
            return self[key]
        except KeyError:
            return default

    def __eq__(self, other):  # type: ignore[override]
        """Compare only the query->results entries; metadata is ignored."""
        # Compare only the legacy mapping portion for equality with dicts
        if isinstance(other, dict):
            return dict(super().items()) == other
        return super().__eq__(other)

62 

63 

class TopicChainOperations:
    """Handles topic chain search operations.

    Delegates chain generation and execution to ``engine.hybrid_search`` and
    adds logging, result grouping and summary statistics on top.
    """

    def __init__(self, engine: SearchEngine):
        """Initialize with search engine reference."""
        self.engine = engine
        self.logger = LoggingConfig.get_logger(__name__)

    def _require_hybrid_search(self):
        """Return ``engine.hybrid_search``, raising if it is not available.

        Raises:
            RuntimeError: If ``engine.hybrid_search`` is falsy.
        """
        hybrid_search = self.engine.hybrid_search
        if not hybrid_search:
            raise RuntimeError("Search engine not initialized")
        return hybrid_search

    @staticmethod
    def _link_topics(link) -> list:
        """Return the link's focus topic followed by its related topics."""
        return [link.topic_focus, *link.related_topics]

    async def generate_topic_chain(
        self, query: str, strategy: str = "mixed_exploration", max_links: int = 5
    ) -> TopicSearchChain:
        """Generate a topic-driven search chain for progressive discovery.

        Args:
            query: Original search query
            strategy: Chain generation strategy (breadth_first, depth_first,
                relevance_ranked, mixed_exploration). A value that
                ``ChainStrategy`` rejects falls back to mixed exploration
                after logging a warning.
            max_links: Maximum number of chain links to generate

        Returns:
            TopicSearchChain with progressive exploration queries

        Raises:
            RuntimeError: If the engine's hybrid search is not initialized.
            Exception: Any error from the underlying generation call is
                logged and re-raised unchanged.
        """
        hybrid_search = self._require_hybrid_search()

        # Convert string strategy to enum
        try:
            chain_strategy = ChainStrategy(strategy)
        except ValueError:
            self.logger.warning(
                f"Unknown strategy '{strategy}', using mixed_exploration"
            )
            chain_strategy = ChainStrategy.MIXED_EXPLORATION

        self.logger.debug(
            "Generating topic search chain",
            query=query,
            strategy=strategy,
            max_links=max_links,
        )

        try:
            topic_chain = await hybrid_search.generate_topic_search_chain(
                query=query, strategy=chain_strategy, max_links=max_links
            )

            self.logger.info(
                "Topic chain generation completed",
                query=query,
                chain_length=len(topic_chain.chain_links),
                topics_covered=topic_chain.total_topics_covered,
                discovery_potential=f"{topic_chain.estimated_discovery_potential:.2f}",
            )

            return topic_chain
        except Exception as e:
            self.logger.error(
                "Topic chain generation failed", error=str(e), query=query
            )
            raise

    async def execute_topic_chain(
        self,
        topic_chain: TopicSearchChain,
        results_per_link: int = 3,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> dict[str, list[HybridSearchResult]]:
        """Execute searches for all links in a topic chain.

        Args:
            topic_chain: The topic search chain to execute
            results_per_link: Number of results per chain link
            source_types: Optional source type filters
            project_ids: Optional project ID filters

        Returns:
            Dictionary mapping queries to search results

        Raises:
            RuntimeError: If the engine's hybrid search is not initialized.
            Exception: Any error from the underlying search call is logged
                and re-raised unchanged.
        """
        hybrid_search = self._require_hybrid_search()

        self.logger.debug(
            "Executing topic chain search",
            original_query=topic_chain.original_query,
            chain_length=len(topic_chain.chain_links),
            results_per_link=results_per_link,
        )

        try:
            chain_results = await hybrid_search.execute_topic_chain_search(
                topic_chain=topic_chain,
                results_per_link=results_per_link,
                source_types=source_types,
                project_ids=project_ids,
            )

            total_results = sum(len(results) for results in chain_results.values())
            self.logger.info(
                "Topic chain execution completed",
                original_query=topic_chain.original_query,
                total_queries=len(chain_results),
                total_results=total_results,
            )

            return chain_results
        except Exception as e:
            self.logger.error("Topic chain execution failed", error=str(e))
            raise

    async def search_with_topic_chain(
        self,
        query: str,
        chain_strategy: str = "mixed_exploration",
        results_per_link: int = 3,
        max_links: int = 5,
        source_types: list[str] | None = None,
        project_ids: list[str] | None = None,
    ) -> TopicChainResult:
        """Perform search with full topic chain analysis.

        This combines topic chain generation and execution for complete
        progressive discovery workflow.

        Args:
            query: Original search query
            chain_strategy: Strategy for topic chain generation
            results_per_link: Results per chain link
            max_links: Maximum chain links
            source_types: Optional source type filters
            project_ids: Optional project ID filters

        Returns:
            TopicChainResult mapping each query to its results, with the
            grouped results and statistics attached as metadata.

        Raises:
            RuntimeError: If the engine's hybrid search is not initialized.
            Exception: Any error from generation or execution is logged and
                re-raised unchanged.
        """
        # Fail fast, before the workflow start is logged.
        self._require_hybrid_search()

        self.logger.info(
            "Starting topic chain search workflow",
            query=query,
            strategy=chain_strategy,
            max_links=max_links,
        )

        try:
            # Generate the topic chain
            topic_chain = await self.generate_topic_chain(
                query=query, strategy=chain_strategy, max_links=max_links
            )

            # Execute searches for each link
            chain_results = await self.execute_topic_chain(
                topic_chain=topic_chain,
                results_per_link=results_per_link,
                source_types=source_types,
                project_ids=project_ids,
            )

            # Organize results by exploration depth
            organized_results = self._organize_chain_results(topic_chain, chain_results)

            # Calculate exploration statistics
            stats = self._calculate_exploration_stats(topic_chain, chain_results)

            self.logger.info(
                "Topic chain search completed",
                query=query,
                total_results=sum(len(results) for results in chain_results.values()),
                topics_explored=topic_chain.total_topics_covered,
            )

            # Return structured result matching documented shape while preserving
            # backward compatibility by including raw chain results under a key
            return TopicChainResult(
                chain_results=chain_results,
                organized_results=organized_results,
                stats=stats,
            )

        except Exception as e:
            self.logger.error("Topic chain search failed", error=str(e), query=query)
            raise

    def _organize_chain_results(
        self, topic_chain: TopicSearchChain, chain_results: dict
    ) -> dict:
        """Organize chain results by exploration depth.

        Returns a dict keyed by each link's ``chain_position``. Every entry
        holds ``queries`` (per-query summaries), ``results`` (all results at
        that depth, concatenated) and ``total_results``. A depth entry is
        created for every link, but only links whose query is present in
        *chain_results* contribute to it.
        """
        organized: dict = {}

        # Defensive: handle None or empty chain_results
        if not chain_results:
            return organized

        for link in topic_chain.chain_links:
            bucket = organized.setdefault(
                link.chain_position,
                {"queries": [], "results": [], "total_results": 0},
            )

            results = chain_results.get(link.query)
            if results is None:
                # No search was recorded for this link's query.
                continue

            bucket["queries"].append(
                {
                    "query": link.query,
                    "topics": self._link_topics(link),
                    "relevance_score": link.relevance_score,
                    "result_count": len(results),
                }
            )
            bucket["results"].extend(results)
            bucket["total_results"] += len(results)

        return organized

    def _calculate_exploration_stats(
        self, topic_chain: TopicSearchChain, chain_results: dict
    ) -> dict:
        """Calculate exploration statistics.

        Returns a dict with the number of chain links, the number of distinct
        topics across all links, the count of links per ``chain_position``,
        the mean link relevance score and the mean number of results per
        query. Both averages are ``0`` when there is nothing to average;
        a ``None`` or empty *chain_results* is treated as "no results",
        matching ``_organize_chain_results``.
        """
        links = topic_chain.chain_links

        unique_topics: set = set()
        depth_distribution: dict = {}
        for link in links:
            unique_topics.update(self._link_topics(link))
            depth = link.chain_position
            depth_distribution[depth] = depth_distribution.get(depth, 0) + 1

        if links:
            average_relevance = sum(link.relevance_score for link in links) / len(links)
        else:
            average_relevance = 0

        # Guard before touching .values() so a None mapping does not crash.
        if chain_results:
            total_results = sum(len(results) for results in chain_results.values())
            results_per_query = total_results / len(chain_results)
        else:
            results_per_query = 0

        return {
            "total_chain_links": len(links),
            "unique_topics_discovered": len(unique_topics),
            "depth_distribution": depth_distribution,
            "average_relevance_score": average_relevance,
            "results_per_query_average": results_per_query,
        }