#!/usr/bin/env python3
"""Batch-process image/PDF files through a remote PP-StructureV3 HTTP API.

Each input image is sent to a remote PP-StructureV3 layout-parsing service;
the returned JSON / Markdown / image outputs are saved locally.  Intended for
remote-service and distributed-processing scenarios.

Usage:
    python api_client.py --input document.pdf --output_dir ./output --api_url http://10.192.72.11:20026/layout-parsing
    python api_client.py --input ./images/ --output_dir ./output --api_url http://10.192.72.11:20026/layout-parsing
    python api_client.py --input file_list.txt --output_dir ./output --api_url http://10.192.72.11:20026/layout-parsing
"""

import os
import sys
import json
import time
import traceback
import base64
from pathlib import Path
from typing import List, Dict, Any
from tqdm import tqdm
import argparse
import requests
from loguru import logger

# Make ocr_utils importable (platform root is two levels above this file).
ocr_platform_root = Path(__file__).parents[2]
if str(ocr_platform_root) not in sys.path:
    sys.path.insert(0, str(ocr_platform_root))
from ocr_utils import (
    get_input_files,
    collect_pid_files,
    setup_logging
)

# Make the shared paddle_common helpers importable (one level above this file).
tools_root = Path(__file__).parents[1]
if str(tools_root) not in sys.path:
    sys.path.insert(0, str(tools_root))
from paddle_common.utils import (
    convert_pruned_result_to_json,
    save_output_images,
    save_markdown_content
)


def call_api_for_image(image_path: str, api_url: str, timeout: int = 300) -> Dict[str, Any]:
    """Call the layout-parsing API for a single image.

    Args:
        image_path: Path of the local image file to send.
        api_url: Endpoint URL of the PP-StructureV3 service.
        timeout: Request timeout in seconds.

    Returns:
        The ``result`` payload of the API response (a dict).

    Raises:
        Exception: On timeout, transport failure, a response missing the
            ``result`` field, or any local error while reading/encoding the
            image.  The original exception is chained via ``from`` so the
            root cause stays visible in tracebacks.
    """
    try:
        # Base64-encode the local image so it can travel in the JSON payload.
        with open(image_path, "rb") as file:
            image_bytes = file.read()
        image_data = base64.b64encode(image_bytes).decode("ascii")

        payload = {
            "file": image_data,
            "fileType": 1,
            # Pipeline feature switches.
            "useDocOrientationClassify": True,
            "useDocUnwarping": False,
            "useSealRecognition": True,
            "useTableRecognition": True,
            "useFormulaRecognition": False,  # disabled: avoids index errors in formula recognition
            "useChartRecognition": True,
            "useRegionDetection": False,
            "useOcrResultsWithTableCells": True,
            "useTableOrientationClassify": False,
            "useWiredTableCellsTransToHtml": True,
            "useWirelessTableCellsTransToHtml": True,
        }

        # Call the API; raise_for_status turns HTTP 4xx/5xx into exceptions.
        response = requests.post(api_url, json=payload, timeout=timeout)
        response.raise_for_status()
        return response.json()["result"]
    except requests.exceptions.Timeout as e:
        raise Exception(f"API调用超时 ({timeout}秒)") from e
    except requests.exceptions.RequestException as e:
        raise Exception(f"API调用失败: {e}") from e
    except KeyError as e:
        raise Exception("API返回格式错误,缺少'result'字段") from e
    except Exception as e:
        raise Exception(f"处理图像时发生错误: {e}") from e


def process_images_via_api(image_paths: List[str],
                           api_url: str,
                           output_dir: str = "./output",
                           normalize_numbers: bool = True,
                           timeout: int = 300,
                           log_level: str = "INFO") -> List[Dict[str, Any]]:
    """Process a list of image files through the API, saving outputs locally.

    For each image the API is called, then the pruned JSON result, output
    images, and Markdown content are written to ``output_dir``.  A file is
    counted as successful only when both its ``.md`` and ``.json`` outputs
    exist on disk afterwards.

    Args:
        image_paths: Paths of the images to process.
        api_url: Endpoint URL of the PP-StructureV3 service.
        output_dir: Directory to write outputs into (created if missing).
        normalize_numbers: Whether to normalize digit formatting in outputs.
        timeout: Per-call API timeout in seconds.
        log_level: Logging level; "DEBUG" additionally prints tracebacks.

    Returns:
        One result dict per input image (success flag, timings, output paths
        or an ``error`` message).
    """
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    all_results = []
    total_images = len(image_paths)
    logger.info(f"Processing {total_images} images via API")

    # tqdm progress bar over all images.
    with tqdm(total=total_images, desc="Processing images", unit="img",
              bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]') as pbar:
        for img_path in image_paths:
            start_time = time.time()
            try:
                api_result = call_api_for_image(img_path, api_url, timeout)
                processing_time = time.time() - start_time

                layout_parsing_results = api_result.get('layoutParsingResults', [])
                if not layout_parsing_results:
                    logger.warning("⚠️ Warning: No layoutParsingResults found in API response")
                    all_results.append({
                        "image_path": str(img_path),
                        "processing_time": processing_time,
                        "success": False,
                        "api_url": api_url,
                        "error": "No layoutParsingResults found in API response",
                        "is_pdf_page": "_page_" in Path(img_path).name
                    })
                    pbar.update(1)
                    continue

                # Fail fast BEFORE writing any files: a single image must map
                # to exactly one parsing result.  (The previous version only
                # raised after fully processing the first result, leaving
                # partial outputs behind on this error path.)
                if len(layout_parsing_results) > 1:
                    raise ValueError("Multiple results found for a single image")
                result = layout_parsing_results[0]

                input_path = Path(img_path)
                output_filename = input_path.stem

                # Save the pruned JSON result.
                json_content = result.get('prunedResult', {})
                json_output_path, converted_json = convert_pruned_result_to_json(
                    json_content, str(input_path), output_dir, output_filename,
                    normalize_numbers=normalize_numbers
                )

                # Save any output images returned by the service.
                img_content = result.get('outputImages', {})
                saved_images = save_output_images(img_content, str(output_dir), output_filename)

                # Save the Markdown content.
                markdown_content = result.get('markdown', {})
                md_output_path = save_markdown_content(
                    markdown_content, output_dir, output_filename,
                    normalize_numbers=normalize_numbers,
                    key_text='markdown_texts', key_images='markdown_images',
                    json_data=converted_json
                )

                # Success criterion: both the .md and .json files exist on disk.
                actual_md_path = Path(md_output_path) if md_output_path else Path(output_dir) / f"{output_filename}.md"
                actual_json_path = Path(json_output_path) if json_output_path else Path(output_dir) / f"{output_filename}.json"
                success = actual_md_path.exists() and actual_json_path.exists()

                result_info = {
                    "image_path": str(input_path),
                    "processing_time": processing_time,
                    "success": success,
                    "api_url": api_url,
                    "is_pdf_page": "_page_" in input_path.name,
                    "processing_info": converted_json.get('processing_info', {})
                }

                if success:
                    result_info.update({
                        "output_json": json_output_path,
                        "output_md": md_output_path,
                        "output_files": {
                            "md": str(actual_md_path),
                            "json": str(actual_json_path),
                            **saved_images
                        }
                    })
                    logger.info(f"✅ 处理成功: {input_path.stem}")
                else:
                    missing_files = []
                    if not actual_md_path.exists():
                        missing_files.append("md")
                    if not actual_json_path.exists():
                        missing_files.append("json")
                    result_info["error"] = f"输出文件不存在: {', '.join(missing_files)}"
                    logger.error(f"❌ 处理失败: {input_path.stem} - {result_info['error']}")

                all_results.append(result_info)

                # Progress bar bookkeeping.
                success_count = sum(1 for r in all_results if r.get('success', False))
                pbar.update(1)
                pbar.set_postfix({
                    'time': f"{processing_time:.2f}s",
                    'success': f"{success_count}/{len(all_results)}",
                    'rate': f"{success_count/len(all_results)*100:.1f}%" if len(all_results) > 0 else "0%"
                })

            except Exception as e:
                logger.error(f"Error processing {Path(img_path).name}: {e}")
                if log_level == "DEBUG":
                    traceback.print_exc()

                # Record the failure and keep going with the next image.
                all_results.append({
                    "image_path": str(img_path),
                    "processing_time": 0,
                    "success": False,
                    "api_url": api_url,
                    "error": str(e),
                    "is_pdf_page": "_page_" in Path(img_path).name
                })
                pbar.update(1)

    return all_results


def main():
    """CLI entry point: parse args, run the batch, and report statistics.

    Returns:
        0 on success, 1 on failure (used as the process exit code).
    """
    parser = argparse.ArgumentParser(
        description="PP-StructureV3 API Client Batch Processing",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  # 处理单个PDF文件
  python api_client.py --input document.pdf --output_dir ./output --api_url http://localhost:8080/layout-parsing

  # 处理图片目录
  python api_client.py --input ./images/ --output_dir ./output --api_url http://10.192.72.11:8111/layout-parsing

  # 处理文件列表
  python api_client.py --input file_list.txt --output_dir ./output --api_url http://localhost:8080/layout-parsing

  # 指定页面范围(PDF或图片目录)
  python api_client.py --input document.pdf --output_dir ./output --pages "1-5,7" --api_url http://localhost:20026/layout-parsing

  # 仅验证配置(dry run)
  python api_client.py --input document.pdf --output_dir ./output --api_url http://localhost:20026/layout-parsing --dry_run

  # 使用 DEBUG 日志级别获取详细错误信息
  python api_client.py --input document.pdf --output_dir ./output --api_url http://localhost:20026/layout-parsing --log_level DEBUG
        """
    )

    # Input (unified under --input).
    parser.add_argument(
        "--input", "-i",
        required=True,
        type=str,
        help="输入路径(支持PDF文件、图片文件、图片目录、文件列表.txt、CSV文件)"
    )
    # Output.
    parser.add_argument(
        "--output_dir", "-o",
        type=str,
        required=True,
        help="输出目录"
    )
    # API settings.
    parser.add_argument(
        "--api_url",
        type=str,
        default="http://localhost:8080/layout-parsing",
        help="API URL(默认: http://localhost:8080/layout-parsing)"
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=300,
        help="API 调用超时时间(秒,默认: 300)"
    )
    parser.add_argument(
        "--pdf_dpi",
        type=int,
        default=200,
        help="PDF 转图片的 DPI(默认: 200)"
    )
    parser.add_argument(
        '--no-normalize',
        action='store_true',
        help='禁用数字标准化'
    )
    # Processing options.
    parser.add_argument(
        "--pages", "-p",
        type=str,
        help="页面范围(PDF和图片目录有效),如: '1-5,7,9-12', '1-', '-10'"
    )
    parser.add_argument(
        "--collect_results",
        type=str,
        help="收集处理结果到指定CSV文件"
    )
    # Logging options.
    parser.add_argument(
        "--log_level",
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        help="日志级别(默认: INFO)"
    )
    parser.add_argument(
        "--log_file",
        type=str,
        help="日志文件路径"
    )
    # Dry run.
    parser.add_argument(
        "--dry_run",
        action="store_true",
        help="仅验证配置和输入,不执行实际处理"
    )

    args = parser.parse_args()

    setup_logging(args.log_level, args.log_file)

    try:
        # get_input_files expects an object with these three attributes;
        # argparse.Namespace replaces the previous ad-hoc inner class.
        args_obj = argparse.Namespace(
            input=args.input,
            output_dir=args.output_dir,
            pdf_dpi=args.pdf_dpi,
        )

        # Fetch and preprocess input files (page-range filtering is handled
        # inside get_input_files).
        logger.info("🔄 Preprocessing input files...")
        if args.pages:
            logger.info(f"📄 页面范围: {args.pages}")
        image_files = get_input_files(args_obj, page_range=args.pages)

        if not image_files:
            logger.error("❌ No input files found or processed")
            return 1

        output_dir = Path(args.output_dir).resolve()
        logger.info(f"📁 Output dir: {output_dir}")
        logger.info(f"📊 Found {len(image_files)} image files to process")

        # Dry-run mode: print the configuration and the file list, then exit.
        if args.dry_run:
            logger.info("🔍 Dry run mode: 仅验证配置,不执行处理")
            logger.info(f"📋 配置信息:")
            logger.info(f"  - 输入: {args.input}")
            logger.info(f"  - 输出目录: {output_dir}")
            logger.info(f"  - API URL: {args.api_url}")
            logger.info(f"  - 超时时间: {args.timeout} 秒")
            logger.info(f"  - PDF DPI: {args.pdf_dpi}")
            logger.info(f"  - 数字标准化: {not args.no_normalize}")
            logger.info(f"  - 日志级别: {args.log_level}")
            if args.pages:
                logger.info(f"  - 页面范围: {args.pages}")
            logger.info(f"📋 将要处理的文件 ({len(image_files)} 个):")
            for i, img_file in enumerate(image_files[:20], 1):  # show at most 20
                logger.info(f"  {i}. {img_file}")
            if len(image_files) > 20:
                logger.info(f"  ... 还有 {len(image_files) - 20} 个文件")
            logger.info("✅ Dry run 完成:配置验证通过")
            return 0

        logger.info(f"🌐 Using API: {args.api_url}")
        logger.info(f"⏱️ Timeout: {args.timeout} seconds")
        logger.info(f"🔧 数字标准化: {'启用' if not args.no_normalize else '禁用'}")

        # Run the batch.
        start_time = time.time()
        results = process_images_via_api(
            image_files,
            args.api_url,
            str(output_dir),
            normalize_numbers=not args.no_normalize,
            timeout=args.timeout,
            log_level=args.log_level
        )
        total_time = time.time() - start_time

        # Aggregate statistics.
        success_count = sum(1 for r in results if r.get('success', False))
        error_count = len(results) - success_count
        pdf_page_count = sum(1 for r in results if r.get('is_pdf_page', False))

        # Number-normalization statistics.
        total_changes = sum(r.get('processing_info', {}).get('character_changes_count', 0)
                            for r in results if 'processing_info' in r)

        print("\n" + "=" * 60)
        print(f"✅ API Processing completed!")
        print(f"📊 Statistics:")
        print(f"  Total files processed: {len(image_files)}")
        print(f"  PDF pages processed: {pdf_page_count}")
        print(f"  Regular images processed: {len(image_files) - pdf_page_count}")
        print(f"  Successful: {success_count}")
        print(f"  Failed: {error_count}")
        if len(image_files) > 0:
            print(f"  Success rate: {success_count / len(image_files) * 100:.2f}%")
        if not args.no_normalize and total_changes > 0:
            print(f"  总标准化字符数: {total_changes}")
        print(f"⏱️ Performance:")
        print(f"  Total time: {total_time:.2f} seconds")
        if total_time > 0:
            print(f"  Throughput: {len(image_files) / total_time:.2f} images/second")
            print(f"  Avg time per image: {total_time / len(image_files):.2f} seconds")
        print(f"\n📁 Output Structure:")
        print(f"  output_dir/")
        print(f"  ├── filename.md # Markdown content")
        print(f"  ├── filename.json # Content list JSON")
        print(f"  └── filename_*.jpg # Output images")

        # Build the run-level statistics record.
        stats = {
            "total_files": len(image_files),
            "pdf_pages": pdf_page_count,
            "regular_images": len(image_files) - pdf_page_count,
            "success_count": success_count,
            "error_count": error_count,
            "success_rate": success_count / len(image_files) if len(image_files) > 0 else 0,
            "total_time": total_time,
            "throughput": len(image_files) / total_time if total_time > 0 else 0,
            "avg_time_per_image": total_time / len(image_files) if len(image_files) > 0 else 0,
            "api_url": args.api_url,
            "timeout": args.timeout,
            "pdf_dpi": args.pdf_dpi,
            "normalization_enabled": not args.no_normalize,
            "total_character_changes": total_changes,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
        }

        # Save the final results JSON next to the outputs.
        output_file_name = Path(output_dir).name
        output_file = output_dir / f"{output_file_name}_api_results.json"
        final_results = {
            "stats": stats,
            "results": results
        }
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(final_results, f, ensure_ascii=False, indent=2)
        logger.info(f"💾 Results saved to: {output_file}")

        # Collect processed-file statuses into a CSV
        # (default name is timestamped unless --collect_results is given).
        if not args.collect_results:
            output_file_processed = output_dir / f"processed_files_{time.strftime('%Y%m%d_%H%M%S')}.csv"
        else:
            output_file_processed = Path(args.collect_results).resolve()
        processed_files = collect_pid_files(str(output_file))
        with open(output_file_processed, 'w', encoding='utf-8') as f:
            f.write("image_path,status\n")
            for file_path, status in processed_files:
                f.write(f"{file_path},{status}\n")
        logger.info(f"💾 Processed files saved to: {output_file_processed}")

        return 0

    except Exception as e:
        logger.error(f"Processing failed: {e}")
        traceback.print_exc()
        return 1


if __name__ == "__main__":
    logger.info(f"🚀 启动PP-StructureV3 API客户端...")
    logger.info(f"🔧 CUDA_VISIBLE_DEVICES: {os.environ.get('CUDA_VISIBLE_DEVICES', 'Not set')}")

    if len(sys.argv) == 1:
        # No command-line arguments: fall back to a built-in default config.
        logger.info("ℹ️ No command line arguments provided. Running with default configuration...")

        # Default configuration (API client).
        default_config = {
            "input": "/Users/zhch158/workspace/data/流水分析/马公账流水_工商银行.pdf",
            "output_dir": "./output",
            "api_url": "http://10.192.72.11:20026/layout-parsing",  # default API URL
            "timeout": "300",
            "pdf_dpi": "200",
            "pages": "2",
            "log_level": "DEBUG",
        }

        # Rebuild argv from the default configuration.
        sys.argv = [sys.argv[0]]
        for key, value in default_config.items():
            sys.argv.extend([f"--{key}", str(value)])

    sys.exit(main())