Coverage for src/qdrant_loader/connectors/git/metadata_extractor.py: 54%

204 statements  

coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

import os
import re
from typing import Any

import chardet
import git

from qdrant_loader.connectors.git.config import GitRepoConfig
from qdrant_loader.utils.logging import LoggingConfig

logger = LoggingConfig.get_logger(__name__)


class GitMetadataExtractor:
    """Extract metadata from Git repository files."""

    def __init__(self, config: GitRepoConfig):
        """Initialize the Git metadata extractor.

        Args:
            config (GitRepoConfig): Configuration for the Git repository.
        """
        self.config = config
        self.logger = LoggingConfig.get_logger(__name__)

    def extract_all_metadata(self, file_path: str, content: str) -> dict[str, Any]:
        """Extract all metadata for a file.

        Args:
            file_path: Path to the file.
            content: Content of the file.

        Returns:
            dict[str, Any]: Dictionary containing all metadata.
        """
        self.logger.debug(f"Starting metadata extraction for file: {file_path!s}")

        file_metadata = self._extract_file_metadata(file_path, content)
        repo_metadata = self._extract_repo_metadata(file_path)
        git_metadata = self._extract_git_metadata(file_path)

        # Only extract structure metadata for markdown files
        structure_metadata = {}
        if file_path.lower().endswith(".md"):
            self.logger.debug(f"Processing markdown file: {file_path!s}")
            structure_metadata = self._extract_structure_metadata(content)

        metadata = {
            **file_metadata,
            **repo_metadata,
            **git_metadata,
            **structure_metadata,
        }

        self.logger.debug(f"Completed metadata extraction for {file_path!s}.")
        self.logger.debug(f"Metadata: {metadata!s}")
        return metadata
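
    # Illustrative only (not part of the covered module): for a markdown file the
    # merged result combines all four metadata groups, roughly shaped like the
    # hypothetical example below; exact keys and values depend on the repository.
    #
    #     extractor.extract_all_metadata("docs/guide.md", "# Guide\n...")
    #     # -> {
    #     #        "file_type": ".md", "file_name": "guide.md",
    #     #        "repository_name": "...", "repository_owner": "...",
    #     #        "last_commit_date": "...", "last_commit_author": "...",
    #     #        "has_toc": False, "heading_levels": [1], "sections_count": 1,
    #     #    }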

    def _extract_file_metadata(self, file_path: str, content: str) -> dict[str, Any]:
        """Extract metadata about the file itself."""
        # Get relative path from repository root
        rel_path = os.path.relpath(file_path, self.config.temp_dir)
        file_type = os.path.splitext(rel_path)[1]
        file_name = os.path.basename(rel_path)
        file_encoding = self._detect_encoding(content)
        line_count = len(content.splitlines())
        word_count = len(content.split())
        file_size = len(content.encode(file_encoding))

        return {
            "file_type": file_type,
            "file_name": file_name,
            "file_directory": os.path.dirname("/" + file_path),
            "file_encoding": file_encoding,
            "line_count": line_count,
            "word_count": word_count,
            "file_size": file_size,
            "has_code_blocks": self._has_code_blocks(content),
            "has_images": self._has_images(content),
            "has_links": self._has_links(content),
        }

    def _extract_repo_metadata(self, file_path: str) -> dict[str, Any]:
        """Extract repository metadata from the given file path.

        Args:
            file_path (str): Path to the file.

        Returns:
            dict[str, Any]: Dictionary containing repository metadata.
        """
        try:
            # Get repository URL from config
            repo_url = str(self.config.base_url)
            if not repo_url:
                return {}

            # Extract repository name and owner from normalized URL
            normalized_url = repo_url[:-4] if repo_url.endswith(".git") else repo_url
            repo_parts = normalized_url.split("/")
            if len(repo_parts) >= 2:
                repo_owner = repo_parts[-2]
                repo_name = repo_parts[-1]
            else:
                repo_owner = ""
                repo_name = normalized_url

            # Initialize metadata with default values
            metadata = {
                "repository_name": repo_name,
                "repository_owner": repo_owner,
                "repository_url": repo_url,
                "repository_description": "",
                "repository_language": "",
            }

            try:
                repo = git.Repo(self.config.temp_dir)
                if repo and not repo.bare:
                    config = repo.config_reader()
                    # Try to get description from github section first
                    if config.has_section("github"):
                        metadata["repository_description"] = str(
                            config.get_value("github", "description", "")
                        )
                        metadata["repository_language"] = str(
                            config.get_value("github", "language", "")
                        )
                    # Fall back to core section if needed
                    if not metadata["repository_description"] and config.has_section(
                        "core"
                    ):
                        metadata["repository_description"] = str(
                            config.get_value("core", "description", "")
                        )
                    self.logger.debug(f"Repository metadata extracted: {metadata!s}")
            except Exception as e:
                self.logger.error(f"Failed to read Git config: {e}")

            return metadata
        except Exception as e:
            self.logger.error(f"Failed to extract repository metadata: {str(e)!s}")
            return {}
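
    # Illustrative only: how the URL parsing above behaves for a typical HTTPS
    # remote (the URL is a made-up example).
    #
    #     "https://github.com/example-org/example-repo.git"
    #     # -> normalized to ".../example-org/example-repo"
    #     # -> repository_owner = "example-org", repository_name = "example-repo"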

    def _extract_git_metadata(self, file_path: str) -> dict[str, Any]:
        """Extract Git-specific metadata."""
        try:
            repo = git.Repo(self.config.temp_dir)
            metadata = {}

            try:
                # Get the relative path from the repository root
                rel_path = os.path.relpath(file_path, repo.working_dir)

                # Try to get commits for the file
                commits = list(repo.iter_commits(paths=rel_path, max_count=1))
                if commits:
                    last_commit = commits[0]
                    metadata.update(
                        {
                            "last_commit_date": last_commit.committed_datetime.isoformat(),
                            "last_commit_author": last_commit.author.name,
                            "last_commit_message": last_commit.message.strip(),
                        }
                    )
                else:
                    # If no commits found for the file, try getting the latest commit
                    commits = list(repo.iter_commits(max_count=1))
                    if commits:
                        last_commit = commits[0]
                        metadata.update(
                            {
                                "last_commit_date": last_commit.committed_datetime.isoformat(),
                                "last_commit_author": last_commit.author.name,
                                "last_commit_message": last_commit.message.strip(),
                            }
                        )
                    else:
                        # If still no commits found, use repository's HEAD commit
                        head_commit = repo.head.commit
                        metadata.update(
                            {
                                "last_commit_date": head_commit.committed_datetime.isoformat(),
                                "last_commit_author": head_commit.author.name,
                                "last_commit_message": head_commit.message.strip(),
                            }
                        )
            except Exception as e:
                self.logger.debug(f"Failed to get commits: {e}")
                # Try one last time with HEAD commit
                try:
                    head_commit = repo.head.commit
                    metadata.update(
                        {
                            "last_commit_date": head_commit.committed_datetime.isoformat(),
                            "last_commit_author": head_commit.author.name,
                            "last_commit_message": head_commit.message.strip(),
                        }
                    )
                except Exception as e:
                    self.logger.debug(f"Failed to get HEAD commit: {e}")

            return metadata
        except Exception as e:
            self.logger.warning(f"Failed to extract Git metadata: {str(e)!s}")
            return {}

    def _extract_structure_metadata(self, content: str) -> dict[str, Any]:
        """Extract metadata about the document structure."""
        self.logger.debug("Starting structure metadata extraction")
        self.logger.debug(f"Content to process:\n{content!s}")

        has_toc = False
        heading_levels = []
        sections_count = 0

        # Check if content is markdown by looking for markdown headers
        # Look for markdown headers that:
        # 1. Start with 1-6 # characters at the start of a line or after a newline
        # 2. Are followed by whitespace and text
        # 3. Continue until the next newline or end of content
        headings = re.findall(
            r"(?:^|\n)\s*(#{1,6})\s+(.+?)(?:\n|$)", content, re.MULTILINE
        )
        self.logger.debug(f"Found {len(headings)!s} headers in content")

        if headings:
            self.logger.debug(f"Headers found: {headings!s}")
            has_toc = "## Table of Contents" in content or "## Contents" in content
            heading_levels = [len(h[0]) for h in headings]
            sections_count = len(heading_levels)
            self.logger.debug(
                f"Has TOC: {has_toc!s}, Heading levels: {heading_levels!s}, Sections count: {sections_count!s}"
            )
        else:
            self.logger.warning("No headers found in content")
            # Log the first few lines of content for debugging
            first_lines = "\n".join(content.splitlines()[:5])
            self.logger.debug(f"First few lines of content:\n{first_lines!s}")
            # Try alternative header detection
            alt_headings = re.findall(r"^#{1,6}\s+.+$", content, re.MULTILINE)
            if alt_headings:
                self.logger.debug(
                    f"Found {len(alt_headings)!s} headers using alternative pattern"
                )
                self.logger.debug(f"Alternative headers found: {alt_headings!s}")
                has_toc = "## Table of Contents" in content or "## Contents" in content
                heading_levels = []
                for h in alt_headings:
                    match = re.match(r"^(#{1,6})", h)
                    if match:
                        heading_levels.append(len(match.group(1)))
                sections_count = len(heading_levels)
                self.logger.debug(
                    f"Has TOC: {has_toc!s}, Heading levels: {heading_levels!s}, Sections count: {sections_count!s}"
                )

        metadata = {
            "has_toc": has_toc,
            "heading_levels": heading_levels,
            "sections_count": sections_count,
        }

        self.logger.debug(f"Structure metadata extraction completed: {metadata!s}")
        return metadata
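
    # Illustrative only: for a small markdown document the primary pattern above
    # yields one (hashes, title) tuple per heading (sample input made up):
    #
    #     content = "# Title\n\n## Contents\n\n## Usage\n"
    #     # headings -> [("#", "Title"), ("##", "Contents"), ("##", "Usage")]
    #     # has_toc -> True, heading_levels -> [1, 2, 2], sections_count -> 3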

    def _get_repo_description(self, repo: git.Repo, file_path: str) -> str:
        """Get repository description from Git config or README."""
        try:
            # Try to get description from Git config
            config = repo.config_reader()
            try:
                if config.has_section('remote "origin"'):
                    description = str(
                        config.get_value('remote "origin"', "description", default="")
                    )
                    if (
                        description
                        and isinstance(description, str)
                        and description.strip()
                        and "Unnamed repository;" not in description
                    ):
                        return description.strip()
            except Exception as e:
                self.logger.debug(f"Failed to read Git config: {e}")

            # Try to find description in README files
            readme_files = ["README.md", "README.txt", "README", "README.rst"]
            repo_root = repo.working_dir
            for readme_file in readme_files:
                readme_path = os.path.join(repo_root, readme_file)
                if os.path.exists(readme_path) and os.path.isfile(readme_path):
                    try:
                        with open(readme_path, encoding="utf-8") as f:
                            content = f.read()
                            paragraphs = []
                            current_paragraph = []
                            in_title = True
                            for line in content.splitlines():
                                line = line.strip()
                                # Skip badges and links at the start
                                if in_title and (
                                    line.startswith("[![") or line.startswith("[")
                                ):
                                    continue
                                # Skip empty lines
                                if not line:
                                    if current_paragraph:
                                        paragraphs.append(" ".join(current_paragraph))
                                        current_paragraph = []
                                    continue
                                # Skip titles
                                if line.startswith("#") or line.startswith("==="):
                                    in_title = True
                                    continue
                                # Skip common sections
                                if line.lower() in [
                                    "## installation",
                                    "## usage",
                                    "## contributing",
                                    "## license",
                                ]:
                                    break
                                in_title = False
                                current_paragraph.append(line)

                            if current_paragraph:
                                paragraphs.append(" ".join(current_paragraph))

                            # Find first meaningful paragraph
                            for paragraph in paragraphs:
                                if (
                                    len(paragraph) >= 50
                                ):  # Minimum length for a meaningful description
                                    # Clean up markdown links
                                    paragraph = re.sub(
                                        r"\[([^\]]+)\]\([^)]+\)", r"\1", paragraph
                                    )
                                    # Clean up HTML tags
                                    paragraph = re.sub(r"<[^>]+>", "", paragraph)
                                    # Limit length and break at sentence boundary
                                    if len(paragraph) > 200:
                                        sentences = re.split(
                                            r"(?<=[.!?])\s+", paragraph
                                        )
                                        description = ""
                                        for sentence in sentences:
                                            if len(description + sentence) > 200:
                                                break
                                            description += sentence + " "
                                        description = description.strip() + "..."
                                    else:
                                        description = paragraph
                                    return description
                    except Exception as e:
                        self.logger.debug(f"Failed to read README {readme_file}: {e}")
                        continue

        except Exception as e:
            self.logger.debug(f"Failed to get repository description: {e}")

        return "No description available"

    def _detect_encoding(self, content: str) -> str:
        """Detect file encoding."""
        if not content:
            return "utf-8"

        try:
            result = chardet.detect(content.encode())
            if (
                result["encoding"]
                and result["encoding"].lower() != "ascii"
                and result["confidence"] > 0.8
            ):
                return result["encoding"].lower()
        except Exception as e:
            self.logger.error({"event": "Failed to detect encoding", "error": str(e)})

        return "utf-8"
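
    # Illustrative only: chardet.detect() returns a dict such as
    # {"encoding": "utf-8", "confidence": 0.99, "language": ""}; the guard above
    # falls back to "utf-8" for ASCII-only or low-confidence results.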

    def _detect_language(self, file_path: str) -> str:
        """Detect programming language based on file extension."""
        ext = os.path.splitext(file_path)[1].lower()
        language_map = {
            ".py": "Python",
            ".js": "JavaScript",
            ".ts": "TypeScript",
            ".java": "Java",
            ".cpp": "C++",
            ".c": "C",
            ".go": "Go",
            ".rs": "Rust",
            ".rb": "Ruby",
            ".php": "PHP",
            ".cs": "C#",
            ".scala": "Scala",
            ".kt": "Kotlin",
            ".swift": "Swift",
            ".m": "Objective-C",
            ".h": "C/C++ Header",
            ".sh": "Shell",
            ".bat": "Batch",
            ".ps1": "PowerShell",
            ".md": "Markdown",
            ".rst": "reStructuredText",
            ".txt": "Text",
            ".json": "JSON",
            ".xml": "XML",
            ".yaml": "YAML",
            ".yml": "YAML",
            ".toml": "TOML",
            ".ini": "INI",
            ".cfg": "Configuration",
            ".conf": "Configuration",
        }
        return language_map.get(ext, "Unknown")

    def _has_code_blocks(self, content: str) -> bool:
        """Check if content contains code blocks."""
        return bool(re.search(r"```[a-zA-Z]*\n[\s\S]*?\n```", content))

    def _has_images(self, content: str) -> bool:
        """Check if content contains image references."""
        return bool(re.search(r"!\[.*?\]\(.*?\)", content))

    def _has_links(self, content: str) -> bool:
        """Check if content contains links."""
        return bool(re.search(r"\[.*?\]\(.*?\)", content))

    def _get_heading_levels(self, content: str) -> list[int]:
        """Get list of heading levels in the content."""
        headings = re.findall(r"^(#+)\s", content, re.MULTILINE)
        return [len(h) for h in headings]
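

A minimal usage sketch (not part of the covered module): it assumes a GitRepoConfig
whose temp_dir points at an already-cloned working copy and whose base_url holds the
remote URL, since those are the two attributes the extractor reads above; the exact
constructor signature of GitRepoConfig and the local paths are assumptions here.

from qdrant_loader.connectors.git.config import GitRepoConfig
from qdrant_loader.connectors.git.metadata_extractor import GitMetadataExtractor

# Hypothetical configuration: field names mirror the attributes used above
# (base_url, temp_dir); the real GitRepoConfig may require additional fields.
config = GitRepoConfig(
    base_url="https://github.com/example-org/example-repo.git",
    temp_dir="/tmp/example-repo",
)

extractor = GitMetadataExtractor(config)
with open("/tmp/example-repo/README.md", encoding="utf-8") as f:
    metadata = extractor.extract_all_metadata("/tmp/example-repo/README.md", f.read())

print(metadata["file_name"], metadata.get("last_commit_author"))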