|
|
@@ -6,10 +6,12 @@ from conf import conf
|
|
|
from lib import common
|
|
|
import time
|
|
|
import magic_pdf.model as model_config
|
|
|
-import os
|
|
|
-from magic_pdf.data.data_reader_writer import FileBasedDataWriter
|
|
|
+from magic_pdf.data.read_api import read_local_images
|
|
|
+from magic_pdf.pipe.UNIPipe import UNIPipe
|
|
|
+from magic_pdf.data.read_api import read_local_office
|
|
|
from magic_pdf.data.data_reader_writer import S3DataReader, S3DataWriter
|
|
|
from magic_pdf.config.make_content_config import DropMode, MakeMode
|
|
|
+from magic_pdf.pipe.OCRPipe import OCRPipe
|
|
|
from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader
|
|
|
from magic_pdf.data.dataset import PymuDocDataset
|
|
|
from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
|
|
|
@@ -33,7 +35,7 @@ class TestCli:
|
|
|
yield
|
|
|
|
|
|
@pytest.mark.P0
|
|
|
- def test_pdf_auto_sdk(self):
|
|
|
+ def test_pdf_local_sdk(self):
|
|
|
"""pdf sdk auto test."""
|
|
|
demo_names = list()
|
|
|
pdf_path = os.path.join(pdf_dev_path, 'pdf')
|
|
|
@@ -44,6 +46,63 @@ class TestCli:
|
|
|
pdf_path = os.path.join(pdf_dev_path, 'pdf', f'{demo_name}.pdf')
|
|
|
local_image_dir = os.path.join(pdf_dev_path, 'pdf', 'images')
|
|
|
image_dir = str(os.path.basename(local_image_dir))
|
|
|
+ name_without_suff = os.path.basename(pdf_path).split(".pdf")[0]
|
|
|
+ dir_path = os.path.join(pdf_dev_path, 'mineru')
|
|
|
+ image_writer, md_writer = FileBasedDataWriter(local_image_dir), FileBasedDataWriter(dir_path)
|
|
|
+ reader1 = FileBasedDataReader("")
|
|
|
+ pdf_bytes = reader1.read(pdf_path)
|
|
|
+ ds = PymuDocDataset(pdf_bytes)
|
|
|
+ ## inference
|
|
|
+ if ds.classify() == SupportedPdfParseMethod.OCR:
|
|
|
+ infer_result = ds.apply(doc_analyze, ocr=True)
|
|
|
+ ## pipeline
|
|
|
+ pipe_result = infer_result.pipe_ocr_mode(image_writer)
|
|
|
+ else:
|
|
|
+ infer_result = ds.apply(doc_analyze, ocr=False)
|
|
|
+ ## pipeline
|
|
|
+ pipe_result = infer_result.pipe_txt_mode(image_writer)
|
|
|
+ common.delete_file(dir_path)
|
|
|
+ ### draw model result on each page
|
|
|
+ infer_result.draw_model(os.path.join(dir_path, f"{name_without_suff}_model.pdf"))
|
|
|
+
|
|
|
+ ### get model inference result
|
|
|
+ model_inference_result = infer_result.get_infer_res()
|
|
|
+
|
|
|
+ ### draw layout result on each page
|
|
|
+ pipe_result.draw_layout(os.path.join(dir_path, f"{name_without_suff}_layout.pdf"))
|
|
|
+
|
|
|
+ ### draw spans result on each page
|
|
|
+ pipe_result.draw_span(os.path.join(dir_path, f"{name_without_suff}_spans.pdf"))
|
|
|
+
|
|
|
+ ### dump markdown
|
|
|
+ pipe_result.dump_md(md_writer, f"{name_without_suff}.md", image_dir)
|
|
|
+
|
|
|
+ ### dump content list
|
|
|
+ pipe_result.dump_content_list(md_writer, f"{name_without_suff}_content_list.json", image_dir)
|
|
|
+
|
|
|
+ ### get markdown content
|
|
|
+ md_content = pipe_result.get_markdown(image_dir, drop_mode=DropMode.WHOLE_PDF, md_make_mode=MakeMode.MM_MD)
|
|
|
+
|
|
|
+ ### get content list content
|
|
|
+ content_list_content = pipe_result.get_content_list(image_dir, drop_mode=DropMode.NONE, md_make_mode=MakeMode.STANDARD_FORMAT)
|
|
|
+
|
|
|
+ ### get middle json
|
|
|
+ middle_json_content = pipe_result.get_middle_json()
|
|
|
+ common.sdk_count_folders_and_check_contents(dir_path)
|
|
|
+
|
|
|
+ @pytest.mark.P0
|
|
|
+ def test_pdf_s3_sdk(self):
|
|
|
+ """pdf s3 sdk test."""
|
|
|
+ demo_names = list()
|
|
|
+ pdf_path = os.path.join(pdf_dev_path, 'pdf')
|
|
|
+ for pdf_file in os.listdir(pdf_path):
|
|
|
+ if pdf_file.endswith('.pdf'):
|
|
|
+ demo_names.append(pdf_file.split('.')[0])
|
|
|
+ for demo_name in demo_names:
|
|
|
+ pdf_path = os.path.join(pdf_dev_path, 'pdf', f'{demo_name}.pdf')
|
|
|
+ local_image_dir = os.path.join(pdf_dev_path, 'pdf', 'images')
|
|
|
+ image_dir = str(os.path.basename(local_image_dir))
|
|
|
+ name_without_suff = os.path.basename(pdf_path).split(".pdf")[0]
|
|
|
dir_path = os.path.join(pdf_dev_path, 'mineru')
|
|
|
image_writer, md_writer = FileBasedDataWriter(local_image_dir), FileBasedDataWriter(dir_path)
|
|
|
reader1 = FileBasedDataReader("")
|
|
|
@@ -59,13 +118,120 @@ class TestCli:
|
|
|
## pipeline
|
|
|
pipe_result = infer_result.pipe_txt_mode(image_writer)
|
|
|
common.delete_file(dir_path)
|
|
|
- infer_result.draw_model(os.path.join(dir_path, f"{demo_name}_model.pdf"))
|
|
|
- pipe_result.draw_layout(os.path.join(dir_path, f"{demo_name}_layout.pdf"))
|
|
|
- pipe_result.draw_span(os.path.join(dir_path, f"{demo_name}_spans.pdf"))
|
|
|
- pipe_result.dump_md(md_writer, f"{demo_name}.md", image_dir)
|
|
|
- pipe_result.dump_content_list(md_writer, f"{demo_name}_content_list.json", image_dir)
|
|
|
+ ### draw model result on each page
|
|
|
+ infer_result.draw_model(os.path.join(dir_path, f"{name_without_suff}_model.pdf"))
|
|
|
+
|
|
|
+ ### get model inference result
|
|
|
+ model_inference_result = infer_result.get_infer_res()
|
|
|
+
|
|
|
+ ### draw layout result on each page
|
|
|
+ pipe_result.draw_layout(os.path.join(dir_path, f"{name_without_suff}_layout.pdf"))
|
|
|
+
|
|
|
+ ### draw spans result on each page
|
|
|
+ pipe_result.draw_span(os.path.join(dir_path, f"{name_without_suff}_spans.pdf"))
|
|
|
+
|
|
|
+ ### dump markdown
|
|
|
+ pipe_result.dump_md(md_writer, f"{name_without_suff}.md", image_dir)
|
|
|
+
|
|
|
+ ### dump content list
|
|
|
+ pipe_result.dump_content_list(md_writer, f"{name_without_suff}_content_list.json", image_dir)
|
|
|
+
|
|
|
+ ### get markdown content
|
|
|
+ md_content = pipe_result.get_markdown(image_dir, drop_mode=DropMode.WHOLE_PDF, md_make_mode=MakeMode.MM_MD)
|
|
|
+
|
|
|
+ ### get content list content
|
|
|
+ content_list_content = pipe_result.get_content_list(image_dir, drop_mode=DropMode.NONE, md_make_mode=MakeMode.STANDARD_FORMAT)
|
|
|
+
|
|
|
+ ### get middle json
|
|
|
+ middle_json_content = pipe_result.get_middle_json()
|
|
|
+ common.sdk_count_folders_and_check_contents(dir_path)
|
|
|
+
|
|
|
+
|
|
|
+ @pytest.mark.P0
|
|
|
def test_pdf_local_ppt(self):
    """Convert every local ``.pptx`` sample to markdown and check the output.

    Each ``.pptx`` file found in ``<pdf_dev_path>/ppt`` is loaded with
    ``read_local_office``, analysed with ``doc_analyze`` and dumped as
    ``<name>.md`` into ``<pdf_dev_path>/mineru``. After each file the output
    directory is passed to ``common.sdk_count_folders_and_check_contents``.
    """
    ppt_dir = os.path.join(pdf_dev_path, 'ppt')
    dir_path = os.path.join(pdf_dev_path, 'mineru')
    local_image_dir = os.path.join(dir_path, 'images')
    # Only the directory name is handed to dump_md as the image location.
    image_dir = str(os.path.basename(local_image_dir))

    # splitext strips only the final extension, so a sample whose stem
    # contains dots (e.g. "report.v2.pptx") is still found again when the
    # path is rebuilt below; split('.')[0] would have truncated it.
    demo_names = [
        os.path.splitext(file_name)[0]
        for file_name in os.listdir(ppt_dir)
        if file_name.endswith('.pptx')
    ]

    for demo_name in demo_names:
        pdf_path = os.path.join(ppt_dir, f'{demo_name}.pptx')
        image_writer = FileBasedDataWriter(local_image_dir)
        md_writer = FileBasedDataWriter(dir_path)
        # read_local_office returns a sequence; the first dataset is used.
        ds = read_local_office(pdf_path)[0]
        # NOTE(review): presumably clears output left by a previous run --
        # delete_file lives in lib.common and is not visible here.
        common.delete_file(dir_path)

        infer_result = ds.apply(doc_analyze, ocr=True)
        pipe_result = infer_result.pipe_txt_mode(image_writer)
        pipe_result.dump_md(md_writer, f"{demo_name}.md", image_dir)
        common.sdk_count_folders_and_check_contents(dir_path)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ @pytest.mark.P0
|
|
|
def test_pdf_local_image(self):
    """Convert every local ``.jpg`` sample to markdown and check the output.

    Each ``.jpg`` file found in ``<pdf_dev_path>/images`` is loaded with
    ``read_local_images``, analysed with ``doc_analyze`` in OCR mode and
    dumped as ``<name>.md`` into ``<pdf_dev_path>/mineru``. After each file
    the output directory is passed to
    ``common.sdk_count_folders_and_check_contents``.
    """
    source_dir = os.path.join(pdf_dev_path, 'images')
    dir_path = os.path.join(pdf_dev_path, 'mineru')
    local_image_dir = os.path.join(dir_path, 'images')
    # Only the directory name is handed to dump_md as the image location.
    image_dir = str(os.path.basename(local_image_dir))

    # splitext strips only the final extension, so a sample whose stem
    # contains dots (e.g. "scan.page1.jpg") is still found again when the
    # path is rebuilt below; split('.')[0] would have truncated it.
    demo_names = [
        os.path.splitext(file_name)[0]
        for file_name in os.listdir(source_dir)
        if file_name.endswith('.jpg')
    ]

    for demo_name in demo_names:
        pdf_path = os.path.join(source_dir, f'{demo_name}.jpg')
        # NOTE(review): presumably clears output left by a previous run --
        # delete_file lives in lib.common and is not visible here.
        common.delete_file(dir_path)
        image_writer = FileBasedDataWriter(local_image_dir)
        md_writer = FileBasedDataWriter(dir_path)
        # read_local_images returns a sequence; the first dataset is used.
        ds = read_local_images(pdf_path)[0]

        infer_result = ds.apply(doc_analyze, ocr=True)
        pipe_result = infer_result.pipe_ocr_mode(image_writer)
        pipe_result.dump_md(md_writer, f"{demo_name}.md", image_dir)
        common.sdk_count_folders_and_check_contents(dir_path)
|
|
|
+
|
|
|
+
|
|
|
+ @pytest.mark.P0
|
|
|
def test_local_image_dir(self):
    """Convert a whole directory of images to markdown and check the output.

    ``read_local_images`` is pointed at ``<pdf_dev_path>/images`` with the
    ``.png`` and ``.jpg`` suffixes; every dataset it returns is analysed
    with ``doc_analyze`` in OCR mode and dumped as ``<index>.md`` (0-based)
    into ``<pdf_dev_path>/mineru``, which is then passed to
    ``common.sdk_count_folders_and_check_contents``.
    """
    pdf_path = os.path.join(pdf_dev_path, 'images')
    dir_path = os.path.join(pdf_dev_path, 'mineru')
    local_image_dir = os.path.join(pdf_dev_path, 'mineru', 'images')
    # Only the directory name is handed to dump_md as the image location.
    image_dir = str(os.path.basename(local_image_dir))
    image_writer = FileBasedDataWriter(local_image_dir)
    md_writer = FileBasedDataWriter(dir_path)
    # NOTE(review): presumably clears output left by a previous run --
    # delete_file lives in lib.common and is not visible here.
    common.delete_file(dir_path)

    dss = read_local_images(pdf_path, suffixes=['.png', '.jpg'])
    # enumerate supplies the running index used as the markdown file name.
    for count, ds in enumerate(dss):
        infer_result = ds.apply(doc_analyze, ocr=True)
        pipe_result = infer_result.pipe_ocr_mode(image_writer)
        pipe_result.dump_md(md_writer, f"{count}.md", image_dir)
    common.sdk_count_folders_and_check_contents(dir_path)
|
|
|
+
|
|
|
def test_local_doc_parse(self):
    """Parse every local ``.docx`` sample to markdown and check the output.

    Each ``.docx`` file found in ``<pdf_dev_path>/doc`` is loaded with
    ``read_local_office``, analysed with ``doc_analyze`` and dumped as
    ``<name>.md`` into ``<pdf_dev_path>/mineru``. After each file the output
    directory is passed to ``common.sdk_count_folders_and_check_contents``.
    """
    doc_dir = os.path.join(pdf_dev_path, 'doc')
    dir_path = os.path.join(pdf_dev_path, 'mineru')
    local_image_dir = os.path.join(dir_path, 'images')
    # Only the directory name is handed to dump_md as the image location.
    image_dir = str(os.path.basename(local_image_dir))

    # splitext strips only the final extension, so a sample whose stem
    # contains dots (e.g. "spec.draft.docx") is still found again when the
    # path is rebuilt below; split('.')[0] would have truncated it.
    demo_names = [
        os.path.splitext(file_name)[0]
        for file_name in os.listdir(doc_dir)
        if file_name.endswith('.docx')
    ]

    for demo_name in demo_names:
        pdf_path = os.path.join(doc_dir, f'{demo_name}.docx')
        image_writer = FileBasedDataWriter(local_image_dir)
        md_writer = FileBasedDataWriter(dir_path)
        # read_local_office returns a sequence; the first dataset is used.
        ds = read_local_office(pdf_path)[0]
        # NOTE(review): presumably clears output left by a previous run --
        # delete_file lives in lib.common and is not visible here.
        common.delete_file(dir_path)

        infer_result = ds.apply(doc_analyze, ocr=True)
        pipe_result = infer_result.pipe_txt_mode(image_writer)
        pipe_result.dump_md(md_writer, f"{demo_name}.md", image_dir)
        common.sdk_count_folders_and_check_contents(dir_path)
|
|
|
|
|
|
+
|
|
|
@pytest.mark.P0
|
|
|
def test_pdf_cli_auto(self):
|
|
|
"""magic_pdf cli test auto."""
|