table_merge.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. # Copyright (c) Opendatalab. All rights reserved.
  2. from loguru import logger
  3. from bs4 import BeautifulSoup
  4. from mineru.utils.enum_class import BlockType, SplitFlag
  5. def full_to_half(text: str) -> str:
  6. """Convert full-width characters to half-width characters using code point manipulation.
  7. Args:
  8. text: String containing full-width characters
  9. Returns:
  10. String with full-width characters converted to half-width
  11. """
  12. result = []
  13. for char in text:
  14. code = ord(char)
  15. # Full-width letters, numbers and punctuation (FF01-FF5E)
  16. if 0xFF01 <= code <= 0xFF5E:
  17. result.append(chr(code - 0xFEE0)) # Shift to ASCII range
  18. else:
  19. result.append(char)
  20. return ''.join(result)
  21. def calculate_table_total_columns(soup):
  22. """计算表格的总列数,通过分析整个表格结构来处理rowspan和colspan
  23. Args:
  24. soup: BeautifulSoup解析的表格
  25. Returns:
  26. int: 表格的总列数
  27. """
  28. rows = soup.find_all("tr")
  29. if not rows:
  30. return 0
  31. # 创建一个矩阵来跟踪每个位置的占用情况
  32. max_cols = 0
  33. occupied = {} # {row_idx: {col_idx: True}}
  34. for row_idx, row in enumerate(rows):
  35. col_idx = 0
  36. cells = row.find_all(["td", "th"])
  37. if row_idx not in occupied:
  38. occupied[row_idx] = {}
  39. for cell in cells:
  40. # 找到下一个未被占用的列位置
  41. while col_idx in occupied[row_idx]:
  42. col_idx += 1
  43. colspan = int(cell.get("colspan", 1))
  44. rowspan = int(cell.get("rowspan", 1))
  45. # 标记被这个单元格占用的所有位置
  46. for r in range(row_idx, row_idx + rowspan):
  47. if r not in occupied:
  48. occupied[r] = {}
  49. for c in range(col_idx, col_idx + colspan):
  50. occupied[r][c] = True
  51. col_idx += colspan
  52. max_cols = max(max_cols, col_idx)
  53. return max_cols
  54. def calculate_row_columns(row):
  55. """
  56. 计算表格行的实际列数,考虑colspan属性
  57. Args:
  58. row: BeautifulSoup的tr元素对象
  59. Returns:
  60. int: 行的实际列数
  61. """
  62. cells = row.find_all(["td", "th"])
  63. column_count = 0
  64. for cell in cells:
  65. colspan = int(cell.get("colspan", 1))
  66. column_count += colspan
  67. return column_count
  68. def calculate_visual_columns(row):
  69. """
  70. 计算表格行的视觉列数(实际td/th单元格数量,不考虑colspan)
  71. Args:
  72. row: BeautifulSoup的tr元素对象
  73. Returns:
  74. int: 行的视觉列数(实际单元格数)
  75. """
  76. cells = row.find_all(["td", "th"])
  77. return len(cells)
  78. def detect_table_headers(soup1, soup2, max_header_rows=5):
  79. """
  80. 检测并比较两个表格的表头
  81. Args:
  82. soup1: 第一个表格的BeautifulSoup对象
  83. soup2: 第二个表格的BeautifulSoup对象
  84. max_header_rows: 最大可能的表头行数
  85. Returns:
  86. tuple: (表头行数, 表头是否一致, 表头文本列表)
  87. """
  88. rows1 = soup1.find_all("tr")
  89. rows2 = soup2.find_all("tr")
  90. min_rows = min(len(rows1), len(rows2), max_header_rows)
  91. header_rows = 0
  92. headers_match = True
  93. header_texts = []
  94. for i in range(min_rows):
  95. # 提取当前行的所有单元格
  96. cells1 = rows1[i].find_all(["td", "th"])
  97. cells2 = rows2[i].find_all(["td", "th"])
  98. # 检查两行的结构和内容是否一致
  99. structure_match = True
  100. # 首先检查单元格数量
  101. if len(cells1) != len(cells2):
  102. structure_match = False
  103. else:
  104. # 然后检查单元格的属性和内容
  105. for cell1, cell2 in zip(cells1, cells2):
  106. colspan1 = int(cell1.get("colspan", 1))
  107. rowspan1 = int(cell1.get("rowspan", 1))
  108. colspan2 = int(cell2.get("colspan", 1))
  109. rowspan2 = int(cell2.get("rowspan", 1))
  110. text1 = full_to_half(cell1.get_text().strip())
  111. text2 = full_to_half(cell2.get_text().strip())
  112. if colspan1 != colspan2 or rowspan1 != rowspan2 or text1 != text2:
  113. structure_match = False
  114. break
  115. if structure_match:
  116. header_rows += 1
  117. row_texts = [full_to_half(cell.get_text().strip()) for cell in cells1]
  118. header_texts.append(row_texts) # 添加表头文本
  119. else:
  120. headers_match = header_rows > 0 # 只有当至少匹配了一行时,才认为表头匹配
  121. break
  122. # 如果没有找到匹配的表头行,则返回失败
  123. if header_rows == 0:
  124. headers_match = False
  125. return header_rows, headers_match, header_texts
  126. def can_merge_tables(current_table_block, previous_table_block):
  127. """判断两个表格是否可以合并"""
  128. # 检查表格是否有caption和footnote
  129. if any(block["type"] == BlockType.TABLE_CAPTION for block in current_table_block["blocks"]):
  130. return False, None, None, None, None
  131. if any(block["type"] == BlockType.TABLE_FOOTNOTE for block in previous_table_block["blocks"]):
  132. return False, None, None, None, None
  133. # 获取两个表格的HTML内容
  134. current_html = ""
  135. previous_html = ""
  136. for block in current_table_block["blocks"]:
  137. if (block["type"] == BlockType.TABLE_BODY and block["lines"] and block["lines"][0]["spans"]):
  138. current_html = block["lines"][0]["spans"][0].get("html", "")
  139. for block in previous_table_block["blocks"]:
  140. if (block["type"] == BlockType.TABLE_BODY and block["lines"] and block["lines"][0]["spans"]):
  141. previous_html = block["lines"][0]["spans"][0].get("html", "")
  142. if not current_html or not previous_html:
  143. return False, None, None, None, None
  144. # 检查表格宽度差异
  145. x0_t1, y0_t1, x1_t1, y1_t1 = current_table_block["bbox"]
  146. x0_t2, y0_t2, x1_t2, y1_t2 = previous_table_block["bbox"]
  147. table1_width = x1_t1 - x0_t1
  148. table2_width = x1_t2 - x0_t2
  149. if abs(table1_width - table2_width) / min(table1_width, table2_width) >= 0.1:
  150. return False, None, None, None, None
  151. # 解析HTML并检查表格结构
  152. soup1 = BeautifulSoup(previous_html, "html.parser")
  153. soup2 = BeautifulSoup(current_html, "html.parser")
  154. # 检查整体列数匹配
  155. table_cols1 = calculate_table_total_columns(soup1)
  156. table_cols2 = calculate_table_total_columns(soup2)
  157. # logger.debug(f"Table columns - Previous: {table_cols1}, Current: {table_cols2}")
  158. tables_match = table_cols1 == table_cols2
  159. # 检查首末行列数匹配
  160. rows_match = check_rows_match(soup1, soup2)
  161. return (tables_match or rows_match), soup1, soup2, current_html, previous_html
  162. def check_rows_match(soup1, soup2):
  163. """检查表格行是否匹配"""
  164. rows1 = soup1.find_all("tr")
  165. rows2 = soup2.find_all("tr")
  166. if not (rows1 and rows2):
  167. return False
  168. # 获取第一个表的最后一行数据行
  169. last_row = None
  170. for row in reversed(rows1):
  171. if row.find_all(["td", "th"]):
  172. last_row = row
  173. break
  174. # 检测表头行数,以便获取第二个表的首个数据行
  175. header_count, _, _ = detect_table_headers(soup1, soup2)
  176. # 获取第二个表的首个数据行
  177. first_data_row = None
  178. if len(rows2) > header_count:
  179. first_data_row = rows2[header_count] # 第一个非表头行
  180. if not (last_row and first_data_row):
  181. return False
  182. # 计算实际列数(考虑colspan)和视觉列数
  183. last_row_cols = calculate_row_columns(last_row)
  184. first_row_cols = calculate_row_columns(first_data_row)
  185. last_row_visual_cols = calculate_visual_columns(last_row)
  186. first_row_visual_cols = calculate_visual_columns(first_data_row)
  187. # logger.debug(f"行列数 - 前表最后一行: {last_row_cols}(视觉列数:{last_row_visual_cols}), 当前表首行: {first_row_cols}(视觉列数:{first_row_visual_cols})")
  188. # 同时考虑实际列数匹配和视觉列数匹配
  189. return last_row_cols == first_row_cols or last_row_visual_cols == first_row_visual_cols
  190. def perform_table_merge(soup1, soup2, previous_table_block, wait_merge_table_footnotes):
  191. """执行表格合并操作"""
  192. # 检测表头有几行,并确认表头内容是否一致
  193. header_count, headers_match, header_texts = detect_table_headers(soup1, soup2)
  194. # logger.debug(f"检测到表头行数: {header_count}, 表头匹配: {headers_match}")
  195. # logger.debug(f"表头内容: {header_texts}")
  196. # 找到第一个表格的tbody,如果没有则查找table元素
  197. tbody1 = soup1.find("tbody") or soup1.find("table")
  198. # 找到第二个表格的tbody,如果没有则查找table元素
  199. tbody2 = soup2.find("tbody") or soup2.find("table")
  200. # 将第二个表格的行添加到第一个表格中
  201. if tbody1 and tbody2:
  202. rows2 = soup2.find_all("tr")
  203. # 将第二个表格的行添加到第一个表格中(跳过表头行)
  204. for row in rows2[header_count:]:
  205. # 从原来的位置移除行,并添加到第一个表格中
  206. row.extract()
  207. tbody1.append(row)
  208. # 添加待合并表格的footnote到前一个表格中
  209. for table_footnote in wait_merge_table_footnotes:
  210. temp_table_footnote = table_footnote.copy()
  211. temp_table_footnote[SplitFlag.CROSS_PAGE] = True
  212. previous_table_block["blocks"].append(temp_table_footnote)
  213. return str(soup1)
  214. def merge_table(page_info_list):
  215. """合并跨页表格"""
  216. # 倒序遍历每一页
  217. for page_idx in range(len(page_info_list) - 1, -1, -1):
  218. # 跳过第一页,因为它没有前一页
  219. if page_idx == 0:
  220. continue
  221. page_info = page_info_list[page_idx]
  222. previous_page_info = page_info_list[page_idx - 1]
  223. # 检查当前页是否有表格块
  224. if not (page_info["para_blocks"] and page_info["para_blocks"][0]["type"] == BlockType.TABLE):
  225. continue
  226. current_table_block = page_info["para_blocks"][0]
  227. # 检查上一页是否有表格块
  228. if not (previous_page_info["para_blocks"] and previous_page_info["para_blocks"][-1]["type"] == BlockType.TABLE):
  229. continue
  230. previous_table_block = previous_page_info["para_blocks"][-1]
  231. # 收集待合并表格的footnote
  232. wait_merge_table_footnotes = [
  233. block for block in current_table_block["blocks"]
  234. if block["type"] == BlockType.TABLE_FOOTNOTE
  235. ]
  236. # 检查两个表格是否可以合并
  237. can_merge, soup1, soup2, current_html, previous_html = can_merge_tables(
  238. current_table_block, previous_table_block
  239. )
  240. if not can_merge:
  241. continue
  242. # 执行表格合并
  243. merged_html = perform_table_merge(
  244. soup1, soup2, previous_table_block, wait_merge_table_footnotes
  245. )
  246. # 更新previous_table_block的html
  247. for block in previous_table_block["blocks"]:
  248. if (block["type"] == BlockType.TABLE_BODY and block["lines"] and block["lines"][0]["spans"]):
  249. block["lines"][0]["spans"][0]["html"] = merged_html
  250. break
  251. # 删除当前页的table
  252. for block in current_table_block["blocks"]:
  253. block['lines'] = []
  254. block[SplitFlag.LINES_DELETED] = True