Coverage for src / qdrant_loader / core / state / state_change_detector.py: 72%
93 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
"""Document state model and change detection for documents from configured sources."""
3from datetime import datetime
4from urllib.parse import quote, unquote
6from pydantic import BaseModel, ConfigDict
8from qdrant_loader.config.sources import SourcesConfig
9from qdrant_loader.core.document import Document
10from qdrant_loader.core.state.exceptions import InvalidDocumentStateError
11from qdrant_loader.core.state.state_manager import DocumentStateRecord, StateManager
12from qdrant_loader.utils.logging import LoggingConfig
class DocumentState(BaseModel):
    """Source-agnostic snapshot of a document used for change detection.

    Every source is reduced to this one shape, so current documents and
    previously stored records can be compared field by field. Unknown fields
    are rejected because the model is configured with ``extra="forbid"``.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid")

    # Identifier of the document.
    document_id: str
    # Universal identifier in the form "{source_type}:{source}:{url}".
    uri: str
    # Hash of the document content.
    content_hash: str
    # Timestamp of the document's last update.
    updated_at: datetime
class StateChangeDetector:
    """Detect new, updated and deleted documents by comparing against stored state.

    Current documents are reduced to :class:`DocumentState` snapshots and
    compared with records obtained from the :class:`StateManager`. Documents
    are matched on a URI of the form ``{source_type}:{source}:{quoted_url}``
    (see :meth:`_generate_uri`).

    The detector must be entered as an async context manager before
    :meth:`detect_changes` or :meth:`classify_batch` is called; otherwise
    those methods raise ``RuntimeError``.
    """

    def __init__(self, state_manager: StateManager):
        """Initialize the change detector.

        Args:
            state_manager: Provider of previously persisted document state records.
        """
        self.logger = LoggingConfig.get_logger(
            f"qdrant_loader.{self.__class__.__name__}"
        )
        self._initialized = False
        self.state_manager = state_manager

    async def __aenter__(self):
        """Mark the detector as ready for use and return it."""
        self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, _exc_tb):
        """Log any exception raised inside the context.

        Returns ``None``, so the exception is never suppressed.
        """
        if exc_type:
            self.logger.error(
                "Error in StateChangeDetector context",
                error_type=exc_type.__name__,
                error=str(exc_val),
            )

    def _ensure_initialized(self) -> None:
        """Raise ``RuntimeError`` unless the detector was entered via ``async with``."""
        if not self._initialized:
            raise RuntimeError(
                "StateChangeDetector not initialized. Use as async context manager."
            )

    async def detect_changes(
        self, documents: list[Document], filtered_config: SourcesConfig
    ) -> dict[str, list[Document]]:
        """Classify documents as new, updated or deleted relative to stored state.

        Args:
            documents: The current documents gathered from the sources.
            filtered_config: Sources whose stored state records are loaded and
                compared against ``documents``.

        Returns:
            A dict with keys ``"new"``, ``"updated"`` and ``"deleted"``.
            ``"new"`` and ``"updated"`` hold input documents in input order.
            ``"deleted"`` holds placeholder documents rebuilt from every stored
            record whose URI does not appear among ``documents``.

        Raises:
            RuntimeError: If called outside the async context manager.
            InvalidDocumentStateError: If a document's state cannot be derived.
        """
        self._ensure_initialized()

        self.logger.info("Starting change detection", document_count=len(documents))

        current_states = [self._get_document_state(doc) for doc in documents]
        previous_states = await self._get_previous_states(filtered_config)

        # URI -> stored state. It doubles as the "seen before" membership test.
        # When several records share a URI, the last one wins.
        previous_by_uri: dict[str, DocumentState] = {
            state.uri: state for state in previous_states
        }
        current_uris: set[str] = {state.uri for state in current_states}

        new_docs: list[Document] = []
        updated_docs: list[Document] = []
        # current_states is built from documents, so the lengths always match.
        for state, doc in zip(current_states, documents, strict=True):
            previous = previous_by_uri.get(state.uri)
            if previous is None:
                new_docs.append(doc)
            elif self._is_document_updated(state, previous):
                updated_docs.append(doc)

        # Iterate the raw record list (not the dict) so that every stored record
        # whose URI vanished is reported, including records that share a URI.
        deleted_docs = [
            self._create_deleted_document(state)
            for state in previous_states
            if state.uri not in current_uris
        ]

        changes = {
            "new": new_docs,
            "updated": updated_docs,
            "deleted": deleted_docs,
        }

        self.logger.info(
            "Change detection completed",
            new_count=len(new_docs),
            updated_count=len(updated_docs),
            deleted_count=len(deleted_docs),
        )

        return changes

    async def classify_batch(
        self,
        documents: list[Document],
        filtered_config: SourcesConfig,
        project_id: str | None = None,
    ) -> list[Document]:
        """Return the documents in a batch that are new or changed.

        Stored state is fetched with one lookup per ``(source_type, source)``
        group, restricted to the batch's document ids. A document counts as
        changed when it has no stored record, its URI differs from the stored
        one, or :meth:`_is_document_updated` reports a change.

        Args:
            documents: The batch of documents to classify.
            filtered_config: Accepted for signature parity with
                :meth:`detect_changes`; not read by this method.
            project_id: Optional project scope forwarded to the state lookup.

        Returns:
            The new or updated documents, in input order.

        Raises:
            RuntimeError: If called outside the async context manager.
            InvalidDocumentStateError: If a document's state cannot be derived.
            Exception: Any error from the state lookup is logged and re-raised.
        """
        self._ensure_initialized()

        self.logger.info(
            "Classifying document batch for inline change detection",
            document_count=len(documents),
        )

        current_states = [self._get_document_state(doc) for doc in documents]
        previous_states_by_key: dict[tuple[str, str, str], DocumentState] = {}

        # Group documents by source so there is one bulk DB lookup per source.
        groups: dict[tuple[str, str], list[Document]] = {}
        for doc in documents:
            groups.setdefault((doc.source_type, doc.source), []).append(doc)

        for (source_type, source), docs in groups.items():
            try:
                records = await self.state_manager.get_document_state_records_by_ids(
                    source_type, source, [d.id for d in docs], project_id
                )
                for record in records:
                    key = (record.source_type, record.source, record.document_id)
                    previous_states_by_key[key] = self._record_to_state(record)  # type: ignore
            except Exception as e:
                self.logger.error(
                    "Error loading existing document state for batch classification",
                    source_type=source_type,
                    source=source,
                    error=str(e),
                    error_type=type(e).__name__,
                )
                raise

        changed_documents: list[Document] = []
        for state, doc in zip(current_states, documents, strict=True):
            previous = previous_states_by_key.get((doc.source_type, doc.source, doc.id))
            if (
                previous is None
                or state.uri != previous.uri
                or self._is_document_updated(state, previous)
            ):
                changed_documents.append(doc)

        self.logger.info(
            "Batch classification completed",
            new_or_updated_count=len(changed_documents),
            total_documents=len(documents),
        )

        return changed_documents

    def _get_document_state(self, document: Document) -> DocumentState:
        """Build the standardized state snapshot of a document.

        Raises:
            InvalidDocumentStateError: If the snapshot cannot be built (for
                example, a field fails model validation). The original error is
                chained.
        """
        try:
            return DocumentState(
                document_id=document.id,
                uri=self._generate_uri_from_document(document),
                content_hash=document.content_hash,
                updated_at=document.updated_at,
            )
        except Exception as e:
            raise InvalidDocumentStateError(f"Failed to get document state: {e}") from e

    def _record_to_state(self, record: DocumentStateRecord) -> DocumentState:
        """Convert a persisted state record into a :class:`DocumentState`."""
        return DocumentState(
            document_id=record.document_id,  # type: ignore
            uri=self._generate_uri(
                record.url, record.source, record.source_type, record.document_id  # type: ignore
            ),
            content_hash=record.content_hash,  # type: ignore
            updated_at=record.updated_at,  # type: ignore
        )

    def _is_document_updated(
        self, current_state: DocumentState, previous_state: DocumentState
    ) -> bool:
        """Return True if the content hash differs or the timestamp moved forward."""
        # NOTE(review): comparing a naive with an aware datetime raises
        # TypeError; this assumes both sides use the same timezone convention.
        return (
            current_state.content_hash != previous_state.content_hash
            or current_state.updated_at > previous_state.updated_at
        )

    def _create_deleted_document(self, document_state: DocumentState) -> Document:
        """Create a minimal placeholder document for a deleted item.

        The source type, source and URL are recovered from the state's URI.

        Raises:
            ValueError: If the URI does not contain the two ``:`` separators
                produced by :meth:`_generate_uri`.
        """
        # _normalize_url percent-encodes the URL with safe="", so the URL part
        # never contains ':'. Split it off from the right so that a ':' inside
        # the source name cannot leak into the URL.
        prefix, url_sep, encoded_url = document_state.uri.rpartition(":")
        source_type, source_sep, source = prefix.partition(":")
        if not (url_sep and source_sep):
            raise ValueError(f"Malformed document URI: {document_state.uri!r}")
        url = unquote(encoded_url)

        return Document(
            id=document_state.document_id,
            content="",
            content_type="md",
            source=source,
            source_type=source_type,
            url=url,
            title="Deleted Document",
            metadata={
                "uri": document_state.uri,
                "title": "Deleted Document",
                "updated_at": document_state.updated_at.isoformat(),
                "content_hash": document_state.content_hash,
            },
        )

    async def _get_previous_states(
        self, filtered_config: SourcesConfig
    ) -> list[DocumentState]:
        """Load stored document states for every configured source.

        Only the ``git``, ``confluence``, ``jira``, ``publicdocs`` and
        ``localfile`` source groups of ``filtered_config`` are consulted.
        Empty groups are skipped.
        """
        source_groups = (
            filtered_config.git,
            filtered_config.confluence,
            filtered_config.jira,
            filtered_config.publicdocs,
            filtered_config.localfile,
        )

        records: list[DocumentStateRecord] = []
        for source_configs in source_groups:
            if not source_configs:
                continue
            for config in source_configs.values():
                records.extend(
                    await self.state_manager.get_document_state_records(config)
                )

        return [self._record_to_state(record) for record in records]

    def _normalize_url(self, url: str) -> str:
        """Strip trailing slashes and percent-encode every reserved character.

        Because ``safe=""``, the result contains no ``:`` or ``/``.
        """
        return quote(url.rstrip("/"), safe="")

    def _generate_uri_from_document(self, document: Document) -> str:
        """Generate the matching URI for a document."""
        return self._generate_uri(
            document.url, document.source, document.source_type, document.id
        )

    def _generate_uri(
        self, url: str, source: str, source_type: str, document_id: str
    ) -> str:
        """Build ``{source_type}:{source}:{normalized_url}``.

        ``document_id`` is currently unused and is kept for signature
        compatibility.
        """
        return f"{source_type}:{source}:{self._normalize_url(url)}"