# zhch/omnidocbench_parallel_eval.py
import json
import time
import os
import glob
import traceback
from pathlib import Path
from typing import List, Dict, Any
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed

import cv2
from paddlex import create_pipeline
from tqdm import tqdm


class OmniDocBenchParallelEvaluator:
    """
    Parallel evaluator for OmniDocBench, supporting batched multi-process
    and multi-thread runs.
    """

    def __init__(self, pipeline_config_path: str = "PP-StructureV3"):
        """
        Initialize the evaluator.

        Args:
            pipeline_config_path: PaddleX pipeline name or config file path
        """
        self.pipeline_config = pipeline_config_path
        self.category_mapping = self._get_category_mapping()

    def _get_category_mapping(self) -> Dict[str, str]:
        """Map PaddleX block labels to OmniDocBench category types."""
        return {
            'title': 'title',
            'text': 'text_block',
            'figure': 'figure',
            'figure_caption': 'figure_caption',
            'table': 'table',
            'table_caption': 'table_caption',
            'equation': 'equation_isolated',
            'header': 'header',
            'footer': 'footer',
            'reference': 'reference',
            'seal': 'abandon',
            'number': 'page_number',
        }

    def create_pipeline(self):
        """Create a pipeline instance (each worker creates its own)."""
        return create_pipeline(pipeline=self.pipeline_config)

    def process_single_image(self, image_path: str, use_gpu: bool = True) -> Dict[str, Any]:
        """
        Process a single image.

        Args:
            image_path: path to the image
            use_gpu: whether to run on GPU

        Returns:
            Result dict in OmniDocBench format, or None if the image cannot be read
        """
        try:
            # Each call creates its own pipeline, so it is safe across workers
            pipeline = self.create_pipeline()

            # Read the image to get its dimensions
            image = cv2.imread(image_path)
            if image is None:
                return None
            height, width = image.shape[:2]

            # Run the PaddleX pipeline
            start_time = time.time()
            output = list(pipeline.predict(
                input=image_path,
                device="gpu" if use_gpu else "cpu",
                use_doc_orientation_classify=True,
                use_doc_unwarping=False,
                use_seal_recognition=True,
                use_chart_recognition=True,
                use_table_recognition=True,
                use_formula_recognition=True,
            ))
            process_time = time.time() - start_time

            # Convert to OmniDocBench format
            result = self._convert_to_omnidocbench_format(
                output, image_path, width, height
            )

            # Attach timing information
            if result:
                result["processing_time"] = process_time
                result["success"] = True

            return result

        except Exception as e:
            return {
                "image_path": Path(image_path).name,
                "error": str(e),
                "success": False,
                "processing_time": 0
            }

    def process_batch(self, image_paths: List[str], use_gpu: bool = True) -> List[Dict[str, Any]]:
        """
        Process a batch of images with one shared pipeline instance.

        Args:
            image_paths: list of image paths
            use_gpu: whether to run on GPU

        Returns:
            List of result dicts
        """
        results = []
        pipeline = self.create_pipeline()

        for image_path in image_paths:
            try:
                result = self._process_with_pipeline(pipeline, image_path, use_gpu)
                if result:
                    results.append(result)
                else:
                    # Unreadable image: record an explicit failure instead of
                    # silently dropping it, so the success statistics stay accurate
                    results.append({
                        "image_path": Path(image_path).name,
                        "error": "failed to read image",
                        "success": False,
                        "processing_time": 0
                    })
            except Exception as e:
                results.append({
                    "image_path": Path(image_path).name,
                    "error": str(e),
                    "success": False,
                    "processing_time": 0
                })

        return results

    def _process_with_pipeline(self, pipeline, image_path: str, use_gpu: bool) -> Dict[str, Any]:
        """Process one image with an existing pipeline instance."""
        # Read the image to get its dimensions
        image = cv2.imread(image_path)
        if image is None:
            return None
        height, width = image.shape[:2]

        # Run the pipeline
        start_time = time.time()
        output = list(pipeline.predict(
            input=image_path,
            device="gpu" if use_gpu else "cpu",
            use_doc_orientation_classify=True,
            use_doc_unwarping=False,
            use_seal_recognition=True,
            use_chart_recognition=True,
            use_table_recognition=True,
            use_formula_recognition=True,
        ))
        process_time = time.time() - start_time

        # Convert to OmniDocBench format
        result = self._convert_to_omnidocbench_format(
            output, image_path, width, height
        )

        if result:
            result["processing_time"] = process_time
            result["success"] = True

        return result
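
    # Orientation note (ours, not from the OmniDocBench docs): the converter
    # below emits one record per page of the form
    #
    #   {
    #       "layout_dets": [
    #           {"category_type": ..., "poly": [x1, y1, x2, y1, x2, y2, x1, y2],
    #            "text" or "html": ..., "attribute": {...}, ...},
    #           ...
    #       ],
    #       "page_info": {...},
    #       "extra": {"relation": []}
    #   }
    #
    # i.e. 4-point polygons in clockwise order starting from the top-left corner.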

    def _convert_to_omnidocbench_format(self, paddlex_output: List, image_path: str,
                                        width: int, height: int) -> Dict[str, Any]:
        """Convert PaddleX output into OmniDocBench format."""
        layout_dets = []
        anno_id_counter = 0

        # Walk the PaddleX results
        for res in paddlex_output:
            res_json = res.json.get('res', {})
            parsing_list = res_json.get('parsing_res_list', [])

            for item in parsing_list:
                bbox = item.get('block_bbox', [])
                category = item.get('block_label', 'text_block')
                content = item.get('block_content', '')

                # Convert [x1, y1, x2, y2] into an 8-value polygon
                if len(bbox) == 4:
                    x1, y1, x2, y2 = bbox
                    poly = [x1, y1, x2, y1, x2, y2, x1, y2]
                else:
                    poly = bbox

                # Map the category
                omni_category = self.category_mapping.get(category, 'text_block')

                # Build the layout detection record
                layout_det = {
                    "category_type": omni_category,
                    "poly": poly,
                    "ignore": False,
                    "order": anno_id_counter,
                    "anno_id": anno_id_counter,
                }

                # Attach content: tables keep their HTML, everything else plain text
                if content and content.strip():
                    if omni_category == 'table':
                        layout_det["html"] = content
                    else:
                        layout_det["text"] = content.strip()

                # Attach attribute labels
                layout_det["attribute"] = self._extract_attributes(item, omni_category)
                layout_det["line_with_spans"] = []

                layout_dets.append(layout_det)
                anno_id_counter += 1

        # Assemble the full page record
        return {
            "layout_dets": layout_dets,
            "page_info": {
                "page_no": 0,
                "height": height,
                "width": width,
                "image_path": Path(image_path).name,
                "page_attribute": {
                    "data_source": "research_report",
                    "language": "simplified_chinese",
                    "layout": "single_column",
                    "watermark": False,
                    "fuzzy_scan": False,
                    "colorful_backgroud": False
                }
            },
            "extra": {
                "relation": []
            }
        }

    def _extract_attributes(self, item: Dict, category: str) -> Dict:
        """Extract attribute labels for a layout element."""
        attributes = {}

        if category == 'table':
            attributes.update({
                "table_layout": "vertical",
                "with_span": False,
                "line": "full_line",
                "language": "table_simplified_chinese",
                "include_equation": False,
                "include_backgroud": False,
                "table_vertical": False
            })
            content = item.get('block_content', '')
            if 'colspan' in content or 'rowspan' in content:
                attributes["with_span"] = True
        elif category in ['text_block', 'title']:
            attributes.update({
                "text_language": "text_simplified_chinese",
                "text_background": "white",
                "text_rotate": "normal"
            })
        elif 'equation' in category:
            attributes.update({
                "formula_type": "print"
            })

        return attributes

    def parallel_process_with_threading(self, image_paths: List[str], batch_size: int = 4,
                                        max_workers: int = 4, use_gpu: bool = True) -> List[Dict[str, Any]]:
        """
        Process in parallel with threads (recommended for GPU).

        Args:
            image_paths: list of image paths
            batch_size: number of images per batch
            max_workers: maximum number of worker threads
            use_gpu: whether to run on GPU

        Returns:
            List of result dicts
        """
        # Split the image paths into batches
        batches = [image_paths[i:i + batch_size]
                   for i in range(0, len(image_paths), batch_size)]

        all_results = []
        completed_count = 0
        total_images = len(image_paths)

        # Progress bar over individual images
        with tqdm(total=total_images, desc="Processing images", unit="img") as pbar:
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Submit one task per batch
                future_to_batch = {
                    executor.submit(self.process_batch, batch, use_gpu): batch
                    for batch in batches
                }

                # Collect results as they complete
                for future in as_completed(future_to_batch):
                    batch = future_to_batch[future]
                    try:
                        batch_results = future.result()
                        all_results.extend(batch_results)
                        completed_count += len(batch)
                        pbar.update(len(batch))

                        # Update the progress bar postfix
                        success_count = sum(1 for r in batch_results if r.get('success', False))
                        pbar.set_postfix({
                            'batch_success': f"{success_count}/{len(batch)}",
                            'total_success': f"{sum(1 for r in all_results if r.get('success', False))}/{completed_count}"
                        })
                    except Exception as e:
                        print(f"Batch failed: {e}")
                        # Record an error result for every image in the failed batch
                        for img_path in batch:
                            all_results.append({
                                "image_path": Path(img_path).name,
                                "error": str(e),
                                "success": False,
                                "processing_time": 0
                            })
                        pbar.update(len(batch))

        return all_results
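
    # Note on the two strategies (our rationale; benchmark on your own hardware):
    # both paths create one pipeline per batch, but with threads all pipelines
    # live in this process and share the GPU context, whereas the process pool
    # below isolates each worker, which avoids GIL contention for CPU-bound
    # inference at the cost of per-process model copies.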

    def parallel_process_with_multiprocessing(self, image_paths: List[str], batch_size: int = 4,
                                              max_workers: int = 4, use_gpu: bool = False) -> List[Dict[str, Any]]:
        """
        Process in parallel with processes (recommended for CPU).

        Args:
            image_paths: list of image paths
            batch_size: number of images per batch
            max_workers: maximum number of worker processes
            use_gpu: whether to run on GPU

        Returns:
            List of result dicts
        """
        # Split the image paths into batches
        batches = [image_paths[i:i + batch_size]
                   for i in range(0, len(image_paths), batch_size)]

        all_results = []
        completed_count = 0
        total_images = len(image_paths)

        # Progress bar over individual images
        with tqdm(total=total_images, desc="Processing images", unit="img") as pbar:
            with ProcessPoolExecutor(max_workers=max_workers) as executor:
                # Submit one task per batch; the module-level worker keeps it picklable
                future_to_batch = {
                    executor.submit(process_batch_worker, batch, self.pipeline_config, use_gpu): batch
                    for batch in batches
                }

                # Collect results as they complete
                for future in as_completed(future_to_batch):
                    batch = future_to_batch[future]
                    try:
                        batch_results = future.result()
                        all_results.extend(batch_results)
                        completed_count += len(batch)
                        pbar.update(len(batch))

                        # Update the progress bar postfix
                        success_count = sum(1 for r in batch_results if r.get('success', False))
                        pbar.set_postfix({
                            'batch_success': f"{success_count}/{len(batch)}",
                            'total_success': f"{sum(1 for r in all_results if r.get('success', False))}/{completed_count}"
                        })
                    except Exception as e:
                        print(f"Batch failed: {e}")
                        # Record an error result for every image in the failed batch
                        for img_path in batch:
                            all_results.append({
                                "image_path": Path(img_path).name,
                                "error": str(e),
                                "success": False,
                                "processing_time": 0
                            })
                        pbar.update(len(batch))

        return all_results

    def save_results_incrementally(self, results: List[Dict[str, Any]], output_file: str,
                                   save_interval: int = 50):
        """
        Save results incrementally.

        Args:
            results: list of results
            output_file: output file path
            save_interval: save every N results
        """
        if len(results) % save_interval == 0 and len(results) > 0:
            try:
                with open(output_file, 'w', encoding='utf-8') as f:
                    json.dump(results, f, ensure_ascii=False, indent=2)
                print(f"Saved {len(results)} results to {output_file}")
            except Exception as e:
                print(f"Error while saving results: {e}")


def process_batch_worker(image_paths: List[str], pipeline_config: str, use_gpu: bool) -> List[Dict[str, Any]]:
    """
    Worker function for multiprocessing (module-level so it stays picklable).
    """
    try:
        # Build a fresh evaluator inside each worker process
        evaluator = OmniDocBenchParallelEvaluator(pipeline_config)
        return evaluator.process_batch(image_paths, use_gpu)
    except Exception as e:
        # Return one error record per image
        return [{
            "image_path": Path(img_path).name,
            "error": str(e),
            "success": False,
            "processing_time": 0
        } for img_path in image_paths]
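

# Optional smoke test for the format conversion: a minimal sketch that stubs the
# PaddleX result object (via SimpleNamespace) so no model or image is needed.
# The block fields below are illustrative values, not real pipeline output.
def _smoke_test_conversion():
    from types import SimpleNamespace

    evaluator = OmniDocBenchParallelEvaluator()
    fake_output = [SimpleNamespace(json={'res': {'parsing_res_list': [{
        'block_bbox': [10, 20, 200, 60],
        'block_label': 'title',
        'block_content': 'Quarterly Report',
    }]}})]

    result = evaluator._convert_to_omnidocbench_format(fake_output, 'dummy.png', 800, 600)
    assert result['layout_dets'][0]['category_type'] == 'title'
    assert result['layout_dets'][0]['poly'] == [10, 20, 200, 20, 200, 60, 10, 60]
    assert result['page_info']['width'] == 800
    print(json.dumps(result, ensure_ascii=False, indent=2))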


def main():
    """Entry point: run the OmniDocBench dataset through the pipeline in parallel."""
    # Configuration
    dataset_path = "/Users/zhch158/workspace/repository.git/OmniDocBench/OpenDataLab___OmniDocBench/images"
    output_dir = "/Users/zhch158/workspace/repository.git/PaddleX/zhch/OmniDocBench_Results"
    pipeline_config = "PP-StructureV3"

    # Parallelism parameters
    batch_size = 4               # images per batch
    max_workers = 4              # maximum worker threads/processes
    use_gpu = True               # whether to run on GPU
    use_multiprocessing = False  # False = threads (GPU), True = processes (CPU)

    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)

    print("=" * 60)
    print("OmniDocBench parallel evaluation started")
    print("=" * 60)
    print(f"Dataset path: {dataset_path}")
    print(f"Output directory: {output_dir}")
    print(f"Batch size: {batch_size}")
    print(f"Max workers: {max_workers}")
    print(f"Use GPU: {use_gpu}")
    print(f"Parallel mode: {'multiprocessing' if use_multiprocessing else 'multithreading'}")

    # Collect all image files
    image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp', '*.tiff']
    image_files = []
    for ext in image_extensions:
        image_files.extend(glob.glob(os.path.join(dataset_path, ext)))

    print(f"Found {len(image_files)} image files")
    if not image_files:
        print("No image files found, exiting")
        return

    # Create the evaluator
    evaluator = OmniDocBenchParallelEvaluator(pipeline_config)

    # Run
    start_time = time.time()
    if use_multiprocessing:
        # Multiprocessing (recommended for CPU)
        print("Running with multiprocessing...")
        results = evaluator.parallel_process_with_multiprocessing(
            image_files, batch_size, max_workers, use_gpu
        )
    else:
        # Multithreading (recommended for GPU)
        print("Running with multithreading...")
        results = evaluator.parallel_process_with_threading(
            image_files, batch_size, max_workers, use_gpu
        )
    total_time = time.time() - start_time

    # Save the final results
    output_file = os.path.join(output_dir, f"OmniDocBench_PPStructureV3_batch{batch_size}.json")
    try:
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=2)

        print("\n" + "=" * 60)
        print("Processing finished!")
        print("=" * 60)

        # Statistics
        success_count = sum(1 for r in results if r.get('success', False))
        error_count = len(results) - success_count
        total_processing_time = sum(r.get('processing_time', 0)
                                    for r in results if r.get('success', False))
        avg_processing_time = total_processing_time / success_count if success_count > 0 else 0

        print(f"Total files: {len(image_files)}")
        print(f"Succeeded: {success_count}")
        print(f"Failed: {error_count}")
        print(f"Success rate: {success_count / len(image_files) * 100:.2f}%")
        print(f"Wall time: {total_time:.2f}s")
        print(f"Average processing time: {avg_processing_time:.2f}s per image")
        print(f"Throughput: {len(image_files) / total_time:.2f} images/s")
        print(f"Results saved to: {output_file}")

        # Save run statistics
        stats = {
            "total_files": len(image_files),
            "success_count": success_count,
            "error_count": error_count,
            "success_rate": success_count / len(image_files),
            "total_time": total_time,
            "avg_processing_time": avg_processing_time,
            "throughput": len(image_files) / total_time,
            "batch_size": batch_size,
            "max_workers": max_workers,
            "use_gpu": use_gpu,
            "use_multiprocessing": use_multiprocessing
        }
        stats_file = os.path.join(output_dir, f"processing_stats_batch{batch_size}.json")
        with open(stats_file, 'w', encoding='utf-8') as f:
            json.dump(stats, f, ensure_ascii=False, indent=2)
        print(f"Statistics saved to: {stats_file}")

    except Exception as e:
        print(f"Error while saving the results file: {str(e)}")
        traceback.print_exc()


if __name__ == "__main__":
    main()
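
# Example of driving the evaluator from another script or an interactive session
# (a sketch; the sample glob path and subset size are placeholders):
#
#   from omnidocbench_parallel_eval import OmniDocBenchParallelEvaluator
#   evaluator = OmniDocBenchParallelEvaluator("PP-StructureV3")
#   sample = sorted(glob.glob("/path/to/OmniDocBench/images/*.jpg"))[:8]
#   results = evaluator.parallel_process_with_threading(
#       sample, batch_size=2, max_workers=2, use_gpu=True
#   )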