import asyncio import argparse from core.router import ParserFactory from utils.logger import log from utils.stability import AsyncDispatcher async def main(): """主函数""" # 解析命令行参数 parser = argparse.ArgumentParser(description="多模态文件解析服务") parser.add_argument("file_path", help="文件路径") parser.add_argument("--output", choices=["markdown", "json"], default="markdown", help="输出格式") args = parser.parse_args() log.info(f"开始解析文件: {args.file_path}") # 创建解析器工厂 factory = ParserFactory() # 解析文件 result = await factory.parse(args.file_path) # 输出结果 if args.output == "json": print(result.to_json()) else: print(result.content) # 生成并显示性能报告 report = factory.generate_performance_report() print("\n" + "="*80) print(report) print("="*80) log.info(f"文件解析完成,输出格式: {args.output}") async def batch_parse(): """批量解析示例""" file_paths = [ # 这里可以添加多个文件路径 ] factory = ParserFactory() dispatcher = AsyncDispatcher(max_concurrency=3) # 创建任务列表 tasks = [factory.parse(file_path) for file_path in file_paths] # 并发执行任务 results = await dispatcher.run(tasks) # 处理结果 for file_path, result in zip(file_paths, results): if result: log.info(f"文件 {file_path} 解析成功") print(f"\n=== {file_path} ===") print(result.content) else: log.error(f"文件 {file_path} 解析失败") # 生成并显示性能报告 report = factory.generate_performance_report() print("\n" + "="*80) print(report) print("="*80) if __name__ == "__main__": asyncio.run(main())