Coverage for src/qdrant_loader/connectors/git/metadata_extractor.py: 78%
227 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1import os
2import re
3from typing import Any
4from urllib.parse import urlparse
6import chardet
7import git
9from qdrant_loader.connectors.git.config import GitRepoConfig
10from qdrant_loader.utils.logging import LoggingConfig
# Module-level logger for this connector; each extractor instance also keeps
# a reference on `self.logger`.
logger = LoggingConfig.get_logger(__name__)
class GitMetadataExtractor:
    """Extract metadata from Git repository files.

    Produces four metadata categories, merged by :meth:`extract_all_metadata`:
    file-level (name, size, counts), repository-level (owner, name, URL),
    Git history (last commit), and — for markdown files only — document
    structure (headings, TOC).
    """

    def __init__(self, config: GitRepoConfig):
        """Initialize the Git metadata extractor.

        Args:
            config (GitRepoConfig): Configuration for the Git repository.
                Must expose ``temp_dir`` (checkout location) and ``base_url``.
        """
        self.config = config
        self.logger = logger

    def extract_all_metadata(self, file_path: str, content: str) -> dict[str, Any]:
        """Extract all metadata for a file.

        Args:
            file_path: Path to the file.
            content: Content of the file.

        Returns:
            dict[str, Any]: Merged file, repository, Git and (for markdown)
            structure metadata. Later categories win on key collisions.
        """
        self.logger.debug(f"Starting metadata extraction for file: {file_path!s}")

        file_metadata = self._extract_file_metadata(file_path, content)
        repo_metadata = self._extract_repo_metadata(file_path)
        git_metadata = self._extract_git_metadata(file_path)

        # Structure metadata (headings/TOC) only makes sense for markdown.
        structure_metadata: dict[str, Any] = {}
        if file_path.lower().endswith(".md"):
            self.logger.debug(f"Processing markdown file: {file_path!s}")
            structure_metadata = self._extract_structure_metadata(content)

        metadata = {
            **file_metadata,
            **repo_metadata,
            **git_metadata,
            **structure_metadata,
        }

        self.logger.debug(f"Completed metadata extraction for {file_path!s}.")
        self.logger.debug(f"Metadata: {metadata!s}")
        return metadata

    def _extract_file_metadata(self, file_path: str, content: str) -> dict[str, Any]:
        """Extract metadata about the file itself (name, size, counts, flags)."""
        # Path relative to the repository checkout root.
        rel_path = os.path.relpath(file_path, self.config.temp_dir)
        file_type = os.path.splitext(rel_path)[1]
        file_name = os.path.basename(rel_path)
        file_encoding = self._detect_encoding(content)

        # Count lines using splitlines(), with a special case for
        # whitespace-only content that still contains newlines.
        if not content:
            line_count = 0
        elif content.strip() == "" and "\n" in content:
            # Count newlines + 1 so every whitespace segment is included.
            line_count = content.count("\n") + 1
        else:
            # splitlines() handles trailing newlines correctly.
            line_count = len(content.splitlines())

        word_count = len(content.split())
        try:
            file_size = len(content.encode(file_encoding))
        except (LookupError, UnicodeEncodeError):
            # chardet can report an encoding Python doesn't recognize, or one
            # that cannot represent this text; fall back to UTF-8 for sizing.
            file_size = len(content.encode("utf-8"))

        return {
            "file_type": file_type,
            "file_name": file_name,
            # NOTE(review): "/" is prepended before taking dirname, presumably
            # to force a rooted directory path — confirm downstream consumers.
            "file_directory": os.path.dirname("/" + file_path),
            "file_encoding": file_encoding,
            "line_count": line_count,
            "word_count": word_count,
            "file_size": file_size,
            "has_code_blocks": self._has_code_blocks(content),
            "has_images": self._has_images(content),
            "has_links": self._has_links(content),
        }

    @staticmethod
    def _parse_repo_identity(repo_url: str) -> "tuple[str, str] | None":
        """Parse ``(owner, name)`` from a repository URL.

        Handles Azure DevOps (``.../org/project/_git/repo``) specially; all
        other hosts (GitHub, GitLab, self-hosted, enterprise subdomains) use
        the trailing ``owner/repo`` convention.

        Returns:
            (owner, name) or ``None`` when the URL cannot be parsed.
        """
        normalized_url = repo_url[:-4] if repo_url.endswith(".git") else repo_url
        repo_parts = normalized_url.split("/")
        hostname = urlparse(repo_url).hostname

        if hostname == "dev.azure.com":
            # Azure DevOps format: https://dev.azure.com/org/project/_git/repo
            if len(repo_parts) >= 5 and "_git" in repo_parts:
                git_index = repo_parts.index("_git")
                # Need the org segment two places before "_git". The previous
                # ">= 1" guard allowed git_index == 1, where [git_index - 2]
                # silently wrapped to a negative index and returned the wrong
                # "owner".
                if git_index >= 2:
                    return repo_parts[git_index - 2], repo_parts[git_index + 1]
            return None

        # GitHub / GitLab / enterprise subdomains / other hosts: owner is the
        # second-to-last path segment, name is the last.
        if len(repo_parts) >= 2:
            return repo_parts[-2], repo_parts[-1]
        return None

    def _extract_repo_metadata(self, file_path: str) -> dict[str, Any]:
        """Extract repository metadata from the configured base URL.

        Args:
            file_path (str): Path to the file (currently unused; kept for
                interface symmetry with the other extractors).

        Returns:
            dict[str, Any]: Repository name/owner/URL plus description and
            language when readable from Git config; ``{}`` on failure.
        """
        try:
            repo_url = str(self.config.base_url)
            if not repo_url:
                return {}

            identity = self._parse_repo_identity(repo_url)
            if identity is None:
                return {}
            repo_owner, repo_name = identity

            # Initialize metadata with default values.
            metadata = {
                "repository_name": repo_name,
                "repository_owner": repo_owner,
                "repository_url": repo_url,
                "repository_description": "",
                "repository_language": "",
            }

            try:
                repo = git.Repo(self.config.temp_dir)
                if repo and not repo.bare:
                    config = repo.config_reader()
                    # Prefer the "github" config section when present.
                    if config.has_section("github"):
                        metadata["repository_description"] = str(
                            config.get_value("github", "description", "")
                        )
                        metadata["repository_language"] = str(
                            config.get_value("github", "language", "")
                        )
                    # Fall back to the "core" section for the description.
                    if not metadata["repository_description"] and config.has_section(
                        "core"
                    ):
                        metadata["repository_description"] = str(
                            config.get_value("core", "description", "")
                        )
                    self.logger.debug(f"Repository metadata extracted: {metadata!s}")
            except git.InvalidGitRepositoryError:
                # Without a valid repository we cannot trust any metadata.
                self.logger.error("Invalid Git repository directory")
                return {}
            except Exception as e:
                # Config read failures are non-fatal; keep URL-derived fields.
                self.logger.error(f"Failed to read Git config: {e}")

            return metadata
        except Exception as e:
            self.logger.error(f"Failed to extract repository metadata: {e}")
            return {}

    @staticmethod
    def _commit_metadata(commit: "git.Commit") -> dict[str, Any]:
        """Build the last-commit metadata fields for a single commit."""
        return {
            "last_commit_date": commit.committed_datetime.isoformat(),
            "last_commit_author": commit.author.name,
            # First line of the commit message only.
            "last_commit_message": commit.message.strip().split("\n")[0],
        }

    def _extract_git_metadata(self, file_path: str) -> dict[str, Any]:
        """Extract Git-specific metadata (last commit date/author/message).

        Resolution order: the file's own history, then the latest commit on
        the current branch, then the repository HEAD commit.

        Returns:
            dict[str, Any]: Last-commit fields, or ``{}`` when the repository
            cannot be opened.
        """
        try:
            repo = git.Repo(self.config.temp_dir)
            metadata: dict[str, Any] = {}

            try:
                # Relative path from the repository root, as iter_commits expects.
                rel_path = os.path.relpath(file_path, repo.working_dir)

                commits = list(repo.iter_commits(paths=rel_path, max_count=1))
                if not commits:
                    # No history for this file; use the latest commit instead.
                    commits = list(repo.iter_commits(max_count=1))
                last_commit = commits[0] if commits else repo.head.commit
                metadata.update(self._commit_metadata(last_commit))
            except Exception as e:
                self.logger.debug(f"Failed to get commits: {e}")
                # Last resort: the HEAD commit.
                try:
                    metadata.update(self._commit_metadata(repo.head.commit))
                except Exception as e:
                    self.logger.debug(f"Failed to get HEAD commit: {e}")

            return metadata
        except Exception as e:
            self.logger.warning(f"Failed to extract Git metadata: {e}")
            return {}

    def _extract_structure_metadata(self, content: str) -> dict[str, Any]:
        """Extract document-structure metadata from markdown content.

        Returns:
            dict[str, Any]: ``has_toc``, ``heading_levels`` (list of 1-6),
            and ``sections_count``.
        """
        self.logger.debug("Starting structure metadata extraction")
        self.logger.debug(f"Content to process:\n{content!s}")

        has_toc = False
        heading_levels: list[int] = []
        sections_count = 0

        # Markdown ATX headers: 1-6 "#" at line start (optionally indented),
        # followed by whitespace and the heading text, up to end of line.
        headings = re.findall(r"^[ \t]*(#{1,6})[ \t]+(.+?)$", content, re.MULTILINE)
        self.logger.debug(f"Found {len(headings)!s} headers in content")

        if headings:
            self.logger.debug(f"Headers found: {headings!s}")
            # Recognise several common TOC heading spellings at any level.
            toc_patterns = [
                r"#+\s*Table\s+of\s+Contents",
                r"#+\s*Contents",
                r"#+\s*TOC",
            ]
            has_toc = any(
                re.search(pattern, content, re.IGNORECASE) for pattern in toc_patterns
            )
            heading_levels = [len(marker) for marker, _text in headings]
            sections_count = len(heading_levels)
            self.logger.debug(
                f"Has TOC: {has_toc!s}, Heading levels: {heading_levels!s}, Sections count: {sections_count!s}"
            )
        else:
            self.logger.warning("No headers found in content")
            # Log the first few lines of content for debugging.
            first_lines = "\n".join(content.splitlines()[:5])
            self.logger.debug(f"First few lines of content:\n{first_lines!s}")
            # Fallback: a stricter pattern requiring "#" at column 0.
            alt_headings = re.findall(r"^#{1,6}\s+.+$", content, re.MULTILINE)
            if alt_headings:
                self.logger.debug(
                    f"Found {len(alt_headings)!s} headers using alternative pattern"
                )
                self.logger.debug(f"Alternative headers found: {alt_headings!s}")
                has_toc = "## Table of Contents" in content or "## Contents" in content
                heading_levels = []
                for heading in alt_headings:
                    match = re.match(r"^(#{1,6})", heading)
                    if match:
                        heading_levels.append(len(match.group(1)))
                sections_count = len(heading_levels)
                self.logger.debug(
                    f"Has TOC: {has_toc!s}, Heading levels: {heading_levels!s}, Sections count: {sections_count!s}"
                )

        metadata = {
            "has_toc": has_toc,
            "heading_levels": heading_levels,
            "sections_count": sections_count,
        }

        self.logger.debug(f"Structure metadata extraction completed: {metadata!s}")
        return metadata

    def _get_repo_description(self, repo: git.Repo, file_path: str) -> str:
        """Get a repository description from Git config or a README file.

        Tries the ``remote "origin"`` config section first, then scans common
        README files for the first meaningful paragraph (>= 50 chars),
        truncated near 200 characters at a sentence boundary.

        Returns:
            str: The description, or "" if none could be found.
        """
        try:
            # 1) Git config: remote "origin" description.
            config = repo.config_reader()
            try:
                if config.has_section('remote "origin"'):
                    description = str(
                        config.get_value('remote "origin"', "description", default="")
                    )
                    if (
                        description
                        and isinstance(description, str)
                        and description.strip()
                        and "Unnamed repository;" not in description
                    ):
                        return description.strip()
            except Exception as e:
                self.logger.debug(f"Failed to read Git config: {e}")

            # 2) README files at the repository root, in preference order.
            readme_files = ["README.md", "README.txt", "README", "README.rst"]
            repo_root = repo.working_dir
            for readme_file in readme_files:
                readme_path = os.path.join(repo_root, readme_file)
                if not (os.path.exists(readme_path) and os.path.isfile(readme_path)):
                    continue
                try:
                    with open(readme_path, encoding="utf-8") as f:
                        content = f.read()
                    # Use the first paragraph long enough to be meaningful.
                    for paragraph in self._readme_paragraphs(content):
                        if len(paragraph) >= 50:
                            return self._clean_description(paragraph)
                except Exception as e:
                    self.logger.debug(f"Failed to read README {readme_file}: {e}")
                    continue

        except Exception as e:
            self.logger.debug(f"Failed to get repository description: {e}")

        return ""

    @staticmethod
    def _readme_paragraphs(content: str) -> list[int] and list[str]:
        """Split README content into paragraphs, skipping titles and badges."""
        paragraphs: list[str] = []
        current_paragraph: list[str] = []
        in_title = True
        for line in content.splitlines():
            line = line.strip()
            # Skip badges and links while still in the title area.
            if in_title and (line.startswith("[![") or line.startswith("[")):
                continue
            # A blank line terminates the current paragraph.
            if not line:
                if current_paragraph:
                    paragraphs.append(" ".join(current_paragraph))
                    current_paragraph = []
                continue
            # Skip markdown (#) and setext (===) titles.
            if line.startswith("#") or line.startswith("==="):
                in_title = True
                continue
            # Stop at common boilerplate sections.
            # NOTE(review): unreachable — "#"-prefixed lines are skipped by
            # the title check above; confirm whether this was intended.
            if line.lower() in [
                "## installation",
                "## usage",
                "## contributing",
                "## license",
            ]:
                break
            in_title = False
            current_paragraph.append(line)

        if current_paragraph:
            paragraphs.append(" ".join(current_paragraph))
        return paragraphs

    @staticmethod
    def _clean_description(paragraph: str) -> str:
        """Strip markdown links / HTML tags and truncate near 200 chars."""
        # Replace [text](url) links with their text.
        paragraph = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", paragraph)
        # Drop HTML tags.
        paragraph = re.sub(r"<[^>]+>", "", paragraph)
        if len(paragraph) <= 200:
            return paragraph
        # Truncate at a sentence boundary and mark the truncation.
        sentences = re.split(r"(?<=[.!?])\s+", paragraph)
        description = ""
        for sentence in sentences:
            if len(description + sentence) > 200:
                break
            description += sentence + " "
        return description.strip() + "..."

    def _detect_encoding(self, content: str) -> str:
        """Detect the content's encoding via chardet; default to UTF-8."""
        if not content:
            return "utf-8"

        try:
            result = chardet.detect(content.encode())
            # Only trust a confident, non-ASCII detection.
            if (
                result["encoding"]
                and result["encoding"].lower() != "ascii"
                and result["confidence"] > 0.8
            ):
                return result["encoding"].lower()
        except Exception as e:
            self.logger.error({"event": "Failed to detect encoding", "error": str(e)})

        return "utf-8"

    def _detect_language(self, file_path: str) -> str:
        """Detect programming language based on file extension."""
        ext = os.path.splitext(file_path)[1].lower()
        language_map = {
            ".py": "Python",
            ".js": "JavaScript",
            ".ts": "TypeScript",
            ".java": "Java",
            ".cpp": "C++",
            ".c": "C",
            ".go": "Go",
            ".rs": "Rust",
            ".rb": "Ruby",
            ".php": "PHP",
            ".cs": "C#",
            ".scala": "Scala",
            ".kt": "Kotlin",
            ".swift": "Swift",
            ".m": "Objective-C",
            ".h": "C/C++ Header",
            ".sh": "Shell",
            ".bat": "Batch",
            ".ps1": "PowerShell",
            ".md": "Markdown",
            ".rst": "reStructuredText",
            ".txt": "Text",
            ".json": "JSON",
            ".xml": "XML",
            ".yaml": "YAML",
            ".yml": "YAML",
            ".toml": "TOML",
            ".ini": "INI",
            ".cfg": "Configuration",
            ".conf": "Configuration",
        }
        return language_map.get(ext, "Unknown")

    def _has_code_blocks(self, content: str) -> bool:
        """Check if content contains fenced (```) code blocks."""
        return bool(re.search(r"```[a-zA-Z]*\n[\s\S]*?\n```", content))

    def _has_images(self, content: str) -> bool:
        """Check if content contains markdown image references ![alt](src)."""
        return bool(re.search(r"!\[.*?\]\(.*?\)", content))

    def _has_links(self, content: str) -> bool:
        """Check if content contains markdown links [text](url).

        Note: image syntax also matches, since ![alt](src) contains [alt](src).
        """
        return bool(re.search(r"\[.*?\]\(.*?\)", content))

    def _get_heading_levels(self, content: str) -> list[int]:
        """Get the list of heading levels (number of "#") in the content."""
        headings = re.findall(r"^(#+)\s", content, re.MULTILINE)
        return [len(h) for h in headings]