Coverage for src/qdrant_loader/core/state/document_state_manager.py: 100%

50 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-09 09:14 +0000

1import sqlite3 

2import time 

3from datetime import datetime 

4 

5from qdrant_loader.core.state.state_change_detector import DocumentState 

6 

7 

class DocumentStateManager:
    """Read and write per-document state rows in the ``document_states`` table.

    Each public method opens its own connection through ``_get_connection``,
    runs a single statement, and closes the connection again. SQLite
    "database is locked" errors are retried a bounded number of times; every
    other failure is logged through the injected logger and re-raised.
    """

    # Maximum number of retries after a "database is locked" error. Once the
    # budget is spent the OperationalError is logged and re-raised instead of
    # retrying forever.
    _MAX_LOCK_RETRIES = 5

    # Pause between lock retries. The warning message below says "1 second";
    # keep the two in sync if this value is changed.
    _LOCK_RETRY_DELAY_SECONDS = 1

    def __init__(self, logger):
        """Store the logger used for all diagnostics.

        Args:
            logger: Object providing ``debug``, ``warning`` and ``error``
                methods that accept an ``extra`` keyword argument.
        """
        self.logger = logger

    def _get_connection(self):
        """Return a new SQLite connection.

        Placeholder implementation: it connects to ``":memory:"``, so every
        call yields a fresh, empty database. Override or replace this method
        to point at the real state database.
        """
        return sqlite3.connect(":memory:")

    def _run_with_lock_retry(self, doc_id, action, operation):
        """Run ``operation(conn)`` on a fresh connection with bounded lock retries.

        Args:
            doc_id: Document ID, used only for log context.
            action: Verb inserted into error messages ("updating"/"getting").
            operation: Callable taking the open connection; its return value
                is returned to the caller.

        Returns:
            Whatever ``operation`` returns.

        Raises:
            sqlite3.OperationalError: For non-lock operational errors, or when
                the database is still locked after ``_MAX_LOCK_RETRIES`` retries.
            Exception: Any other error raised while connecting or running
                ``operation`` is logged and re-raised unchanged.
        """
        retry_count = 0
        while True:
            connection = None
            try:
                connection = self._get_connection()
                # The sqlite3 context manager only commits/rolls back the
                # transaction; closing is handled in ``finally`` below.
                with connection as conn:
                    return operation(conn)
            except sqlite3.OperationalError as e:
                locked = "database is locked" in str(e)
                if not locked or retry_count >= self._MAX_LOCK_RETRIES:
                    self.logger.error(
                        f"Error {action} document state: {e}",
                        extra={
                            "doc_id": doc_id,
                            "error": str(e),
                            "error_type": type(e).__name__,
                        },
                    )
                    raise
                self.logger.warning(
                    "Database is locked, retrying in 1 second",
                    extra={
                        "doc_id": doc_id,
                        "error": str(e),
                        "retry_count": retry_count,
                    },
                )
            except Exception as e:
                self.logger.error(
                    f"Unexpected error {action} document state: {e}",
                    extra={
                        "doc_id": doc_id,
                        "error": str(e),
                        "error_type": type(e).__name__,
                    },
                )
                raise
            finally:
                # Always release the connection, including before a retry, so
                # repeated calls do not accumulate open handles.
                if connection is not None:
                    connection.close()

            # Only reached when a lock error was caught and may be retried.
            retry_count += 1
            time.sleep(self._LOCK_RETRY_DELAY_SECONDS)

    def update_document_state(self, doc_id: str, state: "DocumentState") -> None:
        """Update the state of a document.

        Inserts the row for ``doc_id`` or replaces the existing one.

        Args:
            doc_id: The ID of the document to update
            state: The new state to set; its ``uri``, ``content_hash`` and
                ``updated_at`` (stored as an ISO-8601 string) are persisted.

        Raises:
            sqlite3.OperationalError: If the statement fails, or the database
                stays locked after all retries.
        """
        self.logger.debug(
            "Updating document state",
            extra={
                "doc_id": doc_id,
                "uri": state.uri,
                "content_hash": state.content_hash,
                "updated_at": state.updated_at.isoformat(),
            },
        )

        def _write(conn):
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT OR REPLACE INTO document_states (doc_id, uri, content_hash, updated_at)
                VALUES (?, ?, ?, ?)
                """,
                (
                    doc_id,
                    state.uri,
                    state.content_hash,
                    state.updated_at.isoformat(),
                ),
            )
            conn.commit()
            self.logger.debug(
                "Document state updated successfully",
                extra={
                    "doc_id": doc_id,
                    "uri": state.uri,
                    "content_hash": state.content_hash,
                },
            )

        self._run_with_lock_retry(doc_id, "updating", _write)

    def get_document_state(self, doc_id: str) -> "DocumentState | None":
        """Get the current state of a document.

        Args:
            doc_id: The ID of the document to check

        Returns:
            The current state of the document, or None if not found

        Raises:
            sqlite3.OperationalError: If the query fails, or the database
                stays locked after all retries.
        """
        self.logger.debug(
            "Getting document state",
            extra={"doc_id": doc_id, "timestamp": datetime.now().isoformat()},
        )

        def _read(conn):
            cursor = conn.cursor()
            cursor.execute(
                "SELECT uri, content_hash, updated_at FROM document_states WHERE doc_id = ?",
                (doc_id,),
            )
            row = cursor.fetchone()

            if not row:
                self.logger.debug(
                    "No state found for document", extra={"doc_id": doc_id}
                )
                return None

            # updated_at is stored as an ISO-8601 string by
            # update_document_state, so parse it back into a datetime.
            state = DocumentState(
                uri=row[0],
                content_hash=row[1],
                updated_at=datetime.fromisoformat(row[2]),
            )
            self.logger.debug(
                "Document state retrieved",
                extra={
                    "doc_id": doc_id,
                    "uri": state.uri,
                    "content_hash": state.content_hash,
                },
            )
            return state

        return self._run_with_lock_retry(doc_id, "getting", _read)