Coverage for src/qdrant_loader/core/state/state_manager.py: 64%
174 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1"""
2State management service for tracking document ingestion state.
3"""
5from datetime import datetime
6from typing import TYPE_CHECKING
8from sqlalchemy import func, select
10from qdrant_loader.config.source_config import SourceConfig
11from qdrant_loader.config.state import IngestionStatus, StateManagementConfig
12from qdrant_loader.core.document import Document
13from qdrant_loader.core.state import transitions as _transitions
14from qdrant_loader.core.state.models import DocumentStateRecord, IngestionHistory
15from qdrant_loader.core.state.session import create_tables as _create_tables
16from qdrant_loader.core.state.session import dispose_engine as _dispose_engine
17from qdrant_loader.core.state.session import (
18 initialize_engine_and_session as _init_engine_session,
19)
20from qdrant_loader.core.state.utils import generate_sqlite_aiosqlite_url as _gen_url
21from qdrant_loader.utils.logging import LoggingConfig
23if TYPE_CHECKING:
24 from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker
26logger = LoggingConfig.get_logger(__name__)
class StateManager:
    """Manages state for document ingestion.

    Wraps an async SQLAlchemy engine/session factory and delegates most
    state transitions to :mod:`qdrant_loader.core.state.transitions`.
    Usable as an async context manager; initialization is lazy.
    """

    def __init__(self, config: StateManagementConfig):
        """Initialize the state manager with configuration.

        Args:
            config: State management settings (database path, etc.).
        """
        self.config = config
        self._initialized = False
        self._engine: AsyncEngine | None = None
        self._session_factory: async_sessionmaker[AsyncSession] | None = None
        self.logger = LoggingConfig.get_logger(__name__)

    @property
    def is_initialized(self) -> bool:
        """Public accessor for initialization state used by callers/tests."""
        return self._initialized

    async def get_session(self) -> "AsyncSession":
        """Return an async session context manager, initializing if needed.

        This method allows callers to use:
            async with await state_manager.get_session() as session:
                ...

        Raises:
            RuntimeError: If the session factory is unavailable after init.
        """
        if not self._initialized:
            await self.initialize()
        if self._session_factory is None:
            raise RuntimeError("State manager session factory is not available")
        return self._session_factory()

    async def create_session(self) -> "AsyncSession":
        """Alias for get_session for backward compatibility."""
        return await self.get_session()

    async def __aenter__(self):
        """Async context manager entry; initializes lazily."""
        self.logger.debug("=== StateManager.__aenter__() called ===")
        self.logger.debug(f"Current initialization state: {self._initialized}")

        # Initialize if not already initialized
        if not self._initialized:
            self.logger.debug("StateManager not initialized, calling initialize()")
            await self.initialize()
        else:
            self.logger.debug("StateManager already initialized")

        return self

    async def __aexit__(self, exc_type, exc_val, _exc_tb):
        """Async context manager exit; releases engine resources."""
        await self.dispose()

    async def initialize(self) -> None:
        """Initialize the database and create tables if they don't exist.

        Idempotent: repeated calls return immediately. On failure, any
        partially created engine is disposed and internal references are
        cleared so a later retry starts from a clean slate.
        """
        if self._initialized:
            self.logger.debug("StateManager already initialized, skipping")
            return

        try:
            self.logger.debug("Starting StateManager initialization")

            # Process database path with enhanced Windows debugging
            db_path_str = self.config.database_path
            self.logger.debug(f"Original database path: {db_path_str}")

            # Handle special databases and generate URL
            database_url = _gen_url(db_path_str)
            self.logger.debug(f"Generated database URL: {database_url}")

            # Create database engine and session factory
            self.logger.debug("Creating database engine and session factory")
            self._engine, self._session_factory = _init_engine_session(self.config)
            self.logger.debug("Engine and session factory created successfully")

            # Create tables
            self.logger.debug("Creating database tables")
            await _create_tables(self._engine)
            self.logger.debug("Database tables created successfully")

            self._initialized = True
            self.logger.debug("StateManager initialization completed successfully")

        except Exception as e:
            self.logger.error(f"StateManager initialization failed: {e}", exc_info=True)
            # Ensure we clean up any partial initialization
            if self._engine is not None:
                try:
                    await _dispose_engine(self._engine)
                except Exception as cleanup_error:
                    self.logger.error(
                        f"Failed to cleanup engine during error handling: {cleanup_error}"
                    )
            # FIX: previously only _initialized was reset here; stale
            # engine/session-factory references could be reused after a
            # failed initialization. Clear them so retries start clean.
            self._engine = None
            self._session_factory = None
            self._initialized = False
            raise

    async def dispose(self):
        """Clean up resources and reset initialization state."""
        if self._engine:
            self.logger.debug("Disposing database engine")
            await _dispose_engine(self._engine)
            self._engine = None
            self._session_factory = None
            self._initialized = False
            self.logger.debug("StateManager resources disposed")

    async def _acquire_session_ctx(self) -> "AsyncSession":
        """Return a session context for read queries.

        Prefers the existing session factory; falls back to get_session(),
        which initializes the manager on demand. Shared by the project
        read helpers below (logic was previously duplicated in both).
        """
        session_factory = getattr(self, "_session_factory", None)
        if session_factory is None:
            return await self.get_session()
        return session_factory() if callable(session_factory) else session_factory

    async def update_last_ingestion(
        self,
        source_type: str,
        source: str,
        status: str = IngestionStatus.SUCCESS,
        error_message: str | None = None,
        document_count: int = 0,
        project_id: str | None = None,
    ) -> None:
        """Update and get the last successful ingestion time for a source.

        Args:
            source_type: Connector/source type identifier.
            source: Source name or URL.
            status: Ingestion status to record (defaults to SUCCESS).
            error_message: Optional failure detail.
            document_count: Number of documents processed.
            project_id: Optional owning project.

        Raises:
            Exception: Re-raised from the transition layer after logging.
        """
        self.logger.debug(
            f"Updating last ingestion for {source_type}:{source} (project: {project_id})"
        )
        try:
            await _transitions.update_last_ingestion(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
                status=status,
                error_message=error_message,
                document_count=document_count,
                project_id=project_id,
            )
        except Exception as e:
            self.logger.error(
                f"Error updating last ingestion for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_last_ingestion(
        self, source_type: str, source: str, project_id: str | None = None
    ) -> IngestionHistory | None:
        """Get the last ingestion record for a source, or None if absent.

        Raises:
            Exception: Re-raised from the transition layer after logging.
        """
        self.logger.debug(
            f"Getting last ingestion for {source_type}:{source} (project: {project_id})"
        )
        try:
            return await _transitions.get_last_ingestion(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
                project_id=project_id,
            )
        except Exception as e:
            self.logger.error(
                f"Error getting last ingestion for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_project_document_count(self, project_id: str) -> int:
        """Get the count of non-deleted documents for a project.

        Returns 0 on failure to avoid breaking CLI status output.
        """
        try:
            async with await self._acquire_session_ctx() as session:  # type: ignore
                result = await session.execute(
                    select(func.count(DocumentStateRecord.id))
                    .filter_by(project_id=project_id)
                    .filter_by(is_deleted=False)
                )
                # scalar() may be None when no rows match; coerce to 0.
                return result.scalar() or 0
        except Exception as e:  # pragma: no cover - fallback path
            self.logger.error(
                f"Error getting project document count for {project_id}: {str(e)}",
                exc_info=True,
            )
            return 0

    async def get_project_latest_ingestion(self, project_id: str) -> str | None:
        """Get the latest ingestion timestamp (ISO) for a project.

        Returns None on failure or when no ingestion exists.
        """
        try:
            async with await self._acquire_session_ctx() as session:  # type: ignore
                result = await session.execute(
                    select(IngestionHistory.last_successful_ingestion)
                    .filter_by(project_id=project_id)
                    .order_by(IngestionHistory.last_successful_ingestion.desc())
                    .limit(1)
                )
                timestamp = result.scalar_one_or_none()
                return timestamp.isoformat() if timestamp else None
        except Exception as e:  # pragma: no cover - fallback path
            self.logger.error(
                f"Error getting project latest ingestion for {project_id}: {str(e)}",
                exc_info=True,
            )
            return None

    async def mark_document_deleted(
        self,
        source_type: str,
        source: str,
        document_id: str,
        project_id: str | None = None,
    ) -> None:
        """Mark a document as deleted.

        Raises:
            Exception: Re-raised from the transition layer after logging.
        """
        self.logger.debug(
            f"Marking document as deleted: {source_type}:{source}:{document_id} (project: {project_id})"
        )
        try:
            await _transitions.mark_document_deleted(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
                document_id=document_id,
                project_id=project_id,
            )
        except Exception as e:
            self.logger.error(
                f"Error marking document as deleted {source_type}:{source}:{document_id}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_document_state_record(
        self,
        source_type: str,
        source: str,
        document_id: str,
        project_id: str | None = None,
    ) -> DocumentStateRecord | None:
        """Get the state of a document, or None if unknown.

        Raises:
            Exception: Re-raised from the transition layer after logging.
        """
        self.logger.debug(
            f"Getting document state for {source_type}:{source}:{document_id} (project: {project_id})"
        )
        try:
            return await _transitions.get_document_state_record(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
                document_id=document_id,
                project_id=project_id,
            )
        except Exception as e:
            self.logger.error(
                f"Error getting document state for {source_type}:{source}:{document_id}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_document_state_records(
        self, source_config: SourceConfig, since: datetime | None = None
    ) -> list[DocumentStateRecord]:
        """Get all document states for a source, optionally filtered by date.

        Raises:
            Exception: Re-raised from the transition layer after logging.
        """
        self.logger.debug(
            f"Getting document state records for {source_config.source_type}:{source_config.source}"
        )
        try:
            return await _transitions.get_document_state_records(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_config.source_type,
                source=source_config.source,
                since=since,
            )
        except Exception as e:
            self.logger.error(
                f"Error getting document state records for {source_config.source_type}:{source_config.source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def update_document_state(
        self, document: Document, project_id: str | None = None
    ) -> DocumentStateRecord:
        """Update the state of a document.

        Raises:
            RuntimeError: If the manager was never initialized.
            Exception: Re-raised from the transition layer after logging.
        """
        if not self._initialized:
            raise RuntimeError("StateManager not initialized. Call initialize() first.")

        self.logger.debug(
            f"Updating document state for {document.source_type}:{document.source}:{document.id} (project: {project_id})"
        )
        try:
            return await _transitions.update_document_state(
                self._session_factory,  # type: ignore[arg-type]
                document=document,
                project_id=project_id,
            )
        except Exception as e:
            self.logger.error(
                "Failed to update document state",
                extra={
                    "project_id": project_id,
                    "document_id": document.id,
                    "error": str(e),
                    "error_type": type(e).__name__,
                },
            )
            raise

    async def update_conversion_metrics(
        self,
        source_type: str,
        source: str,
        converted_files_count: int = 0,
        conversion_failures_count: int = 0,
        attachments_processed_count: int = 0,
        total_conversion_time: float = 0.0,
    ) -> None:
        """Update file conversion metrics for a source.

        Raises:
            Exception: Re-raised from the transition layer after logging.
        """
        self.logger.debug(f"Updating conversion metrics for {source_type}:{source}")
        try:
            await _transitions.update_conversion_metrics(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
                converted_files_count=converted_files_count,
                conversion_failures_count=conversion_failures_count,
                attachments_processed_count=attachments_processed_count,
                total_conversion_time=total_conversion_time,
            )
        except Exception as e:
            self.logger.error(
                f"Error updating conversion metrics for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_conversion_metrics(
        self, source_type: str, source: str
    ) -> dict[str, int | float]:
        """Get file conversion metrics for a source.

        Raises:
            Exception: Re-raised from the transition layer after logging.
        """
        self.logger.debug(f"Getting conversion metrics for {source_type}:{source}")
        try:
            return await _transitions.get_conversion_metrics(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
            )
        except Exception as e:
            self.logger.error(
                f"Error getting conversion metrics for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_attachment_documents(
        self, parent_document_id: str
    ) -> list[DocumentStateRecord]:
        """Get all attachment documents for a parent document.

        Raises:
            Exception: Re-raised from the transition layer after logging.
        """
        self.logger.debug(
            f"Getting attachment documents for parent {parent_document_id}"
        )
        try:
            return await _transitions.get_attachment_documents(
                self._session_factory,  # type: ignore[arg-type]
                parent_document_id=parent_document_id,
            )
        except Exception as e:
            self.logger.error(
                f"Error getting attachment documents for {parent_document_id}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_converted_documents(
        self, source_type: str, source: str, conversion_method: str | None = None
    ) -> list[DocumentStateRecord]:
        """Get all converted documents for a source, optionally filtered by conversion method.

        Raises:
            Exception: Re-raised from the transition layer after logging.
        """
        self.logger.debug(f"Getting converted documents for {source_type}:{source}")
        try:
            return await _transitions.get_converted_documents(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
                conversion_method=conversion_method,
            )
        except Exception as e:
            self.logger.error(
                f"Error getting converted documents for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def close(self):
        """Close all database connections.

        FIX: previously this disposed the engine but left ``_engine``,
        ``_session_factory`` and ``_initialized`` stale, so a subsequent
        get_session() would hand out sessions bound to a disposed engine.
        Now delegates to dispose() so state is reset consistently.
        """
        if getattr(self, "_engine", None) is not None:
            self.logger.debug("Closing database connections")
            await self.dispose()
            self.logger.debug("Database connections closed")