# _eventloop.py (11 KB)
  1. from __future__ import annotations
  2. import math
  3. import sys
  4. from abc import ABCMeta, abstractmethod
  5. from collections.abc import AsyncIterator, Awaitable, Callable, Sequence
  6. from contextlib import AbstractContextManager
  7. from os import PathLike
  8. from signal import Signals
  9. from socket import AddressFamily, SocketKind, socket
  10. from typing import (
  11. IO,
  12. TYPE_CHECKING,
  13. Any,
  14. TypeVar,
  15. Union,
  16. overload,
  17. )
  18. if sys.version_info >= (3, 11):
  19. from typing import TypeVarTuple, Unpack
  20. else:
  21. from typing_extensions import TypeVarTuple, Unpack
  22. if sys.version_info >= (3, 10):
  23. from typing import TypeAlias
  24. else:
  25. from typing_extensions import TypeAlias
  26. if TYPE_CHECKING:
  27. from _typeshed import FileDescriptorLike
  28. from .._core._synchronization import CapacityLimiter, Event, Lock, Semaphore
  29. from .._core._tasks import CancelScope
  30. from .._core._testing import TaskInfo
  31. from ..from_thread import BlockingPortal
  32. from ._sockets import (
  33. ConnectedUDPSocket,
  34. ConnectedUNIXDatagramSocket,
  35. IPSockAddrType,
  36. SocketListener,
  37. SocketStream,
  38. UDPSocket,
  39. UNIXDatagramSocket,
  40. UNIXSocketStream,
  41. )
  42. from ._subprocesses import Process
  43. from ._tasks import TaskGroup
  44. from ._testing import TestRunner
  45. T_Retval = TypeVar("T_Retval")
  46. PosArgsT = TypeVarTuple("PosArgsT")
  47. StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"]
  48. class AsyncBackend(metaclass=ABCMeta):
  49. @classmethod
  50. @abstractmethod
  51. def run(
  52. cls,
  53. func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
  54. args: tuple[Unpack[PosArgsT]],
  55. kwargs: dict[str, Any],
  56. options: dict[str, Any],
  57. ) -> T_Retval:
  58. """
  59. Run the given coroutine function in an asynchronous event loop.
  60. The current thread must not be already running an event loop.
  61. :param func: a coroutine function
  62. :param args: positional arguments to ``func``
  63. :param kwargs: positional arguments to ``func``
  64. :param options: keyword arguments to call the backend ``run()`` implementation
  65. with
  66. :return: the return value of the coroutine function
  67. """
  68. @classmethod
  69. @abstractmethod
  70. def current_token(cls) -> object:
  71. """
  72. Return an object that allows other threads to run code inside the event loop.
  73. :return: a token object, specific to the event loop running in the current
  74. thread
  75. """
  76. @classmethod
  77. @abstractmethod
  78. def current_time(cls) -> float:
  79. """
  80. Return the current value of the event loop's internal clock.
  81. :return: the clock value (seconds)
  82. """
  83. @classmethod
  84. @abstractmethod
  85. def cancelled_exception_class(cls) -> type[BaseException]:
  86. """Return the exception class that is raised in a task if it's cancelled."""
  87. @classmethod
  88. @abstractmethod
  89. async def checkpoint(cls) -> None:
  90. """
  91. Check if the task has been cancelled, and allow rescheduling of other tasks.
  92. This is effectively the same as running :meth:`checkpoint_if_cancelled` and then
  93. :meth:`cancel_shielded_checkpoint`.
  94. """
  95. @classmethod
  96. async def checkpoint_if_cancelled(cls) -> None:
  97. """
  98. Check if the current task group has been cancelled.
  99. This will check if the task has been cancelled, but will not allow other tasks
  100. to be scheduled if not.
  101. """
  102. if cls.current_effective_deadline() == -math.inf:
  103. await cls.checkpoint()
  104. @classmethod
  105. async def cancel_shielded_checkpoint(cls) -> None:
  106. """
  107. Allow the rescheduling of other tasks.
  108. This will give other tasks the opportunity to run, but without checking if the
  109. current task group has been cancelled, unlike with :meth:`checkpoint`.
  110. """
  111. with cls.create_cancel_scope(shield=True):
  112. await cls.sleep(0)
  113. @classmethod
  114. @abstractmethod
  115. async def sleep(cls, delay: float) -> None:
  116. """
  117. Pause the current task for the specified duration.
  118. :param delay: the duration, in seconds
  119. """
  120. @classmethod
  121. @abstractmethod
  122. def create_cancel_scope(
  123. cls, *, deadline: float = math.inf, shield: bool = False
  124. ) -> CancelScope:
  125. pass
  126. @classmethod
  127. @abstractmethod
  128. def current_effective_deadline(cls) -> float:
  129. """
  130. Return the nearest deadline among all the cancel scopes effective for the
  131. current task.
  132. :return:
  133. - a clock value from the event loop's internal clock
  134. - ``inf`` if there is no deadline in effect
  135. - ``-inf`` if the current scope has been cancelled
  136. :rtype: float
  137. """
  138. @classmethod
  139. @abstractmethod
  140. def create_task_group(cls) -> TaskGroup:
  141. pass
  142. @classmethod
  143. @abstractmethod
  144. def create_event(cls) -> Event:
  145. pass
  146. @classmethod
  147. @abstractmethod
  148. def create_lock(cls, *, fast_acquire: bool) -> Lock:
  149. pass
  150. @classmethod
  151. @abstractmethod
  152. def create_semaphore(
  153. cls,
  154. initial_value: int,
  155. *,
  156. max_value: int | None = None,
  157. fast_acquire: bool = False,
  158. ) -> Semaphore:
  159. pass
  160. @classmethod
  161. @abstractmethod
  162. def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
  163. pass
  164. @classmethod
  165. @abstractmethod
  166. async def run_sync_in_worker_thread(
  167. cls,
  168. func: Callable[[Unpack[PosArgsT]], T_Retval],
  169. args: tuple[Unpack[PosArgsT]],
  170. abandon_on_cancel: bool = False,
  171. limiter: CapacityLimiter | None = None,
  172. ) -> T_Retval:
  173. pass
  174. @classmethod
  175. @abstractmethod
  176. def check_cancelled(cls) -> None:
  177. pass
  178. @classmethod
  179. @abstractmethod
  180. def run_async_from_thread(
  181. cls,
  182. func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
  183. args: tuple[Unpack[PosArgsT]],
  184. token: object,
  185. ) -> T_Retval:
  186. pass
  187. @classmethod
  188. @abstractmethod
  189. def run_sync_from_thread(
  190. cls,
  191. func: Callable[[Unpack[PosArgsT]], T_Retval],
  192. args: tuple[Unpack[PosArgsT]],
  193. token: object,
  194. ) -> T_Retval:
  195. pass
  196. @classmethod
  197. @abstractmethod
  198. def create_blocking_portal(cls) -> BlockingPortal:
  199. pass
  200. @classmethod
  201. @abstractmethod
  202. async def open_process(
  203. cls,
  204. command: StrOrBytesPath | Sequence[StrOrBytesPath],
  205. *,
  206. stdin: int | IO[Any] | None,
  207. stdout: int | IO[Any] | None,
  208. stderr: int | IO[Any] | None,
  209. **kwargs: Any,
  210. ) -> Process:
  211. pass
  212. @classmethod
  213. @abstractmethod
  214. def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None:
  215. pass
  216. @classmethod
  217. @abstractmethod
  218. async def connect_tcp(
  219. cls, host: str, port: int, local_address: IPSockAddrType | None = None
  220. ) -> SocketStream:
  221. pass
  222. @classmethod
  223. @abstractmethod
  224. async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream:
  225. pass
  226. @classmethod
  227. @abstractmethod
  228. def create_tcp_listener(cls, sock: socket) -> SocketListener:
  229. pass
  230. @classmethod
  231. @abstractmethod
  232. def create_unix_listener(cls, sock: socket) -> SocketListener:
  233. pass
  234. @classmethod
  235. @abstractmethod
  236. async def create_udp_socket(
  237. cls,
  238. family: AddressFamily,
  239. local_address: IPSockAddrType | None,
  240. remote_address: IPSockAddrType | None,
  241. reuse_port: bool,
  242. ) -> UDPSocket | ConnectedUDPSocket:
  243. pass
  244. @classmethod
  245. @overload
  246. async def create_unix_datagram_socket(
  247. cls, raw_socket: socket, remote_path: None
  248. ) -> UNIXDatagramSocket: ...
  249. @classmethod
  250. @overload
  251. async def create_unix_datagram_socket(
  252. cls, raw_socket: socket, remote_path: str | bytes
  253. ) -> ConnectedUNIXDatagramSocket: ...
  254. @classmethod
  255. @abstractmethod
  256. async def create_unix_datagram_socket(
  257. cls, raw_socket: socket, remote_path: str | bytes | None
  258. ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket:
  259. pass
  260. @classmethod
  261. @abstractmethod
  262. async def getaddrinfo(
  263. cls,
  264. host: bytes | str | None,
  265. port: str | int | None,
  266. *,
  267. family: int | AddressFamily = 0,
  268. type: int | SocketKind = 0,
  269. proto: int = 0,
  270. flags: int = 0,
  271. ) -> Sequence[
  272. tuple[
  273. AddressFamily,
  274. SocketKind,
  275. int,
  276. str,
  277. tuple[str, int] | tuple[str, int, int, int] | tuple[int, bytes],
  278. ]
  279. ]:
  280. pass
  281. @classmethod
  282. @abstractmethod
  283. async def getnameinfo(
  284. cls, sockaddr: IPSockAddrType, flags: int = 0
  285. ) -> tuple[str, str]:
  286. pass
  287. @classmethod
  288. @abstractmethod
  289. async def wait_readable(cls, obj: FileDescriptorLike) -> None:
  290. pass
  291. @classmethod
  292. @abstractmethod
  293. async def wait_writable(cls, obj: FileDescriptorLike) -> None:
  294. pass
  295. @classmethod
  296. @abstractmethod
  297. def notify_closing(cls, obj: FileDescriptorLike) -> None:
  298. pass
  299. @classmethod
  300. @abstractmethod
  301. async def wrap_listener_socket(cls, sock: socket) -> SocketListener:
  302. pass
  303. @classmethod
  304. @abstractmethod
  305. async def wrap_stream_socket(cls, sock: socket) -> SocketStream:
  306. pass
  307. @classmethod
  308. @abstractmethod
  309. async def wrap_unix_stream_socket(cls, sock: socket) -> UNIXSocketStream:
  310. pass
  311. @classmethod
  312. @abstractmethod
  313. async def wrap_udp_socket(cls, sock: socket) -> UDPSocket:
  314. pass
  315. @classmethod
  316. @abstractmethod
  317. async def wrap_connected_udp_socket(cls, sock: socket) -> ConnectedUDPSocket:
  318. pass
  319. @classmethod
  320. @abstractmethod
  321. async def wrap_unix_datagram_socket(cls, sock: socket) -> UNIXDatagramSocket:
  322. pass
  323. @classmethod
  324. @abstractmethod
  325. async def wrap_connected_unix_datagram_socket(
  326. cls, sock: socket
  327. ) -> ConnectedUNIXDatagramSocket:
  328. pass
  329. @classmethod
  330. @abstractmethod
  331. def current_default_thread_limiter(cls) -> CapacityLimiter:
  332. pass
  333. @classmethod
  334. @abstractmethod
  335. def open_signal_receiver(
  336. cls, *signals: Signals
  337. ) -> AbstractContextManager[AsyncIterator[Signals]]:
  338. pass
  339. @classmethod
  340. @abstractmethod
  341. def get_current_task(cls) -> TaskInfo:
  342. pass
  343. @classmethod
  344. @abstractmethod
  345. def get_running_tasks(cls) -> Sequence[TaskInfo]:
  346. pass
  347. @classmethod
  348. @abstractmethod
  349. async def wait_all_tasks_blocked(cls) -> None:
  350. pass
  351. @classmethod
  352. @abstractmethod
  353. def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
  354. pass