Coverage for src/qdrant_loader/connectors/shared/http/policy.py: 72%

29 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1from __future__ import annotations 

2 

3import asyncio 

4from typing import Any 

5 

6import requests 

7 

8from .client import ( 

9 aiohttp_request_with_retries, 

10 make_request_with_retries_async, 

11) 

12from .rate_limit import RateLimiter 

13 

# HTTP status codes used as the default ``status_forcelist`` (i.e. responses
# that are retried) by the policy helpers in this module: 429 plus the
# 500/502/503/504 server-side errors.
DEFAULT_STATUS_FORCELIST: tuple[int, ...] = (
    429,
    500,
    502,
    503,
    504,
)

15 

16 

async def request_with_policy(
    session: requests.Session,
    method: str,
    url: str,
    *,
    rate_limiter: RateLimiter | None = None,
    retries: int = 3,
    backoff_factor: float = 0.5,
    status_forcelist: tuple[int, ...] = DEFAULT_STATUS_FORCELIST,
    overall_timeout: float | None = None,
    **kwargs: Any,
) -> requests.Response:
    """Perform a requests-based HTTP call with optional rate limiting and retries.

    This helper centralizes our connectors' behavior by combining an optional
    rate limiter with the retry handling of ``make_request_with_retries_async``,
    to which the actual request is delegated.

    Args:
        session: Synchronous requests session used to issue the call.
        method: HTTP method (GET/POST/...).
        url: Target URL.
        rate_limiter: Optional rate limiter. When given, it is entered as an
            async context manager around the whole call, retries included.
        retries: Maximum retry attempts, forwarded to the retry helper.
        backoff_factor: Backoff factor, forwarded to the retry helper.
        status_forcelist: HTTP status codes that should be retried, forwarded
            to the retry helper.
        overall_timeout: Optional timeout in seconds applied with
            ``asyncio.wait_for`` to the complete call (all attempts). The
            timer starts after the rate limiter has been entered, so time
            spent waiting on the limiter is not counted. ``None`` disables it.
        **kwargs: Forwarded unchanged to the retry helper (and from there,
            presumably, to ``requests.Session.request``). Note that a
            ``timeout`` keyword passed here is forwarded as-is and is NOT
            the same thing as ``overall_timeout``.

    Returns:
        The ``requests.Response`` produced by the retry helper.

    Raises:
        asyncio.TimeoutError: If ``overall_timeout`` elapses before the call
            completes.
    """

    async def _attempt() -> requests.Response:
        # Single place where the retrying request is issued.
        return await make_request_with_retries_async(
            session,
            method,
            url,
            retries=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            **kwargs,
        )

    async def _bounded() -> requests.Response:
        # Apply the overall deadline only when the caller asked for one.
        if overall_timeout is None:
            return await _attempt()
        return await asyncio.wait_for(_attempt(), timeout=overall_timeout)

    # No rate limiter: issue the (optionally time-bounded) call directly.
    if rate_limiter is None:
        return await _bounded()

    # Hold the rate limiter for the duration of the call.
    async with rate_limiter:
        return await _bounded()

71 

72 

async def aiohttp_request_with_policy(
    session: Any,
    method: str,
    url: str,
    *,
    rate_limiter: RateLimiter | None = None,
    retries: int = 3,
    backoff_factor: float = 0.5,
    status_forcelist: tuple[int, ...] = DEFAULT_STATUS_FORCELIST,
    overall_timeout: float | None = None,
    **kwargs: Any,
):
    """Perform an aiohttp-based HTTP call with optional rate limiting and retries.

    The request itself is delegated to ``aiohttp_request_with_retries``; this
    wrapper only adds the optional rate limiter and the optional overall
    timeout around it.

    Args:
        session: Session object handed to the retry helper (an
            ``aiohttp.ClientSession`` according to the module's conventions).
        method: HTTP method (GET/POST/...).
        url: Target URL.
        rate_limiter: Optional rate limiter, entered as an async context
            manager around the whole call when provided.
        retries: Maximum retry attempts, forwarded to the retry helper.
        backoff_factor: Backoff factor, forwarded to the retry helper.
        status_forcelist: HTTP status codes that should be retried, forwarded
            to the retry helper.
        overall_timeout: Optional timeout in seconds applied with
            ``asyncio.wait_for`` to the complete call; the timer starts after
            the rate limiter has been entered. ``None`` disables it.
        **kwargs: Forwarded unchanged to the retry helper.

    Returns:
        Whatever ``aiohttp_request_with_retries`` returns.

    Raises:
        asyncio.TimeoutError: If ``overall_timeout`` elapses before the call
            completes.
    """

    async def _attempt():
        # Single place where the retrying request is issued.
        return await aiohttp_request_with_retries(
            session,
            method,
            url,
            retries=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            **kwargs,
        )

    async def _bounded():
        # Apply the overall deadline only when the caller asked for one.
        if overall_timeout is None:
            return await _attempt()
        return await asyncio.wait_for(_attempt(), timeout=overall_timeout)

    # No rate limiter: issue the (optionally time-bounded) call directly.
    if rate_limiter is None:
        return await _bounded()

    # Hold the rate limiter for the duration of the call.
    async with rate_limiter:
        return await _bounded()