| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- """
- 多模态解析服务 - FastAPI 主入口
- """
- import os
- import uuid
- import time
- import tempfile
- from pathlib import Path
- from typing import Optional
- from contextlib import asynccontextmanager
- from fastapi import FastAPI, File, UploadFile, HTTPException
- from fastapi.responses import JSONResponse
- from loguru import logger
- from core.router import ParserFactory
- from models.result import ParseResult
- from utils.logger import log
# Configuration: the upload directory comes from the UPLOAD_DIR environment
# variable, with a fallback location under /tmp.
UPLOAD_DIR = os.environ.get("UPLOAD_DIR", "/tmp/parse-service/uploads")
# Created at import time; an already existing directory is not an error.
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Module-wide parser factory. It is None here and is assigned by `lifespan`.
parser_factory: Optional[ParserFactory] = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application startup and shutdown.

    Before yielding, a ``ParserFactory`` is instantiated and stored in the
    module-level ``parser_factory``. After the yield, only a shutdown
    message is logged; the factory is not reset.

    Args:
        app: The FastAPI application this context manager is attached to.
            It is not used inside the body.
    """
    global parser_factory

    logger.info("正在初始化解析服务...")

    # Create the shared parser factory during startup.
    parser_factory = ParserFactory()

    logger.info("解析服务初始化完成")

    yield

    logger.info("正在关闭解析服务...")
# The FastAPI application object; startup and shutdown are handled by
# the `lifespan` context manager.
app = FastAPI(
    lifespan=lifespan,
    title="多模态解析服务",
    version="1.0.0",
    description="支持 PDF/Office/图片/音视频的多模态文件解析服务",
)
class ParseResponse:
    """Builders for the ``code`` / ``message`` / ``data`` response envelope."""

    @staticmethod
    def success(
        task_id: str,
        file_type: str,
        result: ParseResult,
        parse_time_ms: float,
    ):
        """Build the envelope for a successful parse.

        Args:
            task_id: Identifier of the parse task.
            file_type: File type reported for the parsed file.
            result: Parse result; its ``content``, ``metadata`` and
                ``tables`` attributes are copied into the payload.
            parse_time_ms: Elapsed parse time in milliseconds.

        Returns:
            A dict with ``code`` 200, ``message`` "success" and the payload
            under ``data``.
        """
        payload = dict(
            task_id=task_id,
            file_type=file_type,
            content=result.content,
            metadata=result.metadata,
            tables=result.tables,
            parse_time_ms=parse_time_ms,
        )
        return dict(code=200, message="success", data=payload)

    @staticmethod
    def error(message: str, code: int = 500):
        """Build the envelope for a failure.

        Args:
            message: Text placed in the ``message`` field.
            code: Value placed in the ``code`` field (500 by default).

        Returns:
            A dict with the given ``code`` and ``message`` and ``data`` set
            to ``None``.
        """
        return dict(code=code, message=message, data=None)
@app.get("/health")
async def health_check():
    """Return the health-check payload.

    The response is a fixed dict: every model entry is the hard-coded
    string "connected" and no backend is contacted by this handler.
    """
    model_names = ("qwen_vl", "qwen_asr", "mineru")
    return {
        "status": "healthy",
        "service": "parse-service",
        "models": {name: "connected" for name in model_names},
    }
@app.post("/api/v1/parse")
async def parse_file(file: UploadFile = File(...)):
    """Parse a single uploaded file.

    The upload is written to a temporary file under ``UPLOAD_DIR`` (keeping
    the original file extension), handed to ``parser_factory.parse`` and the
    temporary file is removed afterwards, whether or not parsing succeeded.

    Args:
        file: The uploaded file.

    Returns:
        The success envelope built by ``ParseResponse.success``.

    Raises:
        HTTPException: With status 500 when the parser factory has not been
            initialised or when saving/parsing the file fails. An
            ``HTTPException`` raised during processing is passed through
            unchanged.
    """
    if parser_factory is None:
        raise HTTPException(status_code=500, detail="解析服务未初始化")

    task_id = str(uuid.uuid4())
    start_time = time.time()
    # Stays None until the temporary file exists, so the cleanup in
    # `finally` knows whether there is anything to delete.
    temp_file_path = None

    try:
        # Persist the upload; the suffix is kept so the temporary file has
        # the same extension as the uploaded one.
        file_ext = Path(file.filename).suffix if file.filename else ""
        with tempfile.NamedTemporaryFile(
            delete=False, suffix=file_ext, dir=UPLOAD_DIR
        ) as temp_file:
            # Record the path before reading/writing so a failure in either
            # step still gets the file cleaned up.
            temp_file_path = temp_file.name
            temp_file.write(await file.read())

        logger.info(f"[{task_id}] 开始解析文件: {file.filename}, 临时路径: {temp_file_path}")

        result = await parser_factory.parse(temp_file_path)

        parse_time_ms = (time.time() - start_time) * 1000
        logger.info(f"[{task_id}] 文件解析完成, 耗时: {parse_time_ms:.2f}ms")

        return ParseResponse.success(
            task_id=task_id,
            file_type=result.file_type,
            result=result,
            parse_time_ms=parse_time_ms,
        )
    except HTTPException:
        # Same policy as the path-based endpoint: do not rewrap HTTP errors.
        raise
    except Exception as e:
        logger.error(f"[{task_id}] 文件解析失败: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # Best-effort cleanup on every exit path; a failure here must not
        # mask the parse result or the original error.
        if temp_file_path is not None:
            try:
                os.unlink(temp_file_path)
            except OSError as cleanup_error:
                logger.warning(
                    f"[{task_id}] 临时文件清理失败: {temp_file_path}, 原因: {cleanup_error}"
                )
@app.post("/api/v1/parse/path")
async def parse_file_path(file_path: str):
    """Parse a file that already exists on the local filesystem.

    Intended for callers such as Flink that pass a path instead of
    uploading the file.

    Args:
        file_path: Path of the file to parse.

    Returns:
        The success envelope built by ``ParseResponse.success``.

    Raises:
        HTTPException: With status 500 when the parser factory is not
            initialised or parsing fails, and with status 404 when
            ``file_path`` does not exist.
    """
    # NOTE(review): file_path is used as given, with no restriction to a
    # base directory - confirm this endpoint is only reachable by trusted
    # callers.
    if not parser_factory:
        raise HTTPException(status_code=500, detail="解析服务未初始化")

    task_id = str(uuid.uuid4())
    started_at = time.time()

    try:
        path_missing = not os.path.exists(file_path)
        if path_missing:
            raise HTTPException(status_code=404, detail=f"文件不存在: {file_path}")

        logger.info(f"[{task_id}] 开始解析文件: {file_path}")

        parsed = await parser_factory.parse(file_path)

        parse_time_ms = (time.time() - started_at) * 1000
        logger.info(f"[{task_id}] 文件解析完成, 耗时: {parse_time_ms:.2f}ms")

        return ParseResponse.success(
            task_id=task_id,
            file_type=parsed.file_type,
            result=parsed,
            parse_time_ms=parse_time_ms,
        )
    except HTTPException:
        # Let HTTP errors (such as the 404 above) through without rewrapping.
        raise
    except Exception as exc:
        reason = str(exc)
        logger.error(f"[{task_id}] 文件解析失败: {reason}")
        raise HTTPException(status_code=500, detail=reason)
if __name__ == "__main__":
    # Script entry point: start uvicorn for the "app.main:app" import string.
    import uvicorn

    server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "workers": 1,
    }
    uvicorn.run("app.main:app", **server_options)
|