Coverage for src / qdrant_loader / core / state / state_change_detector.py: 72%

93 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1"""Document state model and change detector for comparing current documents against stored state."""

2 

3from datetime import datetime 

4from urllib.parse import quote, unquote 

5 

6from pydantic import BaseModel, ConfigDict 

7 

8from qdrant_loader.config.sources import SourcesConfig 

9from qdrant_loader.core.document import Document 

10from qdrant_loader.core.state.exceptions import InvalidDocumentStateError 

11from qdrant_loader.core.state.state_manager import DocumentStateRecord, StateManager 

12from qdrant_loader.utils.logging import LoggingConfig 

13 

14 

class DocumentState(BaseModel):
    """Source-independent snapshot of a document used for change detection.

    Only the fields required to decide whether a document is new, changed or
    gone are kept here, so documents from every source can be compared the
    same way.
    """

    # Unknown fields are rejected; arbitrary (non-pydantic) types are allowed.
    model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)

    # Identifier of the document.
    document_id: str
    # Universal identifier in format: {source_type}:{source}:{url}
    uri: str
    # Hash of document content.
    content_hash: str
    # Last update timestamp.
    updated_at: datetime

28 

29 

class StateChangeDetector:
    """Detect new, updated and deleted documents against stored state.

    Incoming documents are reduced to ``DocumentState`` snapshots and compared
    with the records returned by the state manager. Documents are matched by a
    URI of the form ``{source_type}:{source}:{url}``, where the URL part is
    percent-encoded by ``_normalize_url``.

    The detector must be entered as an async context manager before
    ``detect_changes`` or ``classify_batch`` is called.
    """

    def __init__(self, state_manager: StateManager):
        """Initialize the change detector.

        Args:
            state_manager: Source of previously stored document state records.
        """
        self.logger = LoggingConfig.get_logger(
            f"qdrant_loader.{self.__class__.__name__}"
        )
        self._initialized = False
        self.state_manager = state_manager

    async def __aenter__(self):
        """Async context manager entry: mark the detector as usable."""
        self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, _exc_tb):
        """Async context manager exit.

        Logs an exception that escaped the ``async with`` body. Nothing is
        returned, so the exception is not suppressed.
        """
        if exc_type:
            self.logger.error(
                "Error in StateChangeDetector context",
                error_type=exc_type.__name__,
                error=str(exc_val),
            )

    def _ensure_initialized(self) -> None:
        """Raise ``RuntimeError`` unless the context manager has been entered."""
        if not self._initialized:
            raise RuntimeError(
                "StateChangeDetector not initialized. Use as async context manager."
            )

    async def detect_changes(
        self, documents: list[Document], filtered_config: SourcesConfig
    ) -> dict[str, list[Document]]:
        """Detect changes in documents efficiently.

        Args:
            documents: Documents currently present in the sources.
            filtered_config: Source configuration whose stored state records
                are loaded for comparison.

        Returns:
            A dict with the keys ``"new"``, ``"updated"`` and ``"deleted"``.
            ``"new"`` and ``"updated"`` hold documents from ``documents``;
            ``"deleted"`` holds placeholder documents built from stored state
            whose URI is no longer present.

        Raises:
            RuntimeError: If the detector was not entered as a context manager.
            InvalidDocumentStateError: If a document cannot be converted to a
                ``DocumentState``.
        """
        self._ensure_initialized()

        self.logger.info("Starting change detection", document_count=len(documents))

        # Get current and previous states
        current_states = [self._get_document_state(doc) for doc in documents]
        previous_states = await self._get_previous_states(filtered_config)

        # A single dict serves both the membership test and the state lookup.
        previous_by_uri: dict[str, DocumentState] = {
            state.uri: state for state in previous_states
        }
        current_uris: set[str] = {state.uri for state in current_states}

        # Classify every current document in one pass; current_states was
        # built 1:1 from documents, so the two sequences always line up.
        new_docs: list[Document] = []
        updated_docs: list[Document] = []
        for state, doc in zip(current_states, documents, strict=True):
            previous = previous_by_uri.get(state.uri)
            if previous is None:
                new_docs.append(doc)
            elif self._is_document_updated(state, previous):
                updated_docs.append(doc)

        deleted_docs = [
            self._create_deleted_document(state)
            for state in previous_states
            if state.uri not in current_uris
        ]

        changes = {
            "new": new_docs,
            "updated": updated_docs,
            "deleted": deleted_docs,
        }

        self.logger.info(
            "Change detection completed",
            new_count=len(new_docs),
            updated_count=len(updated_docs),
            deleted_count=len(deleted_docs),
        )

        return changes

    async def classify_batch(
        self,
        documents: list[Document],
        filtered_config: SourcesConfig,
        project_id: str | None = None,
    ) -> list[Document]:
        """Classify a bounded batch of documents by comparing against state records.

        Stored state is looked up per ``(source_type, source)`` group using the
        document ids of the batch, and only new or updated documents are
        returned for processing.

        Args:
            documents: Batch of documents to classify.
            filtered_config: Accepted for interface compatibility; not read by
                this method.
            project_id: Optional project id forwarded to the state manager.

        Returns:
            The documents that have no stored state, whose URI differs from the
            stored one, or that ``_is_document_updated`` reports as updated,
            in their original order.

        Raises:
            RuntimeError: If the detector was not entered as a context manager.
            InvalidDocumentStateError: If a document cannot be converted to a
                ``DocumentState``.
            Exception: Any error raised while loading stored state is logged
                and re-raised unchanged.
        """
        self._ensure_initialized()

        self.logger.info(
            "Classifying document batch for inline change detection",
            document_count=len(documents),
        )

        current_states = [self._get_document_state(doc) for doc in documents]
        previous_states_by_key: dict[tuple[str, str, str], DocumentState] = {}

        # Group documents by source to perform bulk DB lookups per source
        groups: dict[tuple[str, str], list[Document]] = {}
        for doc in documents:
            groups.setdefault((doc.source_type, doc.source), []).append(doc)

        for (source_type, source), docs in groups.items():
            try:
                ids = [d.id for d in docs]
                records = await self.state_manager.get_document_state_records_by_ids(
                    source_type, source, ids, project_id
                )
                for record in records:
                    previous_states_by_key[
                        (record.source_type, record.source, record.document_id)
                    ] = self._record_to_state(record)
            except Exception as e:
                self.logger.error(
                    "Error loading existing document state for batch classification",
                    source_type=source_type,
                    source=source,
                    error=str(e),
                    error_type=type(e).__name__,
                )
                raise

        changed_documents: list[Document] = []
        for state, doc in zip(current_states, documents, strict=True):
            previous = previous_states_by_key.get((doc.source_type, doc.source, doc.id))
            if (
                previous is None
                or state.uri != previous.uri
                or self._is_document_updated(state, previous)
            ):
                changed_documents.append(doc)

        self.logger.info(
            "Batch classification completed",
            new_or_updated_count=len(changed_documents),
            total_documents=len(documents),
        )

        return changed_documents

    def _get_document_state(self, document: Document) -> DocumentState:
        """Get the standardized state of a document.

        Raises:
            InvalidDocumentStateError: If the state cannot be built; the
                original exception is chained as the cause.
        """
        try:
            return DocumentState(
                document_id=document.id,
                uri=self._generate_uri_from_document(document),
                content_hash=document.content_hash,
                updated_at=document.updated_at,
            )
        except Exception as e:
            raise InvalidDocumentStateError(f"Failed to get document state: {e}") from e

    def _record_to_state(self, record: DocumentStateRecord) -> DocumentState:
        """Convert a stored state record into a ``DocumentState``."""
        return DocumentState(
            document_id=record.document_id,  # type: ignore
            uri=self._generate_uri(
                record.url, record.source, record.source_type, record.document_id  # type: ignore
            ),
            content_hash=record.content_hash,  # type: ignore
            updated_at=record.updated_at,  # type: ignore
        )

    def _is_document_updated(
        self, current_state: DocumentState, previous_state: DocumentState
    ) -> bool:
        """Check if a document has been updated.

        A document counts as updated when its content hash differs from the
        stored one or its timestamp is strictly newer than the stored one.
        """
        return (
            current_state.content_hash != previous_state.content_hash
            or current_state.updated_at > previous_state.updated_at
        )

    @staticmethod
    def _split_uri(uri: str) -> tuple[str, str, str]:
        """Split a URI into ``(source_type, source, decoded_url)``.

        The URL segment is percent-encoded with no safe characters, so it never
        contains ``:``. Splitting it off from the right keeps source names that
        contain ``:`` intact.

        Raises:
            ValueError: If the URI contains fewer than two ``:`` separators.
        """
        prefix, encoded_url = uri.rsplit(":", 1)
        source_type, source = prefix.split(":", 1)
        return source_type, source, unquote(encoded_url)

    def _create_deleted_document(self, document_state: DocumentState) -> Document:
        """Create a minimal document for a deleted item.

        The source type, source and URL are recovered from the state's URI;
        the content is empty and the title is a fixed placeholder.
        """
        source_type, source, url = self._split_uri(document_state.uri)

        return Document(
            id=document_state.document_id,
            content="",
            content_type="md",
            source=source,
            source_type=source_type,
            url=url,
            title="Deleted Document",
            metadata={
                "uri": document_state.uri,
                "title": "Deleted Document",
                "updated_at": document_state.updated_at.isoformat(),
                "content_hash": document_state.content_hash,
            },
        )

    async def _get_previous_states(
        self, filtered_config: SourcesConfig
    ) -> list[DocumentState]:
        """Get previous document states from the state manager.

        Records are fetched one source configuration at a time, in the order
        git, confluence, jira, publicdocs, localfile.
        """
        source_groups = (
            filtered_config.git,
            filtered_config.confluence,
            filtered_config.jira,
            filtered_config.publicdocs,
            filtered_config.localfile,
        )

        previous_states_records: list[DocumentStateRecord] = []
        for source_configs in source_groups:
            # Skip source types that are unset or have no configured sources.
            if not source_configs:
                continue
            for config in source_configs.values():
                records = await self.state_manager.get_document_state_records(config)
                previous_states_records.extend(records)

        return [self._record_to_state(record) for record in previous_states_records]

    def _normalize_url(self, url: str) -> str:
        """Normalize a URL for consistent hashing.

        Trailing slashes are dropped and the remainder is percent-encoded with
        no safe characters, so the result contains neither ``/`` nor ``:``.
        """
        return quote(url.rstrip("/"), safe="")

    def _generate_uri_from_document(self, document: Document) -> str:
        """Generate a URI from a document."""
        return self._generate_uri(
            document.url, document.source, document.source_type, document.id
        )

    def _generate_uri(
        self, url: str, source: str, source_type: str, document_id: str
    ) -> str:
        """Generate a URI from document components.

        ``document_id`` is accepted for interface compatibility but is not part
        of the generated URI.
        """
        return f"{source_type}:{source}:{self._normalize_url(url)}"