Coverage for src/qdrant_loader/config/workers.py: 88%
109 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1"""Worker scheduling configuration."""
3from __future__ import annotations
5import re
6from typing import Any
8from pydantic import Field, field_validator
10from qdrant_loader.config.base import BaseConfig
12_SHORT_DURATION_RE = re.compile(
13 r"^\s*(?:(?P<hours>\d+)h)?(?:(?P<minutes>\d+)m)?(?:(?P<seconds>\d+)s)?\s*$",
14 re.IGNORECASE,
15)
16_ISO_DURATION_RE = re.compile(
17 r"^P(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)$",
18 re.IGNORECASE,
19)
22def parse_interval_seconds(value: int | float | str) -> int:
23 """Parse an interval value into seconds.
25 Accepted formats:
26 - Integer/float seconds: 300, 300.0
27 - Numeric strings: "300"
28 - Short duration strings: "30s", "5m", "2h", "1h30m"
29 - ISO-8601 duration (time portion): "PT5M", "PT1H30M"
30 """
31 if isinstance(value, bool):
32 raise ValueError("interval must be a positive duration")
34 if isinstance(value, int):
35 seconds = value
36 if seconds < 1:
37 raise ValueError("interval must be at least 1 second")
38 return seconds
40 if isinstance(value, float):
41 if not value.is_integer():
42 raise ValueError("interval must be a whole number of seconds")
43 seconds = int(value)
44 if seconds < 1:
45 raise ValueError("interval must be at least 1 second")
46 return seconds
48 if not isinstance(value, str):
49 raise ValueError("interval must be an int, float, or string duration")
51 raw = value.strip()
52 if not raw:
53 raise ValueError("interval cannot be empty")
55 if raw.isdigit():
56 seconds = int(raw)
57 if seconds < 1:
58 raise ValueError("interval must be at least 1 second")
59 return seconds
61 short_match = _SHORT_DURATION_RE.match(raw)
62 if short_match:
63 hours = int(short_match.group("hours") or 0)
64 minutes = int(short_match.group("minutes") or 0)
65 seconds = int(short_match.group("seconds") or 0)
66 total = hours * 3600 + minutes * 60 + seconds
67 if total < 1:
68 raise ValueError("interval must be at least 1 second")
69 return total
71 iso_match = _ISO_DURATION_RE.match(raw)
72 if iso_match:
73 hours = int(iso_match.group("hours") or 0)
74 minutes = int(iso_match.group("minutes") or 0)
75 seconds = int(iso_match.group("seconds") or 0)
76 total = hours * 3600 + minutes * 60 + seconds
77 if total < 1:
78 raise ValueError("interval must be at least 1 second")
79 return total
81 raise ValueError(
82 "Invalid interval format. Use seconds, short durations like '5m', or ISO-8601 like 'PT5M'."
83 )
class IncrementalPullScheduleConfig(BaseConfig):
    """Periodic scheduling config for INCREMENTAL_PULL jobs."""

    enabled: bool = Field(default=False, description="Enable periodic scheduling")
    # Populated from the ``interval`` key (alias); the "before" validator below
    # converts every accepted spelling into an integer number of seconds.
    interval_seconds: int = Field(
        default=300,
        alias="interval",
        description="Interval in seconds (supports '5m', 'PT5M', or integer seconds)",
    )
    dedup_statuses: list[str] = Field(
        default_factory=lambda: ["pending", "running"],
        description="Job statuses considered active for deduplication",
    )
    payload_defaults: dict[str, Any] = Field(
        default_factory=dict,
        description="Extra payload fields merged into each scheduled job payload",
    )

    @field_validator("interval_seconds", mode="before")
    @classmethod
    def validate_interval_seconds(cls, value: int | float | str) -> int:
        """Convert the raw ``interval`` value with ``parse_interval_seconds``.

        Raises:
            ValueError: Propagated from ``parse_interval_seconds`` for any
                unsupported or non-positive interval.
        """
        return parse_interval_seconds(value)

    @field_validator("dedup_statuses")
    @classmethod
    def validate_dedup_statuses(cls, values: list[str]) -> list[str]:
        """Normalise dedup statuses to unique, lower-cased, known job states.

        Entries are stripped and lower-cased. Duplicates that collapse to the
        same status are dropped, keeping first-occurrence order.

        Raises:
            ValueError: If an entry is blank or not a string, is not one of the
                allowed statuses, or the resulting list is empty.
        """
        allowed = frozenset({"pending", "running", "done", "failed", "cancelled"})
        normalized: list[str] = []
        for item in values:
            if not isinstance(item, str) or not item.strip():
                raise ValueError("dedup statuses must be non-empty strings")
            status = item.strip().lower()
            if status not in allowed:
                raise ValueError(
                    f"Invalid dedup status '{item}'. Allowed: {sorted(allowed)}"
                )
            # "Pending" and "pending" normalise to the same value; keep a
            # single copy so the list carries no redundant entries.
            if status not in normalized:
                normalized.append(status)

        if not normalized:
            raise ValueError("dedup_statuses cannot be empty")
        return normalized

    @field_validator("payload_defaults")
    @classmethod
    def validate_payload_defaults(cls, value: dict[str, Any]) -> dict[str, Any]:
        """Ensure payload defaults hold only JSON-like values.

        Allowed leaves are ``str``, ``int``, ``float``, ``bool`` and ``None``.
        Lists and string-keyed dicts are allowed and checked recursively. The
        result is a freshly built copy of the input structure.

        Raises:
            ValueError: If the value is not a dict, a dict key at any depth is
                not a string, or any value has an unsupported type. The
                message names the offending path, e.g. ``payload_defaults.a[0]``.
        """

        def _validate_json_like(v: Any, path: str) -> Any:
            """Recursively validate *v* and return a cleaned copy; *path* is used in errors."""
            if v is None or isinstance(v, (str, int, float, bool)):
                return v

            if isinstance(v, list):
                return [
                    _validate_json_like(item, f"{path}[{idx}]")
                    for idx, item in enumerate(v)
                ]

            if isinstance(v, dict):
                cleaned: dict[str, Any] = {}
                for key, item in v.items():
                    if not isinstance(key, str):
                        raise ValueError(f"{path} keys must be strings")
                    cleaned[key] = _validate_json_like(item, f"{path}.{key}")
                return cleaned

            raise ValueError(
                f"{path} contains unsupported type {type(v).__name__}; "
                "allowed types are str, int, float, bool, None, list, dict"
            )

        if not isinstance(value, dict):
            raise ValueError("payload_defaults must be a dictionary")

        # The top-level mapping follows the same rules as nested ones. Rooting
        # the path at "payload_defaults" reproduces the exact messages
        # ("payload_defaults keys must be strings", "payload_defaults.<key>").
        return _validate_json_like(value, "payload_defaults")
class WorkerSchedulesConfig(BaseConfig):
    """Container for worker schedule definitions."""

    # Built through a factory so each WorkerSchedulesConfig owns its own
    # IncrementalPullScheduleConfig instance rather than sharing one default.
    incremental_pull: IncrementalPullScheduleConfig = Field(
        default_factory=IncrementalPullScheduleConfig,
    )
class WorkerRuntimeConfig(BaseConfig):
    """Runtime knobs for worker pool behavior."""

    # Pool size: at least one worker must drain the queue.
    worker_count: int = Field(
        4,
        description="Number of concurrent workers draining the queue",
        ge=1,
    )
    # How long (seconds) a claimed job stays invisible to other workers.
    lease_seconds: int = Field(
        60,
        description="Visibility lease duration (seconds) when a job is claimed",
        ge=1,
    )
    # Claim attempts allowed per job before it is marked failed.
    max_attempts: int = Field(
        3,
        description="Maximum claim attempts per job before marking failed",
        ge=1,
    )
    # Unlike the fields above, zero is permitted here: it disables backoff.
    retry_backoff_base_seconds: int = Field(
        5,
        description="Exponential retry base in seconds (0 disables backoff)",
        ge=0,
    )
class WorkersConfig(BaseConfig):
    """Worker runtime and scheduling configuration."""

    # Both sections default to freshly constructed sub-configs, so an empty
    # ``workers`` section still yields a fully populated object.
    runtime: WorkerRuntimeConfig = Field(
        default_factory=WorkerRuntimeConfig,
    )
    schedules: WorkerSchedulesConfig = Field(
        default_factory=WorkerSchedulesConfig,
    )