Coverage for src/qdrant_loader_mcp_server/search/enhanced/cdi/conflict_resolution.py: 61%
54 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1from __future__ import annotations
3from typing import Any
def describe_conflict(_detector: Any, indicators: list) -> str:
    """Build a one-line, human-readable summary of conflict indicators.

    Only the first three indicators are considered. Each one contributes a
    fragment, and the fragments are joined with ``"; "``.

    Args:
        _detector: Not used by this function.
        indicators: Conflict indicators; each may be a dict carrying a
            ``"type"`` key, a plain string, or any other object.

    Returns:
        The joined summary, or a fixed message when ``indicators`` is falsy.
    """
    if not indicators:
        return "No specific conflict indicators found."

    def _summarize(item: Any) -> str:
        # Dicts that carry a "type" are rendered from that type; strings are
        # passed through untouched; everything else gets a generic label.
        if isinstance(item, dict) and "type" in item:
            return f"{item['type']} conflict detected"
        if isinstance(item, str):
            return item
        return "General conflict indicator"

    fragments = [_summarize(item) for item in indicators[:3]]
    if fragments:
        return "; ".join(fragments)
    return "Multiple conflict indicators found."
def generate_resolution_suggestions(_detector: Any, conflicts: Any) -> dict[str, str]:
    """Map detected conflicts to short resolution hints.

    Three input shapes are handled:

    * a falsy value -> a single ``"general"`` entry saying nothing is needed;
    * an object exposing ``conflict_categories`` (a mapping keyed by category
      name) -> one hint per category, keyed by the category;
    * a ``list`` -> hints for the first three items, keyed ``conflict_1`` ..
      ``conflict_3``; dict items are classified by their ``"type"`` value.

    Anything else yields a generic ``"general"`` hint.

    Args:
        _detector: Not used by this function.
        conflicts: The conflicts to produce suggestions for (see above).

    Returns:
        A mapping from suggestion key to suggestion text; never empty.
    """
    if not conflicts:
        return {"general": "No conflicts detected - no resolution needed."}

    fallback = {"general": "Review conflicting documents and update accordingly."}
    suggestions: dict[str, str] = {}

    if hasattr(conflicts, "conflict_categories"):
        category_hints = {
            "version": "Consider consolidating version information across documents.",
            "procedural": "Review and standardize procedural steps.",
            "data": "Verify and update data consistency across sources.",
        }
        # Only the category names matter here; the associated values are unused.
        for category in conflicts.conflict_categories:
            suggestions[category] = category_hints.get(
                category, f"Review and resolve {category} conflicts."
            )
        return suggestions or fallback

    if isinstance(conflicts, list):
        for position, conflict in enumerate(conflicts[:3], start=1):
            key = f"conflict_{position}"
            if not isinstance(conflict, dict):
                suggestions[key] = "Review and resolve identified conflicts."
                continue

            # "type" may be absent, None, or a non-string; normalise it so
            # the substring checks below cannot raise AttributeError.
            ctype = str(conflict.get("type") or "unknown").lower()
            if "version" in ctype:
                suggestions[key] = (
                    "Consider consolidating version information across documents."
                )
            elif "procedure" in ctype or "process" in ctype:
                suggestions[key] = "Review and standardize procedural steps."
            elif "data" in ctype:
                suggestions[key] = "Verify and update data consistency across sources."
            else:
                suggestions[key] = "Review conflicting information and update as needed."
        return suggestions or fallback

    return {"general": "Review and resolve detected conflicts."}
76def extract_context_snippet(
77 _detector: Any,
78 text: str,
79 keyword: str,
80 context_length: int = 100,
81 max_length: int | None = None,
82) -> str:
83 if not keyword or not text:
84 return ""
85 keyword_lower = keyword.lower()
86 text_lower = text.lower()
87 effective_length = max_length if max_length is not None else context_length
88 start_idx = text_lower.find(keyword_lower)
89 if start_idx == -1:
90 return text[:effective_length].strip()
91 context_start = max(0, start_idx - effective_length // 2)
92 context_end = min(len(text), start_idx + len(keyword) + effective_length // 2)
93 return text[context_start:context_end].strip()