Coverage for src/qdrant_loader/connectors/git/connector.py: 68%
179 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1"""Git repository connector implementation."""
3import os
4import shutil
5import tempfile
7from qdrant_loader.config.types import SourceType
8from qdrant_loader.connectors.base import BaseConnector
9from qdrant_loader.connectors.git.config import GitRepoConfig
10from qdrant_loader.connectors.git.file_processor import FileProcessor
11from qdrant_loader.connectors.git.metadata_extractor import GitMetadataExtractor
12from qdrant_loader.connectors.git.operations import GitOperations
13from qdrant_loader.core.document import Document
14from qdrant_loader.core.file_conversion import (
15 FileConversionConfig,
16 FileConversionError,
17 FileConverter,
18 FileDetector,
19)
20from qdrant_loader.utils.logging import LoggingConfig
22logger = LoggingConfig.get_logger(__name__)
class GitConnector(BaseConnector):
    """Git repository connector.

    Intended to be used as a context manager (``with`` or ``async with``):
    entering clones the configured repository into a fresh temporary
    directory and exiting removes that directory again. While entered,
    ``get_documents`` turns every file accepted by the file processor into a
    ``Document``.
    """

    def __init__(self, config: GitRepoConfig):
        """Initialize the Git connector.

        The clone itself is deferred to context manager entry.

        Args:
            config: Configuration for the Git repository
        """
        super().__init__(config)
        self.config = config
        self.temp_dir = None  # Set on context manager entry
        self.metadata_extractor = GitMetadataExtractor(config=self.config)
        self.git_ops = GitOperations()
        self.file_processor = None  # Created on context manager entry
        self.logger = LoggingConfig.get_logger(__name__)
        self.logger.debug("Initializing GitConnector")
        # NOTE(review): this dumps the whole config; confirm the token field
        # is masked by the config model before enabling debug logs in prod.
        self.logger.debug("GitConnector Configuration", config=config.model_dump())
        self._initialized = False

        # The detector is created eagerly; the converter needs the global
        # conversion settings and is attached later through
        # set_file_conversion_config().
        self.file_converter = None
        self.file_detector = None
        if self.config.enable_file_conversion:
            self.logger.debug("File conversion enabled for Git connector")
            self.file_detector = FileDetector()
        else:
            self.logger.debug("File conversion disabled for Git connector")

    def set_file_conversion_config(self, file_conversion_config: FileConversionConfig):
        """Set file conversion configuration from global config.

        Does nothing when file conversion is disabled for this source.

        Args:
            file_conversion_config: Global file conversion configuration
        """
        if self.config.enable_file_conversion:
            self.file_converter = FileConverter(file_conversion_config)
            self.logger.debug("File converter initialized with global config")

    def _setup_repository(self) -> None:
        """Create the temporary checkout, clone the repository and verify it.

        Shared by ``__enter__`` and ``__aenter__`` so both entry points run
        exactly the same sequence. Cleanup on failure is left to the caller.

        Raises:
            ValueError: If no repository object exists after the clone
            Exception: Whatever the clone or the status check raises
        """
        self.temp_dir = tempfile.mkdtemp()
        # Publish the actual location on the config as well.
        self.config.temp_dir = self.temp_dir
        self.logger.debug("Created temporary directory", temp_dir=self.temp_dir)

        self.file_processor = FileProcessor(
            config=self.config,
            temp_dir=self.temp_dir,
            file_detector=self.file_detector,
        )

        auth_token = None
        if self.config.token:
            auth_token = self.config.token
            # Only the length is logged, never the token itself.
            self.logger.debug(
                "Using authentication token", token_length=len(auth_token)
            )

        self.logger.debug(
            "Attempting to clone repository",
            url=self.config.base_url,
            branch=self.config.branch,
            depth=self.config.depth,
            temp_dir=self.temp_dir,
        )
        try:
            self.git_ops.clone(
                url=str(self.config.base_url),
                to_path=self.temp_dir,
                branch=self.config.branch,
                depth=self.config.depth,
                auth_token=auth_token,
            )
        except Exception as clone_error:
            self.logger.error(
                "Failed to clone repository",
                error=str(clone_error),
                error_type=type(clone_error).__name__,
                url=self.config.base_url,
                branch=self.config.branch,
                temp_dir=self.temp_dir,
            )
            raise

        if not self.git_ops.repo:
            self.logger.error(
                "Repository not initialized after clone", temp_dir=self.temp_dir
            )
            raise ValueError("Repository not initialized")

        # Running a status command proves the checkout is usable.
        try:
            self.git_ops.repo.git.status()
        except Exception as status_error:
            self.logger.error(
                "Failed to verify repository status",
                error=str(status_error),
                error_type=type(status_error).__name__,
                temp_dir=self.temp_dir,
            )
            raise
        self.logger.debug(
            "Repository is valid and accessible", temp_dir=self.temp_dir
        )

    async def __aenter__(self):
        """Async context manager entry.

        Returns:
            This connector, ready for ``get_documents``

        Raises:
            ValueError: If setup fails with a ValueError (message preserved)
            RuntimeError: If setup fails for any other reason
        """
        try:
            self._setup_repository()
        except ValueError as e:
            # Standardized error logging: user-friendly message + troubleshooting context
            self.logger.error(
                "Git repository setup failed due to invalid configuration",
                error=str(e),
                error_type="ValueError",
                suggestion="Verify Git URL format, credentials, and repository accessibility",
            )
            # The temporary directory already exists at this point, so it has
            # to be removed on this path too or it would be leaked.
            self._cleanup()
            raise ValueError(str(e)) from e  # Re-raise with the same message
        except Exception as e:
            # Standardized error logging: user-friendly message + technical details + cleanup context
            self.logger.error(
                "Git repository setup failed during initialization",
                error=str(e),
                error_type=type(e).__name__,
                temp_dir=self.temp_dir,
                suggestion="Check Git URL, network connectivity, authentication, and disk space",
            )
            self._cleanup()
            raise RuntimeError(f"Failed to set up Git repository: {e}") from e

        self._initialized = True
        return self

    def __enter__(self):
        """Synchronous context manager entry.

        Setup is skipped when the connector is already initialized. Setup
        errors propagate unchanged, after the temporary directory has been
        removed.

        Returns:
            This connector, ready for use
        """
        if not self._initialized:
            try:
                self._setup_repository()
            except Exception:
                self._cleanup()
                raise
            # Only mark the connector usable once the clone really succeeded.
            self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, _exc_tb):
        """Async context manager exit."""
        self._cleanup()
        self._initialized = False

    def __exit__(self, exc_type, exc_val, _exc_tb):
        """Clean up resources."""
        self._cleanup()
        # Mirror __aexit__: the checkout is gone, so the connector must not
        # keep reporting itself as initialized.
        self._initialized = False

    def _cleanup(self):
        """Clean up temporary directory.

        Best effort: a removal failure is logged, never raised.
        """
        if not (self.temp_dir and os.path.exists(self.temp_dir)):
            return
        try:
            shutil.rmtree(self.temp_dir)
            self.logger.debug("Cleaned up temporary directory")
        except Exception as e:
            self.logger.error(f"Failed to clean up temporary directory: {e}")

    def _relative_path(self, file_path: str) -> str:
        """Return ``file_path`` relative to the temporary checkout."""
        rel_path = os.path.relpath(file_path, self.temp_dir)

        # A result that climbs out of the checkout ("..") means the path
        # calculation failed (common with mixed path styles).
        if not (rel_path.startswith("..") and self.temp_dir):
            return rel_path

        if file_path.startswith(self.temp_dir):
            # Drop the checkout prefix and any leading separators.
            return file_path[len(self.temp_dir) :].lstrip("/\\" + os.sep)

        # Last resort: use basename
        return os.path.basename(file_path)

    def _load_content(self, file_path: str, rel_path: str):
        """Read ``file_path``, converting it to markdown when applicable.

        Args:
            file_path: Path to the file
            rel_path: Repository-relative path, used for logging only

        Returns:
            Tuple ``(content, content_type, conversion_metadata)``. The
            metadata dict is empty when the file was read without conversion.
        """
        # File extension without the dot
        extension = os.path.splitext(file_path)[1].lower().lstrip(".")

        converter = self.file_converter
        needs_conversion = (
            self.config.enable_file_conversion
            and self.file_detector
            and converter
            and self.file_detector.is_supported_for_conversion(file_path)
        )
        if not needs_conversion:
            return self.git_ops.get_file_content(file_path), extension, {}

        self.logger.debug("File needs conversion", file_path=rel_path)
        try:
            content = converter.convert_file(file_path)
        except FileConversionError as e:
            self.logger.warning(
                "File conversion failed, creating fallback document",
                file_path=rel_path,
                error=str(e),
            )
            content = converter.create_fallback_document(file_path, e)
            conversion_method = "markitdown_fallback"
            conversion_failed = True
        else:
            self.logger.info("File conversion successful", file_path=rel_path)
            conversion_method = "markitdown"
            conversion_failed = False

        # Converted files and fallback documents are both markdown.
        return (
            content,
            "md",
            {
                "conversion_method": conversion_method,
                "conversion_failed": conversion_failed,
                "original_file_type": extension,
            },
        )

    def _process_file(self, file_path: str) -> Document:
        """Process a single file.

        Args:
            file_path: Path to the file

        Returns:
            Document instance with file content and metadata

        Raises:
            Exception: If file processing fails
        """
        try:
            rel_path = self._relative_path(file_path)
            content, content_type, conversion_metadata = self._load_content(
                file_path, rel_path
            )

            first_commit_date = self.git_ops.get_first_commit_date(file_path)
            last_commit_date = self.git_ops.get_last_commit_date(file_path)

            metadata = self.metadata_extractor.extract_all_metadata(
                file_path=rel_path, content=content
            )

            # Add Git-specific metadata
            metadata.update(
                {
                    "repository_url": self.config.base_url,
                    "branch": self.config.branch,
                    "last_commit_date": (
                        last_commit_date.isoformat() if last_commit_date else None
                    ),
                }
            )
            # Add file conversion metadata if applicable (empty otherwise)
            metadata.update(conversion_metadata)

            self.logger.debug(f"Processed Git file: /{rel_path!s}")

            # Normalize path separators for URL (use forward slashes on all platforms)
            normalized_rel_path = rel_path.replace(os.sep, "/").replace("\\", "/")
            # Strip only a trailing ".git": a blanket replace would also
            # mangle names that merely contain it (e.g. "org.github.io").
            repo_url = str(self.config.base_url).rstrip("/").removesuffix(".git")

            return Document(
                title=os.path.basename(file_path),
                content=content,
                content_type=content_type,
                metadata=metadata,
                source_type=SourceType.GIT,
                source=self.config.source,
                url=f"{repo_url}/blob/{self.config.branch}/{normalized_rel_path}",
                is_deleted=False,
                created_at=first_commit_date,
                updated_at=last_commit_date,
            )
        except Exception as e:
            self.logger.error(
                "Failed to process file", file_path=file_path, error=str(e)
            )
            raise

    async def get_documents(self) -> list[Document]:
        """Get all documents from the repository.

        Files rejected by the file processor are skipped. A file that fails
        to process is logged and skipped; it does not abort the run.

        Returns:
            List of documents

        Raises:
            ValueError: If the connector has not been entered or the file
                listing reports an uninitialized repository
            Exception: If document retrieval fails for any other reason
        """
        try:
            self._ensure_initialized()
            try:
                files = self.git_ops.list_files()
            except ValueError as e:
                self.logger.error("Failed to list files", error=str(e))
                raise ValueError("Repository not initialized") from e

            documents = []
            for file_path in files:
                if not self.file_processor.should_process_file(file_path):  # type: ignore
                    continue
                try:
                    documents.append(self._process_file(file_path))
                except Exception as e:
                    self.logger.error(
                        "Failed to process file", file_path=file_path, error=str(e)
                    )

            return documents
        except Exception as e:
            # Logged for every failure and re-raised with its original type.
            self.logger.error("Failed to get documents", error=str(e))
            raise

    def _ensure_initialized(self):
        """Ensure the repository is initialized before performing operations.

        Raises:
            ValueError: If the connector has not been entered
        """
        if not self._initialized:
            self.logger.error(
                "Repository not initialized. Use the connector as a context manager."
            )
            raise ValueError("Repository not initialized")