commons.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. import datetime
  2. import os, re, configparser
  3. import time
  4. import boto3
  5. from loguru import logger
  6. from boto3.s3.transfer import TransferConfig
  7. from botocore.config import Config
  8. import fitz # 1.23.9中已经切换到rebase
  9. # import fitz_old as fitz # 使用1.23.9之前的pymupdf库
  10. def get_delta_time(input_time):
  11. return round(time.time() - input_time, 2)
  12. def join_path(*args):
  13. return '/'.join(s.rstrip('/') for s in args)
# Global error-log location, defined here so the demo can reference the same path.
error_log_path = "s3://llm-pdf-text/err_logs/"
# json_dump_path = "s3://pdf_books_temp/json_dump/" # This path is only for temporary local testing and must not be committed to main.
json_dump_path = "s3://llm-pdf-text/json_dump/"
  18. def get_top_percent_list(num_list, percent):
  19. """
  20. 获取列表中前百分之多少的元素
  21. :param num_list:
  22. :param percent:
  23. :return:
  24. """
  25. if len(num_list) == 0:
  26. top_percent_list = []
  27. else:
  28. # 对imgs_len_list排序
  29. sorted_imgs_len_list = sorted(num_list, reverse=True)
  30. # 计算 percent 的索引
  31. top_percent_index = int(len(sorted_imgs_len_list) * percent)
  32. # 取前80%的元素
  33. top_percent_list = sorted_imgs_len_list[:top_percent_index]
  34. return top_percent_list
  35. def formatted_time(time_stamp):
  36. dt_object = datetime.datetime.fromtimestamp(time_stamp)
  37. output_time = dt_object.strftime("%Y-%m-%d-%H:%M:%S")
  38. return output_time
  39. def mymax(alist: list):
  40. if len(alist) == 0:
  41. return 0 # 空是0, 0*0也是0大小q
  42. else:
  43. return max(alist)
  44. def parse_aws_param(profile):
  45. if isinstance(profile, str):
  46. # 解析配置文件
  47. config_file = join_path(os.path.expanduser("~"), ".aws", "config")
  48. credentials_file = join_path(os.path.expanduser("~"), ".aws", "credentials")
  49. config = configparser.ConfigParser()
  50. config.read(credentials_file)
  51. config.read(config_file)
  52. # 获取 AWS 账户相关信息
  53. ak = config.get(profile, "aws_access_key_id")
  54. sk = config.get(profile, "aws_secret_access_key")
  55. if profile == "default":
  56. s3_str = config.get(f"{profile}", "s3")
  57. else:
  58. s3_str = config.get(f"profile {profile}", "s3")
  59. end_match = re.search("endpoint_url[\s]*=[\s]*([^\s\n]+)[\s\n]*$", s3_str, re.MULTILINE)
  60. if end_match:
  61. endpoint = end_match.group(1)
  62. else:
  63. raise ValueError(f"aws 配置文件中没有找到 endpoint_url")
  64. style_match = re.search("addressing_style[\s]*=[\s]*([^\s\n]+)[\s\n]*$", s3_str, re.MULTILINE)
  65. if style_match:
  66. addressing_style = style_match.group(1)
  67. else:
  68. addressing_style = "path"
  69. elif isinstance(profile, dict):
  70. ak = profile["ak"]
  71. sk = profile["sk"]
  72. endpoint = profile["endpoint"]
  73. addressing_style = "auto"
  74. return ak, sk, endpoint, addressing_style
  75. def parse_bucket_key(s3_full_path: str):
  76. """
  77. 输入 s3://bucket/path/to/my/file.txt
  78. 输出 bucket, path/to/my/file.txt
  79. """
  80. s3_full_path = s3_full_path.strip()
  81. if s3_full_path.startswith("s3://"):
  82. s3_full_path = s3_full_path[5:]
  83. if s3_full_path.startswith("/"):
  84. s3_full_path = s3_full_path[1:]
  85. bucket, key = s3_full_path.split("/", 1)
  86. return bucket, key
  87. def read_file(pdf_path: str, s3_profile):
  88. if pdf_path.startswith("s3://"):
  89. ak, sk, end_point, addressing_style = parse_aws_param(s3_profile)
  90. cli = boto3.client(service_name="s3", aws_access_key_id=ak, aws_secret_access_key=sk, endpoint_url=end_point,
  91. config=Config(s3={'addressing_style': addressing_style}, retries={'max_attempts': 10, 'mode': 'standard'}))
  92. bucket_name, bucket_key = parse_bucket_key(pdf_path)
  93. res = cli.get_object(Bucket=bucket_name, Key=bucket_key)
  94. file_content = res["Body"].read()
  95. return file_content
  96. else:
  97. with open(pdf_path, "rb") as f:
  98. return f.read()
  99. def list_dir(dir_path:str, s3_profile:str):
  100. """
  101. 列出dir_path下的所有文件
  102. """
  103. ret = []
  104. if dir_path.startswith("s3"):
  105. ak, sk, end_point, addressing_style = parse_aws_param(s3_profile)
  106. s3info = re.findall(r"s3:\/\/([^\/]+)\/(.*)", dir_path)
  107. bucket, path = s3info[0][0], s3info[0][1]
  108. try:
  109. cli = boto3.client(service_name="s3", aws_access_key_id=ak, aws_secret_access_key=sk, endpoint_url=end_point,
  110. config=Config(s3={'addressing_style': addressing_style}))
  111. def list_obj_scluster():
  112. marker = None
  113. while True:
  114. list_kwargs = dict(MaxKeys=1000, Bucket=bucket, Prefix=path)
  115. if marker:
  116. list_kwargs['Marker'] = marker
  117. response = cli.list_objects(**list_kwargs)
  118. contents = response.get("Contents", [])
  119. yield from contents
  120. if not response.get("IsTruncated") or len(contents)==0:
  121. break
  122. marker = contents[-1]['Key']
  123. for info in list_obj_scluster():
  124. file_path = info['Key']
  125. #size = info['Size']
  126. if path!="":
  127. afile = file_path[len(path):]
  128. if afile.endswith(".json"):
  129. ret.append(f"s3://{bucket}/{file_path}")
  130. return ret
  131. except Exception as e:
  132. logger.exception(e)
  133. exit(-1)
  134. else: #本地的目录,那么扫描本地目录并返会这个目录里的所有jsonl文件
  135. for root, dirs, files in os.walk(dir_path):
  136. for file in files:
  137. if file.endswith(".json"):
  138. ret.append(join_path(root, file))
  139. ret.sort()
  140. return ret
  141. def get_img_s3_client(save_path:str, image_s3_config:str):
  142. """
  143. """
  144. if save_path.startswith("s3://"): # 放这里是为了最少创建一个s3 client
  145. ak, sk, end_point, addressing_style = parse_aws_param(image_s3_config)
  146. img_s3_client = boto3.client(
  147. service_name="s3",
  148. aws_access_key_id=ak,
  149. aws_secret_access_key=sk,
  150. endpoint_url=end_point,
  151. config=Config(s3={"addressing_style": addressing_style}, retries={'max_attempts': 5, 'mode': 'standard'}),
  152. )
  153. else:
  154. img_s3_client = None
  155. return img_s3_client
  156. # def get_s3_object(path):
  157. # src_cli_config = Config(**{
  158. #
  159. # "connect_timeout": 60,
  160. # "read_timeout": 20,
  161. # "max_pool_connections": 500,
  162. # "s3": {
  163. # "addressing_style": "path",
  164. # },
  165. # "retries": {
  166. # "max_attempts": 3,
  167. # }
  168. # })
  169. # full_path = f"{bucket_name}/{bucket_prefix}/{path}"
  170. # try:
  171. # src_cli = boto3.session.Session().client("s3", aws_access_key_id=ak, aws_secret_access_key=sk, endpoint_url=endpoint, region_name='', config=src_cli_config)
  172. # res = src_cli.get_object(Bucket=bucket_name, Key=f"{bucket_prefix}/{path}")
  173. # file_content = res["Body"].read()
  174. # return file_content
  175. # except Exception as e:
  176. # logger.error(f"get_s3_object({full_path}) error: {e}")
  177. # return b''
  178. if __name__=="__main__":
  179. s3_path = "s3://llm-pdf-text/layout_det/scihub/scimag07865000-07865999/10.1007/s10729-011-9175-6.pdf/"
  180. s3_profile = "langchao"
  181. ret = list_dir(s3_path, s3_profile)
  182. print(ret)