s3.py 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. # from app.common import s3
  2. import boto3
  3. from botocore.client import Config
  4. from app.common import s3_buckets, s3_clusters, get_cluster_name, s3_users
  5. import re
  6. import random
  7. from typing import List, Union
# Pattern for "s3://bucket[/key]" and "s3a://bucket[/key]" paths.
# Group 1 captures the bucket (everything up to the first "/"); the optional
# group 2 captures whatever follows that "/" and is None when absent.
__re_s3_path = re.compile("^s3a?://([^/]+)(?:/(.*))?$")
  9. def get_s3_config(path: Union[str, List[str]], outside=False):
  10. paths = [path] if type(path) == str else path
  11. bucket_config = None
  12. for p in paths:
  13. bc = __get_s3_bucket_config(p)
  14. if bucket_config in [bc, None]:
  15. bucket_config = bc
  16. continue
  17. raise Exception(f"{paths} have different s3 config, cannot read together.")
  18. if not bucket_config:
  19. raise Exception("path is empty.")
  20. return __get_s3_config(bucket_config, outside, prefer_ip=True)
  21. def __get_s3_config(
  22. bucket_config: tuple,
  23. outside: bool,
  24. prefer_ip=False,
  25. prefer_auto=False,
  26. ):
  27. cluster, user = bucket_config
  28. cluster_config = s3_clusters[cluster]
  29. if outside:
  30. endpoint_key = "outside"
  31. elif prefer_auto and "auto" in cluster_config:
  32. endpoint_key = "auto"
  33. elif cluster_config.get("cluster") == get_cluster_name():
  34. endpoint_key = "inside"
  35. else:
  36. endpoint_key = "outside"
  37. if prefer_ip and f"{endpoint_key}_ips" in cluster_config:
  38. endpoint_key = f"{endpoint_key}_ips"
  39. endpoints = cluster_config[endpoint_key]
  40. endpoint = random.choice(endpoints)
  41. return {"endpoint": endpoint, **s3_users[user]}
  42. def split_s3_path(path: str):
  43. "split bucket and key from path"
  44. m = __re_s3_path.match(path)
  45. if m is None:
  46. return "", ""
  47. return m.group(1), (m.group(2) or "")
  48. def __get_s3_bucket_config(path: str):
  49. bucket = split_s3_path(path)[0] if path else ""
  50. bucket_config = s3_buckets.get(bucket)
  51. if not bucket_config:
  52. bucket_config = s3_buckets.get("[default]")
  53. assert bucket_config is not None
  54. return bucket_config
  55. def get_s3_client(path: Union[str, List[str]], outside=False):
  56. s3_config = get_s3_config(path, outside)
  57. try:
  58. return boto3.client(
  59. "s3",
  60. aws_access_key_id=s3_config["ak"],
  61. aws_secret_access_key=s3_config["sk"],
  62. endpoint_url=s3_config["endpoint"],
  63. config=Config(s3={"addressing_style": "path"}, retries={"max_attempts": 8, "mode": "standard"}),
  64. )
  65. except:
  66. # older boto3 do not support retries.mode param.
  67. return boto3.client(
  68. "s3",
  69. aws_access_key_id=s3_config["ak"],
  70. aws_secret_access_key=s3_config["sk"],
  71. endpoint_url=s3_config["endpoint"],
  72. config=Config(s3={"addressing_style": "path"}, retries={"max_attempts": 8}),
  73. )