Coverage for src/qdrant_loader/core/state/document_state_manager.py: 100%
50 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1import sqlite3
2import time
3from datetime import datetime
5from qdrant_loader.core.state.state_change_detector import DocumentState
class DocumentStateManager:
    """Read and write per-document state rows in an SQLite ``document_states`` table.

    Each row holds ``doc_id``, ``uri``, ``content_hash`` and ``updated_at``
    (stored as an ISO-8601 string via ``isoformat()`` and parsed back with
    ``datetime.fromisoformat``). The table is not created here; it must
    already exist in the database returned by :meth:`_get_connection`.

    ``database is locked`` errors are retried a bounded number of times;
    every other error is logged and re-raised.
    """

    # Default database location (class-level so instances created without
    # __init__ still have it). NOTE: with ":memory:" every connection opens a
    # brand-new, empty database, so nothing persists between calls.
    database_path: str = ":memory:"

    # Bounded retry policy for "database is locked". Retrying by unbounded
    # recursion would, for a lock that never clears, sleep ~1000 times and
    # then fail with RecursionError instead of surfacing the real error.
    _MAX_LOCK_RETRIES = 3
    # Keep in sync with the "retrying in 1 second" log message below.
    _LOCK_RETRY_DELAY_SECONDS = 1

    def __init__(self, logger, database_path: str = ":memory:"):
        """Create a manager.

        Args:
            logger: Logger for debug/warning/error messages; it must accept an
                ``extra`` keyword argument (as ``logging.Logger`` does).
            database_path: SQLite database to connect to. Defaults to
                ``":memory:"``, the previously hard-coded value.
        """
        self.logger = logger
        self.database_path = database_path

    def _get_connection(self) -> sqlite3.Connection:
        """Open a new SQLite connection to ``self.database_path``.

        A fresh connection is returned on every call; the caller must close it.
        """
        return sqlite3.connect(self.database_path)

    def _run_with_lock_retry(self, operation, doc_id: str, action: str):
        """Run ``operation()``, retrying while SQLite reports a locked database.

        Args:
            operation: Zero-argument callable that performs the database work.
            doc_id: Document ID, used only as log context.
            action: Verb used in error messages (``"updating"`` / ``"getting"``).

        Returns:
            Whatever ``operation()`` returns.

        Raises:
            sqlite3.OperationalError: If the error is not a lock error, or the
                database is still locked after ``_MAX_LOCK_RETRIES`` retries.
            Exception: Any other exception from ``operation()``, re-raised
                after being logged.
        """
        retry_count = 0
        while True:
            try:
                return operation()
            except sqlite3.OperationalError as e:
                if (
                    "database is locked" in str(e)
                    and retry_count < self._MAX_LOCK_RETRIES
                ):
                    self.logger.warning(
                        "Database is locked, retrying in 1 second",
                        extra={
                            "doc_id": doc_id,
                            "error": str(e),
                            "retry_count": retry_count,
                        },
                    )
                    time.sleep(self._LOCK_RETRY_DELAY_SECONDS)
                    retry_count += 1
                    continue
                # Non-lock operational error, or retries exhausted.
                self.logger.error(
                    f"Error {action} document state: {e}",
                    extra={
                        "doc_id": doc_id,
                        "error": str(e),
                        "error_type": type(e).__name__,
                    },
                )
                raise
            except Exception as e:
                self.logger.error(
                    f"Unexpected error {action} document state: {e}",
                    extra={
                        "doc_id": doc_id,
                        "error": str(e),
                        "error_type": type(e).__name__,
                    },
                )
                raise

    def update_document_state(self, doc_id: str, state: DocumentState) -> None:
        """Insert or replace the stored state of a document.

        Args:
            doc_id: The ID of the document to update.
            state: The new state; its ``uri``, ``content_hash`` and
                ``updated_at`` (serialised with ``isoformat()``) are stored.

        Raises:
            sqlite3.OperationalError: On a non-lock database error, or when the
                database stays locked after all retries.
            Exception: Any other error raised while writing, after logging.
        """
        self.logger.debug(
            "Updating document state",
            extra={
                "doc_id": doc_id,
                "uri": state.uri,
                "content_hash": state.content_hash,
                "updated_at": state.updated_at.isoformat(),
            },
        )

        def write() -> None:
            connection = self._get_connection()
            try:
                # The connection context manager only commits/rolls back the
                # transaction; it does not close the connection, hence the
                # explicit close() in ``finally``.
                with connection as conn:
                    cursor = conn.cursor()
                    cursor.execute(
                        """
                        INSERT OR REPLACE INTO document_states (doc_id, uri, content_hash, updated_at)
                        VALUES (?, ?, ?, ?)
                        """,
                        (
                            doc_id,
                            state.uri,
                            state.content_hash,
                            state.updated_at.isoformat(),
                        ),
                    )
                    conn.commit()
            finally:
                connection.close()
            self.logger.debug(
                "Document state updated successfully",
                extra={
                    "doc_id": doc_id,
                    "uri": state.uri,
                    "content_hash": state.content_hash,
                },
            )

        self._run_with_lock_retry(write, doc_id, "updating")

    def get_document_state(self, doc_id: str) -> DocumentState | None:
        """Get the current state of a document.

        Args:
            doc_id: The ID of the document to check.

        Returns:
            The stored state of the document, or None if no row exists.

        Raises:
            sqlite3.OperationalError: On a non-lock database error, or when the
                database stays locked after all retries.
            Exception: Any other error (e.g. an unparsable ``updated_at``),
                after logging.
        """
        self.logger.debug(
            "Getting document state",
            extra={"doc_id": doc_id, "timestamp": datetime.now().isoformat()},
        )

        def read() -> DocumentState | None:
            connection = self._get_connection()
            try:
                # Close explicitly: the context manager does not close it.
                with connection as conn:
                    cursor = conn.cursor()
                    cursor.execute(
                        "SELECT uri, content_hash, updated_at FROM document_states WHERE doc_id = ?",
                        (doc_id,),
                    )
                    row = cursor.fetchone()
            finally:
                connection.close()

            if not row:
                self.logger.debug(
                    "No state found for document", extra={"doc_id": doc_id}
                )
                return None

            uri, content_hash, updated_at = row[0], row[1], row[2]
            state = DocumentState(
                document_id=doc_id,
                uri=uri,
                content_hash=content_hash,
                updated_at=datetime.fromisoformat(updated_at),
            )
            self.logger.debug(
                "Document state retrieved",
                extra={
                    "doc_id": doc_id,
                    "uri": state.uri,
                    "content_hash": state.content_hash,
                },
            )
            return state

        return self._run_with_lock_retry(read, doc_id, "getting")