| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
- import json
- import os
- from base64 import b64encode
- from glob import glob
- from io import StringIO
- import tempfile
- from typing import Tuple, Union
- import uvicorn
- from fastapi import FastAPI, HTTPException, UploadFile
- from fastapi.responses import JSONResponse
- from loguru import logger
- from magic_pdf.data.read_api import read_local_images, read_local_office
- import magic_pdf.model as model_config
- from magic_pdf.config.enums import SupportedPdfParseMethod
- from magic_pdf.data.data_reader_writer import DataWriter, FileBasedDataWriter
- from magic_pdf.data.data_reader_writer.s3 import S3DataReader, S3DataWriter
- from magic_pdf.data.dataset import ImageDataset, PymuDocDataset
- from magic_pdf.libs.config_reader import get_bucket_name, get_s3_config
- from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
- from magic_pdf.operators.models import InferenceResult
- from magic_pdf.operators.pipes import PipeResult
# NOTE(review): presumably tells magic_pdf to use its built-in models; confirm
# against the magic_pdf.model documentation.
model_config.__use_inside_model__ = True

# Application object the route handlers in this module are registered on.
app = FastAPI()

# Supported input extensions (leading dot included), grouped by loader.
pdf_extensions = [
    ".pdf",
]
office_extensions = [
    ".ppt",
    ".pptx",
    ".doc",
    ".docx",
]
image_extensions = [
    ".png",
    ".jpg",
]
class MemoryDataWriter(DataWriter):
    """DataWriter that collects everything written into one in-memory text buffer.

    The ``path`` argument of the write methods is accepted but ignored: every
    payload is appended to the same ``StringIO`` buffer.
    """

    def __init__(self):
        self.buffer = StringIO()

    def write(self, path: str, data: bytes) -> None:
        """Append ``data`` to the buffer; non-str payloads are decoded as UTF-8."""
        text = data if isinstance(data, str) else data.decode("utf-8")
        self.buffer.write(text)

    def write_string(self, path: str, data: str) -> None:
        """Append the string ``data`` to the buffer as-is."""
        self.buffer.write(data)

    def get_value(self) -> str:
        """Return everything written so far as a single string."""
        return self.buffer.getvalue()

    def close(self):
        """Close the underlying buffer."""
        self.buffer.close()
def init_writers(
    file_path: str = None,
    file: UploadFile = None,
    output_path: str = None,
    output_image_path: str = None,
) -> Tuple[
    Union[S3DataWriter, FileBasedDataWriter],
    Union[S3DataWriter, FileBasedDataWriter],
    bytes,
    str,
]:
    """
    Initialize the output writers and load the input file content.

    Args:
        file_path: Input location, either a local path or an ``s3://`` path.
            Used in preference to ``file`` when it is non-empty.
        file: Uploaded file object, used when ``file_path`` is empty.
        output_path: Output directory (or S3 prefix) for result files.
        output_image_path: Output directory (or S3 prefix) for images.

    Returns:
        Tuple[writer, image_writer, file_bytes, file_extension]: the result
        writer, the image writer, the raw input bytes, and the input's
        extension including the leading dot ("" when the name has none).

    Raises:
        ValueError: If neither ``file_path`` nor ``file`` is provided.
    """
    if not file_path and file is None:
        raise ValueError("Must provide either file or file_path")

    if file_path and file_path.startswith("s3://"):
        bucket = get_bucket_name(file_path)
        ak, sk, endpoint = get_s3_config(bucket)
        s3_kwargs = {"bucket": bucket, "ak": ak, "sk": sk, "endpoint_url": endpoint}

        writer = S3DataWriter(output_path, **s3_kwargs)
        image_writer = S3DataWriter(output_image_path, **s3_kwargs)

        # Temporary reader created only to fetch the input object.
        temp_reader = S3DataReader("", **s3_kwargs)
        file_bytes = temp_reader.read(file_path)
        file_extension = os.path.splitext(file_path)[1]
        return writer, image_writer, file_bytes, file_extension

    if file_path:
        # Local file on disk.
        with open(file_path, "rb") as f:
            file_bytes = f.read()
        source_name = file_path
    else:
        # Uploaded file; the client may omit the filename.
        file_bytes = file.file.read()
        source_name = file.filename or ""
    file_extension = os.path.splitext(source_name)[1]

    # Local paths and uploads both write their results to the local filesystem.
    writer = FileBasedDataWriter(output_path)
    image_writer = FileBasedDataWriter(output_image_path)
    os.makedirs(output_image_path, exist_ok=True)

    return writer, image_writer, file_bytes, file_extension
def process_file(
    file_bytes: bytes,
    file_extension: str,
    parse_method: str,
    image_writer: Union[S3DataWriter, FileBasedDataWriter],
) -> Tuple[InferenceResult, PipeResult]:
    """
    Build a dataset from the file content and run the analysis pipeline on it.

    Args:
        file_bytes: Binary content of the file.
        file_extension: File extension including the leading dot (e.g. ".pdf");
            matched case-insensitively against the supported extension lists.
        parse_method: Parse method. "ocr" and "txt" force the respective mode;
            any other value (normally "auto") lets the dataset's ``classify()``
            result decide.
        image_writer: Writer handed to the pipeline for extracted images.

    Returns:
        Tuple[InferenceResult, PipeResult]: inference result and pipeline result.

    Raises:
        ValueError: If the extension is not supported, or the office/image
            reader produced no dataset for the file.
    """
    extension = (file_extension or "").lower()

    if extension in pdf_extensions:
        ds = PymuDocDataset(file_bytes)
    elif extension in office_extensions or extension in image_extensions:
        # Office documents and images are loaded through directory-based
        # readers, so the bytes are staged in a scratch directory first.
        if extension in office_extensions:
            read_local = read_local_office
        else:
            read_local = read_local_images
        # The context manager removes the scratch directory afterwards.
        # NOTE(review): assumes the readers load file content eagerly (the
        # dataset classes are built from bytes) -- confirm for your version.
        with tempfile.TemporaryDirectory() as temp_dir:
            # `extension` already carries its leading dot.
            staged_path = os.path.join(temp_dir, f"temp_file{extension}")
            with open(staged_path, "wb") as f:
                f.write(file_bytes)
            datasets = read_local(temp_dir)
        if not datasets:
            raise ValueError(f"No dataset could be read from {extension!r} file")
        ds = datasets[0]
    else:
        supported = pdf_extensions + office_extensions + image_extensions
        raise ValueError(
            f"Unsupported file extension: {file_extension!r} "
            f"(supported: {', '.join(supported)})"
        )

    if parse_method == "ocr":
        use_ocr = True
    elif parse_method == "txt":
        use_ocr = False
    else:  # auto
        use_ocr = ds.classify() == SupportedPdfParseMethod.OCR

    infer_result = ds.apply(doc_analyze, ocr=use_ocr)
    if use_ocr:
        pipe_result = infer_result.pipe_ocr_mode(image_writer)
    else:
        pipe_result = infer_result.pipe_txt_mode(image_writer)

    return infer_result, pipe_result
def encode_image(image_path: str) -> str:
    """Read the file at ``image_path`` and return its content as base64 text."""
    with open(image_path, "rb") as image_file:
        raw_bytes = image_file.read()
    encoded = b64encode(raw_bytes)
    return encoded.decode()
@app.post(
    "/file_parse",
    tags=["projects"],
    summary="Parse files (supports local files and S3)",
)
async def file_parse(
    file: UploadFile = None,
    file_path: str = None,
    parse_method: str = "auto",
    is_json_md_dump: bool = False,
    output_dir: str = "output",
    return_layout: bool = False,
    return_info: bool = False,
    return_content_list: bool = False,
    return_images: bool = False,
):
    """
    Parse a document into Markdown/JSON and optionally write the results to
    the specified directory.

    Args:
        file: The file to be parsed. Must not be specified together with
            `file_path`
        file_path: The path to the file to be parsed (local path or S3 path).
            Must not be specified together with `file`
        parse_method: Parsing method, can be auto, ocr, or txt. Default is auto. If
            results are not satisfactory, try ocr
        is_json_md_dump: Whether to write parsed data to .json and .md files. Default
            to False. Different stages of data will be written to different .json files
            (3 in total), md content will be saved to .md file
        output_dir: Output directory for results. A folder named after the input file
            will be created to store all results
        return_layout: Whether to return parsed layout. Default to False
        return_info: Whether to return parsed info. Default to False
        return_content_list: Whether to return parsed content list. Default to False
        return_images: Whether to return the extracted .jpg images as base64
            data URLs keyed by file name. Default to False

    Returns:
        JSONResponse: 200 with ``md_content`` plus the requested optional
        fields; 400 when not exactly one of `file` / `file_path` is given;
        500 with the error message when processing fails.
    """
    # NOTE(review): `file_path` and `output_dir` are caller-controlled and are
    # used as filesystem/S3 paths without sanitization; restrict them if this
    # endpoint is reachable by untrusted clients.

    # Created before the try block so the finally clause can always close them.
    content_list_writer = MemoryDataWriter()
    md_content_writer = MemoryDataWriter()
    middle_json_writer = MemoryDataWriter()
    try:
        # Exactly one of `file` / `file_path` must be supplied.
        if (file is None) == (file_path is None):
            return JSONResponse(
                content={"error": "Must provide either file or file_path"},
                status_code=400,
            )

        # Base name for outputs: everything before the FIRST dot of the file
        # name (so "a.b.pdf" yields "a").
        source_name = file_path if file_path else file.filename
        file_name = os.path.basename(source_name).split(".")[0]
        output_path = f"{output_dir}/{file_name}"
        output_image_path = f"{output_path}/images"

        # Initialize writers and get the file content
        writer, image_writer, file_bytes, file_extension = init_writers(
            file_path=file_path,
            file=file,
            output_path=output_path,
            output_image_path=output_image_path,
        )

        # Run the analysis pipeline
        infer_result, pipe_result = process_file(
            file_bytes, file_extension, parse_method, image_writer
        )

        # Use PipeResult's dump methods to capture the results in memory
        pipe_result.dump_content_list(content_list_writer, "", "images")
        pipe_result.dump_md(md_content_writer, "", "images")
        pipe_result.dump_middle_json(middle_json_writer, "")

        content_list_text = content_list_writer.get_value()
        md_content = md_content_writer.get_value()
        middle_json_text = middle_json_writer.get_value()
        model_json = infer_result.get_infer_res()

        # If results need to be saved
        if is_json_md_dump:
            writer.write_string(f"{file_name}_content_list.json", content_list_text)
            writer.write_string(f"{file_name}.md", md_content)
            writer.write_string(f"{file_name}_middle.json", middle_json_text)
            writer.write_string(
                f"{file_name}_model.json",
                json.dumps(model_json, indent=4, ensure_ascii=False),
            )
            # The draw_* helpers receive plain local paths; make sure the
            # directory exists (init_writers does not create it for S3 inputs).
            os.makedirs(output_path, exist_ok=True)
            # Save visualization results
            pipe_result.draw_layout(os.path.join(output_path, f"{file_name}_layout.pdf"))
            pipe_result.draw_span(os.path.join(output_path, f"{file_name}_spans.pdf"))
            pipe_result.draw_line_sort(
                os.path.join(output_path, f"{file_name}_line_sort.pdf")
            )
            infer_result.draw_model(os.path.join(output_path, f"{file_name}_model.pdf"))

        # Build return data; JSON is only parsed for the fields requested.
        data = {}
        if return_layout:
            data["layout"] = model_json
        if return_info:
            data["info"] = json.loads(middle_json_text)
        if return_content_list:
            data["content_list"] = json.loads(content_list_text)
        if return_images:
            data["images"] = {
                os.path.basename(
                    image_path
                ): f"data:image/jpeg;base64,{encode_image(image_path)}"
                for image_path in glob(f"{output_image_path}/*.jpg")
            }
        data["md_content"] = md_content  # md_content is always returned

        return JSONResponse(data, status_code=200)
    except Exception as e:
        logger.exception(e)
        return JSONResponse(content={"error": str(e)}, status_code=500)
    finally:
        # Release the in-memory buffers on success and on failure alike.
        content_list_writer.close()
        md_content_writer.close()
        middle_json_writer.close()
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 8888.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8888,
    )
|