Coverage for src/qdrant_loader_mcp_server/mcp/formatters/structured.py: 49%

92 statements  

coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1""" 

2Structured Result Formatters - Complex Data Structure Formatting. 

3 

4This module handles the creation of complex, structured result formats 

5for MCP responses that require detailed organization and presentation. 

6""" 

7 

8from typing import Any 

9 

10# Backward-compatible import for HybridSearchResult across branches 

11try: # Prefer current location 

12 from ...search.components.search_result_models import ( 

13 HybridSearchResult, # type: ignore[assignment] 

14 ) 

15except Exception: # ImportError | ModuleNotFoundError 

16 # Fallback for older layout 

17 from ...search.components.models.hybrid import ( 

18 HybridSearchResult, # type: ignore[assignment] 

19 ) 

20from .utils import FormatterUtils 

21 

22 

class StructuredResultFormatters:
    """Handles structured result formatting operations."""

    @staticmethod
    def create_structured_search_results(
        results: list[HybridSearchResult],
        query: str = "",
        max_results: int = 20,
    ) -> list[dict[str, Any]]:
        """Create structured search results as a list of formatted results."""
        formatted_results: list[dict[str, Any]] = []
        for result in results[:max_results]:
            raw_text = getattr(result, "text", None)
            if raw_text is None:
                normalized_text = ""
            elif isinstance(raw_text, str):
                normalized_text = raw_text
            else:
                normalized_text = str(raw_text)

            formatted_results.append(
                {
                    "document_id": getattr(result, "document_id", ""),
                    "title": (
                        result.get_display_title()
                        if hasattr(result, "get_display_title")
                        else None
                    )
                    or getattr(result, "source_title", None)
                    or "Untitled",
                    "content": normalized_text,
                    "content_snippet": (
                        normalized_text[:300] + "..."
                        if len(normalized_text) > 300
                        else normalized_text
                    ),
                    "source_type": getattr(result, "source_type", "unknown"),
                    "source_url": getattr(result, "source_url", None),
                    "file_path": getattr(result, "file_path", None),
                    "score": getattr(result, "score", 0.0),
                    "created_at": getattr(result, "created_at", None),
                    "updated_at": getattr(result, "updated_at", None),
                    "metadata": {
                        "breadcrumb": getattr(result, "breadcrumb_text", None),
                        "hierarchy_context": getattr(result, "hierarchy_context", None),
                        "project_info": (
                            result.get_project_info()
                            if hasattr(result, "get_project_info")
                            else None
                        ),
                        "project_id": (
                            ""
                            if getattr(result, "project_id", None) is None
                            else str(getattr(result, "project_id", ""))
                        ),
                        "file_path": getattr(result, "file_path", None),
                        "word_count": getattr(result, "word_count", None),
                        "chunk_index": getattr(result, "chunk_index", None),
                        "total_chunks": getattr(result, "total_chunks", None),
                        "is_attachment": getattr(result, "is_attachment", False),
                        "depth": FormatterUtils.extract_synthetic_depth(result),
                        "has_children": FormatterUtils.extract_has_children(result),
                    },
                }
            )
        return formatted_results

    @staticmethod
    def create_structured_hierarchy_results(
        organized_results: dict[str, list[HybridSearchResult]],
        query: str = "",
        max_depth: int = 100,
    ) -> dict[str, Any]:
        """Create structured hierarchical results with full organization.

        Parameters:
            organized_results: Mapping of group name to list of results.
            query: Original query string.
            max_depth: Maximum tree depth to attach children. This prevents
                stack overflows on very deep or cyclic hierarchies. Root level
                starts at depth 1. Children beyond max_depth are not attached.
        """
        hierarchy_data = []

        for group_name, results in organized_results.items():
            # Build hierarchical structure
            root_documents = []
            child_map = {}

            for result in results:
                parent_id = FormatterUtils.extract_synthetic_parent_id(result)
                raw_text = getattr(result, "text", None)
                if raw_text is None:
                    normalized_text = ""
                elif isinstance(raw_text, str):
                    normalized_text = raw_text
                else:
                    normalized_text = str(raw_text)

                doc_data = {
                    "document_id": getattr(result, "document_id", ""),
                    "title": (
                        result.get_display_title()
                        if hasattr(result, "get_display_title")
                        else None
                    )
                    or getattr(result, "source_title", None)
                    or "Untitled",
                    "content_snippet": (
                        normalized_text[:200] + "..."
                        if len(normalized_text) > 200
                        else normalized_text
                    ),
                    "source_type": getattr(result, "source_type", "unknown"),
                    "score": getattr(result, "score", 0.0),
                    "depth": FormatterUtils.extract_synthetic_depth(result),
                    "parent_id": parent_id,
                    "has_children": FormatterUtils.extract_has_children(result),
                    "children": [],
                    "metadata": {
                        "breadcrumb": FormatterUtils.extract_synthetic_breadcrumb(
                            result
                        ),
                        "hierarchy_context": getattr(result, "hierarchy_context", None),
                        "file_path": getattr(result, "file_path", None),
                    },
                }

                if parent_id:
                    if parent_id not in child_map:
                        child_map[parent_id] = []
                    child_map[parent_id].append(doc_data)
                else:
                    root_documents.append(doc_data)

            # Attach children to parents using an explicit stack and depth cap
            # to avoid unbounded recursion in deep hierarchies
            def attach_children_iterative(
                root_docs: list[dict[str, Any]],
                depth_limit: int,
                child_lookup: dict[str, list[dict[str, Any]]],
            ) -> None:
                if depth_limit <= 0:
                    return
                stack: list[tuple[dict[str, Any], int]] = [
                    (doc, 1) for doc in root_docs
                ]
                # Track visited to avoid cycles
                visited: set[str] = set()
                while stack:
                    current_doc, current_depth = stack.pop()
                    doc_id = current_doc.get("document_id")
                    if not doc_id or doc_id in visited:
                        continue
                    visited.add(doc_id)
                    # Only attach children if within depth limit
                    if current_depth >= depth_limit:
                        continue
                    children = child_lookup.get(doc_id)
                    if children:
                        current_doc["children"] = children
                        for child in children:
                            stack.append((child, current_depth + 1))

            attach_children_iterative(root_documents, max_depth, child_map)

            hierarchy_data.append(
                {
                    "group_name": FormatterUtils.generate_clean_group_name(
                        group_name, results
                    ),
                    "documents": root_documents,
                    "total_documents": len(results),
                    "max_depth": max(
                        (FormatterUtils.extract_synthetic_depth(r) for r in results),
                        default=0,
                    ),
                }
            )

        return {
            "query": query,
            "hierarchy_groups": hierarchy_data,
            "total_groups": len(organized_results),
            "total_documents": sum(
                len(results) for results in organized_results.values()
            ),
        }

    @staticmethod
    def create_structured_attachment_results(
        filtered_results: list[HybridSearchResult],
        attachment_filter: dict[str, Any],
        include_metadata: bool = True,
    ) -> dict[str, Any]:
        """Create structured attachment results with detailed organization."""
        # Filter only attachment results
        attachment_results = [
            result
            for result in filtered_results
            if getattr(result, "is_attachment", False)
        ]

        # Group by file type
        organized_attachments = {}
        for result in attachment_results:
            file_type = FormatterUtils.extract_file_type_minimal(result)
            if file_type not in organized_attachments:
                organized_attachments[file_type] = []
            organized_attachments[file_type].append(result)

        attachment_data = []
        for file_type, results in organized_attachments.items():
            attachments = []

            for result in results:
                raw_text = getattr(result, "text", None)
                if raw_text is None:
                    normalized_text = ""
                elif isinstance(raw_text, str):
                    normalized_text = raw_text
                else:
                    normalized_text = str(raw_text)

                attachment_info = {
                    "document_id": getattr(result, "document_id", ""),
                    "filename": FormatterUtils.extract_safe_filename(result),
                    "file_type": FormatterUtils.extract_file_type_minimal(result),
                    "source_type": getattr(result, "source_type", "unknown"),
                    "score": getattr(result, "score", 0.0),
                    "content_snippet": (
                        normalized_text[:150] + "..."
                        if len(normalized_text) > 150
                        else normalized_text
                    ),
                }

                if include_metadata:
                    attachment_info["metadata"] = {
                        "original_filename": getattr(result, "original_filename", None),
                        "attachment_context": getattr(
                            result, "attachment_context", None
                        ),
                        "parent_document_title": getattr(
                            result, "parent_document_title", None
                        ),
                        "file_path": getattr(result, "file_path", None),
                        "source_url": getattr(result, "source_url", None),
                        "breadcrumb": getattr(result, "breadcrumb_text", None),
                    }

                attachments.append(attachment_info)

            attachment_data.append(
                {
                    "group_name": file_type,
                    "file_types": [file_type],
                    "attachments": attachments,
                    "total_attachments": len(attachments),
                    "metadata": (
                        {
                            "avg_score": (
                                sum(getattr(r, "score", 0) for r in results)
                                / len(results)
                                if results
                                else 0
                            ),
                            "source_types": list(
                                {getattr(r, "source_type", "unknown") for r in results}
                            ),
                        }
                        if include_metadata
                        else {}
                    ),
                }
            )

        def _normalized_text(r: Any) -> str:
            rt = getattr(r, "text", None)
            if rt is None:
                return ""
            if isinstance(rt, str):
                return rt
            return str(rt)

        return {
            "results": [
                {
                    "document_id": getattr(result, "document_id", ""),
                    "title": (
                        result.get_display_title()
                        if hasattr(result, "get_display_title")
                        else None
                    )
                    or getattr(result, "source_title", None)
                    or "Untitled",
                    "attachment_info": {
                        "filename": FormatterUtils.extract_safe_filename(result),
                        "file_type": FormatterUtils.extract_file_type_minimal(result),
                        "file_size": getattr(result, "file_size", None),
                    },
                    "source_type": getattr(result, "source_type", "unknown"),
                    "score": getattr(result, "score", 0.0),
                    "content_snippet": (
                        _normalized_text(result)[:150] + "..."
                        if len(_normalized_text(result)) > 150
                        else _normalized_text(result)
                    ),
                    "metadata": (
                        {
                            "file_path": getattr(result, "file_path", None),
                            "source_url": getattr(result, "source_url", None),
                            "breadcrumb": getattr(result, "breadcrumb_text", None),
                            "parent_document_title": getattr(
                                result, "parent_document_title", None
                            ),
                        }
                        if include_metadata
                        else {}
                    ),
                }
                for result in attachment_results
            ],
            "total_found": len(attachment_results),
            "attachment_summary": {
                "total_attachments": len(attachment_results),
                "file_types": list(organized_attachments.keys()),
                "groups_created": len(organized_attachments),
            },
            # Keep additional fields for backward compatibility
            "attachment_groups": attachment_data,
            "total_groups": len(organized_attachments),
            "total_attachments": len(attachment_results),
            "filter_criteria": attachment_filter,
            "metadata": (
                {
                    "all_file_types": list(organized_attachments.keys()),
                    "largest_group_size": max(
                        (len(results) for results in organized_attachments.values()),
                        default=0,
                    ),
                }
                if include_metadata
                else {}
            ),
        }
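
The sketch below is not part of the covered module; it is a minimal usage example for the two most common formatters in this file. It assumes the package is importable as qdrant_loader_mcp_server (the path in the coverage header) and that the FormatterUtils helpers tolerate stub objects lacking synthetic hierarchy metadata; the SimpleNamespace stand-ins and their field values are illustrative only.

# Illustrative usage sketch (assumptions: package importable, FormatterUtils
# helpers degrade gracefully on stub objects without hierarchy metadata).
from types import SimpleNamespace

from qdrant_loader_mcp_server.mcp.formatters.structured import (
    StructuredResultFormatters,
)

# Hypothetical stand-ins for HybridSearchResult; the field names mirror the
# attributes the formatters read via getattr().
parent = SimpleNamespace(
    document_id="doc-1",
    source_title="Architecture Overview",
    text="High-level description of the system...",
    source_type="confluence",
    score=0.91,
    breadcrumb_text="Space > Architecture",
    is_attachment=False,
)
child = SimpleNamespace(
    document_id="doc-2",
    source_title="Component Details",
    text="Details for one component...",
    source_type="confluence",
    score=0.84,
    breadcrumb_text="Space > Architecture > Components",
    is_attachment=False,
)

# Flat, per-result formatting (first method in the module).
flat = StructuredResultFormatters.create_structured_search_results(
    [parent, child], query="architecture", max_results=10
)
print(flat[0]["title"], flat[0]["content_snippet"][:40])

# Grouped hierarchy formatting; parent/child linking depends on the synthetic
# parent ids that FormatterUtils extracts from each result.
tree = StructuredResultFormatters.create_structured_hierarchy_results(
    {"Architecture docs": [parent, child]}, query="architecture", max_depth=5
)
print(tree["total_groups"], tree["total_documents"])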