markdown_generator.py 15 KB


"""
Markdown generator module.

Provides Markdown output features:
- Full-document Markdown generation
- Per-page Markdown generation
- MinerU union_make integration
- Monetary number normalization (full-width → half-width)
"""
  9. import sys
  10. from pathlib import Path
  11. from typing import Dict, Any, List, Tuple, Optional
  12. from loguru import logger
# Import MinerU components.
# `parents[3]` of this file's path is prepended to sys.path (only if it is not
# already listed) before the guarded import below runs.
# NOTE(review): presumably that directory is the checkout root that contains
# the `mineru` package — confirm against the repository layout.
mineru_path = Path(__file__).parents[3]
if str(mineru_path) not in sys.path:
    sys.path.insert(0, str(mineru_path))

try:
    from mineru.backend.vlm.vlm_middle_json_mkcontent import union_make as vlm_union_make
    from mineru.utils.enum_class import MakeMode
    MINERU_AVAILABLE = True
except ImportError:
    # MinerU could not be imported: record that, leave the union_make hook
    # unset, and define a stand-in MakeMode so `MakeMode.MM_MD` /
    # `MakeMode.NLP_MD` still resolve in this module.
    MINERU_AVAILABLE = False
    vlm_union_make = None

    class MakeMode:
        # Fallback constants used only when the real MakeMode is unavailable.
        MM_MD = 'mm_md'
        NLP_MD = 'nlp_md'
  27. # 导入数字标准化工具
  28. from .normalize_financial_numbers import normalize_markdown_table
  29. class MarkdownGenerator:
  30. """Markdown 生成器类"""
  31. @staticmethod
  32. def save_markdown(
  33. results: Dict[str, Any],
  34. middle_json: Dict[str, Any],
  35. output_dir: Path,
  36. doc_name: str,
  37. use_mineru_union: bool = False,
  38. normalize_numbers: bool = True
  39. ) -> Tuple[Path, Optional[Path]]:
  40. """
  41. 保存 Markdown 文件
  42. 默认使用自定义实现,确保所有元素类型(包括 table_caption 等)都被正确处理
  43. 可选使用 MinerU union_make(但它不处理 table_caption 等独立元素)
  44. Args:
  45. results: 处理结果
  46. middle_json: middle.json 格式数据
  47. output_dir: 输出目录
  48. doc_name: 文档名称
  49. use_mineru_union: 是否使用 MinerU union_make(默认 False)
  50. normalize_numbers: 是否标准化金额数字(全角→半角)
  51. Returns:
  52. (Markdown 文件路径, 原始文件路径 或 None)
  53. """
  54. md_path = output_dir / f"{doc_name}.md"
  55. original_path = None
  56. if use_mineru_union and MINERU_AVAILABLE and vlm_union_make is not None:
  57. try:
  58. img_bucket_path = "images"
  59. markdown_content = vlm_union_make(
  60. middle_json['pdf_info'],
  61. MakeMode.MM_MD,
  62. img_bucket_path
  63. )
  64. if markdown_content:
  65. if isinstance(markdown_content, list):
  66. markdown_content = '\n\n'.join(markdown_content)
  67. header = MarkdownGenerator._generate_header(results)
  68. markdown_content = header + str(markdown_content)
  69. # 金额数字标准化
  70. if normalize_numbers:
  71. original_content = markdown_content
  72. markdown_content = normalize_markdown_table(markdown_content)
  73. if markdown_content != original_content:
  74. original_path = output_dir / f"{doc_name}_original.md"
  75. with open(original_path, 'w', encoding='utf-8') as f:
  76. f.write(original_content)
  77. logger.info(f"📝 Original Markdown saved: {original_path}")
  78. with open(md_path, 'w', encoding='utf-8') as f:
  79. f.write(markdown_content)
  80. logger.info(f"📝 Markdown saved (MinerU format): {md_path}")
  81. return md_path, original_path
  82. except Exception as e:
  83. logger.warning(f"MinerU union_make failed: {e}, falling back to custom implementation")
  84. # 使用自定义实现,确保所有元素类型都被处理
  85. markdown_content = MarkdownGenerator._generate_full_markdown(results)
  86. # 金额数字标准化
  87. if normalize_numbers:
  88. original_content = markdown_content
  89. markdown_content = normalize_markdown_table(markdown_content)
  90. if markdown_content != original_content:
  91. original_path = output_dir / f"{doc_name}_original.md"
  92. with open(original_path, 'w', encoding='utf-8') as f:
  93. f.write(original_content)
  94. logger.info(f"📝 Original Markdown saved: {original_path}")
  95. with open(md_path, 'w', encoding='utf-8') as f:
  96. f.write(markdown_content)
  97. logger.info(f"📝 Markdown saved (custom format): {md_path}")
  98. return md_path, original_path
  99. @staticmethod
  100. def save_page_markdowns(
  101. results: Dict[str, Any],
  102. output_dir: Path,
  103. doc_name: str,
  104. is_pdf: bool = True,
  105. normalize_numbers: bool = True
  106. ) -> List[str]:
  107. """
  108. 按页保存 Markdown 文件
  109. 命名规则:
  110. - PDF输入: 文件名_page_001.md
  111. - 图片输入(单页): 文件名.md(跳过,因为已有完整版)
  112. Args:
  113. results: 处理结果
  114. output_dir: 输出目录
  115. doc_name: 文档名称
  116. is_pdf: 是否为 PDF 输入
  117. normalize_numbers: 是否标准化金额数字(全角→半角)
  118. Returns:
  119. 保存的 Markdown 文件路径列表
  120. """
  121. saved_paths = []
  122. total_pages = len(results.get('pages', []))
  123. # 单个图片输入时,跳过按页保存(因为已有完整版 doc_name.md)
  124. if not is_pdf and total_pages == 1:
  125. logger.debug("📝 Single image input, skipping page markdown (full version exists)")
  126. return saved_paths
  127. for page in results.get('pages', []):
  128. page_idx = page.get('page_idx', 0)
  129. # 根据输入类型决定命名
  130. if is_pdf or total_pages > 1:
  131. page_name = f"{doc_name}_page_{page_idx + 1:03d}"
  132. else:
  133. page_name = doc_name
  134. # 生成单页 Markdown
  135. md_content = MarkdownGenerator._generate_page_markdown(page, doc_name, page_idx)
  136. # 金额数字标准化
  137. if normalize_numbers:
  138. original_content = md_content
  139. md_content = normalize_markdown_table(md_content)
  140. if md_content != original_content:
  141. original_path = output_dir / f"{page_name}_original.md"
  142. with open(original_path, 'w', encoding='utf-8') as f:
  143. f.write(original_content)
  144. logger.debug(f"📝 Original page Markdown saved: {original_path}")
  145. # 保存
  146. md_path = output_dir / f"{page_name}.md"
  147. with open(md_path, 'w', encoding='utf-8') as f:
  148. f.write(md_content)
  149. saved_paths.append(str(md_path))
  150. logger.debug(f"📝 Page Markdown saved: {md_path}")
  151. if saved_paths:
  152. logger.info(f"📝 {len(saved_paths)} page Markdowns saved")
  153. return saved_paths
  154. @staticmethod
  155. def _generate_header(results: Dict[str, Any]) -> str:
  156. """生成 Markdown 文件头"""
  157. return f"""<!--
  158. scene: {results.get('scene', 'unknown')}
  159. document: {results.get('document_path', '')}
  160. pages: {len(results.get('pages', []))}
  161. -->
  162. """
  163. @staticmethod
  164. def _generate_full_markdown(results: Dict[str, Any]) -> str:
  165. """
  166. 生成完整文档的 Markdown(自定义实现)
  167. 确保所有元素类型都被正确处理,包括 table_caption、table_footnote 等
  168. Args:
  169. results: 处理结果
  170. Returns:
  171. Markdown 内容字符串
  172. """
  173. md_lines = [
  174. f"<!-- ",
  175. f"scene: {results.get('scene', 'unknown')}",
  176. f"document: {results.get('document_path', '')}",
  177. f"pages: {len(results.get('pages', []))}",
  178. f"-->",
  179. "",
  180. ]
  181. for page in results.get('pages', []):
  182. # 按阅读顺序处理元素
  183. for element in page.get('elements', []):
  184. elem_type = element.get('type', '')
  185. content = element.get('content', {})
  186. if elem_type == 'title':
  187. text = content.get('text', '') if isinstance(content, dict) else str(content)
  188. level = element.get('level', 1)
  189. if text:
  190. md_lines.append(f"{'#' * min(level, 6)} {text}")
  191. md_lines.append("")
  192. elif elem_type in ['text', 'ocr_text', 'ref_text']:
  193. text = content.get('text', '') if isinstance(content, dict) else str(content)
  194. if text:
  195. md_lines.append(text)
  196. md_lines.append("")
  197. elif elem_type in ['table', 'table_body']:
  198. html = content.get('html', '')
  199. if html:
  200. md_lines.append(f"\n{html}\n")
  201. md_lines.append("")
  202. elif elem_type in ['image', 'image_body', 'figure']:
  203. img_filename = content.get('image_path', '')
  204. if img_filename:
  205. md_lines.append(f"![](images/{img_filename})")
  206. md_lines.append("")
  207. elif elem_type in ['interline_equation', 'inline_equation', 'equation']:
  208. latex = content.get('latex', '')
  209. if latex:
  210. md_lines.append(f"$$\n{latex}\n$$")
  211. md_lines.append("")
  212. elif elem_type in ['table_caption', 'table_footnote']:
  213. text = content.get('text', '') if isinstance(content, dict) else str(content)
  214. if text:
  215. if elem_type == 'table_caption':
  216. md_lines.append(f"**{text}**")
  217. else:
  218. md_lines.append(f"*{text}*")
  219. md_lines.append("")
  220. elif elem_type in ['image_caption', 'image_footnote']:
  221. text = content.get('text', '') if isinstance(content, dict) else str(content)
  222. if text:
  223. if elem_type == 'image_caption':
  224. md_lines.append(f"**{text}**")
  225. else:
  226. md_lines.append(f"*{text}*")
  227. md_lines.append("")
  228. return '\n'.join(md_lines)
  229. @staticmethod
  230. def _generate_page_markdown(
  231. page: Dict[str, Any],
  232. doc_name: str,
  233. page_idx: int
  234. ) -> str:
  235. """
  236. 生成单页的 Markdown 内容
  237. Args:
  238. page: 页面数据
  239. doc_name: 文档名称
  240. page_idx: 页码索引
  241. Returns:
  242. Markdown 内容字符串
  243. """
  244. md_lines = [
  245. f"<!--",
  246. f"document: {doc_name}",
  247. f"page: {page_idx + 1}",
  248. f"angle: {page.get('angle', 0)}",
  249. f"-->",
  250. "",
  251. ]
  252. md_lines.append("")
  253. for element in page.get('elements', []):
  254. elem_type = element.get('type', '')
  255. content = element.get('content', {})
  256. bbox = element.get('bbox', [])
  257. reading_order = element.get('reading_order', 0)
  258. # 添加元素注释
  259. md_lines.append(f"<!-- reading_order: {reading_order}, bbox: {bbox} -->")
  260. if elem_type == 'title':
  261. text = content.get('text', '') if isinstance(content, dict) else str(content)
  262. level = element.get('level', 1)
  263. md_lines.append(f"{'#' * min(level, 6)} {text}")
  264. md_lines.append("")
  265. elif elem_type in ['text', 'ocr_text', 'ref_text']:
  266. text = content.get('text', '') if isinstance(content, dict) else str(content)
  267. if text:
  268. md_lines.append(text)
  269. md_lines.append("")
  270. elif elem_type in ['table', 'table_body']:
  271. table_captions = content.get('table_caption', [])
  272. if isinstance(table_captions, str):
  273. table_captions = [table_captions] if table_captions else []
  274. for caption in table_captions:
  275. md_lines.append(f"**{caption}**")
  276. html = content.get('html', '')
  277. if html:
  278. md_lines.append(f"\n{html}\n")
  279. md_lines.append("")
  280. elif elem_type in ['image', 'image_body', 'figure']:
  281. img_filename = content.get('image_path', '')
  282. if img_filename:
  283. md_lines.append(f"![](images/{img_filename})")
  284. md_lines.append("")
  285. elif elem_type in ['interline_equation', 'inline_equation', 'equation']:
  286. latex = content.get('latex', '')
  287. if latex:
  288. md_lines.append(f"$$\n{latex}\n$$")
  289. md_lines.append("")
  290. elif elem_type in ['table_caption', 'table_footnote']:
  291. text = content.get('text', '') if isinstance(content, dict) else str(content)
  292. if text:
  293. # 表格标题加粗,表格脚注斜体
  294. if elem_type == 'table_caption':
  295. md_lines.append(f"**{text}**")
  296. else:
  297. md_lines.append(f"*{text}*")
  298. md_lines.append("")
  299. elif elem_type in ['image_caption', 'image_footnote']:
  300. text = content.get('text', '') if isinstance(content, dict) else str(content)
  301. if text:
  302. # 图片标题加粗,图片脚注斜体
  303. if elem_type == 'image_caption':
  304. md_lines.append(f"**{text}**")
  305. else:
  306. md_lines.append(f"*{text}*")
  307. md_lines.append("")
  308. elif elem_type == 'discarded':
  309. text = content.get('text', '') if isinstance(content, dict) else ''
  310. if text:
  311. md_lines.append(f"<!-- [discarded: {element.get('original_category', 'unknown')}] {text} -->")
  312. md_lines.append("")
  313. # 处理丢弃元素
  314. for element in page.get('discarded_blocks', []):
  315. content = element.get('content', {})
  316. bbox = element.get('bbox', [])
  317. reading_order = element.get('reading_order', 0)
  318. original_category = element.get('original_category', 'unknown')
  319. md_lines.append(f"<!-- reading_order: {reading_order}, bbox: {bbox} -->")
  320. text = content.get('text', '') if isinstance(content, dict) else ''
  321. if text:
  322. md_lines.append(f"<!-- [discarded: {original_category}] {text} -->")
  323. else:
  324. md_lines.append(f"<!-- [discarded: {original_category}] (no text) -->")
  325. md_lines.append("")
  326. return '\n'.join(md_lines)