Coverage for src/qdrant_loader_mcp_server/search/enhanced/kg/document_graph.py: 57%

137 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1""" 

2Document Knowledge Graph Interface. 

3 

4This module provides a high-level interface for document knowledge graph operations, 

5integrating graph building, traversal, and content discovery. 

6""" 

7 

8from __future__ import annotations 

9 

10import json 

11from datetime import date, datetime 

12from datetime import time as dtime 

13from enum import Enum 

14from pathlib import Path 

15from typing import TYPE_CHECKING, Any 

16 

17if TYPE_CHECKING: 

18 from ...models import SearchResult 

19 from ...nlp.spacy_analyzer import QueryAnalysis, SpaCyQueryAnalyzer 

20else: 

21 QueryAnalysis = Any 

22 SpaCyQueryAnalyzer = Any 

23 SearchResult = Any 

24 

25from ....utils.logging import LoggingConfig 

26from .builder import GraphBuilder 

27from .graph import KnowledgeGraph 

28from .models import NodeType, TraversalResult, TraversalStrategy 

29from .traverser import GraphTraverser 

30 

31logger = LoggingConfig.get_logger(__name__) 

32 

33 

34class DocumentKnowledgeGraph: 

35 """High-level interface for document knowledge graph operations.""" 

36 

37 def __init__(self, spacy_analyzer: SpaCyQueryAnalyzer | None = None): 

38 """Initialize the document knowledge graph system.""" 

39 # Import SpaCyQueryAnalyzer at runtime to avoid circular import 

40 if spacy_analyzer is None: 

41 from ...nlp.spacy_analyzer import SpaCyQueryAnalyzer 

42 

43 self.spacy_analyzer = SpaCyQueryAnalyzer() 

44 else: 

45 self.spacy_analyzer = spacy_analyzer 

46 

47 self.graph_builder = GraphBuilder(self.spacy_analyzer) 

48 self.knowledge_graph: KnowledgeGraph | None = None 

49 self.traverser: GraphTraverser | None = None 

50 

51 logger.info("Initialized document knowledge graph system") 

52 

53 def build_graph(self, search_results: list[SearchResult]) -> bool: 

54 """Build knowledge graph from search results.""" 

55 try: 

56 self.knowledge_graph = self.graph_builder.build_from_search_results( 

57 search_results 

58 ) 

59 self.traverser = GraphTraverser(self.knowledge_graph, self.spacy_analyzer) 

60 

61 stats = self.knowledge_graph.get_statistics() 

62 logger.info("Knowledge graph built successfully", **stats) 

63 return True 

64 

65 except Exception as e: 

66 logger.error(f"Failed to build knowledge graph: {e}") 

67 return False 

68 

69 def find_related_content( 

70 self, 

71 query: str, 

72 max_hops: int = 3, 

73 max_results: int = 20, 

74 strategy: TraversalStrategy = TraversalStrategy.SEMANTIC, 

75 ) -> list[TraversalResult]: 

76 """Find related content using graph traversal.""" 

77 

78 if not self.knowledge_graph or not self.traverser: 

79 logger.warning("Knowledge graph not initialized") 

80 return [] 

81 

82 try: 

83 # Analyze query with spaCy 

84 query_analysis = self.spacy_analyzer.analyze_query_semantic(query) 

85 

86 # Find starting nodes based on query entities and concepts 

87 start_nodes = self._find_query_start_nodes(query_analysis) 

88 

89 if not start_nodes: 

90 logger.debug("No starting nodes found for query") 

91 return [] 

92 

93 # Traverse graph to find related content 

94 results = self.traverser.traverse( 

95 start_nodes=start_nodes, 

96 query_analysis=query_analysis, 

97 strategy=strategy, 

98 max_hops=max_hops, 

99 max_results=max_results, 

100 ) 

101 

102 logger.debug( 

103 f"Found {len(results)} related content items via graph traversal" 

104 ) 

105 return results 

106 

107 except Exception as e: 

108 logger.error(f"Failed to find related content: {e}") 

109 return [] 

110 

111 def _find_query_start_nodes(self, query_analysis: QueryAnalysis) -> list[str]: 

112 """Find starting nodes for graph traversal based on query analysis.""" 

113 

114 start_nodes = [] 

115 

116 # Find nodes by entities (defensive against unexpected formats) 

117 for item in getattr(query_analysis, "entities", []) or []: 

118 try: 

119 entity_text: str | None = None 

120 # Accept tuple/list like (text, label, ...) 

121 if isinstance(item, list | tuple) and len(item) > 0: 

122 entity_text = item[0] 

123 # Accept direct string entity 

124 elif isinstance(item, str): 

125 entity_text = item 

126 # Skip dicts or None/empty safely 

127 if entity_text: 

128 entity_nodes = self.knowledge_graph.find_nodes_by_entity( 

129 entity_text 

130 ) 

131 start_nodes.extend([node.id for node in entity_nodes]) 

132 except IndexError: 

133 # Malformed entity entry; skip 

134 continue 

135 

136 # Find nodes by main concepts (as topics) 

137 for concept in query_analysis.main_concepts: 

138 topic_nodes = self.knowledge_graph.find_nodes_by_topic(concept) 

139 start_nodes.extend([node.id for node in topic_nodes]) 

140 

141 # If no entity/topic matches, use high-centrality document nodes 

142 if not start_nodes: 

143 doc_nodes = self.knowledge_graph.find_nodes_by_type(NodeType.DOCUMENT) 

144 # Sort by centrality and take top 3 

145 doc_nodes.sort(key=lambda n: n.centrality_score, reverse=True) 

146 start_nodes = [node.id for node in doc_nodes[:3]] 

147 

148 return list(set(start_nodes)) # Remove duplicates 

149 

150 def get_graph_statistics(self) -> dict[str, Any] | None: 

151 """Get knowledge graph statistics.""" 

152 if self.knowledge_graph: 

153 return self.knowledge_graph.get_statistics() 

154 return None 

155 

156 def export_graph(self, format: str = "json") -> str | None: 

157 """Export knowledge graph in specified format.""" 

158 if not self.knowledge_graph: 

159 return None 

160 

161 try: 

162 if format == "json": 

163 # Export as JSON with nodes and edges 

164 data = { 

165 "nodes": [ 

166 { 

167 "id": node.id, 

168 "type": getattr( 

169 node.node_type, "value", str(node.node_type) 

170 ), 

171 "title": node.title, 

172 "centrality": node.centrality_score, 

173 "entities": node.entities, 

174 "topics": node.topics, 

175 } 

176 for node in self.knowledge_graph.nodes.values() 

177 ], 

178 "edges": [ 

179 { 

180 "source": edge.source_id, 

181 "target": edge.target_id, 

182 "relationship": getattr( 

183 edge.relationship_type, 

184 "value", 

185 str(edge.relationship_type), 

186 ), 

187 "weight": edge.weight, 

188 "confidence": edge.confidence, 

189 } 

190 for edge in self.knowledge_graph.edges.values() 

191 ], 

192 } 

193 

194 class EnhancedJSONEncoder(json.JSONEncoder): 

195 def default(self, obj: Any) -> Any: # type: ignore[override] 

196 if isinstance(obj, datetime | date | dtime): 

197 return obj.isoformat() 

198 if isinstance(obj, Enum): 

199 return getattr(obj, "value", str(obj)) 

200 if isinstance(obj, set): 

201 return list(obj) 

202 if isinstance(obj, bytes): 

203 try: 

204 return obj.decode("utf-8") 

205 except Exception: 

206 return obj.hex() 

207 if isinstance(obj, Path): 

208 return str(obj) 

209 if hasattr(obj, "to_dict"): 

210 try: 

211 return obj.to_dict() 

212 except Exception: 

213 return str(obj) 

214 if hasattr(obj, "__dict__"): 

215 try: 

216 return vars(obj) 

217 except Exception: 

218 return str(obj) 

219 return str(obj) 

220 

221 try: 

222 return json.dumps(data, indent=2, cls=EnhancedJSONEncoder) 

223 except TypeError: 

224 # Fallback: sanitize recursively 

225 def sanitize(value: Any) -> Any: 

226 try: 

227 json.dumps(value) 

228 return value 

229 except TypeError: 

230 if isinstance(value, dict): 

231 return { 

232 sanitize(k): sanitize(v) for k, v in value.items() 

233 } 

234 if isinstance(value, list | tuple | set): 

235 return [sanitize(v) for v in value] 

236 if isinstance(value, datetime | date | dtime): 

237 return value.isoformat() 

238 if isinstance(value, Enum): 

239 return getattr(value, "value", str(value)) 

240 if isinstance(value, bytes): 

241 try: 

242 return value.decode("utf-8") 

243 except Exception: 

244 return value.hex() 

245 return str(value) 

246 

247 safe_data = sanitize(data) 

248 return json.dumps(safe_data, indent=2) 

249 

250 except Exception as e: 

251 logger.error(f"Failed to export graph: {e}") 

252 

253 return None