main.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. """
  2. 多模态解析服务 - FastAPI 主入口
  3. """
  4. import os
  5. import uuid
  6. import time
  7. import tempfile
  8. from pathlib import Path
  9. from typing import Optional
  10. from contextlib import asynccontextmanager
  11. from fastapi import FastAPI, File, UploadFile, HTTPException
  12. from fastapi.responses import JSONResponse
  13. from loguru import logger
  14. from core.router import ParserFactory
  15. from models.result import ParseResult
  16. from utils.logger import log
  17. # 配置
  18. UPLOAD_DIR = os.getenv("UPLOAD_DIR", "/tmp/parse-service/uploads")
  19. os.makedirs(UPLOAD_DIR, exist_ok=True)
  20. # 全局解析器工厂
  21. parser_factory: Optional[ParserFactory] = None
  22. @asynccontextmanager
  23. async def lifespan(app: FastAPI):
  24. """应用生命周期管理"""
  25. global parser_factory
  26. logger.info("正在初始化解析服务...")
  27. # 初始化解析器工厂
  28. parser_factory = ParserFactory()
  29. logger.info("解析服务初始化完成")
  30. yield
  31. logger.info("正在关闭解析服务...")
  32. app = FastAPI(
  33. title="多模态解析服务",
  34. description="支持 PDF/Office/图片/音视频的多模态文件解析服务",
  35. version="1.0.0",
  36. lifespan=lifespan
  37. )
  38. class ParseResponse:
  39. """解析响应模型"""
  40. @staticmethod
  41. def success(task_id: str, file_type: str, result: ParseResult, parse_time_ms: float):
  42. return {
  43. "code": 200,
  44. "message": "success",
  45. "data": {
  46. "task_id": task_id,
  47. "file_type": file_type,
  48. "content": result.content,
  49. "metadata": result.metadata,
  50. "tables": result.tables,
  51. "parse_time_ms": parse_time_ms
  52. }
  53. }
  54. @staticmethod
  55. def error(message: str, code: int = 500):
  56. return {
  57. "code": code,
  58. "message": message,
  59. "data": None
  60. }
  61. @app.get("/health")
  62. async def health_check():
  63. """健康检查"""
  64. return {
  65. "status": "healthy",
  66. "service": "parse-service",
  67. "models": {
  68. "qwen_vl": "connected",
  69. "qwen_asr": "connected",
  70. "mineru": "connected"
  71. }
  72. }
  73. @app.post("/api/v1/parse")
  74. async def parse_file(file: UploadFile = File(...)):
  75. """
  76. 解析单个文件
  77. Args:
  78. file: 上传的文件
  79. Returns:
  80. 解析结果(Markdown + 元数据)
  81. """
  82. global parser_factory
  83. if not parser_factory:
  84. raise HTTPException(status_code=500, detail="解析服务未初始化")
  85. task_id = str(uuid.uuid4())
  86. start_time = time.time()
  87. try:
  88. # 保存上传的文件
  89. file_ext = Path(file.filename).suffix if file.filename else ""
  90. temp_file = tempfile.NamedTemporaryFile(
  91. delete=False, suffix=file_ext, dir=UPLOAD_DIR
  92. )
  93. try:
  94. content = await file.read()
  95. temp_file.write(content)
  96. temp_file_path = temp_file.name
  97. finally:
  98. temp_file.close()
  99. logger.info(f"[{task_id}] 开始解析文件: {file.filename}, 临时路径: {temp_file_path}")
  100. # 执行解析
  101. result = await parser_factory.parse(temp_file_path)
  102. parse_time_ms = (time.time() - start_time) * 1000
  103. logger.info(f"[{task_id}] 文件解析完成, 耗时: {parse_time_ms:.2f}ms")
  104. # 清理临时文件
  105. try:
  106. os.unlink(temp_file_path)
  107. except:
  108. pass
  109. return ParseResponse.success(
  110. task_id=task_id,
  111. file_type=result.file_type,
  112. result=result,
  113. parse_time_ms=parse_time_ms
  114. )
  115. except Exception as e:
  116. logger.error(f"[{task_id}] 文件解析失败: {str(e)}")
  117. raise HTTPException(status_code=500, detail=str(e))
  118. @app.post("/api/v1/parse/path")
  119. async def parse_file_path(file_path: str):
  120. """
  121. 解析本地路径的文件(用于 Flink 调用)
  122. Args:
  123. file_path: 文件路径
  124. Returns:
  125. 解析结果
  126. """
  127. global parser_factory
  128. if not parser_factory:
  129. raise HTTPException(status_code=500, detail="解析服务未初始化")
  130. task_id = str(uuid.uuid4())
  131. start_time = time.time()
  132. try:
  133. if not os.path.exists(file_path):
  134. raise HTTPException(status_code=404, detail=f"文件不存在: {file_path}")
  135. logger.info(f"[{task_id}] 开始解析文件: {file_path}")
  136. # 执行解析
  137. result = await parser_factory.parse(file_path)
  138. parse_time_ms = (time.time() - start_time) * 1000
  139. logger.info(f"[{task_id}] 文件解析完成, 耗时: {parse_time_ms:.2f}ms")
  140. return ParseResponse.success(
  141. task_id=task_id,
  142. file_type=result.file_type,
  143. result=result,
  144. parse_time_ms=parse_time_ms
  145. )
  146. except HTTPException:
  147. raise
  148. except Exception as e:
  149. logger.error(f"[{task_id}] 文件解析失败: {str(e)}")
  150. raise HTTPException(status_code=500, detail=str(e))
  151. if __name__ == "__main__":
  152. import uvicorn
  153. uvicorn.run(
  154. "app.main:app",
  155. host="0.0.0.0",
  156. port=8000,
  157. workers=1
  158. )