Coverage for src / qdrant_loader / core / state / state_manager.py: 53%

224 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1""" 

2State management service for tracking document ingestion state. 

3""" 

4 

5from datetime import UTC, datetime 

6from typing import TYPE_CHECKING 

7 

8from sqlalchemy import func, select 

9 

10from qdrant_loader.config.source_config import SourceConfig 

11from qdrant_loader.config.state import IngestionStatus, StateManagementConfig 

12from qdrant_loader.core.document import Document 

13from qdrant_loader.core.state import transitions as _transitions 

14from qdrant_loader.core.state.models import DocumentStateRecord, IngestionHistory 

15from qdrant_loader.core.state.session import create_tables as _create_tables 

16from qdrant_loader.core.state.session import dispose_engine as _dispose_engine 

17from qdrant_loader.core.state.session import ( 

18 initialize_engine_and_session as _init_engine_session, 

19) 

20from qdrant_loader.core.state.utils import generate_sqlite_aiosqlite_url as _gen_url 

21from qdrant_loader.utils.logging import LoggingConfig 

22 

23if TYPE_CHECKING: 

24 from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker 

25 

# Module-level logger for this module. StateManager below does not use it:
# each instance obtains its own logger in __init__.
# NOTE(review): possibly imported by other modules — verify before removing.
logger = LoggingConfig.get_logger(__name__)

27 

28 

class StateManager:
    """Manages state for document ingestion.

    Owns an async SQLAlchemy engine and session factory, which are created by
    ``initialize()``. Exposes helpers that read and write ingestion history and
    per-document state records. Most helpers delegate to
    ``qdrant_loader.core.state.transitions``.

    The delegating helpers pass ``self._session_factory`` through unchanged and
    do not initialize the manager themselves. ``get_session()``,
    ``get_project_document_count()`` and ``get_project_latest_ingestion()``
    initialize on demand.

    The class is also an async context manager: ``__aenter__`` initializes the
    manager and ``__aexit__`` disposes its engine.
    """

    def __init__(self, config: StateManagementConfig):
        """Initialize the state manager with configuration.

        No database work happens here. Call ``initialize()`` (or enter the
        manager as an async context manager) before using it.

        Args:
            config: State management configuration; ``database_path`` is read
                during ``initialize()``.
        """
        self.config = config
        self._initialized = False
        self._engine: AsyncEngine | None = None
        self._session_factory: async_sessionmaker[AsyncSession] | None = None
        self.logger = LoggingConfig.get_logger(__name__)

    @property
    def is_initialized(self) -> bool:
        """Public accessor for initialization state used by callers/tests."""
        return self._initialized

    @property
    def session_factory(self) -> "async_sessionmaker[AsyncSession]":
        """Return the session factory if initialized.

        Raises:
            RuntimeError: If no session factory has been created yet.
        """
        if self._session_factory is None:
            raise RuntimeError("State manager session factory is not initialized")
        return self._session_factory

    async def get_session(self) -> "AsyncSession":
        """Return a new async session, initializing the manager if needed.

        The returned session is an async context manager, so callers can use::

            async with await state_manager.get_session() as session:
                ...

        Raises:
            RuntimeError: If no session factory is available after initialization.
        """
        if not self._initialized:
            await self.initialize()
        if self._session_factory is None:
            raise RuntimeError("State manager session factory is not available")
        return self._session_factory()

    async def create_session(self) -> "AsyncSession":
        """Alias for get_session for backward compatibility."""
        return await self.get_session()

    async def __aenter__(self):
        """Async context manager entry: initialize if needed and return self."""
        self.logger.debug("=== StateManager.__aenter__() called ===")
        self.logger.debug(f"Current initialization state: {self._initialized}")

        # Initialize if not already initialized
        if not self._initialized:
            self.logger.debug("StateManager not initialized, calling initialize()")
            await self.initialize()
        else:
            self.logger.debug("StateManager already initialized")

        return self

    async def __aexit__(self, exc_type, exc_val, _exc_tb):
        """Async context manager exit: dispose resources.

        Returns None, so any in-flight exception is not suppressed.
        """
        await self.dispose()

    async def initialize(self) -> None:
        """Create the engine and session factory, and create tables if missing.

        This is a no-op if the manager is already initialized. On failure, any
        partially created engine is disposed, the engine and session factory
        are cleared, and the original exception is re-raised.
        """
        if self._initialized:
            self.logger.debug("StateManager already initialized, skipping")
            return

        try:
            self.logger.debug("Starting StateManager initialization")

            # Process database path with enhanced Windows debugging
            db_path_str = self.config.database_path
            self.logger.debug(f"Original database path: {db_path_str}")

            # Handle special databases and generate URL. The URL is only logged
            # here; the engine below is built from self.config.
            database_url = _gen_url(db_path_str)
            self.logger.debug(f"Generated database URL: {database_url}")

            # Create database engine and session factory
            self.logger.debug("Creating database engine and session factory")
            self._engine, self._session_factory = _init_engine_session(self.config)
            self.logger.debug("Engine and session factory created successfully")

            # Create tables
            self.logger.debug("Creating database tables")
            await _create_tables(self._engine)
            self.logger.debug("Database tables created successfully")

            self._initialized = True
            self.logger.debug("StateManager initialization completed successfully")

        except Exception as e:
            self.logger.error(f"StateManager initialization failed: {e}", exc_info=True)
            # Clean up any partial initialization. Clear the references as well,
            # so no disposed engine or stale session factory is left for later
            # callers (e.g. the session_factory fallbacks below) to pick up.
            engine = getattr(self, "_engine", None)
            if engine:
                try:
                    await _dispose_engine(engine)
                except Exception as cleanup_error:
                    self.logger.error(
                        f"Failed to cleanup engine during error handling: {cleanup_error}"
                    )
            self._engine = None
            self._session_factory = None
            self._initialized = False
            raise

    async def dispose(self):
        """Dispose the engine (if any) and reset to the uninitialized state.

        This is a no-op when there is no engine.
        """
        if self._engine:
            self.logger.debug("Disposing database engine")
            await _dispose_engine(self._engine)
            self._engine = None
            self._session_factory = None
            self._initialized = False
            self.logger.debug("StateManager resources disposed")

    async def update_last_ingestion(
        self,
        source_type: str,
        source: str,
        status: str = IngestionStatus.SUCCESS,
        error_message: str | None = None,
        document_count: int = 0,
        project_id: str | None = None,
    ) -> None:
        """Record the outcome of an ingestion run for a source.

        Delegates to ``transitions.update_last_ingestion``. This method only
        writes; use ``get_last_ingestion()`` to read the record back.

        Args:
            source_type: Source type identifier.
            source: Source identifier.
            status: Status to record (defaults to ``IngestionStatus.SUCCESS``).
            error_message: Optional error message to store.
            document_count: Document count to store.
            project_id: Optional project scope.

        Raises:
            Exception: Any error from the transition is logged and re-raised.
        """
        self.logger.debug(
            f"Updating last ingestion for {source_type}:{source} (project: {project_id})"
        )
        try:
            await _transitions.update_last_ingestion(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
                status=status,
                error_message=error_message,
                document_count=document_count,
                project_id=project_id,
            )
        except Exception as e:
            self.logger.error(
                f"Error updating last ingestion for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_last_ingestion(
        self, source_type: str, source: str, project_id: str | None = None
    ) -> IngestionHistory | None:
        """Return the last ingestion record for a source, or None.

        Delegates to ``transitions.get_last_ingestion``. Errors are logged and
        re-raised.
        """
        self.logger.debug(
            f"Getting last ingestion for {source_type}:{source} (project: {project_id})"
        )
        try:
            return await _transitions.get_last_ingestion(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
                project_id=project_id,
            )
        except Exception as e:
            self.logger.error(
                f"Error getting last ingestion for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_project_document_count(self, project_id: str) -> int:
        """Get the count of non-deleted documents for a project.

        Initializes the manager (via ``get_session()``) if no session factory
        exists yet. Returns 0 on failure to avoid breaking CLI status output.
        """
        try:
            session_factory = getattr(self, "_session_factory", None)
            if session_factory is None:
                ctx = await self.get_session()
            else:
                # Accept either a factory or an already-built session context.
                # NOTE(review): presumably the non-callable case exists for test
                # doubles — confirm.
                ctx = (
                    session_factory() if callable(session_factory) else session_factory
                )
            async with ctx as session:  # type: ignore
                result = await session.execute(
                    select(func.count(DocumentStateRecord.id))
                    .filter_by(project_id=project_id)
                    .filter_by(is_deleted=False)
                )
                count = result.scalar() or 0
                return count
        except Exception as e:  # pragma: no cover - fallback path
            self.logger.error(
                f"Error getting project document count for {project_id}: {str(e)}",
                exc_info=True,
            )
            return 0

    async def get_project_latest_ingestion(self, project_id: str) -> str | None:
        """Get the latest ingestion timestamp (ISO) for a project.

        Only rows with a non-NULL ``last_successful_ingestion`` are considered.
        Returns None on failure or when no ingestion exists.
        """
        try:
            session_factory = getattr(self, "_session_factory", None)
            if session_factory is None:
                ctx = await self.get_session()
            else:
                ctx = (
                    session_factory() if callable(session_factory) else session_factory
                )
            async with ctx as session:  # type: ignore
                result = await session.execute(
                    select(IngestionHistory.last_successful_ingestion)
                    .filter_by(project_id=project_id)
                    # Exclude NULLs explicitly: some backends (e.g. PostgreSQL)
                    # sort NULLs first under DESC, which would hide a real
                    # timestamp behind a NULL row.
                    .filter(IngestionHistory.last_successful_ingestion.is_not(None))
                    .order_by(IngestionHistory.last_successful_ingestion.desc())
                    .limit(1)
                )
                timestamp = result.scalar_one_or_none()
                return timestamp.isoformat() if timestamp else None
        except Exception as e:  # pragma: no cover - fallback path
            self.logger.error(
                f"Error getting project latest ingestion for {project_id}: {str(e)}",
                exc_info=True,
            )
            return None

    async def mark_document_deleted(
        self,
        source_type: str,
        source: str,
        document_id: str,
        project_id: str | None = None,
    ) -> None:
        """Mark a single document as deleted in the state DB.

        Delegates to ``transitions.mark_document_deleted``. Errors are logged
        and re-raised.
        """
        self.logger.debug(
            f"Marking document as deleted: {source_type}:{source}:{document_id} (project: {project_id})"
        )
        try:
            await _transitions.mark_document_deleted(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
                document_id=document_id,
                project_id=project_id,
            )
        except Exception as e:
            self.logger.error(
                f"Error marking document as deleted {source_type}:{source}:{document_id}: {str(e)}",
                exc_info=True,
            )
            raise

    async def mark_documents_deleted_atomic(
        self,
        deleted_documents: list["Document"],
        qdrant_manager,
        project_id: str | None = None,
    ) -> list[str]:
        """Atomically mark documents as deleted in state and delete their points in Qdrant.

        Transaction ordering:
        1. Mark state records as is_deleted=True and COMMIT to DB immediately
        2. Delete points from Qdrant (separate operation, outside DB transaction)
        3. If Qdrant delete fails, vectors remain orphaned but state is correctly marked
           (orphan vectors are recoverable by re-running delete; orphan state is silent corruption)

        Returns the list of document IDs that were marked and deleted. Documents
        with no matching state record are skipped.

        Raises:
            RuntimeError: If the manager is not initialized or has no session factory.
            Exception: DB errors (after rollback) and Qdrant errors are re-raised.

        WS-3 DESIGN NOTE: Current design prefers orphan vectors (safe, recoverable) over orphan
        state (dangerous, silent). If Qdrant fails to delete, the operation should be re-enqueued
        for retry as an idempotent operation.
        """
        if not self._initialized:
            raise RuntimeError("StateManager not initialized. Call initialize() first.")

        session_factory = getattr(self, "_session_factory", None)
        if session_factory is None:
            raise RuntimeError("State manager session factory is not available")

        document_ids_to_delete: list[str] = []

        # STEP 1: Commit state changes to DB first
        async with session_factory() as session:  # type: ignore
            tx = await session.begin()
            try:
                now = datetime.now(UTC)
                for doc in deleted_documents:
                    query = select(DocumentStateRecord).filter(
                        DocumentStateRecord.source_type == doc.source_type,
                        DocumentStateRecord.source == doc.source,
                        DocumentStateRecord.document_id == doc.id,
                    )
                    # NOTE(review): without project_id the lookup is not scoped by
                    # project. If the same (source_type, source, document_id)
                    # exists in several projects, scalar_one_or_none() raises and
                    # the whole batch is rolled back — confirm this is intended.
                    if project_id is not None:
                        query = query.filter(
                            DocumentStateRecord.project_id == project_id
                        )
                    result = await session.execute(query)
                    state = result.scalar_one_or_none()
                    if state:
                        state.is_deleted = True  # type: ignore
                        state.updated_at = now  # type: ignore
                        document_ids_to_delete.append(doc.id)

                # Commit state changes immediately
                await tx.commit()
                self.logger.info(
                    f"Marked {len(document_ids_to_delete)} documents as deleted in state DB"
                )
            except Exception as e:
                # Rollback on any DB error
                try:
                    await tx.rollback()
                except Exception as rb_err:
                    self.logger.error(
                        f"Failed to rollback transaction after error: {rb_err}",
                        exc_info=True,
                    )
                self.logger.error(
                    f"Failed to mark documents deleted in DB: {str(e)}",
                    exc_info=True,
                )
                raise

        # STEP 2: Delete from Qdrant (separate operation, after state is committed)
        # If this fails, vectors remain but state is correctly marked as deleted
        if document_ids_to_delete:
            try:
                await qdrant_manager.delete_points_by_document_id(
                    document_ids_to_delete
                )
                self.logger.info(
                    f"Deleted {len(document_ids_to_delete)} documents' points from Qdrant"
                )
            except Exception as e:
                # Qdrant delete failed, but state is already committed
                # Log the failure; the operation should be re-enqueued for retry (idempotent)
                self.logger.error(
                    f"Failed to delete points from Qdrant (state still marked as deleted): {str(e)}",
                    exc_info=True,
                )
                # Re-raise to signal caller that cleanup should be retried
                raise

        return document_ids_to_delete

    async def get_document_state_record(
        self,
        source_type: str,
        source: str,
        document_id: str,
        project_id: str | None = None,
    ) -> DocumentStateRecord | None:
        """Return the state record of one document, or None.

        Delegates to ``transitions.get_document_state_record``. Errors are
        logged and re-raised.
        """
        self.logger.debug(
            f"Getting document state for {source_type}:{source}:{document_id} (project: {project_id})"
        )
        try:
            return await _transitions.get_document_state_record(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
                document_id=document_id,
                project_id=project_id,
            )
        except Exception as e:
            self.logger.error(
                f"Error getting document state for {source_type}:{source}:{document_id}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_document_state_records_by_ids(
        self,
        source_type: str,
        source: str,
        document_ids: list[str],
        project_id: str | None = None,
    ) -> list[DocumentStateRecord]:
        """Get multiple document state records for a given source in a single query.

        Delegates to ``transitions.get_document_state_records_by_ids``. Errors
        are logged and re-raised.
        """
        self.logger.debug(
            f"Getting document state records for {source_type}:{source} (batch of {len(document_ids)})"
        )
        try:
            return await _transitions.get_document_state_records_by_ids(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
                document_ids=document_ids,
                project_id=project_id,
            )
        except Exception as e:
            self.logger.error(
                f"Error getting document state records by ids for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_document_state_records(
        self, source_config: SourceConfig, since: datetime | None = None
    ) -> list[DocumentStateRecord]:
        """Get all document states for a source, optionally filtered by date.

        Uses ``source_config.source_type`` and ``source_config.source``; no
        project scope is passed. Delegates to
        ``transitions.get_document_state_records``. Errors are logged and
        re-raised.
        """
        self.logger.debug(
            f"Getting document state records for {source_config.source_type}:{source_config.source}"
        )
        try:
            return await _transitions.get_document_state_records(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_config.source_type,
                source=source_config.source,
                since=since,
            )
        except Exception as e:
            self.logger.error(
                f"Error getting document state records for {source_config.source_type}:{source_config.source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def update_document_state(
        self, document: Document, project_id: str | None = None
    ) -> DocumentStateRecord:
        """Update (or create) the state record of a document.

        Delegates to ``transitions.update_document_state``.

        Raises:
            RuntimeError: If the manager is not initialized.
            Exception: Any error from the transition is logged with its
                traceback and re-raised.
        """
        if not self._initialized:
            raise RuntimeError("StateManager not initialized. Call initialize() first.")

        self.logger.debug(
            f"Updating document state for {document.source_type}:{document.source}:{document.id} (project: {project_id})"
        )
        try:
            return await _transitions.update_document_state(
                self._session_factory,  # type: ignore[arg-type]
                document=document,
                project_id=project_id,
            )
        except Exception as e:
            # exc_info=True keeps the traceback, matching the other error paths.
            self.logger.error(
                "Failed to update document state",
                extra={
                    "project_id": project_id,
                    "document_id": document.id,
                    "error": str(e),
                    "error_type": type(e).__name__,
                },
                exc_info=True,
            )
            raise

    async def update_conversion_metrics(
        self,
        source_type: str,
        source: str,
        converted_files_count: int = 0,
        conversion_failures_count: int = 0,
        attachments_processed_count: int = 0,
        total_conversion_time: float = 0.0,
    ) -> None:
        """Update file conversion metrics for a source.

        Delegates to ``transitions.update_conversion_metrics``. Errors are
        logged and re-raised.
        """
        self.logger.debug(f"Updating conversion metrics for {source_type}:{source}")
        try:
            await _transitions.update_conversion_metrics(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
                converted_files_count=converted_files_count,
                conversion_failures_count=conversion_failures_count,
                attachments_processed_count=attachments_processed_count,
                total_conversion_time=total_conversion_time,
            )
        except Exception as e:
            self.logger.error(
                f"Error updating conversion metrics for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_conversion_metrics(
        self, source_type: str, source: str
    ) -> dict[str, int | float]:
        """Get file conversion metrics for a source.

        Delegates to ``transitions.get_conversion_metrics``. Errors are logged
        and re-raised.
        """
        self.logger.debug(f"Getting conversion metrics for {source_type}:{source}")
        try:
            return await _transitions.get_conversion_metrics(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
            )
        except Exception as e:
            self.logger.error(
                f"Error getting conversion metrics for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_attachment_documents(
        self, parent_document_id: str
    ) -> list[DocumentStateRecord]:
        """Get all attachment documents for a parent document.

        Delegates to ``transitions.get_attachment_documents``. Errors are
        logged and re-raised.
        """
        self.logger.debug(
            f"Getting attachment documents for parent {parent_document_id}"
        )
        try:
            return await _transitions.get_attachment_documents(
                self._session_factory,  # type: ignore[arg-type]
                parent_document_id=parent_document_id,
            )
        except Exception as e:
            self.logger.error(
                f"Error getting attachment documents for {parent_document_id}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_converted_documents(
        self, source_type: str, source: str, conversion_method: str | None = None
    ) -> list[DocumentStateRecord]:
        """Get all converted documents for a source, optionally filtered by conversion method.

        Delegates to ``transitions.get_converted_documents``. Errors are logged
        and re-raised.
        """
        self.logger.debug(f"Getting converted documents for {source_type}:{source}")
        try:
            return await _transitions.get_converted_documents(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
                conversion_method=conversion_method,
            )
        except Exception as e:
            self.logger.error(
                f"Error getting converted documents for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def close(self):
        """Close all database connections.

        Disposes the engine and resets the manager to its uninitialized state,
        as ``dispose()`` does. A closed manager is no longer reported as
        initialized, and a second ``close()`` (or a following ``dispose()``)
        does not dispose the same engine twice.
        """
        engine = getattr(self, "_engine", None)
        if engine is None:
            return
        self.logger.debug("Closing database connections")
        await _dispose_engine(engine)
        self._engine = None
        self._session_factory = None
        self._initialized = False
        self.logger.debug("Database connections closed")