"""Convert PDFs to images and process them uniformly through the PP-StructureV3 API."""
import json
import time
import os
import traceback
import argparse
import sys
import warnings
import base64
from pathlib import Path
from typing import List, Dict, Any, Union

import requests
from tqdm import tqdm
from dotenv import load_dotenv

# Load .env before the project-local imports, in case they read env vars at import time.
load_dotenv(override=True)

from utils import (
    collect_pid_files,
    get_input_files,
)
from ppstructurev3_utils import (
    convert_pruned_result_to_json,
    save_output_images,
    save_markdown_content,
)


def call_api_for_image(image_path: str, api_url: str, timeout: int = 300) -> Dict[str, Any]:
    """Call the layout-parsing API for a single image.

    Args:
        image_path: Path to the local image file.
        api_url: API endpoint URL.
        timeout: Request timeout in seconds.

    Returns:
        The ``result`` payload of the API's JSON response.

    Raises:
        Exception: On timeout, request failure, a response missing the
            ``result`` field, or any other processing error.
    """
    try:
        # Base64-encode the local image so it can travel inside the JSON payload.
        with open(image_path, "rb") as file:
            image_bytes = file.read()
        image_data = base64.b64encode(image_bytes).decode("ascii")

        payload = {
            "file": image_data,
            "fileType": 1,
            # Pipeline feature switches.
            "useDocOrientationClassify": False,
            "useDocUnwarping": False,
            "useSealRecognition": True,
            "useTableRecognition": True,
            "useFormulaRecognition": False,  # disabled: avoids index errors in formula recognition
            "useChartRecognition": True,
            "useRegionDetection": False,
            "useOcrResultsWithTableCells": True,
            "useTableOrientationClassify": True,
            "useWiredTableCellsTransToHtml": True,
            "useWirelessTableCellsTransToHtml": True,
        }

        response = requests.post(api_url, json=payload, timeout=timeout)
        response.raise_for_status()
        return response.json()["result"]

    except requests.exceptions.Timeout as e:
        raise Exception(f"API调用超时 ({timeout}秒)") from e
    except requests.exceptions.RequestException as e:
        raise Exception(f"API调用失败: {e}") from e
    except KeyError as e:
        raise Exception("API返回格式错误,缺少'result'字段") from e
    except Exception as e:
        raise Exception(f"处理图像时发生错误: {e}") from e


def process_images_via_api(image_paths: List[str], api_url: str,
                           output_dir: str = "./output",
                           normalize_numbers: bool = True,
                           timeout: int = 300) -> List[Dict[str, Any]]:
    """Process a batch of image files through the API, one request per image.

    Args:
        image_paths: List of image file paths.
        api_url: API endpoint URL.
        output_dir: Directory for JSON/Markdown/image outputs.
        normalize_numbers: Whether to normalize digit formatting in outputs.
        timeout: Per-request API timeout in seconds.

    Returns:
        One result dict per input image; failed images get a dict with
        ``success: False`` and an ``error`` message instead of aborting
        the whole batch.
    """
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    all_results = []
    total_images = len(image_paths)
    print(f"Processing {total_images} images via API")

    with tqdm(total=total_images, desc="Processing images", unit="img",
              bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]') as pbar:
        for img_path in image_paths:
            start_time = time.time()
            try:
                api_result = call_api_for_image(img_path, api_url, timeout)
                processing_time = time.time() - start_time

                layout_parsing_results = api_result.get('layoutParsingResults', [])
                # BUGFIX: previously this case did `return []`, silently discarding
                # every result accumulated so far and aborting the whole batch.
                # Raise instead so the per-image error handler below records the
                # failure and processing continues with the next image.
                if not layout_parsing_results:
                    raise ValueError("No layoutParsingResults found in API response")

                input_path = Path(img_path)
                output_filename = input_path.stem

                for idx, result in enumerate(layout_parsing_results):
                    # A single image is expected to yield exactly one result.
                    if idx > 0:
                        raise ValueError("Multiple results found for a single image")

                    json_content = result.get('prunedResult', {})
                    json_output_path, converted_json = convert_pruned_result_to_json(
                        json_content, str(input_path), output_dir, output_filename,
                        normalize_numbers=normalize_numbers
                    )

                    # Save visualization images returned by the API.
                    img_content = result.get('outputImages', {})
                    saved_images = save_output_images(img_content, str(output_dir), output_filename)

                    # Save the Markdown rendition.
                    markdown_content = result.get('markdown', {})
                    md_output_path = save_markdown_content(
                        markdown_content, output_dir, output_filename,
                        normalize_numbers=normalize_numbers
                    )

                    all_results.append({
                        "image_path": str(input_path),
                        "processing_time": processing_time,
                        "success": True,
                        "api_url": api_url,
                        "output_json": json_output_path,
                        "output_md": md_output_path,
                        "is_pdf_page": "_page_" in input_path.name,  # marks pages rendered from a PDF
                        "processing_info": converted_json.get('processing_info', {})
                    })

                success_count = sum(1 for r in all_results if r.get('success', False))
                pbar.update(1)
                pbar.set_postfix({
                    'time': f"{processing_time:.2f}s",
                    'success': f"{success_count}/{len(all_results)}",
                    'rate': f"{success_count/len(all_results)*100:.1f}%"
                })

            except Exception as e:
                print(f"Error processing {Path(img_path).name}: {e}", file=sys.stderr)
                # NOTE: `traceback` is already imported at module level; the
                # original re-imported it here redundantly.
                traceback.print_exc()

                # Record the failure so the batch summary stays complete.
                all_results.append({
                    "image_path": str(img_path),
                    "processing_time": 0,
                    "success": False,
                    "api_url": api_url,
                    "error": str(e),
                    "is_pdf_page": "_page_" in Path(img_path).name
                })
                pbar.update(1)

    return all_results


def main():
    """CLI entry point: parse arguments, run the batch, print and persist statistics."""
    parser = argparse.ArgumentParser(description="PaddleX PP-StructureV3 API Client - Unified PDF/Image Processor")

    # Exactly one input source must be given.
    input_group = parser.add_mutually_exclusive_group(required=True)
    input_group.add_argument("--input_file", type=str, help="Input file (supports both PDF and image file)")
    input_group.add_argument("--input_dir", type=str, help="Input directory (supports both PDF and image files)")
    input_group.add_argument("--input_file_list", type=str, help="Input file list (one file per line)")
    input_group.add_argument("--input_csv", type=str, help="Input CSV file with image_path and status columns")

    parser.add_argument("--output_dir", type=str, required=True, help="Output directory")
    parser.add_argument("--api_url", type=str, default="http://localhost:8080/layout-parsing", help="API URL")
    parser.add_argument("--pdf_dpi", type=int, default=200, help="DPI for PDF to image conversion")
    parser.add_argument("--timeout", type=int, default=300, help="API timeout in seconds")
    parser.add_argument("--no-normalize", action="store_true", help="禁用数字标准化")
    parser.add_argument("--test_mode", action="store_true", help="Test mode (process only 20 files)")
    parser.add_argument("--collect_results", type=str, help="收集处理结果到指定CSV文件")

    args = parser.parse_args()
    normalize_numbers = not args.no_normalize

    try:
        # Gather and preprocess input files (PDF pages are rendered to images here).
        print("🔄 Preprocessing input files...")
        input_files = get_input_files(args)

        if not input_files:
            print("❌ No input files found or processed")
            return 1

        if args.test_mode:
            input_files = input_files[:20]
            print(f"Test mode: processing only {len(input_files)} images")

        print(f"🌐 Using API: {args.api_url}")
        print(f"🔧 数字标准化: {'启用' if normalize_numbers else '禁用'}")
        print(f"⏱️ Timeout: {args.timeout} seconds")

        start_time = time.time()
        results = process_images_via_api(
            input_files,
            args.api_url,
            args.output_dir,
            normalize_numbers=normalize_numbers,
            timeout=args.timeout
        )
        total_time = time.time() - start_time

        # Aggregate statistics.
        success_count = sum(1 for r in results if r.get('success', False))
        error_count = len(results) - success_count
        pdf_page_count = sum(1 for r in results if r.get('is_pdf_page', False))
        total_changes = sum(r.get('processing_info', {}).get('character_changes_count', 0)
                            for r in results if 'processing_info' in r)

        print("\n" + "="*60)
        print("✅ API Processing completed!")
        print("📊 Statistics:")
        print(f"   Total files processed: {len(input_files)}")
        print(f"   PDF pages processed: {pdf_page_count}")
        print(f"   Regular images processed: {len(input_files) - pdf_page_count}")
        print(f"   Successful: {success_count}")
        print(f"   Failed: {error_count}")
        if len(input_files) > 0:
            print(f"   Success rate: {success_count / len(input_files) * 100:.2f}%")
        if normalize_numbers:
            print(f"   总标准化字符数: {total_changes}")
        print("⏱️ Performance:")
        print(f"   Total time: {total_time:.2f} seconds")
        if total_time > 0:
            print(f"   Throughput: {len(input_files) / total_time:.2f} files/second")
            print(f"   Avg time per file: {total_time / len(input_files):.2f} seconds")

        stats = {
            "total_files": len(input_files),
            "pdf_pages": pdf_page_count,
            "regular_images": len(input_files) - pdf_page_count,
            "success_count": success_count,
            "error_count": error_count,
            "success_rate": success_count / len(input_files) if len(input_files) > 0 else 0,
            "total_time": total_time,
            "throughput": len(input_files) / total_time if total_time > 0 else 0,
            "avg_time_per_file": total_time / len(input_files) if len(input_files) > 0 else 0,
            "api_url": args.api_url,
            "pdf_dpi": args.pdf_dpi,
            "normalize_numbers": normalize_numbers,
            "total_character_changes": total_changes,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
        }

        # Persist the combined stats + per-file results.
        output_file_name = Path(args.output_dir).name
        output_file = os.path.join(args.output_dir, f"{output_file_name}_api_results.json")
        final_results = {
            "stats": stats,
            "results": results
        }
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(final_results, f, ensure_ascii=False, indent=2)
        print(f"💾 Results saved to: {output_file}")

        # If no collection path was given, use a default filename next to output_dir.
        if not args.collect_results:
            output_file_processed = Path(args.output_dir) / f"processed_files_{time.strftime('%Y%m%d_%H%M%S')}.csv"
        else:
            output_file_processed = Path(args.collect_results).resolve()

        processed_files = collect_pid_files(output_file)
        with open(output_file_processed, 'w', encoding='utf-8') as f:
            f.write("image_path,status\n")
            for file_path, status in processed_files:
                f.write(f"{file_path},{status}\n")
        print(f"💾 Processed files saved to: {output_file_processed}")

        return 0

    except Exception as e:
        print(f"❌ Processing failed: {e}", file=sys.stderr)
        traceback.print_exc()
        return 1


if __name__ == "__main__":
    print(f"🚀 启动PP-StructureV3 API客户端...")
    if len(sys.argv) == 1:
        # No command-line arguments: run with the built-in default configuration.
        print("ℹ️ No command line arguments provided. Running with default configuration...")
        default_config = {
            # "input_file": "/Users/zhch158/workspace/data/至远彩色印刷工业有限公司/data_PPStructureV3_Results/2023年度报告母公司/2023年度报告母公司_page_027.png",
            # "input_file": "/home/ubuntu/zhch/data/至远彩色印刷工业有限公司/PPStructureV3_Results/2023年度报告母公司/2023年度报告母公司_page_027.png",
            "input_file": "/home/ubuntu/zhch/data/至远彩色印刷工业有限公司/2023年度报告母公司.pdf",
            "output_dir": "/home/ubuntu/zhch/data/至远彩色印刷工业有限公司/PPStructureV3_Results",
            "collect_results": f"/home/ubuntu/zhch/data/至远彩色印刷工业有限公司/PPStructureV3_Results/processed_files_{time.strftime('%Y%m%d_%H%M%S')}.csv",
            # "input_dir": "../../OmniDocBench/OpenDataLab___OmniDocBench/images",
            # "output_dir": "./OmniDocBench_API_Results",
            # "collect_results": f"./OmniDocBench_API_Results/processed_files_{time.strftime('%Y%m%d_%H%M%S')}.csv",
            "api_url": "http://10.192.72.11:8111/layout-parsing",
            "timeout": 300,
        }
        # Rebuild sys.argv from the default config so argparse sees normal CLI args.
        sys.argv = [sys.argv[0]]
        for key, value in default_config.items():
            sys.argv.extend([f"--{key}", str(value)])
        # sys.argv.append("--no-normalize")
        # Test mode
        # sys.argv.append("--test_mode")

    sys.exit(main())