Coverage for src/qdrant_loader/core/pipeline/workers/base_worker.py: 81%
16 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
1"""Base worker interface for pipeline workers."""
3import asyncio
4from abc import ABC, abstractmethod
5from typing import Any
7from qdrant_loader.utils.logging import LoggingConfig
9logger = LoggingConfig.get_logger(__name__)
12class BaseWorker(ABC):
13 """Base class for all pipeline workers."""
15 def __init__(self, max_workers: int, queue_size: int = 1000):
16 self.max_workers = max_workers
17 self.queue_size = queue_size
18 self.semaphore = asyncio.Semaphore(max_workers)
20 @abstractmethod
21 async def process(self, input_data: Any) -> Any:
22 """Process input data and return result.
24 Args:
25 input_data: The data to process
27 Returns:
28 Processed result
29 """
30 pass
32 async def process_with_semaphore(self, input_data: Any) -> Any:
33 """Process input data with semaphore control.
35 Args:
36 input_data: The data to process
38 Returns:
39 Processed result
40 """
41 async with self.semaphore:
42 return await self.process(input_data)