S3ReaderWriter.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
  2. from magic_pdf.libs.commons import parse_aws_param, parse_bucket_key, join_path
  3. import boto3
  4. from loguru import logger
  5. from boto3.s3.transfer import TransferConfig
  6. from botocore.config import Config
  7. import os
# Mode selectors accepted by the reader/writer methods below:
# "text" makes them decode/encode the payload with the given encoding,
# "binary" makes them pass raw bytes through unchanged.
MODE_TXT = "text"
MODE_BIN = "binary"
  10. class S3ReaderWriter(AbsReaderWriter):
  11. def __init__(self, ak: str, sk: str, endpoint_url: str, addressing_style: str = 'auto', parent_path: str = ''):
  12. self.client = self._get_client(ak, sk, endpoint_url, addressing_style)
  13. self.path = parent_path
  14. def _get_client(self, ak: str, sk: str, endpoint_url: str, addressing_style: str):
  15. s3_client = boto3.client(
  16. service_name="s3",
  17. aws_access_key_id=ak,
  18. aws_secret_access_key=sk,
  19. endpoint_url=endpoint_url,
  20. config=Config(s3={"addressing_style": addressing_style},
  21. retries={'max_attempts': 5, 'mode': 'standard'}),
  22. )
  23. return s3_client
  24. def read(self, s3_relative_path, mode=MODE_TXT, encoding="utf-8"):
  25. if s3_relative_path.startswith("s3://"):
  26. s3_path = s3_relative_path
  27. else:
  28. s3_path = join_path(self.path, s3_relative_path)
  29. bucket_name, key = parse_bucket_key(s3_path)
  30. res = self.client.get_object(Bucket=bucket_name, Key=key)
  31. body = res["Body"].read()
  32. if mode == MODE_TXT:
  33. data = body.decode(encoding) # Decode bytes to text
  34. elif mode == MODE_BIN:
  35. data = body
  36. else:
  37. raise ValueError("Invalid mode. Use 'text' or 'binary'.")
  38. return data
  39. def write(self, content, s3_relative_path, mode=MODE_TXT, encoding="utf-8"):
  40. if s3_relative_path.startswith("s3://"):
  41. s3_path = s3_relative_path
  42. else:
  43. s3_path = join_path(self.path, s3_relative_path)
  44. if mode == MODE_TXT:
  45. body = content.encode(encoding) # Encode text data as bytes
  46. elif mode == MODE_BIN:
  47. body = content
  48. else:
  49. raise ValueError("Invalid mode. Use 'text' or 'binary'.")
  50. bucket_name, key = parse_bucket_key(s3_path)
  51. self.client.put_object(Body=body, Bucket=bucket_name, Key=key)
  52. logger.info(f"内容已写入 {s3_path} ")
  53. def read_jsonl(self, path: str, byte_start=0, byte_end=None, mode=MODE_TXT, encoding='utf-8'):
  54. if path.startswith("s3://"):
  55. s3_path = path
  56. else:
  57. s3_path = join_path(self.path, path)
  58. bucket_name, key = parse_bucket_key(s3_path)
  59. range_header = f'bytes={byte_start}-{byte_end}' if byte_end else f'bytes={byte_start}-'
  60. res = self.client.get_object(Bucket=bucket_name, Key=key, Range=range_header)
  61. body = res["Body"].read()
  62. if mode == MODE_TXT:
  63. data = body.decode(encoding) # Decode bytes to text
  64. elif mode == MODE_BIN:
  65. data = body
  66. else:
  67. raise ValueError("Invalid mode. Use 'text' or 'binary'.")
  68. return data
  69. if __name__ == "__main__":
  70. # Config the connection info
  71. ak = ""
  72. sk = ""
  73. endpoint_url = ""
  74. addressing_style = "auto"
  75. bucket_name = ""
  76. # Create an S3ReaderWriter object
  77. s3_reader_writer = S3ReaderWriter(ak, sk, endpoint_url, addressing_style, "s3://bucket_name/")
  78. # Write text data to S3
  79. text_data = "This is some text data"
  80. s3_reader_writer.write(data=text_data, s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json", mode=MODE_TXT)
  81. # Read text data from S3
  82. text_data_read = s3_reader_writer.read(s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json", mode=MODE_TXT)
  83. logger.info(f"Read text data from S3: {text_data_read}")
  84. # Write binary data to S3
  85. binary_data = b"This is some binary data"
  86. s3_reader_writer.write(data=text_data, s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json", mode=MODE_BIN)
  87. # Read binary data from S3
  88. binary_data_read = s3_reader_writer.read(s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json", mode=MODE_BIN)
  89. logger.info(f"Read binary data from S3: {binary_data_read}")
  90. # Range Read text data from S3
  91. binary_data_read = s3_reader_writer.read_jsonl(path=f"s3://{bucket_name}/ebook/test/test.json",
  92. byte_start=0, byte_end=10, mode=MODE_BIN)
  93. logger.info(f"Read binary data from S3: {binary_data_read}")