processors.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. # copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import List, Dict, Any
  15. from pathlib import Path
  16. from copy import deepcopy
  17. import joblib
  18. import numpy as np
  19. import pandas as pd
  20. from .funcs import load_from_dataframe, time_feature
  21. __all__ = [
  22. "BuildTSDataset",
  23. "TSCutOff",
  24. "TSNormalize",
  25. "TimeFeature",
  26. "TStoArray",
  27. "TStoBatch",
  28. ]
  29. class TSCutOff:
  30. """Truncates time series data to a specified length for training.
  31. This class provides a method to truncate or cut off time series data
  32. to a specified input length, optionally skipping some initial data
  33. points. This is useful for preparing data for training models that
  34. require a fixed input size.
  35. """
  36. def __init__(self, size: Dict[str, int]):
  37. """Initializes the TSCutOff with size configurations.
  38. Args:
  39. size (Dict[str, int]): Dictionary containing size configurations,
  40. including 'in_chunk_len' for the input chunk length and
  41. optionally 'skip_chunk_len' for the number of initial data
  42. points to skip.
  43. """
  44. super().__init__()
  45. self.size = size
  46. def __call__(self, ts_list: List) -> List:
  47. """Applies the cut off operation to a list of time series.
  48. Args:
  49. ts_list (List): List of time series data frames to be truncated.
  50. Returns:
  51. List: List of truncated time series data frames.
  52. """
  53. return [self.cutoff(ts) for ts in ts_list]
  54. def cutoff(self, ts: Any) -> Any:
  55. """Truncates a single time series data frame to the specified length.
  56. This method truncates the time series data to the specified input
  57. chunk length, optionally skipping some initial data points. It raises
  58. a ValueError if the time series is too short.
  59. Args:
  60. ts: A single time series data frame to be truncated.
  61. Returns:
  62. Any: The truncated time series data frame.
  63. Raises:
  64. ValueError: If the time series length is less than the required
  65. minimum length (input chunk length plus any skip chunk length).
  66. """
  67. skip_len = self.size.get("skip_chunk_len", 0)
  68. if len(ts) < self.size["in_chunk_len"] + skip_len:
  69. raise ValueError(
  70. f"The length of the input data is {len(ts)}, but it should be at least {self.size['in_chunk_len'] + self.size['skip_chunk_len']} for training."
  71. )
  72. ts_data = ts[-(self.size["in_chunk_len"] + skip_len) :]
  73. return ts_data
  74. class TSNormalize:
  75. """Normalizes time series data using a pre-fitted scaler.
  76. This class normalizes specified columns of time series data using a
  77. pre-fitted scaler loaded from a specified path. It supports normalization
  78. of both target and feature columns as specified in the parameters.
  79. """
  80. def __init__(self, scale_path: str, params_info: Dict[str, Any]):
  81. """Initializes the TSNormalize with a scaler and normalization parameters.
  82. Args:
  83. scale_path (str): Path to the pre-fitted scaler object file.
  84. params_info (Dict[str, Any]): Dictionary containing information
  85. about which columns to normalize, including 'target_cols'
  86. and 'feature_cols'.
  87. """
  88. super().__init__()
  89. self.scaler = joblib.load(scale_path)
  90. self.params_info = params_info
  91. def __call__(self, ts_list: List[pd.DataFrame]) -> List[pd.DataFrame]:
  92. """Applies normalization to a list of time series data frames.
  93. Args:
  94. ts_list (List[pd.DataFrame]): List of time series data frames to be normalized.
  95. Returns:
  96. List[pd.DataFrame]: List of normalized time series data frames.
  97. """
  98. return [self.tsnorm(ts) for ts in ts_list]
  99. def tsnorm(self, ts: pd.DataFrame) -> pd.DataFrame:
  100. """Normalizes specified columns of a single time series data frame.
  101. This method applies the scaler to normalize the specified target
  102. and feature columns of the time series.
  103. Args:
  104. ts (pd.DataFrame): A single time series data frame to be normalized.
  105. Returns:
  106. pd.DataFrame: The normalized time series data frame.
  107. """
  108. if self.params_info.get("target_cols", None) is not None:
  109. ts[self.params_info["target_cols"]] = self.scaler.transform(
  110. ts[self.params_info["target_cols"]]
  111. )
  112. if self.params_info.get("feature_cols", None) is not None:
  113. ts[self.params_info["feature_cols"]] = self.scaler.transform(
  114. ts[self.params_info["feature_cols"]]
  115. )
  116. return ts
  117. class BuildTSDataset:
  118. """Constructs a time series dataset from a list of time series data frames."""
  119. def __init__(self, params_info: Dict[str, Any]):
  120. """Initializes the BuildTSDataset with parameters for dataset construction.
  121. Args:
  122. params_info (Dict[str, Any]): Dictionary containing parameters for
  123. constructing the time series dataset.
  124. """
  125. super().__init__()
  126. self.params_info = params_info
  127. def __call__(self, ts_list: List) -> List:
  128. """Applies the dataset construction to a list of time series.
  129. Args:
  130. ts_list (List): List of time series data frames.
  131. Returns:
  132. List: List of constructed time series datasets.
  133. """
  134. return [self.buildtsdata(ts) for ts in ts_list]
  135. def buildtsdata(self, ts) -> Any:
  136. """Builds a time series dataset from a single time series data frame.
  137. Args:
  138. ts: A single time series data frame.
  139. Returns:
  140. Any: A constructed time series dataset.
  141. """
  142. ts_data = load_from_dataframe(ts, **self.params_info)
  143. return ts_data
  144. class TimeFeature:
  145. """Extracts time features from time series data for forecasting."""
  146. def __init__(
  147. self, params_info: Dict[str, Any], size: Dict[str, int], holiday: bool = False
  148. ):
  149. """Initializes the TimeFeature extractor.
  150. Args:
  151. params_info (Dict[str, Any]): Dictionary containing frequency information.
  152. size (Dict[str, int]): Dictionary containing the output chunk length.
  153. holiday (bool, optional): Whether to include holiday features. Defaults to False.
  154. """
  155. super().__init__()
  156. self.freq = params_info["freq"]
  157. self.size = size
  158. self.holiday = holiday
  159. def __call__(self, ts_list: List) -> List:
  160. """Applies time feature extraction to a list of time series.
  161. Args:
  162. ts_list (List): List of time series data frames.
  163. Returns:
  164. List: List of time series with extracted time features.
  165. """
  166. return [self.timefeat(ts) for ts in ts_list]
  167. def timefeat(self, ts: Dict[str, Any]) -> Any:
  168. """Extracts time features from a single time series data frame.
  169. Args:
  170. ts: A single time series data frame.
  171. Returns:
  172. Any: The time series with added time features.
  173. """
  174. if not self.holiday:
  175. ts = time_feature(
  176. ts,
  177. self.freq,
  178. ["hourofday", "dayofmonth", "dayofweek", "dayofyear"],
  179. self.size["out_chunk_len"],
  180. )
  181. else:
  182. ts = time_feature(
  183. ts,
  184. self.freq,
  185. [
  186. "minuteofhour",
  187. "hourofday",
  188. "dayofmonth",
  189. "dayofweek",
  190. "dayofyear",
  191. "monthofyear",
  192. "weekofyear",
  193. "holidays",
  194. ],
  195. self.size["out_chunk_len"],
  196. )
  197. return ts
  198. class TStoArray:
  199. """Converts time series data into arrays for model input."""
  200. def __init__(self, input_data: Dict[str, Any]):
  201. """Initializes the TStoArray converter.
  202. Args:
  203. input_data (Dict[str, Any]): Dictionary specifying the input data format.
  204. """
  205. super().__init__()
  206. self.input_data = input_data
  207. def __call__(self, ts_list: List[Dict[str, Any]]) -> List[List[np.ndarray]]:
  208. """Converts a list of time series data frames into arrays.
  209. Args:
  210. ts_list (List[Dict[str, Any]]): List of time series data frames.
  211. Returns:
  212. List[List[np.ndarray]]: List of lists of arrays for each time series.
  213. """
  214. return [self.tstoarray(ts) for ts in ts_list]
  215. def tstoarray(self, ts: Dict[str, Any]) -> List[np.ndarray]:
  216. """Converts a single time series data frame into arrays.
  217. Args:
  218. ts (Dict[str, Any]): A single time series data frame.
  219. Returns:
  220. List[np.ndarray]: List of arrays representing the time series data.
  221. """
  222. ts_list = []
  223. input_name = list(self.input_data.keys())
  224. input_name.sort()
  225. for key in input_name:
  226. ts_list.append(np.array(ts[key]).astype("float32"))
  227. return ts_list
  228. class TStoBatch:
  229. """Convert a list of time series into batches for processing.
  230. This class provides a method to convert a list of time series data into
  231. batches. Each time series in the list is assumed to be a sequence of
  232. equal-length arrays or DataFrames.
  233. """
  234. def __call__(self, ts_list: List[np.ndarray]) -> List[np.ndarray]:
  235. """Convert a list of time series into batches.
  236. This method stacks time series data along a new axis to create batches.
  237. It assumes that each time series in the list has the same length.
  238. Args:
  239. ts_list (List[np.ndarray]): A list of time series, where each
  240. time series is represented as a list or array of equal length.
  241. Returns:
  242. List[np.ndarray]: A list of batches, where each batch is a stacked
  243. array of time series data at the same index across all series.
  244. """
  245. n = len(ts_list[0])
  246. return [np.stack([ts[i] for ts in ts_list], axis=0) for i in range(n)]