processors.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. # copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import List, Dict, Any
  15. from pathlib import Path
  16. from copy import deepcopy
  17. import joblib
  18. import numpy as np
  19. import pandas as pd
  20. from .funcs import load_from_dataframe, time_feature
  21. from ....utils.benchmark import benchmark
  22. __all__ = [
  23. "BuildTSDataset",
  24. "TSCutOff",
  25. "TSNormalize",
  26. "TimeFeature",
  27. "TStoArray",
  28. "TStoBatch",
  29. ]
  30. class TSCutOff:
  31. """Truncates time series data to a specified length for training.
  32. This class provides a method to truncate or cut off time series data
  33. to a specified input length, optionally skipping some initial data
  34. points. This is useful for preparing data for training models that
  35. require a fixed input size.
  36. """
  37. def __init__(self, size: Dict[str, int]):
  38. """Initializes the TSCutOff with size configurations.
  39. Args:
  40. size (Dict[str, int]): Dictionary containing size configurations,
  41. including 'in_chunk_len' for the input chunk length and
  42. optionally 'skip_chunk_len' for the number of initial data
  43. points to skip.
  44. """
  45. super().__init__()
  46. self.size = size
  47. @benchmark.timeit
  48. def __call__(self, ts_list: List) -> List:
  49. """Applies the cut off operation to a list of time series.
  50. Args:
  51. ts_list (List): List of time series data frames to be truncated.
  52. Returns:
  53. List: List of truncated time series data frames.
  54. """
  55. return [self.cutoff(ts) for ts in ts_list]
  56. def cutoff(self, ts: Any) -> Any:
  57. """Truncates a single time series data frame to the specified length.
  58. This method truncates the time series data to the specified input
  59. chunk length, optionally skipping some initial data points. It raises
  60. a ValueError if the time series is too short.
  61. Args:
  62. ts: A single time series data frame to be truncated.
  63. Returns:
  64. Any: The truncated time series data frame.
  65. Raises:
  66. ValueError: If the time series length is less than the required
  67. minimum length (input chunk length plus any skip chunk length).
  68. """
  69. skip_len = self.size.get("skip_chunk_len", 0)
  70. if len(ts) < self.size["in_chunk_len"] + skip_len:
  71. raise ValueError(
  72. f"The length of the input data is {len(ts)}, but it should be at least {self.size['in_chunk_len'] + self.size['skip_chunk_len']} for training."
  73. )
  74. ts_data = ts[-(self.size["in_chunk_len"] + skip_len) :]
  75. return ts_data
  76. class TSNormalize:
  77. """Normalizes time series data using a pre-fitted scaler.
  78. This class normalizes specified columns of time series data using a
  79. pre-fitted scaler loaded from a specified path. It supports normalization
  80. of both target and feature columns as specified in the parameters.
  81. """
  82. def __init__(self, scale_path: str, params_info: Dict[str, Any]):
  83. """Initializes the TSNormalize with a scaler and normalization parameters.
  84. Args:
  85. scale_path (str): Path to the pre-fitted scaler object file.
  86. params_info (Dict[str, Any]): Dictionary containing information
  87. about which columns to normalize, including 'target_cols'
  88. and 'feature_cols'.
  89. """
  90. super().__init__()
  91. self.scaler = joblib.load(scale_path)
  92. self.params_info = params_info
  93. @benchmark.timeit
  94. def __call__(self, ts_list: List[pd.DataFrame]) -> List[pd.DataFrame]:
  95. """Applies normalization to a list of time series data frames.
  96. Args:
  97. ts_list (List[pd.DataFrame]): List of time series data frames to be normalized.
  98. Returns:
  99. List[pd.DataFrame]: List of normalized time series data frames.
  100. """
  101. return [self.tsnorm(ts) for ts in ts_list]
  102. def tsnorm(self, ts: pd.DataFrame) -> pd.DataFrame:
  103. """Normalizes specified columns of a single time series data frame.
  104. This method applies the scaler to normalize the specified target
  105. and feature columns of the time series.
  106. Args:
  107. ts (pd.DataFrame): A single time series data frame to be normalized.
  108. Returns:
  109. pd.DataFrame: The normalized time series data frame.
  110. """
  111. if self.params_info.get("target_cols", None) is not None:
  112. ts[self.params_info["target_cols"]] = self.scaler.transform(
  113. ts[self.params_info["target_cols"]]
  114. )
  115. if self.params_info.get("feature_cols", None) is not None:
  116. ts[self.params_info["feature_cols"]] = self.scaler.transform(
  117. ts[self.params_info["feature_cols"]]
  118. )
  119. return ts
  120. class BuildTSDataset:
  121. """Constructs a time series dataset from a list of time series data frames."""
  122. def __init__(self, params_info: Dict[str, Any]):
  123. """Initializes the BuildTSDataset with parameters for dataset construction.
  124. Args:
  125. params_info (Dict[str, Any]): Dictionary containing parameters for
  126. constructing the time series dataset.
  127. """
  128. super().__init__()
  129. self.params_info = params_info
  130. @benchmark.timeit
  131. def __call__(self, ts_list: List) -> List:
  132. """Applies the dataset construction to a list of time series.
  133. Args:
  134. ts_list (List): List of time series data frames.
  135. Returns:
  136. List: List of constructed time series datasets.
  137. """
  138. return [self.buildtsdata(ts) for ts in ts_list]
  139. def buildtsdata(self, ts) -> Any:
  140. """Builds a time series dataset from a single time series data frame.
  141. Args:
  142. ts: A single time series data frame.
  143. Returns:
  144. Any: A constructed time series dataset.
  145. """
  146. ts_data = load_from_dataframe(ts, **self.params_info)
  147. return ts_data
  148. class TimeFeature:
  149. """Extracts time features from time series data for forecasting."""
  150. def __init__(
  151. self, params_info: Dict[str, Any], size: Dict[str, int], holiday: bool = False
  152. ):
  153. """Initializes the TimeFeature extractor.
  154. Args:
  155. params_info (Dict[str, Any]): Dictionary containing frequency information.
  156. size (Dict[str, int]): Dictionary containing the output chunk length.
  157. holiday (bool, optional): Whether to include holiday features. Defaults to False.
  158. """
  159. super().__init__()
  160. self.freq = params_info["freq"]
  161. self.size = size
  162. self.holiday = holiday
  163. @benchmark.timeit
  164. def __call__(self, ts_list: List) -> List:
  165. """Applies time feature extraction to a list of time series.
  166. Args:
  167. ts_list (List): List of time series data frames.
  168. Returns:
  169. List: List of time series with extracted time features.
  170. """
  171. return [self.timefeat(ts) for ts in ts_list]
  172. def timefeat(self, ts: Dict[str, Any]) -> Any:
  173. """Extracts time features from a single time series data frame.
  174. Args:
  175. ts: A single time series data frame.
  176. Returns:
  177. Any: The time series with added time features.
  178. """
  179. if not self.holiday:
  180. ts = time_feature(
  181. ts,
  182. self.freq,
  183. ["hourofday", "dayofmonth", "dayofweek", "dayofyear"],
  184. self.size["out_chunk_len"],
  185. )
  186. else:
  187. ts = time_feature(
  188. ts,
  189. self.freq,
  190. [
  191. "minuteofhour",
  192. "hourofday",
  193. "dayofmonth",
  194. "dayofweek",
  195. "dayofyear",
  196. "monthofyear",
  197. "weekofyear",
  198. "holidays",
  199. ],
  200. self.size["out_chunk_len"],
  201. )
  202. return ts
  203. class TStoArray:
  204. """Converts time series data into arrays for model input."""
  205. def __init__(self, input_data: Dict[str, Any]):
  206. """Initializes the TStoArray converter.
  207. Args:
  208. input_data (Dict[str, Any]): Dictionary specifying the input data format.
  209. """
  210. super().__init__()
  211. self.input_data = input_data
  212. @benchmark.timeit
  213. def __call__(self, ts_list: List[Dict[str, Any]]) -> List[List[np.ndarray]]:
  214. """Converts a list of time series data frames into arrays.
  215. Args:
  216. ts_list (List[Dict[str, Any]]): List of time series data frames.
  217. Returns:
  218. List[List[np.ndarray]]: List of lists of arrays for each time series.
  219. """
  220. return [self.tstoarray(ts) for ts in ts_list]
  221. def tstoarray(self, ts: Dict[str, Any]) -> List[np.ndarray]:
  222. """Converts a single time series data frame into arrays.
  223. Args:
  224. ts (Dict[str, Any]): A single time series data frame.
  225. Returns:
  226. List[np.ndarray]: List of arrays representing the time series data.
  227. """
  228. ts_list = []
  229. input_name = list(self.input_data.keys())
  230. input_name.sort()
  231. for key in input_name:
  232. ts_list.append(np.array(ts[key]).astype("float32"))
  233. return ts_list
  234. class TStoBatch:
  235. """Convert a list of time series into batches for processing.
  236. This class provides a method to convert a list of time series data into
  237. batches. Each time series in the list is assumed to be a sequence of
  238. equal-length arrays or DataFrames.
  239. """
  240. @benchmark.timeit
  241. def __call__(self, ts_list: List[np.ndarray]) -> List[np.ndarray]:
  242. """Convert a list of time series into batches.
  243. This method stacks time series data along a new axis to create batches.
  244. It assumes that each time series in the list has the same length.
  245. Args:
  246. ts_list (List[np.ndarray]): A list of time series, where each
  247. time series is represented as a list or array of equal length.
  248. Returns:
  249. List[np.ndarray]: A list of batches, where each batch is a stacked
  250. array of time series data at the same index across all series.
  251. """
  252. n = len(ts_list[0])
  253. return [np.stack([ts[i] for ts in ts_list], axis=0) for i in range(n)]