Coverage for src / qdrant_loader / webhooks / event_processor.py: 15%
97 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1"""Process queued webhook change events."""
3from __future__ import annotations
5from typing import Any
7from qdrant_loader.config import get_settings
8from qdrant_loader.config.types import SourceType
9from qdrant_loader.connectors.factory import get_connector_instance
10from qdrant_loader.core.document import Document
11from qdrant_loader.core.qdrant_manager import QdrantManager
12from qdrant_loader.utils.logging import LoggingConfig
13from qdrant_loader.webhooks.queue_backend import (
14 FULL_SCAN,
15 SINGLE_DELETE,
16 SINGLE_UPSERT,
17 ChangeEvent,
18)
# Module-level logger from the project's LoggingConfig. Calls in this module
# pass structured keyword fields (e.g. ``entity_id=...``, ``source=...``).
logger = LoggingConfig.get_logger(__name__)
async def process_change_event(event: ChangeEvent) -> None:
    """Execute a queued webhook change event.

    ``FULL_SCAN`` runs ``pipeline.process_documents`` for the event's
    project/source scope. ``SINGLE_UPSERT`` and ``SINGLE_DELETE`` are
    delegated to the per-entity helpers in this module.

    The pipeline's ``cleanup()`` is always awaited, whether the operation
    succeeds, fails, or is unsupported.

    Raises:
        ValueError: if ``event.operation`` is not one of the known operations
            (raised after ``cleanup()`` has run).
    """
    settings = get_settings()
    qdrant_manager = QdrantManager(settings)

    # Imported inside the function rather than at module level.
    # NOTE(review): presumably to avoid an import cycle — confirm.
    from qdrant_loader.core.async_ingestion_pipeline import AsyncIngestionPipeline

    pipeline = AsyncIngestionPipeline(settings, qdrant_manager)
    op = event.operation
    try:
        if op == FULL_SCAN:
            # ``or None`` turns empty strings into None before handing them
            # to the pipeline.
            await pipeline.process_documents(
                project_id=event.project_id,
                source_type=event.source_type or None,
                source=event.source or None,
                force=event.force,
            )
            return

        if op == SINGLE_UPSERT:
            handler = _process_single_upsert
        elif op == SINGLE_DELETE:
            handler = _process_single_delete
        else:
            raise ValueError(f"Unsupported webhook operation: {event.operation}")
        await handler(pipeline, event)
    finally:
        await pipeline.cleanup()
def _build_connector(pipeline: Any, source_config: Any) -> Any:
    """Create the connector for *source_config*, wiring in file conversion if enabled."""
    connector = get_connector_instance(source_config)
    conversion = pipeline.settings.global_config.file_conversion
    # Requires all three: a global conversion config, a connector that
    # accepts one, and a source that opts in.
    if (
        conversion
        and hasattr(connector, "set_file_conversion_config")
        and getattr(source_config, "enable_file_conversion", False)
    ):
        connector.set_file_conversion_config(conversion)
    return connector


async def _process_single_upsert(
    pipeline: Any,
    event: ChangeEvent,
) -> None:
    """Re-fetch one entity from its source and run it through the document pipeline.

    Steps:
    - Initialize the pipeline.
    - Resolve the source config and project via ``_resolve_source_config``.
    - Fetch the entity with ``connector.fetch_by_id``.
    - Inject project metadata when a project and project manager exist.
    - Process the document and update the stored document states.

    If the connector returns ``None``, a warning is logged and nothing is
    processed.

    Raises:
        ValueError: if ``event.entity_id`` is empty, or if the source cannot be
            resolved.
    """
    entity_id = event.entity_id
    if not entity_id:
        raise ValueError("SINGLE_UPSERT requires entity_id")

    await pipeline.initialize()
    source_config, resolved_project = _resolve_source_config(
        pipeline, event.project_id, event.source_type, event.source
    )

    connector = _build_connector(pipeline, source_config)
    async with connector:
        fetched = await connector.fetch_by_id(entity_id)

    if fetched is None:
        logger.warning(
            "Entity not found for SINGLE_UPSERT",
            entity_id=entity_id,
            source=event.source,
        )
        return

    if resolved_project and pipeline.project_manager:
        fetched.metadata = pipeline.project_manager.inject_project_metadata(
            resolved_project, fetched.metadata
        )

    orchestrator = pipeline.orchestrator
    outcome = await orchestrator.components.document_pipeline.process_documents(
        [fetched]
    )
    await orchestrator._update_document_states(
        [fetched],
        outcome.successfully_processed_documents,
        resolved_project,
    )
    logger.info(
        "SINGLE_UPSERT completed",
        entity_id=entity_id,
        source=event.source,
        chunks=outcome.success_count,
    )
async def _process_single_delete(
    pipeline: Any,
    event: ChangeEvent,
) -> None:
    """Delete the document referenced by a ``SINGLE_DELETE`` webhook event.

    A tombstone ``Document`` is built from the webhook payload. If that is not
    possible, a warning is logged and the function returns without
    initializing the pipeline. Otherwise the pipeline is initialized, project
    metadata is injected when ``event.project_id`` is set and a project
    manager is available, and the document is handed to the orchestrator's
    deleted-document handling.
    """
    document = _document_from_delete_payload(event)
    if document is None:
        logger.warning(
            "Could not build delete document from webhook payload",
            entity_id=event.entity_id,
            source=event.source,
        )
        return

    # Initialize BEFORE touching ``pipeline.project_manager``, matching the
    # upsert path. Previously the project manager was consulted before
    # ``initialize()``, so metadata injection could be skipped or run against
    # an unprepared manager.
    await pipeline.initialize()

    project_id = event.project_id
    if project_id and pipeline.project_manager:
        document.metadata = pipeline.project_manager.inject_project_metadata(
            project_id, document.metadata
        )

    await pipeline.orchestrator._process_deleted_documents(
        [document],
        project_id,
    )
    logger.info(
        "SINGLE_DELETE completed",
        entity_id=event.entity_id,
        document_id=document.id,
        source=event.source,
    )
def _document_from_delete_payload(event: ChangeEvent) -> Document | None:
    """Build a minimal Document for deletion from Jira webhook payload.

    The issue id and key come from ``event.payload["issue"]`` when present,
    each falling back to ``event.entity_id``. Returns ``None`` when no issue id
    can be determined.

    The returned Document has empty content and ``is_deleted=True``. Its URL is
    ``<base>/browse/<key>`` when the issue's ``self`` link contains
    ``/rest/api/``, otherwise ``""``.
    """
    payload = event.payload if isinstance(event.payload, dict) else {}

    # Webhook payloads are external input: treat null or non-dict sections as
    # empty instead of failing with AttributeError.
    issue = payload.get("issue")
    if not isinstance(issue, dict):
        issue = {}
    issue_id = issue.get("id") or event.entity_id
    issue_key = issue.get("key") or event.entity_id
    if not issue_id:
        return None

    fields = issue.get("fields")
    if not isinstance(fields, dict):
        fields = {}

    base_url = ""
    self_url = issue.get("self")
    if isinstance(self_url, str) and "/rest/api/" in self_url:
        base_url = self_url.split("/rest/api/", 1)[0]

    url = f"{base_url}/browse/{issue_key}" if base_url and issue_key else ""
    # ``or`` (not a ``.get`` default) so explicit nulls also fall back.
    updated = fields.get("updated") or ""

    return Document(
        id=str(issue_id),
        content="",
        content_type="text",
        source=event.source,
        source_type=SourceType.JIRA,
        url=url,
        title=fields.get("summary") or issue_key or "Deleted Issue",
        is_deleted=True,
        metadata={
            "key": issue_key,
            "updated_at": updated,
            # NOTE(review): how ``SourceType.JIRA`` renders in an f-string
            # depends on SourceType's definition (plain constant vs. Enum
            # subclass). Confirm it matches the uri the Jira connector emits.
            "uri": f"{SourceType.JIRA}:{event.source}:{url}",
        },
    )
168def _resolve_source_config(
169 pipeline: Any,
170 project_id: str | None,
171 source_type: str,
172 source_name: str,
173) -> tuple[Any, str | None]:
174 """Resolve connector config and project id for a named source."""
175 if source_type != "jira":
176 raise ValueError(f"Single-event processing is not supported for {source_type}")
178 if project_id:
179 if not pipeline.project_manager:
180 raise ValueError("Project manager not available")
181 project_context = pipeline.project_manager.get_project_context(project_id)
182 if (
183 not project_context
184 or not project_context.config
185 or not project_context.config.sources
186 ):
187 raise ValueError(f"Project '{project_id}' not found")
188 jira_sources = project_context.config.sources.jira or {}
189 if source_name not in jira_sources:
190 raise ValueError(
191 f"Jira source '{source_name}' not found in project '{project_id}'"
192 )
193 return jira_sources[source_name], project_id
195 if not pipeline.project_manager:
196 raise ValueError("Project id is required when project manager is unavailable")
198 matches: list[tuple[Any, str]] = []
199 for candidate_project_id in pipeline.project_manager.list_project_ids():
200 project_context = pipeline.project_manager.get_project_context(
201 candidate_project_id
202 )
203 if not project_context or not project_context.config:
204 continue
205 jira_sources = project_context.config.sources.jira or {}
206 if source_name in jira_sources:
207 matches.append((jira_sources[source_name], candidate_project_id))
209 if not matches:
210 raise ValueError(f"Jira source '{source_name}' not found in any project")
211 if len(matches) > 1:
212 raise ValueError(
213 f"Jira source '{source_name}' is ambiguous across projects; "
214 "provide project_id explicitly."
215 )
216 return matches[0]