Coverage for src / qdrant_loader / core / state / document_state_manager.py: 100%

50 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1import sqlite3 

2import time 

3from datetime import datetime 

4 

5from qdrant_loader.core.state.state_change_detector import DocumentState 

6 

7 

class DocumentStateManager:
    """Store and look up per-document state rows in a SQLite table.

    Rows live in a ``document_states`` table with the columns ``doc_id``,
    ``uri``, ``content_hash`` and ``updated_at`` (ISO-8601 text). Every
    operation obtains its own connection from :meth:`_get_connection` and
    closes it again before returning.

    Attributes:
        logger: Logger-like object exposing ``debug``, ``warning`` and
            ``error`` methods that accept an ``extra`` keyword argument.
    """

    # Maximum number of times an operation is retried after SQLite reports
    # "database is locked" before the error is propagated to the caller.
    MAX_LOCK_RETRIES = 5

    # Pause between two attempts when the database is locked, in seconds.
    LOCK_RETRY_DELAY_SECONDS = 1

    def __init__(self, logger):
        """Create a manager that reports through *logger*."""
        self.logger = logger

    def _get_connection(self):
        """Return a new SQLite connection for a single operation.

        NOTE: this is a placeholder that opens an in-memory database. Every
        call to ``sqlite3.connect(":memory:")`` yields a brand-new, empty
        database, and nothing in this class creates the ``document_states``
        table, so real deployments (and tests) are expected to override this
        method with one that points at a prepared database.
        """
        return sqlite3.connect(":memory:")

    def _run_with_lock_retry(self, doc_id, action, operation):
        """Run ``operation(conn)`` on a fresh connection, retrying on locks.

        Args:
            doc_id: Document ID, used only for log context.
            action: Verb placed in error messages ("updating" / "getting").
            operation: Callable receiving the open connection; its return
                value is passed through to the caller.

        Returns:
            Whatever *operation* returns.

        Raises:
            sqlite3.OperationalError: For non-lock operational errors, or
                when the database is still locked after ``MAX_LOCK_RETRIES``
                retries.
            Exception: Any other error raised while connecting or inside
                *operation* is logged and re-raised unchanged.
        """
        retries_done = 0
        while True:
            try:
                raw_connection = self._get_connection()
                try:
                    # The sqlite3 context manager only commits or rolls back
                    # the transaction; it does not close the connection, so
                    # that is done explicitly in ``finally``.
                    with raw_connection as conn:
                        return operation(conn)
                finally:
                    raw_connection.close()
            except sqlite3.OperationalError as e:
                is_locked = "database is locked" in str(e)
                if not is_locked or retries_done >= self.MAX_LOCK_RETRIES:
                    details = {
                        "doc_id": doc_id,
                        "error": str(e),
                        "error_type": type(e).__name__,
                    }
                    if is_locked:
                        # Retry budget exhausted: record how often we tried.
                        details["retry_count"] = retries_done
                    self.logger.error(
                        f"Error {action} document state: {str(e)}",
                        extra=details,
                    )
                    raise

                delay = self.LOCK_RETRY_DELAY_SECONDS
                unit = "second" if delay == 1 else "seconds"
                self.logger.warning(
                    f"Database is locked, retrying in {delay} {unit}",
                    extra={
                        "doc_id": doc_id,
                        "error": str(e),
                        "retry_count": retries_done,
                    },
                )
                time.sleep(delay)
                retries_done += 1
            except Exception as e:
                self.logger.error(
                    f"Unexpected error {action} document state: {str(e)}",
                    extra={
                        "doc_id": doc_id,
                        "error": str(e),
                        "error_type": type(e).__name__,
                    },
                )
                raise

    def update_document_state(self, doc_id: str, state: DocumentState) -> None:
        """Insert or replace the stored state of a document.

        Args:
            doc_id: The ID of the document to update.
            state: The new state; its ``uri``, ``content_hash`` and
                ``updated_at`` attributes are persisted.

        Raises:
            sqlite3.OperationalError: If the write fails, including a
                database that stays locked for all retry attempts.
        """
        self.logger.debug(
            "Updating document state",
            extra={
                "doc_id": doc_id,
                "uri": state.uri,
                "content_hash": state.content_hash,
                "updated_at": state.updated_at.isoformat(),
            },
        )

        def _write(conn):
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT OR REPLACE INTO document_states (doc_id, uri, content_hash, updated_at)
                VALUES (?, ?, ?, ?)
                """,
                (
                    doc_id,
                    state.uri,
                    state.content_hash,
                    state.updated_at.isoformat(),
                ),
            )
            conn.commit()
            self.logger.debug(
                "Document state updated successfully",
                extra={
                    "doc_id": doc_id,
                    "uri": state.uri,
                    "content_hash": state.content_hash,
                },
            )

        self._run_with_lock_retry(doc_id, "updating", _write)

    def get_document_state(self, doc_id: str) -> DocumentState | None:
        """Fetch the stored state of a document.

        Args:
            doc_id: The ID of the document to look up.

        Returns:
            The stored state, or ``None`` when no row exists for *doc_id*.

        Raises:
            sqlite3.OperationalError: If the read fails, including a
                database that stays locked for all retry attempts.
        """
        self.logger.debug(
            "Getting document state",
            extra={"doc_id": doc_id, "timestamp": datetime.now().isoformat()},
        )

        def _read(conn):
            cursor = conn.cursor()
            cursor.execute(
                "SELECT uri, content_hash, updated_at FROM document_states WHERE doc_id = ?",
                (doc_id,),
            )
            row = cursor.fetchone()

            if not row:
                self.logger.debug(
                    "No state found for document", extra={"doc_id": doc_id}
                )
                return None

            state = DocumentState(
                document_id=doc_id,
                uri=row[0],
                content_hash=row[1],
                # Stored as ISO-8601 text by update_document_state.
                updated_at=datetime.fromisoformat(row[2]),
            )
            self.logger.debug(
                "Document state retrieved",
                extra={
                    "doc_id": doc_id,
                    "uri": state.uri,
                    "content_hash": state.content_hash,
                },
            )
            return state

        return self._run_with_lock_retry(doc_id, "getting", _read)