Coverage for src / qdrant_loader / webhooks / handlers.py: 87%
55 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1from __future__ import annotations
3import json
4from typing import Any
6from qdrant_loader.utils.logging import LoggingConfig
7from qdrant_loader.webhooks.queue_backend import (
8 FULL_SCAN,
9 ChangeEvent,
10 QueueBackendManager,
11)
12from qdrant_loader.webhooks.single_event_handler import parse_webhook_event
14logger = LoggingConfig.get_logger(__name__)
16# Single-event webhook support is Jira-only in v1.1.
17SUPPORTED_SOURCE_TYPES = {"jira"}
19# Direct /ingest API supports all configured connector types.
20INGEST_SUPPORTED_SOURCE_TYPES = {
21 "jira",
22 "confluence",
23 "git",
24 "publicdocs",
25 "localfile",
26}
29def normalize_source_type(source_type: str) -> str:
30 """Normalize and validate the source type from Jira webhook routes."""
31 normalized_source_type = source_type.strip().lower()
32 if normalized_source_type not in SUPPORTED_SOURCE_TYPES:
33 raise ValueError(
34 f"Unsupported source type '{source_type}'. "
35 f"Allowed values are: {', '.join(sorted(SUPPORTED_SOURCE_TYPES))}."
36 )
37 return normalized_source_type
40def normalize_ingest_source_type(source_type: str) -> str:
41 """Normalize and validate the source type for direct /ingest requests."""
42 normalized_source_type = source_type.strip().lower()
43 if normalized_source_type not in INGEST_SUPPORTED_SOURCE_TYPES:
44 raise ValueError(
45 f"Unsupported source type '{source_type}'. "
46 f"Allowed values are: {', '.join(sorted(INGEST_SUPPORTED_SOURCE_TYPES))}."
47 )
48 return normalized_source_type
51def summarize_payload(payload: Any) -> str:
52 """Create a short summary of webhook payload data for logging."""
53 try:
54 summary = json.dumps(payload, default=str, ensure_ascii=False)
55 except Exception:
56 return "<non-serializable payload>"
57 if len(summary) > 1000:
58 return summary[:1000] + "..."
59 return summary
62async def enqueue_webhook_event(
63 project_id: str | None,
64 source_type: str,
65 source: str,
66 payload: Any,
67 force: bool = False,
68) -> dict[str, Any]:
69 """Parse webhook payload and enqueue a durable change event.
71 Jira webhooks use SINGLE_UPSERT/SINGLE_DELETE when the payload is parseable.
72 Falls back to FULL_SCAN when force=True or parsing fails.
73 """
74 normalized_source_type = normalize_source_type(source_type)
75 queue = QueueBackendManager.get_backend()
77 payload_summary = {
78 "type": type(payload).__name__,
79 "keys": sorted(payload.keys())[:20] if isinstance(payload, dict) else None,
80 }
81 logger.info(
82 "Received webhook event",
83 project_id=project_id,
84 source_type=normalized_source_type,
85 source=source,
86 force=force,
87 payload_meta=payload_summary,
88 )
90 if force:
91 event = ChangeEvent(
92 source=source,
93 source_type=normalized_source_type,
94 project_id=project_id,
95 operation=FULL_SCAN,
96 payload=payload,
97 force=True,
98 )
99 message_id = await queue.enqueue(event)
100 return {
101 "operation": FULL_SCAN,
102 "message_id": message_id,
103 "queued": True,
104 }
106 change_event = await parse_webhook_event(normalized_source_type, source, payload)
107 if change_event is not None:
108 change_event.project_id = project_id
109 message_id = await queue.enqueue(change_event)
110 logger.info(
111 "Enqueued single-event webhook",
112 operation=change_event.operation,
113 entity_id=change_event.entity_id,
114 message_id=message_id,
115 )
116 return {
117 "operation": change_event.operation,
118 "entity_id": change_event.entity_id,
119 "message_id": message_id,
120 "queued": True,
121 }
123 logger.info(
124 "Could not parse single-event; enqueueing full scan",
125 source_type=normalized_source_type,
126 source=source,
127 )
128 event = ChangeEvent(
129 source=source,
130 source_type=normalized_source_type,
131 project_id=project_id,
132 operation=FULL_SCAN,
133 payload=payload,
134 force=False,
135 )
136 message_id = await queue.enqueue(event)
137 return {
138 "operation": FULL_SCAN,
139 "message_id": message_id,
140 "queued": True,
141 }
144async def enqueue_ingest_request(
145 project_id: str | None,
146 source_type: str | None,
147 source: str | None,
148 force: bool = False,
149) -> dict[str, Any]:
150 """Enqueue a direct ingestion job (replaces CLI `qdrant-loader ingest`).
152 Always uses FULL_SCAN; the worker runs the same pipeline as the ingest command.
153 """
154 if source is not None and source_type is None:
155 raise ValueError("source_type must be provided when source is specified.")
157 normalized_source_type = (
158 normalize_ingest_source_type(source_type) if source_type else None
159 )
160 queue = QueueBackendManager.get_backend()
162 logger.info(
163 "Received direct ingest request",
164 project_id=project_id,
165 source_type=normalized_source_type,
166 source=source,
167 force=force,
168 )
170 event = ChangeEvent(
171 source=source or "",
172 source_type=normalized_source_type,
173 project_id=project_id,
174 operation=FULL_SCAN,
175 force=force,
176 )
177 message_id = await queue.enqueue(event)
178 return {
179 "operation": FULL_SCAN,
180 "message_id": message_id,
181 "queued": True,
182 }