Coverage for src / qdrant_loader / webhooks / server.py: 77%

111 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1from __future__ import annotations 

2 

3import asyncio 

4import os 

5import time 

6from contextlib import asynccontextmanager 

7from typing import Any 

8 

9from fastapi import Depends, FastAPI, HTTPException, Query, Request, status 

10from fastapi.responses import JSONResponse 

11 

12from qdrant_loader.utils.logging import LoggingConfig 

13from qdrant_loader.webhooks.auth import ( 

14 WEBHOOK_SECRET_ENV_VAR, 

15 get_client_ip, 

16 verify_cognito_token, 

17 verify_ingest_auth, 

18 verify_webhook_token, 

19) 

20from qdrant_loader.webhooks.handlers import ( 

21 INGEST_SUPPORTED_SOURCE_TYPES, 

22 SUPPORTED_SOURCE_TYPES, 

23 enqueue_ingest_request, 

24 enqueue_webhook_event, 

25 normalize_source_type, 

26) 

27from qdrant_loader.webhooks.queue_backend import QueueBackendManager 

28from qdrant_loader.webhooks.worker import run_webhook_worker 

29 

logger = LoggingConfig.get_logger(__name__)


def _int_from_env(name: str, default: str) -> int:
    """Return ``int(os.getenv(name, default))``.

    A non-integer value raises ``ValueError`` at import time, so a misconfigured
    deployment fails fast instead of running with a surprising limit.
    """
    return int(os.getenv(name, default))


WEBHOOK_RATE_LIMIT_WINDOW_SECONDS = _int_from_env(
    "WEBHOOK_RATE_LIMIT_WINDOW_SECONDS", "60"
)
WEBHOOK_RATE_LIMIT_REQUESTS_PER_WINDOW = _int_from_env(
    "WEBHOOK_RATE_LIMIT_REQUESTS_PER_WINDOW", "10"
)

# Per-client state for the in-memory rate limiter: client key -> list of
# time.monotonic() values recorded for accepted requests.
#
# This is a process-local safety net, NOT a substitute for infrastructure-level
# rate limiting:
#   * Every uvicorn worker, container, or ECS task holds its own copy of this
#     dict, so N instances each enforce the limit independently and the
#     effective limit is roughly N x the configured value (e.g. 2 workers with
#     limit=10 and the default 60 s window allow about 20 requests per minute).
#   * DESIGN INTENT (WS-6): primary rate limiting belongs at the ALB/WAF layer
#     ("rate-limit per IP to 1000 req/min").
#
# SECURITY NOTE: the client key comes from get_client_ip(), which relies on the
# X-Forwarded-For header when running behind a trusted proxy. Outside such a
# setup that header is client-controlled, so its origin must be validated
# (e.g. via a trusted-proxy allow-list such as WEBHOOK_TRUSTED_PROXY_IPS —
# NOTE(review): confirm what webhooks.auth actually supports).
_request_timestamps: dict[str, list[float]] = {}

54 

55 

@asynccontextmanager
async def _lifespan(app: FastAPI):
    """Validate auth config, run the background worker, and tear everything down.

    Startup:
      * Refuses to start (``RuntimeError``) unless at least one auth mechanism
        is configured: a global secret (``WEBHOOK_SECRET_ENV_VAR`` or
        ``WEBHOOK_SECRETS``), any non-empty ``WEBHOOK_SECRET_*`` variable, or
        ``WEBHOOK_ENABLE_COGNITO_JWT`` set to true/1/yes.
      * Initializes the queue backend, then starts ``run_webhook_worker`` as an
        asyncio task. The task and its stop event are exposed on ``app.state``
        (``/readyz`` reads ``app.state.worker_task``).

    Shutdown:
      * Signals the stop event, cancels the worker task, and waits for it.
      * Always shuts the queue backend down, even if the worker had already
        crashed with an exception.
    """
    has_global_secret = bool(os.getenv(WEBHOOK_SECRET_ENV_VAR)) or bool(
        os.getenv("WEBHOOK_SECRETS")
    )
    # Per-project secrets use the WEBHOOK_SECRET_<PROJECT_ID> naming scheme.
    has_project_secret = any(
        key.startswith("WEBHOOK_SECRET_") and bool(value)
        for key, value in os.environ.items()
    )
    cognito_enabled = os.getenv("WEBHOOK_ENABLE_COGNITO_JWT", "false").lower() in (
        "true",
        "1",
        "yes",
    )
    if not (has_global_secret or has_project_secret or cognito_enabled):
        raise RuntimeError(
            "Webhook authentication is not configured. Set WEBHOOK_SECRET, "
            "WEBHOOK_SECRETS, WEBHOOK_SECRET_<PROJECT_ID>, or enable Cognito JWT."
        )

    await QueueBackendManager.initialize()
    stop_event = asyncio.Event()
    worker_task = asyncio.create_task(run_webhook_worker(stop_event))
    app.state.worker_stop_event = stop_event
    app.state.worker_task = worker_task

    logger.info(
        "Webhook server startup",
        rate_limit_window_seconds=WEBHOOK_RATE_LIMIT_WINDOW_SECONDS,
        rate_limit_requests=WEBHOOK_RATE_LIMIT_REQUESTS_PER_WINDOW,
    )
    try:
        yield
    finally:
        stop_event.set()
        worker_task.cancel()
        try:
            await worker_task
        except asyncio.CancelledError:
            pass
        except Exception as exc:
            # If the worker crashed before shutdown, cancel() is a no-op and
            # awaiting the task re-raises its original error. Log it instead of
            # letting it abort the rest of the teardown.
            logger.exception("Webhook worker exited with an error", error=str(exc))
        finally:
            # Must run regardless of how the worker ended, or the queue backend
            # initialized above is never released.
            await QueueBackendManager.shutdown()
        logger.info("Webhook server shutdown")

98 

99 

# OpenAPI description shown for the webhook server.
_APP_DESCRIPTION = (
    "Receives connector webhooks and direct /ingest API requests; "
    "enqueues durable ingestion jobs."
)

# The ASGI application; _lifespan validates auth config and runs the
# background queue worker for the lifetime of the server.
app = FastAPI(
    title="QDrant Loader Webhook Server",
    description=_APP_DESCRIPTION,
    version="1.0.0",
    lifespan=_lifespan,
)

109 

110 

def _cleanup_old_timestamps(client_key: str) -> None:
    """Keep only the timestamps for ``client_key`` that are still inside the window.

    A timestamp survives when it is at most ``WEBHOOK_RATE_LIMIT_WINDOW_SECONDS``
    seconds older than the current ``time.monotonic()`` reading. The filtered
    list, which may be empty, is always written back, so ``client_key`` is
    present in ``_request_timestamps`` afterwards.
    """
    current = time.monotonic()
    recent = [
        stamp
        for stamp in _request_timestamps.get(client_key, [])
        if current - stamp <= WEBHOOK_RATE_LIMIT_WINDOW_SECONDS
    ]
    _request_timestamps[client_key] = recent

116 

117 

# time.monotonic() value of the last full sweep of _request_timestamps.
_last_rate_limit_sweep: float = 0.0


def _sweep_stale_rate_limit_clients(now: float) -> None:
    """Forget clients whose newest recorded request is outside the window.

    Per-request cleanup only prunes the client currently making a request.
    Without this sweep, keys for clients that never come back would stay in
    ``_request_timestamps`` for the life of the process. That is an unbounded
    leak when client keys are attacker-influenced (e.g. a forwarded-for
    header). The sweep is O(tracked clients), so it runs at most once per
    rate-limit window.
    """
    global _last_rate_limit_sweep
    window = WEBHOOK_RATE_LIMIT_WINDOW_SECONDS
    if now - _last_rate_limit_sweep < window:
        return
    _last_rate_limit_sweep = now
    stale_keys = [
        key
        for key, stamps in _request_timestamps.items()
        # Stamps are appended in time.monotonic() order and filtering keeps
        # that order, so the last entry is the newest. Staleness uses the same
        # boundary as the per-client cleanup (kept while now - t <= window).
        if not stamps or now - stamps[-1] > window
    ]
    for key in stale_keys:
        del _request_timestamps[key]


def _enforce_rate_limit(request: Request) -> None:
    """Record this request against its client's budget, or reject it with 429.

    The client key is ``get_client_ip(request)``. Timestamps older than
    ``WEBHOOK_RATE_LIMIT_WINDOW_SECONDS`` are discarded first. If the client
    still has ``WEBHOOK_RATE_LIMIT_REQUESTS_PER_WINDOW`` or more accepted
    requests in the window, this one is rejected and not recorded.

    This is a process-local safety net (state lives in the module-level
    ``_request_timestamps`` dict); real DDoS protection is expected at the
    ALB/WAF layer.

    Raises:
        HTTPException: 429 Too Many Requests when the client is over its limit.
    """
    client_key = get_client_ip(request)
    _sweep_stale_rate_limit_clients(time.monotonic())
    _cleanup_old_timestamps(client_key)
    timestamps = _request_timestamps.setdefault(client_key, [])
    if len(timestamps) >= WEBHOOK_RATE_LIMIT_REQUESTS_PER_WINDOW:
        logger.warning(
            "Rate limit exceeded for webhook client",
            client=client_key,
            request_count=len(timestamps),
            window_seconds=WEBHOOK_RATE_LIMIT_WINDOW_SECONDS,
        )
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="Rate limit exceeded. Try again later.",
        )
    timestamps.append(time.monotonic())

139 

140 

async def _parse_json_request(request: Request) -> object:
    """Return the request body decoded as JSON.

    Any failure while reading or decoding the body is logged. It is then
    re-raised as a 400 Bad Request chained to the original exception.
    """
    try:
        decoded = await request.json()
    except Exception as parse_error:
        logger.error("Invalid webhook payload", error=str(parse_error))
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Webhook body must be valid JSON.",
        ) from parse_error
    return decoded

150 

151 

async def _handle_webhook(
    project_id: str | None,
    source_type: str,
    source: str,
    request: Request,
    force: bool = False,
) -> JSONResponse:
    """Shared implementation behind both webhook routes.

    Steps, in order:
      1. per-client rate limit (429 on overflow),
      2. JSON body parsing (400 on invalid JSON),
      3. source-type normalization (400 when ``normalize_source_type`` raises
         ``ValueError``),
      4. enqueueing via ``enqueue_webhook_event`` (500 on any failure).

    Returns a 202 response echoing the request parameters. The enqueue result
    is merged in last, so its keys override the echoed ones.
    """
    # Rate limiting runs before the body is read, so flooded requests are
    # rejected without spending CPU on JSON decoding.
    _enforce_rate_limit(request)
    payload = await _parse_json_request(request)

    try:
        canonical_type = normalize_source_type(source_type)
    except ValueError as bad_type:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(bad_type),
        ) from bad_type

    try:
        enqueue_result = await enqueue_webhook_event(
            project_id, canonical_type, source, payload, force
        )
    except Exception as enqueue_error:
        logger.exception("Failed to enqueue webhook event", error=str(enqueue_error))
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to enqueue webhook event.",
        ) from enqueue_error

    accepted = {
        "status": "accepted",
        "project_id": project_id,
        "source_type": canonical_type,
        "source": source,
        "force": force,
        "queued": True,
    }
    return JSONResponse(
        status_code=status.HTTP_202_ACCEPTED,
        content={**accepted, **enqueue_result},
    )

199 

200 

@app.get("/health")
async def health_check() -> dict[str, object]:
    """Unauthenticated health/info endpoint.

    Reports the sorted webhook and ingest source types, the configured
    in-memory rate-limit settings, and a queue label.
    """
    rate_limit_settings = {
        "window_seconds": WEBHOOK_RATE_LIMIT_WINDOW_SECONDS,
        "max_requests": WEBHOOK_RATE_LIMIT_REQUESTS_PER_WINDOW,
    }
    return {
        "status": "healthy",
        "supported_source_types": sorted(SUPPORTED_SOURCE_TYPES),
        "ingest_source_types": sorted(INGEST_SUPPORTED_SOURCE_TYPES),
        "rate_limit": rate_limit_settings,
        # NOTE(review): hard-coded label; confirm it matches the backend that
        # QueueBackendManager actually initializes.
        "queue": "sqlite",
    }

214 

215 

@app.get("/healthz")
async def healthz() -> dict[str, object]:
    """Kubernetes-style liveness probe.

    Answers 200 with ``{"status": "ok"}`` whenever the process can serve the
    request. No dependencies are probed here; /readyz covers the worker.
    """
    liveness: dict[str, object] = {"status": "ok"}
    return liveness

224 

225 

@app.get("/readyz")
async def readyz() -> dict[str, object]:
    """Kubernetes-style readiness probe.

    Returns 200 ``{"status": "ready"}`` only while the background worker task
    created in ``_lifespan`` exists and is still running. That task is created
    only after ``QueueBackendManager.initialize()`` has completed, so its
    presence also implies the queue backend was initialized. The backend
    itself is not probed here.

    Qdrant and database connectivity are not probed either; those are expected
    to be covered by ALB/WAF health checks or separate monitoring.

    Raises:
        HTTPException: 503 when the worker task is missing, was cancelled,
            failed with an exception, or exited normally.
    """
    worker_task = getattr(app.state, "worker_task", None)

    if worker_task is None:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Worker task not initialized",
        )

    if not worker_task.done():
        return {"status": "ready"}

    # Check cancellation first: result() on a cancelled task raises
    # CancelledError, a BaseException that `except Exception` would not catch.
    if worker_task.cancelled():
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Worker task cancelled.",
        )

    try:
        worker_task.result()
    except Exception as err:
        logger.exception("Worker task failed readiness check", error=str(err))
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Worker task failed.",
        ) from err

    # The worker returned without error: nothing is consuming the queue any
    # more, so the server is no longer ready.
    raise HTTPException(
        status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
        detail="Worker task is not running.",
    )

257 

258 

@app.get("/status")
async def status_route(
    claims: dict[str, Any] = Depends(verify_cognito_token),
) -> dict[str, object]:
    """Authenticated status endpoint for application clients.

    ``claims`` is whatever ``verify_cognito_token`` returns (Cognito token
    claims when that mode is enabled). The response echoes the ``sub`` claim,
    or ``None`` when it is absent.
    """
    subject = claims.get("sub")
    return {"status": "ok", "subject": subject}

268 

269 

@app.post("/ingest")
async def ingest_route(
    request: Request,
    project_id: str | None = Query(None),
    source_type: str | None = Query(None),
    source: str | None = Query(None),
    force: bool = False,
    _auth: None = Depends(verify_ingest_auth),
) -> JSONResponse:
    """Enqueue an ingestion job via the API (the `qdrant-loader ingest` equivalent).

    The query parameters mirror the ingest CLI flags. The background worker
    processes the job asynchronously.

    Responses:
      * 202 with the echoed parameters merged with the enqueue result (result
        keys win on conflict),
      * 422 when ``enqueue_ingest_request`` raises ``ValueError``,
      * 500 on any other enqueue failure,
      * 429 when the per-client rate limit is exceeded.
    """
    # FastAPI resolves `_auth` before this body runs, so (assuming
    # verify_ingest_auth raises on failure) only authenticated requests
    # count against the rate limit.
    _enforce_rate_limit(request)

    try:
        enqueue_result = await enqueue_ingest_request(
            project_id=project_id,
            source_type=source_type,
            source=source,
            force=force,
        )
    except ValueError as invalid:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=str(invalid),
        ) from invalid
    except Exception as failure:
        logger.exception("Failed to enqueue ingest request", error=str(failure))
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to enqueue ingest request.",
        ) from failure

    accepted = {
        "status": "accepted",
        "project_id": project_id,
        "source_type": source_type,
        "source": source,
        "force": force,
        "queued": True,
    }
    return JSONResponse(
        status_code=status.HTTP_202_ACCEPTED,
        content={**accepted, **enqueue_result},
    )

317 

318 

@app.post("/webhooks/projects/{project_id}/{source_type}/{source}")
async def webhook_project_route(
    project_id: str,
    source_type: str,
    source: str,
    request: Request,
    force: bool = False,
    _auth: None = Depends(verify_webhook_token),
) -> JSONResponse:
    """Receive a webhook scoped to a single project's source.

    Authentication is done by the ``verify_webhook_token`` dependency.
    Everything else is delegated to ``_handle_webhook``: rate limiting, JSON
    parsing, source-type validation, and enqueueing.
    """
    return await _handle_webhook(
        project_id=project_id,
        source_type=source_type,
        source=source,
        request=request,
        force=force,
    )

330 

331 

@app.post("/webhooks/{source_type}/{source}")
async def webhook_source_route(
    source_type: str,
    source: str,
    request: Request,
    force: bool = False,
    _auth: None = Depends(verify_webhook_token),
) -> JSONResponse:
    """Receive a webhook for a source, with no project scoping.

    The event is passed to ``_handle_webhook`` with ``project_id=None``. Per
    the route's original description, the event applies across all
    configured projects.
    """
    return await _handle_webhook(
        project_id=None,
        source_type=source_type,
        source=source,
        request=request,
        force=force,
    )