""" 合并 MinerU 和 PaddleOCR 的结果 使用 MinerU 的表格结构识别 + PaddleOCR 的文字框坐标 """ import json import re import argparse from pathlib import Path from typing import List, Dict, Tuple, Optional from bs4 import BeautifulSoup from fuzzywuzzy import fuzz import shutil class MinerUPaddleOCRMerger: """合并 MinerU 和 PaddleOCR 的结果""" def __init__(self, look_ahead_window: int = 10, similarity_threshold: int = 90): """ Args: look_ahead_window: 向前查找的窗口大小 similarity_threshold: 文本相似度阈值 """ self.look_ahead_window = look_ahead_window self.similarity_threshold = similarity_threshold def merge_table_with_bbox(self, mineru_json_path: str, paddle_json_path: str) -> List[Dict]: """ 合并 MinerU 和 PaddleOCR 的结果 Args: mineru_json_path: MinerU 输出的 JSON 路径 paddle_json_path: PaddleOCR 输出的 JSON 路径 output_path: 输出路径(可选) Returns: 合并后的结果字典 """ merged_data = None # 加载数据 with open(mineru_json_path, 'r', encoding='utf-8') as f: mineru_data = json.load(f) with open(paddle_json_path, 'r', encoding='utf-8') as f: paddle_data = json.load(f) # 提取 PaddleOCR 的文字框信息 paddle_text_boxes = self._extract_paddle_text_boxes(paddle_data) # 处理 MinerU 的数据 merged_data = self._process_mineru_data(mineru_data, paddle_text_boxes) return merged_data def _extract_paddle_text_boxes(self, paddle_data: Dict) -> List[Dict]: """提取 PaddleOCR 的文字框信息""" text_boxes = [] if 'overall_ocr_res' in paddle_data: ocr_res = paddle_data['overall_ocr_res'] rec_texts = ocr_res.get('rec_texts', []) rec_polys = ocr_res.get('rec_polys', []) rec_scores = ocr_res.get('rec_scores', []) for i, (text, poly, score) in enumerate(zip(rec_texts, rec_polys, rec_scores)): if text and text.strip(): # 计算 bbox (x_min, y_min, x_max, y_max) xs = [p[0] for p in poly] ys = [p[1] for p in poly] bbox = [min(xs), min(ys), max(xs), max(ys)] text_boxes.append({ 'text': text, 'bbox': bbox, 'poly': poly, 'score': score, 'paddle_bbox_index': i, 'used': False # 标记是否已被使用 }) return text_boxes def _process_mineru_data(self, mineru_data: List[Dict], paddle_text_boxes: List[Dict]) -> List[Dict]: """处理 MinerU 数据,添加 bbox 信息 Args: mineru_data (List[Dict]): _description_ paddle_text_boxes (List[Dict]): _description_ Returns: List[Dict]: _description_ """ merged_data = [] cells = None # 存储所有表格单元格信息 paddle_pointer = 0 # PaddleOCR 文字框指针 last_matched_index = 0 # 上次匹配成功的索引 # 对mineru_data按bbox从上到下排序,从左到右确保顺序一致 mineru_data.sort(key=lambda x: (x['bbox'][1], x['bbox'][0]) if 'bbox' in x else (float('inf'), float('inf'))) for item in mineru_data: if item['type'] == 'table': # 处理表格 merged_item = item.copy() table_html = item.get('table_body', '') # 解析 HTML 表格并添加 bbox enhanced_html, cells, paddle_pointer = self._enhance_table_html_with_bbox( table_html, paddle_text_boxes, paddle_pointer ) merged_item['table_body'] = enhanced_html merged_item['table_body_with_bbox'] = enhanced_html merged_item['bbox_mapping'] = 'merged_from_paddle_ocr' merged_item['table_cells'] = cells if cells else [] merged_data.append(merged_item) elif item['type'] in ['text', 'title']: # 处理普通文本 merged_item = item.copy() text = item.get('text', '') # 查找匹配的 bbox matched_bbox, paddle_pointer, last_matched_index = self._find_matching_bbox( text, paddle_text_boxes, paddle_pointer, last_matched_index ) if matched_bbox: # merged_item['bbox'] = matched_bbox['bbox'] # merged_item['bbox_source'] = 'paddle_ocr' # merged_item['text_score'] = matched_bbox['score'] # 沿用mineru的bbox, 就是要移动位置paddle_pointer, last_matched_index # 标记为已使用 matched_bbox['used'] = True merged_data.append(merged_item) elif item['type'] == 'list': # 处理列表项 merged_item = item.copy() list_items = item.get('list_items', []) sub_type = item.get('sub_type', 'unordered') # 有序或无序 for list_item in list_items: # 查找匹配的 bbox matched_bbox, paddle_pointer, last_matched_index = self._find_matching_bbox( list_item, paddle_text_boxes, paddle_pointer, last_matched_index ) if matched_bbox: # 沿用mineru的bbox, 就是要移动位置paddle_pointer, last_matched_index # 标记为已使用 matched_bbox['used'] = True merged_data.append(merged_item) else: # 其他类型直接复制 merged_data.append(item.copy()) return merged_data def _enhance_table_html_with_bbox(self, html: str, paddle_text_boxes: List[Dict], start_pointer: int) -> Tuple[str, List[Dict], int]: """ 为 HTML 表格添加 bbox 信息 Args: html: 原始 HTML 表格 paddle_text_boxes: PaddleOCR 文字框列表 start_pointer: 起始指针位置 Returns: (增强后的 HTML, 单元格数组, 新的指针位置) """ # 需要处理minerU识别为2个连着的cell,如: -741.00|357,259.63, paddle识别为一个cell,如: -741.00357,259.63 soup = BeautifulSoup(html, 'html.parser') current_pointer = start_pointer last_matched_index = start_pointer cells = [] # 存储单元格的 bbox 信息 # 遍历所有行 for row_idx, row in enumerate(soup.find_all('tr')): # 遍历所有单元格 for col_idx, cell in enumerate(row.find_all(['td', 'th'])): cell_text = cell.get_text(strip=True) if not cell_text: continue # 查找匹配的 bbox matched_bbox, current_pointer, last_matched_index = self._find_matching_bbox( cell_text, paddle_text_boxes, current_pointer, last_matched_index ) if matched_bbox: # 添加 data-bbox 属性 bbox = matched_bbox['bbox'] cell['data-bbox'] = f"[{bbox[0]},{bbox[1]},{bbox[2]},{bbox[3]}]" cell['data-score'] = f"{matched_bbox['score']:.4f}" cell['data-paddle-index'] = str(matched_bbox['paddle_bbox_index']) cells.append({ 'type': 'table_cell', 'text': cell_text, 'bbox': bbox, 'row': row_idx+1, 'col': col_idx+1, 'score': matched_bbox['score'], 'paddle_bbox_index': matched_bbox['paddle_bbox_index'] }) # 标记为已使用 matched_bbox['used'] = True return str(soup), cells, current_pointer def _find_matching_bbox(self, target_text: str, text_boxes: List[Dict], start_index: int, last_match_index: int) -> tuple[Optional[Dict], int, int]: """ 查找匹配的文字框 Args: target_text: 目标文本 text_boxes: 文字框列表 start_index: 起始索引, 是最后一个used=True的位置+1 last_match_index: 上次匹配成功的索引, 可能比start_index小 Returns: (匹配的文字框信息, 新的指针位置, last_match_index) """ target_text = self._normalize_text(target_text) # 过滤过短的目标文本 if len(target_text) < 2: return None, start_index, last_match_index # 由于minerU和Paddle的顺序基本一致, 也有不一致的地方, 所以需要向前找第一个未使用的位置 # MinerU和Paddle都可能识别错误,所以需要一个look_ahead_window来避免漏掉匹配 # 匹配时会遇到一些特殊情况,比如Paddle把两个连着的cell识别为一个字符串,MinerU将单元格上下2行识别为一行 # '1|2024-08-11|扫二维码付' minerU识别为“扫二维码付款”,Paddle识别为'12024-08-11扫二维码付' # 款 # 字符串的顺序极大概率是一致的,所以如果短字符串是长字符串的子串,可以增加相似权重 search_start = last_match_index - 1 unused_count = 0 while search_start >= 0: if text_boxes[search_start]['used'] == False: unused_count += 1 if unused_count >= self.look_ahead_window: break search_start -= 1 if search_start < 0: search_start = 0 while search_start < start_index and text_boxes[search_start]['used']: search_start += 1 search_end = min(start_index + self.look_ahead_window, len(text_boxes)) best_match = None best_index = start_index for i in range(search_start, search_end): if text_boxes[i]['used']: continue box_text = self._normalize_text(text_boxes[i]['text']) # 精确匹配优先 if target_text == box_text: if i >= start_index: return text_boxes[i], i + 1, i else: return text_boxes[i], start_index, i # 过滤过短的候选文本(避免单字符匹配) if len(box_text) < 2: continue # 长度比例检查 - 避免长度差异过大的匹配 length_ratio = min(len(target_text), len(box_text)) / max(len(target_text), len(box_text)) if length_ratio < 0.3: # 长度差异超过70%则跳过 continue # 子串检查 shorter = target_text if len(target_text) < len(box_text) else box_text longer = box_text if len(target_text) < len(box_text) else target_text is_substring = shorter in longer # 计算多种相似度 # token_sort_ratio = fuzz.token_sort_ratio(target_text, box_text) partial_ratio = fuzz.partial_ratio(target_text, box_text) if is_substring: partial_ratio += 10 # 子串时提升相似度 # 综合相似度 - 两种算法都要达到阈值 if (partial_ratio >= self.similarity_threshold): if i >= start_index: return text_boxes[i], i + 1, last_match_index else: return text_boxes[i], start_index, last_match_index return best_match, best_index, last_match_index def _normalize_text(self, text: str) -> str: """标准化文本(去除空格、标点等)""" # 移除所有空白字符 text = re.sub(r'\s+', '', text) # 转换全角数字和字母为半角 text = self._full_to_half(text) return text.lower() def _full_to_half(self, text: str) -> str: """全角转半角""" result = [] for char in text: code = ord(char) if code == 0x3000: # 全角空格 code = 0x0020 elif 0xFF01 <= code <= 0xFF5E: # 全角字符 code -= 0xFEE0 result.append(chr(code)) return ''.join(result) def generate_enhanced_markdown(self, merged_data: List[Dict], output_path: Optional[str] = None, mineru_file: Optional[str] = None) -> str: """ 生成增强的 Markdown(包含 bbox 信息的注释) 参考 MinerU 的实现,支持标题、列表、表格标题等 Args: merged_data: 合并后的数据 output_path: 输出路径(可选) mineru_file: MinerU 源文件路径(用于复制图片) Returns: Markdown 内容 """ md_lines = [] for item in merged_data: item_type = item.get('type', '') bbox = item.get('bbox', []) # 添加 bbox 注释 if bbox: md_lines.append(f"") # 根据类型处理 if item_type == 'title': # 标题 - 使用 text_level 确定标题级别 text = item.get('text', '') text_level = item.get('text_level', 1) heading = '#' * min(text_level, 6) # 最多6级标题 md_lines.append(f"{heading} {text}\n") elif item_type == 'text': # 普通文本 - 可能也有 text_level text = item.get('text', '') text_level = item.get('text_level', 0) if text_level > 0: # 作为标题处理 heading = '#' * min(text_level, 6) md_lines.append(f"{heading} {text}\n") else: # 普通段落 md_lines.append(f"{text}\n") elif item_type == 'list': # 列表 sub_type = item.get('sub_type', 'text') list_items = item.get('list_items', []) for list_item in list_items: md_lines.append(f"{list_item}\n") md_lines.append("") # 列表后添加空行 elif item_type == 'table': # 表格标题 table_caption = item.get('table_caption', []) if table_caption: for caption in table_caption: if caption: # 跳过空标题 md_lines.append(f"**{caption}**\n") # 表格内容 table_body = item.get('table_body_with_bbox', item.get('table_body', '')) if table_body: md_lines.append(table_body) md_lines.append("") # 表格脚注 table_footnote = item.get('table_footnote', []) if table_footnote: for footnote in table_footnote: if footnote: md_lines.append(f"*{footnote}*") md_lines.append("") elif item_type == 'image': # 图片 img_path = item.get('img_path', '') # 复制图片到输出目录 if img_path and mineru_file and output_path: mineru_dir = Path(mineru_file).parent img_full_path = mineru_dir / img_path if img_full_path.exists(): output_img_path = Path(output_path).parent / img_path output_img_path.parent.mkdir(parents=True, exist_ok=True) shutil.copy(img_full_path, output_img_path) # 图片标题 image_caption = item.get('image_caption', []) if image_caption: for caption in image_caption: if caption: md_lines.append(f"**{caption}**\n") # 插入图片 md_lines.append(f"![Image]({img_path})\n") # 图片脚注 image_footnote = item.get('image_footnote', []) if image_footnote: for footnote in image_footnote: if footnote: md_lines.append(f"*{footnote}*") md_lines.append("") elif item_type == 'equation': # 公式 latex = item.get('latex', '') if latex: md_lines.append(f"$$\n{latex}\n$$\n") elif item_type == 'inline_equation': # 行内公式 latex = item.get('latex', '') if latex: md_lines.append(f"${latex}$\n") elif item_type == 'page_number': # 页码 - 通常跳过或作为注释 text = item.get('text', '') md_lines.append(f"\n") elif item_type == 'header': # 页眉 text = item.get('text', '') md_lines.append(f"\n") elif item_type == 'footer': # 页脚 text = item.get('text', '') if text: md_lines.append(f"\n") elif item_type == 'reference': # 参考文献 text = item.get('text', '') md_lines.append(f"> {text}\n") else: # 未知类型 - 尝试提取文本 text = item.get('text', '') if text: md_lines.append(f"{text}\n") markdown_content = '\n'.join(md_lines) # 保存文件 if output_path: with open(output_path, 'w', encoding='utf-8') as f: f.write(markdown_content) return markdown_content def extract_table_cells_with_bbox(self, merged_data: List[Dict]) -> List[Dict]: """ 提取所有表格单元格及其 bbox 信息 Returns: 单元格列表,每个包含 text, bbox, row, col 等信息 """ cells = [] for item in merged_data: if item['type'] != 'table': continue html = item.get('table_body_with_bbox', item.get('table_body', '')) soup = BeautifulSoup(html, 'html.parser') # 遍历所有行 for row_idx, row in enumerate(soup.find_all('tr')): # 遍历所有单元格 for col_idx, cell in enumerate(row.find_all(['td', 'th'])): cell_text = cell.get_text(strip=True) bbox_str = cell.get('data-bbox', '') if bbox_str: try: bbox = json.loads(bbox_str) cells.append({ 'text': cell_text, 'bbox': bbox, 'row': row_idx, 'col': col_idx, 'score': float(cell.get('data-score', 0)), 'paddle_index': int(cell.get('data-paddle-index', -1)) }) except (json.JSONDecodeError, ValueError): pass return cells def merge_single_file(mineru_file: Path, paddle_file: Path, output_dir: Path, output_format: str, merger: MinerUPaddleOCRMerger) -> bool: """ 合并单个文件 Args: mineru_file: MinerU JSON 文件路径 paddle_file: PaddleOCR JSON 文件路径 output_dir: 输出目录 merger: 合并器实例 Returns: 是否成功 """ print(f"📄 处理: {mineru_file.name}") # 输出文件路径 merged_md_path = output_dir / f"{mineru_file.stem}.md" merged_json_path = output_dir / f"{mineru_file.stem}.json" try: # 合并数据 merged_data = merger.merge_table_with_bbox( str(mineru_file), str(paddle_file) ) # 生成 Markdown if output_format in ['markdown', 'both']: merger.generate_enhanced_markdown(merged_data, str(merged_md_path), mineru_file) # 提取单元格信息 # cells = merger.extract_table_cells_with_bbox(merged_data) if output_format in ['json', 'both']: with open(merged_json_path, 'w', encoding='utf-8') as f: json.dump(merged_data, f, ensure_ascii=False, indent=2) print(f" ✅ 合并完成") print(f" 📊 共处理了 {len(merged_data)} 个对象") print(f" 💾 输出文件:") if output_format in ['markdown', 'both']: print(f" - {merged_md_path.name}") if output_format in ['json', 'both']: print(f" - {merged_json_path.name}") return True except Exception as e: print(f" ❌ 处理失败: {e}") import traceback traceback.print_exc() return False def merge_mineru_paddle_batch(mineru_dir: str, paddle_dir: str, output_dir: str, output_format: str = 'both', look_ahead_window: int = 10, similarity_threshold: int = 80): """ 批量合并 MinerU 和 PaddleOCR 的结果 Args: mineru_dir: MinerU 结果目录 paddle_dir: PaddleOCR 结果目录 output_dir: 输出目录 look_ahead_window: 向前查找窗口大小 similarity_threshold: 相似度阈值 """ mineru_path = Path(mineru_dir) paddle_path = Path(paddle_dir) output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) merger = MinerUPaddleOCRMerger( look_ahead_window=look_ahead_window, similarity_threshold=similarity_threshold ) # 查找所有 MinerU 的 JSON 文件 mineru_files = list(mineru_path.glob('*_page_*[0-9].json')) mineru_files.sort() print(f"\n🔍 找到 {len(mineru_files)} 个 MinerU 文件") print(f"📂 MinerU 目录: {mineru_dir}") print(f"📂 PaddleOCR 目录: {paddle_dir}") print(f"📂 输出目录: {output_dir}") print(f"⚙️ 查找窗口: {look_ahead_window}") print(f"⚙️ 相似度阈值: {similarity_threshold}%\n") success_count = 0 failed_count = 0 for mineru_file in mineru_files: # 查找对应的 PaddleOCR 文件 paddle_file = paddle_path / mineru_file.name if not paddle_file.exists(): print(f"⚠️ 跳过: 未找到对应的 PaddleOCR 文件: {paddle_file.name}\n") failed_count += 1 continue if merge_single_file(mineru_file, paddle_file, output_path, output_format, merger): success_count += 1 else: failed_count += 1 print() # 空行分隔 # 打印统计信息 print("=" * 60) print(f"✅ 处理完成!") print(f"📊 统计信息:") print(f" - 总文件数: {len(mineru_files)}") print(f" - 成功: {success_count}") print(f" - 失败: {failed_count}") print("=" * 60) def main(): """主函数""" parser = argparse.ArgumentParser( description='合并 MinerU 和 PaddleOCR 的识别结果,添加 bbox 坐标信息', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例用法: 1. 批量处理整个目录: python merge_mineru_paddle_ocr.py \\ --mineru-dir /path/to/mineru/results \\ --paddle-dir /path/to/paddle/results \\ --output-dir /path/to/output 2. 处理单个文件: python merge_mineru_paddle_ocr.py \\ --mineru-file /path/to/file_page_001.json \\ --paddle-file /path/to/file_page_001.json \\ --output-dir /path/to/output 3. 自定义参数: python merge_mineru_paddle_ocr.py \\ --mineru-dir /path/to/mineru \\ --paddle-dir /path/to/paddle \\ --output-dir /path/to/output \\ --window 15 \\ --threshold 85 """ ) # 文件/目录参数 file_group = parser.add_argument_group('文件参数') file_group.add_argument( '--mineru-file', type=str, help='MinerU 输出的 JSON 文件路径(单文件模式)' ) file_group.add_argument( '--paddle-file', type=str, help='PaddleOCR 输出的 JSON 文件路径(单文件模式)' ) dir_group = parser.add_argument_group('目录参数') dir_group.add_argument( '--mineru-dir', type=str, help='MinerU 结果目录(批量模式)' ) dir_group.add_argument( '--paddle-dir', type=str, help='PaddleOCR 结果目录(批量模式)' ) # 输出参数 output_group = parser.add_argument_group('输出参数') output_group.add_argument( '-o', '--output-dir', type=str, required=True, help='输出目录(必需)' ) output_group.add_argument( '-f', '--format', choices=['json', 'markdown', 'both'], default='both', help='输出格式' ) # 算法参数 algo_group = parser.add_argument_group('算法参数') algo_group.add_argument( '-w', '--window', type=int, default=15, help='向前查找的窗口大小(默认: 10)' ) algo_group.add_argument( '-t', '--threshold', type=int, default=80, help='文本相似度阈值(0-100,默认: 80)' ) args = parser.parse_args() output_format = args.format.lower() # 验证参数 if args.mineru_file and args.paddle_file: # 单文件模式 mineru_file = Path(args.mineru_file) paddle_file = Path(args.paddle_file) output_dir = Path(args.output_dir) if not mineru_file.exists(): print(f"❌ 错误: MinerU 文件不存在: {mineru_file}") return if not paddle_file.exists(): print(f"❌ 错误: PaddleOCR 文件不存在: {paddle_file}") return output_dir.mkdir(parents=True, exist_ok=True) print("\n🔧 单文件处理模式") print(f"📄 MinerU 文件: {mineru_file}") print(f"📄 PaddleOCR 文件: {paddle_file}") print(f"📂 输出目录: {output_dir}") print(f"⚙️ 查找窗口: {args.window}") print(f"⚙️ 相似度阈值: {args.threshold}%\n") merger = MinerUPaddleOCRMerger( look_ahead_window=args.window, similarity_threshold=args.threshold ) success = merge_single_file(mineru_file, paddle_file, output_dir, output_format, merger) if success: print("\n✅ 处理完成!") else: print("\n❌ 处理失败!") elif args.mineru_dir and args.paddle_dir: # 批量模式 if not Path(args.mineru_dir).exists(): print(f"❌ 错误: MinerU 目录不存在: {args.mineru_dir}") return if not Path(args.paddle_dir).exists(): print(f"❌ 错误: PaddleOCR 目录不存在: {args.paddle_dir}") return print("\n🔧 批量处理模式") merge_mineru_paddle_batch( args.mineru_dir, args.paddle_dir, args.output_dir, output_format=output_format, look_ahead_window=args.window, similarity_threshold=args.threshold ) else: parser.print_help() print("\n❌ 错误: 请指定单文件模式或批量模式的参数") print(" 单文件模式: --mineru-file 和 --paddle-file") print(" 批量模式: --mineru-dir 和 --paddle-dir") if __name__ == "__main__": print("🚀 启动 MinerU + PaddleOCR 合并程序...") import sys if len(sys.argv) == 1: # 如果没有命令行参数,使用默认配置运行 print("ℹ️ 未提供命令行参数,使用默认配置运行...") # 默认配置 default_config = { "mineru-file": "/Users/zhch158/workspace/data/流水分析/对公_招商银行图/mineru-vlm-2.5.3_Results/对公_招商银行图_page_001.json", "paddle-file": "/Users/zhch158/workspace/data/流水分析/对公_招商银行图/data_PPStructureV3_Results/对公_招商银行图_page_001.json", "output-dir": "/Users/zhch158/workspace/data/流水分析/对公_招商银行图/merged_results", # "mineru-dir": "/Users/zhch158/workspace/data/流水分析/德_内蒙古银行照/mineru-vlm-2.5.3_Results", # "paddle-dir": "/Users/zhch158/workspace/data/流水分析/德_内蒙古银行照/data_PPStructureV3_Results", # "output-dir": "/Users/zhch158/workspace/data/流水分析/德_内蒙古银行照/merged_results", "format": "both", "window": "15", "threshold": "85" } print("⚙️ 默认参数:") for key, value in default_config.items(): print(f" --{key}: {value}") # 构造参数 sys.argv = [sys.argv[0]] for key, value in default_config.items(): sys.argv.extend([f"--{key}", str(value)]) sys.exit(main())