| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- # from app.common import s3
- import boto3
- from botocore.client import Config
- from app.common import s3_buckets, s3_clusters, get_cluster_name, s3_users
- import re
- import random
- from typing import List, Union
# Pattern for ``s3://bucket[/key]`` and ``s3a://bucket[/key]`` paths.
__re_s3_path = re.compile(
    "^s3a?://"  # scheme: "s3" or "s3a"
    "([^/]+)"  # group 1: bucket name, everything up to the first slash
    "(?:/(.*))?$"  # group 2: optional key following the first slash
)
def get_s3_config(path: Union[str, List[str]], outside=False):
    """Resolve the single S3 config shared by one or more paths.

    Args:
        path: One path string, or a list of path strings that must all
            resolve to the same bucket config.
        outside: Forwarded to ``__get_s3_config``; when true the
            "outside" endpoints of the cluster are used.

    Returns:
        The dict produced by ``__get_s3_config`` (called with
        ``prefer_ip=True``).

    Raises:
        ValueError: If the paths resolve to different bucket configs, or
            if no path was supplied (e.g. an empty list).
    """
    # isinstance (rather than ``type(path) == str``) so that str subclasses
    # are treated as one path instead of being iterated char by char.
    paths = [path] if isinstance(path, str) else path

    bucket_config = None
    for p in paths:
        bc = __get_s3_bucket_config(p)
        if bucket_config is None:
            # First path seen: remember its config as the reference.
            bucket_config = bc
        elif bucket_config != bc:
            raise ValueError(f"{paths} have different s3 config, cannot read together.")

    if not bucket_config:
        raise ValueError("path is empty.")
    return __get_s3_config(bucket_config, outside, prefer_ip=True)
def __get_s3_config(
    bucket_config: tuple,
    outside: bool,
    prefer_ip=False,
    prefer_auto=False,
):
    """Build the connection settings for a ``(cluster, user)`` bucket config.

    The endpoint list is chosen from the cluster's entry in ``s3_clusters``:
    "outside" when *outside* is true; otherwise "auto" when *prefer_auto* is
    set and the cluster defines it; otherwise "inside" when the cluster's
    "cluster" value equals ``get_cluster_name()``; otherwise "outside".
    With *prefer_ip*, the matching ``"<kind>_ips"`` list is used instead when
    the cluster defines one. One endpoint is picked at random from the list.

    Returns:
        A dict with an "endpoint" entry followed by the entries of
        ``s3_users[user]``.
    """
    cluster_name, user_name = bucket_config
    cluster_cfg = s3_clusters[cluster_name]

    def _endpoint_kind():
        # Guard clauses keep the original precedence; get_cluster_name() is
        # only consulted when neither earlier rule applies.
        if outside:
            return "outside"
        if prefer_auto and "auto" in cluster_cfg:
            return "auto"
        if cluster_cfg.get("cluster") == get_cluster_name():
            return "inside"
        return "outside"

    kind = _endpoint_kind()
    ip_kind = f"{kind}_ips"
    if prefer_ip and ip_kind in cluster_cfg:
        kind = ip_kind

    chosen = random.choice(cluster_cfg[kind])
    return {"endpoint": chosen, **s3_users[user_name]}
def split_s3_path(path: str):
    """Split *path* into a ``(bucket, key)`` pair.

    The key is ``""`` when the path names only a bucket. A path that does
    not match the module's S3 path pattern yields ``("", "")``.
    """
    matched = __re_s3_path.match(path)
    if matched is not None:
        bucket, key = matched.groups()
        # The key group is None when the path has no "/key" part.
        return bucket, key or ""
    return "", ""
def __get_s3_bucket_config(path: str):
    """Return the ``s3_buckets`` entry for the bucket named in *path*.

    Falls back to the ``"[default]"`` entry when *path* is empty, when no
    bucket can be parsed from it, or when the bucket has no (truthy) entry
    of its own.

    Raises:
        AssertionError: If neither the bucket nor ``"[default]"`` has an
            entry in ``s3_buckets``.
    """
    bucket = split_s3_path(path)[0] if path else ""
    bucket_config = s3_buckets.get(bucket) or s3_buckets.get("[default]")
    if bucket_config is None:
        # Explicit raise instead of ``assert``: assert statements are removed
        # under ``python -O``, which would let None leak out to callers.
        raise AssertionError(
            f"no s3 bucket config for {bucket!r} and no '[default]' entry"
        )
    return bucket_config
def get_s3_client(path: Union[str, List[str]], outside=False):
    """Create a boto3 S3 client for the config that *path* resolves to.

    Args:
        path: One path string or a list of path strings, resolved through
            ``get_s3_config``.
        outside: Forwarded to ``get_s3_config``.

    Returns:
        A boto3 "s3" client using path-style addressing and up to 8 retry
        attempts. The "standard" retry mode is requested first; if that
        attempt fails, the client is built without a retry mode.
    """
    s3_config = get_s3_config(path, outside)

    # Read the config keys before the try so a missing key surfaces once,
    # directly, instead of being caught and re-triggered by the fallback.
    client_kwargs = {
        "aws_access_key_id": s3_config["ak"],
        "aws_secret_access_key": s3_config["sk"],
        "endpoint_url": s3_config["endpoint"],
    }

    def _make_client(retries):
        # Single construction path shared by the preferred and fallback tries.
        return boto3.client(
            "s3",
            config=Config(s3={"addressing_style": "path"}, retries=retries),
            **client_kwargs,
        )

    try:
        return _make_client({"max_attempts": 8, "mode": "standard"})
    except Exception:
        # Older boto3 does not support the retries.mode param. ``Exception``
        # (not a bare except) so KeyboardInterrupt/SystemExit propagate.
        return _make_client({"max_attempts": 8})
|