Coverage for src/qdrant_loader/connectors/jira/connector.py: 70%

200 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 09:40 +0000

1"""Jira connector implementation.""" 

2 

3import asyncio 

4from abc import abstractmethod 

5from collections.abc import AsyncGenerator 

6from datetime import datetime 

7from urllib.parse import urlparse # noqa: F401 - may be used in URL handling 

8 

9import requests 

10from requests.auth import HTTPBasicAuth # noqa: F401 - compatibility 

11 

12from qdrant_loader.config.types import SourceType 

13from qdrant_loader.connectors.base import BaseConnector, ConnectorConfigurationError 

14from qdrant_loader.connectors.jira.auth import ( 

15 auto_detect_deployment_type as _auto_detect_type, 

16) 

17from qdrant_loader.connectors.jira.auth import setup_authentication as _setup_auth 

18from qdrant_loader.connectors.jira.config import JiraDeploymentType, JiraProjectConfig 

19from qdrant_loader.connectors.jira.mappers import ( 

20 parse_attachment as _parse_attachment_helper, 

21) 

22from qdrant_loader.connectors.jira.mappers import parse_comment as _parse_comment_helper 

23from qdrant_loader.connectors.jira.mappers import parse_issue as _parse_issue_helper 

24from qdrant_loader.connectors.jira.mappers import parse_user as _parse_user_helper 

25from qdrant_loader.connectors.jira.models import ( 

26 JiraAttachment, 

27 JiraComment, 

28 JiraIssue, 

29 JiraUser, 

30) 

31from qdrant_loader.connectors.shared.attachments import AttachmentReader 

32from qdrant_loader.connectors.shared.attachments.metadata import ( 

33 jira_attachment_to_metadata, 

34) 

35from qdrant_loader.connectors.shared.http import ( 

36 RateLimiter, 

37) 

38from qdrant_loader.connectors.shared.http import ( 

39 request_with_policy as _http_request_with_policy, 

40) 

41from qdrant_loader.core.attachment_downloader import ( 

42 AttachmentDownloader, 

43 AttachmentMetadata, 

44) 

45from qdrant_loader.core.document import Document 

46from qdrant_loader.core.file_conversion import ( 

47 FileConversionConfig, 

48 FileConverter, 

49 FileDetector, 

50) 

51from qdrant_loader.utils.logging import LoggingConfig 

52 

53logger = LoggingConfig.get_logger(__name__) 

54 

55 

class BaseJiraConnector(BaseConnector):
    """Base class for all Jira connectors.

    Owns the shared ``requests`` session, authentication setup, rate limiting,
    connection validation, JQL construction and the conversion of Jira issues
    into ``Document`` objects. Subclasses supply the deployment-specific parts:
    ``_get_api_url`` and ``get_issues``.
    """

    def __init__(self, config: JiraProjectConfig):
        """Initialize the Jira connector.

        Args:
            config: The Jira configuration.

        Raises:
            ValueError: If required authentication parameters are not set.
        """
        super().__init__(config)
        self.config = config
        self.base_url = str(config.base_url).rstrip("/")

        # One session is shared by API calls and attachment downloads.
        self.session = requests.Session()

        # Set up authentication based on deployment type
        self._setup_authentication()

        self._last_sync: datetime | None = None
        self._rate_limiter = RateLimiter.per_minute(self.config.requests_per_minute)
        self._initialized = False

        # File conversion / attachment components stay None unless enabled.
        self.file_converter: FileConverter | None = None
        self.file_detector: FileDetector | None = None
        self.attachment_reader: AttachmentReader | None = None

        if config.enable_file_conversion:
            self.file_detector = FileDetector()
            # FileConverter is created later, in set_file_conversion_config().

        if config.download_attachments:
            self.attachment_reader = AttachmentReader(
                session=self.session,
                downloader=AttachmentDownloader(session=self.session),
            )

    def _setup_authentication(self) -> None:
        """Set up authentication on the session based on deployment type."""
        _setup_auth(self.session, self.config)

    def _auto_detect_deployment_type(self) -> JiraDeploymentType:
        """Auto-detect the Jira deployment type based on the base URL.

        Returns:
            JiraDeploymentType: Detected deployment type
        """
        return _auto_detect_type(str(self.base_url))

    def _close_attachment_reader(self, reader: AttachmentReader) -> None:
        """Best-effort release of an attachment reader that is being replaced.

        Calls the first of ``aclose``/``close``/``cleanup`` that the reader
        exposes. A coroutine result is scheduled on the running event loop, or
        run to completion with ``asyncio.run`` when no loop is running.
        Failures are logged and never propagated so that reconfiguration is
        not blocked by cleanup problems.
        """
        try:
            close_callable = None
            for attr_name in ("aclose", "close", "cleanup"):
                if hasattr(reader, attr_name):
                    close_callable = getattr(reader, attr_name)
                    break
            if close_callable is None:
                return

            result = close_callable()
            if not asyncio.iscoroutine(result):
                return

            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None

            if loop and not loop.is_closed():
                # Fire-and-forget: the cleanup task is scheduled, not awaited.
                loop.create_task(result)
            else:
                asyncio.run(result)
        except Exception as exc:
            # Deliberately swallowed, but no longer silent.
            logger.debug(
                "Ignoring error while closing previous attachment reader",
                error=str(exc),
                error_type=type(exc).__name__,
            )

    def set_file_conversion_config(self, config: FileConversionConfig) -> None:
        """Set the file conversion configuration.

        Has an effect only when file conversion is enabled and a file detector
        was created in ``__init__``. In that case a ``FileConverter`` is built
        and, if attachment download is enabled, the attachment reader is
        replaced by one whose downloader uses the new conversion settings.

        Args:
            config: File conversion configuration
        """
        if not (self.config.enable_file_conversion and self.file_detector):
            return

        self.file_converter = FileConverter(config)
        if not self.config.download_attachments:
            return

        # Clean up any existing attachment reader to avoid resource leaks
        old_reader = self.attachment_reader
        if old_reader is not None:
            self._close_attachment_reader(old_reader)

        # Drop reference before creating a new reader
        self.attachment_reader = None

        # Reinitialize reader with new downloader config
        self.attachment_reader = AttachmentReader(
            session=self.session,
            downloader=AttachmentDownloader(
                session=self.session,
                file_conversion_config=config,
                enable_file_conversion=True,
                max_attachment_size=config.max_file_size,
            ),
        )

    async def _validate_connection(self) -> None:
        """Validate connectivity, auth, and project access before use.

        Performs two requests: ``myself`` (reachability and credentials) and
        ``project/<project_key>`` (project existence and permission). Every
        ``requests`` failure is translated into a descriptive
        ``ConnectorConfigurationError`` chained to the original exception.

        Raises:
            ConnectorConfigurationError: for invalid URL, bad credentials,
                missing permissions, or unknown project key.
        """
        # ── Step 1: reachability + authentication (/myself endpoint) ──────────
        try:
            await self._make_request("GET", "myself")
        except requests.exceptions.Timeout as exc:
            raise ConnectorConfigurationError(
                f"Connection to Jira at '{self.base_url}' timed out. "
                "Verify network connectivity and try again."
            ) from exc
        except requests.exceptions.ConnectionError as exc:
            raise ConnectorConfigurationError(
                f"Cannot connect to Jira at '{self.base_url}'. "
                "Verify that base_url is correct and the server is reachable."
            ) from exc
        except requests.exceptions.HTTPError as exc:
            status = exc.response.status_code if exc.response is not None else None
            if status == 401:
                raise ConnectorConfigurationError(
                    f"Authentication failed for Jira at '{self.base_url}' (HTTP 401). "
                    "Check that token and email are valid."
                ) from exc
            if status == 403:
                raise ConnectorConfigurationError(
                    f"Access denied to Jira at '{self.base_url}' (HTTP 403). "
                    "The account does not have sufficient permissions."
                ) from exc
            raise ConnectorConfigurationError(
                f"Validation request to Jira at '{self.base_url}' failed "
                f"with HTTP {status}: {exc}"
            ) from exc
        except requests.exceptions.RequestException as exc:
            raise ConnectorConfigurationError(
                f"Validation request to Jira at '{self.base_url}' failed: {exc}"
            ) from exc

        # ── Step 2: project key exists and is accessible ───────────────────────
        try:
            await self._make_request("GET", f"project/{self.config.project_key}")
        except requests.exceptions.Timeout as exc:
            raise ConnectorConfigurationError(
                f"Connection to Jira at '{self.base_url}' timed out while validating "
                f"project '{self.config.project_key}'."
            ) from exc
        except requests.exceptions.ConnectionError as exc:
            raise ConnectorConfigurationError(
                f"Connection to Jira at '{self.base_url}' was lost while validating "
                f"project '{self.config.project_key}' (between validation steps). "
                "Verify network connectivity and Jira availability."
            ) from exc
        except requests.exceptions.HTTPError as exc:
            status = exc.response.status_code if exc.response is not None else None
            if status == 404:
                raise ConnectorConfigurationError(
                    f"Project '{self.config.project_key}' not found in Jira (HTTP 404). "
                    "Check that project_key is correct."
                ) from exc
            if status == 403:
                raise ConnectorConfigurationError(
                    f"No permission to access project '{self.config.project_key}' "
                    f"in Jira (HTTP 403)."
                ) from exc
            raise ConnectorConfigurationError(
                f"Validation request for project '{self.config.project_key}' at "
                f"'{self.base_url}' failed with HTTP {status}: {exc}"
            ) from exc
        except requests.exceptions.RequestException as exc:
            raise ConnectorConfigurationError(
                f"Validation request for project '{self.config.project_key}' at "
                f"'{self.base_url}' failed: {exc}"
            ) from exc

    @staticmethod
    def _escape_jql_literal(value: str) -> str:
        """Escape special characters in JQL string literals.

        Escapes backslashes and double quotes to prevent JQL injection
        and query breaking when config values contain these characters.

        Args:
            value: The string value to escape

        Returns:
            str: The escaped string safe for inclusion in JQL quoted literals
        """
        # Backslashes must be doubled first; otherwise the backslash added
        # in front of each quote would itself be escaped again.
        return value.replace("\\", "\\\\").replace('"', '\\"')

    def _build_jql_filter(self, updated_after: datetime | None = None) -> str:
        """Build JQL filter query with project key, issue types, and statuses.

        Args:
            updated_after: Optional datetime to filter issues updated after this time

        Returns:
            str: JQL filter query
        """
        escaped_project_key = self._escape_jql_literal(self.config.project_key)
        clauses = [f'project = "{escaped_project_key}"']

        # Add issue type filter if configured
        if self.config.issue_types:
            types_str = ", ".join(
                f'"{self._escape_jql_literal(t)}"' for t in self.config.issue_types
            )
            clauses.append(f"type IN ({types_str})")
            logger.debug(f"Applied JIRA issue type filter: {self.config.issue_types}")

        # Add status filter if configured
        if self.config.include_statuses:
            statuses_str = ", ".join(
                f'"{self._escape_jql_literal(s)}"'
                for s in self.config.include_statuses
            )
            clauses.append(f"status IN ({statuses_str})")
            logger.debug(f"Applied JIRA status filter: {self.config.include_statuses}")

        # Add updated_after filter if provided. The timestamp is formatted to
        # minute precision ('yyyy-MM-dd HH:mm'): the JQL date formats documented
        # by Atlassian carry no seconds component. Truncating to the minute can
        # only widen a ">=" window, so no issue is missed.
        if updated_after:
            clauses.append(
                f"updated >= '{updated_after.strftime('%Y-%m-%d %H:%M')}'"
            )

        return " AND ".join(clauses)

    async def __aenter__(self):
        """Async context manager entry; validates the connection once."""
        if not self._initialized:
            await self._validate_connection()
            self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, _exc_tb):
        """Async context manager exit; closes the session and resets state."""
        try:
            self.session.close()
        finally:
            # Reset even if close() raises so a later entry re-validates.
            self._initialized = False

    @abstractmethod
    def _get_api_url(self, endpoint: str) -> str:
        """Construct the full API URL for an endpoint."""
        ...

    async def _make_request(self, method: str, endpoint: str, **kwargs) -> dict:
        """Make an authenticated request to the Jira API.

        The request goes through the shared HTTP policy helper with rate
        limiting, 3 retries on 429/5xx responses and a 90 second overall
        timeout. A per-request ``timeout`` of 60 seconds is applied unless the
        caller passes one.

        Args:
            method: HTTP method
            endpoint: API endpoint path
            **kwargs: Additional request parameters

        Returns:
            dict: Response data

        Raises:
            requests.exceptions.RequestException: If the request fails
        """
        url = self._get_api_url(endpoint)
        kwargs.setdefault("timeout", 60)

        try:
            logger.debug(
                "Making JIRA API request",
                method=method,
                endpoint=endpoint,
                url=url,
                timeout=kwargs.get("timeout"),
            )

            # Without an Authorization header, fall back to the session's
            # auth object for this request.
            if not self.session.headers.get("Authorization"):
                kwargs["auth"] = self.session.auth

            response = await _http_request_with_policy(
                self.session,
                method,
                url,
                rate_limiter=self._rate_limiter,
                retries=3,
                backoff_factor=0.5,
                status_forcelist=(429, 500, 502, 503, 504),
                overall_timeout=90.0,
                **kwargs,
            )

            response.raise_for_status()

            logger.debug(
                "JIRA API request completed successfully",
                method=method,
                endpoint=endpoint,
                status_code=response.status_code,
                response_size=(
                    len(response.content) if hasattr(response, "content") else 0
                ),
            )

            return response.json()

        except TimeoutError as exc:
            logger.error(
                "JIRA API request timed out",
                method=method,
                url=url,
                timeout=kwargs.get("timeout"),
            )
            # Surface as a requests exception so callers handle one family of
            # errors; chain the original so the cause stays visible.
            raise requests.exceptions.Timeout(
                f"Request to {url} timed out after {kwargs.get('timeout')} seconds"
            ) from exc

        except requests.exceptions.RequestException as e:
            logger.error(
                "Failed to make request to JIRA API",
                method=method,
                url=url,
                error=str(e),
                error_type=type(e).__name__,
            )
            logger.error(
                "Request details",
                deployment_type=self.config.deployment_type,
                has_auth_header=bool(self.session.headers.get("Authorization")),
                has_session_auth=bool(self.session.auth),
            )
            raise

    @abstractmethod
    async def get_issues(
        self, updated_after: datetime | None = None
    ) -> AsyncGenerator[JiraIssue, None]:
        """Get all issues from Jira."""
        ...

    def _parse_issue(self, raw_issue: dict) -> JiraIssue:
        """Parse a raw issue from the Jira response into a JiraIssue object."""
        return _parse_issue_helper(raw_issue)

    def _parse_user(
        self, raw_user: dict | None, required: bool = False
    ) -> JiraUser | None:
        """Parse a raw user from the Jira response into a JiraUser object."""
        return _parse_user_helper(raw_user, required)

    def _parse_attachment(self, raw_attachment: dict) -> JiraAttachment:
        """Parse a raw attachment from the Jira response into a JiraAttachment object."""
        return _parse_attachment_helper(raw_attachment)

    def _parse_comment(self, raw_comment: dict) -> JiraComment:
        """Parse a raw comment from the Jira response into a JiraComment object."""
        return _parse_comment_helper(raw_comment)

    def _get_issue_attachments(self, issue: JiraIssue) -> list[AttachmentMetadata]:
        """Convert JIRA issue attachments to AttachmentMetadata objects.

        Args:
            issue: JIRA issue with attachments

        Returns:
            List of attachment metadata objects; empty when attachment
            download is disabled or the issue has no attachments.
        """
        if not self.config.download_attachments or not issue.attachments:
            return []

        return [
            jira_attachment_to_metadata(att, parent_id=issue.id)
            for att in issue.attachments
        ]

    @staticmethod
    def _build_issue_content(issue: JiraIssue) -> str:
        """Join summary, description and comments into the document text."""
        content_parts = [issue.summary]
        if issue.description:
            content_parts.append(issue.description)

        for comment in issue.comments:
            # Guard against a missing author, as the metadata builder does.
            author_name = comment.author.display_name if comment.author else "Unknown"
            content_parts.append(
                f"\nComment by {author_name} on {comment.created.strftime('%Y-%m-%d %H:%M')}:"
            )
            content_parts.append(comment.body)

        return "\n\n".join(content_parts)

    def _issue_to_document(self, issue: JiraIssue, base_url: str) -> Document:
        """Build the Document (content plus metadata) for a single issue."""
        comments_metadata = [
            {
                "id": comment.id,
                "body": comment.body,
                "created": comment.created.isoformat(),
                "updated": (
                    comment.updated.isoformat() if comment.updated else None
                ),
                "author": (
                    comment.author.display_name if comment.author else None
                ),
            }
            for comment in issue.comments
        ]
        attachments_metadata = [
            {
                "id": att.id,
                "filename": att.filename,
                "size": att.size,
                "mime_type": att.mime_type,
                "created": att.created.isoformat(),
                "author": att.author.display_name if att.author else None,
            }
            for att in (issue.attachments or [])
        ]

        return Document(
            id=issue.id,
            content=self._build_issue_content(issue),
            content_type="text",
            source=self.config.source,
            source_type=SourceType.JIRA,
            created_at=issue.created,
            url=f"{base_url}/browse/{issue.key}",
            title=issue.summary,
            updated_at=issue.updated,
            is_deleted=False,
            metadata={
                "project": self.config.project_key,
                "issue_type": issue.issue_type,
                "status": issue.status,
                "key": issue.key,
                "priority": issue.priority,
                "labels": issue.labels,
                "reporter": issue.reporter.display_name if issue.reporter else None,
                "assignee": issue.assignee.display_name if issue.assignee else None,
                "created": issue.created.isoformat(),
                "updated": issue.updated.isoformat(),
                "parent_key": issue.parent_key,
                "subtasks": issue.subtasks,
                "linked_issues": issue.linked_issues,
                "comments": comments_metadata,
                "attachments": attachments_metadata,
            },
        )

    async def get_documents(self) -> list[Document]:
        """Fetch and process documents from Jira.

        All issues are collected from ``get_issues()`` first and then
        converted. Each issue yields one document; when attachment download is
        enabled and a reader is configured, the documents produced from its
        attachments follow it in the result.

        Returns:
            List[Document]: List of processed documents
        """
        documents: list[Document] = []

        # Collect all issues before converting any of them.
        issues = [issue async for issue in self.get_issues()]

        # Loop-invariant: computed once instead of once per issue.
        base_url = str(self.config.base_url).rstrip("/")

        for issue in issues:
            document = self._issue_to_document(issue, base_url)
            documents.append(document)
            logger.debug(
                "Jira document created",
                document_id=document.id,
                source_type=document.source_type,
                source=document.source,
                title=document.title,
            )

            # Process attachments if enabled
            if not (self.config.download_attachments and self.attachment_reader):
                continue

            attachment_metadata = self._get_issue_attachments(issue)
            if not attachment_metadata:
                continue

            logger.info(
                "Processing attachments for JIRA issue",
                issue_key=issue.key,
                attachment_count=len(attachment_metadata),
            )

            attachment_documents = await self.attachment_reader.fetch_and_process(
                attachment_metadata, document
            )
            documents.extend(attachment_documents)

            logger.debug(
                "Processed attachments for JIRA issue",
                issue_key=issue.key,
                processed_count=len(attachment_documents),
            )

        return documents