Coverage for src/qdrant_loader_mcp_server/search/enhanced/cdi/citations.py: 67%
91 statements
coverage.py v7.10.6, created at 2025-09-08 06:06 +0000
1"""
2Citation Network Analysis for Cross-Document Intelligence.
4This module implements citation and reference network analysis between documents,
5building networks from cross-references, hierarchical relationships, and calculating
6centrality scores to identify authoritative and well-connected documents.
7"""
9from __future__ import annotations
11import time
13from ....utils.logging import LoggingConfig
14from ...models import SearchResult
15from .models import CitationNetwork


class CitationNetworkAnalyzer:
    """Analyzes citation and reference networks between documents."""

    def __init__(self):
        """Initialize the citation network analyzer."""
        self.logger = LoggingConfig.get_logger(__name__)

    def build_citation_network(self, documents: list[SearchResult]) -> CitationNetwork:
        """Build citation network from document cross-references and hierarchical relationships."""
        start_time = time.time()

        network = CitationNetwork()
        doc_lookup = {f"{doc.source_type}:{doc.source_title}": doc for doc in documents}
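        # Documents are keyed as "<source_type>:<source_title>"; e.g. a hypothetical
        # Confluence page titled "Architecture Guide" would be keyed
        # "confluence:Architecture Guide". Titles duplicated within a source type
        # collapse to a single lookup entry.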

        # Add nodes to the network
        for doc in documents:
            doc_id = f"{doc.source_type}:{doc.source_title}"
            network.nodes[doc_id] = {
                "title": doc.source_title,
                "source_type": doc.source_type,
                "project_id": doc.project_id,
                "word_count": doc.word_count or 0,
                "has_code": doc.has_code_blocks,
                "has_tables": doc.has_tables,
                "depth": doc.depth or 0,
                "creation_date": getattr(doc, "created_at", None),
            }

        # Add edges based on cross-references
        for doc in documents:
            doc_id = f"{doc.source_type}:{doc.source_title}"

            # Process cross-references
            if doc.cross_references:
                for ref in doc.cross_references:
                    target_url = ref.get("url", "") if isinstance(ref, dict) else ""
                    ref_text = (
                        ref.get("text", "") if isinstance(ref, dict) else str(ref)
                    )

                    # Try to find referenced document
                    target_doc_id = self._find_referenced_document(
                        target_url, doc_lookup
                    )
                    if target_doc_id and target_doc_id != doc_id:
                        network.edges.append(
                            (
                                doc_id,
                                target_doc_id,
                                {
                                    "relation_type": "cross_reference",
                                    "reference_text": ref_text,
                                    "reference_url": target_url,
                                    "weight": 1.0,
                                },
                            )
                        )

            # Add hierarchical relationships
            if doc.parent_id is not None:
                if doc.parent_id in doc_lookup:
                    network.edges.append(
                        (
                            doc.parent_id,
                            doc_id,
                            {
                                "relation_type": "hierarchical_child",
                                "weight": 2.0,  # Higher weight for hierarchical relationships
                            },
                        )
                    )
                else:
                    # Parent declared but not found; log for visibility and skip
                    self.logger.debug(
                        "Parent ID not found in documents for hierarchical edge",
                        child_id=doc_id,
                        parent_id=doc.parent_id,
                    )

            # Add sibling relationships
            if doc.sibling_sections:
                for sibling in doc.sibling_sections:
                    sibling_doc_id = self._find_sibling_document(
                        sibling, doc_lookup, doc
                    )
                    if sibling_doc_id and sibling_doc_id != doc_id:
                        network.edges.append(
                            (
                                doc_id,
                                sibling_doc_id,
                                {"relation_type": "sibling", "weight": 0.5},
                            )
                        )

        # Build NetworkX graph and calculate centrality scores
        network.build_graph()
        network.calculate_centrality_scores()

        processing_time = (time.time() - start_time) * 1000
        self.logger.info(
            f"Built citation network with {len(network.nodes)} nodes and {len(network.edges)} edges in {processing_time:.2f}ms"
        )

        return network

    def _find_referenced_document(
        self, reference_url: str, doc_lookup: dict[str, SearchResult]
    ) -> str | None:
        """Find document that matches a reference URL."""
        if not reference_url:
            return None

        # Try URL match first (substring containment against the document URL)
        for doc_id, doc in doc_lookup.items():
            if doc.source_url and reference_url in doc.source_url:
                return doc_id

        # Try title-based matching for internal references
        for doc_id, doc in doc_lookup.items():
            if reference_url.lower() in doc.source_title.lower():
                return doc_id

        return None

    def _find_sibling_document(
        self,
        sibling_reference: str,
        doc_lookup: dict[str, SearchResult],
        current_doc: SearchResult | None = None,
    ) -> str | None:
        """Find document that matches a sibling reference.

        Uses normalized whole-phrase matching and, when available, validates
        via explicit hierarchy metadata (matching parent identifiers) to avoid
        false positives from broad substring checks.
        """
        import re

        if not sibling_reference:
            return None

        def normalize_title(value: str) -> str:
            # Lowercase and keep word characters joined by single spaces
            tokens = re.findall(r"\w+", (value or "").lower())
            return " ".join(tokens)

        target_norm = normalize_title(sibling_reference)

        for doc_id, candidate in doc_lookup.items():
            # Prefer siblings within the same source type when known
            if current_doc is not None:
                if getattr(current_doc, "source_type", None) and (
                    candidate.source_type != current_doc.source_type
                ):
                    continue

                # Skip self
                if candidate is current_doc:
                    continue

            cand_title_norm = normalize_title(getattr(candidate, "source_title", ""))
            if not cand_title_norm:
                continue

            # Exact normalized title match or whole-phrase match using word boundaries
            if target_norm == cand_title_norm or re.search(
                r"\b" + re.escape(target_norm) + r"\b", cand_title_norm
            ):
                # Validate using parent metadata if both sides provide it
                parent_ok = True
                if current_doc is not None:
                    cur_parent_id = getattr(current_doc, "parent_id", None)
                    cand_parent_id = getattr(candidate, "parent_id", None)
                    if cur_parent_id is not None and cand_parent_id is not None:
                        parent_ok = cur_parent_id == cand_parent_id

                    if parent_ok:
                        cur_parent_doc_id = getattr(
                            current_doc, "parent_document_id", None
                        )
                        cand_parent_doc_id = getattr(
                            candidate, "parent_document_id", None
                        )
                        if (
                            cur_parent_doc_id is not None
                            and cand_parent_doc_id is not None
                        ):
                            parent_ok = cur_parent_doc_id == cand_parent_doc_id

                if parent_ok:
                    return doc_id

        return None

    def get_most_authoritative_documents(
        self, network: CitationNetwork, limit: int = 10
    ) -> list[tuple[str, float]]:
        """Get the most authoritative documents based on citation analysis."""
        if not network.authority_scores:
            return []

        # Sort by authority score
        sorted_docs = sorted(
            network.authority_scores.items(), key=lambda x: x[1], reverse=True
        )
        return sorted_docs[:limit]

    def get_most_connected_documents(
        self, network: CitationNetwork, limit: int = 10
    ) -> list[tuple[str, int]]:
        """Get the most connected documents based on degree centrality."""
        if not network.graph:
            return []

        # Rank by raw node degree (connection count)
        degree_centrality = dict(network.graph.degree())
        sorted_docs = sorted(
            degree_centrality.items(), key=lambda x: x[1], reverse=True
        )
        return sorted_docs[:limit]
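
A minimal usage sketch follows (not part of the module above). The import path is inferred from the file path in the report header, and the snippet assumes the caller already has a list of SearchResult objects carrying the fields read by build_citation_network; the function name summarize_citations and the limit values are hypothetical.

from qdrant_loader_mcp_server.search.enhanced.cdi.citations import (
    CitationNetworkAnalyzer,
)


def summarize_citations(documents) -> None:
    # documents: list[SearchResult] obtained elsewhere (e.g. from a search call).
    analyzer = CitationNetworkAnalyzer()
    network = analyzer.build_citation_network(documents)

    # Rank documents two ways: by authority score and by connection count.
    for doc_id, score in analyzer.get_most_authoritative_documents(network, limit=5):
        print(f"{doc_id}: authority={score:.3f}")
    for doc_id, degree in analyzer.get_most_connected_documents(network, limit=5):
        print(f"{doc_id}: degree={degree}")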