Coverage for src/qdrant_loader/connectors/git/file_processor.py: 82%

105 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-04 05:50 +0000

1"""File processing and filtering logic for Git connector.""" 

2 

3import fnmatch 

4import os 

5from typing import TYPE_CHECKING, Optional 

6 

7from qdrant_loader.utils.logging import LoggingConfig 

8 

9if TYPE_CHECKING: 

10 from qdrant_loader.connectors.git.config import GitRepoConfig 

11 from qdrant_loader.core.file_conversion import FileDetector 

12 

13logger = LoggingConfig.get_logger(__name__) 

14 

15 

class FileProcessor:
    """Decide which files of a checked-out Git repository should be processed.

    The decision combines, in order: existence/readability, a dot-file
    filter, the configured exclude patterns, the configured file types
    (optionally widened by a conversion-capable file detector), the maximum
    file size, and finally the configured include patterns.
    """

    def __init__(
        self,
        config: "GitRepoConfig",
        temp_dir: str,
        file_detector: Optional["FileDetector"] = None,
    ):
        """Initialize the file processor.

        Args:
            config: Git repository configuration. This class reads its
                ``file_types``, ``include_paths``, ``exclude_paths``,
                ``max_file_size`` and ``enable_file_conversion`` attributes.
            temp_dir: Directory that file paths are made relative to before
                they are matched against include/exclude patterns.
            file_detector: Optional file detector for conversion support.
        """
        self.config = config
        self.temp_dir = temp_dir
        self.file_detector = file_detector
        self.logger = LoggingConfig.get_logger(__name__)

    def should_process_file(self, file_path: str) -> bool:
        """Check if a file should be processed based on configuration.

        Args:
            file_path: Path to the file.

        Returns:
            True if the file should be processed, False otherwise. Any
            exception raised while checking is logged and reported as False.
        """
        try:
            self.logger.debug(
                "Checking if file should be processed", file_path=file_path
            )
            self.logger.debug(
                "Current configuration",
                file_types=self.config.file_types,
                include_paths=self.config.include_paths,
                exclude_paths=self.config.exclude_paths,
                max_file_size=self.config.max_file_size,
            )

            # The file must exist and be readable.
            if not os.path.isfile(file_path):
                self.logger.debug(f"Skipping {file_path}: file does not exist")
                return False
            if not os.access(file_path, os.R_OK):
                self.logger.debug(f"Skipping {file_path}: file is not readable")
                return False

            # Patterns are written with "/" separators, so normalise the
            # platform separator before any comparison (no-op on POSIX).
            rel_path = os.path.relpath(file_path, self.temp_dir).replace(
                os.sep, "/"
            )
            self.logger.debug(f"Relative path: {rel_path}")
            file_basename = os.path.basename(rel_path)
            rel_dir = os.path.dirname(rel_path)

            # Any basename starting with a dot is rejected; this covers bare
            # extensions such as ".md" as well as every other dot-file.
            if file_basename.startswith("."):
                self.logger.debug(
                    f"Skipping {rel_path}: invalid filename (starts with dot)"
                )
                return False

            # Exclude patterns take precedence over everything that follows.
            if self._is_excluded(rel_path, rel_dir):
                return False

            if not self._matches_file_type(file_path, rel_path, file_basename):
                self.logger.debug(
                    f"Skipping {rel_path}: does not match any file type patterns and not supported for conversion"
                )
                return False

            file_size = os.path.getsize(file_path)
            self.logger.debug(
                f"File size: {file_size} bytes (max: {self.config.max_file_size})"
            )
            if file_size > self.config.max_file_size:
                self.logger.debug(f"Skipping {rel_path}: exceeds max file size")
                return False

            if not self.config.include_paths:
                # No include paths configured means every remaining file is kept.
                self.logger.debug("No include paths specified, including all files")
                return True

            self.logger.debug(f"Checking include patterns for directory: {rel_dir}")
            if self._is_included(rel_path, rel_dir):
                return True

            # Include patterns exist but none matched.
            self.logger.debug(f"Skipping {rel_path}: not in include paths")
            return False

        except Exception as e:
            # Deliberate best-effort: a failure while checking skips the file.
            self.logger.error(f"Error checking if file should be processed: {e}")
            return False

    @staticmethod
    def _in_directory(rel_dir: str, dir_pattern: str) -> bool:
        """Return True if ``rel_dir`` is ``dir_pattern`` or nested below it."""
        return rel_dir == dir_pattern or rel_dir.startswith(dir_pattern + "/")

    def _is_excluded(self, rel_path: str, rel_dir: str) -> bool:
        """Return True if ``rel_path`` matches any configured exclude pattern."""
        for raw_pattern in self.config.exclude_paths:
            pattern = raw_pattern.lstrip("/")
            self.logger.debug(f"Checking exclude pattern: {pattern}")

            # "dir/**" and "dir/" both mean "everything under dir".
            if pattern.endswith("/**"):
                dir_pattern = pattern[:-3]
            elif pattern.endswith("/"):
                dir_pattern = pattern[:-1]
            else:
                dir_pattern = None

            if dir_pattern is not None:
                if self._in_directory(rel_dir, dir_pattern):
                    self.logger.debug(
                        f"Skipping {rel_path}: matches exclude directory pattern {pattern}"
                    )
                    return True
            elif fnmatch.fnmatch(rel_path, pattern):
                self.logger.debug(
                    f"Skipping {rel_path}: matches exclude pattern {pattern}"
                )
                return True
        return False

    def _matches_file_type(
        self, file_path: str, rel_path: str, file_basename: str
    ) -> bool:
        """Return True if the extension is configured or the file is convertible."""
        file_ext = os.path.splitext(file_basename)[1].lower()  # includes the dot
        self.logger.debug(f"Checking file extension: {file_ext}")

        # Configured file types are compared by extension, case-insensitively.
        for pattern in self.config.file_types:
            self.logger.debug(f"Checking file type pattern: {pattern}")
            # e.g. "*.md" -> ".md"; patterns without an extension never match.
            pattern_ext = os.path.splitext(pattern)[1].lower()
            if pattern_ext and file_ext == pattern_ext:
                self.logger.debug(
                    f"File {rel_path} matches file type pattern {pattern}"
                )
                return True

        # Otherwise fall back to the detector when file conversion is enabled.
        if self.config.enable_file_conversion and self.file_detector:
            if self.file_detector.is_supported_for_conversion(file_path):
                self.logger.debug(f"File {rel_path} supported for conversion")
                return True
        return False

    def _is_included(self, rel_path: str, rel_dir: str) -> bool:
        """Return True if ``rel_path`` matches any configured include pattern."""
        for raw_pattern in self.config.include_paths:
            # Leading slashes are dropped, so "/" becomes "" and "/**/*"
            # becomes "**/*"; both are handled explicitly below.
            pattern = raw_pattern.lstrip("/")
            self.logger.debug(f"Checking include pattern: {pattern}")

            if pattern == "":
                # Root pattern: only files directly in the root directory.
                if rel_dir == "":
                    self.logger.debug(f"Including {rel_path}: matches root pattern")
                    return True
                continue

            if pattern == "**/*":
                # Root "/**/*" means include everything. fnmatch alone would
                # require a "/" in the path and so miss root-level files.
                self.logger.debug(
                    f"Including {rel_path}: matches root /**/* pattern"
                )
                return True

            # "dir/**/*" and "dir/" both mean "everything under dir".
            if pattern.endswith("/**/*"):
                dir_pattern = pattern[:-5]
            elif pattern.endswith("/"):
                dir_pattern = pattern[:-1]
            else:
                dir_pattern = None

            if dir_pattern is not None:
                if self._in_directory(rel_dir, dir_pattern):
                    self.logger.debug(
                        f"Including {rel_path}: matches directory pattern {pattern}"
                    )
                    return True
            elif fnmatch.fnmatch(rel_path, pattern):
                self.logger.debug(
                    f"Including {rel_path}: matches exact pattern {pattern}"
                )
                return True
        return False