jiayongqiang преди 4 седмици
родител
ревизия
aa491a1105
променени са 9 файла, в които са добавени 88 реда и са изтрити 35 реда
  1. 31 2
      agent/pdm.lock
  2. 1 0
      agent/pyproject.toml
  3. 1 0
      agent/src/agent/agent.py
  4. 8 12
      agent/src/agent/api_inner.py
  5. 16 15
      agent/src/agent/api_outter.py
  6. 25 0
      agent/src/agent/logger.py
  7. 3 3
      agent/src/agent/main.py
  8. 1 1
      agent/tests/test_query.py
  9. 2 2
      agent/tests/test_tagging.py

+ 31 - 2
agent/pdm.lock

@@ -5,7 +5,7 @@
 groups = ["default"]
 strategy = ["inherit_metadata"]
 lock_version = "4.5.0"
-content_hash = "sha256:6733268f6b19c3ef998cc0e93e0c7287e601b0ce3a4de4d63bcb10731f9ab579"
+content_hash = "sha256:6a1a545a1c99640c343310f0f13ce6066b537cd47b5ed4701c17c2b07a3be751"
 
 [[metadata.targets]]
 requires_python = "==3.10.*"
@@ -132,7 +132,7 @@ version = "0.4.6"
 requires_python = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7"
 summary = "Cross-platform colored terminal text."
 groups = ["default"]
-marker = "platform_system == \"Windows\" and python_version == \"3.10\""
+marker = "sys_platform == \"win32\" and python_version == \"3.10\" or platform_system == \"Windows\" and python_version == \"3.10\""
 files = [
     {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"},
     {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"},
@@ -470,6 +470,23 @@ files = [
 ]
 
 [[package]]
+name = "loguru"
+version = "0.7.3"
+requires_python = "<4.0,>=3.5"
+summary = "Python logging made (stupidly) simple"
+groups = ["default"]
+marker = "python_version == \"3.10\""
+dependencies = [
+    "aiocontextvars>=0.2.0; python_version < \"3.7\"",
+    "colorama>=0.3.4; sys_platform == \"win32\"",
+    "win32-setctime>=1.0.0; sys_platform == \"win32\"",
+]
+files = [
+    {file = "loguru-0.7.3-py3-none-any.whl", hash = "sha256:31a33c10c8e1e10422bfd431aeb5d351c7cf7fa671e3c4df004162264b28220c"},
+    {file = "loguru-0.7.3.tar.gz", hash = "sha256:19480589e77d47b8d85b2c827ad95d49bf31b0dcde16593892eb51dd18706eb6"},
+]
+
+[[package]]
 name = "numpy"
 version = "2.2.6"
 requires_python = ">=3.10"
@@ -886,6 +903,18 @@ files = [
 ]
 
 [[package]]
+name = "win32-setctime"
+version = "1.2.0"
+requires_python = ">=3.5"
+summary = "A small Python utility to set file creation time on Windows"
+groups = ["default"]
+marker = "sys_platform == \"win32\" and python_version == \"3.10\""
+files = [
+    {file = "win32_setctime-1.2.0-py3-none-any.whl", hash = "sha256:95d644c4e708aba81dc3704a116d8cbc974d70b3bdb8be1d150e36be6e9d1390"},
+    {file = "win32_setctime-1.2.0.tar.gz", hash = "sha256:ae1fdf948f5640aae05c511ade119313fb6a30d7eabe25fef9764dca5873c4c0"},
+]
+
+[[package]]
 name = "xxhash"
 version = "3.6.0"
 requires_python = ">=3.7"

+ 1 - 0
agent/pyproject.toml

@@ -15,6 +15,7 @@ dependencies = [
     "numpy>=2.2.6",
     "asyncer>=0.0.17",
     "psycopg2-binary>=2.9.11",
+    "loguru>=0.7.3",
 ]
 requires-python = ">=3.10"
 readme = "README.md"

+ 1 - 0
agent/src/agent/agent.py

@@ -28,6 +28,7 @@ class Lable(BaseModel):
     tag_code: str = Field(description="标签编码")
     tag_name: str = Field(description="标签名称")
     tag_path: str = Field(description="标签路径")
+    category_id: str = Field(description="标签类别ID")
     desc: str = Field(description="给出简要的理由说明该标签为何被保留或剔除。")
     passr:bool = Field(description="是否保留该标签,true表示保留,false表示剔除。")
 

+ 8 - 12
agent/src/agent/api_inner.py

@@ -5,9 +5,9 @@ from agent.core import es, dao
 from agent.core.vector import get_embeddings
 import configparser
 from fastapi import BackgroundTasks
-import logging
 import agent.agent as agent
 from fastapi import BackgroundTasks
+from agent.logger import logger
 
 
 router = APIRouter(prefix="/v1", tags=["平台内部接口"])
@@ -48,7 +48,8 @@ def load_tag_2_es(tag_ids: list[str]):
                     tti.tag_path,
                     ttc.category_nm,
                     ttc.category_code,
-                    tti.tag_prompt
+                    tti.tag_prompt,
+                    tti.category_id
                     from ai_tagging.aitag_tag_info tti left join ai_tagging.aitag_tag_category  ttc 
                     on tti.category_id = ttc.id 
                     where ttc.is_delete=0 and tti.is_delete=0 and ttc.state =  0 and tti.state = 0 and tti.tag_level = ttc.visibility_level
@@ -63,12 +64,14 @@ def load_tag_2_es(tag_ids: list[str]):
         "tag_remark": label[3],
         "tag_reg": label[4],
         "tag_prompt": label[9],
+        "category_id":label[10],
         "tag_vector": get_embeddings([(label[6] or '')+(label[4] or '')])[0] 
     } for label in labels])
     return labels
 
 @router.post("/synchronize_tag")
 def synchronize_tag(request: SynchronizeTagRequest):
+    logger.info(f"Received request to synchronize tags: {request.tag_ids}")
     load_tag_2_es(request.tag_ids)
     return {
         "code": 200,
@@ -77,6 +80,7 @@ def synchronize_tag(request: SynchronizeTagRequest):
 
 @router.post("/delete_tag")
 def delete_tag(request: SynchronizeTagRequest):
+    logger.info(f"Received request to delete tags: {request.tag_ids}")
     for tag_id in request.tag_ids:
         es.delete_document(tag_id)
     return {
@@ -84,27 +88,19 @@ def delete_tag(request: SynchronizeTagRequest):
         "message": "delete_tag successful"
     }   
 
-
 class GenerateRegRequest(BaseModel):
     tag_name: str = Field(..., description="标签名称")
     tag_remark: str = Field(..., description="标签定义")
 
 async def generate_and_update_reg(tag_name: str, tag_remark: str):
     reg = await agent.generate_reg(tag_name,tag_remark)
-    logging.info(f"Generated reg for tag {tag_name}: {reg}")
     reg = reg.replace('))(', ')).*(')
-    logging.info(f"Generated reg for tag {tag_name}: {reg}")
-    logging.info(f"Updated reg in database for tag {tag_name}: {reg}")
+    logger.info(f"Generated reg for tag {tag_name}: {reg}")
     return reg
 
 @router.post("/generate_reg")
 async def generate_reg(request: GenerateRegRequest, background_tasks: BackgroundTasks):
-    print(f"Received request to generate reg for tag remark: {request.tag_remark}")
-    # background_tasks.add_task(
-    #     generate_and_update_reg,  # 后台任务函数
-    #     tag_name=request.tag_name,
-    #     tag_remark=request.tag_remark
-    # )
+    logger.info(f"Received request to generate reg for tag: {request.tag_name}, remark: {request.tag_remark}")
     reg = await generate_and_update_reg(request.tag_name, request.tag_remark)
     return {
         "code": 200,

+ 16 - 15
agent/src/agent/api_outter.py

@@ -4,7 +4,6 @@ from fastapi import APIRouter , UploadFile
 from pydantic import BaseModel,Field
 from requests import request
 from agent.core.sign_check import check
-import logging
 import agent.core.dao as dao
 from asyncer import asyncify
 from agent.core.vector import get_embeddings
@@ -15,6 +14,7 @@ import uuid
 from fastapi import BackgroundTasks
 from typing import Optional
 import json
+from agent.logger import logger
 
 router = APIRouter(prefix="/v1", tags=["AI Tagging"])
 
@@ -47,7 +47,6 @@ async def execute_reg(log_id:str,tag_category_id:str,phrase: str)-> list:
     try:
         for label in labels:
             reg = label[1]
-            logging.info(f"Executing regex for label_id {label[0]}")
             if reg is not None:
                 pattern = re.compile(reg, re.VERBOSE)
                 if pattern.match(phrase):
@@ -55,13 +54,13 @@ async def execute_reg(log_id:str,tag_category_id:str,phrase: str)-> list:
             else:
                 result.append(label[0])
     except Exception as e:
-        logging.error(f"Regex execution failed: {e}")
-    logging.info(f"Regex filtering candidates: {result}")
+        logger.error(f"Regex execution failed: {e}")
+    logger.info(f"Regex filtering candidates: {result}")
     dao.execute(
             """UPDATE ai_tagging.aitag_tag_log SET reg_result = %s WHERE id = %s""",
             (str(result), log_id)
         )
-    logging.info(f"Updated reg_result for log_id {id}")
+    logger.info(f"Updated reg_result for log_id {id}")
     return result
 
 def vector_similarity_search(phrase: str, ids:list)-> list:
@@ -69,7 +68,7 @@ def vector_similarity_search(phrase: str, ids:list)-> list:
     query = get_embeddings([phrase])[0]
     results = hybrid_search(ids, query, top_k=TOP_K)
     # return [{"id": r["_id"], "score": r["_score"], "tag_prompt": r["_source"]["tag_prompt"],"tag_name": r["_source"]["tag_name"],"tag_code": r["_source"]["tag_code"]} for r in results]
-    return [{"id": r["_id"], "tag_remark":r["_source"]["tag_remark"], "tag_prompt": r["_source"]["tag_prompt"],"tag_name": r["_source"]["tag_name"],"tag_code": r["_source"]["tag_code"],"tag_path": r["_source"]["tag_path"]   } for r in results]
+    return [{"id": r["_id"], "tag_remark":r["_source"]["tag_remark"], "tag_prompt": r["_source"]["tag_prompt"],"tag_name": r["_source"]["tag_name"],"tag_code": r["_source"]["tag_code"],"tag_path": r["_source"]["tag_path"],"category_id": r["_source"]["category_id"]   } for r in results]
 
 def init_tag_log(request: TaggingRequest):
     id = uuid.uuid4().hex
@@ -96,29 +95,30 @@ async def run_ai_pipeline(log_id: str, tag_category_id: str, phrase: str):
     try:
         # step1: 正则过滤
         result = await execute_reg(log_id,tag_category_id,phrase)
-        logging.info(f"[{log_id}] Regex filtering result: {result}")
+        logger.info(f"[{log_id}] Regex filtering result: {result}")
 
         # step2: 向量检索
         if result:
             result = vector_similarity_search(phrase, result)
-            logging.info(f"[{log_id}] Vector search result: {result}")
+            logger.info(f"[{log_id}] Vector search result: {result}")
 
         # step3: LLM 打标
         if result:
             result = await reflect_check(phrase, result)
             result =  [r.dict() for r in result.labels]
             result = json.dumps(result, ensure_ascii=False)
-            logging.info(f"[{log_id}] LLM result: {result}")
+            logger.info(f"[{log_id}] LLM result: {result}")
 
         # step4: 更新数据库
-        update_tag_log(log_id, str(result))
+        # 如果result是个空集合,插入None
+        update_tag_log(log_id, result if result else None)
             
     except Exception as e:
-        logging.error(f"[{log_id}] Pipeline failed: {e}")
+        logger.error(f"[{log_id}] Pipeline failed: {e}")
 
 @router.post("/tagging")
 async def ai_tagging(request: TaggingRequest,background_tasks: BackgroundTasks):
-    logging.info(f"app_id: {request.app_id}, timestamp: {request.timestamp}, sign: {request.sign}, business_attr: {request.business_attr}, phrase: {request.phrase}")
+    logger.info(f"app_id: {request.app_id}, timestamp: {request.timestamp}, sign: {request.sign}, business_attr: {request.business_attr}, phrase: {request.phrase}")
     # 数据库中插入一条记录,记录请求的app_id、timestamp、business_attr、phrase等信息,状态设为“处理中”,后续步骤完成后更新状态和结果
     id = init_tag_log(request)
     # 执行异步任务
@@ -128,7 +128,7 @@ async def ai_tagging(request: TaggingRequest,background_tasks: BackgroundTasks):
         tag_category_id=request.tag_category_id,    
         phrase=request.phrase
     )
-    logging.info(f"Started background task for log_id: {id}")
+    logger.info(f"Started background task for log_id: {id}")
     return {
         "code": 200,
         "message": "submit successful"
@@ -136,10 +136,11 @@ async def ai_tagging(request: TaggingRequest,background_tasks: BackgroundTasks):
 
 @router.get("/query")
 def query(business_attr: str):
+    logger.info(f"Querying tag log for business_attr: {business_attr}")
     # 查询指定business_attr对应的标签信息
     sql = """SELECT * FROM ai_tagging.aitag_tag_log WHERE business_attr = %s and is_delete = 0"""
     result = dao.query_dict(sql, (business_attr,))
-    print(result)
+    logger.info(f"Query result: {result}")
     return {"code": 200, "message": "AI Query Endpoint", "data": result[0] if result else None}
 
 class FeedbackRequest(BaseModel):
@@ -154,7 +155,7 @@ class FeedbackRequest(BaseModel):
 
 @router.post("/feedback")
 def ai_feedback(feedback_request: FeedbackRequest):
-    logging.info(f"Received feedback request: {feedback_request}")
+    logger.info(f"Received feedback request: {feedback_request}")
     # 这里将用户的反馈信息保存到数据库中aitag_tag_log,供后续分析和模型优化使用
     dao.execute(
         """update ai_tagging.aitag_tag_log set feedback = %s, feedback_result = %s, feedback_time = %s, feedback_user_id = %s, feedback_user_nm = %s, contract_no = %s, feedback_user_org = %s, feedback_user_endpoint = %s, state = %s where business_attr = %s""",   

+ 25 - 0
agent/src/agent/logger.py

@@ -0,0 +1,25 @@
+from loguru import logger
+import sys
+
+logger.remove()
+
+logger.add(
+    sys.stderr,
+    format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
+    level="INFO"
+)
+
+
+logger.add(
+    "logs/ai-tagging.log",
+    rotation="00:00",        # 每天午夜轮转
+    retention="30 days",      # 保留 30 天
+    compression="zip",       # 轮转后压缩(可选)
+    encoding="utf-8",
+    level="DEBUG",
+    format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
+    enqueue=True,            # 异步安全(多进程/线程安全)
+)
+
+# 导出 logger 实例供其他模块使用
+__all__ = ["logger"]

+ 3 - 3
agent/src/agent/main.py

@@ -1,9 +1,9 @@
 from fastapi import FastAPI,APIRouter   
 from agent.api_outter import router as outter_router
 from agent.api_inner import router as inner_router  
-import logging
-logging.basicConfig(level=logging.INFO, force=True,format='%(asctime)s - %(levelname)s - %(message)s')
-logging.info("app starting!")
+from agent.logger import logger
+
+logger.info("ai-tagging starting!")
 
 api_router = APIRouter()
 api_router.include_router(inner_router,prefix="/admin")

+ 1 - 1
agent/tests/test_query.py

@@ -1,4 +1,4 @@
 import requests
 
-result = requests.get("http://localhost:9876/api/aitag/v1/query?business_attr=test_attr")
+result = requests.get("http://10.192.72.13:9876/api/aitag/v1/query?business_attr=17f02e5c92cb41c6ae3e835bcefb3523")
 print(result.text)

+ 2 - 2
agent/tests/test_tagging.py

@@ -3,12 +3,12 @@ import logging
 logging.basicConfig(level=logging.INFO, force=True,format='%(asctime)s - %(levelname)s - %(message)s')
 logging.info("app starting!")
 
-res = requests.post("http://localhost:9876/api/aitag/v1/tagging", json={
+res = requests.post("http://10.192.72.13:9876/api/aitag/v1/tagging", json={
     # "app_id": "test_app",
     # "timestamp": 1234567890,
     # "sign": "test_sign",
     "business_attr": "test_attr",
-    "phrase": "职业:水产养殖人员 投向:海水养殖 用途:个人经营"
+    "phrase": "职业:水产养殖人员 投向:海水养殖 用途:渔排养殖"
 })
 logging.info(res.text)