Coverage for src/qdrant_loader/core/state/utils.py: 54%
39 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1"""
2Utilities for StateManager: database URL construction and common query builders.
3"""
5from __future__ import annotations
7import os
8from pathlib import Path
10from sqlalchemy import select
12from qdrant_loader.core.state.exceptions import DatabaseError
13from qdrant_loader.core.state.models import DocumentStateRecord, IngestionHistory
16def ensure_parent_directory(db_path: Path) -> None:
17 """Ensure the parent directory of the database file exists and is writable."""
18 parent_dir = db_path.parent
19 if not parent_dir.exists():
20 try:
21 parent_dir.mkdir(parents=True, exist_ok=True)
22 except Exception as e: # pragma: no cover - safety net
23 raise DatabaseError(
24 f"Cannot create database directory {parent_dir}: {e}"
25 ) from e
26 if not os.access(parent_dir, os.W_OK):
27 raise DatabaseError(f"No write permission for database directory: {parent_dir}")
30def generate_sqlite_aiosqlite_url(database_path: str) -> str:
31 """Generate an aiosqlite URL from a configured database path string.
33 Supports special values like ":memory:" and already-prefixed sqlite URLs.
34 Ensures parent directory exists for file-backed databases.
35 """
36 if database_path in (":memory:", "sqlite:///:memory:", "sqlite://:memory:"):
37 return "sqlite+aiosqlite:///:memory:"
39 if database_path.startswith("sqlite://"):
40 # Convert to aiosqlite dialect
41 return database_path.replace("sqlite://", "sqlite+aiosqlite://")
43 # Treat as filesystem path
44 db_path = Path(database_path)
45 if not db_path.is_absolute():
46 db_path = db_path.resolve()
48 ensure_parent_directory(db_path)
50 # Normalize to POSIX path for SQLAlchemy URL
51 db_url_path = db_path.as_posix()
52 # Absolute and relative are handled similarly here (three slashes)
53 return f"sqlite+aiosqlite:///{db_url_path}"
56def build_ingestion_history_select(
57 source_type: str,
58 source: str,
59 project_id: str | None = None,
60 order_by_last_successful_desc: bool = False,
61):
62 """Create a select() for IngestionHistory with optional project filter and ordering."""
63 query = select(IngestionHistory).filter(
64 IngestionHistory.source_type == source_type, IngestionHistory.source == source
65 )
66 if project_id is not None:
67 query = query.filter(IngestionHistory.project_id == project_id)
68 if order_by_last_successful_desc:
69 query = query.order_by(IngestionHistory.last_successful_ingestion.desc())
70 return query
73def build_document_state_select(
74 source_type: str,
75 source: str,
76 document_id: str | None = None,
77 project_id: str | None = None,
78):
79 """Create a select() for DocumentStateRecord with optional project/doc filters."""
80 conditions = [
81 DocumentStateRecord.source_type == source_type,
82 DocumentStateRecord.source == source,
83 ]
84 if document_id is not None:
85 conditions.append(DocumentStateRecord.document_id == document_id)
86 query = select(DocumentStateRecord).filter(*conditions)
87 if project_id is not None:
88 query = query.filter(DocumentStateRecord.project_id == project_id)
89 return query