Coverage for src / qdrant_loader / core / file_conversion / sub_table_detector.py: 98%
63 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-06-11 09:38 +0000
1"""Connected-components sub-table detector for spreadsheet sheets.
3Algorithm reference: https://github.com/Unstructured-IO/unstructured/blob/main/unstructured/partition/xlsx.py
4"""
6from __future__ import annotations
8from dataclasses import dataclass
10import networkx as nx
11import pandas as pd
@dataclass(frozen=True)
class _BoundingBox:
    """Immutable rectangle of cell positions; all four bounds are inclusive."""

    row_min: int  # first row covered by the box
    row_max: int  # last row covered by the box (inclusive)
    col_min: int  # first column covered by the box
    col_max: int  # last column covered by the box (inclusive)

    def overlaps_row_wise(self, other: _BoundingBox) -> bool:
        """Return True when the two boxes have at least one row in common.

        Only the row ranges are compared; column positions are ignored.
        Boxes that merely touch on a single shared row count as overlapping.
        """
        starts_before_other_ends = self.row_min <= other.row_max
        ends_after_other_starts = other.row_min <= self.row_max
        return starts_before_other_ends and ends_after_other_starts
def _box_width(box: _BoundingBox) -> int:
    """Return the number of columns *box* spans, counting both end columns."""
    first_col, last_col = box.col_min, box.col_max
    return last_col - first_col + 1
class SubTableDetector:
    """Split a single header-less sheet DataFrame into its logical sub-tables.

    Non-empty cells are grouped into 4-connected components, each component
    is reduced to its bounding box, and boxes sharing rows with a
    one-column-wide neighbour are merged before slicing the sheet.
    """

    def detect(self, sheet: pd.DataFrame) -> list[pd.DataFrame]:
        """Return one DataFrame per detected sub-table, without NaN padding.

        Args:
            sheet: The sheet contents; cells are addressed by position.

        Returns:
            The sub-tables in merged-box order, each with a fresh 0-based
            row index and 0-based integer column labels. An empty list is
            returned when the sheet is empty or holds only NaN values.
        """
        if sheet.empty or sheet.isna().all().all():
            return []

        cell_graph = self._build_cell_graph(sheet)
        raw_boxes = [
            self._bounding_box(cells)
            for cells in nx.connected_components(cell_graph)
            if cells
        ]

        found: list[pd.DataFrame] = []
        for region in self._merge_row_overlapping(raw_boxes):
            row_span = slice(region.row_min, region.row_max + 1)
            col_span = slice(region.col_min, region.col_max + 1)
            # Drop all-NaN rows first, then all-NaN columns, then renumber.
            trimmed = (
                sheet.iloc[row_span, col_span]
                .dropna(axis=0, how="all")
                .dropna(axis=1, how="all")
                .reset_index(drop=True)
            )
            trimmed.columns = range(trimmed.shape[1])
            if trimmed.empty:
                continue
            found.append(trimmed)
        return found

    @staticmethod
    def _build_cell_graph(sheet: pd.DataFrame) -> nx.Graph:
        """Build a graph whose nodes are the ``(row, col)`` of non-NaN cells.

        Edges join cells that are direct vertical or horizontal neighbours.
        """
        filled = sheet.notna().to_numpy()
        row_count, col_count = sheet.shape
        cell_graph = nx.Graph()
        for row in range(row_count):
            for col in range(col_count):
                if not filled[row, col]:
                    continue
                here = (row, col)
                cell_graph.add_node(here)
                # 4-connectivity: linking only to the cell above and the cell
                # to the left is enough, because the cells below and to the
                # right add their own link back when the scan reaches them.
                if row > 0 and filled[row - 1, col]:
                    cell_graph.add_edge(here, (row - 1, col))
                if col > 0 and filled[row, col - 1]:
                    cell_graph.add_edge(here, (row, col - 1))
        return cell_graph

    @staticmethod
    def _bounding_box(component: set[tuple[int, int]]) -> _BoundingBox:
        """Return the smallest box containing every ``(row, col)`` in *component*."""
        row_ids = {row for row, _ in component}
        col_ids = {col for _, col in component}
        return _BoundingBox(
            row_min=min(row_ids),
            row_max=max(row_ids),
            col_min=min(col_ids),
            col_max=max(col_ids),
        )

    @staticmethod
    def _merge_row_overlapping(boxes: list[_BoundingBox]) -> list[_BoundingBox]:
        """Merge row-sharing boxes when one of the pair is a single column wide.

        Two wider tables placed side by side (e.g. ``[A,B] | blank | [C,D]``)
        are deliberately kept apart because they are independent logical
        tables. Merging is limited to the "main table + adjacent notes
        column" pattern, i.e. one of the two boxes is exactly one column wide.

        Boxes are processed in ``(row_min, col_min)`` order and each one is
        compared only against the most recently kept (possibly merged) box.
        """
        if not boxes:
            return []

        first, *remaining = sorted(boxes, key=lambda b: (b.row_min, b.col_min))
        result: list[_BoundingBox] = [first]
        for candidate in remaining:
            current = result[-1]
            one_is_narrow = 1 in (_box_width(candidate), _box_width(current))
            if not (one_is_narrow and candidate.overlaps_row_wise(current)):
                result.append(candidate)
                continue
            # Replace the current box with the union of both rectangles.
            result[-1] = _BoundingBox(
                row_min=min(current.row_min, candidate.row_min),
                row_max=max(current.row_max, candidate.row_max),
                col_min=min(current.col_min, candidate.col_min),
                col_max=max(current.col_max, candidate.col_max),
            )
        return result