| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322 |
- # Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from typing import Any, Dict, List
- import numpy as np
- import pandas as pd
- from .....utils.deps import class_requires_deps, is_dep_available
- from ....utils.benchmark import benchmark
- from .funcs import load_from_dataframe, time_feature
- if is_dep_available("joblib"):
- import joblib
# Names exported by ``from <this module> import *``: the time-series
# preprocessing operators defined in this file.
__all__ = [
    "BuildTSDataset",
    "TSCutOff",
    "TSNormalize",
    "TimeFeature",
    "TStoArray",
    "TStoBatch",
]
@benchmark.timeit
class TSCutOff:
    """Keep only the trailing window of each time series.

    The window length is ``size["in_chunk_len"]`` plus the optional
    ``size["skip_chunk_len"]`` (treated as 0 when the key is absent).
    Series shorter than that window are rejected with a ``ValueError``.
    """

    def __init__(self, size: Dict[str, int]):
        """Store the size configuration.

        Args:
            size (Dict[str, int]): Size configuration. Must contain
                'in_chunk_len'; may contain 'skip_chunk_len'.
        """
        super().__init__()
        self.size = size

    def __call__(self, ts_list: List) -> List:
        """Truncate every time series in ``ts_list``.

        Args:
            ts_list (List): Time series objects supporting ``len()`` and
                slicing.

        Returns:
            List: The truncated time series, in the same order.

        Raises:
            ValueError: If any series is shorter than the required length.
        """
        return [self.cutoff(ts) for ts in ts_list]

    def cutoff(self, ts: Any) -> Any:
        """Return the last ``in_chunk_len + skip_chunk_len`` items of ``ts``.

        Args:
            ts: A single time series supporting ``len()`` and slicing.

        Returns:
            Any: The trailing slice of ``ts``.

        Raises:
            ValueError: If ``len(ts)`` is smaller than the required length.
        """
        skip_len = self.size.get("skip_chunk_len", 0)
        # Compute the required length once and reuse it everywhere. The
        # error message previously indexed size['skip_chunk_len'] directly,
        # which raised KeyError instead of ValueError when the optional key
        # was not configured.
        required_len = self.size["in_chunk_len"] + skip_len
        if len(ts) < required_len:
            raise ValueError(
                f"The length of the input data is {len(ts)}, but it should be at "
                f"least {required_len} for training."
            )
        return ts[-required_len:]
@benchmark.timeit
@class_requires_deps("joblib")
class TSNormalize:
    """Scale selected columns of time series frames with a stored scaler.

    The scaler is loaded once from disk at construction time. Its
    ``transform`` method is then applied to the columns named under
    'target_cols' and 'feature_cols' in ``params_info``.
    """

    def __init__(self, scale_path: str, params_info: Dict[str, Any]):
        """Load the scaler and remember which columns to scale.

        Args:
            scale_path (str): Path of the serialized scaler file.
            params_info (Dict[str, Any]): Column configuration; the keys
                'target_cols' and 'feature_cols' are consulted, and either
                may be missing or None.
        """
        super().__init__()
        # NOTE: joblib.load is pickle-based, so scale_path should only point
        # to files from a trusted source.
        self.scaler = joblib.load(scale_path)
        self.params_info = params_info

    def __call__(self, ts_list: List[pd.DataFrame]) -> List[pd.DataFrame]:
        """Normalize every frame in ``ts_list``.

        Args:
            ts_list (List[pd.DataFrame]): Frames to normalize.

        Returns:
            List[pd.DataFrame]: The normalized frames, in the same order.
        """
        normalized = []
        for frame in ts_list:
            normalized.append(self.tsnorm(frame))
        return normalized

    def tsnorm(self, ts: pd.DataFrame) -> pd.DataFrame:
        """Scale the configured column groups of one frame.

        The target columns are processed first, then the feature columns;
        a group whose entry is missing or None is skipped. The scaled
        values are assigned back into ``ts`` itself.

        Args:
            ts (pd.DataFrame): Frame to normalize.

        Returns:
            pd.DataFrame: The same ``ts`` object with scaled columns.
        """
        for group in ("target_cols", "feature_cols"):
            columns = self.params_info.get(group)
            if columns is None:
                continue
            ts[columns] = self.scaler.transform(ts[columns])
        return ts
@benchmark.timeit
class BuildTSDataset:
    """Turn raw time series frames into dataset objects.

    Each frame is handed to ``load_from_dataframe`` together with the
    stored ``params_info`` expanded as keyword arguments.
    """

    def __init__(self, params_info: Dict[str, Any]):
        """Keep the keyword arguments used for dataset construction.

        Args:
            params_info (Dict[str, Any]): Keyword arguments forwarded to
                ``load_from_dataframe`` for every frame.
        """
        super().__init__()
        self.params_info = params_info

    def __call__(self, ts_list: List) -> List:
        """Build a dataset for every frame in ``ts_list``.

        Args:
            ts_list (List): Time series data frames.

        Returns:
            List: One constructed dataset per input frame, in order.
        """
        return list(map(self.buildtsdata, ts_list))

    def buildtsdata(self, ts) -> Any:
        """Build a dataset from one frame.

        Args:
            ts: A single time series data frame.

        Returns:
            Any: Whatever ``load_from_dataframe`` returns for this frame.
        """
        return load_from_dataframe(ts, **self.params_info)
@benchmark.timeit
class TimeFeature:
    """Attach calendar-derived features to time series via ``time_feature``."""

    def __init__(
        self, params_info: Dict[str, Any], size: Dict[str, int], holiday: bool = False
    ):
        """Record the frequency, size configuration and holiday switch.

        Args:
            params_info (Dict[str, Any]): Must contain 'freq'.
            size (Dict[str, int]): Must contain 'out_chunk_len', which is
                forwarded to ``time_feature``.
            holiday (bool, optional): Selects the extended feature set that
                includes 'holidays'. Defaults to False.
        """
        super().__init__()
        self.freq = params_info["freq"]
        self.size = size
        self.holiday = holiday

    def __call__(self, ts_list: List) -> List:
        """Run feature extraction on every series in ``ts_list``.

        Args:
            ts_list (List): Time series objects.

        Returns:
            List: The results of ``timefeat`` for each series, in order.
        """
        return [self.timefeat(series) for series in ts_list]

    def timefeat(self, ts: Dict[str, Any]) -> Any:
        """Call ``time_feature`` on one series with the selected feature names.

        Without holidays the feature names are hour-of-day, day-of-month,
        day-of-week and day-of-year. With holidays the list additionally
        has minute-of-hour in front and month-of-year, week-of-year and
        holidays at the end.

        Args:
            ts: A single time series object.

        Returns:
            Any: The value returned by ``time_feature``.
        """
        # A fresh list is built on every call so the callee never sees a
        # shared object.
        feature_names = ["hourofday", "dayofmonth", "dayofweek", "dayofyear"]
        if self.holiday:
            feature_names = [
                "minuteofhour",
                *feature_names,
                "monthofyear",
                "weekofyear",
                "holidays",
            ]
        return time_feature(
            ts,
            self.freq,
            feature_names,
            self.size["out_chunk_len"],
        )
@benchmark.timeit
class TStoArray:
    """Extract the configured fields of a time series as float32 arrays."""

    def __init__(self, input_data: Dict[str, Any]):
        """Remember which fields to extract.

        Args:
            input_data (Dict[str, Any]): Mapping whose keys name the fields
                to pull out of each time series.
        """
        super().__init__()
        self.input_data = input_data

    def __call__(self, ts_list: List[Dict[str, Any]]) -> List[List[np.ndarray]]:
        """Convert every time series in ``ts_list``.

        Args:
            ts_list (List[Dict[str, Any]]): Time series indexable by the
                keys of ``input_data``.

        Returns:
            List[List[np.ndarray]]: One list of arrays per time series.
        """
        return [self.tstoarray(series) for series in ts_list]

    def tstoarray(self, ts: Dict[str, Any]) -> List[np.ndarray]:
        """Convert one time series into a list of float32 arrays.

        The arrays are ordered by the sorted keys of ``input_data``.

        Args:
            ts (Dict[str, Any]): Time series indexable by those keys.

        Returns:
            List[np.ndarray]: One float32 array per key, in sorted key order.
        """
        return [
            np.array(ts[field]).astype("float32")
            for field in sorted(self.input_data.keys())
        ]
@benchmark.timeit
class TStoBatch:
    """Stack per-series arrays into batched arrays.

    Every time series is a sequence of items. The items found at the same
    position across all series are stacked along a new leading axis.
    """

    def __call__(self, ts_list: List[np.ndarray]) -> List[np.ndarray]:
        """Batch the items of all time series position by position.

        The number of positions is taken from the first series, so
        ``ts_list`` must not be empty (indexing ``ts_list[0]`` would raise
        ``IndexError``).

        Args:
            ts_list (List[np.ndarray]): Time series, each an indexable
                sequence of stackable items.

        Returns:
            List[np.ndarray]: One stacked array per position, where entry
            ``i`` combines item ``i`` of every series along axis 0.
        """
        num_positions = len(ts_list[0])
        batches = []
        for position in range(num_positions):
            items = [series[position] for series in ts_list]
            batches.append(np.stack(items, axis=0))
        return batches
|