| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657 |
- #!/usr/bin/env python3
- """
- PDF 批量处理脚本
- 支持多种处理器,配置文件驱动
- """
- import os
- import sys
- import argparse
- import subprocess
- import json
- import yaml
- from pathlib import Path
- from datetime import datetime
- from typing import List, Dict, Optional, Any
- from dataclasses import dataclass, field
- import logging
- from tqdm import tqdm
- import time
- # ============================================================================
- # 数据类定义
- # ============================================================================
@dataclass
class ProcessorConfig:
    """Describe how one external PDF processing script is invoked.

    Attributes:
        name: Identifier of the processor.
        script: Path of the Python script that is executed for each PDF.
        input_arg: Command-line flag placed before the input PDF path.
        output_arg: Command-line flag placed before the output directory.
        extra_args: Additional command-line arguments appended to the command.
        output_subdir: Name of the output sub-directory for this processor.
        description: Human-readable description of the processor.
    """

    name: str
    script: str
    input_arg: str = "--input_file"
    output_arg: str = "--output_dir"
    extra_args: List[str] = field(default_factory=list)
    # Every processor gets its own output sub-directory.
    output_subdir: str = "results"
    description: str = ""
@dataclass
class ProcessResult:
    """Outcome of processing a single PDF file.

    Attributes:
        pdf_file: Path of the processed PDF, stored as a string.
        success: Whether processing succeeded.
        duration: Elapsed processing time in seconds.
        error_message: Failure details; empty when there is nothing to report.
    """

    pdf_file: str
    success: bool
    duration: float
    error_message: str = ""
- # ============================================================================
- # 配置管理
- # ============================================================================
class ConfigManager:
    """Provide processor and global settings from a config file or defaults.

    The configuration is a mapping with two optional top-level sections:
    ``processors`` (processor name -> settings) and ``global`` (free-form
    key/value settings). When no readable config file is supplied, a private
    copy of ``DEFAULT_CONFIG`` is used.
    """

    DEFAULT_CONFIG = {
        'processors': {
            'paddleocr_vl_single_process': {
                'script': 'paddleocr_vl_single_process.py',
                'input_arg': '--input_file',
                'output_arg': '--output_dir',
                'extra_args': [
                    '--pipeline=./my_config/PaddleOCR-VL-Client.yaml',
                    '--no-adapter'
                ],
                'output_subdir': 'paddleocr_vl_results',
                'description': 'PaddleOCR-VL 处理器'
            },
            'ppstructurev3_single_process': {
                'script': 'ppstructurev3_single_process.py',
                'input_arg': '--input_file',
                'output_arg': '--output_dir',
                'extra_args': [
                    '--pipeline=./my_config/PP-StructureV3.yaml'
                ],
                'output_subdir': 'ppstructurev3_results',
                'description': 'PP-StructureV3 处理器'
            },
            'ppstructurev3_single_client': {
                'script': 'ppstructurev3_single_client.py',
                'input_arg': '--input_file',
                'output_arg': '--output_dir',
                'extra_args': [
                    '--api_url=http://10.192.72.11:8111/layout-parsing',
                    '--timeout=300'
                ],
                'output_subdir': 'ppstructurev3_client_results',
                'description': 'PP-StructureV3 HTTP API 客户端'
            }
        },
        'global': {
            'base_dir': '/Users/zhch158/workspace/data/流水分析',
            'output_subdir': 'results'
        }
    }

    def __init__(self, config_file: Optional[str] = None):
        """Load the configuration.

        Args:
            config_file: Path of a YAML (``.yaml``/``.yml``) or JSON config
                file. ``None`` or a non-existent path selects the defaults.
        """
        self.config_file = config_file
        self.config = self._load_config()

    def _load_config(self) -> Dict:
        """Return the configuration mapping.

        Returns:
            The parsed file content, or a deep copy of ``DEFAULT_CONFIG`` when
            no file is given, the file does not exist, or the file is empty.

        Raises:
            ValueError: If the file parses to something other than a mapping.
        """
        # Function-scope import keeps this block self-contained.
        import copy

        if self.config_file and Path(self.config_file).exists():
            is_yaml = str(self.config_file).lower().endswith(('.yaml', '.yml'))
            with open(self.config_file, 'r', encoding='utf-8') as f:
                loaded = yaml.safe_load(f) if is_yaml else json.load(f)

            if loaded is not None:
                if not isinstance(loaded, dict):
                    raise ValueError(f"配置文件格式无效: {self.config_file}")
                return loaded
            # An empty document carries no settings: fall through to defaults.

        # Deep copy so that callers mutating the nested dicts/lists never
        # alter the class-level defaults shared by every instance.
        return copy.deepcopy(self.DEFAULT_CONFIG)

    def _processors(self) -> Dict:
        """Return the ``processors`` section, or an empty dict if absent."""
        return self.config.get('processors') or {}

    def get_processor_config(self, processor_name: str) -> "ProcessorConfig":
        """Build the ``ProcessorConfig`` for a named processor.

        Args:
            processor_name: Key inside the ``processors`` section.

        Returns:
            A ``ProcessorConfig`` with missing optional settings filled in.

        Raises:
            ValueError: If the processor is not defined.
            KeyError: If the processor entry has no ``script`` setting.
        """
        processors = self._processors()
        if processor_name not in processors:
            raise ValueError(f"处理器 '{processor_name}' 不存在")

        proc_config = processors[processor_name]
        return ProcessorConfig(
            name=processor_name,
            script=proc_config['script'],
            input_arg=proc_config.get('input_arg', '--input_file'),
            output_arg=proc_config.get('output_arg', '--output_dir'),
            # Copy the list so edits to the result do not leak into the config.
            extra_args=list(proc_config.get('extra_args') or []),
            output_subdir=proc_config.get('output_subdir', processor_name + '_results'),
            description=proc_config.get('description', '')
        )

    def get_global_config(self, key: str, default=None):
        """Return a value from the ``global`` section, or ``default``."""
        return (self.config.get('global') or {}).get(key, default)

    def list_processors(self) -> List[str]:
        """Return the names of all configured processors."""
        return list(self._processors())
- # ============================================================================
- # PDF 文件查找器
- # ============================================================================
class PDFFileFinder:
    """Locate PDF files relative to a base directory."""

    def __init__(self, base_dir: str):
        """Remember the directory that relative entries are resolved against.

        Args:
            base_dir: Root directory for searches and relative paths.
        """
        self.base_dir = Path(base_dir)

    def from_file_list(self, list_file: str) -> List[Path]:
        """Resolve the PDF paths listed in a text file.

        The file holds one path per line. Blank lines and lines starting with
        ``#`` are ignored.

        Args:
            list_file: Path of the list file (UTF-8, optional BOM).

        Returns:
            One resolved path per entry, in file order.
        """
        # 'utf-8-sig' reads plain UTF-8 too, but drops a leading BOM that would
        # otherwise become part of the first file name.
        with open(list_file, 'r', encoding='utf-8-sig') as f:
            entries = [line.strip() for line in f]

        return self.from_list(
            [entry for entry in entries if not entry.startswith('#')]
        )

    def from_list(self, pdf_list: List[str]) -> List[Path]:
        """Resolve PDF paths given as strings.

        Args:
            pdf_list: Absolute or relative PDF paths; blank entries are skipped.

        Returns:
            One resolved path per non-blank entry, in input order.
        """
        pdf_files = []

        for pdf in pdf_list:
            entry = pdf.strip()
            if not entry:
                # A blank entry would otherwise resolve to base_dir itself.
                continue
            pdf_path = self._resolve_path(entry)
            if pdf_path:
                pdf_files.append(pdf_path)

        return pdf_files

    def find_all(self) -> List[Path]:
        """Return every ``*.pdf`` path below the base directory, sorted."""
        return sorted(self.base_dir.rglob('*.pdf'))

    def _resolve_path(self, path_str: str) -> Optional[Path]:
        """Turn one list entry into a path.

        Absolute paths are returned unchanged. Relative paths are tried, in
        order, as: ``base_dir/<path>``; ``base_dir/<stem>/<name>`` (only for
        entries without ``/``); the first match, in sorted order, of a
        recursive search for the file name below ``base_dir``.

        Args:
            path_str: Absolute or relative path of a PDF.

        Returns:
            The first existing candidate, or ``base_dir/<path>`` when nothing
            matches. The returned path may not exist; callers must check.
        """
        # Function-scope import keeps this block self-contained.
        import glob

        path = Path(path_str)

        # Absolute paths are taken as given, whether or not they exist.
        if path.is_absolute():
            return path

        # 1. The entry as a path relative to the base directory.
        direct = self.base_dir / path
        if direct.exists():
            return direct

        # 2. A bare file name inside a sub-directory named after its stem.
        if '/' not in path_str:
            nested = self.base_dir / path.stem / path.name
            if nested.exists():
                return nested

        # 3. Recursive search by file name. The name is escaped so characters
        #    such as '[' or '*' match literally, and the hits are sorted so the
        #    choice among duplicates does not depend on filesystem order.
        if path.name:
            matches = sorted(self.base_dir.rglob(glob.escape(path.name)))
            if matches:
                return matches[0]

        # Nothing found: hand back the direct candidate for error reporting.
        return direct
- # ============================================================================
- # PDF 批处理器
- # ============================================================================
class PDFBatchProcessor:
    """Run one external processor script over a list of PDF files.

    Each PDF is handled by its own subprocess. Per-file results are kept in
    ``self.results`` and a plain-text summary log is written to the current
    working directory at the end of every batch.
    """

    def __init__(
        self,
        processor_config: "ProcessorConfig",
        output_subdir: Optional[str] = None,
        dry_run: bool = False
    ):
        """Prepare a batch run.

        Args:
            processor_config: Script and command-line settings to use.
            output_subdir: Output sub-directory name; when empty or ``None``
                the one from ``processor_config`` is used.
            dry_run: If true, commands are only logged, never executed.
        """
        self.processor_config = processor_config
        # An explicitly supplied sub-directory wins over the configured one.
        self.output_subdir = output_subdir or processor_config.output_subdir
        self.dry_run = dry_run

        self.logger = self._setup_logger()

        # Per-file results of the most recent batch.
        self.results: List["ProcessResult"] = []
        # Wall-clock time at which the most recent batch started.
        self._started_at: Optional[datetime] = None

    def _setup_logger(self) -> logging.Logger:
        """Return the shared ``PDFBatchProcessor`` logger with console output.

        The logger runs at INFO unless the root logger is configured for
        DEBUG, in which case DEBUG messages (such as the full command line)
        are emitted as well.
        """
        logger = logging.getLogger('PDFBatchProcessor')

        # Follow a DEBUG root logger instead of pinning the level to INFO,
        # which would make the debug output below unreachable.
        root_is_debug = logging.getLogger().getEffectiveLevel() <= logging.DEBUG
        logger.setLevel(logging.DEBUG if root_is_debug else logging.INFO)

        # The logger is shared by name, so attach the handler only once.
        if not logger.handlers:
            # No handler-level filter: the logger level decides what is shown.
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(
                logging.Formatter(
                    '%(asctime)s - %(levelname)s - %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S'
                )
            )
            logger.addHandler(console_handler)

        return logger

    def process_files(self, pdf_files: List[Path]) -> Dict[str, Any]:
        """Process every file in turn and write the summary log.

        Args:
            pdf_files: PDF paths to process.

        Returns:
            The statistics dict produced by ``_generate_stats``.
        """
        # Start from a clean slate so the counters describe exactly the files
        # whose duration is measured below.
        self.results = []
        self._started_at = datetime.now()

        self.logger.info(f"开始处理 {len(pdf_files)} 个文件")
        self.logger.info(f"处理器: {self.processor_config.description}")
        self.logger.info(f"脚本: {self.processor_config.script}")
        self.logger.info(f"输出目录: {self.output_subdir}")

        start_time = time.time()
        success_count = 0

        with tqdm(total=len(pdf_files), desc="处理进度", unit="file") as pbar:
            for pdf_file in pdf_files:
                result = self._process_single_file(pdf_file)
                self.results.append(result)
                if result.success:
                    success_count += 1

                pbar.update(1)
                # Show running success/failure counts next to the bar.
                pbar.set_postfix({
                    'success': success_count,
                    'failed': len(self.results) - success_count
                })

        total_duration = time.time() - start_time

        stats = self._generate_stats(total_duration)
        self._save_log(stats)

        return stats

    def _process_single_file(self, pdf_file: Path) -> "ProcessResult":
        """Run the processor script on one PDF.

        Args:
            pdf_file: Path of the PDF to process.

        Returns:
            A ``ProcessResult``. Missing files, non-zero exit codes and
            failures to launch the subprocess are reported as unsuccessful
            results; a dry run is reported as a success with zero duration.
        """
        self.logger.info(f"处理: {pdf_file}")

        if not pdf_file.exists():
            self.logger.warning(f"跳过: 文件不存在 - {pdf_file}")
            return ProcessResult(
                pdf_file=str(pdf_file),
                success=False,
                duration=0,
                error_message="文件不存在"
            )

        # Output goes to <pdf directory>/<pdf stem>/<output_subdir>; the
        # directory is not created here.
        output_dir = pdf_file.parent / pdf_file.stem / self.output_subdir

        cmd = self._build_command(pdf_file, output_dir)
        self.logger.debug(f"执行命令: {' '.join(cmd)}")

        if self.dry_run:
            self.logger.info(f"[DRY RUN] 将执行: {' '.join(cmd)}")
            return ProcessResult(
                pdf_file=str(pdf_file),
                success=True,
                duration=0,
                error_message=""
            )

        start_time = time.time()
        try:
            subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                check=True
            )
        except (subprocess.CalledProcessError, OSError) as e:
            # CalledProcessError: the script exited non-zero.
            # OSError: the process could not be started at all. Both become a
            # failed result so one bad file cannot abort the whole batch.
            duration = time.time() - start_time
            error_msg = getattr(e, 'stderr', None) or str(e)

            self.logger.error(f"✗ 失败 (耗时: {duration:.2f}秒)")
            self.logger.error(f"错误信息: {error_msg}")

            return ProcessResult(
                pdf_file=str(pdf_file),
                success=False,
                duration=duration,
                error_message=error_msg
            )

        duration = time.time() - start_time
        self.logger.info(f"✓ 成功 (耗时: {duration:.2f}秒)")

        return ProcessResult(
            pdf_file=str(pdf_file),
            success=True,
            duration=duration,
            error_message=""
        )

    def _build_command(self, pdf_file: Path, output_dir: Path) -> List[str]:
        """Return the argument vector for processing one PDF.

        Args:
            pdf_file: Input PDF path.
            output_dir: Directory passed to the script as its output location.

        Returns:
            ``[python, script, input_arg, pdf, output_arg, dir, *extra_args]``.
        """
        cmd = [
            sys.executable,  # run the script with the current interpreter
            self.processor_config.script,
            self.processor_config.input_arg, str(pdf_file),
            self.processor_config.output_arg, str(output_dir)
        ]
        cmd.extend(self.processor_config.extra_args)

        return cmd

    def _generate_stats(self, total_duration: float) -> Dict[str, Any]:
        """Summarise ``self.results``.

        Args:
            total_duration: Wall-clock duration of the batch in seconds.

        Returns:
            A dict with the keys ``total``, ``success``, ``failed``,
            ``total_duration``, ``failed_files`` and ``results`` (one dict per
            file with ``file``, ``success``, ``duration`` and ``error``).
        """
        failed_files = [r.pdf_file for r in self.results if not r.success]

        return {
            'total': len(self.results),
            'success': len(self.results) - len(failed_files),
            'failed': len(failed_files),
            'total_duration': total_duration,
            'failed_files': failed_files,
            'results': [
                {
                    'file': r.pdf_file,
                    'success': r.success,
                    'duration': r.duration,
                    'error': r.error_message
                }
                for r in self.results
            ]
        }

    def _save_log(self, stats: Dict[str, Any]):
        """Write the batch summary to a timestamped log file.

        The file ``batch_process_<processor name>_<timestamp>.log`` is created
        in the current working directory.

        Args:
            stats: Statistics dict as returned by ``_generate_stats``.
        """
        finished_at = datetime.now()
        # Use the recorded batch start; if this method is called without a
        # preceding process_files(), derive the start from the duration.
        started_at = self._started_at or datetime.fromtimestamp(
            finished_at.timestamp() - stats['total_duration']
        )

        timestamp = finished_at.strftime('%Y%m%d_%H%M%S')
        log_file = f"batch_process_{self.processor_config.name}_{timestamp}.log"

        with open(log_file, 'w', encoding='utf-8') as f:
            f.write("PDF 批量处理日志\n")
            f.write("=" * 80 + "\n\n")

            f.write(f"处理器: {self.processor_config.description}\n")
            f.write(f"处理器名称: {self.processor_config.name}\n")
            f.write(f"脚本: {self.processor_config.script}\n")
            f.write(f"输出目录: {self.output_subdir}\n")
            f.write(f"开始时间: {started_at}\n")
            f.write(f"结束时间: {finished_at}\n")
            f.write(f"总耗时: {stats['total_duration']:.2f} 秒\n\n")

            f.write("统计信息:\n")
            f.write(f"  总文件数: {stats['total']}\n")
            f.write(f"  成功: {stats['success']}\n")
            f.write(f"  失败: {stats['failed']}\n\n")

            if stats['failed_files']:
                f.write("失败的文件:\n")
                for file in stats['failed_files']:
                    f.write(f"  - {file}\n")
                f.write("\n")

            f.write("详细结果:\n")
            for result in stats['results']:
                status = "✓" if result['success'] else "✗"
                f.write(f"{status} {result['file']} ({result['duration']:.2f}s)\n")
                if result['error']:
                    f.write(f"  错误: {result['error']}\n")

        self.logger.info(f"日志已保存: {log_file}")
- # ============================================================================
- # 命令行接口
- # ============================================================================
def create_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the batch processing tool.

    Returns:
        An ``argparse.ArgumentParser`` with all options registered and the
        usage examples attached as epilog.
    """
    # Usage examples shown below the option list in --help.
    usage_examples = '\n'.join([
        '',
        '示例用法:',
        '1. 使用配置文件中的处理器:',
        'python batch_process_pdf.py -p paddleocr_vl_single_process -f pdf_list.txt',
        '2. 处理指定目录下所有 PDF:',
        'python batch_process_pdf.py -p ppstructurev3_single_client -d /path/to/pdfs',
        '3. 手动指定脚本和参数:',
        'python batch_process_pdf.py \\',
        '-s ppstructurev3_single_client.py \\',
        '-d /path/to/pdfs \\',
        '-f pdf_list.txt \\',
        '-e "--api_url=http://localhost:8111 --timeout=600"',
        '4. 列出所有可用的处理器:',
        'python batch_process_pdf.py --list-processors',
        '5. 查看配置文件内容:',
        'python batch_process_pdf.py --show-config',
        '6. 覆盖默认输出目录:',
        'python batch_process_pdf.py -p ppstructurev3_single_process -f pdf_list.txt -o custom_output',
        '',
    ])

    parser = argparse.ArgumentParser(
        description='PDF 批量处理工具',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )

    # (flags, keyword arguments) per option, in the order shown in --help.
    option_specs = [
        # Processor selection
        (('-p', '--processor'), {
            'help': '处理器名称 (如: paddleocr_vl_single_process, ppstructurev3_single_process, ppstructurev3_single_client)',
        }),
        # Configuration file
        (('-c', '--config'), {
            'default': 'processor_configs.yaml',
            'help': '配置文件路径 (默认: processor_configs.yaml)',
        }),
        # Manually specified script
        (('-s', '--script'), {'help': 'Python 脚本路径 (覆盖配置文件)'}),
        # Directories and input files
        (('-d', '--base-dir'), {'help': 'PDF 文件基础目录'}),
        (('-o', '--output-subdir'), {'help': '输出子目录名称 (覆盖处理器默认配置)'}),
        (('-f', '--file-list'), {'help': 'PDF 文件列表文件路径'}),
        (('-l', '--pdf-list'), {'nargs': '+', 'help': 'PDF 文件列表 (空格分隔)'}),
        # Extra arguments forwarded to the script
        (('-e', '--extra-args'), {'help': '额外参数 (覆盖配置文件)'}),
        # Utility switches
        (('--list-processors',), {'action': 'store_true', 'help': '列出所有可用的处理器'}),
        (('--show-config',), {'action': 'store_true', 'help': '显示配置文件内容'}),
        (('--dry-run',), {'action': 'store_true', 'help': '模拟运行,不实际执行'}),
        (('-v', '--verbose'), {'action': 'store_true', 'help': '详细输出'}),
    ]
    for flags, options in option_specs:
        parser.add_argument(*flags, **options)

    return parser
def main():
    """Entry point of the command-line tool.

    Parses the arguments, resolves the processor configuration and the list
    of PDF files, asks for confirmation (unless ``--dry-run``), runs the batch
    and prints a summary.

    Returns:
        ``0`` when an informational option was handled, the user cancelled,
        or every file was processed successfully; ``1`` when no PDF was
        found, none of the listed files exists, or at least one file failed.
    """
    # Function-scope import keeps this block self-contained.
    import shlex

    parser = create_parser()
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # A missing config file silently selects the built-in defaults.
    config_manager = ConfigManager(args.config if Path(args.config).exists() else None)

    if args.list_processors:
        print("可用的处理器:")
        for name in config_manager.list_processors():
            proc_config = config_manager.get_processor_config(name)
            print(f"  • {name}")
            print(f"    描述: {proc_config.description}")
            print(f"    脚本: {proc_config.script}")
            print(f"    输出目录: {proc_config.output_subdir}")
            print()
        return 0

    if args.show_config:
        print(yaml.dump(config_manager.config, allow_unicode=True))
        return 0

    if not args.processor and not args.script:
        parser.error("必须指定 -p 或 -s 参数")

    # shlex keeps quoted values (e.g. paths with spaces) together, unlike a
    # plain whitespace split.
    try:
        extra_args = shlex.split(args.extra_args) if args.extra_args else []
    except ValueError as exc:
        parser.error(f"无法解析额外参数: {exc}")

    if args.processor:
        try:
            processor_config = config_manager.get_processor_config(args.processor)
        except ValueError as exc:
            # Unknown processor: report it as a usage error, not a traceback.
            parser.error(str(exc))
        # Extra arguments from the command line replace the configured ones.
        if args.extra_args:
            processor_config.extra_args = extra_args
    else:
        processor_config = ProcessorConfig(
            name='manual',
            script=args.script,
            extra_args=extra_args,
            output_subdir=args.output_subdir or 'manual_results'
        )

    base_dir = args.base_dir or config_manager.get_global_config('base_dir')
    if not base_dir:
        parser.error("必须指定 -d 参数或在配置文件中设置 base_dir")

    finder = PDFFileFinder(base_dir)

    if args.file_list:
        pdf_files = finder.from_file_list(args.file_list)
    elif args.pdf_list:
        pdf_files = finder.from_list(args.pdf_list)
    else:
        pdf_files = finder.find_all()

    if not pdf_files:
        print("❌ 未找到任何 PDF 文件")
        return 1

    # Split into existing and missing files with one exists() call per path.
    valid_files = []
    invalid_files = []
    for pdf in pdf_files:
        (valid_files if pdf.exists() else invalid_files).append(pdf)

    if valid_files:
        print("\n".join(f.as_posix() for f in valid_files))

    if invalid_files:
        print(f"\n⚠️  警告: {len(invalid_files)} 个文件不存在:")
        for f in invalid_files[:5]:  # show at most five entries
            print(f"  - {f}")
        if len(invalid_files) > 5:
            print(f"  ... 还有 {len(invalid_files) - 5} 个")

    if not valid_files:
        # Nothing to do; do not report an empty batch as a success.
        print("❌ 没有可处理的 PDF 文件")
        return 1

    if not args.dry_run:
        confirm = input(f"\n是否继续处理 {len(valid_files)} 个文件? [Y/n]: ")
        if confirm.lower() not in ['', 'y', 'yes']:
            print("已取消")
            return 0

    processor = PDFBatchProcessor(
        processor_config=processor_config,
        output_subdir=args.output_subdir,  # command-line override, may be None
        dry_run=args.dry_run
    )

    if args.verbose:
        # The processor has its own named logger; lower its threshold (and
        # that of its handlers) so --verbose actually shows debug output.
        processor.logger.setLevel(logging.DEBUG)
        for handler in processor.logger.handlers:
            handler.setLevel(logging.DEBUG)

    stats = processor.process_files(valid_files)

    print("\n" + "=" * 80)
    print("处理完成")
    print("=" * 80)
    print("\n📊 统计信息:")
    print(f"  处理器: {processor_config.description}")
    print(f"  输出目录: {processor.output_subdir}")
    print(f"  总文件数: {stats['total']}")
    print(f"  ✓ 成功: {stats['success']}")
    print(f"  ✗ 失败: {stats['failed']}")
    print(f"  ⏱️  总耗时: {stats['total_duration']:.2f} 秒")

    if stats['failed_files']:
        print("\n失败的文件:")
        for file in stats['failed_files']:
            print(f"  ✗ {file}")

    return 0 if stats['failed'] == 0 else 1
if __name__ == '__main__':
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|