Coverage for src / qdrant_loader / core / chunking / strategy / markdown / splitters / row_kv_excel.py: 95%
115 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1"""Row-level KV chunking for converted xlsx documents.
3Implements the structure-aware chunking technique from arXiv 2605.00318
4("Structure-Aware Chunking for Tabular Data in Retrieval-Augmented Generation"):
5each row becomes a key-value block, prefixed with sheet/subtable/column context.
6"""
8from __future__ import annotations
10import re
11from collections.abc import Iterable
12from dataclasses import dataclass
14import structlog
16from qdrant_loader.core.chunking.strategy.markdown.splitters.base import BaseSplitter
17from qdrant_loader.core.file_conversion.xlsx_markdown_format import SHEET_HEADING_RE
# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)

# Split on `|` only when it is NOT preceded by a backslash (escaped pipe).
# The negative lookbehind leaves `\|` sequences inside a cell intact so they
# can be unescaped after the split.
_UNESCAPED_PIPE_RE = re.compile(r"(?<!\\)\|")

# Separator between row blocks within a chunk body — a blank line.
_ROW_SEP = "\n\n"
@dataclass(frozen=True)
class _SubTableContext:
    """Immutable description of the table a group of rows belongs to.

    The parser in this module creates one instance per parsed section, and the
    renderer reads it to build the context preamble placed before row blocks.
    """

    # Sheet name captured from the section heading.
    sheet: str
    # Subtable index captured from the heading; None when the heading has none.
    subtable: int | None
    # Header cell texts of the table, in their original order.
    columns: tuple[str, ...]
35class MarkdownTableParser:
36 """Parse `## Sheet: ... / Subtable: N` sections back into structured rows."""
38 def parse(
39 self, content: str
40 ) -> list[tuple[_SubTableContext, list[dict[str, str]]]]:
41 sections: list[tuple[_SubTableContext, list[dict[str, str]]]] = []
42 matches = list(SHEET_HEADING_RE.finditer(content))
43 for i, m in enumerate(matches):
44 start = m.end()
45 end = matches[i + 1].start() if i + 1 < len(matches) else len(content)
46 block = content[start:end]
47 columns, rows = self._parse_table(block)
48 if not columns:
49 continue
50 ctx = _SubTableContext(
51 sheet=m.group("sheet").strip(),
52 subtable=int(m.group("idx")) if m.group("idx") else None,
53 columns=columns,
54 )
55 sections.append((ctx, rows))
56 return sections
58 @staticmethod
59 def _parse_table(block: str) -> tuple[tuple[str, ...], list[dict[str, str]]]:
60 lines = [ln.strip() for ln in block.splitlines() if ln.strip().startswith("|")]
61 if len(lines) < 2:
62 return (), []
63 header_cells = MarkdownTableParser._row_cells(lines[0])
64 body: list[dict[str, str]] = []
65 dropped = 0
66 # lines[1] is the separator (`|---|---|`); skip it.
67 for line in lines[2:]:
68 cells = MarkdownTableParser._row_cells(line)
69 if len(cells) != len(header_cells):
70 dropped += 1
71 continue
72 body.append(dict(zip(header_cells, cells, strict=True)))
73 if dropped:
74 logger.debug(
75 "row_kv_excel: parser dropped rows whose cell count != header count",
76 dropped=dropped,
77 kept=len(body),
78 header_cells=len(header_cells),
79 )
80 return tuple(header_cells), body
82 @staticmethod
83 def _row_cells(line: str) -> list[str]:
84 # Strip the leading and trailing pipe before splitting on UNESCAPED
85 # pipes (the converter escapes literal `|` in cell values as `\|` so
86 # they don't break this split). After splitting, unescape each cell.
87 inner = line.strip().strip("|")
88 cells = _UNESCAPED_PIPE_RE.split(inner)
89 return [c.strip().replace(r"\|", "|") for c in cells]
class RowChunkContextualizer:
    """Turn table context and rows into the text form used for chunks."""

    def preamble(self, ctx: _SubTableContext) -> str:
        """Return the sheet / subtable / columns header lines for *ctx*.

        The ``Subtable:`` line appears only when ``ctx.subtable`` is not None.
        """
        subtable_part = [] if ctx.subtable is None else [f"Subtable: {ctx.subtable}"]
        return "\n".join(
            [
                f"Sheet: {ctx.sheet}",
                *subtable_part,
                f"Columns: {', '.join(ctx.columns)}",
            ]
        )

    def row_block(self, ctx: _SubTableContext, row: dict[str, str]) -> str:
        """Return *row* as a ``Row:`` line followed by one line per filled cell.

        Cells follow ``ctx.columns`` order. Columns that are missing from
        *row* or hold an empty string are left out. No trailing separator.
        """
        filled = (
            f" {name}: {cell}"
            for name in ctx.columns
            if (cell := row.get(name, "")) != ""
        )
        return "\n".join(["Row:", *filled])

    def build(self, ctx: _SubTableContext, rows: Iterable[dict[str, str]]) -> str:
        """Return the preamble followed by every row block, newline-terminated.

        With no rows the result is just the preamble and a newline.
        """
        head = self.preamble(ctx)
        rendered = [self.row_block(ctx, entry) for entry in rows]
        # A blank line separates the preamble from the first row block.
        body = f"\n\n{_ROW_SEP.join(rendered)}" if rendered else ""
        return f"{head}{body}\n"
class RowKVChunker:
    """Group rendered rows into size-bounded chunks, each with its own preamble.

    Every row block is rendered exactly once. The loop then only tracks the
    length the pending chunk would have, and the chunk string is built when
    the chunk is emitted rather than on every size check.
    """

    def __init__(self) -> None:
        self._renderer = RowChunkContextualizer()

    def chunk(
        self,
        ctx: _SubTableContext,
        rows: list[dict[str, str]],
        max_size: int,
    ) -> list[str]:
        """Return chunk strings for *rows*, each aiming to stay within *max_size*.

        Args:
            ctx: Table context rendered as the preamble of every chunk.
            rows: Rows to pack, kept in order.
            max_size: Character budget per chunk. A single row block that
                exceeds it on its own is still emitted as one chunk.

        Returns:
            Chunks shaped ``"{preamble}\\n\\n{block}{sep}{block}...\\n"``; an
            empty list when *rows* is empty.
        """
        if not rows:
            return []

        head = self._renderer.preamble(ctx)
        rendered = [self._renderer.row_block(ctx, row) for row in rows]

        # Characters every chunk spends regardless of its rows: the preamble,
        # the blank line after it and the final newline.
        fixed_cost = len(head) + len("\n\n") + len("\n")
        joiner_cost = len(_ROW_SEP)

        def assemble(members: list[str]) -> str:
            return f"{head}\n\n{_ROW_SEP.join(members)}\n"

        emitted: list[str] = []
        pending: list[str] = []
        pending_len = 0
        for piece in rendered:
            if not pending:
                # The first block of a chunk is always accepted, with no
                # separator in front of it.
                pending.append(piece)
                pending_len = len(piece)
                continue
            grown_len = pending_len + joiner_cost + len(piece)
            if fixed_cost + grown_len > max_size:
                emitted.append(assemble(pending))
                pending, pending_len = [piece], len(piece)
            else:
                pending.append(piece)
                pending_len = grown_len
        # rows is non-empty here, so at least one block is still pending.
        emitted.append(assemble(pending))
        return emitted
class RowKVExcelSplitter(BaseSplitter):
    """BaseSplitter that emits row-level KV chunks for converted xlsx content.

    Type-substitutable for the legacy ExcelSplitter at the BaseSplitter
    interface — same `split_content(content, max_size) -> list[str]` signature
    and the same dispatch site at section_splitter.py:104. The output shape is
    materially different, however: ExcelSplitter emits markdown-table fragments,
    while RowKVExcelSplitter emits prose-like context-prefixed KV blocks (the
    STC technique from arXiv 2605.00318). Downstream consumers that re-parsed
    chunks as markdown tables need updating; in-tree consumers (embedding,
    reranking, retrieval) treat chunks as opaque text.
    """

    def __init__(self, settings) -> None:
        """Create the splitter with its own parser and chunker.

        Args:
            settings: Forwarded to ``BaseSplitter.__init__``. ``split_content``
                later reads the chunk caps from ``self.settings.global_config``
                (presumably stored by the base class — confirm there).
        """
        super().__init__(settings)
        self._parser = MarkdownTableParser()
        self._chunker = RowKVChunker()

    def split_content(self, content: str, max_size: int) -> list[str]:
        """Split converted xlsx markdown into context-prefixed row chunks.

        Args:
            content: Markdown text to split.
            max_size: Character budget handed to the chunker for each chunk.

        Returns:
            The chunks of every parsed section, in order, each section
            truncated to the per-section cap. When no section can be parsed,
            ``[content]`` is returned unchanged.
        """
        sections = self._parser.parse(content)
        if not sections:
            # No structured heading — preserve content as a single chunk so
            # upstream mis-classification doesn't drop content.
            return [content]

        # Match legacy ExcelSplitter: bound per-section output so a 50k-row
        # sheet doesn't silently emit 50k chunks.
        md_cfg = self.settings.global_config.chunking.strategies.markdown
        per_section_cap = min(
            md_cfg.max_chunks_per_section,
            self.settings.global_config.chunking.max_chunks_per_document // 2,
        )

        chunks: list[str] = []
        for ctx, rows in sections:
            section_chunks = self._chunker.chunk(ctx, rows, max_size=max_size)
            if len(section_chunks) > per_section_cap:
                kept = section_chunks[:per_section_cap]
                # Count only lines that are exactly the row marker. A plain
                # substring count would also match a column named "Row"
                # (rendered as " Row: value") or a cell value containing
                # "Row:", inflating the kept count.
                rows_kept = sum(
                    line == "Row:" for chunk in kept for line in chunk.split("\n")
                )
                logger.warning(
                    "row_kv_excel: section reached max_chunks_per_section cap, truncating",
                    sheet=ctx.sheet,
                    subtable=ctx.subtable,
                    produced=len(section_chunks),
                    cap=per_section_cap,
                    rows_dropped=len(rows) - rows_kept,
                )
                section_chunks = kept
            chunks.extend(section_chunks)
        return chunks