s3.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. # from app.common import s3
  2. import boto3
  3. from botocore.client import Config
  4. import re
  5. import random
  6. from typing import List, Union
  7. try:
  8. from app.config import s3_buckets, s3_clusters, s3_users # TODO delete 循环依赖
  9. from app.common.runtime import get_cluster_name
  10. except ImportError:
  11. from magic_pdf.config import s3_buckets, s3_clusters, get_cluster_name, s3_users
# Matches "s3://<bucket>" or "s3a://<bucket>", optionally followed by "/<key>".
# Group 1 is the bucket name (everything up to the first "/"); group 2 is the
# key and is None when there is no "/" after the bucket.
__re_s3_path = re.compile("^s3a?://([^/]+)(?:/(.*))?$")
  13. def get_s3_config(path: Union[str, List[str]], outside=False):
  14. paths = [path] if type(path) == str else path
  15. bucket_config = None
  16. for p in paths:
  17. bc = __get_s3_bucket_config(p)
  18. if bucket_config in [bc, None]:
  19. bucket_config = bc
  20. continue
  21. raise Exception(f"{paths} have different s3 config, cannot read together.")
  22. if not bucket_config:
  23. raise Exception("path is empty.")
  24. return __get_s3_config(bucket_config, outside, prefer_ip=True)
  25. def __get_s3_config(
  26. bucket_config: tuple,
  27. outside: bool,
  28. prefer_ip=False,
  29. prefer_auto=False,
  30. ):
  31. cluster, user = bucket_config
  32. cluster_config = s3_clusters[cluster]
  33. if outside:
  34. endpoint_key = "outside"
  35. elif prefer_auto and "auto" in cluster_config:
  36. endpoint_key = "auto"
  37. elif cluster_config.get("cluster") == get_cluster_name():
  38. endpoint_key = "inside"
  39. else:
  40. endpoint_key = "outside"
  41. if prefer_ip and f"{endpoint_key}_ips" in cluster_config:
  42. endpoint_key = f"{endpoint_key}_ips"
  43. endpoints = cluster_config[endpoint_key]
  44. endpoint = random.choice(endpoints)
  45. return {"endpoint": endpoint, **s3_users[user]}
  46. def split_s3_path(path: str):
  47. "split bucket and key from path"
  48. m = __re_s3_path.match(path)
  49. if m is None:
  50. return "", ""
  51. return m.group(1), (m.group(2) or "")
  52. def __get_s3_bucket_config(path: str):
  53. bucket = split_s3_path(path)[0] if path else ""
  54. bucket_config = s3_buckets.get(bucket)
  55. if not bucket_config:
  56. bucket_config = s3_buckets.get("[default]")
  57. assert bucket_config is not None
  58. return bucket_config
  59. def get_s3_client(path: Union[str, List[str]], outside=False):
  60. s3_config = get_s3_config(path, outside)
  61. try:
  62. return boto3.client(
  63. "s3",
  64. aws_access_key_id=s3_config["ak"],
  65. aws_secret_access_key=s3_config["sk"],
  66. endpoint_url=s3_config["endpoint"],
  67. config=Config(s3={"addressing_style": "path"}, retries={"max_attempts": 8, "mode": "standard"}),
  68. )
  69. except:
  70. # older boto3 do not support retries.mode param.
  71. return boto3.client(
  72. "s3",
  73. aws_access_key_id=s3_config["ak"],
  74. aws_secret_access_key=s3_config["sk"],
  75. endpoint_url=s3_config["endpoint"],
  76. config=Config(s3={"addressing_style": "path"}, retries={"max_attempts": 8}),
  77. )