""" 合并 MinerU 和 PaddleOCR 的结果 使用 MinerU 的表格结构识别 + PaddleOCR 的文字框坐标 """ import json import re import argparse from pathlib import Path from typing import List, Dict, Tuple, Optional from bs4 import BeautifulSoup from fuzzywuzzy import fuzz class MinerUPaddleOCRMerger: """合并 MinerU 和 PaddleOCR 的结果""" def __init__(self, look_ahead_window: int = 10, similarity_threshold: int = 90): """ Args: look_ahead_window: 向前查找的窗口大小 similarity_threshold: 文本相似度阈值 """ self.look_ahead_window = look_ahead_window self.similarity_threshold = similarity_threshold def merge_table_with_bbox(self, mineru_json_path: str, paddle_json_path: str) -> List[Dict]: """ 合并 MinerU 和 PaddleOCR 的结果 Args: mineru_json_path: MinerU 输出的 JSON 路径 paddle_json_path: PaddleOCR 输出的 JSON 路径 output_path: 输出路径(可选) Returns: 合并后的结果字典 """ merged_data = None # 加载数据 with open(mineru_json_path, 'r', encoding='utf-8') as f: mineru_data = json.load(f) with open(paddle_json_path, 'r', encoding='utf-8') as f: paddle_data = json.load(f) # 提取 PaddleOCR 的文字框信息 paddle_text_boxes = self._extract_paddle_text_boxes(paddle_data) # 处理 MinerU 的数据 merged_data = self._process_mineru_data(mineru_data, paddle_text_boxes) return merged_data def _extract_paddle_text_boxes(self, paddle_data: Dict) -> List[Dict]: """提取 PaddleOCR 的文字框信息""" text_boxes = [] if 'overall_ocr_res' in paddle_data: ocr_res = paddle_data['overall_ocr_res'] rec_texts = ocr_res.get('rec_texts', []) rec_polys = ocr_res.get('rec_polys', []) rec_scores = ocr_res.get('rec_scores', []) for i, (text, poly, score) in enumerate(zip(rec_texts, rec_polys, rec_scores)): if text and text.strip(): # 计算 bbox (x_min, y_min, x_max, y_max) xs = [p[0] for p in poly] ys = [p[1] for p in poly] bbox = [min(xs), min(ys), max(xs), max(ys)] text_boxes.append({ 'text': text, 'bbox': bbox, 'poly': poly, 'score': score, 'paddle_bbox_index': i, 'used': False # 标记是否已被使用 }) return text_boxes def _process_mineru_data(self, mineru_data: List[Dict], paddle_text_boxes: List[Dict]) -> List[Dict]: """处理 MinerU 数据,添加 bbox 信息""" merged_data = [] cells = None # 存储所有表格单元格信息 paddle_pointer = 0 # PaddleOCR 文字框指针 # 对mineru_data按bbox从上到下排序,从左到右确保顺序一致 mineru_data.sort(key=lambda x: (x['bbox'][1], x['bbox'][0]) if 'bbox' in x else (float('inf'), float('inf'))) for item in mineru_data: if item['type'] == 'table': # 处理表格 merged_item = item.copy() table_html = item.get('table_body', '') # 解析 HTML 表格并添加 bbox enhanced_html, cells, paddle_pointer = self._enhance_table_html_with_bbox( table_html, paddle_text_boxes, paddle_pointer ) merged_item['table_body'] = enhanced_html merged_item['table_body_with_bbox'] = enhanced_html merged_item['bbox_mapping'] = 'merged_from_paddle_ocr' merged_data.append(merged_item) elif item['type'] in ['text', 'header']: # 处理普通文本 merged_item = item.copy() text = item.get('text', '') # 查找匹配的 bbox matched_bbox, paddle_pointer = self._find_matching_bbox( text, paddle_text_boxes, paddle_pointer ) if matched_bbox: merged_item['bbox'] = matched_bbox['bbox'] merged_item['bbox_source'] = 'paddle_ocr' merged_item['text_score'] = matched_bbox['score'] # 标记为已使用 matched_bbox['used'] = True merged_data.append(merged_item) else: # 其他类型直接复制 merged_data.append(item.copy()) if cells: merged_data.extend(cells) return merged_data def _enhance_table_html_with_bbox(self, html: str, paddle_text_boxes: List[Dict], start_pointer: int) -> Tuple[str, List[Dict], int]: """ 为 HTML 表格添加 bbox 信息 Args: html: 原始 HTML 表格 paddle_text_boxes: PaddleOCR 文字框列表 start_pointer: 起始指针位置 Returns: (增强后的 HTML, 单元格数组, 新的指针位置) """ soup = BeautifulSoup(html, 'html.parser') current_pointer = start_pointer cells = [] # 存储单元格的 bbox 信息 # 遍历所有行 for row_idx, row in enumerate(soup.find_all('tr')): # 遍历所有单元格 for col_idx, cell in enumerate(row.find_all(['td', 'th'])): cell_text = cell.get_text(strip=True) if not cell_text: continue # 查找匹配的 bbox matched_bbox, current_pointer = self._find_matching_bbox( cell_text, paddle_text_boxes, current_pointer ) if matched_bbox: # 添加 data-bbox 属性 bbox = matched_bbox['bbox'] cell['data-bbox'] = f"[{bbox[0]},{bbox[1]},{bbox[2]},{bbox[3]}]" cell['data-score'] = f"{matched_bbox['score']:.4f}" cell['data-paddle-index'] = str(matched_bbox['paddle_bbox_index']) cells.append({ 'type': 'table_cell', 'text': cell_text, 'bbox': bbox, 'row': row_idx+1, 'col': col_idx+1, 'score': matched_bbox['score'], 'paddle_bbox_index': matched_bbox['paddle_bbox_index'] }) # 标记为已使用 matched_bbox['used'] = True return str(soup), cells, current_pointer def _find_matching_bbox(self, target_text: str, text_boxes: List[Dict], start_index: int) -> tuple[Optional[Dict], int]: """ 查找匹配的文字框 Args: target_text: 目标文本 text_boxes: 文字框列表 start_index: 起始索引 Returns: (匹配的文字框信息, 新的指针位置) """ target_text = self._normalize_text(target_text) # 在窗口范围内查找, 窗口是start_index往回移动窗口的1/3到start_index + look_ahead_window search_start = max(0, int(start_index - self.look_ahead_window/3)) search_end = min(start_index + self.look_ahead_window, len(text_boxes)) best_match = None best_index = start_index for i in range(search_start, search_end): if text_boxes[i]['used']: continue box_text = self._normalize_text(text_boxes[i]['text']) # 计算相似度 similarity = fuzz.partial_ratio(target_text, box_text) # 精确匹配优先 if target_text == box_text: return text_boxes[i], i + 1 # 大于阈值就返回,不找最佳 # if similarity > best_similarity and similarity >= self.similarity_threshold: if similarity >= self.similarity_threshold: return text_boxes[i], i + 1 return best_match, best_index def _normalize_text(self, text: str) -> str: """标准化文本(去除空格、标点等)""" # 移除所有空白字符 text = re.sub(r'\s+', '', text) # 转换全角数字和字母为半角 text = self._full_to_half(text) return text.lower() def _full_to_half(self, text: str) -> str: """全角转半角""" result = [] for char in text: code = ord(char) if code == 0x3000: # 全角空格 code = 0x0020 elif 0xFF01 <= code <= 0xFF5E: # 全角字符 code -= 0xFEE0 result.append(chr(code)) return ''.join(result) def generate_enhanced_markdown(self, merged_data: List[Dict], output_path: Optional[str] = None) -> str: """ 生成增强的 Markdown(包含 bbox 信息的注释) Args: merged_data: 合并后的数据 output_path: 输出路径(可选) Returns: Markdown 内容 """ md_lines = [] for item in merged_data: if item['type'] == 'header': text = item.get('text', '') bbox = item.get('bbox', []) md_lines.append(f"") md_lines.append(f"# {text}\n") elif item['type'] == 'text': text = item.get('text', '') bbox = item.get('bbox', []) if bbox: md_lines.append(f"") md_lines.append(f"{text}\n") elif item['type'] == 'table': md_lines.append("\n") md_lines.append(item.get('table_body_with_bbox', item.get('table_body', ''))) md_lines.append("\n") elif item['type'] == 'image': img_path = item.get('img_path', '') bbox = item.get('bbox', []) if bbox: md_lines.append(f"") md_lines.append(f"![Image]({img_path})\n") markdown_content = '\n'.join(md_lines) if output_path: with open(output_path, 'w', encoding='utf-8') as f: f.write(markdown_content) return markdown_content def extract_table_cells_with_bbox(self, merged_data: List[Dict]) -> List[Dict]: """ 提取所有表格单元格及其 bbox 信息 Returns: 单元格列表,每个包含 text, bbox, row, col 等信息 """ cells = [] for item in merged_data: if item['type'] != 'table': continue html = item.get('table_body_with_bbox', item.get('table_body', '')) soup = BeautifulSoup(html, 'html.parser') # 遍历所有行 for row_idx, row in enumerate(soup.find_all('tr')): # 遍历所有单元格 for col_idx, cell in enumerate(row.find_all(['td', 'th'])): cell_text = cell.get_text(strip=True) bbox_str = cell.get('data-bbox', '') if bbox_str: try: bbox = json.loads(bbox_str) cells.append({ 'text': cell_text, 'bbox': bbox, 'row': row_idx, 'col': col_idx, 'score': float(cell.get('data-score', 0)), 'paddle_index': int(cell.get('data-paddle-index', -1)) }) except (json.JSONDecodeError, ValueError): pass return cells def merge_single_file(mineru_file: Path, paddle_file: Path, output_dir: Path, output_format: str, merger: MinerUPaddleOCRMerger) -> bool: """ 合并单个文件 Args: mineru_file: MinerU JSON 文件路径 paddle_file: PaddleOCR JSON 文件路径 output_dir: 输出目录 merger: 合并器实例 Returns: 是否成功 """ print(f"📄 处理: {mineru_file.name}") # 输出文件路径 merged_md_path = output_dir / f"{mineru_file.stem}.md" merged_json_path = output_dir / f"{mineru_file.stem}.json" try: # 合并数据 merged_data = merger.merge_table_with_bbox( str(mineru_file), str(paddle_file) ) # 生成 Markdown if output_format in ['markdown', 'both']: merger.generate_enhanced_markdown(merged_data, str(merged_md_path)) # 提取单元格信息 # cells = merger.extract_table_cells_with_bbox(merged_data) if output_format in ['json', 'both']: with open(merged_json_path, 'w', encoding='utf-8') as f: json.dump(merged_data, f, ensure_ascii=False, indent=2) print(f" ✅ 合并完成") print(f" 📊 共处理了 {len(merged_data)} 个对象") print(f" 💾 输出文件:") print(f" - {merged_json_path.name}") return True except Exception as e: print(f" ❌ 处理失败: {e}") import traceback traceback.print_exc() return False def merge_mineru_paddle_batch(mineru_dir: str, paddle_dir: str, output_dir: str, output_format: str = 'both', look_ahead_window: int = 10, similarity_threshold: int = 80): """ 批量合并 MinerU 和 PaddleOCR 的结果 Args: mineru_dir: MinerU 结果目录 paddle_dir: PaddleOCR 结果目录 output_dir: 输出目录 look_ahead_window: 向前查找窗口大小 similarity_threshold: 相似度阈值 """ mineru_path = Path(mineru_dir) paddle_path = Path(paddle_dir) output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) merger = MinerUPaddleOCRMerger( look_ahead_window=look_ahead_window, similarity_threshold=similarity_threshold ) # 查找所有 MinerU 的 JSON 文件 mineru_files = list(mineru_path.glob('*_page_*[0-9].json')) mineru_files.sort() print(f"\n🔍 找到 {len(mineru_files)} 个 MinerU 文件") print(f"📂 MinerU 目录: {mineru_dir}") print(f"📂 PaddleOCR 目录: {paddle_dir}") print(f"📂 输出目录: {output_dir}") print(f"⚙️ 查找窗口: {look_ahead_window}") print(f"⚙️ 相似度阈值: {similarity_threshold}%\n") success_count = 0 failed_count = 0 for mineru_file in mineru_files: # 查找对应的 PaddleOCR 文件 paddle_file = paddle_path / mineru_file.name if not paddle_file.exists(): print(f"⚠️ 跳过: 未找到对应的 PaddleOCR 文件: {paddle_file.name}\n") failed_count += 1 continue if merge_single_file(mineru_file, paddle_file, output_path, output_format, merger): success_count += 1 else: failed_count += 1 print() # 空行分隔 # 打印统计信息 print("=" * 60) print(f"✅ 处理完成!") print(f"📊 统计信息:") print(f" - 总文件数: {len(mineru_files)}") print(f" - 成功: {success_count}") print(f" - 失败: {failed_count}") print("=" * 60) def main(): """主函数""" parser = argparse.ArgumentParser( description='合并 MinerU 和 PaddleOCR 的识别结果,添加 bbox 坐标信息', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例用法: 1. 批量处理整个目录: python merge_mineru_paddle_ocr.py \\ --mineru-dir /path/to/mineru/results \\ --paddle-dir /path/to/paddle/results \\ --output-dir /path/to/output 2. 处理单个文件: python merge_mineru_paddle_ocr.py \\ --mineru-file /path/to/file_page_001.json \\ --paddle-file /path/to/file_page_001.json \\ --output-dir /path/to/output 3. 自定义参数: python merge_mineru_paddle_ocr.py \\ --mineru-dir /path/to/mineru \\ --paddle-dir /path/to/paddle \\ --output-dir /path/to/output \\ --window 15 \\ --threshold 85 """ ) # 文件/目录参数 file_group = parser.add_argument_group('文件参数') file_group.add_argument( '--mineru-file', type=str, help='MinerU 输出的 JSON 文件路径(单文件模式)' ) file_group.add_argument( '--paddle-file', type=str, help='PaddleOCR 输出的 JSON 文件路径(单文件模式)' ) dir_group = parser.add_argument_group('目录参数') dir_group.add_argument( '--mineru-dir', type=str, help='MinerU 结果目录(批量模式)' ) dir_group.add_argument( '--paddle-dir', type=str, help='PaddleOCR 结果目录(批量模式)' ) # 输出参数 output_group = parser.add_argument_group('输出参数') output_group.add_argument( '-o', '--output-dir', type=str, required=True, help='输出目录(必需)' ) output_group.add_argument( '-f', '--format', choices=['json', 'markdown', 'both'], default='json', help='输出格式' ) # 算法参数 algo_group = parser.add_argument_group('算法参数') algo_group.add_argument( '-w', '--window', type=int, default=10, help='向前查找的窗口大小(默认: 10)' ) algo_group.add_argument( '-t', '--threshold', type=int, default=80, help='文本相似度阈值(0-100,默认: 80)' ) args = parser.parse_args() output_format = args.format.lower() # 验证参数 if args.mineru_file and args.paddle_file: # 单文件模式 mineru_file = Path(args.mineru_file) paddle_file = Path(args.paddle_file) output_dir = Path(args.output_dir) if not mineru_file.exists(): print(f"❌ 错误: MinerU 文件不存在: {mineru_file}") return if not paddle_file.exists(): print(f"❌ 错误: PaddleOCR 文件不存在: {paddle_file}") return output_dir.mkdir(parents=True, exist_ok=True) print("\n🔧 单文件处理模式") print(f"📄 MinerU 文件: {mineru_file}") print(f"📄 PaddleOCR 文件: {paddle_file}") print(f"📂 输出目录: {output_dir}") print(f"⚙️ 查找窗口: {args.window}") print(f"⚙️ 相似度阈值: {args.threshold}%\n") merger = MinerUPaddleOCRMerger( look_ahead_window=args.window, similarity_threshold=args.threshold ) success = merge_single_file(mineru_file, paddle_file, output_dir, output_format, merger) if success: print("\n✅ 处理完成!") else: print("\n❌ 处理失败!") elif args.mineru_dir and args.paddle_dir: # 批量模式 if not Path(args.mineru_dir).exists(): print(f"❌ 错误: MinerU 目录不存在: {args.mineru_dir}") return if not Path(args.paddle_dir).exists(): print(f"❌ 错误: PaddleOCR 目录不存在: {args.paddle_dir}") return print("\n🔧 批量处理模式") merge_mineru_paddle_batch( args.mineru_dir, args.paddle_dir, args.output_dir, output_format=output_format, look_ahead_window=args.window, similarity_threshold=args.threshold ) else: parser.print_help() print("\n❌ 错误: 请指定单文件模式或批量模式的参数") print(" 单文件模式: --mineru-file 和 --paddle-file") print(" 批量模式: --mineru-dir 和 --paddle-dir") if __name__ == "__main__": print("🚀 启动 MinerU + PaddleOCR 合并程序...") import sys if len(sys.argv) == 1: # 如果没有命令行参数,使用默认配置运行 print("ℹ️ 未提供命令行参数,使用默认配置运行...") # 默认配置 default_config = { "mineru-dir": "/Users/zhch158/workspace/data/流水分析/德_内蒙古银行照/mineru-vlm-2.5.3_Results", "paddle-dir": "/Users/zhch158/workspace/data/流水分析/德_内蒙古银行照/data_PPStructureV3_Results", "output-dir": "/Users/zhch158/workspace/data/流水分析/德_内蒙古银行照/merged_results", "format": "both", "window": "15", "threshold": "85" } print(f"📂 MinerU 目录: {default_config['mineru-dir']}") print(f"📂 PaddleOCR 目录: {default_config['paddle-dir']}") print(f"📂 输出目录: {default_config['output-dir']}") print(f"⚙️ 查找窗口: {default_config['window']}") print(f"⚙️ 相似度阈值: {default_config['threshold']}%\n") # 构造参数 sys.argv = [sys.argv[0]] for key, value in default_config.items(): sys.argv.extend([f"--{key}", str(value)]) sys.exit(main())