Coverage for src / qdrant_loader / connectors / git / metadata_extractor.py: 77%
230 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-12 09:46 +0000
1import os
2import re
3from typing import Any
4from urllib.parse import urlparse
6import chardet
7import git
9from qdrant_loader.connectors.git.config import GitRepoConfig
10from qdrant_loader.utils.logging import LoggingConfig
12logger = LoggingConfig.get_logger(__name__)
15class GitMetadataExtractor:
16 """Extract metadata from Git repository files."""
18 def __init__(self, config: GitRepoConfig):
19 """Initialize the Git metadata extractor.
21 Args:
22 config (GitRepoConfig): Configuration for the Git repository.
23 """
24 self.config = config
25 self.logger = logger
27 def extract_all_metadata(self, file_path: str, content: str) -> dict[str, Any]:
28 """Extract all metadata for a file.
30 Args:
31 file_path: Path to the file.
32 content: Content of the file.
34 Returns:
35 dict[str, Any]: Dictionary containing all metadata.
36 """
37 self.logger.debug(f"Starting metadata extraction for file: {file_path!s}")
39 file_metadata = self._extract_file_metadata(file_path, content)
40 repo_metadata = self._extract_repo_metadata(file_path)
41 git_metadata = self._extract_git_metadata(file_path)
43 # Only extract structure metadata for markdown files
44 structure_metadata = {}
45 if file_path.lower().endswith(".md"):
46 self.logger.debug(f"Processing markdown file: {file_path!s}")
47 structure_metadata = self._extract_structure_metadata(content)
49 metadata = {
50 **file_metadata,
51 **repo_metadata,
52 **git_metadata,
53 **structure_metadata,
54 }
56 self.logger.debug(f"Completed metadata extraction for {file_path!s}.")
57 self.logger.debug(f"Metadata: {metadata!s}")
58 return metadata
60 def _extract_file_metadata(self, file_path: str, content: str) -> dict[str, Any]:
61 """Extract metadata about the file itself."""
62 # Get relative path from repository root
63 # Handle cross-drive paths on Windows (ValueError when paths are on different drives)
64 try:
65 rel_path = os.path.relpath(file_path, self.config.temp_dir)
66 except ValueError:
67 raise ValueError(
68 f"Cannot compute relative path for {file_path} from {self.config.temp_dir}. "
69 "Files on different drives should be filtered during file processing."
70 )
71 file_type = os.path.splitext(rel_path)[1]
72 file_name = os.path.basename(rel_path)
73 file_encoding = self._detect_encoding(content)
74 # Count lines using splitlines(), but handle special case for whitespace-only content
75 if not content:
76 line_count = 0
77 elif content.strip() == "" and "\n" in content:
78 # Special case: whitespace-only content with newlines
79 # Count newlines + 1 to include all whitespace segments
80 line_count = content.count("\n") + 1
81 else:
82 # Normal content: use splitlines() which handles trailing newlines correctly
83 line_count = len(content.splitlines())
84 word_count = len(content.split())
85 file_size = len(content.encode(file_encoding))
87 return {
88 "file_type": file_type,
89 "file_name": file_name,
90 "file_directory": (
91 os.path.dirname(rel_path)
92 if not os.path.isabs(rel_path)
93 else os.path.dirname(file_path)
94 ),
95 "file_encoding": file_encoding,
96 "line_count": line_count,
97 "word_count": word_count,
98 "file_size": file_size,
99 "has_code_blocks": self._has_code_blocks(content),
100 "has_images": self._has_images(content),
101 "has_links": self._has_links(content),
102 }
104 def _extract_repo_metadata(self, file_path: str) -> dict[str, Any]:
105 """Extract repository metadata from the given file path.
107 Args:
108 file_path (str): Path to the file.
110 Returns:
111 dict[str, Any]: Dictionary containing repository metadata.
112 """
113 try:
114 # Get repository URL from config
115 repo_url = str(self.config.base_url)
116 if not repo_url:
117 return {}
119 # Extract repository name and owner from normalized URL
120 normalized_url = repo_url[:-4] if repo_url.endswith(".git") else repo_url
121 repo_parts = normalized_url.split("/")
123 # Handle different Git hosting platforms using secure URL parsing
124 parsed_url = urlparse(repo_url)
125 hostname = parsed_url.hostname
127 if hostname == "dev.azure.com":
128 # Azure DevOps format: https://dev.azure.com/org/project/_git/repo
129 if len(repo_parts) >= 5 and "_git" in repo_parts:
130 git_index = repo_parts.index("_git")
131 if git_index >= 1:
132 repo_owner = repo_parts[git_index - 2] # org
133 repo_name = repo_parts[git_index + 1] # repo
134 else:
135 return {}
136 else:
137 return {}
138 elif hostname in ["github.com", "gitlab.com"] or (
139 hostname and hostname.endswith(".github.com")
140 ):
141 # Standard format: github.com/owner/repo or gitlab.com/owner/repo
142 # Also handle GitHub Enterprise subdomains
143 if len(repo_parts) >= 2:
144 repo_owner = repo_parts[-2]
145 repo_name = repo_parts[-1]
146 else:
147 return {}
148 else:
149 # Handle other Git hosting platforms (GitLab self-hosted, etc.)
150 if len(repo_parts) >= 2:
151 repo_owner = repo_parts[-2]
152 repo_name = repo_parts[-1]
153 else:
154 # Invalid URL format
155 return {}
157 # Initialize metadata with default values
158 metadata = {
159 "repository_name": repo_name,
160 "repository_owner": repo_owner,
161 "repository_url": repo_url,
162 "repository_description": "",
163 "repository_language": "",
164 }
166 try:
167 repo = git.Repo(self.config.temp_dir)
168 if repo and not repo.bare:
169 config = repo.config_reader()
170 # Try to get description from github section first
171 if config.has_section("github"):
172 metadata["repository_description"] = str(
173 config.get_value("github", "description", "")
174 )
175 metadata["repository_language"] = str(
176 config.get_value("github", "language", "")
177 )
178 # Fall back to core section if needed
179 if not metadata["repository_description"] and config.has_section(
180 "core"
181 ):
182 metadata["repository_description"] = str(
183 config.get_value("core", "description", "")
184 )
185 self.logger.debug(f"Repository metadata extracted: {metadata!s}")
186 except git.InvalidGitRepositoryError:
187 # If the directory is not a valid Git repository, we can't extract any metadata
188 self.logger.error("Invalid Git repository directory")
189 return {}
190 except Exception as e:
191 self.logger.error(f"Failed to read Git config: {e}")
193 return metadata
194 except Exception as e:
195 self.logger.error(f"Failed to extract repository metadata: {str(e)!s}")
196 return {}
198 def _extract_git_metadata(self, file_path: str) -> dict[str, Any]:
199 """Extract Git-specific metadata."""
200 try:
201 repo = git.Repo(self.config.temp_dir)
202 metadata = {}
204 try:
205 # Get the relative path from the repository root
206 rel_path = os.path.relpath(file_path, repo.working_dir)
208 # Try to get commits for the file
209 commits = list(repo.iter_commits(paths=rel_path, max_count=1))
210 if commits:
211 last_commit = commits[0]
212 metadata.update(
213 {
214 "last_commit_date": last_commit.committed_datetime.isoformat(),
215 "last_commit_author": last_commit.author.name,
216 "last_commit_message": last_commit.message.strip().split(
217 "\n"
218 )[0],
219 }
220 )
221 else:
222 # If no commits found for the file, try getting the latest commit
223 commits = list(repo.iter_commits(max_count=1))
224 if commits:
225 last_commit = commits[0]
226 metadata.update(
227 {
228 "last_commit_date": last_commit.committed_datetime.isoformat(),
229 "last_commit_author": last_commit.author.name,
230 "last_commit_message": last_commit.message.strip().split(
231 "\n"
232 )[
233 0
234 ],
235 }
236 )
237 else:
238 # If still no commits found, use repository's HEAD commit
239 head_commit = repo.head.commit
240 metadata.update(
241 {
242 "last_commit_date": head_commit.committed_datetime.isoformat(),
243 "last_commit_author": head_commit.author.name,
244 "last_commit_message": head_commit.message.strip().split(
245 "\n"
246 )[
247 0
248 ],
249 }
250 )
251 except Exception as e:
252 self.logger.debug(f"Failed to get commits: {e}")
253 # Try one last time with HEAD commit
254 try:
255 head_commit = repo.head.commit
256 metadata.update(
257 {
258 "last_commit_date": head_commit.committed_datetime.isoformat(),
259 "last_commit_author": head_commit.author.name,
260 "last_commit_message": head_commit.message.strip().split(
261 "\n"
262 )[0],
263 }
264 )
265 except Exception as e:
266 self.logger.debug(f"Failed to get HEAD commit: {e}")
268 return metadata
269 except Exception as e:
270 self.logger.warning(f"Failed to extract Git metadata: {str(e)!s}")
271 return {}
273 def _extract_structure_metadata(self, content: str) -> dict[str, Any]:
274 """Extract metadata about the document structure."""
275 self.logger.debug("Starting structure metadata extraction")
276 self.logger.debug(f"Content to process:\n{content!s}")
278 has_toc = False
279 heading_levels = []
280 sections_count = 0
282 # Check if content is markdown by looking for markdown headers
283 # Look for markdown headers that:
284 # 1. Start with 1-6 # characters at the start of a line or after a newline
285 # 2. Are followed by whitespace and text
286 # 3. Continue until the next newline or end of content
287 headings = re.findall(r"^[ \t]*(#{1,6})[ \t]+(.+?)$", content, re.MULTILINE)
288 self.logger.debug(f"Found {len(headings)!s} headers in content")
290 if headings:
291 self.logger.debug(f"Headers found: {headings!s}")
292 # Check for various TOC formats with different heading levels
293 toc_patterns = [
294 r"#+\s*Table\s+of\s+Contents",
295 r"#+\s*Contents",
296 r"#+\s*TOC",
297 ]
298 has_toc = any(
299 re.search(pattern, content, re.IGNORECASE) for pattern in toc_patterns
300 )
301 heading_levels = [len(h[0]) for h in headings]
302 sections_count = len(heading_levels)
303 self.logger.debug(
304 f"Has TOC: {has_toc!s}, Heading levels: {heading_levels!s}, Sections count: {sections_count!s}"
305 )
306 else:
307 self.logger.warning("No headers found in content")
308 # Log the first few lines of content for debugging
309 first_lines = "\n".join(content.splitlines()[:5])
310 self.logger.debug(f"First few lines of content:\n{first_lines!s}")
311 # Try alternative header detection
312 alt_headings = re.findall(r"^#{1,6}\s+.+$", content, re.MULTILINE)
313 if alt_headings:
314 self.logger.debug(
315 f"Found {len(alt_headings)!s} headers using alternative pattern"
316 )
317 self.logger.debug(f"Alternative headers found: {alt_headings!s}")
318 has_toc = "## Table of Contents" in content or "## Contents" in content
319 heading_levels = []
320 for h in alt_headings:
321 match = re.match(r"^(#{1,6})", h)
322 if match:
323 heading_levels.append(len(match.group(1)))
324 sections_count = len(heading_levels)
325 self.logger.debug(
326 f"Has TOC: {has_toc!s}, Heading levels: {heading_levels!s}, Sections count: {sections_count!s}"
327 )
329 metadata = {
330 "has_toc": has_toc,
331 "heading_levels": heading_levels,
332 "sections_count": sections_count,
333 }
335 self.logger.debug(f"Structure metadata extraction completed: {metadata!s}")
336 return metadata
    def _get_repo_description(self, repo: git.Repo, file_path: str) -> str:
        """Get repository description from Git config or README.

        Tries, in order:
          1. The ``description`` value of the ``remote "origin"`` section of
             the repository's Git config (ignoring Git's default
             "Unnamed repository;" placeholder).
          2. The first "meaningful" paragraph (>= 50 chars, badges and
             headings skipped) of the first README file found, truncated near
             200 characters at a sentence boundary.

        Args:
            repo: Open GitPython repository handle.
            file_path: Unused in this method; kept for call-site compatibility.

        Returns:
            str: The description, or "" when none could be derived.
        """
        try:
            # Try to get description from Git config
            config = repo.config_reader()
            try:
                if config.has_section('remote "origin"'):
                    description = str(
                        config.get_value('remote "origin"', "description", default="")
                    )
                    # Only accept a real, non-placeholder description.
                    if (
                        description
                        and isinstance(description, str)
                        and description.strip()
                        and "Unnamed repository;" not in description
                    ):
                        return description.strip()
            except Exception as e:
                self.logger.debug(f"Failed to read Git config: {e}")

            # Try to find description in README files
            readme_files = ["README.md", "README.txt", "README", "README.rst"]
            repo_root = repo.working_dir
            for readme_file in readme_files:
                readme_path = os.path.join(repo_root, readme_file)
                if os.path.exists(readme_path) and os.path.isfile(readme_path):
                    try:
                        with open(readme_path, encoding="utf-8") as f:
                            content = f.read()
                        # Accumulate paragraphs, skipping leading badges,
                        # links and headings that precede the real prose.
                        paragraphs = []
                        current_paragraph = []
                        in_title = True
                        for line in content.splitlines():
                            line = line.strip()
                            # Skip badges and links at the start
                            if in_title and (
                                line.startswith("[![") or line.startswith("[")
                            ):
                                continue
                            # Skip empty lines (a blank line ends a paragraph)
                            if not line:
                                if current_paragraph:
                                    paragraphs.append(" ".join(current_paragraph))
                                    current_paragraph = []
                                continue
                            # Skip titles
                            if line.startswith("#") or line.startswith("==="):
                                in_title = True
                                continue
                            # Skip common sections
                            # NOTE(review): this check looks unreachable — any
                            # line starting with "#" was already consumed by
                            # the title check above; confirm intent.
                            if line.lower() in [
                                "## installation",
                                "## usage",
                                "## contributing",
                                "## license",
                            ]:
                                break
                            in_title = False
                            current_paragraph.append(line)

                        # Flush the trailing paragraph, if any.
                        if current_paragraph:
                            paragraphs.append(" ".join(current_paragraph))

                        # Find first meaningful paragraph
                        for paragraph in paragraphs:
                            if (
                                len(paragraph) >= 50
                            ):  # Minimum length for a meaningful description
                                # Clean up markdown links: [text](url) -> text
                                paragraph = re.sub(
                                    r"\[([^\]]+)\]\([^)]+\)", r"\1", paragraph
                                )
                                # Clean up HTML tags
                                paragraph = re.sub(r"<[^>]+>", "", paragraph)
                                # Limit length and break at sentence boundary
                                if len(paragraph) > 200:
                                    sentences = re.split(
                                        r"(?<=[.!?])\s+", paragraph
                                    )
                                    description = ""
                                    for sentence in sentences:
                                        if len(description + sentence) > 200:
                                            break
                                        description += sentence + " "
                                    description = description.strip() + "..."
                                else:
                                    description = paragraph
                                return description
                    except Exception as e:
                        self.logger.debug(f"Failed to read README {readme_file}: {e}")
                        continue

        except Exception as e:
            self.logger.debug(f"Failed to get repository description: {e}")

        return ""
435 def _detect_encoding(self, content: str) -> str:
436 """Detect file encoding."""
437 if not content:
438 return "utf-8"
440 try:
441 result = chardet.detect(content.encode())
442 if (
443 result["encoding"]
444 and result["encoding"].lower() != "ascii"
445 and result["confidence"] > 0.8
446 ):
447 return result["encoding"].lower()
448 except Exception as e:
449 self.logger.error({"event": "Failed to detect encoding", "error": str(e)})
451 return "utf-8"
453 def _detect_language(self, file_path: str) -> str:
454 """Detect programming language based on file extension."""
455 ext = os.path.splitext(file_path)[1].lower()
456 language_map = {
457 ".py": "Python",
458 ".js": "JavaScript",
459 ".ts": "TypeScript",
460 ".java": "Java",
461 ".cpp": "C++",
462 ".c": "C",
463 ".go": "Go",
464 ".rs": "Rust",
465 ".rb": "Ruby",
466 ".php": "PHP",
467 ".cs": "C#",
468 ".scala": "Scala",
469 ".kt": "Kotlin",
470 ".swift": "Swift",
471 ".m": "Objective-C",
472 ".h": "C/C++ Header",
473 ".sh": "Shell",
474 ".bat": "Batch",
475 ".ps1": "PowerShell",
476 ".md": "Markdown",
477 ".rst": "reStructuredText",
478 ".txt": "Text",
479 ".json": "JSON",
480 ".xml": "XML",
481 ".yaml": "YAML",
482 ".yml": "YAML",
483 ".toml": "TOML",
484 ".ini": "INI",
485 ".cfg": "Configuration",
486 ".conf": "Configuration",
487 }
488 return language_map.get(ext, "Unknown")
490 def _has_code_blocks(self, content: str) -> bool:
491 """Check if content contains code blocks."""
492 return bool(re.search(r"```[a-zA-Z]*\n[\s\S]*?\n```", content))
494 def _has_images(self, content: str) -> bool:
495 """Check if content contains image references."""
496 return bool(re.search(r"!\[.*?\]\(.*?\)", content))
498 def _has_links(self, content: str) -> bool:
499 """Check if content contains links."""
500 return bool(re.search(r"\[.*?\]\(.*?\)", content))
502 def _get_heading_levels(self, content: str) -> list[int]:
503 """Get list of heading levels in the content."""
504 headings = re.findall(r"^(#+)\s", content, re.MULTILINE)
505 return [len(h) for h in headings]