test_parser.py

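"""
Smoke test for ParserFactory: parses every file under ./examples, logs a
preview of each result, saves text and JSON outputs under ./output, and
logs the factory's performance report.
"""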
import asyncio
import json
import os

from core.router import ParserFactory
from utils.logger import log


async def test_single_file(file_path: str):
    """
    Parse a single file and log a preview of the result.

    Args:
        file_path: Path to the file to parse.
    """
    log.info(f"\n{'=' * 80}")
    log.info(f"Testing file: {file_path}")
    log.info(f"{'=' * 80}")
    factory = ParserFactory()
    result = await factory.parse(file_path)
    # Log at most the first 500 characters of the parsed content
    log.info("\nResult preview:")
    preview_content = result.content[:500] + "..." if len(result.content) > 500 else result.content
    log.info(preview_content)
    return result


def save_results(results: list):
    """
    Save parse results to the output directory.

    Args:
        results: List of (file_path, result) tuples.
    """
    # Create the output directory if it does not exist
    output_dir = "./output"
    os.makedirs(output_dir, exist_ok=True)

    # Collect every result into a single JSON file
    all_results = []
    for file_path, result in results:
        result_dict = {
            "file_path": file_path,
            "content": result.content,
            "metadata": result.metadata,
            "file_type": result.file_type,
            "tables": result.tables,
        }
        all_results.append(result_dict)
    json_path = os.path.join(output_dir, "all_results.json")
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(all_results, f, ensure_ascii=False, indent=2)
    log.info(f"All results saved to: {json_path}")

    # Additionally save one text file and one JSON file per input file
    for file_path, result in results:
        file_name = os.path.basename(file_path)
        base_name = os.path.splitext(file_name)[0]
        # Plain-text dump of the parsed content
        txt_path = os.path.join(output_dir, f"{base_name}_result.txt")
        with open(txt_path, "w", encoding="utf-8") as f:
            f.write(result.content)
        # Structured JSON dump including metadata and tables
        json_path = os.path.join(output_dir, f"{base_name}_result.json")
        result_dict = {
            "file_path": file_path,
            "content": result.content,
            "metadata": result.metadata,
            "file_type": result.file_type,
            "tables": result.tables,
        }
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(result_dict, f, ensure_ascii=False, indent=2)
        log.info(f"Results for {file_name} saved to:")
        log.info(f"  text file: {txt_path}")
        log.info(f"  JSON file: {json_path}")


async def main():
    """
    Parse every file in the examples directory and report on the results.
    """
    # Collect all regular files in the examples directory
    examples_dir = "./examples"
    file_paths = []
    for file_name in os.listdir(examples_dir):
        file_path = os.path.join(examples_dir, file_name)
        if os.path.isfile(file_path):
            file_paths.append(file_path)
    log.info(f"Found {len(file_paths)} files to test")

    # Use one ParserFactory instance for all files so it can aggregate statistics
    factory = ParserFactory()

    # Parse each file and collect the results
    results = []
    for file_path in file_paths:
        result = await factory.parse(file_path)
        results.append((file_path, result))
        # Log at most the first 500 characters of the parsed content
        log.info("\nResult preview:")
        preview_content = result.content[:500] + "..." if len(result.content) > 500 else result.content
        log.info(preview_content)

    # Persist the results
    save_results(results)

    # Generate and log the performance report
    report = factory.generate_performance_report()
    log.info(f"\n{'=' * 80}")
    log.info("Overall task summary")
    log.info(f"{'=' * 80}")
    log.info(report)
    log.info(f"{'=' * 80}")
    log.info("All file tests completed")
    log.info(f"{'=' * 80}")


if __name__ == "__main__":
    asyncio.run(main())
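
# Usage sketch (an assumption: run from the project root so that the
# core/ and utils/ packages are importable):
#   python test_parser.py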