Coverage for src/qdrant_loader/connectors/jira/data_center_connector.py: 92%

50 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-10 09:40 +0000

1"""Jira connector implementation.""" 

2 

3from collections.abc import AsyncGenerator 

4from datetime import datetime 

5from urllib.parse import urlparse # noqa: F401 - may be used in URL handling 

6 

7from requests.auth import HTTPBasicAuth # noqa: F401 - compatibility 

8 

9from qdrant_loader.connectors.jira.connector import BaseJiraConnector 

10from qdrant_loader.connectors.jira.models import ( 

11 JiraIssue, 

12) 

13from qdrant_loader.utils.logging import LoggingConfig 

14 

15logger = LoggingConfig.get_logger(__name__) 

16 

17 

class JiraDataCenterConnector(BaseJiraConnector):
    """Jira Data Center connector for fetching and processing issues.

    Talks to the legacy REST API v2 (``/rest/api/2/``) used by self-hosted
    Jira Data Center / Server instances, and streams issues page by page.
    """

    # Relative REST endpoint used for paginated JQL searches.
    SEARCH_ENDPOINT = "search"

    def _get_api_url(self, endpoint: str) -> str:
        """Build the absolute REST API v2 URL for *endpoint*.

        Args:
            endpoint: API endpoint path relative to ``/rest/api/2/``.

        Returns:
            str: Full API URL.
        """
        return f"{self.base_url}/rest/api/2/{endpoint}"

    async def get_issues(
        self, updated_after: datetime | None = None
    ) -> AsyncGenerator[JiraIssue, None]:
        """
        Get all issues from Jira.

        Pages through the search endpoint until every matching issue has
        been seen. Issues that fail to parse are logged and skipped so a
        single bad record never aborts the whole sync.

        Args:
            updated_after: Optional datetime to filter issues updated after this time

        Yields:
            JiraIssue objects
        """
        offset = 0
        batch_size = self.config.page_size
        total = 0
        yielded = 0
        # Emit a progress line every 100 issues (not every 50) to keep logs quiet.
        progress_interval = 100

        logger.info(
            "🎫 Starting JIRA issue retrieval",
            project_key=self.config.project_key,
            page_size=batch_size,
            updated_after=updated_after.isoformat() if updated_after else None,
        )

        while True:
            # Rebuilt each iteration to match the original call pattern
            # (the filter helper may log or depend on mutable config).
            jql = self._build_jql_filter(updated_after)

            query = {
                "jql": jql,
                "startAt": offset,
                "maxResults": batch_size,
                "expand": "changelog",
                "fields": "*all",
            }

            logger.debug(
                "Fetching JIRA issues page",
                start_at=offset,
                page_size=batch_size,
                jql=jql,
            )

            try:
                response = await self._make_request(
                    "GET", self.SEARCH_ENDPOINT, params=query
                )
            except Exception as e:
                logger.error(
                    "Failed to fetch JIRA issues page",
                    start_at=offset,
                    page_size=batch_size,
                    error=str(e),
                    error_type=type(e).__name__,
                )
                raise

            # Guard: an empty/missing "issues" list means pagination is done.
            if not response or not response.get("issues"):
                logger.debug(
                    "No more JIRA issues found, stopping pagination",
                    start_at=offset,
                    total_processed=offset,
                    issues_processed=yielded,
                )
                break

            page = response["issues"]

            # Capture the server-reported total once, on the first page.
            if total == 0:
                total = response.get("total", 0)
                logger.info(f"🎫 Found {total} JIRA issues to process")

            for index, raw_issue in enumerate(page):
                try:
                    yield self._parse_issue(raw_issue)
                    yielded += 1

                    absolute = offset + index + 1
                    if absolute % progress_interval == 0:
                        pct = round(absolute / total * 100, 1) if total > 0 else 0
                        logger.info(
                            f"🎫 Progress: {absolute}/{total} issues ({pct}%)"
                        )

                except Exception as e:
                    logger.error(
                        "Failed to parse JIRA issue",
                        issue_id=raw_issue.get("id"),
                        issue_key=raw_issue.get("key"),
                        error=str(e),
                        error_type=type(e).__name__,
                    )
                    # Skip the bad record; keep processing the rest of the page.
                    continue

            # Advance the cursor and stop once the reported total is covered.
            offset += len(page)
            if offset >= total:
                logger.info(
                    f"✅ Completed JIRA issue retrieval: "
                    f"{offset} issues attempted, "
                    f"{yielded} successfully processed"
                )
                break