processors.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. # copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import List, Dict, Any
  15. from pathlib import Path
  16. from copy import deepcopy
  17. import joblib
  18. import numpy as np
  19. import pandas as pd
  20. from .funcs import load_from_dataframe, time_feature
  21. from ....utils.benchmark import benchmark
  22. __all__ = [
  23. "BuildTSDataset",
  24. "TSCutOff",
  25. "TSNormalize",
  26. "TimeFeature",
  27. "TStoArray",
  28. "TStoBatch",
  29. ]
  30. @benchmark.timeit
  31. class TSCutOff:
  32. """Truncates time series data to a specified length for training.
  33. This class provides a method to truncate or cut off time series data
  34. to a specified input length, optionally skipping some initial data
  35. points. This is useful for preparing data for training models that
  36. require a fixed input size.
  37. """
  38. def __init__(self, size: Dict[str, int]):
  39. """Initializes the TSCutOff with size configurations.
  40. Args:
  41. size (Dict[str, int]): Dictionary containing size configurations,
  42. including 'in_chunk_len' for the input chunk length and
  43. optionally 'skip_chunk_len' for the number of initial data
  44. points to skip.
  45. """
  46. super().__init__()
  47. self.size = size
  48. def __call__(self, ts_list: List) -> List:
  49. """Applies the cut off operation to a list of time series.
  50. Args:
  51. ts_list (List): List of time series data frames to be truncated.
  52. Returns:
  53. List: List of truncated time series data frames.
  54. """
  55. return [self.cutoff(ts) for ts in ts_list]
  56. def cutoff(self, ts: Any) -> Any:
  57. """Truncates a single time series data frame to the specified length.
  58. This method truncates the time series data to the specified input
  59. chunk length, optionally skipping some initial data points. It raises
  60. a ValueError if the time series is too short.
  61. Args:
  62. ts: A single time series data frame to be truncated.
  63. Returns:
  64. Any: The truncated time series data frame.
  65. Raises:
  66. ValueError: If the time series length is less than the required
  67. minimum length (input chunk length plus any skip chunk length).
  68. """
  69. skip_len = self.size.get("skip_chunk_len", 0)
  70. if len(ts) < self.size["in_chunk_len"] + skip_len:
  71. raise ValueError(
  72. f"The length of the input data is {len(ts)}, but it should be at least {self.size['in_chunk_len'] + self.size['skip_chunk_len']} for training."
  73. )
  74. ts_data = ts[-(self.size["in_chunk_len"] + skip_len) :]
  75. return ts_data
  76. @benchmark.timeit
  77. class TSNormalize:
  78. """Normalizes time series data using a pre-fitted scaler.
  79. This class normalizes specified columns of time series data using a
  80. pre-fitted scaler loaded from a specified path. It supports normalization
  81. of both target and feature columns as specified in the parameters.
  82. """
  83. def __init__(self, scale_path: str, params_info: Dict[str, Any]):
  84. """Initializes the TSNormalize with a scaler and normalization parameters.
  85. Args:
  86. scale_path (str): Path to the pre-fitted scaler object file.
  87. params_info (Dict[str, Any]): Dictionary containing information
  88. about which columns to normalize, including 'target_cols'
  89. and 'feature_cols'.
  90. """
  91. super().__init__()
  92. self.scaler = joblib.load(scale_path)
  93. self.params_info = params_info
  94. def __call__(self, ts_list: List[pd.DataFrame]) -> List[pd.DataFrame]:
  95. """Applies normalization to a list of time series data frames.
  96. Args:
  97. ts_list (List[pd.DataFrame]): List of time series data frames to be normalized.
  98. Returns:
  99. List[pd.DataFrame]: List of normalized time series data frames.
  100. """
  101. return [self.tsnorm(ts) for ts in ts_list]
  102. def tsnorm(self, ts: pd.DataFrame) -> pd.DataFrame:
  103. """Normalizes specified columns of a single time series data frame.
  104. This method applies the scaler to normalize the specified target
  105. and feature columns of the time series.
  106. Args:
  107. ts (pd.DataFrame): A single time series data frame to be normalized.
  108. Returns:
  109. pd.DataFrame: The normalized time series data frame.
  110. """
  111. if self.params_info.get("target_cols", None) is not None:
  112. ts[self.params_info["target_cols"]] = self.scaler.transform(
  113. ts[self.params_info["target_cols"]]
  114. )
  115. if self.params_info.get("feature_cols", None) is not None:
  116. ts[self.params_info["feature_cols"]] = self.scaler.transform(
  117. ts[self.params_info["feature_cols"]]
  118. )
  119. return ts
  120. @benchmark.timeit
  121. class BuildTSDataset:
  122. """Constructs a time series dataset from a list of time series data frames."""
  123. def __init__(self, params_info: Dict[str, Any]):
  124. """Initializes the BuildTSDataset with parameters for dataset construction.
  125. Args:
  126. params_info (Dict[str, Any]): Dictionary containing parameters for
  127. constructing the time series dataset.
  128. """
  129. super().__init__()
  130. self.params_info = params_info
  131. def __call__(self, ts_list: List) -> List:
  132. """Applies the dataset construction to a list of time series.
  133. Args:
  134. ts_list (List): List of time series data frames.
  135. Returns:
  136. List: List of constructed time series datasets.
  137. """
  138. return [self.buildtsdata(ts) for ts in ts_list]
  139. def buildtsdata(self, ts) -> Any:
  140. """Builds a time series dataset from a single time series data frame.
  141. Args:
  142. ts: A single time series data frame.
  143. Returns:
  144. Any: A constructed time series dataset.
  145. """
  146. ts_data = load_from_dataframe(ts, **self.params_info)
  147. return ts_data
  148. @benchmark.timeit
  149. class TimeFeature:
  150. """Extracts time features from time series data for forecasting."""
  151. def __init__(
  152. self, params_info: Dict[str, Any], size: Dict[str, int], holiday: bool = False
  153. ):
  154. """Initializes the TimeFeature extractor.
  155. Args:
  156. params_info (Dict[str, Any]): Dictionary containing frequency information.
  157. size (Dict[str, int]): Dictionary containing the output chunk length.
  158. holiday (bool, optional): Whether to include holiday features. Defaults to False.
  159. """
  160. super().__init__()
  161. self.freq = params_info["freq"]
  162. self.size = size
  163. self.holiday = holiday
  164. def __call__(self, ts_list: List) -> List:
  165. """Applies time feature extraction to a list of time series.
  166. Args:
  167. ts_list (List): List of time series data frames.
  168. Returns:
  169. List: List of time series with extracted time features.
  170. """
  171. return [self.timefeat(ts) for ts in ts_list]
  172. def timefeat(self, ts: Dict[str, Any]) -> Any:
  173. """Extracts time features from a single time series data frame.
  174. Args:
  175. ts: A single time series data frame.
  176. Returns:
  177. Any: The time series with added time features.
  178. """
  179. if not self.holiday:
  180. ts = time_feature(
  181. ts,
  182. self.freq,
  183. ["hourofday", "dayofmonth", "dayofweek", "dayofyear"],
  184. self.size["out_chunk_len"],
  185. )
  186. else:
  187. ts = time_feature(
  188. ts,
  189. self.freq,
  190. [
  191. "minuteofhour",
  192. "hourofday",
  193. "dayofmonth",
  194. "dayofweek",
  195. "dayofyear",
  196. "monthofyear",
  197. "weekofyear",
  198. "holidays",
  199. ],
  200. self.size["out_chunk_len"],
  201. )
  202. return ts
  203. @benchmark.timeit
  204. class TStoArray:
  205. """Converts time series data into arrays for model input."""
  206. def __init__(self, input_data: Dict[str, Any]):
  207. """Initializes the TStoArray converter.
  208. Args:
  209. input_data (Dict[str, Any]): Dictionary specifying the input data format.
  210. """
  211. super().__init__()
  212. self.input_data = input_data
  213. def __call__(self, ts_list: List[Dict[str, Any]]) -> List[List[np.ndarray]]:
  214. """Converts a list of time series data frames into arrays.
  215. Args:
  216. ts_list (List[Dict[str, Any]]): List of time series data frames.
  217. Returns:
  218. List[List[np.ndarray]]: List of lists of arrays for each time series.
  219. """
  220. return [self.tstoarray(ts) for ts in ts_list]
  221. def tstoarray(self, ts: Dict[str, Any]) -> List[np.ndarray]:
  222. """Converts a single time series data frame into arrays.
  223. Args:
  224. ts (Dict[str, Any]): A single time series data frame.
  225. Returns:
  226. List[np.ndarray]: List of arrays representing the time series data.
  227. """
  228. ts_list = []
  229. input_name = list(self.input_data.keys())
  230. input_name.sort()
  231. for key in input_name:
  232. ts_list.append(np.array(ts[key]).astype("float32"))
  233. return ts_list
  234. @benchmark.timeit
  235. class TStoBatch:
  236. """Convert a list of time series into batches for processing.
  237. This class provides a method to convert a list of time series data into
  238. batches. Each time series in the list is assumed to be a sequence of
  239. equal-length arrays or DataFrames.
  240. """
  241. def __call__(self, ts_list: List[np.ndarray]) -> List[np.ndarray]:
  242. """Convert a list of time series into batches.
  243. This method stacks time series data along a new axis to create batches.
  244. It assumes that each time series in the list has the same length.
  245. Args:
  246. ts_list (List[np.ndarray]): A list of time series, where each
  247. time series is represented as a list or array of equal length.
  248. Returns:
  249. List[np.ndarray]: A list of batches, where each batch is a stacked
  250. array of time series data at the same index across all series.
  251. """
  252. n = len(ts_list[0])
  253. return [np.stack([ts[i] for ts in ts_list], axis=0) for i in range(n)]