file.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. from __future__ import annotations
  2. from io import SEEK_SET, UnsupportedOperation
  3. from os import PathLike
  4. from pathlib import Path
  5. from typing import Any, BinaryIO, Callable, Mapping, cast
  6. from .. import (
  7. BrokenResourceError,
  8. ClosedResourceError,
  9. EndOfStream,
  10. TypedAttributeSet,
  11. to_thread,
  12. typed_attribute,
  13. )
  14. from ..abc import ByteReceiveStream, ByteSendStream
  15. class FileStreamAttribute(TypedAttributeSet):
  16. #: the open file descriptor
  17. file: BinaryIO = typed_attribute()
  18. #: the path of the file on the file system, if available (file must be a real file)
  19. path: Path = typed_attribute()
  20. #: the file number, if available (file must be a real file or a TTY)
  21. fileno: int = typed_attribute()
  22. class _BaseFileStream:
  23. def __init__(self, file: BinaryIO):
  24. self._file = file
  25. async def aclose(self) -> None:
  26. await to_thread.run_sync(self._file.close)
  27. @property
  28. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  29. attributes: dict[Any, Callable[[], Any]] = {
  30. FileStreamAttribute.file: lambda: self._file,
  31. }
  32. if hasattr(self._file, "name"):
  33. attributes[FileStreamAttribute.path] = lambda: Path(self._file.name)
  34. try:
  35. self._file.fileno()
  36. except UnsupportedOperation:
  37. pass
  38. else:
  39. attributes[FileStreamAttribute.fileno] = lambda: self._file.fileno()
  40. return attributes
  41. class FileReadStream(_BaseFileStream, ByteReceiveStream):
  42. """
  43. A byte stream that reads from a file in the file system.
  44. :param file: a file that has been opened for reading in binary mode
  45. .. versionadded:: 3.0
  46. """
  47. @classmethod
  48. async def from_path(cls, path: str | PathLike[str]) -> FileReadStream:
  49. """
  50. Create a file read stream by opening the given file.
  51. :param path: path of the file to read from
  52. """
  53. file = await to_thread.run_sync(Path(path).open, "rb")
  54. return cls(cast(BinaryIO, file))
  55. async def receive(self, max_bytes: int = 65536) -> bytes:
  56. try:
  57. data = await to_thread.run_sync(self._file.read, max_bytes)
  58. except ValueError:
  59. raise ClosedResourceError from None
  60. except OSError as exc:
  61. raise BrokenResourceError from exc
  62. if data:
  63. return data
  64. else:
  65. raise EndOfStream
  66. async def seek(self, position: int, whence: int = SEEK_SET) -> int:
  67. """
  68. Seek the file to the given position.
  69. .. seealso:: :meth:`io.IOBase.seek`
  70. .. note:: Not all file descriptors are seekable.
  71. :param position: position to seek the file to
  72. :param whence: controls how ``position`` is interpreted
  73. :return: the new absolute position
  74. :raises OSError: if the file is not seekable
  75. """
  76. return await to_thread.run_sync(self._file.seek, position, whence)
  77. async def tell(self) -> int:
  78. """
  79. Return the current stream position.
  80. .. note:: Not all file descriptors are seekable.
  81. :return: the current absolute position
  82. :raises OSError: if the file is not seekable
  83. """
  84. return await to_thread.run_sync(self._file.tell)
  85. class FileWriteStream(_BaseFileStream, ByteSendStream):
  86. """
  87. A byte stream that writes to a file in the file system.
  88. :param file: a file that has been opened for writing in binary mode
  89. .. versionadded:: 3.0
  90. """
  91. @classmethod
  92. async def from_path(
  93. cls, path: str | PathLike[str], append: bool = False
  94. ) -> FileWriteStream:
  95. """
  96. Create a file write stream by opening the given file for writing.
  97. :param path: path of the file to write to
  98. :param append: if ``True``, open the file for appending; if ``False``, any existing file
  99. at the given path will be truncated
  100. """
  101. mode = "ab" if append else "wb"
  102. file = await to_thread.run_sync(Path(path).open, mode)
  103. return cls(cast(BinaryIO, file))
  104. async def send(self, item: bytes) -> None:
  105. try:
  106. await to_thread.run_sync(self._file.write, item)
  107. except ValueError:
  108. raise ClosedResourceError from None
  109. except OSError as exc:
  110. raise BrokenResourceError from exc