Coverage for src/qdrant_loader/connectors/shared/http/client.py: 74%

46 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1from __future__ import annotations 

2 

3import asyncio 

4import random 

5from typing import Any 

6 

7import requests 

8 

9try: # Optional import for async HTTP client 

10 import aiohttp # type: ignore 

11except Exception: # pragma: no cover - optional import for http client 

12 aiohttp = None # type: ignore 

13 

14 

async def make_request_async(
    session: requests.Session,
    method: str,
    url: str,
    **kwargs: Any,
) -> requests.Response:
    """Run ``session.request`` in a worker thread and return its response.

    The blocking ``requests`` call is handed to ``asyncio.to_thread`` so the
    event loop stays free while the HTTP round trip is in flight.

    Args:
        session: Object whose ``request(method, url, **kwargs)`` is invoked.
        method: HTTP verb forwarded unchanged to ``session.request``.
        url: Target URL forwarded unchanged to ``session.request``.
        **kwargs: Extra keyword arguments forwarded to ``session.request``.

    Returns:
        Whatever ``session.request`` returns. No status checking or body
        decoding happens here; the caller decides whether to call
        ``response.raise_for_status()`` or ``response.json()``.
    """
    # ``to_thread`` forwards positional and keyword arguments to the callable,
    # so no wrapper closure is needed.
    return await asyncio.to_thread(session.request, method, url, **kwargs)

30 

31 

async def make_request_with_retries_async(
    session: requests.Session,
    method: str,
    url: str,
    *,
    retries: int = 3,
    backoff_factor: float = 0.5,
    status_forcelist: tuple[int, ...] = (429, 500, 502, 503, 504),
    **kwargs: Any,
) -> requests.Response:
    """Make an async HTTP request with exponential backoff and jitter.

    A request is retried when it raises ``requests.RequestException`` or when
    the response status code is listed in ``status_forcelist``. Before retry
    number ``k`` (1-based) the coroutine sleeps for
    ``backoff_factor * 2 ** (k - 1)`` seconds plus a random jitter in
    ``[0, 0.25]`` seconds.

    Args:
        session: Session passed through to ``make_request_async``.
        method: HTTP verb.
        url: Target URL.
        retries: Maximum number of retries after the first attempt. Values
            ``<= 0`` mean a single attempt.
        backoff_factor: Base delay, in seconds, of the exponential backoff.
        status_forcelist: Status codes that trigger a retry.
        **kwargs: Extra keyword arguments forwarded to the request.

    Returns:
        The first response whose status is not in ``status_forcelist``, or the
        last response received once the retry budget is exhausted (it may
        still carry a retryable status; the caller decides how to treat it).

    Raises:
        requests.RequestException: If the request still fails after
            ``retries`` retries.
    """
    attempt = 0
    while True:
        try:
            response = await make_request_async(session, method, url, **kwargs)
        except requests.RequestException:
            if attempt >= retries:
                raise
        else:
            if response.status_code not in status_forcelist or attempt >= retries:
                return response
            # This response is being thrown away: close it so a streamed
            # (unconsumed) body does not keep its pooled connection checked
            # out while we back off and retry.
            response.close()

        # Shared backoff path for both retryable statuses and exceptions.
        attempt += 1
        sleep_s = backoff_factor * (2 ** (attempt - 1)) + random.uniform(0, 0.25)
        await asyncio.sleep(sleep_s)

64 

65 

async def aiohttp_request_with_retries(
    session: aiohttp.ClientSession,
    method: str,
    url: str,
    *,
    retries: int = 3,
    backoff_factor: float = 0.5,
    status_forcelist: tuple[int, ...] = (429, 500, 502, 503, 504),
    **kwargs: Any,
):
    """Issue an `aiohttp` request with exponential backoff and jitter.

    A request is retried when it raises any ``Exception`` or when the response
    status is listed in ``status_forcelist``. Before retry number ``k``
    (1-based) the coroutine sleeps for ``backoff_factor * 2 ** (k - 1)``
    seconds plus a random jitter in ``[0, 0.25]`` seconds.

    Args:
        session: Client session. If it exposes an attribute named after the
            lower-cased ``method`` (``get``, ``post``, ...), that attribute is
            called as ``attr(url, **kwargs)``; otherwise
            ``session.request(method, url, **kwargs)`` is used.
        method: HTTP verb.
        url: Target URL.
        retries: Maximum number of retries after the first attempt. Values
            ``<= 0`` mean a single attempt.
        backoff_factor: Base delay, in seconds, of the exponential backoff.
        status_forcelist: Response statuses that trigger a retry.
        **kwargs: Extra keyword arguments forwarded to the request call.

    Returns:
        The first response whose status is not in ``status_forcelist``, or the
        last response received once the retry budget is exhausted. The
        response is returned unreleased; the caller owns it.

    Raises:
        Exception: Whatever the final attempt raised, re-raised unchanged once
            the retry budget is exhausted.
    """
    import inspect  # local import keeps this block self-contained

    attempt = 0
    while True:
        try:
            # Prefer method-specific calls (get/post/...) so test doubles that
            # patch e.g. `session.get` still work. Fallback to `request`.
            # The branch is chosen by whether the lookup succeeded, not by an
            # identity check against `session.request`: every attribute access
            # on a real session builds a fresh bound-method object, so such an
            # `is` comparison is never true and the fallback would be invoked
            # without its `method` argument.
            verb_call = getattr(session, method.lower(), None)
            if verb_call is not None:
                response = await verb_call(url, **kwargs)
            else:
                response = await session.request(method, url, **kwargs)

            if response.status not in status_forcelist or attempt >= retries:
                return response

            # Discard the retryable response before backing off.
            # NOTE(review): `release()` is awaited only when it returns an
            # awaitable; presumably some aiohttp versions return a plain
            # value instead - confirm against the pinned version.
            released = response.release()
            if inspect.isawaitable(released):
                await released
        except Exception:  # pragma: no cover - exercised in integration
            if attempt >= retries:
                raise

        # Shared backoff path for both retryable statuses and exceptions.
        attempt += 1
        sleep_s = backoff_factor * (2 ** (attempt - 1)) + random.uniform(0, 0.25)
        await asyncio.sleep(sleep_s)