Coverage for src/qdrant_loader/connectors/git/connector.py: 68%

179 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1"""Git repository connector implementation.""" 

2 

3import os 

4import shutil 

5import tempfile 

6 

7from qdrant_loader.config.types import SourceType 

8from qdrant_loader.connectors.base import BaseConnector 

9from qdrant_loader.connectors.git.config import GitRepoConfig 

10from qdrant_loader.connectors.git.file_processor import FileProcessor 

11from qdrant_loader.connectors.git.metadata_extractor import GitMetadataExtractor 

12from qdrant_loader.connectors.git.operations import GitOperations 

13from qdrant_loader.core.document import Document 

14from qdrant_loader.core.file_conversion import ( 

15 FileConversionConfig, 

16 FileConversionError, 

17 FileConverter, 

18 FileDetector, 

19) 

20from qdrant_loader.utils.logging import LoggingConfig 

21 

# Module-level logger. GitConnector instances create and use their own
# `self.logger` (see GitConnector.__init__).
logger = LoggingConfig.get_logger(__name__)

23 

24 

class GitConnector(BaseConnector):
    """Git repository connector.

    Clones the configured repository into a temporary directory and turns the
    files accepted by ``FileProcessor`` into ``Document`` objects. The
    connector must be used as a (sync or async) context manager: entering
    clones the repository, exiting removes the temporary directory.
    """

    def __init__(self, config: GitRepoConfig):
        """Initialize the Git connector.

        No cloning happens here; the repository is cloned when the context
        manager is entered.

        Args:
            config: Configuration for the Git repository
        """
        super().__init__(config)
        self.config = config
        self.temp_dir = None  # Set when the context manager is entered
        self.metadata_extractor = GitMetadataExtractor(config=self.config)
        self.git_ops = GitOperations()
        self.file_processor = None  # Initialized when the context manager is entered
        self.logger = LoggingConfig.get_logger(__name__)
        self.logger.debug("Initializing GitConnector")
        # The auth token is excluded so credentials never end up in the logs,
        # even at debug level.
        self.logger.debug(
            "GitConnector Configuration",
            config=config.model_dump(exclude={"token"}),
        )
        self._initialized = False

        # Initialize file conversion components if enabled
        self.file_converter = None
        self.file_detector = None
        if self.config.enable_file_conversion:
            self.logger.debug("File conversion enabled for Git connector")
            # File conversion config will be set from global config during ingestion
            self.file_detector = FileDetector()
        else:
            self.logger.debug("File conversion disabled for Git connector")

    def set_file_conversion_config(self, file_conversion_config: FileConversionConfig):
        """Set file conversion configuration from global config.

        Has no effect when file conversion is disabled for this connector.

        Args:
            file_conversion_config: Global file conversion configuration
        """
        if self.config.enable_file_conversion:
            self.file_converter = FileConverter(file_conversion_config)
            self.logger.debug("File converter initialized with global config")

    def _setup_repository(self) -> None:
        """Create a temporary directory and clone the configured repository into it.

        Shared by ``__enter__`` and ``__aenter__``. This method does not clean
        up after itself on failure; callers are responsible for ``_cleanup``.

        Raises:
            ValueError: If ``git_ops.repo`` is still unset after cloning.
            Exception: Errors from ``git_ops.clone`` or from the ``git status``
                sanity check are logged and re-raised unchanged.
        """
        # Create temporary directory
        self.temp_dir = tempfile.mkdtemp()
        self.config.temp_dir = self.temp_dir  # Update config with the actual temp dir
        self.logger.debug("Created temporary directory", temp_dir=self.temp_dir)

        # Initialize file processor
        self.file_processor = FileProcessor(
            config=self.config,
            temp_dir=self.temp_dir,
            file_detector=self.file_detector,
        )

        # Get auth token from config
        auth_token = None
        if self.config.token:
            auth_token = self.config.token
            self.logger.debug(
                "Using authentication token", token_length=len(auth_token)
            )

        # Clone repository
        self.logger.debug(
            "Attempting to clone repository",
            url=self.config.base_url,
            branch=self.config.branch,
            depth=self.config.depth,
            temp_dir=self.temp_dir,
        )

        try:
            self.git_ops.clone(
                url=str(self.config.base_url),
                to_path=self.temp_dir,
                branch=self.config.branch,
                depth=self.config.depth,
                auth_token=auth_token,
            )
        except Exception as clone_error:
            self.logger.error(
                "Failed to clone repository",
                error=str(clone_error),
                error_type=type(clone_error).__name__,
                url=self.config.base_url,
                branch=self.config.branch,
                temp_dir=self.temp_dir,
            )
            raise

        # Verify repository initialization
        if not self.git_ops.repo:
            self.logger.error(
                "Repository not initialized after clone", temp_dir=self.temp_dir
            )
            raise ValueError("Repository not initialized")

        # Verify repository is valid
        try:
            self.git_ops.repo.git.status()
            self.logger.debug(
                "Repository is valid and accessible", temp_dir=self.temp_dir
            )
        except Exception as status_error:
            self.logger.error(
                "Failed to verify repository status",
                error=str(status_error),
                error_type=type(status_error).__name__,
                temp_dir=self.temp_dir,
            )
            raise

    async def __aenter__(self):
        """Async context manager entry: clone the repository.

        On any failure the temporary directory is removed and the connector is
        left uninitialized.

        Returns:
            The connector itself.

        Raises:
            ValueError: If setup failed with a ValueError (same message).
            RuntimeError: For any other setup failure, chained to the cause.
        """
        try:
            self._setup_repository()
        except ValueError as e:
            # Standardized error logging: user-friendly message + troubleshooting context
            self.logger.error(
                "Git repository setup failed due to invalid configuration",
                error=str(e),
                error_type="ValueError",
                suggestion="Verify Git URL format, credentials, and repository accessibility",
            )
            # The temp dir may already exist at this point; remove it so it is not leaked.
            self._initialized = False
            self._cleanup()
            raise ValueError(str(e)) from e  # Re-raise with the same message
        except Exception as e:
            # Standardized error logging: user-friendly message + technical details + cleanup context
            self.logger.error(
                "Git repository setup failed during initialization",
                error=str(e),
                error_type=type(e).__name__,
                temp_dir=self.temp_dir,
                suggestion="Check Git URL, network connectivity, authentication, and disk space",
            )
            # Clean up if something goes wrong
            self._initialized = False
            self._cleanup()
            raise RuntimeError(f"Failed to set up Git repository: {e}") from e

        self._initialized = True
        return self

    def __enter__(self):
        """Synchronous context manager entry: clone the repository once.

        If the connector is already initialized, nothing is done. On failure
        the temporary directory is removed, the connector stays uninitialized,
        and the original exception propagates unchanged.

        Returns:
            The connector itself.
        """
        if not self._initialized:
            try:
                self._setup_repository()
            except Exception:
                # Remove the partially-created temp dir before propagating.
                self._cleanup()
                raise
            # Only mark as initialized once the clone has been verified.
            self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, _exc_tb):
        """Async context manager exit: remove the cloned repository."""
        self._cleanup()
        self._initialized = False

    def __exit__(self, exc_type, exc_val, _exc_tb):
        """Synchronous context manager exit: remove the cloned repository."""
        self._cleanup()
        # The clone is gone, so the connector must be re-entered before reuse
        # (mirrors __aexit__).
        self._initialized = False

    def _cleanup(self):
        """Remove the temporary clone directory, if it exists.

        Failures are logged rather than raised so cleanup never masks the
        original error.
        """
        if self.temp_dir and os.path.exists(self.temp_dir):
            try:
                shutil.rmtree(self.temp_dir)
                self.logger.debug("Cleaned up temporary directory")
            except Exception as e:
                self.logger.error(f"Failed to clean up temporary directory: {e}")

    def _build_blob_url(self, rel_path: str) -> str:
        """Build the web URL of ``rel_path`` on the configured branch.

        Only a trailing ``.git`` suffix (and trailing slashes) is removed from
        the repository URL. Hosts or repository names that merely contain
        ``.git`` (e.g. ``org.github.io``) are left intact.

        Args:
            rel_path: File path relative to the repository root

        Returns:
            ``<repo_url>/blob/<branch>/<rel_path>`` with forward slashes.
        """
        # Normalize path separators for URL (use forward slashes on all platforms)
        normalized_rel_path = rel_path.replace(os.sep, "/").replace("\\", "/")
        repo_url = str(self.config.base_url).rstrip("/").removesuffix(".git")
        return f"{repo_url}/blob/{self.config.branch}/{normalized_rel_path}"

    def _process_file(self, file_path: str) -> Document:
        """Process a single file.

        Args:
            file_path: Path to the file

        Returns:
            Document instance with file content and metadata

        Raises:
            Exception: If file processing fails (logged, then re-raised)
        """
        try:
            # Get relative path from repository root
            rel_path = os.path.relpath(file_path, self.temp_dir)

            # Fix cross-platform path issues: ensure we get a proper relative path
            # If relpath returns a path that goes up directories (contains ..),
            # it means the path calculation failed (common with mixed path styles)
            if rel_path.startswith("..") and self.temp_dir:
                # Fallback: try to extract relative path manually
                if file_path.startswith(self.temp_dir):
                    # Remove temp_dir prefix and any leading separators
                    rel_path = (
                        file_path[len(self.temp_dir) :]
                        .lstrip(os.sep)
                        .lstrip("/")
                        .lstrip("\\")
                    )
                else:
                    # Last resort: use basename
                    rel_path = os.path.basename(file_path)

            # Conversion requires the feature flag, a detector, a converter
            # (set via set_file_conversion_config), and a supported file type.
            needs_conversion = (
                self.config.enable_file_conversion
                and self.file_detector
                and self.file_converter
                and self.file_detector.is_supported_for_conversion(file_path)
            )

            if needs_conversion:
                self.logger.debug("File needs conversion", file_path=rel_path)
                try:
                    # Convert file to markdown
                    assert self.file_converter is not None  # Type checker hint
                    content = self.file_converter.convert_file(file_path)
                    content_type = "md"  # Converted files are markdown
                    conversion_method = "markitdown"
                    conversion_failed = False
                    self.logger.info("File conversion successful", file_path=rel_path)
                except FileConversionError as e:
                    self.logger.warning(
                        "File conversion failed, creating fallback document",
                        file_path=rel_path,
                        error=str(e),
                    )
                    # Create fallback document
                    assert self.file_converter is not None  # Type checker hint
                    content = self.file_converter.create_fallback_document(file_path, e)
                    content_type = "md"  # Fallback is also markdown
                    conversion_method = "markitdown_fallback"
                    conversion_failed = True
            else:
                # Read file content normally
                content = self.git_ops.get_file_content(file_path)
                # Get file extension without the dot
                content_type = os.path.splitext(file_path)[1].lower().lstrip(".")
                conversion_method = None
                conversion_failed = False

            first_commit_date = self.git_ops.get_first_commit_date(file_path)

            # Get last commit date
            last_commit_date = self.git_ops.get_last_commit_date(file_path)

            # Extract metadata
            metadata = self.metadata_extractor.extract_all_metadata(
                file_path=rel_path, content=content
            )

            # Add Git-specific metadata
            metadata.update(
                {
                    "repository_url": self.config.base_url,
                    "branch": self.config.branch,
                    "last_commit_date": (
                        last_commit_date.isoformat() if last_commit_date else None
                    ),
                }
            )

            # Add file conversion metadata if applicable
            if needs_conversion:
                metadata.update(
                    {
                        "conversion_method": conversion_method,
                        "conversion_failed": conversion_failed,
                        "original_file_type": os.path.splitext(file_path)[1]
                        .lower()
                        .lstrip("."),
                    }
                )

            self.logger.debug(f"Processed Git file: /{rel_path!s}")

            # Create document
            git_document = Document(
                title=os.path.basename(file_path),
                content=content,
                content_type=content_type,
                metadata=metadata,
                source_type=SourceType.GIT,
                source=self.config.source,
                url=self._build_blob_url(rel_path),
                is_deleted=False,
                created_at=first_commit_date,
                updated_at=last_commit_date,
            )

            return git_document
        except Exception as e:
            self.logger.error(
                "Failed to process file", file_path=file_path, error=str(e)
            )
            raise

    async def get_documents(self) -> list[Document]:
        """Get all documents from the repository.

        Files rejected by the file processor are skipped. A file that fails to
        process is logged and skipped rather than aborting the whole run.

        Returns:
            List of documents

        Raises:
            ValueError: If the connector/repository is not initialized.
            Exception: If document retrieval fails
        """
        try:
            self._ensure_initialized()
            try:
                files = (
                    self.git_ops.list_files()
                )  # This will raise ValueError if not initialized
            except ValueError as e:
                self.logger.error("Failed to list files", error=str(e))
                raise ValueError("Repository not initialized") from e

            documents = []

            for file_path in files:
                if not self.file_processor.should_process_file(file_path):  # type: ignore
                    continue

                try:
                    document = self._process_file(file_path)
                    documents.append(document)

                except Exception as e:
                    self.logger.error(
                        "Failed to process file", file_path=file_path, error=str(e)
                    )
                    continue

            # Return all documents that need to be processed
            return documents

        except Exception as e:
            # Log and re-raise unchanged (ValueError keeps its type).
            self.logger.error("Failed to get documents", error=str(e))
            raise

    def _ensure_initialized(self):
        """Ensure the repository is initialized before performing operations.

        Raises:
            ValueError: If the connector has not been entered as a context manager.
        """
        if not self._initialized:
            self.logger.error(
                "Repository not initialized. Use the connector as a context manager."
            )
            raise ValueError("Repository not initialized")