# table_merge.py
  1. # Copyright (c) Opendatalab. All rights reserved.
  2. from loguru import logger
  3. from bs4 import BeautifulSoup
  4. from mineru.backend.vlm.vlm_middle_json_mkcontent import merge_para_with_text
  5. from mineru.utils.enum_class import BlockType, SplitFlag
  6. def full_to_half(text: str) -> str:
  7. """Convert full-width characters to half-width characters using code point manipulation.
  8. Args:
  9. text: String containing full-width characters
  10. Returns:
  11. String with full-width characters converted to half-width
  12. """
  13. result = []
  14. for char in text:
  15. code = ord(char)
  16. # Full-width letters, numbers and punctuation (FF01-FF5E)
  17. if 0xFF01 <= code <= 0xFF5E:
  18. result.append(chr(code - 0xFEE0)) # Shift to ASCII range
  19. else:
  20. result.append(char)
  21. return ''.join(result)
  22. def calculate_table_total_columns(soup):
  23. """计算表格的总列数,通过分析整个表格结构来处理rowspan和colspan
  24. Args:
  25. soup: BeautifulSoup解析的表格
  26. Returns:
  27. int: 表格的总列数
  28. """
  29. rows = soup.find_all("tr")
  30. if not rows:
  31. return 0
  32. # 创建一个矩阵来跟踪每个位置的占用情况
  33. max_cols = 0
  34. occupied = {} # {row_idx: {col_idx: True}}
  35. for row_idx, row in enumerate(rows):
  36. col_idx = 0
  37. cells = row.find_all(["td", "th"])
  38. if row_idx not in occupied:
  39. occupied[row_idx] = {}
  40. for cell in cells:
  41. # 找到下一个未被占用的列位置
  42. while col_idx in occupied[row_idx]:
  43. col_idx += 1
  44. colspan = int(cell.get("colspan", 1))
  45. rowspan = int(cell.get("rowspan", 1))
  46. # 标记被这个单元格占用的所有位置
  47. for r in range(row_idx, row_idx + rowspan):
  48. if r not in occupied:
  49. occupied[r] = {}
  50. for c in range(col_idx, col_idx + colspan):
  51. occupied[r][c] = True
  52. col_idx += colspan
  53. max_cols = max(max_cols, col_idx)
  54. return max_cols
  55. def calculate_row_columns(row):
  56. """
  57. 计算表格行的实际列数,考虑colspan属性
  58. Args:
  59. row: BeautifulSoup的tr元素对象
  60. Returns:
  61. int: 行的实际列数
  62. """
  63. cells = row.find_all(["td", "th"])
  64. column_count = 0
  65. for cell in cells:
  66. colspan = int(cell.get("colspan", 1))
  67. column_count += colspan
  68. return column_count
  69. def calculate_visual_columns(row):
  70. """
  71. 计算表格行的视觉列数(实际td/th单元格数量,不考虑colspan)
  72. Args:
  73. row: BeautifulSoup的tr元素对象
  74. Returns:
  75. int: 行的视觉列数(实际单元格数)
  76. """
  77. cells = row.find_all(["td", "th"])
  78. return len(cells)
  79. def detect_table_headers(soup1, soup2, max_header_rows=5):
  80. """
  81. 检测并比较两个表格的表头
  82. Args:
  83. soup1: 第一个表格的BeautifulSoup对象
  84. soup2: 第二个表格的BeautifulSoup对象
  85. max_header_rows: 最大可能的表头行数
  86. Returns:
  87. tuple: (表头行数, 表头是否一致, 表头文本列表)
  88. """
  89. rows1 = soup1.find_all("tr")
  90. rows2 = soup2.find_all("tr")
  91. min_rows = min(len(rows1), len(rows2), max_header_rows)
  92. header_rows = 0
  93. headers_match = True
  94. header_texts = []
  95. for i in range(min_rows):
  96. # 提取当前行的所有单元格
  97. cells1 = rows1[i].find_all(["td", "th"])
  98. cells2 = rows2[i].find_all(["td", "th"])
  99. # 检查两行的结构和内容是否一致
  100. structure_match = True
  101. # 首先检查单元格数量
  102. if len(cells1) != len(cells2):
  103. structure_match = False
  104. else:
  105. # 然后检查单元格的属性和内容
  106. for cell1, cell2 in zip(cells1, cells2):
  107. colspan1 = int(cell1.get("colspan", 1))
  108. rowspan1 = int(cell1.get("rowspan", 1))
  109. colspan2 = int(cell2.get("colspan", 1))
  110. rowspan2 = int(cell2.get("rowspan", 1))
  111. text1 = full_to_half(cell1.get_text().strip())
  112. text2 = full_to_half(cell2.get_text().strip())
  113. if colspan1 != colspan2 or rowspan1 != rowspan2 or text1 != text2:
  114. structure_match = False
  115. break
  116. if structure_match:
  117. header_rows += 1
  118. row_texts = [full_to_half(cell.get_text().strip()) for cell in cells1]
  119. header_texts.append(row_texts) # 添加表头文本
  120. else:
  121. headers_match = header_rows > 0 # 只有当至少匹配了一行时,才认为表头匹配
  122. break
  123. # 如果没有找到匹配的表头行,则返回失败
  124. if header_rows == 0:
  125. headers_match = False
  126. return header_rows, headers_match, header_texts
  127. def can_merge_tables(current_table_block, previous_table_block):
  128. """判断两个表格是否可以合并"""
  129. # 检查表格是否有caption和footnote
  130. # if any(block["type"] == BlockType.TABLE_CAPTION for block in current_table_block["blocks"]):
  131. # return False, None, None, None, None
  132. # current_table_block["blocks"]中有任何TABLE_CAPTION类型的块,且任意caption块内不以"(续)"结尾,则不合并
  133. if any(block["type"] == BlockType.TABLE_CAPTION and not full_to_half(merge_para_with_text(block).strip()).endswith("(续)") for block in current_table_block["blocks"]):
  134. return False, None, None, None, None
  135. if any(block["type"] == BlockType.TABLE_FOOTNOTE for block in previous_table_block["blocks"]):
  136. return False, None, None, None, None
  137. # 获取两个表格的HTML内容
  138. current_html = ""
  139. previous_html = ""
  140. for block in current_table_block["blocks"]:
  141. if (block["type"] == BlockType.TABLE_BODY and block["lines"] and block["lines"][0]["spans"]):
  142. current_html = block["lines"][0]["spans"][0].get("html", "")
  143. for block in previous_table_block["blocks"]:
  144. if (block["type"] == BlockType.TABLE_BODY and block["lines"] and block["lines"][0]["spans"]):
  145. previous_html = block["lines"][0]["spans"][0].get("html", "")
  146. if not current_html or not previous_html:
  147. return False, None, None, None, None
  148. # 检查表格宽度差异
  149. x0_t1, y0_t1, x1_t1, y1_t1 = current_table_block["bbox"]
  150. x0_t2, y0_t2, x1_t2, y1_t2 = previous_table_block["bbox"]
  151. table1_width = x1_t1 - x0_t1
  152. table2_width = x1_t2 - x0_t2
  153. if abs(table1_width - table2_width) / min(table1_width, table2_width) >= 0.1:
  154. return False, None, None, None, None
  155. # 解析HTML并检查表格结构
  156. soup1 = BeautifulSoup(previous_html, "html.parser")
  157. soup2 = BeautifulSoup(current_html, "html.parser")
  158. # 检查整体列数匹配
  159. table_cols1 = calculate_table_total_columns(soup1)
  160. table_cols2 = calculate_table_total_columns(soup2)
  161. # logger.debug(f"Table columns - Previous: {table_cols1}, Current: {table_cols2}")
  162. tables_match = table_cols1 == table_cols2
  163. # 检查首末行列数匹配
  164. rows_match = check_rows_match(soup1, soup2)
  165. return (tables_match or rows_match), soup1, soup2, current_html, previous_html
  166. def check_rows_match(soup1, soup2):
  167. """检查表格行是否匹配"""
  168. rows1 = soup1.find_all("tr")
  169. rows2 = soup2.find_all("tr")
  170. if not (rows1 and rows2):
  171. return False
  172. # 获取第一个表的最后一行数据行
  173. last_row = None
  174. for row in reversed(rows1):
  175. if row.find_all(["td", "th"]):
  176. last_row = row
  177. break
  178. # 检测表头行数,以便获取第二个表的首个数据行
  179. header_count, _, _ = detect_table_headers(soup1, soup2)
  180. # 获取第二个表的首个数据行
  181. first_data_row = None
  182. if len(rows2) > header_count:
  183. first_data_row = rows2[header_count] # 第一个非表头行
  184. if not (last_row and first_data_row):
  185. return False
  186. # 计算实际列数(考虑colspan)和视觉列数
  187. last_row_cols = calculate_row_columns(last_row)
  188. first_row_cols = calculate_row_columns(first_data_row)
  189. last_row_visual_cols = calculate_visual_columns(last_row)
  190. first_row_visual_cols = calculate_visual_columns(first_data_row)
  191. # logger.debug(f"行列数 - 前表最后一行: {last_row_cols}(视觉列数:{last_row_visual_cols}), 当前表首行: {first_row_cols}(视觉列数:{first_row_visual_cols})")
  192. # 同时考虑实际列数匹配和视觉列数匹配
  193. return last_row_cols == first_row_cols or last_row_visual_cols == first_row_visual_cols
  194. def perform_table_merge(soup1, soup2, previous_table_block, wait_merge_table_footnotes):
  195. """执行表格合并操作"""
  196. # 检测表头有几行,并确认表头内容是否一致
  197. header_count, headers_match, header_texts = detect_table_headers(soup1, soup2)
  198. # logger.debug(f"检测到表头行数: {header_count}, 表头匹配: {headers_match}")
  199. # logger.debug(f"表头内容: {header_texts}")
  200. # 找到第一个表格的tbody,如果没有则查找table元素
  201. tbody1 = soup1.find("tbody") or soup1.find("table")
  202. # 获取表1和表2的所有行
  203. rows1 = soup1.find_all("tr")
  204. rows2 = soup2.find_all("tr")
  205. if rows1 and rows2 and header_count < len(rows2):
  206. # 获取表1最后一行
  207. last_row1 = rows1[-1]
  208. # 获取表2第一个非表头行
  209. first_data_row2 = rows2[header_count]
  210. # 分析两行的colspan结构
  211. last_row1_structure = []
  212. has_colspan_last_row1 = False
  213. for cell in last_row1.find_all(["td", "th"]):
  214. colspan = int(cell.get("colspan", 1))
  215. last_row1_structure.append(colspan)
  216. if colspan > 1:
  217. has_colspan_last_row1 = True
  218. first_row2_structure = []
  219. has_colspan_first_row2 = False
  220. for cell in first_data_row2.find_all(["td", "th"]):
  221. colspan = int(cell.get("colspan", 1))
  222. first_row2_structure.append(colspan)
  223. if colspan > 1:
  224. has_colspan_first_row2 = True
  225. # 确定基准结构(优先使用有colspan的行)
  226. if has_colspan_last_row1:
  227. reference_structure = last_row1_structure
  228. reference_visual_cols = calculate_visual_columns(last_row1)
  229. elif has_colspan_first_row2:
  230. reference_structure = first_row2_structure
  231. reference_visual_cols = calculate_visual_columns(first_data_row2)
  232. else:
  233. # 都没有colspan时使用表1最后一行作为默认基准
  234. reference_structure = last_row1_structure
  235. reference_visual_cols = calculate_visual_columns(last_row1)
  236. # 如果表1最后一行没有colspan但表2首行有,则调整表1相关行
  237. if not has_colspan_last_row1 and has_colspan_first_row2:
  238. # 找到表1中所有具有相同视觉列数的行
  239. rows_to_adjust = []
  240. for i in range(len(rows1) - 1, -1, -1):
  241. if calculate_visual_columns(rows1[i]) == reference_visual_cols:
  242. rows_to_adjust.append(rows1[i])
  243. else:
  244. break
  245. # 应用参考结构到这些行
  246. for row in rows_to_adjust:
  247. cells = row.find_all(["td", "th"])
  248. if cells and len(cells) <= len(reference_structure):
  249. for j, cell in enumerate(cells):
  250. if j < len(reference_structure) and reference_structure[j] > 1:
  251. cell["colspan"] = str(reference_structure[j])
  252. # 如果表2首行没有colspan但表1最后一行有,则调整表2相关行
  253. elif has_colspan_last_row1 and not has_colspan_first_row2:
  254. # 调整表2中所有具有相同视觉列数的行
  255. for i in range(header_count, len(rows2)):
  256. row = rows2[i]
  257. if calculate_visual_columns(row) == reference_visual_cols:
  258. cells = row.find_all(["td", "th"])
  259. if cells and len(cells) <= len(reference_structure):
  260. for j, cell in enumerate(cells):
  261. if j < len(reference_structure) and reference_structure[j] > 1:
  262. cell["colspan"] = str(reference_structure[j])
  263. # 将第二个表格的行添加到第一个表格中
  264. if tbody1:
  265. tbody2 = soup2.find("tbody") or soup2.find("table")
  266. if tbody2:
  267. # 将第二个表格的行添加到第一个表格中(跳过表头行)
  268. for row in rows2[header_count:]:
  269. row.extract()
  270. tbody1.append(row)
  271. # 添加待合并表格的footnote到前一个表格中
  272. for table_footnote in wait_merge_table_footnotes:
  273. temp_table_footnote = table_footnote.copy()
  274. temp_table_footnote[SplitFlag.CROSS_PAGE] = True
  275. previous_table_block["blocks"].append(temp_table_footnote)
  276. return str(soup1)
  277. def merge_table(page_info_list):
  278. """合并跨页表格"""
  279. # 倒序遍历每一页
  280. for page_idx in range(len(page_info_list) - 1, -1, -1):
  281. # 跳过第一页,因为它没有前一页
  282. if page_idx == 0:
  283. continue
  284. page_info = page_info_list[page_idx]
  285. previous_page_info = page_info_list[page_idx - 1]
  286. # 检查当前页是否有表格块
  287. if not (page_info["para_blocks"] and page_info["para_blocks"][0]["type"] == BlockType.TABLE):
  288. continue
  289. current_table_block = page_info["para_blocks"][0]
  290. # 检查上一页是否有表格块
  291. if not (previous_page_info["para_blocks"] and previous_page_info["para_blocks"][-1]["type"] == BlockType.TABLE):
  292. continue
  293. previous_table_block = previous_page_info["para_blocks"][-1]
  294. # 收集待合并表格的footnote
  295. wait_merge_table_footnotes = [
  296. block for block in current_table_block["blocks"]
  297. if block["type"] == BlockType.TABLE_FOOTNOTE
  298. ]
  299. # 检查两个表格是否可以合并
  300. can_merge, soup1, soup2, current_html, previous_html = can_merge_tables(
  301. current_table_block, previous_table_block
  302. )
  303. if not can_merge:
  304. continue
  305. # 执行表格合并
  306. merged_html = perform_table_merge(
  307. soup1, soup2, previous_table_block, wait_merge_table_footnotes
  308. )
  309. # 更新previous_table_block的html
  310. for block in previous_table_block["blocks"]:
  311. if (block["type"] == BlockType.TABLE_BODY and block["lines"] and block["lines"][0]["spans"]):
  312. block["lines"][0]["spans"][0]["html"] = merged_html
  313. break
  314. # 删除当前页的table
  315. for block in current_table_block["blocks"]:
  316. block['lines'] = []
  317. block[SplitFlag.LINES_DELETED] = True