jiayongqiang 2 долоо хоног өмнө
parent
commit
ba429d06f1

+ 3 - 1
agent/config.ini

@@ -16,14 +16,16 @@ api_key =
 model = Qwen3-Embedding-8B
 base_url = http://10.192.72.11:18081/v1/embeddings
 api_key =
+default_dims = 4096
+enable_config = false
 
 [es]
 url = http://10.192.72.13:9200
-vecotr_dims = 4096
 
 [app]
 top_k = 2
 port=9876
+concurrence=3
 
 [logging]
 log_path= logs/aitagging-app.log

BIN
agent/logs/aitagging-app.2026-03-12_15-29-18_138302.log.zip


BIN
agent/logs/aitagging-app.2026-03-13_09-47-02_435341.log.zip


+ 13 - 1
agent/pdm.lock

@@ -5,7 +5,7 @@
 groups = ["default"]
 strategy = ["inherit_metadata"]
 lock_version = "4.5.0"
-content_hash = "sha256:18528969456576163740af5ea9d8d07f14204187b864c4a5918cb728c31716b5"
+content_hash = "sha256:b92e7824261b29937eaae4defc966808f8dfd685ab873fc1a9d629b638045af9"
 
 [[metadata.targets]]
 requires_python = "==3.10.*"
@@ -72,6 +72,18 @@ files = [
 ]
 
 [[package]]
+name = "asyncio"
+version = "4.0.0"
+requires_python = ">=3.4"
+summary = "Deprecated backport of asyncio; use the stdlib package instead"
+groups = ["default"]
+marker = "python_version == \"3.10\""
+files = [
+    {file = "asyncio-4.0.0-py3-none-any.whl", hash = "sha256:c1eddb0659231837046809e68103969b2bef8b0400d59cfa6363f6b5ed8cc88b"},
+    {file = "asyncio-4.0.0.tar.gz", hash = "sha256:570cd9e50db83bc1629152d4d0b7558d6451bb1bfd5dfc2e935d96fc2f40329b"},
+]
+
+[[package]]
 name = "certifi"
 version = "2026.1.4"
 requires_python = ">=3.7"

+ 2 - 1
agent/pyproject.toml

@@ -1,6 +1,6 @@
 [project]
 name = "agent"
-version = "0.1.1"
+version = "0.1.3"
 description = "Default template for PDM package"
 authors = [
     {name = "jiayongqiang", email = "15936285643@163.com"},
@@ -16,6 +16,7 @@ dependencies = [
     "psycopg2-binary>=2.9.11",
     "loguru>=0.7.3",
     "openpyxl>=3.1.5",
+    "asyncio>=4.0.0",
 ]
 requires-python = ">=3.10"
 readme = "README.md"

+ 11 - 3
agent/src/agent/agent.py

@@ -1,8 +1,10 @@
 from langchain.chat_models import init_chat_model
 from langchain.agents import create_agent
 from pydantic import BaseModel,Field
-
+import json
 from agent.core.config import get_config_path
+from agent.logger import logger
+
 config = get_config_path()
 TOP_K = config['app']['top_k']
 
@@ -83,11 +85,17 @@ async def reflect_check(context: str, labels: list):
         3.不以企业整体行业属性替代对贷款具体内容的判断
         4.所有判断应服务于产业打标的可用性与可解释性
     """
-    print(prompt)
     response = await agent.ainvoke({
         "messages": [{"role":"system","content": prompt}]
     },context = {})
-    return response["structured_response"]
+    result = response["structured_response"]
+
+
+    result =  [r.dict() for r in result.labels]
+    result = json.dumps(result, ensure_ascii=False)
+    logger.info(f"{context} LLM result: {result}")
+
+    return result
 
 class GenerateReg(BaseModel):
     """ 大模型生成的正则表达式 """

+ 28 - 25
agent/src/agent/api_outter.py

@@ -15,13 +15,15 @@ from typing import Optional
 import json
 from agent.logger import logger
 from agent.core.config import get_config_path
+import asyncio
+
 config = get_config_path()
 TOP_K = config['app']['top_k']
+CONCURRENCE = int(config['app']['concurrence'])
+background_semaphore = asyncio.Semaphore(CONCURRENCE)
 
 router = APIRouter(prefix="/v1", tags=["AI Tagging"])
 
-
-
 class TaggingRequest(BaseModel):
     app_id:  Optional[str] = Field(None, description="应用ID")
     timestamp: Optional[int] = Field(None, description="请求时间戳")
@@ -61,15 +63,18 @@ async def execute_reg(log_id:str,tag_category_id:str,phrase: str)-> list:
             """UPDATE aitag_tag_log SET reg_result = %s WHERE id = %s""",
             (str(result), log_id)
         )
-    logger.info(f"Updated reg_result for log_id {id}")
+    logger.info(f"[{log_id}] Regex filtering result: {result}")
     return result
 
 def vector_similarity_search(phrase: str, ids:list)-> list:
+    logger.info("Starting vector similarity search...")
     # 这里应该调用向量数据库进行相似度检索,返回相关标签id列表
     query = get_embeddings([phrase])[0]
     results = hybrid_search(ids, query, top_k=TOP_K)
     # return [{"id": r["_id"], "score": r["_score"], "tag_prompt": r["_source"]["tag_prompt"],"tag_name": r["_source"]["tag_name"],"tag_code": r["_source"]["tag_code"]} for r in results]
-    return [{"id": r["_id"], "tag_remark":r["_source"]["tag_remark"], "tag_prompt": r["_source"]["tag_prompt"],"tag_name": r["_source"]["tag_name"],"tag_code": r["_source"]["tag_code"],"tag_path": r["_source"]["tag_path"],"category_id": r["_source"]["category_id"]   } for r in results]
+    r = [{"id": r["_id"], "tag_remark":r["_source"]["tag_remark"], "tag_prompt": r["_source"]["tag_prompt"],"tag_name": r["_source"]["tag_name"],"tag_code": r["_source"]["tag_code"],"tag_path": r["_source"]["tag_path"],"category_id": r["_source"]["category_id"]   } for r in results]
+    logger.info(f"{phrase} Vector search result: {r}")
+    return r
 
 def init_tag_log(request: TaggingRequest):
     id = uuid.uuid4().hex
@@ -88,34 +93,32 @@ def init_tag_log(request: TaggingRequest):
 
 def update_tag_log(id:str, result:str):
     dao.execute(
-            """UPDATE aitag_tag_log SET state = %s, result = %s WHERE id = %s""",
-            (1, result, id)
+            """UPDATE aitag_tag_log SET state = %s, result = %s, ai_result_endtime = %s WHERE id = %s""",
+            (1, result, datetime.now(), id)
         )
 
 async def run_ai_pipeline(log_id: str, tag_category_id: str, phrase: str):
     try:
-        # step1: 正则过滤
-        result = await execute_reg(log_id,tag_category_id,phrase)
-        logger.info(f"[{log_id}] Regex filtering result: {result}")
-
-        # step2: 向量检索
-        if result:
-            result = vector_similarity_search(phrase, result)
-            logger.info(f"[{log_id}] Vector search result: {result}")
-
-        # step3: LLM 打标
-        if result:
-            result = await reflect_check(phrase, result)
-            result =  [r.dict() for r in result.labels]
-            result = json.dumps(result, ensure_ascii=False)
-            logger.info(f"[{log_id}] LLM result: {result}")
-
-        # step4: 更新数据库
-        # 如果result是个空集合,插入None
-        update_tag_log(log_id, result if result else None)
+        async with background_semaphore:
+            # step1: 正则过滤
+            result = await execute_reg(log_id,tag_category_id,phrase)
+            # step2: 向量检索
+            if result:
+                result = vector_similarity_search(phrase, result)
+            # step3: LLM 打标
+            if result:
+                try:
+                    result = await reflect_check(phrase, result)
+                except Exception as e:
+                    logger.error(f"LLM reflection check failed: {e}")
+                    result = None
+            # step4: 更新数据库
+            # 如果result是个空集合,插入None
+            update_tag_log(log_id, result if result else None)
             
     except Exception as e:
         logger.error(f"[{log_id}] Pipeline failed: {e}")
+        update_tag_log(log_id, None)
 
 @router.post("/tagging")
 async def ai_tagging(request: TaggingRequest,background_tasks: BackgroundTasks):

+ 9 - 6
agent/src/agent/core/es.py

@@ -1,12 +1,12 @@
 from elasticsearch import Elasticsearch,helpers
-import logging
+from agent.logger import logger
 
 from agent.core.config import get_config_path
 config = get_config_path()
 TOP_K = config['app']['top_k']
 
 url = config['es']['url']
-DIMS = int(config['es']['vecotr_dims'])
+DIMS = int(config['embedding']['default_dims'])
 
 es = Elasticsearch(
     hosts=[url],
@@ -28,6 +28,7 @@ if not es.indices.exists(index=INDEX_NAME):
                 "tag_remark": {"type": "text"},
                 "tag_reg": {"type": "text"},
                 "tag_prompt": {"type": "text"},
+                "category_id": {"type": "text"},
                 "tag_vector": {
                     "type": "dense_vector",
                     "dims": DIMS,
@@ -86,15 +87,17 @@ def bulk_upsert(documents):
         for doc in documents
     ]
     try:
-        helpers.bulk(es, actions,chunk_size=500, request_timeout=60)
+        success, errors = helpers.bulk(es, actions,chunk_size=500, request_timeout=60,raise_on_error=False)
+        logger.info(f"Bulk upsert completed: {success} successful, {len(errors)} errors")
+        logger.error(f"Bulk upsert errors: {errors}")
     except Exception as e:
-        print(f"Bulk upsert failed: {e}")
+        logger.error(f"Bulk upsert failed: {e}")
         for error in e.errors:
-            print(error)
+            logger.error(f"Error: {error}")
         raise
 
 def hybrid_search( target_doc_ids, query_vector, top_k=2):
-    logging.info(f"Performing hybrid search with target_doc_ids: {target_doc_ids}, query_vector: {len(query_vector)}, top_k: {top_k}")
+    logger.info(f"Performing hybrid search with target_doc_ids: {target_doc_ids}, query_vector: {len(query_vector)}, top_k: {top_k}")
     response = es.search(
         index=INDEX_NAME,
         knn={

+ 15 - 4
agent/src/agent/core/vector.py

@@ -1,6 +1,6 @@
 import requests
 import json
-
+from agent.logger import logger
 from agent.core.config import get_config_path
 config = get_config_path()
 TOP_K = config['app']['top_k']
@@ -8,7 +8,11 @@ TOP_K = config['app']['top_k']
 model = config['embedding']['model']
 base_url = config['embedding']['base_url']
 api_key = config['embedding']['api_key']
-
+dims = config["embedding"]["default_dims"]
+enable_config = config["embedding"]["enable_config"]
+# 将enable_config转换为布尔值
+if isinstance(enable_config, str):
+    enable_config = enable_config.lower() in ['true', '1', 'yes']
 def get_embeddings(texts):
     url = base_url
     headers = {
@@ -17,8 +21,14 @@ def get_embeddings(texts):
     }
     data = {
         "input": texts,
-        "model": model
+        "model": model,
     }
+    if enable_config:
+        data = {
+            "input": texts,
+            "model": model,
+            "dimensions": dims
+        }
     response = requests.post(url, headers=headers, data=json.dumps(data))
     embeddings = response.json()['data']
     return [e['embedding'] for e in embeddings]
@@ -27,4 +37,5 @@ if __name__ == "__main__":
     texts = ["Hello, world!", "How are you?"]
     embeddings = get_embeddings(texts)
     # print(embeddings)
-    print(f"Embedding for '{texts[0]}': {embeddings[0][:5]}...")  # 打印前5个维度的值
+    logger.info(f"Embedding for '{texts[0]}': len({embeddings[0]})") 
+    logger.info(f"Embedding for '{texts[0]}': {embeddings[0][:5]}...")  # 打印前5个维度的值

+ 2 - 0
agent/src/agent/logger.py

@@ -22,6 +22,8 @@ logger.add(
     level="DEBUG",
     format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
     enqueue=True,            # 异步安全(多进程/线程安全)
+    backtrace=True,
+    diagnose=True
 )
 
 # 导出 logger 实例供其他模块使用

+ 0 - 1
agent/src/agent/main.py

@@ -3,7 +3,6 @@ from agent.api_outter import router as outter_router
 from agent.api_inner import router as inner_router  
 from agent.logger import logger
 import uvicorn
-print('hello')
 logger.info("ai-tagging starting!")
 
 api_router = APIRouter()

+ 0 - 18
agent/src/agent/service_vector.py

@@ -1,18 +0,0 @@
-
-import time
-
-# EMBEDDING_MODEL = 'models/bge-small-zh-v1.5'
-# transformer = SentenceTransformer(EMBEDDING_MODEL)
-
-# def compute_embedding(text: str):
-#     embedding = transformer.encode(text)
-#     return embedding
-
-# if __name__ == "__main__":
-#     text = "你好,世界!"
-#     l1 = time.time()
-#     embedding = compute_embedding([text,"hello,world"])
-#     l2 = time.time()
-#     print(f"Time taken: {l2 - l1} seconds")
-#     print(embedding)
-#     print(f"Embedding dimension: {len(embedding)}")

+ 1 - 1
agent/tests/test_sync_category.py

@@ -1,6 +1,6 @@
 import requests
 
-res = requests.post("http://10.192.72.13:9876/api/aitag/admin/v1/synchronize_category", json={
+res = requests.post("http://localhost:9876/api/aitag/admin/v1/synchronize_category", json={
     "category_id": "f47ac10b-58cc-4372-a567-0e02b2c3d479"
 })
 print(res.text)

+ 1 - 2
agent/tests/test_tagging.py

@@ -8,9 +8,8 @@ res = requests.post("http://10.192.72.13:9876/api/aitag/v1/tagging", json={
     # "timestamp": 1234567890,
     # "sign": "test_sign",
     "business_attr": "test_attr",
-    "phrase": "职业:水产养殖人员 投向:海水养殖 用途:养殖鲍鱼"
+    "phrase": "5职业:水产养殖人员 投向:海水养殖 用途:养殖鲍鱼"
 })
 
-
 logging.info(res.text)