Coverage for src/qdrant_loader_mcp_server/search/enhanced/knowledge_graph.py: 92%

532 statements  

coverage.py v7.10.0, created at 2025-07-25 11:38 +0000

"""Knowledge Graph Construction for Advanced Search Intelligence.

This module implements Phase 2.1 of the search enhancement roadmap:
- Multi-node graph construction from document metadata
- Relationship extraction and strength scoring
- Graph traversal algorithms for multi-hop search
- Integration with spaCy-powered Phase 1.0 components
"""

import logging
import time
import networkx as nx
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from collections import defaultdict, Counter
import json
import math

from ...utils.logging import LoggingConfig
from ..nlp.spacy_analyzer import SpaCyQueryAnalyzer, QueryAnalysis
from ..models import SearchResult

logger = LoggingConfig.get_logger(__name__)


class NodeType(Enum):
    """Types of nodes in the knowledge graph."""
    DOCUMENT = "document"
    SECTION = "section"
    ENTITY = "entity"
    TOPIC = "topic"
    CONCEPT = "concept"


class RelationshipType(Enum):
    """Types of relationships between graph nodes."""
    # Hierarchical relationships
    CONTAINS = "contains"  # document contains section, section contains subsection
    PART_OF = "part_of"  # inverse of contains
    SIBLING = "sibling"  # sections at same level

    # Content relationships
    MENTIONS = "mentions"  # document/section mentions entity
    DISCUSSES = "discusses"  # document/section discusses topic
    RELATES_TO = "relates_to"  # generic semantic relationship
    SIMILAR_TO = "similar_to"  # semantic similarity

    # Cross-document relationships
    REFERENCES = "references"  # explicit cross-reference
    CITES = "cites"  # citation relationship
    BUILDS_ON = "builds_on"  # conceptual building
    CONTRADICTS = "contradicts"  # conflicting information

    # Entity relationships
    CO_OCCURS = "co_occurs"  # entities appear together
    CATEGORIZED_AS = "categorized_as"  # entity belongs to topic/concept


@dataclass
class GraphNode:
    """Node in the knowledge graph."""

    id: str
    node_type: NodeType
    title: str
    content: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    # Graph metrics (calculated)
    centrality_score: float = 0.0
    authority_score: float = 0.0
    hub_score: float = 0.0

    # Content analysis
    entities: List[str] = field(default_factory=list)
    topics: List[str] = field(default_factory=list)
    concepts: List[str] = field(default_factory=list)
    keywords: List[str] = field(default_factory=list)

    def __post_init__(self):
        """Initialize derived properties."""
        if not self.id:
            self.id = f"{self.node_type.value}_{hash(self.title)}"


@dataclass
class GraphEdge:
    """Edge in the knowledge graph."""

    source_id: str
    target_id: str
    relationship_type: RelationshipType
    weight: float = 1.0
    metadata: Dict[str, Any] = field(default_factory=dict)

    # Evidence for the relationship
    evidence: List[str] = field(default_factory=list)
    confidence: float = 1.0

    def __post_init__(self):
        """Normalize weight and confidence."""
        self.weight = max(0.0, min(1.0, self.weight))
        self.confidence = max(0.0, min(1.0, self.confidence))

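# Example (illustrative): __post_init__ clamps out-of-range values, so
# GraphEdge("a", "b", RelationshipType.RELATES_TO, weight=1.7, confidence=-0.2)
# ends up with weight == 1.0 and confidence == 0.0.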

class TraversalStrategy(Enum):
    """Graph traversal strategies for different search goals."""
    BREADTH_FIRST = "breadth_first"  # Explore broadly
    DEPTH_FIRST = "depth_first"  # Explore deeply
    WEIGHTED = "weighted"  # Follow strongest relationships
    CENTRALITY = "centrality"  # Prefer high-centrality nodes
    SEMANTIC = "semantic"  # Follow semantic similarity
    HIERARCHICAL = "hierarchical"  # Follow document structure


@dataclass
class TraversalResult:
    """Result of graph traversal operation."""

    path: List[str]  # Node IDs in traversal order
    nodes: List[GraphNode]  # Actual node objects
    relationships: List[GraphEdge]  # Edges traversed
    total_weight: float  # Sum of edge weights
    semantic_score: float  # Semantic relevance to query
    hop_count: int  # Number of hops from start
    reasoning_path: List[str]  # Human-readable reasoning


class KnowledgeGraph:
    """Core knowledge graph implementation using NetworkX."""

    def __init__(self):
        """Initialize the knowledge graph."""
        self.graph = nx.MultiDiGraph()  # Allow multiple edges between nodes
        self.nodes: Dict[str, GraphNode] = {}
        self.edges: Dict[Tuple[str, str, str], GraphEdge] = {}  # (source, target, relationship)
        self.node_type_index: Dict[NodeType, Set[str]] = defaultdict(set)
        self.entity_index: Dict[str, Set[str]] = defaultdict(set)  # entity -> node_ids
        self.topic_index: Dict[str, Set[str]] = defaultdict(set)  # topic -> node_ids

        logger.info("Initialized empty knowledge graph")

    def add_node(self, node: GraphNode) -> bool:
        """Add a node to the graph."""
        try:
            if node.id in self.nodes:
                logger.debug(f"Node {node.id} already exists, updating")

            self.nodes[node.id] = node
            self.graph.add_node(node.id, **node.metadata)
            self.node_type_index[node.node_type].add(node.id)

            # Index entities and topics for fast lookup
            for entity in node.entities:
                self.entity_index[entity.lower()].add(node.id)
            for topic in node.topics:
                self.topic_index[topic.lower()].add(node.id)

            logger.debug(f"Added {node.node_type.value} node: {node.id}")
            return True

        except Exception as e:
            logger.error(f"Failed to add node {node.id}: {e}")
            return False

    def add_edge(self, edge: GraphEdge) -> bool:
        """Add an edge to the graph."""
        try:
            if edge.source_id not in self.nodes or edge.target_id not in self.nodes:
                logger.warning(f"Edge {edge.source_id} -> {edge.target_id}: missing nodes")
                return False

            edge_key = (edge.source_id, edge.target_id, edge.relationship_type.value)
            self.edges[edge_key] = edge

            self.graph.add_edge(
                edge.source_id,
                edge.target_id,
                key=edge.relationship_type.value,
                weight=edge.weight,
                relationship=edge.relationship_type.value,
                confidence=edge.confidence,
                **edge.metadata
            )

            logger.debug(f"Added edge: {edge.source_id} --{edge.relationship_type.value}--> {edge.target_id}")
            return True

        except Exception as e:
            logger.error(f"Failed to add edge {edge.source_id} -> {edge.target_id}: {e}")
            return False

    def find_nodes_by_type(self, node_type: NodeType) -> List[GraphNode]:
        """Find all nodes of a specific type."""
        return [self.nodes[node_id] for node_id in self.node_type_index[node_type]]

    def find_nodes_by_entity(self, entity: str) -> List[GraphNode]:
        """Find all nodes containing a specific entity."""
        node_ids = self.entity_index.get(entity.lower(), set())
        return [self.nodes[node_id] for node_id in node_ids]

    def find_nodes_by_topic(self, topic: str) -> List[GraphNode]:
        """Find all nodes discussing a specific topic."""
        node_ids = self.topic_index.get(topic.lower(), set())
        return [self.nodes[node_id] for node_id in node_ids]

    def calculate_centrality_scores(self):
        """Calculate centrality scores for all nodes."""
        try:
            if len(self.graph.nodes) == 0:
                return

            # Calculate different centrality metrics
            degree_centrality = nx.degree_centrality(self.graph)
            betweenness_centrality = nx.betweenness_centrality(self.graph)

            # For directed graphs, calculate hub and authority scores
            try:
                hub_scores, authority_scores = nx.hits(self.graph, max_iter=100)
            except nx.PowerIterationFailedConvergence:
                logger.warning("HITS algorithm failed to converge, using default scores")
                hub_scores = {node: 0.0 for node in self.graph.nodes}
                authority_scores = {node: 0.0 for node in self.graph.nodes}

            # Update node objects with calculated scores
            for node_id, node in self.nodes.items():
                node.centrality_score = (
                    degree_centrality.get(node_id, 0.0) * 0.4 +
                    betweenness_centrality.get(node_id, 0.0) * 0.6
                )
                node.hub_score = hub_scores.get(node_id, 0.0)
                node.authority_score = authority_scores.get(node_id, 0.0)

            logger.info(f"Calculated centrality scores for {len(self.nodes)} nodes")

        except Exception as e:
            logger.error(f"Failed to calculate centrality scores: {e}")

    def get_neighbors(self, node_id: str, relationship_types: Optional[List[RelationshipType]] = None) -> List[Tuple[str, GraphEdge]]:
        """Get neighboring nodes with their connecting edges."""
        neighbors = []

        if node_id not in self.graph:
            return neighbors

        for neighbor_id in self.graph.neighbors(node_id):
            # Get all edges between these nodes
            edge_data = self.graph.get_edge_data(node_id, neighbor_id)
            if edge_data:
                for key, data in edge_data.items():
                    relationship = RelationshipType(data['relationship'])
                    if relationship_types is None or relationship in relationship_types:
                        edge_key = (node_id, neighbor_id, relationship.value)
                        if edge_key in self.edges:
                            neighbors.append((neighbor_id, self.edges[edge_key]))

        return neighbors

    def get_statistics(self) -> Dict[str, Any]:
        """Get graph statistics."""
        stats = {
            "total_nodes": len(self.nodes),
            "total_edges": len(self.edges),
            "node_types": {node_type.value: len(nodes) for node_type, nodes in self.node_type_index.items()},
            "relationship_types": {},
            "connected_components": nx.number_weakly_connected_components(self.graph),
            "avg_degree": sum(dict(self.graph.degree()).values()) / max(len(self.graph.nodes), 1)
        }

        # Count relationship types
        for edge in self.edges.values():
            rel_type = edge.relationship_type.value
            stats["relationship_types"][rel_type] = stats["relationship_types"].get(rel_type, 0) + 1

        return stats

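# Usage sketch (illustrative only; the ids, titles and values below are
# placeholders, everything else is defined above): build a tiny graph by hand
# and read it back through the public helpers.
def _example_minimal_graph() -> Dict[str, Any]:
    graph = KnowledgeGraph()
    graph.add_node(GraphNode(id="doc_demo", node_type=NodeType.DOCUMENT, title="Demo document"))
    graph.add_node(
        GraphNode(
            id="section_demo",
            node_type=NodeType.SECTION,
            title="Demo section",
            entities=["qdrant"],
            topics=["search"],
        )
    )
    graph.add_edge(
        GraphEdge(
            source_id="doc_demo",
            target_id="section_demo",
            relationship_type=RelationshipType.CONTAINS,
            weight=1.0,
        )
    )
    # get_neighbors yields (neighbor_id, GraphEdge) pairs for outgoing edges.
    assert graph.get_neighbors("doc_demo")[0][0] == "section_demo"
    return graph.get_statistics()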

class GraphTraverser:
    """Advanced graph traversal for multi-hop search and content discovery."""

    def __init__(self, knowledge_graph: KnowledgeGraph, spacy_analyzer: Optional[SpaCyQueryAnalyzer] = None):
        """Initialize the graph traverser."""
        self.graph = knowledge_graph
        self.spacy_analyzer = spacy_analyzer or SpaCyQueryAnalyzer()
        logger.debug("Initialized graph traverser")

    def traverse(
        self,
        start_nodes: List[str],
        query_analysis: Optional[QueryAnalysis] = None,
        strategy: TraversalStrategy = TraversalStrategy.WEIGHTED,
        max_hops: int = 3,
        max_results: int = 20,
        min_weight: float = 0.1
    ) -> List[TraversalResult]:
        """Traverse the graph to find related content."""

        results = []
        visited = set()

        for start_node_id in start_nodes:
            if start_node_id not in self.graph.nodes:
                continue

            # Perform traversal based on strategy
            if strategy == TraversalStrategy.BREADTH_FIRST:
                node_results = self._breadth_first_traversal(
                    start_node_id, query_analysis, max_hops, max_results, min_weight, visited
                )
            elif strategy == TraversalStrategy.WEIGHTED:
                node_results = self._weighted_traversal(
                    start_node_id, query_analysis, max_hops, max_results, min_weight, visited
                )
            elif strategy == TraversalStrategy.CENTRALITY:
                node_results = self._centrality_traversal(
                    start_node_id, query_analysis, max_hops, max_results, min_weight, visited
                )
            elif strategy == TraversalStrategy.SEMANTIC:
                node_results = self._semantic_traversal(
                    start_node_id, query_analysis, max_hops, max_results, min_weight, visited
                )
            else:
                node_results = self._breadth_first_traversal(
                    start_node_id, query_analysis, max_hops, max_results, min_weight, visited
                )

            results.extend(node_results)

        # Sort by semantic score and total weight
        results.sort(key=lambda r: (r.semantic_score, r.total_weight), reverse=True)
        return results[:max_results]

    def _breadth_first_traversal(
        self,
        start_node_id: str,
        query_analysis: Optional[QueryAnalysis],
        max_hops: int,
        max_results: int,
        min_weight: float,
        visited: Set[str]
    ) -> List[TraversalResult]:
        """Breadth-first traversal implementation."""

        results = []
        queue = [(start_node_id, [], [], 0.0, 0)]  # (node_id, path, edges, weight, hops)
        local_visited = set()

        while queue and len(results) < max_results:
            node_id, path, edges, total_weight, hops = queue.pop(0)

            if node_id in local_visited or hops >= max_hops:
                continue

            local_visited.add(node_id)

            # Create traversal result
            if node_id != start_node_id:  # Don't include the starting node
                semantic_score = self._calculate_semantic_score(node_id, query_analysis)
                reasoning_path = self._build_reasoning_path(path, edges)

                result = TraversalResult(
                    path=path + [node_id],
                    nodes=[self.graph.nodes[nid] for nid in path + [node_id]],
                    relationships=edges,
                    total_weight=total_weight,
                    semantic_score=semantic_score,
                    hop_count=hops,
                    reasoning_path=reasoning_path
                )
                results.append(result)

            # Add neighbors to queue
            neighbors = self.graph.get_neighbors(node_id)
            for neighbor_id, edge in neighbors:
                if neighbor_id not in local_visited and edge.weight >= min_weight:
                    queue.append((
                        neighbor_id,
                        path + [node_id],
                        edges + [edge],
                        total_weight + edge.weight,
                        hops + 1
                    ))

        return results

    def _weighted_traversal(
        self,
        start_node_id: str,
        query_analysis: Optional[QueryAnalysis],
        max_hops: int,
        max_results: int,
        min_weight: float,
        visited: Set[str]
    ) -> List[TraversalResult]:
        """Weighted traversal prioritizing strong relationships."""

        results = []
        # Priority queue: (negative_weight, node_id, path, edges, weight, hops)
        import heapq
        heap = [(-1.0, start_node_id, [], [], 0.0, 0)]
        local_visited = set()

        while heap and len(results) < max_results:
            neg_weight, node_id, path, edges, total_weight, hops = heapq.heappop(heap)

            if node_id in local_visited or hops >= max_hops:
                continue

            local_visited.add(node_id)

            # Create traversal result
            if node_id != start_node_id:
                semantic_score = self._calculate_semantic_score(node_id, query_analysis)
                reasoning_path = self._build_reasoning_path(path, edges)

                result = TraversalResult(
                    path=path + [node_id],
                    nodes=[self.graph.nodes[nid] for nid in path + [node_id]],
                    relationships=edges,
                    total_weight=total_weight,
                    semantic_score=semantic_score,
                    hop_count=hops,
                    reasoning_path=reasoning_path
                )
                results.append(result)

            # Add neighbors to heap
            neighbors = self.graph.get_neighbors(node_id)
            for neighbor_id, edge in neighbors:
                if neighbor_id not in local_visited and edge.weight >= min_weight:
                    new_weight = total_weight + edge.weight
                    heapq.heappush(heap, (
                        -new_weight,  # Negative for max-heap behavior
                        neighbor_id,
                        path + [node_id],
                        edges + [edge],
                        new_weight,
                        hops + 1
                    ))

        return results

    def _centrality_traversal(
        self,
        start_node_id: str,
        query_analysis: Optional[QueryAnalysis],
        max_hops: int,
        max_results: int,
        min_weight: float,
        visited: Set[str]
    ) -> List[TraversalResult]:
        """Traversal prioritizing high-centrality nodes."""

        results = []
        import heapq
        # Priority queue: (negative_centrality, node_id, path, edges, weight, hops)
        start_centrality = self.graph.nodes[start_node_id].centrality_score
        heap = [(-start_centrality, start_node_id, [], [], 0.0, 0)]
        local_visited = set()

        while heap and len(results) < max_results:
            neg_centrality, node_id, path, edges, total_weight, hops = heapq.heappop(heap)

            if node_id in local_visited or hops >= max_hops:
                continue

            local_visited.add(node_id)

            # Create traversal result
            if node_id != start_node_id:
                semantic_score = self._calculate_semantic_score(node_id, query_analysis)
                reasoning_path = self._build_reasoning_path(path, edges)

                result = TraversalResult(
                    path=path + [node_id],
                    nodes=[self.graph.nodes[nid] for nid in path + [node_id]],
                    relationships=edges,
                    total_weight=total_weight,
                    semantic_score=semantic_score,
                    hop_count=hops,
                    reasoning_path=reasoning_path
                )
                results.append(result)

            # Add neighbors to heap
            neighbors = self.graph.get_neighbors(node_id)
            for neighbor_id, edge in neighbors:
                if neighbor_id not in local_visited and edge.weight >= min_weight:
                    neighbor_centrality = self.graph.nodes[neighbor_id].centrality_score
                    heapq.heappush(heap, (
                        -neighbor_centrality,
                        neighbor_id,
                        path + [node_id],
                        edges + [edge],
                        total_weight + edge.weight,
                        hops + 1
                    ))

        return results

    def _semantic_traversal(
        self,
        start_node_id: str,
        query_analysis: Optional[QueryAnalysis],
        max_hops: int,
        max_results: int,
        min_weight: float,
        visited: Set[str]
    ) -> List[TraversalResult]:
        """Traversal prioritizing semantic similarity to query."""

        if not query_analysis:
            return self._breadth_first_traversal(start_node_id, query_analysis, max_hops, max_results, min_weight, visited)

        results = []
        import heapq
        # Priority queue: (negative_semantic_score, node_id, path, edges, weight, hops)
        start_score = self._calculate_semantic_score(start_node_id, query_analysis)
        heap = [(-start_score, start_node_id, [], [], 0.0, 0)]
        local_visited = set()

        while heap and len(results) < max_results:
            neg_score, node_id, path, edges, total_weight, hops = heapq.heappop(heap)

            if node_id in local_visited or hops >= max_hops:
                continue

            local_visited.add(node_id)

            # Create traversal result
            if node_id != start_node_id:
                semantic_score = -neg_score  # Convert back from negative
                reasoning_path = self._build_reasoning_path(path, edges)

                result = TraversalResult(
                    path=path + [node_id],
                    nodes=[self.graph.nodes[nid] for nid in path + [node_id]],
                    relationships=edges,
                    total_weight=total_weight,
                    semantic_score=semantic_score,
                    hop_count=hops,
                    reasoning_path=reasoning_path
                )
                results.append(result)

            # Add neighbors to heap
            neighbors = self.graph.get_neighbors(node_id)
            for neighbor_id, edge in neighbors:
                if neighbor_id not in local_visited and edge.weight >= min_weight:
                    neighbor_score = self._calculate_semantic_score(neighbor_id, query_analysis)
                    heapq.heappush(heap, (
                        -neighbor_score,
                        neighbor_id,
                        path + [node_id],
                        edges + [edge],
                        total_weight + edge.weight,
                        hops + 1
                    ))

        return results

    def _calculate_semantic_score(self, node_id: str, query_analysis: Optional[QueryAnalysis]) -> float:
        """Calculate semantic similarity between node and query."""
        if not query_analysis:
            return 0.0

        node = self.graph.nodes[node_id]

        # Calculate similarity based on entities, topics, and keywords
        entity_similarity = self._calculate_list_similarity(
            query_analysis.entities, [(e, "") for e in node.entities]
        )

        topic_similarity = self._calculate_list_similarity(
            [(t, "") for t in query_analysis.main_concepts], [(t, "") for t in node.topics]
        )

        keyword_similarity = self._calculate_list_similarity(
            [(k, "") for k in query_analysis.semantic_keywords], [(k, "") for k in node.keywords]
        )

        # Weighted combination
        total_score = (
            entity_similarity * 0.4 +
            topic_similarity * 0.3 +
            keyword_similarity * 0.3
        )

        return total_score

    def _calculate_list_similarity(self, list1: List[Tuple[str, str]], list2: List[Tuple[str, str]]) -> float:
        """Calculate similarity between two lists of items."""
        if not list1 or not list2:
            return 0.0

        set1 = set(item[0].lower() for item in list1)
        set2 = set(item[0].lower() for item in list2)

        intersection = len(set1.intersection(set2))
        union = len(set1.union(set2))

        return intersection / max(union, 1)

    def _build_reasoning_path(self, path: List[str], edges: List[GraphEdge]) -> List[str]:
        """Build human-readable reasoning path."""
        reasoning = []

        for i, edge in enumerate(edges):
            source_node = self.graph.nodes[edge.source_id]
            target_node = self.graph.nodes[edge.target_id]

            reasoning.append(
                f"{source_node.title} --{edge.relationship_type.value}--> {target_node.title} "
                f"(weight: {edge.weight:.2f})"
            )

        return reasoning

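# Usage sketch (illustrative): run a weighted traversal from a known node id.
# Without a QueryAnalysis the semantic score is 0.0 for every node, so the
# final ordering is driven purely by accumulated edge weights.
def _example_weighted_traversal(graph: KnowledgeGraph, start_id: str) -> List[TraversalResult]:
    traverser = GraphTraverser(graph)
    return traverser.traverse(
        start_nodes=[start_id],
        query_analysis=None,
        strategy=TraversalStrategy.WEIGHTED,
        max_hops=2,
        max_results=10,
        min_weight=0.2,
    )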

class GraphBuilder:
    """Build knowledge graph from document metadata and search results."""

    def __init__(self, spacy_analyzer: Optional[SpaCyQueryAnalyzer] = None):
        """Initialize the graph builder."""
        self.spacy_analyzer = spacy_analyzer or SpaCyQueryAnalyzer()
        logger.info("Initialized graph builder")

    def build_from_search_results(self, search_results: List[SearchResult]) -> KnowledgeGraph:
        """Build knowledge graph from search results metadata."""

        start_time = time.time()
        graph = KnowledgeGraph()

        try:
            # Step 1: Create nodes from search results
            document_nodes = self._create_document_nodes(search_results)
            for node in document_nodes:
                graph.add_node(node)

            # Step 2: Create entity and topic nodes
            entity_nodes, topic_nodes = self._create_concept_nodes(search_results)
            for node in entity_nodes + topic_nodes:
                graph.add_node(node)

            # Step 3: Create relationships
            edges = self._create_relationships(search_results, graph)
            for edge in edges:
                graph.add_edge(edge)

            # Step 4: Calculate centrality scores
            graph.calculate_centrality_scores()

            build_time = (time.time() - start_time) * 1000
            stats = graph.get_statistics()

            logger.info(
                f"Built knowledge graph in {build_time:.2f}ms",
                nodes=stats["total_nodes"],
                edges=stats["total_edges"],
                components=stats["connected_components"]
            )

            return graph

        except Exception as e:
            logger.error(f"Failed to build knowledge graph: {e}")
            return graph

    def _create_document_nodes(self, search_results: List[SearchResult]) -> List[GraphNode]:
        """Create document and section nodes from search results."""

        nodes = []
        seen_documents = set()

        for result in search_results:
            # Create document node
            doc_id = f"doc_{result.source_type}_{hash(result.source_url or result.text[:100])}"

            if doc_id not in seen_documents:
                seen_documents.add(doc_id)

                doc_node = GraphNode(
                    id=doc_id,
                    node_type=NodeType.DOCUMENT,
                    title=result.source_title or f"Document from {result.source_type}",
                    content=result.text[:500],  # First 500 chars as summary
                    metadata={
                        "source_type": result.source_type,
                        "source_title": result.source_title,
                        "url": result.source_url,
                        "project_id": result.project_id,
                        "collection_name": result.collection_name
                    },
                    entities=self._extract_entities(result),
                    topics=self._extract_topics(result),
                    concepts=self._extract_concepts(result),
                    keywords=self._extract_keywords(result)
                )
                nodes.append(doc_node)

            # Create section node
            section_id = f"section_{hash(result.text)}"
            section_node = GraphNode(
                id=section_id,
                node_type=NodeType.SECTION,
                title=(result.section_title or result.breadcrumb_text or "Section")[-50:],  # Last 50 chars
                content=result.text,
                metadata={
                    "parent_document": doc_id,
                    "breadcrumb": result.breadcrumb_text,
                    "section_level": result.section_level or result.depth,
                    "score": result.score,
                    "section_type": result.section_type
                },
                entities=self._extract_entities(result),
                topics=self._extract_topics(result),
                concepts=self._extract_concepts(result),
                keywords=self._extract_keywords(result)
            )
            nodes.append(section_node)

        return nodes

    def _create_concept_nodes(self, search_results: List[SearchResult]) -> Tuple[List[GraphNode], List[GraphNode]]:
        """Create entity and topic nodes from extracted metadata."""

        # Collect all entities and topics
        entity_counts = Counter()
        topic_counts = Counter()

        for result in search_results:
            entities = self._extract_entities(result)
            topics = self._extract_topics(result)

            for entity in entities:
                entity_counts[entity] += 1
            for topic in topics:
                topic_counts[topic] += 1

        # Create nodes for frequent entities and topics
        entity_nodes = []
        topic_nodes = []

        # Entities mentioned in at least 2 documents
        for entity, count in entity_counts.items():
            if count >= 2:
                entity_node = GraphNode(
                    id=f"entity_{hash(entity)}",
                    node_type=NodeType.ENTITY,
                    title=entity,
                    metadata={"mention_count": count, "entity_type": "extracted"}
                )
                entity_nodes.append(entity_node)

        # Topics mentioned in at least 2 documents
        for topic, count in topic_counts.items():
            if count >= 2:
                topic_node = GraphNode(
                    id=f"topic_{hash(topic)}",
                    node_type=NodeType.TOPIC,
                    title=topic,
                    metadata={"mention_count": count, "topic_type": "extracted"}
                )
                topic_nodes.append(topic_node)

        return entity_nodes, topic_nodes

    def _create_relationships(self, search_results: List[SearchResult], graph: KnowledgeGraph) -> List[GraphEdge]:
        """Create relationships between graph nodes."""

        edges = []

        # Document -> Section relationships
        for result in search_results:
            doc_id = f"doc_{result.source_type}_{hash(result.source_url or result.text[:100])}"
            section_id = f"section_{hash(result.text)}"

            if doc_id in graph.nodes and section_id in graph.nodes:
                edge = GraphEdge(
                    source_id=doc_id,
                    target_id=section_id,
                    relationship_type=RelationshipType.CONTAINS,
                    weight=1.0,
                    confidence=1.0,
                    evidence=["hierarchical_structure"]
                )
                edges.append(edge)

        # Entity relationships
        entity_edges = self._create_entity_relationships(search_results, graph)
        edges.extend(entity_edges)

        # Topic relationships
        topic_edges = self._create_topic_relationships(search_results, graph)
        edges.extend(topic_edges)

        # Semantic similarity relationships
        similarity_edges = self._create_similarity_relationships(graph)
        edges.extend(similarity_edges)

        return edges

    def _create_entity_relationships(self, search_results: List[SearchResult], graph: KnowledgeGraph) -> List[GraphEdge]:
        """Create entity-related relationships."""

        edges = []

        # Document/Section mentions Entity
        for result in search_results:
            section_id = f"section_{hash(result.text)}"
            entities = self._extract_entities(result)

            for entity in entities:
                entity_nodes = graph.find_nodes_by_entity(entity)
                for entity_node in entity_nodes:
                    if section_id in graph.nodes:
                        edge = GraphEdge(
                            source_id=section_id,
                            target_id=entity_node.id,
                            relationship_type=RelationshipType.MENTIONS,
                            weight=0.7,
                            confidence=0.8,
                            evidence=[f"entity_extraction: {entity}"]
                        )
                        edges.append(edge)

        # Entity co-occurrence relationships
        co_occurrence_edges = self._create_entity_cooccurrence(search_results, graph)
        edges.extend(co_occurrence_edges)

        return edges

    def _create_topic_relationships(self, search_results: List[SearchResult], graph: KnowledgeGraph) -> List[GraphEdge]:
        """Create topic-related relationships."""

        edges = []

        # Document/Section discusses Topic
        for result in search_results:
            section_id = f"section_{hash(result.text)}"
            topics = self._extract_topics(result)

            for topic in topics:
                topic_nodes = graph.find_nodes_by_topic(topic)
                for topic_node in topic_nodes:
                    if section_id in graph.nodes:
                        edge = GraphEdge(
                            source_id=section_id,
                            target_id=topic_node.id,
                            relationship_type=RelationshipType.DISCUSSES,
                            weight=0.6,
                            confidence=0.7,
                            evidence=[f"topic_extraction: {topic}"]
                        )
                        edges.append(edge)

        return edges

    def _create_entity_cooccurrence(self, search_results: List[SearchResult], graph: KnowledgeGraph) -> List[GraphEdge]:
        """Create entity co-occurrence relationships."""

        edges = []
        cooccurrence_counts = defaultdict(int)

        # Count entity co-occurrences
        for result in search_results:
            entities = self._extract_entities(result)
            for i, entity1 in enumerate(entities):
                for entity2 in entities[i+1:]:
                    pair = tuple(sorted([entity1, entity2]))
                    cooccurrence_counts[pair] += 1

        # Create edges for significant co-occurrences
        for (entity1, entity2), count in cooccurrence_counts.items():
            if count >= 2:  # Appeared together at least twice
                entity1_nodes = graph.find_nodes_by_entity(entity1)
                entity2_nodes = graph.find_nodes_by_entity(entity2)

                for node1 in entity1_nodes:
                    for node2 in entity2_nodes:
                        weight = min(1.0, count / 5.0)  # Normalize to max 1.0
                        edge = GraphEdge(
                            source_id=node1.id,
                            target_id=node2.id,
                            relationship_type=RelationshipType.CO_OCCURS,
                            weight=weight,
                            confidence=weight,
                            evidence=[f"co_occurrence_count: {count}"]
                        )
                        edges.append(edge)

        return edges

    def _create_similarity_relationships(self, graph: KnowledgeGraph) -> List[GraphEdge]:
        """Create semantic similarity relationships between nodes."""

        edges = []

        # Calculate similarity between section nodes
        section_nodes = graph.find_nodes_by_type(NodeType.SECTION)

        for i, node1 in enumerate(section_nodes):
            for node2 in section_nodes[i+1:]:
                similarity = self._calculate_node_similarity(node1, node2)

                if similarity > 0.3:  # Threshold for meaningful similarity
                    edge = GraphEdge(
                        source_id=node1.id,
                        target_id=node2.id,
                        relationship_type=RelationshipType.SIMILAR_TO,
                        weight=similarity,
                        confidence=similarity,
                        evidence=[f"semantic_similarity: {similarity:.3f}"]
                    )
                    edges.append(edge)

        return edges

    def _calculate_node_similarity(self, node1: GraphNode, node2: GraphNode) -> float:
        """Calculate similarity between two nodes."""

        # Entity overlap
        entity_similarity = self._jaccard_similarity(set(node1.entities), set(node2.entities))

        # Topic overlap
        topic_similarity = self._jaccard_similarity(set(node1.topics), set(node2.topics))

        # Keyword overlap
        keyword_similarity = self._jaccard_similarity(set(node1.keywords), set(node2.keywords))

        # Weighted combination
        total_similarity = (
            entity_similarity * 0.4 +
            topic_similarity * 0.3 +
            keyword_similarity * 0.3
        )

        return total_similarity

    def _jaccard_similarity(self, set1: Set[str], set2: Set[str]) -> float:
        """Calculate Jaccard similarity between two sets."""
        if not set1 or not set2:
            return 0.0

        intersection = len(set1.intersection(set2))
        union = len(set1.union(set2))

        return intersection / max(union, 1)


952 """Extract entities from search result fields.""" 

953 entities = [] 

954 

955 # Use available fields from the SearchResult model 

956 # Since the metadata structure is now flattened, we'll extract from text and titles 

957 if result.source_title: 

958 entities.append(result.source_title) 

959 

960 if result.parent_title: 

961 entities.append(result.parent_title) 

962 

963 if result.section_title: 

964 entities.append(result.section_title) 

965 

966 # Basic entity extraction from project names 

967 if result.project_name: 

968 entities.append(result.project_name) 

969 

970 return list(set(entities)) # Remove duplicates 

971 

972 def _extract_topics(self, result: SearchResult) -> List[str]: 

973 """Extract topics from search result fields.""" 

974 topics = [] 

975 

976 # From breadcrumb (hierarchical topics) 

977 if result.breadcrumb_text: 

978 topics.extend([section.strip() for section in result.breadcrumb_text.split(" > ")]) 

979 

980 # From section information 

981 if result.section_type: 

982 topics.append(result.section_type) 

983 

984 # From source type as a topic 

985 if result.source_type: 

986 topics.append(result.source_type) 

987 

988 return list(set(topics)) 

989 

990 def _extract_concepts(self, result: SearchResult) -> List[str]: 

991 """Extract concepts from search result fields.""" 

992 concepts = [] 

993 

994 # Use section titles and breadcrumbs as concepts 

995 if result.section_title: 

996 concepts.append(result.section_title) 

997 

998 if result.hierarchy_context: 

999 concepts.append(result.hierarchy_context) 

1000 

1001 return list(set(concepts)) 

1002 

1003 def _extract_keywords(self, result: SearchResult) -> List[str]: 

1004 """Extract keywords from search result text and titles.""" 

1005 keywords = [] 

1006 

1007 # Basic keyword extraction from text (first few words) 

1008 text_words = result.text.lower().split()[:10] # First 10 words 

1009 keywords.extend([word for word in text_words if len(word) > 3 and word.isalpha()]) 

1010 

1011 # Keywords from titles 

1012 if result.source_title: 

1013 title_words = result.source_title.lower().split() 

1014 keywords.extend([word for word in title_words if len(word) > 3 and word.isalpha()]) 

1015 

1016 return list(set(keywords)) 

1017 

1018 
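# Usage sketch (illustrative): the builder consumes SearchResult objects,
# assumed here to come from the existing search pipeline, and returns a fully
# populated KnowledgeGraph ready for traversal.
def _example_build_graph(search_results: List[SearchResult]) -> KnowledgeGraph:
    builder = GraphBuilder()
    graph = builder.build_from_search_results(search_results)
    logger.debug("Example graph statistics", **graph.get_statistics())
    return graph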

class DocumentKnowledgeGraph:
    """High-level interface for document knowledge graph operations."""

    def __init__(self, spacy_analyzer: Optional[SpaCyQueryAnalyzer] = None):
        """Initialize the document knowledge graph system."""
        self.spacy_analyzer = spacy_analyzer or SpaCyQueryAnalyzer()
        self.graph_builder = GraphBuilder(self.spacy_analyzer)
        self.knowledge_graph: Optional[KnowledgeGraph] = None
        self.traverser: Optional[GraphTraverser] = None

        logger.info("Initialized document knowledge graph system")

    def build_graph(self, search_results: List[SearchResult]) -> bool:
        """Build knowledge graph from search results."""
        try:
            self.knowledge_graph = self.graph_builder.build_from_search_results(search_results)
            self.traverser = GraphTraverser(self.knowledge_graph, self.spacy_analyzer)

            stats = self.knowledge_graph.get_statistics()
            logger.info("Knowledge graph built successfully", **stats)
            return True

        except Exception as e:
            logger.error(f"Failed to build knowledge graph: {e}")
            return False

    def find_related_content(
        self,
        query: str,
        max_hops: int = 3,
        max_results: int = 20,
        strategy: TraversalStrategy = TraversalStrategy.SEMANTIC
    ) -> List[TraversalResult]:
        """Find related content using graph traversal."""

        if not self.knowledge_graph or not self.traverser:
            logger.warning("Knowledge graph not initialized")
            return []

        try:
            # Analyze query with spaCy
            query_analysis = self.spacy_analyzer.analyze_query_semantic(query)

            # Find starting nodes based on query entities and concepts
            start_nodes = self._find_query_start_nodes(query_analysis)

            if not start_nodes:
                logger.debug("No starting nodes found for query")
                return []

            # Traverse graph to find related content
            results = self.traverser.traverse(
                start_nodes=start_nodes,
                query_analysis=query_analysis,
                strategy=strategy,
                max_hops=max_hops,
                max_results=max_results
            )

            logger.debug(f"Found {len(results)} related content items via graph traversal")
            return results

        except Exception as e:
            logger.error(f"Failed to find related content: {e}")
            return []

    def _find_query_start_nodes(self, query_analysis: QueryAnalysis) -> List[str]:
        """Find starting nodes for graph traversal based on query analysis."""

        start_nodes = []

        # Find nodes by entities
        for entity_text, entity_label in query_analysis.entities:
            entity_nodes = self.knowledge_graph.find_nodes_by_entity(entity_text)
            start_nodes.extend([node.id for node in entity_nodes])

        # Find nodes by main concepts (as topics)
        for concept in query_analysis.main_concepts:
            topic_nodes = self.knowledge_graph.find_nodes_by_topic(concept)
            start_nodes.extend([node.id for node in topic_nodes])

        # If no entity/topic matches, use high-centrality document nodes
        if not start_nodes:
            doc_nodes = self.knowledge_graph.find_nodes_by_type(NodeType.DOCUMENT)
            # Sort by centrality and take top 3
            doc_nodes.sort(key=lambda n: n.centrality_score, reverse=True)
            start_nodes = [node.id for node in doc_nodes[:3]]

        return list(set(start_nodes))  # Remove duplicates

    def get_graph_statistics(self) -> Optional[Dict[str, Any]]:
        """Get knowledge graph statistics."""
        if self.knowledge_graph:
            return self.knowledge_graph.get_statistics()
        return None

    def export_graph(self, format: str = "json") -> Optional[str]:
        """Export knowledge graph in specified format."""
        if not self.knowledge_graph:
            return None

        try:
            if format == "json":
                # Export as JSON with nodes and edges
                data = {
                    "nodes": [
                        {
                            "id": node.id,
                            "type": node.node_type.value,
                            "title": node.title,
                            "centrality": node.centrality_score,
                            "entities": node.entities,
                            "topics": node.topics
                        }
                        for node in self.knowledge_graph.nodes.values()
                    ],
                    "edges": [
                        {
                            "source": edge.source_id,
                            "target": edge.target_id,
                            "relationship": edge.relationship_type.value,
                            "weight": edge.weight,
                            "confidence": edge.confidence
                        }
                        for edge in self.knowledge_graph.edges.values()
                    ]
                }
                return json.dumps(data, indent=2)

        except Exception as e:
            logger.error(f"Failed to export graph: {e}")

        return None
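

# End-to-end sketch (illustrative): the query string and logging fields are
# placeholders; `search_results` is assumed to be a list of SearchResult
# objects produced by the existing search layer.
def _example_document_graph(search_results: List[SearchResult]) -> Optional[str]:
    dkg = DocumentKnowledgeGraph()
    if not dkg.build_graph(search_results):
        return None
    for item in dkg.find_related_content(
        query="how does hierarchical search work",
        max_hops=2,
        max_results=10,
        strategy=TraversalStrategy.SEMANTIC,
    ):
        logger.debug(
            "Related content",
            path=" -> ".join(item.path),
            semantic_score=item.semantic_score,
        )
    return dkg.export_graph(format="json")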