# Copyright (c) Opendatalab. All rights reserved.
from loguru import logger
from bs4 import BeautifulSoup

from mineru.utils.enum_class import BlockType, SplitFlag

# Tag names that count as table cells.
_CELL_TAGS = ["td", "th"]

# Full-width letters, digits and punctuation (U+FF01..U+FF5E) sit exactly
# 0xFEE0 above their ASCII counterparts.
_FULL_TO_HALF_TABLE = {code: code - 0xFEE0 for code in range(0xFF01, 0xFF5F)}

# Two tables are considered for merging only when their widths differ by less
# than this fraction of the narrower one.
_MAX_WIDTH_DIFF_RATIO = 0.1

# Result returned by can_merge_tables() whenever a merge is ruled out.
_NO_MERGE = (False, None, None, None, None)


def full_to_half(text: str) -> str:
    """Convert full-width characters to half-width characters.

    Only the code points U+FF01..U+FF5E are converted; every other character
    is returned unchanged.

    Args:
        text: String that may contain full-width characters.

    Returns:
        String with full-width characters converted to half-width.
    """
    return text.translate(_FULL_TO_HALF_TABLE)


def _parse_span(cell, attr: str) -> int:
    """Return the ``colspan``/``rowspan`` value of *cell* as an integer >= 1.

    A missing, non-numeric or non-positive attribute falls back to 1 so that a
    single malformed cell cannot abort the whole merge pass with ValueError.
    """
    try:
        span = int(cell.get(attr, 1))
    except (TypeError, ValueError):
        return 1
    return span if span >= 1 else 1


def calculate_table_total_columns(soup) -> int:
    """Compute the total number of columns of a table.

    The whole table is walked as a grid so that cells pushed to the right by
    a ``rowspan`` started in an earlier row are accounted for, together with
    each cell's ``colspan``.

    Args:
        soup: BeautifulSoup object of the parsed table.

    Returns:
        The largest column index (exclusive) reached by any row, or 0 when
        the table has no ``tr`` elements.
    """
    rows = soup.find_all("tr")
    if not rows:
        return 0

    row_total = len(rows)
    # occupied[r] holds the column indices of row r that are already taken.
    occupied = [set() for _ in rows]
    max_cols = 0

    for row_idx, row in enumerate(rows):
        taken = occupied[row_idx]
        col_idx = 0
        for cell in row.find_all(_CELL_TAGS):
            # Skip positions claimed by a rowspan from a previous row.
            while col_idx in taken:
                col_idx += 1

            colspan = _parse_span(cell, "colspan")
            rowspan = _parse_span(cell, "rowspan")

            # Rows past the end of the table are never inspected, so there is
            # no point in marking them (also bounds the work for bogus spans).
            end_row = min(row_idx + rowspan, row_total)
            covered_cols = range(col_idx, col_idx + colspan)
            for r in range(row_idx, end_row):
                occupied[r].update(covered_cols)

            col_idx += colspan
            max_cols = max(max_cols, col_idx)

    return max_cols


def calculate_row_columns(row) -> int:
    """Compute the logical column count of a row, honouring ``colspan``.

    Args:
        row: BeautifulSoup ``tr`` element.

    Returns:
        Sum of the ``colspan`` values of the row's ``td``/``th`` cells.
    """
    return sum(_parse_span(cell, "colspan") for cell in row.find_all(_CELL_TAGS))


def calculate_visual_columns(row) -> int:
    """Compute the visual column count of a row, ignoring ``colspan``.

    Args:
        row: BeautifulSoup ``tr`` element.

    Returns:
        Number of ``td``/``th`` cells found in the row.
    """
    return len(row.find_all(_CELL_TAGS))


def _cells_match(cells1, cells2) -> bool:
    """Tell whether two rows have identical spans and normalised cell text."""
    if len(cells1) != len(cells2):
        return False
    for cell1, cell2 in zip(cells1, cells2):
        if _parse_span(cell1, "colspan") != _parse_span(cell2, "colspan"):
            return False
        if _parse_span(cell1, "rowspan") != _parse_span(cell2, "rowspan"):
            return False
        text1 = full_to_half(cell1.get_text().strip())
        text2 = full_to_half(cell2.get_text().strip())
        if text1 != text2:
            return False
    return True


def detect_table_headers(soup1, soup2, max_header_rows=5):
    """Detect the header rows shared by two tables.

    Rows are compared pairwise from the top; counting stops at the first pair
    whose cell count, spans or (full-width normalised) text differ.

    Args:
        soup1: BeautifulSoup object of the first table.
        soup2: BeautifulSoup object of the second table.
        max_header_rows: Maximum number of leading rows to compare.

    Returns:
        tuple: ``(header_rows, headers_match, header_texts)`` where
        ``header_rows`` is the number of identical leading rows,
        ``headers_match`` is True when at least one row matched, and
        ``header_texts`` lists the cell texts of each matched row.
    """
    rows1 = soup1.find_all("tr")
    rows2 = soup2.find_all("tr")
    limit = max(0, min(len(rows1), len(rows2), max_header_rows))

    header_texts = []
    for row1, row2 in zip(rows1[:limit], rows2[:limit]):
        cells1 = row1.find_all(_CELL_TAGS)
        cells2 = row2.find_all(_CELL_TAGS)
        if not _cells_match(cells1, cells2):
            break
        header_texts.append(
            [full_to_half(cell.get_text().strip()) for cell in cells1]
        )

    header_rows = len(header_texts)
    # The headers are considered matching only if at least one row matched.
    return header_rows, header_rows > 0, header_texts


def _extract_table_html(table_block) -> str:
    """Return the HTML stored in the table block's TABLE_BODY sub-block.

    NOTE(review): when several TABLE_BODY sub-blocks qualify, the last one
    wins, whereas merge_table() writes the merged HTML into the first one.
    Presumably a table has a single body; confirm against the producer.
    """
    html = ""
    for block in table_block["blocks"]:
        if (
            block["type"] == BlockType.TABLE_BODY
            and block["lines"]
            and block["lines"][0]["spans"]
        ):
            html = block["lines"][0]["spans"][0].get("html", "")
    return html


def can_merge_tables(current_table_block, previous_table_block):
    """Decide whether two table blocks can be merged.

    A merge is rejected when the current table has a caption, the previous
    table has a footnote, either table has no HTML, either bounding box has a
    non-positive width, or the widths differ by 10% or more of the narrower
    one. Otherwise the tables are mergeable when their total column counts
    agree or check_rows_match() accepts their boundary rows.

    Args:
        current_table_block: Table block at the start of the current page.
        previous_table_block: Table block at the end of the previous page.

    Returns:
        tuple: ``(can_merge, soup1, soup2, current_html, previous_html)``.
        ``soup1`` is parsed from the previous table and ``soup2`` from the
        current one. On an early rejection all but the first item are None.
    """
    current_blocks = current_table_block["blocks"]
    previous_blocks = previous_table_block["blocks"]

    if any(block["type"] == BlockType.TABLE_CAPTION for block in current_blocks):
        return _NO_MERGE
    if any(block["type"] == BlockType.TABLE_FOOTNOTE for block in previous_blocks):
        return _NO_MERGE

    current_html = _extract_table_html(current_table_block)
    previous_html = _extract_table_html(previous_table_block)
    if not current_html or not previous_html:
        return _NO_MERGE

    # Compare table widths (bbox is x0, y0, x1, y1).
    cur_x0, _, cur_x1, _ = current_table_block["bbox"]
    prev_x0, _, prev_x1, _ = previous_table_block["bbox"]
    current_width = cur_x1 - cur_x0
    previous_width = prev_x1 - prev_x0
    narrower = min(current_width, previous_width)
    # A degenerate bbox cannot be compared (and would divide by zero below).
    if narrower <= 0:
        return _NO_MERGE
    if abs(current_width - previous_width) / narrower >= _MAX_WIDTH_DIFF_RATIO:
        return _NO_MERGE

    soup1 = BeautifulSoup(previous_html, "html.parser")
    soup2 = BeautifulSoup(current_html, "html.parser")

    tables_match = (
        calculate_table_total_columns(soup1) == calculate_table_total_columns(soup2)
    )
    can_merge = tables_match or check_rows_match(soup1, soup2)

    return can_merge, soup1, soup2, current_html, previous_html


def check_rows_match(soup1, soup2) -> bool:
    """Check whether the boundary rows of two tables are compatible.

    The last row of the first table that contains cells is compared with the
    first non-header row of the second table.

    Args:
        soup1: BeautifulSoup object of the first (previous) table.
        soup2: BeautifulSoup object of the second (current) table.

    Returns:
        True when the two rows have the same logical column count (with
        ``colspan``) or the same number of cells; False when either row is
        unavailable or neither count agrees.
    """
    rows1 = soup1.find_all("tr")
    rows2 = soup2.find_all("tr")
    if not (rows1 and rows2):
        return False

    # Last row of the first table that actually contains cells.
    last_row = next(
        (row for row in reversed(rows1) if row.find_all(_CELL_TAGS)), None
    )

    # The header rows repeated in the second table are skipped to reach its
    # first data row.
    header_count, _, _ = detect_table_headers(soup1, soup2)
    if last_row is None or len(rows2) <= header_count:
        return False
    first_data_row = rows2[header_count]

    if calculate_row_columns(last_row) == calculate_row_columns(first_data_row):
        return True
    return calculate_visual_columns(last_row) == calculate_visual_columns(
        first_data_row
    )


def perform_table_merge(soup1, soup2, previous_table_block, wait_merge_table_footnotes):
    """Merge the rows of the second table into the first one.

    Rows of ``soup2`` that follow the shared header are moved into the
    ``tbody`` (or, failing that, the ``table``) of ``soup1``. Each pending
    footnote is shallow-copied, flagged with ``SplitFlag.CROSS_PAGE`` and
    appended to ``previous_table_block["blocks"]``.

    Args:
        soup1: BeautifulSoup object of the previous table (modified in place).
        soup2: BeautifulSoup object of the current table (rows are removed).
        previous_table_block: Table block that receives the footnotes.
        wait_merge_table_footnotes: Footnote blocks of the current table.

    Returns:
        The merged table serialised with ``str(soup1)``.
    """
    header_count, _, _ = detect_table_headers(soup1, soup2)

    target = soup1.find("tbody") or soup1.find("table")
    source = soup2.find("tbody") or soup2.find("table")

    if target is not None and source is not None:
        # Skip the repeated header rows; move everything else.
        for row in soup2.find_all("tr")[header_count:]:
            row.extract()
            target.append(row)

    for table_footnote in wait_merge_table_footnotes:
        # Shallow copy: the copy keeps the footnote's current "lines" list
        # even after merge_table() rebinds "lines" on the original block.
        merged_footnote = table_footnote.copy()
        merged_footnote[SplitFlag.CROSS_PAGE] = True
        previous_table_block["blocks"].append(merged_footnote)

    return str(soup1)


def merge_table(page_info_list):
    """Merge tables that continue across page boundaries.

    Pages are visited from last to first. When a page starts with a table and
    the previous page ends with one, and can_merge_tables() accepts the pair,
    the current table's rows are merged into the previous table's HTML and
    every sub-block of the current table is emptied and flagged with
    ``SplitFlag.LINES_DELETED``.

    Args:
        page_info_list: List of page dicts, each with a ``"para_blocks"``
            list. Modified in place.
    """
    # Walk backwards and stop before index 0: the first page has no
    # predecessor to merge into.
    for page_idx in range(len(page_info_list) - 1, 0, -1):
        current_blocks = page_info_list[page_idx]["para_blocks"]
        previous_blocks = page_info_list[page_idx - 1]["para_blocks"]

        # The current page must start with a table ...
        if not (current_blocks and current_blocks[0]["type"] == BlockType.TABLE):
            continue
        # ... and the previous page must end with one.
        if not (previous_blocks and previous_blocks[-1]["type"] == BlockType.TABLE):
            continue

        current_table_block = current_blocks[0]
        previous_table_block = previous_blocks[-1]

        # Footnotes of the current table that must follow it into the merge.
        wait_merge_table_footnotes = [
            block
            for block in current_table_block["blocks"]
            if block["type"] == BlockType.TABLE_FOOTNOTE
        ]

        can_merge, soup1, soup2, _, _ = can_merge_tables(
            current_table_block, previous_table_block
        )
        if not can_merge:
            continue

        merged_html = perform_table_merge(
            soup1, soup2, previous_table_block, wait_merge_table_footnotes
        )

        # Store the merged HTML in the previous table's first usable body.
        for block in previous_table_block["blocks"]:
            if (
                block["type"] == BlockType.TABLE_BODY
                and block["lines"]
                and block["lines"][0]["spans"]
            ):
                block["lines"][0]["spans"][0]["html"] = merged_html
                break

        # Empty the current page's table now that its content has moved.
        for block in current_table_block["blocks"]:
            block["lines"] = []
            block[SplitFlag.LINES_DELETED] = True