|
|
@@ -1,10 +1,8 @@
|
|
|
-import configparser
|
|
|
import re
|
|
|
from fastapi import APIRouter , UploadFile
|
|
|
from pydantic import BaseModel,Field
|
|
|
from requests import request
|
|
|
from agent.core.sign_check import check
|
|
|
-import logging
|
|
|
import agent.core.dao as dao
|
|
|
from asyncer import asyncify
|
|
|
from agent.core.vector import get_embeddings
|
|
|
@@ -15,12 +13,13 @@ import uuid
|
|
|
from fastapi import BackgroundTasks
|
|
|
from typing import Optional
|
|
|
import json
|
|
|
+from agent.logger import logger
|
|
|
+from agent.core.config import get_config_path
|
|
|
+config = get_config_path()
|
|
|
+TOP_K = int(config['app']['top_k'])  # coerce: INI-style config lookups return strings, top_k is a result count
|
|
|
|
|
|
router = APIRouter(prefix="/v1", tags=["AI Tagging"])
|
|
|
|
|
|
-config = configparser.ConfigParser()
|
|
|
-config.read('config.ini')
|
|
|
-TOP_K = config['app']['top_k']
|
|
|
|
|
|
|
|
|
class TaggingRequest(BaseModel):
|
|
|
@@ -35,7 +34,7 @@ async def execute_reg(log_id:str,tag_category_id:str,phrase: str)-> list:
|
|
|
sql = f"""select
|
|
|
tti.id,
|
|
|
tti.reg
|
|
|
- from ai_tagging.aitag_tag_info tti left join ai_tagging.aitag_tag_category ttc
|
|
|
+ from aitag_tag_info tti left join aitag_tag_category ttc
|
|
|
on tti.category_id = ttc.id
|
|
|
where ttc.is_delete=0 and tti.is_delete=0 and ttc.state = 0 and tti.state = 0 and tti.tag_level = ttc.visibility_level
|
|
|
"""
|
|
|
@@ -47,21 +46,22 @@ async def execute_reg(log_id:str,tag_category_id:str,phrase: str)-> list:
|
|
|
try:
|
|
|
for label in labels:
|
|
|
reg = label[1]
|
|
|
- logging.info(f"Executing regex for label_id {label[0]}")
|
|
|
if reg is not None:
|
|
|
pattern = re.compile(reg, re.VERBOSE)
|
|
|
+ logger.info(f"Executing regex for label_id {label[0]}: {reg}")
|
|
|
if pattern.match(phrase):
|
|
|
+ logger.info(f"Executing regex for label_id {label[0]}: {reg} true")
|
|
|
result.append(label[0])
|
|
|
else:
|
|
|
result.append(label[0])
|
|
|
except Exception as e:
|
|
|
- logging.error(f"Regex execution failed: {e}")
|
|
|
- logging.info(f"Regex filtering candidates: {result}")
|
|
|
+ logger.error(f"Regex execution failed: {e}")
|
|
|
+ logger.info(f"Regex filtering candidates: {result}")
|
|
|
dao.execute(
|
|
|
- """UPDATE ai_tagging.aitag_tag_log SET reg_result = %s WHERE id = %s""",
|
|
|
+ """UPDATE aitag_tag_log SET reg_result = %s WHERE id = %s""",
|
|
|
(str(result), log_id)
|
|
|
)
|
|
|
- logging.info(f"Updated reg_result for log_id {id}")
|
|
|
+ logger.info(f"Updated reg_result for log_id {log_id}")  # log the updated row id (was the builtin `id`)
|
|
|
return result
|
|
|
|
|
|
def vector_similarity_search(phrase: str, ids:list)-> list:
|
|
|
@@ -69,26 +69,26 @@ def vector_similarity_search(phrase: str, ids:list)-> list:
|
|
|
query = get_embeddings([phrase])[0]
|
|
|
results = hybrid_search(ids, query, top_k=TOP_K)
|
|
|
# return [{"id": r["_id"], "score": r["_score"], "tag_prompt": r["_source"]["tag_prompt"],"tag_name": r["_source"]["tag_name"],"tag_code": r["_source"]["tag_code"]} for r in results]
|
|
|
- return [{"id": r["_id"], "tag_remark":r["_source"]["tag_remark"], "tag_prompt": r["_source"]["tag_prompt"],"tag_name": r["_source"]["tag_name"],"tag_code": r["_source"]["tag_code"],"tag_path": r["_source"]["tag_path"] } for r in results]
|
|
|
+ return [{"id": r["_id"], "tag_remark":r["_source"]["tag_remark"], "tag_prompt": r["_source"]["tag_prompt"],"tag_name": r["_source"]["tag_name"],"tag_code": r["_source"]["tag_code"],"tag_path": r["_source"]["tag_path"],"category_id": r["_source"]["category_id"] } for r in results]
|
|
|
|
|
|
def init_tag_log(request: TaggingRequest):
|
|
|
id = uuid.uuid4().hex
|
|
|
# 保证business_attr的唯一性,如果已经存在相同business_attr且状态为0(处理中)的记录,则将is_delete设为1(已删除),然后插入新记录
|
|
|
dao.execute(
|
|
|
- """UPDATE ai_tagging.aitag_tag_log SET is_delete = 1 WHERE business_attr = %s""",
|
|
|
+ """UPDATE aitag_tag_log SET is_delete = 1 WHERE business_attr = %s""",
|
|
|
(request.business_attr,)
|
|
|
)
|
|
|
# 业务编号如果以test开头,则tag_scope = 1,否则都是0
|
|
|
tag_scope = 1 if request.business_attr.startswith("test") else 0
|
|
|
dao.execute(
|
|
|
- """INSERT INTO ai_tagging.aitag_tag_log (id,app_id, insert_time, business_attr, phrase, state, tag_scope) VALUES (%s, %s, %s, %s, %s, %s, %s)""",
|
|
|
+ """INSERT INTO aitag_tag_log (id,app_id, insert_time, business_attr, phrase, state, tag_scope) VALUES (%s, %s, %s, %s, %s, %s, %s)""",
|
|
|
(id,request.app_id, datetime.now(), request.business_attr, request.phrase, 0, tag_scope)
|
|
|
)
|
|
|
return id
|
|
|
|
|
|
def update_tag_log(id:str, result:str):
|
|
|
dao.execute(
|
|
|
- """UPDATE ai_tagging.aitag_tag_log SET state = %s, result = %s WHERE id = %s""",
|
|
|
+ """UPDATE aitag_tag_log SET state = %s, result = %s WHERE id = %s""",
|
|
|
(1, result, id)
|
|
|
)
|
|
|
|
|
|
@@ -96,29 +96,30 @@ async def run_ai_pipeline(log_id: str, tag_category_id: str, phrase: str):
|
|
|
try:
|
|
|
# step1: 正则过滤
|
|
|
result = await execute_reg(log_id,tag_category_id,phrase)
|
|
|
- logging.info(f"[{log_id}] Regex filtering result: {result}")
|
|
|
+ logger.info(f"[{log_id}] Regex filtering result: {result}")
|
|
|
|
|
|
# step2: 向量检索
|
|
|
if result:
|
|
|
result = vector_similarity_search(phrase, result)
|
|
|
- logging.info(f"[{log_id}] Vector search result: {result}")
|
|
|
+ logger.info(f"[{log_id}] Vector search result: {result}")
|
|
|
|
|
|
# step3: LLM 打标
|
|
|
if result:
|
|
|
result = await reflect_check(phrase, result)
|
|
|
result = [r.dict() for r in result.labels]
|
|
|
result = json.dumps(result, ensure_ascii=False)
|
|
|
- logging.info(f"[{log_id}] LLM result: {result}")
|
|
|
+ logger.info(f"[{log_id}] LLM result: {result}")
|
|
|
|
|
|
# step4: 更新数据库
|
|
|
- update_tag_log(log_id, str(result))
|
|
|
+ # 如果result是个空集合,插入None
|
|
|
+ update_tag_log(log_id, result if result else None)
|
|
|
|
|
|
except Exception as e:
|
|
|
- logging.error(f"[{log_id}] Pipeline failed: {e}")
|
|
|
+ logger.error(f"[{log_id}] Pipeline failed: {e}")
|
|
|
|
|
|
@router.post("/tagging")
|
|
|
async def ai_tagging(request: TaggingRequest,background_tasks: BackgroundTasks):
|
|
|
- logging.info(f"app_id: {request.app_id}, timestamp: {request.timestamp}, sign: {request.sign}, business_attr: {request.business_attr}, phrase: {request.phrase}")
|
|
|
+ logger.info(f"app_id: {request.app_id}, timestamp: {request.timestamp}, sign: {request.sign}, business_attr: {request.business_attr}, phrase: {request.phrase}")
|
|
|
# 数据库中插入一条记录,记录请求的app_id、timestamp、business_attr、phrase等信息,状态设为“处理中”,后续步骤完成后更新状态和结果
|
|
|
id = init_tag_log(request)
|
|
|
# 执行异步任务
|
|
|
@@ -128,7 +129,7 @@ async def ai_tagging(request: TaggingRequest,background_tasks: BackgroundTasks):
|
|
|
tag_category_id=request.tag_category_id,
|
|
|
phrase=request.phrase
|
|
|
)
|
|
|
- logging.info(f"Started background task for log_id: {id}")
|
|
|
+ logger.info(f"Started background task for log_id: {id}")
|
|
|
return {
|
|
|
"code": 200,
|
|
|
"message": "submit successful"
|
|
|
@@ -136,10 +137,11 @@ async def ai_tagging(request: TaggingRequest,background_tasks: BackgroundTasks):
|
|
|
|
|
|
@router.get("/query")
|
|
|
def query(business_attr: str):
|
|
|
+ logger.info(f"Querying tag log for business_attr: {business_attr}")
|
|
|
# 查询指定business_attr对应的标签信息
|
|
|
- sql = """SELECT * FROM ai_tagging.aitag_tag_log WHERE business_attr = %s and is_delete = 0"""
|
|
|
+ sql = """SELECT * FROM aitag_tag_log WHERE business_attr = %s and is_delete = 0"""
|
|
|
result = dao.query_dict(sql, (business_attr,))
|
|
|
- print(result)
|
|
|
+ logger.info(f"Query result: {result}")
|
|
|
return {"code": 200, "message": "AI Query Endpoint", "data": result[0] if result else None}
|
|
|
|
|
|
class FeedbackRequest(BaseModel):
|
|
|
@@ -154,10 +156,10 @@ class FeedbackRequest(BaseModel):
|
|
|
|
|
|
@router.post("/feedback")
|
|
|
def ai_feedback(feedback_request: FeedbackRequest):
|
|
|
- logging.info(f"Received feedback request: {feedback_request}")
|
|
|
+ logger.info(f"Received feedback request: {feedback_request}")
|
|
|
# 这里将用户的反馈信息保存到数据库中aitag_tag_log,供后续分析和模型优化使用
|
|
|
dao.execute(
|
|
|
- """update ai_tagging.aitag_tag_log set feedback = %s, feedback_result = %s, feedback_time = %s, feedback_user_id = %s, feedback_user_nm = %s, contract_no = %s, feedback_user_org = %s, feedback_user_endpoint = %s, state = %s where business_attr = %s""",
|
|
|
+ """update aitag_tag_log set feedback = %s, feedback_result = %s, feedback_time = %s, feedback_user_id = %s, feedback_user_nm = %s, contract_no = %s, feedback_user_org = %s, feedback_user_endpoint = %s, state = %s where business_attr = %s and is_delete = 0""",  # only the live row: init_tag_log soft-deletes superseded rows, /query filters the same way
|
|
|
(feedback_request.feedback, feedback_request.feedback_result, datetime.now(), feedback_request.user_id, feedback_request.user_nm, feedback_request.contract_no, feedback_request.user_org, feedback_request.user_endpoint, 2, feedback_request.business_attr)
|
|
|
)
|
|
|
return {"code": 200, "message": "Feedback received successfully"}
|