Coverage for src/qdrant_loader/core/chunking/strategy/default/text_document_parser.py: 97%
117 statements
coverage.py v7.10.3, created at 2025-08-13 09:19 +0000
1"""Document parser for plain text documents."""
3import re
4from typing import Any
6from qdrant_loader.core.chunking.strategy.base.document_parser import BaseDocumentParser
9class TextDocumentParser(BaseDocumentParser):
10 """Parser for plain text documents.
12 This parser analyzes text structure including paragraphs, sentences,
13 and basic content characteristics to support intelligent chunking.
14 """

    def parse_document_structure(self, content: str) -> dict[str, Any]:
        """Analyze text structure (paragraphs, sentences, formatting).

        Args:
            content: The text content to analyze

        Returns:
            Dictionary containing structural analysis of the text
        """
        paragraphs = self._split_paragraphs(content)
        sentences = self._split_sentences(content)

        # Analyze content characteristics
        analysis = self.analyze_content_characteristics(content)

        # Add text-specific structure information
        structure = {
            "structure_type": "plain_text",
            "paragraph_count": len(paragraphs),
            "sentence_count": len(sentences),
            "avg_paragraph_length": (
                sum(len(p) for p in paragraphs) / len(paragraphs) if paragraphs else 0
            ),
            "avg_sentence_length": (
                sum(len(s) for s in sentences) / len(sentences) if sentences else 0
            ),
            "has_list_items": self._has_list_items(content),
            "has_numbered_sections": self._has_numbered_sections(content),
            "formatting_indicators": self._analyze_formatting(content),
            "content_density": self._calculate_content_density(content),
        }

        # Merge with base analysis
        structure.update(analysis)

        return structure

    def extract_section_metadata(self, section: Any) -> dict[str, Any]:
        """Extract metadata from a text section.

        Args:
            section: The text section (string content)

        Returns:
            Dictionary containing section metadata
        """
        if not isinstance(section, str):
            section = str(section)

        metadata = {
            "section_type": "text_paragraph",
            "length": len(section),
            "word_count": len(section.split()),
            "sentence_count": len(self._split_sentences(section)),
            "has_formatting": self._has_formatting_markers(section),
            "is_list_item": self._is_list_item(section),
            "is_numbered_item": self._is_numbered_item(section),
            "content_type": self._classify_content_type(section),
        }

        return metadata

    def _split_paragraphs(self, content: str) -> list[str]:
        """Split content into paragraphs.

        Args:
            content: The content to split

        Returns:
            List of paragraph strings
        """
        # Split on double newlines, but also handle single newlines with significant whitespace
        paragraphs = []

        # First split on double newlines
        double_newline_splits = content.split("\n\n")

        for split in double_newline_splits:
            # Further split on single newlines if they separate distinct content
            lines = split.split("\n")
            current_paragraph = []

            for line in lines:
                line = line.strip()
                if not line:
                    # Empty line - finish current paragraph if it has content
                    if current_paragraph:
                        paragraphs.append("\n".join(current_paragraph))
                        current_paragraph = []
                elif self._is_new_paragraph_start(line, current_paragraph):
                    # This line starts a new paragraph
                    if current_paragraph:
                        paragraphs.append("\n".join(current_paragraph))
                    current_paragraph = [line]
                else:
                    # This line continues the current paragraph
                    current_paragraph.append(line)

            # Add any remaining paragraph
            if current_paragraph:
                paragraphs.append("\n".join(current_paragraph))

        return [p.strip() for p in paragraphs if p.strip()]

    def _split_sentences(self, content: str) -> list[str]:
        """Split content into sentences.

        Args:
            content: The content to split

        Returns:
            List of sentence strings
        """
        # Use a more sophisticated sentence splitting pattern
        sentence_pattern = r"(?<=[.!?])\s+(?=[A-Z])"
        sentences = re.split(sentence_pattern, content)

        # Clean up and filter sentences
        cleaned_sentences = []
        for sentence in sentences:
            sentence = sentence.strip()
            if sentence and len(sentence) > 3:  # Filter out very short fragments
                cleaned_sentences.append(sentence)

        return cleaned_sentences

    def _has_list_items(self, content: str) -> bool:
        """Check if content contains list items.

        Args:
            content: The content to check

        Returns:
            True if content contains list items
        """
        list_patterns = [
            r"^\s*[-*+]\s+",  # Bullet points
            r"^\s*\d+\.\s+",  # Numbered lists
            r"^\s*[a-zA-Z]\.\s+",  # Lettered lists
            r"^\s*[ivxlcdm]+\.\s+",  # Roman numerals
        ]

        for pattern in list_patterns:
            if re.search(pattern, content, re.MULTILINE):
                return True

        return False

    def _has_numbered_sections(self, content: str) -> bool:
        """Check if content has numbered sections.

        Args:
            content: The content to check

        Returns:
            True if content has numbered sections
        """
        # Look for patterns like "1. Introduction", "Section 1", "Chapter 1", etc.
        section_patterns = [
            r"^\s*\d+\.\s+[A-Z]",  # "1. Section Title"
            r"^\s*Section\s+\d+",  # "Section 1"
            r"^\s*Chapter\s+\d+",  # "Chapter 1"
            r"^\s*Part\s+\d+",  # "Part 1"
        ]

        for pattern in section_patterns:
            if re.search(pattern, content, re.MULTILINE | re.IGNORECASE):
                return True

        return False

    def _analyze_formatting(self, content: str) -> dict[str, bool]:
        """Analyze formatting indicators in the text.

        Args:
            content: The content to analyze

        Returns:
            Dictionary of formatting indicators
        """
        return {
            "has_bold_text": bool(re.search(r"\*\*.*?\*\*|__.*?__", content)),
            "has_italic_text": bool(re.search(r"\*.*?\*|_.*?_", content)),
            "has_quotes": bool(re.search(r'["""].*?["""]', content)),
            "has_parenthetical": bool(re.search(r"\(.*?\)", content)),
            "has_brackets": bool(re.search(r"\[.*?\]", content)),
            "has_caps_words": bool(re.search(r"\b[A-Z]{2,}\b", content)),
            "has_email_addresses": bool(
                re.search(
                    r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", content
                )
            ),
            "has_urls": bool(re.search(r"https?://[^\s]+", content)),
        }

    def _calculate_content_density(self, content: str) -> float:
        """Calculate content density (ratio of non-whitespace characters to total characters).

        Args:
            content: The content to analyze

        Returns:
            Content density ratio (0.0 to 1.0)
        """
        if not content:
            return 0.0

        non_whitespace_chars = len(re.sub(r"\s", "", content))
        total_chars = len(content)

        return non_whitespace_chars / total_chars if total_chars > 0 else 0.0

    def _is_new_paragraph_start(self, line: str, current_paragraph: list[str]) -> bool:
        """Determine if a line starts a new paragraph.

        Args:
            line: The line to check
            current_paragraph: Current paragraph content

        Returns:
            True if line should start a new paragraph
        """
        if not current_paragraph:
            return True

        # Check for list items
        if self._is_list_item(line) or self._is_numbered_item(line):
            return True

        # Check for section headers
        if self._looks_like_header(line):
            return True

        # Check for significant indentation change
        prev_line = current_paragraph[-1] if current_paragraph else ""
        if self._has_significant_indentation_change(prev_line, line):
            return True

        return False

    def _has_formatting_markers(self, text: str) -> bool:
        """Check if text has formatting markers.

        Args:
            text: The text to check

        Returns:
            True if text has formatting markers
        """
        formatting_patterns = [
            r"\*\*.*?\*\*",  # Bold
            r"__.*?__",  # Bold alternative
            r"\*.*?\*",  # Italic
            r"_.*?_",  # Italic alternative
            r"`.*?`",  # Code
        ]

        for pattern in formatting_patterns:
            if re.search(pattern, text):
                return True

        return False

    def _is_list_item(self, text: str) -> bool:
        """Check if text is a list item.

        Args:
            text: The text to check

        Returns:
            True if text is a list item
        """
        return bool(re.match(r"^\s*[-*+]\s+", text))

    def _is_numbered_item(self, text: str) -> bool:
        """Check if text is a numbered item.

        Args:
            text: The text to check

        Returns:
            True if text is a numbered item
        """
        return bool(re.match(r"^\s*\d+\.\s+", text))

    def _classify_content_type(self, text: str) -> str:
        """Classify the type of content.

        Args:
            text: The text to classify

        Returns:
            Content type classification
        """
        if self._is_list_item(text):
            return "list_item"
        elif self._is_numbered_item(text):
            return "numbered_item"
        elif self._looks_like_header(text):
            return "header"
        elif len(text.split()) < 5:
            return "fragment"
        elif "." not in text:
            return "title_or_label"
        else:
            return "paragraph"

    def _looks_like_header(self, text: str) -> bool:
        """Check if text looks like a header or title.

        Args:
            text: The text to check

        Returns:
            True if text looks like a header
        """
        # Headers typically:
        # - Are short
        # - Don't end with punctuation
        # - May be in title case
        # - May have numbers

        if len(text) > 100:  # Too long for a header
            return False

        if text.endswith((".", "!", "?")):  # Headers usually don't end with punctuation
            return False

        # Check for title case or all caps
        words = text.split()
        if len(words) > 1:
            title_case_words = sum(1 for word in words if word[0].isupper())
            if title_case_words / len(words) > 0.5:
                return True

        # Check for section numbering
        if re.match(r"^\d+\.?\s+", text):
            return True

        return False

    def _has_significant_indentation_change(
        self, prev_line: str, current_line: str
    ) -> bool:
        """Check for significant indentation change between lines.

        Args:
            prev_line: Previous line
            current_line: Current line

        Returns:
            True if there's a significant indentation change
        """
        if not prev_line:
            return False

        prev_indent = len(prev_line) - len(prev_line.lstrip())
        current_indent = len(current_line) - len(current_line.lstrip())

        # Significant change is more than 4 spaces
        return abs(current_indent - prev_indent) > 4