Coverage for src / qdrant_loader / connectors / jira / config.py: 96%
134 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1"""Configuration for Jira connector."""
3import os
4from datetime import datetime, timedelta
5from enum import StrEnum
6from typing import Self
8from pydantic import (
9 BaseModel,
10 ConfigDict,
11 Field,
12 HttpUrl,
13 field_validator,
14 model_validator,
15)
17from qdrant_loader.config.source_config import SourceConfig
20class JiraDeploymentType(StrEnum):
21 """Jira deployment types."""
23 CLOUD = "cloud"
24 DATACENTER = "datacenter"
27class JiraFieldType(StrEnum):
28 SIMPLE = "simple"
29 OBJECT = "object" # extract attribute from single object
30 ARRAY = "array" # plain list
31 ARRAY_OBJECT = "array_object" # extract attribute from list of objects
# Attribute names that an extra field's target ``name`` is not allowed to use.
RESERVED_NAMES = {
    # identity and content
    "id", "key", "summary", "description",
    # classification
    "issue_type", "status", "priority", "project_key",
    # timestamps
    "created", "updated",
    # people
    "reporter", "assignee",
    # collections
    "labels", "attachments", "comments",
    # relationships
    "parent_key", "subtasks", "linked_issues",
}
class JiraExtraField(BaseModel):
    """Describe one additional Jira field to fetch and how to extract its value.

    ``param_name`` is the field name on the Jira API side, ``name`` is the
    target attribute name, and ``field_type`` selects the extraction strategy.
    ``attr_name`` is mandatory for the object-based strategies and forbidden
    for the others.
    """

    param_name: str = Field(
        ...,
        min_length=1,
        description="The Jira API parameter name (e.g., 'customfield_11406', 'priority')",
    )
    name: str = Field(
        ...,
        min_length=1,
        description="Target attribute name on JiraIssue (e.g., 'sah_project')",
    )
    field_type: JiraFieldType = Field(
        default=JiraFieldType.SIMPLE,
        description=(
            "Extraction strategy: "
            "'simple' = direct value, "
            "'object' = extract attribute from object (requires attr_name), "
            "'array' = plain list, "
            "'array_object' = extract attribute from list of objects (requires attr_name)"
        ),
    )
    attr_name: str | None = Field(
        default=None,
        description="Attribute to extract from object(s) (e.g., 'name', 'value')",
    )

    @field_validator("param_name", "name", "attr_name", mode="before")
    @classmethod
    def normalize_strings(cls, v: object) -> object:
        """Strip surrounding whitespace from string input and reject blanks.

        This validator runs in ``before`` mode, so ``v`` is the raw,
        not-yet-type-checked input. ``None`` and any other non-string value
        are returned untouched so that the declared field type reports the
        problem as a regular validation error, instead of this method failing
        with ``AttributeError`` on ``.strip()``.

        Raises:
            ValueError: If ``v`` is a string that is empty after stripping.
        """
        if not isinstance(v, str):
            return v
        stripped = v.strip()
        if not stripped:
            raise ValueError("Field value cannot be empty or whitespace")
        return stripped

    @model_validator(mode="after")
    def validate_attr_requirement(self) -> Self:
        """Enforce that ``attr_name`` is present exactly when it is needed.

        Raises:
            ValueError: If ``field_type`` is ``object``/``array_object`` and
                ``attr_name`` is missing, or if ``attr_name`` is given for
                any other ``field_type``.
        """
        needs_attr = self.field_type in {
            JiraFieldType.OBJECT,
            JiraFieldType.ARRAY_OBJECT,
        }
        if needs_attr and not self.attr_name:
            raise ValueError(
                f"'attr_name' is required for field_type='{self.field_type}'"
            )
        if not needs_attr and self.attr_name is not None:
            raise ValueError(
                "'attr_name' is only allowed for field_type='object' or 'array_object'"
            )
        return self

    @model_validator(mode="after")
    def validate_reserved_name(self) -> Self:
        """Reject a target ``name`` that is listed in ``RESERVED_NAMES``.

        Raises:
            ValueError: If ``name`` is one of the reserved attribute names.
        """
        if self.name in RESERVED_NAMES:
            raise ValueError(
                f"'name' cannot be one of reserved attributes: {sorted(RESERVED_NAMES)}"
            )
        return self
class JiraProjectConfig(SourceConfig):
    """Configuration for a Jira project.

    Holds the connection and authentication settings, request tuning
    (rate limit, page size), and the filters that select which issues are
    processed. ``token`` and ``email`` fall back to the ``JIRA_TOKEN`` and
    ``JIRA_EMAIL`` environment variables when they are not provided.
    """

    # Authentication
    token: str | None = Field(
        default=None, description="Jira API token or Personal Access Token"
    )
    email: str | None = Field(
        default=None, description="Email associated with the API token (Cloud only)"
    )
    base_url: HttpUrl = Field(
        ...,
        description="Base URL of the Jira instance (e.g., 'https://your-domain.atlassian.net')",
    )

    # Project configuration
    project_key: str = Field(
        ..., description="Project key to process (e.g., 'PROJ')", min_length=1
    )

    # Deployment type
    # Only the values of JiraDeploymentType are accepted; the description used
    # to advertise a 'server' value that the enum rejects.
    deployment_type: JiraDeploymentType = Field(
        default=JiraDeploymentType.CLOUD,
        description="Jira deployment type ('cloud' or 'datacenter'; use 'datacenter' for Data Center/Server)",
    )

    # Rate limiting
    requests_per_minute: int = Field(
        default=60, description="Maximum number of requests per minute", ge=1, le=1000
    )

    # Pagination
    page_size: int = Field(
        default=100,
        description="Number of items per page for paginated requests",
        ge=1,
        le=100,
    )

    # Attachment handling
    download_attachments: bool = Field(
        default=False, description="Whether to download and process issue attachments"
    )

    # Additional configuration
    issue_types: list[str] = Field(
        default_factory=list,
        description="Optional list of issue types to process (e.g., ['Bug', 'Story']). If empty, all types are processed.",
    )
    include_statuses: list[str] = Field(
        default_factory=list,
        description="Optional list of statuses to include (e.g., ['Open', 'In Progress']). If empty, all statuses are included.",
    )

    # Issue filtering
    updated_after: datetime | None = Field(
        default=None,
        description="Only fetch issues updated after this datetime. Supports ISO 8601 format (e.g., '2026-05-04T12:00:00') or relative format (e.g., '-2 days', '-48h', '-1w'). Set to None to fetch all issues.",
    )
    extra_fields: list[JiraExtraField] | None = Field(
        default=None,
        description="Optional list of extra Jira fields to retrieve with their extraction type.",
    )

    # validate_default=True makes the field validators run on defaults too,
    # which is what lets token/email be filled from the environment when the
    # fields are omitted.
    model_config = ConfigDict(validate_default=True, arbitrary_types_allowed=True)

    @field_validator("deployment_type", mode="before")
    @classmethod
    def auto_detect_deployment_type(
        cls, v: str | JiraDeploymentType
    ) -> JiraDeploymentType:
        """Convert a string deployment type to the enum, ignoring letter case.

        Despite its name, this method performs no detection: a string is
        lowercased and looked up in ``JiraDeploymentType``; any other value is
        returned unchanged.

        Raises:
            ValueError: If the lowercased string is not a valid enum value.
        """
        if isinstance(v, str):
            return JiraDeploymentType(v.lower())
        return v

    @field_validator("token", mode="after")
    @classmethod
    def load_token_from_env(cls, v: str | None) -> str | None:
        """Return ``v``, or the ``JIRA_TOKEN`` environment variable if ``v`` is empty."""
        return v or os.getenv("JIRA_TOKEN")

    @field_validator("email", mode="after")
    @classmethod
    def load_email_from_env(cls, v: str | None) -> str | None:
        """Return ``v``, or the ``JIRA_EMAIL`` environment variable if ``v`` is empty."""
        return v or os.getenv("JIRA_EMAIL")

    @model_validator(mode="after")
    def validate_no_placeholders(self) -> Self:
        """Fail if a checked field still contains an un-substituted ${VAR} placeholder.

        Checks ``project_key``, ``base_url``, ``token`` and ``email`` and
        reports the first placeholder found in each offending field.

        Raises:
            ValueError: Listing every field that still holds a placeholder.
        """
        import re

        placeholder = re.compile(r"\$\{[^}]+\}")

        candidates: dict[str, str | None] = {
            "project_key": self.project_key,
            "base_url": str(self.base_url) if self.base_url else None,
            "token": self.token,
            "email": self.email,
        }

        unresolved: list[str] = []
        for field_name, value in candidates.items():
            if not value:
                continue
            # Search once and reuse the match for the hint text.
            if (hit := placeholder.search(value)) is not None:
                unresolved.append(f" - {field_name}: {hit.group(0)} (env var not set)")

        if unresolved:
            raise ValueError(
                "Jira source config contains un-substituted environment variables.\n"
                "Set the following variables in your .env file or shell before running:\n"
                + "\n".join(unresolved)
            )

        return self

    @model_validator(mode="after")
    def validate_auth_config(self) -> Self:
        """Validate that the credentials required by the deployment type are set.

        Cloud needs both ``email`` and ``token``; every other deployment type
        needs only ``token``.

        Raises:
            ValueError: If a required credential is missing.
        """
        if self.deployment_type == JiraDeploymentType.CLOUD:
            # Cloud requires email and token
            if not self.email:
                raise ValueError("Email is required for Jira Cloud deployment")
            if not self.token:
                raise ValueError("API token is required for Jira Cloud deployment")
        elif not self.token:
            # Data Center/Server requires Personal Access Token
            raise ValueError(
                "Personal Access Token is required for Jira Data Center/Server deployment"
            )

        return self

    @field_validator("issue_types", "include_statuses")
    @classmethod
    def validate_list_items(cls, v: list[str]) -> list[str]:
        """Strip each list item and reject items that are empty or whitespace.

        Raises:
            ValueError: If any item is empty after stripping.
        """
        stripped = [item.strip() for item in v]
        if not all(stripped):
            raise ValueError("List items cannot be empty strings")
        return stripped

    @field_validator("extra_fields")
    @classmethod
    def validate_extra_fields_unique(
        cls, v: list[JiraExtraField] | None
    ) -> list[JiraExtraField] | None:
        """Validate that extra field param_names and names are unique.

        Raises:
            ValueError: If two entries share a ``param_name`` or a ``name``.
        """
        if v is None:
            return v
        # Both attributes are required strings on JiraExtraField, so no
        # None-filtering is needed before comparing counts.
        for attr in ("param_name", "name"):
            values = [getattr(extra, attr) for extra in v]
            if len(values) != len(set(values)):
                raise ValueError(f"Extra field '{attr}' values must be unique")
        return v

    @field_validator("updated_after", mode="before")
    @classmethod
    def parse_updated_after(cls, v: str | datetime | None) -> datetime | None:
        """Parse updated_after, supporting ISO 8601 and relative date strings.

        Supports formats like:
        - ISO 8601: "2026-05-04T12:00:00"
        - Relative: "-2 days", "-48h", "-2d", "-1w"
        - None: fetch all issues

        Surrounding whitespace is ignored for both string formats.

        Raises:
            ValueError: If ``v`` is neither ``None``, a ``datetime`` nor a
                string, if the string matches neither format, or if a
                relative offset is too large to represent.
        """
        if v is None or isinstance(v, datetime):
            return v
        if not isinstance(v, str):
            raise ValueError(
                f"updated_after must be a datetime or string, got {type(v)}"
            )

        import re

        text = v.strip()

        # Try to parse as ISO 8601 datetime first
        try:
            return datetime.fromisoformat(text)
        except ValueError:
            pass  # not ISO 8601; fall through to the relative format

        # Parse relative dates like "-2 days", "-48h", "-2d", "-1w"
        match = re.match(
            r"^-(\d+)\s*(days?|hours?|h|d|w|weeks?)$", text, re.IGNORECASE
        )
        if match is None:
            raise ValueError(
                f"Invalid updated_after format: '{v}'. "
                "Use ISO 8601 (e.g., '2026-05-04T12:00:00') or relative format (e.g., '-2 days', '-48h', '-1w')"
            )

        amount = int(match.group(1))
        # Every unit the pattern accepts starts with d, h or w, so the first
        # letter is enough to pick the timedelta keyword.
        unit_kwarg = {"d": "days", "h": "hours", "w": "weeks"}[
            match.group(2)[0].lower()
        ]
        try:
            # NOTE(review): datetime.now() is naive local time, while an ISO
            # string with an offset yields an aware datetime - confirm that
            # consumers handle both.
            return datetime.now() - timedelta(**{unit_kwarg: amount})
        except OverflowError as exc:
            # Re-raise as ValueError so it is reported as a validation error.
            raise ValueError(
                f"updated_after relative offset is too large: '{v}'"
            ) from exc