Coverage for src / qdrant_loader / core / chunking / strategy / markdown / splitters / row_kv_excel.py: 95%

115 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1"""Row-level KV chunking for converted xlsx documents. 

2 

3Implements the structure-aware chunking technique from arXiv 2605.00318 

4("Structure-Aware Chunking for Tabular Data in Retrieval-Augmented Generation"): 

5each row becomes a key-value block, prefixed with sheet/subtable/column context. 

6""" 

7 

8from __future__ import annotations 

9 

10import re 

11from collections.abc import Iterable 

12from dataclasses import dataclass 

13 

14import structlog 

15 

16from qdrant_loader.core.chunking.strategy.markdown.splitters.base import BaseSplitter 

17from qdrant_loader.core.file_conversion.xlsx_markdown_format import SHEET_HEADING_RE 

18 

# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)

# Split on `|` only when it is NOT preceded by a backslash (escaped pipe).
# The negative lookbehind inspects a single character, so any pipe directly
# after a backslash is treated as escaped -- including the `|` in `\\|`.
_UNESCAPED_PIPE_RE = re.compile(r"(?<!\\)\|")

# Separator between row blocks within a chunk body — a blank line.
# RowKVChunker uses its length when projecting the size of a chunk.
_ROW_SEP = "\n\n"

26 

27 

@dataclass(frozen=True)
class _SubTableContext:
    """Immutable description of one parsed table section.

    Built by ``MarkdownTableParser.parse`` and read by
    ``RowChunkContextualizer`` when it renders chunk preambles and row blocks.
    """

    # Sheet name taken from the section heading (the parser strips whitespace).
    sheet: str
    # Subtable index from the heading, or None when the heading carries none.
    subtable: int | None
    # Header cells of the table, in column order.
    columns: tuple[str, ...]

33 

34 

class MarkdownTableParser:
    """Parse `## Sheet: ... / Subtable: N` sections back into structured rows.

    The content is cut at every ``SHEET_HEADING_RE`` match; the text between
    one heading and the next (or the end of the content) is scanned for a
    pipe-delimited markdown table.
    """

    def parse(
        self, content: str
    ) -> list[tuple[_SubTableContext, list[dict[str, str]]]]:
        """Return one ``(context, rows)`` pair per heading that owns a table.

        Args:
            content: Converted document text containing sheet headings.

        Returns:
            Sections in document order. Each row is a dict keyed by header
            cell. Headings whose block holds no table are skipped, so the
            result is empty when ``content`` has no usable section.
        """
        sections: list[tuple[_SubTableContext, list[dict[str, str]]]] = []
        matches = list(SHEET_HEADING_RE.finditer(content))
        for i, m in enumerate(matches):
            # A section body runs from the end of its heading to the start of
            # the next heading, or to the end of the content for the last one.
            start = m.end()
            end = matches[i + 1].start() if i + 1 < len(matches) else len(content)
            columns, rows = self._parse_table(content[start:end])
            if not columns:
                continue
            idx = m.group("idx")
            ctx = _SubTableContext(
                sheet=m.group("sheet").strip(),
                subtable=int(idx) if idx else None,
                columns=columns,
            )
            sections.append((ctx, rows))
        return sections

    @staticmethod
    def _parse_table(block: str) -> tuple[tuple[str, ...], list[dict[str, str]]]:
        """Extract header cells and body rows from the table lines of ``block``.

        Only lines that start with ``|`` (after stripping) are considered.
        The first is the header, the second is assumed to be the separator,
        and the rest are body rows.

        Returns:
            ``(columns, rows)``, or ``((), [])`` when fewer than two table
            lines are present. Rows whose cell count differs from the header
            are dropped and reported in a single debug log entry.
        """
        lines = [ln.strip() for ln in block.splitlines() if ln.strip().startswith("|")]
        if len(lines) < 2:
            return (), []
        header_cells = MarkdownTableParser._row_cells(lines[0])
        body: list[dict[str, str]] = []
        dropped = 0
        # lines[1] is the separator (`|---|---|`); skip it.
        for line in lines[2:]:
            cells = MarkdownTableParser._row_cells(line)
            if len(cells) != len(header_cells):
                dropped += 1
                continue
            body.append(dict(zip(header_cells, cells, strict=True)))
        if dropped:
            logger.debug(
                "row_kv_excel: parser dropped rows whose cell count != header count",
                dropped=dropped,
                kept=len(body),
                header_cells=len(header_cells),
            )
        return tuple(header_cells), body

    @staticmethod
    def _trim_edge_pipes(line: str) -> str:
        """Strip whitespace, then remove at most one delimiter pipe per side."""
        inner = line.strip()
        # Remove exactly ONE pipe per side. `str.strip("|")` removes a whole
        # run of pipes, which swallows empty edge cells (`||a|b|`) and eats
        # the `|` of an escaped `\|` that ends the last cell.
        if inner.startswith("|"):
            inner = inner[1:]
        # A trailing `\|` is cell content, not a delimiter, so keep it.
        if inner.endswith("|") and not inner.endswith(r"\|"):
            inner = inner[:-1]
        return inner

    @staticmethod
    def _row_cells(line: str) -> list[str]:
        """Split one table line into stripped, unescaped cell values."""
        # Drop the outer delimiters, then split on UNESCAPED pipes (the
        # converter escapes literal `|` in cell values as `\|` so they don't
        # break this split). After splitting, unescape each cell.
        inner = MarkdownTableParser._trim_edge_pipes(line)
        cells = _UNESCAPED_PIPE_RE.split(inner)
        return [c.strip().replace(r"\|", "|") for c in cells]

90 

91 

class RowChunkContextualizer:
    """Turn table context and rows into the text that gets embedded."""

    def preamble(self, ctx: _SubTableContext) -> str:
        """Return the sheet / subtable / columns header placed atop a chunk.

        The ``Subtable:`` line is emitted only when ``ctx.subtable`` is set.
        """
        header = f"Sheet: {ctx.sheet}"
        if ctx.subtable is not None:
            header += f"\nSubtable: {ctx.subtable}"
        return f"{header}\nColumns: {', '.join(ctx.columns)}"

    def row_block(self, ctx: _SubTableContext, row: dict[str, str]) -> str:
        """Return ``row`` as a ``Row:`` line followed by one line per value.

        Columns are visited in ``ctx.columns`` order; a column that is missing
        from ``row`` or maps to an empty string produces no line. The result
        carries no trailing separator.
        """
        pairs = ((name, row.get(name, "")) for name in ctx.columns)
        rendered = [f" {name}: {cell}" for name, cell in pairs if cell != ""]
        return "\n".join(["Row:", *rendered])

    def build(self, ctx: _SubTableContext, rows: Iterable[dict[str, str]]) -> str:
        """Return the preamble followed by every row block, newline-terminated.

        With no rows the result is just the preamble and a newline.
        """
        head = self.preamble(ctx)
        rendered = [self.row_block(ctx, entry) for entry in rows]
        if rendered:
            return f"{head}\n\n{_ROW_SEP.join(rendered)}\n"
        return f"{head}\n"

120 

121 

class RowKVChunker:
    """Group rendered rows into chunks that respect a character budget.

    Every chunk repeats the section preamble. Each row block is rendered
    once up front; the walk then tracks only the projected chunk length and
    builds the full chunk string when a group is emitted.
    """

    def __init__(self) -> None:
        self._renderer = RowChunkContextualizer()

    def chunk(
        self,
        ctx: _SubTableContext,
        rows: list[dict[str, str]],
        max_size: int,
    ) -> list[str]:
        """Return the chunks for ``rows``, or ``[]`` when there are no rows.

        A row block is never split: a group always holds at least one block,
        so a single block larger than the budget still becomes its own chunk.
        """
        if not rows:
            return []

        renderer = self._renderer
        head = renderer.preamble(ctx)
        rendered = [renderer.row_block(ctx, entry) for entry in rows]

        # Emitted shape: "{head}\n\n" + blocks joined by _ROW_SEP + "\n".
        fixed_cost = len(head) + len("\n\n") + len("\n")
        gap = len(_ROW_SEP)

        def emit(group: list[str]) -> str:
            return f"{head}\n\n{_ROW_SEP.join(group)}\n"

        out: list[str] = []
        pending: list[str] = []
        pending_len = 0
        for block in rendered:
            if not pending:
                # The first block of a group is accepted unconditionally.
                pending.append(block)
                pending_len = len(block)
                continue
            grown = pending_len + gap + len(block)
            if fixed_cost + grown > max_size:
                # Adding this block would exceed the budget: flush and restart.
                out.append(emit(pending))
                pending = [block]
                pending_len = len(block)
            else:
                pending.append(block)
                pending_len = grown
        if pending:
            out.append(emit(pending))
        return out

163 

164 

class RowKVExcelSplitter(BaseSplitter):
    """BaseSplitter that emits row-level KV chunks for converted xlsx content.

    Type-substitutable for the legacy ExcelSplitter at the BaseSplitter
    interface — same `split_content(content, max_size) -> list[str]` signature
    and the same dispatch site at section_splitter.py:104. The output shape is
    materially different, however: ExcelSplitter emits markdown-table fragments,
    while RowKVExcelSplitter emits prose-like context-prefixed KV blocks (the
    STC technique from arXiv 2605.00318). Downstream consumers that re-parsed
    chunks as markdown tables need updating; in-tree consumers (embedding,
    reranking, retrieval) treat chunks as opaque text.
    """

    def __init__(self, settings) -> None:
        super().__init__(settings)
        self._parser = MarkdownTableParser()
        self._chunker = RowKVChunker()

    def split_content(self, content: str, max_size: int) -> list[str]:
        """Split converted xlsx markdown into context-prefixed row chunks.

        Args:
            content: Converted document text.
            max_size: Character budget handed to the row chunker.

        Returns:
            Chunks of all sections in document order, each section truncated
            to the per-section cap. ``[content]`` when no section is parsed.
        """
        sections = self._parser.parse(content)
        if not sections:
            # No structured heading — preserve content as a single chunk so
            # upstream mis-classification doesn't drop content.
            return [content]

        # Match legacy ExcelSplitter: bound per-section output so a 50k-row
        # sheet doesn't silently emit 50k chunks.
        md_cfg = self.settings.global_config.chunking.strategies.markdown
        per_section_cap = min(
            md_cfg.max_chunks_per_section,
            self.settings.global_config.chunking.max_chunks_per_document // 2,
        )

        # NOTE(review): a section with a header but no data rows yields no
        # chunks, so content made only of such sections returns [] here
        # rather than [content] — confirm that is intended.
        chunks: list[str] = []
        for ctx, rows in sections:
            section_chunks = self._chunker.chunk(ctx, rows, max_size=max_size)
            if len(section_chunks) > per_section_cap:
                kept = section_chunks[:per_section_cap]
                logger.warning(
                    "row_kv_excel: section reached max_chunks_per_section cap, truncating",
                    sheet=ctx.sheet,
                    subtable=ctx.subtable,
                    produced=len(section_chunks),
                    cap=per_section_cap,
                    rows_dropped=len(rows) - self._count_row_blocks(kept),
                )
                section_chunks = kept
            chunks.extend(section_chunks)
        return chunks

    @staticmethod
    def _count_row_blocks(chunks: list[str]) -> int:
        """Count the row blocks contained in ``chunks``."""
        # Count only lines that are exactly the "Row:" marker. A substring
        # count also matches a column named "Row" (rendered as " Row: ...")
        # or a value containing "Row:", inflating the kept count and making
        # the reported number of dropped rows wrong, even negative.
        return sum(
            line == "Row:" for chunk in chunks for line in chunk.split("\n")
        )