Coverage for src/qdrant_loader/core/state/state_change_detector.py: 38%
65 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
"""Document state model and change detection against persisted document state."""
3from datetime import datetime
4from urllib.parse import quote, unquote
6from pydantic import BaseModel, ConfigDict
8from qdrant_loader.config.sources import SourcesConfig
9from qdrant_loader.core.document import Document
10from qdrant_loader.core.state.exceptions import InvalidDocumentStateError
11from qdrant_loader.core.state.state_manager import DocumentStateRecord, StateManager
12from qdrant_loader.utils.logging import LoggingConfig
class DocumentState(BaseModel):
    """Source-agnostic snapshot of a document, used to detect changes.

    Every source is reduced to the same three fields, so documents from
    different connectors can be compared uniformly.

    Fields:
        uri: Identifier in the form ``{source_type}:{source}:{url}``.
        content_hash: Hash of the document content.
        updated_at: Timestamp of the document's last update.
    """

    # Unknown fields are rejected; arbitrary (non-pydantic) types are permitted.
    model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)

    # Universal identifier: "{source_type}:{source}:{url}".
    uri: str
    # Hash of the document content.
    content_hash: str
    # Last update timestamp.
    updated_at: datetime
class StateChangeDetector:
    """Classify documents as new, updated, or deleted against stored state.

    Each document is reduced to a ``DocumentState`` keyed by a URI of the form
    ``{source_type}:{source}:{quoted_url}``. The current batch is compared
    with the state records returned by the ``StateManager`` for the configured
    sources.

    Must be used as an async context manager; ``detect_changes`` raises
    ``RuntimeError`` otherwise.
    """

    def __init__(self, state_manager: StateManager):
        """Initialize the change detector.

        Args:
            state_manager: Provides the previously stored document state records.
        """
        self.logger = LoggingConfig.get_logger(
            f"qdrant_loader.{self.__class__.__name__}"
        )
        self._initialized = False
        self.state_manager = state_manager

    async def __aenter__(self):
        """Mark the detector as ready for use and return it."""
        self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Log any exception raised inside the context.

        Returns ``None``, so the exception is never suppressed.
        """
        if exc_type:
            self.logger.error(
                "Error in StateChangeDetector context",
                error_type=exc_type.__name__,
                error=str(exc_val),
            )

    async def detect_changes(
        self, documents: list[Document], filtered_config: SourcesConfig
    ) -> dict[str, list[Document]]:
        """Split documents into new, updated and deleted relative to stored state.

        Args:
            documents: Documents fetched in the current run.
            filtered_config: Sources whose stored state is compared against.

        Returns:
            A dict with keys ``"new"``, ``"updated"`` and ``"deleted"``.
            ``"new"`` and ``"updated"`` hold the input ``Document`` objects, in
            input order. ``"deleted"`` holds placeholder documents rebuilt from
            stored state, one per stored URI no longer present in ``documents``.

        Raises:
            RuntimeError: If called outside the async context manager.
            InvalidDocumentStateError: If a document's state cannot be built.
        """
        if not self._initialized:
            raise RuntimeError(
                "StateChangeDetector not initialized. Use as async context manager."
            )

        self.logger.info("Starting change detection", document_count=len(documents))

        current_states = [self._get_document_state(doc) for doc in documents]
        previous_states = await self._get_previous_states(filtered_config)

        # A single URI -> state mapping serves both membership tests and
        # lookups. If several stored records share a URI, the last one wins.
        previous_by_uri: dict[str, DocumentState] = {
            state.uri: state for state in previous_states
        }
        current_uris: set[str] = {state.uri for state in current_states}

        new_docs: list[Document] = []
        updated_docs: list[Document] = []
        # current_states is built from documents, so the lengths always match.
        for state, doc in zip(current_states, documents, strict=True):
            previous = previous_by_uri.get(state.uri)
            if previous is None:
                new_docs.append(doc)
            elif self._is_document_updated(state, previous):
                updated_docs.append(doc)

        # Iterate the de-duplicated mapping so that a URI stored by several
        # records yields a single deletion placeholder.
        deleted_docs = [
            self._create_deleted_document(state)
            for uri, state in previous_by_uri.items()
            if uri not in current_uris
        ]

        changes = {
            "new": new_docs,
            "updated": updated_docs,
            "deleted": deleted_docs,
        }

        self.logger.info(
            "Change detection completed",
            new_count=len(new_docs),
            updated_count=len(updated_docs),
            deleted_count=len(deleted_docs),
        )

        return changes

    def _get_document_state(self, document: Document) -> DocumentState:
        """Build the standardized state of a document.

        Raises:
            InvalidDocumentStateError: If the URI cannot be generated or the
                state fails validation; the original error is chained.
        """
        try:
            return DocumentState(
                uri=self._generate_uri_from_document(document),
                content_hash=document.content_hash,
                updated_at=document.updated_at,
            )
        except Exception as e:
            raise InvalidDocumentStateError(f"Failed to get document state: {e}") from e

    @staticmethod
    def _as_aware(value: datetime) -> datetime:
        """Return ``value`` unchanged if it is timezone-aware, else tag it as UTC."""
        from datetime import timezone

        if value.utcoffset() is not None:
            return value
        return value.replace(tzinfo=timezone.utc)

    def _is_document_updated(
        self, current_state: DocumentState, previous_state: DocumentState
    ) -> bool:
        """Return True if the content hash changed or the timestamp moved forward."""
        if current_state.content_hash != previous_state.content_hash:
            return True
        # Comparing a naive datetime with an aware one raises TypeError, and
        # the two timestamps come from different origins (live document vs.
        # stored record). Normalize both before comparing. When both sides
        # share the same kind, this gives the same result as a direct compare.
        # NOTE(review): assumes naive timestamps are UTC — confirm the
        # convention used by the state store.
        return self._as_aware(current_state.updated_at) > self._as_aware(
            previous_state.updated_at
        )

    def _create_deleted_document(self, document_state: DocumentState) -> Document:
        """Build an empty placeholder ``Document`` for an item that was deleted.

        The URI has the form ``{source_type}:{source}:{quoted_url}``. The URL
        segment is percent-encoded with no safe characters, so it never
        contains ``":"``. Splitting it off from the right keeps the parse
        correct even when the source name itself contains a colon.
        """
        source_type, remainder = document_state.uri.split(":", 1)
        source, quoted_url = remainder.rsplit(":", 1)
        url = unquote(quoted_url)

        return Document(
            content="",
            source=source,
            source_type=source_type,
            url=url,
            title="Deleted Document",
            metadata={
                "uri": document_state.uri,
                "title": "Deleted Document",
                "updated_at": document_state.updated_at.isoformat(),
                "content_hash": document_state.content_hash,
            },
        )

    async def _get_previous_states(
        self, filtered_config: SourcesConfig
    ) -> list[DocumentState]:
        """Load stored state records for every configured source.

        Args:
            filtered_config: Sources whose stored records are loaded. A source
                type whose config mapping is empty or ``None`` is skipped.

        Returns:
            One ``DocumentState`` per stored record, in retrieval order.
        """
        previous_states_records: list[DocumentStateRecord] = []

        # One mapping per source type; each value is an individual source config.
        source_config_groups = (
            filtered_config.git,
            filtered_config.confluence,
            filtered_config.jira,
            filtered_config.publicdocs,
            filtered_config.localfile,
        )

        for source_configs in source_config_groups:
            if not source_configs:
                continue
            for config in source_configs.values():
                records = await self.state_manager.get_document_state_records(config)
                previous_states_records.extend(records)

        return [
            DocumentState(
                uri=self._generate_uri(
                    record.url, record.source, record.source_type, record.document_id  # type: ignore
                ),
                content_hash=record.content_hash,  # type: ignore
                updated_at=record.updated_at,  # type: ignore
            )
            for record in previous_states_records
        ]

    def _normalize_url(self, url: str) -> str:
        """Strip trailing slashes and percent-encode every reserved character."""
        return quote(url.rstrip("/"), safe="")

    def _generate_uri_from_document(self, document: Document) -> str:
        """Generate the state URI for a document."""
        return self._generate_uri(
            document.url, document.source, document.source_type, document.id
        )

    def _generate_uri(
        self, url: str, source: str, source_type: str, document_id: str
    ) -> str:
        """Generate a URI of the form ``{source_type}:{source}:{quoted_url}``.

        ``document_id`` is accepted for call compatibility but is currently not
        part of the URI.
        """
        return f"{source_type}:{source}:{self._normalize_url(url)}"