Coverage for src/qdrant_loader/connectors/git/metadata_extractor.py: 78%

227 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1import os 

2import re 

3from typing import Any 

4from urllib.parse import urlparse 

5 

6import chardet 

7import git 

8 

9from qdrant_loader.connectors.git.config import GitRepoConfig 

10from qdrant_loader.utils.logging import LoggingConfig 

11 

12logger = LoggingConfig.get_logger(__name__) 

13 

14 

15class GitMetadataExtractor: 

16 """Extract metadata from Git repository files.""" 

17 

def __init__(self, config: GitRepoConfig):
    """Set up the extractor for a single Git repository.

    Args:
        config (GitRepoConfig): Configuration for the Git repository.
            Other methods of this class read ``temp_dir`` and ``base_url``
            from it.
    """
    # Every instance shares the module-level logger.
    self.logger = logger
    self.config = config

26 

def extract_all_metadata(self, file_path: str, content: str) -> dict[str, Any]:
    """Collect every metadata category for one file into a single dict.

    File, repository and Git metadata are always gathered. Document
    structure metadata is added only when the path ends in ``.md``
    (case-insensitive). If two categories produce the same key, the later
    one wins, in the order file -> repository -> git -> structure.

    Args:
        file_path: Path to the file.
        content: Content of the file.

    Returns:
        dict[str, Any]: Dictionary containing all metadata.
    """
    self.logger.debug(f"Starting metadata extraction for file: {file_path!s}")

    # Merge category by category; later updates override earlier keys.
    metadata: dict[str, Any] = {}
    metadata.update(self._extract_file_metadata(file_path, content))
    metadata.update(self._extract_repo_metadata(file_path))
    metadata.update(self._extract_git_metadata(file_path))

    # Structure metadata only makes sense for markdown documents.
    is_markdown = file_path.lower().endswith(".md")
    if is_markdown:
        self.logger.debug(f"Processing markdown file: {file_path!s}")
        metadata.update(self._extract_structure_metadata(content))

    self.logger.debug(f"Completed metadata extraction for {file_path!s}.")
    self.logger.debug(f"Metadata: {metadata!s}")
    return metadata

59 

60 def _extract_file_metadata(self, file_path: str, content: str) -> dict[str, Any]: 

61 """Extract metadata about the file itself.""" 

62 # Get relative path from repository root 

63 rel_path = os.path.relpath(file_path, self.config.temp_dir) 

64 file_type = os.path.splitext(rel_path)[1] 

65 file_name = os.path.basename(rel_path) 

66 file_encoding = self._detect_encoding(content) 

67 # Count lines using splitlines(), but handle special case for whitespace-only content 

68 if not content: 

69 line_count = 0 

70 elif content.strip() == "" and "\n" in content: 

71 # Special case: whitespace-only content with newlines 

72 # Count newlines + 1 to include all whitespace segments 

73 line_count = content.count("\n") + 1 

74 else: 

75 # Normal content: use splitlines() which handles trailing newlines correctly 

76 line_count = len(content.splitlines()) 

77 word_count = len(content.split()) 

78 file_size = len(content.encode(file_encoding)) 

79 

80 return { 

81 "file_type": file_type, 

82 "file_name": file_name, 

83 "file_directory": os.path.dirname("/" + file_path), 

84 "file_encoding": file_encoding, 

85 "line_count": line_count, 

86 "word_count": word_count, 

87 "file_size": file_size, 

88 "has_code_blocks": self._has_code_blocks(content), 

89 "has_images": self._has_images(content), 

90 "has_links": self._has_links(content), 

91 } 

92 

def _extract_repo_metadata(self, file_path: str) -> dict[str, Any]:
    """Extract repository metadata from the configured repository URL.

    The owner and name are parsed from ``self.config.base_url``. The
    description and language are read from the Git config of the checkout
    at ``self.config.temp_dir`` when the corresponding sections exist.

    Args:
        file_path (str): Path to the file. Not used by this method.

    Returns:
        dict[str, Any]: Dictionary containing repository metadata, or an
        empty dict when the URL cannot be parsed, the checkout is not a
        valid Git repository, or an unexpected error occurs.
    """
    try:
        repo_url = str(self.config.base_url)
        if not repo_url:
            return {}

        # Normalize before splitting: a trailing "/" would otherwise yield
        # an empty repository name and shift the owner by one segment.
        normalized_url = repo_url.rstrip("/")
        if normalized_url.endswith(".git"):
            normalized_url = normalized_url[:-4].rstrip("/")
        repo_parts = normalized_url.split("/")

        # Let urlparse isolate the host instead of substring matching.
        hostname = urlparse(repo_url).hostname

        if hostname == "dev.azure.com":
            # Azure DevOps format: https://dev.azure.com/org/project/_git/repo
            if len(repo_parts) < 5 or "_git" not in repo_parts:
                return {}
            git_index = repo_parts.index("_git")
            # Both neighbours must exist: the org two segments before
            # "_git" and the repository name right after it.
            if git_index < 2 or git_index + 1 >= len(repo_parts):
                return {}
            repo_owner = repo_parts[git_index - 2]  # org
            repo_name = repo_parts[git_index + 1]  # repo
        else:
            # github.com, gitlab.com, GitHub Enterprise subdomains and
            # self-hosted platforms all use .../owner/repo.
            if len(repo_parts) < 2:
                return {}
            repo_owner, repo_name = repo_parts[-2], repo_parts[-1]

        # Defaults; description and language are filled in below if the
        # Git config provides them.
        metadata = {
            "repository_name": repo_name,
            "repository_owner": repo_owner,
            "repository_url": repo_url,
            "repository_description": "",
            "repository_language": "",
        }

        try:
            repo = git.Repo(self.config.temp_dir)
            try:
                if repo and not repo.bare:
                    config = repo.config_reader()
                    # Prefer the "github" section.
                    if config.has_section("github"):
                        metadata["repository_description"] = str(
                            config.get_value("github", "description", "")
                        )
                        metadata["repository_language"] = str(
                            config.get_value("github", "language", "")
                        )
                    # Fall back to the "core" section for the description.
                    if not metadata["repository_description"] and config.has_section(
                        "core"
                    ):
                        metadata["repository_description"] = str(
                            config.get_value("core", "description", "")
                        )
                    self.logger.debug(f"Repository metadata extracted: {metadata!s}")
            finally:
                # Release the resources GitPython holds for this Repo.
                if repo:
                    repo.close()
        except git.InvalidGitRepositoryError:
            # Not a valid Git repository: no metadata can be extracted.
            self.logger.error("Invalid Git repository directory")
            return {}
        except Exception as e:
            # Best effort: keep the URL-derived metadata.
            self.logger.error(f"Failed to read Git config: {e}")

        return metadata
    except Exception as e:
        self.logger.error(f"Failed to extract repository metadata: {e!s}")
        return {}

186 

def _extract_git_metadata(self, file_path: str) -> dict[str, Any]:
    """Extract Git-specific metadata for a file.

    Looks up the most recent commit touching the file. If none is found,
    the latest commit of the repository is used, and finally the HEAD
    commit.

    Args:
        file_path: Path to the file; it is made relative to the
            repository working directory before querying history.

    Returns:
        dict[str, Any]: ``last_commit_date`` (ISO 8601 string),
        ``last_commit_author`` and ``last_commit_message`` (first line of
        the commit message). The dict is empty when no commit can be read
        or the repository cannot be opened.
    """

    def describe(commit) -> dict[str, Any]:
        """Return the last-commit fields for *commit*."""
        return {
            "last_commit_date": commit.committed_datetime.isoformat(),
            "last_commit_author": commit.author.name,
            "last_commit_message": commit.message.strip().split("\n")[0],
        }

    try:
        repo = git.Repo(self.config.temp_dir)
        try:
            metadata = {}
            try:
                # History is queried with a path relative to the repo root.
                rel_path = os.path.relpath(file_path, repo.working_dir)

                commits = list(repo.iter_commits(paths=rel_path, max_count=1))
                if not commits:
                    # No commit touches the file: use the latest commit.
                    commits = list(repo.iter_commits(max_count=1))
                commit = commits[0] if commits else repo.head.commit
                metadata.update(describe(commit))
            except Exception as e:
                self.logger.debug(f"Failed to get commits: {e}")
                # Try one last time with the HEAD commit.
                try:
                    metadata.update(describe(repo.head.commit))
                except Exception as head_error:
                    self.logger.debug(f"Failed to get HEAD commit: {head_error}")

            return metadata
        finally:
            # Release the resources GitPython holds for this Repo.
            repo.close()
    except Exception as e:
        self.logger.warning(f"Failed to extract Git metadata: {e!s}")
        return {}

261 

262 def _extract_structure_metadata(self, content: str) -> dict[str, Any]: 

263 """Extract metadata about the document structure.""" 

264 self.logger.debug("Starting structure metadata extraction") 

265 self.logger.debug(f"Content to process:\n{content!s}") 

266 

267 has_toc = False 

268 heading_levels = [] 

269 sections_count = 0 

270 

271 # Check if content is markdown by looking for markdown headers 

272 # Look for markdown headers that: 

273 # 1. Start with 1-6 # characters at the start of a line or after a newline 

274 # 2. Are followed by whitespace and text 

275 # 3. Continue until the next newline or end of content 

276 headings = re.findall(r"^[ \t]*(#{1,6})[ \t]+(.+?)$", content, re.MULTILINE) 

277 self.logger.debug(f"Found {len(headings)!s} headers in content") 

278 

279 if headings: 

280 self.logger.debug(f"Headers found: {headings!s}") 

281 # Check for various TOC formats with different heading levels 

282 toc_patterns = [ 

283 r"#+\s*Table\s+of\s+Contents", 

284 r"#+\s*Contents", 

285 r"#+\s*TOC", 

286 ] 

287 has_toc = any( 

288 re.search(pattern, content, re.IGNORECASE) for pattern in toc_patterns 

289 ) 

290 heading_levels = [len(h[0]) for h in headings] 

291 sections_count = len(heading_levels) 

292 self.logger.debug( 

293 f"Has TOC: {has_toc!s}, Heading levels: {heading_levels!s}, Sections count: {sections_count!s}" 

294 ) 

295 else: 

296 self.logger.warning("No headers found in content") 

297 # Log the first few lines of content for debugging 

298 first_lines = "\n".join(content.splitlines()[:5]) 

299 self.logger.debug(f"First few lines of content:\n{first_lines!s}") 

300 # Try alternative header detection 

301 alt_headings = re.findall(r"^#{1,6}\s+.+$", content, re.MULTILINE) 

302 if alt_headings: 

303 self.logger.debug( 

304 f"Found {len(alt_headings)!s} headers using alternative pattern" 

305 ) 

306 self.logger.debug(f"Alternative headers found: {alt_headings!s}") 

307 has_toc = "## Table of Contents" in content or "## Contents" in content 

308 heading_levels = [] 

309 for h in alt_headings: 

310 match = re.match(r"^(#{1,6})", h) 

311 if match: 

312 heading_levels.append(len(match.group(1))) 

313 sections_count = len(heading_levels) 

314 self.logger.debug( 

315 f"Has TOC: {has_toc!s}, Heading levels: {heading_levels!s}, Sections count: {sections_count!s}" 

316 ) 

317 

318 metadata = { 

319 "has_toc": has_toc, 

320 "heading_levels": heading_levels, 

321 "sections_count": sections_count, 

322 } 

323 

324 self.logger.debug(f"Structure metadata extraction completed: {metadata!s}") 

325 return metadata 

326 

327 def _get_repo_description(self, repo: git.Repo, file_path: str) -> str: 

328 """Get repository description from Git config or README.""" 

329 try: 

330 # Try to get description from Git config 

331 config = repo.config_reader() 

332 try: 

333 if config.has_section('remote "origin"'): 

334 description = str( 

335 config.get_value('remote "origin"', "description", default="") 

336 ) 

337 if ( 

338 description 

339 and isinstance(description, str) 

340 and description.strip() 

341 and "Unnamed repository;" not in description 

342 ): 

343 return description.strip() 

344 except Exception as e: 

345 self.logger.debug(f"Failed to read Git config: {e}") 

346 

347 # Try to find description in README files 

348 readme_files = ["README.md", "README.txt", "README", "README.rst"] 

349 repo_root = repo.working_dir 

350 for readme_file in readme_files: 

351 readme_path = os.path.join(repo_root, readme_file) 

352 if os.path.exists(readme_path) and os.path.isfile(readme_path): 

353 try: 

354 with open(readme_path, encoding="utf-8") as f: 

355 content = f.read() 

356 paragraphs = [] 

357 current_paragraph = [] 

358 in_title = True 

359 for line in content.splitlines(): 

360 line = line.strip() 

361 # Skip badges and links at the start 

362 if in_title and ( 

363 line.startswith("[![") or line.startswith("[") 

364 ): 

365 continue 

366 # Skip empty lines 

367 if not line: 

368 if current_paragraph: 

369 paragraphs.append(" ".join(current_paragraph)) 

370 current_paragraph = [] 

371 continue 

372 # Skip titles 

373 if line.startswith("#") or line.startswith("==="): 

374 in_title = True 

375 continue 

376 # Skip common sections 

377 if line.lower() in [ 

378 "## installation", 

379 "## usage", 

380 "## contributing", 

381 "## license", 

382 ]: 

383 break 

384 in_title = False 

385 current_paragraph.append(line) 

386 

387 if current_paragraph: 

388 paragraphs.append(" ".join(current_paragraph)) 

389 

390 # Find first meaningful paragraph 

391 for paragraph in paragraphs: 

392 if ( 

393 len(paragraph) >= 50 

394 ): # Minimum length for a meaningful description 

395 # Clean up markdown links 

396 paragraph = re.sub( 

397 r"\[([^\]]+)\]\([^)]+\)", r"\1", paragraph 

398 ) 

399 # Clean up HTML tags 

400 paragraph = re.sub(r"<[^>]+>", "", paragraph) 

401 # Limit length and break at sentence boundary 

402 if len(paragraph) > 200: 

403 sentences = re.split( 

404 r"(?<=[.!?])\s+", paragraph 

405 ) 

406 description = "" 

407 for sentence in sentences: 

408 if len(description + sentence) > 200: 

409 break 

410 description += sentence + " " 

411 description = description.strip() + "..." 

412 else: 

413 description = paragraph 

414 return description 

415 except Exception as e: 

416 self.logger.debug(f"Failed to read README {readme_file}: {e}") 

417 continue 

418 

419 except Exception as e: 

420 self.logger.debug(f"Failed to get repository description: {e}") 

421 

422 return "" 

423 

424 def _detect_encoding(self, content: str) -> str: 

425 """Detect file encoding.""" 

426 if not content: 

427 return "utf-8" 

428 

429 try: 

430 result = chardet.detect(content.encode()) 

431 if ( 

432 result["encoding"] 

433 and result["encoding"].lower() != "ascii" 

434 and result["confidence"] > 0.8 

435 ): 

436 return result["encoding"].lower() 

437 except Exception as e: 

438 self.logger.error({"event": "Failed to detect encoding", "error": str(e)}) 

439 

440 return "utf-8" 

441 

442 def _detect_language(self, file_path: str) -> str: 

443 """Detect programming language based on file extension.""" 

444 ext = os.path.splitext(file_path)[1].lower() 

445 language_map = { 

446 ".py": "Python", 

447 ".js": "JavaScript", 

448 ".ts": "TypeScript", 

449 ".java": "Java", 

450 ".cpp": "C++", 

451 ".c": "C", 

452 ".go": "Go", 

453 ".rs": "Rust", 

454 ".rb": "Ruby", 

455 ".php": "PHP", 

456 ".cs": "C#", 

457 ".scala": "Scala", 

458 ".kt": "Kotlin", 

459 ".swift": "Swift", 

460 ".m": "Objective-C", 

461 ".h": "C/C++ Header", 

462 ".sh": "Shell", 

463 ".bat": "Batch", 

464 ".ps1": "PowerShell", 

465 ".md": "Markdown", 

466 ".rst": "reStructuredText", 

467 ".txt": "Text", 

468 ".json": "JSON", 

469 ".xml": "XML", 

470 ".yaml": "YAML", 

471 ".yml": "YAML", 

472 ".toml": "TOML", 

473 ".ini": "INI", 

474 ".cfg": "Configuration", 

475 ".conf": "Configuration", 

476 } 

477 return language_map.get(ext, "Unknown") 

478 

479 def _has_code_blocks(self, content: str) -> bool: 

480 """Check if content contains code blocks.""" 

481 return bool(re.search(r"```[a-zA-Z]*\n[\s\S]*?\n```", content)) 

482 

483 def _has_images(self, content: str) -> bool: 

484 """Check if content contains image references.""" 

485 return bool(re.search(r"!\[.*?\]\(.*?\)", content)) 

486 

487 def _has_links(self, content: str) -> bool: 

488 """Check if content contains links.""" 

489 return bool(re.search(r"\[.*?\]\(.*?\)", content)) 

490 

491 def _get_heading_levels(self, content: str) -> list[int]: 

492 """Get list of heading levels in the content.""" 

493 headings = re.findall(r"^(#+)\s", content, re.MULTILINE) 

494 return [len(h) for h in headings]