import asyncio import functools from typing import Callable, Any, List, Coroutine from utils.logger import log def timeout(seconds: int): """ 超时装饰器,防止函数执行时间过长 Args: seconds: 超时时间(秒) Returns: Callable: 装饰后的函数 """ def decorator(func: Callable) -> Callable: @functools.wraps(func) async def wrapper(*args, **kwargs) -> Any: try: return await asyncio.wait_for(func(*args, **kwargs), timeout=seconds) except asyncio.TimeoutError: log.error(f"函数 {func.__name__} 执行超时,已超过 {seconds} 秒") raise Exception(f"执行超时,已超过 {seconds} 秒") return wrapper return decorator class AsyncDispatcher: """异步调度器,支持并发处理多个任务""" def __init__(self, max_concurrency: int = 5): """ 初始化异步调度器 Args: max_concurrency: 最大并发数 """ self.max_concurrency = max_concurrency async def run(self, tasks: List[Coroutine]) -> List[Any]: """ 并发执行多个任务 Args: tasks: 任务列表 Returns: List[Any]: 任务执行结果列表 """ log.info(f"开始并发执行 {len(tasks)} 个任务,最大并发数: {self.max_concurrency}") # 创建信号量控制并发 semaphore = asyncio.Semaphore(self.max_concurrency) async def bounded_task(task: Coroutine) -> Any: async with semaphore: try: return await task except Exception as e: log.error(f"任务执行失败: {str(e)}") return None # 并发执行任务 results = await asyncio.gather( *[bounded_task(task) for task in tasks], return_exceptions=True ) # 处理异常结果 processed_results = [] for i, result in enumerate(results): if isinstance(result, Exception): log.error(f"第 {i+1} 个任务执行失败: {str(result)}") processed_results.append(None) else: processed_results.append(result) log.info(f"并发任务执行完成,成功 {sum(1 for r in processed_results if r is not None)} 个") return processed_results