|
|
@@ -0,0 +1,1098 @@
|
|
|
+
|
|
|
+from langgraph.prebuilt import create_react_agent
|
|
|
+from langchain_openai import ChatOpenAI
|
|
|
+from typing import Dict, List, Any, Optional
|
|
|
+import pandas as pd
|
|
|
+import json
|
|
|
+from datetime import datetime
|
|
|
+from pathlib import Path
|
|
|
+import numpy as np
|
|
|
+from llmops.agents.tools.balance_info_missing_recognizer import BalanceInfoMissingRecognizer
|
|
|
+from llmops.agents.tools.inactive_account_recognizer import InactiveAccountRecognizer
|
|
|
+from llmops.agents.tools.balance_recognizer import BalanceContinuityRecognizer
|
|
|
+from llmops.agents.tools.night_transaction_recognizer import NightTransactionRecognizer
|
|
|
+from llmops.agents.tools.high_frequency_transaction_recognizer import HighFrequencyTransactionRecognizer
|
|
|
+from llmops.agents.tools.large_amount_transaction_recognizer import LargeAmountTransactionRecognizer
|
|
|
+from llmops.agents.tools.occasional_high_integer_transaction_recognizer import OccasionalHighIntegerTransactionRecognizer
|
|
|
+from llmops.agents.tools.low_interest_rate_recognizer import LowInterestRateRecognizer
|
|
|
+from llmops.agents.tools.over_book_transaction_recognizer import OverBookTransactionRecognizer
|
|
|
+from llmops.agents.data_manager import DataManager
|
|
|
+from llmops.config import LLM_API_KEY, LLM_BASE_URL, LLM_MODEL_NAME, anomaly_recognizer_config
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+class AnomalyRecognitionAgent:
|
|
|
+ """异常识别智能体"""
|
|
|
+
|
|
|
    def __init__(self, csv_path: str, api_key: str, base_url: str = "https://api.deepseek.com",
                 model_name: str = "deepseek-chat", config: Optional[Dict] = None):
        """
        Initialize the anomaly-recognition agent.

        Args:
            csv_path: Path to the standardized transaction CSV to analyze.
            api_key: API key for the OpenAI-compatible LLM endpoint.
            base_url: Base URL of the LLM service (defaults to DeepSeek).
            model_name: Chat model name passed to ChatOpenAI.
            config: Optional recognizer configuration dict; defaults to {}.
        """
        self.csv_path = csv_path
        # Low temperature keeps tool-calling behavior close to deterministic.
        self.llm = ChatOpenAI(
            model=model_name,
            api_key=api_key,
            base_url=base_url,
            temperature=0.1
        )
        self.config = config or {}
        self.transaction_data = None   # set by load_transaction_data()
        self.data_summary = {}         # set by _generate_data_summary()
        self.recognizer_tools = []     # filled by _initialize_recognizers()
        self.agent = None              # set by _initialize_agent() on success
        self.recognition_results = {}  # filled by execute_full_recognition()

        # Build the recognizer tool list from config switches.
        self._initialize_recognizers()

        # Create the LangGraph ReAct agent over the tools (leaves
        # self.agent as None if construction fails).
        self._initialize_agent()
|
|
|
+
|
|
|
+ def _initialize_recognizers(self):
|
|
|
+ """初始化所有异常识别工具"""
|
|
|
+ # 余额信息缺失检查
|
|
|
+ if self.config.get('enable_balance_missing_check', True):
|
|
|
+ balance_missing_config = self.config.get('balance_missing_check', {})
|
|
|
+ self.recognizer_tools.append(BalanceInfoMissingRecognizer(
|
|
|
+ csv_path=self.csv_path,
|
|
|
+ config={'balance_missing_check': balance_missing_config}
|
|
|
+ ))
|
|
|
+ print(f"✅ 初始化余额信息缺失检查器(高优先级)")
|
|
|
+ # 长期无交易账户识别器
|
|
|
+ if self.config.get('enable_inactive_account_check', True):
|
|
|
+ inactive_account_config = self.config.get('inactive_account_check', {})
|
|
|
+ self.recognizer_tools.append(InactiveAccountRecognizer(
|
|
|
+ csv_path=self.csv_path,
|
|
|
+ config={'inactive_account_check': inactive_account_config}
|
|
|
+ ))
|
|
|
+ print(f"✅ 初始化长期无交易账户识别器(高优先级)")
|
|
|
+
|
|
|
+ # 余额连续性识别
|
|
|
+ if self.config.get('enable_balance_recognition', True):
|
|
|
+ self.recognizer_tools.append(BalanceContinuityRecognizer(csv_path=self.csv_path))
|
|
|
+ print(f"✅ 初始化余额连续性识别器")
|
|
|
+
|
|
|
+ # 夜间交易识别
|
|
|
+ if self.config.get('enable_night_recognition', True):
|
|
|
+ night_config = self.config.get('night_recognition', {})
|
|
|
+ self.recognizer_tools.append(NightTransactionRecognizer(
|
|
|
+ csv_path=self.csv_path,
|
|
|
+ config={'night_transaction': night_config}
|
|
|
+ ))
|
|
|
+ print(f"✅ 初始化夜间交易识别器")
|
|
|
+
|
|
|
+ # 高频交易识别
|
|
|
+ if self.config.get('enable_high_frequency_recognition', True):
|
|
|
+ high_freq_config = self.config.get('high_frequency_recognition', {})
|
|
|
+ self.recognizer_tools.append(HighFrequencyTransactionRecognizer(
|
|
|
+ csv_path=self.csv_path,
|
|
|
+ config={'high_frequency': high_freq_config}
|
|
|
+ ))
|
|
|
+ print(f"✅ 初始化高频交易识别器")
|
|
|
+
|
|
|
+ # 大额交易识别
|
|
|
+ if self.config.get('enable_large_amount_recognition', True):
|
|
|
+ large_amount_recognition_config = self.config.get('large_amount_recognition', {})
|
|
|
+ self.recognizer_tools.append(LargeAmountTransactionRecognizer(
|
|
|
+ csv_path=self.csv_path,
|
|
|
+ config={'large_amount_recognition': large_amount_recognition_config}
|
|
|
+ ))
|
|
|
+ print(f"✅ 初始化大额交易识别器")
|
|
|
+
|
|
|
+ # 偶发大额整数交易识别
|
|
|
+ if self.config.get('enable_occasional_high_integer_recognition', True):
|
|
|
+ integer_config = self.config.get('occasional_high_integer_transaction', {})
|
|
|
+ self.recognizer_tools.append(OccasionalHighIntegerTransactionRecognizer(
|
|
|
+ csv_path=self.csv_path,
|
|
|
+ config={'occasional_high_integer_transaction': integer_config}
|
|
|
+ ))
|
|
|
+ print(f"✅ 初始化偶发高额整数交易识别器")
|
|
|
+
|
|
|
+ # 结算交易识别
|
|
|
+ if self.config.get('enable_low_interest_rate_recognition', True):
|
|
|
+ interest_config = self.config.get('low_interest_rate_recognition', {})
|
|
|
+ self.recognizer_tools.append(LowInterestRateRecognizer(
|
|
|
+ csv_path=self.csv_path,
|
|
|
+ config={'interest_rate_check': interest_config}
|
|
|
+ ))
|
|
|
+ print(f"✅ 初始化低利率结息识别器(高优先级)")
|
|
|
+
|
|
|
+ # 疑似过账交易识别
|
|
|
+ if self.config.get('enable_over_book_transaction_recognition', True): # 使用 "over_book" 而不是 "overbook"
|
|
|
+ overbook_config = self.config.get('over_book_transaction_recognition', {}) # 保持一致
|
|
|
+ self.recognizer_tools.append(OverBookTransactionRecognizer(
|
|
|
+ csv_path=self.csv_path,
|
|
|
+ config={'over_book_transaction_recognition': overbook_config} # 保持一致
|
|
|
+ ))
|
|
|
+ print(f"✅ 初始化疑似过账流水交易识别器")
|
|
|
+
|
|
|
+ print(f"📋 共初始化 {len(self.recognizer_tools)} 个识别器")
|
|
|
+
|
|
|
+ def _initialize_agent(self):
|
|
|
+ """初始化智能体 - 优化版本"""
|
|
|
+ try:
|
|
|
+ # 确保每个工具都有清晰的描述
|
|
|
+ for tool in self.recognizer_tools:
|
|
|
+ # 如果描述太短,添加说明
|
|
|
+ if len(tool.description) < 30:
|
|
|
+ tool.description = f"分析银行流水数据中的{tool.display_name}"
|
|
|
+
|
|
|
+ # 创建Agent
|
|
|
+ self.agent = create_react_agent(
|
|
|
+ model=self.llm,
|
|
|
+ tools=self.recognizer_tools
|
|
|
+ )
|
|
|
+
|
|
|
+ print("🤖 异常识别智能体初始化成功")
|
|
|
+ print(f"🛠️ 加载了 {len(self.recognizer_tools)} 个工具:")
|
|
|
+
|
|
|
+ for i, tool in enumerate(self.recognizer_tools, 1):
|
|
|
+ print(f" {i}. {tool.display_name} ({tool.name})")
|
|
|
+ print(f" 描述: {tool.description}")
|
|
|
+
|
|
|
+ # 测试工具是否可用
|
|
|
+ print("🧪 测试工具可用性...")
|
|
|
+ for tool in self.recognizer_tools:
|
|
|
+ try:
|
|
|
+ # 测试工具的基本属性
|
|
|
+ has_run = hasattr(tool, '_run')
|
|
|
+ has_name = hasattr(tool, 'name')
|
|
|
+ has_desc = hasattr(tool, 'description')
|
|
|
+ print(
|
|
|
+ f" ✓ {tool.name}: 接口完整" if all([has_run, has_name, has_desc]) else f" ⚠️ {tool.name}: 接口不完整")
|
|
|
+ except:
|
|
|
+ print(f" ❌ {tool.name}: 测试失败")
|
|
|
+ except Exception as e:
|
|
|
+ print(f"智能体初始化失败: {e}")
|
|
|
+ import traceback
|
|
|
+ traceback.print_exc()
|
|
|
+ self.agent = None
|
|
|
+
|
|
|
    def load_transaction_data(self) -> pd.DataFrame:
        """Load the standardized transaction CSV into self.transaction_data.

        Also refreshes self.data_summary from the freshly loaded frame.

        Returns:
            The loaded DataFrame.

        Raises:
            Exception: re-raised after logging when loading fails.
        """
        try:
            print(f"📥 正在加载交易数据: {self.csv_path}")
            self.transaction_data = DataManager.load_from_standardized_csv(self.csv_path)
            self.data_summary = self._generate_data_summary()
            return self.transaction_data
        except Exception as e:
            print(f"数据加载失败: {e}")
            raise
|
|
|
+
|
|
|
+ def _generate_data_summary(self) -> Dict[str, Any]:
|
|
|
+ """生成数据摘要"""
|
|
|
+ if self.transaction_data is None or len(self.transaction_data) == 0:
|
|
|
+ return {}
|
|
|
+
|
|
|
+ df = self.transaction_data
|
|
|
+
|
|
|
+ summary = {
|
|
|
+ 'transaction_count': len(df),
|
|
|
+ 'date_range': {
|
|
|
+ 'start': df['txDate'].min() if 'txDate' in df.columns else '未知',
|
|
|
+ 'end': df['txDate'].max() if 'txDate' in df.columns else '未知'
|
|
|
+ },
|
|
|
+ 'total_amount': float(df['txAmount'].sum()) if 'txAmount' in df.columns else 0,
|
|
|
+ 'income_amount': float(df[df['txDirection'] == '收入']['txAmount'].sum())
|
|
|
+ if 'txAmount' in df.columns and 'txDirection' in df.columns else 0,
|
|
|
+ 'expense_amount': float(df[df['txDirection'] == '支出']['txAmount'].sum())
|
|
|
+ if 'txAmount' in df.columns and 'txDirection' in df.columns else 0,
|
|
|
+ 'average_amount': float(df['txAmount'].mean()) if 'txAmount' in df.columns else 0,
|
|
|
+ 'max_amount': float(df['txAmount'].max()) if 'txAmount' in df.columns else 0,
|
|
|
+ 'min_amount': float(df['txAmount'].min()) if 'txAmount' in df.columns else 0,
|
|
|
+ 'unique_days': df['datetime'].dt.date.nunique() if 'datetime' in df.columns else 0,
|
|
|
+ 'direction_distribution': df['txDirection'].value_counts().to_dict()
|
|
|
+ if 'txDirection' in df.columns else {}
|
|
|
+ }
|
|
|
+
|
|
|
+ return summary
|
|
|
+
|
|
|
    def execute_full_recognition(self) -> Dict[str, Any]:
        """Run the full anomaly-recognition pipeline over the loaded data.

        Resets self.recognition_results, runs agent-based recognition
        (direct recognition is currently disabled — see the commented
        block below), then consolidates anomalies and builds the summary.

        Returns:
            self.recognition_results with keys 'agent_results',
            'direct_results', 'all_anomalies' and 'summary'.

        Raises:
            ValueError: if load_transaction_data() was not called first.
        """
        if self.transaction_data is None:
            raise ValueError("请先加载交易数据")

        print("🔍 开始执行银行流水异常识别...")

        # Reset results from any previous run.
        self.recognition_results = {
            'agent_results': None,
            'direct_results': None,
            'all_anomalies': [],
            'summary': {}
        }

        # Direct (non-agent) recognition is intentionally disabled for now.
        # try:
        #     direct_results = self._execute_direct_recognition()
        #     self.recognition_results['direct_results'] = direct_results
        # except Exception as e:
        #     print(f"⚠️ 直接异常识别失败: {e}")

        # Agent-based recognition (only if the agent initialized).
        if self.agent:
            try:
                agent_results = self._execute_agent_recognition()
                self.recognition_results['agent_results'] = agent_results
            except Exception as e:
                print(f"⚠️ Agent异常识别失败: {e}")
        else:
            print("⚠️ Agent未初始化,跳过Agent识别")

        # Merge and deduplicate anomalies from all sources.
        self._consolidate_anomalies()

        # Build the aggregate run summary.
        self._generate_recognition_summary()

        print("✅ 异常识别完成")

        return self.recognition_results
|
|
|
+
|
|
|
    def _execute_direct_recognition(self) -> Dict[str, Any]:
        """Run every recognizer synchronously, without the LLM agent.

        NOTE(review): currently unreferenced — its only call site in
        execute_full_recognition is commented out.

        Returns:
            Dict with per-recognizer 'results', flattened 'all_anomalies'
            (each tagged with 'recognition_type'), and recognizer counts.
        """
        print("🚀 开始直接异常识别...")

        results = {}
        all_anomalies = []

        for recognizer in self.recognizer_tools:
            try:
                print(f"  🔍 执行 {recognizer.display_name}...")
                # No arguments: each recognizer falls back to the csv_path
                # it was constructed with.
                result = recognizer._run()
                results[recognizer.display_name] = result

                # Recognizers may return a plain string or a dict.
                if isinstance(result, str):
                    print(f"    📝 {recognizer.display_name}: {result[:100]}...")
                elif isinstance(result, dict):
                    if 'identified_anomalies' in result:
                        for anomaly in result['identified_anomalies']:
                            # Tag each anomaly with the recognizer that found it.
                            anomaly['recognition_type'] = recognizer.display_name
                            all_anomalies.append(anomaly)

                    anomaly_count = result.get('identified_count', 0)
                    status = result.get('recognition_status', '未知')
                    print(f"    ✅ {recognizer.display_name}: 识别完成,发现 {anomaly_count} 条异常 ({status})")

            except Exception as e:
                # One failing recognizer must not abort the others.
                error_msg = f"{recognizer.display_name} 识别失败: {e}"
                print(f"    ❌ {error_msg}")

        return {
            'results': results,
            'all_anomalies': all_anomalies,
            'total_recognizers': len(self.recognizer_tools),
            'completed_recognizers': len(results)
        }
|
|
|
+
|
|
|
    def _execute_agent_recognition(self) -> Dict[str, Any]:
        """Drive the LangGraph agent through the recognizer tools.

        Builds the system/user prompts, invokes the agent, extracts
        anomalies from the returned tool messages, and falls back to
        calling every tool directly if the agent never issued a tool call.

        Returns:
            Dict with 'final_output', 'tool_calls', 'tool_results',
            'all_anomalies' (plus 'backup_used' when the fallback ran), or
            an error-shaped dict when the invocation itself failed.
        """
        print("🤖 开始智能体异常识别...")

        try:
            # NOTE(review): dead code — execute_full_recognition resets
            # recognition_results['agent_results'] to None before calling
            # this method, so this loop never runs and
            # balance_missing_alert is assigned but never used.
            agent_results = self.recognition_results.get('agent_results', {})
            if agent_results and 'all_anomalies' in agent_results:
                for anomaly in agent_results['all_anomalies']:
                    if anomaly.get('check_type') == 'balance_info_missing':
                        balance_missing_alert = f"""
    ⚠️ **重要提示**:
    检测到数据完整性异常:银行流水缺少余额信息字段!
    这会影响以下分析的准确性:
    1. 余额连续性检查(可能无法执行)
    2. 资金存量波动分析
    3. 交易与余额的匹配验证

    请在分析时考虑这一限制条件。
    """
                        break

            # Build the user prompt from the live tool list.
            tools_info = self._prepare_tools_info_for_prompt()

            prompt = self._generate_universal_prompt(tools_info)

            # Seed the conversation: system rules + task prompt.
            initial_state = {
                "messages": [
                    {
                        "role": "system",
                        "content": self._get_universal_system_prompt()
                    },
                    {
                        "role": "user",
                        "content": prompt
                    }
                ]
            }

            print("🔄 正在执行Agent...")
            print("📋 提示词已发送:")
            print("-" * 50)
            print(prompt[:500] + "..." if len(prompt) > 500 else prompt)
            print("-" * 50)

            # Run the agent to completion.
            result = self.agent.invoke(initial_state)

            print(f"✅ Agent执行完成,共 {len(result['messages'])} 条消息")

            # Walk the message stream for tool calls and anomalies.
            agent_output = self._process_agent_result(result)

            # Fallback: if the LLM never called a tool, run them all directly.
            if len(agent_output['tool_calls']) == 0:
                print("⚠️ Agent没有调用工具,启动备用方案...")
                backup_result = self._execute_backup_recognition()
                agent_output['all_anomalies'].extend(backup_result['all_anomalies'])
                agent_output['backup_used'] = True

            print(f"🤖 最终统计: {len(agent_output['tool_calls'])} 次工具调用, {len(agent_output['all_anomalies'])} 条异常")

            return agent_output

        except Exception as e:
            error_msg = f"Agent识别执行失败: {str(e)}"
            print(f"❌ {error_msg}")
            import traceback
            traceback.print_exc()

            # Error-shaped result keeps downstream consolidation working.
            return {
                'final_output': f"Agent识别失败: {error_msg}",
                'tool_calls': [],
                'tool_results': [],
                'all_anomalies': [],
                'error': str(e)
            }
|
|
|
+
|
|
|
    def _execute_backup_recognition(self) -> Dict[str, Any]:
        """Fallback path: invoke every recognizer tool directly.

        Used when the agent finished without issuing any tool calls.

        Returns:
            Dict with standardized 'all_anomalies', raw 'tool_results' and
            the 'tool_names' that completed successfully.
        """
        print("🔄 启动备用识别方案:直接调用所有工具...")

        backup_results = {
            'all_anomalies': [],
            'tool_results': [],
            'tool_names': []
        }

        for recognizer in self.recognizer_tools:
            print(f"  🔧 调用 {recognizer.display_name}...")
            try:
                result = recognizer._run(csv_path=self.csv_path)
                backup_results['tool_results'].append(result)
                backup_results['tool_names'].append(recognizer.name)

                # Pull anomalies out of dict-shaped results only.
                if isinstance(result, dict):
                    if 'identified_anomalies' in result:
                        anomalies = result['identified_anomalies']
                        for anomaly in anomalies:
                            # Normalize field names / tag the source tool.
                            standardized = self._standardize_anomaly_record(anomaly, result)
                            backup_results['all_anomalies'].append(standardized)
                        print(f"    发现 {len(anomalies)} 条异常")
                    elif 'identified_count' in result:
                        print(f"    工具返回 {result['identified_count']} 条异常(但未找到详细记录)")
                else:
                    print(f"    工具返回非字典结果: {type(result)}")

            except Exception as e:
                # One broken tool must not kill the fallback run.
                print(f"    ❌ 工具调用失败: {e}")

        print(f"✅ 备用方案完成: 调用了 {len(backup_results['tool_names'])} 个工具, 发现 {len(backup_results['all_anomalies'])} 条异常")

        return backup_results
|
|
|
+
|
|
|
    def _process_agent_result(self, result: Any) -> Dict[str, Any]:
        """Walk the agent's message stream and collect tool calls/results.

        Args:
            result: The dict returned by agent.invoke(), whose 'messages'
                list holds LangChain message objects (with .type, .content
                and optionally .tool_calls).

        Returns:
            Dict with 'final_output' (content of the last AI message),
            'tool_calls', raw 'tool_results', extracted 'all_anomalies'
            and a per-message 'messages_analysis' trace.
        """
        agent_output = {
            'final_output': '',
            'tool_calls': [],
            'tool_results': [],
            'all_anomalies': [],
            'messages_analysis': []
        }

        # Analyze the message stream in order.
        for i, message in enumerate(result["messages"]):
            msg_info = {
                'index': i + 1,
                'type': message.type,
                'has_tool_calls': False,
                'tool_call_count': 0
            }

            # AI messages may carry one or more tool calls.
            if hasattr(message, 'tool_calls') and message.tool_calls:
                tool_calls = message.tool_calls
                agent_output['tool_calls'].extend(tool_calls)
                msg_info['has_tool_calls'] = True
                msg_info['tool_call_count'] = len(tool_calls)

                print(f"🛠️ 消息{i + 1}: 发现 {len(tool_calls)} 个工具调用")
                for tc in tool_calls:
                    print(f"  工具: {tc.get('name', '未知')}")
                    print(f"  参数: {tc.get('args', {})}")

            # Tool messages carry the tool's return value.
            if message.type == 'tool':
                content = message.content
                agent_output['tool_results'].append(content)

                # ============ debug dump of the tool payload ============
                print(f"\n🔍 工具返回内容类型: {type(content)}")
                if isinstance(content, dict):
                    print(f"📋 工具返回字典键: {list(content.keys())}")
                    if 'identified_count' in content:
                        print(f"📊 工具报告的异常数量: {content['identified_count']}")
                    if 'identified_anomalies' in content:
                        print(f"📦 工具返回的异常列表长度: {len(content['identified_anomalies'])}")
                        # Show the first few anomalies for inspection.
                        for j, anomaly in enumerate(content['identified_anomalies'][:3], 1):
                            print(
                                f"  异常{j}: ID={anomaly.get('txId')}, 原因={anomaly.get('recognition_reason', '')[:50]}...")
                elif isinstance(content, str):
                    print(f"📝 工具返回字符串长度: {len(content)}")
                    print(f"  前200字符: {content[:200]}...")
                # ============ end of debug dump ============

                # Extract standardized anomaly records from the payload.
                anomalies = self._extract_anomalies_from_content(content)
                if anomalies:
                    print(f"✅ 从工具结果提取到 {len(anomalies)} 条异常")
                    agent_output['all_anomalies'].extend(anomalies)
                else:
                    print(f"⚠️ 从工具结果提取到 0 条异常")

                msg_info['content_type'] = type(content).__name__
                msg_info['content_length'] = len(str(content))

            # The last AI message is the agent's final answer.
            if message.type == 'ai' and i == len(result["messages"]) - 1:
                agent_output['final_output'] = getattr(message, 'content', '')
                msg_info['is_final'] = True
                msg_info['output_length'] = len(agent_output['final_output'])

                print(f"🤖 最终AI输出 ({msg_info['output_length']} 字符):")
                print("-" * 40)
                print(agent_output['final_output'][:300] + "..." if len(agent_output['final_output']) > 300 else
                      agent_output['final_output'])
                print("-" * 40)

            agent_output['messages_analysis'].append(msg_info)

        return agent_output
|
|
|
+
|
|
|
    def _extract_anomalies_from_content(self, content: Any) -> List[Dict[str, Any]]:
        """Extract standardized anomaly records from a tool's raw result.

        Accepts either a dict or a (possibly JSON-encoded) string; strings
        are decoded via self._parse_string_content.
        NOTE(review): _parse_string_content is not defined in this file
        chunk — confirm it exists on the class.

        Args:
            content: Raw tool message content (dict, str, or anything else).

        Returns:
            List of standardized anomaly dicts; empty on failure or when
            nothing recognizable is found.
        """
        anomalies = []

        try:
            print(f"🔍 提取异常,输入类型: {type(content)}")

            # ============ Step 1: normalize the input to a dict ============
            processed_content = None

            if isinstance(content, dict):
                print(f"  ✅ 已经是字典,直接处理")
                processed_content = content

            elif isinstance(content, str):
                print(f"  📝 处理字符串内容,长度: {len(content)}")
                print(f"  预览: {content[:200]}...")

                # Try several parse strategies, bounded by max_depth.
                processed_content = self._parse_string_content(content, depth=0, max_depth=3)

                if processed_content is None:
                    print(f"  ❌ 无法解析字符串内容,返回空列表")
                    return anomalies

            else:
                print(f"  ⚠️ 未知内容类型: {type(content)},返回空列表")
                return anomalies

            # ============ Step 2: pull anomaly records out of the dict ============
            if isinstance(processed_content, dict):
                print(f"  📋 处理字典,键: {list(processed_content.keys())}")

                # Candidate field names, in priority order.
                anomaly_fields = [
                    'identified_anomalies',
                    'all_anomalies',
                    'anomalies',
                    'abnormal_records',
                    'identified_abnormalities'
                ]

                found_anomalies = False

                for field in anomaly_fields:
                    if field in processed_content:
                        anomaly_list = processed_content[field]
                        print(f"  ✅ 找到字段 '{field}',类型: {type(anomaly_list)}")

                        if isinstance(anomaly_list, list):
                            print(f"  列表长度: {len(anomaly_list)}")

                            for i, anomaly in enumerate(anomaly_list):
                                if isinstance(anomaly, dict):
                                    standardized = self._standardize_anomaly_record(anomaly, processed_content)
                                    anomalies.append(standardized)
                                    print(f"    ✓ 标准化异常 {i + 1}: ID={anomaly.get('txId', '未知')}")
                                else:
                                    print(f"    ⚠️ 异常记录 {i + 1} 不是字典: {type(anomaly)}")
                                    # Objects exposing __dict__ are coerced to dicts.
                                    if hasattr(anomaly, '__dict__'):
                                        anomaly_dict = anomaly.__dict__
                                        standardized = self._standardize_anomaly_record(anomaly_dict, processed_content)
                                        anomalies.append(standardized)

                            found_anomalies = True
                            print(f"  📊 从字段 '{field}' 提取到 {len(anomaly_list)} 条异常")
                            break  # first matching field wins
                        else:
                            print(f"  ⚠️ 字段 '{field}' 不是列表类型: {type(anomaly_list)}")

                # Diagnostics only: report other list-of-dict fields that
                # might hold anomalies (extraction deliberately disabled).
                if not found_anomalies:
                    print(f"  🔎 没有找到标准异常字段,搜索其他列表字段...")
                    for key, value in processed_content.items():
                        if isinstance(value, list) and len(value) > 0:
                            print(f"  发现列表字段 '{key}',长度: {len(value)},元素类型: {type(value[0])}")

                            # List elements that are dicts may hold anomalies.
                            if len(value) > 0 and isinstance(value[0], dict):
                                print(f"  ⚠️ 列表 '{key}' 包含字典,可能包含异常数据")
                                # Optional extraction, kept disabled:
                                # for item in value:
                                #     if isinstance(item, dict) and 'txId' in item:
                                #         standardized = self._standardize_anomaly_record(item, processed_content)
                                #         anomalies.append(standardized)

            print(f"  🎯 最终提取到 {len(anomalies)} 条异常")

        except Exception as e:
            print(f"❌ 提取异常数据时出错: {e}")
            import traceback
            traceback.print_exc()

        return anomalies
|
|
|
+
|
|
|
+
|
|
|
+ def _standardize_anomaly_record(self, anomaly: Dict, source_content: Any) -> Dict[str, Any]:
|
|
|
+ """标准化异常记录"""
|
|
|
+ if not isinstance(anomaly, dict):
|
|
|
+ anomaly = {'raw_data': str(anomaly)}
|
|
|
+
|
|
|
+ # 提取识别器名称
|
|
|
+ recognizer_name = ''
|
|
|
+ if isinstance(source_content, dict):
|
|
|
+ recognizer_name = source_content.get('recognition_type', '未知')
|
|
|
+ # 从execution_info中提取更多信息
|
|
|
+ if 'execution_info' in source_content:
|
|
|
+ exec_info = source_content['execution_info']
|
|
|
+ recognizer_name = exec_info.get('display_name', recognizer_name)
|
|
|
+
|
|
|
+ # 确保有必要的字段
|
|
|
+ standardized = {
|
|
|
+ 'recognition_source': 'agent',
|
|
|
+ 'recognition_type': recognizer_name,
|
|
|
+ 'txId': str(anomaly.get('txId', anomaly.get('tx_id', ''))),
|
|
|
+ 'txDate': str(anomaly.get('txDate', anomaly.get('tx_date', ''))),
|
|
|
+ 'txTime': str(anomaly.get('txTime', anomaly.get('tx_time', ''))),
|
|
|
+ 'txAmount': float(anomaly.get('txAmount', anomaly.get('tx_amount', 0))),
|
|
|
+ 'txDirection': str(anomaly.get('txDirection', anomaly.get('tx_direction', ''))),
|
|
|
+ 'recognition_reason': str(anomaly.get('recognition_reason', anomaly.get('reason', ''))),
|
|
|
+ 'severity': str(anomaly.get('severity', 'medium')),
|
|
|
+ 'status': str(anomaly.get('status', '待核查')),
|
|
|
+ 'raw_anomaly': anomaly # 保留原始数据
|
|
|
+ }
|
|
|
+
|
|
|
+ # 添加datetime信息(如果存在)
|
|
|
+ if 'datetime' in anomaly and pd.notna(anomaly['datetime']):
|
|
|
+ standardized['datetime'] = str(anomaly['datetime'])
|
|
|
+
|
|
|
+ return standardized
|
|
|
+
|
|
|
+
|
|
|
    def _generate_universal_prompt(self, tools_info: List[Dict[str, str]]) -> str:
        """Build the user prompt listing every tool and how to call it.

        Args:
            tools_info: Descriptors from _prepare_tools_info_for_prompt().

        Returns:
            The task prompt (Chinese) instructing the agent to call each
            tool with a csv_path argument and merge the results.
        """

        # Numbered tool list for the prompt body.
        tools_list = "\n".join([
            f"{info['index']}. {info['display_name']} ({info['name']}): {info['short_desc']}"
            for info in tools_info
        ])

        # Suggested execution order (display names joined by arrows).
        suggested_order = " → ".join([info['display_name'] for info in tools_info])

        # Concrete example call for the first tool.
        first_tool = tools_info[0]
        example_call = f"""{first_tool['name']}(csv_path="{self.csv_path}")"""

        return f"""
    # 银行交易流水异常识别分析任务

    ## 数据文件:
    {self.csv_path}

    ## 可用分析工具(共{len(tools_info)}个):
    {tools_list}

    ## 执行要求:
    1. **必须使用上述工具**进行分析,不能跳过工具调用
    2. 建议按顺序执行:{suggested_order}
    3. 每个工具都需要传入csv_path参数,值为:{self.csv_path}
    4. 整合所有工具的结果生成综合报告

    ## 工具调用示例:
    要调用第一个工具,使用:{example_call}

    ## 请开始执行:
    请首先调用 {first_tool['display_name']} 工具开始分析。
    """
|
|
|
+
|
|
|
+ def _prepare_tools_info_for_prompt(self) -> List[Dict[str, str]]:
|
|
|
+ """为提示词准备工具信息"""
|
|
|
+ tools_info = []
|
|
|
+
|
|
|
+ for i, tool in enumerate(self.recognizer_tools, 1):
|
|
|
+ tool_info = {
|
|
|
+ 'index': i,
|
|
|
+ 'name': tool.name,
|
|
|
+ 'display_name': getattr(tool, 'display_name', tool.name),
|
|
|
+ 'description': tool.description,
|
|
|
+ 'short_desc': tool.description[:100] + "..." if len(tool.description) > 100 else tool.description
|
|
|
+ }
|
|
|
+ tools_info.append(tool_info)
|
|
|
+
|
|
|
+ return tools_info
|
|
|
+
|
|
|
    def _get_universal_system_prompt(self) -> str:
        """Return the fixed system prompt (Chinese) that forces tool usage."""
        return """
    你是一个银行流水异常识别专家AI助手。

    ## 核心规则:
    1. 你必须使用提供的工具来分析数据
    2. 不能跳过工具直接回答问题
    3. 每次分析至少要调用一个工具
    4. 等待工具返回结果后再继续分析
    5. 基于工具结果生成报告

    ## 工具使用说明:
    - 每个工具都需要csv_path参数
    - 使用用户提供的文件路径
    - 可以按顺序调用多个工具
    - 记录每个工具的结果

    ## 输出要求:
    - 总结每个工具的分析结果
    - 列出所有发现的异常
    - 提供综合风险评估
    - 给出后续核查建议
    """
|
|
|
+
|
|
|
+ def _debug_agent_execution(self, agent_output: Dict[str, Any]):
|
|
|
+ """调试Agent执行过程"""
|
|
|
+ print("\n🔍 Agent执行调试信息:")
|
|
|
+ print(f" 工具调用次数: {len(agent_output['tool_calls'])}")
|
|
|
+
|
|
|
+ for i, tool_call in enumerate(agent_output['tool_calls']):
|
|
|
+ print(f" 工具调用 {i + 1}:")
|
|
|
+ print(f" 名称: {tool_call.get('name', '未知')}")
|
|
|
+ print(f" 参数: {tool_call.get('args', {})}")
|
|
|
+
|
|
|
+ print(f" 工具结果数量: {len(agent_output['tool_results'])}")
|
|
|
+ for i, result in enumerate(agent_output['tool_results']):
|
|
|
+ print(f" 工具结果 {i + 1}: {str(result)[:150]}...")
|
|
|
+
|
|
|
+ def _consolidate_anomalies(self):
|
|
|
+ """合并所有识别的异常"""
|
|
|
+ all_anomalies = []
|
|
|
+
|
|
|
+ # # 从直接识别结果中收集异常
|
|
|
+ # direct_results = self.recognition_results.get('direct_results', {})
|
|
|
+ # if 'all_anomalies' in direct_results:
|
|
|
+ # all_anomalies.extend(direct_results['all_anomalies'])
|
|
|
+
|
|
|
+ # 从Agent结果中收集异常
|
|
|
+ agent_results = self.recognition_results.get('agent_results')
|
|
|
+ if agent_results and 'all_anomalies' in agent_results:
|
|
|
+ all_anomalies.extend(agent_results['all_anomalies'])
|
|
|
+
|
|
|
+ # 去重
|
|
|
+ unique_anomalies = []
|
|
|
+ seen = set()
|
|
|
+
|
|
|
+ for anomaly in all_anomalies:
|
|
|
+ key = f"{anomaly.get('txId', '')}_{anomaly.get('recognition_type', '')}"
|
|
|
+ if key not in seen:
|
|
|
+ seen.add(key)
|
|
|
+ unique_anomalies.append(anomaly)
|
|
|
+
|
|
|
+ self.recognition_results['all_anomalies'] = unique_anomalies
|
|
|
+ print(f"📊 合并后共有 {len(unique_anomalies)} 条异常")
|
|
|
+
|
|
|
+
|
|
|
+ def _generate_recognition_summary(self):
|
|
|
+ """生成识别摘要"""
|
|
|
+ all_anomalies = self.recognition_results.get('all_anomalies', [])
|
|
|
+
|
|
|
+ summary = {
|
|
|
+ 'total_transactions': self.data_summary.get('transaction_count', 0),
|
|
|
+ 'total_identified_anomalies': len(all_anomalies),
|
|
|
+ 'recognition_ratio': f"{(len(all_anomalies) / self.data_summary.get('transaction_count', 1) * 100):.2f}%"
|
|
|
+ if self.data_summary.get('transaction_count', 0) > 0 else "0%",
|
|
|
+ 'recognition_completion_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
|
|
+ 'enabled_recognizers': len(self.recognizer_tools),
|
|
|
+ 'anomaly_distribution': {}
|
|
|
+ }
|
|
|
+
|
|
|
+ # 按类型统计异常
|
|
|
+ for anomaly in all_anomalies:
|
|
|
+ anomaly_type = anomaly.get('recognition_type', '未知')
|
|
|
+ summary['anomaly_distribution'][anomaly_type] = summary['anomaly_distribution'].get(anomaly_type, 0) + 1
|
|
|
+
|
|
|
+ # 按严重程度统计
|
|
|
+ severity_counts = {'high': 0, 'medium': 0, 'low': 0}
|
|
|
+ for anomaly in all_anomalies:
|
|
|
+ severity = anomaly.get('severity', 'medium')
|
|
|
+ severity_counts[severity] = severity_counts.get(severity, 0) + 1
|
|
|
+
|
|
|
+ summary['severity_distribution'] = severity_counts
|
|
|
+
|
|
|
+ self.recognition_results['summary'] = summary
|
|
|
+
|
|
|
+ # 以下方法保持不变...
|
|
|
+ def generate_recognition_report(self, output_dir: str = "outputs/reports") -> str:
|
|
|
+ """生成异常识别报告"""
|
|
|
+ try:
|
|
|
+ # 创建输出目录
|
|
|
+ Path(output_dir).mkdir(parents=True, exist_ok=True)
|
|
|
+
|
|
|
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
+ report_id = f"anomaly_report_{timestamp}"
|
|
|
+
|
|
|
+ print(f"\n📊 生成异常识别报告...")
|
|
|
+
|
|
|
+ # 1. 保存识别的异常记录(CSV格式)
|
|
|
+ anomalies_path = self._save_anomalies_csv(output_dir, report_id)
|
|
|
+
|
|
|
+ # 2. 生成详细识别报告(JSON格式)
|
|
|
+ report_path = self._save_detailed_report(output_dir, report_id)
|
|
|
+
|
|
|
+ # 3. 生成识别摘要(文本格式)
|
|
|
+ summary_path = self._save_summary_txt(output_dir, report_id)
|
|
|
+
|
|
|
+
|
|
|
+ print(f"✅ 报告生成完成")
|
|
|
+ print(f" 异常记录: {anomalies_path}")
|
|
|
+ print(f" 详细报告: {report_path}")
|
|
|
+ print(f" 识别摘要: {summary_path}")
|
|
|
+
|
|
|
+ return report_path
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ raise
|
|
|
+
|
|
|
+ def _save_anomalies_csv(self, output_dir: str, report_id: str) -> str:
|
|
|
+ """保存异常记录为CSV文件"""
|
|
|
+ anomalies_df = pd.DataFrame(self.recognition_results['all_anomalies'])
|
|
|
+
|
|
|
+ # 定义列顺序
|
|
|
+ column_order = [
|
|
|
+ 'recognition_type', 'txId', 'txDate', 'txTime', 'txAmount',
|
|
|
+ 'txDirection', 'recognition_reason', 'severity', 'status'
|
|
|
+ ]
|
|
|
+
|
|
|
+ # 只保留存在的列
|
|
|
+ existing_columns = [col for col in column_order if col in anomalies_df.columns]
|
|
|
+ other_columns = [col for col in anomalies_df.columns if col not in column_order]
|
|
|
+
|
|
|
+ # 重新排序列
|
|
|
+ anomalies_df = anomalies_df[existing_columns + other_columns]
|
|
|
+
|
|
|
+ # 保存CSV
|
|
|
+ anomalies_path = Path(output_dir) / f"{report_id}_anomalies.csv"
|
|
|
+ anomalies_df.to_csv(anomalies_path, index=False, encoding='utf-8-sig')
|
|
|
+
|
|
|
+ return str(anomalies_path)
|
|
|
+
|
|
|
+ # 其他方法保持不变...
|
|
|
+ # anomaly_recognizer_agent.py
|
|
|
+ # 在 AnomalyRecognitionAgent 类的末尾添加以下方法(在现有方法之后)
|
|
|
+
|
|
|
+ def get_recognition_summary(self) -> Dict[str, Any]:
|
|
|
+ """获取识别摘要"""
|
|
|
+ return self.recognition_results.get('summary', {})
|
|
|
+
|
|
|
+ def get_recognizer_stats(self) -> List[Dict[str, Any]]:
|
|
|
+ """获取识别器统计信息"""
|
|
|
+ stats = []
|
|
|
+ for recognizer in self.recognizer_tools:
|
|
|
+ stats.append(recognizer.get_summary())
|
|
|
+ return stats
|
|
|
+
|
|
|
    def _save_detailed_report(self, output_dir: str, report_id: str) -> str:
        """Write the full recognition report as '<report_id>.json'.

        Serializes run metadata, the data summary, recognizer configuration
        and result summaries; pandas/numpy scalars are coerced through a
        custom JSON default handler.

        Returns:
            Path of the written JSON file as a string.
        """
        report_data = {
            'report_metadata': {
                'report_id': report_id,
                'generation_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                'data_source': self.csv_path,
                # "混合模式" when the agent ran, "直接模式" otherwise.
                'recognition_method': '混合模式' if self.recognition_results.get('agent_results') else '直接模式'
            },
            'data_summary': self.data_summary,
            'recognition_configuration': {
                'enabled_recognizers': [
                    {
                        'name': tool.name,
                        'display_name': tool.display_name,
                        'description': tool.description[:100] + '...' if len(
                            tool.description) > 100 else tool.description
                    }
                    for tool in self.recognizer_tools
                ],
                'total_recognizers': len(self.recognizer_tools),
                'config': self.config
            },
            'recognition_results': {
                'summary': self.recognition_results.get('summary', {}),
                'direct_results_summary': {},
                'agent_results_summary': {}
            }
        }

        # Direct-recognition summary (disabled together with the direct pass).
        # direct_results = self.recognition_results.get('direct_results', {})
        # if 'results' in direct_results:
        #     for recognizer_name, result in direct_results['results'].items():
        #         report_data['recognition_results']['direct_results_summary'][recognizer_name] = {
        #             'identified_count': result.get('identified_count', 0),
        #             'recognition_status': result.get('recognition_status', '未知'),
        #             'execution_time': result.get('execution_info', {}).get('execution_time', '')
        #         }

        # Agent result summary (only populated when the agent actually ran).
        agent_results = self.recognition_results.get('agent_results')
        if agent_results:
            report_data['recognition_results']['agent_results_summary'] = {
                'iterations': agent_results.get('iterations', 0),
                'tool_calls_count': len(agent_results.get('tool_calls', [])),
                'final_output_preview': agent_results.get('final_output', '')[:500] + '...'
                if agent_results.get('final_output') else '无'
            }

        # Write the JSON report.
        report_path = Path(output_dir) / f"{report_id}.json"

        def json_serializer(obj):
            # Fallback encoder for objects json can't handle natively.
            if isinstance(obj, (pd.Timestamp, datetime)):
                return obj.strftime("%Y-%m-%d %H:%M:%S")
            elif isinstance(obj, np.integer):
                return int(obj)
            elif isinstance(obj, np.floating):
                return float(obj)
            elif isinstance(obj, np.ndarray):
                return obj.tolist()
            # NOTE(review): pd.isna raises on list-like inputs — assumed
            # unreachable here since lists are JSON-native; confirm.
            elif pd.isna(obj):
                return None
            elif hasattr(obj, '__dict__'):
                return str(obj)
            return str(obj)

        with open(report_path, 'w', encoding='utf-8') as f:
            json.dump(report_data, f, ensure_ascii=False, indent=2, default=json_serializer)

        return str(report_path)
|
|
|
+
|
|
|
+ def _save_summary_txt(self, output_dir: str, report_id: str) -> str:
|
|
|
+ """保存识别摘要(文本格式)"""
|
|
|
+ summary = self.recognition_results.get('summary', {})
|
|
|
+ anomaly_distribution = summary.get('anomaly_distribution', {})
|
|
|
+ severity_distribution = summary.get('severity_distribution', {})
|
|
|
+
|
|
|
+ summary_path = Path(output_dir) / f"{report_id}_summary.txt"
|
|
|
+
|
|
|
+ with open(summary_path, 'w', encoding='utf-8') as f:
|
|
|
+ f.write("=" * 70 + "\n")
|
|
|
+ f.write("银行流水异常识别报告摘要\n")
|
|
|
+ f.write("=" * 70 + "\n\n")
|
|
|
+
|
|
|
+ # 报告信息
|
|
|
+ f.write("📅 报告信息:\n")
|
|
|
+ f.write(f" 报告ID: {report_id}\n")
|
|
|
+ f.write(f" 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
|
|
+ f.write(f" 数据源: {self.csv_path}\n\n")
|
|
|
+
|
|
|
+ # 数据概览
|
|
|
+ f.write("📈 数据概览:\n")
|
|
|
+ f.write(f" 总交易笔数: {summary.get('total_transactions', 0):,}\n")
|
|
|
+ f.write(
|
|
|
+ f" 时间范围: {self.data_summary.get('date_range', {}).get('start', '未知')} 至 {self.data_summary.get('date_range', {}).get('end', '未知')}\n")
|
|
|
+ f.write(f" 总交易金额: {self.data_summary.get('total_amount', 0):,.2f}元\n")
|
|
|
+ f.write(f" 平均交易金额: {self.data_summary.get('average_amount', 0):,.2f}元\n\n")
|
|
|
+
|
|
|
+ # 识别结果
|
|
|
+ f.write("🔍 异常识别结果:\n")
|
|
|
+ f.write(f" 启用的识别器: {summary.get('enabled_recognizers', 0)} 个\n")
|
|
|
+ f.write(f" 识别出的异常: {summary.get('total_identified_anomalies', 0)} 条\n")
|
|
|
+ f.write(f" 异常识别率: {summary.get('recognition_ratio', '0%')}\n\n")
|
|
|
+
|
|
|
+ # 异常类型分布
|
|
|
+ if anomaly_distribution:
|
|
|
+ f.write("📊 异常类型分布:\n")
|
|
|
+ for anomaly_type, count in anomaly_distribution.items():
|
|
|
+ f.write(f" - {anomaly_type}: {count} 条\n")
|
|
|
+ f.write("\n")
|
|
|
+
|
|
|
+ # 严重程度分布
|
|
|
+ if severity_distribution:
|
|
|
+ f.write("⚠️ 严重程度分布:\n")
|
|
|
+ for severity, count in severity_distribution.items():
|
|
|
+ f.write(f" - {severity.upper()}: {count} 条\n")
|
|
|
+ f.write("\n")
|
|
|
+
|
|
|
+ f.write("\n" + "=" * 70 + "\n")
|
|
|
+ f.write("报告生成完成\n")
|
|
|
+ f.write("=" * 70 + "\n")
|
|
|
+
|
|
|
+ return str(summary_path)
|
|
|
+
|
|
|
+ def _parse_string_content(self, content: str, depth: int = 0, max_depth: int = 3) -> Optional[Dict]:
|
|
|
+ """解析字符串内容为字典 - 支持多种格式,带递归深度控制"""
|
|
|
+
|
|
|
+ # 递归深度保护
|
|
|
+ if depth >= max_depth:
|
|
|
+ print(f" ⚠️ 达到最大递归深度 {max_depth},停止解析")
|
|
|
+ return None
|
|
|
+
|
|
|
+ if not content or not isinstance(content, str):
|
|
|
+ return None
|
|
|
+
|
|
|
+ print(f" [{depth}] 解析字符串,长度: {len(content)}")
|
|
|
+
|
|
|
+ # 尝试1: JSON解析(标准格式,双引号)
|
|
|
+ try:
|
|
|
+ import json
|
|
|
+ parsed = json.loads(content)
|
|
|
+ if isinstance(parsed, dict):
|
|
|
+ print(f" [{depth}] ✅ JSON解析成功")
|
|
|
+ return parsed
|
|
|
+ else:
|
|
|
+ print(f" [{depth}] ⚠️ JSON解析成功但不是字典: {type(parsed)}")
|
|
|
+ # 如果是列表或其他类型,包装成字典
|
|
|
+ return {
|
|
|
+ 'parsed_content': parsed,
|
|
|
+ 'original_type': type(parsed).__name__,
|
|
|
+ 'parse_method': 'json'
|
|
|
+ }
|
|
|
+ except json.JSONDecodeError as e:
|
|
|
+ print(f" [{depth}] ⚠️ JSON解析失败: {e}")
|
|
|
+
|
|
|
+ # 尝试2: Python字典字符串表示(单引号)
|
|
|
+ # 先清理字符串,移除可能的额外空白
|
|
|
+ cleaned_content = content.strip()
|
|
|
+ if cleaned_content.startswith('{') and cleaned_content.endswith('}'):
|
|
|
+ try:
|
|
|
+ import ast
|
|
|
+ parsed = ast.literal_eval(cleaned_content) # 安全解析Python表达式
|
|
|
+ if isinstance(parsed, dict):
|
|
|
+ print(f" [{depth}] ✅ ast解析成功(Python字典字符串)")
|
|
|
+ return parsed
|
|
|
+ else:
|
|
|
+ print(f" [{depth}] ⚠️ ast解析成功但不是字典: {type(parsed)}")
|
|
|
+ return {
|
|
|
+ 'parsed_content': parsed,
|
|
|
+ 'original_type': type(parsed).__name__,
|
|
|
+ 'parse_method': 'ast'
|
|
|
+ }
|
|
|
+ except (SyntaxError, ValueError, TypeError) as e:
|
|
|
+ print(f" [{depth}] ⚠️ ast解析失败: {e}")
|
|
|
+
|
|
|
+ # 尝试3: 包含字典的复杂字符串(如日志输出)
|
|
|
+ # 查找第一个{和最后一个},尝试提取字典部分
|
|
|
+ start_idx = content.find('{')
|
|
|
+ end_idx = content.rfind('}')
|
|
|
+
|
|
|
+ if start_idx >= 0 and end_idx > start_idx:
|
|
|
+ dict_str = content[start_idx:end_idx + 1]
|
|
|
+
|
|
|
+ # 避免提取的内容和原内容相同(会导致无限递归)
|
|
|
+ if dict_str == content:
|
|
|
+ print(f" [{depth}] ⚠️ 提取的子字符串与原字符串相同,跳过递归")
|
|
|
+ return None
|
|
|
+
|
|
|
+ print(f" [{depth}] 尝试提取子字符串,长度: {len(dict_str)}")
|
|
|
+ print(f" [{depth}] 子字符串前100字符: {dict_str[:100]}...")
|
|
|
+
|
|
|
+ # 递归尝试解析提取的部分,增加深度计数
|
|
|
+ result = self._parse_string_content(dict_str, depth + 1, max_depth)
|
|
|
+ if result:
|
|
|
+ return result
|
|
|
+
|
|
|
+ # 尝试4: 可能是eval安全的简单表示
|
|
|
+ try:
|
|
|
+ # 最后尝试:直接eval(仅用于调试,生产环境慎用)
|
|
|
+ # 这里用更安全的方式
|
|
|
+ import ast
|
|
|
+ parsed = ast.literal_eval(content)
|
|
|
+ print(f" [{depth}] ⚠️ 直接解析成功: {type(parsed)}")
|
|
|
+ return {
|
|
|
+ 'raw_content': content,
|
|
|
+ 'parsed_content': parsed,
|
|
|
+ 'original_type': type(parsed).__name__,
|
|
|
+ 'parse_method': 'direct'
|
|
|
+ }
|
|
|
+ except Exception as e:
|
|
|
+ print(f" [{depth}] ⚠️ 直接解析失败: {e}")
|
|
|
+
|
|
|
+ print(f" [{depth}] ❌ 所有解析方式都失败")
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
# Script entry point: run the full anomaly-recognition pipeline on a sample CSV.
if __name__ == '__main__':
    import os

    # Disable LangChain / LangSmith telemetry for local runs.
    os.environ["LANGCHAIN_TRACING_V2"] = "false"
    os.environ["LANGCHAIN_API_KEY"] = ""
    # Also disable LangGraph tracing.
    os.environ["LANGSMITH_TRACING"] = "false"

    data_file = "11111_data_standard_20260113_112906.csv"
    script_dir = os.path.dirname(os.path.abspath(__file__))
    csv_file = os.path.join(script_dir, "..", "..", "data_files", data_file)
    print(f"csv文件:{csv_file}, 是否存在: {os.path.exists(csv_file)}")
    agent = AnomalyRecognitionAgent(
        csv_path=csv_file,
        api_key=LLM_API_KEY,
        base_url=LLM_BASE_URL,
        model_name=LLM_MODEL_NAME,
        config=anomaly_recognizer_config,
    )
    banner = "=" * 60
    print("\n" + banner)
    print("开始运行异常识别流程")
    print(banner)

    try:
        # Step 1: load the raw transaction records.
        print("\n📥 步骤1: 加载交易数据...")
        records = agent.load_transaction_data()
        print(f" 成功加载 {len(records)} 条交易记录")

        # Step 2: run every enabled anomaly recognizer.
        print("\n🔍 步骤2: 执行异常识别...")
        agent.execute_full_recognition()

        # Step 3: write the report files.
        print("\n📊 步骤3: 生成识别报告...")
        agent.generate_recognition_report()

    except Exception as e:
        print(f"\n❌ 执行过程中发生错误: {e}")
        import traceback
        traceback.print_exc()
|