Coverage for src/qdrant_loader_mcp_server/search/enhanced/cdi/conflict_resolution.py: 61%

54 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1from __future__ import annotations 

2 

3from typing import Any 

4 

5 

6def describe_conflict(_detector: Any, indicators: list) -> str: 

7 if not indicators: 

8 return "No specific conflict indicators found." 

9 descriptions: list[str] = [] 

10 for indicator in indicators[:3]: 

11 if isinstance(indicator, dict) and "type" in indicator: 

12 descriptions.append(f"{indicator['type']} conflict detected") 

13 elif isinstance(indicator, str): 

14 descriptions.append(indicator) 

15 else: 

16 descriptions.append("General conflict indicator") 

17 return ( 

18 "; ".join(descriptions) 

19 if descriptions 

20 else "Multiple conflict indicators found." 

21 ) 

22 

23 

24def generate_resolution_suggestions(_detector: Any, conflicts: Any) -> dict[str, str]: 

25 if not conflicts: 

26 return {"general": "No conflicts detected - no resolution needed."} 

27 

28 suggestions: dict[str, str] = {} 

29 

30 if hasattr(conflicts, "conflict_categories"): 

31 for category, _pairs in conflicts.conflict_categories.items(): 

32 if category == "version": 

33 suggestions[category] = ( 

34 "Consider consolidating version information across documents." 

35 ) 

36 elif category == "procedural": 

37 suggestions[category] = "Review and standardize procedural steps." 

38 elif category == "data": 

39 suggestions[category] = ( 

40 "Verify and update data consistency across sources." 

41 ) 

42 else: 

43 suggestions[category] = f"Review and resolve {category} conflicts." 

44 return suggestions or { 

45 "general": "Review conflicting documents and update accordingly." 

46 } 

47 

48 if isinstance(conflicts, list): 

49 for i, conflict in enumerate(conflicts[:3]): 

50 key = f"conflict_{i+1}" 

51 if isinstance(conflict, dict): 

52 ctype = conflict.get("type", "unknown").lower() 

53 if "version" in ctype: 

54 suggestions[key] = ( 

55 "Consider consolidating version information across documents." 

56 ) 

57 elif "procedure" in ctype or "process" in ctype: 

58 suggestions[key] = "Review and standardize procedural steps." 

59 elif "data" in ctype: 

60 suggestions[key] = ( 

61 "Verify and update data consistency across sources." 

62 ) 

63 else: 

64 suggestions[key] = ( 

65 "Review conflicting information and update as needed." 

66 ) 

67 else: 

68 suggestions[key] = "Review and resolve identified conflicts." 

69 return suggestions or { 

70 "general": "Review conflicting documents and update accordingly." 

71 } 

72 

73 return {"general": "Review and resolve detected conflicts."} 

74 

75 

76def extract_context_snippet( 

77 _detector: Any, 

78 text: str, 

79 keyword: str, 

80 context_length: int = 100, 

81 max_length: int | None = None, 

82) -> str: 

83 if not keyword or not text: 

84 return "" 

85 keyword_lower = keyword.lower() 

86 text_lower = text.lower() 

87 effective_length = max_length if max_length is not None else context_length 

88 start_idx = text_lower.find(keyword_lower) 

89 if start_idx == -1: 

90 return text[:effective_length].strip() 

91 context_start = max(0, start_idx - effective_length // 2) 

92 context_end = min(len(text), start_idx + len(keyword) + effective_length // 2) 

93 return text[context_start:context_end].strip()