Coverage for src/qdrant_loader/connectors/jira/connector.py: 72%

229 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1"""Jira connector implementation.""" 

2 

3import asyncio 

4import warnings 

5from abc import abstractmethod 

6from collections.abc import AsyncGenerator, AsyncIterator 

7from datetime import datetime 

8from urllib.parse import urlparse # noqa: F401 - may be used in URL handling 

9 

10import requests 

11from requests.auth import HTTPBasicAuth # noqa: F401 - compatibility 

12 

13from qdrant_loader.config.types import SourceType 

14from qdrant_loader.connectors.base import BaseConnector, ConnectorConfigurationError 

15from qdrant_loader.connectors.jira.auth import ( 

16 auto_detect_deployment_type as _auto_detect_type, 

17) 

18from qdrant_loader.connectors.jira.auth import setup_authentication as _setup_auth 

19from qdrant_loader.connectors.jira.config import ( 

20 JiraDeploymentType, 

21 JiraExtraField, 

22 JiraProjectConfig, 

23) 

24from qdrant_loader.connectors.jira.mappers import ( 

25 parse_attachment as _parse_attachment_helper, 

26) 

27from qdrant_loader.connectors.jira.mappers import parse_comment as _parse_comment_helper 

28from qdrant_loader.connectors.jira.mappers import parse_issue as _parse_issue_helper 

29from qdrant_loader.connectors.jira.mappers import parse_user as _parse_user_helper 

30from qdrant_loader.connectors.jira.models import ( 

31 JiraAttachment, 

32 JiraComment, 

33 JiraIssue, 

34 JiraUser, 

35) 

36from qdrant_loader.connectors.shared.attachments import AttachmentReader 

37from qdrant_loader.connectors.shared.attachments.metadata import ( 

38 jira_attachment_to_metadata, 

39) 

40from qdrant_loader.connectors.shared.http import ( 

41 RateLimiter, 

42) 

43from qdrant_loader.connectors.shared.http import ( 

44 request_with_policy as _http_request_with_policy, 

45) 

46from qdrant_loader.core.attachment_downloader import ( 

47 AttachmentDownloader, 

48 AttachmentMetadata, 

49) 

50from qdrant_loader.core.document import Document 

51from qdrant_loader.core.file_conversion import ( 

52 FileConversionConfig, 

53 FileConverter, 

54 FileDetector, 

55) 

56from qdrant_loader.utils.logging import LoggingConfig 

57 

# Module-level logger from the project's logging configuration; the connector
# emits structured events through it (message plus keyword-argument fields).
logger = LoggingConfig.get_logger(__name__)

59 

60 

class BaseJiraConnector(BaseConnector):
    """Base class for all Jira connectors.

    Subclasses supply the deployment-specific pieces (``_get_api_url`` and
    ``get_issues``). This base class handles authentication setup,
    rate-limited HTTP requests, connection validation, JQL construction and
    the conversion of issues (plus optional attachments) into ``Document``
    objects.
    """

    def __init__(self, config: JiraProjectConfig):
        """Initialize the Jira connector.

        Args:
            config: The Jira configuration.

        Raises:
            ValueError: If required authentication parameters are not set.
        """
        super().__init__(config)
        self.config = config
        self.base_url = str(config.base_url).rstrip("/")

        # One session is shared by API calls and attachment downloads, so the
        # authentication configured below applies to both.
        self.session = requests.Session()

        # Set up authentication based on deployment type
        self._setup_authentication()

        self._last_sync: datetime | None = None
        self._rate_limiter = RateLimiter.per_minute(self.config.requests_per_minute)
        self._initialized = False

        # Strong references to fire-and-forget cleanup tasks scheduled by
        # set_file_conversion_config(). The event loop only keeps weak
        # references to tasks, so an unreferenced task may be collected
        # before it finishes.
        self._cleanup_tasks: set[asyncio.Task] = set()

        # File conversion components; populated only when enabled.
        self.file_converter: FileConverter | None = None
        self.file_detector: FileDetector | None = None
        self.attachment_reader: AttachmentReader | None = None

        if config.enable_file_conversion:
            self.file_detector = FileDetector()
            # FileConverter will be initialized when file_conversion_config is set

        if config.download_attachments:
            self.attachment_reader = AttachmentReader(
                session=self.session,
                downloader=AttachmentDownloader(session=self.session),
            )

    def _setup_authentication(self):
        """Set up authentication based on deployment type."""
        _setup_auth(self.session, self.config)

    def _auto_detect_deployment_type(self) -> JiraDeploymentType:
        """Auto-detect the Jira deployment type based on the base URL.

        Returns:
            JiraDeploymentType: Detected deployment type
        """
        return _auto_detect_type(str(self.base_url))

    def set_file_conversion_config(self, config: FileConversionConfig) -> None:
        """Set the file conversion configuration.

        Creates the ``FileConverter``. When attachment downloads are enabled,
        it also replaces the attachment reader with one whose downloader uses
        ``config``. Does nothing unless file conversion was enabled in the
        connector configuration.

        Args:
            config: File conversion configuration
        """
        if not (self.config.enable_file_conversion and self.file_detector):
            return

        self.file_converter = FileConverter(config)
        if not self.config.download_attachments:
            return

        # Release the previous reader before replacing it to avoid resource leaks.
        old_reader = self.attachment_reader
        self.attachment_reader = None
        if old_reader is not None:
            self._close_attachment_reader(old_reader)

        # Reinitialize reader with new downloader config
        self.attachment_reader = AttachmentReader(
            session=self.session,
            downloader=AttachmentDownloader(
                session=self.session,
                file_conversion_config=config,
                enable_file_conversion=True,
                max_attachment_size=config.max_file_size,
            ),
        )

    def _close_attachment_reader(self, reader: AttachmentReader) -> None:
        """Best-effort cleanup of an attachment reader that is being replaced.

        Calls the first available of ``aclose`` / ``close`` / ``cleanup``. If
        the call returns a coroutine, it is either scheduled on the running
        event loop (with a reference kept until it finishes) or run to
        completion when no loop is running. Failures are logged at debug
        level and never propagated, so they cannot block reconfiguration.
        """
        try:
            close_callable = None
            for attr_name in ("aclose", "close", "cleanup"):
                if hasattr(reader, attr_name):
                    close_callable = getattr(reader, attr_name)
                    break
            if close_callable is None:
                return

            result = close_callable()
            if not asyncio.iscoroutine(result):
                return

            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None

            if loop is None:
                asyncio.run(result)
                return

            task = loop.create_task(result)
            self._cleanup_tasks.add(task)

            def _on_cleanup_done(done: asyncio.Task) -> None:
                # Drop our reference and consume any exception so it is not
                # reported as "Task exception was never retrieved".
                self._cleanup_tasks.discard(done)
                if not done.cancelled() and done.exception() is not None:
                    logger.debug(
                        "Attachment reader cleanup task failed",
                        error=str(done.exception()),
                    )

            task.add_done_callback(_on_cleanup_done)
        except Exception as exc:
            # Ignore cleanup errors to avoid masking the config update.
            logger.debug(
                "Failed to clean up previous attachment reader",
                error=str(exc),
                error_type=type(exc).__name__,
            )

    async def _validate_connection(self) -> None:
        """Validate connectivity, auth, and project access before use.

        Raises:
            ConnectorConfigurationError: for invalid URL, bad credentials,
                missing permissions, or unknown project key.
        """
        # ── Step 1: reachability + authentication (/myself endpoint) ──────────
        try:
            await self._make_request("GET", "myself")
        except requests.exceptions.Timeout as exc:
            raise ConnectorConfigurationError(
                f"Connection to Jira at '{self.base_url}' timed out. "
                "Verify network connectivity and try again."
            ) from exc
        except requests.exceptions.ConnectionError as exc:
            raise ConnectorConfigurationError(
                f"Cannot connect to Jira at '{self.base_url}'. "
                "Verify that base_url is correct and the server is reachable."
            ) from exc
        except requests.exceptions.HTTPError as exc:
            status = exc.response.status_code if exc.response is not None else None
            if status == 401:
                raise ConnectorConfigurationError(
                    f"Authentication failed for Jira at '{self.base_url}' (HTTP 401). "
                    "Check that token and email are valid."
                ) from exc
            if status == 403:
                raise ConnectorConfigurationError(
                    f"Access denied to Jira at '{self.base_url}' (HTTP 403). "
                    "The account does not have sufficient permissions."
                ) from exc
            raise ConnectorConfigurationError(
                f"Validation request to Jira at '{self.base_url}' failed "
                f"with HTTP {status}: {exc}"
            ) from exc
        except requests.exceptions.RequestException as exc:
            raise ConnectorConfigurationError(
                f"Validation request to Jira at '{self.base_url}' failed: {exc}"
            ) from exc

        # ── Step 2: project key exists and is accessible ───────────────────────
        try:
            await self._make_request("GET", f"project/{self.config.project_key}")
        except requests.exceptions.Timeout as exc:
            raise ConnectorConfigurationError(
                f"Connection to Jira at '{self.base_url}' timed out while validating "
                f"project '{self.config.project_key}'."
            ) from exc
        except requests.exceptions.ConnectionError as exc:
            raise ConnectorConfigurationError(
                f"Connection to Jira at '{self.base_url}' was lost while validating "
                f"project '{self.config.project_key}' (between validation steps). "
                "Verify network connectivity and Jira availability."
            ) from exc
        except requests.exceptions.HTTPError as exc:
            status = exc.response.status_code if exc.response is not None else None
            if status == 404:
                raise ConnectorConfigurationError(
                    f"Project '{self.config.project_key}' not found in Jira (HTTP 404). "
                    "Check that project_key is correct."
                ) from exc
            if status == 403:
                raise ConnectorConfigurationError(
                    f"No permission to access project '{self.config.project_key}' "
                    "in Jira (HTTP 403)."
                ) from exc
            raise ConnectorConfigurationError(
                f"Validation request for project '{self.config.project_key}' at "
                f"'{self.base_url}' failed with HTTP {status}: {exc}"
            ) from exc
        except requests.exceptions.RequestException as exc:
            raise ConnectorConfigurationError(
                f"Validation request for project '{self.config.project_key}' at "
                f"'{self.base_url}' failed: {exc}"
            ) from exc

    @staticmethod
    def _escape_jql_literal(value: str) -> str:
        """Escape special characters in JQL string literals.

        Escapes backslashes and double quotes to prevent JQL injection
        and query breaking when config values contain these characters.

        Args:
            value: The string value to escape

        Returns:
            str: The escaped string safe for inclusion in JQL quoted literals
        """
        # Replace backslash first to avoid double-escaping
        value = value.replace("\\", "\\\\")
        # Then escape double quotes
        value = value.replace('"', '\\"')
        return value

    def _build_jql_filter(self, updated_after: datetime | None = None) -> str:
        """Build JQL filter query with project key, issue types, and statuses.

        Args:
            updated_after: Optional datetime to filter issues updated after this
                time. It is formatted to minute precision and sent as-is, with
                no timezone conversion.

        Returns:
            str: JQL filter query
        """
        escaped_project_key = self._escape_jql_literal(self.config.project_key)
        jql = f'project = "{escaped_project_key}"'

        # Add issue type filter if configured
        if self.config.issue_types:
            escaped_types = [
                self._escape_jql_literal(t) for t in self.config.issue_types
            ]
            types_str = ", ".join(f'"{t}"' for t in escaped_types)
            jql += f" AND type IN ({types_str})"
            logger.debug(f"Applied JIRA issue type filter: {self.config.issue_types}")

        # Add status filter if configured
        if self.config.include_statuses:
            escaped_statuses = [
                self._escape_jql_literal(s) for s in self.config.include_statuses
            ]
            statuses_str = ", ".join(f'"{s}"' for s in escaped_statuses)
            jql += f" AND status IN ({statuses_str})"
            logger.debug(f"Applied JIRA status filter: {self.config.include_statuses}")

        # Add updated_after filter if provided
        if updated_after:
            jql += f" AND updated >= '{updated_after.strftime('%Y-%m-%d %H:%M')}'"

        return jql

    async def __aenter__(self):
        """Async context manager entry; validates the connection once.

        Raises:
            ConnectorConfigurationError: If validation fails. The HTTP session
                is closed before the error propagates, because ``__aexit__``
                is not invoked when ``__aenter__`` raises.
        """
        if not self._initialized:
            try:
                await self._validate_connection()
            except BaseException:
                # Release pooled connections; requests.Session stays usable
                # after close(), so a later retry with this connector still works.
                self.session.close()
                raise
            self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, _exc_tb):
        """Async context manager exit: close the session and reset init state."""
        try:
            self.session.close()
        finally:
            self._initialized = False

    @abstractmethod
    def _get_api_url(self, endpoint: str) -> str:
        """Construct the full API URL for an endpoint."""
        ...

    async def _make_request(self, method: str, endpoint: str, **kwargs) -> dict:
        """Make an authenticated request to the Jira API.

        Args:
            method: HTTP method
            endpoint: API endpoint path
            **kwargs: Additional request parameters (``timeout`` defaults to 60s)

        Returns:
            dict: Response data

        Raises:
            requests.exceptions.RequestException: If the request fails. A
                timeout surfacing as ``TimeoutError`` / ``asyncio.TimeoutError``
                is re-raised as ``requests.exceptions.Timeout``.
        """
        url = self._get_api_url(endpoint)
        kwargs.setdefault("timeout", 60)

        try:
            logger.debug(
                "Making JIRA API request",
                method=method,
                endpoint=endpoint,
                url=url,
                timeout=kwargs.get("timeout"),
            )

            # With no Authorization header (e.g. basic auth), pass the session
            # credentials explicitly.
            if not self.session.headers.get("Authorization"):
                kwargs["auth"] = self.session.auth

            response = await _http_request_with_policy(
                self.session,
                method,
                url,
                rate_limiter=self._rate_limiter,
                retries=3,
                backoff_factor=0.5,
                status_forcelist=(429, 500, 502, 503, 504),
                overall_timeout=90.0,
                **kwargs,
            )

            response.raise_for_status()

            logger.debug(
                "JIRA API request completed successfully",
                method=method,
                endpoint=endpoint,
                status_code=response.status_code,
                response_size=(
                    len(response.content) if hasattr(response, "content") else 0
                ),
            )

            return response.json()

        # asyncio.TimeoutError is only an alias of the builtin from Python 3.11;
        # on 3.10 it is a distinct class and must be listed explicitly.
        except (TimeoutError, asyncio.TimeoutError) as exc:
            logger.error(
                "JIRA API request timed out",
                method=method,
                url=url,
                timeout=kwargs.get("timeout"),
            )
            raise requests.exceptions.Timeout(
                f"Request to {url} timed out after {kwargs.get('timeout')} seconds"
            ) from exc

        except requests.exceptions.RequestException as e:
            logger.error(
                "Failed to make request to JIRA API",
                method=method,
                url=url,
                error=str(e),
                error_type=type(e).__name__,
            )
            logger.error(
                "Request details",
                deployment_type=self.config.deployment_type,
                has_auth_header=bool(self.session.headers.get("Authorization")),
                has_session_auth=bool(self.session.auth),
            )
            raise

    @abstractmethod
    async def get_issues(
        self, updated_after: datetime | None = None
    ) -> AsyncGenerator[JiraIssue, None]:
        """Get all issues from Jira."""
        ...

    def _parse_issue(
        self, raw_issue: dict, extra_fields: list[JiraExtraField] | None = None
    ) -> JiraIssue:
        """Parse a raw issue from the Jira response into a JiraIssue object."""
        return _parse_issue_helper(raw_issue, extra_fields)

    def _parse_user(
        self, raw_user: dict | None, required: bool = False
    ) -> JiraUser | None:
        """Parse a raw user from the Jira response into a JiraUser object."""
        return _parse_user_helper(raw_user, required)

    def _parse_attachment(self, raw_attachment: dict) -> JiraAttachment:
        """Parse a raw attachment from the Jira response into a JiraAttachment object."""
        return _parse_attachment_helper(raw_attachment)

    def _parse_comment(self, raw_comment: dict) -> JiraComment:
        """Parse a raw comment from the Jira response into a JiraComment object."""
        return _parse_comment_helper(raw_comment)

    def _get_issue_attachments(self, issue: JiraIssue) -> list[AttachmentMetadata]:
        """Convert JIRA issue attachments to AttachmentMetadata objects.

        Args:
            issue: JIRA issue with attachments

        Returns:
            List of attachment metadata objects (empty when attachment
            downloads are disabled or the issue has none)
        """
        if not self.config.download_attachments or not issue.attachments:
            return []

        return [
            jira_attachment_to_metadata(att, parent_id=issue.id)
            for att in issue.attachments
        ]

    async def fetch_by_id(self, entity_id: str) -> Document | None:
        """Fetch a single Jira issue by key or numeric ID.

        Attachments are not materialized, so the issue document is returned
        even if attachment processing would fail. Any error is logged and
        ``None`` is returned.
        """
        # Local import: only this method needs it.
        from urllib.parse import quote

        # Percent-encode the caller-supplied id so it cannot alter the request
        # path or query. Issue keys such as "PROJ-1" and numeric ids are unchanged.
        safe_id = quote(str(entity_id), safe="")
        try:
            raw_issue = await self._make_request("GET", f"issue/{safe_id}")
            issue = self._parse_issue(raw_issue, self.config.extra_fields)
            documents = await self._issues_to_documents(
                [issue], include_attachments=False
            )
        except Exception as exc:
            logger.error(
                "Failed to fetch/parse Jira issue by id",
                entity_id=entity_id,
                error=str(exc),
            )
            return None

        return documents[0] if documents else None

    async def list_entity_ids(self) -> AsyncIterator[str]:
        """Stream all issue keys in the configured project."""
        async for issue in self.get_issues():
            yield issue.key

    async def _issues_to_documents(
        self, issues: list[JiraIssue], include_attachments: bool = True
    ) -> list[Document]:
        """Convert Jira issues to Document objects.

        Set ``include_attachments=False`` for single-entity fetches where the
        parent document must remain available even if attachment materialization
        fails.

        .. deprecated:: WS-1
            Use :meth:`_stream_issues_to_documents` for streaming.
        """
        return [
            document
            async for document in self._stream_issues_to_documents(
                issues, include_attachments=include_attachments
            )
        ]

    def _build_issue_content(self, issue: JiraIssue) -> str:
        """Render an issue's summary, description and comments as plain text."""
        content_parts = [issue.summary]
        if issue.description:
            content_parts.append(issue.description)

        for comment in issue.comments:
            # comment.author may be None (the metadata builder guards for this
            # too); use a placeholder instead of raising AttributeError.
            author_name = comment.author.display_name if comment.author else "Unknown"
            created = comment.created.strftime("%Y-%m-%d %H:%M")
            content_parts.append(f"\nComment by {author_name} on {created}:")
            content_parts.append(comment.body)

        return "\n\n".join(content_parts)

    def _build_issue_metadata(self, issue: JiraIssue) -> dict:
        """Build the metadata dictionary stored on an issue's Document."""
        metadata = {
            "project": self.config.project_key,
            "issue_type": issue.issue_type,
            "status": issue.status,
            "key": issue.key,
            "priority": issue.priority,
            "labels": issue.labels,
            "reporter": issue.reporter.display_name if issue.reporter else None,
            "assignee": issue.assignee.display_name if issue.assignee else None,
            "created": issue.created.isoformat(),
            "updated": issue.updated.isoformat(),
            "parent_key": issue.parent_key,
            "subtasks": issue.subtasks,
            "linked_issues": issue.linked_issues,
            "comments": [
                {
                    "id": comment.id,
                    "body": comment.body,
                    "created": comment.created.isoformat(),
                    "updated": (
                        comment.updated.isoformat() if comment.updated else None
                    ),
                    "author": (
                        comment.author.display_name if comment.author else None
                    ),
                }
                for comment in issue.comments
            ],
            "attachments": [
                {
                    "id": att.id,
                    "filename": att.filename,
                    "size": att.size,
                    "mime_type": att.mime_type,
                    "created": att.created.isoformat(),
                    "author": (att.author.display_name if att.author else None),
                }
                for att in (issue.attachments or [])
            ],
        }
        for field in self.config.extra_fields or []:
            # A configured extra field that was not populated on the parsed
            # issue becomes None instead of aborting the whole stream.
            metadata[field.name] = getattr(issue, field.name, None)
        return metadata

    async def _stream_issues_to_documents(
        self, issues: list[JiraIssue], include_attachments: bool = True
    ) -> AsyncGenerator[Document, None]:
        """Stream Jira issues as Document objects, including attachments.

        Yields documents one at a time. Each issue's attachment documents
        (when enabled) are yielded right after their parent issue document.
        """
        # Loop-invariant: base URL used to build each issue's browse link.
        browse_base_url = str(self.config.base_url).rstrip("/")

        for issue in issues:
            document = Document(
                id=issue.id,
                content=self._build_issue_content(issue),
                content_type="text",
                source=self.config.source,
                source_type=SourceType.JIRA,
                created_at=issue.created,
                url=f"{browse_base_url}/browse/{issue.key}",
                title=issue.summary,
                updated_at=issue.updated,
                is_deleted=False,
                metadata=self._build_issue_metadata(issue),
            )
            logger.debug(
                "Jira document created",
                document_id=document.id,
                source_type=document.source_type,
                source=document.source,
                title=document.title,
            )
            yield document

            if not (
                include_attachments
                and self.config.download_attachments
                and self.attachment_reader
            ):
                continue

            attachment_metadata = self._get_issue_attachments(issue)
            if not attachment_metadata:
                continue

            logger.info(
                "Processing attachments for JIRA issue",
                issue_key=issue.key,
                attachment_count=len(attachment_metadata),
            )

            attachment_documents = await self.attachment_reader.fetch_and_process(
                attachment_metadata, document
            )
            for attachment_document in attachment_documents:
                yield attachment_document

            logger.debug(
                "Processed attachments for JIRA issue",
                issue_key=issue.key,
                processed_count=len(attachment_documents),
            )

    async def stream_documents(
        self, since: datetime | None = None
    ) -> AsyncGenerator[Document, None]:
        """Stream documents from Jira (WS-1 connector contract).

        ``since`` falls back to ``config.updated_after`` when not given.
        """
        effective_since = since if since is not None else self.config.updated_after
        async for issue in self.get_issues(updated_after=effective_since):
            async for document in self._stream_issues_to_documents([issue]):
                yield document

    async def get_documents(self) -> list[Document]:
        """Fetch and process documents from Jira (DEPRECATED - use stream_documents)."""
        warnings.warn(
            "BaseJiraConnector.get_documents is deprecated. Implement stream_documents() "
            "or use connector.stream_documents() to avoid materializing the full "
            "document list in memory.",
            DeprecationWarning,
            stacklevel=2,
        )
        return [document async for document in self.stream_documents()]