Coverage for src/qdrant_loader/core/chunking/strategy/default/text_chunk_processor.py: 99%

85 statements  

coverage.py v7.13.5, created at 2026-04-10 09:40 +0000

1"""Text-specific chunk processor for document creation and management.""" 

2 

3from typing import Any 

4 

5from qdrant_loader.config import Settings 

6from qdrant_loader.core.chunking.strategy.base.chunk_processor import BaseChunkProcessor 

7from qdrant_loader.core.document import Document 

8 

9 

class TextChunkProcessor(BaseChunkProcessor):
    """Chunk processor for text documents with enhanced text-specific processing."""

    def __init__(self, settings: Settings):
        super().__init__(settings)
        # Get strategy-specific configuration
        self.default_config = settings.global_config.chunking.strategies.default
        self._semantic_analysis_enabled = (
            settings.global_config.chunking.enable_semantic_analysis
        )


    def create_chunk_document(
        self,
        original_doc: Document,
        chunk_content: str,
        chunk_index: int,
        total_chunks: int,
        chunk_metadata: dict[str, Any],
        skip_nlp: bool = False,
    ) -> Document:
        """Create a document for a text chunk with enhanced metadata."""

        # Generate unique chunk ID
        chunk_id = self.generate_chunk_id(original_doc, chunk_index)

        # Create base metadata
        base_metadata = self.create_base_chunk_metadata(
            original_doc, chunk_index, total_chunks, chunk_metadata
        )

        # Add text-specific metadata
        text_metadata = self._create_text_specific_metadata(chunk_content, original_doc)
        base_metadata.update(text_metadata)

        # Always set NLP-state metadata so consumers can rely on these keys
        if not self._semantic_analysis_enabled:
            base_metadata.update(
                {
                    "entities": [],
                    "pos_tags": [],
                    "nlp_skipped": True,
                    "skip_reason": "semantic_analysis_disabled",
                }
            )
        else:
            # Initialize with defaults when semantic analysis is enabled
            # These will be updated if actual NLP processing occurs
            base_metadata.update(
                {
                    "entities": [],
                    "pos_tags": [],
                    "nlp_skipped": False,
                    "skip_reason": None,
                }
            )

        # Create chunk document
        chunk_doc = Document(
            id=chunk_id,
            content=chunk_content,
            metadata=base_metadata,
            source=original_doc.source,
            source_type=original_doc.source_type,
            url=original_doc.url,
            content_type=original_doc.content_type,
            title=f"{original_doc.title} - Chunk {chunk_index + 1}",
        )

        return chunk_doc


    def _create_text_specific_metadata(
        self, content: str, original_doc: Document
    ) -> dict[str, Any]:
        """Create text-specific metadata for the chunk."""
        metadata = {
            "chunk_strategy": "text",
            "processing_method": "intelligent_splitting",
            "content_analysis": self._analyze_chunk_content(content),
            "quality_metrics": self._calculate_quality_metrics(content),
            "text_characteristics": self._extract_text_characteristics(content),
        }

        # Add semantic analysis indicators if enabled
        if self._semantic_analysis_enabled:
            metadata["semantic_analysis_enabled"] = True
            metadata["semantic_indicators"] = self._extract_semantic_indicators(content)

        # Add entity extraction indicators if enabled
        if self.default_config.enable_entity_extraction:
            metadata["entity_extraction_enabled"] = True
            metadata["entity_hints"] = self._extract_entity_hints(content)

        return metadata

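    # The structural counts below rely on simple delimiters: "." for sentences
    # and blank lines for paragraphs. Abbreviations and decimal numbers also
    # trigger sentence splits, so these values are lightweight signals rather
    # than exact counts.
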

    def _analyze_chunk_content(self, content: str) -> dict[str, Any]:
        """Analyze the content structure and characteristics of the chunk."""
        words = content.split()
        sentences = content.split(".")
        paragraphs = content.split("\n\n")

        return {
            "word_count": len(words),
            "sentence_count": len([s for s in sentences if s.strip()]),
            "paragraph_count": len([p for p in paragraphs if p.strip()]),
            "avg_words_per_sentence": len(words)
            / max(1, len([s for s in sentences if s.strip()])),
            "character_count": len(content),
            "content_density": (
                len(content.replace(" ", "")) / len(content) if content else 0
            ),
        }


    def _calculate_quality_metrics(self, content: str) -> dict[str, Any]:
        """Calculate quality metrics for the chunk."""
        words = content.split()

        # Content completeness (does it end with proper punctuation?)
        ends_properly = content.strip().endswith((".", "!", "?", ":", ";"))

        # Content coherence (rough estimate based on word repetition)
        unique_words = len({word.lower() for word in words})
        word_diversity = unique_words / len(words) if words else 0

        # Content readability (simple metric based on sentence structure)
        sentences = [s.strip() for s in content.split(".") if s.strip()]
        avg_sentence_length = (
            sum(len(s.split()) for s in sentences) / len(sentences) if sentences else 0
        )

        return {
            "ends_properly": ends_properly,
            "word_diversity": round(word_diversity, 3),
            "avg_sentence_length": round(avg_sentence_length, 1),
            "readability_score": self._estimate_readability(
                avg_sentence_length, word_diversity
            ),
            "chunk_completeness": self._assess_chunk_completeness(content),
        }


    def _extract_text_characteristics(self, content: str) -> dict[str, Any]:
        """Extract various text characteristics from the chunk."""
        import re

        return {
            "has_numbers": bool(re.search(r"\d+", content)),
            "has_dates": bool(re.search(r"\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b", content)),
            "has_urls": bool(re.search(r"https?://\S+", content)),
            "has_email": bool(
                re.search(
                    r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", content
                )
            ),
            "has_phone": bool(re.search(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", content)),
            "has_currency": bool(re.search(r"\$\d+(?:\.\d{2})?", content)),
            "has_percentages": bool(re.search(r"\b\d+(?:\.\d+)?%\b", content)),
            "has_quotes": '"' in content or "'" in content,
            "has_parentheses": "(" in content and ")" in content,
            "has_formatting": bool(re.search(r"[*_`#]", content)),
            "language_indicators": self._detect_language_indicators(content),
        }


    def _extract_semantic_indicators(self, content: str) -> dict[str, Any]:
        """Extract indicators for semantic analysis."""
        import re

        # Topic indicators (simple keyword-based)
        business_keywords = [
            "company",
            "business",
            "market",
            "revenue",
            "profit",
            "customer",
        ]
        tech_keywords = [
            "software",
            "technology",
            "system",
            "data",
            "algorithm",
            "code",
        ]
        academic_keywords = [
            "research",
            "study",
            "analysis",
            "theory",
            "methodology",
            "conclusion",
        ]

        content_lower = content.lower()

        return {
            "topic_indicators": {
                "business": sum(1 for kw in business_keywords if kw in content_lower),
                "technology": sum(1 for kw in tech_keywords if kw in content_lower),
                "academic": sum(1 for kw in academic_keywords if kw in content_lower),
            },
            "discourse_markers": {
                "enumeration": bool(
                    re.search(r"\b(first|second|third|finally|lastly)\b", content_lower)
                ),
                "causation": bool(
                    re.search(
                        r"\b(because|therefore|thus|consequently|as a result)\b",
                        content_lower,
                    )
                ),
                "contrast": bool(
                    re.search(
                        r"\b(however|although|despite|nevertheless|on the other hand)\b",
                        content_lower,
                    )
                ),
                "comparison": bool(
                    re.search(
                        r"\b(similarly|likewise|compared to|in contrast)\b",
                        content_lower,
                    )
                ),
            },
            "complexity_indicators": {
                "has_subordinate_clauses": bool(
                    re.search(r"\b(which|that|who|whom|whose)\b", content_lower)
                ),
                "has_conditionals": bool(
                    re.search(r"\b(if|unless|provided|assuming)\b", content_lower)
                ),
                "has_temporal_references": bool(
                    re.search(r"\b(when|while|before|after|during)\b", content_lower)
                ),
            },
        }

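    # The capitalization patterns below also match ordinary sentence-initial
    # words, so the resulting counts are hints for a downstream entity
    # extractor, not entity counts in themselves.
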

    def _extract_entity_hints(self, content: str) -> dict[str, Any]:
        """Extract hints for entity extraction."""
        import re

        # Potential entity patterns
        proper_nouns = re.findall(r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b", content)
        acronyms = re.findall(r"\b[A-Z]{2,}\b", content)

        return {
            "proper_noun_count": len(proper_nouns),
            "acronym_count": len(acronyms),
            "capitalized_words": len(re.findall(r"\b[A-Z][a-z]+\b", content)),
            "potential_names": len(
                [noun for noun in proper_nouns if len(noun.split()) <= 3]
            ),
            "potential_organizations": len(
                [noun for noun in proper_nouns if len(noun.split()) > 1]
            ),
            "has_titles": bool(
                re.search(r"\b(Dr|Mr|Mrs|Ms|Prof|CEO|CTO|VP)\b\.?\s+[A-Z]", content)
            ),
            "has_locations": bool(
                re.search(
                    r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*(?:\s+(?:City|State|Country|Street|Ave|Road))\b",
                    content,
                )
            ),
        }


    def _estimate_readability(
        self, avg_sentence_length: float, word_diversity: float
    ) -> str:
        """Estimate readability level based on simple metrics."""
        if avg_sentence_length < 10 and word_diversity > 0.7:
            return "easy"
        elif avg_sentence_length < 20 and word_diversity > 0.5:
            return "moderate"
        elif avg_sentence_length < 30:
            return "difficult"
        else:
            return "very_difficult"

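    # The five heuristic bonuses below sum to exactly 1.0
    # (0.3 + 0.2 + 0.2 + 0.1 + 0.2), so the final min() is a safeguard rather
    # than a binding cap. Note that the quote-parity check counts apostrophes
    # as well as double quotes.
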

    def _assess_chunk_completeness(self, content: str) -> float:
        """Assess how complete/coherent the chunk appears to be."""
        score = 0.0

        # Check for proper sentence endings
        if content.strip().endswith((".", "!", "?")):
            score += 0.3

        # Check for proper sentence beginnings
        if content.strip() and content.strip()[0].isupper():
            score += 0.2

        # Check for balanced punctuation
        open_parens = content.count("(")
        close_parens = content.count(")")
        open_quotes = content.count('"') + content.count("'")

        if open_parens == close_parens:
            score += 0.2
        if open_quotes % 2 == 0:  # Even number of quotes
            score += 0.1

        # Check for paragraph structure
        if "\n\n" in content or len(content.split(".")) > 1:
            score += 0.2

        return min(1.0, score)


    def _detect_language_indicators(self, content: str) -> dict[str, Any]:
        """Detect language indicators in the content."""
        content_lower = content.lower()

        # Common English function words
        english_indicators = [
            "the",
            "and",
            "or",
            "but",
            "in",
            "on",
            "at",
            "to",
            "for",
            "of",
            "with",
        ]
        english_count = sum(
            1 for word in english_indicators if f" {word} " in f" {content_lower} "
        )

        return {
            "english_function_words": english_count,
            "likely_english": english_count >= 3,
            "punctuation_style": "american" if ". " in content else "other",
        }
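
Usage sketch (illustrative, not part of the covered module): assuming a configured Settings instance and a parent Document as imported above, a chunking strategy might drive this processor as below. The helper name chunk_pieces and the empty chunk_metadata dict are assumptions made for the example; only create_chunk_document and its parameters come from the source.

from qdrant_loader.config import Settings
from qdrant_loader.core.chunking.strategy.default.text_chunk_processor import (
    TextChunkProcessor,
)
from qdrant_loader.core.document import Document


def chunk_pieces(settings: Settings, doc: Document, pieces: list[str]) -> list[Document]:
    """Hypothetical helper: wrap pre-split text pieces into chunk Documents."""
    processor = TextChunkProcessor(settings)
    return [
        processor.create_chunk_document(
            original_doc=doc,
            chunk_content=piece,
            chunk_index=i,
            total_chunks=len(pieces),
            chunk_metadata={},  # strategy-specific metadata would normally go here
        )
        for i, piece in enumerate(pieces)
    ]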