networks.py 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747
  1. import re
  2. from ipaddress import (
  3. IPv4Address,
  4. IPv4Interface,
  5. IPv4Network,
  6. IPv6Address,
  7. IPv6Interface,
  8. IPv6Network,
  9. _BaseAddress,
  10. _BaseNetwork,
  11. )
  12. from typing import (
  13. TYPE_CHECKING,
  14. Any,
  15. Collection,
  16. Dict,
  17. Generator,
  18. List,
  19. Match,
  20. Optional,
  21. Pattern,
  22. Set,
  23. Tuple,
  24. Type,
  25. Union,
  26. cast,
  27. no_type_check,
  28. )
  29. from pydantic.v1 import errors
  30. from pydantic.v1.utils import Representation, update_not_none
  31. from pydantic.v1.validators import constr_length_validator, str_validator
  if TYPE_CHECKING:
      # This branch is only seen by static type checkers; nothing here runs at import time.
      import email_validator
      from typing_extensions import TypedDict

      from pydantic.v1.config import BaseConfig
      from pydantic.v1.fields import ModelField
      from pydantic.v1.typing import AnyCallable

      # Return type of the `__get_validators__` hooks defined in this module.
      CallableGenerator = Generator[AnyCallable, None, None]

      class Parts(TypedDict, total=False):
          # Keys mirror the named groups of the URL regular expressions in this module.
          scheme: str
          user: Optional[str]
          password: Optional[str]
          ipv4: Optional[str]
          ipv6: Optional[str]
          domain: Optional[str]
          port: Optional[str]
          path: Optional[str]
          query: Optional[str]
          fragment: Optional[str]

      class HostParts(TypedDict, total=False):
          # One validated host entry, as collected by `MultiHostDsn._build_url`.
          host: str
          tld: Optional[str]
          host_type: Optional[str]
          port: Optional[str]
          rebuild: bool

  else:
      # At runtime the optional dependency starts out unset; `import_email_validator`
      # rebinds this global on demand.
      email_validator = None

      # Runtime stand-in for the TypedDict above: a plain dict subclass.
      class Parts(dict):
          pass
  # Accepted input for the interface/network validators: a single value or a 2-tuple.
  NetworkType = Union[str, bytes, int, Tuple[Union[str, bytes, int], Union[str, int]]]

  __all__ = [
      'AnyUrl',
      'AnyHttpUrl',
      'FileUrl',
      'HttpUrl',
      'stricturl',
      'EmailStr',
      'NameEmail',
      'IPvAnyAddress',
      'IPvAnyInterface',
      'IPvAnyNetwork',
      'PostgresDsn',
      'CockroachDsn',
      'AmqpDsn',
      'RedisDsn',
      'MongoDsn',
      'KafkaDsn',
      'validate_email',
  ]

  # Lazily filled caches for the compiled patterns; see the `*_regex()` accessors below.
  _url_regex_cache = None
  _multi_host_url_regex_cache = None
  _ascii_domain_regex_cache = None
  _int_domain_regex_cache = None
  _host_regex_cache = None

  # Host section: exactly one of ipv4 / ipv6 / domain (all optional), then an optional port.
  _host_regex = (
      r'(?:'
      r'(?P<ipv4>(?:\d{1,3}\.){3}\d{1,3})(?=$|[/:#?])|'  # ipv4
      r'(?P<ipv6>\[[A-F0-9]*:[A-F0-9:]+\])(?=$|[/:#?])|'  # ipv6
      r'(?P<domain>[^\s/:?#]+)'  # domain, validation occurs later
      r')?'
      r'(?::(?P<port>\d+))?'  # port
  )
  # Every fragment below is optional, so the combined URL pattern can match any input;
  # missing pieces show up as `None` groups and are rejected during validation.
  _scheme_regex = r'(?:(?P<scheme>[a-z][a-z0-9+\-.]+)://)?'  # scheme https://tools.ietf.org/html/rfc3986#appendix-A
  _user_info_regex = r'(?:(?P<user>[^\s:/]*)(?::(?P<password>[^\s/]*))?@)?'
  _path_regex = r'(?P<path>/[^\s?#]*)?'
  _query_regex = r'(?:\?(?P<query>[^\s#]*))?'
  _fragment_regex = r'(?:#(?P<fragment>[^\s#]*))?'
  def url_regex() -> Pattern[str]:
      """
      Return the compiled single-host URL pattern.

      The pattern is compiled (case-insensitively) on the first call and the same
      object is handed back on every later call.
      """
      global _url_regex_cache
      if _url_regex_cache is not None:
          return _url_regex_cache

      pattern = rf'{_scheme_regex}{_user_info_regex}{_host_regex}{_path_regex}{_query_regex}{_fragment_regex}'
      _url_regex_cache = re.compile(pattern, re.IGNORECASE)
      return _url_regex_cache
  def multi_host_url_regex() -> Pattern[str]:
      """
      Return the compiled multi-host URL pattern.

      Unlike `url_regex`, the host section is captured as one raw `hosts` group so that
      several hosts can be listed, e.g. ``host1.db.net,host2.db.net``. The pattern is
      compiled on first use and cached afterwards.
      """
      global _multi_host_url_regex_cache
      if _multi_host_url_regex_cache is not None:
          return _multi_host_url_regex_cache

      pattern = (
          rf'{_scheme_regex}{_user_info_regex}'
          r'(?P<hosts>([^/]*))'  # validation occurs later
          rf'{_path_regex}{_query_regex}{_fragment_regex}'
      )
      _multi_host_url_regex_cache = re.compile(pattern, re.IGNORECASE)
      return _multi_host_url_regex_cache
  def ascii_domain_regex() -> Pattern[str]:
      """
      Return the compiled pattern for ASCII domain names.

      The optional top-level domain (including its leading dot) is exposed as the
      `tld` group. The pattern is compiled on first use and cached afterwards.
      """
      global _ascii_domain_regex_cache
      if _ascii_domain_regex_cache is not None:
          return _ascii_domain_regex_cache

      ascii_chunk = r'[_0-9a-z](?:[-_0-9a-z]{0,61}[_0-9a-z])?'
      ascii_domain_ending = r'(?P<tld>\.[a-z]{2,63})?\.?'
      pattern = fr'(?:{ascii_chunk}\.)*?{ascii_chunk}{ascii_domain_ending}'
      _ascii_domain_regex_cache = re.compile(pattern, re.IGNORECASE)
      return _ascii_domain_regex_cache
  def int_domain_regex() -> Pattern[str]:
      """
      Return the compiled pattern for international (non-ASCII) domain names.

      The optional top-level domain (including its leading dot) is exposed as the
      `tld` group. The pattern is compiled on first use and cached afterwards.
      """
      global _int_domain_regex_cache
      if _int_domain_regex_cache is not None:
          return _int_domain_regex_cache

      int_chunk = r'[_0-9a-\U00040000](?:[-_0-9a-\U00040000]{0,61}[_0-9a-\U00040000])?'
      int_domain_ending = r'(?P<tld>(\.[^\W\d_]{2,63})|(\.(?:xn--)[_0-9a-z-]{2,63}))?\.?'
      pattern = fr'(?:{int_chunk}\.)*?{int_chunk}{int_domain_ending}'
      _int_domain_regex_cache = re.compile(pattern, re.IGNORECASE)
      return _int_domain_regex_cache
  def host_regex() -> Pattern[str]:
      """
      Return the compiled pattern for a single ``host[:port]`` entry.

      Built from the module-level `_host_regex` source, case-insensitively, on first
      use; later calls return the cached object.
      """
      global _host_regex_cache
      if _host_regex_cache is not None:
          return _host_regex_cache

      _host_regex_cache = re.compile(_host_regex, re.IGNORECASE)
      return _host_regex_cache
  class AnyUrl(str):
      """
      A `str` subclass that also stores the individual components of a URL.

      The class attributes below configure validation and are intended to be overridden
      by subclasses: length limits, `allowed_schemes`, whether a TLD / user / host is
      required, and `hidden_parts` (parts omitted by `build` when they equal the default).
      """

      strip_whitespace = True
      min_length = 1
      max_length = 2**16
      allowed_schemes: Optional[Collection[str]] = None
      tld_required: bool = False
      user_required: bool = False
      host_required: bool = True
      hidden_parts: Set[str] = set()

      __slots__ = ('scheme', 'user', 'password', 'host', 'tld', 'host_type', 'port', 'path', 'query', 'fragment')

      @no_type_check
      def __new__(cls, url: Optional[str], **kwargs) -> object:
          # With no ready-made string, assemble one from the keyword parts.
          text = cls.build(**kwargs) if url is None else url
          return str.__new__(cls, text)

      def __init__(
          self,
          url: str,
          *,
          scheme: str,
          user: Optional[str] = None,
          password: Optional[str] = None,
          host: Optional[str] = None,
          tld: Optional[str] = None,
          host_type: str = 'domain',
          port: Optional[str] = None,
          path: Optional[str] = None,
          query: Optional[str] = None,
          fragment: Optional[str] = None,
      ) -> None:
          """Store the already-parsed URL components on the instance."""
          str.__init__(url)
          self.scheme, self.user, self.password = scheme, user, password
          self.host, self.tld, self.host_type = host, tld, host_type
          self.port, self.path = port, path
          self.query, self.fragment = query, fragment

      @classmethod
      def build(
          cls,
          *,
          scheme: str,
          user: Optional[str] = None,
          password: Optional[str] = None,
          host: str,
          port: Optional[str] = None,
          path: Optional[str] = None,
          query: Optional[str] = None,
          fragment: Optional[str] = None,
          **_kwargs: str,
      ) -> str:
          """
          Assemble a URL string from its components.

          The port is left out when `'port'` is listed in `hidden_parts` and the given
          port equals the default reported by `get_default_parts`.
          """
          parts = Parts(
              scheme=scheme,
              user=user,
              password=password,
              host=host,
              port=port,
              path=path,
              query=query,
              fragment=fragment,
              **_kwargs,  # type: ignore[misc]
          )

          result = scheme + '://'
          if user:
              result += user
          if password:
              result += ':' + password
          if user or password:
              result += '@'
          result += host

          if port:
              # The default lookup only happens when the port is a hideable part.
              is_hidden_default = 'port' in cls.hidden_parts and cls.get_default_parts(parts).get('port') == port
              if not is_hidden_default:
                  result += ':' + port

          if path:
              result += path
          if query:
              result += '?' + query
          if fragment:
              result += '#' + fragment
          return result

      @classmethod
      def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
          update_not_none(field_schema, minLength=cls.min_length, maxLength=cls.max_length, format='uri')

      @classmethod
      def __get_validators__(cls) -> 'CallableGenerator':
          yield cls.validate

      @classmethod
      def validate(cls, value: Any, field: 'ModelField', config: 'BaseConfig') -> 'AnyUrl':
          """
          Validate `value` and return an instance of this class.

          A value whose class is exactly `cls` is returned untouched. Otherwise the value
          is coerced to `str`, optionally stripped, length-checked, matched against the
          URL pattern and its parts validated.
          """
          if value.__class__ == cls:
              return value

          text = str_validator(value)
          if cls.strip_whitespace:
              text = text.strip()
          url: str = cast(str, constr_length_validator(text, field, config))

          match = cls._match_url(url)
          # the regex should always match, if it doesn't please report with details of the URL tried
          assert match, 'URL regex failed unexpectedly'

          raw_parts = cast('Parts', match.groupdict())
          parts = cls.validate_parts(cls.apply_default_parts(raw_parts))

          # Anything the pattern did not consume is reported as extra characters.
          consumed = match.end()
          if consumed != len(url):
              raise errors.UrlExtraError(extra=url[consumed:])

          return cls._build_url(match, url, parts)

      @classmethod
      def _build_url(cls, m: Match[str], url: str, parts: 'Parts') -> 'AnyUrl':
          """
          Validate the host and construct the instance.

          Kept separate from `validate` so that `MultiHostDsn` can override it.
          """
          host, tld, host_type, rebuild = cls.validate_host(parts)

          # Passing `None` makes `__new__` re-assemble the string from the parts.
          source = None if rebuild else url
          return cls(
              source,
              scheme=parts['scheme'],
              user=parts['user'],
              password=parts['password'],
              host=host,
              tld=tld,
              host_type=host_type,
              port=parts['port'],
              path=parts['path'],
              query=parts['query'],
              fragment=parts['fragment'],
          )

      @staticmethod
      def _match_url(url: str) -> Optional[Match[str]]:
          return url_regex().match(url)

      @staticmethod
      def _validate_port(port: Optional[str]) -> None:
          """Raise `UrlPortError` when a port is given and exceeds 65535."""
          if port is None:
              return
          if int(port) > 65_535:
              raise errors.UrlPortError()

      @classmethod
      def validate_parts(cls, parts: 'Parts', validate_port: bool = True) -> 'Parts':
          """
          Check scheme, port (optionally) and user info of the parsed parts.

          May be overridden, e.g. to fill in default values for missing parts.
          """
          scheme = parts['scheme']
          if scheme is None:
              raise errors.UrlSchemeError()

          permitted = cls.allowed_schemes
          if permitted and scheme.lower() not in permitted:
              raise errors.UrlSchemePermittedError(set(permitted))

          if validate_port:
              cls._validate_port(parts['port'])

          user = parts['user']
          if cls.user_required and user is None:
              raise errors.UrlUserInfoError()

          return parts

      @classmethod
      def validate_host(cls, parts: 'Parts') -> Tuple[str, Optional[str], str, bool]:
          """
          Pick the host out of `parts` and validate it.

          Returns `(host, tld, host_type, rebuild)`; `rebuild` is true when the host was
          IDNA-encoded and the URL string therefore has to be re-assembled.
          """
          tld = None
          host_type = None
          rebuild = False

          # First non-empty candidate wins; if none is set, `host` keeps the last value read.
          for kind in ('domain', 'ipv4', 'ipv6'):
              host = parts[kind]  # type: ignore[literal-required]
              if host:
                  host_type = kind
                  break

          if host is None:
              if cls.host_required:
                  raise errors.UrlHostError()
          elif host_type == 'domain':
              is_international = False
              found = ascii_domain_regex().fullmatch(host)
              if found is None:
                  found = int_domain_regex().fullmatch(host)
                  if found is None:
                      raise errors.UrlHostError()
                  is_international = True

              tld = found.group('tld')
              if tld is None and not is_international:
                  # No ASCII tld found: retry with the international pattern.
                  found = int_domain_regex().fullmatch(host)
                  assert found is not None
                  tld = found.group('tld')
                  is_international = True

              if tld is not None:
                  tld = tld[1:]  # drop the leading dot
              elif cls.tld_required:
                  raise errors.UrlHostTldError()

              if is_international:
                  host_type = 'int_domain'
                  rebuild = True
                  host = host.encode('idna').decode('ascii')
                  if tld is not None:
                      tld = tld.encode('idna').decode('ascii')

          return host, tld, host_type, rebuild  # type: ignore

      @staticmethod
      def get_default_parts(parts: 'Parts') -> 'Parts':
          """Return default values for URL parts; the base class defines none."""
          return {}

      @classmethod
      def apply_default_parts(cls, parts: 'Parts') -> 'Parts':
          """Fill every empty part that has a default, mutating and returning `parts`."""
          defaults = cls.get_default_parts(parts)
          for key, fallback in defaults.items():
              if not parts[key]:  # type: ignore[literal-required]
                  parts[key] = fallback  # type: ignore[literal-required]
          return parts

      def __repr__(self) -> str:
          pairs = ((name, getattr(self, name)) for name in self.__slots__)
          extra = ', '.join(f'{name}={val!r}' for name, val in pairs if val is not None)
          return f'{self.__class__.__name__}({super().__repr__()}, {extra})'
  class AnyHttpUrl(AnyUrl):
      """URL restricted to the `http` and `https` schemes."""

      __slots__ = ()
      allowed_schemes = {'http', 'https'}
  class HttpUrl(AnyHttpUrl):
      """
      HTTP(S) URL that requires a top-level domain and hides the default port.

      The port defaults to `80` for `http` and `443` otherwise; `build` omits it when it
      equals that default because `'port'` is listed in `hidden_parts`.
      """

      tld_required = True
      # https://stackoverflow.com/questions/417142/what-is-the-maximum-length-of-a-url-in-different-browsers
      max_length = 2083
      hidden_parts = {'port'}

      @staticmethod
      def get_default_parts(parts: 'Parts') -> 'Parts':
          """
          Return the default port for the scheme found in `parts`.

          The scheme is compared case-insensitively: the URL pattern is compiled with
          `re.IGNORECASE` and `validate_parts` lower-cases the scheme before checking it,
          so `HTTP://...` is a valid plain-http URL and must default to port 80.
          A missing scheme (`None`) falls through to `443`; it is rejected later by
          `validate_parts`.
          """
          scheme = (parts['scheme'] or '').lower()
          return {'port': '80' if scheme == 'http' else '443'}
  class FileUrl(AnyUrl):
      """URL restricted to the `file` scheme; a host is not required."""

      __slots__ = ()
      allowed_schemes = {'file'}
      host_required = False
  class MultiHostDsn(AnyUrl):
      """
      URL type whose host section may contain several comma-separated hosts.

      With more than one host, the validated entries are stored in `hosts` and the
      single-host attributes (`host`, `tld`, `port`) stay unset. With exactly one host
      the instance is populated like a plain `AnyUrl`.
      """

      __slots__ = AnyUrl.__slots__ + ('hosts',)

      def __init__(self, *args: Any, hosts: Optional[List['HostParts']] = None, **kwargs: Any):
          super().__init__(*args, **kwargs)
          self.hosts = hosts

      @staticmethod
      def _match_url(url: str) -> Optional[Match[str]]:
          return multi_host_url_regex().match(url)

      @classmethod
      def validate_parts(cls, parts: 'Parts', validate_port: bool = True) -> 'Parts':
          # The multi-host pattern has no `port` group; ports are checked per host in `_build_url`.
          return super().validate_parts(parts, validate_port=False)

      @classmethod
      def _build_url(cls, m: Match[str], url: str, parts: 'Parts') -> 'MultiHostDsn':
          """
          Validate every host of the `hosts` group and construct the instance.

          Raises whatever `validate_host` / `_validate_port` raise for an invalid entry.
          """
          hosts_parts: List['HostParts'] = []
          host_re = host_regex()
          # NOTE(review): only the prefix matched by `host_re` is used; trailing characters
          # of an entry are not reported here - confirm whether that is intended.
          for raw_host in m.groupdict()['hosts'].split(','):
              d: Parts = host_re.match(raw_host).groupdict()  # type: ignore
              host, tld, host_type, rebuild = cls.validate_host(d)
              port = d.get('port')
              cls._validate_port(port)
              hosts_parts.append(
                  {
                      'host': host,
                      'host_type': host_type,
                      'tld': tld,
                      'rebuild': rebuild,
                      'port': port,
                  }
              )

          if len(hosts_parts) > 1:
              if any(hp['rebuild'] for hp in hosts_parts):
                  # At least one host was IDNA-encoded, so the string must be re-assembled.
                  # This cannot be delegated to `__new__` by passing `None`: `build` requires
                  # a `host` keyword, which is deliberately not passed for multi-host URLs.
                  netloc = ','.join(
                      (hp['host'] or '') + (':' + hp['port'] if hp['port'] else '') for hp in hosts_parts
                  )
                  url = cls.build(
                      scheme=parts['scheme'],
                      user=parts['user'],
                      password=parts['password'],
                      host=netloc,
                      path=parts['path'],
                      query=parts['query'],
                      fragment=parts['fragment'],
                  )
              return cls(
                  url,
                  scheme=parts['scheme'],
                  user=parts['user'],
                  password=parts['password'],
                  path=parts['path'],
                  query=parts['query'],
                  fragment=parts['fragment'],
                  host_type=None,
                  hosts=hosts_parts,
              )

          # backwards compatibility with single host
          host_part = hosts_parts[0]
          return cls(
              None if host_part['rebuild'] else url,
              scheme=parts['scheme'],
              user=parts['user'],
              password=parts['password'],
              host=host_part['host'],
              tld=host_part['tld'],
              host_type=host_part['host_type'],
              port=host_part.get('port'),
              path=parts['path'],
              query=parts['query'],
              fragment=parts['fragment'],
          )
  class PostgresDsn(MultiHostDsn):
      """Multi-host DSN for PostgreSQL schemes; user info is mandatory."""

      __slots__ = ()
      user_required = True
      allowed_schemes = {
          'postgres',
          'postgresql',
          'postgresql+asyncpg',
          'postgresql+pg8000',
          'postgresql+psycopg',
          'postgresql+psycopg2',
          'postgresql+psycopg2cffi',
          'postgresql+py-postgresql',
          'postgresql+pygresql',
      }
  class CockroachDsn(AnyUrl):
      """DSN for CockroachDB schemes; user info is mandatory."""

      user_required = True
      allowed_schemes = {
          'cockroachdb',
          'cockroachdb+psycopg2',
          'cockroachdb+asyncpg',
      }
  class AmqpDsn(AnyUrl):
      """DSN for the `amqp` / `amqps` schemes; a host is not required."""

      host_required = False
      allowed_schemes = {'amqp', 'amqps'}
  class RedisDsn(AnyUrl):
      """DSN for the `redis` / `rediss` schemes with defaults for host, port and path."""

      __slots__ = ()
      allowed_schemes = {'redis', 'rediss'}
      host_required = False

      @staticmethod
      def get_default_parts(parts: 'Parts') -> 'Parts':
          """
          Return the defaults: port `6379`, path `/0`, and domain `localhost`.

          The domain default is an empty string when an IPv4 or IPv6 host is present,
          so that it does not take the place of the IP address.
          """
          has_ip_host = parts['ipv4'] or parts['ipv6']
          return {
              'domain': '' if has_ip_host else 'localhost',
              'port': '6379',
              'path': '/0',
          }
  class MongoDsn(AnyUrl):
      """DSN for the `mongodb` scheme with a default port of `27017`."""

      allowed_schemes = {'mongodb'}

      # TODO: Needed to generic "Parts" for "Replica Set", "Sharded Cluster", and other mongodb deployment modes
      @staticmethod
      def get_default_parts(parts: 'Parts') -> 'Parts':
          """Return the default port; no other part has a default."""
          defaults = {'port': '27017'}
          return defaults
  class KafkaDsn(AnyUrl):
      """DSN for the `kafka` scheme with defaults for host (`localhost`) and port (`9092`)."""

      allowed_schemes = {'kafka'}

      @staticmethod
      def get_default_parts(parts: 'Parts') -> 'Parts':
          """
          Return the default domain and port.

          `validate_host` looks at `domain` before `ipv4` / `ipv6`, so defaulting the
          domain to `localhost` unconditionally would replace an IP host given in the
          URL. As in `RedisDsn`, the domain default is therefore an empty string when
          an IP host is present.
          """
          has_ip_host = parts.get('ipv4') or parts.get('ipv6')
          return {
              'domain': '' if has_ip_host else 'localhost',
              'port': '9092',
          }
  def stricturl(
      *,
      strip_whitespace: bool = True,
      min_length: int = 1,
      max_length: int = 2**16,
      tld_required: bool = True,
      host_required: bool = True,
      allowed_schemes: Optional[Collection[str]] = None,
  ) -> Type[AnyUrl]:
      """
      Create an `AnyUrl` subclass named `UrlValue` with the given validation settings.

      Each keyword argument becomes the class attribute of the same name.
      """
      # use kwargs then define conf in a dict to aid with IDE type hinting
      namespace = {
          'strip_whitespace': strip_whitespace,
          'min_length': min_length,
          'max_length': max_length,
          'tld_required': tld_required,
          'host_required': host_required,
          'allowed_schemes': allowed_schemes,
      }
      return type('UrlValue', (AnyUrl,), namespace)
  def import_email_validator() -> None:
      """
      Import the optional `email_validator` package into the module-level global.

      Raises:
          ImportError: with installation instructions when the package is missing.
      """
      global email_validator
      try:
          import email_validator
      except ImportError as exc:
          message = 'email-validator is not installed, run `pip install pydantic[email]`'
          raise ImportError(message) from exc
  class EmailStr(str):
      """String type validated as an email address via `validate_email`."""

      @classmethod
      def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
          field_schema.update({'type': 'string', 'format': 'email'})

      @classmethod
      def __get_validators__(cls) -> 'CallableGenerator':
          # included here and below so the error happens straight away
          import_email_validator()

          yield from (str_validator, cls.validate)

      @classmethod
      def validate(cls, value: Union[str]) -> str:
          """Return the email component of the validated `(name, email)` pair."""
          _name, email = validate_email(value)
          return email
  class NameEmail(Representation):
      """A display name paired with an email address, rendered as ``name <email>``."""

      __slots__ = 'name', 'email'

      def __init__(self, name: str, email: str):
          self.name, self.email = name, email

      def __eq__(self, other: Any) -> bool:
          # Only another NameEmail with the same name and email compares equal.
          if not isinstance(other, NameEmail):
              return False
          return (self.name, self.email) == (other.name, other.email)

      @classmethod
      def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
          field_schema.update({'type': 'string', 'format': 'name-email'})

      @classmethod
      def __get_validators__(cls) -> 'CallableGenerator':
          import_email_validator()

          yield cls.validate

      @classmethod
      def validate(cls, value: Any) -> 'NameEmail':
          """Return `value` unchanged if it is already of this class, else parse it."""
          if value.__class__ == cls:
              return value
          text = str_validator(value)
          name, email = validate_email(text)
          return cls(name, email)

      def __str__(self) -> str:
          return f'{self.name} <{self.email}>'
  class IPvAnyAddress(_BaseAddress):
      """Field type that validates to an `IPv4Address` or, failing that, an `IPv6Address`."""

      __slots__ = ()

      @classmethod
      def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
          field_schema.update({'type': 'string', 'format': 'ipvanyaddress'})

      @classmethod
      def __get_validators__(cls) -> 'CallableGenerator':
          yield cls.validate

      @classmethod
      def validate(cls, value: Union[str, bytes, int]) -> Union[IPv4Address, IPv6Address]:
          """
          Parse `value` as IPv4 first, then as IPv6.

          Raises:
              errors.IPvAnyAddressError: when neither parser accepts the value.
          """
          try:
              address_v4 = IPv4Address(value)
          except ValueError:
              pass
          else:
              return address_v4

          try:
              return IPv6Address(value)
          except ValueError:
              raise errors.IPvAnyAddressError()
  class IPvAnyInterface(_BaseAddress):
      """Field type that validates to an `IPv4Interface` or, failing that, an `IPv6Interface`."""

      __slots__ = ()

      @classmethod
      def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
          field_schema.update({'type': 'string', 'format': 'ipvanyinterface'})

      @classmethod
      def __get_validators__(cls) -> 'CallableGenerator':
          yield cls.validate

      @classmethod
      def validate(cls, value: NetworkType) -> Union[IPv4Interface, IPv6Interface]:
          """
          Parse `value` as an IPv4 interface first, then as an IPv6 interface.

          Raises:
              errors.IPvAnyInterfaceError: when neither parser accepts the value.
          """
          try:
              interface_v4 = IPv4Interface(value)
          except ValueError:
              pass
          else:
              return interface_v4

          try:
              return IPv6Interface(value)
          except ValueError:
              raise errors.IPvAnyInterfaceError()
  class IPvAnyNetwork(_BaseNetwork):  # type: ignore
      """Field type that validates to an `IPv4Network` or, failing that, an `IPv6Network`."""

      @classmethod
      def __modify_schema__(cls, field_schema: Dict[str, Any]) -> None:
          field_schema.update({'type': 'string', 'format': 'ipvanynetwork'})

      @classmethod
      def __get_validators__(cls) -> 'CallableGenerator':
          yield cls.validate

      @classmethod
      def validate(cls, value: NetworkType) -> Union[IPv4Network, IPv6Network]:
          """
          Parse `value` as an IPv4 network first, then as an IPv6 network.

          Raises:
              errors.IPvAnyNetworkError: when neither parser accepts the value.
          """
          # Assume IP Network is defined with a default value for ``strict`` argument.
          # Define your own class if you want to specify network address check strictness.
          try:
              network_v4 = IPv4Network(value)
          except ValueError:
              pass
          else:
              return network_v4

          try:
              return IPv6Network(value)
          except ValueError:
              raise errors.IPvAnyNetworkError()
  # Matches "Name <address>" input: group 1 is the (possibly empty) name made of word
  # characters and spaces, group 2 is everything between the angle brackets.
  pretty_email_regex = re.compile(r'([\w ]*?) *<(.*)> *')
  MAX_EMAIL_LENGTH = 2048
  """Maximum length for an email.
  A somewhat arbitrary but very generous number compared to what is allowed by most implementations.
  """
  def validate_email(value: Union[str]) -> Tuple[str, str]:
      """
      Validate an email address using https://pypi.org/project/email-validator/

      Returns a `(name, email)` tuple. When the input carries no name, the local part
      of the address is used as the name.

      Notes:
      * raw ip address (literal) domain parts are not allowed.
      * "John Doe <local_part@domain.com>" style "pretty" email addresses are processed
      * spaces are stripped from the beginning and end of addresses but no error is raised

      Raises:
          errors.EmailError: when the input is longer than `MAX_EMAIL_LENGTH` or is
              rejected by email-validator.
      """
      if email_validator is None:
          import_email_validator()

      if len(value) > MAX_EMAIL_LENGTH:
          raise errors.EmailError()

      name: Union[str, None] = None
      pretty = pretty_email_regex.fullmatch(value)
      if pretty:
          name, value = pretty.groups()
      email = value.strip()

      try:
          parts = email_validator.validate_email(email, check_deliverability=False)
      except email_validator.EmailNotValidError as exc:
          raise errors.EmailError from exc

      if not hasattr(parts, 'normalized'):
          # email-validator >1, <2
          split_at = email.index('@')
          local_part = email[:split_at]  # RFC 5321, local part must be case-sensitive.
          global_part = email[split_at:].lower()
          return name or local_part, local_part + global_part

      # email-validator >= 2
      email = parts.normalized
      assert email is not None
      name = name or parts.local_part
      return name, email
  613. at_index = email.index('@')
  614. local_part = email[:at_index] # RFC 5321, local part must be case-sensitive.
  615. global_part = email[at_index:].lower()
  616. return name or local_part, local_part + global_part