"""Run a PaddleX pipeline over a directory of images in a single process.

Each prediction result is saved as a JSON file and a Markdown file in the
output directory, and a JSON summary with run statistics is written next to
them.
"""
import json
import time
import os
import traceback
import argparse
import sys
import warnings
from pathlib import Path
from typing import List, Dict, Any

import cv2
import numpy as np

# Suppress specific noisy warnings. The filters are installed before the
# paddlex import below so that they are already active while it loads.
warnings.filterwarnings("ignore", message="To copy construct from a tensor")
warnings.filterwarnings("ignore", message="Setting `pad_token_id`")
warnings.filterwarnings("ignore", category=UserWarning, module="paddlex")

from paddlex import create_pipeline
from paddlex.utils.device import constr_device, parse_device
from tqdm import tqdm
from dotenv import load_dotenv

load_dotenv(override=True)

# File suffixes (lower-case) that are treated as images when scanning the
# input directory.
_IMAGE_EXTENSIONS = frozenset({'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif'})


def process_images_single_process(image_paths: List[str],
                                  pipeline_name: str = "PP-StructureV3",
                                  device: str = "gpu:0",
                                  batch_size: int = 1,
                                  output_dir: str = "./output") -> List[Dict[str, Any]]:
    """Process images batch by batch with a single pipeline instance.

    Args:
        image_paths: Paths of the images to process.
        pipeline_name: Name passed to ``create_pipeline``.
        device: Device string, e.g. ``"gpu:0"`` or ``"cpu"``.
        batch_size: Number of images handed to ``pipeline.predict`` per call.
        output_dir: Directory that receives the per-result ``.json`` and
            ``.md`` files; created if missing.

    Returns:
        One dict per prediction result (or per input image when a whole batch
        fails) with the keys ``image_path``, ``processing_time``, ``success``
        and ``device``, plus ``output_json``/``output_md`` on success or
        ``error`` on failure. An empty list is returned when the pipeline
        cannot be initialized.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        # range() would raise an opaque error for 0 and silently process
        # nothing for negative values; fail clearly instead.
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    # Create the output directory.
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    print(f"Initializing pipeline '{pipeline_name}' on device '{device}'...")

    try:
        # Set an environment variable to reduce warnings.
        os.environ['PYTHONWARNINGS'] = 'ignore::UserWarning'

        # Initialize the pipeline.
        pipeline = create_pipeline(pipeline_name, device=device)
        print(f"Pipeline initialized successfully on {device}")
    except Exception as e:
        print(f"Failed to initialize pipeline: {e}", file=sys.stderr)
        traceback.print_exc()
        return []

    all_results: List[Dict[str, Any]] = []
    total_images = len(image_paths)

    print(f"Processing {total_images} images with batch size {batch_size}")

    # Show progress with tqdm, including extra statistics in the postfix.
    with tqdm(total=total_images,
              desc="Processing images",
              unit="img",
              bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]') as pbar:

        # Process the images batch by batch.
        for i in range(0, total_images, batch_size):
            batch = image_paths[i:i + batch_size]
            batch_start_time = time.time()

            try:
                # Run the pipeline. The results are materialized inside the
                # timed region: if predict() yields results lazily (PaddleX
                # documents it as a generator), timing only the call itself
                # would exclude the actual inference work. For an already
                # materialized return value list() is just a cheap copy.
                results = list(pipeline.predict(
                    batch,
                    use_doc_orientation_classify=True,
                    use_doc_unwarping=False,
                    use_seal_recognition=True,
                    use_chart_recognition=True,
                    use_table_recognition=True,
                    use_formula_recognition=True,
                ))

                batch_processing_time = time.time() - batch_start_time
                avg_time = batch_processing_time / len(batch)
                batch_results = []

                # Handle every result of the batch.
                for result in results:
                    # Remembered outside the try so the error handler can
                    # report it without indexing `result` a second time.
                    raw_input_path = None
                    try:
                        raw_input_path = result["input_path"]
                        input_path = Path(raw_input_path)

                        # Build the output file name; results carrying a
                        # page index get it appended to the stem.
                        page_index = result.get("page_index")
                        if page_index is not None:
                            output_filename = f"{input_path.stem}_{page_index}"
                        else:
                            output_filename = f"{input_path.stem}"

                        # Save the JSON and Markdown files.
                        json_output_path = str(Path(output_dir, f"{output_filename}.json"))
                        md_output_path = str(Path(output_dir, f"{output_filename}.md"))

                        result.save_to_json(json_output_path)
                        result.save_to_markdown(md_output_path)

                        # Record the outcome.
                        batch_results.append({
                            "image_path": input_path.name,
                            "processing_time": avg_time,  # average per image
                            "success": True,
                            "device": device,
                            "output_json": json_output_path,
                            "output_md": md_output_path
                        })

                    except Exception as e:
                        shown_path = raw_input_path if raw_input_path is not None else 'unknown'
                        print(f"Error saving result for {shown_path}: {e}", file=sys.stderr)
                        traceback.print_exc()
                        try:
                            failed_name = Path(raw_input_path).name
                        except TypeError:
                            # input_path was missing or not path-like.
                            failed_name = 'unknown'
                        batch_results.append({
                            "image_path": failed_name,
                            "processing_time": 0,
                            "success": False,
                            "device": device,
                            "error": str(e)
                        })

                all_results.extend(batch_results)

                # Update the progress bar.
                total_success = sum(1 for r in all_results if r.get('success', False))
                processed = len(all_results)
                # Guard against a batch that produced no results at all.
                success_rate = total_success / processed * 100 if processed else 0.0

                pbar.update(len(batch))
                pbar.set_postfix({
                    'batch_time': f"{batch_processing_time:.2f}s",
                    'avg_time': f"{avg_time:.2f}s/img",
                    'success': f"{total_success}/{processed}",
                    'rate': f"{success_rate:.1f}%"
                })

            except Exception as e:
                print(f"Error processing batch {[Path(p).name for p in batch]}: {e}", file=sys.stderr)
                traceback.print_exc()

                # Record an error entry for every image of the failed batch.
                all_results.extend(
                    {
                        "image_path": Path(img_path).name,
                        "processing_time": 0,
                        "success": False,
                        "device": device,
                        "error": str(e)
                    }
                    for img_path in batch
                )
                pbar.update(len(batch))

    return all_results


def _collect_image_files(input_dir: Path, pattern: str = "*") -> List[str]:
    """Return the sorted, de-duplicated image file paths in ``input_dir``.

    Entries are selected with ``input_dir.glob(pattern)`` and kept when they
    are regular files whose suffix, compared case-insensitively, is one of
    ``_IMAGE_EXTENSIONS``.
    """
    return sorted({
        str(candidate)
        for candidate in input_dir.glob(pattern)
        if candidate.suffix.lower() in _IMAGE_EXTENSIONS and candidate.is_file()
    })


def _resolve_device(device: str, verbose: bool = False) -> str:
    """Validate a requested device string and return the one to use.

    Non-GPU strings are returned unchanged. For GPU strings the function
    falls back to ``"cpu"`` when Paddle is not compiled with CUDA, when no
    GPU is visible, or when the check itself fails, and to ``"gpu:0"`` when
    the requested index is out of range.
    """
    if not device.startswith('gpu'):
        return device

    try:
        import paddle

        if not paddle.device.is_compiled_with_cuda():
            print("GPU requested but CUDA not available, falling back to CPU")
            return "cpu"

        gpu_count = paddle.device.cuda.device_count()
        if gpu_count == 0:
            # "gpu:0" does not exist either, so CPU is the only valid choice.
            print("GPU requested but no GPU devices found, falling back to CPU")
            return "cpu"

        device_id = int(device.split(':')[1]) if ':' in device else 0
        if device_id >= gpu_count:
            print(f"GPU {device_id} not available (only {gpu_count} GPUs), falling back to GPU 0")
            device = "gpu:0"
    except Exception as e:
        print(f"Error checking GPU availability: {e}, falling back to CPU")
        return "cpu"

    # Show GPU information. This is purely informational, so a failure here
    # must not change the selected device.
    if verbose:
        try:
            for i in range(gpu_count):
                props = paddle.device.cuda.get_device_properties(i)
                print(f"GPU {i}: {props.name} - {props.total_memory // 1024**3}GB")
        except Exception as e:
            print(f"Could not query GPU properties: {e}", file=sys.stderr)

    return device


def main():
    """Parse the command line, run the processing and write the summary.

    Returns:
        ``0`` on success, ``1`` when the input directory is missing, contains
        no images, or an unexpected error occurs.
    """
    parser = argparse.ArgumentParser(description="PaddleX PP-StructureV3 Single Process Processing")

    # Argument definitions.
    parser.add_argument("--input_dir", type=str, default="../../OmniDocBench/OpenDataLab___OmniDocBench/images", help="Input directory")
    parser.add_argument("--output_dir", type=str, default="./OmniDocBench_Results_Single", help="Output directory")
    parser.add_argument("--pipeline", type=str, default="PP-StructureV3", help="Pipeline name")
    parser.add_argument("--device", type=str, default="gpu:0", help="Device string (e.g., 'gpu:0', 'cpu')")
    parser.add_argument("--batch_size", type=int, default=4, help="Batch size")
    parser.add_argument("--input_pattern", type=str, default="*", help="Input file pattern")
    parser.add_argument("--test_mode", action="store_true", help="Test mode (process only 20 images)")
    # The device check reads args.verbose. Without this flag the attribute
    # lookup raised inside that check and every GPU run was silently forced
    # onto the CPU.
    parser.add_argument("--verbose", action="store_true", help="Print information about the available GPUs")

    args = parser.parse_args()

    try:
        # Resolve the input and output locations.
        input_dir = Path(args.input_dir).resolve()
        output_dir = Path(args.output_dir).resolve()

        print(f"Input dir: {input_dir}")
        print(f"Output dir: {output_dir}")

        if not input_dir.exists():
            print(f"Input directory does not exist: {input_dir}")
            return 1

        # Find the image files (de-duplicated and sorted).
        image_files = _collect_image_files(input_dir, args.input_pattern)

        if not image_files:
            print(f"No image files found in {input_dir}")
            return 1

        print(f"Found {len(image_files)} image files")

        if args.test_mode:
            image_files = image_files[:20]
            print(f"Test mode: processing only {len(image_files)} images")

        # Validate the device.
        args.device = _resolve_device(args.device, args.verbose)

        print(f"Using device: {args.device}")
        print(f"Batch size: {args.batch_size}")

        # Start processing.
        start_time = time.time()
        results = process_images_single_process(
            image_files,
            args.pipeline,
            args.device,
            args.batch_size,
            str(output_dir)
        )
        total_time = time.time() - start_time

        # Summarize the results. image_files is non-empty at this point, so
        # dividing by its length is safe.
        total_files = len(image_files)
        success_count = sum(1 for r in results if r.get('success', False))
        error_count = len(results) - success_count

        print("\n" + "=" * 60)
        print("✅ Processing completed!")
        print("📊 Statistics:")
        print(f" Total files: {total_files}")
        print(f" Successful: {success_count}")
        print(f" Failed: {error_count}")
        print(f" Success rate: {success_count / total_files * 100:.2f}%")

        print("⏱️ Performance:")
        print(f" Total time: {total_time:.2f} seconds")
        if total_time > 0:
            print(f" Throughput: {total_files / total_time:.2f} images/second")
            print(f" Avg time per image: {total_time / total_files:.2f} seconds")

        # Assemble the run statistics.
        stats = {
            "total_files": total_files,
            "success_count": success_count,
            "error_count": error_count,
            "success_rate": success_count / total_files,
            "total_time": total_time,
            "throughput": total_files / total_time if total_time > 0 else 0,
            "avg_time_per_image": total_time / total_files,
            "batch_size": args.batch_size,
            "device": args.device,
            "pipeline": args.pipeline,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
        }

        # Save the final results.
        output_dir.mkdir(parents=True, exist_ok=True)
        output_file = os.path.join(output_dir, f"OmniDocBench_Single_batch{args.batch_size}.json")
        final_results = {
            "stats": stats,
            "results": results
        }

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(final_results, f, ensure_ascii=False, indent=2)

        print(f"💾 Results saved to: {output_file}")

        return 0

    except Exception as e:
        print(f"❌ Processing failed: {e}", file=sys.stderr)
        traceback.print_exc()
        return 1


if __name__ == "__main__":
    print("🚀 启动单进程OCR程序...")
    print(f"🔧 CUDA_VISIBLE_DEVICES: {os.environ.get('CUDA_VISIBLE_DEVICES', 'Not set')}")

    if len(sys.argv) == 1:
        # Without command-line arguments, run with the default configuration.
        print("ℹ️ No command line arguments provided. Running with default configuration...")

        # Default configuration.
        default_config = {
            "input_dir": "../../OmniDocBench/OpenDataLab___OmniDocBench/images",
            "output_dir": "./OmniDocBench_Results_Single",
            "pipeline": "PP-StructureV3",
            "device": "gpu:0",
            "batch_size": 4,
        }

        # Build the argument list.
        sys.argv = [sys.argv[0]]
        for key, value in default_config.items():
            sys.argv.extend([f"--{key}", str(value)])

        # Test mode.
        sys.argv.append("--test_mode")

    sys.exit(main())