commons.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. import datetime
  2. import json
  3. import os, re, configparser
  4. import subprocess
  5. import time
  6. import boto3
  7. from loguru import logger
  8. from boto3.s3.transfer import TransferConfig
  9. from botocore.config import Config
  10. import fitz # 1.23.9中已经切换到rebase
  11. # import fitz_old as fitz # 使用1.23.9之前的pymupdf库
  12. def get_version():
  13. command = ["git", "describe", "--tags"]
  14. try:
  15. version = subprocess.check_output(command).decode().strip()
  16. version_parts = version.split("-")
  17. if len(version_parts) > 1 and version_parts[0].startswith("magic_pdf"):
  18. return version_parts[1]
  19. else:
  20. raise ValueError(f"Invalid version tag {version}. Expected format is magic_pdf-<version>-released.")
  21. except Exception as e:
  22. print(e)
  23. return "0.0.0"
  24. def get_delta_time(input_time):
  25. return round(time.time() - input_time, 2)
  26. def join_path(*args):
  27. return '/'.join(str(s).rstrip('/') for s in args)
# Global error-log location, defined here so the demo can reference the same path.
error_log_path = "s3://llm-pdf-text/err_logs/"
# json_dump_path = "s3://pdf_books_temp/json_dump/" # this path is only for temporary local testing and must not be committed to main
json_dump_path = "s3://llm-pdf-text/json_dump/"
# s3_image_save_path = "s3://mllm-raw-media/pdf2md_img/" # paths like this do not belong in the base library; define them in business code
  33. def get_top_percent_list(num_list, percent):
  34. """
  35. 获取列表中前百分之多少的元素
  36. :param num_list:
  37. :param percent:
  38. :return:
  39. """
  40. if len(num_list) == 0:
  41. top_percent_list = []
  42. else:
  43. # 对imgs_len_list排序
  44. sorted_imgs_len_list = sorted(num_list, reverse=True)
  45. # 计算 percent 的索引
  46. top_percent_index = int(len(sorted_imgs_len_list) * percent)
  47. # 取前80%的元素
  48. top_percent_list = sorted_imgs_len_list[:top_percent_index]
  49. return top_percent_list
  50. def formatted_time(time_stamp):
  51. dt_object = datetime.datetime.fromtimestamp(time_stamp)
  52. output_time = dt_object.strftime("%Y-%m-%d-%H:%M:%S")
  53. return output_time
  54. def mymax(alist: list):
  55. if len(alist) == 0:
  56. return 0 # 空是0, 0*0也是0大小q
  57. else:
  58. return max(alist)
  59. def parse_aws_param(profile):
  60. if isinstance(profile, str):
  61. # 解析配置文件
  62. config_file = join_path(os.path.expanduser("~"), ".aws", "config")
  63. credentials_file = join_path(os.path.expanduser("~"), ".aws", "credentials")
  64. config = configparser.ConfigParser()
  65. config.read(credentials_file)
  66. config.read(config_file)
  67. # 获取 AWS 账户相关信息
  68. ak = config.get(profile, "aws_access_key_id")
  69. sk = config.get(profile, "aws_secret_access_key")
  70. if profile == "default":
  71. s3_str = config.get(f"{profile}", "s3")
  72. else:
  73. s3_str = config.get(f"profile {profile}", "s3")
  74. end_match = re.search("endpoint_url[\s]*=[\s]*([^\s\n]+)[\s\n]*$", s3_str, re.MULTILINE)
  75. if end_match:
  76. endpoint = end_match.group(1)
  77. else:
  78. raise ValueError(f"aws 配置文件中没有找到 endpoint_url")
  79. style_match = re.search("addressing_style[\s]*=[\s]*([^\s\n]+)[\s\n]*$", s3_str, re.MULTILINE)
  80. if style_match:
  81. addressing_style = style_match.group(1)
  82. else:
  83. addressing_style = "path"
  84. elif isinstance(profile, dict):
  85. ak = profile["ak"]
  86. sk = profile["sk"]
  87. endpoint = profile["endpoint"]
  88. addressing_style = "auto"
  89. return ak, sk, endpoint, addressing_style
  90. def parse_bucket_key(s3_full_path: str):
  91. """
  92. 输入 s3://bucket/path/to/my/file.txt
  93. 输出 bucket, path/to/my/file.txt
  94. """
  95. s3_full_path = s3_full_path.strip()
  96. if s3_full_path.startswith("s3://"):
  97. s3_full_path = s3_full_path[5:]
  98. if s3_full_path.startswith("/"):
  99. s3_full_path = s3_full_path[1:]
  100. bucket, key = s3_full_path.split("/", 1)
  101. return bucket, key
  102. def read_file(pdf_path: str, s3_profile):
  103. if pdf_path.startswith("s3://"):
  104. ak, sk, end_point, addressing_style = parse_aws_param(s3_profile)
  105. cli = boto3.client(service_name="s3", aws_access_key_id=ak, aws_secret_access_key=sk, endpoint_url=end_point,
  106. config=Config(s3={'addressing_style': addressing_style}, retries={'max_attempts': 10, 'mode': 'standard'}))
  107. bucket_name, bucket_key = parse_bucket_key(pdf_path)
  108. res = cli.get_object(Bucket=bucket_name, Key=bucket_key)
  109. file_content = res["Body"].read()
  110. return file_content
  111. else:
  112. with open(pdf_path, "rb") as f:
  113. return f.read()
  114. def get_docx_model_output(pdf_model_output, page_id):
  115. model_output_json = pdf_model_output[page_id]
  116. return model_output_json
  117. def list_dir(dir_path:str, s3_profile:str):
  118. """
  119. 列出dir_path下的所有文件
  120. """
  121. ret = []
  122. if dir_path.startswith("s3"):
  123. ak, sk, end_point, addressing_style = parse_aws_param(s3_profile)
  124. s3info = re.findall(r"s3:\/\/([^\/]+)\/(.*)", dir_path)
  125. bucket, path = s3info[0][0], s3info[0][1]
  126. try:
  127. cli = boto3.client(service_name="s3", aws_access_key_id=ak, aws_secret_access_key=sk, endpoint_url=end_point,
  128. config=Config(s3={'addressing_style': addressing_style}))
  129. def list_obj_scluster():
  130. marker = None
  131. while True:
  132. list_kwargs = dict(MaxKeys=1000, Bucket=bucket, Prefix=path)
  133. if marker:
  134. list_kwargs['Marker'] = marker
  135. response = cli.list_objects(**list_kwargs)
  136. contents = response.get("Contents", [])
  137. yield from contents
  138. if not response.get("IsTruncated") or len(contents)==0:
  139. break
  140. marker = contents[-1]['Key']
  141. for info in list_obj_scluster():
  142. file_path = info['Key']
  143. #size = info['Size']
  144. if path!="":
  145. afile = file_path[len(path):]
  146. if afile.endswith(".json"):
  147. ret.append(f"s3://{bucket}/{file_path}")
  148. return ret
  149. except Exception as e:
  150. logger.exception(e)
  151. exit(-1)
  152. else: #本地的目录,那么扫描本地目录并返会这个目录里的所有jsonl文件
  153. for root, dirs, files in os.walk(dir_path):
  154. for file in files:
  155. if file.endswith(".json"):
  156. ret.append(join_path(root, file))
  157. ret.sort()
  158. return ret
  159. def get_img_s3_client(save_path:str, image_s3_config:str):
  160. """
  161. """
  162. if save_path.startswith("s3://"): # 放这里是为了最少创建一个s3 client
  163. ak, sk, end_point, addressing_style = parse_aws_param(image_s3_config)
  164. img_s3_client = boto3.client(
  165. service_name="s3",
  166. aws_access_key_id=ak,
  167. aws_secret_access_key=sk,
  168. endpoint_url=end_point,
  169. config=Config(s3={"addressing_style": addressing_style}, retries={'max_attempts': 5, 'mode': 'standard'}),
  170. )
  171. else:
  172. img_s3_client = None
  173. return img_s3_client
  174. if __name__=="__main__":
  175. s3_path = "s3://llm-pdf-text/layout_det/scihub/scimag07865000-07865999/10.1007/s10729-011-9175-6.pdf/"
  176. s3_profile = "langchao"
  177. ret = list_dir(s3_path, s3_profile)
  178. print(ret)