Coverage for src/qdrant_loader_mcp_server/mcp/formatters/utils.py: 63%
226 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1"""
2Formatter Utilities - Shared Helper Functions.
4This module contains utility functions and helpers used across
5different formatter modules for common operations like field extraction,
6group generation, and data processing.
7"""
9from typing import Any
11from ...search.components.search_result_models import HybridSearchResult
class FormatterUtils:
    """Shared utility functions for formatters.

    All helpers are static and read attributes defensively via
    ``getattr``/``hasattr``, so they tolerate partially populated
    search-result objects.
    """

    @staticmethod
    def extract_minimal_doc_fields(
        result: HybridSearchResult, include_content: bool = False
    ) -> dict[str, Any]:
        """Extract minimal document fields for lightweight responses.

        Args:
            result: Search result to summarize.
            include_content: When True, add a ``snippet`` of up to 200
                characters of the document text.

        Returns:
            Dict with core fields (id, title, source type, score) plus any
            of ``source_url``, ``file_path`` and ``breadcrumb_text`` that
            are present and truthy.
        """
        minimal = {
            "document_id": getattr(result, "document_id", ""),
            "title": getattr(result, "source_title", "Untitled"),
            "source_type": getattr(result, "source_type", "unknown"),
            "score": getattr(result, "score", 0.0),
        }

        if include_content:
            # ``or ""`` guards against an explicit None text attribute,
            # which previously crashed on len(None).
            content = getattr(result, "text", "") or ""
            minimal["snippet"] = (
                content[:200] + "..." if len(content) > 200 else content
            )

        # Add optional fields only when present and non-empty
        optional_fields = ["source_url", "file_path", "breadcrumb_text"]
        for field in optional_fields:
            value = getattr(result, field, None)
            if value:
                minimal[field] = value

        return minimal

    @staticmethod
    def extract_conflicting_statements(conflict_info: dict) -> list[dict[str, Any]]:
        """Extract conflicting statement pairs from conflict information.

        Reads ``structured_indicators`` entries that contain both
        ``doc1_snippet`` and ``doc2_snippet``; falls back to the plain
        ``description`` field when no structured pair is found.
        """
        statements = []

        # Extract from structured indicators
        structured_indicators = conflict_info.get("structured_indicators", [])
        for indicator in structured_indicators:
            if (
                isinstance(indicator, dict)
                and "doc1_snippet" in indicator
                and "doc2_snippet" in indicator
            ):
                statements.append(
                    {
                        "document_1_statement": indicator["doc1_snippet"],
                        "document_2_statement": indicator["doc2_snippet"],
                        "context": indicator.get("context", ""),
                    }
                )

        # Fallback to basic conflict description if no structured indicators
        if not statements and "description" in conflict_info:
            statements.append(
                {
                    "document_1_statement": conflict_info.get("description", ""),
                    "document_2_statement": "",
                    "context": "General conflict detected",
                }
            )

        return statements

    @staticmethod
    def generate_clean_group_name(group_key: str, results: list) -> str:
        """Generate a clear, short group name from a raw group key.

        Known noisy chunk/content prefixes map to friendly names,
        breadcrumbs collapse to their first segment, and keys longer than
        50 characters are truncated with the source type appended.
        """
        # Map known chunk/content prefixes to friendly names
        if group_key.startswith("Exists, limited clarity"):
            return "Technical Documentation"
        if group_key.startswith("Immediately begin compiling"):
            return "Product Management"
        if group_key.startswith("Purpose and Scope"):
            return "Project Overview"

        # Use first meaningful part of breadcrumb
        if " > " in group_key:
            return group_key.split(" > ")[0]

        # Truncate long names and add source-type context
        if len(group_key) > 50:
            source_type = (
                getattr(results[0], "source_type", "unknown") if results else "unknown"
            )
            return f"{group_key[:47]}... ({source_type.title()})"

        return group_key

    @staticmethod
    def get_group_key(result) -> str:
        """Generate a stable group key for hierarchy organization.

        Preference order: synthetic breadcrumb (full breadcrumb for
        Confluence, root folder for local files), then the root of the
        file path for local files, then the document title.
        """
        # Try synthetic breadcrumb first
        synthetic_breadcrumb = FormatterUtils.extract_synthetic_breadcrumb(result)
        if synthetic_breadcrumb:
            if getattr(result, "source_type", None) == "confluence":
                return synthetic_breadcrumb
            elif getattr(result, "source_type", None) == "localfile":
                # Use root folder from breadcrumb
                return (
                    synthetic_breadcrumb.split(" > ")[0]
                    if " > " in synthetic_breadcrumb
                    else synthetic_breadcrumb
                )

        # Fallback to file path for localfiles
        if getattr(result, "source_type", None) == "localfile" and getattr(
            result, "file_path", None
        ):
            path_parts = [p for p in result.file_path.split("/") if p and p != "."]
            return path_parts[0] if path_parts else "Root"

        # Fallback to title
        return getattr(result, "source_title", None) or "Uncategorized"

    @staticmethod
    def count_siblings(result, all_results: list) -> int:
        """Count documents at the same hierarchy level as ``result``.

        Documents without a synthetic parent are considered root-level
        siblings. The count includes ``result`` itself when it appears in
        ``all_results``.
        """
        if not all_results:
            return 0

        parent_id = FormatterUtils.extract_synthetic_parent_id(result)
        if not parent_id:
            # No parent: count documents at the root level
            return sum(
                1
                for r in all_results
                if not FormatterUtils.extract_synthetic_parent_id(r)
            )

        # Count documents sharing the same parent
        return sum(
            1
            for r in all_results
            if FormatterUtils.extract_synthetic_parent_id(r) == parent_id
        )

    @staticmethod
    def extract_synthetic_depth(result) -> int:
        """Extract the hierarchy depth from breadcrumb or file path.

        Returns the number of breadcrumb segments for a multi-segment
        breadcrumb, the number of path segments for local files,
        otherwise a default depth of 1.
        """
        # First try breadcrumb
        breadcrumb = FormatterUtils.extract_synthetic_breadcrumb(result)
        if breadcrumb and " > " in breadcrumb:
            return len(breadcrumb.split(" > "))

        # Try file path for local files
        if getattr(result, "source_type", None) == "localfile" and getattr(
            result, "file_path", None
        ):
            # Fix: use removeprefix("./") — the previous lstrip("./")
            # strips *any* run of leading '.'/'/' characters, which would
            # mangle hidden paths such as ".config/...".
            clean_path = result.file_path.removeprefix("./")
            path_parts = [p for p in clean_path.split("/") if p]
            return len(path_parts)

        return 1  # Default depth

    @staticmethod
    def extract_synthetic_parent_id(result) -> str | None:
        """Extract a synthetic parent ID from hierarchy information.

        The parent is the second-to-last breadcrumb segment, or the
        parent directory for local files; None when no parent exists.
        """
        breadcrumb = FormatterUtils.extract_synthetic_breadcrumb(result)
        if breadcrumb and " > " in breadcrumb:
            # Parent is the second-to-last element in the breadcrumb
            parts = breadcrumb.split(" > ")
            if len(parts) > 1:
                return parts[-2]

        # For file paths, parent is the containing directory
        if getattr(result, "source_type", None) == "localfile" and getattr(
            result, "file_path", None
        ):
            path_parts = [p for p in result.file_path.split("/") if p and p != "."]
            if len(path_parts) > 1:
                return path_parts[-2]

        return None

    @staticmethod
    def extract_synthetic_parent_title(result) -> str | None:
        """Extract a synthetic parent title from hierarchy information.

        The parent ID doubles as the title here, since breadcrumb and
        path segments are human-readable names.
        """
        parent_id = FormatterUtils.extract_synthetic_parent_id(result)
        return parent_id

    @staticmethod
    def extract_synthetic_breadcrumb(result) -> str | None:
        """Extract a synthetic breadcrumb from result metadata.

        Preference order: explicit ``breadcrumb_text``, a breadcrumb
        built from the file path (local files, extension stripped from
        the final segment), then ``hierarchy_context`` (Confluence).
        """
        # First try the breadcrumb_text field
        if hasattr(result, "breadcrumb_text") and result.breadcrumb_text:
            return result.breadcrumb_text

        # Try to construct from file path
        if getattr(result, "source_type", None) == "localfile" and getattr(
            result, "file_path", None
        ):
            path_parts = [p for p in result.file_path.split("/") if p and p != "."]
            if path_parts:
                # Include filename without extension for local files
                filename = path_parts[-1]
                if "." in filename:
                    filename = filename.rsplit(".", 1)[0]
                path_parts[-1] = filename
                return " > ".join(path_parts)

        # Try hierarchy context for Confluence
        if hasattr(result, "hierarchy_context") and result.hierarchy_context:
            return result.hierarchy_context

        return None

    @staticmethod
    def extract_has_children(result) -> bool:
        """Return whether the result has child documents.

        Prefers a callable ``has_children`` on the result; otherwise
        falls back to a positive ``children_count`` attribute.
        """
        if hasattr(result, "has_children") and callable(result.has_children):
            return result.has_children()

        children_count = getattr(result, "children_count", 0)
        return children_count > 0

    @staticmethod
    def extract_children_count(result, all_results: list) -> int:
        """Extract the number of child documents for ``result``.

        Uses an explicit ``children_count`` attribute when present;
        otherwise counts results in ``all_results`` whose synthetic
        parent matches this result's id (or title as a fallback id).
        """
        if hasattr(result, "children_count"):
            return getattr(result, "children_count", 0)

        if all_results:
            result_id = getattr(result, "document_id", None) or getattr(
                result, "source_title", ""
            )
            children = [
                r
                for r in all_results
                if FormatterUtils.extract_synthetic_parent_id(r) == result_id
            ]
            return len(children)

        return 0

    @staticmethod
    def extract_safe_filename(result: HybridSearchResult) -> str:
        """Extract a safe filename from a result, handling edge cases.

        Preference order: ``original_filename``, the final component of
        ``file_path``, then ``source_title``; "Unknown" as last resort.
        """
        # Prioritize original_filename for all results
        original_filename = getattr(result, "original_filename", None)
        if original_filename:
            return original_filename

        # Then try file_path
        file_path = getattr(result, "file_path", None)
        if file_path:
            # Extract the last path component as the filename
            filename = file_path.split("/")[-1] if "/" in file_path else file_path
            return filename if filename else "Unknown"

        # Fallback to source_title
        return getattr(result, "source_title", "Unknown")

    @staticmethod
    def extract_file_type_minimal(result: HybridSearchResult) -> str:
        """Extract a short, readable file type for a result.

        Preference order: mapped MIME type, filename extension, then a
        source-type-based fallback ("page"/"ticket"/"unknown").
        """
        # First try mime_type for more detailed type information
        mime_type = getattr(result, "mime_type", None)
        if mime_type:
            # Convert common MIME types to a readable short form; a MIME
            # type with no "/" falls through to the filename fallback.
            if mime_type == "text/markdown":
                return "markdown"
            elif mime_type.startswith("text/"):
                return mime_type.replace("text/", "")
            elif mime_type.startswith("application/pdf"):
                return "pdf"
            elif mime_type.startswith("application/"):
                return mime_type.replace("application/", "")
            elif "/" in mime_type:
                return mime_type.split("/")[-1]

        # Fallback to filename extension
        filename = FormatterUtils.extract_safe_filename(result)
        if "." in filename:
            extension = filename.split(".")[-1].lower()
            return extension

        # Final fallback based on source_type
        source_type = getattr(result, "source_type", "")
        if source_type == "confluence":
            return "page"
        elif source_type == "jira":
            return "ticket"
        else:
            return "unknown"

    @staticmethod
    def organize_attachments_by_type(results: list[HybridSearchResult]) -> list[dict]:
        """Organize attachment results by file type for presentation.

        Groups results by (file category, source type), labels each group
        with a friendly name, and sorts groups by size descending.
        """
        # Group by (file category, source type) key
        type_groups: dict[str, list] = {}
        for result in results:
            file_type = FormatterUtils.extract_file_type_minimal(result)
            source_type = getattr(result, "source_type", "unknown")

            group_key = FormatterUtils.get_attachment_group_key(file_type, source_type)

            # setdefault replaces the manual key-presence check
            type_groups.setdefault(group_key, []).append(result)

        # Convert to list format with friendly names
        organized_groups = []
        for group_key, group_results in type_groups.items():
            organized_groups.append(
                {
                    "group_name": FormatterUtils.generate_friendly_group_name(
                        group_key
                    ),
                    "results": group_results,
                    "count": len(group_results),
                    "file_types": list(
                        {
                            FormatterUtils.extract_file_type_minimal(r)
                            for r in group_results
                        }
                    ),
                }
            )

        # Sort by count (descending)
        organized_groups.sort(key=lambda x: x["count"], reverse=True)
        return organized_groups

    @staticmethod
    def get_attachment_group_key(file_type: str, source_type: str) -> str:
        """Generate a logical grouping key for an attachment.

        Maps specific file types to broader categories (documents,
        spreadsheets, images, other) combined with the source type.
        """
        # Map to broader categories for better UX
        document_types = {"pdf", "doc", "docx", "txt", "md"}
        spreadsheet_types = {"xls", "xlsx", "csv"}
        image_types = {"png", "jpg", "jpeg", "gif", "svg"}

        if file_type in document_types:
            return f"documents_{source_type}"
        elif file_type in spreadsheet_types:
            return f"spreadsheets_{source_type}"
        elif file_type in image_types:
            return f"images_{source_type}"
        else:
            return f"other_{source_type}"

    @staticmethod
    def generate_friendly_group_name(group_key: str) -> str:
        """Generate a user-friendly group name from a "category_source" key.

        Keys without an underscore are simply title-cased.
        """
        # Parse the group key format: "type_source"
        if "_" in group_key:
            file_category, source_type = group_key.split("_", 1)

            # Map known categories/sources; title-case unknown ones
            category_map = {
                "documents": "Documents",
                "spreadsheets": "Spreadsheets",
                "images": "Images",
                "other": "Other Files",
            }

            source_map = {
                "confluence": "Confluence",
                "localfile": "Local Files",
                "git": "Git Repository",
                "jira": "Jira",
            }

            category = category_map.get(file_category, file_category.title())
            source = source_map.get(source_type, source_type.title())

            return f"{category} ({source})"

        return group_key.title()

    @staticmethod
    def generate_conflict_resolution_suggestion(conflict_info: dict) -> str:
        """Generate a resolution suggestion based on the conflict type.

        Unknown conflict types get a generic single-source-of-truth
        suggestion.
        """
        conflict_type = conflict_info.get("type", "unknown")

        if conflict_type == "version_conflict":
            return "Review documents for version consistency and update outdated information"
        elif conflict_type == "contradictory_guidance":
            return "Reconcile contradictory guidance by consulting authoritative sources or stakeholders"
        elif conflict_type == "procedural_conflict":
            return "Establish a single, authoritative procedure and deprecate conflicting processes"
        elif conflict_type == "requirement_conflict":
            return "Clarify requirements with stakeholders and update documentation to resolve ambiguity"
        elif conflict_type == "implementation_conflict":
            return "Review implementation approaches and standardize on the preferred solution"
        else:
            return (
                "Review conflicting information and establish a single source of truth"
            )

    @staticmethod
    def extract_affected_sections(conflict_info: dict) -> list:
        """Extract affected document sections from conflict information.

        Scans the snippets of each structured indicator for common
        section keywords; returns ["Content"] when nothing matches.
        """
        affected_sections = []

        # Try to identify sections from structured indicators
        structured_indicators = conflict_info.get("structured_indicators", [])
        for indicator in structured_indicators:
            if isinstance(indicator, dict):
                # Look for section keywords in the snippets
                doc1_snippet = indicator.get("doc1_snippet", "")
                doc2_snippet = indicator.get("doc2_snippet", "")

                sections = set()
                for snippet in [doc1_snippet, doc2_snippet]:
                    # Common section patterns (first match wins per snippet)
                    if "introduction" in snippet.lower():
                        sections.add("Introduction")
                    elif "requirement" in snippet.lower():
                        sections.add("Requirements")
                    elif "procedure" in snippet.lower() or "process" in snippet.lower():
                        sections.add("Procedures")
                    elif "implementation" in snippet.lower():
                        sections.add("Implementation")
                    elif (
                        "configuration" in snippet.lower()
                        or "config" in snippet.lower()
                    ):
                        sections.add("Configuration")
                    elif "guideline" in snippet.lower() or "guide" in snippet.lower():
                        sections.add("Guidelines")

                affected_sections.extend(list(sections))

        # Remove duplicates and return
        return list(set(affected_sections)) if affected_sections else ["Content"]