s3.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. # from app.common import s3
  2. import boto3
  3. from botocore.client import Config
  4. import re
  5. import random
  6. from typing import List, Union
  7. try:
  8. from app.config import s3_buckets, s3_clusters, get_cluster_name, s3_users
  9. except ImportError:
  10. from magic_pdf.spark import s3_buckets, s3_clusters, get_cluster_name, s3_users
  11. __re_s3_path = re.compile("^s3a?://([^/]+)(?:/(.*))?$")
  12. def get_s3_config(path: Union[str, List[str]], outside=False):
  13. paths = [path] if type(path) == str else path
  14. bucket_config = None
  15. for p in paths:
  16. bc = __get_s3_bucket_config(p)
  17. if bucket_config in [bc, None]:
  18. bucket_config = bc
  19. continue
  20. raise Exception(f"{paths} have different s3 config, cannot read together.")
  21. if not bucket_config:
  22. raise Exception("path is empty.")
  23. return __get_s3_config(bucket_config, outside, prefer_ip=True)
  24. def __get_s3_config(
  25. bucket_config: tuple,
  26. outside: bool,
  27. prefer_ip=False,
  28. prefer_auto=False,
  29. ):
  30. cluster, user = bucket_config
  31. cluster_config = s3_clusters[cluster]
  32. if outside:
  33. endpoint_key = "outside"
  34. elif prefer_auto and "auto" in cluster_config:
  35. endpoint_key = "auto"
  36. elif cluster_config.get("cluster") == get_cluster_name():
  37. endpoint_key = "inside"
  38. else:
  39. endpoint_key = "outside"
  40. if prefer_ip and f"{endpoint_key}_ips" in cluster_config:
  41. endpoint_key = f"{endpoint_key}_ips"
  42. endpoints = cluster_config[endpoint_key]
  43. endpoint = random.choice(endpoints)
  44. return {"endpoint": endpoint, **s3_users[user]}
  45. def split_s3_path(path: str):
  46. "split bucket and key from path"
  47. m = __re_s3_path.match(path)
  48. if m is None:
  49. return "", ""
  50. return m.group(1), (m.group(2) or "")
  51. def __get_s3_bucket_config(path: str):
  52. bucket = split_s3_path(path)[0] if path else ""
  53. bucket_config = s3_buckets.get(bucket)
  54. if not bucket_config:
  55. bucket_config = s3_buckets.get("[default]")
  56. assert bucket_config is not None
  57. return bucket_config
  58. def get_s3_client(path: Union[str, List[str]], outside=False):
  59. s3_config = get_s3_config(path, outside)
  60. try:
  61. return boto3.client(
  62. "s3",
  63. aws_access_key_id=s3_config["ak"],
  64. aws_secret_access_key=s3_config["sk"],
  65. endpoint_url=s3_config["endpoint"],
  66. config=Config(s3={"addressing_style": "path"}, retries={"max_attempts": 8, "mode": "standard"}),
  67. )
  68. except:
  69. # older boto3 do not support retries.mode param.
  70. return boto3.client(
  71. "s3",
  72. aws_access_key_id=s3_config["ak"],
  73. aws_secret_access_key=s3_config["sk"],
  74. endpoint_url=s3_config["endpoint"],
  75. config=Config(s3={"addressing_style": "path"}, retries={"max_attempts": 8}),
  76. )