Coverage for src/qdrant_loader/connectors/shared/http/client.py: 74%
46 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1from __future__ import annotations
3import asyncio
4import random
5from typing import Any
7import requests
9try: # Optional import for async HTTP client
10 import aiohttp # type: ignore
11except Exception: # pragma: no cover - optional import for http client
12 aiohttp = None # type: ignore
async def make_request_async(
    session: requests.Session,
    method: str,
    url: str,
    **kwargs: Any,
) -> requests.Response:
    """Run ``session.request(method, url, **kwargs)`` off the event loop.

    The blocking call is handed to ``asyncio.to_thread`` and its result is
    returned as-is. No status checking or body decoding happens here, so
    callers invoke ``response.raise_for_status()`` / ``.json()`` themselves
    when they need to.
    """
    # to_thread forwards positional and keyword arguments to the callable,
    # so no wrapper closure is needed.
    return await asyncio.to_thread(session.request, method, url, **kwargs)
async def make_request_with_retries_async(
    session: requests.Session,
    method: str,
    url: str,
    *,
    retries: int = 3,
    backoff_factor: float = 0.5,
    status_forcelist: tuple[int, ...] = (429, 500, 502, 503, 504),
    **kwargs: Any,
) -> requests.Response:
    """Make an async HTTP request with exponential backoff and jitter.

    The request is issued through ``make_request_async``. It is retried when
    the response status is in ``status_forcelist`` or when a
    ``requests.RequestException`` is raised; any other exception propagates
    immediately.

    Args:
        session: Session used to issue the request.
        method: HTTP method name passed to ``session.request``.
        url: Target URL.
        retries: Maximum number of retries after the first attempt.
        backoff_factor: Base delay in seconds; the delay before retry ``k``
            (1-based) is ``backoff_factor * 2 ** (k - 1)`` plus jitter.
        status_forcelist: Status codes that trigger a retry.
        **kwargs: Forwarded unchanged to ``session.request``.

    Returns:
        The first response whose status is not in ``status_forcelist``, or
        the last response received once the retries are exhausted.

    Raises:
        requests.RequestException: If the request still fails after the
            retries are exhausted.
    """
    attempt = 0
    while True:
        try:
            response = await make_request_async(session, method, url, **kwargs)
        except requests.RequestException:
            if attempt >= retries:
                raise
        else:
            if response.status_code not in status_forcelist or attempt >= retries:
                return response
            # The response is being discarded: close it so a streamed
            # (``stream=True``) connection is handed back to the pool instead
            # of lingering until garbage collection.
            response.close()

        # Shared retry tail: exponential backoff plus up to 250 ms of jitter.
        attempt += 1
        sleep_s = backoff_factor * (2 ** (attempt - 1)) + random.uniform(0, 0.25)
        await asyncio.sleep(sleep_s)
async def aiohttp_request_with_retries(
    session: aiohttp.ClientSession,
    method: str,
    url: str,
    *,
    retries: int = 3,
    backoff_factor: float = 0.5,
    status_forcelist: tuple[int, ...] = (429, 500, 502, 503, 504),
    **kwargs: Any,
):
    """Issue an `aiohttp` request with exponential backoff and jitter.

    Args:
        session: Client session used to issue the request.
        method: HTTP method name (case-insensitive for the verb helpers).
        url: Target URL.
        retries: Maximum number of retries after the first attempt. At least
            one request is always issued, even when this is zero or negative.
        backoff_factor: Base delay in seconds; the delay before retry ``k``
            (1-based) is ``backoff_factor * 2 ** (k - 1)`` plus jitter.
        status_forcelist: Response statuses that trigger a retry.
        **kwargs: Forwarded unchanged to the session call.

    Returns:
        The first response whose status is not in ``status_forcelist``, or
        the last response received once the retries are exhausted.

    Raises:
        Exception: Whatever the session call (or releasing a retried
            response) raised on the final attempt.
    """
    # Only these names are treated as per-verb session helpers. Looking up an
    # arbitrary ``method.lower()`` could resolve to unrelated attributes
    # (e.g. ``close``) and call them with the URL.
    verb_helpers = ("get", "post", "put", "patch", "delete", "head", "options")

    attempt = 0
    while True:
        try:
            # Prefer method-specific calls (get/post/...) so test doubles that
            # patch e.g. `session.get` still work. Fallback to `request`.
            verb = method.lower()
            helper = getattr(session, verb, None) if verb in verb_helpers else None
            if helper is not None:
                response = await helper(url, **kwargs)
            else:
                # Decide by branch, not by ``requester is session.request``:
                # bound methods are re-created on every attribute access, so
                # that identity test is False on real session objects.
                response = await session.request(method, url, **kwargs)

            if response.status not in status_forcelist or attempt >= retries:
                return response
            # The response is being discarded; release it before retrying.
            await response.release()
        except Exception:  # pragma: no cover - exercised in integration
            if attempt >= retries:
                raise

        # Shared retry tail: exponential backoff plus up to 250 ms of jitter.
        attempt += 1
        sleep_s = backoff_factor * (2 ** (attempt - 1)) + random.uniform(0, 0.25)
        await asyncio.sleep(sleep_s)