Coverage for src/qdrant_loader_mcp_server/search/enhanced/kg/utils.py: 73%

64 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1from __future__ import annotations 

2 

3import logging 

4 

5from .models import GraphEdge, GraphNode 

6 

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Default weights and thresholds used across KG computations.
# The three *_SIM_WEIGHT values are the per-feature weights that
# calculate_node_similarity() applies to its entity, topic and keyword Jaccard
# scores. They sum to 1.0, so the combined score stays within [0.0, 1.0].
ENTITY_SIM_WEIGHT: float = 0.4  # weight of the entity overlap score
TOPIC_SIM_WEIGHT: float = 0.3  # weight of the topic overlap score
KEYWORD_SIM_WEIGHT: float = 0.3  # weight of the keyword overlap score
# NOTE(review): not referenced by the code visible here; presumably the minimum
# node similarity at which callers create a similarity edge -- confirm against
# the call sites.
SIMILARITY_EDGE_THRESHOLD: float = 0.3

15 

16 

def jaccard_similarity(set1: set[str], set2: set[str]) -> float:
    """Return the Jaccard index of two string sets.

    The index is the size of the intersection divided by the size of the
    union. If either operand is empty the result is ``0.0``.
    """
    if set1 and set2:
        shared_count = len(set1.intersection(set2))
        combined_count = len(set1.union(set2))
        # The denominator is clamped to at least 1 before dividing.
        return shared_count / max(combined_count, 1)
    return 0.0

26 

27 

def calculate_list_similarity(
    list1: list[tuple[str, str]], list2: list[tuple[str, str]]
) -> float:
    """Score the overlap of two ``(text, label)`` lists as a Jaccard ratio.

    Only the first component of each item is compared, after lower-casing;
    the label component is ignored. Returns ``0.0`` when either list is empty.
    """
    if not (list1 and list2):
        return 0.0

    texts_a = {entry[0].lower() for entry in list1}
    texts_b = {entry[0].lower() for entry in list2}

    overlap = texts_a & texts_b
    combined = texts_a | texts_b
    return len(overlap) / max(len(combined), 1)

42 

43 

def calculate_node_similarity(node1: GraphNode, node2: GraphNode) -> float:
    """Return the weighted semantic similarity of two KG nodes.

    The score is the sum of three Jaccard similarities (over the nodes'
    ``entities``, ``topics`` and ``keywords``), each scaled by its
    module-level weight constant.
    """
    entity_term = ENTITY_SIM_WEIGHT * jaccard_similarity(
        set(node1.entities), set(node2.entities)
    )
    topic_term = TOPIC_SIM_WEIGHT * jaccard_similarity(
        set(node1.topics), set(node2.topics)
    )
    keyword_term = KEYWORD_SIM_WEIGHT * jaccard_similarity(
        set(node1.keywords), set(node2.keywords)
    )
    return entity_term + topic_term + keyword_term

56 

57 

def _get_relationship_value(edge: GraphEdge | object) -> str:
    """Safely extract the relationship value string from an edge.

    Resolution order for ``edge.relationship_type``:

    1. Attribute missing or ``None`` -> ``"unknown"``.
    2. ``Enum`` member -> its ``value`` when that is a non-empty string,
       otherwise ``"unknown"``.
    3. Plain string -> the string itself, or ``"unknown"`` when it is empty.
    4. Any other object exposing a non-empty string ``value`` -> that value.
    5. Anything else -> ``str(relationship_type)``, or ``"unknown"`` if the
       conversion raises.

    Args:
        edge: A ``GraphEdge`` or any object that may carry a
            ``relationship_type`` attribute.

    Returns:
        A label for the relationship; never ``None``.
    """
    # ``enum`` is part of the standard library, so the import needs no guard.
    from enum import Enum

    relationship_type = getattr(edge, "relationship_type", None)
    if relationship_type is None:
        return "unknown"

    # Checked before the ``str`` branch so that str-mixin enums resolve
    # through their ``value`` rather than being treated as raw strings.
    if isinstance(relationship_type, Enum):
        value = getattr(relationship_type, "value", None)
        if isinstance(value, str) and value:
            return value
        return "unknown"

    # Raw string; an empty one carries no information, so it is reported the
    # same way as an enum with an empty value.
    if isinstance(relationship_type, str):
        return relationship_type or "unknown"

    # Duck-typed objects with a .value attribute
    value_attr = getattr(relationship_type, "value", None)
    if isinstance(value_attr, str) and value_attr:
        return value_attr

    # Best-effort fallback: a broken __str__ must not break path rendering.
    try:
        return str(relationship_type)
    except Exception:
        return "unknown"

94 

95 

def build_reasoning_path(
    edges: list[GraphEdge], nodes_by_id: dict[str, GraphNode]
) -> list[str]:
    """Build a human-readable reasoning path from a traversal.

    Each edge becomes one entry of the form
    ``"<source title> --<relationship>--> <target title> (weight: 0.00)"``.
    An endpoint that cannot be resolved in ``nodes_by_id`` is rendered as
    ``"UNKNOWN NODE <id>"`` and a warning is logged; the edge is still
    included in the output.

    Parameters
    - edges: Ordered list of graph edges traversed.
    - nodes_by_id: Mapping from node id to `GraphNode` for resolving titles.

    Returns
    - One formatted string per edge, in the order the edges were given.
    """
    reasoning: list[str] = []
    for edge in edges:
        # Read each endpoint id once, with the same "N/A" fallback used when
        # rendering, so an edge lacking the attribute yields an
        # "UNKNOWN NODE N/A" entry instead of raising AttributeError.
        source_id = getattr(edge, "source_id", "N/A")
        target_id = getattr(edge, "target_id", "N/A")
        source_node = nodes_by_id.get(source_id)
        target_node = nodes_by_id.get(target_id)

        # Resolved once and shared by the warning and the rendered entry.
        relationship_value = _get_relationship_value(edge)

        if source_node is None or target_node is None:
            logger.warning(
                "KG reasoning: missing node(s) for edge. edge_id=%s relationship=%s source_id=%s found=%s target_id=%s found=%s",
                getattr(edge, "id", "N/A"),
                relationship_value,
                source_id,
                source_node is not None,
                target_id,
                target_node is not None,
            )

        source_title = (
            source_node.title
            if source_node is not None
            else f"UNKNOWN NODE {source_id}"
        )
        target_title = (
            target_node.title
            if target_node is not None
            else f"UNKNOWN NODE {target_id}"
        )

        reasoning.append(
            f"{source_title} --{relationship_value}--> {target_title} "
            f"(weight: {getattr(edge, 'weight', 0.0):.2f})"
        )
    return reasoning