memory.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. from __future__ import annotations
  2. __all__ = (
  3. "MemoryObjectReceiveStream",
  4. "MemoryObjectSendStream",
  5. "MemoryObjectStreamStatistics",
  6. )
  7. import warnings
  8. from collections import OrderedDict, deque
  9. from dataclasses import dataclass, field
  10. from types import TracebackType
  11. from typing import Generic, NamedTuple, TypeVar
  12. from .. import (
  13. BrokenResourceError,
  14. ClosedResourceError,
  15. EndOfStream,
  16. WouldBlock,
  17. )
  18. from .._core._testing import TaskInfo, get_current_task
  19. from ..abc import Event, ObjectReceiveStream, ObjectSendStream
  20. from ..lowlevel import checkpoint
  21. T_Item = TypeVar("T_Item")
  22. T_co = TypeVar("T_co", covariant=True)
  23. T_contra = TypeVar("T_contra", contravariant=True)
  24. class MemoryObjectStreamStatistics(NamedTuple):
  25. current_buffer_used: int #: number of items stored in the buffer
  26. #: maximum number of items that can be stored on this stream (or :data:`math.inf`)
  27. max_buffer_size: float
  28. open_send_streams: int #: number of unclosed clones of the send stream
  29. open_receive_streams: int #: number of unclosed clones of the receive stream
  30. #: number of tasks blocked on :meth:`MemoryObjectSendStream.send`
  31. tasks_waiting_send: int
  32. #: number of tasks blocked on :meth:`MemoryObjectReceiveStream.receive`
  33. tasks_waiting_receive: int
  34. @dataclass(eq=False)
  35. class _MemoryObjectItemReceiver(Generic[T_Item]):
  36. task_info: TaskInfo = field(init=False, default_factory=get_current_task)
  37. item: T_Item = field(init=False)
  38. def __repr__(self) -> str:
  39. # When item is not defined, we get following error with default __repr__:
  40. # AttributeError: 'MemoryObjectItemReceiver' object has no attribute 'item'
  41. item = getattr(self, "item", None)
  42. return f"{self.__class__.__name__}(task_info={self.task_info}, item={item!r})"
  43. @dataclass(eq=False)
  44. class _MemoryObjectStreamState(Generic[T_Item]):
  45. max_buffer_size: float = field()
  46. buffer: deque[T_Item] = field(init=False, default_factory=deque)
  47. open_send_channels: int = field(init=False, default=0)
  48. open_receive_channels: int = field(init=False, default=0)
  49. waiting_receivers: OrderedDict[Event, _MemoryObjectItemReceiver[T_Item]] = field(
  50. init=False, default_factory=OrderedDict
  51. )
  52. waiting_senders: OrderedDict[Event, T_Item] = field(
  53. init=False, default_factory=OrderedDict
  54. )
  55. def statistics(self) -> MemoryObjectStreamStatistics:
  56. return MemoryObjectStreamStatistics(
  57. len(self.buffer),
  58. self.max_buffer_size,
  59. self.open_send_channels,
  60. self.open_receive_channels,
  61. len(self.waiting_senders),
  62. len(self.waiting_receivers),
  63. )
  64. @dataclass(eq=False)
  65. class MemoryObjectReceiveStream(Generic[T_co], ObjectReceiveStream[T_co]):
  66. _state: _MemoryObjectStreamState[T_co]
  67. _closed: bool = field(init=False, default=False)
  68. def __post_init__(self) -> None:
  69. self._state.open_receive_channels += 1
  70. def receive_nowait(self) -> T_co:
  71. """
  72. Receive the next item if it can be done without waiting.
  73. :return: the received item
  74. :raises ~anyio.ClosedResourceError: if this send stream has been closed
  75. :raises ~anyio.EndOfStream: if the buffer is empty and this stream has been
  76. closed from the sending end
  77. :raises ~anyio.WouldBlock: if there are no items in the buffer and no tasks
  78. waiting to send
  79. """
  80. if self._closed:
  81. raise ClosedResourceError
  82. if self._state.waiting_senders:
  83. # Get the item from the next sender
  84. send_event, item = self._state.waiting_senders.popitem(last=False)
  85. self._state.buffer.append(item)
  86. send_event.set()
  87. if self._state.buffer:
  88. return self._state.buffer.popleft()
  89. elif not self._state.open_send_channels:
  90. raise EndOfStream
  91. raise WouldBlock
  92. async def receive(self) -> T_co:
  93. await checkpoint()
  94. try:
  95. return self.receive_nowait()
  96. except WouldBlock:
  97. # Add ourselves in the queue
  98. receive_event = Event()
  99. receiver = _MemoryObjectItemReceiver[T_co]()
  100. self._state.waiting_receivers[receive_event] = receiver
  101. try:
  102. await receive_event.wait()
  103. finally:
  104. self._state.waiting_receivers.pop(receive_event, None)
  105. try:
  106. return receiver.item
  107. except AttributeError:
  108. raise EndOfStream from None
  109. def clone(self) -> MemoryObjectReceiveStream[T_co]:
  110. """
  111. Create a clone of this receive stream.
  112. Each clone can be closed separately. Only when all clones have been closed will
  113. the receiving end of the memory stream be considered closed by the sending ends.
  114. :return: the cloned stream
  115. """
  116. if self._closed:
  117. raise ClosedResourceError
  118. return MemoryObjectReceiveStream(_state=self._state)
  119. def close(self) -> None:
  120. """
  121. Close the stream.
  122. This works the exact same way as :meth:`aclose`, but is provided as a special
  123. case for the benefit of synchronous callbacks.
  124. """
  125. if not self._closed:
  126. self._closed = True
  127. self._state.open_receive_channels -= 1
  128. if self._state.open_receive_channels == 0:
  129. send_events = list(self._state.waiting_senders.keys())
  130. for event in send_events:
  131. event.set()
  132. async def aclose(self) -> None:
  133. self.close()
  134. def statistics(self) -> MemoryObjectStreamStatistics:
  135. """
  136. Return statistics about the current state of this stream.
  137. .. versionadded:: 3.0
  138. """
  139. return self._state.statistics()
  140. def __enter__(self) -> MemoryObjectReceiveStream[T_co]:
  141. return self
  142. def __exit__(
  143. self,
  144. exc_type: type[BaseException] | None,
  145. exc_val: BaseException | None,
  146. exc_tb: TracebackType | None,
  147. ) -> None:
  148. self.close()
  149. def __del__(self) -> None:
  150. if not self._closed:
  151. warnings.warn(
  152. f"Unclosed <{self.__class__.__name__} at {id(self):x}>",
  153. ResourceWarning,
  154. stacklevel=1,
  155. source=self,
  156. )
  157. @dataclass(eq=False)
  158. class MemoryObjectSendStream(Generic[T_contra], ObjectSendStream[T_contra]):
  159. _state: _MemoryObjectStreamState[T_contra]
  160. _closed: bool = field(init=False, default=False)
  161. def __post_init__(self) -> None:
  162. self._state.open_send_channels += 1
  163. def send_nowait(self, item: T_contra) -> None:
  164. """
  165. Send an item immediately if it can be done without waiting.
  166. :param item: the item to send
  167. :raises ~anyio.ClosedResourceError: if this send stream has been closed
  168. :raises ~anyio.BrokenResourceError: if the stream has been closed from the
  169. receiving end
  170. :raises ~anyio.WouldBlock: if the buffer is full and there are no tasks waiting
  171. to receive
  172. """
  173. if self._closed:
  174. raise ClosedResourceError
  175. if not self._state.open_receive_channels:
  176. raise BrokenResourceError
  177. while self._state.waiting_receivers:
  178. receive_event, receiver = self._state.waiting_receivers.popitem(last=False)
  179. if not receiver.task_info.has_pending_cancellation():
  180. receiver.item = item
  181. receive_event.set()
  182. return
  183. if len(self._state.buffer) < self._state.max_buffer_size:
  184. self._state.buffer.append(item)
  185. else:
  186. raise WouldBlock
  187. async def send(self, item: T_contra) -> None:
  188. """
  189. Send an item to the stream.
  190. If the buffer is full, this method blocks until there is again room in the
  191. buffer or the item can be sent directly to a receiver.
  192. :param item: the item to send
  193. :raises ~anyio.ClosedResourceError: if this send stream has been closed
  194. :raises ~anyio.BrokenResourceError: if the stream has been closed from the
  195. receiving end
  196. """
  197. await checkpoint()
  198. try:
  199. self.send_nowait(item)
  200. except WouldBlock:
  201. # Wait until there's someone on the receiving end
  202. send_event = Event()
  203. self._state.waiting_senders[send_event] = item
  204. try:
  205. await send_event.wait()
  206. except BaseException:
  207. self._state.waiting_senders.pop(send_event, None)
  208. raise
  209. if send_event in self._state.waiting_senders:
  210. del self._state.waiting_senders[send_event]
  211. raise BrokenResourceError from None
  212. def clone(self) -> MemoryObjectSendStream[T_contra]:
  213. """
  214. Create a clone of this send stream.
  215. Each clone can be closed separately. Only when all clones have been closed will
  216. the sending end of the memory stream be considered closed by the receiving ends.
  217. :return: the cloned stream
  218. """
  219. if self._closed:
  220. raise ClosedResourceError
  221. return MemoryObjectSendStream(_state=self._state)
  222. def close(self) -> None:
  223. """
  224. Close the stream.
  225. This works the exact same way as :meth:`aclose`, but is provided as a special
  226. case for the benefit of synchronous callbacks.
  227. """
  228. if not self._closed:
  229. self._closed = True
  230. self._state.open_send_channels -= 1
  231. if self._state.open_send_channels == 0:
  232. receive_events = list(self._state.waiting_receivers.keys())
  233. self._state.waiting_receivers.clear()
  234. for event in receive_events:
  235. event.set()
  236. async def aclose(self) -> None:
  237. self.close()
  238. def statistics(self) -> MemoryObjectStreamStatistics:
  239. """
  240. Return statistics about the current state of this stream.
  241. .. versionadded:: 3.0
  242. """
  243. return self._state.statistics()
  244. def __enter__(self) -> MemoryObjectSendStream[T_contra]:
  245. return self
  246. def __exit__(
  247. self,
  248. exc_type: type[BaseException] | None,
  249. exc_val: BaseException | None,
  250. exc_tb: TracebackType | None,
  251. ) -> None:
  252. self.close()
  253. def __del__(self) -> None:
  254. if not self._closed:
  255. warnings.warn(
  256. f"Unclosed <{self.__class__.__name__} at {id(self):x}>",
  257. ResourceWarning,
  258. stacklevel=1,
  259. source=self,
  260. )