commons.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. import datetime
  2. import json
  3. import os, re, configparser
  4. import subprocess
  5. import time
  6. import boto3
  7. from loguru import logger
  8. from boto3.s3.transfer import TransferConfig
  9. from botocore.config import Config
  10. import fitz # 1.23.9中已经切换到rebase
  11. # import fitz_old as fitz # 使用1.23.9之前的pymupdf库
  12. def get_delta_time(input_time):
  13. return round(time.time() - input_time, 2)
  14. def join_path(*args):
  15. return '/'.join(str(s).rstrip('/') for s in args)
  16. #配置全局的errlog_path,方便demo同步引用
  17. error_log_path = "s3://llm-pdf-text/err_logs/"
  18. # json_dump_path = "s3://pdf_books_temp/json_dump/" # 这条路径仅用于临时本地测试,不能提交到main
  19. json_dump_path = "s3://llm-pdf-text/json_dump/"
  20. # s3_image_save_path = "s3://mllm-raw-media/pdf2md_img/" # 基础库不应该有这些存在的路径,应该在业务代码中定义
  21. def get_top_percent_list(num_list, percent):
  22. """
  23. 获取列表中前百分之多少的元素
  24. :param num_list:
  25. :param percent:
  26. :return:
  27. """
  28. if len(num_list) == 0:
  29. top_percent_list = []
  30. else:
  31. # 对imgs_len_list排序
  32. sorted_imgs_len_list = sorted(num_list, reverse=True)
  33. # 计算 percent 的索引
  34. top_percent_index = int(len(sorted_imgs_len_list) * percent)
  35. # 取前80%的元素
  36. top_percent_list = sorted_imgs_len_list[:top_percent_index]
  37. return top_percent_list
  38. def formatted_time(time_stamp):
  39. dt_object = datetime.datetime.fromtimestamp(time_stamp)
  40. output_time = dt_object.strftime("%Y-%m-%d-%H:%M:%S")
  41. return output_time
  42. def mymax(alist: list):
  43. if len(alist) == 0:
  44. return 0 # 空是0, 0*0也是0大小q
  45. else:
  46. return max(alist)
  47. def parse_aws_param(profile):
  48. if isinstance(profile, str):
  49. # 解析配置文件
  50. config_file = join_path(os.path.expanduser("~"), ".aws", "config")
  51. credentials_file = join_path(os.path.expanduser("~"), ".aws", "credentials")
  52. config = configparser.ConfigParser()
  53. config.read(credentials_file)
  54. config.read(config_file)
  55. # 获取 AWS 账户相关信息
  56. ak = config.get(profile, "aws_access_key_id")
  57. sk = config.get(profile, "aws_secret_access_key")
  58. if profile == "default":
  59. s3_str = config.get(f"{profile}", "s3")
  60. else:
  61. s3_str = config.get(f"profile {profile}", "s3")
  62. end_match = re.search("endpoint_url[\s]*=[\s]*([^\s\n]+)[\s\n]*$", s3_str, re.MULTILINE)
  63. if end_match:
  64. endpoint = end_match.group(1)
  65. else:
  66. raise ValueError(f"aws 配置文件中没有找到 endpoint_url")
  67. style_match = re.search("addressing_style[\s]*=[\s]*([^\s\n]+)[\s\n]*$", s3_str, re.MULTILINE)
  68. if style_match:
  69. addressing_style = style_match.group(1)
  70. else:
  71. addressing_style = "path"
  72. elif isinstance(profile, dict):
  73. ak = profile["ak"]
  74. sk = profile["sk"]
  75. endpoint = profile["endpoint"]
  76. addressing_style = "auto"
  77. return ak, sk, endpoint, addressing_style
  78. def parse_bucket_key(s3_full_path: str):
  79. """
  80. 输入 s3://bucket/path/to/my/file.txt
  81. 输出 bucket, path/to/my/file.txt
  82. """
  83. s3_full_path = s3_full_path.strip()
  84. if s3_full_path.startswith("s3://"):
  85. s3_full_path = s3_full_path[5:]
  86. if s3_full_path.startswith("/"):
  87. s3_full_path = s3_full_path[1:]
  88. bucket, key = s3_full_path.split("/", 1)
  89. return bucket, key
  90. def read_file(pdf_path: str, s3_profile):
  91. if pdf_path.startswith("s3://"):
  92. ak, sk, end_point, addressing_style = parse_aws_param(s3_profile)
  93. cli = boto3.client(service_name="s3", aws_access_key_id=ak, aws_secret_access_key=sk, endpoint_url=end_point,
  94. config=Config(s3={'addressing_style': addressing_style}, retries={'max_attempts': 10, 'mode': 'standard'}))
  95. bucket_name, bucket_key = parse_bucket_key(pdf_path)
  96. res = cli.get_object(Bucket=bucket_name, Key=bucket_key)
  97. file_content = res["Body"].read()
  98. return file_content
  99. else:
  100. with open(pdf_path, "rb") as f:
  101. return f.read()
  102. def get_docx_model_output(pdf_model_output, page_id):
  103. model_output_json = pdf_model_output[page_id]
  104. return model_output_json
  105. def list_dir(dir_path:str, s3_profile:str):
  106. """
  107. 列出dir_path下的所有文件
  108. """
  109. ret = []
  110. if dir_path.startswith("s3"):
  111. ak, sk, end_point, addressing_style = parse_aws_param(s3_profile)
  112. s3info = re.findall(r"s3:\/\/([^\/]+)\/(.*)", dir_path)
  113. bucket, path = s3info[0][0], s3info[0][1]
  114. try:
  115. cli = boto3.client(service_name="s3", aws_access_key_id=ak, aws_secret_access_key=sk, endpoint_url=end_point,
  116. config=Config(s3={'addressing_style': addressing_style}))
  117. def list_obj_scluster():
  118. marker = None
  119. while True:
  120. list_kwargs = dict(MaxKeys=1000, Bucket=bucket, Prefix=path)
  121. if marker:
  122. list_kwargs['Marker'] = marker
  123. response = cli.list_objects(**list_kwargs)
  124. contents = response.get("Contents", [])
  125. yield from contents
  126. if not response.get("IsTruncated") or len(contents)==0:
  127. break
  128. marker = contents[-1]['Key']
  129. for info in list_obj_scluster():
  130. file_path = info['Key']
  131. #size = info['Size']
  132. if path!="":
  133. afile = file_path[len(path):]
  134. if afile.endswith(".json"):
  135. ret.append(f"s3://{bucket}/{file_path}")
  136. return ret
  137. except Exception as e:
  138. logger.exception(e)
  139. exit(-1)
  140. else: #本地的目录,那么扫描本地目录并返会这个目录里的所有jsonl文件
  141. for root, dirs, files in os.walk(dir_path):
  142. for file in files:
  143. if file.endswith(".json"):
  144. ret.append(join_path(root, file))
  145. ret.sort()
  146. return ret
  147. def get_img_s3_client(save_path:str, image_s3_config:str):
  148. """
  149. """
  150. if save_path.startswith("s3://"): # 放这里是为了最少创建一个s3 client
  151. ak, sk, end_point, addressing_style = parse_aws_param(image_s3_config)
  152. img_s3_client = boto3.client(
  153. service_name="s3",
  154. aws_access_key_id=ak,
  155. aws_secret_access_key=sk,
  156. endpoint_url=end_point,
  157. config=Config(s3={"addressing_style": addressing_style}, retries={'max_attempts': 5, 'mode': 'standard'}),
  158. )
  159. else:
  160. img_s3_client = None
  161. return img_s3_client
  162. if __name__=="__main__":
  163. s3_path = "s3://llm-pdf-text/layout_det/scihub/scimag07865000-07865999/10.1007/s10729-011-9175-6.pdf/"
  164. s3_profile = "langchao"
  165. ret = list_dir(s3_path, s3_profile)
  166. print(ret)