Coverage for src / qdrant_loader / connectors / jira / connector.py: 72%
229 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1"""Jira connector implementation."""
3import asyncio
4import warnings
5from abc import abstractmethod
6from collections.abc import AsyncGenerator, AsyncIterator
7from datetime import datetime
8from urllib.parse import urlparse # noqa: F401 - may be used in URL handling
10import requests
11from requests.auth import HTTPBasicAuth # noqa: F401 - compatibility
13from qdrant_loader.config.types import SourceType
14from qdrant_loader.connectors.base import BaseConnector, ConnectorConfigurationError
15from qdrant_loader.connectors.jira.auth import (
16 auto_detect_deployment_type as _auto_detect_type,
17)
18from qdrant_loader.connectors.jira.auth import setup_authentication as _setup_auth
19from qdrant_loader.connectors.jira.config import (
20 JiraDeploymentType,
21 JiraExtraField,
22 JiraProjectConfig,
23)
24from qdrant_loader.connectors.jira.mappers import (
25 parse_attachment as _parse_attachment_helper,
26)
27from qdrant_loader.connectors.jira.mappers import parse_comment as _parse_comment_helper
28from qdrant_loader.connectors.jira.mappers import parse_issue as _parse_issue_helper
29from qdrant_loader.connectors.jira.mappers import parse_user as _parse_user_helper
30from qdrant_loader.connectors.jira.models import (
31 JiraAttachment,
32 JiraComment,
33 JiraIssue,
34 JiraUser,
35)
36from qdrant_loader.connectors.shared.attachments import AttachmentReader
37from qdrant_loader.connectors.shared.attachments.metadata import (
38 jira_attachment_to_metadata,
39)
40from qdrant_loader.connectors.shared.http import (
41 RateLimiter,
42)
43from qdrant_loader.connectors.shared.http import (
44 request_with_policy as _http_request_with_policy,
45)
46from qdrant_loader.core.attachment_downloader import (
47 AttachmentDownloader,
48 AttachmentMetadata,
49)
50from qdrant_loader.core.document import Document
51from qdrant_loader.core.file_conversion import (
52 FileConversionConfig,
53 FileConverter,
54 FileDetector,
55)
56from qdrant_loader.utils.logging import LoggingConfig
# Module-level logger. Calls in this module pass structured keyword fields,
# e.g. ``logger.debug("Making JIRA API request", method=..., url=...)``.
logger = LoggingConfig.get_logger(__name__)
class BaseJiraConnector(BaseConnector):
    """Base class for all Jira connectors.

    Concrete subclasses supply :meth:`_get_api_url` and :meth:`get_issues`.
    This base class handles session authentication, rate-limited requests,
    pre-flight connection validation, JQL construction, and conversion of
    issues (and, optionally, their attachments) into :class:`Document`
    objects.
    """

    def __init__(self, config: JiraProjectConfig):
        """Initialize the Jira connector.

        Args:
            config: The Jira configuration.

        Raises:
            Any error raised by the authentication setup helper is propagated.
            NOTE(review): presumably ``ValueError`` when required credentials
            are missing — confirm against ``jira.auth.setup_authentication``.
        """
        super().__init__(config)
        self.config = config
        self.base_url = str(config.base_url).rstrip("/")

        # Initialize session; the same session object is also passed to the
        # attachment reader/downloader below.
        self.session = requests.Session()

        # Set up authentication based on deployment type
        self._setup_authentication()

        self._last_sync: datetime | None = None
        self._rate_limiter = RateLimiter.per_minute(self.config.requests_per_minute)
        self._initialized = False

        # asyncio keeps only weak references to tasks, so fire-and-forget
        # cleanup tasks are held here until they complete.
        self._pending_cleanup_tasks: set[asyncio.Task] = set()

        # Initialize file conversion components if enabled
        self.file_converter: FileConverter | None = None
        self.file_detector: FileDetector | None = None
        self.attachment_reader: AttachmentReader | None = None

        if config.enable_file_conversion:
            self.file_detector = FileDetector()
            # FileConverter will be initialized when file_conversion_config is set

        if config.download_attachments:
            self.attachment_reader = AttachmentReader(
                session=self.session,
                downloader=AttachmentDownloader(session=self.session),
            )

    def _setup_authentication(self):
        """Set up authentication based on deployment type."""
        _setup_auth(self.session, self.config)

    def _auto_detect_deployment_type(self) -> JiraDeploymentType:
        """Auto-detect the Jira deployment type based on the base URL.

        Returns:
            JiraDeploymentType: Detected deployment type
        """
        return _auto_detect_type(str(self.base_url))

    def set_file_conversion_config(self, config: FileConversionConfig) -> None:
        """Set the file conversion configuration.

        Has no effect unless file conversion is enabled for this connector.
        Creates a :class:`FileConverter` and, when attachment downloading is
        enabled, replaces the attachment reader with one whose downloader
        converts files using ``config``. The previous reader is closed on a
        best-effort basis; cleanup failures never block reconfiguration.

        Args:
            config: File conversion configuration
        """
        if not (self.config.enable_file_conversion and self.file_detector):
            return

        self.file_converter = FileConverter(config)
        if not self.config.download_attachments:
            return

        # Clean up any existing attachment reader to avoid resource leaks
        old_reader = self.attachment_reader
        if old_reader is not None:
            self._close_attachment_reader(old_reader)

        # Drop reference before creating a new reader
        self.attachment_reader = None

        # Reinitialize reader with new downloader config
        self.attachment_reader = AttachmentReader(
            session=self.session,
            downloader=AttachmentDownloader(
                session=self.session,
                file_conversion_config=config,
                enable_file_conversion=True,
                max_attachment_size=config.max_file_size,
            ),
        )

    def _close_attachment_reader(self, reader: AttachmentReader) -> None:
        """Best-effort close of a replaced attachment reader.

        Uses the first available of ``aclose``, ``close`` or ``cleanup``. If
        the call returns a coroutine, it is scheduled/run via
        :meth:`_schedule_cleanup_coroutine`. Errors are logged and ignored.
        """
        try:
            close_callable = None
            for method_name in ("aclose", "close", "cleanup"):
                if hasattr(reader, method_name):
                    close_callable = getattr(reader, method_name)
                    break
            if close_callable is None:
                return

            result = close_callable()
            if asyncio.iscoroutine(result):
                self._schedule_cleanup_coroutine(result)
        except Exception as exc:
            # Ignore cleanup errors to avoid masking the config update, but
            # leave a trace instead of failing silently.
            logger.debug(
                "Failed to clean up previous Jira attachment reader",
                error=str(exc),
                error_type=type(exc).__name__,
            )

    def _schedule_cleanup_coroutine(self, coro) -> None:
        """Run a cleanup coroutine without blocking the caller.

        Inside a running event loop the coroutine becomes a task, kept alive
        by ``self._pending_cleanup_tasks`` until it finishes. With no running
        loop, it is run to completion with :func:`asyncio.run`.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        if loop is None or loop.is_closed():
            asyncio.run(coro)
            return

        task = loop.create_task(coro)
        self._pending_cleanup_tasks.add(task)
        task.add_done_callback(self._on_cleanup_task_done)

    def _on_cleanup_task_done(self, task: asyncio.Task) -> None:
        """Release a finished cleanup task and log (rather than leak) its error."""
        self._pending_cleanup_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.debug(
                "Failed to clean up previous Jira attachment reader",
                error=str(exc),
                error_type=type(exc).__name__,
            )

    @staticmethod
    def _quote_path_segment(value: str) -> str:
        """Percent-encode ``value`` for use as a single URL path segment.

        Keeps identifiers containing ``/``, ``?`` or ``#`` from altering which
        API endpoint is requested. Ordinary Jira keys such as ``PROJ-123`` are
        returned unchanged.
        """
        from urllib.parse import quote

        return quote(str(value), safe="")

    async def _validate_connection(self) -> None:
        """Validate connectivity, auth, and project access before use.

        Raises:
            ConnectorConfigurationError: for invalid URL, bad credentials,
                missing permissions, or unknown project key.
        """
        # ── Step 1: reachability + authentication (/myself endpoint) ──────────
        try:
            await self._make_request("GET", "myself")
        except requests.exceptions.Timeout as exc:
            raise ConnectorConfigurationError(
                f"Connection to Jira at '{self.base_url}' timed out. "
                "Verify network connectivity and try again."
            ) from exc
        except requests.exceptions.ConnectionError as exc:
            raise ConnectorConfigurationError(
                f"Cannot connect to Jira at '{self.base_url}'. "
                "Verify that base_url is correct and the server is reachable."
            ) from exc
        except requests.exceptions.HTTPError as exc:
            status = exc.response.status_code if exc.response is not None else None
            if status == 401:
                raise ConnectorConfigurationError(
                    f"Authentication failed for Jira at '{self.base_url}' (HTTP 401). "
                    "Check that token and email are valid."
                ) from exc
            if status == 403:
                raise ConnectorConfigurationError(
                    f"Access denied to Jira at '{self.base_url}' (HTTP 403). "
                    "The account does not have sufficient permissions."
                ) from exc
            raise ConnectorConfigurationError(
                f"Validation request to Jira at '{self.base_url}' failed "
                f"with HTTP {status}: {exc}"
            ) from exc
        except requests.exceptions.RequestException as exc:
            raise ConnectorConfigurationError(
                f"Validation request to Jira at '{self.base_url}' failed: {exc}"
            ) from exc

        # ── Step 2: project key exists and is accessible ───────────────────────
        project_endpoint = f"project/{self._quote_path_segment(self.config.project_key)}"
        try:
            await self._make_request("GET", project_endpoint)
        except requests.exceptions.Timeout as exc:
            raise ConnectorConfigurationError(
                f"Connection to Jira at '{self.base_url}' timed out while validating "
                f"project '{self.config.project_key}'."
            ) from exc
        except requests.exceptions.ConnectionError as exc:
            raise ConnectorConfigurationError(
                f"Connection to Jira at '{self.base_url}' was lost while validating "
                f"project '{self.config.project_key}' (between validation steps). "
                "Verify network connectivity and Jira availability."
            ) from exc
        except requests.exceptions.HTTPError as exc:
            status = exc.response.status_code if exc.response is not None else None
            if status == 404:
                raise ConnectorConfigurationError(
                    f"Project '{self.config.project_key}' not found in Jira (HTTP 404). "
                    "Check that project_key is correct."
                ) from exc
            if status == 403:
                raise ConnectorConfigurationError(
                    f"No permission to access project '{self.config.project_key}' "
                    "in Jira (HTTP 403)."
                ) from exc
            raise ConnectorConfigurationError(
                f"Validation request for project '{self.config.project_key}' at "
                f"'{self.base_url}' failed with HTTP {status}: {exc}"
            ) from exc
        except requests.exceptions.RequestException as exc:
            raise ConnectorConfigurationError(
                f"Validation request for project '{self.config.project_key}' at "
                f"'{self.base_url}' failed: {exc}"
            ) from exc

    @staticmethod
    def _escape_jql_literal(value: str) -> str:
        """Escape special characters in JQL string literals.

        Escapes backslashes and double quotes to prevent JQL injection
        and query breaking when config values contain these characters.

        Args:
            value: The string value to escape

        Returns:
            str: The escaped string safe for inclusion in JQL quoted literals
        """
        # Replace backslash first to avoid double-escaping
        value = value.replace("\\", "\\\\")
        # Then escape double quotes
        value = value.replace('"', '\\"')
        return value

    def _build_jql_filter(self, updated_after: datetime | None = None) -> str:
        """Build JQL filter query with project key, issue types, and statuses.

        Args:
            updated_after: Optional datetime to filter issues updated after this time

        Returns:
            str: JQL filter query
        """
        escaped_project_key = self._escape_jql_literal(self.config.project_key)
        jql = f'project = "{escaped_project_key}"'

        # Add issue type filter if configured
        if self.config.issue_types:
            types_str = ", ".join(
                f'"{self._escape_jql_literal(t)}"' for t in self.config.issue_types
            )
            jql += f" AND type IN ({types_str})"
            logger.debug(f"Applied JIRA issue type filter: {self.config.issue_types}")

        # Add status filter if configured
        if self.config.include_statuses:
            statuses_str = ", ".join(
                f'"{self._escape_jql_literal(s)}"'
                for s in self.config.include_statuses
            )
            jql += f" AND status IN ({statuses_str})"
            logger.debug(f"Applied JIRA status filter: {self.config.include_statuses}")

        # Add updated_after filter if provided (minute precision)
        if updated_after:
            jql += f" AND updated >= '{updated_after.strftime('%Y-%m-%d %H:%M')}'"

        return jql

    async def __aenter__(self):
        """Async context manager entry; validates the connection once."""
        if not self._initialized:
            await self._validate_connection()
            self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, _exc_tb):
        """Async context manager exit; closes the HTTP session."""
        try:
            self.session.close()
        finally:
            self._initialized = False

    @abstractmethod
    def _get_api_url(self, endpoint: str) -> str:
        """Construct the full API URL for an endpoint."""
        ...

    async def _make_request(self, method: str, endpoint: str, **kwargs) -> dict:
        """Make an authenticated request to the Jira API.

        Applies the connector's rate limiter and retry policy (3 retries on
        429/5xx, 90 s overall budget). A per-request ``timeout`` of 60 s is
        used unless the caller passes one. When the session carries no
        ``Authorization`` header, the session's ``auth`` object is passed
        explicitly unless the caller supplied its own ``auth``.

        Args:
            method: HTTP method
            endpoint: API endpoint path
            **kwargs: Additional request parameters

        Returns:
            dict: Response data

        Raises:
            requests.exceptions.Timeout: If the request exceeds its time budget.
            requests.exceptions.RequestException: If the request fails
        """
        url = self._get_api_url(endpoint)
        kwargs.setdefault("timeout", 60)

        try:
            logger.debug(
                "Making JIRA API request",
                method=method,
                endpoint=endpoint,
                url=url,
                timeout=kwargs.get("timeout"),
            )

            if not self.session.headers.get("Authorization"):
                kwargs.setdefault("auth", self.session.auth)

            response = await _http_request_with_policy(
                self.session,
                method,
                url,
                rate_limiter=self._rate_limiter,
                retries=3,
                backoff_factor=0.5,
                status_forcelist=(429, 500, 502, 503, 504),
                overall_timeout=90.0,
                **kwargs,
            )

            response.raise_for_status()

            logger.debug(
                "JIRA API request completed successfully",
                method=method,
                endpoint=endpoint,
                status_code=response.status_code,
                response_size=(
                    len(response.content) if hasattr(response, "content") else 0
                ),
            )

            return response.json()

        # asyncio.TimeoutError is only an alias of TimeoutError from 3.11 on;
        # list both so the overall timeout is translated on 3.10 as well.
        except (TimeoutError, asyncio.TimeoutError) as exc:
            logger.error(
                "JIRA API request timed out",
                method=method,
                url=url,
                timeout=kwargs.get("timeout"),
            )
            raise requests.exceptions.Timeout(
                f"Request to {url} timed out after {kwargs.get('timeout')} seconds"
            ) from exc

        except requests.exceptions.RequestException as e:
            logger.error(
                "Failed to make request to JIRA API",
                method=method,
                url=url,
                error=str(e),
                error_type=type(e).__name__,
            )
            logger.error(
                "Request details",
                deployment_type=self.config.deployment_type,
                has_auth_header=bool(self.session.headers.get("Authorization")),
                has_session_auth=bool(self.session.auth),
            )
            raise

    @abstractmethod
    async def get_issues(
        self, updated_after: datetime | None = None
    ) -> AsyncGenerator[JiraIssue, None]:
        """Get all issues from Jira."""
        ...

    def _parse_issue(
        self, raw_issue: dict, extra_fields: list[JiraExtraField] | None = None
    ) -> JiraIssue:
        """Parse a raw issue from the Jira response into a JiraIssue object."""
        return _parse_issue_helper(raw_issue, extra_fields)

    def _parse_user(
        self, raw_user: dict | None, required: bool = False
    ) -> JiraUser | None:
        """Parse a raw user from the Jira response into a JiraUser object."""
        return _parse_user_helper(raw_user, required)

    def _parse_attachment(self, raw_attachment: dict) -> JiraAttachment:
        """Parse a raw attachment from the Jira response into a JiraAttachment object."""
        return _parse_attachment_helper(raw_attachment)

    def _parse_comment(self, raw_comment: dict) -> JiraComment:
        """Parse a raw comment from the Jira response into a JiraComment object."""
        return _parse_comment_helper(raw_comment)

    def _get_issue_attachments(self, issue: JiraIssue) -> list[AttachmentMetadata]:
        """Convert JIRA issue attachments to AttachmentMetadata objects.

        Args:
            issue: JIRA issue with attachments

        Returns:
            List of attachment metadata objects; empty when attachment
            downloading is disabled or the issue has no attachments.
        """
        if not self.config.download_attachments or not issue.attachments:
            return []

        return [
            jira_attachment_to_metadata(att, parent_id=issue.id)
            for att in issue.attachments
        ]

    async def fetch_by_id(self, entity_id: str) -> Document | None:
        """Fetch a single Jira issue by key or numeric ID.

        The identifier is percent-encoded so it always addresses a single
        ``issue/<id>`` resource. Attachments are not materialized. Any failure
        is logged and reported as ``None``.
        """
        try:
            raw_issue = await self._make_request(
                "GET", f"issue/{self._quote_path_segment(entity_id)}"
            )
            issue = self._parse_issue(raw_issue, self.config.extra_fields)
            documents = await self._issues_to_documents(
                [issue], include_attachments=False
            )
        except Exception as exc:
            logger.error(
                "Failed to fetch/parse Jira issue by id",
                entity_id=entity_id,
                error=str(exc),
            )
            return None

        return documents[0] if documents else None

    async def list_entity_ids(self) -> AsyncIterator[str]:
        """Stream all issue keys in the configured project."""
        async for issue in self.get_issues():
            yield issue.key

    async def _issues_to_documents(
        self, issues: list[JiraIssue], include_attachments: bool = True
    ) -> list[Document]:
        """Convert Jira issues to Document objects.

        Set ``include_attachments=False`` for single-entity fetches where the
        parent document must remain available even if attachment materialization
        fails.

        .. deprecated:: WS-1
            Use :meth:`_stream_issues_to_documents` for streaming.
        """
        return [
            document
            async for document in self._stream_issues_to_documents(
                issues, include_attachments=include_attachments
            )
        ]

    def _build_issue_content(self, issue: JiraIssue) -> str:
        """Render an issue's summary, description and comments as plain text."""
        content_parts = [issue.summary]
        if issue.description:
            content_parts.append(issue.description)

        for comment in issue.comments:
            # Comments may lack an author; fall back instead of crashing the stream.
            author_name = comment.author.display_name if comment.author else "Unknown"
            created = comment.created.strftime("%Y-%m-%d %H:%M")
            content_parts.append(f"\nComment by {author_name} on {created}:")
            content_parts.append(comment.body)

        return "\n\n".join(content_parts)

    def _build_issue_metadata(self, issue: JiraIssue) -> dict:
        """Build the Document metadata dict for an issue, including extra fields."""
        metadata = {
            "project": self.config.project_key,
            "issue_type": issue.issue_type,
            "status": issue.status,
            "key": issue.key,
            "priority": issue.priority,
            "labels": issue.labels,
            "reporter": issue.reporter.display_name if issue.reporter else None,
            "assignee": issue.assignee.display_name if issue.assignee else None,
            "created": issue.created.isoformat(),
            "updated": issue.updated.isoformat(),
            "parent_key": issue.parent_key,
            "subtasks": issue.subtasks,
            "linked_issues": issue.linked_issues,
            "comments": [
                {
                    "id": comment.id,
                    "body": comment.body,
                    "created": comment.created.isoformat(),
                    "updated": (
                        comment.updated.isoformat() if comment.updated else None
                    ),
                    "author": (
                        comment.author.display_name if comment.author else None
                    ),
                }
                for comment in issue.comments
            ],
            "attachments": [
                {
                    "id": att.id,
                    "filename": att.filename,
                    "size": att.size,
                    "mime_type": att.mime_type,
                    "created": att.created.isoformat(),
                    "author": att.author.display_name if att.author else None,
                }
                for att in (issue.attachments or [])
            ],
        }
        for field in self.config.extra_fields or []:
            # An extra field that was not populated on the issue becomes None
            # rather than aborting the whole document stream.
            metadata[field.name] = getattr(issue, field.name, None)
        return metadata

    async def _stream_issues_to_documents(
        self, issues: list[JiraIssue], include_attachments: bool = True
    ) -> AsyncGenerator[Document, None]:
        """Stream Jira issues as Document objects, including attachments.

        Yields documents one at a time, with attachments yielded immediately after
        their parent issue document. Attachment processing only happens when
        ``include_attachments`` is true, downloading is enabled, and an
        attachment reader is configured.
        """
        for issue in issues:
            document = Document(
                id=issue.id,
                content=self._build_issue_content(issue),
                content_type="text",
                source=self.config.source,
                source_type=SourceType.JIRA,
                created_at=issue.created,
                url=f"{self.base_url}/browse/{issue.key}",
                title=issue.summary,
                updated_at=issue.updated,
                is_deleted=False,
                metadata=self._build_issue_metadata(issue),
            )
            logger.debug(
                "Jira document created",
                document_id=document.id,
                source_type=document.source_type,
                source=document.source,
                title=document.title,
            )
            yield document

            if not (
                include_attachments
                and self.config.download_attachments
                and self.attachment_reader
            ):
                continue

            attachment_metadata = self._get_issue_attachments(issue)
            if not attachment_metadata:
                continue

            logger.info(
                "Processing attachments for JIRA issue",
                issue_key=issue.key,
                attachment_count=len(attachment_metadata),
            )

            attachment_documents = await self.attachment_reader.fetch_and_process(
                attachment_metadata, document
            )
            for attachment_document in attachment_documents:
                yield attachment_document

            logger.debug(
                "Processed attachments for JIRA issue",
                issue_key=issue.key,
                processed_count=len(attachment_documents),
            )

    async def stream_documents(
        self, since: datetime | None = None
    ) -> AsyncGenerator[Document, None]:
        """Stream documents from Jira (WS-1 connector contract).

        ``since`` overrides the configured ``updated_after`` when given.
        """
        effective_since = since if since is not None else self.config.updated_after
        async for issue in self.get_issues(updated_after=effective_since):
            async for document in self._stream_issues_to_documents([issue]):
                yield document

    async def get_documents(self) -> list[Document]:
        """Fetch and process documents from Jira (DEPRECATED - use stream_documents)."""
        warnings.warn(
            "BaseJiraConnector.get_documents is deprecated. Implement stream_documents() "
            "or use connector.stream_documents() to avoid materializing the full "
            "document list in memory.",
            DeprecationWarning,
            stacklevel=2,
        )
        return [document async for document in self.stream_documents()]