main.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. """
  2. 多模态解析服务 - FastAPI 主入口
  3. 功能:
  4. 1. 解析文件 (PDF/Office/图片/音视频)
  5. 2. 可选: 直接返回解析结果
  6. 3. 可选: 发送解析结果到 Kafka (供 schedule-embedding-api 消费)
  7. """
  8. import os
  9. import sys
  10. import uuid
  11. import time
  12. import tempfile
  13. import json
  14. from pathlib import Path
  15. from typing import Optional
  16. from contextlib import asynccontextmanager
  17. from fastapi import FastAPI, File, UploadFile, HTTPException, Query
  18. from fastapi.responses import JSONResponse
  19. from loguru import logger
  20. # 添加当前目录到路径,确保导入正确
  21. sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
  22. from core.router import ParserFactory
  23. from models.result import ParseResult
  24. from utils.logger import log
# ========== Configuration ==========
# Directory for temporary upload storage; created eagerly at import time.
UPLOAD_DIR = os.getenv("UPLOAD_DIR", "/tmp/parse-service/uploads")
os.makedirs(UPLOAD_DIR, exist_ok=True)
# Kafka configuration (all overridable via environment variables).
KAFKA_ENABLED = os.getenv("KAFKA_ENABLED", "false").lower() == "true"
KAFKA_BOOTSTRAP_SERVERS = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "10.192.72.13:9092")
KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", "embedding-topic")
# Global parser factory and Kafka producer; both are populated in lifespan()/init_kafka_producer().
parser_factory: Optional[ParserFactory] = None
kafka_producer = None
  35. def init_kafka_producer():
  36. """初始化 Kafka Producer"""
  37. global kafka_producer
  38. if not KAFKA_ENABLED:
  39. logger.info("Kafka 未启用,跳过初始化")
  40. return
  41. try:
  42. from kafka import KafkaProducer
  43. logger.info(f"正在初始化 Kafka Producer: {KAFKA_BOOTSTRAP_SERVERS}")
  44. kafka_producer = KafkaProducer(
  45. bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS.split(","),
  46. value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),
  47. retries=3,
  48. acks="all"
  49. )
  50. logger.info("Kafka Producer 初始化成功")
  51. except Exception as e:
  52. logger.error(f"Kafka Producer 初始化失败: {e}")
  53. kafka_producer = None
  54. def send_to_kafka(task_message: dict):
  55. """发送消息到 Kafka"""
  56. if not KAFKA_ENABLED or not kafka_producer:
  57. return False
  58. try:
  59. future = kafka_producer.send(KAFKA_TOPIC, task_message)
  60. future.get(timeout=10)
  61. logger.info(f"消息已发送到 Kafka: docId={task_message.get('docId')}")
  62. return True
  63. except Exception as e:
  64. logger.error(f"发送到 Kafka 失败: {e}")
  65. return False
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook.

    Startup: builds the global ParserFactory and (optionally) the Kafka
    producer, then logs the effective Kafka configuration.
    Shutdown: closes the Kafka producer if one was created.
    """
    global parser_factory
    logger.info("正在初始化解析服务...")
    # Initialize the parser factory (shared by all requests).
    parser_factory = ParserFactory()
    # Initialize the Kafka producer (no-op unless KAFKA_ENABLED).
    init_kafka_producer()
    logger.info("解析服务初始化完成")
    logger.info(f" - Kafka 启用: {KAFKA_ENABLED}")
    if KAFKA_ENABLED:
        logger.info(f" - Kafka Servers: {KAFKA_BOOTSTRAP_SERVERS}")
        logger.info(f" - Kafka Topic: {KAFKA_TOPIC}")
    yield
    logger.info("正在关闭解析服务...")
    if kafka_producer:
        kafka_producer.close()
# FastAPI application; lifespan() wires up the parser factory and Kafka producer.
app = FastAPI(
    title="多模态解析服务",
    description="支持 PDF/Office/图片/音视频的多模态文件解析服务",
    version="1.0.0",
    lifespan=lifespan
)
  90. class ParseResponse:
  91. """解析响应模型"""
  92. @staticmethod
  93. def success(task_id: str, file_type: str, result: ParseResult, parse_time_ms: float, sent_to_kafka: bool = False):
  94. return {
  95. "code": 200,
  96. "message": "success",
  97. "data": {
  98. "task_id": task_id,
  99. "file_type": file_type,
  100. "content": result.content,
  101. "metadata": result.metadata,
  102. "tables": result.tables,
  103. "parse_time_ms": parse_time_ms,
  104. "sent_to_kafka": sent_to_kafka
  105. }
  106. }
  107. @staticmethod
  108. def error(message: str, code: int = 500):
  109. return {
  110. "code": code,
  111. "message": message,
  112. "data": None
  113. }
  114. def build_embedding_task_message(
  115. doc_id: str,
  116. file_name: str,
  117. full_text: str,
  118. file_path: str,
  119. file_size: Optional[int],
  120. file_type: str,
  121. metadata: Optional[dict],
  122. task_id: Optional[str] = None
  123. ) -> dict:
  124. """
  125. 构建 Kafka 消息(匹配 EmbeddingTaskMessage 格式)
  126. 参考: schedule-embedding-api 中的 EmbeddingTaskMessage.java
  127. """
  128. return {
  129. "docId": doc_id,
  130. "fileName": file_name,
  131. "fullText": full_text,
  132. "filePath": file_path,
  133. "fileSize": file_size,
  134. "fileType": file_type,
  135. "metadata": metadata,
  136. "callbackUrl": None,
  137. "taskId": task_id or doc_id
  138. }
@app.get("/health")
async def health_check():
    """Liveness probe for the parse service.

    NOTE(review): the per-model statuses below are hard-coded to
    "connected" — no actual connectivity check is performed. Confirm
    whether real probes of qwen_vl/qwen_asr/mineru are wanted here.
    """
    return {
        "status": "healthy",
        "service": "parse-service",
        "kafka_enabled": KAFKA_ENABLED,
        "models": {
            "qwen_vl": "connected",
            "qwen_asr": "connected",
            "mineru": "connected"
        }
    }
  152. @app.post("/api/v1/parse")
  153. async def parse_file(
  154. file: UploadFile = File(...),
  155. send_to_kafka_flag: bool = Query(False, description="是否发送到 Kafka")
  156. ):
  157. """
  158. 解析单个文件
  159. Args:
  160. file: 上传的文件
  161. send_to_kafka_flag: 是否发送解析结果到 Kafka
  162. Returns:
  163. 解析结果(Markdown + 元数据)
  164. """
  165. global parser_factory
  166. if not parser_factory:
  167. raise HTTPException(status_code=500, detail="解析服务未初始化")
  168. task_id = str(uuid.uuid4())
  169. start_time = time.time()
  170. sent_to_kafka = False
  171. try:
  172. # 保存上传的文件
  173. file_ext = Path(file.filename).suffix if file.filename else ""
  174. temp_file = tempfile.NamedTemporaryFile(
  175. delete=False, suffix=file_ext, dir=UPLOAD_DIR
  176. )
  177. try:
  178. content = await file.read()
  179. temp_file.write(content)
  180. temp_file_path = temp_file.name
  181. file_size = len(content)
  182. finally:
  183. temp_file.close()
  184. logger.info(f"[{task_id}] 开始解析文件: {file.filename}, 临时路径: {temp_file_path}")
  185. # 执行解析
  186. result = await parser_factory.parse(temp_file_path)
  187. parse_time_ms = (time.time() - start_time) * 1000
  188. logger.info(f"[{task_id}] 文件解析完成, 耗时: {parse_time_ms:.2f}ms")
  189. # 如果启用 Kafka 且用户要求发送,则发送到 Kafka
  190. if KAFKA_ENABLED and send_to_kafka_flag:
  191. doc_id = f"parse-{uuid.uuid4().hex[:8]}"
  192. task_message = build_embedding_task_message(
  193. doc_id=doc_id,
  194. file_name=file.filename or "unknown",
  195. full_text=result.content,
  196. file_path=temp_file_path,
  197. file_size=file_size,
  198. file_type=result.file_type,
  199. metadata=result.metadata,
  200. task_id=task_id
  201. )
  202. sent_to_kafka = send_to_kafka(task_message)
  203. # 清理临时文件
  204. try:
  205. os.unlink(temp_file_path)
  206. except:
  207. pass
  208. return ParseResponse.success(
  209. task_id=task_id,
  210. file_type=result.file_type,
  211. result=result,
  212. parse_time_ms=parse_time_ms,
  213. sent_to_kafka=sent_to_kafka
  214. )
  215. except Exception as e:
  216. logger.error(f"[{task_id}] 文件解析失败: {str(e)}")
  217. raise HTTPException(status_code=500, detail=str(e))
  218. @app.post("/api/v1/parse/path")
  219. async def parse_file_path(
  220. file_path: str,
  221. send_to_kafka_flag: bool = Query(False, description="是否发送到 Kafka")
  222. ):
  223. """
  224. 解析本地路径的文件(用于 Flink 调用)
  225. Args:
  226. file_path: 文件路径
  227. send_to_kafka_flag: 是否发送解析结果到 Kafka
  228. Returns:
  229. 解析结果
  230. """
  231. global parser_factory
  232. if not parser_factory:
  233. raise HTTPException(status_code=500, detail="解析服务未初始化")
  234. task_id = str(uuid.uuid4())
  235. start_time = time.time()
  236. sent_to_kafka = False
  237. try:
  238. if not os.path.exists(file_path):
  239. raise HTTPException(status_code=404, detail=f"文件不存在: {file_path}")
  240. logger.info(f"[{task_id}] 开始解析文件: {file_path}")
  241. # 获取文件大小
  242. file_size = None
  243. try:
  244. file_size = os.path.getsize(file_path)
  245. except:
  246. pass
  247. # 执行解析
  248. result = await parser_factory.parse(file_path)
  249. parse_time_ms = (time.time() - start_time) * 1000
  250. logger.info(f"[{task_id}] 文件解析完成, 耗时: {parse_time_ms:.2f}ms")
  251. # 如果启用 Kafka 且用户要求发送,则发送到 Kafka
  252. if KAFKA_ENABLED and send_to_kafka_flag:
  253. doc_id = f"parse-{uuid.uuid4().hex[:8]}"
  254. file_name = os.path.basename(file_path)
  255. task_message = build_embedding_task_message(
  256. doc_id=doc_id,
  257. file_name=file_name,
  258. full_text=result.content,
  259. file_path=file_path,
  260. file_size=file_size,
  261. file_type=result.file_type,
  262. metadata=result.metadata,
  263. task_id=task_id
  264. )
  265. sent_to_kafka = send_to_kafka(task_message)
  266. return ParseResponse.success(
  267. task_id=task_id,
  268. file_type=result.file_type,
  269. result=result,
  270. parse_time_ms=parse_time_ms,
  271. sent_to_kafka=sent_to_kafka
  272. )
  273. except HTTPException:
  274. raise
  275. except Exception as e:
  276. logger.error(f"[{task_id}] 文件解析失败: {str(e)}")
  277. raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
    # Dev entry point; in production run under an external uvicorn/gunicorn.
    import uvicorn
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        workers=1
    )