""" 多GPU多进程推理始终有问题,多个进程启动后,paddle底层报错 目前无法定位原因 """ import json import time import os import argparse import sys import subprocess import tempfile from pathlib import Path from typing import List, Dict, Any, Tuple from concurrent.futures import ProcessPoolExecutor, as_completed import threading from queue import Queue from tqdm import tqdm from utils import ( get_image_files_from_dir, get_image_files_from_list, get_image_files_from_csv, split_files, create_temp_file_list, collect_pid_files ) def collect_processed_files(results: List[Dict[str, Any]]) -> List[Tuple[str, str]]: """ 从处理结果中收集文件 Args: results: 处理结果列表 Returns: 文件列表(文件路径,处理结果), """ processed_files = [] for result in results: """ 根据output_dir+process_id找到每个进程的结果文件 { "process_id": 1, "success": true, "processing_time": 42.744526386260986, "file_count": 5, "device": "gpu:1", "output_dir": "/home/ubuntu/zhch/PaddleX/zhch/OmniDocBench_Results_Scheduler/process_1", ... } """ pid_output_file = Path(result.get("output_dir", "")) / f"process_{result['process_id']}.json" if not pid_output_file.exists(): print(f"⚠️ Warning: Output file not found for process {result['process_id']}: {pid_output_file}") if not result.get("success", False): # 整个进程失败的情况 process_failed_files = result.get("failed_files", []) processed_files.extend([(f, "fail") for f in process_failed_files if f]) else: pid_files = collect_pid_files(str(pid_output_file)) processed_files.extend(pid_files) return processed_files def run_single_process(args: Tuple[List[str], Dict[str, Any], int]) -> Dict[str, Any]: """ 运行单个ppstructurev3_single_process.py进程 Args: args: (file_chunk, config, process_id) Returns: 处理结果 """ file_chunk, config, process_id = args if not file_chunk: return {"process_id": process_id, "success": False, "error": "Empty file chunk"} # 创建临时文件列表 temp_file_list = create_temp_file_list(file_chunk) try: # 创建进程专用的输出目录 process_output_dir = Path(config["output_dir"]) / f"process_{process_id}_{time.strftime('%Y%m%d_%H%M%S')}" process_output_dir.mkdir(parents=True, exist_ok=True) # 构建命令行参数 cmd = [ sys.executable, config["single_process_script"], "--input_file_list", temp_file_list, # 需要修改single_process脚本支持文件列表 "--output_dir", str(process_output_dir), "--pipeline", config["pipeline"], "--device", config["device"], ] # 添加可选参数 if config.get("test_mode", False): cmd.append("--test_mode") print(f"Process {process_id} starting with {len(file_chunk)} files on device {config['device']}") # 执行子进程 start_time = time.time() result = subprocess.run( cmd, capture_output=True, text=True, timeout=config.get("timeout", 3600) # 1小时超时 ) processing_time = time.time() - start_time if result.returncode == 0: print(f"Process {process_id} completed successfully in {processing_time:.2f}s") # 读取结果文件 result_files = list(process_output_dir.glob("*.json")) return { "process_id": process_id, "success": True, "processing_time": processing_time, "file_count": len(file_chunk), "device": config["device"], "output_dir": str(process_output_dir), "result_files": [str(f) for f in result_files], "stdout": result.stdout, "stderr": result.stderr } else: print(f"Process {process_id} failed with return code {result.returncode}") return { "process_id": process_id, "success": False, "error": f"Process failed with return code {result.returncode}", "processing_time": processing_time, "file_count": len(file_chunk), "device": config["device"], "output_dir": str(process_output_dir), "failed_files": [str(f) for f in file_chunk], "stdout": result.stdout, "stderr": result.stderr } except subprocess.TimeoutExpired: print(f"Process {process_id} timed out") return { "process_id": process_id, "success": False, "error": "Process timeout", "device": config["device"], "output_dir": str(process_output_dir), "failed_files": [str(f) for f in file_chunk] } except Exception as e: print(f"Process {process_id} error: {e}") return { "process_id": process_id, "success": False, "error": str(e), "device": config["device"], "output_dir": str(process_output_dir), "failed_files": [str(f) for f in file_chunk] } finally: # 清理临时文件 try: os.unlink(temp_file_list) except: pass def monitor_progress(total_files: int, completed_queue: Queue): """ 监控处理进度 """ with tqdm(total=total_files, desc="Total Progress", unit="files") as pbar: completed_count = 0 while completed_count < total_files: try: batch_count = completed_queue.get(timeout=1) completed_count += batch_count pbar.update(batch_count) except: continue def main(): """主函数""" parser = argparse.ArgumentParser(description="PaddleX PP-StructureV3 Multi-Process Scheduler") # 输入输出参数 input_group = parser.add_mutually_exclusive_group(required=True) input_group.add_argument("--input_dir", type=str, help="Input directory") input_group.add_argument("--input_file_list", type=str, help="Input file list (one file per line)") input_group.add_argument("--input_csv", type=str, help="Input CSV file with image_path and status columns") parser.add_argument("--output_dir", type=str, required=True, help="Output directory") parser.add_argument("--single_process_script", type=str, default="./ppstructurev3_single_process.py", help="Path to single process script") # 并行参数 parser.add_argument("--num_processes", type=int, default=4, help="Number of parallel processes") parser.add_argument("--devices", type=str, default="gpu:0,gpu:1,gpu:2,gpu:3", help="Device list (comma separated)") # Pipeline参数 parser.add_argument("--pipeline", type=str, default="PP-StructureV3", help="Pipeline name") parser.add_argument("--timeout", type=int, default=3600, help="Process timeout in seconds") # 其他参数 parser.add_argument("--test_mode", action="store_true", help="Test mode") parser.add_argument("--max_files", type=int, default=None, help="Maximum files to process") args = parser.parse_args() try: # 获取图像文件列表 if args.input_csv: # 从CSV文件读取 image_files = get_image_files_from_csv(args.input_csv, "fail") print(f"📊 Loaded {len(image_files)} files from CSV with status filter: fail") elif args.input_file_list: # 从文件列表读取 image_files = get_image_files_from_list(args.input_file_list) else: # 从目录读取 input_dir = Path(args.input_dir).resolve() print(f"📁 Input dir: {input_dir}") if not input_dir.exists(): print(f"❌ Input directory does not exist: {input_dir}") return 1 print(f"Input dir: {input_dir}") image_files = get_image_files_from_dir(input_dir, args.max_files) output_dir = Path(args.output_dir).resolve() print(f"Output dir: {output_dir}") # 限制文件数量 if args.max_files: image_files = image_files[:args.max_files] if args.test_mode: image_files = image_files[:20] print(f"Test mode: processing only {len(image_files)} images") print(f"Found {len(image_files)} image files") # 解析设备列表 devices = [d.strip() for d in args.devices.split(',')] if len(devices) < args.num_processes: # 如果设备数少于进程数,循环使用设备 devices = devices * ((args.num_processes // len(devices)) + 1) devices = devices[:args.num_processes] print(f"Using {args.num_processes} processes with devices: {devices}") # 分割文件列表 file_chunks = split_files(image_files, args.num_processes) print(f"Split into {len(file_chunks)} chunks: {[len(chunk) for chunk in file_chunks]}") # 创建输出目录 output_dir.mkdir(parents=True, exist_ok=True) # 准备进程参数 process_configs = [] for i, (chunk, device) in enumerate(zip(file_chunks, devices)): config = { "single_process_script": str(Path(args.single_process_script).resolve()), "output_dir": str(output_dir), "pipeline": args.pipeline, "device": device, "timeout": args.timeout, "test_mode": args.test_mode } process_configs.append((chunk, config, i)) # 启动进度监控 completed_queue = Queue() progress_thread = threading.Thread( target=monitor_progress, args=(len(image_files), completed_queue) ) progress_thread.daemon = True progress_thread.start() # 执行并行处理 start_time = time.time() results = [] with ProcessPoolExecutor(max_workers=args.num_processes) as executor: # 提交所有任务 future_to_process = { executor.submit(run_single_process, config): i for i, config in enumerate(process_configs) } # 收集结果 for future in as_completed(future_to_process): process_id = future_to_process[future] try: result = future.result() results.append(result) # 更新进度 if result.get("success", False): completed_queue.put(result.get("file_count", 0)) print(f"Process {process_id} finished: {result.get('success', False)}") except Exception as e: print(f"Process {process_id} generated an exception: {e}") results.append({ "process_id": process_id, "success": False, "error": str(e) }) total_time = time.time() - start_time # 统计结果 successful_processes = sum(1 for r in results if r.get('success', False)) total_processed_files = sum(r.get('file_count', 0) for r in results if r.get('success', False)) print(f"\n" + "="*60) print(f"🎉 Parallel processing completed!") print(f"📊 Statistics:") print(f" Total processes: {len(results)}") print(f" Successful processes: {successful_processes}") print(f" Total files processed: {total_processed_files}/{len(image_files)}") print(f" Success rate: {total_processed_files/len(image_files)*100:.2f}%") print(f"⏱️ Performance:") print(f" Total time: {total_time:.2f} seconds") print(f" Throughput: {total_processed_files/total_time:.2f} files/second") print(f" Avg time per file: {total_time/total_processed_files:.2f} seconds") # 保存调度结果 scheduler_stats = { "total_files": len(image_files), "total_processes": len(results), "successful_processes": successful_processes, "total_processed_files": total_processed_files, "success_rate": total_processed_files / len(image_files) if len(image_files) > 0 else 0, "total_time": total_time, "throughput": total_processed_files / total_time if total_time > 0 else 0, "avg_time_per_file": total_time / total_processed_files if total_processed_files > 0 else 0, "num_processes": args.num_processes, "devices": devices, "pipeline": args.pipeline, "timestamp": time.strftime("%Y-%m-%d %H:%M:%S") } final_results = { "scheduler_stats": scheduler_stats, "process_results": results } # 保存结果 output_file = output_dir / f"scheduler_results_{args.num_processes}procs_{time.strftime('%Y%m%d_%H%M%S')}.json" with open(output_file, 'w', encoding='utf-8') as f: json.dump(final_results, f, ensure_ascii=False, indent=2) print(f"💾 Scheduler results saved to: {output_file}") # 收集文件处理结果 processed_files = [] processed_files = collect_processed_files(results) output_file_processed = output_dir / f"processed_files_{args.num_processes}procs_{time.strftime('%Y%m%d_%H%M%S')}.csv" with open(output_file_processed, 'w', encoding='utf-8') as f: f.write("image_path,status\n") for file_path, status in processed_files: f.write(f"{file_path},{status}\n") print(f"💾 Processed files saved to: {output_file_processed}") return 0 if successful_processes == len(results) else 1 except Exception as e: print(f"❌ Scheduler failed: {e}") import traceback traceback.print_exc() return 1 if __name__ == "__main__": print(f"🚀 启动多进程调度程序..., 约定各进程统计文件名为: process_{{process_id}}.json") if len(sys.argv) == 1: # 默认配置 default_config = { "input_dir": "../../OmniDocBench/OpenDataLab___OmniDocBench/images", "output_dir": "./OmniDocBench_Results_Scheduler", "pipeline": "./my_config/PP-StructureV3.yaml", "num_processes": 3, "devices": "gpu:0,gpu:1,gpu:2,gpu:3", } # default_config = { # "input_csv": "./OmniDocBench_Results_Scheduler/processed_files_4procs_20250814_213138.csv", # "output_dir": "./OmniDocBench_Results_Scheduler", # "pipeline": "./my_config/PP-StructureV3.yaml", # "num_processes": 4, # "devices": "gpu:0,gpu:1,gpu:2,gpu:3", # } sys.argv = [sys.argv[0]] for key, value in default_config.items(): sys.argv.extend([f"--{key}", str(value)]) sys.argv.append("--test_mode") sys.exit(main())