Coverage for src/qdrant_loader/core/state/state_manager.py: 79%

313 statements  

coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1""" 

2State management service for tracking document ingestion state. 

3""" 

4 

5import os 

6import sqlite3 

7from datetime import UTC, datetime 

8 

9from sqlalchemy import select 

10from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine 

11 

12from qdrant_loader.config.source_config import SourceConfig 

13from qdrant_loader.config.state import IngestionStatus, StateManagementConfig 

14from qdrant_loader.core.document import Document 

15from qdrant_loader.core.state.exceptions import DatabaseError 

16from qdrant_loader.core.state.models import Base, DocumentStateRecord, IngestionHistory 

17from qdrant_loader.utils.logging import LoggingConfig 

18 

19logger = LoggingConfig.get_logger(__name__) 



class StateManager:
    """Manages state for document ingestion."""

    def __init__(self, config: StateManagementConfig):
        """Initialize the state manager with configuration."""
        self.config = config
        self._initialized = False
        self._engine = None
        self._session_factory = None
        self.logger = LoggingConfig.get_logger(__name__)

    async def __aenter__(self):
        """Async context manager entry. Requires a prior call to initialize()."""
        if not self._initialized:
            raise ValueError("StateManager not initialized. Call initialize() first.")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.dispose()


    async def initialize(self):
        """Initialize the database schema and connection."""
        if self._initialized:
            return

        db_url = self.config.database_path
        if not db_url.startswith("sqlite:///"):
            db_url = f"sqlite:///{db_url}"

        # Extract the actual file path from the URL
        db_file = db_url.replace("sqlite:///", "")

        # Skip file permission check for in-memory database
        if db_file != ":memory:":
            # Check if the database file exists and is writable
            if os.path.exists(db_file) and not os.access(db_file, os.W_OK):
                raise DatabaseError(
                    f"Database file '{db_file}' exists but is not writable. "
                    "Please check file permissions."
                )
            # If file doesn't exist, check if directory is writable
            elif not os.path.exists(db_file):
                db_dir = os.path.dirname(db_file) or "."
                if not os.access(db_dir, os.W_OK):
                    raise DatabaseError(
                        f"Cannot create database file in '{db_dir}'. "
                        "Directory is not writable. Please check directory permissions."
                    )

        # Create async engine for async operations
        engine_args = {}
        if db_url != "sqlite:///:memory:":
            engine_args.update(
                {
                    "pool_size": self.config.connection_pool["size"],
                    "pool_timeout": self.config.connection_pool["timeout"],
                    "pool_recycle": 3600,  # Recycle connections after 1 hour
                    "pool_pre_ping": True,  # Enable connection health checks
                }
            )

        try:
            self.logger.debug(f"Creating async engine for database: {db_file}")
            self._engine = create_async_engine(
                f"sqlite+aiosqlite:///{db_file}", **engine_args
            )

            # Create async session factory
            self.logger.debug("Creating async session factory")
            self._session_factory = async_sessionmaker(
                bind=self._engine,
                expire_on_commit=False,  # Prevent expired objects after commit
                autoflush=False,  # Disable autoflush for better control
            )

            # Initialize schema
            self.logger.debug("Initializing database schema")
            async with self._engine.begin() as conn:
                await conn.run_sync(Base.metadata.create_all)

            self._initialized = True
            self.logger.debug("StateManager initialized successfully")
        except (sqlite3.OperationalError, SQLAlchemyOperationalError) as e:
            # Handle operational errors. SQLAlchemy wraps the underlying
            # sqlite3 error in its own OperationalError, so both types must
            # be caught here for this branch to actually fire.
            if "readonly database" in str(e).lower():
                raise DatabaseError(
                    f"Cannot write to database '{db_file}'. Database is read-only."
                ) from e
            raise DatabaseError(f"Failed to initialize database: {e}") from e
        except Exception as e:
            raise DatabaseError(f"Unexpected error initializing database: {e}") from e
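
    # Illustrative sketch of accepted `database_path` values, following the
    # normalization in initialize(): a bare path gains the "sqlite:///"
    # prefix, while ":memory:" skips both the permission checks and the pool
    # settings:
    #
    #     database_path = "./data/state.db"           # bare file path
    #     database_path = "sqlite:///data/state.db"   # URL form
    #     database_path = ":memory:"                  # in-memory database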


    async def dispose(self):
        """Clean up resources."""
        if self._engine:
            self.logger.debug("Disposing database engine")
            await self._engine.dispose()
            self._engine = None
            self._session_factory = None
            self._initialized = False
            self.logger.debug("StateManager resources disposed")
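
    # Minimal lifecycle sketch (illustrative; the source identifiers below
    # are hypothetical):
    #
    #     manager = StateManager(config)
    #     await manager.initialize()
    #     async with manager:  # __aexit__ calls dispose()
    #         last = await manager.get_last_ingestion("git", "my-repo")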


    async def update_last_ingestion(
        self,
        source_type: str,
        source: str,
        status: str = IngestionStatus.SUCCESS,
        error_message: str | None = None,
        document_count: int = 0,
        project_id: str | None = None,
    ) -> None:
        """Update the last ingestion record for a source."""
        self.logger.debug(
            f"Updating last ingestion for {source_type}:{source} (project: {project_id})"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {source_type}:{source}"
                )
                now = datetime.now(UTC)
                self.logger.debug(
                    f"Executing query to find ingestion history for {source_type}:{source}"
                )

                # Build query with optional project filter
                query = select(IngestionHistory).filter_by(
                    source_type=source_type, source=source
                )
                if project_id is not None:
                    query = query.filter_by(project_id=project_id)

                result = await session.execute(query)
                ingestion = result.scalar_one_or_none()
                self.logger.debug(
                    f"Query result: {'Found' if ingestion else 'Not found'} ingestion history for {source_type}:{source}"
                )

                if ingestion:
                    self.logger.debug(
                        f"Updating existing ingestion history for {source_type}:{source}"
                    )
                    # Only a successful run advances the success timestamp
                    if status == IngestionStatus.SUCCESS:
                        ingestion.last_successful_ingestion = now  # type: ignore
                    ingestion.status = status  # type: ignore
                    if document_count:
                        ingestion.document_count = document_count  # type: ignore
                    ingestion.updated_at = now  # type: ignore
                    ingestion.error_message = error_message  # type: ignore
                else:
                    self.logger.debug(
                        f"Creating new ingestion history for {source_type}:{source}"
                    )
                    ingestion = IngestionHistory(
                        project_id=project_id,
                        source_type=source_type,
                        source=source,
                        last_successful_ingestion=now,
                        status=status,
                        document_count=document_count,
                        error_message=error_message,
                        created_at=now,
                        updated_at=now,
                    )
                    session.add(ingestion)

                self.logger.debug(f"Committing changes for {source_type}:{source}")
                await session.commit()
                self.logger.debug(
                    f"Successfully committed changes for {source_type}:{source}"
                )

                self.logger.debug(
                    "Ingestion history updated",
                    extra={
                        "project_id": project_id,
                        "source_type": ingestion.source_type,
                        "source": ingestion.source,
                        "status": ingestion.status,
                        "document_count": ingestion.document_count,
                    },
                )
        except Exception as e:
            self.logger.error(
                f"Error updating last ingestion for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise
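
    # Illustrative call with hypothetical values, recording a failed run
    # without advancing last_successful_ingestion (assumes IngestionStatus
    # also defines a FAILED member):
    #
    #     await manager.update_last_ingestion(
    #         source_type="confluence",
    #         source="SPACE-KEY",
    #         status=IngestionStatus.FAILED,
    #         error_message="authentication expired",
    #     )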


    async def get_last_ingestion(
        self, source_type: str, source: str, project_id: str | None = None
    ) -> IngestionHistory | None:
        """Get the last ingestion record for a source."""
        self.logger.debug(
            f"Getting last ingestion for {source_type}:{source} (project: {project_id})"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {source_type}:{source}"
                )
                self.logger.debug(
                    f"Executing query to find last ingestion for {source_type}:{source}"
                )

                # Build query with optional project filter
                query = select(IngestionHistory).filter(
                    IngestionHistory.source_type == source_type,
                    IngestionHistory.source == source,
                )
                if project_id is not None:
                    query = query.filter(IngestionHistory.project_id == project_id)

                query = query.order_by(
                    IngestionHistory.last_successful_ingestion.desc()
                )
                result = await session.execute(query)
                # first() returns the newest row under the DESC ordering and,
                # unlike scalar_one_or_none(), tolerates multiple matches.
                ingestion = result.scalars().first()
                self.logger.debug(
                    f"Query result: {'Found' if ingestion else 'Not found'} last ingestion for {source_type}:{source}"
                )
                return ingestion
        except Exception as e:
            self.logger.error(
                f"Error getting last ingestion for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise
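
    # Incremental-run sketch with a hypothetical source: use the stored
    # record to pick a cutoff for change detection:
    #
    #     last = await manager.get_last_ingestion("git", "my-repo")
    #     since = last.last_successful_ingestion if last else None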


    async def mark_document_deleted(
        self,
        source_type: str,
        source: str,
        document_id: str,
        project_id: str | None = None,
    ) -> None:
        """Mark a document as deleted."""
        self.logger.debug(
            f"Marking document as deleted: {source_type}:{source}:{document_id} (project: {project_id})"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {source_type}:{source}:{document_id}"
                )
                now = datetime.now(UTC)
                self.logger.debug(
                    "Searching for document to be deleted.",
                    extra={
                        "document_id": document_id,
                        "source_type": source_type,
                        "source": source,
                        "project_id": project_id,
                    },
                )
                self.logger.debug(
                    f"Executing query to find document {source_type}:{source}:{document_id}"
                )

                # Build query with optional project filter
                query = select(DocumentStateRecord).filter(
                    DocumentStateRecord.source_type == source_type,
                    DocumentStateRecord.source == source,
                    DocumentStateRecord.document_id == document_id,
                )
                if project_id is not None:
                    query = query.filter(DocumentStateRecord.project_id == project_id)

                result = await session.execute(query)
                state = result.scalar_one_or_none()
                self.logger.debug(
                    f"Query result: {'Found' if state else 'Not found'} document {source_type}:{source}:{document_id}"
                )

                if state:
                    self.logger.debug(
                        f"Updating document state for {source_type}:{source}:{document_id}"
                    )
                    state.is_deleted = True  # type: ignore
                    state.updated_at = now  # type: ignore
                    self.logger.debug(
                        f"Committing changes for {source_type}:{source}:{document_id}"
                    )
                    await session.commit()
                    self.logger.debug(
                        f"Successfully committed changes for {source_type}:{source}:{document_id}"
                    )
                    self.logger.debug(
                        "Document marked as deleted",
                        extra={
                            "document_id": document_id,
                            "source_type": source_type,
                            "source": source,
                            "project_id": project_id,
                        },
                    )
                else:
                    self.logger.warning(
                        f"Document not found: {source_type}:{source}:{document_id}"
                    )
        except Exception as e:
            self.logger.error(
                f"Error marking document as deleted {source_type}:{source}:{document_id}: {str(e)}",
                exc_info=True,
            )
            raise
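
    # Illustrative call with hypothetical identifiers, flagging a document
    # that disappeared from its source without deleting the row:
    #
    #     await manager.mark_document_deleted("git", "my-repo", "doc-123")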


    async def get_document_state_record(
        self,
        source_type: str,
        source: str,
        document_id: str,
        project_id: str | None = None,
    ) -> DocumentStateRecord | None:
        """Get the state of a document."""
        self.logger.debug(
            f"Getting document state for {source_type}:{source}:{document_id} (project: {project_id})"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {source_type}:{source}:{document_id}"
                )
                self.logger.debug(
                    f"Executing query to find document state for {source_type}:{source}:{document_id}"
                )

                # Build query with optional project filter
                query = select(DocumentStateRecord).filter(
                    DocumentStateRecord.source_type == source_type,
                    DocumentStateRecord.source == source,
                    DocumentStateRecord.document_id == document_id,
                )
                if project_id is not None:
                    query = query.filter(DocumentStateRecord.project_id == project_id)

                result = await session.execute(query)
                state = result.scalar_one_or_none()
                self.logger.debug(
                    f"Query result: {'Found' if state else 'Not found'} document state for {source_type}:{source}:{document_id}"
                )
                return state
        except Exception as e:
            self.logger.error(
                f"Error getting document state for {source_type}:{source}:{document_id}: {str(e)}",
                exc_info=True,
            )
            raise
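
    # Change-detection sketch (hypothetical `doc`): compare the stored
    # content hash against the freshly computed one to skip unchanged
    # documents:
    #
    #     state = await manager.get_document_state_record(
    #         doc.source_type, doc.source, doc.id
    #     )
    #     if state is None or state.content_hash != doc.content_hash:
    #         await manager.update_document_state(doc)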


    async def get_document_state_records(
        self, source_config: SourceConfig, since: datetime | None = None
    ) -> list[DocumentStateRecord]:
        """Get all document states for a source, optionally filtered by date."""
        self.logger.debug(
            f"Getting document state records for {source_config.source_type}:{source_config.source}"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {source_config.source_type}:{source_config.source}"
                )
                query = select(DocumentStateRecord).filter(
                    DocumentStateRecord.source_type == source_config.source_type,
                    DocumentStateRecord.source == source_config.source,
                )
                if since:
                    query = query.filter(DocumentStateRecord.updated_at >= since)
                self.logger.debug(
                    f"Executing query for {source_config.source_type}:{source_config.source}"
                )
                result = await session.execute(query)
                self.logger.debug(
                    f"Query executed, getting all records for {source_config.source_type}:{source_config.source}"
                )
                records = list(result.scalars().all())
                self.logger.debug(
                    f"Got {len(records)} records for {source_config.source_type}:{source_config.source}"
                )
                return records
        except Exception as e:
            self.logger.error(
                f"Error getting document state records for {source_config.source_type}:{source_config.source}: {str(e)}",
                exc_info=True,
            )
            raise
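
    # Illustrative filter, assuming a SourceConfig instance `cfg` and the
    # `since` cutoff from the sketch above:
    #
    #     records = await manager.get_document_state_records(cfg, since=since)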


    async def update_document_state(
        self, document: Document, project_id: str | None = None
    ) -> DocumentStateRecord:
        """Update the state of a document."""
        if not self._initialized:
            raise RuntimeError("StateManager not initialized. Call initialize() first.")

        self.logger.debug(
            f"Updating document state for {document.source_type}:{document.source}:{document.id} (project: {project_id})"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {document.source_type}:{document.source}:{document.id}"
                )
                self.logger.debug(
                    f"Executing query to find document state for {document.source_type}:{document.source}:{document.id}"
                )

                # Build query with optional project filter
                query = select(DocumentStateRecord).filter(
                    DocumentStateRecord.source_type == document.source_type,
                    DocumentStateRecord.source == document.source,
                    DocumentStateRecord.document_id == document.id,
                )
                if project_id is not None:
                    query = query.filter(DocumentStateRecord.project_id == project_id)

                result = await session.execute(query)
                document_state_record = result.scalar_one_or_none()
                self.logger.debug(
                    f"Query result: {'Found' if document_state_record else 'Not found'} document state for {document.source_type}:{document.source}:{document.id}"
                )

                now = datetime.now(UTC)

                # Extract file conversion metadata from document
                metadata = document.metadata
                conversion_method = metadata.get("conversion_method")
                is_converted = conversion_method is not None
                conversion_failed = metadata.get("conversion_failed", False)

                # Extract attachment metadata
                is_attachment = metadata.get("is_attachment", False)
                parent_document_id = metadata.get("parent_document_id")
                attachment_id = metadata.get("attachment_id")

                if document_state_record:
                    # Update existing record
                    self.logger.debug(
                        f"Updating existing document state for {document.source_type}:{document.source}:{document.id}"
                    )
                    document_state_record.title = document.title  # type: ignore
                    document_state_record.content_hash = document.content_hash  # type: ignore
                    document_state_record.is_deleted = False  # type: ignore
                    document_state_record.updated_at = now  # type: ignore

                    # Update file conversion metadata
                    document_state_record.is_converted = is_converted  # type: ignore
                    document_state_record.conversion_method = conversion_method  # type: ignore
                    document_state_record.original_file_type = metadata.get("original_file_type")  # type: ignore
                    document_state_record.original_filename = metadata.get("original_filename")  # type: ignore
                    document_state_record.file_size = metadata.get("file_size")  # type: ignore
                    document_state_record.conversion_failed = conversion_failed  # type: ignore
                    document_state_record.conversion_error = metadata.get("conversion_error")  # type: ignore
                    document_state_record.conversion_time = metadata.get("conversion_time")  # type: ignore

                    # Update attachment metadata
                    document_state_record.is_attachment = is_attachment  # type: ignore
                    document_state_record.parent_document_id = parent_document_id  # type: ignore
                    document_state_record.attachment_id = attachment_id  # type: ignore
                    document_state_record.attachment_filename = metadata.get("attachment_filename")  # type: ignore
                    document_state_record.attachment_mime_type = metadata.get("attachment_mime_type")  # type: ignore
                    document_state_record.attachment_download_url = metadata.get("attachment_download_url")  # type: ignore
                    document_state_record.attachment_author = metadata.get("attachment_author")  # type: ignore

                    # Handle attachment creation date
                    attachment_created_str = metadata.get("attachment_created_at")
                    if attachment_created_str:
                        try:
                            if isinstance(attachment_created_str, str):
                                document_state_record.attachment_created_at = datetime.fromisoformat(attachment_created_str.replace("Z", "+00:00"))  # type: ignore
                            elif isinstance(attachment_created_str, datetime):
                                document_state_record.attachment_created_at = attachment_created_str  # type: ignore
                        except (ValueError, TypeError) as e:
                            self.logger.warning(
                                f"Failed to parse attachment_created_at: {e}"
                            )
                            document_state_record.attachment_created_at = None  # type: ignore
                else:
                    # Create new record
                    self.logger.debug(
                        f"Creating new document state for {document.source_type}:{document.source}:{document.id}"
                    )

                    # Handle attachment creation date for new records
                    attachment_created_at = None
                    attachment_created_str = metadata.get("attachment_created_at")
                    if attachment_created_str:
                        try:
                            if isinstance(attachment_created_str, str):
                                attachment_created_at = datetime.fromisoformat(
                                    attachment_created_str.replace("Z", "+00:00")
                                )
                            elif isinstance(attachment_created_str, datetime):
                                attachment_created_at = attachment_created_str
                        except (ValueError, TypeError) as e:
                            self.logger.warning(
                                f"Failed to parse attachment_created_at: {e}"
                            )

                    document_state_record = DocumentStateRecord(
                        project_id=project_id,
                        document_id=document.id,
                        source_type=document.source_type,
                        source=document.source,
                        url=document.url,
                        title=document.title,
                        content_hash=document.content_hash,
                        is_deleted=False,
                        created_at=now,
                        updated_at=now,
                        # File conversion metadata
                        is_converted=is_converted,
                        conversion_method=conversion_method,
                        original_file_type=metadata.get("original_file_type"),
                        original_filename=metadata.get("original_filename"),
                        file_size=metadata.get("file_size"),
                        conversion_failed=conversion_failed,
                        conversion_error=metadata.get("conversion_error"),
                        conversion_time=metadata.get("conversion_time"),
                        # Attachment metadata
                        is_attachment=is_attachment,
                        parent_document_id=parent_document_id,
                        attachment_id=attachment_id,
                        attachment_filename=metadata.get("attachment_filename"),
                        attachment_mime_type=metadata.get("attachment_mime_type"),
                        attachment_download_url=metadata.get("attachment_download_url"),
                        attachment_author=metadata.get("attachment_author"),
                        attachment_created_at=attachment_created_at,
                    )
                    session.add(document_state_record)

                self.logger.debug(
                    f"Committing changes for {document.source_type}:{document.source}:{document.id}"
                )
                await session.commit()
                self.logger.debug(
                    f"Successfully committed changes for {document.source_type}:{document.source}:{document.id}"
                )

                self.logger.debug(
                    "Document state updated",
                    extra={
                        "project_id": project_id,
                        "document_id": document_state_record.document_id,
                        "content_hash": document_state_record.content_hash,
                        "updated_at": document_state_record.updated_at,
                        "is_converted": document_state_record.is_converted,
                        "is_attachment": document_state_record.is_attachment,
                        "conversion_method": document_state_record.conversion_method,
                    },
                )
                return document_state_record
        except Exception as e:
            self.logger.error(
                "Failed to update document state",
                extra={
                    "project_id": project_id,
                    "document_id": document.id,
                    "error": str(e),
                    "error_type": type(e).__name__,
                },
            )
            raise
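
    # Metadata contract sketch: the keys read from document.metadata above,
    # shown with invented example values (only the key names are grounded in
    # this module):
    #
    #     document.metadata = {
    #         "conversion_method": "markitdown",
    #         "conversion_failed": False,
    #         "original_file_type": "pdf",
    #         "original_filename": "report.pdf",
    #         "file_size": 48213,
    #         "conversion_time": 1.7,
    #         "is_attachment": True,
    #         "parent_document_id": "doc-123",
    #         "attachment_id": "att-1",
    #         "attachment_created_at": "2025-01-01T00:00:00Z",  # str or datetime
    #     }
    #     record = await manager.update_document_state(document, project_id="proj-1")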


    async def update_conversion_metrics(
        self,
        source_type: str,
        source: str,
        converted_files_count: int = 0,
        conversion_failures_count: int = 0,
        attachments_processed_count: int = 0,
        total_conversion_time: float = 0.0,
    ) -> None:
        """Update file conversion metrics for a source."""
        self.logger.debug(f"Updating conversion metrics for {source_type}:{source}")
        try:
            async with self._session_factory() as session:  # type: ignore
                result = await session.execute(
                    select(IngestionHistory).filter_by(
                        source_type=source_type, source=source
                    )
                )
                ingestion = result.scalar_one_or_none()

                if ingestion:
                    # Update existing metrics
                    ingestion.converted_files_count = (ingestion.converted_files_count or 0) + converted_files_count  # type: ignore
                    ingestion.conversion_failures_count = (ingestion.conversion_failures_count or 0) + conversion_failures_count  # type: ignore
                    ingestion.attachments_processed_count = (ingestion.attachments_processed_count or 0) + attachments_processed_count  # type: ignore
                    ingestion.total_conversion_time = (ingestion.total_conversion_time or 0.0) + total_conversion_time  # type: ignore
                    ingestion.updated_at = datetime.now(UTC)  # type: ignore
                else:
                    # Create new record with conversion metrics
                    now = datetime.now(UTC)
                    ingestion = IngestionHistory(
                        source_type=source_type,
                        source=source,
                        last_successful_ingestion=now,
                        status=IngestionStatus.SUCCESS,  # use the enum, not a bare "SUCCESS" string
                        document_count=0,
                        converted_files_count=converted_files_count,
                        conversion_failures_count=conversion_failures_count,
                        attachments_processed_count=attachments_processed_count,
                        total_conversion_time=total_conversion_time,
                        created_at=now,
                        updated_at=now,
                    )
                    session.add(ingestion)

                await session.commit()
                self.logger.debug(
                    "Conversion metrics updated",
                    extra={
                        "source_type": source_type,
                        "source": source,
                        "converted_files": ingestion.converted_files_count,
                        "conversion_failures": ingestion.conversion_failures_count,
                        "attachments_processed": ingestion.attachments_processed_count,
                        "total_conversion_time": ingestion.total_conversion_time,
                    },
                )
        except Exception as e:
            self.logger.error(
                f"Error updating conversion metrics for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise
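
    # Accumulation sketch with invented counts: the counters add onto any
    # existing totals rather than overwriting them:
    #
    #     await manager.update_conversion_metrics(
    #         "confluence",
    #         "SPACE-KEY",
    #         converted_files_count=3,
    #         conversion_failures_count=1,
    #         total_conversion_time=12.5,
    #     )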


    async def get_conversion_metrics(
        self, source_type: str, source: str
    ) -> dict[str, int | float]:
        """Get file conversion metrics for a source."""
        self.logger.debug(f"Getting conversion metrics for {source_type}:{source}")
        try:
            async with self._session_factory() as session:  # type: ignore
                result = await session.execute(
                    select(IngestionHistory).filter_by(
                        source_type=source_type, source=source
                    )
                )
                ingestion = result.scalar_one_or_none()

                if ingestion:
                    # Access the actual values from the SQLAlchemy model instance
                    converted_files: int | None = ingestion.converted_files_count  # type: ignore
                    conversion_failures: int | None = ingestion.conversion_failures_count  # type: ignore
                    attachments_processed: int | None = ingestion.attachments_processed_count  # type: ignore
                    total_time: float | None = ingestion.total_conversion_time  # type: ignore

                    return {
                        "converted_files_count": converted_files if converted_files is not None else 0,
                        "conversion_failures_count": conversion_failures if conversion_failures is not None else 0,
                        "attachments_processed_count": attachments_processed if attachments_processed is not None else 0,
                        "total_conversion_time": total_time if total_time is not None else 0.0,
                    }
                else:
                    return {
                        "converted_files_count": 0,
                        "conversion_failures_count": 0,
                        "attachments_processed_count": 0,
                        "total_conversion_time": 0.0,
                    }
        except Exception as e:
            self.logger.error(
                f"Error getting conversion metrics for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise
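
    # Read-back sketch, continuing the example above:
    #
    #     metrics = await manager.get_conversion_metrics("confluence", "SPACE-KEY")
    #     total = metrics["converted_files_count"]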


    async def get_attachment_documents(
        self, parent_document_id: str
    ) -> list[DocumentStateRecord]:
        """Get all attachment documents for a parent document."""
        self.logger.debug(
            f"Getting attachment documents for parent {parent_document_id}"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                result = await session.execute(
                    select(DocumentStateRecord).filter(
                        DocumentStateRecord.parent_document_id == parent_document_id,
                        DocumentStateRecord.is_attachment == True,
                        DocumentStateRecord.is_deleted == False,
                    )
                )
                attachments = list(result.scalars().all())
                self.logger.debug(
                    f"Found {len(attachments)} attachments for parent {parent_document_id}"
                )
                return attachments
        except Exception as e:
            self.logger.error(
                f"Error getting attachment documents for {parent_document_id}: {str(e)}",
                exc_info=True,
            )
            raise


    async def get_converted_documents(
        self, source_type: str, source: str, conversion_method: str | None = None
    ) -> list[DocumentStateRecord]:
        """Get all converted documents for a source, optionally filtered by conversion method."""
        self.logger.debug(f"Getting converted documents for {source_type}:{source}")
        try:
            async with self._session_factory() as session:  # type: ignore
                query = select(DocumentStateRecord).filter(
                    DocumentStateRecord.source_type == source_type,
                    DocumentStateRecord.source == source,
                    DocumentStateRecord.is_converted == True,
                    DocumentStateRecord.is_deleted == False,
                )
                if conversion_method:
                    query = query.filter(
                        DocumentStateRecord.conversion_method == conversion_method
                    )

                result = await session.execute(query)
                documents = list(result.scalars().all())
                self.logger.debug(
                    f"Found {len(documents)} converted documents for {source_type}:{source}"
                )
                return documents
        except Exception as e:
            self.logger.error(
                f"Error getting converted documents for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise
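
    # Illustrative queries with hypothetical identifiers:
    #
    #     attachments = await manager.get_attachment_documents("doc-123")
    #     converted = await manager.get_converted_documents(
    #         "localfile", "docs", conversion_method="markitdown"
    #     )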


    async def close(self):
        """Close all database connections."""
        # Delegate to dispose() so that _engine, _session_factory, and
        # _initialized are reset together instead of leaving a disposed
        # engine referenced as live state.
        if getattr(self, "_engine", None) is not None:
            self.logger.debug("Closing database connections")
            await self.dispose()
            self.logger.debug("Database connections closed")