ts_batch_sampler.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. # Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import os
  15. from pathlib import Path
  16. import pandas as pd
  17. from ....utils import logging
  18. from ....utils.cache import CACHE_DIR
  19. from ....utils.download import download
  20. from .base_batch_sampler import BaseBatchSampler, Batch
  21. class TSBatchSampler(BaseBatchSampler):
  22. """Batch sampler for time series data, supporting CSV file inputs."""
  23. SUFFIX = ["csv"]
  24. def _download_from_url(self, in_path: str) -> str:
  25. """Download a file from a URL to a cache directory.
  26. Args:
  27. in_path (str): URL of the file to be downloaded.
  28. Returns:
  29. str: Path to the downloaded file.
  30. """
  31. file_name = Path(in_path).name
  32. save_path = Path(CACHE_DIR) / "predict_input" / file_name
  33. download(in_path, save_path, overwrite=True)
  34. return save_path.as_posix()
  35. def _get_files_list(self, fp: str) -> list:
  36. """Get a list of CSV files from a directory or a single file path.
  37. Args:
  38. fp (str): Path to a directory or a single CSV file.
  39. Returns:
  40. list: Sorted list of CSV file paths.
  41. Raises:
  42. Exception: If no CSV file is found in the path.
  43. """
  44. file_list = []
  45. if fp is None or not os.path.exists(fp):
  46. raise Exception(f"Not found any csv file in path: {fp}")
  47. if os.path.isfile(fp) and fp.split(".")[-1] in self.SUFFIX:
  48. file_list.append(fp)
  49. elif os.path.isdir(fp):
  50. for root, dirs, files in os.walk(fp):
  51. for single_file in files:
  52. if single_file.split(".")[-1] in self.SUFFIX:
  53. file_list.append(os.path.join(root, single_file))
  54. if len(file_list) == 0:
  55. raise Exception("Not found any file in {}".format(fp))
  56. file_list = sorted(file_list)
  57. return file_list
  58. def sample(self, inputs: list) -> list:
  59. """Generate batches of data from inputs, which can be DataFrames or file paths.
  60. Args:
  61. inputs (list): List of DataFrames or file paths.
  62. Yields:
  63. list: A batch of data which is either DataFrames or file paths.
  64. """
  65. if not isinstance(inputs, list):
  66. inputs = [inputs]
  67. batch = Batch()
  68. for input in inputs:
  69. if isinstance(input, pd.DataFrame):
  70. batch.append(input, None)
  71. if len(batch) == self.batch_size:
  72. yield batch
  73. batch = Batch()
  74. elif isinstance(input, str):
  75. file_path = (
  76. self._download_from_url(input)
  77. if input.startswith("http")
  78. else input
  79. )
  80. file_list = self._get_files_list(file_path)
  81. for file_path in file_list:
  82. batch.append(file_path, file_path)
  83. if len(batch) == self.batch_size:
  84. yield batch
  85. batch = Batch()
  86. else:
  87. logging.warning(
  88. f"Not supported input data type! Only `pd.DataFrame` and `str` are supported! So has been ignored: {input}."
  89. )
  90. if len(batch) > 0:
  91. yield batch