file.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. from __future__ import annotations
  2. __all__ = (
  3. "FileReadStream",
  4. "FileStreamAttribute",
  5. "FileWriteStream",
  6. )
  7. from collections.abc import Callable, Mapping
  8. from io import SEEK_SET, UnsupportedOperation
  9. from os import PathLike
  10. from pathlib import Path
  11. from typing import Any, BinaryIO, cast
  12. from .. import (
  13. BrokenResourceError,
  14. ClosedResourceError,
  15. EndOfStream,
  16. TypedAttributeSet,
  17. to_thread,
  18. typed_attribute,
  19. )
  20. from ..abc import ByteReceiveStream, ByteSendStream
  class FileStreamAttribute(TypedAttributeSet):
      """Typed attributes provided by the file stream classes."""

      #: the binary file object wrapped by the stream
      file: BinaryIO = typed_attribute()
      #: the file system path of the file, if available (file must be a real file)
      path: Path = typed_attribute()
      #: the file number, if available (file must be a real file or a TTY)
      fileno: int = typed_attribute()
  class _BaseFileStream:
      """
      Shared functionality for streams that wrap an already opened binary file.

      :param file: the binary file object to wrap

      """

      def __init__(self, file: BinaryIO):
          self._file = file

      async def aclose(self) -> None:
          """Close the wrapped file via ``to_thread.run_sync``."""
          await to_thread.run_sync(self._file.close)

      @property
      def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
          """
          Map each available :class:`FileStreamAttribute` key to a getter callable.

          ``file`` is always present. ``path`` is only offered when the file's
          ``name`` is a string or path-like object, and ``fileno`` only when calling
          ``fileno()`` on the file does not raise
          :exc:`~io.UnsupportedOperation`.

          """
          attributes: dict[Any, Callable[[], Any]] = {
              FileStreamAttribute.file: lambda: self._file,
          }

          # ``name`` is not always a path: file objects opened from a file
          # descriptor report the integer descriptor there, which Path() rejects
          # with TypeError. Only advertise ``path`` when it can actually be built.
          name = getattr(self._file, "name", None)
          if isinstance(name, (str, PathLike)):
              attributes[FileStreamAttribute.path] = lambda: Path(self._file.name)

          try:
              self._file.fileno()
          except UnsupportedOperation:
              # the file has no file number to report (e.g. an in-memory file)
              pass
          else:
              attributes[FileStreamAttribute.fileno] = lambda: self._file.fileno()

          return attributes
  class FileReadStream(_BaseFileStream, ByteReceiveStream):
      """
      A byte receive stream backed by a file in the file system.

      :param file: a file object that has been opened for reading in binary mode

      .. versionadded:: 3.0
      """

      @classmethod
      async def from_path(cls, path: str | PathLike[str]) -> FileReadStream:
          """
          Open the file at the given path and wrap it in a read stream.

          :param path: path of the file to read from

          """
          opener = Path(path).open
          handle = await to_thread.run_sync(opener, "rb")
          return cls(cast(BinaryIO, handle))

      async def receive(self, max_bytes: int = 65536) -> bytes:
          """
          Read up to ``max_bytes`` bytes from the file.

          :param max_bytes: size limit passed to the file's ``read()``
          :return: the bytes that were read (never empty)
          :raises ClosedResourceError: if the read raised :exc:`ValueError`
          :raises BrokenResourceError: if the read raised :exc:`OSError`
          :raises EndOfStream: if the read returned no data

          """
          try:
              chunk = await to_thread.run_sync(self._file.read, max_bytes)
          except ValueError:
              raise ClosedResourceError from None
          except OSError as exc:
              raise BrokenResourceError from exc

          # an empty result is reported as end of stream rather than returned
          if not chunk:
              raise EndOfStream

          return chunk

      async def seek(self, position: int, whence: int = SEEK_SET) -> int:
          """
          Move the file position.

          .. seealso:: :meth:`io.IOBase.seek`

          .. note:: Not all file descriptors are seekable.

          :param position: the offset to seek to
          :param whence: determines how ``position`` is interpreted
          :return: the new absolute position
          :raises OSError: if the file is not seekable

          """
          new_position = await to_thread.run_sync(self._file.seek, position, whence)
          return new_position

      async def tell(self) -> int:
          """
          Report the current position in the file.

          .. note:: Not all file descriptors are seekable.

          :return: the current absolute position
          :raises OSError: if the file is not seekable

          """
          current_position = await to_thread.run_sync(self._file.tell)
          return current_position
  class FileWriteStream(_BaseFileStream, ByteSendStream):
      """
      A byte send stream backed by a file in the file system.

      :param file: a file object that has been opened for writing in binary mode

      .. versionadded:: 3.0
      """

      @classmethod
      async def from_path(
          cls, path: str | PathLike[str], append: bool = False
      ) -> FileWriteStream:
          """
          Open the file at the given path for writing and wrap it in a write stream.

          :param path: path of the file to write to
          :param append: if ``True``, open the file in append mode (``"ab"``);
              if ``False``, open it in ``"wb"`` mode, which truncates any existing
              file at the given path

          """
          if append:
              mode = "ab"
          else:
              mode = "wb"

          handle = await to_thread.run_sync(Path(path).open, mode)
          return cls(cast(BinaryIO, handle))

      async def send(self, item: bytes) -> None:
          """
          Write the given bytes to the file.

          :param item: the bytes to write
          :raises ClosedResourceError: if the write raised :exc:`ValueError`
          :raises BrokenResourceError: if the write raised :exc:`OSError`

          """
          writer = self._file.write
          try:
              await to_thread.run_sync(writer, item)
          except ValueError:
              raise ClosedResourceError from None
          except OSError as exc:
              raise BrokenResourceError from exc