流式处理模式说明.md 7.3 KB

流式处理模式说明

📋 概述

流式处理模式是对原有批量处理模式的优化,主要解决大文档处理时的内存占用问题。

原有模式(批量处理)

  • 处理方式:将所有页面读入内存,处理完成后统一保存
  • 内存占用:高(所有页面的图像、OCR结果、表格数据都在内存中)
  • 适用场景:小到中等文档(< 50页)

流式处理模式

  • 处理方式:按页处理,处理完一页立即保存并释放内存
  • 内存占用:低(只保留当前页面的数据)
  • 适用场景:大文档(> 50页)或内存受限环境

🎯 核心优势

1. 内存优化

批量模式内存占用:
- 100页文档 × 每页约50MB = 5GB内存

流式模式内存占用:
- 当前页约50MB + 元数据约10MB = 60MB内存

2. 容错性提升

  • 批量模式:处理到第99页出错,前98页数据丢失
  • 流式模式:处理到第99页出错,前98页已保存,可继续处理

3. 实时反馈

  • 可以实时查看已处理页面的结果
  • 不需要等待所有页面处理完成

🔧 使用方法

命令行使用

# 使用流式处理模式
python main_v2.py -i large_doc.pdf -c config.yaml --streaming

# 批量处理模式(默认)
python main_v2.py -i small_doc.pdf -c config.yaml

代码使用

from core.pipeline_manager_v2_streaming import StreamingDocPipeline

# 初始化流式处理流水线
pipeline = StreamingDocPipeline(config_path, output_dir)

# 处理文档
results = pipeline.process_document_streaming(
    document_path="large_doc.pdf",
    page_range="1-100",
    output_config={
        'save_json': True,
        'save_markdown': True,
        'save_page_json': True,
        'normalize_numbers': True,
        'merge_cross_page_tables': True,
    }
)

📊 处理流程对比

批量处理模式

1. 加载所有页面到内存
2. 处理第1页 → 存储到 results['pages'][0]
3. 处理第2页 → 存储到 results['pages'][1]
4. ...
5. 处理第N页 → 存储到 results['pages'][N-1]
6. 统一保存所有结果
7. 生成完整Markdown

流式处理模式

1. 初始化Markdown文件(流式写入)
2. 处理第1页
   → 立即保存 page_001.json
   → 立即保存图片元素
   → 写入Markdown(单页内容)
   → 释放内存
3. 处理第2页
   → 立即保存 page_002.json
   → ...
4. ...
5. 处理第N页
   → 立即保存 page_NNN.json
   → ...
6. 关闭Markdown文件
7. 从已保存的JSON文件加载
8. 跨页表格合并
9. 重新生成完整Markdown(包含合并后的表格)
10. 生成middle.json

📁 输出文件结构

批量模式输出

output/
├── doc_name_middle.json          # 完整middle.json
├── doc_name.md                   # 完整Markdown
├── doc_name_page_001.json       # 第1页JSON
├── doc_name_page_002.json       # 第2页JSON
└── images/                       # 图片元素

流式模式输出

output/
├── doc_name_middle.json          # 完整middle.json(最后生成)
├── doc_name.md                   # 完整Markdown(包含合并表格)
├── doc_name_page_001.json       # 第1页JSON(立即保存)
├── doc_name_page_002.json       # 第2页JSON(立即保存)
├── images/                       # 图片元素(立即保存)
└── _temp_pages/                  # 临时JSON文件(最后清理)
    ├── page_001.json
    └── page_002.json

⚙️ 配置选项

output_config 参数

参数 类型 默认值 说明
save_json bool True 是否生成middle.json
save_markdown bool True 是否生成Markdown
save_page_json bool True 是否保存每页JSON
save_images bool True 是否保存图片元素
save_layout_image bool False 是否保存layout可视化图片
save_ocr_image bool False 是否保存OCR可视化图片
normalize_numbers bool True 是否标准化金额数字
merge_cross_page_tables bool True 是否合并跨页表格
cleanup_temp_files bool True 是否清理临时文件

🔍 性能对比

内存占用

文档页数 批量模式 流式模式 节省
10页 ~500MB ~60MB 88%
50页 ~2.5GB ~60MB 97.6%
100页 ~5GB ~60MB 98.8%
500页 ~25GB ~60MB 99.76%

处理时间

  • 批量模式:略快(无需重复加载JSON)
  • 流式模式:略慢(需要保存和重新加载JSON)

差异:通常 < 5%,对于大文档可以忽略

⚠️ 注意事项

1. 跨页表格合并

  • 流式模式需要重新加载所有页面JSON才能合并跨页表格
  • 这会导致额外的I/O开销,但内存占用仍然很低

2. 临时文件

  • 流式模式会在 _temp_pages/ 目录创建临时JSON文件
  • 处理完成后会自动清理(如果 cleanup_temp_files=True

3. Markdown生成

  • 流式模式会先生成一个临时Markdown(边处理边写入)
  • 跨页表格合并后,会重新生成完整的Markdown

4. 错误恢复

  • 如果处理中断,已保存的页面JSON可以用于恢复
  • 可以指定 page_range 参数继续处理剩余页面

🎯 使用建议

使用流式模式

  • ✅ 文档页数 > 50页
  • ✅ 内存受限环境(< 8GB RAM)
  • ✅ 需要实时查看处理结果
  • ✅ 需要容错性(处理中断后可恢复)

使用批量模式

  • ✅ 文档页数 < 50页
  • ✅ 内存充足(> 16GB RAM)
  • ✅ 需要最快处理速度
  • ✅ 不需要实时查看结果

📝 示例

处理大文档(100页)

# 使用流式模式,节省内存
python main_v2.py \
  -i large_report.pdf \
  -c config/bank_statement_mineru_v2.yaml \
  --streaming \
  --output_dir ./output/large_report_streaming

处理指定页面范围

# 流式模式 + 页面范围
python main_v2.py \
  -i large_report.pdf \
  -c config.yaml \
  --streaming \
  -p 1-50  # 只处理前50页

继续处理剩余页面

# 如果之前处理到第50页中断,可以继续处理
python main_v2.py \
  -i large_report.pdf \
  -c config.yaml \
  --streaming \
  -p 51-  # 从第51页到最后

🔄 迁移指南

从批量模式迁移到流式模式

  1. 添加 --streaming 参数

    # 之前
    python main_v2.py -i doc.pdf -c config.yaml
       
    # 之后
    python main_v2.py -i doc.pdf -c config.yaml --streaming
    
  2. 代码修改

    # 之前
    from core.pipeline_manager_v2 import EnhancedDocPipeline
    pipeline = EnhancedDocPipeline(config_path)
    results = pipeline.process_document(document_path)
       
    # 之后
    from core.pipeline_manager_v2_streaming import StreamingDocPipeline
    pipeline = StreamingDocPipeline(config_path, output_dir)
    results = pipeline.process_document_streaming(
       document_path,
       output_config=output_config
    )
    
  3. 输出格式保持一致

    • 流式模式和批量模式的输出格式完全相同
    • 可以直接替换使用,无需修改后续处理代码

📚 相关文档