Coverage for src/qdrant_loader/core/chunking/strategy/markdown/document_parser.py: 99%
115 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-08 06:05 +0000
1"""Document parsing for markdown chunking strategy."""
3import re
4from dataclasses import dataclass, field
5from enum import Enum
6from typing import Any, Optional
8import structlog
10logger = structlog.get_logger(__name__)
class SectionType(Enum):
    """Closed set of categories a markdown section can be classified as.

    Every member's value is the lowercase string label for that category.
    """

    # Heading sections.
    HEADER = "header"

    # Code sections.
    CODE_BLOCK = "code_block"

    # List sections.
    LIST = "list"

    # Table sections.
    TABLE = "table"

    # Block-quote sections.
    QUOTE = "quote"

    # Plain text; the category used when nothing more specific applies.
    PARAGRAPH = "paragraph"
@dataclass
class Section:
    """Represents a section in a markdown document.

    Sections form a tree: ``add_child`` appends a section to ``children`` and
    sets that section's ``parent`` back-reference.

    Attributes:
        content: Raw text of the section.
        level: Numeric level of the section (defaults to 0).
        type: Kind of section (defaults to ``SectionType.PARAGRAPH``).
        parent: Section this one was attached to via ``add_child``, or
            ``None``. Not part of equality comparison.
        children: Sections attached to this one via ``add_child``.
    """

    content: str
    level: int = 0
    type: SectionType = SectionType.PARAGRAPH
    # compare=False: the generated __eq__ would otherwise compare parents,
    # whose children lists contain the very nodes being compared, recursing
    # parent -> children -> node until RecursionError for two equal trees.
    parent: Optional["Section"] = field(default=None, compare=False)
    children: list["Section"] = field(default_factory=list)

    def add_child(self, child: "Section") -> None:
        """Attach ``child`` to this section.

        Args:
            child: Section to append to ``children``; its ``parent`` is set
                to this section.
        """
        self.children.append(child)
        child.parent = self
class SectionIdentifier:
    """Identifies section types based on content patterns."""

    # Patterns are compiled once at class creation and reused on every call.
    # Header: 1-6 '#' characters followed by whitespace at the very start.
    _HEADER_RE = re.compile(r"^#{1,6}\s+")
    # Fenced code: a line anywhere in the content starting with ``` or ~~~.
    _FENCE_RE = re.compile(r"^(?:```|~~~)", re.MULTILINE)
    # Indented code: content starting with four spaces or a tab.
    _INDENTED_RE = re.compile(r"^(?: {4}|\t)")
    # List: bullet (-, *, +) or number followed by '.' or ')', then whitespace.
    _LIST_RE = re.compile(r"^(?:[*+-]|\d+[.)])\s+")
    # Table: any one of these matching is enough.
    _TABLE_RES = (
        re.compile(r"^\|", re.MULTILINE),  # a line starting with a pipe
        re.compile(r"\|.*\|"),  # two pipes on the same line
        re.compile(r"^\s*[-:]+\s*\|\s*[-:]+", re.MULTILINE),  # separator row
    )
    # Quote: '>' followed by whitespace or the end of the content.
    _QUOTE_RE = re.compile(r"^>(?:\s|$)")

    @staticmethod
    def identify_section_type(content: str) -> SectionType:
        """Identify the type of section based on its content.

        Checks run in a fixed order and the first match wins: header, code
        block, list, table, quote. Anything else, including empty or
        whitespace-only content, is a paragraph.

        Args:
            content: The section content to analyze

        Returns:
            SectionType enum indicating the type of section
        """
        cls = SectionIdentifier

        if not content.strip():
            return SectionType.PARAGRAPH

        if cls._HEADER_RE.match(content):
            return SectionType.HEADER

        # A fence may appear on any line; indentation is only checked at the
        # start of the content and requires a full four spaces (or a tab).
        if cls._FENCE_RE.search(content) or cls._INDENTED_RE.match(content):
            return SectionType.CODE_BLOCK

        if cls._LIST_RE.match(content):
            return SectionType.LIST

        if any(pattern.search(content) for pattern in cls._TABLE_RES):
            return SectionType.TABLE

        # Also accepts a bare '>' with nothing after it.
        if cls._QUOTE_RE.match(content):
            return SectionType.QUOTE

        return SectionType.PARAGRAPH
class HierarchyBuilder:
    """Builds hierarchical section relationships."""

    # Leading run of '#', whitespace, then the title up to the end of the
    # first line (or end of string). Group 2 is the title.
    _HEADER_LINE_RE = re.compile(r"^(#+)\s+(.*?)(?:\n|$)")

    @staticmethod
    def build_section_breadcrumb(section: "Section") -> str:
        """Build a breadcrumb path of section titles to capture hierarchy.

        Titles are taken from the header line at the start of each section's
        content; the section itself and any ancestor whose content does not
        start with a header line contribute nothing.

        Args:
            section: The section to build breadcrumb for

        Returns:
            Titles from the outermost ancestor down to ``section`` joined by
            " > ", or an empty string when no title is found
        """
        header_re = HierarchyBuilder._HEADER_LINE_RE

        # Collect innermost-first (own title, then each ancestor) and reverse
        # once at the end instead of inserting at the front each step.
        titles = []
        own_match = header_re.match(section.content)
        if own_match:
            titles.append(own_match.group(2).strip())

        ancestor = section.parent
        while ancestor:
            ancestor_match = header_re.match(ancestor.content)
            if ancestor_match:
                titles.append(ancestor_match.group(2).strip())
            ancestor = ancestor.parent

        titles.reverse()
        return " > ".join(titles)

    @staticmethod
    def get_section_path(
        header_item: dict[str, Any], structure: list[dict[str, Any]]
    ) -> list[str]:
        """Get the path of parent headers for a section.

        Args:
            header_item: The header item; must have a "level" key
            structure: The document structure containing ``header_item``

        Returns:
            List of parent section titles, outermost first

        Raises:
            ValueError: If ``header_item`` is not in ``structure``
        """
        current_level = header_item["level"]

        # Locate the item by identity first: a document with a repeated
        # heading yields equal dicts, and list.index() would resolve every
        # repeat to the first occurrence and build the path from the wrong
        # position. Fall back to equality so an equal copy still works and a
        # missing item still raises ValueError.
        position = next(
            (i for i, item in enumerate(structure) if item is header_item),
            None,
        )
        if position is None:
            position = structure.index(header_item)

        # Walk backward; each header shallower than the current level is the
        # next ancestor, and then becomes the level to beat.
        path = []
        for item in reversed(structure[:position]):
            if item["type"] == "header" and item["level"] < current_level:
                path.append(item["title"])
                current_level = item["level"]

        path.reverse()
        return path
class DocumentParser:
    """Parses markdown documents into structured representations."""

    def __init__(self):
        """Initialize the document parser with its helper objects."""
        self.section_identifier = SectionIdentifier()
        self.hierarchy_builder = HierarchyBuilder()

    def parse_document_structure(self, text: str) -> list[dict[str, Any]]:
        """Parse document into a structured representation.

        The text is split on newlines. Each header line (1-6 '#' followed by
        whitespace) outside a fenced code block becomes a "header" element;
        the lines between headers are joined into "content" elements.

        Args:
            text: The document text

        Returns:
            List of dictionaries representing document elements. Header
            elements have "type", "text", "level" and "title"; content
            elements have "type", "text" and "level" (always 0).
        """
        elements: list[dict[str, Any]] = []
        current_block: list[str] = []
        # First character of the fence that opened the current code block
        # ("`" or "~"), or None when not inside one.
        open_fence: Optional[str] = None

        for line in text.split("\n"):
            # Fence lines toggle code-block state. Only a fence of the same
            # character closes a block, so a ``` line inside a ~~~ block (or
            # the reverse) is kept as ordinary code.
            if line.startswith(("```", "~~~")):
                if open_fence is None:
                    open_fence = line[0]
                elif line[0] == open_fence:
                    open_fence = None
                current_block.append(line)
                continue

            # Inside a code block nothing is interpreted, so '#' lines there
            # are not treated as headers.
            if open_fence is not None:
                current_block.append(line)
                continue

            header_match = re.match(r"^(#{1,6})\s+(.*?)$", line)
            if header_match is None:
                current_block.append(line)
                continue

            # A header ends the pending content block, if there is one.
            if current_block:
                elements.append(
                    {
                        "type": "content",
                        "text": "\n".join(current_block),
                        "level": 0,
                    }
                )
                current_block = []

            elements.append(
                {
                    "type": "header",
                    "text": line,
                    "level": len(header_match.group(1)),
                    "title": header_match.group(2).strip(),
                }
            )

        # Flush whatever followed the last header.
        if current_block:
            elements.append(
                {"type": "content", "text": "\n".join(current_block), "level": 0}
            )

        return elements

    def extract_section_metadata(self, section: "Section") -> dict[str, Any]:
        """Extract metadata from a section.

        Args:
            section: The section to analyze

        Returns:
            Dictionary containing section metadata: "type", "level",
            "word_count", "char_count", "has_code", "has_links",
            "has_images" and "is_top_level". "parent_title" and
            "parent_level" are added when the parent's content starts with a
            header line, and "breadcrumb" when the breadcrumb is non-empty.
        """
        content = section.content
        metadata = {
            "type": section.type.value,
            "level": section.level,
            "word_count": len(content.split()),
            "char_count": len(content),
            # Triple backticks anywhere, or a line starting with ~~~.
            "has_code": bool(re.search(r"```|^~~~", content, re.MULTILINE)),
            # Note: this pattern also matches the [alt](src) part of an
            # image, so a section containing only images sets has_links too.
            "has_links": bool(re.search(r"\[.*?\]\(.*?\)", content)),
            "has_images": bool(re.search(r"!\[.*?\]\(.*?\)", content)),
            "is_top_level": section.level <= 2,  # Mark top-level sections
        }

        # Add parent section info if available
        if section.parent:
            header_match = re.match(r"^(#+)\s+(.*?)(?:\n|$)", section.parent.content)
            if header_match:
                metadata["parent_title"] = header_match.group(2).strip()
                metadata["parent_level"] = section.parent.level

        # Add breadcrumb path for hierarchical context
        breadcrumb = self.hierarchy_builder.build_section_breadcrumb(section)
        if breadcrumb:
            metadata["breadcrumb"] = breadcrumb

        return metadata

    def extract_section_title(self, chunk: str) -> str:
        """Extract section title from a chunk.

        Args:
            chunk: The text chunk

        Returns:
            The header text when the chunk starts with a header line;
            otherwise the first sentence (text up to and including the first
            '.', '!' or '?'), cut to 50 characters plus "..." when longer;
            otherwise "Untitled Section".
        """
        # Try to find header at the beginning of the chunk
        header_match = re.match(r"^(#{1,6})\s+(.*?)(?:\n|$)", chunk)
        if header_match:
            return header_match.group(2).strip()

        # Try to find the first sentence if no header
        first_sentence_match = re.match(r"^([^\.!?]+[\.!?])", chunk)
        if first_sentence_match:
            title = first_sentence_match.group(1).strip()
            # Truncate if too long
            if len(title) > 50:
                title = title[:50] + "..."
            return title

        return "Untitled Section"