""" Layout Inference Web Application with Gradio - Annotation Version A Gradio-based layout inference tool that supports image uploads and multiple backend inference engines. This version adds an image annotation feature, allowing users to draw bounding boxes on an image and send both the image and the boxes to the model. """ import gradio as gr import json import os import io import tempfile import base64 import zipfile import uuid import re from pathlib import Path from PIL import Image import requests from gradio_image_annotation import image_annotator # Local utility imports from dots_ocr.utils import dict_promptmode_to_prompt from dots_ocr.utils.consts import MIN_PIXELS, MAX_PIXELS from dots_ocr.utils.demo_utils.display import read_image from dots_ocr.utils.doc_utils import load_images_from_pdf # Add DotsOCRParser import from dots_ocr.parser import DotsOCRParser # ==================== Configuration ==================== DEFAULT_CONFIG = { 'ip': "127.0.0.1", 'port_vllm': 8000, 'min_pixels': MIN_PIXELS, 'max_pixels': MAX_PIXELS, 'test_images_dir': "./assets/showcase_origin", } # ==================== Global Variables ==================== # Store the current configuration current_config = DEFAULT_CONFIG.copy() # Create a DotsOCRParser instance dots_parser = DotsOCRParser( ip=DEFAULT_CONFIG['ip'], port=DEFAULT_CONFIG['port_vllm'], dpi=200, min_pixels=DEFAULT_CONFIG['min_pixels'], max_pixels=DEFAULT_CONFIG['max_pixels'] ) # Store processing results processing_results = { 'original_image': None, 'processed_image': None, 'layout_result': None, 'markdown_content': None, 'cells_data': None, 'temp_dir': None, 'session_id': None, 'result_paths': None, 'annotation_data': None # Store annotation data } # ==================== Utility Functions ==================== def read_image_v2(img): """Reads an image, supporting URLs and local paths.""" if isinstance(img, str) and img.startswith(("http://", "https://")): with requests.get(img, stream=True) as response: response.raise_for_status() img = Image.open(io.BytesIO(response.content)) elif isinstance(img, str): img, _, _ = read_image(img, use_native=True) elif isinstance(img, Image.Image): pass else: raise ValueError(f"Invalid image type: {type(img)}") return img def get_test_images(): """Gets the list of test images.""" test_images = [] test_dir = current_config['test_images_dir'] if os.path.exists(test_dir): test_images = [os.path.join(test_dir, name) for name in os.listdir(test_dir) if name.lower().endswith(('.png', '.jpg', '.jpeg'))] return test_images def create_temp_session_dir(): """Creates a unique temporary directory for each processing request.""" session_id = uuid.uuid4().hex[:8] temp_dir = os.path.join(tempfile.gettempdir(), f"dots_ocr_demo_{session_id}") os.makedirs(temp_dir, exist_ok=True) return temp_dir, session_id def parse_image_with_bbox(parser, image, prompt_mode, bbox=None, fitz_preprocess=False): """ Processes an image using DotsOCRParser, with support for the bbox parameter. """ # Create a temporary session directory temp_dir, session_id = create_temp_session_dir() try: # Save the PIL Image to a temporary file temp_image_path = os.path.join(temp_dir, f"input_{session_id}.png") image.save(temp_image_path, "PNG") # Use the high-level parse_image interface, passing the bbox parameter filename = f"demo_{session_id}" results = parser.parse_image( input_path=temp_image_path, filename=filename, prompt_mode=prompt_mode, save_dir=temp_dir, bbox=bbox, fitz_preprocess=fitz_preprocess ) # Parse the results if not results: raise ValueError("No results returned from parser") result = results[0] # parse_image returns a list with a single result # Read the result files layout_image = None cells_data = None md_content = None filtered = False # Read the layout image if 'layout_image_path' in result and os.path.exists(result['layout_image_path']): layout_image = Image.open(result['layout_image_path']) # Read the JSON data if 'layout_info_path' in result and os.path.exists(result['layout_info_path']): with open(result['layout_info_path'], 'r', encoding='utf-8') as f: cells_data = json.load(f) # Read the Markdown content if 'md_content_path' in result and os.path.exists(result['md_content_path']): with open(result['md_content_path'], 'r', encoding='utf-8') as f: md_content = f.read() # Check for the original response file (if JSON parsing fails) if 'filtered' in result: filtered = result['filtered'] return { 'layout_image': layout_image, 'cells_data': cells_data, 'md_content': md_content, 'filtered': filtered, 'temp_dir': temp_dir, 'session_id': session_id, 'result_paths': result } except Exception as e: # Clean up the temporary directory on error import shutil if os.path.exists(temp_dir): shutil.rmtree(temp_dir, ignore_errors=True) raise e def process_annotation_data(annotation_data): """Processes annotation data, converting it to the format required by the model.""" if not annotation_data or not annotation_data.get('boxes'): return None, None # Get image and box data image = annotation_data.get('image') boxes = annotation_data.get('boxes', []) if not boxes: return image, None # Ensure the image is in PIL Image format if image is not None: import numpy as np if isinstance(image, np.ndarray): image = Image.fromarray(image) elif not isinstance(image, Image.Image): # If it's another format, try to convert it try: image = Image.open(image) if isinstance(image, str) else Image.fromarray(image) except Exception as e: print(f"Image format conversion failed: {e}") return None, None # Get the coordinate information of the box (only one box) box = boxes[0] bbox = [box['xmin'], box['ymin'], box['xmax'], box['ymax']] return image, bbox # ==================== Core Processing Function ==================== def process_image_inference_with_annotation(annotation_data, test_image_input, prompt_mode, server_ip, server_port, min_pixels, max_pixels, fitz_preprocess=False ): """Core function for image inference, supporting annotation data.""" global current_config, processing_results, dots_parser # First, clean up previous processing results if processing_results.get('temp_dir') and os.path.exists(processing_results['temp_dir']): import shutil try: shutil.rmtree(processing_results['temp_dir'], ignore_errors=True) except Exception as e: print(f"Failed to clean up previous temporary directory: {e}") # Reset processing results processing_results = { 'original_image': None, 'processed_image': None, 'layout_result': None, 'markdown_content': None, 'cells_data': None, 'temp_dir': None, 'session_id': None, 'result_paths': None, 'annotation_data': annotation_data } # Update configuration current_config.update({ 'ip': server_ip, 'port_vllm': server_port, 'min_pixels': min_pixels, 'max_pixels': max_pixels }) # Update parser configuration dots_parser.ip = server_ip dots_parser.port = server_port dots_parser.min_pixels = min_pixels dots_parser.max_pixels = max_pixels # Determine the input source and process annotation data image = None bbox = None # Prioritize processing annotation data if annotation_data and annotation_data.get('image') is not None: image, bbox = process_annotation_data(annotation_data) if image is not None: # If there's a bbox, force the use of 'prompt_grounding_ocr' mode assert bbox is not None prompt_mode = "prompt_grounding_ocr" # If there's no annotation data, check the test image input if image is None and test_image_input and test_image_input != "": try: image = read_image_v2(test_image_input) except Exception as e: return None, f"Failed to read test image: {e}", "", "", gr.update(value=None), "" if image is None: return None, "Please select a test image or add an image in the annotation component", "", "", gr.update(value=None), "" if bbox is None: return None, "Please select a bounding box by mouse", "Please select a bounding box by mouse", "", "", gr.update(value=None) try: # Process using DotsOCRParser, passing the bbox parameter original_image = image parse_result = parse_image_with_bbox(dots_parser, image, prompt_mode, bbox, fitz_preprocess) # Extract parsing results layout_image = parse_result['layout_image'] cells_data = parse_result['cells_data'] md_content = parse_result['md_content'] filtered = parse_result['filtered'] # Store the results processing_results.update({ 'original_image': original_image, 'processed_image': None, 'layout_result': layout_image, 'markdown_content': md_content, 'cells_data': cells_data, 'temp_dir': parse_result['temp_dir'], 'session_id': parse_result['session_id'], 'result_paths': parse_result['result_paths'], 'annotation_data': annotation_data }) # Handle the case where parsing fails if filtered: info_text = f""" **Image Information:** - Original Dimensions: {original_image.width} x {original_image.height} - Processing Mode: {'Region OCR' if bbox else 'Full Image OCR'} - Processing Status: JSON parsing failed, using cleaned text output - Server: {current_config['ip']}:{current_config['port_vllm']} - Session ID: {parse_result['session_id']} - Box Coordinates: {bbox if bbox else 'None'} """ return ( md_content or "No markdown content generated", info_text, md_content or "No markdown content generated", md_content or "No markdown content generated", gr.update(visible=False), "" ) # Handle the case where JSON parsing succeeds num_elements = len(cells_data) if cells_data else 0 info_text = f""" **Image Information:** - Original Dimensions: {original_image.width} x {original_image.height} - Processing Mode: {'Region OCR' if bbox else 'Full Image OCR'} - Server: {current_config['ip']}:{current_config['port_vllm']} - Detected {num_elements} layout elements - Session ID: {parse_result['session_id']} - Box Coordinates: {bbox if bbox else 'None'} """ # Current page JSON output current_json = "" if cells_data: try: current_json = json.dumps(cells_data, ensure_ascii=False, indent=2) except: current_json = str(cells_data) # Create a downloadable ZIP file download_zip_path = None if parse_result['temp_dir']: download_zip_path = os.path.join(parse_result['temp_dir'], f"layout_results_{parse_result['session_id']}.zip") try: with zipfile.ZipFile(download_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for root, dirs, files in os.walk(parse_result['temp_dir']): for file in files: if file.endswith('.zip'): continue file_path = os.path.join(root, file) arcname = os.path.relpath(file_path, parse_result['temp_dir']) zipf.write(file_path, arcname) except Exception as e: print(f"Failed to create download ZIP: {e}") download_zip_path = None return ( md_content or "No markdown content generated", info_text, md_content or "No markdown content generated", md_content or "No markdown content generated", gr.update(value=download_zip_path, visible=True) if download_zip_path else gr.update(visible=False), current_json ) except Exception as e: return f"An error occurred during processing: {e}", f"An error occurred during processing: {e}", "", "", gr.update(value=None), "" def load_image_to_annotator(test_image_input): """Loads an image into the annotation component.""" image = None # Check the test image input if test_image_input and test_image_input != "": try: image = read_image_v2(test_image_input) except Exception as e: return None if image is None: return None # Return the format required by the annotation component return { "image": image, "boxes": [] } def clear_all_data(): """Clears all data.""" global processing_results # Clean up the temporary directory if processing_results.get('temp_dir') and os.path.exists(processing_results['temp_dir']): import shutil try: shutil.rmtree(processing_results['temp_dir'], ignore_errors=True) except Exception as e: print(f"Failed to clean up temporary directory: {e}") # Reset processing results processing_results = { 'original_image': None, 'processed_image': None, 'layout_result': None, 'markdown_content': None, 'cells_data': None, 'temp_dir': None, 'session_id': None, 'result_paths': None, 'annotation_data': None } return ( "", # Clear test image selection None, # Clear annotation component "Waiting for processing results...", # Reset info display "## Waiting for processing results...", # Reset Markdown display "🕐 Waiting for parsing results...", # Clear raw Markdown text gr.update(visible=False), # Hide download button "🕐 Waiting for parsing results..." # Clear JSON ) def update_prompt_display(prompt_mode): """Updates the displayed prompt content.""" return dict_promptmode_to_prompt[prompt_mode] # ==================== Gradio Interface ==================== def create_gradio_interface(): """Creates the Gradio interface.""" # CSS styling to match the reference style css = """ footer { visibility: hidden; } #info_box { padding: 10px; background-color: #f8f9fa; border-radius: 8px; border: 1px solid #dee2e6; margin: 10px 0; font-size: 14px; } #markdown_tabs { height: 100%; } #annotation_component { border-radius: 8px; } """ with gr.Blocks(theme="ocean", css=css, title='dots.ocr - Annotation') as demo: # Title gr.HTML("""

🔍 dots.ocr - Annotation Version

Supports image annotation, drawing boxes, and sending box information to the model for OCR.
""") with gr.Row(): # Left side: Input and Configuration with gr.Column(scale=1, variant="compact"): gr.Markdown("### 📁 Select Example") test_images = get_test_images() test_image_input = gr.Dropdown( label="Select Example", choices=[""] + test_images, value="", show_label=True ) # Button to load image into the annotation component load_btn = gr.Button("📷 Load Image to Annotation Area", variant="secondary") prompt_mode = gr.Dropdown( label="Select Prompt", # choices=["prompt_layout_all_en", "prompt_layout_only_en", "prompt_ocr", "prompt_grounding_ocr"], choices=["prompt_grounding_ocr"], value="prompt_grounding_ocr", show_label=True, info="If a box is drawn, 'prompt_grounding_ocr' mode will be used automatically." ) # Display the current prompt content prompt_display = gr.Textbox( label="Current Prompt Content", # value=dict_promptmode_to_prompt[list(dict_promptmode_to_prompt.keys())[0]], value=dict_promptmode_to_prompt["prompt_grounding_ocr"], lines=4, max_lines=8, interactive=False, show_copy_button=True ) gr.Markdown("### ⚙️ Actions") process_btn = gr.Button("🔍 Parse", variant="primary") clear_btn = gr.Button("🗑️ Clear", variant="secondary") gr.Markdown("### 🛠️ Configuration") fitz_preprocess = gr.Checkbox( label="Enable fitz_preprocess", value=False, info="Performs fitz preprocessing on the image input, converting the image to a PDF and then to a 200dpi image." ) with gr.Row(): server_ip = gr.Textbox( label="Server IP", value=DEFAULT_CONFIG['ip'] ) server_port = gr.Number( label="Port", value=DEFAULT_CONFIG['port_vllm'], precision=0 ) with gr.Row(): min_pixels = gr.Number( label="Min Pixels", value=DEFAULT_CONFIG['min_pixels'], precision=0 ) max_pixels = gr.Number( label="Max Pixels", value=DEFAULT_CONFIG['max_pixels'], precision=0 ) # Right side: Result Display with gr.Column(scale=6, variant="compact"): with gr.Row(): # Image Annotation Area with gr.Column(scale=3): gr.Markdown("### 🎯 Image Annotation Area") gr.Markdown(""" **Instructions:** - Method 1: Select an example image on the left and click "Load Image to Annotation Area". - Method 2: Upload an image directly in the annotation area below (drag and drop or click to upload). - Use the mouse to draw a box on the image to select the region for recognition. - Only one box can be drawn. To draw a new one, please delete the old one first. - **Hotkey: Press the Delete key to remove the selected box.** - After drawing a box, clicking Parse will automatically use the Region OCR mode. """) annotator = image_annotator( value=None, label="Image Annotation", height=600, show_label=False, elem_id="annotation_component", single_box=True, # Only allow one box; a new box will replace the old one box_min_size=10, interactive=True, disable_edit_boxes=True, # Disable the edit dialog label_list=["OCR Region"], # Set the default label label_colors=[(255, 0, 0)], # Set color to red use_default_label=True, # Use the default label image_type="pil" # Ensure it returns a PIL Image format ) # Information Display info_display = gr.Markdown( "Waiting for processing results...", elem_id="info_box" ) # Result Display Area with gr.Column(scale=3): gr.Markdown("### ✅ Results") with gr.Tabs(elem_id="markdown_tabs"): with gr.TabItem("Markdown Rendered View"): md_output = gr.Markdown( "## Please upload an image and click the Parse button for recognition...", label="Markdown Preview", max_height=1000, latex_delimiters=[ {"left": "$$", "right": "$$", "display": True}, {"left": "$", "right": "$", "display": False}, ], show_copy_button=False, elem_id="markdown_output" ) with gr.TabItem("Markdown Raw Text"): md_raw_output = gr.Textbox( value="🕐 Waiting for parsing results...", label="Markdown Raw Text", max_lines=100, lines=38, show_copy_button=True, elem_id="markdown_output", show_label=False ) with gr.TabItem("JSON Result"): json_output = gr.Textbox( value="🕐 Waiting for parsing results...", label="JSON Result", max_lines=100, lines=38, show_copy_button=True, elem_id="markdown_output", show_label=False ) # Download Button with gr.Row(): download_btn = gr.DownloadButton( "⬇️ Download Results", visible=False ) # Event Binding # When the prompt mode changes, update the displayed content prompt_mode.change( fn=update_prompt_display, inputs=prompt_mode, outputs=prompt_display, show_progress=False ) # Load image into the annotation component load_btn.click( fn=load_image_to_annotator, inputs=[test_image_input], outputs=annotator, show_progress=False ) # Process Inference process_btn.click( fn=process_image_inference_with_annotation, inputs=[ annotator, test_image_input, prompt_mode, server_ip, server_port, min_pixels, max_pixels, fitz_preprocess ], outputs=[ md_output, info_display, md_raw_output, md_raw_output, download_btn, json_output ], show_progress=True ) # Clear Data clear_btn.click( fn=clear_all_data, outputs=[ test_image_input, annotator, info_display, md_output, md_raw_output, download_btn, json_output ], show_progress=False ) return demo # ==================== Main Program ==================== if __name__ == "__main__": demo = create_gradio_interface() demo.queue().launch( server_name="0.0.0.0", server_port=7861, # Use a different port to avoid conflicts debug=True )