Coverage for src/qdrant_loader_mcp_server/mcp/formatters/utils.py: 63%

226 statements  

coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1""" 

2Formatter Utilities - Shared Helper Functions. 

3 

4This module contains utility functions and helpers used across 

5different formatter modules for common operations like field extraction, 

6group generation, and data processing. 

7""" 

8 

9from typing import Any 

10 

11from ...search.components.search_result_models import HybridSearchResult 

12 

13 

class FormatterUtils:
    """Shared utility functions for formatters."""

    @staticmethod
    def extract_minimal_doc_fields(
        result: HybridSearchResult, include_content: bool = False
    ) -> dict[str, Any]:
        """Extract minimal document fields for lightweight responses."""
        minimal = {
            "document_id": getattr(result, "document_id", ""),
            "title": getattr(result, "source_title", "Untitled"),
            "source_type": getattr(result, "source_type", "unknown"),
            "score": getattr(result, "score", 0.0),
        }

        if include_content:
            content = getattr(result, "text", "")
            minimal["snippet"] = (
                content[:200] + "..." if len(content) > 200 else content
            )

        # Add optional fields if available
        optional_fields = ["source_url", "file_path", "breadcrumb_text"]
        for field in optional_fields:
            value = getattr(result, field, None)
            if value:
                minimal[field] = value

        return minimal
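
    # Illustrative sketch (values are assumed, not taken from the source): given
    # a result object exposing the attributes read above,
    #
    #   >>> FormatterUtils.extract_minimal_doc_fields(result)
    #   {'document_id': 'd1', 'title': 'Setup Guide', 'source_type': 'confluence',
    #    'score': 0.87, 'source_url': 'https://example.org/setup'}
    #
    # Optional fields such as "source_url" are included whenever they are set,
    # while the "snippet" key appears only when include_content=True.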

    @staticmethod
    def extract_conflicting_statements(conflict_info: dict) -> list[dict[str, Any]]:
        """Extract conflicting statements from conflict information."""
        statements = []

        # Extract from structured indicators
        structured_indicators = conflict_info.get("structured_indicators", [])
        for indicator in structured_indicators:
            if (
                isinstance(indicator, dict)
                and "doc1_snippet" in indicator
                and "doc2_snippet" in indicator
            ):
                statements.append(
                    {
                        "document_1_statement": indicator["doc1_snippet"],
                        "document_2_statement": indicator["doc2_snippet"],
                        "context": indicator.get("context", ""),
                    }
                )

        # Fallback to basic conflict description if no structured indicators
        if not statements and "description" in conflict_info:
            statements.append(
                {
                    "document_1_statement": conflict_info.get("description", ""),
                    "document_2_statement": "",
                    "context": "General conflict detected",
                }
            )

        return statements
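
    # Illustrative sketch (assumed input, not from the source): a conflict_info
    # payload shaped like
    #
    #   {"structured_indicators": [{"doc1_snippet": "Use API v1",
    #                               "doc2_snippet": "Use API v2",
    #                               "context": "auth guide"}]}
    #
    # would yield one entry with document_1_statement "Use API v1",
    # document_2_statement "Use API v2", and context "auth guide".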

    @staticmethod
    def generate_clean_group_name(group_key: str, results: list) -> str:
        """Generate clear, short group names."""
        # Remove chunk/content prefixes from group names
        if group_key.startswith("Exists, limited clarity"):
            return "Technical Documentation"
        if group_key.startswith("Immediately begin compiling"):
            return "Product Management"
        if group_key.startswith("Purpose and Scope"):
            return "Project Overview"

        # Use first meaningful part of breadcrumb
        if " > " in group_key:
            return group_key.split(" > ")[0]

        # Truncate long names and add context
        if len(group_key) > 50:
            source_type = (
                getattr(results[0], "source_type", "unknown") if results else "unknown"
            )
            return f"{group_key[:47]}... ({source_type.title()})"

        return group_key

    @staticmethod
    def get_group_key(result) -> str:
        """Generate a stable group key for hierarchy organization."""
        # Try synthetic breadcrumb first
        synthetic_breadcrumb = FormatterUtils.extract_synthetic_breadcrumb(result)
        if synthetic_breadcrumb:
            if getattr(result, "source_type", None) == "confluence":
                return synthetic_breadcrumb
            elif getattr(result, "source_type", None) == "localfile":
                # Use root folder from breadcrumb
                return (
                    synthetic_breadcrumb.split(" > ")[0]
                    if " > " in synthetic_breadcrumb
                    else synthetic_breadcrumb
                )

        # Fallback to file path for localfiles
        if getattr(result, "source_type", None) == "localfile" and getattr(
            result, "file_path", None
        ):
            path_parts = [p for p in result.file_path.split("/") if p and p != "."]
            return path_parts[0] if path_parts else "Root"

        # Fallback to title
        return getattr(result, "source_title", None) or "Uncategorized"
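
    # Illustrative sketch (assumed values, not from the source): a confluence
    # result whose breadcrumb is "Space > Parent > Page" groups under the full
    # breadcrumb, while a localfile result with file_path "./docs/guides/setup.md"
    # groups under its root folder:
    #
    #   >>> FormatterUtils.get_group_key(local_result)
    #   'docs'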

    @staticmethod
    def count_siblings(result, all_results: list) -> int:
        """Count sibling documents at the same hierarchy level."""
        if not all_results:
            return 0

        # Get the parent of the current result
        parent_id = FormatterUtils.extract_synthetic_parent_id(result)
        if not parent_id:
            # If no parent, count documents at root level
            siblings = [
                r
                for r in all_results
                if not FormatterUtils.extract_synthetic_parent_id(r)
            ]
            return len(siblings)

        # Count documents with the same parent
        siblings = [
            r
            for r in all_results
            if FormatterUtils.extract_synthetic_parent_id(r) == parent_id
        ]
        return len(siblings)

    @staticmethod
    def extract_synthetic_depth(result) -> int:
        """Extract synthetic hierarchy depth from breadcrumb or file path."""
        # First try breadcrumb
        breadcrumb = FormatterUtils.extract_synthetic_breadcrumb(result)
        if breadcrumb and " > " in breadcrumb:
            return len(breadcrumb.split(" > "))

        # Try file path for local files
        if getattr(result, "source_type", None) == "localfile" and getattr(
            result, "file_path", None
        ):
            # Remove leading ./ and count path segments
            clean_path = result.file_path.lstrip("./")
            path_parts = [p for p in clean_path.split("/") if p]
            return len(path_parts)

        return 1  # Default depth

    @staticmethod
    def extract_synthetic_parent_id(result) -> str | None:
        """Extract synthetic parent ID from hierarchy information."""
        breadcrumb = FormatterUtils.extract_synthetic_breadcrumb(result)
        if breadcrumb and " > " in breadcrumb:
            # Parent is the second-to-last element in breadcrumb
            parts = breadcrumb.split(" > ")
            if len(parts) > 1:
                return parts[-2]  # Second to last is the parent

        # For file paths, parent is the directory
        if getattr(result, "source_type", None) == "localfile" and getattr(
            result, "file_path", None
        ):
            path_parts = [p for p in result.file_path.split("/") if p and p != "."]
            if len(path_parts) > 1:
                return path_parts[-2]  # Parent directory

        return None

    @staticmethod
    def extract_synthetic_parent_title(result) -> str | None:
        """Extract synthetic parent title from hierarchy information."""
        parent_id = FormatterUtils.extract_synthetic_parent_id(result)
        return parent_id  # In most cases, parent ID is the title

    @staticmethod
    def extract_synthetic_breadcrumb(result) -> str | None:
        """Extract synthetic breadcrumb from result metadata."""
        # First try the breadcrumb_text field
        if hasattr(result, "breadcrumb_text") and result.breadcrumb_text:
            return result.breadcrumb_text

        # Try to construct from file path
        if getattr(result, "source_type", None) == "localfile" and getattr(
            result, "file_path", None
        ):
            path_parts = [p for p in result.file_path.split("/") if p and p != "."]
            if path_parts:
                # Include filename without extension for local files
                filename = path_parts[-1]
                if "." in filename:
                    filename = filename.rsplit(".", 1)[0]
                path_parts[-1] = filename
                return " > ".join(path_parts)

        # Try hierarchy context for Confluence
        if hasattr(result, "hierarchy_context") and result.hierarchy_context:
            return result.hierarchy_context

        return None
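
    # Illustrative sketch (assumed path, not from the source): a localfile result
    # with file_path "./docs/guides/setup.md" and no breadcrumb_text resolves to
    # the breadcrumb "docs > guides > setup", a synthetic depth of 3, and a
    # synthetic parent ID of "guides" via the helpers above.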

    @staticmethod
    def extract_has_children(result) -> bool:
        """Extract whether the result has child documents."""
        # Check if result has a has_children method
        if hasattr(result, "has_children") and callable(result.has_children):
            return result.has_children()

        # Check for children_count attribute
        children_count = getattr(result, "children_count", 0)
        return children_count > 0

    @staticmethod
    def extract_children_count(result, all_results: list) -> int:
        """Extract children count for a result."""
        # First check if the result has a children_count attribute
        if hasattr(result, "children_count"):
            return getattr(result, "children_count", 0)

        # Calculate based on hierarchy if all_results provided
        if all_results:
            result_id = getattr(result, "document_id", None) or getattr(
                result, "source_title", ""
            )
            children = [
                r
                for r in all_results
                if FormatterUtils.extract_synthetic_parent_id(r) == result_id
            ]
            return len(children)

        return 0

    @staticmethod
    def extract_safe_filename(result: HybridSearchResult) -> str:
        """Extract a safe filename from result, handling various edge cases."""
        # First try original_filename (prioritize this for all results)
        original_filename = getattr(result, "original_filename", None)
        if original_filename:
            return original_filename

        # Then try file_path
        file_path = getattr(result, "file_path", None)
        if file_path:
            # Extract filename from path
            filename = file_path.split("/")[-1] if "/" in file_path else file_path
            return filename if filename else "Unknown"

        # Fallback to source_title
        return getattr(result, "source_title", "Unknown")

    @staticmethod
    def extract_file_type_minimal(result: HybridSearchResult) -> str:
        """Extract minimal file type information from result."""
        # First try mime_type for more detailed type information
        mime_type = getattr(result, "mime_type", None)
        if mime_type:
            # Convert common mime types to readable format
            if mime_type == "text/markdown":
                return "markdown"
            elif mime_type.startswith("text/"):
                return mime_type.replace("text/", "")
            elif mime_type.startswith("application/pdf"):
                return "pdf"
            elif mime_type.startswith("application/"):
                return mime_type.replace("application/", "")
            elif "/" in mime_type:
                return mime_type.split("/")[-1]

        # Fallback to filename extension
        filename = FormatterUtils.extract_safe_filename(result)
        if "." in filename:
            extension = filename.split(".")[-1].lower()
            return extension

        # Final fallback based on source_type
        source_type = getattr(result, "source_type", "")
        if source_type == "confluence":
            return "page"
        elif source_type == "jira":
            return "ticket"
        else:
            return "unknown"
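
    # Illustrative sketch (assumed inputs): a mime_type of "text/markdown" maps
    # to "markdown" and "application/pdf" to "pdf"; with no mime_type, a
    # filename such as "report.XLSX" falls back to its lowercased extension
    # ("xlsx"), and a jira result with no usable filename falls back to "ticket".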

    @staticmethod
    def organize_attachments_by_type(results: list[HybridSearchResult]) -> list[dict]:
        """Organize attachment results by file type for better presentation."""
        # Group by file type
        type_groups = {}
        for result in results:
            file_type = FormatterUtils.extract_file_type_minimal(result)
            source_type = getattr(result, "source_type", "unknown")

            group_key = FormatterUtils.get_attachment_group_key(file_type, source_type)

            if group_key not in type_groups:
                type_groups[group_key] = []
            type_groups[group_key].append(result)

        # Convert to list format with friendly names
        organized_groups = []
        for group_key, group_results in type_groups.items():
            organized_groups.append(
                {
                    "group_name": FormatterUtils.generate_friendly_group_name(
                        group_key
                    ),
                    "results": group_results,
                    "count": len(group_results),
                    "file_types": list(
                        {
                            FormatterUtils.extract_file_type_minimal(r)
                            for r in group_results
                        }
                    ),
                }
            )

        # Sort by count (descending)
        organized_groups.sort(key=lambda x: x["count"], reverse=True)
        return organized_groups

    @staticmethod
    def get_attachment_group_key(file_type: str, source_type: str) -> str:
        """Generate logical grouping keys for attachments."""
        # Map to broader categories for better UX
        document_types = {"pdf", "doc", "docx", "txt", "md"}
        spreadsheet_types = {"xls", "xlsx", "csv"}
        image_types = {"png", "jpg", "jpeg", "gif", "svg"}

        if file_type in document_types:
            return f"documents_{source_type}"
        elif file_type in spreadsheet_types:
            return f"spreadsheets_{source_type}"
        elif file_type in image_types:
            return f"images_{source_type}"
        else:
            return f"other_{source_type}"

    @staticmethod
    def generate_friendly_group_name(group_key: str) -> str:
        """Generate user-friendly group names."""
        # Parse the group key format: "type_source"
        if "_" in group_key:
            file_category, source_type = group_key.split("_", 1)

            # Capitalize and format
            category_map = {
                "documents": "Documents",
                "spreadsheets": "Spreadsheets",
                "images": "Images",
                "other": "Other Files",
            }

            source_map = {
                "confluence": "Confluence",
                "localfile": "Local Files",
                "git": "Git Repository",
                "jira": "Jira",
            }

            category = category_map.get(file_category, file_category.title())
            source = source_map.get(source_type, source_type.title())

            return f"{category} ({source})"

        return group_key.title()
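
    # Illustrative sketch (assumed inputs) of the two helpers above working
    # together:
    #
    #   >>> FormatterUtils.get_attachment_group_key("pdf", "confluence")
    #   'documents_confluence'
    #   >>> FormatterUtils.generate_friendly_group_name("documents_confluence")
    #   'Documents (Confluence)'
    #   >>> FormatterUtils.generate_friendly_group_name("images_git")
    #   'Images (Git Repository)'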

    @staticmethod
    def generate_conflict_resolution_suggestion(conflict_info: dict) -> str:
        """Generate a resolution suggestion based on conflict type and information."""
        conflict_type = conflict_info.get("type", "unknown")

        if conflict_type == "version_conflict":
            return "Review documents for version consistency and update outdated information"
        elif conflict_type == "contradictory_guidance":
            return "Reconcile contradictory guidance by consulting authoritative sources or stakeholders"
        elif conflict_type == "procedural_conflict":
            return "Establish a single, authoritative procedure and deprecate conflicting processes"
        elif conflict_type == "requirement_conflict":
            return "Clarify requirements with stakeholders and update documentation to resolve ambiguity"
        elif conflict_type == "implementation_conflict":
            return "Review implementation approaches and standardize on the preferred solution"
        else:
            return (
                "Review conflicting information and establish a single source of truth"
            )

    @staticmethod
    def extract_affected_sections(conflict_info: dict) -> list:
        """Extract affected sections from conflict information."""
        affected_sections = []

        # Try to identify sections from structured indicators
        structured_indicators = conflict_info.get("structured_indicators", [])
        for indicator in structured_indicators:
            if isinstance(indicator, dict):
                # Look for section keywords in the snippets
                doc1_snippet = indicator.get("doc1_snippet", "")
                doc2_snippet = indicator.get("doc2_snippet", "")

                sections = set()
                for snippet in [doc1_snippet, doc2_snippet]:
                    # Common section patterns
                    if "introduction" in snippet.lower():
                        sections.add("Introduction")
                    elif "requirement" in snippet.lower():
                        sections.add("Requirements")
                    elif "procedure" in snippet.lower() or "process" in snippet.lower():
                        sections.add("Procedures")
                    elif "implementation" in snippet.lower():
                        sections.add("Implementation")
                    elif (
                        "configuration" in snippet.lower()
                        or "config" in snippet.lower()
                    ):
                        sections.add("Configuration")
                    elif "guideline" in snippet.lower() or "guide" in snippet.lower():
                        sections.add("Guidelines")

                affected_sections.extend(list(sections))

        # Remove duplicates and return
        return list(set(affected_sections)) if affected_sections else ["Content"]
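

# Usage sketch (illustration only, not part of the original module). The
# SimpleNamespace objects below are hypothetical stand-ins for HybridSearchResult
# instances; any object exposing the attributes the helpers read (file_path,
# source_type, ...) should behave the same way.
if __name__ == "__main__":
    from types import SimpleNamespace

    fake_results = [
        SimpleNamespace(file_path="reports/budget.xlsx", source_type="confluence"),
        SimpleNamespace(file_path="docs/setup.pdf", source_type="localfile"),
    ]
    for group in FormatterUtils.organize_attachments_by_type(fake_results):
        # Expected groups: "Spreadsheets (Confluence)" and "Documents (Local Files)",
        # each with a count of 1.
        print(group["group_name"], group["count"], group["file_types"])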