Coverage for src/qdrant_loader/core/state/state_manager.py: 78%

345 statements  

coverage.py v7.10.0, created at 2025-07-25 11:39 +0000

1""" 

2State management service for tracking document ingestion state. 

3""" 

4 

5import os 

6from datetime import UTC, datetime 

7from pathlib import Path 

8 

9from sqlalchemy import select 

10from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine 

11from sqlalchemy.pool import StaticPool 

12 

13from qdrant_loader.config.source_config import SourceConfig 

14from qdrant_loader.config.state import IngestionStatus, StateManagementConfig 

15from qdrant_loader.core.document import Document 

16from qdrant_loader.core.state.exceptions import DatabaseError 

17from qdrant_loader.core.state.models import Base, DocumentStateRecord, IngestionHistory 

18from qdrant_loader.utils.logging import LoggingConfig 

19 

20logger = LoggingConfig.get_logger(__name__) 

21 

22 

23class StateManager: 

24 """Manages state for document ingestion.""" 

25 

26 def __init__(self, config: StateManagementConfig): 

27 """Initialize the state manager with configuration.""" 

28 self.config = config 

29 self._initialized = False 

30 self._engine = None 

31 self._session_factory = None 

32 self.logger = LoggingConfig.get_logger(__name__) 


    async def __aenter__(self):
        """Async context manager entry."""
        self.logger.debug("=== StateManager.__aenter__() called ===")
        self.logger.debug(f"Current initialization state: {self._initialized}")

        # Initialize if not already initialized
        if not self._initialized:
            self.logger.debug("StateManager not initialized, calling initialize()")
            await self.initialize()
        else:
            self.logger.debug("StateManager already initialized")

        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.dispose()


    async def initialize(self) -> None:
        """Initialize the database and create tables if they don't exist."""
        if self._initialized:
            self.logger.debug("StateManager already initialized, skipping")
            return

        try:
            self.logger.debug("Starting StateManager initialization")

            # Process database path with enhanced Windows debugging
            db_path_str = self.config.database_path
            self.logger.debug(f"Original database path: {db_path_str}")
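
            # Note: a SQLite ":memory:" database exists per connection, so
            # the StaticPool configured below is what keeps every session on
            # the same in-memory database.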

            # Handle special databases
            if db_path_str in (":memory:", "sqlite:///:memory:"):
                self.logger.debug("Using in-memory database")
                database_url = "sqlite+aiosqlite:///:memory:"
            elif db_path_str.startswith("sqlite://"):
                self.logger.debug("Database path is already a SQLite URL")
                database_url = db_path_str.replace("sqlite://", "sqlite+aiosqlite://")
            else:
                # Process file path
                self.logger.debug(f"Processing file path: {db_path_str}")
                db_path = Path(db_path_str)

                # Resolve to absolute path for proper handling
                if not db_path.is_absolute():
                    db_path = db_path.resolve()
                    self.logger.debug(f"Resolved relative path to: {db_path}")


                # Validate parent directory exists
                parent_dir = db_path.parent
                self.logger.debug(f"Database parent directory: {parent_dir}")

                if not parent_dir.exists():
                    self.logger.debug(f"Creating parent directory: {parent_dir}")
                    try:
                        parent_dir.mkdir(parents=True, exist_ok=True)
                        self.logger.debug("Parent directory created successfully")
                    except Exception as e:
                        self.logger.error(f"Failed to create parent directory: {e}")
                        raise DatabaseError(
                            f"Cannot create database directory {parent_dir}: {e}"
                        ) from e

                # Check directory permissions
                if not os.access(parent_dir, os.W_OK):
                    error_msg = f"No write permission for database directory: {parent_dir}"
                    self.logger.error(error_msg)
                    raise DatabaseError(error_msg)


                # Convert to proper SQLite URL format. Path.as_posix() yields
                # forward slashes, so Windows drive paths ("C:/..."), POSIX
                # absolute paths, and relative paths all take the same
                # three-slash URL prefix.
                db_url_path = db_path.as_posix()
                database_url = f"sqlite+aiosqlite:///{db_url_path}"

            self.logger.debug(f"Generated database URL: {database_url}")

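            # StaticPool keeps a single connection alive for the engine's
            # lifetime, and check_same_thread=False lets that connection be
            # used outside the thread that created it, which async SQLite
            # drivers rely on.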

            # Create database engine
            self.logger.debug("Creating database engine")
            self._engine = create_async_engine(
                database_url,
                poolclass=StaticPool,
                connect_args={"check_same_thread": False},
                echo=False,
            )
            self.logger.debug("Database engine created successfully")

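            # expire_on_commit=False keeps ORM objects readable after commit,
            # which matters here because records are returned to callers
            # after their session has closed.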

            # Create session factory
            self.logger.debug("Creating session factory")
            self._session_factory = async_sessionmaker(
                self._engine, expire_on_commit=False
            )
            self.logger.debug("Session factory created successfully")

            # Create tables
            self.logger.debug("Creating database tables")
            async with self._engine.begin() as conn:
                await conn.run_sync(Base.metadata.create_all)
            self.logger.debug("Database tables created successfully")

            self._initialized = True
            self.logger.debug("StateManager initialization completed successfully")

        except Exception as e:
            self.logger.error(f"StateManager initialization failed: {e}", exc_info=True)
            # Ensure we clean up any partial initialization
            if hasattr(self, "_engine") and self._engine:
                try:
                    await self._engine.dispose()
                except Exception as cleanup_error:
                    self.logger.error(
                        f"Failed to cleanup engine during error handling: {cleanup_error}"
                    )
            self._initialized = False
            raise


    async def dispose(self):
        """Clean up resources."""
        if self._engine:
            self.logger.debug("Disposing database engine")
            await self._engine.dispose()
            self._engine = None
            self._session_factory = None
            self._initialized = False
            self.logger.debug("StateManager resources disposed")


    async def update_last_ingestion(
        self,
        source_type: str,
        source: str,
        status: str = IngestionStatus.SUCCESS,
        error_message: str | None = None,
        document_count: int = 0,
        project_id: str | None = None,
    ) -> None:
        """Update the last ingestion record for a source."""
        self.logger.debug(
            f"Updating last ingestion for {source_type}:{source} (project: {project_id})"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {source_type}:{source}"
                )
                now = datetime.now(UTC)
                self.logger.debug(
                    f"Executing query to find ingestion history for {source_type}:{source}"
                )

                # Build query with optional project filter
                query = select(IngestionHistory).filter_by(
                    source_type=source_type, source=source
                )
                if project_id is not None:
                    query = query.filter_by(project_id=project_id)

                result = await session.execute(query)
                ingestion = result.scalar_one_or_none()
                self.logger.debug(
                    f"Query result: {'Found' if ingestion else 'Not found'} ingestion history for {source_type}:{source}"
                )

                if ingestion:
                    self.logger.debug(
                        f"Updating existing ingestion history for {source_type}:{source}"
                    )
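                    # Only advance the success timestamp when the run
                    # succeeded; failed runs keep the previous value.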

                    ingestion.last_successful_ingestion = (  # type: ignore
                        now
                        if status == IngestionStatus.SUCCESS
                        else ingestion.last_successful_ingestion
                    )
                    ingestion.status = status  # type: ignore
                    ingestion.document_count = (  # type: ignore
                        document_count if document_count else ingestion.document_count
                    )
                    ingestion.updated_at = now  # type: ignore
                    ingestion.error_message = error_message  # type: ignore
                else:
                    self.logger.debug(
                        f"Creating new ingestion history for {source_type}:{source}"
                    )
                    ingestion = IngestionHistory(
                        project_id=project_id,
                        source_type=source_type,
                        source=source,
                        last_successful_ingestion=now,
                        status=status,
                        document_count=document_count,
                        error_message=error_message,
                        created_at=now,
                        updated_at=now,
                    )
                    session.add(ingestion)

                self.logger.debug(f"Committing changes for {source_type}:{source}")
                await session.commit()
                self.logger.debug(
                    f"Successfully committed changes for {source_type}:{source}"
                )

                self.logger.debug(
                    "Ingestion history updated",
                    extra={
                        "project_id": project_id,
                        "source_type": ingestion.source_type,
                        "source": ingestion.source,
                        "status": ingestion.status,
                        "document_count": ingestion.document_count,
                    },
                )
        except Exception as e:
            self.logger.error(
                f"Error updating last ingestion for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise


    async def get_last_ingestion(
        self, source_type: str, source: str, project_id: str | None = None
    ) -> IngestionHistory | None:
        """Get the last ingestion record for a source."""
        self.logger.debug(
            f"Getting last ingestion for {source_type}:{source} (project: {project_id})"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {source_type}:{source}"
                )
                self.logger.debug(
                    f"Executing query to find last ingestion for {source_type}:{source}"
                )

                # Build query with optional project filter
                query = select(IngestionHistory).filter(
                    IngestionHistory.source_type == source_type,
                    IngestionHistory.source == source,
                )
                if project_id is not None:
                    query = query.filter(IngestionHistory.project_id == project_id)

                query = query.order_by(
                    IngestionHistory.last_successful_ingestion.desc()
                )
                result = await session.execute(query)
                # Take the most recent record; scalar_one_or_none() would
                # raise MultipleResultsFound when several projects share the
                # same source and no project filter is applied.
                ingestion = result.scalars().first()
                self.logger.debug(
                    f"Query result: {'Found' if ingestion else 'Not found'} last ingestion for {source_type}:{source}"
                )
                return ingestion
        except Exception as e:
            self.logger.error(
                f"Error getting last ingestion for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise


    async def mark_document_deleted(
        self,
        source_type: str,
        source: str,
        document_id: str,
        project_id: str | None = None,
    ) -> None:
        """Mark a document as deleted."""
        self.logger.debug(
            f"Marking document as deleted: {source_type}:{source}:{document_id} (project: {project_id})"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {source_type}:{source}:{document_id}"
                )
                now = datetime.now(UTC)
                self.logger.debug(
                    "Searching for document to be deleted.",
                    extra={
                        "document_id": document_id,
                        "source_type": source_type,
                        "source": source,
                        "project_id": project_id,
                    },
                )
                self.logger.debug(
                    f"Executing query to find document {source_type}:{source}:{document_id}"
                )

                # Build query with optional project filter
                query = select(DocumentStateRecord).filter(
                    DocumentStateRecord.source_type == source_type,
                    DocumentStateRecord.source == source,
                    DocumentStateRecord.document_id == document_id,
                )
                if project_id is not None:
                    query = query.filter(DocumentStateRecord.project_id == project_id)

                result = await session.execute(query)
                state = result.scalar_one_or_none()
                self.logger.debug(
                    f"Query result: {'Found' if state else 'Not found'} document {source_type}:{source}:{document_id}"
                )

                if state:
                    self.logger.debug(
                        f"Updating document state for {source_type}:{source}:{document_id}"
                    )
                    state.is_deleted = True  # type: ignore
                    state.updated_at = now  # type: ignore
                    self.logger.debug(
                        f"Committing changes for {source_type}:{source}:{document_id}"
                    )
                    await session.commit()
                    self.logger.debug(
                        f"Successfully committed changes for {source_type}:{source}:{document_id}"
                    )
                    self.logger.debug(
                        "Document marked as deleted",
                        extra={
                            "document_id": document_id,
                            "source_type": source_type,
                            "source": source,
                            "project_id": project_id,
                        },
                    )
                else:
                    self.logger.warning(
                        f"Document not found: {source_type}:{source}:{document_id}"
                    )
        except Exception as e:
            self.logger.error(
                f"Error marking document as deleted {source_type}:{source}:{document_id}: {str(e)}",
                exc_info=True,
            )
            raise


    async def get_document_state_record(
        self,
        source_type: str,
        source: str,
        document_id: str,
        project_id: str | None = None,
    ) -> DocumentStateRecord | None:
        """Get the state of a document."""
        self.logger.debug(
            f"Getting document state for {source_type}:{source}:{document_id} (project: {project_id})"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {source_type}:{source}:{document_id}"
                )
                self.logger.debug(
                    f"Executing query to find document state for {source_type}:{source}:{document_id}"
                )

                # Build query with optional project filter
                query = select(DocumentStateRecord).filter(
                    DocumentStateRecord.source_type == source_type,
                    DocumentStateRecord.source == source,
                    DocumentStateRecord.document_id == document_id,
                )
                if project_id is not None:
                    query = query.filter(DocumentStateRecord.project_id == project_id)

                result = await session.execute(query)
                state = result.scalar_one_or_none()
                self.logger.debug(
                    f"Query result: {'Found' if state else 'Not found'} document state for {source_type}:{source}:{document_id}"
                )
                return state
        except Exception as e:
            self.logger.error(
                f"Error getting document state for {source_type}:{source}:{document_id}: {str(e)}",
                exc_info=True,
            )
            raise


    async def get_document_state_records(
        self, source_config: SourceConfig, since: datetime | None = None
    ) -> list[DocumentStateRecord]:
        """Get all document states for a source, optionally filtered by date."""
        self.logger.debug(
            f"Getting document state records for {source_config.source_type}:{source_config.source}"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {source_config.source_type}:{source_config.source}"
                )
                query = select(DocumentStateRecord).filter(
                    DocumentStateRecord.source_type == source_config.source_type,
                    DocumentStateRecord.source == source_config.source,
                )
                if since:
                    query = query.filter(DocumentStateRecord.updated_at >= since)
                self.logger.debug(
                    f"Executing query for {source_config.source_type}:{source_config.source}"
                )
                result = await session.execute(query)
                self.logger.debug(
                    f"Query executed, getting all records for {source_config.source_type}:{source_config.source}"
                )
                records = list(result.scalars().all())
                self.logger.debug(
                    f"Got {len(records)} records for {source_config.source_type}:{source_config.source}"
                )
                return records
        except Exception as e:
            self.logger.error(
                f"Error getting document state records for {source_config.source_type}:{source_config.source}: {str(e)}",
                exc_info=True,
            )
            raise


    async def update_document_state(
        self, document: Document, project_id: str | None = None
    ) -> DocumentStateRecord:
        """Update the state of a document."""
        if not self._initialized:
            raise RuntimeError("StateManager not initialized. Call initialize() first.")

        self.logger.debug(
            f"Updating document state for {document.source_type}:{document.source}:{document.id} (project: {project_id})"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {document.source_type}:{document.source}:{document.id}"
                )
                self.logger.debug(
                    f"Executing query to find document state for {document.source_type}:{document.source}:{document.id}"
                )

                # Build query with optional project filter
                query = select(DocumentStateRecord).filter(
                    DocumentStateRecord.source_type == document.source_type,
                    DocumentStateRecord.source == document.source,
                    DocumentStateRecord.document_id == document.id,
                )
                if project_id is not None:
                    query = query.filter(DocumentStateRecord.project_id == project_id)

                result = await session.execute(query)
                document_state_record = result.scalar_one_or_none()
                self.logger.debug(
                    f"Query result: {'Found' if document_state_record else 'Not found'} document state for {document.source_type}:{document.source}:{document.id}"
                )

                now = datetime.now(UTC)

                # Extract file conversion metadata from document
                metadata = document.metadata
                conversion_method = metadata.get("conversion_method")
                is_converted = conversion_method is not None
                conversion_failed = metadata.get("conversion_failed", False)

                # Extract attachment metadata
                is_attachment = metadata.get("is_attachment", False)
                parent_document_id = metadata.get("parent_document_id")
                attachment_id = metadata.get("attachment_id")

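                # Upsert by hand: update the matched record in place,
                # otherwise create a fresh one below.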

                if document_state_record:
                    # Update existing record
                    self.logger.debug(
                        f"Updating existing document state for {document.source_type}:{document.source}:{document.id}"
                    )
                    document_state_record.title = document.title  # type: ignore
                    document_state_record.content_hash = document.content_hash  # type: ignore
                    document_state_record.is_deleted = False  # type: ignore
                    document_state_record.updated_at = now  # type: ignore

                    # Update file conversion metadata
                    document_state_record.is_converted = is_converted  # type: ignore
                    document_state_record.conversion_method = conversion_method  # type: ignore
                    document_state_record.original_file_type = metadata.get("original_file_type")  # type: ignore
                    document_state_record.original_filename = metadata.get("original_filename")  # type: ignore
                    document_state_record.file_size = metadata.get("file_size")  # type: ignore
                    document_state_record.conversion_failed = conversion_failed  # type: ignore
                    document_state_record.conversion_error = metadata.get("conversion_error")  # type: ignore
                    document_state_record.conversion_time = metadata.get("conversion_time")  # type: ignore

                    # Update attachment metadata
                    document_state_record.is_attachment = is_attachment  # type: ignore
                    document_state_record.parent_document_id = parent_document_id  # type: ignore
                    document_state_record.attachment_id = attachment_id  # type: ignore
                    document_state_record.attachment_filename = metadata.get("attachment_filename")  # type: ignore
                    document_state_record.attachment_mime_type = metadata.get("attachment_mime_type")  # type: ignore
                    document_state_record.attachment_download_url = metadata.get("attachment_download_url")  # type: ignore
                    document_state_record.attachment_author = metadata.get("attachment_author")  # type: ignore

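                    # datetime.fromisoformat() only accepts a trailing "Z"
                    # from Python 3.11 onward, so normalize it to "+00:00".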

                    # Handle attachment creation date
                    attachment_created_str = metadata.get("attachment_created_at")
                    if attachment_created_str:
                        try:
                            if isinstance(attachment_created_str, str):
                                document_state_record.attachment_created_at = (  # type: ignore
                                    datetime.fromisoformat(
                                        attachment_created_str.replace("Z", "+00:00")
                                    )
                                )
                            elif isinstance(attachment_created_str, datetime):
                                document_state_record.attachment_created_at = (  # type: ignore
                                    attachment_created_str
                                )
                        except (ValueError, TypeError) as e:
                            self.logger.warning(
                                f"Failed to parse attachment_created_at: {e}"
                            )
                            document_state_record.attachment_created_at = None  # type: ignore

                else:
                    # Create new record
                    self.logger.debug(
                        f"Creating new document state for {document.source_type}:{document.source}:{document.id}"
                    )

                    # Handle attachment creation date for new records
                    attachment_created_at = None
                    attachment_created_str = metadata.get("attachment_created_at")
                    if attachment_created_str:
                        try:
                            if isinstance(attachment_created_str, str):
                                attachment_created_at = datetime.fromisoformat(
                                    attachment_created_str.replace("Z", "+00:00")
                                )
                            elif isinstance(attachment_created_str, datetime):
                                attachment_created_at = attachment_created_str
                        except (ValueError, TypeError) as e:
                            self.logger.warning(
                                f"Failed to parse attachment_created_at: {e}"
                            )


                    document_state_record = DocumentStateRecord(
                        project_id=project_id,
                        document_id=document.id,
                        source_type=document.source_type,
                        source=document.source,
                        url=document.url,
                        title=document.title,
                        content_hash=document.content_hash,
                        is_deleted=False,
                        created_at=now,
                        updated_at=now,
                        # File conversion metadata
                        is_converted=is_converted,
                        conversion_method=conversion_method,
                        original_file_type=metadata.get("original_file_type"),
                        original_filename=metadata.get("original_filename"),
                        file_size=metadata.get("file_size"),
                        conversion_failed=conversion_failed,
                        conversion_error=metadata.get("conversion_error"),
                        conversion_time=metadata.get("conversion_time"),
                        # Attachment metadata
                        is_attachment=is_attachment,
                        parent_document_id=parent_document_id,
                        attachment_id=attachment_id,
                        attachment_filename=metadata.get("attachment_filename"),
                        attachment_mime_type=metadata.get("attachment_mime_type"),
                        attachment_download_url=metadata.get("attachment_download_url"),
                        attachment_author=metadata.get("attachment_author"),
                        attachment_created_at=attachment_created_at,
                    )
                    session.add(document_state_record)

                self.logger.debug(
                    f"Committing changes for {document.source_type}:{document.source}:{document.id}"
                )
                await session.commit()
                self.logger.debug(
                    f"Successfully committed changes for {document.source_type}:{document.source}:{document.id}"
                )

                self.logger.debug(
                    "Document state updated",
                    extra={
                        "project_id": project_id,
                        "document_id": document_state_record.document_id,
                        "content_hash": document_state_record.content_hash,
                        "updated_at": document_state_record.updated_at,
                        "is_converted": document_state_record.is_converted,
                        "is_attachment": document_state_record.is_attachment,
                        "conversion_method": document_state_record.conversion_method,
                    },
                )
                return document_state_record
        except Exception as e:
            self.logger.error(
                "Failed to update document state",
                extra={
                    "project_id": project_id,
                    "document_id": document.id,
                    "error": str(e),
                    "error_type": type(e).__name__,
                },
            )
            raise


    async def update_conversion_metrics(
        self,
        source_type: str,
        source: str,
        converted_files_count: int = 0,
        conversion_failures_count: int = 0,
        attachments_processed_count: int = 0,
        total_conversion_time: float = 0.0,
    ) -> None:
        """Update file conversion metrics for a source."""
        self.logger.debug(f"Updating conversion metrics for {source_type}:{source}")
        try:
            async with self._session_factory() as session:  # type: ignore
                result = await session.execute(
                    select(IngestionHistory).filter_by(
                        source_type=source_type, source=source
                    )
                )
                ingestion = result.scalar_one_or_none()

                if ingestion:
                    # Update existing metrics
                    ingestion.converted_files_count = (  # type: ignore
                        ingestion.converted_files_count or 0
                    ) + converted_files_count
                    ingestion.conversion_failures_count = (  # type: ignore
                        ingestion.conversion_failures_count or 0
                    ) + conversion_failures_count
                    ingestion.attachments_processed_count = (  # type: ignore
                        ingestion.attachments_processed_count or 0
                    ) + attachments_processed_count
                    ingestion.total_conversion_time = (  # type: ignore
                        ingestion.total_conversion_time or 0.0
                    ) + total_conversion_time
                    ingestion.updated_at = datetime.now(UTC)  # type: ignore
                else:
                    # Create new record with conversion metrics
                    now = datetime.now(UTC)
                    ingestion = IngestionHistory(
                        source_type=source_type,
                        source=source,
                        last_successful_ingestion=now,
                        status="SUCCESS",
                        document_count=0,
                        converted_files_count=converted_files_count,
                        conversion_failures_count=conversion_failures_count,
                        attachments_processed_count=attachments_processed_count,
                        total_conversion_time=total_conversion_time,
                        created_at=now,
                        updated_at=now,
                    )
                    session.add(ingestion)

                await session.commit()
                self.logger.debug(
                    "Conversion metrics updated",
                    extra={
                        "source_type": source_type,
                        "source": source,
                        "converted_files": ingestion.converted_files_count,
                        "conversion_failures": ingestion.conversion_failures_count,
                        "attachments_processed": ingestion.attachments_processed_count,
                        "total_conversion_time": ingestion.total_conversion_time,
                    },
                )
        except Exception as e:
            self.logger.error(
                f"Error updating conversion metrics for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise


    async def get_conversion_metrics(
        self, source_type: str, source: str
    ) -> dict[str, int | float]:
        """Get file conversion metrics for a source."""
        self.logger.debug(f"Getting conversion metrics for {source_type}:{source}")
        try:
            async with self._session_factory() as session:  # type: ignore
                result = await session.execute(
                    select(IngestionHistory).filter_by(
                        source_type=source_type, source=source
                    )
                )
                ingestion = result.scalar_one_or_none()

                if ingestion:
                    # Access the actual values from the SQLAlchemy model instance
                    converted_files: int | None = ingestion.converted_files_count  # type: ignore
                    conversion_failures: int | None = ingestion.conversion_failures_count  # type: ignore
                    attachments_processed: int | None = ingestion.attachments_processed_count  # type: ignore
                    total_time: float | None = ingestion.total_conversion_time  # type: ignore

                    return {
                        "converted_files_count": (
                            converted_files if converted_files is not None else 0
                        ),
                        "conversion_failures_count": (
                            conversion_failures
                            if conversion_failures is not None
                            else 0
                        ),
                        "attachments_processed_count": (
                            attachments_processed
                            if attachments_processed is not None
                            else 0
                        ),
                        "total_conversion_time": (
                            total_time if total_time is not None else 0.0
                        ),
                    }
                else:
                    return {
                        "converted_files_count": 0,
                        "conversion_failures_count": 0,
                        "attachments_processed_count": 0,
                        "total_conversion_time": 0.0,
                    }
        except Exception as e:
            self.logger.error(
                f"Error getting conversion metrics for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise


    async def get_attachment_documents(
        self, parent_document_id: str
    ) -> list[DocumentStateRecord]:
        """Get all attachment documents for a parent document."""
        self.logger.debug(
            f"Getting attachment documents for parent {parent_document_id}"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                result = await session.execute(
                    select(DocumentStateRecord).filter(
                        DocumentStateRecord.parent_document_id == parent_document_id,
                        DocumentStateRecord.is_attachment.is_(True),
                        DocumentStateRecord.is_deleted.is_(False),
                    )
                )
                attachments = list(result.scalars().all())
                self.logger.debug(
                    f"Found {len(attachments)} attachments for parent {parent_document_id}"
                )
                return attachments
        except Exception as e:
            self.logger.error(
                f"Error getting attachment documents for {parent_document_id}: {str(e)}",
                exc_info=True,
            )
            raise


    async def get_converted_documents(
        self, source_type: str, source: str, conversion_method: str | None = None
    ) -> list[DocumentStateRecord]:
        """Get all converted documents for a source, optionally filtered by conversion method."""
        self.logger.debug(f"Getting converted documents for {source_type}:{source}")
        try:
            async with self._session_factory() as session:  # type: ignore
                query = select(DocumentStateRecord).filter(
                    DocumentStateRecord.source_type == source_type,
                    DocumentStateRecord.source == source,
                    DocumentStateRecord.is_converted.is_(True),
                    DocumentStateRecord.is_deleted.is_(False),
                )
                if conversion_method:
                    query = query.filter(
                        DocumentStateRecord.conversion_method == conversion_method
                    )

                result = await session.execute(query)
                documents = list(result.scalars().all())
                self.logger.debug(
                    f"Found {len(documents)} converted documents for {source_type}:{source}"
                )
                return documents
        except Exception as e:
            self.logger.error(
                f"Error getting converted documents for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise


    async def close(self):
        """Close all database connections."""
        if hasattr(self, "_engine") and self._engine is not None:
            self.logger.debug("Closing database connections")
            await self._engine.dispose()
            self.logger.debug("Database connections closed")
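
Example usage (a minimal sketch; the StateManagementConfig constructor
arguments shown here are an assumption, since the module above only shows
that the config exposes a database_path attribute):

    import asyncio

    from qdrant_loader.config.state import StateManagementConfig
    from qdrant_loader.core.state.state_manager import StateManager

    async def main() -> None:
        # Hypothetical config values, for illustration only.
        config = StateManagementConfig(database_path=":memory:")
        # __aenter__ runs initialize(); __aexit__ runs dispose().
        async with StateManager(config) as state:
            record = await state.get_last_ingestion("git", "my-repo")
            print(record)

    asyncio.run(main())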