Coverage for src/qdrant_loader/connectors/localfile/file_processor.py: 52%
91 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-06-04 05:50 +0000
1"""File processing and filtering logic for LocalFile connector."""
3import fnmatch
4import os
5from typing import TYPE_CHECKING, Optional
7from qdrant_loader.utils.logging import LoggingConfig
9if TYPE_CHECKING:
10 from qdrant_loader.core.file_conversion import FileDetector
12from .config import LocalFileConfig
class LocalFileFileProcessor:
    """Handles file processing and filtering logic for local files.

    The processor decides, per file, whether the LocalFile connector should
    ingest it, based on the file-type, size, include and exclude settings
    carried by the configuration object.
    """

    def __init__(
        self,
        config: "LocalFileConfig",
        base_path: str,
        file_detector: Optional["FileDetector"] = None,
    ):
        """Initialize the file processor.

        Args:
            config: Local file configuration
            base_path: Base directory path
            file_detector: Optional file detector for conversion support
        """
        self.config = config
        self.base_path = base_path
        self.file_detector = file_detector
        self.logger = LoggingConfig.get_logger(__name__)

    def should_process_file(self, file_path: str) -> bool:
        """Return whether ``file_path`` passes every configured filter.

        Checks run in this order, and the first failing check rejects the file:

        1. the path is an existing, readable regular file;
        2. the file name does not start with a dot;
        3. no entry of ``config.exclude_paths`` matches;
        4. the extension matches ``config.file_types`` or, when conversion is
           enabled and a detector is available, the detector accepts the file;
        5. the size does not exceed ``config.max_file_size``;
        6. ``config.include_paths`` is empty or one of its entries matches.

        Patterns are matched against the path relative to ``base_path``,
        always expressed with forward slashes.

        Args:
            file_path: Path of the candidate file.

        Returns:
            True if the file should be processed. False if it is filtered
            out, or if any error occurs while evaluating it (the error is
            logged rather than raised).
        """
        try:
            self.logger.debug(
                "Checking if file should be processed", file_path=file_path
            )
            self.logger.debug(
                "Current configuration",
                file_types=self.config.file_types,
                include_paths=self.config.include_paths,
                exclude_paths=self.config.exclude_paths,
                max_file_size=self.config.max_file_size,
            )

            if not os.path.isfile(file_path):
                self.logger.debug(f"Skipping {file_path}: file does not exist")
                return False
            if not os.access(file_path, os.R_OK):
                self.logger.debug(f"Skipping {file_path}: file is not readable")
                return False

            # Patterns are written with "/" separators, so normalise the
            # OS-specific relative path before comparing (no-op on POSIX).
            rel_path = os.path.relpath(file_path, self.base_path).replace(
                os.sep, "/"
            )
            file_basename = os.path.basename(rel_path)
            if file_basename.startswith("."):
                self.logger.debug(
                    f"Skipping {rel_path}: invalid filename (starts with dot)"
                )
                return False

            if self._is_excluded(rel_path):
                return False

            if not self._matches_file_type(file_path, rel_path, file_basename):
                self.logger.debug(
                    f"Skipping {rel_path}: does not match any file type patterns and not supported for conversion"
                )
                return False

            file_size = os.path.getsize(file_path)
            if file_size > self.config.max_file_size:
                self.logger.debug(f"Skipping {rel_path}: exceeds max file size")
                return False

            return self._is_included(rel_path)
        except Exception as e:
            # Deliberate best-effort: a file that cannot be evaluated is
            # skipped instead of aborting the caller.
            self.logger.error(f"Error checking if file should be processed: {e}")
            return False

    @staticmethod
    def _is_under_dir(rel_dir: str, dir_pattern: str) -> bool:
        """Return True if ``rel_dir`` is ``dir_pattern`` or nested below it."""
        return rel_dir == dir_pattern or rel_dir.startswith(dir_pattern + "/")

    def _is_excluded(self, rel_path: str) -> bool:
        """Return True if ``rel_path`` matches any ``config.exclude_paths`` entry."""
        rel_dir = os.path.dirname(rel_path)
        for raw_pattern in self.config.exclude_paths:
            pattern = raw_pattern.lstrip("/")
            if pattern.endswith("/**"):
                # "dir/**" excludes the directory and everything below it.
                if self._is_under_dir(rel_dir, pattern[:-3]):
                    self.logger.debug(
                        f"Skipping {rel_path}: matches exclude directory pattern {pattern}"
                    )
                    return True
            elif pattern.endswith("/"):
                # "dir/" is treated the same way as "dir/**".
                if self._is_under_dir(rel_dir, pattern[:-1]):
                    self.logger.debug(
                        f"Skipping {rel_path}: matches exclude directory pattern {pattern}"
                    )
                    return True
            elif fnmatch.fnmatch(rel_path, pattern):
                self.logger.debug(
                    f"Skipping {rel_path}: matches exclude pattern {pattern}"
                )
                return True
        return False

    def _matches_file_type(
        self, file_path: str, rel_path: str, file_basename: str
    ) -> bool:
        """Return True if the file has a configured extension or is convertible."""
        file_ext = os.path.splitext(file_basename)[1].lower()
        self.logger.debug(f"Checking file extension: {file_ext}")

        # First check configured file types
        for pattern in self.config.file_types:
            self.logger.debug(f"Checking file type pattern: {pattern}")
            pattern_ext = os.path.splitext(pattern)[1].lower()
            # Only the extension part of the pattern is compared; a pattern
            # without an extension never matches.
            if pattern_ext and file_ext == pattern_ext:
                self.logger.debug(
                    f"File {rel_path} matches file type pattern {pattern}"
                )
                return True

        # If file conversion is enabled and file doesn't match configured
        # types, check if it can be converted
        if (
            self.config.enable_file_conversion
            and self.file_detector
            and self.file_detector.is_supported_for_conversion(file_path)
        ):
            self.logger.debug(f"File {rel_path} supported for conversion")
            return True

        return False

    def _is_included(self, rel_path: str) -> bool:
        """Return True if ``rel_path`` satisfies ``config.include_paths``.

        An empty include list accepts every file.
        """
        if not self.config.include_paths:
            return True

        rel_dir = os.path.dirname(rel_path)
        for raw_pattern in self.config.include_paths:
            pattern = raw_pattern.lstrip("/")
            if pattern == "":
                # "" or "/" selects files located directly in base_path.
                if rel_dir == "":
                    return True
                continue
            if pattern == "**/*":
                # "/**/*" means "everything under base_path". The leading
                # slash is already stripped here, and plain fnmatch would
                # reject files that sit directly in base_path.
                return True
            if pattern.endswith("/**/*"):
                if self._is_under_dir(rel_dir, pattern[:-5]):
                    return True
            elif pattern.endswith("/"):
                if self._is_under_dir(rel_dir, pattern[:-1]):
                    return True
            elif fnmatch.fnmatch(rel_path, pattern):
                return True
        return False