Coverage for src / qdrant_loader / config / workers.py: 88%

109 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1"""Worker scheduling configuration.""" 

2 

3from __future__ import annotations 

4 

5import re 

6from typing import Any 

7 

8from pydantic import Field, field_validator 

9 

10from qdrant_loader.config.base import BaseConfig 

11 

# Short form such as "30s", "5m", "2h" or "1h30m" (case-insensitive).
# Every component is optional, so an input with no components at all still
# matches; parse_interval_seconds rejects it through its "at least 1 second"
# check.
_SHORT_DURATION_RE = re.compile(
    r"^\s*(?:(?P<hours>\d+)h)?(?:(?P<minutes>\d+)m)?(?:(?P<seconds>\d+)s)?\s*$",
    re.IGNORECASE,
)
# Time portion of an ISO-8601 duration such as "PT5M" or "PT1H30M"
# (case-insensitive). Date components (days, months, ...) are not supported.
_ISO_DURATION_RE = re.compile(
    r"^P(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)$",
    re.IGNORECASE,
)


def _require_at_least_one_second(seconds: int) -> int:
    """Return *seconds* unchanged, raising ``ValueError`` when it is below 1."""
    if seconds < 1:
        raise ValueError("interval must be at least 1 second")
    return seconds


def parse_interval_seconds(value: int | float | str) -> int:
    """Parse an interval value into a whole number of seconds.

    Accepted formats:
    - Integer/float seconds: 300, 300.0
    - Numeric strings: "300"
    - Short duration strings: "30s", "5m", "2h", "1h30m"
    - ISO-8601 duration (time portion): "PT5M", "PT1H30M"

    Args:
        value: The raw interval. Strings are stripped of surrounding
            whitespace before being interpreted.

    Returns:
        The interval expressed in seconds (always >= 1).

    Raises:
        ValueError: If ``value`` is a bool, a non-integral float, an
            unsupported type, an empty string, an unrecognised format, or
            resolves to less than one second.
    """
    # bool is a subclass of int, so it has to be rejected before the int branch.
    if isinstance(value, bool):
        raise ValueError("interval must be a positive duration")

    if isinstance(value, int):
        return _require_at_least_one_second(value)

    if isinstance(value, float):
        # inf and nan report is_integer() == False and are rejected here too.
        if not value.is_integer():
            raise ValueError("interval must be a whole number of seconds")
        return _require_at_least_one_second(int(value))

    if not isinstance(value, str):
        raise ValueError("interval must be an int, float, or string duration")

    raw = value.strip()
    if not raw:
        raise ValueError("interval cannot be empty")

    # isdecimal() rather than isdigit(): isdigit() is also true for characters
    # such as superscript digits that int() cannot parse, which used to leak a
    # confusing "invalid literal for int()" error instead of the format
    # message raised at the end of this function.
    if raw.isdecimal():
        return _require_at_least_one_second(int(raw))

    # Try the short form first, then the ISO-8601 form; both expose the same
    # named groups, so they share one conversion.
    for pattern in (_SHORT_DURATION_RE, _ISO_DURATION_RE):
        match = pattern.match(raw)
        if match:
            total = (
                int(match.group("hours") or 0) * 3600
                + int(match.group("minutes") or 0) * 60
                + int(match.group("seconds") or 0)
            )
            return _require_at_least_one_second(total)

    raise ValueError(
        "Invalid interval format. Use seconds, short durations like '5m', or ISO-8601 like 'PT5M'."
    )

84 

85 

class IncrementalPullScheduleConfig(BaseConfig):
    """Periodic scheduling config for INCREMENTAL_PULL jobs."""

    # Scheduling is disabled unless explicitly turned on.
    enabled: bool = Field(default=False, description="Enable periodic scheduling")
    # Declared with the alias "interval"; raw input is converted to whole
    # seconds by validate_interval_seconds before type validation.
    interval_seconds: int = Field(
        default=300,
        alias="interval",
        description="Interval in seconds (supports '5m', 'PT5M', or integer seconds)",
    )
    # Normalized (stripped, lower-cased) by validate_dedup_statuses.
    dedup_statuses: list[str] = Field(
        default_factory=lambda: ["pending", "running"],
        description="Job statuses considered active for deduplication",
    )
    # Restricted to JSON-like values by validate_payload_defaults.
    payload_defaults: dict[str, Any] = Field(
        default_factory=dict,
        description="Extra payload fields merged into each scheduled job payload",
    )

    @field_validator("interval_seconds", mode="before")
    @classmethod
    def validate_interval_seconds(cls, value: int | float | str) -> int:
        """Convert the raw interval input to seconds via parse_interval_seconds."""
        return parse_interval_seconds(value)

    @field_validator("dedup_statuses")
    @classmethod
    def validate_dedup_statuses(cls, values: list[str]) -> list[str]:
        """Return the statuses stripped and lower-cased.

        Raises:
            ValueError: If an entry is not a non-blank string, is not one of
                the allowed statuses, or if the list is empty.
        """
        permitted = {"pending", "running", "done", "failed", "cancelled"}
        result: list[str] = []
        for entry in values:
            trimmed = entry.strip() if isinstance(entry, str) else ""
            if not trimmed:
                raise ValueError("dedup statuses must be non-empty strings")
            lowered = trimmed.lower()
            if lowered not in permitted:
                raise ValueError(
                    f"Invalid dedup status '{entry}'. Allowed: {sorted(permitted)}"
                )
            result.append(lowered)

        if result:
            return result
        raise ValueError("dedup_statuses cannot be empty")

    @field_validator("payload_defaults")
    @classmethod
    def validate_payload_defaults(cls, value: dict[str, Any]) -> dict[str, Any]:
        """Return a rebuilt copy of the payload holding only JSON-like values.

        Allowed values are str, int, float, bool, None, and lists/dicts of
        those (dict keys must be strings), checked recursively.

        Raises:
            ValueError: If the payload is not a dict, a dict key is not a
                string, or a value has an unsupported type. The message names
                the offending path, e.g. ``payload_defaults.a[0].b``.
        """

        def _walk(node: Any, where: str) -> Any:
            # Containers are rebuilt so the result shares no list/dict objects
            # with the input; scalars are returned as-is.
            if isinstance(node, dict):
                rebuilt: dict[str, Any] = {}
                for name, child in node.items():
                    if not isinstance(name, str):
                        raise ValueError(f"{where} keys must be strings")
                    rebuilt[name] = _walk(child, f"{where}.{name}")
                return rebuilt

            if isinstance(node, list):
                return [
                    _walk(child, f"{where}[{pos}]") for pos, child in enumerate(node)
                ]

            if node is None or isinstance(node, (str, int, float, bool)):
                return node

            raise ValueError(
                f"{where} contains unsupported type {type(node).__name__}; "
                "allowed types are str, int, float, bool, None, list, dict"
            )

        if not isinstance(value, dict):
            raise ValueError("payload_defaults must be a dictionary")

        # The top-level dict follows the same rules as nested ones, with
        # "payload_defaults" as the root of every reported path.
        return _walk(value, "payload_defaults")

163 

164 

class WorkerSchedulesConfig(BaseConfig):
    """Container for worker schedule definitions."""

    # Incremental-pull schedule; built from IncrementalPullScheduleConfig's
    # own defaults when not supplied.
    incremental_pull: IncrementalPullScheduleConfig = Field(
        default_factory=IncrementalPullScheduleConfig,
    )

171 

172 

class WorkerRuntimeConfig(BaseConfig):
    """Runtime knobs for worker pool behavior."""

    # Must be at least 1.
    worker_count: int = Field(
        ge=1,
        default=4,
        description="Number of concurrent workers draining the queue",
    )
    # Must be at least 1.
    lease_seconds: int = Field(
        ge=1,
        default=60,
        description="Visibility lease duration (seconds) when a job is claimed",
    )
    # Must be at least 1.
    max_attempts: int = Field(
        ge=1,
        default=3,
        description="Maximum claim attempts per job before marking failed",
    )
    # Zero is allowed here, unlike the other knobs.
    retry_backoff_base_seconds: int = Field(
        ge=0,
        default=5,
        description="Exponential retry base in seconds (0 disables backoff)",
    )

196 

197 

class WorkersConfig(BaseConfig):
    """Worker runtime and scheduling configuration."""

    # Both sections fall back to their own default-constructed config.
    runtime: WorkerRuntimeConfig = Field(
        default_factory=WorkerRuntimeConfig,
    )
    schedules: WorkerSchedulesConfig = Field(
        default_factory=WorkerSchedulesConfig,
    )