Coverage for src/qdrant_loader_mcp_server/search/enhanced/kg/builder.py: 90%
182 statements
coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1"""
2Knowledge Graph Builder.
4This module implements graph construction logic, building knowledge graphs
5from document metadata and search results with intelligent relationship extraction.
6"""
8from __future__ import annotations
10import hashlib
11import json
12import time
13from collections import Counter, defaultdict
14from typing import TYPE_CHECKING, Any
16if TYPE_CHECKING:
17 from ...models import SearchResult
18 from ...nlp.spacy_analyzer import SpaCyQueryAnalyzer
20from ....utils.logging import LoggingConfig
21from .extractors import (
22 extract_concepts_from_result,
23 extract_entities_from_result,
24 extract_keywords_from_result,
25 extract_topics_from_result,
26)
27from .models import (
28 GraphEdge,
29 GraphNode,
30 NodeType,
31 RelationshipType,
32)
33from .utils import (
34 SIMILARITY_EDGE_THRESHOLD,
35 calculate_node_similarity,
36)
38logger = LoggingConfig.get_logger(__name__)


class RecoverableBuildError(Exception):
    """Raised for expected parsing/validation issues during graph building."""


class GraphBuilder:
    """Build knowledge graph from document metadata and search results."""

    def __init__(self, spacy_analyzer: SpaCyQueryAnalyzer | None = None):
        """Initialize the graph builder."""
        # Import SpaCyQueryAnalyzer at runtime to avoid circular import
        if spacy_analyzer is None:
            from ...nlp.spacy_analyzer import SpaCyQueryAnalyzer

            self.spacy_analyzer = SpaCyQueryAnalyzer()
        else:
            self.spacy_analyzer = spacy_analyzer
        logger.info("Initialized graph builder")

    def build_from_search_results(
        self, search_results: list[SearchResult]
    ) -> Any:  # KnowledgeGraph - avoiding circular import
        """Build knowledge graph from search results metadata."""
        # Import KnowledgeGraph at runtime to avoid circular import
        from ..knowledge_graph import KnowledgeGraph

        start_time = time.time()
        graph = KnowledgeGraph()

        try:
            # Step 1: Create nodes from search results
            document_nodes = self._create_document_nodes(search_results)
            for node in document_nodes:
                graph.add_node(node)

            # Step 2: Create entity and topic nodes
            entity_nodes, topic_nodes = self._create_concept_nodes(search_results)
            for node in entity_nodes + topic_nodes:
                graph.add_node(node)

            # Step 3: Create relationships
            edges = self._create_relationships(search_results, graph)
            for edge in edges:
                graph.add_edge(edge)

            # Step 4: Calculate centrality scores
            graph.calculate_centrality_scores()

            build_time = (time.time() - start_time) * 1000
            stats = graph.get_statistics()

            logger.info(
                f"Built knowledge graph in {build_time:.2f}ms",
                nodes=stats["total_nodes"],
                edges=stats["total_edges"],
                components=stats["connected_components"],
            )

            return graph

        except (
            ValueError,
            KeyError,
            json.JSONDecodeError,
            IndexError,
            RecoverableBuildError,
        ) as exc:
            # Known parsing/validation issues: log and return None as the recoverable indicator
            logger.exception(
                "Recoverable error while building knowledge graph", error=str(exc)
            )
            return None
        except Exception as exc:
            # Unexpected/critical exceptions should propagate after logging for caller visibility
            logger.exception(
                "Unexpected error while building knowledge graph", error=str(exc)
            )
            raise

    def _create_document_nodes(
        self, search_results: list[SearchResult]
    ) -> list[GraphNode]:
        """Create document and section nodes from search results."""
        nodes = []
        seen_documents = set()

        for result in search_results:
            # Create document node
            doc_id = _doc_id_from_result(result)

            if doc_id not in seen_documents:
                seen_documents.add(doc_id)

                doc_node = GraphNode(
                    id=doc_id,
                    node_type=NodeType.DOCUMENT,
                    title=result.source_title or f"Document from {result.source_type}",
                    content=result.text[:500],  # First 500 chars as summary
                    metadata={
                        "source_type": result.source_type,
                        "source_title": result.source_title,
                        "url": result.source_url,
                        "project_id": result.project_id,
                        "collection_name": result.collection_name,
                    },
                    entities=self._extract_entities(result),
                    topics=self._extract_topics(result),
                    concepts=self._extract_concepts(result),
                    keywords=self._extract_keywords(result),
                )
                nodes.append(doc_node)

            # Create section node
            section_id = _section_id_from_result(result)
            # Build a safe title string for slicing
            _raw_title = result.section_title or result.breadcrumb_text or "Section"
            _safe_title = (
                _raw_title if isinstance(_raw_title, str) else str(_raw_title or "")
            )
            section_node = GraphNode(
                id=section_id,
                node_type=NodeType.SECTION,
                title=(_safe_title or "")[-50:],  # Last 50 chars
                content=result.text,
                metadata={
                    "parent_document": doc_id,
                    "breadcrumb": result.breadcrumb_text,
                    "section_level": result.section_level or result.depth,
                    "score": result.score,
                    "section_type": result.section_type,
                },
                entities=self._extract_entities(result),
                topics=self._extract_topics(result),
                concepts=self._extract_concepts(result),
                keywords=self._extract_keywords(result),
            )
            nodes.append(section_node)

        return nodes

    def _create_concept_nodes(
        self, search_results: list[SearchResult]
    ) -> tuple[list[GraphNode], list[GraphNode]]:
        """Create entity and topic nodes from extracted metadata."""
        # Collect all entities and topics
        entity_counts = Counter()
        topic_counts = Counter()

        for result in search_results:
            entities = self._extract_entities(result)
            topics = self._extract_topics(result)

            for entity in entities:
                entity_counts[entity] += 1
            for topic in topics:
                topic_counts[topic] += 1

        # Create nodes for frequent entities and topics
        entity_nodes = []
        topic_nodes = []

        # Entities mentioned in at least 2 documents
        for entity, count in entity_counts.items():
            if count >= 2:
                entity_node = GraphNode(
                    id=_build_stable_id("entity", entity),
                    node_type=NodeType.ENTITY,
                    title=entity,
                    metadata={"mention_count": count, "entity_type": "extracted"},
                )
                entity_nodes.append(entity_node)

        # Topics mentioned in at least 2 documents
        for topic, count in topic_counts.items():
            if count >= 2:
                topic_node = GraphNode(
                    id=_build_stable_id("topic", topic),
                    node_type=NodeType.TOPIC,
                    title=topic,
                    metadata={"mention_count": count, "topic_type": "extracted"},
                )
                topic_nodes.append(topic_node)

        return entity_nodes, topic_nodes

    def _create_relationships(
        self,
        search_results: list[SearchResult],
        graph: Any,  # KnowledgeGraph - avoiding circular import
    ) -> list[GraphEdge]:
        """Create relationships between graph nodes."""
        edges = []

        # Document -> Section relationships
        for result in search_results:
            doc_id = _doc_id_from_result(result)
            section_id = _section_id_from_result(result)

            if doc_id in graph.nodes and section_id in graph.nodes:
                edge = GraphEdge(
                    source_id=doc_id,
                    target_id=section_id,
                    relationship_type=RelationshipType.CONTAINS,
                    weight=1.0,
                    confidence=1.0,
                    evidence=["hierarchical_structure"],
                )
                edges.append(edge)

        # Entity relationships
        entity_edges = self._create_entity_relationships(search_results, graph)
        edges.extend(entity_edges)

        # Topic relationships
        topic_edges = self._create_topic_relationships(search_results, graph)
        edges.extend(topic_edges)

        # Semantic similarity relationships
        similarity_edges = self._create_similarity_relationships(graph)
        edges.extend(similarity_edges)

        return edges

    def _create_entity_relationships(
        self,
        search_results: list[SearchResult],
        graph: Any,  # KnowledgeGraph - avoiding circular import
    ) -> list[GraphEdge]:
        """Create entity-related relationships."""
        edges = []

        # Document/Section mentions Entity
        for result in search_results:
            section_id = _section_id_from_result(result)
            entities = self._extract_entities(result)

            for entity in entities:
                entity_nodes = graph.find_nodes_by_entity(entity)
                for entity_node in entity_nodes:
                    if section_id in graph.nodes:
                        edge = GraphEdge(
                            source_id=section_id,
                            target_id=entity_node.id,
                            relationship_type=RelationshipType.MENTIONS,
                            weight=0.7,
                            confidence=0.8,
                            evidence=[f"entity_extraction: {entity}"],
                        )
                        edges.append(edge)

        # Entity co-occurrence relationships
        co_occurrence_edges = self._create_entity_cooccurrence(search_results, graph)
        edges.extend(co_occurrence_edges)

        return edges

    def _create_topic_relationships(
        self,
        search_results: list[SearchResult],
        graph: Any,  # KnowledgeGraph - avoiding circular import
    ) -> list[GraphEdge]:
        """Create topic-related relationships."""
        edges = []

        # Document/Section discusses Topic
        for result in search_results:
            section_id = _section_id_from_result(result)
            topics = self._extract_topics(result)

            for topic in topics:
                topic_nodes = graph.find_nodes_by_topic(topic)
                for topic_node in topic_nodes:
                    if section_id in graph.nodes:
                        edge = GraphEdge(
                            source_id=section_id,
                            target_id=topic_node.id,
                            relationship_type=RelationshipType.DISCUSSES,
                            weight=0.6,
                            confidence=0.7,
                            evidence=[f"topic_extraction: {topic}"],
                        )
                        edges.append(edge)

        return edges

    def _create_entity_cooccurrence(
        self,
        search_results: list[SearchResult],
        graph: Any,  # KnowledgeGraph - avoiding circular import
    ) -> list[GraphEdge]:
        """Create entity co-occurrence relationships."""
        edges = []
        cooccurrence_counts = defaultdict(int)

        # Count entity co-occurrences
        for result in search_results:
            entities = self._extract_entities(result)
            for i, entity1 in enumerate(entities):
                for entity2 in entities[i + 1 :]:
                    pair = tuple(sorted([entity1, entity2]))
                    cooccurrence_counts[pair] += 1

        # Create edges for significant co-occurrences
        for (entity1, entity2), count in cooccurrence_counts.items():
            if count >= 2:  # Appeared together at least twice
                entity1_nodes = graph.find_nodes_by_entity(entity1)
                entity2_nodes = graph.find_nodes_by_entity(entity2)

                for node1 in entity1_nodes:
                    for node2 in entity2_nodes:
                        weight = min(1.0, count / 5.0)  # Normalize to max 1.0
                        edge = GraphEdge(
                            source_id=node1.id,
                            target_id=node2.id,
                            relationship_type=RelationshipType.CO_OCCURS,
                            weight=weight,
                            confidence=weight,
                            evidence=[f"co_occurrence_count: {count}"],
                        )
                        edges.append(edge)

        return edges

    def _create_similarity_relationships(
        self, graph: Any  # KnowledgeGraph - avoiding circular import
    ) -> list[GraphEdge]:
        """Create semantic similarity relationships between nodes."""
        edges = []

        # Calculate similarity between section nodes
        section_nodes = graph.find_nodes_by_type(NodeType.SECTION)

        for i, node1 in enumerate(section_nodes):
            for node2 in section_nodes[i + 1 :]:
                similarity = self._calculate_node_similarity(node1, node2)

                if (
                    similarity > SIMILARITY_EDGE_THRESHOLD
                ):  # Threshold for meaningful similarity
                    edge = GraphEdge(
                        source_id=node1.id,
                        target_id=node2.id,
                        relationship_type=RelationshipType.SIMILAR_TO,
                        weight=similarity,
                        confidence=similarity,
                        evidence=[f"semantic_similarity: {similarity:.3f}"],
                    )
                    edges.append(edge)

        return edges

    def _calculate_node_similarity(self, node1: GraphNode, node2: GraphNode) -> float:
        """Calculate similarity between two nodes."""
        return calculate_node_similarity(node1, node2)

    def _extract_entities(self, result: SearchResult) -> list[str]:
        return extract_entities_from_result(result)

    def _extract_topics(self, result: SearchResult) -> list[str]:
        return extract_topics_from_result(result)

    def _extract_concepts(self, result: SearchResult) -> list[str]:
        return extract_concepts_from_result(result)

    def _extract_keywords(self, result: SearchResult) -> list[str]:
        return extract_keywords_from_result(result)


def _stable_hash(value: Any) -> str:
    """Compute a deterministic SHA-256 hex digest for a value using stable serialization."""
    try:
        canonical = json.dumps(
            value, sort_keys=True, separators=(",", ":"), ensure_ascii=False
        )
    except Exception:
        canonical = str(value)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def _build_stable_id(prefix: str, value: Any, digest_length: int = 16) -> str:
    """Build a stable node id using a prefix and the truncated SHA-256 digest of the value."""
    digest = _stable_hash(value)[:digest_length]
    return f"{prefix}_{digest}"


def _id_from_result(result: Any, prefix: str) -> str:
    """Create a stable node id from a search result using a given prefix.

    Shared logic for document and section identifiers:
    - Use result.source_url if present
    - Otherwise fall back to the first 100 characters of result.text
    - Include result.source_type (defaulting to "unknown") in the id

    The final id format is: {prefix}_{source_type}_{digest}
    where digest is the first 16 characters of the SHA-256 hexdigest.
    """
    source_type = getattr(result, "source_type", "") or "unknown"
    preferred_identifier = getattr(result, "source_url", None)
    if not preferred_identifier:
        preferred_identifier = (getattr(result, "text", "") or "")[:100]
    if not isinstance(preferred_identifier, str):
        preferred_identifier = str(preferred_identifier)

    digest = hashlib.sha256(preferred_identifier.encode("utf-8")).hexdigest()[:16]
    return f"{prefix}_{source_type}_{digest}"
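
# Illustration (hypothetical values): for a result whose source_type is "git", the
# helpers below produce ids shaped like "doc_git_<16 hex chars>" and
# "section_git_<16 hex chars>", following the format described in _id_from_result.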


def _doc_id_from_result(result: Any) -> str:
    """Create a stable document node id from a search result."""
    return _id_from_result(result, "doc")


def _section_id_from_result(result: Any) -> str:
    """Create a stable section node id from a search result."""
    return _id_from_result(result, "section")
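

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only; `search_results` is assumed to be a list of
# SearchResult objects produced elsewhere in the search pipeline). A None return
# from build_from_search_results signals a recoverable build error.
#
#     builder = GraphBuilder()
#     graph = builder.build_from_search_results(search_results)
#     if graph is not None:
#         stats = graph.get_statistics()
#         logger.info(
#             "Graph ready",
#             nodes=stats["total_nodes"],
#             edges=stats["total_edges"],
#         )
# ---------------------------------------------------------------------------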