Coverage for src/qdrant_loader/config/parser.py: 100%

76 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-11 07:21 +0000

1"""Multi-project configuration parser. 

2 

3This module provides parsing functionality for multi-project configurations. 

4""" 

5 

6import re 

7from typing import Any 

8 

9from pydantic import ValidationError 

10 

11from ..utils.logging import LoggingConfig 

12from .global_config import GlobalConfig 

13from .models import ParsedConfig, ProjectConfig, ProjectsConfig 

14from .sources import SourcesConfig 

15from .validator import ConfigValidator 

16 

17 

def _get_logger():
    """Return the logger for this module, obtained through ``LoggingConfig``.

    The lookup is performed on every call rather than once at import time.
    """
    module_logger = LoggingConfig.get_logger(__name__)
    return module_logger

20 

21 

class MultiProjectConfigParser:
    """Parser for multi-project configurations.

    Turns the raw configuration mapping (as loaded from YAML) into a
    ``ParsedConfig`` made of one ``GlobalConfig`` and a ``ProjectsConfig``
    holding one ``ProjectConfig`` per entry of the ``projects`` section.
    """

    # A project ID is a letter followed by letters, digits, underscores or
    # hyphens. Compiled once here and applied with ``fullmatch`` so that the
    # whole string (including any trailing newline) has to conform.
    _PROJECT_ID_PATTERN = re.compile(r"[a-zA-Z][a-zA-Z0-9_-]*")

    def __init__(self, validator: ConfigValidator):
        """Initialize the parser with a validator.

        Args:
            validator: Configuration validator instance; its
                ``validate_structure`` method is called at the start of
                every ``parse``.
        """
        self.validator = validator

    def parse(
        self, config_data: dict[str, Any], skip_validation: bool = False
    ) -> ParsedConfig:
        """Parse configuration with multi-project support.

        Args:
            config_data: Raw configuration data from YAML
            skip_validation: Forwarded to ``GlobalConfig`` when the global
                section is parsed. The structural check done by the
                validator always runs.

        Returns:
            ParsedConfig: Parsed configuration with project information

        Raises:
            ValidationError: If the global configuration is invalid
            ValueError: If a project ID has an invalid format
        """
        _get_logger().debug("Starting configuration parsing")

        # Validate configuration structure
        self.validator.validate_structure(config_data)

        # Parse global configuration (a missing or null section means "{}")
        global_config = self._parse_global_config(
            self._get_section(config_data, "global"), skip_validation
        )

        # Parse projects
        projects_config = self._parse_projects(config_data, global_config)

        _get_logger().debug(
            "Configuration parsing completed",
            project_count=len(projects_config.projects),
        )

        return ParsedConfig(
            global_config=global_config,
            projects_config=projects_config,
        )

    @staticmethod
    def _get_section(data: dict[str, Any], key: str) -> Any:
        """Return ``data[key]``, or an empty dict if it is missing or null.

        A YAML key written without a value (``sources:``) loads as ``None``,
        which ``dict.get(key, {})`` would hand back unchanged; this helper
        treats that case the same as an absent key.

        Args:
            data: Mapping to read from
            key: Section name to look up

        Returns:
            The stored value, or ``{}`` when the value is absent or ``None``
        """
        value = data.get(key)
        return {} if value is None else value

    def _parse_global_config(
        self, global_data: dict[str, Any], skip_validation: bool = False
    ) -> GlobalConfig:
        """Parse global configuration section.

        Args:
            global_data: Global configuration data
            skip_validation: Whether to skip validation during parsing

        Returns:
            GlobalConfig: Parsed global configuration

        Raises:
            ValidationError: Logged and re-raised unchanged when
                ``GlobalConfig`` rejects the data
        """
        try:
            return GlobalConfig(**global_data, skip_validation=skip_validation)
        except ValidationError as e:
            _get_logger().error("Failed to parse global configuration", error=str(e))
            raise

    def _parse_projects(
        self, config_data: dict[str, Any], global_config: GlobalConfig
    ) -> ProjectsConfig:
        """Parse project configurations.

        Args:
            config_data: Raw configuration data
            global_config: Parsed global configuration

        Returns:
            ProjectsConfig: Parsed projects configuration (empty when the
            ``projects`` section is missing or null)
        """
        projects_config = ProjectsConfig()

        # Handle multi-project format
        projects_data = self._get_section(config_data, "projects")
        for project_id, project_data in projects_data.items():
            project_config = self._parse_project_config(
                project_id, project_data, global_config
            )
            projects_config.add_project(project_config)
            _get_logger().debug("Parsed project configuration", project_id=project_id)

        return projects_config

    def _parse_project_config(
        self, project_id: str, project_data: dict[str, Any], global_config: GlobalConfig
    ) -> ProjectConfig:
        """Parse individual project configuration.

        Args:
            project_id: Project identifier
            project_data: Project configuration data; ``None`` (a project
                key with an empty body) is treated as an empty mapping
            global_config: Global configuration

        Returns:
            ProjectConfig: Parsed project configuration

        Raises:
            ValueError: If ``project_id`` does not have a valid format
        """
        # Validate project ID
        if not self._is_valid_project_id(project_id):
            raise ValueError(
                f"Invalid project ID '{project_id}'. "
                "Project IDs must start with a letter and contain only "
                "letters, digits, underscores, and hyphens."
            )

        if project_data is None:
            project_data = {}

        # Extract basic project information
        display_name = project_data.get("display_name", project_id)
        description = project_data.get("description")

        # Parse project-specific sources with automatic field injection
        sources_data = self._get_section(project_data, "sources")
        enhanced_sources_data = self._inject_source_metadata(sources_data)
        sources_config = SourcesConfig(**enhanced_sources_data)

        # Merge project-specific overrides on top of the global config
        overrides = self._get_section(project_data, "overrides")
        merged_overrides = self._merge_configs(global_config, overrides)

        return ProjectConfig(
            project_id=project_id,
            display_name=display_name,
            description=description,
            sources=sources_config,
            overrides=merged_overrides,
        )

    def _inject_source_metadata(self, sources_data: dict[str, Any]) -> dict[str, Any]:
        """Inject source_type and source fields into source configurations.

        The input is expected to be ``{source_type: {source_name: config}}``.
        Every ``config`` that is a dict is shallow-copied and gets
        ``"source_type"`` and ``"source"`` set (overwriting any existing
        values); anything that is not a dict is passed through unchanged.

        Args:
            sources_data: Raw sources configuration data

        Returns:
            Dict[str, Any]: Enhanced sources data with injected metadata
        """
        enhanced_data: dict[str, Any] = {}

        for source_type, source_configs in sources_data.items():
            if not isinstance(source_configs, dict):
                enhanced_data[source_type] = source_configs
                continue

            enhanced_source_configs = {}
            for source_name, source_config in source_configs.items():
                if isinstance(source_config, dict):
                    # Build a new dict so the caller's data is not modified;
                    # the injected keys always win over existing ones.
                    enhanced_source_configs[source_name] = {
                        **source_config,
                        "source_type": source_type,
                        "source": source_name,
                    }
                else:
                    enhanced_source_configs[source_name] = source_config

            enhanced_data[source_type] = enhanced_source_configs

        return enhanced_data

    def _is_valid_project_id(self, project_id: str) -> bool:
        """Validate project ID format.

        A valid ID starts with an ASCII letter and continues with ASCII
        letters, digits, underscores or hyphens.

        Args:
            project_id: Project identifier to validate

        Returns:
            bool: True if valid, False otherwise (including when
            ``project_id`` is not a string, e.g. a numeric YAML key)
        """
        if not isinstance(project_id, str):
            return False
        # fullmatch, unlike match with "$", does not accept a trailing newline.
        return self._PROJECT_ID_PATTERN.fullmatch(project_id) is not None

    def _merge_configs(
        self, global_config: GlobalConfig, project_overrides: dict[str, Any]
    ) -> dict[str, Any]:
        """Merge project-specific overrides with global configuration.

        Args:
            global_config: Global configuration
            project_overrides: Project-specific overrides

        Returns:
            Dict[str, Any]: Merged configuration
        """
        # Convert global config to dict, then deep merge the overrides on top
        global_dict = global_config.to_dict()
        return self._deep_merge_dicts(global_dict, project_overrides)

    def _deep_merge_dicts(
        self, base: dict[str, Any], override: dict[str, Any]
    ) -> dict[str, Any]:
        """Deep merge two dictionaries.

        Keys whose values are dicts on both sides are merged recursively;
        for every other key the value from ``override`` replaces the one
        from ``base``. Neither input is modified, but values that are not
        merged are shared by reference with the inputs (the copy is shallow).

        Args:
            base: Base dictionary
            override: Override dictionary

        Returns:
            Dict[str, Any]: Merged dictionary
        """
        result = base.copy()

        for key, value in override.items():
            existing = result.get(key)
            if isinstance(existing, dict) and isinstance(value, dict):
                result[key] = self._deep_merge_dicts(existing, value)
            else:
                result[key] = value

        return result