Coverage for src/qdrant_loader/core/chunking/strategy/default/text_metadata_extractor.py: 100%

54 statements  


1"""Text-specific metadata extractor for enhanced text analysis.""" 

2 

3import re 

4from typing import Any 

5 

6from qdrant_loader.core.chunking.strategy.base.metadata_extractor import ( 

7 BaseMetadataExtractor, 

8) 

9from qdrant_loader.core.document import Document 

10 

11 

12class TextMetadataExtractor(BaseMetadataExtractor): 

13 """Metadata extractor for text documents with enhanced text analysis.""" 

14 

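    # Worked example (values follow directly from the logic below): for the
    # chunk "Hello world. How are you?", word_count is 5, sentence_count is 2,
    # paragraph_count is 1, and character_count is 25.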
    def extract_hierarchical_metadata(
        self, content: str, chunk_metadata: dict[str, Any], document: Document
    ) -> dict[str, Any]:
        """Extract comprehensive metadata specific to text chunks."""
        metadata = chunk_metadata.copy()

        # Add text-specific metadata
        words = content.split()
        sentences = self._split_sentences(content)
        paragraphs = content.split("\n\n")

        metadata.update(
            {
                "word_count": len(words),
                "character_count": len(content),
                "paragraph_count": len([p for p in paragraphs if p.strip()]),
                "sentence_count": len(sentences),
                "avg_word_length": self._calculate_avg_word_length(content),
                "reading_time_minutes": self._estimate_reading_time(content),
                "content_type": "text",
                "language": self._detect_language(content),
                "text_density": self._calculate_text_density(content),
                "formatting_indicators": self._analyze_formatting(content),
            }
        )

        return metadata

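    # Worked example: extract_entities("Alice met Bob Smith in Paris. The dog barked.")
    # returns ["Alice", "Bob Smith", "Paris"]; "The" is dropped as a stop word,
    # and duplicates are removed while preserving first-seen order.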
    def extract_entities(self, text: str) -> list[str]:
        """Extract named entities from text using basic pattern matching."""
        entities = []

        # Extract potential entities (capitalized words/phrases)
        capitalized_words = re.findall(r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b", text)

        # Filter out common false positives and single letters
        stop_words = {
            "The",
            "This",
            "That",
            "These",
            "Those",
            "When",
            "Where",
            "Why",
            "How",
            "What",
            "He",
            "She",
            "It",
            "We",
            "They",
            "I",
            "You",
            "Dr",
            "Mr",
            "Ms",
            "Mrs",
        }
        entities = [
            word
            for word in capitalized_words
            if word not in stop_words and len(word) > 2
        ]

        # Remove duplicates while preserving order
        seen = set()
        unique_entities = []
        for entity in entities:
            if entity not in seen:
                seen.add(entity)
                unique_entities.append(entity)

        return unique_entities[:10]  # Limit to top 10 entities

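    # Worked example: _split_sentences("Hi! How are you? Fine.") splits on
    # whitespace after ., ! or ?, then drops fragments of 3 characters or
    # fewer, returning ["How are you?", "Fine."] ("Hi!" is filtered out).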
    def _split_sentences(self, content: str) -> list[str]:
        """Split content into sentences."""
        sentences = re.split(r"(?<=[.!?])\s+", content)
        return [s.strip() for s in sentences if s.strip() and len(s.strip()) > 3]

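    # Worked example: for "Go far", the words are ["Go", "far"], so the
    # average word length is (2 + 3) / 2 = 2.5.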
    def _calculate_avg_word_length(self, content: str) -> float:
        """Calculate average word length."""
        words = re.findall(r"\b\w+\b", content)
        if not words:
            return 0.0
        return sum(len(word) for word in words) / len(words)

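    # Worked example: a 500-word chunk yields 500 / 200 = 2.5 minutes at the
    # assumed reading speed of 200 words per minute.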
    def _estimate_reading_time(self, content: str) -> float:
        """Estimate reading time in minutes (assuming 200 words per minute)."""
        word_count = len(content.split())
        return word_count / 200

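    # Heuristic, not a real language detector: text at least 10 words long is
    # tagged "en" when more than 10% of its words appear in the common-word
    # set below; everything else comes back "unknown".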
    def _detect_language(self, content: str) -> str:
        """Detect content language using basic heuristics."""
        # Simple English detection based on common words
        english_words = {
            "the",
            "and",
            "or",
            "but",
            "in",
            "on",
            "at",
            "to",
            "for",
            "of",
            "with",
            "by",
            "is",
            "are",
            "was",
            "were",
            "be",
            "been",
            "have",
            "has",
            "had",
            "do",
            "did",
            "will",
            "would",
            "could",
            "should",
            "can",
            "may",
            "might",
            "must",
            "shall",
        }

        words = re.findall(r"\b\w+\b", content.lower())
        if not words:
            return "unknown"

        english_count = sum(1 for word in words if word in english_words)
        if len(words) >= 10 and english_count / len(words) > 0.10:
            return "en"

        return "unknown"

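    # Worked example: for "Hi there!" (9 characters), 7 are alphanumeric,
    # 1 is whitespace and 1 is punctuation, giving ratios of roughly
    # 0.78, 0.11 and 0.11.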
    def _calculate_text_density(self, content: str) -> dict[str, float]:
        """Calculate text density metrics."""
        total_chars = len(content)
        if total_chars == 0:
            return {
                "alphanumeric_ratio": 0.0,
                "whitespace_ratio": 0.0,
                "punctuation_ratio": 0.0,
            }

        alphanumeric_chars = len(re.findall(r"[a-zA-Z0-9]", content))
        whitespace_chars = len(re.findall(r"\s", content))
        punctuation_chars = len(re.findall(r"[^\w\s]", content))

        return {
            "alphanumeric_ratio": alphanumeric_chars / total_chars,
            "whitespace_ratio": whitespace_chars / total_chars,
            "punctuation_ratio": punctuation_chars / total_chars,
        }

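    # Each indicator below is a regex presence check, e.g. "Call 555-123-4567"
    # sets has_phone_numbers and "Save 20%" sets has_percentages.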
    def _analyze_formatting(self, content: str) -> dict[str, bool]:
        """Analyze text formatting indicators."""
        return {
            "has_bullet_points": bool(
                re.search(r"^\s*[•\-\*]\s", content, re.MULTILINE)
            ),
            "has_numbered_lists": bool(
                re.search(r"^\s*\d+\.\s", content, re.MULTILINE)
            ),
            "has_email_addresses": bool(
                re.search(
                    r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", content
                )
            ),
            "has_urls": bool(re.search(r"https?://\S+", content)),
            "has_phone_numbers": bool(
                re.search(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", content)
            ),
            "has_dates": bool(re.search(r"\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b", content)),
            "has_currency": bool(
                re.search(r"\$\d+(?:\.\d{2})?|\d+\s?(?:USD|EUR|GBP)", content)
            ),
            "has_percentages": bool(re.search(r"\d+(?:\.\d+)?%", content)),
        }
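

# --- Illustrative usage sketch ---
# A minimal demo of the extractor on a small text chunk, assuming it is run
# directly as a script. extract_hierarchical_metadata() also needs a Document
# instance, whose constructor signature is not shown in this listing, so the
# demo sticks to the text-only entry points.
if __name__ == "__main__":
    extractor = TextMetadataExtractor()

    sample = (
        "Project update from Alice Johnson.\n\n"
        "- Reached 95% test coverage.\n"
        "- Contact: alice@example.com or 555-123-4567.\n\n"
        "The release is planned for 12/01/2025 and will cost $10.00."
    )

    # Expected: entities such as ["Project", "Alice Johnson", ...]
    print(extractor.extract_entities(sample))
    # Expected: sentence fragments longer than 3 characters
    print(extractor._split_sentences(sample))
    # Expected: bullet, email, phone, date, currency and percentage flags True
    print(extractor._analyze_formatting(sample))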