Coverage for src/qdrant_loader/connectors/localfile/connector.py: 66%
74 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
1import os
2from datetime import UTC, datetime
3from urllib.parse import urlparse
5from qdrant_loader.connectors.base import BaseConnector
6from qdrant_loader.core.document import Document
7from qdrant_loader.core.file_conversion import (
8 FileConverter,
9 FileDetector,
10 FileConversionConfig,
11 FileConversionError,
12)
13from qdrant_loader.utils.logging import LoggingConfig
15from .config import LocalFileConfig
16from .file_processor import LocalFileFileProcessor
17from .metadata_extractor import LocalFileMetadataExtractor
class LocalFileConnector(BaseConnector):
    """Connector for ingesting local files.

    The directory to ingest is taken from the path component of
    ``config.base_url`` (a ``file://`` URL). Every file below that directory
    that the file processor accepts is turned into a ``Document``. When
    ``config.enable_file_conversion`` is set and a converter has been supplied
    through :meth:`set_file_conversion_config`, files the detector reports as
    convertible are converted instead of being read as text.
    """

    def __init__(self, config: LocalFileConfig):
        """Initialize the connector.

        Args:
            config: Local file source configuration. ``base_url``,
                ``enable_file_conversion`` and ``source`` are read from it.
        """
        super().__init__(config)
        self.config = config
        self.logger = LoggingConfig.get_logger(__name__)

        # Parse base_url (file://...) to get the local path. The URL path is
        # decoded so that escapes such as %20 match the real directory name.
        parsed = urlparse(str(config.base_url))
        self.base_path = self._url_path_to_local_path(parsed.path)
        self.metadata_extractor = LocalFileMetadataExtractor(self.base_path)
        self._initialized = True

        # Initialize file conversion components if enabled. The converter
        # itself is only created later, in set_file_conversion_config().
        self.file_converter = None
        self.file_detector = None
        if self.config.enable_file_conversion:
            self.logger.debug("File conversion enabled for LocalFile connector")
            self.file_detector = FileDetector()
            # Build the file processor once, already aware of the detector.
            self.file_processor = LocalFileFileProcessor(
                config, self.base_path, self.file_detector
            )
        else:
            self.logger.debug("File conversion disabled for LocalFile connector")
            self.file_processor = LocalFileFileProcessor(config, self.base_path)

    @staticmethod
    def _url_path_to_local_path(url_path: str) -> str:
        """Turn the path component of a ``file://`` URL into a local path.

        Args:
            url_path: The ``path`` attribute returned by ``urlparse``.

        Returns:
            The percent-decoded path. On Windows a leading slash in front of
            a drive letter (``/C:/dir`` from ``file:///C:/dir``) is removed.
        """
        # Function-scope import keeps this block independent of the
        # module-level import list.
        from urllib.parse import unquote

        local_path = unquote(url_path)
        # Only strip on Windows: "/a:/x" is a legitimate absolute POSIX path.
        if (
            os.name == "nt"
            and len(local_path) >= 3
            and local_path[0] == "/"
            and local_path[1].isalpha()
            and local_path[2] == ":"
        ):
            local_path = local_path[1:]
        return local_path

    def set_file_conversion_config(self, file_conversion_config: FileConversionConfig):
        """Set file conversion configuration from global config.

        Does nothing when file conversion is disabled for this connector.

        Args:
            file_conversion_config: Global file conversion configuration
        """
        if self.config.enable_file_conversion:
            self.file_converter = FileConverter(file_conversion_config)
            self.logger.debug("File converter initialized with global config")

    async def get_documents(self) -> list[Document]:
        """Get all documents from the local file source.

        Walks ``self.base_path`` recursively. Files rejected by the file
        processor are skipped. A file whose processing raises is logged and
        skipped, so one bad file does not abort the whole run.

        Note that the file system access is synchronous even though the
        method is declared ``async``.

        Returns:
            One ``Document`` per successfully processed file.
        """
        documents = []
        for root, _, files in os.walk(self.base_path):
            for file in files:
                file_path = os.path.join(root, file)
                if not self.file_processor.should_process_file(file_path):
                    continue
                try:
                    documents.append(self._build_document(file_path))
                except Exception as e:
                    # Deliberate best-effort boundary: report and move on.
                    self.logger.error(
                        "Failed to process file", file_path=file_path, error=str(e)
                    )
        return documents

    def _build_document(self, file_path: str) -> Document:
        """Create the ``Document`` for a single file.

        Args:
            file_path: Path of the file, as produced by joining the walked
                directory with the file name.

        Returns:
            The document built from the file's (possibly converted) content.
        """
        # Get relative path from base directory (used for logging only)
        rel_path = os.path.relpath(file_path, self.base_path)
        file_name = os.path.basename(file_path)
        # File extension, lower-cased and without the dot
        extension = os.path.splitext(file_name)[1].lower().lstrip(".")

        # Conversion requires the feature flag, a detector, a converter that
        # has already been configured, and a file the detector supports.
        converter = self.file_converter
        detector = self.file_detector
        needs_conversion = bool(
            self.config.enable_file_conversion
            and detector
            and converter
            and detector.is_supported_for_conversion(file_path)
        )

        conversion_method = None
        conversion_failed = False
        if needs_conversion:
            self.logger.debug("File needs conversion", file_path=rel_path)
            content, conversion_method, conversion_failed = self._convert_content(
                converter, file_path, rel_path
            )
            content_type = "md"  # Converted and fallback content are markdown
        else:
            # Read file content normally; undecodable bytes are dropped.
            with open(file_path, encoding="utf-8", errors="ignore") as f:
                content = f.read()
            content_type = extension

        # Get file modification time as a timezone-aware UTC timestamp
        file_mtime = os.path.getmtime(file_path)
        updated_at = datetime.fromtimestamp(file_mtime, tz=UTC)

        metadata = self.metadata_extractor.extract_all_metadata(file_path, content)

        # Add file conversion metadata if applicable
        if needs_conversion:
            metadata.update(
                {
                    "conversion_method": conversion_method,
                    "conversion_failed": conversion_failed,
                    "original_file_type": extension,
                }
            )

        self.logger.debug(f"Processed local file: {rel_path}")

        return Document(
            title=file_name,
            content=content,
            content_type=content_type,
            metadata=metadata,
            source_type="localfile",
            source=self.config.source,
            url=f"file://{os.path.realpath(file_path)}",
            is_deleted=False,
            updated_at=updated_at,
        )

    def _convert_content(self, converter: FileConverter, file_path: str, rel_path: str):
        """Convert a file, falling back to a placeholder document on failure.

        Args:
            converter: The configured file converter.
            file_path: Path of the file to convert.
            rel_path: Path relative to the base directory, used in log output.

        Returns:
            A ``(content, conversion_method, conversion_failed)`` tuple.
        """
        try:
            content = converter.convert_file(file_path)
        except FileConversionError as e:
            self.logger.warning(
                "File conversion failed, creating fallback document",
                file_path=rel_path,
                error=str(e),
            )
            # Create fallback document
            fallback = converter.create_fallback_document(file_path, e)
            return fallback, "markitdown_fallback", True
        self.logger.info("File conversion successful", file_path=rel_path)
        return content, "markitdown", False