markdown_generator.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460
  1. """
  2. Markdown 生成模块
  3. 负责将合并后的数据生成 Markdown 文件
  4. """
  5. import shutil
  6. from pathlib import Path
  7. from typing import List, Dict, Optional
  8. class MarkdownGenerator:
  9. """Markdown 生成器"""
  10. @staticmethod
  11. def detect_data_format(merged_data: List[Dict]) -> str:
  12. """
  13. 检测数据格式
  14. Returns:
  15. 'mineru' 或 'paddleocr_vl'
  16. """
  17. if not merged_data:
  18. return 'mineru'
  19. first_item = merged_data[0]
  20. # 检查是否有 PaddleOCR_VL 特有字段
  21. if 'block_label' in first_item and 'block_content' in first_item:
  22. return 'paddleocr_vl'
  23. # 检查是否有 MinerU 特有字段
  24. if 'type' in first_item and ('table_body' in first_item or 'text' in first_item):
  25. return 'mineru'
  26. # 默认按 MinerU 格式处理
  27. return 'mineru'
  28. @staticmethod
  29. def generate_enhanced_markdown(merged_data: List[Dict],
  30. output_path: Optional[str] = None,
  31. source_file: Optional[str] = None,
  32. data_format: Optional[str] = None) -> str:
  33. """
  34. 生成增强的 Markdown(包含 bbox 信息的注释)
  35. Args:
  36. merged_data: 合并后的数据
  37. output_path: 输出路径
  38. source_file: 源文件路径(用于复制图片)
  39. data_format: 数据格式 ('mineru' 或 'paddleocr_vl'),None 则自动检测
  40. Returns:
  41. Markdown 内容
  42. """
  43. # ✅ 自动检测数据格式
  44. if data_format is None:
  45. data_format = MarkdownGenerator.detect_data_format(merged_data)
  46. print(f"ℹ️ 检测到数据格式: {data_format}")
  47. # ✅ 根据格式选择处理函数
  48. if data_format == 'paddleocr_vl':
  49. return MarkdownGenerator._generate_paddleocr_vl_markdown(
  50. merged_data, output_path, source_file
  51. )
  52. else:
  53. return MarkdownGenerator._generate_mineru_markdown(
  54. merged_data, output_path, source_file
  55. )
  56. @staticmethod
  57. def _generate_mineru_markdown(merged_data: List[Dict],
  58. output_path: Optional[str] = None,
  59. source_file: Optional[str] = None) -> str:
  60. """生成 MinerU 格式的 Markdown"""
  61. md_lines = []
  62. for item in merged_data:
  63. item_type = item.get('type', '')
  64. if item_type == 'title':
  65. md_lines.extend(MarkdownGenerator._format_mineru_title(item))
  66. elif item_type == 'text':
  67. md_lines.extend(MarkdownGenerator._format_mineru_text(item))
  68. elif item_type == 'list':
  69. md_lines.extend(MarkdownGenerator._format_mineru_list(item))
  70. elif item_type == 'table':
  71. md_lines.extend(MarkdownGenerator._format_mineru_table(item))
  72. elif item_type == 'image':
  73. md_lines.extend(MarkdownGenerator._format_mineru_image(
  74. item, output_path, source_file
  75. ))
  76. elif item_type in ['equation', 'interline_equation']:
  77. md_lines.extend(MarkdownGenerator._format_equation(item))
  78. elif item_type == 'inline_equation':
  79. md_lines.extend(MarkdownGenerator._format_inline_equation(item))
  80. elif item_type == 'header':
  81. md_lines.extend(MarkdownGenerator._format_mineru_header(item))
  82. elif item_type == 'footer':
  83. md_lines.extend(MarkdownGenerator._format_mineru_footer(item))
  84. elif item_type == 'page_number':
  85. md_lines.extend(MarkdownGenerator._format_mineru_page_number(item))
  86. elif item_type == 'ref_text':
  87. md_lines.extend(MarkdownGenerator._format_reference(item))
  88. else:
  89. md_lines.extend(MarkdownGenerator._format_unknown(item))
  90. markdown_content = '\n'.join(md_lines)
  91. if output_path:
  92. with open(output_path, 'w', encoding='utf-8') as f:
  93. f.write(markdown_content)
  94. return markdown_content
  95. @staticmethod
  96. def _generate_paddleocr_vl_markdown(merged_data: List[Dict],
  97. output_path: Optional[str] = None,
  98. source_file: Optional[str] = None) -> str:
  99. """生成 PaddleOCR_VL 格式的 Markdown"""
  100. md_lines = []
  101. for item in merged_data:
  102. block_label = item.get('block_label', '')
  103. if 'title' in block_label:
  104. md_lines.extend(MarkdownGenerator._format_paddleocr_vl_title(item))
  105. elif block_label == 'text':
  106. md_lines.extend(MarkdownGenerator._format_paddleocr_vl_text(item))
  107. elif block_label == 'table':
  108. md_lines.extend(MarkdownGenerator._format_paddleocr_vl_table(item))
  109. elif block_label == 'image':
  110. md_lines.extend(MarkdownGenerator._format_paddleocr_vl_figure(item))
  111. elif block_label == 'equation':
  112. md_lines.extend(MarkdownGenerator._format_paddleocr_vl_equation(item))
  113. elif block_label == 'reference':
  114. md_lines.extend(MarkdownGenerator._format_paddleocr_vl_reference(item))
  115. else:
  116. md_lines.extend(MarkdownGenerator._format_paddleocr_vl_unknown(item))
  117. markdown_content = '\n'.join(md_lines)
  118. if output_path:
  119. with open(output_path, 'w', encoding='utf-8') as f:
  120. f.write(markdown_content)
  121. return markdown_content
  122. # ================== MinerU 格式化方法 ==================
  123. @staticmethod
  124. def _format_mineru_title(item: Dict) -> List[str]:
  125. """格式化 MinerU 标题"""
  126. lines = []
  127. bbox = item.get('bbox', [])
  128. if bbox:
  129. lines.append(MarkdownGenerator._add_bbox_comment(bbox))
  130. text = item.get('text', '')
  131. text_level = item.get('text_level', 1)
  132. heading = '#' * min(text_level, 6)
  133. lines.append(f"{heading} {text}\n")
  134. return lines
  135. @staticmethod
  136. def _format_mineru_text(item: Dict) -> List[str]:
  137. """格式化 MinerU 文本"""
  138. lines = []
  139. bbox = item.get('bbox', [])
  140. if bbox:
  141. lines.append(MarkdownGenerator._add_bbox_comment(bbox))
  142. text = item.get('text', '')
  143. text_level = item.get('text_level', 0)
  144. if text_level > 0:
  145. heading = '#' * min(text_level, 6)
  146. lines.append(f"{heading} {text}\n")
  147. else:
  148. lines.append(f"{text}\n")
  149. return lines
  150. @staticmethod
  151. def _format_mineru_list(item: Dict) -> List[str]:
  152. """格式化 MinerU 列表"""
  153. lines = []
  154. bbox = item.get('bbox', [])
  155. if bbox:
  156. lines.append(MarkdownGenerator._add_bbox_comment(bbox))
  157. list_items = item.get('list_items', [])
  158. for list_item in list_items:
  159. lines.append(f"{list_item}\n")
  160. lines.append("")
  161. return lines
  162. @staticmethod
  163. def _format_mineru_table(item: Dict) -> List[str]:
  164. """格式化 MinerU 表格"""
  165. lines = []
  166. bbox = item.get('bbox', [])
  167. if bbox:
  168. lines.append(MarkdownGenerator._add_bbox_comment(bbox))
  169. # 表格标题
  170. table_caption = item.get('table_caption', [])
  171. for caption in table_caption:
  172. if caption:
  173. lines.append(f"**{caption}**\n")
  174. # 表格内容
  175. table_body = item.get('table_body_with_bbox', item.get('table_body', ''))
  176. if table_body:
  177. lines.append(table_body)
  178. lines.append("")
  179. # 表格脚注
  180. table_footnote = item.get('table_footnote', [])
  181. for footnote in table_footnote:
  182. if footnote:
  183. lines.append(f"*{footnote}*")
  184. if table_footnote:
  185. lines.append("")
  186. return lines
  187. @staticmethod
  188. def _format_mineru_image(item: Dict, output_path: Optional[str],
  189. source_file: Optional[str]) -> List[str]:
  190. """格式化 MinerU 图片"""
  191. lines = []
  192. bbox = item.get('bbox', [])
  193. if bbox:
  194. lines.append(MarkdownGenerator._add_bbox_comment(bbox))
  195. img_path = item.get('img_path', '')
  196. # 复制图片
  197. if img_path and source_file and output_path:
  198. MarkdownGenerator._copy_image(img_path, source_file, output_path)
  199. # 图片标题
  200. image_caption = item.get('image_caption', [])
  201. for caption in image_caption:
  202. if caption:
  203. lines.append(f"**{caption}**\n")
  204. lines.append(f"![Image]({img_path})\n")
  205. # 图片脚注
  206. image_footnote = item.get('image_footnote', [])
  207. for footnote in image_footnote:
  208. if footnote:
  209. lines.append(f"*{footnote}*")
  210. if image_footnote:
  211. lines.append("")
  212. return lines
  213. @staticmethod
  214. def _format_mineru_header(item: Dict) -> List[str]:
  215. """格式化MinerU header"""
  216. lines = []
  217. bbox = item.get('bbox', [])
  218. if bbox:
  219. lines.append(MarkdownGenerator._add_bbox_comment(bbox))
  220. text = item.get('text', '')
  221. lines.append(f"<!-- 页眉: {text} -->\n")
  222. return lines
  223. @staticmethod
  224. def _format_mineru_footer(item: Dict) -> List[str]:
  225. """格式化MinerU footer"""
  226. lines = []
  227. bbox = item.get('bbox', [])
  228. if bbox:
  229. lines.append(MarkdownGenerator._add_bbox_comment(bbox))
  230. text = item.get('text', '')
  231. lines.append(f"<!-- 页脚: {text} -->\n")
  232. return lines
  233. @staticmethod
  234. def _format_mineru_page_number(item: Dict) -> List[str]:
  235. """格式化MinerU page_number"""
  236. lines = []
  237. bbox = item.get('bbox', [])
  238. if bbox:
  239. lines.append(MarkdownGenerator._add_bbox_comment(bbox))
  240. text = item.get('text', '')
  241. lines.append(f"<!-- 页码: {text} -->\n")
  242. return lines
  243. # ================== PaddleOCR_VL 格式化方法 ==================
  244. @staticmethod
  245. def _format_paddleocr_vl_title(item: Dict) -> List[str]:
  246. """格式化 PaddleOCR_VL 标题"""
  247. lines = []
  248. bbox = item.get('block_bbox', [])
  249. if bbox:
  250. lines.append(MarkdownGenerator._add_bbox_comment(bbox))
  251. text = item.get('block_content', '')
  252. block_label = item.get('block_label', '')
  253. # 根据 block_label 确定标题级别
  254. level_map = {
  255. 'paragraph_title': 1,
  256. 'figure_title': 2,
  257. 'title': 1
  258. }
  259. text_level = level_map.get(block_label, 1)
  260. heading = '#' * min(text_level, 6)
  261. lines.append(f"{heading} {text}\n")
  262. return lines
  263. @staticmethod
  264. def _format_paddleocr_vl_text(item: Dict) -> List[str]:
  265. """格式化 PaddleOCR_VL 文本"""
  266. lines = []
  267. bbox = item.get('block_bbox', [])
  268. if bbox:
  269. lines.append(MarkdownGenerator._add_bbox_comment(bbox))
  270. text = item.get('block_content', '')
  271. lines.append(f"{text}\n")
  272. return lines
  273. @staticmethod
  274. def _format_paddleocr_vl_table(item: Dict) -> List[str]:
  275. """格式化 PaddleOCR_VL 表格"""
  276. lines = []
  277. bbox = item.get('block_bbox', [])
  278. if bbox:
  279. lines.append(MarkdownGenerator._add_bbox_comment(bbox))
  280. # 表格内容
  281. table_content = item.get('block_content_with_bbox',
  282. item.get('block_content', ''))
  283. if table_content:
  284. lines.append(table_content)
  285. lines.append("")
  286. return lines
  287. @staticmethod
  288. def _format_paddleocr_vl_figure(item: Dict) -> List[str]:
  289. """格式化 PaddleOCR_VL 图片"""
  290. lines = []
  291. bbox = item.get('block_bbox', [])
  292. if bbox:
  293. lines.append(MarkdownGenerator._add_bbox_comment(bbox))
  294. # PaddleOCR_VL 图片信息在 block_content 中
  295. content = item.get('block_content', '')
  296. lines.append(f"![Figure]({content})\n")
  297. return lines
  298. @staticmethod
  299. def _format_paddleocr_vl_equation(item: Dict) -> List[str]:
  300. """格式化 PaddleOCR_VL 公式"""
  301. lines = []
  302. bbox = item.get('block_bbox', [])
  303. if bbox:
  304. lines.append(MarkdownGenerator._add_bbox_comment(bbox))
  305. latex = item.get('block_content', '')
  306. if latex:
  307. lines.append(f"$$\n{latex}\n$$\n")
  308. return lines
  309. @staticmethod
  310. def _format_paddleocr_vl_reference(item: Dict) -> List[str]:
  311. """格式化 PaddleOCR_VL 参考文献"""
  312. text = item.get('block_content', '')
  313. return [f"> {text}\n"]
  314. @staticmethod
  315. def _format_paddleocr_vl_unknown(item: Dict) -> List[str]:
  316. """格式化 PaddleOCR_VL 未知类型"""
  317. lines = []
  318. bbox = item.get('block_bbox', [])
  319. if bbox:
  320. lines.append(MarkdownGenerator._add_bbox_comment(bbox))
  321. text = item.get('block_content', '')
  322. if text:
  323. lines.append(f"{text}\n")
  324. return lines
  325. # ================== 通用方法 ==================
  326. @staticmethod
  327. def _add_bbox_comment(bbox: List) -> str:
  328. """添加 bbox 注释"""
  329. return f"<!-- bbox: {bbox} -->"
  330. @staticmethod
  331. def _format_equation(item: Dict) -> List[str]:
  332. """格式化公式(通用)"""
  333. latex = item.get('latex', '')
  334. if latex:
  335. return [f"$$\n{latex}\n$$\n"]
  336. return []
  337. @staticmethod
  338. def _format_inline_equation(item: Dict) -> List[str]:
  339. """格式化行内公式(通用)"""
  340. latex = item.get('latex', '')
  341. if latex:
  342. return [f"${latex}$\n"]
  343. return []
  344. @staticmethod
  345. def _format_metadata(item: Dict, item_type: str) -> List[str]:
  346. """格式化元数据(通用)"""
  347. text = item.get('text', '')
  348. type_map = {
  349. 'page_number': '页码',
  350. 'header': '页眉',
  351. 'footer': '页脚'
  352. }
  353. if text:
  354. return [f"<!-- {type_map.get(item_type, item_type)}: {text} -->\n"]
  355. return []
  356. @staticmethod
  357. def _format_reference(item: Dict) -> List[str]:
  358. """格式化参考文献(MinerU)"""
  359. text = item.get('text', '')
  360. return [f"> {text}\n"]
  361. @staticmethod
  362. def _format_unknown(item: Dict) -> List[str]:
  363. """格式化未知类型(MinerU)"""
  364. text = item.get('text', '')
  365. if text:
  366. return [f"{text}\n"]
  367. return []
  368. @staticmethod
  369. def _copy_image(img_path: str, source_file: str, output_path: str):
  370. """复制图片到输出目录"""
  371. source_dir = Path(source_file).parent
  372. img_full_path = source_dir / img_path
  373. if img_full_path.exists():
  374. output_img_path = Path(output_path).parent / img_path
  375. output_img_path.parent.mkdir(parents=True, exist_ok=True)
  376. shutil.copy(img_full_path, output_img_path)