"""单进程运行稳定""" import json import time import os import traceback import argparse import sys import warnings from pathlib import Path from typing import List, Dict, Any import cv2 import numpy as np # 抑制特定警告 warnings.filterwarnings("ignore", message="To copy construct from a tensor") warnings.filterwarnings("ignore", message="Setting `pad_token_id`") warnings.filterwarnings("ignore", category=UserWarning, module="paddlex") from paddlex import create_pipeline from paddlex.utils.device import constr_device, parse_device from tqdm import tqdm from dotenv import load_dotenv load_dotenv(override=True) from utils import ( get_image_files_from_dir, get_image_files_from_list, get_image_files_from_csv, collect_pid_files ) def process_images_single_process(image_paths: List[str], pipeline_name: str = "PP-StructureV3", device: str = "gpu:0", batch_size: int = 1, output_dir: str = "./output") -> List[Dict[str, Any]]: """ 单进程版本的图像处理函数 Args: image_paths: 图像路径列表 pipeline_name: Pipeline名称 device: 设备字符串,如"gpu:0"或"cpu" batch_size: 批处理大小 output_dir: 输出目录 Returns: 处理结果列表 """ # 创建输出目录 output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) print(f"Initializing pipeline '{pipeline_name}' on device '{device}'...") try: # 设置环境变量以减少警告 os.environ['PYTHONWARNINGS'] = 'ignore::UserWarning' # 初始化pipeline pipeline = create_pipeline(pipeline_name, device=device) print(f"Pipeline initialized successfully on {device}") except Exception as e: print(f"Failed to initialize pipeline: {e}", file=sys.stderr) traceback.print_exc() return [] all_results = [] total_images = len(image_paths) print(f"Processing {total_images} images with batch size {batch_size}") # 使用tqdm显示进度,添加更多统计信息 with tqdm(total=total_images, desc="Processing images", unit="img", bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]') as pbar: # 按批次处理图像 for i in range(0, total_images, batch_size): batch = image_paths[i:i + batch_size] batch_start_time = time.time() try: # 使用pipeline预测 results = pipeline.predict( batch, use_doc_orientation_classify=True, use_doc_unwarping=False, use_seal_recognition=True, use_chart_recognition=True, use_table_recognition=True, use_formula_recognition=True, ) batch_processing_time = time.time() - batch_start_time batch_results = [] # 处理每个结果 for result in results: try: input_path = Path(result["input_path"]) # 生成输出文件名 if result.get("page_index") is not None: output_filename = f"{input_path.stem}_{result['page_index']}" else: output_filename = f"{input_path.stem}" # 保存JSON和Markdown文件 json_output_path = str(Path(output_dir, f"{output_filename}.json")) md_output_path = str(Path(output_dir, f"{output_filename}.md")) result.save_to_json(json_output_path) result.save_to_markdown(md_output_path) # 记录处理结果 batch_results.append({ "image_path": str(input_path), "processing_time": batch_processing_time / len(batch), # 平均时间 "success": True, "device": device, "output_json": json_output_path, "output_md": md_output_path }) except Exception as e: print(f"Error saving result for {result.get('input_path', 'unknown')}: {e}", file=sys.stderr) traceback.print_exc() batch_results.append({ "image_path": str(input_path), "processing_time": 0, "success": False, "device": device, "error": str(e) }) all_results.extend(batch_results) # 更新进度条 success_count = sum(1 for r in batch_results if r.get('success', False)) total_success = sum(1 for r in all_results if r.get('success', False)) avg_time = batch_processing_time / len(batch) pbar.update(len(batch)) pbar.set_postfix({ 'batch_time': 
f"{batch_processing_time:.2f}s", 'avg_time': f"{avg_time:.2f}s/img", 'success': f"{total_success}/{len(all_results)}", 'rate': f"{total_success/len(all_results)*100:.1f}%" }) except Exception as e: print(f"Error processing batch {[Path(p).name for p in batch]}: {e}", file=sys.stderr) traceback.print_exc() # 为批次中的所有图像添加错误结果 error_results = [] for img_path in batch: error_results.append({ "image_path": str(img_path), "processing_time": 0, "success": False, "device": device, "error": str(e) }) all_results.extend(error_results) pbar.update(len(batch)) return all_results def main(): """主函数""" parser = argparse.ArgumentParser(description="PaddleX PP-StructureV3 Single Process Processing") # 参数定义 input_group = parser.add_mutually_exclusive_group(required=True) input_group.add_argument("--input_dir", type=str, help="Input directory") input_group.add_argument("--input_file_list", type=str, help="Input file list (one file per line)") input_group.add_argument("--input_csv", type=str, help="Input CSV file with image_path and status columns") parser.add_argument("--output_dir", type=str, help="Output directory") parser.add_argument("--pipeline", type=str, default="PP-StructureV3", help="Pipeline name") parser.add_argument("--device", type=str, default="gpu:0", help="Device string (e.g., 'gpu:0', 'cpu')") parser.add_argument("--batch_size", type=int, default=1, help="Batch size") parser.add_argument("--input_pattern", type=str, default="*", help="Input file pattern") parser.add_argument("--test_mode", action="store_true", help="Test mode (process only 20 images)") parser.add_argument("--collect_results",type=str, help="收集处理结果到指定CSV文件") args = parser.parse_args() try: # 获取图像文件列表 if args.input_csv: # 从CSV文件读取 image_files = get_image_files_from_csv(args.input_csv, "fail") print(f"📊 Loaded {len(image_files)} files from CSV with status filter: fail") elif args.input_file_list: # 从文件列表读取 image_files = get_image_files_from_list(args.input_file_list) else: # 从目录读取 input_dir = Path(args.input_dir).resolve() print(f"📁 Input dir: {input_dir}") if not input_dir.exists(): print(f"❌ Input directory does not exist: {input_dir}") return 1 print(f"Input dir: {input_dir}") image_files = get_image_files_from_dir(input_dir) output_dir = Path(args.output_dir).resolve() print(f"Output dir: {output_dir}") print(f"Found {len(image_files)} image files") if args.test_mode: image_files = image_files[:20] print(f"Test mode: processing only {len(image_files)} images") print(f"Using device: {args.device}") print(f"Batch size: {args.batch_size}") # 开始处理 start_time = time.time() results = process_images_single_process( image_files, args.pipeline, args.device, args.batch_size, str(output_dir) ) total_time = time.time() - start_time # 统计结果 success_count = sum(1 for r in results if r.get('success', False)) error_count = len(results) - success_count print(f"\n" + "="*60) print(f"✅ Processing completed!") print(f"📊 Statistics:") print(f" Total files: {len(image_files)}") print(f" Successful: {success_count}") print(f" Failed: {error_count}") if len(image_files) > 0: print(f" Success rate: {success_count / len(image_files) * 100:.2f}%") print(f"⏱️ Performance:") print(f" Total time: {total_time:.2f} seconds") if total_time > 0: print(f" Throughput: {len(image_files) / total_time:.2f} images/second") print(f" Avg time per image: {total_time / len(image_files):.2f} seconds") # 保存结果统计 stats = { "total_files": len(image_files), "success_count": success_count, "error_count": error_count, "success_rate": success_count / len(image_files) if 
            "total_time": total_time,
            "throughput": len(image_files) / total_time if total_time > 0 else 0,
            "avg_time_per_image": total_time / len(image_files) if len(image_files) > 0 else 0,
            "batch_size": args.batch_size,
            "device": args.device,
            "pipeline": args.pipeline,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
        }

        # Save the final results (the JSON file is named after the output directory)
        output_file_name = Path(output_dir).name
        output_file = os.path.join(output_dir, f"{output_file_name}.json")
        final_results = {
            "stats": stats,
            "results": results
        }
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(final_results, f, ensure_ascii=False, indent=2)

        print(f"💾 Results saved to: {output_file}")

        if args.collect_results:
            processed_files = collect_pid_files(output_file)
            output_file_processed = Path(args.collect_results).resolve()
            with open(output_file_processed, 'w', encoding='utf-8') as f:
                f.write("image_path,status\n")
                for file_path, status in processed_files:
                    f.write(f"{file_path},{status}\n")
            print(f"💾 Processed files saved to: {output_file_processed}")

        return 0
    except Exception as e:
        print(f"❌ Processing failed: {e}", file=sys.stderr)
        traceback.print_exc()
        return 1


if __name__ == "__main__":
    print("🚀 Starting single-process OCR run...")
    print(f"🔧 CUDA_VISIBLE_DEVICES: {os.environ.get('CUDA_VISIBLE_DEVICES', 'Not set')}")

    if len(sys.argv) == 1:
        # No command-line arguments: fall back to the default configuration
        print("ℹ️ No command line arguments provided. Running with default configuration...")

        # Default configuration
        default_config = {
            "input_dir": "../../OmniDocBench/OpenDataLab___OmniDocBench/images",
            "output_dir": "./OmniDocBench_PPStructureV3_Results",
            "pipeline": "PP-StructureV3",
            "device": "gpu:0",
            "batch_size": 2,
            "collect_results": "./OmniDocBench_PPStructureV3_Results/processed_files.csv",
        }
        # Alternative: re-run only the rows marked "fail" in a previous run's CSV
        # default_config = {
        #     "input_csv": "./OmniDocBench_PPStructureV3_Results/processed_files.csv",
        #     "output_dir": "./OmniDocBench_PPStructureV3_Results",
        #     "pipeline": "PP-StructureV3",
        #     "device": "gpu:0",
        #     "batch_size": 2,
        #     "collect_results": f"./OmniDocBench_PPStructureV3_Results/processed_files_{time.strftime('%Y%m%d_%H%M%S')}.csv",
        # }

        # Build sys.argv from the default configuration
        sys.argv = [sys.argv[0]]
        for key, value in default_config.items():
            sys.argv.extend([f"--{key}", str(value)])

        # Test mode
        # sys.argv.append("--test_mode")

    sys.exit(main())
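
# Usage sketch (the script file name and the paths below are illustrative
# assumptions, not defined anywhere in this file):
#
#   # Process a directory of images on GPU 0, two images per batch:
#   python single_process_ocr.py --input_dir ./images --output_dir ./results \
#       --device gpu:0 --batch_size 2 --collect_results ./results/processed_files.csv
#
#   # Retry only the files marked "fail" in a previous run's CSV:
#   python single_process_ocr.py --input_csv ./results/processed_files.csv --output_dir ./results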