
add updated example project based on 2.0

ca1yz committed 5 months ago
commit dbfd392f05
4 changed files with 330 additions and 0 deletions
  1. projects/multi_gpu_v2/README.md (+81 −0)
  2. projects/multi_gpu_v2/_config_endpoint.py (+59 −0)
  3. projects/multi_gpu_v2/client.py (+82 −0)
  4. projects/multi_gpu_v2/server.py (+108 −0)

+ 81 - 0
projects/multi_gpu_v2/README.md

@@ -0,0 +1,81 @@
+# MinerU v2.0 Multi-GPU Server
+
+## Quick Start
+
+### 1. Install MinerU
+
+```bash
+pip install --upgrade pip
+pip install uv
+uv pip install -U "mineru[core]"
+uv pip install litserve aiohttp loguru
+```
+
+### 2. Start the Server
+```bash
+python server.py
+```
+
+### 3. Start the Client
+```bash
+python client.py
+```
+
+Now the PDF files under the [demo](../../demo/) folder will be processed in parallel. Assuming you have 2 GPUs, changing `workers_per_device` to `2` gives 4 workers in total, so 4 PDF files are processed at the same time!
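+
+If you prefer to call the endpoint directly instead of using `client.py`, here is a minimal synchronous sketch (the payload shape follows `decode_request` in `server.py`; `document.pdf` is a placeholder path):
+
+```python
+import base64
+import requests
+
+# Encode the PDF as base64, matching the server's expected payload
+with open('document.pdf', 'rb') as f:
+    file_b64 = base64.b64encode(f.read()).decode('utf-8')
+
+resp = requests.post(
+    'http://127.0.0.1:8000/predict',
+    json={'file': file_b64, 'options': {'lang': 'ch'}},
+)
+resp.raise_for_status()
+print(resp.json()['output_dir'])  # the server responds with the output directory
+```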
+
+## Customize
+
+### Server 
+
+Example showing how to start the server with custom settings:
+```python
+server = ls.LitServer(
+    MinerUAPI(output_dir='/tmp/mineru_output'),
+    accelerator='auto',  # You can specify 'cuda'
+    devices='auto',  # "auto" uses all available GPUs
+    workers_per_device=1,  # One worker instance per GPU
+    timeout=False  # Disable timeout for long processing
+)
+server.run(port=8000, generate_client_file=False)
+```
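+
+`timeout=False` keeps LitServe from cancelling long-running parses of large PDFs. Raise `workers_per_device` above `1` only if each GPU has enough VRAM for multiple model instances (see the `sglang-engine` note in `client.py`).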
+
+### Client 
+
+The client supports both synchronous and asynchronous processing:
+
+```python
+import asyncio
+import aiohttp
+from client import mineru_parse_async
+
+async def process_documents():
+    async with aiohttp.ClientSession() as session:
+        # Basic usage
+        result = await mineru_parse_async(session, 'document.pdf')
+        
+        # With custom options
+        result = await mineru_parse_async(
+            session, 
+            'document.pdf',
+            backend='pipeline',
+            lang='ch',
+            formula_enable=True,
+            table_enable=True
+        )
+
+# Run async processing
+asyncio.run(process_documents())
+```
+
+### Concurrent Processing
+Process multiple files simultaneously:
+```python
+async def process_multiple_files():
+    files = ['doc1.pdf', 'doc2.pdf', 'doc3.pdf']
+    
+    async with aiohttp.ClientSession() as session:
+        tasks = [mineru_parse_async(session, file) for file in files]
+        results = await asyncio.gather(*tasks)
+    
+    return results
+```
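+
+For large batches you may want to cap the number of in-flight requests. Here is a sketch using `asyncio.Semaphore`, reusing the imports from the examples above (`max_in_flight` is an arbitrary cap, not part of the shipped client):
+
+```python
+async def process_many(files, max_in_flight=4):
+    sem = asyncio.Semaphore(max_in_flight)
+
+    async def bounded(session, path):
+        async with sem:  # at most max_in_flight requests run concurrently
+            return await mineru_parse_async(session, path)
+
+    async with aiohttp.ClientSession() as session:
+        return await asyncio.gather(*(bounded(session, f) for f in files))
+```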

+ 59 - 0
projects/multi_gpu_v2/_config_endpoint.py

@@ -0,0 +1,59 @@
+import requests
+import os
+import logging
+
+logging.basicConfig(level=logging.INFO)
+
+# Timeout in seconds for the connectivity checks below
+TIMEOUT = 3
+
+def config_endpoint():
+    """
+    Checks for connectivity to Hugging Face and sets the model source accordingly.
+    If the Hugging Face endpoint is reachable, it sets MINERU_MODEL_SOURCE to 'huggingface'.
+    Otherwise, it falls back to 'modelscope'.
+    """
+
+    os.environ.setdefault('MINERU_MODEL_SOURCE', 'huggingface')
+    model_list_url = 'https://huggingface.co/models'
+    
+    # Use a specific check for the Hugging Face source
+    if os.environ['MINERU_MODEL_SOURCE'] == 'huggingface':
+        try:
+            response = requests.head(model_list_url, timeout=TIMEOUT)
+            
+            # Check for any successful status code (2xx)
+            if response.ok:
+                logging.info("Successfully connected to Hugging Face. Using 'huggingface' as model source.")
+                return True
+            else:
+                logging.warning(f"Hugging Face endpoint returned an error status code: {response.status_code}")
+
+        except requests.exceptions.RequestException as e:
+            logging.error(f"Failed to connect to Hugging Face at {model_list_url}: {e}")
+
+        # If any of the above checks fail, switch to modelscope
+        logging.info("Falling back to 'modelscope' as model source.")
+        os.environ['MINERU_MODEL_SOURCE'] = 'modelscope'
+    
+    elif os.environ['MINERU_MODEL_SOURCE'] == 'modelscope':
+        # Probe ModelScope itself, not the Hugging Face URL defined above
+        modelscope_url = 'https://modelscope.cn/models'
+        try:
+            response = requests.head(modelscope_url, timeout=TIMEOUT)
+            if response.ok:
+                logging.info("Successfully connected to ModelScope. Using 'modelscope' as model source.")
+                return True
+        except requests.exceptions.RequestException as e:
+            logging.error(f"Failed to connect to ModelScope at {modelscope_url}: {e}")
+        
+    elif os.environ['MINERU_MODEL_SOURCE'] == 'local':
+        logging.info("Using 'local' as model source.")
+        return True
+    
+    else:
+        logging.info(f"Using custom model source: {os.environ['MINERU_MODEL_SOURCE']}")
+        return True
+    
+    return False
+
+if __name__ == '__main__':
+    print(config_endpoint())

+ 82 - 0
projects/multi_gpu_v2/client.py

@@ -0,0 +1,82 @@
+import base64
+import requests
+import os
+from loguru import logger
+import asyncio
+import aiohttp
+
+async def mineru_parse_async(session, file_path, server_url='http://127.0.0.1:8000/predict', **options):
+    """
+    Asynchronous version of the parse function.
+    """
+    try:
+        # Read and encode the file (a blocking read; fine for small PDFs)
+        with open(file_path, 'rb') as f:
+            file_b64 = base64.b64encode(f.read()).decode('utf-8')
+
+        payload = {
+            'file': file_b64,
+            'options': options
+        }
+
+        # Use the aiohttp session to send the request
+        async with session.post(server_url, json=payload) as response:
+            if response.status == 200:
+                result = await response.json()
+                logger.info(f"✅ Processed: {file_path} -> {result.get('output_dir', 'N/A')}")
+                return result
+            else:
+                error_text = await response.text()
+                logger.error(f"❌ Server error for {file_path}: {error_text}")
+                return {'error': error_text}
+
+    except Exception as e:
+        logger.error(f"❌ Failed to process {file_path}: {e}")
+        return {'error': str(e)}
+
+
+async def main():
+    """
+    Main function to run all parsing tasks concurrently.
+    """
+    test_files = [
+        '../../demo/pdfs/demo1.pdf',
+        '../../demo/pdfs/demo2.pdf',
+        '../../demo/pdfs/demo3.pdf',
+        '../../demo/pdfs/small_ocr.pdf',
+    ]
+
+    test_files = [os.path.join(os.path.dirname(__file__), f) for f in test_files]
+    
+    existing_files = [f for f in test_files if os.path.exists(f)]
+    if not existing_files:
+        logger.warning("No test files found.")
+        return
+
+    # Create an aiohttp session to be reused across requests
+    async with aiohttp.ClientSession() as session:
+        # === Basic Processing ===
+        basic_tasks = [mineru_parse_async(session, file_path) for file_path in existing_files[:2]]
+
+        # === Custom Options ===
+        custom_options = {
+            'backend': 'pipeline', 'lang': 'ch', 'method': 'auto',
+            'formula_enable': True, 'table_enable': True
+        }
+        # 'backend': 'sglang-engine' requires 24+ GB VRAM per worker
+
+        custom_tasks = [mineru_parse_async(session, file_path, **custom_options) for file_path in existing_files[2:]]
+
+        # Start all tasks
+        all_tasks = basic_tasks + custom_tasks
+
+        all_results = await asyncio.gather(*all_tasks)
+
+        logger.info(f"All Results: {all_results}")
+
+    logger.info("🎉 All processing completed!")
+
+if __name__ == '__main__':
+    # Run the async main function
+    asyncio.run(main())

+ 108 - 0
projects/multi_gpu_v2/server.py

@@ -0,0 +1,108 @@
+import os
+import base64
+import tempfile
+from pathlib import Path
+import litserve as ls
+from fastapi import HTTPException
+from loguru import logger
+
+from mineru.cli.common import do_parse, read_fn
+from mineru.utils.config_reader import get_device
+from mineru.utils.model_utils import get_vram
+from _config_endpoint import config_endpoint
+
+class MinerUAPI(ls.LitAPI):
+    def __init__(self, output_dir='/tmp'):
+        super().__init__()
+        self.output_dir = output_dir
+
+    def setup(self, device):
+        """Setup environment variables exactly like MinerU CLI does"""
+        logger.info(f"Setting up on device: {device}")
+
+        if os.getenv('MINERU_DEVICE_MODE') is None:
+            os.environ['MINERU_DEVICE_MODE'] = device if device != 'auto' else get_device()
+
+        device_mode = os.environ['MINERU_DEVICE_MODE']
+        if os.getenv('MINERU_VIRTUAL_VRAM_SIZE') is None:
+            if device_mode.startswith("cuda") or device_mode.startswith("npu"):
+                vram = round(get_vram(device_mode))
+                os.environ['MINERU_VIRTUAL_VRAM_SIZE'] = str(vram)
+            else:
+                os.environ['MINERU_VIRTUAL_VRAM_SIZE'] = '1'
+        logger.info(f"MINERU_VIRTUAL_VRAM_SIZE: {os.environ['MINERU_VIRTUAL_VRAM_SIZE']}")
+
+        if os.getenv('MINERU_MODEL_SOURCE', None) in ['huggingface', None]:
+            config_endpoint()
+        logger.info(f"MINERU_MODEL_SOURCE: {os.environ['MINERU_MODEL_SOURCE']}")
+
+
+    def decode_request(self, request):
+        """Decode file and options from request"""
+        file_b64 = request['file']
+        options = request.get('options', {})
+        
+        file_bytes = base64.b64decode(file_b64)
+        # mkstemp avoids the race condition of the deprecated tempfile.mktemp
+        fd, temp_path = tempfile.mkstemp(suffix='.pdf')
+        with os.fdopen(fd, 'wb') as f:
+            f.write(file_bytes)
+        temp_file = Path(temp_path)
+        
+        return {
+            'input_path': str(temp_file),
+            'backend': options.get('backend', 'pipeline'),
+            'method': options.get('method', 'auto'),
+            'lang': options.get('lang', 'ch'),
+            'formula_enable': options.get('formula_enable', True),
+            'table_enable': options.get('table_enable', True),
+            'start_page_id': options.get('start_page_id', 0),
+            'end_page_id': options.get('end_page_id', None),
+            'server_url': options.get('server_url', None),
+        }
+
+    def predict(self, inputs):
+        """Call MinerU's do_parse - same as CLI"""
+        input_path = inputs['input_path']
+        output_dir = Path(self.output_dir) / Path(input_path).stem
+        
+        try:
+            os.makedirs(output_dir, exist_ok=True)
+            
+            file_name = Path(input_path).stem
+            pdf_bytes = read_fn(Path(input_path))
+            
+            do_parse(
+                output_dir=str(output_dir),
+                pdf_file_names=[file_name],
+                pdf_bytes_list=[pdf_bytes],
+                p_lang_list=[inputs['lang']],
+                backend=inputs['backend'],
+                parse_method=inputs['method'],
+                p_formula_enable=inputs['formula_enable'],
+                p_table_enable=inputs['table_enable'],
+                server_url=inputs['server_url'],
+                start_page_id=inputs['start_page_id'],
+                end_page_id=inputs['end_page_id']
+            )
+            
+            return str(output_dir)
+            
+        except Exception as e:
+            logger.error(f"Processing failed: {e}")
+            raise HTTPException(status_code=500, detail=str(e))
+        finally:
+            # Cleanup temp file
+            if Path(input_path).exists():
+                Path(input_path).unlink()
+
+    def encode_response(self, response):
+        return {'output_dir': response}
+
+if __name__ == '__main__':
+    server = ls.LitServer(
+        MinerUAPI(output_dir='/tmp/mineru_output'),
+        accelerator='auto',
+        devices='auto',
+        workers_per_device=1,
+        timeout=False
+    )
+    logger.info("Starting MinerU server on port 8000")
+    server.run(port=8000, generate_client_file=False)