| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328 |
- import json
- import time
- import os
- import traceback
- import argparse
- import sys
- import warnings
- from pathlib import Path
- from typing import List, Dict, Any
- import cv2
- import numpy as np
- # 抑制特定警告
- warnings.filterwarnings("ignore", message="To copy construct from a tensor")
- warnings.filterwarnings("ignore", message="Setting `pad_token_id`")
- warnings.filterwarnings("ignore", category=UserWarning, module="paddlex")
- from paddlex import create_pipeline
- from paddlex.utils.device import constr_device, parse_device
- from tqdm import tqdm
- from dotenv import load_dotenv
- load_dotenv(override=True)
- def process_images_single_process(image_paths: List[str],
- pipeline_name: str = "PP-StructureV3",
- device: str = "gpu:0",
- batch_size: int = 1,
- output_dir: str = "./output") -> List[Dict[str, Any]]:
- """
- 单进程版本的图像处理函数
-
- Args:
- image_paths: 图像路径列表
- pipeline_name: Pipeline名称
- device: 设备字符串,如"gpu:0"或"cpu"
- batch_size: 批处理大小
- output_dir: 输出目录
-
- Returns:
- 处理结果列表
- """
- # 创建输出目录
- output_path = Path(output_dir)
- output_path.mkdir(parents=True, exist_ok=True)
-
- print(f"Initializing pipeline '{pipeline_name}' on device '{device}'...")
-
- try:
- # 设置环境变量以减少警告
- os.environ['PYTHONWARNINGS'] = 'ignore::UserWarning'
-
- # 初始化pipeline
- pipeline = create_pipeline(pipeline_name, device=device)
- print(f"Pipeline initialized successfully on {device}")
-
- except Exception as e:
- print(f"Failed to initialize pipeline: {e}", file=sys.stderr)
- traceback.print_exc()
- return []
-
- all_results = []
- total_images = len(image_paths)
-
- print(f"Processing {total_images} images with batch size {batch_size}")
-
- # 使用tqdm显示进度,添加更多统计信息
- with tqdm(total=total_images, desc="Processing images", unit="img",
- bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]') as pbar:
-
- # 按批次处理图像
- for i in range(0, total_images, batch_size):
- batch = image_paths[i:i + batch_size]
- batch_start_time = time.time()
-
- try:
- # 使用pipeline预测
- results = pipeline.predict(
- batch,
- use_doc_orientation_classify=True,
- use_doc_unwarping=False,
- use_seal_recognition=True,
- use_chart_recognition=True,
- use_table_recognition=True,
- use_formula_recognition=True,
- )
-
- batch_processing_time = time.time() - batch_start_time
- batch_results = []
-
- # 处理每个结果
- for result in results:
- try:
- input_path = Path(result["input_path"])
-
- # 生成输出文件名
- if result.get("page_index") is not None:
- output_filename = f"{input_path.stem}_{result['page_index']}"
- else:
- output_filename = f"{input_path.stem}"
-
- # 保存JSON和Markdown文件
- json_output_path = str(Path(output_dir, f"{output_filename}.json"))
- md_output_path = str(Path(output_dir, f"{output_filename}.md"))
-
- result.save_to_json(json_output_path)
- result.save_to_markdown(md_output_path)
-
- # 记录处理结果
- batch_results.append({
- "image_path": input_path.name,
- "processing_time": batch_processing_time / len(batch), # 平均时间
- "success": True,
- "device": device,
- "output_json": json_output_path,
- "output_md": md_output_path
- })
-
- except Exception as e:
- print(f"Error saving result for {result.get('input_path', 'unknown')}: {e}", file=sys.stderr)
- traceback.print_exc()
- batch_results.append({
- "image_path": Path(result["input_path"]).name,
- "processing_time": 0,
- "success": False,
- "device": device,
- "error": str(e)
- })
-
- all_results.extend(batch_results)
-
- # 更新进度条
- success_count = sum(1 for r in batch_results if r.get('success', False))
- total_success = sum(1 for r in all_results if r.get('success', False))
- avg_time = batch_processing_time / len(batch)
-
- pbar.update(len(batch))
- pbar.set_postfix({
- 'batch_time': f"{batch_processing_time:.2f}s",
- 'avg_time': f"{avg_time:.2f}s/img",
- 'success': f"{total_success}/{len(all_results)}",
- 'rate': f"{total_success/len(all_results)*100:.1f}%"
- })
-
- except Exception as e:
- print(f"Error processing batch {[Path(p).name for p in batch]}: {e}", file=sys.stderr)
- traceback.print_exc()
-
- # 为批次中的所有图像添加错误结果
- error_results = []
- for img_path in batch:
- error_results.append({
- "image_path": Path(img_path).name,
- "processing_time": 0,
- "success": False,
- "device": device,
- "error": str(e)
- })
- all_results.extend(error_results)
- pbar.update(len(batch))
-
- return all_results
- def main():
- """主函数"""
- parser = argparse.ArgumentParser(description="PaddleX PP-StructureV3 Single Process Processing")
-
- # 参数定义
- parser.add_argument("--input_dir", type=str, default="../../OmniDocBench/OpenDataLab___OmniDocBench/images", help="Input directory")
- parser.add_argument("--output_dir", type=str, default="./OmniDocBench_Results_Single", help="Output directory")
- parser.add_argument("--pipeline", type=str, default="PP-StructureV3", help="Pipeline name")
- parser.add_argument("--device", type=str, default="gpu:0", help="Device string (e.g., 'gpu:0', 'cpu')")
- parser.add_argument("--batch_size", type=int, default=4, help="Batch size")
- parser.add_argument("--input_pattern", type=str, default="*", help="Input file pattern")
- parser.add_argument("--test_mode", action="store_true", help="Test mode (process only 20 images)")
-
- args = parser.parse_args()
-
- try:
- # 获取图像文件列表
- input_dir = Path(args.input_dir).resolve()
- output_dir = Path(args.output_dir).resolve()
-
- print(f"Input dir: {input_dir}")
- print(f"Output dir: {output_dir}")
-
- if not input_dir.exists():
- print(f"Input directory does not exist: {input_dir}")
- return 1
-
- # 查找图像文件
- image_extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif']
- image_files = []
- for ext in image_extensions:
- image_files.extend(list(input_dir.glob(f"*{ext}")))
- image_files.extend(list(input_dir.glob(f"*{ext.upper()}")))
-
- if not image_files:
- print(f"No image files found in {input_dir}")
- return 1
-
- # 去重并排序
- image_files = sorted(list(set(str(f) for f in image_files)))
- print(f"Found {len(image_files)} image files")
-
- if args.test_mode:
- image_files = image_files[:20]
- print(f"Test mode: processing only {len(image_files)} images")
-
- # 验证设备
- if args.device.startswith('gpu'):
- try:
- import paddle
- if not paddle.device.is_compiled_with_cuda():
- print("GPU requested but CUDA not available, falling back to CPU")
- args.device = "cpu"
- else:
- gpu_count = paddle.device.cuda.device_count()
- device_id = int(args.device.split(':')[1]) if ':' in args.device else 0
- if device_id >= gpu_count:
- print(f"GPU {device_id} not available (only {gpu_count} GPUs), falling back to GPU 0")
- args.device = "gpu:0"
-
- # 显示GPU信息
- if args.verbose:
- for i in range(gpu_count):
- props = paddle.device.cuda.get_device_properties(i)
- print(f"GPU {i}: {props.name} - {props.total_memory // 1024**3}GB")
-
- except Exception as e:
- print(f"Error checking GPU availability: {e}, falling back to CPU")
- args.device = "cpu"
-
- print(f"Using device: {args.device}")
- print(f"Batch size: {args.batch_size}")
-
- # 开始处理
- start_time = time.time()
- results = process_images_single_process(
- image_files,
- args.pipeline,
- args.device,
- args.batch_size,
- str(output_dir)
- )
- total_time = time.time() - start_time
-
- # 统计结果
- success_count = sum(1 for r in results if r.get('success', False))
- error_count = len(results) - success_count
-
- print(f"\n" + "="*60)
- print(f"✅ Processing completed!")
- print(f"📊 Statistics:")
- print(f" Total files: {len(image_files)}")
- print(f" Successful: {success_count}")
- print(f" Failed: {error_count}")
- if len(image_files) > 0:
- print(f" Success rate: {success_count / len(image_files) * 100:.2f}%")
- print(f"⏱️ Performance:")
- print(f" Total time: {total_time:.2f} seconds")
- if total_time > 0:
- print(f" Throughput: {len(image_files) / total_time:.2f} images/second")
- print(f" Avg time per image: {total_time / len(image_files):.2f} seconds")
-
- # 保存结果统计
- stats = {
- "total_files": len(image_files),
- "success_count": success_count,
- "error_count": error_count,
- "success_rate": success_count / len(image_files) if len(image_files) > 0 else 0,
- "total_time": total_time,
- "throughput": len(image_files) / total_time if total_time > 0 else 0,
- "avg_time_per_image": total_time / len(image_files) if len(image_files) > 0 else 0,
- "batch_size": args.batch_size,
- "device": args.device,
- "pipeline": args.pipeline,
- "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
- }
-
- # 保存最终结果
- output_file = os.path.join(output_dir, f"OmniDocBench_Single_batch{args.batch_size}.json")
- final_results = {
- "stats": stats,
- "results": results
- }
-
- with open(output_file, 'w', encoding='utf-8') as f:
- json.dump(final_results, f, ensure_ascii=False, indent=2)
-
- print(f"💾 Results saved to: {output_file}")
-
- return 0
-
- except Exception as e:
- print(f"❌ Processing failed: {e}", file=sys.stderr)
- traceback.print_exc()
- return 1
- if __name__ == "__main__":
- print(f"🚀 启动单进程OCR程序...")
- print(f"🔧 CUDA_VISIBLE_DEVICES: {os.environ.get('CUDA_VISIBLE_DEVICES', 'Not set')}")
-
- if len(sys.argv) == 1:
- # 如果没有命令行参数,使用默认配置运行
- print("ℹ️ No command line arguments provided. Running with default configuration...")
-
- # 默认配置
- default_config = {
- "input_dir": "../../OmniDocBench/OpenDataLab___OmniDocBench/images",
- "output_dir": "./OmniDocBench_Results_Single",
- "pipeline": "PP-StructureV3",
- "device": "gpu:0",
- "batch_size": 4,
- }
-
- # 构造参数
- sys.argv = [sys.argv[0]]
- for key, value in default_config.items():
- sys.argv.extend([f"--{key}", str(value)])
-
- # 测试模式
- sys.argv.append("--test_mode")
-
- sys.exit(main())
|