Coverage for src/qdrant_loader/config/state.py: 74%

70 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2025-07-25 11:39 +0000

1"""State management configuration. 

2 

3This module defines the configuration settings for state management, 

4including database path, table prefix, and connection pool settings. 

5""" 

6 

7import os 

8from pathlib import Path 

9from typing import Any 

10 

11from pydantic import Field, ValidationInfo, field_validator 

12 

13from qdrant_loader.config.base import BaseConfig 

14 

15 

class DatabaseDirectoryError(Exception):
    """Error for a database directory that does not exist yet.

    The missing location is stored on the ``path`` attribute and is also
    embedded in the exception message.
    """

    def __init__(self, path: Path):
        # Build the message first, then record the path on the instance.
        message = f"Database directory does not exist: {path}"
        super().__init__(message)
        self.path = path

22 

23 

class IngestionStatus:
    """Namespace of plain string constants naming ingestion statuses.

    This is an ordinary class used like an enum; every attribute is a
    regular ``str`` and can be compared directly with stored values.
    """

    # Names and values are paired positionally; keep both tuples in step.
    SUCCESS, FAILED, IN_PROGRESS, SKIPPED, CANCELLED = (
        "success",
        "failed",
        "in_progress",
        "skipped",
        "cancelled",
    )

32 

33 

class StateManagementConfig(BaseConfig):
    """Configuration for state management.

    Fields:
        database_path: Path to the SQLite database file. ``":memory:"`` and
            ``sqlite://`` URLs are accepted without filesystem checks. When
            the field is omitted at construction time, ``":memory:"`` is used.
        table_prefix: Prefix for database tables (default ``"qdrant_loader_"``).
        connection_pool: Mapping that must contain positive integer ``size``
            and ``timeout`` entries (default ``{"size": 5, "timeout": 30}``).
    """

    database_path: str = Field(..., description="Path to SQLite database file")
    table_prefix: str = Field(
        default="qdrant_loader_", description="Prefix for database tables"
    )
    connection_pool: dict[str, Any] = Field(
        default_factory=lambda: {"size": 5, "timeout": 30},
        description="Connection pool settings",
    )

    @field_validator("database_path")
    @classmethod
    def validate_database_path(cls, v: str, info: ValidationInfo) -> str:
        """Validate the database path without creating anything on disk.

        Args:
            v: The configured database path or SQLite URL.
            info: Pydantic validation context (unused).

        Returns:
            The original value, unchanged, so that any environment variables
            or ``~`` it contains are preserved.

        Raises:
            ValueError: If the parent of the path exists but is not a
                directory or is not writable, or if a missing parent cannot
                be resolved.
        """
        # In-memory databases have no filesystem location to check.
        if v in (":memory:", "sqlite:///:memory:"):
            return v

        # SQLite URLs may be in-memory or use special formats; skip file
        # path validation for them.
        if v.startswith("sqlite://"):
            return v

        try:
            # Expand environment variables and "~" before inspecting the path.
            expanded_path = os.path.expanduser(os.path.expandvars(v))
            path = Path(expanded_path)

            # Work with an absolute path for consistent handling.
            if not path.is_absolute():
                path = path.resolve()

            parent_dir = path.parent

            if parent_dir.exists():
                # The parent is present: it must be a writable directory.
                if not parent_dir.is_dir():
                    raise ValueError(
                        f"Database directory path is not a directory: {parent_dir}"
                    )

                if not os.access(str(parent_dir), os.W_OK):
                    raise ValueError(
                        f"Database directory is not writable: {parent_dir}"
                    )
            else:
                # A missing parent is not an error here; directory creation is
                # left to the code that opens the database (StateManager,
                # according to the original comments). Only check that the
                # path can be resolved, and do not create anything.
                try:
                    parent_dir.resolve()
                except OSError as e:
                    raise ValueError(
                        f"Invalid database path - cannot resolve directory {parent_dir}: {e}"
                    ) from e

        except ValueError:
            # Genuine validation failures must reach the caller.
            raise
        except Exception:
            # Deliberate best-effort: any other unexpected failure while
            # probing the filesystem does not reject the path. Nothing is
            # logged here; the value is simply accepted below.
            pass

        # Return the original value to preserve any environment variables.
        return v

    @field_validator("table_prefix")
    @classmethod
    def validate_table_prefix(cls, v: str, info: ValidationInfo) -> str:
        """Validate table prefix format.

        Args:
            v: The configured table prefix.
            info: Pydantic validation context (unused).

        Returns:
            The prefix, unchanged.

        Raises:
            ValueError: If the prefix is empty, or if what remains after
                removing underscores is not alphanumeric.
        """
        if not v:
            raise ValueError("Table prefix cannot be empty")
        # NOTE(review): str.isalnum() also accepts non-ASCII alphanumerics, and
        # an underscore-only prefix is rejected because "" is not alphanumeric.
        if not v.replace("_", "").isalnum():
            raise ValueError(
                "Table prefix can only contain alphanumeric characters and underscores"
            )
        return v

    @field_validator("connection_pool")
    @classmethod
    def validate_connection_pool(
        cls, v: dict[str, Any], info: ValidationInfo
    ) -> dict[str, Any]:
        """Validate connection pool settings.

        Args:
            v: The configured connection pool mapping.
            info: Pydantic validation context (unused).

        Returns:
            The mapping, unchanged.

        Raises:
            ValueError: If ``size`` or ``timeout`` is missing, or is not an
                integer greater than or equal to 1. Booleans are rejected.
        """
        # Checked in this order so the first reported problem matches the
        # historical behaviour: size first, then timeout.
        for key in ("size", "timeout"):
            if key not in v:
                raise ValueError(f"Connection pool must specify '{key}'")
            value = v[key]
            # bool is a subclass of int, so True would otherwise pass as 1.
            if isinstance(value, bool) or not isinstance(value, int) or value < 1:
                raise ValueError(f"Connection pool {key} must be a positive integer")

        return v

    def __init__(self, **data: Any) -> None:
        """Initialize state management configuration.

        If ``database_path`` is not provided, an in-memory database
        (``":memory:"``) is used.
        """
        data.setdefault("database_path", ":memory:")
        super().__init__(**data)