Coverage for src / qdrant_loader / config / state.py: 76%

70 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 04:48 +0000

1"""State management configuration. 

2 

3This module defines the configuration settings for state management, 

4including database path, table prefix, and connection pool settings. 

5""" 

6 

7import os 

8from pathlib import Path 

9from typing import Any 

10 

11from pydantic import Field, ValidationInfo, field_validator 

12 

13from qdrant_loader.config.base import BaseConfig 

14 

15 

16class DatabaseDirectoryError(Exception): 

17 """Exception raised when database directory needs to be created.""" 

18 

19 def __init__(self, path: Path): 

20 self.path = path 

21 super().__init__(f"Database directory does not exist: {path}") 

22 

23 

24class IngestionStatus: 

25 """Enum-like class for ingestion status values.""" 

26 

27 SUCCESS = "success" 

28 FAILED = "failed" 

29 IN_PROGRESS = "in_progress" 

30 SKIPPED = "skipped" 

31 CANCELLED = "cancelled" 

32 

33 

34class StateManagementConfig(BaseConfig): 

35 """Configuration for state management.""" 

36 

37 database_path: str = Field( 

38 default="./state.db", description="Path to SQLite database file" 

39 ) 

40 table_prefix: str = Field( 

41 default="qdrant_loader_", description="Prefix for database tables" 

42 ) 

43 connection_pool: dict[str, Any] = Field( 

44 default_factory=lambda: {"size": 5, "timeout": 30}, 

45 description="Connection pool settings", 

46 ) 

47 

48 @field_validator("database_path") 

49 @classmethod 

50 def validate_database_path(cls, v: str, info: ValidationInfo) -> str: 

51 """Validate database path.""" 

52 # Handle in-memory database 

53 if v in (":memory:", "sqlite:///:memory:"): 

54 return v 

55 

56 # Handle SQLite URLs 

57 if v.startswith("sqlite://"): 

58 # For SQLite URLs, skip file path validation since they might be 

59 # in-memory or use special formats 

60 return v 

61 

62 # For file paths, perform basic validation but allow directory creation 

63 try: 

64 # Expand environment variables, including $HOME 

65 expanded_path = os.path.expanduser(os.path.expandvars(v)) 

66 path = Path(expanded_path) 

67 

68 # Convert to absolute path for consistent handling 

69 if not path.is_absolute(): 

70 path = path.resolve() 

71 

72 # For absolute paths, use them as-is 

73 parent_dir = path.parent 

74 

75 # Check if parent directory exists 

76 if not parent_dir.exists(): 

77 # Don't fail here - let StateManager handle directory creation 

78 # Just validate that the path structure is reasonable 

79 try: 

80 # Test if the path is valid by trying to resolve it 

81 # Don't actually create the directory here 

82 parent_dir.resolve() 

83 

84 # Basic validation: ensure the path is reasonable 

85 # Note: We removed the arbitrary depth limit as it was too restrictive 

86 # for legitimate use cases like nested project structures and Windows paths 

87 

88 except OSError as e: 

89 raise ValueError( 

90 f"Invalid database path - cannot resolve directory {parent_dir}: {e}" 

91 ) 

92 else: 

93 # Directory exists, check if it's actually a directory and writable 

94 if not parent_dir.is_dir(): 

95 raise ValueError( 

96 f"Database directory path is not a directory: {parent_dir}" 

97 ) 

98 

99 if not os.access(str(parent_dir), os.W_OK): 

100 raise ValueError( 

101 f"Database directory is not writable: {parent_dir}" 

102 ) 

103 

104 except Exception as e: 

105 # If any validation fails, still allow the path through 

106 # StateManager will provide better error handling 

107 if isinstance(e, ValueError): 

108 raise # Re-raise validation errors 

109 # For other exceptions, just log and allow the path 

110 pass 

111 

112 # Return the original value to preserve any environment variables 

113 return v 

114 

115 @field_validator("table_prefix") 

116 @classmethod 

117 def validate_table_prefix(cls, v: str, info: ValidationInfo) -> str: 

118 """Validate table prefix format.""" 

119 if not v: 

120 raise ValueError("Table prefix cannot be empty") 

121 if not v.replace("_", "").isalnum(): 

122 raise ValueError( 

123 "Table prefix can only contain alphanumeric characters and underscores" 

124 ) 

125 return v 

126 

127 @field_validator("connection_pool") 

128 @classmethod 

129 def validate_connection_pool( 

130 cls, v: dict[str, Any], info: ValidationInfo 

131 ) -> dict[str, Any]: 

132 """Validate connection pool settings.""" 

133 if "size" not in v: 

134 raise ValueError("Connection pool must specify 'size'") 

135 if not isinstance(v["size"], int) or v["size"] < 1: 

136 raise ValueError("Connection pool size must be a positive integer") 

137 

138 if "timeout" not in v: 

139 raise ValueError("Connection pool must specify 'timeout'") 

140 if not isinstance(v["timeout"], int) or v["timeout"] < 1: 

141 raise ValueError("Connection pool timeout must be a positive integer") 

142 

143 return v 

144 

145 def __init__(self, **data): 

146 """Initialize state management configuration.""" 

147 # If database_path is not provided, use default file-based database 

148 if "database_path" not in data: 

149 data["database_path"] = "./state.db" 

150 super().__init__(**data)