"""
Markdown generator module.

Provides Markdown output for processed documents:
- full-document Markdown generation
- per-page Markdown generation
- optional MinerU ``union_make`` integration
"""

import sys
from pathlib import Path
from typing import Any, Dict, List

try:
    from loguru import logger
except ImportError:
    # Only plain message logging is used below, so the standard library is a
    # drop-in replacement when loguru is not installed.
    import logging

    logger = logging.getLogger(__name__)

# Put the directory three levels above this file's directory on sys.path so
# the ``mineru`` package can be imported (presumably the MinerU checkout root;
# verify against the repository layout).  The depth check avoids an IndexError
# at import time when the file lives in a shallower directory tree.
_ancestors = Path(__file__).parents
mineru_path = _ancestors[3] if len(_ancestors) > 3 else None
if mineru_path is not None and str(mineru_path) not in sys.path:
    sys.path.insert(0, str(mineru_path))

try:
    from mineru.backend.vlm.vlm_middle_json_mkcontent import union_make as vlm_union_make
    from mineru.utils.enum_class import MakeMode
    MINERU_AVAILABLE = True
except ImportError:
    MINERU_AVAILABLE = False
    vlm_union_make = None

    class MakeMode:
        """Stand-in for MinerU's ``MakeMode`` used when MinerU is not importable."""
        MM_MD = 'mm_md'
        NLP_MD = 'nlp_md'


class MarkdownGenerator:
    """Render processing results as Markdown (whole document or page by page)."""

    # Element type groups that share one rendering rule.
    _TEXT_TYPES = ('text', 'ocr_text', 'ref_text')
    _TABLE_TYPES = ('table', 'table_body')
    _IMAGE_TYPES = ('image', 'image_body', 'figure')
    _EQUATION_TYPES = ('interline_equation', 'inline_equation', 'equation')
    _CAPTION_TYPES = ('table_caption', 'image_caption')
    _FOOTNOTE_TYPES = ('table_footnote', 'image_footnote')

    # HTML comment delimiters, assembled by concatenation so that tooling
    # which strips HTML comments from source text cannot swallow them.
    _COMMENT_OPEN = "<" + "!--"
    _COMMENT_CLOSE = "--" + ">"

    @staticmethod
    def save_markdown(
        results: Dict[str, Any],
        middle_json: Dict[str, Any],
        output_dir: Path,
        doc_name: str,
        use_mineru_union: bool = False
    ) -> Path:
        """
        Save the whole document as ``<output_dir>/<doc_name>.md``.

        The custom renderer is used by default so that every element type
        (including standalone ``table_caption`` etc.) is written.  MinerU's
        ``union_make`` can be requested instead; if it is unavailable, raises,
        or returns nothing, the custom renderer is used as a fallback.

        Args:
            results: Processing results (``scene``, ``document_path``, ``pages``).
            middle_json: middle.json-style data; ``middle_json['pdf_info']`` is
                passed to ``union_make`` when the MinerU path is taken.
            output_dir: Target directory; created if it does not exist.
            doc_name: Document name used as the file stem.
            use_mineru_union: Whether to try MinerU ``union_make`` first
                (default ``False``).

        Returns:
            Path of the written Markdown file.
        """
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        md_path = output_dir / f"{doc_name}.md"

        markdown_content = None
        flavour = "custom format"

        if use_mineru_union and MINERU_AVAILABLE and vlm_union_make is not None:
            try:
                rendered = vlm_union_make(
                    middle_json['pdf_info'],
                    MakeMode.MM_MD,
                    "images"
                )
                if isinstance(rendered, list):
                    rendered = '\n\n'.join(rendered)
            except Exception as e:
                # Best effort: any MinerU failure degrades to the custom renderer.
                logger.warning(f"MinerU union_make failed: {e}, falling back to custom implementation")
            else:
                if rendered:
                    markdown_content = MarkdownGenerator._generate_header(results) + str(rendered)
                    flavour = "MinerU format"
                else:
                    logger.warning("MinerU union_make returned no content, falling back to custom implementation")

        if markdown_content is None:
            # Custom implementation: guarantees every element type is handled.
            markdown_content = MarkdownGenerator._generate_full_markdown(results)

        md_path.write_text(markdown_content, encoding='utf-8')
        logger.info(f"📝 Markdown saved ({flavour}): {md_path}")
        return md_path

    @staticmethod
    def save_page_markdowns(
        results: Dict[str, Any],
        output_dir: Path,
        doc_name: str
    ) -> List[str]:
        """
        Save one Markdown file per page, named ``<doc_name>_page_NNN.md``.

        Args:
            results: Processing results; ``results['pages']`` is iterated.
            output_dir: Target directory; created if it does not exist.
            doc_name: Document name used as the file name prefix.

        Returns:
            List of written file paths (as strings), in page order.
        """
        output_dir = Path(output_dir)
        saved_paths = []

        for position, page in enumerate(results.get('pages') or []):
            # Fall back to the position in the list so pages without an
            # explicit index do not all overwrite the same "page_001" file.
            page_idx = page.get('page_idx')
            if page_idx is None:
                page_idx = position

            md_content = MarkdownGenerator._generate_page_markdown(page, doc_name, page_idx)

            output_dir.mkdir(parents=True, exist_ok=True)
            md_path = output_dir / f"{doc_name}_page_{page_idx + 1:03d}.md"
            md_path.write_text(md_content, encoding='utf-8')

            saved_paths.append(str(md_path))
            logger.debug(f"📝 Page Markdown saved: {md_path}")

        if saved_paths:
            logger.info(f"📝 {len(saved_paths)} page Markdowns saved")

        return saved_paths

    @staticmethod
    def _front_matter_lines(results: Dict[str, Any]) -> List[str]:
        """Return the document front-matter as lines, ending with an empty line."""
        return [
            "---",
            f"scene: {results.get('scene', 'unknown')}",
            f"document: {results.get('document_path', '')}",
            f"pages: {len(results.get('pages') or [])}",
            "---",
            "",
        ]

    @staticmethod
    def _generate_header(results: Dict[str, Any]) -> str:
        """Return the front-matter block as a string followed by one blank line."""
        return '\n'.join(MarkdownGenerator._front_matter_lines(results)) + '\n'

    @staticmethod
    def _html_comment(body: Any) -> str:
        """Wrap *body* in an HTML comment, neutralising an embedded terminator."""
        closing = MarkdownGenerator._COMMENT_CLOSE
        safe = str(body).replace(closing, "--&gt;")
        return f"{MarkdownGenerator._COMMENT_OPEN} {safe} {closing}"

    @staticmethod
    def _content_text(content: Any) -> str:
        """Return the text of an element's content (dict with ``text`` or a bare value)."""
        value = content.get('text', '') if isinstance(content, dict) else content
        return '' if value is None else str(value)

    @staticmethod
    def _content_field(content: Any, key: str) -> Any:
        """Return ``content[key]`` when content is a dict, otherwise an empty string."""
        if isinstance(content, dict):
            return content.get(key, '')
        return ''

    @staticmethod
    def _heading_level(level: Any) -> int:
        """Coerce *level* to a valid Markdown heading depth (1..6); default 1."""
        try:
            level = int(level)
        except (TypeError, ValueError):
            return 1
        return max(1, min(level, 6))

    @staticmethod
    def _render_element(
        element: Dict[str, Any],
        inline_table_captions: bool = False
    ) -> List[str]:
        """
        Render one element to Markdown lines (empty list if nothing to emit).

        Args:
            element: Element dict with ``type``, ``content`` and, for titles,
                an optional ``level``.
            inline_table_captions: When true, captions stored inside a table's
                content under ``table_caption`` are written in bold before the
                table HTML.

        Returns:
            Lines to append to the output; each emitted element is followed by
            an empty line.
        """
        gen = MarkdownGenerator
        elem_type = element.get('type', '')
        content = element.get('content', {})
        lines: List[str] = []

        if elem_type == 'title':
            text = gen._content_text(content)
            if text:
                hashes = '#' * gen._heading_level(element.get('level', 1))
                lines += [f"{hashes} {text}", ""]

        elif elem_type in gen._TEXT_TYPES:
            text = gen._content_text(content)
            if text:
                lines += [text, ""]

        elif elem_type in gen._TABLE_TYPES:
            if inline_table_captions:
                captions = gen._content_field(content, 'table_caption') or []
                if isinstance(captions, str):
                    captions = [captions]
                lines.extend(f"**{caption}**" for caption in captions if caption)
            html = gen._content_field(content, 'html')
            if html:
                # Blank lines around the HTML keep it a separate block.
                lines += [f"\n{html}\n", ""]

        elif elem_type in gen._IMAGE_TYPES:
            img_filename = gen._content_field(content, 'image_path')
            if img_filename:
                lines += [f"![](images/{img_filename})", ""]

        elif elem_type in gen._EQUATION_TYPES:
            latex = gen._content_field(content, 'latex')
            if latex:
                lines += [f"$$\n{latex}\n$$", ""]

        elif elem_type in gen._CAPTION_TYPES:
            # Captions are bold.
            text = gen._content_text(content)
            if text:
                lines += [f"**{text}**", ""]

        elif elem_type in gen._FOOTNOTE_TYPES:
            # Footnotes are italic.
            text = gen._content_text(content)
            if text:
                lines += [f"*{text}*", ""]

        return lines

    @staticmethod
    def _generate_full_markdown(results: Dict[str, Any]) -> str:
        """
        Build the Markdown for the whole document (custom implementation).

        Elements are written in the order they appear on each page; standalone
        ``table_caption`` / ``table_footnote`` / ``image_caption`` /
        ``image_footnote`` elements are included.

        Args:
            results: Processing results.

        Returns:
            Markdown content.
        """
        md_lines = MarkdownGenerator._front_matter_lines(results)

        for page in results.get('pages') or []:
            for element in page.get('elements') or []:
                md_lines.extend(MarkdownGenerator._render_element(element))

        return '\n'.join(md_lines)

    @staticmethod
    def _generate_fallback(results: Dict[str, Any]) -> str:
        """
        Fallback renderer: like the full renderer, plus bbox annotations.

        Each element with a non-empty ``bbox`` is preceded by an HTML comment
        carrying that bbox, and captions stored inside a table's content are
        written before the table.

        Args:
            results: Processing results.

        Returns:
            Markdown content.
        """
        md_lines = MarkdownGenerator._front_matter_lines(results)

        for page in results.get('pages') or []:
            for element in page.get('elements') or []:
                bbox = element.get('bbox', [])
                if bbox:
                    md_lines.append(MarkdownGenerator._html_comment(f"bbox: {bbox}"))
                md_lines.extend(
                    MarkdownGenerator._render_element(element, inline_table_captions=True)
                )

        return '\n'.join(md_lines)

    @staticmethod
    def _generate_page_markdown(
        page: Dict[str, Any],
        doc_name: str,
        page_idx: int
    ) -> str:
        """
        Build the Markdown for a single page.

        Every element is preceded by an HTML comment with its type, bbox and
        reading order.  Discarded content (elements of type ``discarded`` and
        the entries of ``page['discarded_blocks']``) is written only inside
        HTML comments, so it does not show up in rendered output.

        Args:
            page: Page data (``elements``, ``discarded_blocks``, ``angle``).
            doc_name: Document name written to the front-matter.
            page_idx: Zero-based page index; written one-based.

        Returns:
            Markdown content.
        """
        gen = MarkdownGenerator
        md_lines = [
            "---",
            f"document: {doc_name}",
            f"page: {page_idx + 1}",
            f"angle: {page.get('angle', 0)}",
            "---",
            "",
        ]

        for element in page.get('elements') or []:
            elem_type = element.get('type', '')
            content = element.get('content', {})
            bbox = element.get('bbox', [])
            reading_order = element.get('reading_order', 0)

            md_lines.append(gen._html_comment(
                f"type: {elem_type}, bbox: {bbox}, reading_order: {reading_order}"
            ))

            if elem_type == 'discarded':
                # Only dict content carries text for discarded elements.
                text = content.get('text', '') if isinstance(content, dict) else ''
                if text:
                    md_lines.append(gen._html_comment(f"discarded: {text}"))
                    md_lines.append("")
            else:
                md_lines.extend(gen._render_element(element, inline_table_captions=True))

        for element in page.get('discarded_blocks') or []:
            content = element.get('content', {})
            bbox = element.get('bbox', [])
            reading_order = element.get('reading_order', 0)
            original_category = element.get('original_category', 'unknown')

            md_lines.append(gen._html_comment(
                f"discarded ({original_category}), bbox: {bbox}, reading_order: {reading_order}"
            ))
            text = content.get('text', '') if isinstance(content, dict) else ''
            md_lines.append(gen._html_comment(f"text: {text}" if text else "(no text)"))
            md_lines.append("")

        return '\n'.join(md_lines)