Coverage for src/qdrant_loader/core/state/state_manager.py: 79% (313 statements)
1"""
2State management service for tracking document ingestion state.
3"""
5import os
6import sqlite3
7from datetime import UTC, datetime
9from sqlalchemy import select
10from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
12from qdrant_loader.config.source_config import SourceConfig
13from qdrant_loader.config.state import IngestionStatus, StateManagementConfig
14from qdrant_loader.core.document import Document
15from qdrant_loader.core.state.exceptions import DatabaseError
16from qdrant_loader.core.state.models import Base, DocumentStateRecord, IngestionHistory
17from qdrant_loader.utils.logging import LoggingConfig
19logger = LoggingConfig.get_logger(__name__)


class StateManager:
    """Manages state for document ingestion."""

    def __init__(self, config: StateManagementConfig):
        """Initialize the state manager with configuration."""
        self.config = config
        self._initialized = False
        self._engine = None
        self._session_factory = None
        self.logger = LoggingConfig.get_logger(__name__)

    async def __aenter__(self):
        """Async context manager entry."""
        if not self._initialized:
            raise ValueError("StateManager not initialized. Call initialize() first.")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.dispose()
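
    # Typical lifecycle, as the guard in __aenter__ implies (a minimal sketch;
    # `state_config` is a placeholder for however the application builds its
    # StateManagementConfig):
    #
    #     manager = StateManager(state_config)
    #     await manager.initialize()
    #     async with manager as state:
    #         record = await state.get_last_ingestion("git", "my-repo")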

    async def initialize(self):
        """Initialize the database schema and connection."""
        if self._initialized:
            return

        db_url = self.config.database_path
        if not db_url.startswith("sqlite:///"):
            db_url = f"sqlite:///{db_url}"

        # Extract the actual file path from the URL
        db_file = db_url.replace("sqlite:///", "")

        # Skip file permission check for in-memory database
        if db_file != ":memory:":
            # Check if the database file exists and is writable
            if os.path.exists(db_file) and not os.access(db_file, os.W_OK):
                raise DatabaseError(
                    f"Database file '{db_file}' exists but is not writable. "
                    "Please check file permissions."
                )
            # If file doesn't exist, check if directory is writable
            elif not os.path.exists(db_file):
                db_dir = os.path.dirname(db_file) or "."
                if not os.access(db_dir, os.W_OK):
                    raise DatabaseError(
                        f"Cannot create database file in '{db_dir}'. "
                        "Directory is not writable. Please check directory permissions."
                    )

        # Create the async engine; pooling options only apply to file-backed databases
        engine_args = {}
        if db_url != "sqlite:///:memory:":
            engine_args.update(
                {
                    "pool_size": self.config.connection_pool["size"],
                    "pool_timeout": self.config.connection_pool["timeout"],
                    "pool_recycle": 3600,  # Recycle connections after 1 hour
                    "pool_pre_ping": True,  # Enable connection health checks
                }
            )
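            # These settings assume `config.connection_pool` is a mapping with
            # integer "size" and "timeout" entries, e.g. (illustrative values):
            #     connection_pool = {"size": 5, "timeout": 30}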

        try:
            self.logger.debug(f"Creating async engine for database: {db_file}")
            self._engine = create_async_engine(
                f"sqlite+aiosqlite:///{db_file}", **engine_args
            )

            # Create async session factory
            self.logger.debug("Creating async session factory")
            self._session_factory = async_sessionmaker(
                bind=self._engine,
                expire_on_commit=False,  # Prevent expired objects after commit
                autoflush=False,  # Disable autoflush for better control
            )

            # Initialize schema
            self.logger.debug("Initializing database schema")
            async with self._engine.begin() as conn:
                await conn.run_sync(Base.metadata.create_all)

            self._initialized = True
            self.logger.debug("StateManager initialized successfully")
        except sqlite3.OperationalError as e:
            # Handle specific SQLite errors
            if "readonly database" in str(e).lower():
                raise DatabaseError(
                    f"Cannot write to database '{db_file}'. Database is read-only."
                ) from e
            raise DatabaseError(f"Failed to initialize database: {e}") from e
        except Exception as e:
            raise DatabaseError(f"Unexpected error initializing database: {e}") from e

    async def dispose(self):
        """Clean up resources."""
        if self._engine:
            self.logger.debug("Disposing database engine")
            await self._engine.dispose()
            self._engine = None
            self._session_factory = None
            self._initialized = False
            self.logger.debug("StateManager resources disposed")

    async def update_last_ingestion(
        self,
        source_type: str,
        source: str,
        status: str = IngestionStatus.SUCCESS,
        error_message: str | None = None,
        document_count: int = 0,
        project_id: str | None = None,
    ) -> None:
        """Update the last ingestion record for a source."""
        self.logger.debug(
            f"Updating last ingestion for {source_type}:{source} (project: {project_id})"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {source_type}:{source}"
                )
                now = datetime.now(UTC)
                self.logger.debug(
                    f"Executing query to find ingestion history for {source_type}:{source}"
                )

                # Build query with optional project filter
                query = select(IngestionHistory).filter_by(
                    source_type=source_type, source=source
                )
                if project_id is not None:
                    query = query.filter_by(project_id=project_id)

                result = await session.execute(query)
                ingestion = result.scalar_one_or_none()
                self.logger.debug(
                    f"Query result: {'Found' if ingestion else 'Not found'} ingestion history for {source_type}:{source}"
                )

                if ingestion:
                    self.logger.debug(
                        f"Updating existing ingestion history for {source_type}:{source}"
                    )
                    if status == IngestionStatus.SUCCESS:
                        ingestion.last_successful_ingestion = now  # type: ignore
                    ingestion.status = status  # type: ignore
                    if document_count:
                        ingestion.document_count = document_count  # type: ignore
                    ingestion.updated_at = now  # type: ignore
                    ingestion.error_message = error_message  # type: ignore
                else:
                    self.logger.debug(
                        f"Creating new ingestion history for {source_type}:{source}"
                    )
                    ingestion = IngestionHistory(
                        project_id=project_id,
                        source_type=source_type,
                        source=source,
                        last_successful_ingestion=now,
                        status=status,
                        document_count=document_count,
                        error_message=error_message,
                        created_at=now,
                        updated_at=now,
                    )
                    session.add(ingestion)

                self.logger.debug(f"Committing changes for {source_type}:{source}")
                await session.commit()
                self.logger.debug(
                    f"Successfully committed changes for {source_type}:{source}"
                )

                self.logger.debug(
                    "Ingestion history updated",
                    extra={
                        "project_id": project_id,
                        "source_type": ingestion.source_type,
                        "source": ingestion.source,
                        "status": ingestion.status,
                        "document_count": ingestion.document_count,
                    },
                )
        except Exception as e:
            self.logger.error(
                f"Error updating last ingestion for {source_type}:{source}: {e}",
                exc_info=True,
            )
            raise

    async def get_last_ingestion(
        self, source_type: str, source: str, project_id: str | None = None
    ) -> IngestionHistory | None:
        """Get the most recent ingestion record for a source."""
        self.logger.debug(
            f"Getting last ingestion for {source_type}:{source} (project: {project_id})"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {source_type}:{source}"
                )
                self.logger.debug(
                    f"Executing query to find last ingestion for {source_type}:{source}"
                )

                # Build query with optional project filter
                query = select(IngestionHistory).filter(
                    IngestionHistory.source_type == source_type,
                    IngestionHistory.source == source,
                )
                if project_id is not None:
                    query = query.filter(IngestionHistory.project_id == project_id)

                query = query.order_by(
                    IngestionHistory.last_successful_ingestion.desc()
                )
                result = await session.execute(query)
                # Use first() rather than scalar_one_or_none(): several rows can
                # match when project_id is not given, and we want the newest one.
                ingestion = result.scalars().first()
                self.logger.debug(
                    f"Query result: {'Found' if ingestion else 'Not found'} last ingestion for {source_type}:{source}"
                )
                return ingestion
        except Exception as e:
            self.logger.error(
                f"Error getting last ingestion for {source_type}:{source}: {e}",
                exc_info=True,
            )
            raise

    async def mark_document_deleted(
        self,
        source_type: str,
        source: str,
        document_id: str,
        project_id: str | None = None,
    ) -> None:
        """Mark a document as deleted."""
        self.logger.debug(
            f"Marking document as deleted: {source_type}:{source}:{document_id} (project: {project_id})"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {source_type}:{source}:{document_id}"
                )
                now = datetime.now(UTC)
                self.logger.debug(
                    "Searching for document to be deleted.",
                    extra={
                        "document_id": document_id,
                        "source_type": source_type,
                        "source": source,
                        "project_id": project_id,
                    },
                )
                self.logger.debug(
                    f"Executing query to find document {source_type}:{source}:{document_id}"
                )

                # Build query with optional project filter
                query = select(DocumentStateRecord).filter(
                    DocumentStateRecord.source_type == source_type,
                    DocumentStateRecord.source == source,
                    DocumentStateRecord.document_id == document_id,
                )
                if project_id is not None:
                    query = query.filter(DocumentStateRecord.project_id == project_id)

                result = await session.execute(query)
                state = result.scalar_one_or_none()
                self.logger.debug(
                    f"Query result: {'Found' if state else 'Not found'} document {source_type}:{source}:{document_id}"
                )

                if state:
                    self.logger.debug(
                        f"Updating document state for {source_type}:{source}:{document_id}"
                    )
                    state.is_deleted = True  # type: ignore
                    state.updated_at = now  # type: ignore
                    self.logger.debug(
                        f"Committing changes for {source_type}:{source}:{document_id}"
                    )
                    await session.commit()
                    self.logger.debug(
                        f"Successfully committed changes for {source_type}:{source}:{document_id}"
                    )
                    self.logger.debug(
                        "Document marked as deleted",
                        extra={
                            "document_id": document_id,
                            "source_type": source_type,
                            "source": source,
                            "project_id": project_id,
                        },
                    )
                else:
                    self.logger.warning(
                        f"Document not found: {source_type}:{source}:{document_id}"
                    )
        except Exception as e:
            self.logger.error(
                f"Error marking document as deleted {source_type}:{source}:{document_id}: {e}",
                exc_info=True,
            )
            raise

    async def get_document_state_record(
        self,
        source_type: str,
        source: str,
        document_id: str,
        project_id: str | None = None,
    ) -> DocumentStateRecord | None:
        """Get the state of a document."""
        self.logger.debug(
            f"Getting document state for {source_type}:{source}:{document_id} (project: {project_id})"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {source_type}:{source}:{document_id}"
                )
                self.logger.debug(
                    f"Executing query to find document state for {source_type}:{source}:{document_id}"
                )

                # Build query with optional project filter
                query = select(DocumentStateRecord).filter(
                    DocumentStateRecord.source_type == source_type,
                    DocumentStateRecord.source == source,
                    DocumentStateRecord.document_id == document_id,
                )
                if project_id is not None:
                    query = query.filter(DocumentStateRecord.project_id == project_id)

                result = await session.execute(query)
                state = result.scalar_one_or_none()
                self.logger.debug(
                    f"Query result: {'Found' if state else 'Not found'} document state for {source_type}:{source}:{document_id}"
                )
                return state
        except Exception as e:
            self.logger.error(
                f"Error getting document state for {source_type}:{source}:{document_id}: {e}",
                exc_info=True,
            )
            raise

    async def get_document_state_records(
        self, source_config: SourceConfig, since: datetime | None = None
    ) -> list[DocumentStateRecord]:
        """Get all document states for a source, optionally filtered by date."""
        self.logger.debug(
            f"Getting document state records for {source_config.source_type}:{source_config.source}"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {source_config.source_type}:{source_config.source}"
                )
                query = select(DocumentStateRecord).filter(
                    DocumentStateRecord.source_type == source_config.source_type,
                    DocumentStateRecord.source == source_config.source,
                )
                if since:
                    query = query.filter(DocumentStateRecord.updated_at >= since)
                self.logger.debug(
                    f"Executing query for {source_config.source_type}:{source_config.source}"
                )
                result = await session.execute(query)
                self.logger.debug(
                    f"Query executed, getting all records for {source_config.source_type}:{source_config.source}"
                )
                records = list(result.scalars().all())
                self.logger.debug(
                    f"Got {len(records)} records for {source_config.source_type}:{source_config.source}"
                )
                return records
        except Exception as e:
            self.logger.error(
                f"Error getting document state records for {source_config.source_type}:{source_config.source}: {e}",
                exc_info=True,
            )
            raise

    async def update_document_state(
        self, document: Document, project_id: str | None = None
    ) -> DocumentStateRecord:
        """Update the state of a document."""
        if not self._initialized:
            raise RuntimeError("StateManager not initialized. Call initialize() first.")

        self.logger.debug(
            f"Updating document state for {document.source_type}:{document.source}:{document.id} (project: {project_id})"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {document.source_type}:{document.source}:{document.id}"
                )
                self.logger.debug(
                    f"Executing query to find document state for {document.source_type}:{document.source}:{document.id}"
                )

                # Build query with optional project filter
                query = select(DocumentStateRecord).filter(
                    DocumentStateRecord.source_type == document.source_type,
                    DocumentStateRecord.source == document.source,
                    DocumentStateRecord.document_id == document.id,
                )
                if project_id is not None:
                    query = query.filter(DocumentStateRecord.project_id == project_id)

                result = await session.execute(query)
                document_state_record = result.scalar_one_or_none()
                self.logger.debug(
                    f"Query result: {'Found' if document_state_record else 'Not found'} document state for {document.source_type}:{document.source}:{document.id}"
                )

                now = datetime.now(UTC)

                # Extract file conversion metadata from the document
                metadata = document.metadata
                conversion_method = metadata.get("conversion_method")
                is_converted = conversion_method is not None
                conversion_failed = metadata.get("conversion_failed", False)

                # Extract attachment metadata
                is_attachment = metadata.get("is_attachment", False)
                parent_document_id = metadata.get("parent_document_id")
                attachment_id = metadata.get("attachment_id")
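                # All of these metadata keys are optional: connectors that do no
                # file conversion or attachment handling simply omit them, and
                # the state record falls back to None/False defaults.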

                if document_state_record:
                    # Update the existing record
                    self.logger.debug(
                        f"Updating existing document state for {document.source_type}:{document.source}:{document.id}"
                    )
                    document_state_record.title = document.title  # type: ignore
                    document_state_record.content_hash = document.content_hash  # type: ignore
                    document_state_record.is_deleted = False  # type: ignore
                    document_state_record.updated_at = now  # type: ignore

                    # Update file conversion metadata
                    document_state_record.is_converted = is_converted  # type: ignore
                    document_state_record.conversion_method = conversion_method  # type: ignore
                    document_state_record.original_file_type = metadata.get("original_file_type")  # type: ignore
                    document_state_record.original_filename = metadata.get("original_filename")  # type: ignore
                    document_state_record.file_size = metadata.get("file_size")  # type: ignore
                    document_state_record.conversion_failed = conversion_failed  # type: ignore
                    document_state_record.conversion_error = metadata.get("conversion_error")  # type: ignore
                    document_state_record.conversion_time = metadata.get("conversion_time")  # type: ignore

                    # Update attachment metadata
                    document_state_record.is_attachment = is_attachment  # type: ignore
                    document_state_record.parent_document_id = parent_document_id  # type: ignore
                    document_state_record.attachment_id = attachment_id  # type: ignore
                    document_state_record.attachment_filename = metadata.get("attachment_filename")  # type: ignore
                    document_state_record.attachment_mime_type = metadata.get("attachment_mime_type")  # type: ignore
                    document_state_record.attachment_download_url = metadata.get("attachment_download_url")  # type: ignore
                    document_state_record.attachment_author = metadata.get("attachment_author")  # type: ignore

                    # Handle the attachment creation date (may arrive as an ISO
                    # string or as a datetime)
                    attachment_created_raw = metadata.get("attachment_created_at")
                    if attachment_created_raw:
                        try:
                            if isinstance(attachment_created_raw, str):
                                document_state_record.attachment_created_at = datetime.fromisoformat(attachment_created_raw.replace("Z", "+00:00"))  # type: ignore
                            elif isinstance(attachment_created_raw, datetime):
                                document_state_record.attachment_created_at = attachment_created_raw  # type: ignore
                        except (ValueError, TypeError) as e:
                            self.logger.warning(
                                f"Failed to parse attachment_created_at: {e}"
                            )
                            document_state_record.attachment_created_at = None  # type: ignore
                else:
                    # Create a new record
                    self.logger.debug(
                        f"Creating new document state for {document.source_type}:{document.source}:{document.id}"
                    )

                    # Handle the attachment creation date for new records
                    attachment_created_at = None
                    attachment_created_raw = metadata.get("attachment_created_at")
                    if attachment_created_raw:
                        try:
                            if isinstance(attachment_created_raw, str):
                                attachment_created_at = datetime.fromisoformat(
                                    attachment_created_raw.replace("Z", "+00:00")
                                )
                            elif isinstance(attachment_created_raw, datetime):
                                attachment_created_at = attachment_created_raw
                        except (ValueError, TypeError) as e:
                            self.logger.warning(
                                f"Failed to parse attachment_created_at: {e}"
                            )

                    document_state_record = DocumentStateRecord(
                        project_id=project_id,
                        document_id=document.id,
                        source_type=document.source_type,
                        source=document.source,
                        url=document.url,
                        title=document.title,
                        content_hash=document.content_hash,
                        is_deleted=False,
                        created_at=now,
                        updated_at=now,
                        # File conversion metadata
                        is_converted=is_converted,
                        conversion_method=conversion_method,
                        original_file_type=metadata.get("original_file_type"),
                        original_filename=metadata.get("original_filename"),
                        file_size=metadata.get("file_size"),
                        conversion_failed=conversion_failed,
                        conversion_error=metadata.get("conversion_error"),
                        conversion_time=metadata.get("conversion_time"),
                        # Attachment metadata
                        is_attachment=is_attachment,
                        parent_document_id=parent_document_id,
                        attachment_id=attachment_id,
                        attachment_filename=metadata.get("attachment_filename"),
                        attachment_mime_type=metadata.get("attachment_mime_type"),
                        attachment_download_url=metadata.get("attachment_download_url"),
                        attachment_author=metadata.get("attachment_author"),
                        attachment_created_at=attachment_created_at,
                    )
                    session.add(document_state_record)

                self.logger.debug(
                    f"Committing changes for {document.source_type}:{document.source}:{document.id}"
                )
                await session.commit()
                self.logger.debug(
                    f"Successfully committed changes for {document.source_type}:{document.source}:{document.id}"
                )

                self.logger.debug(
                    "Document state updated",
                    extra={
                        "project_id": project_id,
                        "document_id": document_state_record.document_id,
                        "content_hash": document_state_record.content_hash,
                        "updated_at": document_state_record.updated_at,
                        "is_converted": document_state_record.is_converted,
                        "is_attachment": document_state_record.is_attachment,
                        "conversion_method": document_state_record.conversion_method,
                    },
                )
                return document_state_record
        except Exception as e:
            self.logger.error(
                "Failed to update document state",
                extra={
                    "project_id": project_id,
                    "document_id": document.id,
                    "error": str(e),
                    "error_type": type(e).__name__,
                },
            )
            raise

    async def update_conversion_metrics(
        self,
        source_type: str,
        source: str,
        converted_files_count: int = 0,
        conversion_failures_count: int = 0,
        attachments_processed_count: int = 0,
        total_conversion_time: float = 0.0,
    ) -> None:
        """Update file conversion metrics for a source."""
        self.logger.debug(f"Updating conversion metrics for {source_type}:{source}")
        try:
            async with self._session_factory() as session:  # type: ignore
                result = await session.execute(
                    select(IngestionHistory).filter_by(
                        source_type=source_type, source=source
                    )
                )
                ingestion = result.scalar_one_or_none()

                if ingestion:
                    # Add the new counts to the existing metrics
                    ingestion.converted_files_count = (ingestion.converted_files_count or 0) + converted_files_count  # type: ignore
                    ingestion.conversion_failures_count = (ingestion.conversion_failures_count or 0) + conversion_failures_count  # type: ignore
                    ingestion.attachments_processed_count = (ingestion.attachments_processed_count or 0) + attachments_processed_count  # type: ignore
                    ingestion.total_conversion_time = (ingestion.total_conversion_time or 0.0) + total_conversion_time  # type: ignore
                    ingestion.updated_at = datetime.now(UTC)  # type: ignore
                else:
                    # Create a new record with the conversion metrics
                    now = datetime.now(UTC)
                    ingestion = IngestionHistory(
                        source_type=source_type,
                        source=source,
                        last_successful_ingestion=now,
                        status=IngestionStatus.SUCCESS,
                        document_count=0,
                        converted_files_count=converted_files_count,
                        conversion_failures_count=conversion_failures_count,
                        attachments_processed_count=attachments_processed_count,
                        total_conversion_time=total_conversion_time,
                        created_at=now,
                        updated_at=now,
                    )
                    session.add(ingestion)

                await session.commit()
                self.logger.debug(
                    "Conversion metrics updated",
                    extra={
                        "source_type": source_type,
                        "source": source,
                        "converted_files": ingestion.converted_files_count,
                        "conversion_failures": ingestion.conversion_failures_count,
                        "attachments_processed": ingestion.attachments_processed_count,
                        "total_conversion_time": ingestion.total_conversion_time,
                    },
                )
        except Exception as e:
            self.logger.error(
                f"Error updating conversion metrics for {source_type}:{source}: {e}",
                exc_info=True,
            )
            raise
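
    # Example call with illustrative values: a batch that converted three
    # files, hit one failure, processed two attachments, and spent 4.2s total:
    #
    #     await manager.update_conversion_metrics(
    #         "confluence", "SPACE",
    #         converted_files_count=3,
    #         conversion_failures_count=1,
    #         attachments_processed_count=2,
    #         total_conversion_time=4.2,
    #     )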

    async def get_conversion_metrics(
        self, source_type: str, source: str
    ) -> dict[str, int | float]:
        """Get file conversion metrics for a source."""
        self.logger.debug(f"Getting conversion metrics for {source_type}:{source}")
        try:
            async with self._session_factory() as session:  # type: ignore
                result = await session.execute(
                    select(IngestionHistory).filter_by(
                        source_type=source_type, source=source
                    )
                )
                ingestion = result.scalar_one_or_none()

                if not ingestion:
                    return {
                        "converted_files_count": 0,
                        "conversion_failures_count": 0,
                        "attachments_processed_count": 0,
                        "total_conversion_time": 0.0,
                    }

                # Read the values from the SQLAlchemy model instance,
                # defaulting unset columns to zero
                return {
                    "converted_files_count": ingestion.converted_files_count or 0,  # type: ignore
                    "conversion_failures_count": ingestion.conversion_failures_count or 0,  # type: ignore
                    "attachments_processed_count": ingestion.attachments_processed_count or 0,  # type: ignore
                    "total_conversion_time": ingestion.total_conversion_time or 0.0,  # type: ignore
                }
        except Exception as e:
            self.logger.error(
                f"Error getting conversion metrics for {source_type}:{source}: {e}",
                exc_info=True,
            )
            raise

    async def get_attachment_documents(
        self, parent_document_id: str
    ) -> list[DocumentStateRecord]:
        """Get all attachment documents for a parent document."""
        self.logger.debug(
            f"Getting attachment documents for parent {parent_document_id}"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                result = await session.execute(
                    select(DocumentStateRecord).filter(
                        DocumentStateRecord.parent_document_id == parent_document_id,
                        DocumentStateRecord.is_attachment.is_(True),
                        DocumentStateRecord.is_deleted.is_(False),
                    )
                )
                attachments = list(result.scalars().all())
                self.logger.debug(
                    f"Found {len(attachments)} attachments for parent {parent_document_id}"
                )
                return attachments
        except Exception as e:
            self.logger.error(
                f"Error getting attachment documents for {parent_document_id}: {e}",
                exc_info=True,
            )
            raise

    async def get_converted_documents(
        self, source_type: str, source: str, conversion_method: str | None = None
    ) -> list[DocumentStateRecord]:
        """Get all converted documents for a source, optionally filtered by conversion method."""
        self.logger.debug(f"Getting converted documents for {source_type}:{source}")
        try:
            async with self._session_factory() as session:  # type: ignore
                query = select(DocumentStateRecord).filter(
                    DocumentStateRecord.source_type == source_type,
                    DocumentStateRecord.source == source,
                    DocumentStateRecord.is_converted.is_(True),
                    DocumentStateRecord.is_deleted.is_(False),
                )
                if conversion_method:
                    query = query.filter(
                        DocumentStateRecord.conversion_method == conversion_method
                    )

                result = await session.execute(query)
                documents = list(result.scalars().all())
                self.logger.debug(
                    f"Found {len(documents)} converted documents for {source_type}:{source}"
                )
                return documents
        except Exception as e:
            self.logger.error(
                f"Error getting converted documents for {source_type}:{source}: {e}",
                exc_info=True,
            )
            raise

    async def close(self):
        """Close all database connections."""
        if hasattr(self, "_engine") and self._engine is not None:
            self.logger.debug("Closing database connections")
            await self._engine.dispose()
            self.logger.debug("Database connections closed")
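

# End-to-end usage, as a minimal sketch (`build_state_config()` is a
# hypothetical helper standing in for however the application constructs its
# StateManagementConfig):
#
#     import asyncio
#
#     async def main() -> None:
#         manager = StateManager(build_state_config())
#         await manager.initialize()
#         try:
#             await manager.update_last_ingestion("git", "my-repo", document_count=10)
#             last = await manager.get_last_ingestion("git", "my-repo")
#             metrics = await manager.get_conversion_metrics("git", "my-repo")
#         finally:
#             await manager.dispose()
#
#     asyncio.run(main())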