""" 多模态解析服务 - FastAPI 主入口 功能: 1. 解析文件 (PDF/Office/图片/音视频) 2. 可选: 直接返回解析结果 3. 可选: 发送解析结果到 Kafka (供 schedule-embedding-api 消费) """ import os import sys import uuid import time import tempfile import json from pathlib import Path from typing import Optional from contextlib import asynccontextmanager from fastapi import FastAPI, File, UploadFile, HTTPException, Query from fastapi.responses import JSONResponse from loguru import logger # 添加当前目录到路径,确保导入正确 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from core.router import ParserFactory from models.result import ParseResult from utils.logger import log # ========== 配置 ========== UPLOAD_DIR = os.getenv("UPLOAD_DIR", "/tmp/parse-service/uploads") os.makedirs(UPLOAD_DIR, exist_ok=True) # Kafka 配置 KAFKA_ENABLED = os.getenv("KAFKA_ENABLED", "false").lower() == "true" KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "10.192.72.13:9092") KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", "embedding-topic") # 全局解析器工厂和 Kafka producer parser_factory: Optional[ParserFactory] = None kafka_producer = None def init_kafka_producer(): """初始化 Kafka Producer""" global kafka_producer if not KAFKA_ENABLED: logger.info("Kafka 未启用,跳过初始化") return try: from kafka import KafkaProducer logger.info(f"正在初始化 Kafka Producer: {KAFKA_BOOTSTRAP_SERVERS}") kafka_producer = KafkaProducer( bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS.split(","), value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"), retries=3, acks="all" ) logger.info("Kafka Producer 初始化成功") except Exception as e: logger.error(f"Kafka Producer 初始化失败: {e}") kafka_producer = None def send_to_kafka(task_message: dict): """发送消息到 Kafka""" if not KAFKA_ENABLED or not kafka_producer: return False try: future = kafka_producer.send(KAFKA_TOPIC, task_message) future.get(timeout=10) logger.info(f"消息已发送到 Kafka: docId={task_message.get('docId')}") return True except Exception as e: logger.error(f"发送到 Kafka 失败: {e}") return False @asynccontextmanager async def lifespan(app: FastAPI): """应用生命周期管理""" global parser_factory logger.info("正在初始化解析服务...") # 初始化解析器工厂 parser_factory = ParserFactory() # 初始化 Kafka Producer init_kafka_producer() logger.info("解析服务初始化完成") logger.info(f" - Kafka 启用: {KAFKA_ENABLED}") if KAFKA_ENABLED: logger.info(f" - Kafka Servers: {KAFKA_BOOTSTRAP_SERVERS}") logger.info(f" - Kafka Topic: {KAFKA_TOPIC}") yield logger.info("正在关闭解析服务...") if kafka_producer: kafka_producer.close() app = FastAPI( title="多模态解析服务", description="支持 PDF/Office/图片/音视频的多模态文件解析服务", version="1.0.0", lifespan=lifespan ) class ParseResponse: """解析响应模型""" @staticmethod def success(task_id: str, file_type: str, result: ParseResult, parse_time_ms: float, sent_to_kafka: bool = False): return { "code": 200, "message": "success", "data": { "task_id": task_id, "file_type": file_type, "content": result.content, "metadata": result.metadata, "tables": result.tables, "parse_time_ms": parse_time_ms, "sent_to_kafka": sent_to_kafka } } @staticmethod def error(message: str, code: int = 500): return { "code": code, "message": message, "data": None } def build_embedding_task_message( doc_id: str, file_name: str, full_text: str, file_path: str, file_size: Optional[int], file_type: str, metadata: Optional[dict], task_id: Optional[str] = None ) -> dict: """ 构建 Kafka 消息(匹配 EmbeddingTaskMessage 格式) 参考: schedule-embedding-api 中的 EmbeddingTaskMessage.java """ return { "docId": doc_id, "fileName": file_name, "fullText": full_text, "filePath": file_path, "fileSize": file_size, "fileType": file_type, "metadata": metadata, "callbackUrl": None, "taskId": task_id or doc_id } @app.get("/health") async def health_check(): """健康检查""" return { "status": "healthy", "service": "parse-service", "kafka_enabled": KAFKA_ENABLED, "models": { "qwen_vl": "connected", "qwen_asr": "connected", "mineru": "connected" } } @app.post("/api/v1/parse") async def parse_file( file: UploadFile = File(...), send_to_kafka_flag: bool = Query(False, description="是否发送到 Kafka") ): """ 解析单个文件 Args: file: 上传的文件 send_to_kafka_flag: 是否发送解析结果到 Kafka Returns: 解析结果(Markdown + 元数据) """ global parser_factory if not parser_factory: raise HTTPException(status_code=500, detail="解析服务未初始化") task_id = str(uuid.uuid4()) start_time = time.time() sent_to_kafka = False try: # 保存上传的文件 file_ext = Path(file.filename).suffix if file.filename else "" temp_file = tempfile.NamedTemporaryFile( delete=False, suffix=file_ext, dir=UPLOAD_DIR ) try: content = await file.read() temp_file.write(content) temp_file_path = temp_file.name file_size = len(content) finally: temp_file.close() logger.info(f"[{task_id}] 开始解析文件: {file.filename}, 临时路径: {temp_file_path}") # 执行解析 result = await parser_factory.parse(temp_file_path) parse_time_ms = (time.time() - start_time) * 1000 logger.info(f"[{task_id}] 文件解析完成, 耗时: {parse_time_ms:.2f}ms") # 如果启用 Kafka 且用户要求发送,则发送到 Kafka if KAFKA_ENABLED and send_to_kafka_flag: doc_id = f"parse-{uuid.uuid4().hex[:8]}" task_message = build_embedding_task_message( doc_id=doc_id, file_name=file.filename or "unknown", full_text=result.content, file_path=temp_file_path, file_size=file_size, file_type=result.file_type, metadata=result.metadata, task_id=task_id ) sent_to_kafka = send_to_kafka(task_message) # 清理临时文件 try: os.unlink(temp_file_path) except: pass return ParseResponse.success( task_id=task_id, file_type=result.file_type, result=result, parse_time_ms=parse_time_ms, sent_to_kafka=sent_to_kafka ) except Exception as e: logger.error(f"[{task_id}] 文件解析失败: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/v1/parse/path") async def parse_file_path( file_path: str, send_to_kafka_flag: bool = Query(False, description="是否发送到 Kafka") ): """ 解析本地路径的文件(用于 Flink 调用) Args: file_path: 文件路径 send_to_kafka_flag: 是否发送解析结果到 Kafka Returns: 解析结果 """ global parser_factory if not parser_factory: raise HTTPException(status_code=500, detail="解析服务未初始化") task_id = str(uuid.uuid4()) start_time = time.time() sent_to_kafka = False try: if not os.path.exists(file_path): raise HTTPException(status_code=404, detail=f"文件不存在: {file_path}") logger.info(f"[{task_id}] 开始解析文件: {file_path}") # 获取文件大小 file_size = None try: file_size = os.path.getsize(file_path) except: pass # 执行解析 result = await parser_factory.parse(file_path) parse_time_ms = (time.time() - start_time) * 1000 logger.info(f"[{task_id}] 文件解析完成, 耗时: {parse_time_ms:.2f}ms") # 如果启用 Kafka 且用户要求发送,则发送到 Kafka if KAFKA_ENABLED and send_to_kafka_flag: doc_id = f"parse-{uuid.uuid4().hex[:8]}" file_name = os.path.basename(file_path) task_message = build_embedding_task_message( doc_id=doc_id, file_name=file_name, full_text=result.content, file_path=file_path, file_size=file_size, file_type=result.file_type, metadata=result.metadata, task_id=task_id ) sent_to_kafka = send_to_kafka(task_message) return ParseResponse.success( task_id=task_id, file_type=result.file_type, result=result, parse_time_ms=parse_time_ms, sent_to_kafka=sent_to_kafka ) except HTTPException: raise except Exception as e: logger.error(f"[{task_id}] 文件解析失败: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": import uvicorn uvicorn.run( "main:app", host="0.0.0.0", port=8000, workers=1 )