Coverage for src/qdrant_loader_mcp_server/search/enhanced/kg/builder.py: 90%

182 statements  

coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1""" 

2Knowledge Graph Builder. 

3 

4This module implements graph construction logic, building knowledge graphs 

5from document metadata and search results with intelligent relationship extraction. 

6""" 

7 

8from __future__ import annotations 

9 

10import hashlib 

11import json 

12import time 

13from collections import Counter, defaultdict 

14from typing import TYPE_CHECKING, Any 

15 

16if TYPE_CHECKING: 

17 from ...models import SearchResult 

18 from ...nlp.spacy_analyzer import SpaCyQueryAnalyzer 

19 

20from ....utils.logging import LoggingConfig 

21from .extractors import ( 

22 extract_concepts_from_result, 

23 extract_entities_from_result, 

24 extract_keywords_from_result, 

25 extract_topics_from_result, 

26) 

27from .models import ( 

28 GraphEdge, 

29 GraphNode, 

30 NodeType, 

31 RelationshipType, 

32) 

33from .utils import ( 

34 SIMILARITY_EDGE_THRESHOLD, 

35 calculate_node_similarity, 

36) 

37 

38logger = LoggingConfig.get_logger(__name__) 

39 

40 

41class RecoverableBuildError(Exception): 

42 """Raised for expected parsing/validation issues during graph building.""" 

43 

44 

45class GraphBuilder: 

46 """Build knowledge graph from document metadata and search results.""" 

47 

48 def __init__(self, spacy_analyzer: SpaCyQueryAnalyzer | None = None): 

49 """Initialize the graph builder.""" 

50 # Import SpaCyQueryAnalyzer at runtime to avoid circular import 

51 if spacy_analyzer is None: 

52 from ...nlp.spacy_analyzer import SpaCyQueryAnalyzer 

53 

54 self.spacy_analyzer = SpaCyQueryAnalyzer() 

55 else: 

56 self.spacy_analyzer = spacy_analyzer 

57 logger.info("Initialized graph builder") 

58 

59 def build_from_search_results( 

60 self, search_results: list[SearchResult] 

61 ) -> Any: # KnowledgeGraph - avoiding circular import 

62 """Build knowledge graph from search results metadata.""" 

63 # Import KnowledgeGraph at runtime to avoid circular import 

64 from ..knowledge_graph import KnowledgeGraph 

65 

66 start_time = time.time() 

67 graph = KnowledgeGraph() 

68 

69 try: 

70 # Step 1: Create nodes from search results 

71 document_nodes = self._create_document_nodes(search_results) 

72 for node in document_nodes: 

73 graph.add_node(node) 

74 

75 # Step 2: Create entity and topic nodes 

76 entity_nodes, topic_nodes = self._create_concept_nodes(search_results) 

77 for node in entity_nodes + topic_nodes: 

78 graph.add_node(node) 

79 

80 # Step 3: Create relationships 

81 edges = self._create_relationships(search_results, graph) 

82 for edge in edges: 

83 graph.add_edge(edge) 

84 

85 # Step 4: Calculate centrality scores 

86 graph.calculate_centrality_scores() 

87 

88 build_time = (time.time() - start_time) * 1000 

89 stats = graph.get_statistics() 

90 

91 logger.info( 

92 f"Built knowledge graph in {build_time:.2f}ms", 

93 nodes=stats["total_nodes"], 

94 edges=stats["total_edges"], 

95 components=stats["connected_components"], 

96 ) 

97 

98 return graph 

99 

100 except ( 

101 ValueError, 

102 KeyError, 

103 json.JSONDecodeError, 

104 IndexError, 

105 RecoverableBuildError, 

106 ) as exc: 

107 # Known/parsing/validation issues: log and return a clear recoverable indicator 

108 logger.exception( 

109 "Recoverable error while building knowledge graph", error=str(exc) 

110 ) 

111 return None 

112 except Exception as exc: 

113 # Unexpected/critical exceptions should propagate after logging for caller visibility 

114 logger.exception( 

115 "Unexpected error while building knowledge graph", error=str(exc) 

116 ) 

117 raise 

118 

119 def _create_document_nodes( 

120 self, search_results: list[SearchResult] 

121 ) -> list[GraphNode]: 

122 """Create document and section nodes from search results.""" 

123 

124 nodes = [] 

125 seen_documents = set() 

126 

127 for result in search_results: 

128 # Create document node 

129 doc_id = _doc_id_from_result(result) 

130 

131 if doc_id not in seen_documents: 

132 seen_documents.add(doc_id) 

133 

134 doc_node = GraphNode( 

135 id=doc_id, 

136 node_type=NodeType.DOCUMENT, 

137 title=result.source_title or f"Document from {result.source_type}", 

138 content=result.text[:500], # First 500 chars as summary 

139 metadata={ 

140 "source_type": result.source_type, 

141 "source_title": result.source_title, 

142 "url": result.source_url, 

143 "project_id": result.project_id, 

144 "collection_name": result.collection_name, 

145 }, 

146 entities=self._extract_entities(result), 

147 topics=self._extract_topics(result), 

148 concepts=self._extract_concepts(result), 

149 keywords=self._extract_keywords(result), 

150 ) 

151 nodes.append(doc_node) 

152 

153 # Create section node 

154 section_id = _section_id_from_result(result) 

155 # Build a safe title string for slicing 

156 _raw_title = result.section_title or result.breadcrumb_text or "Section" 

157 _safe_title = ( 

158 _raw_title if isinstance(_raw_title, str) else str(_raw_title or "") 

159 ) 

160 section_node = GraphNode( 

161 id=section_id, 

162 node_type=NodeType.SECTION, 

163 title=(_safe_title or "")[-50:], # Last 50 chars 

164 content=result.text, 

165 metadata={ 

166 "parent_document": doc_id, 

167 "breadcrumb": result.breadcrumb_text, 

168 "section_level": result.section_level or result.depth, 

169 "score": result.score, 

170 "section_type": result.section_type, 

171 }, 

172 entities=self._extract_entities(result), 

173 topics=self._extract_topics(result), 

174 concepts=self._extract_concepts(result), 

175 keywords=self._extract_keywords(result), 

176 ) 

177 nodes.append(section_node) 

178 

179 return nodes 

180 

181 def _create_concept_nodes( 

182 self, search_results: list[SearchResult] 

183 ) -> tuple[list[GraphNode], list[GraphNode]]: 

184 """Create entity and topic nodes from extracted metadata.""" 

185 

186 # Collect all entities and topics 

187 entity_counts = Counter() 

188 topic_counts = Counter() 

189 

190 for result in search_results: 

191 entities = self._extract_entities(result) 

192 topics = self._extract_topics(result) 

193 

194 for entity in entities: 

195 entity_counts[entity] += 1 

196 for topic in topics: 

197 topic_counts[topic] += 1 

198 

199 # Create nodes for frequent entities and topics 

200 entity_nodes = [] 

201 topic_nodes = [] 

202 

203 # Entities mentioned in at least 2 documents 

204 for entity, count in entity_counts.items(): 

205 if count >= 2: 

206 entity_node = GraphNode( 

207 id=_build_stable_id("entity", entity), 

208 node_type=NodeType.ENTITY, 

209 title=entity, 

210 metadata={"mention_count": count, "entity_type": "extracted"}, 

211 ) 

212 entity_nodes.append(entity_node) 

213 

214 # Topics mentioned in at least 2 documents 

215 for topic, count in topic_counts.items(): 

216 if count >= 2: 

217 topic_node = GraphNode( 

218 id=_build_stable_id("topic", topic), 

219 node_type=NodeType.TOPIC, 

220 title=topic, 

221 metadata={"mention_count": count, "topic_type": "extracted"}, 

222 ) 

223 topic_nodes.append(topic_node) 

224 

225 return entity_nodes, topic_nodes 

226 

227 def _create_relationships( 

228 self, 

229 search_results: list[SearchResult], 

230 graph: Any, # KnowledgeGraph - avoiding circular import 

231 ) -> list[GraphEdge]: 

232 """Create relationships between graph nodes.""" 

233 

234 edges = [] 

235 

236 # Document -> Section relationships 

237 for result in search_results: 

238 doc_id = _doc_id_from_result(result) 

239 section_id = _section_id_from_result(result) 

240 

241 if doc_id in graph.nodes and section_id in graph.nodes: 

242 edge = GraphEdge( 

243 source_id=doc_id, 

244 target_id=section_id, 

245 relationship_type=RelationshipType.CONTAINS, 

246 weight=1.0, 

247 confidence=1.0, 

248 evidence=["hierarchical_structure"], 

249 ) 

250 edges.append(edge) 

251 

252 # Entity relationships 

253 entity_edges = self._create_entity_relationships(search_results, graph) 

254 edges.extend(entity_edges) 

255 

256 # Topic relationships 

257 topic_edges = self._create_topic_relationships(search_results, graph) 

258 edges.extend(topic_edges) 

259 

260 # Semantic similarity relationships 

261 similarity_edges = self._create_similarity_relationships(graph) 

262 edges.extend(similarity_edges) 

263 

264 return edges 

265 

266 def _create_entity_relationships( 

267 self, 

268 search_results: list[SearchResult], 

269 graph: Any, # KnowledgeGraph - avoiding circular import 

270 ) -> list[GraphEdge]: 

271 """Create entity-related relationships.""" 

272 

273 edges = [] 

274 

275 # Document/Section mentions Entity 

276 for result in search_results: 

277 section_id = _section_id_from_result(result) 

278 entities = self._extract_entities(result) 

279 

280 for entity in entities: 

281 entity_nodes = graph.find_nodes_by_entity(entity) 

282 for entity_node in entity_nodes: 

283 if section_id in graph.nodes: 

284 edge = GraphEdge( 

285 source_id=section_id, 

286 target_id=entity_node.id, 

287 relationship_type=RelationshipType.MENTIONS, 

288 weight=0.7, 

289 confidence=0.8, 

290 evidence=[f"entity_extraction: {entity}"], 

291 ) 

292 edges.append(edge) 

293 

294 # Entity co-occurrence relationships 

295 co_occurrence_edges = self._create_entity_cooccurrence(search_results, graph) 

296 edges.extend(co_occurrence_edges) 

297 

298 return edges 

299 

300 def _create_topic_relationships( 

301 self, 

302 search_results: list[SearchResult], 

303 graph: Any, # KnowledgeGraph - avoiding circular import 

304 ) -> list[GraphEdge]: 

305 """Create topic-related relationships.""" 

306 

307 edges = [] 

308 

309 # Document/Section discusses Topic 

310 for result in search_results: 

311 section_id = _section_id_from_result(result) 

312 topics = self._extract_topics(result) 

313 

314 for topic in topics: 

315 topic_nodes = graph.find_nodes_by_topic(topic) 

316 for topic_node in topic_nodes: 

317 if section_id in graph.nodes: 

318 edge = GraphEdge( 

319 source_id=section_id, 

320 target_id=topic_node.id, 

321 relationship_type=RelationshipType.DISCUSSES, 

322 weight=0.6, 

323 confidence=0.7, 

324 evidence=[f"topic_extraction: {topic}"], 

325 ) 

326 edges.append(edge) 

327 

328 return edges 

329 

330 def _create_entity_cooccurrence( 

331 self, 

332 search_results: list[SearchResult], 

333 graph: Any, # KnowledgeGraph - avoiding circular import 

334 ) -> list[GraphEdge]: 

335 """Create entity co-occurrence relationships.""" 

336 

337 edges = [] 

338 cooccurrence_counts = defaultdict(int) 

339 

340 # Count entity co-occurrences 

341 for result in search_results: 

342 entities = self._extract_entities(result) 

343 for i, entity1 in enumerate(entities): 

344 for entity2 in entities[i + 1 :]: 

345 pair = tuple(sorted([entity1, entity2])) 

346 cooccurrence_counts[pair] += 1 

347 

348 # Create edges for significant co-occurrences 

349 for (entity1, entity2), count in cooccurrence_counts.items(): 

350 if count >= 2: # Appeared together at least twice 

351 entity1_nodes = graph.find_nodes_by_entity(entity1) 

352 entity2_nodes = graph.find_nodes_by_entity(entity2) 

353 

354 for node1 in entity1_nodes: 

355 for node2 in entity2_nodes: 

356 weight = min(1.0, count / 5.0) # Normalize to max 1.0 

357 edge = GraphEdge( 

358 source_id=node1.id, 

359 target_id=node2.id, 

360 relationship_type=RelationshipType.CO_OCCURS, 

361 weight=weight, 

362 confidence=weight, 

363 evidence=[f"co_occurrence_count: {count}"], 

364 ) 

365 edges.append(edge) 

366 

367 return edges 

368 

369 def _create_similarity_relationships( 

370 self, graph: Any # KnowledgeGraph - avoiding circular import 

371 ) -> list[GraphEdge]: 

372 """Create semantic similarity relationships between nodes.""" 

373 

374 edges = [] 

375 

376 # Calculate similarity between section nodes 

377 section_nodes = graph.find_nodes_by_type(NodeType.SECTION) 

378 

379 for i, node1 in enumerate(section_nodes): 

380 for node2 in section_nodes[i + 1 :]: 

381 similarity = self._calculate_node_similarity(node1, node2) 

382 

383 if ( 

384 similarity > SIMILARITY_EDGE_THRESHOLD 

385 ): # Threshold for meaningful similarity 

386 edge = GraphEdge( 

387 source_id=node1.id, 

388 target_id=node2.id, 

389 relationship_type=RelationshipType.SIMILAR_TO, 

390 weight=similarity, 

391 confidence=similarity, 

392 evidence=[f"semantic_similarity: {similarity:.3f}"], 

393 ) 

394 edges.append(edge) 

395 

396 return edges 

397 

398 def _calculate_node_similarity(self, node1: GraphNode, node2: GraphNode) -> float: 

399 """Calculate similarity between two nodes.""" 

400 return calculate_node_similarity(node1, node2) 

401 

402 def _extract_entities(self, result: SearchResult) -> list[str]: 

403 return extract_entities_from_result(result) 

404 

405 def _extract_topics(self, result: SearchResult) -> list[str]: 

406 return extract_topics_from_result(result) 

407 

408 def _extract_concepts(self, result: SearchResult) -> list[str]: 

409 return extract_concepts_from_result(result) 

410 

411 def _extract_keywords(self, result: SearchResult) -> list[str]: 

412 return extract_keywords_from_result(result) 

413 

414 

415def _stable_hash(value: Any) -> str: 

416 """Compute a deterministic SHA-256 hex digest for a value using stable serialization.""" 

417 try: 

418 canonical = json.dumps( 

419 value, sort_keys=True, separators=(",", ":"), ensure_ascii=False 

420 ) 

421 except Exception: 

422 canonical = str(value) 

423 return hashlib.sha256(canonical.encode("utf-8")).hexdigest() 

424 

425 

426def _build_stable_id(prefix: str, value: Any, digest_length: int = 16) -> str: 

427 """Build a stable node id using a prefix and the truncated SHA-256 digest of the value.""" 

428 digest = _stable_hash(value)[:digest_length] 

429 return f"{prefix}_{digest}" 

430 

431 

432def _id_from_result(result: Any, prefix: str) -> str: 

433 """Create a stable node id from a search result using a given prefix. 

434 

435 Shared logic for document and section identifiers: 

436 - Use result.source_url if present 

437 - Otherwise fallback to the first 100 characters of result.text 

438 - Include result.source_type (defaulting to "unknown") in the id 

439 The final id format is: {prefix}_{source_type}_{digest} 

440 where digest is the first 16 characters of the SHA-256 hexdigest. 

441 """ 

442 source_type = getattr(result, "source_type", "") or "unknown" 

443 preferred_identifier = getattr(result, "source_url", None) 

444 if not preferred_identifier: 

445 preferred_identifier = (getattr(result, "text", "") or "")[:100] 

446 if not isinstance(preferred_identifier, str): 

447 preferred_identifier = str(preferred_identifier) 

448 

449 digest = hashlib.sha256(preferred_identifier.encode("utf-8")).hexdigest()[:16] 

450 return f"{prefix}_{source_type}_{digest}" 

def _doc_id_from_result(result: Any) -> str:
    """Create a stable document node id from a search result."""
    return _id_from_result(result, "doc")


def _section_id_from_result(result: Any) -> str:
    """Create a stable section node id from a search result."""
    return _id_from_result(result, "section")
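
A minimal usage sketch follows (assumptions: the package is importable as qdrant_loader_mcp_server, and search_results is a list of SearchResult objects produced by an earlier search step; neither is shown in the listing above):

from qdrant_loader_mcp_server.search.enhanced.kg.builder import GraphBuilder


def build_graph(search_results):
    # GraphBuilder constructs its own SpaCyQueryAnalyzer when none is passed in.
    builder = GraphBuilder()
    graph = builder.build_from_search_results(search_results)
    if graph is None:
        # Recoverable parsing/validation errors are logged and reported as None.
        return None
    stats = graph.get_statistics()
    print(stats["total_nodes"], stats["total_edges"], stats["connected_components"])
    return graph

Unexpected errors are logged and re-raised by build_from_search_results, so callers that must not fail should wrap the call in their own try/except.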