processors.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
# Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  14. from typing import Any, Dict, List
  15. import numpy as np
  16. import pandas as pd
  17. from .....utils.deps import class_requires_deps, is_dep_available
  18. from ....utils.benchmark import benchmark
  19. from .funcs import load_from_dataframe, time_feature
  20. if is_dep_available("joblib"):
  21. import joblib
  22. __all__ = [
  23. "BuildTSDataset",
  24. "TSCutOff",
  25. "TSNormalize",
  26. "TimeFeature",
  27. "TStoArray",
  28. "TStoBatch",
  29. ]
  30. @benchmark.timeit
  31. class TSCutOff:
  32. """Truncates time series data to a specified length for training.
  33. This class provides a method to truncate or cut off time series data
  34. to a specified input length, optionally skipping some initial data
  35. points. This is useful for preparing data for training models that
  36. require a fixed input size.
  37. """
  38. def __init__(self, size: Dict[str, int]):
  39. """Initializes the TSCutOff with size configurations.
  40. Args:
  41. size (Dict[str, int]): Dictionary containing size configurations,
  42. including 'in_chunk_len' for the input chunk length and
  43. optionally 'skip_chunk_len' for the number of initial data
  44. points to skip.
  45. """
  46. super().__init__()
  47. self.size = size
  48. def __call__(self, ts_list: List) -> List:
  49. """Applies the cut off operation to a list of time series.
  50. Args:
  51. ts_list (List): List of time series data frames to be truncated.
  52. Returns:
  53. List: List of truncated time series data frames.
  54. """
  55. return [self.cutoff(ts) for ts in ts_list]
  56. def cutoff(self, ts: Any) -> Any:
  57. """Truncates a single time series data frame to the specified length.
  58. This method truncates the time series data to the specified input
  59. chunk length, optionally skipping some initial data points. It raises
  60. a ValueError if the time series is too short.
  61. Args:
  62. ts: A single time series data frame to be truncated.
  63. Returns:
  64. Any: The truncated time series data frame.
  65. Raises:
  66. ValueError: If the time series length is less than the required
  67. minimum length (input chunk length plus any skip chunk length).
  68. """
  69. skip_len = self.size.get("skip_chunk_len", 0)
  70. if len(ts) < self.size["in_chunk_len"] + skip_len:
  71. raise ValueError(
  72. f"The length of the input data is {len(ts)}, but it should be at least {self.size['in_chunk_len'] + self.size['skip_chunk_len']} for training."
  73. )
  74. ts_data = ts[-(self.size["in_chunk_len"] + skip_len) :]
  75. return ts_data
  76. @benchmark.timeit
  77. @class_requires_deps("joblib")
  78. class TSNormalize:
  79. """Normalizes time series data using a pre-fitted scaler.
  80. This class normalizes specified columns of time series data using a
  81. pre-fitted scaler loaded from a specified path. It supports normalization
  82. of both target and feature columns as specified in the parameters.
  83. """
  84. def __init__(self, scale_path: str, params_info: Dict[str, Any]):
  85. """Initializes the TSNormalize with a scaler and normalization parameters.
  86. Args:
  87. scale_path (str): Path to the pre-fitted scaler object file.
  88. params_info (Dict[str, Any]): Dictionary containing information
  89. about which columns to normalize, including 'target_cols'
  90. and 'feature_cols'.
  91. """
  92. super().__init__()
  93. self.scaler = joblib.load(scale_path)
  94. self.params_info = params_info
  95. def __call__(self, ts_list: List[pd.DataFrame]) -> List[pd.DataFrame]:
  96. """Applies normalization to a list of time series data frames.
  97. Args:
  98. ts_list (List[pd.DataFrame]): List of time series data frames to be normalized.
  99. Returns:
  100. List[pd.DataFrame]: List of normalized time series data frames.
  101. """
  102. return [self.tsnorm(ts) for ts in ts_list]
  103. def tsnorm(self, ts: pd.DataFrame) -> pd.DataFrame:
  104. """Normalizes specified columns of a single time series data frame.
  105. This method applies the scaler to normalize the specified target
  106. and feature columns of the time series.
  107. Args:
  108. ts (pd.DataFrame): A single time series data frame to be normalized.
  109. Returns:
  110. pd.DataFrame: The normalized time series data frame.
  111. """
  112. if self.params_info.get("target_cols", None) is not None:
  113. ts[self.params_info["target_cols"]] = self.scaler.transform(
  114. ts[self.params_info["target_cols"]]
  115. )
  116. if self.params_info.get("feature_cols", None) is not None:
  117. ts[self.params_info["feature_cols"]] = self.scaler.transform(
  118. ts[self.params_info["feature_cols"]]
  119. )
  120. return ts
  121. @benchmark.timeit
  122. class BuildTSDataset:
  123. """Constructs a time series dataset from a list of time series data frames."""
  124. def __init__(self, params_info: Dict[str, Any]):
  125. """Initializes the BuildTSDataset with parameters for dataset construction.
  126. Args:
  127. params_info (Dict[str, Any]): Dictionary containing parameters for
  128. constructing the time series dataset.
  129. """
  130. super().__init__()
  131. self.params_info = params_info
  132. def __call__(self, ts_list: List) -> List:
  133. """Applies the dataset construction to a list of time series.
  134. Args:
  135. ts_list (List): List of time series data frames.
  136. Returns:
  137. List: List of constructed time series datasets.
  138. """
  139. return [self.buildtsdata(ts) for ts in ts_list]
  140. def buildtsdata(self, ts) -> Any:
  141. """Builds a time series dataset from a single time series data frame.
  142. Args:
  143. ts: A single time series data frame.
  144. Returns:
  145. Any: A constructed time series dataset.
  146. """
  147. ts_data = load_from_dataframe(ts, **self.params_info)
  148. return ts_data
  149. @benchmark.timeit
  150. class TimeFeature:
  151. """Extracts time features from time series data for forecasting."""
  152. def __init__(
  153. self, params_info: Dict[str, Any], size: Dict[str, int], holiday: bool = False
  154. ):
  155. """Initializes the TimeFeature extractor.
  156. Args:
  157. params_info (Dict[str, Any]): Dictionary containing frequency information.
  158. size (Dict[str, int]): Dictionary containing the output chunk length.
  159. holiday (bool, optional): Whether to include holiday features. Defaults to False.
  160. """
  161. super().__init__()
  162. self.freq = params_info["freq"]
  163. self.size = size
  164. self.holiday = holiday
  165. def __call__(self, ts_list: List) -> List:
  166. """Applies time feature extraction to a list of time series.
  167. Args:
  168. ts_list (List): List of time series data frames.
  169. Returns:
  170. List: List of time series with extracted time features.
  171. """
  172. return [self.timefeat(ts) for ts in ts_list]
  173. def timefeat(self, ts: Dict[str, Any]) -> Any:
  174. """Extracts time features from a single time series data frame.
  175. Args:
  176. ts: A single time series data frame.
  177. Returns:
  178. Any: The time series with added time features.
  179. """
  180. if not self.holiday:
  181. ts = time_feature(
  182. ts,
  183. self.freq,
  184. ["hourofday", "dayofmonth", "dayofweek", "dayofyear"],
  185. self.size["out_chunk_len"],
  186. )
  187. else:
  188. ts = time_feature(
  189. ts,
  190. self.freq,
  191. [
  192. "minuteofhour",
  193. "hourofday",
  194. "dayofmonth",
  195. "dayofweek",
  196. "dayofyear",
  197. "monthofyear",
  198. "weekofyear",
  199. "holidays",
  200. ],
  201. self.size["out_chunk_len"],
  202. )
  203. return ts
  204. @benchmark.timeit
  205. class TStoArray:
  206. """Converts time series data into arrays for model input."""
  207. def __init__(self, input_data: Dict[str, Any]):
  208. """Initializes the TStoArray converter.
  209. Args:
  210. input_data (Dict[str, Any]): Dictionary specifying the input data format.
  211. """
  212. super().__init__()
  213. self.input_data = input_data
  214. def __call__(self, ts_list: List[Dict[str, Any]]) -> List[List[np.ndarray]]:
  215. """Converts a list of time series data frames into arrays.
  216. Args:
  217. ts_list (List[Dict[str, Any]]): List of time series data frames.
  218. Returns:
  219. List[List[np.ndarray]]: List of lists of arrays for each time series.
  220. """
  221. return [self.tstoarray(ts) for ts in ts_list]
  222. def tstoarray(self, ts: Dict[str, Any]) -> List[np.ndarray]:
  223. """Converts a single time series data frame into arrays.
  224. Args:
  225. ts (Dict[str, Any]): A single time series data frame.
  226. Returns:
  227. List[np.ndarray]: List of arrays representing the time series data.
  228. """
  229. ts_list = []
  230. input_name = list(self.input_data.keys())
  231. input_name.sort()
  232. for key in input_name:
  233. ts_list.append(np.array(ts[key]).astype("float32"))
  234. return ts_list
  235. @benchmark.timeit
  236. class TStoBatch:
  237. """Convert a list of time series into batches for processing.
  238. This class provides a method to convert a list of time series data into
  239. batches. Each time series in the list is assumed to be a sequence of
  240. equal-length arrays or DataFrames.
  241. """
  242. def __call__(self, ts_list: List[np.ndarray]) -> List[np.ndarray]:
  243. """Convert a list of time series into batches.
  244. This method stacks time series data along a new axis to create batches.
  245. It assumes that each time series in the list has the same length.
  246. Args:
  247. ts_list (List[np.ndarray]): A list of time series, where each
  248. time series is represented as a list or array of equal length.
  249. Returns:
  250. List[np.ndarray]: A list of batches, where each batch is a stacked
  251. array of time series data at the same index across all series.
  252. """
  253. n = len(ts_list[0])
  254. return [np.stack([ts[i] for ts in ts_list], axis=0) for i in range(n)]