Coverage for src / qdrant_loader / connectors / localfile / connector.py: 78%
85 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 04:48 +0000
1import os
2from datetime import UTC, datetime
3from urllib.parse import unquote, urlparse
5from qdrant_loader.connectors.base import BaseConnector
6from qdrant_loader.core.document import Document
7from qdrant_loader.core.file_conversion import (
8 FileConversionConfig,
9 FileConversionError,
10 FileConverter,
11 FileDetector,
12)
13from qdrant_loader.utils.logging import LoggingConfig
15from .config import LocalFileConfig
16from .file_processor import LocalFileFileProcessor
17from .metadata_extractor import LocalFileMetadataExtractor
class LocalFileConnector(BaseConnector):
    """Connector for ingesting local files.

    The directory to ingest is the path component of ``config.base_url``
    (a ``file://`` URL). Every file below it that the file processor accepts
    becomes one ``Document``. When ``config.enable_file_conversion`` is set
    and a converter was supplied through :meth:`set_file_conversion_config`,
    files the detector reports as convertible are converted to markdown;
    every other file is read as UTF-8 text.
    """

    # Legacy Office extensions that are skipped (with a warning) whenever file
    # conversion is enabled: the log message below states that MarkItDown
    # conversion does not support them.
    _UNSUPPORTED_LEGACY_EXTENSIONS = frozenset({".doc", ".ppt"})

    def __init__(self, config: LocalFileConfig):
        """Set up path handling, metadata extraction and optional conversion.

        Args:
            config: Local file source configuration. ``base_url`` supplies the
                root directory and ``enable_file_conversion`` toggles the
                conversion pipeline.
        """
        super().__init__(config)
        self.config = config
        self.logger = LoggingConfig.get_logger(__name__)

        # Parse base_url (file://...) to get the local path with Windows support
        parsed = urlparse(str(config.base_url))
        self.base_path = self._fix_windows_file_path(parsed.path)
        self.metadata_extractor = LocalFileMetadataExtractor(self.base_path)

        # The converter itself is created later by set_file_conversion_config();
        # only the detector is available at construction time.
        self.file_converter: FileConverter | None = None
        self.file_detector: FileDetector | None = None

        # Build the file processor exactly once, with the detector only when
        # conversion is enabled.
        if self.config.enable_file_conversion:
            self.logger.debug("File conversion enabled for LocalFile connector")
            self.file_detector = FileDetector()
            self.file_processor = LocalFileFileProcessor(
                config, self.base_path, self.file_detector
            )
        else:
            self.logger.debug("File conversion disabled for LocalFile connector")
            self.file_processor = LocalFileFileProcessor(config, self.base_path)

        self._initialized = True

    def _fix_windows_file_path(self, path: str) -> str:
        """Fix Windows file path from URL parsing.

        urlparse() adds a leading slash to Windows drive letters, e.g.:
            file:///C:/Users/... -> path = "/C:/Users/..."
        This method URL-decodes the path and removes that leading slash when
        the path starts with "/<drive letter>:".

        Args:
            path: Raw path from urlparse()

        Returns:
            The decoded path, without the leading slash for drive-letter paths
            ("/C:/..." -> "C:/...", "/C:" -> "C:"); any other path is returned
            decoded but otherwise unchanged.
        """
        # First decode URL encoding (e.g., %20 -> space)
        path = unquote(path)

        # Only treat "/X:" as a drive prefix when X is an ASCII letter, so that
        # paths such as "/1:/data" are not silently turned into relative paths.
        has_drive_prefix = (
            len(path) >= 3
            and path[0] == "/"
            and path[2] == ":"
            and path[1].isascii()
            and path[1].isalpha()
        )
        return path[1:] if has_drive_prefix else path

    def set_file_conversion_config(
        self, file_conversion_config: FileConversionConfig
    ) -> None:
        """Set file conversion configuration from global config.

        Does nothing when file conversion is disabled for this connector.

        Args:
            file_conversion_config: Global file conversion configuration
        """
        if self.config.enable_file_conversion:
            self.file_converter = FileConverter(file_conversion_config)
            self.logger.debug("File converter initialized with global config")

    async def get_documents(self) -> list[Document]:
        """Get all documents from the local file source.

        Walks ``self.base_path`` recursively. A file that raises while being
        processed is logged and skipped, so one bad file does not abort the
        whole run.

        Returns:
            One ``Document`` per successfully processed file.
        """
        documents: list[Document] = []
        for root, _, files in os.walk(self.base_path):
            for file_name in files:
                file_path = os.path.join(root, file_name)
                if not self.file_processor.should_process_file(file_path):
                    continue
                try:
                    doc = self._build_document(file_path, file_name)
                except Exception as e:
                    # Best effort: report the failure and move on to the next file.
                    self.logger.error(
                        "Failed to process file",
                        file_path=file_path.replace("\\", "/"),
                        error=str(e),
                    )
                    continue
                if doc is not None:
                    documents.append(doc)
        return documents

    def _build_document(self, file_path: str, file_name: str) -> Document | None:
        """Build the Document for one file, or return None when it is skipped."""
        # Relative path with forward slashes, used for every log message below.
        rel_path = os.path.relpath(file_path, self.base_path).replace("\\", "/")
        file_extension = os.path.splitext(file_name)[1].lower()

        if (
            self.config.enable_file_conversion
            and file_extension in self._UNSUPPORTED_LEGACY_EXTENSIONS
        ):
            self._warn_unsupported_legacy_file(file_path, rel_path, file_extension)
            return None

        converter = self._converter_for(file_path)
        if converter is not None:
            content, conversion_method, conversion_failed = self._convert_content(
                converter, file_path, rel_path
            )
            content_type = "md"  # Converted and fallback documents are markdown
        else:
            # Read file content normally; undecodable bytes are dropped.
            with open(file_path, encoding="utf-8", errors="ignore") as f:
                content = f.read()
            # File extension without the dot
            content_type = file_extension.lstrip(".")
            conversion_method = None
            conversion_failed = False

        # Get file modification time
        updated_at = datetime.fromtimestamp(os.path.getmtime(file_path), tz=UTC)

        metadata = self.metadata_extractor.extract_all_metadata(file_path, content)

        # Add file conversion metadata if applicable
        if converter is not None:
            metadata.update(
                {
                    "conversion_method": conversion_method,
                    "conversion_failed": conversion_failed,
                    "original_file_type": file_extension.lstrip("."),
                }
            )

        self.logger.debug(f"Processed local file: {rel_path}")

        # Create consistent URL with forward slashes for cross-platform compatibility.
        # NOTE(review): for a Windows drive path this yields "file://C:/..." and the
        # path is not percent-encoded; the format is kept unchanged because the URL
        # may be used as a stable document identity elsewhere - confirm before changing.
        normalized_path = os.path.realpath(file_path).replace("\\", "/")
        return Document(
            title=os.path.basename(file_path),
            content=content,
            content_type=content_type,
            metadata=metadata,
            source_type="localfile",
            source=self.config.source,
            url=f"file://{normalized_path}",
            is_deleted=False,
            updated_at=updated_at,
        )

    def _warn_unsupported_legacy_file(
        self, file_path: str, rel_path: str, file_extension: str
    ) -> None:
        """Log that a legacy .doc/.ppt file is skipped, with its detected type."""
        if self.file_detector is not None:
            file_info = self.file_detector.get_file_type_info(file_path)
        else:
            file_info = {"mime_type": None, "file_extension": file_extension}
        self.logger.warning(
            "Skipping file: old doc/ppt are not supported for MarkItDown conversion",
            file_path=rel_path,
            mime_type=file_info.get("mime_type"),
            file_extension=file_info.get("file_extension"),
        )

    def _converter_for(self, file_path: str) -> FileConverter | None:
        """Return the converter to use for file_path, or None to read it as text."""
        if (
            self.config.enable_file_conversion
            and self.file_detector is not None
            and self.file_converter is not None
            and self.file_detector.is_supported_for_conversion(file_path)
        ):
            return self.file_converter
        return None

    def _convert_content(
        self, converter: FileConverter, file_path: str, rel_path: str
    ) -> tuple[str, str, bool]:
        """Convert a file to markdown, falling back to a placeholder on failure.

        Returns:
            ``(content, conversion_method, conversion_failed)``.
        """
        self.logger.debug("File needs conversion", file_path=rel_path)
        try:
            # Convert file to markdown
            content = converter.convert_file(file_path)
        except FileConversionError as e:
            self.logger.warning(
                "File conversion failed, creating fallback document",
                file_path=rel_path,
                error=str(e),
            )
            # Create fallback document
            return (
                converter.create_fallback_document(file_path, e),
                "markitdown_fallback",
                True,
            )
        else:
            self.logger.info("File conversion successful", file_path=rel_path)
            return content, "markitdown", False