Coverage for src/qdrant_loader/connectors/git/connector.py: 68%

179 statements  

« prev     ^ index     » next       coverage.py v7.10.0, created at 2025-07-25 11:39 +0000

1"""Git repository connector implementation.""" 

2 

3import os 

4import shutil 

5import tempfile 

6 

7from qdrant_loader.config.types import SourceType 

8from qdrant_loader.connectors.base import BaseConnector 

9from qdrant_loader.connectors.git.config import GitRepoConfig 

10from qdrant_loader.connectors.git.file_processor import FileProcessor 

11from qdrant_loader.connectors.git.metadata_extractor import GitMetadataExtractor 

12from qdrant_loader.connectors.git.operations import GitOperations 

13from qdrant_loader.core.document import Document 

14from qdrant_loader.core.file_conversion import ( 

15 FileConversionConfig, 

16 FileConversionError, 

17 FileConverter, 

18 FileDetector, 

19) 

20from qdrant_loader.utils.logging import LoggingConfig 

21 

# Module-level logger for this connector module, obtained through the
# project's LoggingConfig helper and keyed by the module name.
logger = LoggingConfig.get_logger(__name__)

23 

24 

class GitConnector(BaseConnector):
    """Git repository connector.

    The connector clones the configured repository into a temporary
    directory when it is entered as a (sync or async) context manager and
    removes that directory again on exit. ``get_documents`` may only be
    called while the connector is entered.
    """

    def __init__(self, config: GitRepoConfig):
        """Initialize the Git connector.

        Args:
            config: Configuration for the Git repository
        """
        super().__init__(config)
        self.config = config
        self.temp_dir = None  # Set when the context manager is entered
        self.metadata_extractor = GitMetadataExtractor(config=self.config)
        self.git_ops = GitOperations()
        self.file_processor = None  # Created when the context manager is entered
        self.logger = LoggingConfig.get_logger(__name__)
        self.logger.debug("Initializing GitConnector")
        # NOTE(review): this logs the full config dump; if GitRepoConfig.token
        # is a plain string it ends up in debug logs -- confirm it is masked.
        self.logger.debug("GitConnector Configuration", config=config.model_dump())
        self._initialized = False

        # Initialize file conversion components if enabled
        self.file_converter = None
        self.file_detector = None
        if self.config.enable_file_conversion:
            self.logger.debug("File conversion enabled for Git connector")
            # File conversion config will be set from global config during ingestion
            self.file_detector = FileDetector()
        else:
            self.logger.debug("File conversion disabled for Git connector")

    def set_file_conversion_config(self, file_conversion_config: FileConversionConfig):
        """Set file conversion configuration from global config.

        Has no effect when file conversion is disabled for this connector.

        Args:
            file_conversion_config: Global file conversion configuration
        """
        if self.config.enable_file_conversion:
            self.file_converter = FileConverter(file_conversion_config)
            self.logger.debug("File converter initialized with global config")

    def _setup_repository(self) -> None:
        """Create the temp directory, clone the repository and verify it.

        Shared by ``__enter__`` and ``__aenter__``. It does not clean up and
        does not touch ``_initialized``; both are the caller's job so each
        entry point can keep its own error-translation behavior.

        Raises:
            ValueError: If ``git_ops.repo`` is falsy after the clone.
            Exception: Whatever the clone or the status check raises.
        """
        # Create temporary directory
        self.temp_dir = tempfile.mkdtemp()
        # Update config with the actual temp dir
        self.config.temp_dir = self.temp_dir
        self.logger.debug("Created temporary directory", temp_dir=self.temp_dir)

        # Initialize file processor
        self.file_processor = FileProcessor(
            config=self.config,
            temp_dir=self.temp_dir,
            file_detector=self.file_detector,
        )

        # Get auth token from config (only its length is logged)
        auth_token = None
        if self.config.token:
            auth_token = self.config.token
            self.logger.debug(
                "Using authentication token", token_length=len(auth_token)
            )

        # Clone repository
        self.logger.debug(
            "Attempting to clone repository",
            url=self.config.base_url,
            branch=self.config.branch,
            depth=self.config.depth,
            temp_dir=self.temp_dir,
        )

        try:
            self.git_ops.clone(
                url=str(self.config.base_url),
                to_path=self.temp_dir,
                branch=self.config.branch,
                depth=self.config.depth,
                auth_token=auth_token,
            )
        except Exception as clone_error:
            self.logger.error(
                "Failed to clone repository",
                error=str(clone_error),
                error_type=type(clone_error).__name__,
                url=self.config.base_url,
                branch=self.config.branch,
                temp_dir=self.temp_dir,
            )
            raise

        # Verify repository initialization
        if not self.git_ops.repo:
            self.logger.error(
                "Repository not initialized after clone", temp_dir=self.temp_dir
            )
            raise ValueError("Repository not initialized")

        # Verify repository is valid
        try:
            self.git_ops.repo.git.status()
            self.logger.debug(
                "Repository is valid and accessible", temp_dir=self.temp_dir
            )
        except Exception as status_error:
            self.logger.error(
                "Failed to verify repository status",
                error=str(status_error),
                error_type=type(status_error).__name__,
                temp_dir=self.temp_dir,
            )
            raise

    async def __aenter__(self):
        """Async context manager entry.

        Returns:
            This connector, with the repository cloned and verified.

        Raises:
            ValueError: If setup raised a ValueError (same message).
            RuntimeError: If setup failed for any other reason.
        """
        try:
            self._setup_repository()
        except ValueError as e:
            # Preserve ValueError type
            self.logger.error("Failed to set up Git repository", error=str(e))
            # __aexit__ is not called when __aenter__ raises, so the temp
            # directory has to be removed here or it leaks.
            self._cleanup()
            raise ValueError(str(e)) from e  # Re-raise with the same message
        except Exception as e:
            self.logger.error(
                "Failed to set up Git repository",
                error=str(e),
                error_type=type(e).__name__,
                temp_dir=self.temp_dir,
            )
            # Clean up if something goes wrong
            self._cleanup()
            raise RuntimeError(f"Failed to set up Git repository: {e}") from e

        self._initialized = True
        return self

    def __enter__(self):
        """Synchronous context manager entry.

        Setup is skipped when the connector is already initialized. Unlike
        ``__aenter__``, setup errors propagate unchanged.

        Returns:
            This connector.
        """
        if not self._initialized:
            try:
                self._setup_repository()
            except Exception:
                # __exit__ is not called when __enter__ raises, so remove the
                # temp directory here; the flag stays False so later calls
                # to get_documents are rejected.
                self._cleanup()
                raise
            # Only mark the connector usable once the clone is verified.
            self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        self._cleanup()
        self._initialized = False

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Clean up resources."""
        self._cleanup()
        # Reset the flag (as __aexit__ does) so a later ``with`` re-clones
        # instead of running against the directory that was just removed.
        self._initialized = False

    def _cleanup(self):
        """Clean up temporary directory.

        Best effort: a failure to remove the directory is logged, not raised.
        """
        if self.temp_dir and os.path.exists(self.temp_dir):
            try:
                shutil.rmtree(self.temp_dir)
                self.logger.debug("Cleaned up temporary directory")
            except Exception as e:
                self.logger.error(f"Failed to clean up temporary directory: {e}")

    def _process_file(self, file_path: str) -> Document:
        """Process a single file.

        Args:
            file_path: Path to the file

        Returns:
            Document instance with file content and metadata

        Raises:
            Exception: If file processing fails
        """
        try:
            # Get relative path from repository root
            rel_path = os.path.relpath(file_path, self.temp_dir)

            # Fix cross-platform path issues: ensure we get a proper relative path
            # If relpath returns a path that goes up directories (contains ..),
            # it means the path calculation failed (common with mixed path styles)
            if rel_path.startswith("..") and self.temp_dir:
                # Fallback: try to extract relative path manually
                if file_path.startswith(self.temp_dir):
                    # Remove temp_dir prefix and any leading separators
                    rel_path = (
                        file_path[len(self.temp_dir) :]
                        .lstrip(os.sep)
                        .lstrip("/")
                        .lstrip("\\")
                    )
                else:
                    # Last resort: use basename
                    rel_path = os.path.basename(file_path)

            # Check if file needs conversion
            needs_conversion = (
                self.config.enable_file_conversion
                and self.file_detector
                and self.file_converter
                and self.file_detector.is_supported_for_conversion(file_path)
            )

            if needs_conversion:
                self.logger.debug("File needs conversion", file_path=rel_path)
                try:
                    # Convert file to markdown
                    assert self.file_converter is not None  # Type checker hint
                    content = self.file_converter.convert_file(file_path)
                    content_type = "md"  # Converted files are markdown
                    conversion_method = "markitdown"
                    conversion_failed = False
                    self.logger.info("File conversion successful", file_path=rel_path)
                except FileConversionError as e:
                    self.logger.warning(
                        "File conversion failed, creating fallback document",
                        file_path=rel_path,
                        error=str(e),
                    )
                    # Create fallback document
                    assert self.file_converter is not None  # Type checker hint
                    content = self.file_converter.create_fallback_document(file_path, e)
                    content_type = "md"  # Fallback is also markdown
                    conversion_method = "markitdown_fallback"
                    conversion_failed = True
            else:
                # Read file content normally
                content = self.git_ops.get_file_content(file_path)
                # Get file extension without the dot
                content_type = os.path.splitext(file_path)[1].lower().lstrip(".")
                conversion_method = None
                conversion_failed = False

            # Get first and last commit dates
            first_commit_date = self.git_ops.get_first_commit_date(file_path)
            last_commit_date = self.git_ops.get_last_commit_date(file_path)

            # Extract metadata
            metadata = self.metadata_extractor.extract_all_metadata(
                file_path=rel_path, content=content
            )

            # Add Git-specific metadata
            metadata.update(
                {
                    "repository_url": self.config.base_url,
                    "branch": self.config.branch,
                    "last_commit_date": (
                        last_commit_date.isoformat() if last_commit_date else None
                    ),
                }
            )

            # Add file conversion metadata if applicable
            if needs_conversion:
                metadata.update(
                    {
                        "conversion_method": conversion_method,
                        "conversion_failed": conversion_failed,
                        "original_file_type": os.path.splitext(file_path)[1]
                        .lower()
                        .lstrip("."),
                    }
                )

            self.logger.debug(f"Processed Git file: /{rel_path!s}")

            # Create document
            # Normalize path separators for URL (use forward slashes on all platforms)
            normalized_rel_path = rel_path.replace(os.sep, "/").replace("\\", "/")
            # Strip only a trailing ".git" (and trailing slash). Replacing
            # every ".git" occurrence would corrupt URLs that contain it
            # elsewhere, e.g. ".../my.github.io.git".
            repo_web_url = str(self.config.base_url).rstrip("/").removesuffix(".git")
            git_document = Document(
                title=os.path.basename(file_path),
                content=content,
                content_type=content_type,
                metadata=metadata,
                source_type=SourceType.GIT,
                source=self.config.source,
                url=f"{repo_web_url}/blob/{self.config.branch}/{normalized_rel_path}",
                is_deleted=False,
                created_at=first_commit_date,
                updated_at=last_commit_date,
            )

            return git_document
        except Exception as e:
            self.logger.error(
                "Failed to process file", file_path=file_path, error=str(e)
            )
            raise

    async def get_documents(self) -> list[Document]:
        """Get all documents from the repository.

        Files rejected by the file processor are skipped. A file whose
        processing raises is logged and skipped; it does not abort the run.

        Returns:
            List of documents

        Raises:
            ValueError: If the connector is not initialized or listing files
                raises a ValueError.
            Exception: If document retrieval fails for any other reason
        """
        try:
            self._ensure_initialized()
            try:
                # This will raise ValueError if not initialized
                files = self.git_ops.list_files()
            except ValueError as e:
                self.logger.error("Failed to list files", error=str(e))
                raise ValueError("Repository not initialized") from e

            documents = []

            for file_path in files:
                if not self.file_processor.should_process_file(file_path):  # type: ignore
                    continue

                try:
                    documents.append(self._process_file(file_path))
                except Exception as e:
                    self.logger.error(
                        "Failed to process file", file_path=file_path, error=str(e)
                    )
                    continue

            # Return all documents that need to be processed
            return documents

        except Exception as e:
            # A bare ``raise`` keeps the original exception type (including
            # ValueError), so one handler covers every failure.
            self.logger.error("Failed to get documents", error=str(e))
            raise

    def _ensure_initialized(self):
        """Ensure the repository is initialized before performing operations.

        Raises:
            ValueError: If the connector has not been entered as a context
                manager.
        """
        if not self._initialized:
            self.logger.error(
                "Repository not initialized. Use the connector as a context manager."
            )
            raise ValueError("Repository not initialized")

432 "Repository not initialized. Use the connector as a context manager." 

433 ) 

434 raise ValueError("Repository not initialized")