Coverage for src/qdrant_loader/connectors/base.py: 88%

34 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1import warnings 

2from collections.abc import AsyncIterator 

3from datetime import datetime 

4 

5from qdrant_loader.config.source_config import SourceConfig 

6from qdrant_loader.core.document import Document 

7from qdrant_loader.core.file_conversion import FileConversionConfig 

8 

9 

class ConnectorConfigurationError(Exception):
    """Signal that a connector is misconfigured or was denied access.

    Treat this as a *fatal* condition: the pipeline is expected to abort
    instead of quietly carrying on with 0 documents.
    """

16 

17 

class BaseConnector:
    """Base class for all connectors.

    A subclass only needs to override one of :meth:`stream_documents`
    (preferred) or the deprecated :meth:`get_documents`; each default
    implementation is written in terms of the other. :meth:`fetch_by_id`
    and :meth:`list_entity_ids` are optional and raise
    ``NotImplementedError`` unless overridden.
    """

    def __init__(self, config: SourceConfig):
        """Store the source configuration.

        Args:
            config: Source configuration for this connector.
        """
        self.config = config
        self._initialized = False
        # Defined up front so the attribute can be read before (or without)
        # set_file_conversion_config() having been called.
        self._file_conversion_config: FileConversionConfig | None = None

    async def __aenter__(self):
        """Async context manager entry: mark the connector as initialized."""
        self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, _exc_tb):
        """Async context manager exit: mark the connector as uninitialized.

        Returns ``None``, so an exception raised inside the ``async with``
        body is not suppressed.
        """
        self._initialized = False

    def set_file_conversion_config(
        self, file_conversion_config: FileConversionConfig
    ) -> None:
        """Set file conversion configuration.

        This default implementation stores the configuration for potential
        use by subclasses that choose to honor it.

        Args:
            file_conversion_config: Global file conversion configuration
        """
        # Store on the instance so connectors that opt-in can access it.
        self._file_conversion_config = file_conversion_config

    async def stream_documents(
        self, since: datetime | None = None
    ) -> AsyncIterator[Document]:
        """Stream documents from the source (WS-1 connector contract).

        This default implementation bridges legacy connectors by calling
        :meth:`get_documents` and yielding documents from the returned list.

        Args:
            since: Optional datetime to fetch documents updated after this
                time. The default bridge does not forward it, because
                :meth:`get_documents` takes no arguments.

        Yields:
            Document objects from the source.

        Raises:
            NotImplementedError: If the class does not override
                :meth:`get_documents`, so there is nothing to bridge to.
        """
        # The inherited get_documents() is itself built on stream_documents().
        # If it has not been overridden, bridging to it would recurse without
        # bound -- including when a subclass override of stream_documents()
        # delegates here through super(). Fail fast instead.
        if type(self).get_documents is BaseConnector.get_documents:
            raise NotImplementedError(
                f"{type(self).__name__} does not implement stream_documents or get_documents"
            )

        documents = await self.get_documents()
        for document in documents:
            yield document

    async def get_documents(self) -> list[Document]:
        """Get documents from the source (DEPRECATED - use stream_documents).

        Emits a ``DeprecationWarning`` and then collects everything yielded
        by :meth:`stream_documents` into a list.

        Returns:
            All documents produced by :meth:`stream_documents`.
        """
        warnings.warn(
            "BaseConnector.get_documents is deprecated. Implement stream_documents() "
            "or use connector.stream_documents() to avoid materializing the full "
            "document list in memory.",
            DeprecationWarning,
            stacklevel=2,
        )
        return [document async for document in self.stream_documents()]

    async def fetch_by_id(self, entity_id: str) -> Document | None:
        """Fetch a single entity by ID (WS-1 connector contract).

        Connectors that support single-event ingestion must override this method.

        Args:
            entity_id: Identifier of the entity to fetch.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError(f"{type(self).__name__} does not support fetch_by_id")

    async def list_entity_ids(self) -> AsyncIterator[str]:
        """Stream all entity IDs for reconciliation (WS-1 connector contract).

        Yields:
            Entity IDs from the source.

        Raises:
            NotImplementedError: On first iteration, in this base
                implementation.
        """
        # Make this an async generator so callers can use `async for` and
        # receive a NotImplementedError during iteration rather than at call-time.
        if False:  # pragma: no cover - makes this function an async generator
            yield ""
        raise NotImplementedError(
            f"{type(self).__name__} does not support list_entity_ids"
        )