text.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. from __future__ import annotations
  2. __all__ = (
  3. "TextConnectable",
  4. "TextReceiveStream",
  5. "TextSendStream",
  6. "TextStream",
  7. )
  8. import codecs
  9. import sys
  10. from collections.abc import Callable, Mapping
  11. from dataclasses import InitVar, dataclass, field
  12. from typing import Any
  13. from ..abc import (
  14. AnyByteReceiveStream,
  15. AnyByteSendStream,
  16. AnyByteStream,
  17. AnyByteStreamConnectable,
  18. ObjectReceiveStream,
  19. ObjectSendStream,
  20. ObjectStream,
  21. ObjectStreamConnectable,
  22. )
  23. if sys.version_info >= (3, 12):
  24. from typing import override
  25. else:
  26. from typing_extensions import override
  27. @dataclass(eq=False)
  28. class TextReceiveStream(ObjectReceiveStream[str]):
  29. """
  30. Stream wrapper that decodes bytes to strings using the given encoding.
  31. Decoding is done using :class:`~codecs.IncrementalDecoder` which returns any
  32. completely received unicode characters as soon as they come in.
  33. :param transport_stream: any bytes-based receive stream
  34. :param encoding: character encoding to use for decoding bytes to strings (defaults
  35. to ``utf-8``)
  36. :param errors: handling scheme for decoding errors (defaults to ``strict``; see the
  37. `codecs module documentation`_ for a comprehensive list of options)
  38. .. _codecs module documentation:
  39. https://docs.python.org/3/library/codecs.html#codec-objects
  40. """
  41. transport_stream: AnyByteReceiveStream
  42. encoding: InitVar[str] = "utf-8"
  43. errors: InitVar[str] = "strict"
  44. _decoder: codecs.IncrementalDecoder = field(init=False)
  45. def __post_init__(self, encoding: str, errors: str) -> None:
  46. decoder_class = codecs.getincrementaldecoder(encoding)
  47. self._decoder = decoder_class(errors=errors)
  48. async def receive(self) -> str:
  49. while True:
  50. chunk = await self.transport_stream.receive()
  51. decoded = self._decoder.decode(chunk)
  52. if decoded:
  53. return decoded
  54. async def aclose(self) -> None:
  55. await self.transport_stream.aclose()
  56. self._decoder.reset()
  57. @property
  58. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  59. return self.transport_stream.extra_attributes
  60. @dataclass(eq=False)
  61. class TextSendStream(ObjectSendStream[str]):
  62. """
  63. Sends strings to the wrapped stream as bytes using the given encoding.
  64. :param AnyByteSendStream transport_stream: any bytes-based send stream
  65. :param str encoding: character encoding to use for encoding strings to bytes
  66. (defaults to ``utf-8``)
  67. :param str errors: handling scheme for encoding errors (defaults to ``strict``; see
  68. the `codecs module documentation`_ for a comprehensive list of options)
  69. .. _codecs module documentation:
  70. https://docs.python.org/3/library/codecs.html#codec-objects
  71. """
  72. transport_stream: AnyByteSendStream
  73. encoding: InitVar[str] = "utf-8"
  74. errors: str = "strict"
  75. _encoder: Callable[..., tuple[bytes, int]] = field(init=False)
  76. def __post_init__(self, encoding: str) -> None:
  77. self._encoder = codecs.getencoder(encoding)
  78. async def send(self, item: str) -> None:
  79. encoded = self._encoder(item, self.errors)[0]
  80. await self.transport_stream.send(encoded)
  81. async def aclose(self) -> None:
  82. await self.transport_stream.aclose()
  83. @property
  84. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  85. return self.transport_stream.extra_attributes
  86. @dataclass(eq=False)
  87. class TextStream(ObjectStream[str]):
  88. """
  89. A bidirectional stream that decodes bytes to strings on receive and encodes strings
  90. to bytes on send.
  91. Extra attributes will be provided from both streams, with the receive stream
  92. providing the values in case of a conflict.
  93. :param AnyByteStream transport_stream: any bytes-based stream
  94. :param str encoding: character encoding to use for encoding/decoding strings to/from
  95. bytes (defaults to ``utf-8``)
  96. :param str errors: handling scheme for encoding errors (defaults to ``strict``; see
  97. the `codecs module documentation`_ for a comprehensive list of options)
  98. .. _codecs module documentation:
  99. https://docs.python.org/3/library/codecs.html#codec-objects
  100. """
  101. transport_stream: AnyByteStream
  102. encoding: InitVar[str] = "utf-8"
  103. errors: InitVar[str] = "strict"
  104. _receive_stream: TextReceiveStream = field(init=False)
  105. _send_stream: TextSendStream = field(init=False)
  106. def __post_init__(self, encoding: str, errors: str) -> None:
  107. self._receive_stream = TextReceiveStream(
  108. self.transport_stream, encoding=encoding, errors=errors
  109. )
  110. self._send_stream = TextSendStream(
  111. self.transport_stream, encoding=encoding, errors=errors
  112. )
  113. async def receive(self) -> str:
  114. return await self._receive_stream.receive()
  115. async def send(self, item: str) -> None:
  116. await self._send_stream.send(item)
  117. async def send_eof(self) -> None:
  118. await self.transport_stream.send_eof()
  119. async def aclose(self) -> None:
  120. await self._send_stream.aclose()
  121. await self._receive_stream.aclose()
  122. @property
  123. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  124. return {
  125. **self._send_stream.extra_attributes,
  126. **self._receive_stream.extra_attributes,
  127. }
  128. class TextConnectable(ObjectStreamConnectable[str]):
  129. def __init__(self, connectable: AnyByteStreamConnectable):
  130. """
  131. :param connectable: the bytestream endpoint to wrap
  132. """
  133. self.connectable = connectable
  134. @override
  135. async def connect(self) -> TextStream:
  136. stream = await self.connectable.connect()
  137. return TextStream(stream)