Coverage for src/qdrant_loader/connectors/jira/connector.py: 75%

203 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1"""Jira connector implementation.""" 

2 

3import asyncio 

4import time 

5from collections.abc import AsyncGenerator 

6from datetime import datetime 

7from urllib.parse import urlparse 

8 

9import requests 

10from requests.auth import HTTPBasicAuth 

11 

12from qdrant_loader.config.types import SourceType 

13from qdrant_loader.connectors.base import BaseConnector 

14from qdrant_loader.connectors.jira.config import JiraDeploymentType, JiraProjectConfig 

15from qdrant_loader.connectors.jira.models import ( 

16 JiraAttachment, 

17 JiraComment, 

18 JiraIssue, 

19 JiraUser, 

20) 

21from qdrant_loader.core.attachment_downloader import ( 

22 AttachmentDownloader, 

23 AttachmentMetadata, 

24) 

25from qdrant_loader.core.document import Document 

26from qdrant_loader.core.file_conversion import ( 

27 FileConversionConfig, 

28 FileConverter, 

29 FileDetector, 

30) 

31from qdrant_loader.utils.logging import LoggingConfig 

32 

# Module-level structured logger for this connector.
logger = LoggingConfig.get_logger(__name__)

34 

35 

class JiraConnector(BaseConnector):
    """Jira connector for fetching and processing issues."""

    def __init__(self, config: JiraProjectConfig):
        """Initialize the Jira connector.

        Args:
            config: The Jira configuration.

        Raises:
            ValueError: If required authentication parameters are not set.
        """
        super().__init__(config)
        self.config = config
        self.base_url = str(config.base_url).rstrip("/")

        # One shared HTTP session; credentials are attached to it below.
        self.session = requests.Session()
        self._setup_authentication()

        # Sync and client-side rate-limit bookkeeping.
        self._last_sync: datetime | None = None
        self._rate_limit_lock = asyncio.Lock()
        self._last_request_time = 0.0
        self._initialized = False

        # File-conversion machinery. The converter stays None until
        # set_file_conversion_config() supplies the conversion settings.
        self.file_converter: FileConverter | None = None
        self.file_detector: FileDetector | None = (
            FileDetector() if config.enable_file_conversion else None
        )
        self.attachment_downloader: AttachmentDownloader | None = (
            AttachmentDownloader(session=self.session)
            if config.download_attachments
            else None
        )

74 

75 def _setup_authentication(self): 

76 """Set up authentication based on deployment type.""" 

77 if self.config.deployment_type == JiraDeploymentType.CLOUD: 

78 # Cloud uses Basic Auth with email:api_token 

79 if not self.config.token: 

80 raise ValueError("API token is required for Jira Cloud") 

81 if not self.config.email: 

82 raise ValueError("Email is required for Jira Cloud") 

83 

84 self.session.auth = HTTPBasicAuth(self.config.email, self.config.token) 

85 logger.debug( 

86 "Configured Jira Cloud authentication with email and API token" 

87 ) 

88 

89 else: 

90 # Data Center/Server uses Personal Access Token with Bearer authentication 

91 if not self.config.token: 

92 raise ValueError( 

93 "Personal Access Token is required for Jira Data Center/Server" 

94 ) 

95 

96 self.session.headers.update( 

97 { 

98 "Authorization": f"Bearer {self.config.token}", 

99 "Content-Type": "application/json", 

100 } 

101 ) 

102 logger.debug( 

103 "Configured Jira Data Center authentication with Personal Access Token" 

104 ) 

105 

106 def _auto_detect_deployment_type(self) -> JiraDeploymentType: 

107 """Auto-detect the Jira deployment type based on the base URL. 

108 

109 Returns: 

110 JiraDeploymentType: Detected deployment type 

111 """ 

112 try: 

113 parsed_url = urlparse(str(self.base_url)) 

114 hostname = parsed_url.hostname 

115 

116 if hostname is None: 

117 # If we can't parse the hostname, default to DATACENTER 

118 return JiraDeploymentType.DATACENTER 

119 

120 # Cloud instances use *.atlassian.net domains 

121 # Use proper hostname checking with endswith to ensure it's a subdomain 

122 if hostname.endswith(".atlassian.net") or hostname == "atlassian.net": 

123 return JiraDeploymentType.CLOUD 

124 

125 # Everything else is likely Data Center/Server 

126 return JiraDeploymentType.DATACENTER 

127 except Exception: 

128 # If URL parsing fails, default to DATACENTER 

129 return JiraDeploymentType.DATACENTER 

130 

131 def set_file_conversion_config(self, config: FileConversionConfig) -> None: 

132 """Set the file conversion configuration. 

133 

134 Args: 

135 config: File conversion configuration 

136 """ 

137 if self.config.enable_file_conversion and self.file_detector: 

138 self.file_converter = FileConverter(config) 

139 if self.config.download_attachments: 

140 # Reinitialize attachment downloader with file conversion config 

141 self.attachment_downloader = AttachmentDownloader( 

142 session=self.session, 

143 file_conversion_config=config, 

144 enable_file_conversion=True, 

145 max_attachment_size=config.max_file_size, 

146 ) 

147 

148 async def __aenter__(self): 

149 """Async context manager entry.""" 

150 if not self._initialized: 

151 self._initialized = True 

152 return self 

153 

154 async def __aexit__(self, exc_type, exc_val, exc_tb): 

155 """Async context manager exit.""" 

156 self._initialized = False 

157 

158 def _get_api_url(self, endpoint: str) -> str: 

159 """Construct the full API URL for an endpoint. 

160 

161 Args: 

162 endpoint: API endpoint path 

163 

164 Returns: 

165 str: Full API URL 

166 """ 

167 return f"{self.base_url}/rest/api/2/{endpoint}" 

168 

169 async def _make_request(self, method: str, endpoint: str, **kwargs) -> dict: 

170 """Make an authenticated request to the Jira API. 

171 

172 Args: 

173 method: HTTP method 

174 endpoint: API endpoint path 

175 **kwargs: Additional request parameters 

176 

177 Returns: 

178 dict: Response data 

179 

180 Raises: 

181 requests.exceptions.RequestException: If the request fails 

182 """ 

183 async with self._rate_limit_lock: 

184 # Calculate time to wait based on rate limit 

185 min_interval = 60.0 / self.config.requests_per_minute 

186 now = time.time() 

187 time_since_last_request = now - self._last_request_time 

188 

189 if time_since_last_request < min_interval: 

190 await asyncio.sleep(min_interval - time_since_last_request) 

191 

192 self._last_request_time = time.time() 

193 

194 url = self._get_api_url(endpoint) 

195 

196 # Add timeout to kwargs if not already specified 

197 if "timeout" not in kwargs: 

198 kwargs["timeout"] = 60 # 60 second timeout for HTTP requests 

199 

200 try: 

201 logger.debug( 

202 "Making JIRA API request", 

203 method=method, 

204 endpoint=endpoint, 

205 url=url, 

206 timeout=kwargs.get("timeout"), 

207 ) 

208 

209 # For Data Center with PAT, headers are already set 

210 # For Cloud, use session auth 

211 if not self.session.headers.get("Authorization"): 

212 kwargs["auth"] = self.session.auth 

213 

214 # Use asyncio.wait_for to add an additional timeout layer 

215 response = await asyncio.wait_for( 

216 asyncio.to_thread(self.session.request, method, url, **kwargs), 

217 timeout=90.0, # 90 second timeout for the entire operation 

218 ) 

219 

220 response.raise_for_status() 

221 

222 logger.debug( 

223 "JIRA API request completed successfully", 

224 method=method, 

225 endpoint=endpoint, 

226 status_code=response.status_code, 

227 response_size=( 

228 len(response.content) if hasattr(response, "content") else 0 

229 ), 

230 ) 

231 

232 return response.json() 

233 

234 except asyncio.TimeoutError: 

235 logger.error( 

236 "JIRA API request timed out", 

237 method=method, 

238 url=url, 

239 timeout=kwargs.get("timeout"), 

240 ) 

241 raise requests.exceptions.Timeout( 

242 f"Request to {url} timed out after {kwargs.get('timeout')} seconds" 

243 ) 

244 

245 except requests.exceptions.RequestException as e: 

246 logger.error( 

247 "Failed to make request to JIRA API", 

248 method=method, 

249 url=url, 

250 error=str(e), 

251 error_type=type(e).__name__, 

252 ) 

253 # Log additional context for debugging 

254 logger.error( 

255 "Request details", 

256 deployment_type=self.config.deployment_type, 

257 has_auth_header=bool(self.session.headers.get("Authorization")), 

258 has_session_auth=bool(self.session.auth), 

259 ) 

260 raise 

261 

262 def _make_sync_request(self, jql: str, **kwargs): 

263 """ 

264 Make a synchronous request to the Jira API, converting parameters as needed: 

265 - Format datetime values in JQL to 'yyyy-MM-dd HH:mm' format 

266 """ 

267 # Format datetime values in JQL query 

268 for key, value in kwargs.items(): 

269 if isinstance(value, datetime): 

270 formatted_date = value.strftime("%Y-%m-%d %H:%M") 

271 jql = jql.replace(f"{ {key}} ", f"'{formatted_date}'") 

272 

273 # Use the new _make_request method for consistency 

274 params = { 

275 "jql": jql, 

276 "startAt": kwargs.get("start", 0), 

277 "maxResults": kwargs.get("limit", self.config.page_size), 

278 "expand": "changelog", 

279 "fields": "*all", 

280 } 

281 

282 return asyncio.run(self._make_request("GET", "search", params=params)) 

283 

    async def get_issues(
        self, updated_after: datetime | None = None
    ) -> AsyncGenerator[JiraIssue, None]:
        """
        Get all issues from Jira.

        Pages through the search API for the configured project, yielding
        parsed issues. Page-fetch failures are propagated; per-issue parse
        failures are logged and skipped so one bad issue does not abort the
        whole sync.

        Args:
            updated_after: Optional datetime to filter issues updated after this time

        Yields:
            JiraIssue objects
        """
        start_at = 0
        page_size = self.config.page_size
        total_issues = 0  # filled in from the first page's "total" field

        logger.info(
            "🎫 Starting JIRA issue retrieval",
            project_key=self.config.project_key,
            page_size=page_size,
            updated_after=updated_after.isoformat() if updated_after else None,
        )

        # Paginate until the API reports no more issues (or count is reached).
        while True:
            jql = f'project = "{self.config.project_key}"'
            if updated_after:
                # Dates are formatted to minute precision for the JQL clause.
                jql += f" AND updated >= '{updated_after.strftime('%Y-%m-%d %H:%M')}'"

            params = {
                "jql": jql,
                "startAt": start_at,
                "maxResults": page_size,
                "expand": "changelog",
                "fields": "*all",
            }

            logger.debug(
                "Fetching JIRA issues page",
                start_at=start_at,
                page_size=page_size,
                jql=jql,
            )

            try:
                response = await self._make_request("GET", "search", params=params)
            except Exception as e:
                # A failed page fetch is fatal for the generator; log and re-raise.
                logger.error(
                    "Failed to fetch JIRA issues page",
                    start_at=start_at,
                    page_size=page_size,
                    error=str(e),
                    error_type=type(e).__name__,
                )
                raise

            # An empty (or missing) issues list means pagination is complete.
            if not response or not response.get("issues"):
                logger.debug(
                    "No more JIRA issues found, stopping pagination",
                    start_at=start_at,
                    total_processed=start_at,
                )
                break

            issues = response["issues"]

            # Update total count if not set
            if total_issues == 0:
                total_issues = response.get("total", 0)
                logger.info(f"🎫 Found {total_issues} JIRA issues to process")

            # Log progress every 100 issues instead of every 50
            progress_log_interval = 100

            for i, issue in enumerate(issues):
                try:
                    parsed_issue = self._parse_issue(issue)
                    yield parsed_issue

                    # start_at + i + 1 is the overall 1-based position of this
                    # issue across all pages processed so far.
                    if (start_at + i + 1) % progress_log_interval == 0:
                        progress_percent = (
                            round((start_at + i + 1) / total_issues * 100, 1)
                            if total_issues > 0
                            else 0
                        )
                        logger.info(
                            f"🎫 Progress: {start_at + i + 1}/{total_issues} issues ({progress_percent}%)"
                        )

                except Exception as e:
                    logger.error(
                        "Failed to parse JIRA issue",
                        issue_id=issue.get("id"),
                        issue_key=issue.get("key"),
                        error=str(e),
                        error_type=type(e).__name__,
                    )
                    # Continue processing other issues instead of failing completely
                    continue

            # Check if we've processed all issues
            start_at += len(issues)
            if start_at >= total_issues:
                logger.info(
                    f"✅ Completed JIRA issue retrieval: {start_at} issues processed"
                )
                break

390 

391 def _parse_issue(self, raw_issue: dict) -> JiraIssue: 

392 """Parse raw Jira issue data into JiraIssue model. 

393 

394 Args: 

395 raw_issue: Raw issue data from Jira API 

396 

397 Returns: 

398 Parsed JiraIssue 

399 """ 

400 fields = raw_issue["fields"] 

401 parent = fields.get("parent") 

402 parent_key = parent.get("key") if parent else None 

403 

404 # Parse reporter with type assertion since it's required 

405 reporter = self._parse_user(fields["reporter"], required=True) 

406 assert reporter is not None # For type checker 

407 

408 jira_issue = JiraIssue( 

409 id=raw_issue["id"], 

410 key=raw_issue["key"], 

411 summary=fields["summary"], 

412 description=fields.get("description"), 

413 issue_type=fields["issuetype"]["name"], 

414 status=fields["status"]["name"], 

415 priority=( 

416 fields.get("priority", {}).get("name") 

417 if fields.get("priority") 

418 else None 

419 ), 

420 project_key=fields["project"]["key"], 

421 created=datetime.fromisoformat(fields["created"].replace("Z", "+00:00")), 

422 updated=datetime.fromisoformat(fields["updated"].replace("Z", "+00:00")), 

423 reporter=reporter, 

424 assignee=self._parse_user(fields.get("assignee")), 

425 labels=fields.get("labels", []), 

426 attachments=[ 

427 self._parse_attachment(att) for att in fields.get("attachment", []) 

428 ], 

429 comments=[ 

430 self._parse_comment(comment) 

431 for comment in fields.get("comment", {}).get("comments", []) 

432 ], 

433 parent_key=parent_key, 

434 subtasks=[st["key"] for st in fields.get("subtasks", [])], 

435 linked_issues=[ 

436 link["outwardIssue"]["key"] 

437 for link in fields.get("issuelinks", []) 

438 if "outwardIssue" in link 

439 ], 

440 ) 

441 

442 return jira_issue 

443 

444 def _parse_user( 

445 self, raw_user: dict | None, required: bool = False 

446 ) -> JiraUser | None: 

447 """Parse raw Jira user data into JiraUser model. 

448 

449 Args: 

450 raw_user: Raw user data from Jira API 

451 required: Whether this user field is required (e.g., reporter, author) 

452 

453 Returns: 

454 Parsed JiraUser or None if raw_user is None and not required 

455 

456 Raises: 

457 ValueError: If raw_user is None and required is True 

458 """ 

459 if not raw_user: 

460 if required: 

461 raise ValueError("User data is required but not provided") 

462 return None 

463 

464 # Handle different user formats between Cloud and Data Center 

465 # Cloud uses 'accountId', Data Center uses 'name' or 'key' 

466 account_id = raw_user.get("accountId") 

467 if not account_id: 

468 # Fallback to 'name' or 'key' for Data Center 

469 account_id = raw_user.get("name") or raw_user.get("key") 

470 

471 if not account_id: 

472 if required: 

473 raise ValueError( 

474 "User data missing required identifier (accountId, name, or key)" 

475 ) 

476 return None 

477 

478 return JiraUser( 

479 account_id=account_id, 

480 display_name=raw_user["displayName"], 

481 email_address=raw_user.get("emailAddress"), 

482 ) 

483 

484 def _parse_attachment(self, raw_attachment: dict) -> JiraAttachment: 

485 """Parse raw Jira attachment data into JiraAttachment model. 

486 

487 Args: 

488 raw_attachment: Raw attachment data from Jira API 

489 

490 Returns: 

491 Parsed JiraAttachment 

492 """ 

493 # Parse author with type assertion since it's required 

494 author = self._parse_user(raw_attachment["author"], required=True) 

495 assert author is not None # For type checker 

496 

497 return JiraAttachment( 

498 id=raw_attachment["id"], 

499 filename=raw_attachment["filename"], 

500 size=raw_attachment["size"], 

501 mime_type=raw_attachment["mimeType"], 

502 content_url=raw_attachment["content"], 

503 created=datetime.fromisoformat( 

504 raw_attachment["created"].replace("Z", "+00:00") 

505 ), 

506 author=author, 

507 ) 

508 

509 def _parse_comment(self, raw_comment: dict) -> JiraComment: 

510 """Parse raw Jira comment data into JiraComment model. 

511 

512 Args: 

513 raw_comment: Raw comment data from Jira API 

514 

515 Returns: 

516 Parsed JiraComment 

517 """ 

518 # Parse author with type assertion since it's required 

519 author = self._parse_user(raw_comment["author"], required=True) 

520 assert author is not None # For type checker 

521 

522 return JiraComment( 

523 id=raw_comment["id"], 

524 body=raw_comment["body"], 

525 created=datetime.fromisoformat( 

526 raw_comment["created"].replace("Z", "+00:00") 

527 ), 

528 updated=( 

529 datetime.fromisoformat(raw_comment["updated"].replace("Z", "+00:00")) 

530 if "updated" in raw_comment 

531 else None 

532 ), 

533 author=author, 

534 ) 

535 

536 def _get_issue_attachments(self, issue: JiraIssue) -> list[AttachmentMetadata]: 

537 """Convert JIRA issue attachments to AttachmentMetadata objects. 

538 

539 Args: 

540 issue: JIRA issue with attachments 

541 

542 Returns: 

543 List of attachment metadata objects 

544 """ 

545 if not self.config.download_attachments or not issue.attachments: 

546 return [] 

547 

548 attachment_metadata = [] 

549 for attachment in issue.attachments: 

550 metadata = AttachmentMetadata( 

551 id=attachment.id, 

552 filename=attachment.filename, 

553 size=attachment.size, 

554 mime_type=attachment.mime_type, 

555 download_url=str(attachment.content_url), 

556 parent_document_id=issue.id, 

557 created_at=( 

558 attachment.created.isoformat() if attachment.created else None 

559 ), 

560 updated_at=None, # JIRA attachments don't have update timestamps 

561 author=attachment.author.display_name if attachment.author else None, 

562 ) 

563 attachment_metadata.append(metadata) 

564 

565 return attachment_metadata 

566 

    async def get_documents(self) -> list[Document]:
        """Fetch and process documents from Jira.

        Each issue becomes one Document whose content is the summary,
        description, and all comments joined together; issue fields are
        mirrored into the document metadata. When attachment downloading
        is enabled, converted attachment documents are appended as well.

        Returns:
            List[Document]: List of processed documents
        """
        documents = []

        # Collect all issues
        issues = []
        async for issue in self.get_issues():
            issues.append(issue)

        # Convert issues to documents
        for issue in issues:
            # Build content including comments
            content_parts = [issue.summary]
            if issue.description:
                content_parts.append(issue.description)

            # Add comments to content
            for comment in issue.comments:
                content_parts.append(
                    f"\nComment by {comment.author.display_name} on {comment.created.strftime('%Y-%m-%d %H:%M')}:"
                )
                content_parts.append(comment.body)

            content = "\n\n".join(content_parts)

            base_url = str(self.config.base_url).rstrip("/")
            document = Document(
                id=issue.id,
                content=content,
                content_type="text",
                source=self.config.source,
                source_type=SourceType.JIRA,
                created_at=issue.created,
                url=f"{base_url}/browse/{issue.key}",
                title=issue.summary,
                updated_at=issue.updated,
                is_deleted=False,
                metadata={
                    "project": self.config.project_key,
                    "issue_type": issue.issue_type,
                    "status": issue.status,
                    "key": issue.key,
                    "priority": issue.priority,
                    "labels": issue.labels,
                    "reporter": issue.reporter.display_name if issue.reporter else None,
                    "assignee": issue.assignee.display_name if issue.assignee else None,
                    "created": issue.created.isoformat(),
                    "updated": issue.updated.isoformat(),
                    "parent_key": issue.parent_key,
                    "subtasks": issue.subtasks,
                    "linked_issues": issue.linked_issues,
                    "comments": [
                        {
                            "id": comment.id,
                            "body": comment.body,
                            "created": comment.created.isoformat(),
                            "updated": (
                                comment.updated.isoformat() if comment.updated else None
                            ),
                            "author": (
                                comment.author.display_name if comment.author else None
                            ),
                        }
                        for comment in issue.comments
                    ],
                    "attachments": (
                        [
                            {
                                "id": att.id,
                                "filename": att.filename,
                                "size": att.size,
                                "mime_type": att.mime_type,
                                "created": att.created.isoformat(),
                                "author": (
                                    att.author.display_name if att.author else None
                                ),
                            }
                            for att in issue.attachments
                        ]
                        if issue.attachments
                        else []
                    ),
                },
            )
            documents.append(document)
            logger.debug(
                "Jira document created",
                document_id=document.id,
                source_type=document.source_type,
                source=document.source,
                title=document.title,
            )

            # Process attachments if enabled
            if self.config.download_attachments and self.attachment_downloader:
                attachment_metadata = self._get_issue_attachments(issue)
                if attachment_metadata:
                    logger.info(
                        "Processing attachments for JIRA issue",
                        issue_key=issue.key,
                        attachment_count=len(attachment_metadata),
                    )

                    # Attachment documents reference the issue document as parent.
                    attachment_documents = await self.attachment_downloader.download_and_process_attachments(
                        attachment_metadata, document
                    )
                    documents.extend(attachment_documents)

                    logger.debug(
                        "Processed attachments for JIRA issue",
                        issue_key=issue.key,
                        processed_count=len(attachment_documents),
                    )

        return documents