Coverage for src / qdrant_loader / webhooks / auth.py: 62%
114 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1"""Webhook authentication and authorization (WS-6)."""
3from __future__ import annotations
5import json
6import os
7from functools import lru_cache
8from typing import Any
10from fastapi import Header, HTTPException, Query, Request, status
12from qdrant_loader.utils.logging import LoggingConfig
14logger = LoggingConfig.get_logger(__name__)
16WEBHOOK_SECRET_ENV_VAR = "WEBHOOK_SECRET"
17WEBHOOK_QUERY_PARAM = "token"
19WEBHOOK_USE_SECRETS_MANAGER = os.getenv(
20 "WEBHOOK_USE_SECRETS_MANAGER", "false"
21).lower() in ("true", "1", "yes")
23WEBHOOK_ENABLE_COGNITO_JWT = os.getenv(
24 "WEBHOOK_ENABLE_COGNITO_JWT", "false"
25).lower() in ("true", "1", "yes")
27WEBHOOK_TRUSTED_PROXY = os.getenv("WEBHOOK_TRUSTED_PROXY", None)
29COGNITO_REGION = os.getenv("COGNITO_REGION", "")
30COGNITO_USER_POOL_ID = os.getenv("COGNITO_USER_POOL_ID", "")
31COGNITO_APP_CLIENT_ID = os.getenv("COGNITO_APP_CLIENT_ID", "")
34@lru_cache(maxsize=128)
35def _get_webhook_secret_from_env() -> str:
36 return os.getenv(WEBHOOK_SECRET_ENV_VAR, "")
39def _load_project_secrets() -> dict[str, str]:
40 raw = os.getenv("WEBHOOK_SECRETS", "")
41 if not raw:
42 return {}
43 try:
44 parsed = json.loads(raw)
45 if isinstance(parsed, dict):
46 return {str(k): str(v) for k, v in parsed.items()}
47 except json.JSONDecodeError:
48 logger.warning("WEBHOOK_SECRETS is not valid JSON; ignoring")
49 return {}
52async def get_webhook_secret(
53 project_id: str | None = None,
54 workspace_id: str | None = None,
55) -> str:
56 """Resolve webhook secret for a workspace/project."""
57 if WEBHOOK_USE_SECRETS_MANAGER:
58 logger.warning(
59 "Secrets Manager requested but not yet implemented (WS-6)",
60 feature_flag="WEBHOOK_USE_SECRETS_MANAGER",
61 )
63 if project_id:
64 project_secrets = _load_project_secrets()
65 if project_id in project_secrets:
66 return project_secrets[project_id]
67 env_key = f"WEBHOOK_SECRET_{project_id.upper().replace('-', '_')}"
68 project_env_secret = os.getenv(env_key)
69 if project_env_secret:
70 return project_env_secret
72 _ = workspace_id
73 return _get_webhook_secret_from_env()
76def get_client_ip(request: Request) -> str:
77 """Extract client IP, honoring X-Forwarded-For only from a trusted proxy."""
78 forwarded_for = request.headers.get("X-Forwarded-For")
79 if forwarded_for and WEBHOOK_TRUSTED_PROXY:
80 if request.client and request.client.host == WEBHOOK_TRUSTED_PROXY:
81 return forwarded_for.split(",", 1)[0].strip()
82 logger.warning(
83 "X-Forwarded-For header from untrusted source; ignoring",
84 client_host=request.client.host if request.client else None,
85 trusted_proxy=WEBHOOK_TRUSTED_PROXY,
86 )
87 elif forwarded_for and not WEBHOOK_TRUSTED_PROXY:
88 logger.debug(
89 "X-Forwarded-For present but WEBHOOK_TRUSTED_PROXY not set; using request.client",
90 )
92 if request.client:
93 return request.client.host
94 return "unknown"
97def _extract_bearer_token(authorization: str | None) -> str | None:
98 if not authorization:
99 return None
100 auth = authorization.strip()
101 if auth.lower().startswith("bearer "):
102 return auth.split(None, 1)[1].strip()
103 return auth
106def _looks_like_jwt(token: str) -> bool:
107 return token.count(".") == 2
110class CognitoJWTValidator:
111 """Validate Cognito JWT tokens for application routes (WS-6)."""
113 @staticmethod
114 def _issuer() -> str:
115 if not COGNITO_REGION or not COGNITO_USER_POOL_ID:
116 raise HTTPException(
117 status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
118 detail="Cognito is not configured.",
119 )
120 return (
121 f"https://cognito-idp.{COGNITO_REGION}.amazonaws.com/"
122 f"{COGNITO_USER_POOL_ID}"
123 )
125 @classmethod
126 async def validate_token(cls, token: str) -> dict[str, Any]:
127 if not WEBHOOK_ENABLE_COGNITO_JWT:
128 return {"sub": "local-dev"}
130 try:
131 import jwt
132 from jwt import PyJWKClient
133 except ImportError as exc:
134 raise HTTPException(
135 status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
136 detail="PyJWT is required for Cognito validation. "
137 "Install qdrant-loader[server].",
138 ) from exc
140 issuer = cls._issuer()
141 jwks_url = f"{issuer}/.well-known/jwks.json"
143 try:
144 jwk_client = PyJWKClient(jwks_url)
145 signing_key = jwk_client.get_signing_key_from_jwt(token)
146 decode_kwargs: dict[str, Any] = {
147 "algorithms": ["RS256"],
148 "issuer": issuer,
149 "options": {"verify_aud": bool(COGNITO_APP_CLIENT_ID)},
150 }
151 if COGNITO_APP_CLIENT_ID:
152 decode_kwargs["audience"] = COGNITO_APP_CLIENT_ID
153 return jwt.decode(token, signing_key.key, **decode_kwargs)
154 except Exception as exc:
155 logger.warning("Cognito JWT validation failed", error=str(exc))
156 raise HTTPException(
157 status_code=status.HTTP_401_UNAUTHORIZED,
158 detail="Invalid or expired Cognito token.",
159 ) from exc
161 @staticmethod
162 def extract_workspace_id(claims: dict[str, Any]) -> str | None:
163 return claims.get("custom:workspace_id") or claims.get("workspace")
166async def verify_webhook_token(
167 project_id: str | None = None,
168 webhook_token: str | None = Query(None, alias=WEBHOOK_QUERY_PARAM),
169 authorization: str | None = Header(None, convert_underscores=False),
170) -> None:
171 """Verify webhook access for Jira-compatible endpoints.
173 Jira Cloud only supports shared-secret query tokens, so webhook routes accept
174 the project-scoped WEBHOOK_SECRET via Bearer header or ?token= query param.
175 Cognito JWT is validated when enabled and the bearer token is a JWT.
176 """
177 secret = await get_webhook_secret(project_id=project_id)
178 token_value = _extract_bearer_token(authorization) or webhook_token
180 if WEBHOOK_ENABLE_COGNITO_JWT and token_value and _looks_like_jwt(token_value):
181 await CognitoJWTValidator.validate_token(token_value)
182 return
184 if not secret:
185 logger.error(
186 "Webhook secret is not configured",
187 env_var=WEBHOOK_SECRET_ENV_VAR,
188 project_id=project_id,
189 )
190 raise HTTPException(
191 status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
192 detail="Webhook authentication is not configured.",
193 )
195 if webhook_token and not authorization:
196 logger.warning(
197 "Using webhook token via URL query param is insecure; prefer Authorization: Bearer header",
198 param=WEBHOOK_QUERY_PARAM,
199 )
201 if not token_value or token_value != secret:
202 logger.warning(
203 "Unauthorized webhook request",
204 project_id=project_id,
205 received=bool(token_value),
206 )
207 raise HTTPException(
208 status_code=status.HTTP_401_UNAUTHORIZED,
209 detail="Invalid or missing webhook token.",
210 )
213async def verify_cognito_token(
214 authorization: str | None = Header(None, convert_underscores=False),
215) -> dict[str, Any]:
216 """Dependency for non-webhook routes that require Cognito JWT (WS-6)."""
217 token_value = _extract_bearer_token(authorization)
218 if not token_value:
219 raise HTTPException(
220 status_code=status.HTTP_401_UNAUTHORIZED,
221 detail="Authorization header required.",
222 )
223 return await CognitoJWTValidator.validate_token(token_value)
226async def verify_ingest_auth(
227 project_id: str | None = Query(None),
228 webhook_token: str | None = Query(None, alias=WEBHOOK_QUERY_PARAM),
229 authorization: str | None = Header(None, convert_underscores=False),
230) -> None:
231 """Authenticate POST /ingest (API clients).
233 Prefers Authorization: Bearer with Cognito JWT when enabled, otherwise the
234 same project-scoped webhook secret used for connector webhooks.
235 """
236 await verify_webhook_token(
237 project_id=project_id,
238 webhook_token=webhook_token,
239 authorization=authorization,
240 )