ppstructurev3_scheduler.py

"""
Multi-GPU, multi-process inference keeps failing: once several processes are
launched, Paddle raises errors at a low level. The root cause has not been
identified yet.
"""
import json
import time
import os
import argparse
import sys
import subprocess
import tempfile
from pathlib import Path
from typing import List, Dict, Any, Tuple
from concurrent.futures import ProcessPoolExecutor, as_completed
import threading
from queue import Queue, Empty
from tqdm import tqdm

from utils import (
    get_image_files_from_dir,
    get_image_files_from_list,
    get_image_files_from_csv,
    split_files,
    create_temp_file_list,
    collect_pid_files
)

def collect_processed_files(results: List[Dict[str, Any]]) -> List[Tuple[str, str]]:
    """
    Collect per-file results from the per-process results.

    Args:
        results: list of per-process result dicts

    Returns:
        List of (file_path, status) tuples
    """
    processed_files = []
    for result in results:
        # Locate each process's result file via output_dir + process_id.
        # The file is expected to look like:
        # {
        #     "process_id": 1,
        #     "success": true,
        #     "processing_time": 42.744526386260986,
        #     "file_count": 5,
        #     "device": "gpu:1",
        #     "output_dir": "/home/ubuntu/zhch/PaddleX/zhch/OmniDocBench_Results_Scheduler/process_1",
        #     ...
        # }
        pid_output_file = Path(result.get("output_dir", "")) / f"process_{result['process_id']}.json"
        if not result.get("success", False):
            # The whole process failed: mark every file of its chunk as failed
            process_failed_files = result.get("failed_files", [])
            processed_files.extend([(f, "fail") for f in process_failed_files if f])
        elif not pid_output_file.exists():
            print(f"⚠️ Warning: Output file not found for process {result['process_id']}: {pid_output_file}")
        else:
            pid_files = collect_pid_files(str(pid_output_file))
            processed_files.extend(pid_files)
    return processed_files

def run_single_process(args: Tuple[List[str], Dict[str, Any], int]) -> Dict[str, Any]:
    """
    Run a single ppstructurev3_single_process.py worker.

    Args:
        args: (file_chunk, config, process_id)

    Returns:
        Result dict for this process
    """
    file_chunk, config, process_id = args
    if not file_chunk:
        return {"process_id": process_id, "success": False, "error": "Empty file chunk"}

    # Create a temporary file list for this chunk
    temp_file_list = create_temp_file_list(file_chunk)

    try:
        # Create a dedicated output directory for this process
        process_output_dir = Path(config["output_dir"]) / f"process_{process_id}_{time.strftime('%Y%m%d_%H%M%S')}"
        process_output_dir.mkdir(parents=True, exist_ok=True)

        # Build the command line
        cmd = [
            sys.executable,
            config["single_process_script"],
            "--input_file_list", temp_file_list,  # the single-process script must support file lists
            "--output_dir", str(process_output_dir),
            "--pipeline", config["pipeline"],
            "--device", config["device"],
        ]

        # Optional arguments
        if config.get("test_mode", False):
            cmd.append("--test_mode")

        print(f"Process {process_id} starting with {len(file_chunk)} files on device {config['device']}")

        # Run the worker as a subprocess
        start_time = time.time()
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=config.get("timeout", 3600)  # default: 1-hour timeout
        )
        processing_time = time.time() - start_time

        if result.returncode == 0:
            print(f"Process {process_id} completed successfully in {processing_time:.2f}s")
            # Collect the result files written by the worker
            result_files = list(process_output_dir.glob("*.json"))
            return {
                "process_id": process_id,
                "success": True,
                "processing_time": processing_time,
                "file_count": len(file_chunk),
                "device": config["device"],
                "output_dir": str(process_output_dir),
                "result_files": [str(f) for f in result_files],
                "stdout": result.stdout,
                "stderr": result.stderr
            }
        else:
            print(f"Process {process_id} failed with return code {result.returncode}")
            return {
                "process_id": process_id,
                "success": False,
                "error": f"Process failed with return code {result.returncode}",
                "processing_time": processing_time,
                "file_count": len(file_chunk),
                "device": config["device"],
                "output_dir": str(process_output_dir),
                "failed_files": [str(f) for f in file_chunk],
                "stdout": result.stdout,
                "stderr": result.stderr
            }

    except subprocess.TimeoutExpired:
        print(f"Process {process_id} timed out")
        return {
            "process_id": process_id,
            "success": False,
            "error": "Process timeout",
            "device": config["device"],
            "output_dir": str(process_output_dir),
            "failed_files": [str(f) for f in file_chunk]
        }
    except Exception as e:
        print(f"Process {process_id} error: {e}")
        return {
            "process_id": process_id,
            "success": False,
            "error": str(e),
            "device": config["device"],
            "output_dir": str(process_output_dir),
            "failed_files": [str(f) for f in file_chunk]
        }
    finally:
        # Clean up the temporary file list
        try:
            os.unlink(temp_file_list)
        except OSError:
            pass

def monitor_progress(total_files: int, completed_queue: Queue):
    """
    Monitor overall processing progress.
    """
    with tqdm(total=total_files, desc="Total Progress", unit="files") as pbar:
        completed_count = 0
        while completed_count < total_files:
            try:
                batch_count = completed_queue.get(timeout=1)
                completed_count += batch_count
                pbar.update(batch_count)
            except Empty:
                continue

def main():
    """Entry point of the scheduler."""
    parser = argparse.ArgumentParser(description="PaddleX PP-StructureV3 Multi-Process Scheduler")

    # Input/output arguments
    input_group = parser.add_mutually_exclusive_group(required=True)
    input_group.add_argument("--input_dir", type=str, help="Input directory")
    input_group.add_argument("--input_file_list", type=str, help="Input file list (one file per line)")
    input_group.add_argument("--input_csv", type=str, help="Input CSV file with image_path and status columns")
    parser.add_argument("--output_dir", type=str, required=True, help="Output directory")
    parser.add_argument("--single_process_script", type=str,
                        default="./ppstructurev3_single_process.py",
                        help="Path to single process script")

    # Parallelism arguments
    parser.add_argument("--num_processes", type=int, default=4, help="Number of parallel processes")
    parser.add_argument("--devices", type=str, default="gpu:0,gpu:1,gpu:2,gpu:3",
                        help="Device list (comma separated)")

    # Pipeline arguments
    parser.add_argument("--pipeline", type=str, default="PP-StructureV3", help="Pipeline name")
    parser.add_argument("--timeout", type=int, default=3600, help="Process timeout in seconds")

    # Miscellaneous
    parser.add_argument("--test_mode", action="store_true", help="Test mode")
    parser.add_argument("--max_files", type=int, default=None, help="Maximum files to process")

    args = parser.parse_args()

    try:
        # Build the list of image files
        if args.input_csv:
            # Read from a CSV file, keeping only rows with status "fail"
            image_files = get_image_files_from_csv(args.input_csv, "fail")
            print(f"📊 Loaded {len(image_files)} files from CSV with status filter: fail")
        elif args.input_file_list:
            # Read from a plain file list
            image_files = get_image_files_from_list(args.input_file_list)
        else:
            # Read from a directory
            input_dir = Path(args.input_dir).resolve()
            print(f"📁 Input dir: {input_dir}")
            if not input_dir.exists():
                print(f"❌ Input directory does not exist: {input_dir}")
                return 1
            image_files = get_image_files_from_dir(input_dir, args.max_files)

        output_dir = Path(args.output_dir).resolve()
        print(f"Output dir: {output_dir}")

        # Limit the number of files
        if args.max_files:
            image_files = image_files[:args.max_files]
        if args.test_mode:
            image_files = image_files[:20]
            print(f"Test mode: processing only {len(image_files)} images")
        print(f"Found {len(image_files)} image files")

        # Parse the device list
        devices = [d.strip() for d in args.devices.split(',')]
        if len(devices) < args.num_processes:
            # Fewer devices than processes: reuse devices round-robin
            devices = devices * ((args.num_processes // len(devices)) + 1)
        devices = devices[:args.num_processes]
        print(f"Using {args.num_processes} processes with devices: {devices}")

        # Split the file list into chunks
        file_chunks = split_files(image_files, args.num_processes)
        print(f"Split into {len(file_chunks)} chunks: {[len(chunk) for chunk in file_chunks]}")

        # Create the output directory
        output_dir.mkdir(parents=True, exist_ok=True)

        # Prepare per-process arguments
        process_configs = []
        for i, (chunk, device) in enumerate(zip(file_chunks, devices)):
            config = {
                "single_process_script": str(Path(args.single_process_script).resolve()),
                "output_dir": str(output_dir),
                "pipeline": args.pipeline,
                "device": device,
                "timeout": args.timeout,
                "test_mode": args.test_mode
            }
            process_configs.append((chunk, config, i))

        # Start the progress monitor
        completed_queue = Queue()
        progress_thread = threading.Thread(
            target=monitor_progress,
            args=(len(image_files), completed_queue)
        )
        progress_thread.daemon = True
        progress_thread.start()

        # Run the chunks in parallel
        start_time = time.time()
        results = []
        with ProcessPoolExecutor(max_workers=args.num_processes) as executor:
            # Submit all tasks
            future_to_process = {
                executor.submit(run_single_process, config): i
                for i, config in enumerate(process_configs)
            }
            # Collect the results
            for future in as_completed(future_to_process):
                process_id = future_to_process[future]
                try:
                    result = future.result()
                    results.append(result)
                    # Update progress
                    if result.get("success", False):
                        completed_queue.put(result.get("file_count", 0))
                    print(f"Process {process_id} finished: {result.get('success', False)}")
                except Exception as e:
                    print(f"Process {process_id} generated an exception: {e}")
                    results.append({
                        "process_id": process_id,
                        "success": False,
                        "error": str(e)
                    })
        total_time = time.time() - start_time

        # Summarize the results
        successful_processes = sum(1 for r in results if r.get('success', False))
        total_processed_files = sum(r.get('file_count', 0) for r in results if r.get('success', False))

        print("\n" + "=" * 60)
        print("🎉 Parallel processing completed!")
        print("📊 Statistics:")
        print(f"   Total processes: {len(results)}")
        print(f"   Successful processes: {successful_processes}")
        print(f"   Total files processed: {total_processed_files}/{len(image_files)}")
        if len(image_files) > 0:
            print(f"   Success rate: {total_processed_files / len(image_files) * 100:.2f}%")
        print("⏱️ Performance:")
        print(f"   Total time: {total_time:.2f} seconds")
        if total_time > 0:
            print(f"   Throughput: {total_processed_files / total_time:.2f} files/second")
        if total_processed_files > 0:
            print(f"   Avg time per file: {total_time / total_processed_files:.2f} seconds")

        # Scheduler statistics
        scheduler_stats = {
            "total_files": len(image_files),
            "total_processes": len(results),
            "successful_processes": successful_processes,
            "total_processed_files": total_processed_files,
            "success_rate": total_processed_files / len(image_files) if len(image_files) > 0 else 0,
            "total_time": total_time,
            "throughput": total_processed_files / total_time if total_time > 0 else 0,
            "avg_time_per_file": total_time / total_processed_files if total_processed_files > 0 else 0,
            "num_processes": args.num_processes,
            "devices": devices,
            "pipeline": args.pipeline,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
        }
        final_results = {
            "scheduler_stats": scheduler_stats,
            "process_results": results
        }

        # Save the scheduler results
        output_file = output_dir / f"scheduler_results_{args.num_processes}procs_{time.strftime('%Y%m%d_%H%M%S')}.json"
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(final_results, f, ensure_ascii=False, indent=2)
        print(f"💾 Scheduler results saved to: {output_file}")

        # Collect per-file processing results and write them as a CSV
        processed_files = collect_processed_files(results)
        output_file_processed = output_dir / f"processed_files_{args.num_processes}procs_{time.strftime('%Y%m%d_%H%M%S')}.csv"
        with open(output_file_processed, 'w', encoding='utf-8') as f:
            f.write("image_path,status\n")
            for file_path, status in processed_files:
                f.write(f"{file_path},{status}\n")
        print(f"💾 Processed files saved to: {output_file_processed}")

        return 0 if successful_processes == len(results) else 1

    except Exception as e:
        print(f"❌ Scheduler failed: {e}")
        import traceback
        traceback.print_exc()
        return 1

if __name__ == "__main__":
    print("🚀 Starting multi-process scheduler... each process is expected to name its stats file process_{process_id}.json")
    if len(sys.argv) == 1:
        # Default configuration used when no CLI arguments are given
        default_config = {
            "input_dir": "../../OmniDocBench/OpenDataLab___OmniDocBench/images",
            "output_dir": "./OmniDocBench_Results_Scheduler",
            "pipeline": "./my_config/PP-StructureV3.yaml",
            "num_processes": 3,
            "devices": "gpu:0,gpu:1,gpu:2,gpu:3",
        }
        # default_config = {
        #     "input_csv": "./OmniDocBench_Results_Scheduler/processed_files_4procs_20250814_213138.csv",
        #     "output_dir": "./OmniDocBench_Results_Scheduler",
        #     "pipeline": "./my_config/PP-StructureV3.yaml",
        #     "num_processes": 4,
        #     "devices": "gpu:0,gpu:1,gpu:2,gpu:3",
        # }
        sys.argv = [sys.argv[0]]
        for key, value in default_config.items():
            sys.argv.extend([f"--{key}", str(value)])
        sys.argv.append("--test_mode")
    sys.exit(main())
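
# Example invocation (a minimal sketch built from the argparse options defined
# above; the paths are taken from the default configuration and will likely
# need to be adapted to your environment):
#
#   python ppstructurev3_scheduler.py \
#       --input_dir ../../OmniDocBench/OpenDataLab___OmniDocBench/images \
#       --output_dir ./OmniDocBench_Results_Scheduler \
#       --pipeline ./my_config/PP-StructureV3.yaml \
#       --single_process_script ./ppstructurev3_single_process.py \
#       --num_processes 4 \
#       --devices gpu:0,gpu:1,gpu:2,gpu:3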