request.py 8.2 KB


  1. from __future__ import annotations
  2. import io
  3. import sys
  4. import typing
  5. from base64 import b64encode
  6. from enum import Enum
  7. from ..exceptions import UnrewindableBodyError
  8. from .util import to_bytes
  9. if typing.TYPE_CHECKING:
  10. from typing import Final
# Pass as a value within ``headers`` to skip
# emitting some HTTP headers that are added automatically.
# The only headers that are supported are ``Accept-Encoding``,
# ``Host``, and ``User-Agent``.
SKIP_HEADER = "@@@SKIP_HEADER@@@"
# Lower-cased names of the headers listed above.
SKIPPABLE_HEADERS = frozenset(["accept-encoding", "host", "user-agent"])

# Baseline 'Accept-Encoding' value. The two probes below append ",br" and
# ",zstd" (in that order) when the corresponding module can be imported.
ACCEPT_ENCODING = "gzip,deflate"

# Brotli probe: ``brotlicffi`` is tried first, then ``brotli``. The imported
# module is not used here; only the success of the import matters.
try:
    try:
        import brotlicffi as _unused_module_brotli  # type: ignore[import-not-found] # noqa: F401
    except ImportError:
        import brotli as _unused_module_brotli  # type: ignore[import-not-found] # noqa: F401
except ImportError:
    # Neither Brotli package is importable: leave ACCEPT_ENCODING unchanged.
    pass
else:
    ACCEPT_ENCODING += ",br"

# Zstandard probe: ``compression.zstd`` on Python 3.14 and later,
# ``backports.zstd`` on earlier versions.
try:
    if sys.version_info >= (3, 14):
        from compression import zstd as _unused_module_zstd  # noqa: F401
    else:
        from backports import zstd as _unused_module_zstd  # noqa: F401
except ImportError:
    # No Zstandard module is importable: leave ACCEPT_ENCODING unchanged.
    pass
else:
    ACCEPT_ENCODING += ",zstd"
class _TYPE_FAILEDTELL(Enum):
    # Single-member enum whose only member is exposed as ``_FAILEDTELL``
    # below; as a type it can be named in annotations such as
    # ``_TYPE_BODY_POSITION``.
    token = 0


# Sentinel body position. ``set_file_position`` returns it when ``tell()``
# raised ``OSError``, and ``rewind_body`` raises ``UnrewindableBodyError``
# when it receives it. It is deliberately distinct from ``None``.
_FAILEDTELL: Final[_TYPE_FAILEDTELL] = _TYPE_FAILEDTELL.token

# A body position: either an integer offset or the ``_FAILEDTELL`` sentinel.
_TYPE_BODY_POSITION = typing.Union[int, _TYPE_FAILEDTELL]
# When sending a request with these methods we aren't expecting
# a body, so we don't need to set an explicit 'Content-Length: 0'.
# The reason we do this in the negative instead of tracking methods
# which 'should' have a body is because unknown methods should be
# treated as if they were 'POST' which *does* expect a body.
# ``body_to_chunks`` compares the upper-cased request method against this set.
_METHODS_NOT_EXPECTING_BODY = {"GET", "HEAD", "DELETE", "TRACE", "OPTIONS", "CONNECT"}
  46. def make_headers(
  47. keep_alive: bool | None = None,
  48. accept_encoding: bool | list[str] | str | None = None,
  49. user_agent: str | None = None,
  50. basic_auth: str | None = None,
  51. proxy_basic_auth: str | None = None,
  52. disable_cache: bool | None = None,
  53. ) -> dict[str, str]:
  54. """
  55. Shortcuts for generating request headers.
  56. :param keep_alive:
  57. If ``True``, adds 'connection: keep-alive' header.
  58. :param accept_encoding:
  59. Can be a boolean, list, or string.
  60. ``True`` translates to 'gzip,deflate'. If the dependencies for
  61. Brotli (either the ``brotli`` or ``brotlicffi`` package) and/or
  62. Zstandard (the ``backports.zstd`` package for Python before 3.14)
  63. algorithms are installed, then their encodings are
  64. included in the string ('br' and 'zstd', respectively).
  65. List will get joined by comma.
  66. String will be used as provided.
  67. :param user_agent:
  68. String representing the user-agent you want, such as
  69. "python-urllib3/0.6"
  70. :param basic_auth:
  71. Colon-separated username:password string for 'authorization: basic ...'
  72. auth header.
  73. :param proxy_basic_auth:
  74. Colon-separated username:password string for 'proxy-authorization: basic ...'
  75. auth header.
  76. :param disable_cache:
  77. If ``True``, adds 'cache-control: no-cache' header.
  78. Example:
  79. .. code-block:: python
  80. import urllib3
  81. print(urllib3.util.make_headers(keep_alive=True, user_agent="Batman/1.0"))
  82. # {'connection': 'keep-alive', 'user-agent': 'Batman/1.0'}
  83. print(urllib3.util.make_headers(accept_encoding=True))
  84. # {'accept-encoding': 'gzip,deflate'}
  85. """
  86. headers: dict[str, str] = {}
  87. if accept_encoding:
  88. if isinstance(accept_encoding, str):
  89. pass
  90. elif isinstance(accept_encoding, list):
  91. accept_encoding = ",".join(accept_encoding)
  92. else:
  93. accept_encoding = ACCEPT_ENCODING
  94. headers["accept-encoding"] = accept_encoding
  95. if user_agent:
  96. headers["user-agent"] = user_agent
  97. if keep_alive:
  98. headers["connection"] = "keep-alive"
  99. if basic_auth:
  100. headers["authorization"] = (
  101. f"Basic {b64encode(basic_auth.encode('latin-1')).decode()}"
  102. )
  103. if proxy_basic_auth:
  104. headers["proxy-authorization"] = (
  105. f"Basic {b64encode(proxy_basic_auth.encode('latin-1')).decode()}"
  106. )
  107. if disable_cache:
  108. headers["cache-control"] = "no-cache"
  109. return headers
  110. def set_file_position(
  111. body: typing.Any, pos: _TYPE_BODY_POSITION | None
  112. ) -> _TYPE_BODY_POSITION | None:
  113. """
  114. If a position is provided, move file to that point.
  115. Otherwise, we'll attempt to record a position for future use.
  116. """
  117. if pos is not None:
  118. rewind_body(body, pos)
  119. elif getattr(body, "tell", None) is not None:
  120. try:
  121. pos = body.tell()
  122. except OSError:
  123. # This differentiates from None, allowing us to catch
  124. # a failed `tell()` later when trying to rewind the body.
  125. pos = _FAILEDTELL
  126. return pos
  127. def rewind_body(body: typing.IO[typing.AnyStr], body_pos: _TYPE_BODY_POSITION) -> None:
  128. """
  129. Attempt to rewind body to a certain position.
  130. Primarily used for request redirects and retries.
  131. :param body:
  132. File-like object that supports seek.
  133. :param int pos:
  134. Position to seek to in file.
  135. """
  136. body_seek = getattr(body, "seek", None)
  137. if body_seek is not None and isinstance(body_pos, int):
  138. try:
  139. body_seek(body_pos)
  140. except OSError as e:
  141. raise UnrewindableBodyError(
  142. "An error occurred when rewinding request body for redirect/retry."
  143. ) from e
  144. elif body_pos is _FAILEDTELL:
  145. raise UnrewindableBodyError(
  146. "Unable to record file position for rewinding "
  147. "request body during a redirect/retry."
  148. )
  149. else:
  150. raise ValueError(
  151. f"body_pos must be of type integer, instead it was {type(body_pos)}."
  152. )
class ChunksAndContentLength(typing.NamedTuple):
    """Return value of ``body_to_chunks``: the body as chunks plus a 'Content-Length' recommendation."""

    # Iterable of chunks to send, or ``None`` when the request has no body.
    chunks: typing.Iterable[bytes] | None
    # Recommended 'Content-Length' value. ``None`` when the length of the
    # body can't be determined, or when there is no body and the method is
    # not expected to carry one.
    content_length: int | None
  156. def body_to_chunks(
  157. body: typing.Any | None, method: str, blocksize: int
  158. ) -> ChunksAndContentLength:
  159. """Takes the HTTP request method, body, and blocksize and
  160. transforms them into an iterable of chunks to pass to
  161. socket.sendall() and an optional 'Content-Length' header.
  162. A 'Content-Length' of 'None' indicates the length of the body
  163. can't be determined so should use 'Transfer-Encoding: chunked'
  164. for framing instead.
  165. """
  166. chunks: typing.Iterable[bytes] | None
  167. content_length: int | None
  168. # No body, we need to make a recommendation on 'Content-Length'
  169. # based on whether that request method is expected to have
  170. # a body or not.
  171. if body is None:
  172. chunks = None
  173. if method.upper() not in _METHODS_NOT_EXPECTING_BODY:
  174. content_length = 0
  175. else:
  176. content_length = None
  177. # Bytes or strings become bytes
  178. elif isinstance(body, (str, bytes)):
  179. chunks = (to_bytes(body),)
  180. content_length = len(chunks[0])
  181. # File-like object, TODO: use seek() and tell() for length?
  182. elif hasattr(body, "read"):
  183. def chunk_readable() -> typing.Iterable[bytes]:
  184. encode = isinstance(body, io.TextIOBase)
  185. while True:
  186. datablock = body.read(blocksize)
  187. if not datablock:
  188. break
  189. if encode:
  190. datablock = datablock.encode("utf-8")
  191. yield datablock
  192. chunks = chunk_readable()
  193. content_length = None
  194. # Otherwise we need to start checking via duck-typing.
  195. else:
  196. try:
  197. # Check if the body implements the buffer API.
  198. mv = memoryview(body)
  199. except TypeError:
  200. try:
  201. # Check if the body is an iterable
  202. chunks = iter(body)
  203. content_length = None
  204. except TypeError:
  205. raise TypeError(
  206. f"'body' must be a bytes-like object, file-like "
  207. f"object, or iterable. Instead was {body!r}"
  208. ) from None
  209. else:
  210. # Since it implements the buffer API can be passed directly to socket.sendall()
  211. chunks = (body,)
  212. content_length = mv.nbytes
  213. return ChunksAndContentLength(chunks=chunks, content_length=content_length)