Coverage for src/qdrant_loader/core/pipeline/workers/base_worker.py: 81%

16 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1"""Base worker interface for pipeline workers.""" 

2 

3import asyncio 

4from abc import ABC, abstractmethod 

5from typing import Any 

6 

7from qdrant_loader.utils.logging import LoggingConfig 

8 

# Module-level logger, requested from the project's LoggingConfig under this module's name.
9logger = LoggingConfig.get_logger(__name__) 

10 

11 

class BaseWorker(ABC):
    """Base class for all pipeline workers.

    Subclasses implement :meth:`process`; callers that want bounded
    concurrency go through :meth:`process_with_semaphore`, which allows at
    most ``max_workers`` ``process`` calls to run at the same time.

    Attributes:
        max_workers: Number of permits the semaphore is created with.
        queue_size: Stored as given; this base class does not read it.
        semaphore: ``asyncio.Semaphore`` guarding ``process_with_semaphore``.
    """

    def __init__(self, max_workers: int, queue_size: int = 1000):
        """Store the worker limits and create the concurrency semaphore.

        Args:
            max_workers: Maximum number of concurrent ``process`` calls
                allowed through ``process_with_semaphore``. Must be >= 1.
            queue_size: Queue capacity hint kept on the instance.

        Raises:
            ValueError: If ``max_workers`` is less than 1.
        """
        # A semaphore created with 0 permits never lets anyone in, so every
        # process_with_semaphore() call would wait forever; a negative value
        # is rejected by asyncio with a message that does not name the
        # parameter. Fail early and explicitly instead.
        if max_workers < 1:
            raise ValueError(
                f"max_workers must be at least 1, got {max_workers!r}"
            )

        self.max_workers = max_workers
        self.queue_size = queue_size
        self.semaphore = asyncio.Semaphore(max_workers)

    @abstractmethod
    async def process(self, input_data: Any) -> Any:
        """Process input data and return result.

        Args:
            input_data: The data to process

        Returns:
            Processed result
        """

    async def process_with_semaphore(self, input_data: Any) -> Any:
        """Process input data with semaphore control.

        Acquires one permit from ``self.semaphore`` for the duration of the
        ``process`` call; the permit is released when the call returns or
        raises.

        Args:
            input_data: The data to process

        Returns:
            Processed result
        """
        async with self.semaphore:
            return await self.process(input_data)