Browse source

Merge pull request #3742 from magicyuan876/master

feat: MinerU Tianshu - an out-of-the-box multi-GPU document parsing service
Xiaomeng Zhao 1 month ago
parent commit
504fe6ada3

+ 380 - 0
projects/mineru_tianshu/README.md

@@ -0,0 +1,380 @@
+# MinerU Tianshu (天枢)
+
+> Tianshu - an enterprise-grade multi-GPU document parsing service  
+> Combines a SQLite task queue with LitServe GPU load balancing
+
+## 🌟 Core Features
+
+- ✅ **Asynchronous processing** - clients get a response immediately (~100 ms) instead of waiting for parsing to finish
+- ✅ **Task persistence** - tasks are stored in SQLite and survive service restarts
+- ✅ **GPU load balancing** - LitServe schedules work automatically for optimal resource utilization
+- ✅ **Priority queue** - important tasks are processed first
+- ✅ **Real-time status** - query task progress and state at any time
+- ✅ **RESTful API** - accessible from any programming language
+- ✅ **Smart parser selection** - MinerU for PDFs and images, MarkItDown for all other formats
+- ✅ **Content retrieval** - fetch the parsed Markdown, optionally uploading images to MinIO
+
+## 🏗️ System Architecture
+
+```
+Client request → FastAPI Server (returns task_id immediately)
+                    ↓
+              SQLite task queue
+                    ↓
+            Task Scheduler
+                    ↓
+         LitServe Worker Pool (automatic GPU load balancing)
+                    ↓
+              MinerU core processing
+```
+
+## 🚀 Quick Start
+
+### 1. Install Dependencies
+
+```bash
+cd projects/mineru_tianshu
+pip install -r requirements.txt
+```
+
+> **Supported file formats**:
+> - 📄 **PDFs and images** (.pdf, .png, .jpg, .jpeg, .bmp, .tiff, .webp) - parsed with MinerU (GPU-accelerated)
+> - 📊 **All other formats** (Office, HTML, text, etc.) - parsed with MarkItDown (fast CPU path)
+>   - Office: .docx, .doc, .xlsx, .xls, .pptx, .ppt
+>   - Web: .html, .htm
+>   - Text: .txt, .md, .csv, .json, .xml, etc.
+
+### 2. Start the Services
+
+```bash
+# Start all services with one command (recommended)
+python start_all.py
+
+# Or with custom settings
+python start_all.py --workers-per-device 2 --devices 0,1
+```
+
+> **Note for Windows users**: the project is tuned for Windows multiprocessing and runs out of the box.
+
+### 3. Use the API
+
+**Option A: browse the API docs**
+```
+http://localhost:8000/docs
+```
+
+**Option B: Python client**
+```bash
+python client_example.py
+```
+
+**Option C: cURL**
+```bash
+# Submit a task
+curl -X POST http://localhost:8000/api/v1/tasks/submit \
+  -F "file=@document.pdf" \
+  -F "lang=ch"
+
+# Check status (parsed content is returned automatically once the task completes)
+curl http://localhost:8000/api/v1/tasks/{task_id}
+
+# Check status and upload images to MinIO
+curl "http://localhost:8000/api/v1/tasks/{task_id}?upload_images=true"
+```
+
+## 📁 Project Structure
+
+```
+mineru_tianshu/
+├── task_db.py              # Database management
+├── api_server.py           # API server
+├── litserve_worker.py      # Worker pool (MinerU + MarkItDown)
+├── task_scheduler.py       # Task scheduler
+├── start_all.py            # Startup script
+├── client_example.py       # Client examples
+└── requirements.txt        # Dependencies
+```
+
+## 📚 Usage Examples
+
+### Example 1: Submit a Task and Wait for the Result
+
+```python
+import requests
+import time
+
+# Submit the document
+with open('document.pdf', 'rb') as f:
+    response = requests.post(
+        'http://localhost:8000/api/v1/tasks/submit',
+        files={'file': f},
+        data={'lang': 'ch', 'priority': 0}
+    )
+    task_id = response.json()['task_id']
+    print(f"✅ Task submitted: {task_id}")
+
+# Poll until the task finishes
+while True:
+    response = requests.get(f'http://localhost:8000/api/v1/tasks/{task_id}')
+    result = response.json()
+    
+    if result['status'] == 'completed':
+        # The parsed content is returned automatically on completion
+        if result.get('data'):
+            content = result['data']['content']
+            print(f"✅ Parsing finished, content length: {len(content)} characters")
+            
+            # Save the result
+            with open('output.md', 'w', encoding='utf-8') as f:
+                f.write(content)
+        break
+    elif result['status'] == 'failed':
+        print(f"❌ Failed: {result['error_message']}")
+        break
+    
+    print(f"⏳ Processing... status: {result['status']}")
+    time.sleep(2)
+```
+
+### Example 2: Upload Images to MinIO
+
+```python
+import requests
+
+task_id = "your-task-id"
+
+# Query the task and upload its images to MinIO
+response = requests.get(
+    f'http://localhost:8000/api/v1/tasks/{task_id}',
+    params={'upload_images': True}
+)
+
+result = response.json()
+if result['status'] == 'completed' and result.get('data'):
+    # Image references have been replaced with MinIO URLs
+    content = result['data']['content']
+    print(f"✅ Images uploaded: {result['data']['images_uploaded']}")
+    
+    with open('output_with_cloud_images.md', 'w', encoding='utf-8') as f:
+        f.write(content)
+```
+
+### Example 3: Batch Processing
+
+```python
+import requests
+import concurrent.futures
+
+files = ['doc1.pdf', 'report.docx', 'data.xlsx']
+
+def process_file(file_path):
+    # Submit a task
+    with open(file_path, 'rb') as f:
+        response = requests.post(
+            'http://localhost:8000/api/v1/tasks/submit',
+            files={'file': f}
+        )
+    return response.json()['task_id']
+
+# Submit concurrently
+with concurrent.futures.ThreadPoolExecutor() as executor:
+    task_ids = list(executor.map(process_file, files))
+    print(f"✅ Submitted {len(task_ids)} tasks")
+```
+
+### Example 4: Use the Built-in Client
+
+```bash
+# Run all examples
+python client_example.py
+
+# Run a specific example
+python client_example.py single   # single task
+python client_example.py batch    # batch tasks
+python client_example.py priority # priority queue
+python client_example.py monitor  # queue monitoring
+```
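Under the hood, priority scheduling over SQLite usually reduces to a single `ORDER BY` clause on the queue query. A hypothetical sketch, using an in-memory database with an illustrative schema (not the project's actual `task_db.py`):

```python
import sqlite3

# Illustrative queue table: task_id, status, priority, submission order
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE tasks (task_id TEXT, status TEXT, priority INTEGER, created_at INTEGER)')
rows = [('a', 'pending', 0, 1), ('b', 'pending', 10, 2), ('c', 'pending', 0, 3)]
conn.executemany('INSERT INTO tasks VALUES (?, ?, ?, ?)', rows)

# Pop the next task: highest priority first, ties broken by submission order
row = conn.execute(
    "SELECT task_id FROM tasks WHERE status = 'pending' "
    "ORDER BY priority DESC, created_at ASC LIMIT 1"
).fetchone()
print(row[0])  # → b
```

With this ordering, task `b` (priority 10) is picked before `a` and `c` even though it was submitted after `a`.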
+
+## ⚙️ Configuration
+
+### Startup Options
+
+```bash
+python start_all.py [options]
+
+Options:
+  --output-dir PATH         Output directory (default: /tmp/mineru_tianshu_output)
+  --api-port PORT           API port (default: 8000)
+  --worker-port PORT        Worker port (default: 9000)
+  --accelerator TYPE        Accelerator type: auto/cuda/cpu/mps (default: auto)
+  --workers-per-device N    Workers per GPU (default: 1)
+  --devices DEVICES         GPU devices to use (default: auto, i.e. all GPUs)
+```
+
+### Configuration Examples
+
+```bash
+# CPU mode (no GPU, or for testing)
+python start_all.py --accelerator cpu
+
+# GPU mode: 24 GB card, 2 workers per card
+python start_all.py --accelerator cuda --workers-per-device 2
+
+# Specific GPUs: use only GPU 0 and 1
+python start_all.py --accelerator cuda --devices 0,1
+
+# Custom ports
+python start_all.py --api-port 8080 --worker-port 9090
+
+# Apple Silicon (M-series)
+python start_all.py --accelerator mps
+```
+
+### MinIO Configuration (Optional)
+
+To enable uploading images to MinIO, set:
+
+```bash
+export MINIO_ENDPOINT="your-endpoint.com"
+export MINIO_ACCESS_KEY="your-access-key"
+export MINIO_SECRET_KEY="your-secret-key"
+export MINIO_BUCKET="your-bucket"
+```
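A quick way to confirm the configuration before launching the service is to check that all four variables are set. A minimal sketch; the variable names match the ones the API server reads from the environment:

```python
import os

# The server reads these at startup; this helper only reports
# which ones are missing or empty before you launch the service.
REQUIRED_VARS = ['MINIO_ENDPOINT', 'MINIO_ACCESS_KEY', 'MINIO_SECRET_KEY', 'MINIO_BUCKET']

def check_minio_config() -> list:
    """Return the names of any MinIO variables that are missing or empty."""
    return [name for name in REQUIRED_VARS if not os.getenv(name)]

missing = check_minio_config()
if missing:
    print(f"⚠️  MinIO upload disabled, missing: {', '.join(missing)}")
else:
    print("✅ MinIO configuration looks complete")
```

If any variable is missing, image upload requests will fail at upload time, so it is cheaper to catch this up front.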
+
+### Hardware Requirements
+
+| Backend | VRAM | Recommended GPU |
+|------|---------|---------|
+| pipeline | 6 GB+ | RTX 2060 or better |
+| vlm-transformers | 8 GB+ | RTX 3060 or better |
+| vlm-vllm-engine | 8 GB+ | RTX 4070 or better |
+
+## 📡 API Reference
+
+> Full documentation: http://localhost:8000/docs
+
+### 1. Submit a Task
+```http
+POST /api/v1/tasks/submit
+
+Parameters:
+  file: document file (required)
+  backend: pipeline | vlm-transformers | vlm-vllm-engine (default: pipeline)
+  lang: ch | en | korean | japan | ... (default: ch)
+  method: auto | txt | ocr (default: auto)
+  formula_enable: enable formula recognition (default: true)
+  table_enable: enable table recognition (default: true)
+  priority: 0-100 (higher numbers are processed first, default: 0)
+```
+
+### 2. Query a Task
+```http
+GET /api/v1/tasks/{task_id}?upload_images=false
+
+Parameters:
+  upload_images: whether to upload images to MinIO (default: false)
+
+Returns:
+  - status: pending | processing | completed | failed | cancelled
+  - data: the parsed Markdown content once the task has completed
+```
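The status values above can be turned into a user-facing summary with a small helper. A sketch that interprets a response body from the query endpoint; the sample dict below is illustrative:

```python
def describe_task(result: dict) -> str:
    """Summarize a task-status response from GET /api/v1/tasks/{task_id}."""
    status = result.get('status')
    if status == 'completed':
        # 'data' is only present once the task has completed
        data = result.get('data') or {}
        length = len(data.get('content', ''))
        return f"completed ({length} characters of Markdown)"
    if status == 'failed':
        return f"failed: {result.get('error_message')}"
    # pending / processing / cancelled
    return f"{status}..."

# Illustrative response body
print(describe_task({'status': 'completed', 'data': {'content': '# Title'}}))
# → completed (7 characters of Markdown)
```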
+
+### 3. Queue Statistics
+```http
+GET /api/v1/queue/stats
+
+Returns: task counts grouped by status
+```
+
+### 4. Cancel a Task
+```http
+DELETE /api/v1/tasks/{task_id}
+
+Only tasks in the pending state can be cancelled
+```
+
+## 🔧 Troubleshooting
+
+### Issue 1: Workers fail to start
+
+**Check the GPU**
+```bash
+nvidia-smi  # should list your GPUs
+```
+
+**Check dependencies**
+```bash
+pip list | grep -E "(mineru|litserve|torch)"
+```
+
+### Issue 2: Tasks stay pending
+
+**Check the scheduler**
+```bash
+ps aux | grep task_scheduler.py
+```
+
+**Trigger a poll manually**
+```bash
+curl -X POST http://localhost:9000/predict \
+  -H "Content-Type: application/json" \
+  -d '{"action":"poll"}'
+```
+
+### Issue 3: Out of GPU memory
+
+**Reduce the number of workers**
+```bash
+python start_all.py --workers-per-device 1
+```
+
+**Limit VRAM usage**
+```bash
+export MINERU_VIRTUAL_VRAM_SIZE=6
+python start_all.py
+```
+
+### Issue 4: Port already in use
+
+**Find the process**
+```bash
+# Windows
+netstat -ano | findstr :8000
+
+# Linux/Mac
+lsof -i :8000
+```
+
+**Use different ports**
+```bash
+python start_all.py --api-port 8080
+```
+
+## 🛠️ Tech Stack
+
+- **Web**: FastAPI + Uvicorn
+- **Parsers**: MinerU (PDF/images) + MarkItDown (Office/text)
+- **GPU scheduling**: LitServe
+- **Storage**: SQLite + MinIO (optional)
+- **Logging**: Loguru
+
+## 📝 Core Dependencies
+
+```txt
+mineru[core]>=2.5.0      # MinerU core
+fastapi>=0.115.0         # Web framework
+litserve>=0.2.0          # GPU load balancing
+markitdown>=0.1.3        # Office document parsing
+minio>=7.2.0             # MinIO object storage
+```
+
+## 🤝 Contributing
+
+Issues and pull requests are welcome!
+
+## 📄 License
+
+Licensed under the main MinerU project license
+
+---
+
+**Tianshu (天枢)** - enterprise-grade multi-GPU document parsing service ⚡️
+
+*Named after the first star of the Big Dipper, signifying its role as the core scheduler*
+

+ 427 - 0
projects/mineru_tianshu/api_server.py

@@ -0,0 +1,427 @@
+"""
+MinerU Tianshu - API Server
+
+Provides a RESTful API for submitting, querying and managing document parsing tasks
+"""
+from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Query
+from fastapi.responses import JSONResponse
+from fastapi.middleware.cors import CORSMiddleware
+import tempfile
+from pathlib import Path
+from loguru import logger
+import uvicorn
+from typing import Optional
+from datetime import datetime
+import os
+import re
+import uuid
+from minio import Minio
+
+from task_db import TaskDB
+
+# Initialize the FastAPI application
+app = FastAPI(
+    title="MinerU Tianshu API",
+    description="Tianshu - enterprise-grade multi-GPU document parsing service",
+    version="1.0.0"
+)
+
+# Add CORS middleware
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+# Initialize the task database
+db = TaskDB()
+
+# Output directory for parsed results
+OUTPUT_DIR = Path('/tmp/mineru_tianshu_output')
+OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
+
+# MinIO configuration (read from environment variables)
+MINIO_CONFIG = {
+    'endpoint': os.getenv('MINIO_ENDPOINT', ''),
+    'access_key': os.getenv('MINIO_ACCESS_KEY', ''),
+    'secret_key': os.getenv('MINIO_SECRET_KEY', ''),
+    'secure': True,
+    'bucket_name': os.getenv('MINIO_BUCKET', '')
+}
+
+
+def get_minio_client():
+    """Create a MinIO client instance."""
+    return Minio(
+        MINIO_CONFIG['endpoint'],
+        access_key=MINIO_CONFIG['access_key'],
+        secret_key=MINIO_CONFIG['secret_key'],
+        secure=MINIO_CONFIG['secure']
+    )
+
+
+def process_markdown_images(md_content: str, image_dir: Path, upload_images: bool = False):
+    """
+    Process image references in Markdown content
+    
+    Args:
+        md_content: Markdown content
+        image_dir: directory containing the images
+        upload_images: whether to upload images to MinIO and rewrite the links
+        
+    Returns:
+        The processed Markdown content
+    """
+    if not upload_images:
+        return md_content
+    
+    try:
+        minio_client = get_minio_client()
+        bucket_name = MINIO_CONFIG['bucket_name']
+        minio_endpoint = MINIO_CONFIG['endpoint']
+        
+        # Match all Markdown-style image references
+        img_pattern = r'!\[([^\]]*)\]\(([^)]+)\)'
+        
+        def replace_image(match):
+            alt_text = match.group(1)
+            image_path = match.group(2)
+            
+            # Build the full local path of the image
+            full_image_path = image_dir / Path(image_path).name
+            
+            if full_image_path.exists():
+                # Keep the original file extension
+                file_extension = full_image_path.suffix
+                # Use a UUID as the new file name
+                new_filename = f"{uuid.uuid4()}{file_extension}"
+                
+                try:
+                    # Upload to MinIO
+                    object_name = f"images/{new_filename}"
+                    minio_client.fput_object(bucket_name, object_name, str(full_image_path))
+                    
+                    # Build the MinIO access URL
+                    scheme = 'https' if MINIO_CONFIG['secure'] else 'http'
+                    minio_url = f"{scheme}://{minio_endpoint}/{bucket_name}/{object_name}"
+                    
+                    # Return an HTML img tag
+                    return f'<img src="{minio_url}" alt="{alt_text}">'
+                except Exception as e:
+                    logger.error(f"Failed to upload image to MinIO: {e}")
+                    return match.group(0)  # keep the original reference if the upload fails
+            
+            return match.group(0)
+        
+        # Rewrite all image references
+        new_content = re.sub(img_pattern, replace_image, md_content)
+        return new_content
+        
+    except Exception as e:
+        logger.error(f"Error processing markdown images: {e}")
+        return md_content  # return the original content on error
+
+
+@app.get("/")
+async def root():
+    """API root."""
+    return {
+        "service": "MinerU Tianshu",
+        "version": "1.0.0",
+        "description": "Tianshu - enterprise-grade multi-GPU document parsing service",
+        "docs": "/docs"
+    }
+
+
+@app.post("/api/v1/tasks/submit")
+async def submit_task(
+    file: UploadFile = File(..., description="Document file: PDF/images (parsed by MinerU) or Office/HTML/text etc. (parsed by MarkItDown)"),
+    backend: str = Form('pipeline', description="Processing backend: pipeline/vlm-transformers/vlm-vllm-engine"),
+    lang: str = Form('ch', description="Language: ch/en/korean/japan, etc."),
+    method: str = Form('auto', description="Parsing method: auto/txt/ocr"),
+    formula_enable: bool = Form(True, description="Enable formula recognition"),
+    table_enable: bool = Form(True, description="Enable table recognition"),
+    priority: int = Form(0, description="Priority; higher numbers are processed first"),
+):
+    """
+    Submit a document parsing task
+    
+    Returns a task_id immediately; the task is processed asynchronously in the background
+    """
+    try:
+        # Save the upload to a temporary file
+        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=Path(file.filename).suffix)
+        
+        # Stream the file to disk in chunks to keep memory usage low
+        while True:
+            chunk = await file.read(1 << 23)  # 8 MB chunks
+            if not chunk:
+                break
+            temp_file.write(chunk)
+        
+        temp_file.close()
+        
+        # Create the task
+        task_id = db.create_task(
+            file_name=file.filename,
+            file_path=temp_file.name,
+            backend=backend,
+            options={
+                'lang': lang,
+                'method': method,
+                'formula_enable': formula_enable,
+                'table_enable': table_enable,
+            },
+            priority=priority
+        )
+        
+        logger.info(f"✅ Task submitted: {task_id} - {file.filename} (priority: {priority})")
+        
+        return {
+            'success': True,
+            'task_id': task_id,
+            'status': 'pending',
+            'message': 'Task submitted successfully',
+            'file_name': file.filename,
+            'created_at': datetime.now().isoformat()
+        }
+    
+    except Exception as e:
+        logger.error(f"❌ Failed to submit task: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+@app.get("/api/v1/tasks/{task_id}")
+async def get_task_status(
+    task_id: str,
+    upload_images: bool = Query(False, description="Upload images to MinIO and rewrite links (only effective once the task has completed)")
+):
+    """
+    Query task status and details
+    
+    When the task has completed, the parsed Markdown content is returned in the data field
+    Images can optionally be uploaded to MinIO with links rewritten to URLs
+    """
+    task = db.get_task(task_id)
+    
+    if not task:
+        raise HTTPException(status_code=404, detail="Task not found")
+    
+    response = {
+        'success': True,
+        'task_id': task_id,
+        'status': task['status'],
+        'file_name': task['file_name'],
+        'backend': task['backend'],
+        'priority': task['priority'],
+        'error_message': task['error_message'],
+        'created_at': task['created_at'],
+        'started_at': task['started_at'],
+        'completed_at': task['completed_at'],
+        'worker_id': task['worker_id'],
+        'retry_count': task['retry_count']
+    }
+    logger.info(f"✅ Task status: {task['status']} - (result_path: {task['result_path']})")
+    
+    # If the task completed, return the parsed content automatically
+    if task['status'] == 'completed' and task['result_path']:
+        result_dir = Path(task['result_path'])
+        logger.info(f"📂 Checking result directory: {result_dir}")
+        
+        if result_dir.exists():
+            logger.info(f"✅ Result directory exists")
+            # Recursively look for Markdown files (MinerU output layout: task_id/filename/auto/*.md)
+            md_files = list(result_dir.rglob('*.md'))
+            logger.info(f"📄 Found {len(md_files)} markdown files: {[f.relative_to(result_dir) for f in md_files]}")
+            
+            if md_files:
+                try:
+                    # Read the Markdown content
+                    md_file = md_files[0]
+                    logger.info(f"📖 Reading markdown file: {md_file}")
+                    with open(md_file, 'r', encoding='utf-8') as f:
+                        md_content = f.read()
+                    
+                    logger.info(f"✅ Markdown content loaded, length: {len(md_content)} characters")
+                    
+                    # Locate the image directory (next to the markdown file)
+                    image_dir = md_file.parent / 'images'
+                    
+                    # Process images if requested
+                    if upload_images and image_dir.exists():
+                        logger.info(f"🖼️  Processing images for task {task_id}, upload_images={upload_images}")
+                        md_content = process_markdown_images(md_content, image_dir, upload_images)
+                    
+                    # Attach the data field
+                    response['data'] = {
+                        'markdown_file': md_file.name,
+                        'content': md_content,
+                        'images_uploaded': upload_images,
+                        'has_images': image_dir.exists() if not upload_images else None
+                    }
+                    logger.info(f"✅ Response data field added successfully")
+                    
+                except Exception as e:
+                    logger.error(f"❌ Failed to read markdown content: {e}")
+                    logger.exception(e)
+                    # A read failure doesn't fail the status query; data is simply omitted
+                    response['data'] = None
+            else:
+                logger.warning(f"⚠️  No markdown files found in {result_dir}")
+        else:
+            logger.error(f"❌ Result directory does not exist: {result_dir}")
+    elif task['status'] == 'completed':
+        logger.warning(f"⚠️  Task completed but result_path is empty")
+    else:
+        logger.info(f"ℹ️  Task status is {task['status']}, skipping content loading")
+    
+    return response
+
+
+@app.delete("/api/v1/tasks/{task_id}")
+async def cancel_task(task_id: str):
+    """
+    Cancel a task (pending status only)
+    """
+    task = db.get_task(task_id)
+    
+    if not task:
+        raise HTTPException(status_code=404, detail="Task not found")
+    
+    if task['status'] == 'pending':
+        db.update_task_status(task_id, 'cancelled')
+        
+        # Remove the temporary upload file
+        file_path = Path(task['file_path'])
+        if file_path.exists():
+            file_path.unlink()
+        
+        logger.info(f"⏹️  Task cancelled: {task_id}")
+        return {
+            'success': True,
+            'message': 'Task cancelled successfully'
+        }
+    else:
+        raise HTTPException(
+            status_code=400, 
+            detail=f"Cannot cancel task in {task['status']} status"
+        )
+
+
+@app.get("/api/v1/queue/stats")
+async def get_queue_stats():
+    """
+    Get queue statistics
+    """
+    stats = db.get_queue_stats()
+    
+    return {
+        'success': True,
+        'stats': stats,
+        'total': sum(stats.values()),
+        'timestamp': datetime.now().isoformat()
+    }
+
+
+@app.get("/api/v1/queue/tasks")
+async def list_tasks(
+    status: Optional[str] = Query(None, description="Filter by status: pending/processing/completed/failed"),
+    limit: int = Query(100, description="Maximum number of tasks to return", le=1000)
+):
+    """
+    List tasks
+    """
+    if status:
+        tasks = db.get_tasks_by_status(status, limit)
+    else:
+        # Return all tasks (direct query; a dedicated TaskDB helper could be added)
+        with db.get_cursor() as cursor:
+            cursor.execute('''
+                SELECT * FROM tasks 
+                ORDER BY created_at DESC 
+                LIMIT ?
+            ''', (limit,))
+            tasks = [dict(row) for row in cursor.fetchall()]
+    
+    return {
+        'success': True,
+        'count': len(tasks),
+        'tasks': tasks
+    }
+
+
+@app.post("/api/v1/admin/cleanup")
+async def cleanup_old_tasks(days: int = Query(7, description="Delete tasks older than N days")):
+    """
+    Clean up old task records (admin endpoint)
+    """
+    deleted_count = db.cleanup_old_tasks(days)
+    
+    logger.info(f"🧹 Cleaned up {deleted_count} old tasks")
+    
+    return {
+        'success': True,
+        'deleted_count': deleted_count,
+        'message': f'Cleaned up tasks older than {days} days'
+    }
+
+
+@app.post("/api/v1/admin/reset-stale")
+async def reset_stale_tasks(timeout_minutes: int = Query(60, description="Timeout in minutes")):
+    """
+    Reset processing tasks that have exceeded the timeout (admin endpoint)
+    """
+    reset_count = db.reset_stale_tasks(timeout_minutes)
+    
+    logger.info(f"🔄 Reset {reset_count} stale tasks")
+    
+    return {
+        'success': True,
+        'reset_count': reset_count,
+        'message': f'Reset tasks processing for more than {timeout_minutes} minutes'
+    }
+
+
+@app.get("/api/v1/health")
+async def health_check():
+    """
+    Health check endpoint
+    """
+    try:
+        # Check the database connection
+        stats = db.get_queue_stats()
+        
+        return {
+            'status': 'healthy',
+            'timestamp': datetime.now().isoformat(),
+            'database': 'connected',
+            'queue_stats': stats
+        }
+    except Exception as e:
+        logger.error(f"Health check failed: {e}")
+        return JSONResponse(
+            status_code=503,
+            content={
+                'status': 'unhealthy',
+                'error': str(e)
+            }
+        )
+
+
+if __name__ == '__main__':
+    # Read the API port from the environment; defaults to 8000
+    api_port = int(os.getenv('API_PORT', '8000'))
+    
+    logger.info("🚀 Starting MinerU Tianshu API Server...")
+    logger.info(f"📖 API Documentation: http://localhost:{api_port}/docs")
+    
+    uvicorn.run(
+        app, 
+        host='0.0.0.0', 
+        port=api_port,
+        log_level='info'
+    )
+

+ 318 - 0
projects/mineru_tianshu/client_example.py

@@ -0,0 +1,318 @@
+"""
+MinerU Tianshu - Client Example
+
+Demonstrates how to submit tasks and query their status from a Python client
+"""
+import asyncio
+import aiohttp
+from pathlib import Path
+from loguru import logger
+import time
+from typing import Dict
+
+
+class TianshuClient:
+    """Tianshu client."""
+    
+    def __init__(self, api_url='http://localhost:8000'):
+        self.api_url = api_url
+        self.base_url = f"{api_url}/api/v1"
+    
+    async def submit_task(
+        self,
+        session: aiohttp.ClientSession,
+        file_path: str,
+        backend: str = 'pipeline',
+        lang: str = 'ch',
+        method: str = 'auto',
+        formula_enable: bool = True,
+        table_enable: bool = True,
+        priority: int = 0
+    ) -> Dict:
+        """
+        Submit a task
+        
+        Args:
+            session: aiohttp session
+            file_path: path to the file
+            backend: processing backend
+            lang: language
+            method: parsing method
+            formula_enable: enable formula recognition
+            table_enable: enable table recognition
+            priority: priority
+            
+        Returns:
+            Response dict containing the task_id
+        """
+        with open(file_path, 'rb') as f:
+            data = aiohttp.FormData()
+            data.add_field('file', f, filename=Path(file_path).name)
+            data.add_field('backend', backend)
+            data.add_field('lang', lang)
+            data.add_field('method', method)
+            data.add_field('formula_enable', str(formula_enable).lower())
+            data.add_field('table_enable', str(table_enable).lower())
+            data.add_field('priority', str(priority))
+            
+            async with session.post(f'{self.base_url}/tasks/submit', data=data) as resp:
+                if resp.status == 200:
+                    result = await resp.json()
+                    logger.info(f"✅ Submitted: {file_path} -> Task ID: {result['task_id']}")
+                    return result
+                else:
+                    error = await resp.text()
+                    logger.error(f"❌ Failed to submit {file_path}: {error}")
+                    return {'success': False, 'error': error}
+    
+    async def get_task_status(self, session: aiohttp.ClientSession, task_id: str) -> Dict:
+        """
+        Query task status
+        
+        Args:
+            session: aiohttp session
+            task_id: task ID
+            
+        Returns:
+            Task status dict
+        """
+        async with session.get(f'{self.base_url}/tasks/{task_id}') as resp:
+            if resp.status == 200:
+                return await resp.json()
+            else:
+                return {'success': False, 'error': 'Task not found'}
+    
+    async def wait_for_task(
+        self,
+        session: aiohttp.ClientSession,
+        task_id: str,
+        timeout: int = 600,
+        poll_interval: int = 2
+    ) -> Dict:
+        """
+        Wait for a task to complete
+        
+        Args:
+            session: aiohttp session
+            task_id: task ID
+            timeout: timeout in seconds
+            poll_interval: polling interval in seconds
+            
+        Returns:
+            The final task status
+        """
+        start_time = time.time()
+        
+        while True:
+            status = await self.get_task_status(session, task_id)
+            
+            if not status.get('success'):
+                logger.error(f"❌ Failed to get status for task {task_id}")
+                return status
+            
+            task_status = status.get('status')
+            
+            if task_status == 'completed':
+                logger.info(f"✅ Task {task_id} completed!")
+                logger.info(f"   Markdown returned: {bool(status.get('data'))}")
+                return status
+            
+            elif task_status == 'failed':
+                logger.error(f"❌ Task {task_id} failed!")
+                logger.error(f"   Error: {status.get('error_message')}")
+                return status
+            
+            elif task_status == 'cancelled':
+                logger.warning(f"⚠️  Task {task_id} was cancelled")
+                return status
+            
+            # Check for timeout
+            if time.time() - start_time > timeout:
+                logger.error(f"⏱️  Task {task_id} timeout after {timeout}s")
+                return {'success': False, 'error': 'timeout'}
+            
+            # Sleep, then poll again
+            await asyncio.sleep(poll_interval)
+    
+    async def get_queue_stats(self, session: aiohttp.ClientSession) -> Dict:
+        """Get queue statistics."""
+        async with session.get(f'{self.base_url}/queue/stats') as resp:
+            return await resp.json()
+    
+    async def cancel_task(self, session: aiohttp.ClientSession, task_id: str) -> Dict:
+        """Cancel a task."""
+        async with session.delete(f'{self.base_url}/tasks/{task_id}') as resp:
+            return await resp.json()
+
+
+async def example_single_task():
+    """Example 1: submit a single task and wait for completion"""
+    logger.info("=" * 60)
+    logger.info("Example 1: submit a single task")
+    logger.info("=" * 60)
+    
+    client = TianshuClient()
+    
+    async with aiohttp.ClientSession() as session:
+        # Submit the task
+        result = await client.submit_task(
+            session,
+            file_path='../../demo/pdfs/demo1.pdf',
+            backend='pipeline',
+            lang='ch',
+            formula_enable=True,
+            table_enable=True
+        )
+        
+        if result.get('success'):
+            task_id = result['task_id']
+            
+            # Wait for completion
+            logger.info(f"⏳ Waiting for task {task_id} to complete...")
+            final_status = await client.wait_for_task(session, task_id)
+            
+            return final_status
+
+
+async def example_batch_tasks():
+    """Example 2: submit tasks in a batch and await them concurrently"""
+    logger.info("=" * 60)
+    logger.info("Example 2: batch task submission")
+    logger.info("=" * 60)
+    
+    client = TianshuClient()
+    
+    # Prepare the file list
+    files = [
+        '../../demo/pdfs/demo1.pdf',
+        '../../demo/pdfs/demo2.pdf',
+        '../../demo/pdfs/demo3.pdf',
+    ]
+    
+    async with aiohttp.ClientSession() as session:
+        # Submit all tasks concurrently
+        logger.info(f"📤 Submitting {len(files)} tasks...")
+        submit_tasks = [
+            client.submit_task(session, file) 
+            for file in files
+        ]
+        results = await asyncio.gather(*submit_tasks)
+        
+        # Collect the task_ids
+        task_ids = [r['task_id'] for r in results if r.get('success')]
+        logger.info(f"✅ Submitted {len(task_ids)} tasks successfully")
+        
+        # Wait for all tasks to complete concurrently
+        logger.info(f"⏳ Waiting for all tasks to complete...")
+        wait_tasks = [
+            client.wait_for_task(session, task_id) 
+            for task_id in task_ids
+        ]
+        final_results = await asyncio.gather(*wait_tasks)
+        
+        # Tally the results
+        completed = sum(1 for r in final_results if r.get('status') == 'completed')
+        failed = sum(1 for r in final_results if r.get('status') == 'failed')
+        
+        logger.info("=" * 60)
+        logger.info(f"📊 Results: {completed} completed, {failed} failed")
+        logger.info("=" * 60)
+        
+        return final_results
+
+
+async def example_priority_tasks():
+    """Example 3: using the priority queue"""
+    logger.info("=" * 60)
+    logger.info("Example 3: priority queue")
+    logger.info("=" * 60)
+    
+    client = TianshuClient()
+    
+    async with aiohttp.ClientSession() as session:
+        # Submit a low-priority task
+        low_priority = await client.submit_task(
+            session,
+            file_path='../../demo/pdfs/demo1.pdf',
+            priority=0
+        )
+        logger.info(f"📝 Low priority task: {low_priority['task_id']}")
+        
+        # Submit a high-priority task
+        high_priority = await client.submit_task(
+            session,
+            file_path='../../demo/pdfs/demo2.pdf',
+            priority=10
+        )
+        logger.info(f"🔥 High priority task: {high_priority['task_id']}")
+        
+        # The high-priority task will be picked up first
+        logger.info("⏳ The high-priority task will be processed first...")
+
+
+async def example_queue_monitoring():
+    """Example 4: monitor queue status"""
+    logger.info("=" * 60)
+    logger.info("Example 4: monitor queue status")
+    logger.info("=" * 60)
+    
+    client = TianshuClient()
+    
+    async with aiohttp.ClientSession() as session:
+        # Fetch queue statistics
+        stats = await client.get_queue_stats(session)
+        
+        logger.info("📊 Queue Statistics:")
+        logger.info(f"   Total: {stats.get('total', 0)}")
+        for status, count in stats.get('stats', {}).items():
+            logger.info(f"   {status:12s}: {count}")
+
+
+async def main():
+    """Entry point"""
+    import sys
+    
+    if len(sys.argv) > 1:
+        example = sys.argv[1]
+    else:
+        example = 'all'
+    
+    try:
+        if example == 'single' or example == 'all':
+            await example_single_task()
+            print()
+        
+        if example == 'batch' or example == 'all':
+            await example_batch_tasks()
+            print()
+        
+        if example == 'priority' or example == 'all':
+            await example_priority_tasks()
+            print()
+        
+        if example == 'monitor' or example == 'all':
+            await example_queue_monitoring()
+            print()
+            
+    except Exception as e:
+        logger.error(f"Example failed: {e}")
+        import traceback
+        traceback.print_exc()
+
+
+if __name__ == '__main__':
+    """
+    Usage:
+    
+    # Run all examples
+    python client_example.py
+    
+    # Run a specific example
+    python client_example.py single
+    python client_example.py batch
+    python client_example.py priority
+    python client_example.py monitor
+    """
+    asyncio.run(main())
+

+ 363 - 0
projects/mineru_tianshu/litserve_worker.py

@@ -0,0 +1,363 @@
+"""
+MinerU Tianshu - LitServe Worker
+
+Uses LitServe for automatic GPU load balancing,
+pulling tasks from the SQLite queue and processing them
+"""
+import os
+import json
+import sys
+from pathlib import Path
+import litserve as ls
+from loguru import logger
+
+# Add the repository root to sys.path so MinerU can be imported
+sys.path.insert(0, str(Path(__file__).parent.parent.parent))
+
+from task_db import TaskDB
+from mineru.cli.common import do_parse, read_fn
+from mineru.utils.config_reader import get_device
+from mineru.utils.model_utils import get_vram
+
+# Try to import markitdown (optional dependency)
+try:
+    from markitdown import MarkItDown
+    MARKITDOWN_AVAILABLE = True
+except ImportError:
+    MARKITDOWN_AVAILABLE = False
+    logger.warning("⚠️  markitdown not available, Office format parsing will be disabled")
+
+
+class MinerUWorkerAPI(ls.LitAPI):
+    """
+    LitServe API worker
+    
+    Pulls tasks from the SQLite queue, relying on LitServe's automatic GPU load balancing
+    Two parsing paths are supported:
+    - PDFs/images -> MinerU (GPU-accelerated)
+    - everything else -> MarkItDown (fast CPU path)
+    """
+    
+    # Supported file formats
+    # Formats handled by MinerU: PDFs and images
+    PDF_IMAGE_FORMATS = {'.pdf', '.png', '.jpg', '.jpeg', '.bmp', '.tiff', '.tif', '.webp'}
+    # All other formats are parsed with MarkItDown
+    
+    def __init__(self, output_dir='/tmp/mineru_tianshu_output', worker_id_prefix='tianshu'):
+        super().__init__()
+        self.output_dir = Path(output_dir)
+        self.output_dir.mkdir(parents=True, exist_ok=True)
+        self.worker_id_prefix = worker_id_prefix
+        self.db = TaskDB()
+        self.worker_id = None
+        self.markitdown = None
+    
+    def setup(self, device):
+        """
+        初始化环境(每个 worker 进程调用一次)
+        
+        Args:
+            device: LitServe 分配的设备 (cuda:0, cuda:1, etc.)
+        """
+        # 生成唯一的 worker_id
+        import socket
+        hostname = socket.gethostname()
+        pid = os.getpid()
+        self.worker_id = f"{self.worker_id_prefix}-{hostname}-{device}-{pid}"
+        
+        logger.info(f"⚙️  Worker {self.worker_id} setting up on device: {device}")
+        
+        # Configure the MinerU environment
+        if os.getenv('MINERU_DEVICE_MODE', None) is None:
+            os.environ['MINERU_DEVICE_MODE'] = device if device != 'auto' else get_device()
+        
+        device_mode = os.environ['MINERU_DEVICE_MODE']
+        
+        # Configure VRAM size
+        if os.getenv('MINERU_VIRTUAL_VRAM_SIZE', None) is None:
+            if device_mode.startswith("cuda") or device_mode.startswith("npu"):
+                try:
+                    vram = round(get_vram(device_mode))
+                    os.environ['MINERU_VIRTUAL_VRAM_SIZE'] = str(vram)
+                except Exception:
+                    os.environ['MINERU_VIRTUAL_VRAM_SIZE'] = '8'  # fallback default (GB)
+            else:
+                os.environ['MINERU_VIRTUAL_VRAM_SIZE'] = '1'
+        
+        # Initialize MarkItDown (if available)
+        if MARKITDOWN_AVAILABLE:
+            self.markitdown = MarkItDown()
+            logger.info(f"✅ MarkItDown initialized for Office format parsing")
+        
+        logger.info(f"✅ Worker {self.worker_id} ready")
+        logger.info(f"   Device: {device_mode}")
+        logger.info(f"   VRAM: {os.environ['MINERU_VIRTUAL_VRAM_SIZE']}GB")
+    
+    def decode_request(self, request):
+        """
+        解码请求
+        
+        接收一个 'poll' 信号来触发从数据库拉取任务
+        """
+        return request.get('action', 'poll')
+    
+    def _get_file_type(self, file_path: str) -> str:
+        """
+        判断文件类型
+        
+        Args:
+            file_path: 文件路径
+            
+        Returns:
+            'pdf_image': PDF 或图片格式,使用 MinerU 解析
+            'markitdown': 其他所有格式,使用 markitdown 解析
+        """
+        suffix = Path(file_path).suffix.lower()
+        
+        if suffix in self.PDF_IMAGE_FORMATS:
+            return 'pdf_image'
+        else:
+            # 所有非 PDF/图片格式都使用 markitdown
+            return 'markitdown'
+    
+    def _parse_with_mineru(self, file_path: Path, file_name: str, task_id: str, 
+                           backend: str, options: dict, output_path: Path):
+        """
+        使用 MinerU 解析 PDF 和图片格式
+        
+        Args:
+            file_path: 文件路径
+            file_name: 文件名
+            task_id: 任务ID
+            backend: 后端类型
+            options: 解析选项
+            output_path: 输出路径
+        """
+        logger.info(f"📄 Using MinerU to parse: {file_name}")
+        
+        # 读取文件
+        pdf_bytes = read_fn(file_path)
+        
+        # 执行解析
+        do_parse(
+            output_dir=str(output_path),
+            pdf_file_names=[Path(file_name).stem],
+            pdf_bytes_list=[pdf_bytes],
+            p_lang_list=[options.get('lang', 'ch')],
+            backend=backend,
+            parse_method=options.get('method', 'auto'),
+            formula_enable=options.get('formula_enable', True),
+            table_enable=options.get('table_enable', True),
+        )
+    
+    def _parse_with_markitdown(self, file_path: Path, file_name: str, 
+                               output_path: Path):
+        """
+        使用 markitdown 解析文档(支持 Office、HTML、文本等多种格式)
+        
+        Args:
+            file_path: 文件路径
+            file_name: 文件名
+            output_path: 输出路径
+        """
+        if not MARKITDOWN_AVAILABLE or self.markitdown is None:
+            raise RuntimeError("markitdown is not available. Please install it: pip install markitdown")
+        
+        logger.info(f"📊 Using MarkItDown to parse: {file_name}")
+        
+        # 使用 markitdown 转换文档
+        result = self.markitdown.convert(str(file_path))
+        
+        # 保存为 markdown 文件
+        output_file = output_path / f"{Path(file_name).stem}.md"
+        output_file.write_text(result.text_content, encoding='utf-8')
+        
+        logger.info(f"📝 Markdown saved to: {output_file}")
+    
+    def predict(self, action):
+        """
+        从数据库拉取任务并处理
+        
+        这里是实际的任务处理逻辑,LitServe 会自动管理 GPU 负载均衡
+        支持根据文件类型选择不同的解析器:
+        - PDF/图片 -> MinerU(GPU 加速)
+        - 其他所有格式 -> MarkItDown(快速处理)
+        """
+        if action != 'poll':
+            return {
+                'status': 'error', 
+                'message': 'Invalid action. Use {"action": "poll"} to trigger task processing.'
+            }
+        
+        # Fetch the next task from the database
+        task = self.db.get_next_task(self.worker_id)
+        
+        if not task:
+            # No work available; report idle
+            return {
+                'status': 'idle', 
+                'message': 'No pending tasks in queue',
+                'worker_id': self.worker_id
+            }
+        
+        # Extract task info
+        task_id = task['task_id']
+        file_path = task['file_path']
+        file_name = task['file_name']
+        backend = task['backend']
+        options = json.loads(task['options'])
+        
+        logger.info(f"🔄 Worker {self.worker_id} processing task {task_id}: {file_name}")
+        
+        try:
+            # Prepare the output directory
+            output_path = self.output_dir / task_id
+            output_path.mkdir(parents=True, exist_ok=True)
+            
+            # Pick a parser based on the file type
+            file_type = self._get_file_type(file_path)
+            
+            if file_type == 'pdf_image':
+                # Parse PDFs and images with MinerU
+                self._parse_with_mineru(
+                    file_path=Path(file_path),
+                    file_name=file_name,
+                    task_id=task_id,
+                    backend=backend,
+                    options=options,
+                    output_path=output_path
+                )
+                parse_method = 'MinerU'
+                
+            else:  # file_type == 'markitdown'
+                # Parse every other format with MarkItDown
+                self._parse_with_markitdown(
+                    file_path=Path(file_path),
+                    file_name=file_name,
+                    output_path=output_path
+                )
+                parse_method = 'MarkItDown'
+            
+            # Mark the task as completed
+            self.db.update_task_status(task_id, 'completed', str(output_path))
+            
+            logger.info(f"✅ Task {task_id} completed by {self.worker_id}")
+            logger.info(f"   Parser: {parse_method}")
+            logger.info(f"   Output: {output_path}")
+            
+            return {
+                'status': 'completed',
+                'task_id': task_id,
+                'file_name': file_name,
+                'parse_method': parse_method,
+                'file_type': file_type,
+                'output_path': str(output_path),
+                'worker_id': self.worker_id
+            }
+            
+        except Exception as e:
+            logger.error(f"❌ Task {task_id} failed: {e}")
+            self.db.update_task_status(task_id, 'failed', error_message=str(e))
+            
+            return {
+                'status': 'failed',
+                'task_id': task_id,
+                'error': str(e),
+                'worker_id': self.worker_id
+            }
+        
+        finally:
+            # Clean up the temporary upload
+            try:
+                if Path(file_path).exists():
+                    Path(file_path).unlink()
+            except Exception as e:
+                logger.warning(f"Failed to clean up temp file {file_path}: {e}")
+    
+    def encode_response(self, response):
+        """编码响应"""
+        return response
+
+
+def start_litserve_workers(
+    output_dir='/tmp/mineru_tianshu_output',
+    accelerator='auto',
+    devices='auto',
+    workers_per_device=1,
+    port=9000
+):
+    """
+    启动 LitServe Worker Pool
+    
+    Args:
+        output_dir: 输出目录
+        accelerator: 加速器类型 (auto/cuda/cpu/mps)
+        devices: 使用的设备 (auto/[0,1,2])
+        workers_per_device: 每个 GPU 的 worker 数量
+        port: 服务端口
+    """
+    logger.info("=" * 60)
+    logger.info("🚀 Starting MinerU Tianshu LitServe Worker Pool")
+    logger.info("=" * 60)
+    logger.info(f"📂 Output Directory: {output_dir}")
+    logger.info(f"🎮 Accelerator: {accelerator}")
+    logger.info(f"💾 Devices: {devices}")
+    logger.info(f"👷 Workers per Device: {workers_per_device}")
+    logger.info(f"🔌 Port: {port}")
+    logger.info("=" * 60)
+    
+    # Create the LitServe server
+    api = MinerUWorkerAPI(output_dir=output_dir)
+    server = ls.LitServer(
+        api,
+        accelerator=accelerator,
+        devices=devices,
+        workers_per_device=workers_per_device,
+        timeout=False,  # no request timeout
+    )
+    
+    logger.info(f"✅ LitServe worker pool initialized")
+    logger.info(f"📡 Listening on: http://0.0.0.0:{port}/predict")
+    logger.info(f"🔄 Workers will poll SQLite queue for tasks")
+    logger.info("=" * 60)
+    
+    # Start the server
+    server.run(port=port, generate_client_file=False)
+
+
+if __name__ == '__main__':
+    import argparse
+    
+    parser = argparse.ArgumentParser(description='MinerU Tianshu LitServe Worker Pool')
+    parser.add_argument('--output-dir', type=str, default='/tmp/mineru_tianshu_output',
+                       help='Output directory for processed files')
+    parser.add_argument('--accelerator', type=str, default='auto',
+                       choices=['auto', 'cuda', 'cpu', 'mps'],
+                       help='Accelerator type')
+    parser.add_argument('--devices', type=str, default='auto',
+                       help='Devices to use (auto or comma-separated list like 0,1,2)')
+    parser.add_argument('--workers-per-device', type=int, default=1,
+                       help='Number of workers per device')
+    parser.add_argument('--port', type=int, default=9000,
+                       help='Server port')
+    
+    args = parser.parse_args()
+    
+    # Parse the devices argument
+    devices = args.devices
+    if devices != 'auto':
+        try:
+            devices = [int(d) for d in devices.split(',')]
+        except ValueError:
+            logger.warning(f"Invalid devices format: {devices}, using 'auto'")
+            devices = 'auto'
+    
+    start_litserve_workers(
+        output_dir=args.output_dir,
+        accelerator=args.accelerator,
+        devices=devices,
+        workers_per_device=args.workers_per_device,
+        port=args.port
+    )
+
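The routing decision in `_get_file_type` above hinges only on the file suffix: a fixed set of extensions goes to MinerU, everything else to MarkItDown. A standalone sketch of that rule, mirroring `PDF_IMAGE_FORMATS` from the worker (`choose_parser` is an illustrative name, not part of the module):

```python
from pathlib import Path

# Mirrors MinerUWorkerAPI.PDF_IMAGE_FORMATS above
PDF_IMAGE_FORMATS = {'.pdf', '.png', '.jpg', '.jpeg', '.bmp', '.tiff', '.tif', '.webp'}

def choose_parser(file_path: str) -> str:
    """Return 'pdf_image' (MinerU, GPU) or 'markitdown' (fast path)."""
    suffix = Path(file_path).suffix.lower()  # .lower() makes '.PDF' match too
    return 'pdf_image' if suffix in PDF_IMAGE_FORMATS else 'markitdown'
```

Because the check is suffix-only, a misnamed file (e.g. a Word document saved as `.pdf`) will be routed to MinerU and fail at parse time rather than at routing time.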

+ 32 - 0
projects/mineru_tianshu/requirements.txt

@@ -0,0 +1,32 @@
+# MinerU Tianshu Requirements
+# Tianshu project dependencies
+
+# Core MinerU
+mineru[core]>=2.5.0
+
+# Image Augmentation (Version Pinned for Compatibility)
+albumentations>=1.4.4,<2.0.0
+albucore>=0.0.13,<0.0.20
+
+# Web Framework
+fastapi>=0.115.0
+uvicorn[standard]>=0.32.0
+
+# LitServe for GPU Load Balancing
+litserve>=0.2.0
+
+# Async HTTP Client
+aiohttp>=3.11.0
+
+# Logging
+loguru>=0.7.0
+
+# Office Document Parsing
+markitdown>=0.1.3
+
+# MinIO Object Storage
+minio>=7.2.0
+
+# Optional: For better performance
+# ujson>=5.10.0
+

+ 256 - 0
projects/mineru_tianshu/start_all.py

@@ -0,0 +1,256 @@
+"""
+MinerU Tianshu - Unified Startup Script
+天枢统一启动脚本
+
+一键启动所有服务:API Server + LitServe Workers + Task Scheduler
+"""
+import subprocess
+import signal
+import sys
+import time
+import os
+from loguru import logger
+from pathlib import Path
+import argparse
+
+
+class TianshuLauncher:
+    """天枢服务启动器"""
+    
+    def __init__(
+        self,
+        output_dir='/tmp/mineru_tianshu_output',
+        api_port=8000,
+        worker_port=9000,
+        workers_per_device=1,
+        devices='auto',
+        accelerator='auto'
+    ):
+        self.output_dir = output_dir
+        self.api_port = api_port
+        self.worker_port = worker_port
+        self.workers_per_device = workers_per_device
+        self.devices = devices
+        self.accelerator = accelerator
+        self.processes = []
+    
+    def start_services(self):
+        """启动所有服务"""
+        logger.info("=" * 70)
+        logger.info("🚀 MinerU Tianshu - Starting All Services")
+        logger.info("=" * 70)
+        logger.info("天枢 - 企业级多GPU文档解析服务")
+        logger.info("")
+        
+        try:
+            # 1. Start the API Server
+            logger.info("📡 [1/3] Starting API Server...")
+            env = os.environ.copy()
+            env['API_PORT'] = str(self.api_port)
+            api_proc = subprocess.Popen(
+                [sys.executable, 'api_server.py'],
+                cwd=Path(__file__).parent,
+                env=env
+            )
+            self.processes.append(('API Server', api_proc))
+            time.sleep(3)
+            
+            if api_proc.poll() is not None:
+                logger.error("❌ API Server failed to start!")
+                return False
+            
+            logger.info(f"   ✅ API Server started (PID: {api_proc.pid})")
+            logger.info(f"   📖 API Docs: http://localhost:{self.api_port}/docs")
+            logger.info("")
+            
+            # 2. Start the LitServe Worker Pool
+            logger.info("⚙️  [2/3] Starting LitServe Worker Pool...")
+            worker_cmd = [
+                sys.executable, 'litserve_worker.py',
+                '--output-dir', self.output_dir,
+                '--accelerator', self.accelerator,
+                '--workers-per-device', str(self.workers_per_device),
+                '--port', str(self.worker_port),
+                '--devices', str(self.devices) if isinstance(self.devices, str) else ','.join(map(str, self.devices))
+            ]
+            
+            worker_proc = subprocess.Popen(
+                worker_cmd,
+                cwd=Path(__file__).parent
+            )
+            self.processes.append(('LitServe Workers', worker_proc))
+            time.sleep(5)
+            
+            if worker_proc.poll() is not None:
+                logger.error("❌ LitServe Workers failed to start!")
+                return False
+            
+            logger.info(f"   ✅ LitServe Workers started (PID: {worker_proc.pid})")
+            logger.info(f"   🔌 Worker Port: {self.worker_port}")
+            logger.info(f"   👷 Workers per Device: {self.workers_per_device}")
+            logger.info("")
+            
+            # 3. Start the Task Scheduler
+            logger.info("🔄 [3/3] Starting Task Scheduler...")
+            scheduler_cmd = [
+                sys.executable, 'task_scheduler.py',
+                '--litserve-url', f'http://localhost:{self.worker_port}/predict',
+                '--wait-for-workers'
+            ]
+            
+            scheduler_proc = subprocess.Popen(
+                scheduler_cmd,
+                cwd=Path(__file__).parent
+            )
+            self.processes.append(('Task Scheduler', scheduler_proc))
+            time.sleep(3)
+            
+            if scheduler_proc.poll() is not None:
+                logger.error("❌ Task Scheduler failed to start!")
+                return False
+            
+            logger.info(f"   ✅ Task Scheduler started (PID: {scheduler_proc.pid})")
+            logger.info("")
+            
+            # Startup succeeded
+            logger.info("=" * 70)
+            logger.info("✅ All Services Started Successfully!")
+            logger.info("=" * 70)
+            logger.info("")
+            logger.info("📚 Quick Start:")
+            logger.info(f"   • API Documentation: http://localhost:{self.api_port}/docs")
+            logger.info(f"   • Submit Task:       POST http://localhost:{self.api_port}/api/v1/tasks/submit")
+            logger.info(f"   • Query Status:      GET  http://localhost:{self.api_port}/api/v1/tasks/{{task_id}}")
+            logger.info(f"   • Queue Stats:       GET  http://localhost:{self.api_port}/api/v1/queue/stats")
+            logger.info("")
+            logger.info("🔧 Service Details:")
+            for name, proc in self.processes:
+                logger.info(f"   • {name:20s} PID: {proc.pid}")
+            logger.info("")
+            logger.info("⚠️  Press Ctrl+C to stop all services")
+            logger.info("=" * 70)
+            
+            return True
+            
+        except Exception as e:
+            logger.error(f"❌ Failed to start services: {e}")
+            self.stop_services()
+            return False
+    
+    def stop_services(self, signum=None, frame=None):
+        """停止所有服务"""
+        logger.info("")
+        logger.info("=" * 70)
+        logger.info("⏹️  Stopping All Services...")
+        logger.info("=" * 70)
+        
+        for name, proc in self.processes:
+            if proc.poll() is None:  # process still running
+                logger.info(f"   Stopping {name} (PID: {proc.pid})...")
+                proc.terminate()
+        
+        # Wait for all processes to exit
+        for name, proc in self.processes:
+            try:
+                proc.wait(timeout=10)
+                logger.info(f"   ✅ {name} stopped")
+            except subprocess.TimeoutExpired:
+                logger.warning(f"   ⚠️  {name} did not stop gracefully, forcing...")
+                proc.kill()
+                proc.wait()
+        
+        logger.info("=" * 70)
+        logger.info("✅ All Services Stopped")
+        logger.info("=" * 70)
+        sys.exit(0)
+    
+    def wait(self):
+        """等待所有服务"""
+        try:
+            while True:
+                time.sleep(1)
+                
+                # Check process health
+                for name, proc in self.processes:
+                    if proc.poll() is not None:
+                        logger.error(f"❌ {name} unexpectedly stopped!")
+                        self.stop_services()
+                        return
+                        
+        except KeyboardInterrupt:
+            self.stop_services()
+
+
+def main():
+    """主函数"""
+    parser = argparse.ArgumentParser(
+        description='MinerU Tianshu - unified startup script',
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+Examples:
+  # Start with default settings (auto-detect GPUs)
+  python start_all.py
+  
+  # CPU mode
+  python start_all.py --accelerator cpu
+  
+  # Custom output directory and API port
+  python start_all.py --output-dir /data/output --api-port 8080
+  
+  # Two workers per GPU
+  python start_all.py --accelerator cuda --workers-per-device 2
+  
+  # Restrict to specific GPUs
+  python start_all.py --accelerator cuda --devices 0,1
+        """
+    )
+    
+    parser.add_argument('--output-dir', type=str, default='/tmp/mineru_tianshu_output',
+                       help='Output directory (default: /tmp/mineru_tianshu_output)')
+    parser.add_argument('--api-port', type=int, default=8000,
+                       help='API server port (default: 8000)')
+    parser.add_argument('--worker-port', type=int, default=9000,
+                       help='Worker server port (default: 9000)')
+    parser.add_argument('--accelerator', type=str, default='auto',
+                       choices=['auto', 'cuda', 'cpu', 'mps'],
+                       help='Accelerator type (default: auto, auto-detect)')
+    parser.add_argument('--workers-per-device', type=int, default=1,
+                       help='Number of workers per GPU (default: 1)')
+    parser.add_argument('--devices', type=str, default='auto',
+                       help='GPU devices to use, comma-separated (default: auto, all GPUs)')
+    
+    args = parser.parse_args()
+    
+    # Parse the devices argument
+    devices = args.devices
+    if devices != 'auto':
+        try:
+            devices = [int(d) for d in devices.split(',')]
+        except ValueError:
+            logger.warning(f"Invalid devices format: {devices}, using 'auto'")
+            devices = 'auto'
+    
+    # Create the launcher
+    launcher = TianshuLauncher(
+        output_dir=args.output_dir,
+        api_port=args.api_port,
+        worker_port=args.worker_port,
+        workers_per_device=args.workers_per_device,
+        devices=devices,
+        accelerator=args.accelerator
+    )
+    
+    # Install signal handlers
+    signal.signal(signal.SIGINT, launcher.stop_services)
+    signal.signal(signal.SIGTERM, launcher.stop_services)
+    
+    # Start services
+    if launcher.start_services():
+        launcher.wait()
+    else:
+        sys.exit(1)
+
+
+if __name__ == '__main__':
+    main()
+
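`stop_services` above escalates from `terminate()` to `kill()` when a child ignores the first signal within the 10-second grace period. The same pattern in isolation, exercised against a throwaway child process (`stop_process` is an illustrative helper name, not part of the launcher):

```python
import subprocess
import sys

def stop_process(proc: subprocess.Popen, grace_seconds: float = 5.0) -> int:
    """Terminate a child process, escalating to kill() if it does not exit in time."""
    if proc.poll() is None:
        proc.terminate()  # polite request first (SIGTERM on POSIX)
        try:
            proc.wait(timeout=grace_seconds)
        except subprocess.TimeoutExpired:
            proc.kill()  # force it down
            proc.wait()
    return proc.returncode

# Example: a child that would sleep for a minute unless stopped
child = subprocess.Popen([sys.executable, '-c', 'import time; time.sleep(60)'])
code = stop_process(child, grace_seconds=2.0)
```

Waiting after `kill()` matters: it reaps the process and prevents zombies on POSIX systems.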

+ 279 - 0
projects/mineru_tianshu/task_db.py

@@ -0,0 +1,279 @@
+"""
+MinerU Tianshu - SQLite Task Database Manager
+天枢任务数据库管理器
+
+负责任务的持久化存储、状态管理和原子性操作
+"""
+import sqlite3
+import json
+import uuid
+from contextlib import contextmanager
+from typing import Optional, List, Dict
+from pathlib import Path
+
+
+class TaskDB:
+    """任务数据库管理类"""
+    
+    def __init__(self, db_path='mineru_tianshu.db'):
+        self.db_path = db_path
+        self._init_db()
+    
+    def _get_conn(self):
+        """获取数据库连接(每次创建新连接,避免 pickle 问题)"""
+        conn = sqlite3.connect(
+            self.db_path, 
+            check_same_thread=False,
+            timeout=30.0
+        )
+        conn.row_factory = sqlite3.Row
+        return conn
+    
+    @contextmanager
+    def get_cursor(self):
+        """上下文管理器,自动提交和错误处理"""
+        conn = self._get_conn()
+        cursor = conn.cursor()
+        try:
+            yield cursor
+            conn.commit()
+        except Exception as e:
+            conn.rollback()
+            raise e
+        finally:
+            conn.close()  # 关闭连接
+    
+    def _init_db(self):
+        """初始化数据库表"""
+        with self.get_cursor() as cursor:
+            cursor.execute('''
+                CREATE TABLE IF NOT EXISTS tasks (
+                    task_id TEXT PRIMARY KEY,
+                    file_name TEXT NOT NULL,
+                    file_path TEXT,
+                    status TEXT DEFAULT 'pending',
+                    priority INTEGER DEFAULT 0,
+                    backend TEXT DEFAULT 'pipeline',
+                    options TEXT,
+                    result_path TEXT,
+                    error_message TEXT,
+                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+                    started_at TIMESTAMP,
+                    completed_at TIMESTAMP,
+                    worker_id TEXT,
+                    retry_count INTEGER DEFAULT 0
+                )
+            ''')
+            
+            # Indexes to speed up common queries
+            cursor.execute('CREATE INDEX IF NOT EXISTS idx_status ON tasks(status)')
+            cursor.execute('CREATE INDEX IF NOT EXISTS idx_priority ON tasks(priority DESC)')
+            cursor.execute('CREATE INDEX IF NOT EXISTS idx_created_at ON tasks(created_at)')
+            cursor.execute('CREATE INDEX IF NOT EXISTS idx_worker_id ON tasks(worker_id)')
+    
+    def create_task(self, file_name: str, file_path: str, 
+                   backend: str = 'pipeline', options: dict = None,
+                   priority: int = 0) -> str:
+        """
+        创建新任务
+        
+        Args:
+            file_name: 文件名
+            file_path: 文件路径
+            backend: 处理后端 (pipeline/vlm-transformers/vlm-vllm-engine)
+            options: 处理选项 (dict)
+            priority: 优先级,数字越大越优先
+            
+        Returns:
+            task_id: 任务ID
+        """
+        task_id = str(uuid.uuid4())
+        with self.get_cursor() as cursor:
+            cursor.execute('''
+                INSERT INTO tasks (task_id, file_name, file_path, backend, options, priority)
+                VALUES (?, ?, ?, ?, ?, ?)
+            ''', (task_id, file_name, file_path, backend, json.dumps(options or {}), priority))
+        return task_id
+    
+    def get_next_task(self, worker_id: str) -> Optional[Dict]:
+        """
+        获取下一个待处理任务(原子操作,防止并发冲突)
+        
+        Args:
+            worker_id: Worker ID
+            
+        Returns:
+            task: 任务字典,如果没有任务返回 None
+        """
+        with self.get_cursor() as cursor:
+            # 使用事务确保原子性
+            cursor.execute('BEGIN IMMEDIATE')
+            
+            # 按优先级和创建时间获取任务
+            cursor.execute('''
+                SELECT * FROM tasks 
+                WHERE status = 'pending' 
+                ORDER BY priority DESC, created_at ASC 
+                LIMIT 1
+            ''')
+            
+            task = cursor.fetchone()
+            if task:
+                # Immediately mark it as processing
+                cursor.execute('''
+                    UPDATE tasks 
+                    SET status = 'processing', 
+                        started_at = CURRENT_TIMESTAMP, 
+                        worker_id = ?
+                    WHERE task_id = ?
+                ''', (worker_id, task['task_id']))
+                
+                return dict(task)
+            
+            return None
+    
+    def update_task_status(self, task_id: str, status: str, 
+                          result_path: str = None, error_message: str = None):
+        """
+        更新任务状态
+        
+        Args:
+            task_id: 任务ID
+            status: 新状态 (pending/processing/completed/failed/cancelled)
+            result_path: 结果路径(可选)
+            error_message: 错误信息(可选)
+        """
+        with self.get_cursor() as cursor:
+            updates = ['status = ?']
+            params = [status]
+            
+            if status == 'completed':
+                updates.append('completed_at = CURRENT_TIMESTAMP')
+                if result_path:
+                    updates.append('result_path = ?')
+                    params.append(result_path)
+            
+            if status == 'failed' and error_message:
+                updates.append('error_message = ?')
+                params.append(error_message)
+                updates.append('completed_at = CURRENT_TIMESTAMP')
+            
+            params.append(task_id)
+            cursor.execute(f'''
+                UPDATE tasks SET {', '.join(updates)}
+                WHERE task_id = ?
+            ''', params)
+    
+    def get_task(self, task_id: str) -> Optional[Dict]:
+        """
+        查询任务详情
+        
+        Args:
+            task_id: 任务ID
+            
+        Returns:
+            task: 任务字典,如果不存在返回 None
+        """
+        with self.get_cursor() as cursor:
+            cursor.execute('SELECT * FROM tasks WHERE task_id = ?', (task_id,))
+            task = cursor.fetchone()
+            return dict(task) if task else None
+    
+    def get_queue_stats(self) -> Dict[str, int]:
+        """
+        获取队列统计信息
+        
+        Returns:
+            stats: 各状态的任务数量
+        """
+        with self.get_cursor() as cursor:
+            cursor.execute('''
+                SELECT status, COUNT(*) as count 
+                FROM tasks 
+                GROUP BY status
+            ''')
+            stats = {row['status']: row['count'] for row in cursor.fetchall()}
+            return stats
+    
+    def get_tasks_by_status(self, status: str, limit: int = 100) -> List[Dict]:
+        """
+        根据状态获取任务列表
+        
+        Args:
+            status: 任务状态
+            limit: 返回数量限制
+            
+        Returns:
+            tasks: 任务列表
+        """
+        with self.get_cursor() as cursor:
+            cursor.execute('''
+                SELECT * FROM tasks 
+                WHERE status = ? 
+                ORDER BY created_at DESC 
+                LIMIT ?
+            ''', (status, limit))
+            return [dict(row) for row in cursor.fetchall()]
+    
+    def cleanup_old_tasks(self, days: int = 7):
+        """
+        清理旧任务记录
+        
+        Args:
+            days: 保留最近N天的任务
+        """
+        with self.get_cursor() as cursor:
+            cursor.execute('''
+                DELETE FROM tasks 
+                WHERE completed_at < datetime('now', '-' || ? || ' days')
+                AND status IN ('completed', 'failed')
+            ''', (days,))
+            deleted_count = cursor.rowcount
+            return deleted_count
+    
+    def reset_stale_tasks(self, timeout_minutes: int = 60):
+        """
+        重置超时的 processing 任务为 pending
+        
+        Args:
+            timeout_minutes: 超时时间(分钟)
+        """
+        with self.get_cursor() as cursor:
+            cursor.execute('''
+                UPDATE tasks 
+                SET status = 'pending',
+                    worker_id = NULL,
+                    retry_count = retry_count + 1
+                WHERE status = 'processing' 
+                AND started_at < datetime('now', '-' || ? || ' minutes')
+            ''', (timeout_minutes,))
+            reset_count = cursor.rowcount
+            return reset_count
+
+
+if __name__ == '__main__':
+    # Smoke test
+    db = TaskDB('test_tianshu.db')
+    
+    # Create a test task
+    task_id = db.create_task(
+        file_name='test.pdf',
+        file_path='/tmp/test.pdf',
+        backend='pipeline',
+        options={'lang': 'ch', 'formula_enable': True},
+        priority=1
+    )
+    print(f"Created task: {task_id}")
+    
+    # Look up the task
+    task = db.get_task(task_id)
+    print(f"Task details: {task}")
+    
+    # Queue statistics
+    stats = db.get_queue_stats()
+    print(f"Queue stats: {stats}")
+    
+    # Remove the test database
+    Path('test_tianshu.db').unlink(missing_ok=True)
+    print("Test completed!")
+
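The heart of `get_next_task` above is claiming a row atomically: `BEGIN IMMEDIATE` takes SQLite's write lock before the SELECT, so two workers can never claim the same task. A self-contained in-memory sketch of that claim pattern (the table schema is trimmed to the columns that matter; `claim_next` is an illustrative name):

```python
import sqlite3

# Minimal queue table mirroring the columns the claim touches
conn = sqlite3.connect(':memory:')
conn.row_factory = sqlite3.Row
conn.execute("CREATE TABLE tasks (task_id TEXT PRIMARY KEY, status TEXT, "
             "priority INTEGER, worker_id TEXT)")
conn.executemany("INSERT INTO tasks VALUES (?, 'pending', ?, NULL)",
                 [('a', 0), ('b', 5), ('c', 1)])
conn.commit()

def claim_next(conn, worker_id):
    """Select the highest-priority pending row and mark it 'processing'
    inside one write transaction, so concurrent claimers cannot collide."""
    cur = conn.cursor()
    cur.execute('BEGIN IMMEDIATE')  # take the write lock before reading
    cur.execute("SELECT * FROM tasks WHERE status='pending' "
                "ORDER BY priority DESC LIMIT 1")
    row = cur.fetchone()
    if row:
        cur.execute("UPDATE tasks SET status='processing', worker_id=? "
                    "WHERE task_id=?", (worker_id, row['task_id']))
    conn.commit()
    return dict(row) if row else None

first = claim_next(conn, 'w1')   # highest priority: 'b'
second = claim_next(conn, 'w2')  # next: 'c'
```

With a file-backed database (as in `TaskDB`), a second process attempting `BEGIN IMMEDIATE` while the lock is held blocks until the first commits, which is exactly what makes the claim safe across worker processes.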

+ 195 - 0
projects/mineru_tianshu/task_scheduler.py

@@ -0,0 +1,195 @@
+"""
+MinerU Tianshu - Task Scheduler
+天枢任务调度器
+
+定期检查任务队列,触发 LitServe Workers 拉取和处理任务
+"""
+import asyncio
+import aiohttp
+from loguru import logger
+from task_db import TaskDB
+import signal
+
+
+class TaskScheduler:
+    """
+    任务调度器
+    
+    职责:
+    1. 监控 SQLite 任务队列
+    2. 当有待处理任务时,触发 LitServe Workers
+    3. 管理调度策略(轮询间隔、并发控制等)
+    """
+    
+    def __init__(
+        self, 
+        litserve_url='http://localhost:9000/predict', 
+        poll_interval=2,
+        max_concurrent_polls=10
+    ):
+        """
+        初始化调度器
+        
+        Args:
+            litserve_url: LitServe Worker 的 URL
+            poll_interval: 轮询间隔(秒)
+            max_concurrent_polls: 最大并发轮询数
+        """
+        self.litserve_url = litserve_url
+        self.poll_interval = poll_interval
+        self.max_concurrent_polls = max_concurrent_polls
+        self.db = TaskDB()
+        self.running = True
+        self.semaphore = asyncio.Semaphore(max_concurrent_polls)
+    
+    async def trigger_worker_poll(self, session: aiohttp.ClientSession):
+        """
+        触发一个 worker 拉取任务
+        """
+        async with self.semaphore:
+            try:
+                async with session.post(
+                    self.litserve_url,
+                    json={'action': 'poll'},
+                    timeout=aiohttp.ClientTimeout(total=600)  # 10-minute timeout
+                ) as resp:
+                    if resp.status == 200:
+                        result = await resp.json()
+                        
+                        if result.get('status') == 'completed':
+                            logger.info(f"✅ Task completed: {result.get('task_id')} by {result.get('worker_id')}")
+                        elif result.get('status') == 'failed':
+                            logger.error(f"❌ Task failed: {result.get('task_id')} - {result.get('error')}")
+                        elif result.get('status') == 'idle':
+                            # Worker is idle; nothing to do
+                            pass
+                        
+                        return result
+                    else:
+                        logger.error(f"Worker poll failed with status {resp.status}")
+                        
+            except asyncio.TimeoutError:
+                logger.warning("Worker poll timeout")
+            except Exception as e:
+                logger.error(f"Worker poll error: {e}")
+    
+    async def schedule_loop(self):
+        """
+        主调度循环
+        """
+        logger.info("🔄 Task scheduler started")
+        logger.info(f"   LitServe URL: {self.litserve_url}")
+        logger.info(f"   Poll Interval: {self.poll_interval}s")
+        logger.info(f"   Max Concurrent Polls: {self.max_concurrent_polls}")
+        
+        async with aiohttp.ClientSession() as session:
+            while self.running:
+                try:
+                    # Fetch queue statistics
+                    stats = self.db.get_queue_stats()
+                    pending_count = stats.get('pending', 0)
+                    processing_count = stats.get('processing', 0)
+                    
+                    if pending_count > 0:
+                        logger.info(f"📋 Queue status: {pending_count} pending, {processing_count} processing")
+                        
+                        # Trigger as many workers as there are pending
+                        # tasks, capped by the concurrency limit; the
+                        # semaphore enforces the actual in-flight count
+                        needed_workers = min(
+                            pending_count,
+                            self.max_concurrent_polls
+                        )
+                        
+                        tasks = [
+                            self.trigger_worker_poll(session) 
+                            for _ in range(needed_workers)
+                        ]
+                        await asyncio.gather(*tasks, return_exceptions=True)
+                    
+                    # Wait before the next poll cycle
+                    await asyncio.sleep(self.poll_interval)
+                    
+                except Exception as e:
+                    logger.error(f"Scheduler loop error: {e}")
+                    await asyncio.sleep(self.poll_interval)
+        
+        logger.info("⏹️  Task scheduler stopped")
+    
+    def start(self):
+        """Start the scheduler (blocks until stopped)"""
+        logger.info("🚀 Starting MinerU Tianshu Task Scheduler...")
+        
+        # Install signal handlers for graceful shutdown
+        def signal_handler(sig, frame):
+            logger.info("\n🛑 Received stop signal, shutting down...")
+            self.running = False
+        
+        signal.signal(signal.SIGINT, signal_handler)
+        signal.signal(signal.SIGTERM, signal_handler)
+        
+        # Run the scheduling loop
+        asyncio.run(self.schedule_loop())
+    
+    def stop(self):
+        """Stop the scheduler"""
+        self.running = False
+
+
+async def health_check(litserve_url: str) -> bool:
+    """
+    Health check: verify that the LitServe workers are reachable
+    """
+    try:
+        async with aiohttp.ClientSession() as session:
+            async with session.get(
+                litserve_url.replace('/predict', '/health'),
+                timeout=aiohttp.ClientTimeout(total=5)
+            ) as resp:
+                return resp.status == 200
+    except Exception:
+        # Any connection error means the workers are not ready yet
+        return False
+
+
+if __name__ == '__main__':
+    import argparse
+    
+    parser = argparse.ArgumentParser(description='MinerU Tianshu Task Scheduler')
+    parser.add_argument('--litserve-url', type=str, default='http://localhost:9000/predict',
+                       help='LitServe worker URL')
+    parser.add_argument('--poll-interval', type=int, default=2,
+                       help='Poll interval in seconds')
+    parser.add_argument('--max-concurrent', type=int, default=10,
+                       help='Maximum concurrent worker polls')
+    parser.add_argument('--wait-for-workers', action='store_true',
+                       help='Wait for workers to be ready before starting')
+    
+    args = parser.parse_args()
+    
+    # Optionally wait until workers are ready
+    if args.wait_for_workers:
+        logger.info("⏳ Waiting for LitServe workers to be ready...")
+        import time
+        max_retries = 30
+        for _ in range(max_retries):
+            if asyncio.run(health_check(args.litserve_url)):
+                logger.info("✅ LitServe workers are ready!")
+                break
+            time.sleep(2)
+        else:
+            logger.error("❌ LitServe workers not responding, starting anyway...")
+    
+    # Create and start the scheduler
+    scheduler = TaskScheduler(
+        litserve_url=args.litserve_url,
+        poll_interval=args.poll_interval,
+        max_concurrent_polls=args.max_concurrent
+    )
+    
+    try:
+        scheduler.start()
+    except KeyboardInterrupt:
+        logger.info("👋 Scheduler interrupted by user")
+