Coverage for src/qdrant_loader/connectors/git/connector.py: 57%

174 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1"""Git repository connector implementation.""" 

2 

3import os 

4import shutil 

5import tempfile 

6 

7from qdrant_loader.config.types import SourceType 

8from qdrant_loader.connectors.base import BaseConnector 

9from qdrant_loader.connectors.git.config import GitRepoConfig 

10from qdrant_loader.connectors.git.file_processor import FileProcessor 

11from qdrant_loader.connectors.git.metadata_extractor import GitMetadataExtractor 

12from qdrant_loader.connectors.git.operations import GitOperations 

13from qdrant_loader.core.document import Document 

14from qdrant_loader.core.file_conversion import ( 

15 FileConverter, 

16 FileDetector, 

17 FileConversionConfig, 

18 FileConversionError, 

19) 

20from qdrant_loader.utils.logging import LoggingConfig 

21 

# Module-level logger obtained from the project's LoggingConfig.
# GitConnector additionally requests its own logger under the same name in
# __init__ and uses that one (self.logger) for all of its messages.
logger = LoggingConfig.get_logger(__name__)

23 

24 

class GitConnector(BaseConnector):
    """Connector that clones a Git repository and exposes its files as documents.

    Entering the connector (``with`` or ``async with``) creates a temporary
    directory, clones ``config.base_url`` into it and checks that the clone
    is usable; leaving the context removes the directory again.
    ``get_documents()`` may only be called while the connector is entered.
    """

    def __init__(self, config: GitRepoConfig):
        """Initialize the Git connector.

        No repository access happens here; cloning is deferred to context
        entry.

        Args:
            config: Configuration for the Git repository
        """
        super().__init__(config)
        self.config = config
        self.temp_dir = None  # Set by _setup_repository() on context entry
        self.metadata_extractor = GitMetadataExtractor(config=self.config)
        self.git_ops = GitOperations()
        self.file_processor = None  # Set by _setup_repository() on context entry
        self.logger = LoggingConfig.get_logger(__name__)
        self.logger.debug("Initializing GitConnector")
        # The access token is a credential: keep it out of the log output.
        self.logger.debug(
            "GitConnector Configuration",
            config=config.model_dump(exclude={"token"}),
        )
        self._initialized = False

        # Initialize file conversion components if enabled
        self.file_converter = None
        self.file_detector = None
        if self.config.enable_file_conversion:
            self.logger.debug("File conversion enabled for Git connector")
            # The converter itself is created later, once the global
            # conversion config is supplied via set_file_conversion_config().
            self.file_detector = FileDetector()
        else:
            self.logger.debug("File conversion disabled for Git connector")

    def set_file_conversion_config(self, file_conversion_config: FileConversionConfig):
        """Set file conversion configuration from global config.

        Does nothing when file conversion is disabled for this source.

        Args:
            file_conversion_config: Global file conversion configuration
        """
        if self.config.enable_file_conversion:
            self.file_converter = FileConverter(file_conversion_config)
            self.logger.debug("File converter initialized with global config")

    def _setup_repository(self):
        """Create the temporary directory, clone into it and verify the clone.

        Shared by ``__enter__`` and ``__aenter__``. ``_initialized`` is set
        only after every step succeeded. If any step fails, the temporary
        directory is removed before the original exception is re-raised, so
        a failed entry never leaves a clone behind.

        Raises:
            ValueError: If no repository object exists after cloning.
            Exception: Whatever the clone or the status check raised.
        """
        self.temp_dir = tempfile.mkdtemp()
        try:
            # Update config with the actual temp dir
            self.config.temp_dir = self.temp_dir
            self.logger.debug("Created temporary directory", temp_dir=self.temp_dir)

            self.file_processor = FileProcessor(
                config=self.config,
                temp_dir=self.temp_dir,
                file_detector=self.file_detector,
            )

            # Get auth token from config
            auth_token = None
            if self.config.token:
                auth_token = self.config.token
                self.logger.debug(
                    "Using authentication token", token_length=len(auth_token)
                )

            self.logger.debug(
                "Attempting to clone repository",
                url=self.config.base_url,
                branch=self.config.branch,
                depth=self.config.depth,
                temp_dir=self.temp_dir,
            )

            try:
                self.git_ops.clone(
                    url=str(self.config.base_url),
                    to_path=self.temp_dir,
                    branch=self.config.branch,
                    depth=self.config.depth,
                    auth_token=auth_token,
                )
            except Exception as clone_error:
                self.logger.error(
                    "Failed to clone repository",
                    error=str(clone_error),
                    error_type=type(clone_error).__name__,
                    url=self.config.base_url,
                    branch=self.config.branch,
                    temp_dir=self.temp_dir,
                )
                raise

            # Verify repository initialization
            if not self.git_ops.repo:
                self.logger.error(
                    "Repository not initialized after clone", temp_dir=self.temp_dir
                )
                raise ValueError("Repository not initialized")

            # Verify repository is valid
            try:
                self.git_ops.repo.git.status()
                self.logger.debug(
                    "Repository is valid and accessible", temp_dir=self.temp_dir
                )
            except Exception as status_error:
                self.logger.error(
                    "Failed to verify repository status",
                    error=str(status_error),
                    error_type=type(status_error).__name__,
                    temp_dir=self.temp_dir,
                )
                raise
        except Exception:
            # Do not leak the temporary directory when setup fails.
            self._cleanup()
            raise

        self._initialized = True

    async def __aenter__(self):
        """Async context manager entry.

        Returns:
            The connector itself, ready for ``get_documents()``.

        Raises:
            ValueError: If setup failed with a ValueError (message preserved).
            RuntimeError: If setup failed for any other reason; the original
                exception is chained as the cause.
        """
        try:
            self._setup_repository()
        except ValueError as e:
            # Preserve ValueError type
            self.logger.error("Failed to set up Git repository", error=str(e))
            raise ValueError(str(e)) from e  # Re-raise with the same message
        except Exception as e:
            self.logger.error(
                "Failed to set up Git repository",
                error=str(e),
                error_type=type(e).__name__,
                temp_dir=self.temp_dir,
            )
            raise RuntimeError(f"Failed to set up Git repository: {e}") from e
        return self

    def __enter__(self):
        """Synchronous context manager entry.

        Setup is skipped when the connector is already initialized. Unlike
        ``__aenter__``, setup errors propagate unchanged.

        Returns:
            The connector itself, ready for ``get_documents()``.
        """
        if not self._initialized:
            self._setup_repository()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: remove the clone and reset state."""
        self._cleanup()
        self._initialized = False

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Synchronous context manager exit: remove the clone and reset state."""
        self._cleanup()
        # Mirror __aexit__: once the clone is gone the connector must not
        # pass _ensure_initialized() any more.
        self._initialized = False

    def _cleanup(self):
        """Clean up temporary directory.

        Best effort: a failure to delete is logged and not raised.
        """
        if self.temp_dir and os.path.exists(self.temp_dir):
            try:
                shutil.rmtree(self.temp_dir)
                self.logger.debug("Cleaned up temporary directory")
            except Exception as e:
                self.logger.error(f"Failed to clean up temporary directory: {e}")

    def _process_file(self, file_path: str) -> Document:
        """Process a single file.

        Files the detector reports as convertible are converted to markdown
        (with a fallback document if conversion fails); all other files are
        read as-is through ``git_ops``.

        Args:
            file_path: Path to the file

        Returns:
            Document instance with file content and metadata

        Raises:
            Exception: If file processing fails
        """
        try:
            # Get relative path from repository root
            rel_path = os.path.relpath(file_path, self.temp_dir)
            # File extension without the dot, lower-cased
            extension = os.path.splitext(file_path)[1].lower().lstrip(".")

            # Check if file needs conversion
            needs_conversion = (
                self.config.enable_file_conversion
                and self.file_detector
                and self.file_converter
                and self.file_detector.is_supported_for_conversion(file_path)
            )

            if needs_conversion:
                self.logger.debug("File needs conversion", file_path=rel_path)
                try:
                    # Convert file to markdown
                    assert self.file_converter is not None  # Type checker hint
                    content = self.file_converter.convert_file(file_path)
                    content_type = "md"  # Converted files are markdown
                    conversion_method = "markitdown"
                    conversion_failed = False
                    self.logger.info("File conversion successful", file_path=rel_path)
                except FileConversionError as e:
                    self.logger.warning(
                        "File conversion failed, creating fallback document",
                        file_path=rel_path,
                        error=str(e),
                    )
                    # Create fallback document
                    assert self.file_converter is not None  # Type checker hint
                    content = self.file_converter.create_fallback_document(file_path, e)
                    content_type = "md"  # Fallback is also markdown
                    conversion_method = "markitdown_fallback"
                    conversion_failed = True
            else:
                # Read file content normally
                content = self.git_ops.get_file_content(file_path)
                content_type = extension
                conversion_method = None
                conversion_failed = False

            first_commit_date = self.git_ops.get_first_commit_date(file_path)
            last_commit_date = self.git_ops.get_last_commit_date(file_path)

            # Extract metadata
            metadata = self.metadata_extractor.extract_all_metadata(
                file_path=rel_path, content=content
            )

            # Add Git-specific metadata
            metadata.update(
                {
                    "repository_url": self.config.base_url,
                    "branch": self.config.branch,
                    "last_commit_date": (
                        last_commit_date.isoformat() if last_commit_date else None
                    ),
                }
            )

            # Add file conversion metadata if applicable
            if needs_conversion:
                metadata.update(
                    {
                        "conversion_method": conversion_method,
                        "conversion_failed": conversion_failed,
                        "original_file_type": extension,
                    }
                )

            self.logger.debug(f"Processed Git file: /{rel_path!s}")

            # Strip only a trailing ".git": a plain replace() would also
            # damage URLs that contain ".git" elsewhere, e.g. "x.github.io".
            repo_url = str(self.config.base_url).rstrip("/").removesuffix(".git")
            # URLs always use forward slashes, whatever the local separator.
            url_path = rel_path.replace(os.sep, "/")

            # Create document
            git_document = Document(
                title=os.path.basename(file_path),
                content=content,
                content_type=content_type,
                metadata=metadata,
                source_type=SourceType.GIT,
                source=self.config.source,
                url=f"{repo_url}/blob/{self.config.branch}/{url_path}",
                is_deleted=False,
                created_at=first_commit_date,
                updated_at=last_commit_date,
            )

            return git_document
        except Exception as e:
            self.logger.error(
                "Failed to process file", file_path=file_path, error=str(e)
            )
            raise

    async def get_documents(self) -> list[Document]:
        """Get all documents from the repository.

        Files rejected by the file processor are skipped. A file whose
        processing fails is logged and skipped; it does not abort the run.

        Returns:
            List of documents

        Raises:
            ValueError: If the connector is not initialized or listing the
                files fails with a ValueError.
            Exception: If document retrieval fails for any other reason
        """
        try:
            self._ensure_initialized()
            try:
                files = self.git_ops.list_files()
            except ValueError as e:
                self.logger.error("Failed to list files", error=str(e))
                raise ValueError("Repository not initialized") from e

            documents = []
            for file_path in files:
                if not self.file_processor.should_process_file(file_path):  # type: ignore
                    continue

                try:
                    documents.append(self._process_file(file_path))
                except Exception as e:
                    # One bad file must not prevent the rest from loading.
                    self.logger.error(
                        "Failed to process file", file_path=file_path, error=str(e)
                    )

            return documents
        except Exception as e:
            # A bare re-raise keeps the original exception type, ValueError
            # included, so one handler covers both cases.
            self.logger.error("Failed to get documents", error=str(e))
            raise

    def _ensure_initialized(self):
        """Ensure the repository is initialized before performing operations.

        Raises:
            ValueError: If the connector has not been entered as a context
                manager.
        """
        if not self._initialized:
            self.logger.error(
                "Repository not initialized. Use the connector as a context manager."
            )
            raise ValueError("Repository not initialized")

413 "Repository not initialized. Use the connector as a context manager." 

414 ) 

415 raise ValueError("Repository not initialized")