Coverage for src / qdrant_loader / connectors / base.py: 88%
34 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
import warnings
from collections.abc import AsyncIterator
from datetime import datetime

from qdrant_loader.config.source_config import SourceConfig
from qdrant_loader.core.document import Document
from qdrant_loader.core.file_conversion import FileConversionConfig
class ConnectorConfigurationError(Exception):
    """Raised when a connector's configuration is invalid or access is denied.

    This is a *fatal* error: the pipeline should stop rather than silently
    continuing with 0 documents.
    """
class BaseConnector:
    """Base class for all connectors.

    Subclasses override :meth:`stream_documents` (preferred) or the legacy
    :meth:`get_documents`; each default implementation bridges to the other,
    so callers may use either API.

    Instances work as async context managers: entering sets the
    ``_initialized`` flag and leaving clears it.
    """

    def __init__(self, config: SourceConfig):
        """Store the source configuration and reset connector state.

        Args:
            config: Source configuration for this connector.
        """
        self.config = config
        self._initialized = False
        # Defined up-front so connectors that opt in can read it even when
        # set_file_conversion_config() was never called.
        self._file_conversion_config: FileConversionConfig | None = None

    async def __aenter__(self):
        """Async context manager entry: mark the connector initialized."""
        self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, _exc_tb):
        """Async context manager exit: mark the connector uninitialized.

        Returns ``None``, so an exception raised inside the ``async with``
        body is not suppressed.
        """
        self._initialized = False

    def set_file_conversion_config(
        self, file_conversion_config: FileConversionConfig
    ) -> None:
        """Set file conversion configuration.

        This default implementation stores the configuration for potential
        use by subclasses that choose to honor it.

        Args:
            file_conversion_config: Global file conversion configuration
        """
        # Store on the instance so connectors that opt-in can access it.
        self._file_conversion_config = file_conversion_config

    async def stream_documents(
        self, since: datetime | None = None
    ) -> AsyncIterator[Document]:
        """Stream documents from the source (WS-1 connector contract).

        This default implementation bridges legacy connectors by calling
        :meth:`get_documents` and yielding documents from the returned list.

        Args:
            since: Optional datetime to fetch documents updated after this
                time. The legacy bridge does not forward it, because
                :meth:`get_documents` takes no such argument; connectors that
                support incremental fetching must override this method.

        Yields:
            Document objects from the source.

        Raises:
            NotImplementedError: If the connector does not override
                :meth:`get_documents`, so there is nothing to bridge to.
        """
        # Reaching this body while get_documents() is still the base version
        # means there is no real implementation to bridge to: the base
        # get_documents() would call stream_documents() again and recurse
        # without end. This also covers a subclass whose stream_documents()
        # delegates here through super().
        if type(self).get_documents is BaseConnector.get_documents:
            raise NotImplementedError(
                f"{type(self).__name__} does not implement stream_documents or get_documents"
            )

        for document in await self.get_documents():
            yield document

    async def get_documents(self) -> list[Document]:
        """Get documents from the source (DEPRECATED - use stream_documents).

        Emits a ``DeprecationWarning`` and then collects everything yielded
        by :meth:`stream_documents` into a list.

        Returns:
            All documents produced by :meth:`stream_documents`.
        """
        warnings.warn(
            "BaseConnector.get_documents is deprecated. Implement stream_documents() "
            "or use connector.stream_documents() to avoid materializing the full "
            "document list in memory.",
            DeprecationWarning,
            stacklevel=2,
        )
        return [document async for document in self.stream_documents()]

    async def fetch_by_id(self, entity_id: str) -> Document | None:
        """Fetch a single entity by ID (WS-1 connector contract).

        Connectors that support single-event ingestion must override this method.

        Args:
            entity_id: Identifier of the entity to fetch.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError(f"{type(self).__name__} does not support fetch_by_id")

    async def list_entity_ids(self) -> AsyncIterator[str]:
        """Stream all entity IDs for reconciliation (WS-1 connector contract).

        Yields:
            Entity IDs from the source.

        Raises:
            NotImplementedError: On first iteration, in this base
                implementation.
        """
        # Make this an async generator so callers can use `async for` and
        # receive a NotImplementedError during iteration rather than at call-time.
        if False:  # pragma: no cover - makes this function an async generator
            yield ""
        raise NotImplementedError(
            f"{type(self).__name__} does not support list_entity_ids"
        )