parse_service.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. import uvicorn
  2. import time
  3. import threading
  4. import argparse
  5. from fastapi import FastAPI, BackgroundTasks
  6. from pydantic import BaseModel, Field
  7. from typing import Dict, Optional, List
  8. import os
  9. import httpx
  10. # 初始化FastAPI应用
  11. app = FastAPI(title="Python解析服务", version="1.0")
  12. # 存储任务状态:key=task_id, value={status, progress, result, error}
  13. task_status: Dict[str, dict] = {}
  14. # 存储服务状态
  15. service_status = {
  16. "status": "running",
  17. "pid": os.getpid(),
  18. "start_time": time.time()
  19. }
  20. # 锁机制,保证多线程安全
  21. task_lock = threading.Lock()
  22. # ------------------------------
  23. # 请求模型定义
  24. # ------------------------------
  25. class ExecuteRequest(BaseModel):
  26. file_path: str=Field(alias="filePath")
  27. task_id:str=Field(alias="taskId")
  28. # ------------------------------
  29. # 接口定义
  30. # ------------------------------
  31. @app.post("/execute", summary="接收解析任务并执行")
  32. async def execute_task(request: ExecuteRequest):
  33. """
  34. 接收Java端下发的解析任务,后台异步执行
  35. """
  36. try:
  37. # 验证请求参数
  38. if not request.file_path or not request.task_id:
  39. return {
  40. "code": 400,
  41. "msg": "参数错误:filePath和taskId不能为空",
  42. "data": None
  43. }
  44. # 调用远程解析服务
  45. remote_url = "http://localhost:1086/execute"
  46. async with httpx.AsyncClient(timeout=300) as client:
  47. try:
  48. response = await client.post(
  49. remote_url,
  50. json={
  51. "filePath": request.file_path,
  52. "taskId": request.task_id
  53. }
  54. )
  55. response.raise_for_status()
  56. return response.json()
  57. except Exception as e:
  58. return {
  59. "code": 500,
  60. "msg": f"调用远程解析服务失败: {str(e)}",
  61. "data": None
  62. }
  63. except Exception as e:
  64. return {
  65. "code": 500,
  66. "msg": f"服务内部错误: {str(e)}",
  67. "data": None
  68. }
  69. @app.get("/status", summary="状态接口")
  70. async def health_check():
  71. """
  72. 返回实例健康状态、资源使用情况
  73. """
  74. return {
  75. "code": 200,
  76. "msg": "success",
  77. "data": {
  78. "status": 0,
  79. "cpu_usage": 0.2,
  80. "gpu_usage": 0.0,
  81. "memory_usage": 0.2,
  82. "gpu_memory": 0.0
  83. }
  84. }
  85. @app.get("/shutdown", summary="优雅关闭服务(供Java端终结实例调用)")
  86. async def shutdown_service():
  87. """
  88. 优雅关闭服务,清理资源
  89. """
  90. service_status["status"] = "stopping"
  91. # 可添加清理逻辑:如保存未完成任务、关闭解析器连接等
  92. return {
  93. "code": 200,
  94. "msg": "服务开始优雅关闭",
  95. "data": None
  96. }
  97. # ------------------------------
  98. # 启动入口
  99. # ------------------------------
  100. def main():
  101. parser = argparse.ArgumentParser(description="Python解析服务启动参数")
  102. parser.add_argument("--host", default="0.0.0.0", help="服务监听地址")
  103. parser.add_argument("--port", type=int, default=8000, help="服务监听端口")
  104. args = parser.parse_args()
  105. # 启动FastAPI服务
  106. uvicorn.run(
  107. "parse_service:app",
  108. host=args.host,
  109. port=args.port,
  110. reload=False, # 生产环境关闭热重载
  111. workers=1 # 单进程运行,避免任务状态共享问题
  112. )
  113. if __name__ == "__main__":
  114. main()