Coverage for src / qdrant_loader / webhooks / auth.py: 62%

114 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1"""Webhook authentication and authorization (WS-6).""" 

2 

3from __future__ import annotations 

4 

5import json 

6import os 

7from functools import lru_cache 

8from typing import Any 

9 

10from fastapi import Header, HTTPException, Query, Request, status 

11 

12from qdrant_loader.utils.logging import LoggingConfig 

13 

14logger = LoggingConfig.get_logger(__name__) 

15 

16WEBHOOK_SECRET_ENV_VAR = "WEBHOOK_SECRET" 

17WEBHOOK_QUERY_PARAM = "token" 

18 

19WEBHOOK_USE_SECRETS_MANAGER = os.getenv( 

20 "WEBHOOK_USE_SECRETS_MANAGER", "false" 

21).lower() in ("true", "1", "yes") 

22 

23WEBHOOK_ENABLE_COGNITO_JWT = os.getenv( 

24 "WEBHOOK_ENABLE_COGNITO_JWT", "false" 

25).lower() in ("true", "1", "yes") 

26 

27WEBHOOK_TRUSTED_PROXY = os.getenv("WEBHOOK_TRUSTED_PROXY", None) 

28 

29COGNITO_REGION = os.getenv("COGNITO_REGION", "") 

30COGNITO_USER_POOL_ID = os.getenv("COGNITO_USER_POOL_ID", "") 

31COGNITO_APP_CLIENT_ID = os.getenv("COGNITO_APP_CLIENT_ID", "") 

32 

33 

34@lru_cache(maxsize=128) 

35def _get_webhook_secret_from_env() -> str: 

36 return os.getenv(WEBHOOK_SECRET_ENV_VAR, "") 

37 

38 

39def _load_project_secrets() -> dict[str, str]: 

40 raw = os.getenv("WEBHOOK_SECRETS", "") 

41 if not raw: 

42 return {} 

43 try: 

44 parsed = json.loads(raw) 

45 if isinstance(parsed, dict): 

46 return {str(k): str(v) for k, v in parsed.items()} 

47 except json.JSONDecodeError: 

48 logger.warning("WEBHOOK_SECRETS is not valid JSON; ignoring") 

49 return {} 

50 

51 

52async def get_webhook_secret( 

53 project_id: str | None = None, 

54 workspace_id: str | None = None, 

55) -> str: 

56 """Resolve webhook secret for a workspace/project.""" 

57 if WEBHOOK_USE_SECRETS_MANAGER: 

58 logger.warning( 

59 "Secrets Manager requested but not yet implemented (WS-6)", 

60 feature_flag="WEBHOOK_USE_SECRETS_MANAGER", 

61 ) 

62 

63 if project_id: 

64 project_secrets = _load_project_secrets() 

65 if project_id in project_secrets: 

66 return project_secrets[project_id] 

67 env_key = f"WEBHOOK_SECRET_{project_id.upper().replace('-', '_')}" 

68 project_env_secret = os.getenv(env_key) 

69 if project_env_secret: 

70 return project_env_secret 

71 

72 _ = workspace_id 

73 return _get_webhook_secret_from_env() 

74 

75 

76def get_client_ip(request: Request) -> str: 

77 """Extract client IP, honoring X-Forwarded-For only from a trusted proxy.""" 

78 forwarded_for = request.headers.get("X-Forwarded-For") 

79 if forwarded_for and WEBHOOK_TRUSTED_PROXY: 

80 if request.client and request.client.host == WEBHOOK_TRUSTED_PROXY: 

81 return forwarded_for.split(",", 1)[0].strip() 

82 logger.warning( 

83 "X-Forwarded-For header from untrusted source; ignoring", 

84 client_host=request.client.host if request.client else None, 

85 trusted_proxy=WEBHOOK_TRUSTED_PROXY, 

86 ) 

87 elif forwarded_for and not WEBHOOK_TRUSTED_PROXY: 

88 logger.debug( 

89 "X-Forwarded-For present but WEBHOOK_TRUSTED_PROXY not set; using request.client", 

90 ) 

91 

92 if request.client: 

93 return request.client.host 

94 return "unknown" 

95 

96 

97def _extract_bearer_token(authorization: str | None) -> str | None: 

98 if not authorization: 

99 return None 

100 auth = authorization.strip() 

101 if auth.lower().startswith("bearer "): 

102 return auth.split(None, 1)[1].strip() 

103 return auth 

104 

105 

106def _looks_like_jwt(token: str) -> bool: 

107 return token.count(".") == 2 

108 

109 

110class CognitoJWTValidator: 

111 """Validate Cognito JWT tokens for application routes (WS-6).""" 

112 

113 @staticmethod 

114 def _issuer() -> str: 

115 if not COGNITO_REGION or not COGNITO_USER_POOL_ID: 

116 raise HTTPException( 

117 status_code=status.HTTP_503_SERVICE_UNAVAILABLE, 

118 detail="Cognito is not configured.", 

119 ) 

120 return ( 

121 f"https://cognito-idp.{COGNITO_REGION}.amazonaws.com/" 

122 f"{COGNITO_USER_POOL_ID}" 

123 ) 

124 

125 @classmethod 

126 async def validate_token(cls, token: str) -> dict[str, Any]: 

127 if not WEBHOOK_ENABLE_COGNITO_JWT: 

128 return {"sub": "local-dev"} 

129 

130 try: 

131 import jwt 

132 from jwt import PyJWKClient 

133 except ImportError as exc: 

134 raise HTTPException( 

135 status_code=status.HTTP_503_SERVICE_UNAVAILABLE, 

136 detail="PyJWT is required for Cognito validation. " 

137 "Install qdrant-loader[server].", 

138 ) from exc 

139 

140 issuer = cls._issuer() 

141 jwks_url = f"{issuer}/.well-known/jwks.json" 

142 

143 try: 

144 jwk_client = PyJWKClient(jwks_url) 

145 signing_key = jwk_client.get_signing_key_from_jwt(token) 

146 decode_kwargs: dict[str, Any] = { 

147 "algorithms": ["RS256"], 

148 "issuer": issuer, 

149 "options": {"verify_aud": bool(COGNITO_APP_CLIENT_ID)}, 

150 } 

151 if COGNITO_APP_CLIENT_ID: 

152 decode_kwargs["audience"] = COGNITO_APP_CLIENT_ID 

153 return jwt.decode(token, signing_key.key, **decode_kwargs) 

154 except Exception as exc: 

155 logger.warning("Cognito JWT validation failed", error=str(exc)) 

156 raise HTTPException( 

157 status_code=status.HTTP_401_UNAUTHORIZED, 

158 detail="Invalid or expired Cognito token.", 

159 ) from exc 

160 

161 @staticmethod 

162 def extract_workspace_id(claims: dict[str, Any]) -> str | None: 

163 return claims.get("custom:workspace_id") or claims.get("workspace") 

164 

165 

166async def verify_webhook_token( 

167 project_id: str | None = None, 

168 webhook_token: str | None = Query(None, alias=WEBHOOK_QUERY_PARAM), 

169 authorization: str | None = Header(None, convert_underscores=False), 

170) -> None: 

171 """Verify webhook access for Jira-compatible endpoints. 

172 

173 Jira Cloud only supports shared-secret query tokens, so webhook routes accept 

174 the project-scoped WEBHOOK_SECRET via Bearer header or ?token= query param. 

175 Cognito JWT is validated when enabled and the bearer token is a JWT. 

176 """ 

177 secret = await get_webhook_secret(project_id=project_id) 

178 token_value = _extract_bearer_token(authorization) or webhook_token 

179 

180 if WEBHOOK_ENABLE_COGNITO_JWT and token_value and _looks_like_jwt(token_value): 

181 await CognitoJWTValidator.validate_token(token_value) 

182 return 

183 

184 if not secret: 

185 logger.error( 

186 "Webhook secret is not configured", 

187 env_var=WEBHOOK_SECRET_ENV_VAR, 

188 project_id=project_id, 

189 ) 

190 raise HTTPException( 

191 status_code=status.HTTP_503_SERVICE_UNAVAILABLE, 

192 detail="Webhook authentication is not configured.", 

193 ) 

194 

195 if webhook_token and not authorization: 

196 logger.warning( 

197 "Using webhook token via URL query param is insecure; prefer Authorization: Bearer header", 

198 param=WEBHOOK_QUERY_PARAM, 

199 ) 

200 

201 if not token_value or token_value != secret: 

202 logger.warning( 

203 "Unauthorized webhook request", 

204 project_id=project_id, 

205 received=bool(token_value), 

206 ) 

207 raise HTTPException( 

208 status_code=status.HTTP_401_UNAUTHORIZED, 

209 detail="Invalid or missing webhook token.", 

210 ) 

211 

212 

213async def verify_cognito_token( 

214 authorization: str | None = Header(None, convert_underscores=False), 

215) -> dict[str, Any]: 

216 """Dependency for non-webhook routes that require Cognito JWT (WS-6).""" 

217 token_value = _extract_bearer_token(authorization) 

218 if not token_value: 

219 raise HTTPException( 

220 status_code=status.HTTP_401_UNAUTHORIZED, 

221 detail="Authorization header required.", 

222 ) 

223 return await CognitoJWTValidator.validate_token(token_value) 

224 

225 

226async def verify_ingest_auth( 

227 project_id: str | None = Query(None), 

228 webhook_token: str | None = Query(None, alias=WEBHOOK_QUERY_PARAM), 

229 authorization: str | None = Header(None, convert_underscores=False), 

230) -> None: 

231 """Authenticate POST /ingest (API clients). 

232 

233 Prefers Authorization: Bearer with Cognito JWT when enabled, otherwise the 

234 same project-scoped webhook secret used for connector webhooks. 

235 """ 

236 await verify_webhook_token( 

237 project_id=project_id, 

238 webhook_token=webhook_token, 

239 authorization=authorization, 

240 )