#!/usr/bin/env python3 """ 批量合并 OCR 结果 自动读取配置文件,对所有 VL 处理器的输出进行 bbox 合并 支持执行器输出日志重定向 """ import os import sys import yaml import argparse import subprocess from pathlib import Path from datetime import datetime from typing import Dict, List, Tuple, Optional, Any from dataclasses import dataclass import logging from tqdm import tqdm # 添加 merger 模块路径 sys.path.insert(0, str(Path(__file__).parent.parent / 'merger')) @dataclass class MergeTask: """合并任务""" processor_name: str vl_result_dir: Path paddle_result_dir: Path output_dir: Path merger_script: str description: str log_file: str = "" # 🎯 新增:日志文件路径 class BatchMerger: """批量合并器""" # VL 处理器类型映射到合并脚本 MERGER_SCRIPTS = { 'paddleocr_vl': 'merge_paddleocr_vl_paddleocr.py', 'mineru': 'merge_mineru_paddle_ocr.py', 'dotsocr': 'merge_dotsocr_paddleocr.py' } def __init__(self, config_file: str, base_dir: str = None): """ Args: config_file: processor_configs.yaml 路径 base_dir: PDF 基础目录,覆盖配置文件中的设置 """ self.config_file = Path(config_file) self.config = self._load_config() self.base_dir = Path(base_dir) if base_dir else Path(self.config['global']['base_dir']) # 🎯 日志基础目录 self.log_base_dir = self.base_dir / self.config['global'].get('log_dir', 'logs') # 设置日志 self.logger = self._setup_logger() # merger 脚本目录 self.merger_dir = Path(__file__).parent.parent / 'merger' # 🎯 统计信息 self.merge_results: List[Dict[str, Any]] = [] def _load_config(self) -> Dict: """加载配置文件""" with open(self.config_file, 'r', encoding='utf-8') as f: return yaml.safe_load(f) def _setup_logger(self) -> logging.Logger: """设置日志""" logger = logging.getLogger('BatchMerger') logger.setLevel(logging.INFO) if not logger.handlers: console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) formatter = logging.Formatter( '%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) console_handler.setFormatter(formatter) logger.addHandler(console_handler) return logger def _detect_processor_type(self, processor_name: str) -> str: """ 检测处理器类型 Returns: 'paddleocr_vl', 'mineru', 'dotsocr', 'ppstructure' 等 """ name_lower = processor_name.lower() if 'paddleocr_vl' in name_lower or 'paddleocr-vl' in name_lower: return 'paddleocr_vl' elif 'mineru' in name_lower: return 'mineru' elif 'dotsocr' in name_lower or 'dots' in name_lower: return 'dotsocr' elif 'ppstructure' in name_lower or 'pp-structure' in name_lower: return 'ppstructure' else: return 'unknown' def _get_merger_script(self, processor_type: str) -> str: """获取合并脚本路径""" script_name = self.MERGER_SCRIPTS.get(processor_type) if not script_name: return None script_path = self.merger_dir / script_name return str(script_path) if script_path.exists() else None def _find_paddle_result_dir(self, pdf_dir: Path) -> Path: """ 查找对应的 PaddleOCR 结果目录 优先级: 1. ppstructurev3_cpu_results (本地 CPU) 2. ppstructurev3_results (默认) 3. data_PPStructureV3_Results (旧格式) """ candidates = [ pdf_dir / 'ppstructurev3_client_results', pdf_dir / 'ppstructurev3_single_process_results', ] for candidate in candidates: if candidate.exists(): return candidate return None def _get_log_file_path(self, pdf_dir: Path, processor_name: str) -> Path: """ 🎯 获取合并任务的日志文件路径 日志结构: PDF目录/ └── logs/ └── merge_processor_name/ └── PDF名称_merge_YYYYMMDD_HHMMSS.log """ # 日志目录 log_dir = pdf_dir / 'logs' / f'merge_{processor_name}' log_dir.mkdir(parents=True, exist_ok=True) # 日志文件名 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') log_file = log_dir / f"{pdf_dir.name}_merge_{timestamp}.log" return log_file def discover_merge_tasks( self, pdf_list: List[str] = None, processors: List[str] = None ) -> List[MergeTask]: """ 自动发现需要合并的任务 Args: pdf_list: PDF 文件列表(不含扩展名),如 ['德_内蒙古银行照', ...] processors: 处理器列表,如 ['paddleocr_vl_single_process', ...] Returns: MergeTask 列表 """ tasks = [] # 如果没有指定处理器,扫描所有 VL 类型的处理器 if not processors: processors = [] for proc_name, proc_config in self.config['processors'].items(): proc_type = self._detect_processor_type(proc_name) if proc_type in ['paddleocr_vl', 'mineru', 'dotsocr']: processors.append(proc_name) # 如果没有指定 PDF 列表,扫描基础目录 if not pdf_list: pdf_list = [d.name for d in self.base_dir.iterdir() if d.is_dir()] self.logger.info(f"📂 基础目录: {self.base_dir}") self.logger.info(f"🔍 发现 {len(pdf_list)} 个 PDF 目录") self.logger.info(f"⚙️ 发现 {len(processors)} 个 VL 处理器") # 遍历每个 PDF 目录和处理器组合 for pdf_name in pdf_list: pdf_dir = self.base_dir / pdf_name if not pdf_dir.exists(): self.logger.warning(f"⚠️ 目录不存在: {pdf_dir}") continue # 查找 PaddleOCR 结果目录 paddle_result_dir = self._find_paddle_result_dir(pdf_dir) if not paddle_result_dir: self.logger.warning(f"⚠️ 未找到 PaddleOCR 结果: {pdf_name}") continue # 遍历每个 VL 处理器 for proc_name in processors: if proc_name not in self.config['processors']: self.logger.warning(f"⚠️ 处理器不存在: {proc_name}") continue proc_config = self.config['processors'][proc_name] proc_type = self._detect_processor_type(proc_name) # 获取合并脚本 merger_script = self._get_merger_script(proc_type) if not merger_script: self.logger.warning(f"⚠️ 不支持的处理器类型: {proc_name} ({proc_type})") continue # VL 结果目录 vl_output_subdir = proc_config.get('output_subdir', f'{proc_name}_results') vl_result_dir = pdf_dir / vl_output_subdir if not vl_result_dir.exists(): self.logger.debug(f"⏭️ VL 结果不存在: {vl_result_dir}") continue # 输出目录 output_dir = pdf_dir / f"{vl_output_subdir}_cell_bbox" # 🎯 日志文件路径 log_file = self._get_log_file_path(pdf_dir, proc_name) # 创建任务 task = MergeTask( processor_name=proc_name, vl_result_dir=vl_result_dir, paddle_result_dir=paddle_result_dir, output_dir=output_dir, merger_script=merger_script, description=proc_config.get('description', proc_name), log_file=str(log_file) # 🎯 新增 ) tasks.append(task) return tasks def execute_merge_task( self, task: MergeTask, window: int = 15, threshold: int = 85, output_type: str = 'both', dry_run: bool = False ) -> Dict[str, Any]: """ 🎯 执行单个合并任务(支持日志重定向) Args: task: 合并任务 window: 查找窗口 threshold: 相似度阈值 output_type: 输出格式 dry_run: 模拟运行 Returns: 执行结果字典 """ self.logger.info(f"\n{'='*80}") self.logger.info(f"📄 处理: {task.vl_result_dir.parent.name}") self.logger.info(f"🔧 处理器: {task.description}") self.logger.info(f"📂 VL 结果: {task.vl_result_dir}") self.logger.info(f"📂 PaddleOCR 结果: {task.paddle_result_dir}") self.logger.info(f"📂 输出目录: {task.output_dir}") self.logger.info(f"📄 日志文件: {task.log_file}") self.logger.info(f"{'='*80}") # 构建命令 cmd = [ sys.executable, # 当前 Python 解释器 task.merger_script, f"--{self._get_vl_arg_name(task.merger_script)}-dir", str(task.vl_result_dir), '--paddle-dir', str(task.paddle_result_dir), '--output-dir', str(task.output_dir), '--output-type', output_type, '--window', str(window), '--threshold', str(threshold) ] if dry_run: self.logger.info(f"[DRY RUN] 命令: {' '.join(cmd)}") return { 'task': task, 'success': True, 'duration': 0, 'error': '', 'dry_run': True } # 🎯 执行命令并重定向输出到日志文件 import time start_time = time.time() try: with open(task.log_file, 'w', encoding='utf-8') as log_f: # 写入日志头 log_f.write(f"{'='*80}\n") log_f.write(f"合并任务日志\n") log_f.write(f"{'='*80}\n\n") log_f.write(f"PDF 目录: {task.vl_result_dir.parent}\n") log_f.write(f"处理器: {task.description}\n") log_f.write(f"处理器名称: {task.processor_name}\n") log_f.write(f"VL 结果目录: {task.vl_result_dir}\n") log_f.write(f"PaddleOCR 结果目录: {task.paddle_result_dir}\n") log_f.write(f"输出目录: {task.output_dir}\n") log_f.write(f"合并脚本: {task.merger_script}\n") log_f.write(f"查找窗口: {window}\n") log_f.write(f"相似度阈值: {threshold}\n") log_f.write(f"输出格式: {output_type}\n") log_f.write(f"开始时间: {datetime.now()}\n") log_f.write(f"{'='*80}\n\n") log_f.flush() # 执行命令 result = subprocess.run( cmd, stdout=log_f, # 🎯 重定向 stdout stderr=subprocess.STDOUT, # 🎯 合并 stderr 到 stdout text=True, check=True ) # 写入日志尾 log_f.write(f"\n{'='*80}\n") log_f.write(f"结束时间: {datetime.now()}\n") log_f.write(f"状态: 成功\n") log_f.write(f"{'='*80}\n") duration = time.time() - start_time self.logger.info(f"✅ 合并成功 (耗时: {duration:.2f}秒)") return { 'task': task, 'success': True, 'duration': duration, 'error': '', 'dry_run': False } except subprocess.CalledProcessError as e: duration = time.time() - start_time error_msg = f"命令执行失败 (退出码: {e.returncode})" # 🎯 在日志文件中追加错误信息 with open(task.log_file, 'a', encoding='utf-8') as log_f: log_f.write(f"\n{'='*80}\n") log_f.write(f"结束时间: {datetime.now()}\n") log_f.write(f"状态: 失败\n") log_f.write(f"错误: {error_msg}\n") log_f.write(f"{'='*80}\n") self.logger.error(f"❌ 合并失败 (耗时: {duration:.2f}秒)") self.logger.error(f"错误信息: {error_msg}") self.logger.error(f"详细日志: {task.log_file}") return { 'task': task, 'success': False, 'duration': duration, 'error': error_msg, 'dry_run': False } except Exception as e: duration = time.time() - start_time error_msg = str(e) with open(task.log_file, 'a', encoding='utf-8') as log_f: log_f.write(f"\n{'='*80}\n") log_f.write(f"结束时间: {datetime.now()}\n") log_f.write(f"状态: 异常\n") log_f.write(f"错误: {error_msg}\n") log_f.write(f"{'='*80}\n") self.logger.error(f"❌ 合并异常 (耗时: {duration:.2f}秒)") self.logger.error(f"错误信息: {error_msg}") self.logger.error(f"详细日志: {task.log_file}") return { 'task': task, 'success': False, 'duration': duration, 'error': error_msg, 'dry_run': False } def _get_vl_arg_name(self, merger_script: str) -> str: """获取 VL 参数名称""" script_name = Path(merger_script).stem if 'paddleocr_vl' in script_name: return 'paddleocr-vl' elif 'mineru' in script_name: return 'mineru' elif 'dotsocr' in script_name: return 'dotsocr' else: return 'vl' def _save_summary_log(self, stats: Dict[str, Any]): """🎯 保存汇总日志""" timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') summary_log_file = self.log_base_dir / f"merge_batch_summary_{timestamp}.log" # 确保目录存在 summary_log_file.parent.mkdir(parents=True, exist_ok=True) with open(summary_log_file, 'w', encoding='utf-8') as f: f.write("OCR 结果批量合并汇总日志\n") f.write("=" * 80 + "\n\n") f.write(f"配置文件: {self.config_file}\n") f.write(f"基础目录: {self.base_dir}\n") f.write(f"日志目录: {self.log_base_dir}\n") f.write(f"开始时间: {datetime.now()}\n") f.write(f"总耗时: {stats['total_duration']:.2f} 秒\n\n") f.write("统计信息:\n") f.write(f" 总任务数: {stats['total']}\n") f.write(f" 成功: {stats['success']}\n") f.write(f" 失败: {stats['failed']}\n\n") if stats['failed_tasks']: f.write("失败的任务:\n") for item in stats['failed_tasks']: f.write(f" ✗ {item['pdf_dir']} / {item['processor']}\n") f.write(f" 错误: {item['error']}\n") f.write(f" 日志: {item['log']}\n\n") f.write("详细结果:\n") for result in self.merge_results: task = result['task'] status = "✓" if result['success'] else "✗" f.write(f"{status} {task.vl_result_dir.parent.name} / {task.processor_name} ({result['duration']:.2f}s)\n") f.write(f" 日志: {task.log_file}\n") if result['error']: f.write(f" 错误: {result['error']}\n") self.logger.info(f"汇总日志已保存: {summary_log_file}") def batch_merge( self, pdf_list: List[str] = None, processors: List[str] = None, window: int = 15, threshold: int = 85, output_type: str = 'both', dry_run: bool = False ) -> Dict: """ 批量合并 Returns: 统计信息字典 """ # 发现任务 tasks = self.discover_merge_tasks(pdf_list, processors) if not tasks: self.logger.warning("⚠️ 没有发现任何合并任务") return { 'total': 0, 'success': 0, 'failed': 0, 'total_duration': 0, 'failed_tasks': [] } self.logger.info(f"\n🎯 发现 {len(tasks)} 个合并任务\n") # 显示任务列表 for i, task in enumerate(tasks, 1): self.logger.info(f"{i}. {task.vl_result_dir.parent.name} / {task.processor_name}") # 确认执行 if not dry_run: confirm = input(f"\n是否继续执行 {len(tasks)} 个合并任务? [Y/n]: ") if confirm.lower() not in ['', 'y', 'yes']: self.logger.info("❌ 已取消") return { 'total': 0, 'success': 0, 'failed': 0, 'total_duration': 0, 'failed_tasks': [] } # 执行任务 import time batch_start_time = time.time() success_count = 0 failed_count = 0 with tqdm(total=len(tasks), desc="合并进度", unit="task") as pbar: for task in tasks: result = self.execute_merge_task( task, window=window, threshold=threshold, output_type=output_type, dry_run=dry_run ) self.merge_results.append(result) if result['success']: success_count += 1 else: failed_count += 1 pbar.update(1) pbar.set_postfix({ 'success': success_count, 'failed': failed_count }) total_duration = time.time() - batch_start_time # 统计失败任务 failed_tasks = [ { 'pdf_dir': r['task'].vl_result_dir.parent.name, 'processor': r['task'].processor_name, 'error': r['error'], 'log': r['task'].log_file } for r in self.merge_results if not r['success'] ] # 统计信息 stats = { 'total': len(tasks), 'success': success_count, 'failed': failed_count, 'total_duration': total_duration, 'failed_tasks': failed_tasks } # 🎯 保存汇总日志 self._save_summary_log(stats) # 打印总结 self.logger.info(f"\n{'='*80}") self.logger.info("📊 合并完成") self.logger.info(f" 总任务数: {stats['total']}") self.logger.info(f" ✅ 成功: {stats['success']}") self.logger.info(f" ❌ 失败: {stats['failed']}") self.logger.info(f" ⏱️ 总耗时: {stats['total_duration']:.2f} 秒") self.logger.info(f"{'='*80}") if failed_tasks: self.logger.info(f"\n失败的任务:") for item in failed_tasks: self.logger.info(f" ✗ {item['pdf_dir']} / {item['processor']}") self.logger.info(f" 错误: {item['error']}") self.logger.info(f" 日志: {item['log']}") return stats def create_parser() -> argparse.ArgumentParser: """创建命令行参数解析器""" parser = argparse.ArgumentParser( description='批量合并 OCR 结果(VL + PaddleOCR)', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例用法: 1. 合并配置文件中所有 VL 处理器的结果: python batch_merge_results.py 2. 合并指定 PDF 的结果: python batch_merge_results.py -f pdf_list.txt 3. 合并指定处理器的结果: python batch_merge_results.py -p paddleocr_vl_single_process -p mineru_vllm 4. 自定义参数: python batch_merge_results.py -w 20 -t 90 5. 模拟运行(不实际执行): python batch_merge_results.py --dry-run """ ) # 配置文件 parser.add_argument( '-c', '--config', default='processor_configs.yaml', help='配置文件路径 (默认: processor_configs.yaml)' ) # PDF 和处理器 parser.add_argument( '-d', '--base-dir', help='PDF 基础目录(覆盖配置文件)' ) parser.add_argument( '-f', '--file-list', help='PDF 列表文件(每行一个 PDF 名称,不含扩展名)' ) parser.add_argument( '-l', '--pdf-list', nargs='+', help='PDF 名称列表(不含扩展名)' ) parser.add_argument( '-p', '--processors', nargs='+', help='处理器列表(不指定则自动检测所有 VL 处理器)' ) # 合并参数 parser.add_argument( '-w', '--window', type=int, default=15, help='查找窗口大小 (默认: 15)' ) parser.add_argument( '-t', '--threshold', type=int, default=85, help='相似度阈值 (默认: 85)' ) parser.add_argument( '--output-type', choices=['json', 'markdown', 'both'], default='both', help='输出格式 (默认: both)' ) # 工具选项 parser.add_argument( '--dry-run', action='store_true', help='模拟运行,不实际执行' ) parser.add_argument( '-v', '--verbose', action='store_true', help='详细输出' ) return parser def main(): """主函数""" parser = create_parser() args = parser.parse_args() # 设置日志级别 if args.verbose: logging.getLogger().setLevel(logging.DEBUG) # 读取 PDF 列表 pdf_list = None if args.file_list: pdf_list = [] with open(args.file_list, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if line and not line.startswith('#'): # 移除 .pdf 扩展名 pdf_name = line.replace('.pdf', '') pdf_list.append(pdf_name) elif args.pdf_list: pdf_list = [p.replace('.pdf', '') for p in args.pdf_list] # 创建批量合并器 merger = BatchMerger( config_file=args.config, base_dir=args.base_dir ) # 执行批量合并 stats = merger.batch_merge( pdf_list=pdf_list, processors=args.processors, window=args.window, threshold=args.threshold, output_type=args.output_type, dry_run=args.dry_run ) return 0 if stats['failed'] == 0 else 1 if __name__ == '__main__': print("🚀 启动批量OCR bbox 合并程序...") import sys if len(sys.argv) == 1: # 如果没有命令行参数,使用默认配置运行 print("ℹ️ 未提供命令行参数,使用默认配置运行...") # 默认配置 default_config = { "file-list": "pdf_list.txt", } print("⚙️ 默认参数:") for key, value in default_config.items(): print(f" --{key}: {value}") # 构造参数 sys.argv = [sys.argv[0]] for key, value in default_config.items(): sys.argv.extend([f"--{key}", str(value)]) sys.argv.append("--dry-run") sys.argv.append("--verbose") # 添加详细输出参数 sys.exit(main())