text.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. from __future__ import annotations
  2. import codecs
  3. from dataclasses import InitVar, dataclass, field
  4. from typing import Any, Callable, Mapping
  5. from ..abc import (
  6. AnyByteReceiveStream,
  7. AnyByteSendStream,
  8. AnyByteStream,
  9. ObjectReceiveStream,
  10. ObjectSendStream,
  11. ObjectStream,
  12. )
  13. @dataclass(eq=False)
  14. class TextReceiveStream(ObjectReceiveStream[str]):
  15. """
  16. Stream wrapper that decodes bytes to strings using the given encoding.
  17. Decoding is done using :class:`~codecs.IncrementalDecoder` which returns any completely
  18. received unicode characters as soon as they come in.
  19. :param transport_stream: any bytes-based receive stream
  20. :param encoding: character encoding to use for decoding bytes to strings (defaults to
  21. ``utf-8``)
  22. :param errors: handling scheme for decoding errors (defaults to ``strict``; see the
  23. `codecs module documentation`_ for a comprehensive list of options)
  24. .. _codecs module documentation: https://docs.python.org/3/library/codecs.html#codec-objects
  25. """
  26. transport_stream: AnyByteReceiveStream
  27. encoding: InitVar[str] = "utf-8"
  28. errors: InitVar[str] = "strict"
  29. _decoder: codecs.IncrementalDecoder = field(init=False)
  30. def __post_init__(self, encoding: str, errors: str) -> None:
  31. decoder_class = codecs.getincrementaldecoder(encoding)
  32. self._decoder = decoder_class(errors=errors)
  33. async def receive(self) -> str:
  34. while True:
  35. chunk = await self.transport_stream.receive()
  36. decoded = self._decoder.decode(chunk)
  37. if decoded:
  38. return decoded
  39. async def aclose(self) -> None:
  40. await self.transport_stream.aclose()
  41. self._decoder.reset()
  42. @property
  43. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  44. return self.transport_stream.extra_attributes
  45. @dataclass(eq=False)
  46. class TextSendStream(ObjectSendStream[str]):
  47. """
  48. Sends strings to the wrapped stream as bytes using the given encoding.
  49. :param AnyByteSendStream transport_stream: any bytes-based send stream
  50. :param str encoding: character encoding to use for encoding strings to bytes (defaults to
  51. ``utf-8``)
  52. :param str errors: handling scheme for encoding errors (defaults to ``strict``; see the
  53. `codecs module documentation`_ for a comprehensive list of options)
  54. .. _codecs module documentation: https://docs.python.org/3/library/codecs.html#codec-objects
  55. """
  56. transport_stream: AnyByteSendStream
  57. encoding: InitVar[str] = "utf-8"
  58. errors: str = "strict"
  59. _encoder: Callable[..., tuple[bytes, int]] = field(init=False)
  60. def __post_init__(self, encoding: str) -> None:
  61. self._encoder = codecs.getencoder(encoding)
  62. async def send(self, item: str) -> None:
  63. encoded = self._encoder(item, self.errors)[0]
  64. await self.transport_stream.send(encoded)
  65. async def aclose(self) -> None:
  66. await self.transport_stream.aclose()
  67. @property
  68. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  69. return self.transport_stream.extra_attributes
  70. @dataclass(eq=False)
  71. class TextStream(ObjectStream[str]):
  72. """
  73. A bidirectional stream that decodes bytes to strings on receive and encodes strings to bytes on
  74. send.
  75. Extra attributes will be provided from both streams, with the receive stream providing the
  76. values in case of a conflict.
  77. :param AnyByteStream transport_stream: any bytes-based stream
  78. :param str encoding: character encoding to use for encoding/decoding strings to/from bytes
  79. (defaults to ``utf-8``)
  80. :param str errors: handling scheme for encoding errors (defaults to ``strict``; see the
  81. `codecs module documentation`_ for a comprehensive list of options)
  82. .. _codecs module documentation: https://docs.python.org/3/library/codecs.html#codec-objects
  83. """
  84. transport_stream: AnyByteStream
  85. encoding: InitVar[str] = "utf-8"
  86. errors: InitVar[str] = "strict"
  87. _receive_stream: TextReceiveStream = field(init=False)
  88. _send_stream: TextSendStream = field(init=False)
  89. def __post_init__(self, encoding: str, errors: str) -> None:
  90. self._receive_stream = TextReceiveStream(
  91. self.transport_stream, encoding=encoding, errors=errors
  92. )
  93. self._send_stream = TextSendStream(
  94. self.transport_stream, encoding=encoding, errors=errors
  95. )
  96. async def receive(self) -> str:
  97. return await self._receive_stream.receive()
  98. async def send(self, item: str) -> None:
  99. await self._send_stream.send(item)
  100. async def send_eof(self) -> None:
  101. await self.transport_stream.send_eof()
  102. async def aclose(self) -> None:
  103. await self._send_stream.aclose()
  104. await self._receive_stream.aclose()
  105. @property
  106. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  107. return {
  108. **self._send_stream.extra_attributes,
  109. **self._receive_stream.extra_attributes,
  110. }