Coverage for src/qdrant_loader_mcp_server/search/enhanced/kg/document_graph.py: 57% (137 statements)
1"""
2Document Knowledge Graph Interface.
4This module provides a high-level interface for document knowledge graph operations,
5integrating graph building, traversal, and content discovery.
6"""

from __future__ import annotations

import json
from datetime import date, datetime
from datetime import time as dtime
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from ...models import SearchResult
    from ...nlp.spacy_analyzer import QueryAnalysis, SpaCyQueryAnalyzer
else:
    QueryAnalysis = Any
    SpaCyQueryAnalyzer = Any
    SearchResult = Any

from ....utils.logging import LoggingConfig
from .builder import GraphBuilder
from .graph import KnowledgeGraph
from .models import NodeType, TraversalResult, TraversalStrategy
from .traverser import GraphTraverser

logger = LoggingConfig.get_logger(__name__)


class DocumentKnowledgeGraph:
    """High-level interface for document knowledge graph operations."""

    def __init__(self, spacy_analyzer: SpaCyQueryAnalyzer | None = None):
        """Initialize the document knowledge graph system."""
        # Import SpaCyQueryAnalyzer at runtime to avoid circular import
        if spacy_analyzer is None:
            from ...nlp.spacy_analyzer import SpaCyQueryAnalyzer

            self.spacy_analyzer = SpaCyQueryAnalyzer()
        else:
            self.spacy_analyzer = spacy_analyzer

        self.graph_builder = GraphBuilder(self.spacy_analyzer)
        self.knowledge_graph: KnowledgeGraph | None = None
        self.traverser: GraphTraverser | None = None

        logger.info("Initialized document knowledge graph system")

    def build_graph(self, search_results: list[SearchResult]) -> bool:
        """Build knowledge graph from search results."""
        try:
            self.knowledge_graph = self.graph_builder.build_from_search_results(
                search_results
            )
            self.traverser = GraphTraverser(self.knowledge_graph, self.spacy_analyzer)

            stats = self.knowledge_graph.get_statistics()
            logger.info("Knowledge graph built successfully", **stats)
            return True

        except Exception as e:
            logger.error(f"Failed to build knowledge graph: {e}")
            return False
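
    # A minimal usage sketch (illustration only, not part of the module): the
    # name `results` is hypothetical and stands in for a list of SearchResult
    # objects produced by the surrounding search pipeline.
    #
    #     dkg = DocumentKnowledgeGraph()
    #     if dkg.build_graph(results):
    #         print(dkg.get_graph_statistics())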

    def find_related_content(
        self,
        query: str,
        max_hops: int = 3,
        max_results: int = 20,
        strategy: TraversalStrategy = TraversalStrategy.SEMANTIC,
    ) -> list[TraversalResult]:
        """Find related content using graph traversal."""

        if not self.knowledge_graph or not self.traverser:
            logger.warning("Knowledge graph not initialized")
            return []

        try:
            # Analyze query with spaCy
            query_analysis = self.spacy_analyzer.analyze_query_semantic(query)

            # Find starting nodes based on query entities and concepts
            start_nodes = self._find_query_start_nodes(query_analysis)

            if not start_nodes:
                logger.debug("No starting nodes found for query")
                return []

            # Traverse graph to find related content
            results = self.traverser.traverse(
                start_nodes=start_nodes,
                query_analysis=query_analysis,
                strategy=strategy,
                max_hops=max_hops,
                max_results=max_results,
            )

            logger.debug(
                f"Found {len(results)} related content items via graph traversal"
            )
            return results

        except Exception as e:
            logger.error(f"Failed to find related content: {e}")
            return []
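
    # A minimal usage sketch (hypothetical; assumes `dkg.build_graph(...)` above
    # already succeeded). The default SEMANTIC strategy is passed explicitly for
    # clarity; the query string is illustrative only.
    #
    #     hits = dkg.find_related_content(
    #         "vector database indexing",
    #         max_hops=2,
    #         strategy=TraversalStrategy.SEMANTIC,
    #     )
    #     for hit in hits:  # each hit is a TraversalResult
    #         ...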

    def _find_query_start_nodes(self, query_analysis: QueryAnalysis) -> list[str]:
        """Find starting nodes for graph traversal based on query analysis."""

        start_nodes = []

        # Find nodes by entities (defensive against unexpected formats)
        for item in getattr(query_analysis, "entities", []) or []:
            try:
                entity_text: str | None = None
                # Accept tuple/list like (text, label, ...)
                if isinstance(item, list | tuple) and len(item) > 0:
                    entity_text = item[0]
                # Accept direct string entity
                elif isinstance(item, str):
                    entity_text = item
                # Skip dicts or None/empty safely
                if entity_text:
                    entity_nodes = self.knowledge_graph.find_nodes_by_entity(
                        entity_text
                    )
                    start_nodes.extend([node.id for node in entity_nodes])
            except IndexError:
                # Malformed entity entry; skip
                continue

        # Find nodes by main concepts (as topics)
        for concept in query_analysis.main_concepts:
            topic_nodes = self.knowledge_graph.find_nodes_by_topic(concept)
            start_nodes.extend([node.id for node in topic_nodes])

        # If no entity/topic matches, use high-centrality document nodes
        if not start_nodes:
            doc_nodes = self.knowledge_graph.find_nodes_by_type(NodeType.DOCUMENT)
            # Sort by centrality and take top 3
            doc_nodes.sort(key=lambda n: n.centrality_score, reverse=True)
            start_nodes = [node.id for node in doc_nodes[:3]]

        return list(set(start_nodes))  # Remove duplicates
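
    # Entity shapes the loop above accepts, for illustration (the concrete
    # values are hypothetical; the handling follows the branches in the code):
    #
    #     ("Qdrant", "ORG")   -> "Qdrant"   (tuple/list: first element is the text)
    #     "Qdrant"            -> "Qdrant"   (bare string)
    #     {"text": "Qdrant"}  -> skipped    (dicts are not unpacked)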

    def get_graph_statistics(self) -> dict[str, Any] | None:
        """Get knowledge graph statistics."""
        if self.knowledge_graph:
            return self.knowledge_graph.get_statistics()
        return None

    def export_graph(self, format: str = "json") -> str | None:
        """Export the knowledge graph; only the "json" format is currently supported."""
        if not self.knowledge_graph:
            return None

        try:
            if format == "json":
                # Export as JSON with nodes and edges
                data = {
                    "nodes": [
                        {
                            "id": node.id,
                            "type": getattr(
                                node.node_type, "value", str(node.node_type)
                            ),
                            "title": node.title,
                            "centrality": node.centrality_score,
                            "entities": node.entities,
                            "topics": node.topics,
                        }
                        for node in self.knowledge_graph.nodes.values()
                    ],
                    "edges": [
                        {
                            "source": edge.source_id,
                            "target": edge.target_id,
                            "relationship": getattr(
                                edge.relationship_type,
                                "value",
                                str(edge.relationship_type),
                            ),
                            "weight": edge.weight,
                            "confidence": edge.confidence,
                        }
                        for edge in self.knowledge_graph.edges.values()
                    ],
                }
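
                # The resulting JSON payload has this shape (values illustrative):
                #
                #     {
                #       "nodes": [{"id": ..., "type": ..., "title": ...,
                #                  "centrality": ..., "entities": [...], "topics": [...]}],
                #       "edges": [{"source": ..., "target": ..., "relationship": ...,
                #                  "weight": ..., "confidence": ...}]
                #     }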

                class EnhancedJSONEncoder(json.JSONEncoder):
                    def default(self, obj: Any) -> Any:  # type: ignore[override]
                        if isinstance(obj, datetime | date | dtime):
                            return obj.isoformat()
                        if isinstance(obj, Enum):
                            return getattr(obj, "value", str(obj))
                        if isinstance(obj, set):
                            return list(obj)
                        if isinstance(obj, bytes):
                            try:
                                return obj.decode("utf-8")
                            except Exception:
                                return obj.hex()
                        if isinstance(obj, Path):
                            return str(obj)
                        if hasattr(obj, "to_dict"):
                            try:
                                return obj.to_dict()
                            except Exception:
                                return str(obj)
                        if hasattr(obj, "__dict__"):
                            try:
                                return vars(obj)
                            except Exception:
                                return str(obj)
                        return str(obj)

                try:
                    return json.dumps(data, indent=2, cls=EnhancedJSONEncoder)
                except TypeError:
                    # Fallback: sanitize recursively
                    def sanitize(value: Any) -> Any:
                        try:
                            json.dumps(value)
                            return value
                        except TypeError:
                            if isinstance(value, dict):
                                return {
                                    sanitize(k): sanitize(v) for k, v in value.items()
                                }
                            if isinstance(value, list | tuple | set):
                                return [sanitize(v) for v in value]
                            if isinstance(value, datetime | date | dtime):
                                return value.isoformat()
                            if isinstance(value, Enum):
                                return getattr(value, "value", str(value))
                            if isinstance(value, bytes):
                                try:
                                    return value.decode("utf-8")
                                except Exception:
                                    return value.hex()
                            return str(value)

                    safe_data = sanitize(data)
                    return json.dumps(safe_data, indent=2)

        except Exception as e:
            logger.error(f"Failed to export graph: {e}")

        # Unsupported formats (and export failures) fall through to None
        return None
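

# A minimal end-to-end export sketch (hypothetical; `results` must be a list of
# SearchResult objects produced by the surrounding search pipeline, which this
# module does not construct):
#
#     dkg = DocumentKnowledgeGraph()
#     if dkg.build_graph(results):
#         payload = dkg.export_graph(format="json")
#         if payload is not None:
#             Path("graph.json").write_text(payload)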