Coverage for src/qdrant_loader/core/state/utils.py: 54%

39 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1""" 

2Utilities for StateManager: database URL construction and common query builders. 

3""" 

4 

5from __future__ import annotations 

6 

7import os 

8from pathlib import Path 

9 

10from sqlalchemy import select 

11 

12from qdrant_loader.core.state.exceptions import DatabaseError 

13from qdrant_loader.core.state.models import DocumentStateRecord, IngestionHistory 

14 

15 

16def ensure_parent_directory(db_path: Path) -> None: 

17 """Ensure the parent directory of the database file exists and is writable.""" 

18 parent_dir = db_path.parent 

19 if not parent_dir.exists(): 

20 try: 

21 parent_dir.mkdir(parents=True, exist_ok=True) 

22 except Exception as e: # pragma: no cover - safety net 

23 raise DatabaseError( 

24 f"Cannot create database directory {parent_dir}: {e}" 

25 ) from e 

26 if not os.access(parent_dir, os.W_OK): 

27 raise DatabaseError(f"No write permission for database directory: {parent_dir}") 

28 

29 

30def generate_sqlite_aiosqlite_url(database_path: str) -> str: 

31 """Generate an aiosqlite URL from a configured database path string. 

32 

33 Supports special values like ":memory:" and already-prefixed sqlite URLs. 

34 Ensures parent directory exists for file-backed databases. 

35 """ 

36 if database_path in (":memory:", "sqlite:///:memory:", "sqlite://:memory:"): 

37 return "sqlite+aiosqlite:///:memory:" 

38 

39 if database_path.startswith("sqlite://"): 

40 # Convert to aiosqlite dialect 

41 return database_path.replace("sqlite://", "sqlite+aiosqlite://") 

42 

43 # Treat as filesystem path 

44 db_path = Path(database_path) 

45 if not db_path.is_absolute(): 

46 db_path = db_path.resolve() 

47 

48 ensure_parent_directory(db_path) 

49 

50 # Normalize to POSIX path for SQLAlchemy URL 

51 db_url_path = db_path.as_posix() 

52 # Absolute and relative are handled similarly here (three slashes) 

53 return f"sqlite+aiosqlite:///{db_url_path}" 

54 

55 

56def build_ingestion_history_select( 

57 source_type: str, 

58 source: str, 

59 project_id: str | None = None, 

60 order_by_last_successful_desc: bool = False, 

61): 

62 """Create a select() for IngestionHistory with optional project filter and ordering.""" 

63 query = select(IngestionHistory).filter( 

64 IngestionHistory.source_type == source_type, IngestionHistory.source == source 

65 ) 

66 if project_id is not None: 

67 query = query.filter(IngestionHistory.project_id == project_id) 

68 if order_by_last_successful_desc: 

69 query = query.order_by(IngestionHistory.last_successful_ingestion.desc()) 

70 return query 

71 

72 

73def build_document_state_select( 

74 source_type: str, 

75 source: str, 

76 document_id: str | None = None, 

77 project_id: str | None = None, 

78): 

79 """Create a select() for DocumentStateRecord with optional project/doc filters.""" 

80 conditions = [ 

81 DocumentStateRecord.source_type == source_type, 

82 DocumentStateRecord.source == source, 

83 ] 

84 if document_id is not None: 

85 conditions.append(DocumentStateRecord.document_id == document_id) 

86 query = select(DocumentStateRecord).filter(*conditions) 

87 if project_id is not None: 

88 query = query.filter(DocumentStateRecord.project_id == project_id) 

89 return query