|
|
vor 1 Monat | |
|---|---|---|
| .. | ||
| README.md | vor 1 Monat | |
| api_server.py | vor 1 Monat | |
| client_example.py | vor 1 Monat | |
| litserve_worker.py | vor 1 Monat | |
| requirements.txt | vor 1 Monat | |
| start_all.py | vor 1 Monat | |
| task_db.py | vor 1 Monat | |
| task_scheduler.py | vor 1 Monat | |
天枢 - 企业级多GPU文档解析服务
结合 SQLite 任务队列 + LitServe GPU负载均衡的最佳方案
客户端请求 → FastAPI Server (立即返回 task_id)
↓
SQLite 任务队列 (并发安全)
↓
LitServe Worker Pool (主动拉取 + GPU自动负载均衡)
↓
MinerU / MarkItDown 解析
↓
Task Scheduler (可选监控组件)
架构特点:
cd projects/mineru_tianshu
pip install -r requirements.txt
支持的文件格式:
- 📄 PDF 和图片 (.pdf, .png, .jpg, .jpeg, .bmp, .tiff, .webp) - 使用 MinerU 解析(GPU 加速)
- 📊 其他所有格式 (Office、HTML、文本等) - 使用 MarkItDown 解析(快速处理)
- Office: .docx, .doc, .xlsx, .xls, .pptx, .ppt
- 网页: .html, .htm
- 文本: .txt, .md, .csv, .json, .xml 等
# 一键启动所有服务(推荐)
python start_all.py
# 或自定义配置
python start_all.py --workers-per-device 2 --devices 0,1
Windows 用户注意: 项目已针对 Windows 的 multiprocessing 进行优化,可直接运行。
方式A: 浏览器访问 API 文档
http://localhost:8000/docs
方式B: Python 客户端
python client_example.py
方式C: cURL 命令
# 提交任务
curl -X POST http://localhost:8000/api/v1/tasks/submit \
-F "file=@document.pdf" \
-F "lang=ch"
# 查询状态(任务完成后自动返回解析内容)
curl http://localhost:8000/api/v1/tasks/{task_id}
# 查询状态并上传图片到MinIO
curl http://localhost:8000/api/v1/tasks/{task_id}?upload_images=true
mineru_tianshu/
├── task_db.py # 数据库管理 (并发安全,支持清理)
├── api_server.py # API 服务器 (自动返回内容)
├── litserve_worker.py # Worker Pool (主动拉取 + 双解析器)
├── task_scheduler.py # 任务调度器 (可选监控)
├── start_all.py # 启动脚本
├── client_example.py # 客户端示例
└── requirements.txt # 依赖配置
核心组件说明:
task_db.py: 使用原子操作保证并发安全,支持旧任务清理api_server.py: 查询接口自动返回Markdown内容,支持MinIO图片上传litserve_worker.py: Worker主动循环拉取任务,支持MinerU和MarkItDown双解析task_scheduler.py: 可选组件,仅用于监控和健康检查(默认5分钟监控,15分钟健康检查)import requests
import time
# 提交文档
with open('document.pdf', 'rb') as f:
response = requests.post(
'http://localhost:8000/api/v1/tasks/submit',
files={'file': f},
data={'lang': 'ch', 'priority': 0}
)
task_id = response.json()['task_id']
print(f"✅ 任务已提交: {task_id}")
# 轮询等待完成
while True:
response = requests.get(f'http://localhost:8000/api/v1/tasks/{task_id}')
result = response.json()
if result['status'] == 'completed':
# v2.0 新特性: 任务完成后自动返回解析内容
if result.get('data'):
content = result['data']['content']
print(f"✅ 解析完成,内容长度: {len(content)} 字符")
print(f" 解析方法: {result['data'].get('parser', 'Unknown')}")
# 保存结果
with open('output.md', 'w', encoding='utf-8') as f:
f.write(content)
else:
# 结果文件已被清理
print(f"⚠️ 任务完成但结果文件已清理: {result.get('message', '')}")
break
elif result['status'] == 'failed':
print(f"❌ 失败: {result['error_message']}")
break
print(f"⏳ 处理中... 状态: {result['status']}")
time.sleep(2)
import requests
task_id = "your-task-id"
# v2.0: 查询时自动返回内容,同时可选上传图片到 MinIO
response = requests.get(
f'http://localhost:8000/api/v1/tasks/{task_id}',
params={'upload_images': True} # 启用图片上传
)
result = response.json()
if result['status'] == 'completed' and result.get('data'):
# 图片已替换为 MinIO URL (HTML img 标签格式)
content = result['data']['content']
images_uploaded = result['data']['images_uploaded']
print(f"✅ 图片已上传到 MinIO: {images_uploaded}")
print(f" 内容长度: {len(content)} 字符")
# 保存包含 MinIO 图片链接的 Markdown
with open('output_with_cloud_images.md', 'w', encoding='utf-8') as f:
f.write(content)
import requests
import concurrent.futures
files = ['doc1.pdf', 'report.docx', 'data.xlsx']
def process_file(file_path):
# 提交任务
with open(file_path, 'rb') as f:
response = requests.post(
'http://localhost:8000/api/v1/tasks/submit',
files={'file': f}
)
return response.json()['task_id']
# 并发提交
with concurrent.futures.ThreadPoolExecutor() as executor:
task_ids = list(executor.map(process_file, files))
print(f"✅ 已提交 {len(task_ids)} 个任务")
# 运行完整示例
python client_example.py
# 运行特定示例
python client_example.py single # 单任务
python client_example.py batch # 批量任务
python client_example.py priority # 优先级队列
python start_all.py [选项]
选项:
--output-dir PATH 输出目录 (默认: /tmp/mineru_tianshu_output)
--api-port PORT API端口 (默认: 8000)
--worker-port PORT Worker端口 (默认: 9000)
--accelerator TYPE 加速器类型: auto/cuda/cpu/mps (默认: auto)
--workers-per-device N 每个GPU的worker数 (默认: 1)
--devices DEVICES 使用的GPU设备 (默认: auto,使用所有GPU)
--poll-interval SECONDS Worker拉取任务间隔 (默认: 0.5秒)
--enable-scheduler 启用可选的任务调度器 (默认: 不启动)
--monitor-interval SECONDS 调度器监控间隔 (默认: 300秒=5分钟)
--cleanup-old-files-days N 清理N天前的结果文件 (默认: 7天, 0=禁用)
新增功能说明:
--poll-interval: Worker空闲时拉取任务的频率,默认0.5秒响应极快--enable-scheduler: 是否启动调度器(可选),仅用于监控和健康检查--monitor-interval: 调度器日志输出频率,建议5-10分钟避免刷屏--cleanup-old-files-days: 自动清理旧结果文件但保留数据库记录# 基础启动(推荐)
python start_all.py
# CPU模式(无GPU或测试)
python start_all.py --accelerator cpu
# GPU模式: 24GB显卡,每卡2个worker
python start_all.py --accelerator cuda --workers-per-device 2
# 指定GPU: 只使用GPU 0和1
python start_all.py --accelerator cuda --devices 0,1
# 启用监控调度器(可选)
python start_all.py --enable-scheduler --monitor-interval 300
# 调整Worker拉取频率(高负载场景)
python start_all.py --poll-interval 1.0
# 禁用旧文件清理(保留所有结果)
python start_all.py --cleanup-old-files-days 0
# 完整配置示例
python start_all.py \
--accelerator cuda \
--devices 0,1 \
--workers-per-device 2 \
--poll-interval 0.5 \
--enable-scheduler \
--monitor-interval 300 \
--cleanup-old-files-days 7
# Mac M系列芯片
python start_all.py --accelerator mps
如需使用图片上传到 MinIO 功能:
export MINIO_ENDPOINT="your-endpoint.com"
export MINIO_ACCESS_KEY="your-access-key"
export MINIO_SECRET_KEY="your-secret-key"
export MINIO_BUCKET="your-bucket"
| 后端 | 显存要求 | 推荐配置 |
|---|---|---|
| pipeline | 6GB+ | RTX 2060 以上 |
| vlm-transformers | 8GB+ | RTX 3060 以上 |
| vlm-vllm-engine | 8GB+ | RTX 4070 以上 |
POST /api/v1/tasks/submit
参数:
file: 文件 (必需)
backend: pipeline | vlm-transformers | vlm-vllm-engine (默认: pipeline)
lang: ch | en | korean | japan | ... (默认: ch)
priority: 0-100 (数字越大越优先,默认: 0)
GET /api/v1/tasks/{task_id}?upload_images=false
参数:
upload_images: 是否上传图片到 MinIO (默认: false)
返回:
- status: pending | processing | completed | failed
- data: 任务完成后**自动返回** Markdown 内容
- markdown_file: 文件名
- content: 完整的 Markdown 内容
- images_uploaded: 是否已上传图片
- has_images: 是否包含图片
- message: 如果结果文件已清理会提示
注意:
- v2.0 新特性: 完成的任务会自动返回内容,无需额外请求
- 如果结果文件已被清理(超过保留期),data 为 null 但任务记录仍可查询
GET /api/v1/queue/stats
返回: 各状态任务数量统计
DELETE /api/v1/tasks/{task_id}
只能取消 pending 状态的任务
重置超时任务
POST /api/v1/admin/reset-stale?timeout_minutes=60
将超时的 processing 任务重置为 pending
清理旧任务
POST /api/v1/admin/cleanup?days=7
仅用于手动触发清理(自动清理会每24小时执行一次)
检查GPU
nvidia-smi # 应显示GPU信息
检查依赖
pip list | grep -E "(mineru|litserve|torch)"
⚠️ 重要: Worker 现在是主动拉取模式,不需要调度器触发!
检查 Worker 是否运行
# Windows
tasklist | findstr python
# Linux/Mac
ps aux | grep litserve_worker
检查 Worker 健康状态
curl -X POST http://localhost:9000/predict \
-H "Content-Type: application/json" \
-d '{"action":"health"}'
查看数据库状态
python -c "from task_db import TaskDB; db = TaskDB(); print(db.get_queue_stats())"
减少worker数量
python start_all.py --workers-per-device 1
设置显存限制
export MINERU_VIRTUAL_VRAM_SIZE=6
python start_all.py
指定特定GPU
# 只使用GPU 0
python start_all.py --devices 0
💡 提示: 新版本已修复多卡显存占用问题,通过设置
CUDA_VISIBLE_DEVICES确保每个进程只使用分配的GPU
查看占用
# Windows
netstat -ano | findstr :8000
# Linux/Mac
lsof -i :8000
使用其他端口
python start_all.py --api-port 8080 --worker-port 9090
查询任务状态
curl http://localhost:8000/api/v1/tasks/{task_id}
说明: 如果返回 result files have been cleaned up,说明结果文件已被清理(默认7天后)
解决方案:
# 延长保留时间为30天
python start_all.py --cleanup-old-files-days 30
# 或禁用自动清理
python start_all.py --cleanup-old-files-days 0
症状: 同一个任务被多个 worker 处理
原因: 这不应该发生,数据库使用了原子操作防止重复
排查:
# 检查是否有多个 TaskDB 实例连接不同的数据库文件
# 确保所有组件使用同一个 mineru_tianshu.db
1. Worker 主动拉取模式
2. 数据库并发安全增强
BEGIN IMMEDIATE 和原子操作3. 调度器变为可选
4. 结果文件清理功能
5. API 自动返回内容
6. 多GPU显存优化
CUDA_VISIBLE_DEVICES 隔离无需修改代码,只需注意:
--cleanup-old-files-days 0data 字段包含完整内容| 指标 | v1.x | v2.0 | 提升 |
|---|---|---|---|
| 任务响应延迟※ | 5-10秒 (调度器轮询) | 0.5秒 (Worker主动拉取) | 10-20倍 |
| 并发安全性 | 基础锁机制 | 原子操作 + 状态检查 | 可靠性提升 |
| 多GPU效率 | 有时会出现显存冲突 | 完全隔离,无冲突 | 稳定性提升 |
| 系统开销 | 调度器持续运行 | 可选监控(5分钟) | 资源节省 |
※ 任务响应延迟指任务添加到被 Worker 开始处理的时间间隔。v1.x 主要受调度器轮询间隔影响,非测量端到端处理时间。实际端到端响应时间还包括任务类型和系统负载所有因子。
mineru[core]>=2.5.0 # MinerU 核心
fastapi>=0.115.0 # Web 框架
litserve>=0.2.0 # GPU 负载均衡
markitdown>=0.1.3 # Office 文档解析
minio>=7.2.0 # MinIO 对象存储
欢迎提交 Issue 和 Pull Request!
遵循 MinerU 主项目许可证
天枢 (Tianshu) - 企业级多 GPU 文档解析服务 ⚡️
北斗第一星,寓意核心调度能力