| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- import uvicorn
- import time
- import threading
- import argparse
- from fastapi import FastAPI, BackgroundTasks
- from pydantic import BaseModel
- from typing import Dict, Optional, List
- import psutil
- import os
- # 初始化FastAPI应用
- app = FastAPI(title="Python解析服务", version="1.0")
- # 存储任务状态:key=task_id, value={status, progress, result, error}
- task_status: Dict[str, dict] = {}
- # 存储服务状态
- service_status = {
- "status": "running",
- "pid": os.getpid(),
- "start_time": time.time()
- }
- # 锁机制,保证多线程安全
- task_lock = threading.Lock()
- # 任务请求模型
- class ParseTask(BaseModel):
- task_id: str
- file_path: str
- parse_params: Optional[Dict] = None # 解析参数,如解析类型、阈值等
- # ------------------------------
- # 核心解析逻辑(模拟真实解析器调用)
- # ------------------------------
- def parse_task_worker(task_id: str, file_path: str, parse_params: Dict):
- """
- 后台解析任务执行函数
- """
- try:
- with task_lock:
- task_status[task_id] = {
- "status": "running",
- "progress": 0,
- "result": None,
- "error": None
- }
-
- # 模拟解析过程(实际场景替换为真实解析器调用)
- total_steps = 10
- for step in range(total_steps):
- time.sleep(1) # 模拟解析耗时
- progress = (step + 1) * 10
- with task_lock:
- task_status[task_id]["progress"] = progress
-
- # 解析完成,模拟返回结果
- with task_lock:
- task_status[task_id]["status"] = "completed"
- task_status[task_id]["result"] = {
- "file_path": file_path,
- "parse_result": "解析成功(模拟结果)",
- "parse_params": parse_params,
- "finish_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
- }
-
- except Exception as e:
- with task_lock:
- task_status[task_id]["status"] = "failed"
- task_status[task_id]["error"] = str(e)
- # ------------------------------
- # 接口定义
- # ------------------------------
- @app.post("/execute", summary="接收解析任务并执行")
- async def execute_task(task: ParseTask, background_tasks: BackgroundTasks):
- """
- 接收Java端下发的解析任务,后台异步执行
- """
- # 检查任务ID是否已存在
- if task.task_id in task_status:
- return {
- "code": 400,
- "msg": f"任务ID {task.task_id} 已存在",
- "data": None
- }
-
- # 提交后台任务执行
- background_tasks.add_task(parse_task_worker, task.task_id, task.file_path, task.parse_params)
-
- return {
- "code": 200,
- "msg": "任务接收成功,已开始执行",
- "data": {"task_id": task.task_id}
- }
- @app.get("/status/{task_id}", summary="查询指定任务状态")
- async def get_task_status(task_id: str):
- """
- 查询单个任务的执行状态、进度、结果
- """
- if task_id not in task_status:
- return {
- "code": 404,
- "msg": f"任务ID {task_id} 不存在",
- "data": None
- }
-
- return {
- "code": 200,
- "msg": "查询成功",
- "data": task_status[task_id]
- }
- @app.get("/status", summary="状态接口")
- async def health_check():
- """
- 返回实例健康状态、资源使用情况
- """
- cpu_usage = psutil.cpu_percent(interval=1)
-
- # 内存信息
- memory = psutil.virtual_memory()
- memory_usage = memory.percent
-
- return {
- "code": 200,
- "msg": "success",
- "data": {
- "status": 0,
- "cpu_usage": cpu_usage,
- "gpu_usage": 0.0,
- "memory_usage": memory_usage,
- "gpu_memory": 0.0
- }
- }
- @app.get("/shutdown", summary="优雅关闭服务(供Java端终结实例调用)")
- async def shutdown_service():
- """
- 优雅关闭服务,清理资源
- """
- service_status["status"] = "stopping"
- # 可添加清理逻辑:如保存未完成任务、关闭解析器连接等
- return {
- "code": 200,
- "msg": "服务开始优雅关闭",
- "data": None
- }
- # ------------------------------
- # 启动入口
- # ------------------------------
- def main():
- parser = argparse.ArgumentParser(description="Python解析服务启动参数")
- parser.add_argument("--host", default="0.0.0.0", help="服务监听地址")
- parser.add_argument("--port", type=int, default=8000, help="服务监听端口")
- args = parser.parse_args()
-
- # 启动FastAPI服务
- uvicorn.run(
- "parse_service:app",
- host=args.host,
- port=args.port,
- reload=False, # 生产环境关闭热重载
- workers=1 # 单进程运行,避免任务状态共享问题
- )
- if __name__ == "__main__":
- main()
|