Coverage for src/qdrant_loader/core/state/state_manager.py: 64%

174 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1""" 

2State management service for tracking document ingestion state. 

3""" 

4 

5from datetime import datetime 

6from typing import TYPE_CHECKING 

7 

8from sqlalchemy import func, select 

9 

10from qdrant_loader.config.source_config import SourceConfig 

11from qdrant_loader.config.state import IngestionStatus, StateManagementConfig 

12from qdrant_loader.core.document import Document 

13from qdrant_loader.core.state import transitions as _transitions 

14from qdrant_loader.core.state.models import DocumentStateRecord, IngestionHistory 

15from qdrant_loader.core.state.session import create_tables as _create_tables 

16from qdrant_loader.core.state.session import dispose_engine as _dispose_engine 

17from qdrant_loader.core.state.session import ( 

18 initialize_engine_and_session as _init_engine_session, 

19) 

20from qdrant_loader.core.state.utils import generate_sqlite_aiosqlite_url as _gen_url 

21from qdrant_loader.utils.logging import LoggingConfig 

22 

23if TYPE_CHECKING: 

24 from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker 

25 

26logger = LoggingConfig.get_logger(__name__) 

27 

28 

class StateManager:
    """Manages state for document ingestion.

    Wraps an async SQLAlchemy engine and session factory and delegates the
    actual state transitions to ``qdrant_loader.core.state.transitions``.
    Usable as an async context manager: initialization happens on entry and
    resources are disposed on exit.
    """

    def __init__(self, config: StateManagementConfig):
        """Initialize the state manager with configuration.

        Args:
            config: State management settings (database path, etc.).
        """
        self.config = config
        self._initialized = False
        self._engine: AsyncEngine | None = None
        self._session_factory: async_sessionmaker[AsyncSession] | None = None
        self.logger = LoggingConfig.get_logger(__name__)

    @property
    def is_initialized(self) -> bool:
        """Public accessor for initialization state used by callers/tests."""
        return self._initialized

    async def get_session(self) -> "AsyncSession":
        """Return an async session context manager, initializing if needed.

        This method allows callers to use:
            async with await state_manager.get_session() as session:
                ...

        Raises:
            RuntimeError: If the session factory is unavailable even after
                initialization.
        """
        if not self._initialized:
            await self.initialize()
        if self._session_factory is None:
            raise RuntimeError("State manager session factory is not available")
        return self._session_factory()

    async def create_session(self) -> "AsyncSession":
        """Alias for get_session for backward compatibility."""
        return await self.get_session()

    async def __aenter__(self):
        """Async context manager entry; lazily initializes the manager."""
        self.logger.debug("=== StateManager.__aenter__() called ===")
        self.logger.debug(f"Current initialization state: {self._initialized}")

        # Initialize if not already initialized
        if not self._initialized:
            self.logger.debug("StateManager not initialized, calling initialize()")
            await self.initialize()
        else:
            self.logger.debug("StateManager already initialized")

        return self

    async def __aexit__(self, exc_type, exc_val, _exc_tb):
        """Async context manager exit; always disposes resources."""
        await self.dispose()

    async def initialize(self) -> None:
        """Initialize the database and create tables if they don't exist.

        Idempotent: returns immediately when already initialized. On failure,
        any partially created engine is disposed, internal references are
        cleared so a retry starts from a clean slate, and the error is
        re-raised.
        """
        if self._initialized:
            self.logger.debug("StateManager already initialized, skipping")
            return

        try:
            self.logger.debug("Starting StateManager initialization")

            # Process database path with enhanced Windows debugging
            db_path_str = self.config.database_path
            self.logger.debug(f"Original database path: {db_path_str}")

            # Handle special databases and generate URL. The URL is logged for
            # diagnostics; the engine builder below derives its own connection
            # settings from the config.
            database_url = _gen_url(db_path_str)
            self.logger.debug(f"Generated database URL: {database_url}")

            # Create database engine and session factory
            self.logger.debug("Creating database engine and session factory")
            self._engine, self._session_factory = _init_engine_session(self.config)
            self.logger.debug("Engine and session factory created successfully")

            # Create tables
            self.logger.debug("Creating database tables")
            await _create_tables(self._engine)
            self.logger.debug("Database tables created successfully")

            self._initialized = True
            self.logger.debug("StateManager initialization completed successfully")

        except Exception as e:
            self.logger.error(f"StateManager initialization failed: {e}", exc_info=True)
            # Ensure we clean up any partial initialization
            if hasattr(self, "_engine") and self._engine:
                try:
                    await _dispose_engine(self._engine)
                except Exception as cleanup_error:
                    self.logger.error(
                        f"Failed to cleanup engine during error handling: {cleanup_error}"
                    )
                finally:
                    # Drop stale references so a retry does not reuse a
                    # partially initialized (or disposed) engine.
                    self._engine = None
                    self._session_factory = None
            self._initialized = False
            raise

    async def dispose(self):
        """Clean up resources and reset the manager to an uninitialized state."""
        if self._engine:
            self.logger.debug("Disposing database engine")
            await _dispose_engine(self._engine)
            self._engine = None
            self._session_factory = None
            self._initialized = False
            self.logger.debug("StateManager resources disposed")

    async def update_last_ingestion(
        self,
        source_type: str,
        source: str,
        status: str = IngestionStatus.SUCCESS,
        error_message: str | None = None,
        document_count: int = 0,
        project_id: str | None = None,
    ) -> None:
        """Update and get the last successful ingestion time for a source.

        Args:
            source_type: Type of the source (e.g. connector name).
            source: Source identifier within the type.
            status: Ingestion outcome; defaults to success.
            error_message: Optional failure detail to record.
            document_count: Number of documents processed in this run.
            project_id: Optional project scope for the record.

        Raises:
            Exception: Re-raises any error from the underlying transition
                after logging it.
        """
        self.logger.debug(
            f"Updating last ingestion for {source_type}:{source} (project: {project_id})"
        )
        try:
            await _transitions.update_last_ingestion(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
                status=status,
                error_message=error_message,
                document_count=document_count,
                project_id=project_id,
            )
        except Exception as e:
            self.logger.error(
                f"Error updating last ingestion for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_last_ingestion(
        self, source_type: str, source: str, project_id: str | None = None
    ) -> IngestionHistory | None:
        """Get the last ingestion record for a source, or None when absent.

        Raises:
            Exception: Re-raises any error from the underlying transition
                after logging it.
        """
        self.logger.debug(
            f"Getting last ingestion for {source_type}:{source} (project: {project_id})"
        )
        try:
            return await _transitions.get_last_ingestion(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
                project_id=project_id,
            )
        except Exception as e:
            self.logger.error(
                f"Error getting last ingestion for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_project_document_count(self, project_id: str) -> int:
        """Get the count of non-deleted documents for a project.

        Returns 0 on failure to avoid breaking CLI status output.
        """
        try:
            # Prefer the existing session factory; fall back to get_session()
            # (which lazily initializes) when the factory is not yet set.
            session_factory = getattr(self, "_session_factory", None)
            if session_factory is None:
                ctx = await self.get_session()
            else:
                ctx = (
                    session_factory() if callable(session_factory) else session_factory
                )
            async with ctx as session:  # type: ignore
                result = await session.execute(
                    select(func.count(DocumentStateRecord.id))
                    .filter_by(project_id=project_id)
                    .filter_by(is_deleted=False)
                )
                count = result.scalar() or 0
                return count
        except Exception as e:  # pragma: no cover - fallback path
            self.logger.error(
                f"Error getting project document count for {project_id}: {str(e)}",
                exc_info=True,
            )
            return 0

    async def get_project_latest_ingestion(self, project_id: str) -> str | None:
        """Get the latest ingestion timestamp (ISO) for a project.

        Returns None on failure or when no ingestion exists.
        """
        try:
            # Same session acquisition strategy as get_project_document_count.
            session_factory = getattr(self, "_session_factory", None)
            if session_factory is None:
                ctx = await self.get_session()
            else:
                ctx = (
                    session_factory() if callable(session_factory) else session_factory
                )
            async with ctx as session:  # type: ignore
                result = await session.execute(
                    select(IngestionHistory.last_successful_ingestion)
                    .filter_by(project_id=project_id)
                    .order_by(IngestionHistory.last_successful_ingestion.desc())
                    .limit(1)
                )
                timestamp = result.scalar_one_or_none()
                return timestamp.isoformat() if timestamp else None
        except Exception as e:  # pragma: no cover - fallback path
            self.logger.error(
                f"Error getting project latest ingestion for {project_id}: {str(e)}",
                exc_info=True,
            )
            return None

    async def mark_document_deleted(
        self,
        source_type: str,
        source: str,
        document_id: str,
        project_id: str | None = None,
    ) -> None:
        """Mark a document as deleted.

        Raises:
            Exception: Re-raises any error from the underlying transition
                after logging it.
        """
        self.logger.debug(
            f"Marking document as deleted: {source_type}:{source}:{document_id} (project: {project_id})"
        )
        try:
            await _transitions.mark_document_deleted(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
                document_id=document_id,
                project_id=project_id,
            )
        except Exception as e:
            self.logger.error(
                f"Error marking document as deleted {source_type}:{source}:{document_id}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_document_state_record(
        self,
        source_type: str,
        source: str,
        document_id: str,
        project_id: str | None = None,
    ) -> DocumentStateRecord | None:
        """Get the state of a document, or None when no record exists.

        Raises:
            Exception: Re-raises any error from the underlying transition
                after logging it.
        """
        self.logger.debug(
            f"Getting document state for {source_type}:{source}:{document_id} (project: {project_id})"
        )
        try:
            return await _transitions.get_document_state_record(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
                document_id=document_id,
                project_id=project_id,
            )
        except Exception as e:
            self.logger.error(
                f"Error getting document state for {source_type}:{source}:{document_id}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_document_state_records(
        self, source_config: SourceConfig, since: datetime | None = None
    ) -> list[DocumentStateRecord]:
        """Get all document states for a source, optionally filtered by date.

        Args:
            source_config: Identifies the source (type and name).
            since: When provided, only records updated after this time.

        Raises:
            Exception: Re-raises any error from the underlying transition
                after logging it.
        """
        self.logger.debug(
            f"Getting document state records for {source_config.source_type}:{source_config.source}"
        )
        try:
            return await _transitions.get_document_state_records(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_config.source_type,
                source=source_config.source,
                since=since,
            )
        except Exception as e:
            self.logger.error(
                f"Error getting document state records for {source_config.source_type}:{source_config.source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def update_document_state(
        self, document: Document, project_id: str | None = None
    ) -> DocumentStateRecord:
        """Update the state of a document.

        Raises:
            RuntimeError: If the manager has not been initialized.
            Exception: Re-raises any error from the underlying transition
                after logging it.
        """
        if not self._initialized:
            raise RuntimeError("StateManager not initialized. Call initialize() first.")

        self.logger.debug(
            f"Updating document state for {document.source_type}:{document.source}:{document.id} (project: {project_id})"
        )
        try:
            return await _transitions.update_document_state(
                self._session_factory,  # type: ignore[arg-type]
                document=document,
                project_id=project_id,
            )
        except Exception as e:
            self.logger.error(
                "Failed to update document state",
                extra={
                    "project_id": project_id,
                    "document_id": document.id,
                    "error": str(e),
                    "error_type": type(e).__name__,
                },
            )
            raise

    async def update_conversion_metrics(
        self,
        source_type: str,
        source: str,
        converted_files_count: int = 0,
        conversion_failures_count: int = 0,
        attachments_processed_count: int = 0,
        total_conversion_time: float = 0.0,
    ) -> None:
        """Update file conversion metrics for a source.

        Raises:
            Exception: Re-raises any error from the underlying transition
                after logging it.
        """
        self.logger.debug(f"Updating conversion metrics for {source_type}:{source}")
        try:
            await _transitions.update_conversion_metrics(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
                converted_files_count=converted_files_count,
                conversion_failures_count=conversion_failures_count,
                attachments_processed_count=attachments_processed_count,
                total_conversion_time=total_conversion_time,
            )
        except Exception as e:
            self.logger.error(
                f"Error updating conversion metrics for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_conversion_metrics(
        self, source_type: str, source: str
    ) -> dict[str, int | float]:
        """Get file conversion metrics for a source.

        Raises:
            Exception: Re-raises any error from the underlying transition
                after logging it.
        """
        self.logger.debug(f"Getting conversion metrics for {source_type}:{source}")
        try:
            return await _transitions.get_conversion_metrics(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
            )
        except Exception as e:
            self.logger.error(
                f"Error getting conversion metrics for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_attachment_documents(
        self, parent_document_id: str
    ) -> list[DocumentStateRecord]:
        """Get all attachment documents for a parent document.

        Raises:
            Exception: Re-raises any error from the underlying transition
                after logging it.
        """
        self.logger.debug(
            f"Getting attachment documents for parent {parent_document_id}"
        )
        try:
            return await _transitions.get_attachment_documents(
                self._session_factory,  # type: ignore[arg-type]
                parent_document_id=parent_document_id,
            )
        except Exception as e:
            self.logger.error(
                f"Error getting attachment documents for {parent_document_id}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_converted_documents(
        self, source_type: str, source: str, conversion_method: str | None = None
    ) -> list[DocumentStateRecord]:
        """Get all converted documents for a source, optionally filtered by conversion method.

        Raises:
            Exception: Re-raises any error from the underlying transition
                after logging it.
        """
        self.logger.debug(f"Getting converted documents for {source_type}:{source}")
        try:
            return await _transitions.get_converted_documents(
                self._session_factory,  # type: ignore[arg-type]
                source_type=source_type,
                source=source,
                conversion_method=conversion_method,
            )
        except Exception as e:
            self.logger.error(
                f"Error getting converted documents for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def close(self):
        """Close all database connections.

        Delegates to dispose() so the engine reference, session factory and
        initialization flag are reset consistently. Previously this method
        disposed the engine but left self._engine pointing at the disposed
        engine and left is_initialized True, so later get_session() calls
        could hand out sessions backed by a disposed engine.
        """
        if getattr(self, "_engine", None) is not None:
            self.logger.debug("Closing database connections")
            await self.dispose()
            self.logger.debug("Database connections closed")