Coverage for src/qdrant_loader/core/pipeline/source_processor.py: 37%
35 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
1"""Source processor for handling different source types."""
3import asyncio
4from collections.abc import Mapping
6from qdrant_loader.config.source_config import SourceConfig
7from qdrant_loader.connectors.base import BaseConnector
8from qdrant_loader.core.document import Document
9from qdrant_loader.core.file_conversion import FileConversionConfig
10from qdrant_loader.utils.logging import LoggingConfig
# Module-level logger obtained from LoggingConfig.get_logger, named after this module (__name__).
12logger = LoggingConfig.get_logger(__name__)
class SourceProcessor:
    """Gathers documents from every configured source of a given source type."""

    def __init__(
        self,
        shutdown_event: asyncio.Event | None = None,
        file_conversion_config: FileConversionConfig | None = None,
    ):
        """Store the shutdown signal and the optional file conversion settings.

        Args:
            shutdown_event: Event checked before each source is processed; a
                new event is created when none is supplied.
            file_conversion_config: Configuration handed to connectors that
                accept it, or None to never hand one over.
        """
        self.file_conversion_config = file_conversion_config
        self.shutdown_event = shutdown_event if shutdown_event else asyncio.Event()

    def _wants_file_conversion(
        self, connector: BaseConnector, source_config: SourceConfig
    ) -> bool:
        """Tell whether the stored conversion config should be given to *connector*.

        All of the following must hold, checked in this order: a conversion
        config is stored on the processor, the connector exposes
        ``set_file_conversion_config``, and the source config has a truthy
        ``enable_file_conversion`` attribute.
        """
        if not self.file_conversion_config:
            return False
        if not hasattr(connector, "set_file_conversion_config"):
            return False
        if not hasattr(source_config, "enable_file_conversion"):
            return False
        return bool(source_config.enable_file_conversion)

    async def process_source_type(
        self,
        source_configs: Mapping[str, SourceConfig],
        connector_class: type[BaseConnector],
        source_type: str,
    ) -> list[Document]:
        """Collect the documents of every source belonging to one source type.

        Sources are handled one after another. A source that raises is logged
        and skipped so the remaining sources are still processed. Once the
        shutdown event is set, no further source is started.

        Args:
            source_configs: Mapping of source name to source configuration
            connector_class: The connector class to instantiate for each source
            source_type: The type of source being processed (used in log output)

        Returns:
            List of documents gathered from all successfully processed sources
        """
        logger.debug(f"Processing {source_type} sources: {list(source_configs.keys())}")

        collected = []

        for source_name, source_config in source_configs.items():
            # Stop before starting a new source once shutdown has been requested.
            if self.shutdown_event.is_set():
                logger.info(
                    f"Shutdown requested, skipping {source_type} source: {source_name}"
                )
                break

            try:
                logger.debug(f"Processing {source_type} source: {source_name}")

                connector = connector_class(source_config)

                # Hand over the conversion settings before the connector is entered.
                if self._wants_file_conversion(connector, source_config):
                    logger.debug(
                        f"Setting file conversion config for {source_type} source: {source_name}"
                    )
                    connector.set_file_conversion_config(self.file_conversion_config)

                # The connector is entered as an async context manager around the fetch.
                async with connector:
                    fetched = await connector.get_documents()

                    logger.debug(
                        f"Retrieved {len(fetched)} documents from {source_type} source: {source_name}"
                    )
                    collected.extend(fetched)

            except Exception as exc:
                # A failing source must not prevent the remaining ones from running.
                logger.error(
                    f"Failed to process {source_type} source {source_name}: {exc}",
                    exc_info=True,
                )

        if collected:
            logger.info(
                f"📥 {source_type}: {len(collected)} documents from {len(source_configs)} sources"
            )
        return collected