Coverage for src / qdrant_loader / webhooks / server.py: 77%
111 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1from __future__ import annotations
3import asyncio
4import os
5import time
6from contextlib import asynccontextmanager
7from typing import Any
9from fastapi import Depends, FastAPI, HTTPException, Query, Request, status
10from fastapi.responses import JSONResponse
12from qdrant_loader.utils.logging import LoggingConfig
13from qdrant_loader.webhooks.auth import (
14 WEBHOOK_SECRET_ENV_VAR,
15 get_client_ip,
16 verify_cognito_token,
17 verify_ingest_auth,
18 verify_webhook_token,
19)
20from qdrant_loader.webhooks.handlers import (
21 INGEST_SUPPORTED_SOURCE_TYPES,
22 SUPPORTED_SOURCE_TYPES,
23 enqueue_ingest_request,
24 enqueue_webhook_event,
25 normalize_source_type,
26)
27from qdrant_loader.webhooks.queue_backend import QueueBackendManager
28from qdrant_loader.webhooks.worker import run_webhook_worker
# Module-level logger; the calls in this module pass extra context as keyword
# arguments (e.g. ``logger.info("...", key=value)``).
logger = LoggingConfig.get_logger(__name__)
def _read_int_env(name: str, default: int) -> int:
    """Return environment variable *name* parsed as an integer.

    An unset, empty, or whitespace-only value yields *default*, so a variable
    that is declared but left blank in a deployment manifest does not crash
    the import of this module.

    Raises:
        ValueError: If the variable is set to something ``int()`` cannot
            parse. The message names the offending variable and value.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError as exc:
        raise ValueError(
            f"Environment variable {name} must be an integer, got {raw!r}."
        ) from exc


# Length of the rate-limit window, in seconds (default 60).
WEBHOOK_RATE_LIMIT_WINDOW_SECONDS = _read_int_env(
    "WEBHOOK_RATE_LIMIT_WINDOW_SECONDS", 60
)
# Maximum number of requests accepted per client within one window (default 10).
WEBHOOK_RATE_LIMIT_REQUESTS_PER_WINDOW = _read_int_env(
    "WEBHOOK_RATE_LIMIT_REQUESTS_PER_WINDOW", 10
)
# In-memory rate limit state (process-local).
#
# Shape: maps the client key returned by get_client_ip() to the list of
# time.monotonic() readings recorded for that client's accepted requests.
# Entries are pruned by _cleanup_old_timestamps() and appended to by
# _enforce_rate_limit().
#
# IMPORTANT: This is NOT a substitute for infrastructure-level rate limiting.
#
# LIMITATION: Each uvicorn worker, container, or ECS task has its own dict instance.
# Multiple instances → each enforces limits independently → effective limit = N × configured.
# Example: 2 workers with limit=10 → actual limit ≈ 20 req/min.
#
# DESIGN INTENT (WS-6): Primary rate-limiting should live at the ALB/WAF layer:
# "rate-limit per IP to 1000 req/min". This app-level check is a safety net.
#
# SECURITY NOTE: get_client_ip() relies on X-Forwarded-For header when running
# behind a trusted proxy. In untrusted environments, set WEBHOOK_TRUSTED_PROXY_IPS
# or similar to validate the header origin.
# NOTE(review): WEBHOOK_TRUSTED_PROXY_IPS is not read anywhere in this module —
# confirm that get_client_ip() actually honours it.
_request_timestamps: dict[str, list[float]] = {}
@asynccontextmanager
async def _lifespan(app: FastAPI):
    """Manage startup and shutdown of the webhook server.

    Startup:
        1. Refuse to start unless some authentication is configured: a global
           secret (``WEBHOOK_SECRET_ENV_VAR`` or ``WEBHOOK_SECRETS``), any
           non-empty ``WEBHOOK_SECRET_<PROJECT_ID>`` variable, or Cognito JWT
           enabled through ``WEBHOOK_ENABLE_COGNITO_JWT``.
        2. Initialize the queue backend.
        3. Start the background worker task and expose it, together with its
           stop event, on ``app.state`` (``worker_task``, ``worker_stop_event``).

    Shutdown:
        Signal and cancel the worker, wait for it to finish, then shut the
        queue backend down. The backend shutdown runs even if the worker
        ended with an error; that error is logged instead of aborting the
        shutdown sequence.

    Raises:
        RuntimeError: If no authentication mechanism is configured.
    """
    has_global_secret = bool(os.getenv(WEBHOOK_SECRET_ENV_VAR)) or bool(
        os.getenv("WEBHOOK_SECRETS")
    )
    has_project_secret = any(
        key.startswith("WEBHOOK_SECRET_") and bool(value)
        for key, value in os.environ.items()
    )
    cognito_enabled = os.getenv("WEBHOOK_ENABLE_COGNITO_JWT", "false").lower() in (
        "true",
        "1",
        "yes",
    )
    if not (has_global_secret or has_project_secret or cognito_enabled):
        raise RuntimeError(
            "Webhook authentication is not configured. Set WEBHOOK_SECRET, "
            "WEBHOOK_SECRETS, WEBHOOK_SECRET_<PROJECT_ID>, or enable Cognito JWT."
        )

    await QueueBackendManager.initialize()
    stop_event = asyncio.Event()
    worker_task = asyncio.create_task(run_webhook_worker(stop_event))
    app.state.worker_stop_event = stop_event
    app.state.worker_task = worker_task

    logger.info(
        "Webhook server startup",
        rate_limit_window_seconds=WEBHOOK_RATE_LIMIT_WINDOW_SECONDS,
        rate_limit_requests=WEBHOOK_RATE_LIMIT_REQUESTS_PER_WINDOW,
    )
    try:
        yield
    finally:
        # NOTE(review): the task is cancelled right after the stop event is
        # set, so the worker gets no grace period to react to the event.
        stop_event.set()
        worker_task.cancel()
        try:
            await worker_task
        except asyncio.CancelledError:
            # Expected outcome of the cancel() above.
            pass
        except Exception as exc:
            # The worker had already crashed (or failed while stopping).
            # Record it, but do not let it skip the backend shutdown below.
            logger.exception("Webhook worker exited with an error", error=str(exc))
        finally:
            await QueueBackendManager.shutdown()
            logger.info("Webhook server shutdown")
# ASGI application object. The route decorators below register their handlers
# on it, and _lifespan runs the startup/shutdown logic around its lifetime.
app = FastAPI(
    title="QDrant Loader Webhook Server",
    version="1.0.0",
    description=(
        "Receives connector webhooks and direct /ingest API requests; "
        "enqueues durable ingestion jobs."
    ),
    lifespan=_lifespan,
)
# time.monotonic() reading taken at the last full sweep of _request_timestamps.
_last_full_sweep: float = 0.0


def _cleanup_old_timestamps(client_key: str) -> None:
    """Discard timestamps that have fallen out of the rate-limit window.

    For *client_key*, only timestamps at most
    ``WEBHOOK_RATE_LIMIT_WINDOW_SECONDS`` old are kept. If none remain, the
    client's entry is removed entirely rather than left as an empty list.

    In addition, at most once per window, every client whose timestamps have
    all expired is removed. Without this sweep a client that sends a single
    request and never returns would keep its entry forever, so the dict would
    grow without bound (one entry per distinct client key ever seen).

    Args:
        client_key: Key identifying the client in ``_request_timestamps``.
    """
    global _last_full_sweep

    now = time.monotonic()
    window = WEBHOOK_RATE_LIMIT_WINDOW_SECONDS

    if now - _last_full_sweep > window:
        _last_full_sweep = now
        # Collect first: the dict must not change size while it is iterated.
        # all() over an empty list is True, so empty entries are swept too.
        expired_clients = [
            key
            for key, stamps in _request_timestamps.items()
            if all(now - t > window for t in stamps)
        ]
        for key in expired_clients:
            del _request_timestamps[key]

    recent = [t for t in _request_timestamps.get(client_key, ()) if now - t <= window]
    if recent:
        _request_timestamps[client_key] = recent
    else:
        _request_timestamps.pop(client_key, None)
def _enforce_rate_limit(request: Request) -> None:
    """Record this request against its client, or reject it with HTTP 429.

    The client is identified by ``get_client_ip(request)``. Expired
    timestamps for that client are pruned first; if the remaining count has
    already reached ``WEBHOOK_RATE_LIMIT_REQUESTS_PER_WINDOW`` the request is
    refused, otherwise the current monotonic time is recorded.

    This is a process-local safety net; real DDoS protection belongs at the
    ALB/WAF layer (see the comment above ``_request_timestamps``).

    Raises:
        HTTPException: 429 when the client has used up its allowance.
    """
    ip = get_client_ip(request)
    _cleanup_old_timestamps(ip)
    history = _request_timestamps.setdefault(ip, [])
    seen = len(history)

    if seen < WEBHOOK_RATE_LIMIT_REQUESTS_PER_WINDOW:
        # Still within the allowance: count this request and let it through.
        history.append(time.monotonic())
        return

    logger.warning(
        "Rate limit exceeded for webhook client",
        client=ip,
        request_count=seen,
        window_seconds=WEBHOOK_RATE_LIMIT_WINDOW_SECONDS,
    )
    raise HTTPException(
        status_code=status.HTTP_429_TOO_MANY_REQUESTS,
        detail="Rate limit exceeded. Try again later.",
    )
async def _parse_json_request(request: Request) -> object:
    """Decode the request body as JSON.

    Returns:
        Whatever ``request.json()`` produces.

    Raises:
        HTTPException: 400 if reading or decoding the body fails for any
            reason; the underlying error is logged and chained.
    """
    try:
        body = await request.json()
    except Exception as exc:
        logger.error("Invalid webhook payload", error=str(exc))
        bad_request = HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Webhook body must be valid JSON.",
        )
        raise bad_request from exc
    else:
        return body
async def _handle_webhook(
    project_id: str | None,
    source_type: str,
    source: str,
    request: Request,
    force: bool = False,
) -> JSONResponse:
    """Shared implementation behind the webhook routes.

    Steps, in order: apply the rate limit, parse the JSON body, normalize
    ``source_type``, enqueue the event, and answer 202 with a summary.

    Args:
        project_id: Target project, or ``None`` when the route carries no
            project segment.
        source_type: Source type as given in the URL; normalized before use.
        source: Source name as given in the URL.
        request: Incoming request (used for rate limiting and the body).
        force: Passed through to ``enqueue_webhook_event``.

    Raises:
        HTTPException: 429 from the rate limiter, 400 for an invalid body or
            an unsupported source type, 500 if enqueueing fails.
    """
    # Rate-limit first so flooded requests are rejected before any parsing.
    _enforce_rate_limit(request)

    body = await _parse_json_request(request)

    try:
        canonical_type = normalize_source_type(source_type)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc

    try:
        outcome = await enqueue_webhook_event(
            project_id, canonical_type, source, body, force
        )
    except Exception as exc:
        logger.exception("Failed to enqueue webhook event", error=str(exc))
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to enqueue webhook event.",
        ) from exc

    summary = {
        "status": "accepted",
        "project_id": project_id,
        "source_type": canonical_type,
        "source": source,
        "force": force,
        "queued": True,
    }
    # Keys returned by the enqueue call are merged last and therefore win.
    return JSONResponse(
        status_code=status.HTTP_202_ACCEPTED,
        content={**summary, **outcome},
    )
@app.get("/health")
async def health_check() -> dict[str, object]:
    """Report static server information; no authentication is required.

    The response lists the supported webhook and ingest source types (sorted)
    and the configured rate-limit settings.
    """
    rate_limit = {
        "window_seconds": WEBHOOK_RATE_LIMIT_WINDOW_SECONDS,
        "max_requests": WEBHOOK_RATE_LIMIT_REQUESTS_PER_WINDOW,
    }
    report: dict[str, object] = {"status": "healthy"}
    report["supported_source_types"] = sorted(SUPPORTED_SOURCE_TYPES)
    report["ingest_source_types"] = sorted(INGEST_SUPPORTED_SOURCE_TYPES)
    report["rate_limit"] = rate_limit
    # NOTE(review): the queue name is a fixed string here, not read from the
    # queue backend — confirm it matches the backend actually in use.
    report["queue"] = "sqlite"
    return report
@app.get("/healthz")
async def healthz() -> dict[str, object]:
    """Liveness probe in the Kubernetes ``/healthz`` convention.

    Answers 200 whenever the server process is able to handle the request.
    No dependencies are checked here; ``/readyz`` covers readiness.
    """
    liveness: dict[str, object] = {"status": "ok"}
    return liveness
@app.get("/readyz")
async def readyz() -> dict[str, object]:
    """Readiness probe in the Kubernetes ``/readyz`` convention.

    The server is ready only while the background worker task is still
    running. ``app.state.worker_task`` is assigned by ``_lifespan`` after the
    queue backend has been initialized, so its presence also implies that
    initialization succeeded.

    A worker task that has finished for any reason (raised, was cancelled,
    or simply returned) means no jobs are being processed, so every such case
    is reported as not ready.

    Note: Qdrant and database connectivity are not probed here.

    Returns:
        ``{"status": "ready"}`` while the worker task is running.

    Raises:
        HTTPException: 503 if the worker task is missing or has finished.
    """
    worker_task = getattr(app.state, "worker_task", None)

    if worker_task is None:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Worker task not initialized",
        )

    if not worker_task.done():
        return {"status": "ready"}

    # Check cancellation before result(): on a cancelled task result() raises
    # CancelledError, which is not an Exception subclass and would otherwise
    # escape the handler below.
    if worker_task.cancelled():
        logger.error("Worker task was cancelled; server is not ready")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Worker task is not running.",
        )

    try:
        worker_task.result()
    except Exception as err:
        logger.exception("Worker task failed readiness check", error=str(err))
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Worker task failed.",
        ) from err

    # The worker returned without an error, but it is no longer running.
    logger.error("Worker task exited unexpectedly; server is not ready")
    raise HTTPException(
        status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
        detail="Worker task is not running.",
    )
@app.get("/status")
async def status_route(
    claims: dict[str, Any] = Depends(verify_cognito_token),
) -> dict[str, object]:
    """Authenticated status endpoint for application clients.

    Access is guarded by the ``verify_cognito_token`` dependency; the
    response echoes the ``sub`` claim it returned (``None`` if absent).
    """
    subject = claims.get("sub")
    return {"status": "ok", "subject": subject}
@app.post("/ingest")
async def ingest_route(
    request: Request,
    project_id: str | None = Query(None),
    source_type: str | None = Query(None),
    source: str | None = Query(None),
    force: bool = False,
    _auth: None = Depends(verify_ingest_auth),
) -> JSONResponse:
    """Queue an ingestion job through the API (like `qdrant-loader ingest`).

    The query parameters mirror the ingest CLI flags. The job is only
    enqueued here; the background worker processes it asynchronously.

    Raises:
        HTTPException: 429 from the rate limiter, 422 when the enqueue call
            rejects the parameters with ``ValueError``, 500 on any other
            enqueue failure.
    """
    _enforce_rate_limit(request)

    # The same four values are sent to the queue and echoed in the response.
    job_params = {
        "project_id": project_id,
        "source_type": source_type,
        "source": source,
        "force": force,
    }

    try:
        outcome = await enqueue_ingest_request(**job_params)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=str(exc),
        ) from exc
    except Exception as exc:
        logger.exception("Failed to enqueue ingest request", error=str(exc))
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to enqueue ingest request.",
        ) from exc

    # Keys returned by the enqueue call are merged last and therefore win.
    body = {"status": "accepted", **job_params, "queued": True, **outcome}
    return JSONResponse(status_code=status.HTTP_202_ACCEPTED, content=body)
@app.post("/webhooks/projects/{project_id}/{source_type}/{source}")
async def webhook_project_route(
    project_id: str,
    source_type: str,
    source: str,
    request: Request,
    force: bool = False,
    _auth: None = Depends(verify_webhook_token),
) -> JSONResponse:
    """Accept a webhook addressed to one source of one specific project.

    Authentication is handled by the ``verify_webhook_token`` dependency;
    everything else is delegated to ``_handle_webhook``.
    """
    response = await _handle_webhook(
        project_id=project_id,
        source_type=source_type,
        source=source,
        request=request,
        force=force,
    )
    return response
@app.post("/webhooks/{source_type}/{source}")
async def webhook_source_route(
    source_type: str,
    source: str,
    request: Request,
    force: bool = False,
    _auth: None = Depends(verify_webhook_token),
) -> JSONResponse:
    """Accept a webhook for a source across all configured projects.

    No project is named in the URL, so ``_handle_webhook`` is called with
    ``project_id=None``. Authentication is handled by the
    ``verify_webhook_token`` dependency.
    """
    response = await _handle_webhook(
        project_id=None,
        source_type=source_type,
        source=source,
        request=request,
        force=force,
    )
    return response