Coverage for src/qdrant_loader_mcp_server/search/enhanced/knowledge_graph.py: 92%
532 statements
coverage.py v7.10.0, created at 2025-07-25 11:38 +0000
1"""Knowledge Graph Construction for Advanced Search Intelligence.
3This module implements Phase 2.1 of the search enhancement roadmap:
4- Multi-node graph construction from document metadata
5- Relationship extraction and strength scoring
6- Graph traversal algorithms for multi-hop search
7- Integration with spaCy-powered Phase 1.0 components
8"""
10import logging
11import time
12import networkx as nx
13from dataclasses import dataclass, field
14from enum import Enum
15from typing import Any, Dict, List, Optional, Set, Tuple, Union
16from collections import defaultdict, Counter
17import json
18import math
20from ...utils.logging import LoggingConfig
21from ..nlp.spacy_analyzer import SpaCyQueryAnalyzer, QueryAnalysis
22from ..models import SearchResult
24logger = LoggingConfig.get_logger(__name__)
27class NodeType(Enum):
28 """Types of nodes in the knowledge graph."""
29 DOCUMENT = "document"
30 SECTION = "section"
31 ENTITY = "entity"
32 TOPIC = "topic"
33 CONCEPT = "concept"
36class RelationshipType(Enum):
37 """Types of relationships between graph nodes."""
38 # Hierarchical relationships
39 CONTAINS = "contains" # document contains section, section contains subsection
40 PART_OF = "part_of" # inverse of contains
41 SIBLING = "sibling" # sections at same level
43 # Content relationships
44 MENTIONS = "mentions" # document/section mentions entity
45 DISCUSSES = "discusses" # document/section discusses topic
46 RELATES_TO = "relates_to" # generic semantic relationship
47 SIMILAR_TO = "similar_to" # semantic similarity
49 # Cross-document relationships
50 REFERENCES = "references" # explicit cross-reference
51 CITES = "cites" # citation relationship
52 BUILDS_ON = "builds_on" # conceptual building
53 CONTRADICTS = "contradicts" # conflicting information
55 # Entity relationships
56 CO_OCCURS = "co_occurs" # entities appear together
57 CATEGORIZED_AS = "categorized_as" # entity belongs to topic/concept
60@dataclass
61class GraphNode:
62 """Node in the knowledge graph."""
64 id: str
65 node_type: NodeType
66 title: str
67 content: Optional[str] = None
68 metadata: Dict[str, Any] = field(default_factory=dict)
70 # Graph metrics (calculated)
71 centrality_score: float = 0.0
72 authority_score: float = 0.0
73 hub_score: float = 0.0
75 # Content analysis
76 entities: List[str] = field(default_factory=list)
77 topics: List[str] = field(default_factory=list)
78 concepts: List[str] = field(default_factory=list)
79 keywords: List[str] = field(default_factory=list)
81 def __post_init__(self):
82 """Initialize derived properties."""
83 if not self.id:
84 self.id = f"{self.node_type.value}_{hash(self.title)}"
87@dataclass
88class GraphEdge:
89 """Edge in the knowledge graph."""
91 source_id: str
92 target_id: str
93 relationship_type: RelationshipType
94 weight: float = 1.0
95 metadata: Dict[str, Any] = field(default_factory=dict)
97 # Evidence for the relationship
98 evidence: List[str] = field(default_factory=list)
99 confidence: float = 1.0
101 def __post_init__(self):
102 """Normalize weight and confidence."""
103 self.weight = max(0.0, min(1.0, self.weight))
104 self.confidence = max(0.0, min(1.0, self.confidence))
107class TraversalStrategy(Enum):
108 """Graph traversal strategies for different search goals."""
109 BREADTH_FIRST = "breadth_first" # Explore broadly
110 DEPTH_FIRST = "depth_first" # Explore deeply
111 WEIGHTED = "weighted" # Follow strongest relationships
112 CENTRALITY = "centrality" # Prefer high-centrality nodes
113 SEMANTIC = "semantic" # Follow semantic similarity
114 HIERARCHICAL = "hierarchical" # Follow document structure
117@dataclass
118class TraversalResult:
119 """Result of graph traversal operation."""
121 path: List[str] # Node IDs in traversal order
122 nodes: List[GraphNode] # Actual node objects
123 relationships: List[GraphEdge] # Edges traversed
124 total_weight: float # Sum of edge weights
125 semantic_score: float # Semantic relevance to query
126 hop_count: int # Number of hops from start
127 reasoning_path: List[str] # Human-readable reasoning
130class KnowledgeGraph:
131 """Core knowledge graph implementation using NetworkX."""
133 def __init__(self):
134 """Initialize the knowledge graph."""
135 self.graph = nx.MultiDiGraph() # Allow multiple edges between nodes
136 self.nodes: Dict[str, GraphNode] = {}
137 self.edges: Dict[Tuple[str, str, str], GraphEdge] = {} # (source, target, relationship)
138 self.node_type_index: Dict[NodeType, Set[str]] = defaultdict(set)
139 self.entity_index: Dict[str, Set[str]] = defaultdict(set) # entity -> node_ids
140 self.topic_index: Dict[str, Set[str]] = defaultdict(set) # topic -> node_ids
142 logger.info("Initialized empty knowledge graph")
144 def add_node(self, node: GraphNode) -> bool:
145 """Add a node to the graph."""
146 try:
147 if node.id in self.nodes:
148 logger.debug(f"Node {node.id} already exists, updating")
150 self.nodes[node.id] = node
151 self.graph.add_node(node.id, **node.metadata)
152 self.node_type_index[node.node_type].add(node.id)
154 # Index entities and topics for fast lookup
155 for entity in node.entities:
156 self.entity_index[entity.lower()].add(node.id)
157 for topic in node.topics:
158 self.topic_index[topic.lower()].add(node.id)
160 logger.debug(f"Added {node.node_type.value} node: {node.id}")
161 return True
163 except Exception as e:
164 logger.error(f"Failed to add node {node.id}: {e}")
165 return False
167 def add_edge(self, edge: GraphEdge) -> bool:
168 """Add an edge to the graph."""
169 try:
170 if edge.source_id not in self.nodes or edge.target_id not in self.nodes:
171 logger.warning(f"Edge {edge.source_id} -> {edge.target_id}: missing nodes")
172 return False
174 edge_key = (edge.source_id, edge.target_id, edge.relationship_type.value)
175 self.edges[edge_key] = edge
177 self.graph.add_edge(
178 edge.source_id,
179 edge.target_id,
180 key=edge.relationship_type.value,
181 weight=edge.weight,
182 relationship=edge.relationship_type.value,
183 confidence=edge.confidence,
184 **edge.metadata
185 )
187 logger.debug(f"Added edge: {edge.source_id} --{edge.relationship_type.value}--> {edge.target_id}")
188 return True
190 except Exception as e:
191 logger.error(f"Failed to add edge {edge.source_id} -> {edge.target_id}: {e}")
192 return False
194 def find_nodes_by_type(self, node_type: NodeType) -> List[GraphNode]:
195 """Find all nodes of a specific type."""
196 return [self.nodes[node_id] for node_id in self.node_type_index[node_type]]
198 def find_nodes_by_entity(self, entity: str) -> List[GraphNode]:
199 """Find all nodes containing a specific entity."""
200 node_ids = self.entity_index.get(entity.lower(), set())
201 return [self.nodes[node_id] for node_id in node_ids]
203 def find_nodes_by_topic(self, topic: str) -> List[GraphNode]:
204 """Find all nodes discussing a specific topic."""
205 node_ids = self.topic_index.get(topic.lower(), set())
206 return [self.nodes[node_id] for node_id in node_ids]
208 def calculate_centrality_scores(self):
209 """Calculate centrality scores for all nodes."""
210 try:
211 if len(self.graph.nodes) == 0:
212 return
214 # Calculate different centrality metrics
215 degree_centrality = nx.degree_centrality(self.graph)
216 betweenness_centrality = nx.betweenness_centrality(self.graph)
218 # For directed graphs, calculate hub and authority scores
219 try:
220 hub_scores, authority_scores = nx.hits(self.graph, max_iter=100)
221 except nx.PowerIterationFailedConvergence:
222 logger.warning("HITS algorithm failed to converge, using default scores")
223 hub_scores = {node: 0.0 for node in self.graph.nodes}
224 authority_scores = {node: 0.0 for node in self.graph.nodes}
226 # Update node objects with calculated scores
227 for node_id, node in self.nodes.items():
228 node.centrality_score = (
229 degree_centrality.get(node_id, 0.0) * 0.4 +
230 betweenness_centrality.get(node_id, 0.0) * 0.6
231 )
232 node.hub_score = hub_scores.get(node_id, 0.0)
233 node.authority_score = authority_scores.get(node_id, 0.0)
235 logger.info(f"Calculated centrality scores for {len(self.nodes)} nodes")
237 except Exception as e:
238 logger.error(f"Failed to calculate centrality scores: {e}")
240 def get_neighbors(self, node_id: str, relationship_types: Optional[List[RelationshipType]] = None) -> List[Tuple[str, GraphEdge]]:
241 """Get neighboring nodes with their connecting edges."""
242 neighbors = []
244 if node_id not in self.graph:
245 return neighbors
247 for neighbor_id in self.graph.neighbors(node_id):
248 # Get all edges between these nodes
249 edge_data = self.graph.get_edge_data(node_id, neighbor_id)
250 if edge_data:
251 for key, data in edge_data.items():
252 relationship = RelationshipType(data['relationship'])
253 if relationship_types is None or relationship in relationship_types:
254 edge_key = (node_id, neighbor_id, relationship.value)
255 if edge_key in self.edges:
256 neighbors.append((neighbor_id, self.edges[edge_key]))
258 return neighbors
260 def get_statistics(self) -> Dict[str, Any]:
261 """Get graph statistics."""
262 stats = {
263 "total_nodes": len(self.nodes),
264 "total_edges": len(self.edges),
265 "node_types": {node_type.value: len(nodes) for node_type, nodes in self.node_type_index.items()},
266 "relationship_types": {},
267 "connected_components": nx.number_weakly_connected_components(self.graph),
268 "avg_degree": sum(dict(self.graph.degree()).values()) / max(len(self.graph.nodes), 1)
269 }
271 # Count relationship types
272 for edge in self.edges.values():
273 rel_type = edge.relationship_type.value
274 stats["relationship_types"][rel_type] = stats["relationship_types"].get(rel_type, 0) + 1
276 return stats
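
# Minimal usage sketch (illustrative comments only; the node IDs and titles are
# made-up examples): building a tiny graph by hand and querying it with the
# helpers above.
#
#     kg = KnowledgeGraph()
#     kg.add_node(GraphNode(id="doc_auth", node_type=NodeType.DOCUMENT,
#                           title="Auth Guide", entities=["OAuth"], topics=["security"]))
#     kg.add_node(GraphNode(id="entity_oauth", node_type=NodeType.ENTITY, title="OAuth"))
#     kg.add_edge(GraphEdge(source_id="doc_auth", target_id="entity_oauth",
#                           relationship_type=RelationshipType.MENTIONS, weight=0.7))
#     kg.calculate_centrality_scores()
#     print(kg.get_statistics())
#     print([n.title for n in kg.find_nodes_by_entity("oauth")])
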

class GraphTraverser:
    """Advanced graph traversal for multi-hop search and content discovery."""

    def __init__(self, knowledge_graph: KnowledgeGraph, spacy_analyzer: Optional[SpaCyQueryAnalyzer] = None):
        """Initialize the graph traverser."""
        self.graph = knowledge_graph
        self.spacy_analyzer = spacy_analyzer or SpaCyQueryAnalyzer()
        logger.debug("Initialized graph traverser")

    def traverse(
        self,
        start_nodes: List[str],
        query_analysis: Optional[QueryAnalysis] = None,
        strategy: TraversalStrategy = TraversalStrategy.WEIGHTED,
        max_hops: int = 3,
        max_results: int = 20,
        min_weight: float = 0.1
    ) -> List[TraversalResult]:
        """Traverse the graph to find related content."""

        results = []
        visited = set()

        for start_node_id in start_nodes:
            if start_node_id not in self.graph.nodes:
                continue

            # Perform traversal based on strategy
            if strategy == TraversalStrategy.BREADTH_FIRST:
                node_results = self._breadth_first_traversal(
                    start_node_id, query_analysis, max_hops, max_results, min_weight, visited
                )
            elif strategy == TraversalStrategy.WEIGHTED:
                node_results = self._weighted_traversal(
                    start_node_id, query_analysis, max_hops, max_results, min_weight, visited
                )
            elif strategy == TraversalStrategy.CENTRALITY:
                node_results = self._centrality_traversal(
                    start_node_id, query_analysis, max_hops, max_results, min_weight, visited
                )
            elif strategy == TraversalStrategy.SEMANTIC:
                node_results = self._semantic_traversal(
                    start_node_id, query_analysis, max_hops, max_results, min_weight, visited
                )
            else:
                node_results = self._breadth_first_traversal(
                    start_node_id, query_analysis, max_hops, max_results, min_weight, visited
                )

            results.extend(node_results)

        # Sort by semantic score and total weight
        results.sort(key=lambda r: (r.semantic_score, r.total_weight), reverse=True)
        return results[:max_results]

    def _breadth_first_traversal(
        self,
        start_node_id: str,
        query_analysis: Optional[QueryAnalysis],
        max_hops: int,
        max_results: int,
        min_weight: float,
        visited: Set[str]
    ) -> List[TraversalResult]:
        """Breadth-first traversal implementation."""

        results = []
        queue = [(start_node_id, [], [], 0.0, 0)]  # (node_id, path, edges, weight, hops)
        local_visited = set()

        while queue and len(results) < max_results:
            node_id, path, edges, total_weight, hops = queue.pop(0)

            if node_id in local_visited or hops >= max_hops:
                continue

            local_visited.add(node_id)

            # Create traversal result
            if node_id != start_node_id:  # Don't include the starting node
                semantic_score = self._calculate_semantic_score(node_id, query_analysis)
                reasoning_path = self._build_reasoning_path(path, edges)

                result = TraversalResult(
                    path=path + [node_id],
                    nodes=[self.graph.nodes[nid] for nid in path + [node_id]],
                    relationships=edges,
                    total_weight=total_weight,
                    semantic_score=semantic_score,
                    hop_count=hops,
                    reasoning_path=reasoning_path
                )
                results.append(result)

            # Add neighbors to queue
            neighbors = self.graph.get_neighbors(node_id)
            for neighbor_id, edge in neighbors:
                if neighbor_id not in local_visited and edge.weight >= min_weight:
                    queue.append((
                        neighbor_id,
                        path + [node_id],
                        edges + [edge],
                        total_weight + edge.weight,
                        hops + 1
                    ))

        return results

    def _weighted_traversal(
        self,
        start_node_id: str,
        query_analysis: Optional[QueryAnalysis],
        max_hops: int,
        max_results: int,
        min_weight: float,
        visited: Set[str]
    ) -> List[TraversalResult]:
        """Weighted traversal prioritizing strong relationships."""

        results = []
        # Priority queue: (negative_weight, node_id, path, edges, weight, hops)
        import heapq
        heap = [(-1.0, start_node_id, [], [], 0.0, 0)]
        local_visited = set()

        while heap and len(results) < max_results:
            neg_weight, node_id, path, edges, total_weight, hops = heapq.heappop(heap)

            if node_id in local_visited or hops >= max_hops:
                continue

            local_visited.add(node_id)

            # Create traversal result
            if node_id != start_node_id:
                semantic_score = self._calculate_semantic_score(node_id, query_analysis)
                reasoning_path = self._build_reasoning_path(path, edges)

                result = TraversalResult(
                    path=path + [node_id],
                    nodes=[self.graph.nodes[nid] for nid in path + [node_id]],
                    relationships=edges,
                    total_weight=total_weight,
                    semantic_score=semantic_score,
                    hop_count=hops,
                    reasoning_path=reasoning_path
                )
                results.append(result)

            # Add neighbors to heap
            neighbors = self.graph.get_neighbors(node_id)
            for neighbor_id, edge in neighbors:
                if neighbor_id not in local_visited and edge.weight >= min_weight:
                    new_weight = total_weight + edge.weight
                    heapq.heappush(heap, (
                        -new_weight,  # Negative for max-heap behavior
                        neighbor_id,
                        path + [node_id],
                        edges + [edge],
                        new_weight,
                        hops + 1
                    ))

        return results

    def _centrality_traversal(
        self,
        start_node_id: str,
        query_analysis: Optional[QueryAnalysis],
        max_hops: int,
        max_results: int,
        min_weight: float,
        visited: Set[str]
    ) -> List[TraversalResult]:
        """Traversal prioritizing high-centrality nodes."""

        results = []
        import heapq
        # Priority queue: (negative_centrality, node_id, path, edges, weight, hops)
        start_centrality = self.graph.nodes[start_node_id].centrality_score
        heap = [(-start_centrality, start_node_id, [], [], 0.0, 0)]
        local_visited = set()

        while heap and len(results) < max_results:
            neg_centrality, node_id, path, edges, total_weight, hops = heapq.heappop(heap)

            if node_id in local_visited or hops >= max_hops:
                continue

            local_visited.add(node_id)

            # Create traversal result
            if node_id != start_node_id:
                semantic_score = self._calculate_semantic_score(node_id, query_analysis)
                reasoning_path = self._build_reasoning_path(path, edges)

                result = TraversalResult(
                    path=path + [node_id],
                    nodes=[self.graph.nodes[nid] for nid in path + [node_id]],
                    relationships=edges,
                    total_weight=total_weight,
                    semantic_score=semantic_score,
                    hop_count=hops,
                    reasoning_path=reasoning_path
                )
                results.append(result)

            # Add neighbors to heap
            neighbors = self.graph.get_neighbors(node_id)
            for neighbor_id, edge in neighbors:
                if neighbor_id not in local_visited and edge.weight >= min_weight:
                    neighbor_centrality = self.graph.nodes[neighbor_id].centrality_score
                    heapq.heappush(heap, (
                        -neighbor_centrality,
                        neighbor_id,
                        path + [node_id],
                        edges + [edge],
                        total_weight + edge.weight,
                        hops + 1
                    ))

        return results

    def _semantic_traversal(
        self,
        start_node_id: str,
        query_analysis: Optional[QueryAnalysis],
        max_hops: int,
        max_results: int,
        min_weight: float,
        visited: Set[str]
    ) -> List[TraversalResult]:
        """Traversal prioritizing semantic similarity to query."""

        if not query_analysis:
            return self._breadth_first_traversal(start_node_id, query_analysis, max_hops, max_results, min_weight, visited)

        results = []
        import heapq
        # Priority queue: (negative_semantic_score, node_id, path, edges, weight, hops)
        start_score = self._calculate_semantic_score(start_node_id, query_analysis)
        heap = [(-start_score, start_node_id, [], [], 0.0, 0)]
        local_visited = set()

        while heap and len(results) < max_results:
            neg_score, node_id, path, edges, total_weight, hops = heapq.heappop(heap)

            if node_id in local_visited or hops >= max_hops:
                continue

            local_visited.add(node_id)

            # Create traversal result
            if node_id != start_node_id:
                semantic_score = -neg_score  # Convert back from negative
                reasoning_path = self._build_reasoning_path(path, edges)

                result = TraversalResult(
                    path=path + [node_id],
                    nodes=[self.graph.nodes[nid] for nid in path + [node_id]],
                    relationships=edges,
                    total_weight=total_weight,
                    semantic_score=semantic_score,
                    hop_count=hops,
                    reasoning_path=reasoning_path
                )
                results.append(result)

            # Add neighbors to heap
            neighbors = self.graph.get_neighbors(node_id)
            for neighbor_id, edge in neighbors:
                if neighbor_id not in local_visited and edge.weight >= min_weight:
                    neighbor_score = self._calculate_semantic_score(neighbor_id, query_analysis)
                    heapq.heappush(heap, (
                        -neighbor_score,
                        neighbor_id,
                        path + [node_id],
                        edges + [edge],
                        total_weight + edge.weight,
                        hops + 1
                    ))

        return results

    def _calculate_semantic_score(self, node_id: str, query_analysis: Optional[QueryAnalysis]) -> float:
        """Calculate semantic similarity between node and query."""
        if not query_analysis:
            return 0.0

        node = self.graph.nodes[node_id]

        # Calculate similarity based on entities, topics, and keywords
        entity_similarity = self._calculate_list_similarity(
            query_analysis.entities, [(e, "") for e in node.entities]
        )

        topic_similarity = self._calculate_list_similarity(
            [(t, "") for t in query_analysis.main_concepts], [(t, "") for t in node.topics]
        )

        keyword_similarity = self._calculate_list_similarity(
            [(k, "") for k in query_analysis.semantic_keywords], [(k, "") for k in node.keywords]
        )

        # Weighted combination
        total_score = (
            entity_similarity * 0.4 +
            topic_similarity * 0.3 +
            keyword_similarity * 0.3
        )

        return total_score

    def _calculate_list_similarity(self, list1: List[Tuple[str, str]], list2: List[Tuple[str, str]]) -> float:
        """Calculate similarity between two lists of items."""
        if not list1 or not list2:
            return 0.0

        set1 = set(item[0].lower() for item in list1)
        set2 = set(item[0].lower() for item in list2)

        intersection = len(set1.intersection(set2))
        union = len(set1.union(set2))

        return intersection / max(union, 1)

    def _build_reasoning_path(self, path: List[str], edges: List[GraphEdge]) -> List[str]:
        """Build human-readable reasoning path."""
        reasoning = []

        for i, edge in enumerate(edges):
            source_node = self.graph.nodes[edge.source_id]
            target_node = self.graph.nodes[edge.target_id]

            reasoning.append(
                f"{source_node.title} --{edge.relationship_type.value}--> {target_node.title} "
                f"(weight: {edge.weight:.2f})"
            )

        return reasoning
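
# Illustrative traversal sketch (comments only; assumes a populated KnowledgeGraph
# `kg` and, optionally, a QueryAnalysis `qa` from SpaCyQueryAnalyzer):
#
#     traverser = GraphTraverser(kg)
#     related = traverser.traverse(
#         start_nodes=["doc_auth"],
#         query_analysis=qa,
#         strategy=TraversalStrategy.WEIGHTED,
#         max_hops=2,
#         max_results=10,
#     )
#     for r in related:
#         print(r.hop_count, round(r.total_weight, 2), " | ".join(r.reasoning_path))
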

class GraphBuilder:
    """Build knowledge graph from document metadata and search results."""

    def __init__(self, spacy_analyzer: Optional[SpaCyQueryAnalyzer] = None):
        """Initialize the graph builder."""
        self.spacy_analyzer = spacy_analyzer or SpaCyQueryAnalyzer()
        logger.info("Initialized graph builder")

    def build_from_search_results(self, search_results: List[SearchResult]) -> KnowledgeGraph:
        """Build knowledge graph from search results metadata."""

        start_time = time.time()
        graph = KnowledgeGraph()

        try:
            # Step 1: Create nodes from search results
            document_nodes = self._create_document_nodes(search_results)
            for node in document_nodes:
                graph.add_node(node)

            # Step 2: Create entity and topic nodes
            entity_nodes, topic_nodes = self._create_concept_nodes(search_results)
            for node in entity_nodes + topic_nodes:
                graph.add_node(node)

            # Step 3: Create relationships
            edges = self._create_relationships(search_results, graph)
            for edge in edges:
                graph.add_edge(edge)

            # Step 4: Calculate centrality scores
            graph.calculate_centrality_scores()

            build_time = (time.time() - start_time) * 1000
            stats = graph.get_statistics()

            logger.info(
                f"Built knowledge graph in {build_time:.2f}ms",
                nodes=stats["total_nodes"],
                edges=stats["total_edges"],
                components=stats["connected_components"]
            )

            return graph

        except Exception as e:
            logger.error(f"Failed to build knowledge graph: {e}")
            return graph

    def _create_document_nodes(self, search_results: List[SearchResult]) -> List[GraphNode]:
        """Create document and section nodes from search results."""

        nodes = []
        seen_documents = set()

        for result in search_results:
            # Create document node
            doc_id = f"doc_{result.source_type}_{hash(result.source_url or result.text[:100])}"

            if doc_id not in seen_documents:
                seen_documents.add(doc_id)

                doc_node = GraphNode(
                    id=doc_id,
                    node_type=NodeType.DOCUMENT,
                    title=result.source_title or f"Document from {result.source_type}",
                    content=result.text[:500],  # First 500 chars as summary
                    metadata={
                        "source_type": result.source_type,
                        "source_title": result.source_title,
                        "url": result.source_url,
                        "project_id": result.project_id,
                        "collection_name": result.collection_name
                    },
                    entities=self._extract_entities(result),
                    topics=self._extract_topics(result),
                    concepts=self._extract_concepts(result),
                    keywords=self._extract_keywords(result)
                )
                nodes.append(doc_node)

            # Create section node
            section_id = f"section_{hash(result.text)}"
            section_node = GraphNode(
                id=section_id,
                node_type=NodeType.SECTION,
                title=(result.section_title or result.breadcrumb_text or "Section")[-50:],  # Last 50 chars
                content=result.text,
                metadata={
                    "parent_document": doc_id,
                    "breadcrumb": result.breadcrumb_text,
                    "section_level": result.section_level or result.depth,
                    "score": result.score,
                    "section_type": result.section_type
                },
                entities=self._extract_entities(result),
                topics=self._extract_topics(result),
                concepts=self._extract_concepts(result),
                keywords=self._extract_keywords(result)
            )
            nodes.append(section_node)

        return nodes

    def _create_concept_nodes(self, search_results: List[SearchResult]) -> Tuple[List[GraphNode], List[GraphNode]]:
        """Create entity and topic nodes from extracted metadata."""

        # Collect all entities and topics
        entity_counts = Counter()
        topic_counts = Counter()

        for result in search_results:
            entities = self._extract_entities(result)
            topics = self._extract_topics(result)

            for entity in entities:
                entity_counts[entity] += 1
            for topic in topics:
                topic_counts[topic] += 1

        # Create nodes for frequent entities and topics
        entity_nodes = []
        topic_nodes = []

        # Entities mentioned in at least 2 documents
        for entity, count in entity_counts.items():
            if count >= 2:
                entity_node = GraphNode(
                    id=f"entity_{hash(entity)}",
                    node_type=NodeType.ENTITY,
                    title=entity,
                    metadata={"mention_count": count, "entity_type": "extracted"}
                )
                entity_nodes.append(entity_node)

        # Topics mentioned in at least 2 documents
        for topic, count in topic_counts.items():
            if count >= 2:
                topic_node = GraphNode(
                    id=f"topic_{hash(topic)}",
                    node_type=NodeType.TOPIC,
                    title=topic,
                    metadata={"mention_count": count, "topic_type": "extracted"}
                )
                topic_nodes.append(topic_node)

        return entity_nodes, topic_nodes

    def _create_relationships(self, search_results: List[SearchResult], graph: KnowledgeGraph) -> List[GraphEdge]:
        """Create relationships between graph nodes."""

        edges = []

        # Document -> Section relationships
        for result in search_results:
            doc_id = f"doc_{result.source_type}_{hash(result.source_url or result.text[:100])}"
            section_id = f"section_{hash(result.text)}"

            if doc_id in graph.nodes and section_id in graph.nodes:
                edge = GraphEdge(
                    source_id=doc_id,
                    target_id=section_id,
                    relationship_type=RelationshipType.CONTAINS,
                    weight=1.0,
                    confidence=1.0,
                    evidence=["hierarchical_structure"]
                )
                edges.append(edge)

        # Entity relationships
        entity_edges = self._create_entity_relationships(search_results, graph)
        edges.extend(entity_edges)

        # Topic relationships
        topic_edges = self._create_topic_relationships(search_results, graph)
        edges.extend(topic_edges)

        # Semantic similarity relationships
        similarity_edges = self._create_similarity_relationships(graph)
        edges.extend(similarity_edges)

        return edges

    def _create_entity_relationships(self, search_results: List[SearchResult], graph: KnowledgeGraph) -> List[GraphEdge]:
        """Create entity-related relationships."""

        edges = []

        # Document/Section mentions Entity
        for result in search_results:
            section_id = f"section_{hash(result.text)}"
            entities = self._extract_entities(result)

            for entity in entities:
                entity_nodes = graph.find_nodes_by_entity(entity)
                for entity_node in entity_nodes:
                    if section_id in graph.nodes:
                        edge = GraphEdge(
                            source_id=section_id,
                            target_id=entity_node.id,
                            relationship_type=RelationshipType.MENTIONS,
                            weight=0.7,
                            confidence=0.8,
                            evidence=[f"entity_extraction: {entity}"]
                        )
                        edges.append(edge)

        # Entity co-occurrence relationships
        co_occurrence_edges = self._create_entity_cooccurrence(search_results, graph)
        edges.extend(co_occurrence_edges)

        return edges

    def _create_topic_relationships(self, search_results: List[SearchResult], graph: KnowledgeGraph) -> List[GraphEdge]:
        """Create topic-related relationships."""

        edges = []

        # Document/Section discusses Topic
        for result in search_results:
            section_id = f"section_{hash(result.text)}"
            topics = self._extract_topics(result)

            for topic in topics:
                topic_nodes = graph.find_nodes_by_topic(topic)
                for topic_node in topic_nodes:
                    if section_id in graph.nodes:
                        edge = GraphEdge(
                            source_id=section_id,
                            target_id=topic_node.id,
                            relationship_type=RelationshipType.DISCUSSES,
                            weight=0.6,
                            confidence=0.7,
                            evidence=[f"topic_extraction: {topic}"]
                        )
                        edges.append(edge)

        return edges

    def _create_entity_cooccurrence(self, search_results: List[SearchResult], graph: KnowledgeGraph) -> List[GraphEdge]:
        """Create entity co-occurrence relationships."""

        edges = []
        cooccurrence_counts = defaultdict(int)

        # Count entity co-occurrences
        for result in search_results:
            entities = self._extract_entities(result)
            for i, entity1 in enumerate(entities):
                for entity2 in entities[i+1:]:
                    pair = tuple(sorted([entity1, entity2]))
                    cooccurrence_counts[pair] += 1

        # Create edges for significant co-occurrences
        for (entity1, entity2), count in cooccurrence_counts.items():
            if count >= 2:  # Appeared together at least twice
                entity1_nodes = graph.find_nodes_by_entity(entity1)
                entity2_nodes = graph.find_nodes_by_entity(entity2)

                for node1 in entity1_nodes:
                    for node2 in entity2_nodes:
                        weight = min(1.0, count / 5.0)  # Normalize to max 1.0
                        edge = GraphEdge(
                            source_id=node1.id,
                            target_id=node2.id,
                            relationship_type=RelationshipType.CO_OCCURS,
                            weight=weight,
                            confidence=weight,
                            evidence=[f"co_occurrence_count: {count}"]
                        )
                        edges.append(edge)

        return edges

    def _create_similarity_relationships(self, graph: KnowledgeGraph) -> List[GraphEdge]:
        """Create semantic similarity relationships between nodes."""

        edges = []

        # Calculate similarity between section nodes
        section_nodes = graph.find_nodes_by_type(NodeType.SECTION)

        for i, node1 in enumerate(section_nodes):
            for node2 in section_nodes[i+1:]:
                similarity = self._calculate_node_similarity(node1, node2)

                if similarity > 0.3:  # Threshold for meaningful similarity
                    edge = GraphEdge(
                        source_id=node1.id,
                        target_id=node2.id,
                        relationship_type=RelationshipType.SIMILAR_TO,
                        weight=similarity,
                        confidence=similarity,
                        evidence=[f"semantic_similarity: {similarity:.3f}"]
                    )
                    edges.append(edge)

        return edges

    def _calculate_node_similarity(self, node1: GraphNode, node2: GraphNode) -> float:
        """Calculate similarity between two nodes."""

        # Entity overlap
        entity_similarity = self._jaccard_similarity(set(node1.entities), set(node2.entities))

        # Topic overlap
        topic_similarity = self._jaccard_similarity(set(node1.topics), set(node2.topics))

        # Keyword overlap
        keyword_similarity = self._jaccard_similarity(set(node1.keywords), set(node2.keywords))

        # Weighted combination
        total_similarity = (
            entity_similarity * 0.4 +
            topic_similarity * 0.3 +
            keyword_similarity * 0.3
        )

        return total_similarity

    def _jaccard_similarity(self, set1: Set[str], set2: Set[str]) -> float:
        """Calculate Jaccard similarity between two sets."""
        if not set1 or not set2:
            return 0.0

        intersection = len(set1.intersection(set2))
        union = len(set1.union(set2))

        return intersection / max(union, 1)

    def _extract_entities(self, result: SearchResult) -> List[str]:
        """Extract entities from search result fields."""
        entities = []

        # Use available fields from the SearchResult model
        # Since the metadata structure is now flattened, we'll extract from text and titles
        if result.source_title:
            entities.append(result.source_title)

        if result.parent_title:
            entities.append(result.parent_title)

        if result.section_title:
            entities.append(result.section_title)

        # Basic entity extraction from project names
        if result.project_name:
            entities.append(result.project_name)

        return list(set(entities))  # Remove duplicates

    def _extract_topics(self, result: SearchResult) -> List[str]:
        """Extract topics from search result fields."""
        topics = []

        # From breadcrumb (hierarchical topics)
        if result.breadcrumb_text:
            topics.extend([section.strip() for section in result.breadcrumb_text.split(" > ")])

        # From section information
        if result.section_type:
            topics.append(result.section_type)

        # From source type as a topic
        if result.source_type:
            topics.append(result.source_type)

        return list(set(topics))

    def _extract_concepts(self, result: SearchResult) -> List[str]:
        """Extract concepts from search result fields."""
        concepts = []

        # Use section titles and breadcrumbs as concepts
        if result.section_title:
            concepts.append(result.section_title)

        if result.hierarchy_context:
            concepts.append(result.hierarchy_context)

        return list(set(concepts))

    def _extract_keywords(self, result: SearchResult) -> List[str]:
        """Extract keywords from search result text and titles."""
        keywords = []

        # Basic keyword extraction from text (first few words)
        text_words = result.text.lower().split()[:10]  # First 10 words
        keywords.extend([word for word in text_words if len(word) > 3 and word.isalpha()])

        # Keywords from titles
        if result.source_title:
            title_words = result.source_title.lower().split()
            keywords.extend([word for word in title_words if len(word) > 3 and word.isalpha()])

        return list(set(keywords))

class DocumentKnowledgeGraph:
    """High-level interface for document knowledge graph operations."""

    def __init__(self, spacy_analyzer: Optional[SpaCyQueryAnalyzer] = None):
        """Initialize the document knowledge graph system."""
        self.spacy_analyzer = spacy_analyzer or SpaCyQueryAnalyzer()
        self.graph_builder = GraphBuilder(self.spacy_analyzer)
        self.knowledge_graph: Optional[KnowledgeGraph] = None
        self.traverser: Optional[GraphTraverser] = None

        logger.info("Initialized document knowledge graph system")

    def build_graph(self, search_results: List[SearchResult]) -> bool:
        """Build knowledge graph from search results."""
        try:
            self.knowledge_graph = self.graph_builder.build_from_search_results(search_results)
            self.traverser = GraphTraverser(self.knowledge_graph, self.spacy_analyzer)

            stats = self.knowledge_graph.get_statistics()
            logger.info("Knowledge graph built successfully", **stats)
            return True

        except Exception as e:
            logger.error(f"Failed to build knowledge graph: {e}")
            return False

    def find_related_content(
        self,
        query: str,
        max_hops: int = 3,
        max_results: int = 20,
        strategy: TraversalStrategy = TraversalStrategy.SEMANTIC
    ) -> List[TraversalResult]:
        """Find related content using graph traversal."""

        if not self.knowledge_graph or not self.traverser:
            logger.warning("Knowledge graph not initialized")
            return []

        try:
            # Analyze query with spaCy
            query_analysis = self.spacy_analyzer.analyze_query_semantic(query)

            # Find starting nodes based on query entities and concepts
            start_nodes = self._find_query_start_nodes(query_analysis)

            if not start_nodes:
                logger.debug("No starting nodes found for query")
                return []

            # Traverse graph to find related content
            results = self.traverser.traverse(
                start_nodes=start_nodes,
                query_analysis=query_analysis,
                strategy=strategy,
                max_hops=max_hops,
                max_results=max_results
            )

            logger.debug(f"Found {len(results)} related content items via graph traversal")
            return results

        except Exception as e:
            logger.error(f"Failed to find related content: {e}")
            return []

    def _find_query_start_nodes(self, query_analysis: QueryAnalysis) -> List[str]:
        """Find starting nodes for graph traversal based on query analysis."""

        start_nodes = []

        # Find nodes by entities
        for entity_text, entity_label in query_analysis.entities:
            entity_nodes = self.knowledge_graph.find_nodes_by_entity(entity_text)
            start_nodes.extend([node.id for node in entity_nodes])

        # Find nodes by main concepts (as topics)
        for concept in query_analysis.main_concepts:
            topic_nodes = self.knowledge_graph.find_nodes_by_topic(concept)
            start_nodes.extend([node.id for node in topic_nodes])

        # If no entity/topic matches, use high-centrality document nodes
        if not start_nodes:
            doc_nodes = self.knowledge_graph.find_nodes_by_type(NodeType.DOCUMENT)
            # Sort by centrality and take top 3
            doc_nodes.sort(key=lambda n: n.centrality_score, reverse=True)
            start_nodes = [node.id for node in doc_nodes[:3]]

        return list(set(start_nodes))  # Remove duplicates

    def get_graph_statistics(self) -> Optional[Dict[str, Any]]:
        """Get knowledge graph statistics."""
        if self.knowledge_graph:
            return self.knowledge_graph.get_statistics()
        return None

    def export_graph(self, format: str = "json") -> Optional[str]:
        """Export knowledge graph in specified format."""
        if not self.knowledge_graph:
            return None

        try:
            if format == "json":
                # Export as JSON with nodes and edges
                data = {
                    "nodes": [
                        {
                            "id": node.id,
                            "type": node.node_type.value,
                            "title": node.title,
                            "centrality": node.centrality_score,
                            "entities": node.entities,
                            "topics": node.topics
                        }
                        for node in self.knowledge_graph.nodes.values()
                    ],
                    "edges": [
                        {
                            "source": edge.source_id,
                            "target": edge.target_id,
                            "relationship": edge.relationship_type.value,
                            "weight": edge.weight,
                            "confidence": edge.confidence
                        }
                        for edge in self.knowledge_graph.edges.values()
                    ]
                }
                return json.dumps(data, indent=2)

        except Exception as e:
            logger.error(f"Failed to export graph: {e}")

        return None
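
# End-to-end usage sketch (illustrative comments only; assumes `search_results`
# is a list of SearchResult objects produced by the existing search pipeline,
# and the query string is just an example):
#
#     dkg = DocumentKnowledgeGraph()
#     if dkg.build_graph(search_results):
#         related = dkg.find_related_content(
#             "authentication flow",
#             max_hops=2,
#             strategy=TraversalStrategy.SEMANTIC,
#         )
#         for item in related:
#             print(item.hop_count, round(item.semantic_score, 3), item.reasoning_path)
#         graph_json = dkg.export_graph(format="json")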