S3ReaderWriter.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. from magic_pdf.io.AbsReaderWriter import AbsReaderWriter
  2. from magic_pdf.libs.commons import parse_aws_param, parse_bucket_key
  3. import boto3
  4. from loguru import logger
  5. from boto3.s3.transfer import TransferConfig
  6. from botocore.config import Config
  class S3ReaderWriter(AbsReaderWriter):
      """Reader/writer that loads and stores whole objects through a boto3 S3 client.

      Every path handed to ``read``/``write`` is passed to ``parse_bucket_key``,
      which yields the bucket name and the object key used for the request.
      """

      def __init__(self, ak: str, sk: str, endpoint_url: str, addressing_style: str):
          """Create the underlying S3 client from the given connection settings.

          Args:
              ak: Access key id forwarded to boto3 as ``aws_access_key_id``.
              sk: Secret access key forwarded as ``aws_secret_access_key``.
              endpoint_url: Endpoint URL of the S3 service.
              addressing_style: Value for the ``addressing_style`` entry of the
                  client's ``s3`` configuration.
          """
          self.client = self._get_client(ak, sk, endpoint_url, addressing_style)

      def _get_client(self, ak: str, sk: str, endpoint_url: str, addressing_style: str):
          """Build and return a boto3 S3 client.

          The client is configured with the requested addressing style and with
          retries in ``standard`` mode, ``max_attempts`` set to 5.
          """
          s3_client = boto3.client(
              service_name="s3",
              aws_access_key_id=ak,
              aws_secret_access_key=sk,
              endpoint_url=endpoint_url,
              config=Config(
                  s3={"addressing_style": addressing_style},
                  retries={"max_attempts": 5, "mode": "standard"},
              ),
          )
          return s3_client

      def read(self, s3_path, mode="text", encoding="utf-8"):
          """Fetch the object at ``s3_path`` and return its content.

          Args:
              s3_path: Object location; split into bucket and key by
                  ``parse_bucket_key``.
              mode: ``'text'`` to decode the payload with ``encoding``, or
                  ``'binary'`` to return the payload exactly as read.
              encoding: Codec used when ``mode`` is ``'text'``.

          Returns:
              The decoded string in text mode, otherwise the raw payload read
              from the response body.

          Raises:
              ValueError: If ``mode`` is neither ``'text'`` nor ``'binary'``.
          """
          # Reject a bad mode before touching the network, mirroring write():
          # there is no point downloading an object only to discard it.
          if mode not in ("text", "binary"):
              raise ValueError("Invalid mode. Use 'text' or 'binary'.")

          bucket_name, bucket_key = parse_bucket_key(s3_path)
          res = self.client.get_object(Bucket=bucket_name, Key=bucket_key)
          body = res["Body"].read()
          if mode == "text":
              return body.decode(encoding)  # Decode bytes to text
          return body

      def write(self, data, s3_path, mode="text", encoding="utf-8"):
          """Store ``data`` as the object at ``s3_path``.

          Args:
              data: Content to upload. In text mode it is encoded with
                  ``encoding``; in binary mode it is sent unchanged.
              s3_path: Object location; split into bucket and key by
                  ``parse_bucket_key``.
              mode: ``'text'`` or ``'binary'``.
              encoding: Codec used when ``mode`` is ``'text'``.

          Raises:
              ValueError: If ``mode`` is neither ``'text'`` nor ``'binary'``.
          """
          if mode == "text":
              body = data.encode(encoding)  # Encode text data as bytes
          elif mode == "binary":
              body = data
          else:
              raise ValueError("Invalid mode. Use 'text' or 'binary'.")
          bucket_name, bucket_key = parse_bucket_key(s3_path)
          self.client.put_object(Body=body, Bucket=bucket_name, Key=bucket_key)
          logger.info(f"内容已写入 {s3_path} ")
  if __name__ == "__main__":
      # Manual smoke test: round-trips one text object and one binary object.
      # The connection settings below are placeholders and must be filled in
      # before running.
      ak = ""
      sk = ""
      endpoint_url = ""
      addressing_style = ""

      # Create an S3ReaderWriter object
      s3_reader_writer = S3ReaderWriter(ak, sk, endpoint_url, addressing_style)

      # Write text data to S3
      text_data = "This is some text data"
      s3_reader_writer.write(data=text_data, s3_path="s3://bucket_name/ebook/test/test.json", mode='text')

      # Read text data from S3
      text_data_read = s3_reader_writer.read(s3_path="s3://bucket_name/ebook/test/test.json", mode='text')
      logger.info(f"Read text data from S3: {text_data_read}")

      # Write binary data to S3 (upload the bytes payload, not the text sample,
      # so the binary path is actually exercised)
      binary_data = b"This is some binary data"
      s3_reader_writer.write(data=binary_data, s3_path="s3://bucket_name/ebook/test/test2.json", mode='binary')

      # Read binary data from S3
      binary_data_read = s3_reader_writer.read(s3_path="s3://bucket_name/ebook/test/test2.json", mode='binary')
      logger.info(f"Read binary data from S3: {binary_data_read}")