Coverage for src/qdrant_loader/core/state/state_change_detector.py: 38%

65 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1"""Base classes for connectors and change detectors.""" 

2 

3from datetime import datetime 

4from urllib.parse import quote, unquote 

5 

6from pydantic import BaseModel, ConfigDict 

7 

8from qdrant_loader.config.sources import SourcesConfig 

9from qdrant_loader.core.document import Document 

10from qdrant_loader.core.state.exceptions import InvalidDocumentStateError 

11from qdrant_loader.core.state.state_manager import DocumentStateRecord, StateManager 

12from qdrant_loader.utils.logging import LoggingConfig 

13 

14 

class DocumentState(BaseModel):
    """Snapshot of the fields used to compare a document between runs.

    Every source is reduced to the same three fields, so change detection
    can treat documents from all sources uniformly.
    """

    # Unknown fields are rejected; arbitrary (non-pydantic) types are allowed.
    model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)

    # Universal identifier in format: {source_type}:{source}:{url}
    uri: str
    # Hash of document content
    content_hash: str
    # Last update timestamp
    updated_at: datetime

27 

28 

class StateChangeDetector:
    """Classify documents as new, updated or deleted against stored state.

    Documents from the current run are compared with the records returned by
    the state manager. Documents are matched by a URI of the form
    ``{source_type}:{source}:{percent-encoded url}``.

    The detector must be entered as an async context manager before
    ``detect_changes`` is called.
    """

    def __init__(self, state_manager: StateManager):
        """Initialize the change detector.

        Args:
            state_manager: Object queried for previously stored document
                state records.
        """
        self.logger = LoggingConfig.get_logger(
            f"qdrant_loader.{self.__class__.__name__}"
        )
        self._initialized = False
        self.state_manager = state_manager

    async def __aenter__(self):
        """Mark the detector as ready for use and return it."""
        self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Log any exception that escaped the ``async with`` body.

        Returns ``None``, so the exception is not suppressed.
        """
        if exc_type:
            self.logger.error(
                "Error in StateChangeDetector context",
                error_type=exc_type.__name__,
                error=str(exc_val),
            )

    async def detect_changes(
        self, documents: list[Document], filtered_config: SourcesConfig
    ) -> dict[str, list[Document]]:
        """Compare ``documents`` with the stored state and classify them.

        Args:
            documents: Documents produced by the current run.
            filtered_config: Source configuration whose stored records are
                used as the previous state.

        Returns:
            A dict with the keys ``"new"``, ``"updated"`` and ``"deleted"``.
            ``"new"`` and ``"updated"`` hold the matching input documents in
            input order; ``"deleted"`` holds placeholder documents built from
            stored states that no longer have a current counterpart.

        Raises:
            RuntimeError: If the detector was not entered as an async context
                manager.
            InvalidDocumentStateError: If the state of a document cannot be
                built.
        """
        if not self._initialized:
            raise RuntimeError(
                "StateChangeDetector not initialized. Use as async context manager."
            )

        self.logger.info("Starting change detection", document_count=len(documents))

        # Get current and previous states
        current_states = [self._get_document_state(doc) for doc in documents]
        previous_states = await self._get_previous_states(filtered_config)

        # One dict serves both the membership test and the state lookup.
        previous_by_uri: dict[str, DocumentState] = {
            state.uri: state for state in previous_states
        }
        current_uris: set[str] = {state.uri for state in current_states}

        # Classify every current document in a single pass. The two lists are
        # built one-to-one above, so strict=True can never trip here.
        new_docs: list[Document] = []
        updated_docs: list[Document] = []
        for state, doc in zip(current_states, documents, strict=True):
            previous = previous_by_uri.get(state.uri)
            if previous is None:
                new_docs.append(doc)
            elif self._is_document_updated(state, previous):
                updated_docs.append(doc)

        deleted_docs = [
            self._create_deleted_document(state)
            for state in previous_states
            if state.uri not in current_uris
        ]

        changes = {
            "new": new_docs,
            "updated": updated_docs,
            "deleted": deleted_docs,
        }

        self.logger.info(
            "Change detection completed",
            new_count=len(new_docs),
            updated_count=len(updated_docs),
            deleted_count=len(deleted_docs),
        )

        return changes

    def _get_document_state(self, document: Document) -> DocumentState:
        """Build the standardized state of ``document``.

        Raises:
            InvalidDocumentStateError: If building the state fails for any
                reason; the original exception is chained as the cause.
        """
        try:
            return DocumentState(
                uri=self._generate_uri_from_document(document),
                content_hash=document.content_hash,
                updated_at=document.updated_at,
            )
        except Exception as e:
            raise InvalidDocumentStateError(f"Failed to get document state: {e}") from e

    def _is_document_updated(
        self, current_state: DocumentState, previous_state: DocumentState
    ) -> bool:
        """Return True if the content hash differs or the timestamp is newer."""
        # NOTE(review): the timestamp comparison raises TypeError if one value
        # is timezone-aware and the other is naive -- confirm both sides use
        # the same convention.
        return (
            current_state.content_hash != previous_state.content_hash
            or current_state.updated_at > previous_state.updated_at
        )

    def _split_uri(self, uri: str) -> tuple[str, str, str]:
        """Split a URI built by ``_generate_uri`` into its components.

        The URL part is percent-encoded with no safe characters, so it never
        contains ``:``. It is therefore taken from the right, which keeps
        source names that contain ``:`` intact.

        Returns:
            ``(source_type, source, url)`` with the URL decoded.

        Raises:
            ValueError: If ``uri`` has fewer than three ``:``-separated parts.
        """
        head, encoded_url = uri.rsplit(":", 1)
        source_type, source = head.split(":", 1)
        return source_type, source, unquote(encoded_url)

    def _create_deleted_document(self, document_state: DocumentState) -> Document:
        """Create a minimal placeholder document for a deleted item."""
        source_type, source, url = self._split_uri(document_state.uri)

        return Document(
            content="",
            source=source,
            source_type=source_type,
            url=url,
            title="Deleted Document",
            metadata={
                "uri": document_state.uri,
                "title": "Deleted Document",
                "updated_at": document_state.updated_at.isoformat(),
                "content_hash": document_state.content_hash,
            },
        )

    async def _get_previous_states(
        self, filtered_config: SourcesConfig
    ) -> list[DocumentState]:
        """Collect the stored states for every source in ``filtered_config``.

        The state manager is queried once per configured source, in the order
        git, confluence, jira, publicdocs, localfile.
        """
        source_groups = (
            filtered_config.git,
            filtered_config.confluence,
            filtered_config.jira,
            filtered_config.publicdocs,
            filtered_config.localfile,
        )

        previous_states_records: list[DocumentStateRecord] = []
        for source_configs in source_groups:
            if not source_configs:
                continue
            for config in source_configs.values():
                records = await self.state_manager.get_document_state_records(
                    config
                )
                previous_states_records.extend(records)

        # Convert records to states
        return [
            DocumentState(
                uri=self._generate_uri(
                    record.url, record.source, record.source_type, record.document_id  # type: ignore
                ),
                content_hash=record.content_hash,  # type: ignore
                updated_at=record.updated_at,  # type: ignore
            )
            for record in previous_states_records
        ]

    def _normalize_url(self, url: str) -> str:
        """Strip trailing slashes and percent-encode every reserved character."""
        return quote(url.rstrip("/"), safe="")

    def _generate_uri_from_document(self, document: Document) -> str:
        """Generate the comparison URI for ``document``."""
        return self._generate_uri(
            document.url, document.source, document.source_type, document.id
        )

    def _generate_uri(
        self, url: str, source: str, source_type: str, document_id: str
    ) -> str:
        """Generate a URI of the form ``{source_type}:{source}:{encoded url}``.

        ``document_id`` is accepted but is not part of the URI.
        """
        return f"{source_type}:{source}:{self._normalize_url(url)}"