S3ReaderWriter.py

from magic_pdf.rw.AbsReaderWriter import AbsReaderWriter
from magic_pdf.libs.commons import parse_aws_param, parse_bucket_key, join_path
import boto3
from loguru import logger
from botocore.config import Config


class S3ReaderWriter(AbsReaderWriter):
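    """Simple S3 reader/writer built on boto3.

    Reads and writes objects in text or binary mode, resolving relative
    keys against a configurable parent S3 path, and supports ranged
    (offset/limit) reads of a single object.
    """
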
    def __init__(
        self,
        ak: str,
        sk: str,
        endpoint_url: str,
        addressing_style: str = "auto",
        parent_path: str = "",
    ):
        self.client = self._get_client(ak, sk, endpoint_url, addressing_style)
        self.path = parent_path

    def _get_client(self, ak: str, sk: str, endpoint_url: str, addressing_style: str):
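        """Build a boto3 S3 client for the given credentials and endpoint."""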
        s3_client = boto3.client(
            service_name="s3",
            aws_access_key_id=ak,
            aws_secret_access_key=sk,
            endpoint_url=endpoint_url,
            config=Config(
                s3={"addressing_style": addressing_style},
                retries={"max_attempts": 5, "mode": "standard"},
            ),
        )
        return s3_client

    def read(self, s3_relative_path, mode=AbsReaderWriter.MODE_TXT, encoding="utf-8"):
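        """Read an object, returning str in MODE_TXT or bytes in MODE_BIN."""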
        if s3_relative_path.startswith("s3://"):
            s3_path = s3_relative_path
        else:
            s3_path = join_path(self.path, s3_relative_path)
        bucket_name, key = parse_bucket_key(s3_path)
        res = self.client.get_object(Bucket=bucket_name, Key=key)
        body = res["Body"].read()
        if mode == AbsReaderWriter.MODE_TXT:
            data = body.decode(encoding)  # Decode bytes to text
        elif mode == AbsReaderWriter.MODE_BIN:
            data = body
        else:
            raise ValueError("Invalid mode. Use MODE_TXT or MODE_BIN.")
        return data

    def write(self, content, s3_relative_path, mode=AbsReaderWriter.MODE_TXT, encoding="utf-8"):
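        """Write str content in MODE_TXT or bytes content in MODE_BIN."""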
        if s3_relative_path.startswith("s3://"):
            s3_path = s3_relative_path
        else:
            s3_path = join_path(self.path, s3_relative_path)
        if mode == AbsReaderWriter.MODE_TXT:
            body = content.encode(encoding)  # Encode text data as bytes
        elif mode == AbsReaderWriter.MODE_BIN:
            body = content
        else:
            raise ValueError("Invalid mode. Use MODE_TXT or MODE_BIN.")
        bucket_name, key = parse_bucket_key(s3_path)
        self.client.put_object(Body=body, Bucket=bucket_name, Key=key)
        logger.info(f"Content written to {s3_path}")

    def read_offset(self, path: str, offset=0, limit=None) -> bytes:
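        """Read up to `limit` bytes starting at `offset`.

        Reads to the end of the object when `limit` is not given.
        """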
        if path.startswith("s3://"):
            s3_path = path
        else:
            s3_path = join_path(self.path, path)
        bucket_name, key = parse_bucket_key(s3_path)
        # HTTP Range is inclusive, so `limit` bytes span offset..offset+limit-1.
        range_header = (
            f"bytes={offset}-{offset+limit-1}" if limit else f"bytes={offset}-"
        )
        res = self.client.get_object(Bucket=bucket_name, Key=key, Range=range_header)
        return res["Body"].read()


if __name__ == "__main__":
    if 0:  # Disabled demo; fill in the credentials below and change to `if 1:` to run.
        # Config the connection info
        ak = ""
        sk = ""
        endpoint_url = ""
        addressing_style = "auto"
        bucket_name = ""
        # Create an S3ReaderWriter object
        s3_reader_writer = S3ReaderWriter(
            ak, sk, endpoint_url, addressing_style, f"s3://{bucket_name}/"
        )
        # Write text data to S3
        text_data = "This is some text data"
        s3_reader_writer.write(
            text_data,
            s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json",
            mode=AbsReaderWriter.MODE_TXT,
        )
        # Read text data from S3
        text_data_read = s3_reader_writer.read(
            s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json",
            mode=AbsReaderWriter.MODE_TXT,
        )
        logger.info(f"Read text data from S3: {text_data_read}")
        # Write binary data to S3
        binary_data = b"This is some binary data"
        s3_reader_writer.write(
            binary_data,
            s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json",
            mode=AbsReaderWriter.MODE_BIN,
        )
        # Read binary data from S3
        binary_data_read = s3_reader_writer.read(
            s3_relative_path=f"s3://{bucket_name}/ebook/test/test.json",
            mode=AbsReaderWriter.MODE_BIN,
        )
        logger.info(f"Read binary data from S3: {binary_data_read}")
        # Range-read the first 10 bytes from S3
        binary_data_read = s3_reader_writer.read_offset(
            path=f"s3://{bucket_name}/ebook/test/test.json", offset=0, limit=10
        )
        logger.info(f"Read binary data from S3: {binary_data_read}")
    if 1:  # Demo that range-reads an existing object, configured via environment variables.
        import os
        import json

        ak = os.getenv("AK", "")
        sk = os.getenv("SK", "")
        endpoint_url = os.getenv("ENDPOINT", "")
        bucket = os.getenv("S3_BUCKET", "")
        prefix = os.getenv("S3_PREFIX", "")
        key_basename = os.getenv("S3_KEY_BASENAME", "")
        s3_reader_writer = S3ReaderWriter(
            ak, sk, endpoint_url, "auto", f"s3://{bucket}/{prefix}"
        )
        # Read the whole object and sanity-check its first and last bytes.
        content_bin = s3_reader_writer.read_offset(key_basename)
        assert content_bin[:10] == b'{"track_id'
        assert content_bin[-10:] == b'r":null}}\n'
        # Read 426 bytes starting at offset 424 and print them as a JSON-escaped string.
        content_bin = s3_reader_writer.read_offset(key_basename, offset=424, limit=426)
        jso = json.dumps(content_bin.decode("utf-8"))
        print(jso)