Coverage for src/qdrant_loader/connectors/jira/connector.py: 75% (203 statements)
coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1"""Jira connector implementation."""
3import asyncio
4import time
5from collections.abc import AsyncGenerator
6from datetime import datetime
7from urllib.parse import urlparse
9import requests
10from requests.auth import HTTPBasicAuth
12from qdrant_loader.config.types import SourceType
13from qdrant_loader.connectors.base import BaseConnector
14from qdrant_loader.connectors.jira.config import JiraDeploymentType, JiraProjectConfig
15from qdrant_loader.connectors.jira.models import (
16 JiraAttachment,
17 JiraComment,
18 JiraIssue,
19 JiraUser,
20)
21from qdrant_loader.core.attachment_downloader import (
22 AttachmentDownloader,
23 AttachmentMetadata,
24)
25from qdrant_loader.core.document import Document
26from qdrant_loader.core.file_conversion import (
27 FileConversionConfig,
28 FileConverter,
29 FileDetector,
30)
31from qdrant_loader.utils.logging import LoggingConfig
33logger = LoggingConfig.get_logger(__name__)


class JiraConnector(BaseConnector):
    """Jira connector for fetching and processing issues."""

    def __init__(self, config: JiraProjectConfig):
        """Initialize the Jira connector.

        Args:
            config: The Jira configuration.

        Raises:
            ValueError: If required authentication parameters are not set.
        """
        super().__init__(config)
        self.config = config
        self.base_url = str(config.base_url).rstrip("/")

        # Initialize session
        self.session = requests.Session()

        # Set up authentication based on deployment type
        self._setup_authentication()

        self._last_sync: datetime | None = None
        self._rate_limit_lock = asyncio.Lock()
        self._last_request_time = 0.0
        self._initialized = False

        # Initialize file conversion components if enabled
        self.file_converter: FileConverter | None = None
        self.file_detector: FileDetector | None = None
        self.attachment_downloader: AttachmentDownloader | None = None

        if config.enable_file_conversion:
            self.file_detector = FileDetector()
            # FileConverter will be initialized when file_conversion_config is set

        if config.download_attachments:
            self.attachment_downloader = AttachmentDownloader(session=self.session)

    def _setup_authentication(self):
        """Set up authentication based on deployment type."""
        if self.config.deployment_type == JiraDeploymentType.CLOUD:
            # Cloud uses Basic Auth with email:api_token
            if not self.config.token:
                raise ValueError("API token is required for Jira Cloud")
            if not self.config.email:
                raise ValueError("Email is required for Jira Cloud")

            self.session.auth = HTTPBasicAuth(self.config.email, self.config.token)
            logger.debug(
                "Configured Jira Cloud authentication with email and API token"
            )
        else:
            # Data Center/Server uses Personal Access Token with Bearer authentication
            if not self.config.token:
                raise ValueError(
                    "Personal Access Token is required for Jira Data Center/Server"
                )

            self.session.headers.update(
                {
                    "Authorization": f"Bearer {self.config.token}",
                    "Content-Type": "application/json",
                }
            )
            logger.debug(
                "Configured Jira Data Center authentication with Personal Access Token"
            )
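
    # Net effect, in standard HTTP terms (not a connector-specific claim):
    # Cloud requests go out with "Authorization: Basic base64(email:token)"
    # set by requests' HTTPBasicAuth, while Data Center/Server requests carry
    # "Authorization: Bearer <token>" directly on the session headers.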

    def _auto_detect_deployment_type(self) -> JiraDeploymentType:
        """Auto-detect the Jira deployment type based on the base URL.

        Returns:
            JiraDeploymentType: Detected deployment type
        """
        try:
            parsed_url = urlparse(str(self.base_url))
            hostname = parsed_url.hostname

            if hostname is None:
                # If we can't parse the hostname, default to DATACENTER
                return JiraDeploymentType.DATACENTER

            # Cloud instances use *.atlassian.net domains.
            # Use proper hostname checking with endswith to ensure it's a subdomain.
            if hostname.endswith(".atlassian.net") or hostname == "atlassian.net":
                return JiraDeploymentType.CLOUD

            # Everything else is likely Data Center/Server
            return JiraDeploymentType.DATACENTER
        except Exception:
            # If URL parsing fails, default to DATACENTER
            return JiraDeploymentType.DATACENTER
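
    # Illustrative examples (hypothetical hostnames): "mycompany.atlassian.net"
    # -> CLOUD, "jira.mycompany.com" -> DATACENTER, and "notatlassian.net"
    # -> DATACENTER, because endswith(".atlassian.net") only matches true
    # subdomains of atlassian.net.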

    def set_file_conversion_config(self, config: FileConversionConfig) -> None:
        """Set the file conversion configuration.

        Args:
            config: File conversion configuration
        """
        if self.config.enable_file_conversion and self.file_detector:
            self.file_converter = FileConverter(config)
            if self.config.download_attachments:
                # Reinitialize attachment downloader with file conversion config
                self.attachment_downloader = AttachmentDownloader(
                    session=self.session,
                    file_conversion_config=config,
                    enable_file_conversion=True,
                    max_attachment_size=config.max_file_size,
                )

    async def __aenter__(self):
        """Async context manager entry."""
        if not self._initialized:
            self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        self._initialized = False

    def _get_api_url(self, endpoint: str) -> str:
        """Construct the full API URL for an endpoint.

        Args:
            endpoint: API endpoint path

        Returns:
            str: Full API URL
        """
        return f"{self.base_url}/rest/api/2/{endpoint}"
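
    # For example, with a (hypothetical) base_url of "https://jira.example.com",
    # _get_api_url("search") yields "https://jira.example.com/rest/api/2/search",
    # i.e. endpoints resolve against the Jira REST API v2 prefix.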

    async def _make_request(self, method: str, endpoint: str, **kwargs) -> dict:
        """Make an authenticated request to the Jira API.

        Args:
            method: HTTP method
            endpoint: API endpoint path
            **kwargs: Additional request parameters

        Returns:
            dict: Response data

        Raises:
            requests.exceptions.RequestException: If the request fails
        """
        async with self._rate_limit_lock:
            # Calculate time to wait based on rate limit
            min_interval = 60.0 / self.config.requests_per_minute
            now = time.time()
            time_since_last_request = now - self._last_request_time

            if time_since_last_request < min_interval:
                await asyncio.sleep(min_interval - time_since_last_request)

            self._last_request_time = time.time()
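
            # Worked example (requests_per_minute=30 is an assumed config
            # value): min_interval = 60.0 / 30 = 2.0s, so a request arriving
            # 0.5s after the previous one sleeps 1.5s before proceeding.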

        url = self._get_api_url(endpoint)

        # Add timeout to kwargs if not already specified
        if "timeout" not in kwargs:
            kwargs["timeout"] = 60  # 60 second timeout for HTTP requests

        try:
            logger.debug(
                "Making JIRA API request",
                method=method,
                endpoint=endpoint,
                url=url,
                timeout=kwargs.get("timeout"),
            )

            # For Data Center with PAT, headers are already set.
            # For Cloud, use session auth.
            if not self.session.headers.get("Authorization"):
                kwargs["auth"] = self.session.auth

            # Use asyncio.wait_for to add an additional timeout layer
            response = await asyncio.wait_for(
                asyncio.to_thread(self.session.request, method, url, **kwargs),
                timeout=90.0,  # 90 second timeout for the entire operation
            )

            response.raise_for_status()

            logger.debug(
                "JIRA API request completed successfully",
                method=method,
                endpoint=endpoint,
                status_code=response.status_code,
                response_size=(
                    len(response.content) if hasattr(response, "content") else 0
                ),
            )

            return response.json()

        except asyncio.TimeoutError:
            logger.error(
                "JIRA API request timed out",
                method=method,
                url=url,
                timeout=kwargs.get("timeout"),
            )
            raise requests.exceptions.Timeout(
                f"Request to {url} timed out after {kwargs.get('timeout')} seconds"
            )

        except requests.exceptions.RequestException as e:
            logger.error(
                "Failed to make request to JIRA API",
                method=method,
                url=url,
                error=str(e),
                error_type=type(e).__name__,
            )
            # Log additional context for debugging
            logger.error(
                "Request details",
                deployment_type=self.config.deployment_type,
                has_auth_header=bool(self.session.headers.get("Authorization")),
                has_session_auth=bool(self.session.auth),
            )
            raise

    def _make_sync_request(self, jql: str, **kwargs):
        """
        Make a synchronous request to the Jira API, converting parameters as needed:
        - Format datetime values in JQL to 'yyyy-MM-dd HH:mm' format
        """
        # Substitute datetime values into {key} placeholders in the JQL query
        for key, value in kwargs.items():
            if isinstance(value, datetime):
                formatted_date = value.strftime("%Y-%m-%d %H:%M")
                jql = jql.replace(f"{{{key}}}", f"'{formatted_date}'")

        # Use the _make_request method for consistency
        params = {
            "jql": jql,
            "startAt": kwargs.get("start", 0),
            "maxResults": kwargs.get("limit", self.config.page_size),
            "expand": "changelog",
            "fields": "*all",
        }

        return asyncio.run(self._make_request("GET", "search", params=params))
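
    # Example (hypothetical placeholder): given jql="updated >= {since}" and
    # since=datetime(2025, 6, 1, 12, 0) passed as a keyword argument, the loop
    # above rewrites the query to "updated >= '2025-06-01 12:00'" before the
    # search request is issued.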

    async def get_issues(
        self, updated_after: datetime | None = None
    ) -> AsyncGenerator[JiraIssue, None]:
        """
        Get all issues from Jira.

        Args:
            updated_after: Optional datetime to filter issues updated after this time

        Yields:
            JiraIssue objects
        """
        start_at = 0
        page_size = self.config.page_size
        total_issues = 0

        logger.info(
            "🎫 Starting JIRA issue retrieval",
            project_key=self.config.project_key,
            page_size=page_size,
            updated_after=updated_after.isoformat() if updated_after else None,
        )

        while True:
            jql = f'project = "{self.config.project_key}"'
            if updated_after:
                jql += f" AND updated >= '{updated_after.strftime('%Y-%m-%d %H:%M')}'"

            params = {
                "jql": jql,
                "startAt": start_at,
                "maxResults": page_size,
                "expand": "changelog",
                "fields": "*all",
            }

            logger.debug(
                "Fetching JIRA issues page",
                start_at=start_at,
                page_size=page_size,
                jql=jql,
            )

            try:
                response = await self._make_request("GET", "search", params=params)
            except Exception as e:
                logger.error(
                    "Failed to fetch JIRA issues page",
                    start_at=start_at,
                    page_size=page_size,
                    error=str(e),
                    error_type=type(e).__name__,
                )
                raise

            if not response or not response.get("issues"):
                logger.debug(
                    "No more JIRA issues found, stopping pagination",
                    start_at=start_at,
                    total_processed=start_at,
                )
                break

            issues = response["issues"]

            # Update total count if not set
            if total_issues == 0:
                total_issues = response.get("total", 0)
                logger.info(f"🎫 Found {total_issues} JIRA issues to process")

            # Log progress every 100 issues
            progress_log_interval = 100

            for i, issue in enumerate(issues):
                try:
                    parsed_issue = self._parse_issue(issue)
                    yield parsed_issue

                    if (start_at + i + 1) % progress_log_interval == 0:
                        progress_percent = (
                            round((start_at + i + 1) / total_issues * 100, 1)
                            if total_issues > 0
                            else 0
                        )
                        logger.info(
                            f"🎫 Progress: {start_at + i + 1}/{total_issues} issues ({progress_percent}%)"
                        )

                except Exception as e:
                    logger.error(
                        "Failed to parse JIRA issue",
                        issue_id=issue.get("id"),
                        issue_key=issue.get("key"),
                        error=str(e),
                        error_type=type(e).__name__,
                    )
                    # Continue processing other issues instead of failing completely
                    continue

            # Check if we've processed all issues
            start_at += len(issues)
            if start_at >= total_issues:
                logger.info(
                    f"✅ Completed JIRA issue retrieval: {start_at} issues processed"
                )
                break
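
    # Pagination arithmetic (illustrative values): with page_size=50 and a
    # total of 120 matching issues, successive pages start at 0, 50, and 100;
    # after the third page, start_at reaches 120 >= total_issues and the loop
    # exits.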

    def _parse_issue(self, raw_issue: dict) -> JiraIssue:
        """Parse raw Jira issue data into JiraIssue model.

        Args:
            raw_issue: Raw issue data from Jira API

        Returns:
            Parsed JiraIssue
        """
        fields = raw_issue["fields"]
        parent = fields.get("parent")
        parent_key = parent.get("key") if parent else None

        # Parse reporter with type assertion since it's required
        reporter = self._parse_user(fields["reporter"], required=True)
        assert reporter is not None  # For type checker

        jira_issue = JiraIssue(
            id=raw_issue["id"],
            key=raw_issue["key"],
            summary=fields["summary"],
            description=fields.get("description"),
            issue_type=fields["issuetype"]["name"],
            status=fields["status"]["name"],
            priority=(
                fields.get("priority", {}).get("name")
                if fields.get("priority")
                else None
            ),
            project_key=fields["project"]["key"],
            created=datetime.fromisoformat(fields["created"].replace("Z", "+00:00")),
            updated=datetime.fromisoformat(fields["updated"].replace("Z", "+00:00")),
            reporter=reporter,
            assignee=self._parse_user(fields.get("assignee")),
            labels=fields.get("labels", []),
            attachments=[
                self._parse_attachment(att) for att in fields.get("attachment", [])
            ],
            comments=[
                self._parse_comment(comment)
                for comment in fields.get("comment", {}).get("comments", [])
            ],
            parent_key=parent_key,
            subtasks=[st["key"] for st in fields.get("subtasks", [])],
            linked_issues=[
                link["outwardIssue"]["key"]
                for link in fields.get("issuelinks", [])
                if "outwardIssue" in link
            ],
        )

        return jira_issue

    def _parse_user(
        self, raw_user: dict | None, required: bool = False
    ) -> JiraUser | None:
        """Parse raw Jira user data into JiraUser model.

        Args:
            raw_user: Raw user data from Jira API
            required: Whether this user field is required (e.g., reporter, author)

        Returns:
            Parsed JiraUser or None if raw_user is None and not required

        Raises:
            ValueError: If raw_user is None and required is True
        """
        if not raw_user:
            if required:
                raise ValueError("User data is required but not provided")
            return None

        # Handle different user formats between Cloud and Data Center:
        # Cloud uses 'accountId', Data Center uses 'name' or 'key'
        account_id = raw_user.get("accountId")
        if not account_id:
            # Fallback to 'name' or 'key' for Data Center
            account_id = raw_user.get("name") or raw_user.get("key")

        if not account_id:
            if required:
                raise ValueError(
                    "User data missing required identifier (accountId, name, or key)"
                )
            return None

        return JiraUser(
            account_id=account_id,
            display_name=raw_user["displayName"],
            email_address=raw_user.get("emailAddress"),
        )
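
    # Illustrative payloads (field values assumed): Cloud returns user dicts
    # like {"accountId": "5b10a...", "displayName": "Jane Doe"}, while Data
    # Center returns {"name": "jdoe", "key": "jdoe", "displayName": "Jane Doe"};
    # both normalize to a JiraUser keyed by the first available identifier.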

    def _parse_attachment(self, raw_attachment: dict) -> JiraAttachment:
        """Parse raw Jira attachment data into JiraAttachment model.

        Args:
            raw_attachment: Raw attachment data from Jira API

        Returns:
            Parsed JiraAttachment
        """
        # Parse author with type assertion since it's required
        author = self._parse_user(raw_attachment["author"], required=True)
        assert author is not None  # For type checker

        return JiraAttachment(
            id=raw_attachment["id"],
            filename=raw_attachment["filename"],
            size=raw_attachment["size"],
            mime_type=raw_attachment["mimeType"],
            content_url=raw_attachment["content"],
            created=datetime.fromisoformat(
                raw_attachment["created"].replace("Z", "+00:00")
            ),
            author=author,
        )

    def _parse_comment(self, raw_comment: dict) -> JiraComment:
        """Parse raw Jira comment data into JiraComment model.

        Args:
            raw_comment: Raw comment data from Jira API

        Returns:
            Parsed JiraComment
        """
        # Parse author with type assertion since it's required
        author = self._parse_user(raw_comment["author"], required=True)
        assert author is not None  # For type checker

        return JiraComment(
            id=raw_comment["id"],
            body=raw_comment["body"],
            created=datetime.fromisoformat(
                raw_comment["created"].replace("Z", "+00:00")
            ),
            updated=(
                datetime.fromisoformat(raw_comment["updated"].replace("Z", "+00:00"))
                if "updated" in raw_comment
                else None
            ),
            author=author,
        )

    def _get_issue_attachments(self, issue: JiraIssue) -> list[AttachmentMetadata]:
        """Convert JIRA issue attachments to AttachmentMetadata objects.

        Args:
            issue: JIRA issue with attachments

        Returns:
            List of attachment metadata objects
        """
        if not self.config.download_attachments or not issue.attachments:
            return []

        attachment_metadata = []
        for attachment in issue.attachments:
            metadata = AttachmentMetadata(
                id=attachment.id,
                filename=attachment.filename,
                size=attachment.size,
                mime_type=attachment.mime_type,
                download_url=str(attachment.content_url),
                parent_document_id=issue.id,
                created_at=(
                    attachment.created.isoformat() if attachment.created else None
                ),
                updated_at=None,  # JIRA attachments don't have update timestamps
                author=attachment.author.display_name if attachment.author else None,
            )
            attachment_metadata.append(metadata)

        return attachment_metadata

    async def get_documents(self) -> list[Document]:
        """Fetch and process documents from Jira.

        Returns:
            List[Document]: List of processed documents
        """
        documents = []

        # Collect all issues
        issues = []
        async for issue in self.get_issues():
            issues.append(issue)

        # Convert issues to documents
        for issue in issues:
            # Build content including comments
            content_parts = [issue.summary]
            if issue.description:
                content_parts.append(issue.description)

            # Add comments to content
            for comment in issue.comments:
                content_parts.append(
                    f"\nComment by {comment.author.display_name} on {comment.created.strftime('%Y-%m-%d %H:%M')}:"
                )
                content_parts.append(comment.body)

            content = "\n\n".join(content_parts)

            base_url = str(self.config.base_url).rstrip("/")
            document = Document(
                id=issue.id,
                content=content,
                content_type="text",
                source=self.config.source,
                source_type=SourceType.JIRA,
                created_at=issue.created,
                url=f"{base_url}/browse/{issue.key}",
                title=issue.summary,
                updated_at=issue.updated,
                is_deleted=False,
                metadata={
                    "project": self.config.project_key,
                    "issue_type": issue.issue_type,
                    "status": issue.status,
                    "key": issue.key,
                    "priority": issue.priority,
                    "labels": issue.labels,
                    "reporter": issue.reporter.display_name if issue.reporter else None,
                    "assignee": issue.assignee.display_name if issue.assignee else None,
                    "created": issue.created.isoformat(),
                    "updated": issue.updated.isoformat(),
                    "parent_key": issue.parent_key,
                    "subtasks": issue.subtasks,
                    "linked_issues": issue.linked_issues,
                    "comments": [
                        {
                            "id": comment.id,
                            "body": comment.body,
                            "created": comment.created.isoformat(),
                            "updated": (
                                comment.updated.isoformat() if comment.updated else None
                            ),
                            "author": (
                                comment.author.display_name if comment.author else None
                            ),
                        }
                        for comment in issue.comments
                    ],
                    "attachments": (
                        [
                            {
                                "id": att.id,
                                "filename": att.filename,
                                "size": att.size,
                                "mime_type": att.mime_type,
                                "created": att.created.isoformat(),
                                "author": (
                                    att.author.display_name if att.author else None
                                ),
                            }
                            for att in issue.attachments
                        ]
                        if issue.attachments
                        else []
                    ),
                },
            )
            documents.append(document)
            logger.debug(
                "Jira document created",
                document_id=document.id,
                source_type=document.source_type,
                source=document.source,
                title=document.title,
            )

            # Process attachments if enabled
            if self.config.download_attachments and self.attachment_downloader:
                attachment_metadata = self._get_issue_attachments(issue)
                if attachment_metadata:
                    logger.info(
                        "Processing attachments for JIRA issue",
                        issue_key=issue.key,
                        attachment_count=len(attachment_metadata),
                    )

                    attachment_documents = await self.attachment_downloader.download_and_process_attachments(
                        attachment_metadata, document
                    )
                    documents.extend(attachment_documents)

                    logger.debug(
                        "Processed attachments for JIRA issue",
                        issue_key=issue.key,
                        processed_count=len(attachment_documents),
                    )

        return documents
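

# Usage sketch (a minimal example, assuming a JiraProjectConfig built
# elsewhere; in practice the qdrant-loader ingestion pipeline drives this
# connector, and load_jira_project_config() below is a hypothetical helper):
#
#     import asyncio
#
#     async def main() -> None:
#         config = load_jira_project_config()  # hypothetical
#         async with JiraConnector(config) as connector:
#             documents = await connector.get_documents()
#         print(f"Fetched {len(documents)} documents")
#
#     asyncio.run(main())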