Coverage for src/qdrant_loader/core/chunking/strategy/default/text_chunk_processor.py: 99% (81 statements)


"""Text-specific chunk processor for document creation and management."""

import re
from typing import Any

from qdrant_loader.config import Settings
from qdrant_loader.core.chunking.strategy.base.chunk_processor import BaseChunkProcessor
from qdrant_loader.core.document import Document


class TextChunkProcessor(BaseChunkProcessor):
    """Chunk processor for text documents with enhanced text-specific processing."""

    def __init__(self, settings: Settings):
        super().__init__(settings)
        # Get strategy-specific configuration
        self.default_config = settings.global_config.chunking.strategies.default


    def create_chunk_document(
        self,
        original_doc: Document,
        chunk_content: str,
        chunk_index: int,
        total_chunks: int,
        chunk_metadata: dict[str, Any],
        skip_nlp: bool = False,
    ) -> Document:
        """Create a document for a text chunk with enhanced metadata."""

        # Generate unique chunk ID
        chunk_id = self.generate_chunk_id(original_doc, chunk_index)

        # Create base metadata
        base_metadata = self.create_base_chunk_metadata(
            original_doc, chunk_index, total_chunks, chunk_metadata
        )

        # Add text-specific metadata
        text_metadata = self._create_text_specific_metadata(chunk_content, original_doc)
        base_metadata.update(text_metadata)

        # Create chunk document
        chunk_doc = Document(
            id=chunk_id,
            content=chunk_content,
            metadata=base_metadata,
            source=original_doc.source,
            source_type=original_doc.source_type,
            url=original_doc.url,
            content_type=original_doc.content_type,
            title=f"{original_doc.title} - Chunk {chunk_index + 1}",
        )

        return chunk_doc

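    # Usage sketch (illustrative only): a minimal sketch of how chunks for a
    # single document might be materialized; `settings`, `doc`, and the chunk
    # texts below are hypothetical stand-ins, not part of this module.
    #
    #     processor = TextChunkProcessor(settings)
    #     chunks = ["First chunk text.", "Second chunk text."]
    #     chunk_docs = [
    #         processor.create_chunk_document(
    #             original_doc=doc,
    #             chunk_content=text,
    #             chunk_index=i,
    #             total_chunks=len(chunks),
    #             chunk_metadata={},
    #         )
    #         for i, text in enumerate(chunks)
    #     ]
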

    def _create_text_specific_metadata(
        self, content: str, original_doc: Document
    ) -> dict[str, Any]:
        """Create text-specific metadata for the chunk."""
        metadata = {
            "chunk_strategy": "text",
            "processing_method": "intelligent_splitting",
            "content_analysis": self._analyze_chunk_content(content),
            "quality_metrics": self._calculate_quality_metrics(content),
            "text_characteristics": self._extract_text_characteristics(content),
        }

        # Add semantic analysis indicators if enabled
        if self.default_config.enable_semantic_analysis:
            metadata["semantic_analysis_enabled"] = True
            metadata["semantic_indicators"] = self._extract_semantic_indicators(content)

        # Add entity extraction indicators if enabled
        if self.default_config.enable_entity_extraction:
            metadata["entity_extraction_enabled"] = True
            metadata["entity_hints"] = self._extract_entity_hints(content)

        return metadata


    def _analyze_chunk_content(self, content: str) -> dict[str, Any]:
        """Analyze the content structure and characteristics of the chunk."""
        words = content.split()
        sentences = content.split(".")
        paragraphs = content.split("\n\n")

        return {
            "word_count": len(words),
            "sentence_count": len([s for s in sentences if s.strip()]),
            "paragraph_count": len([p for p in paragraphs if p.strip()]),
            "avg_words_per_sentence": len(words)
            / max(1, len([s for s in sentences if s.strip()])),
            "character_count": len(content),
            "content_density": (
                len(content.replace(" ", "")) / len(content) if content else 0
            ),
        }

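    # Worked example (for illustration): for the sample string
    # "The cat sat. The dog ran.", _analyze_chunk_content returns
    # word_count=6, sentence_count=2, paragraph_count=1,
    # avg_words_per_sentence=3.0, character_count=25, and
    # content_density=0.8 (20 non-space characters out of 25).
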

    def _calculate_quality_metrics(self, content: str) -> dict[str, Any]:
        """Calculate quality metrics for the chunk."""
        words = content.split()

        # Content completeness (does it end with proper punctuation?)
        ends_properly = content.strip().endswith((".", "!", "?", ":", ";"))

        # Content coherence (rough estimate based on word repetition)
        unique_words = len({word.lower() for word in words})
        word_diversity = unique_words / len(words) if words else 0

        # Content readability (simple metric based on sentence structure)
        sentences = [s.strip() for s in content.split(".") if s.strip()]
        avg_sentence_length = (
            sum(len(s.split()) for s in sentences) / len(sentences) if sentences else 0
        )

        return {
            "ends_properly": ends_properly,
            "word_diversity": round(word_diversity, 3),
            "avg_sentence_length": round(avg_sentence_length, 1),
            "readability_score": self._estimate_readability(
                avg_sentence_length, word_diversity
            ),
            "chunk_completeness": self._assess_chunk_completeness(content),
        }

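    # Worked example: for "The cat sat. The dog ran." the metrics come out as
    # ends_properly=True, word_diversity=0.833 (5 unique of 6 tokens, since
    # whitespace splitting keeps the periods on "sat." and "ran."),
    # avg_sentence_length=3.0, and readability_score="easy" (short sentences
    # with high diversity).
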

    def _extract_text_characteristics(self, content: str) -> dict[str, Any]:
        """Extract various text characteristics from the chunk."""
        return {
            "has_numbers": bool(re.search(r"\d+", content)),
            "has_dates": bool(re.search(r"\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b", content)),
            "has_urls": bool(re.search(r"https?://\S+", content)),
            "has_email": bool(
                re.search(
                    r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", content
                )
            ),
            "has_phone": bool(re.search(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", content)),
            "has_currency": bool(re.search(r"\$\d+(?:\.\d{2})?", content)),
            "has_percentages": bool(re.search(r"\b\d+(?:\.\d+)?%\b", content)),
            "has_quotes": '"' in content or "'" in content,
            "has_parentheses": "(" in content and ")" in content,
            "has_formatting": bool(re.search(r"[*_`#]", content)),
            "language_indicators": self._detect_language_indicators(content),
        }

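    # Worked example: "Visit https://example.com or call 555-123-4567." sets
    # has_urls=True, has_phone=True, and has_numbers=True; the remaining
    # pattern flags stay False for that sample.
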

    def _extract_semantic_indicators(self, content: str) -> dict[str, Any]:
        """Extract indicators for semantic analysis."""
        # Topic indicators (simple keyword-based)
        business_keywords = ["company", "business", "market", "revenue", "profit", "customer"]
        tech_keywords = ["software", "technology", "system", "data", "algorithm", "code"]
        academic_keywords = ["research", "study", "analysis", "theory", "methodology", "conclusion"]

        content_lower = content.lower()

        return {
            "topic_indicators": {
                "business": sum(1 for kw in business_keywords if kw in content_lower),
                "technology": sum(1 for kw in tech_keywords if kw in content_lower),
                "academic": sum(1 for kw in academic_keywords if kw in content_lower),
            },
            "discourse_markers": {
                "enumeration": bool(
                    re.search(r"\b(first|second|third|finally|lastly)\b", content_lower)
                ),
                "causation": bool(
                    re.search(
                        r"\b(because|therefore|thus|consequently|as a result)\b",
                        content_lower,
                    )
                ),
                "contrast": bool(
                    re.search(
                        r"\b(however|although|despite|nevertheless|on the other hand)\b",
                        content_lower,
                    )
                ),
                "comparison": bool(
                    re.search(
                        r"\b(similarly|likewise|compared to|in contrast)\b",
                        content_lower,
                    )
                ),
            },
            "complexity_indicators": {
                "has_subordinate_clauses": bool(
                    re.search(r"\b(which|that|who|whom|whose)\b", content_lower)
                ),
                "has_conditionals": bool(
                    re.search(r"\b(if|unless|provided|assuming)\b", content_lower)
                ),
                "has_temporal_references": bool(
                    re.search(r"\b(when|while|before|after|during)\b", content_lower)
                ),
            },
        }

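    # Worked example: for "However, the study failed because the data was
    # incomplete." the discourse markers flag contrast ("however") and
    # causation ("because"), while the topic indicators count one academic
    # hit ("study") and one technology hit ("data").
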

    def _extract_entity_hints(self, content: str) -> dict[str, Any]:
        """Extract hints for entity extraction."""
        # Potential entity patterns
        proper_nouns = re.findall(r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b", content)
        acronyms = re.findall(r"\b[A-Z]{2,}\b", content)

        return {
            "proper_noun_count": len(proper_nouns),
            "acronym_count": len(acronyms),
            "capitalized_words": len(re.findall(r"\b[A-Z][a-z]+\b", content)),
            "potential_names": len(
                [noun for noun in proper_nouns if len(noun.split()) <= 3]
            ),
            "potential_organizations": len(
                [noun for noun in proper_nouns if len(noun.split()) > 1]
            ),
            "has_titles": bool(
                re.search(r"\b(Dr|Mr|Mrs|Ms|Prof|CEO|CTO|VP)\b\.?\s+[A-Z]", content)
            ),
            "has_locations": bool(
                re.search(
                    r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*(?:\s+(?:City|State|Country|Street|Ave|Road))\b",
                    content,
                )
            ),
        }

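    # Worked example: "Dr. Smith joined Acme Corp in New York City." yields
    # proper_noun_count=4 ("Dr", "Smith", "Acme Corp", "New York City"),
    # potential_organizations=2 (the multi-word matches), has_titles=True,
    # and has_locations=True.
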

    def _estimate_readability(
        self, avg_sentence_length: float, word_diversity: float
    ) -> str:
        """Estimate readability level based on simple metrics."""
        if avg_sentence_length < 10 and word_diversity > 0.7:
            return "easy"
        elif avg_sentence_length < 20 and word_diversity > 0.5:
            return "moderate"
        elif avg_sentence_length < 30:
            return "difficult"
        else:
            return "very_difficult"


    def _assess_chunk_completeness(self, content: str) -> float:
        """Assess how complete/coherent the chunk appears to be."""
        score = 0.0

        # Check for proper sentence endings
        if content.strip().endswith((".", "!", "?")):
            score += 0.3

        # Check for proper sentence beginnings
        if content.strip() and content.strip()[0].isupper():
            score += 0.2

        # Check for balanced punctuation
        open_parens = content.count("(")
        close_parens = content.count(")")
        open_quotes = content.count('"') + content.count("'")

        if open_parens == close_parens:
            score += 0.2
        if open_quotes % 2 == 0:  # Even number of quotes
            score += 0.1

        # Check for paragraph structure
        if "\n\n" in content or len(content.split(".")) > 1:
            score += 0.2

        return min(1.0, score)

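    # Worked example: "This is complete." scores the full 1.0 — a proper
    # ending (+0.3), a capitalized start (+0.2), balanced parentheses (+0.2),
    # an even quote count (+0.1), and at least one sentence break (+0.2).
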

    def _detect_language_indicators(self, content: str) -> dict[str, Any]:
        """Detect language indicators in the content."""
        content_lower = content.lower()

        # Common English function words
        english_indicators = [
            "the", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with",
        ]
        english_count = sum(
            1 for word in english_indicators if f" {word} " in f" {content_lower} "
        )

        return {
            "english_function_words": english_count,
            "likely_english": english_count >= 3,
            "punctuation_style": "american" if ". " in content else "other",
        }
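
    # Worked example: "The dog sat on the mat. The cat ran to the door."
    # counts three English function words ("the", "on", "to"), so
    # likely_english=True, and the ". " sequence marks punctuation_style
    # as "american".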