# zhch/ppstructurev3_multi_gpu_multiprocess_official.py
"""
Multi-GPU multi-process inference keeps failing: once several worker processes
start, Paddle raises errors at a lower level. The root cause has not been
identified yet.
"""
import json
import time
import os
import glob
import traceback
import argparse
import sys
from pathlib import Path
from typing import List, Dict, Any, Tuple
from multiprocessing import Manager, Process, Queue
from queue import Empty
import cv2
import numpy as np
from paddlex import create_pipeline
from paddlex.utils.device import constr_device, parse_device
from tqdm import tqdm
# import paddle  # ❌ Do not import paddle in the main module
# from cuda_utils import detect_available_gpus, monitor_gpu_memory  # ❌ Do not use in the main process
from dotenv import load_dotenv

load_dotenv(override=True)


def worker(pipeline_name_or_config_path: str,
           device: str,
           task_queue: Queue,
           result_queue: Queue,
           batch_size: int,
           output_dir: str,
           worker_id: int):
    """
    Worker process function, based on the official parallel_inference.md example.

    Args:
        pipeline_name_or_config_path: Pipeline name or config path
        device: Device string
        task_queue: Task queue
        result_queue: Result queue
        batch_size: Batch size
        output_dir: Output directory
        worker_id: Worker process ID
    """
    try:
        # Import paddle inside the subprocess to avoid CUDA conflicts with the main process
        import paddle
        import os

        # Restrict this subprocess to its assigned CUDA device
        device_id = device.split(':')[1] if ':' in device else '0'
        os.environ['CUDA_VISIBLE_DEVICES'] = device_id
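        # NOTE (hedged, untested): after this point only one physical GPU is visible to
        # this process, and it is indexed as 0, while `device` may still read e.g. "gpu:1".
        # If PaddleX resolves the ordinal against the restricted visible set, that mismatch
        # could itself produce device errors; passing "gpu:0" here would be the alternative.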

        # Force single precision to avoid mixed-precision issues
        # paddle.set_default_dtype("float32")

        # Clear the GPU cache
        if paddle.device.cuda.device_count() > 0:
            paddle.device.cuda.empty_cache()

        # Create the pipeline directly and let PaddleX handle device initialization
        pipeline = create_pipeline(pipeline_name_or_config_path, device=device)
        print(f"Worker {worker_id} initialized with device {device}")
    except Exception as e:
        print(f"Worker {worker_id} ({device}) initialization failed: {e}", file=sys.stderr)
        traceback.print_exc()
        # Report the error through the result queue
        result_queue.put([{
            "error": f"Worker initialization failed: {str(e)}",
            "worker_id": worker_id,
            "device": device,
            "success": False
        }])
        return

    try:
        should_end = False
        batch = []
        processed_count = 0
        while not should_end:
            try:
                input_path = task_queue.get_nowait()
            except Empty:
                should_end = True
            except Exception as e:
                # Handle any other unexpected exception
                print(f"Unexpected error while getting task: {e}", file=sys.stderr)
                traceback.print_exc()
                should_end = True
            else:
                # Check for the end-of-queue sentinel
                if input_path is None:
                    should_end = True
                else:
                    batch.append(input_path)

            if batch and (len(batch) == batch_size or should_end):
                try:
                    start_time = time.time()
                    # Run prediction on the whole batch
                    results = pipeline.predict(
                        batch,
                        use_doc_orientation_classify=True,
                        use_doc_unwarping=False,
                        use_seal_recognition=True,
                        use_chart_recognition=True,
                        use_table_recognition=True,
                        use_formula_recognition=True,
                    )
                    batch_processing_time = time.time() - start_time

                    batch_results = []
                    for result in results:
                        try:
                            input_path = Path(result["input_path"])
                            # Build the output filename (append the page index for multi-page inputs)
                            if result.get("page_index") is not None:
                                output_filename = f"{input_path.stem}_{result['page_index']}"
                            else:
                                output_filename = f"{input_path.stem}"
                            # Save JSON and Markdown
                            json_output_path = str(Path(output_dir, f"{output_filename}.json"))
                            md_output_path = str(Path(output_dir, f"{output_filename}.md"))
                            result.save_to_json(json_output_path)
                            result.save_to_markdown(md_output_path)
                            # Record the per-image result
                            batch_results.append({
                                "image_path": input_path.name,
                                "processing_time": batch_processing_time / len(batch),  # average over the batch
                                "success": True,
                                "device": device,
                                "worker_id": worker_id,
                                "output_json": json_output_path,
                                "output_md": md_output_path
                            })
                            processed_count += 1
                        except Exception as e:
                            traceback.print_exc()
                            batch_results.append({
                                "image_path": Path(result["input_path"]).name,
                                "processing_time": 0,
                                "success": False,
                                "device": device,
                                "worker_id": worker_id,
                                "error": str(e)
                            })

                    # Push the batch results to the result queue
                    result_queue.put(batch_results)
                    # print(f"Worker {worker_id} ({device}) processed batch of {len(batch)} files. Total: {processed_count}")
                except Exception as e:
                    # The whole batch failed
                    error_results = []
                    for img_path in batch:
                        error_results.append({
                            "image_path": Path(img_path).name,
                            "processing_time": 0,
                            "success": False,
                            "device": device,
                            "worker_id": worker_id,
                            "error": str(e)
                        })
                    result_queue.put(error_results)
                    print(f"Error processing batch {batch} on {device}: {e}", file=sys.stderr)
                    traceback.print_exc()
                batch.clear()
    except Exception as e:
        print(f"Worker {worker_id} ({device}) processing failed: {e}", file=sys.stderr)
        traceback.print_exc()
    finally:
        # Clear the GPU cache before exiting
        try:
            paddle.device.cuda.empty_cache()
        except Exception as e:
            print(f"Error clearing GPU cache: {e}", file=sys.stderr)
        print(f"Worker {worker_id} ({device}) finished")


def parallel_process_with_official_approach(image_paths: List[str],
                                            pipeline_name: str = "PP-StructureV3",
                                            device_str: str = "gpu:0,1",
                                            instances_per_device: int = 1,
                                            batch_size: int = 1,
                                            output_dir: str = "./output") -> List[Dict[str, Any]]:
    """
    Multi-GPU, multi-process parallel processing following the officially recommended approach.

    Args:
        image_paths: List of image paths
        pipeline_name: Pipeline name
        device_str: Device string, e.g. "gpu:0,1,2,3"
        instances_per_device: Number of pipeline instances per device
        batch_size: Batch size
        output_dir: Output directory

    Returns:
        List of processing results
    """
    # Create the output directory
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Parse the device string -- do not initialize paddle in the main process
    try:
        device_type, device_ids = parse_device(device_str)
        if device_ids is None or len(device_ids) < 1:
            print("No valid devices specified.", file=sys.stderr)
            return []
        print(f"Parsed devices: {device_type}:{device_ids}")
    except Exception as e:
        print(f"Failed to parse device string '{device_str}': {e}", file=sys.stderr)
        return []

    # Validate the batch size
    if batch_size <= 0:
        print("Batch size must be greater than 0.", file=sys.stderr)
        return []

    total_instances = len(device_ids) * instances_per_device
    print("Configuration:")
    print(f"  Devices: {device_ids}")
    print(f"  Instances per device: {instances_per_device}")
    print(f"  Total instances: {total_instances}")
    print(f"  Batch size: {batch_size}")
    print(f"  Total images: {len(image_paths)}")

    # Create the shared queues via a Manager
    with Manager() as manager:
        task_queue = manager.Queue()
        result_queue = manager.Queue()

        # Enqueue all tasks
        for img_path in image_paths:
            task_queue.put(str(img_path))
        print(f"Added {len(image_paths)} tasks to queue")

        # Create and start the worker processes
        processes = []
        worker_id = 0
        for device_id in device_ids:
            for _ in range(instances_per_device):
                device = constr_device(device_type, [device_id])
                p = Process(
                    target=worker,
                    args=(
                        pipeline_name,
                        device,
                        task_queue,
                        result_queue,
                        batch_size,
                        str(output_path),
                        worker_id,
                    ),
                )
                p.start()
                processes.append(p)
                worker_id += 1
        print(f"Started {len(processes)} worker processes")

        # Send one end-of-queue sentinel per worker instance
        for _ in range(total_instances):
            task_queue.put(None)

        # Collect results
        all_results = []
        total_images = len(image_paths)
        with tqdm(total=total_images, desc="Processing images", unit="img") as pbar:
            completed_count = 0
            while completed_count < total_images:
                try:
                    batch_results = result_queue.get(timeout=600)  # 10-minute timeout
                    all_results.extend(batch_results)
                    # Update the progress bar
                    batch_success_count = sum(1 for r in batch_results if r.get('success', False))
                    completed_count += len(batch_results)
                    pbar.update(len(batch_results))
                    # Show the current batch status
                    pbar.set_postfix({
                        'batch_success': f"{batch_success_count}/{len(batch_results)}",
                        'total_success': f"{sum(1 for r in all_results if r.get('success', False))}/{completed_count}"
                    })
                except Exception as e:
                    print(f"Error collecting results: {e}")
                    break

        # Wait for all worker processes to finish
        for p in processes:
            p.join()

        return all_results
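
# Example (sketch, not part of the original flow): calling the parallel driver directly
# from another script. The image paths and device string below are placeholders.
#
#   results = parallel_process_with_official_approach(
#       image_paths=["./images/page_001.png", "./images/page_002.png"],
#       pipeline_name="PP-StructureV3",
#       device_str="gpu:0,1",
#       instances_per_device=1,
#       batch_size=2,
#       output_dir="./output",
#   )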


def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(description="PaddleX PP-StructureV3 Multi-GPU Parallel Processing")
    # Command-line arguments (all optional, with defaults)
    parser.add_argument("--input_dir", type=str, default="../../OmniDocBench/OpenDataLab___OmniDocBench/images", help="Input directory")
    parser.add_argument("--output_dir", type=str, default="./OmniDocBench_Results_Official", help="Output directory")
    parser.add_argument("--pipeline", type=str, default="PP-StructureV3", help="Pipeline name")
    parser.add_argument("--device", type=str, default="gpu:0", help="Device string")
    parser.add_argument("--instances_per_device", type=int, default=1, help="Instances per device")
    parser.add_argument("--batch_size", type=int, default=4, help="Batch size")
    parser.add_argument("--input_pattern", type=str, default="*", help="Input file pattern")
    parser.add_argument("--test_mode", action="store_true", help="Test mode (process only 20 images)")
    args = parser.parse_args()

    try:
        # Resolve the input and output directories
        input_dir = Path(args.input_dir).resolve()
        output_dir = Path(args.output_dir).resolve()
        print(f"Input dir: {input_dir}, Output dir: {output_dir}")
        if not input_dir.exists():
            print(f"Input directory does not exist: {input_dir}")
            return 1

        # Find image files
        image_extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif']
        image_files = []
        for ext in image_extensions:
            image_files.extend(list(input_dir.glob(f"*{ext}")))
            image_files.extend(list(input_dir.glob(f"*{ext.upper()}")))
        if not image_files:
            print(f"No image files found in {input_dir}")
            return 1
        image_files = [str(f) for f in image_files]
        print(f"Found {len(image_files)} image files")

        if args.test_mode:
            image_files = image_files[:20]
            print(f"Test mode: processing only {len(image_files)} images")

        # Start processing
        start_time = time.time()
        results = parallel_process_with_official_approach(
            image_files,
            args.pipeline,
            args.device,
            args.instances_per_device,
            args.batch_size,
            str(output_dir)
        )
        total_time = time.time() - start_time

        # Summarize the results
        success_count = sum(1 for r in results if r.get('success', False))
        error_count = len(results) - success_count
        print("\n" + "=" * 50)
        print("Processing completed!")
        print(f"Total files: {len(image_files)}")
        print(f"Successful: {success_count}")
        print(f"Failed: {error_count}")
        print(f"Success rate: {success_count / len(image_files) * 100:.2f}%")
        print(f"Total time: {total_time:.2f} seconds")
        print(f"Throughput: {len(image_files) / total_time:.2f} images/second")

        # Collect summary statistics
        stats = {
            "total_files": len(image_files),
            "success_count": success_count,
            "error_count": error_count,
            "success_rate": success_count / len(image_files),
            "total_time": total_time,
            "throughput": len(image_files) / total_time,
            "batch_size": args.batch_size,
            "gpu_ids": args.device,
            "pipelines_per_gpu": args.instances_per_device
        }

        # Save the final results
        output_file = os.path.join(output_dir, f"OmniDocBench_MultiGPU_batch{args.batch_size}.json")
        final_results = {
            "stats": stats,
            "results": results
        }
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(final_results, f, ensure_ascii=False, indent=2)
        return 0
    except Exception as e:
        print(f"Processing failed: {e}", file=sys.stderr)
        traceback.print_exc()
        return 1


if __name__ == "__main__":
    # ❌ All CUDA operations were removed from the main process:
    # print(f"🚀 Starting OCR program...")
    # print(f"CUDA version: {paddle.device.cuda.get_device_name()}")
    # print(f"CUDA_VISIBLE_DEVICES: {os.environ.get('CUDA_VISIBLE_DEVICES')}")
    # available_gpus = detect_available_gpus()
    # monitor_gpu_memory(available_gpus)

    # ✅ Only do a lightweight environment check
    print("🚀 Starting OCR program...")
    print(f"CUDA_VISIBLE_DEVICES: {os.environ.get('CUDA_VISIBLE_DEVICES')}")

    if len(sys.argv) == 1:
        # No command-line arguments: run with the default configuration
        print("No command line arguments provided. Running with default configuration...")
        # Default configuration
        default_config = {
            "input_dir": "../../OmniDocBench/OpenDataLab___OmniDocBench/images",
            "output_dir": "./OmniDocBench_Results_Official",
            "pipeline": "PP-StructureV3",
            "device": "gpu:3",
            "instances_per_device": 1,
            "batch_size": 1,
            # "test_mode": False
        }
        # Build sys.argv from the default configuration
        sys.argv = [sys.argv[0]]
        for key, value in default_config.items():
            sys.argv.extend([f"--{key}", str(value)])
        # Enable test mode
        sys.argv.append("--test_mode")

    sys.exit(main())
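
# Example invocation (sketch; the paths match the defaults above, adjust to your environment):
#
#   python zhch/ppstructurev3_multi_gpu_multiprocess_official.py \
#       --input_dir ../../OmniDocBench/OpenDataLab___OmniDocBench/images \
#       --output_dir ./OmniDocBench_Results_Official \
#       --device "gpu:0,1" --instances_per_device 1 --batch_size 4 --test_mode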