Coverage for src/qdrant_loader/core/chunking/strategy/default/text_document_parser.py: 97%

117 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-13 09:19 +0000

1"""Document parser for plain text documents.""" 

2 

3import re 

4from typing import Any 

5 

6from qdrant_loader.core.chunking.strategy.base.document_parser import BaseDocumentParser 

7 

8 

9class TextDocumentParser(BaseDocumentParser): 

10 """Parser for plain text documents. 

11 

12 This parser analyzes text structure including paragraphs, sentences, 

13 and basic content characteristics to support intelligent chunking. 

14 """ 

15 

def parse_document_structure(self, content: str) -> dict[str, Any]:
    """Build a structural summary of a plain-text document.

    Args:
        content: Raw text of the document.

    Returns:
        Mapping with paragraph/sentence counts and average lengths, list and
        numbered-section flags, formatting indicators and content density.
        The result of ``analyze_content_characteristics`` is merged in last,
        so any key it shares with the text-specific entries wins.
    """
    paragraph_list = self._split_paragraphs(content)
    sentence_list = self._split_sentences(content)
    base_analysis = self.analyze_content_characteristics(content)

    # Averages fall back to the integer 0 when there is nothing to average.
    if paragraph_list:
        avg_paragraph_length = sum(map(len, paragraph_list)) / len(paragraph_list)
    else:
        avg_paragraph_length = 0

    if sentence_list:
        avg_sentence_length = sum(map(len, sentence_list)) / len(sentence_list)
    else:
        avg_sentence_length = 0

    summary: dict[str, Any] = {
        "structure_type": "plain_text",
        "paragraph_count": len(paragraph_list),
        "sentence_count": len(sentence_list),
        "avg_paragraph_length": avg_paragraph_length,
        "avg_sentence_length": avg_sentence_length,
        "has_list_items": self._has_list_items(content),
        "has_numbered_sections": self._has_numbered_sections(content),
        "formatting_indicators": self._analyze_formatting(content),
        "content_density": self._calculate_content_density(content),
    }

    # Base-class analysis is layered on top of the text-specific entries.
    summary.update(base_analysis)
    return summary

52 

53 def extract_section_metadata(self, section: Any) -> dict[str, Any]: 

54 """Extract metadata from a text section. 

55 

56 Args: 

57 section: The text section (string content) 

58 

59 Returns: 

60 Dictionary containing section metadata 

61 """ 

62 if not isinstance(section, str): 

63 section = str(section) 

64 

65 metadata = { 

66 "section_type": "text_paragraph", 

67 "length": len(section), 

68 "word_count": len(section.split()), 

69 "sentence_count": len(self._split_sentences(section)), 

70 "has_formatting": self._has_formatting_markers(section), 

71 "is_list_item": self._is_list_item(section), 

72 "is_numbered_item": self._is_numbered_item(section), 

73 "content_type": self._classify_content_type(section), 

74 } 

75 

76 return metadata 

77 

78 def _split_paragraphs(self, content: str) -> list[str]: 

79 """Split content into paragraphs. 

80 

81 Args: 

82 content: The content to split 

83 

84 Returns: 

85 List of paragraph strings 

86 """ 

87 # Split on double newlines, but also handle single newlines with significant whitespace 

88 paragraphs = [] 

89 

90 # First split on double newlines 

91 double_newline_splits = content.split("\n\n") 

92 

93 for split in double_newline_splits: 

94 # Further split on single newlines if they separate distinct content 

95 lines = split.split("\n") 

96 current_paragraph = [] 

97 

98 for line in lines: 

99 line = line.strip() 

100 if not line: 

101 # Empty line - finish current paragraph if it has content 

102 if current_paragraph: 

103 paragraphs.append("\n".join(current_paragraph)) 

104 current_paragraph = [] 

105 elif self._is_new_paragraph_start(line, current_paragraph): 

106 # This line starts a new paragraph 

107 if current_paragraph: 

108 paragraphs.append("\n".join(current_paragraph)) 

109 current_paragraph = [line] 

110 else: 

111 # This line continues the current paragraph 

112 current_paragraph.append(line) 

113 

114 # Add any remaining paragraph 

115 if current_paragraph: 

116 paragraphs.append("\n".join(current_paragraph)) 

117 

118 return [p.strip() for p in paragraphs if p.strip()] 

119 

120 def _split_sentences(self, content: str) -> list[str]: 

121 """Split content into sentences. 

122 

123 Args: 

124 content: The content to split 

125 

126 Returns: 

127 List of sentence strings 

128 """ 

129 # Use a more sophisticated sentence splitting pattern 

130 sentence_pattern = r"(?<=[.!?])\s+(?=[A-Z])" 

131 sentences = re.split(sentence_pattern, content) 

132 

133 # Clean up and filter sentences 

134 cleaned_sentences = [] 

135 for sentence in sentences: 

136 sentence = sentence.strip() 

137 if sentence and len(sentence) > 3: # Filter out very short fragments 

138 cleaned_sentences.append(sentence) 

139 

140 return cleaned_sentences 

141 

142 def _has_list_items(self, content: str) -> bool: 

143 """Check if content contains list items. 

144 

145 Args: 

146 content: The content to check 

147 

148 Returns: 

149 True if content contains list items 

150 """ 

151 list_patterns = [ 

152 r"^\s*[-*+]\s+", # Bullet points 

153 r"^\s*\d+\.\s+", # Numbered lists 

154 r"^\s*[a-zA-Z]\.\s+", # Lettered lists 

155 r"^\s*[ivxlcdm]+\.\s+", # Roman numerals 

156 ] 

157 

158 for pattern in list_patterns: 

159 if re.search(pattern, content, re.MULTILINE): 

160 return True 

161 

162 return False 

163 

164 def _has_numbered_sections(self, content: str) -> bool: 

165 """Check if content has numbered sections. 

166 

167 Args: 

168 content: The content to check 

169 

170 Returns: 

171 True if content has numbered sections 

172 """ 

173 # Look for patterns like "1. Introduction", "Section 1", "Chapter 1", etc. 

174 section_patterns = [ 

175 r"^\s*\d+\.\s+[A-Z]", # "1. Section Title" 

176 r"^\s*Section\s+\d+", # "Section 1" 

177 r"^\s*Chapter\s+\d+", # "Chapter 1" 

178 r"^\s*Part\s+\d+", # "Part 1" 

179 ] 

180 

181 for pattern in section_patterns: 

182 if re.search(pattern, content, re.MULTILINE | re.IGNORECASE): 

183 return True 

184 

185 return False 

186 

187 def _analyze_formatting(self, content: str) -> dict[str, bool]: 

188 """Analyze formatting indicators in the text. 

189 

190 Args: 

191 content: The content to analyze 

192 

193 Returns: 

194 Dictionary of formatting indicators 

195 """ 

196 return { 

197 "has_bold_text": bool(re.search(r"\*\*.*?\*\*|__.*?__", content)), 

198 "has_italic_text": bool(re.search(r"\*.*?\*|_.*?_", content)), 

199 "has_quotes": bool(re.search(r'["""].*?["""]', content)), 

200 "has_parenthetical": bool(re.search(r"\(.*?\)", content)), 

201 "has_brackets": bool(re.search(r"\[.*?\]", content)), 

202 "has_caps_words": bool(re.search(r"\b[A-Z]{2,}\b", content)), 

203 "has_email_addresses": bool( 

204 re.search( 

205 r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", content 

206 ) 

207 ), 

208 "has_urls": bool(re.search(r"https?://[^\s]+", content)), 

209 } 

210 

211 def _calculate_content_density(self, content: str) -> float: 

212 """Calculate content density (ratio of content to whitespace). 

213 

214 Args: 

215 content: The content to analyze 

216 

217 Returns: 

218 Content density ratio (0.0 to 1.0) 

219 """ 

220 if not content: 

221 return 0.0 

222 

223 non_whitespace_chars = len(re.sub(r"\s", "", content)) 

224 total_chars = len(content) 

225 

226 return non_whitespace_chars / total_chars if total_chars > 0 else 0.0 

227 

228 def _is_new_paragraph_start(self, line: str, current_paragraph: list[str]) -> bool: 

229 """Determine if a line starts a new paragraph. 

230 

231 Args: 

232 line: The line to check 

233 current_paragraph: Current paragraph content 

234 

235 Returns: 

236 True if line should start a new paragraph 

237 """ 

238 if not current_paragraph: 

239 return True 

240 

241 # Check for list items 

242 if self._is_list_item(line) or self._is_numbered_item(line): 

243 return True 

244 

245 # Check for section headers 

246 if self._looks_like_header(line): 

247 return True 

248 

249 # Check for significant indentation change 

250 prev_line = current_paragraph[-1] if current_paragraph else "" 

251 if self._has_significant_indentation_change(prev_line, line): 

252 return True 

253 

254 return False 

255 

256 def _has_formatting_markers(self, text: str) -> bool: 

257 """Check if text has formatting markers. 

258 

259 Args: 

260 text: The text to check 

261 

262 Returns: 

263 True if text has formatting markers 

264 """ 

265 formatting_patterns = [ 

266 r"\*\*.*?\*\*", # Bold 

267 r"__.*?__", # Bold alternative 

268 r"\*.*?\*", # Italic 

269 r"_.*?_", # Italic alternative 

270 r"`.*?`", # Code 

271 ] 

272 

273 for pattern in formatting_patterns: 

274 if re.search(pattern, text): 

275 return True 

276 

277 return False 

278 

279 def _is_list_item(self, text: str) -> bool: 

280 """Check if text is a list item. 

281 

282 Args: 

283 text: The text to check 

284 

285 Returns: 

286 True if text is a list item 

287 """ 

288 return bool(re.match(r"^\s*[-*+]\s+", text)) 

289 

290 def _is_numbered_item(self, text: str) -> bool: 

291 """Check if text is a numbered item. 

292 

293 Args: 

294 text: The text to check 

295 

296 Returns: 

297 True if text is a numbered item 

298 """ 

299 return bool(re.match(r"^\s*\d+\.\s+", text)) 

300 

301 def _classify_content_type(self, text: str) -> str: 

302 """Classify the type of content. 

303 

304 Args: 

305 text: The text to classify 

306 

307 Returns: 

308 Content type classification 

309 """ 

310 if self._is_list_item(text): 

311 return "list_item" 

312 elif self._is_numbered_item(text): 

313 return "numbered_item" 

314 elif self._looks_like_header(text): 

315 return "header" 

316 elif len(text.split()) < 5: 

317 return "fragment" 

318 elif "." not in text: 

319 return "title_or_label" 

320 else: 

321 return "paragraph" 

322 

323 def _looks_like_header(self, text: str) -> bool: 

324 """Check if text looks like a header or title. 

325 

326 Args: 

327 text: The text to check 

328 

329 Returns: 

330 True if text looks like a header 

331 """ 

332 # Headers typically: 

333 # - Are short 

334 # - Don't end with punctuation 

335 # - May be in title case 

336 # - May have numbers 

337 

338 if len(text) > 100: # Too long for a header 

339 return False 

340 

341 if text.endswith((".", "!", "?")): # Headers usually don't end with punctuation 

342 return False 

343 

344 # Check for title case or all caps 

345 words = text.split() 

346 if len(words) > 1: 

347 title_case_words = sum(1 for word in words if word[0].isupper()) 

348 if title_case_words / len(words) > 0.5: 

349 return True 

350 

351 # Check for section numbering 

352 if re.match(r"^\d+\.?\s+", text): 

353 return True 

354 

355 return False 

356 

357 def _has_significant_indentation_change( 

358 self, prev_line: str, current_line: str 

359 ) -> bool: 

360 """Check for significant indentation change between lines. 

361 

362 Args: 

363 prev_line: Previous line 

364 current_line: Current line 

365 

366 Returns: 

367 True if there's a significant indentation change 

368 """ 

369 if not prev_line: 

370 return False 

371 

372 prev_indent = len(prev_line) - len(prev_line.lstrip()) 

373 current_indent = len(current_line) - len(current_line.lstrip()) 

374 

375 # Significant change is more than 4 spaces 

376 return abs(current_indent - prev_indent) > 4