_streams.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. from __future__ import annotations
  2. import sys
  3. from abc import ABCMeta, abstractmethod
  4. from collections.abc import Callable
  5. from typing import Any, Generic, TypeVar, Union
  6. from .._core._exceptions import EndOfStream
  7. from .._core._typedattr import TypedAttributeProvider
  8. from ._resources import AsyncResource
  9. from ._tasks import TaskGroup
  10. if sys.version_info >= (3, 10):
  11. from typing import TypeAlias
  12. else:
  13. from typing_extensions import TypeAlias
  14. T_Item = TypeVar("T_Item")
  15. T_co = TypeVar("T_co", covariant=True)
  16. T_contra = TypeVar("T_contra", contravariant=True)
  17. class UnreliableObjectReceiveStream(
  18. Generic[T_co], AsyncResource, TypedAttributeProvider
  19. ):
  20. """
  21. An interface for receiving objects.
  22. This interface makes no guarantees that the received messages arrive in the order in
  23. which they were sent, or that no messages are missed.
  24. Asynchronously iterating over objects of this type will yield objects matching the
  25. given type parameter.
  26. """
  27. def __aiter__(self) -> UnreliableObjectReceiveStream[T_co]:
  28. return self
  29. async def __anext__(self) -> T_co:
  30. try:
  31. return await self.receive()
  32. except EndOfStream:
  33. raise StopAsyncIteration from None
  34. @abstractmethod
  35. async def receive(self) -> T_co:
  36. """
  37. Receive the next item.
  38. :raises ~anyio.ClosedResourceError: if the receive stream has been explicitly
  39. closed
  40. :raises ~anyio.EndOfStream: if this stream has been closed from the other end
  41. :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable
  42. due to external causes
  43. """
  44. class UnreliableObjectSendStream(
  45. Generic[T_contra], AsyncResource, TypedAttributeProvider
  46. ):
  47. """
  48. An interface for sending objects.
  49. This interface makes no guarantees that the messages sent will reach the
  50. recipient(s) in the same order in which they were sent, or at all.
  51. """
  52. @abstractmethod
  53. async def send(self, item: T_contra) -> None:
  54. """
  55. Send an item to the peer(s).
  56. :param item: the item to send
  57. :raises ~anyio.ClosedResourceError: if the send stream has been explicitly
  58. closed
  59. :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable
  60. due to external causes
  61. """
  62. class UnreliableObjectStream(
  63. UnreliableObjectReceiveStream[T_Item], UnreliableObjectSendStream[T_Item]
  64. ):
  65. """
  66. A bidirectional message stream which does not guarantee the order or reliability of
  67. message delivery.
  68. """
  69. class ObjectReceiveStream(UnreliableObjectReceiveStream[T_co]):
  70. """
  71. A receive message stream which guarantees that messages are received in the same
  72. order in which they were sent, and that no messages are missed.
  73. """
  74. class ObjectSendStream(UnreliableObjectSendStream[T_contra]):
  75. """
  76. A send message stream which guarantees that messages are delivered in the same order
  77. in which they were sent, without missing any messages in the middle.
  78. """
  79. class ObjectStream(
  80. ObjectReceiveStream[T_Item],
  81. ObjectSendStream[T_Item],
  82. UnreliableObjectStream[T_Item],
  83. ):
  84. """
  85. A bidirectional message stream which guarantees the order and reliability of message
  86. delivery.
  87. """
  88. @abstractmethod
  89. async def send_eof(self) -> None:
  90. """
  91. Send an end-of-file indication to the peer.
  92. You should not try to send any further data to this stream after calling this
  93. method. This method is idempotent (does nothing on successive calls).
  94. """
  95. class ByteReceiveStream(AsyncResource, TypedAttributeProvider):
  96. """
  97. An interface for receiving bytes from a single peer.
  98. Iterating this byte stream will yield a byte string of arbitrary length, but no more
  99. than 65536 bytes.
  100. """
  101. def __aiter__(self) -> ByteReceiveStream:
  102. return self
  103. async def __anext__(self) -> bytes:
  104. try:
  105. return await self.receive()
  106. except EndOfStream:
  107. raise StopAsyncIteration from None
  108. @abstractmethod
  109. async def receive(self, max_bytes: int = 65536) -> bytes:
  110. """
  111. Receive at most ``max_bytes`` bytes from the peer.
  112. .. note:: Implementers of this interface should not return an empty
  113. :class:`bytes` object, and users should ignore them.
  114. :param max_bytes: maximum number of bytes to receive
  115. :return: the received bytes
  116. :raises ~anyio.EndOfStream: if this stream has been closed from the other end
  117. """
  118. class ByteSendStream(AsyncResource, TypedAttributeProvider):
  119. """An interface for sending bytes to a single peer."""
  120. @abstractmethod
  121. async def send(self, item: bytes) -> None:
  122. """
  123. Send the given bytes to the peer.
  124. :param item: the bytes to send
  125. """
  126. class ByteStream(ByteReceiveStream, ByteSendStream):
  127. """A bidirectional byte stream."""
  128. @abstractmethod
  129. async def send_eof(self) -> None:
  130. """
  131. Send an end-of-file indication to the peer.
  132. You should not try to send any further data to this stream after calling this
  133. method. This method is idempotent (does nothing on successive calls).
  134. """
  135. #: Type alias for all unreliable bytes-oriented receive streams.
  136. AnyUnreliableByteReceiveStream: TypeAlias = Union[
  137. UnreliableObjectReceiveStream[bytes], ByteReceiveStream
  138. ]
  139. #: Type alias for all unreliable bytes-oriented send streams.
  140. AnyUnreliableByteSendStream: TypeAlias = Union[
  141. UnreliableObjectSendStream[bytes], ByteSendStream
  142. ]
  143. #: Type alias for all unreliable bytes-oriented streams.
  144. AnyUnreliableByteStream: TypeAlias = Union[UnreliableObjectStream[bytes], ByteStream]
  145. #: Type alias for all bytes-oriented receive streams.
  146. AnyByteReceiveStream: TypeAlias = Union[ObjectReceiveStream[bytes], ByteReceiveStream]
  147. #: Type alias for all bytes-oriented send streams.
  148. AnyByteSendStream: TypeAlias = Union[ObjectSendStream[bytes], ByteSendStream]
  149. #: Type alias for all bytes-oriented streams.
  150. AnyByteStream: TypeAlias = Union[ObjectStream[bytes], ByteStream]
  151. class Listener(Generic[T_co], AsyncResource, TypedAttributeProvider):
  152. """An interface for objects that let you accept incoming connections."""
  153. @abstractmethod
  154. async def serve(
  155. self, handler: Callable[[T_co], Any], task_group: TaskGroup | None = None
  156. ) -> None:
  157. """
  158. Accept incoming connections as they come in and start tasks to handle them.
  159. :param handler: a callable that will be used to handle each accepted connection
  160. :param task_group: the task group that will be used to start tasks for handling
  161. each accepted connection (if omitted, an ad-hoc task group will be created)
  162. """
  163. class ObjectStreamConnectable(Generic[T_co], metaclass=ABCMeta):
  164. @abstractmethod
  165. async def connect(self) -> ObjectStream[T_co]:
  166. """
  167. Connect to the remote endpoint.
  168. :return: an object stream connected to the remote end
  169. :raises ConnectionFailed: if the connection fails
  170. """
  171. class ByteStreamConnectable(metaclass=ABCMeta):
  172. @abstractmethod
  173. async def connect(self) -> ByteStream:
  174. """
  175. Connect to the remote endpoint.
  176. :return: a bytestream connected to the remote end
  177. :raises ConnectionFailed: if the connection fails
  178. """
  179. #: Type alias for all connectables returning bytestreams or bytes-oriented object streams
  180. AnyByteStreamConnectable: TypeAlias = Union[
  181. ObjectStreamConnectable[bytes], ByteStreamConnectable
  182. ]