Coverage for src/qdrant_loader/connectors/git/file_processor.py: 80%
110 statements
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
« prev ^ index » next coverage.py v7.10.0, created at 2025-07-25 11:39 +0000
1"""File processing and filtering logic for Git connector."""
3import fnmatch
4import os
5from typing import TYPE_CHECKING, Optional
7from qdrant_loader.utils.logging import LoggingConfig
9if TYPE_CHECKING:
10 from qdrant_loader.connectors.git.config import GitRepoConfig
11 from qdrant_loader.core.file_conversion import FileDetector
13logger = LoggingConfig.get_logger(__name__)
class FileProcessor:
    """Decide which files of a checked-out Git repository should be processed.

    A file is accepted only if it passes, in order: existence/readability,
    the dotfile check, the exclude patterns, the file-type (or conversion)
    check, the size limit and finally the include patterns. All pattern
    matching is done on the path relative to ``temp_dir`` using ``/`` as the
    separator.
    """

    def __init__(
        self,
        config: "GitRepoConfig",
        temp_dir: str,
        file_detector: Optional["FileDetector"] = None,
    ):
        """Initialize the file processor.

        Args:
            config: Git repository configuration
            temp_dir: Temporary directory path (treated as the repository root
                when computing relative paths)
            file_detector: Optional file detector for conversion support
        """
        self.config = config
        self.temp_dir = temp_dir
        self.file_detector = file_detector
        self.logger = LoggingConfig.get_logger(__name__)

    def should_process_file(self, file_path: str) -> bool:
        """Check if a file should be processed based on configuration.

        Args:
            file_path: Path to the file

        Returns:
            True if the file should be processed, False otherwise. Any
            exception raised while checking is logged and reported as False.
        """
        try:
            self.logger.debug(
                "Checking if file should be processed",
                file_path=file_path.replace("\\", "/"),
            )
            self.logger.debug(
                "Current configuration",
                file_types=self.config.file_types,
                include_paths=self.config.include_paths,
                exclude_paths=self.config.exclude_paths,
                max_file_size=self.config.max_file_size,
            )

            # Check if file exists and is readable
            if not os.path.isfile(file_path):
                self.logger.debug(f"Skipping {file_path}: file does not exist")
                return False
            if not os.access(file_path, os.R_OK):
                self.logger.debug(f"Skipping {file_path}: file is not readable")
                return False

            # Get relative path from repository root. Patterns are written
            # with "/" separators, so normalise the platform separator;
            # otherwise directory patterns can never match on Windows.
            rel_path = os.path.relpath(file_path, self.temp_dir)
            if os.sep != "/":
                rel_path = rel_path.replace(os.sep, "/")
            self.logger.debug(f"Relative path: {rel_path}")

            # Skip every dotfile: hidden files as well as names that consist
            # of an extension only (e.g. ".md").
            file_basename = os.path.basename(rel_path)
            if file_basename.startswith("."):
                self.logger.debug(
                    f"Skipping {rel_path}: invalid filename (starts with dot)"
                )
                return False

            # Exclusions win over everything that follows.
            if self._is_excluded(rel_path):
                return False

            if not self._matches_file_type(file_path, rel_path, file_basename):
                self.logger.debug(
                    f"Skipping {rel_path}: does not match any file type patterns and not supported for conversion"
                )
                return False

            # Check file size
            file_size = os.path.getsize(file_path)
            self.logger.debug(
                f"File size: {file_size} bytes (max: {self.config.max_file_size})"
            )
            if file_size > self.config.max_file_size:
                self.logger.debug(f"Skipping {rel_path}: exceeds max file size")
                return False

            return self._is_included(rel_path)

        except Exception as e:
            # Best-effort filter: a failure on one file must not abort the run.
            self.logger.error(f"Error checking if file should be processed: {e}")
            return False

    @staticmethod
    def _dir_matches(rel_dir: str, dir_pattern: str) -> bool:
        """Return True if ``rel_dir`` is ``dir_pattern`` or lies beneath it."""
        return rel_dir == dir_pattern or rel_dir.startswith(dir_pattern + "/")

    def _is_excluded(self, rel_path: str) -> bool:
        """Return True if ``rel_path`` matches any configured exclude pattern.

        ``dir/**`` and ``dir/`` exclude everything under ``dir`` (literal
        directory prefix); any other pattern is matched with ``fnmatch``
        against the whole relative path.
        """
        rel_dir = os.path.dirname(rel_path)
        for raw_pattern in self.config.exclude_paths:
            pattern = raw_pattern.lstrip("/")
            self.logger.debug(f"Checking exclude pattern: {pattern}")
            if pattern.endswith("/**"):
                if self._dir_matches(rel_dir, pattern[:-3]):  # drop "/**"
                    self.logger.debug(
                        f"Skipping {rel_path}: matches exclude directory pattern {pattern}"
                    )
                    return True
            elif pattern.endswith("/"):
                if self._dir_matches(rel_dir, pattern[:-1]):  # drop trailing "/"
                    self.logger.debug(
                        f"Skipping {rel_path}: matches exclude directory pattern {pattern}"
                    )
                    return True
            elif fnmatch.fnmatch(rel_path, pattern):
                self.logger.debug(
                    f"Skipping {rel_path}: matches exclude pattern {pattern}"
                )
                return True
        return False

    def _matches_file_type(
        self, file_path: str, rel_path: str, file_basename: str
    ) -> bool:
        """Return True if the file's extension is configured or convertible.

        Extension comparison is case-insensitive. An empty ``file_types``
        configuration accepts every file. When nothing matches, the file is
        still accepted if conversion is enabled and the file detector reports
        it as convertible.
        """
        file_ext = os.path.splitext(file_basename)[1].lower()  # includes the dot
        self.logger.debug(f"Checking file extension: {file_ext}")

        # If no file types are configured, process all files (default behavior)
        if not self.config.file_types:
            self.logger.debug(
                "No file types configured, processing all readable files"
            )
            return True

        for pattern in self.config.file_types:
            self.logger.debug(f"Checking file type pattern: {pattern}")
            # ".txt" is already an extension (splitext would return "" for
            # it); for globs such as "*.md" take the extension part.
            if pattern.startswith("."):
                pattern_ext = pattern.lower()
            else:
                pattern_ext = os.path.splitext(pattern)[1].lower()

            if pattern_ext and file_ext == pattern_ext:
                self.logger.debug(
                    f"File {rel_path} matches file type pattern {pattern}"
                )
                return True

        # Not a configured type: fall back to conversion support, if enabled.
        if (
            self.config.enable_file_conversion
            and self.file_detector
            and self.file_detector.is_supported_for_conversion(file_path)
        ):
            self.logger.debug(f"File {rel_path} supported for conversion")
            return True

        return False

    def _is_included(self, rel_path: str) -> bool:
        """Return True if ``rel_path`` satisfies the configured include patterns.

        With no include patterns everything is included. Otherwise:
        ``/`` (or an empty pattern) selects files directly in the repository
        root, ``/**/*`` selects every file, ``dir/**/*`` and ``dir/`` select
        everything under ``dir``, and any other pattern is matched with
        ``fnmatch`` against the whole relative path.
        """
        if not self.config.include_paths:
            # If no include paths specified, include everything
            self.logger.debug("No include paths specified, including all files")
            return True

        # Get the file's directory relative to repo root
        rel_dir = os.path.dirname(rel_path)
        self.logger.debug(f"Checking include patterns for directory: {rel_dir}")

        for raw_pattern in self.config.include_paths:
            pattern = raw_pattern.lstrip("/")
            self.logger.debug(f"Checking include pattern: {pattern}")

            if pattern == "":
                # Root pattern ("/") means include only files in root directory
                if rel_dir == "":
                    self.logger.debug(f"Including {rel_path}: matches root pattern")
                    return True
                continue

            if pattern == "**/*":
                # "/**/*" loses its leading slash above, so it has to be
                # recognised here; it means "include everything", root-level
                # files included (fnmatch alone would demand a "/" in the path).
                self.logger.debug(
                    f"Including {rel_path}: matches root /**/* pattern"
                )
                return True

            if pattern.endswith("/**/*"):
                if self._dir_matches(rel_dir, pattern[:-5]):  # drop "/**/*"
                    self.logger.debug(
                        f"Including {rel_path}: matches directory pattern {pattern}"
                    )
                    return True
            elif pattern.endswith("/"):
                if self._dir_matches(rel_dir, pattern[:-1]):  # drop trailing "/"
                    self.logger.debug(
                        f"Including {rel_path}: matches directory pattern {pattern}"
                    )
                    return True
            elif fnmatch.fnmatch(rel_path, pattern):
                self.logger.debug(
                    f"Including {rel_path}: matches exact pattern {pattern}"
                )
                return True

        # If we have include patterns but none matched, exclude the file
        self.logger.debug(f"Skipping {rel_path}: not in include paths")
        return False