ppstructurev3_parallel_predict_optimized.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500
  1. # zhch/ppstructurev3_parallel_predict_optimized.py
  2. import json
  3. import time
  4. import os
  5. import glob
  6. import traceback
  7. from pathlib import Path
  8. from typing import List, Dict, Any, Tuple
  9. from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
  10. from multiprocessing import Queue, Manager
  11. import cv2
  12. import numpy as np
  13. from paddlex import create_pipeline
  14. from tqdm import tqdm
  15. import threading
  16. import queue
  17. from dotenv import load_dotenv
# Load environment variables from a local .env file (python-dotenv) at import
# time; override=True is passed so values from .env take precedence over
# variables already present in the environment.
load_dotenv(override=True)
  19. class PPStructureV3ParallelPredictor:
  20. """
  21. PP-StructureV3并行预测器,支持多进程批处理
  22. """
  23. def __init__(self, pipeline_config_path: str = "PP-StructureV3", output_path: str = "output", use_gpu: bool = True):
  24. """
  25. 初始化预测器
  26. Args:
  27. pipeline_config_path: PaddleX pipeline配置文件路径
  28. """
  29. self.pipeline_config = pipeline_config_path
  30. self.pipeline = None # 延迟初始化
  31. self.output_path = output_path
  32. self.use_gpu = use_gpu
  33. def _ensure_pipeline(self):
  34. """确保pipeline已初始化(线程安全)"""
  35. if self.pipeline is None:
  36. self.pipeline = create_pipeline(pipeline=self.pipeline_config)
  37. def process_single_image(self, image_path: str) -> Dict[str, Any]:
  38. """
  39. 处理单张图像
  40. Args:
  41. image_path: 图像路径
  42. Returns:
  43. 处理结果{"image_path": str, "success": bool, "processing_time": float, "error": str}
  44. """
  45. try:
  46. # 确保pipeline已初始化
  47. self._ensure_pipeline()
  48. # 读取图像获取尺寸信息
  49. image = cv2.imread(image_path)
  50. if image is None:
  51. return {
  52. "image_path": Path(image_path).name,
  53. "error": "无法读取图像",
  54. "success": False,
  55. "processing_time": 0
  56. }
  57. height, width = image.shape[:2]
  58. # 运行PaddleX pipeline
  59. start_time = time.time()
  60. output = self.pipeline.predict(
  61. input=image_path,
  62. device="gpu" if self.use_gpu else "cpu",
  63. use_doc_orientation_classify=True,
  64. use_doc_unwarping=False,
  65. use_seal_recognition=True,
  66. use_chart_recognition=True,
  67. use_table_recognition=True,
  68. use_formula_recognition=True,
  69. )
  70. # 保存结果
  71. for res in output:
  72. res.save_to_json(save_path=self.output_path)
  73. res.save_to_markdown(save_path=self.output_path)
  74. process_time = time.time() - start_time
  75. # 返回处理结果
  76. return {
  77. "image_path": Path(image_path).name,
  78. "processing_time": process_time,
  79. "success": True
  80. }
  81. except Exception as e:
  82. return {
  83. "image_path": Path(image_path).name,
  84. "error": str(e),
  85. "success": False,
  86. "processing_time": 0
  87. }
  88. def process_batch(self, image_paths: List[str]) -> List[Dict[str, Any]]:
  89. """
  90. 批处理图像
  91. Args:
  92. image_paths: 图像路径列表
  93. Returns:
  94. 结果列表
  95. """
  96. results = []
  97. for image_path in image_paths:
  98. result = self.process_single_image(image_path)
  99. results.append(result)
  100. return results
  101. class ThreadWorker:
  102. """线程工作器 - 每个线程维护自己的pipeline实例"""
  103. def __init__(self, pipeline_config: str, output_path: str, use_gpu: bool, worker_id: int):
  104. self.worker_id = worker_id
  105. self.predictor = PPStructureV3ParallelPredictor(
  106. pipeline_config,
  107. output_path=f"{output_path}/worker_{worker_id}",
  108. use_gpu=use_gpu
  109. )
  110. self.task_queue = queue.Queue()
  111. self.result_queue = queue.Queue()
  112. self.running = True
  113. def add_batch(self, batch: List[str]):
  114. """添加批处理任务"""
  115. self.task_queue.put(batch)
  116. def get_results(self) -> List[Dict[str, Any]]:
  117. """获取处理结果"""
  118. results = []
  119. while not self.result_queue.empty():
  120. try:
  121. result = self.result_queue.get_nowait()
  122. results.extend(result)
  123. except queue.Empty:
  124. break
  125. return results
  126. def worker_loop(self):
  127. """工作循环"""
  128. while self.running:
  129. try:
  130. batch = self.task_queue.get(timeout=1.0)
  131. if batch is None: # 结束信号
  132. break
  133. # 处理批次
  134. batch_results = self.predictor.process_batch(batch)
  135. self.result_queue.put(batch_results)
  136. except queue.Empty:
  137. continue
  138. except Exception as e:
  139. print(f"工作线程 {self.worker_id} 处理出错: {e}")
  140. def stop(self):
  141. """停止工作线程"""
  142. self.running = False
  143. self.task_queue.put(None) # 发送结束信号
  144. def parallel_process_with_optimized_threading(image_paths: List[str],
  145. batch_size: int = 4,
  146. max_workers: int = 2, # GPU限制为2个worker
  147. pipeline_config: str = "PP-StructureV3",
  148. output_path: str = "./output",
  149. use_gpu: bool = True) -> List[Dict[str, Any]]:
  150. """
  151. 使用优化的多线程并行处理(每个线程一个pipeline实例)
  152. Args:
  153. image_paths: 图像路径列表
  154. batch_size: 批处理大小
  155. max_workers: 最大工作线程数(GPU推荐2个)
  156. pipeline_config: pipeline配置
  157. output_path: 输出路径
  158. use_gpu: 是否使用GPU
  159. Returns:
  160. 处理结果列表
  161. """
  162. # 确保输出目录存在
  163. os.makedirs(output_path, exist_ok=True)
  164. # 将图像路径分批
  165. batches = [image_paths[i:i + batch_size] for i in range(0, len(image_paths), batch_size)]
  166. # 创建工作线程
  167. workers = []
  168. threads = []
  169. for i in range(max_workers):
  170. worker = ThreadWorker(pipeline_config, output_path, use_gpu, i)
  171. workers.append(worker)
  172. thread = threading.Thread(target=worker.worker_loop)
  173. thread.daemon = True
  174. thread.start()
  175. threads.append(thread)
  176. print(f"启动了 {max_workers} 个工作线程,每个线程独立的pipeline实例")
  177. # 分发任务
  178. all_results = []
  179. total_images = len(image_paths)
  180. completed_count = 0
  181. try:
  182. with tqdm(total=total_images, desc="处理图像", unit="张") as pbar:
  183. # 轮流分发批次到不同的worker
  184. for i, batch in enumerate(batches):
  185. worker_id = i % max_workers
  186. workers[worker_id].add_batch(batch)
  187. # 等待所有任务完成
  188. while completed_count < total_images:
  189. time.sleep(0.1) # 短暂等待
  190. # 收集结果
  191. for worker in workers:
  192. batch_results = worker.get_results()
  193. if batch_results:
  194. all_results.extend(batch_results)
  195. completed_count += len(batch_results)
  196. pbar.update(len(batch_results))
  197. # 更新进度条
  198. success_count = sum(1 for r in batch_results if r.get('success', False))
  199. pbar.set_postfix({
  200. 'recent_success': f"{success_count}/{len(batch_results)}",
  201. 'total_success': f"{sum(1 for r in all_results if r.get('success', False))}/{completed_count}"
  202. })
  203. finally:
  204. # 停止所有工作线程
  205. for worker in workers:
  206. worker.stop()
  207. # 等待线程结束
  208. for thread in threads:
  209. thread.join(timeout=2.0)
  210. return all_results
  211. def process_batch_worker_optimized(worker_id: int,
  212. task_queue: Queue,
  213. result_queue: Queue,
  214. pipeline_config: str,
  215. output_path: str,
  216. use_gpu: bool):
  217. """
  218. 优化的多进程工作函数 - 每个进程只初始化一次pipeline
  219. """
  220. try:
  221. # 每个进程创建自己的输出目录
  222. worker_output = f"{output_path}/worker_{worker_id}"
  223. os.makedirs(worker_output, exist_ok=True)
  224. # 只初始化一次pipeline
  225. predictor = PPStructureV3ParallelPredictor(
  226. pipeline_config,
  227. output_path=worker_output,
  228. use_gpu=use_gpu
  229. )
  230. print(f"进程 {worker_id} 初始化完成")
  231. # 持续处理任务
  232. while True:
  233. try:
  234. batch = task_queue.get(timeout=2.0)
  235. if batch is None: # 结束信号
  236. break
  237. # 处理批次
  238. batch_results = predictor.process_batch(batch)
  239. result_queue.put(batch_results)
  240. except Exception as e:
  241. print(f"进程 {worker_id} 处理批次时出错: {e}")
  242. continue
  243. except Exception as e:
  244. print(f"进程 {worker_id} 初始化失败: {e}")
  245. traceback.print_exc()
  246. def parallel_process_with_optimized_multiprocessing(image_paths: List[str],
  247. batch_size: int = 4,
  248. max_workers: int = 4,
  249. pipeline_config: str = "PP-StructureV3",
  250. output_path: str = "./output",
  251. use_gpu: bool = False) -> List[Dict[str, Any]]:
  252. """
  253. 使用优化的多进程并行处理(每个进程一个pipeline实例)
  254. Args:
  255. image_paths: 图像路径列表
  256. batch_size: 批处理大小
  257. max_workers: 最大工作进程数
  258. pipeline_config: pipeline配置
  259. output_path: 输出路径
  260. use_gpu: 是否使用GPU
  261. Returns:
  262. 处理结果列表
  263. """
  264. # 确保输出目录存在
  265. os.makedirs(output_path, exist_ok=True)
  266. # 将图像路径分批
  267. batches = [image_paths[i:i + batch_size] for i in range(0, len(image_paths), batch_size)]
  268. # 创建进程间通信队列
  269. manager = Manager()
  270. task_queue = manager.Queue()
  271. result_queue = manager.Queue()
  272. # 启动工作进程
  273. processes = []
  274. for i in range(max_workers):
  275. p = Process(
  276. target=process_batch_worker_optimized,
  277. args=(i, task_queue, result_queue, pipeline_config, output_path, use_gpu)
  278. )
  279. p.start()
  280. processes.append(p)
  281. print(f"启动了 {max_workers} 个工作进程,每个进程独立的pipeline实例")
  282. # 分发任务
  283. for batch in batches:
  284. task_queue.put(batch)
  285. # 发送结束信号
  286. for _ in range(max_workers):
  287. task_queue.put(None)
  288. # 收集结果
  289. all_results = []
  290. total_images = len(image_paths)
  291. completed_count = 0
  292. with tqdm(total=total_images, desc="处理图像", unit="张") as pbar:
  293. # 等待所有结果
  294. expected_batches = len(batches)
  295. received_batches = 0
  296. while received_batches < expected_batches:
  297. try:
  298. batch_results = result_queue.get(timeout=30.0)
  299. all_results.extend(batch_results)
  300. completed_count += len(batch_results)
  301. received_batches += 1
  302. pbar.update(len(batch_results))
  303. # 更新进度条
  304. success_count = sum(1 for r in batch_results if r.get('success', False))
  305. pbar.set_postfix({
  306. 'batch_success': f"{success_count}/{len(batch_results)}",
  307. 'total_success': f"{sum(1 for r in all_results if r.get('success', False))}/{completed_count}"
  308. })
  309. except Exception as e:
  310. print(f"等待结果时出错: {e}")
  311. break
  312. # 等待所有进程结束
  313. for p in processes:
  314. p.join(timeout=10.0)
  315. if p.is_alive():
  316. p.terminate()
  317. return all_results
  318. def main():
  319. """主函数 - 优化的并行处理"""
  320. # 配置参数
  321. dataset_path = "../../OmniDocBench/OpenDataLab___OmniDocBench/images"
  322. output_dir = "./OmniDocBench_Results_Optimized"
  323. pipeline_config = "PP-StructureV3"
  324. # 并行处理参数
  325. batch_size = 4 # 批处理大小
  326. use_gpu = True # 是否使用GPU
  327. max_workers = 4 # CPU可以用更多进程
  328. use_multiprocessing = False # CPU用进程
  329. # 确保输出目录存在
  330. print(f"输出目录: {Path(output_dir).absolute()}")
  331. os.makedirs(output_dir, exist_ok=True)
  332. dataset_path = Path(dataset_path).resolve()
  333. output_dir = Path(output_dir).resolve()
  334. print("="*60)
  335. print("OmniDocBench 优化并行处理开始")
  336. print("="*60)
  337. print(f"数据集路径: {dataset_path}")
  338. print(f"输出目录: {output_dir}")
  339. print(f"批处理大小: {batch_size}")
  340. print(f"最大工作线程/进程数: {max_workers}")
  341. print(f"使用GPU: {use_gpu}")
  342. print(f"并行方式: {'多进程' if use_multiprocessing else '多线程'}")
  343. print(f"Pipeline实例数: {max_workers} (每个进程/线程一个)")
  344. # 查找所有图像文件
  345. image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp', '*.tiff']
  346. image_files = []
  347. for ext in image_extensions:
  348. image_files.extend(glob.glob(os.path.join(dataset_path, ext)))
  349. print(f"找到 {len(image_files)} 个图像文件")
  350. if not image_files:
  351. print("未找到任何图像文件,程序终止")
  352. return
  353. # 限制处理数量用于测试
  354. # image_files = image_files[:20] # 取消注释以限制处理数量
  355. # 开始处理
  356. start_time = time.time()
  357. try:
  358. if use_multiprocessing:
  359. # 多进程处理(推荐用于CPU)
  360. print("使用优化的多进程并行处理...")
  361. results = parallel_process_with_optimized_multiprocessing(
  362. image_files, batch_size, max_workers, pipeline_config, str(output_dir), use_gpu
  363. )
  364. else:
  365. # 多线程处理(推荐用于GPU)
  366. print("使用优化的多线程并行处理...")
  367. results = parallel_process_with_optimized_threading(
  368. image_files, batch_size, max_workers, pipeline_config, str(output_dir), use_gpu
  369. )
  370. total_time = time.time() - start_time
  371. # 统计信息
  372. success_count = sum(1 for r in results if r.get('success', False))
  373. error_count = len(results) - success_count
  374. total_processing_time = sum(r.get('processing_time', 0) for r in results if r.get('success', False))
  375. avg_processing_time = total_processing_time / success_count if success_count > 0 else 0
  376. # 保存结果统计
  377. stats = {
  378. "total_files": len(image_files),
  379. "success_count": success_count,
  380. "error_count": error_count,
  381. "success_rate": success_count / len(image_files),
  382. "total_time": total_time,
  383. "avg_processing_time": avg_processing_time,
  384. "throughput": len(image_files) / total_time,
  385. "batch_size": batch_size,
  386. "max_workers": max_workers,
  387. "use_gpu": use_gpu,
  388. "use_multiprocessing": use_multiprocessing,
  389. "optimization": "单进程/线程单pipeline实例"
  390. }
  391. results['stats'] = stats
  392. # 保存最终结果
  393. output_file = os.path.join(output_dir, f"OmniDocBench_PPStructureV3_batch{batch_size}.json")
  394. with open(output_file, 'w', encoding='utf-8') as f:
  395. json.dump(results, f, ensure_ascii=False, indent=2)
  396. print("\n" + "="*60)
  397. print("优化并行处理完成!")
  398. print("="*60)
  399. print(f"总文件数: {len(image_files)}")
  400. print(f"成功处理: {success_count}")
  401. print(f"失败数量: {error_count}")
  402. print(f"成功率: {success_count / len(image_files) * 100:.2f}%")
  403. print(f"总耗时: {total_time:.2f}秒")
  404. print(f"平均处理时间: {avg_processing_time:.2f}秒/张")
  405. print(f"吞吐量: {len(image_files) / total_time:.2f}张/秒")
  406. print(f"Pipeline实例数: {max_workers}")
  407. print(f"统计信息保存至: {output_file}")
  408. except Exception as e:
  409. print(f"处理过程中发生错误: {str(e)}")
  410. traceback.print_exc()
  411. if __name__ == "__main__":
  412. main()