# client.py
import asyncio
import base64
import os

import aiohttp
import requests
from loguru import logger
  7. async def mineru_parse_async(session, file_path, server_url='http://127.0.0.1:8000/predict', **options):
  8. """
  9. Asynchronous version of the parse function.
  10. """
  11. try:
  12. # Asynchronously read and encode the file
  13. with open(file_path, 'rb') as f:
  14. file_b64 = base64.b64encode(f.read()).decode('utf-8')
  15. payload = {
  16. 'file': file_b64,
  17. 'options': options
  18. }
  19. # Use the aiohttp session to send the request
  20. async with session.post(server_url, json=payload) as response:
  21. if response.status == 200:
  22. result = await response.json()
  23. logger.info(f"✅ Processed: {file_path} -> {result.get('output_dir', 'N/A')}")
  24. return result
  25. else:
  26. error_text = await response.text()
  27. logger.error(f"❌ Server error for {file_path}: {error_text}")
  28. return {'error': error_text}
  29. except Exception as e:
  30. logger.error(f"❌ Failed to process {file_path}: {e}")
  31. return {'error': str(e)}
  32. async def main():
  33. """
  34. Main function to run all parsing tasks concurrently.
  35. """
  36. test_files = [
  37. '../../demo/pdfs/demo1.pdf',
  38. '../../demo/pdfs/demo2.pdf',
  39. '../../demo/pdfs/demo3.pdf',
  40. '../../demo/pdfs/small_ocr.pdf',
  41. ]
  42. test_files = [os.path.join(os.path.dirname(__file__), f) for f in test_files]
  43. existing_files = [f for f in test_files if os.path.exists(f)]
  44. if not existing_files:
  45. logger.warning("No test files found.")
  46. return
  47. # Create an aiohttp session to be reused across requests
  48. async with aiohttp.ClientSession() as session:
  49. # === Basic Processing ===
  50. basic_tasks = [mineru_parse_async(session, file_path) for file_path in existing_files[:2]]
  51. # === Custom Options ===
  52. custom_options = {
  53. 'backend': 'pipeline', 'lang': 'ch', 'method': 'auto',
  54. 'formula_enable': True, 'table_enable': True
  55. }
  56. # 'backend': 'sglang-engine' requires 24+ GB VRAM per worker
  57. custom_tasks = [mineru_parse_async(session, file_path, **custom_options) for file_path in existing_files[2:]]
  58. # Start all tasks
  59. all_tasks = basic_tasks + custom_tasks
  60. all_results = await asyncio.gather(*all_tasks)
  61. logger.info(f"All Results: {all_results}")
  62. logger.info("🎉 All processing completed!")
  63. if __name__ == '__main__':
  64. # Run the async main function
  65. asyncio.run(main())