Coverage for src / qdrant_loader_mcp_server / search / enhanced / kg / builder.py: 90%

182 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 04:51 +0000

1""" 

2Knowledge Graph Builder. 

3 

4This module implements graph construction logic, building knowledge graphs 

5from document metadata and search results with intelligent relationship extraction. 

6""" 

7 

8from __future__ import annotations 

9 

10import hashlib 

11import json 

12import time 

13from collections import Counter, defaultdict 

14from typing import TYPE_CHECKING, Any 

15 

16if TYPE_CHECKING: 

17 from ...models import SearchResult 

18 from ...nlp.spacy_analyzer import SpaCyQueryAnalyzer 

19 

20from ....utils.logging import LoggingConfig 

21from .extractors import ( 

22 extract_concepts_from_result, 

23 extract_entities_from_result, 

24 extract_keywords_from_result, 

25 extract_topics_from_result, 

26) 

27from .models import GraphEdge, GraphNode, NodeType, RelationshipType 

28from .utils import SIMILARITY_EDGE_THRESHOLD, calculate_node_similarity 

29 

30logger = LoggingConfig.get_logger(__name__) 

31 

32 

class RecoverableBuildError(Exception):
    """Signals an expected, non-fatal parsing or validation problem hit while building the graph."""

35 

36 

class GraphBuilder:
    """Build knowledge graph from document metadata and search results."""

    def __init__(self, spacy_analyzer: SpaCyQueryAnalyzer | None = None):
        """Initialize the graph builder.

        Args:
            spacy_analyzer: Optional analyzer to reuse; when omitted a fresh
                ``SpaCyQueryAnalyzer`` is constructed via a runtime import to
                avoid a circular dependency at module load time.
        """
        if spacy_analyzer is not None:
            self.spacy_analyzer = spacy_analyzer
        else:
            # Deferred import breaks the module-level import cycle.
            from ...nlp.spacy_analyzer import SpaCyQueryAnalyzer

            self.spacy_analyzer = SpaCyQueryAnalyzer()
        logger.info("Initialized graph builder")

50 

51 def build_from_search_results( 

52 self, search_results: list[SearchResult] 

53 ) -> Any: # KnowledgeGraph - avoiding circular import 

54 """Build knowledge graph from search results metadata.""" 

55 # Import KnowledgeGraph at runtime to avoid circular import 

56 from ..knowledge_graph import KnowledgeGraph 

57 

58 start_time = time.time() 

59 graph = KnowledgeGraph() 

60 

61 try: 

62 # Step 1: Create nodes from search results 

63 document_nodes = self._create_document_nodes(search_results) 

64 for node in document_nodes: 

65 graph.add_node(node) 

66 

67 # Step 2: Create entity and topic nodes 

68 entity_nodes, topic_nodes = self._create_concept_nodes(search_results) 

69 for node in entity_nodes + topic_nodes: 

70 graph.add_node(node) 

71 

72 # Step 3: Create relationships 

73 edges = self._create_relationships(search_results, graph) 

74 for edge in edges: 

75 graph.add_edge(edge) 

76 

77 # Step 4: Calculate centrality scores 

78 graph.calculate_centrality_scores() 

79 

80 build_time = (time.time() - start_time) * 1000 

81 stats = graph.get_statistics() 

82 

83 logger.info( 

84 f"Built knowledge graph in {build_time:.2f}ms", 

85 nodes=stats["total_nodes"], 

86 edges=stats["total_edges"], 

87 components=stats["connected_components"], 

88 ) 

89 

90 return graph 

91 

92 except ( 

93 ValueError, 

94 KeyError, 

95 json.JSONDecodeError, 

96 IndexError, 

97 RecoverableBuildError, 

98 ) as exc: 

99 # Known/parsing/validation issues: log and return a clear recoverable indicator 

100 logger.exception( 

101 "Recoverable error while building knowledge graph", error=str(exc) 

102 ) 

103 return None 

104 except Exception as exc: 

105 # Unexpected/critical exceptions should propagate after logging for caller visibility 

106 logger.exception( 

107 "Unexpected error while building knowledge graph", error=str(exc) 

108 ) 

109 raise 

110 

111 def _create_document_nodes( 

112 self, search_results: list[SearchResult] 

113 ) -> list[GraphNode]: 

114 """Create document and section nodes from search results.""" 

115 

116 nodes = [] 

117 seen_documents = set() 

118 

119 for result in search_results: 

120 # Create document node 

121 doc_id = _doc_id_from_result(result) 

122 

123 if doc_id not in seen_documents: 

124 seen_documents.add(doc_id) 

125 

126 doc_node = GraphNode( 

127 id=doc_id, 

128 node_type=NodeType.DOCUMENT, 

129 title=result.source_title or f"Document from {result.source_type}", 

130 content=result.text[:500], # First 500 chars as summary 

131 metadata={ 

132 "source_type": result.source_type, 

133 "source_title": result.source_title, 

134 "url": result.source_url, 

135 "project_id": result.project_id, 

136 "collection_name": result.collection_name, 

137 }, 

138 entities=self._extract_entities(result), 

139 topics=self._extract_topics(result), 

140 concepts=self._extract_concepts(result), 

141 keywords=self._extract_keywords(result), 

142 ) 

143 nodes.append(doc_node) 

144 

145 # Create section node 

146 section_id = _section_id_from_result(result) 

147 # Build a safe title string for slicing 

148 _raw_title = result.section_title or result.breadcrumb_text or "Section" 

149 _safe_title = ( 

150 _raw_title if isinstance(_raw_title, str) else str(_raw_title or "") 

151 ) 

152 section_node = GraphNode( 

153 id=section_id, 

154 node_type=NodeType.SECTION, 

155 title=(_safe_title or "")[-50:], # Last 50 chars 

156 content=result.text, 

157 metadata={ 

158 "parent_document": doc_id, 

159 "breadcrumb": result.breadcrumb_text, 

160 "section_level": result.section_level or result.depth, 

161 "score": result.score, 

162 "section_type": result.section_type, 

163 }, 

164 entities=self._extract_entities(result), 

165 topics=self._extract_topics(result), 

166 concepts=self._extract_concepts(result), 

167 keywords=self._extract_keywords(result), 

168 ) 

169 nodes.append(section_node) 

170 

171 return nodes 

172 

173 def _create_concept_nodes( 

174 self, search_results: list[SearchResult] 

175 ) -> tuple[list[GraphNode], list[GraphNode]]: 

176 """Create entity and topic nodes from extracted metadata.""" 

177 

178 # Collect all entities and topics 

179 entity_counts = Counter() 

180 topic_counts = Counter() 

181 

182 for result in search_results: 

183 entities = self._extract_entities(result) 

184 topics = self._extract_topics(result) 

185 

186 for entity in entities: 

187 entity_counts[entity] += 1 

188 for topic in topics: 

189 topic_counts[topic] += 1 

190 

191 # Create nodes for frequent entities and topics 

192 entity_nodes = [] 

193 topic_nodes = [] 

194 

195 # Entities mentioned in at least 2 documents 

196 for entity, count in entity_counts.items(): 

197 if count >= 2: 

198 entity_node = GraphNode( 

199 id=_build_stable_id("entity", entity), 

200 node_type=NodeType.ENTITY, 

201 title=entity, 

202 metadata={"mention_count": count, "entity_type": "extracted"}, 

203 ) 

204 entity_nodes.append(entity_node) 

205 

206 # Topics mentioned in at least 2 documents 

207 for topic, count in topic_counts.items(): 

208 if count >= 2: 

209 topic_node = GraphNode( 

210 id=_build_stable_id("topic", topic), 

211 node_type=NodeType.TOPIC, 

212 title=topic, 

213 metadata={"mention_count": count, "topic_type": "extracted"}, 

214 ) 

215 topic_nodes.append(topic_node) 

216 

217 return entity_nodes, topic_nodes 

218 

219 def _create_relationships( 

220 self, 

221 search_results: list[SearchResult], 

222 graph: Any, # KnowledgeGraph - avoiding circular import 

223 ) -> list[GraphEdge]: 

224 """Create relationships between graph nodes.""" 

225 

226 edges = [] 

227 

228 # Document -> Section relationships 

229 for result in search_results: 

230 doc_id = _doc_id_from_result(result) 

231 section_id = _section_id_from_result(result) 

232 

233 if doc_id in graph.nodes and section_id in graph.nodes: 

234 edge = GraphEdge( 

235 source_id=doc_id, 

236 target_id=section_id, 

237 relationship_type=RelationshipType.CONTAINS, 

238 weight=1.0, 

239 confidence=1.0, 

240 evidence=["hierarchical_structure"], 

241 ) 

242 edges.append(edge) 

243 

244 # Entity relationships 

245 entity_edges = self._create_entity_relationships(search_results, graph) 

246 edges.extend(entity_edges) 

247 

248 # Topic relationships 

249 topic_edges = self._create_topic_relationships(search_results, graph) 

250 edges.extend(topic_edges) 

251 

252 # Semantic similarity relationships 

253 similarity_edges = self._create_similarity_relationships(graph) 

254 edges.extend(similarity_edges) 

255 

256 return edges 

257 

258 def _create_entity_relationships( 

259 self, 

260 search_results: list[SearchResult], 

261 graph: Any, # KnowledgeGraph - avoiding circular import 

262 ) -> list[GraphEdge]: 

263 """Create entity-related relationships.""" 

264 

265 edges = [] 

266 

267 # Document/Section mentions Entity 

268 for result in search_results: 

269 section_id = _section_id_from_result(result) 

270 entities = self._extract_entities(result) 

271 

272 for entity in entities: 

273 entity_nodes = graph.find_nodes_by_entity(entity) 

274 for entity_node in entity_nodes: 

275 if section_id in graph.nodes: 

276 edge = GraphEdge( 

277 source_id=section_id, 

278 target_id=entity_node.id, 

279 relationship_type=RelationshipType.MENTIONS, 

280 weight=0.7, 

281 confidence=0.8, 

282 evidence=[f"entity_extraction: {entity}"], 

283 ) 

284 edges.append(edge) 

285 

286 # Entity co-occurrence relationships 

287 co_occurrence_edges = self._create_entity_cooccurrence(search_results, graph) 

288 edges.extend(co_occurrence_edges) 

289 

290 return edges 

291 

292 def _create_topic_relationships( 

293 self, 

294 search_results: list[SearchResult], 

295 graph: Any, # KnowledgeGraph - avoiding circular import 

296 ) -> list[GraphEdge]: 

297 """Create topic-related relationships.""" 

298 

299 edges = [] 

300 

301 # Document/Section discusses Topic 

302 for result in search_results: 

303 section_id = _section_id_from_result(result) 

304 topics = self._extract_topics(result) 

305 

306 for topic in topics: 

307 topic_nodes = graph.find_nodes_by_topic(topic) 

308 for topic_node in topic_nodes: 

309 if section_id in graph.nodes: 

310 edge = GraphEdge( 

311 source_id=section_id, 

312 target_id=topic_node.id, 

313 relationship_type=RelationshipType.DISCUSSES, 

314 weight=0.6, 

315 confidence=0.7, 

316 evidence=[f"topic_extraction: {topic}"], 

317 ) 

318 edges.append(edge) 

319 

320 return edges 

321 

322 def _create_entity_cooccurrence( 

323 self, 

324 search_results: list[SearchResult], 

325 graph: Any, # KnowledgeGraph - avoiding circular import 

326 ) -> list[GraphEdge]: 

327 """Create entity co-occurrence relationships.""" 

328 

329 edges = [] 

330 cooccurrence_counts = defaultdict(int) 

331 

332 # Count entity co-occurrences 

333 for result in search_results: 

334 entities = self._extract_entities(result) 

335 for i, entity1 in enumerate(entities): 

336 for entity2 in entities[i + 1 :]: 

337 pair = tuple(sorted([entity1, entity2])) 

338 cooccurrence_counts[pair] += 1 

339 

340 # Create edges for significant co-occurrences 

341 for (entity1, entity2), count in cooccurrence_counts.items(): 

342 if count >= 2: # Appeared together at least twice 

343 entity1_nodes = graph.find_nodes_by_entity(entity1) 

344 entity2_nodes = graph.find_nodes_by_entity(entity2) 

345 

346 for node1 in entity1_nodes: 

347 for node2 in entity2_nodes: 

348 weight = min(1.0, count / 5.0) # Normalize to max 1.0 

349 edge = GraphEdge( 

350 source_id=node1.id, 

351 target_id=node2.id, 

352 relationship_type=RelationshipType.CO_OCCURS, 

353 weight=weight, 

354 confidence=weight, 

355 evidence=[f"co_occurrence_count: {count}"], 

356 ) 

357 edges.append(edge) 

358 

359 return edges 

360 

361 def _create_similarity_relationships( 

362 self, 

363 graph: Any, # KnowledgeGraph - avoiding circular import 

364 ) -> list[GraphEdge]: 

365 """Create semantic similarity relationships between nodes.""" 

366 

367 edges = [] 

368 

369 # Calculate similarity between section nodes 

370 section_nodes = graph.find_nodes_by_type(NodeType.SECTION) 

371 

372 for i, node1 in enumerate(section_nodes): 

373 for node2 in section_nodes[i + 1 :]: 

374 similarity = self._calculate_node_similarity(node1, node2) 

375 

376 if ( 

377 similarity > SIMILARITY_EDGE_THRESHOLD 

378 ): # Threshold for meaningful similarity 

379 edge = GraphEdge( 

380 source_id=node1.id, 

381 target_id=node2.id, 

382 relationship_type=RelationshipType.SIMILAR_TO, 

383 weight=similarity, 

384 confidence=similarity, 

385 evidence=[f"semantic_similarity: {similarity:.3f}"], 

386 ) 

387 edges.append(edge) 

388 

389 return edges 

390 

391 def _calculate_node_similarity(self, node1: GraphNode, node2: GraphNode) -> float: 

392 """Calculate similarity between two nodes.""" 

393 return calculate_node_similarity(node1, node2) 

394 

395 def _extract_entities(self, result: SearchResult) -> list[str]: 

396 return extract_entities_from_result(result) 

397 

398 def _extract_topics(self, result: SearchResult) -> list[str]: 

399 return extract_topics_from_result(result) 

400 

401 def _extract_concepts(self, result: SearchResult) -> list[str]: 

402 return extract_concepts_from_result(result) 

403 

404 def _extract_keywords(self, result: SearchResult) -> list[str]: 

405 return extract_keywords_from_result(result) 

406 

407 

408def _stable_hash(value: Any) -> str: 

409 """Compute a deterministic SHA-256 hex digest for a value using stable serialization.""" 

410 try: 

411 canonical = json.dumps( 

412 value, sort_keys=True, separators=(",", ":"), ensure_ascii=False 

413 ) 

414 except Exception: 

415 canonical = str(value) 

416 return hashlib.sha256(canonical.encode("utf-8")).hexdigest() 

417 

418 

def _build_stable_id(prefix: str, value: Any, digest_length: int = 16) -> str:
    """Return ``{prefix}_{digest}``, where digest is the stable hash of *value* truncated to *digest_length* hex chars."""
    return f"{prefix}_{_stable_hash(value)[:digest_length]}"

423 

424 

425def _id_from_result(result: Any, prefix: str) -> str: 

426 """Create a stable node id from a search result using a given prefix. 

427 

428 Shared logic for document and section identifiers: 

429 - Use result.source_url if present 

430 - Otherwise fallback to the first 100 characters of result.text 

431 - Include result.source_type (defaulting to "unknown") in the id 

432 The final id format is: {prefix}_{source_type}_{digest} 

433 where digest is the first 16 characters of the SHA-256 hexdigest. 

434 """ 

435 source_type = getattr(result, "source_type", "") or "unknown" 

436 preferred_identifier = getattr(result, "source_url", None) 

437 if not preferred_identifier: 

438 preferred_identifier = (getattr(result, "text", "") or "")[:100] 

439 if not isinstance(preferred_identifier, str): 

440 preferred_identifier = str(preferred_identifier) 

441 

442 digest = hashlib.sha256(preferred_identifier.encode("utf-8")).hexdigest()[:16] 

443 return f"{prefix}_{source_type}_{digest}" 

444 

445 

def _doc_id_from_result(result: Any) -> str:
    """Return the stable DOCUMENT node id for *result* (prefix ``doc``)."""
    return _id_from_result(result, "doc")

449 

450 

def _section_id_from_result(result: Any) -> str:
    """Return the stable SECTION node id for *result* (prefix ``section``)."""
    return _id_from_result(result, "section")