Coverage for src / qdrant_loader_mcp_server / search / enhanced / kg / builder.py: 90%
182 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:51 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:51 +0000
1"""
2Knowledge Graph Builder.
4This module implements graph construction logic, building knowledge graphs
5from document metadata and search results with intelligent relationship extraction.
6"""
8from __future__ import annotations
10import hashlib
11import json
12import time
13from collections import Counter, defaultdict
14from typing import TYPE_CHECKING, Any
16if TYPE_CHECKING:
17 from ...models import SearchResult
18 from ...nlp.spacy_analyzer import SpaCyQueryAnalyzer
20from ....utils.logging import LoggingConfig
21from .extractors import (
22 extract_concepts_from_result,
23 extract_entities_from_result,
24 extract_keywords_from_result,
25 extract_topics_from_result,
26)
27from .models import GraphEdge, GraphNode, NodeType, RelationshipType
28from .utils import SIMILARITY_EDGE_THRESHOLD, calculate_node_similarity
30logger = LoggingConfig.get_logger(__name__)
class RecoverableBuildError(Exception):
    """Raised for expected parsing/validation issues during graph building.

    ``GraphBuilder.build_from_search_results`` catches this exception (together
    with ``ValueError``, ``KeyError``, ``json.JSONDecodeError`` and
    ``IndexError``), logs it and returns ``None`` instead of propagating it.
    """
class GraphBuilder:
    """Build knowledge graph from document metadata and search results."""

    def __init__(self, spacy_analyzer: SpaCyQueryAnalyzer | None = None):
        """Initialize the graph builder.

        Args:
            spacy_analyzer: Analyzer instance to reuse. When ``None``, a new
                ``SpaCyQueryAnalyzer`` is created.
        """
        if spacy_analyzer is None:
            # Import SpaCyQueryAnalyzer at runtime to avoid circular import
            from ...nlp.spacy_analyzer import SpaCyQueryAnalyzer

            self.spacy_analyzer = SpaCyQueryAnalyzer()
        else:
            self.spacy_analyzer = spacy_analyzer
        logger.info("Initialized graph builder")

    def build_from_search_results(
        self, search_results: list[SearchResult]
    ) -> Any:  # KnowledgeGraph - avoiding circular import
        """Build knowledge graph from search results metadata.

        Nodes are added first (documents/sections, then entities/topics), then
        edges, and finally centrality scores are calculated on the graph.

        Args:
            search_results: Results whose metadata is turned into nodes/edges.

        Returns:
            The populated ``KnowledgeGraph``, or ``None`` when a recoverable
            error (``ValueError``, ``KeyError``, ``json.JSONDecodeError``,
            ``IndexError`` or ``RecoverableBuildError``) was raised while
            building.

        Raises:
            Exception: Any other exception is logged and re-raised unchanged.
        """
        # Import KnowledgeGraph at runtime to avoid circular import
        from ..knowledge_graph import KnowledgeGraph

        start_time = time.time()
        graph = KnowledgeGraph()

        try:
            # Step 1: Create nodes from search results
            for node in self._create_document_nodes(search_results):
                graph.add_node(node)

            # Step 2: Create entity and topic nodes
            entity_nodes, topic_nodes = self._create_concept_nodes(search_results)
            for node in entity_nodes + topic_nodes:
                graph.add_node(node)

            # Step 3: Create relationships (needs the nodes added above)
            for edge in self._create_relationships(search_results, graph):
                graph.add_edge(edge)

            # Step 4: Calculate centrality scores
            graph.calculate_centrality_scores()

            build_time = (time.time() - start_time) * 1000
            stats = graph.get_statistics()

            logger.info(
                f"Built knowledge graph in {build_time:.2f}ms",
                nodes=stats["total_nodes"],
                edges=stats["total_edges"],
                components=stats["connected_components"],
            )

            return graph

        except (
            ValueError,
            KeyError,
            json.JSONDecodeError,
            IndexError,
            RecoverableBuildError,
        ) as exc:
            # Known/parsing/validation issues: log and return a clear recoverable indicator
            logger.exception(
                "Recoverable error while building knowledge graph", error=str(exc)
            )
            return None
        except Exception as exc:
            # Unexpected/critical exceptions should propagate after logging for caller visibility
            logger.exception(
                "Unexpected error while building knowledge graph", error=str(exc)
            )
            raise

    def _create_document_nodes(
        self, search_results: list[SearchResult]
    ) -> list[GraphNode]:
        """Create document and section nodes from search results.

        One document node is created per distinct document id (first result
        wins); one section node is created for every result.

        Args:
            search_results: Results to convert into nodes.

        Returns:
            Document and section nodes in result order, each document node
            directly preceding the section node of the result that introduced it.
        """
        nodes: list[GraphNode] = []
        seen_documents: set[str] = set()

        for result in search_results:
            doc_id = _doc_id_from_result(result)

            # Run every extractor once per result; each node receives its own
            # copy so mutating one node's lists never affects the other.
            entities = self._extract_entities(result)
            topics = self._extract_topics(result)
            concepts = self._extract_concepts(result)
            keywords = self._extract_keywords(result)

            if doc_id not in seen_documents:
                seen_documents.add(doc_id)

                doc_node = GraphNode(
                    id=doc_id,
                    node_type=NodeType.DOCUMENT,
                    title=result.source_title or f"Document from {result.source_type}",
                    # First 500 chars as summary; tolerate a missing text.
                    content=(result.text or "")[:500],
                    metadata={
                        "source_type": result.source_type,
                        "source_title": result.source_title,
                        "url": result.source_url,
                        "project_id": result.project_id,
                        "collection_name": result.collection_name,
                    },
                    entities=list(entities),
                    topics=list(topics),
                    concepts=list(concepts),
                    keywords=list(keywords),
                )
                nodes.append(doc_node)

            section_id = _section_id_from_result(result)
            # Build a safe title string for slicing
            raw_title = result.section_title or result.breadcrumb_text or "Section"
            safe_title = raw_title if isinstance(raw_title, str) else str(raw_title or "")
            section_node = GraphNode(
                id=section_id,
                node_type=NodeType.SECTION,
                title=(safe_title or "")[-50:],  # Last 50 chars
                content=result.text,
                metadata={
                    "parent_document": doc_id,
                    "breadcrumb": result.breadcrumb_text,
                    "section_level": result.section_level or result.depth,
                    "score": result.score,
                    "section_type": result.section_type,
                },
                entities=list(entities),
                topics=list(topics),
                concepts=list(concepts),
                keywords=list(keywords),
            )
            nodes.append(section_node)

        return nodes

    def _create_concept_nodes(
        self, search_results: list[SearchResult]
    ) -> tuple[list[GraphNode], list[GraphNode]]:
        """Create entity and topic nodes from extracted metadata.

        An entity or topic gets a node only when it appears in at least two
        search results. Repeated mentions inside a single result count once.

        Args:
            search_results: Results whose entities and topics are counted.

        Returns:
            A ``(entity_nodes, topic_nodes)`` tuple.
        """
        entity_counts: Counter[str] = Counter()
        topic_counts: Counter[str] = Counter()

        for result in search_results:
            # dict.fromkeys de-duplicates while keeping first-seen order, so a
            # value listed twice in one result is not mistaken for two mentions.
            for entity in dict.fromkeys(self._extract_entities(result)):
                entity_counts[entity] += 1
            for topic in dict.fromkeys(self._extract_topics(result)):
                topic_counts[topic] += 1

        # Entities mentioned in at least 2 results
        entity_nodes = [
            GraphNode(
                id=_build_stable_id("entity", entity),
                node_type=NodeType.ENTITY,
                title=entity,
                metadata={"mention_count": count, "entity_type": "extracted"},
            )
            for entity, count in entity_counts.items()
            if count >= 2
        ]

        # Topics mentioned in at least 2 results
        topic_nodes = [
            GraphNode(
                id=_build_stable_id("topic", topic),
                node_type=NodeType.TOPIC,
                title=topic,
                metadata={"mention_count": count, "topic_type": "extracted"},
            )
            for topic, count in topic_counts.items()
            if count >= 2
        ]

        return entity_nodes, topic_nodes

    def _create_relationships(
        self,
        search_results: list[SearchResult],
        graph: Any,  # KnowledgeGraph - avoiding circular import
    ) -> list[GraphEdge]:
        """Create relationships between graph nodes.

        Args:
            search_results: Results the graph nodes were created from.
            graph: Graph that already contains the nodes to connect.

        Returns:
            CONTAINS edges followed by entity, topic and similarity edges.
        """
        edges: list[GraphEdge] = []

        # Document -> Section relationships
        # NOTE(review): results sharing a source_url map to the same doc and
        # section ids, so identical CONTAINS edges can be emitted more than
        # once; confirm graph.add_edge tolerates duplicates.
        for result in search_results:
            doc_id = _doc_id_from_result(result)
            section_id = _section_id_from_result(result)

            if doc_id in graph.nodes and section_id in graph.nodes:
                edges.append(
                    GraphEdge(
                        source_id=doc_id,
                        target_id=section_id,
                        relationship_type=RelationshipType.CONTAINS,
                        weight=1.0,
                        confidence=1.0,
                        evidence=["hierarchical_structure"],
                    )
                )

        # Entity relationships
        edges.extend(self._create_entity_relationships(search_results, graph))

        # Topic relationships
        edges.extend(self._create_topic_relationships(search_results, graph))

        # Semantic similarity relationships
        edges.extend(self._create_similarity_relationships(graph))

        return edges

    def _create_entity_relationships(
        self,
        search_results: list[SearchResult],
        graph: Any,  # KnowledgeGraph - avoiding circular import
    ) -> list[GraphEdge]:
        """Create entity-related relationships.

        Args:
            search_results: Results whose entities are linked.
            graph: Graph used to look up section and entity nodes.

        Returns:
            MENTIONS edges from each result's section node to every node
            returned by ``graph.find_nodes_by_entity``, followed by the
            entity co-occurrence edges.
        """
        edges: list[GraphEdge] = []

        # Section mentions Entity
        for result in search_results:
            section_id = _section_id_from_result(result)
            if section_id not in graph.nodes:
                # No source node for this result, so no MENTIONS edge is possible.
                continue

            for entity in self._extract_entities(result):
                for entity_node in graph.find_nodes_by_entity(entity):
                    edges.append(
                        GraphEdge(
                            source_id=section_id,
                            target_id=entity_node.id,
                            relationship_type=RelationshipType.MENTIONS,
                            weight=0.7,
                            confidence=0.8,
                            evidence=[f"entity_extraction: {entity}"],
                        )
                    )

        # Entity co-occurrence relationships
        edges.extend(self._create_entity_cooccurrence(search_results, graph))

        return edges

    def _create_topic_relationships(
        self,
        search_results: list[SearchResult],
        graph: Any,  # KnowledgeGraph - avoiding circular import
    ) -> list[GraphEdge]:
        """Create topic-related relationships.

        Args:
            search_results: Results whose topics are linked.
            graph: Graph used to look up section and topic nodes.

        Returns:
            DISCUSSES edges from each result's section node to every node
            returned by ``graph.find_nodes_by_topic``.
        """
        edges: list[GraphEdge] = []

        # Section discusses Topic
        for result in search_results:
            section_id = _section_id_from_result(result)
            if section_id not in graph.nodes:
                # No source node for this result, so no DISCUSSES edge is possible.
                continue

            for topic in self._extract_topics(result):
                for topic_node in graph.find_nodes_by_topic(topic):
                    edges.append(
                        GraphEdge(
                            source_id=section_id,
                            target_id=topic_node.id,
                            relationship_type=RelationshipType.DISCUSSES,
                            weight=0.6,
                            confidence=0.7,
                            evidence=[f"topic_extraction: {topic}"],
                        )
                    )

        return edges

    def _create_entity_cooccurrence(
        self,
        search_results: list[SearchResult],
        graph: Any,  # KnowledgeGraph - avoiding circular import
    ) -> list[GraphEdge]:
        """Create entity co-occurrence relationships.

        Two entities co-occur when the same search result lists both. A pair
        produces edges only when it co-occurs in at least two results.

        Args:
            search_results: Results whose entity lists are paired up.
            graph: Graph used to resolve entities to nodes.

        Returns:
            CO_OCCURS edges whose weight and confidence are
            ``min(1.0, count / 5.0)``.
        """
        edges: list[GraphEdge] = []
        cooccurrence_counts: defaultdict[tuple[str, str], int] = defaultdict(int)

        # Count entity co-occurrences
        for result in search_results:
            # De-duplicate per result (order preserved): a repeated entity must
            # neither pair with itself nor count the same pair twice.
            entities = list(dict.fromkeys(self._extract_entities(result)))
            for i, entity1 in enumerate(entities):
                for entity2 in entities[i + 1 :]:
                    pair = tuple(sorted((entity1, entity2)))
                    cooccurrence_counts[pair] += 1

        # Create edges for significant co-occurrences
        for (entity1, entity2), count in cooccurrence_counts.items():
            if count < 2:  # Must appear together at least twice
                continue

            entity1_nodes = graph.find_nodes_by_entity(entity1)
            entity2_nodes = graph.find_nodes_by_entity(entity2)
            weight = min(1.0, count / 5.0)  # Normalize to max 1.0

            for node1 in entity1_nodes:
                for node2 in entity2_nodes:
                    edges.append(
                        GraphEdge(
                            source_id=node1.id,
                            target_id=node2.id,
                            relationship_type=RelationshipType.CO_OCCURS,
                            weight=weight,
                            confidence=weight,
                            evidence=[f"co_occurrence_count: {count}"],
                        )
                    )

        return edges

    def _create_similarity_relationships(
        self,
        graph: Any,  # KnowledgeGraph - avoiding circular import
    ) -> list[GraphEdge]:
        """Create semantic similarity relationships between nodes.

        Every unordered pair of section nodes is compared once.

        Args:
            graph: Graph providing the section nodes.

        Returns:
            SIMILAR_TO edges for pairs whose similarity is strictly greater
            than ``SIMILARITY_EDGE_THRESHOLD``.
        """
        edges: list[GraphEdge] = []

        section_nodes = graph.find_nodes_by_type(NodeType.SECTION)

        for i, node1 in enumerate(section_nodes):
            for node2 in section_nodes[i + 1 :]:
                similarity = self._calculate_node_similarity(node1, node2)

                # Threshold for meaningful similarity
                if similarity > SIMILARITY_EDGE_THRESHOLD:
                    edges.append(
                        GraphEdge(
                            source_id=node1.id,
                            target_id=node2.id,
                            relationship_type=RelationshipType.SIMILAR_TO,
                            weight=similarity,
                            confidence=similarity,
                            evidence=[f"semantic_similarity: {similarity:.3f}"],
                        )
                    )

        return edges

    def _calculate_node_similarity(self, node1: GraphNode, node2: GraphNode) -> float:
        """Calculate similarity between two nodes."""
        return calculate_node_similarity(node1, node2)

    def _extract_entities(self, result: SearchResult) -> list[str]:
        """Return the entities of ``result`` via ``extract_entities_from_result``."""
        return extract_entities_from_result(result)

    def _extract_topics(self, result: SearchResult) -> list[str]:
        """Return the topics of ``result`` via ``extract_topics_from_result``."""
        return extract_topics_from_result(result)

    def _extract_concepts(self, result: SearchResult) -> list[str]:
        """Return the concepts of ``result`` via ``extract_concepts_from_result``."""
        return extract_concepts_from_result(result)

    def _extract_keywords(self, result: SearchResult) -> list[str]:
        """Return the keywords of ``result`` via ``extract_keywords_from_result``."""
        return extract_keywords_from_result(result)
def _stable_hash(value: Any) -> str:
    """Compute a deterministic SHA-256 hex digest for a value using stable serialization.

    The value is serialized as compact JSON with sorted keys and non-ASCII
    characters kept as-is. If serialization fails, ``str(value)`` is hashed
    instead.

    Args:
        value: Object to hash.

    Returns:
        The 64-character hexadecimal SHA-256 digest.
    """
    try:
        canonical = json.dumps(
            value, sort_keys=True, separators=(",", ":"), ensure_ascii=False
        )
    except Exception:
        # Deliberate best-effort fallback for values json cannot serialize.
        canonical = str(value)
    # ensure_ascii=False lets lone surrogates through, which a strict UTF-8
    # encode rejects; "surrogatepass" encodes them and leaves the bytes of
    # every valid string unchanged.
    return hashlib.sha256(canonical.encode("utf-8", "surrogatepass")).hexdigest()
def _build_stable_id(prefix: str, value: Any, digest_length: int = 16) -> str:
    """Build a stable node id using a prefix and the truncated SHA-256 digest of the value.

    Args:
        prefix: Text placed before the underscore separator.
        value: Object hashed with ``_stable_hash``.
        digest_length: Number of leading hex characters of the digest to keep.

    Returns:
        An id of the form ``{prefix}_{digest}``.
    """
    digest = _stable_hash(value)[:digest_length]
    return f"{prefix}_{digest}"
def _id_from_result(result: Any, prefix: str) -> str:
    """Create a stable node id from a search result using a given prefix.

    Shared logic for document and section identifiers:
    - Use result.source_url if present
    - Otherwise fallback to the first 100 characters of result.text
    - Include result.source_type (defaulting to "unknown") in the id

    The final id format is: {prefix}_{source_type}_{digest}
    where digest is the first 16 characters of the SHA-256 hexdigest.

    Args:
        result: Object read via ``getattr``; missing attributes fall back to
            the defaults described above.
        prefix: Leading component of the id.

    Returns:
        The id string.
    """
    source_type = getattr(result, "source_type", "") or "unknown"
    preferred_identifier = getattr(result, "source_url", None)
    if not preferred_identifier:
        preferred_identifier = (getattr(result, "text", "") or "")[:100]
    if not isinstance(preferred_identifier, str):
        preferred_identifier = str(preferred_identifier)

    # "surrogatepass" keeps the bytes of valid strings unchanged while letting
    # text that contains lone surrogates be hashed instead of raising
    # UnicodeEncodeError.
    encoded = preferred_identifier.encode("utf-8", "surrogatepass")
    digest = hashlib.sha256(encoded).hexdigest()[:16]
    return f"{prefix}_{source_type}_{digest}"
def _doc_id_from_result(result: Any) -> str:
    """Create a stable document node id from a search result.

    Delegates to ``_id_from_result`` with the ``"doc"`` prefix.
    """
    return _id_from_result(result, "doc")
def _section_id_from_result(result: Any) -> str:
    """Create a stable section node id from a search result.

    Delegates to ``_id_from_result`` with the ``"section"`` prefix.
    """
    return _id_from_result(result, "section")