|
|
@@ -1,4 +1,5 @@
|
|
|
import re
|
|
|
+import threading
|
|
|
from fastapi import APIRouter , UploadFile
|
|
|
from pydantic import BaseModel,Field, field_validator
|
|
|
from requests import request
|
|
|
@@ -6,7 +7,6 @@ from agent.core.sign_check import check
|
|
|
import agent.core.dao as dao
|
|
|
from agent.core.vector import get_embeddings
|
|
|
from agent.core.es import hybrid_search,bm25_vector_search
|
|
|
-from agent.agent import reflect_check
|
|
|
from datetime import datetime
|
|
|
import uuid
|
|
|
from fastapi import BackgroundTasks
|
|
|
@@ -15,12 +15,17 @@ import json
|
|
|
from agent.logger import logger
|
|
|
from agent.core.config import get_config_path
|
|
|
import asyncio
|
|
|
-from agent.core.tagging_state import TAGGING_STATE
|
|
|
+from agent.core.tagging_state import TAGGING_STATE,TAGGING_CHANNEL
|
|
|
import time
|
|
|
+from concurrent.futures import ThreadPoolExecutor
|
|
|
+import requests
|
|
|
+
|
|
|
|
|
|
config = get_config_path()
|
|
|
CONCURRENCE = int(config['app']['concurrence'])
|
|
|
-background_semaphore = asyncio.Semaphore(CONCURRENCE)
|
|
|
+# background_semaphore = threading.BoundedSemaphore(CONCURRENCE)
|
|
|
+executor = ThreadPoolExecutor(max_workers=CONCURRENCE)
|
|
|
+ESB_CALLBACK = config['app']['esb_callback']
|
|
|
|
|
|
router = APIRouter(prefix="/v1", tags=["AI Tagging"])
|
|
|
|
|
|
@@ -56,7 +61,7 @@ class TaggingRequest(BaseModel):
|
|
|
return None
|
|
|
return str(v)
|
|
|
|
|
|
-async def execute_reg(log_id:str,tag_category_id:str,phrase: str)-> list:
|
|
|
+def execute_reg(log_id:str,tag_category_id:str,phrase: str)-> list:
|
|
|
sql = f"""select
|
|
|
tti.id,
|
|
|
tti.reg
|
|
|
@@ -70,18 +75,23 @@ async def execute_reg(log_id:str,tag_category_id:str,phrase: str)-> list:
|
|
|
labels = dao.query(sql)
|
|
|
# 循环调用reg匹配phrase,匹配成功则返回标签id
|
|
|
result = [label[0] for label in labels]
|
|
|
- dao.execute(
|
|
|
- """UPDATE aitag_tag_log SET reg_result = %s WHERE id = %s""",
|
|
|
- (str(result), log_id)
|
|
|
- )
|
|
|
+ if result and len(result) > 0:
|
|
|
+ dao.execute(
|
|
|
+ """UPDATE aitag_tag_log SET reg_result = %s,tagging_channel = %s WHERE id = %s""",
|
|
|
+ (str(result), TAGGING_CHANNEL.REG.value, log_id)
|
|
|
+ )
|
|
|
logger.info(f"[{log_id}] Regex filtering result: {result}")
|
|
|
return result
|
|
|
|
|
|
-def vector_similarity_search(phrase: str)-> list:
|
|
|
+def vector_similarity_search(log_id:str,phrase: str)-> list:
|
|
|
logger.info("Starting vector similarity search...")
|
|
|
# 这里应该调用向量数据库进行相似度检索,返回相关标签id列表
|
|
|
query = get_embeddings([phrase])[0]
|
|
|
results = bm25_vector_search(phrase,query)
|
|
|
+ dao.execute(
|
|
|
+ """UPDATE aitag_tag_log SET tagging_channel = %s WHERE id = %s""",
|
|
|
+ (TAGGING_CHANNEL.VECTOR.value, log_id)
|
|
|
+ )
|
|
|
return results
|
|
|
|
|
|
def init_tag_log(request: TaggingRequest):
|
|
|
@@ -158,15 +168,20 @@ def defined_rule_match(phrase: str):
|
|
|
result = []
|
|
|
return result
|
|
|
|
|
|
-def end_tagging_predefined_rule(id:str, result:str):
|
|
|
+def end_tagging_predefined_rule(id:str, result:str, business_attr: Optional[str] = None):
|
|
|
dao.execute(
|
|
|
- """UPDATE aitag_tag_log SET state = %s, result = %s,ai_result_endtime = %s WHERE id = %s""",
|
|
|
- (TAGGING_STATE.PREDEFINED_RULE_MATCH.value, result,datetime.now(), id)
|
|
|
+ """UPDATE aitag_tag_log SET state = %s, result = %s,ai_result_endtime = %s,tagging_channel = %s WHERE id = %s""",
|
|
|
+ (TAGGING_STATE.PREDEFINED_RULE_MATCH.value, result,datetime.now(), TAGGING_CHANNEL.RULES.value, id)
|
|
|
)
|
|
|
+ if business_attr is not None and ESB_CALLBACK is not None:
|
|
|
+ try:
|
|
|
+ requests.get(ESB_CALLBACK,timeout=3, params={"businessAttr": business_attr})
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"ESB callback failed: {e}")
|
|
|
|
|
|
-async def run_ai_pipeline(log_id: str, tag_category_id: str, phrase: str, instucde: Optional[str] = None):
|
|
|
+def run_ai_pipeline(log_id: str, tag_category_id: str, phrase: str, instucde: Optional[str] = None, business_attr: Optional[str] = None):
|
|
|
try:
|
|
|
- async with background_semaphore:
|
|
|
+ # with background_semaphore:
|
|
|
logger.info(f"开始打标:{log_id}, {phrase}")
|
|
|
# step0: 开始打标,初始化打标日志,如果是在行社白名单中,标记下
|
|
|
is_marine = start_tagging(log_id, instucde)
|
|
|
@@ -175,20 +190,21 @@ async def run_ai_pipeline(log_id: str, tag_category_id: str, phrase: str, instuc
|
|
|
defined_rule_result = defined_rule_match(phrase)
|
|
|
if defined_rule_result:
|
|
|
logger.info(f"预设规则匹配成功,直接返回结果: {defined_rule_result}")
|
|
|
- end_tagging_predefined_rule(log_id, json.dumps(defined_rule_result))
|
|
|
+ end_tagging_predefined_rule(log_id, json.dumps(defined_rule_result), business_attr)  # business_attr goes to the ESB callback, not to json.dumps (whose options are keyword-only)
|
|
|
return
|
|
|
|
|
|
# step1: 正则过滤
|
|
|
- result = await execute_reg(log_id,tag_category_id,phrase)
|
|
|
+ result = execute_reg(log_id,tag_category_id,phrase)
|
|
|
# step2: 向量检索
|
|
|
if not result or len(result) == 0:
|
|
|
- result = vector_similarity_search(phrase)
|
|
|
+ result = vector_similarity_search(log_id,phrase)
|
|
|
# step3: LLM 打标
|
|
|
if result and len(result) > 0:
|
|
|
try:
|
|
|
tags = dao.query_dict(""" select id,tag_nm as tag_name,tag_code, tag_path,category_id,tag_prompt from aitag_tag_info where id in %s """, (tuple(result),))
|
|
|
logger.info(f"筛选结果: {tags}")
|
|
|
- result = await reflect_check(phrase,is_marine, tags)
|
|
|
+ from agent.agent import reflect_check_sync
|
|
|
+ result = reflect_check_sync(phrase,is_marine, tags)
|
|
|
except Exception as e:
|
|
|
logger.error(f"LLM reflection check failed: {e}")
|
|
|
result = None
|
|
|
@@ -222,13 +238,14 @@ async def ai_tagging(request: TaggingRequest,background_tasks: BackgroundTasks):
|
|
|
logger.info(f"esb_seq_no: {request.esb_seq_no}, business_attr: {request.business_attr}, phrase: {request.phrase}")
|
|
|
# 数据库中插入一条记录,记录请求的app_id、timestamp、business_attr、phrase等信息,状态设为“处理中”,后续步骤完成后更新状态和结果
|
|
|
id = init_tag_log(request)
|
|
|
- # 执行异步任务
|
|
|
- background_tasks.add_task(
|
|
|
- run_ai_pipeline, # 后台任务函数
|
|
|
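+ # Run the tagging pipeline asynchronously: submit the synchronous run_ai_pipeline to the module-level ThreadPoolExecutor (`executor`), not to FastAPI's BackgroundTasks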
|
|
|
+ future = executor.submit(
|
|
|
+ run_ai_pipeline, # 直接使用同步的 run_ai_pipeline
|
|
|
log_id=id,
|
|
|
tag_category_id=request.tag_category_id,
|
|
|
phrase=request.phrase,
|
|
|
- instucde=request.instucde
|
|
|
+ instucde=request.instucde,
|
|
|
+ business_attr=request.business_attr
|
|
|
)
|
|
|
logger.info(f"Started background task for log_id: {id}")
|
|
|
return {
|