| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- import asyncio
- import argparse
- from core.router import ParserFactory
- from utils.logger import log
- from utils.stability import AsyncDispatcher
- async def main():
- """主函数"""
- # 解析命令行参数
- parser = argparse.ArgumentParser(description="多模态文件解析服务")
- parser.add_argument("file_path", help="文件路径")
- parser.add_argument("--output", choices=["markdown", "json"], default="markdown",
- help="输出格式")
- args = parser.parse_args()
-
- log.info(f"开始解析文件: {args.file_path}")
-
- # 创建解析器工厂
- factory = ParserFactory()
-
- # 解析文件
- result = await factory.parse(args.file_path)
-
- # 输出结果
- if args.output == "json":
- print(result.to_json())
- else:
- print(result.content)
-
- # 生成并显示性能报告
- report = factory.generate_performance_report()
- print("\n" + "="*80)
- print(report)
- print("="*80)
-
- log.info(f"文件解析完成,输出格式: {args.output}")
- async def batch_parse():
- """批量解析示例"""
- file_paths = [
- # 这里可以添加多个文件路径
- ]
-
- factory = ParserFactory()
- dispatcher = AsyncDispatcher(max_concurrency=3)
-
- # 创建任务列表
- tasks = [factory.parse(file_path) for file_path in file_paths]
-
- # 并发执行任务
- results = await dispatcher.run(tasks)
-
- # 处理结果
- for file_path, result in zip(file_paths, results):
- if result:
- log.info(f"文件 {file_path} 解析成功")
- print(f"\n=== {file_path} ===")
- print(result.content)
- else:
- log.error(f"文件 {file_path} 解析失败")
-
- # 生成并显示性能报告
- report = factory.generate_performance_report()
- print("\n" + "="*80)
- print(report)
- print("="*80)
- if __name__ == "__main__":
- asyncio.run(main())
|