Coverage for src/qdrant_loader/connectors/jira/connector.py: 66%

187 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1"""Jira connector implementation.""" 

2 

3import asyncio 

4from collections.abc import AsyncGenerator 

5from datetime import datetime 

6from urllib.parse import urlparse # noqa: F401 - may be used in URL handling 

7 

8import requests 

9from requests.auth import HTTPBasicAuth # noqa: F401 - compatibility 

10 

11from qdrant_loader.config.types import SourceType 

12from qdrant_loader.connectors.base import BaseConnector 

13from qdrant_loader.connectors.jira.auth import ( 

14 auto_detect_deployment_type as _auto_detect_type, 

15) 

16from qdrant_loader.connectors.jira.auth import setup_authentication as _setup_auth 

17from qdrant_loader.connectors.jira.config import JiraDeploymentType, JiraProjectConfig 

18from qdrant_loader.connectors.jira.mappers import ( 

19 parse_attachment as _parse_attachment_helper, 

20) 

21from qdrant_loader.connectors.jira.mappers import parse_comment as _parse_comment_helper 

22from qdrant_loader.connectors.jira.mappers import parse_issue as _parse_issue_helper 

23from qdrant_loader.connectors.jira.mappers import parse_user as _parse_user_helper 

24from qdrant_loader.connectors.jira.models import ( 

25 JiraAttachment, 

26 JiraComment, 

27 JiraIssue, 

28 JiraUser, 

29) 

30from qdrant_loader.connectors.shared.attachments import AttachmentReader 

31from qdrant_loader.connectors.shared.attachments.metadata import ( 

32 jira_attachment_to_metadata, 

33) 

34from qdrant_loader.connectors.shared.http import ( 

35 RateLimiter, 

36) 

37from qdrant_loader.connectors.shared.http import ( 

38 request_with_policy as _http_request_with_policy, 

39) 

40from qdrant_loader.core.attachment_downloader import ( 

41 AttachmentDownloader, 

42 AttachmentMetadata, 

43) 

44from qdrant_loader.core.document import Document 

45from qdrant_loader.core.file_conversion import ( 

46 FileConversionConfig, 

47 FileConverter, 

48 FileDetector, 

49) 

50from qdrant_loader.utils.logging import LoggingConfig 

51 

52logger = LoggingConfig.get_logger(__name__) 

53 

54 

class JiraConnector(BaseConnector):
    """Jira connector for fetching and processing issues."""

    def __init__(self, config: JiraProjectConfig):
        """Create a connector bound to a single Jira project.

        Args:
            config: The Jira configuration.

        Raises:
            ValueError: If required authentication parameters are not set.
        """
        super().__init__(config)
        self.config = config
        self.base_url = str(config.base_url).rstrip("/")

        # One shared HTTP session carries both API calls and attachment downloads.
        self.session = requests.Session()

        # Attach credentials appropriate for the configured deployment type.
        self._setup_authentication()

        self._last_sync: datetime | None = None
        self._rate_limiter = RateLimiter.per_minute(self.config.requests_per_minute)
        self._initialized = False

        # File-conversion collaborators start unset; the converter itself is
        # created later via set_file_conversion_config().
        self.file_converter: FileConverter | None = None
        self.file_detector: FileDetector | None = None
        self.attachment_reader: AttachmentReader | None = None

        if config.enable_file_conversion:
            self.file_detector = FileDetector()
            # FileConverter will be initialized when file_conversion_config is set

        if config.download_attachments:
            self.attachment_reader = AttachmentReader(
                session=self.session,
                downloader=AttachmentDownloader(session=self.session),
            )

95 

96 def _setup_authentication(self): 

97 """Set up authentication based on deployment type.""" 

98 _setup_auth(self.session, self.config) 

99 

100 def _auto_detect_deployment_type(self) -> JiraDeploymentType: 

101 """Auto-detect the Jira deployment type based on the base URL. 

102 

103 Returns: 

104 JiraDeploymentType: Detected deployment type 

105 """ 

106 return _auto_detect_type(str(self.base_url)) 

107 

    def set_file_conversion_config(self, config: FileConversionConfig) -> None:
        """Set the file conversion configuration.

        Builds the FileConverter (when conversion is enabled) and, when
        attachment downloads are enabled, tears down the previous
        AttachmentReader before creating a new one wired to this config.
        Cleanup failures are deliberately swallowed so a bad old reader can
        never block reconfiguration.

        Args:
            config: File conversion configuration
        """
        if self.config.enable_file_conversion and self.file_detector:
            self.file_converter = FileConverter(config)
        if self.config.download_attachments:
            # Clean up any existing attachment reader to avoid resource leaks
            old_reader = self.attachment_reader
            if old_reader is not None:
                try:
                    # Probe for a close method in preference order; readers may
                    # expose an async `aclose`, a sync `close`, or a `cleanup`.
                    close_callable = None
                    if hasattr(old_reader, "aclose"):
                        close_callable = old_reader.aclose
                    elif hasattr(old_reader, "close"):
                        close_callable = old_reader.close
                    elif hasattr(old_reader, "cleanup"):
                        close_callable = old_reader.cleanup

                    if close_callable is not None:
                        result = close_callable()
                        if asyncio.iscoroutine(result):
                            try:
                                # Try to schedule/await coroutine cleanup safely:
                                # schedule on a running loop if one exists in this
                                # thread, otherwise run it to completion here.
                                try:
                                    loop = asyncio.get_running_loop()
                                except RuntimeError:
                                    loop = None
                                if loop and not loop.is_closed():
                                    # NOTE(review): fire-and-forget task; the
                                    # coroutine may still be closing when the new
                                    # reader is created — presumed acceptable.
                                    loop.create_task(result)
                                else:
                                    asyncio.run(result)
                            except Exception:
                                # Ignore cleanup errors to not block reconfiguration
                                pass
                except Exception:
                    # Ignore cleanup errors to avoid masking the config update
                    pass

            # Drop reference before creating a new reader
            self.attachment_reader = None

            # Reinitialize reader with new downloader config
            self.attachment_reader = AttachmentReader(
                session=self.session,
                downloader=AttachmentDownloader(
                    session=self.session,
                    file_conversion_config=config,
                    enable_file_conversion=True,
                    max_attachment_size=config.max_file_size,
                ),
            )

162 

163 async def __aenter__(self): 

164 """Async context manager entry.""" 

165 if not self._initialized: 

166 self._initialized = True 

167 return self 

168 

169 async def __aexit__(self, exc_type, exc_val, _exc_tb): 

170 """Async context manager exit.""" 

171 self._initialized = False 

172 

173 def _get_api_url(self, endpoint: str) -> str: 

174 """Construct the full API URL for an endpoint. 

175 

176 Args: 

177 endpoint: API endpoint path 

178 

179 Returns: 

180 str: Full API URL 

181 """ 

182 return f"{self.base_url}/rest/api/2/{endpoint}" 

183 

    async def _make_request(self, method: str, endpoint: str, **kwargs) -> dict:
        """Make an authenticated request to the Jira API.

        Applies a 60s default per-attempt timeout, rate limiting, and a
        3-retry policy with backoff for transient HTTP statuses, then returns
        the decoded JSON body.

        Args:
            method: HTTP method
            endpoint: API endpoint path
            **kwargs: Additional request parameters (forwarded to the HTTP layer)

        Returns:
            dict: Response data

        Raises:
            requests.exceptions.RequestException: If the request fails
        """
        url = self._get_api_url(endpoint)

        # Default per-request timeout; callers may override via kwargs.
        if "timeout" not in kwargs:
            kwargs["timeout"] = 60

        try:
            logger.debug(
                "Making JIRA API request",
                method=method,
                endpoint=endpoint,
                url=url,
                timeout=kwargs.get("timeout"),
            )

            # If no Authorization header was installed by setup (e.g. basic-auth
            # deployments), pass the session's auth object explicitly instead.
            if not self.session.headers.get("Authorization"):
                kwargs["auth"] = self.session.auth

            response = await _http_request_with_policy(
                self.session,
                method,
                url,
                rate_limiter=self._rate_limiter,
                retries=3,
                backoff_factor=0.5,
                # Retry on rate limiting and common transient server errors.
                status_forcelist=(429, 500, 502, 503, 504),
                overall_timeout=90.0,
                **kwargs,
            )

            response.raise_for_status()

            logger.debug(
                "JIRA API request completed successfully",
                method=method,
                endpoint=endpoint,
                status_code=response.status_code,
                response_size=(
                    len(response.content) if hasattr(response, "content") else 0
                ),
            )

            return response.json()

        except TimeoutError:
            # NOTE(review): presumably raised by the overall_timeout policy
            # (asyncio.TimeoutError is TimeoutError on 3.11+) — confirm against
            # _http_request_with_policy. Translated to requests' Timeout so
            # callers only handle requests exceptions.
            logger.error(
                "JIRA API request timed out",
                method=method,
                url=url,
                timeout=kwargs.get("timeout"),
            )
            raise requests.exceptions.Timeout(
                f"Request to {url} timed out after {kwargs.get('timeout')} seconds"
            )

        except requests.exceptions.RequestException as e:
            logger.error(
                "Failed to make request to JIRA API",
                method=method,
                url=url,
                error=str(e),
                error_type=type(e).__name__,
            )
            # Extra context to help diagnose auth-related failures.
            logger.error(
                "Request details",
                deployment_type=self.config.deployment_type,
                has_auth_header=bool(self.session.headers.get("Authorization")),
                has_session_auth=bool(self.session.auth),
            )
            raise

267 

268 def _make_sync_request(self, jql: str, **kwargs): 

269 """ 

270 Make a synchronous request to the Jira API, converting parameters as needed: 

271 - Format datetime values in JQL to 'yyyy-MM-dd HH:mm' format 

272 """ 

273 # Format datetime values in JQL query 

274 for key, value in kwargs.items(): 

275 if isinstance(value, datetime): 

276 formatted_date = value.strftime("%Y-%m-%d %H:%M") 

277 jql = jql.replace(f"{{{key}}}", f"'{formatted_date}'") 

278 

279 # Use the new _make_request method for consistency 

280 params = { 

281 "jql": jql, 

282 "startAt": kwargs.get("start", 0), 

283 "maxResults": kwargs.get("limit", self.config.page_size), 

284 "expand": "changelog", 

285 "fields": "*all", 

286 } 

287 

288 return asyncio.run(self._make_request("GET", "search", params=params)) 

289 

    async def get_issues(
        self, updated_after: datetime | None = None
    ) -> AsyncGenerator[JiraIssue, None]:
        """
        Get all issues from Jira.

        Pages through the search endpoint for the configured project,
        parsing each raw issue and skipping (with a logged error) any issue
        that fails to parse.

        Args:
            updated_after: Optional datetime to filter issues updated after this time

        Yields:
            JiraIssue objects
        """
        start_at = 0  # zero-based pagination cursor for the search endpoint
        page_size = self.config.page_size
        total_issues = 0  # populated from the first page's "total" field

        logger.info(
            "🎫 Starting JIRA issue retrieval",
            project_key=self.config.project_key,
            page_size=page_size,
            updated_after=updated_after.isoformat() if updated_after else None,
        )

        while True:
            # JQL scoped to the configured project, optionally narrowed to
            # issues updated since the given timestamp (incremental sync).
            jql = f'project = "{self.config.project_key}"'
            if updated_after:
                jql += f" AND updated >= '{updated_after.strftime('%Y-%m-%d %H:%M')}'"

            params = {
                "jql": jql,
                "startAt": start_at,
                "maxResults": page_size,
                "expand": "changelog",
                "fields": "*all",
            }

            logger.debug(
                "Fetching JIRA issues page",
                start_at=start_at,
                page_size=page_size,
                jql=jql,
            )

            try:
                response = await self._make_request("GET", "search", params=params)
            except Exception as e:
                logger.error(
                    "Failed to fetch JIRA issues page",
                    start_at=start_at,
                    page_size=page_size,
                    error=str(e),
                    error_type=type(e).__name__,
                )
                raise

            # An empty body or an empty "issues" page terminates pagination.
            if not response or not response.get("issues"):
                logger.debug(
                    "No more JIRA issues found, stopping pagination",
                    start_at=start_at,
                    total_processed=start_at,
                )
                break

            issues = response["issues"]

            # Update total count if not set
            if total_issues == 0:
                total_issues = response.get("total", 0)
                logger.info(f"🎫 Found {total_issues} JIRA issues to process")

            # Log progress every 100 issues instead of every 50
            progress_log_interval = 100

            for i, issue in enumerate(issues):
                try:
                    parsed_issue = self._parse_issue(issue)
                    yield parsed_issue

                    if (start_at + i + 1) % progress_log_interval == 0:
                        progress_percent = (
                            round((start_at + i + 1) / total_issues * 100, 1)
                            if total_issues > 0
                            else 0
                        )
                        logger.info(
                            f"🎫 Progress: {start_at + i + 1}/{total_issues} issues ({progress_percent}%)"
                        )

                except Exception as e:
                    # NOTE(review): skipped issues still count toward the
                    # pagination cursor below, so the final "processed" total
                    # includes unparsable issues.
                    logger.error(
                        "Failed to parse JIRA issue",
                        issue_id=issue.get("id"),
                        issue_key=issue.get("key"),
                        error=str(e),
                        error_type=type(e).__name__,
                    )
                    # Continue processing other issues instead of failing completely
                    continue

            # Check if we've processed all issues
            start_at += len(issues)
            if start_at >= total_issues:
                logger.info(
                    f"✅ Completed JIRA issue retrieval: {start_at} issues processed"
                )
                break

396 

    def _parse_issue(self, raw_issue: dict) -> JiraIssue:
        """Convert a raw API issue payload into a JiraIssue model."""
        return _parse_issue_helper(raw_issue)

399 

    def _parse_user(
        self, raw_user: dict | None, required: bool = False
    ) -> JiraUser | None:
        """Convert a raw API user payload into a JiraUser via the shared mapper.

        The ``required`` flag is forwarded to the helper; the result may be
        None (per the return annotation) for optional users.
        """
        return _parse_user_helper(raw_user, required)

404 

    def _parse_attachment(self, raw_attachment: dict) -> JiraAttachment:
        """Convert a raw API attachment payload into a JiraAttachment model."""
        return _parse_attachment_helper(raw_attachment)

407 

    def _parse_comment(self, raw_comment: dict) -> JiraComment:
        """Convert a raw API comment payload into a JiraComment model."""
        return _parse_comment_helper(raw_comment)

410 

411 def _get_issue_attachments(self, issue: JiraIssue) -> list[AttachmentMetadata]: 

412 """Convert JIRA issue attachments to AttachmentMetadata objects. 

413 

414 Args: 

415 issue: JIRA issue with attachments 

416 

417 Returns: 

418 List of attachment metadata objects 

419 """ 

420 if not self.config.download_attachments or not issue.attachments: 

421 return [] 

422 

423 attachment_metadata = [ 

424 jira_attachment_to_metadata(att, parent_id=issue.id) 

425 for att in issue.attachments 

426 ] 

427 

428 return attachment_metadata 

429 

    async def get_documents(self) -> list[Document]:
        """Fetch and process documents from Jira.

        Retrieves every issue, renders each into a text Document (summary,
        description, and comments), attaches rich metadata, and — when
        enabled — downloads and processes attachments into additional
        documents.

        Returns:
            List[Document]: List of processed documents
        """
        documents = []

        # Collect all issues
        issues = []
        async for issue in self.get_issues():
            issues.append(issue)

        # Convert issues to documents
        for issue in issues:
            # Build content including comments
            content_parts = [issue.summary]
            if issue.description:
                content_parts.append(issue.description)

            # Add comments to content
            # NOTE(review): comment.author is dereferenced without a None
            # check here, while the metadata block below guards it — confirm
            # the parser guarantees an author on every comment.
            for comment in issue.comments:
                content_parts.append(
                    f"\nComment by {comment.author.display_name} on {comment.created.strftime('%Y-%m-%d %H:%M')}:"
                )
                content_parts.append(comment.body)

            content = "\n\n".join(content_parts)

            base_url = str(self.config.base_url).rstrip("/")
            document = Document(
                id=issue.id,
                content=content,
                content_type="text",
                source=self.config.source,
                source_type=SourceType.JIRA,
                created_at=issue.created,
                url=f"{base_url}/browse/{issue.key}",
                title=issue.summary,
                updated_at=issue.updated,
                is_deleted=False,
                # Metadata mirrors the issue's fields, plus serialized
                # comment and attachment summaries for downstream consumers.
                metadata={
                    "project": self.config.project_key,
                    "issue_type": issue.issue_type,
                    "status": issue.status,
                    "key": issue.key,
                    "priority": issue.priority,
                    "labels": issue.labels,
                    "reporter": issue.reporter.display_name if issue.reporter else None,
                    "assignee": issue.assignee.display_name if issue.assignee else None,
                    "created": issue.created.isoformat(),
                    "updated": issue.updated.isoformat(),
                    "parent_key": issue.parent_key,
                    "subtasks": issue.subtasks,
                    "linked_issues": issue.linked_issues,
                    "comments": [
                        {
                            "id": comment.id,
                            "body": comment.body,
                            "created": comment.created.isoformat(),
                            "updated": (
                                comment.updated.isoformat() if comment.updated else None
                            ),
                            "author": (
                                comment.author.display_name if comment.author else None
                            ),
                        }
                        for comment in issue.comments
                    ],
                    "attachments": (
                        [
                            {
                                "id": att.id,
                                "filename": att.filename,
                                "size": att.size,
                                "mime_type": att.mime_type,
                                "created": att.created.isoformat(),
                                "author": (
                                    att.author.display_name if att.author else None
                                ),
                            }
                            for att in issue.attachments
                        ]
                        if issue.attachments
                        else []
                    ),
                },
            )
            documents.append(document)
            logger.debug(
                "Jira document created",
                document_id=document.id,
                source_type=document.source_type,
                source=document.source,
                title=document.title,
            )

            # Process attachments if enabled
            if self.config.download_attachments and self.attachment_reader:
                attachment_metadata = self._get_issue_attachments(issue)
                if attachment_metadata:
                    logger.info(
                        "Processing attachments for JIRA issue",
                        issue_key=issue.key,
                        attachment_count=len(attachment_metadata),
                    )

                    # Each downloaded/converted attachment becomes its own
                    # Document, parented to the issue's document.
                    attachment_documents = (
                        await self.attachment_reader.fetch_and_process(
                            attachment_metadata, document
                        )
                    )
                    documents.extend(attachment_documents)

                    logger.debug(
                        "Processed attachments for JIRA issue",
                        issue_key=issue.key,
                        processed_count=len(attachment_documents),
                    )

        return documents