#!/usr/bin/env python3
"""
Unified entry point for financial document processing (v2).

Supports the complete processing flow:
1. PDF classification (scanned / digital-native PDF)
2. Page orientation detection
3. Layout detection
4. Parallel processing: text OCR + table VLM recognition
5. Cell coordinate matching
6. Multi-format output (JSON, Markdown, HTML, visualization images)

Usage:
    # Process a single PDF
    python main_v2.py -i /path/to/document.pdf -c ./config/bank_statement_mineru_vl.yaml

    # Process a directory of images
    python main_v2.py -i /path/to/images/ -c ./config/bank_statement_paddle_vl.yaml

    # Enable debug mode (writes visualization images)
    python main_v2.py -i /path/to/doc.pdf -c ./config/xxx.yaml --debug
"""

import argparse
import json
import sys
import os
from pathlib import Path
from typing import Optional
from loguru import logger
from datetime import datetime

# Add the ocr_platform root to the Python path (needed to import ocr_utils).
# universal_doc_parser -> ocr_tools -> ocr_platform -> repository.git
ocr_platform_root = Path(__file__).parents[2]
if str(ocr_platform_root) not in sys.path:
    sys.path.insert(0, str(ocr_platform_root))

# Add this file's directory to the Python path (needed for the `core` imports).
project_root = Path(__file__).parent
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

# Load .env before the project imports below; kept in this order on purpose.
from dotenv import load_dotenv
load_dotenv(override=True)

from core.pipeline_manager_v2 import EnhancedDocPipeline
from core.pipeline_manager_v2_streaming import StreamingDocPipeline

# Import the output formatter from ocr_utils.
try:
    from ocr_utils import OutputFormatterV2
except ImportError:
    # Fallback: import from utils (backward compatibility).
    from utils import OutputFormatterV2


# ==================== Helper Functions ====================

def _print_environment_info():
    """Print the model/runtime related environment variables to stdout."""
    env_vars = [
        'CUDA_VISIBLE_DEVICES',
        'HF_HOME',
        'HF_ENDPOINT',
        'HF_HUB_OFFLINE',
        'TORCH_HOME',
        'MODELSCOPE_CACHE',
        'USE_MODELSCOPE_HUB',
        'MINERU_MODEL_SOURCE',
    ]
    for var in env_vars:
        print(f"🔧 {var}: {os.environ.get(var, 'Not set')}")


def _validate_arguments(args: argparse.Namespace) -> bool:
    """
    Check that the input path and the config file both exist.

    Args:
        args: Parsed command-line arguments (reads ``input`` and ``config``).

    Returns:
        True when both paths exist; False (after logging an error) otherwise.
    """
    input_path = Path(args.input)
    if not input_path.exists():
        logger.error(f"❌ 输入路径不存在: {input_path}")
        return False

    config_path = Path(args.config)
    if not config_path.exists():
        logger.error(f"❌ 配置文件不存在: {config_path}")
        return False

    return True


def _handle_dry_run(args: argparse.Namespace) -> bool:
    """
    Handle dry-run mode.

    Args:
        args: Parsed command-line arguments (reads ``dry_run``).

    Returns:
        True only when dry-run was requested AND the arguments validated;
        False when dry-run is off or validation failed.
    """
    if not args.dry_run:
        return False
    if not _validate_arguments(args):
        return False
    logger.info("✅ 配置验证通过(dry run)")
    return True


def _create_pipeline(
    streaming: bool,
    config_path: str,
    output_dir: str,
    debug: bool = False,
    debug_layout: bool = False,
    debug_table: bool = False,
    debug_ocr: bool = False
):
    """
    Create the processing pipeline, applying debug overrides first.

    Args:
        streaming: Use the streaming pipeline instead of the batch one.
        config_path: Path to the configuration file.
        output_dir: Output directory (only passed to the streaming pipeline).
        debug: Global debug switch.
        debug_layout: Layout-detection debug switch.
        debug_table: Table-recognition debug switch.
        debug_ocr: OCR-recognition debug switch.

    Returns:
        A ``StreamingDocPipeline`` or ``EnhancedDocPipeline`` instance.
    """
    # 1. Load the configuration first.
    from core.config_manager import ConfigManager
    config = ConfigManager.load_config(config_path)

    # 2. Apply the debug overrides BEFORE the pipeline is constructed, so the
    #    pipeline is built from the already-modified config.
    if debug or debug_layout or debug_table or debug_ocr:
        _apply_debug_overrides_to_config(config, debug, debug_layout, debug_table, debug_ocr)

    # 3. Build the pipeline.
    if streaming:
        logger.info("🔄 Using streaming processing mode (memory-efficient)")
        pipeline = StreamingDocPipeline(config, output_dir, config_is_dict=True)
    else:
        logger.info("🔄 Using batch processing mode (all pages in memory)")
        pipeline = EnhancedDocPipeline(config, config_is_dict=True)

    return pipeline


def _get_default_output_config(debug: bool) -> dict:
    """
    Return the output configuration used when the config file has none.

    Args:
        debug: Whether the layout/OCR visualization images should be saved.
    """
    return {
        'create_subdir': True,
        'save_pdf_images': False,
        'save_json': True,
        'save_markdown': True,
        'save_html': True,
        'save_page_json': True,
        'save_images': True,
        'save_layout_image': debug,
        'save_ocr_image': debug,
        'draw_type_label': True,
        'draw_bbox_number': True,
        'save_enhanced_json': True,
        'normalize_numbers': True,
        'merge_cross_page_tables': True,
    }


def _apply_debug_overrides_to_config(
    config: dict,
    debug: bool,
    debug_layout: bool,
    debug_table: bool,
    debug_ocr: bool
):
    """
    Apply the command-line debug flags to the loaded config (in place).

    Priority rules:
    1. --debug enables debug for every module.
    2. --debug-layout / --debug-table / --debug-ocr control single modules.
    3. ``debug_options`` in the config file only provide defaults.

    Args:
        config: Configuration dict; modified in place.
        debug: Global debug switch.
        debug_layout: Layout-detection debug switch.
        debug_table: Table-recognition debug switch.
        debug_ocr: OCR-recognition debug switch.
    """
    enable_layout_debug = debug or debug_layout
    enable_table_debug = debug or debug_table
    enable_ocr_debug = debug or debug_ocr

    # (enabled?, config section, log message) -- the table flag drives both
    # table classification and wired-table recognition.
    module_overrides = (
        (enable_layout_debug, 'layout_detection', "✅ 启用布局检测 debug 输出"),
        (enable_table_debug, 'table_classification', "✅ 启用表格分类 debug 输出"),
        (enable_table_debug, 'table_recognition_wired', "✅ 启用有线表格识别 debug 输出"),
        (enable_ocr_debug, 'ocr_recognition', "✅ 启用 OCR 识别 debug 输出"),
    )
    for enabled, section_name, message in module_overrides:
        section = config.get(section_name)
        # Sections that are absent (or empty/None in the YAML) are left alone.
        if not enabled or not isinstance(section, dict):
            continue
        debug_options = section.get('debug_options')
        if not isinstance(debug_options, dict):
            debug_options = section['debug_options'] = {}
        debug_options['enabled'] = True
        logger.info(message)

    # Output section.
    if enable_layout_debug or enable_table_debug or enable_ocr_debug:
        wants_debug_images = enable_layout_debug or enable_ocr_debug
        output_config = config.get('output')
        if not isinstance(output_config, dict) or not output_config:
            # No usable output section: attach one to the config so the flags
            # below are not written to a throwaway dict.  It is seeded with
            # the same defaults process_single_input falls back to, otherwise
            # a dict holding only the debug keys would replace that fallback.
            output_config = config['output'] = _get_default_output_config(wants_debug_images)
        output_config['debug_mode'] = True
        if wants_debug_images:
            # setdefault: an explicit value in the config file wins.
            output_config.setdefault('save_layout_image', True)
            output_config.setdefault('save_ocr_image', True)

    # Report the resulting debug state.
    if debug:
        logger.info("🐛 全局 Debug 模式已启用(所有模块)")
    else:
        debug_modules = []
        if debug_layout:
            debug_modules.append("布局检测")
        if debug_table:
            debug_modules.append("表格识别")
        if debug_ocr:
            debug_modules.append("OCR识别")
        if debug_modules:
            logger.info(f"🐛 Debug 模式已启用: {', '.join(debug_modules)}")


def setup_logging(log_level: str = "INFO", log_file: Optional[str] = None):
    """
    Configure loguru sinks.

    Args:
        log_level: Level for the stdout sink.
        log_file: Optional log file path; the file sink always logs at DEBUG
            and rotates at 10 MB.
    """
    log_format = "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}"

    logger.remove()

    # Console sink.
    logger.add(sys.stdout, level=log_level, format=log_format)

    # File sink.
    if log_file:
        logger.add(log_file, level="DEBUG", format=log_format, rotation="10 MB")


def _run_pipeline(
    pipeline,
    input_path: Path,
    output_dir: Path,
    output_config: dict,
    scene: Optional[str],
    page_range: Optional[str],
    streaming: bool
) -> dict:
    """Run an already-created pipeline on one input and return the success result dict."""
    if scene:
        pipeline.scene_name = scene
        if hasattr(pipeline, 'set_scene_name'):
            pipeline.set_scene_name(scene)
        logger.info(f"🔄 Scene overridden to: {scene}")

    logger.info(f"🚀 开始处理: {input_path}")
    logger.info(f"📋 场景配置: {pipeline.scene_name}")
    logger.info(f"📁 输出目录: {output_dir}")
    if page_range:
        logger.info(f"📄 页面范围: {page_range}")

    start_time = datetime.now()

    if streaming:
        # Streaming mode: the pipeline saves its own output.
        results = pipeline.process_document_streaming(
            str(input_path),
            page_range=page_range,
            output_config=output_config
        )
        process_time = (datetime.now() - start_time).total_seconds()
        _print_summary_streaming(results, process_time)
        return {
            'success': True,
            'results': results,
            'output_paths': results.get('output_paths', {}),
            'process_time': process_time
        }

    # Batch mode: process everything, then save through the formatter.
    results = pipeline.process_document(
        str(input_path),
        page_range=page_range,
        output_dir=str(output_dir)
    )
    process_time = (datetime.now() - start_time).total_seconds()
    logger.info(f"⏱️ 处理耗时: {process_time:.2f}秒")

    logger.info("💾 保存结果...")
    formatter = OutputFormatterV2(str(output_dir))
    output_paths = formatter.save_results(results, output_config)

    _print_summary(results, output_paths, process_time)
    return {
        'success': True,
        'results': results,
        'output_paths': output_paths,
        'process_time': process_time
    }


def process_single_input(
    input_path: Path,
    config_path: Path,
    output_dir: Path,
    debug: bool = False,
    debug_layout: bool = False,
    debug_table: bool = False,
    debug_ocr: bool = False,
    scene: Optional[str] = None,
    page_range: Optional[str] = None,
    streaming: bool = False
) -> dict:
    """
    Process a single input (file or directory).

    Args:
        input_path: Input path.
        config_path: Configuration file path.
        output_dir: Output directory.
        debug: Global debug switch (enables debug for every module).
        debug_layout: Enable layout-detection debug only.
        debug_table: Enable table-recognition debug only.
        debug_ocr: Enable OCR-recognition debug only.
        scene: Scene name override.
        page_range: Page range, e.g. "1-5,7,9-12".
        streaming: Use the streaming pipeline.

    Returns:
        On success: ``{'success': True, 'results', 'output_paths',
        'process_time'}``.  On any exception: ``{'success': False, 'error'}``
        (the error is logged and the traceback printed; nothing is raised).
    """
    try:
        # Debug overrides are applied inside _create_pipeline.
        pipeline = _create_pipeline(
            streaming,
            str(config_path),
            str(output_dir),
            debug=debug,
            debug_layout=debug_layout,
            debug_table=debug_table,
            debug_ocr=debug_ocr
        )
        output_config = pipeline.config.get('output', {}) or _get_default_output_config(debug)

        # Only the batch pipeline is entered as a context manager.  Keep a
        # reference to the manager itself: __enter__ may return another object.
        context_manager = None
        if not streaming and hasattr(pipeline, '__enter__'):
            context_manager = pipeline
            pipeline = context_manager.__enter__()

        try:
            result = _run_pipeline(
                pipeline, input_path, output_dir, output_config,
                scene, page_range, streaming
            )
        except BaseException as exc:
            # Hand the real failure to __exit__; its return value is ignored,
            # so the exception is never suppressed here.
            if context_manager is not None:
                context_manager.__exit__(type(exc), exc, exc.__traceback__)
            raise
        if context_manager is not None:
            context_manager.__exit__(None, None, None)
        return result

    except Exception as e:
        logger.error(f"❌ 处理失败: {e}")
        import traceback
        traceback.print_exc()
        return {
            'success': False,
            'error': str(e)
        }


def _print_summary(results: dict, output_paths: dict, process_time: float):
    """
    Print the batch-mode processing summary to stdout.

    Args:
        results: Pipeline result dict (reads ``pages``, ``document_path``,
            ``scene`` and ``metadata.pdf_type``).
        output_paths: Mapping of output kind to a path or a list of paths.
        process_time: Elapsed processing time in seconds.
    """
    pages = results.get('pages', [])
    total_pages = len(pages)
    total_tables = 0
    total_text_blocks = 0
    total_cells = 0

    for page in pages:
        for element in page.get('elements', []):
            elem_type = element.get('type', '')
            if elem_type in ('table', 'table_body'):
                total_tables += 1
                content = element.get('content', {})
                # A table whose content is not a dict has no cells to count.
                if isinstance(content, dict):
                    total_cells += len(content.get('cells', []))
            elif elem_type in ('text', 'title', 'ocr_text', 'ref_text'):
                total_text_blocks += 1

    print(f"\n{'='*60}")
    print("📊 处理摘要")
    print(f"{'='*60}")
    print(f" 📄 文档: {results.get('document_path', 'N/A')}")
    print(f" 🎯 场景: {results.get('scene', 'N/A')}")
    print(f" 📋 PDF类型: {results.get('metadata', {}).get('pdf_type', 'N/A')}")
    print(f" 📖 页面数: {total_pages}")
    print(f" 📋 表格数: {total_tables}")
    print(f" 📝 文本块: {total_text_blocks}")
    print(f" 🔢 单元格: {total_cells} (带坐标)")
    print(f" ⏱️ 耗时: {process_time:.2f}秒")
    print(f"{'='*60}")
    print("📁 输出文件:")
    for path in output_paths.values():
        if isinstance(path, list):
            for p in path:
                print(f" - {p}")
        else:
            print(f" - {path}")
    print(f"{'='*60}\n")


def _print_summary_streaming(results_summary: dict, process_time: float):
    """
    Print the streaming-mode processing summary to stdout.

    Args:
        results_summary: Summary dict returned by the streaming pipeline
            (reads ``document_path``, ``scene``, ``metadata.pdf_type``,
            ``total_pages`` and ``output_paths``).
        process_time: Elapsed processing time in seconds.
    """
    print(f"\n{'='*60}")
    print("📊 处理摘要(流式模式)")
    print(f"{'='*60}")
    print(f" 📄 文档: {results_summary.get('document_path', 'N/A')}")
    print(f" 🎯 场景: {results_summary.get('scene', 'N/A')}")
    print(f" 📋 PDF类型: {results_summary.get('metadata', {}).get('pdf_type', 'N/A')}")
    print(f" 📖 页面数: {results_summary.get('total_pages', 0)}")
    print(f" ⏱️ 耗时: {process_time:.2f}秒")
    print(f"{'='*60}")
    print("📁 输出文件:")
    output_paths = results_summary.get('output_paths', {})
    if output_paths.get('middle_json'):
        print(f" - {output_paths['middle_json']}")
    if output_paths.get('json_pages'):
        print(f" - {len(output_paths['json_pages'])} 个页面JSON文件")
    if output_paths.get('images'):
        print(f" - {len(output_paths['images'])} 个图片文件")
    print(f"{'='*60}\n")


def main():
    """
    Parse the command line and process one input.

    Returns:
        Process exit code: 0 on success (or a passing dry run), 1 otherwise.
    """
    parser = argparse.ArgumentParser(
        description="金融文档处理工具 v2",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  # 处理单个PDF文件
  python main_v2.py -i document.pdf -c config/bank_statement_mineru_vl.yaml

  # 处理图片目录
  python main_v2.py -i ./images/ -c config/bank_statement_paddle_vl.yaml

  # 开启全局debug模式(所有模块输出可视化图片)
  python main_v2.py -i doc.pdf -c config.yaml --debug

  # 开启特定模块的debug(精细控制)
  python main_v2.py -i doc.pdf -c config.yaml --debug-layout # 仅布局debug
  python main_v2.py -i doc.pdf -c config.yaml --debug-table # 仅表格debug
  python main_v2.py -i doc.pdf -c config.yaml --debug-layout --debug-table # 组合

  # 指定输出目录
  python main_v2.py -i doc.pdf -c config.yaml -o ./my_output/

  # 指定页面范围(PDF按页码,图片目录按排序位置)
  python main_v2.py -i doc.pdf -c config.yaml -p 1-5 # 处理第1-5页
  python main_v2.py -i doc.pdf -c config.yaml -p 3,7,10 # 处理第3、7、10页
  python main_v2.py -i doc.pdf -c config.yaml -p 1-5,8-10 # 处理第1-5、8-10页
  python main_v2.py -i doc.pdf -c config.yaml -p 5- # 从第5页到最后

  # 使用流式处理模式(节省内存,适合大文档)
  python main_v2.py -i large_doc.pdf -c config.yaml --streaming
"""
    )

    parser.add_argument(
        "--input", "-i",
        required=True,
        help="输入路径(PDF文件、图片文件或图片目录)"
    )
    parser.add_argument(
        "--config", "-c",
        required=True,
        help="配置文件路径"
    )
    parser.add_argument(
        "--output_dir", "-o",
        default="./output",
        help="输出目录(默认: ./output)"
    )
    # Optional: this flag only OVERRIDES the scene from the config file, and
    # every documented invocation omits it.
    parser.add_argument(
        "--scene", "-s",
        default=None,
        choices=["bank_statement", "financial_report"],
        help="场景类型(覆盖配置文件设置)"
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="开启全局debug模式(启用所有模块的调试输出)"
    )
    parser.add_argument(
        "--debug-layout",
        action="store_true",
        help="仅开启布局检测的debug输出"
    )
    parser.add_argument(
        "--debug-table",
        action="store_true",
        help="仅开启表格识别的debug输出"
    )
    parser.add_argument(
        "--debug-ocr",
        action="store_true",
        help="仅开启OCR识别的debug输出"
    )
    parser.add_argument(
        "--log_level",
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        help="日志级别(默认: INFO)"
    )
    parser.add_argument(
        "--log_file",
        help="日志文件路径"
    )
    parser.add_argument(
        "--dry_run",
        action="store_true",
        help="仅验证配置,不执行处理"
    )
    parser.add_argument(
        "--pages", "-p",
        help="页面范围(PDF按页码,图片目录按排序位置),如: 1-5,7,9-12"
    )
    parser.add_argument(
        "--streaming",
        action="store_true",
        help="使用流式处理模式(按页处理,立即保存,节省内存,适合大文档)"
    )

    args = parser.parse_args()

    setup_logging(args.log_level, args.log_file)

    # Dry run validates exactly once, so a failure is not logged twice.
    if args.dry_run:
        return 0 if _handle_dry_run(args) else 1

    if not _validate_arguments(args):
        return 1

    result = process_single_input(
        input_path=Path(args.input),
        config_path=Path(args.config),
        output_dir=Path(args.output_dir),
        debug=args.debug,
        debug_layout=args.debug_layout,
        debug_table=args.debug_table,
        debug_ocr=args.debug_ocr,
        scene=args.scene,
        page_range=args.pages,
        streaming=args.streaming
    )

    return 0 if result.get('success') else 1


if __name__ == "__main__":
    _print_environment_info()

    if len(sys.argv) == 1:
        print("ℹ️ 未提供命令行参数,使用默认配置运行...")

        # Default configuration (development/testing only).
        default_config = {
            # Test inputs
            # "input": "/Users/zhch158/workspace/data/流水分析/湛_平安银行图.pdf",
            # "output_dir": "./output/湛_平安银行图/bank_statement_yusys_v3",

            # "input": "/Users/zhch158/workspace/data/流水分析/张_微信图.pdf",
            # "output_dir": "./output/张_微信图/bank_statement_yusys_v4",

            # "input": "/Users/zhch158/workspace/data/流水分析/许_民生银行图.pdf",
            # "output_dir": "./output/许_民生银行图/bank_statement_yusys_v3",

            # "input": "/Users/zhch158/workspace/data/流水分析/康强_北京农村商业银行.pdf",
            # "output_dir": "./output/康强_北京农村商业银行/bank_statement_mineru_vl",

            # "input": "/Users/zhch158/workspace/repository.git/ocr_platform/ocr_tools/universal_doc_parser/tests/A用户_单元格扫描流水_page_002.png",
            # "output_dir": "./output/A用户_单元格扫描流水_bank_statement_yusys_v3",

            # "input": "/Users/zhch158/workspace/data/流水分析/B用户_扫描流水.pdf",
            # "output_dir": "/Users/zhch158/workspace/data/流水分析/B用户_扫描流水/bank_statement_yusys_v2",

            # "input": "/Users/zhch158/workspace/repository.git/ocr_platform/ocr_tools/universal_doc_parser/tests/2023年度报告母公司_page_005.png",
            # "input": "/Users/zhch158/workspace/repository.git/ocr_platform/ocr_tools/universal_doc_parser/tests/2023年度报告母公司_page_003.png",
            # "input": "/Users/zhch158/workspace/repository.git/ocr_platform/ocr_tools/universal_doc_parser/tests/2023年度报告母公司_page_003.png",
            # "input": "/Users/zhch158/workspace/repository.git/ocr_platform/ocr_tools/universal_doc_parser/tests/2023年度报告母公司_page_003_270_skew(-0.4).png",
            # "input": "/Users/zhch158/workspace/data/流水分析/2023年度报告母公司.pdf",
            # "output_dir": "./output/2023年度报告母公司/bank_statement_yusys_v3",
            # "output_dir": "/Users/zhch158/workspace/data/流水分析/2023年度报告母公司/bank_statement_yusys_v3",
            # "output_dir": "/Users/zhch158/workspace/data/流水分析/2023年度报告母公司/bank_statement_glm_vl",

            # "input": "/Users/zhch158/workspace/data/流水分析/2023年度报告母公司.pdf",
            # "output_dir": "/Users/zhch158/workspace/data/流水分析/2023年度报告母公司/bank_statement_yusys_v2",
            #
            # "input": "/Users/zhch158/workspace/data/流水分析/A用户_单元格扫描流水.pdf",
            # "output_dir": "/Users/zhch158/workspace/data/流水分析/A用户_单元格扫描流水/bank_statement_yusys_v3",

            # "input": "/Users/zhch158/workspace/repository.git/ocr_platform/ocr_tools/universal_doc_parser/tests/600916_中国黄金_2022年报_page_096.png",
            # "output_dir": "./output/600916_中国黄金_2022年报/bank_statement_yusys_v3",

            # "input": "/Users/zhch158/workspace/data/流水分析/600916_中国黄金_2022年报.pdf",
            # "output_dir": "./output/600916_中国黄金_2022年报/bank_statement_yusys_v3",

            # "input": "/Users/zhch158/workspace/data/流水分析/德_内蒙古银行照.pdf",
            # "output_dir": "./output/德_内蒙古银行照/bank_statement_yusys_v3",

            # "input": "/Users/zhch158/workspace/repository.git/ocr_platform/ocr_tools/universal_doc_parser/tests/提取自赤峰黄金2023年报.pdf",
            # "output_dir": "./output/提取自赤峰黄金2023年报/bank_statement_yusys_v3",

            # "input": "/Users/zhch158/workspace/data/流水分析/提取自赤峰黄金2023年报.pdf",
            # "output_dir": "./output/提取自赤峰黄金2023年报/bank_statement_yusys_v4",
            # "output_dir": "/Users/zhch158/workspace/data/流水分析/提取自赤峰黄金2023年报/bank_statement_yusys_v4",

            # "input": "/Users/zhch158/workspace/data/流水分析/施博深.pdf",
            # "output_dir": "/Users/zhch158/workspace/data/流水分析/施博深/bank_statement_yusys_v3",
            # "output_dir": "./output/施博深/bank_statement_smart_router",

            # "input": "/Users/zhch158/workspace/data/流水分析/施博深.wiredtable/施博深_page_020.png",
            # "output_dir": "./output/施博深/bank_statement_yusys_v3",

            # "input": "/Users/zhch158/workspace/data/流水分析/施博深.wiredtable",
            # "output_dir": "/Users/zhch158/workspace/data/流水分析/施博深/bank_statement_yusys_v3",

            # "input": "/Users/zhch158/workspace/data/流水分析/山西云集科技有限公司.pdf",
            # "output_dir": "/Users/zhch158/workspace/data/流水分析/山西云集科技有限公司/bank_statement_yusys_v3",

            # "input": "/Users/zhch158/workspace/data/OCBC/数据迁移_20260316173209_180_7.jpg",
            # "input": "/Users/zhch158/workspace/data/OCBC/微信图片_20260316173209_180_7.jpg",
            # "output_dir": "/Users/zhch158/workspace/data/OCBC/bank_statement_yusys_v4",

            # "input": "/Users/zhch158/workspace/data/流水分析/韩_中国银行图.pdf",
            # "output_dir": "/Users/zhch158/workspace/data/流水分析/韩_中国银行图/bank_statement_yusys_v4",

            # "input": "/Users/zhch158/workspace/data/流水分析/杨万益_福建农信.pdf",
            # "output_dir": "/Users/zhch158/workspace/data/流水分析/杨万益_福建农信/bank_statement_yusys_local",
            # "config": "./config/bank_statement_yusys_local.yaml",

            "input": "/Users/zhch158/workspace/data/流水分析/杨万益_福建农信.pdf",
            "output_dir": "/Users/zhch158/workspace/data/流水分析/杨万益_福建农信/bank_statement_paddle_vl_local",
            "config": "./config/bank_statement_paddle_vl_local.yaml",

            # Log file
            "log_file": "./output/logs/bank_statement_paddle_vl_local/process.log",

            # Config file
            # "config": "./config/bank_statement_yusys_v4.yaml",
            # "config": "./config/bank_statement_yusys_v3.yaml",
            # "config": "./config/bank_statement_smart_router.yaml",
            # "config": "./config/bank_statement_mineru_vl.yaml",
            # "config": "./config/bank_statement_yusys_v2.yaml",
            # "config": "./config/bank_statement_paddle_vl.yaml",

            # Scene
            "scene": "bank_statement",
            # "scene": "financial_report",

            # Page range (optional)
            # "pages": "1",  # only the first page
            # "pages": "1-3,5,7-10",  # selected pages
            # "pages": "83-109",  # selected pages

            "streaming": True,

            # Debug mode
            "debug": True,

            # Log level
            "log_level": "DEBUG",
        }

        # Build argv: True booleans become bare flags, False booleans are
        # omitted, everything else becomes "--key value".
        sys.argv = [sys.argv[0]]
        for key, value in default_config.items():
            if isinstance(value, bool):
                if value:
                    sys.argv.append(f"--{key}")
            else:
                sys.argv.extend([f"--{key}", str(value)])

    sys.exit(main())