Coverage for src/qdrant_loader/connectors/localfile/connector.py: 69%
80 statements
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
1import os
2from datetime import UTC, datetime
3from urllib.parse import unquote, urlparse
5from qdrant_loader.connectors.base import BaseConnector
6from qdrant_loader.core.document import Document
7from qdrant_loader.core.file_conversion import (
8 FileConversionConfig,
9 FileConversionError,
10 FileConverter,
11 FileDetector,
12)
13from qdrant_loader.utils.logging import LoggingConfig
15from .config import LocalFileConfig
16from .file_processor import LocalFileFileProcessor
17from .metadata_extractor import LocalFileMetadataExtractor
class LocalFileConnector(BaseConnector):
    """Connector for ingesting local files.

    The connector walks the directory named by ``config.base_url`` (a
    ``file://`` URL) and turns every file accepted by the file processor into
    a ``Document``. When ``config.enable_file_conversion`` is set and a
    converter has been supplied through ``set_file_conversion_config``, files
    the detector reports as convertible are converted to markdown first.
    """

    def __init__(self, config: LocalFileConfig):
        """Create the connector and its helper components.

        Args:
            config: Local file source configuration. ``base_url`` is parsed as
                a URL and its path component becomes the walk root.
        """
        super().__init__(config)
        self.config = config
        self.logger = LoggingConfig.get_logger(__name__)

        # Parse base_url (file://...) to get the local path with Windows support
        parsed = urlparse(str(config.base_url))
        self.base_path = self._fix_windows_file_path(parsed.path)
        self.metadata_extractor = LocalFileMetadataExtractor(self.base_path)
        self._initialized = True

        # The converter itself is only created later, from the global config
        # (see set_file_conversion_config); only the detector is needed here.
        self.file_converter: FileConverter | None = None
        self.file_detector: FileDetector | None = None
        if self.config.enable_file_conversion:
            self.logger.debug("File conversion enabled for LocalFile connector")
            self.file_detector = FileDetector()
            # Build the processor exactly once, already aware of the detector.
            self.file_processor = LocalFileFileProcessor(
                config, self.base_path, self.file_detector
            )
        else:
            self.logger.debug("File conversion disabled for LocalFile connector")
            self.file_processor = LocalFileFileProcessor(config, self.base_path)

    def _fix_windows_file_path(self, path: str) -> str:
        """Fix Windows file path from URL parsing.

        urlparse() adds a leading slash to Windows drive letters, e.g.:
            file:///C:/Users/... -> path = "/C:/Users/..."
        This method URL-decodes the path and removes that leading slash when
        the path starts with a drive specifier.

        Args:
            path: Raw path from urlparse()

        Returns:
            The decoded path, without the leading slash if it looked like
            "/<letter>:..."; otherwise the decoded path unchanged.
        """
        # First decode URL encoding (e.g., %20 -> space)
        path = unquote(path)

        # Only treat "/X:" as a drive prefix when X is an ASCII letter, so
        # paths such as "/1:/data" are not mangled.
        if (
            len(path) >= 3
            and path[0] == "/"
            and path[1].isascii()
            and path[1].isalpha()
            and path[2] == ":"
        ):
            # "/C:/..." or "/C:" -> "C:/..." or "C:"
            path = path[1:]

        return path

    def set_file_conversion_config(self, file_conversion_config: FileConversionConfig):
        """Set file conversion configuration from global config.

        Has no effect when file conversion is disabled for this connector.

        Args:
            file_conversion_config: Global file conversion configuration
        """
        if self.config.enable_file_conversion:
            self.file_converter = FileConverter(file_conversion_config)
            self.logger.debug("File converter initialized with global config")

    async def get_documents(self) -> list[Document]:
        """Get all documents from the local file source.

        Walks ``self.base_path`` recursively. A failure while processing one
        file is logged and that file is skipped; the walk continues.

        Returns:
            One ``Document`` per successfully processed file.
        """
        documents: list[Document] = []
        for root, _, files in os.walk(self.base_path):
            for file in files:
                file_path = os.path.join(root, file)
                if not self.file_processor.should_process_file(file_path):
                    continue
                try:
                    documents.append(self._build_document(file_path))
                except Exception as e:
                    # Best effort: one unreadable file must not abort the run.
                    self.logger.error(
                        "Failed to process file",
                        file_path=file_path.replace("\\", "/"),
                        error=str(e),
                    )
        return documents

    def _build_document(self, file_path: str) -> Document:
        """Read (or convert) a single file and wrap it in a ``Document``.

        Args:
            file_path: Path of the file as produced by the directory walk.

        Returns:
            The document for ``file_path``.
        """
        file_name = os.path.basename(file_path)
        # File extension without the dot, lower-cased.
        extension = os.path.splitext(file_name)[1].lower().lstrip(".")
        # Relative path with forward slashes, used for logging only. Computed
        # outside any f-string: a backslash inside an f-string expression is a
        # syntax error before Python 3.12.
        display_path = os.path.relpath(file_path, self.base_path).replace("\\", "/")

        needs_conversion = bool(
            self.config.enable_file_conversion
            and self.file_detector
            and self.file_converter
            and self.file_detector.is_supported_for_conversion(file_path)
        )

        if needs_conversion:
            self.logger.debug("File needs conversion", file_path=display_path)
            content, conversion_method, conversion_failed = self._convert_file(
                file_path, display_path
            )
            content_type = "md"  # Converted and fallback content are markdown
        else:
            # Read file content normally; undecodable bytes are dropped.
            with open(file_path, encoding="utf-8", errors="ignore") as f:
                content = f.read()
            content_type = extension
            conversion_method = None
            conversion_failed = False

        # Get file modification time
        file_mtime = os.path.getmtime(file_path)
        updated_at = datetime.fromtimestamp(file_mtime, tz=UTC)

        metadata = self.metadata_extractor.extract_all_metadata(file_path, content)

        # Add file conversion metadata if applicable
        if needs_conversion:
            metadata.update(
                {
                    "conversion_method": conversion_method,
                    "conversion_failed": conversion_failed,
                    "original_file_type": extension,
                }
            )

        self.logger.debug(f"Processed local file: {display_path}")

        # Create consistent URL with forward slashes for cross-platform compatibility
        normalized_path = os.path.realpath(file_path).replace("\\", "/")
        return Document(
            title=file_name,
            content=content,
            content_type=content_type,
            metadata=metadata,
            source_type="localfile",
            source=self.config.source,
            url=f"file://{normalized_path}",
            is_deleted=False,
            updated_at=updated_at,
        )

    def _convert_file(self, file_path: str, display_path: str) -> tuple[str, str, bool]:
        """Convert a file to markdown, falling back to a placeholder on failure.

        Args:
            file_path: Path of the file to convert.
            display_path: Forward-slash relative path used in log records.

        Returns:
            A ``(content, conversion_method, conversion_failed)`` tuple.
        """
        converter = self.file_converter
        assert converter is not None  # Type checker hint; the caller checked it
        try:
            content = converter.convert_file(file_path)
        except FileConversionError as e:
            self.logger.warning(
                "File conversion failed, creating fallback document",
                file_path=display_path,
                error=str(e),
            )
            return converter.create_fallback_document(file_path, e), "markitdown_fallback", True

        self.logger.info("File conversion successful", file_path=display_path)
        return content, "markitdown", False