|
|
@@ -12,6 +12,10 @@ from magic_pdf.data.data_reader_writer import FileBasedDataWriter
|
|
|
from magic_pdf.data.data_reader_writer import S3DataReader, S3DataWriter
|
|
|
from magic_pdf.config.make_content_config import DropMode, MakeMode
|
|
|
from magic_pdf.pipe.OCRPipe import OCRPipe
|
|
|
+from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader
|
|
|
+from magic_pdf.data.dataset import PymuDocDataset
|
|
|
+from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
|
|
|
+from magic_pdf.config.enums import SupportedPdfParseMethod
|
|
|
model_config.__use_inside_model__ = True
|
|
|
pdf_res_path = conf.conf['pdf_res_path']
|
|
|
code_path = conf.conf['code_path']
|
|
|
@@ -40,103 +44,31 @@ class TestCli:
|
|
|
demo_names.append(pdf_file.split('.')[0])
|
|
|
for demo_name in demo_names:
|
|
|
pdf_path = os.path.join(pdf_dev_path, 'pdf', f'{demo_name}.pdf')
|
|
|
- print(pdf_path)
|
|
|
- pdf_bytes = open(pdf_path, 'rb').read()
|
|
|
local_image_dir = os.path.join(pdf_dev_path, 'pdf', 'images')
|
|
|
image_dir = str(os.path.basename(local_image_dir))
|
|
|
- image_writer = FileBasedDataWriter(local_image_dir)
|
|
|
- model_json = list()
|
|
|
- jso_useful_key = {'_pdf_type': '', 'model_list': model_json}
|
|
|
- pipe = UNIPipe(pdf_bytes, jso_useful_key, image_writer)
|
|
|
- pipe.pipe_classify()
|
|
|
- if len(model_json) == 0:
|
|
|
- if model_config.__use_inside_model__:
|
|
|
- pipe.pipe_analyze()
|
|
|
- else:
|
|
|
- exit(1)
|
|
|
- pipe.pipe_parse()
|
|
|
- md_content = pipe.pipe_mk_markdown(image_dir, drop_mode='none')
|
|
|
dir_path = os.path.join(pdf_dev_path, 'mineru')
|
|
|
- if not os.path.exists(dir_path):
|
|
|
- os.makedirs(dir_path, exist_ok=True)
|
|
|
- res_path = os.path.join(dir_path, f'{demo_name}.md')
|
|
|
- common.delete_file(res_path)
|
|
|
- with open(res_path, 'w+', encoding='utf-8') as f:
|
|
|
- f.write(md_content)
|
|
|
- common.sdk_count_folders_and_check_contents(res_path)
|
|
|
+ image_writer, md_writer = FileBasedDataWriter(local_image_dir), FileBasedDataWriter(dir_path)
|
|
|
+ reader1 = FileBasedDataReader("")
|
|
|
+ pdf_bytes = reader1.read(pdf_path)
|
|
|
+ ds = PymuDocDataset(pdf_bytes)
|
|
|
+ ## inference
|
|
|
+ if ds.classify() == SupportedPdfParseMethod.OCR:
|
|
|
+ infer_result = ds.apply(doc_analyze, ocr=True)
|
|
|
+ ## pipeline
|
|
|
+ pipe_result = infer_result.pipe_ocr_mode(image_writer)
|
|
|
+ else:
|
|
|
+ infer_result = ds.apply(doc_analyze, ocr=False)
|
|
|
+ ## pipeline
|
|
|
+ pipe_result = infer_result.pipe_txt_mode(image_writer)
|
|
|
+ common.delete_file(dir_path)
|
|
|
+ infer_result.draw_model(os.path.join(dir_path, f"{demo_name}_model.pdf"))
|
|
|
+ pipe_result.draw_layout(os.path.join(dir_path, f"{demo_name}_layout.pdf"))
|
|
|
+ pipe_result.draw_span(os.path.join(dir_path, f"{demo_name}_spans.pdf"))
|
|
|
+ pipe_result.dump_md(md_writer, f"{demo_name}.md", image_dir)
|
|
|
+ pipe_result.dump_content_list(md_writer, f"{demo_name}_content_list.json", image_dir)
|
|
|
+ common.sdk_count_folders_and_check_contents(dir_path)
|
|
|
|
|
|
@pytest.mark.P0
|
|
|
- def test_pdf_ocr_sdk(self):
|
|
|
- """pdf sdk ocr test."""
|
|
|
- time.sleep(2)
|
|
|
- demo_names = list()
|
|
|
- pdf_path = os.path.join(pdf_dev_path, 'pdf')
|
|
|
- for pdf_file in os.listdir(pdf_path):
|
|
|
- if pdf_file.endswith('.pdf'):
|
|
|
- demo_names.append(pdf_file.split('.')[0])
|
|
|
- for demo_name in demo_names:
|
|
|
- pdf_path = os.path.join(pdf_dev_path, 'pdf', f'{demo_name}.pdf')
|
|
|
- print(pdf_path)
|
|
|
- pdf_bytes = open(pdf_path, 'rb').read()
|
|
|
- local_image_dir = os.path.join(pdf_dev_path, 'pdf', 'images')
|
|
|
- image_dir = str(os.path.basename(local_image_dir))
|
|
|
- image_writer = FileBasedDataWriter(local_image_dir)
|
|
|
- model_json = list()
|
|
|
- jso_useful_key = {'_pdf_type': 'ocr', 'model_list': model_json}
|
|
|
- pipe = UNIPipe(pdf_bytes, jso_useful_key, image_writer)
|
|
|
- pipe.pipe_classify()
|
|
|
- if len(model_json) == 0:
|
|
|
- if model_config.__use_inside_model__:
|
|
|
- pipe.pipe_analyze()
|
|
|
- else:
|
|
|
- exit(1)
|
|
|
- pipe.pipe_parse()
|
|
|
- md_content = pipe.pipe_mk_markdown(image_dir, drop_mode='none')
|
|
|
- dir_path = os.path.join(pdf_dev_path, 'mineru')
|
|
|
- if not os.path.exists(dir_path):
|
|
|
- os.makedirs(dir_path, exist_ok=True)
|
|
|
- res_path = os.path.join(dir_path, f'{demo_name}.md')
|
|
|
- common.delete_file(res_path)
|
|
|
- with open(res_path, 'w+', encoding='utf-8') as f:
|
|
|
- f.write(md_content)
|
|
|
- common.sdk_count_folders_and_check_contents(res_path)
|
|
|
-
|
|
|
- @pytest.mark.P0
|
|
|
- def test_pdf_txt_sdk(self):
|
|
|
- """pdf sdk txt test."""
|
|
|
- time.sleep(2)
|
|
|
- demo_names = list()
|
|
|
- pdf_path = os.path.join(pdf_dev_path, 'pdf')
|
|
|
- for pdf_file in os.listdir(pdf_path):
|
|
|
- if pdf_file.endswith('.pdf'):
|
|
|
- demo_names.append(pdf_file.split('.')[0])
|
|
|
- for demo_name in demo_names:
|
|
|
- pdf_path = os.path.join(pdf_dev_path, 'pdf', f'{demo_name}.pdf')
|
|
|
- pdf_bytes = open(pdf_path, 'rb').read()
|
|
|
- local_image_dir = os.path.join(pdf_dev_path, 'pdf', 'images')
|
|
|
- image_dir = str(os.path.basename(local_image_dir))
|
|
|
- image_writer = FileBasedDataWriter(local_image_dir)
|
|
|
- model_json = list()
|
|
|
- jso_useful_key = {'_pdf_type': 'txt', 'model_list': model_json}
|
|
|
- pipe = UNIPipe(pdf_bytes, jso_useful_key, image_writer)
|
|
|
- pipe.pipe_classify()
|
|
|
- if len(model_json) == 0:
|
|
|
- if model_config.__use_inside_model__:
|
|
|
- pipe.pipe_analyze()
|
|
|
- else:
|
|
|
- exit(1)
|
|
|
- pipe.pipe_parse()
|
|
|
- md_content = pipe.pipe_mk_markdown(image_dir, drop_mode='none')
|
|
|
- dir_path = os.path.join(pdf_dev_path, 'mineru')
|
|
|
- if not os.path.exists(dir_path):
|
|
|
- os.makedirs(dir_path, exist_ok=True)
|
|
|
- res_path = os.path.join(dir_path, f'{demo_name}.md')
|
|
|
- common.delete_file(res_path)
|
|
|
- with open(res_path, 'w+', encoding='utf-8') as f:
|
|
|
- f.write(md_content)
|
|
|
- common.sdk_count_folders_and_check_contents(res_path)
|
|
|
-
|
|
|
- @pytest.mark.P0
|
|
|
def test_pdf_cli_auto(self):
|
|
|
"""magic_pdf cli test auto."""
|
|
|
time.sleep(2)
|
|
|
@@ -154,7 +86,7 @@ class TestCli:
|
|
|
os.system(cmd)
|
|
|
common.cli_count_folders_and_check_contents(
|
|
|
os.path.join(res_path, demo_name, 'auto'))
|
|
|
-
|
|
|
+
|
|
|
@pytest.mark.P0
|
|
|
def test_pdf_cli_txt(self):
|
|
|
"""magic_pdf cli test txt."""
|
|
|
@@ -273,9 +205,10 @@ class TestCli:
|
|
|
cmd = 'magic-pdf-dev --pdf %s --json %s --method %s' % (pdf_path, json_path, 'auto')
|
|
|
logging.info(cmd)
|
|
|
os.system(cmd)
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
@pytest.mark.P1
|
|
|
- def test_s3_sdk_suto(self):
|
|
|
+ def test_s3_sdk_auto(self):
|
|
|
"""
|
|
|
test s3 sdk auto.
|
|
|
"""
|
|
|
@@ -289,16 +222,46 @@ class TestCli:
|
|
|
image_dir = "s3://" + pdf_bucket + "/mineru/test/output"
|
|
|
prefix = "mineru/test/output"
|
|
|
reader = S3DataReader(prefix, pdf_bucket, pdf_ak, pdf_sk, pdf_endpoint)
|
|
|
+ writer = S3DataWriter(prefix, pdf_bucket, pdf_ak, pdf_sk, pdf_endpoint)
|
|
|
# = S3DataWriter(prefix, pdf_bucket, pdf_ak, pdf_sk, pdf_endpoint)
|
|
|
image_writer = S3DataWriter(prefix, pdf_bucket, pdf_ak, pdf_sk, pdf_endpoint)
|
|
|
- pdf_bytes = reader.read(s3_pdf_path)
|
|
|
- model_list = []
|
|
|
- pipe = OCRPipe(pdf_bytes, model_list, image_writer)
|
|
|
- pipe.pipe_classify()
|
|
|
- pipe.pipe_analyze()
|
|
|
- pipe.pipe_parse()
|
|
|
- md_content = pipe.pipe_mk_markdown(image_dir, drop_mode="none")
|
|
|
- assert len(md_content) > 0
|
|
|
+ local_dir = "output"
|
|
|
+ name_without_suff = os.path.basename(s3_pdf_path).split(".")[0]
|
|
|
+
|
|
|
+ # read bytes
|
|
|
+ pdf_bytes = reader.read(s3_pdf_path) # read the pdf content
|
|
|
+
|
|
|
+ # proc
|
|
|
+ ## Create Dataset Instance
|
|
|
+ ds = PymuDocDataset(pdf_bytes)
|
|
|
+
|
|
|
+ ## inference
|
|
|
+ if ds.classify() == SupportedPdfParseMethod.OCR:
|
|
|
+ infer_result = ds.apply(doc_analyze, ocr=True)
|
|
|
+
|
|
|
+ ## pipeline
|
|
|
+ pipe_result = infer_result.pipe_ocr_mode(image_writer)
|
|
|
+ else:
|
|
|
+ infer_result = ds.apply(doc_analyze, ocr=False)
|
|
|
+
|
|
|
+ ## pipeline
|
|
|
+ pipe_result = infer_result.pipe_txt_mode(image_writer)
|
|
|
+
|
|
|
+ ### draw model result on each page
|
|
|
+ infer_result.draw_model(os.path.join(local_dir, f'{name_without_suff}_model.pdf')) # dump to local
|
|
|
+
|
|
|
+ ### draw layout result on each page
|
|
|
+ pipe_result.draw_layout(os.path.join(local_dir, f'{name_without_suff}_layout.pdf')) # dump to local
|
|
|
+
|
|
|
+ ### draw spans result on each page
|
|
|
+ pipe_result.draw_span(os.path.join(local_dir, f'{name_without_suff}_spans.pdf')) # dump to local
|
|
|
+
|
|
|
+ ### dump markdown
|
|
|
+ pipe_result.dump_md(writer, f'{name_without_suff}.md', "unittest/tmp/images") # dump to remote s3
|
|
|
+
|
|
|
+ ### dump content list
|
|
|
+ pipe_result.dump_content_list(writer, f"{name_without_suff}_content_list.json", image_dir)
|
|
|
+
|
|
|
|
|
|
@pytest.mark.P1
|
|
|
def test_local_magic_pdf_open_st_table(self):
|