#!/usr/bin/env python3 """ 批量处理图片/PDF文件并生成符合评测要求的预测结果(MinerU版本) 根据 MinerU demo.py 框架调用方式: - 输入:支持 PDF 和各种图片格式(统一使用 --input 参数) - 输出:每个文件对应的 .md、.json 文件,所有图片保存为单独的图片文件 - 调用方式:通过 vlm-http-client 连接到 MinerU vLLM 服务器 使用方法: python main.py --input document.pdf --output_dir ./output python main.py --input ./images/ --output_dir ./output python main.py --input file_list.txt --output_dir ./output python main.py --input results.csv --output_dir ./output --dry_run """ import os import sys import json import time import traceback from pathlib import Path from typing import List, Dict, Any from tqdm import tqdm import argparse from loguru import logger # 导入 MinerU 核心组件 from mineru.cli.common import read_fn, convert_pdf_bytes_to_bytes_by_pypdfium2 # 导入 ocr_utils ocr_platform_root = Path(__file__).parents[2] if str(ocr_platform_root) not in sys.path: sys.path.insert(0, str(ocr_platform_root)) from ocr_utils import ( get_input_files, collect_pid_files, PDFUtils, setup_logging ) # 导入处理器 try: from .processor import MinerUVLLMProcessor except ImportError: from processor import MinerUVLLMProcessor def process_images_single_process( image_paths: List[str], processor: MinerUVLLMProcessor, batch_size: int = 1, output_dir: str = "./output" ) -> List[Dict[str, Any]]: """ 单进程版本的图像处理函数 Args: image_paths: 图像文件路径列表 processor: MinerU vLLM 处理器 batch_size: 批次大小 output_dir: 输出目录 Returns: 处理结果列表 """ # 创建输出目录 output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) all_results = [] total_images = len(image_paths) logger.info(f"Processing {total_images} images with batch size {batch_size}") with tqdm(total=total_images, desc="Processing images", unit="img", bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]') as pbar: for i in range(0, total_images, batch_size): batch = image_paths[i:i + batch_size] batch_start_time = time.time() batch_results = [] try: for image_path in batch: try: result = processor.process_single_image(image_path, output_dir) batch_results.append(result) except Exception as e: logger.error(f"Error processing {image_path}: {e}") batch_results.append({ "image_path": image_path, "processing_time": 0, "success": False, "server": processor.server_url, "error": str(e) }) batch_processing_time = time.time() - batch_start_time all_results.extend(batch_results) # 更新进度条 success_count = sum(1 for r in batch_results if r.get('success', False)) skipped_count = sum(1 for r in batch_results if r.get('skipped', False)) total_success = sum(1 for r in all_results if r.get('success', False)) total_skipped = sum(1 for r in all_results if r.get('skipped', False)) avg_time = batch_processing_time / len(batch) if len(batch) > 0 else 0 total_blocks = sum(r.get('extraction_stats', {}).get('total_blocks', 0) for r in batch_results) pbar.update(len(batch)) pbar.set_postfix({ 'batch_time': f"{batch_processing_time:.2f}s", 'avg_time': f"{avg_time:.2f}s/img", 'success': f"{total_success}/{len(all_results)}", 'skipped': f"{total_skipped}", 'blocks': f"{total_blocks}", 'rate': f"{total_success/len(all_results)*100:.1f}%" if len(all_results) > 0 else "0%" }) except Exception as e: logger.error(f"Error processing batch {[Path(p).name for p in batch]}: {e}") error_results = [] for img_path in batch: error_results.append({ "image_path": str(img_path), "processing_time": 0, "success": False, "server": processor.server_url, "error": str(e) }) all_results.extend(error_results) pbar.update(len(batch)) return all_results def main(): """主函数""" parser = argparse.ArgumentParser( description="MinerU vLLM Batch Processing (demo.py framework)", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例: # 处理单个PDF文件 python main.py --input document.pdf --output_dir ./output # 处理图片目录 python main.py --input ./images/ --output_dir ./output # 处理文件列表 python main.py --input file_list.txt --output_dir ./output # 处理CSV文件(失败的文件) python main.py --input results.csv --output_dir ./output # 指定页面范围(仅PDF) python main.py --input document.pdf --output_dir ./output --pages "1-5,7" # 启用调试模式 python main.py --input document.pdf --output_dir ./output --debug # 仅验证配置(dry run) python main.py --input document.pdf --output_dir ./output --dry_run """ ) # 输入参数(统一使用 --input) parser.add_argument( "--input", "-i", required=True, type=str, help="输入路径(支持PDF文件、图片文件、图片目录、文件列表.txt、CSV文件)" ) # 输出参数 parser.add_argument( "--output_dir", "-o", type=str, required=True, help="输出目录" ) # MinerU vLLM 参数 parser.add_argument( "--server_url", type=str, default="http://127.0.0.1:20006", help="MinerU vLLM server URL" ) parser.add_argument( "--timeout", type=int, default=300, help="Request timeout in seconds" ) parser.add_argument( "--pdf_dpi", type=int, default=200, help="DPI for PDF to image conversion" ) parser.add_argument( '--no-normalize', action='store_true', help='禁用数字标准化' ) parser.add_argument( '--debug', action='store_true', help='启用调试模式' ) # 处理参数 parser.add_argument( "--batch_size", type=int, default=1, help="Batch size" ) parser.add_argument( "--pages", "-p", type=str, help="页面范围(PDF和图片目录有效),如: '1-5,7,9-12', '1-', '-10'" ) parser.add_argument( "--collect_results", type=str, help="收集处理结果到指定CSV文件" ) # 日志参数 parser.add_argument( "--log_level", default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR"], help="日志级别(默认: INFO)" ) parser.add_argument( "--log_file", type=str, help="日志文件路径" ) # Dry run 参数 parser.add_argument( "--dry_run", action="store_true", help="仅验证配置和输入,不执行实际处理" ) args = parser.parse_args() # 设置日志 setup_logging(args.log_level, args.log_file) try: # 创建参数对象(用于 get_input_files) class Args: def __init__(self, input_path, output_dir, pdf_dpi): self.input = input_path self.output_dir = output_dir self.pdf_dpi = pdf_dpi args_obj = Args(args.input, args.output_dir, args.pdf_dpi) # 获取并预处理输入文件(页面范围过滤已在 get_input_files 中处理) logger.info("🔄 Preprocessing input files...") if args.pages: logger.info(f"📄 页面范围: {args.pages}") image_files = get_input_files(args_obj, page_range=args.pages) if not image_files: logger.error("❌ No input files found or processed") return 1 output_dir = Path(args.output_dir).resolve() logger.info(f"📁 Output dir: {output_dir}") logger.info(f"📊 Found {len(image_files)} image files to process") # Dry run 模式 if args.dry_run: logger.info("🔍 Dry run mode: 仅验证配置,不执行处理") logger.info(f"📋 配置信息:") logger.info(f" - 输入: {args.input}") logger.info(f" - 输出目录: {output_dir}") logger.info(f" - 服务器: {args.server_url}") logger.info(f" - 超时: {args.timeout}s") logger.info(f" - 批次大小: {args.batch_size}") logger.info(f" - PDF DPI: {args.pdf_dpi}") logger.info(f" - 数字标准化: {not args.no_normalize}") logger.info(f" - 调试模式: {args.debug}") if args.pages: logger.info(f" - 页面范围: {args.pages}") logger.info(f"📋 将要处理的文件 ({len(image_files)} 个):") for i, img_file in enumerate(image_files[:20], 1): # 只显示前20个 logger.info(f" {i}. {img_file}") if len(image_files) > 20: logger.info(f" ... 还有 {len(image_files) - 20} 个文件") logger.info("✅ Dry run 完成:配置验证通过") return 0 logger.info(f"🌐 Using server: {args.server_url}") logger.info(f"📦 Batch size: {args.batch_size}") logger.info(f"⏱️ Timeout: {args.timeout}s") # 创建处理器 processor = MinerUVLLMProcessor( server_url=args.server_url, timeout=args.timeout, normalize_numbers=not args.no_normalize, debug=args.debug ) # 开始处理 start_time = time.time() results = process_images_single_process( image_files, processor, args.batch_size, str(output_dir) ) total_time = time.time() - start_time # 统计结果 success_count = sum(1 for r in results if r.get('success', False)) skipped_count = sum(1 for r in results if r.get('skipped', False)) error_count = len(results) - success_count pdf_page_count = sum(1 for r in results if r.get('is_pdf_page', False)) # 统计提取的块信息 total_blocks = sum(r.get('extraction_stats', {}).get('total_blocks', 0) for r in results) block_type_stats = {} for result in results: if 'extraction_stats' in result and 'block_types' in result['extraction_stats']: for block_type, count in result['extraction_stats']['block_types'].items(): block_type_stats[block_type] = block_type_stats.get(block_type, 0) + count print(f"\n" + "="*60) print(f"✅ Processing completed!") print(f"📊 Statistics:") print(f" Total files processed: {len(image_files)}") print(f" PDF pages processed: {pdf_page_count}") print(f" Regular images processed: {len(image_files) - pdf_page_count}") print(f" Successful: {success_count}") print(f" Skipped: {skipped_count}") print(f" Failed: {error_count}") if len(image_files) > 0: print(f" Success rate: {success_count / len(image_files) * 100:.2f}%") print(f"📋 Content Extraction:") print(f" Total blocks extracted: {total_blocks}") if block_type_stats: print(f" Block types:") for block_type, count in sorted(block_type_stats.items()): print(f" {block_type}: {count}") print(f"⏱️ Performance:") print(f" Total time: {total_time:.2f} seconds") if total_time > 0: print(f" Throughput: {len(image_files) / total_time:.2f} images/second") print(f" Avg time per image: {total_time / len(image_files):.2f} seconds") print(f"\n📁 Output Structure (demo.py compatible):") print(f" output_dir/") print(f" ├── filename.md # Markdown content") print(f" ├── filename.json # Content list") print(f" ├── filename_layout.pdf # Debug: layout bbox") print(f" └── images/ # Extracted images") print(f" └── filename.png") if args.debug: print(f" ├── filename_middle.json # Debug: middle JSON") print(f" └── filename_model.json # Debug: model output") # 保存结果统计 stats = { "total_files": len(image_files), "pdf_pages": pdf_page_count, "regular_images": len(image_files) - pdf_page_count, "success_count": success_count, "skipped_count": skipped_count, "error_count": error_count, "success_rate": success_count / len(image_files) if len(image_files) > 0 else 0, "total_time": total_time, "throughput": len(image_files) / total_time if total_time > 0 else 0, "avg_time_per_image": total_time / len(image_files) if len(image_files) > 0 else 0, "batch_size": args.batch_size, "server": args.server_url, "backend": "vlm-http-client", "timeout": args.timeout, "pdf_dpi": args.pdf_dpi, "total_blocks": total_blocks, "block_type_stats": block_type_stats, "normalization_enabled": not args.no_normalize, "timestamp": time.strftime("%Y-%m-%d %H:%M:%S") } # 保存最终结果 output_file_name = Path(output_dir).name output_file = output_dir / f"{output_file_name}_results.json" final_results = { "stats": stats, "results": results } with open(output_file, 'w', encoding='utf-8') as f: json.dump(final_results, f, ensure_ascii=False, indent=2) logger.info(f"💾 Results saved to: {output_file}") # 收集处理结果 if not args.collect_results: output_file_processed = output_dir / f"processed_files_{time.strftime('%Y%m%d_%H%M%S')}.csv" else: output_file_processed = Path(args.collect_results).resolve() processed_files = collect_pid_files(str(output_file)) with open(output_file_processed, 'w', encoding='utf-8') as f: f.write("image_path,status\n") for file_path, status in processed_files: f.write(f"{file_path},{status}\n") logger.info(f"💾 Processed files saved to: {output_file_processed}") return 0 except Exception as e: logger.error(f"Processing failed: {e}") traceback.print_exc() return 1 if __name__ == "__main__": logger.info(f"🚀 启动MinerU vLLM统一PDF/图像处理程序...") logger.info(f"🔧 CUDA_VISIBLE_DEVICES: {os.environ.get('CUDA_VISIBLE_DEVICES', 'Not set')}") if len(sys.argv) == 1: # 如果没有命令行参数,使用默认配置运行 logger.info("ℹ️ No command line arguments provided. Running with default configuration...") # 默认配置 default_config = { "input": "/Users/zhch158/workspace/data/流水分析/马公账流水_工商银行.pdf", "output_dir": "./output", "server_url": "http://10.192.72.11:20006", "timeout": "300", "batch_size": "1", "pdf_dpi": "200", "pages": "-1", } # 构造参数 sys.argv = [sys.argv[0]] for key, value in default_config.items(): sys.argv.extend([f"--{key}", str(value)]) # 调试模式 sys.argv.append("--debug") sys.exit(main())