import uvicorn import time import threading import argparse from fastapi import FastAPI, BackgroundTasks from pydantic import BaseModel from typing import Dict, Optional, List import psutil import os # 初始化FastAPI应用 app = FastAPI(title="Python解析服务", version="1.0") # 存储任务状态:key=task_id, value={status, progress, result, error} task_status: Dict[str, dict] = {} # 存储服务状态 service_status = { "status": "running", "pid": os.getpid(), "start_time": time.time() } # 锁机制,保证多线程安全 task_lock = threading.Lock() # 任务请求模型 class ParseTask(BaseModel): task_id: str file_path: str parse_params: Optional[Dict] = None # 解析参数,如解析类型、阈值等 # ------------------------------ # 核心解析逻辑(模拟真实解析器调用) # ------------------------------ def parse_task_worker(task_id: str, file_path: str, parse_params: Dict): """ 后台解析任务执行函数 """ try: with task_lock: task_status[task_id] = { "status": "running", "progress": 0, "result": None, "error": None } # 模拟解析过程(实际场景替换为真实解析器调用) total_steps = 10 for step in range(total_steps): time.sleep(1) # 模拟解析耗时 progress = (step + 1) * 10 with task_lock: task_status[task_id]["progress"] = progress # 解析完成,模拟返回结果 with task_lock: task_status[task_id]["status"] = "completed" task_status[task_id]["result"] = { "file_path": file_path, "parse_result": "解析成功(模拟结果)", "parse_params": parse_params, "finish_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) } except Exception as e: with task_lock: task_status[task_id]["status"] = "failed" task_status[task_id]["error"] = str(e) # ------------------------------ # 接口定义 # ------------------------------ @app.post("/execute", summary="接收解析任务并执行") async def execute_task(task: ParseTask, background_tasks: BackgroundTasks): """ 接收Java端下发的解析任务,后台异步执行 """ # 检查任务ID是否已存在 if task.task_id in task_status: return { "code": 400, "msg": f"任务ID {task.task_id} 已存在", "data": None } # 提交后台任务执行 background_tasks.add_task(parse_task_worker, task.task_id, task.file_path, task.parse_params) return { "code": 200, "msg": "任务接收成功,已开始执行", "data": {"task_id": task.task_id} } @app.get("/status/{task_id}", summary="查询指定任务状态") async def get_task_status(task_id: str): """ 查询单个任务的执行状态、进度、结果 """ if task_id not in task_status: return { "code": 404, "msg": f"任务ID {task_id} 不存在", "data": None } return { "code": 200, "msg": "查询成功", "data": task_status[task_id] } @app.get("/status", summary="状态接口") async def health_check(): """ 返回实例健康状态、资源使用情况 """ cpu_usage = psutil.cpu_percent(interval=1) # 内存信息 memory = psutil.virtual_memory() memory_usage = memory.percent return { "code": 200, "msg": "success", "data": { "status": 0, "cpu_usage": cpu_usage, "gpu_usage": 0.0, "memory_usage": memory_usage, "gpu_memory": 0.0 } } @app.get("/shutdown", summary="优雅关闭服务(供Java端终结实例调用)") async def shutdown_service(): """ 优雅关闭服务,清理资源 """ service_status["status"] = "stopping" # 可添加清理逻辑:如保存未完成任务、关闭解析器连接等 return { "code": 200, "msg": "服务开始优雅关闭", "data": None } # ------------------------------ # 启动入口 # ------------------------------ def main(): parser = argparse.ArgumentParser(description="Python解析服务启动参数") parser.add_argument("--host", default="0.0.0.0", help="服务监听地址") parser.add_argument("--port", type=int, default=8000, help="服务监听端口") args = parser.parse_args() # 启动FastAPI服务 uvicorn.run( "parse_service:app", host=args.host, port=args.port, reload=False, # 生产环境关闭热重载 workers=1 # 单进程运行,避免任务状态共享问题 ) if __name__ == "__main__": main()