import asyncio
import json
import os

from core.router import ParserFactory
from utils.logger import log
# NOTE(review): AsyncDispatcher is imported but never referenced in this file —
# confirm nothing relies on the import's side effects before removing.
from utils.stability import AsyncDispatcher


async def test_single_file(file_path: str):
    """Parse a single file and log a preview of the result.

    Args:
        file_path: Path of the file to parse.

    Returns:
        The parse result produced by ``ParserFactory.parse``.
    """
    log.info(f"\n{'='*80}")
    log.info(f"开始测试文件: {file_path}")
    log.info(f"{'='*80}")

    factory = ParserFactory()
    result = await factory.parse(file_path)

    # Log only the first 500 characters of the parsed content.
    log.info(f"\n解析结果预览:")
    preview_content = result.content[:500] + "..." if len(result.content) > 500 else result.content
    log.info(preview_content)

    return result


def _result_to_dict(file_path: str, result) -> dict:
    """Convert one parse result into a JSON-serializable dict.

    Shared by both save paths in :func:`save_results` (previously the same
    dict literal was duplicated twice).
    """
    return {
        "file_path": file_path,
        "content": result.content,
        "metadata": result.metadata,
        "file_type": result.file_type,
        "tables": result.tables,
    }


def save_results(results: list):
    """Persist parse results under ``./output``.

    Writes one aggregate ``all_results.json`` plus, per input file, a
    ``<name>_result.txt`` (raw content) and a ``<name>_result.json``.

    Args:
        results: List of ``(file_path, result)`` tuples.
    """
    # Create the output directory if it does not exist yet.
    output_dir = "./output"
    os.makedirs(output_dir, exist_ok=True)

    # Aggregate every result into a single JSON file.
    all_results = [_result_to_dict(fp, res) for fp, res in results]
    json_path = os.path.join(output_dir, "all_results.json")
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(all_results, f, ensure_ascii=False, indent=2)
    log.info(f"解析结果已保存到: {json_path}")

    # Additionally save a text and a JSON file per input.
    for file_path, result in results:
        file_name = os.path.basename(file_path)
        base_name = os.path.splitext(file_name)[0]

        # Raw parsed content as plain text.
        txt_path = os.path.join(output_dir, f"{base_name}_result.txt")
        with open(txt_path, "w", encoding="utf-8") as f:
            f.write(result.content)

        # Structured result as JSON.
        json_path = os.path.join(output_dir, f"{base_name}_result.json")
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(_result_to_dict(file_path, result), f, ensure_ascii=False, indent=2)

        log.info(f"文件 {file_name} 的解析结果已保存到:")
        log.info(f" 文本文件: {txt_path}")
        log.info(f" JSON文件: {json_path}")


async def main():
    """Parse every file in ``./examples``, save results, and log a performance report."""
    examples_dir = "./examples"

    # Collect regular files only (skip subdirectories).
    file_paths = []
    for file_name in os.listdir(examples_dir):
        file_path = os.path.join(examples_dir, file_name)
        if os.path.isfile(file_path):
            file_paths.append(file_path)

    log.info(f"找到 {len(file_paths)} 个文件需要测试")

    # One factory instance for the whole run so its statistics accumulate.
    factory = ParserFactory()

    # Parse each file sequentially and collect (path, result) pairs.
    results = []
    for file_path in file_paths:
        result = await factory.parse(file_path)
        results.append((file_path, result))

        # Preview only the first 500 characters of each result.
        log.info(f"\n解析结果预览:")
        preview_content = result.content[:500] + "..." if len(result.content) > 500 else result.content
        log.info(preview_content)

    # Persist everything to ./output.
    save_results(results)

    # Report accumulated by the shared factory across all parses.
    report = factory.generate_performance_report()
    log.info(f"\n{'='*80}")
    log.info("总体任务情况")
    log.info(f"{'='*80}")
    log.info(report)
    log.info(f"{'='*80}")
    log.info("所有文件测试完成")
    log.info(f"{'='*80}")


if __name__ == "__main__":
    asyncio.run(main())