Coverage for src/qdrant_loader_mcp_server/search/enhanced/cdi/citations.py: 67%

91 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:06 +0000

1""" 

2Citation Network Analysis for Cross-Document Intelligence. 

3 

4This module implements citation and reference network analysis between documents, 

5building networks from cross-references, hierarchical relationships, and calculating 

6centrality scores to identify authoritative and well-connected documents. 

7""" 

8 

from __future__ import annotations

import re
import time

from ....utils.logging import LoggingConfig
from ...models import SearchResult
from .models import CitationNetwork

16 

17 

class CitationNetworkAnalyzer:
    """Analyzes citation and reference networks between documents.

    Builds a citation network whose nodes are documents (keyed by
    ``"{source_type}:{source_title}"``) and whose edges capture
    cross-references, parent/child hierarchy, and sibling relationships,
    then derives centrality scores to surface authoritative documents.
    """

    def __init__(self):
        """Initialize the citation network analyzer."""
        self.logger = LoggingConfig.get_logger(__name__)

    def build_citation_network(self, documents: list[SearchResult]) -> CitationNetwork:
        """Build citation network from document cross-references and hierarchical relationships.

        Args:
            documents: Search results to connect into a network.

        Returns:
            A CitationNetwork with nodes and weighted, typed edges
            populated, its graph built, and centrality scores calculated.
        """
        start_time = time.time()

        network = CitationNetwork()
        doc_lookup = {f"{doc.source_type}:{doc.source_title}": doc for doc in documents}

        # Add one node per document, keyed by "source_type:source_title".
        for doc in documents:
            doc_id = f"{doc.source_type}:{doc.source_title}"
            network.nodes[doc_id] = {
                "title": doc.source_title,
                "source_type": doc.source_type,
                "project_id": doc.project_id,
                "word_count": doc.word_count or 0,
                "has_code": doc.has_code_blocks,
                "has_tables": doc.has_tables,
                "depth": doc.depth or 0,
                # created_at may be absent on some SearchResult payloads.
                "creation_date": getattr(doc, "created_at", None),
            }

        # Add edges based on cross-references, hierarchy, and siblings.
        for doc in documents:
            doc_id = f"{doc.source_type}:{doc.source_title}"

            # Cross-references: entries may be dicts ({"url", "text"}) or
            # plain strings; plain strings carry no resolvable URL.
            if doc.cross_references:
                for ref in doc.cross_references:
                    target_url = ref.get("url", "") if isinstance(ref, dict) else ""
                    ref_text = (
                        ref.get("text", "") if isinstance(ref, dict) else str(ref)
                    )

                    # Try to find referenced document
                    target_doc_id = self._find_referenced_document(
                        target_url, doc_lookup
                    )
                    # Skip unresolved targets and self-references.
                    if target_doc_id and target_doc_id != doc_id:
                        network.edges.append(
                            (
                                doc_id,
                                target_doc_id,
                                {
                                    "relation_type": "cross_reference",
                                    "reference_text": ref_text,
                                    "reference_url": target_url,
                                    "weight": 1.0,
                                },
                            )
                        )

            # Add hierarchical parent -> child relationships
            if doc.parent_id is not None:
                if doc.parent_id in doc_lookup:
                    network.edges.append(
                        (
                            doc.parent_id,
                            doc_id,
                            {
                                "relation_type": "hierarchical_child",
                                "weight": 2.0,  # Higher weight for hierarchical relationships
                            },
                        )
                    )
                else:
                    # Parent declared but not found; log for visibility and skip
                    self.logger.debug(
                        "Parent ID not found in documents for hierarchical edge",
                        child_id=doc_id,
                        parent_id=doc.parent_id,
                    )

            # Add sibling relationships (lower weight than direct references)
            if doc.sibling_sections:
                for sibling in doc.sibling_sections:
                    sibling_doc_id = self._find_sibling_document(
                        sibling, doc_lookup, doc
                    )
                    if sibling_doc_id and sibling_doc_id != doc_id:
                        network.edges.append(
                            (
                                doc_id,
                                sibling_doc_id,
                                {"relation_type": "sibling", "weight": 0.5},
                            )
                        )

        # Build NetworkX graph and calculate centrality scores
        network.build_graph()
        network.calculate_centrality_scores()

        processing_time = (time.time() - start_time) * 1000
        self.logger.info(
            f"Built citation network with {len(network.nodes)} nodes and {len(network.edges)} edges in {processing_time:.2f}ms"
        )

        return network

    def _find_referenced_document(
        self, reference_url: str, doc_lookup: dict[str, SearchResult]
    ) -> str | None:
        """Find document that matches a reference URL.

        Args:
            reference_url: URL (or URL fragment) extracted from a cross-reference.
            doc_lookup: Mapping of document id -> SearchResult.

        Returns:
            The matching document id, or None when nothing matches.
        """
        if not reference_url:
            return None

        # Try exact URL match first (substring match against source_url).
        for doc_id, doc in doc_lookup.items():
            if doc.source_url and reference_url in doc.source_url:
                return doc_id

        # Try title-based matching for internal references
        for doc_id, doc in doc_lookup.items():
            if reference_url.lower() in doc.source_title.lower():
                return doc_id

        return None

    def _find_sibling_document(
        self,
        sibling_reference: str,
        doc_lookup: dict[str, SearchResult],
        current_doc: SearchResult | None = None,
    ) -> str | None:
        """Find document that matches a sibling reference.

        Uses normalized whole-phrase matching and, when available, validates
        via explicit hierarchy metadata (matching parent identifiers) to avoid
        false positives from broad substring checks.

        Args:
            sibling_reference: Free-form sibling title/reference text.
            doc_lookup: Mapping of document id -> SearchResult.
            current_doc: Document whose sibling is being resolved; when given,
                candidates are restricted to the same source type and, where
                both sides expose parent metadata, the same parent.

        Returns:
            The matching document id, or None when no candidate qualifies.
        """
        if not sibling_reference:
            return None

        def normalize_title(value: str) -> str:
            # Lowercase and keep word characters joined by single spaces
            tokens = re.findall(r"\w+", (value or "").lower())
            return " ".join(tokens)

        target_norm = normalize_title(sibling_reference)
        # BUG FIX: a reference with no word characters normalizes to "", which
        # would make the boundary pattern r"\b\b" match every candidate title.
        # Treat such references as unresolvable instead.
        if not target_norm:
            return None

        for doc_id, candidate in doc_lookup.items():
            # Prefer siblings within the same source type when known
            if current_doc is not None:
                if getattr(current_doc, "source_type", None) and (
                    candidate.source_type != current_doc.source_type
                ):
                    continue

                # Skip self
                if candidate is current_doc:
                    continue

            cand_title_norm = normalize_title(getattr(candidate, "source_title", ""))
            if not cand_title_norm:
                continue

            # Exact normalized title match or whole-phrase match using word boundaries
            if target_norm == cand_title_norm or re.search(
                r"\b" + re.escape(target_norm) + r"\b", cand_title_norm
            ):
                # Validate using parent metadata if both sides provide it
                parent_ok = True
                if current_doc is not None:
                    cur_parent_id = getattr(current_doc, "parent_id", None)
                    cand_parent_id = getattr(candidate, "parent_id", None)
                    if cur_parent_id is not None and cand_parent_id is not None:
                        parent_ok = cur_parent_id == cand_parent_id

                    if parent_ok:
                        cur_parent_doc_id = getattr(
                            current_doc, "parent_document_id", None
                        )
                        cand_parent_doc_id = getattr(
                            candidate, "parent_document_id", None
                        )
                        if (
                            cur_parent_doc_id is not None
                            and cand_parent_doc_id is not None
                        ):
                            parent_ok = cur_parent_doc_id == cand_parent_doc_id

                if parent_ok:
                    return doc_id

        return None

    def get_most_authoritative_documents(
        self, network: CitationNetwork, limit: int = 10
    ) -> list[tuple[str, float]]:
        """Get the most authoritative documents based on citation analysis.

        Returns up to ``limit`` (doc_id, authority_score) pairs sorted by
        descending authority score; empty list when no scores exist.
        """
        if not network.authority_scores:
            return []

        # Sort by authority score
        sorted_docs = sorted(
            network.authority_scores.items(), key=lambda x: x[1], reverse=True
        )
        return sorted_docs[:limit]

    def get_most_connected_documents(
        self, network: CitationNetwork, limit: int = 10
    ) -> list[tuple[str, int]]:
        """Get the most connected documents based on degree centrality.

        Returns up to ``limit`` (doc_id, degree) pairs sorted by descending
        degree; empty list when the graph has not been built.
        """
        if not network.graph:
            return []

        # Calculate degree centrality
        degree_centrality = dict(network.graph.degree())
        sorted_docs = sorted(
            degree_centrality.items(), key=lambda x: x[1], reverse=True
        )
        return sorted_docs[:limit]

237 return sorted_docs[:limit]