Coverage for src/qdrant_loader/connectors/git/metadata_extractor.py: 54%
204 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
1import os
2import re
3from typing import Any
5import chardet
6import git
8from qdrant_loader.connectors.git.config import GitRepoConfig
9from qdrant_loader.utils.logging import LoggingConfig
# Module-level logger; note the class below also keeps its own `self.logger`.
logger = LoggingConfig.get_logger(__name__)
class GitMetadataExtractor:
    """Extracts file, repository, Git-history and document-structure metadata
    for files that live inside a cloned Git repository."""

    def __init__(self, config: GitRepoConfig):
        """Store the repository *config* and create this instance's logger.

        Args:
            config (GitRepoConfig): Configuration for the Git repository.
        """
        self.logger = LoggingConfig.get_logger(__name__)
        self.config = config
26 def extract_all_metadata(self, file_path: str, content: str) -> dict[str, Any]:
27 """Extract all metadata for a file.
29 Args:
30 file_path: Path to the file.
31 content: Content of the file.
33 Returns:
34 dict[str, Any]: Dictionary containing all metadata.
35 """
36 self.logger.debug(f"Starting metadata extraction for file: {file_path!s}")
38 file_metadata = self._extract_file_metadata(file_path, content)
39 repo_metadata = self._extract_repo_metadata(file_path)
40 git_metadata = self._extract_git_metadata(file_path)
42 # Only extract structure metadata for markdown files
43 structure_metadata = {}
44 if file_path.lower().endswith(".md"):
45 self.logger.debug(f"Processing markdown file: {file_path!s}")
46 structure_metadata = self._extract_structure_metadata(content)
48 metadata = {
49 **file_metadata,
50 **repo_metadata,
51 **git_metadata,
52 **structure_metadata,
53 }
55 self.logger.debug(f"Completed metadata extraction for {file_path!s}.")
56 self.logger.debug(f"Metadata: {metadata!s}")
57 return metadata
59 def _extract_file_metadata(self, file_path: str, content: str) -> dict[str, Any]:
60 """Extract metadata about the file itself."""
61 # Get relative path from repository root
62 rel_path = os.path.relpath(file_path, self.config.temp_dir)
63 file_type = os.path.splitext(rel_path)[1]
64 file_name = os.path.basename(rel_path)
65 file_encoding = self._detect_encoding(content)
66 line_count = len(content.splitlines())
67 word_count = len(content.split())
68 file_size = len(content.encode(file_encoding))
70 return {
71 "file_type": file_type,
72 "file_name": file_name,
73 "file_directory": os.path.dirname("/" + file_path),
74 "file_encoding": file_encoding,
75 "line_count": line_count,
76 "word_count": word_count,
77 "file_size": file_size,
78 "has_code_blocks": self._has_code_blocks(content),
79 "has_images": self._has_images(content),
80 "has_links": self._has_links(content),
81 }
83 def _extract_repo_metadata(self, file_path: str) -> dict[str, Any]:
84 """Extract repository metadata from the given file path.
86 Args:
87 file_path (str): Path to the file.
89 Returns:
90 dict[str, Any]: Dictionary containing repository metadata.
91 """
92 try:
93 # Get repository URL from config
94 repo_url = str(self.config.base_url)
95 if not repo_url:
96 return {}
98 # Extract repository name and owner from normalized URL
99 normalized_url = repo_url[:-4] if repo_url.endswith(".git") else repo_url
100 repo_parts = normalized_url.split("/")
101 if len(repo_parts) >= 2:
102 repo_owner = repo_parts[-2]
103 repo_name = repo_parts[-1]
104 else:
105 repo_owner = ""
106 repo_name = normalized_url
108 # Initialize metadata with default values
109 metadata = {
110 "repository_name": repo_name,
111 "repository_owner": repo_owner,
112 "repository_url": repo_url,
113 "repository_description": "",
114 "repository_language": "",
115 }
117 try:
118 repo = git.Repo(self.config.temp_dir)
119 if repo and not repo.bare:
120 config = repo.config_reader()
121 # Try to get description from github section first
122 if config.has_section("github"):
123 metadata["repository_description"] = str(
124 config.get_value("github", "description", "")
125 )
126 metadata["repository_language"] = str(
127 config.get_value("github", "language", "")
128 )
129 # Fall back to core section if needed
130 if not metadata["repository_description"] and config.has_section(
131 "core"
132 ):
133 metadata["repository_description"] = str(
134 config.get_value("core", "description", "")
135 )
136 self.logger.debug(f"Repository metadata extracted: {metadata!s}")
137 except Exception as e:
138 self.logger.error(f"Failed to read Git config: {e}")
140 return metadata
141 except Exception as e:
142 self.logger.error(f"Failed to extract repository metadata: {str(e)!s}")
143 return {}
145 def _extract_git_metadata(self, file_path: str) -> dict[str, Any]:
146 """Extract Git-specific metadata."""
147 try:
148 repo = git.Repo(self.config.temp_dir)
149 metadata = {}
151 try:
152 # Get the relative path from the repository root
153 rel_path = os.path.relpath(file_path, repo.working_dir)
155 # Try to get commits for the file
156 commits = list(repo.iter_commits(paths=rel_path, max_count=1))
157 if commits:
158 last_commit = commits[0]
159 metadata.update(
160 {
161 "last_commit_date": last_commit.committed_datetime.isoformat(),
162 "last_commit_author": last_commit.author.name,
163 "last_commit_message": last_commit.message.strip(),
164 }
165 )
166 else:
167 # If no commits found for the file, try getting the latest commit
168 commits = list(repo.iter_commits(max_count=1))
169 if commits:
170 last_commit = commits[0]
171 metadata.update(
172 {
173 "last_commit_date": last_commit.committed_datetime.isoformat(),
174 "last_commit_author": last_commit.author.name,
175 "last_commit_message": last_commit.message.strip(),
176 }
177 )
178 else:
179 # If still no commits found, use repository's HEAD commit
180 head_commit = repo.head.commit
181 metadata.update(
182 {
183 "last_commit_date": head_commit.committed_datetime.isoformat(),
184 "last_commit_author": head_commit.author.name,
185 "last_commit_message": head_commit.message.strip(),
186 }
187 )
188 except Exception as e:
189 self.logger.debug(f"Failed to get commits: {e}")
190 # Try one last time with HEAD commit
191 try:
192 head_commit = repo.head.commit
193 metadata.update(
194 {
195 "last_commit_date": head_commit.committed_datetime.isoformat(),
196 "last_commit_author": head_commit.author.name,
197 "last_commit_message": head_commit.message.strip(),
198 }
199 )
200 except Exception as e:
201 self.logger.debug(f"Failed to get HEAD commit: {e}")
203 return metadata
204 except Exception as e:
205 self.logger.warning(f"Failed to extract Git metadata: {str(e)!s}")
206 return {}
208 def _extract_structure_metadata(self, content: str) -> dict[str, Any]:
209 """Extract metadata about the document structure."""
210 self.logger.debug("Starting structure metadata extraction")
211 self.logger.debug(f"Content to process:\n{content!s}")
213 has_toc = False
214 heading_levels = []
215 sections_count = 0
217 # Check if content is markdown by looking for markdown headers
218 # Look for markdown headers that:
219 # 1. Start with 1-6 # characters at the start of a line or after a newline
220 # 2. Are followed by whitespace and text
221 # 3. Continue until the next newline or end of content
222 headings = re.findall(
223 r"(?:^|\n)\s*(#{1,6})\s+(.+?)(?:\n|$)", content, re.MULTILINE
224 )
225 self.logger.debug(f"Found {len(headings)!s} headers in content")
227 if headings:
228 self.logger.debug(f"Headers found: {headings!s}")
229 has_toc = "## Table of Contents" in content or "## Contents" in content
230 heading_levels = [len(h[0]) for h in headings]
231 sections_count = len(heading_levels)
232 self.logger.debug(
233 f"Has TOC: {has_toc!s}, Heading levels: {heading_levels!s}, Sections count: {sections_count!s}"
234 )
235 else:
236 self.logger.warning("No headers found in content")
237 # Log the first few lines of content for debugging
238 first_lines = "\n".join(content.splitlines()[:5])
239 self.logger.debug(f"First few lines of content:\n{first_lines!s}")
240 # Try alternative header detection
241 alt_headings = re.findall(r"^#{1,6}\s+.+$", content, re.MULTILINE)
242 if alt_headings:
243 self.logger.debug(
244 f"Found {len(alt_headings)!s} headers using alternative pattern"
245 )
246 self.logger.debug(f"Alternative headers found: {alt_headings!s}")
247 has_toc = "## Table of Contents" in content or "## Contents" in content
248 heading_levels = []
249 for h in alt_headings:
250 match = re.match(r"^(#{1,6})", h)
251 if match:
252 heading_levels.append(len(match.group(1)))
253 sections_count = len(heading_levels)
254 self.logger.debug(
255 f"Has TOC: {has_toc!s}, Heading levels: {heading_levels!s}, Sections count: {sections_count!s}"
256 )
258 metadata = {
259 "has_toc": has_toc,
260 "heading_levels": heading_levels,
261 "sections_count": sections_count,
262 }
264 self.logger.debug(f"Structure metadata extraction completed: {metadata!s}")
265 return metadata
    def _get_repo_description(self, repo: git.Repo, file_path: str) -> str:
        """Get repository description from Git config or README.

        Tries the local Git config's `remote "origin"` description first, then
        falls back to scanning common README files for the first meaningful
        paragraph (>= 50 chars), trimmed to ~200 chars at a sentence boundary.
        Returns "No description available" when nothing usable is found.
        """
        try:
            # Try to get description from Git config
            config = repo.config_reader()
            try:
                if config.has_section('remote "origin"'):
                    description = str(
                        config.get_value('remote "origin"', "description", default="")
                    )
                    # Git's default placeholder text contains "Unnamed repository;"
                    # — treat it the same as no description.
                    if (
                        description
                        and isinstance(description, str)
                        and description.strip()
                        and "Unnamed repository;" not in description
                    ):
                        return description.strip()
            except Exception as e:
                self.logger.debug(f"Failed to read Git config: {e}")

            # Try to find description in README files (first match wins).
            readme_files = ["README.md", "README.txt", "README", "README.rst"]
            repo_root = repo.working_dir
            for readme_file in readme_files:
                readme_path = os.path.join(repo_root, readme_file)
                if os.path.exists(readme_path) and os.path.isfile(readme_path):
                    try:
                        with open(readme_path, encoding="utf-8") as f:
                            content = f.read()
                            # Split the README into paragraphs, skipping titles,
                            # badges and well-known boilerplate sections.
                            paragraphs = []
                            current_paragraph = []
                            in_title = True
                            for line in content.splitlines():
                                line = line.strip()
                                # Skip badges and links at the start
                                # NOTE(review): this only skips bracketed lines
                                # while in_title is set, i.e. immediately after
                                # a heading — confirm that's intended.
                                if in_title and (
                                    line.startswith("[![") or line.startswith("[")
                                ):
                                    continue
                                # Blank line ends the current paragraph.
                                if not line:
                                    if current_paragraph:
                                        paragraphs.append(" ".join(current_paragraph))
                                        current_paragraph = []
                                    continue
                                # Skip titles (markdown '#' and setext '===').
                                if line.startswith("#") or line.startswith("==="):
                                    in_title = True
                                    continue
                                # Stop entirely at common boilerplate sections.
                                if line.lower() in [
                                    "## installation",
                                    "## usage",
                                    "## contributing",
                                    "## license",
                                ]:
                                    break
                                in_title = False
                                current_paragraph.append(line)

                            if current_paragraph:
                                paragraphs.append(" ".join(current_paragraph))

                            # Find first meaningful paragraph
                            for paragraph in paragraphs:
                                if (
                                    len(paragraph) >= 50
                                ):  # Minimum length for a meaningful description
                                    # Clean up markdown links: keep link text only.
                                    paragraph = re.sub(
                                        r"\[([^\]]+)\]\([^)]+\)", r"\1", paragraph
                                    )
                                    # Clean up HTML tags
                                    paragraph = re.sub(r"<[^>]+>", "", paragraph)
                                    # Limit length and break at sentence boundary
                                    if len(paragraph) > 200:
                                        sentences = re.split(
                                            r"(?<=[.!?])\s+", paragraph
                                        )
                                        description = ""
                                        for sentence in sentences:
                                            if len(description + sentence) > 200:
                                                break
                                            description += sentence + " "
                                        description = description.strip() + "..."
                                    else:
                                        description = paragraph
                                    return description
                    except Exception as e:
                        self.logger.debug(f"Failed to read README {readme_file}: {e}")
                        continue

        except Exception as e:
            self.logger.debug(f"Failed to get repository description: {e}")

        return "No description available"
364 def _detect_encoding(self, content: str) -> str:
365 """Detect file encoding."""
366 if not content:
367 return "utf-8"
369 try:
370 result = chardet.detect(content.encode())
371 if (
372 result["encoding"]
373 and result["encoding"].lower() != "ascii"
374 and result["confidence"] > 0.8
375 ):
376 return result["encoding"].lower()
377 except Exception as e:
378 self.logger.error({"event": "Failed to detect encoding", "error": str(e)})
380 return "utf-8"
382 def _detect_language(self, file_path: str) -> str:
383 """Detect programming language based on file extension."""
384 ext = os.path.splitext(file_path)[1].lower()
385 language_map = {
386 ".py": "Python",
387 ".js": "JavaScript",
388 ".ts": "TypeScript",
389 ".java": "Java",
390 ".cpp": "C++",
391 ".c": "C",
392 ".go": "Go",
393 ".rs": "Rust",
394 ".rb": "Ruby",
395 ".php": "PHP",
396 ".cs": "C#",
397 ".scala": "Scala",
398 ".kt": "Kotlin",
399 ".swift": "Swift",
400 ".m": "Objective-C",
401 ".h": "C/C++ Header",
402 ".sh": "Shell",
403 ".bat": "Batch",
404 ".ps1": "PowerShell",
405 ".md": "Markdown",
406 ".rst": "reStructuredText",
407 ".txt": "Text",
408 ".json": "JSON",
409 ".xml": "XML",
410 ".yaml": "YAML",
411 ".yml": "YAML",
412 ".toml": "TOML",
413 ".ini": "INI",
414 ".cfg": "Configuration",
415 ".conf": "Configuration",
416 }
417 return language_map.get(ext, "Unknown")
419 def _has_code_blocks(self, content: str) -> bool:
420 """Check if content contains code blocks."""
421 return bool(re.search(r"```[a-zA-Z]*\n[\s\S]*?\n```", content))
423 def _has_images(self, content: str) -> bool:
424 """Check if content contains image references."""
425 return bool(re.search(r"!\[.*?\]\(.*?\)", content))
427 def _has_links(self, content: str) -> bool:
428 """Check if content contains links."""
429 return bool(re.search(r"\[.*?\]\(.*?\)", content))
431 def _get_heading_levels(self, content: str) -> list[int]:
432 """Get list of heading levels in the content."""
433 headings = re.findall(r"^(#+)\s", content, re.MULTILINE)
434 return [len(h) for h in headings]