run_async.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. import asyncio
  2. import threading
  3. from queue import Queue
  4. from typing import Any, AsyncIterable, Coroutine, Iterable, TypeVar
  5. T = TypeVar("T")
  6. def run_async(coroutine: Coroutine[Any, Any, T]) -> T:
  7. if not asyncio.iscoroutine(coroutine):
  8. raise ValueError("a coroutine was expected, got {!r}".format(coroutine))
  9. try:
  10. loop = asyncio.get_running_loop()
  11. except RuntimeError:
  12. loop = None
  13. if loop is not None:
  14. return loop.run_until_complete(coroutine)
  15. else:
  16. return asyncio.run(coroutine)
  17. def iter_async(iterable: AsyncIterable[T]) -> Iterable[T]:
  18. if not isinstance(iterable, AsyncIterable):
  19. raise ValueError("an async iterable was expected, got {!r}".format(iterable))
  20. queue = Queue()
  21. async def async_helper():
  22. try:
  23. async for chunk in iterable:
  24. queue.put(chunk)
  25. queue.put(None)
  26. except Exception as e:
  27. queue.put(e)
  28. def helper():
  29. run_async(async_helper())
  30. thread = threading.Thread(target=helper, daemon=True)
  31. thread.start()
  32. while True:
  33. chunk = queue.get()
  34. if chunk is None:
  35. break
  36. if isinstance(chunk, Exception):
  37. raise chunk
  38. yield chunk
  39. thread.join()