main.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. import asyncio
  2. import argparse
  3. from core.router import ParserFactory
  4. from utils.logger import log
  5. from utils.stability import AsyncDispatcher
  6. async def main():
  7. """主函数"""
  8. # 解析命令行参数
  9. parser = argparse.ArgumentParser(description="多模态文件解析服务")
  10. parser.add_argument("file_path", help="文件路径")
  11. parser.add_argument("--output", choices=["markdown", "json"], default="markdown",
  12. help="输出格式")
  13. args = parser.parse_args()
  14. log.info(f"开始解析文件: {args.file_path}")
  15. # 创建解析器工厂
  16. factory = ParserFactory()
  17. # 解析文件
  18. result = await factory.parse(args.file_path)
  19. # 输出结果
  20. if args.output == "json":
  21. print(result.to_json())
  22. else:
  23. print(result.content)
  24. # 生成并显示性能报告
  25. report = factory.generate_performance_report()
  26. print("\n" + "="*80)
  27. print(report)
  28. print("="*80)
  29. log.info(f"文件解析完成,输出格式: {args.output}")
  30. async def batch_parse():
  31. """批量解析示例"""
  32. file_paths = [
  33. # 这里可以添加多个文件路径
  34. ]
  35. factory = ParserFactory()
  36. dispatcher = AsyncDispatcher(max_concurrency=3)
  37. # 创建任务列表
  38. tasks = [factory.parse(file_path) for file_path in file_paths]
  39. # 并发执行任务
  40. results = await dispatcher.run(tasks)
  41. # 处理结果
  42. for file_path, result in zip(file_paths, results):
  43. if result:
  44. log.info(f"文件 {file_path} 解析成功")
  45. print(f"\n=== {file_path} ===")
  46. print(result.content)
  47. else:
  48. log.error(f"文件 {file_path} 解析失败")
  49. # 生成并显示性能报告
  50. report = factory.generate_performance_report()
  51. print("\n" + "="*80)
  52. print(report)
  53. print("="*80)
  54. if __name__ == "__main__":
  55. asyncio.run(main())