Coverage for src/qdrant_loader/connectors/shared/http/rate_limit.py: 94%

34 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1from __future__ import annotations 

2 

3import asyncio 

4 

5 

class RateLimiter:
    """Simple async rate limiter enforcing a minimum interval between calls.

    The limiter spaces calls evenly: with ``requests_per_interval`` calls
    allowed every ``interval_seconds``, consecutive ``acquire()`` calls are
    kept at least ``interval_seconds / requests_per_interval`` seconds apart.
    It does not allow bursts.

    The factory `per_minute` configures a per-minute limit.

    The instance can be used as an async context manager; entering it is
    equivalent to awaiting ``acquire()``.
    """

    def __init__(
        self,
        *,
        requests_per_interval: int,
        interval_seconds: float = 60.0,
    ) -> None:
        """Configure the limiter.

        Args:
            requests_per_interval: Number of calls allowed per interval.
            interval_seconds: Length of the interval, in seconds.

        Raises:
            ValueError: If ``requests_per_interval`` is not positive, or if
                ``interval_seconds`` is not a positive, finite number.
        """
        if requests_per_interval <= 0:
            raise ValueError("requests_per_interval must be > 0")
        # Written as ``not (x > 0)`` rather than ``x <= 0`` on purpose: NaN
        # fails every comparison, so ``nan <= 0`` would let it through and
        # the resulting NaN interval would silently disable rate limiting.
        if not (interval_seconds > 0):
            raise ValueError("interval_seconds must be > 0")
        # An infinite interval would make the second acquire() sleep forever.
        if interval_seconds == float("inf"):
            raise ValueError("interval_seconds must be finite")

        # Minimum spacing, in seconds, between two consecutive acquisitions.
        self._min_interval: float = interval_seconds / float(requests_per_interval)
        # Serializes callers so only one of them computes/waits at a time.
        self._lock: asyncio.Lock = asyncio.Lock()
        # Event-loop timestamp of the latest acquisition; None until first use.
        self._last_request_time: float | None = None

    @classmethod
    def per_minute(cls, requests_per_minute: int) -> RateLimiter:
        """Build a limiter allowing ``requests_per_minute`` calls per 60 seconds."""
        return cls(requests_per_interval=requests_per_minute, interval_seconds=60.0)

    def _get_delay(self, now: float) -> float:
        """Return how many seconds must still elapse before the next call.

        Args:
            now: Current time on the same clock as ``_last_request_time``.

        Returns:
            ``0.0`` if no call has been recorded yet or the minimum interval
            has already elapsed; otherwise the remaining wait in seconds.
        """
        if self._last_request_time is None:
            return 0.0
        elapsed = now - self._last_request_time
        return max(0.0, self._min_interval - elapsed)

    async def acquire(self) -> None:
        """Wait until a call is permitted, then record the acquisition time.

        The lock is held for the whole wait, so concurrent callers are
        admitted one at a time, each at least the minimum interval after the
        previous one.
        """
        async with self._lock:
            loop = asyncio.get_running_loop()
            now = loop.time()
            delay = self._get_delay(now)
            if delay > 0:
                await asyncio.sleep(delay)
                # Re-read the clock: the sleep may have overshot the request.
                now = loop.time()
            self._last_request_time = now

    async def __aenter__(self) -> RateLimiter:
        """Acquire a slot and return the limiter itself."""
        await self.acquire()
        return self

    async def __aexit__(
        self, exc_type, exc, tb
    ) -> None:  # noqa: D401 (intentional no-op)
        """Exit the context; nothing is released and exceptions propagate."""
        # No cleanup required
        return None