# batch_process_pdf.py
  1. #!/usr/bin/env python3
  2. """
  3. PDF 批量处理脚本
  4. 支持多种处理器,配置文件驱动
  5. """
  6. import os
  7. import sys
  8. import argparse
  9. import subprocess
  10. import json
  11. import yaml
  12. from pathlib import Path
  13. from datetime import datetime
  14. from typing import List, Dict, Optional, Any
  15. from dataclasses import dataclass, field
  16. import logging
  17. from tqdm import tqdm
  18. import time
  19. # ============================================================================
  20. # 数据类定义
  21. # ============================================================================
  22. @dataclass
  23. class ProcessorConfig:
  24. """处理器配置"""
  25. name: str
  26. script: str
  27. input_arg: str = "--input_file"
  28. output_arg: str = "--output_dir"
  29. extra_args: List[str] = field(default_factory=list)
  30. output_subdir: str = "results" # 新增:每个处理器独立的输出目录
  31. description: str = ""
  32. @dataclass
  33. class ProcessResult:
  34. """处理结果"""
  35. pdf_file: str
  36. success: bool
  37. duration: float
  38. error_message: str = ""
  39. # ============================================================================
  40. # 配置管理
  41. # ============================================================================
  42. class ConfigManager:
  43. """配置管理器"""
  44. DEFAULT_CONFIG = {
  45. 'processors': {
  46. 'paddleocr_vl_single_process': {
  47. 'script': 'paddleocr_vl_single_process.py',
  48. 'input_arg': '--input_file',
  49. 'output_arg': '--output_dir',
  50. 'extra_args': [
  51. '--pipeline=./my_config/PaddleOCR-VL-Client.yaml',
  52. '--no-adapter'
  53. ],
  54. 'output_subdir': 'paddleocr_vl_results',
  55. 'description': 'PaddleOCR-VL 处理器'
  56. },
  57. 'ppstructurev3_single_process': {
  58. 'script': 'ppstructurev3_single_process.py',
  59. 'input_arg': '--input_file',
  60. 'output_arg': '--output_dir',
  61. 'extra_args': [
  62. '--pipeline=./my_config/PP-StructureV3.yaml'
  63. ],
  64. 'output_subdir': 'ppstructurev3_results',
  65. 'description': 'PP-StructureV3 处理器'
  66. },
  67. 'ppstructurev3_single_client': {
  68. 'script': 'ppstructurev3_single_client.py',
  69. 'input_arg': '--input_file',
  70. 'output_arg': '--output_dir',
  71. 'extra_args': [
  72. '--api_url=http://10.192.72.11:8111/layout-parsing',
  73. '--timeout=300'
  74. ],
  75. 'output_subdir': 'ppstructurev3_client_results',
  76. 'description': 'PP-StructureV3 HTTP API 客户端'
  77. }
  78. },
  79. 'global': {
  80. 'base_dir': '/Users/zhch158/workspace/data/流水分析',
  81. 'output_subdir': 'results'
  82. }
  83. }
  84. def __init__(self, config_file: Optional[str] = None):
  85. self.config_file = config_file
  86. self.config = self._load_config()
  87. def _load_config(self) -> Dict:
  88. """加载配置文件"""
  89. if self.config_file and Path(self.config_file).exists():
  90. with open(self.config_file, 'r', encoding='utf-8') as f:
  91. if self.config_file.endswith('.yaml') or self.config_file.endswith('.yml'):
  92. return yaml.safe_load(f)
  93. else:
  94. return json.load(f)
  95. return self.DEFAULT_CONFIG.copy()
  96. def get_processor_config(self, processor_name: str) -> ProcessorConfig:
  97. """获取处理器配置"""
  98. if processor_name not in self.config['processors']:
  99. raise ValueError(f"处理器 '{processor_name}' 不存在")
  100. proc_config = self.config['processors'][processor_name]
  101. return ProcessorConfig(
  102. name=processor_name,
  103. script=proc_config['script'],
  104. input_arg=proc_config.get('input_arg', '--input_file'),
  105. output_arg=proc_config.get('output_arg', '--output_dir'),
  106. extra_args=proc_config.get('extra_args', []),
  107. output_subdir=proc_config.get('output_subdir', processor_name + '_results'),
  108. description=proc_config.get('description', '')
  109. )
  110. def get_global_config(self, key: str, default=None):
  111. """获取全局配置"""
  112. return self.config.get('global', {}).get(key, default)
  113. def list_processors(self) -> List[str]:
  114. """列出所有可用的处理器"""
  115. return list(self.config['processors'].keys())
  116. # ============================================================================
  117. # PDF 文件查找器
  118. # ============================================================================
  119. class PDFFileFinder:
  120. """PDF 文件查找器"""
  121. def __init__(self, base_dir: str):
  122. self.base_dir = Path(base_dir)
  123. def from_file_list(self, list_file: str) -> List[Path]:
  124. """从文件列表读取"""
  125. pdf_files = []
  126. with open(list_file, 'r', encoding='utf-8') as f:
  127. for line in f:
  128. # 跳过空行和注释
  129. line = line.strip()
  130. if not line or line.startswith('#'):
  131. continue
  132. # 构建完整路径
  133. pdf_path = self._resolve_path(line)
  134. if pdf_path:
  135. pdf_files.append(pdf_path)
  136. return pdf_files
  137. def from_list(self, pdf_list: List[str]) -> List[Path]:
  138. """从列表读取"""
  139. pdf_files = []
  140. for pdf in pdf_list:
  141. pdf_path = self._resolve_path(pdf.strip())
  142. if pdf_path:
  143. pdf_files.append(pdf_path)
  144. return pdf_files
  145. def find_all(self) -> List[Path]:
  146. """查找基础目录下所有 PDF"""
  147. return sorted(self.base_dir.rglob('*.pdf'))
  148. def _resolve_path(self, path_str: str) -> Optional[Path]:
  149. """解析路径"""
  150. path = Path(path_str)
  151. # 绝对路径
  152. if path.is_absolute():
  153. return path if path.exists() else path # 返回路径,即使不存在
  154. # 相对路径
  155. # 1. 尝试完整相对路径
  156. candidate1 = self.base_dir / path
  157. if candidate1.exists():
  158. return candidate1
  159. # 2. 尝试在同名子目录下查找
  160. if '/' not in path_str:
  161. pdf_name = path.stem
  162. candidate2 = self.base_dir / pdf_name / path.name
  163. if candidate2.exists():
  164. return candidate2
  165. # 3. 使用 glob 搜索
  166. matches = list(self.base_dir.rglob(path.name))
  167. if matches:
  168. return matches[0]
  169. # 返回候选路径(即使不存在)
  170. return candidate1
  171. # ============================================================================
  172. # PDF 批处理器
  173. # ============================================================================
  174. class PDFBatchProcessor:
  175. """PDF 批处理器"""
  176. def __init__(
  177. self,
  178. processor_config: ProcessorConfig,
  179. output_subdir: Optional[str] = None,
  180. dry_run: bool = False
  181. ):
  182. self.processor_config = processor_config
  183. # 如果指定了output_subdir,使用指定的;否则使用处理器配置中的
  184. self.output_subdir = output_subdir or processor_config.output_subdir
  185. self.dry_run = dry_run
  186. # 设置日志
  187. self.logger = self._setup_logger()
  188. # 统计信息
  189. self.results: List[ProcessResult] = []
  190. def _setup_logger(self) -> logging.Logger:
  191. """设置日志"""
  192. logger = logging.getLogger('PDFBatchProcessor')
  193. logger.setLevel(logging.INFO)
  194. # 避免重复添加handler
  195. if not logger.handlers:
  196. # 控制台输出
  197. console_handler = logging.StreamHandler()
  198. console_handler.setLevel(logging.INFO)
  199. console_format = logging.Formatter(
  200. '%(asctime)s - %(levelname)s - %(message)s',
  201. datefmt='%Y-%m-%d %H:%M:%S'
  202. )
  203. console_handler.setFormatter(console_format)
  204. logger.addHandler(console_handler)
  205. return logger
  206. def process_files(self, pdf_files: List[Path]) -> Dict[str, Any]:
  207. """批量处理文件"""
  208. self.logger.info(f"开始处理 {len(pdf_files)} 个文件")
  209. self.logger.info(f"处理器: {self.processor_config.description}")
  210. self.logger.info(f"脚本: {self.processor_config.script}")
  211. self.logger.info(f"输出目录: {self.output_subdir}")
  212. start_time = time.time()
  213. # 使用进度条
  214. with tqdm(total=len(pdf_files), desc="处理进度", unit="file") as pbar:
  215. for pdf_file in pdf_files:
  216. result = self._process_single_file(pdf_file)
  217. self.results.append(result)
  218. pbar.update(1)
  219. # 更新进度条描述
  220. success_count = sum(1 for r in self.results if r.success)
  221. pbar.set_postfix({
  222. 'success': success_count,
  223. 'failed': len(self.results) - success_count
  224. })
  225. total_duration = time.time() - start_time
  226. # 生成统计信息
  227. stats = self._generate_stats(total_duration)
  228. # 保存日志
  229. self._save_log(stats)
  230. return stats
  231. def _process_single_file(self, pdf_file: Path) -> ProcessResult:
  232. """处理单个文件"""
  233. self.logger.info(f"处理: {pdf_file}")
  234. # 检查文件是否存在
  235. if not pdf_file.exists():
  236. self.logger.warning(f"跳过: 文件不存在 - {pdf_file}")
  237. return ProcessResult(
  238. pdf_file=str(pdf_file),
  239. success=False,
  240. duration=0,
  241. error_message="文件不存在"
  242. )
  243. # 确定输出目录
  244. output_dir = pdf_file.parent / pdf_file.stem / self.output_subdir
  245. # if not self.dry_run:
  246. # output_dir.mkdir(parents=True, exist_ok=True)
  247. # 构建命令
  248. cmd = self._build_command(pdf_file, output_dir)
  249. self.logger.debug(f"执行命令: {' '.join(cmd)}")
  250. if self.dry_run:
  251. self.logger.info(f"[DRY RUN] 将执行: {' '.join(cmd)}")
  252. return ProcessResult(
  253. pdf_file=str(pdf_file),
  254. success=True,
  255. duration=0,
  256. error_message=""
  257. )
  258. # 执行命令
  259. start_time = time.time()
  260. try:
  261. result = subprocess.run(
  262. cmd,
  263. capture_output=True,
  264. text=True,
  265. check=True
  266. )
  267. duration = time.time() - start_time
  268. self.logger.info(f"✓ 成功 (耗时: {duration:.2f}秒)")
  269. return ProcessResult(
  270. pdf_file=str(pdf_file),
  271. success=True,
  272. duration=duration,
  273. error_message=""
  274. )
  275. except subprocess.CalledProcessError as e:
  276. duration = time.time() - start_time
  277. error_msg = e.stderr if e.stderr else str(e)
  278. self.logger.error(f"✗ 失败 (耗时: {duration:.2f}秒)")
  279. self.logger.error(f"错误信息: {error_msg}")
  280. return ProcessResult(
  281. pdf_file=str(pdf_file),
  282. success=False,
  283. duration=duration,
  284. error_message=error_msg
  285. )
  286. def _build_command(self, pdf_file: Path, output_dir: Path) -> List[str]:
  287. """构建执行命令"""
  288. cmd = [
  289. sys.executable, # 使用当前 Python 解释器
  290. self.processor_config.script,
  291. self.processor_config.input_arg, str(pdf_file),
  292. self.processor_config.output_arg, str(output_dir)
  293. ]
  294. # 添加额外参数
  295. cmd.extend(self.processor_config.extra_args)
  296. return cmd
  297. def _generate_stats(self, total_duration: float) -> Dict[str, Any]:
  298. """生成统计信息"""
  299. success_count = sum(1 for r in self.results if r.success)
  300. failed_count = len(self.results) - success_count
  301. failed_files = [r.pdf_file for r in self.results if not r.success]
  302. stats = {
  303. 'total': len(self.results),
  304. 'success': success_count,
  305. 'failed': failed_count,
  306. 'total_duration': total_duration,
  307. 'failed_files': failed_files,
  308. 'results': [
  309. {
  310. 'file': r.pdf_file,
  311. 'success': r.success,
  312. 'duration': r.duration,
  313. 'error': r.error_message
  314. }
  315. for r in self.results
  316. ]
  317. }
  318. return stats
  319. def _save_log(self, stats: Dict[str, Any]):
  320. """保存日志"""
  321. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
  322. log_file = f"batch_process_{self.processor_config.name}_{timestamp}.log"
  323. with open(log_file, 'w', encoding='utf-8') as f:
  324. f.write("PDF 批量处理日志\n")
  325. f.write("=" * 80 + "\n\n")
  326. f.write(f"处理器: {self.processor_config.description}\n")
  327. f.write(f"处理器名称: {self.processor_config.name}\n")
  328. f.write(f"脚本: {self.processor_config.script}\n")
  329. f.write(f"输出目录: {self.output_subdir}\n")
  330. f.write(f"开始时间: {datetime.now()}\n")
  331. f.write(f"总耗时: {stats['total_duration']:.2f} 秒\n\n")
  332. f.write("统计信息:\n")
  333. f.write(f" 总文件数: {stats['total']}\n")
  334. f.write(f" 成功: {stats['success']}\n")
  335. f.write(f" 失败: {stats['failed']}\n\n")
  336. if stats['failed_files']:
  337. f.write("失败的文件:\n")
  338. for file in stats['failed_files']:
  339. f.write(f" - {file}\n")
  340. f.write("\n")
  341. f.write("详细结果:\n")
  342. for result in stats['results']:
  343. status = "✓" if result['success'] else "✗"
  344. f.write(f"{status} {result['file']} ({result['duration']:.2f}s)\n")
  345. if result['error']:
  346. f.write(f" 错误: {result['error']}\n")
  347. self.logger.info(f"日志已保存: {log_file}")
  348. # ============================================================================
  349. # 命令行接口
  350. # ============================================================================
  351. def create_parser() -> argparse.ArgumentParser:
  352. """创建命令行参数解析器"""
  353. parser = argparse.ArgumentParser(
  354. description='PDF 批量处理工具',
  355. formatter_class=argparse.RawDescriptionHelpFormatter,
  356. epilog="""
  357. 示例用法:
  358. 1. 使用配置文件中的处理器:
  359. python batch_process_pdf.py -p paddleocr_vl_single_process -f pdf_list.txt
  360. 2. 处理指定目录下所有 PDF:
  361. python batch_process_pdf.py -p ppstructurev3_single_client -d /path/to/pdfs
  362. 3. 手动指定脚本和参数:
  363. python batch_process_pdf.py \\
  364. -s ppstructurev3_single_client.py \\
  365. -d /path/to/pdfs \\
  366. -f pdf_list.txt \\
  367. -e "--api_url=http://localhost:8111 --timeout=600"
  368. 4. 列出所有可用的处理器:
  369. python batch_process_pdf.py --list-processors
  370. 5. 查看配置文件内容:
  371. python batch_process_pdf.py --show-config
  372. 6. 覆盖默认输出目录:
  373. python batch_process_pdf.py -p ppstructurev3_single_process -f pdf_list.txt -o custom_output
  374. """
  375. )
  376. # 处理器选择
  377. parser.add_argument(
  378. '-p', '--processor',
  379. help='处理器名称 (如: paddleocr_vl_single_process, ppstructurev3_single_process, ppstructurev3_single_client)'
  380. )
  381. # 配置文件
  382. parser.add_argument(
  383. '-c', '--config',
  384. default='processor_configs.yaml',
  385. help='配置文件路径 (默认: processor_configs.yaml)'
  386. )
  387. # 手动指定脚本
  388. parser.add_argument(
  389. '-s', '--script',
  390. help='Python 脚本路径 (覆盖配置文件)'
  391. )
  392. # 目录和文件
  393. parser.add_argument(
  394. '-d', '--base-dir',
  395. help='PDF 文件基础目录'
  396. )
  397. parser.add_argument(
  398. '-o', '--output-subdir',
  399. help='输出子目录名称 (覆盖处理器默认配置)'
  400. )
  401. parser.add_argument(
  402. '-f', '--file-list',
  403. help='PDF 文件列表文件路径'
  404. )
  405. parser.add_argument(
  406. '-l', '--pdf-list',
  407. nargs='+',
  408. help='PDF 文件列表 (空格分隔)'
  409. )
  410. # 额外参数
  411. parser.add_argument(
  412. '-e', '--extra-args',
  413. help='额外参数 (覆盖配置文件)'
  414. )
  415. # 工具选项
  416. parser.add_argument(
  417. '--list-processors',
  418. action='store_true',
  419. help='列出所有可用的处理器'
  420. )
  421. parser.add_argument(
  422. '--show-config',
  423. action='store_true',
  424. help='显示配置文件内容'
  425. )
  426. parser.add_argument(
  427. '--dry-run',
  428. action='store_true',
  429. help='模拟运行,不实际执行'
  430. )
  431. parser.add_argument(
  432. '-v', '--verbose',
  433. action='store_true',
  434. help='详细输出'
  435. )
  436. return parser
  437. def main():
  438. """主函数"""
  439. parser = create_parser()
  440. args = parser.parse_args()
  441. # 设置日志级别
  442. if args.verbose:
  443. logging.getLogger().setLevel(logging.DEBUG)
  444. # 加载配置
  445. config_manager = ConfigManager(args.config if Path(args.config).exists() else None)
  446. # 列出处理器
  447. if args.list_processors:
  448. print("可用的处理器:")
  449. for name in config_manager.list_processors():
  450. proc_config = config_manager.get_processor_config(name)
  451. print(f" • {name}")
  452. print(f" 描述: {proc_config.description}")
  453. print(f" 脚本: {proc_config.script}")
  454. print(f" 输出目录: {proc_config.output_subdir}")
  455. print()
  456. return 0
  457. # 显示配置
  458. if args.show_config:
  459. print(yaml.dump(config_manager.config, allow_unicode=True))
  460. return 0
  461. # 获取处理器配置
  462. if args.processor:
  463. processor_config = config_manager.get_processor_config(args.processor)
  464. elif args.script:
  465. # 手动指定脚本
  466. processor_config = ProcessorConfig(
  467. name='manual',
  468. script=args.script,
  469. extra_args=args.extra_args.split() if args.extra_args else [],
  470. output_subdir=args.output_subdir or 'manual_results'
  471. )
  472. else:
  473. parser.error("必须指定 -p 或 -s 参数")
  474. # 覆盖额外参数
  475. if args.extra_args and args.processor:
  476. processor_config.extra_args = args.extra_args.split()
  477. # 获取基础目录
  478. base_dir = args.base_dir or config_manager.get_global_config('base_dir')
  479. if not base_dir:
  480. parser.error("必须指定 -d 参数或在配置文件中设置 base_dir")
  481. # 查找 PDF 文件
  482. finder = PDFFileFinder(base_dir)
  483. if args.file_list:
  484. pdf_files = finder.from_file_list(args.file_list)
  485. elif args.pdf_list:
  486. pdf_files = finder.from_list(args.pdf_list)
  487. else:
  488. pdf_files = finder.find_all()
  489. if not pdf_files:
  490. print("❌ 未找到任何 PDF 文件")
  491. return 1
  492. # print(f"\n找到 {len(pdf_files)} 个 PDF 文件")
  493. valid_file_paths = [f.as_posix() for f in pdf_files if f.exists()]
  494. if valid_file_paths:
  495. print("\n".join(valid_file_paths))
  496. # 验证文件
  497. valid_files = [f for f in pdf_files if f.exists()]
  498. invalid_files = [f for f in pdf_files if not f.exists()]
  499. if invalid_files:
  500. print(f"\n⚠️ 警告: {len(invalid_files)} 个文件不存在:")
  501. for f in invalid_files[:5]: # 只显示前5个
  502. print(f" - {f}")
  503. if len(invalid_files) > 5:
  504. print(f" ... 还有 {len(invalid_files) - 5} 个")
  505. # 确认执行
  506. if not args.dry_run and valid_files:
  507. confirm = input(f"\n是否继续处理 {len(valid_files)} 个文件? [Y/n]: ")
  508. if confirm.lower() not in ['', 'y', 'yes']:
  509. print("已取消")
  510. return 0
  511. # 批量处理
  512. processor = PDFBatchProcessor(
  513. processor_config=processor_config,
  514. output_subdir=args.output_subdir, # 传递命令行指定的输出目录
  515. dry_run=args.dry_run
  516. )
  517. stats = processor.process_files(valid_files)
  518. # 显示统计信息
  519. print("\n" + "=" * 80)
  520. print("处理完成")
  521. print("=" * 80)
  522. print(f"\n📊 统计信息:")
  523. print(f" 处理器: {processor_config.description}")
  524. print(f" 输出目录: {processor.output_subdir}")
  525. print(f" 总文件数: {stats['total']}")
  526. print(f" ✓ 成功: {stats['success']}")
  527. print(f" ✗ 失败: {stats['failed']}")
  528. print(f" ⏱️ 总耗时: {stats['total_duration']:.2f} 秒")
  529. if stats['failed_files']:
  530. print(f"\n失败的文件:")
  531. for file in stats['failed_files']:
  532. print(f" ✗ {file}")
  533. return 0 if stats['failed'] == 0 else 1
  534. if __name__ == '__main__':
  535. sys.exit(main())