Procházet zdrojové kódy

Add parse service code

zsh před 3 týdny
revize
ca7536201d
4 změnil soubory, kde provedl 211 přidání a 0 odebrání
  1. 2 0
      .gitignore
  2. 34 0
      dockerfile
  3. 170 0
      parse_service.py
  4. 5 0
      requirements.txt

+ 2 - 0
.gitignore

@@ -0,0 +1,2 @@
+parse/
+__pycache__/

+ 34 - 0
dockerfile

@@ -0,0 +1,34 @@
+# Base image: Python 3.12 on Alpine
+FROM python:3.12-alpine
+
+# Set the working directory
+WORKDIR /app
+
+
+# Copy the dependency list first; the install step below reads it
+COPY requirements.txt .
+
+# Set environment variables
+# NOTE(review): PIP_NO_CACHE_DIR=off reads as "cache enabled"; confirm it has
+# the intended effect with the pip version shipped in this image.
+ENV PYTHONUNBUFFERED=1 \
+    PYTHONDONTWRITEBYTECODE=1 \
+    PIP_NO_CACHE_DIR=off \
+    PIP_DISABLE_PIP_VERSION_CHECK=on
+
+# Install the build toolchain, then build and install every required package.
+# NOTE(review): psutil is pinned here (5.9.5) while requirements.txt carries a
+# commented-out psutil==5.9.6 line; confirm which pin is intended.
+RUN apk add --no-cache \
+    gcc \
+    musl-dev \
+    linux-headers \
+    && pip install -i https://mirrors.aliyun.com/pypi/simple/ psutil==5.9.5 \
+    # Note: install every dependency that needs compiling in this step
+    && pip install -i https://mirrors.aliyun.com/pypi/simple/ -r requirements.txt \
+    # Finally, remove the build tools
+    && apk del gcc musl-dev linux-headers
+
+# Copy the service code
+COPY parse_service.py .
+
+# Expose the service port
+EXPOSE 8000
+
+# Start command
+CMD ["python", "parse_service.py", "--host", "0.0.0.0", "--port", "8000"]

+ 170 - 0
parse_service.py

@@ -0,0 +1,170 @@
+import uvicorn
+import time
+import threading
+import argparse
+from fastapi import FastAPI, BackgroundTasks
+from pydantic import BaseModel
+from typing import Dict, Optional, List
+import psutil
+import os
+
+# 初始化FastAPI应用
+app = FastAPI(title="Python解析服务", version="1.0")
+
+# 存储任务状态:key=task_id, value={status, progress, result, error}
+task_status: Dict[str, dict] = {}
+# 存储服务状态
+service_status = {
+    "status": "running",
+    "pid": os.getpid(),
+    "start_time": time.time()
+}
+# 锁机制,保证多线程安全
+task_lock = threading.Lock()
+
+# Request model for a parse task
+class ParseTask(BaseModel):
+    # Task identifier; used as the key into task_status
+    task_id: str
+    # File path handed to the worker and echoed back in the result
+    file_path: str
+    parse_params: Optional[Dict] = None  # Parse parameters, e.g. parse type, thresholds
+
+# ------------------------------
+# 核心解析逻辑(模拟真实解析器调用)
+# ------------------------------
+def parse_task_worker(task_id: str, file_path: str, parse_params: Dict):
+    """
+    后台解析任务执行函数
+    """
+    try:
+        with task_lock:
+            task_status[task_id] = {
+                "status": "running",
+                "progress": 0,
+                "result": None,
+                "error": None
+            }
+        
+        # 模拟解析过程(实际场景替换为真实解析器调用)
+        total_steps = 10
+        for step in range(total_steps):
+            time.sleep(1)  # 模拟解析耗时
+            progress = (step + 1) * 10
+            with task_lock:
+                task_status[task_id]["progress"] = progress
+        
+        # 解析完成,模拟返回结果
+        with task_lock:
+            task_status[task_id]["status"] = "completed"
+            task_status[task_id]["result"] = {
+                "file_path": file_path,
+                "parse_result": "解析成功(模拟结果)",
+                "parse_params": parse_params,
+                "finish_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+            }
+    
+    except Exception as e:
+        with task_lock:
+            task_status[task_id]["status"] = "failed"
+            task_status[task_id]["error"] = str(e)
+
+# ------------------------------
+# 接口定义
+# ------------------------------
+@app.post("/execute", summary="接收解析任务并执行")
+async def execute_task(task: ParseTask, background_tasks: BackgroundTasks):
+    """
+    接收Java端下发的解析任务,后台异步执行
+    """
+    # 检查任务ID是否已存在
+    if task.task_id in task_status:
+        return {
+            "code": 400,
+            "msg": f"任务ID {task.task_id} 已存在",
+            "data": None
+        }
+    
+    # 提交后台任务执行
+    background_tasks.add_task(parse_task_worker, task.task_id, task.file_path, task.parse_params)
+    
+    return {
+        "code": 200,
+        "msg": "任务接收成功,已开始执行",
+        "data": {"task_id": task.task_id}
+    }
+
+@app.get("/status/{task_id}", summary="查询指定任务状态")
+async def get_task_status(task_id: str):
+    """
+    查询单个任务的执行状态、进度、结果
+    """
+    if task_id not in task_status:
+        return {
+            "code": 404,
+            "msg": f"任务ID {task_id} 不存在",
+            "data": None
+        }
+    
+    return {
+        "code": 200,
+        "msg": "查询成功",
+        "data": task_status[task_id]
+    }
+
+
+@app.get("/status", summary="状态接口")
+async def health_check():
+    """
+    返回实例健康状态、资源使用情况
+    """
+    cpu_usage = psutil.cpu_percent(interval=1)
+    
+    # 内存信息
+    memory = psutil.virtual_memory()
+    memory_usage = memory.percent
+    
+    return {
+    "code": 200,
+    "msg": "success",
+    "data": {
+        "status": 0,
+        "cpu_usage": cpu_usage,
+        "gpu_usage": 0.0,
+        "memory_usage": memory_usage,
+        "gpu_memory": 0.0
+    }
+}
+
+
+@app.get("/shutdown", summary="优雅关闭服务(供Java端终结实例调用)")
+async def shutdown_service():
+    """
+    优雅关闭服务,清理资源
+    """
+    service_status["status"] = "stopping"
+    # 可添加清理逻辑:如保存未完成任务、关闭解析器连接等
+    return {
+        "code": 200,
+        "msg": "服务开始优雅关闭",
+        "data": None
+    }
+
+# ------------------------------
+# 启动入口
+# ------------------------------
+def main():
+    parser = argparse.ArgumentParser(description="Python解析服务启动参数")
+    parser.add_argument("--host", default="0.0.0.0", help="服务监听地址")
+    parser.add_argument("--port", type=int, default=8000, help="服务监听端口")
+    args = parser.parse_args()
+    
+    # 启动FastAPI服务
+    uvicorn.run(
+        "parse_service:app",
+        host=args.host,
+        port=args.port,
+        reload=False,  # 生产环境关闭热重载
+        workers=1  # 单进程运行,避免任务状态共享问题
+    )
+
+if __name__ == "__main__":
+    main()

+ 5 - 0
requirements.txt

@@ -0,0 +1,5 @@
+fastapi==0.104.1
+uvicorn==0.24.0
+pydantic==2.4.2
+#psutil==5.9.6
+python-multipart==0.0.6