#!/usr/bin/env python3
"""
Batch-merge OCR results.

Reads the processor configuration file and performs bbox merging on the
output of every VL processor; supports redirecting each merge run's
output to a per-task log file.
"""
import argparse
import logging
import os
import subprocess
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Any

import yaml
from tqdm import tqdm

# Make the sibling 'merger' package importable.
sys.path.insert(0, str(Path(__file__).parent.parent / 'merger'))
- @dataclass
- class MergeTask:
- """合并任务"""
- processor_name: str
- vl_result_dir: Path
- paddle_result_dir: Path
- output_dir: Path
- merger_script: str
- description: str
- log_file: str = "" # 🎯 新增:日志文件路径
- class BatchMerger:
- """批量合并器"""
-
- # VL 处理器类型映射到合并脚本
- MERGER_SCRIPTS = {
- 'paddleocr_vl': 'merge_paddleocr_vl_paddleocr.py',
- 'mineru': 'merge_mineru_paddle_ocr.py',
- 'dotsocr': 'merge_mineru_paddle_ocr.py', # DotsOCR 也用 MinerU 格式
- }
-
- def __init__(self, config_file: str, base_dir: str = None):
- """
- Args:
- config_file: processor_configs.yaml 路径
- base_dir: PDF 基础目录,覆盖配置文件中的设置
- """
- self.config_file = Path(config_file)
- self.config = self._load_config()
- self.base_dir = Path(base_dir) if base_dir else Path(self.config['global']['base_dir'])
-
- # 🎯 日志基础目录
- self.log_base_dir = self.base_dir / self.config['global'].get('log_dir', 'logs')
-
- # 设置日志
- self.logger = self._setup_logger()
-
- # merger 脚本目录
- self.merger_dir = Path(__file__).parent.parent / 'merger'
-
- # 🎯 统计信息
- self.merge_results: List[Dict[str, Any]] = []
-
- def _load_config(self) -> Dict:
- """加载配置文件"""
- with open(self.config_file, 'r', encoding='utf-8') as f:
- return yaml.safe_load(f)
-
- def _setup_logger(self) -> logging.Logger:
- """设置日志"""
- logger = logging.getLogger('BatchMerger')
- logger.setLevel(logging.INFO)
-
- if not logger.handlers:
- console_handler = logging.StreamHandler()
- console_handler.setLevel(logging.INFO)
- formatter = logging.Formatter(
- '%(asctime)s - %(levelname)s - %(message)s',
- datefmt='%Y-%m-%d %H:%M:%S'
- )
- console_handler.setFormatter(formatter)
- logger.addHandler(console_handler)
-
- return logger
-
- def _detect_processor_type(self, processor_name: str) -> str:
- """
- 检测处理器类型
-
- Returns:
- 'paddleocr_vl', 'mineru', 'dotsocr', 'ppstructure' 等
- """
- name_lower = processor_name.lower()
-
- if 'paddleocr_vl' in name_lower or 'paddleocr-vl' in name_lower:
- return 'paddleocr_vl'
- elif 'mineru' in name_lower:
- return 'mineru'
- elif 'dotsocr' in name_lower or 'dots' in name_lower:
- return 'dotsocr'
- elif 'ppstructure' in name_lower or 'pp-structure' in name_lower:
- return 'ppstructure'
- else:
- return 'unknown'
-
- def _get_merger_script(self, processor_type: str) -> str:
- """获取合并脚本路径"""
- script_name = self.MERGER_SCRIPTS.get(processor_type)
- if not script_name:
- return None
-
- script_path = self.merger_dir / script_name
- return str(script_path) if script_path.exists() else None
-
- def _find_paddle_result_dir(self, pdf_dir: Path) -> Path:
- """
- 查找对应的 PaddleOCR 结果目录
-
- 优先级:
- 1. ppstructurev3_cpu_results (本地 CPU)
- 2. ppstructurev3_results (默认)
- 3. data_PPStructureV3_Results (旧格式)
- """
- candidates = [
- pdf_dir / 'ppstructurev3_client_results',
- pdf_dir / 'ppstructurev3_single_process_results',
- ]
-
- for candidate in candidates:
- if candidate.exists():
- return candidate
-
- return None
-
- def _get_log_file_path(self, pdf_dir: Path, processor_name: str) -> Path:
- """
- 🎯 获取合并任务的日志文件路径
-
- 日志结构:
- PDF目录/
- └── logs/
- └── merge_processor_name/
- └── PDF名称_merge_YYYYMMDD_HHMMSS.log
- """
- # 日志目录
- log_dir = pdf_dir / 'logs' / f'merge_{processor_name}'
- log_dir.mkdir(parents=True, exist_ok=True)
-
- # 日志文件名
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- log_file = log_dir / f"{pdf_dir.name}_merge_{timestamp}.log"
-
- return log_file
-
- def discover_merge_tasks(
- self,
- pdf_list: List[str] = None,
- processors: List[str] = None
- ) -> List[MergeTask]:
- """
- 自动发现需要合并的任务
-
- Args:
- pdf_list: PDF 文件列表(不含扩展名),如 ['德_内蒙古银行照', ...]
- processors: 处理器列表,如 ['paddleocr_vl_single_process', ...]
-
- Returns:
- MergeTask 列表
- """
- tasks = []
-
- # 如果没有指定处理器,扫描所有 VL 类型的处理器
- if not processors:
- processors = []
- for proc_name, proc_config in self.config['processors'].items():
- proc_type = self._detect_processor_type(proc_name)
- if proc_type in ['paddleocr_vl', 'mineru', 'dotsocr']:
- processors.append(proc_name)
-
- # 如果没有指定 PDF 列表,扫描基础目录
- if not pdf_list:
- pdf_list = [d.name for d in self.base_dir.iterdir() if d.is_dir()]
-
- self.logger.info(f"📂 基础目录: {self.base_dir}")
- self.logger.info(f"🔍 发现 {len(pdf_list)} 个 PDF 目录")
- self.logger.info(f"⚙️ 发现 {len(processors)} 个 VL 处理器")
-
- # 遍历每个 PDF 目录和处理器组合
- for pdf_name in pdf_list:
- pdf_dir = self.base_dir / pdf_name
-
- if not pdf_dir.exists():
- self.logger.warning(f"⚠️ 目录不存在: {pdf_dir}")
- continue
-
- # 查找 PaddleOCR 结果目录
- paddle_result_dir = self._find_paddle_result_dir(pdf_dir)
-
- if not paddle_result_dir:
- self.logger.warning(f"⚠️ 未找到 PaddleOCR 结果: {pdf_name}")
- continue
-
- # 遍历每个 VL 处理器
- for proc_name in processors:
- if proc_name not in self.config['processors']:
- self.logger.warning(f"⚠️ 处理器不存在: {proc_name}")
- continue
-
- proc_config = self.config['processors'][proc_name]
- proc_type = self._detect_processor_type(proc_name)
-
- # 获取合并脚本
- merger_script = self._get_merger_script(proc_type)
- if not merger_script:
- self.logger.warning(f"⚠️ 不支持的处理器类型: {proc_name} ({proc_type})")
- continue
-
- # VL 结果目录
- vl_output_subdir = proc_config.get('output_subdir', f'{proc_name}_results')
- vl_result_dir = pdf_dir / vl_output_subdir
-
- if not vl_result_dir.exists():
- self.logger.debug(f"⏭️ VL 结果不存在: {vl_result_dir}")
- continue
-
- # 输出目录
- output_dir = pdf_dir / f"{vl_output_subdir}_cell_bbox"
-
- # 🎯 日志文件路径
- log_file = self._get_log_file_path(pdf_dir, proc_name)
-
- # 创建任务
- task = MergeTask(
- processor_name=proc_name,
- vl_result_dir=vl_result_dir,
- paddle_result_dir=paddle_result_dir,
- output_dir=output_dir,
- merger_script=merger_script,
- description=proc_config.get('description', proc_name),
- log_file=str(log_file) # 🎯 新增
- )
-
- tasks.append(task)
-
- return tasks
-
- def execute_merge_task(
- self,
- task: MergeTask,
- window: int = 15,
- threshold: int = 85,
- output_type: str = 'both',
- dry_run: bool = False
- ) -> Dict[str, Any]:
- """
- 🎯 执行单个合并任务(支持日志重定向)
-
- Args:
- task: 合并任务
- window: 查找窗口
- threshold: 相似度阈值
- output_type: 输出格式
- dry_run: 模拟运行
-
- Returns:
- 执行结果字典
- """
- self.logger.info(f"\n{'='*80}")
- self.logger.info(f"📄 处理: {task.vl_result_dir.parent.name}")
- self.logger.info(f"🔧 处理器: {task.description}")
- self.logger.info(f"📂 VL 结果: {task.vl_result_dir}")
- self.logger.info(f"📂 PaddleOCR 结果: {task.paddle_result_dir}")
- self.logger.info(f"📂 输出目录: {task.output_dir}")
- self.logger.info(f"📄 日志文件: {task.log_file}")
- self.logger.info(f"{'='*80}")
-
- # 构建命令
- cmd = [
- sys.executable, # 当前 Python 解释器
- task.merger_script,
- f"--{self._get_vl_arg_name(task.merger_script)}-dir", str(task.vl_result_dir),
- '--paddle-dir', str(task.paddle_result_dir),
- '--output-dir', str(task.output_dir),
- '--output-type', output_type,
- '--window', str(window),
- '--threshold', str(threshold)
- ]
-
- if dry_run:
- self.logger.info(f"[DRY RUN] 命令: {' '.join(cmd)}")
- return {
- 'task': task,
- 'success': True,
- 'duration': 0,
- 'error': '',
- 'dry_run': True
- }
-
- # 🎯 执行命令并重定向输出到日志文件
- import time
- start_time = time.time()
-
- try:
- with open(task.log_file, 'w', encoding='utf-8') as log_f:
- # 写入日志头
- log_f.write(f"{'='*80}\n")
- log_f.write(f"合并任务日志\n")
- log_f.write(f"{'='*80}\n\n")
- log_f.write(f"PDF 目录: {task.vl_result_dir.parent}\n")
- log_f.write(f"处理器: {task.description}\n")
- log_f.write(f"处理器名称: {task.processor_name}\n")
- log_f.write(f"VL 结果目录: {task.vl_result_dir}\n")
- log_f.write(f"PaddleOCR 结果目录: {task.paddle_result_dir}\n")
- log_f.write(f"输出目录: {task.output_dir}\n")
- log_f.write(f"合并脚本: {task.merger_script}\n")
- log_f.write(f"查找窗口: {window}\n")
- log_f.write(f"相似度阈值: {threshold}\n")
- log_f.write(f"输出格式: {output_type}\n")
- log_f.write(f"开始时间: {datetime.now()}\n")
- log_f.write(f"{'='*80}\n\n")
- log_f.flush()
-
- # 执行命令
- result = subprocess.run(
- cmd,
- stdout=log_f, # 🎯 重定向 stdout
- stderr=subprocess.STDOUT, # 🎯 合并 stderr 到 stdout
- text=True,
- check=True
- )
-
- # 写入日志尾
- log_f.write(f"\n{'='*80}\n")
- log_f.write(f"结束时间: {datetime.now()}\n")
- log_f.write(f"状态: 成功\n")
- log_f.write(f"{'='*80}\n")
-
- duration = time.time() - start_time
- self.logger.info(f"✅ 合并成功 (耗时: {duration:.2f}秒)")
-
- return {
- 'task': task,
- 'success': True,
- 'duration': duration,
- 'error': '',
- 'dry_run': False
- }
-
- except subprocess.CalledProcessError as e:
- duration = time.time() - start_time
- error_msg = f"命令执行失败 (退出码: {e.returncode})"
-
- # 🎯 在日志文件中追加错误信息
- with open(task.log_file, 'a', encoding='utf-8') as log_f:
- log_f.write(f"\n{'='*80}\n")
- log_f.write(f"结束时间: {datetime.now()}\n")
- log_f.write(f"状态: 失败\n")
- log_f.write(f"错误: {error_msg}\n")
- log_f.write(f"{'='*80}\n")
-
- self.logger.error(f"❌ 合并失败 (耗时: {duration:.2f}秒)")
- self.logger.error(f"错误信息: {error_msg}")
- self.logger.error(f"详细日志: {task.log_file}")
-
- return {
- 'task': task,
- 'success': False,
- 'duration': duration,
- 'error': error_msg,
- 'dry_run': False
- }
-
- except Exception as e:
- duration = time.time() - start_time
- error_msg = str(e)
-
- with open(task.log_file, 'a', encoding='utf-8') as log_f:
- log_f.write(f"\n{'='*80}\n")
- log_f.write(f"结束时间: {datetime.now()}\n")
- log_f.write(f"状态: 异常\n")
- log_f.write(f"错误: {error_msg}\n")
- log_f.write(f"{'='*80}\n")
-
- self.logger.error(f"❌ 合并异常 (耗时: {duration:.2f}秒)")
- self.logger.error(f"错误信息: {error_msg}")
- self.logger.error(f"详细日志: {task.log_file}")
-
- return {
- 'task': task,
- 'success': False,
- 'duration': duration,
- 'error': error_msg,
- 'dry_run': False
- }
-
- def _get_vl_arg_name(self, merger_script: str) -> str:
- """获取 VL 参数名称"""
- script_name = Path(merger_script).stem
-
- if 'paddleocr_vl' in script_name:
- return 'paddleocr-vl'
- elif 'mineru' in script_name:
- return 'mineru'
- else:
- return 'vl'
-
- def _save_summary_log(self, stats: Dict[str, Any]):
- """🎯 保存汇总日志"""
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- summary_log_file = self.log_base_dir / f"merge_batch_summary_{timestamp}.log"
-
- # 确保目录存在
- summary_log_file.parent.mkdir(parents=True, exist_ok=True)
-
- with open(summary_log_file, 'w', encoding='utf-8') as f:
- f.write("OCR 结果批量合并汇总日志\n")
- f.write("=" * 80 + "\n\n")
-
- f.write(f"配置文件: {self.config_file}\n")
- f.write(f"基础目录: {self.base_dir}\n")
- f.write(f"日志目录: {self.log_base_dir}\n")
- f.write(f"开始时间: {datetime.now()}\n")
- f.write(f"总耗时: {stats['total_duration']:.2f} 秒\n\n")
-
- f.write("统计信息:\n")
- f.write(f" 总任务数: {stats['total']}\n")
- f.write(f" 成功: {stats['success']}\n")
- f.write(f" 失败: {stats['failed']}\n\n")
-
- if stats['failed_tasks']:
- f.write("失败的任务:\n")
- for item in stats['failed_tasks']:
- f.write(f" ✗ {item['pdf_dir']} / {item['processor']}\n")
- f.write(f" 错误: {item['error']}\n")
- f.write(f" 日志: {item['log']}\n\n")
-
- f.write("详细结果:\n")
- for result in self.merge_results:
- task = result['task']
- status = "✓" if result['success'] else "✗"
- f.write(f"{status} {task.vl_result_dir.parent.name} / {task.processor_name} ({result['duration']:.2f}s)\n")
- f.write(f" 日志: {task.log_file}\n")
- if result['error']:
- f.write(f" 错误: {result['error']}\n")
-
- self.logger.info(f"汇总日志已保存: {summary_log_file}")
-
- def batch_merge(
- self,
- pdf_list: List[str] = None,
- processors: List[str] = None,
- window: int = 15,
- threshold: int = 85,
- output_type: str = 'both',
- dry_run: bool = False
- ) -> Dict:
- """
- 批量合并
-
- Returns:
- 统计信息字典
- """
- # 发现任务
- tasks = self.discover_merge_tasks(pdf_list, processors)
-
- if not tasks:
- self.logger.warning("⚠️ 没有发现任何合并任务")
- return {
- 'total': 0,
- 'success': 0,
- 'failed': 0,
- 'total_duration': 0,
- 'failed_tasks': []
- }
-
- self.logger.info(f"\n🎯 发现 {len(tasks)} 个合并任务\n")
-
- # 显示任务列表
- for i, task in enumerate(tasks, 1):
- self.logger.info(f"{i}. {task.vl_result_dir.parent.name} / {task.processor_name}")
-
- # 确认执行
- if not dry_run:
- confirm = input(f"\n是否继续执行 {len(tasks)} 个合并任务? [Y/n]: ")
- if confirm.lower() not in ['', 'y', 'yes']:
- self.logger.info("❌ 已取消")
- return {
- 'total': 0,
- 'success': 0,
- 'failed': 0,
- 'total_duration': 0,
- 'failed_tasks': []
- }
-
- # 执行任务
- import time
- batch_start_time = time.time()
- success_count = 0
- failed_count = 0
-
- with tqdm(total=len(tasks), desc="合并进度", unit="task") as pbar:
- for task in tasks:
- result = self.execute_merge_task(
- task,
- window=window,
- threshold=threshold,
- output_type=output_type,
- dry_run=dry_run
- )
-
- self.merge_results.append(result)
-
- if result['success']:
- success_count += 1
- else:
- failed_count += 1
-
- pbar.update(1)
- pbar.set_postfix({
- 'success': success_count,
- 'failed': failed_count
- })
-
- total_duration = time.time() - batch_start_time
-
- # 统计失败任务
- failed_tasks = [
- {
- 'pdf_dir': r['task'].vl_result_dir.parent.name,
- 'processor': r['task'].processor_name,
- 'error': r['error'],
- 'log': r['task'].log_file
- }
- for r in self.merge_results if not r['success']
- ]
-
- # 统计信息
- stats = {
- 'total': len(tasks),
- 'success': success_count,
- 'failed': failed_count,
- 'total_duration': total_duration,
- 'failed_tasks': failed_tasks
- }
-
- # 🎯 保存汇总日志
- self._save_summary_log(stats)
-
- # 打印总结
- self.logger.info(f"\n{'='*80}")
- self.logger.info("📊 合并完成")
- self.logger.info(f" 总任务数: {stats['total']}")
- self.logger.info(f" ✅ 成功: {stats['success']}")
- self.logger.info(f" ❌ 失败: {stats['failed']}")
- self.logger.info(f" ⏱️ 总耗时: {stats['total_duration']:.2f} 秒")
- self.logger.info(f"{'='*80}")
-
- if failed_tasks:
- self.logger.info(f"\n失败的任务:")
- for item in failed_tasks:
- self.logger.info(f" ✗ {item['pdf_dir']} / {item['processor']}")
- self.logger.info(f" 错误: {item['error']}")
- self.logger.info(f" 日志: {item['log']}")
-
- return stats
- def create_parser() -> argparse.ArgumentParser:
- """创建命令行参数解析器"""
- parser = argparse.ArgumentParser(
- description='批量合并 OCR 结果(VL + PaddleOCR)',
- formatter_class=argparse.RawDescriptionHelpFormatter,
- epilog="""
- 示例用法:
- 1. 合并配置文件中所有 VL 处理器的结果:
- python batch_merge_results.py
- 2. 合并指定 PDF 的结果:
- python batch_merge_results.py -f pdf_list.txt
- 3. 合并指定处理器的结果:
- python batch_merge_results.py -p paddleocr_vl_single_process -p mineru_vllm
- 4. 自定义参数:
- python batch_merge_results.py -w 20 -t 90
- 5. 模拟运行(不实际执行):
- python batch_merge_results.py --dry-run
- """
- )
-
- # 配置文件
- parser.add_argument(
- '-c', '--config',
- default='processor_configs.yaml',
- help='配置文件路径 (默认: processor_configs.yaml)'
- )
-
- # PDF 和处理器
- parser.add_argument(
- '-d', '--base-dir',
- help='PDF 基础目录(覆盖配置文件)'
- )
-
- parser.add_argument(
- '-f', '--file-list',
- help='PDF 列表文件(每行一个 PDF 名称,不含扩展名)'
- )
-
- parser.add_argument(
- '-l', '--pdf-list',
- nargs='+',
- help='PDF 名称列表(不含扩展名)'
- )
-
- parser.add_argument(
- '-p', '--processors',
- nargs='+',
- help='处理器列表(不指定则自动检测所有 VL 处理器)'
- )
-
- # 合并参数
- parser.add_argument(
- '-w', '--window',
- type=int,
- default=15,
- help='查找窗口大小 (默认: 15)'
- )
-
- parser.add_argument(
- '-t', '--threshold',
- type=int,
- default=85,
- help='相似度阈值 (默认: 85)'
- )
-
- parser.add_argument(
- '--output-type',
- choices=['json', 'markdown', 'both'],
- default='both',
- help='输出格式 (默认: both)'
- )
-
- # 工具选项
- parser.add_argument(
- '--dry-run',
- action='store_true',
- help='模拟运行,不实际执行'
- )
-
- parser.add_argument(
- '-v', '--verbose',
- action='store_true',
- help='详细输出'
- )
-
- return parser
- def main():
- """主函数"""
- parser = create_parser()
- args = parser.parse_args()
-
- # 设置日志级别
- if args.verbose:
- logging.getLogger().setLevel(logging.DEBUG)
-
- # 读取 PDF 列表
- pdf_list = None
- if args.file_list:
- pdf_list = []
- with open(args.file_list, 'r', encoding='utf-8') as f:
- for line in f:
- line = line.strip()
- if line and not line.startswith('#'):
- # 移除 .pdf 扩展名
- pdf_name = line.replace('.pdf', '')
- pdf_list.append(pdf_name)
- elif args.pdf_list:
- pdf_list = [p.replace('.pdf', '') for p in args.pdf_list]
-
- # 创建批量合并器
- merger = BatchMerger(
- config_file=args.config,
- base_dir=args.base_dir
- )
-
- # 执行批量合并
- stats = merger.batch_merge(
- pdf_list=pdf_list,
- processors=args.processors,
- window=args.window,
- threshold=args.threshold,
- output_type=args.output_type,
- dry_run=args.dry_run
- )
-
- return 0 if stats['failed'] == 0 else 1
- if __name__ == '__main__':
- print("🚀 启动批量OCR bbox 合并程序...")
-
- import sys
-
- if len(sys.argv) == 1:
- # 如果没有命令行参数,使用默认配置运行
- print("ℹ️ 未提供命令行参数,使用默认配置运行...")
-
- # 默认配置
- default_config = {
- "file-list": "pdf_list.txt",
- }
-
- print("⚙️ 默认参数:")
- for key, value in default_config.items():
- print(f" --{key}: {value}")
- # 构造参数
- sys.argv = [sys.argv[0]]
- for key, value in default_config.items():
- sys.argv.extend([f"--{key}", str(value)])
- sys.argv.append("--dry-run")
- sys.argv.append("--verbose") # 添加详细输出参数
- sys.exit(main())
|