Coverage for src/qdrant_loader/core/state/state_manager.py: 78%
345 statements
coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
1"""
2State management service for tracking document ingestion state.
3"""
5import os
6from datetime import UTC, datetime
7from pathlib import Path
9from sqlalchemy import select
10from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
11from sqlalchemy.pool import StaticPool
13from qdrant_loader.config.source_config import SourceConfig
14from qdrant_loader.config.state import IngestionStatus, StateManagementConfig
15from qdrant_loader.core.document import Document
16from qdrant_loader.core.state.exceptions import DatabaseError
17from qdrant_loader.core.state.models import Base, DocumentStateRecord, IngestionHistory
18from qdrant_loader.utils.logging import LoggingConfig
20logger = LoggingConfig.get_logger(__name__)


class StateManager:
    """Manages state for document ingestion."""

    def __init__(self, config: StateManagementConfig):
        """Initialize the state manager with configuration."""
        self.config = config
        self._initialized = False
        self._engine = None
        self._session_factory = None
        self.logger = LoggingConfig.get_logger(__name__)

    async def __aenter__(self):
        """Async context manager entry."""
        self.logger.debug("=== StateManager.__aenter__() called ===")
        self.logger.debug(f"Current initialization state: {self._initialized}")

        # Initialize if not already initialized
        if not self._initialized:
            self.logger.debug("StateManager not initialized, calling initialize()")
            await self.initialize()
        else:
            self.logger.debug("StateManager already initialized")

        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.dispose()

    async def initialize(self) -> None:
        """Initialize the database and create tables if they don't exist."""
        if self._initialized:
            self.logger.debug("StateManager already initialized, skipping")
            return

        try:
            self.logger.debug("Starting StateManager initialization")

            # Process database path with enhanced Windows debugging
            db_path_str = self.config.database_path
            self.logger.debug(f"Original database path: {db_path_str}")

            # Handle special databases
            if db_path_str in (":memory:", "sqlite:///:memory:"):
                self.logger.debug("Using in-memory database")
                database_url = "sqlite+aiosqlite:///:memory:"
            elif db_path_str.startswith("sqlite://"):
                self.logger.debug("Database path is already a SQLite URL")
                database_url = db_path_str.replace("sqlite://", "sqlite+aiosqlite://")
            else:
                # Process file path
                self.logger.debug(f"Processing file path: {db_path_str}")
                db_path = Path(db_path_str)

                # Resolve to absolute path for proper handling
                if not db_path.is_absolute():
                    db_path = db_path.resolve()
                    self.logger.debug(f"Resolved relative path to: {db_path}")

                # Validate parent directory exists
                parent_dir = db_path.parent
                self.logger.debug(f"Database parent directory: {parent_dir}")

                if not parent_dir.exists():
                    self.logger.debug(f"Creating parent directory: {parent_dir}")
                    try:
                        parent_dir.mkdir(parents=True, exist_ok=True)
                        self.logger.debug("Parent directory created successfully")
                    except Exception as e:
                        self.logger.error(f"Failed to create parent directory: {e}")
                        raise DatabaseError(
                            f"Cannot create database directory {parent_dir}: {e}"
                        )

                # Check directory permissions
                if not os.access(parent_dir, os.W_OK):
                    error_msg = (
                        f"No write permission for database directory: {parent_dir}"
                    )
                    self.logger.error(error_msg)
                    raise DatabaseError(error_msg)

                # Convert to proper SQLite URL format
                db_url_path = db_path.as_posix()
                if db_path.is_absolute():
                    # For absolute paths, ensure proper URL format
                    if db_path.parts[0].endswith(":"):
                        # Windows absolute path with drive letter
                        database_url = f"sqlite+aiosqlite:///{db_url_path}"
                    else:
                        # Unix absolute path (already starts with /) - needs 4 slashes total
                        database_url = f"sqlite+aiosqlite:///{db_url_path}"
                else:
                    # Relative path
                    database_url = f"sqlite+aiosqlite:///{db_url_path}"

            self.logger.debug(f"Generated database URL: {database_url}")

            # Create database engine
            self.logger.debug("Creating database engine")
            self._engine = create_async_engine(
                database_url,
                poolclass=StaticPool,
                connect_args={"check_same_thread": False},
                echo=False,
            )
            self.logger.debug("Database engine created successfully")

            # Create session factory
            self.logger.debug("Creating session factory")
            self._session_factory = async_sessionmaker(
                self._engine, expire_on_commit=False
            )
            self.logger.debug("Session factory created successfully")

            # Create tables
            self.logger.debug("Creating database tables")
            async with self._engine.begin() as conn:
                await conn.run_sync(Base.metadata.create_all)
            self.logger.debug("Database tables created successfully")

            self._initialized = True
            self.logger.debug("StateManager initialization completed successfully")

        except Exception as e:
            self.logger.error(f"StateManager initialization failed: {e}", exc_info=True)
            # Ensure we clean up any partial initialization
            if hasattr(self, "_engine") and self._engine:
                try:
                    await self._engine.dispose()
                except Exception as cleanup_error:
                    self.logger.error(
                        f"Failed to cleanup engine during error handling: {cleanup_error}"
                    )
            self._initialized = False
            raise
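
    # Illustrative mapping (an assumption-labeled sketch, not taken from the project
    # docs) of how `database_path` values end up as URLs in initialize() above:
    #   ":memory:" or "sqlite:///:memory:"  -> "sqlite+aiosqlite:///:memory:"
    #   "sqlite:///var/state.db"            -> "sqlite+aiosqlite:///var/state.db"
    #   "C:/data/state.db" (Windows)        -> "sqlite+aiosqlite:///C:/data/state.db"
    #   "/var/lib/loader/state.db"          -> "sqlite+aiosqlite:////var/lib/loader/state.db"
    #   "state.db" (relative)               -> resolved to an absolute path, then formatted as above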

    async def dispose(self):
        """Clean up resources."""
        if self._engine:
            self.logger.debug("Disposing database engine")
            await self._engine.dispose()
            self._engine = None
            self._session_factory = None
            self._initialized = False
            self.logger.debug("StateManager resources disposed")

    async def update_last_ingestion(
        self,
        source_type: str,
        source: str,
        status: str = IngestionStatus.SUCCESS,
        error_message: str | None = None,
        document_count: int = 0,
        project_id: str | None = None,
    ) -> None:
        """Update the last ingestion record for a source."""
        self.logger.debug(
            f"Updating last ingestion for {source_type}:{source} (project: {project_id})"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {source_type}:{source}"
                )
                now = datetime.now(UTC)
                self.logger.debug(
                    f"Executing query to find ingestion history for {source_type}:{source}"
                )

                # Build query with optional project filter
                query = select(IngestionHistory).filter_by(
                    source_type=source_type, source=source
                )
                if project_id is not None:
                    query = query.filter_by(project_id=project_id)

                result = await session.execute(query)
                ingestion = result.scalar_one_or_none()
                self.logger.debug(
                    f"Query result: {'Found' if ingestion else 'Not found'} ingestion history for {source_type}:{source}"
                )

                if ingestion:
                    self.logger.debug(
                        f"Updating existing ingestion history for {source_type}:{source}"
                    )
                    ingestion.last_successful_ingestion = now if status == IngestionStatus.SUCCESS else ingestion.last_successful_ingestion  # type: ignore
                    ingestion.status = status  # type: ignore
                    ingestion.document_count = document_count if document_count else ingestion.document_count  # type: ignore
                    ingestion.updated_at = now  # type: ignore
                    ingestion.error_message = error_message  # type: ignore
                else:
                    self.logger.debug(
                        f"Creating new ingestion history for {source_type}:{source}"
                    )
                    ingestion = IngestionHistory(
                        project_id=project_id,
                        source_type=source_type,
                        source=source,
                        last_successful_ingestion=now,
                        status=status,
                        document_count=document_count,
                        error_message=error_message,
                        created_at=now,
                        updated_at=now,
                    )
                    session.add(ingestion)

                self.logger.debug(f"Committing changes for {source_type}:{source}")
                await session.commit()
                self.logger.debug(
                    f"Successfully committed changes for {source_type}:{source}"
                )

                self.logger.debug(
                    "Ingestion history updated",
                    extra={
                        "project_id": project_id,
                        "source_type": ingestion.source_type,
                        "source": ingestion.source,
                        "status": ingestion.status,
                        "document_count": ingestion.document_count,
                    },
                )
        except Exception as e:
            self.logger.error(
                f"Error updating last ingestion for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_last_ingestion(
        self, source_type: str, source: str, project_id: str | None = None
    ) -> IngestionHistory | None:
        """Get the last ingestion record for a source."""
        self.logger.debug(
            f"Getting last ingestion for {source_type}:{source} (project: {project_id})"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {source_type}:{source}"
                )
                self.logger.debug(
                    f"Executing query to find last ingestion for {source_type}:{source}"
                )

                # Build query with optional project filter
                query = select(IngestionHistory).filter(
                    IngestionHistory.source_type == source_type,
                    IngestionHistory.source == source,
                )
                if project_id is not None:
                    query = query.filter(IngestionHistory.project_id == project_id)

                query = query.order_by(
                    IngestionHistory.last_successful_ingestion.desc()
                )
                result = await session.execute(query)
                ingestion = result.scalar_one_or_none()
                self.logger.debug(
                    f"Query result: {'Found' if ingestion else 'Not found'} last ingestion for {source_type}:{source}"
                )
                return ingestion
        except Exception as e:
            self.logger.error(
                f"Error getting last ingestion for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def mark_document_deleted(
        self,
        source_type: str,
        source: str,
        document_id: str,
        project_id: str | None = None,
    ) -> None:
        """Mark a document as deleted."""
        self.logger.debug(
            f"Marking document as deleted: {source_type}:{source}:{document_id} (project: {project_id})"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {source_type}:{source}:{document_id}"
                )
                now = datetime.now(UTC)
                self.logger.debug(
                    "Searching for document to be deleted.",
                    extra={
                        "document_id": document_id,
                        "source_type": source_type,
                        "source": source,
                        "project_id": project_id,
                    },
                )
                self.logger.debug(
                    f"Executing query to find document {source_type}:{source}:{document_id}"
                )

                # Build query with optional project filter
                query = select(DocumentStateRecord).filter(
                    DocumentStateRecord.source_type == source_type,
                    DocumentStateRecord.source == source,
                    DocumentStateRecord.document_id == document_id,
                )
                if project_id is not None:
                    query = query.filter(DocumentStateRecord.project_id == project_id)

                result = await session.execute(query)
                state = result.scalar_one_or_none()
                self.logger.debug(
                    f"Query result: {'Found' if state else 'Not found'} document {source_type}:{source}:{document_id}"
                )

                if state:
                    self.logger.debug(
                        f"Updating document state for {source_type}:{source}:{document_id}"
                    )
                    state.is_deleted = True  # type: ignore
                    state.updated_at = now  # type: ignore
                    self.logger.debug(
                        f"Committing changes for {source_type}:{source}:{document_id}"
                    )
                    await session.commit()
                    self.logger.debug(
                        f"Successfully committed changes for {source_type}:{source}:{document_id}"
                    )
                    self.logger.debug(
                        "Document marked as deleted",
                        extra={
                            "document_id": document_id,
                            "source_type": source_type,
                            "source": source,
                            "project_id": project_id,
                        },
                    )
                else:
                    self.logger.warning(
                        f"Document not found: {source_type}:{source}:{document_id}"
                    )
        except Exception as e:
            self.logger.error(
                f"Error marking document as deleted {source_type}:{source}:{document_id}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_document_state_record(
        self,
        source_type: str,
        source: str,
        document_id: str,
        project_id: str | None = None,
    ) -> DocumentStateRecord | None:
        """Get the state of a document."""
        self.logger.debug(
            f"Getting document state for {source_type}:{source}:{document_id} (project: {project_id})"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {source_type}:{source}:{document_id}"
                )
                self.logger.debug(
                    f"Executing query to find document state for {source_type}:{source}:{document_id}"
                )

                # Build query with optional project filter
                query = select(DocumentStateRecord).filter(
                    DocumentStateRecord.source_type == source_type,
                    DocumentStateRecord.source == source,
                    DocumentStateRecord.document_id == document_id,
                )
                if project_id is not None:
                    query = query.filter(DocumentStateRecord.project_id == project_id)

                result = await session.execute(query)
                state = result.scalar_one_or_none()
                self.logger.debug(
                    f"Query result: {'Found' if state else 'Not found'} document state for {source_type}:{source}:{document_id}"
                )
                return state
        except Exception as e:
            self.logger.error(
                f"Error getting document state for {source_type}:{source}:{document_id}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_document_state_records(
        self, source_config: SourceConfig, since: datetime | None = None
    ) -> list[DocumentStateRecord]:
        """Get all document states for a source, optionally filtered by date."""
        self.logger.debug(
            f"Getting document state records for {source_config.source_type}:{source_config.source}"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {source_config.source_type}:{source_config.source}"
                )
                query = select(DocumentStateRecord).filter(
                    DocumentStateRecord.source_type == source_config.source_type,
                    DocumentStateRecord.source == source_config.source,
                )
                if since:
                    query = query.filter(DocumentStateRecord.updated_at >= since)
                self.logger.debug(
                    f"Executing query for {source_config.source_type}:{source_config.source}"
                )
                result = await session.execute(query)
                self.logger.debug(
                    f"Query executed, getting all records for {source_config.source_type}:{source_config.source}"
                )
                records = list(result.scalars().all())
                self.logger.debug(
                    f"Got {len(records)} records for {source_config.source_type}:{source_config.source}"
                )
                return records
        except Exception as e:
            self.logger.error(
                f"Error getting document state records for {source_config.source_type}:{source_config.source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def update_document_state(
        self, document: Document, project_id: str | None = None
    ) -> DocumentStateRecord:
        """Update the state of a document."""
        if not self._initialized:
            raise RuntimeError("StateManager not initialized. Call initialize() first.")

        self.logger.debug(
            f"Updating document state for {document.source_type}:{document.source}:{document.id} (project: {project_id})"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                self.logger.debug(
                    f"Created database session for {document.source_type}:{document.source}:{document.id}"
                )
                self.logger.debug(
                    f"Executing query to find document state for {document.source_type}:{document.source}:{document.id}"
                )

                # Build query with optional project filter
                query = select(DocumentStateRecord).filter(
                    DocumentStateRecord.source_type == document.source_type,
                    DocumentStateRecord.source == document.source,
                    DocumentStateRecord.document_id == document.id,
                )
                if project_id is not None:
                    query = query.filter(DocumentStateRecord.project_id == project_id)

                result = await session.execute(query)
                document_state_record = result.scalar_one_or_none()
                self.logger.debug(
                    f"Query result: {'Found' if document_state_record else 'Not found'} document state for {document.source_type}:{document.source}:{document.id}"
                )

                now = datetime.now(UTC)

                # Extract file conversion metadata from document
                metadata = document.metadata
                conversion_method = metadata.get("conversion_method")
                is_converted = conversion_method is not None
                conversion_failed = metadata.get("conversion_failed", False)

                # Extract attachment metadata
                is_attachment = metadata.get("is_attachment", False)
                parent_document_id = metadata.get("parent_document_id")
                attachment_id = metadata.get("attachment_id")

                if document_state_record:
                    # Update existing record
                    self.logger.debug(
                        f"Updating existing document state for {document.source_type}:{document.source}:{document.id}"
                    )
                    document_state_record.title = document.title  # type: ignore
                    document_state_record.content_hash = document.content_hash  # type: ignore
                    document_state_record.is_deleted = False  # type: ignore
                    document_state_record.updated_at = now  # type: ignore

                    # Update file conversion metadata
                    document_state_record.is_converted = is_converted  # type: ignore
                    document_state_record.conversion_method = conversion_method  # type: ignore
                    document_state_record.original_file_type = metadata.get("original_file_type")  # type: ignore
                    document_state_record.original_filename = metadata.get("original_filename")  # type: ignore
                    document_state_record.file_size = metadata.get("file_size")  # type: ignore
                    document_state_record.conversion_failed = conversion_failed  # type: ignore
                    document_state_record.conversion_error = metadata.get("conversion_error")  # type: ignore
                    document_state_record.conversion_time = metadata.get("conversion_time")  # type: ignore

                    # Update attachment metadata
                    document_state_record.is_attachment = is_attachment  # type: ignore
                    document_state_record.parent_document_id = parent_document_id  # type: ignore
                    document_state_record.attachment_id = attachment_id  # type: ignore
                    document_state_record.attachment_filename = metadata.get("attachment_filename")  # type: ignore
                    document_state_record.attachment_mime_type = metadata.get("attachment_mime_type")  # type: ignore
                    document_state_record.attachment_download_url = metadata.get("attachment_download_url")  # type: ignore
                    document_state_record.attachment_author = metadata.get("attachment_author")  # type: ignore

                    # Handle attachment creation date
                    attachment_created_str = metadata.get("attachment_created_at")
                    if attachment_created_str:
                        try:
                            if isinstance(attachment_created_str, str):
                                document_state_record.attachment_created_at = datetime.fromisoformat(attachment_created_str.replace("Z", "+00:00"))  # type: ignore
                            elif isinstance(attachment_created_str, datetime):
                                document_state_record.attachment_created_at = attachment_created_str  # type: ignore
                        except (ValueError, TypeError) as e:
                            self.logger.warning(
                                f"Failed to parse attachment_created_at: {e}"
                            )
                            document_state_record.attachment_created_at = None  # type: ignore
                else:
                    # Create new record
                    self.logger.debug(
                        f"Creating new document state for {document.source_type}:{document.source}:{document.id}"
                    )

                    # Handle attachment creation date for new records
                    attachment_created_at = None
                    attachment_created_str = metadata.get("attachment_created_at")
                    if attachment_created_str:
                        try:
                            if isinstance(attachment_created_str, str):
                                attachment_created_at = datetime.fromisoformat(
                                    attachment_created_str.replace("Z", "+00:00")
                                )
                            elif isinstance(attachment_created_str, datetime):
                                attachment_created_at = attachment_created_str
                        except (ValueError, TypeError) as e:
                            self.logger.warning(
                                f"Failed to parse attachment_created_at: {e}"
                            )

                    document_state_record = DocumentStateRecord(
                        project_id=project_id,
                        document_id=document.id,
                        source_type=document.source_type,
                        source=document.source,
                        url=document.url,
                        title=document.title,
                        content_hash=document.content_hash,
                        is_deleted=False,
                        created_at=now,
                        updated_at=now,
                        # File conversion metadata
                        is_converted=is_converted,
                        conversion_method=conversion_method,
                        original_file_type=metadata.get("original_file_type"),
                        original_filename=metadata.get("original_filename"),
                        file_size=metadata.get("file_size"),
                        conversion_failed=conversion_failed,
                        conversion_error=metadata.get("conversion_error"),
                        conversion_time=metadata.get("conversion_time"),
                        # Attachment metadata
                        is_attachment=is_attachment,
                        parent_document_id=parent_document_id,
                        attachment_id=attachment_id,
                        attachment_filename=metadata.get("attachment_filename"),
                        attachment_mime_type=metadata.get("attachment_mime_type"),
                        attachment_download_url=metadata.get("attachment_download_url"),
                        attachment_author=metadata.get("attachment_author"),
                        attachment_created_at=attachment_created_at,
                    )
                    session.add(document_state_record)

                self.logger.debug(
                    f"Committing changes for {document.source_type}:{document.source}:{document.id}"
                )
                await session.commit()
                self.logger.debug(
                    f"Successfully committed changes for {document.source_type}:{document.source}:{document.id}"
                )

                self.logger.debug(
                    "Document state updated",
                    extra={
                        "project_id": project_id,
                        "document_id": document_state_record.document_id,
                        "content_hash": document_state_record.content_hash,
                        "updated_at": document_state_record.updated_at,
                        "is_converted": document_state_record.is_converted,
                        "is_attachment": document_state_record.is_attachment,
                        "conversion_method": document_state_record.conversion_method,
                    },
                )
                return document_state_record
        except Exception as e:
            self.logger.error(
                "Failed to update document state",
                extra={
                    "project_id": project_id,
                    "document_id": document.id,
                    "error": str(e),
                    "error_type": type(e).__name__,
                },
            )
            raise
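
    # Summary comment (derived from update_document_state above): the document
    # metadata keys consulted are "conversion_method", "conversion_failed",
    # "original_file_type", "original_filename", "file_size", "conversion_error",
    # "conversion_time", "is_attachment", "parent_document_id", "attachment_id",
    # "attachment_filename", "attachment_mime_type", "attachment_download_url",
    # "attachment_author", and "attachment_created_at" (ISO-8601 string or datetime).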

    async def update_conversion_metrics(
        self,
        source_type: str,
        source: str,
        converted_files_count: int = 0,
        conversion_failures_count: int = 0,
        attachments_processed_count: int = 0,
        total_conversion_time: float = 0.0,
    ) -> None:
        """Update file conversion metrics for a source."""
        self.logger.debug(f"Updating conversion metrics for {source_type}:{source}")
        try:
            async with self._session_factory() as session:  # type: ignore
                result = await session.execute(
                    select(IngestionHistory).filter_by(
                        source_type=source_type, source=source
                    )
                )
                ingestion = result.scalar_one_or_none()

                if ingestion:
                    # Update existing metrics
                    ingestion.converted_files_count = (ingestion.converted_files_count or 0) + converted_files_count  # type: ignore
                    ingestion.conversion_failures_count = (ingestion.conversion_failures_count or 0) + conversion_failures_count  # type: ignore
                    ingestion.attachments_processed_count = (ingestion.attachments_processed_count or 0) + attachments_processed_count  # type: ignore
                    ingestion.total_conversion_time = (ingestion.total_conversion_time or 0.0) + total_conversion_time  # type: ignore
                    ingestion.updated_at = datetime.now(UTC)  # type: ignore
                else:
                    # Create new record with conversion metrics
                    now = datetime.now(UTC)
                    ingestion = IngestionHistory(
                        source_type=source_type,
                        source=source,
                        last_successful_ingestion=now,
                        status="SUCCESS",
                        document_count=0,
                        converted_files_count=converted_files_count,
                        conversion_failures_count=conversion_failures_count,
                        attachments_processed_count=attachments_processed_count,
                        total_conversion_time=total_conversion_time,
                        created_at=now,
                        updated_at=now,
                    )
                    session.add(ingestion)

                await session.commit()
                self.logger.debug(
                    "Conversion metrics updated",
                    extra={
                        "source_type": source_type,
                        "source": source,
                        "converted_files": ingestion.converted_files_count,
                        "conversion_failures": ingestion.conversion_failures_count,
                        "attachments_processed": ingestion.attachments_processed_count,
                        "total_conversion_time": ingestion.total_conversion_time,
                    },
                )
        except Exception as e:
            self.logger.error(
                f"Error updating conversion metrics for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_conversion_metrics(
        self, source_type: str, source: str
    ) -> dict[str, int | float]:
        """Get file conversion metrics for a source."""
        self.logger.debug(f"Getting conversion metrics for {source_type}:{source}")
        try:
            async with self._session_factory() as session:  # type: ignore
                result = await session.execute(
                    select(IngestionHistory).filter_by(
                        source_type=source_type, source=source
                    )
                )
                ingestion = result.scalar_one_or_none()

                if ingestion:
                    # Access the actual values from the SQLAlchemy model instance
                    converted_files: int | None = ingestion.converted_files_count  # type: ignore
                    conversion_failures: int | None = ingestion.conversion_failures_count  # type: ignore
                    attachments_processed: int | None = ingestion.attachments_processed_count  # type: ignore
                    total_time: float | None = ingestion.total_conversion_time  # type: ignore

                    return {
                        "converted_files_count": (
                            converted_files if converted_files is not None else 0
                        ),
                        "conversion_failures_count": (
                            conversion_failures
                            if conversion_failures is not None
                            else 0
                        ),
                        "attachments_processed_count": (
                            attachments_processed
                            if attachments_processed is not None
                            else 0
                        ),
                        "total_conversion_time": (
                            total_time if total_time is not None else 0.0
                        ),
                    }
                else:
                    return {
                        "converted_files_count": 0,
                        "conversion_failures_count": 0,
                        "attachments_processed_count": 0,
                        "total_conversion_time": 0.0,
                    }
        except Exception as e:
            self.logger.error(
                f"Error getting conversion metrics for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_attachment_documents(
        self, parent_document_id: str
    ) -> list[DocumentStateRecord]:
        """Get all attachment documents for a parent document."""
        self.logger.debug(
            f"Getting attachment documents for parent {parent_document_id}"
        )
        try:
            async with self._session_factory() as session:  # type: ignore
                result = await session.execute(
                    select(DocumentStateRecord).filter(
                        DocumentStateRecord.parent_document_id == parent_document_id,
                        DocumentStateRecord.is_attachment == True,
                        DocumentStateRecord.is_deleted == False,
                    )
                )
                attachments = list(result.scalars().all())
                self.logger.debug(
                    f"Found {len(attachments)} attachments for parent {parent_document_id}"
                )
                return attachments
        except Exception as e:
            self.logger.error(
                f"Error getting attachment documents for {parent_document_id}: {str(e)}",
                exc_info=True,
            )
            raise

    async def get_converted_documents(
        self, source_type: str, source: str, conversion_method: str | None = None
    ) -> list[DocumentStateRecord]:
        """Get all converted documents for a source, optionally filtered by conversion method."""
        self.logger.debug(f"Getting converted documents for {source_type}:{source}")
        try:
            async with self._session_factory() as session:  # type: ignore
                query = select(DocumentStateRecord).filter(
                    DocumentStateRecord.source_type == source_type,
                    DocumentStateRecord.source == source,
                    DocumentStateRecord.is_converted == True,
                    DocumentStateRecord.is_deleted == False,
                )
                if conversion_method:
                    query = query.filter(
                        DocumentStateRecord.conversion_method == conversion_method
                    )

                result = await session.execute(query)
                documents = list(result.scalars().all())
                self.logger.debug(
                    f"Found {len(documents)} converted documents for {source_type}:{source}"
                )
                return documents
        except Exception as e:
            self.logger.error(
                f"Error getting converted documents for {source_type}:{source}: {str(e)}",
                exc_info=True,
            )
            raise

    async def close(self):
        """Close all database connections."""
        if hasattr(self, "_engine") and self._engine is not None:
            self.logger.debug("Closing database connections")
            await self._engine.dispose()
            self.logger.debug("Database connections closed")
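

# Minimal usage sketch (an illustration only, not part of the module's public
# surface): it assumes StateManagementConfig accepts a `database_path` keyword;
# everything else uses only the StateManager API defined above.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        # Hypothetical config value; adjust to the real StateManagementConfig schema.
        config = StateManagementConfig(database_path=":memory:")

        # The async context manager initializes the database and disposes it on exit.
        async with StateManager(config) as state_manager:
            await state_manager.update_last_ingestion(
                source_type="git",
                source="example-repo",
                status=IngestionStatus.SUCCESS,
                document_count=1,
                project_id="default",
            )
            last = await state_manager.get_last_ingestion(
                "git", "example-repo", project_id="default"
            )
            print(last.status if last else "no ingestion recorded")

    asyncio.run(_demo())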