stability.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. import asyncio
  2. import functools
  3. from typing import Callable, Any, List, Coroutine
  4. from utils.logger import log
  5. def timeout(seconds: int):
  6. """
  7. 超时装饰器,防止函数执行时间过长
  8. Args:
  9. seconds: 超时时间(秒)
  10. Returns:
  11. Callable: 装饰后的函数
  12. """
  13. def decorator(func: Callable) -> Callable:
  14. @functools.wraps(func)
  15. async def wrapper(*args, **kwargs) -> Any:
  16. try:
  17. return await asyncio.wait_for(func(*args, **kwargs), timeout=seconds)
  18. except asyncio.TimeoutError:
  19. log.error(f"函数 {func.__name__} 执行超时,已超过 {seconds} 秒")
  20. raise Exception(f"执行超时,已超过 {seconds} 秒")
  21. return wrapper
  22. return decorator
  23. class AsyncDispatcher:
  24. """异步调度器,支持并发处理多个任务"""
  25. def __init__(self, max_concurrency: int = 5):
  26. """
  27. 初始化异步调度器
  28. Args:
  29. max_concurrency: 最大并发数
  30. """
  31. self.max_concurrency = max_concurrency
  32. async def run(self, tasks: List[Coroutine]) -> List[Any]:
  33. """
  34. 并发执行多个任务
  35. Args:
  36. tasks: 任务列表
  37. Returns:
  38. List[Any]: 任务执行结果列表
  39. """
  40. log.info(f"开始并发执行 {len(tasks)} 个任务,最大并发数: {self.max_concurrency}")
  41. # 创建信号量控制并发
  42. semaphore = asyncio.Semaphore(self.max_concurrency)
  43. async def bounded_task(task: Coroutine) -> Any:
  44. async with semaphore:
  45. try:
  46. return await task
  47. except Exception as e:
  48. log.error(f"任务执行失败: {str(e)}")
  49. return None
  50. # 并发执行任务
  51. results = await asyncio.gather(
  52. *[bounded_task(task) for task in tasks],
  53. return_exceptions=True
  54. )
  55. # 处理异常结果
  56. processed_results = []
  57. for i, result in enumerate(results):
  58. if isinstance(result, Exception):
  59. log.error(f"第 {i+1} 个任务执行失败: {str(result)}")
  60. processed_results.append(None)
  61. else:
  62. processed_results.append(result)
  63. log.info(f"并发任务执行完成,成功 {sum(1 for r in processed_results if r is not None)} 个")
  64. return processed_results