# Test driver: parses the example files and saves the parse results.
- import asyncio
- import os
- from core.router import ParserFactory
- from utils.logger import log
- from utils.stability import AsyncDispatcher
async def test_single_file(file_path: str):
    """
    Parse one file and log a preview of the parsed content.

    Args:
        file_path: Path of the file to parse.

    Returns:
        The parse result produced by ParserFactory.parse().
    """
    log.info(f"\n{'='*80}")
    log.info(f"开始测试文件: {file_path}")
    log.info(f"{'='*80}")

    parser = ParserFactory()
    parsed = await parser.parse(file_path)

    # Log at most the first 500 characters of the parsed content.
    log.info(f"\n解析结果预览:")
    if len(parsed.content) > 500:
        preview = parsed.content[:500] + "..."
    else:
        preview = parsed.content
    log.info(preview)

    return parsed
- import json
def _result_to_dict(file_path: str, result) -> dict:
    """Convert one parse result into a JSON-serializable dict."""
    return {
        "file_path": file_path,
        "content": result.content,
        "metadata": result.metadata,
        "file_type": result.file_type,
        "tables": result.tables
    }


def save_results(results: list):
    """
    Persist parse results under ./output.

    Writes one combined JSON file (all_results.json) plus, for each input
    file, a plain-text dump of its content and a per-file JSON file.

    Args:
        results: List of (file_path, result) tuples; each result exposes
            .content, .metadata, .file_type and .tables.
    """
    # Create the output directory (no error if it already exists).
    output_dir = "./output"
    os.makedirs(output_dir, exist_ok=True)

    # Build the serializable dicts once; reused for the per-file JSON below.
    all_results = [_result_to_dict(fp, r) for fp, r in results]

    # Combined JSON with every result.
    json_path = os.path.join(output_dir, "all_results.json")
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(all_results, f, ensure_ascii=False, indent=2)

    log.info(f"解析结果已保存到: {json_path}")

    # Per-file outputs: one .txt with the raw content, one .json per file.
    for (file_path, result), result_dict in zip(results, all_results):
        file_name = os.path.basename(file_path)
        base_name = os.path.splitext(file_name)[0]

        txt_path = os.path.join(output_dir, f"{base_name}_result.txt")
        with open(txt_path, "w", encoding="utf-8") as f:
            f.write(result.content)

        json_path = os.path.join(output_dir, f"{base_name}_result.json")
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(result_dict, f, ensure_ascii=False, indent=2)

        log.info(f"文件 {file_name} 的解析结果已保存到:")
        log.info(f"  文本文件: {txt_path}")
        log.info(f"  JSON文件: {json_path}")
async def main():
    """
    Parse every regular file under ./examples, save all results, and
    log a performance report.
    """
    examples_dir = "./examples"

    # Collect every regular file (skip subdirectories) in the examples folder.
    file_paths = [
        os.path.join(examples_dir, name)
        for name in os.listdir(examples_dir)
        if os.path.isfile(os.path.join(examples_dir, name))
    ]

    log.info(f"找到 {len(file_paths)} 个文件需要测试")

    # One factory instance so statistics accumulate across all files.
    factory = ParserFactory()

    # Parse each file in turn, collecting (path, result) pairs.
    results = []
    for path in file_paths:
        parsed = await factory.parse(path)
        results.append((path, parsed))

        # Show at most the first 500 characters of each result.
        log.info(f"\n解析结果预览:")
        if len(parsed.content) > 500:
            log.info(parsed.content[:500] + "...")
        else:
            log.info(parsed.content)

    # Persist everything to ./output.
    save_results(results)

    # Summarize performance for the whole run.
    report = factory.generate_performance_report()
    log.info(f"\n{'='*80}")
    log.info("总体任务情况")
    log.info(f"{'='*80}")
    log.info(report)
    log.info(f"{'='*80}")
    log.info("所有文件测试完成")
    log.info(f"{'='*80}")
# Script entry point: run the async driver inside a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())