Coverage for src/qdrant_loader/core/chunking/strategy/default/text_metadata_extractor.py: 100%
54 statements
1"""Text-specific metadata extractor for enhanced text analysis."""
3import re
4from typing import Any
6from qdrant_loader.core.chunking.strategy.base.metadata_extractor import (
7 BaseMetadataExtractor,
8)
9from qdrant_loader.core.document import Document
12class TextMetadataExtractor(BaseMetadataExtractor):
13 """Metadata extractor for text documents with enhanced text analysis."""
15 def extract_hierarchical_metadata(
16 self, content: str, chunk_metadata: dict[str, Any], document: Document
17 ) -> dict[str, Any]:
18 """Extract comprehensive metadata specific to text chunks."""
19 metadata = chunk_metadata.copy()
21 # Add text-specific metadata
22 words = content.split()
23 sentences = self._split_sentences(content)
24 paragraphs = content.split("\n\n")
26 metadata.update(
27 {
28 "word_count": len(words),
29 "character_count": len(content),
30 "paragraph_count": len([p for p in paragraphs if p.strip()]),
31 "sentence_count": len(sentences),
32 "avg_word_length": self._calculate_avg_word_length(content),
33 "reading_time_minutes": self._estimate_reading_time(content),
34 "content_type": "text",
35 "language": self._detect_language(content),
36 "text_density": self._calculate_text_density(content),
37 "formatting_indicators": self._analyze_formatting(content),
38 }
39 )
41 return metadata
43 def extract_entities(self, text: str) -> list[str]:
44 """Extract named entities from text using basic pattern matching."""
45 entities = []
47 # Extract potential entities (capitalized words/phrases)
48 capitalized_words = re.findall(r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b", text)
50 # Filter out common false positives and single letters
51 stop_words = {
52 "The",
53 "This",
54 "That",
55 "These",
56 "Those",
57 "When",
58 "Where",
59 "Why",
60 "How",
61 "What",
62 "He",
63 "She",
64 "It",
65 "We",
66 "They",
67 "I",
68 "You",
69 "Dr",
70 "Mr",
71 "Ms",
72 "Mrs",
73 }
74 entities = [
75 word
76 for word in capitalized_words
77 if word not in stop_words and len(word) > 2
78 ]
80 # Remove duplicates while preserving order
81 seen = set()
82 unique_entities = []
83 for entity in entities:
84 if entity not in seen:
85 seen.add(entity)
86 unique_entities.append(entity)
88 return unique_entities[:10] # Limit to top 10 entities
90 def _split_sentences(self, content: str) -> list[str]:
91 """Split content into sentences."""
92 sentences = re.split(r"(?<=[.!?])\s+", content)
93 return [s.strip() for s in sentences if s.strip() and len(s.strip()) > 3]
95 def _calculate_avg_word_length(self, content: str) -> float:
96 """Calculate average word length."""
97 words = re.findall(r"\b\w+\b", content)
98 if not words:
99 return 0.0
100 return sum(len(word) for word in words) / len(words)
102 def _estimate_reading_time(self, content: str) -> float:
103 """Estimate reading time in minutes (assuming 200 words per minute)."""
104 word_count = len(content.split())
105 return word_count / 200
107 def _detect_language(self, content: str) -> str:
108 """Detect content language using basic heuristics."""
109 # Simple English detection based on common words
110 english_words = {
111 "the",
112 "and",
113 "or",
114 "but",
115 "in",
116 "on",
117 "at",
118 "to",
119 "for",
120 "of",
121 "with",
122 "by",
123 "is",
124 "are",
125 "was",
126 "were",
127 "be",
128 "been",
129 "have",
130 "has",
131 "had",
132 "do",
133 "did",
134 "will",
135 "would",
136 "could",
137 "should",
138 "can",
139 "may",
140 "might",
141 "must",
142 "shall",
143 }
145 words = re.findall(r"\b\w+\b", content.lower())
146 if not words:
147 return "unknown"
149 english_count = sum(1 for word in words if word in english_words)
150 if len(words) >= 10 and english_count / len(words) > 0.10:
151 return "en"
153 return "unknown"
155 def _calculate_text_density(self, content: str) -> dict[str, float]:
156 """Calculate text density metrics."""
157 total_chars = len(content)
158 if total_chars == 0:
159 return {
160 "alphanumeric_ratio": 0.0,
161 "whitespace_ratio": 0.0,
162 "punctuation_ratio": 0.0,
163 }
165 alphanumeric_chars = len(re.findall(r"[a-zA-Z0-9]", content))
166 whitespace_chars = len(re.findall(r"\s", content))
167 punctuation_chars = len(re.findall(r"[^\w\s]", content))
169 return {
170 "alphanumeric_ratio": alphanumeric_chars / total_chars,
171 "whitespace_ratio": whitespace_chars / total_chars,
172 "punctuation_ratio": punctuation_chars / total_chars,
173 }
175 def _analyze_formatting(self, content: str) -> dict[str, bool]:
176 """Analyze text formatting indicators."""
177 return {
178 "has_bullet_points": bool(
179 re.search(r"^\s*[•\-\*]\s", content, re.MULTILINE)
180 ),
181 "has_numbered_lists": bool(
182 re.search(r"^\s*\d+\.\s", content, re.MULTILINE)
183 ),
184 "has_email_addresses": bool(
185 re.search(
186 r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b", content
187 )
188 ),
189 "has_urls": bool(re.search(r"https?://\S+", content)),
190 "has_phone_numbers": bool(
191 re.search(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", content)
192 ),
193 "has_dates": bool(re.search(r"\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b", content)),
194 "has_currency": bool(
195 re.search(r"\$\d+(?:\.\d{2})?|\d+\s?(?:USD|EUR|GBP)", content)
196 ),
197 "has_percentages": bool(re.search(r"\d+(?:\.\d+)?%", content)),
198 }
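

# Usage sketch (not part of the covered module): this assumes TextMetadataExtractor
# can be instantiated without constructor arguments, which the listing above does not
# confirm. The sample text is invented for illustration.
from qdrant_loader.core.chunking.strategy.default.text_metadata_extractor import (
    TextMetadataExtractor,
)

extractor = TextMetadataExtractor()
sample = "Alice Johnson met Bob in Paris. Visit https://example.com for 50% off."

# Entity extraction works on a plain string.
print(extractor.extract_entities(sample))  # ['Alice Johnson', 'Bob', 'Paris', 'Visit']

# The document argument is not used by this implementation, so None is passed here
# purely for brevity; real callers would pass a Document instance.
metadata = extractor.extract_hierarchical_metadata(sample, {"chunk_index": 0}, None)
print(metadata["word_count"], metadata["language"], metadata["formatting_indicators"]["has_urls"])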