Coverage for src / qdrant_loader / connectors / jira / connector.py: 70%
200 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:40 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-10 09:40 +0000
1"""Jira connector implementation."""
3import asyncio
4from abc import abstractmethod
5from collections.abc import AsyncGenerator
6from datetime import datetime
7from urllib.parse import urlparse # noqa: F401 - may be used in URL handling
9import requests
10from requests.auth import HTTPBasicAuth # noqa: F401 - compatibility
12from qdrant_loader.config.types import SourceType
13from qdrant_loader.connectors.base import BaseConnector, ConnectorConfigurationError
14from qdrant_loader.connectors.jira.auth import (
15 auto_detect_deployment_type as _auto_detect_type,
16)
17from qdrant_loader.connectors.jira.auth import setup_authentication as _setup_auth
18from qdrant_loader.connectors.jira.config import JiraDeploymentType, JiraProjectConfig
19from qdrant_loader.connectors.jira.mappers import (
20 parse_attachment as _parse_attachment_helper,
21)
22from qdrant_loader.connectors.jira.mappers import parse_comment as _parse_comment_helper
23from qdrant_loader.connectors.jira.mappers import parse_issue as _parse_issue_helper
24from qdrant_loader.connectors.jira.mappers import parse_user as _parse_user_helper
25from qdrant_loader.connectors.jira.models import (
26 JiraAttachment,
27 JiraComment,
28 JiraIssue,
29 JiraUser,
30)
31from qdrant_loader.connectors.shared.attachments import AttachmentReader
32from qdrant_loader.connectors.shared.attachments.metadata import (
33 jira_attachment_to_metadata,
34)
35from qdrant_loader.connectors.shared.http import (
36 RateLimiter,
37)
38from qdrant_loader.connectors.shared.http import (
39 request_with_policy as _http_request_with_policy,
40)
41from qdrant_loader.core.attachment_downloader import (
42 AttachmentDownloader,
43 AttachmentMetadata,
44)
45from qdrant_loader.core.document import Document
46from qdrant_loader.core.file_conversion import (
47 FileConversionConfig,
48 FileConverter,
49 FileDetector,
50)
51from qdrant_loader.utils.logging import LoggingConfig
53logger = LoggingConfig.get_logger(__name__)
class BaseJiraConnector(BaseConnector):
    """Base class for all Jira connectors.

    Holds the pieces shared by every Jira deployment flavour: the
    ``requests`` session and its authentication, rate limiting, connection
    validation, JQL construction and the conversion of issues (and optionally
    their attachments) into ``Document`` objects.

    Subclasses must implement ``_get_api_url`` and ``get_issues``.
    """

    def __init__(self, config: JiraProjectConfig):
        """Initialize the Jira connector.

        Args:
            config: The Jira configuration.

        Raises:
            ValueError: If required authentication parameters are not set.
        """
        super().__init__(config)
        self.config = config
        self.base_url = str(config.base_url).rstrip("/")

        # Initialize session
        self.session = requests.Session()

        # Set up authentication based on deployment type
        self._setup_authentication()

        self._last_sync: datetime | None = None
        self._rate_limiter = RateLimiter.per_minute(self.config.requests_per_minute)
        self._initialized = False

        # Strong references to scheduled reader-cleanup tasks. The event loop
        # only keeps weak references, so an unreferenced task could be
        # garbage-collected before it finishes.
        self._cleanup_tasks: set[asyncio.Task] = set()

        # Initialize file conversion components if enabled
        self.file_converter: FileConverter | None = None
        self.file_detector: FileDetector | None = None
        self.attachment_reader: AttachmentReader | None = None

        if config.enable_file_conversion:
            self.file_detector = FileDetector()
            # FileConverter will be initialized when file_conversion_config is set

        if config.download_attachments:
            self.attachment_reader = AttachmentReader(
                session=self.session,
                downloader=AttachmentDownloader(session=self.session),
            )

    def _setup_authentication(self):
        """Set up authentication on the session based on deployment type."""
        _setup_auth(self.session, self.config)

    def _auto_detect_deployment_type(self) -> JiraDeploymentType:
        """Auto-detect the Jira deployment type based on the base URL.

        Returns:
            JiraDeploymentType: Detected deployment type
        """
        return _auto_detect_type(str(self.base_url))

    def set_file_conversion_config(self, config: FileConversionConfig) -> None:
        """Set the file conversion configuration.

        Does nothing unless file conversion is enabled and a file detector was
        created in ``__init__``. When attachment downloading is enabled as
        well, the current attachment reader is released (best effort) and
        replaced by one whose downloader uses ``config``.

        Args:
            config: File conversion configuration
        """
        if not (self.config.enable_file_conversion and self.file_detector):
            return

        self.file_converter = FileConverter(config)

        if not self.config.download_attachments:
            return

        # Clean up any existing attachment reader to avoid resource leaks
        self._dispose_attachment_reader(self.attachment_reader)

        # Drop reference before creating a new reader
        self.attachment_reader = None

        # Reinitialize reader with new downloader config
        self.attachment_reader = AttachmentReader(
            session=self.session,
            downloader=AttachmentDownloader(
                session=self.session,
                file_conversion_config=config,
                enable_file_conversion=True,
                max_attachment_size=config.max_file_size,
            ),
        )

    def _dispose_attachment_reader(self, reader: AttachmentReader | None) -> None:
        """Release a reader that is about to be replaced (best effort).

        Calls the first of ``aclose``/``close``/``cleanup`` the reader exposes.
        If that call returns a coroutine it is scheduled on the running event
        loop, or run to completion when no loop is running. Failures are
        logged and never propagated, so they cannot block reconfiguration.

        Args:
            reader: The reader being discarded, or ``None``.
        """
        if reader is None:
            return

        try:
            close_callable = None
            for method_name in ("aclose", "close", "cleanup"):
                if hasattr(reader, method_name):
                    close_callable = getattr(reader, method_name)
                    break
            if close_callable is None:
                return

            result = close_callable()
            if not asyncio.iscoroutine(result):
                return

            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None

            if loop and not loop.is_closed():
                # We are inside a running loop and cannot block on the
                # coroutine here; schedule it and keep the task alive.
                task = loop.create_task(result)
                self._cleanup_tasks.add(task)
                task.add_done_callback(self._on_cleanup_task_done)
            else:
                asyncio.run(result)
        except Exception as exc:
            # Deliberately non-fatal: a failed cleanup must not mask the
            # configuration update, but it should not be invisible either.
            logger.warning(
                "Failed to clean up previous attachment reader",
                error=str(exc),
                error_type=type(exc).__name__,
            )

    def _on_cleanup_task_done(self, task: asyncio.Task) -> None:
        """Forget a finished cleanup task and log its failure, if any."""
        self._cleanup_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.warning(
                "Failed to clean up previous attachment reader",
                error=str(exc),
                error_type=type(exc).__name__,
            )

    async def _validate_connection(self) -> None:
        """Validate connectivity, auth, and project access before use.

        Performs two requests: ``myself`` (reachability and credentials) and
        ``project/<project_key>`` (project existence and permission). Every
        ``requests`` failure is translated into a
        ``ConnectorConfigurationError`` with an actionable message.

        Raises:
            ConnectorConfigurationError: for invalid URL, bad credentials,
                missing permissions, or unknown project key.
        """
        # NOTE: ``Timeout`` is handled before ``ConnectionError`` on purpose;
        # requests' ConnectTimeout derives from both.

        # ── Step 1: reachability + authentication (/myself endpoint) ──────────
        try:
            await self._make_request("GET", "myself")
        except requests.exceptions.Timeout as exc:
            raise ConnectorConfigurationError(
                f"Connection to Jira at '{self.base_url}' timed out. "
                "Verify network connectivity and try again."
            ) from exc
        except requests.exceptions.ConnectionError as exc:
            raise ConnectorConfigurationError(
                f"Cannot connect to Jira at '{self.base_url}'. "
                "Verify that base_url is correct and the server is reachable."
            ) from exc
        except requests.exceptions.HTTPError as exc:
            status = exc.response.status_code if exc.response is not None else None
            if status == 401:
                raise ConnectorConfigurationError(
                    f"Authentication failed for Jira at '{self.base_url}' (HTTP 401). "
                    "Check that token and email are valid."
                ) from exc
            if status == 403:
                raise ConnectorConfigurationError(
                    f"Access denied to Jira at '{self.base_url}' (HTTP 403). "
                    "The account does not have sufficient permissions."
                ) from exc
            raise ConnectorConfigurationError(
                f"Validation request to Jira at '{self.base_url}' failed "
                f"with HTTP {status}: {exc}"
            ) from exc
        except requests.exceptions.RequestException as exc:
            raise ConnectorConfigurationError(
                f"Validation request to Jira at '{self.base_url}' failed: {exc}"
            ) from exc

        # ── Step 2: project key exists and is accessible ───────────────────────
        try:
            await self._make_request("GET", f"project/{self.config.project_key}")
        except requests.exceptions.Timeout as exc:
            raise ConnectorConfigurationError(
                f"Connection to Jira at '{self.base_url}' timed out while validating "
                f"project '{self.config.project_key}'."
            ) from exc
        except requests.exceptions.ConnectionError as exc:
            raise ConnectorConfigurationError(
                f"Connection to Jira at '{self.base_url}' was lost while validating "
                f"project '{self.config.project_key}' (between validation steps). "
                "Verify network connectivity and Jira availability."
            ) from exc
        except requests.exceptions.HTTPError as exc:
            status = exc.response.status_code if exc.response is not None else None
            if status == 404:
                raise ConnectorConfigurationError(
                    f"Project '{self.config.project_key}' not found in Jira (HTTP 404). "
                    "Check that project_key is correct."
                ) from exc
            if status == 403:
                raise ConnectorConfigurationError(
                    f"No permission to access project '{self.config.project_key}' "
                    "in Jira (HTTP 403)."
                ) from exc
            raise ConnectorConfigurationError(
                f"Validation request for project '{self.config.project_key}' at "
                f"'{self.base_url}' failed with HTTP {status}: {exc}"
            ) from exc
        except requests.exceptions.RequestException as exc:
            raise ConnectorConfigurationError(
                f"Validation request for project '{self.config.project_key}' at "
                f"'{self.base_url}' failed: {exc}"
            ) from exc

    @staticmethod
    def _escape_jql_literal(value: str) -> str:
        """Escape special characters in JQL string literals.

        Escapes backslashes and double quotes to prevent JQL injection
        and query breaking when config values contain these characters.

        Args:
            value: The string value to escape

        Returns:
            str: The escaped string safe for inclusion in JQL quoted literals
        """
        # Replace backslash first to avoid double-escaping
        value = value.replace("\\", "\\\\")
        # Then escape double quotes
        value = value.replace('"', '\\"')
        return value

    def _build_jql_filter(self, updated_after: datetime | None = None) -> str:
        """Build JQL filter query with project key, issue types, and statuses.

        Args:
            updated_after: Optional datetime to filter issues updated after this time

        Returns:
            str: JQL filter query
        """
        escaped_project_key = self._escape_jql_literal(self.config.project_key)
        jql = f'project = "{escaped_project_key}"'

        # Add issue type filter if configured
        if self.config.issue_types:
            escaped_types = [
                self._escape_jql_literal(t) for t in self.config.issue_types
            ]
            types_str = ", ".join(f'"{t}"' for t in escaped_types)
            jql += f" AND type IN ({types_str})"
            logger.debug(f"Applied JIRA issue type filter: {self.config.issue_types}")

        # Add status filter if configured
        if self.config.include_statuses:
            escaped_statuses = [
                self._escape_jql_literal(s) for s in self.config.include_statuses
            ]
            statuses_str = ", ".join(f'"{s}"' for s in escaped_statuses)
            jql += f" AND status IN ({statuses_str})"
            logger.debug(f"Applied JIRA status filter: {self.config.include_statuses}")

        # Add updated_after filter if provided
        if updated_after:
            # JQL date literals only go down to minute precision
            # ("yyyy-MM-dd HH:mm"); a value with seconds is rejected as an
            # invalid date. Dropping the seconds rounds the bound down, so
            # with ">=" no update is missed (a few issues may be re-fetched).
            jql += f" AND updated >= '{updated_after.strftime('%Y-%m-%d %H:%M')}'"

        return jql

    async def __aenter__(self):
        """Async context manager entry; validates the connection once."""
        if not self._initialized:
            await self._validate_connection()
            self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, _exc_tb):
        """Async context manager exit; closes the HTTP session."""
        try:
            self.session.close()
        finally:
            # Reset even if close() raised so a later re-entry re-validates.
            self._initialized = False

    @abstractmethod
    def _get_api_url(self, endpoint: str) -> str:
        """Construct the full API URL for an endpoint."""
        ...

    async def _make_request(self, method: str, endpoint: str, **kwargs) -> dict:
        """Make an authenticated request to the Jira API.

        The request goes through the shared rate limiter and retry policy
        (3 retries on 429/500/502/503/504, 90 s overall budget). A per-request
        ``timeout`` of 60 s is applied unless the caller supplies one.

        Args:
            method: HTTP method
            endpoint: API endpoint path
            **kwargs: Additional request parameters

        Returns:
            dict: Response data

        Raises:
            requests.exceptions.RequestException: If the request fails
        """
        url = self._get_api_url(endpoint)

        kwargs.setdefault("timeout", 60)

        try:
            logger.debug(
                "Making JIRA API request",
                method=method,
                endpoint=endpoint,
                url=url,
                timeout=kwargs.get("timeout"),
            )

            # Without an Authorization header on the session, pass the
            # session's auth object explicitly.
            if not self.session.headers.get("Authorization"):
                kwargs["auth"] = self.session.auth

            response = await _http_request_with_policy(
                self.session,
                method,
                url,
                rate_limiter=self._rate_limiter,
                retries=3,
                backoff_factor=0.5,
                status_forcelist=(429, 500, 502, 503, 504),
                overall_timeout=90.0,
                **kwargs,
            )

            response.raise_for_status()

            logger.debug(
                "JIRA API request completed successfully",
                method=method,
                endpoint=endpoint,
                status_code=response.status_code,
                response_size=(
                    len(response.content) if hasattr(response, "content") else 0
                ),
            )

            return response.json()

        except TimeoutError as exc:
            logger.error(
                "JIRA API request timed out",
                method=method,
                url=url,
                timeout=kwargs.get("timeout"),
            )
            # Re-raise as a requests exception so callers only have to handle
            # one exception family; keep the original as the cause.
            raise requests.exceptions.Timeout(
                f"Request to {url} timed out after {kwargs.get('timeout')} seconds"
            ) from exc

        except requests.exceptions.RequestException as e:
            logger.error(
                "Failed to make request to JIRA API",
                method=method,
                url=url,
                error=str(e),
                error_type=type(e).__name__,
            )
            logger.error(
                "Request details",
                deployment_type=self.config.deployment_type,
                has_auth_header=bool(self.session.headers.get("Authorization")),
                has_session_auth=bool(self.session.auth),
            )
            raise

    @abstractmethod
    async def get_issues(
        self, updated_after: datetime | None = None
    ) -> AsyncGenerator[JiraIssue, None]:
        """Get all issues from Jira."""
        ...

    def _parse_issue(self, raw_issue: dict) -> JiraIssue:
        """Parse a raw issue from the Jira response into a JiraIssue object."""
        return _parse_issue_helper(raw_issue)

    def _parse_user(
        self, raw_user: dict | None, required: bool = False
    ) -> JiraUser | None:
        """Parse a raw user from the Jira response into a JiraUser object."""
        return _parse_user_helper(raw_user, required)

    def _parse_attachment(self, raw_attachment: dict) -> JiraAttachment:
        """Parse a raw attachment from the Jira response into a JiraAttachment object."""
        return _parse_attachment_helper(raw_attachment)

    def _parse_comment(self, raw_comment: dict) -> JiraComment:
        """Parse a raw comment from the Jira response into a JiraComment object."""
        return _parse_comment_helper(raw_comment)

    def _get_issue_attachments(self, issue: JiraIssue) -> list[AttachmentMetadata]:
        """Convert JIRA issue attachments to AttachmentMetadata objects.

        Args:
            issue: JIRA issue with attachments

        Returns:
            List of attachment metadata objects; empty when attachment
            downloading is disabled or the issue has no attachments.
        """
        if not self.config.download_attachments or not issue.attachments:
            return []

        return [
            jira_attachment_to_metadata(att, parent_id=issue.id)
            for att in issue.attachments
        ]

    @staticmethod
    def _build_issue_content(issue: JiraIssue) -> str:
        """Join an issue's summary, description and comments into one text."""
        content_parts = [issue.summary]
        if issue.description:
            content_parts.append(issue.description)

        for comment in issue.comments:
            # The author can be missing; the metadata builder already allows
            # for that, so the content must not crash on it either.
            author_name = comment.author.display_name if comment.author else "Unknown"
            content_parts.append(
                f"\nComment by {author_name} on {comment.created.strftime('%Y-%m-%d %H:%M')}:"
            )
            content_parts.append(comment.body)

        return "\n\n".join(content_parts)

    def _build_issue_metadata(self, issue: JiraIssue) -> dict:
        """Build the metadata dictionary stored alongside an issue document."""
        comments = [
            {
                "id": comment.id,
                "body": comment.body,
                "created": comment.created.isoformat(),
                "updated": (comment.updated.isoformat() if comment.updated else None),
                "author": (comment.author.display_name if comment.author else None),
            }
            for comment in issue.comments
        ]
        attachments = (
            [
                {
                    "id": att.id,
                    "filename": att.filename,
                    "size": att.size,
                    "mime_type": att.mime_type,
                    "created": att.created.isoformat(),
                    "author": (att.author.display_name if att.author else None),
                }
                for att in issue.attachments
            ]
            if issue.attachments
            else []
        )
        return {
            "project": self.config.project_key,
            "issue_type": issue.issue_type,
            "status": issue.status,
            "key": issue.key,
            "priority": issue.priority,
            "labels": issue.labels,
            "reporter": issue.reporter.display_name if issue.reporter else None,
            "assignee": issue.assignee.display_name if issue.assignee else None,
            "created": issue.created.isoformat(),
            "updated": issue.updated.isoformat(),
            "parent_key": issue.parent_key,
            "subtasks": issue.subtasks,
            "linked_issues": issue.linked_issues,
            "comments": comments,
            "attachments": attachments,
        }

    def _issue_to_document(self, issue: JiraIssue, base_url: str) -> Document:
        """Convert one issue into a ``Document`` linking to its browse URL."""
        return Document(
            id=issue.id,
            content=self._build_issue_content(issue),
            content_type="text",
            source=self.config.source,
            source_type=SourceType.JIRA,
            created_at=issue.created,
            url=f"{base_url}/browse/{issue.key}",
            title=issue.summary,
            updated_at=issue.updated,
            is_deleted=False,
            metadata=self._build_issue_metadata(issue),
        )

    async def get_documents(self) -> list[Document]:
        """Fetch and process documents from Jira.

        Every issue yielded by ``get_issues`` becomes one document. When
        attachment downloading is enabled and a reader is configured, the
        documents produced from that issue's attachments follow it directly
        in the returned list.

        Returns:
            List[Document]: List of processed documents
        """
        documents: list[Document] = []

        # Collect all issues before converting any of them
        issues = [issue async for issue in self.get_issues()]

        # Loop-invariant: computed once instead of once per issue
        base_url = str(self.config.base_url).rstrip("/")

        for issue in issues:
            document = self._issue_to_document(issue, base_url)
            documents.append(document)
            logger.debug(
                "Jira document created",
                document_id=document.id,
                source_type=document.source_type,
                source=document.source,
                title=document.title,
            )

            # Process attachments if enabled
            if not (self.config.download_attachments and self.attachment_reader):
                continue

            attachment_metadata = self._get_issue_attachments(issue)
            if not attachment_metadata:
                continue

            logger.info(
                "Processing attachments for JIRA issue",
                issue_key=issue.key,
                attachment_count=len(attachment_metadata),
            )

            attachment_documents = await self.attachment_reader.fetch_and_process(
                attachment_metadata, document
            )
            documents.extend(attachment_documents)

            logger.debug(
                "Processed attachments for JIRA issue",
                issue_key=issue.key,
                processed_count=len(attachment_documents),
            )

        return documents