Coverage for src/qdrant_loader/core/state/state_change_detector.py: 38%
65 statements
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
"""Document state model and change detector that classifies documents as new, updated, or deleted."""
3from datetime import datetime
4from urllib.parse import quote, unquote
6from pydantic import BaseModel, ConfigDict
8from qdrant_loader.config.sources import SourcesConfig
9from qdrant_loader.core.document import Document
10from qdrant_loader.core.state.exceptions import InvalidDocumentStateError
11from qdrant_loader.core.state.state_manager import DocumentStateRecord, StateManager
12from qdrant_loader.utils.logging import LoggingConfig
class DocumentState(BaseModel):
    """Source-agnostic snapshot of a document used for change detection.

    Every source type is reduced to this same small record so that current
    and previously stored documents can be compared uniformly.

    Attributes:
        uri: Universal identifier shaped ``{source_type}:{source}:{url}``.
        content_hash: Hash of the document content.
        updated_at: Timestamp of the document's last update.
    """

    # Unknown fields are rejected so that typos in state construction fail loudly.
    model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid")

    uri: str
    content_hash: str
    updated_at: datetime
class StateChangeDetector:
    """Optimized change detector for document state management.

    Compares the documents produced by the current run against the document
    state records persisted through the ``StateManager`` and classifies them
    as new, updated, or deleted. Documents are matched on a URI of the form
    ``{source_type}:{source}:{percent_encoded_url}`` (see ``_generate_uri``).

    Must be used as an async context manager; ``detect_changes`` raises
    ``RuntimeError`` otherwise.
    """

    def __init__(self, state_manager: StateManager):
        """Initialize the change detector.

        Args:
            state_manager: Provider of previously persisted document state records.
        """
        self.logger = LoggingConfig.get_logger(
            f"qdrant_loader.{self.__class__.__name__}"
        )
        self._initialized = False
        self.state_manager = state_manager

    async def __aenter__(self):
        """Async context manager entry; marks the detector as ready for use."""
        self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit.

        Logs any exception raised inside the ``async with`` body. Returns
        ``None``, so the exception is not suppressed and keeps propagating.
        """
        if exc_type:
            self.logger.error(
                "Error in StateChangeDetector context",
                error_type=exc_type.__name__,
                error=str(exc_val),
            )

    async def detect_changes(
        self, documents: list[Document], filtered_config: SourcesConfig
    ) -> dict[str, list[Document]]:
        """Detect changes in documents efficiently.

        Args:
            documents: Documents produced by the current run.
            filtered_config: Sources whose stored states are compared against
                ``documents``.

        Returns:
            A dict with keys ``"new"``, ``"updated"`` and ``"deleted"``.
            ``"new"``/``"updated"`` hold documents from ``documents`` (input
            order preserved); ``"deleted"`` holds minimal placeholder
            documents, at most one per stored URI that is absent from
            ``documents``.

        Raises:
            RuntimeError: If called outside the async context manager.
            InvalidDocumentStateError: If a document's state cannot be built.
        """
        if not self._initialized:
            raise RuntimeError(
                "StateChangeDetector not initialized. Use as async context manager."
            )

        self.logger.info("Starting change detection", document_count=len(documents))

        # Get current and previous states
        current_states = [self._get_document_state(doc) for doc in documents]
        previous_states = await self._get_previous_states(filtered_config)

        # When several stored records map to the same URI, the last one wins.
        previous_by_uri: dict[str, DocumentState] = {
            state.uri: state for state in previous_states
        }
        current_uris: set[str] = {state.uri for state in current_states}

        # Single pass: a document is new if its URI was never stored,
        # otherwise it is updated only if its state differs from the stored one.
        new_docs: list[Document] = []
        updated_docs: list[Document] = []
        # current_states is built 1:1 from documents, so lengths always match.
        for state, doc in zip(current_states, documents, strict=True):
            previous = previous_by_uri.get(state.uri)
            if previous is None:
                new_docs.append(doc)
            elif self._is_document_updated(state, previous):
                updated_docs.append(doc)

        # Emit at most one placeholder per missing URI: several stored records
        # can share a URI (the URI ignores document_id), which would otherwise
        # yield identical duplicate deletion placeholders.
        deleted_docs: list[Document] = []
        emitted_uris: set[str] = set()
        for state in previous_states:
            if state.uri in current_uris or state.uri in emitted_uris:
                continue
            emitted_uris.add(state.uri)
            deleted_docs.append(self._create_deleted_document(state))

        changes = {
            "new": new_docs,
            "updated": updated_docs,
            "deleted": deleted_docs,
        }

        self.logger.info(
            "Change detection completed",
            new_count=len(new_docs),
            updated_count=len(updated_docs),
            deleted_count=len(deleted_docs),
        )

        return changes

    def _get_document_state(self, document: Document) -> DocumentState:
        """Get the standardized state of a document.

        Raises:
            InvalidDocumentStateError: If the URI cannot be generated or the
                state model rejects the document's values.
        """
        try:
            return DocumentState(
                uri=self._generate_uri_from_document(document),
                content_hash=document.content_hash,
                updated_at=document.updated_at,
            )
        except Exception as e:
            raise InvalidDocumentStateError(f"Failed to get document state: {e}") from e

    def _is_document_updated(
        self, current_state: DocumentState, previous_state: DocumentState
    ) -> bool:
        """Check if a document has been updated.

        A document counts as updated when its content hash differs, or when
        its ``updated_at`` timestamp is newer than the stored one.
        """
        if current_state.content_hash != previous_state.content_hash:
            return True
        return self._as_comparable_datetime(
            current_state.updated_at
        ) > self._as_comparable_datetime(previous_state.updated_at)

    @staticmethod
    def _as_comparable_datetime(value: datetime) -> datetime:
        """Return ``value`` as a timezone-aware datetime.

        Python raises ``TypeError`` when ordering a naive datetime against an
        aware one, so naive values are tagged as UTC before comparison.
        Aware values are returned unchanged.
        NOTE(review): assumes naive timestamps (e.g. from stored records) are
        UTC — confirm against how StateManager persists ``updated_at``.
        """
        from datetime import timezone

        if value.tzinfo is None or value.utcoffset() is None:
            return value.replace(tzinfo=timezone.utc)
        return value

    def _create_deleted_document(self, document_state: DocumentState) -> Document:
        """Create a minimal document for a deleted item.

        The source type, source and URL are recovered from the state's URI.
        Content is empty and the original metadata is summarised.

        Raises:
            ValueError: If the URI does not contain the expected components.
        """
        uri = document_state.uri
        # The URL component is percent-encoded with ``safe=""`` (see
        # ``_normalize_url``), so it never contains ":". Splitting from the
        # right therefore keeps any ":" inside the source name intact.
        prefix, url_sep, quoted_url = uri.rpartition(":")
        source_type, source_sep, source = prefix.partition(":")
        if not url_sep or not source_sep:
            raise ValueError(f"Malformed document URI: {uri!r}")
        url = unquote(quoted_url)

        return Document(
            content="",
            content_type="md",
            source=source,
            source_type=source_type,
            url=url,
            title="Deleted Document",
            metadata={
                "uri": document_state.uri,
                "title": "Deleted Document",
                "updated_at": document_state.updated_at.isoformat(),
                "content_hash": document_state.content_hash,
            },
        )

    async def _get_previous_states(
        self, filtered_config: SourcesConfig
    ) -> list[DocumentState]:
        """Get previous document states from the state manager.

        Queries the state manager once per configured source (in the order
        git, confluence, jira, publicdocs, localfile) and converts every
        returned record into a ``DocumentState``.
        """
        source_groups = (
            filtered_config.git,
            filtered_config.confluence,
            filtered_config.jira,
            filtered_config.publicdocs,
            filtered_config.localfile,
        )

        previous_states_records: list[DocumentStateRecord] = []
        for source_configs in source_groups:
            if not source_configs:
                continue
            for config in source_configs.values():
                records = await self.state_manager.get_document_state_records(
                    config
                )
                previous_states_records.extend(records)

        return [
            DocumentState(
                uri=self._generate_uri(
                    record.url, record.source, record.source_type, record.document_id  # type: ignore
                ),
                content_hash=record.content_hash,  # type: ignore
                updated_at=record.updated_at,  # type: ignore
            )
            for record in previous_states_records
        ]

    def _normalize_url(self, url: str) -> str:
        """Normalize a URL for consistent hashing.

        Trailing slashes are stripped and the remainder is fully
        percent-encoded (``safe=""``), so the result contains no ":" or "/".
        """
        return quote(url.rstrip("/"), safe="")

    def _generate_uri_from_document(self, document: Document) -> str:
        """Generate a URI from a document."""
        return self._generate_uri(
            document.url, document.source, document.source_type, document.id
        )

    def _generate_uri(
        self, url: str, source: str, source_type: str, document_id: str
    ) -> str:
        """Generate a URI from document components.

        NOTE(review): ``document_id`` is accepted but currently unused, so two
        documents with the same normalized URL in the same source share a URI.
        """
        return f"{source_type}:{source}:{self._normalize_url(url)}"