# 流式处理模式说明 ## 📋 概述 流式处理模式是对原有批量处理模式的优化,主要解决大文档处理时的内存占用问题。 ### 原有模式(批量处理) - **处理方式**:将所有页面读入内存,处理完成后统一保存 - **内存占用**:高(所有页面的图像、OCR结果、表格数据都在内存中) - **适用场景**:小到中等文档(< 50页) ### 流式处理模式 - **处理方式**:按页处理,处理完一页立即保存并释放内存 - **内存占用**:低(只保留当前页面的数据) - **适用场景**:大文档(> 50页)或内存受限环境 ## 🎯 核心优势 ### 1. **内存优化** ``` 批量模式内存占用: - 100页文档 × 每页约50MB = 5GB内存 流式模式内存占用: - 当前页约50MB + 元数据约10MB = 60MB内存 ``` ### 2. **容错性提升** - 批量模式:处理到第99页出错,前98页数据丢失 - 流式模式:处理到第99页出错,前98页已保存,可继续处理 ### 3. **实时反馈** - 可以实时查看已处理页面的结果 - 不需要等待所有页面处理完成 ## 🔧 使用方法 ### 命令行使用 ```bash # 使用流式处理模式 python main_v2.py -i large_doc.pdf -c config.yaml --streaming # 批量处理模式(默认) python main_v2.py -i small_doc.pdf -c config.yaml ``` ### 代码使用 ```python from core.pipeline_manager_v2_streaming import StreamingDocPipeline # 初始化流式处理流水线 pipeline = StreamingDocPipeline(config_path, output_dir) # 处理文档 results = pipeline.process_document_streaming( document_path="large_doc.pdf", page_range="1-100", output_config={ 'save_json': True, 'save_markdown': True, 'save_page_json': True, 'normalize_numbers': True, 'merge_cross_page_tables': True, } ) ``` ## 📊 处理流程对比 ### 批量处理模式 ``` 1. 加载所有页面到内存 2. 处理第1页 → 存储到 results['pages'][0] 3. 处理第2页 → 存储到 results['pages'][1] 4. ... 5. 处理第N页 → 存储到 results['pages'][N-1] 6. 统一保存所有结果 7. 生成完整Markdown ``` ### 流式处理模式 ``` 1. 初始化Markdown文件(流式写入) 2. 处理第1页 → 立即保存 page_001.json → 立即保存图片元素 → 写入Markdown(单页内容) → 释放内存 3. 处理第2页 → 立即保存 page_002.json → ... 4. ... 5. 处理第N页 → 立即保存 page_NNN.json → ... 6. 关闭Markdown文件 7. 从已保存的JSON文件加载 8. 跨页表格合并 9. 重新生成完整Markdown(包含合并后的表格) 10. 生成middle.json ``` ## 📁 输出文件结构 ### 批量模式输出 ``` output/ ├── doc_name_middle.json # 完整middle.json ├── doc_name.md # 完整Markdown ├── doc_name_page_001.json # 第1页JSON ├── doc_name_page_002.json # 第2页JSON └── images/ # 图片元素 ``` ### 流式模式输出 ``` output/ ├── doc_name_middle.json # 完整middle.json(最后生成) ├── doc_name.md # 完整Markdown(包含合并表格) ├── doc_name_page_001.json # 第1页JSON(立即保存) ├── doc_name_page_002.json # 第2页JSON(立即保存) ├── images/ # 图片元素(立即保存) └── _temp_pages/ # 临时JSON文件(最后清理) ├── page_001.json └── page_002.json ``` ## ⚙️ 配置选项 ### output_config 参数 | 参数 | 类型 | 默认值 | 说明 | |-----|------|--------|------| | `save_json` | bool | True | 是否生成middle.json | | `save_markdown` | bool | True | 是否生成Markdown | | `save_page_json` | bool | True | 是否保存每页JSON | | `save_images` | bool | True | 是否保存图片元素 | | `save_layout_image` | bool | False | 是否保存layout可视化图片 | | `save_ocr_image` | bool | False | 是否保存OCR可视化图片 | | `normalize_numbers` | bool | True | 是否标准化金额数字 | | `merge_cross_page_tables` | bool | True | 是否合并跨页表格 | | `cleanup_temp_files` | bool | True | 是否清理临时文件 | ## 🔍 性能对比 ### 内存占用 | 文档页数 | 批量模式 | 流式模式 | 节省 | |---------|---------|---------|------| | 10页 | ~500MB | ~60MB | 88% | | 50页 | ~2.5GB | ~60MB | 97.6% | | 100页 | ~5GB | ~60MB | 98.8% | | 500页 | ~25GB | ~60MB | 99.76% | ### 处理时间 - **批量模式**:略快(无需重复加载JSON) - **流式模式**:略慢(需要保存和重新加载JSON) **差异**:通常 < 5%,对于大文档可以忽略 ## ⚠️ 注意事项 ### 1. 跨页表格合并 - 流式模式需要重新加载所有页面JSON才能合并跨页表格 - 这会导致额外的I/O开销,但内存占用仍然很低 ### 2. 临时文件 - 流式模式会在 `_temp_pages/` 目录创建临时JSON文件 - 处理完成后会自动清理(如果 `cleanup_temp_files=True`) ### 3. Markdown生成 - 流式模式会先生成一个临时Markdown(边处理边写入) - 跨页表格合并后,会重新生成完整的Markdown ### 4. 错误恢复 - 如果处理中断,已保存的页面JSON可以用于恢复 - 可以指定 `page_range` 参数继续处理剩余页面 ## 🎯 使用建议 ### 使用流式模式 - ✅ 文档页数 > 50页 - ✅ 内存受限环境(< 8GB RAM) - ✅ 需要实时查看处理结果 - ✅ 需要容错性(处理中断后可恢复) ### 使用批量模式 - ✅ 文档页数 < 50页 - ✅ 内存充足(> 16GB RAM) - ✅ 需要最快处理速度 - ✅ 不需要实时查看结果 ## 📝 示例 ### 处理大文档(100页) ```bash # 使用流式模式,节省内存 python main_v2.py \ -i large_report.pdf \ -c config/bank_statement_mineru_v2.yaml \ --streaming \ --output_dir ./output/large_report_streaming ``` ### 处理指定页面范围 ```bash # 流式模式 + 页面范围 python main_v2.py \ -i large_report.pdf \ -c config.yaml \ --streaming \ -p 1-50 # 只处理前50页 ``` ### 继续处理剩余页面 ```bash # 如果之前处理到第50页中断,可以继续处理 python main_v2.py \ -i large_report.pdf \ -c config.yaml \ --streaming \ -p 51- # 从第51页到最后 ``` ## 🔄 迁移指南 ### 从批量模式迁移到流式模式 1. **添加 `--streaming` 参数**: ```bash # 之前 python main_v2.py -i doc.pdf -c config.yaml # 之后 python main_v2.py -i doc.pdf -c config.yaml --streaming ``` 2. **代码修改**: ```python # 之前 from core.pipeline_manager_v2 import EnhancedDocPipeline pipeline = EnhancedDocPipeline(config_path) results = pipeline.process_document(document_path) # 之后 from core.pipeline_manager_v2_streaming import StreamingDocPipeline pipeline = StreamingDocPipeline(config_path, output_dir) results = pipeline.process_document_streaming( document_path, output_config=output_config ) ``` 3. **输出格式保持一致**: - 流式模式和批量模式的输出格式完全相同 - 可以直接替换使用,无需修改后续处理代码 ## 📚 相关文档 - [模型统一框架.md](./模型统一框架.md) - 整体架构说明 - [OCR识别差异分析与改进方案.md](./OCR识别差异分析与改进方案.md) - OCR优化说明