parse_service.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. import uvicorn
  2. import time
  3. import threading
  4. import argparse
  5. from fastapi import FastAPI, BackgroundTasks
  6. from pydantic import BaseModel
  7. from typing import Dict, Optional, List
  8. import psutil
  9. import os
  10. # 初始化FastAPI应用
  11. app = FastAPI(title="Python解析服务", version="1.0")
  12. # 存储任务状态:key=task_id, value={status, progress, result, error}
  13. task_status: Dict[str, dict] = {}
  14. # 存储服务状态
  15. service_status = {
  16. "status": "running",
  17. "pid": os.getpid(),
  18. "start_time": time.time()
  19. }
  20. # 锁机制,保证多线程安全
  21. task_lock = threading.Lock()
  22. # 任务请求模型
  23. class ParseTask(BaseModel):
  24. task_id: str
  25. file_path: str
  26. parse_params: Optional[Dict] = None # 解析参数,如解析类型、阈值等
  27. # ------------------------------
  28. # 核心解析逻辑(模拟真实解析器调用)
  29. # ------------------------------
  30. def parse_task_worker(task_id: str, file_path: str, parse_params: Dict):
  31. """
  32. 后台解析任务执行函数
  33. """
  34. try:
  35. with task_lock:
  36. task_status[task_id] = {
  37. "status": "running",
  38. "progress": 0,
  39. "result": None,
  40. "error": None
  41. }
  42. # 模拟解析过程(实际场景替换为真实解析器调用)
  43. total_steps = 10
  44. for step in range(total_steps):
  45. time.sleep(1) # 模拟解析耗时
  46. progress = (step + 1) * 10
  47. with task_lock:
  48. task_status[task_id]["progress"] = progress
  49. # 解析完成,模拟返回结果
  50. with task_lock:
  51. task_status[task_id]["status"] = "completed"
  52. task_status[task_id]["result"] = {
  53. "file_path": file_path,
  54. "parse_result": "解析成功(模拟结果)",
  55. "parse_params": parse_params,
  56. "finish_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  57. }
  58. except Exception as e:
  59. with task_lock:
  60. task_status[task_id]["status"] = "failed"
  61. task_status[task_id]["error"] = str(e)
  62. # ------------------------------
  63. # 接口定义
  64. # ------------------------------
  65. @app.post("/execute", summary="接收解析任务并执行")
  66. async def execute_task(task: ParseTask, background_tasks: BackgroundTasks):
  67. """
  68. 接收Java端下发的解析任务,后台异步执行
  69. """
  70. # 检查任务ID是否已存在
  71. if task.task_id in task_status:
  72. return {
  73. "code": 400,
  74. "msg": f"任务ID {task.task_id} 已存在",
  75. "data": None
  76. }
  77. # 提交后台任务执行
  78. background_tasks.add_task(parse_task_worker, task.task_id, task.file_path, task.parse_params)
  79. return {
  80. "code": 200,
  81. "msg": "任务接收成功,已开始执行",
  82. "data": {"task_id": task.task_id}
  83. }
  84. @app.get("/status/{task_id}", summary="查询指定任务状态")
  85. async def get_task_status(task_id: str):
  86. """
  87. 查询单个任务的执行状态、进度、结果
  88. """
  89. if task_id not in task_status:
  90. return {
  91. "code": 404,
  92. "msg": f"任务ID {task_id} 不存在",
  93. "data": None
  94. }
  95. return {
  96. "code": 200,
  97. "msg": "查询成功",
  98. "data": task_status[task_id]
  99. }
  100. @app.get("/status", summary="状态接口")
  101. async def health_check():
  102. """
  103. 返回实例健康状态、资源使用情况
  104. """
  105. cpu_usage = psutil.cpu_percent(interval=1)
  106. # 内存信息
  107. memory = psutil.virtual_memory()
  108. memory_usage = memory.percent
  109. return {
  110. "code": 200,
  111. "msg": "success",
  112. "data": {
  113. "status": 0,
  114. "cpu_usage": cpu_usage,
  115. "gpu_usage": 0.0,
  116. "memory_usage": memory_usage,
  117. "gpu_memory": 0.0
  118. }
  119. }
  120. @app.get("/shutdown", summary="优雅关闭服务(供Java端终结实例调用)")
  121. async def shutdown_service():
  122. """
  123. 优雅关闭服务,清理资源
  124. """
  125. service_status["status"] = "stopping"
  126. # 可添加清理逻辑:如保存未完成任务、关闭解析器连接等
  127. return {
  128. "code": 200,
  129. "msg": "服务开始优雅关闭",
  130. "data": None
  131. }
  132. # ------------------------------
  133. # 启动入口
  134. # ------------------------------
  135. def main():
  136. parser = argparse.ArgumentParser(description="Python解析服务启动参数")
  137. parser.add_argument("--host", default="0.0.0.0", help="服务监听地址")
  138. parser.add_argument("--port", type=int, default=8000, help="服务监听端口")
  139. args = parser.parse_args()
  140. # 启动FastAPI服务
  141. uvicorn.run(
  142. "parse_service:app",
  143. host=args.host,
  144. port=args.port,
  145. reload=False, # 生产环境关闭热重载
  146. workers=1 # 单进程运行,避免任务状态共享问题
  147. )
  148. if __name__ == "__main__":
  149. main()