Coverage for src / qdrant_loader / webhooks / event_processor.py: 15%

97 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1"""Process queued webhook change events.""" 

2 

3from __future__ import annotations 

4 

5from typing import Any 

6 

7from qdrant_loader.config import get_settings 

8from qdrant_loader.config.types import SourceType 

9from qdrant_loader.connectors.factory import get_connector_instance 

10from qdrant_loader.core.document import Document 

11from qdrant_loader.core.qdrant_manager import QdrantManager 

12from qdrant_loader.utils.logging import LoggingConfig 

13from qdrant_loader.webhooks.queue_backend import ( 

14 FULL_SCAN, 

15 SINGLE_DELETE, 

16 SINGLE_UPSERT, 

17 ChangeEvent, 

18) 

19 

20logger = LoggingConfig.get_logger(__name__) 

21 

22 

async def process_change_event(event: ChangeEvent) -> None:
    """Run a single queued webhook change event through the ingestion pipeline.

    A fresh ``AsyncIngestionPipeline`` is built from the current settings for
    every event. ``FULL_SCAN`` events are forwarded to
    ``pipeline.process_documents``; ``SINGLE_UPSERT`` and ``SINGLE_DELETE``
    events are handed to their dedicated helpers. ``pipeline.cleanup()`` is
    awaited on every exit path, including failures.

    Args:
        event: The change event to execute; its ``operation`` selects the
            code path.

    Raises:
        ValueError: If ``event.operation`` is not a supported operation.
    """
    app_settings = get_settings()
    manager = QdrantManager(app_settings)

    # Function-scope import kept from the original
    # (presumably avoids an import cycle -- TODO confirm).
    from qdrant_loader.core.async_ingestion_pipeline import AsyncIngestionPipeline

    ingestion = AsyncIngestionPipeline(app_settings, manager)
    try:
        operation = event.operation

        if operation == FULL_SCAN:
            # Empty source_type/source values are normalised to None.
            await ingestion.process_documents(
                project_id=event.project_id,
                source_type=event.source_type or None,
                source=event.source or None,
                force=event.force,
            )
            return

        if operation == SINGLE_UPSERT:
            await _process_single_upsert(ingestion, event)
            return

        if operation == SINGLE_DELETE:
            await _process_single_delete(ingestion, event)
            return

        raise ValueError(f"Unsupported webhook operation: {event.operation}")
    finally:
        # Runs for the early returns above as well as for exceptions.
        await ingestion.cleanup()

47 

48 

async def _process_single_upsert(
    pipeline: Any,
    event: ChangeEvent,
) -> None:
    """Fetch one entity by id and push it through the document pipeline.

    The pipeline is initialized, the connector for the event's source is
    resolved and built, and the entity is fetched inside the connector's
    async context. When the connector returns ``None`` a warning is logged
    and nothing is processed. Otherwise project metadata is injected (when a
    project id and project manager are available), the document is processed,
    and its state is recorded through the orchestrator.

    Args:
        pipeline: The ingestion pipeline; ``initialize``, ``settings``,
            ``project_manager`` and ``orchestrator`` are used.
        event: The change event; ``entity_id``, ``project_id``,
            ``source_type`` and ``source`` are read.

    Raises:
        ValueError: If ``event.entity_id`` is empty, or if the source
            configuration cannot be resolved.
    """
    entity_id = event.entity_id
    if not entity_id:
        raise ValueError("SINGLE_UPSERT requires entity_id")

    await pipeline.initialize()
    source_config, project_id = _resolve_source_config(
        pipeline, event.project_id, event.source_type, event.source
    )
    connector = get_connector_instance(source_config)

    # Hand over the global file-conversion settings only when they exist, the
    # connector supports them, and the source opted in.
    conversion_settings = pipeline.settings.global_config.file_conversion
    if conversion_settings and hasattr(connector, "set_file_conversion_config"):
        if getattr(source_config, "enable_file_conversion", False):
            connector.set_file_conversion_config(conversion_settings)

    async with connector:
        fetched = await connector.fetch_by_id(entity_id)

    if fetched is None:
        logger.warning(
            "Entity not found for SINGLE_UPSERT",
            entity_id=event.entity_id,
            source=event.source,
        )
        return

    if project_id and pipeline.project_manager:
        enriched = pipeline.project_manager.inject_project_metadata(
            project_id, fetched.metadata
        )
        fetched.metadata = enriched

    orchestrator = pipeline.orchestrator
    document_pipeline = orchestrator.components.document_pipeline
    result = await document_pipeline.process_documents([fetched])
    await orchestrator._update_document_states(
        [fetched],
        result.successfully_processed_documents,
        project_id,
    )

    logger.info(
        "SINGLE_UPSERT completed",
        entity_id=event.entity_id,
        source=event.source,
        chunks=result.success_count,
    )

99 

100 

async def _process_single_delete(
    pipeline: Any,
    event: ChangeEvent,
) -> None:
    """Delete the document described by a webhook delete event.

    A minimal deletion document is built from the event payload. If that is
    not possible a warning is logged and nothing else happens. Otherwise the
    pipeline is initialized, project metadata is injected (when a project id
    and project manager are available), and the document is handed to the
    orchestrator's deleted-document handling.

    Args:
        pipeline: The ingestion pipeline; ``initialize``, ``project_manager``
            and ``orchestrator`` are used.
        event: The change event; ``entity_id``, ``project_id``, ``source``
            and ``payload`` are read.
    """
    document = _document_from_delete_payload(event)
    if document is None:
        logger.warning(
            "Could not build delete document from webhook payload",
            entity_id=event.entity_id,
            source=event.source,
        )
        return

    # Initialize before touching pipeline.project_manager, in the same order
    # as _process_single_upsert. NOTE(review): presumably the project manager
    # only knows its projects after initialize(); if so, injecting metadata
    # first would silently do nothing -- confirm against the pipeline class.
    await pipeline.initialize()

    project_id = event.project_id
    if project_id and pipeline.project_manager:
        document.metadata = pipeline.project_manager.inject_project_metadata(
            project_id, document.metadata
        )

    await pipeline.orchestrator._process_deleted_documents(
        [document],
        project_id,
    )
    logger.info(
        "SINGLE_DELETE completed",
        entity_id=event.entity_id,
        document_id=document.id,
        source=event.source,
    )

131 

132 

def _document_from_delete_payload(event: ChangeEvent) -> Document | None:
    """Build a minimal, deletion-flagged Document from a Jira webhook payload.

    The issue id and key come from ``payload["issue"]`` and fall back to
    ``event.entity_id``. Explicit JSON nulls and values of an unexpected type
    for ``issue``, ``fields``, ``self``, ``summary`` or ``updated`` are
    treated as missing instead of raising.

    Args:
        event: The change event; ``payload``, ``entity_id`` and ``source``
            are read.

    Returns:
        A ``Document`` with empty content and ``is_deleted=True``, or ``None``
        when neither the payload nor the event supplies an issue id.
    """
    payload = event.payload if isinstance(event.payload, dict) else {}

    # `payload.get("issue", {})` would return None for `"issue": null`, and
    # the following `.get` calls would then raise AttributeError.
    issue = payload.get("issue")
    if not isinstance(issue, dict):
        issue = {}

    issue_id = issue.get("id") or event.entity_id
    if not issue_id:
        return None
    issue_key = issue.get("key") or event.entity_id

    fields = issue.get("fields")
    if not isinstance(fields, dict):
        fields = {}

    # Derive the site base URL from the issue's REST "self" link, if present.
    base_url = ""
    self_url = issue.get("self") or ""
    if isinstance(self_url, str) and "/rest/api/" in self_url:
        base_url = self_url.split("/rest/api/")[0]

    url = f"{base_url}/browse/{issue_key}" if base_url and issue_key else ""

    # `or` (rather than a .get default) also covers keys present with null.
    updated = fields.get("updated") or ""
    title = fields.get("summary") or issue_key or "Deleted Issue"

    return Document(
        id=str(issue_id),
        content="",
        content_type="text",
        source=event.source,
        source_type=SourceType.JIRA,
        url=url,
        title=title,
        is_deleted=True,
        metadata={
            "key": issue_key,
            "updated_at": updated,
            "uri": f"{SourceType.JIRA}:{event.source}:{url}",
        },
    )

166 

167 

def _resolve_source_config(
    pipeline: Any,
    project_id: str | None,
    source_type: str,
    source_name: str,
) -> tuple[Any, str | None]:
    """Resolve the connector config and project id for a named Jira source.

    With an explicit ``project_id`` the source is looked up in that project
    only. Without one, every project known to the project manager is scanned
    and the source must be defined in exactly one of them. Projects without a
    usable sources configuration are skipped during the scan.

    Args:
        pipeline: Object exposing ``project_manager``.
        project_id: Project to look in, or ``None`` to search all projects.
        source_type: Source type of the event; only ``"jira"`` is supported.
        source_name: Name of the configured source.

    Returns:
        A ``(source_config, project_id)`` tuple for the matching source.

    Raises:
        ValueError: If the source type is unsupported, the project manager is
            unavailable, the project or source cannot be found, or the source
            name matches more than one project.
    """
    if source_type != "jira":
        raise ValueError(f"Single-event processing is not supported for {source_type}")

    project_manager = pipeline.project_manager

    if project_id:
        if not project_manager:
            raise ValueError("Project manager not available")
        jira_sources = _jira_sources_of(project_manager.get_project_context(project_id))
        if jira_sources is None:
            raise ValueError(f"Project '{project_id}' not found")
        if source_name not in jira_sources:
            raise ValueError(
                f"Jira source '{source_name}' not found in project '{project_id}'"
            )
        return jira_sources[source_name], project_id

    if not project_manager:
        raise ValueError("Project id is required when project manager is unavailable")

    matches: list[tuple[Any, str]] = []
    for candidate_project_id in project_manager.list_project_ids():
        jira_sources = _jira_sources_of(
            project_manager.get_project_context(candidate_project_id)
        )
        # None means the project has no usable sources config: skip it rather
        # than fail the whole scan on `config.sources.jira`.
        if jira_sources is not None and source_name in jira_sources:
            matches.append((jira_sources[source_name], candidate_project_id))

    if not matches:
        raise ValueError(f"Jira source '{source_name}' not found in any project")
    if len(matches) > 1:
        raise ValueError(
            f"Jira source '{source_name}' is ambiguous across projects; "
            "provide project_id explicitly."
        )
    return matches[0]


def _jira_sources_of(project_context: Any) -> Any:
    """Return the context's Jira sources, or None if context, config or sources is missing."""
    if not project_context or not project_context.config:
        return None
    sources = project_context.config.sources
    if not sources:
        return None
    return sources.jira or {}