Coverage for src / qdrant_loader / connectors / git / metadata_extractor.py: 77%

230 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-12 09:46 +0000

1import os 

2import re 

3from typing import Any 

4from urllib.parse import urlparse 

5 

6import chardet 

7import git 

8 

9from qdrant_loader.connectors.git.config import GitRepoConfig 

10from qdrant_loader.utils.logging import LoggingConfig 

11 

12logger = LoggingConfig.get_logger(__name__) 

13 

14 

class GitMetadataExtractor:
    """Extracts file-, repository- and structure-level metadata from files in a Git clone."""

    def __init__(self, config: GitRepoConfig):
        """Create an extractor bound to one repository configuration.

        Args:
            config (GitRepoConfig): Configuration for the Git repository
                (provides ``base_url`` and the local ``temp_dir`` clone path).
        """
        self.logger = logger
        self.config = config

26 

27 def extract_all_metadata(self, file_path: str, content: str) -> dict[str, Any]: 

28 """Extract all metadata for a file. 

29 

30 Args: 

31 file_path: Path to the file. 

32 content: Content of the file. 

33 

34 Returns: 

35 dict[str, Any]: Dictionary containing all metadata. 

36 """ 

37 self.logger.debug(f"Starting metadata extraction for file: {file_path!s}") 

38 

39 file_metadata = self._extract_file_metadata(file_path, content) 

40 repo_metadata = self._extract_repo_metadata(file_path) 

41 git_metadata = self._extract_git_metadata(file_path) 

42 

43 # Only extract structure metadata for markdown files 

44 structure_metadata = {} 

45 if file_path.lower().endswith(".md"): 

46 self.logger.debug(f"Processing markdown file: {file_path!s}") 

47 structure_metadata = self._extract_structure_metadata(content) 

48 

49 metadata = { 

50 **file_metadata, 

51 **repo_metadata, 

52 **git_metadata, 

53 **structure_metadata, 

54 } 

55 

56 self.logger.debug(f"Completed metadata extraction for {file_path!s}.") 

57 self.logger.debug(f"Metadata: {metadata!s}") 

58 return metadata 

59 

60 def _extract_file_metadata(self, file_path: str, content: str) -> dict[str, Any]: 

61 """Extract metadata about the file itself.""" 

62 # Get relative path from repository root 

63 # Handle cross-drive paths on Windows (ValueError when paths are on different drives) 

64 try: 

65 rel_path = os.path.relpath(file_path, self.config.temp_dir) 

66 except ValueError: 

67 raise ValueError( 

68 f"Cannot compute relative path for {file_path} from {self.config.temp_dir}. " 

69 "Files on different drives should be filtered during file processing." 

70 ) 

71 file_type = os.path.splitext(rel_path)[1] 

72 file_name = os.path.basename(rel_path) 

73 file_encoding = self._detect_encoding(content) 

74 # Count lines using splitlines(), but handle special case for whitespace-only content 

75 if not content: 

76 line_count = 0 

77 elif content.strip() == "" and "\n" in content: 

78 # Special case: whitespace-only content with newlines 

79 # Count newlines + 1 to include all whitespace segments 

80 line_count = content.count("\n") + 1 

81 else: 

82 # Normal content: use splitlines() which handles trailing newlines correctly 

83 line_count = len(content.splitlines()) 

84 word_count = len(content.split()) 

85 file_size = len(content.encode(file_encoding)) 

86 

87 return { 

88 "file_type": file_type, 

89 "file_name": file_name, 

90 "file_directory": ( 

91 os.path.dirname(rel_path) 

92 if not os.path.isabs(rel_path) 

93 else os.path.dirname(file_path) 

94 ), 

95 "file_encoding": file_encoding, 

96 "line_count": line_count, 

97 "word_count": word_count, 

98 "file_size": file_size, 

99 "has_code_blocks": self._has_code_blocks(content), 

100 "has_images": self._has_images(content), 

101 "has_links": self._has_links(content), 

102 } 

103 

104 def _extract_repo_metadata(self, file_path: str) -> dict[str, Any]: 

105 """Extract repository metadata from the given file path. 

106 

107 Args: 

108 file_path (str): Path to the file. 

109 

110 Returns: 

111 dict[str, Any]: Dictionary containing repository metadata. 

112 """ 

113 try: 

114 # Get repository URL from config 

115 repo_url = str(self.config.base_url) 

116 if not repo_url: 

117 return {} 

118 

119 # Extract repository name and owner from normalized URL 

120 normalized_url = repo_url[:-4] if repo_url.endswith(".git") else repo_url 

121 repo_parts = normalized_url.split("/") 

122 

123 # Handle different Git hosting platforms using secure URL parsing 

124 parsed_url = urlparse(repo_url) 

125 hostname = parsed_url.hostname 

126 

127 if hostname == "dev.azure.com": 

128 # Azure DevOps format: https://dev.azure.com/org/project/_git/repo 

129 if len(repo_parts) >= 5 and "_git" in repo_parts: 

130 git_index = repo_parts.index("_git") 

131 if git_index >= 1: 

132 repo_owner = repo_parts[git_index - 2] # org 

133 repo_name = repo_parts[git_index + 1] # repo 

134 else: 

135 return {} 

136 else: 

137 return {} 

138 elif hostname in ["github.com", "gitlab.com"] or ( 

139 hostname and hostname.endswith(".github.com") 

140 ): 

141 # Standard format: github.com/owner/repo or gitlab.com/owner/repo 

142 # Also handle GitHub Enterprise subdomains 

143 if len(repo_parts) >= 2: 

144 repo_owner = repo_parts[-2] 

145 repo_name = repo_parts[-1] 

146 else: 

147 return {} 

148 else: 

149 # Handle other Git hosting platforms (GitLab self-hosted, etc.) 

150 if len(repo_parts) >= 2: 

151 repo_owner = repo_parts[-2] 

152 repo_name = repo_parts[-1] 

153 else: 

154 # Invalid URL format 

155 return {} 

156 

157 # Initialize metadata with default values 

158 metadata = { 

159 "repository_name": repo_name, 

160 "repository_owner": repo_owner, 

161 "repository_url": repo_url, 

162 "repository_description": "", 

163 "repository_language": "", 

164 } 

165 

166 try: 

167 repo = git.Repo(self.config.temp_dir) 

168 if repo and not repo.bare: 

169 config = repo.config_reader() 

170 # Try to get description from github section first 

171 if config.has_section("github"): 

172 metadata["repository_description"] = str( 

173 config.get_value("github", "description", "") 

174 ) 

175 metadata["repository_language"] = str( 

176 config.get_value("github", "language", "") 

177 ) 

178 # Fall back to core section if needed 

179 if not metadata["repository_description"] and config.has_section( 

180 "core" 

181 ): 

182 metadata["repository_description"] = str( 

183 config.get_value("core", "description", "") 

184 ) 

185 self.logger.debug(f"Repository metadata extracted: {metadata!s}") 

186 except git.InvalidGitRepositoryError: 

187 # If the directory is not a valid Git repository, we can't extract any metadata 

188 self.logger.error("Invalid Git repository directory") 

189 return {} 

190 except Exception as e: 

191 self.logger.error(f"Failed to read Git config: {e}") 

192 

193 return metadata 

194 except Exception as e: 

195 self.logger.error(f"Failed to extract repository metadata: {str(e)!s}") 

196 return {} 

197 

198 def _extract_git_metadata(self, file_path: str) -> dict[str, Any]: 

199 """Extract Git-specific metadata.""" 

200 try: 

201 repo = git.Repo(self.config.temp_dir) 

202 metadata = {} 

203 

204 try: 

205 # Get the relative path from the repository root 

206 rel_path = os.path.relpath(file_path, repo.working_dir) 

207 

208 # Try to get commits for the file 

209 commits = list(repo.iter_commits(paths=rel_path, max_count=1)) 

210 if commits: 

211 last_commit = commits[0] 

212 metadata.update( 

213 { 

214 "last_commit_date": last_commit.committed_datetime.isoformat(), 

215 "last_commit_author": last_commit.author.name, 

216 "last_commit_message": last_commit.message.strip().split( 

217 "\n" 

218 )[0], 

219 } 

220 ) 

221 else: 

222 # If no commits found for the file, try getting the latest commit 

223 commits = list(repo.iter_commits(max_count=1)) 

224 if commits: 

225 last_commit = commits[0] 

226 metadata.update( 

227 { 

228 "last_commit_date": last_commit.committed_datetime.isoformat(), 

229 "last_commit_author": last_commit.author.name, 

230 "last_commit_message": last_commit.message.strip().split( 

231 "\n" 

232 )[ 

233 0 

234 ], 

235 } 

236 ) 

237 else: 

238 # If still no commits found, use repository's HEAD commit 

239 head_commit = repo.head.commit 

240 metadata.update( 

241 { 

242 "last_commit_date": head_commit.committed_datetime.isoformat(), 

243 "last_commit_author": head_commit.author.name, 

244 "last_commit_message": head_commit.message.strip().split( 

245 "\n" 

246 )[ 

247 0 

248 ], 

249 } 

250 ) 

251 except Exception as e: 

252 self.logger.debug(f"Failed to get commits: {e}") 

253 # Try one last time with HEAD commit 

254 try: 

255 head_commit = repo.head.commit 

256 metadata.update( 

257 { 

258 "last_commit_date": head_commit.committed_datetime.isoformat(), 

259 "last_commit_author": head_commit.author.name, 

260 "last_commit_message": head_commit.message.strip().split( 

261 "\n" 

262 )[0], 

263 } 

264 ) 

265 except Exception as e: 

266 self.logger.debug(f"Failed to get HEAD commit: {e}") 

267 

268 return metadata 

269 except Exception as e: 

270 self.logger.warning(f"Failed to extract Git metadata: {str(e)!s}") 

271 return {} 

272 

273 def _extract_structure_metadata(self, content: str) -> dict[str, Any]: 

274 """Extract metadata about the document structure.""" 

275 self.logger.debug("Starting structure metadata extraction") 

276 self.logger.debug(f"Content to process:\n{content!s}") 

277 

278 has_toc = False 

279 heading_levels = [] 

280 sections_count = 0 

281 

282 # Check if content is markdown by looking for markdown headers 

283 # Look for markdown headers that: 

284 # 1. Start with 1-6 # characters at the start of a line or after a newline 

285 # 2. Are followed by whitespace and text 

286 # 3. Continue until the next newline or end of content 

287 headings = re.findall(r"^[ \t]*(#{1,6})[ \t]+(.+?)$", content, re.MULTILINE) 

288 self.logger.debug(f"Found {len(headings)!s} headers in content") 

289 

290 if headings: 

291 self.logger.debug(f"Headers found: {headings!s}") 

292 # Check for various TOC formats with different heading levels 

293 toc_patterns = [ 

294 r"#+\s*Table\s+of\s+Contents", 

295 r"#+\s*Contents", 

296 r"#+\s*TOC", 

297 ] 

298 has_toc = any( 

299 re.search(pattern, content, re.IGNORECASE) for pattern in toc_patterns 

300 ) 

301 heading_levels = [len(h[0]) for h in headings] 

302 sections_count = len(heading_levels) 

303 self.logger.debug( 

304 f"Has TOC: {has_toc!s}, Heading levels: {heading_levels!s}, Sections count: {sections_count!s}" 

305 ) 

306 else: 

307 self.logger.warning("No headers found in content") 

308 # Log the first few lines of content for debugging 

309 first_lines = "\n".join(content.splitlines()[:5]) 

310 self.logger.debug(f"First few lines of content:\n{first_lines!s}") 

311 # Try alternative header detection 

312 alt_headings = re.findall(r"^#{1,6}\s+.+$", content, re.MULTILINE) 

313 if alt_headings: 

314 self.logger.debug( 

315 f"Found {len(alt_headings)!s} headers using alternative pattern" 

316 ) 

317 self.logger.debug(f"Alternative headers found: {alt_headings!s}") 

318 has_toc = "## Table of Contents" in content or "## Contents" in content 

319 heading_levels = [] 

320 for h in alt_headings: 

321 match = re.match(r"^(#{1,6})", h) 

322 if match: 

323 heading_levels.append(len(match.group(1))) 

324 sections_count = len(heading_levels) 

325 self.logger.debug( 

326 f"Has TOC: {has_toc!s}, Heading levels: {heading_levels!s}, Sections count: {sections_count!s}" 

327 ) 

328 

329 metadata = { 

330 "has_toc": has_toc, 

331 "heading_levels": heading_levels, 

332 "sections_count": sections_count, 

333 } 

334 

335 self.logger.debug(f"Structure metadata extraction completed: {metadata!s}") 

336 return metadata 

337 

    def _get_repo_description(self, repo: git.Repo, file_path: str) -> str:
        """Get repository description from Git config or README.

        Tries the clone's Git config first (remote "origin" description),
        then falls back to the first meaningful paragraph of a README file.
        Returns "" if neither source yields a usable description.

        Args:
            repo: Open GitPython repository handle for the local clone.
            file_path: Path to the current file (unused by this method).

        Returns:
            str: A description of at most ~200 characters, or "".
        """
        try:
            # Try to get description from Git config
            config = repo.config_reader()
            try:
                if config.has_section('remote "origin"'):
                    description = str(
                        config.get_value('remote "origin"', "description", default="")
                    )
                    # "Unnamed repository;" is Git's placeholder text for a
                    # missing description, so it is treated as no description.
                    if (
                        description
                        and isinstance(description, str)
                        and description.strip()
                        and "Unnamed repository;" not in description
                    ):
                        return description.strip()
            except Exception as e:
                # Config read errors are non-fatal; fall through to README scan.
                self.logger.debug(f"Failed to read Git config: {e}")

            # Try to find description in README files
            readme_files = ["README.md", "README.txt", "README", "README.rst"]
            repo_root = repo.working_dir
            for readme_file in readme_files:
                readme_path = os.path.join(repo_root, readme_file)
                if os.path.exists(readme_path) and os.path.isfile(readme_path):
                    try:
                        # NOTE(review): assumes README is UTF-8; a decode error
                        # is caught below and the next candidate is tried.
                        with open(readme_path, encoding="utf-8") as f:
                            content = f.read()
                            # Accumulate blank-line-separated paragraphs.
                            paragraphs = []
                            current_paragraph = []
                            # in_title tracks "just after a heading": while set,
                            # badge/link-only lines are skipped.
                            in_title = True
                            for line in content.splitlines():
                                line = line.strip()
                                # Skip badges and links at the start
                                if in_title and (
                                    line.startswith("[![") or line.startswith("[")
                                ):
                                    continue
                                # Skip empty lines (they terminate a paragraph)
                                if not line:
                                    if current_paragraph:
                                        paragraphs.append(" ".join(current_paragraph))
                                        current_paragraph = []
                                    continue
                                # Skip titles (markdown '#' and setext '===')
                                if line.startswith("#") or line.startswith("==="):
                                    in_title = True
                                    continue
                                # Stop at common boilerplate sections; anything
                                # after them is unlikely to describe the project.
                                if line.lower() in [
                                    "## installation",
                                    "## usage",
                                    "## contributing",
                                    "## license",
                                ]:
                                    break
                                in_title = False
                                current_paragraph.append(line)

                            # Flush the trailing paragraph, if any.
                            if current_paragraph:
                                paragraphs.append(" ".join(current_paragraph))

                            # Find first meaningful paragraph
                            for paragraph in paragraphs:
                                if (
                                    len(paragraph) >= 50
                                ):  # Minimum length for a meaningful description
                                    # Clean up markdown links ([text](url) -> text)
                                    paragraph = re.sub(
                                        r"\[([^\]]+)\]\([^)]+\)", r"\1", paragraph
                                    )
                                    # Clean up HTML tags
                                    paragraph = re.sub(r"<[^>]+>", "", paragraph)
                                    # Limit length and break at sentence boundary
                                    if len(paragraph) > 200:
                                        sentences = re.split(
                                            r"(?<=[.!?])\s+", paragraph
                                        )
                                        description = ""
                                        for sentence in sentences:
                                            if len(description + sentence) > 200:
                                                break
                                            description += sentence + " "
                                        description = description.strip() + "..."
                                    else:
                                        description = paragraph
                                    return description
                    except Exception as e:
                        self.logger.debug(f"Failed to read README {readme_file}: {e}")
                        continue

        except Exception as e:
            # Description extraction is best-effort; never propagate.
            self.logger.debug(f"Failed to get repository description: {e}")

        return ""

434 

435 def _detect_encoding(self, content: str) -> str: 

436 """Detect file encoding.""" 

437 if not content: 

438 return "utf-8" 

439 

440 try: 

441 result = chardet.detect(content.encode()) 

442 if ( 

443 result["encoding"] 

444 and result["encoding"].lower() != "ascii" 

445 and result["confidence"] > 0.8 

446 ): 

447 return result["encoding"].lower() 

448 except Exception as e: 

449 self.logger.error({"event": "Failed to detect encoding", "error": str(e)}) 

450 

451 return "utf-8" 

452 

453 def _detect_language(self, file_path: str) -> str: 

454 """Detect programming language based on file extension.""" 

455 ext = os.path.splitext(file_path)[1].lower() 

456 language_map = { 

457 ".py": "Python", 

458 ".js": "JavaScript", 

459 ".ts": "TypeScript", 

460 ".java": "Java", 

461 ".cpp": "C++", 

462 ".c": "C", 

463 ".go": "Go", 

464 ".rs": "Rust", 

465 ".rb": "Ruby", 

466 ".php": "PHP", 

467 ".cs": "C#", 

468 ".scala": "Scala", 

469 ".kt": "Kotlin", 

470 ".swift": "Swift", 

471 ".m": "Objective-C", 

472 ".h": "C/C++ Header", 

473 ".sh": "Shell", 

474 ".bat": "Batch", 

475 ".ps1": "PowerShell", 

476 ".md": "Markdown", 

477 ".rst": "reStructuredText", 

478 ".txt": "Text", 

479 ".json": "JSON", 

480 ".xml": "XML", 

481 ".yaml": "YAML", 

482 ".yml": "YAML", 

483 ".toml": "TOML", 

484 ".ini": "INI", 

485 ".cfg": "Configuration", 

486 ".conf": "Configuration", 

487 } 

488 return language_map.get(ext, "Unknown") 

489 

490 def _has_code_blocks(self, content: str) -> bool: 

491 """Check if content contains code blocks.""" 

492 return bool(re.search(r"```[a-zA-Z]*\n[\s\S]*?\n```", content)) 

493 

494 def _has_images(self, content: str) -> bool: 

495 """Check if content contains image references.""" 

496 return bool(re.search(r"!\[.*?\]\(.*?\)", content)) 

497 

498 def _has_links(self, content: str) -> bool: 

499 """Check if content contains links.""" 

500 return bool(re.search(r"\[.*?\]\(.*?\)", content)) 

501 

502 def _get_heading_levels(self, content: str) -> list[int]: 

503 """Get list of heading levels in the content.""" 

504 headings = re.findall(r"^(#+)\s", content, re.MULTILINE) 

505 return [len(h) for h in headings]