buffered.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. from __future__ import annotations
  2. from dataclasses import dataclass, field
  3. from typing import Any, Callable, Mapping
  4. from .. import ClosedResourceError, DelimiterNotFound, EndOfStream, IncompleteRead
  5. from ..abc import AnyByteReceiveStream, ByteReceiveStream
  6. @dataclass(eq=False)
  7. class BufferedByteReceiveStream(ByteReceiveStream):
  8. """
  9. Wraps any bytes-based receive stream and uses a buffer to provide sophisticated receiving
  10. capabilities in the form of a byte stream.
  11. """
  12. receive_stream: AnyByteReceiveStream
  13. _buffer: bytearray = field(init=False, default_factory=bytearray)
  14. _closed: bool = field(init=False, default=False)
  15. async def aclose(self) -> None:
  16. await self.receive_stream.aclose()
  17. self._closed = True
  18. @property
  19. def buffer(self) -> bytes:
  20. """The bytes currently in the buffer."""
  21. return bytes(self._buffer)
  22. @property
  23. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  24. return self.receive_stream.extra_attributes
  25. async def receive(self, max_bytes: int = 65536) -> bytes:
  26. if self._closed:
  27. raise ClosedResourceError
  28. if self._buffer:
  29. chunk = bytes(self._buffer[:max_bytes])
  30. del self._buffer[:max_bytes]
  31. return chunk
  32. elif isinstance(self.receive_stream, ByteReceiveStream):
  33. return await self.receive_stream.receive(max_bytes)
  34. else:
  35. # With a bytes-oriented object stream, we need to handle any surplus bytes we get from
  36. # the receive() call
  37. chunk = await self.receive_stream.receive()
  38. if len(chunk) > max_bytes:
  39. # Save the surplus bytes in the buffer
  40. self._buffer.extend(chunk[max_bytes:])
  41. return chunk[:max_bytes]
  42. else:
  43. return chunk
  44. async def receive_exactly(self, nbytes: int) -> bytes:
  45. """
  46. Read exactly the given amount of bytes from the stream.
  47. :param nbytes: the number of bytes to read
  48. :return: the bytes read
  49. :raises ~anyio.IncompleteRead: if the stream was closed before the requested
  50. amount of bytes could be read from the stream
  51. """
  52. while True:
  53. remaining = nbytes - len(self._buffer)
  54. if remaining <= 0:
  55. retval = self._buffer[:nbytes]
  56. del self._buffer[:nbytes]
  57. return bytes(retval)
  58. try:
  59. if isinstance(self.receive_stream, ByteReceiveStream):
  60. chunk = await self.receive_stream.receive(remaining)
  61. else:
  62. chunk = await self.receive_stream.receive()
  63. except EndOfStream as exc:
  64. raise IncompleteRead from exc
  65. self._buffer.extend(chunk)
  66. async def receive_until(self, delimiter: bytes, max_bytes: int) -> bytes:
  67. """
  68. Read from the stream until the delimiter is found or max_bytes have been read.
  69. :param delimiter: the marker to look for in the stream
  70. :param max_bytes: maximum number of bytes that will be read before raising
  71. :exc:`~anyio.DelimiterNotFound`
  72. :return: the bytes read (not including the delimiter)
  73. :raises ~anyio.IncompleteRead: if the stream was closed before the delimiter
  74. was found
  75. :raises ~anyio.DelimiterNotFound: if the delimiter is not found within the
  76. bytes read up to the maximum allowed
  77. """
  78. delimiter_size = len(delimiter)
  79. offset = 0
  80. while True:
  81. # Check if the delimiter can be found in the current buffer
  82. index = self._buffer.find(delimiter, offset)
  83. if index >= 0:
  84. found = self._buffer[:index]
  85. del self._buffer[: index + len(delimiter) :]
  86. return bytes(found)
  87. # Check if the buffer is already at or over the limit
  88. if len(self._buffer) >= max_bytes:
  89. raise DelimiterNotFound(max_bytes)
  90. # Read more data into the buffer from the socket
  91. try:
  92. data = await self.receive_stream.receive()
  93. except EndOfStream as exc:
  94. raise IncompleteRead from exc
  95. # Move the offset forward and add the new data to the buffer
  96. offset = max(len(self._buffer) - delimiter_size + 1, 0)
  97. self._buffer.extend(data)