Coverage for src/qdrant_loader/connectors/jira/connector.py: 66% (187 statements)
coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1"""Jira connector implementation."""
3import asyncio
4from collections.abc import AsyncGenerator
5from datetime import datetime
6from urllib.parse import urlparse # noqa: F401 - may be used in URL handling
8import requests
9from requests.auth import HTTPBasicAuth # noqa: F401 - compatibility
11from qdrant_loader.config.types import SourceType
12from qdrant_loader.connectors.base import BaseConnector
13from qdrant_loader.connectors.jira.auth import (
14 auto_detect_deployment_type as _auto_detect_type,
15)
16from qdrant_loader.connectors.jira.auth import setup_authentication as _setup_auth
17from qdrant_loader.connectors.jira.config import JiraDeploymentType, JiraProjectConfig
18from qdrant_loader.connectors.jira.mappers import (
19 parse_attachment as _parse_attachment_helper,
20)
21from qdrant_loader.connectors.jira.mappers import parse_comment as _parse_comment_helper
22from qdrant_loader.connectors.jira.mappers import parse_issue as _parse_issue_helper
23from qdrant_loader.connectors.jira.mappers import parse_user as _parse_user_helper
24from qdrant_loader.connectors.jira.models import (
25 JiraAttachment,
26 JiraComment,
27 JiraIssue,
28 JiraUser,
29)
30from qdrant_loader.connectors.shared.attachments import AttachmentReader
31from qdrant_loader.connectors.shared.attachments.metadata import (
32 jira_attachment_to_metadata,
33)
34from qdrant_loader.connectors.shared.http import (
35 RateLimiter,
36)
37from qdrant_loader.connectors.shared.http import (
38 request_with_policy as _http_request_with_policy,
39)
40from qdrant_loader.core.attachment_downloader import (
41 AttachmentDownloader,
42 AttachmentMetadata,
43)
44from qdrant_loader.core.document import Document
45from qdrant_loader.core.file_conversion import (
46 FileConversionConfig,
47 FileConverter,
48 FileDetector,
49)
50from qdrant_loader.utils.logging import LoggingConfig
52logger = LoggingConfig.get_logger(__name__)
class JiraConnector(BaseConnector):
    """Jira connector for fetching and processing issues."""

    def __init__(self, config: JiraProjectConfig):
        """Initialize the Jira connector.

        Args:
            config: The Jira configuration.

        Raises:
            ValueError: If required authentication parameters are not set.
        """
        super().__init__(config)
        self.config = config
        self.base_url = str(config.base_url).rstrip("/")

        # Initialize session
        self.session = requests.Session()

        # Set up authentication based on deployment type
        self._setup_authentication()

        self._last_sync: datetime | None = None
        self._rate_limiter = RateLimiter.per_minute(self.config.requests_per_minute)
        self._initialized = False

        # Initialize file conversion components if enabled
        self.file_converter: FileConverter | None = None
        self.file_detector: FileDetector | None = None
        self.attachment_reader: AttachmentReader | None = None

        if config.enable_file_conversion:
            self.file_detector = FileDetector()
            # FileConverter will be initialized when file_conversion_config is set

        if config.download_attachments:
            self.attachment_reader = AttachmentReader(
                session=self.session,
                downloader=AttachmentDownloader(session=self.session),
            )

    def _setup_authentication(self):
        """Set up authentication based on deployment type."""
        _setup_auth(self.session, self.config)

    def _auto_detect_deployment_type(self) -> JiraDeploymentType:
        """Auto-detect the Jira deployment type based on the base URL.

        Returns:
            JiraDeploymentType: Detected deployment type
        """
        return _auto_detect_type(str(self.base_url))

    def set_file_conversion_config(self, config: FileConversionConfig) -> None:
        """Set the file conversion configuration.

        Args:
            config: File conversion configuration
        """
        if self.config.enable_file_conversion and self.file_detector:
            self.file_converter = FileConverter(config)
        if self.config.download_attachments:
            # Clean up any existing attachment reader to avoid resource leaks
            old_reader = self.attachment_reader
            if old_reader is not None:
                try:
                    close_callable = None
                    if hasattr(old_reader, "aclose"):
                        close_callable = old_reader.aclose
                    elif hasattr(old_reader, "close"):
                        close_callable = old_reader.close
                    elif hasattr(old_reader, "cleanup"):
                        close_callable = old_reader.cleanup

                    if close_callable is not None:
                        result = close_callable()
                        if asyncio.iscoroutine(result):
                            try:
                                # Try to schedule/await coroutine cleanup safely
                                try:
                                    loop = asyncio.get_running_loop()
                                except RuntimeError:
                                    loop = None
                                if loop and not loop.is_closed():
                                    loop.create_task(result)
                                else:
                                    asyncio.run(result)
                            except Exception:
                                # Ignore cleanup errors to not block reconfiguration
                                pass
                except Exception:
                    # Ignore cleanup errors to avoid masking the config update
                    pass

            # Drop reference before creating a new reader
            self.attachment_reader = None

            # Reinitialize reader with new downloader config
            self.attachment_reader = AttachmentReader(
                session=self.session,
                downloader=AttachmentDownloader(
                    session=self.session,
                    file_conversion_config=config,
                    enable_file_conversion=True,
                    max_attachment_size=config.max_file_size,
                ),
            )

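    # Illustrative note (not part of this module): file conversion is wired up lazily.
    # A caller first constructs the connector, then injects the conversion settings,
    # e.g. (hypothetical variable names):
    #
    #   connector = JiraConnector(project_config)
    #   connector.set_file_conversion_config(file_conversion_config)
    #
    # which rebuilds self.attachment_reader with an AttachmentDownloader that has file
    # conversion enabled and max_attachment_size taken from config.max_file_size.
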
    async def __aenter__(self):
        """Async context manager entry."""
        if not self._initialized:
            self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, _exc_tb):
        """Async context manager exit."""
        self._initialized = False

    def _get_api_url(self, endpoint: str) -> str:
        """Construct the full API URL for an endpoint.

        Args:
            endpoint: API endpoint path

        Returns:
            str: Full API URL
        """
        return f"{self.base_url}/rest/api/2/{endpoint}"

    async def _make_request(self, method: str, endpoint: str, **kwargs) -> dict:
        """Make an authenticated request to the Jira API.

        Args:
            method: HTTP method
            endpoint: API endpoint path
            **kwargs: Additional request parameters

        Returns:
            dict: Response data

        Raises:
            requests.exceptions.RequestException: If the request fails
        """
        url = self._get_api_url(endpoint)

        if "timeout" not in kwargs:
            kwargs["timeout"] = 60

        try:
            logger.debug(
                "Making JIRA API request",
                method=method,
                endpoint=endpoint,
                url=url,
                timeout=kwargs.get("timeout"),
            )

            if not self.session.headers.get("Authorization"):
                kwargs["auth"] = self.session.auth

            response = await _http_request_with_policy(
                self.session,
                method,
                url,
                rate_limiter=self._rate_limiter,
                retries=3,
                backoff_factor=0.5,
                status_forcelist=(429, 500, 502, 503, 504),
                overall_timeout=90.0,
                **kwargs,
            )

            response.raise_for_status()

            logger.debug(
                "JIRA API request completed successfully",
                method=method,
                endpoint=endpoint,
                status_code=response.status_code,
                response_size=(
                    len(response.content) if hasattr(response, "content") else 0
                ),
            )

            return response.json()

        except TimeoutError:
            logger.error(
                "JIRA API request timed out",
                method=method,
                url=url,
                timeout=kwargs.get("timeout"),
            )
            raise requests.exceptions.Timeout(
                f"Request to {url} timed out after {kwargs.get('timeout')} seconds"
            )

        except requests.exceptions.RequestException as e:
            logger.error(
                "Failed to make request to JIRA API",
                method=method,
                url=url,
                error=str(e),
                error_type=type(e).__name__,
            )
            logger.error(
                "Request details",
                deployment_type=self.config.deployment_type,
                has_auth_header=bool(self.session.headers.get("Authorization")),
                has_session_auth=bool(self.session.auth),
            )
            raise

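    # Illustrative note (not part of this module): _make_request expects an endpoint
    # path relative to /rest/api/2/ plus optional requests kwargs, e.g. as get_issues
    # does below:
    #
    #   data = await self._make_request("GET", "search", params={"jql": jql})
    #
    # Rate limiting, retries on 429/5xx responses and the 90s overall deadline are
    # handled by request_with_policy; the per-request timeout defaults to 60s unless
    # a timeout= kwarg is passed.
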
    def _make_sync_request(self, jql: str, **kwargs):
        """
        Make a synchronous request to the Jira API, converting parameters as needed:
        - Format datetime values in JQL to 'yyyy-MM-dd HH:mm' format
        """
        # Format datetime values in JQL query
        for key, value in kwargs.items():
            if isinstance(value, datetime):
                formatted_date = value.strftime("%Y-%m-%d %H:%M")
                jql = jql.replace(f"{{{key}}}", f"'{formatted_date}'")

        # Use the new _make_request method for consistency
        params = {
            "jql": jql,
            "startAt": kwargs.get("start", 0),
            "maxResults": kwargs.get("limit", self.config.page_size),
            "expand": "changelog",
            "fields": "*all",
        }

        return asyncio.run(self._make_request("GET", "search", params=params))

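    # Illustrative note (not part of this module): datetime kwargs are substituted
    # into {placeholder} tokens in the JQL string, e.g. (hypothetical values):
    #
    #   connector._make_sync_request(
    #       "project = PROJ AND updated >= {updated_after}",
    #       updated_after=datetime(2025, 1, 1, 12, 0),
    #   )
    #   # jql becomes: project = PROJ AND updated >= '2025-01-01 12:00'
    #
    # Because it delegates to the async _make_request via asyncio.run, it must not be
    # called from inside a running event loop.
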
    async def get_issues(
        self, updated_after: datetime | None = None
    ) -> AsyncGenerator[JiraIssue, None]:
        """
        Get all issues from Jira.

        Args:
            updated_after: Optional datetime to filter issues updated after this time

        Yields:
            JiraIssue objects
        """
        start_at = 0
        page_size = self.config.page_size
        total_issues = 0

        logger.info(
            "🎫 Starting JIRA issue retrieval",
            project_key=self.config.project_key,
            page_size=page_size,
            updated_after=updated_after.isoformat() if updated_after else None,
        )

        while True:
            jql = f'project = "{self.config.project_key}"'
            if updated_after:
                jql += f" AND updated >= '{updated_after.strftime('%Y-%m-%d %H:%M')}'"

            params = {
                "jql": jql,
                "startAt": start_at,
                "maxResults": page_size,
                "expand": "changelog",
                "fields": "*all",
            }

            logger.debug(
                "Fetching JIRA issues page",
                start_at=start_at,
                page_size=page_size,
                jql=jql,
            )

            try:
                response = await self._make_request("GET", "search", params=params)
            except Exception as e:
                logger.error(
                    "Failed to fetch JIRA issues page",
                    start_at=start_at,
                    page_size=page_size,
                    error=str(e),
                    error_type=type(e).__name__,
                )
                raise

            if not response or not response.get("issues"):
                logger.debug(
                    "No more JIRA issues found, stopping pagination",
                    start_at=start_at,
                    total_processed=start_at,
                )
                break

            issues = response["issues"]

            # Update total count if not set
            if total_issues == 0:
                total_issues = response.get("total", 0)
                logger.info(f"🎫 Found {total_issues} JIRA issues to process")

            # Log progress every 100 issues instead of every 50
            progress_log_interval = 100

            for i, issue in enumerate(issues):
                try:
                    parsed_issue = self._parse_issue(issue)
                    yield parsed_issue

                    if (start_at + i + 1) % progress_log_interval == 0:
                        progress_percent = (
                            round((start_at + i + 1) / total_issues * 100, 1)
                            if total_issues > 0
                            else 0
                        )
                        logger.info(
                            f"🎫 Progress: {start_at + i + 1}/{total_issues} issues ({progress_percent}%)"
                        )

                except Exception as e:
                    logger.error(
                        "Failed to parse JIRA issue",
                        issue_id=issue.get("id"),
                        issue_key=issue.get("key"),
                        error=str(e),
                        error_type=type(e).__name__,
                    )
                    # Continue processing other issues instead of failing completely
                    continue

            # Check if we've processed all issues
            start_at += len(issues)
            if start_at >= total_issues:
                logger.info(
                    f"✅ Completed JIRA issue retrieval: {start_at} issues processed"
                )
                break

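    # Illustrative note (not part of this module): get_issues is an async generator,
    # so callers consume it with `async for`, optionally passing the last sync time,
    # e.g. (hypothetical caller):
    #
    #   async for issue in connector.get_issues(updated_after=self._last_sync):
    #       handle(issue)
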
    def _parse_issue(self, raw_issue: dict) -> JiraIssue:
        return _parse_issue_helper(raw_issue)

    def _parse_user(
        self, raw_user: dict | None, required: bool = False
    ) -> JiraUser | None:
        return _parse_user_helper(raw_user, required)

    def _parse_attachment(self, raw_attachment: dict) -> JiraAttachment:
        return _parse_attachment_helper(raw_attachment)

    def _parse_comment(self, raw_comment: dict) -> JiraComment:
        return _parse_comment_helper(raw_comment)

    def _get_issue_attachments(self, issue: JiraIssue) -> list[AttachmentMetadata]:
        """Convert JIRA issue attachments to AttachmentMetadata objects.

        Args:
            issue: JIRA issue with attachments

        Returns:
            List of attachment metadata objects
        """
        if not self.config.download_attachments or not issue.attachments:
            return []

        attachment_metadata = [
            jira_attachment_to_metadata(att, parent_id=issue.id)
            for att in issue.attachments
        ]

        return attachment_metadata

    async def get_documents(self) -> list[Document]:
        """Fetch and process documents from Jira.

        Returns:
            List[Document]: List of processed documents
        """
        documents = []

        # Collect all issues
        issues = []
        async for issue in self.get_issues():
            issues.append(issue)

        # Convert issues to documents
        for issue in issues:
            # Build content including comments
            content_parts = [issue.summary]
            if issue.description:
                content_parts.append(issue.description)

            # Add comments to content
            for comment in issue.comments:
                content_parts.append(
                    f"\nComment by {comment.author.display_name} on {comment.created.strftime('%Y-%m-%d %H:%M')}:"
                )
                content_parts.append(comment.body)

            content = "\n\n".join(content_parts)

            base_url = str(self.config.base_url).rstrip("/")
            document = Document(
                id=issue.id,
                content=content,
                content_type="text",
                source=self.config.source,
                source_type=SourceType.JIRA,
                created_at=issue.created,
                url=f"{base_url}/browse/{issue.key}",
                title=issue.summary,
                updated_at=issue.updated,
                is_deleted=False,
                metadata={
                    "project": self.config.project_key,
                    "issue_type": issue.issue_type,
                    "status": issue.status,
                    "key": issue.key,
                    "priority": issue.priority,
                    "labels": issue.labels,
                    "reporter": issue.reporter.display_name if issue.reporter else None,
                    "assignee": issue.assignee.display_name if issue.assignee else None,
                    "created": issue.created.isoformat(),
                    "updated": issue.updated.isoformat(),
                    "parent_key": issue.parent_key,
                    "subtasks": issue.subtasks,
                    "linked_issues": issue.linked_issues,
                    "comments": [
                        {
                            "id": comment.id,
                            "body": comment.body,
                            "created": comment.created.isoformat(),
                            "updated": (
                                comment.updated.isoformat() if comment.updated else None
                            ),
                            "author": (
                                comment.author.display_name if comment.author else None
                            ),
                        }
                        for comment in issue.comments
                    ],
                    "attachments": (
                        [
                            {
                                "id": att.id,
                                "filename": att.filename,
                                "size": att.size,
                                "mime_type": att.mime_type,
                                "created": att.created.isoformat(),
                                "author": (
                                    att.author.display_name if att.author else None
                                ),
                            }
                            for att in issue.attachments
                        ]
                        if issue.attachments
                        else []
                    ),
                },
            )
            documents.append(document)
            logger.debug(
                "Jira document created",
                document_id=document.id,
                source_type=document.source_type,
                source=document.source,
                title=document.title,
            )

            # Process attachments if enabled
            if self.config.download_attachments and self.attachment_reader:
                attachment_metadata = self._get_issue_attachments(issue)
                if attachment_metadata:
                    logger.info(
                        "Processing attachments for JIRA issue",
                        issue_key=issue.key,
                        attachment_count=len(attachment_metadata),
                    )

                    attachment_documents = (
                        await self.attachment_reader.fetch_and_process(
                            attachment_metadata, document
                        )
                    )
                    documents.extend(attachment_documents)

                    logger.debug(
                        "Processed attachments for JIRA issue",
                        issue_key=issue.key,
                        processed_count=len(attachment_documents),
                    )

        return documents
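
A minimal end-to-end usage sketch (not part of the module above), assuming an already populated JiraProjectConfig; variable names such as project_config are placeholders. The connector is entered as an async context manager, and get_documents() drives get_issues() plus optional attachment processing:

import asyncio

from qdrant_loader.connectors.jira.connector import JiraConnector


async def main(project_config):
    # Enter the connector as an async context manager, then fetch all issue
    # documents (and, if configured, their processed attachments).
    async with JiraConnector(project_config) as connector:
        documents = await connector.get_documents()
        print(f"Fetched {len(documents)} documents from Jira")


# asyncio.run(main(project_config))  # project_config: a configured JiraProjectConfig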