| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- import asyncio
- import functools
- from typing import Callable, Any, List, Coroutine
- from utils.logger import log
- def timeout(seconds: int):
- """
- 超时装饰器,防止函数执行时间过长
-
- Args:
- seconds: 超时时间(秒)
-
- Returns:
- Callable: 装饰后的函数
- """
- def decorator(func: Callable) -> Callable:
- @functools.wraps(func)
- async def wrapper(*args, **kwargs) -> Any:
- try:
- return await asyncio.wait_for(func(*args, **kwargs), timeout=seconds)
- except asyncio.TimeoutError:
- log.error(f"函数 {func.__name__} 执行超时,已超过 {seconds} 秒")
- raise Exception(f"执行超时,已超过 {seconds} 秒")
- return wrapper
- return decorator
- class AsyncDispatcher:
- """异步调度器,支持并发处理多个任务"""
-
- def __init__(self, max_concurrency: int = 5):
- """
- 初始化异步调度器
-
- Args:
- max_concurrency: 最大并发数
- """
- self.max_concurrency = max_concurrency
-
- async def run(self, tasks: List[Coroutine]) -> List[Any]:
- """
- 并发执行多个任务
-
- Args:
- tasks: 任务列表
-
- Returns:
- List[Any]: 任务执行结果列表
- """
- log.info(f"开始并发执行 {len(tasks)} 个任务,最大并发数: {self.max_concurrency}")
-
- # 创建信号量控制并发
- semaphore = asyncio.Semaphore(self.max_concurrency)
-
- async def bounded_task(task: Coroutine) -> Any:
- async with semaphore:
- try:
- return await task
- except Exception as e:
- log.error(f"任务执行失败: {str(e)}")
- return None
-
- # 并发执行任务
- results = await asyncio.gather(
- *[bounded_task(task) for task in tasks],
- return_exceptions=True
- )
-
- # 处理异常结果
- processed_results = []
- for i, result in enumerate(results):
- if isinstance(result, Exception):
- log.error(f"第 {i+1} 个任务执行失败: {str(result)}")
- processed_results.append(None)
- else:
- processed_results.append(result)
-
- log.info(f"并发任务执行完成,成功 {sum(1 for r in processed_results if r is not None)} 个")
- return processed_results
|