main.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. from fastapi import FastAPI, File, UploadFile, Form
  2. import os
  3. from llmops.agents.data_classify_agent import DataClassifyAgent
  4. from llmops.agents.data_manager import DataManager
  5. import csv
  6. from fastapi.responses import StreamingResponse
  7. import io
  8. import urllib.parse
  9. from llmops.complete_agent_flow_rule import run_complete_agent_flow
  10. from llmops.config import DEEPSEEK_API_KEY
  11. # 初始化FastAPI应用
  12. app = FastAPI(
  13. title="智能体服务<Agents API>",
  14. description="提供数据分类打标服务",
  15. version="1.0"
  16. )
  17. @app.get("/")
  18. def root():
  19. return {"message": "Hello, Agents for you!"}
  20. # 设置文件保存路径
  21. UPLOAD_FOLDER = "uploads"
  22. os.makedirs(UPLOAD_FOLDER, exist_ok=True)
  23. data_classify_agent = DataClassifyAgent()
  24. @app.post("/api/dataset/classify")
  25. async def dataset_classify(file: UploadFile = File(...), industry: str = Form(...)):
  26. """
  27. 上传原始数据文件(格式CSV)进行分类打标
  28. Args:
  29. file: 用户上传的CSV数据文件
  30. industry: 行业
  31. """
  32. # 获取文件的存储路径
  33. file_path = os.path.join(UPLOAD_FOLDER, file.filename)
  34. full_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), file_path)
  35. print(f"上传文件绝对路径:{full_path}")
  36. try:
  37. # 将文件内容写入到上传目录中
  38. with open(full_path, "wb") as f:
  39. f.write(file.file.read())
  40. # 读取文件内容
  41. data_set = DataManager.load_data_from_csv_file(full_path)
  42. # 对数据进行分类打标
  43. data_set_classified = data_classify_agent.invoke_data_classify(industry=industry, data_set=data_set, file_name=file.filename)
  44. # 格式转换 [{}] -> csv 列表形式
  45. # 生成器函数,用于逐行生成 CSV 内容
  46. def generate_csv_rows():
  47. # 创建内存中的字符串缓冲区
  48. output = io.StringIO()
  49. writer = csv.writer(output)
  50. output.write('\ufeff') # BOM头,确保Excel正确显示中文
  51. # 写入表头
  52. writer.writerow(data_classify_agent.fields_order)
  53. yield output.getvalue()
  54. output.seek(0)
  55. output.truncate(0)
  56. # 逐行写入数据
  57. for row in data_set_classified:
  58. writer.writerow([row["txId"], row["txDate"], row["txTime"], row["txAmount"], row["txBalance"],
  59. row["txDirection"], row["txSummary"], row["txCounterparty"], row["createdAt"],
  60. row["businessType"]])
  61. yield output.getvalue()
  62. output.seek(0)
  63. output.truncate(0)
  64. # 输出文件名(打好标文件)
  65. output_file = str(file.filename).split(".")[0] + "_label.csv"
  66. # 对文件名进行URL编码,处理中文等非ASCII字符
  67. encoded_filename = urllib.parse.quote(output_file)
  68. # 使用 StreamingResponse 返回 CSV 文件
  69. return StreamingResponse(
  70. generate_csv_rows(),
  71. media_type="text/csv;charset=utf-8-sig",
  72. headers={
  73. "Content-Disposition": f"attachment; filename=\"{encoded_filename}\"; filename*=UTF-8''{encoded_filename}"
  74. }
  75. )
  76. except Exception as e:
  77. print(f"上传文件分类打标异常 {e}")
  78. import traceback
  79. traceback.print_exc()
  80. @app.post("/api/report/gen")
  81. async def gen_report(file: UploadFile = File(...), question: str = Form(...), industry: str = Form(...)):
  82. """
  83. 上传原始数据文件(格式CSV),输入问题和行业,生成对应的分析报告
  84. Args:
  85. file: 用户上传的CSV数据文件
  86. question: 用户问题
  87. industry: 行业
  88. Returns:
  89. 报告的JSON结构
  90. """
  91. # 获取文件的存储路径
  92. file_path = os.path.join(UPLOAD_FOLDER, file.filename)
  93. full_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), file_path)
  94. print(f"上传文件绝对路径:{full_path}")
  95. try:
  96. # 将文件内容写入到上传目录中
  97. with open(full_path, "wb") as f:
  98. f.write(file.file.read())
  99. # 读取文件内容,标准化文件
  100. data_set = DataManager.load_data_from_csv_file(full_path)
  101. # 执行测试
  102. result = await run_complete_agent_flow(
  103. question=question,
  104. industry=industry,
  105. data=data_set,
  106. file_name=file.filename,
  107. api_key=DEEPSEEK_API_KEY,
  108. session_id="direct-test"
  109. )
  110. print(result)
  111. return {
  112. "status": 0,
  113. "message": "success",
  114. "outline_draft": result["result"]["outline_draft"],
  115. "computed_metrics": result["result"]["computed_metrics"]
  116. }
  117. except Exception as e:
  118. print(f"生成流水分析报告异常: {e}")
  119. import traceback
  120. traceback.print_exc()
  121. return {
  122. "status": 1,
  123. "message": "error",
  124. "report": {}
  125. }
  126. if __name__ == "__main__":
  127. import uvicorn
  128. uvicorn.run("main:app", host="0.0.0.0", port=3699, reload=True)