""" 多模态解析服务 - FastAPI 主入口 """ import os import uuid import time import tempfile from pathlib import Path from typing import Optional from contextlib import asynccontextmanager from fastapi import FastAPI, File, UploadFile, HTTPException from fastapi.responses import JSONResponse from loguru import logger from core.router import ParserFactory from models.result import ParseResult from utils.logger import log # 配置 UPLOAD_DIR = os.getenv("UPLOAD_DIR", "/tmp/parse-service/uploads") os.makedirs(UPLOAD_DIR, exist_ok=True) # 全局解析器工厂 parser_factory: Optional[ParserFactory] = None @asynccontextmanager async def lifespan(app: FastAPI): """应用生命周期管理""" global parser_factory logger.info("正在初始化解析服务...") # 初始化解析器工厂 parser_factory = ParserFactory() logger.info("解析服务初始化完成") yield logger.info("正在关闭解析服务...") app = FastAPI( title="多模态解析服务", description="支持 PDF/Office/图片/音视频的多模态文件解析服务", version="1.0.0", lifespan=lifespan ) class ParseResponse: """解析响应模型""" @staticmethod def success(task_id: str, file_type: str, result: ParseResult, parse_time_ms: float): return { "code": 200, "message": "success", "data": { "task_id": task_id, "file_type": file_type, "content": result.content, "metadata": result.metadata, "tables": result.tables, "parse_time_ms": parse_time_ms } } @staticmethod def error(message: str, code: int = 500): return { "code": code, "message": message, "data": None } @app.get("/health") async def health_check(): """健康检查""" return { "status": "healthy", "service": "parse-service", "models": { "qwen_vl": "connected", "qwen_asr": "connected", "mineru": "connected" } } @app.post("/api/v1/parse") async def parse_file(file: UploadFile = File(...)): """ 解析单个文件 Args: file: 上传的文件 Returns: 解析结果(Markdown + 元数据) """ global parser_factory if not parser_factory: raise HTTPException(status_code=500, detail="解析服务未初始化") task_id = str(uuid.uuid4()) start_time = time.time() try: # 保存上传的文件 file_ext = Path(file.filename).suffix if file.filename else "" temp_file = tempfile.NamedTemporaryFile( delete=False, suffix=file_ext, dir=UPLOAD_DIR ) try: content = await file.read() temp_file.write(content) temp_file_path = temp_file.name finally: temp_file.close() logger.info(f"[{task_id}] 开始解析文件: {file.filename}, 临时路径: {temp_file_path}") # 执行解析 result = await parser_factory.parse(temp_file_path) parse_time_ms = (time.time() - start_time) * 1000 logger.info(f"[{task_id}] 文件解析完成, 耗时: {parse_time_ms:.2f}ms") # 清理临时文件 try: os.unlink(temp_file_path) except: pass return ParseResponse.success( task_id=task_id, file_type=result.file_type, result=result, parse_time_ms=parse_time_ms ) except Exception as e: logger.error(f"[{task_id}] 文件解析失败: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/v1/parse/path") async def parse_file_path(file_path: str): """ 解析本地路径的文件(用于 Flink 调用) Args: file_path: 文件路径 Returns: 解析结果 """ global parser_factory if not parser_factory: raise HTTPException(status_code=500, detail="解析服务未初始化") task_id = str(uuid.uuid4()) start_time = time.time() try: if not os.path.exists(file_path): raise HTTPException(status_code=404, detail=f"文件不存在: {file_path}") logger.info(f"[{task_id}] 开始解析文件: {file_path}") # 执行解析 result = await parser_factory.parse(file_path) parse_time_ms = (time.time() - start_time) * 1000 logger.info(f"[{task_id}] 文件解析完成, 耗时: {parse_time_ms:.2f}ms") return ParseResponse.success( task_id=task_id, file_type=result.file_type, result=result, parse_time_ms=parse_time_ms ) except HTTPException: raise except Exception as e: logger.error(f"[{task_id}] 文件解析失败: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": import uvicorn uvicorn.run( "app.main:app", host="0.0.0.0", port=8000, workers=1 )