Coverage for src/qdrant_loader/connectors/git/connector.py: 57%
174 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
1"""Git repository connector implementation."""
3import os
4import shutil
5import tempfile
7from qdrant_loader.config.types import SourceType
8from qdrant_loader.connectors.base import BaseConnector
9from qdrant_loader.connectors.git.config import GitRepoConfig
10from qdrant_loader.connectors.git.file_processor import FileProcessor
11from qdrant_loader.connectors.git.metadata_extractor import GitMetadataExtractor
12from qdrant_loader.connectors.git.operations import GitOperations
13from qdrant_loader.core.document import Document
14from qdrant_loader.core.file_conversion import (
15 FileConverter,
16 FileDetector,
17 FileConversionConfig,
18 FileConversionError,
19)
20from qdrant_loader.utils.logging import LoggingConfig
# Module-level logger for the Git connector module.
logger = LoggingConfig.get_logger(__name__)
class GitConnector(BaseConnector):
    """Git repository connector.

    The repository is cloned into a temporary directory when the connector is
    entered as a (sync or async) context manager; ``get_documents`` then turns
    the files accepted by the ``FileProcessor`` into ``Document`` objects, and
    the temporary clone is removed on exit.
    """

    def __init__(self, config: GitRepoConfig):
        """Initialize the Git connector.

        No cloning happens here; it happens on ``with`` / ``async with`` entry.

        Args:
            config: Configuration for the Git repository
        """
        super().__init__(config)
        self.config = config
        self.temp_dir = None  # Set when the connector is entered
        self.metadata_extractor = GitMetadataExtractor(config=self.config)
        self.git_ops = GitOperations()
        self.file_processor = None  # Created when the connector is entered
        self.logger = LoggingConfig.get_logger(__name__)
        self.logger.debug("Initializing GitConnector")
        # Exclude the auth token so the secret is never written to the logs.
        self.logger.debug(
            "GitConnector Configuration", config=config.model_dump(exclude={"token"})
        )
        self._initialized = False

        # File conversion components. The detector is created up front; the
        # converter needs the global FileConversionConfig and is created later
        # by set_file_conversion_config().
        self.file_converter = None
        self.file_detector = None
        if self.config.enable_file_conversion:
            self.logger.debug("File conversion enabled for Git connector")
            self.file_detector = FileDetector()
        else:
            self.logger.debug("File conversion disabled for Git connector")

    def set_file_conversion_config(self, file_conversion_config: FileConversionConfig):
        """Set file conversion configuration from global config.

        Has no effect when file conversion is disabled for this connector.

        Args:
            file_conversion_config: Global file conversion configuration
        """
        if self.config.enable_file_conversion:
            self.file_converter = FileConverter(file_conversion_config)
            self.logger.debug("File converter initialized with global config")

    def _setup_repository(self) -> None:
        """Create a temp directory, clone the repository into it and verify it.

        Shared by ``__enter__`` and ``__aenter__``. Exceptions propagate
        unchanged; callers are responsible for cleaning up on failure.

        Raises:
            ValueError: If ``git_ops.repo`` is not set after cloning.
            Exception: Whatever the clone or the ``git status`` check raises.
        """
        self.temp_dir = tempfile.mkdtemp()
        # Update config with the actual temp dir
        self.config.temp_dir = self.temp_dir
        self.logger.debug("Created temporary directory", temp_dir=self.temp_dir)

        self.file_processor = FileProcessor(
            config=self.config,
            temp_dir=self.temp_dir,
            file_detector=self.file_detector,
        )

        auth_token = None
        if self.config.token:
            auth_token = self.config.token
            # Only the length is logged, never the token itself.
            self.logger.debug(
                "Using authentication token", token_length=len(auth_token)
            )

        self.logger.debug(
            "Attempting to clone repository",
            url=self.config.base_url,
            branch=self.config.branch,
            depth=self.config.depth,
            temp_dir=self.temp_dir,
        )
        try:
            self.git_ops.clone(
                url=str(self.config.base_url),
                to_path=self.temp_dir,
                branch=self.config.branch,
                depth=self.config.depth,
                auth_token=auth_token,
            )
        except Exception as clone_error:
            self.logger.error(
                "Failed to clone repository",
                error=str(clone_error),
                error_type=type(clone_error).__name__,
                url=self.config.base_url,
                branch=self.config.branch,
                temp_dir=self.temp_dir,
            )
            raise

        # Verify repository initialization
        if not self.git_ops.repo:
            self.logger.error(
                "Repository not initialized after clone", temp_dir=self.temp_dir
            )
            raise ValueError("Repository not initialized")

        # Verify repository is valid
        try:
            self.git_ops.repo.git.status()
        except Exception as status_error:
            self.logger.error(
                "Failed to verify repository status",
                error=str(status_error),
                error_type=type(status_error).__name__,
                temp_dir=self.temp_dir,
            )
            raise
        self.logger.debug("Repository is valid and accessible", temp_dir=self.temp_dir)

    def _abort_setup(self) -> None:
        """Discard a failed setup: remove the temp clone and mark uninitialized."""
        self._cleanup()
        self._initialized = False

    async def __aenter__(self):
        """Async context manager entry: clone and verify the repository.

        Returns:
            The connector itself.

        Raises:
            ValueError: If setup failed with a ValueError (e.g. the repository
                is not initialized after cloning).
            RuntimeError: Wrapping any other setup failure.
        """
        try:
            self._setup_repository()
        except ValueError as e:
            # Preserve ValueError type
            self.logger.error("Failed to set up Git repository", error=str(e))
            self._abort_setup()
            raise ValueError(str(e)) from e  # Re-raise with the same message
        except Exception as e:
            self.logger.error(
                "Failed to set up Git repository",
                error=str(e),
                error_type=type(e).__name__,
                temp_dir=self.temp_dir,
            )
            self._abort_setup()
            raise RuntimeError(f"Failed to set up Git repository: {e}") from e
        except BaseException:
            # e.g. task cancellation or KeyboardInterrupt: clean up, re-raise as is
            self._abort_setup()
            raise
        self._initialized = True
        return self

    def __enter__(self):
        """Synchronous context manager entry: clone and verify the repository.

        Does nothing if the connector is already initialized. Unlike
        ``__aenter__``, setup exceptions propagate unwrapped.

        Returns:
            The connector itself.
        """
        if not self._initialized:
            try:
                self._setup_repository()
            except BaseException:
                # __exit__ is not invoked when __enter__ raises, so the partial
                # clone must be removed here.
                self._abort_setup()
                raise
            # Only mark initialized once the clone has been verified.
            self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: remove the clone and reset state."""
        self._cleanup()
        self._initialized = False

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Synchronous context manager exit: remove the clone and reset state."""
        self._cleanup()
        self._initialized = False

    def _cleanup(self):
        """Remove the temporary clone directory if it exists.

        Best-effort: a failure to delete is logged, not raised.
        """
        if self.temp_dir and os.path.exists(self.temp_dir):
            try:
                shutil.rmtree(self.temp_dir)
                self.logger.debug(
                    "Cleaned up temporary directory", temp_dir=self.temp_dir
                )
            except Exception as e:
                self.logger.error(
                    "Failed to clean up temporary directory",
                    error=str(e),
                    temp_dir=self.temp_dir,
                )

    def _process_file(self, file_path: str) -> Document:
        """Process a single file.

        Files supported by the file detector are converted to markdown when a
        converter is configured; all other files are read as-is.

        Args:
            file_path: Path to the file

        Returns:
            Document instance with file content and metadata

        Raises:
            Exception: If file processing fails
        """
        try:
            # Get relative path from repository root
            rel_path = os.path.relpath(file_path, self.temp_dir)
            extension = os.path.splitext(file_path)[1].lower().lstrip(".")

            # Check if file needs conversion
            needs_conversion = bool(
                self.config.enable_file_conversion
                and self.file_detector
                and self.file_converter
                and self.file_detector.is_supported_for_conversion(file_path)
            )

            conversion_method = None
            conversion_failed = False
            if needs_conversion:
                self.logger.debug("File needs conversion", file_path=rel_path)
                assert self.file_converter is not None  # Type checker hint
                try:
                    content = self.file_converter.convert_file(file_path)
                    conversion_method = "markitdown"
                    self.logger.info("File conversion successful", file_path=rel_path)
                except FileConversionError as e:
                    self.logger.warning(
                        "File conversion failed, creating fallback document",
                        file_path=rel_path,
                        error=str(e),
                    )
                    content = self.file_converter.create_fallback_document(file_path, e)
                    conversion_method = "markitdown_fallback"
                    conversion_failed = True
                # Converted and fallback documents are both markdown
                content_type = "md"
            else:
                # Read file content normally; content type is the extension
                content = self.git_ops.get_file_content(file_path)
                content_type = extension

            first_commit_date = self.git_ops.get_first_commit_date(file_path)
            last_commit_date = self.git_ops.get_last_commit_date(file_path)

            metadata = self.metadata_extractor.extract_all_metadata(
                file_path=rel_path, content=content
            )

            # Add Git-specific metadata
            metadata.update(
                {
                    "repository_url": self.config.base_url,
                    "branch": self.config.branch,
                    "last_commit_date": (
                        last_commit_date.isoformat() if last_commit_date else None
                    ),
                }
            )

            # Add file conversion metadata if applicable
            if needs_conversion:
                metadata.update(
                    {
                        "conversion_method": conversion_method,
                        "conversion_failed": conversion_failed,
                        "original_file_type": extension,
                    }
                )

            self.logger.debug(f"Processed Git file: /{rel_path!s}")

            # Strip only a trailing ".git" (and "/"): str.replace would also
            # mangle names such as "user.github.io.git".
            repo_url = str(self.config.base_url).rstrip("/").removesuffix(".git")
            url_path = rel_path.replace(os.sep, "/")

            return Document(
                title=os.path.basename(file_path),
                content=content,
                content_type=content_type,
                metadata=metadata,
                source_type=SourceType.GIT,
                source=self.config.source,
                url=f"{repo_url}/blob/{self.config.branch}/{url_path}",
                is_deleted=False,
                created_at=first_commit_date,
                updated_at=last_commit_date,
            )
        except Exception as e:
            self.logger.error(
                "Failed to process file", file_path=file_path, error=str(e)
            )
            raise

    async def get_documents(self) -> list[Document]:
        """Get all documents from the repository.

        Files rejected by the file processor are skipped; files that fail to
        process are logged and skipped so the remaining files are still
        returned.

        Returns:
            List of documents

        Raises:
            ValueError: If the repository is not initialized
            Exception: If document retrieval fails
        """
        try:
            self._ensure_initialized()
            try:
                files = self.git_ops.list_files()
            except ValueError as e:
                self.logger.error("Failed to list files", error=str(e))
                raise ValueError("Repository not initialized") from e

            documents = []
            for file_path in files:
                if not self.file_processor.should_process_file(file_path):  # type: ignore
                    continue
                try:
                    documents.append(self._process_file(file_path))
                except Exception as e:
                    self.logger.error(
                        "Failed to process file", file_path=file_path, error=str(e)
                    )
            return documents
        except Exception as e:
            # Log and re-raise unchanged (ValueError stays ValueError)
            self.logger.error("Failed to get documents", error=str(e))
            raise

    def _ensure_initialized(self):
        """Ensure the repository is initialized before performing operations.

        Raises:
            ValueError: If the connector has not been entered as a context manager.
        """
        if not self._initialized:
            self.logger.error(
                "Repository not initialized. Use the connector as a context manager."
            )
            raise ValueError("Repository not initialized")