Coverage for src / qdrant_loader / core / state / state_manager.py: 53%
224 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1"""
2State management service for tracking document ingestion state.
3"""
5from datetime import UTC, datetime
6from typing import TYPE_CHECKING
8from sqlalchemy import func, select
10from qdrant_loader.config.source_config import SourceConfig
11from qdrant_loader.config.state import IngestionStatus, StateManagementConfig
12from qdrant_loader.core.document import Document
13from qdrant_loader.core.state import transitions as _transitions
14from qdrant_loader.core.state.models import DocumentStateRecord, IngestionHistory
15from qdrant_loader.core.state.session import create_tables as _create_tables
16from qdrant_loader.core.state.session import dispose_engine as _dispose_engine
17from qdrant_loader.core.state.session import (
18 initialize_engine_and_session as _init_engine_session,
19)
20from qdrant_loader.core.state.utils import generate_sqlite_aiosqlite_url as _gen_url
21from qdrant_loader.utils.logging import LoggingConfig
23if TYPE_CHECKING:
24 from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker
26logger = LoggingConfig.get_logger(__name__)
class StateManager:
    """Manage persisted state for document ingestion."""

    def __init__(self, config: StateManagementConfig):
        """Store the configuration without touching the database.

        The engine and session factory start out as ``None`` and the manager is
        flagged as uninitialized; ``initialize()`` is what fills them in.
        """
        self.logger = LoggingConfig.get_logger(__name__)
        self.config = config
        # Database resources are created lazily, so they begin empty.
        self._engine: AsyncEngine | None = None
        self._session_factory: async_sessionmaker[AsyncSession] | None = None
        self._initialized = False
40 @property
41 def is_initialized(self) -> bool:
42 """Public accessor for initialization state used by callers/tests."""
43 return self._initialized
45 @property
46 def session_factory(self) -> "async_sessionmaker[AsyncSession]":
47 """Return session factory if initialized, else raise a clear error."""
48 if self._session_factory is None:
49 raise RuntimeError("State manager session factory is not initialized")
50 return self._session_factory
52 async def get_session(self) -> "AsyncSession":
53 """Return an async session context manager, initializing if needed.
55 This method allows callers to use:
56 async with await state_manager.get_session() as session:
57 ...
58 """
59 if not self._initialized:
60 await self.initialize()
61 if self._session_factory is None:
62 raise RuntimeError("State manager session factory is not available")
63 return self._session_factory()
65 async def create_session(self) -> "AsyncSession":
66 """Alias for get_session for backward compatibility."""
67 return await self.get_session()
69 async def __aenter__(self):
70 """Async context manager entry."""
71 self.logger.debug("=== StateManager.__aenter__() called ===")
72 self.logger.debug(f"Current initialization state: {self._initialized}")
74 # Initialize if not already initialized
75 if not self._initialized:
76 self.logger.debug("StateManager not initialized, calling initialize()")
77 await self.initialize()
78 else:
79 self.logger.debug("StateManager already initialized")
81 return self
83 async def __aexit__(self, exc_type, exc_val, _exc_tb):
84 """Async context manager exit."""
85 await self.dispose()
87 async def initialize(self) -> None:
88 """Initialize the database and create tables if they don't exist."""
89 if self._initialized:
90 self.logger.debug("StateManager already initialized, skipping")
91 return
93 try:
94 self.logger.debug("Starting StateManager initialization")
96 # Process database path with enhanced Windows debugging
97 db_path_str = self.config.database_path
98 self.logger.debug(f"Original database path: {db_path_str}")
100 # Handle special databases and generate URL
101 database_url = _gen_url(db_path_str)
102 self.logger.debug(f"Generated database URL: {database_url}")
104 # Create database engine and session factory
105 self.logger.debug("Creating database engine and session factory")
106 self._engine, self._session_factory = _init_engine_session(self.config)
107 self.logger.debug("Engine and session factory created successfully")
109 # Create tables
110 self.logger.debug("Creating database tables")
111 await _create_tables(self._engine)
112 self.logger.debug("Database tables created successfully")
114 self._initialized = True
115 self.logger.debug("StateManager initialization completed successfully")
117 except Exception as e:
118 self.logger.error(f"StateManager initialization failed: {e}", exc_info=True)
119 # Ensure we clean up any partial initialization
120 if hasattr(self, "_engine") and self._engine:
121 try:
122 await _dispose_engine(self._engine)
123 except Exception as cleanup_error:
124 self.logger.error(
125 f"Failed to cleanup engine during error handling: {cleanup_error}"
126 )
127 self._initialized = False
128 raise
130 async def dispose(self):
131 """Clean up resources."""
132 if self._engine:
133 self.logger.debug("Disposing database engine")
134 await _dispose_engine(self._engine)
135 self._engine = None
136 self._session_factory = None
137 self._initialized = False
138 self.logger.debug("StateManager resources disposed")
140 async def update_last_ingestion(
141 self,
142 source_type: str,
143 source: str,
144 status: str = IngestionStatus.SUCCESS,
145 error_message: str | None = None,
146 document_count: int = 0,
147 project_id: str | None = None,
148 ) -> None:
149 """Update and get the last successful ingestion time for a source."""
150 self.logger.debug(
151 f"Updating last ingestion for {source_type}:{source} (project: {project_id})"
152 )
153 try:
154 await _transitions.update_last_ingestion(
155 self._session_factory, # type: ignore[arg-type]
156 source_type=source_type,
157 source=source,
158 status=status,
159 error_message=error_message,
160 document_count=document_count,
161 project_id=project_id,
162 )
163 except Exception as e:
164 self.logger.error(
165 f"Error updating last ingestion for {source_type}:{source}: {str(e)}",
166 exc_info=True,
167 )
168 raise
170 async def get_last_ingestion(
171 self, source_type: str, source: str, project_id: str | None = None
172 ) -> IngestionHistory | None:
173 """Get the last ingestion record for a source."""
174 self.logger.debug(
175 f"Getting last ingestion for {source_type}:{source} (project: {project_id})"
176 )
177 try:
178 return await _transitions.get_last_ingestion(
179 self._session_factory, # type: ignore[arg-type]
180 source_type=source_type,
181 source=source,
182 project_id=project_id,
183 )
184 except Exception as e:
185 self.logger.error(
186 f"Error getting last ingestion for {source_type}:{source}: {str(e)}",
187 exc_info=True,
188 )
189 raise
191 async def get_project_document_count(self, project_id: str) -> int:
192 """Get the count of non-deleted documents for a project.
194 Returns 0 on failure to avoid breaking CLI status output.
195 """
196 try:
197 session_factory = getattr(self, "_session_factory", None)
198 if session_factory is None:
199 ctx = await self.get_session()
200 else:
201 ctx = (
202 session_factory() if callable(session_factory) else session_factory
203 )
204 async with ctx as session: # type: ignore
205 result = await session.execute(
206 select(func.count(DocumentStateRecord.id))
207 .filter_by(project_id=project_id)
208 .filter_by(is_deleted=False)
209 )
210 count = result.scalar() or 0
211 return count
212 except Exception as e: # pragma: no cover - fallback path
213 self.logger.error(
214 f"Error getting project document count for {project_id}: {str(e)}",
215 exc_info=True,
216 )
217 return 0
219 async def get_project_latest_ingestion(self, project_id: str) -> str | None:
220 """Get the latest ingestion timestamp (ISO) for a project.
222 Returns None on failure or when no ingestion exists.
223 """
224 try:
225 session_factory = getattr(self, "_session_factory", None)
226 if session_factory is None:
227 ctx = await self.get_session()
228 else:
229 ctx = (
230 session_factory() if callable(session_factory) else session_factory
231 )
232 async with ctx as session: # type: ignore
233 result = await session.execute(
234 select(IngestionHistory.last_successful_ingestion)
235 .filter_by(project_id=project_id)
236 .order_by(IngestionHistory.last_successful_ingestion.desc())
237 .limit(1)
238 )
239 timestamp = result.scalar_one_or_none()
240 return timestamp.isoformat() if timestamp else None
241 except Exception as e: # pragma: no cover - fallback path
242 self.logger.error(
243 f"Error getting project latest ingestion for {project_id}: {str(e)}",
244 exc_info=True,
245 )
246 return None
248 async def mark_document_deleted(
249 self,
250 source_type: str,
251 source: str,
252 document_id: str,
253 project_id: str | None = None,
254 ) -> None:
255 """Mark a document as deleted."""
256 self.logger.debug(
257 f"Marking document as deleted: {source_type}:{source}:{document_id} (project: {project_id})"
258 )
259 try:
260 await _transitions.mark_document_deleted(
261 self._session_factory, # type: ignore[arg-type]
262 source_type=source_type,
263 source=source,
264 document_id=document_id,
265 project_id=project_id,
266 )
267 except Exception as e:
268 self.logger.error(
269 f"Error marking document as deleted {source_type}:{source}:{document_id}: {str(e)}",
270 exc_info=True,
271 )
272 raise
274 async def mark_documents_deleted_atomic(
275 self,
276 deleted_documents: list["Document"],
277 qdrant_manager,
278 project_id: str | None = None,
279 ) -> list[str]:
280 """Atomically mark documents as deleted in state and delete their points in Qdrant.
282 Transaction ordering:
283 1. Mark state records as is_deleted=True and COMMIT to DB immediately
284 2. Delete points from Qdrant (separate operation, outside DB transaction)
285 3. If Qdrant delete fails, vectors remain orphaned but state is correctly marked
286 (orphan vectors are recoverable by re-running delete; orphan state is silent corruption)
288 Returns the list of document IDs that were marked and deleted.
290 WS-3 DESIGN NOTE: Current design prefers orphan vectors (safe, recoverable) over orphan
291 state (dangerous, silent). If Qdrant fails to delete, the operation should be re-enqueued
292 for retry as an idempotent operation.
293 """
294 if not self._initialized:
295 raise RuntimeError("StateManager not initialized. Call initialize() first.")
297 session_factory = getattr(self, "_session_factory", None)
298 if session_factory is None:
299 raise RuntimeError("State manager session factory is not available")
301 document_ids_to_delete: list[str] = []
303 # STEP 1: Commit state changes to DB first
304 async with session_factory() as session: # type: ignore
305 tx = await session.begin()
306 try:
307 now = datetime.now(UTC)
308 for doc in deleted_documents:
309 query = select(DocumentStateRecord).filter(
310 DocumentStateRecord.source_type == doc.source_type,
311 DocumentStateRecord.source == doc.source,
312 DocumentStateRecord.document_id == doc.id,
313 )
314 if project_id is not None:
315 query = query.filter(
316 DocumentStateRecord.project_id == project_id
317 )
318 result = await session.execute(query)
319 state = result.scalar_one_or_none()
320 if state:
321 state.is_deleted = True # type: ignore
322 state.updated_at = now # type: ignore
323 document_ids_to_delete.append(doc.id)
325 # Commit state changes immediately
326 await tx.commit()
327 self.logger.info(
328 f"Marked {len(document_ids_to_delete)} documents as deleted in state DB"
329 )
330 except Exception as e:
331 # Rollback on any DB error
332 try:
333 await tx.rollback()
334 except Exception as rb_err:
335 self.logger.error(
336 f"Failed to rollback transaction after error: {rb_err}",
337 exc_info=True,
338 )
339 self.logger.error(
340 f"Failed to mark documents deleted in DB: {str(e)}",
341 exc_info=True,
342 )
343 raise
345 # STEP 2: Delete from Qdrant (separate operation, after state is committed)
346 # If this fails, vectors remain but state is correctly marked as deleted
347 if document_ids_to_delete:
348 try:
349 await qdrant_manager.delete_points_by_document_id(
350 document_ids_to_delete
351 )
352 self.logger.info(
353 f"Deleted {len(document_ids_to_delete)} documents' points from Qdrant"
354 )
355 except Exception as e:
356 # Qdrant delete failed, but state is already committed
357 # Log the failure; the operation should be re-enqueued for retry (idempotent)
358 self.logger.error(
359 f"Failed to delete points from Qdrant (state still marked as deleted): {str(e)}",
360 exc_info=True,
361 )
362 # Re-raise to signal caller that cleanup should be retried
363 raise
365 return document_ids_to_delete
367 async def get_document_state_record(
368 self,
369 source_type: str,
370 source: str,
371 document_id: str,
372 project_id: str | None = None,
373 ) -> DocumentStateRecord | None:
374 """Get the state of a document."""
375 self.logger.debug(
376 f"Getting document state for {source_type}:{source}:{document_id} (project: {project_id})"
377 )
378 try:
379 return await _transitions.get_document_state_record(
380 self._session_factory, # type: ignore[arg-type]
381 source_type=source_type,
382 source=source,
383 document_id=document_id,
384 project_id=project_id,
385 )
386 except Exception as e:
387 self.logger.error(
388 f"Error getting document state for {source_type}:{source}:{document_id}: {str(e)}",
389 exc_info=True,
390 )
391 raise
393 async def get_document_state_records_by_ids(
394 self,
395 source_type: str,
396 source: str,
397 document_ids: list[str],
398 project_id: str | None = None,
399 ) -> list[DocumentStateRecord]:
400 """Get multiple document state records for a given source in a single query."""
401 self.logger.debug(
402 f"Getting document state records for {source_type}:{source} (batch of {len(document_ids)})"
403 )
404 try:
405 return await _transitions.get_document_state_records_by_ids(
406 self._session_factory, # type: ignore[arg-type]
407 source_type=source_type,
408 source=source,
409 document_ids=document_ids,
410 project_id=project_id,
411 )
412 except Exception as e:
413 self.logger.error(
414 f"Error getting document state records by ids for {source_type}:{source}: {str(e)}",
415 exc_info=True,
416 )
417 raise
419 async def get_document_state_records(
420 self, source_config: SourceConfig, since: datetime | None = None
421 ) -> list[DocumentStateRecord]:
422 """Get all document states for a source, optionally filtered by date."""
423 self.logger.debug(
424 f"Getting document state records for {source_config.source_type}:{source_config.source}"
425 )
426 try:
427 return await _transitions.get_document_state_records(
428 self._session_factory, # type: ignore[arg-type]
429 source_type=source_config.source_type,
430 source=source_config.source,
431 since=since,
432 )
433 except Exception as e:
434 self.logger.error(
435 f"Error getting document state records for {source_config.source_type}:{source_config.source}: {str(e)}",
436 exc_info=True,
437 )
438 raise
440 async def update_document_state(
441 self, document: Document, project_id: str | None = None
442 ) -> DocumentStateRecord:
443 """Update the state of a document."""
444 if not self._initialized:
445 raise RuntimeError("StateManager not initialized. Call initialize() first.")
447 self.logger.debug(
448 f"Updating document state for {document.source_type}:{document.source}:{document.id} (project: {project_id})"
449 )
450 try:
451 return await _transitions.update_document_state(
452 self._session_factory, # type: ignore[arg-type]
453 document=document,
454 project_id=project_id,
455 )
456 except Exception as e:
457 self.logger.error(
458 "Failed to update document state",
459 extra={
460 "project_id": project_id,
461 "document_id": document.id,
462 "error": str(e),
463 "error_type": type(e).__name__,
464 },
465 )
466 raise
468 async def update_conversion_metrics(
469 self,
470 source_type: str,
471 source: str,
472 converted_files_count: int = 0,
473 conversion_failures_count: int = 0,
474 attachments_processed_count: int = 0,
475 total_conversion_time: float = 0.0,
476 ) -> None:
477 """Update file conversion metrics for a source."""
478 self.logger.debug(f"Updating conversion metrics for {source_type}:{source}")
479 try:
480 await _transitions.update_conversion_metrics(
481 self._session_factory, # type: ignore[arg-type]
482 source_type=source_type,
483 source=source,
484 converted_files_count=converted_files_count,
485 conversion_failures_count=conversion_failures_count,
486 attachments_processed_count=attachments_processed_count,
487 total_conversion_time=total_conversion_time,
488 )
489 except Exception as e:
490 self.logger.error(
491 f"Error updating conversion metrics for {source_type}:{source}: {str(e)}",
492 exc_info=True,
493 )
494 raise
496 async def get_conversion_metrics(
497 self, source_type: str, source: str
498 ) -> dict[str, int | float]:
499 """Get file conversion metrics for a source."""
500 self.logger.debug(f"Getting conversion metrics for {source_type}:{source}")
501 try:
502 return await _transitions.get_conversion_metrics(
503 self._session_factory, # type: ignore[arg-type]
504 source_type=source_type,
505 source=source,
506 )
507 except Exception as e:
508 self.logger.error(
509 f"Error getting conversion metrics for {source_type}:{source}: {str(e)}",
510 exc_info=True,
511 )
512 raise
514 async def get_attachment_documents(
515 self, parent_document_id: str
516 ) -> list[DocumentStateRecord]:
517 """Get all attachment documents for a parent document."""
518 self.logger.debug(
519 f"Getting attachment documents for parent {parent_document_id}"
520 )
521 try:
522 return await _transitions.get_attachment_documents(
523 self._session_factory, # type: ignore[arg-type]
524 parent_document_id=parent_document_id,
525 )
526 except Exception as e:
527 self.logger.error(
528 f"Error getting attachment documents for {parent_document_id}: {str(e)}",
529 exc_info=True,
530 )
531 raise
533 async def get_converted_documents(
534 self, source_type: str, source: str, conversion_method: str | None = None
535 ) -> list[DocumentStateRecord]:
536 """Get all converted documents for a source, optionally filtered by conversion method."""
537 self.logger.debug(f"Getting converted documents for {source_type}:{source}")
538 try:
539 return await _transitions.get_converted_documents(
540 self._session_factory, # type: ignore[arg-type]
541 source_type=source_type,
542 source=source,
543 conversion_method=conversion_method,
544 )
545 except Exception as e:
546 self.logger.error(
547 f"Error getting converted documents for {source_type}:{source}: {str(e)}",
548 exc_info=True,
549 )
550 raise
552 async def close(self):
553 """Close all database connections."""
554 if hasattr(self, "_engine") and self._engine is not None:
555 self.logger.debug("Closing database connections")
556 await _dispose_engine(self._engine)
557 self.logger.debug("Database connections closed")