Coverage for src/qdrant_loader_mcp_server/search/enhanced/cross_document_intelligence.py: 84%
928 statements
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 11:38 +0000
1"""
2🔥 Phase 2.3: Cross-Document Intelligence
4This module implements advanced cross-document relationship analysis that leverages
5the rich metadata extracted during document ingestion. It provides intelligent
6document clustering, similarity analysis, citation networks, and complementary
7content discovery.
9Key Features:
10- Document similarity calculation using entity/topic/metadata overlap
11- Intelligent document clustering based on shared concepts
12- Citation network analysis from cross-references and hierarchical data
13- Complementary content recommendation using knowledge graph
14- Conflict detection between documents
15- Cross-project relationship discovery
16"""
18import logging
19import time
20import math
21import networkx as nx
22from collections import defaultdict, Counter
23from dataclasses import dataclass, field
24from enum import Enum
25from typing import Any, Dict, List, Optional, Set, Tuple, Union
26from datetime import datetime
28from ...utils.logging import LoggingConfig
29from ..nlp.spacy_analyzer import SpaCyQueryAnalyzer, QueryAnalysis
30from ..models import SearchResult
31from .knowledge_graph import DocumentKnowledgeGraph, NodeType, TraversalStrategy
33logger = LoggingConfig.get_logger(__name__)
class SimilarityMetric(Enum):
    """Types of similarity metrics for document comparison."""

    ENTITY_OVERLAP = "entity_overlap"  # Jaccard overlap of extracted entities
    TOPIC_OVERLAP = "topic_overlap"  # Jaccard overlap of extracted topics
    SEMANTIC_SIMILARITY = "semantic_similarity"  # spaCy vector similarity of document text
    METADATA_SIMILARITY = "metadata_similarity"  # project / source-type / content-flag agreement
    HIERARCHICAL_DISTANCE = "hierarchical_distance"  # parent/sibling/breadcrumb proximity
    CONTENT_FEATURES = "content_features"  # read-time and section-depth similarity
    COMBINED = "combined"  # weighted combination of the individual metrics
class ClusteringStrategy(Enum):
    """Strategies for document clustering."""

    ENTITY_BASED = "entity_based"  # group by most frequent shared entities
    TOPIC_BASED = "topic_based"  # group by most frequent shared topics
    PROJECT_BASED = "project_based"  # group by project_id
    HIERARCHICAL = "hierarchical"  # group by breadcrumb prefix (first levels)
    MIXED_FEATURES = "mixed_features"  # composite key of entities + topics + project (default)
    SEMANTIC_EMBEDDING = "semantic_embedding"  # NOTE(review): no dedicated handler visible; falls back to mixed features in create_clusters
class RelationshipType(Enum):
    """Types of relationships between documents."""

    HIERARCHICAL = "hierarchical"  # Parent-child relationships
    CROSS_REFERENCE = "cross_reference"  # Explicit links between documents
    SEMANTIC_SIMILARITY = "semantic_similarity"  # Content similarity
    COMPLEMENTARY = "complementary"  # Documents that complement each other
    CONFLICTING = "conflicting"  # Documents with contradictory information
    SEQUENTIAL = "sequential"  # Documents in sequence (next/previous)
    TOPICAL_GROUPING = "topical_grouping"  # Documents on same topic
    PROJECT_GROUPING = "project_grouping"  # Documents in same project
@dataclass
class DocumentSimilarity:
    """Similarity assessment for a pair of documents."""

    doc1_id: str
    doc2_id: str
    similarity_score: float  # combined score in the range 0.0 - 1.0
    metric_scores: Dict[SimilarityMetric, float] = field(default_factory=dict)
    shared_entities: List[str] = field(default_factory=list)
    shared_topics: List[str] = field(default_factory=list)
    relationship_type: RelationshipType = RelationshipType.SEMANTIC_SIMILARITY
    explanation: str = ""

    def get_display_explanation(self) -> str:
        """Return a human-readable explanation of the similarity."""
        # An explicitly provided explanation always wins.
        if self.explanation:
            return self.explanation

        parts: List[str] = []
        if self.shared_entities:
            parts.append(f"Shared entities: {', '.join(self.shared_entities[:3])}")
        if self.shared_topics:
            parts.append(f"Shared topics: {', '.join(self.shared_topics[:3])}")
        if self.metric_scores:
            # Highlight whichever individual metric scored highest.
            metric, score = max(self.metric_scores.items(), key=lambda kv: kv[1])
            parts.append(f"High {metric.value}: {score:.2f}")

        return "; ".join(parts) if parts else "Semantic similarity"
@dataclass
class DocumentCluster:
    """Represents a cluster of related documents."""

    cluster_id: str
    name: str
    documents: List[str] = field(default_factory=list)  # Document IDs
    shared_entities: List[str] = field(default_factory=list)
    shared_topics: List[str] = field(default_factory=list)
    cluster_strategy: ClusteringStrategy = ClusteringStrategy.MIXED_FEATURES
    coherence_score: float = 0.0  # 0.0 - 1.0
    representative_doc_id: str = ""
    cluster_description: str = ""

    def get_cluster_summary(self) -> Dict[str, Any]:
        """Return summary information about the cluster."""
        summary: Dict[str, Any] = {
            "cluster_id": self.cluster_id,
            "name": self.name,
            "document_count": len(self.documents),
            "coherence_score": self.coherence_score,
            # Only the leading entities/topics are surfaced in the summary.
            "primary_entities": self.shared_entities[:5],
            "primary_topics": self.shared_topics[:5],
            "strategy": self.cluster_strategy.value,
            "description": self.cluster_description,
        }
        return summary
@dataclass
class CitationNetwork:
    """Represents a citation/reference network between documents."""

    nodes: Dict[str, Dict[str, Any]] = field(default_factory=dict)  # doc_id -> metadata
    edges: List[Tuple[str, str, Dict[str, Any]]] = field(default_factory=list)  # (from, to, metadata)
    graph: Optional[nx.DiGraph] = None  # lazily built and cached by build_graph()
    authority_scores: Dict[str, float] = field(default_factory=dict)
    hub_scores: Dict[str, float] = field(default_factory=dict)
    pagerank_scores: Dict[str, float] = field(default_factory=dict)

    def build_graph(self) -> nx.DiGraph:
        """Build NetworkX graph from nodes and edges.

        The graph is built only once; subsequent calls return the cached
        instance (edges added to ``self.edges`` afterwards are not picked up).
        """
        if self.graph is None:
            self.graph = nx.DiGraph()

            # Add nodes
            for doc_id, metadata in self.nodes.items():
                self.graph.add_node(doc_id, **metadata)

            # Add edges
            for from_doc, to_doc, edge_metadata in self.edges:
                self.graph.add_edge(from_doc, to_doc, **edge_metadata)

        return self.graph

    def calculate_centrality_scores(self):
        """Calculate various centrality scores for the citation network.

        Populates ``hub_scores``/``authority_scores`` (HITS) and
        ``pagerank_scores``; on failure (e.g. non-convergence) all three
        fall back to simple degree centrality.
        """
        if self.graph is None:
            self.build_graph()

        try:
            # Calculate HITS algorithm scores; nx.hits returns (hubs, authorities)
            hits_scores = nx.hits(self.graph, max_iter=100, normalized=True)
            self.hub_scores = hits_scores[0]
            self.authority_scores = hits_scores[1]

            # Calculate PageRank scores
            self.pagerank_scores = nx.pagerank(self.graph, max_iter=100)

        except Exception as e:
            logger.warning(f"Failed to calculate centrality scores: {e}")
            # Fallback to simple degree centrality (only if the graph is non-empty)
            if self.graph.nodes():
                degree_centrality = nx.degree_centrality(self.graph)
                self.authority_scores = degree_centrality
                self.hub_scores = degree_centrality
                self.pagerank_scores = degree_centrality
@dataclass
class ComplementaryContent:
    """Represents complementary content recommendations."""

    target_doc_id: str
    recommendations: List[Tuple[str, float, str]] = field(default_factory=list)  # (doc_id, score, reason)
    recommendation_strategy: str = "mixed"
    generated_at: datetime = field(default_factory=datetime.now)

    def get_top_recommendations(self, limit: int = 5) -> List[Dict[str, Any]]:
        """Return the top ``limit`` recommendations as detail dicts."""
        # Rank by relevance score, best first, then keep only the requested count.
        ranked = sorted(self.recommendations, key=lambda rec: rec[1], reverse=True)
        results: List[Dict[str, Any]] = []
        for doc_id, score, reason in ranked[:limit]:
            results.append({
                "document_id": doc_id,
                "relevance_score": score,
                "recommendation_reason": reason,
                "strategy": self.recommendation_strategy,
            })
        return results
@dataclass
class ConflictAnalysis:
    """Represents analysis of conflicting information between documents."""

    conflicting_pairs: List[Tuple[str, str, Dict[str, Any]]] = field(default_factory=list)  # (doc1, doc2, conflict_info)
    conflict_categories: Dict[str, List[Tuple[str, str]]] = field(default_factory=dict)
    resolution_suggestions: Dict[str, str] = field(default_factory=dict)

    def get_conflict_summary(self) -> Dict[str, Any]:
        """Return a summary of detected conflicts."""
        # Per-category pair counts, keyed by category name.
        category_counts = {
            category: len(pairs)
            for category, pairs in self.conflict_categories.items()
        }
        return {
            "total_conflicts": len(self.conflicting_pairs),
            "conflict_categories": category_counts,
            "most_common_conflicts": self._get_most_common_conflicts(),
            "resolution_suggestions": list(self.resolution_suggestions.values())[:3],
        }

    def _get_most_common_conflicts(self) -> List[str]:
        """Return up to three category names, ordered by conflict count (desc)."""
        ranked = sorted(
            self.conflict_categories,
            key=lambda category: len(self.conflict_categories[category]),
            reverse=True,
        )
        return ranked[:3]
class DocumentSimilarityCalculator:
    """Calculates similarity between documents using multiple metrics."""

    def __init__(self, spacy_analyzer: SpaCyQueryAnalyzer):
        """Initialize the similarity calculator.

        Args:
            spacy_analyzer: Shared spaCy analyzer; its ``nlp`` pipeline is used
                for the semantic-similarity metric.
        """
        self.spacy_analyzer = spacy_analyzer
        self.logger = LoggingConfig.get_logger(__name__)

    def calculate_similarity(self, doc1: SearchResult, doc2: SearchResult,
                           metrics: Optional[List[SimilarityMetric]] = None) -> DocumentSimilarity:
        """Calculate comprehensive similarity between two documents.

        Args:
            doc1: First document.
            doc2: Second document.
            metrics: Metrics to evaluate; when None, defaults to entity overlap,
                topic overlap, metadata similarity and content features.

        Returns:
            DocumentSimilarity carrying per-metric scores, the weighted combined
            score, shared entities/topics, and an inferred relationship type.
        """
        if metrics is None:
            metrics = [SimilarityMetric.ENTITY_OVERLAP, SimilarityMetric.TOPIC_OVERLAP,
                      SimilarityMetric.METADATA_SIMILARITY, SimilarityMetric.CONTENT_FEATURES]

        start_time = time.time()
        metric_scores = {}

        # Calculate individual metric scores
        for metric in metrics:
            if metric == SimilarityMetric.ENTITY_OVERLAP:
                metric_scores[metric] = self._calculate_entity_overlap(doc1, doc2)
            elif metric == SimilarityMetric.TOPIC_OVERLAP:
                metric_scores[metric] = self._calculate_topic_overlap(doc1, doc2)
            elif metric == SimilarityMetric.METADATA_SIMILARITY:
                metric_scores[metric] = self._calculate_metadata_similarity(doc1, doc2)
            elif metric == SimilarityMetric.CONTENT_FEATURES:
                metric_scores[metric] = self._calculate_content_features_similarity(doc1, doc2)
            elif metric == SimilarityMetric.HIERARCHICAL_DISTANCE:
                metric_scores[metric] = self._calculate_hierarchical_similarity(doc1, doc2)
            elif metric == SimilarityMetric.SEMANTIC_SIMILARITY:
                metric_scores[metric] = self._calculate_semantic_similarity(doc1, doc2)

        # Calculate combined similarity score
        combined_score = self._combine_metric_scores(metric_scores)

        # Extract shared entities and topics
        shared_entities = self._get_shared_entities(doc1, doc2)
        shared_topics = self._get_shared_topics(doc1, doc2)

        # Determine relationship type
        relationship_type = self._determine_relationship_type(doc1, doc2, metric_scores)

        processing_time = (time.time() - start_time) * 1000
        self.logger.debug(f"Calculated similarity between documents in {processing_time:.2f}ms")

        # Document IDs follow the "<source_type>:<source_title>" convention used
        # throughout this module.
        return DocumentSimilarity(
            doc1_id=f"{doc1.source_type}:{doc1.source_title}",
            doc2_id=f"{doc2.source_type}:{doc2.source_title}",
            similarity_score=combined_score,
            metric_scores=metric_scores,
            shared_entities=shared_entities,
            shared_topics=shared_topics,
            relationship_type=relationship_type
        )

    def _calculate_entity_overlap(self, doc1: SearchResult, doc2: SearchResult) -> float:
        """Calculate entity overlap between documents (Jaccard similarity)."""
        entities1 = self._extract_entity_texts(doc1.entities)
        entities2 = self._extract_entity_texts(doc2.entities)

        # Either side empty -> no overlap to measure.
        if not entities1 and not entities2:
            return 0.0
        if not entities1 or not entities2:
            return 0.0

        # Jaccard similarity
        intersection = len(set(entities1) & set(entities2))
        union = len(set(entities1) | set(entities2))

        return intersection / union if union > 0 else 0.0

    def _calculate_topic_overlap(self, doc1: SearchResult, doc2: SearchResult) -> float:
        """Calculate topic overlap between documents (Jaccard similarity)."""
        topics1 = self._extract_topic_texts(doc1.topics)
        topics2 = self._extract_topic_texts(doc2.topics)

        if not topics1 and not topics2:
            return 0.0
        if not topics1 or not topics2:
            return 0.0

        # Jaccard similarity with topic weighting
        intersection = len(set(topics1) & set(topics2))
        union = len(set(topics1) | set(topics2))

        return intersection / union if union > 0 else 0.0

    def _calculate_metadata_similarity(self, doc1: SearchResult, doc2: SearchResult) -> float:
        """Calculate metadata similarity between documents.

        Averages available factors: project match (1.0/0.0), source-type match
        (0.5/0.0), content-flag agreement, and word-count ratio.
        """
        similarity_factors = []

        # Project similarity (only counted when both documents have a project)
        if doc1.project_id and doc2.project_id:
            if doc1.project_id == doc2.project_id:
                similarity_factors.append(1.0)
            else:
                similarity_factors.append(0.0)

        # Source type similarity
        if doc1.source_type == doc2.source_type:
            similarity_factors.append(0.5)
        else:
            similarity_factors.append(0.0)

        # Content features similarity (fraction of matching boolean flags)
        features1 = [doc1.has_code_blocks, doc1.has_tables, doc1.has_images, doc1.has_links]
        features2 = [doc2.has_code_blocks, doc2.has_tables, doc2.has_images, doc2.has_links]
        feature_similarity = sum(f1 == f2 for f1, f2 in zip(features1, features2)) / len(features1)
        similarity_factors.append(feature_similarity)

        # Word count similarity (normalized min/max ratio)
        if doc1.word_count and doc2.word_count:
            min_words = min(doc1.word_count, doc2.word_count)
            max_words = max(doc1.word_count, doc2.word_count)
            word_similarity = min_words / max_words if max_words > 0 else 0.0
            similarity_factors.append(word_similarity)

        return sum(similarity_factors) / len(similarity_factors) if similarity_factors else 0.0

    def _calculate_content_features_similarity(self, doc1: SearchResult, doc2: SearchResult) -> float:
        """Calculate content features similarity (read time + section depth)."""
        # Compare read time ranges (min/max ratio)
        read_time_similarity = 0.0
        if doc1.estimated_read_time and doc2.estimated_read_time:
            min_time = min(doc1.estimated_read_time, doc2.estimated_read_time)
            max_time = max(doc1.estimated_read_time, doc2.estimated_read_time)
            read_time_similarity = min_time / max_time if max_time > 0 else 0.0

        # Compare section depth (hierarchical level)
        depth_similarity = 0.0
        if doc1.depth is not None and doc2.depth is not None:
            depth_diff = abs(doc1.depth - doc2.depth)
            depth_similarity = max(0.0, 1.0 - depth_diff / 5.0)  # Normalize to 5 levels

        # Compare content type features
        feature_factors = [read_time_similarity, depth_similarity]
        return sum(feature_factors) / len(feature_factors) if feature_factors else 0.0

    def _calculate_hierarchical_similarity(self, doc1: SearchResult, doc2: SearchResult) -> float:
        """Calculate hierarchical relationship similarity.

        Returns 1.0 for a direct parent-child pair, 0.8 for siblings, else the
        Jaccard overlap of breadcrumb components (0.0 when no data).
        """
        # Check for direct parent-child relationship
        if doc1.parent_id and doc1.parent_id == f"{doc2.source_type}:{doc2.source_title}":
            return 1.0
        if doc2.parent_id and doc2.parent_id == f"{doc1.source_type}:{doc1.source_title}":
            return 1.0

        # Check for sibling relationship (same parent)
        if doc1.parent_id and doc2.parent_id and doc1.parent_id == doc2.parent_id:
            return 0.8

        # Check for breadcrumb overlap
        if doc1.breadcrumb_text and doc2.breadcrumb_text:
            breadcrumb1 = set(doc1.breadcrumb_text.split(" > "))
            breadcrumb2 = set(doc2.breadcrumb_text.split(" > "))

            if breadcrumb1 and breadcrumb2:
                intersection = len(breadcrumb1 & breadcrumb2)
                union = len(breadcrumb1 | breadcrumb2)
                return intersection / union if union > 0 else 0.0

        return 0.0

    def _calculate_semantic_similarity(self, doc1: SearchResult, doc2: SearchResult) -> float:
        """Calculate semantic similarity using spaCy; 0.0 on any failure."""
        try:
            # Use spaCy to analyze text similarity
            doc1_analyzed = self.spacy_analyzer.nlp(doc1.text[:500])  # First 500 chars for performance
            doc2_analyzed = self.spacy_analyzer.nlp(doc2.text[:500])

            return doc1_analyzed.similarity(doc2_analyzed)
        except Exception as e:
            self.logger.warning(f"Failed to calculate semantic similarity: {e}")
            return 0.0

    def _combine_metric_scores(self, metric_scores: Dict[SimilarityMetric, float]) -> float:
        """Combine multiple metric scores into final similarity score."""
        if not metric_scores:
            return 0.0

        # Weighted combination of metrics; metrics absent from this table get a
        # default weight of 0.1 below.
        weights = {
            SimilarityMetric.ENTITY_OVERLAP: 0.25,
            SimilarityMetric.TOPIC_OVERLAP: 0.25,
            SimilarityMetric.METADATA_SIMILARITY: 0.20,
            SimilarityMetric.CONTENT_FEATURES: 0.15,
            SimilarityMetric.HIERARCHICAL_DISTANCE: 0.10,
            SimilarityMetric.SEMANTIC_SIMILARITY: 0.05
        }

        weighted_sum = 0.0
        total_weight = 0.0

        for metric, score in metric_scores.items():
            weight = weights.get(metric, 0.1)
            weighted_sum += score * weight
            total_weight += weight

        # Normalize by the weights actually used so partial metric sets still
        # yield a 0.0-1.0 score.
        return weighted_sum / total_weight if total_weight > 0 else 0.0

    def _get_shared_entities(self, doc1: SearchResult, doc2: SearchResult) -> List[str]:
        """Get shared entities between documents."""
        entities1 = self._extract_entity_texts(doc1.entities)
        entities2 = self._extract_entity_texts(doc2.entities)
        return list(set(entities1) & set(entities2))

    def _get_shared_topics(self, doc1: SearchResult, doc2: SearchResult) -> List[str]:
        """Get shared topics between documents."""
        topics1 = self._extract_topic_texts(doc1.topics)
        topics2 = self._extract_topic_texts(doc2.topics)
        return list(set(topics1) & set(topics2))

    def _extract_entity_texts(self, entities: List[Union[dict, str]]) -> List[str]:
        """Extract lowercased entity text from dict or plain-string entries."""
        texts = []
        for entity in entities:
            if isinstance(entity, dict):
                texts.append(entity.get("text", "").lower())
            elif isinstance(entity, str):
                texts.append(entity.lower())
        # Drop empty strings (e.g. dicts missing a "text" key)
        return [t for t in texts if t]

    def _extract_topic_texts(self, topics: List[Union[dict, str]]) -> List[str]:
        """Extract lowercased topic text from dict or plain-string entries."""
        texts = []
        for topic in topics:
            if isinstance(topic, dict):
                texts.append(topic.get("text", "").lower())
            elif isinstance(topic, str):
                texts.append(topic.lower())
        return [t for t in texts if t]

    def _determine_relationship_type(self, doc1: SearchResult, doc2: SearchResult,
                                   metric_scores: Dict[SimilarityMetric, float]) -> RelationshipType:
        """Determine the type of relationship between documents.

        Precedence: hierarchical (score > 0.7) > cross-reference presence >
        same-project grouping > semantic similarity (default).
        """
        # Check for hierarchical relationship
        if (SimilarityMetric.HIERARCHICAL_DISTANCE in metric_scores and
            metric_scores[SimilarityMetric.HIERARCHICAL_DISTANCE] > 0.7):
            return RelationshipType.HIERARCHICAL

        # Check for cross-references (either document having any suffices)
        if doc1.cross_references or doc2.cross_references:
            return RelationshipType.CROSS_REFERENCE

        # Check for project grouping
        if doc1.project_id and doc2.project_id and doc1.project_id == doc2.project_id:
            return RelationshipType.PROJECT_GROUPING

        # Default to semantic similarity
        return RelationshipType.SEMANTIC_SIMILARITY
class DocumentClusterAnalyzer:
    """Analyzes and creates clusters of related documents."""

    def __init__(self, similarity_calculator: DocumentSimilarityCalculator):
        """Initialize the cluster analyzer.

        Args:
            similarity_calculator: Used for coherence scoring and for its
                entity/topic text-extraction helpers.
        """
        self.similarity_calculator = similarity_calculator
        self.logger = LoggingConfig.get_logger(__name__)

    def create_clusters(self, documents: List[SearchResult],
                       strategy: ClusteringStrategy = ClusteringStrategy.MIXED_FEATURES,
                       max_clusters: int = 10,
                       min_cluster_size: int = 2) -> List[DocumentCluster]:
        """Create document clusters using specified strategy.

        Args:
            documents: Documents to cluster.
            strategy: Clustering strategy; unhandled values fall back to
                mixed features.
            max_clusters: Upper bound on the number of clusters returned.
            min_cluster_size: Groups smaller than this are discarded.

        Returns:
            Clusters enriched with coherence score, representative document
            and description.
        """
        start_time = time.time()

        if strategy == ClusteringStrategy.ENTITY_BASED:
            clusters = self._cluster_by_entities(documents, max_clusters, min_cluster_size)
        elif strategy == ClusteringStrategy.TOPIC_BASED:
            clusters = self._cluster_by_topics(documents, max_clusters, min_cluster_size)
        elif strategy == ClusteringStrategy.PROJECT_BASED:
            clusters = self._cluster_by_projects(documents, max_clusters, min_cluster_size)
        elif strategy == ClusteringStrategy.HIERARCHICAL:
            clusters = self._cluster_by_hierarchy(documents, max_clusters, min_cluster_size)
        elif strategy == ClusteringStrategy.MIXED_FEATURES:
            clusters = self._cluster_by_mixed_features(documents, max_clusters, min_cluster_size)
        else:
            # Strategies without a dedicated handler (e.g. SEMANTIC_EMBEDDING)
            # fall back to mixed features.
            clusters = self._cluster_by_mixed_features(documents, max_clusters, min_cluster_size)

        # Calculate coherence scores for clusters
        for cluster in clusters:
            cluster.coherence_score = self._calculate_cluster_coherence(cluster, documents)
            cluster.representative_doc_id = self._find_representative_document(cluster, documents)
            cluster.cluster_description = self._generate_cluster_description(cluster, documents)

        processing_time = (time.time() - start_time) * 1000
        self.logger.info(f"Created {len(clusters)} clusters using {strategy.value} in {processing_time:.2f}ms")

        return clusters

    def _cluster_by_entities(self, documents: List[SearchResult],
                           max_clusters: int, min_cluster_size: int) -> List[DocumentCluster]:
        """Cluster documents based on shared entities."""
        entity_groups = defaultdict(list)

        # Group documents by their most common entities
        for doc in documents:
            doc_id = f"{doc.source_type}:{doc.source_title}"
            entities = self.similarity_calculator._extract_entity_texts(doc.entities)

            # Use most frequent entities as clustering key
            entity_counter = Counter(entities)
            top_entities = [entity for entity, _ in entity_counter.most_common(3)]

            if top_entities:
                # Sorted join makes the key order-independent
                cluster_key = "|".join(sorted(top_entities))
                entity_groups[cluster_key].append(doc_id)

        # Convert to DocumentCluster objects
        clusters = []
        for i, (entity_key, doc_ids) in enumerate(entity_groups.items()):
            if len(doc_ids) >= min_cluster_size and len(clusters) < max_clusters:
                shared_entities = entity_key.split("|")
                cluster = DocumentCluster(
                    cluster_id=f"entity_cluster_{i}",
                    name=f"Entity Cluster: {', '.join(shared_entities[:2])}",
                    documents=doc_ids,
                    shared_entities=shared_entities,
                    cluster_strategy=ClusteringStrategy.ENTITY_BASED
                )
                clusters.append(cluster)

        return clusters

    def _cluster_by_topics(self, documents: List[SearchResult],
                         max_clusters: int, min_cluster_size: int) -> List[DocumentCluster]:
        """Cluster documents based on shared topics."""
        topic_groups = defaultdict(list)

        # Group documents by their most common topics
        for doc in documents:
            doc_id = f"{doc.source_type}:{doc.source_title}"
            topics = self.similarity_calculator._extract_topic_texts(doc.topics)

            # Use most frequent topics as clustering key
            topic_counter = Counter(topics)
            top_topics = [topic for topic, _ in topic_counter.most_common(3)]

            if top_topics:
                cluster_key = "|".join(sorted(top_topics))
                topic_groups[cluster_key].append(doc_id)

        # Convert to DocumentCluster objects
        clusters = []
        for i, (topic_key, doc_ids) in enumerate(topic_groups.items()):
            if len(doc_ids) >= min_cluster_size and len(clusters) < max_clusters:
                shared_topics = topic_key.split("|")
                cluster = DocumentCluster(
                    cluster_id=f"topic_cluster_{i}",
                    name=f"Topic Cluster: {', '.join(shared_topics[:2])}",
                    documents=doc_ids,
                    shared_topics=shared_topics,
                    cluster_strategy=ClusteringStrategy.TOPIC_BASED
                )
                clusters.append(cluster)

        return clusters

    def _cluster_by_projects(self, documents: List[SearchResult],
                           max_clusters: int, min_cluster_size: int) -> List[DocumentCluster]:
        """Cluster documents based on project groupings."""
        project_groups = defaultdict(list)

        # Group documents by project; documents without one share "no_project"
        for doc in documents:
            doc_id = f"{doc.source_type}:{doc.source_title}"
            project_key = doc.project_id or "no_project"
            project_groups[project_key].append(doc_id)

        # Convert to DocumentCluster objects
        clusters = []
        for i, (project_key, doc_ids) in enumerate(project_groups.items()):
            if len(doc_ids) >= min_cluster_size and len(clusters) < max_clusters:
                cluster = DocumentCluster(
                    cluster_id=f"project_cluster_{i}",
                    name=f"Project: {project_key}",
                    documents=doc_ids,
                    cluster_strategy=ClusteringStrategy.PROJECT_BASED
                )
                clusters.append(cluster)

        return clusters

    def _cluster_by_hierarchy(self, documents: List[SearchResult],
                            max_clusters: int, min_cluster_size: int) -> List[DocumentCluster]:
        """Cluster documents based on hierarchical relationships."""
        hierarchy_groups = defaultdict(list)

        # Group documents by hierarchical context
        for doc in documents:
            doc_id = f"{doc.source_type}:{doc.source_title}"

            # Use breadcrumb as clustering key
            if doc.breadcrumb_text:
                # Use first few levels of breadcrumb
                breadcrumb_parts = doc.breadcrumb_text.split(" > ")
                cluster_key = " > ".join(breadcrumb_parts[:2])  # First 2 levels
                hierarchy_groups[cluster_key].append(doc_id)
            else:
                # Documents with no breadcrumb all land in the "root" group
                hierarchy_groups["root"].append(doc_id)

        # Convert to DocumentCluster objects
        clusters = []
        for i, (hierarchy_key, doc_ids) in enumerate(hierarchy_groups.items()):
            if len(doc_ids) >= min_cluster_size and len(clusters) < max_clusters:
                cluster = DocumentCluster(
                    cluster_id=f"hierarchy_cluster_{i}",
                    name=f"Hierarchy: {hierarchy_key}",
                    documents=doc_ids,
                    cluster_strategy=ClusteringStrategy.HIERARCHICAL
                )
                clusters.append(cluster)

        return clusters

    def _cluster_by_mixed_features(self, documents: List[SearchResult],
                                 max_clusters: int, min_cluster_size: int) -> List[DocumentCluster]:
        """Cluster documents using mixed features (entities + topics + project)."""
        feature_groups = defaultdict(list)

        # Group documents by combined features
        for doc in documents:
            doc_id = f"{doc.source_type}:{doc.source_title}"

            # Combine key features (top-2 entities and topics)
            entities = self.similarity_calculator._extract_entity_texts(doc.entities)[:2]
            topics = self.similarity_calculator._extract_topic_texts(doc.topics)[:2]
            project = doc.project_id or "no_project"

            # Create composite clustering key
            feature_parts = []
            if entities:
                feature_parts.append(f"entities:{','.join(entities)}")
            if topics:
                feature_parts.append(f"topics:{','.join(topics)}")
            feature_parts.append(f"project:{project}")

            cluster_key = "|".join(feature_parts)
            feature_groups[cluster_key].append(doc_id)

        # Convert to DocumentCluster objects
        clusters = []
        for i, (feature_key, doc_ids) in enumerate(feature_groups.items()):
            if len(doc_ids) >= min_cluster_size and len(clusters) < max_clusters:
                # Parse shared features back out of the composite key
                shared_entities = []
                shared_topics = []

                for part in feature_key.split("|"):
                    if part.startswith("entities:"):
                        shared_entities = part.replace("entities:", "").split(",")
                    elif part.startswith("topics:"):
                        shared_topics = part.replace("topics:", "").split(",")

                cluster = DocumentCluster(
                    cluster_id=f"mixed_cluster_{i}",
                    name=f"Mixed Cluster {i+1}",
                    documents=doc_ids,
                    shared_entities=[e for e in shared_entities if e],
                    shared_topics=[t for t in shared_topics if t],
                    cluster_strategy=ClusteringStrategy.MIXED_FEATURES
                )
                clusters.append(cluster)

        return clusters

    def _calculate_cluster_coherence(self, cluster: DocumentCluster,
                                   all_documents: List[SearchResult]) -> float:
        """Calculate coherence score for a cluster.

        Returns the average pairwise similarity of the cluster's resolvable
        documents; 1.0 for singleton clusters, 0.0 when fewer than two
        documents can be resolved.
        """
        if len(cluster.documents) < 2:
            return 1.0

        # Find documents in this cluster
        cluster_docs = []
        doc_lookup = {f"{doc.source_type}:{doc.source_title}": doc for doc in all_documents}

        for doc_id in cluster.documents:
            if doc_id in doc_lookup:
                cluster_docs.append(doc_lookup[doc_id])

        if len(cluster_docs) < 2:
            return 0.0

        # Calculate pairwise similarities within cluster (O(n^2) pairs)
        similarities = []
        for i in range(len(cluster_docs)):
            for j in range(i + 1, len(cluster_docs)):
                similarity = self.similarity_calculator.calculate_similarity(
                    cluster_docs[i], cluster_docs[j]
                )
                similarities.append(similarity.similarity_score)

        # Return average similarity as coherence score
        return sum(similarities) / len(similarities) if similarities else 0.0

    def _find_representative_document(self, cluster: DocumentCluster,
                                    all_documents: List[SearchResult]) -> str:
        """Find the most representative document in a cluster."""
        if not cluster.documents:
            return ""

        # For now, return the first document
        # Could be enhanced to find document with highest centrality
        return cluster.documents[0]

    def _generate_cluster_description(self, cluster: DocumentCluster,
                                    all_documents: List[SearchResult]) -> str:
        """Generate a description for the cluster."""
        descriptions = []

        if cluster.shared_entities:
            descriptions.append(f"Documents related to {', '.join(cluster.shared_entities[:2])}")

        if cluster.shared_topics:
            descriptions.append(f"Topics: {', '.join(cluster.shared_topics[:2])}")

        descriptions.append(f"Strategy: {cluster.cluster_strategy.value}")
        descriptions.append(f"{len(cluster.documents)} documents")

        return "; ".join(descriptions)
class CitationNetworkAnalyzer:
    """Analyzes citation and reference networks between documents."""

    def __init__(self):
        """Initialize the citation network analyzer."""
        self.logger = LoggingConfig.get_logger(__name__)

    def build_citation_network(self, documents: List[SearchResult]) -> CitationNetwork:
        """Build citation network from document cross-references and hierarchical relationships.

        Edge weights: hierarchical parent->child 2.0, cross-reference 1.0,
        sibling 0.5. Centrality scores are computed before returning.
        """
        start_time = time.time()

        network = CitationNetwork()
        doc_lookup = {f"{doc.source_type}:{doc.source_title}": doc for doc in documents}

        # Add nodes to the network
        for doc in documents:
            doc_id = f"{doc.source_type}:{doc.source_title}"
            network.nodes[doc_id] = {
                "title": doc.source_title,
                "source_type": doc.source_type,
                "project_id": doc.project_id,
                "word_count": doc.word_count or 0,
                "has_code": doc.has_code_blocks,
                "has_tables": doc.has_tables,
                "depth": doc.depth or 0,
                # created_at may not exist on all SearchResult variants
                "creation_date": getattr(doc, 'created_at', None),
            }

        # Add edges based on cross-references
        for doc in documents:
            doc_id = f"{doc.source_type}:{doc.source_title}"

            # Process cross-references (entries may be dicts or plain strings)
            if doc.cross_references:
                for ref in doc.cross_references:
                    target_url = ref.get("url", "") if isinstance(ref, dict) else ""
                    ref_text = ref.get("text", "") if isinstance(ref, dict) else str(ref)

                    # Try to find referenced document
                    target_doc_id = self._find_referenced_document(target_url, doc_lookup)
                    if target_doc_id and target_doc_id != doc_id:
                        network.edges.append((doc_id, target_doc_id, {
                            "relation_type": "cross_reference",
                            "reference_text": ref_text,
                            "reference_url": target_url,
                            "weight": 1.0
                        }))

            # Add hierarchical relationships (edge runs parent -> child)
            if doc.parent_id and doc.parent_id in doc_lookup:
                network.edges.append((doc.parent_id, doc_id, {
                    "relation_type": "hierarchical_child",
                    "weight": 2.0  # Higher weight for hierarchical relationships
                }))

            # Add sibling relationships
            if doc.sibling_sections:
                for sibling in doc.sibling_sections:
                    sibling_doc_id = self._find_sibling_document(sibling, doc_lookup)
                    if sibling_doc_id and sibling_doc_id != doc_id:
                        network.edges.append((doc_id, sibling_doc_id, {
                            "relation_type": "sibling",
                            "weight": 0.5
                        }))

        # Build NetworkX graph and calculate centrality scores
        network.build_graph()
        network.calculate_centrality_scores()

        processing_time = (time.time() - start_time) * 1000
        self.logger.info(f"Built citation network with {len(network.nodes)} nodes and {len(network.edges)} edges in {processing_time:.2f}ms")

        return network

    def _find_referenced_document(self, reference_url: str, doc_lookup: Dict[str, SearchResult]) -> Optional[str]:
        """Find document that matches a reference URL.

        Tries substring match against source_url first, then against titles
        (for internal references). Returns None when nothing matches.
        """
        if not reference_url:
            return None

        # Try exact URL match first
        for doc_id, doc in doc_lookup.items():
            if doc.source_url and reference_url in doc.source_url:
                return doc_id

        # Try title-based matching for internal references
        for doc_id, doc in doc_lookup.items():
            if reference_url.lower() in doc.source_title.lower():
                return doc_id

        return None

    def _find_sibling_document(self, sibling_reference: str, doc_lookup: Dict[str, SearchResult]) -> Optional[str]:
        """Find document that matches a sibling reference (title substring match)."""
        # Try title-based matching
        for doc_id, doc in doc_lookup.items():
            if sibling_reference.lower() in doc.source_title.lower():
                return doc_id

        return None

    def get_most_authoritative_documents(self, network: CitationNetwork, limit: int = 10) -> List[Tuple[str, float]]:
        """Get the most authoritative documents based on citation analysis.

        Requires calculate_centrality_scores() to have populated
        authority_scores; returns [] otherwise.
        """
        if not network.authority_scores:
            return []

        # Sort by authority score
        sorted_docs = sorted(network.authority_scores.items(), key=lambda x: x[1], reverse=True)
        return sorted_docs[:limit]

    def get_most_connected_documents(self, network: CitationNetwork, limit: int = 10) -> List[Tuple[str, int]]:
        """Get the most connected documents based on degree centrality.

        Note: uses raw graph degree (int counts), not normalized centrality.
        """
        if not network.graph:
            return []

        # Calculate degree centrality
        degree_centrality = dict(network.graph.degree())
        sorted_docs = sorted(degree_centrality.items(), key=lambda x: x[1], reverse=True)
        return sorted_docs[:limit]
class ComplementaryContentFinder:
    """Finds complementary content that would enhance understanding of a target document.

    Candidates from the *same* project are scored with intra-project heuristics
    (requirements/implementation chains, abstraction-level gaps, cross-functional
    pairs, topic overlap across document types) and may receive a topic boost.
    Candidates from *other* projects use inter-project heuristics (shared
    challenges, transferable domain knowledge, reusable architecture patterns,
    shared technologies) and receive a 0.8x penalty.

    All detection heuristics are keyword matches on document titles;
    topic/entity extraction is delegated to the shared similarity calculator.
    """

    # Minimum score a candidate must exceed to be recommended.
    MIN_COMPLEMENTARY_SCORE = 0.15

    def __init__(self, similarity_calculator: "DocumentSimilarityCalculator",
                 knowledge_graph: Optional["DocumentKnowledgeGraph"] = None):
        """Initialize the complementary content finder.

        Args:
            similarity_calculator: Provides entity/topic text extraction.
            knowledge_graph: Optional knowledge graph (currently unused by the
                scoring heuristics; kept for future graph-based discovery).
        """
        self.similarity_calculator = similarity_calculator
        self.knowledge_graph = knowledge_graph
        self.logger = LoggingConfig.get_logger(__name__)

    def find_complementary_content(self, target_doc: "SearchResult",
                                   candidate_docs: List["SearchResult"],
                                   max_recommendations: int = 5) -> "ComplementaryContent":
        """Score each candidate against ``target_doc`` and return top recommendations.

        The target itself (same ``source_type:source_title`` ID) is skipped.
        Only candidates scoring above ``MIN_COMPLEMENTARY_SCORE`` are kept.
        """
        start_time = time.time()

        recommendations = []
        target_doc_id = f"{target_doc.source_type}:{target_doc.source_title}"

        self.logger.info(f"Finding complementary content for target: {target_doc_id}")
        self.logger.info(f"Target doc topics: {target_doc.topics}")
        self.logger.info(f"Target doc entities: {target_doc.entities}")
        self.logger.info(f"Analyzing {len(candidate_docs)} candidate documents")

        for candidate in candidate_docs:
            candidate_id = f"{candidate.source_type}:{candidate.source_title}"

            if candidate_id == target_doc_id:
                continue

            self.logger.debug(f"Analyzing candidate: {candidate_id}")
            self.logger.debug(f"Candidate topics: {candidate.topics}")
            self.logger.debug(f"Candidate entities: {candidate.entities}")

            complementary_score, reason = self._calculate_complementary_score(target_doc, candidate)

            self.logger.info(f"Complementary score for {candidate_id}: {complementary_score:.3f} - {reason}")

            if complementary_score > self.MIN_COMPLEMENTARY_SCORE:
                recommendations.append((candidate_id, complementary_score, reason))
            else:
                # Log why it didn't make the cut (threshold kept in one place).
                self.logger.debug(f"Rejected {candidate_id}: score {complementary_score:.3f} below threshold {self.MIN_COMPLEMENTARY_SCORE}")

        # Sort by complementary score, best first.
        recommendations.sort(key=lambda x: x[1], reverse=True)

        processing_time = (time.time() - start_time) * 1000
        self.logger.info(f"Found {len(recommendations)} complementary recommendations in {processing_time:.2f}ms")

        return ComplementaryContent(
            target_doc_id=target_doc_id,
            recommendations=recommendations[:max_recommendations],
            recommendation_strategy="mixed"
        )

    def _calculate_complementary_score(self, target_doc: "SearchResult",
                                       candidate_doc: "SearchResult") -> Tuple[float, str]:
        """Calculate how complementary a candidate document is to the target.

        Prioritizes intra-project relationships (with a topic-relevance boost)
        while applying a flat 0.8x penalty to inter-project matches, which are
        less immediately useful.

        Returns:
            (score, human-readable reason) pair.
        """
        self.logger.info(f"=== Scoring {candidate_doc.source_title} against {target_doc.source_title} ===")

        same_project = (target_doc.project_id == candidate_doc.project_id)
        self.logger.info(f"Project context: target={target_doc.project_id}, candidate={candidate_doc.project_id}, same_project={same_project}")

        if same_project:
            # Prioritize intra-project relationships.
            score, reason = self._score_intra_project_complementary(target_doc, candidate_doc)

            # Boost for high topic relevance within the project (capped at 0.95).
            if score > 0 and self._has_high_topic_overlap(target_doc, candidate_doc):
                boosted_score = min(0.95, score * 1.2)
                self.logger.info(f"✓ Intra-project topic boost: {score:.3f} → {boosted_score:.3f}")
                score = boosted_score
                reason = f"{reason} (high topic relevance)"

        else:
            # Evaluate inter-project relationships.
            score, reason = self._score_inter_project_complementary(target_doc, candidate_doc)

            # Apply cross-project penalty (inter-project content is less immediately useful).
            if score > 0:
                adjusted_score = score * 0.8
                self.logger.info(f"✓ Inter-project penalty applied: {score:.3f} → {adjusted_score:.3f}")
                score = adjusted_score
                reason = f"Inter-project: {reason}"

        self.logger.info(f"Final complementary score: {score:.3f} for {candidate_doc.source_title} - {reason}")
        return score, reason

    def _score_intra_project_complementary(self, target_doc: "SearchResult", candidate_doc: "SearchResult") -> Tuple[float, str]:
        """Score complementary relationships within the same project.

        Collects weighted factors and delegates aggregation (plus fallback
        scoring when no factor fires) to ``_calculate_weighted_score``.
        """
        factors = []

        # A. Requirements ↔ Implementation chain.
        if self._is_requirements_implementation_pair(target_doc, candidate_doc):
            factors.append((0.85, "Requirements-implementation chain"))
            self.logger.info("✓ Found requirements-implementation pair")

        # B. Abstraction-level differences (bigger gap → slightly higher score).
        abstraction_gap = self._calculate_abstraction_gap(target_doc, candidate_doc)
        if abstraction_gap > 0:
            score = 0.7 + (abstraction_gap * 0.1)
            factors.append((score, f"Different abstraction levels (gap: {abstraction_gap})"))
            self.logger.info(f"✓ Abstraction gap: {abstraction_gap} → score: {score:.3f}")

        # C. Cross-functional perspectives (business vs technical, etc.).
        if self._has_cross_functional_relationship(target_doc, candidate_doc):
            factors.append((0.75, "Cross-functional perspectives"))
            self.logger.info("✓ Cross-functional relationship detected")

        # D. Topic overlap across different document types.
        if (self._has_shared_topics(target_doc, candidate_doc) and
            self._has_different_document_types(target_doc, candidate_doc)):
            shared_topics = self._get_shared_topics_count(target_doc, candidate_doc)
            score = min(0.65, 0.35 + (shared_topics * 0.1))
            factors.append((score, f"Same topics, different document types ({shared_topics} topics)"))
            self.logger.info(f"✓ Topic overlap with different doc types: {score:.3f}")

        return self._calculate_weighted_score(factors, target_doc, candidate_doc)

    def _score_inter_project_complementary(self, target_doc: "SearchResult", candidate_doc: "SearchResult") -> Tuple[float, str]:
        """Score complementary relationships between different projects."""
        factors = []

        # A. Similar challenges/solutions (auth, scalability, ...).
        if self._has_similar_challenges(target_doc, candidate_doc):
            factors.append((0.8, "Similar challenges/solutions"))
            self.logger.info("✓ Similar challenges detected")

        # B. Domain expertise transfer (healthcare, finance, ...).
        if self._has_transferable_domain_knowledge(target_doc, candidate_doc):
            factors.append((0.75, "Transferable domain knowledge"))
            self.logger.info("✓ Transferable domain knowledge")

        # C. Architectural patterns reusable across projects.
        if self._has_reusable_architecture_patterns(target_doc, candidate_doc):
            factors.append((0.7, "Reusable architecture patterns"))
            self.logger.info("✓ Architecture patterns detected")

        # D. Shared technologies/standards.
        if self._has_shared_technologies(target_doc, candidate_doc):
            shared_count = self._get_shared_technologies_count(target_doc, candidate_doc)
            score = min(0.6, 0.3 + (shared_count * 0.1))
            factors.append((score, f"Shared technologies ({shared_count} common)"))
            self.logger.info(f"✓ Shared technologies: {score:.3f}")

        return self._calculate_weighted_score(factors, target_doc, candidate_doc)

    def _calculate_weighted_score(self, factors: List[Tuple[float, str]],
                                  target_doc: Optional["SearchResult"] = None,
                                  candidate_doc: Optional["SearchResult"] = None) -> Tuple[float, str]:
        """Aggregate (score, reason) factors into a single weighted score.

        The highest factor is primary; each additional factor contributes 10%
        of its score as a boost, capped at 0.95. With no factors and both docs
        provided, falls back to basic similarity scoring.
        """
        if not factors:
            if target_doc and candidate_doc:
                return self._enhanced_fallback_scoring(target_doc, candidate_doc)
            else:
                return 0.0, "No complementary relationship found"

        # Use the highest scoring factor as primary, but consider multiple factors.
        factors.sort(key=lambda x: x[0], reverse=True)
        primary_score, primary_reason = factors[0]

        # Boost if multiple factors contribute.
        if len(factors) > 1:
            secondary_boost = sum(score for score, _ in factors[1:]) * 0.1
            final_score = min(0.95, primary_score + secondary_boost)
            primary_reason = f"{primary_reason} (+{len(factors)-1} other factors)"
        else:
            final_score = primary_score

        return final_score, primary_reason

    def _is_requirements_implementation_pair(self, doc1: "SearchResult", doc2: "SearchResult") -> bool:
        """Detect if documents form a requirements -> implementation chain.

        One title must look like requirements, the other like implementation,
        AND the documents must share at least one topic or entity.
        """
        req_keywords = ["requirements", "specification", "user story", "feature", "functional"]
        impl_keywords = ["implementation", "technical", "architecture", "api", "code", "development"]

        title1 = doc1.source_title.lower()
        title2 = doc2.source_title.lower()

        doc1_is_req = any(keyword in title1 for keyword in req_keywords)
        doc1_is_impl = any(keyword in title1 for keyword in impl_keywords)
        doc2_is_req = any(keyword in title2 for keyword in req_keywords)
        doc2_is_impl = any(keyword in title2 for keyword in impl_keywords)

        # One is requirements, other is implementation.
        return ((doc1_is_req and doc2_is_impl) or (doc1_is_impl and doc2_is_req)) and \
               (self._has_shared_topics(doc1, doc2) or self._has_shared_entities(doc1, doc2))

    def _calculate_abstraction_gap(self, doc1: "SearchResult", doc2: "SearchResult") -> int:
        """Calculate difference in abstraction levels (0-3).

        0: same level; 3: maximum gap (e.g. strategy vs implementation detail).
        """
        level1 = self._get_abstraction_level(doc1)
        level2 = self._get_abstraction_level(doc2)
        return abs(level1 - level2)

    def _get_abstraction_level(self, doc: "SearchResult") -> int:
        """Determine abstraction level of a document (0=highest, 3=lowest).

        Classified by title keywords; unknown titles default to level 2.
        """
        title = doc.source_title.lower()

        # Level 0: high-level business/strategy.
        if any(keyword in title for keyword in ["strategy", "vision", "overview", "executive", "business case"]):
            return 0

        # Level 1: requirements/features.
        if any(keyword in title for keyword in ["requirements", "features", "user story", "epic", "specification"]):
            return 1

        # Level 2: design/architecture.
        if any(keyword in title for keyword in ["design", "architecture", "workflow", "process", "wireframe"]):
            return 2

        # Level 3: implementation details.
        if any(keyword in title for keyword in ["implementation", "code", "api", "technical", "development", "configuration"]):
            return 3

        # Default to middle level.
        return 2

    def _has_cross_functional_relationship(self, doc1: "SearchResult", doc2: "SearchResult") -> bool:
        """Detect business + technical, feature + security, etc. title pairings."""
        business_keywords = ["business", "user", "requirements", "workflow", "process", "feature"]
        technical_keywords = ["technical", "architecture", "api", "implementation", "code", "development"]
        security_keywords = ["security", "authentication", "authorization", "compliance", "audit"]

        title1 = doc1.source_title.lower()
        title2 = doc2.source_title.lower()

        # Business + Technical (either direction).
        if (any(k in title1 for k in business_keywords) and any(k in title2 for k in technical_keywords)) or \
           (any(k in title2 for k in business_keywords) and any(k in title1 for k in technical_keywords)):
            return True

        # Feature + Security (either direction).
        if (any(k in title1 for k in ["feature", "functionality"]) and any(k in title2 for k in security_keywords)) or \
           (any(k in title2 for k in ["feature", "functionality"]) and any(k in title1 for k in security_keywords)):
            return True

        return False

    def _has_different_document_types(self, doc1: "SearchResult", doc2: "SearchResult") -> bool:
        """Check if documents classify to different document types."""
        type1 = self._classify_document_type(doc1)
        type2 = self._classify_document_type(doc2)
        return type1 != type2

    def _classify_document_type(self, doc: "SearchResult") -> str:
        """Classify document by title as: user_story, technical_spec, architecture,
        compliance, testing, process, or general.

        More specific categories are checked first to avoid keyword conflicts;
        the generic "requirement" keyword is checked last.
        """
        title = doc.source_title.lower()

        if any(keyword in title for keyword in ["security", "compliance", "audit", "policy"]):
            return "compliance"
        elif any(keyword in title for keyword in ["test", "testing", "qa", "quality"]):
            return "testing"
        elif any(keyword in title for keyword in ["user story", "epic", "feature"]):
            return "user_story"
        elif any(keyword in title for keyword in ["technical", "specification", "api", "implementation"]):
            return "technical_spec"
        elif any(keyword in title for keyword in ["architecture", "design", "system"]):
            return "architecture"
        elif any(keyword in title for keyword in ["workflow", "process", "procedure"]):
            return "process"
        elif any(keyword in title for keyword in ["requirement"]):  # more general, check last
            return "user_story"
        else:
            return "general"

    def _has_high_topic_overlap(self, doc1: "SearchResult", doc2: "SearchResult") -> bool:
        """Check if documents have high topic overlap (>= 3 shared topics)."""
        return self._get_shared_topics_count(doc1, doc2) >= 3

    def _has_similar_challenges(self, doc1: "SearchResult", doc2: "SearchResult") -> bool:
        """Identify common challenge patterns (auth, scalability, compliance, ...).

        True when both titles hit keywords from the same challenge group.
        """
        challenge_patterns = [
            ["authentication", "login", "auth", "signin"],
            ["scalability", "performance", "optimization", "scale"],
            ["compliance", "regulation", "audit", "governance"],
            ["integration", "api", "interface", "connection"],
            ["security", "privacy", "protection", "safety"],
            ["migration", "upgrade", "transition", "conversion"]
        ]

        title1 = doc1.source_title.lower()
        title2 = doc2.source_title.lower()

        for pattern in challenge_patterns:
            if (any(keyword in title1 for keyword in pattern) and
                any(keyword in title2 for keyword in pattern)):
                return True

        return False

    def _has_transferable_domain_knowledge(self, doc1: "SearchResult", doc2: "SearchResult") -> bool:
        """Check for transferable domain expertise between projects.

        Simplified keyword implementation — could be enhanced with NLP.
        """
        domain_keywords = [
            ["healthcare", "medical", "patient", "clinical"],
            ["finance", "payment", "banking", "financial"],
            ["ecommerce", "retail", "shopping", "commerce"],
            ["education", "learning", "student", "academic"],
            ["iot", "device", "sensor", "embedded"],
            ["mobile", "app", "ios", "android"]
        ]

        title1 = doc1.source_title.lower()
        title2 = doc2.source_title.lower()

        for domain in domain_keywords:
            if (any(keyword in title1 for keyword in domain) and
                any(keyword in title2 for keyword in domain)):
                return True

        return False

    def _has_reusable_architecture_patterns(self, doc1: "SearchResult", doc2: "SearchResult") -> bool:
        """Identify architectural patterns that are reusable across projects."""
        architecture_patterns = [
            ["microservices", "service", "microservice"],
            ["api", "rest", "graphql", "endpoint"],
            ["database", "data", "storage", "persistence"],
            ["authentication", "auth", "identity", "oauth"],
            ["messaging", "queue", "event", "pub-sub"],
            ["cache", "caching", "redis", "memory"],
            ["monitoring", "logging", "observability", "metrics"]
        ]

        title1 = doc1.source_title.lower()
        title2 = doc2.source_title.lower()

        for pattern in architecture_patterns:
            if (any(keyword in title1 for keyword in pattern) and
                any(keyword in title2 for keyword in pattern)):
                return True

        return False

    def _has_shared_technologies(self, doc1: "SearchResult", doc2: "SearchResult") -> bool:
        """Identify shared technologies, frameworks, or standards by title keywords."""
        tech_patterns = [
            ["react", "angular", "vue", "frontend"],
            ["node", "python", "java", "golang"],
            ["docker", "kubernetes", "container"],
            ["aws", "azure", "gcp", "cloud"],
            ["postgres", "mysql", "mongodb", "database"],
            ["jwt", "oauth", "saml", "authentication"],
            ["rest", "graphql", "grpc", "api"]
        ]

        title1 = doc1.source_title.lower()
        title2 = doc2.source_title.lower()

        for tech in tech_patterns:
            if (any(keyword in title1 for keyword in tech) and
                any(keyword in title2 for keyword in tech)):
                return True

        return False

    def _get_shared_technologies_count(self, doc1: "SearchResult", doc2: "SearchResult") -> int:
        """Count technology keywords present in BOTH titles (whole-word match).

        NOTE: intentionally stricter than ``_has_shared_technologies`` (whole
        words vs substrings), so the count can be 0 even when that check is True.
        """
        tech_keywords = ["react", "angular", "vue", "node", "python", "java", "docker",
                         "kubernetes", "aws", "azure", "postgres", "mysql", "jwt", "oauth"]

        title1_words = set(doc1.source_title.lower().split())
        title2_words = set(doc2.source_title.lower().split())

        shared_tech = 0
        for tech in tech_keywords:
            if tech in title1_words and tech in title2_words:
                shared_tech += 1

        return shared_tech

    def _enhanced_fallback_scoring(self, target_doc: "SearchResult", candidate_doc: "SearchResult") -> Tuple[float, str]:
        """Fallback scoring path used when no advanced factor fires."""
        fallback_score = self._calculate_fallback_score(target_doc, candidate_doc)
        if fallback_score > 0:
            return fallback_score, "Basic content similarity"
        else:
            return 0.0, "No complementary relationship found"

    def _calculate_fallback_score(self, target_doc: "SearchResult", candidate_doc: "SearchResult") -> float:
        """Basic similarity score from shared topics/entities/title words, capped at 0.5."""
        score = 0.0

        # Any shared topics at all.
        if self._has_shared_topics(target_doc, candidate_doc):
            shared_count = self._get_shared_topics_count(target_doc, candidate_doc)
            score = max(score, 0.2 + (shared_count * 0.05))
            self.logger.debug(f"Fallback: {shared_count} shared topics → score: {score:.3f}")

        # Any shared entities at all.
        if self._has_shared_entities(target_doc, candidate_doc):
            shared_count = self._get_shared_entities_count(target_doc, candidate_doc)
            score = max(score, 0.15 + (shared_count * 0.05))
            self.logger.debug(f"Fallback: {shared_count} shared entities → score: {score:.3f}")

        # Simple keyword overlap in titles (>1 to skip stopword-only overlap).
        target_words = set(target_doc.source_title.lower().split())
        candidate_words = set(candidate_doc.source_title.lower().split())
        common_words = target_words & candidate_words
        if len(common_words) > 1:
            score = max(score, 0.1 + (len(common_words) * 0.02))
            self.logger.debug(f"Fallback: {len(common_words)} common words in titles → score: {score:.3f}")

        return min(score, 0.5)  # cap fallback scores

    def _has_shared_entities(self, doc1: "SearchResult", doc2: "SearchResult") -> bool:
        """Check if documents share at least one entity."""
        return self._get_shared_entities_count(doc1, doc2) > 0

    def _has_shared_topics(self, doc1: "SearchResult", doc2: "SearchResult") -> bool:
        """Check if documents share at least one topic."""
        return self._get_shared_topics_count(doc1, doc2) > 0

    def _get_shared_topics_count(self, doc1: "SearchResult", doc2: "SearchResult") -> int:
        """Get the count of shared topics between documents."""
        topics1 = self.similarity_calculator._extract_topic_texts(doc1.topics)
        topics2 = self.similarity_calculator._extract_topic_texts(doc2.topics)
        return len(set(topics1) & set(topics2))

    def _get_shared_entities_count(self, doc1: "SearchResult", doc2: "SearchResult") -> int:
        """Get the count of shared entities between documents."""
        entities1 = self.similarity_calculator._extract_entity_texts(doc1.entities)
        entities2 = self.similarity_calculator._extract_entity_texts(doc2.entities)
        return len(set(entities1) & set(entities2))

    def _has_different_content_complexity(self, doc1: "SearchResult", doc2: "SearchResult") -> bool:
        """Check if documents have different levels of content complexity.

        True when one document is >2x longer than the other, or when their
        (code blocks, tables, images) feature tuples differ.
        """
        # Compare word counts if available.
        if doc1.word_count and doc2.word_count:
            ratio = max(doc1.word_count, doc2.word_count) / min(doc1.word_count, doc2.word_count)
            if ratio > 2.0:  # one document is significantly longer
                return True

        # Compare content features.
        features1 = (doc1.has_code_blocks, doc1.has_tables, doc1.has_images)
        features2 = (doc2.has_code_blocks, doc2.has_tables, doc2.has_images)

        # Different if one has technical content and the other doesn't.
        return features1 != features2

    def _get_complementary_content_type_score(self, target_doc: "SearchResult", candidate_doc: "SearchResult") -> float:
        """Score complementary content-type pairings by title keywords.

        Technical+business → 0.7; documentation+implementation or
        requirements+design/architecture → at least 0.6.
        """
        score = 0.0

        technical_keywords = ["api", "code", "implementation", "technical", "development", "architecture"]
        business_keywords = ["requirements", "business", "specification", "user", "workflow", "process"]

        target_title = target_doc.source_title.lower()
        candidate_title = candidate_doc.source_title.lower()

        target_is_technical = any(keyword in target_title for keyword in technical_keywords)
        target_is_business = any(keyword in target_title for keyword in business_keywords)
        candidate_is_technical = any(keyword in candidate_title for keyword in technical_keywords)
        candidate_is_business = any(keyword in candidate_title for keyword in business_keywords)

        # Technical document + Business document = complementary.
        if (target_is_technical and candidate_is_business) or (target_is_business and candidate_is_technical):
            score = 0.7

        # Documentation + Implementation complement.
        if ("documentation" in target_title and "implementation" in candidate_title) or \
           ("implementation" in target_title and "documentation" in candidate_title):
            score = max(score, 0.6)

        # Requirements + Design complement.
        if ("requirements" in target_title and ("design" in candidate_title or "architecture" in candidate_title)) or \
           (("design" in target_title or "architecture" in target_title) and "requirements" in candidate_title):
            score = max(score, 0.6)

        return score
class ConflictDetector:
    """Detects conflicting information between documents.

    Documents are compared pairwise, but only when they share context (same
    project, shared entities, or shared topics). Detection covers contradictory
    guidance keywords, version-number mismatches, and differing procedural
    step counts; each detected conflict carries a type, confidence, and a
    human-readable description.
    """

    def __init__(self, spacy_analyzer: "SpaCyQueryAnalyzer"):
        """Initialize the conflict detector.

        Args:
            spacy_analyzer: Shared spaCy analyzer (reserved for future
                NLP-based conflict detection; current checks are keyword/regex).
        """
        self.spacy_analyzer = spacy_analyzer
        self.logger = LoggingConfig.get_logger(__name__)

    def detect_conflicts(self, documents: List["SearchResult"]) -> "ConflictAnalysis":
        """Detect conflicts between all pairs of the given documents.

        Returns:
            ConflictAnalysis with conflicting pairs, per-type categories, and
            resolution suggestions.
        """
        start_time = time.time()

        conflicts = ConflictAnalysis()

        # Compare documents pairwise for conflicts.
        for i in range(len(documents)):
            for j in range(i + 1, len(documents)):
                doc1, doc2 = documents[i], documents[j]

                conflict_info = self._analyze_document_pair_for_conflicts(doc1, doc2)
                if conflict_info:
                    doc1_id = f"{doc1.source_type}:{doc1.source_title}"
                    doc2_id = f"{doc2.source_type}:{doc2.source_title}"
                    conflicts.conflicting_pairs.append((doc1_id, doc2_id, conflict_info))

                    # Categorize conflict by its detected type.
                    conflict_type = conflict_info.get("type", "general")
                    conflicts.conflict_categories.setdefault(conflict_type, []).append((doc1_id, doc2_id))

        # Generate resolution suggestions per conflict category.
        conflicts.resolution_suggestions = self._generate_resolution_suggestions(conflicts)

        processing_time = (time.time() - start_time) * 1000
        self.logger.info(f"Detected {len(conflicts.conflicting_pairs)} conflicts in {processing_time:.2f}ms")

        return conflicts

    def _analyze_document_pair_for_conflicts(self, doc1: "SearchResult",
                                             doc2: "SearchResult") -> Optional[Dict[str, Any]]:
        """Analyze a pair of documents for potential conflicts.

        Returns a dict with type/indicators/confidence/description, or None
        when the pair shares no context or no indicators are found.
        """
        # Only analyze documents that share some context (same project, entities, topics).
        if not self._should_analyze_for_conflicts(doc1, doc2):
            return None

        conflict_indicators = []

        # Contradictory guidance patterns.
        contradiction_patterns = self._find_contradiction_patterns(doc1, doc2)
        if contradiction_patterns:
            conflict_indicators.extend(contradiction_patterns)

        # Version conflicts.
        version_conflicts = self._detect_version_conflicts(doc1, doc2)
        if version_conflicts:
            conflict_indicators.extend(version_conflicts)

        # Procedural conflicts.
        procedural_conflicts = self._detect_procedural_conflicts(doc1, doc2)
        if procedural_conflicts:
            conflict_indicators.extend(procedural_conflicts)

        if conflict_indicators:
            return {
                "type": self._categorize_conflict(conflict_indicators),
                "indicators": conflict_indicators,
                "confidence": self._calculate_conflict_confidence(conflict_indicators),
                "description": self._describe_conflict(conflict_indicators)
            }

        return None

    def _should_analyze_for_conflicts(self, doc1: "SearchResult", doc2: "SearchResult") -> bool:
        """Only compare documents that share project, entities, or topics."""
        # Same project.
        if doc1.project_id and doc2.project_id and doc1.project_id == doc2.project_id:
            return True

        # Shared entities.
        entities1 = self._extract_entity_texts(doc1.entities)
        entities2 = self._extract_entity_texts(doc2.entities)
        if len(set(entities1) & set(entities2)) > 0:
            return True

        # Shared topics.
        topics1 = self._extract_topic_texts(doc1.topics)
        topics2 = self._extract_topic_texts(doc2.topics)
        if len(set(topics1) & set(topics2)) > 0:
            return True

        return False

    def _find_contradiction_patterns(self, doc1: "SearchResult", doc2: "SearchResult") -> List[str]:
        """Find textual patterns that suggest contradictory guidance.

        For each (positive, negative) keyword pair, one document must assert
        the positive WITHOUT the negative while the other contains the
        negative. The "without the negative" guard is required because some
        positives are substrings of their negatives (e.g. "should" is inside
        "should not"), which previously flagged two agreeing documents as
        contradictory.
        """
        patterns = []

        # Opposing guidance keyword pairs (simplified substring checks).
        opposing_keywords = [
            ("should", "should not"), ("enabled", "disabled"), ("true", "false"),
            ("required", "optional"), ("always", "never"), ("use", "avoid")
        ]

        text1_lower = doc1.text.lower()
        text2_lower = doc2.text.lower()

        for positive, negative in opposing_keywords:
            doc1_positive = positive in text1_lower and negative not in text1_lower
            doc2_positive = positive in text2_lower and negative not in text2_lower
            doc1_negative = negative in text1_lower
            doc2_negative = negative in text2_lower

            if doc1_positive and doc2_negative:
                patterns.append(f"Contradictory guidance: '{positive}' vs '{negative}'")
            elif doc1_negative and doc2_positive:
                patterns.append(f"Contradictory guidance: '{negative}' vs '{positive}'")

        return patterns

    def _detect_version_conflicts(self, doc1: "SearchResult", doc2: "SearchResult") -> List[str]:
        """Detect version-number mismatches between the two documents."""
        conflicts = []

        # Local import: this block cannot extend the module's import header.
        import re
        # Matches "1.2", "1.2.3", optionally prefixed with "v".
        version_pattern = r'v?\d+\.\d+(?:\.\d+)?'

        versions1 = re.findall(version_pattern, doc1.text)
        versions2 = re.findall(version_pattern, doc2.text)

        if versions1 and versions2 and set(versions1) != set(versions2):
            conflicts.append(f"Version mismatch: {versions1} vs {versions2}")

        return conflicts

    def _detect_procedural_conflicts(self, doc1: "SearchResult", doc2: "SearchResult") -> List[str]:
        """Detect conflicts in procedural (step-by-step) information.

        Simplified heuristic: when both documents mention "step", differing
        occurrence counts suggest diverging procedures.
        """
        conflicts = []

        if "step" in doc1.text.lower() and "step" in doc2.text.lower():
            # Simplified check for different procedure patterns.
            if len(doc1.text.split("step")) != len(doc2.text.split("step")):
                conflicts.append("Different number of procedural steps")

        return conflicts

    def _categorize_conflict(self, indicators: List[str]) -> str:
        """Categorize the conflict type from its indicator texts."""
        indicator_text = " ".join(indicators).lower()

        if "version" in indicator_text:
            return "version_conflict"
        elif "step" in indicator_text or "procedure" in indicator_text:
            return "procedural_conflict"
        elif "guidance" in indicator_text:
            return "guidance_conflict"
        else:
            return "general_conflict"

    def _calculate_conflict_confidence(self, indicators: List[str]) -> float:
        """Confidence grows with the number of indicators, capped at 0.9."""
        return min(0.9, len(indicators) * 0.3)

    def _describe_conflict(self, indicators: List[str]) -> str:
        """Generate a short description from the first one or two indicators."""
        if len(indicators) == 1:
            return indicators[0]
        else:
            return f"Multiple conflicts detected: {'; '.join(indicators[:2])}"

    def _generate_resolution_suggestions(self, conflicts: "ConflictAnalysis") -> Dict[str, str]:
        """Map each detected conflict category to a resolution suggestion."""
        suggestions = {}

        for conflict_type, pairs in conflicts.conflict_categories.items():
            if conflict_type == "version_conflict":
                suggestions[conflict_type] = "Review documents for version consistency and update outdated information"
            elif conflict_type == "procedural_conflict":
                suggestions[conflict_type] = "Standardize procedural documentation and merge conflicting steps"
            elif conflict_type == "guidance_conflict":
                suggestions[conflict_type] = "Clarify guidance and ensure consistent recommendations"
            else:
                suggestions[conflict_type] = "Review conflicting documents and resolve inconsistencies"

        return suggestions

    def _extract_entity_texts(self, entities: List[Union[dict, str]]) -> List[str]:
        """Extract lowercase entity texts from dicts ({"text": ...}) or strings."""
        texts = []
        for entity in entities:
            if isinstance(entity, dict):
                texts.append(entity.get("text", "").lower())
            elif isinstance(entity, str):
                texts.append(entity.lower())
        return [t for t in texts if t]

    def _extract_topic_texts(self, topics: List[Union[dict, str]]) -> List[str]:
        """Extract lowercase topic texts from dicts ({"text": ...}) or strings."""
        texts = []
        for topic in topics:
            if isinstance(topic, dict):
                texts.append(topic.get("text", "").lower())
            elif isinstance(topic, str):
                texts.append(topic.lower())
        return [t for t in texts if t]
1557class CrossDocumentIntelligenceEngine:
1558 """Main engine that orchestrates cross-document intelligence analysis."""
1560 def __init__(self, spacy_analyzer: SpaCyQueryAnalyzer,
1561 knowledge_graph: Optional[DocumentKnowledgeGraph] = None):
1562 """Initialize the cross-document intelligence engine."""
1563 self.spacy_analyzer = spacy_analyzer
1564 self.knowledge_graph = knowledge_graph
1565 self.logger = LoggingConfig.get_logger(__name__)
1567 # Initialize component analyzers
1568 self.similarity_calculator = DocumentSimilarityCalculator(spacy_analyzer)
1569 self.cluster_analyzer = DocumentClusterAnalyzer(self.similarity_calculator)
1570 self.citation_analyzer = CitationNetworkAnalyzer()
1571 self.complementary_finder = ComplementaryContentFinder(self.similarity_calculator, knowledge_graph)
1572 self.conflict_detector = ConflictDetector(spacy_analyzer)
1574 def analyze_document_relationships(self, documents: List[SearchResult]) -> Dict[str, Any]:
1575 """Perform comprehensive cross-document relationship analysis."""
1576 start_time = time.time()
1578 self.logger.info(f"Starting cross-document intelligence analysis for {len(documents)} documents")
1580 # Document similarity analysis
1581 similarity_matrix = self._build_similarity_matrix(documents)
1583 # Document clustering
1584 clusters = self.cluster_analyzer.create_clusters(
1585 documents,
1586 strategy=ClusteringStrategy.MIXED_FEATURES,
1587 max_clusters=10,
1588 min_cluster_size=2
1589 )
1591 # Citation network analysis
1592 citation_network = self.citation_analyzer.build_citation_network(documents)
1594 # Find complementary content for each document
1595 complementary_recommendations = {}
1596 for doc in documents[:5]: # Limit to first 5 for performance
1597 doc_id = f"{doc.source_type}:{doc.source_title}"
1598 complementary = self.complementary_finder.find_complementary_content(doc, documents)
1599 complementary_recommendations[doc_id] = complementary
1601 # Conflict detection
1602 conflicts = self.conflict_detector.detect_conflicts(documents)
1604 processing_time = (time.time() - start_time) * 1000
1606 # Compile comprehensive analysis results
1607 analysis_results = {
1608 "summary": {
1609 "total_documents": len(documents),
1610 "processing_time_ms": processing_time,
1611 "clusters_found": len(clusters),
1612 "citation_relationships": len(citation_network.edges),
1613 "conflicts_detected": len(conflicts.conflicting_pairs),
1614 "complementary_pairs": sum(len(comp.recommendations) for comp in complementary_recommendations.values())
1615 },
1616 "document_clusters": [cluster.get_cluster_summary() for cluster in clusters],
1617 "citation_network": {
1618 "nodes": len(citation_network.nodes),
1619 "edges": len(citation_network.edges),
1620 "most_authoritative": self.citation_analyzer.get_most_authoritative_documents(citation_network, 5),
1621 "most_connected": self.citation_analyzer.get_most_connected_documents(citation_network, 5)
1622 },
1623 "complementary_content": {
1624 doc_id: comp.get_top_recommendations(3)
1625 for doc_id, comp in complementary_recommendations.items()
1626 },
1627 "conflict_analysis": conflicts.get_conflict_summary(),
1628 "similarity_insights": self._extract_similarity_insights(similarity_matrix),
1629 }
1631 self.logger.info(f"Cross-document intelligence analysis completed in {processing_time:.2f}ms")
1633 return analysis_results
    def find_document_relationships(self, target_doc_id: str, documents: List[SearchResult],
                                    relationship_types: Optional[List[RelationshipType]] = None) -> Dict[str, List[Dict[str, Any]]]:
        """Find specific relationships for a target document.

        Args:
            target_doc_id: Document identifier in "source_type:source_title" form.
            documents: Candidate documents to relate against the target.
            relationship_types: Relationship kinds to evaluate; defaults to
                semantic similarity, complementary and hierarchical.

        Returns:
            Mapping of relationship-type value to related-document entries,
            each list sorted by descending score and truncated to 5 entries.
            NOTE(review): when the target is not found this returns
            {"error": "Target document not found"} instead, which diverges
            from the annotated Dict[str, List[...]] shape — callers must
            check for the "error" key.
        """
        if relationship_types is None:
            relationship_types = [RelationshipType.SEMANTIC_SIMILARITY, RelationshipType.COMPLEMENTARY,
                                  RelationshipType.HIERARCHICAL]

        # Find target document by its composite "source_type:source_title" id.
        target_doc = None
        for doc in documents:
            if f"{doc.source_type}:{doc.source_title}" == target_doc_id:
                target_doc = doc
                break

        if not target_doc:
            return {"error": "Target document not found"}

        relationships = {rel_type.value: [] for rel_type in relationship_types}

        for rel_type in relationship_types:
            if rel_type == RelationshipType.SEMANTIC_SIMILARITY:
                # Find similar documents; only pairs scoring above 0.5 are kept.
                for doc in documents:
                    if doc != target_doc:
                        similarity = self.similarity_calculator.calculate_similarity(target_doc, doc)
                        if similarity.similarity_score > 0.5:
                            relationships[rel_type.value].append({
                                "document_id": f"{doc.source_type}:{doc.source_title}",
                                "score": similarity.similarity_score,
                                "explanation": similarity.get_display_explanation()
                            })

            elif rel_type == RelationshipType.COMPLEMENTARY:
                # Find complementary content via the dedicated finder.
                complementary = self.complementary_finder.find_complementary_content(target_doc, documents)
                relationships[rel_type.value] = complementary.get_top_recommendations(5)

            elif rel_type == RelationshipType.HIERARCHICAL:
                # Find hierarchical relationships: the other document names the
                # target as its parent, or the target names it as parent.
                for doc in documents:
                    if doc != target_doc:
                        if (doc.parent_id == target_doc_id or
                            target_doc.parent_id == f"{doc.source_type}:{doc.source_title}"):
                            relationships[rel_type.value].append({
                                "document_id": f"{doc.source_type}:{doc.source_title}",
                                "relationship": "parent" if doc.parent_id == target_doc_id else "child",
                                "explanation": "Direct hierarchical relationship"
                            })

        # Sort each relationship type by score/relevance; hierarchical entries
        # carry neither key and therefore fall back to 0.
        for rel_type in relationships:
            if relationships[rel_type]:
                relationships[rel_type] = sorted(
                    relationships[rel_type],
                    key=lambda x: x.get("score", x.get("relevance_score", 0)),
                    reverse=True
                )[:5]  # Top 5 for each type

        return relationships
1695 def _build_similarity_matrix(self, documents: List[SearchResult]) -> Dict[str, Dict[str, float]]:
1696 """Build similarity matrix for all document pairs."""
1697 matrix = {}
1699 for i, doc1 in enumerate(documents):
1700 doc1_id = f"{doc1.source_type}:{doc1.source_title}"
1701 matrix[doc1_id] = {}
1703 for j, doc2 in enumerate(documents):
1704 doc2_id = f"{doc2.source_type}:{doc2.source_title}"
1706 if i == j:
1707 matrix[doc1_id][doc2_id] = 1.0
1708 elif doc2_id in matrix and doc1_id in matrix[doc2_id]:
1709 # Use cached value
1710 matrix[doc1_id][doc2_id] = matrix[doc2_id][doc1_id]
1711 else:
1712 # Calculate similarity
1713 similarity = self.similarity_calculator.calculate_similarity(doc1, doc2)
1714 matrix[doc1_id][doc2_id] = similarity.similarity_score
1716 return matrix
1718 def _extract_similarity_insights(self, similarity_matrix: Dict[str, Dict[str, float]]) -> Dict[str, Any]:
1719 """Extract insights from the similarity matrix."""
1720 if not similarity_matrix:
1721 return {}
1723 all_scores = []
1724 for doc1_scores in similarity_matrix.values():
1725 for doc2_id, score in doc1_scores.items():
1726 if score < 1.0: # Exclude self-similarity
1727 all_scores.append(score)
1729 if not all_scores:
1730 return {}
1732 return {
1733 "average_similarity": sum(all_scores) / len(all_scores),
1734 "max_similarity": max(all_scores),
1735 "min_similarity": min(all_scores),
1736 "high_similarity_pairs": sum(1 for score in all_scores if score > 0.7),
1737 "total_pairs_analyzed": len(all_scores)
1738 }