stapled.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. from __future__ import annotations
  2. from dataclasses import dataclass
  3. from typing import Any, Callable, Generic, Mapping, Sequence, TypeVar
  4. from ..abc import (
  5. ByteReceiveStream,
  6. ByteSendStream,
  7. ByteStream,
  8. Listener,
  9. ObjectReceiveStream,
  10. ObjectSendStream,
  11. ObjectStream,
  12. TaskGroup,
  13. )
# Type variable for the items carried by a stapled object stream.
T_Item = TypeVar("T_Item")
# Type variable for the stream type that listeners hand to their connection handler.
T_Stream = TypeVar("T_Stream")
  16. @dataclass(eq=False)
  17. class StapledByteStream(ByteStream):
  18. """
  19. Combines two byte streams into a single, bidirectional byte stream.
  20. Extra attributes will be provided from both streams, with the receive stream providing the
  21. values in case of a conflict.
  22. :param ByteSendStream send_stream: the sending byte stream
  23. :param ByteReceiveStream receive_stream: the receiving byte stream
  24. """
  25. send_stream: ByteSendStream
  26. receive_stream: ByteReceiveStream
  27. async def receive(self, max_bytes: int = 65536) -> bytes:
  28. return await self.receive_stream.receive(max_bytes)
  29. async def send(self, item: bytes) -> None:
  30. await self.send_stream.send(item)
  31. async def send_eof(self) -> None:
  32. await self.send_stream.aclose()
  33. async def aclose(self) -> None:
  34. await self.send_stream.aclose()
  35. await self.receive_stream.aclose()
  36. @property
  37. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  38. return {
  39. **self.send_stream.extra_attributes,
  40. **self.receive_stream.extra_attributes,
  41. }
  42. @dataclass(eq=False)
  43. class StapledObjectStream(Generic[T_Item], ObjectStream[T_Item]):
  44. """
  45. Combines two object streams into a single, bidirectional object stream.
  46. Extra attributes will be provided from both streams, with the receive stream providing the
  47. values in case of a conflict.
  48. :param ObjectSendStream send_stream: the sending object stream
  49. :param ObjectReceiveStream receive_stream: the receiving object stream
  50. """
  51. send_stream: ObjectSendStream[T_Item]
  52. receive_stream: ObjectReceiveStream[T_Item]
  53. async def receive(self) -> T_Item:
  54. return await self.receive_stream.receive()
  55. async def send(self, item: T_Item) -> None:
  56. await self.send_stream.send(item)
  57. async def send_eof(self) -> None:
  58. await self.send_stream.aclose()
  59. async def aclose(self) -> None:
  60. await self.send_stream.aclose()
  61. await self.receive_stream.aclose()
  62. @property
  63. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  64. return {
  65. **self.send_stream.extra_attributes,
  66. **self.receive_stream.extra_attributes,
  67. }
  68. @dataclass(eq=False)
  69. class MultiListener(Generic[T_Stream], Listener[T_Stream]):
  70. """
  71. Combines multiple listeners into one, serving connections from all of them at once.
  72. Any MultiListeners in the given collection of listeners will have their listeners moved into
  73. this one.
  74. Extra attributes are provided from each listener, with each successive listener overriding any
  75. conflicting attributes from the previous one.
  76. :param listeners: listeners to serve
  77. :type listeners: Sequence[Listener[T_Stream]]
  78. """
  79. listeners: Sequence[Listener[T_Stream]]
  80. def __post_init__(self) -> None:
  81. listeners: list[Listener[T_Stream]] = []
  82. for listener in self.listeners:
  83. if isinstance(listener, MultiListener):
  84. listeners.extend(listener.listeners)
  85. del listener.listeners[:] # type: ignore[attr-defined]
  86. else:
  87. listeners.append(listener)
  88. self.listeners = listeners
  89. async def serve(
  90. self, handler: Callable[[T_Stream], Any], task_group: TaskGroup | None = None
  91. ) -> None:
  92. from .. import create_task_group
  93. async with create_task_group() as tg:
  94. for listener in self.listeners:
  95. tg.start_soon(listener.serve, handler, task_group)
  96. async def aclose(self) -> None:
  97. for listener in self.listeners:
  98. await listener.aclose()
  99. @property
  100. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  101. attributes: dict = {}
  102. for listener in self.listeners:
  103. attributes.update(listener.extra_attributes)
  104. return attributes