| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415 |
- """
- Markdown 生成器模块
- 提供 Markdown 输出功能:
- - 完整文档 Markdown 生成
- - 按页 Markdown 生成
- - MinerU union_make 集成
- """
- import sys
- from pathlib import Path
- from typing import Dict, Any, List
- from loguru import logger
- # 导入 MinerU 组件
- mineru_path = Path(__file__).parents[3]
- if str(mineru_path) not in sys.path:
- sys.path.insert(0, str(mineru_path))
- try:
- from mineru.backend.vlm.vlm_middle_json_mkcontent import union_make as vlm_union_make
- from mineru.utils.enum_class import MakeMode
- MINERU_AVAILABLE = True
- except ImportError:
- MINERU_AVAILABLE = False
- vlm_union_make = None
-
- class MakeMode:
- MM_MD = 'mm_md'
- NLP_MD = 'nlp_md'
- class MarkdownGenerator:
- """Markdown 生成器类"""
-
- @staticmethod
- def save_markdown(
- results: Dict[str, Any],
- middle_json: Dict[str, Any],
- output_dir: Path,
- doc_name: str,
- use_mineru_union: bool = False
- ) -> Path:
- """
- 保存 Markdown 文件
-
- 默认使用自定义实现,确保所有元素类型(包括 table_caption 等)都被正确处理
- 可选使用 MinerU union_make(但它不处理 table_caption 等独立元素)
-
- Args:
- results: 处理结果
- middle_json: middle.json 格式数据
- output_dir: 输出目录
- doc_name: 文档名称
- use_mineru_union: 是否使用 MinerU union_make(默认 False)
-
- Returns:
- Markdown 文件路径
- """
- md_path = output_dir / f"{doc_name}.md"
-
- if use_mineru_union and MINERU_AVAILABLE and vlm_union_make is not None:
- try:
- img_bucket_path = "images"
- markdown_content = vlm_union_make(
- middle_json['pdf_info'],
- MakeMode.MM_MD,
- img_bucket_path
- )
-
- if markdown_content:
- if isinstance(markdown_content, list):
- markdown_content = '\n\n'.join(markdown_content)
-
- header = MarkdownGenerator._generate_header(results)
- markdown_content = header + str(markdown_content)
-
- with open(md_path, 'w', encoding='utf-8') as f:
- f.write(markdown_content)
-
- logger.info(f"📝 Markdown saved (MinerU format): {md_path}")
- return md_path
-
- except Exception as e:
- logger.warning(f"MinerU union_make failed: {e}, falling back to custom implementation")
-
- # 使用自定义实现,确保所有元素类型都被处理
- markdown_content = MarkdownGenerator._generate_full_markdown(results)
- with open(md_path, 'w', encoding='utf-8') as f:
- f.write(markdown_content)
-
- logger.info(f"📝 Markdown saved (custom format): {md_path}")
- return md_path
-
- @staticmethod
- def save_page_markdowns(
- results: Dict[str, Any],
- output_dir: Path,
- doc_name: str
- ) -> List[str]:
- """
- 按页保存 Markdown 文件
-
- Args:
- results: 处理结果
- output_dir: 输出目录
- doc_name: 文档名称
-
- Returns:
- 保存的 Markdown 文件路径列表
- """
- saved_paths = []
-
- for page in results.get('pages', []):
- page_idx = page.get('page_idx', 0)
- page_name = f"{doc_name}_page_{page_idx + 1:03d}"
-
- # 生成单页 Markdown
- md_content = MarkdownGenerator._generate_page_markdown(page, doc_name, page_idx)
-
- # 保存
- md_path = output_dir / f"{page_name}.md"
- with open(md_path, 'w', encoding='utf-8') as f:
- f.write(md_content)
-
- saved_paths.append(str(md_path))
- logger.debug(f"📝 Page Markdown saved: {md_path}")
-
- if saved_paths:
- logger.info(f"📝 {len(saved_paths)} page Markdowns saved")
-
- return saved_paths
-
- @staticmethod
- def _generate_header(results: Dict[str, Any]) -> str:
- """生成 Markdown 文件头"""
- return f"""---
- scene: {results.get('scene', 'unknown')}
- document: {results.get('document_path', '')}
- pages: {len(results.get('pages', []))}
- ---
- """
-
- @staticmethod
- def _generate_full_markdown(results: Dict[str, Any]) -> str:
- """
- 生成完整文档的 Markdown(自定义实现)
-
- 确保所有元素类型都被正确处理,包括 table_caption、table_footnote 等
-
- Args:
- results: 处理结果
-
- Returns:
- Markdown 内容字符串
- """
- md_lines = [
- f"---",
- f"scene: {results.get('scene', 'unknown')}",
- f"document: {results.get('document_path', '')}",
- f"pages: {len(results.get('pages', []))}",
- f"---",
- "",
- ]
-
- for page in results.get('pages', []):
- # 按阅读顺序处理元素
- for element in page.get('elements', []):
- elem_type = element.get('type', '')
- content = element.get('content', {})
-
- if elem_type == 'title':
- text = content.get('text', '') if isinstance(content, dict) else str(content)
- level = element.get('level', 1)
- if text:
- md_lines.append(f"{'#' * min(level, 6)} {text}")
- md_lines.append("")
-
- elif elem_type in ['text', 'ocr_text', 'ref_text']:
- text = content.get('text', '') if isinstance(content, dict) else str(content)
- if text:
- md_lines.append(text)
- md_lines.append("")
-
- elif elem_type in ['table', 'table_body']:
- html = content.get('html', '')
- if html:
- md_lines.append(f"\n{html}\n")
- md_lines.append("")
-
- elif elem_type in ['image', 'image_body', 'figure']:
- img_filename = content.get('image_path', '')
- if img_filename:
- md_lines.append(f"")
- md_lines.append("")
-
- elif elem_type in ['interline_equation', 'inline_equation', 'equation']:
- latex = content.get('latex', '')
- if latex:
- md_lines.append(f"$$\n{latex}\n$$")
- md_lines.append("")
-
- elif elem_type in ['table_caption', 'table_footnote']:
- text = content.get('text', '') if isinstance(content, dict) else str(content)
- if text:
- if elem_type == 'table_caption':
- md_lines.append(f"**{text}**")
- else:
- md_lines.append(f"*{text}*")
- md_lines.append("")
-
- elif elem_type in ['image_caption', 'image_footnote']:
- text = content.get('text', '') if isinstance(content, dict) else str(content)
- if text:
- if elem_type == 'image_caption':
- md_lines.append(f"**{text}**")
- else:
- md_lines.append(f"*{text}*")
- md_lines.append("")
-
- return '\n'.join(md_lines)
-
- @staticmethod
- def _generate_fallback(results: Dict[str, Any]) -> str:
- """降级方案:自定义 Markdown 生成"""
- md_lines = [
- f"---",
- f"scene: {results.get('scene', 'unknown')}",
- f"document: {results.get('document_path', '')}",
- f"pages: {len(results.get('pages', []))}",
- f"---",
- "",
- ]
-
- for page in results.get('pages', []):
- for element in page.get('elements', []):
- elem_type = element.get('type', '')
- content = element.get('content', {})
- bbox = element.get('bbox', [])
-
- # 添加 bbox 注释
- if bbox:
- md_lines.append(f"<!-- bbox: {bbox} -->")
-
- if elem_type == 'title':
- text = content.get('text', '') if isinstance(content, dict) else str(content)
- level = element.get('level', 1)
- md_lines.append(f"{'#' * min(level, 6)} {text}")
- md_lines.append("")
-
- elif elem_type in ['text', 'ocr_text', 'ref_text']:
- text = content.get('text', '') if isinstance(content, dict) else str(content)
- if text:
- md_lines.append(text)
- md_lines.append("")
-
- elif elem_type in ['table', 'table_body']:
- # 表格标题
- table_captions = content.get('table_caption', [])
- if isinstance(table_captions, str):
- table_captions = [table_captions] if table_captions else []
- for caption in table_captions:
- md_lines.append(f"**{caption}**")
-
- html = content.get('html', '')
- if html:
- md_lines.append(f"\n{html}\n")
- md_lines.append("")
-
- elif elem_type in ['image', 'image_body', 'figure']:
- img_filename = content.get('image_path', '')
- if img_filename:
- md_lines.append(f"")
- md_lines.append("")
-
- elif elem_type in ['interline_equation', 'inline_equation', 'equation']:
- latex = content.get('latex', '')
- if latex:
- md_lines.append(f"$$\n{latex}\n$$")
- md_lines.append("")
-
- elif elem_type in ['table_caption', 'table_footnote']:
- text = content.get('text', '') if isinstance(content, dict) else str(content)
- if text:
- if elem_type == 'table_caption':
- md_lines.append(f"**{text}**")
- else:
- md_lines.append(f"*{text}*")
- md_lines.append("")
-
- elif elem_type in ['image_caption', 'image_footnote']:
- text = content.get('text', '') if isinstance(content, dict) else str(content)
- if text:
- if elem_type == 'image_caption':
- md_lines.append(f"**{text}**")
- else:
- md_lines.append(f"*{text}*")
- md_lines.append("")
-
- return '\n'.join(md_lines)
-
- @staticmethod
- def _generate_page_markdown(
- page: Dict[str, Any],
- doc_name: str,
- page_idx: int
- ) -> str:
- """
- 生成单页的 Markdown 内容
-
- Args:
- page: 页面数据
- doc_name: 文档名称
- page_idx: 页码索引
-
- Returns:
- Markdown 内容字符串
- """
- md_lines = [
- f"---",
- f"document: {doc_name}",
- f"page: {page_idx + 1}",
- f"angle: {page.get('angle', 0)}",
- f"---",
- "",
- ]
-
- for element in page.get('elements', []):
- elem_type = element.get('type', '')
- content = element.get('content', {})
- bbox = element.get('bbox', [])
- reading_order = element.get('reading_order', 0)
-
- # 添加元素注释
- md_lines.append(f"<!-- reading_order: {reading_order}, bbox: {bbox} -->")
-
- if elem_type == 'title':
- text = content.get('text', '') if isinstance(content, dict) else str(content)
- level = element.get('level', 1)
- md_lines.append(f"{'#' * min(level, 6)} {text}")
- md_lines.append("")
-
- elif elem_type in ['text', 'ocr_text', 'ref_text']:
- text = content.get('text', '') if isinstance(content, dict) else str(content)
- if text:
- md_lines.append(text)
- md_lines.append("")
-
- elif elem_type in ['table', 'table_body']:
- table_captions = content.get('table_caption', [])
- if isinstance(table_captions, str):
- table_captions = [table_captions] if table_captions else []
- for caption in table_captions:
- md_lines.append(f"**{caption}**")
-
- html = content.get('html', '')
- if html:
- md_lines.append(f"\n{html}\n")
- md_lines.append("")
-
- elif elem_type in ['image', 'image_body', 'figure']:
- img_filename = content.get('image_path', '')
- if img_filename:
- md_lines.append(f"")
- md_lines.append("")
-
- elif elem_type in ['interline_equation', 'inline_equation', 'equation']:
- latex = content.get('latex', '')
- if latex:
- md_lines.append(f"$$\n{latex}\n$$")
- md_lines.append("")
-
- elif elem_type in ['table_caption', 'table_footnote']:
- text = content.get('text', '') if isinstance(content, dict) else str(content)
- if text:
- # 表格标题加粗,表格脚注斜体
- if elem_type == 'table_caption':
- md_lines.append(f"**{text}**")
- else:
- md_lines.append(f"*{text}*")
- md_lines.append("")
-
- elif elem_type in ['image_caption', 'image_footnote']:
- text = content.get('text', '') if isinstance(content, dict) else str(content)
- if text:
- # 图片标题加粗,图片脚注斜体
- if elem_type == 'image_caption':
- md_lines.append(f"**{text}**")
- else:
- md_lines.append(f"*{text}*")
- md_lines.append("")
-
- elif elem_type == 'discarded':
- text = content.get('text', '') if isinstance(content, dict) else ''
- if text:
- md_lines.append(f"<!-- [discarded: {element.get('original_category', 'unknown')}] {text} -->")
- md_lines.append("")
-
- # 处理丢弃元素
- for element in page.get('discarded_blocks', []):
- content = element.get('content', {})
- bbox = element.get('bbox', [])
- reading_order = element.get('reading_order', 0)
- original_category = element.get('original_category', 'unknown')
-
- md_lines.append(f"<!-- reading_order: {reading_order}, bbox: {bbox} -->")
- text = content.get('text', '') if isinstance(content, dict) else ''
- if text:
- md_lines.append(f"<!-- [discarded: {original_category}] {text} -->")
- else:
- md_lines.append(f"<!-- [discarded: {original_category}] (no text) -->")
- md_lines.append("")
-
- return '\n'.join(md_lines)
|