"""Compatibility layer used internally by FastAPI to bridge Pydantic v1 and v2 APIs."""
  1. from collections import deque
  2. from copy import copy
  3. from dataclasses import dataclass, is_dataclass
  4. from enum import Enum
  5. from typing import (
  6. Any,
  7. Callable,
  8. Deque,
  9. Dict,
  10. FrozenSet,
  11. List,
  12. Mapping,
  13. Sequence,
  14. Set,
  15. Tuple,
  16. Type,
  17. Union,
  18. )
  19. from fastapi.exceptions import RequestErrorModel
  20. from fastapi.types import IncEx, ModelNameMap, UnionType
  21. from pydantic import BaseModel, create_model
  22. from pydantic.version import VERSION as PYDANTIC_VERSION
  23. from starlette.datastructures import UploadFile
  24. from typing_extensions import Annotated, Literal, get_args, get_origin
  25. PYDANTIC_V2 = PYDANTIC_VERSION.startswith("2.")
  26. sequence_annotation_to_type = {
  27. Sequence: list,
  28. List: list,
  29. list: list,
  30. Tuple: tuple,
  31. tuple: tuple,
  32. Set: set,
  33. set: set,
  34. FrozenSet: frozenset,
  35. frozenset: frozenset,
  36. Deque: deque,
  37. deque: deque,
  38. }
  39. sequence_types = tuple(sequence_annotation_to_type.keys())
  40. if PYDANTIC_V2:
  41. from pydantic import PydanticSchemaGenerationError as PydanticSchemaGenerationError
  42. from pydantic import TypeAdapter
  43. from pydantic import ValidationError as ValidationError
  44. from pydantic._internal._schema_generation_shared import ( # type: ignore[attr-defined]
  45. GetJsonSchemaHandler as GetJsonSchemaHandler,
  46. )
  47. from pydantic._internal._typing_extra import eval_type_lenient
  48. from pydantic._internal._utils import lenient_issubclass as lenient_issubclass
  49. from pydantic.fields import FieldInfo
  50. from pydantic.json_schema import GenerateJsonSchema as GenerateJsonSchema
  51. from pydantic.json_schema import JsonSchemaValue as JsonSchemaValue
  52. from pydantic_core import CoreSchema as CoreSchema
  53. from pydantic_core import PydanticUndefined, PydanticUndefinedType
  54. from pydantic_core import Url as Url
  55. try:
  56. from pydantic_core.core_schema import (
  57. with_info_plain_validator_function as with_info_plain_validator_function,
  58. )
  59. except ImportError: # pragma: no cover
  60. from pydantic_core.core_schema import (
  61. general_plain_validator_function as with_info_plain_validator_function, # noqa: F401
  62. )
  63. Required = PydanticUndefined
  64. Undefined = PydanticUndefined
  65. UndefinedType = PydanticUndefinedType
  66. evaluate_forwardref = eval_type_lenient
  67. Validator = Any
  68. class BaseConfig:
  69. pass
  70. class ErrorWrapper(Exception):
  71. pass
  72. @dataclass
  73. class ModelField:
  74. field_info: FieldInfo
  75. name: str
  76. mode: Literal["validation", "serialization"] = "validation"
  77. @property
  78. def alias(self) -> str:
  79. a = self.field_info.alias
  80. return a if a is not None else self.name
  81. @property
  82. def required(self) -> bool:
  83. return self.field_info.is_required()
  84. @property
  85. def default(self) -> Any:
  86. return self.get_default()
  87. @property
  88. def type_(self) -> Any:
  89. return self.field_info.annotation
  90. def __post_init__(self) -> None:
  91. self._type_adapter: TypeAdapter[Any] = TypeAdapter(
  92. Annotated[self.field_info.annotation, self.field_info]
  93. )
  94. def get_default(self) -> Any:
  95. if self.field_info.is_required():
  96. return Undefined
  97. return self.field_info.get_default(call_default_factory=True)
  98. def validate(
  99. self,
  100. value: Any,
  101. values: Dict[str, Any] = {}, # noqa: B006
  102. *,
  103. loc: Tuple[Union[int, str], ...] = (),
  104. ) -> Tuple[Any, Union[List[Dict[str, Any]], None]]:
  105. try:
  106. return (
  107. self._type_adapter.validate_python(value, from_attributes=True),
  108. None,
  109. )
  110. except ValidationError as exc:
  111. return None, _regenerate_error_with_loc(
  112. errors=exc.errors(), loc_prefix=loc
  113. )
  114. def serialize(
  115. self,
  116. value: Any,
  117. *,
  118. mode: Literal["json", "python"] = "json",
  119. include: Union[IncEx, None] = None,
  120. exclude: Union[IncEx, None] = None,
  121. by_alias: bool = True,
  122. exclude_unset: bool = False,
  123. exclude_defaults: bool = False,
  124. exclude_none: bool = False,
  125. ) -> Any:
  126. # What calls this code passes a value that already called
  127. # self._type_adapter.validate_python(value)
  128. return self._type_adapter.dump_python(
  129. value,
  130. mode=mode,
  131. include=include,
  132. exclude=exclude,
  133. by_alias=by_alias,
  134. exclude_unset=exclude_unset,
  135. exclude_defaults=exclude_defaults,
  136. exclude_none=exclude_none,
  137. )
  138. def __hash__(self) -> int:
  139. # Each ModelField is unique for our purposes, to allow making a dict from
  140. # ModelField to its JSON Schema.
  141. return id(self)
  142. def get_annotation_from_field_info(
  143. annotation: Any, field_info: FieldInfo, field_name: str
  144. ) -> Any:
  145. return annotation
  146. def _normalize_errors(errors: Sequence[Any]) -> List[Dict[str, Any]]:
  147. return errors # type: ignore[return-value]
  148. def _model_rebuild(model: Type[BaseModel]) -> None:
  149. model.model_rebuild()
  150. def _model_dump(
  151. model: BaseModel, mode: Literal["json", "python"] = "json", **kwargs: Any
  152. ) -> Any:
  153. return model.model_dump(mode=mode, **kwargs)
  154. def _get_model_config(model: BaseModel) -> Any:
  155. return model.model_config
  156. def get_schema_from_model_field(
  157. *,
  158. field: ModelField,
  159. schema_generator: GenerateJsonSchema,
  160. model_name_map: ModelNameMap,
  161. field_mapping: Dict[
  162. Tuple[ModelField, Literal["validation", "serialization"]], JsonSchemaValue
  163. ],
  164. separate_input_output_schemas: bool = True,
  165. ) -> Dict[str, Any]:
  166. override_mode: Union[Literal["validation"], None] = (
  167. None if separate_input_output_schemas else "validation"
  168. )
  169. # This expects that GenerateJsonSchema was already used to generate the definitions
  170. json_schema = field_mapping[(field, override_mode or field.mode)]
  171. if "$ref" not in json_schema:
  172. # TODO remove when deprecating Pydantic v1
  173. # Ref: https://github.com/pydantic/pydantic/blob/d61792cc42c80b13b23e3ffa74bc37ec7c77f7d1/pydantic/schema.py#L207
  174. json_schema["title"] = (
  175. field.field_info.title or field.alias.title().replace("_", " ")
  176. )
  177. return json_schema
  178. def get_compat_model_name_map(fields: List[ModelField]) -> ModelNameMap:
  179. return {}
  180. def get_definitions(
  181. *,
  182. fields: List[ModelField],
  183. schema_generator: GenerateJsonSchema,
  184. model_name_map: ModelNameMap,
  185. separate_input_output_schemas: bool = True,
  186. ) -> Tuple[
  187. Dict[
  188. Tuple[ModelField, Literal["validation", "serialization"]], JsonSchemaValue
  189. ],
  190. Dict[str, Dict[str, Any]],
  191. ]:
  192. override_mode: Union[Literal["validation"], None] = (
  193. None if separate_input_output_schemas else "validation"
  194. )
  195. inputs = [
  196. (field, override_mode or field.mode, field._type_adapter.core_schema)
  197. for field in fields
  198. ]
  199. field_mapping, definitions = schema_generator.generate_definitions(
  200. inputs=inputs
  201. )
  202. return field_mapping, definitions # type: ignore[return-value]
  203. def is_scalar_field(field: ModelField) -> bool:
  204. from fastapi import params
  205. return field_annotation_is_scalar(
  206. field.field_info.annotation
  207. ) and not isinstance(field.field_info, params.Body)
  208. def is_sequence_field(field: ModelField) -> bool:
  209. return field_annotation_is_sequence(field.field_info.annotation)
  210. def is_scalar_sequence_field(field: ModelField) -> bool:
  211. return field_annotation_is_scalar_sequence(field.field_info.annotation)
  212. def is_bytes_field(field: ModelField) -> bool:
  213. return is_bytes_or_nonable_bytes_annotation(field.type_)
  214. def is_bytes_sequence_field(field: ModelField) -> bool:
  215. return is_bytes_sequence_annotation(field.type_)
  216. def copy_field_info(*, field_info: FieldInfo, annotation: Any) -> FieldInfo:
  217. return type(field_info).from_annotation(annotation)
  218. def serialize_sequence_value(*, field: ModelField, value: Any) -> Sequence[Any]:
  219. origin_type = (
  220. get_origin(field.field_info.annotation) or field.field_info.annotation
  221. )
  222. assert issubclass(origin_type, sequence_types) # type: ignore[arg-type]
  223. return sequence_annotation_to_type[origin_type](value) # type: ignore[no-any-return]
  224. def get_missing_field_error(loc: Tuple[str, ...]) -> Dict[str, Any]:
  225. error = ValidationError.from_exception_data(
  226. "Field required", [{"type": "missing", "loc": loc, "input": {}}]
  227. ).errors()[0]
  228. error["input"] = None
  229. return error # type: ignore[return-value]
  230. def create_body_model(
  231. *, fields: Sequence[ModelField], model_name: str
  232. ) -> Type[BaseModel]:
  233. field_params = {f.name: (f.field_info.annotation, f.field_info) for f in fields}
  234. BodyModel: Type[BaseModel] = create_model(model_name, **field_params) # type: ignore[call-overload]
  235. return BodyModel
  236. else:
  237. from fastapi.openapi.constants import REF_PREFIX as REF_PREFIX
  238. from pydantic import AnyUrl as Url # noqa: F401
  239. from pydantic import ( # type: ignore[assignment]
  240. BaseConfig as BaseConfig, # noqa: F401
  241. )
  242. from pydantic import ValidationError as ValidationError # noqa: F401
  243. from pydantic.class_validators import ( # type: ignore[no-redef]
  244. Validator as Validator, # noqa: F401
  245. )
  246. from pydantic.error_wrappers import ( # type: ignore[no-redef]
  247. ErrorWrapper as ErrorWrapper, # noqa: F401
  248. )
  249. from pydantic.errors import MissingError
  250. from pydantic.fields import ( # type: ignore[attr-defined]
  251. SHAPE_FROZENSET,
  252. SHAPE_LIST,
  253. SHAPE_SEQUENCE,
  254. SHAPE_SET,
  255. SHAPE_SINGLETON,
  256. SHAPE_TUPLE,
  257. SHAPE_TUPLE_ELLIPSIS,
  258. )
  259. from pydantic.fields import FieldInfo as FieldInfo
  260. from pydantic.fields import ( # type: ignore[no-redef,attr-defined]
  261. ModelField as ModelField, # noqa: F401
  262. )
  263. from pydantic.fields import ( # type: ignore[no-redef,attr-defined]
  264. Required as Required, # noqa: F401
  265. )
  266. from pydantic.fields import ( # type: ignore[no-redef,attr-defined]
  267. Undefined as Undefined,
  268. )
  269. from pydantic.fields import ( # type: ignore[no-redef, attr-defined]
  270. UndefinedType as UndefinedType, # noqa: F401
  271. )
  272. from pydantic.schema import (
  273. field_schema,
  274. get_flat_models_from_fields,
  275. get_model_name_map,
  276. model_process_schema,
  277. )
  278. from pydantic.schema import ( # type: ignore[no-redef] # noqa: F401
  279. get_annotation_from_field_info as get_annotation_from_field_info,
  280. )
  281. from pydantic.typing import ( # type: ignore[no-redef]
  282. evaluate_forwardref as evaluate_forwardref, # noqa: F401
  283. )
  284. from pydantic.utils import ( # type: ignore[no-redef]
  285. lenient_issubclass as lenient_issubclass, # noqa: F401
  286. )
  287. GetJsonSchemaHandler = Any # type: ignore[assignment,misc]
  288. JsonSchemaValue = Dict[str, Any] # type: ignore[misc]
  289. CoreSchema = Any # type: ignore[assignment,misc]
  290. sequence_shapes = {
  291. SHAPE_LIST,
  292. SHAPE_SET,
  293. SHAPE_FROZENSET,
  294. SHAPE_TUPLE,
  295. SHAPE_SEQUENCE,
  296. SHAPE_TUPLE_ELLIPSIS,
  297. }
  298. sequence_shape_to_type = {
  299. SHAPE_LIST: list,
  300. SHAPE_SET: set,
  301. SHAPE_TUPLE: tuple,
  302. SHAPE_SEQUENCE: list,
  303. SHAPE_TUPLE_ELLIPSIS: list,
  304. }
  305. @dataclass
  306. class GenerateJsonSchema: # type: ignore[no-redef]
  307. ref_template: str
  308. class PydanticSchemaGenerationError(Exception): # type: ignore[no-redef]
  309. pass
  310. def with_info_plain_validator_function( # type: ignore[misc]
  311. function: Callable[..., Any],
  312. *,
  313. ref: Union[str, None] = None,
  314. metadata: Any = None,
  315. serialization: Any = None,
  316. ) -> Any:
  317. return {}
  318. def get_model_definitions(
  319. *,
  320. flat_models: Set[Union[Type[BaseModel], Type[Enum]]],
  321. model_name_map: Dict[Union[Type[BaseModel], Type[Enum]], str],
  322. ) -> Dict[str, Any]:
  323. definitions: Dict[str, Dict[str, Any]] = {}
  324. for model in flat_models:
  325. m_schema, m_definitions, m_nested_models = model_process_schema(
  326. model, model_name_map=model_name_map, ref_prefix=REF_PREFIX
  327. )
  328. definitions.update(m_definitions)
  329. model_name = model_name_map[model]
  330. if "description" in m_schema:
  331. m_schema["description"] = m_schema["description"].split("\f")[0]
  332. definitions[model_name] = m_schema
  333. return definitions
  334. def is_pv1_scalar_field(field: ModelField) -> bool:
  335. from fastapi import params
  336. field_info = field.field_info
  337. if not (
  338. field.shape == SHAPE_SINGLETON # type: ignore[attr-defined]
  339. and not lenient_issubclass(field.type_, BaseModel)
  340. and not lenient_issubclass(field.type_, dict)
  341. and not field_annotation_is_sequence(field.type_)
  342. and not is_dataclass(field.type_)
  343. and not isinstance(field_info, params.Body)
  344. ):
  345. return False
  346. if field.sub_fields: # type: ignore[attr-defined]
  347. if not all(
  348. is_pv1_scalar_field(f)
  349. for f in field.sub_fields # type: ignore[attr-defined]
  350. ):
  351. return False
  352. return True
  353. def is_pv1_scalar_sequence_field(field: ModelField) -> bool:
  354. if (field.shape in sequence_shapes) and not lenient_issubclass( # type: ignore[attr-defined]
  355. field.type_, BaseModel
  356. ):
  357. if field.sub_fields is not None: # type: ignore[attr-defined]
  358. for sub_field in field.sub_fields: # type: ignore[attr-defined]
  359. if not is_pv1_scalar_field(sub_field):
  360. return False
  361. return True
  362. if _annotation_is_sequence(field.type_):
  363. return True
  364. return False
  365. def _normalize_errors(errors: Sequence[Any]) -> List[Dict[str, Any]]:
  366. use_errors: List[Any] = []
  367. for error in errors:
  368. if isinstance(error, ErrorWrapper):
  369. new_errors = ValidationError( # type: ignore[call-arg]
  370. errors=[error], model=RequestErrorModel
  371. ).errors()
  372. use_errors.extend(new_errors)
  373. elif isinstance(error, list):
  374. use_errors.extend(_normalize_errors(error))
  375. else:
  376. use_errors.append(error)
  377. return use_errors
  378. def _model_rebuild(model: Type[BaseModel]) -> None:
  379. model.update_forward_refs()
  380. def _model_dump(
  381. model: BaseModel, mode: Literal["json", "python"] = "json", **kwargs: Any
  382. ) -> Any:
  383. return model.dict(**kwargs)
  384. def _get_model_config(model: BaseModel) -> Any:
  385. return model.__config__ # type: ignore[attr-defined]
  386. def get_schema_from_model_field(
  387. *,
  388. field: ModelField,
  389. schema_generator: GenerateJsonSchema,
  390. model_name_map: ModelNameMap,
  391. field_mapping: Dict[
  392. Tuple[ModelField, Literal["validation", "serialization"]], JsonSchemaValue
  393. ],
  394. separate_input_output_schemas: bool = True,
  395. ) -> Dict[str, Any]:
  396. # This expects that GenerateJsonSchema was already used to generate the definitions
  397. return field_schema( # type: ignore[no-any-return]
  398. field, model_name_map=model_name_map, ref_prefix=REF_PREFIX
  399. )[0]
  400. def get_compat_model_name_map(fields: List[ModelField]) -> ModelNameMap:
  401. models = get_flat_models_from_fields(fields, known_models=set())
  402. return get_model_name_map(models) # type: ignore[no-any-return]
  403. def get_definitions(
  404. *,
  405. fields: List[ModelField],
  406. schema_generator: GenerateJsonSchema,
  407. model_name_map: ModelNameMap,
  408. separate_input_output_schemas: bool = True,
  409. ) -> Tuple[
  410. Dict[
  411. Tuple[ModelField, Literal["validation", "serialization"]], JsonSchemaValue
  412. ],
  413. Dict[str, Dict[str, Any]],
  414. ]:
  415. models = get_flat_models_from_fields(fields, known_models=set())
  416. return {}, get_model_definitions(
  417. flat_models=models, model_name_map=model_name_map
  418. )
  419. def is_scalar_field(field: ModelField) -> bool:
  420. return is_pv1_scalar_field(field)
  421. def is_sequence_field(field: ModelField) -> bool:
  422. return field.shape in sequence_shapes or _annotation_is_sequence(field.type_) # type: ignore[attr-defined]
  423. def is_scalar_sequence_field(field: ModelField) -> bool:
  424. return is_pv1_scalar_sequence_field(field)
  425. def is_bytes_field(field: ModelField) -> bool:
  426. return lenient_issubclass(field.type_, bytes)
  427. def is_bytes_sequence_field(field: ModelField) -> bool:
  428. return field.shape in sequence_shapes and lenient_issubclass(field.type_, bytes) # type: ignore[attr-defined]
  429. def copy_field_info(*, field_info: FieldInfo, annotation: Any) -> FieldInfo:
  430. return copy(field_info)
  431. def serialize_sequence_value(*, field: ModelField, value: Any) -> Sequence[Any]:
  432. return sequence_shape_to_type[field.shape](value) # type: ignore[no-any-return,attr-defined]
  433. def get_missing_field_error(loc: Tuple[str, ...]) -> Dict[str, Any]:
  434. missing_field_error = ErrorWrapper(MissingError(), loc=loc) # type: ignore[call-arg]
  435. new_error = ValidationError([missing_field_error], RequestErrorModel)
  436. return new_error.errors()[0] # type: ignore[return-value]
  437. def create_body_model(
  438. *, fields: Sequence[ModelField], model_name: str
  439. ) -> Type[BaseModel]:
  440. BodyModel = create_model(model_name)
  441. for f in fields:
  442. BodyModel.__fields__[f.name] = f # type: ignore[index]
  443. return BodyModel
  444. def _regenerate_error_with_loc(
  445. *, errors: Sequence[Any], loc_prefix: Tuple[Union[str, int], ...]
  446. ) -> List[Dict[str, Any]]:
  447. updated_loc_errors: List[Any] = [
  448. {**err, "loc": loc_prefix + err.get("loc", ())}
  449. for err in _normalize_errors(errors)
  450. ]
  451. return updated_loc_errors
  452. def _annotation_is_sequence(annotation: Union[Type[Any], None]) -> bool:
  453. if lenient_issubclass(annotation, (str, bytes)):
  454. return False
  455. return lenient_issubclass(annotation, sequence_types)
  456. def field_annotation_is_sequence(annotation: Union[Type[Any], None]) -> bool:
  457. return _annotation_is_sequence(annotation) or _annotation_is_sequence(
  458. get_origin(annotation)
  459. )
  460. def value_is_sequence(value: Any) -> bool:
  461. return isinstance(value, sequence_types) and not isinstance(value, (str, bytes)) # type: ignore[arg-type]
  462. def _annotation_is_complex(annotation: Union[Type[Any], None]) -> bool:
  463. return (
  464. lenient_issubclass(annotation, (BaseModel, Mapping, UploadFile))
  465. or _annotation_is_sequence(annotation)
  466. or is_dataclass(annotation)
  467. )
  468. def field_annotation_is_complex(annotation: Union[Type[Any], None]) -> bool:
  469. origin = get_origin(annotation)
  470. if origin is Union or origin is UnionType:
  471. return any(field_annotation_is_complex(arg) for arg in get_args(annotation))
  472. return (
  473. _annotation_is_complex(annotation)
  474. or _annotation_is_complex(origin)
  475. or hasattr(origin, "__pydantic_core_schema__")
  476. or hasattr(origin, "__get_pydantic_core_schema__")
  477. )
  478. def field_annotation_is_scalar(annotation: Any) -> bool:
  479. # handle Ellipsis here to make tuple[int, ...] work nicely
  480. return annotation is Ellipsis or not field_annotation_is_complex(annotation)
  481. def field_annotation_is_scalar_sequence(annotation: Union[Type[Any], None]) -> bool:
  482. origin = get_origin(annotation)
  483. if origin is Union or origin is UnionType:
  484. at_least_one_scalar_sequence = False
  485. for arg in get_args(annotation):
  486. if field_annotation_is_scalar_sequence(arg):
  487. at_least_one_scalar_sequence = True
  488. continue
  489. elif not field_annotation_is_scalar(arg):
  490. return False
  491. return at_least_one_scalar_sequence
  492. return field_annotation_is_sequence(annotation) and all(
  493. field_annotation_is_scalar(sub_annotation)
  494. for sub_annotation in get_args(annotation)
  495. )
  496. def is_bytes_or_nonable_bytes_annotation(annotation: Any) -> bool:
  497. if lenient_issubclass(annotation, bytes):
  498. return True
  499. origin = get_origin(annotation)
  500. if origin is Union or origin is UnionType:
  501. for arg in get_args(annotation):
  502. if lenient_issubclass(arg, bytes):
  503. return True
  504. return False
  505. def is_uploadfile_or_nonable_uploadfile_annotation(annotation: Any) -> bool:
  506. if lenient_issubclass(annotation, UploadFile):
  507. return True
  508. origin = get_origin(annotation)
  509. if origin is Union or origin is UnionType:
  510. for arg in get_args(annotation):
  511. if lenient_issubclass(arg, UploadFile):
  512. return True
  513. return False
  514. def is_bytes_sequence_annotation(annotation: Any) -> bool:
  515. origin = get_origin(annotation)
  516. if origin is Union or origin is UnionType:
  517. at_least_one = False
  518. for arg in get_args(annotation):
  519. if is_bytes_sequence_annotation(arg):
  520. at_least_one = True
  521. continue
  522. return at_least_one
  523. return field_annotation_is_sequence(annotation) and all(
  524. is_bytes_or_nonable_bytes_annotation(sub_annotation)
  525. for sub_annotation in get_args(annotation)
  526. )
  527. def is_uploadfile_sequence_annotation(annotation: Any) -> bool:
  528. origin = get_origin(annotation)
  529. if origin is Union or origin is UnionType:
  530. at_least_one = False
  531. for arg in get_args(annotation):
  532. if is_uploadfile_sequence_annotation(arg):
  533. at_least_one = True
  534. continue
  535. return at_least_one
  536. return field_annotation_is_sequence(annotation) and all(
  537. is_uploadfile_or_nonable_uploadfile_annotation(sub_annotation)
  538. for sub_annotation in get_args(annotation)
  539. )