Coverage for src/qdrant_loader/core/state/document_state_manager.py: 100%
50 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-09 09:14 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-09 09:14 +0000
import sqlite3
import time
from contextlib import closing
from datetime import datetime

from qdrant_loader.core.state.state_change_detector import DocumentState
class DocumentStateManager:
    """Store and retrieve per-document state rows in a SQLite database.

    Each row of the ``document_states`` table holds a document's URI, content
    hash and ISO-formatted update timestamp, keyed by ``doc_id``.

    Attributes:
        max_lock_retries: Number of times an operation is retried after a
            "database is locked" error before that error is re-raised.
    """

    # Bounded on purpose: the retry used to be unbounded self-recursion, which
    # under persistent lock contention ended in RecursionError instead of the
    # underlying sqlite3.OperationalError.
    max_lock_retries = 10

    def __init__(self, logger):
        """Create a manager that reports through ``logger``.

        Args:
            logger: Logger-like object providing ``debug``, ``warning`` and
                ``error`` methods that accept an ``extra`` keyword argument.
        """
        self.logger = logger

    def _get_connection(self):
        """Open and return a new database connection.

        NOTE(review): every call creates a fresh, empty in-memory database, so
        nothing written through one connection is visible to the next and the
        ``document_states`` table is never created here. This is example
        code; confirm the real connection factory.
        """
        # This method should return a connection to the database
        # For the sake of this example, we'll use an in-memory SQLite database
        return sqlite3.connect(":memory:")

    def _run_with_lock_retry(self, doc_id, action, operation):
        """Run ``operation`` on a fresh connection, retrying while locked.

        Args:
            doc_id: Document ID, included in every log record.
            action: Verb inserted into the error messages
                (``"updating"`` or ``"getting"``).
            operation: Callable that receives the open connection; its return
                value is passed back to the caller.

        Returns:
            Whatever ``operation`` returns.

        Raises:
            sqlite3.OperationalError: On a non-lock operational error, or when
                the database is still locked after ``max_lock_retries``
                retries.
            Exception: Any other error raised while connecting or running
                ``operation``; it is logged and re-raised unchanged.
        """
        retry_count = 0
        while True:
            try:
                # A sqlite3 connection used as a context manager only commits
                # or rolls back; closing() is what actually releases it.
                with closing(self._get_connection()) as raw_conn, raw_conn as conn:
                    return operation(conn)
            except sqlite3.OperationalError as e:
                if (
                    "database is locked" in str(e)
                    and retry_count < self.max_lock_retries
                ):
                    self.logger.warning(
                        "Database is locked, retrying in 1 second",
                        extra={
                            "doc_id": doc_id,
                            "error": str(e),
                            "retry_count": retry_count,
                        },
                    )
                    retry_count += 1
                    time.sleep(1)
                    continue
                self.logger.error(
                    f"Error {action} document state: {str(e)}",
                    extra={
                        "doc_id": doc_id,
                        "error": str(e),
                        "error_type": type(e).__name__,
                    },
                )
                raise
            except Exception as e:
                self.logger.error(
                    f"Unexpected error {action} document state: {str(e)}",
                    extra={
                        "doc_id": doc_id,
                        "error": str(e),
                        "error_type": type(e).__name__,
                    },
                )
                raise

    def update_document_state(self, doc_id: str, state: DocumentState) -> None:
        """Update the state of a document.

        Inserts the row for ``doc_id`` or replaces the existing one.

        Args:
            doc_id: The ID of the document to update
            state: The new state to set

        Raises:
            sqlite3.OperationalError: If the write fails, including when the
                database stays locked after all retries.
        """
        self.logger.debug(
            "Updating document state",
            extra={
                "doc_id": doc_id,
                "uri": state.uri,
                "content_hash": state.content_hash,
                "updated_at": state.updated_at.isoformat(),
            },
        )

        def _write(conn):
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT OR REPLACE INTO document_states (doc_id, uri, content_hash, updated_at)
                VALUES (?, ?, ?, ?)
                """,
                (
                    doc_id,
                    state.uri,
                    state.content_hash,
                    state.updated_at.isoformat(),
                ),
            )
            conn.commit()
            self.logger.debug(
                "Document state updated successfully",
                extra={
                    "doc_id": doc_id,
                    "uri": state.uri,
                    "content_hash": state.content_hash,
                },
            )

        self._run_with_lock_retry(doc_id, "updating", _write)

    def get_document_state(self, doc_id: str) -> DocumentState | None:
        """Get the current state of a document.

        Args:
            doc_id: The ID of the document to check

        Returns:
            The current state of the document, or None if not found

        Raises:
            sqlite3.OperationalError: If the query fails, including when the
                database stays locked after all retries.
        """
        self.logger.debug(
            "Getting document state",
            extra={"doc_id": doc_id, "timestamp": datetime.now().isoformat()},
        )

        def _read(conn):
            cursor = conn.cursor()
            cursor.execute(
                "SELECT uri, content_hash, updated_at FROM document_states WHERE doc_id = ?",
                (doc_id,),
            )
            result = cursor.fetchone()

            if not result:
                self.logger.debug(
                    "No state found for document", extra={"doc_id": doc_id}
                )
                return None

            state = DocumentState(
                uri=result[0],
                content_hash=result[1],
                # Stored as ISO text by update_document_state.
                updated_at=datetime.fromisoformat(result[2]),
            )
            self.logger.debug(
                "Document state retrieved",
                extra={
                    "doc_id": doc_id,
                    "uri": state.uri,
                    "content_hash": state.content_hash,
                },
            )
            return state

        return self._run_with_lock_retry(doc_id, "getting", _read)