| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347 |
- """
- 多模态解析服务 - FastAPI 主入口
- 功能:
- 1. 解析文件 (PDF/Office/图片/音视频)
- 2. 可选: 直接返回解析结果
- 3. 可选: 发送解析结果到 Kafka (供 schedule-embedding-api 消费)
- """
- import os
- import sys
- import uuid
- import time
- import tempfile
- import json
- from pathlib import Path
- from typing import Optional
- from contextlib import asynccontextmanager
- from fastapi import FastAPI, File, UploadFile, HTTPException, Query
- from fastapi.responses import JSONResponse
- from loguru import logger
- # 添加当前目录到路径,确保导入正确
- sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
- from core.router import ParserFactory
- from models.result import ParseResult
- from utils.logger import log
# ========== Configuration ==========
# All settings are overridable via environment variables.
UPLOAD_DIR = os.getenv("UPLOAD_DIR", "/tmp/parse-service/uploads")
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Kafka configuration — disabled by default; enable with KAFKA_ENABLED=true.
KAFKA_ENABLED = os.getenv("KAFKA_ENABLED", "false").lower() == "true"
KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "10.192.72.13:9092")
KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", "embedding-topic")

# Global parser factory and Kafka producer.
# Both are populated during application startup (see lifespan / init_kafka_producer).
parser_factory: Optional[ParserFactory] = None
kafka_producer = None
def init_kafka_producer():
    """Create the module-level Kafka producer if Kafka is enabled.

    No-op when KAFKA_ENABLED is false. On any failure the producer is
    left as None so the service keeps running without Kafka delivery.
    """
    global kafka_producer
    if not KAFKA_ENABLED:
        logger.info("Kafka 未启用,跳过初始化")
        return
    try:
        # Imported lazily so the dependency is optional when Kafka is off.
        from kafka import KafkaProducer

        logger.info(f"正在初始化 Kafka Producer: {KAFKA_BOOTSTRAP_SERVERS}")
        broker_list = KAFKA_BOOTSTRAP_SERVERS.split(",")
        kafka_producer = KafkaProducer(
            bootstrap_servers=broker_list,
            # Serialize payloads as UTF-8 JSON, keeping non-ASCII characters.
            value_serializer=lambda payload: json.dumps(payload, ensure_ascii=False).encode("utf-8"),
            retries=3,
            acks="all",
        )
        logger.info("Kafka Producer 初始化成功")
    except Exception as exc:
        logger.error(f"Kafka Producer 初始化失败: {exc}")
        kafka_producer = None
def send_to_kafka(task_message: dict):
    """Synchronously publish one task message to the configured Kafka topic.

    Returns True on confirmed delivery; False when Kafka is disabled,
    the producer is unavailable, or the send fails.
    """
    if not (KAFKA_ENABLED and kafka_producer):
        return False
    try:
        # Block (up to 10s) until the broker acknowledges the record.
        kafka_producer.send(KAFKA_TOPIC, task_message).get(timeout=10)
    except Exception as exc:
        logger.error(f"发送到 Kafka 失败: {exc}")
        return False
    logger.info(f"消息已发送到 Kafka: docId={task_message.get('docId')}")
    return True
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifecycle hook.

    Startup: build the parser factory and (optionally) the Kafka producer.
    Shutdown: close the Kafka producer if one was created.
    """
    global parser_factory
    logger.info("正在初始化解析服务...")
    parser_factory = ParserFactory()
    init_kafka_producer()

    # Summarize the effective configuration in the startup log.
    startup_lines = [
        "解析服务初始化完成",
        f" - Kafka 启用: {KAFKA_ENABLED}",
    ]
    if KAFKA_ENABLED:
        startup_lines.append(f" - Kafka Servers: {KAFKA_BOOTSTRAP_SERVERS}")
        startup_lines.append(f" - Kafka Topic: {KAFKA_TOPIC}")
    for line in startup_lines:
        logger.info(line)

    yield

    logger.info("正在关闭解析服务...")
    if kafka_producer:
        kafka_producer.close()
# FastAPI application instance; startup/shutdown wiring is handled by lifespan().
app = FastAPI(
    title="多模态解析服务",
    description="支持 PDF/Office/图片/音视频的多模态文件解析服务",
    version="1.0.0",
    lifespan=lifespan
)
class ParseResponse:
    """Builders for the uniform JSON envelope returned by every endpoint."""

    @staticmethod
    def success(task_id: str, file_type: str, result: ParseResult, parse_time_ms: float, sent_to_kafka: bool = False):
        """Wrap a successful parse result in a code-200 envelope."""
        payload = {
            "task_id": task_id,
            "file_type": file_type,
            "content": result.content,
            "metadata": result.metadata,
            "tables": result.tables,
            "parse_time_ms": parse_time_ms,
            "sent_to_kafka": sent_to_kafka,
        }
        return {"code": 200, "message": "success", "data": payload}

    @staticmethod
    def error(message: str, code: int = 500):
        """Wrap a failure message; data is always None on error."""
        return {"code": code, "message": message, "data": None}
def build_embedding_task_message(
    doc_id: str,
    file_name: str,
    full_text: str,
    file_path: str,
    file_size: Optional[int],
    file_type: str,
    metadata: Optional[dict],
    task_id: Optional[str] = None
) -> dict:
    """Build a Kafka message matching the EmbeddingTaskMessage schema.

    See EmbeddingTaskMessage.java in schedule-embedding-api. The taskId
    falls back to docId when no explicit task id is supplied; callbackUrl
    is always None from this service.
    """
    message = dict(
        docId=doc_id,
        fileName=file_name,
        fullText=full_text,
        filePath=file_path,
        fileSize=file_size,
        fileType=file_type,
        metadata=metadata,
        callbackUrl=None,
    )
    message["taskId"] = task_id if task_id else doc_id
    return message
@app.get("/health")
async def health_check():
    """Liveness probe: report service identity, Kafka flag, and model status.

    NOTE(review): the per-model statuses are hard-coded to "connected" —
    nothing is actually probed here; confirm whether real checks are wanted.
    """
    model_status = {name: "connected" for name in ("qwen_vl", "qwen_asr", "mineru")}
    return {
        "status": "healthy",
        "service": "parse-service",
        "kafka_enabled": KAFKA_ENABLED,
        "models": model_status,
    }
@app.post("/api/v1/parse")
async def parse_file(
    file: UploadFile = File(...),
    send_to_kafka_flag: bool = Query(False, description="是否发送到 Kafka")
):
    """
    Parse a single uploaded file.

    Args:
        file: the uploaded file.
        send_to_kafka_flag: whether to forward the parse result to Kafka.

    Returns:
        ParseResponse.success envelope (Markdown content + metadata).

    Raises:
        HTTPException: 500 if the service is uninitialized or parsing fails.
    """
    # Read-only access to the module-level factory; no `global` needed.
    if not parser_factory:
        raise HTTPException(status_code=500, detail="解析服务未初始化")

    task_id = str(uuid.uuid4())
    start_time = time.time()
    sent_to_kafka = False
    temp_file_path = None
    try:
        # Persist the upload to a temp file so parsers can operate on a path.
        file_ext = Path(file.filename).suffix if file.filename else ""
        temp_file = tempfile.NamedTemporaryFile(
            delete=False, suffix=file_ext, dir=UPLOAD_DIR
        )
        try:
            content = await file.read()
            temp_file.write(content)
            temp_file_path = temp_file.name
            file_size = len(content)
        finally:
            temp_file.close()

        logger.info(f"[{task_id}] 开始解析文件: {file.filename}, 临时路径: {temp_file_path}")

        # Run the parse and measure wall-clock time in milliseconds.
        result = await parser_factory.parse(temp_file_path)
        parse_time_ms = (time.time() - start_time) * 1000
        logger.info(f"[{task_id}] 文件解析完成, 耗时: {parse_time_ms:.2f}ms")

        # Optionally forward the result to Kafka for downstream embedding.
        if KAFKA_ENABLED and send_to_kafka_flag:
            doc_id = f"parse-{uuid.uuid4().hex[:8]}"
            task_message = build_embedding_task_message(
                doc_id=doc_id,
                file_name=file.filename or "unknown",
                full_text=result.content,
                file_path=temp_file_path,
                file_size=file_size,
                file_type=result.file_type,
                metadata=result.metadata,
                task_id=task_id
            )
            sent_to_kafka = send_to_kafka(task_message)

        return ParseResponse.success(
            task_id=task_id,
            file_type=result.file_type,
            result=result,
            parse_time_ms=parse_time_ms,
            sent_to_kafka=sent_to_kafka
        )
    except Exception as e:
        logger.error(f"[{task_id}] 文件解析失败: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Best-effort temp-file cleanup — runs even when parsing raised,
        # fixing the original leak where failures left the file behind.
        if temp_file_path:
            try:
                os.unlink(temp_file_path)
            except OSError:
                pass
@app.post("/api/v1/parse/path")
async def parse_file_path(
    file_path: str,
    send_to_kafka_flag: bool = Query(False, description="是否发送到 Kafka")
):
    """
    Parse a file already on local disk (intended for Flink callers).

    Args:
        file_path: absolute or relative path to the file.
        send_to_kafka_flag: whether to forward the parse result to Kafka.

    Returns:
        ParseResponse.success envelope (Markdown content + metadata).

    Raises:
        HTTPException: 404 if the file does not exist; 500 if parsing fails
            or the service is uninitialized.
    """
    # Read-only access to the module-level factory; no `global` needed.
    if not parser_factory:
        raise HTTPException(status_code=500, detail="解析服务未初始化")

    task_id = str(uuid.uuid4())
    start_time = time.time()
    sent_to_kafka = False
    try:
        if not os.path.exists(file_path):
            raise HTTPException(status_code=404, detail=f"文件不存在: {file_path}")

        logger.info(f"[{task_id}] 开始解析文件: {file_path}")

        # File size is informational only; leave None if stat fails.
        file_size = None
        try:
            file_size = os.path.getsize(file_path)
        except OSError:
            pass

        # Run the parse and measure wall-clock time in milliseconds.
        result = await parser_factory.parse(file_path)
        parse_time_ms = (time.time() - start_time) * 1000
        logger.info(f"[{task_id}] 文件解析完成, 耗时: {parse_time_ms:.2f}ms")

        # Optionally forward the result to Kafka for downstream embedding.
        if KAFKA_ENABLED and send_to_kafka_flag:
            doc_id = f"parse-{uuid.uuid4().hex[:8]}"
            file_name = os.path.basename(file_path)
            task_message = build_embedding_task_message(
                doc_id=doc_id,
                file_name=file_name,
                full_text=result.content,
                file_path=file_path,
                file_size=file_size,
                file_type=result.file_type,
                metadata=result.metadata,
                task_id=task_id
            )
            sent_to_kafka = send_to_kafka(task_message)

        return ParseResponse.success(
            task_id=task_id,
            file_type=result.file_type,
            result=result,
            parse_time_ms=parse_time_ms,
            sent_to_kafka=sent_to_kafka
        )
    except HTTPException:
        # Re-raise deliberate HTTP errors (e.g. the 404) untouched.
        raise
    except Exception as e:
        logger.error(f"[{task_id}] 文件解析失败: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
    # Local/dev entry point; in production run uvicorn/gunicorn externally.
    import uvicorn
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        workers=1
    )
|