_resources.py 763 B

12345678910111213141516171819202122232425262728293031
  1. from __future__ import annotations
  2. from abc import ABCMeta, abstractmethod
  3. from types import TracebackType
  4. from typing import TypeVar
  5. T = TypeVar("T")
  6. class AsyncResource(metaclass=ABCMeta):
  7. """
  8. Abstract base class for all closeable asynchronous resources.
  9. Works as an asynchronous context manager which returns the instance itself on enter, and calls
  10. :meth:`aclose` on exit.
  11. """
  12. async def __aenter__(self: T) -> T:
  13. return self
  14. async def __aexit__(
  15. self,
  16. exc_type: type[BaseException] | None,
  17. exc_val: BaseException | None,
  18. exc_tb: TracebackType | None,
  19. ) -> None:
  20. await self.aclose()
  21. @abstractmethod
  22. async def aclose(self) -> None:
  23. """Close the resource."""