Coverage for src / qdrant_loader / core / file_conversion / sub_table_detector.py: 98%

63 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-06-11 09:38 +0000

1"""Connected-components sub-table detector for spreadsheet sheets. 

2 

3Algorithm reference: https://github.com/Unstructured-IO/unstructured/blob/main/unstructured/partition/xlsx.py 

4""" 

5 

6from __future__ import annotations 

7 

8from dataclasses import dataclass 

9 

10import networkx as nx 

11import pandas as pd 

12 

13 

@dataclass(frozen=True)
class _BoundingBox:
    """Immutable rectangle of sheet cells described by its row/column extremes."""

    # Smallest and largest row position covered by the rectangle.
    row_min: int
    row_max: int
    # Smallest and largest column position covered by the rectangle.
    col_min: int
    col_max: int

    def overlaps_row_wise(self, other: _BoundingBox) -> bool:
        """Return True when the row ranges of ``self`` and ``other`` intersect.

        Only rows are compared; columns are ignored. Ranges that merely
        touch (one box's ``row_max`` equals the other's ``row_min``) count
        as overlapping.
        """
        starts_before_other_ends = self.row_min <= other.row_max
        other_starts_before_self_ends = other.row_min <= self.row_max
        return starts_before_other_ends and other_starts_before_self_ends

23 

24 

def _box_width(box: _BoundingBox) -> int:
    """Return how many columns ``box`` spans, counting both edge columns."""
    column_span = box.col_max - box.col_min
    return column_span + 1

27 

28 

class SubTableDetector:
    """Split a header-less sheet DataFrame into its logical sub-tables.

    Non-empty cells are grouped into 4-connected components; each component
    is reduced to a bounding box, row-overlapping boxes are merged when one
    of them is a single column wide, and every resulting box is cut out of
    the sheet as its own DataFrame.
    """

    def detect(self, sheet: pd.DataFrame) -> list[pd.DataFrame]:
        """Return the sub-tables found in ``sheet``.

        Args:
            sheet: The sheet to analyse; cells are addressed by position.

        Returns:
            One DataFrame per detected region, with all-NaN rows and columns
            removed and both axes renumbered from zero. An empty or all-NaN
            sheet yields an empty list.
        """
        if sheet.empty or sheet.isna().all().all():
            return []

        cell_graph = self._build_cell_graph(sheet)
        regions = self._merge_row_overlapping(
            [
                self._bounding_box(cells)
                for cells in nx.connected_components(cell_graph)
                if cells
            ]
        )

        found: list[pd.DataFrame] = []
        for region in regions:
            # Box bounds are inclusive, hence the +1 on the slice ends.
            row_span = slice(region.row_min, region.row_max + 1)
            col_span = slice(region.col_min, region.col_max + 1)
            block = (
                sheet.iloc[row_span, col_span]
                .dropna(axis=0, how="all")
                .dropna(axis=1, how="all")
                .reset_index(drop=True)
            )
            block.columns = range(block.shape[1])
            if block.empty:
                continue
            found.append(block)
        return found

    @staticmethod
    def _build_cell_graph(sheet: pd.DataFrame) -> nx.Graph:
        """Build a graph with one node per non-NaN cell of ``sheet``.

        Nodes are ``(row, col)`` position tuples. An edge joins two nodes
        when the cells are direct vertical or horizontal neighbours.
        """
        filled = sheet.notna().to_numpy()
        graph = nx.Graph()
        for r, row in enumerate(filled):
            for c, is_filled in enumerate(row):
                if not is_filled:
                    continue
                cell = (r, c)
                graph.add_node(cell)
                # Cells are visited in row-major order, so linking each cell
                # to its upper and left neighbours covers every 4-connected
                # adjacency exactly once.
                if r > 0 and filled[r - 1, c]:
                    graph.add_edge(cell, (r - 1, c))
                if c > 0 and row[c - 1]:
                    graph.add_edge(cell, (r, c - 1))
        return graph

    @staticmethod
    def _bounding_box(component: set[tuple[int, int]]) -> _BoundingBox:
        """Return the smallest box enclosing every ``(row, col)`` in ``component``."""
        row_ids = {r for r, _ in component}
        col_ids = {c for _, c in component}
        return _BoundingBox(
            row_min=min(row_ids),
            row_max=max(row_ids),
            col_min=min(col_ids),
            col_max=max(col_ids),
        )

    @staticmethod
    def _merge_row_overlapping(boxes: list[_BoundingBox]) -> list[_BoundingBox]:
        """Fuse a one-column box with a box whose rows it overlaps.

        This targets the "main table plus notes column" layout. Two wider
        tables placed side by side (e.g. ``[A,B] | blank | [C,D]``) are left
        separate on purpose, because neither of them is a single column wide.

        Boxes are processed in ``(row_min, col_min)`` order and each one is
        compared only with the most recently kept box.

        Args:
            boxes: Bounding boxes to consolidate; the list is not modified.

        Returns:
            The consolidated boxes in processing order.
        """
        if not boxes:
            return []

        pending = iter(sorted(boxes, key=lambda b: (b.row_min, b.col_min)))
        result: list[_BoundingBox] = [next(pending)]
        for candidate in pending:
            current = result[-1]
            has_single_column = 1 in (_box_width(candidate), _box_width(current))
            if not (has_single_column and candidate.overlaps_row_wise(current)):
                result.append(candidate)
                continue
            # Replace the kept box with the union of the two rectangles.
            result[-1] = _BoundingBox(
                row_min=min(current.row_min, candidate.row_min),
                row_max=max(current.row_max, candidate.row_max),
                col_min=min(current.col_min, candidate.col_min),
                col_max=max(current.col_max, candidate.col_max),
            )
        return result

110 return merged