Coverage for src/qdrant_loader/connectors/git/file_processor.py: 82%
105 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
1"""File processing and filtering logic for Git connector."""
3import fnmatch
4import os
5from typing import TYPE_CHECKING, Optional
7from qdrant_loader.utils.logging import LoggingConfig
9if TYPE_CHECKING:
10 from qdrant_loader.connectors.git.config import GitRepoConfig
11 from qdrant_loader.core.file_conversion import FileDetector
13logger = LoggingConfig.get_logger(__name__)
class FileProcessor:
    """Decide which files of a checked-out Git repository should be processed.

    The decision is driven by the repository configuration: exclude paths,
    file type patterns (optionally extended by conversion support), a maximum
    file size, and include paths. Paths in the configuration are interpreted
    relative to ``temp_dir`` and use ``/`` as the separator.
    """

    def __init__(
        self,
        config: "GitRepoConfig",
        temp_dir: str,
        file_detector: Optional["FileDetector"] = None,
    ):
        """Initialize the file processor.

        Args:
            config: Git repository configuration
            temp_dir: Temporary directory path; relative paths used for
                pattern matching are computed against it
            file_detector: Optional file detector for conversion support
        """
        self.config = config
        self.temp_dir = temp_dir
        self.file_detector = file_detector
        self.logger = LoggingConfig.get_logger(__name__)

    @staticmethod
    def _in_directory(rel_dir: str, dir_pattern: str) -> bool:
        """Return True if ``rel_dir`` is ``dir_pattern`` or nested below it."""
        return rel_dir == dir_pattern or rel_dir.startswith(dir_pattern + "/")

    def _is_excluded(self, rel_path: str, rel_dir: str) -> bool:
        """Return True if the file matches any configured exclude pattern."""
        for raw_pattern in self.config.exclude_paths:
            pattern = raw_pattern.lstrip("/")
            self.logger.debug(f"Checking exclude pattern: {pattern}")
            if pattern.endswith("/**"):
                # "dir/**" excludes everything in and below "dir"
                if self._in_directory(rel_dir, pattern[:-3]):
                    self.logger.debug(
                        f"Skipping {rel_path}: matches exclude directory pattern {pattern}"
                    )
                    return True
            elif pattern.endswith("/"):
                # "dir/" is treated the same way as "dir/**"
                if self._in_directory(rel_dir, pattern[:-1]):
                    self.logger.debug(
                        f"Skipping {rel_path}: matches exclude directory pattern {pattern}"
                    )
                    return True
            elif fnmatch.fnmatch(rel_path, pattern):
                self.logger.debug(
                    f"Skipping {rel_path}: matches exclude pattern {pattern}"
                )
                return True
        return False

    def _matches_file_type(
        self, file_path: str, rel_path: str, file_basename: str
    ) -> bool:
        """Return True if the file has a configured extension or is convertible.

        Only the extension of each ``file_types`` pattern is compared
        (case-insensitively), e.g. ``"*.md"`` is reduced to ``".md"``.
        """
        file_ext = os.path.splitext(file_basename)[1].lower()  # includes the dot
        self.logger.debug(f"Checking file extension: {file_ext}")

        for pattern in self.config.file_types:
            self.logger.debug(f"Checking file type pattern: {pattern}")
            pattern_ext = os.path.splitext(pattern)[1].lower()
            if pattern_ext and file_ext == pattern_ext:
                self.logger.debug(
                    f"File {rel_path} matches file type pattern {pattern}"
                )
                return True

        # Fall back to conversion support only when no configured type matched
        if self.config.enable_file_conversion and self.file_detector:
            if self.file_detector.is_supported_for_conversion(file_path):
                self.logger.debug(f"File {rel_path} supported for conversion")
                return True

        return False

    def _is_included(self, rel_path: str, rel_dir: str) -> bool:
        """Return True if the file satisfies the configured include paths.

        An empty ``include_paths`` includes every file. Otherwise at least one
        pattern has to match.
        """
        if not self.config.include_paths:
            self.logger.debug("No include paths specified, including all files")
            return True

        self.logger.debug(f"Checking include patterns for directory: {rel_dir}")

        for raw_pattern in self.config.include_paths:
            # Stripping leading slashes turns "/" into "" and "/**/*" into
            # "**/*", so the root patterns are recognised in their stripped form.
            pattern = raw_pattern.lstrip("/")
            self.logger.debug(f"Checking include pattern: {pattern}")

            if pattern == "":
                # Root pattern means include only files in root directory
                if rel_dir == "":
                    self.logger.debug(f"Including {rel_path}: matches root pattern")
                    return True
                continue

            if pattern == "**/*":
                # Root pattern with /**/* means include everything, root-level
                # files included (fnmatch alone would require a "/" in the path)
                self.logger.debug(
                    f"Including {rel_path}: matches root /**/* pattern"
                )
                return True

            if pattern.endswith("/**/*"):
                if self._in_directory(rel_dir, pattern[:-5]):
                    self.logger.debug(
                        f"Including {rel_path}: matches directory pattern {pattern}"
                    )
                    return True
            elif pattern.endswith("/"):
                if self._in_directory(rel_dir, pattern[:-1]):
                    self.logger.debug(
                        f"Including {rel_path}: matches directory pattern {pattern}"
                    )
                    return True
            elif fnmatch.fnmatch(rel_path, pattern):
                self.logger.debug(
                    f"Including {rel_path}: matches exact pattern {pattern}"
                )
                return True

        # If we have include patterns but none matched, exclude the file
        self.logger.debug(f"Skipping {rel_path}: not in include paths")
        return False

    def should_process_file(self, file_path: str) -> bool:
        """Check if a file should be processed based on configuration.

        Checks run in this order: the file exists and is readable, its name
        does not start with a dot, it matches no exclude pattern, it matches a
        file type (or is supported for conversion), it does not exceed the
        maximum size, and it matches the include paths.

        Args:
            file_path: Path to the file

        Returns:
            True if the file should be processed, False otherwise. Any
            exception raised during the checks is logged and reported as False.
        """
        try:
            self.logger.debug(
                "Checking if file should be processed", file_path=file_path
            )
            self.logger.debug(
                "Current configuration",
                file_types=self.config.file_types,
                include_paths=self.config.include_paths,
                exclude_paths=self.config.exclude_paths,
                max_file_size=self.config.max_file_size,
            )

            # Check if file exists and is readable
            if not os.path.isfile(file_path):
                self.logger.debug(f"Skipping {file_path}: file does not exist")
                return False
            if not os.access(file_path, os.R_OK):
                self.logger.debug(f"Skipping {file_path}: file is not readable")
                return False

            # Relative path from the repository root. Patterns are "/"-based,
            # so use "/" separators here as well (no-op where os.sep is "/").
            rel_path = os.path.relpath(file_path, self.temp_dir).replace(os.sep, "/")
            self.logger.debug(f"Relative path: {rel_path}")

            # Skip files whose name starts with a dot (e.g. ".md")
            file_basename = os.path.basename(rel_path)
            if file_basename.startswith("."):
                self.logger.debug(
                    f"Skipping {rel_path}: invalid filename (starts with dot)"
                )
                return False

            rel_dir = os.path.dirname(rel_path)

            # Exclude patterns take precedence over everything else
            if self._is_excluded(rel_path, rel_dir):
                return False

            if not self._matches_file_type(file_path, rel_path, file_basename):
                self.logger.debug(
                    f"Skipping {rel_path}: does not match any file type patterns and not supported for conversion"
                )
                return False

            # Check file size
            file_size = os.path.getsize(file_path)
            self.logger.debug(
                f"File size: {file_size} bytes (max: {self.config.max_file_size})"
            )
            if file_size > self.config.max_file_size:
                self.logger.debug(f"Skipping {rel_path}: exceeds max file size")
                return False

            return self._is_included(rel_path, rel_dir)

        except Exception as e:
            # Best-effort filter: a failing check means the file is skipped
            self.logger.error(f"Error checking if file should be processed: {e}")
            return False