|
|
@@ -22,7 +22,7 @@ import requests
|
|
|
|
|
|
|
|
|
config = get_config_path()
|
|
|
-CONCURRENCE = int(config['app']['concurrence'])
|
|
|
+CONCURRENCE = config['app'].getint("concurrence", 1)
|
|
|
# background_semaphore = threading.BoundedSemaphore(CONCURRENCE)
|
|
|
executor = ThreadPoolExecutor(max_workers=CONCURRENCE)
|
|
|
ESB_CALLBACK = config['app']['esb_callback']
|
|
|
@@ -65,7 +65,7 @@ def reg_match(query:str, reglist:list[any]):
|
|
|
result = []
|
|
|
for id,reg in reglist:
|
|
|
try:
|
|
|
- if re.search(reg, query):
|
|
|
+ if re.search(reg, query,re.DOTALL | re.MULTILINE):
|
|
|
result.append(id)
|
|
|
except re.error:
|
|
|
continue
|
|
|
@@ -95,23 +95,23 @@ def execute_reg(log_id:str,tag_category_id:str,phrase: str)-> list:
|
|
|
logger.info(f"[{log_id}] Regex filtering result: {result}")
|
|
|
return result
|
|
|
|
|
|
-def vector_similarity_search(log_id:str,phrase: str,tag_ids:list[str]=None)-> list:
|
|
|
+def vector_similarity_search(log_id:str,phrase: str,tag_ids:list[str]=None):
|
|
|
logger.info("Starting vector similarity search...")
|
|
|
# 这里应该调用向量数据库进行相似度检索,返回相关标签id列表
|
|
|
l1 = time.time()
|
|
|
- query = get_embeddings([phrase])[0]
|
|
|
+ query = get_embeddings([f"{phrase}"])[0]
|
|
|
l2 = time.time()
|
|
|
logger.info(f"[{log_id}] Vector embedding time: {l2-l1}")
|
|
|
l3 = time.time()
|
|
|
rrf_score_threshold = float(config['es'].get('rrf_score_threshold',0.016))
|
|
|
- results = bm25_vector_search(phrase,query,tag_ids=tag_ids,rrf_score_threshold=rrf_score_threshold)
|
|
|
+ results,full_results = bm25_vector_search(phrase,query,tag_ids=tag_ids,rrf_score_threshold=rrf_score_threshold)
|
|
|
l4 = time.time()
|
|
|
logger.info(f"[{log_id}] Vector search time: {l4-l3}")
|
|
|
dao.execute(
|
|
|
"""UPDATE aitag_tag_log SET tagging_channel = %s WHERE id = %s""",
|
|
|
(TAGGING_CHANNEL.VECTOR.value, log_id)
|
|
|
)
|
|
|
- return results
|
|
|
+ return results,full_results
|
|
|
|
|
|
def init_tag_log(request: TaggingRequest):
|
|
|
id = uuid.uuid4().hex
|
|
|
@@ -195,20 +195,22 @@ order by r.tag_type,r.tag_nm,r.defined_rule desc"""
|
|
|
if rules and len(rules) > 0:
|
|
|
for matched in rules:
|
|
|
try:
|
|
|
- if re.search(matched[2], phrase):
|
|
|
+ if re.search(matched[2], phrase,re.DOTALL | re.MULTILINE):
|
|
|
tag_info = dao.query("""select ati.id,ati.category_id, ati.tag_nm, ati.tag_path,ati.tag_code from aitag_tag_info ati left join aitag_tag_category atc on ati.category_id = atc.id where ati.tag_nm = %s and ati.is_delete = 0 and atc.category_code = %s""", (matched[1], matched[0]))
|
|
|
# 安全检查:只有当 tag_info 有数据时才加入结果
|
|
|
- if tag_info and len(tag_info) > 0 and tag_info[0][0] not in seen_ids:
|
|
|
- seen_ids.add(tag_info[0][0])
|
|
|
- result.append({
|
|
|
- "id": tag_info[0][0],
|
|
|
- "desc": generate_html(phrase, matched[2], tag_info[0][2]),
|
|
|
- "passr": True,
|
|
|
- "tag_code": tag_info[0][4],
|
|
|
- "tag_name": tag_info[0][2],
|
|
|
- "tag_path": tag_info[0][3],
|
|
|
- "category_id": tag_info[0][1]
|
|
|
- })
|
|
|
+ if tag_info and len(tag_info) > 0:
|
|
|
+ for ti in tag_info:
|
|
|
+ if ti[0] not in seen_ids:
|
|
|
+ seen_ids.add(ti[0])
|
|
|
+ result.append({
|
|
|
+ "id": ti[0],
|
|
|
+ "desc": generate_html(phrase, matched[2], ti[2]),
|
|
|
+ "passr": True,
|
|
|
+ "tag_code": ti[4],
|
|
|
+ "tag_name": ti[2],
|
|
|
+ "tag_path": ti[3],
|
|
|
+ "category_id": ti[1]
|
|
|
+ })
|
|
|
except Exception as e:
|
|
|
logger.error(f"Defined rule match failed 1: {e}")
|
|
|
except Exception as e:
|
|
|
@@ -236,32 +238,32 @@ def run_ai_pipeline(log_id: str, tag_category_id: str, phrase: str, instucde: Op
|
|
|
|
|
|
# step0.5: 预设规则匹配,如果匹配成功则直接更新结果并结束打标流程
|
|
|
defined_rule_result = defined_rule_match(phrase)
|
|
|
- if defined_rule_result:
|
|
|
- logger.info(f"预设规则匹配成功,直接返回结果: {defined_rule_result}")
|
|
|
- end_tagging_predefined_rule(log_id, json.dumps(defined_rule_result),business_attr)
|
|
|
- return
|
|
|
+ # if defined_rule_result:
|
|
|
+ # logger.info(f"预设规则匹配成功,直接返回结果: {defined_rule_result}")
|
|
|
+ # end_tagging_predefined_rule(log_id, json.dumps(defined_rule_result),business_attr)
|
|
|
+ # return
|
|
|
|
|
|
# step1: 正则过滤
|
|
|
result = execute_reg(log_id,tag_category_id,phrase)
|
|
|
logger.info(f"正则过滤结果: {result}")
|
|
|
+
|
|
|
# step2: 向量检索
|
|
|
- # if not result or len(result) == 0 or len(result) >TOP_N: # 正则过滤结果过多或没有结果都进行向量检索,避免正则规则不完善导致的漏匹配问题,同时也避免正则规则过于宽泛导致的过多匹配问题
|
|
|
- v_result = vector_similarity_search(log_id,phrase)
|
|
|
+ v_result,full_results = vector_similarity_search(log_id,phrase)
|
|
|
logger.info(f"向量检索结果: {v_result}")
|
|
|
# step2.5: 合并结果,取交集优先,交集为空则取并集
|
|
|
if result and len(result) > 0:
|
|
|
v_result1 = list(set(result) & set(v_result)) # 取交集,既满足正则规则又满足向量相似度的标签,优先级更高
|
|
|
if v_result1 and len(v_result1) > 0:
|
|
|
- result = v_result1
|
|
|
- logger.info(f"交集结果: {v_result1}")
|
|
|
+ result = v_result
|
|
|
else:
|
|
|
result = list(set(result) | set(v_result)) # 取并集,满足正则规则或者满足向量相似度的标签
|
|
|
if result and len(result) > TOP_N:
|
|
|
- result = vector_similarity_search(log_id,phrase,tag_ids=result) # 如果合并后结果过多,则再次进行向量检索过滤一次
|
|
|
+ result,full_results = vector_similarity_search(log_id,phrase,tag_ids=result) # 如果合并后结果过多,则再次进行向量检索过滤一次
|
|
|
logger.info(f"并集后再次向量检索结果: {result}")
|
|
|
else:
|
|
|
result = v_result
|
|
|
logger.info(f"最终候选结果: {result}")
|
|
|
+ logger.info(f"排序结果: {full_results}")
|
|
|
# step3: LLM 打标
|
|
|
if result and len(result) > 0:
|
|
|
try:
|
|
|
@@ -269,14 +271,20 @@ def run_ai_pipeline(log_id: str, tag_category_id: str, phrase: str, instucde: Op
|
|
|
logger.info(f"筛选结果: {tags}")
|
|
|
from agent.agent import reflect_check_sync
|
|
|
result, x_input = reflect_check_sync(phrase,is_marine, tags)
|
|
|
+ if result and len(result) > 0:
|
|
|
+ for item in result:
|
|
|
+ _id = item["id"]
|
|
|
+ if _id in full_results:
|
|
|
+ item["rank"] = full_results[_id]["rank"]
|
|
|
+ item["score"] = full_results[_id]["score"]
|
|
|
except Exception as e:
|
|
|
logger.error(f"LLM reflection check failed: {e}")
|
|
|
result = None
|
|
|
fail_tagging(log_id)
|
|
|
return
|
|
|
# step4: 更新数据库
|
|
|
- # 如果result是个空集合,插入None
|
|
|
- end_tagging(log_id, result if result else None,x_input)
|
|
|
+ merged_list = list({item["id"]: item for item in result + defined_rule_result}.values())
|
|
|
+ end_tagging(log_id, json.dumps(merged_list, ensure_ascii=False),x_input)
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"[{log_id}] Pipeline failed: {e}")
|