_streams.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. from __future__ import annotations
  2. from abc import abstractmethod
  3. from typing import Any, Callable, Generic, TypeVar, Union
  4. from .._core._exceptions import EndOfStream
  5. from .._core._typedattr import TypedAttributeProvider
  6. from ._resources import AsyncResource
  7. from ._tasks import TaskGroup
  8. T_Item = TypeVar("T_Item")
  9. T_co = TypeVar("T_co", covariant=True)
  10. T_contra = TypeVar("T_contra", contravariant=True)
  11. class UnreliableObjectReceiveStream(
  12. Generic[T_co], AsyncResource, TypedAttributeProvider
  13. ):
  14. """
  15. An interface for receiving objects.
  16. This interface makes no guarantees that the received messages arrive in the order in which they
  17. were sent, or that no messages are missed.
  18. Asynchronously iterating over objects of this type will yield objects matching the given type
  19. parameter.
  20. """
  21. def __aiter__(self) -> UnreliableObjectReceiveStream[T_co]:
  22. return self
  23. async def __anext__(self) -> T_co:
  24. try:
  25. return await self.receive()
  26. except EndOfStream:
  27. raise StopAsyncIteration
  28. @abstractmethod
  29. async def receive(self) -> T_co:
  30. """
  31. Receive the next item.
  32. :raises ~anyio.ClosedResourceError: if the receive stream has been explicitly
  33. closed
  34. :raises ~anyio.EndOfStream: if this stream has been closed from the other end
  35. :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable
  36. due to external causes
  37. """
  38. class UnreliableObjectSendStream(
  39. Generic[T_contra], AsyncResource, TypedAttributeProvider
  40. ):
  41. """
  42. An interface for sending objects.
  43. This interface makes no guarantees that the messages sent will reach the recipient(s) in the
  44. same order in which they were sent, or at all.
  45. """
  46. @abstractmethod
  47. async def send(self, item: T_contra) -> None:
  48. """
  49. Send an item to the peer(s).
  50. :param item: the item to send
  51. :raises ~anyio.ClosedResourceError: if the send stream has been explicitly
  52. closed
  53. :raises ~anyio.BrokenResourceError: if this stream has been rendered unusable
  54. due to external causes
  55. """
  56. class UnreliableObjectStream(
  57. UnreliableObjectReceiveStream[T_Item], UnreliableObjectSendStream[T_Item]
  58. ):
  59. """
  60. A bidirectional message stream which does not guarantee the order or reliability of message
  61. delivery.
  62. """
  63. class ObjectReceiveStream(UnreliableObjectReceiveStream[T_co]):
  64. """
  65. A receive message stream which guarantees that messages are received in the same order in
  66. which they were sent, and that no messages are missed.
  67. """
  68. class ObjectSendStream(UnreliableObjectSendStream[T_contra]):
  69. """
  70. A send message stream which guarantees that messages are delivered in the same order in which
  71. they were sent, without missing any messages in the middle.
  72. """
  73. class ObjectStream(
  74. ObjectReceiveStream[T_Item],
  75. ObjectSendStream[T_Item],
  76. UnreliableObjectStream[T_Item],
  77. ):
  78. """
  79. A bidirectional message stream which guarantees the order and reliability of message delivery.
  80. """
  81. @abstractmethod
  82. async def send_eof(self) -> None:
  83. """
  84. Send an end-of-file indication to the peer.
  85. You should not try to send any further data to this stream after calling this method.
  86. This method is idempotent (does nothing on successive calls).
  87. """
  88. class ByteReceiveStream(AsyncResource, TypedAttributeProvider):
  89. """
  90. An interface for receiving bytes from a single peer.
  91. Iterating this byte stream will yield a byte string of arbitrary length, but no more than
  92. 65536 bytes.
  93. """
  94. def __aiter__(self) -> ByteReceiveStream:
  95. return self
  96. async def __anext__(self) -> bytes:
  97. try:
  98. return await self.receive()
  99. except EndOfStream:
  100. raise StopAsyncIteration
  101. @abstractmethod
  102. async def receive(self, max_bytes: int = 65536) -> bytes:
  103. """
  104. Receive at most ``max_bytes`` bytes from the peer.
  105. .. note:: Implementors of this interface should not return an empty :class:`bytes` object,
  106. and users should ignore them.
  107. :param max_bytes: maximum number of bytes to receive
  108. :return: the received bytes
  109. :raises ~anyio.EndOfStream: if this stream has been closed from the other end
  110. """
  111. class ByteSendStream(AsyncResource, TypedAttributeProvider):
  112. """An interface for sending bytes to a single peer."""
  113. @abstractmethod
  114. async def send(self, item: bytes) -> None:
  115. """
  116. Send the given bytes to the peer.
  117. :param item: the bytes to send
  118. """
  119. class ByteStream(ByteReceiveStream, ByteSendStream):
  120. """A bidirectional byte stream."""
  121. @abstractmethod
  122. async def send_eof(self) -> None:
  123. """
  124. Send an end-of-file indication to the peer.
  125. You should not try to send any further data to this stream after calling this method.
  126. This method is idempotent (does nothing on successive calls).
  127. """
  128. #: Type alias for all unreliable bytes-oriented receive streams.
  129. AnyUnreliableByteReceiveStream = Union[
  130. UnreliableObjectReceiveStream[bytes], ByteReceiveStream
  131. ]
  132. #: Type alias for all unreliable bytes-oriented send streams.
  133. AnyUnreliableByteSendStream = Union[UnreliableObjectSendStream[bytes], ByteSendStream]
  134. #: Type alias for all unreliable bytes-oriented streams.
  135. AnyUnreliableByteStream = Union[UnreliableObjectStream[bytes], ByteStream]
  136. #: Type alias for all bytes-oriented receive streams.
  137. AnyByteReceiveStream = Union[ObjectReceiveStream[bytes], ByteReceiveStream]
  138. #: Type alias for all bytes-oriented send streams.
  139. AnyByteSendStream = Union[ObjectSendStream[bytes], ByteSendStream]
  140. #: Type alias for all bytes-oriented streams.
  141. AnyByteStream = Union[ObjectStream[bytes], ByteStream]
  142. class Listener(Generic[T_co], AsyncResource, TypedAttributeProvider):
  143. """An interface for objects that let you accept incoming connections."""
  144. @abstractmethod
  145. async def serve(
  146. self,
  147. handler: Callable[[T_co], Any],
  148. task_group: TaskGroup | None = None,
  149. ) -> None:
  150. """
  151. Accept incoming connections as they come in and start tasks to handle them.
  152. :param handler: a callable that will be used to handle each accepted connection
  153. :param task_group: the task group that will be used to start tasks for handling each
  154. accepted connection (if omitted, an ad-hoc task group will be created)
  155. """