Coverage for src/qdrant_loader/connectors/shared/http/policy.py: 72%
29 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1from __future__ import annotations
3import asyncio
4from typing import Any
6import requests
8from .client import (
9 aiohttp_request_with_retries,
10 make_request_with_retries_async,
11)
12from .rate_limit import RateLimiter
# HTTP status codes that the policy helpers in this module retry by default
# (passed through as ``status_forcelist`` unless the caller overrides it).
DEFAULT_STATUS_FORCELIST: tuple[int, ...] = (
    429,  # Too Many Requests
    500,  # Internal Server Error
    502,  # Bad Gateway
    503,  # Service Unavailable
    504,  # Gateway Timeout
)
async def request_with_policy(
    session: requests.Session,
    method: str,
    url: str,
    *,
    rate_limiter: RateLimiter | None = None,
    retries: int = 3,
    backoff_factor: float = 0.5,
    status_forcelist: tuple[int, ...] = DEFAULT_STATUS_FORCELIST,
    overall_timeout: float | None = None,
    **kwargs: Any,
) -> requests.Response:
    """Perform a requests-based HTTP call with optional rate limiting and retries.

    This helper centralizes our connectors' behavior by combining an optional
    rate limiter with retry-and-jitter semantics. The request itself and its
    retries are delegated to ``make_request_with_retries_async``.

    Args:
        session: Synchronous requests session (executed via thread offloading).
        method: HTTP method (GET/POST/...).
        url: Target URL.
        rate_limiter: Optional async rate limiter enforcing minimum spacing.
            When given, the call runs inside ``async with rate_limiter``.
        retries: Maximum retry attempts for transient failures.
        backoff_factor: Base backoff factor for exponential backoff with jitter.
        status_forcelist: HTTP status codes that should be retried.
        overall_timeout: Optional timeout in seconds applied with
            ``asyncio.wait_for`` to the retrying call as a whole. The clock
            starts only after the rate limiter (if any) has been entered, so
            time spent waiting on the limiter is not counted. ``None``
            disables the timeout.
        **kwargs: Forwarded to requests.Session.request through the retry
            helper.

    Returns:
        requests.Response

    Raises:
        asyncio.TimeoutError: If ``overall_timeout`` elapses before the
            retrying call completes.
    """

    async def _do_call() -> requests.Response:
        return await make_request_with_retries_async(
            session,
            method,
            url,
            retries=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            **kwargs,
        )

    async def _bounded_call() -> requests.Response:
        # Single place where the optional overall timeout is applied, shared
        # by the rate-limited and the unrestricted paths below.
        # NOTE(review): if the retry helper offloads the blocking request to a
        # thread, a timeout cancels the await but presumably cannot interrupt
        # the in-flight blocking call -- confirm against the client helper.
        if overall_timeout is None:
            return await _do_call()
        return await asyncio.wait_for(_do_call(), timeout=overall_timeout)

    # No rate limiter: issue the request directly.
    if rate_limiter is None:
        return await _bounded_call()

    # Rate limiter provided: the request runs while the limiter is held.
    async with rate_limiter:
        return await _bounded_call()
async def aiohttp_request_with_policy(
    session: Any,
    method: str,
    url: str,
    *,
    rate_limiter: RateLimiter | None = None,
    retries: int = 3,
    backoff_factor: float = 0.5,
    status_forcelist: tuple[int, ...] = DEFAULT_STATUS_FORCELIST,
    overall_timeout: float | None = None,
    **kwargs: Any,
):
    """Perform an aiohttp-based HTTP call with optional rate limiting and retries.

    Counterpart of the requests-based policy helper that operates on an
    aiohttp.ClientSession. The request and its retries are delegated to
    ``aiohttp_request_with_retries``.

    Args:
        session: The aiohttp client session used to issue the request.
        method: HTTP method (GET/POST/...).
        url: Target URL.
        rate_limiter: Optional rate limiter; when given, the call runs inside
            ``async with rate_limiter``.
        retries: Retry count handed to the retry helper.
        backoff_factor: Backoff factor handed to the retry helper.
        status_forcelist: HTTP status codes handed to the retry helper as
            retryable.
        overall_timeout: Optional timeout in seconds applied with
            ``asyncio.wait_for`` around the retrying call; ``None`` disables
            it. The timeout starts after the rate limiter has been entered.
        **kwargs: Forwarded unchanged to the retry helper.

    Returns:
        Whatever ``aiohttp_request_with_retries`` returns.
    """

    async def _perform():
        result = await aiohttp_request_with_retries(
            session,
            method,
            url,
            retries=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            **kwargs,
        )
        return result

    async def _perform_with_deadline():
        # Apply the optional overall timeout in one place for both paths.
        if overall_timeout is None:
            return await _perform()
        return await asyncio.wait_for(_perform(), timeout=overall_timeout)

    # Guard clause: without a limiter there is nothing to acquire.
    if rate_limiter is None:
        return await _perform_with_deadline()

    async with rate_limiter:
        return await _perform_with_deadline()