Coverage for src/qdrant_loader/core/chunking/strategy/base/metadata_extractor.py: 96%
80 statements
coverage.py v7.10.3, created at 2025-08-13 09:19 +0000
1"""Base class for metadata extraction from document chunks."""
3import re
4from abc import ABC, abstractmethod
5from typing import TYPE_CHECKING, Any
7if TYPE_CHECKING:
8 from qdrant_loader.core.document import Document
11class BaseMetadataExtractor(ABC):
12 """Base class for metadata extraction from document chunks.
14 This class defines the interface for extracting rich metadata from chunks
15 including hierarchical relationships, entities, cross-references, and
16 semantic information. Each strategy implements its own metadata extraction
17 logic based on the document type.
18 """

    @abstractmethod
    def extract_hierarchical_metadata(
        self, content: str, chunk_metadata: dict[str, Any], document: "Document"
    ) -> dict[str, Any]:
        """Extract hierarchical metadata from chunk content.

        This method should analyze the chunk content and enrich the existing
        metadata with hierarchical information such as:
        - Document structure relationships
        - Section breadcrumbs and navigation
        - Parent-child relationships
        - Cross-references and links

        Args:
            content: The chunk content to analyze
            chunk_metadata: Existing chunk metadata to enrich
            document: The source document for context

        Returns:
            Enriched metadata dictionary with hierarchical information

        Raises:
            NotImplementedError: If the extractor doesn't implement this method
        """
        raise NotImplementedError(
            "Metadata extractor must implement extract_hierarchical_metadata method"
        )
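
    # Hypothetical example (added for illustration, not in the original module):
    # a concrete markdown extractor might merge the incoming chunk_metadata with
    # output from the helpers defined later in this class, returning something like
    #   {**chunk_metadata,
    #    "breadcrumb_string": "Guide > Storage > Indexing",
    #    "parent_section": "Storage",
    #    "cross_references": [...]}
    # See the runnable usage sketch at the end of this listing.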

    @abstractmethod
    def extract_entities(self, text: str) -> list[str]:
        """Extract entities from text content.

        This method should identify and extract relevant entities from the text,
        such as names, places, organizations, and technical terms. The specific
        types of entities extracted depend on the document type and domain.

        Args:
            text: The text to extract entities from

        Returns:
            List of extracted entities

        Raises:
            NotImplementedError: If the extractor doesn't implement this method
        """
        raise NotImplementedError(
            "Metadata extractor must implement extract_entities method"
        )

    def extract_cross_references(self, text: str) -> list[dict[str, Any]]:
        """Extract cross-references from text content.

        This is a default implementation that can be overridden by specific
        extractors to provide better cross-reference extraction based on
        document type (e.g., markdown links, code imports, etc.).

        Args:
            text: The text to extract cross-references from

        Returns:
            List of cross-reference dictionaries
        """
        # Basic implementation - look for common reference patterns
        cross_refs = []

        # Look for simple references like "see Section X", "Chapter Y", etc.
        # Pattern for section references
        section_pattern = r"(?i)\b(?:see|refer to)\s+(?:section|chapter|part|appendix)\s+([A-Z0-9]+(?:\.[0-9]+)*)\b"
        section_matches = re.finditer(section_pattern, text)
        for match in section_matches:
            cross_refs.append(
                {
                    "type": "section_reference",
                    "reference": match.group(1),
                    "context": match.group(0),
                    "position": match.start(),
                }
            )

        # Additional pattern for standalone section references
        standalone_pattern = (
            r"(?i)\b(section|chapter|part|appendix)\s+([A-Z0-9]+(?:\.[0-9]+)*)\b"
        )
        standalone_matches = re.finditer(standalone_pattern, text)
        for match in standalone_matches:
            cross_refs.append(
                {
                    "type": "section_reference",
                    "reference": match.group(2),
                    "context": match.group(0),
                    "position": match.start(),
                }
            )

        # Pattern for figure/table references
        figure_pattern = r"(?i)\b(?:figure|fig|table|tbl)\s+([A-Z0-9]+(?:\.[0-9]+)*)\b"
        figure_matches = re.finditer(figure_pattern, text)
        for match in figure_matches:
            cross_refs.append(
                {
                    "type": "figure_reference",
                    "reference": match.group(1),
                    "context": match.group(0),
                    "position": match.start(),
                }
            )

        return cross_refs
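
    # Hypothetical usage sketch (added for illustration, not in the original module):
    # for text like "see Section 2.1", the default patterns above yield entries
    # shaped like
    #   {"type": "section_reference", "reference": "2.1",
    #    "context": "see Section 2.1", "position": 0}
    # Note that such text matches both the "see ..." pattern and the standalone
    # pattern, so the same reference can appear twice in the returned list.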

    def analyze_content_type(self, content: str) -> dict[str, Any]:
        """Analyze the type and characteristics of the content.

        Args:
            content: The content to analyze

        Returns:
            Dictionary containing content type analysis
        """
        # Basic content type indicators
        analysis = {
            "has_code": bool(
                re.search(r"```|def |class |function|import |#include", content)
            ),
            "has_math": bool(re.search(r"\$.*\$|\\[a-zA-Z]+|∑|∫|∆", content)),
            "has_lists": bool(
                re.search(r"^\s*[-*+]\s|^\s*\d+\.\s", content, re.MULTILINE)
            ),
            "has_headers": bool(
                re.search(r"^\s*#+\s|^={3,}|^-{3,}", content, re.MULTILINE)
            ),
            "has_links": bool(
                re.search(r"https?://|www\.|[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", content)
            ),
            "has_tables": bool(re.search(r"\|.*\|.*\|", content)),
            "has_quotes": bool(re.search(r"^>", content, re.MULTILINE)),
            "primary_language": self._detect_primary_language(content),
        }

        # Calculate content complexity score
        complexity_score = 0
        if analysis["has_code"]:
            complexity_score += 2
        if analysis["has_math"]:
            complexity_score += 2
        if analysis["has_tables"]:
            complexity_score += 1
        if analysis["has_lists"]:
            complexity_score += 1
        if analysis["has_headers"]:
            complexity_score += 1

        analysis["complexity_score"] = complexity_score
        analysis["content_category"] = self._categorize_content(analysis)

        return analysis
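
    # Hypothetical example (added for illustration, not in the original module): for
    # a markdown chunk containing a fenced ``` code block and a "## Usage" heading,
    # has_code and has_headers are True, complexity_score is 3 (2 for code + 1 for
    # headers), and content_category is "technical" because code takes priority in
    # _categorize_content() below.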

    def _detect_primary_language(self, content: str) -> str:
        """Detect the primary language of the content.

        This is a basic implementation that can be enhanced with
        proper language detection libraries.

        Args:
            content: The content to analyze

        Returns:
            Detected language code ('en' or 'unknown')
        """
        # Basic language detection based on common words
        # This could be enhanced with proper language detection libraries

        content_lower = content.lower()
        # Extract words using regex to remove punctuation
        words = re.findall(r"\b[a-zA-Z]+\b", content_lower)

        if not words:
            return "unknown"

        # Count common English words (need to be whole words)
        english_words = ["the", "and", "of", "to", "a", "in", "is", "it", "you", "that"]
        english_count = sum(1 for word in words if word in english_words)

        # Need at least 10% English words to consider it English
        if len(words) > 0 and english_count / len(words) >= 0.10:
            return "en"

        return "unknown"

    def _categorize_content(self, analysis: dict[str, Any]) -> str:
        """Categorize content based on analysis results.

        Args:
            analysis: Content analysis results

        Returns:
            Content category string
        """
        if analysis["has_code"]:
            return "technical"
        elif analysis["has_math"]:
            return "academic"
        elif analysis["has_tables"] and analysis["has_headers"]:
            return "structured"
        elif analysis["has_lists"]:
            return "informational"
        else:
            return "narrative"

    def extract_keyword_density(self, text: str, top_n: int = 10) -> dict[str, float]:
        """Extract keyword density information from text.

        Args:
            text: The text to analyze
            top_n: Number of top keywords to return

        Returns:
            Dictionary mapping keywords to their density scores
        """
        from collections import Counter

        # Clean and tokenize text
        words = re.findall(r"\b[a-zA-Z]+\b", text.lower())

        # Filter out common stop words
        stop_words = {
            "the", "and", "or", "but", "in", "on", "at", "to", "for", "of",
            "with", "by", "a", "an", "is", "are", "was", "were", "be", "been",
            "have", "has", "had", "do", "does", "did", "will", "would", "could",
            "should", "may", "might", "this", "that", "these", "those",
            "i", "you", "he", "she", "it", "we", "they", "over",
        }

        # Filter words
        filtered_words = [
            word for word in words if len(word) > 2 and word not in stop_words
        ]

        # Calculate frequencies
        word_counts = Counter(filtered_words)
        total_words = len(filtered_words)

        # Calculate density and return top keywords
        if total_words == 0:
            return {}

        keyword_density = {}
        for word, count in word_counts.most_common(top_n):
            keyword_density[word] = count / total_words

        return keyword_density
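
    # Hypothetical example (added for illustration, not in the original module): for
    # the text "Qdrant stores vectors. Qdrant indexes vectors quickly." the filter
    # keeps 7 tokens, so the method returns roughly
    #   {"qdrant": 0.286, "vectors": 0.286, "stores": 0.143, "indexes": 0.143,
    #    "quickly": 0.143}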

    def create_breadcrumb_metadata(
        self, current_section: str, parent_sections: list[str]
    ) -> dict[str, Any]:
        """Create breadcrumb metadata for hierarchical navigation.

        Args:
            current_section: Current section title
            parent_sections: List of parent section titles (from root to immediate parent)

        Returns:
            Dictionary containing breadcrumb metadata
        """
        breadcrumb_path = (
            parent_sections + [current_section] if current_section else parent_sections
        )

        return {
            "breadcrumb_path": breadcrumb_path,
            "breadcrumb_string": " > ".join(breadcrumb_path),
            "section_depth": len(breadcrumb_path),
            "parent_section": parent_sections[-1] if parent_sections else None,
            "root_section": breadcrumb_path[0] if breadcrumb_path else None,
        }
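

# Hypothetical usage sketch (added for illustration, not part of the original
# module). It assumes only the names defined above; the trivial implementations of
# the abstract methods are placeholders, not the project's real strategies.
if __name__ == "__main__":

    class _DemoMetadataExtractor(BaseMetadataExtractor):
        def extract_hierarchical_metadata(
            self, content: str, chunk_metadata: dict[str, Any], document: "Document"
        ) -> dict[str, Any]:
            # Merge the incoming metadata with a basic content analysis.
            return {**chunk_metadata, **self.analyze_content_type(content)}

        def extract_entities(self, text: str) -> list[str]:
            # Placeholder heuristic: treat capitalized words as entities.
            return re.findall(r"\b[A-Z][a-zA-Z]+\b", text)

    extractor = _DemoMetadataExtractor()
    sample = "See Section 2.1 for how Qdrant stores vectors."
    print(extractor.extract_entities(sample))
    print(extractor.extract_cross_references(sample))
    print(extractor.extract_keyword_density(sample))
    print(extractor.create_breadcrumb_metadata("Indexing", ["Guide", "Storage"]))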