| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338 |
- # Copyright (c) Opendatalab. All rights reserved.
- from loguru import logger
- from bs4 import BeautifulSoup
- from mineru.utils.enum_class import BlockType, SplitFlag
- def full_to_half(text: str) -> str:
- """Convert full-width characters to half-width characters using code point manipulation.
- Args:
- text: String containing full-width characters
- Returns:
- String with full-width characters converted to half-width
- """
- result = []
- for char in text:
- code = ord(char)
- # Full-width letters, numbers and punctuation (FF01-FF5E)
- if 0xFF01 <= code <= 0xFF5E:
- result.append(chr(code - 0xFEE0)) # Shift to ASCII range
- else:
- result.append(char)
- return ''.join(result)
- def calculate_table_total_columns(soup):
- """计算表格的总列数,通过分析整个表格结构来处理rowspan和colspan
- Args:
- soup: BeautifulSoup解析的表格
- Returns:
- int: 表格的总列数
- """
- rows = soup.find_all("tr")
- if not rows:
- return 0
- # 创建一个矩阵来跟踪每个位置的占用情况
- max_cols = 0
- occupied = {} # {row_idx: {col_idx: True}}
- for row_idx, row in enumerate(rows):
- col_idx = 0
- cells = row.find_all(["td", "th"])
- if row_idx not in occupied:
- occupied[row_idx] = {}
- for cell in cells:
- # 找到下一个未被占用的列位置
- while col_idx in occupied[row_idx]:
- col_idx += 1
- colspan = int(cell.get("colspan", 1))
- rowspan = int(cell.get("rowspan", 1))
- # 标记被这个单元格占用的所有位置
- for r in range(row_idx, row_idx + rowspan):
- if r not in occupied:
- occupied[r] = {}
- for c in range(col_idx, col_idx + colspan):
- occupied[r][c] = True
- col_idx += colspan
- max_cols = max(max_cols, col_idx)
- return max_cols
- def calculate_row_columns(row):
- """
- 计算表格行的实际列数,考虑colspan属性
- Args:
- row: BeautifulSoup的tr元素对象
- Returns:
- int: 行的实际列数
- """
- cells = row.find_all(["td", "th"])
- column_count = 0
- for cell in cells:
- colspan = int(cell.get("colspan", 1))
- column_count += colspan
- return column_count
- def calculate_visual_columns(row):
- """
- 计算表格行的视觉列数(实际td/th单元格数量,不考虑colspan)
- Args:
- row: BeautifulSoup的tr元素对象
- Returns:
- int: 行的视觉列数(实际单元格数)
- """
- cells = row.find_all(["td", "th"])
- return len(cells)
- def detect_table_headers(soup1, soup2, max_header_rows=5):
- """
- 检测并比较两个表格的表头
- Args:
- soup1: 第一个表格的BeautifulSoup对象
- soup2: 第二个表格的BeautifulSoup对象
- max_header_rows: 最大可能的表头行数
- Returns:
- tuple: (表头行数, 表头是否一致, 表头文本列表)
- """
- rows1 = soup1.find_all("tr")
- rows2 = soup2.find_all("tr")
- min_rows = min(len(rows1), len(rows2), max_header_rows)
- header_rows = 0
- headers_match = True
- header_texts = []
- for i in range(min_rows):
- # 提取当前行的所有单元格
- cells1 = rows1[i].find_all(["td", "th"])
- cells2 = rows2[i].find_all(["td", "th"])
- # 检查两行的结构和内容是否一致
- structure_match = True
- # 首先检查单元格数量
- if len(cells1) != len(cells2):
- structure_match = False
- else:
- # 然后检查单元格的属性和内容
- for cell1, cell2 in zip(cells1, cells2):
- colspan1 = int(cell1.get("colspan", 1))
- rowspan1 = int(cell1.get("rowspan", 1))
- colspan2 = int(cell2.get("colspan", 1))
- rowspan2 = int(cell2.get("rowspan", 1))
- text1 = full_to_half(cell1.get_text().strip())
- text2 = full_to_half(cell2.get_text().strip())
- if colspan1 != colspan2 or rowspan1 != rowspan2 or text1 != text2:
- structure_match = False
- break
- if structure_match:
- header_rows += 1
- row_texts = [full_to_half(cell.get_text().strip()) for cell in cells1]
- header_texts.append(row_texts) # 添加表头文本
- else:
- headers_match = header_rows > 0 # 只有当至少匹配了一行时,才认为表头匹配
- break
- # 如果没有找到匹配的表头行,则返回失败
- if header_rows == 0:
- headers_match = False
- return header_rows, headers_match, header_texts
- def can_merge_tables(current_table_block, previous_table_block):
- """判断两个表格是否可以合并"""
- # 检查表格是否有caption和footnote
- if any(block["type"] == BlockType.TABLE_CAPTION for block in current_table_block["blocks"]):
- return False, None, None, None, None
- if any(block["type"] == BlockType.TABLE_FOOTNOTE for block in previous_table_block["blocks"]):
- return False, None, None, None, None
- # 获取两个表格的HTML内容
- current_html = ""
- previous_html = ""
- for block in current_table_block["blocks"]:
- if (block["type"] == BlockType.TABLE_BODY and block["lines"] and block["lines"][0]["spans"]):
- current_html = block["lines"][0]["spans"][0].get("html", "")
- for block in previous_table_block["blocks"]:
- if (block["type"] == BlockType.TABLE_BODY and block["lines"] and block["lines"][0]["spans"]):
- previous_html = block["lines"][0]["spans"][0].get("html", "")
- if not current_html or not previous_html:
- return False, None, None, None, None
- # 检查表格宽度差异
- x0_t1, y0_t1, x1_t1, y1_t1 = current_table_block["bbox"]
- x0_t2, y0_t2, x1_t2, y1_t2 = previous_table_block["bbox"]
- table1_width = x1_t1 - x0_t1
- table2_width = x1_t2 - x0_t2
- if abs(table1_width - table2_width) / min(table1_width, table2_width) >= 0.1:
- return False, None, None, None, None
- # 解析HTML并检查表格结构
- soup1 = BeautifulSoup(previous_html, "html.parser")
- soup2 = BeautifulSoup(current_html, "html.parser")
- # 检查整体列数匹配
- table_cols1 = calculate_table_total_columns(soup1)
- table_cols2 = calculate_table_total_columns(soup2)
- # logger.debug(f"Table columns - Previous: {table_cols1}, Current: {table_cols2}")
- tables_match = table_cols1 == table_cols2
- # 检查首末行列数匹配
- rows_match = check_rows_match(soup1, soup2)
- return (tables_match or rows_match), soup1, soup2, current_html, previous_html
- def check_rows_match(soup1, soup2):
- """检查表格行是否匹配"""
- rows1 = soup1.find_all("tr")
- rows2 = soup2.find_all("tr")
- if not (rows1 and rows2):
- return False
- # 获取第一个表的最后一行数据行
- last_row = None
- for row in reversed(rows1):
- if row.find_all(["td", "th"]):
- last_row = row
- break
- # 检测表头行数,以便获取第二个表的首个数据行
- header_count, _, _ = detect_table_headers(soup1, soup2)
- # 获取第二个表的首个数据行
- first_data_row = None
- if len(rows2) > header_count:
- first_data_row = rows2[header_count] # 第一个非表头行
- if not (last_row and first_data_row):
- return False
- # 计算实际列数(考虑colspan)和视觉列数
- last_row_cols = calculate_row_columns(last_row)
- first_row_cols = calculate_row_columns(first_data_row)
- last_row_visual_cols = calculate_visual_columns(last_row)
- first_row_visual_cols = calculate_visual_columns(first_data_row)
- # logger.debug(f"行列数 - 前表最后一行: {last_row_cols}(视觉列数:{last_row_visual_cols}), 当前表首行: {first_row_cols}(视觉列数:{first_row_visual_cols})")
- # 同时考虑实际列数匹配和视觉列数匹配
- return last_row_cols == first_row_cols or last_row_visual_cols == first_row_visual_cols
- def perform_table_merge(soup1, soup2, previous_table_block, wait_merge_table_footnotes):
- """执行表格合并操作"""
- # 检测表头有几行,并确认表头内容是否一致
- header_count, headers_match, header_texts = detect_table_headers(soup1, soup2)
- # logger.debug(f"检测到表头行数: {header_count}, 表头匹配: {headers_match}")
- # logger.debug(f"表头内容: {header_texts}")
- # 找到第一个表格的tbody,如果没有则查找table元素
- tbody1 = soup1.find("tbody") or soup1.find("table")
- # 找到第二个表格的tbody,如果没有则查找table元素
- tbody2 = soup2.find("tbody") or soup2.find("table")
- # 将第二个表格的行添加到第一个表格中
- if tbody1 and tbody2:
- rows2 = soup2.find_all("tr")
- # 将第二个表格的行添加到第一个表格中(跳过表头行)
- for row in rows2[header_count:]:
- # 从原来的位置移除行,并添加到第一个表格中
- row.extract()
- tbody1.append(row)
- # 添加待合并表格的footnote到前一个表格中
- for table_footnote in wait_merge_table_footnotes:
- temp_table_footnote = table_footnote.copy()
- temp_table_footnote[SplitFlag.CROSS_PAGE] = True
- previous_table_block["blocks"].append(temp_table_footnote)
- return str(soup1)
- def merge_table(page_info_list):
- """合并跨页表格"""
- # 倒序遍历每一页
- for page_idx in range(len(page_info_list) - 1, -1, -1):
- # 跳过第一页,因为它没有前一页
- if page_idx == 0:
- continue
- page_info = page_info_list[page_idx]
- previous_page_info = page_info_list[page_idx - 1]
- # 检查当前页是否有表格块
- if not (page_info["para_blocks"] and page_info["para_blocks"][0]["type"] == BlockType.TABLE):
- continue
- current_table_block = page_info["para_blocks"][0]
- # 检查上一页是否有表格块
- if not (previous_page_info["para_blocks"] and previous_page_info["para_blocks"][-1]["type"] == BlockType.TABLE):
- continue
- previous_table_block = previous_page_info["para_blocks"][-1]
- # 收集待合并表格的footnote
- wait_merge_table_footnotes = [
- block for block in current_table_block["blocks"]
- if block["type"] == BlockType.TABLE_FOOTNOTE
- ]
- # 检查两个表格是否可以合并
- can_merge, soup1, soup2, current_html, previous_html = can_merge_tables(
- current_table_block, previous_table_block
- )
- if not can_merge:
- continue
- # 执行表格合并
- merged_html = perform_table_merge(
- soup1, soup2, previous_table_block, wait_merge_table_footnotes
- )
- # 更新previous_table_block的html
- for block in previous_table_block["blocks"]:
- if (block["type"] == BlockType.TABLE_BODY and block["lines"] and block["lines"][0]["spans"]):
- block["lines"][0]["spans"][0]["html"] = merged_html
- break
- # 删除当前页的table
- for block in current_table_block["blocks"]:
- block['lines'] = []
- block[SplitFlag.LINES_DELETED] = True
|