| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
- from magic_pdf.libs.commons import parse_aws_param, parse_bucket_key, join_path
- import boto3
- from loguru import logger
- from botocore.config import Config
- class S3ReaderWriter(AbsReaderWriter):
- def __init__(
- self,
- ak: str,
- sk: str,
- endpoint_url: str,
- addressing_style: str = "auto",
- parent_path: str = "",
- ):
- self.client = self._get_client(ak, sk, endpoint_url, addressing_style)
- self.path = parent_path
- def _get_client(self, ak: str, sk: str, endpoint_url: str, addressing_style: str):
- s3_client = boto3.client(
- service_name="s3",
- aws_access_key_id=ak,
- aws_secret_access_key=sk,
- endpoint_url=endpoint_url,
- config=Config(
- s3={"addressing_style": addressing_style},
- retries={"max_attempts": 5, "mode": "standard"},
- ),
- )
- return s3_client
- def read(self, s3_relative_path, mode=AbsReaderWriter.MODE_TXT, encoding="utf-8"):
- if s3_relative_path.startswith("s3://"):
- s3_path = s3_relative_path
- else:
- s3_path = join_path(self.path, s3_relative_path)
- bucket_name, key = parse_bucket_key(s3_path)
- res = self.client.get_object(Bucket=bucket_name, Key=key)
- body = res["Body"].read()
- if mode == AbsReaderWriter.MODE_TXT:
- data = body.decode(encoding) # Decode bytes to text
- elif mode == AbsReaderWriter.MODE_BIN:
- data = body
- else:
- raise ValueError("Invalid mode. Use 'text' or 'binary'.")
- return data
- def write(self, content, s3_relative_path, mode=AbsReaderWriter.MODE_TXT, encoding="utf-8"):
- if s3_relative_path.startswith("s3://"):
- s3_path = s3_relative_path
- else:
- s3_path = join_path(self.path, s3_relative_path)
- if mode == AbsReaderWriter.MODE_TXT:
- body = content.encode(encoding) # Encode text data as bytes
- elif mode == AbsReaderWriter.MODE_BIN:
- body = content
- else:
- raise ValueError("Invalid mode. Use 'text' or 'binary'.")
- bucket_name, key = parse_bucket_key(s3_path)
- self.client.put_object(Body=body, Bucket=bucket_name, Key=key)
- logger.info(f"内容已写入 {s3_path} ")
- def read_offset(self, path: str, offset=0, limit=None) -> bytes:
- if path.startswith("s3://"):
- s3_path = path
- else:
- s3_path = join_path(self.path, path)
- bucket_name, key = parse_bucket_key(s3_path)
- range_header = (
- f"bytes={offset}-{offset+limit-1}" if limit else f"bytes={offset}-"
- )
- res = self.client.get_object(Bucket=bucket_name, Key=key, Range=range_header)
- return res["Body"].read()
- if __name__ == "__main__":
- if 0:
- # Config the connection info
- ak = ""
- sk = ""
- endpoint_url = ""
- addressing_style = "auto"
- bucket_name = ""
- # Create an S3ReaderWriter object
- s3_reader_writer = S3ReaderWriter(
- ak, sk, endpoint_url, addressing_style, "s3://bucket_name/"
- )
- # Write text data to S3
- text_data = "This is some text data"
- s3_reader_writer.write(
- text_data,
- s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json",
- mode=AbsReaderWriter.MODE_TXT,
- )
- # Read text data from S3
- text_data_read = s3_reader_writer.read(
- s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json", mode=AbsReaderWriter.MODE_TXT
- )
- logger.info(f"Read text data from S3: {text_data_read}")
- # Write binary data to S3
- binary_data = b"This is some binary data"
- s3_reader_writer.write(
- text_data,
- s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json",
- mode=AbsReaderWriter.MODE_BIN,
- )
- # Read binary data from S3
- binary_data_read = s3_reader_writer.read(
- s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json", mode=AbsReaderWriter.MODE_BIN
- )
- logger.info(f"Read binary data from S3: {binary_data_read}")
- # Range Read text data from S3
- binary_data_read = s3_reader_writer.read_offset(
- path=f"s3://{bucket_name}/ebook/test/test.json", offset=0, limit=10
- )
- logger.info(f"Read binary data from S3: {binary_data_read}")
- if 1:
- import os
- import json
- ak = os.getenv("AK", "")
- sk = os.getenv("SK", "")
- endpoint_url = os.getenv("ENDPOINT", "")
- bucket = os.getenv("S3_BUCKET", "")
- prefix = os.getenv("S3_PREFIX", "")
- key_basename = os.getenv("S3_KEY_BASENAME", "")
- s3_reader_writer = S3ReaderWriter(
- ak, sk, endpoint_url, "auto", f"s3://{bucket}/{prefix}"
- )
- content_bin = s3_reader_writer.read_offset(key_basename)
- assert content_bin[:10] == b'{"track_id'
- assert content_bin[-10:] == b'r":null}}\n'
- content_bin = s3_reader_writer.read_offset(key_basename, offset=424, limit=426)
- jso = json.dumps(content_bin.decode("utf-8"))
- print(jso)
|