Coverage for src/qdrant_loader/core/chunking/strategy/markdown/document_parser.py: 99%

115 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-08 06:05 +0000

1"""Document parsing for markdown chunking strategy.""" 

2 

3import re 

4from dataclasses import dataclass, field 

5from enum import Enum 

6from typing import Any, Optional 

7 

8import structlog 

9 

10logger = structlog.get_logger(__name__) 

11 

12 

class SectionType(Enum):
    """Closed set of block kinds a markdown section can be classified as.

    Each member's value is the lowercase string form of that kind.
    """

    # Heading line introduced by one or more '#' characters.
    HEADER = "header"
    # Fenced or indented code.
    CODE_BLOCK = "code_block"
    # Bulleted or numbered list.
    LIST = "list"
    # Pipe-delimited table.
    TABLE = "table"
    # Block quote introduced by '>'.
    QUOTE = "quote"
    # Fallback kind for plain text.
    PARAGRAPH = "paragraph"

22 

23 

@dataclass
class Section:
    """A node in the section tree of a markdown document.

    Attributes:
        content: Raw text of the section.
        level: Nesting level of the section (defaults to 0).
        type: Kind of block this section holds.
        parent: The section this one is attached to, or None for a root.
        children: Sections attached to this one via ``add_child``.
    """

    content: str
    level: int = 0
    type: SectionType = SectionType.PARAGRAPH
    # Excluded from the generated __eq__: a child points at its parent and the
    # parent lists the child, so comparing both directions would recurse
    # without bound when two structurally equal trees are compared.
    parent: Optional["Section"] = field(default=None, compare=False)
    children: list["Section"] = field(default_factory=list)

    def add_child(self, child: "Section") -> None:
        """Attach ``child`` beneath this section.

        Appends the child to ``children`` and points its ``parent`` back at
        this section.

        Args:
            child: The section to attach.
        """
        self.children.append(child)
        child.parent = self

38 

39 

class SectionIdentifier:
    """Identifies section types based on content patterns."""

    @staticmethod
    def identify_section_type(content: str) -> SectionType:
        """Classify a block of markdown text by its markers.

        The checks run in a fixed priority order and the first one that
        matches wins: header, code block, list, table, quote. Anything else,
        including blank or whitespace-only text, is a paragraph.

        Args:
            content: The section content to analyze.

        Returns:
            The SectionType that matches the content.
        """
        if not content.strip():
            return SectionType.PARAGRAPH

        # Header: 1-6 '#' characters followed by whitespace, at the very start.
        if re.match(r"^#{1,6}\s+", content):
            return SectionType.HEADER

        # Code: a ``` or ~~~ fence opening any line, or content that itself
        # starts with a tab or at least four spaces of indentation.
        fence_found = re.search(r"^(?:```|~~~)", content, re.MULTILINE)
        if fence_found or content.startswith(("    ", "\t")):
            return SectionType.CODE_BLOCK

        # List: a '-', '*' or '+' bullet, or a number followed by '.' or ')'.
        if re.match(r"^(?:[*+-]|\d+[.)])\s+", content):
            return SectionType.LIST

        # Table: a line opening with '|', two pipes on one line, or a
        # '---|---' style delimiter row.
        table_patterns = (
            (r"^\|", re.MULTILINE),
            (r"\|.*\|", 0),
            (r"^\s*[-:]+\s*\|\s*[-:]+", re.MULTILINE),
        )
        if any(re.search(pattern, content, flags) for pattern, flags in table_patterns):
            return SectionType.TABLE

        # Quote: '>' followed by whitespace, or a bare '>' ending the content.
        if re.match(r"^>(?:\s|$)", content):
            return SectionType.QUOTE

        return SectionType.PARAGRAPH

86 

87 

class HierarchyBuilder:
    """Builds hierarchical section relationships."""

    # A '#'-prefixed heading at the start of the text; group 2 is the title.
    _HEADER_RE = re.compile(r"^(#+)\s+(.*?)(?:\n|$)")

    @staticmethod
    def build_section_breadcrumb(section: "Section") -> str:
        """Build a breadcrumb path of section titles to capture hierarchy.

        Titles are taken from the section itself and from every ancestor
        reached through ``parent``. A node whose content does not start with
        a heading contributes nothing to the path.

        Args:
            section: The section to build the breadcrumb for.

        Returns:
            Titles from the outermost ancestor down to ``section``, joined
            with " > ". Empty string when no heading is found.
        """
        titles: list[str] = []
        node = section

        # Collect innermost-first while walking up, then flip once at the end.
        while node is not None:
            heading = HierarchyBuilder._HEADER_RE.match(node.content)
            if heading:
                titles.append(heading.group(2).strip())
            node = node.parent

        titles.reverse()
        return " > ".join(titles)

    @staticmethod
    def get_section_path(
        header_item: dict[str, Any], structure: list[dict[str, Any]]
    ) -> list[str]:
        """Get the path of parent headers for a section.

        Args:
            header_item: The header item; must carry a "level" key.
            structure: The document structure containing ``header_item``.

        Returns:
            Titles of the enclosing headers, outermost first.

        Raises:
            ValueError: If ``header_item`` is not present in ``structure``.
        """
        # Prefer an identity match: two headers with the same text and level
        # compare equal as dicts, and an equality search would resolve the
        # later one to the earlier one's position.
        position = next(
            (i for i, item in enumerate(structure) if item is header_item),
            None,
        )
        if position is None:
            # Caller passed an equal copy rather than the stored dict.
            position = structure.index(header_item)

        path: list[str] = []
        current_level = header_item["level"]

        # Scan backward; each strictly shallower header is the next ancestor.
        for item in reversed(structure[:position]):
            if item["type"] == "header" and item["level"] < current_level:
                path.append(item["title"])
                current_level = item["level"]

        path.reverse()
        return path

143 

144 

class DocumentParser:
    """Parses markdown documents into structured representations."""

    # A code fence at the start of a line: three or more backticks or tildes.
    _FENCE_RE = re.compile(r"^(`{3,}|~{3,})")
    # A heading on a single line; group 1 is the '#' run, group 2 the title.
    _HEADER_LINE_RE = re.compile(r"^(#{1,6})\s+(.*?)$")

    def __init__(self):
        """Initialize the document parser with its helper components."""
        self.section_identifier = SectionIdentifier()
        self.hierarchy_builder = HierarchyBuilder()

    def parse_document_structure(self, text: str) -> list[dict[str, Any]]:
        """Parse document into a structured representation.

        The text is split on newlines and scanned once. Heading lines become
        "header" elements; every run of lines between headings becomes one
        "content" element. Lines inside a fenced code block are never treated
        as headings.

        Args:
            text: The document text.

        Returns:
            List of dictionaries in document order. Header elements carry
            "type", "text", "level" and "title"; content elements carry
            "type", "text" and "level" (always 0).
        """
        elements: list[dict[str, Any]] = []
        current_block: list[str] = []
        # Character and length of the fence that opened the current code
        # block; fence_char is empty while outside a code block.
        fence_char = ""
        fence_len = 0

        for line in text.split("\n"):
            fence = self._FENCE_RE.match(line)

            if fence_char:
                # Only a fence of the same character, at least as long as the
                # opener, ends the block. This keeps '~~~' blocks intact and
                # lets a longer fence wrap shorter ones.
                if (
                    fence
                    and fence.group(1)[0] == fence_char
                    and len(fence.group(1)) >= fence_len
                ):
                    fence_char = ""
                current_block.append(line)
                continue

            if fence:
                fence_char = fence.group(1)[0]
                fence_len = len(fence.group(1))
                current_block.append(line)
                continue

            header_match = self._HEADER_LINE_RE.match(line)
            if header_match is None:
                current_block.append(line)
                continue

            # A heading closes whatever content block was being accumulated.
            if current_block:
                elements.append(
                    {
                        "type": "content",
                        "text": "\n".join(current_block),
                        "level": 0,
                    }
                )
                current_block = []

            elements.append(
                {
                    "type": "header",
                    "text": line,
                    "level": len(header_match.group(1)),
                    "title": header_match.group(2).strip(),
                }
            )

        # Save the trailing block, including an unterminated code block.
        if current_block:
            elements.append(
                {"type": "content", "text": "\n".join(current_block), "level": 0}
            )

        return elements

    def extract_section_metadata(self, section: "Section") -> dict[str, Any]:
        """Extract metadata from a section.

        Args:
            section: The section to analyze.

        Returns:
            Dictionary with the section's type value, level, word and
            character counts, flags for code fences, links and images, and a
            top-level flag. "parent_title" and "parent_level" are added when
            the parent's content starts with a heading; "breadcrumb" is added
            when the hierarchy builder returns a non-empty path.
        """
        body = section.content
        metadata: dict[str, Any] = {
            "type": section.type.value,
            "level": section.level,
            "word_count": len(body.split()),
            "char_count": len(body),
            "has_code": "```" in body,
            "has_links": bool(re.search(r"\[.*?\]\(.*?\)", body)),
            "has_images": bool(re.search(r"!\[.*?\]\(.*?\)", body)),
            "is_top_level": section.level <= 2,  # Mark top-level sections
        }

        # Add parent section info if available
        parent = section.parent
        if parent:
            parent_heading = re.match(r"^(#+)\s+(.*?)(?:\n|$)", parent.content)
            if parent_heading:
                metadata["parent_title"] = parent_heading.group(2).strip()
                metadata["parent_level"] = parent.level

        # Add breadcrumb path for hierarchical context
        breadcrumb = self.hierarchy_builder.build_section_breadcrumb(section)
        if breadcrumb:
            metadata["breadcrumb"] = breadcrumb

        return metadata

    def extract_section_title(self, chunk: str) -> str:
        """Extract section title from a chunk.

        Args:
            chunk: The text chunk.

        Returns:
            The heading text when the chunk starts with a heading; otherwise
            the text up to the first '.', '!' or '?', cut to 50 characters
            plus "..." when longer; otherwise "Untitled Section".
        """
        heading = re.match(r"^(#{1,6})\s+(.*?)(?:\n|$)", chunk)
        if heading:
            return heading.group(2).strip()

        # No heading: fall back to the first sentence of the chunk.
        sentence = re.match(r"^([^\.!?]+[\.!?])", chunk)
        if sentence is None:
            return "Untitled Section"

        title = sentence.group(1).strip()
        if len(title) > 50:
            title = title[:50] + "..."
        return title